Flink EventTime和watermarks觸發(fā)機制以及sideOutputLateData應用場景

針對數據亂序的需求页畦,需要使用eventtime和watermark來解決。

watermarks的生成方式有兩種:

  • With Periodic Watermarks:周期性的觸發(fā)watermark的生成和發(fā)送
  • With Punctuated Watermarks:基于某些事件觸發(fā)watermark的生成和發(fā)送

第一種方式比較常用内地,本文主要針對Periodic Watermarks進行分析。

參照官網文檔中With Periodic Watermarks的使用方法:

官網With Periodic Watermarks

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

代碼中的extractTimestamp方法是從數據本身中提取EventTime
getCurrentWatermar方法是獲取當前水位線叠赐,利用currentMaxTimestamp - maxOutOfOrderness
maxOutOfOrderness表示是允許數據的最大亂序時間

所以在這里我們使用的話也實現接口AssignerWithPeriodicWatermarks偿洁。

watermark代碼實現

從socket模擬接收數據,然后使用map進行處理轿钠,后面再調用assignTimestampsAndWatermarks方法抽取timestamp并生成watermark巢钓。最后再調用window打印信息來驗證window被觸發(fā)的時機。

Flink主程序代碼:

package com.ly.jtbi

import java.text.SimpleDateFormat

import com.ly.jtbi.wk.StreamingPeriodicWatermark
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.java.utils.ParameterTool

/**
  * @Auther: fc.w
  * @Date: 2019/4/4
  */
object StreamingWindowWatermark {

  def main(args: Array[String]): Unit = {
    var port: Int = 0
    var hostname: String = ""

    try {
      val parameterTool = ParameterTool.fromArgs(args)
      hostname = parameterTool.get("hostname")
      port = parameterTool.getInt("port")
    } catch {
      case e: Exception => {
        System.err.println("USAGE: \n StreamingWindowWatermark <hostname> <host>")
        System.exit(1)
      }
    }

    // 獲取Flink執(zhí)行環(huán)境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 設置使用eventtime谣膳,默認是使用processtime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 啟用checkpoint
    env.enableCheckpointing(1000)
    // 設置Exactly_once
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 設置并行度 1
    env.setParallelism(1)

    val input = env.socketTextStream(hostname, port)

    // 解析輸入的數據
    val dataDStream = input.map(record => {
      val arr = record.split(",")
      (arr(0), arr(1).toLong)
    })

    // 抽取timestamp 和 watermark
   val waterMarkStream = dataDStream.assignTimestampsAndWatermarks(StreamingPeriodicWatermark)

    // 保存被丟棄的亂序數據
    val outputTag = new OutputTag[(String, Long)]("late-data")
    val window = waterMarkStream
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3))) // 按照消息的EventTime分配窗口竿报,和調用TimeWindow效果一樣
      .allowedLateness(Time.seconds(2)) // 允許延遲2s
      .sideOutputLateData(outputTag)
      .apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow]() {
      override def apply(tuple: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
        val key = tuple.toString
        val arrarList = input.toList.sortBy(_._2)
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val result = key + "," + arrarList.size + "," + sdf.format(arrarList(0)) + "," +
          "" + sdf.format(arrarList(arrarList.size - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd())

        out.collect(result)
      }
    })

    // 把延遲的數據暫時打印到控制臺,實際可以保存到存儲介質中继谚。
    val sideOutput = window.getSideOutput(outputTag)
    sideOutput.print()

    window.print()

    // 因為flink是懶加載的烈菌,所以必須調用execute方法才會執(zhí)行上面的代碼
    env.execute("eventtime-watermark")
  }

}

StreamingPeriodicWatermark代碼實現

package com.ly.jtbi.wk

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

/**
  * @Auther: fc.w
  * @Date: 2019/4/4
  */
object StreamingPeriodicWatermark extends AssignerWithPeriodicWatermarks[(String, Long)]{

  var currentMaxTimestamp = 0L
  val maxOutOfOrderness = 10000L // 最大允許的亂序時間是10s

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  /**
    * 定義生成watermark的邏輯
    * @return
    */
  override def getCurrentWatermark: Watermark = {
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
  }

  /**
    * 定義如何提取timestamp
    * @param element
    * @param previousElementTimestamp
    * @return
    */
  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    val timestamp = element._2
    currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp)
    println("key:" + element._1 + ",eventtime:[" + element._1 + "|" + sdf.format(element._2) + "], currentMaxTimestamp:["+currentMaxTimestamp + "|" +
      sdf.format(currentMaxTimestamp) + "], watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]")

    timestamp
  }


}

執(zhí)行流程:

  1. 接收socket數據。
  2. 通過map算子對每行數據按照逗號分隔并轉換成(String,Long) Tuple類型。其中Tuple中的第一個元素代表具體的數據芽世,第二行代表數據的eventTime挚赊。
  3. 提取timestamp,生成watermarks济瓢,允許的最大亂序時間是10s荠割,并打印(key, eventtime, currentMaxTimestamp, watermark)等信息旺矾。
  4. 根據第一個元素分組聚合蔑鹦,window窗口大小為3秒,輸出(key箕宙,窗口內元素個數嚎朽,窗口內最早元素的時間,窗口內最晚元素的時間柬帕,窗口自身開始時間哟忍,窗口自身結束時間)。

watermark的觸發(fā)時機:

  1. 通過watermark和timestamp的時間陷寝,分析輸出來的數據的定window的觸發(fā)時機锅很。

通過socket出入第一條數據:

$ 0001,1554363502000

程序輸出:

$ key:0001,eventtime:[1554363502000|2019-04-04 15:38:22.000], currentMaxTimestamp:[1554363502000|2019-04-04 15:38:22.000], watermark:[1554363492000|2019-04-04 15:38:12.000]

為了方便查看,把輸入內容匯總到表格中

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000

此時凤跑,wartermark的時間爆安,已經落后于currentMaxTimestamp10秒了。我們繼續(xù)輸入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000

繼續(xù)輸入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000

到這里饶火,window仍然沒有被觸發(fā)鹏控,此時watermark的時間已經等于了第一條數據的Event Time了。那么window到底什么時候被觸發(fā)呢肤寝?

繼續(xù)輸入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000

window仍然沒有觸發(fā)当辐,此時,我們的數據已經發(fā)到2018-10-01 10:11:33.000了鲤看,根據eventtime來算缘揪,最早的數據已經過去了11秒了,window還沒有開始計算义桂,那到底什么時候會觸發(fā)window呢找筝?

再增加一秒:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[10:11:21.000 10:11:24.000)

到這里,我們做一個說明:
window的觸發(fā)機制慷吊,是先按照自然時間將window劃分袖裕,如果window大小是3秒,那么1分鐘內會把window劃分為如下的形式【左閉右開】:


[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:06,00:00:09)
[00:00:09,00:00:12)
···

[00:00:54,00:00:57)
[00:00:57,00:01:00)
···

window的設定無關數據本身溉瓶,而是系統(tǒng)定義好了的急鳄。

輸入的數據中谤民,根據自身的Event Time,將數據劃分到不同的window中疾宏,如果window中有數據张足,則當watermark時間 >= Event Time時,就符合了window觸發(fā)的條件了坎藐,最終決定window觸發(fā)为牍,還是由數據本身的Event Time所屬的window中的window_end_time決定。

上面的測試中岩馍,最后一條數據到達后碉咆,其水位線已經升至10:11:24秒,正好是最早的一條記錄所在window的window_end_time兼雄,所以window就被觸發(fā)了吟逝。

為了驗證window的觸發(fā)機制,我們繼續(xù)輸入數據:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[10:11:21.000 10:11:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000

此時赦肋,watermark時間雖然已經達到了第二條數據的時間,但是由于其沒有達到第二條數據所在window的結束時間励稳,所以window并沒有被觸發(fā)佃乘。那么,第二條數據所在的window時間是:

[00:00:24,00:00:27)

也就是說驹尼,我們必須輸入一個15:38:27秒的數據趣避,第二條數據所在的window才會被觸發(fā)。我們繼續(xù)輸入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[15:38:21.000 15:38:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000
0001 1554363517000

2019-04-04 15:38:37.000
1554363517000

2019-04-04 15:38:37.000
1554363507000

2019-04-04 15:38:27.000
[15:38:24.000 15:38:27.000)

此時新翎,window的觸發(fā)要符合以下幾個條件:

  1. watermark時間 >= window_end_time
    2. 在[window_start_time,window_end_time)區(qū)間中有數據存在程帕,注意是左閉右開的區(qū)間

同時滿足了以上2個條件,window才會觸發(fā)地啰。

watermark+window處理亂序數據

上面的測試愁拭,數據都是按照時間順序遞增的,現在亏吝,我們輸入一些亂序的(late)數據岭埠,看看watermark結合window機制,是如何處理亂序的蔚鸥。

輸入兩行數據:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
0001,1554363517000
0001,1554363519000
0001,1554363511000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[15:38:21.000 15:38:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000
0001 1554363517000

2019-04-04 15:38:37.000
1554363517000

2019-04-04 15:38:37.000
1554363507000

2019-04-04 15:38:27.000
[15:38:24.000 15:38:27.000)
0001 1554363519000

2019-04-04 15:38:39.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000

可以看到惜论,雖然我們輸入了一個15:38:31的數據,但是currentMaxTimestamp和watermark都沒變止喷。此時馆类,按照我們上面提到的公式:

  1. watermark時間 >= window_end_time
    2. 在[window_start_time,window_end_time)區(qū)間中有數據存在,注意是左閉右開的區(qū)間

watermark時間(15:38:29) < window_end_time(15:38:33)弹谁,因此不能觸發(fā)window乾巧。

那如果我們再次輸入一條15:38:43的數據技羔,此時watermark時間會升高到15:38:33,這時的window一定就會觸發(fā)了卧抗,我們試一試:
輸入:

$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
0001,1554363517000
0001,1554363519000
0001,1554363511000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363502000

2019-04-04 15:38:22.000
1554363502000

2019-04-04 15:38:22.000
1554363492000

2019-04-04 15:38:12.000
0001 1554363506000

2019-04-04 15:38:26.000
1554363506000

2019-04-04 15:38:26.000
1554363496000

2019-04-04 15:38:16.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363512000

2019-04-04 15:38:32.000
1554363502000

2019-04-04 15:38:22.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:33.000
1554363503000

2019-04-04 15:38:23.000
0001 1554363514000

2019-04-04 15:38:34.000
1554363514000

2019-04-04 15:38:34.000
1554363504000

2019-04-04 15:38:24.000
[15:38:21.000 15:38:24.000)
0001 1554363516000

2019-04-04 15:38:36.000
1554363516000

2019-04-04 15:38:36.000
1554363506000

2019-04-04 15:38:26.000
0001 1554363517000

2019-04-04 15:38:37.000
1554363517000

2019-04-04 15:38:37.000
1554363507000

2019-04-04 15:38:27.000
[15:38:24.000 15:38:27.000)
0001 1554363519000

2019-04-04 15:38:39.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363517000

2019-04-04 15:38:39.000
1554363509000

2019-04-04 15:38:29.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:30.000 15:38:33.000)

這里可以看到藤滥,窗口中有2個數據,15:38:31和15:38:32社裆,但是沒有15:38:33的數據拙绊,原因是窗口是一個前閉后開的區(qū)間,15:38:33的數據是屬于[15:38:33,15:38:36)的窗口的泳秀。

Flink應該如何設置最大亂序時間 ?

這個要結合自己的業(yè)務以及數據情況去設置标沪。如果maxOutOfOrderness設置的太小,而自身數據發(fā)送時由于網絡等原因導致亂序或者late太多嗜傅,那么最終的結果就是會有很多單條的數據在window中被觸發(fā)金句,數據的正確性影響太大
對于嚴重亂序的數據,需要嚴格統(tǒng)計數據最大延遲時間吕嘀,才能保證計算的數據準確违寞,延時設置太小會影響數據準確性,延時設置太大不僅影響數據的實時性偶房,更加會加重Flink作業(yè)的負擔趁曼,不是對eventTime要求特別嚴格的數據,盡量不要采用eventTime方式來處理棕洋,會有丟數據的風險挡闰。

上邊的結果,已經表明掰盘,對于out-of-order的數據摄悯,Flink可以通過watermark機制結合window的操作,來處理一定范圍內的亂序數據愧捕。那么對于“遲到(late element)”太多的數據奢驯,Flink是怎么處理的呢?

late element(延遲數據)的處理

延遲數據的三種處理方案:

1. 丟棄(默認)

我們輸入一個亂序很多的(其實只要Event Time < watermark時間)數據來測試下:
輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

注意:此時watermark是2019-04-04 15:38:33.000

下面我們再輸入幾個eventtime小于watermark的時間
輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000

注意:此時并沒有觸發(fā)window晃财。因為輸入的數據所在的窗口已經執(zhí)行過了叨橱,flink默認對這些遲到的數據的處理方案就是丟棄。

2. allowedLateness 指定允許數據延遲的時間

在某些情況下断盛,我們希望對遲到的數據再提供一個寬容的時間罗洗。

Flink提供了allowedLateness方法可以實現對遲到的數據設置一個延遲時間,在指定延遲時間內到達的數據還是可以觸發(fā)window執(zhí)行的钢猛。
輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

正常觸發(fā)window伙菜,沒什么問題。
此時watermark是2019-04-04 15:38:33.000

那么現在我們輸入幾條eventtime<watermark的數據驗證一下效果
輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

我們再輸入一條數據命迈,把water調整到10:11:34
輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
0001,1554363524000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363524000

2019-04-04 15:38:44.000
1554363524000

2019-04-04 15:38:44.000
1554363514000

2019-04-04 15:38:34.000

此時贩绕,把water上升到了15:38:34火的,我們再輸入幾條eventtime<watermark的數據驗證一下效果

輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
0001,1554363524000
0001,1554363510000
0001,1554363511000
0001,1554363513000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363524000

2019-04-04 15:38:44.000
1554363524000

2019-04-04 15:38:44.000
1554363514000

2019-04-04 15:38:34.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

發(fā)現輸入的三行數據都觸發(fā)了window的執(zhí)行。

我們再輸入一條數據淑倾,把water調整到15:38:35
輸入:

$nc -l -p 9000
0001,1554363525000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363525000

2019-04-04 15:38:45.000
1554363525000

2019-04-04 15:38:45.000
1554363515000

2019-04-04 15:38:35.000

此時馏鹤,watermark上升到了15:38:35

我們再輸入幾條eventtime<watermark的數據驗證一下效果
輸入:

$nc -l -p 9000
0001,1554363525000
0001,1554363510000
0001,1554363511000
0001,1554363513000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363525000

2019-04-04 15:38:45.000
1554363525000

2019-04-04 15:38:45.000
1554363515000

2019-04-04 15:38:35.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363513000

2019-04-04 15:38:33.000
1554363513000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000

發(fā)現這幾條數據都沒有觸發(fā)window。

分析:

當watemark等于15:38:33的時候娇哆,正好是window_end_time湃累,所以會觸發(fā)[15:38:30~15:38:33) 的window執(zhí)行。

當窗口執(zhí)行過后碍讨,我們輸入[15:38:30~15:38:33) window內的數據會發(fā)現window是可以被觸發(fā)的治力。

當watemark提升到15:38:34的時候,我們輸入[15:38:30~15:38:33)window內的數據會發(fā)現window也是可以被觸發(fā)的勃黍。

當watemark提升到15:38:35的時候宵统,我們輸入[15:38:30~15:38:33)window內的數據會發(fā)現window不會被觸發(fā)了。

由于我們在前面設置了allowedLateness(Time.seconds(2))覆获,可以允許延遲在2s內的數據繼續(xù)觸發(fā)window執(zhí)行马澈。

所以當watermark是15:38:34的時候可以觸發(fā)window,但是15:38:35的時候就不行了锻梳。

總結:

對于此窗口而言箭券,允許2秒的遲到數據,即第一次觸發(fā)是在watermark >= window_end_time時
第二次(或多次)觸發(fā)的條件是watermark < window_end_time + allowedLateness時間內疑枯,這個窗口有l(wèi)ate數據到達時。

解釋:

當watermark等于15:38:34的時候蛔六,我們輸入eventtime為15:38:30荆永、15:38:31、15:38:32的數據的時候国章,是可以觸發(fā)的具钥,因為這些數據的window_end_time都是15:38:33,也就是15:38:34<15:38:33+2 為true液兽。

但是當watermark等于15:38:35的時候骂删,我們再輸入eventtime為15:38:30、15:38:31四啰、15:38:32的數據的時候宁玫,這些數據的window_end_time都是15:38:33,此時柑晒,15:38:35<15:38:33+2 為false了欧瘪。所以最終這些數據遲到的時間太久了,就不會再觸發(fā)window執(zhí)行了匙赞。

3. sideOutputLateData 收集遲到的數據

通過sideOutputLateData 可以把遲到的數據統(tǒng)一收集佛掖,統(tǒng)計存儲妖碉,方便后期排查問題。



輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)

此時芥被,window被觸發(fā)執(zhí)行了欧宜,此時watermark是15:38:33

下面我們再輸入幾個eventtime小于watermark的數據測試一下
輸入:

$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000

程序輸出:

Key Event Time CurrentMaxTimeStamp WaterMark window_start_time window_end_time
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:30.000
1554363500000

2019-04-04 15:38:20.000
0001 1554363523000

2019-04-04 15:38:43.000
1554363523000

2019-04-04 15:38:43.000
1554363513000

2019-04-04 15:38:33.000
[15:38:33.000 15:38:33.000)
0001 1554363510000

2019-04-04 15:38:30.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363511000

2019-04-04 15:38:31.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000
0001 1554363512000

2019-04-04 15:38:32.000
1554363510000

2019-04-04 15:38:43.000
1554363500000

2019-04-04 15:38:33.000

此時沟沙,針對這幾條遲到的數據茬祷,都通過sideOutputLateData保存到了outputTag中,然后輸出到控制臺稳衬。

    // 把延遲的數據暫時打印到控制臺羹铅,實際可以保存到存儲介質中蚀狰。
    val sideOutput = window.getSideOutput(outputTag)
    sideOutput.print()
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市职员,隨后出現的幾起案子麻蹋,更是在濱河造成了極大的恐慌,老刑警劉巖焊切,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件扮授,死亡現場離奇詭異,居然都是意外死亡专肪,警方通過查閱死者的電腦和手機刹勃,發(fā)現死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來嚎尤,“玉大人荔仁,你說我怎么就攤上這事⊙克溃” “怎么了乏梁?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長关贵。 經常有香客問我遇骑,道長,這世上最難降的妖魔是什么揖曾? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任落萎,我火速辦了婚禮,結果婚禮上炭剪,老公的妹妹穿的比我還像新娘练链。我一直安慰自己,他們只是感情好念祭,可當我...
    茶點故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布兑宇。 她就那樣靜靜地躺著,像睡著了一般。 火紅的嫁衣襯著肌膚如雪隶糕。 梳的紋絲不亂的頭發(fā)上瓷产,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天,我揣著相機與錄音枚驻,去河邊找鬼濒旦。 笑死,一個胖子當著我的面吹牛再登,可吹牛的內容都是我干的尔邓。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼锉矢,長吁一口氣:“原來是場噩夢啊……” “哼梯嗽!你這毒婦竟也來了?” 一聲冷哼從身側響起沽损,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤灯节,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后绵估,有當地人在樹林里發(fā)現了一具尸體炎疆,經...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年国裳,在試婚紗的時候發(fā)現自己被綠了形入。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,834評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡缝左,死狀恐怖亿遂,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情渺杉,我是刑警寧澤崩掘,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站少办,受9級特大地震影響,放射性物質發(fā)生泄漏诵原。R本人自食惡果不足惜英妓,卻給世界環(huán)境...
    茶點故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望绍赛。 院中可真熱鬧蔓纠,春花似錦、人聲如沸吗蚌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蚯妇。三九已至敷燎,卻和暖如春暂筝,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背硬贯。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工焕襟, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人饭豹。 一個月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓鸵赖,卻偏偏與公主長得像,于是被迫代替她去往敵國和親拄衰。 傳聞我的和親對象是個殘疾皇子它褪,可洞房花燭夜當晚...
    茶點故事閱讀 44,779評論 2 354