一犯眠、核心設(shè)計(jì)
CDC2.4版本支持先同步全量數(shù)據(jù)(Snapshot階段)后自動(dòng)切換讀取增量數(shù)據(jù)(Binlog階段)
1雌团、切片劃分
全量階段數(shù)據(jù)讀取方式為分布式讀取瞳别,會(huì)先對(duì)當(dāng)前表數(shù)據(jù)按主鍵劃分成多個(gè)Chunk麦牺,后續(xù)子任務(wù)讀取Chunk 區(qū)間內(nèi)的數(shù)據(jù)丈屹。根據(jù)主鍵列是否為自增整數(shù)類型酪我,對(duì)表數(shù)據(jù)劃分為均勻分布的Chunk及非均勻分布的Chunk棕洋。
1.1椅挣、均勻分布
主鍵列自增且類型為整數(shù)類型(int,bigint,decimal)遂庄。查詢出主鍵列的最小值寥院,最大值,按 chunkSize 大小將數(shù)據(jù)均勻劃分涛目,因?yàn)橹麈I為整數(shù)類型秸谢,根據(jù)當(dāng)前chunk 起始位置、chunkSize大小霹肝,直接計(jì)算chunk 的結(jié)束位置估蹄。
// 計(jì)算主鍵列數(shù)據(jù)區(qū)間
select min(`order_id`), max(`order_id`) from demo_orders;
// 將數(shù)據(jù)劃分為 chunkSize 大小的切片
chunk-0: [min,start + chunkSize)
chunk-1: [start + chunkSize, start + 2chunkSize)
.......
chunk-last: [max沫换,null)
1.2臭蚁、非均勻分布
主鍵列非自增或者類型為非整數(shù)類型。主鍵為非數(shù)值類型,每次劃分需要對(duì)未劃分的數(shù)據(jù)按主鍵進(jìn)行升序排列刊棕,取出前 chunkSize 的最大值為當(dāng)前 chunk 的結(jié)束位置炭晒。注意如果未設(shè)置chunkKeyColumn屬性, 且主鍵為聯(lián)合字段, 則取主鍵第一個(gè)字段列進(jìn)行劃分
// 未拆分的數(shù)據(jù)排序后,取 chunkSize 條數(shù)據(jù)取最大值甥角,作為切片的終止位置网严。
chunkend = SELECT MAX(`order_id`) FROM (
SELECT `order_id` FROM `demo_orders`
WHERE `order_id` >= [前一個(gè)切片的起始位置]
ORDER BY `order_id` ASC
LIMIT [chunkSize]
) AS T
1.3、全量切片數(shù)據(jù)讀取
Flink 將表數(shù)據(jù)劃分為多個(gè)Chunk嗤无,子任務(wù)在不加鎖的情況下震束,并行讀取 Chunk數(shù)據(jù)。因?yàn)槿虩o(wú)鎖在數(shù)據(jù)分片讀取過(guò)程中当犯,可能有其他事務(wù)對(duì)切片范圍內(nèi)的數(shù)據(jù)進(jìn)行修改垢村,此時(shí)無(wú)法保證數(shù)據(jù)一致性。因此嚎卫,在全量階段Flink 使用快照記錄讀取+Binlog數(shù)據(jù)修正的方式來(lái)保證數(shù)據(jù)的一致性嘉栓。
1.4、快照讀取
通過(guò)JDBC執(zhí)行SQL查詢切片范圍的數(shù)據(jù)記錄拓诸。
## 快照記錄數(shù)據(jù)讀取SQL
SELECT * FROM `test`.`demo_orders`
WHERE order_id >= [chunkStart]
AND NOT (order_id = [chunkEnd])
AND order_id <= [chunkEnd]
1.5侵佃、數(shù)據(jù)修正
在快照讀取操作前、后執(zhí)行 SHOW MASTER STATUS 查詢binlog文件的當(dāng)前偏移量奠支,在快照讀取完畢后馋辈,查詢區(qū)間內(nèi)的binlog數(shù)據(jù)并對(duì)讀取的快照記錄進(jìn)行修正。
BinlogEvents 修正 SnapshotEvents 規(guī)則:
- 未讀取到binlog數(shù)據(jù)倍谜,即在執(zhí)行select階段沒(méi)有其他事務(wù)進(jìn)行操作迈螟,直接下發(fā)所有快照記錄。
- 讀取到binlog數(shù)據(jù)尔崔,且變更的數(shù)據(jù)記錄不屬于當(dāng)前切片答毫,下發(fā)快照記錄。
- 讀取到binlog數(shù)據(jù)季春,且數(shù)據(jù)記錄的變更屬于當(dāng)前切片洗搂。delete 操作從快照內(nèi)存中移除該數(shù)據(jù),insert 操作向快照內(nèi)存添加新的數(shù)據(jù)鹤盒,update操作向快照內(nèi)存中添加變更記錄,最終會(huì)輸出更新前后的兩條記錄到下游侦副。
單個(gè)切片數(shù)據(jù)處理完畢后會(huì)向 SplitEnumerator 發(fā)送已完成切片數(shù)據(jù)的起始位置(ChunkStart, ChunkStartEnd)侦锯、Binlog的最大偏移量(High watermark),用來(lái)為增量讀取指定起始偏移量秦驯。
1.6尺碰、增量切片數(shù)據(jù)讀取
全量階段切片數(shù)據(jù)讀取完成后,SplitEnumerator 會(huì)下發(fā)一個(gè) BinlogSplit 進(jìn)行增量數(shù)據(jù)讀取。BinlogSplit讀取最重要的屬性就是起始偏移量亲桥,偏移量如果設(shè)置過(guò)小下游可能會(huì)有重復(fù)數(shù)據(jù)洛心,偏移量如果設(shè)置過(guò)大下游可能是已超期的臟數(shù)據(jù)。而 Flink CDC增量讀取的起始偏移量為所有已完成的全量切片最小的Binlog偏移量题篷,只有滿足條件的數(shù)據(jù)才被下發(fā)到下游词身。
數(shù)據(jù)下發(fā)條件:
捕獲的Binlog數(shù)據(jù)的偏移量 > 數(shù)據(jù)所屬分片的Binlog的最大偏移量。
參考: https://www.cnblogs.com/importbigdata/articles/15625753.html