Flink Table API 和 SQL

Apache Flink 具有兩個關(guān)系型API:Table API 和SQL桐猬,用于統(tǒng)一流和批處理。
Table API 是用于 Scala 和 Java 語言的查詢API凡傅,允許以非常直觀的方式組合關(guān)系運算符的查詢肴裙,例如 select,filter 和 join。Flink SQL 的支持是基于實現(xiàn)了SQL標準的 Apache Calcite昼蛀。無論輸入是批輸入(DataSet)還是流輸入(DataStream)雀瓢,任一接口中指定的查詢都具有相同的語義并指定相同的結(jié)果。

Table API 和 SQL 還沒有完全支持并且正在積極開發(fā)中酌住。

要使用 Table API 和SQL店归,需要將以下依賴引入項目:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.6.1</version>
</dependency>

Table API 和SQL

批處理和流式傳輸?shù)?Table API 和SQL程序都遵循相同的模式。以下代碼示例顯示了常見的程序結(jié)構(gòu):

// 批處理使用 ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 創(chuàng)建 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注冊 Table
tableEnv.registerTable("table1", ...)

// Table API query
val tapiResult = tableEnv.scan("table1").select(...)

// SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// Sink query result
tapiResult.writeToSink(...)

// execute
env.execute()

TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念酪我,它負責(zé):

  • 在內(nèi)部目錄中注冊表
  • 注冊外部目錄
  • 執(zhí)行SQL查詢
  • 注冊用戶定義的函數(shù)
  • DataStream 或 DataSet 轉(zhuǎn)換為 Table
  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 總是與特定的 TableEnvironment 綁定消痛。不能在同一查詢中組合不同 TableEnvironments 的表(例如,union 或 join)都哭。創(chuàng)建 TableEnvironment:

// STREAMING QUERY
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// BATCH QUERY
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

注冊 Table

TableEnvironment 維護一個按名稱注冊的表的目錄秩伞。有兩種類型的表,輸入表(input table)和輸出表(output table)欺矫。輸入表可以在 Table API 和SQL查詢中引用纱新,并提供輸入數(shù)據(jù)。輸出表可用于將 Table API 或SQL查詢的結(jié)果發(fā)送到外部系統(tǒng)穆趴。

輸入表的注冊源:

  • Table API 或SQL查詢的結(jié)果表
  • 訪問外部數(shù)據(jù)的 TableSource脸爱,例如文件,數(shù)據(jù)庫或消息系統(tǒng)
  • DataStream 或 DataSet未妹。

輸出表的注冊源:TableSink

代碼示例:

val tableEnv = TableEnvironment.getTableEnvironment(env)

// from Table API or SQL
val projTable: Table = tableEnv.scan("X").select(...)
tableEnv.registerTable("projectedTable", projTable)

// from TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)

// from TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

注冊外部目錄

外部目錄(external catalog)可以提供有關(guān)外部數(shù)據(jù)庫和表的信息(如名稱簿废,schema,統(tǒng)計信息以及訪問信息)络它∽迕剩可以通過實現(xiàn) ExternalCatalog 接口創(chuàng)建外部目錄,并在 TableEnvironment 中注冊:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 創(chuàng)建一個外部目錄
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 注冊外部目錄
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

查詢

Table API

Table API 是一個 Scala 和 Java 的語言集成查詢API化戳,是基于 Table類单料。Table類代表了一個流或者批表,并提供方法來使用關(guān)系型操作迂烁。這些方法返回一個新的 Table 對象看尼,這個新的 Table 對象代表著輸入的 Table 應(yīng)用關(guān)系型操作后的結(jié)果。下面的例子展示了一個簡單的 Table API 聚合查詢:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注冊一個名叫 Orders 的表 ...

// 掃描注冊的 Orders 表
val orders = tableEnv.scan("Orders")

// 計算所有來自法國的客戶的收入
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName')
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 執(zhí)行查詢

SQL

Flink SQL 集成是基于 Apache Calcite盟步,Apache Calcite 實現(xiàn)了標準的SQL藏斩。下面的例子展示了如何指定一個查詢并返回結(jié)果:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注冊一個名叫 Orders 的表

// 計算所有來自法國的客戶的收入
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 執(zhí)行查詢

指定將其結(jié)果插入已注冊表的更新查詢:

// 注冊一個名叫 RevenueFrance 的輸出表

// 計算所有來自法國的客戶的收入,并更新到 RevenueFrance 表
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 執(zhí)行查詢

混合使用 Table API 和SQL却盘,Table API 和SQL查詢可以很容易地合并因為它們都返回 Table 對象:

  1. Table API 查詢可以基于SQL查詢結(jié)果的 Table 來進行
  2. SQL查詢可以基于 Table API 查詢的結(jié)果來定義

輸出表

要輸出 Table 可以寫入 TableSink狰域。TableSink 是通用接口媳拴,支持各種文件格式(如:CSV,Apache Parquet兆览,Apache Avro)屈溉、存儲系統(tǒng)(如:JDBC,Apache HBase抬探,Apache Cassandra子巾,Elasticsearch)或消息系統(tǒng)(如:Apache Kafka,RabbitMQ)小压。

批處理 Table 只能寫入 BatchTableSink线梗,而流式處理 Table 需要 AppendStreamTableSink,RetractStreamTableSink 或 UpsertStreamTableSink怠益。有關(guān)可用接收器的詳細信息仪搔,請參閱 Sources & Sinks

有兩種方法可以發(fā)送表:

  • Table.writeToSink(TableSink sink) 自動匹配 schema
  • Table.insertInto(String sinkTable) 使用特定 schema

以下示例顯示如何發(fā)出Table:

// 獲取一個 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查詢獲取一個 Table
val result: Table = ...

// 創(chuàng)建一個 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// METHOD 1:
//   將結(jié)果表寫入 TableSink
result.writeToSink(sink)

// METHOD 2:
//   注冊指定 schema 的 TableSink
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
//   將結(jié)果表寫入 TableSink
result.insertInto("CsvSinkTable")

// 執(zhí)行程序

與 DataStream 和 DataSet API 集成

Table API 和SQL查詢可以很容易地進行集成并嵌入到 DataStream 和 DataSet 程序中蜻牢。例如烤咧,可以查詢一個外部表(來自關(guān)系型數(shù)據(jù)庫的表),做一些處理(如過濾抢呆、映射煮嫌、聚合或者關(guān)聯(lián)元數(shù)據(jù)),然后使用 DataStream 或者 DataSet API(以及在這些API之上構(gòu)建的任何庫镀娶,例如CEP或 Gelly) 進行進一步處理立膛。

同樣,Table API 或者SQL查詢也可以應(yīng)用于 DataStream 或者 DataSet 程序的結(jié)果中梯码。這種交互可以通過將 DataStream 或者 DataSet 轉(zhuǎn)換成一個 Table 及將 Table 轉(zhuǎn)換成 DataStream 或者 DataSet 來實現(xiàn)。

Scala 隱式轉(zhuǎn)換

Scala Table API 支持 DataSet好啰,DataStream 以及 Table 間的隱式轉(zhuǎn)換轩娶。需要引入 org.apache.flink.table.api.scala._org.apache.flink.api.scala._

DataStream 或 DataSet 轉(zhuǎn)換為 Table

DataStream 或 DataSet 可以在 TableEnvironment 中注冊為表框往,表的 schema 根據(jù)注冊的 DataStream 或 DataSet 的數(shù)據(jù)類型來定:

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

也可以直接轉(zhuǎn)換為表鳄抒,而不需要注冊:

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Table 轉(zhuǎn)換為 DataStream 或 DataSet

Table 可以轉(zhuǎn)換為 DataStream 或者 DataSet,通過這種方式椰弊,DataStream 或者 DataSet 程序就可以基于 Table API 或者SQL查詢的結(jié)果來執(zhí)行了许溅。

當(dāng)將一個 Table 轉(zhuǎn)換為 DataStream 或 DataSet 時,需要指定生成的 DataStream 或 DataSet 的數(shù)據(jù)類型秉版,即需要轉(zhuǎn)換表的行的數(shù)據(jù)類型贤重,通常最方便的轉(zhuǎn)換類型是 Row,下面列表描述了不同選項的功能:

  1. Row:字段按位置清焕、任意數(shù)量字段映射并蝗,支持 null 值祭犯,無類型安全訪問
  2. POJO:字段按名稱(POJO 字段命名為 Table 字段)、任意數(shù)量字段映射滚停,支持 null 值沃粗,類型安全訪問
  3. Case Class:字段按位置映射,不支持 null 值键畴,類型安全訪問
  4. Tuple:字段按位置映射最盅,不得多于22(Scala)或 25(Java)個字段,不支持 null 值起惕,類型安全訪問
  5. Atomic Type:Table 必須有一個字段檩禾,不支持 null 值,類型安全訪問

Table 轉(zhuǎn)換 DataStream

流式查詢的結(jié)果表會動態(tài)地更新疤祭,每個新的記錄到達輸入流時結(jié)果就會發(fā)生變化盼产。有兩種模式將 Table 轉(zhuǎn)換為 DataStream:

  1. Append Mode:只適用于當(dāng)動態(tài)表僅由 INSERT 修改時,之前的結(jié)果不會被更新勺馆。
  2. Retract Mode:始終都可以使用此模式戏售,使用一個 boolean 標識來編碼 INSERTDELETE 更改。
// 有兩個字段的 Table(String name, Integer age)
val table: Table = ...

// 將 Table 轉(zhuǎn)換為 Row 類型的 Append DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// 將 Table 轉(zhuǎn)換為 Tuple2<String, Integer> 類型的 Append DataStream  
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// 將 Table 轉(zhuǎn)換為 Row 類型的 Retact DataStream
//   一個 ReactDataStream 的類型X為表示為 DataStream[(Boolean, X)]
//   boolean 字段指定了更改的類型
//   True 是 INSERT, false 是 DELETE
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Table 轉(zhuǎn)換 DataSet

// 有兩個字段的 Table(String name, Integer age)
val table: Table = ...

// 將 Table 轉(zhuǎn)換為 Row 類型的 DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

//  將 Table 轉(zhuǎn)換為 Tuple2<String, Integer> 類型的 DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

將數(shù)據(jù)類型映射到表模式(Table schema)

DataStream 和 DataSet API 支持多種數(shù)據(jù)類型草穆,如:Tuple灌灾、POJO、case class 及 Row 類型悲柱。

原子類型

Flink 將原生類型(Integer锋喜、Double、String...)或泛型類型視為原子類型(Atomic type)豌鸡。一個原子類型的 DataStream 或 DataSet 可以轉(zhuǎn)換為只有一個屬性的 Table嘿般,屬性的類型根據(jù)原子類型推算,并且必須指定屬性的名稱涯冠。

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuple 和 Case Class

Flink 支持 Scala 原生的 Tuple 類型炉奴,也為 Java 提供了 Tuple 類。兩種類型的 DataStream 和 DataSet 都可以被轉(zhuǎn)換為 Table蛇更。通過為所有字段提供名稱(基于位置的映射)瞻赶,可以重命名字段。如果未指定字段名派任,則使用默認字段名砸逊。基于名稱的映射允許使用別名(as)重新排序字段掌逛。

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

POJO

Flink 支持使用 POJO 作為復(fù)合類型师逸。當(dāng)將一個 POJO 類型的 DataStream 或 DataSet 轉(zhuǎn)換為 Table 而不指定字段名稱時,Table 的字段名稱將采用 POJO 原生的字段名稱颤诀。重命名原始的 POJO 字段需要關(guān)鍵字AS字旭,因為 POJO 沒有固定的順序对湃,名稱映射需要原始名稱并且不能通過位置來完成。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

Row 類型支持任意數(shù)量的字段遗淳,并且支持 null 值拍柒。字段名稱可以通過 RowTypeInfo 來指定或者將一個 Row 類型的 DataStream 或 DataSet 轉(zhuǎn)換為 Table 時指定。Row 類型支持按位置和名字映射屈暗〔鹧叮可以通過為所有字段提供名稱(基于位置)或為 映射/排序/重命名(基于名稱)單獨選擇字段來重命名字段。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末养叛,一起剝皮案震驚了整個濱河市种呐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌弃甥,老刑警劉巖爽室,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異淆攻,居然都是意外死亡阔墩,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門瓶珊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來啸箫,“玉大人,你說我怎么就攤上這事伞芹⊥粒” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵唱较,是天一觀的道長扎唾。 經(jīng)常有香客問我,道長绊汹,這世上最難降的妖魔是什么稽屏? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮西乖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘坛增。我一直安慰自己获雕,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布收捣。 她就那樣靜靜地躺著届案,像睡著了一般。 火紅的嫁衣襯著肌膚如雪罢艾。 梳的紋絲不亂的頭發(fā)上楣颠,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天尽纽,我揣著相機與錄音,去河邊找鬼童漩。 笑死弄贿,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的矫膨。 我是一名探鬼主播差凹,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼侧馅!你這毒婦竟也來了危尿?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤馁痴,失蹤者是張志新(化名)和其女友劉穎谊娇,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體罗晕,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡济欢,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了攀例。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片船逮。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖粤铭,靈堂內(nèi)的尸體忽然破棺而出挖胃,到底是詐尸還是另有隱情,我是刑警寧澤梆惯,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布酱鸭,位于F島的核電站,受9級特大地震影響垛吗,放射性物質(zhì)發(fā)生泄漏凹髓。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一怯屉、第九天 我趴在偏房一處隱蔽的房頂上張望蔚舀。 院中可真熱鬧,春花似錦锨络、人聲如沸赌躺。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽礼患。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缅叠,已是汗流浹背悄泥。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留肤粱,地道東北人弹囚。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像狼犯,于是被迫代替她去往敵國和親余寥。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容