CDC全量階段chunk劃分實現
前面分析到其主要劃分chunk入口在MySqlSourceEnumerator類的start方法中, 最終實現在MySqlSnapshotSplitAssigner類的open方法中
/**
 * Opens this split assigner and starts chunk splitting.
 *
 * <p>Runs four steps in order: opens the chunk splitter, calls discoveryCaptureTables(),
 * calls captureNewlyAddedTables(), and finally calls startAsynchronouslySplit().
 */
public void open() {
chunkSplitter.open();
// NOTE(review): presumably resolves the set of tables to capture; helper body is not shown here.
discoveryCaptureTables();
// NOTE(review): presumably handles tables added since the last run; helper body is not shown here.
captureNewlyAddedTables();
// Hands chunk splitting to a background executor when there is still work to split.
startAsynchronouslySplit();
}
核心查看startAsynchronouslySplit方法
/**
 * Schedules chunk splitting on a background thread when there is still work to do.
 *
 * <p>Nothing is scheduled unless the chunk splitter reports a pending chunk or
 * {@code remainingTables} is non-empty. The executor is created lazily on first use and is
 * reused by later calls.
 */
private void startAsynchronouslySplit() {
    if (!chunkSplitter.hasNextChunk() && remainingTables.isEmpty()) {
        // No pending chunk and no remaining tables: nothing to split.
        return;
    }
    if (executor == null) {
        // Create a single-thread executor using the "snapshot-splitting" thread name format.
        this.executor =
                Executors.newSingleThreadExecutor(
                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build());
    }
    // Run splitChunksForRemainingTables asynchronously to perform the chunk splitting.
    executor.submit(this::splitChunksForRemainingTables);
}
調用splitChunksForRemainingTables
/**
 * Background task that splits every outstanding table into chunks.
 *
 * <p>If the chunk splitter still has a pending chunk (the job was restored from a checkpoint
 * while a table was being split), that table is continued first. Afterwards each table in
 * {@code remainingTables} is split in iteration order. Any {@link Throwable} raised along the
 * way is recorded in {@code uncaughtSplitterException} rather than propagated from this method.
 */
private void splitChunksForRemainingTables() {
    try {
        // Restart of the CDC job from a checkpoint: continue splitting the table that was
        // in progress at the previous checkpoint.
        if (chunkSplitter.hasNextChunk()) {
            LOG.info(
                    "Start splitting remaining chunks for table {}",
                    chunkSplitter.getCurrentSplittingTableId());
            splitTable(chunkSplitter.getCurrentSplittingTableId());
        }
        // Split the remaining tables into chunks.
        for (TableId pendingTable : remainingTables) {
            splitTable(pendingTable);
        }
    } catch (Throwable failure) {
        synchronized (lock) {
            if (uncaughtSplitterException != null) {
                // Keep the first failure as primary and attach later ones to it.
                uncaughtSplitterException.addSuppressed(failure);
            } else {
                uncaughtSplitterException = failure;
            }
            // Release the potential waiting getNext() call
            lock.notify();
        }
    }
}
再調用splitTable方法
private void splitTable(TableId nextTable) {
// 省略
do {
synchronized (lock) {
List<MySqlSnapshotSplit> splits;
try {
splits = chunkSplitter.splitChunks(partition, nextTable);
} catch (Exception e) {
throw new IllegalStateException(
"Error when splitting chunks for " + nextTable, e);
}
} while (chunkSplitter.hasNextChunk());
// 省略
}
繼續調用MySqlChunkSplitter類的splitChunks方法進行劃分
/**
 * Produces the next batch of snapshot splits for {@code tableId}.
 *
 * <p>When no chunk is pending, the table is analyzed and an attempt is made to split it into
 * evenly sized chunks in one go; if that yields nothing, unevenly sized splitting starts and
 * the first such chunk is returned. When a chunk is pending, {@code tableId} must be the
 * table currently being split, and exactly one further unevenly sized chunk is returned.
 *
 * @param partition the partition passed through to the analysis and splitting helpers
 * @param tableId the table to split
 * @return all evenly sized chunks, or a singleton list holding one unevenly sized chunk
 * @throws Exception if analyzing or splitting the table fails
 */
@Override
public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId tableId)
        throws Exception {
    if (hasNextChunk()) {
        // A table is already partially split; only that same table may be continued.
        Preconditions.checkState(
                currentSplittingTableId.equals(tableId),
                "Can not split a new table before the previous table splitting finish.");
        if (currentSplittingTable == null) {
            // Author's notes on analyzeTable (translated):
            // 1. If the table has no primary key, chunkKeyColumn must be set.
            // 2. If the table has a primary key, chunkKeyColumn must be one of its columns,
            //    or be left unset.
            // 3. When chunkKeyColumn is unset and the table has a primary key, the first
            //    primary-key column is used (composite primary key case).
            // 4. The min and max values of the split column are queried.
            // 5. SHOW TABLE STATUS LIKE '<table name>' is executed to get the table's row count.
            analyzeTable(partition, currentSplittingTableId);
        }
        synchronized (lock) {
            return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
        }
    }

    // No pending chunk: start on this table from scratch.
    analyzeTable(partition, tableId);
    Optional<List<MySqlSnapshotSplit>> evenChunks =
            trySplitAllEvenlySizedChunks(partition, tableId);
    if (evenChunks.isPresent()) {
        return evenChunks.get();
    }

    // Even splitting was not possible: initialise the uneven-splitting state and emit the
    // first chunk.
    synchronized (lock) {
        this.currentSplittingTableId = tableId;
        this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
        this.nextChunkId = 0;
        return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
    }
}
最終調用splitOneUnevenlySizedChunk方法, 劃分後的MySqlSnapshotSplit列表保存到remainingSplits集合中
- 均勻分布
主鍵列自增且類型為整數類型(int,bigint,decimal)。查詢出主鍵列的最小值，最大值，按 chunkSize 大小將數據均勻劃分，因為主鍵為整數類型，根據當前chunk 起始位置、chunkSize大小，直接計算chunk 的結束位置。
// 計算主鍵列數據區間
select min(`order_id`), max(`order_id`) from demo_orders;
// 將數據劃分為 chunkSize 大小的切片
chunk-0: [min, start + chunkSize)
chunk-1: [start + chunkSize, start + 2 * chunkSize)
.......
chunk-last: [max,null)
- 非均勻分布
主鍵列非自增或者類型為非整數類型。主鍵為非數值類型，每次劃分需要對未劃分的數據按主鍵進行升序排列，取出前 chunkSize 的最大值為當前 chunk 的結束位置。
// 未拆分的數據排序後，取 chunkSize 條數據取最大值，作為切片的終止位置。
chunkend = SELECT MAX(`order_id`) FROM (
    SELECT `order_id`
    FROM `demo_orders`
    WHERE `order_id` >= [前一個切片的起始位置]
    ORDER BY `order_id` ASC
    LIMIT [chunkSize]
) AS T
/**
 * Creates the next unevenly sized snapshot split for {@code tableId} and advances the
 * splitter's state.
 *
 * <p>The chunk end is obtained from {@code nextChunkEnd}, starting at the split column's
 * minimum for the first chunk and at the previous chunk's end otherwise. If an end is found
 * and it does not exceed the split column's maximum, the split ends there and the next chunk
 * will start from it. Otherwise this is the final chunk: it is created with a {@code null}
 * end, {@code currentSplittingTableId} is cleared and {@code nextChunkStart} is set to
 * {@code END_BOUND}.
 *
 * @param partition the partition passed through to {@code createSnapshotSplit}
 * @param tableId the table being split
 * @return the newly created snapshot split
 * @throws SQLException declared by this method; presumably raised by the JDBC-backed helpers
 */
private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, TableId tableId)
        throws SQLException {
    final int chunkSize = sourceConfig.getSplitSize();
    final Object chunkStartVal = nextChunkStart.getValue();
    final boolean firstChunk = nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND;

    LOG.info(
            "Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
            tableId,
            chunkSize,
            firstChunk ? "null" : chunkStartVal.toString());

    // we start from [null, min + chunk_size) and avoid [null, min)
    final Object searchFrom = firstChunk ? minMaxOfSplitColumn[0] : chunkStartVal;
    final Object chunkEnd =
            nextChunkEnd(
                    jdbcConnection,
                    searchFrom,
                    tableId,
                    splitColumn.name(),
                    minMaxOfSplitColumn[1],
                    chunkSize);

    // may sleep a while to avoid DDOS on MySQL server
    maySleep(nextChunkId, tableId);

    final boolean lastChunk =
            chunkEnd == null || ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) > 0;
    if (lastChunk) {
        // No further boundary within the column range: emit the open-ended final chunk and
        // mark splitting of this table as finished.
        currentSplittingTableId = null;
        nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
        return createSnapshotSplit(
                jdbcConnection,
                partition,
                tableId,
                nextChunkId++,
                splitType,
                chunkStartVal,
                null);
    }

    // More data follows: the next chunk starts where this one ends.
    nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
    return createSnapshotSplit(
            jdbcConnection,
            partition,
            tableId,
            nextChunkId++,
            splitType,
            chunkStartVal,
            chunkEnd);
}