Flink 窗口觸發(fā)器解析

1.前言

窗口的觸發(fā)器定義了窗口是何時(shí)被觸發(fā)并同時(shí)決定觸發(fā)行為(對(duì)窗口進(jìn)行清理或者計(jì)算)爪飘。
注意:窗口的觸發(fā)在內(nèi)部是設(shè)置定時(shí)器來(lái)實(shí)現(xiàn)的。

2. 觸發(fā)器相關(guān)類

Trigger抽象類

Trigger抽象類
  • onElement:每個(gè)元素到達(dá)觸發(fā)的回調(diào)方法愿阐;
  • onProcessingTime:基于處理時(shí)間定時(shí)器觸發(fā)的回調(diào)方法污它;
  • onEventTime:基于事件時(shí)間定時(shí)器觸發(fā)的回調(diào)方法诊杆;
  • onMerge:窗口在合并時(shí)觸發(fā)的回調(diào)方法(會(huì)話窗口分配器assigner);

TriggerContext接口(定義在Trigger類中)氓拼,用于維持狀態(tài)你画,注冊(cè)定時(shí)器等:

TriggerContext
  • registerXXXTimeTimer:注冊(cè)(處理/事件)時(shí)間定時(shí)器;
  • deleteXXXTimeTimer:刪除(處理/事件)時(shí)間定時(shí)器桃漾;
  • getPartitionedState:從Flink狀態(tài)存儲(chǔ)終端獲取狀態(tài)坏匪;

TriggerResult枚舉類,用于決定窗口在觸發(fā)后的行為:

  • CONTINUE:不作任何處理撬统;
  • FIRE_AND_PURGE:觸發(fā)窗口計(jì)算并輸出結(jié)果同時(shí)清理并釋放窗口(該值只會(huì)被清理觸發(fā)器PurgingTrigger使用)适滓;
  • FIRE:觸發(fā)窗口計(jì)算并輸出結(jié)果,但窗口并沒(méi)有被釋放并且數(shù)據(jù)仍然保留恋追;
  • PURGE:不觸發(fā)窗口計(jì)算凭迹,不輸出結(jié)果,只清除窗口中的所有數(shù)據(jù)并釋放窗口
3.時(shí)間窗口觸發(fā)器
3.1 ProcessingTimeTrigger

onElement方法

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

先去注冊(cè)一個(gè)ProcessingTime定時(shí)器苦囱,觸發(fā)時(shí)間點(diǎn)就是當(dāng)前窗口的最大時(shí)間戳嗅绸;
觸發(fā)結(jié)果就是不做任何操作。

onProcessingTime和onEventTime方法

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }
3.2 EventTimeTrigger

onElement方法

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

當(dāng)窗口的最大時(shí)間戳小于等于水位線立即觸發(fā)窗口計(jì)算撕彤;否則的話就去注冊(cè)EventTime定時(shí)器鱼鸠,結(jié)果就是不做任何操作。

onProcessingTime和onEventTime方法

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

onEventTime:如果時(shí)間等于窗口的最大時(shí)間戳羹铅,則觸發(fā)對(duì)窗口進(jìn)行計(jì)算蚀狰,否則不做任何操作

這里為什么需要有一層時(shí)間的判斷呢(time == window.maxTimestamp() )?

參考博客:Flink中EventTimeTrigger的理解

4.持續(xù)時(shí)間觸發(fā)器

筆者之前寫過(guò) 窗口實(shí)用觸發(fā)器
持續(xù)觸發(fā)职员,顧名思義麻蹋,在本次觸發(fā)之后需要更新并且保存下一次觸發(fā)的時(shí)間戳,因此在持續(xù)時(shí)間觸發(fā)器中引入了狀態(tài)保存機(jī)制:

/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

使用的是ReducingState焊切,這里面調(diào)用的是Min函數(shù)扮授,選擇多個(gè)時(shí)間戳內(nèi)最小的。

ContinuousProcessingTimeTrigger
onElement方法:

@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        //獲得存儲(chǔ)觸發(fā)時(shí)間戳的狀態(tài)容器
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        //獲取當(dāng)前處理時(shí)間
        timestamp = ctx.getCurrentProcessingTime();
        //如果狀態(tài)對(duì)象為空蛛蒙,則初始化糙箍;否則直接返回
        if (fireTimestamp.get() == null) {
            //計(jì)算起始時(shí)間
            long start = timestamp - (timestamp % interval);
            //下一次觸發(fā)時(shí)間戳為起始時(shí)間加上觸發(fā)間隔
            long nextFireTimestamp = start + interval;
            //以下一次觸發(fā)的時(shí)間戳注冊(cè)處理時(shí)間定時(shí)器
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            //將下一次觸發(fā)計(jì)算的時(shí)間戳加入狀態(tài)進(jìn)行保存
            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.CONTINUE;
    }

所以,ContinuousProcessingTimeTrigger的onElement方法主要是完成對(duì)存儲(chǔ)窗口觸發(fā)時(shí)間狀態(tài)對(duì)象的初始化牵祟,并注冊(cè)了第一次執(zhí)行的定時(shí)器。
ContinuousEventTimeTrigger的onElement方法實(shí)現(xiàn)跟ContinuousProcessingTimeTrigger除了獲取時(shí)間戳的方式不同抖格,基本與此類似诺苹。

基于時(shí)間的回調(diào)方法咕晋,onProcessingTime

public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { 
    //首先從狀態(tài)中查找觸發(fā)時(shí)間  
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);   
    //跟定時(shí)器的注冊(cè)時(shí)間進(jìn)行對(duì)比,只有兩者相等時(shí)才會(huì)觸發(fā)計(jì)算
    if (fireTimestamp.get().equals(time)) {      
        //清空狀態(tài)并重新初始化值
        fireTimestamp.clear();      
        fireTimestamp.add(time + interval);      
        //注冊(cè)下一次觸發(fā)的定時(shí)器
        ctx.registerProcessingTimeTimer(time + interval);      
        //觸發(fā)窗口計(jì)算
        return TriggerResult.FIRE;   
    }   
    return TriggerResult.CONTINUE;
}
5.計(jì)數(shù)觸發(fā)器
6.
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末收奔,一起剝皮案震驚了整個(gè)濱河市掌呜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌坪哄,老刑警劉巖质蕉,帶你破解...
    沈念sama閱讀 218,941評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異翩肌,居然都是意外死亡模暗,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門念祭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)兑宇,“玉大人,你說(shuō)我怎么就攤上這事粱坤×ジ猓” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 165,345評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵站玄,是天一觀的道長(zhǎng)枚驻。 經(jīng)常有香客問(wèn)我,道長(zhǎng)株旷,這世上最難降的妖魔是什么再登? 我笑而不...
    開(kāi)封第一講書人閱讀 58,851評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮灾常,結(jié)果婚禮上霎冯,老公的妹妹穿的比我還像新娘。我一直安慰自己钞瀑,他們只是感情好沈撞,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,868評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著雕什,像睡著了一般缠俺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上贷岸,一...
    開(kāi)封第一講書人閱讀 51,688評(píng)論 1 305
  • 那天壹士,我揣著相機(jī)與錄音,去河邊找鬼偿警。 笑死躏救,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播盒使,決...
    沈念sama閱讀 40,414評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼崩掘,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了少办?” 一聲冷哼從身側(cè)響起苞慢,我...
    開(kāi)封第一講書人閱讀 39,319評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎英妓,沒(méi)想到半個(gè)月后挽放,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,775評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蔓纠,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,945評(píng)論 3 336
  • 正文 我和宋清朗相戀三年辑畦,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贺纲。...
    茶點(diǎn)故事閱讀 40,096評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡航闺,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出猴誊,到底是詐尸還是另有隱情潦刃,我是刑警寧澤,帶...
    沈念sama閱讀 35,789評(píng)論 5 346
  • 正文 年R本政府宣布懈叹,位于F島的核電站乖杠,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏澄成。R本人自食惡果不足惜胧洒,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,437評(píng)論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望墨状。 院中可真熱鬧卫漫,春花似錦、人聲如沸肾砂。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,993評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)镐确。三九已至包吝,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間源葫,已是汗流浹背诗越。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,107評(píng)論 1 271
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留息堂,地道東北人嚷狞。 一個(gè)月前我還...
    沈念sama閱讀 48,308評(píng)論 3 372
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親感耙。 傳聞我的和親對(duì)象是個(gè)殘疾皇子褂乍,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,037評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 摘要 Flink 認(rèn)為 Batch 是 Streaming 的一個(gè)特例持隧,所以 Flink 底層引擎是一個(gè)流式引擎即硼,...
    尼小摩閱讀 5,093評(píng)論 0 13
  • 介紹 window(窗口)是Flink流處理中非常重要的概念,本篇我們來(lái)對(duì)窗口相關(guān)的概念以及關(guān)聯(lián)的實(shí)現(xiàn)進(jìn)行解析屡拨。本...
    苗棟棟閱讀 944評(píng)論 0 1
  • 原文連接 https://ci.apache.org/projects/flink/flink-docs-rele...
    Alex90閱讀 3,459評(píng)論 0 5
  • 架構(gòu) Apache Flink是一個(gè)框架和分布式處理引擎只酥,用于對(duì)無(wú)界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink設(shè)計(jì)為在...
    盜夢(mèng)者_(dá)56f2閱讀 37,805評(píng)論 0 6
  • 鏈接:https://zhuanlan.zhihu.com/p/20585530來(lái)源:知乎著作權(quán)歸作者所有呀狼。商業(yè)轉(zhuǎn)...
    七海的游風(fēng)閱讀 1,605評(píng)論 0 4