盒子
盒子
文章目录
  1. 1. 有一份数据,需求如下:
    1. 1.1 根据 Department 字段对数据进行分组,然后根据 Annual Salary 排序,然后根据 first name 排序。
    2. 1.2 将每个部门的工资最高的前三名,存入 Mysql 数据库,数据库表包括如下字段(Department, Annual Salary, Firstname)

自定义分区器 + 二次排序

1. 有一份数据,需求如下:

  1. 根据 Department 字段对数据进行分组,然后根据 Annual Salary 排序,然后根据 first name 排序。
  2. 将每个部门的工资最高的前三名,存入 Mysql 数据库,数据库表包括如下字段(Department, Annual Salary, Firstname)。

数据格式如下:

数据字段说明:
First name: 名
Last name: 姓
Job Title: 职称
Department: 部门
Full or Part-time: 全职或兼职
Salary: 工资
Typical Hours: 典型每周工时(小时工适用)
Annual Salary: 年薪
Hourly Rate: 时薪

1.1 根据 Department 字段对数据进行分组,然后根据 Annual Salary 排序,然后根据 first name 排序。

思路

  1. 使用 repartitionAndSortWithinPartitions(partitioner: Partitioner) 算子,可以自定义分区器,根据部门分区,并对分区内的数据排序;
  2. 构造 EmployKey 实体类,属性有 Annual Salary, first name, Department,实现 Comparable 接口,重写 compareTo 方法,实现二次排序。

代码

EmployKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.miracle.sparkproject;

import java.io.Serializable;

/**
* @program: sparkdemo
* @description:
* @author: Miracle
* @create: 2019-04-16 21:31
**/
public class EmployKey implements Comparable<EmployKey>, Serializable {
private static final long serialVersionUID = 6280745086974614533L;
private String name;
private String department;
private double salary;

public EmployKey(String name, String department, double salary) {
this.name = name;
this.department = department;
this.salary = salary;
}

public static long getSerialVersionUID() {
return serialVersionUID;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getDepartment() {
return department;
}

public void setDepartment(String department) {
this.department = department;
}

public double getSalary() {
return salary;
}

public void setSalary(double salary) {
this.salary = salary;
}

@Override
public int compareTo(EmployKey o) {
//先按salary排序
int compare=(int)o.getSalary()-(int)this.getSalary();
//如果salary相同,再按name排
if(compare==0){
compare=this.getName().compareTo(o.getName());
}
return compare;
}
}

EmployValue:存储其余信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.miracle.sparkproject;

import java.io.Serializable;

/**
* @program: sparkdemo
* @description:
* @author: Miracle
* @create: 2019-04-16 21:50
**/
/**
 * Holds the employee fields that are not part of the sort key:
 * job title and last name. Serializable so Spark can ship it between nodes.
 */
public class EmployValue implements Serializable {

    private static final long serialVersionUID = 5053727575607097704L;

    private String jobTitle;
    private String lastName;

    public EmployValue(String jobTitle, String lastName) {
        this.jobTitle = jobTitle;
        this.lastName = lastName;
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getJobTitle() {
        return jobTitle;
    }

    public String getLastName() {
        return lastName;
    }

    public void setJobTitle(String jobTitle) {
        this.jobTitle = jobTitle;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

自定义分区器 Sec_Partitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.miracle.review;

import com.miracle.sparkproject.EmployKey;
import org.apache.spark.Partitioner;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @program: spark
* @description:
* @author: Miracle
* @create: 2019-08-19 02:02
**/
/**
 * Custom Spark partitioner that maps each department to its own partition, so
 * all rows of one department end up together for the in-partition secondary
 * sort. The department -> partition-index map is built once from the
 * collected key list; numPartitions equals the number of distinct departments.
 */
public class Sec_Partitioner extends Partitioner {

    // department name -> partition index (fix: original used a raw ConcurrentHashMap)
    private final Map<String, Integer> hashCodePartitionIndexMap = new ConcurrentHashMap<>();

    private int numPartitions;

    public int getNumPartitions() {
        return numPartitions;
    }

    public void setNumPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public Sec_Partitioner(List<EmployKey> keyList) {
        super();
        int flag = 0;
        for (EmployKey key : keyList) {
            // assign the next free index to each department seen for the first time
            if (!hashCodePartitionIndexMap.containsKey(key.getDepartment())) {
                hashCodePartitionIndexMap.put(key.getDepartment(), flag);
                flag++;
            }
        }
        setNumPartitions(hashCodePartitionIndexMap.size());
    }

    @Override
    public int numPartitions() {
        return getNumPartitions();
    }

    /**
     * Fix: the original unconditionally unboxed the map lookup, throwing a
     * NullPointerException for any department absent from the sampled key
     * list; unknown departments now fall back to partition 0.
     */
    @Override
    public int getPartition(Object key) {
        EmployKey employKey = (EmployKey) key;
        return hashCodePartitionIndexMap.getOrDefault(employKey.getDepartment(), 0);
    }

    /**
     * Fix: the original treated any two partitioners with the same partition
     * count as equal, which could let Spark skip a required shuffle even when
     * the department -> partition mappings differed. Compare the mapping too.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Sec_Partitioner other = (Sec_Partitioner) obj;
        return numPartitions == other.numPartitions
                && hashCodePartitionIndexMap.equals(other.hashCodePartitionIndexMap);
    }

    // equals and hashCode must always be overridden together
    @Override
    public int hashCode() {
        return 31 * numPartitions + hashCodePartitionIndexMap.hashCode();
    }
}

主类 SecondarySort

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.miracle.review;

import com.miracle.sparkproject.EmployKey;
import com.miracle.sparkproject.EmployValue;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

/**
* @program: spark
* @description: 二次排序
* 需求1:根据Department字段对数据进行分组,然后根据Annual Salary排序,然后根据first name排序。
* 需求2:将每个部门的工资最高的前三名,存入Mysql数据库,数据库表包括如下字段(Department, Annual Salary, Firstname, )
* @author: Miracle
* @create: 2019-08-18 22:45
**/
public class SecondarySort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SecondarySort").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> javaRDD = jsc.textFile("in/employee_all.csv");

        // CSV layout (per the article): 0=first name, 1=last name, 2=job title,
        // 3=department, 7=annual salary (empty for hourly employees).
        JavaPairRDD<EmployKey, EmployValue> pair = javaRDD.mapToPair(data -> {
            String[] fields = data.split(",");
            // Fix: the original indexed fields[7] unconditionally and threw
            // ArrayIndexOutOfBoundsException on any row with fewer than 8
            // comma-separated fields (e.g. a malformed or truncated line).
            double salary = (fields.length > 7 && !fields[7].isEmpty())
                    ? Double.parseDouble(fields[7]) : 0.00;
            return new Tuple2<>(
                    new EmployKey(fields[0], fields[3], salary),
                    new EmployValue(fields[2], fields[1])
            );
        });

        // Repartition by department and sort each partition by (salary desc, first name asc).
        // NOTE(review): collecting every key to the driver to build the partitioner
        // does not scale; collecting only distinct departments would suffice.
        JavaPairRDD<EmployKey, EmployValue> employKeyEmployValueJavaPairRDD =
                pair.repartitionAndSortWithinPartitions(new Sec_Partitioner(pair.keys().collect()));
        for (Tuple2<EmployKey, EmployValue> tuple2 : employKeyEmployValueJavaPairRDD.collect()) {
            System.out.println(tuple2._1.getDepartment() + " " + tuple2._1.getSalary() + " " + tuple2._1.getName() + " " + tuple2._2.getLastName() + " " + tuple2._2.getJobTitle());
        }

        jsc.stop();
        jsc.close();
    }
}

1.2 将每个部门的工资最高的前三名,存入 Mysql 数据库,数据库表包括如下字段(Department, Annual Salary, Firstname)

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Take the top three rows of each partition (each partition holds one department,
// already sorted by salary desc / first name asc) and insert them into MySQL.
employKeyEmployValueJavaPairRDD.foreachPartition(partition -> {
    int flag = 0;

    Connection conn = JdbcUtils.getConnection();
    // Fix: the original called conn.prepareStatement(sql) inside the loop,
    // leaking one PreparedStatement per row; prepare once and reuse it.
    String sql = "insert into employee (firstname,department,annual_salary) values(?,?,?)";
    PreparedStatement preparedStatement = conn.prepareStatement(sql);
    try {
        while (partition.hasNext() && flag < 3) {
            Tuple2<EmployKey, EmployValue> tuple2 = partition.next();
            System.out.println(tuple2._1.getDepartment() + " " + tuple2._1.getSalary() + " " + tuple2._1.getName() + " " + tuple2._2.getLastName() + " " + tuple2._2.getJobTitle());

            preparedStatement.setString(1, tuple2._1.getName());
            preparedStatement.setString(2, tuple2._1.getDepartment());
            preparedStatement.setDouble(3, tuple2._1.getSalary());
            preparedStatement.executeUpdate();

            flag++;
        }
    } finally {
        // Fix: release JDBC resources even when an insert throws.
        JdbcUtils.free(preparedStatement, conn);
    }
});

JdbcUtils 工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.miracle.utils;


import java.sql.*;

/**
* @program: sparkdemo
* @description:
* @author: Miracle
* @create: 2019-03-31 13:24
**/
public class JdbcUtils {

private static String url="jdbc:mysql://master01:3306/test?characterEncoding=UTF-8";
private static String user="user";
private static String pwd="pwd";

private JdbcUtils(){}

// 1. 注册驱动 oracle.jdbc.driver.OracleDriver
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.out.println("数据库驱动加载失败!");
}
}

// 2. 建立一个连接
public static Connection getConnection() throws SQLException{
return DriverManager.getConnection(url,user,pwd);
}

// 3. 关闭资源
public static void free(Statement statement,Connection connection){
if(statement!=null){
try {
statement.close();
} catch (SQLException e) {
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}

// 3. 关闭资源2
public static void free2(ResultSet rs,Statement statement,Connection connection){
if(rs!=null){
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}finally {
free(statement,connection);
}
}
}

// main
public static void main(String[] args) {
try {
Connection connection=getConnection();
String sql="insert into textFile (filename) values (?)";
PreparedStatement preparedStatement;
preparedStatement=(PreparedStatement)connection.prepareStatement(sql);
preparedStatement.setString(1,"test");
preparedStatement.executeUpdate();
free(preparedStatement,connection);
System.out.println("**************"+connection);
} catch (SQLException e) {
e.printStackTrace();
}
}
}
支持一下
扫一扫,支持forsigner
  • 微信扫一扫
  • 支付宝扫一扫