針對數據亂序的需求页畦,需要使用eventtime和watermark來解決。
watermarks的生成方式有兩種:
- With Periodic Watermarks:周期性的觸發(fā)watermark的生成和發(fā)送
- With Punctuated Watermarks:基于某些事件觸發(fā)watermark的生成和發(fā)送
第一種方式比較常用内地,本文主要針對Periodic Watermarks進行分析。
參照官網文檔中With Periodic Watermarks的使用方法:
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
代碼中的extractTimestamp
方法是從數據本身中提取EventTime
getCurrentWatermar
方法是獲取當前水位線叠赐,利用currentMaxTimestamp - maxOutOfOrderness
maxOutOfOrderness
表示是允許數據的最大亂序時間
所以在這里我們使用的話也實現接口AssignerWithPeriodicWatermarks偿洁。
watermark代碼實現
從socket模擬接收數據,然后使用map進行處理轿钠,后面再調用assignTimestampsAndWatermarks方法抽取timestamp并生成watermark巢钓。最后再調用window打印信息來驗證window被觸發(fā)的時機。
Flink主程序代碼:
package com.ly.jtbi
import java.text.SimpleDateFormat
import com.ly.jtbi.wk.StreamingPeriodicWatermark
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.java.utils.ParameterTool
/**
* @Auther: fc.w
* @Date: 2019/4/4
*/
object StreamingWindowWatermark {
def main(args: Array[String]): Unit = {
var port: Int = 0
var hostname: String = ""
try {
val parameterTool = ParameterTool.fromArgs(args)
hostname = parameterTool.get("hostname")
port = parameterTool.getInt("port")
} catch {
case e: Exception => {
System.err.println("USAGE: \n StreamingWindowWatermark <hostname> <host>")
System.exit(1)
}
}
// 獲取Flink執(zhí)行環(huán)境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 設置使用eventtime谣膳,默認是使用processtime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 啟用checkpoint
env.enableCheckpointing(1000)
// 設置Exactly_once
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設置并行度 1
env.setParallelism(1)
val input = env.socketTextStream(hostname, port)
// 解析輸入的數據
val dataDStream = input.map(record => {
val arr = record.split(",")
(arr(0), arr(1).toLong)
})
// 抽取timestamp 和 watermark
val waterMarkStream = dataDStream.assignTimestampsAndWatermarks(StreamingPeriodicWatermark)
// 保存被丟棄的亂序數據
val outputTag = new OutputTag[(String, Long)]("late-data")
val window = waterMarkStream
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(3))) // 按照消息的EventTime分配窗口竿报,和調用TimeWindow效果一樣
.allowedLateness(Time.seconds(2)) // 允許延遲2s
.sideOutputLateData(outputTag)
.apply(new WindowFunction[(String, Long), String, Tuple, TimeWindow]() {
override def apply(tuple: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val key = tuple.toString
val arrarList = input.toList.sortBy(_._2)
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
val result = key + "," + arrarList.size + "," + sdf.format(arrarList(0)) + "," +
"" + sdf.format(arrarList(arrarList.size - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd())
out.collect(result)
}
})
// 把延遲的數據暫時打印到控制臺,實際可以保存到存儲介質中继谚。
val sideOutput = window.getSideOutput(outputTag)
sideOutput.print()
window.print()
// 因為flink是懶加載的烈菌,所以必須調用execute方法才會執(zhí)行上面的代碼
env.execute("eventtime-watermark")
}
}
StreamingPeriodicWatermark代碼實現
package com.ly.jtbi.wk
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
/**
* @Auther: fc.w
* @Date: 2019/4/4
*/
object StreamingPeriodicWatermark extends AssignerWithPeriodicWatermarks[(String, Long)]{
var currentMaxTimestamp = 0L
val maxOutOfOrderness = 10000L // 最大允許的亂序時間是10s
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
/**
* 定義生成watermark的邏輯
* @return
*/
override def getCurrentWatermark: Watermark = {
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
/**
* 定義如何提取timestamp
* @param element
* @param previousElementTimestamp
* @return
*/
override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
val timestamp = element._2
currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp)
println("key:" + element._1 + ",eventtime:[" + element._1 + "|" + sdf.format(element._2) + "], currentMaxTimestamp:["+currentMaxTimestamp + "|" +
sdf.format(currentMaxTimestamp) + "], watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp()) + "]")
timestamp
}
}
執(zhí)行流程:
- 接收socket數據。
- 通過map算子對每行數據按照逗號分隔并轉換成(String,Long) Tuple類型。其中Tuple中的第一個元素代表具體的數據芽世,第二行代表數據的eventTime挚赊。
- 提取timestamp,生成watermarks济瓢,允許的最大亂序時間是10s荠割,并打印(key, eventtime, currentMaxTimestamp, watermark)等信息旺矾。
- 根據第一個元素分組聚合蔑鹦,window窗口大小為3秒,輸出(key箕宙,窗口內元素個數嚎朽,窗口內最早元素的時間,窗口內最晚元素的時間柬帕,窗口自身開始時間哟忍,窗口自身結束時間)。
watermark的觸發(fā)時機:
- 通過watermark和timestamp的時間陷寝,分析輸出來的數據的定window的觸發(fā)時機锅很。
通過socket出入第一條數據:
$ 0001,1554363502000
程序輸出:
$ key:0001,eventtime:[1554363502000|2019-04-04 15:38:22.000], currentMaxTimestamp:[1554363502000|2019-04-04 15:38:22.000], watermark:[1554363492000|2019-04-04 15:38:12.000]
為了方便查看,把輸入內容匯總到表格中
Key | Event Time | CurrentMaxTimeStamp | WaterMark |
---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
此時凤跑,wartermark的時間爆安,已經落后于currentMaxTimestamp10秒了。我們繼續(xù)輸入:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark |
---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
繼續(xù)輸入:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark |
---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
到這里饶火,window仍然沒有被觸發(fā)鹏控,此時watermark的時間已經等于了第一條數據的Event Time了。那么window到底什么時候被觸發(fā)呢肤寝?
繼續(xù)輸入:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark |
---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:33.000 |
1554363503000 2019-04-04 15:38:23.000 |
window仍然沒有觸發(fā)当辐,此時,我們的數據已經發(fā)到2018-10-01 10:11:33.000了鲤看,根據eventtime來算缘揪,最早的數據已經過去了11秒了,window還沒有開始計算义桂,那到底什么時候會觸發(fā)window呢找筝?
再增加一秒:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
||
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
||
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:33.000 |
1554363503000 2019-04-04 15:38:23.000 |
||
0001 | 1554363514000 2019-04-04 15:38:34.000 |
1554363514000 2019-04-04 15:38:34.000 |
1554363504000 2019-04-04 15:38:24.000 |
[10:11:21.000 | 10:11:24.000) |
到這里,我們做一個說明:
window的觸發(fā)機制慷吊,是先按照自然時間將window劃分袖裕,如果window大小是3秒,那么1分鐘內會把window劃分為如下的形式【左閉右開】:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
[00:00:06,00:00:09)
[00:00:09,00:00:12)
···
[00:00:54,00:00:57)
[00:00:57,00:01:00)
···
window的設定無關數據本身溉瓶,而是系統(tǒng)定義好了的急鳄。
輸入的數據中谤民,根據自身的Event Time,將數據劃分到不同的window中疾宏,如果window中有數據张足,則當watermark時間 >= Event Time時,就符合了window觸發(fā)的條件了坎藐,最終決定window觸發(fā)为牍,還是由數據本身的Event Time所屬的window中的window_end_time決定。
上面的測試中岩馍,最后一條數據到達后碉咆,其水位線已經升至10:11:24秒,正好是最早的一條記錄所在window的window_end_time兼雄,所以window就被觸發(fā)了吟逝。
為了驗證window的觸發(fā)機制,我們繼續(xù)輸入數據:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
||
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
||
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:33.000 |
1554363503000 2019-04-04 15:38:23.000 |
||
0001 | 1554363514000 2019-04-04 15:38:34.000 |
1554363514000 2019-04-04 15:38:34.000 |
1554363504000 2019-04-04 15:38:24.000 |
[10:11:21.000 | 10:11:24.000) |
0001 | 1554363516000 2019-04-04 15:38:36.000 |
1554363516000 2019-04-04 15:38:36.000 |
1554363506000 2019-04-04 15:38:26.000 |
此時赦肋,watermark時間雖然已經達到了第二條數據的時間,但是由于其沒有達到第二條數據所在window的結束時間励稳,所以window并沒有被觸發(fā)佃乘。那么,第二條數據所在的window時間是:
[00:00:24,00:00:27)
也就是說驹尼,我們必須輸入一個15:38:27秒的數據趣避,第二條數據所在的window才會被觸發(fā)。我們繼續(xù)輸入:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
||
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
||
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:33.000 |
1554363503000 2019-04-04 15:38:23.000 |
||
0001 | 1554363514000 2019-04-04 15:38:34.000 |
1554363514000 2019-04-04 15:38:34.000 |
1554363504000 2019-04-04 15:38:24.000 |
[15:38:21.000 | 15:38:24.000) |
0001 | 1554363516000 2019-04-04 15:38:36.000 |
1554363516000 2019-04-04 15:38:36.000 |
1554363506000 2019-04-04 15:38:26.000 |
||
0001 | 1554363517000 2019-04-04 15:38:37.000 |
1554363517000 2019-04-04 15:38:37.000 |
1554363507000 2019-04-04 15:38:27.000 |
[15:38:24.000 | 15:38:27.000) |
此時新翎,window的觸發(fā)要符合以下幾個條件:
- watermark時間 >= window_end_time
2. 在[window_start_time,window_end_time)區(qū)間中有數據存在程帕,注意是左閉右開的區(qū)間
同時滿足了以上2個條件,window才會觸發(fā)地啰。
watermark+window處理亂序數據
上面的測試愁拭,數據都是按照時間順序遞增的,現在亏吝,我們輸入一些亂序的(late)數據岭埠,看看watermark結合window機制,是如何處理亂序的蔚鸥。
輸入兩行數據:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
0001,1554363517000
0001,1554363519000
0001,1554363511000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
||
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
||
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:33.000 |
1554363503000 2019-04-04 15:38:23.000 |
||
0001 | 1554363514000 2019-04-04 15:38:34.000 |
1554363514000 2019-04-04 15:38:34.000 |
1554363504000 2019-04-04 15:38:24.000 |
[15:38:21.000 | 15:38:24.000) |
0001 | 1554363516000 2019-04-04 15:38:36.000 |
1554363516000 2019-04-04 15:38:36.000 |
1554363506000 2019-04-04 15:38:26.000 |
||
0001 | 1554363517000 2019-04-04 15:38:37.000 |
1554363517000 2019-04-04 15:38:37.000 |
1554363507000 2019-04-04 15:38:27.000 |
[15:38:24.000 | 15:38:27.000) |
0001 | 1554363519000 2019-04-04 15:38:39.000 |
1554363517000 2019-04-04 15:38:39.000 |
1554363509000 2019-04-04 15:38:29.000 |
||
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363517000 2019-04-04 15:38:39.000 |
1554363509000 2019-04-04 15:38:29.000 |
可以看到惜论,雖然我們輸入了一個15:38:31的數據,但是currentMaxTimestamp和watermark都沒變止喷。此時馆类,按照我們上面提到的公式:
- watermark時間 >= window_end_time
2. 在[window_start_time,window_end_time)區(qū)間中有數據存在,注意是左閉右開的區(qū)間
watermark時間(15:38:29) < window_end_time(15:38:33)弹谁,因此不能觸發(fā)window乾巧。
那如果我們再次輸入一條15:38:43的數據技羔,此時watermark時間會升高到15:38:33,這時的window一定就會觸發(fā)了卧抗,我們試一試:
輸入:
$nc -l -p 9000
0001,1554363502000
0001,1554363506000
0001,1554363512000
0001,1554363513000
0001,1554363514000
0001,1554363516000
0001,1554363517000
0001,1554363519000
0001,1554363511000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363502000 2019-04-04 15:38:22.000 |
1554363502000 2019-04-04 15:38:22.000 |
1554363492000 2019-04-04 15:38:12.000 |
||
0001 | 1554363506000 2019-04-04 15:38:26.000 |
1554363506000 2019-04-04 15:38:26.000 |
1554363496000 2019-04-04 15:38:16.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363512000 2019-04-04 15:38:32.000 |
1554363502000 2019-04-04 15:38:22.000 |
||
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:33.000 |
1554363503000 2019-04-04 15:38:23.000 |
||
0001 | 1554363514000 2019-04-04 15:38:34.000 |
1554363514000 2019-04-04 15:38:34.000 |
1554363504000 2019-04-04 15:38:24.000 |
[15:38:21.000 | 15:38:24.000) |
0001 | 1554363516000 2019-04-04 15:38:36.000 |
1554363516000 2019-04-04 15:38:36.000 |
1554363506000 2019-04-04 15:38:26.000 |
||
0001 | 1554363517000 2019-04-04 15:38:37.000 |
1554363517000 2019-04-04 15:38:37.000 |
1554363507000 2019-04-04 15:38:27.000 |
[15:38:24.000 | 15:38:27.000) |
0001 | 1554363519000 2019-04-04 15:38:39.000 |
1554363517000 2019-04-04 15:38:39.000 |
1554363509000 2019-04-04 15:38:29.000 |
||
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363517000 2019-04-04 15:38:39.000 |
1554363509000 2019-04-04 15:38:29.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:30.000 | 15:38:33.000) |
這里可以看到藤滥,窗口中有2個數據,15:38:31和15:38:32社裆,但是沒有15:38:33的數據拙绊,原因是窗口是一個前閉后開的區(qū)間,15:38:33的數據是屬于[15:38:33,15:38:36)的窗口的泳秀。
Flink應該如何設置最大亂序時間 ?
這個要結合自己的業(yè)務以及數據情況去設置标沪。如果maxOutOfOrderness設置的太小,而自身數據發(fā)送時由于網絡等原因導致亂序或者late太多嗜傅,那么最終的結果就是會有很多單條的數據在window中被觸發(fā)金句,數據的正確性影響太大
對于嚴重亂序的數據,需要嚴格統(tǒng)計數據最大延遲時間吕嘀,才能保證計算的數據準確违寞,延時設置太小會影響數據準確性,延時設置太大不僅影響數據的實時性偶房,更加會加重Flink作業(yè)的負擔趁曼,不是對eventTime要求特別嚴格的數據,盡量不要采用eventTime方式來處理棕洋,會有丟數據的風險挡闰。
上邊的結果,已經表明掰盘,對于out-of-order的數據摄悯,Flink可以通過watermark機制結合window的操作,來處理一定范圍內的亂序數據愧捕。那么對于“遲到(late element)”太多的數據奢驯,Flink是怎么處理的呢?
late element(延遲數據)的處理
延遲數據的三種處理方案:
1. 丟棄(默認)
我們輸入一個亂序很多的(其實只要Event Time < watermark時間)數據來測試下:
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
注意:此時watermark是2019-04-04 15:38:33.000
下面我們再輸入幾個eventtime小于watermark的時間
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
||
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
注意:此時并沒有觸發(fā)window晃财。因為輸入的數據所在的窗口已經執(zhí)行過了叨橱,flink默認對這些遲到的數據的處理方案就是丟棄。
2. allowedLateness 指定允許數據延遲的時間
在某些情況下断盛,我們希望對遲到的數據再提供一個寬容的時間罗洗。
Flink提供了allowedLateness方法可以實現對遲到的數據設置一個延遲時間,在指定延遲時間內到達的數據還是可以觸發(fā)window執(zhí)行的钢猛。
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
正常觸發(fā)window伙菜,沒什么問題。
此時watermark是2019-04-04 15:38:33.000
那么現在我們輸入幾條eventtime<watermark的數據驗證一下效果
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
我們再輸入一條數據命迈,把water調整到10:11:34
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
0001,1554363524000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363524000 2019-04-04 15:38:44.000 |
1554363524000 2019-04-04 15:38:44.000 |
1554363514000 2019-04-04 15:38:34.000 |
此時贩绕,把water上升到了15:38:34火的,我們再輸入幾條eventtime<watermark的數據驗證一下效果
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
0001,1554363524000
0001,1554363510000
0001,1554363511000
0001,1554363513000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363524000 2019-04-04 15:38:44.000 |
1554363524000 2019-04-04 15:38:44.000 |
1554363514000 2019-04-04 15:38:34.000 |
||
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
發(fā)現輸入的三行數據都觸發(fā)了window的執(zhí)行。
我們再輸入一條數據淑倾,把water調整到15:38:35
輸入:
$nc -l -p 9000
0001,1554363525000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363525000 2019-04-04 15:38:45.000 |
1554363525000 2019-04-04 15:38:45.000 |
1554363515000 2019-04-04 15:38:35.000 |
此時馏鹤,watermark上升到了15:38:35
我們再輸入幾條eventtime<watermark的數據驗證一下效果
輸入:
$nc -l -p 9000
0001,1554363525000
0001,1554363510000
0001,1554363511000
0001,1554363513000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363525000 2019-04-04 15:38:45.000 |
1554363525000 2019-04-04 15:38:45.000 |
1554363515000 2019-04-04 15:38:35.000 |
||
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
||
0001 | 1554363513000 2019-04-04 15:38:33.000 |
1554363513000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
發(fā)現這幾條數據都沒有觸發(fā)window。
分析:
當watemark等于15:38:33的時候娇哆,正好是window_end_time湃累,所以會觸發(fā)[15:38:30~15:38:33) 的window執(zhí)行。
當窗口執(zhí)行過后碍讨,我們輸入[15:38:30~15:38:33) window內的數據會發(fā)現window是可以被觸發(fā)的治力。
當watemark提升到15:38:34的時候,我們輸入[15:38:30~15:38:33)window內的數據會發(fā)現window也是可以被觸發(fā)的勃黍。
當watemark提升到15:38:35的時候宵统,我們輸入[15:38:30~15:38:33)window內的數據會發(fā)現window不會被觸發(fā)了。
由于我們在前面設置了allowedLateness(Time.seconds(2))覆获,可以允許延遲在2s內的數據繼續(xù)觸發(fā)window執(zhí)行马澈。
所以當watermark是15:38:34的時候可以觸發(fā)window,但是15:38:35的時候就不行了锻梳。
總結:
對于此窗口而言箭券,允許2秒的遲到數據,即第一次觸發(fā)是在watermark >= window_end_time時
第二次(或多次)觸發(fā)的條件是watermark < window_end_time + allowedLateness時間內疑枯,這個窗口有l(wèi)ate數據到達時。
解釋:
當watermark等于15:38:34的時候蛔六,我們輸入eventtime為15:38:30荆永、15:38:31、15:38:32的數據的時候国章,是可以觸發(fā)的具钥,因為這些數據的window_end_time都是15:38:33,也就是15:38:34<15:38:33+2 為true液兽。
但是當watermark等于15:38:35的時候骂删,我們再輸入eventtime為15:38:30、15:38:31四啰、15:38:32的數據的時候宁玫,這些數據的window_end_time都是15:38:33,此時柑晒,15:38:35<15:38:33+2 為false了欧瘪。所以最終這些數據遲到的時間太久了,就不會再觸發(fā)window執(zhí)行了匙赞。
3. sideOutputLateData 收集遲到的數據
通過sideOutputLateData 可以把遲到的數據統(tǒng)一收集佛掖,統(tǒng)計存儲妖碉,方便后期排查問題。
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
此時芥被,window被觸發(fā)執(zhí)行了欧宜,此時watermark是15:38:33
下面我們再輸入幾個eventtime小于watermark的數據測試一下
輸入:
$nc -l -p 9000
0001,1554363510000
0001,1554363523000
0001,1554363510000
0001,1554363511000
0001,1554363512000
程序輸出:
Key | Event Time | CurrentMaxTimeStamp | WaterMark | window_start_time | window_end_time |
---|---|---|---|---|---|
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:30.000 |
1554363500000 2019-04-04 15:38:20.000 |
||
0001 | 1554363523000 2019-04-04 15:38:43.000 |
1554363523000 2019-04-04 15:38:43.000 |
1554363513000 2019-04-04 15:38:33.000 |
[15:38:33.000 | 15:38:33.000) |
0001 | 1554363510000 2019-04-04 15:38:30.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
||
0001 | 1554363511000 2019-04-04 15:38:31.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
||
0001 | 1554363512000 2019-04-04 15:38:32.000 |
1554363510000 2019-04-04 15:38:43.000 |
1554363500000 2019-04-04 15:38:33.000 |
此時沟沙,針對這幾條遲到的數據茬祷,都通過sideOutputLateData保存到了outputTag中,然后輸出到控制臺稳衬。
// 把延遲的數據暫時打印到控制臺羹铅,實際可以保存到存儲介質中蚀狰。
val sideOutput = window.getSideOutput(outputTag)
sideOutput.print()