I've recently started using Flink SQL at work, so I spent a weekend learning it and wrote this demo as a record; the full code is on GitHub.
It is based on Flink 1.9.1, developed in Java 8.
This example applies Flink SQL in a batch scenario: read student information from the students and scores tables and compute the per-class average scores.
1. Prepare the data
students.txt stores student info: id, name, classname
1 張三 1班
2 李四 1班
3 王五 2班
4 趙六 2班
5 郭大寶 2班
scores.txt stores scores: id, chinese, math, english
1 100 90 80
2 97 87 74
3 70 50 43
4 100 99 99
5 80 81 82
2. Create the project
Following the official docs, create a Flink project with mvn:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.9.1
Open the generated project in IDEA; the project structure is shown below. Save the two data files prepared above under resources.
(screenshot: project structure in IDEA)
Edit pom.xml, mainly to pull in the Flink dependencies:
<dependencies>
    <!-- flink core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- flink-table -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- kafka -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
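The `${flink.version}` and `${scala.binary.version}` placeholders above need to be defined in the pom's `<properties>` section. The quickstart archetype generates these for you; a minimal sketch matching the versions used in this post (assuming Scala 2.11 binaries):

```xml
<properties>
    <flink.version>1.9.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
```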
3. Implement the logic
Create a Java class SQLBatch that implements the logic.
package com.cmbc.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;


public class SQLBatch {
    public static void main(String[] args) throws Exception {
        // set up the batch execution environment and the table environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // read files
        DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
        DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");

        // parse each students line into (id, name, classname)
        DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple3<>(Integer.valueOf(line[0]), line[1], line[2]);
            }
        });

        // parse each scores line into (id, chinese, math, english)
        DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple4<>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
                        Integer.valueOf(line[2]), Integer.valueOf(line[3]));
            }
        });

        // join the two datasets on id
        DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
                .where(0)
                .equalTo(0)
                .projectFirst(0, 1, 2)
                .projectSecond(1, 2, 3);

        // register the joined dataset as a table
        tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");

        // run the SQL query
        Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
                "AVG(chinese + math + english) as avg_total " +
                "FROM Data " +
                "GROUP BY classname " +
                "ORDER BY avg_total"
        );

        // convert the result back to a DataSet and write it to the sink
        DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
        result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
        tEnv.execute("do flink sql demo in batch");
    }

    // result POJO: public fields and a no-arg constructor, as the Table API requires
    public static class Info {
        public String classname;
        public Integer avg_chinese;
        public Integer avg_math;
        public Integer avg_english;
        public Integer avg_total;

        public Info() {
        }

        public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
            this.classname = classname;
            this.avg_chinese = avg_chinese;
            this.avg_math = avg_math;
            this.avg_english = avg_english;
            this.avg_total = avg_total;
        }

        @Override
        public String toString() {
            return "classname=" + classname +
                    ", avg_chinese=" + avg_chinese +
                    ", avg_math=" + avg_math +
                    ", avg_english=" + avg_english +
                    ", avg_total=" + avg_total;
        }
    }
}
The logic is straightforward; step by step:
- Initialize the Flink execution and table environments
- Read the input files, here the students.txt and scores.txt tables
- Preprocess the data: join the two tables on the id field into a single DataSet
- Register the DataSet as a table and run the SQL query
- Write the result out
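To make the join and GROUP BY semantics concrete without a Flink cluster, here is a dependency-free Java sketch over the same five joined rows (class names transliterated for readability; the integer division mirrors the assumption that SQL AVG over INT columns truncates to an INT):

```java
import java.util.Map;
import java.util.TreeMap;

public class ClassAvgSketch {
    // joined rows, as produced by the id join: id, name, classname, chinese, math, english
    static final Object[][] ROWS = {
            {1, "Zhang San", "class1", 100, 90, 80},
            {2, "Li Si",     "class1",  97, 87, 74},
            {3, "Wang Wu",   "class2",  70, 50, 43},
            {4, "Zhao Liu",  "class2", 100, 99, 99},
            {5, "Guo Dabao", "class2",  80, 81, 82},
    };

    // per-class integer averages: [avg_chinese, avg_math, avg_english, avg_total]
    static Map<String, int[]> classAverages() {
        // accumulate sums per class: [chinese, math, english, total, count]
        Map<String, int[]> sums = new TreeMap<>();
        for (Object[] r : ROWS) {
            int[] acc = sums.computeIfAbsent((String) r[2], k -> new int[5]);
            int c = (int) r[3], m = (int) r[4], e = (int) r[5];
            acc[0] += c; acc[1] += m; acc[2] += e; acc[3] += c + m + e; acc[4]++;
        }
        // divide by the row count (GROUP BY classname + AVG)
        Map<String, int[]> avgs = new TreeMap<>();
        sums.forEach((k, a) -> avgs.put(k,
                new int[]{a[0] / a[4], a[1] / a[4], a[2] / a[4], a[3] / a[4]}));
        return avgs;
    }

    public static void main(String[] args) {
        classAverages().forEach((cls, a) -> System.out.println(
                cls + " avg_chinese=" + a[0] + " avg_math=" + a[1]
                    + " avg_english=" + a[2] + " avg_total=" + a[3]));
    }
}
```

Running it prints one row per class, e.g. class1 averages 98/88/77 with avg_total 264, which is what the Flink job should write to info.txt (modulo formatting).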
4. Run it and check the result
- Start Flink in local mode by running the start-cluster.sh script in the Flink installation directory
- Build the jar with mvn clean package (or do this from IDEA); the jar ends up under the project's target directory
- Submit the job:
flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar
Result:
(screenshot: job output with the per-class averages)