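1 Preface
WindowAssigner: assigns zero or more windows to each element of the current stream.
The relevant abstract method is assignWindows. It assigns one or more windows to an element that carries a timestamp, and returns the collection of windows.
The main difference between the various assigners lies in how they implement this method.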
2 Source code analysis
2.1 Tumbling window assigners
There are two of them: TumblingProcessingTimeWindows and TumblingEventTimeWindows.
TumblingProcessingTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
}
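The getWindowStartWithOffset method computes the start timestamp of a window with a given offset. For example, of(Time.hours(1), Time.minutes(15)) specifies one-hour windows with an offset of 15 minutes, so the windows start at 0:15:00, 1:15:00, 2:15:00, and so on.
The method is not easy to grasp at first; plugging in a few parameter values and checking the results helps.
Note: once size and offset are fixed, the window boundaries are fixed as well. They do not change with the timestamps of the elements being processed.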
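To make the arithmetic concrete, here is a minimal sketch of the window-start computation. The formula is an assumption based on how older Flink releases implement getWindowStartWithOffset (newer releases handle negative remainders differently), and the class name WindowStartSketch is made up for illustration. It assumes non-negative timestamps in milliseconds.

```java
// Minimal sketch of the window-start arithmetic (not the Flink source itself).
// Assumption: timestamp - offset + windowSize is non-negative.
final class WindowStartSketch {

    /** Returns the start of the window of length windowSize that contains timestamp. */
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 60 * minute;   // one-hour windows
        long offset = 15 * minute; // shifted by 15 minutes

        // 01:20 falls into the window that starts at 01:15.
        long a = getWindowStartWithOffset(80 * minute, offset, size);
        // 01:10 is before 01:15, so it still belongs to the window starting at 00:15.
        long b = getWindowStartWithOffset(70 * minute, offset, size);

        System.out.println(a / minute + " minutes after midnight");
        System.out.println(b / minute + " minutes after midnight");
    }
}
```

Both results depend only on size and offset, which is why the window grid stays fixed no matter which element arrives.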
TumblingEventTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        // Long.MIN_VALUE is currently assigned when no timestamp is present
        long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
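The method first checks whether the element carries a timestamp, so when the time characteristic is EventTime, remember to assign timestamps and watermarks.
It then calls getWindowStartWithOffset in the same way as above.
Summary:
In a tumbling window, each element is assigned to exactly one window, which is why the method returns Collections.singletonList, a list with a single element.
From the code above, the two tumbling assigners differ only in where the timestamp comes from: one calls getCurrentProcessingTime on context to obtain the current time, the other uses the timestamp carried by the element.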
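The difference between the two time sources can be sketched without any Flink dependency. In the sketch below, TumblingSketch, the LongSupplier clock, and the long[] {start, end} window representation are all hypothetical stand-ins for the assigner, WindowAssignerContext, and TimeWindow; the window-start formula is the same assumption as in older Flink releases.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the two tumbling assigners; windows are long[] {start, end}.
final class TumblingSketch {

    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Processing time: the element's own timestamp is ignored, the clock decides. */
    static long[] assignProcessingTime(LongSupplier clock, long offset, long size) {
        long start = windowStart(clock.getAsLong(), offset, size);
        return new long[] {start, start + size};
    }

    /** Event time: the element's timestamp decides; Long.MIN_VALUE means "no timestamp". */
    static long[] assignEventTime(long elementTimestamp, long offset, long size) {
        if (elementTimestamp > Long.MIN_VALUE) {
            long start = windowStart(elementTimestamp, offset, size);
            return new long[] {start, start + size};
        }
        throw new IllegalStateException("Record has no timestamp");
    }

    public static void main(String[] args) {
        long[] byClock = assignProcessingTime(System::currentTimeMillis, 0L, 5_000L);
        long[] byEvent = assignEventTime(7_000L, 0L, 5_000L);
        System.out.println("processing time: [" + byClock[0] + ", " + byClock[1] + ")");
        System.out.println("event time:      [" + byEvent[0] + ", " + byEvent[1] + ")");
    }
}
```

Passing the clock in as a parameter is only a convenience for this sketch: it makes the processing-time path deterministic when a fixed value is supplied.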
2.2 Sliding window assigners
There are two of them: SlidingProcessingTimeWindows and SlidingEventTimeWindows.
SlidingProcessingTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    timestamp = context.getCurrentProcessingTime();
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
    for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
        windows.add(new TimeWindow(start, start + size));
    }
    return windows;
}
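The method first estimates how many windows the element can be assigned to: size / slide (window size divided by slide interval, rounded down). This value only sets the initial capacity of the list; the loop determines the actual number of windows. For example, when computing the past hour of data every 10 minutes, size / slide is 6. If the current processing time is 20:37, the element is assigned (with offset 0) to the windows [19:40-20:40), [19:50-20:50), [20:00-21:00), [20:10-21:10), [20:20-21:20), [20:30-21:30).
lastStart is the start time of the most recent window (the one with the largest start time) that contains the timestamp. The loop then steps backwards by slide and adds a window at each step, as long as start > timestamp - size.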
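The 20:37 example can be reproduced with a small standalone sketch. SlidingSketch and the long[] {start, end} window representation are hypothetical stand-ins, the window-start formula is assumed from older Flink releases, and times are milliseconds since midnight.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sliding assigner; windows are long[] {start, end}.
final class SlidingSketch {

    static long windowStart(long timestamp, long offset, long slide) {
        return timestamp - (timestamp - offset + slide) % slide;
    }

    /** Mirrors the loop shown above: walk backwards from the latest window start. */
    static List<long[]> assignWindows(long timestamp, long offset, long size, long slide) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        long lastStart = windowStart(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    static String hhmm(long millis) {
        long minutes = millis / 60_000L;
        return String.format("%02d:%02d", minutes / 60, minutes % 60);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long now = (20 * 60 + 37) * minute; // 20:37
        // First line printed is [20:30, 21:30), the window with the largest start.
        for (long[] w : assignWindows(now, 0L, 60 * minute, 10 * minute)) {
            System.out.println("[" + hhmm(w[0]) + ", " + hhmm(w[1]) + ")");
        }
    }
}
```

The windows come out in descending start order because the loop begins at lastStart and subtracts slide on each iteration.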
SlidingEventTimeWindows:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    } else {
        throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                "'DataStream.assignTimestampsAndWatermarks(...)'?");
    }
}
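Apart from how the time is obtained, the logic is essentially the same as in the ProcessingTime variant above: one reads the current processing time, the other uses the timestamp carried by the element.
2.3 Session window assigners
Session window assigners involve window merging. So what is window merging?
Example:
Three records belong to K1 and one record belongs to K2.
As the example shows, a session window starts at the current event time or processing time, and ends at the start time plus the session timeout.
The record [K1,V4] falls inside the window of [K1,V1], namely [13:02, 13:32], but its own window is [13:20, 13:50], so the session is extended to [13:02, 13:50].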
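The K1 example can be checked with a few lines of Java. The sketch assumes a 30-minute session timeout (inferred from the [13:02, 13:32] window), uses minutes since midnight, and treats two windows as overlapping when start <= other.end and end >= other.start, which is how TimeWindow.intersects is understood to behave. SessionSketch and its helpers are hypothetical names.

```java
// Hypothetical helpers for the K1 example; windows are long[] {start, end} in minutes.
final class SessionSketch {

    /** A session window for an element seen at time t spans [t, t + gap]. */
    static long[] assign(long t, long gap) {
        return new long[] {t, t + gap};
    }

    /** Assumed overlap test: touching windows count as overlapping. */
    static boolean intersects(long[] a, long[] b) {
        return a[0] <= b[1] && a[1] >= b[0];
    }

    /** Smallest window that covers both inputs. */
    static long[] cover(long[] a, long[] b) {
        return new long[] {Math.min(a[0], b[0]), Math.max(a[1], b[1])};
    }

    static long hm(int hour, int minute) {
        return hour * 60L + minute;
    }

    public static void main(String[] args) {
        long gap = 30;                       // assumed session timeout in minutes
        long[] v1 = assign(hm(13, 2), gap);  // [13:02, 13:32]
        long[] v4 = assign(hm(13, 20), gap); // [13:20, 13:50]
        if (intersects(v1, v4)) {
            long[] merged = cover(v1, v4);   // [13:02, 13:50]
            System.out.println(merged[0] + " .. " + merged[1] + " (minutes since midnight)");
        }
    }
}
```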
There are four implementations:
ProcessingTimeSessionWindows and EventTimeSessionWindows;
DynamicProcessingTimeSessionWindows and DynamicEventTimeSessionWindows.
All four extend MergingWindowAssigner:
@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Determines which windows (if any) should be merged.
     *
     * @param windows The window candidates.
     * @param callback A callback that can be invoked to signal which windows should be merged.
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    /**
     * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
     * windows should be merged.
     */
    public interface MergeCallback<W> {
        /**
         * Specifies that the given windows should be merged into the result window.
         *
         * @param toBeMerged The list of windows that should be merged into one window.
         * @param mergeResult The resulting merged window.
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}
ProcessingTimeSessionWindows:
The assignWindows method:
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    long currentProcessingTime = context.getCurrentProcessingTime();
    return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}
It builds a single-element collection containing a TimeWindow that starts at the current time and has length sessionTimeout.
The mergeWindows method delegates to TimeWindow.mergeWindows:
/**
 * Merge overlapping {@link TimeWindow}s.
 */
@Override
public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
    TimeWindow.mergeWindows(windows, c);
}
The TimeWindow.mergeWindows method:
public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
    // sort the windows by the start time and then merge overlapping windows
    List<TimeWindow> sortedWindows = new ArrayList<>(windows);
    Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
        @Override
        public int compare(TimeWindow o1, TimeWindow o2) {
            return Long.compare(o1.getStart(), o2.getStart());
        }
    });
    List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
    Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
    for (TimeWindow candidate: sortedWindows) {
        if (currentMerge == null) {
            currentMerge = new Tuple2<>();
            currentMerge.f0 = candidate;
            currentMerge.f1 = new HashSet<>();
            currentMerge.f1.add(candidate);
        } else if (currentMerge.f0.intersects(candidate)) {
            currentMerge.f0 = currentMerge.f0.cover(candidate);
            currentMerge.f1.add(candidate);
        } else {
            merged.add(currentMerge);
            currentMerge = new Tuple2<>();
            currentMerge.f0 = candidate;
            currentMerge.f1 = new HashSet<>();
            currentMerge.f1.add(candidate);
        }
    }
    if (currentMerge != null) {
        merged.add(currentMerge);
    }
    for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
        if (m.f1.size() > 1) {
            c.merge(m.f1, m.f0);
        }
    }
}
- Sort the windows by their start time
- Merge windows that overlap in time
- Invoke the callback only for groups that contain more than one window
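The sort-then-sweep idea can be tried in isolation. The following is a simplified sketch, not the Flink method: it represents windows as long[] {start, end}, and it returns every resulting window instead of invoking a callback for merged groups only. MergeSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified sort-and-sweep merge over long[] {start, end} windows.
final class MergeSketch {

    /** Returns the merged windows in ascending start order; inputs are not modified. */
    static List<long[]> merge(List<long[]> windows) {
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));

        List<long[]> merged = new ArrayList<>();
        long[] current = null;
        for (long[] candidate : sorted) {
            boolean overlaps = current != null
                    && current[0] <= candidate[1] && current[1] >= candidate[0];
            if (overlaps) {
                // Extend the running window so that it covers the candidate.
                current = new long[] {
                    Math.min(current[0], candidate[0]), Math.max(current[1], candidate[1])
                };
            } else {
                if (current != null) {
                    merged.add(current);
                }
                current = candidate.clone();
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<long[]> input = new ArrayList<>();
        input.add(new long[] {800, 830}); // 13:20-13:50, in minutes since midnight
        input.add(new long[] {900, 930}); // 15:00-15:30
        input.add(new long[] {782, 812}); // 13:02-13:32
        for (long[] w : merge(input)) {
            System.out.println(w[0] + " .. " + w[1]);
        }
    }
}
```

Because the input is sorted by start time, a single pass suffices: a candidate can only overlap the window currently being built, never an earlier finished one.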
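However, mergeWindows is actually invoked from the addWindow method of the MergingWindowSet class.
Let's look at MergingWindowSet:
<1> Class comment:
<2> Walkthrough of the addWindow method:
Method comment:
Adds a new window to the set of in-flight windows. This may trigger merging of previously in-flight windows, in which case the MergeFunction is called.
After the addition, the method returns the window that represents the added window: the new window itself if no merge occurred, or the newly merged window otherwise. Adding elements or calling trigger functions should happen only on the returned representative, so that we never have to deal with a new window that is immediately swallowed by another window.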
public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
    List<W> windows = new ArrayList<>();
    windows.addAll(this.mapping.keySet());
    windows.add(newWindow);
    final Map<W, Collection<W>> mergeResults = new HashMap<>();
    windowAssigner.mergeWindows(windows,
            new MergingWindowAssigner.MergeCallback<W>() {
                @Override
                public void merge(Collection<W> toBeMerged, W mergeResult) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
                    }
                    mergeResults.put(mergeResult, toBeMerged);
                }
            });
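(1) Add all existing windows to a list, then add the newly arrived window to the same list;
(2) Call mergeWindows on the merging window assigner. The candidates are all existing windows plus the new one. A callback is registered that records each merge result and the windows merged into it in mergeResults;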
    W resultWindow = newWindow;
    boolean mergedNewWindow = false;
    // perform the merge
    for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
        W mergeResult = c.getKey();
        Collection<W> mergedWindows = c.getValue();
        // if our new window is in the merged windows make the merge result the
        // result window
        if (mergedWindows.remove(newWindow)) {
            mergedNewWindow = true;
            resultWindow = mergeResult;
        }
        // pick any of the merged windows and choose that window's state window
        // as the state window for the merge result
        W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
        // figure out the state windows that we are merging
        List<W> mergedStateWindows = new ArrayList<>();
        for (W mergedWindow: mergedWindows) {
            W res = this.mapping.remove(mergedWindow);
            if (res != null) {
                mergedStateWindows.add(res);
            }
        }
        this.mapping.put(mergeResult, mergedStateWindow);
        // don't put the target state window into the merged windows
        mergedStateWindows.remove(mergedStateWindow);
        // don't merge the new window itself, it never had any state associated with it
        // i.e. if we are only merging one pre-existing window into itself
        // without extending the pre-existing window
        if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
            mergeFunction.merge(mergeResult,
                    mergedWindows,
                    this.mapping.get(mergeResult),
                    mergedStateWindows);
        }
    }
    // the new window created a new, self-contained window without merging
    if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
        this.mapping.put(resultWindow, resultWindow);
    }
    return resultWindow;
}
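(3) Preset the newly added window as the result window to return, and initialize mergedNewWindow to false;
(4) In the for loop, take one merged window together with the set of original windows that were merged into it. If the newly added window is in that set (that is, the new window was merged), the window finally returned is this merged window;
(5) Pick any one of the original windows in this merge and use its state window as the state window of the merged window; then create a list to hold the state windows being merged;
(6) Iterate over the original windows in this merge, remove each from the global mapping, and add its state window (the window itself if it has never been merged before) to the new list;
(7) Put the merged window and the chosen state window into the global mapping, then remove the chosen state window from the list of state windows;
(8) Skip the case where a single pre-existing window is merged into itself without being extended (after the new window has been removed, the set contains only the merge result), because no state needs to be merged then. Otherwise, call the merge function.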
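The steps above can be traced with a stripped-down model of the window set. This is a sketch under several assumptions, not the MergingWindowSet source: Win is a hypothetical stand-in for TimeWindow, the overlap grouping is inlined instead of going through a MergingWindowAssigner, and the MergeFunction call of step (8) is left out because the sketch holds no state to merge.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified model of a merging window set (no state merging, no persistence).
final class MergingSetSketch {

    /** Hypothetical stand-in for TimeWindow. */
    static final class Win {
        final long start;
        final long end;
        Win(long start, long end) { this.start = start; this.end = end; }
        boolean intersects(Win o) { return start <= o.end && end >= o.start; }
        Win cover(Win o) { return new Win(Math.min(start, o.start), Math.max(end, o.end)); }
        @Override public boolean equals(Object o) {
            return o instanceof Win && ((Win) o).start == start && ((Win) o).end == end;
        }
        @Override public int hashCode() { return Objects.hash(start, end); }
        @Override public String toString() { return "[" + start + ", " + end + "]"; }
    }

    /** In-flight window -> the window under which its state is kept. */
    final Map<Win, Win> mapping = new HashMap<>();

    Win addWindow(Win newWindow) {
        // Steps (1)-(2): collect candidates and group the overlapping ones.
        List<Win> candidates = new ArrayList<>(mapping.keySet());
        candidates.add(newWindow);
        candidates.sort(Comparator.comparingLong((Win w) -> w.start));
        Map<Win, Set<Win>> mergeResults = new HashMap<>();
        Win cover = null;
        Set<Win> group = new HashSet<>();
        for (Win c : candidates) {
            if (cover != null && cover.intersects(c)) {
                cover = cover.cover(c);
            } else {
                if (group.size() > 1) mergeResults.put(cover, group);
                cover = c;
                group = new HashSet<>();
            }
            group.add(c);
        }
        if (group.size() > 1) mergeResults.put(cover, group);

        // Steps (3)-(7): rewire the mapping for every merge result.
        Win resultWindow = newWindow;
        boolean mergedNewWindow = false;
        for (Map.Entry<Win, Set<Win>> e : mergeResults.entrySet()) {
            Set<Win> mergedWindows = e.getValue();
            if (mergedWindows.remove(newWindow)) {
                mergedNewWindow = true;
                resultWindow = e.getKey();
            }
            Win stateWindow = mapping.get(mergedWindows.iterator().next());
            for (Win w : mergedWindows) {
                mapping.remove(w);
            }
            mapping.put(e.getKey(), stateWindow);
            // Step (8) would call the merge function here; omitted in this sketch.
        }
        if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
            mapping.put(resultWindow, resultWindow);
        }
        return resultWindow;
    }

    public static void main(String[] args) {
        MergingSetSketch set = new MergingSetSketch();
        System.out.println(set.addWindow(new Win(782, 812)));
        System.out.println(set.addWindow(new Win(800, 830)));
        System.out.println(set.addWindow(new Win(900, 930)));
        System.out.println(set.mapping);
    }
}
```

In this model the merged window keeps using the state window of an original window, which is why the mapping's key and value differ once a merge has happened.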
This part is fairly hard to follow, so a flow chart is attached: