1: Overview of ClickHouse data insertion
The ClickHouse insert flow
When an INSERT is executed against a ReplicatedMergeTree table to write data, the core INSERT pipeline is entered; the whole flow below is laid out in chronological order.
NEXT-1: Create the replicas
Replica 1:
create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic1')
partition by toYYYYMM(ctime)
order by id;
Replica 2:
create table test_replicated(
id Int8,
ctime DateTime,
name String
)ENGINE = ReplicatedMergeTree('/clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated','replic2')
partition by toYYYYMM(ctime)
order by id;
During creation, ReplicatedMergeTree performs some initialization:
- Initialize all ZooKeeper nodes under zk_path.
The table's zk_path:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated
[alter_partition_version, block_numbers, blocks, columns, leader_election, log, metadata, mutations, nonincrement_block_numbers, part_moves_shard, pinned_part_uuids, quorum, replicas, temp, zero_copy_hdfs, zero_copy_s3]
- Register this replica instance (replic1 or replic2) under the /replicas node.
replic1's zk path:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1
[columns, flags, host, is_active, is_lost, log_pointer, max_processed_insert_time, metadata, metadata_version, min_unprocessed_insert_time, mutation_pointer, parts, queue]
- Start a watch task that listens on the /log node.
[zk: 10.178.19.198:2181(CONNECTED) 24] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000]
NEXT-2: Write data to the first replica instance
1) Terminology
Block: an INSERT splits its data into one or more blocks according to max_insert_block_size (default 1048576 rows). The block is therefore the basic unit of writing, and each block is written atomically and has a unique identity.
Each compressed data block is kept, measured by its uncompressed byte size, strictly between 64 KB and 1 MB; the lower and upper bounds are set by min_compress_block_size (default 65536) and max_compress_block_size (default 1048576). The final size of each compressed block depends on how much data an index_granularity interval actually holds; the three rules below are summarized in the code sketch that follows them.
- Interval size < 64 KB: if the data in one index-granularity interval is smaller than 64 KB, keep accumulating data from the following intervals until the size reaches 64 KB, then emit the next compressed block.
- 64 KB <= interval size <= 1 MB: if the data in one interval is between 64 KB and 1 MB, emit it directly as the next compressed block.
- Interval size > 1 MB: if the data in one interval exceeds 1 MB, first truncate at 1 MB and emit a compressed block, then apply these three rules to the remainder. In this case one batch of data produces multiple compressed blocks.
[Figure: ClickHouse compressed data blocks]
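The three rules amount to a small piece of bounding logic. The sketch below is a minimal, self-contained illustration under the default thresholds, not the actual ClickHouse implementation; the granule_sizes input is a hypothetical stand-in for the uncompressed byte size of each index_granularity interval.
```
#include <cstddef>
#include <iostream>
#include <vector>

// Sketch of the compressed-block sizing rules described above.
// Assumption: granule_sizes holds the uncompressed byte size of each
// index_granularity interval; real ClickHouse works on column streams.
std::vector<size_t> splitIntoCompressedBlocks(
    const std::vector<size_t> & granule_sizes,
    size_t min_block = 65536,     // min_compress_block_size
    size_t max_block = 1048576)   // max_compress_block_size
{
    std::vector<size_t> blocks;
    size_t pending = 0;  // bytes accumulated but not yet flushed
    for (size_t size : granule_sizes)
    {
        pending += size;
        while (pending > max_block)   // rule 3: cut at 1 MB per block
        {
            blocks.push_back(max_block);
            pending -= max_block;
        }
        if (pending >= min_block)     // rules 1 and 2: flush once >= 64 KB
        {
            blocks.push_back(pending);
            pending = 0;
        }
        // else rule 1: keep accumulating into the next granule
    }
    if (pending > 0)
        blocks.push_back(pending);    // flush the tail when the part ends
    return blocks;
}

int main()
{
    // A 3 MB granule yields three 1 MB blocks; the two 40 KB granules
    // merge into a single 80 KB block.
    for (size_t b : splitIntoCompressedBlocks({3 * 1048576, 40960, 40960}))
        std::cout << b << '\n';
}
```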
2) Inserting data
Write the following rows on replic1:
insert into test_replicated values(1,now(),'zs'),(2,now(),'ls');
After the insert, ZooKeeper contains the following data:
1: /blocks
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks
[202201_5253379567567382390_63093905895384417]
get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/blocks/202201_5253379567567382390_63093905895384417
202201_0_0_0
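The child name under /blocks is the block_id. ClickHouse derives it from the partition ID plus a 128-bit hash of the block's data (SipHash128 in the real code, printed as two 64-bit numbers), so re-inserting an identical block reproduces the same block_id, finds the existing /blocks child, and the write is deduplicated. The sketch below only illustrates the idea; makeBlockId is hypothetical and std::hash merely stands in for the real hash.
```
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>

// Illustrative stand-in for block_id derivation; not the real implementation.
std::string makeBlockId(const std::string & partition_id, const std::string & block_data)
{
    uint64_t h1 = std::hash<std::string>{}(block_data);
    uint64_t h2 = std::hash<std::string>{}(partition_id + block_data);
    return partition_id + "_" + std::to_string(h1) + "_" + std::to_string(h2);
}

int main()
{
    // The same block inserted twice produces the same block_id, so the
    // second insert finds /blocks/<block_id> already present and is
    // deduplicated instead of being written again.
    std::cout << makeBlockId("202201", "1,zs;2,ls") << '\n';
    std::cout << makeBlockId("202201", "1,zs;2,ls") << '\n';  // identical
}
```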
2: log
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000]
get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000000
format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact
NEXT-3: The second replica instance pulls the log
The replic2 replica keeps watching the /log node for changes. After replic1 pushes log/log-0000000000, replic2 triggers its log-pulling task and updates its log_pointer:
get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer
1
After pulling the LogEntry, replic2 does not execute it directly; instead it converts it into a task object and places it into its queue:
ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/queue
queue-0000000001
NEXT-4: The second replica instance requests a download from another replica
replic2 starts executing tasks from its /queue. When it sees an entry of type get, ReplicatedMergeTree knows that a data part has already been written successfully on some remote replica and that it needs to synchronize that data.
replic2 then selects one remote replica as the download source. The selection algorithm is roughly as follows (a sketch follows the example request below):
(1) Get all replica nodes from the /replicas node.
(2) Iterate over those replicas and pick one. The chosen replica should have the largest log_pointer and the fewest children under /queue. The largest log_pointer means that replica has executed the most log entries, so its data should be the most complete; the smallest /queue means it currently carries the lightest task load.
For example:
Sending request to http://replic1.clickhouse-headless.cluster.local:9009/?endpoint=DataPartsExchange……
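The selection rule itself fits in a few lines. Below is a minimal sketch assuming the log_pointer values and /queue child counts have already been read from ZooKeeper for each candidate; ReplicaInfo and selectSourceReplica are hypothetical names, not ClickHouse APIs.
```
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical view of one replica's state, as read from ZooKeeper:
// .../replicas/<name>/log_pointer and the child count of .../<name>/queue.
struct ReplicaInfo
{
    std::string name;
    uint64_t log_pointer;   // larger => more log entries already applied
    size_t queue_size;      // smaller => lighter current workload
};

// Pick the replica with the largest log_pointer, breaking ties by the
// smallest queue, per the selection rule described above.
std::string selectSourceReplica(const std::vector<ReplicaInfo> & replicas)
{
    auto best = std::max_element(replicas.begin(), replicas.end(),
        [](const ReplicaInfo & a, const ReplicaInfo & b)
        {
            if (a.log_pointer != b.log_pointer)
                return a.log_pointer < b.log_pointer;
            return a.queue_size > b.queue_size;  // fewer queued tasks wins
        });
    return best->name;  // assumes at least one candidate replica
}

int main()
{
    std::cout << selectSourceReplica({{"replic1", 2, 0}, {"replic3", 2, 5}})
              << '\n';  // replic1: same log_pointer, smaller queue
}
```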
NEXT-5: The first replica instance serves the download
replic1's DataPartsExchange service receives the request, and once it understands what is being asked for, responds according to the parameters by sending the local part 202107_0_0_0 back to replic2 through the DataPartsExchange service.
NEXT-6: The second replica instance downloads the data and completes the local write
The data is first written to a temporary directory:
```
tmp_fetch_202107_1_1_0
```
Once all data has been received, the directory is renamed:
```
Renaming temporary part tmp_fetch_202107_1_1_0 to 202107_1_1_0.
```
2: The ClickHouse replica synchronization process
2.1 Which operations trigger data synchronization?
Data synchronization covers both metadata and table data. In ClickHouse, data inserts (INSERT), metadata changes (ALTER), data mutations (MUTATION), and merges (MERGE) all trigger synchronization.
Among these, MERGE and MUTATION can only be driven by the leader replica. Leader election works as follows:
/leader_election: used for electing the leader replica. The leader drives MERGE and MUTATION operations (ALTER DELETE and ALTER UPDATE); after these tasks complete on the leader, the resulting events are distributed to the other replicas through ZooKeeper.
The election is performed by creating child znodes under /leader_election/; the first replica whose creation succeeds becomes the leader.
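A toy simulation of that rule, assuming only ZooKeeper's documented behavior of assigning increasing sequence numbers to sequential znodes; the real code creates ephemeral sequential znodes through its ZooKeeper client, so everything below is illustrative.
```
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy simulation of leader election: each replica creates a sequential
// znode under /leader_election, ZooKeeper serializes the creates and
// assigns increasing sequence numbers, and the lowest number wins.
int main()
{
    std::vector<std::string> replicas = {"replic1", "replic2"};
    std::map<int, std::string> election;  // sequence number -> creator

    int next_seq = 0;
    for (const auto & replica : replicas)
        election[next_seq++] = replica;   // "create -s /leader_election/..."

    // The replica that holds the smallest sequence number is the leader.
    std::cout << "leader: " << election.begin()->second << '\n';  // replic1
}
```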
2.2 INSERT synchronization
When ClickHouse executes an insert into a ReplicatedMergeTree table, it writes the data to disk or memory in units of data parts. After each data part is written, a commit is performed; the commit generates a GET_PART log entry and uploads it to ZooKeeper for the other replicas to pull and replay.
An INSERT therefore triggers table-data synchronization at the data-part level.
Example contents of a GET_PART log entry:
format version: 4
create_time: 2022-01-07 21:37:16
source replica: replic1
block_id: 202201_5253379567567382390_63093905895384417
get
202201_0_0_0
part_type: Compact
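Since the entry is plain text, it is easy to pick apart; the snippet below parses the exact entry shown above into its fields. This is a hypothetical helper for illustration, not ClickHouse's own parser (which lives in ReplicatedMergeTreeLogEntry).
```
#include <iostream>
#include <map>
#include <sstream>
#include <string>

int main()
{
    const std::string entry_text =
        "format version: 4\n"
        "create_time: 2022-01-07 21:37:16\n"
        "source replica: replic1\n"
        "block_id: 202201_5253379567567382390_63093905895384417\n"
        "get\n"
        "202201_0_0_0\n"
        "part_type: Compact\n";

    std::istringstream in(entry_text);
    std::map<std::string, std::string> fields;
    std::string line, type, part_name;
    while (std::getline(in, line))
    {
        auto colon = line.find(": ");
        if (colon != std::string::npos)
            fields[line.substr(0, colon)] = line.substr(colon + 2);
        else if (type.empty())
            type = line;          // the bare "get" line is the entry type
        else
            part_name = line;     // followed by the part to download
    }
    std::cout << "type=" << type << " part=" << part_name
              << " source=" << fields["source replica"] << '\n';
}
```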
2.3 The sync log and replica queue (log, queue)
2.3.1 The sync log (log)
The log node lives under zookeeper_path and stores the data synchronization log shared by all replicas; each record is called a log entry. ClickHouse supports several LogEntry types (for example GET_PART, MERGE_PARTS, MUTATE_PART, and ATTACH_PART, which appear in executeLogEntry below).
Note:
Every insert generates one log entry:
[zk: localhost:2181(CONNECTED) 20] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log
[log-0000000000, log-0000000001]
Principle: executing insert into ... on one replica uploads a GET_PART log entry under the log node.
Under zookeeper_path/log, log entries are named log-seqNum on ZooKeeper, where seqNum is the auto-increment sequence number assigned when the sequential znode is created, e.g. log-0000000001.
What a log entry contains:
[zk: localhost:2181(CONNECTED) 22] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/log/log-0000000001
format version: 4
create_time: 2022-01-07 22:08:20
source replica: replic1 // the data source
block_id: 202201_4569856920166365182_7768759302129368290 // the block_id of the inserted block
get
202201_1_1_0 // the part to download
part_type: Compact
2.3.2 The replica queue (queue)
The queue node lives under replica_path and holds the log entries this replica still needs to apply. Entries in queue are pulled from log (the queue updating task is responsible for the pulling). Once a log entry has been pulled by every replica it can be deleted from log; the deletion is performed by a background cleanup thread.
Under replica_path/queue, entries are named queue-seqNum on ZooKeeper, where seqNum is the auto-increment sequence number assigned when the sequential znode is created, e.g. queue-0000000001.
replica_path also contains a log_pointer node that stores the maximum log entry id this replica has pulled plus one, i.e. the id of the next log entry to pull.
[zk:localhost:2181(CONNECTED) 11] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/log_pointer // replica 1
2
[zk:localhost:2181(CONNECTED) 12] get /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic2/log_pointer // replica 2
2
3 Tasks responsible for data synchronization
Every replica runs a set of tasks responsible for data synchronization. These tasks talk to ZooKeeper to obtain the latest sync work, maintain task queues in local memory, schedule execution through task pools, and update the corresponding state on ZooKeeper once execution finishes. The sections below walk through each of these tasks.
3.1 queue updating task
The queue updating task keeps this replica's queue of sync log entries up to date.
The StorageReplicatedMergeTree class backing a ReplicatedMergeTree table owns a queue_updating_task object, a task scheduled by the background schedule pool (BackgroundSchedulePool); the function it executes is
StorageReplicatedMergeTree::queueUpdatingTask()
The queue updating task does the following:
1. Fetch all log entries under zookeeper_path/log from ZooKeeper, and read this replica's log_pointer from replica_path/log_pointer (the id of the next log entry to pull).
2. Use log_pointer to locate the entries that need to be pulled; if log_pointer is empty, start from the smallest log entry.
3. Copy the selected log entries to replica_path/queue: for each entry, create a new sequential znode under replica_path/queue whose content is the entry's content. Note that the znode names created here do not match the names under zookeeper_path/log.
4. Update replica_path/log_pointer to the maximum pulled entry id + 1.
5. Insert the pulled log entries into the in-memory sync task queue, i.e. ReplicatedMergeTreeQueue::queue.
6. Trigger the queue executing task (described below).
For the implementation details of these steps, see ReplicatedMergeTreeQueue::pullLogsToQueue:
void StorageReplicatedMergeTree::queueUpdatingTask()
{
....
try
{
queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
.....
}
catch (const Coordination::Exception & e)
{
....
}
....
}
3.2 The ReplicatedMergeTreeQueue::queue task queue
StorageReplicatedMergeTree::queueUpdatingTask() inserts the pulled log entries into this in-memory sync task queue.
3.3 queue executing task
After the queue updating task has pulled the sync log entries into local memory, a task is needed to execute them: the queue executing task.
The function it ultimately runs is StorageReplicatedMergeTree::queueTask.
The main steps of queueTask are described below.
step1: Select the log entry to process.
For a GET_PART log entry, if the data part it produces is covered by the resulting data parts of a log entry that is currently executing, the entry is not selected in this round (see the containment sketch below).
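"Covered" refers to part-name containment: a part named 202201_0_5_1 spans block numbers 0 through 5, so it contains 202201_1_1_0. Below is a minimal sketch of that check, assuming the partition_minBlock_maxBlock_level naming seen throughout this article; PartInfo is hypothetical, while the real logic lives in MergeTreePartInfo.
```
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical parser for part names of the form
// <partition>_<min_block>_<max_block>_<level>, e.g. 202201_0_5_1.
struct PartInfo
{
    std::string partition_id;
    int64_t min_block, max_block;
    uint32_t level;

    static PartInfo fromName(const std::string & name)
    {
        PartInfo p;
        std::istringstream ss(name);
        std::string field;
        std::getline(ss, p.partition_id, '_');
        std::getline(ss, field, '_'); p.min_block = std::stoll(field);
        std::getline(ss, field, '_'); p.max_block = std::stoll(field);
        std::getline(ss, field, '_'); p.level = std::stoul(field);
        return p;
    }

    // A part covers another if it is in the same partition and its
    // block-number range encloses the other part's range.
    bool contains(const PartInfo & rhs) const
    {
        return partition_id == rhs.partition_id
            && min_block <= rhs.min_block
            && max_block >= rhs.max_block
            && level >= rhs.level;
    }
};

int main()
{
    assert(PartInfo::fromName("202201_0_5_1").contains(PartInfo::fromName("202201_1_1_0")));
    assert(!PartInfo::fromName("202201_0_5_1").contains(PartInfo::fromName("202202_1_1_0")));
}
```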
step2: Process the selected log entry.
Once an entry is selected, StorageReplicatedMergeTree::queueTask calls ReplicatedMergeTreeQueue::processEntry to handle it. The third argument of processEntry is the function used to execute the log entry; here StorageReplicatedMergeTree::executeLogEntry is passed in.
Source:
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
....
// The entry being processed is held in `selected` for later use
ReplicatedMergeTreeQueue::SelectedEntry selected;
try
{
// 1: Select the entry to process
selected = queue.selectEntryToProcess(merger_mutator, *this);
}
....
LogEntryPtr & entry = selected.first;
// 2: If there is nothing to process, return immediately
if (!entry)
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
// 3: Otherwise, process the selected entry
time_t prev_attempt_time = entry->last_attempt_time;
// The actual processing call; a lambda is passed in as the executor
bool res = queue.processEntry([this]{ return getZooKeeper(); }, entry, [&](LogEntryPtr & entry_to_process)
{
try
{
// 4: The real executor: executeLogEntry
return executeLogEntry(*entry_to_process);
}
....
});
....
}
3.4 StorageReplicatedMergeTree::executeLogEntry
Each run of the queue executing task selects only one log entry to process. Once an entry is selected, StorageReplicatedMergeTree::queueTask ultimately calls StorageReplicatedMergeTree::executeLogEntry to handle it.
Each log entry type has its own processing logic. For a GET_PART log entry, the flow is:
step1: Check whether the target data part already exists on the current replica or is covered by an existing data part; this checks not only the active data parts but also parts in the MergeTreeDataPartState::PreCommitted state.
step2: Check whether the target data part already exists under the replica_path/parts node in ZooKeeper.
step3: If both step1 and step2 find the part, skip the current log entry without doing anything (executeLogEntry returns true, meaning success).
step4: Otherwise, check whether the target data part is the resulting data part of some failed write-with-quorum operation, by looking for it under zookeeper_path/quorum/failed_parts in ZooKeeper. If it is there, skip the current log entry as in step3 (executeLogEntry returns true, meaning success).
step5: Otherwise, fetch the target data part from another replica that has it, by calling StorageReplicatedMergeTree::executeFetch.
The corresponding source:
bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
....
if (entry.type == LogEntry::GET_PART ||
entry.type == LogEntry::MERGE_PARTS ||
entry.type == LogEntry::MUTATE_PART)
{
//1: Check whether the target data part already exists on this replica or is covered by an existing part; this checks not only active data parts but also parts in the MergeTreeDataPartState::PreCommitted state
DataPartPtr existing_part = getPartIfExists(entry.new_part_name, {MergeTreeDataPartState::PreCommitted});
if (!existing_part)
// 2: getActiveContainingPart checks whether an active part on disk already covers it
existing_part = getActiveContainingPart(entry.new_part_name);
/**
3: If the part exists locally, check whether the target data part also exists under the replica_path/parts node in ZooKeeper.
For example:
[zk: localhost:2181(CONNECTED) 59] ls /clickhouse/ssc_common_test_cluster/test_clickhousedb/test_replicated/replicas/replic1/parts
[202201_0_0_0, 202201_1_1_0]
*
**/
if (existing_part && getZooKeeper()->exists(replica_path + "/parts/" + existing_part->name))
{
....
// 4: If the part already exists both on this replica and in ZooKeeper, skip the current log entry (executeLogEntry returns true, meaning success)
return true;
}
}
bool do_fetch = false;
switch (entry.type)
{
case LogEntry::ATTACH_PART:
/// We surely don't have this part locally as we've checked it before, so download it.
[[fallthrough]];
case LogEntry::GET_PART:
do_fetch = true;
break;
.....
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected log entry type: {}", static_cast<int>(entry.type));
}
// 5: For a GET-type entry, go fetch the part
if (do_fetch)
// The actual call is executeFetch
return executeFetch(entry);
return true;
}
3.5 StorageReplicatedMergeTree::executeFetch
The executeFetch method handles the task of fetching a part.
Source:
bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
{
/// 1: Find a replica that holds the target part or a part covering it
String replica = findReplicaHavingCoveringPart(entry, true);
const auto storage_settings_ptr = getSettings();
// 2: Set up some parallelism checks: verify that replicated_max_parallel_fetches and
// replicated_max_parallel_fetches_for_table are within limits
static std::atomic_uint total_fetches {0};
// Check whether the concurrent-fetch limit is exceeded (storage level)
if (storage_settings_ptr->replicated_max_parallel_fetches && total_fetches >= storage_settings_ptr->replicated_max_parallel_fetches)
{
throw Exception("Too many total fetches from replicas, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
++total_fetches;
SCOPE_EXIT({--total_fetches;});
// Check whether the concurrent-fetch limit is exceeded (table level)
if (storage_settings_ptr->replicated_max_parallel_fetches_for_table && current_table_fetches >= storage_settings_ptr->replicated_max_parallel_fetches_for_table)
{
throw Exception("Too many fetches from replicas for table, maximum: " + storage_settings_ptr->replicated_max_parallel_fetches_for_table.toString(),
ErrorCodes::TOO_MANY_FETCHES);
}
++current_table_fetches;
SCOPE_EXIT({--current_table_fetches;});
.....
try
{
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
// 3: Fetch the part
if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
return false;
}
.....
}
...
return true;
}
Next1: the StorageReplicatedMergeTree::fetchPart method
executeFetch ultimately pulls the part by calling the fetchPart method, implemented as follows:
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & source_replica_path, bool to_detached, size_t quorum)
{
auto zookeeper = zookeeper_ ? zookeeper_ : getZooKeeper();
const auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
...
LOG_DEBUG(log, "Fetching part {} from {}", part_name, source_replica_path);
/***
2022.01.05 16:29:06.940980 [ 4423 ] {} <Debug> hdp_teu_dpd_clickhousedb.hybrid_test_tongbu (4058c757-c516-4345-8058-c757c5166345): Fetching part 20220105_1_1_0 from /clickhouse/common_test_cluster/hdp_teu_dpd_clickhousedb/hybrid_test_tongbu/shard2/replicas/replic2
**/
...
std::function<MutableDataPartPtr()> get_part;
if (part_to_clone)
{
.....
}
else
{
// 1: Get the address of the source replica to clone the data from
ReplicatedMergeTreeAddress address(getZooKeeper()->get(source_replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(global_context);
auto user_password = global_context.getInterserverCredentials();
String interserver_scheme = global_context.getInterserverScheme();
get_part = [&, address, timeouts, user_password, interserver_scheme]()
{
...
// 2: This fetchPart mainly builds the HTTP parameters and the connection that actually pulls the data
/**
Example arguments:
source_replica_path: zookeeper_path + "/replicas/" + replica
address.host: hostname1
port: 9009
interserver_scheme:
to_detached: false
part_name: 202107_1_1_0
**/
// 3: The final call is fetcher.fetchPart
return fetcher.fetchPart(
part_name, source_replica_path,
address.host, address.replication_port,
timeouts, user_password.first, user_password.second, interserver_scheme, to_detached);
};
}
...
return true;
}
Next2: fetcher.fetchPart (DataPartsExchange.cpp):
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
const String & part_name,
const String & replica_path,
const String & host,
int port,
const ConnectionTimeouts & timeouts,
const String & user,
const String & password,
const String & interserver_scheme,
ThrottlerPtr throttler,
bool to_detached,
const String & tmp_prefix_,
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
bool try_zero_copy,
DiskPtr disk)
{
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
const auto data_settings = data.getSettings();
// 1: Build the URI
Poco::URI uri;
uri.setScheme(interserver_scheme);
uri.setHost(host);
uri.setPort(port);
uri.setQueryParameters(
{
{"endpoint", getEndpointId(replica_path)},
{"part", part_name},
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
{"compress", "false"}
});
Strings capability;
if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
{
/*
2: If zero-copy replication is allowed, advertise the disk type:
capability.push_back(DiskType::toString(disk->getType()));
details omitted
*/
}
if (!capability.empty())
{
const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
}
else
{
try_zero_copy = false;
}
Poco::Net::HTTPBasicCredentials creds{};
if (!user.empty())
{
creds.setUsername(user);
creds.setPassword(password);
}
PooledReadWriteBufferFromHTTP in{
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
0, /* no redirects */
data_settings->replicated_max_parallel_fetches_for_host
};
int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
ReservationPtr reservation;
size_t sum_files_size = 0;
// 3: Read the total size of the files under the part directory on the server
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
readBinary(sum_files_size, in);
....
}
.....
bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
&& sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);
String part_type = "Wide";
// 4: Read the part_type of the server-side part
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
readStringBinary(part_type, in);
// 5: Read the part_uuid of the server-side part
UUID part_uuid = UUIDHelpers::Nil;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
readUUIDText(part_uuid, in);
String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
if (!remote_fs_metadata.empty())
{
// 6: If the server side uses remote storage such as S3 or HDFS, take the metadata-sync path (this acts like a handshake: metadata sync applies only when both sides use a zero-copy storage type)
try
{
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
}
....
}
auto storage_id = data.getStorageID();
// 7: Build the path where this replica will store the part
String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
auto entry = data.getContext()->getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, new_part_path,
replica_path, uri, to_detached, sum_files_size);
in.setNextCallback(ReplicatedFetchReadCallback(*entry));
size_t projections = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
readBinary(projections, in);
MergeTreeData::DataPart::Checksums checksums;
// 8: Depending on the part_type, trigger the corresponding download logic, usually downloadPartToDisk
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
}
Next3: downloadPartToDisk (the method that writes the part to local disk):
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const String & part_name,
const String & replica_path,
bool to_detached,
const String & tmp_prefix_,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler)
{
// 1: Define the temporary-directory prefix
static const String TMP_PREFIX = "tmp-fetch_";
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
.........
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
// 2: Build the storage path for the download; note that it points into a temporary directory
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
// 3: If the path already exists on the local disk, remove the stale temporary directory
if (disk->exists(part_download_path))
{
LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
fullPath(disk, part_download_path));
disk->removeRecursive(part_download_path);
}
// 4: Create the temporary directory, e.g. /data00/clickhouse/store/85f/85f61e82-f5cc-429b-85f6-1e82f5cc229b/temp_20220103_0_5_1
disk->createDirectories(part_download_path);
.....
// 5: Start the download (the base part)
downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);
....
}
Next4: downloadBaseOrProjectionPartToDisk:
void Fetcher::downloadBaseOrProjectionPartToDisk(
const String & replica_path,
const String & part_download_path,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const
{
size_t files;
readBinary(files, in);
// 1: Iterate over the files in the part directory on the server
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
.....
// 2: Create the output file target
auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
HashingWriteBuffer hashing_out(*file_out);
// 3: Copy the data in, with throttling
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
.....
// 4: Add the fetched file's info to checksums.txt
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
if (sync)
hashing_out.sync();
}
}