在Flink中,使用event-time模式時冠胯,默認提供的window有TumblingEventTimeWindows火诸,SlidingEventTimeWindows,EventTimeSessionWindow等荠察,其中這些是屬于window operator中的一部分置蜀,稱作 window assigner。window operator包含四個組件悉盆,除了 window assigner外盯荤,還包括 trigger , evictor, window process。其作用分別如下:
- window assigner 指明數(shù)據(jù)流中的數(shù)據(jù)屬于哪個window
- trigger 指明在哪些條件下觸發(fā)window計算焕盟,基于處理數(shù)據(jù)時的時間以及事件的特定屬性秋秤、
- evictor 可選組件,在window執(zhí)行計算前或后脚翘,將window中的數(shù)據(jù)移除航缀,如使用globalWindow時,由于該window的默認trigger為永不觸發(fā)堰怨,所以既需要實現(xiàn)自定義trigger芥玉,也需要實現(xiàn)evictor,移除部分已經(jīng)計算完畢的數(shù)據(jù)备图。
- window process flink默認提供的有 ReduceFunction,AggragateFunction.還可以自定義實現(xiàn) windowProcessFunction
而上述TumblingEventTimeWindows灿巧,SlidingEventTimeWindows,EventTimeSessionWindow三個默認提供的window operator中揽涮,都提供了默認的trigger抠藕,因此我們在初次接觸flink,使用以上三個方法時蒋困,沒有寫trigger盾似,直接寫window process,如 .reduce()
雪标。這是因為這三個window中的getDefaultTrigger()
方法使用的是EventTimeTrigger,也就是它給我們提供了默認的trigger零院。
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
而我們知道,在event-time模式中村刨,window的計算是在watermark到達后再執(zhí)行計算告抄,watermark在計算延遲與結(jié)果的完整性之間提供了一個權(quán)衡,flink也提供了其他機制來處理遲到數(shù)據(jù)嵌牺。所以打洼,我們來看看EventTimeTrigger中是如何實現(xiàn)這樣的邏輯的:
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
此處龄糊,僅貼出兩個相關(guān)的方法,onElement
以及onEventTime
方法募疮。其中onElement方法在每次數(shù)據(jù)進入該window時會觸發(fā)炫惩,而onEventTime是在之前注冊的eventTime回調(diào)函數(shù)到達時間時,進行觸發(fā)阿浓。
需要注意的是诡必,根據(jù)Trigger的接口,onElement方法中的timestamp參數(shù)搔扁,指的是處理時的機器時間,如下:
/**
* Called for every element that gets added to a pane. The result of this will determine
* whether the pane is evaluated to emit results.
*
* @param element The element that arrived.
* @param timestamp The timestamp of the element that arrived.
* @param window The window to which the element is being added.
* @param ctx A context object that can be used to register timer callbacks.
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
修改內(nèi)容:此處按照注釋的理解好像是說的是機器時間蟋字,但是跟進代碼稿蹲,在 WindowOperator.Context.onElement 方法中,寫明了:
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
return trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
}
而 onEventTime方法中的 timestamp 參數(shù)鹊奖,則是指注冊回調(diào)函數(shù)時寫入的 timestamp 數(shù)值苛聘。
下面詳細看一下,EventTimeTrigger中的onElement方法:
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
我們假設(shè)watermark使用默認值忠聚,200ms產(chǎn)生一個设哗,為了簡便,我們假設(shè)第二個窗口在第一個窗口結(jié)束后两蟀,立即有第一個數(shù)據(jù)流入网梢,按照時間先后順序依次套入上面的代碼,看一下執(zhí)行結(jié)果:
第一種情況:watermark不處理遲到數(shù)據(jù)赂毯,其時間戳與數(shù)據(jù)的時間戳相同战虏。
- window.maxTimestamp()指的是當前窗口的最大時間戳的數(shù)值,而ctx.getCurrentWatermark()此時指的是window上下文所接收到的党涕,最新到達的watermark烦感,該watermark觸發(fā)了上一個窗口的計算。此時新窗口盡管還未填滿膛堤,僅有一個數(shù)據(jù)手趣,但是其最大時間邊界已經(jīng)確定,一般來說要比此時的watermark的值要大肥荔。
- 因此不入if語句绿渣,而進入else語句。
- else語句會注冊一個回調(diào)函數(shù)燕耿,當未來某個watermark的時間戳大于該trigger的注冊時間時怯晕,就會觸發(fā)trigger,執(zhí)行該trigger所在的window operator的window process進行計算缸棵。
- 每進入一個新數(shù)據(jù)舟茶,就會注冊一遍這個回調(diào)函數(shù)。但是由于trigger會添加到set集合中,因此重復(fù)添加相同的trigger不會真的注冊重復(fù)的trigger吧凉。詳見:WindowOperator.registerEventTimeTimer() -> HeapInternalTimerService.registerEventTimeTimer -> InternalTimer.equals()
- 200ms后隧出,新的watermark跟隨新的數(shù)據(jù)到達,新的數(shù)據(jù)進入該window阀捅,再次判斷胀瞪,仍然進入else語句,新的watermark的時間戳沒有超過window.maxTimestamp饲鄙,不會觸發(fā)trigger凄诞,執(zhí)行計算。
- 如此持續(xù)忍级,直到一個時間戳大于等于window.maxTimestamp的事件到達并跟隨著watermark帆谍。假設(shè)到達的數(shù)據(jù)時間戳恰好為window.maxTimestamp,也就是該window理論上所能容納的最大的時間戳轴咱。此時watermark有兩種情況汛蝙,一為緊跟著數(shù)據(jù)到達,二為最多等待200ms后才到達朴肺。不論哪種情況窖剑,都不會在此時進入
onElement
方法了,watermark后沒有新的數(shù)據(jù)進入window戈稿,就不會走該方法西土。而是會觸發(fā)時間小于該watermark的所有trigger,就會觸發(fā)該window執(zhí)行計算鞍盗。
那么onElement
方法中的if條件什么時候進入呢翠储?難道是處理遲到數(shù)據(jù)用的?橡疼,我們看一下第二種情況援所,即:watermark處理5秒內(nèi)的延遲數(shù)據(jù)
- 第一個數(shù)據(jù)進入新的window,但此時上一個window還未計算欣除,因為watermark處理5秒內(nèi)延遲住拭,此時window的上下文還未收到大于上一個window的maxTimestamp的watermark。因此上一個window還未執(zhí)行历帚,在等待遲到數(shù)據(jù)滔岳。
- 假設(shè)在新的window處理了4.9秒后,來了個新數(shù)據(jù)挽牢,該數(shù)據(jù)從時間戳上來看谱煤,應(yīng)該屬于上一個window,因此window assigner將其放入上一個window中禽拔,執(zhí)行
onElement
方法刘离,但是此時才過了4.9秒而已室叉,還未到第5秒,也就是當前window的上下文保存的watermark的時間戳硫惕,還未大于上一個window的maxTimestamp茧痕,(但是要注意,windwo上下文中保存的watermark是200ms增長一次的恼除,因為watermark就是200ms產(chǎn)生一次踪旷,但是由于處理5秒內(nèi)的遲到數(shù)據(jù),因此即便4.9秒內(nèi)豁辉,watermark不斷增長令野,但是仍未能觸發(fā)上一個window的trigger),在onElement
方法中徽级,還是不會進入if語句气破,仍然是else語句,待5秒過后灰追,新的watermark到達,其時間戳大于等于上一個window的maxTimestamp狗超,才會觸發(fā)trigger弹澎,執(zhí)行上一個window的計算。
因此努咐,在watermark處理遲到數(shù)據(jù)情境下苦蒿,我們看到確實是處理了遲到數(shù)據(jù),但是并沒有走if條件渗稍。
那if條件存在的意義是什么呢佩迟?
我的理解是Flink在處理遲到數(shù)據(jù)時,watermark只是其中一個選擇竿屹,它只是權(quán)衡了結(jié)果完整性與延遲报强。通過上面的例子也可以看出,假設(shè)這個系統(tǒng)的數(shù)據(jù)真的最多延遲5秒拱燃,使用watermark后秉溉,確實能處理5秒內(nèi)延遲的數(shù)據(jù),保證了結(jié)果的準確性碗誉,但是window在5秒后才觸發(fā)執(zhí)行召嘶,系統(tǒng)就有最少5秒的延遲。
但是哮缺,現(xiàn)實世界的情況更加復(fù)雜弄跌,我們無法預(yù)估系統(tǒng)的數(shù)據(jù)是否真的最多遲到5秒。因此flink還提供了其他處理遲到數(shù)據(jù)的策略〕⑽現(xiàn)在還沒有閱讀到這一部分內(nèi)容铛只,不過我估計onElement方法中的if條件分支就是為其提供的埠胖。
當window在watermark觸發(fā)其計算后,該回調(diào)函數(shù)就從set集合中移除了格仲。但是后面若來了比5秒還要延遲的數(shù)據(jù)押袍,如遲到1分鐘的數(shù)據(jù)。當我們選擇了flink的其他處理遲到數(shù)據(jù)的策略后凯肋,window assginer將該遲到了1分鐘的數(shù)據(jù)谊惭,正確的放入已經(jīng)觸發(fā)了計算,得出了結(jié)果的window中侮东,執(zhí)行onElement方法圈盔,此時,window.maxTimestamp肯定是小于ctx.currentWatermark的悄雅,會再次觸發(fā)該舊的窗口執(zhí)行計算驱敲,保證了結(jié)果的完整性。
------------------------ 2018/12/12 更新 ---------------------------
今天看了 WindowOperator 類宽闲,明白了if條件確實是為遲到數(shù)據(jù)提供的众眨,以下著重分析 WindowOperator 類中的 processElement() 方法颤绕,該方法大致邏輯為:在每個元素進入時喇完,使用 windowAssigner 指定該元素屬于哪個或者哪幾個window(如香伴,slidingWindow执赡,同一個元素屬于多個window)官研,然后每個window都執(zhí)行以下邏輯:
- 首先分析哀蘑,該元素屬于的window是否已經(jīng)過期了苫拍,如果程序沒有指定處理遲到數(shù)據(jù)的策略瓜富,則遲到數(shù)據(jù)進入后就會被判定為該window已失效习蓬,該遲到數(shù)據(jù)不處理纽什,交由GC慢慢回收
- 如果window還有效,則根據(jù)該window獲取該window的狀態(tài)躲叼,將新進入的元素寫入該window的狀態(tài)芦缰,window就是這樣在狀態(tài)中緩存數(shù)據(jù)的,用于后面 processWindowFunction 處理 window 的數(shù)據(jù)枫慷。
- 根據(jù) key 與 window 找到相應(yīng)的 trigger饺藤,調(diào)用 trigger 的 onElement 方法,如果此時這個數(shù)據(jù)是一個遲到的數(shù)據(jù)流礁,則可以進入上面所說的if條件涕俗,直接出發(fā)trigger
- 根據(jù)trigger的結(jié)果,執(zhí)行相應(yīng)邏輯
- 注冊該window的失效回調(diào)函數(shù)
下面貼上WindowOperator中 processElement() 方法的相關(guān)代碼:
for (W window: elementWindows) {
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
注意的是神帅,在 processElement 方法內(nèi)再姑,會先判斷 windowAssigner 是否是 MergingWindowAssigner,我們常用的 TumblingEventTimeWindows 是正常的 windowAssigner 接口找御,而 EventTimeSessionWindows 是 MergingWindowAssigner元镀。 因此這里貼的代碼是針對 windowAssigner 的绍填。
其中 isWindowLate(window) 方法,其判斷條件是 該window的最大時間戳加上設(shè)置允許的最大遲到時間與當前watermark的比較栖疑。也就是 window.maxTimestamp() + allowedLateness <= watermark 讨永。若小于,則window過期遇革,需要刪除window對象卿闹,刪除window狀態(tài),觸發(fā)為給window注冊的onEventTime回調(diào)函數(shù)萝快。
再再提醒注意一點:
在EventTimeTrigger中返回的triggerResult要么是 continue 要么是 fire锻霎,就是沒有 purge 或者 fire_and_purge。現(xiàn)在看來揪漩,也是為了配合flink提供的 allowedLateness 處理遲到數(shù)據(jù)的策略旋恼。這是因為,如果trigger的結(jié)果是 purge 或者 fire_and_purge奄容,就會在 WindowOperator 中觸發(fā) windowState.clear() 動作冰更,這樣的話,真正遲到的數(shù)據(jù)加入該window后昂勒,該window的狀態(tài)已經(jīng)被刪除蜀细,無法更新了。但是 windowState.clear() 是為了清除該 window 的狀態(tài)的叁怪,如果trigger的狀態(tài)不指定為 purge 或者 fire_and_purge审葬,該window的狀態(tài)會不會刪除深滚?什么時候刪除奕谭?若不刪除豈不是會造成內(nèi)存問題?答案就在之前說的痴荐,當判定該window過期血柳,需要刪除時,會同時刪除window的狀態(tài)生兆,可以參考 WindowOperator 中的 clearAllState() 方法难捌。