盒子
盒子
文章目录
  1. 1. Schema
  2. 2. Rows
  3. 3. Column
  4. 4. Spark DataType
  5. 5. DataFrame 基本操作
  6. 6. Columns 操作
  7. 7. 字面常量转换为 Spark 类型(Literals)
  8. 8. 添加列
  9. 9. 重命名列
  10. 10. 删除列
  11. 11. 更改列类型
  12. 12. 过滤行
  13. 13. 行去重
  14. 14. DataFrame union
  15. 15. 行排序
  16. 16. Limit
  17. 17. Repartition 和 Coalesce
  18. 18. 收集数据到 driver
  19. 19. 作业

DataFrame 操作详解

DataFrame 和 Dataset 是(分布式的)类似于表的集合,具有定义好的行和列。每个列必须具有与所有其他列相同的行数(尽管您可以使用 null 来指定值的缺失),并且每个列都有类型信息,这些信息必须与集合中的每一行一致。这是因为内部存在一个 schema 的概念,其定义了分布式集合中存储的数据的类型。

1. Schema

Schema 定义了 DataFrame 的列名和类型。您可以手动定义 Schema 模式或从数据源读取 Schema 模式(通常称为读模式)。Schema 包含列类型,用于声明什么列存储了什么类型的数据

2. Rows

一行(Row)只是表示数据的一条记录。DataFrame 中的每条数据记录必须是 Row 类型。我们可以从 SQL 、弹性分布式数据集(RDDs)、数据源或手动创建这些 Rows。

1
2
3
4
5
6
// 返回 Row 对象数组
Row[] collect = (Row[])spark.range(2).toDF().collect();
for (Row r :
collect) {
System.out.println(r);
}

3. Column

Column 表示一个简单的类型,如 Integer 或 String,复杂类型,如 Array 或 Map,或 Null。Spark 将跟踪所有这些类型的信息,并提供多种方式对 Column 进行转换。

4. Spark DataType

Spark 有大量的内部类型表示,这样就可以很容易地引用在特定语言(Java、Scala)中,与 Spark 类型相匹配的类型。import org.apache.spark.sql.types.DataTypes 类中。

Scala type reference:

Data type Value type in Scala API to access or create a data type
ByteType Byte ByteType
ShortType Short ShortType
IntegerType Int IntegerType
LongType Long LongType
FloatType Float FloatType
DoubleType Double DoubleType
DecimalType java.math.BigDecimal DecimalType
StringType String StringType
BinaryType Array[Byte] BinaryType
BooleanType Boolean BooleanType
TimestampType java.sql.Timestamp TimestampType
DateType java.sql.Date DateType
ArrayType scala.collection.Seq ArrayType(elementType, [containsNull]. Note: The default value of containsNull is true.
MapType scala.collection.Map MapType(keyType, valueType, [valueContainsNull). Note: The default value of valueContainsNull is true.
StructType org.apache.spark.sql.Row StructType(fields). Note: fields is an Array of StructFields. Also, fields with the same name are not allowed.
StructField The value type in Scala of the data type of this field(for example, Int for a StructField with the data type IntegerType) StructField(name, dataType, [nullable]). Note: The default value of nullable is true.

Java type reference:

Data type Value type in Java API to access or create a data type
ByteType byte or Byte DataTypes.ByteType
ShortType short or Short DataTypes.ShortType
IntegerType int or Integer DataTypes.IntegerType
LongType long or Long DataTypes.LongType
FloatType float or Float DataTypes.FloatType
DoubleType double or Double DataTypes.DoubleType
DecimalType java.math.BigDecimal DataTypes.createDecimalType(); DataTypes.createDecimalType(precision, scale).
StringType String DataTypes.StringType
BinaryType byte[] DataTypes.BinaryType
TimestmapType java.sql.Timestamp DataTypes.TimestampType
DateType java.sql.Date DataTypes.DateType
ArrayType java.util.List DataTypes.createArrayType(elementType). Note: The value of containsNull will be true; DataTypes.createArrayType(elementType, containsNull).
MapType java.util.Map DataTypes.createMapType(keyType, valueType). Note: The value of valueContainsNull will be true. DataTypes.createMapType(keyType, valueType, valueContainsNull)
StructType org.apache.spark.sql.Row DataTypes.createStructType(fields). Note: fields is a List or an array of StructFields. Also, two fields with the same name are not allowed.
StructField The value type in Java of the data type of this field(for example, int for a StructField with the data typee IntegerType) DataTypes.createStructField(name, dataType, nullable)

Java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 使用 List<Row>、Schema 创建 DataFrame
* 注意: 创建 StructField、StructType,要使用 DataTypes 的工厂方法
*/
List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create("张三", 20, "北京"));
rows.add(RowFactory.create("李四", 22, "上海"));
StructField[] fields = new StructField[]{
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false),
DataTypes.createStructField("address", DataTypes.StringType, false)
};
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> listSchemaDF = spark.createDataFrame(rows, schema);
listSchemaDF.show();
listSchemaDF.printSchema();

// DataFrame 转 Dataset
Encoder<Person> bean = Encoders.bean(Person.class);
Dataset<Person> ds = listSchemaDF.as(bean);

listSchemaDF.foreach(row -> {
// untyped
System.out.println(row.getAs("name").toString());
});

System.out.println("---------------------------");

ds.foreach(person -> {
// typed
System.out.println(person.getName());
});

5. DataFrame 基本操作

从定义上看,一个 DataFrame 包括一系列的 records(记录,就像 table 中的 rows),这些行的类型是 Row 类型,包括一系列的 columns(就像表格中的列)。Schema 定义了每一列的列名和数据类型。DataFrame 的分区定义了 DataFrame 或 Dataset 在整个集群中的物理分布情况。

1
2
3
4
5
6
// 创建 DataFrame
Dataset<Row> df = spark.read().json("E:\\Java\\IntelliJ IDEA\\spark\\in\\2015-summary.json");
// 查看 Schema
df.printSchema();
// 查看 schema
System.out.println(df.schema());

一个 Schema 就是一个 StructType,由多个 StructField 类型的 fields 组成,每个 field 包括一个列名称、一个列类型、一个布尔型的标识(是否可以有缺失值和 Null 值)。

6. Columns 操作

Spark 中的列类似于表格中的列。可以从 DataFrame 中选择列、操作列和删除列。对 Spark 来说,列是逻辑结构,它仅仅表示通过一个表达式按每条记录计算出的一个值。这意味着,要得到一个 column 列的真实值,我们需要一行 row 数据,为了得到一行数据,我们需要有一个 DataFrame。不能在 DataFrame 的上下文之外操作单个列。必须在 DataFrame 内使用 Spark 转换来操作列。

有许多不同的方法来构造和引用列,但最简单的两种方法是使用 col()column()函数。要使用这些函数中的任何一个,需要传入一个列名:

1
2
3
4
// in Scala
import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")

如前所述,这个列可能存在于我们的 DataFrames 中,也可能不存在。在将列名称与我们 Catalog 中维护的列进行比较之前,列不会被解析,即列是 unresolved

注意
我们刚才提到的两种不同的方法引用列。Scala 有一些独特的语言特性,允许使用更多的简写方式来引用列。以下的语法糖执行完全相同的事情,即创建一个列,但不提供性能改进:
$”myColumn”
‘myColumn
$允许我们将一个字符串指定为一个特殊的字符串,该字符串应该引用一个表达式。标记(‘)是一种特殊的东西,称为符号;这是一个特定于 Scala 语言的,指向某个标识符。它们都执行相同的操作,是按名称引用列的简写方法。当您阅读不同对的人的 Spark 代码时,可能会看到前面提到的所有引用。

表达式 expression
列是表达式。表达式是什么?表达式是在 DataFrame 中数据记录的一个或多个值上的一组转换。把它想象成一个函数,它将一个或多个列名作为输入,表达式会解析它们,为数据集中的每个记录返回一个单一值。

在最简单的情况下,expr(“someCol”)等价于 col(“someCol”)。
列操作是表达式功能的一个子集
expr(“someCol - 5”) 与执行 col(“someCol”) - 5,或甚至 expr(“someCol”) - 5 的转换相同。这是因为 Spark 将它们编译为一个逻辑树,逻辑树指定了操作的顺序。

一些 DataFrames 操作列的示例

1
2
3
4
5
6
7
// 创建 DataFrame
Dataset<Row> df = spark.read().json("E:\\Java\\IntelliJ IDEA\\spark\\in\\2015-summary.json");
// Dataset API
df.select("DEST_COUNTRY_NAME").show(5);
// SQL
df.createOrReplaceTempView("flight");
spark.sql("select DEST_COUNTRY_NAME from flight limit 5").show();

可以使用相同的查询样式选择多个列,只需在 select 方法调用中添加更多的列名字符串参数

1
2
3
4
5
// Dataset API
df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(5);
// SQL
df.createOrReplaceTempView("flight");
spark.sql("select DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME from flight limit 5").show();

可以用许多不同的方式引用列,可以交替使用它们

1
2
3
4
5
6
7
8
9
10
// in Scala
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
df.col("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"),
'DEST_COUNTRY_NAME,
$"DEST_COUNTRY_NAME",
expr("DEST_COUNTRY_NAME"))
.show(2)

一个常见的错误是混合使用列对象和列字符串。例如,下列代码将导致编译错误

1
df.select(col("DEST_COUNTRY_NAME"), "EST_COUNTRY_NAME")

expr 是我们可以使用的最灵活的引用。它可以引用一个简单的列或一个列字符串操作。
为了说明这一点,让我们更改列名,然后通过 AS 关键字来更改它

1
2
3
4
5
// Dataset API
df.select(functions.expr("DEST_COUNTRY_NAME as destination")).show(2);
// SQL
df.createOrReplaceTempView("flight");
spark.sql("select DEST_COUNTRY_NAME as dest from flight limit 2").show();

7. 字面常量转换为 Spark 类型(Literals)

有时,我们需要将显式字面常量值传递给 Spark,它只是一个值(而不是一个新列)。这可能是一个常数值或者我们以后需要比较的值。我们的方法是通过 Literals,将给定编程语言的字面值转换为 Spark 能够理解的值:

1
2
3
4
5
// Dataset API
df.select(functions.expr("*"), functions.lit(1).as("One")).show();
// SQL
df.createOrReplaceTempView("flight");
spark.sql("select *, 1 as One from flight").show();

8. 添加列

将新列添加到 DataFrame 中,这是通过在 DataFrame 上使用 withColumn 方法来实现的,例如,让我们添加一个列,将数字 1 添加为一个列,列名为 numberOne:

1
2
3
4
5
//  Dataset API
df.withColumn("numberOne", functions.lit(1)).show();
// SQL
df.createOrReplaceTempView("flight");
spark.sql("select *, 1 as One from flight").show();

另一个例子:设置一个布尔标志,表示源国与目标国相同:

1
df.withColumn("ifSame", functions.expr("DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME")).filter("ifSame = true").show();

注意:withColumn 函数由两个参数:列名为 DataFrame 中的给定行创建值的表达式

我们也可以这样 重命名列:

1
2
// 复制某列但重新定义列名
df.withColumn("destination", functions.expr("DEST_COUNTRY_NAME")).show();

9. 重命名列

虽然我们可以按照刚才描述的方式重命名列,但是另一种方法是使用 withcolumnrename 方法。这会将第一个参数中的字符串的名称重命名为第二个参数中的字符串:

1
2
// 重命名列
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").show();

10. 删除列

可能已经注意到我们可以通过使用 select 来实现这一点。然而,还有一个专门的方法叫做 drop:

1
2
// 删除列
df.drop("DEST_COUNTRY_NAME").show();

11. 更改列类型

有时,可能性需要列从一种类型转换为另一种类型,例如,如果有一组 StringType 应该是整数。我们可以将列从一种类型转换为另一种类型,例如,让我们将 count 列从整数转换为类型 Long:

1
2
3
4
5
6
// 更改列类型
df.withColumn("count", functions.col("count").cast("int")).printSchema();

// SQL
df.createOrReplaceTempView("flight");
spark.sql("select *, cast(count as int) as count2 from flight").printSchema();

12. 过滤行

为了过滤行,我们创建一个计算值为 true 或 false 的表达式。然后用一个等于 false 的表达式过滤掉这些行。使用 DataFrames 执行此操作的最常见方法是将表达式创建为字符串,或者使用一组列操作构建表达式。执行此操作有两种方法:您可以使用 where 或 filter,它们都将执行相同的操作,并在使用 DataFrames 时接受相同的参数类型。

1
2
3
4
5
6
7
// API
df.filter(functions.col("count").$less(2)).show(2);
df.where("count < 2").show(2);

// SQL
df.createOrReplaceTempView("flight");
spark.sql("select * from flight where count < 2").show(2);

你可能希望将多个过滤器放入相同的表达式中。尽管这是可能的,但它并不总是有用的,因为 Spark 会自动执行所有的过滤操作,而不考虑过滤器的排序。这意味着,如果你想指定多个过滤器,只需将它们按顺序连接起来,让 Spark 处理其余部分:

多重过滤:

1
2
3
4
5
// API
df.where("count < 2").where("DEST_COUNTRY_NAME != 'United States'").show();
// SQL
df.createOrReplaceTempView("flight");
spark.sql("select * from flight where count < 2 and DEST_COUNTRY_NAME != 'United States'").show();

13. 行去重

一个常见的用例是在一个 DataFrame 中提取唯一的或不同的值。这些值可以在一个或多个列中。我们这样做的方法是在 DataFrame 上使用不同的方法,它允许我们对该 DataFrame 中的任何行进行删除重复行。例如,让我们在数据集中获取唯一的起源地。当然,这是一个转换,它将返回一个新的 DataFrame,只有唯一的行:

去除重复航线:

1
2
3
4
5
6
// API
System.out.println(df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").distinct().count());

// SQL
df.createOrReplaceTempView("flight");
spark.sql("select count(distinct(DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME)) as count from flight").show();

14. DataFrame union

DataFrame 是不可变的。这意味着用户不能向 DataFrame 追加,因为这会改变它。要附加到 DataFrame,必须将原始的 DataFrame 与新的 DataFrame 结合起来 。这只是连接了两个 DataFrames。对于 Union 2 DataFrames,必须确保他们具有相同的模式和列数(Schema),否则,union 将会失败。

1
2
3
4
5
6
7
8
9
10
11
12
List<Person> p1 = new ArrayList<>();
p1.add(new Person("张三", 20, "北京"));
p1.add(new Person("李四", 22, "上海"));
Dataset<Row> df1 = spark.createDataFrame(p1, Person.class);

List<Person> p2 = new ArrayList<>();
p2.add(new Person("wangwu", 20, "北京"));
p2.add(new Person("qianliu", 22, "上海"));
Dataset<Row> df2 = spark.createDataFrame(p2, Person.class);

Dataset<Row> union = df1.union(df2);
union.show();

15. 行排序

在对 DataFrame 中的值进行排序时,我们总是希望对 DataFrame 顶部的最大或最小值进行排序。有两个相同的操作可以实现:sort 和 orderBy。它们接受列表达式,字符串以及多个列。默认 是按升序排序:

1
2
3
4
5
6
7
df.sort("count").show(5);

// 二次排序
df.orderBy("count", "DEST_COUNTRY_NAME").show(5);

// 如果要指定升序、降序,需要用asc 和 desc 函数
df.orderBy(functions.desc("ORIGIN_COUNTRY_NAME"),functions.desc("count")).show();

出于优化的目的,有时建议在另一组转换之前对每个分区进行排序。可以使用 sortWithinPartitons 方法来执行以下操作:

1
df.sortWithinPartitions("count").show();

16. Limit

通常,你可能想要限制从 DataFrame 中提取的内容;例如,您可能只想要一些 DataFrame 的前十位。

1
2
3
4
5
// API
df.sortWithinPartitions("count").limit(10).show();
// SQL
df.sortWithinPartitions("count").createOrReplaceTempView("flight");
spark.sql("select * from flight limit 10").show();

17. Repartition 和 Coalesce

一个重要的优化方式是根据一些经常过滤的列对数据进行分区,它控制跨集群的数据的物理布局,包括分区计划和分区数量。

Repartition 将导致数据的完全 shuffle,无论是否需要重新 shuffle。这意味着只有当将来的分区数目大于当前的分区数目时,或者当你希望通过一组列进行分区时,你才应该使用 Repartition

1
2
3
4
5
6
7
8
// 获取当前 分区数量
System.out.println(df.rdd().getNumPartitions());
// 重分区
Dataset<Row> repartition = df.repartition(2);
System.out.println(repartition.rdd().getNumPartitions());

// 如果需要经常对某个列进行过滤,那么基于该列进行重新分区是值得的
Dataset<Row> dest_country_name = df.repartition(5, functions.col("DEST_COUNTRY_NAME"));

另一方面,Coalesce 不会导致完全 shuffle,并尝试合并分区。
下面操作将根据目标国家的名称将你的数据转移到 5 个分区中,然后合并它们(没有完全shuffle):

1
2
// 合并成 2 个分区
Dataset<Row> coalesce = dest_country_name.coalesce(2);

18. 收集数据到 driver

  • collect: 从整个 DataFrame 中获取所有数据
  • take: 选取 DataFrame 的前几行
  • show: 打印出几行数据
1
2
3
4
5
df.take(2);
df.collect();
// Whether truncate long strings. If true, strings more than 20 characters will
// be truncated and all cells will be aligned right
df.show(5, false);

更多相关操作参考官网:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

19. 作业

  1. 读取zips.json文件为DataFrame,并将列名_id重命名为zip
  2. 创建名为zips的case class或者javaBean,用于将第一步创建的DF转换为DS
  3. 显示DS中的数据
    1
    2
    3
    4
    5
    6
    7
    8
    // 读取 json 文件并修改列名
    Dataset<Row> df = spark.read().json("E:\\Java\\IntelliJ IDEA\\spark\\in\\zips.json");
    Dataset<Row> rowDataset = df.withColumnRenamed("_id", "zip");

    // 定义 Java bean 类,自定义 encoder 将 df 转为 ds
    Encoder<Zips> zipsBeanEncoder = Encoders.bean(Zips.class);
    Dataset<Zips> ds = rowDataset.as(zipsBeanEncoder);
    ds.show();

数据分析:

  1. 以降序显示人口超过40000的state, zip, city, pop
  2. 显示名为CA的states中人口最多的三个城市
  3. 把所有州states的人口加起来,按降序排列,显示前10名

分别使用Dataset API和sql实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ds.createOrReplaceTempView("zips");

// 1. 以降序显示人口超过40000的state, zip, city, pop
// API
ds.where("pop > 40000").select("state", "zip", "city", "pop").orderBy(functions.desc("pop")).show();
// SQL
spark.sql("select state, zip, city, pop from zips where pop >40000 order by pop desc").show();

// 2. 显示名为CA的states中人口最多的三个城市
// API
ds.where("state = 'CA'").orderBy(functions.desc("pop")).select("city").show(3);
// SQL
spark.sql("select city from zips where state = 'CA' order by pop desc limit 3").show();

// 3. 把所有州states的人口加起来,按降序排列,显示前10名
// API
ds.groupBy("state").sum("pop").withColumnRenamed("sum(pop)", "total").orderBy(functions.desc("total")).limit(10).show();
// SQL
spark.sql("select state, sum(pop) as total from zips group by state order by total desc limit 10").show();
支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫