Flink SQL 入門指北

1. Overview

本文主要來自官網(wǎng)蝌诡,旨在整理處 Flink SQL 的基本語(yǔ)法和使用挎春,基礎(chǔ)向看疙。

2. API 調(diào)用

2.1 Old Planner VS Blink Planner

  • Blink Planner 對(duì)代碼生成機(jī)制做了改進(jìn)、對(duì)部分算子進(jìn)行了優(yōu)化直奋,提供了豐富實(shí)用的新功能能庆,如維表 join、Top N脚线、MiniBatch搁胆、流式去重、聚合場(chǎng)景的數(shù)據(jù)傾斜優(yōu)化等新功能。

  • Blink Planner 的優(yōu)化策略是基于公共子圖的優(yōu)化算法渠旁,包含了基于成本的優(yōu)化(CBO)和基于規(guī)則的優(yōu)化(CRO)兩種策略攀例,優(yōu)化更為全面。同時(shí)顾腊,Blink Planner 支持從 catalog 中獲取數(shù)據(jù)源的統(tǒng)計(jì)信息肛度,這對(duì)CBO優(yōu)化非常重要。

  • Blink Planner 提供了更多的內(nèi)置函數(shù)投慈,更標(biāo)準(zhǔn)的 SQL 支持承耿,在 Flink 1.9 版本中已經(jīng)完整支持 TPC-H ,對(duì)高階的 TPC-DS 支持也計(jì)劃在下一個(gè)版本實(shí)現(xiàn)伪煤。

Flink 1.11 已經(jīng)默認(rèn)使用 Blink Planner加袋。

2.2 基本程序結(jié)構(gòu)

1.創(chuàng)建 TableEnvironment ( old/blink planner + stream/batch )
2.創(chuàng)建表( tableEnv.connect 外部數(shù)據(jù)源 或者 tableEnv.fromDataStream )
3.查詢表( Table API 或者 SQL )
4.輸出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])

2.3 創(chuàng)建 TableEnvironment

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2.4 創(chuàng)建表

// 1.通過外部數(shù)據(jù)源創(chuàng)建
//數(shù)據(jù)格式:sensor_1,1547718225,22.8
tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sensor")
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")


// 2.通過 datastream 轉(zhuǎn)換
val table1: Table = tableEnv.fromDataStream(stream)

2.5 查詢表

//table api
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")

// SQL
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, temperature
|from inputTable
|where id = 'sensor_1'
""".stripMargin)

2.6 表轉(zhuǎn)流的三種輸出模式

  • 追加( Append )模式

    • 只做插入操作,和外部連接起只交換插入( insert )消息
  • 撤回 ( Retract )模式

    • 表和外部連接起交換添加( Add )和撤回( Retract )消息
    • 插入操作編碼為 Add 消息抱既,刪除編碼為 Retract 消息职烧,更新編碼為上一條的 Retract 和下一條的 Add 消息
    • 不能定義 Key
  • 更新( Upsert )模式

    • 更新和插入都被編碼為 Upsert 消息,刪除編碼為 Delete 消息
    • 需要定義 Key

DataStream 只支持 Append 和 Retract 模式防泵。(toRetractStream[T] & toAppendStream[T] )
外部文件系統(tǒng)的流支持哪種模式取決于具體實(shí)現(xiàn)蚀之,比如 Kakfa 只支持 Append 模式。

2.7 輸出表

tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    // 轉(zhuǎn)換操作
    val sensorTable: Table = tableEnv.from("inputTable")
    // 簡(jiǎn)單轉(zhuǎn)換
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    //  聚合轉(zhuǎn)換
    val aggTable: Table = sensorTable
      .groupBy('id)    
      .select('id, 'id.count as 'count)

    // 輸出到外部文件系統(tǒng)或者 DataStream
    val outputPath = "..."

    // 注冊(cè)輸出表
    tableEnv.connect(new FileSystem().path(outputPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("outputTable")

    //aggTable.insertInto("outputTable")  aggTable 因?yàn)橛行薷牟僮鹘菖ⅲ珻svTableSink 只實(shí)現(xiàn)了 AppendStreamTableSink足删,所以無法輸出到文件。
    resultTable.insertInto("outputTable")

    resultTable.toAppendStream[(String, Double)].print("result")
    // aggTable 因?yàn)橛行薷牟僮鞑荒苁褂?append锁右,需要使用 Retract
    aggTable.toRetractStream[Row].print("agg")

3 動(dòng)態(tài)表

3.1 DataStream 上的關(guān)系查詢

關(guān)系代數(shù) / SQL 流處理
關(guān)系(或表)是有界(多)元組集合失受。 流是一個(gè)無限元組序列。
對(duì)批數(shù)據(jù)(例如關(guān)系數(shù)據(jù)庫(kù)中的表)執(zhí)行的查詢可以訪問完整的輸入數(shù)據(jù)咏瑟。 流式查詢?cè)趩?dòng)時(shí)不能訪問所有數(shù)據(jù)拂到,必須“等待”數(shù)據(jù)流入。
批處理查詢?cè)诋a(chǎn)生固定大小的結(jié)果后終止码泞。 流查詢不斷地根據(jù)接收到的記錄更新其結(jié)果兄旬,并且始終不會(huì)結(jié)束。

盡管存在這些差異余寥,但是使用關(guān)系查詢和 SQL 處理流并不是不可能的领铐。高級(jí)關(guān)系數(shù)據(jù)庫(kù)系統(tǒng)提供了一個(gè)稱為 物化視圖(Materialized Views) 的特性。物化視圖被定義為一條 SQL 查詢劈狐,緩存查詢的結(jié)果罐孝。緩存的一個(gè)常見難題是防止緩存結(jié)果過期。當(dāng)其定義查詢的基表被修改時(shí)肥缔,物化視圖將過期莲兢。 即時(shí)視圖維護(hù)(Eager View Maintenance) 是一種一旦更新了物化視圖的基表就立即更新視圖的技術(shù)。

3.2 動(dòng)態(tài)表 & 連續(xù)查詢( Continuous Query )

動(dòng)態(tài)表查詢流程
  1. 將流轉(zhuǎn)換為動(dòng)態(tài)表。
  2. 在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢改艇,生成一個(gè)新的動(dòng)態(tài)表收班。
  3. 生成的動(dòng)態(tài)表被轉(zhuǎn)換回流。
流轉(zhuǎn)為動(dòng)態(tài)表

連續(xù)查詢并生成新動(dòng)態(tài)表

動(dòng)態(tài)表轉(zhuǎn)換回流(Retract模式)

4. 窗口和時(shí)間語(yǔ)義

關(guān)于窗口和時(shí)間語(yǔ)義的介紹可以參考這篇文章谒兄。之前是在流上進(jìn)行討論的摔桦。Flink 在表上同樣支持相應(yīng)的邏輯。

4.1 時(shí)間語(yǔ)義

可以通過 DDL 方式創(chuàng)建兩種時(shí)間語(yǔ)義承疲,但是比較晦澀邻耕,這里不做舉例,感興趣可以到官網(wǎng)查看燕鸽。

4.1.1 processing time

注意處理時(shí)間屬性一定不能定義在一個(gè)已有字段上

  • 流轉(zhuǎn)表時(shí):

    // 聲明一個(gè)額外的字段作為時(shí)間屬性字段
    val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
    
  • 定義 tableSchema 時(shí):

      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE())
        .field("pt",DataTypes.TIMESTAMP(3)).proctime()    //將該字段定義為 processing time
      )
    

4.1.2 event time

  • 流轉(zhuǎn)表時(shí):
    // 基于 stream 中的事件產(chǎn)生時(shí)間戳和 watermark
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // 聲明一個(gè)額外的邏輯字段作為事件時(shí)間屬性(數(shù)據(jù)來源于上面datastream定義好的字段)兄世,必須放在 schema 最后
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)
    
    
    // Option 2:
    
    // 從第一個(gè)字段獲取事件時(shí)間,并且產(chǎn)生 watermark
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // 第一個(gè)字段已經(jīng)用作事件時(shí)間抽取了啊研,不用再用一個(gè)新字段來表示事件時(shí)間了
    val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
    
  • 定義 tableSchema 時(shí):
        //需要注意這種方式的 source 必須實(shí)現(xiàn) DefinedRowtimeAttributes 接口御滩。如 KafkaTableSource 實(shí)現(xiàn)了該接口。CsvTableSource 則沒有党远。 
    .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .rowtime(
            new Rowtime()
                .timestampsFromFiled("timestamp")
                .watermarksPeriodicBounded(1000)
        )
        .field("temperature",DataTypes.DOUBLE())
      )
    

4.2 窗口操作

窗口操作相當(dāng)于對(duì)數(shù)據(jù)進(jìn)行分組時(shí)削解,除了按照字段以外,增加了新的維度進(jìn)行分組沟娱,一般是時(shí)間或者數(shù)據(jù)數(shù)量氛驮。

4.2.1 Group Windows

根據(jù)時(shí)間或者行數(shù)間隔,將行聚集在有限的組中花沉,并對(duì)每個(gè)組的數(shù)據(jù)執(zhí)行一次聚合函數(shù)柳爽。最終每個(gè)組得出一個(gè)結(jié)果,類似于傳統(tǒng)對(duì) group by 操作

// 基本使用結(jié)構(gòu)
val table = input
        .window([w:GroupWindow] as "w") //定義窗口和別名 w
        .groupBy($"w",$"a")  //以屬性 a 和窗口 w 作為分組的key
        .select($"a",$"b".sum)  //聚合字段b的值碱屁,求和

tumbling window
    - .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - .window( Tumble over 10.rows on $"a_proctime" as "w")
    - sql: tumble(ts, interval '10' second)

sliding windows 
    - .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
    - .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
    - sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二個(gè)是步長(zhǎng),第三個(gè)是窗口長(zhǎng)度

session windows
    - .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - sql: session(ts,interval '10' second)

sql 輔助函數(shù),xx = {tumble,hop,session}:
    - xx_start(ts, interval '10' second)
    - xx_end(ts, interval '10' second)
    - xx_rowtime(ts, interval '10' second)
    - xx_proctime(ts, interval '10' second)

4.2.2 Over Windows

針對(duì)每個(gè)輸入行蛾找,進(jìn)行開窗娩脾,增加一列表示結(jié)果,每個(gè)行都有自己所在窗口的結(jié)果打毛。類似于傳統(tǒng)的 over 操作

// 基本使用結(jié)構(gòu)
val table = input
        .window([w:OverWindow] as "w") 
        .select($"a",$"b".sum over $"w", $"c".min over $"w")  

無界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")

有界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末柿赊,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子幻枉,更是在濱河造成了極大的恐慌碰声,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,402評(píng)論 6 499
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件熬甫,死亡現(xiàn)場(chǎng)離奇詭異胰挑,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,377評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門瞻颂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來豺谈,“玉大人,你說我怎么就攤上這事贡这〔缒” “怎么了?”我有些...
    開封第一講書人閱讀 162,483評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵盖矫,是天一觀的道長(zhǎng)丽惭。 經(jīng)常有香客問我,道長(zhǎng)辈双,這世上最難降的妖魔是什么责掏? 我笑而不...
    開封第一講書人閱讀 58,165評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮辐马,結(jié)果婚禮上拷橘,老公的妹妹穿的比我還像新娘。我一直安慰自己喜爷,他們只是感情好冗疮,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,176評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著檩帐,像睡著了一般术幔。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上湃密,一...
    開封第一講書人閱讀 51,146評(píng)論 1 297
  • 那天诅挑,我揣著相機(jī)與錄音,去河邊找鬼泛源。 笑死拔妥,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的达箍。 我是一名探鬼主播没龙,決...
    沈念sama閱讀 40,032評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼缎玫!你這毒婦竟也來了硬纤?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,896評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤赃磨,失蹤者是張志新(化名)和其女友劉穎筝家,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體邻辉,經(jīng)...
    沈念sama閱讀 45,311評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡溪王,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,536評(píng)論 2 332
  • 正文 我和宋清朗相戀三年腮鞍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片在扰。...
    茶點(diǎn)故事閱讀 39,696評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡缕减,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出芒珠,到底是詐尸還是另有隱情桥狡,我是刑警寧澤,帶...
    沈念sama閱讀 35,413評(píng)論 5 343
  • 正文 年R本政府宣布皱卓,位于F島的核電站裹芝,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏娜汁。R本人自食惡果不足惜嫂易,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,008評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望掐禁。 院中可真熱鬧怜械,春花似錦、人聲如沸傅事。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)蹭越。三九已至障本,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間响鹃,已是汗流浹背驾霜。 一陣腳步聲響...
    開封第一講書人閱讀 32,815評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留买置,地道東北人粪糙。 一個(gè)月前我還...
    沈念sama閱讀 47,698評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像忿项,于是被迫代替她去往敵國(guó)和親猜旬。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,592評(píng)論 2 353