Flink CDC 源碼學(xué)習(xí)(四)

CDC全量階段chunk劃分實(shí)現(xiàn)

前面分析到其主要?jiǎng)澐謈hunk入口在MySqlSourceEnumerator類的start方法中, 最終實(shí)現(xiàn)在MySqlSnapshotSplitAssigner類的open方法


image.png
 public void open() {
        chunkSplitter.open();
        discoveryCaptureTables();
        captureNewlyAddedTables();
        startAsynchronouslySplit();
    }

核心查看startAsynchronouslySplit方法

private void startAsynchronouslySplit() {
        if (chunkSplitter.hasNextChunk() || !remainingTables.isEmpty()) {
            if (executor == null) {
                ThreadFactory threadFactory =
                        new ThreadFactoryBuilder().setNameFormat("snapshot-splitting").build();
                this.executor = Executors.newSingleThreadExecutor(threadFactory);
            }
            // 創(chuàng)建單線程池, 異步執(zhí)行splitChunksForRemainingTables進(jìn)行chunk劃分
            executor.submit(this::splitChunksForRemainingTables);
        }
    }

調(diào)用splitChunksForRemainingTables

private void splitChunksForRemainingTables() {
        try {
            // restore from a checkpoint and start to split the table from the previous
            // checkpoint
            // CDC任務(wù)重啟從CK恢復(fù)處理
            if (chunkSplitter.hasNextChunk()) {
                LOG.info(
                        "Start splitting remaining chunks for table {}",
                        chunkSplitter.getCurrentSplittingTableId());
                splitTable(chunkSplitter.getCurrentSplittingTableId());
            }

            // split the remaining tables
            for (TableId nextTable : remainingTables) {
                // 進(jìn)行chunk劃分
                splitTable(nextTable);
            }
        } catch (Throwable e) {
            synchronized (lock) {
                if (uncaughtSplitterException == null) {
                    uncaughtSplitterException = e;
                } else {
                    uncaughtSplitterException.addSuppressed(e);
                }
                // Release the potential waiting getNext() call
                lock.notify();
            }
        }
    }

再調(diào)用splitTable 方法

 private void splitTable(TableId nextTable) {
       // 省略
        do {
            synchronized (lock) {
                List<MySqlSnapshotSplit> splits;
                try {
                    splits = chunkSplitter.splitChunks(partition, nextTable);
                } catch (Exception e) {
                    throw new IllegalStateException(
                            "Error when splitting chunks for " + nextTable, e);
                }

               
        } while (chunkSplitter.hasNextChunk());
         // 省略
    }

繼續(xù)調(diào)用MySqlChunkSplitter類的splitChunks方法進(jìn)行劃分

@Override
    public List<MySqlSnapshotSplit> splitChunks(MySqlPartition partition, TableId tableId)
            throws Exception {
        if (!hasNextChunk()) {
            analyzeTable(partition, tableId);
            Optional<List<MySqlSnapshotSplit>> evenlySplitChunks =
                    trySplitAllEvenlySizedChunks(partition, tableId);
            if (evenlySplitChunks.isPresent()) {
                return evenlySplitChunks.get();
            } else {
                synchronized (lock) {
                    this.currentSplittingTableId = tableId;
                    this.nextChunkStart = ChunkSplitterState.ChunkBound.START_BOUND;
                    this.nextChunkId = 0;
                    return Collections.singletonList(
                            splitOneUnevenlySizedChunk(partition, tableId));
                }
            }
        } else {
            Preconditions.checkState(
                    currentSplittingTableId.equals(tableId),
                    "Can not split a new table before the previous table splitting finish.");
            if (currentSplittingTable == null) {
                /**
                 *  1.如果表沒有主鍵币叹,則必須設(shè)置chunkKeyColumn。
                 *  2.如果表有主鍵模狭,則chunkKeyColumn必須是其中的一列颈抚,否則為空。
                 *  3.當(dāng)參數(shù)chunkKeyColumn未設(shè)置且表具有主鍵時(shí)嚼鹉,返回主鍵的第一列(聯(lián)合主鍵情況)贩汉。
                 *  4.根據(jù)拆分列, 查詢其最大值, 最小值
                 *  5.執(zhí)行SHOW TABLE STATUS LIKE 'TablaName'獲取表數(shù)據(jù)量
                 */
                analyzeTable(partition, currentSplittingTableId);
            }
            synchronized (lock) {
                return Collections.singletonList(splitOneUnevenlySizedChunk(partition, tableId));
            }
        }
    }

最終調(diào)用splitOneUnevenlySizedChunk方法, 劃分后的MySqlSnapshotSplit列表保存到remainingSplits集合中

  • 均勻分布

主鍵列自增且類型為整數(shù)類型(int,bigint,decimal)。查詢出主鍵列的最小值锚赤,最大值匹舞,按 chunkSize 大小將數(shù)據(jù)均勻劃分,因?yàn)橹麈I為整數(shù)類型线脚,根據(jù)當(dāng)前chunk 起始位置赐稽、chunkSize大小,直接計(jì)算chunk 的結(jié)束位置浑侥。

// 計(jì)算主鍵列數(shù)據(jù)區(qū)間
select min(order_id), max(order_id) from demo_orders;
// 將數(shù)據(jù)劃分為 chunkSize 大小的切片
chunk-0: [min姊舵,start + chunkSize)
chunk-1: [start + chunkSize, start + 2chunkSize)
.......
chunk-last: [max,null)

  • 非均勻分布
    主鍵列非自增或者類型為非整數(shù)類型寓落。主鍵為非數(shù)值類型括丁,每次劃分需要對(duì)未劃分的數(shù)據(jù)按主鍵進(jìn)行升序排列,取出前 chunkSize 的最大值為當(dāng)前 chunk 的結(jié)束位置

// 未拆分的數(shù)據(jù)排序后伶选,取 chunkSize 條數(shù)據(jù)取最大值,作為切片的終止位置考蕾。
chunkend = SELECT MAX(order_id) FROM (
SELECT order_id FROM demo_orders
WHERE order_id >= [前一個(gè)切片的起始位置]
ORDER BY order_id ASC
LIMIT [chunkSize]
) AS T

 private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, TableId tableId)
            throws SQLException {
        final int chunkSize = sourceConfig.getSplitSize();
        final Object chunkStartVal = nextChunkStart.getValue();
        LOG.info(
                "Use unevenly-sized chunks for table {}, the chunk size is {} from {}",
                tableId,
                chunkSize,
                nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
                        ? "null"
                        : chunkStartVal.toString());
        // we start from [null, min + chunk_size) and avoid [null, min)
        Object chunkEnd =
                nextChunkEnd(
                        jdbcConnection,
                        nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
                                ? minMaxOfSplitColumn[0]
                                : chunkStartVal,
                        tableId,
                        splitColumn.name(),
                        minMaxOfSplitColumn[1],
                        chunkSize);
        // may sleep a while to avoid DDOS on MySQL server
        maySleep(nextChunkId, tableId);
        if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) {
            nextChunkStart = ChunkSplitterState.ChunkBound.middleOf(chunkEnd);
            return createSnapshotSplit(
                    jdbcConnection,
                    partition,
                    tableId,
                    nextChunkId++,
                    splitType,
                    chunkStartVal,
                    chunkEnd);
        } else {
            currentSplittingTableId = null;
            nextChunkStart = ChunkSplitterState.ChunkBound.END_BOUND;
            return createSnapshotSplit(
                    jdbcConnection,
                    partition,
                    tableId,
                    nextChunkId++,
                    splitType,
                    chunkStartVal,
                    null);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市肖卧,隨后出現(xiàn)的幾起案子蚯窥,更是在濱河造成了極大的恐慌,老刑警劉巖拦赠,帶你破解...
    沈念sama閱讀 206,311評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異荷鼠,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)允乐,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,339評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門矮嫉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)牍疏,“玉大人,你說(shuō)我怎么就攤上這事鳞陨∽蚰” “怎么了厦滤?”我有些...
    開封第一講書人閱讀 152,671評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)掏导。 經(jīng)常有香客問(wèn)我享怀,道長(zhǎng),這世上最難降的妖魔是什么趟咆? 我笑而不...
    開封第一講書人閱讀 55,252評(píng)論 1 279
  • 正文 為了忘掉前任垄分,我火速辦了婚禮室囊,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘蘸嘶。我一直安慰自己履植,他們只是感情好计雌,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,253評(píng)論 5 371
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著玫霎,像睡著了一般凿滤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上庶近,一...
    開封第一講書人閱讀 49,031評(píng)論 1 285
  • 那天翁脆,我揣著相機(jī)與錄音,去河邊找鬼鼻种。 笑死反番,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播罢缸,決...
    沈念sama閱讀 38,340評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼篙贸,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了枫疆?” 一聲冷哼從身側(cè)響起爵川,我...
    開封第一講書人閱讀 36,973評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎息楔,沒想到半個(gè)月后寝贡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,466評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡值依,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,937評(píng)論 2 323
  • 正文 我和宋清朗相戀三年圃泡,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片鳞滨。...
    茶點(diǎn)故事閱讀 38,039評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡洞焙,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拯啦,到底是詐尸還是另有隱情澡匪,我是刑警寧澤,帶...
    沈念sama閱讀 33,701評(píng)論 4 323
  • 正文 年R本政府宣布褒链,位于F島的核電站唁情,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏甫匹。R本人自食惡果不足惜甸鸟,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,254評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望兵迅。 院中可真熱鬧抢韭,春花似錦、人聲如沸恍箭。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,259評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)扯夭。三九已至鳍贾,卻和暖如春交洗,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背构拳。 一陣腳步聲響...
    開封第一講書人閱讀 31,485評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工梁棠, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人伍掀。 一個(gè)月前我還...
    沈念sama閱讀 45,497評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蜜笤,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子把兔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,786評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容