Flink WaterMark 詳解

背景

image

實時計算中整陌,數(shù)據(jù)時間比較敏感拗窃。有eventTime和processTime區(qū)分,一般來說eventTime是從原始的消息中提取過來的泌辫,processTime是Flink自己提供的随夸,F(xiàn)link中一個亮點就是可以基于eventTime計算,這個功能很有用甥郑,因為實時數(shù)據(jù)可能會經(jīng)過比較長的鏈路逃魄,多少會有延時荤西,并且有很大的不確定性澜搅,對于一些需要精確體現(xiàn)事件變化趨勢的場景中,單純使用processTime顯然是不合理的邪锌。

概念

watermark是一種衡量Event Time進展的機制勉躺,它是數(shù)據(jù)本身的一個隱藏屬性。通趁俜幔基于Event Time的數(shù)據(jù)饵溅,自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件妇萄,通常用watermark機制結(jié)合window來實現(xiàn)蜕企。

流處理從事件產(chǎn)生,到流經(jīng)source冠句,再到operator轻掩,中間是有一個過程和時間的。雖然大部分情況下懦底,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的唇牧,但是也不排除由于網(wǎng)絡(luò)、背壓等原因聚唐,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)丐重。

但是對于late element,我們又不能無限期的等下去杆查,必須要有個機制來保證一個特定的時間后扮惦,必須觸發(fā)window去進行計算了。這個特別的機制亲桦,就是watermark崖蜜。

window劃分

window的設(shè)定無關(guān)數(shù)據(jù)本身掺栅,而是系統(tǒng)定義好了的。
window是flink中劃分數(shù)據(jù)一個基本單位纳猪,window的劃分方式是固定的氧卧,默認會根據(jù)自然時間劃分window,并且劃分方式是前閉后開氏堤。

window劃分 w1 w2 w3
3s [00:00:00~00:00:03) [00:00:03~00:00:06) [00:00:06~00:00:09)
5s [00:00:00~00:00:05) [00:00:05~00:00:10) [00:00:10~00:00:15)
10s [00:00:00~00:00:10) [00:00:10~00:00:20) [00:00:20~00:00:30)
1min [00:00:00~00:01:00) [00:01:00~00:02:00) [00:02:00~00:03:00)

示例

如果設(shè)置最大允許的亂序時間是10s沙绝,滾動時間窗口為5s

{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}
//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]

觸達改記錄的時間窗口應(yīng)該為2019-03-26 16:25:20~2019-03-26 16:25:25
即當(dāng)有數(shù)據(jù)eventTime >= 2019-03-26 16:25:35 時

{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
//(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)

后面會詳細講解。_

提取watermark

watermark的提取工作在taskManager中完成鼠锈,意味著這項工作是并行進行的的闪檬,而watermark是一個全局的概念,就是一個整個Flink作業(yè)之后一個warkermark购笆。

AssignerWithPeriodicWatermarks

定時提取watermark粗悯,這種方式會定時提取更新wartermark。

//默認200ms
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

AssignerWithPunctuatedWatermarks

伴隨event的到來就提取watermark同欠,就是每一個event到來的時候样傍,就會提取一次Watermark。
這樣的方式當(dāng)然設(shè)置watermark更為精準(zhǔn)铺遂,但是當(dāng)數(shù)據(jù)量大的時候衫哥,頻繁的更新wartermark會比較影響性能。
通常情況下采用定時提取就足夠了襟锐。

使用

設(shè)置數(shù)據(jù)流時間特征

//設(shè)置為事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

默認為TimeCharacteristic.ProcessingTime,默認水位線更新每隔200ms

入口文件

val env = StreamExecutionEnvironment.getExecutionEnvironment

//便于測試撤逢,并行度設(shè)置為1
env.setParallelism(1)

//env.getConfig.setAutoWatermarkInterval(9000)

//設(shè)置為事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//設(shè)置source 本地socket
val text: DataStream[String] = env.socketTextStream("localhost", 9000)


val lateText = new OutputTag[(String, String, Long, Long)]("late_data")

val value = text.filter(new MyFilterNullOrWhitespace)
.flatMap(new MyFlatMap)
.assignTimestampsAndWatermarks(new MyWaterMark)
.map(x => (x.name, x.datetime, x.timestamp, 1L))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sideOutputLateData(lateText)
//.sum(2)
.apply(new MyWindow)
//.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//.apply(new MyWindow)
value.getSideOutput(lateText).map(x => {
"延遲數(shù)據(jù)|name:" + x._1 + "|datetime:" + x._2
}).print()

value.print()

env.execute("watermark test")

class MyWaterMark extends AssignerWithPeriodicWatermarks[EventObj] {

  val maxOutOfOrderness = 10000L // 3.0 seconds
  var currentMaxTimestamp = 0L

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  /**
    * 用于生成新的水位線,新的水位線只有大于當(dāng)前水位線才是有效的
    *
    * 通過生成水印的間隔(每n毫秒)定義 ExecutionConfig.setAutoWatermarkInterval(...)粮坞。
    * getCurrentWatermark()每次調(diào)用分配器的方法蚊荣,如果返回的水印非空并且大于先前的水印,則將發(fā)出新的水印莫杈。
    *
    * @return
    */
  override def getCurrentWatermark: Watermark = {
    new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness)
  }

  /**
    * 用于從消息中提取事件時間
    *
    * @param element                  EventObj
    * @param previousElementTimestamp Long
    * @return
    */
  override def extractTimestamp(element: EventObj, previousElementTimestamp: Long): Long = {

    currentMaxTimestamp = Math.max(element.timestamp, currentMaxTimestamp)

    val id = Thread.currentThread().getId
    println("currentThreadId:" + id + ",key:" + element.name + ",eventTime:[" + element.datetime + "],currentMaxTimestamp:[" + sdf.format(currentMaxTimestamp) + "],watermark:[" + sdf.format(getCurrentWatermark().getTimestamp) + "]")

    element.timestamp
  }
}

代碼詳解

  1. 設(shè)置為事件時間
  2. 接受本地socket數(shù)據(jù)
  3. 抽取timestamp生成watermark互例,打印(線程id,key,eventTime,currentMaxTimestamp,watermark)
  4. event time每隔3秒觸發(fā)一次窗口姓迅,打忧没簟(key,窗口內(nèi)元素個數(shù),窗口內(nèi)最早元素的時間丁存,窗口內(nèi)最晚元素的時間肩杈,窗口自身開始時間,窗口自身結(jié)束時間)

試驗

第一次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}

輸出

|currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]

匯總

Key EventTime currentMaxTimestamp Watermark
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14

第二次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:27","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:27],currentMaxTimestamp:[2019-03-26 16:25:27],watermark:[2019-03-26 16:25:17]

匯總

Key EventTime currentMaxTimestamp Watermark
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17

隨著EventTime的升高解寝,Watermark升高扩然。

第三次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:34","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:34],currentMaxTimestamp:[2019-03-26 16:25:34],watermark:[2019-03-26 16:25:24]

匯總

Key EventTime currentMaxTimestamp Watermark
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24

到這里,window仍然沒有被觸發(fā)聋伦,此時watermark的時間已經(jīng)等于了第一條數(shù)據(jù)的Event Time了夫偶。

第四次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
image

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)
image

匯總

Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)

直接證明了window的設(shè)定無關(guān)數(shù)據(jù)本身界睁,而是系統(tǒng)定義好了的。
輸入的數(shù)據(jù)中兵拢,根據(jù)自身的Event Time翻斟,將數(shù)據(jù)劃分到不同的window中,如果window中有數(shù)據(jù)说铃,則當(dāng)watermark時間>=Event Time時访惜,就符合了window觸發(fā)的條件了,最終決定window觸發(fā)腻扇,還是由數(shù)據(jù)本身的Event Time所屬的window中的window_end_time決定捏题。

當(dāng)最后一條數(shù)據(jù)16:25:35到達是明郭,Watermark提升到16:25:25,此時窗口16:25:20~16:25:25中有數(shù)據(jù)淹魄,Window被觸發(fā)灿里。

第五次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:37","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:37],currentMaxTimestamp:[2019-03-26 16:25:37],watermark:[2019-03-26 16:25:27]

匯總

Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)
zhangsan 2019-03-26 16:25:37 2019-03-26 16:25:37 2019-03-26 16:25:27

此時靖苇,watermark時間雖然已經(jīng)達到了第二條數(shù)據(jù)的時間嘶朱,但是由于其沒有達到第二條數(shù)據(jù)所在window的結(jié)束時間溯捆,所以window并沒有被觸發(fā)。

第二條數(shù)據(jù)所在的window時間是:[2019-03-26 16:25:25,2019-03-26 16:25:30)

第六次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:40","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:40],currentMaxTimestamp:[2019-03-26 16:25:40],watermark:[2019-03-26 16:25:30]
(zhangsan,1,2019-03-26 16:25:27,2019-03-26 16:25:27,2019-03-26 16:25:25,2019-03-26 16:25:30)

匯總

Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)
zhangsan 2019-03-26 16:25:37 2019-03-26 16:25:37 2019-03-26 16:25:27
zhangsan 2019-03-26 16:25:40 2019-03-26 16:25:40 2019-03-26 16:25:30 [2019-03-26 16:25:25 2019-03-26 16:25:30)

結(jié)論

window的觸發(fā)要符合以下幾個條件:

  1. watermark時間 >= window_end_time
  2. 在[window_start_time,window_end_time)中有數(shù)據(jù)存在

同時滿足了以上2個條件暑椰,window才會觸發(fā)霍转。
watermark是一個全局的值,不是某一個key下的值一汽,所以即使不是同一個key的數(shù)據(jù),其warmark也會增加.

多并行度

image

總結(jié)

Flink如何處理亂序低滩?

watermark+window機制召夹。window中可以對input進行按照Event Time排序,使得完全按照Event Time發(fā)生的順序去處理數(shù)據(jù)恕沫,以達到處理亂序數(shù)據(jù)的目的监憎。

Flink何時觸發(fā)window?

對于late element太多的數(shù)據(jù)而言

  1. Event Time < watermark時間

對于out-of-order以及正常的數(shù)據(jù)而言

  1. watermark時間 >= window_end_time
  2. 在[window_start_time,window_end_time)中有數(shù)據(jù)存在

Flink應(yīng)該如何設(shè)置最大亂序時間婶溯?

結(jié)合自己的業(yè)務(wù)以及數(shù)據(jù)情況去設(shè)置鲸阔。

image

參考

Flink WaterMark(水位線)分布式執(zhí)行理解
Flink流計算編程--watermark(水位線)簡介
The Dataflow Model

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市迄委,隨后出現(xiàn)的幾起案子褐筛,更是在濱河造成了極大的恐慌,老刑警劉巖叙身,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件渔扎,死亡現(xiàn)場離奇詭異,居然都是意外死亡信轿,警方通過查閱死者的電腦和手機晃痴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門残吩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人倘核,你說我怎么就攤上這事泣侮。” “怎么了紧唱?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵旁瘫,是天一觀的道長。 經(jīng)常有香客問我琼蚯,道長酬凳,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任遭庶,我火速辦了婚禮宁仔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘峦睡。我一直安慰自己翎苫,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布榨了。 她就那樣靜靜地躺著煎谍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪龙屉。 梳的紋絲不亂的頭發(fā)上呐粘,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機與錄音转捕,去河邊找鬼作岖。 笑死,一個胖子當(dāng)著我的面吹牛五芝,可吹牛的內(nèi)容都是我干的痘儡。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼枢步,長吁一口氣:“原來是場噩夢啊……” “哼沉删!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起醉途,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤矾瑰,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后结蟋,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體脯倚,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了推正。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片恍涂。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖植榕,靈堂內(nèi)的尸體忽然破棺而出再沧,到底是詐尸還是另有隱情,我是刑警寧澤尊残,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布炒瘸,位于F島的核電站,受9級特大地震影響寝衫,放射性物質(zhì)發(fā)生泄漏顷扩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一慰毅、第九天 我趴在偏房一處隱蔽的房頂上張望隘截。 院中可真熱鬧,春花似錦汹胃、人聲如沸婶芭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽犀农。三九已至,卻和暖如春宰掉,著一層夾襖步出監(jiān)牢的瞬間呵哨,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工贵扰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留仇穗,地道東北人。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓戚绕,卻偏偏與公主長得像,于是被迫代替她去往敵國和親枝冀。 傳聞我的和親對象是個殘疾皇子舞丛,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容

  • Watermark 是 Flink 實時處理計算平臺的一個重要概念,也是 Google 的著名實時計算論文 The...
    hongyuzhou閱讀 2,619評論 2 13
  • 1. Watermark概念 watermark是一種衡量Event Time進展的機制果漾,它是數(shù)據(jù)本身的一個隱藏屬...
    maskwang520閱讀 17,894評論 0 6
  • Window是無限數(shù)據(jù)流處理的核心球切,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們...
    尼小摩閱讀 3,447評論 0 13
  • 聽完樊登老師講的《即興演講》绒障、《演講的力量》吨凑、《高效演講》最少的一本書,都聽了兩遍,其他都是三四遍鸵钝。和女...
    蘇小濤閱讀 151評論 0 0
  • 按照官方網(wǎng)站的介紹糙臼,IPFS編譯一般無法通過。原因基本是中間下載包失敗恩商。所以你最好有一個能訪問ipfs.io的代理...
    樂樂_6272閱讀 1,703評論 0 0