1. Overview
本文主要來自官網(wǎng)蝌诡,旨在整理處 Flink SQL 的基本語(yǔ)法和使用挎春,基礎(chǔ)向看疙。
2. API 調(diào)用
2.1 Old Planner VS Blink Planner
Blink Planner 對(duì)代碼生成機(jī)制做了改進(jìn)、對(duì)部分算子進(jìn)行了優(yōu)化直奋,提供了豐富實(shí)用的新功能能庆,如維表 join、Top N脚线、MiniBatch搁胆、流式去重、聚合場(chǎng)景的數(shù)據(jù)傾斜優(yōu)化等新功能。
Blink Planner 的優(yōu)化策略是基于公共子圖的優(yōu)化算法渠旁,包含了基于成本的優(yōu)化(CBO)和基于規(guī)則的優(yōu)化(CRO)兩種策略攀例,優(yōu)化更為全面。同時(shí)顾腊,Blink Planner 支持從 catalog 中獲取數(shù)據(jù)源的統(tǒng)計(jì)信息肛度,這對(duì)CBO優(yōu)化非常重要。
Blink Planner 提供了更多的內(nèi)置函數(shù)投慈,更標(biāo)準(zhǔn)的 SQL 支持承耿,在 Flink 1.9 版本中已經(jīng)完整支持 TPC-H ,對(duì)高階的 TPC-DS 支持也計(jì)劃在下一個(gè)版本實(shí)現(xiàn)伪煤。
Flink 1.11 已經(jīng)默認(rèn)使用 Blink Planner加袋。
2.2 基本程序結(jié)構(gòu)
1.創(chuàng)建 TableEnvironment ( old/blink planner + stream/batch )
2.創(chuàng)建表( tableEnv.connect 外部數(shù)據(jù)源 或者 tableEnv.fromDataStream )
3.查詢表( Table API 或者 SQL )
4.輸出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])
2.3 創(chuàng)建 TableEnvironment
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
2.4 創(chuàng)建表
// 1.通過外部數(shù)據(jù)源創(chuàng)建
//數(shù)據(jù)格式:sensor_1,1547718225,22.8
tableEnv.connect(new Kafka()
.version("0.11")
.topic("sensor")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable")
// 2.通過 datastream 轉(zhuǎn)換
val table1: Table = tableEnv.fromDataStream(stream)
2.5 查詢表
//table api
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")
// SQL
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, temperature
|from inputTable
|where id = 'sensor_1'
""".stripMargin)
2.6 表轉(zhuǎn)流的三種輸出模式
-
追加( Append )模式
- 只做插入操作,和外部連接起只交換插入( insert )消息
-
撤回 ( Retract )模式
- 表和外部連接起交換添加( Add )和撤回( Retract )消息
- 插入操作編碼為 Add 消息抱既,刪除編碼為 Retract 消息职烧,更新編碼為上一條的 Retract 和下一條的 Add 消息
- 不能定義 Key
-
更新( Upsert )模式
- 更新和插入都被編碼為 Upsert 消息,刪除編碼為 Delete 消息
- 需要定義 Key
DataStream 只支持 Append 和 Retract 模式防泵。(toRetractStream[T] & toAppendStream[T]
)
外部文件系統(tǒng)的流支持哪種模式取決于具體實(shí)現(xiàn)蚀之,比如 Kakfa 只支持 Append 模式。
2.7 輸出表
tableEnv.connect(new FileSystem().path(filePath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("inputTable")
// 轉(zhuǎn)換操作
val sensorTable: Table = tableEnv.from("inputTable")
// 簡(jiǎn)單轉(zhuǎn)換
val resultTable: Table = sensorTable
.select('id, 'temp)
.filter('id === "sensor_1")
// 聚合轉(zhuǎn)換
val aggTable: Table = sensorTable
.groupBy('id)
.select('id, 'id.count as 'count)
// 輸出到外部文件系統(tǒng)或者 DataStream
val outputPath = "..."
// 注冊(cè)輸出表
tableEnv.connect(new FileSystem().path(outputPath))
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("outputTable")
//aggTable.insertInto("outputTable") aggTable 因?yàn)橛行薷牟僮鹘菖ⅲ珻svTableSink 只實(shí)現(xiàn)了 AppendStreamTableSink足删,所以無法輸出到文件。
resultTable.insertInto("outputTable")
resultTable.toAppendStream[(String, Double)].print("result")
// aggTable 因?yàn)橛行薷牟僮鞑荒苁褂?append锁右,需要使用 Retract
aggTable.toRetractStream[Row].print("agg")
3 動(dòng)態(tài)表
3.1 DataStream 上的關(guān)系查詢
關(guān)系代數(shù) / SQL | 流處理 |
---|---|
關(guān)系(或表)是有界(多)元組集合失受。 | 流是一個(gè)無限元組序列。 |
對(duì)批數(shù)據(jù)(例如關(guān)系數(shù)據(jù)庫(kù)中的表)執(zhí)行的查詢可以訪問完整的輸入數(shù)據(jù)咏瑟。 | 流式查詢?cè)趩?dòng)時(shí)不能訪問所有數(shù)據(jù)拂到,必須“等待”數(shù)據(jù)流入。 |
批處理查詢?cè)诋a(chǎn)生固定大小的結(jié)果后終止码泞。 | 流查詢不斷地根據(jù)接收到的記錄更新其結(jié)果兄旬,并且始終不會(huì)結(jié)束。 |
盡管存在這些差異余寥,但是使用關(guān)系查詢和 SQL 處理流并不是不可能的领铐。高級(jí)關(guān)系數(shù)據(jù)庫(kù)系統(tǒng)提供了一個(gè)稱為 物化視圖(Materialized Views) 的特性。物化視圖被定義為一條 SQL 查詢劈狐,緩存查詢的結(jié)果罐孝。緩存的一個(gè)常見難題是防止緩存結(jié)果過期。當(dāng)其定義查詢的基表被修改時(shí)肥缔,物化視圖將過期莲兢。 即時(shí)視圖維護(hù)(Eager View Maintenance) 是一種一旦更新了物化視圖的基表就立即更新視圖的技術(shù)。
3.2 動(dòng)態(tài)表 & 連續(xù)查詢( Continuous Query )
- 將流轉(zhuǎn)換為動(dòng)態(tài)表。
- 在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢改艇,生成一個(gè)新的動(dòng)態(tài)表收班。
- 生成的動(dòng)態(tài)表被轉(zhuǎn)換回流。
4. 窗口和時(shí)間語(yǔ)義
關(guān)于窗口和時(shí)間語(yǔ)義的介紹可以參考這篇文章谒兄。之前是在流上進(jìn)行討論的摔桦。Flink 在表上同樣支持相應(yīng)的邏輯。
4.1 時(shí)間語(yǔ)義
可以通過 DDL 方式創(chuàng)建兩種時(shí)間語(yǔ)義承疲,但是比較晦澀邻耕,這里不做舉例,感興趣可以到官網(wǎng)查看燕鸽。
4.1.1 processing time
注意處理時(shí)間屬性一定不能定義在一個(gè)已有字段上
-
流轉(zhuǎn)表時(shí):
// 聲明一個(gè)額外的字段作為時(shí)間屬性字段 val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
-
定義 tableSchema 時(shí):
.withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .field("temperature",DataTypes.DOUBLE()) .field("pt",DataTypes.TIMESTAMP(3)).proctime() //將該字段定義為 processing time )
4.1.2 event time
- 流轉(zhuǎn)表時(shí):
// 基于 stream 中的事件產(chǎn)生時(shí)間戳和 watermark val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...) // 聲明一個(gè)額外的邏輯字段作為事件時(shí)間屬性(數(shù)據(jù)來源于上面datastream定義好的字段)兄世,必須放在 schema 最后 val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime) // Option 2: // 從第一個(gè)字段獲取事件時(shí)間,并且產(chǎn)生 watermark val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...) // 第一個(gè)字段已經(jīng)用作事件時(shí)間抽取了啊研,不用再用一個(gè)新字段來表示事件時(shí)間了 val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
- 定義 tableSchema 時(shí):
//需要注意這種方式的 source 必須實(shí)現(xiàn) DefinedRowtimeAttributes 接口御滩。如 KafkaTableSource 實(shí)現(xiàn)了該接口。CsvTableSource 則沒有党远。 .withSchema(new Schema() .field("id",DataTypes.STRING()) .field("timestamp",DataTypes.BIGINT()) .rowtime( new Rowtime() .timestampsFromFiled("timestamp") .watermarksPeriodicBounded(1000) ) .field("temperature",DataTypes.DOUBLE()) )
4.2 窗口操作
窗口操作相當(dāng)于對(duì)數(shù)據(jù)進(jìn)行分組時(shí)削解,除了按照字段以外,增加了新的維度進(jìn)行分組沟娱,一般是時(shí)間或者數(shù)據(jù)數(shù)量氛驮。
4.2.1 Group Windows
根據(jù)時(shí)間或者行數(shù)間隔,將行聚集在有限的組中花沉,并對(duì)每個(gè)組的數(shù)據(jù)執(zhí)行一次聚合函數(shù)柳爽。最終每個(gè)組得出一個(gè)結(jié)果,類似于傳統(tǒng)對(duì) group by 操作
// 基本使用結(jié)構(gòu)
val table = input
.window([w:GroupWindow] as "w") //定義窗口和別名 w
.groupBy($"w",$"a") //以屬性 a 和窗口 w 作為分組的key
.select($"a",$"b".sum) //聚合字段b的值碱屁,求和
tumbling window
- .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
- .window( Tumble over 10.rows on $"a_proctime" as "w")
- sql: tumble(ts, interval '10' second)
sliding windows
- .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
- .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
- sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二個(gè)是步長(zhǎng),第三個(gè)是窗口長(zhǎng)度
session windows
- .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
- sql: session(ts,interval '10' second)
sql 輔助函數(shù),xx = {tumble,hop,session}:
- xx_start(ts, interval '10' second)
- xx_end(ts, interval '10' second)
- xx_rowtime(ts, interval '10' second)
- xx_proctime(ts, interval '10' second)
4.2.2 Over Windows
針對(duì)每個(gè)輸入行蛾找,進(jìn)行開窗娩脾,增加一列表示結(jié)果,每個(gè)行都有自己所在窗口的結(jié)果打毛。類似于傳統(tǒng)的 over 操作
// 基本使用結(jié)構(gòu)
val table = input
.window([w:OverWindow] as "w")
.select($"a",$"b".sum over $"w", $"c".min over $"w")
無界 over window
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")
有界 over window
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
- .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")