盒子
盒子
文章目录
  1. 1. 诞生背景
  2. 2. Duration 时间窗口(Batch Duration,Slide Duration,Window Duration)
    1. 2.1 Batch Duration
    2. 2.2 Slide Duration(处理数据的时间间隔)和 Window Duration(处理的数据量间隔)
  3. 3. DStream
    1. 3.1 DStream 定义
    2. 3.2 DStream 输入源
    3. 3.3 DStream 操作
      1. 3.3.1 转换操作
      2. 3.3.2 输出操作
  4. 4. 例一
    1. 4.1 需求
    2. 4.2 代码
    3. 4.3 执行
    4. 4.4 说明
    5. 4.5 Streaming 作业和 Spark 作业之间的关系
  5. 5. Spark Streaming 架构
  6. 6. Spark 栈
  7. 7. 例二
    1. 7.1 需求
    2. 7.2 代码
    3. 7.3 执行
  8. 8. 例三
    1. 8.1 需求
    2. 8.2 代码
    3. 8.3 执行
    4. 8.4 说明

Spark Streaming 入门

1. 诞生背景

传统 MapReduce 等批处理框架已经满足不了人们对实时性的需求,出现了 Storm,Flink 等一批实时计算框架。

Spark Streaming 是在 Spark 批处理基础上构建的流式框架。

2. Duration 时间窗口(Batch Duration,Slide Duration,Window Duration)

2.1 Batch Duration

批处理间隔,它是指 Spark Streaming 以多少时间间隔为单位来提交任务逻辑。比如 1min,30s。这一参数将会伴随整个 StreamingContext 的生命周期且无法重新设置

Spark Streaming 处理数据的单位是一批,Spark Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并进行批处理,这个间隔就是 Batch Duration。此参数决定提交作业的频率和数据处理的延迟。

2.2 Slide Duration(处理数据的时间间隔)和 Window Duration(处理的数据量间隔)

默认值都是和 Batch Duration相同,也可自行设置,但必须为 Batch Duration 的整数倍。
图片2.png

Spark Streaming 针对 Slide Duration 和 Window Duration 的保证:

  • 因为每个 Batch 内的数据可能被后几个窗口间隔所处理,所以数据会保存在 Spark Streaming 系统中,不会立即清理。
  • 窗口的重叠会带来重复计算,Spark Streaming 框架会进行优化,保证计算过的数据不会被重复计算。

注意
对于初始的几个窗口,有可能数据是没有撑满的,随着时间的推进,窗口会被最终撑满。

3. DStream

3.1 DStream 定义

Spark Streaming 抽象了离散数据流(DStream,Discretized Stream)这个概念,它包含了一组连续的 RDD,这一组连续的 RDD 代表了连续的流式数据。

DStream 是一组时间序列上连续的 RDD 来表示的,每个 RDD 都包含特定时间间隔内的数据流。我们对 DStream 上的各种操作最终都会映射到内部的 RDD 中。
图片1.png

3.2 DStream 输入源

图片3.png

3.3 DStream 操作

3.3.1 转换操作

转换操作不会产生和提交作业,只构成 DStream 的操作链。
图片4.png
图片5.png
图片6.png

3.3.2 输出操作

图片7.png

4. 例一

4.1 需求

使用 Spark Streaming + Socket 流实时进行 wordcount

4.2 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.miracle.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
* @program: sparkapp
* @description: SparkStreaming 实现 wordcount
* @author: Miracle
* @create: 2019-09-04 15:54
**/
public class Ss_wordcount {
public static void main(String[] args) {
if (args.length != 2) {
System.err.println("usage: Spark Streaming App <host> <port>");
}
String host = args[0];
int port = Integer.valueOf(args[1]);

SparkConf sparkConf = new SparkConf().setAppName("ss_wordcount");

// Durations.seconds(5) 指定 batchDuration
// JavaStreamingContext 内部包含 SparkContext
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
jsc.sparkContext().setLogLevel("ERROR");
JavaReceiverInputDStream<String> lines = jsc.socketTextStream(host, port);
lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.reduceByKey((a, b) -> a + b)
.print();

// 以上流程创建完成
jsc.start(); // 启动执行计划
try {
jsc.awaitTermination(); // 等待程序停止,执行期间发出的异常都将会抛出
} catch (InterruptedException e) {
e.printStackTrace();
}
jsc.stop();


}
}

4.3 执行

  • Maven install

  • 将 jar 包上传到服务器上

  • 提交 spark 程序

    1
    spark-submit --master local[2,3] --class com.miracle.spark.Ss_wordcount spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar localhost 9999
  • 同一台机器再打开一个终端,执行 > nc -l 9999

  • 输入字符,程序每 5s 执行一次并输出 wordcount 结果。

4.4 说明

在 Spark Streaming 中,作业产生后并不会立即被提交,而是需要等到 StreamingContext 启动后才会被依次提交。 //jsc.start()
作业的提交间隔是由批处理间隔 Slide Duration(默认和 Batch Duration相同)决定的。

4.5 Streaming 作业和 Spark 作业之间的关系

Spark Streaming 作业最终会被翻译成 Spark 作业并提交和执行。DStream 在内部维护了相应的 RDD,对于 DStream 的操作,无论是转换操作还是输出操作,最终都会被映射到 RDD 上。

当我们在程序中构建 DStream 操作链,在 Spark Streaming 内部会隐式地构建 RDD 操作链。
见 DStream.generateJob 代码部分

5. Spark Streaming 架构

图片8.png

6. Spark 栈

Spark 容纳了一个栈式的库:包括 SQL and DataFrames,用于机器学习的 MLib,GraphX,SparkStreaming,你可以在相同的应用中无缝结合这些库。
图片9.png

7. 例二

7.1 需求

监听 HDFS 目录,对新增内容做 ETL 处理,checkpoint 检查点。

7.2 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.miracle.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.Arrays;

/**
* @program: sparkapp
* @description: 实时监听 HDFS 目录
* @author: Miracle
* @create: 2019-09-05 08:55
**/
public class Ss_HdfsApp {

public static void main(String[] args) {
if (args.length != 2) {
System.err.println("usage: Spark Streaming App <host> <port>");
}
String input = args[0];
String output = args[1];

// 检查点,checkpoint,用于容错,当程序崩溃时,你可以重启程序,并让驱动程序 driver 从检查点恢复
// 这样 Spark Streaming 就可以读取之前运行的程序处理数据的进度,并从失败的地方开始继续处理
JavaStreamingContext context = JavaStreamingContext.getOrCreate("/user/work/checkpoint", () -> {
SparkConf sparkConf = new SparkConf().setAppName("Ss_HdfsApp");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
jsc.checkpoint("/user/work/checkpoint");
JavaDStream<String> lineDStream = jsc.textFileStream(input);
JavaDStream<String> map = lineDStream.map(line -> line + "miracle");
JavaDStream<String> transform = map.flatMap(line -> Arrays.asList(line.split("=")).iterator());
transform.repartition(1).dstream().saveAsTextFiles(output, "");
return jsc;
});

// 以上流程创建完成
context.start(); // 启动执行计划
try {
context.awaitTermination(); // 等待程序停止,执行期间发出的异常都将会抛出
} catch (InterruptedException e) {
e.printStackTrace();
}
context.stop();
}
}

7.3 执行

  • spark-env.sh 增加配置(这样代码中 hdfs 前缀可以省略)

    HADOOP_CONF_DIR=/opt/modules/hadoop277/etc/hadoop

  • 执行 generateData.sh 不断生成 content* 文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    #! /bin/bash
    num=1
    hdfs dfs -rm -r /user/work/content
    hdfs dfs -rm -r /user/work/content_copy
    hdfs dfs -mkdir /user/work/content
    hdfs dfs -mkdir /user/work/content_copy

    rm -f content*
    while [ $num -le 10000 ]
    do
    sleep 2s
    (( num ++ ))
    echo "content="$num > content$num
    hdfs dfs -put content$num /user/work/content_copy
    hdfs dfs -mv /user/work/content_copy/content$num /user/work/content
    done
  • 提交任务

    1
    spark-submit --master yarn --deploy-mode cluster --driver-memory 1g --executor-memory 1g --executor-cores 1 --num-executors 3 --class com.miracle.spark.Ss_HdfsApp spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar /user/work/content /user/work/output

   image.png

  • 验证 checkpoint 可用性

    1
    yarn application -kill application_1567915234723_0001
  • 查看最后一个输出结果
    image.png
    image.png

  • 重新提交任务

  • 查看输出文件夹
    image.png

  • 查看结果是否中断
    image.png

8. 例三

8.1 需求

滑动窗口实例:热点搜索词滑动统计,每隔 10s,统计最近 60s 的搜索词的搜索频次,并打印出排名最靠前的 3 个搜索词和它出现次数。(Slide Duration,Window Duration)

8.2 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.miracle.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
* @program: sparkapp
* @description: 热点搜索词滑动统计
* 每隔 10s,统计最近 60s 的搜索词的搜索频次,并打印出排名最靠前的 3 个搜索词和它出现次数。(Slide Duration,Window Duration)
* @author: Miracle
* @create: 2019-09-08 12:39
**/
public class Ss_WindowApp {
public static void main(String[] args) {
if (args.length != 2) {
System.err.println("usage: Spark Streaming App <host> <port>");
}
String host = args[0];
int port = Integer.valueOf(args[1]);

SparkConf sparkConf = new SparkConf().setAppName("ss_wordcount");
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
JavaReceiverInputDStream<String> searchDStream = jsc.socketTextStream(host, port);
JavaPairDStream<String, Integer> searchWordPairDStream = searchDStream.flatMap(line -> Arrays.asList(line.split(" ")).iterator()).mapToPair(word -> new Tuple2<>(word, 1));

// Window Duration 窗口长度是 60s
// Slide Duration 窗口长度是 10s
JavaPairDStream<String, Integer> searchWordCountPairDStream = searchWordPairDStream.reduceByKeyAndWindow((x, y) -> (x + y), Durations.seconds(60), Durations.seconds(10));

searchWordCountPairDStream.foreachRDD(p ->
p.mapToPair(x -> new Tuple2<>(x._2, x._1))
.sortByKey(false)
.mapToPair(x -> new Tuple2<>(x._2, x._1))
.take(3)
.forEach(x -> System.out.println("---------------top 3 word is: " + x))
);


// 以上流程创建完成
jsc.start(); // 启动执行计划
try {
jsc.awaitTermination(); // 等待程序停止,执行期间发出的异常都将会抛出
} catch (InterruptedException e) {
e.printStackTrace();
}
jsc.stop();
}
}

8.3 执行

  • 开启 socket 流端口并键入字符

    1
    2
    nc -l 9999
    不停输入字符...
  • 提交 spark 程序

    1
    spark-submit --master local[2,3] --class com.miracle.spark.Ss_WindowApp /opt/sparkAPP/spark-app-1.0-SNAPSHOT-jar-with-dependencies.jar localhost 9999
  • 输出

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    job1:
    ---------------top 3 word is: (hello,2)
    ---------------top 3 word is: (s,2)
    ---------------top 3 word is: (d,1)

    ···

    job2:
    ---------------top 3 word is: (,11)
    ---------------top 3 word is: (d,5)
    ---------------top 3 word is: (s,5)

    ···

8.4 说明

Slide Duration 为 10s,表示每 10s 执行一次 job;
Window Duration 为 60s,表示每次的 job 统计的是最近 60s 的数据。

支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫