序
本文主要研究一下flink的Table API及SQL Programs
實(shí)例
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment
// for batch programs use BatchTableEnvironment instead of StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...); // or
tableEnv.registerExternalCatalog("extCat", ...);
// register an output Table
tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
Table tapiResult = tableEnv.scan("table1").select(...);
// create a Table from a SQL query
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable");
// execute
env.execute();
- 本實(shí)例展示了flink的Table API及SQL Programs的基本用法
Table API實(shí)例
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// scan registered Orders table
Table orders = tableEnv.scan("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter("cCountry === 'FRANCE'")
.groupBy("cID, cName")
.select("cID, cName, revenue.sum AS revSum");
// emit or convert Table
// execute query
- 通過(guò)tableEnv.scan方法來(lái)創(chuàng)建Table普筹,之后使用Table的各種查詢api
sqlQuery實(shí)例
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
- sqlQuery內(nèi)部是使用Apache Calcite來(lái)實(shí)現(xiàn)的
sqlUpdate實(shí)例(TableSink
)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// execute query
- 這里使用TableSink注冊(cè)output table之后前硫,就可以使用TableEnvironment的sqlUpdate方法sink到output table
insertInto實(shí)例(TableSink
)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// create a TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");
// register the TableSink with a specific schema
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink);
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable");
// execute the program
- 通過(guò)Table.insertInto方法sink到output table
DataStream(或DataSet
)與Table轉(zhuǎn)換
注冊(cè)DataStream為Table
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
- 通過(guò)StreamTableEnvironment.registerDataStream注冊(cè)DataStream為Table
DataStream轉(zhuǎn)Table實(shí)例
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
- 這里通過(guò)StreamTableEnvironment.fromDataStream將DataStream轉(zhuǎn)為Table
Table轉(zhuǎn)DataStream實(shí)例
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
- 這里通過(guò)StreamTableEnvironment.toRetractStream將Table轉(zhuǎn)換為DataStream
Table轉(zhuǎn)DataSet實(shí)例
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
- 這里通過(guò)BatchTableEnvironment.toDataSet將Table轉(zhuǎn)換為DataSet
Data Types與Table Schema映射
Position-based Mapping(Tuple類型
)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
//---Tuple類型---
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field names "myLong" and "myInt"
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
- Position-based的映射要求新指定的字段名不能與input data type重名,如果沒有指定拱礁,則默認(rèn)從f0開始來(lái)命名原始類型;此模式適用于Tuple赞赖、Row類型绪爸,POJO類型不能使用此模式
Name-based Mapping(POJO類型
)
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
//---Tuple類型---
DataStream<Tuple2<Long, Integer>> stream = ...
// convert DataStream into Table with default field names "f0" and "f1"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field "f1" only
Table table = tableEnv.fromDataStream(stream, "f1");
// convert DataStream into Table with swapped fields
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
//---POJO類型---
// Person is a POJO with fields "name" and "age"
DataStream<Person> stream = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
- Tuple或者POJO類型都可以使用這種模式蛉拙,也可以使用as進(jìn)行別名
Atomic類型
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
DataStream<Long> stream = ...
// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field name "myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
- 原始類型被轉(zhuǎn)換為單個(gè)字段
Row類型
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
DataStream<Row> stream = ...
// convert DataStream into Table with default field names "name", "age"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// convert DataStream into Table with projected field "name" (name-based)
Table table = tableEnv.fromDataStream(stream, "name");
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table = tableEnv.fromDataStream(stream, "name as myName");
- Row類型支持任意數(shù)量的字段,并允許字段值為null觉增,它可以使用Position-based Mapping及Name-based Mapping
小結(jié)
flink的Table API及SQL Programs的基本用法
- 首先是創(chuàng)建TableEnvironment(
BatchTableEnvironment或者StreamTableEnvironment
)兵拢,之后就是創(chuàng)建Table或者TableSource并注冊(cè)到catalog(默認(rèn)使用的catalog是internal的,也可以自己選擇注冊(cè)external catalog
)逾礁,然后就進(jìn)行table的query卵佛,之后就是一些轉(zhuǎn)換操作 - 關(guān)于Table的創(chuàng)建可以從DataSet、DataStream轉(zhuǎn)換過(guò)來(lái)敞斋;關(guān)于Table的查詢可以使用api query(
scan方法
),也可以使用sql query(sqlQuery方法
)疾牲,或者是混合使用 - 也可以將查詢的Table轉(zhuǎn)換為DataSet或者DataStream進(jìn)行其他處理植捎;如果輸出也是輸出到table的話,可以注冊(cè)TableSink阳柔,然后使用TableEnvironment的sqlUpdate方法或Table的insertInto方法輸出到TableSink