clickhouse寫入和副本同步過程

一: 概述 Clickhouse 插入數(shù)據(jù)

Clickhouse 插入數(shù)據(jù)過程

當(dāng)需要在ReplicatedMergeTree中執(zhí)行INSERT以寫入數(shù)據(jù)時(shí),即會(huì)進(jìn)入INSERT核心流程,整個(gè)流程從上至下按照時(shí)間順序進(jìn)行闲先。

NEXT-1: 創(chuàng)建副本

副本1:

create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic1')
partition by toYYYYMM(ctime)
order by id;

副本2:

create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic2')
partition by toYYYYMM(ctime)
order by id;

在創(chuàng)建的過程中硝枉,ReplicatedMergeTree會(huì)進(jìn)行一些初始化操作孵坚。

  • 根據(jù)zk_path初始化所有的ZooKeeper節(jié)點(diǎn)奶陈。

    表的zk_path:
    ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated 
    
    [alter_partition_version, block_numbers, blocks, columns, leader_election, log, metadata, mutations, nonincrement_block_numbers, part_moves_shard, pinned_part_uuids, quorum, replicas, temp, zero_copy_hdfs, zero_copy_s3]
    
  • 在/replicas/節(jié)點(diǎn)下注冊(cè)自己的副本實(shí)例replic1或replic2

replic1的zk path:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1

[columns, flags, host, is_active, is_lost, log_pointer, max_processed_insert_time, metadata, metadata_version, min_unprocessed_insert_time, mutation_pointer, parts, queue]
  • 啟動(dòng)監(jiān)聽任務(wù),監(jiān)聽/log日志節(jié)點(diǎn)贼陶。
[zk: 10.178.19.198:2181(CONNECTED) 24] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log 
[log-0000000000]

NEXT-2: 向第一個(gè)副本實(shí)例寫入數(shù)據(jù)

1) 術(shù)語

Block數(shù)據(jù)塊:insert依據(jù)max_insert_block_size的大小(默認(rèn)1048576行)將數(shù)據(jù)切分成若干個(gè)Block數(shù)據(jù)塊巧娱。因此Block數(shù)據(jù)塊是數(shù)據(jù)寫入的基本單元碉怔,并且具有寫入的原子性和唯一性。

每個(gè)壓縮數(shù)據(jù)塊的體積禁添,按照其壓縮前的數(shù)據(jù)字節(jié)大小撮胧,被嚴(yán)格控制在64KB~1MB之間,上下限大小由min_compress_block_size(默認(rèn)65536)max_compress_block_size(默認(rèn)1048576)參數(shù)指定上荡。而每一個(gè)壓縮數(shù)據(jù)塊最終大小趴樱,則和一個(gè)index_granularity內(nèi)實(shí)際的數(shù)據(jù)大小有關(guān)馒闷。

  • 單個(gè)索引粒度間隔數(shù)據(jù)size < 64KB:如果單個(gè)索引粒度數(shù)據(jù)大小小于64KB,則繼續(xù)獲取下一個(gè)索引粒度的數(shù)據(jù)叁征,一直到size >= 64KB纳账,生成下一個(gè)壓縮數(shù)據(jù)塊。

  • 單個(gè)索引粒度間隔數(shù)據(jù) 64KB <= size <= 1MB:如果單個(gè)索引粒度數(shù)據(jù)大小大于64KB捺疼,小于1MB疏虫,則直接生成下一個(gè)壓縮數(shù)據(jù)塊

  • 單個(gè)索引粒度間隔數(shù)據(jù) size > 1MB:如果單個(gè)索引粒度數(shù)據(jù)大小超過1MB,則先按照1MB大小截?cái)嗖⑸上乱粋€(gè)壓縮數(shù)據(jù)塊啤呼,剩余數(shù)據(jù)按照這三個(gè)規(guī)則對(duì)應(yīng)執(zhí)行卧秘。這時(shí)就會(huì)出現(xiàn)一批數(shù)據(jù)生成多個(gè)壓縮數(shù)據(jù)塊的情況。


    clickhouse數(shù)據(jù)塊.jpg

2) 插入數(shù)據(jù)

replic1寫入如下數(shù)據(jù):

insert into test_replicated values(1,now(),'zs'),(2,now(),'ls');                       

插入數(shù)據(jù)后如下zk會(huì)有如下數(shù)據(jù):

1: /blocks
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks 
[202201_5253379567567382390_63093905895384417]

get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks/202201_5253379567567382390_63093905895384417 
202201_0_0_0


2: log
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000]

get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000000 

format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact

NEXT-3: 第二個(gè)副本實(shí)例拉取Log日志

replic2副本會(huì)一直監(jiān)聽/log節(jié)點(diǎn)變化官扣,當(dāng)replic1推送log/log-0000000000后翅敌,replic2會(huì)觸發(fā)日志的拉取任務(wù)并更新log_pointer

get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer 
1

replic2 在拉取了LogEntry之后,它并不會(huì)直接執(zhí)行惕蹄,而是將其轉(zhuǎn)為任務(wù)對(duì)象放至隊(duì)列:

ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/queue 
queue-0000000001

NEXT-4: 第二個(gè)副本實(shí)例向其他副本發(fā)起下載請(qǐng)求

replic2基于/queue隊(duì)列開始執(zhí)行任務(wù)蚯涮。當(dāng)看到type類型為get的時(shí)候,ReplicatedMerge-Tree會(huì)明白此時(shí)在遠(yuǎn)端的其他副本中已經(jīng)成功寫入了數(shù)據(jù)分區(qū)卖陵,而自己需要同步這些數(shù)據(jù)遭顶。

replic2上的第二個(gè)副本實(shí)例會(huì)開始選擇一個(gè)遠(yuǎn)端的其他副本作為數(shù)據(jù)的下載來源。遠(yuǎn)端副本的選擇算法大致是這樣的:
(1)從/replicas節(jié)點(diǎn)拿到所有的副本節(jié)點(diǎn)泪蔫。
(2)遍歷這些副本棒旗,選取其中一個(gè)。選取的副本需要擁有最大的log_pointer下標(biāo)撩荣,并且/queue子節(jié)點(diǎn)數(shù)量最少铣揉。log_pointer下標(biāo)最大,意味著該副本執(zhí)行的日志最多婿滓,數(shù)據(jù)應(yīng)該更加完整老速;而/queue最小,則意味著該副本目前的任務(wù)執(zhí)行負(fù)擔(dān)較小凸主。

例如:
Sending request to http://replic1.clickhouse-headless.cluster.local:9009/?endpoint=DataPartsExchange……

NEXT-5 第一個(gè)副本實(shí)例響應(yīng)數(shù)據(jù)下載

replic1的DataPartsExchange端口服務(wù)接收到調(diào)用請(qǐng)求橘券,在得知對(duì)方來意之后,根據(jù)參數(shù)做出響應(yīng)卿吐,將本地分區(qū)202107_0_0_0基于DataPartsExchang的服務(wù)響應(yīng)發(fā)送回replic1

NEXT-6 第二個(gè)副本實(shí)例下載數(shù)據(jù)并完成本地寫入

首先將其寫至臨時(shí)目錄

```
tmp_fetch_202107_1_1_0
```

待全部數(shù)據(jù)接收完成之后旁舰,重命名該目錄

```
Renaming temporary part tmp_fetch_202107_1_1_0 to 202107_1_1_0.
```

二: Clickhouse副本同步過程

2.1 哪些操作會(huì)引發(fā)數(shù)據(jù)同步?

數(shù)據(jù)同步包括了元數(shù)據(jù)和內(nèi)容數(shù)據(jù)的同步嗡官。在ClickHouse中箭窜,數(shù)據(jù)插入(INSERT),元數(shù)據(jù)變更(ALTER)衍腥,內(nèi)容數(shù)據(jù)變更(MUTATION)磺樱,合并(MERGE)等操作都會(huì)引發(fā)數(shù)據(jù)同步纳猫。

其中MERGEMUTATION操作 只能由主副本主導(dǎo), 主副本選舉過程如下:

/leader_election**:用于主副本的選舉工作竹捉,主副本會(huì)主導(dǎo)`MERGE`和`MUTATION`操作(`ALTER DELETE`和`ALTER UPDATE`)芜辕。這些任務(wù)在主副本完成之后再借助ZooKeeper將消息事件分發(fā)至其他副本

主副本選舉的方式是向/leader_election/插入子節(jié)點(diǎn),第一個(gè)插入成功的副本 就是主副本

2.2: INSERT同步

clickhouse執(zhí)行ReplicatedMergeTree表的插入操作時(shí)块差,會(huì)將數(shù)據(jù)以data part為單位寫入到磁盤/內(nèi)存里侵续,每個(gè)data part寫入完成后都會(huì)做一個(gè)commit操作, 這個(gè)commit操作會(huì)生成一個(gè)GET_PART類型的log entry并上傳到zookeeper中供其他副本拉取同步。

由此可見憨闰,INSERT語句引發(fā)的是data part級(jí)別的內(nèi)容數(shù)據(jù)的同步状蜗。

GET_PART類型的log entry 文件內(nèi)容demo:

format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact

2.3 同步日志和副本隊(duì)列(log, queue)

2.3.1 同步日志(log)

log位于zookeeper_path下面,用于保存所有副本間的數(shù)據(jù)同步日志(稱為log entry)鹉动,ClickHouse支持的LogEntry類型有:

img

筆記:

每插入一個(gè)數(shù)據(jù)  都會(huì)生成一個(gè)entity log:
[zk: localhost:2181(CONNECTED) 20] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000, log-0000000001]

原理: 在一個(gè)副本上執(zhí)行insert into ...操作會(huì)上傳一個(gè)GET_PART類型的log entry到log節(jié)點(diǎn)下轧坎。
zookeeper_path/log下的log entry在zookeeper上的znode命名規(guī)范是log-seqNum, 其中seqNum為創(chuàng)建sequential znode生成的自增序號(hào),如log-0000000001训裆。

log entity 下有什么東西:

[zk: localhost:2181(CONNECTED) 22] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000001

format version: 4
create_time: 2022-01-07 22:08:20
source replica: replic1  //數(shù)據(jù)源
block_id: 202201_4569856920166365182_7768759302129368290  // 分區(qū)的block_id
get
202201_1_1_0  //從那個(gè)part下載
part_type: Compact

2.3.2 副本隊(duì)列 (queue)

queue位于replica_path下面眶根,保存了當(dāng)前副本需要同步的log entries. queue里面的log entries是從log中拉取的(由queue updating task 任務(wù)負(fù)責(zé)拉取)边琉,被所有副本都拉取過的log entry就可以從log中刪除掉, 刪除操作由后臺(tái)清理線程執(zhí)行

replica_path/queue下的entry在zookeeper上的znode命名規(guī)范是queue-seqNum, 其中seqNum為創(chuàng)建sequential znode生成的自增序號(hào)记劝,如queue-0000000001变姨。

replica_path下還有一個(gè)log_pointer節(jié)點(diǎn),保存了當(dāng)前副本從log拉取到的最大log entry id + 1厌丑,即下一個(gè)需要拉取的log entry的id.

[zk:localhost:2181(CONNECTED) 11] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer   //副本2 
2
[zk:localhost:2181(CONNECTED) 12] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/log_pointer    // 副本1
2

3 負(fù)責(zé)數(shù)據(jù)同步的tasks

每個(gè)副本節(jié)點(diǎn)都有一系列的負(fù)責(zé)數(shù)據(jù)同步的任務(wù)定欧,這些任務(wù)會(huì)與zookeeper交互,獲取最新的同步任務(wù)怒竿,然后在本地內(nèi)存中維護(hù)任務(wù)隊(duì)列并借助任務(wù)池調(diào)度執(zhí)行砍鸠,執(zhí)行完成后會(huì)更新zookeer上的對(duì)應(yīng)狀態(tài)。下面我們逐一介紹各類同步任務(wù)耕驰。

3.1 queue updating task

queue updating task負(fù)責(zé)當(dāng)前副本上同步日志(log entry)隊(duì)列的更新爷辱。

ReplicatedMergeTree表對(duì)應(yīng)的StoreageReplicatedMergeTree類中有一個(gè)queue_updating_task對(duì)象,它是由后臺(tái)調(diào)度池(BackgroundSchedulePool)調(diào)度執(zhí)行的任務(wù)朦肘,其執(zhí)行的函數(shù)是

StorageReplicatedMergeTree::queueUpdatingTask()

queue updating task做的事情包括:
1. 從zookeeper拉取zookeeper_path/log下的所有l(wèi)og entries饭弓,并從replica_path/log_pointer獲取當(dāng)前副本的log_pointer (即下一個(gè)需要拉取的log entry的id)。

2. 根據(jù)log_pointer在log entries中定位到需要拉取的entries媒抠,如果log_pointer為空弟断,則從log entries中最小的entry開始拉取。

3. 將需要拉取的log entries都同步到replica_path/queue下面趴生,這里的同步就是以選定的log entries的內(nèi)容為節(jié)點(diǎn)內(nèi)容阀趴,一個(gè)一個(gè)地在replica_path/queue下創(chuàng)建新的sequential znode. 注意昏翰,這里創(chuàng)建的znode名稱并不會(huì)和zookeeper_path/log下的一致。

4. 更新replica_path/log_pointer的內(nèi)容為拉取到的log entries中最大的entry id + 1.

5. 將拉取到的log entries插入到內(nèi)存中的同步任務(wù)隊(duì)列里刘急,即插入到ReplicatedMergeTreeQueue::queue矩父。

6. 觸發(fā)queue executing task(下文會(huì)介紹)執(zhí)行。

這些步驟的實(shí)現(xiàn)細(xì)節(jié)請(qǐng)見源碼ReplicatedMergeTreeQueue::pullLogsToQueue.

void StorageReplicatedMergeTree::queueUpdatingTask()
{
    ....
    try
    {
        queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
        .....
    }
    catch (const Coordination::Exception & e)
    {
       ....
    }
    ....
}

3.2 ReplicatedMergeTreeQueue::queue 隊(duì)列任務(wù)

StorageReplicatedMergeTree::queueUpdatingTask() 會(huì)將拉取到的log entries插入到內(nèi)存中的同步任務(wù)隊(duì)列里.

3.3 queue executing task

queue updating task把同步日志(log entries)拉取到本地內(nèi)存后排霉,我們需要一個(gè)任務(wù)去執(zhí)行這些log entries. 這個(gè)任務(wù)就是queue executing task.

queue executing 最終調(diào)用執(zhí)行的函數(shù)是 StorageReplicatedMergeTree::queueTask

下面介紹queueTask函數(shù)的主要執(zhí)行步驟窍株。

step1: 選擇要處理的log entry

對(duì)于GET_PART類型的log entry,如果它生成的data part被某個(gè)正在執(zhí)行的log entry的resulting data parts所包含攻柠,則當(dāng)前entry此輪不被選擇球订。

step2: 處理選中的log entry

選中l(wèi)og entry后,StorageReplicatedMergeTree::queueTask函數(shù)會(huì)調(diào)用ReplicatedMergeTreeQueue::processEntry對(duì)log entry進(jìn)行處理瑰钮。processEntry函數(shù)的第三個(gè)參數(shù)是用于執(zhí)行l(wèi)og entry的函數(shù)冒滩,此處傳入的是StorageReplicatedMergeTree::executeLogEntry

源碼:

BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
    ....
    // 對(duì)正在執(zhí)行的entry用selected變量來表示,方便以后處理
    ReplicatedMergeTreeQueue::SelectedEntry selected;
    try
    {
        // 1: 選擇需要處理的entrys
        selected = queue.selectEntryToProcess(merger_mutator, *this);
    }
    ....
    LogEntryPtr & entry = selected.first;
    // 2: 如果沒有需要處理的直接返回
    if (!entry)
        return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
    // 3: 有需要處理的執(zhí)行以下代碼
    time_t prev_attempt_time = entry->last_attempt_time;

    // 這里是真正的處理方法浪谴,并且傳入了一個(gè)匿名方法
    bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
    {
        try
        {
            // 4: 這里是真正的執(zhí)行器executeLogEntry
            return executeLogEntry(*entry_to_process);
        }
        ....
    });
    ....
}

3.4 StorageReplicatedMergeTree::executeLogEntry

每次queue executing task執(zhí)行只會(huì)選擇一個(gè)log entry進(jìn)行處理开睡。選中l(wèi)og entry后,StorageReplicatedMergeTree::queueTask函數(shù)會(huì)最終會(huì)調(diào)用StorageReplicatedMergeTree::executeLogEntry進(jìn)行處理

每個(gè)log entiry的處理邏輯不一樣苟耻, 本節(jié)對(duì)于GET_PART類型的log entry篇恒,處理流程如下:

step1: 判斷目標(biāo)data part在當(dāng)前副本是否已經(jīng)存在或被已存在的data part所包含,這里不僅會(huì)檢查active data parts凶杖,也會(huì)檢查處于MergeTreeDataPartState::PreCommitted狀態(tài)的data parts胁艰。

step2: 判斷目標(biāo)data part在zookeeper的replica_path/parts節(jié)點(diǎn)下是否已經(jīng)存在。

step3: 如果step1和step2的判斷結(jié)果都是已存在智蝠,則直接跳過當(dāng)前l(fā)og entry腾么,不做處理(此時(shí)executeLogEntry返回true,表示處理成功)杈湾。

step4: 否則解虱,進(jìn)一步判斷目標(biāo)data part是否是某次失敗的write with quorum操作的resulting data part,判斷方法是查看zookeeper上zookeeper_path/quorum/failed_parts節(jié)點(diǎn)下是否存在目標(biāo)data part漆撞。如果存在殴泰,則和2.3一樣,直接跳過當(dāng)前l(fā)og entry叫挟,不做處理(此時(shí)executeLogEntry返回true艰匙,表示處理成功)。

step5: 否則抹恳,從其他有目標(biāo)data part的副本節(jié)點(diǎn)去拉取目標(biāo)data part员凝。拉取是調(diào)用StorageReplicatedMergeTree::executeFetch實(shí)現(xiàn)的。

以上過程對(duì)應(yīng)源碼如下:

bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
....
if (entry.type == LogEntry::GET_PART ||
        entry.type == LogEntry::MERGE_PARTS ||
        entry.type == LogEntry::MUTATE_PART)
    {
        //1:判斷目標(biāo)data part在當(dāng)前副本是否已經(jīng)存在或被已存在的data part所包含奋献,這里不僅會(huì)檢查active data parts健霹,也會(huì)檢查處于MergeTreeDataPartState::PreCommitted狀態(tài)的data parts
        DataPartPtr existing_part = getPartIfExists(entry.new_part_name, {MergeTreeDataPartState::PreCommitted});
        if (!existing_part)
       // 2: getActiveContainingPart 判斷分區(qū)是否已經(jīng) merge 到磁盤
            existing_part = getActiveContainingPart(entry.new_part_name);
        /**
        3: 本地存在的話,判斷目標(biāo)data part在zookeeper的*replica_path/parts*節(jié)點(diǎn)下是否已經(jīng)存在旺上。
        例如:
  [zk: localhost:2181(CONNECTED) 59] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/parts 
[202201_0_0_0, 202201_1_1_0]
        *
        **/
        if (existing_part && getZooKeeper()->exists(replica_path + "/parts/" + existing_part->name))
        {
            ....
            // 4: 如果 part在當(dāng)前副本是否已經(jīng)存在或zk已經(jīng)存在,則直接跳過當(dāng)前l(fā)og entry糖埋,不做處理(此時(shí)executeLogEntry返回true宣吱,表示處理成功
            return true;
        }
        
    bool do_fetch = false;

    switch (entry.type)
    {
        case LogEntry::ATTACH_PART:
            /// We surely don't have this part locally as we've checked it before, so download it.
            [[fallthrough]];
        case LogEntry::GET_PART:
            do_fetch = true;
            break;
        .....
        default:
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected log entry type: {}", static_cast<int>(entry.type));
    }
    // 5: 對(duì)于get類型part, 則直接調(diào)用 fetch  
    if (do_fetch)
        // 實(shí)際調(diào)用的是executeFetch 
        return executeFetch (entry);
    return true;

3.4 StorageReplicatedMergeTree::executeFetch

executeFetch方法是處理拉取part的拉取任務(wù)

源碼如下:

bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
    /// 1: 查找是否有需要覆蓋的part
    String replica = findReplicaHavingCoveringPart(entry, true);
    const auto storage_settings_ptr = getSettings();

    // 2: 設(shè)置一些并行參數(shù),判斷replicated_max_parallel_fetches和
    // replicated_max_parallel_fetches_for_table是否符合要求
    static std::atomic_uint total_fetches {0};
    // 判斷是否超過拉取次數(shù)限制(storage級(jí)別)
    if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
    {
        throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
            ErrorCodes::TOO_MANY_FETCHES);
    }

    ++total_fetches;
    SCOPE_EXIT({--total_fetches;});
    // 判斷是否超過拉取次數(shù)限制(table級(jí)別)
    if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
    {
        throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
            ErrorCodes::TOO_MANY_FETCHES);
    }

    ++current_table_fetches;
    SCOPE_EXIT({--current_table_fetches;});
   .....
        try
        {
            String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
            // 3: 拉取part
            if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
                return false;
        }
        .....
    }
    ...
    return true;
}

Next1-StorageReplicatedMergeTree::fetchPart 方法

executeFetch 最終拉取part 調(diào)用的是fetchPart 方法, fetchPart 方法實(shí)現(xiàn)如下:

bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
{
    auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
    const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
    ...
    LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
    /***
    2022.01.05 16:29:06.940980 [ 4423 ] {} <Debug> hdp_teu_dpd_clickhousedb.hybrid_test_tongbu (4058c757-c516-4345-8058-c757c5166345): Fetching part 20220105_1_1_0 from /clickhouse/common_test_cluster/hdp_teu_dpd_clickhousedb/hybrid_test_tongbu/shard2/replicas/replic2
    **/
    ...
    std::function<MutableDataPartPtr()> get_part;
    if (part_to_clone)
    {
        .....
    }
    else
    {
        // 1: 獲取需要clone數(shù)據(jù)的副本地址
        ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
        auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
        auto user_password = global_context.getInterserverCredentials();
        String interserver_scheme = global_context.getInterserverScheme();

        get_part = [&, address, timeouts, user_password, interserver_scheme]()
        {
            ...
            // 2: 這里的fetchPart主要就是構(gòu)造HTTP參數(shù)及連接真正拉取數(shù)據(jù)
            /**
            參數(shù) demo:  
            source_replica_path: zookeeper_path + "/replicas/" + replica
            address.host:hostname1
            port: 9009 
            interserver_scheme: 
            to_detached: false
            part_name : 202107_1_1_0
            **/
            // 3: 最終調(diào)用的是 fetcher.fetchPart 方法
            return fetcher.fetchPart(
                part_name, source_replica_path,
                address.host, address.replication_port,
                timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
        };
    }
    ...
    return true;
}

Next2-fetcher.fetchPart 代碼如下(DataPartsExchange.cpp):

MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
    const String & part_name,
    const String & replica_path,
    const String & host,
    int port,
    const ConnectionTimeouts & timeouts,
    const String & user,
    const String & password,
    const String & interserver_scheme,
    ThrottlerPtr throttler,
    bool to_detached,
    const String & tmp_prefix_,
    std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
    bool try_zero_copy,
    DiskPtr disk)
{

    auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
    const auto data_settings = data.getSettings();
    // 1: 設(shè)置 uri
    Poco::URI uri;
    uri.setScheme(interserver_scheme);
    uri.setHost(host);
    uri.setPort(port);
    uri.setQueryParameters(
    {
        {"endpoint",                getEndpointId(replica_path)},
        {"part",                    part_name},
        {"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
        {"compress",                "false"}
    });

    Strings capability;
    if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
    {
       /*******
       2: 判斷是如果是zero_copy 添加:
       capability.push_back(DiskType::toString(disk->getType()));
       細(xì)節(jié)略
       ***/
        
    }
    if (!capability.empty())
    {
        const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
        uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
    }
    else
    {
        try_zero_copy = false;
    }

    Poco::Net::HTTPBasicCredentials creds{};
    if (!user.empty())
    {
        creds.setUsername(user);
        creds.setPassword(password);
    }

    PooledReadWriteBufferFromHTTP in{
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        {},
        timeouts,
        creds,
        DBMS_DEFAULT_BUFFER_SIZE,
        0, /* no redirects */
        data_settings->replicated_max_parallel_fetches_for_host
    };
   
    int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));

    ReservationPtr reservation;
    size_t sum_files_size = 0;
    // 3: 獲取服務(wù)端part目下文件的累計(jì)大小
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
    {
        readBinary(sum_files_size, in);
        ....
    }
    .....

    bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
                    && sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);

    String part_type = "Wide";
    // 4: 讀取server端part的part_type
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
        readStringBinary(part_type, in);
    // 5: 讀取server端part的part_uuid
    UUID part_uuid = UUIDHelpers::Nil;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
        readUUIDText(part_uuid, in);

    String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
    if (!remote_fs_metadata.empty())
    {
       // 6: 如果是遠(yuǎn)程服務(wù)端走元數(shù)據(jù)同步邏輯瞳别,如s3或者 hdfs (相當(dāng)于握手協(xié)議征候,雙方都是zero-copy存儲(chǔ)類型才可以走元數(shù)據(jù)同步邏輯)
        try
        {   
            return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
        }
        ....
     }
    auto storage_id = data.getStorageID();
    // 7: 構(gòu)建副本的part存放路徑
     String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
    
    auto entry = data.getContext()->getReplicatedFetchList().insert(
        storage_id.getDatabaseName(), storage_id.getTableName(),
        part_info.partition_id, part_name, new_part_path,
        replica_path, uri, to_detached, sum_files_size);

    in.setNextCallback(ReplicatedFetchReadCallback(*entry));

    size_t projections = 0;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
        readBinary(projections, in);

    MergeTreeData::DataPart::Checksums checksums;
    8: 構(gòu)建副本的part_type 觸發(fā)不同的下載邏輯,一般是downloadPartToDisk
    return part_type == "InMemory"
        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
}

Next3-downloadPartToDisk:(寫入本地磁盤的方法)

MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    bool sync,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    size_t projections,
    MergeTreeData::DataPart::Checksums & checksums,
    ThrottlerPtr throttler)
{
   // 1: 定義臨時(shí)目錄前綴
    static const String TMP_PREFIX = "tmp-fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
    .........
    String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    // 2: 構(gòu)建下載后的存儲(chǔ)路徑 注意是寫到臨時(shí)目錄
    String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
    // 3: 如果本地磁盤下存在該路徑祟敛,將以前的臨時(shí)目錄刪除
    if (disk->exists(part_download_path))
    {
        LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
            fullPath(disk, part_download_path));
        disk->removeRecursive(part_download_path);
    }
    // 4: 創(chuàng)建臨時(shí)目錄: 例如:/data00/clickhouse/store/85f/85f61e82-f5cc-429b-85f6-1e82f5cc229b/temp_20220103_0_5_1
    disk->createDirectories(part_download_path);
    .....
    // 5: 開始下載 Download the base part
    downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
    ....
}

// Next4-downloadBaseOrProjectionPartToDisk

void Fetcher::downloadBaseOrProjectionPartToDisk(
    const String & replica_path,
    const String & part_download_path,
    bool sync,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    MergeTreeData::DataPart::Checksums & checksums,
    ThrottlerPtr throttler) const
{
    size_t files;
    readBinary(files, in);
    //1: 遍歷服務(wù)端part目錄下的文件
    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size;

        readStringBinary(file_name, in);
        readBinary(file_size, in);
        .....
        //2: 創(chuàng)建out 目標(biāo)
        auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
        HashingWriteBuffer hashing_out(*file_out);
        //3: copy 寫入
        copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
        .....
        // 4: 將同步過來的文件 信息添加到checksums.txt
        if (file_name != "checksums.txt" &&
            file_name != "columns.txt" &&
            file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
            checksums.addFile(file_name, file_size, expected_hash);
        if (sync)
            hashing_out.sync();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
禁止轉(zhuǎn)載疤坝,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者。
  • 序言:七十年代末馆铁,一起剝皮案震驚了整個(gè)濱河市跑揉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌埠巨,老刑警劉巖历谍,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異辣垒,居然都是意外死亡望侈,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門乍构,熙熙樓的掌柜王于貴愁眉苦臉地迎上來甜无,“玉大人,你說我怎么就攤上這事哥遮。” “怎么了陵究?”我有些...
    開封第一講書人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵眠饮,是天一觀的道長。 經(jīng)常有香客問我铜邮,道長仪召,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任松蒜,我火速辦了婚禮扔茅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘秸苗。我一直安慰自己召娜,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開白布惊楼。 她就那樣靜靜地躺著玖瘸,像睡著了一般秸讹。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上雅倒,一...
    開封第一講書人閱讀 49,950評(píng)論 1 291
  • 那天璃诀,我揣著相機(jī)與錄音,去河邊找鬼蔑匣。 笑死劣欢,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的裁良。 我是一名探鬼主播凿将,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼趴久!你這毒婦竟也來了丸相?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤彼棍,失蹤者是張志新(化名)和其女友劉穎灭忠,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體座硕,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡弛作,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了华匾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片映琳。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖蜘拉,靈堂內(nèi)的尸體忽然破棺而出萨西,到底是詐尸還是另有隱情,我是刑警寧澤旭旭,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布谎脯,位于F島的核電站,受9級(jí)特大地震影響持寄,放射性物質(zhì)發(fā)生泄漏源梭。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一稍味、第九天 我趴在偏房一處隱蔽的房頂上張望废麻。 院中可真熱鬧,春花似錦模庐、人聲如沸烛愧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽屑彻。三九已至验庙,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間社牲,已是汗流浹背粪薛。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留搏恤,地道東北人违寿。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長得像熟空,于是被迫代替她去往敵國和親藤巢。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容