A Look at Flink's Table API and SQL Programs

This article takes a look at Flink's Table API and SQL programs.

Example

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register a Table
tableEnv.registerTable("table1", ...);           // or
tableEnv.registerTableSource("table2", ...);     // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");

// execute
env.execute();
  • This example shows the basic usage of Flink's Table API and SQL programs

Table API example

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// emit or convert Table
// execute query
  • A Table is created with the tableEnv.scan method, after which the Table's query APIs (here filter, groupBy and select) can be applied

sqlQuery example

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
  • Internally, sqlQuery is implemented on top of Apache Calcite
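To make the semantics of the revenue query concrete, here is a minimal plain-Java sketch of what both the Table API version and the SQL version compute: filter on cCountry, group by cID and cName, sum revenue. It does not use any Flink classes; the `RevenueSketch` and `Order` names and the sample data are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the query; no Flink classes are used.
final class RevenueSketch {

    // Hypothetical row type mirroring the columns of the Orders table.
    static final class Order {
        final long cID;
        final String cName;
        final String cCountry;
        final double revenue;

        Order(long cID, String cName, String cCountry, double revenue) {
            this.cID = cID;
            this.cName = cName;
            this.cCountry = cCountry;
            this.revenue = revenue;
        }
    }

    // Equivalent of:
    //   SELECT cID, cName, SUM(revenue) AS revSum
    //   FROM Orders WHERE cCountry = 'FRANCE' GROUP BY cID, cName
    // The grouping key is modelled as the string "cID,cName".
    static Map<String, Double> revenueForFrance(List<Order> orders) {
        Map<String, Double> revSum = new LinkedHashMap<>();
        for (Order o : orders) {
            if (!"FRANCE".equals(o.cCountry)) {
                continue; // WHERE clause
            }
            String key = o.cID + "," + o.cName; // GROUP BY key
            revSum.merge(key, o.revenue, Double::sum); // SUM(revenue)
        }
        return revSum;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order(1L, "Alice", "FRANCE", 10.0),
            new Order(1L, "Alice", "FRANCE", 5.0),
            new Order(2L, "Bob", "GERMANY", 7.0),
            new Order(3L, "Chloe", "FRANCE", 2.5));
        System.out.println(revenueForFrance(orders));
    }
}
```

In a real program the same result comes back as a Table, which still has to be emitted to a sink or converted to a DataStream or DataSet before it can be inspected.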

sqlUpdate example (TableSink)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// execute query
  • Once an output table has been registered with a TableSink, the TableEnvironment's sqlUpdate method can be used to sink results into that output table

insertInto example (TableSink)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// create a TableSink
// the second constructor argument is the field delimiter
TableSink sink = new CsvTableSink("/path/to/file", "|");

// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);

// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");

// execute the program
  • The Table.insertInto method sinks the result into the output table

Converting between DataStream (or DataSet) and Table

Registering a DataStream as a Table

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
  • StreamTableEnvironment.registerDataStream registers a DataStream as a Table

DataStream to Table example

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
  • Here StreamTableEnvironment.fromDataStream converts the DataStream into a Table

Table to DataStream example


// get StreamTableEnvironment. 
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer> 
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
  • Here StreamTableEnvironment.toAppendStream and StreamTableEnvironment.toRetractStream convert the Table into a DataStream
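The retract-stream comments above say that each element is a (Boolean, X) pair where true means INSERT and false means DELETE. The following plain-Java sketch models that encoding for a running count per name: an update is expressed as a DELETE of the old row followed by an INSERT of the new one. It does not use Flink; the class name, the "name=count" row format and the `materialize` helper are assumptions chosen for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of retract-stream semantics; no Flink classes are used.
final class RetractStreamSketch {

    // Emits (flag, row) messages for a running COUNT(*) grouped by name.
    // flag == true means INSERT, flag == false means DELETE of an earlier row.
    static List<Map.Entry<Boolean, String>> countByName(List<String> names) {
        Map<String, Integer> counts = new HashMap<>();
        List<Map.Entry<Boolean, String>> messages = new ArrayList<>();
        for (String name : names) {
            Integer previous = counts.get(name);
            if (previous != null) {
                // Retract the outdated result row before emitting the new one.
                messages.add(new SimpleImmutableEntry<>(false, name + "=" + previous));
            }
            int updated = previous == null ? 1 : previous + 1;
            counts.put(name, updated);
            messages.add(new SimpleImmutableEntry<>(true, name + "=" + updated));
        }
        return messages;
    }

    // Applies the messages in order to rebuild the current result rows.
    static List<String> materialize(List<Map.Entry<Boolean, String>> messages) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Boolean, String> message : messages) {
            if (message.getKey()) {
                rows.add(message.getValue());    // INSERT
            } else {
                rows.remove(message.getValue()); // DELETE one matching row
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> messages =
            countByName(Arrays.asList("a", "b", "a"));
        System.out.println(messages);
        System.out.println(materialize(messages));
    }
}
```

An append stream, by contrast, only works for queries whose result rows are never updated, which is why an aggregating query like the one modelled here needs the retract form.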

Table to DataSet example

// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);
  • Here BatchTableEnvironment.toDataSet converts the Table into a DataSet

Mapping Data Types to a Table Schema

Position-based Mapping (Tuple type)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//---Tuple type---

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
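  • Position-based mapping requires that the newly specified field names do not clash with the field names of the input data type; if no names are specified, the default names are used (an atomic type is named starting from f0). This mode applies to Tuple and Row types; POJO types cannot use it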
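The no-clash rule can be read as a simple decision: if any requested name already exists in the input type, the field list is treated as name-based, otherwise as position-based. The sketch below models only that decision in plain Java. It is a simplified assumption about the rule as described, not Flink's actual implementation, and `MappingModeSketch`, `Mode` and `resolve` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of how a field list is interpreted; no Flink classes are used.
final class MappingModeSketch {

    enum Mode { POSITION_BASED, NAME_BASED }

    // inputFields: field names of the input type, e.g. "f0", "f1" for a Tuple2.
    // requested:   names passed as the field list, aliases already stripped.
    static Mode resolve(List<String> inputFields, List<String> requested) {
        for (String name : requested) {
            if (inputFields.contains(name)) {
                // A requested name that refers to an existing field means
                // the list cannot be a pure positional renaming.
                return Mode.NAME_BASED;
            }
        }
        return Mode.POSITION_BASED;
    }

    public static void main(String[] args) {
        List<String> tupleFields = Arrays.asList("f0", "f1");
        // "myLong, myInt": neither name exists, so fields are renamed by position.
        System.out.println(resolve(tupleFields, Arrays.asList("myLong", "myInt")));
        // "f1, f0": both names exist, so fields are selected by name (swapped).
        System.out.println(resolve(tupleFields, Arrays.asList("f1", "f0")));
    }
}
```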

Name-based Mapping (Tuple and POJO types)

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//---Tuple type---

DataStream<Tuple2<Long, Integer>> stream = ...

// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");

// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");

// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");

//---POJO type---

// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
  • Both Tuple and POJO types can use this mode, and fields can be renamed with as
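As a rough illustration of name-based projection, reordering and aliasing, the plain-Java sketch below applies a field list such as "f1 as myInt, f0 as myLong" to a row modelled as a map from field name to value. It does not use Flink and handles only the simple "name" and "name as alias" forms; `NameBasedMappingSketch` and `project` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of a name-based field list; no Flink classes are used.
final class NameBasedMappingSketch {

    // Applies a field list like "f1 as myInt, f0 as myLong" to a row.
    // The output keeps the order of the field list, not of the input row.
    static Map<String, Object> project(Map<String, Object> row, String fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String part : fields.split(",")) {
            String[] tokens = part.trim().split("\\s+as\\s+");
            String source = tokens[0].trim();
            String alias = tokens.length > 1 ? tokens[1].trim() : source;
            if (!row.containsKey(source)) {
                throw new IllegalArgumentException("Unknown field: " + source);
            }
            out.put(alias, row.get(source));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("f0", 1L);
        row.put("f1", 2);
        System.out.println(project(row, "f1"));                       // projection
        System.out.println(project(row, "f1, f0"));                   // swapped fields
        System.out.println(project(row, "f1 as myInt, f0 as myLong")); // swapped and renamed
    }
}
```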

Atomic type

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...

// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
  • An atomic type is converted into a single field

Row type

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...

// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");

// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");

// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
  • The Row type supports an arbitrary number of fields and allows null field values; it can use both position-based mapping and name-based mapping

Summary

Basic usage of Flink's Table API and SQL programs

  • First create a TableEnvironment (BatchTableEnvironment or StreamTableEnvironment), then create a Table or TableSource and register it in the catalog (the default catalog is internal, but an external catalog can also be registered), then query the table, and finally apply any conversions
  • A Table can be created by converting a DataSet or DataStream; a Table can be queried with an API query (the scan method), with a SQL query (the sqlQuery method), or with a mix of both
  • A queried Table can also be converted into a DataSet or DataStream for further processing; if the output should go to a table as well, register a TableSink and then write to it with the TableEnvironment's sqlUpdate method or the Table's insertInto method
