Flink實(shí)戰(zhàn)(六) - Table API & SQL編程

1 意義

1.1 分層的 APIs & 抽象層次

Flink提供三層API鸭你。 每個(gè)API在簡(jiǎn)潔性和表達(dá)性之間提供不同的權(quán)衡,并針對(duì)不同的用例。

而且Flink提供不同級(jí)別的抽象來(lái)開(kāi)發(fā)流/批處理應(yīng)用程序

  • 最低級(jí)抽象只提供有狀態(tài)流垮刹。它通過(guò)Process Function嵌入到DataStream API中。它允許用戶自由處理來(lái)自一個(gè)或多個(gè)流的事件张弛,并使用一致的容錯(cuò)狀態(tài)荒典。此外酪劫,用戶可以注冊(cè)事件時(shí)間和處理時(shí)間回調(diào),允許程序?qū)崿F(xiàn)復(fù)雜的計(jì)算寺董。
  • 實(shí)際上覆糟,大多數(shù)應(yīng)用程序不需要上述低級(jí)抽象,而是針對(duì)Core API編程遮咖, 如DataStream API(有界/無(wú)界流)和DataSet API (有界數(shù)據(jù)集)滩字。這些流暢的API提供了用于數(shù)據(jù)處理的通用構(gòu)建塊,例如各種形式的用戶指定的轉(zhuǎn)換御吞,連接麦箍,聚合,窗口陶珠,狀態(tài)等挟裂。在這些API中處理的數(shù)據(jù)類型在相應(yīng)的編程語(yǔ)言中表示為類。
    低級(jí)Process Function與DataStream API集成揍诽,因此只能對(duì)某些 算子操作進(jìn)行低級(jí)抽象诀蓉。該數(shù)據(jù)集API提供的有限數(shù)據(jù)集的其他原語(yǔ),如循環(huán)/迭代暑脆。
  • Table API 是為中心的聲明性DSL 表渠啤,其可被動(dòng)態(tài)地改變的表(表示流時(shí))。該 Table API遵循(擴(kuò)展)關(guān)系模型:表有一個(gè)模式連接(類似于在關(guān)系數(shù)據(jù)庫(kù)中的表)和API提供可比的 算子操作饵筑,如選擇埃篓,項(xiàng)目,連接根资,分組依據(jù),聚合等 Table API程序以聲明方式定義應(yīng)該執(zhí)行的邏輯 算子操作同窘,而不是準(zhǔn)確指定 算子操作代碼的外觀玄帕。雖然 Table API可以通過(guò)各種類型的用戶定義函數(shù)進(jìn)行擴(kuò)展,但它的表現(xiàn)力不如Core API想邦,但使用更簡(jiǎn)潔(編寫(xiě)的代碼更少)裤纹。此外, Table API程序還會(huì)通過(guò)優(yōu)化程序丧没,在執(zhí)行之前應(yīng)用優(yōu)化規(guī)則鹰椒。
    可以在表和DataStream / DataSet之間無(wú)縫轉(zhuǎn)換,允許程序混合 Table API以及DataStream 和DataSet API呕童。
  • Flink提供的最高級(jí)抽象是SQL漆际。這種抽象在語(yǔ)義和表達(dá)方面類似于 Table API,但是將程序表示為SQL查詢表達(dá)式夺饲。在SQL抽象與 Table API緊密地相互作用奸汇,和SQL查詢可以通過(guò)定義表來(lái)執(zhí)行 Table API施符。1.2 模型類比MapReduce ==> Hive SQL
    Spark ==> Spark SQL
    Flink ==> SQL

2 總覽

2.1 簡(jiǎn)介

Apache Flink具有兩個(gè)關(guān)系型API

  • Table API
  • SQL

用于統(tǒng)一流和批處理

Table API是Scala和Java語(yǔ)言集成查詢API,可以非常直觀的方式組合來(lái)自關(guān)系算子的查詢(e.g. 選擇擂找,過(guò)濾和連接).

Flink的SQL支持基于實(shí)現(xiàn)SQL標(biāo)準(zhǔn)的Apache Calcite戳吝。無(wú)論輸入是批輸入(DataSet)還是流輸入(DataStream),任一接口中指定的查詢都具有相同的語(yǔ)義并指定相同的結(jié)果贯涎。

Table API和SQL接口彼此緊密集成听哭,就如Flink的DataStream和DataSet API。我們可以輕松地在基于API構(gòu)建的所有API和庫(kù)之間切換塘雳。例如欢唾,可以使用CEP庫(kù)從DataStream中提取模式,然后使用 Table API分析模式粉捻,或者可以在預(yù)處理上運(yùn)行Gelly圖算法之前使用SQL查詢掃描礁遣,過(guò)濾和聚合批處理表數(shù)據(jù)。

Table API和SQL尚未完成并且正在積極開(kāi)發(fā)中肩刃。并非 Table API祟霍,SQL和stream,batch輸入的每種組合都支持所有算子操作

2.2 依賴結(jié)構(gòu)

所有Table API和SQL組件都捆綁在flink-table Maven工件中盈包。

以下依賴項(xiàng)與大多數(shù)項(xiàng)目相關(guān):

  • flink-table-common
    通過(guò)自定義函數(shù)沸呐,格式等擴(kuò)展表生態(tài)系統(tǒng)的通用模塊。
  • flink-table-api-java
    使用Java編程語(yǔ)言的純表程序的表和SQL API(在早期開(kāi)發(fā)階段呢燥,不推薦U柑怼)。
  • flink-table-api-scala
    使用Scala編程語(yǔ)言的純表程序的表和SQL API(在早期開(kāi)發(fā)階段叛氨,不推薦:粼)。
  • flink-table-api-java-bridge
    使用Java編程語(yǔ)言支持DataStream / DataSet API的Table&SQL API寞埠。
  • flink-table-api-scala-bridge
    使用Scala編程語(yǔ)言支持DataStream / DataSet API的Table&SQL API屁置。
  • flink-table-planner
    表程序規(guī)劃器和運(yùn)行時(shí)。
  • flink-table-uber
    將上述模塊打包成大多數(shù)Table&SQL API用例的發(fā)行版仁连。 uber JAR文件flink-table * .jar位于Flink版本的/ opt目錄中蓝角,如果需要可以移動(dòng)到/ lib。

2.3 項(xiàng)目依賴

必須將以下依賴項(xiàng)添加到項(xiàng)目中才能使用Table API和SQL來(lái)定義管道:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

此外饭冬,根據(jù)目標(biāo)編程語(yǔ)言使鹅,您需要添加Java或Scala API。

<!-- Either... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.8.0</version>
</dependency>
<!-- or... -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

在內(nèi)部昌抠,表生態(tài)系統(tǒng)的一部分是在Scala中實(shí)現(xiàn)的患朱。 因此,請(qǐng)確保為批處理和流應(yīng)用程序添加以下依賴項(xiàng):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.8.0</version>
</dependency>

2.4 擴(kuò)展依賴

如果要實(shí)現(xiàn)與Kafka或一組用戶定義函數(shù)交互的自定義格式扰魂,以下依賴關(guān)系就足夠了麦乞,可用于SQL客戶端的JAR文件:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.8.0</version>
</dependency>

目前蕴茴,該模塊包括以下擴(kuò)展點(diǎn):

  • SerializationSchemaFactory
  • DeserializationSchemaFactory
  • ScalarFunction
  • TableFunction
  • AggregateFunction

3 概念和通用API

Table API和SQL集成在一個(gè)聯(lián)合API中。此API的核心概念是Table用作查詢的輸入和輸出姐直。本文檔顯示了具有 Table API和SQL查詢的程序的常見(jiàn)結(jié)構(gòu)倦淀,如何注冊(cè)Table,如何查詢Table以及如何發(fā)出Table声畏。

3.1 Table API和SQL程序的結(jié)構(gòu)

批處理和流式傳輸?shù)乃?Table API和SQL程序都遵循相同的模式撞叽。以下代碼示例顯示了 Table API和SQL程序的常見(jiàn)結(jié)構(gòu)。

// 對(duì)于批處理程序插龄,使用ExecutionEnvironment而不是StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 創(chuàng)建一個(gè)TableEnvironment
// 對(duì)于批處理程序使用BatchTableEnvironment而不是StreamTableEnvironment
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// 注冊(cè)一個(gè) Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...);
// 注冊(cè)一個(gè)輸出 Table
tableEnv.registerTableSink("outputTable", ...);

/ 從 Table API query 創(chuàng)建一個(gè)Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從 SQL query 創(chuàng)建一個(gè)Table
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// 將表API結(jié)果表發(fā)送到TableSink愿棋,對(duì)于SQL結(jié)果也是如此
tapiResult.insertInto("outputTable");

// 執(zhí)行
env.execute();

3.2 將DataStream或DataSet轉(zhuǎn)換為表

它也可以直接轉(zhuǎn)換為a 而不是注冊(cè)a DataStream或DataSetin 。如果要在 Table API查詢中使用Table均牢,這很方便糠雨。TableEnvironmentTable

// 獲取StreamTableEnvironment
//在BatchTableEnvironment中注冊(cè)DataSet是等效的
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

DataStream<Tuple2<Long, String>> stream = ...

// 將DataStream轉(zhuǎn)換為默認(rèn)字段為“f0”,“f1”的表
Table table1 = tableEnv.fromDataStream(stream);

// 將DataStream轉(zhuǎn)換為包含字段“myLong”徘跪,“myString”的表
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
  • sale.csv文件


  • Scala


  • Java


還不完善,等日后Flink該模塊開(kāi)發(fā)完畢再深入研究!

參考

Table API & SQL

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末甘邀,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子垮庐,更是在濱河造成了極大的恐慌松邪,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件哨查,死亡現(xiàn)場(chǎng)離奇詭異逗抑,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)寒亥,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)邮府,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人护盈,你說(shuō)我怎么就攤上這事挟纱。” “怎么了腐宋?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,548評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)檀轨。 經(jīng)常有香客問(wèn)我胸竞,道長(zhǎng),這世上最難降的妖魔是什么参萄? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,657評(píng)論 1 293
  • 正文 為了忘掉前任卫枝,我火速辦了婚禮,結(jié)果婚禮上讹挎,老公的妹妹穿的比我還像新娘校赤。我一直安慰自己吆玖,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,689評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布马篮。 她就那樣靜靜地躺著沾乘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪浑测。 梳的紋絲不亂的頭發(fā)上翅阵,一...
    開(kāi)封第一講書(shū)人閱讀 51,554評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音迁央,去河邊找鬼掷匠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛岖圈,可吹牛的內(nèi)容都是我干的讹语。 我是一名探鬼主播,決...
    沈念sama閱讀 40,302評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼蜂科,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼顽决!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起崇摄,我...
    開(kāi)封第一講書(shū)人閱讀 39,216評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤擎值,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后逐抑,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體鸠儿,經(jīng)...
    沈念sama閱讀 45,661評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,851評(píng)論 3 336
  • 正文 我和宋清朗相戀三年厕氨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了进每。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,977評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡命斧,死狀恐怖田晚,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情国葬,我是刑警寧澤贤徒,帶...
    沈念sama閱讀 35,697評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站汇四,受9級(jí)特大地震影響接奈,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜通孽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,306評(píng)論 3 330
  • 文/蒙蒙 一序宦、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧背苦,春花似錦互捌、人聲如沸潘明。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,898評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)钳降。三九已至,卻和暖如春巢价,著一層夾襖步出監(jiān)牢的瞬間牲阁,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,019評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工壤躲, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留城菊,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,138評(píng)論 3 370
  • 正文 我出身青樓碉克,卻偏偏與公主長(zhǎng)得像凌唬,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子漏麦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,927評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容