注:本文轉(zhuǎn)自我的個人博客龄章。
Watermark是流式處理中的一個基礎(chǔ)概念乞封,關(guān)于Watermark的概念有很多做裙,在這里不做闡述。
Watermark特性
這不是官方的表述肃晚,我根據(jù)自己的認(rèn)知覺得Watermark有以下幾個特點:
- 基于已有數(shù)據(jù)生成Watermark锚贱,Apache Flink生成Watermark有Punctuated和Periodic兩種方式,但大多數(shù)人的使用方法都是基于已有數(shù)據(jù)生成Watermark关串。這個會給我們帶來一定的問題,這個在后面講到吧碾。
- 只允許遞增。這個特性讓我們需要對可能產(chǎn)生"臟"Watermark的情況進行判斷倦春,比如需不需要對出現(xiàn)的未來時間做處理等落剪。
- StreamInputProcessor從多個channel中收到的多個Watermark取最小值傳遞給下游算子。這也可能成為在數(shù)據(jù)傾斜時忠怖,導(dǎo)致程序OOM掛掉的原因之一呢堰。
isWatermarkAligned
在Watermark相關(guān)源碼中經(jīng)衬杂郑可以看到Watermark對齊的判斷,如果各個子任務(wù)中的Watermark都對齊的話往衷,那是最理想的情況严卖。然而實際生產(chǎn)中席舍,總會很多意外哮笆。這里有兩個極端的例子。
一條數(shù)據(jù)流長時間沒有數(shù)據(jù)福铅,其他流數(shù)據(jù)正常项阴。
出現(xiàn)這種情況的原因有很多滑黔,比如數(shù)據(jù)本身是波峰狀环揽,數(shù)據(jù)被上游算子過濾掉等。剛剛提到汛兜,下游算子的Watermark取決于多個上游算子Watermark的最小值,那么如果一條數(shù)據(jù)流長時間沒有數(shù)據(jù)粥谬,豈不是會造成Watermark一直停留在原地不動的情況衡创?
當(dāng)然不會,針對這種情況璃氢,F(xiàn)link在數(shù)據(jù)流中實現(xiàn)了一個Idle的概念。 用戶首先需要設(shè)置timeout(IdleTimeout)巢寡,這個timeout表示數(shù)據(jù)源在多久沒有收到數(shù)據(jù)后椰苟,數(shù)據(jù)流要判斷為Idle抑月,下游Window算子在接收到Idle狀態(tài)后舆蝴,將不再使用這個Channel之前留下的Watermark题诵,而用其他Active Channel發(fā)送的Watermark數(shù)據(jù)來計算层皱。
如上圖所示,Source(1)在接收到數(shù)據(jù)后叫胖,會觸發(fā)生成一個timeout之后調(diào)用的callback,如果在timeout時間長度中怎棱,沒有再接收新的數(shù)據(jù)绷跑,便會向下游發(fā)送Idle狀態(tài),將自己標(biāo)識為Idle砸捏。
數(shù)據(jù)傾斜。
假設(shè)單條數(shù)據(jù)流傾斜带膜,那么該數(shù)據(jù)流中處理的數(shù)據(jù)所帶的時間戳,是遠(yuǎn)低于其他數(shù)據(jù)流中事件的時間戳式廷。
如圖所示芭挽,假設(shè)Watermark設(shè)置為收到的時間戳-1,那么Window的Watermark始終都保持在0袜爪,這會導(dǎo)致Window存儲大量的窗口,并且窗口狀態(tài)無法釋放辛馆,極有可能出現(xiàn)OOM。這個問題目前沒有好的解決辦法腊状,需要具體情況具體分析了苔可。
State Of WatermarkOperator
stream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[PatternWrapper] {
override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis() - 15000)
override def extractTimestamp(element: PatternWrapper, previousElementTimestamp: Long): Long = {
System.currentTimeMillis()
}
})
我們都知道,用戶可以通過顯式調(diào)用這種方法來實現(xiàn)Watermark焚辅,那么對于這種方法苟鸯,本質(zhì)上是生成了一個WatermarkOperator棚点,并且在Operator中計算Watermark傳遞給下游的算子。
此時問題來了乙濒,剛剛提到Watermark是由已有數(shù)據(jù)產(chǎn)生的卵蛉,那么在程序剛恢復(fù)還沒有發(fā)送數(shù)據(jù)的時候怎么辦?當(dāng)然不能選擇當(dāng)前時間甘有,這樣的話,稍晚一點的事件就被過濾掉了亏掀。
這就引出一個問題泛释,Watermark是不會存儲在Checkpoint中的,也就是說WatermarkOperator是一個無狀態(tài)算子怜校,那么程序啟動時,F(xiàn)link自身的邏輯是初始化Watermark為Long.MIN_VALUE茄茁。由于我們架構(gòu)設(shè)計的原因,對臟數(shù)據(jù)非常敏感付燥,不允許發(fā)送過于久的歷史數(shù)據(jù)愈犹,于是我們將WatermarkOperator的算子改成了有狀態(tài)的算子,其中為了兼容并行度scale的情況漩怎,我們將Watermark設(shè)置為所有數(shù)據(jù)流中Watermark的最小值。具體的JIRA可以看FLINK-5601中提出的一些方案和代碼改動扬卷。