1 意義
1.1 分層的 APIs & 抽象層次
Flink提供三層API鸭你。 每個(gè)API在簡(jiǎn)潔性和表達(dá)性之間提供不同的權(quán)衡,并針對(duì)不同的用例。
而且Flink提供不同級(jí)別的抽象來(lái)開(kāi)發(fā)流/批處理應(yīng)用程序
- 最低級(jí)抽象只提供有狀態(tài)流垮刹。它通過(guò)Process Function嵌入到DataStream API中。它允許用戶自由處理來(lái)自一個(gè)或多個(gè)流的事件张弛,并使用一致的容錯(cuò)狀態(tài)荒典。此外酪劫,用戶可以注冊(cè)事件時(shí)間和處理時(shí)間回調(diào),允許程序?qū)崿F(xiàn)復(fù)雜的計(jì)算寺董。
- 實(shí)際上覆糟,大多數(shù)應(yīng)用程序不需要上述低級(jí)抽象,而是針對(duì)Core API編程遮咖, 如DataStream API(有界/無(wú)界流)和DataSet API (有界數(shù)據(jù)集)滩字。這些流暢的API提供了用于數(shù)據(jù)處理的通用構(gòu)建塊,例如各種形式的用戶指定的轉(zhuǎn)換御吞,連接麦箍,聚合,窗口陶珠,狀態(tài)等挟裂。在這些API中處理的數(shù)據(jù)類型在相應(yīng)的編程語(yǔ)言中表示為類。
低級(jí)Process Function與DataStream API集成揍诽,因此只能對(duì)某些 算子操作進(jìn)行低級(jí)抽象诀蓉。該數(shù)據(jù)集API提供的有限數(shù)據(jù)集的其他原語(yǔ),如循環(huán)/迭代暑脆。 - 該 Table API 是為中心的聲明性DSL 表渠啤,其可被動(dòng)態(tài)地改變的表(表示流時(shí))。該 Table API遵循(擴(kuò)展)關(guān)系模型:表有一個(gè)模式連接(類似于在關(guān)系數(shù)據(jù)庫(kù)中的表)和API提供可比的 算子操作饵筑,如選擇埃篓,項(xiàng)目,連接根资,分組依據(jù),聚合等 Table API程序以聲明方式定義應(yīng)該執(zhí)行的邏輯 算子操作同窘,而不是準(zhǔn)確指定 算子操作代碼的外觀玄帕。雖然 Table API可以通過(guò)各種類型的用戶定義函數(shù)進(jìn)行擴(kuò)展,但它的表現(xiàn)力不如Core API想邦,但使用更簡(jiǎn)潔(編寫(xiě)的代碼更少)裤纹。此外, Table API程序還會(huì)通過(guò)優(yōu)化程序丧没,在執(zhí)行之前應(yīng)用優(yōu)化規(guī)則鹰椒。
可以在表和DataStream / DataSet之間無(wú)縫轉(zhuǎn)換,允許程序混合 Table API以及DataStream 和DataSet API呕童。 - Flink提供的最高級(jí)抽象是SQL漆际。這種抽象在語(yǔ)義和表達(dá)方面類似于 Table API,但是將程序表示為SQL查詢表達(dá)式夺饲。在SQL抽象與 Table API緊密地相互作用奸汇,和SQL查詢可以通過(guò)定義表來(lái)執(zhí)行 Table API施符。1.2 模型類比MapReduce ==> Hive SQL
Spark ==> Spark SQL
Flink ==> SQL
2 總覽
2.1 簡(jiǎn)介
Apache Flink具有兩個(gè)關(guān)系型API
- Table API
- SQL
用于統(tǒng)一流和批處理
Table API是Scala和Java語(yǔ)言集成查詢API,可以非常直觀的方式組合來(lái)自關(guān)系算子的查詢(e.g. 選擇擂找,過(guò)濾和連接).
Flink的SQL支持基于實(shí)現(xiàn)SQL標(biāo)準(zhǔn)的Apache Calcite戳吝。無(wú)論輸入是批輸入(DataSet)還是流輸入(DataStream),任一接口中指定的查詢都具有相同的語(yǔ)義并指定相同的結(jié)果贯涎。
Table API和SQL接口彼此緊密集成听哭,就如Flink的DataStream和DataSet API。我們可以輕松地在基于API構(gòu)建的所有API和庫(kù)之間切換塘雳。例如欢唾,可以使用CEP庫(kù)從DataStream中提取模式,然后使用 Table API分析模式粉捻,或者可以在預(yù)處理上運(yùn)行Gelly圖算法之前使用SQL查詢掃描礁遣,過(guò)濾和聚合批處理表數(shù)據(jù)。
Table API和SQL尚未完成并且正在積極開(kāi)發(fā)中肩刃。并非 Table API祟霍,SQL和stream,batch輸入的每種組合都支持所有算子操作
2.2 依賴結(jié)構(gòu)
所有Table API和SQL組件都捆綁在flink-table Maven工件中盈包。
以下依賴項(xiàng)與大多數(shù)項(xiàng)目相關(guān):
- flink-table-common
通過(guò)自定義函數(shù)沸呐,格式等擴(kuò)展表生態(tài)系統(tǒng)的通用模塊。 - flink-table-api-java
使用Java編程語(yǔ)言的純表程序的表和SQL API(在早期開(kāi)發(fā)階段呢燥,不推薦U柑怼)。 - flink-table-api-scala
使用Scala編程語(yǔ)言的純表程序的表和SQL API(在早期開(kāi)發(fā)階段叛氨,不推薦:粼)。 - flink-table-api-java-bridge
使用Java編程語(yǔ)言支持DataStream / DataSet API的Table&SQL API寞埠。 - flink-table-api-scala-bridge
使用Scala編程語(yǔ)言支持DataStream / DataSet API的Table&SQL API屁置。 - flink-table-planner
表程序規(guī)劃器和運(yùn)行時(shí)。 - flink-table-uber
將上述模塊打包成大多數(shù)Table&SQL API用例的發(fā)行版仁连。 uber JAR文件flink-table * .jar位于Flink版本的/ opt目錄中蓝角,如果需要可以移動(dòng)到/ lib。
2.3 項(xiàng)目依賴
必須將以下依賴項(xiàng)添加到項(xiàng)目中才能使用Table API和SQL來(lái)定義管道:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.8.0</version>
</dependency>
此外饭冬,根據(jù)目標(biāo)編程語(yǔ)言使鹅,您需要添加Java或Scala API。
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.8.0</version>
</dependency>
<!-- or... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.8.0</version>
</dependency>
在內(nèi)部昌抠,表生態(tài)系統(tǒng)的一部分是在Scala中實(shí)現(xiàn)的患朱。 因此,請(qǐng)確保為批處理和流應(yīng)用程序添加以下依賴項(xiàng):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.8.0</version>
</dependency>
2.4 擴(kuò)展依賴
如果要實(shí)現(xiàn)與Kafka或一組用戶定義函數(shù)交互的自定義格式扰魂,以下依賴關(guān)系就足夠了麦乞,可用于SQL客戶端的JAR文件:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.8.0</version>
</dependency>
目前蕴茴,該模塊包括以下擴(kuò)展點(diǎn):
- SerializationSchemaFactory
- DeserializationSchemaFactory
- ScalarFunction
- TableFunction
- AggregateFunction
3 概念和通用API
Table API和SQL集成在一個(gè)聯(lián)合API中。此API的核心概念是Table用作查詢的輸入和輸出姐直。本文檔顯示了具有 Table API和SQL查詢的程序的常見(jiàn)結(jié)構(gòu)倦淀,如何注冊(cè)Table,如何查詢Table以及如何發(fā)出Table声畏。
3.1 Table API和SQL程序的結(jié)構(gòu)
批處理和流式傳輸?shù)乃?Table API和SQL程序都遵循相同的模式撞叽。以下代碼示例顯示了 Table API和SQL程序的常見(jiàn)結(jié)構(gòu)。
// 對(duì)于批處理程序插龄,使用ExecutionEnvironment而不是StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 創(chuàng)建一個(gè)TableEnvironment
// 對(duì)于批處理程序使用BatchTableEnvironment而不是StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 注冊(cè)一個(gè) Table
tableEnv.registerTable("table1", ...) // 或者
tableEnv.registerTableSource("table2", ...); // 或者
tableEnv.registerExternalCatalog("extCat", ...);
// 注冊(cè)一個(gè)輸出 Table
tableEnv.registerTableSink("outputTable", ...);
/ 從 Table API query 創(chuàng)建一個(gè)Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從 SQL query 創(chuàng)建一個(gè)Table
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// 將表API結(jié)果表發(fā)送到TableSink愿棋,對(duì)于SQL結(jié)果也是如此
tapiResult.insertInto("outputTable");
// 執(zhí)行
env.execute();
3.2 將DataStream或DataSet轉(zhuǎn)換為表
它也可以直接轉(zhuǎn)換為a 而不是注冊(cè)a DataStream或DataSetin 。如果要在 Table API查詢中使用Table均牢,這很方便糠雨。TableEnvironmentTable
// 獲取StreamTableEnvironment
//在BatchTableEnvironment中注冊(cè)DataSet是等效的
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<Tuple2<Long, String>> stream = ...
// 將DataStream轉(zhuǎn)換為默認(rèn)字段為“f0”,“f1”的表
Table table1 = tableEnv.fromDataStream(stream);
// 將DataStream轉(zhuǎn)換為包含字段“myLong”徘跪,“myString”的表
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
-
sale.csv文件
-
Scala
-
Java
還不完善,等日后Flink該模塊開(kāi)發(fā)完畢再深入研究!