Flink 窗口分配器解析

1 前言

WindowAssigner:用于給當(dāng)前流中元素分配0個(gè)或者多個(gè)窗口
相關(guān)的抽象方法就是assignWindows婿崭,該方法為某個(gè)帶有時(shí)間戳timestamp的元素element分配一個(gè)或多個(gè)窗口凛澎,并返回窗口集合谜悟。
不同的分配器的主要差異就體現(xiàn)在該方法的實(shí)現(xiàn)上伙狐。

2 源碼分析
2.1 翻滾窗口分配器

分為TumblingProcessingTimeWindowsTumblingEventTimeWindows

TumblingProcessingTimeWindows:

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

getWindowStartWithOffset方法:計(jì)算具有偏移offset的窗口起始時(shí)間戳氓奈,例如抒钱,通過of(Time.hours(1),Time.minutes(15))來指定1個(gè)小時(shí)的窗口蜓肆,偏移量offset為15分鐘,那么谋币,你將會(huì)得到一個(gè)窗口開始時(shí)間為0:15:00,1:15:00,2:15:00..的窗口仗扬。

該方法理解起來比較難,可以選擇參數(shù)試一試看看計(jì)算結(jié)果蕾额。
注意:這里的窗口只要是offset定了早芭,各窗口也就定了,不會(huì)隨著處理元素的時(shí)間戳的變化而發(fā)生變化诅蝶。

TumblingEventTimeWindows:

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

這里先去判斷元素有沒有帶時(shí)間戳退个,所以在設(shè)定時(shí)間類型為EventTime時(shí),要記得設(shè)置水位線
之后同樣調(diào)用getWindowStartWithOffset方法

總結(jié):

  • 在翻滾窗口中调炬,每一個(gè)元素都只能分配到一個(gè)窗口中语盈,所以最后返回的是 Collections.singletonList,只有一個(gè)元素的列表缰泡。

  • 基于上可以看出這兩種翻滾窗口差別就在于獲取時(shí)間刀荒,即獲取timestamp不同,一個(gè)是通過context調(diào)用getCurrentProcessingTime獲取當(dāng)前時(shí)間棘钞;一個(gè)是通過元素帶的時(shí)間戳缠借。


2.2 滑動(dòng)窗口分配器

分為SlidingProcessingTimeWindowsSlidingEventTimeWindows

SlidingProcessingTimeWindows:

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        timestamp = context.getCurrentProcessingTime();
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

首先計(jì)算當(dāng)前元素能夠被分配到多少個(gè)窗口中,size/slide(即窗口大小/滑動(dòng)距離大幸瞬隆)向下取整烈炭,比如說每隔10分鐘計(jì)算過去每小時(shí)的數(shù)據(jù),那么size/slide就是6宝恶,如此刻處理時(shí)間是20:37符隙,那么元素被分配到的窗口(offset為0的情況)就是[19:40-20:40),[19:50-20:50),[20:00-21:00),[20:10-21:10),[20:20-21:20),[20:30-21:30);
lastStart就是最近的窗口的(開始時(shí)間最大的)開始時(shí)間垫毙,然后使用循環(huán)添加Window霹疫。

SlidingEventTimeWindows:

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

這個(gè)與上面基于ProcessingTime的處理邏輯基本一致,除了獲取時(shí)間不一樣综芥,一個(gè)是獲取當(dāng)前處理時(shí)間戳丽蝎,一個(gè)使用元素自帶的時(shí)間戳。


2.3 會(huì)話窗口分配器

會(huì)話窗口分配器涉及到窗口合并,那么什么是窗口合并?
示例:


三組數(shù)據(jù)屬于K1屠阻,一組數(shù)據(jù)屬于K2红省。
可以發(fā)現(xiàn),會(huì)話窗口分配器的起始時(shí)間是當(dāng)前事件時(shí)間或者處理時(shí)間国觉,而結(jié)束時(shí)間則為在起始時(shí)間基礎(chǔ)上后延超時(shí)間隔吧恃。
[K1,V4]數(shù)據(jù)應(yīng)該是落在[K1麻诀,V1]的窗口痕寓,即[13:02,13:32]內(nèi)蝇闭,但是其本來的窗口是[13:20呻率,13:50],所以會(huì)話時(shí)間延遲變?yōu)閇13:02呻引,13:50]礼仗。

四個(gè)實(shí)現(xiàn)類:
ProcessingTimeSessionWindowsEventTimeSessionWindows
DynamicProcessingTimeSessionWindows
DynamicEventTimeSessionWindows

@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Determines which windows (if any) should be merged.
     *
     * @param windows The window candidates.
     * @param callback A callback that can be invoked to signal which windows should be merged.
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    /**
     * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
     * windows should be merged.
     */
    public interface MergeCallback<W> {

        /**
         * Specifies that the given windows should be merged into the result window.
         *
         * @param toBeMerged The list of windows that should be merged into one window.
         * @param mergeResult The resulting merged window.
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}

ProcessingTimeSessionWindows:
assignWindows方法:

@Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

構(gòu)建一個(gè)以當(dāng)前時(shí)間為起點(diǎn)逻悠,長度為sessionTimeout的TimeWindow單對象集合元践。

mergeWindows方法,調(diào)用TimeWindow的mergeWindows方法

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    @Override
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

TimeWindow.mergeWindows方法:

public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {

        // sort the windows by the start time and then merge overlapping windows

        List<TimeWindow> sortedWindows = new ArrayList<>(windows);

        Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
            @Override
            public int compare(TimeWindow o1, TimeWindow o2) {
                return Long.compare(o1.getStart(), o2.getStart());
            }
        });

        List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
        Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

        for (TimeWindow candidate: sortedWindows) {
            if (currentMerge == null) {
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            } else if (currentMerge.f0.intersects(candidate)) {
                currentMerge.f0 = currentMerge.f0.cover(candidate);
                currentMerge.f1.add(candidate);
            } else {
                merged.add(currentMerge);
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            }
        }

        if (currentMerge != null) {
            merged.add(currentMerge);
        }

        for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
            if (m.f1.size() > 1) {
                c.merge(m.f1, m.f0);
            }
        }
    }
  • 根據(jù)窗口的startTime對窗口進(jìn)行排序
  • 對時(shí)間上有重疊的窗口進(jìn)行合并

但是真正執(zhí)行mergeWindows方法實(shí)際是在MergingWindowSet類中的addWindow方法中蹂风,
看一下MergingWindowSet:
<1> 類注釋說明:

<2> addWindow方法解析:
方法注釋說明:

向正在運(yùn)行的窗口集添加一個(gè)新的窗口卢厂,這可能會(huì)觸發(fā)合并之前正在運(yùn)行的窗口乾蓬,在這種情況下惠啄,MergeFunction會(huì)被調(diào)用。
添加之后任内,會(huì)返回代表已添加窗口的窗口撵渡。如果沒有合并,則可以是新窗口本身死嗦,也可以是新合并的窗口趋距。添加元素或者調(diào)用觸發(fā)器函數(shù)僅僅發(fā)生在返回的代表上, 這樣越除,我們再也不必處理新窗口立即被另一個(gè)窗口吞沒节腐。

public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {

        List<W> windows = new ArrayList<>();
          
        windows.addAll(this.mapping.keySet());
        windows.add(newWindow);

        final Map<W, Collection<W>> mergeResults = new HashMap<>();
        windowAssigner.mergeWindows(windows,
                new MergingWindowAssigner.MergeCallback<W>() {
                    @Override
                    public void merge(Collection<W> toBeMerged, W mergeResult) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
                        }
                        mergeResults.put(mergeResult, toBeMerged);
                    }
                });

(1) 將之前所有的窗口都加入到集合中,并且將新加入的窗口也加入到集合中去摘盆;
(2) 調(diào)用合并窗口分配器對窗口進(jìn)行合并翼雀,參與合并的窗口集為之前的所有窗口,并注冊回調(diào)(在回調(diào)中將合并結(jié)果及其關(guān)系加入到mergeResults中)孩擂;


        W resultWindow = newWindow;
        boolean mergedNewWindow = false;

        // perform the merge
        for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
            W mergeResult = c.getKey();
            Collection<W> mergedWindows = c.getValue();

            // if our new window is in the merged windows make the merge result the
            // result window
            if (mergedWindows.remove(newWindow)) {
                mergedNewWindow = true;
                resultWindow = mergeResult;
            }

            // pick any of the merged windows and choose that window's state window
            // as the state window for the merge result
            W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());

            // figure out the state windows that we are merging
            List<W> mergedStateWindows = new ArrayList<>();
            for (W mergedWindow: mergedWindows) {
                W res = this.mapping.remove(mergedWindow);
                if (res != null) {
                    mergedStateWindows.add(res);
                }
            }

            this.mapping.put(mergeResult, mergedStateWindow);

            // don't put the target state window into the merged windows
            mergedStateWindows.remove(mergedStateWindow);

            // don't merge the new window itself, it never had any state associated with it
            // i.e. if we are only merging one pre-existing window into itself
            // without extending the pre-existing window
            if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
                mergeFunction.merge(mergeResult,
                        mergedWindows,
                        this.mapping.get(mergeResult),
                        mergedStateWindows);
            }
        }

        // the new window created a new, self-contained window without merging
        if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
            this.mapping.put(resultWindow, resultWindow);
        }

        return resultWindow;
    }

(3) 預(yù)置新加入的窗口為最終要返回的結(jié)果窗口狼渊,并將mergedNewWindow初始化為false
(4) 在for循環(huán)中,獲取某個(gè)合并后的窗口类垦,以及參與合并該窗口的原始窗口集合狈邑;
如果說當(dāng)前新加入的窗口在參與該合并的窗口集合中(也即當(dāng)前新加入的窗口被合并)城须,那么最終返回的窗口就是當(dāng)前合并后的窗口;
(5) 選擇參與本次合并的原始窗口集合中的任意一個(gè)元素將其作為當(dāng)前合并后窗口的狀態(tài)窗口米苹,接著新建一個(gè)集合存儲(chǔ)合并后的窗口的狀態(tài)窗口集合糕伐;
(6) 遍歷參與合并的窗口集合中的每個(gè)原始窗口,將參與合并的原始窗口從全局集合中刪除驱入,并獲得其對應(yīng)的狀態(tài)窗口(即其本身)赤炒,加入到新建的狀態(tài)窗口集合中;
(7) 將合并的結(jié)果窗口以及被選中的狀態(tài)窗口加入到全局集合亏较;將將合并的結(jié)果窗口對應(yīng)的狀態(tài)窗口從狀態(tài)窗口集合中刪除
(8) 排除單一窗口本身(即參與合并的窗口集合中有且只有新加入窗口這一條記錄)莺褒,因?yàn)閱我淮翱诓恍枰鰻顟B(tài)合并;調(diào)用合并回調(diào)方法雪情。

這塊比較難理解遵岩,附張流程圖:


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市巡通,隨后出現(xiàn)的幾起案子尘执,更是在濱河造成了極大的恐慌,老刑警劉巖宴凉,帶你破解...
    沈念sama閱讀 221,695評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件誊锭,死亡現(xiàn)場離奇詭異,居然都是意外死亡弥锄,警方通過查閱死者的電腦和手機(jī)丧靡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,569評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來籽暇,“玉大人温治,你說我怎么就攤上這事〗溆疲” “怎么了熬荆?”我有些...
    開封第一講書人閱讀 168,130評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绸狐。 經(jīng)常有香客問我卤恳,道長,這世上最難降的妖魔是什么寒矿? 我笑而不...
    開封第一講書人閱讀 59,648評論 1 297
  • 正文 為了忘掉前任突琳,我火速辦了婚禮,結(jié)果婚禮上劫窒,老公的妹妹穿的比我還像新娘本今。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,655評論 6 397
  • 文/花漫 我一把揭開白布冠息。 她就那樣靜靜地躺著挪凑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逛艰。 梳的紋絲不亂的頭發(fā)上躏碳,一...
    開封第一講書人閱讀 52,268評論 1 309
  • 那天,我揣著相機(jī)與錄音散怖,去河邊找鬼菇绵。 笑死,一個(gè)胖子當(dāng)著我的面吹牛镇眷,可吹牛的內(nèi)容都是我干的咬最。 我是一名探鬼主播,決...
    沈念sama閱讀 40,835評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼欠动,長吁一口氣:“原來是場噩夢啊……” “哼永乌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起具伍,我...
    開封第一講書人閱讀 39,740評論 0 276
  • 序言:老撾萬榮一對情侶失蹤翅雏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后人芽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體望几,經(jīng)...
    沈念sama閱讀 46,286評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,375評論 3 340
  • 正文 我和宋清朗相戀三年萤厅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了橄抹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,505評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡祈坠,死狀恐怖害碾,靈堂內(nèi)的尸體忽然破棺而出矢劲,到底是詐尸還是另有隱情赦拘,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評論 5 350
  • 正文 年R本政府宣布芬沉,位于F島的核電站躺同,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏丸逸。R本人自食惡果不足惜蹋艺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,873評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望黄刚。 院中可真熱鬧捎谨,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,357評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至检吆,卻和暖如春舒萎,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蹭沛。 一陣腳步聲響...
    開封第一講書人閱讀 33,466評論 1 272
  • 我被黑心中介騙來泰國打工臂寝, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摊灭。 一個(gè)月前我還...
    沈念sama閱讀 48,921評論 3 376
  • 正文 我出身青樓咆贬,卻偏偏與公主長得像,于是被迫代替她去往敵國和親帚呼。 傳聞我的和親對象是個(gè)殘疾皇子素征,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,515評論 2 359