Flink 認為 Batch 是 Streaming 的一個特例砖织,所以 Flink 底層引擎是一個流式引擎,在上面實現(xiàn)了流處理和批處理攒霹。而窗口(window)就是從 Streaming 到 Batch 的一個橋梁涂臣。Flink 提供了非常完善的窗口機制,這是我認為的 Flink 最大的亮點之一(其他的亮點包括消息亂序處理搞糕,和 checkpoint 機制)。本文我們將介紹流式處理中的窗口概念曼追,介紹 Flink 內(nèi)建的一些窗口和 Window API窍仰,最后討論下窗口在底層是如何實現(xiàn)的。
什么是 Window
在流處理應用中礼殊,數(shù)據(jù)是連續(xù)不斷的驹吮,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當然我們可以每來一個消息就處理一次晶伦,但是有時我們需要做一些聚合類的處理碟狞,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下婚陪,我們必須定義一個窗口族沃,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算泌参。
窗口可以是時間驅(qū)動的(Time Window脆淹,例如:每30秒鐘),也可以是數(shù)據(jù)驅(qū)動的(Count Window沽一,例如:每一百個元素)盖溺。一種經(jīng)典的窗口分類可以分成:翻滾窗口(Tumbling Window,無重疊)铣缠,滾動窗口(Sliding Window烘嘱,有重疊),和會話窗口(Session Window攘残,活動間隙)拙友。
我們舉個具體的場景來形象地理解不同窗口的概念。假設歼郭,淘寶網(wǎng)會記錄每個用戶每次購買的商品個數(shù),我們要做的是統(tǒng)計不同窗口中用戶購買商品的總數(shù)病曾。下圖給出了幾種經(jīng)典的窗口切分概述圖:
上圖中牍蜂,raw data stream 代表用戶的購買行為流,圈中的數(shù)字代表該用戶本次購買的商品個數(shù)泰涂,事件是按時間分布的鲫竞,所以可以看出事件之間是有time gap的。Flink 提供了上圖中所有的窗口類型逼蒙,下面我們會逐一進行介紹从绘。
Time Window
就如名字所說的,Time Window 是根據(jù)時間對數(shù)據(jù)流進行分組的。這里我們涉及到了流處理中的時間問題僵井,時間問題和消息亂序問題是緊密關聯(lián)的陕截,這是流處理中現(xiàn)存的難題之一,我們將在后續(xù)的 EventTime 和消息亂序處理 中對這部分問題進行深入探討批什。這里我們只需要知道 Flink 提出了三種時間的概念农曲,分別是event time(事件時間:事件發(fā)生時的時間),ingestion time(攝取時間:事件進入流處理系統(tǒng)的時間)驻债,processing time(處理時間:消息被計算處理的時間)乳规。Flink 中窗口機制和時間類型是完全解耦的,也就是說當需要改變時間類型時不需要更改窗口邏輯相關的代碼合呐。
-
Tumbling Time Window
如上圖暮的,我們需要統(tǒng)計每一分鐘中用戶購買的商品的總數(shù),需要將用戶的行為事件按每一分鐘進行切分合砂,這種切分被成為翻滾時間窗口(Tumbling Time Window)青扔。翻滾窗口能將數(shù)據(jù)流切分成不重疊的窗口,每一個事件只能屬于一個窗口翩伪。通過使用 DataStream API微猖,我們可以這樣實現(xiàn):
val slidingCnts: DataStream[(Int, Int)] = buyCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
-
Sliding Time Window
但是對于某些應用,它們需要的窗口是不間斷的缘屹,需要平滑地進行窗口聚合凛剥。比如,我們可以每30秒計算一次最近一分鐘用戶購買的商品總數(shù)轻姿。這種窗口我們稱為滑動時間窗口(Sliding Time Window)犁珠。在滑窗中,一個元素可以對應多個窗口互亮。通過使用 DataStream API犁享,我們可以這樣實現(xiàn):
val slidingCnts: DataStream[(Int, Int)] = buyCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
Count Window
Count Window 是根據(jù)元素個數(shù)對數(shù)據(jù)流進行分組的。
-
Tumbling Count Window
當我們想要每100個用戶購買行為事件統(tǒng)計購買總數(shù)豹休,那么每當窗口中填滿100個元素了炊昆,就會對窗口進行計算,這種窗口我們稱之為翻滾計數(shù)窗口(Tumbling Count Window)威根,上圖所示窗口大小為3個凤巨。通過使用 DataStream API,我們可以這樣實現(xiàn):
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the buyCnt sum
.sum(1)
-
Sliding Count Window
當然Count Window 也支持 Sliding Window洛搀,雖在上圖中未描述出來敢茁,但和Sliding Time Window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和留美,代碼示例如下彰檬。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
Session Window
在這種用戶交互事件流中伸刃,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續(xù)活躍的周期),由非活躍的間隙分隔開僧叉。如上圖所示奕枝,就是需要計算每個用戶在活躍期間總共購買的商品數(shù)量,如果用戶30秒沒有活動則視為會話斷開(假設raw data stream是單個用戶的購買行為流)瓶堕。Session Window 的示例代碼如下:
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// session window based on a 30 seconds session gap interval
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
.sum(1)
一般而言,window 是在無限的流上定義了一個有限的元素集合症歇。這個集合可以是基于時間的郎笆,元素個數(shù)的,時間和個數(shù)結(jié)合的忘晤,會話間隙的宛蚓,或者是自定義的。Flink 的 DataStream API 提供了簡潔的算子來滿足常用的窗口操作设塔,同時提供了通用的窗口機制來允許用戶自己定義窗口分配邏輯凄吏。下面我們會對 Flink 窗口相關的 API 進行剖析。
剖析 Window API
得益于 Flink Window API 松耦合設計闰蛔,我們可以非常靈活地定義符合特定業(yè)務的窗口痕钢。Flink 中定義一個窗口主要需要以下三個組件。
-
Window Assigner:用來決定某個元素被分配到哪個/哪些窗口中去序六。
-
Trigger:觸發(fā)器任连。決定了一個窗口何時能夠被計算或清除,每個窗口都會擁有一個自己的Trigger例诀。
-
Evictor:可以譯為“驅(qū)逐者”随抠。在Trigger觸發(fā)之后,在窗口被處理之前繁涂,Evictor(如果有Evictor的話)會用來剔除窗口中不需要的元素拱她,相當于一個filter。
上述三個組件的不同實現(xiàn)的不同組合扔罪,可以定義出非常復雜的窗口秉沼。Flink 中內(nèi)置的窗口也都是基于這三個組件構(gòu)成的,當然內(nèi)置窗口有時候無法解決用戶特殊的需求步势,所以 Flink 也暴露了這些窗口機制的內(nèi)部接口供用戶實現(xiàn)自定義的窗口氧猬。下面我們將基于這三者探討窗口的實現(xiàn)機制。
Window 的實現(xiàn)
下圖描述了 Flink 的窗口機制以及各組件之間是如何相互工作的坏瘩。
首先上圖中的組件都位于一個算子(window operator)中盅抚,數(shù)據(jù)流源源不斷地進入算子,每一個到達的元素都會被交給 WindowAssigner倔矾。WindowAssigner 會決定元素被放到哪個或哪些窗口(window)妄均,可能會創(chuàng)建新窗口柱锹。因為一個元素可以被放入多個窗口中,所以同時存在多個窗口是可能的丰包。注意禁熏,Window
本身只是一個ID標識符,其內(nèi)部可能存儲了一些元數(shù)據(jù)邑彪,如TimeWindow
中有開始和結(jié)束時間瞧毙,但是并不會存儲窗口中的元素。窗口中的元素實際存儲在 Key/Value State 中寄症,key為Window
宙彪,value為元素集合(或聚合值)。為了保證窗口的容錯性,該實現(xiàn)依賴了 Flink 的 State 機制(參見 state 文檔)。
每一個窗口都擁有一個屬于自己的 Trigger绒尊,Trigger上會有定時器,用來決定一個窗口何時能夠被計算或清除男图。每當有元素加入到該窗口,或者之前注冊的定時器超時了甜橱,那么Trigger都會被調(diào)用逊笆。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù))渗鬼,purge(移除窗口和窗口中的數(shù)據(jù))览露,或者 fire + purge。一個Trigger的調(diào)用結(jié)果只是fire的話譬胎,那么會計算窗口并保留窗口原樣差牛,也就是說窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計算堰乔。一個窗口可以被重復計算多次知道它被 purge 了偏化。在purge之前,窗口會一直占用著內(nèi)存镐侯。
當Trigger fire了侦讨,窗口中的元素集合就會交給Evictor
(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表苟翻,并決定最先進入窗口的多少個元素需要被移除韵卤。剩余的元素會交給用戶指定的函數(shù)進行窗口的計算。如果沒有 Evictor 的話崇猫,窗口中的所有元素會一起交給函數(shù)進行計算沈条。
計算函數(shù)收到了窗口的元素(可能經(jīng)過了 Evictor 的過濾),并計算出窗口的結(jié)果值诅炉,并發(fā)送給下游蜡歹。窗口的結(jié)果值可以是一個也可以是多個屋厘。DataStream API 上可以接收不同類型的計算函數(shù),包括預定義的sum()
,min()
,max()
月而,還有 ReduceFunction
汗洒,FoldFunction
,還有WindowFunction
父款。WindowFunction 是最通用的計算函數(shù)溢谤,其他的預定義的函數(shù)基本都是基于該函數(shù)實現(xiàn)的。
Flink 對于一些聚合類的窗口計算(如sum,min)做了優(yōu)化铛漓,因為聚合類的計算不需要將窗口中的所有數(shù)據(jù)都保存下來溯香,只需要保存一個result值就可以了。每個進入窗口的元素都會執(zhí)行一次聚合函數(shù)并修改result值浓恶。這樣可以大大降低內(nèi)存的消耗并提升性能。但是如果用戶定義了 Evictor结笨,則不會啟用對聚合窗口的優(yōu)化包晰,因為 Evictor 需要遍歷窗口中的所有元素,必須要將窗口中所有元素都存下來炕吸。
源碼分析
上述的三個組件構(gòu)成了 Flink 的窗口機制伐憾。為了更清楚地描述窗口機制,以及解開一些疑惑(比如 purge 和 Evictor 的區(qū)別和用途)赫模,我們將一步步地解釋 Flink 內(nèi)置的一些窗口(Time Window树肃,Count Window,Session Window)是如何實現(xiàn)的瀑罗。
Count Window 實現(xiàn)
Count Window 是使用三組件的典范胸嘴,我們可以在 KeyedStream
上創(chuàng)建 Count Window,其源碼如下所示:
// tumbling count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()) // create window stream using GlobalWindows
.trigger(PurgingTrigger.of(CountTrigger.of(size))); // trigger is window size
}
// sliding count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size)) // evictor is window size
.trigger(CountTrigger.of(slide)); // trigger is slide size
}
第一個函數(shù)是申請翻滾計數(shù)窗口斩祭,參數(shù)為窗口大小劣像。第二個函數(shù)是申請滑動計數(shù)窗口,參數(shù)分別為窗口大小和滑動大小摧玫。它們都是基于 GlobalWindows
這個 WindowAssigner 來創(chuàng)建的窗口耳奕,該assigner會將所有元素都分配到同一個global window中,所有GlobalWindows
的返回值一直是 GlobalWindow
單例诬像∥萑海基本上自定義的窗口都會基于該assigner實現(xiàn)。
翻滾計數(shù)窗口并不帶evictor坏挠,只注冊了一個trigger芍躏。該trigger是帶purge功能的 CountTrigger。也就是說每當窗口中的元素數(shù)量達到了 window-size癞揉,trigger就會返回fire+purge纸肉,窗口就會執(zhí)行計算并清空窗口中的所有元素溺欧,再接著儲備新的元素。從而實現(xiàn)了tumbling的窗口之間無重疊柏肪。
滑動計數(shù)窗口的各窗口之間是有重疊的姐刁,但我們用的 GlobalWindows assinger 從始至終只有一個窗口,不像 sliding time assigner 可以同時存在多個窗口烦味。所以trigger結(jié)果不能帶purge聂使,也就是說計算完窗口后窗口中的數(shù)據(jù)要保留下來(供下個滑窗使用)。另外谬俄,trigger的間隔是slide-size柏靶,evictor的保留的元素個數(shù)是window-size。也就是說溃论,每個滑動間隔就觸發(fā)一次窗口計算屎蜓,并保留下最新進入窗口的window-size個元素,剔除舊元素钥勋。
假設有一個滑動計數(shù)窗口炬转,每2個元素計算一次最近4個元素的總和,那么窗口工作示意圖如下所示:
圖中所示的各個窗口邏輯上是不同的窗口算灸,但在物理上是同一個窗口扼劈。該滑動計數(shù)窗口,trigger的觸發(fā)條件是元素個數(shù)達到2個(每進入?2個元素就會觸發(fā)一次)菲驴,evictor保留的元素個數(shù)是4個荐吵,每次計算完窗口總和后會保留剩余的元素。所以第一次觸發(fā)trigger是當元素5進入赊瞬,第三次觸發(fā)trigger是當元素2進入先煎,并驅(qū)逐5和2,計算剩余的4個元素的總和(22)并發(fā)送出去森逮,保留下2,4,9,7元素供下個邏輯窗口使用榨婆。
Time Window 實現(xiàn)
同樣的,我們也可以在 KeyedStream
上申請 Time Window褒侧,其源碼如下所示:
// tumbling time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
// sliding time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
在方法體內(nèi)部會根據(jù)當前環(huán)境注冊的時間類型良风,使用不同的WindowAssigner創(chuàng)建window∶乒可以看到烟央,EventTime和IngestTime都使用了XXXEventTimeWindows
這個assigner,因為EventTime和IngestTime在底層的實現(xiàn)上只是在Source處為Record打時間戳的實現(xiàn)不同歪脏,在window operator中的處理邏輯是一樣的疑俭。
這里我們主要分析sliding process time window,如下是相關源碼:
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
private SlidingProcessingTimeWindows(long size, long slide) {
this.size = size;
this.slide = slide;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
timestamp = System.currentTimeMillis();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
// 對齊時間戳
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
// 當前時間戳對應了多個window
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
...
}
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
// 每個元素進入窗口都會調(diào)用該方法
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
// 注冊定時器婿失,當系統(tǒng)時間到達window end timestamp時會回調(diào)該trigger的onProcessingTime方法
ctx.registerProcessingTimeTimer(window.getEnd());
return TriggerResult.CONTINUE;
}
@Override
// 返回結(jié)果表示執(zhí)行窗口計算并清空窗口
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE_AND_PURGE;
}
...
}
首先钞艇,SlidingProcessingTimeWindows
會對每個進入窗口的元素根據(jù)系統(tǒng)時間分配到(size / slide)
個不同的窗口啄寡,并會在每個窗口上根據(jù)窗口結(jié)束時間注冊一個定時器(相同窗口只會注冊一份),當定時器超時時意味著該窗口完成了哩照,這時會回調(diào)對應窗口的Trigger的onProcessingTime
方法挺物,返回FIRE_AND_PURGE,也就是會執(zhí)行窗口計算并清空窗口飘弧。整個過程示意圖如下:
如上圖所示橫軸代表時間戳(為簡化問題识藤,時間戳從0開始),第一條record會被分配到[-5,5)和[0,10)兩個窗口中次伶,當系統(tǒng)時間到5時痴昧,就會計算[-5,5)窗口中的數(shù)據(jù),并將結(jié)果發(fā)送出去冠王,最后清空窗口中的數(shù)據(jù)赶撰,釋放該窗口資源。
Session Window 實現(xiàn)
Session Window 是一個需求很強烈的窗口機制柱彻,但Session也比之前的Window更復雜扣囊,所以 Flink 也是在即將到來的 1.1.0 版本中才支持了該功能。由于篇幅問題绒疗,我們將在后續(xù)的 Session Window 的實現(xiàn) 中深入探討 Session Window 的實現(xiàn)。