Flink 流流關(guān)聯(lián)( Interval Join)總結(jié)

Flink對流流JOIN的支持

Flink對于join的支持有多種支持谭跨,可參考 Flink Join類型, 本文主要討論Time interval join支持Table API的雙流join让簿,同時支持基于EventTime 和 ProcessingTime的的流流join。 Flink在TableApi中將流作為表使用,下文也不再區(qū)分流和表笑窜。

Flink對于interval join的支持從1.4版本開始,直到Flink1.6登疗,經(jīng)過幾個版本的增強排截,形成了從開始的Table/Sql Api的支持,到后續(xù)DataStream Api的支持辐益,從開始的inner join 到后來的left outer断傲,right outer, full outerjoin的支持,算是完成了FLink對雙流關(guān)聯(lián)的支持智政,不同版本的功能支持如下:

Flink版本 Join支持類型 join API
1.4 inner Table/SQL
1.5 inner,left,right,full Table/SQL
1.6 inner,left,right,full Table/SQL/DataStream

從官方給出的Release Note來看认罩,F(xiàn)link1.4,F(xiàn)link1.5中的雙流join是指windowed join续捂,但從官方給出的樣例以及源碼來看猜年,此處的Windowed Join 應(yīng)當指的就是interval join;鑒于Flink版本近期的變更較大,筆者不再在原有老版本中測試相關(guān)功能疾忍,下文的介紹基于Flink最新release版本1.8

關(guān)于Interval Join

在流與流的join中乔外,與其他window join相比,window中的關(guān)聯(lián)通常是兩個流中對應(yīng)的window中的消息可以發(fā)生關(guān)聯(lián)一罩, 不能跨window杨幼,Interval Join則沒有window的概念,直接用時間戳作為關(guān)聯(lián)的條件聂渊,更具表達力差购。由于流消息的無限性以及消息亂序的影響,本應(yīng)關(guān)聯(lián)上的消息可能進入處理系統(tǒng)的時間有較大差異汉嗽,一條流中的消息欲逃,可能需要和另一條流的多條消息關(guān)聯(lián),因此流流關(guān)聯(lián)時饼暑,通常需要類似如下關(guān)聯(lián)條件:

1. 等值條件如 a.id = b.id
2. 時間戳范圍條件 : a.timestamp ∈ [b.timestamp + lowerBound; b.timestamp + upperBound]  b.timestamp + lowerBound <= a.timestamp and a.timestamp <= b.timestamp + upperBound

其中l(wèi)ower bound,upperBound可設(shè)置為正值稳析,負值洗做,0

  • 關(guān)聯(lián)條件的含義

    如a.id = b.id and b.timestamp - 1 minutes <= a.timestamp and a.timestamp <= b.timestamp + 2 minutes 即表明兩條流中的兩條消息如果滿足a.id = b.id 并且兩條消息的時間戳滿足a.timestamp在[b.timestamp-1minute, b.timestamp + 2 minutes] 之間,則兩條消息應(yīng)當發(fā)生關(guān)聯(lián)

Interval Join的實現(xiàn)

Interval join的實現(xiàn)基本邏輯比較簡單彰居,主要依靠TimeBoundedStreamJoin完成消息的關(guān)聯(lián)诚纸,其核心邏輯主要包含消息的緩存,不同關(guān)聯(lián)類型的處理陈惰,消息的清理畦徘,但實現(xiàn)起來并不簡單,下面基于eventTime分別對以上進行分析:

由于Flink對于流關(guān)聯(lián)的處理邏輯是對于兩條流的消息分別處理抬闯,但兩條流的處理方式是完全一致的井辆,一下基于第一條流(左流)進行分析

假定左流中的消息l如a,b,2019-07-22 00:00:00,左流的可容忍亂序時間OutOfOrder時間設(shè)置為10s,其中第三個字段為時間戳字段

  1. 更新當前的leftOperatorTime和rightOperatorTime值溶握,更新其值為當前應(yīng)用的CombineWatermark掘剪,當前應(yīng)用watermark的獲取方式如下:

    1. 兩條流會分別基于接收到的消息計算每條流獨立的watermark1,watermark2
    2. 選取較小的watermark作為應(yīng)用的CombineWatermark = min(watermark1,watermark2)
  2. 找出消息時間戳,并結(jié)算右表中的能關(guān)聯(lián)的時間戳范圍

    val leftRow = cRowValue.row
    val timeForLeftRow: Long = getTimeForLeftStream(ctx, leftRow)
    val rightQualifiedLowerBound: Long = timeForLeftRow - rightRelativeSize // 2019-07-21 23:58:00.999
    val rightQualifiedUpperBound: Long = timeForLeftRow + leftRelativeSize // 2019-07-22 00:01:00.000
    表名右流中的消息奈虾,如果id滿足需求,當其時間戳在[rightQualifiedLowerBound,rightQualifiedUpperBound]范圍內(nèi)時廉赔,將可以與左表發(fā)生關(guān)聯(lián)
    
  3. 將消息l與右表中的消息關(guān)聯(lián)肉微,并緩存l:

    當rightExpirationTime < rightQualifiedUpperBound時,將右表中的數(shù)據(jù)取出蜡塌,判斷是否可以與消息l發(fā)生關(guān)聯(lián):

    1. 首先計算新的rightExpirationTime:

      rightExpirationTime = leftOperatorTime - rightRelativeSize - allowedLateness - 1

      其中rightExpirationTime表名由流中的有效消息碉纳,當右流中的消息m的時間戳小于rightExpirationTime時,表示不會再有左流中的消息可以與m發(fā)生關(guān)聯(lián)馏艾,及m消息可以被清理

    2. 遍歷rightCache劳曹,完成關(guān)聯(lián)

       其中rightCache的數(shù)據(jù)結(jié)構(gòu)為MapState[Long, JList[JTuple2[Row, Boolean]]]
       key為時間戳,value為對應(yīng)時間戳的所有消息組成的List,其中List的元素為消息值和該消息是否被關(guān)聯(lián)過的標記組成的tuple:
      

      遍歷rightCache琅摩,完成如下操作:

       if( 當其key值也就是時間戳rightTime滿足時間的條件時) {
          遍歷其對應(yīng)的List,將消息值與l完成關(guān)聯(lián)并輸出铁孵,將其關(guān)聯(lián)標記設(shè)置為true 
       }
           
       // 清理右表中已經(jīng)不可能和左表中數(shù)據(jù)發(fā)生關(guān)聯(lián)的消息
       if (當rightTime <= rightExpirationTime){
           if (如果是right outer join或者full outer join) {
               遍歷JList,如果消息的關(guān)聯(lián)標記為false,根據(jù)關(guān)聯(lián)條件補齊空字段,并輸出
           }
           移除List
       }
      
    3. 緩存消息l

       if (rightOperatorTime < rightQualifiedUpperBound){
           //表明消息l可能與后續(xù)右表中的消息發(fā)生關(guān)聯(lián)房资,需要緩存消息l
           1. 在leftCache中緩存消息l
           2. 注冊清理器TimerHeapInternalTimer(timeForLeftRow,...)蜕劝,異步完成消息的清理,(清理器的觸發(fā)由更新combinedwatermark時轰异,當combinedwatermark>TimerHeapInternalTimer.timestamp將會觸發(fā)清理器工作岖沛,和核心工作邏輯在TimeBoundedStreamJoin#onTimer方法中)
       }else{
           // 即該消息不需要緩存用于與右側(cè)表的關(guān)聯(lián)
           if (left outer join 或者 full outer join ) && if (消息未被關(guān)聯(lián)過){
               根據(jù)關(guān)聯(lián)條件補齊空字段并輸出
           }
       }
      

Interval join 總結(jié)

  • Flink的流關(guān)聯(lián)當前只能支持兩條流的關(guān)聯(lián)
  • Flink同時支持基于EventTime和ProcessingTime的流流join
  • Interval join 已經(jīng)支持inner ,left outer, right outer , full outer 等類型的join,由此來看官網(wǎng)對interval join類型支持的說明不夠準確搭独。
  • 當前版本Interval join的兩條流的消息清理是基于兩條流共有的combinedWatermark(較小的流的watermark)
  • 流的watermark不會用于將消息直接過濾掉婴削,即時消息在本流中的watermark表示中已經(jīng)遲到,但會直接將遲到的消息根據(jù)相應(yīng)的join類型或輸出或丟棄
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牙肝,一起剝皮案震驚了整個濱河市唉俗,隨后出現(xiàn)的幾起案子嗤朴,更是在濱河造成了極大的恐慌,老刑警劉巖互躬,帶你破解...
    沈念sama閱讀 218,941評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件播赁,死亡現(xiàn)場離奇詭異,居然都是意外死亡吼渡,警方通過查閱死者的電腦和手機容为,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,397評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來寺酪,“玉大人坎背,你說我怎么就攤上這事〖娜福” “怎么了得滤?”我有些...
    開封第一講書人閱讀 165,345評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長盒犹。 經(jīng)常有香客問我懂更,道長,這世上最難降的妖魔是什么急膀? 我笑而不...
    開封第一講書人閱讀 58,851評論 1 295
  • 正文 為了忘掉前任沮协,我火速辦了婚禮,結(jié)果婚禮上卓嫂,老公的妹妹穿的比我還像新娘慷暂。我一直安慰自己,他們只是感情好晨雳,可當我...
    茶點故事閱讀 67,868評論 6 392
  • 文/花漫 我一把揭開白布行瑞。 她就那樣靜靜地躺著,像睡著了一般餐禁。 火紅的嫁衣襯著肌膚如雪血久。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,688評論 1 305
  • 那天帮非,我揣著相機與錄音洋魂,去河邊找鬼。 笑死喜鼓,一個胖子當著我的面吹牛副砍,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播庄岖,決...
    沈念sama閱讀 40,414評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼豁翎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了隅忿?” 一聲冷哼從身側(cè)響起心剥,我...
    開封第一講書人閱讀 39,319評論 0 276
  • 序言:老撾萬榮一對情侶失蹤邦尊,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后优烧,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蝉揍,經(jīng)...
    沈念sama閱讀 45,775評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年畦娄,在試婚紗的時候發(fā)現(xiàn)自己被綠了又沾。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,096評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡熙卡,死狀恐怖杖刷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情驳癌,我是刑警寧澤滑燃,帶...
    沈念sama閱讀 35,789評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站颓鲜,受9級特大地震影響表窘,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜甜滨,卻給世界環(huán)境...
    茶點故事閱讀 41,437評論 3 331
  • 文/蒙蒙 一乐严、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧艳吠,春花似錦、人聲如沸孽椰。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,993評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽黍匾。三九已至栏渺,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間锐涯,已是汗流浹背磕诊。 一陣腳步聲響...
    開封第一講書人閱讀 33,107評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纹腌,地道東北人霎终。 一個月前我還...
    沈念sama閱讀 48,308評論 3 372
  • 正文 我出身青樓,卻偏偏與公主長得像升薯,于是被迫代替她去往敵國和親莱褒。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,037評論 2 355

推薦閱讀更多精彩內(nèi)容

  • pyspark.sql模塊 模塊上下文 Spark SQL和DataFrames的重要類: pyspark.sql...
    mpro閱讀 9,457評論 0 13
  • 在流處理系統(tǒng)中涎劈,通常使用基于ProcessTime 广凸,EventTime阅茶,Ingestion Time的消息處理模...
    WestC閱讀 2,140評論 0 4
  • 二胎脸哀,不是由來已久,也非意料之外扭吁。 記得有一次撞蜂,下班路上,斜坐在紅色雙龍的后排座上智末,左右兩排的綠蔭從車窗飛馳而過谅摄,...
    射手座彼岸心閱讀 447評論 2 4
  • 或許在我們心中送漠,都對那些優(yōu)秀的人心生向往。年少時懷著一份羞澀由蘑,不敢走近闽寡,遠遠的觀望就已足夠;慢慢長大后尼酿,各自一方爷狈,...
    心里有很多事閱讀 381評論 0 0
  • 高考早已離我甚遠,結(jié)果雖讓我滿意裳擎,但與心儀的大學(xué)失之交臂仍是遺憾涎永。不愿重新開始的我,決定在另一個大學(xué)開始我的征程鹿响。...
    禰錦閱讀 171評論 2 1