盒子
盒子
文章目录
  1. 1. Spark 共享变量实战
    1. 1.1 广播变量 Broadcast Variables
    2. 1.2 累加器 Accumulator
      1. 1.2.1 注意使用累加器的陷阱
    3. 1.2.2 累加器和广播变量的不同
  2. 2. Spark 应用程序调度过程
    1. 2.1 Client 提交 Job, Driver 会向 master 申请资源,master 收到请求之后,会向 worker 进行资源调度,启动 Executor,然后,Executor 向 Driver 进行注册。
    2. 2.2 此时Spark 应用程序就会知道哪些 worker 上面的 executor 已经就绪。接着开始执行 spark 任务:
    3. 2.3 从 Spark 集群角度(静态):
      1. 2.3.1 Master
      2. 2.3.2 Worker
    4. 2.4 从 Spark 应用程序的角度(动态):
      1. 2.4.1 Application
      2. 2.4.2 Driver
      3. 2.4.3 Action
      4. 2.4.4 Transformation
      5. 2.4.5 Job
      6. 2.4.6 Stage
      7. 2.4.7 Task
      8. 2.4.8 Executor
  3. 3. Spark Master 高可用配置 HA
    1. 3.1 Standby Masters with ZooKeeper
      1. 3.1.1 概述
      2. 3.1.2 配置
      3. 3.1.3 可能的陷阱
      4. 3.1.4 详细
      5. 3.1.5 HA 配置实战
    2. 3.2 带有本地文件系统的单节点恢复
      1. 3.2.1 概述
      2. 3.2.2 配置
      3. 3.2.3 细节
  4. 4. Spark 应用程序内存和 CPU 的分配
    1. 4.1 Spark 运行模式
      1. 4.1.1 local
      2. 4.1.2 standalone
      3. 4.1.3 yarn
      4. 4.1.4 mesos
    2. 4.2 如何设置 Spark 配置属性
      1. 4.2.1 有三种方式设置spark 的配置属性
      2. 4.2.2 典型的 spark-defaults.conf 文件内容
    3. 4.3 集群资源配置实战
    4. 4.4 Spark 应用程序 资源配置实战
      1. 4.4.1 –executor-cores
      2. 4.4.2 参数优先级
      3. 4.4.3 几个常用的配置项
  5. 5. Spark on yarn
    1. 5.1 概述
    2. 5.2 资源分配
      1. 5.2.1 CPU 的分配
      2. 5.2.2 内存的分配
    3. 5.3 如何给 Spark 分配资源
    4. 5.4 Spark 应用资源分配的限制
    5. 5.5 spark on yarn 应用的提交方式
    6. 5.6 Spark on yarn 的资源分配依赖于 spark 的运行模式
      1. 5.6.1 为 spark driver 分配资源
    7. 5.7 Client 模式下 AM、Driver 的资源分配
    8. 5.8 Cluster 模式下 AM、Driver 的资源分配
    9. 5.9 Executor 的资源配置
    10. 5.10 Spark 内存使用
      1. 5.10.1 Spark 如何使用内存
      2. 5.10.2 Spark 内存使用的配置
      3. 5.10.3 一个 Executor 究竟能使用多少内存?
      4. 5.10.4 发现当前内存使用量
      5. 5.10.5 需要记住的几点
    11. 5.11 Client 模式还是 Cluster 模式

SparkCore 知识点

1. Spark 共享变量实战

通常,Spark 程序计算的时候,我们传递的函数时在远程集群节点上执行的,在函数中使用的所有变量副本会传递到远程节点,计算任务使用变量副本进行计算。这些变量被复制到每台机器上,对远程机器上的变量的更新不会返回 driver 程序。

跨任务支持通用的读写共享变量将是低效的。但是,Spark 为两种常见的使用模式提供了两种有限 功能的共享变量:广播变量累加器

1.1 广播变量 Broadcast Variables

广播变量允许 Spark 程序员将 只读变量 缓存在每台机器上,而不是将它的副本与 task 一起发送出去。例如,可以使用广播变量以高效的方式为每个 worker 节点提供一个大型输入数据集的副本。广播变量是只读的,对每个 worker 节点只需要传输一次。这样,就从每个 task 一份变量副本 ,变成了 一个 executor 一个变量副本,executor 中执行的 task 共用这个副本。如果有多个 worker 节点,各个 worker 上的 executor 中的变量副本并不都是来自 driver,因为 Spark 采用了高效地广播算法(TorrentBroadcast)来分配广播变量,以降低通信成本。

代码片段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    val list1=List(("zhangsan",20),("lisi",18),("wangwu",30))
val list2=List(("zhangsan","Math"),("lisi","English"),("wangwu","Science"))

val rdd1=sc.parallelize(list1)
val rdd2=sc.parallelize(list2)

// 直接join会进行shuffle,导致性能低下
// val joinRDD=rdd1.join(rdd2)
// joinRDD.foreach(println)

// 可以将较小的rdd广播出去
val rddData=rdd1.collectAsMap()
val rddBC=sc.broadcast(rddData)
val rdd3=rdd2.mapPartitions(partition=>{
val bc=rddBC.value
for{
(key,value)<-partition
if(rddData.contains(key))
}yield (key,bc.get(key).getOrElse(""),value)
})

rdd3.foreach(t=>println(t._1+","+t._2+","+t._3))

1.2 累加器 Accumulator

  1. 顾名思义,累加器是只能 累加的变量。在 task 中只能对累加器进行添加数值,而不能获取累加器的值。累加器只能在 driver 端获取

  2. 累加器支持加法交换律和结合律,因此 可以有效的支持 spark task 的并行计算。

  3. 累加器可用于 Spark 任务计数场景或者 Spark 任务求和场景。Spark 内置了几种累加器,支持自定义累加器。作为 Spark 开发工程师,可以创建命名的或未命名的累加器。如下所示,一个命名累加器(在这个实例计数器 中)将显示在修改该累加器的 stage web UI 中。

  4. Spark 显示 “Tasks”表中由任务修改的每个累加器的值。
    累加器.png

1.2.1 注意使用累加器的陷阱

  1. 执行多个 action !!
  2. Task 缓存数据被踢出时,下次用到时,会重新计算,此时累加器会重复计数。综上,实际项目中,累加器仅用于程序调试。

代码片段

1
2
3
4
5
6
7
8
9
10
11
AccumulatorV2<Long, Long> longAccumulator = jsc.sc().longAccumulator("longAccumulator");
JavaRDD<String> javaRDD = jsc.textFile("in/README.md");
JavaRDD<String> map = javaRDD.map(line -> {
longAccumulator.add(Long.valueOf(1));
return line.toUpperCase();
});

// 执行 action 操作才会执行以上 transformation
map.count();
System.out.println("count: " + javaRDD.count());
System.out.println("i: " + longAccumulator.value());

输出

count: 103
i: 103

1.2.2 累加器和广播变量的不同

  1. 广播变量一般用于 Spark 程序调优,如果不使用广播变量,程序计算结果不会错误,只是性能可能低下。

  2. 累加器不同,如果应该使用累加器的场景,你不使用,此时,程序计算结果就是错误的。

2. Spark 应用程序调度过程

spark 应用程序调度过程.png

2.1 Client 提交 Job, Driver 会向 master 申请资源,master 收到请求之后,会向 worker 进行资源调度,启动 Executor,然后,Executor 向 Driver 进行注册。

2.2 此时Spark 应用程序就会知道哪些 worker 上面的 executor 已经就绪。接着开始执行 spark 任务:

  1. 遇到 action 操作—> 创建一个 job—> 提交给 DAGScheduler,DAGScheduler 会 把 Job分为多个 stages(shuffle: 最后一个 stage 里面的 task 叫 ResultTask,前面 stage 里面的 task 叫 ShuffleMapTask),为每个 stage 创建 一个 taskset 集合,集合中的 task 计算逻辑完全 相同,只是处理的数据不同。Task 的数量等于 partition 的数量,但是同时执行的 task 的数量等于集群 core 的数量。整个 Application 运行完成时间,等于最后一个执行完成的 task 的时间。(数据倾斜问题)

  2. 然后 DAGScheduler 会把 taskset 交给 TaskScheduler ,TaskScheduler 会把 taskset 里面的 task 发送给 Executor。Executor 接收到 task,会启动一个线程池 TaskRunner,在里面运行 task。

spark-submit –master spark://bigdata01:7077 –class sparkcore.learnTextFile
F:\train\data\sparkapp\learnTextFile.jar

2.3 从 Spark 集群角度(静态):

2.3.1 Master

standalone 模式下的集群管理器,负责资源的分配。相当于 YARN 模式下的 ResourceManager。

2.3.2 Worker

集群中任何可以运行 Application 代码的节点,类似于 YARN 中的 NodeManager 节点。在 Standalone 模式中指的就是通过 Slave 文件配置的 Worker 节点,在 Spark on YARN 模式中指的就是 NodeManager 节点。

2.4 从 Spark 应用程序的角度(动态):

2.4.1 Application

Spark 应用程序,包含 Driver 功能代码和分布在集群中多个节点上运行的 Executor 代码。

2.4.2 Driver

运行 Application 的 main() 函数并创建 SparkContext,其中创建 SparkContext 的目的是为了准备 Spark 应用程序的运行环境。在Spark 中有 SparkContext 负责和 ClusterManager 通信,进行资源的申请、任务的分配和监控等。当 Executor 部分运行完毕后, Driver 负责将 SparkContext 关闭。

2.4.3 Action

2.4.4 Transformation

2.4.5 Job

包含多个 Task 组成的并行计算,由 Spark Action 生成,一个 Job 包含多个 RDD 及作用于 RDD 上的各种 transformation。

2.4.6 Stage

每个 Job 会被拆分很多组 Task,每组 task 被称为 Stage,也可称 TaskSet,一个作业分为多个 stage。

2.4.7 Task

被送到 Executor 上的工作任务,运行计算逻辑的地方。

2.4.8 Executor

运行在 Worker 节点上的一个进程,该进程负责运行 Task,并且负责将数据存在内存或者磁盘上,每个 Application 都有各自独立的一批 Executor。

3. Spark Master 高可用配置 HA

默认情况下,Standalone 调度集群能够处理 worker 故障(通过将计算任务转移到其他 worker)。然而,调度器使用一个 Master 来做出调度决策,这(默认情况下)会产生一个单点故障问题:如果 Master 崩溃,就不能提交新的应用程序。为了克服这一点,有两个高可用性方案,详细如下。

3.1 Standby Masters with ZooKeeper

3.1.1 概述

使用 ZooKeeper 来提供 leader 选举和一些状态存储,您可以在集群中启动多个连接到同一个 ZooKeeper 实例的 master 服务器。其中一个将被选为“leader”,其他的将保持 standby 状态。如果当前的 leader 挂掉 ,将选举另一个 master 成为 leader,恢复旧 master 的状态,然后恢复调度工作。整个恢复过程需要 1 到 2 分钟。注意,这种延迟只影响调度新应用程序——在 master 故障转移期间已经运行的应用程序不受影响。

3.1.2 配置

为了启动这个恢复模式,您可以在 spark-env.sh 中设置 SPARK_DAEMON_JAVA_OPTS,配置 spark.deploy.recoveryMode 和 spark.deploy.zookeeper.* 相关的配置。参考: http://spark.apache.org/docs/latest/configuration.html#deploy

3.1.3 可能的陷阱

如果您的集群中有多个 master,但是没有正确地配置 master 来使用 ZooKeeper,那么 master 将无法发现彼此,并认为他们都是 leader。这将不会导致一个健康的集群状态(因为所有 master 都将独立调度)。

3.1.4 详细

在设置了 ZooKeeper 集群之后,启用高可用性就很简单了。只需只用相同的 ZooKeeper 配置(ZooKeeper URL和目录)在不同的节点上启动多个 master 进程。master 可以在任何时候添加和删除。

为了安排新的应用程序或将 worker 添加到集群中,他们需要知道当前 leader master 的 IP 地址。这可以通过简单地传递一个 master URL 列表来实现,您以前在这些 master 列表中只传递一个 master URL。例如,您可以启动 SparkContext,指向spark://host1:port1,host2:port2。这将导致您的 SparkContext 尝试向两个 master 注册——如果 host1 宕机,这个配置仍然是正确的,因为我们将找到新的 leader: host2。

“注册到一个 Master 服务器”和正常 操作之间有一个重要的区别。在启动时,应用程序或 worker 需要能够找到并注册到当前的 master。但是,一旦注册成功,master 就是“在系统中”(即存储在 ZooKeeper)。如果发生故障转移,新 master leader 将联系所有以前注册的应用程序和 worker,通知他们 leader 的更改,因此他们在启动时甚至不需要知道新 master的存在。

由于这个属性,可以在任何时候创建新的 master,唯一需要担心的是,新的应用程序和 worker 可以找到它进行注册,以防它成为 leader。

3.1.5 HA 配置实战

  1. spark-env.sh 配置(所有节点都需要配置)

    export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER

-Dspark.deploy.zookeeper.url=master01:2181master02:2181,slave01:2181,slave02:2181,slave03:2181
-Dspark.deploy.zookeeper.dir=/opt/modules/spark213/meta”

  1. master01

    start.all.sh

  2. master02

    start-master.sh

  3. master01

    stop-master.sh

  4. 查看 master 是否切换,standby->active

3.2 带有本地文件系统的单节点恢复

3.2.1 概述

ZooKeeper 是实现 产品级高可用性的最佳方法,但如果您只是想在 master 服务器宕机时重启它,那么文件系统模式 可以解决这个问题。当应用程序和工作者注册时,它们有足够的状态被写入到提供的目录中,以便在 master 进程重新启动时可以恢复它们。

3.2.2 配置

为了启动这个恢复模式,可以在 spark-env.sh 中设置 SPARK_DAEMON_JAVA_OPTS 的相关配置:

属性 含义
spark.deploy.recoveryMode 将文件系统设置为启用单节点恢复莫斯(默认 :无)
spark.deploy.recoveryDirectory Spark 将存储恢复状态的目录,从 master 的角度进行访问

3.2.3 细节

这个解决方案可以与进程监视器、管理器(如 monitor)一起使用,或者只是通过重新启动启用手动恢复。

虽然文件系统恢复看起来比不进行任何恢复要好得多,但是对于某些开发或实验目的来说,这种模式可能不是最优的。特别是,通过 kill master 来停止 master 不会清理它的恢复状态,所以无论何时启动一个新主,它都会进入恢复模式。如果需要等待所有以前注册的工人、客户端超时,这将使启动时间增加最多 1 分钟。

虽然 它不受官方支持,但您可以将 NFS 目录挂载为恢复目录。如果原始的 master 节点完全死亡,那么您可以在另一个节点 上启动一个主节点,它将正确地恢复所有以前注册的 worker、应用程序(相当于 ZooKeeper 恢复)。然而,为了注册,未来的应用程序必须能够找到新的主程序。

4. Spark 应用程序内存和 CPU 的分配

4.1 Spark 运行模式

4.1.1 local

本地模式,不需要安装 spark,也不需要启动 spark 集群。用于开发环境

4.1.2 standalone

需要安装 Spark 需要启动 Spark 集群。
client: driver 运行在和 sparksubmit 同一个进程中,此时如果关闭命令行窗口,相当于取消程序运行。可以从控制台看到程序输出内容
cluster: driver 运行在 worker 节点上,spark-submit 提交后即退出,此时命令行窗口关闭,不影响程序运行,但是从控制台看不到程序输出内容。

4.1.3 yarn

需要安装 spark,不需要启动 spark 集群。
client: driver 运行在 client 进程中 ,此时如果关闭命令行窗口,相当于取消 程序运行。可以从控制台看到程序输出内容。
cluster: driver 运行在 ApplicationMaster 进程 中,client 端提交后即退出,此时命令行窗口关闭,不影响程序运行,但是控制台上看不到程序输出内容

4.1.4 mesos

4.2 如何设置 Spark 配置属性

在 spark-defaults.conf 文件中设置 spark 的资源配置,资源分配参数名为 spark.xx.xx,如 spark.driver.cores

4.2.1 有三种方式设置spark 的配置属性

(按优先级从高到低)

  1. 在程序代码中通过 SparkConf 对象设置;
  2. 通过 spark-submit 任务提交参数设置;
  3. 通过 spark-defaults.conf 文件设置。

4.2.2 典型的 spark-defaults.conf 文件内容

1
2
3
4
spark.executor.memory   8G
spark.driver.memory 16G
spark.driver.maxResultSize 8G
spark.akka.frameSize 512

4.3 集群资源配置实战

在 spark-env.sh 中配置

1
2
3
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=900m
SPARK_WORKER_INSTANCES=2

sparkenv配置.png

4.4 Spark 应用程序 资源配置实战

4.4.1 –executor-cores

  1. spark-submit –master spark://master01:7077 –executor-memory 900m –executor-cores 6 –class spark.learnTextFile /opt/sparkapp/learnTextFile.jar

上面的命令中,所需要的的单个 executor 的 cores 数量 6 超过了 worker 节点的 cores 数量(配置为单核),程序无法运行。
cores 配置超过物理节点 所有的核心数.png

  1. 如果 worker 有足够的资源,对于同一个应用,会在一个 worker 上启动多个 executor。如:

    spark-submit –master spark://master01:7077 –executor-memory 918m –executor-cores 1 –class sparkcore.learnTextFile /opt/sparkapp/learnTextFile.jar

一个 worker 启动多 exe1.png

一个 worker 启动多 exe2.png

另一个例子:

spark-submit –master spark://master01:7077 –executor-memory 1200m –executor-cores 1 –class sparkcore.learnTextFile /opt/sparkapp/learnTextFile.jar
一个 worker 没启动多 exe1.png

一个 worker 没启动多 exe2.png

另一种参数赋值的方式:
–conf PROP=VALUE

spark-submit –master spark://master01:7077 –conf spark.executor.memory-1201m –conf spark.executor.cores=1 –class sparkcore.learnTextFile /opt/sparkapp/learnTextFile.jar

4.4.2 参数优先级

spark.default.conf < spark-submit -conf < SparkConf 代码

测试

  1. 代码:
    参数优先级 代码设置.png

  2. spark-submit:

    spark-submit –master spark://master01:7077 –conf spark.executor.memory=1201m –conf spark.executor.cores=1 –class sparkcore.learnTextFile /opt/sparkapp/learnTextFile.jar

  3. 运行结果:
    参数优先级 结果1.png

参数优先级 结果2.png

4.4.3 几个常用的配置项

Spark 常用配置项,主要是对 Spark 运行过程中各个使用资源的地方,通过调整参数值,来优化资源使用效率,提升 Spark 作业性能。以下每个参数都对应着 spark 应用程序运行原理中的某个部分,同时给出了一些参考值。

1. num-executors

  • 参数说明:该参数用于设置 Spark 作业要用多少个 Executor 进程来执行。Driver 在向 YARN 集群管理器申请资源时,YARN 集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的 Executor 进程。默认只会给你启动少量的 Executor 进程,此时你的 Spark 作业的运行速度是非常慢的。

  • 参数调优建议:参考 yarn container 的大小,设置太少或太多的 Executor 进程都不好。设置的太少,无法充分利用集群资源;设置 的太多的话,大部分队列可能无法给予充分的资源。

2. executor-memory (spark.executor.memory)

  • 参数说明:该参数用于设置每个 Executor 进程的内存 。Executor 内存的大小,很多时候直接决定了 Spark 作业的性能,而且跟常见的 JVM OOM异常,也有直接的关联。Executor 8G 4Core; 2Executor 4G 2Core

  • 参数调优建议:每个 Executor 进程的内存设置 4G ~ 8G 较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors 乘以 executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源 队列最大总内存的 1/3~1/2,避免你自己的 Spark 作业占用了队列所有的资源,导致别的作业无法运行。

3. executor-cores (spark.executor.cores)

  • 参数说明:该参数用于设置每个 Executor 进程的 CPU core数量。这个参数决定了每个 Executor 进程并行执行 task 线程的能力。因为每个 CPU core 同一时间只能执行一个 task 线程,因此每个 Executor 进程的 CPU core 数量越多,越能够快速地执行完分配给自己的所有 task 线程。
  • 参数调优建议:Executor 的 CPU core 数量设置为2 ~ 3 个较为合适。同样地根据不同部门的资源队列来定,可以看看自己的资源队列的最大 CPU core限制是多少,再依据设置的 Executor 数量,来决定每个 Executor 进程可以分配到几个 CPU core。同样建议,如果是跟他人共享这个队列,那么 num-executors * executor-cores 不要超过队列总 CPU core 的1/3~1/2 左右比较合适,也是避免影响其他同学的作业运行。

4. driver-memory

  • 参数说明:该参数用于设置 Driver 进程的内存。
  • 参数调优建议:Driver 的内存通常来说不设置,或者设置 1G 左右应该就够了。唯一需要注意的一点是,如果需要使用 collect 算子将RDD的数据全部拉取到 Driver 上进行处理,那么必须确保 Driver 的内存足够大, 否则会出现 OOM 内存溢出的问题。

5. spark.default.parallelism

  • 参数说明:该参数用于设置每个 stage 的默认 task 数量。这个参数极为重要,如果不设置可能会直接影响你的 Spark 作业性能。
  • 参数调优建议:Spark 作业的默认 task 数量为 500 ~ 1000 个较为合适。如果不设置这个参数,那么此时就会导致 Spark 自己根据底层 HDFS的 block数量来设置 task的数量,默认是一个 HDFS block 对应一个 task。通常来说,Spark 默认设置的数量是偏少的,如果task 数量偏少的话,就会导致你前面设置好的 Executor 的参数都无法提升性能。比如,无论你的 Executor 内存和 CPU 有多大,但是 task 只有一个或者几个,那么 90% 的 Executor 进程可能根本就没有 task 执行,也就是白白浪费了资源!因此 Spark 官网建议的设置原则是,设置该参数为 num-executors * executor-cores 的 2~3 倍比较合适

5. spark.storage.memoryFraction

  • 参数说明:该参数用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6。也就是说,默认 Executor 60% 的内存,可以用来保存持久化的 RDD数据。

  • 参数调优建议:如果 Spark 作业中,有较多的 RDD 持久化操作,该参数的值可以适当 提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果 Spark 作业中的 shuffle 类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的 gc 导致运行缓慢 (通过 spark web ui 可以观察到作业的 gc 耗时),意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数的值。

6. spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置 shuffle 过程中一个 task 拉取到上个 stage 的task 的输出后,进行聚合操作时能够使用的 Executor 内存的比例,默认是 0.2。也就是说,Executor 默认只有 20% 的内存用来进行该操作。shuffle 操作在进行聚合时,如果发现使用的内存超过了这个 20% 的内存用来进行该操作。shuffle 操作在进行聚合时,如果发现使用的内存超过了这个 20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果 Spark 作业中的 RDD 持久化操作较少,shuffle 操作较多时,建议降低持久化操作的内存占比,提高 shuffle 操作的内存占比比例,避免 shuffle 过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现由于频繁的 gc 导致运行缓慢,意味着 task 执行用户代码的内存不够用,那么同样建议调低这个参数值。

:资源参数的调优,没有一个固定的值,需要根据自己的实际情况(包括 Spark 作业中的 shuffle 操作数量、RDD 持久化操作数量以及 spark web UI 中显示的作业 gc 情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

5. Spark on yarn

5.1 概述

  1. Spark 支持可插拔的集群管理器(Standalone, YARN),集群管理负责启动 executor 进程。Spark 支持的四种集群模式(standalone, mesos, yarn, Kubernetes),三种集群模式都是由两个组件组成:master 和 slave。Master 服务(YARN ResourceManager, Mesos master 和 Spark standalone master)决定哪些 application 可以运行,何时运行以及在哪里运行。而 slave 服务(YARN NodeManager, Mesos slave 和 Spark standalone worker)是运行 executor 进程。

  2. 当在 YARN 上运行 Spark 作业,每个 Spark executor 作为一个 YARN 容器(container)运行。Spark 应用程序的多个 Tasks 在同一个容器(container)里面运行。注意:MapReduce作业为每个 Task 开启不同的 JVM 来运行。

  3. 不需要启动 spark 集群,但是需要配置 SPARK_HOME。
    配置 HADOOP_CONF_DIR=$HADOOP_HOME。

  4. 提交命令:

    spark-submit –class sparkcore.learnTextFile –master yarn –deploy cluster /opt/sparkapp/learnTextFile.jar

  5. 查看输出:

    yarn logs –applicationId application_xxx

  6. 查看任务运行状态:

    yarn application -status application_xx

5.2 资源分配

资源分配包括内存的分配和 CPU 的分配

5.2.1 CPU 的分配

Spark 计算的关键抽象是 RDD,RDD 包括多个 partitions, 一个 partition 对应一个 task,对应一个 CPU core。一个 spark job 的并行度依赖于 RDD的partitions 和可用的 CPU cores。

5.2.2 内存的分配

spark 内存主要用于执行 job(执行内存)和存储数据(存储内存)
执行内存可以驱逐存储内存(仅在存储内存达到一定阈值时)。存储内存不可以驱逐执行内存。

执行内存用于 shuffle、joins、sorts 和 aggregations。

存储内存用于缓存数据和集群内传播数据。

5.3 如何给 Spark 分配资源

1. 当 Spark 应用运行在 yarn 上面,对 yarn 来说,Spark 应用只是一个应用,就像 MapReduce 应用一样,只是一个运行在 yarn 上面的应用而已。这是极好的!因为你已经学得有关 yarn 架构、资源分配和调优知识,对 Spark on yarn 完全有效。

2. 如前所述,yarn 有两个关键的实体:
ResourceManager–管理集群资源;
ApplicationMaster–负责从 ResourceManager 请求资源,进而将资源分配给每个集群节点上的 NodeManager,这样集群就可以执行每个 task。

3. ApplicationMaster 是属于某个特定 Application的,执行 MapReduce 应用时,yarn 使用了 MapReduce 框架特定的 ApplicationMaster,当执行 Spark job 时,yarn 使用了 Spark 框架特定的 ApplicationMaster。

4. yarn 通过逻辑抽象单元 container 来分配资源,container 代表一个资源集——内存和 CPU。例如,一个 container 可以由 2个 CPU core 和 4G 内存组成。当 Spark ApplicationMaster 从 ResourceManager 请求资源时,通过评估 job 的资源需求,然后请求一定数量的 container 来完成 job。基于集群可用资源,ApplicationMaster 会要求 worker 节点上的 NodeManaer 运行一定数量的 container。

5. 当运行 Spark on yarn 应用时,Spark 位于 yarn 架构之上,使用相同的过程请求资源,yarn 用相同的方式将 container 分配给 Spark job。Spark 所有的 executor 和 driver 都运行在 container 中,ApplicationMaster 负责 container 间的通信。

6. ApplicationMaster 运行在一个单独的 container 中。Executor 也运行在 yarn container 中,一个 Executor 运行在一个 container 中,在 MapReduce 应用资源分配过程中,一个 map/reduce task 运行在单独的 container 中。但是,在 spark Executor 中,Executor container 包含一个更细粒度的实体——task。每个 Executor container 可以运行一个 task 集合,来完成实际的工作。

7. Spark 使用了由 YARN 管理的两个关键资源:CPU 和内存。虽然磁盘 I/O 和网络性能对应用程序性能有影响,YARN 并不是真正关注这些资源。

5.4 Spark 应用资源分配的限制

可以配置 YARN 属性来限制 YARN 可以使用的最大内存和 CPU core。Spark 的资源使用受限于这些配置属性。总结一下这些属性。

  • yarn.nodemanager.resource.memory-mb 参数设置了分配给集群中一个运行 NodeManager 节点 上所有 container 的内存上限。此内存设置对 Spark on yarn 同样有效。

  • yarn.nodemanager.resource.cpu-vcores 参数设置了集群中一个运行 NodeManager 节点上所有 containers 可以使用的最大 CPU 核心数。

  1. yarn 以块的形式分配内存,内存块的大小依赖于参数 yarn.scheduler.minimum-allocation-mb 值—— yarn 可以为每个 container 请求分配的最小内存块。

  2. yarn 以 core 的形式分配 CPU,core 的个数依赖于参数 yarn.scheduler.minimum-allocation-vcores 值—— yarn 可以为每个 container 请求分配的最小 core 数。

5.5 spark on yarn 应用的提交方式

  1. yarn-client 模式:spark driver 运行在客户端 client 进程中。YARN ApplicationMaster 进程代表应用向 YARN 请求资源;
    client 向 yarn 的 RM 申请 container,用于运行 AM,RM 在 NodeManager 上启动 container 运行 AM (里面没有 SparkContext),SparkContext 在 client 端实例化,会通知 AM,申请资源,AM 向 RM 申请资源,AM 在 NodeManager 上启动 container(executor), SparkContext 分配 task 给 executor,executor 启动线程池执行,运行完成后,driver 会通知 RM 收回资源,sparkcontext.close()。

  2. yarn-cluster 模式:spark driver 运行在 YARN 管理的 ApplicationMaster 进程中——client 将定期轮询 AM 以获取状态更新,并在控制台显示它们。一旦应用程序运行完毕,client 将退出。


    client 向 yarn 的 RM 申请 container,用于运行 AM,RM 在 NodeManager 上启动 container 运行 AM,**AM 类里面会实例化 SparkContext(driver)**,AM 向 RM 申请资源,AM 在 NodeManager 上启动 container(executor), sparkContext 分配 task 给 executor,executor 启动线程池执行,运行完成后,driver 会通知 RM 收回资源,sparkcontext.close()。

5.6 Spark on yarn 的资源分配依赖于 spark 的运行模式

因此,我们分别讨论 client 和 cluster 模式中的资源分配。

注意,关于配置 YARN 资源分配的所有知识,为 Spark 分配资源仍然有效。

yarn 需要为以下 Spark 关键实体分配资源

  1. driver
  2. Executor

5.6.1 为 spark driver 分配资源

在开始讨论如何分配资源给 Spark driver 之前,请允许我总结一下 driver 所扮演的角色。

driver 的职责
spark on yarn 应用程序的 driver 进程完成如下功能:

  1. 使用 spark 执行引擎,将应用程序分成 job,stages 和 tasks。
  2. 为 Executor 进程提供包依赖服务。 –jars
  3. 与 YARN ResourceManager 交互,获取资源,分配给各个节点用于执行应用程序的 task。

注意

  • 在 Spark 运行环境中,driver负责:
  1. 定义和调用 RDD 的 action;
  2. 跟踪 RDD 的血统 lineage。
  • workers(executors)负责:
  1. 存储 RDD partitions;
  2. 执行 RDD transformation。

一旦 driver 获取资源执行 Spark Application, 其会创建一个执行计划——根据应用程序代码中的 action 和 transformation 生成一个有向无环图 DAG,并发送给 worker节点。

driver 进程包括两个组件,用来处理 task 分配:

  1. DAG Scheduler 进程将 DAG 划分为 task;
  2. task scheduler 在集群各个节点间调度 task。一旦 task scheduler 完成了 task 分配,Executor 开始执行 DAG 中的操作。如果有 task 失败或者出现延迟,task scheduler 会重启失败(失败次数可配置 4 次)的 task 或创建新的 task 来替换延迟的 task。

5.7 Client 模式下 AM、Driver 的资源分配

Spark on yarn cient 模式,Spark 应用程序对应的 AM 的资源分配依赖于下面两个配置参数:

配置参数 参数描述 默认值 例子
spark.yarn.am.memory AM 的 JVM 堆内存 512M spark.yarn.am.memory 777m
spark.yarn.am.cores AM 可用的 core 数量 1 spark.yarn.am.cores 4

由于 YARN 分配资源的单位是 container,那么 AM 运行所在的 container 的大小是多少呢?使用参数spark.yarn.am.memory 设置的内存并不意味着 yarn 为 container 分配的内存就是 777m。

在 client 模式下,Spark 为 AM 进程分配了一定量的堆外内存。配置参数 spark.yarn.am.memoryOverhead 设置了堆外内存的大小。默认值为:
AM Memory * 0.1,最小值为 384M

那么,我们为 AM 设置了 777MB,777MB*0.1 = 77.7MB,比 384MB 小,所以堆外内存为 384MB。AM 的 cotainer 大小为 777MB+384MB=1161MB,因为 yarn.scheduler.minimum-allocation-mb 的默认值为 1G,yarn 会做舍入操作,为 AM 分配一个内存为 2G 的 container。所以,上例中的 container 大小为:2GB 内存(java 堆内存:-Xmx777M)和 4 CPU core。

Client 模式下的 driver 内存,使用 spark.driver.memeory

5.8 Cluster 模式下 AM、Driver 的资源分配

在 cluster 模式下,spark driver 运行在 yarn ApplicationMaster 进程中。所以,分配给 AM 的资源决定了 driver 的可用资源。下面看下 driver 相关的资源配置参数,这些配置参数,控制了 ApplicationMaster 的资源分配。

配置参数 参数描述 默认值 例子
spark.drivers.cores AM 可用的 core 数量 1 spark.driver.cores 2
spark.driver.memory AM 的 JVM 堆内存 512M spark.driver.memory 1665m
spark.driver.memoryOverhead Driver 堆外内存 driverMemory * 0.1,至少 384M

注意:与 client 模式一样,cluster 模式也有一个类似的配置参数:
spark.driver.memoryOverhead 用于指定 cluster模式下的堆外存大小。该属性默认值为分配给 ApplicationMaster 内存的 10%,最小值为 384M。

提示:在 cluster 模式中,当你配置 spark driver 的资源时,间接配置了 yarn AM 服务的资源,因为 driver 运行在 AM 中。

因为 1665+Max(384, 1665*0.1) = 1665+384 = 2049 > 2048,并且 yarn.scheduler.minimum-allocation-mb = 1024,所以 container 大小为:3GB 内存(Java 堆内存:-Xmx1665M)和 2 CPU core。

5.9 Executor 的资源配置

Spark 任务的真正执行是在 worker 节点上——也就是说所有的 task 运行在 worker 节点上。spark job 的大部分资源应该分配给 Executor。相比而言,driver 的资源分配要小的多。

对于 Spark executor 资源,yarn-client 和 yarn-cluster 模式使用相同的配置

配置参数 参数描述 默认值
spark.executor.instances(–num-executors) 用于静态分配 executor 的数量 2
spark.executor.cores(–executor-cores) 单个 executor 的 core 数量 1
spark.executor.memeory(–executor-memeory) 每个 executor 的堆内存大小 1G
spark.executor.memoryOverhead 每个 executor 的堆外存大小 executorMemory*0.1,至少 384M

计算:如果设置 spark.executor.memeory 大小为 2G,那么将启动 2 个 container,大小为 3G,1core,-Xmx2048M

在 Spark 资源分配表述中,Executor 是 container 的同义词——即一个 Executor,一个 Container。因此,Spark 的 Executor 的分配转换成了 yarn 的 container 的分配。当 Spark 计算出所需要的 Executor 数量,其与 AM 交互,AM 进而与 YARN RM 交互,请求分配 container。

spark.executor.memeory 参数设置会影响下面两个地方:
–Spark 可以缓存的数据量;
–shuffle 操作可以使用的内存最大值。

下图展示了 spark executor 内存与 yarn 总内存 yarn.nodemanager.resource.memory-mb的关系:
图展示了.png

下面的例子显示了在提交 Spark 应用程序时,如何设置目前为止讨论的属性:

spark-submit –class org.apaches.spark.example.SparkPi \
–master yarn \
–deploy-mode cluster \
–driver-memory 4g \
–executor-memory 2g \
–executor-cores 1 \
–queue thequeue \
lib/spark-example*.jar \
10

5.10 Spark 内存使用

5.10.1 Spark 如何使用内存

在前面的章节中,我解释了如何分配内存给 Spark:通过分配内存给 driver 和 executor。Spark 是如何使用分配到的内存呢?Spark 主要在两个方面使用内存:执行代码和存储数据(执行内存 execute memory 和存储内存 storage memory)。
执行内存,用于执行 Spark 操作,包括 shuffle, joins 和 sorts。
存储内存,用于 Spark 缓存数据和在集群间移动中间数据。
执行内存的增加,会导致内存中的数据对象被清除,直到存储内存值达到一个阈值。

5.10.2 Spark 内存使用的配置

为了了解 Spark 如何分配执行内存和存储内存,我会用下列符号代表内存的各个组成部分。

  • M:表示执行内存和存储内存的总和。
  • R:最小存储空间(阈值),低于这个值,不能将 RDD 从存储内存中清除出去。

使用以下两个配置属性来调整执行内存和存储内存大小:

  • spark.memory.fraction:这个分数表示 M 是 JVM 堆空间的一部分。默认值为0.75(Spark1.6),0.6(Spark2.0)。意思是:分配给 executor 的内存,60% 用于 M,剩下的 40% 用于存储用户数据结构和内部 Spark 元数据。Apache Spark 建议保留该属性默认值。

  • spark.memory.storageFraction:这个分数表示,保留存储内存 R 的大小占总内存 M 的百分比。默认值为 0.5。意思是:如果 Spark 应用程序缓存的 RDD 位于 M 总内存的 50% 之内,将禁止被清除出去。注意:这个属性值越大,用于执行代码的内存越小,task 会频繁溢出数据到磁盘。Apache Spark 建议保留该属性默认值。如果在执行 Spark 应用程序过程中,频繁发生 GC,此时可以适当调低这个属性值,如从 0.5 调低到 0.4,这样 M 中的 60% 内存用于代码执行。如:

    spark-shell –conf spark.memory.storageFraction=0.4

5.10.3 一个 Executor 究竟能使用多少内存?

分配给 Executor 的内存是 Spark 应用程序执行的关键。假设你设置了 spark.executor.memory 的值为 4GB,那么 4GB内存中,有多少内存真正用于 Spark 应用程序代码执行呢?下面的分析显示了 executor 实际上可以使用多少内存执行代码。

  1. 首先,Spark 会从 4GB Java heap 中减去 300MB,作为”reserved memeory”(保留内存)。现在 Java heap 中只剩下 4096-300= 3796MB 内存。

  2. spark.memory.fraction 属性值,按默认值 0.75,将会使用内存 0.75*3796=2847MB。

  3. spark.memory.storageFraction 属性值,默认值 0.5,那么用于存储内存的大小为:0.5*2847MB=1423.5MB,这是初始存储内存区域大小。

  4. executor 可以使用另外的 50%,即 1423.5MB 执行应用程序代码。

好好看看你的的应用程序并为存储内存和执行内存找出近似比例。然后调整 spark.memory.fraction 和 spark.memory.storageFraction 的属性值。Spark 建议您保留默认位置,因为它们是用于大多数应用程序场景。但是,每个 Spark 应用程序都是不同的。

5.10.4 发现当前内存使用量

如果您想要计算 RDD 需要多少内存,最好的办法是创建 RDD 并缓存它。然后您可以查看 Spark web UI 存储页面统计数据,以计算 RDD 的当前内存使用量。

使用 SizeEstimator 类的 estimate 方法计算某个对象占内存空间的大小。具体用法参见:
https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/util/SizeEstimator.html
您可以尝试各种存储、执行内存设置来优化内存使用。

5.10.5 需要记住的几点

以下是您在配置用于 Sparkdriver 程序和 Executor 的资源时可能需要考虑的一些重要事项。

1. 使用尝试和错误来调整 executor 内存
如果分配给 Executor 过多的内存,GC 会耗时较长。然而,在许多情况线下,您需要为 Executor 分配更高的内存,因为默认值只有 1024MB。在一个特定应用程序调优中,你可以尝试升高不同的内存值,以找到合适的数字。大多数情况下,分配给 Executor 的内存不需要超过 6GB,大约 4GB 的大小对于一个 Executor 来说是一个很好的内存分配。任何分配内存太多,垃圾回收可能会对应用程序性能产生负面影响,尽管如果使用新的 GC 方法(G1GC),可以减轻这种情况。

2. 限制每个 Executor 的 task 数量

5.11 Client 模式还是 Cluster 模式

在client 模式下,spark-submit 脚本创建了 driver,driver 与任务提交脚本运行在同一个进程中,此时,你可以方便的跟踪调试应用程序,因为应用程序的输出信息都会打印到屏幕上。如果在你的笔记本电脑上面运行 spark-submit 命令提交应用程序,然后关闭了笔记本电脑,那么这个应用程序就死掉;如果是在 cluster 模式下,这种情况不会影响应用程序的执行,因为你提交应用程序后,应用程序 driver 会在集群节点上运行。在 cluster 模式下,ResourceManager 决定 driver 进程运行在哪个集群节点上,不能人为指定。因此,需要保证在所有的集群节点上提供了所有的依赖库,如 jar 文件和 py 文件等。

spark.driver.maxResultSize 配置参数决定了 spark action 操作中,RDD 所有分区的记录总数大小,如 collect()。默认值为 1GB,如果数据集过大,你需要调高此参数值,因为一旦超过了此参数值,job 会被 kill 掉。如果你调高了这个参数值,请确认同时调高 spark.driver.memory参数值。spark.driver.maxResultSize 参数值为 0,无限大。

spark.cleaner.ttl 参数,定时清理 spark 程序运行过程中产生的元数据(stages 元数据或 task 元数据),对于长时间运行的 Spark Streaming 程序可以设置这个参数,以避免耗尽 driver 内存。

配置 spark 相关的网络参数
spark 使用 akka 框架进行网络通信。在生产环境中,有两个重要的网络配置参数可能需要调整:

  1. spark.akka.framesize: 默认值为128MB,设置了 driver 和 executors 传输消息的最大值。如果 job 运行了大量的 map/reduce task,可能需要调整这个参数。

  2. spark.akka.threads:默认值为 4,设置了用于交互的 akka actor 线程数。如果 driver 配置了多个 cpu core,你可能需要提升这个参数值。

支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫