介紹
window(窗口)是Flink流處理中非常重要的概念击胜,本篇我們來對(duì)窗口相關(guān)的概念以及關(guān)聯(lián)的實(shí)現(xiàn)進(jìn)行解析褥实。本篇的內(nèi)容主要集中在package org.apache.flink.streaming.api.windowing下仓犬。
Window
一個(gè)Window代表有限對(duì)象的集合桶现。一個(gè)窗口有一個(gè)最大的時(shí)間戳匪凡,該時(shí)間戳意味著在其代表的某時(shí)間點(diǎn)——所有應(yīng)該進(jìn)入這個(gè)窗口的元素都已經(jīng)到達(dá)严衬。
Flink的根窗口對(duì)象是一個(gè)抽象類巡李,只提供了一個(gè)抽象方法:
public abstract long maxTimestamp();
用于獲取最大的時(shí)間戳抚笔。Flink提供了兩個(gè)窗口的具體實(shí)現(xiàn)。在實(shí)現(xiàn)Window時(shí)侨拦,子類應(yīng)該override equals和hashCode這兩個(gè)方法殊橙,以使得在邏輯上兩個(gè)相等的window被認(rèn)為是同一個(gè)。
GlobalWindow
GlobalWindow是一個(gè)全局窗口狱从,被實(shí)現(xiàn)為單例模式膨蛮。其maxTimestamp被設(shè)置為L(zhǎng)ong.MAX_VALUE。
該類內(nèi)部有一個(gè)靜態(tài)類定義了GlobalWindow的序列化器:Serializer季研。
TimeWindow
TimeWindow表示一個(gè)時(shí)間間隔窗口敞葛,這體現(xiàn)在其構(gòu)造器需要注入的兩個(gè)屬性:
- start : 時(shí)間間隔的起始
- end : 時(shí)間間隔的截止
TimeWindow表示的時(shí)間間隔為[start, end)。其maxTimestamp的實(shí)現(xiàn)為:
public long maxTimestamp() {
return end - 1;
}
其equals的實(shí)現(xiàn)中与涡,除了常規(guī)比較(比較引用惹谐,比較Class的實(shí)例)持偏,還會(huì)比較start,end這兩個(gè)屬性。
TimeWindow也在內(nèi)部實(shí)現(xiàn)了序列化器氨肌,該序列化器主要針對(duì)start和end兩個(gè)屬性综液。
WindowAssigner
元素的窗口分配器。用于將元素分配給一個(gè)或者多個(gè)窗口儒飒。該抽象類定義了三個(gè)抽象方法:
- assignWindows :將某個(gè)帶有時(shí)間戳timestamp的元素element分配給一個(gè)或多個(gè)窗口谬莹,并返回窗口集合
- getDefaultTrigger :返回跟WindowAssigner關(guān)聯(lián)的默認(rèn)觸發(fā)器
- getWindowSerializer :返回WindowAssigner分配的窗口的序列化器
內(nèi)置的WindowAssigner
整個(gè)類型繼承圖如下:
下面會(huì)談到很多基于時(shí)間的窗口,這里有兩個(gè)概念桩了,分別是時(shí)間類型和窗口類型:
時(shí)間類型:
- eventTime :用戶賦予的自定義的時(shí)間戳(事件時(shí)間戳)
- processingTime : 執(zhí)行當(dāng)前task的subtask主機(jī)的本地時(shí)間戳(系統(tǒng)時(shí)間戳)
窗口類型:
- Sliding:滑動(dòng)窗口附帽,可能會(huì)重疊(某個(gè)元素可能會(huì)身處多個(gè)窗口中)
- Tumbling:非重疊窗口(在assignWindows方法中返回的一般都是Collections.singletonList())
GlobalWindows
該分配器對(duì)應(yīng)于窗口GlobalWindow,它將所有的元素分配給同一個(gè)GlobalWindow(本質(zhì)上而言井誉,GlobalWindow也只有一個(gè)實(shí)例)蕉扮。跟GlobalWindow的實(shí)現(xiàn)方式一樣,GlobalWindows也被實(shí)現(xiàn)為單例模式颗圣。
方法實(shí)現(xiàn):
- assignWindows :方法的實(shí)現(xiàn)即返回存放GlobalWindow單實(shí)例的集合對(duì)象
- getDefaultTrigger :的實(shí)現(xiàn)是返回一個(gè)不做任何動(dòng)作的NerverTrigger
TumblingEventTimeWindows
依據(jù)給定的窗口大小喳钟,結(jié)合event-time,返回存儲(chǔ)TimeWindow單實(shí)例的集合在岂。getDefaultTrigger方法返回EventTimeTrigger類型的實(shí)例奔则。
TumblingProcessingTimeWindows
依據(jù)給定窗口的大小,結(jié)合processing-time蔽午,返回存儲(chǔ)TimeWindow單實(shí)例的集合易茬。需要注意的是,這里依據(jù)的是運(yùn)行當(dāng)前任務(wù)所在主機(jī)的本地時(shí)間戳及老。getDefaultTrigger方法返回的是ProcessingTimeTrigger類型的實(shí)例抽莱。
SlidingProcessingTimeWindows
Sliding窗口不同于Tumbling窗口,它除了指定窗口的大小骄恶,還要指定一個(gè)滑動(dòng)值食铐,即slide。所謂的滑動(dòng)窗口可以這么理解僧鲁,比如:一分鐘里每十秒鐘虐呻。這里一分鐘是窗口大小,每十秒即為滑動(dòng)值悔捶。
在Sliding窗口中铃慷,assignWindows方法返回的就不再是單個(gè)窗口了,而是窗口的集合蜕该。首先計(jì)算出窗口的個(gè)數(shù):size/slide犁柜,然后循環(huán)初始化給定的size內(nèi)不同slide的窗口對(duì)象。
SlidingEventTimeWindows
類似SlidingProcessingTimeWindows只不過窗口的start參數(shù)的計(jì)算方式依賴于系統(tǒng)時(shí)間戳堂淡。
EventTimeSessionWindows
繼承MergingWindowAssigner類
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
ProcessingTimeSessionWindows
繼承MergingWindowAssigner類
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
long currentProcessingTime = context.getCurrentProcessingTime();
return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ProcessingTimeTrigger.create();
}
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
SlidingAlignedProcessingTimeWindows
繼承自BaseAlignedWindowAssigner.
簡(jiǎn)單說馋缅,就是要廢棄的扒腕。
更詳細(xì)的看:
*這是一個(gè)特殊的窗口分配器,用于告訴系統(tǒng)使用
* <i>“快速對(duì)齊處理時(shí)間窗口運(yùn)算符”</ i>進(jìn)行窗口化萤悴。
*
* <p>以前的Flink版本使用該操作符自動(dòng)進(jìn)行簡(jiǎn)單的處理時(shí)間
*窗口(翻滾和滑動(dòng))瘾腰,沒有指定自定義觸發(fā)和沒有驅(qū)逐器。
*在當(dāng)前的Flink版本中覆履,該運(yùn)算符僅在程序明確使用時(shí)才使用
*指定此窗口分配器蹋盆。這僅適用于程序依賴的特殊情況
*快速對(duì)齊窗口操作員的性能更好,并愿意接受缺點(diǎn)
*支持各種功能硝全,如下所示:
*
* <ul>
* <li>無法選擇自定義狀態(tài)后端栖雾,操作員始終將數(shù)據(jù)存儲(chǔ)在Java堆上。</li>
* <li>運(yùn)算符不支持鍵組伟众,這意味著它不能改變并行性析藕。</ li>
* <li> Flink的未來版本可能無法從此采取的檢查點(diǎn)/保存點(diǎn)恢復(fù)
*操作員。</ li>
* </ ul>
*
* <p>未來實(shí)施計(jì)劃:我們計(jì)劃添加該運(yùn)算符使用的一些優(yōu)化
*一般窗口操作符凳厢,以便將來版本的Flink不會(huì)具有性能/功能
*權(quán)衡更多账胧。
TumblingAlignedProcessingTimeWindows
繼承自BaseAlignedWindowAssigner。
簡(jiǎn)單說先紫,就是要廢棄的治泥。
同上。
evictors
evitor : 中文譯為驅(qū)逐者泡孩;顧名思義其用于剔除窗口中的某些元素
它剔除元素的時(shí)機(jī)是:在觸發(fā)器觸發(fā)之后车摄,在窗口被處理(apply windowFunction)之前
該接口只定義了一個(gè)方法:
int evict(Iterable<StreamRecord<T>> elements, int size, W window);
接口的返回值即表示要剔除元素的個(gè)數(shù)寺谤。
內(nèi)置的Evitor
Flink內(nèi)置實(shí)現(xiàn)了三個(gè)Evitor:
- TimeEvitor
- CountEvitor
- DeltaEvitor
TimeEvitor
這個(gè)Evitor基于給定的保留時(shí)間(keep time)作為剔除規(guī)則仑鸥,大致的實(shí)現(xiàn)如下:
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
int toEvict = 0;
long currentTime = Iterables.getLast(elements).getTimestamp();
long evictCutoff = currentTime - windowSize;
for (StreamRecord<Object> record: elements) {
if (record.getTimestamp() > evictCutoff) {
break;
}
toEvict++;
}
return toEvict;
}
大致的邏輯是,先取出最后一個(gè)元素的時(shí)間戳作為“當(dāng)前”時(shí)間变屁,然后減去期望中的“窗口大小”眼俊,得到一個(gè)基準(zhǔn)時(shí)間戳(只需要比基準(zhǔn)時(shí)間戳大的元素)。
然后從第一個(gè)元素開始循環(huán)比較每一個(gè)元素粟关,如果比基準(zhǔn)時(shí)間戳小疮胖,則累加剔除統(tǒng)計(jì)數(shù),一旦發(fā)現(xiàn)某個(gè)元素的時(shí)間戳大于基準(zhǔn)時(shí)間戳闷板,則直接跳出循環(huán)澎灸,不再累加了(因?yàn)楸镜卮翱谥性厥腔跁r(shí)間有序的,這一點(diǎn)由Flink運(yùn)行時(shí)來保證遮晚,如果從某個(gè)元素開始其時(shí)間戳大于基準(zhǔn)時(shí)間戳性昭,則后續(xù)的所有元素都滿足這一條件,因此也就沒必要再循環(huán)下去了)县遣。
CountEvictor
基于容量的Evictor糜颠,它通過比對(duì)evict方法的第二個(gè)參數(shù)size來判斷應(yīng)該剔除多少個(gè)元素汹族。具體的實(shí)現(xiàn):
public int evict(Iterable<StreamRecord<Object>> elements, int size, W window) {
if (size > maxCount) {
return (int) (size - maxCount);
} else {
return 0;
}
}
DeltaEvictor
基于給定的閾值threshold和deltaFunction來進(jìn)行判斷。也是拿當(dāng)前元素跟最后一個(gè)元素一起計(jì)算delta跟閾值做對(duì)比其兴。
Time
Flink中僅有一個(gè)類Time來定義窗口的時(shí)間間隔顶瞒。該時(shí)間默認(rèn)指執(zhí)行環(huán)境下的時(shí)間。創(chuàng)建一個(gè)Time對(duì)象元旬,需要兩個(gè)參數(shù):
- size : 時(shí)間間隔的大辛裥臁(數(shù)值)
- unit : TimeUnit的實(shí)例,表示時(shí)間間隔的單位
該類提供的很多靜態(tài)方法提供對(duì)不同unit的設(shè)置匀归。
Trigger
Trigger(觸發(fā)器)用于決定某個(gè)窗口的元素集合什么時(shí)候觸發(fā)計(jì)算以及結(jié)果什么時(shí)候被emit箕速。
以粗粒度來看,F(xiàn)link主要提供了三種形式的觸發(fā)方式:
- 按元素
- 按系統(tǒng)時(shí)間
- 按事件時(shí)間
這體現(xiàn)為Trigger的三個(gè)主要的抽象方法:
- onElement :針對(duì)每個(gè)元素觸發(fā)朋譬,這主要針對(duì)于那些基于元素的觸發(fā)器盐茎,比如后面我們將看到的CountTrigger
- onProcessingTime :被processing-time(Flink系統(tǒng)時(shí)間時(shí)間戳)定時(shí)器觸發(fā)
- onEventTime :被event-time(事件時(shí)間戳)定時(shí)器觸發(fā)
以上這些方法中都有一個(gè)共同的參數(shù):TriggerContext。
TriggerContext
顧名思義徙赢,它提供觸發(fā)器執(zhí)行時(shí)的上下文信息字柠,但它只是Trigger的內(nèi)部接口:
- getCurrentWatermark :返回當(dāng)前的watermark
- registerProcessingTimeTimer :注冊(cè)一個(gè)系統(tǒng)時(shí)間的定時(shí)器,觸發(fā)onProcessingTime
- registerEventTimeTimer :注冊(cè)一個(gè)事件時(shí)間的定時(shí)器狡赐,觸發(fā)onEventTime
- deleteProcessingTimeTimer :刪除系統(tǒng)時(shí)間的定時(shí)器
- deleteEventTimeTimer :刪除事件時(shí)間的定時(shí)器
- getPartitionedState :用于失敗恢復(fù)的獲取狀態(tài)的接口
其中窑业,registerXXX/deleteXXX模式對(duì)主要針對(duì)上面兩種基于時(shí)間的觸發(fā)器。而最后一個(gè)方法getKeyValueState也是非常重要的枕屉,因?yàn)樗糜讷@取窗口相關(guān)的狀態(tài)常柄,比如后面談到的一些觸發(fā)器是依賴于一些上下文狀態(tài)的,那些狀態(tài)的獲取就是依靠這個(gè)方法搀擂。
TrigerResult
Trigger中定義的三個(gè)觸發(fā)方法被調(diào)用后西潘,最終要返回一個(gè)結(jié)果以決定觸發(fā)之后產(chǎn)生的行為(比如是調(diào)用window function還是將窗口丟棄),這個(gè)定義觸發(fā)器觸發(fā)結(jié)果行為是通過TriggerResult來表達(dá)的哨颂。它是一個(gè)枚舉類型喷市,有這么幾個(gè)枚舉值:
- FIRE :window將會(huì)被應(yīng)用window Function進(jìn)行計(jì)算,然后將結(jié)果emit出去威恼,但元素并沒有被清洗品姓,仍然在window中
- PURGE :清除window中的元素
- FIRE_AND_PURGE :同時(shí)具備FIRE和PURGE兩種屬性產(chǎn)生的行為
- CONTINUE :不做任何操作
內(nèi)置的Trigger
Flink內(nèi)置實(shí)現(xiàn)了很多觸發(fā)器,完整的類圖如下:
[圖片上傳失敗...(image-4f804c-1511100517497)]
這些觸發(fā)器都具有一些共性箫措,這里一并說明:
- 由于Flink在Trigger中已事先將各種觸發(fā)器類型的回調(diào)封裝為不同的方法(onXXX)腹备,所以后續(xù)各種不同的觸發(fā)器類型的核心邏輯將主要在其特定相關(guān)的onXXX方法中,而無關(guān)的onXXX方法將直接返回TriggerResult.CONTINUE(這種設(shè)計(jì)方式有欠妥當(dāng)斤蔓,因?yàn)椴焕跀U(kuò)展)
- 因?yàn)橛胁簧儆|發(fā)類型依賴于上下文的某些狀態(tài)值(比如下文典型的ContinuousXXXTrigger)植酥,這些狀態(tài)值將通過TriggerContext的getPartitionedState方法進(jìn)行存取
EventTimeTrigger
基于事件時(shí)間的觸發(fā)器,對(duì)應(yīng)onEventTime
ProcessingTimeTrigger
基于當(dāng)前系統(tǒng)時(shí)間的觸發(fā)器附迷,對(duì)應(yīng)onProcessingTime
ContinuousEventTimeTrigger
該觸發(fā)器是基于事件時(shí)間的按照指定時(shí)間間隔持續(xù)觸發(fā)的觸發(fā)器惧互,它的首次觸發(fā)取決于Watermark哎媚。首次觸發(fā)的判斷位于onElement中,它注冊(cè)下一次(也是首次)觸發(fā)eventTime 定時(shí)器的時(shí)機(jī)喊儡,然后將其first狀態(tài)標(biāo)識(shí)為false拨与。具體實(shí)現(xiàn)如下:
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
if (first.value()) {
long start = timestamp - (timestamp % interval);
long nextFireTimestamp = start + interval;
ctx.registerEventTimeTimer(nextFireTimestamp);
first.update(false);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
持續(xù)的觸發(fā)依賴于在onEventTime中不斷注冊(cè)下一次觸發(fā)的定時(shí)器:
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
ctx.registerEventTimeTimer(time + interval);
return TriggerResult.FIRE;
}
ContinuousProcessingTimeTrigger
基于系統(tǒng)時(shí)間的按照指定時(shí)間間隔持續(xù)觸發(fā)的觸發(fā)器,它也是基于保存的狀態(tài)值fire-timestamp來判斷是否需要觸發(fā)艾猜,不過它的循環(huán)注冊(cè)過程是在onElement中买喧。
CountTrigger
基于一個(gè)給定的累加值觸發(fā),由于累加值不是基于時(shí)間而是基于元素的匆赃,所有其觸發(fā)機(jī)制實(shí)現(xiàn)在onElement中淤毛,邏輯很簡(jiǎn)單,先累加如果大于給定的閾值則觸發(fā):
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
ValueState<Long> count = ctx.getPartitionedState(stateDesc);
long currentCount = count.value() + 1;
count.update(currentCount);
if (currentCount >= maxCount) {
count.update(0L);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
PurgingTrigger
該觸發(fā)器類似于一個(gè)包裝器算柳,用于將任何給定的觸發(fā)器轉(zhuǎn)變成purging觸發(fā)器低淡。它的實(shí)現(xiàn)機(jī)制是,它接收一個(gè)trigger實(shí)例瞬项,然后在各個(gè)onXXX回調(diào)上執(zhí)行該實(shí)例的相應(yīng)的onXXX并獲得TriggerResult的實(shí)例蔗蹋,進(jìn)行相應(yīng)的判斷,最后返回FIRE_AND_PURGE枚舉值囱淋。
DeltaTrigger
基于DeltaFunction和一個(gè)給定的閾值觸發(fā)猪杭,該觸發(fā)器在最后一個(gè)到達(dá)元素和當(dāng)前元素之間計(jì)算一個(gè)delta值跟給定的閾值比較,如果高于給定的閾值妥衣,則觸發(fā)皂吮。因?yàn)槭腔谠氐模灾饕壿媽?shí)現(xiàn)在onElement中税手。