Flink: Understanding Windows Based on the Official Documentation

Overview

Window Types

  • From the official documentation (referring to the two snippets below): The first snippet refers to keyed streams, while the second to non-keyed ones. As one can see, the only difference is the keyBy(...) call for the keyed streams and the window(...) which becomes windowAll(...) for non-keyed streams. This is also going to serve as a roadmap for the rest of the page.
  • In other words, windowing comes in two flavors: on a KeyedStream and on a plain (non-keyed) DataStream. When defining the window, the only difference is .window(...) for the former and .windowAll(...) for the latter. Windows on a KeyedStream let the computation run in parallel across multiple tasks, because each logical keyed stream is processed independently.
  • Keyed Windows
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
  • Non-Keyed Windows
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
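  • By window assigner, windows can be classified into tumbling windows, sliding windows, session windows, global windows and custom windows;
  • Each of these can be based on either processing time or event time, so strictly speaking there are quite a few window types;
  • There is also the count window, which assigns elements to windows based on the number of elements that have arrived;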

Window Lifecycle

  • In short: a window is created as soon as the first element that belongs to it arrives, and it is completely removed when time (event time or processing time) passes its end timestamp plus the user-specified allowed lateness;
  • Each window has a Trigger and a window function (ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction) attached to it.

Keyed vs Non-Keyed Windows

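  • The first thing to decide is whether the DataStream should be keyed with keyBy();
  • If keyBy() is used, the initial DataStream is split into a keyed stream; if it is not used, the stream is not split;
  • For a keyed stream, any attribute of the incoming events can be used as the key. A keyed stream allows the windowed computation to be performed in parallel by multiple tasks, because each logical keyed stream can be processed independently of the rest; all elements with the same key are sent to the same parallel task;
  • For a non-keyed stream, the original DataStream is not split into multiple logical streams, and all windowing logic is performed by a single task, i.e. with a parallelism of 1;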
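
To make the routing idea concrete, here is a minimal stdlib-only Java sketch (not Flink code). It assumes a simplified routing rule, hashCode() modulo parallelism; real Flink first hashes the key into a key group and then maps key groups to tasks, but the property illustrated is the same: every element with a given key lands on one task, and a non-keyed stream behaves like a single task. KeyRoutingSketch, taskFor and route are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of keyBy(): each key is routed to exactly one parallel task.
final class KeyRoutingSketch {

    // Assumption: hashCode modulo parallelism. Real Flink first maps a key to a
    // key group (via a murmur hash of hashCode()) and then the key group to a task.
    static int taskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys by the task index they would be routed to.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> tasks = new TreeMap<>();
        for (String key : keys) {
            tasks.computeIfAbsent(taskFor(key, parallelism), t -> new ArrayList<>()).add(key);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("user-a", "user-b", "user-a", "user-c", "user-b");
        System.out.println("keyed, 3 tasks:    " + route(keys, 3));
        // windowAll() on a non-keyed stream behaves like a single task
        System.out.println("non-keyed, 1 task: " + route(keys, 1));
    }
}
```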

Window Assigners

  • After deciding whether the stream is keyed or non-keyed, the next step is to define a window assigner. The WindowAssigner defines how elements are assigned to windows, and it is passed to window(...) (for keyed streams) or windowAll() (for non-keyed streams);
  • A WindowAssigner is responsible for assigning each incoming element to one or more windows. For the most common use cases Flink ships with four predefined window assigners: tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class.
  • All built-in window assigners except global windows assign elements to windows based on time, which can be either event time or processing time (see the Event Time page of the official documentation).
  • A time-based window has a start timestamp (inclusive) and an end timestamp (exclusive), i.e. it covers a half-open interval such as [5, 10); together they determine the window size (end - start). Allowed lateness does not change the size; it only controls how long the window is kept after its end.
  • In code, Flink represents time-based windows with the TimeWindow type, which has methods for querying the start and end timestamps, plus an additional method maxTimestamp() that returns the largest timestamp allowed in the window (end - 1).
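
A tiny stand-in for the half-open interval semantics, assuming nothing beyond what is described above. SimpleTimeWindow is a hypothetical class, not Flink's TimeWindow; it only mirrors the [start, end) convention and the maxTimestamp() == end - 1 relationship.

```java
// Hypothetical stand-in for Flink's TimeWindow: covers the half-open interval [start, end).
final class SimpleTimeWindow {
    final long start; // inclusive
    final long end;   // exclusive

    SimpleTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // The size depends only on start and end; allowed lateness plays no part in it.
    long size() {
        return end - start;
    }

    // Largest timestamp that still belongs to the window (same idea as TimeWindow.maxTimestamp()).
    long maxTimestamp() {
        return end - 1;
    }

    boolean contains(long timestamp) {
        return timestamp >= start && timestamp < end;
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }

    public static void main(String[] args) {
        SimpleTimeWindow w = new SimpleTimeWindow(5, 10);
        System.out.println(w + " size=" + w.size() + " maxTimestamp=" + w.maxTimestamp());
        System.out.println("contains(5)=" + w.contains(5) + " contains(10)=" + w.contains(10));
    }
}
```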

Tumbling Windows

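  • A tumbling window assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes, as illustrated by the following figure.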


    [Figure: tumbling windows]
  • Usage
    java
DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

scala

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
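  • Time intervals can be specified using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on;
  • Tumbling window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without an offset, hourly tumbling windows are aligned with the epoch, giving windows such as 1:00:00.000 - 1:59:59.999; with an offset of 15 minutes you would get 1:15:00.000 - 2:14:59.999;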
  • An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
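
The alignment rule can be sketched in plain Java. This is a sketch that assumes the window start is computed as timestamp - floorMod(timestamp - offset, size), which as far as I know matches what Flink's TimeWindow.getWindowStartWithOffset does; TumblingAssignSketch.windowStart is a hypothetical helper. It reproduces the UTC+8 daily window and an hourly window shifted by 15 minutes.

```java
// Sketch of tumbling-window alignment; windowStart is a hypothetical helper, not a Flink API.
final class TumblingAssignSketch {

    // Start of the tumbling window of length `size` (ms), shifted by `offset` (ms), that
    // contains `timestamp`. Math.floorMod keeps the result correct for negative offsets like -8h.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        long day = 24 * hour;
        long ts = 16 * hour + 30 * minute; // 1970-01-01 16:30 UTC in epoch millis

        // Daily window without offset starts at 00:00 UTC
        System.out.println(windowStart(ts, 0, day) / hour);              // 0
        // Offset -8h: the daily window starts at 16:00 UTC, which is 00:00 in UTC+8
        System.out.println(windowStart(ts, -8 * hour, day) / hour);      // 16
        // Hourly windows with a 15-minute offset start at xx:15 (16:15 = minute 975)
        System.out.println(windowStart(ts, 15 * minute, hour) / minute); // 975
    }
}
```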

Sliding Windows

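  • The sliding window assigner assigns elements to windows of fixed length. As with tumbling windows, the size is set by the window size parameter; an additional window slide parameter controls how often a new window is started. If the slide is smaller than the window size, windows overlap and an element can be assigned to multiple windows. For example, with 10-minute windows that slide by 5 minutes, every 5 minutes you get a window containing the events of the last 10 minutes.
    [Figure: sliding windows]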

    java
DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

scala

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
  • Time intervals can be specified using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on;
  • An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
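
Assigning one element to several overlapping windows can be sketched as follows, assuming Flink's approach of walking back from the latest window start in steps of the slide. SlidingAssignSketch.assign is a hypothetical helper, and windows are represented as {start, end} arrays for brevity. With a 10 s window sliding by 5 s, an element at t = 12 s belongs to two windows.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment; windows are {start, end} arrays with end exclusive.
final class SlidingAssignSketch {

    // Returns every window of length `size` starting every `slide` ms (shifted by `offset`)
    // that contains `ts`, walking back from the latest window start.
    static List<long[]> assign(long ts, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts - offset, slide); // latest start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 10 s windows sliding by 5 s, element at t = 12 s
        for (long[] w : assign(12_000, 10_000, 5_000, 0)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [10000, 20000) and then [5000, 15000)
    }
}
```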

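Session Windows

  • A session window assigner groups elements by sessions of activity. Session windows do not overlap and do not have a fixed start and end time.
  • This is the main difference from tumbling and sliding windows: a session window closes when it does not receive elements for a certain period of time, i.e. when a gap of inactivity occurs.
  • A session window assigner can be configured with either a static session gap or a session gap extractor function that defines how long the period of inactivity is. When this period expires, the current session closes and subsequent elements are assigned to a new session window.


    [Figure: session windows]
  • Official API examples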
    java
DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

scala

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
  • A static session gap can be specified using one of Time.milliseconds(x), Time.seconds(x), Time.minutes(x), and so on;
  • A dynamic session gap is specified by implementing the SessionWindowTimeGapExtractor interface;
  • Attention:
  • Since session windows do not have a fixed start and end, they are evaluated differently from tumbling and sliding windows. Internally, a session window operator creates a new window for each arriving record and merges windows together if they are closer to each other than the defined gap. To be mergeable, a session window operator requires a merging Trigger and a merging window function, such as ReduceFunction, AggregateFunction, or ProcessWindowFunction (FoldFunction cannot merge).
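
The merging behaviour can be sketched in plain Java, assuming each element first gets its own window [ts, ts + gap) and that windows which overlap or touch are merged. For simplicity the sketch sorts all timestamps up front; the real operator merges incrementally as records arrive. SessionMergeSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of session-window merging; windows are {start, end} arrays with end exclusive.
final class SessionMergeSketch {

    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // simplification: the real operator merges incrementally
        List<long[]> merged = new ArrayList<>();
        for (long ts : sorted) {
            long[] window = {ts, ts + gap}; // every element starts with its own window
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && window[0] <= last[1]) {
                last[1] = Math.max(last[1], window[1]); // overlapping or touching: merge
            } else {
                merged.add(window); // inactivity longer than the gap: new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 3_000, 4_000, 20_000, 22_000};
        for (long[] s : sessions(ts, 5_000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 9000) and then [20000, 27000)
    }
}
```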

Global Windows

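  • A global window assigner assigns all elements with the same key to the same single global window. This windowing scheme is only useful if you also specify a custom trigger. Otherwise, no computation will be performed, because the global window does not have a natural end at which the aggregated elements could be processed;
  • Official API examples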
    java
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

scala

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

Window Functions

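  • A window function is one of four kinds: ReduceFunction, AggregateFunction, FoldFunction or ProcessWindowFunction. The first two can be executed more efficiently because Flink can incrementally aggregate the elements of each window as they arrive.
  • A ProcessWindowFunction gets an Iterable over all elements contained in a window, plus additional meta information about the window the elements belong to.
  • Because Flink has to buffer all elements of a window internally before invoking a ProcessWindowFunction, a windowed transformation that uses one cannot be executed as efficiently. This can be mitigated by combining the ProcessWindowFunction with a ReduceFunction, AggregateFunction, or FoldFunction: the latter aggregates the elements incrementally, while the ProcessWindowFunction receives the aggregated result together with the window metadata.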

ReduceFunction

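  • A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window.
  • Official API examples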
    java
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

scala

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

AggregateFunction

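  • An AggregateFunction is a generalized version of a ReduceFunction with three types: an input type (IN), an accumulator type (ACC), and an output type (OUT). The input type is the type of elements in the input DataStream, and the AggregateFunction has a method for adding one input element to an accumulator. The interface also has methods for creating an initial accumulator, for merging two accumulators into one, and for extracting an output (of type OUT) from an accumulator.
  • Official API examples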
    java
/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

scala

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // toDouble avoids integer division; OUT is Double
  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

FoldFunction

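  • A FoldFunction specifies how an input element of the window is combined with an element of the output type. The FoldFunction is called incrementally for each element added to the window, together with the current output value. The first element is combined with a predefined initial value of the output type.
  • Official API examples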
    java
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
       public String fold(String acc, Tuple2<String, Long> value) {
         return acc + value.f1;
       }
    });

scala

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("") { (acc, v) => acc + v._2 }
  • Attention: fold() cannot be used with session windows or other mergeable windows.

ProcessWindowFunction

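  • A ProcessWindowFunction gets an Iterable containing all the elements of the window and a Context object with access to time and state information, which gives it more flexibility than the other window functions. This comes at the cost of performance and resource consumption, because elements cannot be aggregated incrementally but have to be buffered internally until the window is considered ready for processing.
  • The key parameter is the key extracted via the KeySelector specified for the keyBy() invocation. In case of tuple-index keys or string-field references, this key type is always Tuple, and you have to manually cast it to a tuple of the correct size to extract the key fields.
  • Official API examples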
    java
DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + " count: " + count);
  }
}

scala

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

ProcessWindowFunction with Incremental Aggregation

  • A ProcessWindowFunction can be combined with a ReduceFunction, AggregateFunction, or FoldFunction to incrementally aggregate elements as they arrive in the window. When the window is closed, the ProcessWindowFunction is provided with the aggregated result. This allows the window to be computed incrementally while still having access to the additional window meta information of the ProcessWindowFunction.
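
A minimal, Flink-free model of what "incremental aggregation plus window metadata" means, assuming an average like the AverageAggregate example: only a running sum and count are stored per key and window, and at firing time the process step sees a single aggregated value together with the key and window start. All names here are hypothetical.

```java
// Hypothetical model of aggregate(AggregateFunction, ProcessWindowFunction) for one key and window.
final class IncrementalAggSketch {
    private final String key;
    private final long windowStart;
    private long sum;   // accumulator: running sum ...
    private long count; // ... and count, like AverageAggregate

    IncrementalAggSketch(String key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    // Incremental step: called once per arriving element; the element itself is not stored.
    void add(long value) {
        sum += value;
        count++;
    }

    // Firing: the "process" step sees only the single aggregated value plus window metadata.
    String fire() {
        double average = (double) sum / count;
        return key + " window@" + windowStart + " avg=" + average;
    }

    public static void main(String[] args) {
        IncrementalAggSketch w = new IncrementalAggSketch("sensor-1", 0L);
        for (long v : new long[] {3, 5, 10}) {
            w.add(v);
        }
        System.out.println(w.fire()); // sensor-1 window@0 avg=6.0
    }
}
```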

Incremental Window Aggregation with ReduceFunction

  • java
DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}
  • scala
val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((context.window.getStart, min))
      }
  )

Incremental Window Aggregation with AggregateFunction

  • java
DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}
  • scala
val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  // toDouble avoids integer division; OUT is Double
  override def getResult(accumulator: (Long, Long)) = accumulator._1.toDouble / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

  def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): Unit = {
    val average = averages.iterator.next()
    out.collect((key, average))
  }
}

Incremental Window Aggregation with FoldFunction

  • java
DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .fold(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyFoldFunction
    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {

  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
      Integer cur = acc.getField(2);
      acc.setField(cur + 1, 2);
      return acc;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Tuple3<String, Long, Integer>> counts,
                    Collector<Tuple3<String, Long, Integer>> out) {
    Integer count = counts.iterator().next().getField(2);
    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
  }
}
  • scala
val input: DataStream[SensorReading] = ...

input
 .keyBy(<key selector>)
 .timeWindow(<duration>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )

Using per-window state in ProcessWindowFunction

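  • In addition to accessing keyed state (as any rich function can), a ProcessWindowFunction can also use keyed state that is scoped to the window the function is currently processing.
  • In this context it is important to understand which window per-window state refers to. Two different "windows" are involved:
  • The window that was defined when specifying the windowed operation: this might be tumbling windows of 1 hour, or sliding windows of 2 hours that slide by 1 hour.
  • An actual instance of a defined window for a given key: this might be the time window from 12:00 to 13:00 for user-id xyz. Instances are based on the window definition, and there will be many of them depending on the number of keys the job is currently processing and on which time slots the events fall into.
  • Per-window state is tied to the latter of the two. This means that if we process events for 1000 different keys and events for all of them currently fall into the [12:00, 13:00) time window, then there will be 1000 window instances, each with its own keyed per-window state.
  • The Context object received by process() has two methods that give access to the two types of state:
    • globalState(): gives access to keyed state that is not scoped to a window
    • windowState(): gives access to keyed state that is also scoped to the window
  • This is useful when you expect multiple firings for the same window, as can happen with late firings for data that arrives late, or with a custom trigger that does speculative early firings. In such a case you would store information about previous firings, or the number of firings, in per-window state;
  • When using windowed state, it is important to also clean up that state when the window is cleared. This should happen in the clear() method.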
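
The difference between the two scopes can be sketched with ordinary maps, assuming global state is keyed by the key alone and window state by (key, window). PerWindowStateSketch, onFiring and clear are hypothetical names; in Flink you would use state handles obtained through globalState() and windowState() instead. The sketch counts firings per window, the use case mentioned above, and drops the window's entry in clear().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two state scopes available to a ProcessWindowFunction.
final class PerWindowStateSketch {
    final Map<String, Integer> globalState = new HashMap<>(); // scoped to the key only
    final Map<String, Integer> windowState = new HashMap<>(); // scoped to (key, window)

    private static String scope(String key, long windowStart) {
        return key + "|" + windowStart;
    }

    // What process() might do on each firing: count firings per key and per window instance.
    int onFiring(String key, long windowStart) {
        globalState.merge(key, 1, Integer::sum);
        return windowState.merge(scope(key, windowStart), 1, Integer::sum); // 1 = first firing
    }

    // Mirrors clear(): per-window state must be dropped when the window is removed.
    void clear(String key, long windowStart) {
        windowState.remove(scope(key, windowStart));
    }

    public static void main(String[] args) {
        PerWindowStateSketch s = new PerWindowStateSketch();
        System.out.println(s.onFiring("xyz", 12));    // 1: main firing of the 12:00 window
        System.out.println(s.onFiring("xyz", 12));    // 2: late firing of the same window
        System.out.println(s.onFiring("xyz", 13));    // 1: a different window instance
        System.out.println(s.globalState.get("xyz")); // 3: global state spans all windows
        s.clear("xyz", 12);
        System.out.println(s.windowState.keySet());   // [xyz|13]
    }
}
```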

Triggers

  • A Trigger determines when a window is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger.
  • If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).
  • The Trigger interface has five methods that allow a trigger to react to different events:
    • onElement(): called for each element that is added to a window.
    • onEventTime(): called when a registered event-time timer fires.
    • onProcessingTime(): called when a registered processing-time timer fires.
    • onMerge(): relevant for stateful triggers; merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
    • clear(): finally, performs any action needed upon removal of the corresponding window.
  • Two things to note about the methods above:
  • 1) The first three decide how to act on their invocation event by returning a TriggerResult. The action can be one of the following:
  • CONTINUE: do nothing,
  • FIRE: trigger the computation,
  • PURGE: clear the elements in the window without triggering the computation,
  • FIRE_AND_PURGE: trigger the computation and clear the elements in the window afterwards.
  • 2) Any of these methods can be used to register processing-time or event-time timers for future actions.
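
A stdlib-only sketch of the contract described above, assuming a trigger that fires every maxCount elements, which is how I understand Flink's CountTrigger to behave. The enum mimics the four TriggerResult values; TriggerSketch and CountingTrigger are hypothetical and keep the count in a plain field rather than in Flink's partitioned trigger state.

```java
// Hypothetical model of the Trigger contract; not Flink's Trigger class.
final class TriggerSketch {

    // Mirrors the four TriggerResult values: whether to fire and whether to purge.
    enum TriggerResult {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        TriggerResult(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Fires every maxCount elements, in the spirit of CountTrigger; ignores time entirely.
    static final class CountingTrigger {
        private final long maxCount;
        private long count; // Flink would keep this in per-window trigger state

        CountingTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        TriggerResult onElement() {
            count++;
            if (count >= maxCount) {
                count = 0; // reset so the next batch of maxCount elements fires again
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }

        TriggerResult onEventTime(long time) {
            return TriggerResult.CONTINUE;
        }

        TriggerResult onProcessingTime(long time) {
            return TriggerResult.CONTINUE;
        }

        void clear() {
            count = 0;
        }
    }

    public static void main(String[] args) {
        CountingTrigger trigger = new CountingTrigger(3);
        for (int i = 1; i <= 7; i++) {
            System.out.println("element " + i + " -> " + trigger.onElement());
        }
    }
}
```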

Fire and Purge

  • Once a trigger determines that a window is ready for processing, it fires, i.e. it returns FIRE or FIRE_AND_PURGE. This is the signal for the window operator to emit the result of the current window. Given a window with a ProcessWindowFunction, all elements are passed to the ProcessWindowFunction (possibly after passing them through an evictor). Windows with a ReduceFunction, AggregateFunction, or FoldFunction simply emit their eagerly aggregated result.
  • FIRE keeps the contents of the window, while FIRE_AND_PURGE removes them. By default, the pre-implemented triggers simply FIRE without purging the window state.
  • Purging simply removes the contents of the window and leaves any potential meta-information about the window and any trigger state intact.
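
The effect of FIRE versus FIRE_AND_PURGE on the window contents can be illustrated with a plain list standing in for the window buffer. This is a hypothetical sketch (FirePurgeSketch.apply is not a Flink API); it models only the element contents, not the window metadata or trigger state that purging keeps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of how a trigger result affects the window contents.
final class FirePurgeSketch {
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    // Returns the elements handed to the window function, or null if nothing fires.
    static List<Integer> apply(Result result, List<Integer> contents) {
        boolean fire = result == Result.FIRE || result == Result.FIRE_AND_PURGE;
        boolean purge = result == Result.PURGE || result == Result.FIRE_AND_PURGE;
        List<Integer> emitted = fire ? new ArrayList<Integer>(contents) : null;
        if (purge) {
            contents.clear(); // only the contents go; metadata/trigger state are not modelled
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> contents = new ArrayList<>(Arrays.asList(1, 2));
        System.out.println(apply(Result.FIRE, contents) + " remaining " + contents);           // [1, 2] remaining [1, 2]
        contents.add(3);
        System.out.println(apply(Result.FIRE_AND_PURGE, contents) + " remaining " + contents); // [1, 2, 3] remaining []
    }
}
```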

Built-in and Custom Triggers

  • The EventTimeTrigger fires based on the progress of event-time as measured by watermarks.
  • The ProcessingTimeTrigger fires based on processing time.
  • The CountTrigger fires once the number of elements in a window reaches the given limit.
  • The PurgingTrigger takes as argument another trigger and transforms it into a purging one.

Evictors

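  • Besides the WindowAssigner and the Trigger, Flink's window model also allows specifying an optional Evictor.
  • An evictor can remove elements from a window after the trigger fires, before and/or after the window function is applied;
  • The Evictor interface has two methods: evictBefore() and evictAfter();
  • Flink comes with three pre-implemented evictors:
    • CountEvictor: keeps up to a user-specified number of elements in the window and discards the remaining ones from the beginning of the window buffer.
    • DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining ones, and removes those whose delta is greater than or equal to the threshold.
    • TimeEvictor: takes an interval in milliseconds as argument; for a given window it finds the maximum timestamp max_ts among its elements and removes all elements with timestamps smaller than max_ts - interval.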
  • By default, all the pre-implemented evictors apply their logic before the window function.
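
The three eviction rules, as described above, can be sketched over plain lists of long values. Assumptions: elements are represented only by their timestamps (or values), the delta function for the delta case is a hypothetical absolute difference, and the exact boundary handling in Flink's own evictors may differ from this reading of the documentation. EvictorSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical re-statements of the three built-in eviction rules over plain lists.
final class EvictorSketch {

    // Like CountEvictor: keep at most maxCount elements, dropping from the start of the buffer.
    static void evictByCount(List<Long> buffer, int maxCount) {
        while (buffer.size() > maxCount) {
            buffer.remove(0); // index-based remove
        }
    }

    // Like TimeEvictor as described above: drop elements older than (max_ts - interval).
    static void evictByTime(List<Long> timestamps, long interval) {
        if (timestamps.isEmpty()) {
            return;
        }
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        long cutoff = maxTs - interval;
        timestamps.removeIf(ts -> ts < cutoff);
    }

    // Like DeltaEvictor with a hypothetical delta |last - v|: drop elements whose delta >= threshold.
    static void evictByDelta(List<Long> values, long threshold) {
        if (values.isEmpty()) {
            return;
        }
        long last = values.get(values.size() - 1);
        values.removeIf(v -> Math.abs(last - v) >= threshold);
    }

    public static void main(String[] args) {
        List<Long> buffer = new ArrayList<>(Arrays.asList(1000L, 5000L, 7000L, 9000L));
        evictByTime(buffer, 5_000);  // max_ts = 9000, cutoff = 4000
        System.out.println(buffer);  // [5000, 7000, 9000]
        evictByCount(buffer, 2);
        System.out.println(buffer);  // [7000, 9000]
    }
}
```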

Allowed Lateness

  • By default, late elements are dropped when the watermark has passed the end of the window. However, Flink lets you specify a maximum allowed lateness for window operators. Allowed lateness specifies by how much time elements can be late before they are dropped. Elements that arrive after the watermark has passed the end of the window, but before it passes the end of the window plus the allowed lateness, are still added to the window. Depending on the trigger used, a late but not dropped element may cause the window to fire again. This is the case for the EventTimeTrigger.
  • By default, the allowed lateness is set to 0. That is, elements that arrive behind the watermark will be dropped.
  • Official API examples
    java
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

scala

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>)
  • When using the GlobalWindows window assigner no data is ever considered late because the end timestamp of the global window is Long.MAX_VALUE。
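
The decision for an element arriving in an event-time window can be sketched as a function of the window end, the allowed lateness and the current watermark. This is a simplified model that assumes the rules described above (a window counts as late once maxTimestamp() + allowedLateness <= watermark); LatenessSketch.classify is hypothetical, and the real window operator also deals with merging windows and overflow.

```java
// Simplified model of how an event-time window operator treats an element's window.
final class LatenessSketch {

    // windowEnd is exclusive, so the window's maxTimestamp() is windowEnd - 1.
    static String classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        if (watermark < maxTimestamp) {
            return "ON_TIME";           // the main firing has not happened yet
        }
        if (maxTimestamp + allowedLateness > watermark) {
            return "LATE_BUT_ACCEPTED"; // added to the window; may cause a late firing
        }
        return "DROPPED";               // or emitted via sideOutputLateData(...)
    }

    public static void main(String[] args) {
        // window [0, 5000) with 2 s allowed lateness
        System.out.println(classify(5_000, 2_000, 3_000)); // ON_TIME
        System.out.println(classify(5_000, 2_000, 6_000)); // LATE_BUT_ACCEPTED
        System.out.println(classify(5_000, 2_000, 7_000)); // DROPPED
        // default allowed lateness of 0
        System.out.println(classify(5_000, 0, 5_000));     // DROPPED
    }
}
```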

Getting late data as a side output

  • Using Flink’s side output feature you can get a stream of the data that was discarded as late.
  • You first need to specify that you want to get late data using sideOutputLateData(OutputTag) on the windowed stream. Then, you can get the side-output stream on the result of the windowed operation:
  • Official API examples
    java
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

scala

val lateOutputTag = OutputTag[T]("late-data")

val input: DataStream[T] = ...

val result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)

Late elements considerations

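  • When the allowed lateness is greater than 0, the window and its contents are kept after the watermark passes the end of the window. In these cases, a late but not dropped element may trigger another firing of the window. These firings are called late firings, as they are triggered by late events, in contrast to the main firing, which is the first firing of the window. For session windows, late firings can further lead to merging of windows, as they may "bridge" the gap between two pre-existing, unmerged windows.
  • Be aware that elements emitted by a late firing should be treated as updated results of a previous computation, i.e. your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicate results into account or deduplicate them.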
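
One common way to absorb late firings downstream is an upsert keyed by the key and the window, so that each late firing overwrites the earlier result. The sketch below is a hypothetical in-memory table (LateFiringUpsertSketch), not a Flink sink; a real pipeline would use an upsert-capable store.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical downstream table keyed by (key, window end); not a Flink sink.
final class LateFiringUpsertSketch {
    final Map<String, Long> table = new LinkedHashMap<>();

    // A late firing for the same key and window overwrites the earlier result.
    void upsert(String key, long windowEnd, long result) {
        table.put(key + "@" + windowEnd, result);
    }

    public static void main(String[] args) {
        LateFiringUpsertSketch sink = new LateFiringUpsertSketch();
        sink.upsert("a", 5_000, 3); // main firing
        sink.upsert("b", 5_000, 1); // main firing
        sink.upsert("a", 5_000, 4); // late firing: updated result for the same window
        System.out.println(sink.table); // {a@5000=4, b@5000=1}
    }
}
```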

Consecutive windowed operations

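  • Because the result of an event-time window is emitted with the window's maxTimestamp() (end - 1) as its timestamp, and because of how watermarks interact with windows, consecutive windowed operations can be chained together. This is useful when you want two consecutive windowed operations with different keys, but still want elements from the same upstream window to end up in the same downstream window. Consider this example:
    java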
DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());

scala

val input: DataStream[Int] = ...

val resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer())

val globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction())
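
Why results of window [0, 5) end up in window [0, 5) downstream can be checked with simple arithmetic: the result is stamped with the upstream window's maxTimestamp() (end - 1), which lies inside the same tumbling window of the same size. The sketch below assumes epoch-aligned tumbling windows without offset; ConsecutiveWindowSketch is a hypothetical name. In the example above, this is what lets the second operation compute a top-k over the per-key sums of the same 5-second window.

```java
// Arithmetic check for chained tumbling windows of equal size (hypothetical helper names).
final class ConsecutiveWindowSketch {

    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Timestamp carried by a window result: the window's maxTimestamp(), i.e. end - 1.
    static long resultTimestamp(long elementTs, long size) {
        return tumblingStart(elementTs, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 5_000;
        long elementTs = 3_200;
        long upstream = tumblingStart(elementTs, size);                          // window [0, 5000)
        long downstream = tumblingStart(resultTimestamp(elementTs, size), size); // result at 4999
        System.out.println(upstream + " -> " + downstream); // 0 -> 0
    }
}
```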

Useful state size considerations

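  • Windows can be defined over very long periods of time (such as days, weeks, or months) and can therefore accumulate very large state. Keep a few rules in mind when estimating the storage requirements of a windowing computation:
  • Flink creates one copy of each element per window it belongs to. Given this, tumbling windows keep one copy of each element (an element belongs to exactly one window unless it is dropped late). In contrast, sliding windows create several copies of each element, as explained in the window assigner sections. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
  • ReduceFunction, AggregateFunction, and FoldFunction can significantly reduce storage requirements, because they eagerly aggregate elements and store only one value per window. In contrast, using only a ProcessWindowFunction requires accumulating all elements.
  • Using an Evictor prevents any pre-aggregation, because all elements of a window have to be passed through the evictor before the computation is applied (see Evictors).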
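
The per-element copy count for sliding windows is simply size / slide, which makes the 1 day / 1 second example easy to quantify. A minimal sketch, assuming the usual case where the size is a multiple of the slide (otherwise an element belongs to either the floor or the ceiling of the ratio, so the ceiling is an upper bound); WindowStateSizeSketch is hypothetical.

```java
// Back-of-the-envelope helper; copiesPerElement is a hypothetical name.
final class WindowStateSizeSketch {

    // Number of sliding windows (and thus stored copies) per element: size / slide,
    // rounded up as an upper bound when size is not a multiple of slide.
    static long copiesPerElement(long sizeMs, long slideMs) {
        return (sizeMs + slideMs - 1) / slideMs;
    }

    public static void main(String[] args) {
        long second = 1_000L;
        long day = 86_400_000L;
        System.out.println(copiesPerElement(10 * second, 5 * second)); // 2
        System.out.println(copiesPerElement(5 * second, 5 * second));  // 1, same as a tumbling window
        System.out.println(copiesPerElement(day, second));             // 86400
    }
}
```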
