盒子
盒子
文章目录
  1. 1. 概览
    1. 1.1 简述
    2. 1.2 SparkSQL 的用法之一:SQL 语句交互
    3. 1.3 SparkSQL 的用法之二:Dataset API 交互
    4. 1.4 SparkSession
    5. 1.5 Partition 分区
    6. 1.6 Transformation
    7. 1.7 延迟计算
    8. 1.8 Action
    9. 1.9 注册为表或视图
  2. 2. 重要概念
    1. 2.1 DataFrame
    2. 2.2 Dataset
    3. 2.3 DataFrame 和 Dataset
  3. 3. 代码实战

SparkSQL 入门

1. 概览

1.1 简述

SparkSQL 是 Spark 计算框架的一个模块。与基础的 Spark RDD API 不同,SparkSQL 为 Spark 提供了更多的数据结构 schema 信息。在内部,SparkSQL 使用这些额外的数据结构信息做进一步的优化操作。与 SparkSQL 交互的方式有多种,包括SQL 语句交互Dataset API 交互。当使用 SparkSQL 获取数据处理结果时,无论使用什么样的交互方式,无论什么样的语言编写的程序,其底层的执行引擎都是相同的。这就意味着,Spark 开发人员可以很容易在不同 API 之间来回切换,而不同担心性能方面的问题。

spark框架.png

1.2 SparkSQL 的用法之一:SQL 语句交互

SparkSQL 的用途之一是执行 SQL 查询。SparkSQL 也可以从已有 Hive数据仓库中读取数据(Spark on Hive)。SparkSQL 语句的执行结果是一个 DataFrame或Dataset 对象。同时支持命令行执行 SQL 和 JDBC/ODBC 连接。

1.3 SparkSQL 的用法之二:Dataset API 交互

Dataset 是一个分布式的数据集合。在 Spark 1.6 版本中,Dataset 作为一个新接口添加进来,兼具 RDD 的优点(强类型、使用强大的 lambda 函数的能力)和 SparkSQL 优化引擎的优势。Dataset 可以从 JVM 对象来构建,然后使用一系列转换函数(map、flatmap、filter等)进行计算。Dataset API 在 Scala 和 Java 语言中是支持的,Python 语言不支持 Dataset API。但是,由于 Python 的动态特性,Dataset API 的许多优势,Python 语言已经具备了。R 语言类似。

DataFrame(Dataset<Row>)是一个由命令列组成的 Dataset。概念上相当于关系型数据库中的一个表,但底层提供了更丰富的优化操作。DataFrames 可以从一系列广泛的数据来源中构建,如结构化数据文件,Hive 中的表、外部数据库、或者已有 RDD。

DataFrame API 在 Scala、Java、Python、R 语言中都支持。在 Scala API 中,DataFrame 是 Dataset[Row] 的类型别名。但是在 Java API 中,开发人员需要使用 Dataset<Row> 来表示一个 DataFrame。

1.4 SparkSession

SparkSession 提供了与底层 Spark 功能交互的入口,允许使用 DataFrame 和 Dataset API 对 Spark 进行编程。最重要的是,它限制了概念的数量(SparkContext、SQLContext 和 HiveContext),并且构建了开发人员在与 Spark 交互时必须兼顾的结构。

启动 spark-shell 控制台时,SparkSession 被实例化为 Spark 变量,可以直接使用。
sparkshell.png

运行代码:

1
val myRange = spark.range(1000).toDF("number")

我们创建了一个 DataFrame(Dataset<Row>),其中一个列包含 1000 行,值从 0 到 999。这一系列数字代表一个分布式集合。

注意
DataFrame 的概念并不是 Spark 特有的。R 和 Python 都有类似的概念。然而,Python/R DataFrames(有一些列外)存在于一台机器上,而不是多台机器上。这限制了给定的 DataFrame 只能使用某一台特定机器上存在的资源 。但是,因为 Spark 具有 Python 和 R 的语言接口,很容易将 Pandas(Python) 的 DataFrames、R DataFrames 转换为 Spark DataFrames。

DataFrame 和 Dataset 具有和 RDD 相同的概念。

1.5 Partition 分区

为了使每个 executor 执行器并行执行任务,Spark 将数据分为 partition(分区)。每个分区是集群中的一个物理机器上的行集合。DataFrame 的分区表示了在执行过程中数据是如何在集群中物理分布的。

需要注意的是,对于 DataFrames 操作,大多数情况下不需要手动或单独操作分区,因为使用 DataFrame 的高级 transformation 操作,底层会做一些优化操作,然后转化为 RDD 进行计算。

1.6 Transformation

执行一个简单的转换,以在当前的 DataFrame 中找到所有偶数:

1
2
3
// 使用 Dataset API
Dataset<Row> where = number.where("number % 2 = 0");
// where.show();

注意:这些操作没有输出。这是因为我们只声明了一个抽象转换(Transformation) where。Spark 将不会对转换进行操作,知道我们调用一个 Action 操作。

1.7 延迟计算

延迟计算意味着 Spark 将等到最后一刻才执行一系列 Transformation。在 SparkSQL 中,不会在执行某个 Transformation 操作时立即修改数据,Spark 会构建一个应用于源数据的 Plan。直到最后 Action 时执行代码,Spark 将这个计划从原始的 DataFrame 转换为 Physical Plan,该计划将在整个集群中高效地运行,因为 Spark 可以从端到端优化整个数据流。

例如 DataFrame 的谓词下推 pushdown 优化方式:如果我们构建一个大型的 Spark 作业,在最后指定一个过滤器 where,只需要从源数据中获取一行。最有效的执行方式从数据源过滤所需的单个记录。Spark 实际上是通过自动将过滤器下推来优化的。

1.8 Action

为了触发计算,需要运行一个 Action 操作。Action 操作使 Spark 通过执行一系列 Transformation 转换,得到计算结果。最简单的 Action 操作是 count,它给出了 DataFrame 中记录的总数(行数)。

1
System.out.println(where.count());

输出:500

有三种类型的 Action:

  • 在控制台中查看数据的 Action
  • 数据收集的 Action 操作
  • 输出到第三方存储系统的 Action 操作

在执行这个 count 操作时,启动了一个 Spark job,运行过滤器 where 转换(一个窄依赖转换),然后是一个聚合(一个宽依赖转换),它在每个分区基础上执行计数,然后是一个收集 Action,它将我们的结果返回到 driver 端。通过检查 Spark UI,可以看到所有这一切。

1.9 注册为表或视图

可以通过一个简单的方法 将任何 DataFrame 转换为一个表或视图:

1
2
number.createOrReplaceTempView("number");
Dataset<Row> sql = spark.sql("select * from number where number % 2 =0");

现在我们可以用 SQL 查询我们的数据了。使用 spark.sql(sqlString),spark 是我们的 SparkSession 变量,返回一个新的 DataFrame。

Spark 中的 DataFrames(和 SQL ) 已经有大量可用的操作。您可以使用和导入数百个函数来帮助您更快地解决大数据问题。

2. 重要概念

2.1 DataFrame

  • SparkSQL 从 Spark 1.3 开始引入了一个名为 DataFrame 的表格式数据抽象。
  • DataFrame 是用于处理结构化和半结构化的数据抽象。
  • DataFrame 利用其 Schema 以比原始 RDDs 更有效的方式存储数据。
  • DataFrame 利用 RDD 的不可变的、内存计算的、弹性的、分布式的和并行的特性,并对数据应用一个称为 Schema 的数据结构,允许 Spark 管理 Schema,以比 Java 序列化更有效的方式在集群节点之间传递数据。
  • 与 RDD 不同,DataFrame 中的数据被组织到指定的 columns 中,就像关系数据库中的表一样。

2.2 Dataset

  • 从 Spark 1.6 版本开始提供 Dataset API, Dataset API提供了:
    • 面向对象的编程风格
    • 像 RDD API 一样的编译时类型安全,编译时异常,运行时异常
    • 利用 Schema 处理结构化数据的优势
  • Dataset 是结构化数据集,数据集泛型可以是 Row(DataFrame),也可以是特定的数据类型。
  • Java 和 Spark 在编译时将指导数据集中数据的类型。

2.3 DataFrame 和 Dataset

  • 从 Spark 2.0 开始,DataFrame API 和 Dataset API 合并。
  • Dataset 提供了两个截然不同的 API 特性:strong typed API 和 untyped API。
  • 可以将 DataFrame 看作是 Dataset 的 untyped 类型:Dataset<Row>,Row 是一个 untyped 的 JVM 对象。
  • Dataset 是强类型 JVM 对象的集合。

3. 代码实战

创建 DataFrame 方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 集合创建 DataFrame
List<Person> personList = new ArrayList<>();
personList.add(new Person("张三", 20, "深圳"));
personList.add(new Person("李四", 22, "天津"));
Dataset<Row> dataFrame = spark.createDataFrame(personList, Person.class);
dataFrame.printSchema();
dataFrame.show();

// RDD 创建 DataFrame
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Person> parallelize = jsc.parallelize(personList);
Dataset<Row> dataFrame1 = spark.createDataFrame(parallelize, Person.class);
dataFrame1.printSchema();
dataFrame1.show();

// Json/CSV 文件创建 DataFrame
Dataset<Row> json = spark.read().json("E:\\Java\\IntelliJ IDEA\\spark\\in\\2015-summary.json");
json.createOrReplaceTempView("summary");
Dataset<Row> sql = spark.sql("select * from summary where count > 50 order by count desc limit 10");
sql.show();
支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫