使用Flink批處理完成數(shù)據(jù)比對(對賬)二

使用Flink批處理完成數(shù)據(jù)比對(對賬)一中,我們只是簡單的實現(xiàn)了F000/F113/F114的情況淘这,如果我的需求場景需要實現(xiàn)F115的場景該怎么辦呢?

編寫代碼

在上一篇文章的基礎(chǔ)上完成代碼如下:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;

import java.util.List;

/***
 * <strong>對賬流程</strong>
 * <ol>
 * <li>兩方文件處理如下:</li>
 * <ul>
 * <li>所有唯一性字段(如OrderNO)存放到一個table1</li>
 * <li>所有唯一性字段+比較字段(如OrderNO+OrderMoney)存放到一個table2</li>
 * </ul>
 * <li>比對
 * <ul>
 * <li>兩個文件的table1做差集可以得到F113、F114</li>
 * <li>兩個文件的table1做交集可以得到F000+F115</li>
 * <li>兩個文件的set2做差集可以得到F113+F115</li>
 * <li>F113+F115去除比較字段磺樱,只留下關(guān)鍵字段</li>
 * <li>去除F113+F115中的F113誓竿,得到F115</li>
 * <li>去除F000+F115中的F115磅网,得到F000</li>
 * </ul>
 * </ol>
 */
public class BatchJob2 {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Table Environment
        BatchTableEnvironment tableEnvironment = BatchTableEnvironment.getTableEnvironment(env);

        /**
         * 構(gòu)造兩個數(shù)據(jù)集,實際生產(chǎn)從自己需要的source中獲取即可
         */
        // 只包含唯一性(用于關(guān)聯(lián))字段的數(shù)據(jù)源
        DataSource<String> dataSourceA_unique = env.fromElements("orderId_1_f113", "orderId_2_f000", "orderId_3_f115");
        DataSource<String> dataSourceB_unique = env.fromElements("orderId_2_f000", "orderId_3_f115", "orderId_4_f114");
        // 包含唯一性字段和比較字段
        DataSource<String> dataSourceA_compare = env.fromElements("orderId_1_f113:payment_1", "orderId_2_f000:payment_2", "orderId_3_f115:payment_33");
        DataSource<String> dataSourceB_compare = env.fromElements("orderId_2_f000:payment_2", "orderId_3_f115:payment_333", "orderId_4_f114:payment_4");

        // 轉(zhuǎn)換成table
        Table tableA_unique = tableEnvironment.fromDataSet(dataSourceA_unique);
        Table tableB_unique = tableEnvironment.fromDataSet(dataSourceB_unique);
        Table tableA_compare = tableEnvironment.fromDataSet(dataSourceA_compare);
        Table tableB_compare = tableEnvironment.fromDataSet(dataSourceB_compare);

        /**
         * 核心對賬邏輯
         */
        Table f113_table = tableA_unique.minusAll(tableB_unique);
        Table f114_table = tableB_unique.minusAll(tableA_unique);
        Table f000_f115_table = tableA_unique.intersect(tableB_unique);

        Table f113_f115_compare_table = tableA_compare.minusAll(tableB_compare);
        // 拆分筷屡,留下唯一性字段
        Table f113_f115_table = convert(tableEnvironment, f113_f115_compare_table);

        Table f115_table = f113_f115_table.minusAll(f113_table);
        Table f000_table = f000_f115_table.minusAll(f115_table);

        DataSet<String> f000 = tableEnvironment.toDataSet(f000_table, String.class);
        DataSet<String> f113 = tableEnvironment.toDataSet(f113_table, String.class);
        DataSet<String> f114 = tableEnvironment.toDataSet(f114_table, String.class);
        DataSet<String> f115 = tableEnvironment.toDataSet(f115_table, String.class);


        /**
         * 輸出涧偷,實際輸出到自己需要的sink即可
         */
        List<String> f000_list = f000.collect();
        List<String> f113_list = f113.collect();
        List<String> f114_list = f114.collect();
        List<String> f115_list = f115.collect();

        System.out.println("==============================");
        System.out.println("f000 ->" + f000_list);
        System.out.println("==============================");
        System.out.println("f113 ->" + f113_list);
        System.out.println("==============================");
        System.out.println("f114 ->" + f114_list);
        System.out.println("==============================");
        System.out.println("f115 ->" + f115_list);

    }

    private static Table convert(BatchTableEnvironment tableEnvironment, Table inputTable) {
        DataSet<String> f000_compare_dataset = tableEnvironment.toDataSet(inputTable, String.class);
        MapOperator<String, String> map = f000_compare_dataset.map(e -> {
            return e.split(":")[0];// 留下前半段,關(guān)鍵字段
        });
        return tableEnvironment.fromDataSet(map);
    }
}

中間的處理邏輯在代碼中對注釋清楚了毙死。

源碼

源碼

總結(jié)

需要知道兩邊都有數(shù)據(jù)(訂單號相同)但存在差異的情況需要處理的步驟多點燎潮。
如果你有更好的想法,歡迎留言扼倘,多多指教确封。
轉(zhuǎn)載請注明出處

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市再菊,隨后出現(xiàn)的幾起案子爪喘,更是在濱河造成了極大的恐慌,老刑警劉巖袄简,帶你破解...
    沈念sama閱讀 219,539評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腥放,死亡現(xiàn)場離奇詭異,居然都是意外死亡绿语,警方通過查閱死者的電腦和手機秃症,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評論 3 396
  • 文/潘曉璐 我一進店門候址,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人种柑,你說我怎么就攤上這事岗仑。” “怎么了聚请?”我有些...
    開封第一講書人閱讀 165,871評論 0 356
  • 文/不壞的土叔 我叫張陵荠雕,是天一觀的道長。 經(jīng)常有香客問我驶赏,道長炸卑,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,963評論 1 295
  • 正文 為了忘掉前任煤傍,我火速辦了婚禮盖文,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蚯姆。我一直安慰自己五续,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,984評論 6 393
  • 文/花漫 我一把揭開白布龄恋。 她就那樣靜靜地躺著疙驾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪郭毕。 梳的紋絲不亂的頭發(fā)上它碎,一...
    開封第一講書人閱讀 51,763評論 1 307
  • 那天,我揣著相機與錄音铣卡,去河邊找鬼链韭。 笑死,一個胖子當(dāng)著我的面吹牛煮落,可吹牛的內(nèi)容都是我干的敞峭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,468評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼蝉仇,長吁一口氣:“原來是場噩夢啊……” “哼旋讹!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起轿衔,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤沉迹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后害驹,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體鞭呕,經(jīng)...
    沈念sama閱讀 45,850評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,002評論 3 338
  • 正文 我和宋清朗相戀三年宛官,在試婚紗的時候發(fā)現(xiàn)自己被綠了葫松。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片瓦糕。...
    茶點故事閱讀 40,144評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖腋么,靈堂內(nèi)的尸體忽然破棺而出咕娄,到底是詐尸還是另有隱情,我是刑警寧澤珊擂,帶...
    沈念sama閱讀 35,823評論 5 346
  • 正文 年R本政府宣布圣勒,位于F島的核電站,受9級特大地震影響摧扇,放射性物質(zhì)發(fā)生泄漏圣贸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,483評論 3 331
  • 文/蒙蒙 一扛稽、第九天 我趴在偏房一處隱蔽的房頂上張望旁趟。 院中可真熱鬧,春花似錦庇绽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,026評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至凡傅,卻和暖如春辟狈,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背夏跷。 一陣腳步聲響...
    開封第一講書人閱讀 33,150評論 1 272
  • 我被黑心中介騙來泰國打工哼转, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人槽华。 一個月前我還...
    沈念sama閱讀 48,415評論 3 373
  • 正文 我出身青樓壹蔓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親猫态。 傳聞我的和親對象是個殘疾皇子佣蓉,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,092評論 2 355

推薦閱讀更多精彩內(nèi)容