盒子
盒子
文章目录
  1. 1. 聚合函数
  2. 2. UDF 自定义函数
  3. 3. Spark on Hive
    1. 3.1 Spark on Hive 与 Hive on Spark 区别?
    2. 3.2 Spark on Hive
      1. 3.2.1 相关组件
      2. 3.2.2 如何配置 Spark on Hive?
      3. 3.23 Spark on Hive 练习

SparkSQL 聚合函数、UDF 自定义函数、Spark on Hive

1. 聚合函数

1
2
3
4
5
6
// 聚合函数
// 4. 计算名为 CA 的 state,每个 city 的 zip 总数、人口总量
// API
ds.where("state = 'CA'").groupBy("city").agg(functions.count("zip").alias("zip_count"), functions.sum("pop").alias("total_pop")).show();
// SQL
spark.sql("select city, count(zip) as zip_count, sum(pop) as total_pop from zips where state = 'CA' group by city").show();

2. UDF 自定义函数

如果 Spark 内置函数不够用,那么可以自定义函数。
注册一个 UDF,用于将 String 类型的数据转为 Long 类型。

1
2
3
4
5
6
7
8
// API
UserDefinedFunction zipToLong = functions.udf((String zip) -> Long.valueOf(zip), DataTypes.LongType);
Dataset<Row> zip = ds.select(zipToLong.apply(functions.col("zip")));
zip.printSchema();

// SQL
spark.udf().register("zipToLong", (String zip1) -> Long.valueOf(zip1), DataTypes.LongType);
spark.sql("select zipToLong(zip) from zips").printSchema();

以上是两种不同的定义方式,① 定义 UDF 函数,只能在 Dataset API 中使用;② 将 UDF 函数注册到 SparkSession 中,在 SQL 中调用。
UDF 是一个函数,但是 UDF 不仅仅是一个函数,有自己特殊性:需要将 UDF 的参数看作数据表的某个列。
在使用 UDF 时,不一定非要传入列,还可以传入常量。

1
2
3
4
5
6
7
8
9
10
11
12
13
// API
UserDefinedFunction largerThan = functions.udf((String zip2, Long number) -> Long.valueOf(zip2) > number, DataTypes.BooleanType);
ds.select(
zipToLong.apply(functions.col("zip")).as("zipToLong"),
functions.col("city"),
largerThan.apply(functions.col("zip"), functions.lit(99923L)).as("largerThanConst")
)
.orderBy(functions.desc("zipToLong"))
.show();

// SQL
spark.udf().register("largerThan",(String zip3, Long number)-> Long.valueOf(zip3) > number, DataTypes.BooleanType);
spark.sql("select zipToLong(zip) as zipToLong, city, largerThan(zip,99923L) as largerThanConst from zips order by zipToLong desc").show();

3. Spark on Hive

3.1 Spark on Hive 与 Hive on Spark 区别?

Spark on Hive 是将 Hive 作为数据库,Spark 读取 Hive 的数据做计算;
Hive on Spark 是 Hive 使用 Spark 作为计算工具,相对于 Hive on MapReduce。

3.2 Spark on Hive

3.2.1 相关组件

  • 是否需要启动 Hive?
    不需要

  • 是否需要启动 HDFS?
    需要

  • 是否需要启动 YARN?
    不需要

3.2.2 如何配置 Spark on Hive?

服务器环境

  • 将 hive-site.xml、core-site.xml、hdfs-site.xml 拷贝到 /opt/modules/spark243/conf 文件夹下
  • 在拷贝到 spark_home/conf 所在节点上以 local 模式启动 spark-sql,验证连接
  • 如果 hive 的 metastore 是 mysql 数据库,需要将 mysql 驱动放到 spark_home/jars 目录下面

开发环境

  • 项目根目录创建 conf 文件夹,标记为 Resources Root,将 hive-site.xml、core-site.xml、hdfs-site.xml 复制到该目录
  • pom.xml 添加 mysql 连接工具包依赖
    1
    2
    3
    4
    5
    6
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
    <!--<scope>provided</scope>-->
    </dependency>

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
SparkSession spark = SparkSession.builder()
.master("local[*]")
.appName("SparkSQL_d3")
.enableHiveSupport()
.getOrCreate();
spark.sparkContext().setLogLevel("ERROR");

spark.sql("create table if not exists student(num int, name string) row format delimited fields terminated by ','");

spark.sql("load data inpath 'hdfs://ns/in/student' into table student");

spark.sql("select * from student").show();

3.23 Spark on Hive 练习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 最贵的10类手机
* 销量最好的10类手机
* "dpmc","cpmc","cpjg","cpxl","splj","cppj"
*/
spark.sql("create table if not exists phone (dpmc string, cpmc string, cpjg string, cpxl string, splj string, cppj string) tblproperties('skip.header.line.count'=1) row format delimited fields terminated by ','");
spark.sql("load data inpath 'hdfs://ns/in/phone.csv' into table phone");

spark.sql("select * from phone").show();

// 最贵的 10 类手机
spark.udf().register("getPrice", (String p) -> Double.parseDouble(p.substring(2, p.length() - 1)), DataTypes.DoubleType);
spark.sql("select cpmc, getPrice(cpjg) as price from phone order by price desc limit 10").show();

// 销量最好的 10 类手机
spark.udf().register("getSales", (String str) -> {
int result = 0;
if (str != null && !str.equals("")) {
String numStr = str.substring(7, str.length() - 1).replaceAll("笔", "");
if (numStr.contains("万")) {
result = (int) (Double.valueOf(numStr.replaceAll("万", "")) * 10000);
} else {
result = Integer.valueOf(numStr);
}
}
return result;
}, DataTypes.IntegerType);

Dataset<Row> df = spark.sql("select cpmc, getSales(cpxl) as sales from phone order by sales desc limit 10");
// 存入 Hive 仓库
df.write().saveAsTable("sales");

// 验证
spark.sql("select * from sales").show();
支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫