使用Flink批處理完成數(shù)據(jù)比對(對賬)一

看了幾天flink烦味,剛?cè)腴T峦椰。
簡單說下對flink的感受龄寞,flink有4層(有些說3層,將Table API和SQL看成一層)API汤功,越底層物邑,對數(shù)據(jù)的操作就越精細,越高層完成功能所需要的代碼就越少滔金,而且代碼越易讀色解。


image.png

api使用起來很像java中的stream,這個其實很顯然餐茵,都是為了對流數(shù)據(jù)進行處理科阎。感覺就像flink是java中并行流的分布式版本,所以對stream熟悉的話忿族,flink上手不難锣笨,或者說使用flink編寫代碼并不難。

Flink的編程模式:輸入(source) -> 處理(轉(zhuǎn)換transform) -> 輸出(sink)道批,3部分错英,相當(dāng)清爽。

統(tǒng)一術(shù)語

數(shù)據(jù)比對一般針對兩個數(shù)據(jù)集A/B屹徘,在選定一個基準方A后走趋,定義如下:
F000:A/B兩方數(shù)據(jù)相同
F113:A中存在,但B中沒有噪伊,A比B多
F114:B中存在簿煌,但A中沒有,B比A多
F115:A與B的關(guān)鍵字段相同鉴吹,但畢竟字段不同姨伟,如A與B都有同一筆訂單,但訂單金額不同

新建工程

這里我們使用官方提供的quickstart做模板豆励,如果是比較新版的idea(如2020.1)里面直接有flink的quickstart模板夺荒,舊版的idea的話,需要自己添加一下良蒸。


image.png
image.png

下次使用的時候可以直接從這里看到:


image.png

如果你使用的是scala技扼,ArtifactId則填flink-quickstart-scala。具體的版本信息可以根據(jù)最新版的填寫嫩痰。

添加Table API依賴

在pom.xml中添加Table API依賴剿吻。

<!-- Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Table API需要scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

編寫代碼

利用模板里的BatchJob來編寫:

package com.flink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.List;

/**
 * Skeleton for a Flink Batch Job.
 *
 * <p>For a tutorial how to write a Flink batch application, check the
 * tutorials and examples on the <a >Flink Website</a>.
 *
 * <p>To package your application into a JAR file for execution,
 * change the main class in the POM.xml file to this class (simply search for 'mainClass')
 * and run 'mvn clean package' on the command line.
 */
public class BatchJob {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Table Environment
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);

        /**
         * 構(gòu)造兩個數(shù)據(jù)集,實際生產(chǎn)從自己需要的source中獲取即可
         */
        DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115");
        DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114");


        // 轉(zhuǎn)換成table
        Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique);
        Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique);


        /**
         * 核心比對(對賬)邏輯
         */
        Table f113_table = tableA_unique.minusAll(tableB_unique);// 差集
        Table f114_table = tableB_unique.minusAll(tableA_unique);// 差集
        Table f000_table = tableA_unique.intersect(tableB_unique);// 交集

        // 轉(zhuǎn)回DataSet用于輸出
        DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class);
        DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class);
        DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class);


        /**
         * 輸出串纺,實際輸出到自己需要的sink即可
         */
        List<String> f000_list = f000.collect();
        List<String> f113_list = f113.collect();
        List<String> f114_list = f114.collect();

        System.out.println("==============================");
        System.out.println("f000 ->" + f000_list);
        System.out.println("==============================");
        System.out.println("f113 ->" + f113_list);
        System.out.println("==============================");
        System.out.println("f114 ->" + f114_list);



        // 批處理不需要顯示調(diào)用execute丽旅,否則會報錯
        // env.execute("Flink Batch Java API Skeleton");
    }

}

簡單說下幾個關(guān)鍵點:

  1. 使用Table API需要創(chuàng)建對應(yīng)的執(zhí)行環(huán)境:
BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);
  1. 模板代碼中最后顯式調(diào)用env.execute(),其實在批處理中不需要纺棺,顯式調(diào)用反而會報錯榄笙。

源碼

源碼

總結(jié)

本質(zhì)上就是利用Table API中對數(shù)據(jù)集的處理函數(shù)(交集、差集)來完成數(shù)據(jù)比對祷蝌。
如果你有更好的想法茅撞,歡迎留言,多多指教巨朦。
轉(zhuǎn)載請注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末乡翅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子罪郊,更是在濱河造成了極大的恐慌蠕蚜,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件悔橄,死亡現(xiàn)場離奇詭異靶累,居然都是意外死亡,警方通過查閱死者的電腦和手機癣疟,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進店門挣柬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人睛挚,你說我怎么就攤上這事邪蛔。” “怎么了扎狱?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵侧到,是天一觀的道長勃教。 經(jīng)常有香客問我,道長匠抗,這世上最難降的妖魔是什么故源? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮汞贸,結(jié)果婚禮上绳军,老公的妹妹穿的比我還像新娘。我一直安慰自己矢腻,他們只是感情好门驾,可當(dāng)我...
    茶點故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著多柑,像睡著了一般奶是。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上顷蟆,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天诫隅,我揣著相機與錄音,去河邊找鬼帐偎。 笑死逐纬,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的削樊。 我是一名探鬼主播豁生,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼漫贞!你這毒婦竟也來了甸箱?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤迅脐,失蹤者是張志新(化名)和其女友劉穎芍殖,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谴蔑,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡豌骏,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了隐锭。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片窃躲。...
    茶點故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖钦睡,靈堂內(nèi)的尸體忽然破棺而出蒂窒,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布洒琢,位于F島的核電站秧秉,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏纬凤。R本人自食惡果不足惜福贞,卻給世界環(huán)境...
    茶點故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一撩嚼、第九天 我趴在偏房一處隱蔽的房頂上張望停士。 院中可真熱鬧,春花似錦完丽、人聲如沸恋技。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蜻底。三九已至,卻和暖如春聘鳞,著一層夾襖步出監(jiān)牢的瞬間薄辅,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工抠璃, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留站楚,地道東北人。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓搏嗡,卻偏偏與公主長得像窿春,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子采盒,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,486評論 2 348

推薦閱讀更多精彩內(nèi)容

  • 了解Flink是什么旧乞,F(xiàn)link應(yīng)用程序運行的多樣化,對比業(yè)界常用的流處理框架磅氨,F(xiàn)link的發(fā)展趨勢尺栖,F(xiàn)link生...
    JavaEdge閱讀 5,066評論 1 18
  • 前面的文章使用Flink批處理完成數(shù)據(jù)比對(對賬)二討論了使用Table API來處理數(shù)據(jù)比對的問題,但有些場景下...
    李不言被占用了閱讀 3,452評論 0 2
  • 我們搭建起一座座橋梁烦租,去深入這個世界延赌。 無論怎樣 要去熱愛這個世界。見山是山之前左权,要“執(zhí)迷不悔”地去體驗皮胡,見山不是...
    曦之日記閱讀 277評論 1 2
  • 今天早上起床,打開窗戶赏迟,拉開窗簾屡贺,就看見外面陰沉沉的天,又是個陰天,今年的雨水太多了甩栈,只要看天氣預(yù)報說有雨就一定會...
    夏日么么茶_f4e5閱讀 245評論 0 1
  • 她是那個讓你愿意變的更好的人泻仙,你讀書是想跟她聊聊更大的世界,你健身是想跟她去走走更遠的路量没,你賺錢是想跟她過過...
    知非詩詩未為奇奇閱讀 317評論 0 0