Flink 原理與實現(xiàn):Window 機制

Flink 認為 Batch 是 Streaming 的一個特例砖织,所以 Flink 底層引擎是一個流式引擎,在上面實現(xiàn)了流處理和批處理攒霹。而窗口(window)就是從 Streaming 到 Batch 的一個橋梁涂臣。Flink 提供了非常完善的窗口機制,這是我認為的 Flink 最大的亮點之一(其他的亮點包括消息亂序處理搞糕,和 checkpoint 機制)。本文我們將介紹流式處理中的窗口概念曼追,介紹 Flink 內(nèi)建的一些窗口和 Window API窍仰,最后討論下窗口在底層是如何實現(xiàn)的。

什么是 Window

在流處理應用中礼殊,數(shù)據(jù)是連續(xù)不斷的驹吮,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當然我們可以每來一個消息就處理一次晶伦,但是有時我們需要做一些聚合類的處理碟狞,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下婚陪,我們必須定義一個窗口族沃,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算泌参。

窗口可以是時間驅(qū)動的(Time Window脆淹,例如:每30秒鐘),也可以是數(shù)據(jù)驅(qū)動的(Count Window沽一,例如:每一百個元素)盖溺。一種經(jīng)典的窗口分類可以分成:翻滾窗口(Tumbling Window,無重疊)铣缠,滾動窗口(Sliding Window烘嘱,有重疊),和會話窗口(Session Window攘残,活動間隙)拙友。

我們舉個具體的場景來形象地理解不同窗口的概念。假設歼郭,淘寶網(wǎng)會記錄每個用戶每次購買的商品個數(shù),我們要做的是統(tǒng)計不同窗口中用戶購買商品的總數(shù)病曾。下圖給出了幾種經(jīng)典的窗口切分概述圖:

上圖中牍蜂,raw data stream 代表用戶的購買行為流,圈中的數(shù)字代表該用戶本次購買的商品個數(shù)泰涂,事件是按時間分布的鲫竞,所以可以看出事件之間是有time gap的。Flink 提供了上圖中所有的窗口類型逼蒙,下面我們會逐一進行介紹从绘。

Time Window

就如名字所說的,Time Window 是根據(jù)時間對數(shù)據(jù)流進行分組的。這里我們涉及到了流處理中的時間問題僵井,時間問題和消息亂序問題是緊密關聯(lián)的陕截,這是流處理中現(xiàn)存的難題之一,我們將在后續(xù)的 EventTime 和消息亂序處理 中對這部分問題進行深入探討批什。這里我們只需要知道 Flink 提出了三種時間的概念农曲,分別是event time(事件時間:事件發(fā)生時的時間),ingestion time(攝取時間:事件進入流處理系統(tǒng)的時間)驻债,processing time(處理時間:消息被計算處理的時間)乳规。Flink 中窗口機制和時間類型是完全解耦的,也就是說當需要改變時間類型時不需要更改窗口邏輯相關的代碼合呐。

  • Tumbling Time Window
    如上圖暮的,我們需要統(tǒng)計每一分鐘中用戶購買的商品的總數(shù),需要將用戶的行為事件按每一分鐘進行切分合砂,這種切分被成為翻滾時間窗口(Tumbling Time Window)青扔。翻滾窗口能將數(shù)據(jù)流切分成不重疊的窗口,每一個事件只能屬于一個窗口翩伪。通過使用 DataStream API微猖,我們可以這樣實現(xiàn):
val slidingCnts: DataStream[(Int, Int)] = buyCnts
 .keyBy(0) 
 // sliding time window of 1 minute length and 30 secs trigger interval
 .timeWindow(Time.minutes(1), Time.seconds(30))
 .sum(1)

  • Sliding Time Window
    但是對于某些應用,它們需要的窗口是不間斷的缘屹,需要平滑地進行窗口聚合凛剥。比如,我們可以每30秒計算一次最近一分鐘用戶購買的商品總數(shù)轻姿。這種窗口我們稱為滑動時間窗口(Sliding Time Window)犁珠。在滑窗中,一個元素可以對應多個窗口互亮。通過使用 DataStream API犁享,我們可以這樣實現(xiàn):
  val slidingCnts: DataStream[(Int, Int)] = buyCnts
     .keyBy(0) 
     // sliding time window of 1 minute length and 30 secs trigger interval
     .timeWindow(Time.minutes(1), Time.seconds(30))
     .sum(1)
   

Count Window

Count Window 是根據(jù)元素個數(shù)對數(shù)據(jù)流進行分組的。

  • Tumbling Count Window
    當我們想要每100個用戶購買行為事件統(tǒng)計購買總數(shù)豹休,那么每當窗口中填滿100個元素了炊昆,就會對窗口進行計算,這種窗口我們稱之為翻滾計數(shù)窗口(Tumbling Count Window)威根,上圖所示窗口大小為3個凤巨。通過使用 DataStream API,我們可以這樣實現(xiàn):
// Stream of (userId, buyCnts)
 val buyCnts: DataStream[(Int, Int)] = ...

 val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)
  • Sliding Count Window
    當然Count Window 也支持 Sliding Window洛搀,雖在上圖中未描述出來敢茁,但和Sliding Time Window含義是類似的,例如計算每10個元素計算一次最近100個元素的總和留美,代碼示例如下彰檬。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
    .keyBy(0)
    // sliding count window of 100 elements size and 10 elements trigger interval
    .countWindow(100, 10)
    .sum(1)

Session Window

在這種用戶交互事件流中伸刃,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續(xù)活躍的周期),由非活躍的間隙分隔開僧叉。如上圖所示奕枝,就是需要計算每個用戶在活躍期間總共購買的商品數(shù)量,如果用戶30秒沒有活動則視為會話斷開(假設raw data stream是單個用戶的購買行為流)瓶堕。Session Window 的示例代碼如下:

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
 .keyBy(0)
 // session window based on a 30 seconds session gap interval 
 .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
 .sum(1)

一般而言,window 是在無限的流上定義了一個有限的元素集合症歇。這個集合可以是基于時間的郎笆,元素個數(shù)的,時間和個數(shù)結(jié)合的忘晤,會話間隙的宛蚓,或者是自定義的。Flink 的 DataStream API 提供了簡潔的算子來滿足常用的窗口操作设塔,同時提供了通用的窗口機制來允許用戶自己定義窗口分配邏輯凄吏。下面我們會對 Flink 窗口相關的 API 進行剖析。

剖析 Window API

得益于 Flink Window API 松耦合設計闰蛔,我們可以非常靈活地定義符合特定業(yè)務的窗口痕钢。Flink 中定義一個窗口主要需要以下三個組件。

  • Window Assigner:用來決定某個元素被分配到哪個/哪些窗口中去序六。

    如下類圖展示了目前內(nèi)置實現(xiàn)的 Window Assigners:

  • Trigger:觸發(fā)器任连。決定了一個窗口何時能夠被計算或清除,每個窗口都會擁有一個自己的Trigger例诀。

    如下類圖展示了目前內(nèi)置實現(xiàn)的 Triggers:

  • Evictor:可以譯為“驅(qū)逐者”随抠。在Trigger觸發(fā)之后,在窗口被處理之前繁涂,Evictor(如果有Evictor的話)會用來剔除窗口中不需要的元素拱她,相當于一個filter。

    如下類圖展示了目前內(nèi)置實現(xiàn)的 Evictors:

上述三個組件的不同實現(xiàn)的不同組合扔罪,可以定義出非常復雜的窗口秉沼。Flink 中內(nèi)置的窗口也都是基于這三個組件構(gòu)成的,當然內(nèi)置窗口有時候無法解決用戶特殊的需求步势,所以 Flink 也暴露了這些窗口機制的內(nèi)部接口供用戶實現(xiàn)自定義的窗口氧猬。下面我們將基于這三者探討窗口的實現(xiàn)機制。

Window 的實現(xiàn)

下圖描述了 Flink 的窗口機制以及各組件之間是如何相互工作的坏瘩。

首先上圖中的組件都位于一個算子(window operator)中盅抚,數(shù)據(jù)流源源不斷地進入算子,每一個到達的元素都會被交給 WindowAssigner倔矾。WindowAssigner 會決定元素被放到哪個或哪些窗口(window)妄均,可能會創(chuàng)建新窗口柱锹。因為一個元素可以被放入多個窗口中,所以同時存在多個窗口是可能的丰包。注意禁熏,Window本身只是一個ID標識符,其內(nèi)部可能存儲了一些元數(shù)據(jù)邑彪,如TimeWindow中有開始和結(jié)束時間瞧毙,但是并不會存儲窗口中的元素。窗口中的元素實際存儲在 Key/Value State 中寄症,key為Window宙彪,value為元素集合(或聚合值)。為了保證窗口的容錯性,該實現(xiàn)依賴了 Flink 的 State 機制(參見 state 文檔)。

每一個窗口都擁有一個屬于自己的 Trigger绒尊,Trigger上會有定時器,用來決定一個窗口何時能夠被計算或清除男图。每當有元素加入到該窗口,或者之前注冊的定時器超時了甜橱,那么Trigger都會被調(diào)用逊笆。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù))渗鬼,purge(移除窗口和窗口中的數(shù)據(jù))览露,或者 fire + purge。一個Trigger的調(diào)用結(jié)果只是fire的話譬胎,那么會計算窗口并保留窗口原樣差牛,也就是說窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時候再次執(zhí)行計算堰乔。一個窗口可以被重復計算多次知道它被 purge 了偏化。在purge之前,窗口會一直占用著內(nèi)存镐侯。

當Trigger fire了侦讨,窗口中的元素集合就會交給Evictor(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表苟翻,并決定最先進入窗口的多少個元素需要被移除韵卤。剩余的元素會交給用戶指定的函數(shù)進行窗口的計算。如果沒有 Evictor 的話崇猫,窗口中的所有元素會一起交給函數(shù)進行計算沈条。

計算函數(shù)收到了窗口的元素(可能經(jīng)過了 Evictor 的過濾),并計算出窗口的結(jié)果值诅炉,并發(fā)送給下游蜡歹。窗口的結(jié)果值可以是一個也可以是多個屋厘。DataStream API 上可以接收不同類型的計算函數(shù),包括預定義的sum(),min(),max()月而,還有 ReduceFunction汗洒,FoldFunction,還有WindowFunction父款。WindowFunction 是最通用的計算函數(shù)溢谤,其他的預定義的函數(shù)基本都是基于該函數(shù)實現(xiàn)的。

Flink 對于一些聚合類的窗口計算(如sum,min)做了優(yōu)化铛漓,因為聚合類的計算不需要將窗口中的所有數(shù)據(jù)都保存下來溯香,只需要保存一個result值就可以了。每個進入窗口的元素都會執(zhí)行一次聚合函數(shù)并修改result值浓恶。這樣可以大大降低內(nèi)存的消耗并提升性能。但是如果用戶定義了 Evictor结笨,則不會啟用對聚合窗口的優(yōu)化包晰,因為 Evictor 需要遍歷窗口中的所有元素,必須要將窗口中所有元素都存下來炕吸。

源碼分析

上述的三個組件構(gòu)成了 Flink 的窗口機制伐憾。為了更清楚地描述窗口機制,以及解開一些疑惑(比如 purge 和 Evictor 的區(qū)別和用途)赫模,我們將一步步地解釋 Flink 內(nèi)置的一些窗口(Time Window树肃,Count Window,Session Window)是如何實現(xiàn)的瀑罗。

Count Window 實現(xiàn)

Count Window 是使用三組件的典范胸嘴,我們可以在 KeyedStream 上創(chuàng)建 Count Window,其源碼如下所示:

// tumbling count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
 return window(GlobalWindows.create())  // create window stream using GlobalWindows
 .trigger(PurgingTrigger.of(CountTrigger.of(size))); // trigger is window size
}
// sliding count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
 return window(GlobalWindows.create())
 .evictor(CountEvictor.of(size))  // evictor is window size
 .trigger(CountTrigger.of(slide)); // trigger is slide size
}

第一個函數(shù)是申請翻滾計數(shù)窗口斩祭,參數(shù)為窗口大小劣像。第二個函數(shù)是申請滑動計數(shù)窗口,參數(shù)分別為窗口大小和滑動大小摧玫。它們都是基于 GlobalWindows 這個 WindowAssigner 來創(chuàng)建的窗口耳奕,該assigner會將所有元素都分配到同一個global window中,所有GlobalWindows的返回值一直是 GlobalWindow 單例诬像∥萑海基本上自定義的窗口都會基于該assigner實現(xiàn)。

翻滾計數(shù)窗口并不帶evictor坏挠,只注冊了一個trigger芍躏。該trigger是帶purge功能的 CountTrigger。也就是說每當窗口中的元素數(shù)量達到了 window-size癞揉,trigger就會返回fire+purge纸肉,窗口就會執(zhí)行計算并清空窗口中的所有元素溺欧,再接著儲備新的元素。從而實現(xiàn)了tumbling的窗口之間無重疊柏肪。

滑動計數(shù)窗口的各窗口之間是有重疊的姐刁,但我們用的 GlobalWindows assinger 從始至終只有一個窗口,不像 sliding time assigner 可以同時存在多個窗口烦味。所以trigger結(jié)果不能帶purge聂使,也就是說計算完窗口后窗口中的數(shù)據(jù)要保留下來(供下個滑窗使用)。另外谬俄,trigger的間隔是slide-size柏靶,evictor的保留的元素個數(shù)是window-size。也就是說溃论,每個滑動間隔就觸發(fā)一次窗口計算屎蜓,并保留下最新進入窗口的window-size個元素,剔除舊元素钥勋。

假設有一個滑動計數(shù)窗口炬转,每2個元素計算一次最近4個元素的總和,那么窗口工作示意圖如下所示:

圖中所示的各個窗口邏輯上是不同的窗口算灸,但在物理上是同一個窗口扼劈。該滑動計數(shù)窗口,trigger的觸發(fā)條件是元素個數(shù)達到2個(每進入?2個元素就會觸發(fā)一次)菲驴,evictor保留的元素個數(shù)是4個荐吵,每次計算完窗口總和后會保留剩余的元素。所以第一次觸發(fā)trigger是當元素5進入赊瞬,第三次觸發(fā)trigger是當元素2進入先煎,并驅(qū)逐5和2,計算剩余的4個元素的總和(22)并發(fā)送出去森逮,保留下2,4,9,7元素供下個邏輯窗口使用榨婆。

Time Window 實現(xiàn)

同樣的,我們也可以在 KeyedStream 上申請 Time Window褒侧,其源碼如下所示:

// tumbling time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
 if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 return window(TumblingProcessingTimeWindows.of(size));
 } else {
 return window(TumblingEventTimeWindows.of(size));
 }
}
// sliding time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
 if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
 return window(SlidingProcessingTimeWindows.of(size, slide));
 } else {
 return window(SlidingEventTimeWindows.of(size, slide));
 }
}

在方法體內(nèi)部會根據(jù)當前環(huán)境注冊的時間類型良风,使用不同的WindowAssigner創(chuàng)建window∶乒可以看到烟央,EventTime和IngestTime都使用了XXXEventTimeWindows這個assigner,因為EventTime和IngestTime在底層的實現(xiàn)上只是在Source處為Record打時間戳的實現(xiàn)不同歪脏,在window operator中的處理邏輯是一樣的疑俭。

這里我們主要分析sliding process time window,如下是相關源碼:

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 private static final long serialVersionUID = 1L;

 private final long size;

 private final long slide;

 private SlidingProcessingTimeWindows(long size, long slide) {
 this.size = size;
 this.slide = slide;
 }

 @Override
 public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
 timestamp = System.currentTimeMillis();
 List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
 // 對齊時間戳
 long lastStart = timestamp - timestamp % slide;
 for (long start = lastStart;
 start > timestamp - size;
 start -= slide) {
 // 當前時間戳對應了多個window
 windows.add(new TimeWindow(start, start + size));
 }
 return windows;
 }
 ...
}
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 @Override
 // 每個元素進入窗口都會調(diào)用該方法
 public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
 // 注冊定時器婿失,當系統(tǒng)時間到達window end timestamp時會回調(diào)該trigger的onProcessingTime方法
 ctx.registerProcessingTimeTimer(window.getEnd());
 return TriggerResult.CONTINUE;
 }

 @Override
 // 返回結(jié)果表示執(zhí)行窗口計算并清空窗口
 public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
 return TriggerResult.FIRE_AND_PURGE;
 }
 ...
}

首先钞艇,SlidingProcessingTimeWindows會對每個進入窗口的元素根據(jù)系統(tǒng)時間分配到(size / slide)個不同的窗口啄寡,并會在每個窗口上根據(jù)窗口結(jié)束時間注冊一個定時器(相同窗口只會注冊一份),當定時器超時時意味著該窗口完成了哩照,這時會回調(diào)對應窗口的Trigger的onProcessingTime方法挺物,返回FIRE_AND_PURGE,也就是會執(zhí)行窗口計算并清空窗口飘弧。整個過程示意圖如下:

如上圖所示橫軸代表時間戳(為簡化問題识藤,時間戳從0開始),第一條record會被分配到[-5,5)和[0,10)兩個窗口中次伶,當系統(tǒng)時間到5時痴昧,就會計算[-5,5)窗口中的數(shù)據(jù),并將結(jié)果發(fā)送出去冠王,最后清空窗口中的數(shù)據(jù)赶撰,釋放該窗口資源。

Session Window 實現(xiàn)

Session Window 是一個需求很強烈的窗口機制柱彻,但Session也比之前的Window更復雜扣囊,所以 Flink 也是在即將到來的 1.1.0 版本中才支持了該功能。由于篇幅問題绒疗,我們將在后續(xù)的 Session Window 的實現(xiàn) 中深入探討 Session Window 的實現(xiàn)。

參考資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末骂澄,一起剝皮案震驚了整個濱河市吓蘑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌坟冲,老刑警劉巖磨镶,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異健提,居然都是意外死亡琳猫,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門私痹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來脐嫂,“玉大人,你說我怎么就攤上這事紊遵≌饲В” “怎么了?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵暗膜,是天一觀的道長匀奏。 經(jīng)常有香客問我,道長学搜,這世上最難降的妖魔是什么娃善? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任论衍,我火速辦了婚禮,結(jié)果婚禮上聚磺,老公的妹妹穿的比我還像新娘坯台。我一直安慰自己,他們只是感情好咧最,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布捂人。 她就那樣靜靜地躺著,像睡著了一般矢沿。 火紅的嫁衣襯著肌膚如雪滥搭。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天捣鲸,我揣著相機與錄音瑟匆,去河邊找鬼。 笑死栽惶,一個胖子當著我的面吹牛愁溜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播外厂,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼冕象,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了汁蝶?” 一聲冷哼從身側(cè)響起渐扮,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎掖棉,沒想到半個月后墓律,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡幔亥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年耻讽,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帕棉。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡针肥,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出笤昨,到底是詐尸還是另有隱情祖驱,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布瞒窒,位于F島的核電站捺僻,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜匕坯,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一束昵、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧葛峻,春花似錦锹雏、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至采记,卻和暖如春佣耐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背唧龄。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工兼砖, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人既棺。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓讽挟,卻偏偏與公主長得像,于是被迫代替她去往敵國和親丸冕。 傳聞我的和親對象是個殘疾皇子耽梅,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354