Flink 中的 API
Flink 為流式/批式處理應用程序的開發(fā)提供了不同級別的抽象
- 最頂層抽象是 SQL,這層抽象在語義和程序表達式上都類似于 Table API,但是其程序?qū)崿F(xiàn)都是 SQL 查詢表達式惫周。SQL 抽象與 Table API 抽象之間的關聯(lián)是非常緊密的,并且 SQL 查詢語句可以在 Table API 中定義的表上執(zhí)行饥臂。
- 第三層抽象是 Table API,Table API 是以表(Table)為中心的聲明式編程(DSL)API
- 第二層抽象是 Core APIs酵熙,包含 DataStream API(應用于有界/無界數(shù)據(jù)流場景)和 DataSet API(應用于有界數(shù)據(jù)集場景)兩部分
- 最底層察藐,F(xiàn)link API 最底層的抽象為有狀態(tài)實時流處理譬猫。其抽象實現(xiàn)是 Process Function柳刮,并且 Process Function 被 Flink 框架集成到了 DataStream API 中來為我們使用
Flink DataStream API 編程
Flink 中的 DataStream 程序是對數(shù)據(jù)流(例如過濾、更新狀態(tài)梢灭、定義窗口义屏、聚合)進行轉(zhuǎn)換的常規(guī)程序兄墅。數(shù)據(jù)流的起始是從各種源(例如消息隊列五督、套接字流误证、文件)創(chuàng)建的。結(jié)果通過 sink 返回,例如可以將數(shù)據(jù)寫入文件或標準輸出(例如命令行終端)。Flink 程序可以在各種上下文中運行珠十,可以獨立運行,也可以嵌入到其它程序中。任務執(zhí)行可以運行在本地 JVM 中姿搜,也可以運行在多臺機器的集群上致份。
DataStream 是什么?
DataStream API 得名于特殊的 DataStream 類,該類用于表示 Flink 程序中的數(shù)據(jù)集合。你可以通過在 Flink 程序中添加 source 創(chuàng)建一個初始的 DataStream吊洼。然后,你可以基于 DataStream 派生新的流意乓,并使用 map乞而、filter 等 API 方法把 DataStream 和派生的流連接在一起。
(感覺比較偏開發(fā),先跳過)
Table API & SQL
Apache Flink 有兩種關系型 API 來做流批統(tǒng)一處理:Table API 和 SQL罐韩。Table API 是用于 Scala 和 Java 語言的查詢API晦款,它可以用一種非常直觀的方式來組合使用選取淤齐、過濾、join 等關系型算子。Flink SQL 是基于 Apache Calcite 來實現(xiàn)的標準 SQL贰拿。無論輸入是連續(xù)的(流式)還是有界的(批處理)珍德,在兩個接口中指定的查詢都具有相同的語義,并指定相同的結(jié)果。
概念與通用API
Table API 和 SQL 集成在同一套 API 中创泄。 這套 API 的核心概念是Table若治,用作查詢的輸入和輸出。 本文介紹 Table API 和 SQL 查詢程序的通用結(jié)構(gòu)募谎、如何注冊 Table 铜异、如何查詢 Table 以及如何輸出 Table 。
1试浙、在Catalog中創(chuàng)建表
Table 可以是虛擬的(視圖 VIEWS)也可以是常規(guī)的(表 TABLES)艘刚。視圖 VIEWS可以從已經(jīng)存在的Table中創(chuàng)建钱床,一般是 Table API 或者 SQL 的查詢結(jié)果。 表TABLES描述的是外部數(shù)據(jù),例如文件胡岔、數(shù)據(jù)庫表或者消息隊列苫亦。
臨時表(Temporary Table)和永久表(Permanent Table)
表可以是臨時的屋剑,并與單個 Flink 會話(session)的生命周期相關,也可以是永久的峡懈,并且在多個 Flink 會話和群集(cluster)中可見磷支。
永久表需要 catalog(例如 Hive Metastore)以維護表的元數(shù)據(jù)箍邮。一旦永久表被創(chuàng)建,它將對任何連接到 catalog 的 Flink 會話可見且持續(xù)存在光戈,直至被明確刪除哪痰。
創(chuàng)建表
可以通過 connector 聲明。Connector 描述了存儲表數(shù)據(jù)的外部系統(tǒng)久妆。存儲系統(tǒng)例如 Apache Kafka 或者常規(guī)的文件系統(tǒng)都可以通過這種方式來聲明
CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)
擴展表標識符
表總是通過三元標識符注冊晌杰,包括 catalog 名、數(shù)據(jù)庫名和表名筷弦。
用戶可以指定一個 catalog 和數(shù)據(jù)庫作為 “當前catalog” 和"當前數(shù)據(jù)庫"肋演。有了這些,那么剛剛提到的三元標識符的前兩個部分就可以被省略了。如果前兩部分的標識符沒有指定惋啃, 那么會使用當前的 catalog 和當前數(shù)據(jù)庫哼鬓。
2、查詢表
Table API(不常用边灭,跳過)
Table API 是關于 Scala 和 Java 的集成語言式查詢 API异希。與 SQL 相反,Table API 的查詢不是由字符串指定绒瘦,而是在宿主語言中逐步構(gòu)建称簿。
SQL
文檔 SQL 描述了Flink對流處理和批處理表的SQL支持
3、輸出表
Table 通過寫入 TableSink 輸出惰帽。TableSink 是一個通用接口憨降,用于支持多種文件格式(如 CSV、Apache Parquet该酗、Apache Avro)授药、存儲系統(tǒng)(如 JDBC、Apache HBase呜魄、Apache Cassandra悔叽、Elasticsearch)或消息隊列系統(tǒng)(如 Apache Kafka、RabbitMQ)爵嗅。
批處理 Table 只能寫入 BatchTableSink娇澎,而流處理 Table 需要指定寫入 AppendStreamTableSink,RetractStreamTableSink 或者 UpsertStreamTableSink睹晒。
更多關于可用 Sink 的信息以及如何自定義 DynamicTableSink在鏈接User-defined Sources & Sinks
4趟庄、解釋表
Table API 提供了一種機制來解釋計算 Table 的邏輯和優(yōu)化查詢計劃。 這是通過 Table.explain() 方法或者 StatementSet.explain() 方法來完成的伪很。Table.explain() 返回一個 Table 的計劃戚啥。StatementSet.explain() 返回多 sink 計劃的結(jié)果。它返回一個描述三種計劃的字符串:
- 關系查詢的抽象語法樹(the Abstract Syntax Tree)锉试,即未優(yōu)化的邏輯查詢計劃虑鼎,
- 優(yōu)化的邏輯查詢計劃
- 物理執(zhí)行計劃。
(提交flink的sql代碼的時候需要先檢查键痛,檢查輸出的結(jié)果如下)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[count, word])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[count, word])