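Overview
- Windows are at the heart of Flink stream processing; the key is understanding how windows work and how to apply them.
- It is worth reading the official window documentation in full: https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/operators/windows.html
- These notes are based on the official documentation for Flink 1.9.0. The article is long and somewhat dry, so please read it patiently.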
Window types
- According to the official documentation:
The first snippet refers to keyed streams, while the second to non-keyed ones. As one can see, the only difference is the keyBy(...) call for the keyed streams and the window(...) which becomes windowAll(...) for non-keyed streams. This is also going to serve as a roadmap for the rest of the page.
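- As this shows, window operations come in two forms: on a KeyedStream and on a plain (non-keyed) DataStream. The main difference is that a keyed stream creates its windows with .window(...), while a non-keyed stream uses .windowAll(...). Windows on a KeyedStream can be computed by multiple tasks in parallel, and each logical keyed stream is processed independently.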
- Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
- Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
- Classified by window assigner, windows are tumbling windows, sliding windows, session windows, global windows, and custom windows.
- Each time-based kind can use either processing time or event time, so strictly speaking there are many window types.
- There is also the count window, which assigns elements by the number of elements that arrive.
Window lifecycle
- In a nutshell: a window is created as soon as the first element that belongs to it arrives, and it is completely removed once the time (event time or processing time) passes its end timestamp plus the user-specified allowed lateness.
- Every window has a Trigger and a window function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) attached to it.
Keyed vs Non-Keyed Windows
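- The first thing to decide is whether the DataStream should be keyed.
- Calling keyBy() splits the original DataStream into logical keyed streams; without keyBy() the stream is not split.
- With a keyed stream, any attribute of the incoming events can serve as the key. A keyed stream lets the windowed computation run in parallel across multiple tasks, because each logical keyed stream can be processed independently of the rest. All elements with the same key are sent to the same parallel task.
- With a non-keyed stream, the original DataStream is not split into multiple logical streams, and all window logic runs in a single task, i.e. with a parallelism of 1.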
Window Assigners
- After deciding whether the stream is keyed or non-keyed, the next step is to define a window assigner. The window assigner defines how elements are assigned to windows. Pass the WindowAssigner of your choice to window(...) (for keyed streams) or windowAll(...) (for non-keyed streams).
- A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink ships with predefined window assigners for the most common use cases: tumbling windows, sliding windows, session windows and global windows, four in total. You can also implement a custom window assigner by extending the WindowAssigner class.
- All built-in window assigners (except global windows) assign elements to windows based on time, which can be either event time or processing time. (See the event time section of the official documentation.)
- Time-based windows have a start timestamp (inclusive) and an end timestamp (exclusive), i.e. a half-open interval such as [5,10). Together they describe the window size; allowed lateness does not change the size, it only affects how long the window is kept around.
- In code, Flink uses the TimeWindow type for time-based windows. It has methods for querying the start and end timestamps, plus an additional method maxTimestamp() that returns the largest allowed timestamp for a given window.
Tumbling Windows
- A tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, with a tumbling window of 5 minutes, the current window is evaluated and a new window is started every five minutes.
- Usage:
java
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
- Time intervals can be specified with one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
- Tumbling window assigners also take an optional offset parameter that changes the alignment of the windows.
An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
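The effect of the offset can be checked with a little arithmetic. The following is a plain-Java sketch with no Flink dependency; the class and method names are hypothetical, and the alignment rule (window starts are multiples of the size, shifted by the offset) is an assumption of this sketch rather than code taken from Flink.

```java
import java.time.Duration;

// Plain-Java sketch (not Flink code): derive the tumbling window that
// contains a timestamp from the window size and an alignment offset.
class TumblingWindowSketch {

    // Start (inclusive) of the window containing `timestamp`; all values in milliseconds.
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative, also for negative inputs.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // End (exclusive) of the same window.
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long fiveSeconds = Duration.ofSeconds(5).toMillis();
        // Without an offset, 12_345 ms falls into the window [10_000, 15_000).
        System.out.println(windowStart(12_345L, 0L, fiveSeconds));

        long day = Duration.ofDays(1).toMillis();
        long minusEightHours = Duration.ofHours(-8).toMillis();
        // Epoch 0 is 08:00 in UTC+8, so the daily window starts 8 hours before epoch 0.
        System.out.println(windowStart(0L, minusEightHours, day));
    }
}
```

With an offset of -8 hours, daily windows start at 16:00 UTC, which is midnight in UTC+8.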
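Sliding Windows
- The sliding windows assigner assigns elements to windows of a fixed length. As with tumbling windows, the window size is fixed, and an additional slide parameter controls how often a new window starts. If the slide is smaller than the window size, windows overlap, so one element may be assigned to several windows.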
java
DataStream<T> input = ...;
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
- Time intervals can be specified with one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
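To make the overlap concrete, here is a plain-Java sketch (no Flink dependency, hypothetical names) that lists every window an element falls into for a given size and slide. The alignment rule is the same assumption as for tumbling windows: window starts are multiples of the slide, shifted by the offset.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code): enumerate the sliding windows
// that contain a given timestamp.
class SlidingWindowSketch {

    // Returns {start, end} pairs, start inclusive and end exclusive, newest window first.
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that began at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Size 10 s, slide 5 s: the element at 12 s belongs to two windows.
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For a size of 10 seconds and a slide of 5 seconds, every element ends up in two windows, which matches the overlap described above.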
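Session Windows
- The session windows assigner groups elements by sessions of activity. Session windows do not overlap and have no fixed start or end time.
- This is in clear contrast to tumbling windows and sliding windows: a session window closes when it has not received elements for a certain period of time, i.e. when a gap of inactivity occurs.
- A session window assigner can be configured with either a static session gap or a session gap extractor function that defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.
- API from the official documentation: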
java
DataStream<T> input = ...;
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
- Static gaps can be specified with one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on.
- Dynamic gaps are specified by implementing the SessionWindowTimeGapExtractor interface.
- Attention:
- Because session windows have no fixed start and end, they are evaluated differently from tumbling and sliding windows. Internally, the session window operator creates a new window for each arriving record and merges windows if they are closer to each other than the defined gap. To be mergeable, a session window operator requires a merging Trigger and a merging window function, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction (FoldFunction cannot merge).
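The "one window per record, then merge" idea can be illustrated without Flink. The sketch below is plain Java with hypothetical names. It gives every timestamp its own window [ts, ts + gap) and merges windows that overlap or touch; treating touching windows as mergeable is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (not Flink code): build session windows by giving each
// element its own window and merging windows that overlap or touch.
class SessionMergeSketch {

    // Returns {start, end} pairs in ascending order; timestamps and gap in milliseconds.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long start = ts;
            long end = ts + gap;
            if (!merged.isEmpty() && start <= merged.get(merged.size() - 1)[1]) {
                // The new window reaches into the previous session: extend that session.
                long[] last = merged.get(merged.size() - 1);
                last[1] = Math.max(last[1], end);
            } else {
                // The gap of inactivity was exceeded: start a new session.
                merged.add(new long[] {start, end});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // With a gap of 5, the elements 1, 2, 3 form one session and 20, 21 another.
        for (long[] s : sessions(new long[] {1, 2, 3, 20, 21}, 5)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

The sketch sorts its input first for simplicity; a streaming operator cannot do that and has to merge windows as records arrive, which is why the trigger and the window function must both support merging.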
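Global Windows
- A global windows assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise no computation is performed, because the global window has no natural end at which the aggregated elements could be processed.
- API from the official documentation: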
java
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
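Window Functions
- There are four kinds of window function: ReduceFunction, AggregateFunction, FoldFunction and ProcessWindowFunction. The first two execute more efficiently because Flink can aggregate the elements of each window incrementally as they arrive.
- A ProcessWindowFunction gets an Iterable of all the elements contained in a window, plus additional meta information about the window the elements belong to.
- Because Flink has to buffer all elements of a window internally before invoking the function, a windowed transformation with a ProcessWindowFunction is less efficient. However, a ProcessWindowFunction can be combined with one of the other window functions (ReduceFunction, AggregateFunction, or FoldFunction): the other function performs the incremental aggregation, and the ProcessWindowFunction receives the aggregated result together with the window metadata.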
ReduceFunction
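- A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window.
- API from the official documentation: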
java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
AggregateFunction
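- An AggregateFunction is a generalized version of a ReduceFunction with three type parameters: an input type (IN), an accumulator type (ACC), and an output type (OUT). The input type is the type of the elements in the input stream, and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one, and for extracting an output (of type OUT) from an accumulator.
- API from the official documentation: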
java
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());
scala
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)): Double = accumulator._1.toDouble / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
FoldFunction
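- A FoldFunction specifies how an input element of the window is combined with an element of the output type. The FoldFunction is called incrementally for each element added to the window, together with the current output value. The first element is combined with a predefined initial value of the output type.
- API from the official documentation: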
java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>() {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
- Attention:fold() cannot be used with session windows or other mergeable windows.
ProcessWindowFunction
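- A ProcessWindowFunction gets an Iterable containing all the elements of the window and a Context object with access to time and state information, which gives it more flexibility than the other window functions. This comes at the cost of performance and resource consumption, because elements cannot be aggregated incrementally and instead have to be buffered internally until the window is considered ready for processing.
- The key parameter is the key extracted via the KeySelector specified in the keyBy() call. With tuple-index keys or string field references, this key type is always Tuple, and you have to cast it manually to a tuple of the correct size to extract the key field.
- API from the official documentation: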
java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
/* ... */
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + " count: " + count);
}
}
scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
ProcessWindowFunction with Incremental Aggregation
- A ProcessWindowFunction can be combined with a ReduceFunction, an AggregateFunction, or a FoldFunction to aggregate elements incrementally as they arrive in the window. When the window is closed, the ProcessWindowFunction is provided with the aggregated result. This lets you compute windows incrementally while still having access to the additional window meta information of the ProcessWindowFunction.
Incremental Window Aggregation with ReduceFunction
- java
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
- scala
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((context.window.getStart, min))
}
)
Incremental Window Aggregation with AggregateFunction
- java
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}
- scala
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction())
// Function definitions
/**
* The accumulator is used to keep a running sum and a count. The [getResult] method
* computes the average.
*/
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
override def createAccumulator() = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)) =
(accumulator._1 + value._2, accumulator._2 + 1L)
override def getResult(accumulator: (Long, Long)): Double = accumulator._1.toDouble / accumulator._2
override def merge(a: (Long, Long), b: (Long, Long)) =
(a._1 + b._1, a._2 + b._2)
}
class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): Unit = {
val average = averages.iterator.next()
out.collect((key, average))
}
}
Incremental Window Aggregation with FoldFunction
- java
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}
- scala
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
Using per-window state in ProcessWindowFunction
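- In addition to accessing keyed state (as any rich function can), a ProcessWindowFunction can also use keyed state that is scoped to the window the function is currently processing.
- In this context it is important to understand which window the per-window state refers to. Different "windows" are involved:
- The window that was defined when specifying the windowed operation: this might be tumbling windows of 1 hour or sliding windows of 2 hours that slide by 1 hour.
- An actual instance of a defined window for a given key: this might be the time window from 12:00 to 13:00 for user-id xyz. It is based on the window definition, and there will be many such windows depending on the number of keys the job is currently processing and on the time slots the events fall into.
- Per-window state is tied to the latter of those two. This means that if we process events for 1000 different keys and the events for all of them currently fall into the [12:00,13:00) time window, there will be 1000 window instances, each with its own keyed per-window state.
- There are two methods on the Context object that a process() invocation receives that allow access to the two types of state:
- globalState(), which allows access to keyed state that is not scoped to a window
- windowState(), which allows access to keyed state that is also scoped to the window
- This feature is helpful if you expect the same window to fire multiple times, for example when there are late firings for late-arriving data or when a custom trigger does speculative early firings. In such cases you would store information about previous firings, or the number of firings, in per-window state.
- When using windowed state it is important to also clean up that state when the window is cleared. This should happen in the clear() method.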
Triggers
- A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger.
- If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).
- The Trigger interface has five methods that allow a trigger to react to different events:
- onElement(): called for each element that is added to a window.
- onEventTime(): called when a registered event-time timer fires.
- onProcessingTime(): called when a registered processing-time timer fires.
- onMerge(): relevant for stateful triggers; it merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
- clear(): finally, performs any action needed upon removal of the corresponding window.
- Two things to note about the methods above:
- 1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:
- CONTINUE: do nothing,
- FIRE: trigger the computation,
- PURGE: clear the elements in the window, and
- FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
- 2) Any of these methods can be used to register processing-time or event-time timers for future actions.
Fire and Purge
- Once a trigger determines that a window is ready for processing, it fires, i.e. it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. For a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction (possibly after passing them through an evictor). Windows with a ReduceFunction, AggregateFunction, or FoldFunction simply emit their eagerly aggregated result.
- When a trigger fires, it can return either FIRE or FIRE_AND_PURGE. FIRE keeps the contents of the window, whereas FIRE_AND_PURGE removes them. By default, the pre-implemented triggers simply FIRE without purging the window state.
- Purging simply removes the contents of the window; any meta information about the window and any trigger state are kept.
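The difference between FIRE and FIRE_AND_PURGE is easy to see in a small simulation. The following plain-Java sketch does not use Flink; the Result enum, the count-based trigger rule, and the sum as window function are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code): one window buffer, a count-based
// trigger rule, and a sum as the "window function".
class FirePurgeSketch {

    // Hypothetical stand-in for the four trigger results described above.
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    // Adds the elements one by one. Whenever the buffer size is a multiple of
    // `limit`, the trigger returns `onLimit`; otherwise it returns CONTINUE.
    // Returns the sums emitted at each firing.
    static List<Integer> run(int[] elements, int limit, Result onLimit) {
        List<Integer> buffer = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        for (int element : elements) {
            buffer.add(element);
            Result result = (buffer.size() % limit == 0) ? onLimit : Result.CONTINUE;
            if (result == Result.FIRE || result == Result.FIRE_AND_PURGE) {
                int sum = 0;
                for (int value : buffer) {
                    sum += value;
                }
                emitted.add(sum); // the window function sees the current buffer
            }
            if (result == Result.PURGE || result == Result.FIRE_AND_PURGE) {
                buffer.clear(); // purge drops the window contents only
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4};
        System.out.println(run(input, 2, Result.FIRE));           // [3, 10]
        System.out.println(run(input, 2, Result.FIRE_AND_PURGE)); // [3, 7]
    }
}
```

With FIRE the second result includes the elements of the first firing again; with FIRE_AND_PURGE each firing only sees the elements that arrived since the previous one.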
Built-in and Custom Triggers
- The EventTimeTrigger fires based on the progress of event-time as measured by watermarks.
- The ProcessingTimeTrigger fires based on processing time.
- The CountTrigger fires once the number of elements in a window exceeds the given limit.
- The PurgingTrigger takes as argument another trigger and transforms it into a purging one.
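Evictors
- In addition to the WindowAssigner and the Trigger, Flink's windowing model allows an optional Evictor to be specified.
- An evictor can remove elements from a window after the trigger fires and before and/or after the window function is applied.
- The Evictor interface has two methods: evictBefore() and evictAfter().
- Flink comes with three pre-implemented evictors:
- CountEvictor: keeps up to a user-specified number of elements in the window and discards the remaining ones from the beginning of the window buffer.
- DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining ones, and removes the elements whose delta is greater than or equal to the threshold.
- TimeEvictor: takes an interval in milliseconds as its argument; for a given window it finds the maximum timestamp max_ts among its elements and removes all elements with a timestamp smaller than (max_ts - interval).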
- By default, all the pre-implemented evictors apply their logic before the window function.
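The count-based and time-based eviction rules described above can be sketched in plain Java. This is not Flink code: the class and method names are hypothetical, and the time rule follows the wording above (elements with a timestamp smaller than max_ts - interval are removed), so the exact boundary behaviour is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code) of count-based and time-based eviction
// applied to a window buffer before the window function runs.
class EvictorSketch {

    // Keeps the last `maxCount` elements and discards the rest from the front.
    static <T> List<T> keepLast(List<T> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    // Keeps the timestamps that are not older than (max timestamp - interval).
    static List<Long> keepRecent(List<Long> timestamps, long intervalMillis) {
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts >= maxTs - intervalMillis) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(keepLast(List.of(1, 2, 3, 4, 5), 3));                      // [3, 4, 5]
        System.out.println(keepRecent(List.of(1_000L, 4_000L, 9_000L, 10_000L), 5_000L)); // [9000, 10000]
    }
}
```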
Allowed Lateness
- By default, late elements are dropped once the watermark passes the end of the window. However, Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped, and its default value is 0. Elements that arrive after the watermark has passed the end of the window but before it passes the end of the window plus the allowed lateness are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the EventTimeTrigger.
- By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.
- API from the official documentation:
java
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
scala
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
- When using the GlobalWindows window assigner no data is ever considered late because the end timestamp of the global window is Long.MAX_VALUE.
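The three possible fates of an element (on time, late but accepted, dropped) can be written as a small decision function. This is a plain-Java sketch without Flink; the names are hypothetical, and the exact boundary comparisons against the watermark are assumptions of this sketch, not a statement about Flink's internals.

```java
// Plain-Java sketch (not Flink code): classify an element by comparing the
// current watermark with the end of the element's window and the allowed lateness.
class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

    // windowEnd is exclusive; all values are in milliseconds.
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp that belongs to the window
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME; // the watermark has not reached the window end yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED; // past the end, still within the lateness bound
        }
        return Outcome.DROPPED; // the window has already been cleaned up
    }

    public static void main(String[] args) {
        // Window [5_000, 10_000) with 2 seconds of allowed lateness.
        System.out.println(classify(10_000L, 2_000L, 9_000L));
        System.out.println(classify(10_000L, 2_000L, 10_500L));
        System.out.println(classify(10_000L, 2_000L, 12_000L));
    }
}
```

With an allowed lateness of 0 the middle case disappears, which matches the default behaviour described above.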
Getting late data as a side output
- Using Flink’s side output feature you can get a stream of the data that was discarded as late.
- You first need to specify that you want to get late data using sideOutputLateData(OutputTag) on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation:
- API from the official documentation:
java
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
scala
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
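Late elements considerations
- When the allowed lateness is greater than 0, the window and its contents are kept after the watermark passes the end of the window. In these cases, when a late but not dropped element arrives, it may trigger another firing of the window. These firings are called late firings, because they are triggered by late events, in contrast to the main firing, which is the first firing of the window. For session windows, late firings can additionally lead to windows being merged, because a late element may "bridge" the gap between two pre-existing, unmerged windows.
- Be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e. your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.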
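Consecutive windowed operations
- The way the timestamp of a windowed result is computed and the way watermarks interact with windows allow consecutive windowed operations to be strung together. This is useful when you want to run two consecutive windowed operations with different keys but still want elements from the same upstream window to end up in the same downstream window. Consider this example:
java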
DataStream<Integer> input = ...;
DataStream<Integer> resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer());
DataStream<Integer> globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction());
scala
val input: DataStream[Int] = ...
val resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer())
val globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction())
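Useful state size considerations
- Windows can be defined over long periods of time (such as days, weeks, or months) and can therefore accumulate very large state. There are a few rules to keep in mind when estimating the storage requirements of a windowed computation:
- Flink creates one copy of each element per window it belongs to. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several copies of each element, as explained in the window assigners section. A sliding window with a size of 1 day and a slide of 1 second is therefore probably not a good idea.
- ReduceFunction, AggregateFunction, and FoldFunction can significantly reduce the storage requirements, because they eagerly aggregate elements and store only one value per window. In contrast, using a ProcessWindowFunction alone requires accumulating all elements.
- Using an Evictor prevents any pre-aggregation, because all elements of a window have to be passed through the evictor before the computation is applied (see Evictors).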
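The warning about a 1-day window sliding every second becomes clearer with the numbers written out. The plain-Java sketch below (hypothetical names, no Flink dependency) computes an upper bound on how many windows, and therefore how many buffered copies, each element lands in when no pre-aggregation is used.

```java
import java.time.Duration;

// Plain-Java sketch (not Flink code): upper bound on the number of sliding
// windows, and therefore buffered copies, that a single element belongs to.
class StateSizeSketch {

    static long copiesPerElement(Duration size, Duration slide) {
        long sizeMillis = size.toMillis();
        long slideMillis = slide.toMillis();
        // Ceiling division: when size is a multiple of slide this is exactly size / slide.
        return (sizeMillis + slideMillis - 1) / slideMillis;
    }

    public static void main(String[] args) {
        // Tumbling window (slide equals size): one copy per element.
        System.out.println(copiesPerElement(Duration.ofMinutes(5), Duration.ofMinutes(5)));
        // 1-day window sliding by 1 second: 86400 copies per element.
        System.out.println(copiesPerElement(Duration.ofDays(1), Duration.ofSeconds(1)));
    }
}
```

Each incoming element would be held in 86,400 windows at once, which is why an incremental aggregation function or a coarser slide is usually the better choice.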
- Reference blog post: "Flink窗口介绍及应用" (an introduction to Flink windows and their applications)