Flink Time-windowed Joins過期數(shù)據(jù)清理機制分析

在flink雙流Time-windowed Joins的主要實現(xiàn)是在TimeBoundedStreamJoin中智嚷,這個類里面的變量非常的多,所以首先要清楚糠悯,這些重要變量或者概念的計算過程帮坚。簡單的說整個join過程就是把左流的數(shù)據(jù)和右流的數(shù)據(jù)都通過state保存起來妻往,左流有新的數(shù)據(jù)到,就會根據(jù)key去遍歷右流state中的數(shù)據(jù)试和,符合關聯(lián)條件就輸出讯泣,關聯(lián)不上的就保存在左流的state中等待右流數(shù)據(jù)的遍歷,反之亦然阅悍。另外會對每個流計算過期時間好渠,以及每個數(shù)據(jù)的清理時間。本文主要根據(jù)代碼的實現(xiàn)過程對清理機制做一個分步的演算节视。

基本公式

建表語句

CREATE TABLE LeftTable (
  l_id STRING, 
  l_imsi STRING, 
  l_time TIMESTAMP(3), 
  WATERMARK FOR l_time AS l_time - INTERVAL '5' SECOND 
) WITH ( 
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',  
  'connector.topic' = 'foo',  
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',  
  'connector.properties.group.id' = 'tg1',   
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'csv',   
  'format.field-delimiter' = ','
  )
CREATE TABLE RightTable (
  r_id STRING, 
  r_location STRING, 
  r_time TIMESTAMP(3), 
  WATERMARK FOR r_time AS r_time - INTERVAL '2' SECOND 
) WITH (  
  'connector.type' = 'kafka',   
  'connector.version' = 'universal',  
  'connector.topic' = 'bar',  
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',
  'connector.properties.group.id' = 'tg1',   
  'connector.startup-mode' = 'latest-offset', 
  'format.type' = 'csv',   
  'format.field-delimiter' = ',' 
)

執(zhí)行SQL

SELECT l_id, l_imsi, r_location 
FROM LeftTable 
LEFT JOIN RightTable 
on l_id = r_id 
and r_time >= l_time - INTERVAL '4' SECOND AND r_time <= l_time + INTERVAL '6' SECOND

upperBound = 6lowerNound = -4

那么

leftRelativeSize = -leftLowerBound = -(-upperBound) = upperBound = 6

rightRelativeSize = leftUpperBound = -lowerBound = 4

leftExpirationTime = wm - upperBound - 0.001 
                   = wm - 6 - 0.001 
                   = wm - 6.001

rightExpirationTime = wm + lowerBound - 0.001 
                    = wm - 4 - 0.001 
                    = wm - 4.001
  
leftRowCleanUpTime = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 0.001 
                   = rowTime + 6 + (6+4)/2 + 0.001 
                   = rowTime + 11.001
                   
rightRowCleanUpTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 0.001 
                    = rowTime + 4 + (6+4)/2 + 0.001 
                    = rowTime + 9.001

rightOperatorTime = leftOperatorTime = wm = min(leftWatermark, rightWatermark)
  
allowedLateness = 0 //忽略不計

根據(jù)上面的公式進行模擬計算

順序 來源 數(shù)據(jù) wm ExpirationTime RowCleanUpTime 結果
1 Left 1,111,2020-01-01 10:10:16 0 left=0, right=-4001 10:10:27.001
2 Right 2,B,2020-01-01 10:10:20 10:10:11 left=-6001, right=-4001 10:10:29.001
3 Left 2,222,2020-01-01 10:10:22 10:10:17 left=-6001, right=10:10:06.999 10:10:33.001 join輸出2,222,B
4 Left 4,4444,2020-01-01 10:10:35 10:10:18 left=-6001, right=10:10:12.999 10:10:46.001
5 Right 4,D,2020-01-01 10:10:29 10:10:27 left=10:10:11.999, right=10:10:12.999 10:10:38.001
6-1 Right 5,E,2020-01-01 10:10:30 10:10:28 left=10:10:20.999, right=10:10:12.999 10:10:39.001
wm超過第一條數(shù)據(jù)的CleanUpTime拳锚,觸發(fā)定時器 10:10:28 left=10:10:21.999, right=10:10:12.999 刪除第一條數(shù)據(jù),因為是left join所以輸出1,111,
6-2 Right 1,A,2020-01-01 10:10:17 10:10:27 left=10:10:20.999, right=10:10:12.999 可以和第一條數(shù)據(jù)join上肴茄,所以輸出1,111,A但是第一條數(shù)據(jù)的時間戳已經小于leftExpirationTime說明已經過期晌畅,同時在緩存中刪除第一條數(shù)據(jù),但是這時wm并沒有超過第一條數(shù)據(jù)的CleanUpTime寡痰,不會觸發(fā)清理的定時器
6-3 Right 1,A,2020-01-01 10:10:30 10:10:28 left=10:10:20.999, right=10:10:12.999 不能和第一條數(shù)據(jù)join上抗楔,所以輸出1,111,但是第一條數(shù)據(jù)的時間戳已經小于leftExpirationTime說明已經過期,同時在緩存中刪除第一條數(shù)據(jù)
wm超過第一條數(shù)據(jù)的CleanUpTime拦坠,觸發(fā)定時器 10:10:28 left=10:10:21.999, right=10:10:12.999 第一條數(shù)據(jù)已經刪除连躏,定時器不需要做其他操作

總結

每次計算ExpirationTime的時候用的是上一次的wm;
6-1贞滨,6-2入热,6-3為三種獨立可能觸發(fā)刪除過期數(shù)據(jù)的場景;
通過上面的測試可以發(fā)現(xiàn)晓铆,數(shù)據(jù)即便過期了勺良,但是沒有到清理時間,如果這時候有符合關聯(lián)條件的數(shù)據(jù)還是可以關聯(lián)上的骄噪,例如6-2場景尚困。

以上數(shù)據(jù)基于flink 1.10.0版本blink planner進行測試。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末链蕊,一起剝皮案震驚了整個濱河市事甜,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌滔韵,老刑警劉巖逻谦,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異陪蜻,居然都是意外死亡邦马,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來勇婴,“玉大人忱嘹,你說我怎么就攤上這事「剩” “怎么了拘悦?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長橱脸。 經常有香客問我础米,道長,這世上最難降的妖魔是什么添诉? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任屁桑,我火速辦了婚禮,結果婚禮上栏赴,老公的妹妹穿的比我還像新娘蘑斧。我一直安慰自己,他們只是感情好须眷,可當我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布竖瘾。 她就那樣靜靜地躺著,像睡著了一般花颗。 火紅的嫁衣襯著肌膚如雪捕传。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天扩劝,我揣著相機與錄音庸论,去河邊找鬼。 笑死棒呛,一個胖子當著我的面吹牛聂示,可吹牛的內容都是我干的。 我是一名探鬼主播簇秒,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼鱼喉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了宰睡?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤气筋,失蹤者是張志新(化名)和其女友劉穎拆内,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宠默,經...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡麸恍,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片抹沪。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡刻肄,死狀恐怖,靈堂內的尸體忽然破棺而出融欧,到底是詐尸還是另有隱情敏弃,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布噪馏,位于F島的核電站麦到,受9級特大地震影響,放射性物質發(fā)生泄漏欠肾。R本人自食惡果不足惜瓶颠,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望刺桃。 院中可真熱鬧粹淋,春花似錦、人聲如沸瑟慈。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽封豪。三九已至谴轮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吹埠,已是汗流浹背第步。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留缘琅,地道東北人粘都。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像刷袍,于是被迫代替她去往敵國和親翩隧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內容