Flink對流流JOIN的支持
Flink對于join的支持有多種支持谭跨,可參考 Flink Join類型, 本文主要討論Time interval join支持Table API的雙流join让簿,同時支持基于EventTime 和 ProcessingTime的的流流join。 Flink在TableApi中將流作為表使用,下文也不再區(qū)分流和表笑窜。
Flink對于interval join的支持從1.4版本開始,直到Flink1.6登疗,經(jīng)過幾個版本的增強排截,形成了從開始的Table/Sql Api的支持,到后續(xù)DataStream Api的支持辐益,從開始的inner join 到后來的left outer断傲,right outer, full outerjoin的支持,算是完成了FLink對雙流關(guān)聯(lián)的支持智政,不同版本的功能支持如下:
Flink版本 | Join支持類型 | join API |
---|---|---|
1.4 | inner | Table/SQL |
1.5 | inner,left,right,full | Table/SQL |
1.6 | inner,left,right,full | Table/SQL/DataStream |
從官方給出的Release Note來看认罩,F(xiàn)link1.4,F(xiàn)link1.5中的雙流join是指windowed join续捂,但從官方給出的樣例以及源碼來看猜年,此處的Windowed Join 應(yīng)當指的就是interval join;鑒于Flink版本近期的變更較大,筆者不再在原有老版本中測試相關(guān)功能疾忍,下文的介紹基于Flink最新release版本1.8
關(guān)于Interval Join
在流與流的join中乔外,與其他window join相比,window中的關(guān)聯(lián)通常是兩個流中對應(yīng)的window中的消息可以發(fā)生關(guān)聯(lián)一罩, 不能跨window杨幼,Interval Join則沒有window的概念,直接用時間戳作為關(guān)聯(lián)的條件聂渊,更具表達力差购。由于流消息的無限性以及消息亂序的影響,本應(yīng)關(guān)聯(lián)上的消息可能進入處理系統(tǒng)的時間有較大差異汉嗽,一條流中的消息欲逃,可能需要和另一條流的多條消息關(guān)聯(lián),因此流流關(guān)聯(lián)時饼暑,通常需要類似如下關(guān)聯(lián)條件:
1. 等值條件如 a.id = b.id
2. 時間戳范圍條件 : a.timestamp ∈ [b.timestamp + lowerBound; b.timestamp + upperBound] b.timestamp + lowerBound <= a.timestamp and a.timestamp <= b.timestamp + upperBound
其中l(wèi)ower bound,upperBound可設(shè)置為正值稳析,負值洗做,0
-
關(guān)聯(lián)條件的含義
如a.id = b.id and b.timestamp - 1 minutes <= a.timestamp and a.timestamp <= b.timestamp + 2 minutes 即表明兩條流中的兩條消息如果滿足a.id = b.id 并且兩條消息的時間戳滿足a.timestamp在[b.timestamp-1minute, b.timestamp + 2 minutes] 之間,則兩條消息應(yīng)當發(fā)生關(guān)聯(lián)
Interval Join的實現(xiàn)
Interval join的實現(xiàn)基本邏輯比較簡單彰居,主要依靠TimeBoundedStreamJoin完成消息的關(guān)聯(lián)诚纸,其核心邏輯主要包含消息的緩存,不同關(guān)聯(lián)類型的處理陈惰,消息的清理畦徘,但實現(xiàn)起來并不簡單,下面基于eventTime分別對以上進行分析:
由于Flink對于流關(guān)聯(lián)的處理邏輯是對于兩條流的消息分別處理抬闯,但兩條流的處理方式是完全一致的井辆,一下基于第一條流(左流)進行分析
假定左流中的消息l如a,b,2019-07-22 00:00:00
,左流的可容忍亂序時間OutOfOrder時間設(shè)置為10s,其中第三個字段為時間戳字段
-
更新當前的leftOperatorTime和rightOperatorTime值溶握,更新其值為當前應(yīng)用的CombineWatermark掘剪,當前應(yīng)用watermark的獲取方式如下:
- 兩條流會分別基于接收到的消息計算每條流獨立的watermark1,watermark2
- 選取較小的watermark作為應(yīng)用的CombineWatermark = min(watermark1,watermark2)
-
找出消息時間戳,并結(jié)算右表中的能關(guān)聯(lián)的時間戳范圍
val leftRow = cRowValue.row val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow) val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize // 2019-07-21 23:58:00.999 val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize // 2019-07-22 00:01:00.000 表名右流中的消息奈虾,如果id滿足需求,當其時間戳在[rightQualifiedLowerBound,rightQualifiedUpperBound]范圍內(nèi)時廉赔,將可以與左表發(fā)生關(guān)聯(lián)
-
將消息l與右表中的消息關(guān)聯(lián)肉微,并緩存l:
當rightExpirationTime < rightQualifiedUpperBound時,將右表中的數(shù)據(jù)取出蜡塌,判斷是否可以與消息l發(fā)生關(guān)聯(lián):
-
首先計算新的rightExpirationTime:
rightExpirationTime = leftOperatorTime - rightRelativeSize - allowedLateness - 1
其中rightExpirationTime表名由流中的有效消息碉纳,當右流中的消息m的時間戳小于rightExpirationTime時,表示不會再有左流中的消息可以與m發(fā)生關(guān)聯(lián)馏艾,及m消息可以被清理
-
遍歷rightCache劳曹,完成關(guān)聯(lián)
其中rightCache的數(shù)據(jù)結(jié)構(gòu)為MapState[Long, JList[JTuple2[Row, Boolean]]] key為時間戳,value為對應(yīng)時間戳的所有消息組成的List,其中List的元素為消息值和該消息是否被關(guān)聯(lián)過的標記組成的tuple:
遍歷rightCache琅摩,完成如下操作:
if( 當其key值也就是時間戳rightTime滿足時間的條件時) { 遍歷其對應(yīng)的List,將消息值與l完成關(guān)聯(lián)并輸出铁孵,將其關(guān)聯(lián)標記設(shè)置為true } // 清理右表中已經(jīng)不可能和左表中數(shù)據(jù)發(fā)生關(guān)聯(lián)的消息 if (當rightTime <= rightExpirationTime){ if (如果是right outer join或者full outer join) { 遍歷JList,如果消息的關(guān)聯(lián)標記為false,根據(jù)關(guān)聯(lián)條件補齊空字段,并輸出 } 移除List }
-
緩存消息l
if (rightOperatorTime < rightQualifiedUpperBound){ //表明消息l可能與后續(xù)右表中的消息發(fā)生關(guān)聯(lián)房资,需要緩存消息l 1. 在leftCache中緩存消息l 2. 注冊清理器TimerHeapInternalTimer(timeForLeftRow,...)蜕劝,異步完成消息的清理,(清理器的觸發(fā)由更新combinedwatermark時轰异,當combinedwatermark>TimerHeapInternalTimer.timestamp將會觸發(fā)清理器工作岖沛,和核心工作邏輯在TimeBoundedStreamJoin#onTimer方法中) }else{ // 即該消息不需要緩存用于與右側(cè)表的關(guān)聯(lián) if (left outer join 或者 full outer join ) && if (消息未被關(guān)聯(lián)過){ 根據(jù)關(guān)聯(lián)條件補齊空字段并輸出 } }
-
Interval join 總結(jié)
- Flink的流關(guān)聯(lián)當前只能支持兩條流的關(guān)聯(lián)
- Flink同時支持基于EventTime和ProcessingTime的流流join
- Interval join 已經(jīng)支持inner ,left outer, right outer , full outer 等類型的join,由此來看官網(wǎng)對interval join類型支持的說明不夠準確搭独。
- 當前版本Interval join的兩條流的消息清理是基于兩條流共有的combinedWatermark(較小的流的watermark)
- 流的watermark不會用于將消息
直接
過濾掉婴削,即時消息在本流中的watermark表示中已經(jīng)遲到,但會直接將遲到的消息根據(jù)相應(yīng)的join類型或輸出或丟棄