盒子
盒子
文章目录
  1. 1. Spark 调优
    1. 1.1 避免重复创建 RDD,尽可能复用 RDD
    2. 1.2 对重复使用的 RDD 进行持久化
    3. 1.3 尽量避免使用会触发 shuffle 的算子
    4. 1.4 使用高性能算子
      1. 1.4.1 使用 mapPartitions 替代普通 map
      2. 1.4.2 使用 foreachPartitions 替代 foreach
      3. 1.4.3 filter 算子之后使用 coalesce 算子
      4. 1.4.4 使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作
    5. 1.5 将大变量广播出去
    6. 1.6 使用 kryo 序列化方式来优化序列化性能
    7. 1.7 使用优化的数据结构
    8. 1.8 数据倾斜调优
    9. 1.9 资源调优
    10. 1.10 JVM 内存管理简介
    11. 1.11 filter 算子导致数据倾斜

Spark 调优

1. Spark 调优

参考链接:https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations

spark core 调优:Spark 应用程序的优化涉及到多个方面,包括 Spark 应用程序调优,资源调优,网络调优,硬盘调优等多个方面。本节课主要关注 Spark 应用程序调优和资源调优方面的内容,这也是 Spark 开发工程师的主要工作之一。
Spark应用程序调优,在编写 Spark 应用程序时主要考虑以下几个方面:

1.1 避免重复创建 RDD,尽可能复用 RDD

开发 Spark 应用程序时,一般的步骤是:首先基于某个数据源(比如 HDFS 文件)创建一个初始的 RDD;接着对此 RDD 执行某个算子操作,得到下一个 RDD,以此类推,最后调用 action 操作得出我们想要的结果。在此过程中,多个 RDD 形成了RDD 的血缘关系链(lineage)

对于同一份数据,只应该创建一个 RDD,不应该创建多个 RDD。如果基于一份数据创建了多个 RDD。Spark 作业会进行多次重复计算,增加了作业的性能开销。

例如:

1
2
3
4
5
6
7
8
9
10
// 错误的做法
val rdd1 = sc.textFile("in/README.md")
rdd1.map(..)
val rdd2 = sc.textFile("in/README.md")
rdd2.reduce(..)

// 正确的做法
val rdd1 = sc.textFile("in/README.md")
rdd1.map(..)
rdd2.reduce(..)

1.2 对重复使用的 RDD 进行持久化

参见 RDD 缓存持久化章节

1.3 尽量避免使用会触发 shuffle 的算子

如果有可能,要尽量避免使用 shuffle 类算子。因为 Spark 作业运行过程中,最消耗性能的地方就是 shuffle 过程。shuffle 过程,就是将分布在集群中多个节点上的相同 key 的数据,拉取到同一个节点上,进行聚合或 join 等操作。比如 reduceByKey、join 等 算子,都会触发 shuffle。

shuffle,中文意思就是洗牌,洗牌是把扑克牌打乱,而 spark 的 shuffle 过程与洗牌的过程恰恰相反 ,spark shuffle 是将数据按 key 梳理好。

shuffle 过程中,各个节点上的相同 key 数据都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同 key。相同 key 都拉取到同一个节点进行聚合操作,此时有可能会因为某个节点上处理的 key 过多(数据倾斜),导致内存不够存放,进而溢写到磁盘文件中。因此在 shuffle 过程中,可能会发生大量的磁盘文件读写的 IO操作,以及数据的网络传输操作。磁盘 IO 和网络数据传输是 shuffle 性能较差的主要原因。

因此开发过程中,尽可能避免使用 reduceByKey、join、distinct、repartition等会触发 shuffle 的算子,尽量使用 map 类的非 shuffle 算子,尽量使用广播变量来避免 shuffle,优先选用 reduceByKey、aggregateByKey、combineByKey替换 groupByKey,应用 reduceByKey 算子内部使用了预聚合操作。

1.4 使用高性能算子

1.4.1 使用 mapPartitions 替代普通 map

mapPartitions 类的算子,一次函数调用会处理一个 partition 所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用 mapPartitions 会出现 OOM(out of memory 内存溢出)的问题。因为每次函数调用就要处理一个 partition 中的所有数据,如果内存不够,会频繁 GC(垃圾回收),可能出现 OOM 异常。

1.4.2 使用 foreachPartitions 替代 foreach

原理类似于 6.4.1。比如在 foreach 函数中,将 RDD 中所有数据写入 MySQL 数据库,就会一条数据一条数据地写,每次函数调用 可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入炒作,此时性能是比较高的。

1.4.3 filter 算子之后使用 coalesce 算子

1.4.4 使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作

二次排序的例子

1.5 将大变量广播出去

在开发过程中,一旦出现在算子函数中使用外部变量的场景(尤其是较大的变量,注意:超过 4M需要修改默认配置),此时就应该使用 Spark 的广播(Broadcast)变量功能来提升性能。

在算子函数中使用外部变量时,默认情况下,Spark 会将该变量复制多个副本,通过网络传输到 task 中,此时每个 task 都有一个变量副本。如果变量本身比较大,那么大量的变量副本在网络中传输的性能开销,以及在各个节点的 Executor 中占用过多内存导致的频繁 GC 垃圾回收(miner GC -> full GC 的时候,程序时停止运行的),会极大地影响性能。使用 Spark 广播功能,对该变量进行广播。广播后的变量,每个 Executor 只保留一份变量副本,而 Executor 中的 task 执行时共享该 Executor 中变量副本。这样,就可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对 Executor 内存的占用开销,降低 GC 的频率。

1.6 使用 kryo 序列化方式来优化序列化性能

在 Spark 中,主要有三个地方涉及到了序列化:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输,此时需要对此变量序列化。

  2. 将自定义的类作为 RDD 的泛型类型时(比如二次排序例子中,JavaPairRDD<Employee_Key,Employee_Value>,Employee_key 和 Employee_Value 是自定义类型),所有自定义类的对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。

  3. 使用可序列化的持久化策略时(比如 MEMORY_ONLY_SER),Spark 会将 RDD 中的每个 partition 都序列化成一个大的字节数组。

序列化大大减少了数据在内存、硬盘中占用的空间,减少了网络数据传输的开销,但是使用数据中,需要将数据进行反序列化,会消耗 CPU、延长程序执行时间,从而降低了 Spark 的性能,所以,序列化实际上利用了时间换空间的套路。

Spark 默认使 Java 序列化机制(ObjectOutputStream/ObjectInputStream API)来进行序列化和反序列化。Spark 同时支持使用 Kryo 序列化库,Kryo 序列化类库的性能比 Java序列化类库的性能高很多。官方介绍:性能高 10 倍左右。Spark 之所以默认没有使用 Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册需要序列化的类型,不够方便。
注册需要.png

Kryo 相关的配置项:
Kryo 相关.png

主要使用步骤

  1. 设置 spark 序列化使用的库

    1
    sparkconf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");	// 使用 Kryo 序列化库
  2. 在该库中注册用户定义的类型

    1
    sparkconf.set("spark.kyro.registrator", MyKryoRegistrator.class.getName());	// 在 Kryo 序列化库中注册自定义的类集合
  3. MyKryoRegistrator 类需要实现 KryoRegistrator 接口的 registerClasses 方法
    MyKryoRegistrator.png

def register.png

此时使用持久化方法,查看持久化数据的大小:
修改二次排序入口类,增加如下几行代码:
修改二次排序.png

JavaRDD.png

查看默认序列化和 Kryo 序列化所占空间的差别:

Kryo 序列化
kryo 序列化.png

默认序列化
默认序列化.png

进一步优化,启用 RDD 压缩
进一步优化.png

ShuffledRDD.png

注意:压缩机制虽然更进一步节省了空间,但是使用数据时,需要解压,耗费了 CPU。

1.7 使用优化的数据结构

Java 中,有三种类型比较耗内存:

  1. 自定义对象,每个 Java 对象都有对象头、引用等额外的信息,因此比较占用内存空间。

  2. 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

  3. 集合类型,比如 HashMap、LinkedList 等,因为集合类型内部通常会使用一些内部类封装集合元素,比如 Map.Entry。

Spark 官方建议,在 Spark 编码实现中,特别是对于算子函数中的代码,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而减低 GC 频率,提升性能。注意:前提是保证代码可运行,易维护。

1.8 数据倾斜调优

大数据计算中一个最棘手的问题——数据倾斜。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证 Spark 作业的性能。

如何知道是否发生了数据倾斜?如果绝大多数 task 执行的都非常快,但个别 task 执行极慢。比如,总共有 100 个 task ,99 个都在 1 分钟之内执行完了,但是剩余一个 task 却要一两个小时。另外,数据倾斜严重的话,就发生 OOM 错误,导致这个 Application 失败。

为什么会发生数据倾斜?使用了引起 shuffle 的算子,进行 shuffle 的时候,必须将各个节点上相同的 key 拉取到某个节点上的一个 task 来进行处理,比如按照 key 进行聚合或 join 等操作。此时如果某个 key 对应的数据量特别大的话,就会发生数据倾斜。比如大部分 key 对应 10 条数,但是个别 key 却对应了 100 万条数据,那么大部分 task 可能就只会分配到 10 条数据,1 秒钟就运行完了;但是个别 task 可能分配到了 100 万数据,要运行一两个小时。因此,整个 Spark 作业的运行进度是由运行时间最长的那个 task 决定的。

如何定位发生数据倾斜的位置?shuffle 导致了数据倾斜,常见导致 shuffle 的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition 等。出现了数据倾斜,直接在代码中找这样的算子。

如何定位是哪个算子?这样的算子会产生 shuffle ,shuffle 会划分 stage,所以,从 web UI 中查看发生数据倾斜的 task(即运行时间较长的 task)发生在哪个 stage中。无论是 spark standalone 模式还是 spark on yarn 模式的应用程序,都可以在 spark history server 中看到详细执行信息。也可以通过 yarn logs 命令查看详细的日志信息。
细的执行信息.png

定位了数据倾斜发生在哪里之后,接着需要分析一下那个执行了 shuffle 操作并且导致了数据倾斜的 RDD/Hive 表,查看一下其中 key 的分布情况。这主要是为之后选择那一种技术方案提供依据。查看 key 分布的方式:

  1. 如果是 Spark SQL 中的 group by、join 语句导致的数据倾斜,那么就查询一下 SQL 中使用的表的 key 分布情况。

  2. 如果是对 Spark RDD 执行 shuffle 算子导致的数据倾斜,那么可以在 Spark 作业中加入查看 key 分布的代码,比如 RDD.countByKey()。然后对统计出来的各个 key 出现的次数,collect、take 到客户端打印一下,就可以看到 key 的分布情况。

如何解决数据倾斜问题
方法一、过滤引起数据倾斜的 key
适用场景:如果发现导致倾斜的 key 就少数几个,而且对计算本身的影响并不大的话,适合使用这种方法。比如 90% 的 key 就对应 10 条数据,但是只有一个 key 对应了 100 万数据,从而导致了数据倾斜。

实现思路:countByKey 确定数据量超多的某个 key,使用 filter 方法过滤。SparkSQL 中使用 where 方法过滤。

此方法实现简单 ,而且效果也很好,可以完全规避掉数据倾斜。但是,适用场景不多,大多数情况下,导致倾斜的 key 还是很多的,并不是只有少数几个。

方法二、提高 shuffle 操作的并行度
适用场景:无法使用方法一规避,只有直面数据倾斜问题。

实现思路:执行 RDD shuffle 算子时,给 shuffle 算子传入一个参数,比如 reduceByKey(100),该参数就设置了这个 shuffle 算子执行时 shuffle read task 的数量。对于 Spark SQL 中的 shuffle 类语句,比如 groupByKey、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,该值默认是 200,对于很多场景来说都有点过小。

此方法虽然实现简单,但是治标不治本。比如某个 key 对应的数据量有 100万,那么无论你的 task 数量增加到多少,这个对应着 100万数量的 key 肯定还是会分配到一个 task 中去处理,因此还是会发生数据倾斜的。

方法三、对数据倾斜 key 使用随机数,实现两阶段聚合
适用场景:对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL 中使用 group by 语句进行分组聚合时,比较使用这种方法。

实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一阶段是局部聚合,先给每个 key 都打上一个随机数,比如 10 以内的随机数,此时原先一样的 key 就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行 reducyByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个 key 的随机数给去掉,就会变成(hello,2)(hello,2),在此进行全局聚合操作,就可以得到最终结果了,比如(hello,4)。

如果聚合类的 shuffle 算子导致的数据倾斜,效果是非常不错的。如果是 join 类的 shuffle 算子,则不适合。

Scala:

1
2
3
4
5
6
7
8
9
10
11
12
// 1. 加随机前缀
val prefixRDD = wordPairRDD.map(t => {
val random = new Random()
val prefix = random.nextInt(100)
(prefix + "_" + t._1, t._2)
})
// 2. 局部聚合
val partialReduceRDD = prefixRDD.reduceByKey(_ + _)
// 3. 去除前缀
val noPrefixRDD = partialReduceRDD.map(t => (t._1.split("_")(1), t._2))
// 4. 全局聚合
val allReduceRDD = noPrefixRDD.reduceByKey(_ + _)

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 第一步,加随机前缀
JavaPairRDD<String, Integer> randomPrefixRDD = javaPairRDD.mapToPair(t -> {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<>(prefix +"_"+t._1, t._2);
});

// 第二步,局部聚合
JavaPairRDD<String, Integer> javaPairRDD1 = randomPrefixRDD.reduceByKey((a, b) -> (a + b));

// 第三步,去除 prefix 前缀
JavaPairRDD<String, Integer> javaPairRDD2 = javaPairRDD1.mapToPair(t -> (new Tuple2<>(t._1.split("_")[1], t._2)));

// 第四步,全局聚合
JavaPairRDD<String, Integer> javaPairRDD3 = javaPairRDD2.reduceByKey((a, b) -> (a + b));

方法四、将 hash shuffle join 转换为 map join
适用场景:在对 RDD 使用 join 类操作,或者是在 Spark SQL 中使用 join 语句时,而且 join 操作中的一个 RDD 或表的数据量比较小(比如几百兆),比较适合此方案。

实现思路:不适用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。见广播变量课程中的 map join 代码。

对 join 操作导致的数据倾斜,效果非常好,因为不会发生 shuffle,也就不会发生数据倾斜。但是此方法适用场景较少,只适用于一个大 RDD 和一个小 RDD 的情况。

方法五、使用 partitioner 优化 hash shuffle join
为了对两个 RDD 中的数据进行 join,Spark 需要对两个 RDD 上的数据拉取同一个分区。Spark 中 join 的默认实现是 shuffled hash join:通过使用与第一个数据集相同的默认分区器对第二个数据集进行分区,从而确保每个分区上的数据将包含相同的 key,从而使两个数据集具有相同哈希值的键位于同一个分区中。虽然这种方法总是可以运行的,但是此种操作比较 耗费资源,因为它需要一次 shuffle。

如果两个 RDD 都有一个已知的分区器,则可以避免 shuffle,如果它们有相同分区器,则数据可能被本地合并;避免网络传输,因此,建议在 join 两个 RDD 之前,调用 partitionBy 方法,并且使用相同对的分区器。

代码示例

1
2
3
val partitioner = new HashPartitioner(10)
agesRDD.partitionBy(partitioner)
addressRDD.partitionBy(partitioner)

方法六、综合使用前面的方法
如果只是处理较为简单的数据倾斜场景,使用上述方法中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方法组合起来使用。例如:针对出现了多个数据倾斜环节的 Spark 作业,可以先运用方法一和方法二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些 shuffle 操作提升并行度,优化其性能;最后还可以针对不同的聚合或 join 操作,选择一种方法来优化其性能。需要对这些方法的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决数据倾斜问题。

1.9 资源调优

理解了 Spark 作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓 Spark 资源参数调优,其实主要就是对 Spark 运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升 Spark 作业的执行性能。

资源调优主要涉及到内存、CPU等资源的分配,参见资源分配相关内容。

1.10 JVM 内存管理简介

Java 的堆内存分为两个区域:新生代和老生代。新生代保存的是生命周期比较短的对象,老生代保存生命周期比较长的对象。新生代又分了三个区域(Eden, Survivor1, Survivor2)。

垃圾收集过程简要说明:当 Eden 已满时,Eden 上运行了一个 minor GC,并将 Eden 和 Survivor1 中存在的对象复制到 Survivor2。Survivor 将进行交换。如果一个足够老,或者 Survivor2 已满,则会移动到老年代。最后当老年代接近满的时候,会触发 full GC

  1. 通过收集垃圾回收信息,判断是否有太多的垃圾回收过程。假如 full gc 在一个 task 完成之前触发了好几次,那说明运行 task 的内存空间不足,需要加内存。

  2. 配置 JVM 相关信息的位置 spark-default.con
    spark.executor.extraJavaOptions
    spark.driver.extraJavaOptions

注意:driver 和 executor 的 JVM 堆内存的大小通过 driver-memory 和 executor.memory 配置项设置。
配置项设置.png

推荐一篇 JVM 文章:https://blog.csdn.net/kidoo1012/article/details/54599046

1.11 filter 算子导致数据倾斜

使用 filter 算子进行过滤操作,会在每个 partition 进行单独过滤,如果某个 partition 过滤后没有符合条件的元素,RDD 就为空,这样的结果是数据倾斜
解决方案:执行完 filter 之后进行 repartition 操作。

支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫