盒子
盒子
文章目录
  1. 1. 概述
  2. 2. 窗口函数的核心
  3. 3. 窗口函数分类
  4. 4. 窗口函数的用法
  5. 5. 窗口规范
    1. 5.1 分区规范、排序规范
    2. 5.2 Frame 规范
    3. 5.3 Frame 类型
  6. 6. 例一
    1. 6.1 数据
    2. 6.2 需求
    3. 6.3 代码
  7. 7. 例二
    1. 7.1 数据
    2. 7.2 需求
    3. 7.3 代码
  8. 8. 例三
    1. 8.1 数据
    2. 8.2 需求
    3. 8.3 代码
  9. 9. rank、dense_rank、row_number区别

SparkSQL 窗口函数

1. 概述

在 Spark 1.4 版本之前,Spark SQL 支持两种函数:

  • 内置函数或 UDF(如 substr() 或 round()):从单个行获取值作为输入,并为每个输入行生成一个返回值;
  • 聚合函数(如 SUM() 或 MAX()):对一组行进行操作,并为每组计算一个返回值。

虽然它们在项目中非常有用,但是仍然有很多操作不能单独使用这些类型的函数来表示。具体地说,无法同时对一组行进行操作,同时还为每个输入行返回一个值。

这种限制使得其很难执行各种数据处理任务,如计算移动平均线、计算累计和、访问出现在当前行之前的行值。幸运的是,对于 SparkSQL 用户来说,窗口函数填补了这个空白。

2. 窗口函数的核心

窗口函数的核心是根据一组称为 Frame 的行计算表的每一行输入的返回值。每个输入行可以有一个与之相关联的唯一 Frame。窗口函数的这一特性使得它们比其他函数更强大,并允许用户以简洁的方式表达各种数据处理任务,如果没有窗口函数,这些任务是很难表达的。

3. 窗口函数分类

SparkSQL 支持三种窗口函数:排序函数、分析函数和聚合函数
可用的排序函数和分析函数:
排序函数和分析函数.png

对于聚合函数,可以使用任何现有的聚合函数作为窗口函数。

4. 窗口函数的用法

  • 在 SQL 支持的函数后面加 over 方法,over 方法后面是 window 规范定义:avg(revenue) OVER(…)
  • 在 DataFrame API 中支持的函数上调用 over 方法:rank().over(…)

一旦一个函数被标记为一个窗口函数,下一步关键步骤就是定义与这个函数相关的窗口规范。

5. 窗口规范

  • 分区规范:控制哪些行将与给定行位于同一分区中。Spark 会根据分区规范获取指定的数据到某节点,而不是全部数据。
  • 排序规范:控制分区中的行排序的方式,确定给定行在其分区中的位置。
  • Frame 规范:根据当前输入行的相对位置,确定将哪些行包括在当前输入行的 Frame 中。

例如:“当前行到当前行之前的三行”描述了一个 Frame,包括当前输入行和当前行之前的三行。

5.1 分区规范、排序规范

在 SQL 中,partition by 和 order by 关键字分别用于为分区规范指定分区表达式和为排序规范指定排序表达式。如:over(partition by … order by …)

在 DataFrame API 中,提供了实用工具函数来定义窗口规范。

5.2 Frame 规范

除了排序和分区之外,用户还需要定义 Frame 的开始边界、结束边界和 Frame 类型,这是 Frame 规范的三个组成部分。

有五种类型的边界:

UNBOUNDED PRECEDING,
UNBOUNDED FOLLOWING,
CURRENT ROW,
PRECEDING
FOLLOWING.

其中 UNBOUNDED PRECEDING 和 UNBOUNDED FOLLOWING 分别表示分区的第一行和最后一行。
对其他三种类型的边界,它们指定当前输入行位置的偏移量,并根据 Frame 的类型定义其特定含义。

5.3 Frame 类型

有两种类型的 Frame: ROW Frame 和 RANGE frame:

ROW frame 基于当前输入行位置的物理偏移量,即 CURRENT ROW, PERCEDING 或 FOLLOWING 声明了一个物理偏移量。

RANGE frame 基于当前输入行位置的逻辑偏移量,具有与 ROW frame 类似的语法。

1
2
Window.partitionBy(...).orderBy(...).rowBetween(start, end)
Window.partitionBy(...).orderBy(...).rangeBetween(start, end)

6. 例一

6.1 数据

表1.png

6.2 需求

  • 每个类别中收益最高的前两名
  • 每种产品的收益与同类产品中收益最高的产品的收益差额

6.3 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Dataset<Row> product_revenue = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "xxx")
.option("dbtable", "product_revenue")
.load();

product_revenue.createOrReplaceTempView("product_revenue");

// 每个类别中收益最高的前两名(排序函数 dense_rank())
// API
WindowSpec dense_rank_windowSpec = Window.partitionBy("category")
.orderBy(product_revenue.col("revenue").desc());
Column rank = functions.dense_rank().over(dense_rank_windowSpec);
product_revenue.select(functions.col("product"),
functions.col("category"),
functions.col("revenue"),
rank.alias("rank"))
.where("rank <= 2")
.show();

// SQL
spark.sql("select *, dense_rank() over (partition by category order by revenue desc) as rank from product_revenue").show();
spark.sql("select * from (select *, dense_rank() over (partition by category order by revenue desc) as rank from product_revenue) where rank <= 2").show();

// 每种产品的收益与同类产品中收益最高的产品的收益差额(聚合函数 max())
// API
WindowSpec max_windowSpec = Window.partitionBy("category")
.orderBy(product_revenue.col("revenue").desc());
Column difference = functions.max(product_revenue.col("revenue")).over(max_windowSpec).$minus(product_revenue.col("revenue"));
product_revenue.select(functions.col("product"),
functions.col("category"),
functions.col("revenue"),
difference.alias("difference")).show();

// SQL
spark.sql("select *, (max(revenue) over (partition by category order by revenue desc) - revenue) as difference from product_revenue").show();

7. 例二

7.1 数据

表2.png

7.2 需求

  • 按部门(deptno)对员工工资排序(salary)
  • 求每个部门工资总和

7.3 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 例二
Dataset<Row> employee = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "xxx")
.option("dbtable", "employee")
.load();
employee.createOrReplaceTempView("employee");

// 按部门(deptno)对员工工资排序(salary)
// API
WindowSpec dense_rank_windowSpec = Window.partitionBy("deptno")
.orderBy("salary");
Column rank = functions.dense_rank().over(dense_rank_windowSpec);
employee.select(functions.col("empno"),
functions.col("ename"),
functions.col("job"),
functions.col("mgr"),
functions.col("hiredate"),
functions.col("salary"),
functions.col("comm"),
functions.col("deptno"),
rank.alias("rank")
).show();

// SQL
spark.sql("select *, dense_rank() over (partition by deptno order by salary) as rank from employee").show();

// 求每个部门工资总和
// API
employee.groupBy("deptno").agg(functions.sum("salary").alias("sum")).show();

// SQL
spark.sql("select deptno, sum(salary) from employee group by deptno").show();

8. 例三

8.1 数据

表3.png

8.2 需求

  • 当天用户对某产品的使用时长,期望在进行统计的时候,14号能累加13号的,15号能累加14、13号的,以此类推
  • 计算移动平均的例子
  • 累加历史:累加分区内当天及之前所有
  • 累加当日和昨天
  • 累加当日、昨日、明日
  • 累加分区内所有:当天和之前之后所有

8.3 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 例三
Dataset<Row> product_logs = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://master01:3306/test?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "xxx")
.option("dbtable", "product_logs")
.load();
product_logs.createOrReplaceTempView("product_logs");

// 当天用户对某产品的使用时长,期望在进行统计的时候,14号能累加13号的,15号能累加14、13号的,以此类推
// API
WindowSpec sum_windowSpec = Window.partitionBy("product_code")
.orderBy("event_date");
Column sum_duration = functions.sum("duration").over(sum_windowSpec);
product_logs.select(
functions.col("product_code"),
functions.col("event_date"),
sum_duration.as("sum_duration")
).show();

// SQL
spark.sql("select product_code, event_date, sum(duration) over(partition by product_code order by event_date) as sum_duration from product_logs").show();

// 计算移动平均的例子
spark.sql("select product_code, event_date, avg(duration) over(partition by product_code order by event_date) as avg_duration from product_logs").show();

// 更加复杂的计算逻辑---累加历史:累加分区内当天及之前所有
// 默认 frame 边界: 'rows between unbounded preceding and current row'
// API
WindowSpec p_c_duration_windowSpec = Window.partitionBy("product_code")
.orderBy("event_date")
.rowsBetween(Window.unboundedPreceding(), Window.currentRow());
Column p_c_duration = functions.sum("duration").over(p_c_duration_windowSpec);
product_logs.select(
functions.col("product_code"),
functions.col("event_date"),
p_c_duration.as("p_c_duration")
).show();

// SQL
spark.sql("select product_code, event_date, sum(duration) over(partition by product_code order by event_date rows between unbounded preceding and current row) as p_c_duration from product_logs").show();

// 更加复杂的计算逻辑---累加当日和昨天
spark.sql("select product_code, event_date, sum(duration) over(partition by product_code order by event_date rows between 1 preceding and current row) as yest_to_duration from product_logs").show();

// 更加复杂的计算逻辑---累加当日、昨日、明日
spark.sql("select product_code, event_date, sum(duration) over(partition by product_code order by event_date rows between 1 preceding and 1 following) yest_tomo_duration from product_logs").show();

// 更加复杂的计算逻辑---累加分区内所有:当天和之前之后所有
spark.sql("select product_code, event_date, sum(duration) over(partition by product_code order by event_date rows between unbounded preceding and unbounded following) as p_f_duration from product_logs").show();

9. rank、dense_rank、row_number区别

代码

1
2
3
4
5
6
7
8
// rank
spark.sql("select *, rank() over (partition by deptno order by salary) as rank from employee").show();

// dense_rank
spark.sql("select *, dense_rank() over (partition by deptno order by salary) as dense_rank from employee").show();

// row_number
spark.sql("select *, row_number() over (partition by deptno order by salary) as row_number from employee").show();

输出

empno ename job mgr hiredate salary comm deptno rank
7876 ADAMS CLERK 7788 23-May-87 1100 0 20 1
7566 Jones MANAGER 7839 2-Apr-81 2975 0 20 2
7788 SCOTT SNALYST 7566 19-Apr-87 3000 0 20 3
7369 SMITH CLERK 7902 17-Dec-80 800 20 10 1
7782 CLARK MANAGER 7839 9-Jun-81 2450 0 10 2
7839 KING PRESIDENT 0 17-Nov-81 5000 0 10 3
7521 WARD SALESMAN 7698 22-Feb-81 1250 500 30 1
7654 MARTIN SALESMAN 7698 28-Sep-81 1250 1400 30 1
7844 TURNER SALESMAN 7698 8-Sep-81 1500 0 30 3
7499 ALLEN SALESMAN 7698 20-Feb-81 1600 300 30 4
7698 BLAKE MANAGER 7839 1-May-81 2850 0 30 5
empno ename job mgr hiredate salary comm deptno dense_rank
7876 ADAMS CLERK 7788 23-May-87 1100 0 20 1
7566 Jones MANAGER 7839 2-Apr-81 2975 0 20 2
7788 SCOTT SNALYST 7566 19-Apr-87 3000 0 20 3
7369 SMITH CLERK 7902 17-Dec-80 800 20 10 1
7782 CLARK MANAGER 7839 9-Jun-81 2450 0 10 2
7839 KING PRESIDENT 0 17-Nov-81 5000 0 10 3
7521 WARD SALESMAN 7698 22-Feb-81 1250 500 30 1
7654 MARTIN SALESMAN 7698 28-Sep-81 1250 1400 30 1
7844 TURNER SALESMAN 7698 8-Sep-81 1500 0 30 2
7499 ALLEN SALESMAN 7698 20-Feb-81 1600 300 30 3
7698 BLAKE MANAGER 7839 1-May-81 2850 0 30 4
empno ename job mgr hiredate salary comm deptno row_number
7876 ADAMS CLERK 7788 23-May-87 1100 0 20 1
7566 Jones MANAGER 7839 2-Apr-81 2975 0 20 2
7788 SCOTT SNALYST 7566 19-Apr-87 3000 0 20 3
7369 SMITH CLERK 7902 17-Dec-80 800 20 10 1
7782 CLARK MANAGER 7839 9-Jun-81 2450 0 10 2
7839 KING PRESIDENT 0 17-Nov-81 5000 0 10 3
7521 WARD SALESMAN 7698 22-Feb-81 1250 500 30 1
7654 MARTIN SALESMAN 7698 28-Sep-81 1250 1400 30 2
7844 TURNER SALESMAN 7698 8-Sep-81 1500 0 30 3
7499 ALLEN SALESMAN 7698 20-Feb-81 1600 300 30 4
7698 BLAKE MANAGER 7839 1-May-81 2850 0 30 5
支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫