盒子
盒子
文章目录
  1. 1. 什么是RDD?
  2. 2. RDD 的特点
  3. 3. Spark RDD 的操作类型
    1. 3.1 Transformation 操作:
    2. 3.2 action 操作
  4. 4. 创建 RDD
    1. 4.1 使用集合创建 RDD
    2. 4.2 从外部数据源创建 RDD
      1. 4.2.1 读取本地文件
      2. 4.2.2 读取 HDFS 上的数据
      3. 4.2.3 提交应用程序到 Spark 集群
      4. 4.2.4 配置并启动 Spark History Server
  5. 5. 向 Spark 算子传递函数
  6. 6. Spark 算子实战——transformation
    1. 6.1 map 和 flatMap 算子
    2. 6.2 filter 算子
    3. 6.3 distinct 算子
    4. 6.4 mapPartitions
    5. 6.5 mapPartitionWithIndex()
    6. 6.6 union 并集
    7. 6.7 intersection(交集)
    8. 6.8 PairRDD
      1. 6.8.1 PairRDD 上的 transformation 操作
  7. 7. Spark 算子实战–action
    1. 7.1 count
    2. 7.2 take
    3. 7.3 top
    4. 7.4 countByValue
    5. 7.5 reduce
    6. 7.6 collect
    7. 7.7 foreach (无返回值)
    8. 7.8 foreachParitition(无返回值)
    9. 7.9 作业:
  8. 8. Spark RDD 分区实战
    1. 8.1 RDD partition 概念
    2. 8.2 RDD partition 的相关属性
    3. 8.3 查看 RDD partition 信息
    4. 8.4 RDD 的初始分区
    5. 8.5 Transformation 操作对分区的影响
      1. 8.5.1 普通 RDD 操作
      2. 8.5.2 Key-value RDD 操作
    6. 8.6 有多少分区是合适的(重点!!)
      1. 8.6.1 分区太少的缺点
      2. 8.6.2 分区太多的缺点
    7. 8.7 Spark 中的分区器
      1. 8.7.1 HashPartitioner
      2. 8.7.2 RangePartitioner
      3. 8.7.3 自定义分区器
  9. 9. Spark RDD 数据保存实战
    1. 9.1 保存数据到 HDFS
    2. 9.2 保存数据到 mysql 数据库
      1. 9.2.1 读
      2. 9.2.2 写
    3. 9.3 保存数据到 kafka
  10. 10. Spark RDD 缓存实战
    1. 10.1 前言
    2. 10.2 要点
    3. 10.3 RDD 持久化存储级别如何选择

Spark 编程核心抽象——RDD

1. 什么是RDD?

RDD 是 Resilient Distributed Dataset(弹性分布式数据集) 的简称。它是 Apache Spark 的基本数据结构。它是一个不可变的对象集合,在集群的不同节点上进行计算。

  • Resilient: 即在 RDD lineage(DAG) 的帮助下具有容错能力,能够重新计算由于节点故障而丢失或损坏的数据分区。
  • Distributed: 数据分布在多个节点上。
  • Dataset: 表示所操作的数据集。用户可以通过 JDBC 从外部加载数据集,数据集可以是 JSON 文件,CSV 文件,文本文件或数据库。

2. RDD 的特点

  1. 内存计算:它将中间计算结果存储在分布式内存(RAM)中,而不是磁盘中。

  2. 延迟计算:Apache Spark 中的所有 transformation 都是惰性的,因为它们不会立即计算结果,它们会记住应用于数据集的那些 transformation。直到 action 出现时,才会真正开始计算。

  3. 容错性:Spark RDDs 能够容错,因为它们跟踪数据沿袭(lineage)信息,以便在故障时自动重建丢失的数据。

  4. 不可变性:跨进程共享数据是安全的。它也可以在任何时候创建或检索,这使得缓存、共享和复制变得容易。因此,它是一种在计算中达到一致性的方法。

  5. 分区性:partition 是 Spark RDD 中并行性的基本单元,每个分区都是数据的逻辑分区。Partition—task 一 一对应。

  6. 持久化:用户可以声明他们将重用哪些 RDDs,并为它们选择存储策略。

  7. 数据本地性:RDDs 能够定义计算分区的位置首选项。位置首选项是关于 RDD 位置的信息。

3. Spark RDD 的操作类型

Apache Spark 中的 RDD 支持两种操作:

  • Transformation
  • Action

3.1 Transformation 操作:

Spark RDD transformation 操作是一个从现有的 RDD 生成新 RDD 的函数(方法、算子)。如:map(), filter(), reduceByKey()。

Transformation 操作都是延迟计算的操作。

有两种类型:窄变换、宽变换(窄依赖、宽依赖)。

  1. 窄依赖:它是map、filter 这样数据来自一个单独分区的操作。即输出 RDD 分区中的数据,来自父 RDD 中的单个分区。不需要 shuffle 操作就能解决。
    窄依赖.png

窄依赖算子:map(), flatMap(), mapPartition(), filter(), sample(), union()

  1. 宽依赖:在子 RDD 单个分区中计算结果所需的数据可能存在于父 RDD 的多个分区中。类似 groupByKey() 和 reduceBykey() 这样的 transformation。宽依赖也称为 shuffle transformation。
    宽依赖.png

宽依赖算子:intersection(), distinct(), reduceByKey(), groupByKey(), join(), cartesian(), repartition(), coalesce()。

3.2 action 操作

Spark 中的 action 操作 ,返回 RDD 计算 的最终结果,其结果是一个值,而不是一个 RDD

Action 触发血缘关系中 RDD 上的 transformation 操作的真正计算,计算结果返回 Driver端或者写入数据库。

这种设计使 Spark 运行更加高效。例如:map操作返回的数据集用于 reduce 操作,返回到 driver 端的只是 reduce 的结果值,而不是 map操作的数据集。

常见的 Action:first(), take(), reduce(), collect(), the count()。

4. 创建 RDD

三种创建 RDD 的方法:

  1. 使用集合创建 RDD(parallelize
  2. 使用已有 RDD 创建 RDD(父生子
  3. 从外部数据源创建 RDD(textFile

在我们学习 Spark 的初始阶段 ,RDD 通常由集合创建的,即在 Driver 程序中创建集合并将其传递给 SparkContext 的 paralize() 方法。这种方法很少在 正式环境中使用,因为这种方法的整个数据集位于一台主机上。

首先实例化 SparkContext 对象:

Scala

1
2
val sparkConf = new SparkConf().setAppName("sc_wordcount").setMaster("local[*]")
val sc = new SparkContext(sparkConf)

Java:

1
2
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext jsc = new JavaSparkContext(conf);

其中 appName 用于显示在 Spark 集群的 webUI上面。master 是一个 spark、YARN、mesos 集群 URL,或者是一个 local 字符串。实际项目中,在集群上运行时,不会对 master 进行硬编码。而是用 spark-submit 启动应用程序,并传递 master 给应用程序。但是,对于本地测试和单元测试,可以使用 local 运行 Spark
sparksubmit示例.png

4.1 使用集合创建 RDD

Scala:

1
2
val data = Array(1, 2, 3, 4, 5)
val distData = sc.paralize(data)

Java:

1
2
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.paralize(data);

4.2 从外部数据源创建 RDD

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。
Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop InputFormat。

4.2.1 读取本地文件

文本文件 RDDs 可以使用 SparkContext 的 textFile 方法创建。此方法接受文件的 URI(机器上的本地文件路径、hdfs://、s3a://等URI),并将其作为行集合读取。下面是一个示例:

1
val distFile = sc.textFile("data.txt")

一旦创建,就可以对 distFile 进行相应操作。例如,我们 可以将所有行的长度相加,使用 map 和 reduce 操作。

1
distFile.map(line=>line.length).reduce(_+_)

关于用 Spark 读取文件的一些注意事项

  1. 如果使用本地文件系统上的路径,则必须在 worker 节点上的同一路径上,此文件可访问。要么将文件复制到所有 worker 上,要么使用一个挂载网络的共享文件系统。

  2. Spark 所有基于文件的输入方法(textFile 等),支持在目录、压缩文件和通配符上运行,例如:

    1
    textFile("/my/directory"), textFile("/my/directory/*.txt"), textFile("/my/directory/*.gz))
  3. textFile 方法还接受一个可选的第二个参数,用于控制文件的分区数量。默认情况下,Spark 为文件的每个块创建一个分区(HDFS 中的块默认 是 128MB),但是 您也可以通过传递更大的值来要求更高数量的分区。注意,分区数不能少于块数。

除了文本文件,Spark 的 Scala API 还支持其他几种数据格式

  1. SparkContext.wholeTextFile 允许您读取包含多个小文本文件的目录,并将它们作为(filename, content)的键值对返回。这与 textFile 不同,textFile 将在每个文件中每行返回一条 记录。分区由数据本地性决定,在某些情况下,数据本地性可能导致分区太少。对于这些情况,wholeTextFile 提供了控制最小分区数量的第二个可选 参数。

  2. 对于** SequenceFiles**,使用 SparkContext 的 sequenceFile[K, V]方法,其中K和V是文件中的键和值的类型。这些应该是 Hadoop Writable 接口的子类,比如 IntWritable 和 Text。

  3. 对于其他 Hadoop inputformat,您可以使用 SparkContext.hadoopRDD方法,它接受任意的 JobConf 和 输入格式类、键类和值类。将这些设置为与使用输入源 Hadoop 作业相同的方式。还可以使用 SparkContext。基于“new”MapReduce API(org.apache.hadoop.mapreduce)的 inputformat 的 newAPIHadoopRDD。

4.2.2 读取 HDFS 上的数据

  1. 启动 HDFS
  2. 读取 HDFS 上的数据
    1
    2
    3
    val textFileRDD = sc.textFile("hdfs://bigdata01:9000/textdata/order.txt")
    val count = textFileRDD.count()
    println("count:" + count)

4.2.3 提交应用程序到 Spark 集群

  1. 打包应用程序

  2. 上传 jar 包到服务器

  3. 运行 spark-submit 命令
    ./spark-submit –class sparkcore.learnTextFile –deploy-mode client /opt/sparkapp/learnTextFile.jar

4.2.4 配置并启动 Spark History Server

  1. 重命名 conf/spark-deaults.conf.template 为 conf/spark-defaults.conf

  2. 修改 spark-defaults.conf 配置文件,并同步到其他节点。
    修改前:
    spark historyserver 配置1.png
    修改后(注意:hdfs 目录要先创建):
    spark historyserver 配置2.png

  3. 启动 ./start-history-server.sh
    spark historyserver 配置3.png

  4. 访问 http://bigdata01:18080/ webUI

5. 向 Spark 算子传递函数

Spark API 严重依赖于将 dirver 程序中的函数传递到集群上运行

Scala:
推荐使用如下两种方式实现函数的传递:

  1. 匿名函数语法,可用于短代码段。
  2. 全局单例对象中的静态方法。例如,您可以定义对象 MyFunctions,然后传递 MyFunctions.func1,如下所示:
    1
    2
    3
    4
    5
    object MyFunctions{
    def func1(s:String): String = {...}
    }

    myRDD.map(MyFunctions.func1)

Java:
在 Java 中,函数由实现 org.apacche.spark.api.java.function 包中的接口的类表示。
Java 函数编程1.png

Java 函数编程2.png

有两种方法可以创建这样的函数:

  1. 可以是匿名内部类
  2. 也可以是创建 类实现相应接口,并将其实例传递给 Spark

使用lambda 表达式简洁的定义实现。
例如,可以这样编写代码:

1
2
3
4
5
6
7
8
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>(){
public Integer call(String s) {return s.length();}
});

int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer a, Integer b){return a + b;}
});

或者 可以这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class GetLength implements Function<String, Integer>{
public Integer call(String s){ return s.length();}
}

class Sum implements Function2<Integer, Integer, Integer> {

public Integer call(Integer a, Integer b) { return a + b; }

}

JavaRDD<String> lines = sc.textFile("data.txt");

JavaRDD<Integer> lineLengths = lines.map(new GetLength());

int totalLength = lineLengths.reduce(new Sum());

注意:Java 中的匿名内部类也可以访问闭包作用域中的变量,只要它们被标记为 final。 Spark 将把这些变量的副本发送到每个 worker 节点上。

6. Spark 算子实战——transformation

6.1 map 和 flatMap 算子

map(): 将传入的函数应用于RDD 中的每一条记录,返回由函数结果组成的新 RDD。函数的结果值是一个对象,不是一个集合。

flatMap(): 与map() 操作类似。但是传入 flatMap() 的函数可以返回 0个、1个或多个结果值。即函数结果值是一个集合,而不是一个对象。

map操作 Scala版本

1
2
3
4
5
val textFileRDD = sc.textFile("in/README.md")
val uppercaseRDD = textFileRDD.map(line=>line.toUpperCase)
for ( elem <- uppercaseRDDD.take(3)){
println(elem)
}

map操作 Java版本

1
2
3
4
5
JavaRDD<String> javaRDD = jsc.textFile("in/README.md");
JavaRDD<String> map = javaRDD.map(line -> line.toUpperCase());
for (String line : map.take(3)) {
System.out.println(line);
}

flatMap操作 Scala版本

1
2
val flatMapRDD = textFileRDD.flatMap(line => line.split(" "))
flatMapRDD.take(3).foreach(println)

flatMap操作 Java版本

1
2
3
JavaRDD<String> javaRDD = jsc.textFile("in/README.md");
JavaRDD<String> wordsRDD = javaRDD.flatMap(line -> (Arrays.asList(line.split(" ")).iterator()));
wordsRDD.take(3).forEach(word-> System.out.println(word));

flatMap执行过程.png
从Spark map() 和 flatMap() 的比较中可以看出,Spark map函数表达的是一对一的转换。它将集合的每个数据元素转换为结果集合的一个数据元素。而Spark flatMap 函数表示一对多的转换。它将每个元素转换为 0 或更多的元素。

6.2 filter 算子

Spark RDD filter() 函数返回一个新的 RDD,只包含满足过滤条件的元素。这是一个窄依赖的操作,不会将数据从一个分区转移到其他分区。——不会发生shuffle。
例如,假设 RDD 包含5个整数(1, 2, 3, 4, 5),过滤条件是 判断是否偶数。过滤后得到的 RDD将只包含偶数,即 2 和 4。

Scala:

1
2
val filterRDD = fileRDD.filter(line => line.contains("Spark"))
filterRDD.foreach(println)

Java:

1
2
JavaRDD<String> filterRDD = javaRDD.filter(line -> line.contains("Spark"));
filterRDD.foreach(line -> System.out.println(line));

6.3 distinct 算子

返回 RDD 中的非重复记录。注意:此操作是昂贵的,因为它需要对数据进行 shuffle。

Scala:

1
2
val distintRDD = sc.parallelize(Seq(1,2,3,4,5,1,2,3,4)).distinct()
distintRDD.foreach(println)

Java:

1
2
3
JavaRDD<Integer> numsRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 1, 2, 3, 4));
JavaRDD<Integer> distinctRDD = numsRDD.distinct();
distinctRDD.foreach(ele -> System.out.println(ele));

6.4 mapPartitions

在 mapPartition() 函数中,map() 函数同时应用于每个 partition 分区。对比学习 foreachPartition() 函数,foreachPartition() 是一个action 算子,操作方式与 mapPartition相同。

Scala:

1
2
3
4
5
6
7
val mapPartitionRDD = fileRDD.mapPartitions(partition => {
// map 每一个分区,然后再 map 分区中的每一个元素
partition.map(line => line.toUpperCase())
})

// foreach 是一个没有返回值的 action
mapPartitionRDD.foreach(println)

Java:

1
2
3
4
5
6
7
8
JavaRDD<String> mapPartitionsRDD = javaRDD.mapPartitions(stringIterator -> {
List<String> list = new ArrayList<>();
while (stringIterator.hasNext()) {
list.add(stringIterator.next().toUpperCase());
}
return list.iterator();
});
mapPartitionsRDD.foreach(line-> System.out.println(line));

6.5 mapPartitionWithIndex()

就像 mapPartition,除了 mapPartition外,它还为传入的函数提供了一个整数值,表示分区的索引,map()在分区索引上依次应用。

Scala:

1
2
3
4
val mapPartitionRDD = fileRDD.mapPartitionsWithIndex((index, partition) => {
partition.map(line => index + line.toUpperCase())
})
mapPartitionRDD.foreach(println)

Java:

1
2
3
4
5
6
7
8
JavaRDD<String> mapPartitionsRDD = javaRDD.mapPartitionsWithIndex((index, stringIterator) -> {
List<String> list = new ArrayList<>();
while (stringIterator.hasNext()) {
list.add(index + stringIterator.next().toUpperCase());
}
return list.iterator();
}, false);
mapPartitionsRDD.foreach(line -> System.out.println(line));

6.6 union 并集

使用 union() 函数,我们可以在新的 RDD 中获得两个 RDD 的元素。这个函数的关键规则是两个RDDs 属于同一类型。例如,RDD1 的元素是(Spark, Spark, Hadoop, Flink),而 RDD2 的元素是(Big data, Spark, Flink),所以结果 union(rdd1.union) 有元素(Spark, Spark, Spark, Hadoop, Flink, Flikn, Big data)

Scala:

1
2
3
4
val RDD1 = sc.parallelize(Seq(1,2,3,4))
val RDD2 = sc.parallelize(Seq(1,2,3,4))
val unionRDD = RDD1.union(RDD2)
unionRDD.foreach(println)

Java:

1
2
3
4
JavaRDD<Integer> RDD1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> RDD2 = jsc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> unionRDD = RDD1.union(RDD2);
unionRDD.foreach(num -> System.out.println(num));

6.7 intersection(交集)

使用 intersection() 函数,我们只得到新 RDD 中的两个 RDD 的公共元素。这个函数的关键规则是这两个 RDDs 应该是同一类型的

举个例子,RDD1 的元素是(Spark, Spark, Hadoop, Flink),RDD2的元素是(Big data, Spark, Flink) 交集(RDD1.intersection(RDD2))将包含元素(Spark)。

Scala:

1
2
3
4
val RDD1 = sc.parallelize(Seq(1,2,3,4))
val RDD2 = sc.parallelize(Seq(1,2,3,4))
val intersectionRDD = RDD1.intersection(RDD2)
intersectionRDD.foreach(println)

Java:

1
2
3
4
JavaRDD<Integer> RDD1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> RDD2 = jsc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> intersectionRDD = RDD1.intersection(RDD2);
intersectionRDD.foreach(num -> System.out.println(num));

6.8 PairRDD

现实生活中的许多数据集通常是键值对形式的。例如:包含课程名称和选修课程的学生名单的数据集。
这种数据集的典型模式是每一行都是一个key映射到一个或多个value。为此,Spark提供了一个名为 PairRDD 的数据结构,而不是常规的 RDD。这使得处理此类数据更加简单和高效。

PairRDD 是一种特殊类型的 RDD,可以存储 键-值对

创建 PairRDD:

  1. 通过键值数据结构列表构建 Pair RDD。键值数据结构称为 tuple2 元组。(Java 语言没有内置的 tuple类型,所以Spark 的 Java API 允许用户使用 scala.Tuple2 类创建元组)。

Scala:

1
2
val tuple = List(("张三", "语文"), ("李四", "数学"), ("王五", "英语"))
val pairRDD = sc.parallelize(tuple)pairRDD.foreach(t => println(t._1 + ": " + t._2))

Java:

1
2
3
4
5
List<Tuple2<String, String>> tuple2s = Arrays.asList(new Tuple2<>("张三", "语文"),
new Tuple2<>("李四", "数学"),
new Tuple2<>("王五", "英语"));
JavaPairRDD<String, String> pairRDD = jsc.parallelizePairs(tuple2s);
pairRDD.foreach(t -> System.out.println(t._1 + ": " + t._2));
  1. 将一个常规的 RDD 转换为 PairRDD

Scala

1
2
3
val regularRDD = sc.parallelize(List("张三 语文", "李四 数学", "王五 英语"))
val pairRDD = regularRDD.map(item => (item.split(" ")(0), item.split(" ")(1)))
pairRDD.foreach(item => println(item._1 + ": " + item._2))

Java:

1
2
3
JavaRDD<String> parallelizeRDD = jsc.parallelize(Arrays.asList("张三 语文", "李四 数学", "王五 英语"));
JavaPairRDD<String, String> pairRDD = parallelizeRDD.mapToPair(item -> new Tuple2<>(item.split(" ")[0], item.split(" ")[1]));
pairRDD.foreach(t -> System.out.println(t._1 + ": " + t._2));

6.8.1 PairRDD 上的 transformation 操作

PairRDDs 允许使用常规 RDDs 可用的所有转换,支持与常规 RDDs 相同功能。
由于 PairRDDs 包含元组,所以我们 需要传递操作元组而不是 单个元素的函数给 Spark。

1. filter
在 pairRDD 上使用 filter transformation:

Scala:

1
2
3
4
5
val regularRDD = sc.parallelize(List("张三 语文", "李四 数学", "王五 英语"))
val pairRDD = regularRDD.map(item => (item.split(" ")(0), item.split(" ")(1)))

val filterPairRDD = pairRDD.filter(t => t._2.equals("语文"))
filterPairRDD.foreach(println)

Java:

1
2
3
4
5
JavaRDD<String> parallelizeRDD = jsc.parallelize(Arrays.asList("张三 语文", "李四 数学", "王五 英语"));
JavaPairRDD<String, String> pairRDD = parallelizeRDD.mapToPair(item -> new Tuple2<>(item.split(" ")[0], item.split(" ")[1]));

JavaPairRDD<String, String> filterRDD = pairRDD.filter(t -> t._2.equals("数学"));
filterRDD.foreach(t -> System.out.println(t));

2. reduceByKey—另一个版本的 wordcount

Scala:

1
2
3
4
5
6
7
val fileRDD = sc.textFile("in/README.md")
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
.sortBy(_._2, false)
.collect()
.foreach(println)

Java:

1
2
3
4
5
6
7
8
JavaRDD<String> javaRDD = jsc.textFile("in/README.md");
JavaRDD<String> wordsRDD = javaRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String, Integer> javaPairRDD = wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> wordCounts = javaPairRDD.reduceByKey((a, b) -> (a + b));
JavaRDD<Tuple2<Integer, String>> tuple2JavaRDD = wordCounts.map(t -> new Tuple2<>(t._2, t._1));
JavaRDD<Tuple2<Integer, String>> tuple2JavaRDD1 = tuple2JavaRDD.sortBy(t -> t._1, false, 1);
JavaRDD<Tuple2<String, Integer>> sortedRDD = tuple2JavaRDD1.map(t -> new Tuple2<>(t._2, t._1));
sortedRDD.foreach(t-> System.out.println(t));

3. combineByKey
combineByKey 是 Spark 中一个核心的高级函数,其他一些 键值对函数底层都是用它实现的。如 groupByKey, reduceByKey 等。

例:计算平均分数(Scala)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
object learnCombineBeKey {

case class ScoreDetail(studentName: String, subject: String, score: Float)

def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder()
.master("local[*]")
.appName("learnCombineBeKey")
.getOrCreate()
val sc = sparkSession.sparkContext

/**
* https://www.edureka.co/blog/apache-spark-combinebykey-explained
*
* combineByKey transformation
* combineByKey API 有三个函数
* Create combiner function: x
* Merge value function: y
* Merger combiners function: z
*
* API 格式为 combineByKey(x, y, z)
* 让我们看一个例子(Scala语言):本例的目标是找到每个学生的平均分数
*/

val scores = List(
ScoreDetail("A", "Math", 98),
ScoreDetail("A", "English", 66),
ScoreDetail("B", "Math", 74),
ScoreDetail("B", "English", 80),
ScoreDetail("C", "Math", 98),
ScoreDetail("C", "English", 96),
ScoreDetail("D", "Math", 100),
ScoreDetail("D", "English", 95)
)

/**
* 将测试数据转换为键值对形式--键key为学生名称Student Name,值为ScoreDetail 实例对象
*/
val scoreWithKey = for (i <- scores) yield (i.studentName, i)

/**
* 创建一个 pairRDD
*/
val scoreWithKeyRDD = sc.parallelize(scoreWithKey)

/**
* 计算 平均分数
*/
val avgScoresRDD = scoreWithKeyRDD.combineByKey(
(x: ScoreDetail) => (x.score, 1),
(acc: (Float, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1),
(acc1: (Float, Int), acc2: (Float, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map({
case (key, value) => (key, value._1 / value._2)
})
avgScoresRDD.foreach(println)

sc.stop()
}
}

4. sortByKey
当我们在(K, V)数据集中应用 sortByKey() 函数时,数据是根据 RDD 中的键 K 排序的。

Scala:

1
2
3
val data =sc.parallelize(Seq(("maths",52), ("english",75), ("science",82), ("computer",65), ("maths",85)))
val sorted = data.sortByKey()
sorted.collect().foreach(println)

:在上面的代码中,sortByKey() 将数据 RDD的 key(String) 按升序排序。

Java:

1
2
3
4
5
6
7
8
List<Tuple2<String, Integer>> tuple2s = Arrays.asList(new Tuple2<>("maths", 52),
new Tuple2<>("english", 75),
new Tuple2<>("science", 82),
new Tuple2<>("computer", 65),
new Tuple2<>("maths", 85));
JavaPairRDD<String, Integer> javaPairRDD = jsc.parallelizePairs(tuple2s);
JavaPairRDD<String, Integer> javaPairRDD1 = javaPairRDD.sortByKey();
javaPairRDD1.collect().forEach(t -> System.out.println(t));

5. join
join 是数据库术语。它使用公共值组合两个表中的字段。Spark 中的 join() 操作是在 pairRDD 上定义的。pairRDD 每个元素都以 tuple 的形式出现。tuple 第一个元素是 key,第二个元素是 value。join() 操作根据 key 组合两个数据集。

Scala:

1
2
3
4
val data1 = sc.parallelize(Array(('A', 1), ('B', 2)))
val data2 = sc.parallelize(Array(('A', 4), ('A', 6), ('b', 7), ('c', 3), ('c', 8)))
val result = data1.join(data2)
println(result.collect().mkString(",")) // (A,(1,4)),(A,(1,6))

7. Spark 算子实战–action

7.1 count

count() 返回 RDD 中的元素数量。

7.2 take

从 RDD 返回 n 个元素。它试图减少它访问的分区数量,不能使用此方法来控制访问元素的顺序。

7.3 top

如果 RDD 中元素有序,那么可以使用 top() 从 RDD 中提取前几个元素。

Scala

1
2
3
val fileRDD = sc.textFile("in/README.md")
val lengthRDD = fileRDD.map(line => (line,line.length))
lengthRDD.top(3).foreach(println)

Java:

1
2
JavaRDD<String> javaRDD = jsc.textFile("in/README.md");
javaRDD.top(3).forEach(item -> System.out.println(item));

7.4 countByValue

countByValue() 返回,每个元素都出现在 RDD 中的次数。例如:
RDD 中的元素{1, 2, 2, 3, 4, 5, 5, 6},“rdd.countByValue()” -> {(1,1), (2,2), (3,1), (4,1), (5,2), (6,1)},返回一个 HashMap(K, Int) ,包括每个 key 的计数。

Scala: wordcount 另一种实现

1
2
val fileRDD = sc.textFile("in/README.md")
fileRDD.flatMap(line => line.split(" ")).countByValue().foreach(println)

Java:

1
2
3
4
JavaRDD<String> javaRDD = jsc.textFile("in/README.md");
JavaRDD<String> javaRDD1 = javaRDD.flatMap(line -> Arrays.asList(line.split("
")).iterator());javaRDD1.countByValue().forEach((key, value) -> System.out.println(key + "," +
value));

7.5 reduce

reduce() 函数将 RDD 的两个元素作为输入,然后生成与输入元素相同类型的输出。这种函数的简单形式就是一个加法。

Scala:

1
2
3
val rdd1 = sc.parallelize(List(12,13,14,43,53,65,34))
val sum = rdd1.reduce(_+_)
println(sum)

Java:

1
2
3
JavaRDD<Integer> parallelizeRDD = jsc.parallelize(Arrays.asList(12, 13, 14, 43, 53, 65, 34));
int sum = parallelizeRDD.reduce((a, b) -> a + b);
System.out.println(sum);

7.6 collect

collect() 是将整个 RDDs 内容返回给 driver 程序的常见且最简单的操作。collect() 的应用是单元测试,在单元测试中,期望整个 RDD 能够装入内存。如果使用了 collect 方法,但是 driver 内存不够,则会内存溢出。

Scala:

1
2
3
4
val data1 = sc.parallelize(Array(('A', 1), ('B', 2)))
val data2 = sc.parallelize(Array(('A', 4), ('A', 6), ('b', 7), ('c', 3), ('c', 8)))
val result = data1.join(data2)
println(result.collect().mkString(",")) // (A,(1,4)),(A,(1,6))

7.7 foreach (无返回值)

当我们希望对 RDD 的每个元素应用操作,但它不应该返回值给 driver 程序时。在这种情况下,foreach() 函数是非常合适的。例如,向输入库插入一条记录。

7.8 foreachParitition(无返回值)

类似 mapPartitions, 区别在于:1、foreachPartition 是 action 操作 2、foreachPartition 函数没有返回值(返回值是unit)。

7.9 作业:

  1. 航班数最多的航空公司,算出前 6 名。
  2. 北京飞往重庆的航空公司,有多少个?

数据格式

1
阿克苏,41.188341,80.293842,北京,39.92998578,116.395645,3049,CA1276,中国国航,JET,15:40,21:40,阿克苏机场,41.26940127,80.30091874,首都机场,40.06248537,116.5992671,63%,42分钟,1,0,1,0,1,0,1
  1. 航班数最多的航空公司,算出前 6 名。

Scala:

1
2
3
4
5
6
7
8
9
val textFileRDD = sc.textFile("in/Flight.csv")
val airlinesRDD = textFileRDD.map(line => (line.split(",")(8), 1))
val resRDD = airlinesRDD.reduceByKey(_ + _)
val list1 = resRDD.sortBy(_._2, false)
.collect()
// 取前6名
for (i <- 0 to 5) {
println(list1(i))
}

Java:

1
2
3
4
5
6
7
8
9
10
11
JavaRDD<String> textFileRDD = jsc.textFile("in/Flight.csv");
JavaPairRDD<String, Integer> javaPairRDD = textFileRDD.mapToPair(line -> new Tuple2<>(line.split(",")[8], 1));
JavaPairRDD<String, Integer> javaPairRDD1 = javaPairRDD.reduceByKey((a, b) -> (a + b));
JavaPairRDD<Integer, String> integerStringJavaPairRDD = javaPairRDD1.mapToPair(t -> new Tuple2<>(t._2, t._1));
JavaPairRDD<Integer, String> integerStringJavaPairRDD1 = integerStringJavaPairRDD.sortByKey(false);
JavaPairRDD<String, Integer> javaPairRDD2 = integerStringJavaPairRDD1.mapToPair(t -> new Tuple2<>(t._2, t._1));

List<Tuple2<String, Integer>> collect = javaPairRDD2.collect();
for (int i = 0; i < 5; i++) {
System.out.println(collect.get(i));
}
  1. 北京飞往重庆的航空公司,有多少个?

Scala:

1
2
3
val BJ_CQ_RDD = textFileRDD.filter(line => (line.split(",")(0).equals("北京") && line.split(",")(3).equals("重庆")))
val count = BJ_CQ_RDD.map(line => (line.split(",")(8), 1)).countByKey().size
println(count)

Java:

1
2
3
JavaRDD<String> filterRDD = textFileRDD.filter(line -> (line.split(",")[0].equals("北京") && line.split(",")[3].equals("重庆")));
int size = filterRDD.mapToPair(line -> new Tuple2<>(line.split(",")[8], 1)).countByKey().size();
System.out.println(size);

8. Spark RDD 分区实战

8.1 RDD partition 概念

我们处理大数据时,由于数据量太大,以至于单个节点无法完全存储、计算。所以这些数据需要分割成多个数据块 block,以利用多个集群节点的存储、计算资源。Spark 自动对 RDDs 中的大量数据元素进行分区,并在 worker 节点之间分配分区,计算。分区是逻辑上。

8.2 RDD partition 的相关属性

属性 描述
partitions 返回包含 RDD 所有分区引用 的一个数组
partitions.size 返回 RDD 的分区数量
partitioner 返回下列分区器之一:
NONE
HashPartitioner
RangePartitioner
自定义分区器

Spark 使用 partitioner 属性来确定分区算法,以此来确定哪些 worker 需要存储特定的 RDD记录。如果 partitoner 的值为 NONE,意思是分区不是基于数据的特性 ,但是分布是随机的,并且保证在节点之间是均匀地。

8.3 查看 RDD partition 信息

textfile 方法的 partition size 查看

1
println(textFileRDD.partitions.size)

注意:使用 textFile 方法读取数据,可以设置 partition 大小:

1
val textFileRDD = sc.textFile("in/Flight.csv", 2)

在集群环境中,读取本地文件、HDFS数据,由数据的 block 个数决定,最小为2。

特殊情况:如果 local 模式,单线程运行,默认 partitions.size 为1。(项目中不会使用此情况)

注意:每个 partition 会运行一个task 来处理其中的数据元素。

8.4 RDD 的初始分区

conf.set(“spark.default.parallelism”,”3”) // 设置默认的并行度

local: 一个线程———–sc.defaultParallelism 值为1
local[*]: 服务器core 数量——–sc. defaultParallelism 的值为8
local[4]: 4个线程———–sc.defaultParallelism 的值为4

spark.default.parallelism 参数值的说明
如果 spark-default.conf 或 SparkConf 中设置了 spark.default.parallelism 参数值,那么 spark.default.parallelism = 设置值;
如果 spark-default.conf 或 SparkConf 中没有设置,那么:

local 模式
local: spark.default.parallelism =1
local[4]: spark.default.parallelism = 4

yarn 和 standalone 模式
spark.default.parallelism = max(所有 executor 使用的core 总数, 2)

由上述规则,确定 spark.default.parallelism 的默认值
当Spark 程序执行时,会生成个 SparkContext 对象,同时会生成以下两个参数值:
sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism, 2)

当sc.defaultParallelism 和 sc.defaultMinPartitions 确认了,就可以推算出RDD 的分区数了。

有三种产生 RDD 的方式

  1. 通过集合创建

    val rdd = sc.parallelize(1 to 100)

没有指定分区数,则rdd的分区数 = sc.defaultParallelism

  1. 通过外部存储创建

    val rdd = sc.textFile(filePath)

2.1 从本地 文件生成 RDD,没有指定分区数,则默认分区规则为: rdd 的分区数 = max(本地 file 的分片数, sc.defaultMinPartitions)
2.2 从 HDFS 读取数据生成 RDD,没有指定分区数,则默认 分区规则为:rdd 的分区数 = max(HDFS文件的 block 数, sc.defaultMinPartitions)

  1. 通过已有 RDD 产生新的 RDD,新 RDD的分区数遵循遗传特性。见下节。

:项目中,在 spark-default.conf 文件中,spark.default.parallelism 属性值设置为 executor-cores * executors 个数 * 3

8.5 Transformation 操作对分区的影响

8.5.1 普通 RDD 操作

API调用 RDD 分区属性值
partition.size
RDD 分区属性值
partitioner
map(), flatMap(), distinct() 与父RDD相同 NONE
filter() 与父RDD相同 与父RDD相同
rdd.union(otherRDD) rdd.partitions.size + otherRDD.partitions.size NONE
rdd.intersection(otherRDD) max(rdd.partitions.size, otherRDD.partitions.size) NONE
rdd.subtract(otherRDD) rdd.partitions.size NONE
rdd.cartesian(otherRDD) rdd.partitions.size * otherRDD.partitions.size NONE

8.5.2 Key-value RDD 操作

API调用 RDD 分区属性值
partition.size
RDD 分区属性值
partitioner
reduceByKey(),foldByKey(),combineByKey(),groupByKey() 与父RDD相同 HashPartitioner
sortByKey 与父RDD相同 RangePartitioner
mapValues(), flatMapValues() 与父RDD相同 与父RDD相同
cogroup(),join(),leftOuterJoin(), rightOuterJoin() 取决于所涉及的两个 RDDs 的某些输入属性 HashPartitioner

8.6 有多少分区是合适的(重点!!)

分区数量太少、太多都有一定的优点和缺点。因此,建议根据集群配置和需求进行明智的分区。
Core-partition-task

8.6.1 分区太少的缺点

减少并发性——您没有使用并行性的优点。可能存在空闲的 wroker 节点。
数据倾斜和不恰当的资源利用——数据可能在一个分区上倾斜,因此一个 worker 可能比其他 worker 做的更多,因此可能会出现资源问题。

8.6.2 分区太多的缺点

任务调度可能比实际执行时间花费更多的时间。

因此,在分区的数量之间存在权衡。推荐如下

  1. 可用 core 数量的2-3 倍。Spark 只为 RDD 的每个分区运行一个并发任务,最多可以同时运行集群中的核心数量个 task,分区数量至少与可用 core 数量相等。可以通过调用 sc.defaultParallelism 获得可用 core 值。单个分区的数据量大小最终取决于执行程序的可用内存。

  2. WebUI 上查看任务执行,至少需要 100+ ms 时间。如果所用 时间少于 100ms,那么应用程序可能会花更多的时间来调度任务。此时就要减少 partition 的数量。

8.7 Spark 中的分区器

要使用分区器,首先要创建 PairRDD类型的 RDD
Spark 有两种类型的分区器。一个是 HashPartitioner,另一个是 RangePartitioner。

8.7.1 HashPartitioner

HashPartitioner 基于 Java 的 Object.hashcode() 方法进行分区。

8.7.2 RangePartitioner

如果有可排序的记录,那么范围分区将几乎在相同的范围内划分记录。范围 Range 是 通过采样传入 RDD的数据内容来确定的。首先,RangePartitioner 将根据 key 对记录进行排序,然后根据给定的值将记录划分为 若干个分区。

8.7.3 自定义分区器

还可以通过扩展 Spark 中的默认分区器类来定制 需要的分区数量和应该存储在这些分区中的内容。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
val listRDD = sc.parallelize((1 to 10).toList)
val pairRDD = listRDD.map(num => (num, num))
println("NumPartitions: " + pairRDD.getNumPartitions) // NumPartitions: 8
println("Partitioner: " + pairRDD.partitioner) // Partitioner: None
pairRDD.saveAsTextFile("out/None")

// 使用 HashPartitioner 并 设置分区个数
val hashPartitionerRDD = pairRDD.partitionBy(new HashPartitioner(4))
hashPartitionerRDD.saveAsTextFile("out/hashPartition4")

// coalesce 方法只能用来减少 分区数量,不能用来增加分区数量
// partitionBy 方法可以减少,也可以增加
hashPartitionerRDD.coalesce(2).saveAsTextFile("out/hashPartition2")
println(hashPartitionerRDD.partitioner) // Some(org.apache.spark.HashPartitioner@4)

9. Spark RDD 数据保存实战

9.1 保存数据到 HDFS

代码示例

1
2
3
4
5
6
val textFileRDD = sc.textFile("hdfs://master01:8020/in/README.txt", 2)
val wordsRDD = textFileRDD.flatMap(line => line.split(" "))
val wordcountPairRDD = wordsRDD.map(w => (w, 1))
val wordcountRDD = wordcountPairRDD.reduceByKey(_ + _)

wordcountRDD.saveAsTextFile("hdfs://master01:8020/out/wordcount")

9.2 保存数据到 mysql 数据库

9.2.1 读

1
2
3
4
5
6
7
8
9
//mysql 读
val jdbcDF = sparkSession.read
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test")
.option("user", "root")
.option("password", "Mysql123!")
.option("dbtable", "flight")
.load()
jdbcDF.printSchema()

9.2.2 写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val schema = StructType(List(
StructField("name", StringType, nullable = false),
StructField("age", IntegerType, nullable = false),
StructField("gender", StringType, nullable = false)
))

val rowRDD = sc.parallelize(Seq(
Row("张1", 18, "男"),
Row("张2", 19, "女"),
Row("张3", 10, "男"),
Row("张4", 48, "女"),
Row("张5", 68, "男"),
Row("张6", 16, "男")
))
val df = sparkSession.createDataFrame(rowRDD, schema)

// mysql 写
df.write
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test")
.option("dbtable", "user")
.option("user", "root")
.option("password", "Mysql123!")
.mode(SaveMode.Overwrite)
.save()

9.3 保存数据到 kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// producer 配置
val props = new Properties()
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master01:9092")
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

// producer 发送 RDD 数据
val textFileRDD = sc.textFile("in/Flight1.csv")
textFileRDD.foreach(line => {
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("myTopic", line)
println(message)
producer.send(message)
Thread.sleep(3000)
})

10. Spark RDD 缓存实战

10.1 前言

一个 action 会启动一个 job, 一个 job 里面有一个或多个 stage,一个 stage 里面有一个或者多个 task。

Repartiton 引起 shuffle 操作,shuffle 操作发生的时候,stage 会一分为二。

窄依赖, 宽依赖
一种性能调优的方式。

10.2 要点

  1. 缓存持久化是 Spark 计算过程中的调优技术。缓存和持久化可以保存中间计算结果,以便在后续的 stage 中重用,而不需要再次从头计算。这些中间结果以 RDD 的形式保存在内存(默认)中,或者磁盘中。

  2. StorageLevel 描述了 RDD 是如何被持久化(persist)的,可以提供如下相关信息:

  • RDD 持久化磁盘存储还是内存存储
  • RDD 持久化是否使用了 off-heap
  • RDD 是否需要被序列化
  • 缓存的副本是多少(默认是 1)
  1. StorageLevel 的值包括:
  • NONE(默认)
  • DISK_ONLY: RDD 只是存储在磁盘,内存消耗低,CPU 密集型。
  • DISK_ONLY_2
  • MEMORY_ONLY(cache 操作):RDD 以非序列化的 Java 对象存储在 JVM中。如果 RDD 的大小超过了内存大小,那么某些 partition 将会不缓存,下次使用时重新计算。这种存储级别比较耗内存,但是不耗 CPU。数据只存储在内存,不存储在磁盘。
  • MEMORY_ONLY_2
  • MEMORY_ONLY_SER: RDD 以序列化 Java 对象(每个 partition 一个字节数组)的形式存储。在这个级别,内存空间 使用很低,CPU计算时间高。
  • MEMORY_ONLY_SER_2
  • MEMORY_AND_DISK: RDD 以非序列化的 Java 对象存储在 JVM 中。当 RDD 的大小超过了内存 大小,多出的 partition 会缓存在磁盘上,后续计算如果用到这些多出的 partiton,会从磁盘获取。这种存储级别比较耗内存,CPU消耗一般。
  • MEMORY_AND_DISK_2
  • MEMORY_AND_DISK_SER: 与 MEMORY_ONLY_SER 类似,只是将大于内存的partition 数据序列化到磁盘,而不是重新计算。内存消耗低,CPU密集型。
  • MEMORY_AND_DISK_SER_2
  • OFF_HEAP

可以使用 getStorageLevel 方法查看 RDD 的 StorageLevel:

1
2
3
val textFileRDD = sc.textFile("\in\README.txt")
println(textFileRDD.getStorageLevel)
println(textFileRDD.getStorageLevel)

输出

StorageLevel(1 replicas)

  1. RDD 可以被缓存(cache)到内存,使用 cache() 方法,也可以被持久化(persist),使用 persist() 方法。

  2. cache() 和 persist() 方法的区别在于:cache() 等价于 persist(MEMEORY_ONLY),即 cache() 仅仅是 persist() 使用默认存储级别 MEMORY_ONLY 的一种情况。使用 persist() 方法可以设置不同的 StorageLevel值。

  3. 对于迭代算法,缓存和持久化是一个重要的工具。因为,当我们在一个节点上缓存了 RDD 的某个 partiton 到内存中,其就可以在下面的计算中重复使用,而不需要从头计算,可以使计算性能提高 10倍。如果缓存中的某个 partiton 丢失或者不可用,根据 Spark RDD 的容错特性,Spark 会从头计算这个 partition。

  4. 什么时候需要对 RDD 进行持久化?在 Spark 中,我们可以多次使用同一个 RDD,如:使用 RDD 计算 count()、max()、min()等 action 操作。而且这些操作可能很耗内存,尤其是迭代算法(机器学习)。为了解决频繁重复计算的问题,此时就需要对 RDD 进行持久化。

  5. Spark 自动监控每个节点的缓存和以 LRU(最近最少使用)方式删除旧数据分区。LRU算法,保证了最常用的数据被缓存。我们 也可以使用 RDD.unpersist() 方法手动删除缓存。

  6. Spark 会在 shuffle 操作中自动持久化一些中间数据(例如 redueByKey),即使没有调用 persist 方法。这样做是为了避免在 shuffle 期间节点故障时重新计算整个输入。如果用户准备重用生成的 RDD,推荐显式调用持久化。

10.3 RDD 持久化存储级别如何选择

Spark 的存储级别是为了在内存使用CPU 效率之间 提供不同的权衡,具体选择哪个存储级别,可以从以下方面考虑:

  • 如果 RDDs 数据适合默认存储级别(MEMORY_ONLY),那么就使用默认。此时,RDD 的运算速度最快。
  • 如果没有,请尝试使用 MEMORY_ONLY_SER 并选择一个快速序列化库,以使对象更节省空间,但访问速度仍然相当快。
  • 不要持久化到磁盘,除非计算 数据集的函数很耗时,或者 过滤了大量 数据。因为,从磁盘读取分区,可能没有重新计算快。
  • 如果需要快速的故障恢复,则使用副本存储级别。
支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫