Flink - 當數(shù)據(jù)流入window時屯断,會發(fā)生什么

上篇文章對Flink中常見的windowAssigner,如:TumblingEventTimeWindow, SlidingEventTimeWindow, EventTimeSessionWindows 等中的默認提供的trigger:EventTimeTrigger進行了剖析,討論了trigger注冊的回調(diào)函數(shù)的執(zhí)行殿遂,trigger會觸發(fā)怎樣的triggerResult命咐,如何處理遲到數(shù)據(jù)的trigger觸發(fā)篡九,以及提醒了需要注意的幾點,明白flink中的EventTimeTrigger為什么這么寫醋奠。

這篇文章就討論下當數(shù)據(jù)流入window時榛臼,會發(fā)生什么伊佃。我們著重分析的類就是上篇文章中提到的WindowOperator類。
在討論數(shù)據(jù)流入window會觸發(fā)的一系列動作之前沛善,我們需要明確一個window操作包括哪些部分航揉。

  • window assigner 指明數(shù)據(jù)流中的數(shù)據(jù)屬于哪個window
  • trigger 指明在哪些條件下觸發(fā)window計算,基于處理數(shù)據(jù)時的時間以及事件的特定屬性金刁、
  • evictor 可選組件迷捧,在window執(zhí)行計算前或后,將window中的數(shù)據(jù)移除胀葱,如使用globalWindow時漠秋,由于該window的默認trigger為永不觸發(fā),所以既需要實現(xiàn)自定義trigger抵屿,也需要實現(xiàn)evictor庆锦,移除部分已經(jīng)計算完畢的數(shù)據(jù)。
  • window process flink默認提供的有 ReduceFunction,AggragateFunction.還可以自定義實現(xiàn) windowProcessFunction轧葛。作用便是當trigger返回FIRE結(jié)果時搂抒,計算window的結(jié)果。

這篇文章尿扯,先不討論window的early emit求晶,即window在觸發(fā)完整的計算之前,為減小延遲而進行的提前計算衷笋。不過通過上面的四個組件芳杏,也可以想明白,只要自定義的trigger定時或定量或根據(jù)某條件在window觸發(fā)完整計算之前辟宗,產(chǎn)生FIRE結(jié)果爵赵,便可使用window process先行根據(jù)目前window中的不完整數(shù)據(jù),提前計算一個結(jié)果泊脐。

以下例子中空幻,我們使用以下前提:

  • 使用的window assigner實現(xiàn)的是 WindowAssigner 而不是 MergingWindowAssigner
  • 同時我們分析的 event-time 語義
  • window操作作用在 KeyedStream 上

接下來要分析的 WindowOperator 類可以看做是一個調(diào)度器,它持有window操作相關(guān)的所有組件容客,不包括evctor秕铛,因為含有evctor組件的window操作被封裝為 EvictingWindowOperator 。 WindowOperator 定義了
window assigner, trigger, windowProcessFunction 的執(zhí)行順序如何缩挑,它們之間的執(zhí)行邏輯等但两。

WindowOperator 類實現(xiàn)了 Triggerable 接口,為什么要實現(xiàn)這個接口呢调煎?這是為了方便為 window 指派 window 過期時的回調(diào)函數(shù)镜遣,因此 WindowOperater 類中實現(xiàn)了 onEventTime 與 onProcessTime 兩個方法己肮,分別對應不用語義下 window 過期時的回調(diào)函數(shù)的執(zhí)行邏輯士袄,即:當flink 決定刪除 window 時悲关,都做了什么操作,刪除了哪些東西娄柳。

WindowOperator 類也實現(xiàn)了 OneInputStreamOperator 接口寓辱,實現(xiàn)了其 processElement 方法,當新數(shù)據(jù)流入時赤拒,調(diào)用該方法秫筏。

在真正分析代碼前,有必要先說明下 WindowOperator 中的幾個內(nèi)部類:

  • WindowContext 每個window都有自己的context挎挖,持有該window與window的state这敬。在 WindowOperator 類中,該類對應 processContext 成員蕉朵。
  • Context 實現(xiàn)了 Trigger.OnMergeContext 接口崔涂。作為一個處理window中trigger的公共類,該類中持有key與window兩個成員始衅,方便根據(jù)key與window處理特定的trigger冷蚂。在 WindowOperator 類中,該類對應 triggerContext 成員汛闸。

另外蝙茶,在 WindowOperator 中有一個 windowState 成員,以 window 為 namespace诸老,以隔離不同的window的context隆夯。這里雖然叫做 windowState 。但是通過稍后的代碼可以發(fā)現(xiàn)别伏,該類存儲的是不同window中的對應的原始數(shù)據(jù)(processWindowFunction情況)或結(jié)果(ReduceFunction/AggregateFunction情況)吮廉。

有了上面的基本認識,下面分析畸肆,當數(shù)據(jù)流入window時宦芦,發(fā)生了什么。

首先轴脐,根據(jù)剛剛所說调卑,每一個流入的數(shù)據(jù)都會調(diào)用 processElement 方法:

public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {

WindowOperator 類,首先使用用戶選擇的 windowAssigner 將流入的數(shù)據(jù)分配到響應的window中大咱,有可能是1個恬涧,0個甚至多個window。
第二句的 isSkippedElement 變量碴巾,在我們的前提下溯捆,沒有作用。
第三句獲取當前數(shù)據(jù)所在的KeyedStream的那個key上厦瓢。這個key在稍后的 triggerContext 成員中會用到提揍。
再下面的if語句不會進入啤月,在我們前提中,我們進入的是else語句:

} else {
            for (W window: elementWindows) {
                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }

                registerCleanupTimer(window);
            }
        }

在 else 語句中劳跃,對該流入數(shù)據(jù)所分配到的每個window執(zhí)行以下邏輯:

  1. 判斷該window是否已過期谎仲。判斷條件如下:
protected boolean isWindowLate(W window) {
        return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

通過該判斷可以看出,flink提供了 allowedLateness 變量用于指明允許遲到數(shù)據(jù)最多可以遲到多久刨仑,因此郑诺,window的過期時間不僅僅是其 maxTimestamp, 還需要加上等待遲到數(shù)據(jù)的時間杉武。

  1. 獲取該window的context辙诞,將數(shù)據(jù)加入。
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());

這里需要指出轻抱,flink提供了兩種 window process:

  • Incremental Aggregation Functions倘要。ReduceFunction 與 AggregateFunction ,其特點是無需保存 window 中的所有數(shù)據(jù)十拣,一旦新數(shù)據(jù)進入封拧,便可與之前的中間結(jié)果進行計算,因此這種 window 中其狀態(tài)僅需保存一個結(jié)果便可夭问。
  • ProcessWindowFunction泽西。用戶實現(xiàn) ProcessWindowFunction 的自定義處理邏輯,特點是保存了 window 的所有數(shù)據(jù)缰趋,只有觸發(fā)了 trigger 后才可以執(zhí)行計算捧杉。

因此這里 windowState 根據(jù) window 獲取到的 state 是不同的。針對第一種情況秘血,返回的是 HeapReducingState味抖, HeapAggregatingState ,當執(zhí)行到 windowState.add(element.getValue());語句時灰粮,便直接得出結(jié)果仔涩。而第二種情況,返回的是 HeapListState 粘舟,當執(zhí)行到 windowState.add(element.getValue());語句時熔脂,僅僅是將數(shù)據(jù)加入到list中。

  1. 繼續(xù)往下走柑肴,獲取該key下該 window 的trigger霞揉,并執(zhí)行trigger的 onElement 方法,來確定需不需要觸發(fā)計算晰骑。
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);

根據(jù)上篇的解釋可知适秩,在默認trigger下,僅當流入的是遲到數(shù)據(jù)才會在 onElement 中觸發(fā)trigger。
因此秽荞,這里大家就可以實現(xiàn)自己的trigger骤公,根據(jù)流入的每一個數(shù)據(jù),判斷是否需要觸發(fā)trigger蚂会,達到提前觸發(fā)計算的目的。

  1. 根據(jù)trigger的結(jié)果耗式,執(zhí)行不同的邏輯
                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
  • FIRE: 代表觸發(fā)window的計算胁住。首先從 windowState 中獲取內(nèi)容。由剛剛的分析知道刊咳,在 Incremental Aggregation Functions 情況下彪见,返回的是一個常量 : 計算結(jié)果。在 ProcessWindowFunction 情況下娱挨,返回的是當前window中的數(shù)據(jù)余指,一個list的iterator對象。然后執(zhí)行 emitWindowContents(window, contents); 語句
private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

該方法會調(diào)用用戶實現(xiàn)的計算邏輯(ProcessWindowFunction實現(xiàn)類)跷坝,將流入的數(shù)據(jù) contents 經(jīng)過計算酵镜,得到結(jié)果后寫入 timestampedCollector。

  • PURGE: 代表需要清除window柴钻。這里就是執(zhí)行 windowState.clear(); 語句淮韭。結(jié)果便是window的計算結(jié)果(Incremental Aggregation Functions 情況下)或者緩存的數(shù)據(jù)(ProcessWindwoFunction 情況下)清除,即:該window的狀態(tài)被清除贴届。但是此時window對象還未刪除靠粪,相關(guān)的trigger中的自定義狀態(tài)與 ProcessWindowFunction 中的狀態(tài)還未刪除。
  1. 最后毫蚓,為該window注冊失效后的回調(diào)函數(shù)占键,在window失效后,刪除window并做其他收尾工作元潘。
registerCleanupTimer(window);

前面說過了畔乙, WindowOperator 實現(xiàn)了 Triggerable 接口,且有 triggerContext 獲取當前正在處理的window的trigger來注冊回調(diào)函數(shù)翩概,registerCleanupTimer(window)方法如下:

protected void registerCleanupTimer(W window) {
        long cleanupTime = cleanupTime(window);
        if (cleanupTime == Long.MAX_VALUE) {
            // don't set a GC timer for "end of time"
            return;
        }

        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }

public void registerEventTimeTimer(long time) {
            internalTimerService.registerEventTimeTimer(window, time);
        }

通過上面的兩個方法可以看出啸澡,這里的回調(diào)函數(shù)并不是注冊在當前window的trigger中,而是注冊在 WindowOperator 內(nèi)部持有的一個 internalTimerService 中氮帐。

那該window是在何時才會失效呢嗅虏?

private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

window 在 watermark 的時間戳大于 maxTimestamp + allowedLateness 時,才會過期上沐,這便是 flink 提供的除了 watermark 外的另一種處理遲到數(shù)據(jù)的機制皮服。

我們再看看,window過期后,回調(diào)函數(shù)是怎么處理的龄广。

public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        triggerContext.key = timer.getKey();
        triggerContext.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows;

        if (windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
            if (stateWindow == null) {
                // Timer firing for non-existent window, this can only happen if a
                // trigger did not clean up timers. We have already cleared the merging
                // window and therefore the Trigger state, however, so nothing to do.
                return;
            } else {
                windowState.setCurrentNamespace(stateWindow);
            }
        } else {
            windowState.setCurrentNamespace(triggerContext.window);
            mergingWindows = null;
        }

        ACC contents = null;
        if (windowState != null) {
            contents = windowState.get();
        }

        if (contents != null) {
            TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(triggerContext.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // need to make sure to update the merging state in state
            mergingWindows.persist();
        }
    }

上面與 MergingWindowAssigner 相關(guān)的分支我們不進入分析硫眯。
為了方便分析,我們假設 window 的過期時間 maxTimestamp + allowedLateness = 2000 + 1500 = 3500择同。 當 watermark 的時間戳大于 3500 時两入,便會觸發(fā)該回調(diào)函數(shù),為了說明普遍性敲才,假設 watermark 的時間戳為 4000裹纳。

將與 MergingWindowAssigner 無關(guān)的語句去掉后,該方法的前面部分如下:

triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();

windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;

ACC contents = null;
if (windowState != null) {
    contents = windowState.get();
}

首先紧武,將 triggerContext 根據(jù)key與window找到特定的trigger剃氧,同樣 windowState 根據(jù)window找到特定的window中的context,該context中存儲的是window的計算結(jié)果(Incremental Aggregation Functions 情況下)或者緩存的數(shù)據(jù)(ProcessWindwoFunction 情況下)阻星。
接下來:

       if (contents != null) {
            TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                emitWindowContents(triggerContext.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
            clearAllState(triggerContext.window, windowState, mergingWindows);
        }

以上代碼說明了當window過期朋鞍,觸發(fā)過期的回到函數(shù)時,都會做哪些操作妥箕。
可以看到滥酥,會先到該window的trigger中執(zhí)行 onEventTme 方法。此時的 timer.getTimestamp() 的值為 3500畦幢。而一般我們自定義的trigger中一般不會注冊一個時間為 maxTimestamp + allowedLateness 的回調(diào)函數(shù)恨狈。以flink的默認trigger - EventTimeTrigger 為例,其注冊的回調(diào)函數(shù)最大時間便是 maxTimestamp 呛讲。因此禾怠,除非用戶設置的 allowedLateness 為0,且在trigger中注冊了時間為 maxTmestamp 的回調(diào)函數(shù)贝搁,否則此處不會有triggerResult吗氏。

假設此處確實有對應的回調(diào)函數(shù)且被執(zhí)行,下面的兩個if條件的邏輯與上面分析 processElement 時一樣雷逆,不再贅述弦讽。

再往下走,調(diào)用了 clearAllState 方法膀哲,進入該方法:

private void clearAllState(W window,AppendingState<IN, ACC> windowState,MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        triggerContext.clear();
        processContext.window = window;
        processContext.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
}
  • windowState.clear() 將window中暫存的結(jié)果或原始數(shù)據(jù)刪除往产。
  • triggerContext.clear() 調(diào)用該window的trigger的clear()方法,刪除用戶自定義trigger中的自定義狀態(tài)某宪,同時刪除trigger的timer仿村。需要用戶在實現(xiàn)自定義trigger且使用自定義狀態(tài)時,實現(xiàn)該方法兴喂,方便此時調(diào)用清除狀態(tài)蔼囊,避免內(nèi)存問題焚志。
  • processContext.clear(); 調(diào)用用戶實現(xiàn)自定義邏輯的,ProcessWindwoFuncton接口實現(xiàn)類的clear()方法畏鼓,目的同上酱酬。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市云矫,隨后出現(xiàn)的幾起案子膳沽,更是在濱河造成了極大的恐慌,老刑警劉巖让禀,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挑社,死亡現(xiàn)場離奇詭異,居然都是意外死亡堆缘,警方通過查閱死者的電腦和手機滔灶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門普碎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吼肥,“玉大人,你說我怎么就攤上這事麻车∽褐澹” “怎么了?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵动猬,是天一觀的道長啤斗。 經(jīng)常有香客問我,道長赁咙,這世上最難降的妖魔是什么钮莲? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任,我火速辦了婚禮彼水,結(jié)果婚禮上崔拥,老公的妹妹穿的比我還像新娘。我一直安慰自己凤覆,他們只是感情好链瓦,可當我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著盯桦,像睡著了一般慈俯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上拥峦,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天贴膘,我揣著相機與錄音,去河邊找鬼略号。 笑死步鉴,一個胖子當著我的面吹牛揪胃,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播氛琢,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼喊递,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了阳似?” 一聲冷哼從身側(cè)響起骚勘,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎撮奏,沒想到半個月后俏讹,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡畜吊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年泽疆,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片玲献。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡殉疼,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出捌年,到底是詐尸還是另有隱情瓢娜,我是刑警寧澤,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布礼预,位于F島的核電站眠砾,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏托酸。R本人自食惡果不足惜褒颈,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望励堡。 院中可真熱鬧吵冒,春花似錦宣虾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽燕锥。三九已至顽频,卻和暖如春菱蔬,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背砾层。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工漩绵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人肛炮。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓止吐,卻偏偏與公主長得像宝踪,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子碍扔,可洞房花燭夜當晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容