Flink實(shí)戰(zhàn)—Flink SQL在Batch場(chǎng)景的Demo

最近工作會(huì)用到Flink SQL,周末學(xué)習(xí)了一下,寫個(gè)demo做記錄拆内,全部代碼請(qǐng)參考Github.

基于的Flink版本是1.9.1连锯,使用的是java8開(kāi)發(fā)归苍。

本例是Flink SQL在Batch場(chǎng)景下的應(yīng)用,目標(biāo)是從students运怖、scores表中讀取學(xué)生的信息拼弃,計(jì)算班級(jí)平均分。

1. 準(zhǔn)備數(shù)據(jù)

students.txt 保存學(xué)生信息:id摇展,name吻氧,classname

1 張三 1班
2 李四 1班
3 王五 2班
4 趙六 2班
5 郭大寶 2班

scores.txt 保存成績(jī):id,chinese咏连,math盯孙,english

1 100 90 80
2 97 87 74
3 70 50 43
4 100 99 99
5 80 81 82

2. 創(chuàng)建工程

根據(jù)官網(wǎng)的提示,通過(guò)mvn創(chuàng)建flink項(xiàng)目

   $ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.9.0

創(chuàng)建后使用IDEA打開(kāi)祟滴,項(xiàng)目結(jié)構(gòu)如圖镀梭,把創(chuàng)建好的兩份數(shù)據(jù)保存在resources中.


1586602165374.jpg

編輯pom.xml,主要是引入一些flink的依賴:

<dependencies>
<!--flink core-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--flink-table-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!--kafka-->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
  </dependencies>

3. 實(shí)現(xiàn)功能

創(chuàng)建SQLBatch的JAVA類踱启,實(shí)現(xiàn)功能报账。

package com.cmbc.flink;
?
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
?
import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;
?
?
public class SQLBatch {
    public static void main(String[] args) throws Exception {
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
?
        // read files
        DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
        DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");
?
        // prepare data
        DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
            @Override
            public Tuple3<Integer, String, String> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
            }
        });
?
        DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
            @Override
            public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
                String[] line = s.split(" ");
                return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
                        Integer.valueOf(line[2]), Integer.valueOf(line[3]));
            }
        });
?
        // join data
        DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
                .where(0)
                .equalTo(0)
                .projectFirst(0,1,2)
                .projectSecond(1,2,3);
?
?
        // register to a table
        tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");
?
?
        // do sql
        Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
                "AVG(chinese + math + english) as avg_total " +
                "FROM Data " +
                "GROUP BY classname " +
                "ORDER BY avg_total"
        );
?
        // to sink
        DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
        result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
        tEnv.execute("do flink sql demo in batch");
?
    }
?
    public static class Info {
        public String classname;
        public Integer avg_chinese;
        public Integer avg_math;
        public Integer avg_english;
        public Integer avg_total;
?
        public Info() {
        }
?
        public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
            this.classname = classname;
            this.avg_chinese = avg_chinese;
            this.avg_math = avg_math;
            this.avg_english = avg_english;
            this.avg_total = avg_total;
        }
?
        @Override
        public String toString() {
            return
                    "classname=" + classname +
                    ", avg_chinese=" + avg_chinese +
                    ", avg_math=" + avg_math +
                    ", avg_english=" + avg_english +
                    ", avg_total=" + avg_total +
                    "";
        }
    }
}

功能比較簡(jiǎn)單研底,簡(jiǎn)單說(shuō)一下:

  • 初始化flink env
  • 讀取文件數(shù)據(jù),這里讀取student.txt透罢、scores.txt兩張表
  • 數(shù)據(jù)預(yù)處理榜晦,這里通過(guò)id字段將兩個(gè)表的數(shù)據(jù)join出dataset
  • 將dataset映射成table,并執(zhí)行sql
  • 數(shù)據(jù)保存

4. 運(yùn)行和結(jié)果

  • 啟動(dòng)flink on local的模式 羽圃,在flink的安裝路徑下找到腳本start-cluster.sh
  • mvn打Jar包:mvn clean package乾胶,或者在idea里完成這一步,jar包位置在項(xiàng)目target路徑下
  • 執(zhí)行腳本:
flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar
  • 結(jié)果


    1586602913833.jpg

5. 參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末朽寞,一起剝皮案震驚了整個(gè)濱河市识窿,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌脑融,老刑警劉巖喻频,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異肘迎,居然都是意外死亡甥温,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門妓布,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)姻蚓,“玉大人,你說(shuō)我怎么就攤上這事匣沼≌玻” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵释涛,是天一觀的道長(zhǎng)加叁。 經(jīng)常有香客問(wèn)我,道長(zhǎng)枢贿,這世上最難降的妖魔是什么殉农? 我笑而不...
    開(kāi)封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任刀脏,我火速辦了婚禮局荚,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘愈污。我一直安慰自己耀态,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布暂雹。 她就那樣靜靜地躺著首装,像睡著了一般。 火紅的嫁衣襯著肌膚如雪杭跪。 梳的紋絲不亂的頭發(fā)上仙逻,一...
    開(kāi)封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天驰吓,我揣著相機(jī)與錄音,去河邊找鬼系奉。 笑死檬贰,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的缺亮。 我是一名探鬼主播翁涤,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼萌踱!你這毒婦竟也來(lái)了葵礼?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤并鸵,失蹤者是張志新(化名)和其女友劉穎鸳粉,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體能真,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡赁严,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了粉铐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片疼约。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蝙泼,靈堂內(nèi)的尸體忽然破棺而出程剥,到底是詐尸還是另有隱情,我是刑警寧澤汤踏,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布织鲸,位于F島的核電站,受9級(jí)特大地震影響溪胶,放射性物質(zhì)發(fā)生泄漏搂擦。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一哗脖、第九天 我趴在偏房一處隱蔽的房頂上張望瀑踢。 院中可真熱鬧,春花似錦才避、人聲如沸橱夭。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)棘劣。三九已至,卻和暖如春楞遏,著一層夾襖步出監(jiān)牢的瞬間茬暇,已是汗流浹背首昔。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留糙俗,地道東北人沙廉。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像臼节,于是被迫代替她去往敵國(guó)和親撬陵。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,456評(píng)論 0 13
  • 了解Flink是什么网缝,F(xiàn)link應(yīng)用程序運(yùn)行的多樣化巨税,對(duì)比業(yè)界常用的流處理框架,F(xiàn)link的發(fā)展趨勢(shì)粉臊,F(xiàn)link生...
    JavaEdge閱讀 5,077評(píng)論 1 18
  • 前篇主要介紹流式計(jì)算相關(guān)的核心概念草添,這篇簡(jiǎn)要聊聊Flink總體架構(gòu)、運(yùn)行環(huán)境及其在大數(shù)據(jù)生態(tài)系統(tǒng)中的位置扼仲,讓大家先...
    data之道閱讀 1,226評(píng)論 0 6
  • Table API和SQL通過(guò)join API集成在一起远寸,這個(gè)join API的核心概念是Table,Table可...
    寫B(tài)ug的張小天閱讀 16,770評(píng)論 0 15
  • 因?yàn)榘讣蛲佬祝蛱焱砩鲜c(diǎn)驰后,從一個(gè)不熟悉的寫字樓區(qū)域步行去地鐵站,從白日里繁忙景象里安靜下來(lái)的寬闊道路矗愧,靜得有點(diǎn)不...
    環(huán)環(huán)lawyer閱讀 151評(píng)論 0 1