Everything in this article is based on Flink 1.14.
StreamingJoinOperator.java
Debugging the code shows that the Regular Join logic is implemented in `StreamingJoinOperator.java` (this holds for every Regular Join variant).
[Figure: regular-join.jpg]
Pseudocode from the processElement comments
if input record is accumulate
| if input side is outer
| | if there is no matched rows on the other side, send +I[record+null], state.add(record, 0)
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -D[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | endif
| | | send +I[record+other]s, state.add(record, other.size)
| | endif
| endif
| if input side not outer
| | state.add(record)
| | if there is no matched rows on the other side, skip
| | if there are matched rows on the other side
| | | if other side is outer
| | | | if the matched num in the matched rows == 0, send -D[null+other]
| | | | if the matched num in the matched rows > 0, skip
| | | | otherState.update(other, old + 1)
| | | | send +I[record+other]s
| | | else
| | | | send +I/+U[record+other]s (using input RowKind)
| | | endif
| | endif
| endif
endif
if input record is retract
| state.retract(record)
| if there is no matched rows on the other side
| | if input side is outer, send -D[record+null]
| endif
| if there are matched rows on the other side, send -D[record+other]s if outer, send -D/-U[record+other]s if inner.
| | if other side is outer
| | | if the matched num in the matched rows == 0, this should never happen!
| | | if the matched num in the matched rows == 1, send +I[null+other]
| | | if the matched num in the matched rows > 1, skip
| | | otherState.update(other, old - 1)
| | endif
| endif
endif
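The pseudocode above can be modeled in a few lines of Python. This is only a simplified sketch, not the Flink implementation: `SideState`, `joined` and `process_element` are hypothetical names, state is a plain in-memory dict keyed by join key, non-equi conditions are ignored, and a retracted record is assumed to have been added earlier.

```python
from collections import defaultdict


class SideState:
    """Per-input state: join key -> list of [record, num_of_associations]."""

    def __init__(self, is_outer):
        self.is_outer = is_outer
        self.rows = defaultdict(list)


def joined(record, other, input_is_left):
    """Order the pair as (left, right); None plays the role of null padding."""
    return (record, other) if input_is_left else (other, record)


def process_element(kind, key, record, state, other_state, input_is_left):
    """Apply one changelog record and return the emitted (kind, row) pairs."""
    out = []
    matched = other_state.rows[key]
    both_inner = not state.is_outer and not other_state.is_outer
    if kind in ("+I", "+U"):  # accumulate
        # The count is only meaningful when this side is outer.
        state.rows[key].append([record, len(matched)])
        if not matched:
            if state.is_outer:
                out.append(("+I", joined(record, None, input_is_left)))
            return out
        if other_state.is_outer:
            for entry in matched:
                if entry[1] == 0:  # so far only emitted as a null-padded row
                    out.append(("-D", joined(None, entry[0], input_is_left)))
                entry[1] += 1
        out_kind = kind if both_inner else "+I"
        out += [(out_kind, joined(record, e[0], input_is_left)) for e in matched]
    else:  # retract: "-D" or "-U"
        entries = state.rows[key]
        entries.remove(next(e for e in entries if e[0] == record))
        if not matched:
            if state.is_outer:
                out.append(("-D", joined(record, None, input_is_left)))
            return out
        out_kind = "-D" if state.is_outer else kind
        out += [(out_kind, joined(record, e[0], input_is_left)) for e in matched]
        if other_state.is_outer:
            for entry in matched:
                if entry[1] == 1:  # last association gone: back to null padding
                    out.append(("+I", joined(None, entry[0], input_is_left)))
                entry[1] -= 1
    return out


# LEFT JOIN: only the left input is outer.
left, right = SideState(is_outer=True), SideState(is_outer=False)
order, price = ("order", 1), ("price", 1)
print(process_element("+I", 1, order, left, right, True))
print(process_element("+I", 1, price, right, left, False))
print(process_element("-D", 1, price, right, left, False))
```

The last call exercises the retract branch: deleting the only matching right record withdraws the joined row and restores the null-padded left row, and the left record's association count drops back to 0.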
Data
order_log
order_id | movie_id | order_timestamp |
---|---|---|
1 | 1 | 2021-12-25 00:00:00 |
2 | 2 | 2021-12-25 00:01:00 |
3 | 3 | 2021-12-25 00:02:00 |
price_log
order_id | seat_price | price_timestamp |
---|---|---|
1 | 40 | 2021-12-25 00:00:01 |
3 | 80 | 2021-12-25 00:02:00 |
Left Join
Left Join Demo
INSERT INTO print
SELECT
o.order_id
,movie_id
,seat_price
,o.order_timestamp
FROM order_log o
LEFT JOIN price_log p ON o.order_id = p.order_id
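Left Join process
- A record arrives on the left side and is compared with the right-side records one by one.
  - If it matches, emit `+I[left_record, matched_right_record]` and save `(left_record, matched_right_record_number)` to the left state (for later joins to use).
  - If it does not match, emit `+I[left_record, null]` and save `(left_record, 0)` to the left state.
- A record arrives on the right side and is compared with the left-side records one by one. Whether or not it matches, the right record is saved to the right state.
  - If it matches:
    - If the matched left record's `numOfAssociations` equals 0, emit `-D[matched_left_record, null]`, update the left state to `(matched_left_record, numOfAssociations + 1)`, and emit `+I[matched_left_record, right_record]`. This retracts the earlier result in which `left_record` had no right-side match and emits the latest joined result.
    - If the matched left record's `numOfAssociations` is greater than 0, update the left state to `(matched_left_record, numOfAssociations + 1)` and emit `+I[matched_left_record, right_record]`.
  - Otherwise, do nothing.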
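The Left Join steps can be replayed on the sample data with a small Python sketch. It is an illustration under assumptions, not Flink code: `left_join_stream` is a hypothetical helper, rows are reduced to `(order_id, movie_id)` and `(order_id, seat_price)` tuples, only insert records are handled, and the arrival order (each order before its price) is assumed.

```python
def left_join_stream(events):
    """Replay ("L" | "R", record) arrivals; record[0] is the join key order_id."""
    left_state = []   # entries of [left_record, num_of_associations]
    right_state = []  # plain right records
    out = []
    for side, rec in events:
        if side == "L":
            matches = [r for r in right_state if r[0] == rec[0]]
            if matches:
                out += [("+I", rec, r) for r in matches]
            else:
                out.append(("+I", rec, None))
            left_state.append([rec, len(matches)])
        else:
            right_state.append(rec)  # stored whether or not it matches
            for entry in left_state:
                if entry[0][0] != rec[0]:
                    continue
                if entry[1] == 0:
                    # the left row was emitted null-padded before: retract it
                    out.append(("-D", entry[0], None))
                entry[1] += 1
                out.append(("+I", entry[0], rec))
    return out


# order_log rows as (order_id, movie_id), price_log rows as (order_id, seat_price)
events = [
    ("L", (1, 1)), ("R", (1, 40)), ("L", (2, 2)), ("L", (3, 3)), ("R", (3, 80)),
]
for change in left_join_stream(events):
    print(change)
```

Orders 1 and 3 are first emitted with a null price, then retracted and re-emitted once their price arrives. Order 2 never gets a price, so its null-padded row stays in the result.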
Inner Join
Inner Join Demo
INSERT INTO print
SELECT
o.order_id
,movie_id
,seat_price
,o.order_timestamp
FROM order_log o
INNER JOIN price_log p ON o.order_id = p.order_id
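Inner Join process
- A record arrives on the left side and is compared with the right-side records one by one. The left record is saved to the state.
  - If it does not match, do nothing.
  - If it matches, emit `+I[left_record, matched_right_record]`.
- A record arrives on the right side and is compared with the left-side records one by one. The right record is saved to the state.
  - If it does not match, do nothing.
  - If it matches, emit `+I[matched_left_record, right_record]`.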
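A minimal Python sketch of the Inner Join steps, run on the sample data. `inner_join_stream` is a hypothetical helper, rows are reduced to `(order_id, movie_id)` and `(order_id, seat_price)` tuples, only insert records are handled, and the arrival order is assumed.

```python
def inner_join_stream(events):
    """Replay ("L" | "R", record) arrivals; record[0] is the join key order_id."""
    state = {"L": [], "R": []}
    out = []
    for side, rec in events:
        state[side].append(rec)  # always stored, matched or not
        other_side = "R" if side == "L" else "L"
        for match in state[other_side]:
            if match[0] == rec[0]:
                left, right = (rec, match) if side == "L" else (match, rec)
                out.append(("+I", left, right))
    return out, state


events = [
    ("L", (1, 1)), ("R", (1, 40)), ("L", (2, 2)), ("L", (3, 3)), ("R", (3, 80)),
]
changes, state = inner_join_stream(events)
for change in changes:
    print(change)
print(state["L"])
```

Nothing is emitted for order 2, yet it remains in the left state in case a matching price arrives later. No association count is kept because an inner join never emits null-padded rows that would need retracting.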
Full Outer Join
Full Outer Join Demo
INSERT INTO print
SELECT
o.order_id
,movie_id
,seat_price
,o.order_timestamp
FROM order_log o
FULL OUTER JOIN price_log p ON o.order_id = p.order_id
Full Outer Join process
- A record arrives on the left side and is compared with the right-side records one by one.
  - If it does not match, emit `+I[left_record, null]` and save `(left_record, 0)` to the left state.
  - If it matches:
    - If the matched right record's `numOfAssociations` equals 0, emit `-D[null, matched_right_record]`, update the right state to `(matched_right_record, numOfAssociations + 1)`, emit `+I[left_record, matched_right_record]`, and save `(left_record, matched_right_record_number)` to the left state.
    - If the matched right record's `numOfAssociations` is not 0, update the right state to `(matched_right_record, numOfAssociations + 1)`, emit `+I[left_record, matched_right_record]`, and save `(left_record, matched_right_record_number)` to the left state.
- A record arrives on the right side and is compared with the left-side records one by one.
  - If it does not match, emit `+I[null, right_record]` and save `(right_record, 0)` to the right state.
  - If it matches:
    - If the matched left record's `numOfAssociations` equals 0, emit `-D[matched_left_record, null]`, update the left state to `(matched_left_record, numOfAssociations + 1)`, emit `+I[matched_left_record, right_record]`, and save `(right_record, matched_left_record_number)` to the right state.
    - If the matched left record's `numOfAssociations` is not 0, update the left state to `(matched_left_record, numOfAssociations + 1)`, emit `+I[matched_left_record, right_record]`, and save `(right_record, matched_left_record_number)` to the right state.
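Because both sides are outer, the Full Outer Join logic is symmetric, which a short Python sketch can make visible. `full_outer_join_stream` is a hypothetical helper, not Flink code. Rows are reduced to `(order_id, movie_id)` and `(order_id, seat_price)` tuples, only insert records are handled, and the price for order 3 is assumed to arrive before the order itself (the two share a timestamp in the sample data).

```python
def full_outer_join_stream(events):
    """Replay ("L" | "R", record) arrivals; record[0] is the join key order_id."""
    state = {"L": [], "R": []}  # entries of [record, num_of_associations]
    out = []

    def row(side, rec, other):
        # Always order the output as (left, right); None is the null padding.
        return (rec, other) if side == "L" else (other, rec)

    for side, rec in events:
        other_side = "R" if side == "L" else "L"
        matches = [e for e in state[other_side] if e[0][0] == rec[0]]
        if not matches:
            out.append(("+I",) + row(side, rec, None))
        for entry in matches:
            if entry[1] == 0:
                # the other row was emitted null-padded before: retract it
                out.append(("-D",) + row(side, None, entry[0]))
            entry[1] += 1
            out.append(("+I",) + row(side, rec, entry[0]))
        state[side].append([rec, len(matches)])
    return out


events = [
    ("L", (1, 1)), ("R", (1, 40)), ("L", (2, 2)), ("R", (3, 80)), ("L", (3, 3)),
]
for change in full_outer_join_stream(events):
    print(change)
```

The price for order 3 is first emitted with a null left side, then retracted and replaced by the joined row once the order arrives, mirroring what happens to order 1 in the opposite direction.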