本文檔整理于Flink社區(qū)直播課
Flink窗口是實(shí)時(shí)處理非常重要的技術(shù)僻族,廣泛用于實(shí)時(shí)ETL紧阔、實(shí)時(shí)報(bào)表以及一些實(shí)時(shí)的監(jiān)控蛾找。
學(xué)習(xí)路線
為什么要關(guān)心實(shí)現(xiàn)
- ReduceFunction為什么不用計(jì)算每個(gè)key的聚合值漠吻?
- 當(dāng)key基數(shù)很大時(shí)途乃,如何有效計(jì)算每個(gè)key窗口計(jì)算?
- 窗口計(jì)算的中間結(jié)果如何存儲(chǔ)烫饼,何時(shí)被清理枫弟?
- 窗口計(jì)算如何容忍late data?
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
實(shí)時(shí)數(shù)倉(cāng)典型架構(gòu)
Window應(yīng)用場(chǎng)景
Window抽象概念
- TimeStampAssigner: 當(dāng)基于Event Time時(shí)間語(yǔ)義時(shí)骇塘,定義數(shù)據(jù)流的event time并定義Watermark的生成算法款违。
- KeySelector:keyBy(col...)插爹,以什么字段分組,選擇根據(jù)什么維度來(lái)做窗口聚合力穗。
- WindowAssigner: 窗口分配器当窗,定義即將到來(lái)的元素應(yīng)該分配給哪一個(gè)窗口
- state:窗口狀態(tài)崖面,增量聚合函數(shù)時(shí)狀態(tài)為ValueState梯影,全量聚合函數(shù)時(shí)狀態(tài)為一個(gè)ListState
- Trigger 觸發(fā)器甲棍,定義窗口何時(shí)出發(fā)計(jì)算,發(fā)出計(jì)算結(jié)果
- Evictor 移除器倘零,在ProcessWindowFunction調(diào)用之后或之前引用戳寸∫呷担可以從窗口中刪除已經(jīng)收集的元素拆吆。由于需要遍歷所有元素,只能在沒(méi)有定義增量聚合函數(shù)時(shí)使用霉晕。
- ProcessWindowFunction 全量窗口函數(shù)牺堰,可以訪問(wèn)窗口的元數(shù)據(jù)如窗口的開(kāi)始時(shí)間以及結(jié)束時(shí)間伟葫,訪問(wèn)當(dāng)前處理時(shí)間和Watermark筏养,表達(dá)能力比增量集合函數(shù)強(qiáng)渐溶,代價(jià)是大狀態(tài)掌猛,可以結(jié)合增量聚合函數(shù)使用眉睹。
Window編程接口
Window Assigner
Window Trigger
什么是Trigger
Flink中Trigger用于定義何時(shí)對(duì)窗口進(jìn)行計(jì)算并發(fā)出結(jié)果慕蔚,它的觸發(fā)條件可以是時(shí)間也可以是某些特定條件斋配。對(duì)于時(shí)間窗口而言,默認(rèn)Trigger是Watermark大于窗口結(jié)束時(shí)間時(shí)觸發(fā)桂对。
在Flink窗口機(jī)制中鸠匀,還有一個(gè)窗格的概念宅此,它將窗口劃分成多個(gè)規(guī)則的部分爬范,這些部分可看作子窗口青瀑,可簡(jiǎn)單理解為對(duì)窗口再次分片狱窘。窗口則定義為一組key相同(分區(qū)操作),并且位于同一個(gè)窗口中的元素。每個(gè)窗格都有一個(gè)Trigger對(duì)象搭儒。
先看看Trigger類中的幾個(gè)重要函數(shù):
// 每當(dāng)有元素添加到窗口都會(huì)調(diào)用
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 在處理時(shí)間計(jì)時(shí)器觸發(fā)時(shí)調(diào)用
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 在事件時(shí)間計(jì)時(shí)器觸發(fā)時(shí)調(diào)用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
// 該函數(shù)會(huì)在清除窗口是調(diào)用
public abstract void clear(W window, TriggerContext ctx) throws Exception;
TriggerResult
每次調(diào)用觸發(fā)器都會(huì)生成一個(gè)TriggerResult,它用于決定窗口接下來(lái)的行為茴扁。
- CONTINUE: 不做任何處理
- FIRE:觸發(fā)計(jì)算
- PURGE:完全清除窗口內(nèi)容峭火,并刪除窗口自身及元數(shù)據(jù)纺且。
- FIRE_AND_PURGE:先進(jìn)行窗口計(jì)算(FIRE)载碌,隨后刪除所有狀態(tài)及元數(shù)據(jù)(PURGE)
Flink中Trigger的幾個(gè)實(shí)現(xiàn)類:
內(nèi)置Trigger | 說(shuō)明 |
---|---|
ProcessingTimeTrigger | 一次觸發(fā),machine time大于窗口結(jié)束時(shí)間時(shí)觸發(fā) |
EventTimeTrigger | 一次觸發(fā),watermark大于窗口結(jié)束時(shí)間時(shí)觸發(fā) |
ContinuousProcessingTimeTrigger | 多次觸發(fā),基于processing time的固定時(shí)間間隔 |
ContinuousEventTimeTrigger | 多次觸發(fā) ,基于event time的固定時(shí)間間隔 |
CountTrigger | 多次觸發(fā),基于element的固定條數(shù) |
DeltaTrigger | 多次觸發(fā),當(dāng)前element與上次觸發(fā)trigger的element做delta計(jì)算,超過(guò)threshold(閾值)時(shí)觸發(fā) |
PurgingTrigger | trigger wrapper,當(dāng)nested trigger時(shí)觸發(fā),額外會(huì)清理窗口當(dāng)前的中間狀態(tài) |
DeltaTrigger :會(huì)計(jì)算一個(gè)Delta值,那么到底是誰(shuí)跟誰(shuí)計(jì)算一個(gè)Delta呢?當(dāng)前的element與上次觸發(fā)Trigger的element做一個(gè)Delta計(jì)算.如果超過(guò)了指定的閾值,那么就觸發(fā)計(jì)算.
PurgingTrigger:實(shí)際上是一個(gè)wrapper,是對(duì)上一個(gè)Trigger的包裝,它可以對(duì)嵌套的Trigger做什么事情呢?讓嵌套的Trigger自己去觸發(fā),一旦它觸發(fā)的時(shí)候,可以給一個(gè)額外的功能,額外會(huì)清除窗口當(dāng)前的中間狀態(tài).
Trigger實(shí)例
-
圖trigger-1表示event time的數(shù)據(jù)流朗伶,定義5分鐘的滾動(dòng)窗口步咪,窗口函數(shù)會(huì)計(jì)算事件的累計(jì)次數(shù)纯丸,同時(shí)使用基于event time的固定事件間隔的觸發(fā)器將結(jié)構(gòu)(窗口狀態(tài))sink到外部系統(tǒng)。
-
前面2min的數(shù)據(jù)進(jìn)入窗口后,積累次數(shù)(作為窗口狀態(tài))坠陈,由于定義了間隔2min的窗口觸發(fā)器捐康,此時(shí)結(jié)果將sink到外部系統(tǒng)解总。
-
第三分鐘數(shù)據(jù)到達(dá)時(shí)刻盐,窗口狀態(tài)更新敦锌,但還沒(méi)有達(dá)到下次窗口出發(fā)的時(shí)間乙墙。
-
第4分鐘數(shù)據(jù)進(jìn)入窗口听想,更新窗口狀態(tài)哗魂,達(dá)到窗口觸發(fā)條件(間隔2min)录别,發(fā)出結(jié)果组题,并對(duì)外部系統(tǒng)中已經(jīng)存在的結(jié)果值發(fā)出更新梢褐。
Question:如果Result只能Append赵讯,不支持Update(如Druid)盈咳,該如何解決呢?
PurgingTrigger:對(duì)于嵌套在內(nèi)的觸發(fā)器觸發(fā)計(jì)算時(shí)边翼,同時(shí)清除窗口中的狀態(tài)鱼响。
Window Evictor
注:關(guān)于Window Trigger與Evictor的具體實(shí)現(xiàn)與案例,總結(jié)起來(lái)比較多组底,我會(huì)放在Trigger丈积、Evictor源碼篇里面去講解。