看了幾天flink烦味,剛?cè)腴T峦椰。
簡單說下對flink的感受龄寞,flink有4層(有些說3層,將Table API和SQL看成一層)API汤功,越底層物邑,對數(shù)據(jù)的操作就越精細,越高層完成功能所需要的代碼就越少滔金,而且代碼越易讀色解。
api使用起來很像java中的stream,這個其實很顯然餐茵,都是為了對流數(shù)據(jù)進行處理科阎。感覺就像flink是java中并行流的分布式版本,所以對stream熟悉的話忿族,flink上手不難锣笨,或者說使用flink編寫代碼并不難。
Flink的編程模式:輸入(source) -> 處理(轉(zhuǎn)換transform) -> 輸出(sink)道批,3部分错英,相當(dāng)清爽。
統(tǒng)一術(shù)語
數(shù)據(jù)比對一般針對兩個數(shù)據(jù)集A/B屹徘,在選定一個基準方A后走趋,定義如下:
F000:A/B兩方數(shù)據(jù)相同
F113:A中存在,但B中沒有噪伊,A比B多
F114:B中存在簿煌,但A中沒有,B比A多
F115:A與B的關(guān)鍵字段相同鉴吹,但畢竟字段不同姨伟,如A與B都有同一筆訂單,但訂單金額不同
新建工程
這里我們使用官方提供的quickstart做模板豆励,如果是比較新版的idea(如2020.1)里面直接有flink的quickstart模板夺荒,舊版的idea的話,需要自己添加一下良蒸。
下次使用的時候可以直接從這里看到:
如果你使用的是scala技扼,ArtifactId則填flink-quickstart-scala。具體的版本信息可以根據(jù)最新版的填寫嫩痰。
添加Table API依賴
在pom.xml中添加Table API依賴剿吻。
<!-- Table API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table API需要scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
編寫代碼
利用模板里的BatchJob來編寫:
package com.flink;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import java.util.List;
/**
* Skeleton for a Flink Batch Job.
*
* <p>For a tutorial how to write a Flink batch application, check the
* tutorials and examples on the <a >Flink Website</a>.
*
* <p>To package your application into a JAR file for execution,
* change the main class in the POM.xml file to this class (simply search for 'mainClass')
* and run 'mvn clean package' on the command line.
*/
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Table Environment
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
/**
* 構(gòu)造兩個數(shù)據(jù)集,實際生產(chǎn)從自己需要的source中獲取即可
*/
DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115");
DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114");
// 轉(zhuǎn)換成table
Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique);
Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique);
/**
* 核心比對(對賬)邏輯
*/
Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集
Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集
Table f000_table = tableA_unique.intersect(tableB_unique);// 交集
// 轉(zhuǎn)回DataSet用于輸出
DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class);
DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class);
DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class);
/**
* 輸出串纺,實際輸出到自己需要的sink即可
*/
List<String> f000_list = f000.collect();
List<String> f113_list = f113.collect();
List<String> f114_list = f114.collect();
System.out.println("==============================");
System.out.println("f000 ->" + f000_list);
System.out.println("==============================");
System.out.println("f113 ->" + f113_list);
System.out.println("==============================");
System.out.println("f114 ->" + f114_list);
// 批處理不需要顯示調(diào)用execute丽旅,否則會報錯
// env.execute("Flink Batch Java API Skeleton");
}
}
簡單說下幾個關(guān)鍵點:
- 使用Table API需要創(chuàng)建對應(yīng)的執(zhí)行環(huán)境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
- 模板代碼中最后顯式調(diào)用
env.execute()
,其實在批處理中不需要纺棺,顯式調(diào)用反而會報錯榄笙。
源碼
總結(jié)
本質(zhì)上就是利用Table API中對數(shù)據(jù)集的處理函數(shù)(交集、差集)來完成數(shù)據(jù)比對祷蝌。
如果你有更好的想法茅撞,歡迎留言,多多指教巨朦。
轉(zhuǎn)載請注明出處