盒子
盒子
文章目录
  1. 1. SparkSQL 连接 MySQL
    1. 1.1 SparkSQL 读 MySQL
    2. 1.2 SparkSQL 写 MySQL
  2. 2. SparkSQL 内部运行机制
    1. 2.1 SparkSQL 程序执行过程
    2. 2.2 逻辑执行计划(Logical Plan)
    3. 2.3 物理执行计划
    4. 2.4 执行

SparkSQL 连接 MySQL、SparkSQL 内部运行机制

1. SparkSQL 连接 MySQL

1.1 SparkSQL 读 MySQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 读 MySQL
// 方法一
Dataset<Row> df = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "xxx")
.option("dbtable", "flight")
.load();
df.show();

// 方法二
Properties properties = new Properties();
properties.put("user", "root");
properties.put("password", "xxx");
Dataset<Row> flight = spark.read()
.jdbc("jdbc:mysql://master01:3306/test?characterEncoding=UTF-8", "flight", properties);
flight.show();

1.2 SparkSQL 写 MySQL

1
2
3
4
5
6
7
8
9
// Dataframe 写入 MySQL
spark.sql("select * from sales").write()
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "xxx")
.option("dbtable", "sales")
.mode(SaveMode.Overwrite) // 对表格进行重写
.save();

2. SparkSQL 内部运行机制

示例

1
2
3
4
5
6
7
8
UserDefinedFunction largerThan = functions.udf((String zip2, Long number) -> Long.valueOf(zip2) > number, DataTypes.BooleanType);
ds.select(
zipToLong.apply(functions.col("zip")).as("zipToLong"),
functions.col("city"),
largerThan.apply(functions.col("zip"), functions.lit(99923L)).as("largerThanConst")
)
.orderBy(functions.desc("zipToLong"))
.explain(true);

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
== Parsed Logical Plan ==
'Sort ['zipToLong DESC NULLS LAST], true
+- AnalysisBarrier
+- Project [UDF(zip#16) AS zipToLong#91L, city#7, UDF(zip#16, 99923) AS largerThanConst#92]
+- Project [_id#6 AS zip#16, city#7, loc#8, pop#9L, state#10]
+- Relation[_id#6,city#7,loc#8,pop#9L,state#10] json

== Analyzed Logical Plan ==
zipToLong: bigint, city: string, largerThanConst: boolean
Sort [zipToLong#91L DESC NULLS LAST], true
+- Project [UDF(zip#16) AS zipToLong#91L, city#7, UDF(zip#16, 99923) AS largerThanConst#92]
+- Project [_id#6 AS zip#16, city#7, loc#8, pop#9L, state#10]
+- Relation[_id#6,city#7,loc#8,pop#9L,state#10] json

== Optimized Logical Plan ==
Sort [zipToLong#91L DESC NULLS LAST], true
+- Project [UDF(_id#6) AS zipToLong#91L, city#7, UDF(_id#6, 99923) AS largerThanConst#92]
+- Relation[_id#6,city#7,loc#8,pop#9L,state#10] json

== Physical Plan ==
*(2) Sort [zipToLong#91L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(zipToLong#91L DESC NULLS LAST, 200)
+- *(1) Project [UDF(_id#6) AS zipToLong#91L, city#7, UDF(_id#6, 99923) AS largerThanConst#92]
+- *(1) FileScan json [_id#6,city#7] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/E:/Java/IntelliJ IDEA/spark/in/zips.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_id:string,city:string>

2.1 SparkSQL 程序执行过程

  • 先写 Dataset API, SQL 代码;
  • 如果代码没有编译错误,Spark 会将代码转换为逻辑计划;
  • Spark 会将逻辑计划转换为物理计划,会对代码进行优化(catalyst 优化器);
  • Spark 会执行物理计划(RDD)。

2.2 逻辑执行计划(Logical Plan)

逻辑计划不涉及 Executor 和 Driver,只是将用户写的代码转换为最优版本,通过将用户代码转换为 unsolved logic plan,接着会转换为 resolved logic plan,catalog(所有表和 DataFrame 信息的存储库),接着会把这个计划给 catalyst 优化器,catalyst 优化器是一组优化规则的集合:谓词下推,投影等。

逻辑执行计划.png

2.3 物理执行计划

物理执行计划制定了逻辑执行计划,如果通过生成不同的物理执行策略(A/B/C 计划),会通过 Cost Model 来比较这些执行策略,从而选择一个最优的。其结果是一系列的 RDD 和 Transformation。

物理执行计划.png

2.4 执行

选择一个物理计划,运行所有的 RDD 代码,使用(tungsten)进一步优化,生成本地 Java 字节码文件,执行生成的各种 stage,最后返回结果给用户。

支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫