Flink Source Code: Window Slice

Flink source code analysis series index

See: Flink source code analysis series index

Preface

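Window slicing is an optimization that Flink applies to SQL window aggregation. Recall the three window types. With a hopping window or a cumulative window, adjacent windows overlap, so avoiding repeated computation over the overlapping parts is an obvious target for optimization. To reuse the overlapping data as much as possible, Flink introduces the concept of a slice. A window can be split into one or more slices. The slices must combine to exactly that window, and no slice may cross a window boundary. Computing a whole window therefore reduces to computing its slices. Flink caches these slices, which makes them reusable and avoids recomputing the overlap between windows. That is the idea behind the window slice optimization. For a tumbling window, each window is exactly one slice: windows do not overlap and slices are not shared. For a hopping or cumulative window, a window may contain several slices and a slice may belong to several windows, so slices are shared.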
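To make the idea concrete, here is a minimal sketch in plain Java. It is not Flink code, and the class `SliceSketch` and its methods are hypothetical. The sketch splits hopping windows into slices of length gcd(size, slide), identifies every slice by its end timestamp, and shows that two consecutive windows share slices:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Flink's implementation: a hopping window of
// length `size` advancing by `slide` is cut into slices of length gcd(size, slide).
// Windows and slices are both identified by their end timestamp.
final class SliceSketch {

    static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // End timestamps of the slices that make up the window ending at windowEnd, oldest first
    static List<Long> slicesOfWindow(long windowEnd, long size, long slide) {
        long sliceSize = gcd(size, slide);
        List<Long> ends = new ArrayList<>();
        for (long end = windowEnd - size + sliceSize; end <= windowEnd; end += sliceSize) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // HOP(size = 30, slide = 10): every window consists of three 10-unit slices
        System.out.println(slicesOfWindow(30, 30, 10)); // [10, 20, 30]
        System.out.println(slicesOfWindow(40, 30, 10)); // [20, 30, 40]
    }
}
```

The windows ending at 30 and 40 share the slices ending at 20 and 30. When the window advances, only one new slice has to be aggregated.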

Next, let's analyze the core logic of the whole optimization.

SqlWindowTableFunction

Flink supports window table-valued functions (TVFs). The base class of the window functions behind these TVFs is SqlWindowTableFunction, which has three subclasses:

  • SqlCumulateTableFunction
  • SqlHopTableFunction
  • SqlTumbleTableFunction

These correspond to the cumulative, hopping (sliding) and tumbling window TVFs, respectively.

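These functions are produced by SQL optimization rules (ProjectWindowTableFunctionTransposeRule and …), in which a LogicalProject is converted into a LogicalTableFunctionScan. The SQL optimization process is not the focus of this article and will be analyzed in a separate post.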

SliceAssigner

SliceAssigner assigns each element to a slice, deciding which slice the element falls into based on its time. StreamExecWindowAggregateBase creates the matching assigner from the window's execution plan.

Throughout the slicing optimization, a slice or window is usually identified by its end timestamp.

SliceAssigner has two sub-interfaces:

  • SliceSharedAssigner
  • SliceUnsharedAssigner

They represent assigners whose slices are shared and assigners whose slices are not shared, respectively.

SliceAssigner inheritance diagram

Now let's go through their source code:

@Internal
public interface SliceAssigner extends Serializable {

    /**
     * Returns the end timestamp of a slice that the given element should belong.
     *
     * @param element the element to which slice should belong to.
     * @param clock the service to get current processing time.
     */
    // Returns the end timestamp of the slice the element belongs to
    long assignSliceEnd(RowData element, ClockService clock);

    /**
     * Returns the last window which the slice belongs to. The window and slices are both
     * identified by the end timestamp.
     */
    // Returns the window the slice belongs to, identified by its end timestamp;
    // if the slice belongs to several windows (shared slice), returns the last of them
    long getLastWindowEnd(long sliceEnd);

    /** Returns the corresponding window start timestamp of the given window end timestamp. */
    // Given a window's end timestamp, returns its start timestamp
    long getWindowStart(long windowEnd);

    /**
     * Returns an iterator of slices to expire when the given window is emitted. The window and
     * slices are both identified by the end timestamp.
     *
     * @param windowEnd the end timestamp of window emitted.
     */
    // After a window has been emitted downstream, returns the slices that should now expire (some shared slices will never be used again)
    // Both windows and slices are identified by their end timestamp
    Iterable<Long> expiredSlices(long windowEnd);

    /**
     * Returns the interval of slice ends, i.e. the step size to advance of the slice end when a new
     * slice assigned.
     */
    // Returns the interval between slice ends, i.e. how far the slice end advances for the next slice
    long getSliceEndInterval();

    /**
     * Returns {@code true} if elements are assigned to windows based on event time, {@code false}
     * based on processing time.
     */
    // Returns whether event time is used; false means processing time
    boolean isEventTime();
}

SliceSharedAssigner

SliceSharedAssigner shares slices between windows. A window is split into multiple slices, and when the window's data is emitted, its slices must be merged to produce the window result.

The source code is as follows:

@Internal
public interface SliceSharedAssigner extends SliceAssigner {

    /**
     * Determines which slices (if any) should be merged.
     *
     * @param sliceEnd the triggered slice, identified by end timestamp
     * @param callback a callback that can be invoked to signal which slices should be merged.
     */
    // 合并slice沪饺。sliceEnd為觸發(fā)合并操作的slice躏敢,callback為合并slice的操作
    void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;

    /**
     * Returns the optional end timestamp of next window which should be triggered. Empty if no
     * following window to trigger for now.
     *
     * <p>The purpose of this method is avoid register too many timers for each hopping and
     * cumulative slice, e.g. HOP(1day, 10s) needs register 8640 timers for every slice. In order to
     * improve this, we only register one timer for the next window. For hopping windows we don't
     * register next window if current window is empty (i.e. no records in current window). That
     * means we will have one more unnecessary window triggered for hopping windows if no elements
     * arrives for a key for a long time. We will skip to emit window result for the triggered empty
     * window, see {@link SliceSharedWindowAggProcessor#fireWindow(Long)}.
     *
     * @param windowEnd the current triggered window, identified by end timestamp
     * @param isWindowEmpty a supplier that can be invoked to get whether the triggered window is
     *     empty (i.e. no records in the window).
     */
    // Returns the next window that needs to be triggered
    // windowEnd: end timestamp of the window currently being triggered
    // isWindowEmpty: whether the triggered window has no data; being a Supplier, it is only evaluated when needed
    Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty);

    // ------------------------------------------------------------------------

    /**
     * Callback to be used in {@link #mergeSlices(long, MergeCallback)} for specifying which slices
     * should be merged.
     */
    // Callback for merging slices: receives which slices to merge and performs the merge
    interface MergeCallback {

        /**
         * Specifies that states of the given slices should be merged into the result slice.
         *
         * @param mergeResult The resulting merged slice, {@code null} if it represents a non-state
         *     namespace.
         * @param toBeMerged The list of slices that should be merged into one slice.
         */
        // mergeResult: the merged slice (identified by its end timestamp)
        // toBeMerged: the slices to merge (also identified by end timestamps)
        void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception;
    }
}

SliceUnsharedAssigner

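SliceUnsharedAssigner means the assigned slices are not shared between windows. Each window therefore consists of exactly one slice, and no slices need to be merged when the window is emitted. The interface declares no methods of its own: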

@Internal
public interface SliceUnsharedAssigner extends SliceAssigner {}

AbstractSliceAssigner

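This abstract class provides the basic functionality of a SliceAssigner. The tumbling, cumulative and hopping assigners below all extend it. WindowedSliceAssigner, covered later, implements SliceUnsharedAssigner directly instead. Let's look at what the class provides.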

private abstract static class AbstractSliceAssigner implements SliceAssigner {
    private static final long serialVersionUID = 1L;

    // Index of the rowtime field in the RowData
    protected final int rowtimeIndex;
    // Whether event time is used
    protected final boolean isEventTime;
    // Time zone ID used to shift timestamps
    protected final ZoneId shiftTimeZone;

    protected AbstractSliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone) {
        this.rowtimeIndex = rowtimeIndex;
        this.shiftTimeZone = shiftTimeZone;
        this.isEventTime = rowtimeIndex >= 0;
    }

    // Introduces a new abstract method that takes the element's timestamp
    // (see the implementations below)
    public abstract long assignSliceEnd(long timestamp);

    @Override
    public final long assignSliceEnd(RowData element, ClockService clock) {
        // Extract the time associated with the element
        final long timestamp;
        if (rowtimeIndex >= 0) {
            // Event-time mode:
            // read the row's rowtime and convert it to a UTC-based timestamp
            // Precision for row timestamp is always 3
            TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
            timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone);
        } else {
            // Processing-time mode: convert the current processing time to a UTC-based timestamp
            // in processing time mode
            timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
        }
        // Assign the slice based on the extracted time
        return assignSliceEnd(timestamp);
    }

    @Override
    public final boolean isEventTime() {
        return isEventTime;
    }
}
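`toUtcTimestampMills` itself is not shown above. We assume it reinterprets the wall-clock time in `shiftTimeZone` as if it were a UTC timestamp. Under that assumption, window boundaries (for example, one-day windows) line up with local midnight rather than UTC midnight. The sketch below reproduces that assumed behavior. `TimeShiftSketch` is a hypothetical name, not Flink's own helper class:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Assumption: approximates the toUtcTimestampMills helper called in AbstractSliceAssigner.
// It keeps the local wall-clock reading in shiftTimeZone and relabels it as UTC.
final class TimeShiftSketch {

    static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) {
        // Nothing to shift for UTC; Long.MAX_VALUE is passed through unchanged
        if (ZoneOffset.UTC.equals(shiftTimeZone) || epochMills == Long.MAX_VALUE) {
            return epochMills;
        }
        LocalDateTime wallClock =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), shiftTimeZone);
        return wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai"); // UTC+8
        // 2021-01-01T16:30Z is 00:30 on 2021-01-02 in Shanghai
        long epoch = Instant.parse("2021-01-01T16:30:00Z").toEpochMilli();
        long shifted = toUtcTimestampMills(epoch, shanghai);
        long day = 24L * 60 * 60 * 1000;
        long dayStart = shifted - Math.floorMod(shifted, day);
        // The one-day slice starts at local midnight of Jan 2, not at UTC midnight of Jan 1
        System.out.println(Instant.ofEpochMilli(dayStart)); // 2021-01-02T00:00:00Z
    }
}
```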

Next, let's go through the concrete SliceAssigner implementations one by one.

TumblingSliceAssigner

Used for tumbling windows. TumblingSliceAssigner's slices are not shared, so each window consists of exactly one slice. Its code is as follows:

public static final class TumblingSliceAssigner extends AbstractSliceAssigner
        implements SliceUnsharedAssigner {
    private static final long serialVersionUID = 1L;

    /** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
    public TumblingSliceAssigner withOffset(Duration offset) {
        return new TumblingSliceAssigner(rowtimeIndex, shiftTimeZone, size, offset.toMillis());
    }

    // Window size (time span)
    private final long size;
    // Offset of the window start
    private final long offset;
    // Holds the end timestamps of slices to be cleared (expired)
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    private TumblingSliceAssigner(
            int rowtimeIndex, ZoneId shiftTimeZone, long size, long offset) {
        super(rowtimeIndex, shiftTimeZone);
        checkArgument(
                size > 0,
                String.format(
                        "Tumbling Window parameters must satisfy size > 0, but got size %dms.",
                        size));
        checkArgument(
                Math.abs(offset) < size,
                String.format(
                        "Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
                        size, offset));
        this.size = size;
        this.offset = offset;
    }

    @Override
    public long assignSliceEnd(long timestamp) {
        // Compute the window start
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        // Window start + window size = window end;
        // slices are not shared, so the window is the slice
        return start + size;
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        // For a tumbling window each slice is exactly one window, so sliceEnd is the windowEnd
        return sliceEnd;
    }

    public long getWindowStart(long windowEnd) {
        // end - size = start
        return windowEnd - size;
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        // Reset reuseExpiredList to contain only windowEnd:
        // the current window's slice always expires, because tumbling-window slices are not shared
        reuseExpiredList.reset(windowEnd);
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        // The slice interval equals the window size
        return size;
    }
}
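For reference, here is a plain-Java sketch of the tumbling arithmetic. It is not Flink's class, and `TumblingSketch` is a hypothetical name. The floor calculation is our own reimplementation of what `TimeWindow.getWindowStartWithOffset` computes, including negative timestamps. Units are arbitrary; read them as minutes:

```java
// Hypothetical sketch of TumblingSliceAssigner's arithmetic (not Flink code).
final class TumblingSketch {
    private final long size;
    private final long offset;

    TumblingSketch(long size, long offset) {
        if (size <= 0 || Math.abs(offset) >= size) {
            throw new IllegalArgumentException("requires size > 0 and abs(offset) < size");
        }
        this.size = size;
        this.offset = offset;
    }

    // Floors timestamp to the start of its window, honoring the offset
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so negative remainders need a fix-up
        return remainder < 0 ? timestamp - (remainder + windowSize) : timestamp - remainder;
    }

    long assignSliceEnd(long timestamp) {
        return windowStartWithOffset(timestamp, offset, size) + size;
    }

    long getLastWindowEnd(long sliceEnd) {
        return sliceEnd; // one slice == one window
    }

    long getWindowStart(long windowEnd) {
        return windowEnd - size;
    }

    public static void main(String[] args) {
        TumblingSketch tenMinutes = new TumblingSketch(10, 0);
        System.out.println(tenMinutes.assignSliceEnd(7));  // 10 -> window [0, 10)
        System.out.println(tenMinutes.assignSliceEnd(-3)); // 0  -> window [-10, 0)
        TumblingSketch shifted = new TumblingSketch(10, 5);
        System.out.println(shifted.assignSliceEnd(7));     // 15 -> window [5, 15)
    }
}
```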

CumulativeSliceAssigner

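Used for cumulative windows. A cumulative window has a maximum length, maxSize. Once maxSize is reached, the accumulation for the current period ends and a new period starts. It also has a step, which sets how often an intermediate result is emitted while a period is accumulating. maxSize must be an integral multiple of step. Each step of a cumulative window therefore becomes one slice, and the slices must be shared.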

public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
        implements SliceSharedAssigner {
    private static final long serialVersionUID = 1L;

    /** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
    public CumulativeSliceAssigner withOffset(Duration offset) {
        return new CumulativeSliceAssigner(
                rowtimeIndex, shiftTimeZone, maxSize, step, offset.toMillis());
    }

    // Maximum window length
    private final long maxSize;
    // How often the accumulated value is emitted
    private final long step;
    private final long offset;
    // Slices to be merged
    private final ReusableListIterable reuseToBeMergedList = new ReusableListIterable();
    // Slices to be expired
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    protected CumulativeSliceAssigner(
            int rowtimeIndex, ZoneId shiftTimeZone, long maxSize, long step, long offset) {
        super(rowtimeIndex, shiftTimeZone);
        if (maxSize <= 0 || step <= 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
                            maxSize, step));
        }
        if (maxSize % step != 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Cumulative Window requires maxSize must be an integral multiple of step, but got maxSize %dms and step %dms.",
                            maxSize, step));
        }

        this.maxSize = maxSize;
        this.step = step;
        this.offset = offset;
    }

    @Override
    public long assignSliceEnd(long timestamp) {
        // Compute the window start.
        // Note that the size passed in is step, not maxSize: each slice is treated as a window,
        // so what is actually computed here is the slice start
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
        // slice start + step = slice end
        return start + step;
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        long windowStart = getWindowStart(sliceEnd);
        return windowStart + maxSize;
    }

    @Override
    public long getWindowStart(long windowEnd) {
        // Returns the window start; windowEnd - 1 is used because windowEnd itself is exclusive
        return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        // Window start
        long windowStart = getWindowStart(windowEnd);
        // End of the first slice
        long firstSliceEnd = windowStart + step;
        // End of the last slice in this window
        long lastSliceEnd = windowStart + maxSize;
        if (windowEnd == firstSliceEnd) {
            // we share state in the first slice, skip cleanup for the first slice
            // The first slice holds the shared state, so nothing is cleared
            reuseExpiredList.clear();
        } else if (windowEnd == lastSliceEnd) {
            // when this is the last slice,
            // we need to cleanup the shared state (i.e. first slice) and the current slice
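            // The window's last slice has been reached: clear the first slice and the current slice.
            // The other slices need no cleanup here: mergeSlices (below) has folded each of them into the first slice, and each was cleared when its own window fired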
            reuseExpiredList.reset(windowEnd, firstSliceEnd);
        } else {
            // clean up current slice
            // Otherwise, only the current slice is cleared
            reuseExpiredList.reset(windowEnd);
        }
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        // The interval is the slice step
        return step;
    }

    @Override
    public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
        // Merges the slice identified by sliceEnd into the window's first slice
        // Window start
        long windowStart = getWindowStart(sliceEnd);
        // End of the first slice
        long firstSliceEnd = windowStart + step;
        if (sliceEnd == firstSliceEnd) {
            // if this is the first slice, there is nothing to merge
            // Equal: this is the window's first slice, so there is nothing to merge
            reuseToBeMergedList.clear();
        } else {
            // otherwise, merge the current slice state into the first slice state
            // Otherwise, the current slice is the one to merge
            reuseToBeMergedList.reset(sliceEnd);
        }
        // Merge the listed slice (if any) into the first slice
        callback.merge(firstSliceEnd, reuseToBeMergedList);
    }

    @Override
    public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
        // The next window ends one step after this window
        long nextWindowEnd = windowEnd + step;
        // End of the last window in the current accumulation period
        long maxWindowEnd = getWindowStart(windowEnd) + maxSize;
        if (nextWindowEnd > maxWindowEnd) {
            return Optional.empty();
        } else {
            return Optional.of(nextWindowEnd);
        }
    }
}
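To see how mergeSlices, expiredSlices and nextTriggerWindow cooperate, the sketch below replays them for a single key computing SUM. The parameters are maxSize = 60, step = 20 and offset 0. This is a plain-Java simulation, not Flink code. A HashMap stands in for windowState, and `CumulateSketch` and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical simulation of CumulativeSliceAssigner plus the processor's merge (offset = 0).
final class CumulateSketch {
    private final long maxSize;
    private final long step;
    // Stand-in for windowState: slice end -> partial SUM
    final Map<Long, Long> state = new HashMap<>();

    CumulateSketch(long maxSize, long step) {
        if (maxSize <= 0 || step <= 0 || maxSize % step != 0) {
            throw new IllegalArgumentException("requires maxSize > 0, step > 0, maxSize % step == 0");
        }
        this.maxSize = maxSize;
        this.step = step;
    }

    private static long floor(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    long assignSliceEnd(long ts) {
        return floor(ts, step) + step; // every slice is one step long
    }

    long getWindowStart(long windowEnd) {
        return floor(windowEnd - 1, maxSize); // windowEnd is exclusive, hence - 1
    }

    void add(long ts, long value) {
        state.merge(assignSliceEnd(ts), value, Long::sum);
    }

    // mergeSlices, then emit, then expiredSlices for the window ending at windowEnd
    long fire(long windowEnd) {
        long windowStart = getWindowStart(windowEnd);
        long firstSliceEnd = windowStart + step;
        long lastSliceEnd = windowStart + maxSize;
        if (windowEnd != firstSliceEnd) {
            Long sliceAcc = state.get(windowEnd); // fold the current slice into the first slice
            if (sliceAcc != null) {
                state.merge(firstSliceEnd, sliceAcc, Long::sum);
            }
        }
        long result = state.getOrDefault(firstSliceEnd, 0L);
        if (windowEnd != firstSliceEnd) {
            state.remove(windowEnd); // the current slice is no longer needed
            if (windowEnd == lastSliceEnd) {
                state.remove(firstSliceEnd); // period finished: drop the shared accumulator too
            }
        }
        return result;
    }

    Optional<Long> nextTriggerWindow(long windowEnd) {
        long next = windowEnd + step;
        long maxWindowEnd = getWindowStart(windowEnd) + maxSize;
        return next > maxWindowEnd ? Optional.empty() : Optional.of(next);
    }

    // Fires windows starting at firstWindowEnd until nextTriggerWindow returns empty
    List<Long> fireAll(long firstWindowEnd) {
        List<Long> results = new ArrayList<>();
        Optional<Long> next = Optional.of(firstWindowEnd);
        while (next.isPresent()) {
            long windowEnd = next.get();
            results.add(fire(windowEnd));
            next = nextTriggerWindow(windowEnd);
        }
        return results;
    }

    public static void main(String[] args) {
        CumulateSketch sketch = new CumulateSketch(60, 20);
        sketch.add(5, 1);
        sketch.add(25, 2);
        sketch.add(45, 3);
        sketch.add(50, 4);
        System.out.println(sketch.fireAll(20)); // [1, 3, 10]: SUM over [0,20), [0,40), [0,60)
        System.out.println(sketch.state);       // {}: every slice has expired
    }
}
```

The first slice (end 20) serves as the running accumulator for the whole period. Each later slice is folded into it and then dropped. The first slice is dropped when the last window of the period fires.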

HoppingSliceAssigner

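Used for hopping (sliding) windows. A hopping window has two key properties: the slide distance (slide) and the window span (size). size must be an integral multiple of slide. Hopping windows overlap whenever size > slide, so slices must be shared. The slice length must be as large as possible (fewer merges) while keeping slices reusable (no slice crosses a window boundary). For that reason, the slice length is the greatest common divisor of size and slide. Because size must be a multiple of slide, this gcd is always equal to slide.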

public static final class HoppingSliceAssigner extends AbstractSliceAssigner
        implements SliceSharedAssigner {
    private static final long serialVersionUID = 1L;

    /** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
    public HoppingSliceAssigner withOffset(Duration offset) {
        return new HoppingSliceAssigner(
                rowtimeIndex, shiftTimeZone, size, slide, offset.toMillis());
    }

    // Window size
    private final long size;
    // Slide distance
    private final long slide;
    // Initial offset
    private final long offset;
    // Slice size
    private final long sliceSize;
    // Number of slices per window
    private final int numSlicesPerWindow;
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    protected HoppingSliceAssigner(
            int rowtimeIndex, ZoneId shiftTimeZone, long size, long slide, long offset) {
        super(rowtimeIndex, shiftTimeZone);
        if (size <= 0 || slide <= 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
                            slide, size));
        }
        if (size % slide != 0) {
            throw new IllegalArgumentException(
                    String.format(
                            "Slicing Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",
                            size, slide));
        }
        this.size = size;
        this.slide = slide;
        this.offset = offset;
        // Slice size is the gcd of the window size and the slide
        this.sliceSize = ArithmeticUtils.gcd(size, slide);
        // Slices per window = window size / slice size
        this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
    }

    @Override
    public long assignSliceEnd(long timestamp) {
        // Compute the slice start
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
        // sliceStart + sliceSize = sliceEnd
        return start + sliceSize;
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        // The last window containing the slice ends at slice start + window size
        return sliceEnd - sliceSize + size;
    }

    @Override
    public long getWindowStart(long windowEnd) {
        return windowEnd - size;
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        // we need to cleanup the first slice of the window
        // Clear the window's first slice; it is the only slice that expires each time
        long windowStart = getWindowStart(windowEnd);
        long firstSliceEnd = windowStart + sliceSize;
        reuseExpiredList.reset(firstSliceEnd);
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        // The slice size is also the interval between slice ends
        return sliceSize;
    }

    @Override
    public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
        // the iterable to list all the slices of the triggered window
        // Iterates over all slices of the window
        Iterable<Long> toBeMerged =
                new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
        // null namespace means use heap data views, instead of state data views
        // mergeResult == null means the merged result is not kept in state
        callback.merge(null, toBeMerged);
    }

    @Override
    public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
        // If the window just triggered is not empty, the next window to trigger ends one slice later; otherwise stop
        if (isWindowEmpty.get()) {
            return Optional.empty();
        } else {
            return Optional.of(windowEnd + sliceSize);
        }
    }
}
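The hopping arithmetic can be checked with concrete numbers: size = 30, slide = 10 and offset 0. The following is a plain-Java sketch, not Flink code. `HoppingSketch` is a hypothetical name. The ordering returned by `slicesOfWindow` is our own choice and is not necessarily the order in which `HoppingSlicesIterable` iterates:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of HoppingSliceAssigner's arithmetic (offset = 0).
final class HoppingSketch {
    final long size;
    final long sliceSize;
    final int numSlicesPerWindow;

    HoppingSketch(long size, long slide) {
        if (size <= 0 || slide <= 0 || size % slide != 0) {
            throw new IllegalArgumentException("requires size > 0, slide > 0, size % slide == 0");
        }
        this.size = size;
        this.sliceSize = gcd(size, slide); // equals slide, given the check above
        this.numSlicesPerWindow = Math.toIntExact(size / sliceSize);
    }

    static long gcd(long a, long b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    long assignSliceEnd(long ts) {
        return ts - Math.floorMod(ts, sliceSize) + sliceSize;
    }

    long getLastWindowEnd(long sliceEnd) {
        return sliceEnd - sliceSize + size; // slice start + window size
    }

    long expiredSlice(long windowEnd) {
        return windowEnd - size + sliceSize; // first slice of the window
    }

    // Slices merged when the window ending at windowEnd fires (newest first)
    List<Long> slicesOfWindow(long windowEnd) {
        List<Long> slices = new ArrayList<>();
        for (int i = 0; i < numSlicesPerWindow; i++) {
            slices.add(windowEnd - i * sliceSize);
        }
        return slices;
    }

    public static void main(String[] args) {
        HoppingSketch hop = new HoppingSketch(30, 10);
        long sliceEnd = hop.assignSliceEnd(25);
        System.out.println(sliceEnd);                       // 30
        System.out.println(hop.getLastWindowEnd(sliceEnd)); // 50: slice [20,30) is in windows ending at 30, 40, 50
        System.out.println(hop.slicesOfWindow(50));         // [50, 40, 30]
        System.out.println(hop.expiredSlice(50));           // 30: no later window contains [20,30)
    }
}
```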

WindowedSliceAssigner

Used when each element already carries its window start and end timestamps. The code is simple, so we won't analyze it in detail.

public static final class WindowedSliceAssigner implements SliceUnsharedAssigner {
    private static final long serialVersionUID = 1L;

    // Index of the window-end field in the element
    private final int windowEndIndex;
    // The wrapped assigner
    private final SliceAssigner innerAssigner;
    private final ReusableListIterable reuseExpiredList = new ReusableListIterable();

    public WindowedSliceAssigner(int windowEndIndex, SliceAssigner innerAssigner) {
        checkArgument(
                windowEndIndex >= 0,
                "Windowed slice assigner must have a positive window end index.");
        this.windowEndIndex = windowEndIndex;
        this.innerAssigner = innerAssigner;
    }

    @Override
    public long assignSliceEnd(RowData element, ClockService clock) {
        // Read the window end carried by the element
        return element.getTimestamp(windowEndIndex, 3).getMillisecond();
    }

    @Override
    public long getLastWindowEnd(long sliceEnd) {
        // we shouldn't use innerAssigner.getLastWindowEnd here,
        // because WindowedSliceAssigner is slice unshared, an attached window can't be
        // shared with other windows and the last window should be itself.
        // Slices are not shared, so return sliceEnd directly
        return sliceEnd;
    }

    @Override
    public long getWindowStart(long windowEnd) {
        return innerAssigner.getWindowStart(windowEnd);
    }

    @Override
    public Iterable<Long> expiredSlices(long windowEnd) {
        reuseExpiredList.reset(windowEnd);
        return reuseExpiredList;
    }

    @Override
    public long getSliceEndInterval() {
        return innerAssigner.getSliceEndInterval();
    }

    @Override
    public boolean isEventTime() {
        // it always works in event-time mode if input row has been attached windows
        // Always event time
        return true;
    }
}

That completes the slice assignment logic. Next, let's analyze how the data of sliced windows is computed.

SlicingWindowProcessor

SlicingWindowProcessor processes the data of sliced windows. Its abstract implementation AbstractWindowAggProcessor has two concrete subclasses:

  • SliceSharedWindowAggProcessor
  • SliceUnsharedWindowAggProcessor
SlicingWindowProcessor inheritance diagram

AbstractWindowAggProcessor

We focus on how AbstractWindowAggProcessor handles each incoming element. This logic is in the processElement method:

// Returns true if the element can be dropped, false otherwise
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
    // Find the slice this element belongs to
    long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
    if (!isEventTime) {
        // always register processing time for every element when processing time mode
        // In processing-time mode, register a processing-time timer at the slice end to trigger computation
        windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
    }

    if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
        // The slice has already fired, so this element is late
        // the assigned slice has been triggered, which means current element is late,
        // but maybe not need to drop
        // Get the last window this slice belongs to
        long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
        if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
            // the last window has been triggered, so the element can be dropped now
            // That window has fired too, so the element can be dropped
            return true;
        } else {
            // sliceStateMergeTarget returns the slice that holds the merged state, i.e. the slice into which a group of slices is merged
            // windowBuffer caches slice data; think of it as a multi-key map from (key, merge-target sliceEnd) to elements
            windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
            // we need to register a timer for the next unfired window,
            // because this may the first time we see elements under the key
            long unfiredFirstWindow = sliceEnd;
            while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
                // Keep adding windowInterval (sliceAssigner.getSliceEndInterval()) until the window has not fired yet;
                // the result is the first window that has not been triggered
                unfiredFirstWindow += windowInterval;
            }
            // Register an event-time timer for it
            windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
            return false;
        }
    } else {
        // the assigned slice hasn't been triggered, accumulate into the assigned slice
        // The slice hasn't fired yet: add the element to the slice buffer
        windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }
}
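The late-element branch can be replayed with numbers. The plain-Java sketch below is hypothetical and not Flink code. It assumes event time, no time-zone shift, and that a window ending at `end` counts as fired once the watermark has reached `end - 1`. That is our assumption about `isWindowFired`, whose body is not shown above:

```java
import java.util.OptionalLong;

// Hypothetical sketch of the late-element branch of processElement.
final class LateElementSketch {

    // Assumption: mirrors isWindowFired for UTC, with the window end treated as exclusive
    static boolean isWindowFired(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    // Caller has already established that the element's slice has fired.
    // Returns empty if the element is dropped; otherwise the window end to register a timer for.
    static OptionalLong onLateElement(long sliceEnd, long lastWindowEnd, long windowInterval, long watermark) {
        if (isWindowFired(lastWindowEnd, watermark)) {
            return OptionalLong.empty(); // every window containing the slice has fired
        }
        long unfiredFirstWindow = sliceEnd;
        while (isWindowFired(unfiredFirstWindow, watermark)) {
            unfiredFirstWindow += windowInterval;
        }
        return OptionalLong.of(unfiredFirstWindow);
    }

    public static void main(String[] args) {
        // HOP(size = 30, slide = 10): slice size 10, lastWindowEnd = sliceEnd + 20
        long watermark = 35;
        // ts = 25 -> slice ending at 30 (fired); its last window ends at 50 (not fired)
        System.out.println(onLateElement(30, 50, 10, watermark)); // OptionalLong[40]
        // ts = 5 -> slice ending at 10; its last window ends at 30, which has already fired
        System.out.println(onLateElement(10, 30, 10, watermark)); // OptionalLong.empty
    }
}
```

In the first case the element is not dropped. It is buffered under sliceStateMergeTarget(30). For a hopping window that target is slice 30 itself. For a cumulative window it would be the first slice of the window.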

SliceUnsharedWindowAggProcessor

This is the window aggregate processor for unshared slices. The code and analysis follow:

public final class SliceUnsharedWindowAggProcessor extends AbstractWindowAggProcessor {
    private static final long serialVersionUID = 1L;

    public SliceUnsharedWindowAggProcessor(
            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
            WindowBuffer.Factory windowBufferFactory,
            SliceUnsharedAssigner sliceAssigner,
            TypeSerializer<RowData> accSerializer,
            ZoneId shiftTimeZone) {
        super(genAggsHandler, windowBufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
    }

    // Triggers the window computation
    @Override
    public void fireWindow(Long windowEnd) throws Exception {
        // Read the accumulator stored for the window
        RowData acc = windowState.value(windowEnd);
        if (acc == null) {
            // If there is none, create a new accumulator
            acc = aggregator.createAccumulators();
        }
        // Set the accumulator for the current window
        aggregator.setAccumulators(windowEnd, acc);
        // Get the aggregate result
        RowData aggResult = aggregator.getValue(windowEnd);
        // Emit the result downstream
        collect(aggResult);
    }

    @Override
    protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
        // With unshared slices there is no merging: the merge target is the slice itself
        return sliceToMerge;
    }
}

SliceSharedWindowAggProcessor

This is the window aggregate processor for shared slices, and it implements the actual slice-merging logic. The code and analysis follow:

public final class SliceSharedWindowAggProcessor extends AbstractWindowAggProcessor
        implements SliceSharedAssigner.MergeCallback {
    private static final long serialVersionUID = 1L;

    private final SliceSharedAssigner sliceSharedAssigner;
    private final WindowIsEmptySupplier emptySupplier;
    // Implements SliceSharedAssigner.MergeCallback and records the merge target
    private final SliceMergeTargetHelper mergeTargetHelper;

    public SliceSharedWindowAggProcessor(
            GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
            WindowBuffer.Factory bufferFactory,
            SliceSharedAssigner sliceAssigner,
            TypeSerializer<RowData> accSerializer,
            int indexOfCountStar,
            ZoneId shiftTimeZone) {
        super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
        this.sliceSharedAssigner = sliceAssigner;
        this.mergeTargetHelper = new SliceMergeTargetHelper();
        this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner);
    }

    @Override
    public void fireWindow(Long windowEnd) throws Exception {
        // Merge the slices, passing this processor as the MergeCallback
        // (SliceSharedWindowAggProcessor implements MergeCallback.merge)
        sliceSharedAssigner.mergeSlices(windowEnd, this);
        // we have set accumulator in the merge() method
        // Get the window's aggregate result
        RowData aggResult = aggregator.getValue(windowEnd);
        // Emit the result downstream only if the window is not empty
        if (!isWindowEmpty()) {
            // for hopping windows, the triggered window may be an empty window
            // (see register next window below), for such window, we shouldn't emit it
            // (a hopping window may contain no data)
            collect(aggResult);
        }

        // we should register next window timer here,
        // because slices are shared, maybe no elements arrived for the next slices
        // Get the next window to trigger and register a timer for it
        Optional<Long> nextWindowEndOptional =
                sliceSharedAssigner.nextTriggerWindow(windowEnd, emptySupplier);
        if (nextWindowEndOptional.isPresent()) {
            long nextWindowEnd = nextWindowEndOptional.get();
            if (sliceSharedAssigner.isEventTime()) {
                windowTimerService.registerEventTimeWindowTimer(nextWindowEnd);
            } else {
                windowTimerService.registerProcessingTimeWindowTimer(nextWindowEnd);
            }
        }
    }

    // The actual slice merge happens here;
    // it is invoked by sliceSharedAssigner.mergeSlices(windowEnd, this)
    @Override
    public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
        // get base accumulator
        final RowData acc;
        // mergeResult == null means the result is not kept in state;
        // hopping windows do not keep their merged result in state
        if (mergeResult == null) {
            // null means the merged is not on state, create a new acc
            acc = aggregator.createAccumulators();
        } else {
            // Otherwise, read the accumulator from state
            RowData stateAcc = windowState.value(mergeResult);
            if (stateAcc == null) {
                acc = aggregator.createAccumulators();
            } else {
                acc = stateAcc;
            }
        }
        // set base accumulator
        aggregator.setAccumulators(mergeResult, acc);

        // merge slice accumulators
        // Merge the accumulators of all slices in toBeMerged
        for (Long slice : toBeMerged) {
            RowData sliceAcc = windowState.value(slice);
            if (sliceAcc != null) {
                aggregator.merge(slice, sliceAcc);
            }
        }

        // set merged acc into state if the merged acc is on state
        // If mergeResult is not null, write the merged accumulator back to windowState
        if (mergeResult != null) {
            windowState.update(mergeResult, aggregator.getAccumulators());
        }
    }

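    // Returns the slice whose state a late element should be accumulated into.
    // Called from processElement when an element arrives late: the element goes into the merge target
    // of its slice (the window's first slice for cumulative windows, the slice itself for hopping windows)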
    protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
        // Reset the helper, then let the assigner report the merge target of sliceToMerge
        mergeTargetHelper.setMergeTarget(null);
        sliceSharedAssigner.mergeSlices(sliceToMerge, mergeTargetHelper);

        // the mergeTarget might be null, which means the merging happens in memory instead of
        // on state, so the slice state to merge into is itself.
        if (mergeTargetHelper.getMergeTarget() != null) {
            return mergeTargetHelper.getMergeTarget();
        } else {
            return sliceToMerge;
        }
    }

    private boolean isWindowEmpty() {
        if (emptySupplier.indexOfCountStar < 0) {
            // Hopping windows get an automatically added COUNT(*) field used to check whether the window has elements;
            // non-hopping windows (e.g. cumulative windows) have no such field and are never empty
            // for non-hopping windows, the window is never empty
            return false;
        } else {
            return emptySupplier.get();
        }
    }

    // ------------------------------------------------------------------------------------------

    private final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable {
        private static final long serialVersionUID = 1L;

        // Index of the COUNT(*) field, used only by hopping windows to check whether the window contains data
        private final int indexOfCountStar;

        private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) {
            if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
                checkArgument(
                        indexOfCountStar >= 0,
                        "Hopping window requires a COUNT(*) in the aggregate functions.");
            }
            this.indexOfCountStar = indexOfCountStar;
        }

        @Override
        public Boolean get() {
            if (indexOfCountStar < 0) {
                return false;
            }
            try {
                // Hopping window: a null accumulator or COUNT(*) == 0 means no data, so the window is empty
                RowData acc = aggregator.getAccumulators();
                return acc == null || acc.getLong(indexOfCountStar) == 0;
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }

    // Records mergeResult as the merge target
    private static final class SliceMergeTargetHelper
            implements SliceSharedAssigner.MergeCallback, Serializable {

        private static final long serialVersionUID = 1L;
        private Long mergeTarget = null;

        @Override
        public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
            this.mergeTarget = mergeResult;
        }

        public Long getMergeTarget() {
            return mergeTarget;
        }

        public void setMergeTarget(Long mergeTarget) {
            this.mergeTarget = mergeTarget;
        }
    }
}
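The interplay of fireWindow, merge with a null mergeResult, and the COUNT(*)-based emptiness check can be replayed for one key of a hopping window. The plain-Java sketch below is a simulation under our own assumptions and is not Flink code. `HoppingFireSketch` is hypothetical, and accumulators are `{SUM, COUNT(*)}` pairs held in a HashMap that stands in for windowState. It uses size = 30 and slide = 10, and it omits slice expiration for brevity:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical simulation of SliceSharedWindowAggProcessor.fireWindow for a hopping window.
final class HoppingFireSketch {
    final long size;
    final long sliceSize;
    // Stand-in for windowState: slice end -> {SUM(value), COUNT(*)}
    final Map<Long, long[]> sliceState = new HashMap<>();
    final List<String> emitted = new ArrayList<>();
    int firedWindows = 0;

    HoppingFireSketch(long size, long slide) {
        if (size <= 0 || slide <= 0 || size % slide != 0) {
            throw new IllegalArgumentException("requires size > 0, slide > 0, size % slide == 0");
        }
        this.size = size;
        this.sliceSize = slide; // gcd(size, slide) == slide under the check above
    }

    void add(long ts, long value) {
        long sliceEnd = ts - Math.floorMod(ts, sliceSize) + sliceSize;
        long[] acc = sliceState.computeIfAbsent(sliceEnd, k -> new long[2]);
        acc[0] += value;
        acc[1] += 1;
    }

    // Returns the next window end to register a timer for, like nextTriggerWindow
    Optional<Long> fireWindow(long windowEnd) {
        firedWindows++;
        long[] acc = new long[2]; // mergeResult == null: fresh accumulator, nothing written back
        for (long s = windowEnd; s > windowEnd - size; s -= sliceSize) {
            long[] sliceAcc = sliceState.get(s);
            if (sliceAcc != null) {
                acc[0] += sliceAcc[0];
                acc[1] += sliceAcc[1];
            }
        }
        boolean empty = acc[1] == 0; // COUNT(*) == 0 means the window is empty
        if (!empty) {
            emitted.add(windowEnd + ":" + acc[0]);
        }
        return empty ? Optional.empty() : Optional.of(windowEnd + sliceSize);
    }

    public static void main(String[] args) {
        HoppingFireSketch sketch = new HoppingFireSketch(30, 10);
        sketch.add(5, 1);
        sketch.add(15, 2);
        Optional<Long> next = Optional.of(10L);
        while (next.isPresent()) {
            next = sketch.fireWindow(next.get());
        }
        System.out.println(sketch.emitted);      // [10:1, 20:3, 30:3, 40:2]
        System.out.println(sketch.firedWindows); // 5: window 50 fires but is empty
    }
}
```

The window ending at 50 still fires, because its timer was registered while window 40 was non-empty. It holds no data, so nothing is emitted and no further timer is registered. This matches the "one more unnecessary window triggered" described in the Javadoc of nextTriggerWindow.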

This blog post is the author's original work. Discussion and corrections are welcome; please credit the source when reposting.
