Flink window窗口機制探究--以tumbling windows為例(一)

零审丘、序言

本篇文章探究Flink Window窗口機制,首先介紹窗口機制使用的總綱可都,涉及的所有組件進行介紹啄枕,心中有一個大體的藍圖和認識婚陪。之后基于keyBy方法返回的Keyed Window入手,分析window方法频祝,并依次進行WindowAssigner泌参、Trigger類介紹脆淹。篇幅所限,計劃在其他文章中繼續(xù)介紹evictor沽一、reduce/aggregate等聚合方法盖溺,以及allowedLateness方法等使用。

一铣缠、背景&目標

為了實現(xiàn)項目場景中自定義窗口功能烘嘱,還是要先把目前Flink提供的窗口機制剖析一下,先從簡單好理解的入手蝗蛙,以tumbling windows為例蝇庭,sliding windows思路也相似。

二捡硅、窗口機制

考慮keyed Windows 哮内,從官網(wǎng)介紹可以通過窗口機制整個使用方法,提綱挈領(lǐng)了解所涉及的組件壮韭,雖然有些組件使用的是時候是可以使用默認而不用指定北发。

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
組件 作用
keyBy 數(shù)據(jù)流按照key分流
window 需要傳入WindowAssigner類,用來進行Event元素時間窗口分配喷屋。滾動窗口和session窗口一個Event對應(yīng)一個時間窗口琳拨,滑動窗口一個Eevent可以對應(yīng)多個時間窗口。
trigger 用來決定觸發(fā)針對特定時間窗口進行運算的window function執(zhí)行屯曹。
evictor 用來在trigger觸發(fā)后狱庇、window function執(zhí)行之前進行event過濾。
allowedLatteness 允許event延遲時間是牢。
sideOutputLateData 設(shè)置遲到的event 標簽
getSideOutput 獲取遲到event
reduce/aggregate/fold/apply window function窗口計算函數(shù)僵井,對時間窗口中的event 元素進行計算。

先從最基礎(chǔ)的WindowAssigner類開始驳棱,本篇重點以tumbling windows為例。

三农曲、探究剖析--溯源

Flink1.7.2版本Java代碼社搅。

一切都是始于stream.keyBy().window()
stream.keyBy()返回的是一個DataStream類子類:KeyedStream類對象

DataStream類的相關(guān)知識已經(jīng)在通過Flink 程序模板來學(xué)習(xí)StreamExecutionEnvironment 、DataStream 乳规、StreamTransformation類文章中探究過了形葬。

來看看KeyedStream類對象中:window()

public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
        return new WindowedStream<>(this, assigner);
    }

返回的是一個WindowedStream類:

public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream. */
    private final KeyedStream<T, K> input;

    /** The window assigner. */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

    /** The user-specified allowed lateness. */
    private long allowedLateness = 0L;
    // 其他省略
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        function = input.getExecutionEnvironment().clean(function);
        return apply(new InternalIterableWindowFunction<>(function), resultType, function);
    }
        

第一反應(yīng),WindowedStream雖然名字叫WindowedStream但是他不是DataStream類(雖然源代碼中也在datastream包中)暮的!但是呢笙以,他提供了一系列計算操作function,返回的可都是DataStream類的子類:SingleOutputStreamOperator類冻辩。

之后呢猖腕,看到了WindowedStream的成員和方法拆祈,可以看到窗口機制的組件 windowAssigner、trigger倘感、evicto和窗口計算函數(shù)都在放坏,開心啊,按圖索驥即可老玛!

根據(jù)實際運行時的dataflow來看淤年,最終Flink拓撲會被轉(zhuǎn)換為一個有一個包含算子的處理結(jié)構(gòu)。Flink怎么把窗口機制所有的組件都調(diào)動起來呢蜡豹?通過觀察窗口計算函數(shù)返回值都是DataStream類麸粮,整個拓撲就串起來了,對應(yīng)有就有相應(yīng)的Transformation镜廉,也就有相應(yīng)的operator(StreamOperator真正在底層處理一個一個元素的操作類)弄诲。WindowStream 的apply方法對應(yīng)調(diào)用一個private apply方法:

    private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
        KeySelector<T, K> keySel = input.getKeySelector();

        WindowOperator<K, T, Iterable<T>, R, W> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            ListStateDescriptor<StreamRecord<T>> stateDesc =
                    new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    evictor,
                    allowedLateness,
                    lateDataOutputTag);

        } else {
            ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
    }

全程都在構(gòu)建operator啊桨吊! 最后通過 return input.transform(opName, resultType, operator); 也就是說還是串在 KeyedStream上的哦威根!所以說WindowStream看似是一個 Stream其實只是為了構(gòu)建Window機制而提供的API,到真正Flink 運行的時候视乐,所有在KeyedStream定義的時間窗口洛搀,最終都會因為window function的調(diào)用返回一個DataStream,一個新的 Transformation被創(chuàng)建佑淀,窗口中的各種組件 windowAssigner 留美、trigger、evictor都會被打包在EvictingWindowOperator或者WindowOperator傳給這個Transformation伸刃,Transformation 為王盎牙!

我們來看看Transformation的operator對象捧颅,以WindowOperator類(AbstractUdfStreamOperator的子類)為例景图,看看他的processElement方法,代碼很長100多行碉哑,先看骨架:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
        // 代碼塊1挚币,windowAssigner可以merge
        } else {
        // 代碼塊2,windowAssigner不可以merge
        }
        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

第一步扣典,調(diào)用windowAssigner.assignWindows給當前event element分配Window妆毕。
第二步,核心贮尖,會針對第一步window集合笛粘,一個window一個window一次處理,調(diào)用trigger.onElement方法,如果fire的話就會調(diào)用window function進行計算薪前。
第三部润努,處理side ouput,遲到的元素序六。

四任连、探究剖析--WindowAssigner類

Flink源碼中,WindowAssigner類對應(yīng)滾動窗口的類有TumblingEventTimeWindows和TumblingProcessingTimeWindows例诀,我們以TumblingEventTimeWindows為例随抠,二者區(qū)別主要是窗口時間使用Event Time還是Process Time。

先看一下WindowAssigner類源碼繁涂,可以看出主要包含四個抽象方法拱她。

/**
 *  WindowAssigner可以分配 0個或者多個 Windows 給 Event 元素.
 * @param <T> Event 元素類別.
 * @param <W> Window類別.
 */
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * 返回Event  element 應(yīng)該被分配的 window的集合
     * @param timestamp :event 的時間戳.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    /**
        * 返回WindowAssigner默認的trigger
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    /**
        * 返回Window 的 TypeSerializer
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
       /**
        * 是否是 event time
     */
    public abstract boolean isEventTime();
      /**
        *其他省略
     */
}

方法名稱 方法用途
assignWindows 返回Event element 應(yīng)該被分配的 window的集合
getDefaultTrigger 返回WindowAssigner默認的trigger
getWindowSerializer 返回Window 的 TypeSerializer
isEventTime 是否是 event time

接下來 我們來看 繼承WindowAssigner類 的 TumblingEventTimeWindows類的具體實現(xiàn)。

/**
 *   示例:keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
 */
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
  // 窗口大小
    private final long size;
  // 偏移量
    private final long offset;

    protected TumblingEventTimeWindows(long size, long offset) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
                        // 根據(jù)滾動窗口機制扔罪,按照當前timestamp秉沼,計算對應(yīng)窗口的start時間,并返回對應(yīng)窗口                
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
// 其他省略
    @Override
    public boolean isEventTime() {
        return true;
    }
}

我們可以看到矿酵,主要區(qū)別就是包含了size 和 offset兩個變量唬复,使得 assignWindows 實現(xiàn)的時候可以返回想要的翻滾窗口。另外上面代碼可以看出getDefaultTrigger返回的是 EventTimeTrigger類全肮。

接下來我們看一下Trigger類敞咧。

Trigger方法名稱 方法用途
onElement 每當有event element 被加到window中,會觸發(fā)辜腺。結(jié)果返回事件元素對應(yīng)的window是否可以進行window function計算休建。
onProcessingTime timer 計時器觸發(fā)調(diào)用,使用的是process time 评疗。
onEventTime 同上测砂,使用的是 event time。
canMerge 是否支持 窗口合并百匆,如果返回true砌些,必須實現(xiàn)onMerge方法
onMerge 當多個window被WindowAssigner合并的時候的調(diào)用。
clear 清理相關(guān)window 的state
TriggerContext TriggerContext接口加匈,給Trigger提供state 處理和注冊Timer callback
OnMergeContext TriggerContext子接口寄症,onMerge方法使用,增加了mergePartitionedState方法矩动。

單獨整理TriggerContext 接口方法

TriggerContext方法名稱 方法用途
getCurrentProcessingTime 返回當前processing time
getMetricGroup 返回MetricGroup類對象
getCurrentWatermark 返回當前Watermark time
registerProcessingTimeTimer 注冊time callback ,一旦到達time释漆,Trigger的onProcessingTime會被調(diào)用
registerEventTimeTimer 同上悲没,當watermark 達到time,會觸發(fā)Trigger的onEventTime方法。
deleteProcessingTimeTimer 刪除指定時間的processing time trigger
deleteEventTimeTimer 刪除指定時間的event time trigger
getPartitionedState 返回 State對象
getKeyValueState 返回ValueState對象
/**
 * @param <T> Event 元素類別.
 * @param <W> Window類別.
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;

    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

    public boolean canMerge() {
        return false;
    }

    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }

    public abstract void clear(W window, TriggerContext ctx) throws Exception;

    // ------------------------------------------------------------------------

    /**
     * A context object that is given to {@link Trigger} methods to allow them to register timer
     * callbacks and deal with state.
     */
    public interface TriggerContext {

        long getCurrentProcessingTime();

        MetricGroup getMetricGroup();

        long getCurrentWatermark();

        void registerProcessingTimeTimer(long time);

        void registerEventTimeTimer(long time);

        void deleteProcessingTimeTimer(long time);

        void deleteEventTimeTimer(long time);

        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }

    public interface OnMergeContext extends TriggerContext {
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}

接下來我們看繼承Trigger的 EventTimeTrigger類的實現(xiàn):

/**
 * EventTime使用watermark示姿,一旦 watermark 超過 the end of the window甜橱,EventTimeTrigger觸發(fā)
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {}
    /**
     * 判斷window的最大時間戳是否小于目前的watermark,小于的話返回TriggerResult.FIRE栈戳,否則的話為這個window注冊一個trimer岂傲,返回TriggerResult.CONTINUE。TriggerResult是個枚舉類型
     */    
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ?
            TriggerResult.FIRE :
            TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
            OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

順便看一下枚舉類TriggerResult:
TriggerResult決定window將會發(fā)生什么子檀,如window function是否會調(diào)用镊掖,或者window是否被丟棄。當然如果window里面沒有任何數(shù)據(jù)褂痰,什么都不會發(fā)生亩进。

TriggerResult 值 解釋
CONTINUE 對于window來說什么都不會發(fā)生
FIRE_AND_PURGE 觸發(fā)window function ,而且 purge
FIRE 觸發(fā)window function缩歪, window不會被purged
PURGE window會被丟棄归薛,里面所有的元素都被清理
public enum TriggerResult {

    CONTINUE(false, false),

    FIRE_AND_PURGE(true, true),

    FIRE(true, false),

    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return fire;
    }

    public boolean isPurge() {
        return purge;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市匪蝙,隨后出現(xiàn)的幾起案子主籍,更是在濱河造成了極大的恐慌,老刑警劉巖逛球,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件千元,死亡現(xiàn)場離奇詭異,居然都是意外死亡需忿,警方通過查閱死者的電腦和手機诅炉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來屋厘,“玉大人涕烧,你說我怎么就攤上這事『谷鳎” “怎么了议纯?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長溢谤。 經(jīng)常有香客問我瞻凤,道長,這世上最難降的妖魔是什么世杀? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任阀参,我火速辦了婚禮,結(jié)果婚禮上瞻坝,老公的妹妹穿的比我還像新娘蛛壳。我一直安慰自己,他們只是感情好,可當我...
    茶點故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布衙荐。 她就那樣靜靜地躺著捞挥,像睡著了一般。 火紅的嫁衣襯著肌膚如雪忧吟。 梳的紋絲不亂的頭發(fā)上砌函,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機與錄音溜族,去河邊找鬼讹俊。 笑死,一個胖子當著我的面吹牛斩祭,可吹牛的內(nèi)容都是我干的劣像。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼摧玫,長吁一口氣:“原來是場噩夢啊……” “哼耳奕!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起诬像,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤屋群,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后坏挠,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體芍躏,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年降狠,在試婚紗的時候發(fā)現(xiàn)自己被綠了对竣。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,664評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡榜配,死狀恐怖否纬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蛋褥,我是刑警寧澤临燃,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站烙心,受9級特大地震影響膜廊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜淫茵,卻給世界環(huán)境...
    茶點故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一爪瓜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧匙瘪,春花似錦钥勋、人聲如沸炬转。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至驻啤,卻和暖如春菲驴,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背骑冗。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工赊瞬, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人贼涩。 一個月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓巧涧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親遥倦。 傳聞我的和親對象是個殘疾皇子谤绳,可洞房花燭夜當晚...
    茶點故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容