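Introduction
Flink provides two relational APIs, the Table API and SQL, for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that lets you compose queries from relational operators in a very intuitive way. The Table API and the SQL interface are tightly integrated with each other, as well as with Flink's DataStream and DataSet APIs.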
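The Flink SQL programming model
Creating a TableEnvironment
The TableEnvironment is the central concept of the Table API and SQL integration. It is responsible for:
- Registering a Table in the internal catalog
- Registering an external catalog
- Executing SQL queries
- Registering user-defined functions (scalar, table, and aggregate)
- Converting a DataStream or DataSet into a Table
- Holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment
A Table is always bound to a specific TableEnvironment. Tables from different TableEnvironments cannot be combined in the same query, for example with a join or a union.
A TableEnvironment maintains an internal catalog of tables organized by table name. Table API and SQL queries can access the tables registered in the catalog and refer to them by name.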
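The two rules above (tables are looked up by name in a per-environment catalog, and tables from different environments cannot be combined) can be pictured with a small toy model. This is only a sketch in plain Java: ToyTableEnvironment and ToyTable are hypothetical classes invented for illustration. They are not Flink's API and say nothing about how Flink implements its catalog.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical classes, NOT Flink's API.
class ToyTableEnvironment {
    // The catalog: registered tables, organized by name.
    private final Map<String, ToyTable> catalog = new HashMap<>();

    // Every table created here is bound to this environment.
    ToyTable createTable(String description) {
        return new ToyTable(this, description);
    }

    // Registers a table under a name so queries can refer to it.
    void registerTable(String name, ToyTable table) {
        if (table.owner != this) {
            throw new IllegalArgumentException("table belongs to a different environment");
        }
        catalog.put(name, table);
    }

    // Looks a registered table up by name.
    ToyTable scan(String name) {
        ToyTable table = catalog.get(name);
        if (table == null) {
            throw new IllegalArgumentException("no table registered as " + name);
        }
        return table;
    }
}

class ToyTable {
    final ToyTableEnvironment owner;
    final String description;

    ToyTable(ToyTableEnvironment owner, String description) {
        this.owner = owner;
        this.description = description;
    }

    // Combining tables is only allowed within one environment.
    ToyTable union(ToyTable other) {
        if (other.owner != this.owner) {
            throw new IllegalStateException("cannot combine tables of different environments");
        }
        return new ToyTable(owner, "(" + description + " UNION " + other.description + ")");
    }
}
```

In this model, calling union on two tables created by the same ToyTableEnvironment succeeds, while passing a table created by another environment throws an exception.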
Registering tables in the catalog
A TableEnvironment can register a table from several kinds of sources:
- An existing Table object, usually the result of a Table API or SQL query
  Table projTable = tableEnv.scan("X").select(...);
- A TableSource, which accesses external data such as files, databases, or messaging systems
  TableSource csvSource = new CsvTableSource("/path/to/file", ...);
- A DataStream or DataSet from a DataStream or DataSet program
  // Convert a DataSet into a Table
  Table table = tableEnv.fromDataSet(tableset);
Registering a TableSink
A registered TableSink can be used to emit the result of a Table API or SQL query to an external storage system, such as a database, a key-value store, a message queue, or a file system (in different encodings, e.g. CSV, Apache Parquet, Avro, ORC, ...):
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"a", "b", "c"};
TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
Example
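import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

// The enclosing class name is illustrative; the original shows only its members.
public class WordCountSQL {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // Build one WC record with frequency 1 for every word in the input
        List<WC> list = new ArrayList<>();
        String wordsStr = "Hello Flink Hello TOM";
        String[] words = wordsStr.split("\\W+");
        for (String word : words) {
            WC wc = new WC(word, 1);
            list.add(wc);
        }
        DataSet<WC> input = env.fromCollection(list);
        // Register the DataSet as table "WordCount" with fields word and frequency
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        // Sum the frequencies per word
        Table table = tEnv.sqlQuery(
            "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
        // Convert the result Table back into a DataSet and print it
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);
        result.print();
    }

    public static class WC {
        public String word;     // e.g. hello
        public long frequency;  // e.g. 1
        // public no-argument constructor to make it a Flink POJO
        public WC() {}
        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}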
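To make clear what the SQL query in the example computes, here is a minimal sketch of the same grouping and summation in plain Java, without Flink. The class WordCountSketch and the method sumByWord are hypothetical names used only for this illustration. The sketch keeps words in first-seen order, which is a simplification; the order of rows printed by the Flink job is not something the example above specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java illustration only; hypothetical names, no Flink involved.
class WordCountSketch {
    // Mirrors: SELECT word, SUM(frequency) FROM WordCount GROUP BY word,
    // where every word occurrence starts with frequency 1.
    static Map<String, Long> sumByWord(String text) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String word : text.split("\\W+")) {
            if (word.isEmpty()) {
                continue; // split can yield an empty first token
            }
            // GROUP BY word, SUM(frequency): add 1 to the running total
            totals.merge(word, 1L, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        sumByWord("Hello Flink Hello TOM")
            .forEach((word, frequency) -> System.out.println("WC " + word + " " + frequency));
    }
}
```

For the input string used in the example, this yields a total of 2 for Hello and 1 each for Flink and TOM.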
Because the Table API is a language-integrated query API for Scala and Java, the Scala dependencies need to be added to the Maven pom:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.8.0</version>
</dependency>