背景
實時計算中整陌,數(shù)據(jù)時間比較敏感拗窃。有eventTime和processTime區(qū)分,一般來說eventTime是從原始的消息中提取過來的泌辫,processTime是Flink自己提供的随夸,F(xiàn)link中一個亮點就是可以基于eventTime計算,這個功能很有用甥郑,因為實時數(shù)據(jù)可能會經(jīng)過比較長的鏈路逃魄,多少會有延時荤西,并且有很大的不確定性澜搅,對于一些需要精確體現(xiàn)事件變化趨勢的場景中,單純使用processTime顯然是不合理的邪锌。
概念
watermark是一種衡量Event Time進展的機制勉躺,它是數(shù)據(jù)本身的一個隱藏屬性。通趁俜幔基于Event Time的數(shù)據(jù)饵溅,自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件妇萄,通常用watermark機制結(jié)合window來實現(xiàn)蜕企。
流處理從事件產(chǎn)生,到流經(jīng)source冠句,再到operator轻掩,中間是有一個過程和時間的。雖然大部分情況下懦底,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的唇牧,但是也不排除由于網(wǎng)絡(luò)、背壓等原因聚唐,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)丐重。
但是對于late element,我們又不能無限期的等下去杆查,必須要有個機制來保證一個特定的時間后扮惦,必須觸發(fā)window去進行計算了。這個特別的機制亲桦,就是watermark崖蜜。
window劃分
window的設(shè)定無關(guān)數(shù)據(jù)本身掺栅,而是系統(tǒng)定義好了的。
window是flink中劃分數(shù)據(jù)一個基本單位纳猪,window的劃分方式是固定的氧卧,默認會根據(jù)自然時間劃分window,并且劃分方式是前閉后開氏堤。
window劃分 | w1 | w2 | w3 |
---|---|---|---|
3s | [00:00:00~00:00:03) | [00:00:03~00:00:06) | [00:00:06~00:00:09) |
5s | [00:00:00~00:00:05) | [00:00:05~00:00:10) | [00:00:10~00:00:15) |
10s | [00:00:00~00:00:10) | [00:00:10~00:00:20) | [00:00:20~00:00:30) |
1min | [00:00:00~00:01:00) | [00:01:00~00:02:00) | [00:02:00~00:03:00) |
示例
如果設(shè)置最大允許的亂序時間是10s沙绝,滾動時間窗口為5s
{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}
//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]
觸達改記錄的時間窗口應(yīng)該為2019-03-26 16:25:20~2019-03-26 16:25:25
即當(dāng)有數(shù)據(jù)eventTime >= 2019-03-26 16:25:35 時
{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
//(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)
后面會詳細講解。_
提取watermark
watermark的提取工作在taskManager中完成鼠锈,意味著這項工作是并行進行的的闪檬,而watermark是一個全局的概念,就是一個整個Flink作業(yè)之后一個warkermark购笆。
AssignerWithPeriodicWatermarks
定時提取watermark粗悯,這種方式會定時提取更新wartermark。
//默認200ms
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
AssignerWithPunctuatedWatermarks
伴隨event的到來就提取watermark同欠,就是每一個event到來的時候样傍,就會提取一次Watermark。
這樣的方式當(dāng)然設(shè)置watermark更為精準(zhǔn)铺遂,但是當(dāng)數(shù)據(jù)量大的時候衫哥,頻繁的更新wartermark會比較影響性能。
通常情況下采用定時提取就足夠了襟锐。
使用
設(shè)置數(shù)據(jù)流時間特征
//設(shè)置為事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
默認為TimeCharacteristic.ProcessingTime
,默認水位線更新每隔200ms
入口文件
val env = StreamExecutionEnvironment.getExecutionEnvironment
//便于測試撤逢,并行度設(shè)置為1
env.setParallelism(1)
//env.getConfig.setAutoWatermarkInterval(9000)
//設(shè)置為事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//設(shè)置source 本地socket
val text: DataStream[String] = env.socketTextStream("localhost", 9000)
val lateText = new OutputTag[(String, String, Long, Long)]("late_data")
val value = text.filter(new MyFilterNullOrWhitespace)
.flatMap(new MyFlatMap)
.assignTimestampsAndWatermarks(new MyWaterMark)
.map(x => (x.name, x.datetime, x.timestamp, 1L))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sideOutputLateData(lateText)
//.sum(2)
.apply(new MyWindow)
//.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//.apply(new MyWindow)
value.getSideOutput(lateText).map(x => {
"延遲數(shù)據(jù)|name:" + x._1 + "|datetime:" + x._2
}).print()
value.print()
env.execute("watermark test")
class MyWaterMark extends AssignerWithPeriodicWatermarks[EventObj] {
val maxOutOfOrderness = 10000L // 3.0 seconds
var currentMaxTimestamp = 0L
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
/**
* 用于生成新的水位線,新的水位線只有大于當(dāng)前水位線才是有效的
*
* 通過生成水印的間隔(每n毫秒)定義 ExecutionConfig.setAutoWatermarkInterval(...)粮坞。
* getCurrentWatermark()每次調(diào)用分配器的方法蚊荣,如果返回的水印非空并且大于先前的水印,則將發(fā)出新的水印莫杈。
*
* @return
*/
override def getCurrentWatermark: Watermark = {
new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness)
}
/**
* 用于從消息中提取事件時間
*
* @param element EventObj
* @param previousElementTimestamp Long
* @return
*/
override def extractTimestamp(element: EventObj, previousElementTimestamp: Long): Long = {
currentMaxTimestamp = Math.max(element.timestamp, currentMaxTimestamp)
val id = Thread.currentThread().getId
println("currentThreadId:" + id + ",key:" + element.name + ",eventTime:[" + element.datetime + "],currentMaxTimestamp:[" + sdf.format(currentMaxTimestamp) + "],watermark:[" + sdf.format(getCurrentWatermark().getTimestamp) + "]")
element.timestamp
}
}
代碼詳解
- 設(shè)置為事件時間
- 接受本地socket數(shù)據(jù)
- 抽取timestamp生成watermark互例,打印(線程id,key,eventTime,currentMaxTimestamp,watermark)
- event time每隔3秒觸發(fā)一次窗口姓迅,打忧没簟(key,窗口內(nèi)元素個數(shù),窗口內(nèi)最早元素的時間丁存,窗口內(nèi)最晚元素的時間肩杈,窗口自身開始時間,窗口自身結(jié)束時間)
試驗
第一次
數(shù)據(jù)
{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}
輸出
|currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]
匯總
Key | EventTime | currentMaxTimestamp | Watermark |
---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 |
第二次
數(shù)據(jù)
{"datetime":"2019-03-26 16:25:27","name":"zhangsan"}
輸出
currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:27],currentMaxTimestamp:[2019-03-26 16:25:27],watermark:[2019-03-26 16:25:17]
匯總
Key | EventTime | currentMaxTimestamp | Watermark |
---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 |
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 |
隨著EventTime的升高解寝,Watermark升高扩然。
第三次
數(shù)據(jù)
{"datetime":"2019-03-26 16:25:34","name":"zhangsan"}
輸出
currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:34],currentMaxTimestamp:[2019-03-26 16:25:34],watermark:[2019-03-26 16:25:24]
匯總
Key | EventTime | currentMaxTimestamp | Watermark |
---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 |
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 |
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 |
到這里,window仍然沒有被觸發(fā)聋伦,此時watermark的時間已經(jīng)等于了第一條數(shù)據(jù)的Event Time了夫偶。
第四次
數(shù)據(jù)
{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
輸出
currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)
匯總
Key | EventTime | currentMaxTimestamp | Watermark | WindowStartTime | WindowEndTime |
---|---|---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 | ||
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 | ||
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 | ||
zhangsan | 2019-03-26 16:25:35 | 2019-03-26 16:25:35 | 2019-03-26 16:25:25 | [2019-03-26 16:25:20 | 2019-03-26 16:25:25) |
直接證明了window的設(shè)定無關(guān)數(shù)據(jù)本身界睁,而是系統(tǒng)定義好了的。
輸入的數(shù)據(jù)中兵拢,根據(jù)自身的Event Time翻斟,將數(shù)據(jù)劃分到不同的window中,如果window中有數(shù)據(jù)说铃,則當(dāng)watermark時間>=Event Time時访惜,就符合了window觸發(fā)的條件了,最終決定window觸發(fā)腻扇,還是由數(shù)據(jù)本身的Event Time所屬的window中的window_end_time決定捏题。
當(dāng)最后一條數(shù)據(jù)16:25:35到達是明郭,Watermark提升到16:25:25,此時窗口16:25:20~16:25:25中有數(shù)據(jù)淹魄,Window被觸發(fā)灿里。
第五次
數(shù)據(jù)
{"datetime":"2019-03-26 16:25:37","name":"zhangsan"}
輸出
currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:37],currentMaxTimestamp:[2019-03-26 16:25:37],watermark:[2019-03-26 16:25:27]
匯總
Key | EventTime | currentMaxTimestamp | Watermark | WindowStartTime | WindowEndTime |
---|---|---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 | ||
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 | ||
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 | ||
zhangsan | 2019-03-26 16:25:35 | 2019-03-26 16:25:35 | 2019-03-26 16:25:25 | [2019-03-26 16:25:20 | 2019-03-26 16:25:25) |
zhangsan | 2019-03-26 16:25:37 | 2019-03-26 16:25:37 | 2019-03-26 16:25:27 |
此時靖苇,watermark時間雖然已經(jīng)達到了第二條數(shù)據(jù)的時間嘶朱,但是由于其沒有達到第二條數(shù)據(jù)所在window的結(jié)束時間溯捆,所以window并沒有被觸發(fā)。
第二條數(shù)據(jù)所在的window時間是:[2019-03-26 16:25:25,2019-03-26 16:25:30)
第六次
數(shù)據(jù)
{"datetime":"2019-03-26 16:25:40","name":"zhangsan"}
輸出
currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:40],currentMaxTimestamp:[2019-03-26 16:25:40],watermark:[2019-03-26 16:25:30]
(zhangsan,1,2019-03-26 16:25:27,2019-03-26 16:25:27,2019-03-26 16:25:25,2019-03-26 16:25:30)
匯總
Key | EventTime | currentMaxTimestamp | Watermark | WindowStartTime | WindowEndTime |
---|---|---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 | ||
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 | ||
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 | ||
zhangsan | 2019-03-26 16:25:35 | 2019-03-26 16:25:35 | 2019-03-26 16:25:25 | [2019-03-26 16:25:20 | 2019-03-26 16:25:25) |
zhangsan | 2019-03-26 16:25:37 | 2019-03-26 16:25:37 | 2019-03-26 16:25:27 | ||
zhangsan | 2019-03-26 16:25:40 | 2019-03-26 16:25:40 | 2019-03-26 16:25:30 | [2019-03-26 16:25:25 | 2019-03-26 16:25:30) |
結(jié)論
window的觸發(fā)要符合以下幾個條件:
- watermark時間 >= window_end_time
- 在[window_start_time,window_end_time)中有數(shù)據(jù)存在
同時滿足了以上2個條件暑椰,window才會觸發(fā)霍转。
watermark是一個全局的值,不是某一個key下的值一汽,所以即使不是同一個key的數(shù)據(jù),其warmark也會增加.
多并行度
總結(jié)
Flink如何處理亂序低滩?
watermark+window機制召夹。window中可以對input進行按照Event Time排序,使得完全按照Event Time發(fā)生的順序去處理數(shù)據(jù)恕沫,以達到處理亂序數(shù)據(jù)的目的监憎。
Flink何時觸發(fā)window?
對于late element太多的數(shù)據(jù)而言
- Event Time < watermark時間
對于out-of-order以及正常的數(shù)據(jù)而言
- watermark時間 >= window_end_time
- 在[window_start_time,window_end_time)中有數(shù)據(jù)存在
Flink應(yīng)該如何設(shè)置最大亂序時間婶溯?
結(jié)合自己的業(yè)務(wù)以及數(shù)據(jù)情況去設(shè)置鲸阔。
參考
Flink WaterMark(水位線)分布式執(zhí)行理解
Flink流計算編程--watermark(水位線)簡介
The Dataflow Model