Flink Source Code Analysis Series: Table of Contents
See: Flink Source Code Analysis Series Table of Contents
Preface
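Window slicing is an optimization that Flink applies to SQL window aggregation. Recall the three window types (tumbling, hopping, and cumulative). With hopping or cumulative windows, adjacent windows overlap, so avoiding repeated computation over the overlapping part is an obvious direction for optimization. To reuse the overlapping data as much as possible, Flink introduces the concept of a slice. A window is split into one or more slices that together cover the window exactly; no slice may cross a window boundary. Computing a window then reduces to computing its slices. Flink caches the per-slice results and reuses them, which avoids recomputing the overlapping parts of windows. For a tumbling window, one window is exactly one slice (windows do not overlap, so slices are not shared). For hopping and cumulative windows, one window may contain several slices, and one slice may belong to several windows (slices are shared).
The rest of this article walks through the core logic of this optimization.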
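As a rough illustration of how slices are shared, the sketch below identifies slices and windows by their end timestamps and enumerates them for a hopping window. The class and method names are hypothetical, and the sketch assumes a zero offset and a slice size equal to the slide; it is not Flink's code.

```java
import java.util.ArrayList;
import java.util.List;

final class SliceSharingSketch {
    // End timestamps of the slices that make up the window ending at windowEnd.
    static List<Long> slicesOfWindow(long windowEnd, long size, long sliceSize) {
        List<Long> ends = new ArrayList<>();
        for (long end = windowEnd - size + sliceSize; end <= windowEnd; end += sliceSize) {
            ends.add(end);
        }
        return ends;
    }

    // End timestamps of the windows that contain the slice ending at sliceEnd
    // (assumes the slice size equals the slide).
    static List<Long> windowsOfSlice(long sliceEnd, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        for (long end = sliceEnd; end < sliceEnd + size; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // Hopping window with size 10 and slide 5, so each slice spans 5.
        System.out.println(slicesOfWindow(20, 10, 5)); // slices of window [10, 20)
        System.out.println(windowsOfSlice(15, 10, 5)); // windows sharing slice [10, 15)
    }
}
```

With these numbers the slice ending at 15 is part of both the window ending at 15 and the window ending at 20, which is exactly the overlap that caching per-slice results avoids recomputing.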
SqlWindowTableFunction
Flink supports window table-valued functions (TVFs). The base class of the window functions behind the TVFs is SqlWindowTableFunction.
It has three subclasses:
- SqlCumulateTableFunction
- SqlHopTableFunction
- SqlTumbleTableFunction
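They correspond to the cumulative, hopping, and tumbling window TVFs respectively.
These functions are produced by SQL optimization rules (ProjectWindowTableFunctionTransposeRule among them), which convert a LogicalProject into a LogicalTableFunctionScan. SQL optimization is not the focus of this article and will be covered separately.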
SliceAssigner
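SliceAssigner assigns an element to a slice: based on the element's timestamp, it decides which slice the element falls into. StreamExecWindowAggregateBase creates the matching assigner from the window's execution plan.
Throughout the slice optimization, a slice or window is usually identified by its end timestamp.
SliceAssigner has two sub-interfaces:
- SliceSharedAssigner
- SliceUnsharedAssigner
They represent assigners whose slices are shared and not shared, respectively.
Their source code is analyzed next: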
@Internal
public interface SliceAssigner extends Serializable {
/**
* Returns the end timestamp of a slice that the given element should belong.
*
* @param element the element to which slice should belong to.
* @param clock the service to get current processing time.
*/
// Get the slice that the element belongs to; returns the slice's end timestamp
long assignSliceEnd(RowData element, ClockService clock);
/**
* Returns the last window which the slice belongs to. The window and and slices are both
* identified by the end timestamp.
*/
// Get the window that the slice belongs to; returns the window's end timestamp
// If the slice belongs to several windows (a shared slice), return the last of them
long getLastWindowEnd(long sliceEnd);
/** Returns the corresponding window start timestamp of the given window end timestamp. */
// Given a window's end timestamp, return its start timestamp
long getWindowStart(long windowEnd);
/**
* Returns an iterator of slices to expire when the given window is emitted. The window and
* slices are both identified by the end timestamp.
*
* @param windowEnd the end timestamp of window emitted.
*/
// After a window has been emitted downstream, return the slices that can now be expired (some shared slices are no longer needed)
// Both windows and slices are identified by their end timestamps
Iterable<Long> expiredSlices(long windowEnd);
/**
* Returns the interval of slice ends, i.e. the step size to advance of the slice end when a new
* slice assigned.
*/
// Return the interval between slice ends, i.e. how far the slice end advances when the next slice is assigned
long getSliceEndInterval();
/**
* Returns {@code true} if elements are assigned to windows based on event time, {@code false}
* based on processing time.
*/
// Return whether event time is used; false means processing time
boolean isEventTime();
}
SliceSharedAssigner
SliceSharedAssigner shares slices between windows. A window is split into several slices, and when the window is emitted the slices have to be merged to form the window's result.
The source code is as follows:
@Internal
public interface SliceSharedAssigner extends SliceAssigner {
/**
* Determines which slices (if any) should be merged.
*
* @param sliceEnd the triggered slice, identified by end timestamp
* @param callback a callback that can be invoked to signal which slices should be merged.
*/
// Merge slices. sliceEnd identifies the slice that triggered the merge; callback performs the merge
void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception;
/**
* Returns the optional end timestamp of next window which should be triggered. Empty if no
* following window to trigger for now.
*
* <p>The purpose of this method is avoid register too many timers for each hopping and
* cumulative slice, e.g. HOP(1day, 10s) needs register 8640 timers for every slice. In order to
* improve this, we only register one timer for the next window. For hopping windows we don't
* register next window if current window is empty (i.e. no records in current window). That
* means we will have one more unnecessary window triggered for hopping windows if no elements
* arrives for a key for a long time. We will skip to emit window result for the triggered empty
* window, see {@link SliceSharedWindowAggProcessor#fireWindow(Long)}.
*
* @param windowEnd the current triggered window, identified by end timestamp
* @param isWindowEmpty a supplier that can be invoked to get whether the triggered window is
* empty (i.e. no records in the window).
*/
// Return the next window that should be triggered
// windowEnd is the end timestamp of the window currently being triggered
// isWindowEmpty tells whether the triggered window contains any records; it is a Supplier, so the check is evaluated lazily
Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty);
// ------------------------------------------------------------------------
/**
* Callback to be used in {@link #mergeSlices(long, MergeCallback)} for specifying which slices
* should be merged.
*/
// Callback for merging slices: it receives which slices to merge and performs the merge
interface MergeCallback {
/**
* Specifies that states of the given slices should be merged into the result slice.
*
* @param mergeResult The resulting merged slice, {@code null} if it represents a non-state
* namespace.
* @param toBeMerged The list of slices that should be merged into one slice.
*/
// mergeResult: the slice that receives the merged result (identified by end timestamp)
// toBeMerged: the slices to merge (also identified by end timestamp)
void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception;
}
}
SliceUnsharedAssigner
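SliceUnsharedAssigner means that the assigned slices are not shared between windows, so a window consists of exactly one slice and no slices need to be merged when the window is emitted. The interface declares no methods of its own. The code is as follows: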
@Internal
public interface SliceUnsharedAssigner extends SliceAssigner {}
AbstractSliceAssigner
This abstract class provides the basic functionality of a SliceAssigner. The tumbling, cumulative, and hopping assigners shown below extend it. Let us look at what it provides.
private abstract static class AbstractSliceAssigner implements SliceAssigner {
private static final long serialVersionUID = 1L;
// Index of the rowtime field in RowData
protected final int rowtimeIndex;
// Whether event time is used
protected final boolean isEventTime;
// Time zone ID used to shift timestamps
protected final ZoneId shiftTimeZone;
protected AbstractSliceAssigner(int rowtimeIndex, ZoneId shiftTimeZone) {
this.rowtimeIndex = rowtimeIndex;
this.shiftTimeZone = shiftTimeZone;
this.isEventTime = rowtimeIndex >= 0;
}
// Introduces a new abstract method that takes the timestamp of the record
// See the analysis of the method below
public abstract long assignSliceEnd(long timestamp);
@Override
public final long assignSliceEnd(RowData element, ClockService clock) {
// Extract the timestamp that corresponds to the element
final long timestamp;
if (rowtimeIndex >= 0) {
// Event-time mode:
// read the rowtime of this row and shift it to a UTC timestamp
// Precision for row timestamp is always 3
TimestampData rowTime = element.getTimestamp(rowtimeIndex, 3);
timestamp = toUtcTimestampMills(rowTime.getMillisecond(), shiftTimeZone);
} else {
// Processing-time mode: shift the current processing time to a UTC timestamp
timestamp = toUtcTimestampMills(clock.currentProcessingTime(), shiftTimeZone);
}
// Assign the slice based on the extracted timestamp
return assignSliceEnd(timestamp);
}
@Override
public final boolean isEventTime() {
return isEventTime;
}
}
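The time zone shift applied before slice assignment can be pictured with the sketch below. It assumes that the shift reinterprets the local wall-clock time in shiftTimeZone as if it were UTC; the helper name toShiftedMillis is hypothetical, and the real toUtcTimestampMills may handle additional edge cases.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class ShiftedTimestampSketch {
    // Assumption: take the wall-clock time of the instant in the given zone
    // and read that wall-clock time back as a UTC instant.
    static long toShiftedMillis(long epochMillis, ZoneId zone) {
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return local.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Epoch 0 is 08:00 local time at UTC+8, so the shifted value is 8 hours in millis.
        System.out.println(toShiftedMillis(0L, ZoneOffset.ofHours(8)));
        // In UTC the value is unchanged.
        System.out.println(toShiftedMillis(0L, ZoneOffset.UTC));
    }
}
```

Under this assumption, a one-day window computed on the shifted value lines up with local midnight rather than UTC midnight, which is a plausible reason for shifting before the window arithmetic.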
The concrete SliceAssigner implementations are analyzed one by one below.
TumblingSliceAssigner
Used for tumbling windows. The slices of a TumblingSliceAssigner are not shared: each window consists of exactly one slice. Its code is as follows:
public static final class TumblingSliceAssigner extends AbstractSliceAssigner
implements SliceUnsharedAssigner {
private static final long serialVersionUID = 1L;
/** Creates a new {@link TumblingSliceAssigner} with a new specified offset. */
public TumblingSliceAssigner withOffset(Duration offset) {
return new TumblingSliceAssigner(rowtimeIndex, shiftTimeZone, size, offset.toMillis());
}
// Window size (time span)
private final long size;
// Offset of the window start
private final long offset;
// Holds the end timestamps of the slices to expire (whose data should be cleaned up)
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
private TumblingSliceAssigner(
int rowtimeIndex, ZoneId shiftTimeZone, long size, long offset) {
super(rowtimeIndex, shiftTimeZone);
checkArgument(
size > 0,
String.format(
"Tumbling Window parameters must satisfy size > 0, but got size %dms.",
size));
checkArgument(
Math.abs(offset) < size,
String.format(
"Tumbling Window parameters must satisfy abs(offset) < size, bot got size %dms and offset %dms.",
size, offset));
this.size = size;
this.offset = offset;
}
@Override
public long assignSliceEnd(long timestamp) {
// Compute the window start
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
// start + size is the window end
// Slices are not shared, so the window is the slice
return start + size;
}
@Override
public long getLastWindowEnd(long sliceEnd) {
// For a tumbling window each slice corresponds to exactly one window, so sliceEnd is windowEnd
return sliceEnd;
}
public long getWindowStart(long windowEnd) {
// end - size = start
return windowEnd - size;
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
// Reset reuseExpiredList so that it contains only windowEnd
// The slice of the current window always expires, because tumbling-window slices are not shared
reuseExpiredList.reset(windowEnd);
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
// The slice interval equals the window size
return size;
}
}
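The arithmetic behind the tumbling assignment can be sketched as follows. The windowStart helper is a hypothetical stand-in for TimeWindow.getWindowStartWithOffset and is assumed to floor-align the timestamp to a multiple of size shifted by offset; the real method may be written differently.

```java
final class TumblingSliceSketch {
    // Assumed behaviour: largest value start <= timestamp
    // such that (start - offset) is a multiple of size.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // For a tumbling window the slice end is the window end.
    static long assignSliceEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        // 5-second windows without offset: 12345 falls into [10000, 15000).
        System.out.println(assignSliceEnd(12_345L, 0L, 5_000L));
        // Same window size with a 1-second offset: 12345 falls into [11000, 16000).
        System.out.println(assignSliceEnd(12_345L, 1_000L, 5_000L));
    }
}
```

Because the slice end equals the window end, getLastWindowEnd can return its argument unchanged and getWindowStart is a plain subtraction of size.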
CumulativeSliceAssigner
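Used for cumulative windows. A cumulative window has a maximum length, maxSize; once it is reached, the accumulation for the current batch of data ends and a new batch begins. It also has a step, which is how often an intermediate result is emitted during the accumulation. maxSize must be an integral multiple of step. A cumulative window therefore has one slice per step, and the slices are shared.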
public static final class CumulativeSliceAssigner extends AbstractSliceAssigner
implements SliceSharedAssigner {
private static final long serialVersionUID = 1L;
/** Creates a new {@link CumulativeSliceAssigner} with a new specified offset. */
public CumulativeSliceAssigner withOffset(Duration offset) {
return new CumulativeSliceAssigner(
rowtimeIndex, shiftTimeZone, maxSize, step, offset.toMillis());
}
// Maximum length of the window
private final long maxSize;
// How often the cumulative value is emitted
private final long step;
private final long offset;
// Slices to be merged
private final ReusableListIterable reuseToBeMergedList = new ReusableListIterable();
// Slices to be expired
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
protected CumulativeSliceAssigner(
int rowtimeIndex, ZoneId shiftTimeZone, long maxSize, long step, long offset) {
super(rowtimeIndex, shiftTimeZone);
if (maxSize <= 0 || step <= 0) {
throw new IllegalArgumentException(
String.format(
"Cumulative Window parameters must satisfy maxSize > 0 and step > 0, but got maxSize %dms and step %dms.",
maxSize, step));
}
if (maxSize % step != 0) {
throw new IllegalArgumentException(
String.format(
"Cumulative Window requires maxSize must be an integral multiple of step, but got maxSize %dms and step %dms.",
maxSize, step));
}
this.maxSize = maxSize;
this.step = step;
this.offset = offset;
}
@Override
public long assignSliceEnd(long timestamp) {
// Compute the start of the slice
// Note that the windowSize argument passed here is step, not maxSize: each slice is treated as a window,
// so the value computed is actually the slice start
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, step);
// slice start + step = slice end
return start + step;
}
@Override
public long getLastWindowEnd(long sliceEnd) {
long windowStart = getWindowStart(sliceEnd);
return windowStart + maxSize;
}
@Override
public long getWindowStart(long windowEnd) {
// Return the start of the window
return TimeWindow.getWindowStartWithOffset(windowEnd - 1, offset, maxSize);
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
// Window start
long windowStart = getWindowStart(windowEnd);
// End of the first slice
long firstSliceEnd = windowStart + step;
// End of the last slice that belongs to this window
long lastSliceEnd = windowStart + maxSize;
if (windowEnd == firstSliceEnd) {
// we share state in the first slice, skip cleanup for the first slice
// First slice: nothing is expired
reuseExpiredList.clear();
} else if (windowEnd == lastSliceEnd) {
// when this is the last slice,
// we need to cleanup the shared state (i.e. first slice) and the current slice
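// At the last slice of the window, expire the first slice and the current slice
// The other slices need no cleanup here: each was expired when its own window fired, and mergeSlices below folds every later slice into the first slice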
reuseExpiredList.reset(windowEnd, firstSliceEnd);
} else {
// clean up current slice
// Otherwise expire only the current slice
reuseExpiredList.reset(windowEnd);
}
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
// The interval is the step
return step;
}
@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
// Merges the slice identified by sliceEnd into the first slice of the window
// Window start
long windowStart = getWindowStart(sliceEnd);
// End of the first slice
long firstSliceEnd = windowStart + step;
if (sliceEnd == firstSliceEnd) {
// if this is the first slice, there is nothing to merge
// Equal means this is the first slice of the window, so there is nothing to merge
reuseToBeMergedList.clear();
} else {
// otherwise, merge the current slice state into the first slice state
// Otherwise the current slice is the one to merge
reuseToBeMergedList.reset(sliceEnd);
}
// Merge the current slice into the first slice
callback.merge(firstSliceEnd, reuseToBeMergedList);
}
@Override
public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
// The next window ends one step after the current window
long nextWindowEnd = windowEnd + step;
// End of the last window in the current accumulation cycle
long maxWindowEnd = getWindowStart(windowEnd) + maxSize;
if (nextWindowEnd > maxWindowEnd) {
return Optional.empty();
} else {
return Optional.of(nextWindowEnd);
}
}
}
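To make the cumulative bookkeeping concrete, the sketch below replays expiredSlices and nextTriggerWindow for maxSize = 30 and step = 10. It is a simplified stand-in with hypothetical names: the offset is assumed to be zero and plain lists replace ReusableListIterable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

final class CumulativeSliceSketch {
    private final long maxSize;
    private final long step;

    CumulativeSliceSketch(long maxSize, long step) {
        if (maxSize <= 0 || step <= 0 || maxSize % step != 0) {
            throw new IllegalArgumentException("maxSize must be a positive multiple of step");
        }
        this.maxSize = maxSize;
        this.step = step;
    }

    // Start of the accumulation cycle that contains the window ending at windowEnd.
    long windowStart(long windowEnd) {
        long lastInstant = windowEnd - 1;
        return lastInstant - Math.floorMod(lastInstant, maxSize);
    }

    // Slices (by end timestamp) that can be dropped once windowEnd has fired.
    List<Long> expiredSlices(long windowEnd) {
        long start = windowStart(windowEnd);
        long firstSliceEnd = start + step;
        long lastSliceEnd = start + maxSize;
        if (windowEnd == firstSliceEnd) {
            return Collections.emptyList();              // first slice holds the shared state
        } else if (windowEnd == lastSliceEnd) {
            return Arrays.asList(windowEnd, firstSliceEnd); // cycle finished
        } else {
            return Collections.singletonList(windowEnd);
        }
    }

    // End of the next window to trigger, empty at the end of the cycle.
    OptionalLong nextTriggerWindow(long windowEnd) {
        long next = windowEnd + step;
        long max = windowStart(windowEnd) + maxSize;
        return next > max ? OptionalLong.empty() : OptionalLong.of(next);
    }

    public static void main(String[] args) {
        CumulativeSliceSketch c = new CumulativeSliceSketch(30, 10);
        for (long end = 10; end <= 30; end += 10) {
            System.out.println(end + " expires " + c.expiredSlices(end)
                    + ", next " + c.nextTriggerWindow(end));
        }
    }
}
```

The first slice survives until the cycle ends because every later slice is merged into it, so it carries the running total for the windows ending at 10, 20, and 30.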
HoppingSliceAssigner
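Used for hopping (sliding) windows. A hopping window has two key properties: the slide distance (slide) and the window span (size). size must be an integral multiple of slide. Hopping windows overlap whenever size is larger than slide, so slices are shared. To keep each slice as large as possible (fewer merges) while keeping it reusable (never crossing a window boundary), the slice size is the greatest common divisor of size and slide; because size is a multiple of slide, this equals slide.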
public static final class HoppingSliceAssigner extends AbstractSliceAssigner
implements SliceSharedAssigner {
private static final long serialVersionUID = 1L;
/** Creates a new {@link HoppingSliceAssigner} with a new specified offset. */
public HoppingSliceAssigner withOffset(Duration offset) {
return new HoppingSliceAssigner(
rowtimeIndex, shiftTimeZone, size, slide, offset.toMillis());
}
// Window size
private final long size;
// Slide distance
private final long slide;
// Initial offset
private final long offset;
// Slice size
private final long sliceSize;
// Number of slices per window
private final int numSlicesPerWindow;
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
protected HoppingSliceAssigner(
int rowtimeIndex, ZoneId shiftTimeZone, long size, long slide, long offset) {
super(rowtimeIndex, shiftTimeZone);
if (size <= 0 || slide <= 0) {
throw new IllegalArgumentException(
String.format(
"Hopping Window must satisfy slide > 0 and size > 0, but got slide %dms and size %dms.",
slide, size));
}
if (size % slide != 0) {
throw new IllegalArgumentException(
String.format(
"Slicing Hopping Window requires size must be an integral multiple of slide, but got size %dms and slide %dms.",
size, slide));
}
this.size = size;
this.slide = slide;
this.offset = offset;
// The slice size is the greatest common divisor of the window size and the slide
this.sliceSize = ArithmeticUtils.gcd(size, slide);
// Number of slices per window = window size / slice size
this.numSlicesPerWindow = MathUtils.checkedDownCast(size / sliceSize);
}
@Override
public long assignSliceEnd(long timestamp) {
// Compute the slice start
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, sliceSize);
// sliceStart + sliceSize = sliceEnd
return start + sliceSize;
}
@Override
public long getLastWindowEnd(long sliceEnd) {
// The last window that contains the slice ends at slice start + window size
return sliceEnd - sliceSize + size;
}
@Override
public long getWindowStart(long windowEnd) {
return windowEnd - size;
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
// we need to cleanup the first slice of the window
// Each firing expires exactly one slice: the first slice of the current window
long windowStart = getWindowStart(windowEnd);
long firstSliceEnd = windowStart + sliceSize;
reuseExpiredList.reset(firstSliceEnd);
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
// The slice size is the slice interval
return sliceSize;
}
@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
// the iterable to list all the slices of the triggered window
// All slices of the triggered window
Iterable<Long> toBeMerged =
new HoppingSlicesIterable(sliceEnd, sliceSize, numSlicesPerWindow);
// null namespace means use heap data views, instead of state data views
// A null mergeResult means the merged result is not stored in state
callback.merge(null, toBeMerged);
}
@Override
public Optional<Long> nextTriggerWindow(long windowEnd, Supplier<Boolean> isWindowEmpty) {
// If the triggered window is empty, register no next window; otherwise the next window ends one slice later
if (isWindowEmpty.get()) {
return Optional.empty();
} else {
return Optional.of(windowEnd + sliceSize);
}
}
}
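The hopping arithmetic can be checked with a small sketch for size = 60 and slide = 20. The names are hypothetical and the offset is assumed to be zero; in particular, the order in which the real HoppingSlicesIterable yields slices is an assumption here, and the gcd is a plain Euclid loop rather than the library call used above.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingSliceSketch {
    // Euclid's algorithm for positive inputs.
    static long gcd(long a, long b) {
        while (b != 0) {
            long t = a % b;
            a = b;
            b = t;
        }
        return a;
    }

    // End timestamps of all slices inside the window ending at windowEnd,
    // listed from the newest slice to the oldest (order is an assumption).
    static List<Long> slicesToMerge(long windowEnd, long size, long slide) {
        long sliceSize = gcd(size, slide);
        int numSlices = Math.toIntExact(size / sliceSize);
        List<Long> ends = new ArrayList<>(numSlices);
        for (int i = 0; i < numSlices; i++) {
            ends.add(windowEnd - i * sliceSize);
        }
        return ends;
    }

    // The single slice that expires when the window ending at windowEnd fires.
    static long expiredSlice(long windowEnd, long size, long slide) {
        return windowEnd - size + gcd(size, slide);
    }

    // End of the last window that still needs the slice ending at sliceEnd.
    static long lastWindowEnd(long sliceEnd, long size, long slide) {
        return sliceEnd - gcd(size, slide) + size;
    }

    public static void main(String[] args) {
        System.out.println(slicesToMerge(100, 60, 20)); // slices of window [40, 100)
        System.out.println(expiredSlice(100, 60, 20));  // oldest slice of that window
        System.out.println(lastWindowEnd(60, 60, 20));  // last window using slice [40, 60)
    }
}
```

The two helper results agree with each other: the slice ending at 60 is last needed by the window ending at 100, and that window is the one whose firing expires it.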
WindowedSliceAssigner
Used when the window start and end timestamps are carried by the elements themselves. The code is straightforward and is not analyzed in detail.
public static final class WindowedSliceAssigner implements SliceUnsharedAssigner {
private static final long serialVersionUID = 1L;
// Index of the field in the element that holds the window end
private final int windowEndIndex;
// The wrapped assigner
private final SliceAssigner innerAssigner;
private final ReusableListIterable reuseExpiredList = new ReusableListIterable();
public WindowedSliceAssigner(int windowEndIndex, SliceAssigner innerAssigner) {
checkArgument(
windowEndIndex >= 0,
"Windowed slice assigner must have a positive window end index.");
this.windowEndIndex = windowEndIndex;
this.innerAssigner = innerAssigner;
}
@Override
public long assignSliceEnd(RowData element, ClockService clock) {
// Read the window end carried by the element
return element.getTimestamp(windowEndIndex, 3).getMillisecond();
}
@Override
public long getLastWindowEnd(long sliceEnd) {
// we shouldn't use innerAssigner.getLastWindowEnd here,
// because WindowedSliceAssigner is slice unshared, an attached window can't be
// shared with other windows and the last window should be itself.
// Slices are not shared, so return sliceEnd directly
return sliceEnd;
}
@Override
public long getWindowStart(long windowEnd) {
return innerAssigner.getWindowStart(windowEnd);
}
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
reuseExpiredList.reset(windowEnd);
return reuseExpiredList;
}
@Override
public long getSliceEndInterval() {
return innerAssigner.getSliceEndInterval();
}
@Override
public boolean isEventTime() {
// it always works in event-time mode if input row has been attached windows
// Event time only
return true;
}
}
That completes the slice assignment logic. Next comes the computation over slice window data.
SlicingWindowProcessor
SlicingWindowProcessor processes the data of slice windows. It has the subclass AbstractWindowAggProcessor, which has two implementations:
- SliceSharedWindowAggProcessor
- SliceUnsharedWindowAggProcessor
AbstractWindowAggProcessor
The focus here is how AbstractWindowAggProcessor handles each arriving element. That logic lives in the processElement method. The code is as follows:
// Returns true if the element can be dropped, false otherwise
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
// The slice that the element belongs to
long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
if (!isEventTime) {
// always register processing time for every element when processing time mode
// In processing-time mode, register a processing-time timer at the slice end to trigger the computation
windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
}
if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
// The slice has already fired, so this element is late
// the assigned slice has been triggered, which means current element is late,
// but maybe not need to drop
// The last window that the slice belongs to
long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
// the last window has been triggered, so the element can be dropped now
// That window has fired too, so the element can be dropped
return true;
} else {
// sliceStateMergeTarget returns the slice that holds the merged state, i.e. the slice into which a group of slices is merged
// windowBuffer caches slice data; think of it as a map keyed by (key, merged sliceEnd) whose values are the elements
windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
// we need to register a timer for the next unfired window,
// because this may the first time we see elements under the key
long unfiredFirstWindow = sliceEnd;
while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
// Advance unfiredFirstWindow by windowInterval (sliceAssigner.getSliceEndInterval()) until it has not fired yet
// The result is the next slice end that has not fired
unfiredFirstWindow += windowInterval;
}
// Register an event-time timer
windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
return false;
}
} else {
// the assigned slice hasn't been triggered, accumulate into the assigned slice
// The slice has not fired; add the element to the slice buffer
windowBuffer.addElement(key, sliceEnd, element);
return false;
}
}
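The three outcomes of processElement in event-time mode can be traced with the sketch below, using a hopping window with size 60 and slice size 20. The names are hypothetical, and isFired is an assumed simplification of isWindowFired (a window counts as fired once the watermark reaches windowEnd - 1) that ignores the time zone shift.

```java
final class LateElementSketch {
    static final long SIZE = 60;   // window size
    static final long SLICE = 20;  // slice size, equal to the slide here

    // Assumed firing rule: fired once the watermark has reached windowEnd - 1.
    static boolean isFired(long windowEnd, long watermark) {
        return windowEnd - 1 <= watermark;
    }

    // Describes what happens to an element with the given timestamp.
    static String classify(long timestamp, long watermark) {
        long sliceEnd = timestamp - Math.floorMod(timestamp, SLICE) + SLICE;
        if (!isFired(sliceEnd, watermark)) {
            return "ON_TIME slice=" + sliceEnd;       // accumulate into its slice
        }
        long lastWindowEnd = sliceEnd - SLICE + SIZE;  // last window sharing the slice
        if (isFired(lastWindowEnd, watermark)) {
            return "DROP";                             // no window can still use it
        }
        long unfired = sliceEnd;
        while (isFired(unfired, watermark)) {
            unfired += SLICE;                          // skip windows that already fired
        }
        return "LATE_KEEP timer=" + unfired;           // register a timer here
    }

    public static void main(String[] args) {
        long watermark = 75;
        System.out.println(classify(90, watermark));
        System.out.println(classify(50, watermark));
        System.out.println(classify(5, watermark));
    }
}
```

The middle case shows why shared slices make late data less costly: the element at 50 missed the window ending at 60, but the windows ending at 80 and 100 still include its slice, so it is kept and a timer is registered for 80.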
SliceUnsharedWindowAggProcessor
The aggregate processor for windows with unshared slices. The code and analysis are as follows:
public final class SliceUnsharedWindowAggProcessor extends AbstractWindowAggProcessor {
private static final long serialVersionUID = 1L;
public SliceUnsharedWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory windowBufferFactory,
SliceUnsharedAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
ZoneId shiftTimeZone) {
super(genAggsHandler, windowBufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
}
// Fire the window
@Override
public void fireWindow(Long windowEnd) throws Exception {
// Read the accumulator computed for the window
RowData acc = windowState.value(windowEnd);
if (acc == null) {
// None exists yet, so create a new accumulator
acc = aggregator.createAccumulators();
}
// Set the accumulator for the current window
aggregator.setAccumulators(windowEnd, acc);
// Get the aggregate result
RowData aggResult = aggregator.getValue(windowEnd);
// Emit the result downstream
collect(aggResult);
}
@Override
protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
// Unshared slices are never merged, so the slice is the same before and after; return it as is
return sliceToMerge;
}
}
SliceSharedWindowAggProcessor
The aggregate processor for windows with shared slices. It implements the actual slice-merging logic. The code and analysis are as follows:
public final class SliceSharedWindowAggProcessor extends AbstractWindowAggProcessor
implements SliceSharedAssigner.MergeCallback {
private static final long serialVersionUID = 1L;
private final SliceSharedAssigner sliceSharedAssigner;
private final WindowIsEmptySupplier emptySupplier;
// Implements SliceSharedAssigner.MergeCallback and captures the merge target
private final SliceMergeTargetHelper mergeTargetHelper;
public SliceSharedWindowAggProcessor(
GeneratedNamespaceAggsHandleFunction<Long> genAggsHandler,
WindowBuffer.Factory bufferFactory,
SliceSharedAssigner sliceAssigner,
TypeSerializer<RowData> accSerializer,
int indexOfCountStar,
ZoneId shiftTimeZone) {
super(genAggsHandler, bufferFactory, sliceAssigner, accSerializer, shiftTimeZone);
this.sliceSharedAssigner = sliceAssigner;
this.mergeTargetHelper = new SliceMergeTargetHelper();
this.emptySupplier = new WindowIsEmptySupplier(indexOfCountStar, sliceAssigner);
}
@Override
public void fireWindow(Long windowEnd) throws Exception {
// Merge the slices, passing this processor as the MergeCallback
// SliceSharedWindowAggProcessor implements the merge method of MergeCallback
sliceSharedAssigner.mergeSlices(windowEnd, this);
// we have set accumulator in the merge() method
// Get the aggregate result of the window
RowData aggResult = aggregator.getValue(windowEnd);
// Emit the aggregate result downstream only if the window is not empty
if (!isWindowEmpty()) {
// for hopping windows, the triggered window may be an empty window
// (see register next window below), for such window, we shouldn't emit it
// because a hopping window may contain no data
collect(aggResult);
}
// we should register next window timer here,
// because slices are shared, maybe no elements arrived for the next slices
// Get the next window to trigger and register a timer for it
Optional<Long> nextWindowEndOptional =
sliceSharedAssigner.nextTriggerWindow(windowEnd, emptySupplier);
if (nextWindowEndOptional.isPresent()) {
long nextWindowEnd = nextWindowEndOptional.get();
if (sliceSharedAssigner.isEventTime()) {
windowTimerService.registerEventTimeWindowTimer(nextWindowEnd);
} else {
windowTimerService.registerProcessingTimeWindowTimer(nextWindowEnd);
}
}
}
// The actual slice merge happens here
// Called from sliceSharedAssigner.mergeSlices(windowEnd, this)
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
// get base accumulator
final RowData acc;
// A null mergeResult means the result is not stored in state
// This is the case for hopping windows
if (mergeResult == null) {
// null means the merged is not on state, create a new acc
acc = aggregator.createAccumulators();
} else {
// Otherwise read the aggregate result from state
RowData stateAcc = windowState.value(mergeResult);
if (stateAcc == null) {
acc = aggregator.createAccumulators();
} else {
acc = stateAcc;
}
}
// set base accumulator
aggregator.setAccumulators(mergeResult, acc);
// merge slice accumulators
// Merge the accumulators of all slices in toBeMerged
for (Long slice : toBeMerged) {
RowData sliceAcc = windowState.value(slice);
if (sliceAcc != null) {
aggregator.merge(slice, sliceAcc);
}
}
// set merged acc into state if the merged acc is on state
// If mergeResult is not null, write the merged result back to windowState
if (mergeResult != null) {
windowState.update(mergeResult, aggregator.getAccumulators());
}
}
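// Resolves the slice whose state a late element should be merged into
// Called when an element arrives late for its slice
// If the assigner names a merge target in state, that slice is used; otherwise the element goes into its own slice
protected long sliceStateMergeTarget(long sliceToMerge) throws Exception {
// Reset the helper, then let the assigner report the merge target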
mergeTargetHelper.setMergeTarget(null);
sliceSharedAssigner.mergeSlices(sliceToMerge, mergeTargetHelper);
// the mergeTarget might be null, which means the merging happens in memory instead of
// on state, so the slice state to merge into is itself.
if (mergeTargetHelper.getMergeTarget() != null) {
return mergeTargetHelper.getMergeTarget();
} else {
return sliceToMerge;
}
}
private boolean isWindowEmpty() {
if (emptySupplier.indexOfCountStar < 0) {
// For hopping windows a count(*) field is added automatically to tell whether the window contains any elements
// Other windows, such as cumulative windows, have no count(*) field and are never treated as empty
// for non-hopping windows, the window is never empty
return false;
} else {
return emptySupplier.get();
}
}
// ------------------------------------------------------------------------------------------
private final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable {
private static final long serialVersionUID = 1L;
// Index of the count(*) field, used only for hopping windows to tell whether the window contains data
private final int indexOfCountStar;
private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) {
if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
checkArgument(
indexOfCountStar >= 0,
"Hopping window requires a COUNT(*) in the aggregate functions.");
}
this.indexOfCountStar = indexOfCountStar;
}
@Override
public Boolean get() {
if (indexOfCountStar < 0) {
return false;
}
try {
// Hopping window: a null accumulator or count(*) == 0 means no data, so the window is empty
RowData acc = aggregator.getAccumulators();
return acc == null || acc.getLong(indexOfCountStar) == 0;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
// Records mergeResult as the merge target
private static final class SliceMergeTargetHelper
implements SliceSharedAssigner.MergeCallback, Serializable {
private static final long serialVersionUID = 1L;
private Long mergeTarget = null;
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
this.mergeTarget = mergeResult;
}
public Long getMergeTarget() {
return mergeTarget;
}
public void setMergeTarget(Long mergeTarget) {
this.mergeTarget = mergeTarget;
}
}
}
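The difference between the two merge styles can be reduced to the sketch below, where each slice's accumulator is just a count kept in a map. The class is hypothetical and stands in for windowState and the generated aggregator; it only mirrors the control flow of merge for a null and a non-null mergeResult.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class SliceMergeSketch {
    // Per-slice counts keyed by slice end; a stand-in for the window state.
    final Map<Long, Long> sliceState = new HashMap<>();

    // Returns the merged count. A null mergeResult keeps the result in memory only.
    long merge(Long mergeResult, Iterable<Long> toBeMerged) {
        long acc = 0L;
        if (mergeResult != null) {
            acc = sliceState.getOrDefault(mergeResult, 0L); // base accumulator from state
        }
        for (Long slice : toBeMerged) {
            Long sliceAcc = sliceState.get(slice);
            if (sliceAcc != null) {
                acc += sliceAcc;
            }
        }
        if (mergeResult != null) {
            sliceState.put(mergeResult, acc);               // write back only if on state
        }
        return acc;
    }

    public static void main(String[] args) {
        // Hopping style: sum three slices in memory, state stays untouched.
        SliceMergeSketch hop = new SliceMergeSketch();
        hop.sliceState.put(60L, 1L);
        hop.sliceState.put(80L, 2L);
        hop.sliceState.put(100L, 3L);
        System.out.println(hop.merge(null, Arrays.asList(100L, 80L, 60L)));

        // Cumulative style: fold slice 20 into the first slice 10 and store it.
        SliceMergeSketch cumulate = new SliceMergeSketch();
        cumulate.sliceState.put(10L, 4L);
        cumulate.sliceState.put(20L, 5L);
        System.out.println(cumulate.merge(10L, Collections.singletonList(20L)));
        System.out.println(cumulate.sliceState.get(10L));
    }
}
```

Keeping the hopping result out of state matters because every slice is reused by later windows and must stay unmodified, whereas a cumulative window can safely overwrite its first slice with the running total.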
This blog post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.