Flink intervalJoin 使用和原理分析

1.前言

Flink中基于DataStream的join壹士,只能實(shí)現(xiàn)在同一個(gè)窗口的兩個(gè)數(shù)據(jù)流進(jìn)行join,但是在實(shí)際中常常會(huì)存在數(shù)據(jù)亂序或者延時(shí)的情況钾埂,導(dǎo)致兩個(gè)流的數(shù)據(jù)進(jìn)度不一致球涛,就會(huì)出現(xiàn)數(shù)據(jù)跨窗口的情況庄敛,那么數(shù)據(jù)就無法在同一個(gè)窗口內(nèi)join。
Flink基于KeyedStream提供的interval join機(jī)制沽讹,intervaljoin 連接兩個(gè)keyedStream, 按照相同的key在一個(gè)相對(duì)數(shù)據(jù)時(shí)間的時(shí)間段內(nèi)進(jìn)行連接。

2.代碼示例

將訂單流與訂單品流通過訂單id進(jìn)行關(guān)聯(lián)武鲁,獲得訂單流中的會(huì)員id爽雄。
其中ds1就是訂單品流,ds2就是訂單流沐鼠,分別對(duì)ds1和ds2通過訂單id進(jìn)行keyBy操作挚瘟,得到兩個(gè)KeyedStream,再進(jìn)行intervalJoin操作饲梭;
between方法傳遞的兩個(gè)參數(shù)lowerBound和upperBound乘盖,用來控制右邊的流可以與哪個(gè)時(shí)間范圍內(nèi)的左邊的流進(jìn)行關(guān)聯(lián),即:
leftElement.timestamp + lowerBound <= rightElement.timestamp <= leftElement.timestamp + upperBound
相當(dāng)于左邊的流可以晚到lowerBound(lowerBound為負(fù)的話)時(shí)間憔涉,也可以早到upperBound(upperBound為正的話)時(shí)間订框。

DataStream<OrderItemBean> ds = ds1.keyBy(jo -> jo.getString("fk_tgou_order_id"))
                .intervalJoin(ds2.keyBy(jo -> jo.getString("id")))
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(new ProcessJoinFunction<JSONObject, JSONObject, OrderItemBean>() {

                    @Override
                    public void processElement(JSONObject joItem, JSONObject joOrder, Context context, Collector<OrderItemBean> collector) throws Exception {
                        String order_id = joItem.getString("fk_tgou_order_id");
                        String item_id = joItem.getString("activity_to_product_id");
                        String create_time = df.format(joItem.getLong("create_time"));
                        String member_id = joOrder.getString("fk_member_id");
                        Double price = joItem.getDouble("price");
                        Integer quantity = joItem.getInteger("quantity");
                        collector.collect(new OrderItemBean(order_id, item_id, create_time, member_id, price, quantity));
                    }
                });
ds.map(JSON::toJSONString).addSink(new FlinkKafkaProducer010<String>("berkeley-order-item", schema, produceConfig));
3.Interval Join源碼

<1> 使用Interval Join時(shí),必須要指定的時(shí)間類型為EventTime


image.png

<2>兩個(gè)KeyedStream在進(jìn)行intervalJoin并調(diào)用between方法后兜叨,跟著使用process方法穿扳;
process方法傳遞一個(gè)自定義的 ProcessJoinFunction 作為參數(shù),ProcessJoinFunction的三個(gè)參數(shù)就是左邊流的元素類型国旷,右邊流的元素類型矛物,輸出流的元素類型。

image.png

image.png

<3>intervalJoin跪但,底層是將兩個(gè)KeyedStream進(jìn)行connect操作履羞,得到ConnectedStreams,這樣的兩個(gè)數(shù)據(jù)流之間就可以實(shí)現(xiàn)狀態(tài)共享屡久,對(duì)于intervalJoin來說就是兩個(gè)流相同key的數(shù)據(jù)可以相互訪問忆首。
ConnectedStreams的keyby?被环?雄卷??

<4> 在ConnectedStreams之上執(zhí)行的操作就是IntervalJoinOperator


image.png

這里有兩個(gè)參數(shù)控制是否包括上下界蛤售,默認(rèn)都是包括的丁鹉。

a.initializeState()方法
這里面初始化了兩個(gè)狀態(tài)對(duì)象,

image.png

分別用來存儲(chǔ)兩個(gè)流的數(shù)據(jù)悴能,其中Long對(duì)應(yīng)數(shù)據(jù)的時(shí)間戳揣钦,List<BufferEntry<T1>>對(duì)應(yīng)相同時(shí)間戳的數(shù)據(jù)

b.processElement1和processElement2方法
方法描述的是,當(dāng)兩個(gè)流達(dá)到之后漠酿,比如左邊的流有數(shù)據(jù)到達(dá)之后冯凹,就去右邊的流去查找對(duì)應(yīng)上下界范圍內(nèi)的數(shù)據(jù)。這兩個(gè)方法調(diào)用的都是processElement方法。

private <THIS, OTHER> void processElement(
            final StreamRecord<THIS> record,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
            final long relativeLowerBound,
            final long relativeUpperBound,
            final boolean isLeft) throws Exception {
                
        final THIS ourValue = record.getValue();
        final long ourTimestamp = record.getTimestamp();

        if (ourTimestamp == Long.MIN_VALUE) {
            throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
                    "interval stream joins need to have timestamps meaningful timestamps.");
        }

        if (isLate(ourTimestamp)) {
            return;
        }

        addToBuffer(ourBuffer, ourValue, ourTimestamp);

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }
    }

(1)獲取記錄的值和時(shí)間戳宇姚,判斷是否延時(shí)匈庭,當(dāng)?shù)竭_(dá)的記錄的時(shí)間戳小于水位線時(shí),說明該數(shù)據(jù)延時(shí)浑劳,不去處理阱持,不去關(guān)聯(lián)另一條流的數(shù)據(jù)。


image.png
    private boolean isLate(long timestamp) {
        long currentWatermark = internalTimerService.currentWatermark();
        return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
    }

(2)將數(shù)據(jù)添加到對(duì)應(yīng)自己流的MapState緩存狀態(tài)中魔熏,key為數(shù)據(jù)的時(shí)間衷咽。
addToBuffer(ourBuffer, ourValue, ourTimestamp);

private static <T> void addToBuffer(
            final MapState<Long, List<IntervalJoinOperator.BufferEntry<T>>> buffer,
            final T value,
            final long timestamp) throws Exception {
        List<BufferEntry<T>> elemsInBucket = buffer.get(timestamp);
        if (elemsInBucket == null) {
            elemsInBucket = new ArrayList<>();
        }
        elemsInBucket.add(new BufferEntry<>(value, false));
        buffer.put(timestamp, elemsInBucket);
    }

(3)去遍歷另一條流的MapState,如果ourTimestamp + relativeLowerBound <=timestamp<= ourTimestamp + relativeUpperBound 蒜绽,則將數(shù)據(jù)輸出給ProcessJoinFunction調(diào)用镶骗,ourTimestamp表示流入的數(shù)據(jù)時(shí)間,timestamp表示對(duì)應(yīng)join的數(shù)據(jù)時(shí)間

        for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
            final long timestamp  = bucket.getKey();

            if (timestamp < ourTimestamp + relativeLowerBound ||
                    timestamp > ourTimestamp + relativeUpperBound) {
                continue;
            }

            for (BufferEntry<OTHER> entry: bucket.getValue()) {
                if (isLeft) {
                    collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
                } else {
                    collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
                }
            }
        }

對(duì)應(yīng)的collect方法:

   private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
        final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

        collector.setAbsoluteTimestamp(resultTimestamp);
        context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

        userFunction.processElement(left, right, context, collector);
    }

設(shè)置結(jié)果的Timestamp為兩邊流中最大的躲雅,之后執(zhí)行processElement方法


image.png

image.png

(4)注冊(cè)定時(shí)清理時(shí)間

        long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
        if (isLeft) {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
        } else {
            internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
        }

定時(shí)的清理時(shí)間鼎姊,就是當(dāng)下流入的數(shù)據(jù)的時(shí)間+relativeUpperBound,當(dāng)watermark大于該時(shí)間就需要清理相赁。

public void onEventTime(InternalTimer<K, String> timer) throws Exception {

        long timerTimestamp = timer.getTimestamp();
        String namespace = timer.getNamespace();

        logger.trace("onEventTime @ {}", timerTimestamp);

        switch (namespace) {
            case CLEANUP_NAMESPACE_LEFT: {
                long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
                logger.trace("Removing from left buffer @ {}", timestamp);
                leftBuffer.remove(timestamp);
                break;
            }
            case CLEANUP_NAMESPACE_RIGHT: {
                long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
                logger.trace("Removing from right buffer @ {}", timestamp);
                rightBuffer.remove(timestamp);
                break;
            }
            default:
                throw new RuntimeException("Invalid namespace " + namespace);
        }
    }

清理時(shí)間邏輯:
假設(shè)目前流到達(dá)的數(shù)據(jù)的時(shí)間戳為10s此蜈,between傳進(jìn)去的時(shí)間分別為1s,5s噪生,
upperBound為5s裆赵,lowerBound為1s
根據(jù) 左邊流時(shí)間戳+1s<=右邊時(shí)間戳<=左邊流時(shí)間戳+5s右邊時(shí)間戳-5s<=左邊流時(shí)間戳<=右邊時(shí)間戳-1s
a跺嗽。如果為左邊流數(shù)據(jù)到達(dá)战授,調(diào)用processElement1方法
此時(shí)relativeUpperBound為5,relativeLowerBound為1桨嫁,relativeUpperBound>0植兰,所以定時(shí)清理時(shí)間為10+5即15s
當(dāng)時(shí)間達(dá)到15s時(shí),清除左邊流數(shù)據(jù)璃吧,即看右邊流在15s時(shí)楣导,需要查找的左邊流時(shí)間范圍
10s<=左邊流時(shí)間戳<=14s,所以watermark>15s時(shí)可清除10s的數(shù)據(jù)畜挨。

image.png

b筒繁。如果為右邊流數(shù)據(jù)到達(dá),調(diào)用processElement2方法
此時(shí)relativeUpperBound為-1巴元,relativeLowerBound為-5毡咏,relativeUpperBound<0,所以定時(shí)清理時(shí)間為10s
當(dāng)時(shí)間達(dá)到10s時(shí)逮刨,清除右邊流數(shù)據(jù)呕缭,即看左邊流在10s時(shí),需要查找的右邊流時(shí)間范圍
11s<=右邊流時(shí)間戳<=15s,所以可以清除10s的數(shù)據(jù)恢总。


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末迎罗,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子片仿,更是在濱河造成了極大的恐慌纹安,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,376評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件滋戳,死亡現(xiàn)場(chǎng)離奇詭異钻蔑,居然都是意外死亡啥刻,警方通過查閱死者的電腦和手機(jī)奸鸯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,126評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來可帽,“玉大人娄涩,你說我怎么就攤上這事∮掣” “怎么了蓄拣?”我有些...
    開封第一講書人閱讀 156,966評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)努隙。 經(jīng)常有香客問我球恤,道長(zhǎng),這世上最難降的妖魔是什么荸镊? 我笑而不...
    開封第一講書人閱讀 56,432評(píng)論 1 283
  • 正文 為了忘掉前任咽斧,我火速辦了婚禮,結(jié)果婚禮上躬存,老公的妹妹穿的比我還像新娘张惹。我一直安慰自己,他們只是感情好岭洲,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,519評(píng)論 6 385
  • 文/花漫 我一把揭開白布宛逗。 她就那樣靜靜地躺著,像睡著了一般盾剩。 火紅的嫁衣襯著肌膚如雪雷激。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,792評(píng)論 1 290
  • 那天告私,我揣著相機(jī)與錄音侥锦,去河邊找鬼。 笑死德挣,一個(gè)胖子當(dāng)著我的面吹牛肖油,可吹牛的內(nèi)容都是我干的叫胁。 我是一名探鬼主播凿滤,決...
    沈念sama閱讀 38,933評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼查近,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了雀摘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,701評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎襟衰,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體粪摘,經(jīng)...
    沈念sama閱讀 44,143評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡瀑晒,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,488評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了徘意。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片苔悦。...
    茶點(diǎn)故事閱讀 38,626評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖椎咧,靈堂內(nèi)的尸體忽然破棺而出玖详,到底是詐尸還是另有隱情,我是刑警寧澤勤讽,帶...
    沈念sama閱讀 34,292評(píng)論 4 329
  • 正文 年R本政府宣布蟋座,位于F島的核電站,受9級(jí)特大地震影響脚牍,放射性物質(zhì)發(fā)生泄漏向臀。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,896評(píng)論 3 313
  • 文/蒙蒙 一诸狭、第九天 我趴在偏房一處隱蔽的房頂上張望券膀。 院中可真熱鬧,春花似錦作谚、人聲如沸三娩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,742評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽雀监。三九已至,卻和暖如春眨唬,著一層夾襖步出監(jiān)牢的瞬間会前,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國(guó)打工匾竿, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留瓦宜,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,324評(píng)論 2 360
  • 正文 我出身青樓岭妖,卻偏偏與公主長(zhǎng)得像临庇,于是被迫代替她去往敵國(guó)和親反璃。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,494評(píng)論 2 348