轉(zhuǎn)自:https://bbs.huaweicloud.com/blogs/146333
[白話解析] Flink的Watermark機(jī)制
0x00 摘要
對(duì)于Flink來(lái)說(shuō),Watermark是個(gè)很難繞過(guò)去的概念针余。本文將從整體的思路上來(lái)說(shuō),運(yùn)用感性直覺(jué)的思考來(lái)幫大家梳理Watermark概念锅棕。
0x01 問(wèn)題
關(guān)于Watermark酥夭,很容易產(chǎn)生幾個(gè)問(wèn)題
Flink 流處理應(yīng)用中,常見(jiàn)的處理需求/應(yīng)對(duì)方案是什么?
Watermark究竟應(yīng)該翻譯成水印還是水位線筏勒?
Watermark本質(zhì)是什么?
Watermark是如何解決問(wèn)題?
下面我們就來(lái)簡(jiǎn)要解答這些問(wèn)題以給大家一個(gè)大致概念遮精,在后文中居夹,會(huì)再深入描述。
問(wèn)題1. Flink 流處理應(yīng)用中常見(jiàn)的需求/方案是什么
聚合類(lèi)的處理?Flink可以每來(lái)一個(gè)消息就處理一次本冲,但是有時(shí)我們需要做一些聚合類(lèi)的處理准脂,例如:在過(guò)去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁(yè)。所以Flink引入了窗口概念檬洞。
窗口?窗口的作用為了周期性的獲取數(shù)據(jù)狸膏。就是把傳入的原始數(shù)據(jù)流切分成多個(gè)buckets,所有計(jì)算都在單一的buckets中進(jìn)行添怔。窗口(window)就是從 Streaming 到 Batch 的一個(gè)橋梁湾戳。
帶來(lái)的問(wèn)題:聚合類(lèi)處理帶來(lái)了新的問(wèn)題贤旷,比如亂序/延遲。其解決方案就是 Watermark / allowLateNess / sideOutPut 這一組合拳砾脑。
Watermark?的作用是防止 數(shù)據(jù)亂序 / 指定時(shí)間內(nèi)獲取不到全部數(shù)據(jù)幼驶。
allowLateNess?是將窗口關(guān)閉時(shí)間再延遲一段時(shí)間。
sideOutPut?是最后兜底操作韧衣,當(dāng)指定窗口已經(jīng)徹底關(guān)閉后盅藻,就會(huì)把所有過(guò)期延遲數(shù)據(jù)放到側(cè)輸出流,讓用戶決定如何處理畅铭。
總結(jié)起來(lái)就是說(shuō)
Windows----->Watermark----->allowLateNess----->sideOutPut? ? 用Windows把流數(shù)據(jù)分塊處理氏淑,用Watermark確定什么時(shí)候不再等待更早的數(shù)據(jù)/觸發(fā)窗口進(jìn)行計(jì)算,用allowLateNess 將窗口關(guān)閉時(shí)間再延遲一段時(shí)間硕噩。用sideOutPut 最后兜底把數(shù)據(jù)導(dǎo)出到其他地方假残。
問(wèn)題2. Watermark應(yīng)該翻譯成水位線
我最初看的一篇文章中把Watermark翻譯成“水印”。我當(dāng)時(shí)比較暈炉擅。因?yàn)榘凑f(shuō)名字一定能夠反應(yīng)事物本質(zhì)守问。但是我怎么也腦補(bǔ)不出這個(gè)”水印“的本質(zhì)。
繼續(xù)看文章內(nèi)容坑资,越來(lái)越覺(jué)得這個(gè)應(yīng)該翻譯成“水位線”。于是查了查穆端,確實(shí)英文有如下翻譯:high-water mark 高水位線(海水或洪水所達(dá)到的最高水位)袱贮。
后來(lái)逐漸看到其他文章中也有翻譯成水位線,我才放心下來(lái)体啰,終于不會(huì)出現(xiàn)第二個(gè)“套接字”這樣神奇的翻譯了攒巍。
問(wèn)題3. Watermark本質(zhì)是什么
Watermarks是基于已經(jīng)收集的消息來(lái)估算是否還有消息未到達(dá),本質(zhì)上是一個(gè)時(shí)間戳荒勇。時(shí)間戳反映的是事件發(fā)生的時(shí)間柒莉,而不是事件處理的時(shí)間。
這個(gè)從Flink的源碼就能看出來(lái)沽翔,唯一有意義的成員變量就是 timestamp兢孝。
publicfinalclassWatermarkextendsStreamElement{/*The watermark that signifies end-of-event-time. */publicstaticfinal WatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);/* The timestamp of the watermark in milliseconds. */privatefinal long timestamp;/* Creates a new watermark with the given timestamp in milliseconds.*/publicWatermarklong timestamp){this.timestamp=timestamp;}/*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/publiclonggetTimestamp(){returntimestamp;}}
問(wèn)題4. Watermark如何解決問(wèn)題
Watermark是一種告訴Flink一個(gè)消息延遲多少的方式。它定義了什么時(shí)候不再等待更早的數(shù)據(jù)仅偎。
可以把Watermarks理解為一個(gè)水位線跨蟹,這個(gè)Watermarks在不斷的變化。Watermark實(shí)際上作為數(shù)據(jù)流的一部分隨數(shù)據(jù)流流動(dòng)橘沥。
當(dāng)Flink中的運(yùn)算符接收到Watermarks時(shí)窗轩,它明白早于該時(shí)間的消息已經(jīng)完全抵達(dá)計(jì)算引擎,即假設(shè)不會(huì)再有時(shí)間小于水位線的事件到達(dá)座咆。
這個(gè)假設(shè)是觸發(fā)窗口計(jì)算的基礎(chǔ)痢艺,只有水位線越過(guò)窗口對(duì)應(yīng)的結(jié)束時(shí)間仓洼,窗口才會(huì)關(guān)閉和進(jìn)行計(jì)算。
0x02 背景概念
流處理
流處理堤舒,最本質(zhì)的是在處理數(shù)據(jù)的時(shí)候色建,接受一條處理一條數(shù)據(jù)。
批處理植酥,則是累積數(shù)據(jù)到一定程度在處理镀岛。這是他們本質(zhì)的區(qū)別。
在設(shè)計(jì)上Flink認(rèn)為數(shù)據(jù)是流式的友驮,批處理只是流處理的特例漂羊。同時(shí)對(duì)數(shù)據(jù)分為有界數(shù)據(jù)和無(wú)界數(shù)據(jù)。
有界數(shù)據(jù)對(duì)應(yīng)批處理卸留,API對(duì)應(yīng)Dateset走越。
無(wú)界數(shù)據(jù)對(duì)應(yīng)流處理,API對(duì)應(yīng)DataStream耻瑟。
亂序(out-of-order)
什么是亂序呢旨指?可以理解為數(shù)據(jù)到達(dá)的順序和其實(shí)際產(chǎn)生時(shí)間的排序不一致。導(dǎo)致這的原因有很多喳整,比如延遲谆构,消息積壓,重試等等框都。
我們知道搬素,流處理從事件產(chǎn)生,到流經(jīng)source魏保,再到operator熬尺,中間是有一個(gè)過(guò)程和時(shí)間的。雖然大部分情況下谓罗,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的粱哼,但是也不排除由于網(wǎng)絡(luò)、背壓等原因檩咱,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)揭措。
比如:
某數(shù)據(jù)源中的某些數(shù)據(jù)由于某種原因(如:網(wǎng)絡(luò)原因,外部存儲(chǔ)自身原因)會(huì)有5秒的延時(shí)刻蚯,也就是在實(shí)際時(shí)間的第1秒產(chǎn)生的數(shù)據(jù)有可能在第5秒中產(chǎn)生的數(shù)據(jù)之后到來(lái)(比如到Window處理節(jié)點(diǎn))蜂筹。有1~10個(gè)事件。亂序到達(dá)的序列是:2,3,4,5,1,6,3,8,9,10,7
0x03 Flink中的窗口概念
窗口
對(duì)于Flink芦倒,如果來(lái)一條消息計(jì)算一條艺挪,這樣是可以的,但是這樣計(jì)算是非常頻繁而且消耗資源,如果想做一些統(tǒng)計(jì)這是不可能的麻裳。所以對(duì)于Spark和Flink都產(chǎn)生了窗口計(jì)算口蝠。
比如 是因?yàn)槲覀兿肟吹竭^(guò)去一分鐘,過(guò)去半小時(shí)的訪問(wèn)數(shù)據(jù)津坑,這時(shí)候我們就需要窗口妙蔗。
Window:Window是處理無(wú)界流的關(guān)鍵,Windows將流拆分為一個(gè)個(gè)有限大小的buckets疆瑰,可以可以在每一個(gè)buckets中進(jìn)行計(jì)算眉反。
start_time,end_time:當(dāng)Window時(shí)時(shí)間窗口的時(shí)候,每個(gè)window都會(huì)有一個(gè)開(kāi)始時(shí)間和結(jié)束時(shí)間(前開(kāi)后閉)穆役,這個(gè)時(shí)間是系統(tǒng)時(shí)間寸五。
窗口生命周期
簡(jiǎn)而言之,只要屬于此窗口的第一個(gè)元素到達(dá)耿币,就會(huì)創(chuàng)建一個(gè)窗口梳杏,當(dāng)時(shí)間(事件或處理時(shí)間)超過(guò)其結(jié)束時(shí)間戳加上用戶指定的允許延遲時(shí),窗口將被完全刪除淹接。
例如:
使用基于事件時(shí)間的窗口策略十性,每5分鐘創(chuàng)建一個(gè)不重疊(或翻滾)的窗口并允許延遲1分鐘。? ? 假定目前是12:00塑悼。當(dāng)具有落入該間隔的時(shí)間戳的第一個(gè)元素到達(dá)時(shí)蟹肘,F(xiàn)link將為12:00到12:05之間的間隔創(chuàng)建一個(gè)新窗口澈魄,當(dāng)水位線(watermark)到12:06時(shí)間戳?xí)r將刪除它张吉。
窗口有如下組件:
Window Assigner:用來(lái)決定某個(gè)元素被分配到哪個(gè)/哪些窗口中去耐亏。
Trigger:觸發(fā)器。決定了一個(gè)窗口何時(shí)能夠被計(jì)算或清除郭怪。觸發(fā)策略可能類(lèi)似于“當(dāng)窗口中的元素?cái)?shù)量大于4”時(shí),或“當(dāng)水位線通過(guò)窗口結(jié)束時(shí)”刊橘。
Evictor:它可以在 觸發(fā)器觸發(fā)后 & 應(yīng)用函數(shù)之前和/或之后 從窗口中刪除元素鄙才。
窗口還擁有函數(shù),比如 ProcessWindowFunction促绵,ReduceFunction攒庵,AggregateFunction或FoldFunction。該函數(shù)將包含要應(yīng)用于窗口內(nèi)容的計(jì)算败晴,而觸發(fā)器指定窗口被認(rèn)為準(zhǔn)備好應(yīng)用該函數(shù)的條件浓冒。
Keyed vs Non-Keyed Windows
在定義窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(...)將無(wú)界流分成邏輯的keyed stream尖坤。 如果未調(diào)用keyBy(...)稳懒,則表示流不是keyed stream。
對(duì)于Keyed流慢味,可以將傳入事件的任何屬性用作key场梆。 擁有Keyed stream將允許窗口計(jì)算由多個(gè)任務(wù)并行執(zhí)行墅冷,因?yàn)槊總€(gè)邏輯Keyed流可以獨(dú)立于其余任務(wù)進(jìn)行處理。 相同Key的所有元素將被發(fā)送到同一個(gè)任務(wù)或油。
在Non-Keyed流的情況下寞忿,原始流將不會(huì)被分成多個(gè)邏輯流,并且所有窗口邏輯將由單個(gè)任務(wù)執(zhí)行顶岸,即并行性為1腔彰。
窗口分類(lèi)
窗口分類(lèi)可以分成:翻滾窗口(Tumbling Window,無(wú)重疊)辖佣,滾動(dòng)窗口(Sliding Window霹抛,有重疊),和會(huì)話窗口凌简,(Session Window上炎,活動(dòng)間隙)
滾動(dòng)窗口
滾動(dòng)窗口分配器將每個(gè)元素分配給固定窗口大小的窗口。滾動(dòng)窗口大小固定的并且不重疊雏搂。例如藕施,如果指定大小為5分鐘的滾動(dòng)窗口,則將執(zhí)行當(dāng)前窗口凸郑,并且每五分鐘將啟動(dòng)一個(gè)新窗口裳食。
滑動(dòng)窗口
滑動(dòng)窗口與滾動(dòng)窗口的區(qū)別就是滑動(dòng)窗口有重復(fù)的計(jì)算部分。
滑動(dòng)窗口分配器將每個(gè)元素分配給固定窗口大小的窗口芙沥。類(lèi)似于滾動(dòng)窗口分配器诲祸,窗口的大小由窗口大小參數(shù)配置。另外一個(gè)窗口滑動(dòng)參數(shù)控制滑動(dòng)窗口的啟動(dòng)頻率(how frequently a sliding window is started)而昨。因此救氯,如果滑動(dòng)大小小于窗口大小,滑動(dòng)窗可以重疊歌憨。在這種情況下着憨,元素被分配到多個(gè)窗口。
例如务嫡,你可以使用窗口大小為10分鐘的窗口甲抖,滑動(dòng)大小為5分鐘。這樣心铃,每5分鐘會(huì)生成一個(gè)窗口准谚,包含最后10分鐘內(nèi)到達(dá)的事件。
會(huì)話窗口
會(huì)話窗口分配器通過(guò)活動(dòng)會(huì)話分組元素去扣。與滾動(dòng)窗口和滑動(dòng)窗口相比柱衔,會(huì)話窗口不會(huì)重疊,也沒(méi)有固定的開(kāi)始和結(jié)束時(shí)間。相反秀存,當(dāng)會(huì)話窗口在一段時(shí)間內(nèi)沒(méi)有接收到元素時(shí)會(huì)關(guān)閉捶码。
例如,不活動(dòng)的間隙時(shí)或链。會(huì)話窗口分配器配置會(huì)話間隙惫恼,定義所需的不活動(dòng)時(shí)間長(zhǎng)度(defines how long is the required period of inactivity)。當(dāng)此時(shí)間段到期時(shí)澳盐,當(dāng)前會(huì)話關(guān)閉祈纯,后續(xù)元素被分配到新的會(huì)話窗口。
0x04 Flink中的時(shí)間概念
Flink在流處理程序支持不同的時(shí)間概念叼耙。分別為Event Time/Processing Time/Ingestion Time腕窥,也就是事件時(shí)間、處理時(shí)間筛婉、提取時(shí)間簇爆。
從時(shí)間序列角度來(lái)說(shuō),發(fā)生的先后順序是:
事件時(shí)間(Event Time)---->提取時(shí)間(Ingestion Time)---->處理時(shí)間(Processing Time)
Event Time 是事件在現(xiàn)實(shí)世界中發(fā)生的時(shí)間爽撒,它通常由事件中的時(shí)間戳描述入蛆。
Ingestion Time 是數(shù)據(jù)進(jìn)入Apache Flink流處理系統(tǒng)的時(shí)間,也就是Flink讀取數(shù)據(jù)源時(shí)間硕勿。
Processing Time 是數(shù)據(jù)流入到具體某個(gè)算子 (消息被計(jì)算處理) 時(shí)候相應(yīng)的系統(tǒng)時(shí)間哨毁。也就是Flink程序處理該事件時(shí)當(dāng)前系統(tǒng)時(shí)間。
但是我們講解時(shí)源武,會(huì)從后往前講解扼褪,把最重要的Event Time放在最后。
處理時(shí)間
是數(shù)據(jù)流入到具體某個(gè)算子時(shí)候相應(yīng)的系統(tǒng)時(shí)間粱栖。
這個(gè)系統(tǒng)時(shí)間指的是執(zhí)行相應(yīng)操作的機(jī)器的系統(tǒng)時(shí)間话浇。當(dāng)一個(gè)流程序通過(guò)處理時(shí)間來(lái)運(yùn)行時(shí),所有基于時(shí)間的操作(如: 時(shí)間窗口)將使用各自操作所在的物理機(jī)的系統(tǒng)時(shí)間闹究。
ProcessingTime 有最好的性能和最低的延遲幔崖。但在分布式計(jì)算環(huán)境或者異步環(huán)境中,ProcessingTime具有不確定性跋核,相同數(shù)據(jù)流多次運(yùn)行有可能產(chǎn)生不同的計(jì)算結(jié)果。因?yàn)樗菀资艿綇挠涗浀竭_(dá)系統(tǒng)的速度(例如從消息隊(duì)列)到記錄在系統(tǒng)內(nèi)的operator之間流動(dòng)的速度的影響(停電叛买,調(diào)度或其他)砂代。
提取時(shí)間
IngestionTime是數(shù)據(jù)進(jìn)入Apache Flink框架的時(shí)間,是在Source Operator中設(shè)置的率挣。每個(gè)記錄將源的當(dāng)前時(shí)間作為時(shí)間戳刻伊,并且后續(xù)基于時(shí)間的操作(如時(shí)間窗口)引用該時(shí)間戳。
提取時(shí)間在概念上位于事件時(shí)間和處理時(shí)間之間。與處理時(shí)間相比捶箱,它稍早一些智什。IngestionTime與ProcessingTime相比可以提供更可預(yù)測(cè)的結(jié)果,因?yàn)镮ngestionTime的時(shí)間戳比較穩(wěn)定(在源處只記錄一次)丁屎,所以同一數(shù)據(jù)在流經(jīng)不同窗口操作時(shí)將使用相同的時(shí)間戳荠锭,而對(duì)于ProcessingTime同一數(shù)據(jù)在流經(jīng)不同窗口算子會(huì)有不同的處理時(shí)間戳。
與事件時(shí)間相比晨川,提取時(shí)間程序無(wú)法處理任何無(wú)序事件或后期數(shù)據(jù)证九,但程序不必指定如何生成水位線。
在內(nèi)部共虑,提取時(shí)間與事件時(shí)間非常相似愧怜,但具有自動(dòng)時(shí)間戳分配和自動(dòng)水位線生成功能。
事件時(shí)間
事件時(shí)間就是事件在真實(shí)世界的發(fā)生時(shí)間妈拌,即每個(gè)事件在產(chǎn)生它的設(shè)備上發(fā)生的時(shí)間(當(dāng)?shù)貢r(shí)間)拥坛。比如一個(gè)點(diǎn)擊事件的時(shí)間發(fā)生時(shí)間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時(shí)間尘分。
在進(jìn)入Apache Flink框架之前EventTime通常要嵌入到記錄中猜惋,并且EventTime也可以從記錄中提取出來(lái)。在實(shí)際的網(wǎng)上購(gòu)物訂單等業(yè)務(wù)場(chǎng)景中音诫,大多會(huì)使用EventTime來(lái)進(jìn)行數(shù)據(jù)計(jì)算惨奕。
基于事件時(shí)間處理的強(qiáng)大之處在于即使在亂序事件,延遲事件竭钝,歷史數(shù)據(jù)以及從備份或持久化日志中的重復(fù)數(shù)據(jù)也能獲得正確的結(jié)果梨撞。對(duì)于事件時(shí)間,時(shí)間的進(jìn)度取決于數(shù)據(jù)香罐,而不是任何時(shí)鐘卧波。
事件時(shí)間程序必須指定如何生成事件時(shí)間的Watermarks,這是表示事件時(shí)間進(jìn)度的機(jī)制庇茫。
現(xiàn)在假設(shè)我們正在創(chuàng)建一個(gè)排序的數(shù)據(jù)流港粱。這意味著應(yīng)用程序處理流中的亂序到達(dá)的事件,并生成同樣事件但按時(shí)間戳(事件時(shí)間)排序的新數(shù)據(jù)流旦签。
比如:
有1~10個(gè)事件查坪。亂序到達(dá)的序列是:1,2,4,5,6,3,8,9,10,7經(jīng)過(guò)按 事件時(shí)間 處理后的序列是:1,2,3,4,5,6,7,8,9,10
為了處理事件時(shí)間,F(xiàn)link需要知道事件的時(shí)間戳宁炫,這意味著流中的每條數(shù)據(jù)都需要分配其事件時(shí)間戳偿曙。這通常通過(guò)提取每條數(shù)據(jù)中的固定字段來(lái)完成時(shí)間戳的獲取。
設(shè)定時(shí)間特性
Flink DataStream 程序的第一部分通常是設(shè)置基本時(shí)間特性羔巢。 該設(shè)置定義了數(shù)據(jù)流源的行為方式(例如:它們是否將分配時(shí)間戳)望忆,以及像?KeyedStream.timeWindow(Time.seconds(30))?這樣的窗口操作應(yīng)該使用上面哪種時(shí)間概念罩阵。
比如:
final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
0x05 Watermark
前文講到了事件時(shí)間,這個(gè)真實(shí)發(fā)生的時(shí)間是我們業(yè)務(wù)在實(shí)時(shí)處理程序中非常關(guān)心的启摄。在一個(gè)理想的情況下稿壁,事件時(shí)間處理將產(chǎn)生完全一致和確定的結(jié)果,無(wú)論事件何時(shí)到達(dá)或其排序歉备。但是在現(xiàn)實(shí)中傅是,消息不在是按照順序發(fā)送,產(chǎn)生了亂序威创,這時(shí)候該怎么處理落午?
Watermark是Apache Flink為了處理EventTime 窗口計(jì)算提出的一種機(jī)制,本質(zhì)上也是一種時(shí)間戳。watermark是用于處理亂序事件或延遲數(shù)據(jù)的肚豺,這通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)(Watermarks用來(lái)觸發(fā)window窗口計(jì)算)溃斋。
比如對(duì)于late element,我們不能無(wú)限期的等下去吸申,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后梗劫,必須觸發(fā)window去進(jìn)行計(jì)算了。這個(gè)特別的機(jī)制截碴,就是watermark梳侨。 可以把Watermark看作是一種告訴Flink一個(gè)消息延遲多少的方式。定義了什么時(shí)候不再等待更早的數(shù)據(jù)日丹。
1. 窗口觸發(fā)條件
上面談到了對(duì)數(shù)據(jù)亂序問(wèn)題的處理機(jī)制是watermark+window走哺,那么window什么時(shí)候該被觸發(fā)呢?
基于Event Time的事件處理哲虾,F(xiàn)link默認(rèn)的事件觸發(fā)條件為:
對(duì)于out-of-order及正常的數(shù)據(jù)而言
watermark的時(shí)間戳 > = window endTime
在 [window_start_time,window_end_time] 中有數(shù)據(jù)存在丙躏。
對(duì)于late element太多的數(shù)據(jù)而言
Event Time > watermark的時(shí)間戳
WaterMark相當(dāng)于一個(gè)EndLine,一旦Watermarks大于了某個(gè)window的end_time束凑,就意味著windows_end_time時(shí)間和WaterMark時(shí)間相同的窗口開(kāi)始計(jì)算執(zhí)行了晒旅。
就是說(shuō),我們根據(jù)一定規(guī)則汪诉,計(jì)算出Watermarks废恋,并且設(shè)置一些延遲,給遲到的數(shù)據(jù)一些機(jī)會(huì)扒寄,也就是說(shuō)正常來(lái)講鱼鼓,對(duì)于遲到的數(shù)據(jù),我只等你一段時(shí)間该编,再不來(lái)就沒(méi)有機(jī)會(huì)了迄本。
WaterMark時(shí)間可以用Flink系統(tǒng)現(xiàn)實(shí)時(shí)間,也可以用處理數(shù)據(jù)所攜帶的Event time上渴。
使用Flink系統(tǒng)現(xiàn)實(shí)時(shí)間岸梨,在并行和多線程中需要注意的問(wèn)題較少,因?yàn)槎际且袁F(xiàn)實(shí)時(shí)間為標(biāo)準(zhǔn)稠氮。
如果使用處理數(shù)據(jù)所攜帶的Event time作為WaterMark時(shí)間曹阔,需要注意兩點(diǎn):
因?yàn)閿?shù)據(jù)到達(dá)并不是循序的,注意保存一個(gè)當(dāng)前最大時(shí)間戳作為WaterMark時(shí)間
并行同步問(wèn)題
2. WaterMark設(shè)定方法
標(biāo)點(diǎn)水位線(Punctuated Watermark)
標(biāo)點(diǎn)水位線(Punctuated Watermark)通過(guò)數(shù)據(jù)流中某些特殊標(biāo)記事件來(lái)觸發(fā)新水位線的生成隔披。這種方式下窗口的觸發(fā)與時(shí)間無(wú)關(guān)赃份,而是決定于何時(shí)收到標(biāo)記事件。
在實(shí)際的生產(chǎn)中Punctuated方式在TPS很高的場(chǎng)景下會(huì)產(chǎn)生大量的Watermark在一定程度上對(duì)下游算子造成壓力奢米,所以只有在實(shí)時(shí)性要求非常高的場(chǎng)景才會(huì)選擇Punctuated的方式進(jìn)行Watermark的生成抓韩。
定期水位線(Periodic Watermark)
周期性的(允許一定時(shí)間間隔或者達(dá)到一定的記錄條數(shù))產(chǎn)生一個(gè)Watermark。水位線提升的時(shí)間間隔是由用戶設(shè)置的鬓长,在兩次水位線提升時(shí)隔內(nèi)會(huì)有一部分消息流入谒拴,用戶可以根據(jù)這部分?jǐn)?shù)據(jù)來(lái)計(jì)算出新的水位線。
在實(shí)際的生產(chǎn)中Periodic的方式必須結(jié)合時(shí)間和積累條數(shù)兩個(gè)維度繼續(xù)周期性產(chǎn)生Watermark涉波,否則在極端情況下會(huì)有很大的延時(shí)英上。
舉個(gè)例子,最簡(jiǎn)單的水位線算法就是取目前為止最大的事件時(shí)間啤覆,然而這種方式比較暴力苍日,對(duì)亂序事件的容忍程度比較低,容易出現(xiàn)大量遲到事件窗声。
3. 遲到事件
雖說(shuō)水位線表明著早于它的事件不應(yīng)該再出現(xiàn)相恃,但是上如上文所講,接收到水位線以前的的消息是不可避免的笨觅,這就是所謂的遲到事件拦耐。實(shí)際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預(yù)計(jì)屋摇,導(dǎo)致窗口在它們到達(dá)之前已經(jīng)關(guān)閉揩魂。
遲到事件出現(xiàn)時(shí)窗口已經(jīng)關(guān)閉并產(chǎn)出了計(jì)算結(jié)果,因此處理的方法有3種:
重新激活已經(jīng)關(guān)閉的窗口并重新計(jì)算以修正結(jié)果炮温。
將遲到事件收集起來(lái)另外處理火脉。
將遲到事件視為錯(cuò)誤消息并丟棄。
Flink 默認(rèn)的處理方式是第3種直接丟棄柒啤,其他兩種方式分別使用Side Output和Allowed Lateness倦挂。
Side Output機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品担巩,以便用戶獲取并對(duì)其進(jìn)行特殊處理方援。
Allowed Lateness機(jī)制允許用戶設(shè)置一個(gè)允許的最大遲到時(shí)長(zhǎng)。Flink 會(huì)在窗口關(guān)閉后一直保存窗口的狀態(tài)直至超過(guò)允許遲到時(shí)長(zhǎng)涛癌,這期間的遲到事件不會(huì)被丟棄犯戏,而是默認(rèn)會(huì)觸發(fā)窗口重新計(jì)算送火。因?yàn)楸4娲翱跔顟B(tài)需要額外內(nèi)存,并且如果窗口計(jì)算使用了?ProcessWindowFunction?API 還可能使得每個(gè)遲到事件觸發(fā)一次窗口的全量計(jì)算先匪,代價(jià)比較大种吸,所以允許遲到時(shí)長(zhǎng)不宜設(shè)得太長(zhǎng),遲到事件也不宜過(guò)多呀非,否則應(yīng)該考慮降低水位線提高的速度或者調(diào)整算法坚俗。
這里總結(jié)機(jī)制為:
窗口window 的作用是為了周期性的獲取數(shù)據(jù)。
watermark的作用是防止數(shù)據(jù)出現(xiàn)亂序(經(jīng)常)岸裙,事件時(shí)間內(nèi)獲取不到指定的全部數(shù)據(jù)猖败,而做的一種保險(xiǎn)方法。
allowLateNess是將窗口關(guān)閉時(shí)間再延遲一段時(shí)間降允。
sideOutPut是最后兜底操作恩闻,所有過(guò)期延遲數(shù)據(jù),指定窗口已經(jīng)徹底關(guān)閉了剧董,就會(huì)把數(shù)據(jù)放到側(cè)輸出流判呕。
4. 實(shí)例
采用系統(tǒng)時(shí)間做Watermark
我們將水位線設(shè)置為當(dāng)前系統(tǒng)時(shí)間間-5秒。
override defgetCurrentWatermark():Watermark={newWatermark(System.currentTimeMillis-5000)}
通常最好保持接收到的最大時(shí)間戳送滞,并創(chuàng)建具有最大預(yù)期延遲的水位線侠草,而不是從當(dāng)前系統(tǒng)時(shí)間減去。
采用Event Time做watermark
例如基于Event Time的數(shù)據(jù)犁嗅,自身都包含一個(gè)類(lèi)型為timestamp的字段边涕,假設(shè)叫做rowtime,例如1543903383(2018-12-04 14:03:03)褂微,定義一個(gè)基于rowtime列功蜓,策略為偏移3s的watermark,這條數(shù)據(jù)的水位線時(shí)間戳則是:
1543903383-3000=1543900383(2018-12-0414:03:00)
該條數(shù)據(jù)的水位線時(shí)間含義:timestamp小于1543900383(2018-12-04 14:03:00)的數(shù)據(jù)宠蚂,都已經(jīng)到達(dá)了式撼。
classBoundedOutOfOrdernessGeneratorextendsAssignerWithPeriodicWatermarks[MyEvent]{val maxOutOfOrderness=3000L;// 3 secondsvarcurrentMaxTimestamp:Long;override defextractTimestamp(element:MyEvent,previousElementTimestamp:Long):Long={val timestamp=element.getCreationTime()currentMaxTimestamp=max(timestamp,currentMaxTimestamp)timestamp;}override defgetCurrentWatermark():Watermark={// return the watermark as current highest timestamp minus the out-of-orderness boundnewWatermark(currentMaxTimestamp-maxOutOfOrderness);}}
看看如何觸發(fā)窗口
我們明白了窗口的觸發(fā)機(jī)制,這里我們添加了水位線求厕,到底是個(gè)怎么個(gè)情況著隆?我們來(lái)看下面
假如我們?cè)O(shè)置10s的時(shí)間窗口(window),那么0~10s呀癣,10~20s都是一個(gè)窗口美浦,以0~10s為例,0為start-time项栏,10為end-time浦辨。假如有4個(gè)數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們?cè)O(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒
當(dāng)A到達(dá)的時(shí)候沼沈,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會(huì)觸發(fā)計(jì)算
當(dāng)B到達(dá)的時(shí)候流酬,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算
當(dāng)C到達(dá)的時(shí)候币厕,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算
當(dāng)D到達(dá)的時(shí)候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計(jì)算
觸發(fā)計(jì)算的時(shí)候芽腾,會(huì)將A劈榨,C(因?yàn)樗麄兌夹∮?0)都計(jì)算進(jìn)去,其中C是遲到的晦嵌。
max這個(gè)很關(guān)鍵,就是當(dāng)前窗口內(nèi)拷姿,所有事件的最大事件惭载。
這里的延遲3.5s是我們假設(shè)一個(gè)數(shù)據(jù)到達(dá)的時(shí)候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了响巢,這個(gè)是需要根據(jù)經(jīng)驗(yàn)推算描滔。假設(shè)加入D到達(dá)以后有到達(dá)了一個(gè)E,event-time=6,但是由于0~10的時(shí)間窗口已經(jīng)開(kāi)始計(jì)算了踪古,所以E就丟了含长。
從這里上面E的丟失說(shuō)明,水位線也不是萬(wàn)能的伏穆,但是如果根據(jù)我們自己的生產(chǎn)經(jīng)驗(yàn)+側(cè)道輸出等方案拘泞,可以做到數(shù)據(jù)不丟失。
0x06 Flink源碼
數(shù)據(jù)結(jié)構(gòu)定義
在Flink DataStream中流動(dòng)著不同的元素枕扫,統(tǒng)稱為StreamElement陪腌,StreamElement可以是StreamRecord、Watermark烟瞧、StreamStatus诗鸭、LatencyMarker中任何一種類(lèi)型。
StreamElement
StreamElement是一個(gè)抽象類(lèi)(是Flink 承載消息的基類(lèi))参滴,其他四種類(lèi)型繼承StreamElement强岸。
publicabstractclassStreamElement{//判斷是否是Watermarkpublicfinal booleanisWatermark(){returngetClass()==Watermark.class;}//判斷是否為StreamStatuspublicfinal booleanisStreamStatus(){returngetClass()==StreamStatus.class;}//判斷是否為StreamRecordpublicfinal booleanisRecord(){returngetClass()==StreamRecord.class;}//判斷是否為L(zhǎng)atencyMarkerpublicfinal booleanisLatencyMarker(){returngetClass()==LatencyMarker.class;}//轉(zhuǎn)換為StreamRecordpublicfinal? StreamRecordasRecord(){return(StreamRecord)this;}//轉(zhuǎn)換為Watermarkpublicfinal WatermarkasWatermark(){return(Watermark)this;}//轉(zhuǎn)換為StreamStatuspublicfinal StreamStatusasStreamStatus(){return(StreamStatus)this;}//轉(zhuǎn)換為L(zhǎng)atencyMarkerpublicfinal LatencyMarkerasLatencyMarker(){return(LatencyMarker)this;}}
Watermark
Watermark繼承了StreamElement。Watermark 是和事件一個(gè)級(jí)別的抽象砾赔,其內(nèi)部包含一個(gè)成員變量時(shí)間戳timestamp蝌箍,標(biāo)識(shí)當(dāng)前數(shù)據(jù)的時(shí)間進(jìn)度。Watermark實(shí)際上作為數(shù)據(jù)流的一部分隨數(shù)據(jù)流流動(dòng)暴心。
@PublicEvolvingpublicfinalclassWatermarkextendsStreamElement{/*The watermark that signifies end-of-event-time. */publicstaticfinal WatermarkMAX_WATERMARK=newWatermark(Long.MAX_VALUE);/* The timestamp of the watermark in milliseconds. */privatefinal long timestamp;/* Creates a new watermark with the given timestamp in milliseconds.*/publicWatermarklong timestamp){this.timestamp=timestamp;}/*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/publiclonggetTimestamp(){returntimestamp;}}
Flink如何生成&處理Watermark
在實(shí)際使用中大多數(shù)情況下會(huì)選擇周期性生成方式也就是AssignerWithPeriodicWatermarks方式.
//指定為evenTime時(shí)間語(yǔ)義env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//生成watermark的周期env.getConfig.setAutoWatermarkInterval(watermarkInterval)//指定方式dataStream.assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)){override defextractTimestamp(element:Element):Long=element.dT})
BoundedOutOfOrdernessTimestampExtractor 是Flink內(nèi)置提供的允許亂序最大延時(shí)的watermark生成方式十绑,只需要重寫(xiě)其extractTimestamp方法即可。
assignTimestampsAndWatermarks 可以理解為是一個(gè)算子轉(zhuǎn)換操作酷勺,等同于map/window一樣理解本橙,可以為其設(shè)置并行度、名稱脆诉,也是一個(gè)transformation/operator甚亭,
publicSingleOutputStreamOperatorassignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks timestampAndWatermarkAssigner){// match parallelism to input, otherwise dop=1 sources could lead to some strange// behaviour: the watermark will creep along very slowly because the elements// from the source go to each extraction operator round robin.final int inputParallelism=getTransformation().getParallelism();final AssignerWithPeriodicWatermarks cleanedAssigner=clean(timestampAndWatermarkAssigner);TimestampsAndPeriodicWatermarksOperator operator=newTimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);returntransform("Timestamps/Watermarks",getTransformation().getOutputType(),operator).setParallelism(inputParallelism);}
其使用的StreamOperator類(lèi)型TimestampsAndPeriodicWatermarksOperator贷币,繼承了AbstractUdfStreamOperator,實(shí)現(xiàn)了OneInputStreamOperator接口與ProcessingTimeCallback接口亏狰,
TimestampsAndPeriodicWatermarksOperator役纹。
/**
* A stream operator that extracts timestamps from stream elements and
* generates periodic watermarks.
*
* @param? The type of the input elements
*/publicclassTimestampsAndPeriodicWatermarksOperatorextendsAbstractUdfStreamOperator>implementsOneInputStreamOperator,ProcessingTimeCallback{privatestaticfinal long serialVersionUID=1L;privatetransient long watermarkInterval;privatetransient long currentWatermark;publicTimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks assigner){super(assigner);this.chainingStrategy=ChainingStrategy.ALWAYS;}@Overridepublicvoidopen()throws Exception{super.open();//初始化默認(rèn)當(dāng)前watermarkcurrentWatermark=Long.MIN_VALUE;//生成watermark周期時(shí)間配置watermarkInterval=getExecutionConfig().getAutoWatermarkInterval();//注冊(cè)定時(shí)其配置if(watermarkInterval>0){long now=getProcessingTimeService().getCurrentProcessingTime();//注冊(cè)一個(gè)watermarkInterval后觸發(fā)的定時(shí)器,傳入回調(diào)參數(shù)是this暇唾,也就是會(huì)調(diào)用當(dāng)前對(duì)象的onProcessingTime方法getProcessingTimeService().registerTimer(now+watermarkInterval,this);}}@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{//提取當(dāng)前的事件時(shí)間final long newTimestamp=userFunction.extractTimestamp(element.getValue(),element.hasTimestamp()?element.getTimestamp():Long.MIN_VALUE);//保存當(dāng)前最大的事件時(shí)間促脉。output.collect(element.replace(element.getValue(),newTimestamp));}@OverridepublicvoidonProcessingTime(long timestamp)throws Exception{//此方法表示的就是定時(shí)回調(diào)的方法,將符合要求的watermark發(fā)送出去并且注冊(cè)下一個(gè)定時(shí)器策州。// register next timerWatermark newWatermark=userFunction.getCurrentWatermark();//當(dāng)新的watermark大于當(dāng)前的watermarkif(newWatermark!=null&&newWatermark.getTimestamp()>currentWatermark){currentWatermark=newWatermark.getTimestamp();//將符合要求的watermark發(fā)送出去// emit watermarkoutput.emitWatermark(newWatermark);}//注冊(cè)下一次觸發(fā)時(shí)間long now=getProcessingTimeService().getCurrentProcessingTime();getProcessingTimeService().registerTimer(now+watermarkInterval,this);}/**
? ? * Override the base implementation to completely ignore watermarks propagated from
? ? * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
? ? * watermarks from here).
? ? */@OverridepublicvoidprocessWatermark(Watermark mark)throws Exception{//用來(lái)處理上游發(fā)送過(guò)來(lái)的watermark瘸味,可以認(rèn)為不做任何處理,下游的watermark只與其上游最近的生成方式相關(guān)够挂。// if we receive a Long.MAX_VALUE watermark we forward it since it is used// to signal the end of input and to not block watermark progress downstreamif(mark.getTimestamp()==Long.MAX_VALUE&¤tWatermark!=Long.MAX_VALUE){currentWatermark=Long.MAX_VALUE;output.emitWatermark(mark);}}@Overridepublicvoidclose()throws Exception{super.close();// emit a final watermarkWatermark newWatermark=userFunction.getCurrentWatermark();if(newWatermark!=null&&newWatermark.getTimestamp()>currentWatermark){currentWatermark=newWatermark.getTimestamp();// emit watermarkoutput.emitWatermark(newWatermark);}}}
Flink如何處理遲到數(shù)據(jù)
這里我們使用 Side Output機(jī)制來(lái)說(shuō)明旁仿。Side Output機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品孽糖,以便用戶獲取并對(duì)其進(jìn)行特殊處理枯冈。
生成新的Watermark
Flink會(huì)替換StreamRecord 對(duì)象中的Timestamp,如果 根據(jù)當(dāng)前事件的Timestamp 生成的Watermark 大于上一次的Watermark办悟,就發(fā)出新的Watermark尘奏。
具體代碼在 TimestampsAndPunctuatedWatermarksOperator.processElement。
@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{finalTvalue=element.getValue();// 調(diào)用 用戶實(shí)現(xiàn)的 extractTimestamp 獲取新的Timestampfinal long newTimestamp=userFunction.extractTimestamp(value,element.hasTimestamp()?element.getTimestamp():Long.MIN_VALUE);// 用新Timestamp 替換StreamRecord中的舊Timestampoutput.collect(element.replace(element.getValue(),newTimestamp));// 調(diào)用 用戶實(shí)現(xiàn)的 checkAndGetNextWatermark 方法獲取下一個(gè)Watermarkfinal Watermark nextWatermark=userFunction.checkAndGetNextWatermark(value,newTimestamp);// 如果下一個(gè)Watermark 大于當(dāng)前Watermark病蛉,就發(fā)出新的Watermarkif(nextWatermark!=null&&nextWatermark.getTimestamp()>currentWatermark){currentWatermark=nextWatermark.getTimestamp();output.emitWatermark(nextWatermark);}}
處理遲到數(shù)據(jù)
首先罪既,判斷是否是遲到數(shù)據(jù)。
@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{for(Wwindow:elementWindows){// drop if the window is already late// 如果窗口已經(jīng)遲到了铡恕,則處理下一條數(shù)據(jù)if(isWindowLate(window)){continue;}}......}/**
Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness of the given window.
*/protectedbooleanisWindowLate(Wwindow){// 當(dāng)前機(jī)制是 事件時(shí)間 && 窗口元素的最大時(shí)間戳 + 允許遲到時(shí)間 <= 當(dāng)前水位線 的時(shí)候?yàn)閠rue(即當(dāng)前窗口元素遲到了)return(windowAssigner.isEventTime()&&(cleanupTime(window)<=internalTimerService.currentWatermark()));}/**
* Returns the cleanup time for a window, which is
* {@code window.maxTimestamp + allowedLateness}. In
* case this leads to a value greater than {@link Long#MAX_VALUE}
* then a cleanup time of {@link Long#MAX_VALUE} is
* returned.
*
* @param window the window whose cleanup time we are computing.
*/privatelongcleanupTime(Wwindow){if(windowAssigner.isEventTime()){long cleanupTime=window.maxTimestamp()+allowedLateness;//返回窗口的 cleanup 時(shí)間 : 窗口元素的最大時(shí)間戳 + 允許延遲的時(shí)間returncleanupTime>=window.maxTimestamp()?cleanupTime:Long.MAX_VALUE;}else{returnwindow.maxTimestamp();}}
其次琢感,處理遲到數(shù)據(jù)的具體代碼在WindowOperator.processElement 方法的最后一段。這里就是旁路輸出探熔。
@OverridepublicvoidprocessElement(StreamRecord element)throws Exception{......// 其他操作......// side output input event if element not handled by any window? late arriving tag has been set// 如果沒(méi)有window處理過(guò)這條數(shù)據(jù)驹针,isSkippedElement = true,如果上面判斷為遲到數(shù)據(jù)诀艰,isSkippedElement = false// windowAssigner is event time and current timestamp + allowed lateness no less than element timestampif(isSkippedElement&&isElementLate(element)){if(lateDataOutputTag!=null){//旁路輸出//這就是我們之前提到的柬甥,F(xiàn)link 的 Side Output 機(jī)制可以將遲到事件單獨(dú)放入一個(gè)數(shù)據(jù)流分支,這會(huì)作為 window 計(jì)算結(jié)果的副產(chǎn)品其垄,以便用戶獲取并對(duì)其進(jìn)行特殊處理苛蒲。sideOutput(element);}else{this.numLateRecordsDropped.inc();}}}/**
* Decide if a record is currently late, based on current watermark and allowed lateness.
* 當(dāng)前機(jī)制是 事件時(shí)間 && (元素時(shí)間戳 + 允許延遲的時(shí)間) <= 當(dāng)前水位線
* @param element The element to check
* @return The element for which should be considered when sideoutputs
*/protectedbooleanisElementLate(StreamRecord element){return(windowAssigner.isEventTime())&&(element.getTimestamp()+allowedLateness<=internalTimerService.currentWatermark());}/**
* Write skipped late arriving element to SideOutput.
* // 把數(shù)據(jù)輸出到旁路,供用戶決定如何處理绿满。
* @param element skipped late arriving element to side output
*/protectedvoidsideOutput(StreamRecord element){output.collect(lateDataOutputTag,element);}