上篇文章對Flink中常見的windowAssigner,如:TumblingEventTimeWindow, SlidingEventTimeWindow, EventTimeSessionWindows 等中的默認提供的trigger:EventTimeTrigger進行了剖析,討論了trigger注冊的回調(diào)函數(shù)的執(zhí)行殿遂,trigger會觸發(fā)怎樣的triggerResult命咐,如何處理遲到數(shù)據(jù)的trigger觸發(fā)篡九,以及提醒了需要注意的幾點,明白flink中的EventTimeTrigger為什么這么寫醋奠。
這篇文章就討論下當數(shù)據(jù)流入window時榛臼,會發(fā)生什么伊佃。我們著重分析的類就是上篇文章中提到的WindowOperator類。
在討論數(shù)據(jù)流入window會觸發(fā)的一系列動作之前沛善,我們需要明確一個window操作包括哪些部分航揉。
- window assigner 指明數(shù)據(jù)流中的數(shù)據(jù)屬于哪個window
- trigger 指明在哪些條件下觸發(fā)window計算,基于處理數(shù)據(jù)時的時間以及事件的特定屬性金刁、
- evictor 可選組件迷捧,在window執(zhí)行計算前或后,將window中的數(shù)據(jù)移除胀葱,如使用globalWindow時漠秋,由于該window的默認trigger為永不觸發(fā),所以既需要實現(xiàn)自定義trigger抵屿,也需要實現(xiàn)evictor庆锦,移除部分已經(jīng)計算完畢的數(shù)據(jù)。
- window process flink默認提供的有 ReduceFunction,AggragateFunction.還可以自定義實現(xiàn) windowProcessFunction轧葛。作用便是當trigger返回FIRE結(jié)果時搂抒,計算window的結(jié)果。
這篇文章尿扯,先不討論window的early emit求晶,即window在觸發(fā)完整的計算之前,為減小延遲而進行的提前計算衷笋。不過通過上面的四個組件芳杏,也可以想明白,只要自定義的trigger定時或定量或根據(jù)某條件在window觸發(fā)完整計算之前辟宗,產(chǎn)生FIRE結(jié)果爵赵,便可使用window process先行根據(jù)目前window中的不完整數(shù)據(jù),提前計算一個結(jié)果泊脐。
以下例子中空幻,我們使用以下前提:
- 使用的window assigner實現(xiàn)的是 WindowAssigner 而不是 MergingWindowAssigner
- 同時我們分析的 event-time 語義
- window操作作用在 KeyedStream 上
接下來要分析的 WindowOperator 類可以看做是一個調(diào)度器,它持有window操作相關(guān)的所有組件容客,不包括evctor秕铛,因為含有evctor組件的window操作被封裝為 EvictingWindowOperator 。 WindowOperator 定義了
window assigner, trigger, windowProcessFunction 的執(zhí)行順序如何缩挑,它們之間的執(zhí)行邏輯等但两。
WindowOperator 類實現(xiàn)了 Triggerable 接口,為什么要實現(xiàn)這個接口呢调煎?這是為了方便為 window 指派 window 過期時的回調(diào)函數(shù)镜遣,因此 WindowOperater 類中實現(xiàn)了 onEventTime 與 onProcessTime 兩個方法己肮,分別對應不用語義下 window 過期時的回調(diào)函數(shù)的執(zhí)行邏輯士袄,即:當flink 決定刪除 window 時悲关,都做了什么操作,刪除了哪些東西娄柳。
WindowOperator 類也實現(xiàn)了 OneInputStreamOperator 接口寓辱,實現(xiàn)了其 processElement 方法,當新數(shù)據(jù)流入時赤拒,調(diào)用該方法秫筏。
在真正分析代碼前,有必要先說明下 WindowOperator 中的幾個內(nèi)部類:
- WindowContext 每個window都有自己的context挎挖,持有該window與window的state这敬。在 WindowOperator 類中,該類對應 processContext 成員蕉朵。
- Context 實現(xiàn)了 Trigger.OnMergeContext 接口崔涂。作為一個處理window中trigger的公共類,該類中持有key與window兩個成員始衅,方便根據(jù)key與window處理特定的trigger冷蚂。在 WindowOperator 類中,該類對應 triggerContext 成員汛闸。
另外蝙茶,在 WindowOperator 中有一個 windowState 成員,以 window 為 namespace诸老,以隔離不同的window的context隆夯。這里雖然叫做 windowState 。但是通過稍后的代碼可以發(fā)現(xiàn)别伏,該類存儲的是不同window中的對應的原始數(shù)據(jù)(processWindowFunction情況)或結(jié)果(ReduceFunction/AggregateFunction情況)吮廉。
有了上面的基本認識,下面分析畸肆,當數(shù)據(jù)流入window時宦芦,發(fā)生了什么。
首先轴脐,根據(jù)剛剛所說调卑,每一個流入的數(shù)據(jù)都會調(diào)用 processElement 方法:
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
WindowOperator 類,首先使用用戶選擇的 windowAssigner 將流入的數(shù)據(jù)分配到響應的window中大咱,有可能是1個恬涧,0個甚至多個window。
第二句的 isSkippedElement 變量碴巾,在我們的前提下溯捆,沒有作用。
第三句獲取當前數(shù)據(jù)所在的KeyedStream的那個key上厦瓢。這個key在稍后的 triggerContext 成員中會用到提揍。
再下面的if語句不會進入啤月,在我們前提中,我們進入的是else語句:
} else {
for (W window: elementWindows) {
// drop if the window is already late
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
registerCleanupTimer(window);
}
}
在 else 語句中劳跃,對該流入數(shù)據(jù)所分配到的每個window執(zhí)行以下邏輯:
- 判斷該window是否已過期谎仲。判斷條件如下:
protected boolean isWindowLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
通過該判斷可以看出,flink提供了 allowedLateness 變量用于指明允許遲到數(shù)據(jù)最多可以遲到多久刨仑,因此郑诺,window的過期時間不僅僅是其 maxTimestamp, 還需要加上等待遲到數(shù)據(jù)的時間杉武。
- 獲取該window的context辙诞,將數(shù)據(jù)加入。
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
這里需要指出轻抱,flink提供了兩種 window process:
- Incremental Aggregation Functions倘要。ReduceFunction 與 AggregateFunction ,其特點是無需保存 window 中的所有數(shù)據(jù)十拣,一旦新數(shù)據(jù)進入封拧,便可與之前的中間結(jié)果進行計算,因此這種 window 中其狀態(tài)僅需保存一個結(jié)果便可夭问。
- ProcessWindowFunction泽西。用戶實現(xiàn) ProcessWindowFunction 的自定義處理邏輯,特點是保存了 window 的所有數(shù)據(jù)缰趋,只有觸發(fā)了 trigger 后才可以執(zhí)行計算捧杉。
因此這里 windowState 根據(jù) window 獲取到的 state 是不同的。針對第一種情況秘血,返回的是 HeapReducingState味抖, HeapAggregatingState ,當執(zhí)行到 windowState.add(element.getValue());
語句時灰粮,便直接得出結(jié)果仔涩。而第二種情況,返回的是 HeapListState 粘舟,當執(zhí)行到 windowState.add(element.getValue());
語句時熔脂,僅僅是將數(shù)據(jù)加入到list中。
- 繼續(xù)往下走柑肴,獲取該key下該 window 的trigger霞揉,并執(zhí)行trigger的 onElement 方法,來確定需不需要觸發(fā)計算晰骑。
triggerContext.key = key;
triggerContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
根據(jù)上篇的解釋可知适秩,在默認trigger下,僅當流入的是遲到數(shù)據(jù)才會在 onElement 中觸發(fā)trigger。
因此秽荞,這里大家就可以實現(xiàn)自己的trigger骤公,根據(jù)流入的每一個數(shù)據(jù),判斷是否需要觸發(fā)trigger蚂会,達到提前觸發(fā)計算的目的。
- 根據(jù)trigger的結(jié)果耗式,執(zhí)行不同的邏輯
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
- FIRE: 代表觸發(fā)window的計算胁住。首先從 windowState 中獲取內(nèi)容。由剛剛的分析知道刊咳,在 Incremental Aggregation Functions 情況下彪见,返回的是一個常量 : 計算結(jié)果。在 ProcessWindowFunction 情況下娱挨,返回的是當前window中的數(shù)據(jù)余指,一個list的iterator對象。然后執(zhí)行
emitWindowContents(window, contents);
語句
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
processContext.window = window;
userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
該方法會調(diào)用用戶實現(xiàn)的計算邏輯(ProcessWindowFunction實現(xiàn)類)跷坝,將流入的數(shù)據(jù) contents 經(jīng)過計算酵镜,得到結(jié)果后寫入 timestampedCollector。
- PURGE: 代表需要清除window柴钻。這里就是執(zhí)行
windowState.clear();
語句淮韭。結(jié)果便是window的計算結(jié)果(Incremental Aggregation Functions 情況下)或者緩存的數(shù)據(jù)(ProcessWindwoFunction 情況下)清除,即:該window的狀態(tài)被清除贴届。但是此時window對象還未刪除靠粪,相關(guān)的trigger中的自定義狀態(tài)與 ProcessWindowFunction 中的狀態(tài)還未刪除。
- 最后毫蚓,為該window注冊失效后的回調(diào)函數(shù)占键,在window失效后,刪除window并做其他收尾工作元潘。
registerCleanupTimer(window);
前面說過了畔乙, WindowOperator 實現(xiàn)了 Triggerable 接口,且有 triggerContext 獲取當前正在處理的window的trigger來注冊回調(diào)函數(shù)翩概,registerCleanupTimer(window)
方法如下:
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
if (windowAssigner.isEventTime()) {
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(window, time);
}
通過上面的兩個方法可以看出啸澡,這里的回調(diào)函數(shù)并不是注冊在當前window的trigger中,而是注冊在 WindowOperator 內(nèi)部持有的一個 internalTimerService 中氮帐。
那該window是在何時才會失效呢嗅虏?
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
window 在 watermark 的時間戳大于 maxTimestamp + allowedLateness 時,才會過期上沐,這便是 flink 提供的除了 watermark 外的另一種處理遲到數(shù)據(jù)的機制皮服。
我們再看看,window過期后,回調(diào)函數(shù)是怎么處理的龄广。
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
windowState.setCurrentNamespace(stateWindow);
}
} else {
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
if (contents != null) {
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
上面與 MergingWindowAssigner 相關(guān)的分支我們不進入分析硫眯。
為了方便分析,我們假設 window 的過期時間 maxTimestamp + allowedLateness = 2000 + 1500 = 3500择同。 當 watermark 的時間戳大于 3500 時两入,便會觸發(fā)該回調(diào)函數(shù),為了說明普遍性敲才,假設 watermark 的時間戳為 4000裹纳。
將與 MergingWindowAssigner 無關(guān)的語句去掉后,該方法的前面部分如下:
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
ACC contents = null;
if (windowState != null) {
contents = windowState.get();
}
首先紧武,將 triggerContext 根據(jù)key與window找到特定的trigger剃氧,同樣 windowState 根據(jù)window找到特定的window中的context,該context中存儲的是window的計算結(jié)果(Incremental Aggregation Functions 情況下)或者緩存的數(shù)據(jù)(ProcessWindwoFunction 情況下)阻星。
接下來:
if (contents != null) {
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
以上代碼說明了當window過期朋鞍,觸發(fā)過期的回到函數(shù)時,都會做哪些操作妥箕。
可以看到滥酥,會先到該window的trigger中執(zhí)行 onEventTme 方法。此時的 timer.getTimestamp() 的值為 3500畦幢。而一般我們自定義的trigger中一般不會注冊一個時間為 maxTimestamp + allowedLateness 的回調(diào)函數(shù)恨狈。以flink的默認trigger - EventTimeTrigger 為例,其注冊的回調(diào)函數(shù)最大時間便是 maxTimestamp 呛讲。因此禾怠,除非用戶設置的 allowedLateness 為0,且在trigger中注冊了時間為 maxTmestamp 的回調(diào)函數(shù)贝搁,否則此處不會有triggerResult吗氏。
假設此處確實有對應的回調(diào)函數(shù)且被執(zhí)行,下面的兩個if條件的邏輯與上面分析 processElement 時一樣雷逆,不再贅述弦讽。
再往下走,調(diào)用了 clearAllState 方法膀哲,進入該方法:
private void clearAllState(W window,AppendingState<IN, ACC> windowState,MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
triggerContext.clear();
processContext.window = window;
processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
}
}
- windowState.clear() 將window中暫存的結(jié)果或原始數(shù)據(jù)刪除往产。
- triggerContext.clear() 調(diào)用該window的trigger的clear()方法,刪除用戶自定義trigger中的自定義狀態(tài)某宪,同時刪除trigger的timer仿村。需要用戶在實現(xiàn)自定義trigger且使用自定義狀態(tài)時,實現(xiàn)該方法兴喂,方便此時調(diào)用清除狀態(tài)蔼囊,避免內(nèi)存問題焚志。
- processContext.clear(); 調(diào)用用戶實現(xiàn)自定義邏輯的,ProcessWindwoFuncton接口實現(xiàn)類的clear()方法畏鼓,目的同上酱酬。