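In Flink, dual-stream Time-windowed Joins are implemented mainly in TimeBoundedStreamJoin. This class has many member variables, so the first step is to understand how its important variables and concepts are computed. Put simply, the join keeps the rows of both the left and the right stream in state. When a new row arrives on the left stream, it iterates over the right stream's state for the same key and emits every pair that satisfies the join condition. The left row is also kept in the left stream's state to wait for right rows that arrive later, and vice versa. In addition, an expiration time is computed for each stream, and a cleanup time for each row. This article follows the code and walks through the cleanup mechanism step by step.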
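To make the buffer-and-probe idea concrete, here is a toy, single-key Python model. The class `IntervalJoinBuffers` and its methods are hypothetical names, not Flink API; the sketch keeps every row forever and leaves out expiration and cleanup, which the rest of the article covers.

```python
class IntervalJoinBuffers:
    """Toy, single-key model of buffering both inputs of an interval join.

    Not Flink code: rows are never evicted, and all times are plain numbers.
    Join condition: l_time + lower_bound <= r_time <= l_time + upper_bound.
    """

    def __init__(self, lower_bound, upper_bound):
        self.lower_bound = lower_bound
        self.upper_bound = upper_bound
        self.left_rows = []   # (time, value) pairs buffered from the left input
        self.right_rows = []  # (time, value) pairs buffered from the right input

    def matches(self, l_time, r_time):
        return l_time + self.lower_bound <= r_time <= l_time + self.upper_bound

    def on_left(self, time, value):
        # Probe the right buffer, then keep the left row for later right rows.
        joined = [(value, rv) for rt, rv in self.right_rows if self.matches(time, rt)]
        self.left_rows.append((time, value))
        return joined

    def on_right(self, time, value):
        # Symmetric: probe the left buffer, then keep the right row.
        joined = [(lv, value) for lt, lv in self.left_rows if self.matches(lt, time)]
        self.right_rows.append((time, value))
        return joined


# Steps 2 and 3 of the simulation table (key 2), in seconds after 10:10:00.
join = IntervalJoinBuffers(lower_bound=-4, upper_bound=6)
print(join.on_right(20, "B"))   # []
print(join.on_left(22, "222"))  # [('222', 'B')]
```

Because both sides are buffered, the arrival order of matching rows does not matter; the price is state that grows until something evicts it, which is exactly what the expiration and cleanup times are for.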
Basic formulas
Table DDL
```sql
CREATE TABLE LeftTable (
  l_id STRING,
  l_imsi STRING,
  l_time TIMESTAMP(3),
  WATERMARK FOR l_time AS l_time - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'foo',
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',
  'connector.properties.group.id' = 'tg1',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'csv',
  'format.field-delimiter' = ','
);

CREATE TABLE RightTable (
  r_id STRING,
  r_location STRING,
  r_time TIMESTAMP(3),
  WATERMARK FOR r_time AS r_time - INTERVAL '2' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'bar',
  'connector.properties.zookeeper.connect' = 'hostA:2181',
  'connector.properties.bootstrap.servers' = 'hostA:6667',
  'connector.properties.group.id' = 'tg1',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'csv',
  'format.field-delimiter' = ','
);
```
Query
```sql
SELECT l_id, l_imsi, r_location
FROM LeftTable
LEFT JOIN RightTable
  ON l_id = r_id
  AND r_time >= l_time - INTERVAL '4' SECOND
  AND r_time <= l_time + INTERVAL '6' SECOND;
```
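That is, the window is l_time + lowerBound <= r_time <= l_time + upperBound, with upperBound = 6 and lowerBound = -4 (seconds). The derivation below also uses the same window written around the right row's time, r_time + leftLowerBound <= l_time <= r_time + leftUpperBound, so leftLowerBound = -upperBound = -6 and leftUpperBound = -lowerBound = 4.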
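The rewrite from the query's bounds to leftLowerBound and leftUpperBound can be checked mechanically. A small Python sketch (the function names are hypothetical) that compares both forms of the condition over a grid of whole-second timestamps:

```python
LOWER_BOUND, UPPER_BOUND = -4, 6          # from the query, in seconds
LEFT_LOWER_BOUND = -UPPER_BOUND           # -6
LEFT_UPPER_BOUND = -LOWER_BOUND           #  4


def query_condition(l_time, r_time):
    """r_time >= l_time - 4 AND r_time <= l_time + 6, as written in the SQL."""
    return l_time + LOWER_BOUND <= r_time <= l_time + UPPER_BOUND


def left_relative_condition(l_time, r_time):
    """The same window expressed around the right row's time."""
    return r_time + LEFT_LOWER_BOUND <= l_time <= r_time + LEFT_UPPER_BOUND


# Exhaustive check over a small grid of whole-second timestamps.
agree = all(
    query_condition(l, r) == left_relative_condition(l, r)
    for l in range(0, 60)
    for r in range(0, 60)
)
print(agree)  # True
```

Both inequalities just move l_time and r_time to the other side, so the two forms always agree; the grid check is only a sanity test of the signs.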
Then (all times in seconds; 0.001 s is the 1 ms that the code adds or subtracts):
```
leftRelativeSize    = -leftLowerBound = -(-upperBound) = upperBound = 6
rightRelativeSize   = leftUpperBound = -lowerBound = 4
minCleanUpInterval  = (leftRelativeSize + rightRelativeSize) / 2 = (6 + 4) / 2 = 5

leftExpirationTime  = wm - upperBound - 0.001
                    = wm - 6 - 0.001
                    = wm - 6.001
rightExpirationTime = wm + lowerBound - 0.001
                    = wm - 4 - 0.001
                    = wm - 4.001

leftRowCleanUpTime  = rowTime + leftRelativeSize + minCleanUpInterval + allowedLateness + 0.001
                    = rowTime + 6 + (6 + 4) / 2 + 0.001
                    = rowTime + 11.001
rightRowCleanUpTime = rowTime + rightRelativeSize + minCleanUpInterval + allowedLateness + 0.001
                    = rowTime + 4 + (6 + 4) / 2 + 0.001
                    = rowTime + 9.001

rightOperatorTime = leftOperatorTime = wm = min(leftWatermark, rightWatermark)   (taken as 0 before the first watermark)
allowedLateness   = 0   (ignored here)
```
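The same formulas in milliseconds, as a Python sketch. The function names are Python renderings of the variables above, not Flink's API; the clamp of the operator time to 0 is inferred from the wm = 0 in step 1 of the table, and `// 2` assumes integer arithmetic on millisecond values.

```python
# All times in milliseconds; bounds taken from the query above.
LEFT_LOWER_BOUND = -6_000   # -upperBound
LEFT_UPPER_BOUND = 4_000    # -lowerBound
ALLOWED_LATENESS = 0

LEFT_RELATIVE_SIZE = -LEFT_LOWER_BOUND                                    # 6000
RIGHT_RELATIVE_SIZE = LEFT_UPPER_BOUND                                    # 4000
MIN_CLEAN_UP_INTERVAL = (LEFT_RELATIVE_SIZE + RIGHT_RELATIVE_SIZE) // 2   # 5000


def operator_time(watermark):
    # Assumption: 0 until a positive watermark has arrived (wm = 0 in step 1).
    return watermark if watermark > 0 else 0


def left_expiration_time(wm):
    return operator_time(wm) - LEFT_RELATIVE_SIZE - ALLOWED_LATENESS - 1


def right_expiration_time(wm):
    return operator_time(wm) - RIGHT_RELATIVE_SIZE - ALLOWED_LATENESS - 1


def left_row_clean_up_time(row_time):
    return row_time + LEFT_RELATIVE_SIZE + MIN_CLEAN_UP_INTERVAL + ALLOWED_LATENESS + 1


def right_row_clean_up_time(row_time):
    return row_time + RIGHT_RELATIVE_SIZE + MIN_CLEAN_UP_INTERVAL + ALLOWED_LATENESS + 1


print(right_expiration_time(0))        # -4001
print(left_row_clean_up_time(16_000))  # 27001, i.e. 16 s + 11.001 s
```

With rowTime = 10:10:16 (16000 ms after 10:10:00), the left cleanup time comes out as 10:10:27.001, the value in step 1 of the table.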
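Simulating the computation with the formulas above (the wm column is the combined watermark after the record has been processed; bare numbers in the ExpirationTime column are milliseconds, e.g. -4001 = 0 - 4000 - 1):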
Step | Source | Record | wm | ExpirationTime | RowCleanUpTime | Result |
---|---|---|---|---|---|---|
1 | Left | 1,111,2020-01-01 10:10:16 | 0 | left=0, right=-4001 | 10:10:27.001 | |
2 | Right | 2,B,2020-01-01 10:10:20 | 10:10:11 | left=-6001, right=-4001 | 10:10:29.001 | |
3 | Left | 2,222,2020-01-01 10:10:22 | 10:10:17 | left=-6001, right=10:10:06.999 | 10:10:33.001 | Join emits 2,222,B |
4 | Left | 4,4444,2020-01-01 10:10:35 | 10:10:18 | left=-6001, right=10:10:12.999 | 10:10:46.001 | |
5 | Right | 4,D,2020-01-01 10:10:29 | 10:10:27 | left=10:10:11.999, right=10:10:12.999 | 10:10:38.001 | |
6-1 | Right | 5,E,2020-01-01 10:10:30 | 10:10:28 | left=10:10:20.999, right=10:10:12.999 | 10:10:39.001 | |
6-1 timer | | wm passes the first row's CleanUpTime, so its timer fires | 10:10:28 | left=10:10:21.999, right=10:10:12.999 | | The first row is deleted; since this is a LEFT JOIN, 1,111, is emitted with a null r_location |
6-2 | Right | 1,A,2020-01-01 10:10:17 | 10:10:27 | left=10:10:20.999, right=10:10:12.999 | | Joins with the first row, so 1,111,A is emitted. The first row's timestamp is already below leftExpirationTime, i.e. it has expired, so it is also deleted from the cache. The wm has not yet passed the first row's CleanUpTime, so the cleanup timer does not fire |
6-3 | Right | 1,A,2020-01-01 10:10:30 | 10:10:28 | left=10:10:20.999, right=10:10:12.999 | | Does not join with the first row. The first row's timestamp is already below leftExpirationTime, so it has expired and is deleted from the cache; since it never joined and this is a LEFT JOIN, 1,111, is emitted |
6-3 timer | | wm passes the first row's CleanUpTime, so its timer fires | 10:10:28 | left=10:10:21.999, right=10:10:12.999 | | The first row was already deleted, so the timer has nothing else to do |
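The wm, ExpirationTime and RowCleanUpTime columns for steps 1 to 6-1 can be replayed mechanically. The `replay` helper below is hypothetical; like the table, it assumes each record is immediately followed by its own watermark and that each source has a single partition. Real periodic watermarks are emitted on an interval, so a live job may show different wm values.

```python
LEFT_DELAY, RIGHT_DELAY = 5_000, 2_000   # WATERMARK ... - INTERVAL '5' / '2' SECOND
LEFT_RELATIVE_SIZE, RIGHT_RELATIVE_SIZE = 6_000, 4_000
MIN_CLEAN_UP_INTERVAL = (LEFT_RELATIVE_SIZE + RIGHT_RELATIVE_SIZE) // 2


def replay(events):
    """Replay (side, row_time_ms) events, side being "L" or "R".

    Returns one tuple per event:
    (wm after the event, leftExpirationTime, rightExpirationTime, RowCleanUpTime).
    """
    input_wm = {"L": None, "R": None}  # latest watermark of each input
    wm = 0                             # operator time, 0 until both inputs have one
    left_exp = right_exp = 0           # values before the first update
    result = []
    for side, row_time in events:
        if side == "L":
            # A left row probes the right cache, so only rightExpirationTime
            # changes, computed with the wm from BEFORE this record.
            right_exp = wm - RIGHT_RELATIVE_SIZE - 1
            clean_up = row_time + LEFT_RELATIVE_SIZE + MIN_CLEAN_UP_INTERVAL + 1
            delay = LEFT_DELAY
        else:
            left_exp = wm - LEFT_RELATIVE_SIZE - 1
            clean_up = row_time + RIGHT_RELATIVE_SIZE + MIN_CLEAN_UP_INTERVAL + 1
            delay = RIGHT_DELAY
        # Assumption: the record's own watermark arrives right after it.
        candidate = row_time - delay
        previous = input_wm[side]
        input_wm[side] = candidate if previous is None else max(previous, candidate)
        if None not in input_wm.values():
            wm = max(0, min(input_wm.values()))
        result.append((wm, left_exp, right_exp, clean_up))
    return result


# Steps 1-5 and 6-1, in ms after 2020-01-01 10:10:00.
steps = [("L", 16_000), ("R", 20_000), ("L", 22_000),
         ("L", 35_000), ("R", 29_000), ("R", 30_000)]
for row in replay(steps):
    print(row)
```

The first row's RowCleanUpTime (27001) is at or below the final wm (28000), which is why its timer fires after 6-1; the timer then recomputes leftExpirationTime with the new wm, 28000 - 6001 = 21999, i.e. 10:10:21.999.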
Summary
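Each ExpirationTime computation uses the previous wm, i.e. the watermark the operator had before the current record arrived;
6-1, 6-2 and 6-3 are three independent scenarios, each continuing from step 5, that can each trigger the deletion of expired data;
The test above shows that a row that has expired but has not yet reached its cleanup time can still be joined if a matching row arrives, as in scenario 6-2, where the row is evicted right after the join.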
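Scenarios 6-2 and 6-3 come down to the order of operations when a right row probes the left cache: check the join window first, then evict rows whose time is at or below leftExpirationTime. A toy Python model of that order for a LEFT JOIN; the function name and the list-based cache are hypothetical stand-ins for Flink's keyed state.

```python
LEFT_RELATIVE_SIZE, RIGHT_RELATIVE_SIZE = 6_000, 4_000  # ms, from the formulas


def probe_left_cache(left_cache, right_time, right_value, wm_before):
    """Toy model of one right row probing the left cache of a single key.

    left_cache holds [left_time, left_value, emitted] entries and is mutated
    in place. Returns the rows a LEFT JOIN would emit (None = null r_location).
    """
    # Computed with the watermark from before this record.
    left_expiration_time = wm_before - LEFT_RELATIVE_SIZE - 1
    lower = right_time - LEFT_RELATIVE_SIZE   # l_time >= r_time - 6 s
    upper = right_time + RIGHT_RELATIVE_SIZE  # l_time <= r_time + 4 s
    out = []
    for entry in list(left_cache):            # iterate over a copy; we remove
        left_time, left_value, emitted = entry
        # 1) Join first, even if the left row has already expired.
        if lower <= left_time <= upper:
            out.append((left_value, right_value))
            entry[2] = True
            emitted = True
        # 2) Then evict expired rows; never-joined rows get null padding.
        if left_time <= left_expiration_time:
            if not emitted:
                out.append((left_value, None))
            left_cache.remove(entry)
    return out


# Scenario 6-2: right row at 10:10:17, wm before it = 10:10:27
cache = [[16_000, "1,111", False]]
print(probe_left_cache(cache, 17_000, "A", 27_000), cache)  # [('1,111', 'A')] []

# Scenario 6-3: right row at 10:10:30, same starting state
cache = [[16_000, "1,111", False]]
print(probe_left_cache(cache, 30_000, "A", 27_000), cache)  # [('1,111', None)] []
```

In both scenarios the expired row leaves the cache during the probe, so by the time the watermark passes 10:10:27.001 the cleanup timer finds nothing to delete, matching the "6-3 timer" row.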
The results above were obtained with Flink 1.10.0 using the blink planner.