零审丘、序言
本篇文章探究Flink Window窗口機制,首先介紹窗口機制使用的總綱可都,涉及的所有組件進行介紹啄枕,心中有一個大體的藍圖和認識婚陪。之后基于keyBy方法返回的Keyed Window入手,分析window方法频祝,并依次進行WindowAssigner泌参、Trigger類介紹脆淹。篇幅所限,計劃在其他文章中繼續(xù)介紹evictor沽一、reduce/aggregate等聚合方法盖溺,以及allowedLateness方法等使用。
一铣缠、背景&目標
為了實現(xiàn)項目場景中自定義窗口功能烘嘱,還是要先把目前Flink提供的窗口機制剖析一下,先從簡單好理解的入手蝗蛙,以tumbling windows為例蝇庭,sliding windows思路也相似。
二捡硅、窗口機制
考慮keyed Windows 哮内,從官網(wǎng)介紹可以通過窗口機制整個使用方法,提綱挈領(lǐng)了解所涉及的組件壮韭,雖然有些組件使用的是時候是可以使用默認而不用指定北发。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
組件 | 作用 |
---|---|
keyBy | 數(shù)據(jù)流按照key分流 |
window | 需要傳入WindowAssigner類,用來進行Event元素時間窗口分配喷屋。滾動窗口和session窗口一個Event對應(yīng)一個時間窗口琳拨,滑動窗口一個Eevent可以對應(yīng)多個時間窗口。 |
trigger | 用來決定觸發(fā)針對特定時間窗口進行運算的window function執(zhí)行屯曹。 |
evictor | 用來在trigger觸發(fā)后狱庇、window function執(zhí)行之前進行event過濾。 |
allowedLatteness | 允許event延遲時間是牢。 |
sideOutputLateData | 設(shè)置遲到的event 標簽 |
getSideOutput | 獲取遲到event |
reduce/aggregate/fold/apply | window function窗口計算函數(shù)僵井,對時間窗口中的event 元素進行計算。 |
先從最基礎(chǔ)的WindowAssigner類開始驳棱,本篇重點以tumbling windows為例。
三农曲、探究剖析--溯源
Flink1.7.2版本Java代碼社搅。
一切都是始于stream.keyBy().window()
stream.keyBy()返回的是一個DataStream類子類:KeyedStream類對象
DataStream類的相關(guān)知識已經(jīng)在通過Flink 程序模板來學(xué)習(xí)StreamExecutionEnvironment 、DataStream 乳规、StreamTransformation類文章中探究過了形葬。
來看看KeyedStream類對象中:window()
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new WindowedStream<>(this, assigner);
}
返回的是一個WindowedStream類:
public class WindowedStream<T, K, W extends Window> {
/** The keyed data stream that is windowed by this stream. */
private final KeyedStream<T, K> input;
/** The window assigner. */
private final WindowAssigner<? super T, W> windowAssigner;
/** The trigger that is used for window evaluation/emission. */
private Trigger<? super T, ? super W> trigger;
/** The evictor that is used for evicting elements before window evaluation. */
private Evictor<? super T, ? super W> evictor;
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
// 其他省略
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
function = input.getExecutionEnvironment().clean(function);
return apply(new InternalIterableWindowFunction<>(function), resultType, function);
}
第一反應(yīng),WindowedStream雖然名字叫WindowedStream但是他不是DataStream類(雖然源代碼中也在datastream包中)暮的!但是呢笙以,他提供了一系列計算操作function,返回的可都是DataStream類的子類:SingleOutputStreamOperator類冻辩。
之后呢猖腕,看到了WindowedStream的成員和方法拆祈,可以看到窗口機制的組件 windowAssigner、trigger倘感、evicto和窗口計算函數(shù)都在放坏,開心啊,按圖索驥即可老玛!
根據(jù)實際運行時的dataflow來看淤年,最終Flink拓撲會被轉(zhuǎn)換為一個有一個包含算子的處理結(jié)構(gòu)。Flink怎么把窗口機制所有的組件都調(diào)動起來呢蜡豹?通過觀察窗口計算函數(shù)返回值都是DataStream類麸粮,整個拓撲就串起來了,對應(yīng)有就有相應(yīng)的Transformation镜廉,也就有相應(yīng)的operator(StreamOperator真正在底層處理一個一個元素的操作類)弄诲。WindowStream 的apply方法對應(yīng)調(diào)用一個private apply方法:
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
KeySelector<T, K> keySel = input.getKeySelector();
WindowOperator<K, T, Iterable<T>, R, W> operator;
if (evictor != null) {
@SuppressWarnings({"unchecked", "rawtypes"})
TypeSerializer<StreamRecord<T>> streamRecordSerializer =
(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
ListStateDescriptor<StreamRecord<T>> stateDesc =
new ListStateDescriptor<>("window-contents", streamRecordSerializer);
operator =
new EvictingWindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
evictor,
allowedLateness,
lateDataOutputTag);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
input.getType().createSerializer(getExecutionEnvironment().getConfig()));
operator =
new WindowOperator<>(windowAssigner,
windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
function,
trigger,
allowedLateness,
lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
}
全程都在構(gòu)建operator啊桨吊! 最后通過 return input.transform(opName, resultType, operator); 也就是說還是串在 KeyedStream上的哦威根!所以說WindowStream看似是一個 Stream其實只是為了構(gòu)建Window機制而提供的API,到真正Flink 運行的時候视乐,所有在KeyedStream定義的時間窗口洛搀,最終都會因為window function的調(diào)用返回一個DataStream,一個新的 Transformation被創(chuàng)建佑淀,窗口中的各種組件 windowAssigner 留美、trigger、evictor都會被打包在EvictingWindowOperator或者WindowOperator傳給這個Transformation伸刃,Transformation 為王盎牙!
我們來看看Transformation的operator對象捧颅,以WindowOperator類(AbstractUdfStreamOperator的子類)為例景图,看看他的processElement方法,代碼很長100多行碉哑,先看骨架:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
// 代碼塊1挚币,windowAssigner可以merge
} else {
// 代碼塊2,windowAssigner不可以merge
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
第一步扣典,調(diào)用windowAssigner.assignWindows給當前event element分配Window妆毕。
第二步,核心贮尖,會針對第一步window集合笛粘,一個window一個window一次處理,調(diào)用trigger.onElement方法,如果fire的話就會調(diào)用window function進行計算薪前。
第三部润努,處理side ouput,遲到的元素序六。
四任连、探究剖析--WindowAssigner類
Flink源碼中,WindowAssigner類對應(yīng)滾動窗口的類有TumblingEventTimeWindows和TumblingProcessingTimeWindows例诀,我們以TumblingEventTimeWindows為例随抠,二者區(qū)別主要是窗口時間使用Event Time還是Process Time。
先看一下WindowAssigner類源碼繁涂,可以看出主要包含四個抽象方法拱她。
/**
* WindowAssigner可以分配 0個或者多個 Windows 給 Event 元素.
* @param <T> Event 元素類別.
* @param <W> Window類別.
*/
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 返回Event element 應(yīng)該被分配的 window的集合
* @param timestamp :event 的時間戳.
*/
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
* 返回WindowAssigner默認的trigger
*/
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
/**
* 返回Window 的 TypeSerializer
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
* 是否是 event time
*/
public abstract boolean isEventTime();
/**
*其他省略
*/
}
方法名稱 | 方法用途 |
---|---|
assignWindows | 返回Event element 應(yīng)該被分配的 window的集合 |
getDefaultTrigger | 返回WindowAssigner默認的trigger |
getWindowSerializer | 返回Window 的 TypeSerializer |
isEventTime | 是否是 event time |
接下來 我們來看 繼承WindowAssigner類 的 TumblingEventTimeWindows類的具體實現(xiàn)。
/**
* 示例:keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
*/
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
// 窗口大小
private final long size;
// 偏移量
private final long offset;
protected TumblingEventTimeWindows(long size, long offset) {
if (offset < 0 || offset >= size) {
throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
}
this.size = size;
this.offset = offset;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// 根據(jù)滾動窗口機制扔罪,按照當前timestamp秉沼,計算對應(yīng)窗口的start時間,并返回對應(yīng)窗口
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Override
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
// 其他省略
@Override
public boolean isEventTime() {
return true;
}
}
我們可以看到矿酵,主要區(qū)別就是包含了size 和 offset兩個變量唬复,使得 assignWindows 實現(xiàn)的時候可以返回想要的翻滾窗口。另外上面代碼可以看出getDefaultTrigger返回的是 EventTimeTrigger類全肮。
接下來我們看一下Trigger類敞咧。
Trigger方法名稱 | 方法用途 |
---|---|
onElement | 每當有event element 被加到window中,會觸發(fā)辜腺。結(jié)果返回事件元素對應(yīng)的window是否可以進行window function計算休建。 |
onProcessingTime | timer 計時器觸發(fā)調(diào)用,使用的是process time 评疗。 |
onEventTime | 同上测砂,使用的是 event time。 |
canMerge | 是否支持 窗口合并百匆,如果返回true砌些,必須實現(xiàn)onMerge方法 |
onMerge | 當多個window被WindowAssigner合并的時候的調(diào)用。 |
clear | 清理相關(guān)window 的state |
TriggerContext | TriggerContext接口加匈,給Trigger提供state 處理和注冊Timer callback |
OnMergeContext | TriggerContext子接口寄症,onMerge方法使用,增加了mergePartitionedState方法矩动。 |
單獨整理TriggerContext 接口方法
TriggerContext方法名稱 | 方法用途 |
---|---|
getCurrentProcessingTime | 返回當前processing time |
getMetricGroup | 返回MetricGroup類對象 |
getCurrentWatermark | 返回當前Watermark time |
registerProcessingTimeTimer | 注冊time callback ,一旦到達time释漆,Trigger的onProcessingTime會被調(diào)用 |
registerEventTimeTimer | 同上悲没,當watermark 達到time,會觸發(fā)Trigger的onEventTime方法。 |
deleteProcessingTimeTimer | 刪除指定時間的processing time trigger |
deleteEventTimeTimer | 刪除指定時間的event time trigger |
getPartitionedState | 返回 State對象 |
getKeyValueState | 返回ValueState對象 |
/**
* @param <T> Event 元素類別.
* @param <W> Window類別.
*/
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
public boolean canMerge() {
return false;
}
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
public abstract void clear(W window, TriggerContext ctx) throws Exception;
// ------------------------------------------------------------------------
/**
* A context object that is given to {@link Trigger} methods to allow them to register timer
* callbacks and deal with state.
*/
public interface TriggerContext {
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
long getCurrentWatermark();
void registerProcessingTimeTimer(long time);
void registerEventTimeTimer(long time);
void deleteProcessingTimeTimer(long time);
void deleteEventTimeTimer(long time);
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
public interface OnMergeContext extends TriggerContext {
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
接下來我們看繼承Trigger的 EventTimeTrigger類的實現(xiàn):
/**
* EventTime使用watermark示姿,一旦 watermark 超過 the end of the window甜橱,EventTimeTrigger觸發(fā)
*/
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private EventTimeTrigger() {}
/**
* 判斷window的最大時間戳是否小于目前的watermark,小于的話返回TriggerResult.FIRE栈戳,否則的話為這個window注冊一個trimer岂傲,返回TriggerResult.CONTINUE。TriggerResult是個枚舉類型
*/
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
long windowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
ctx.registerEventTimeTimer(windowMaxTimestamp);
}
}
public static EventTimeTrigger create() {
return new EventTimeTrigger();
}
}
順便看一下枚舉類TriggerResult:
TriggerResult決定window將會發(fā)生什么子檀,如window function是否會調(diào)用镊掖,或者window是否被丟棄。當然如果window里面沒有任何數(shù)據(jù)褂痰,什么都不會發(fā)生亩进。
TriggerResult 值 | 解釋 |
---|---|
CONTINUE | 對于window來說什么都不會發(fā)生 |
FIRE_AND_PURGE | 觸發(fā)window function ,而且 purge |
FIRE | 觸發(fā)window function缩歪, window不會被purged |
PURGE | window會被丟棄归薛,里面所有的元素都被清理 |
public enum TriggerResult {
CONTINUE(false, false),
FIRE_AND_PURGE(true, true),
FIRE(true, false),
PURGE(false, true);
private final boolean fire;
private final boolean purge;
TriggerResult(boolean fire, boolean purge) {
this.purge = purge;
this.fire = fire;
}
public boolean isFire() {
return fire;
}
public boolean isPurge() {
return purge;
}
}