ClickHouse復制表同步機制淺析

ReplicatedMergeTree是ClickHouse最常用的表引擎之一沙庐,該引擎和MergeTree一樣都繼承自MergeTreeData, 和MergeTree共享相同的底層存儲層實現(xiàn)俊嗽。ReplicatedMergeTree區(qū)別于MergeTree引擎的地方在于其實現(xiàn)了多副本數(shù)據(jù)存儲滴某,并借助zookeeper進行多副本之間的數(shù)據(jù)同步。

本文針對ReplicatedMergeTree引擎如何實現(xiàn)副本間數(shù)據(jù)同步,結(jié)合部分源代碼(20.10版本)做一個簡單剖析。

0 術(shù)語解釋

data part : clickhouse中的數(shù)據(jù)組織單位,一次INSERT會生成一個或多個data part, 一個data part里的數(shù)據(jù)存放在同一個文件夾里辙诞。

mutation : 對一個或多個data parts的內(nèi)容數(shù)據(jù)的修改操作,最典型的觸發(fā)mutation的操作是:ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE(詳見mutations)轻抱。

1 哪些操作會引發(fā)數(shù)據(jù)同步飞涂?

這里說的數(shù)據(jù)同步包括了元數(shù)據(jù)和內(nèi)容數(shù)據(jù)的同步。在ClickHouse中祈搜,數(shù)據(jù)插入(INSERT)较店,元數(shù)據(jù)變更(ALTER),內(nèi)容數(shù)據(jù)變更(MUTATION)容燕,合并(MERGE)等操作都會引發(fā)數(shù)據(jù)同步梁呈。

1.1 INSERT

clickhouse執(zhí)行ReplicatedMergeTree表的插入操作時,會將數(shù)據(jù)以data part為單位寫入到磁盤/內(nèi)存里蘸秘,每個data part寫入完成后都會做一個commit操作, 這個commit操作會生成一個GET_PART類型的log entry并上傳到zookeeper中供其他副本拉取同步官卡,詳見源碼蝗茁。

由此可見,INSERT語句引發(fā)的是data part級別的內(nèi)容數(shù)據(jù)的同步寻咒。

1.2 ALTER METADATA

這里之所以稱為'ALTER METADATA'而不直接稱'ALTER'哮翘,是因為,在ClickHouse中update和delete也是通過ALTER語句實現(xiàn)的毛秘,即ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE.? ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE語句觸發(fā)的是mutations饭寺,所以它們被歸為mutation的范疇。

這里說的是會對表的元數(shù)據(jù)進行修改的ALTER語句叫挟。ClickHouse將ReplicatedMergeTree表的元數(shù)據(jù)信息存放在zookeeper和本地文件中艰匙,ALTER語句會觸發(fā)對這兩個地方的元數(shù)據(jù)信息進行修改。注意抹恳,在zookeeper上旬薯,表的元數(shù)據(jù)信息會在表級別和副本級別的znodes(即下文說的zookeeper_path和replica_path)下都進行存儲。

alter語句首先會修改掉zookeeper上zookeeper_path下的元數(shù)據(jù)适秩,然后將修改后的元數(shù)據(jù)包裝生成一個ALTER_METADATA類型的ReplicatedMergeTreeLogEntryData對象,并將其上傳到zookeeper上硕舆,供所有副本(包括當前副本)拉取更新秽荞。詳見源碼

注意抚官,這里說的”會對表的元數(shù)據(jù)進行修改的ALTER語句“ 有一些同時也會觸發(fā)mutation扬跋,下文將進一步介紹。

1.3 MERGE

merge操作用于將若干個data parts合并為單個data part凌节,ClickHouse會在后臺選擇需要做merge的data parts钦听,然后創(chuàng)建一個MERGE_PARTS類型的ReplicatedMergeTreeLogEntryData對象,并將其上傳到zookeeper上倍奢,供所有副本(包括當前副本)拉取同步朴上。詳見源碼StorageReplicatedMergeTree::mergeSelectingTask()?.

1.4 MUTATION

mutation是對data parts數(shù)據(jù)的修改操作,一個mutation可能包含了對很多個data parts的修改卒煞。

除了ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE痪宰,以下類型的ALTER語句也會觸發(fā)mutation :

? ? 1. DROP_COLUMN

? ? 2. DROP_INDEX

? ? 3. RENAME_COLUMN

? ? 4. 非meta only的MODIFY_COLUMN

對于會觸發(fā)mutation的ALTER語句,ClickHouse會生成一個ReplicatedMergeTreeMutationEntry對象畔裕,并將其上傳到zookeeper衣撬,供所有副本(包括當前副本)拉取同步。

詳見源碼StorageReplicatedMergeTree::alter?和 AlterCommand::isRequireMutationStage.

2 ZooKeeper的結(jié)構(gòu)和作用

zookeeper是ReplicateMergeTree表副本之間進行數(shù)據(jù)同步的橋梁扮饶,存儲了復制表的各類元數(shù)據(jù)信息具练。這里介紹幾個和數(shù)據(jù)同步相關(guān)的主要信息。

2.1 zookeeper_path和replica_path

一張ReplicatedMergeTree表的所有副本在zookeeper上共享一個znode路徑甜无,我們稱之為zookeeper_path扛点,這個路徑在建表時指定哥遮,我們對該路徑的命名規(guī)范是:

????????/clickhouse/tables/{layer}-{shard}/<table_name>

其中l(wèi)ayer和shard都是macro,各個副本節(jié)點取值可能不同占键。

zookeeper_path是表級別的昔善,其下有一個replicas節(jié)點,內(nèi)部包含了各個副本的元信息節(jié)點畔乙,即:

????? ? /clickhouse/tables/{layer}-{shard}/<table_name>/replicas/<replica>

我們稱副本的元信息節(jié)點為replica_path.

2.2 表的元信息(metadata, columns)

在zookeeper_path和replica_path下都保存了表的metadata和columns信息君仆,元數(shù)據(jù)變更時會先修改zookeeper_path下的metadata和columns,然后會異步地將更新同步到replica_path下牲距。

除此之外返咱,replica_path下面還會有一個metadata_version用于記錄當前元數(shù)據(jù)信息的版本,這個值就是zookeeper_path下的metadata節(jié)點的dataVersion.

zookeeper_path下的metadata和columns信息的更新詳見源碼牍鞠。

replica_path下的metadata, columns和metadata_version的更新見源碼咖摹。

2.3 同步日志和副本隊列(log, queue)

2.3.1 同步日志(log)

log位于zookeeper_path下面,用于保存所有副本間的數(shù)據(jù)同步日志(稱為log entry)难述,ClickHouse支持的LogEntry類型有:

Supported LogEntry type

從各個副本觸發(fā)的數(shù)據(jù)變更操作都會上傳對應的log entry到log節(jié)點下萤晴,比如:

? ? ? ? 1. 在一個副本上執(zhí)行alter table ... add column操作會上傳一個ALTER_METADATA類型的log entry到log節(jié)點下。

? ? ? ? 2. 在一個副本上執(zhí)行insert into ...操作會上傳一個GET_PART類型的log entry到log節(jié)點下胁后。

zookeeper_path/log下的log entry在zookeeper上的znode命名規(guī)范是log-seqNum, 其中seqNum為創(chuàng)建sequential znode生成的自增序號店读,如log-0000000001。

2.3.2 副本隊列 (queue)

queue位于replica_path下面攀芯,保存了當前副本需要同步的log entries. queue里面的log entries是從log中拉取的(由下文說的queue updating task負責拉韧投稀),被所有副本都拉取過的log entry就可以從log中刪除掉, 刪除操作由后臺清理線程執(zhí)行侣诺。

replica_path下還有一個log_pointer節(jié)點殖演,保存了當前副本從log拉取到的最大log entry id + 1,即下一個需要拉取的log entry的id.

replica_path/queue下的entry在zookeeper上的znode命名規(guī)范是queue-seqNum, 其中seqNum為創(chuàng)建sequential znode生成的自增序號年鸳,如queue-0000000001趴久。

2.4 數(shù)據(jù)變更(mutations)

在zookeeper_path下面有一個mutations節(jié)點,ALTER操作觸發(fā)的mutation操作都會先封裝成mutation entry 后被上傳到這個節(jié)點搔确。mutation entry保存了mutation操作的相關(guān)信息朋鞍,包括:

????a. source replica : 觸發(fā)mutation的副本節(jié)點。

? ? b. mutation commands : 需要apply到data parts上的mutation指令妥箕。

? ? c. block numbers :這是一個partition_id -> block_number的映射滥酥,保存了每個partition的mutation version。mutation只會被apply到partition中block number小于mutation version的blocks(因為merge的關(guān)系畦幢,每個data part可能包含一個或多個blocks坎吻,并且data part包含的block numbers不一定是連續(xù)的)。

????d. alter version : mutation操作對應的metadata version宇葱,只有ALTER MODIFY/DROP 操作觸發(fā)的mutation才會有非-1的值瘦真,對于ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE操作觸發(fā)的mutation操作刊头,這個值都為-1。這個值是為了將ALTER MODIFY/DROP的ALTER METADATA操作和MUTATION操作關(guān)聯(lián)起來诸尽。

mutation entry的結(jié)構(gòu)詳見ReplicatedMergeTreeMutationEntry原杂。

一個mutation entry會被轉(zhuǎn)換成若干個MUTATE_PART類型的log entry被投放到同步日志(zookeeper_path/log)中,下文介紹merge/mutation selecting task時會介紹其轉(zhuǎn)換過程您机。

另外穿肄,在replica_path下面有一個mutation_pointer節(jié)點,用于記錄副本上最后完成的mutation的id(也就是znode name).

3 負責數(shù)據(jù)同步的tasks

每個副本節(jié)點都有一系列的負責數(shù)據(jù)同步的任務际看,這些任務會與zookeeper交互咸产,獲取最新的同步任務,然后在本地內(nèi)存中維護任務隊列并借助任務池調(diào)度執(zhí)行仲闽,執(zhí)行完成后會更新zookeer上的對應狀態(tài)脑溢。下面我們逐一介紹各類同步任務。

3.1 queue updating task

queue updating task負責當前副本上同步日志(log entry)隊列的更新赖欣。

ReplicatedMergeTree表對應的StoreageReplicatedMergeTree類中有一個queue_updating_task對象屑彻,它是由后臺調(diào)度池(BackgroundSchedulePool)調(diào)度執(zhí)行的任務,其執(zhí)行的函數(shù)是StorageReplicatedMergeTree::queueUpdatingTask().

3.1.1 工作內(nèi)容

queue updating task做的事情包括:

1. 從zookeeper拉取zookeeper_path/log下的所有l(wèi)og entries顶吮,并從replica_path/log_pointer獲取當前副本的log_pointer (即下一個需要拉取的log entry的id)社牲。

2. 根據(jù)log_pointer在log entries中定位到需要拉取的entries,如果log_pointer為空云矫,則從log entries中最小的entry開始拉取。

3. 將需要拉取的log entries都同步到replica_path/queue下面汗菜,這里的同步就是以選定的log entries的內(nèi)容為節(jié)點內(nèi)容让禀,一個一個地在replica_path/queue下創(chuàng)建新的sequential znode. 注意,這里創(chuàng)建的znode名稱并不會和zookeeper_path/log下的一致陨界。

4. 更新replica_path/log_pointer的內(nèi)容為拉取到的log entries中最大的entry id + 1.

5. 將拉取到的log entries插入到內(nèi)存中的同步任務隊列里巡揍,即插入到ReplicatedMergeTreeQueue::queue

6. 觸發(fā)queue executing task(下文會介紹)執(zhí)行菌瘪。

這些步驟的實現(xiàn)細節(jié)請見源碼ReplicatedMergeTreeQueue::pullLogsToQueue.

3.1.2 觸發(fā)時機

queue updating task被觸發(fā)執(zhí)行的時機有:

1. 每張ReplicatedMergeTree表都有一個restarting thread腮敌,這個thread會被周期性調(diào)度執(zhí)行,默認間隔是1分鐘(由參數(shù)zookeeper_session_expiration_check_period控制)俏扩。當復制表首次被加載或者zookeeper session過期后被重新加載時糜工,restarting thread會激活并調(diào)度queue updating task執(zhí)行。

2. queue updating task每次被執(zhí)行時, 會在從zookeeper_path/log獲取log entries時添加一個watch callback (詳見源碼)录淡,這個callback會調(diào)度執(zhí)行queue updating task, 所以每當zookeeper_path/log下有新的log entry生成時捌木,queue updating task就會被調(diào)度執(zhí)行

3. 如果queue updating task在拉取和更新隊列時發(fā)生異常嫉戚,在異常處理中會再次調(diào)度執(zhí)行queue updating task刨裆,調(diào)度執(zhí)行的間隔時間是QUEUE_UPDATE_ERROR_SLEEP_MS(1秒)澈圈。

3.1.3 為什么既要拉取到replica_path/queue又要拉取到本地內(nèi)存中?

1. 拉取到replica_path/queue是為了方便記錄每個副本有哪些是待執(zhí)行的entries帆啃,執(zhí)行完成的直接從replica_path/queue里刪除即可瞬女。同時,當某個副本重啟或者某張ReplicatedMergeTree表被detach后再attach時努潘,ClickHouse可以從replica_path/queue快速加載待執(zhí)行的entries (詳見源碼ReplicatedMergeTreeQueue::load)诽偷。

2. 拉取到本地內(nèi)存相當于做了一個緩存,避免每次遍歷待執(zhí)行entries都需要從zookeeper拉取慈俯,提升了性能渤刃,也可以降低zookeeper的負載。

3.2 queue executing task

queue updating task把同步日志(log entries)拉取到本地內(nèi)存后贴膘,我們需要一個任務去執(zhí)行這些log entries. 這個任務就是queue executing task.?

queue executing task在StoreageReplicatedMergeTree類中被實現(xiàn)為一個通過BackgroundProcessingPool調(diào)度執(zhí)行的task卖子,即 StorageReplicatedMergeTree::queue_task_handle.

queue_task_handle執(zhí)行的函數(shù)是StorageReplicatedMergeTree::queueTask.

下面介紹queueTask函數(shù)的主要執(zhí)行步驟。

3.2.1 選擇要處理的log entry

從ReplicatedMergeTreeQueue::queue中選擇一個log entry, 選擇邏輯如下(實現(xiàn)細節(jié)詳見ReplicatedMergeTreeQueue::shouldExecuteLogEntry):

1. 對于GET_PART類型的log entry刑峡,如果它生成的data part被某個正在執(zhí)行的log entry的resulting data parts所包含洋闽,則當前entry此輪不被選擇。

2. 對于MERGE_PARTS類型的log entry:

????2.1 如果它生成的data part被某個正在執(zhí)行的log entry的resulting data parts所包含突梦,則當前entry此輪不被選擇诫舅。

????2.2 如果它的source data parts中存在某個data part還處于被生成過程中,則當前entry此輪不被選擇宫患。

????2.3 如果它是TTL類型merge(包括TTL_DELETE, TTL_RECOMPRESS刊懈,詳見MergeType)且在運行的TTL類型merges的個數(shù)(全局值,不是針對當前表的)>= max_number_of_merges_with_ttl_in_pool (配置)娃闲,則當前entry此輪不被選擇虚汛。

????2.4 如果source data parts的總數(shù)據(jù)量大于max_source_parts_size(max_source_parts_size是運行時確定的變量,確定邏輯詳見源碼)且background pool中沒有足夠的free threads做large merges (如果有足夠free threads則不考慮source data parts的數(shù)據(jù)量是否超過max_source_parts_size皇帮,詳見源碼1源碼2)卷哩,則當前entry此輪不被選擇。

3. 對于MUTATE_PART類型的log entry:

????3.1 如果它生成的data part被某個正在執(zhí)行的log entry的resulting data parts所包含属拾,則當前entry此輪不被選擇将谊。

????3.2 如果它的source data parts中存在某個data part還處于被生成過程中,則當前entry此輪不被選擇渐白。

????3.3 如果source data parts的總數(shù)據(jù)量大于max_source_parts_size(max_source_parts_size是運行時確定的變量尊浓,確定邏輯詳見源碼),則當前entry此輪不被選擇纯衍。

????3.4 如果它是某個alter modify/drop query的一部分眠砾,則需要按alter語句的發(fā)生順序依次執(zhí)行,如果存在比它早的alter語句尚未執(zhí)行完成,則當前entry此輪不被選擇褒颈。詳見源碼柒巫。

4. 對于ALTER_METADATA類型的log entry, 需要按照alter語句的發(fā)生順序依次執(zhí)行,如果存在比它早的alter語句尚未執(zhí)行完成谷丸,則當前entry此輪不被選擇堡掏。詳見源碼

5. 對于DROP_RANGE 和 REPLACE_RANGE類型的log entry刨疼,因為它們會等待生成指定范圍內(nèi)的data parts的log entries執(zhí)行完成泉唁,為了避免死鎖,如果已經(jīng)有DROP_RANGE 和 REPLACE_RANGE類型的log entry正在執(zhí)行揩慕,則當前entry此輪不被選擇亭畜。詳見源碼

6. 對于MERGE_PARTS 和 MUTATE_PART類型的log entry迎卤,如果merge/mutation/TTL類型merge被cancel了(如果被cancel則后續(xù)不能提交對應類別的操作拴鸵,在運行的該類操作也會拋異常),則對應操作的log entry也不會被選中執(zhí)行蜗搔。詳見源碼1源碼2.

如果上述條件都不滿足劲藐,則該log entry被選中執(zhí)行。每次queue executing task執(zhí)行只會選擇一個log entry進行處理樟凄。

3.2.2 處理選中的log entry

選中l(wèi)og entry后聘芜,StorageReplicatedMergeTree::queueTask函數(shù)會調(diào)用ReplicatedMergeTreeQueue::processEntry對log entry進行處理。processEntry函數(shù)的第三個參數(shù)是用于執(zhí)行l(wèi)og entry的函數(shù)缝龄,此處傳入的是StorageReplicatedMergeTree::executeLogEntry.

3.2.2.1 execute log entry

log entry的處理邏輯如下:

1. 對于DROP_RANGE和REPLACE_RANGE類型的log entry, 調(diào)用對應的StorageReplicatedMergeTree::executeDropRange 和?StorageReplicatedMergeTree::executeReplaceRange函數(shù)進行處理汰现,這里對executeDropRange的執(zhí)行過程做簡要介紹(另一個讀者可自行分析源碼):

????1.1 從zookeeper上的replica_path/queue節(jié)點和內(nèi)存中移出所有會生成對應range(這里的range是指定partition內(nèi)指定的block number的范圍)中的data part的log entries (這里會刪除zookeeper上replica_path/queue節(jié)點下的entry),如果有正在運行中的叔壤,則等待其運行完成瞎饲。詳見源碼ReplicatedMergeTreeQueue::removePartProducingOpsInRange

????1.2 移除當前working set(即MergeTreeData::data_parts_by_info)中屬于對應range的data parts(詳見MergeTreeData::removePartsInRangeFromWorkingSet)百新。注意企软,這里并沒有從磁盤刪除這些data parts庐扫,只是修改了內(nèi)存中的一些狀態(tài)饭望,比如,將data part的state改成IMergeTreeDataPart::State::Outdated(詳見MergeTreeData::removePartsFromWorkingSet)形庭,真正從磁盤刪除data parts的操作是通過喚醒cleanup線程完成的铅辞。

? ? 1.3 從zookeeper上的replica_path/parts節(jié)點中刪除在1.2中被移除的data parts,這里如果刪除失敗則會重試萨醒,默認重試5次斟珊。

? ? 1.4 喚醒cleanup線程,從磁盤清理掉已被移除的data parts富纸。

2. 對于GET_PART類型的log entry囤踩,處理邏輯如下:

????2.1 判斷目標data part在當前副本是否已經(jīng)存在或被已存在的data part所包含旨椒,這里不僅會檢查active data parts,也會檢查處于MergeTreeDataPartState::PreCommitted狀態(tài)的data parts堵漱,詳見源碼综慎。

? ? 2.2 判斷目標data part在zookeeper的replica_path/parts節(jié)點下是否已經(jīng)存在。

? ? 2.3 如果2.1和2.2的判斷結(jié)果都是已存在勤庐,則直接跳過當前l(fā)og entry示惊,不做處理(此時executeLogEntry返回true垄提,表示處理成功)罩扇。

? ? 2.4 否則自脯,進一步判斷目標data part是否是某次失敗的write with quorum操作的resulting data part华坦,判斷方法是查看zookeeper上zookeeper_path/quorum/failed_parts節(jié)點下是否存在目標data part锅知。如果存在阴幌,則和2.3一樣寄纵,直接跳過當前l(fā)og entry立美,不做處理(此時executeLogEntry返回true类嗤,表示處理成功)糊肠。

? ? 2.5 否則,從其他有目標data part的副本節(jié)點去拉取目標data part遗锣。拉取是調(diào)用StorageReplicatedMergeTree::executeFetch實現(xiàn)的货裹。具體拉取過程暫不剖析,后續(xù)data fetcher相關(guān)文章中再介紹精偿。

3. 對于MERGE_PARTS類型的log entry弧圆,處理邏輯如下:

? ? 3.1 同GET_PART的2.1 ~ 2.4操作。

? ? 3.2 調(diào)用StorageReplicatedMergeTree::tryExecuteMerge函數(shù)笔咽,該函數(shù)會根據(jù)各種情況/狀態(tài)及相關(guān)配置而決定是執(zhí)行該merge操作還是建議從其他副本節(jié)點拉取目標data part搔预。具體merge邏輯暫不剖析,后續(xù)merge相關(guān)文章中再介紹叶组。

? ? 3.3 如果tryExecuteMerge返回false拯田,即建議從其他副本拉取data part,則調(diào)用StorageReplicatedMergeTree::executeFetch進行拉取甩十。

4. 對于MUTATE_PART類型的log entry船庇,處理邏輯如下:

? ? 4.1?同GET_PART的2.1 ~ 2.4操作。

? ? 4.2 調(diào)用StorageReplicatedMergeTree::tryExecutePartMutation函數(shù)侣监,和StorageReplicatedMergeTree::tryExecuteMerge一樣鸭轮,該函數(shù)也會根據(jù)各種情況/狀態(tài)及相關(guān)配置而決定是執(zhí)行該mutate操作還是建議從其他副本節(jié)點拉取目標data part。具體mutate過程此處不做剖析橄霉,讀者可自行分析源碼窃爷。

? ? 4.3?如果tryExecutePartMutation返回false,即建議從其他副本拉取data part,則調(diào)用StorageReplicatedMergeTree::executeFetch進行拉取按厘。

5. 對于ALTER_METADATA類型的log entry, 調(diào)用StorageReplicatedMergeTree::executeMetadataAlter函數(shù)進行處理医吊,處理邏輯如下:

? ? 5.1 從該log entry中解析出要更新的metadata和columns信息。

? ? 5.2 更新zookeeper上的replica_path/metadatareplica_path/columns節(jié)點的內(nèi)容為5.1中的metadata和columns信息逮京。

? ? 5.3 更新當前副本本地的元信息(對于ordinary database下的表遮咖,會修改本地保存的sql文件),詳見StorageReplicatedMergeTree::setTableStructure造虏。

3.2.2.2 remove processed entry

executeLogEntry執(zhí)行完成后會返回一個bool值御吞,表示是否執(zhí)行成功。如果執(zhí)行成功漓藕,則processEntry會調(diào)用ReplicatedMergeTreeQueue::removeProcessedEntry從內(nèi)存中的隊列和zookeeper上的replica_path/queue節(jié)點中移除剛執(zhí)行完成的log entry陶珠。

3.3 mutations updating task

mutations updating task做的事情是:基于zookeeper上的zookeeper_path/mutations節(jié)點更新內(nèi)存中的mutation狀態(tài)數(shù)據(jù)。這里說的狀態(tài)數(shù)據(jù)主要是mutations_by_znode 和 mutations_by_partition享钞。

其中揍诽,mutations_by_znode保存了mutation id(也就是zookeeper_path/mutations下的節(jié)點名稱)到MutationStatus對象的映射,MutationStatus中保存了單個mutation的狀態(tài)信息栗竖,包括entry, parts_to_do, is_done等暑脆。

mutations_by_partition的類型是std::unordered_map<String, std::map<Int64, MutationStatus *>>, 保存的是Partition -> (block_number -> MutationStatus)的雙層映射關(guān)系狐肢,這樣我們通過partition id和data part包含的block number就可以找到某個data part需要執(zhí)行的mutation操作添吗。

注意,mutations_by_partition中的第二層key(ie. block_number)就是mutation version份名,每個data part的MergeTreePartInfo::getDataVersion(如果data part被mutate過或者包含mutated part碟联,則返回的是對應的mutation version,否則返回改data part中最小的block number)會和mutation version做比較僵腺,如果小于這個version鲤孵,則說明data part需要執(zhí)行這個mutation。

mutations updating task是一個通過BackgroundSchedulePool調(diào)度執(zhí)行的task辰如,即StorageReplicatedMergeTree::mutations_updating_task普监。mutations_updating_task執(zhí)行的函數(shù)是StorageReplicatedMergeTree::mutationsUpdatingTask,其主要工作是通過調(diào)用ReplicatedMergeTreeQueue::updateMutations函數(shù)完成的琉兜。

下面我們看看mutations updating task的主要工作內(nèi)容有哪些凯正。

3.3.1 工作內(nèi)容

1. 從zookeeper_path/mutations節(jié)點拉取得到所有的mutation entries的id (即對應的znode name)。

2. 從mutations_by_znode和mutations_by_partition中移除掉zookeeper_path/mutations中不存在的mutation呕童,出現(xiàn)這種mutation的原因可能是:

? ? 2.1 被KILL MUTATION語句殺掉的漆际。

? ? 2.2 已經(jīng)指向完成后被mutations finalizing task (下文會介紹)移除掉的淆珊。

3. 將zookeeper_path/mutations中新的mutations(即在mutations_by_znode和mutations_by_partition中不存在的)加入到mutations_by_znode和mutations_by_partition中夺饲。

4. 對每個新增的mutation,填充其parts_to_do (即需要mutate的data parts),填充的步驟如下:

? ? 4.1 遍歷mutation entry中的block_numbers往声,對于其中的每個partition擂找, 從當前副本當前表的現(xiàn)存data parts(ReplicatedMergeTreeQueue::current_parts)中找到屬于這個partition并且MergeTreePartInfo::getDataVersion小于對應mutation version的data parts(詳見getPartNamesToMutate),將這些data parts添加到parts_to_do中浩销。

? ? 4.2 遍歷內(nèi)存中的同步任務隊列(ReplicatedMergeTreeQueue::queue)中的每個log entry贯涎,在每個log entry的resulting data parts中找到partition存在于mutation entry的block_numbers中且MergeTreePartInfo::getDataVersion小于對應mutation version的data parts,將這些data parts添加到parts_to_do中慢洋。

5. 如果步驟3中獲取到的新的mutations不為空塘雳,則調(diào)度merge/mutation selecting task(下文會介紹)執(zhí)行。

6. 如果在步驟4中發(fā)現(xiàn)某些新增的mutation的parts_to_do為空(表名這個mutation可能可以終止了)普筹,則調(diào)度mutations finalizing task執(zhí)行败明。

3.3.2 觸發(fā)時機

mutations updating task被觸發(fā)執(zhí)行的時機和queue updating task基本一致:

1. 和queue updating task一樣,當復制表首次被加載或者zookeeper session過期后被重新加載時太防,restarting thread會激活并調(diào)度mutations updating task執(zhí)行妻顶。

2. mutations updating task每次被執(zhí)行時, 會在從zookeeper_path/mutations獲取mutation entries時添加一個watch callback (詳見源碼),這個callback會調(diào)度執(zhí)行mutations updating task, 所以每當zookeeper_path/mutations下有新的mutations entry生成時蜒车,mutations updating task就會被調(diào)度執(zhí)行讳嘱。

3. 如果mutations updating task在拉取和更新mutations時發(fā)生異常,在異常處理中會再次調(diào)度執(zhí)行mutations updating task酿愧,調(diào)度執(zhí)行的間隔時間是QUEUE_UPDATE_ERROR_SLEEP_MS(1秒)沥潭。

3.4 merge/mutation selecting task

上文中曾提到,zookeeper_path/muations中的一個mutation entry會被轉(zhuǎn)換成多個MUTATE_PART類型的log entry并投放到zookeeper_path/log中嬉挡。這個轉(zhuǎn)換工作是由merge/mutation select task完成的叛氨。顧名思義,這個task會負責merge和mutation任務的選擇和提交棘伴。

merge/mutation selecting task是一個通過BackgroundSchedulePool調(diào)度執(zhí)行的task寞埠,即StorageReplicatedMergeTree::merge_selecting_task。注意焊夸,這里的命名雖然是merge_selecting_task, 但該task涵蓋了merge和mutation任務的選擇和提交仁连。

merge_selecting_task執(zhí)行的函數(shù)是StorageRreplicatedMergeTree::mergeSelectingTask,我們看看merge/mutate selecting task做了哪些工作阱穗。

3.4.1 工作內(nèi)容

1. 統(tǒng)計同步任務隊列(ReplicatedMergeTreeQueue::queue)中MERGE_PARTS和MUTATE_PART類型的log entry的數(shù)量饭冬,詳見源碼

2. 如果步驟1中統(tǒng)計得到的merge + mutation的任務總數(shù)大于或等于max_replicated_merges_in_queue(配置)揪阶,則ClickHouse認為隊列中merge和mutation任務已經(jīng)夠多了昌抠,不會選擇和提交新的merge/mutation任務,跳至步驟5鲁僚,否則繼續(xù)執(zhí)行步驟3炊苫。

3. 根據(jù)配置和一些runtime狀態(tài)(比如裁厅,磁盤空閑空間)計算出max_source_parts_size_for_merge 和?max_source_part_size_for_mutation,即最大可合并的source parts大小 和 最大可mutate的source part大小侨艾。這兩個值的計算邏輯見MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge 和?MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation执虹。相關(guān)配置有:

? ? ????a.?max_bytes_to_merge_at_min_space_in_pool

? ? ? ? b.?max_bytes_to_merge_at_max_space_in_pool

? ? ? ? c.?number_of_free_entries_in_pool_to_lower_max_size_of_merge

? ? ? ? d.?number_of_free_entries_in_pool_to_execute_mutation

4. 選擇是提交MERGE_PARTS任務,還是MUTATE_PART任務唠梨,或者都不執(zhí)行:

? ? 4.1 如果max_source_parts_size_for_merge為0(即沒空間做merge了)袋励,或者沒有需要merge且可以merge的data parts(選取過程詳見MergeTreeDataMergerMutator::selectPartsToMerge),則不提交MERGE_PARTS任務当叭,跳至步驟4.3茬故,否則繼續(xù)執(zhí)行步驟4.2。

? ? 4.2 調(diào)用StorageReplicatedMergeTree::createLogEntryToMergeParts構(gòu)建并提交一個MERGE_PARTS log entry到zookeeper_path/log中蚁鳖。然后跳至步驟5均牢,否則繼續(xù)執(zhí)行步驟4.3。

? ? 4.3 如果滿足以下條件之一才睹,則不提交MUTATE_PART徘跪,跳至步驟5,否則繼續(xù)執(zhí)行步驟4.4琅攘。

? ? ? ? a.?max_source_part_size_for_mutation為0垮庐。?

? ? ? ? b.?mutations_by_partition的size為0,即沒有需要執(zhí)行的mutation操作坞琴。

? ? ? ? c. 同步任務隊列中的MUTATE_PART任務數(shù)大于等于max_replicated_mutations_in_queue(配置)哨查。

? ? 4.4 遍歷當前表的committed data parts(MergeTreeData::getDataPartsVector),選擇一個滿足以下全部條件的data part 剧辐。如果沒有滿足條件的data part則跳至步驟5寒亥,否則繼續(xù)執(zhí)行步驟4.5。

? ? ? ? a. size on disk <=?max_source_part_size_for_mutation

? ? ? ? b. 在mutations_by_partition中存在data part所在partition的mutation操作荧关,并且對應的mutation version高于data part當前的mutation version溉奕。詳見ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion和?ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl

? ? 4.5 在步驟4.4中選中一個data part后忍啤,調(diào)用StorageReplicatedMergeTree::createLogEntryToMutatePart構(gòu)建并提交一個MUTATE_PART log entry到zookeeper_path/log中加勤。

5. 如果上述步驟沒有成功提交log entry,則調(diào)度merge_selecting_task在MERGE_SELECTING_SLEEP_MS(5秒)后執(zhí)行同波。如果成功提交了鳄梅,則立刻調(diào)度merge_selecting_task執(zhí)行。

說明:

1. 每次merge_selecting_task的調(diào)用執(zhí)行未檩,只會提交一個MERGE_PARTS或MUTATE_PART任務到zookeeper_path/log中戴尸。

2. 根據(jù)MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation的實現(xiàn)可知,只有在空閑線程充足的情況下冤狡,才會考慮提交MUTATE_PART孙蒙。這是為了留更多線程給MERGE_PARTS项棠。而且,在步驟4中做選擇時马篮,也是先判斷是否可以提交MERGE_PARTS,如果有可提交的MERE_PARTS怜奖,則不會考慮MUTATE_PART浑测。 由此可見,在ClickHouse中歪玲,MERGE_PARTS任務的優(yōu)先級是高于MUTATE_PART任務的迁央。

3.?merge/mutation selecting task只在leader副本上執(zhí)行(常見所有副本都是leader的情況)。

3.4.2 觸發(fā)時機

merge/mutation selecting task被觸發(fā)執(zhí)行的時機有:

1. 當前副本贏得leader election后滥崩,激活并調(diào)度merge/mutation selecting task執(zhí)行岖圈。

2.?merge/mutation selecting task在執(zhí)行完成后,如果結(jié)果成功钙皮,則立刻調(diào)度merge/mutation selecting task執(zhí)行蜂科,如果失敗,則調(diào)度merge/mutation selecting task延遲MERGE_SELECTING_SLEEP_MS(5秒)后執(zhí)行短条。

3.?ReplicatedMergeTree表的插入操作在完成commit data part后, 會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行导匣,詳見源碼

4. 在mutations updating task發(fā)現(xiàn)新的mutation并完成狀態(tài)更新后茸时,會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行贡定。

5. queue executing task在成功執(zhí)行MERG_PARTS log entry后,會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行可都,詳見StorageReplicatedMergeTree::tryExecuteMerge缓待。

6. queue executing task在成功執(zhí)行MUTATE_PART log entry后,會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行渠牲,詳見StorageReplicatedMergeTree::tryExecutePartMutation旋炒。

3.5 mutations finalizing task

merge/mutation selecting task將zookeeper_path/mutations下的mutation entry轉(zhuǎn)換成一個個MUTATE_PART log entry并投放到zookeeper_path/log下后,queue executing task會執(zhí)行這些MUTATE_PART任務签杈。當一個mutation entry相關(guān)的所有MUTATE_PART任務都完成了国葬,那么這個mutation entry就可以被結(jié)束了。

mutations finalizing task就是用來結(jié)束mutation entry的任務芹壕。mutations finalizing task是一個通過BackgroundSchedulePool調(diào)度執(zhí)行的任務汇四,即StorageReplicatedMergeTree::mutations_finalizing_task,其調(diào)用的函數(shù)是StorageReplicatedMergeTree::mutationsFinalizingTask踢涌,該函數(shù)的主要功能是通過調(diào)用ReplicatedMergeTreeQueue::tryFinalizeMutations實現(xiàn)的通孽。

下面介紹一下mutations finalizing task的工作內(nèi)容。

3.5.1 工作內(nèi)容

1. 遍歷mutations_by_znode中的所有mutations睁壁,對于is_done標志位false的mutations:

? ? 1.1 如果znode小于等于mutation_pointer(mutation已經(jīng)完成)背苦,則執(zhí)行以下操作:

? ? ? ? a. is_done置true互捌。

? ? ? ? b. 清空parts_to_do。

? ? ? ? c. 調(diào)用ReplicatedMergeTreeAltersSequence::finishDataAlter更新alter的狀態(tài)信息(如果對應的metadata alter也完成了行剂,則會從alter sequence移除該alter)秕噪。

? ? 1.2 如果znode大于mutation_pointer,且parts_to_do為空(mutation可能已完成)厚宰,則將該mutation放入candidates中腌巾。

2. 對于每個candidate,調(diào)用ReplicatedMergeTreeMergePredicate::isMutationFinished判斷其是否已經(jīng)結(jié)束铲觉,判斷邏輯是:

????2.1 如果mutation涉及的blocks還有在committing階段的澈蝙,則判定mutation為unfinished。

????2.2 如果現(xiàn)存data parts 或者 同步任務隊列(ReplicatedMergeTreeQueue::queue)的resulting data parts中還有需要執(zhí)行改mutation的data parts (詳見getPartNamesToMutate)撵幽,則判定mutation為unfinished灯荧。

????2.3 如果2.1和2.2都不成立,則判定mutation為finished盐杂。

3. 如果存在finished candidates逗载,則:

? ? 3.1 更新mutation_pointer : 將zookeeper上的replica_path/mutation_pointer和內(nèi)存里的ReplicatedMergeTreeQueue::mutation_pointer都設置為finished candidates中最大的znode name。

? ? 3.2 更新每個finished candidate的內(nèi)存狀態(tài)(MutationStatus):

? ? ? ? a.?is_done置true链烈。

? ? ? ? b.?調(diào)用ReplicatedMergeTreeAltersSequence::finishDataAlter更新alter的狀態(tài)信息撕贞。

說明:對于mutation finalizing的邏輯尚未完全理解,這里只是根據(jù)源碼列出執(zhí)行過程测垛,后續(xù)會在mutation相關(guān)文章做進一步解釋捏膨。

3.5.2 觸發(fā)時機

1. 當復制表首次被加載或者zookeeper session過期后被重新加載時,restarting thread會激活并調(diào)度mutations finalizing task執(zhí)行食侮。

2. 在更新mutations相關(guān)狀態(tài)時号涯,如果發(fā)現(xiàn)some_mutations_are_probably_done(即parts_to_do為空),則調(diào)度mutations finalizing task執(zhí)行锯七。

3. 如果mutations finalizing task在finalize mutations發(fā)生異常 或者 成功地finalize了一個或多個mutations链快,那么,則調(diào)度mutations finalizing task延遲MUTATIONS_FINALIZING_SLEEP_MS(1秒)執(zhí)行眉尸。

4. 如果mutations finalizing task順利運行后沒有finalize任何mutation域蜗,則調(diào)度mutations finalizing task延遲MUTATIONS_FINALIZING_IDLE_SLEEP_MS(5秒)執(zhí)行。

4 總結(jié)

1. 復制表的數(shù)據(jù)同步包括元數(shù)據(jù)同步和內(nèi)容數(shù)據(jù)同步噪猾。

2. 引發(fā)數(shù)據(jù)同步任務的操作包括INSERT霉祸,ALTER METADATA,MERGE袱蜡,MUTATION等丝蹭。

3. zookeeper在復制表的數(shù)據(jù)同步中起到橋梁作用,znode路徑分為表級別zookeeper_path和表副本級別的replica_path, 關(guān)鍵節(jié)點包括log坪蚁,log_pointer, queue, mutations, mutation_pointer等奔穿。

4. 各個副本節(jié)點都運行著負責數(shù)據(jù)同步的各類任務镜沽,包括queue updating task, queue executing task, mutations updating task, merge/mutation selecting task和mutations finalizing task。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末贱田,一起剝皮案震驚了整個濱河市缅茉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌男摧,老刑警劉巖蔬墩,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異彩倚,居然都是意外死亡筹我,警方通過查閱死者的電腦和手機扶平,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門帆离,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人结澄,你說我怎么就攤上這事哥谷。” “怎么了麻献?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵们妥,是天一觀的道長。 經(jīng)常有香客問我勉吻,道長监婶,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任齿桃,我火速辦了婚禮惑惶,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘短纵。我一直安慰自己带污,他們只是感情好,可當我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布香到。 她就那樣靜靜地躺著鱼冀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪悠就。 梳的紋絲不亂的頭發(fā)上千绪,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天,我揣著相機與錄音梗脾,去河邊找鬼翘紊。 笑死,一個胖子當著我的面吹牛藐唠,可吹牛的內(nèi)容都是我干的帆疟。 我是一名探鬼主播鹉究,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼踪宠!你這毒婦竟也來了自赔?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤柳琢,失蹤者是張志新(化名)和其女友劉穎绍妨,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體柬脸,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡他去,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了倒堕。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片灾测。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖垦巴,靈堂內(nèi)的尸體忽然破棺而出媳搪,到底是詐尸還是另有隱情,我是刑警寧澤骤宣,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布秦爆,位于F島的核電站,受9級特大地震影響憔披,放射性物質(zhì)發(fā)生泄漏等限。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一芬膝、第九天 我趴在偏房一處隱蔽的房頂上張望望门。 院中可真熱鬧,春花似錦蔗候、人聲如沸怒允。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纫事。三九已至,卻和暖如春所灸,著一層夾襖步出監(jiān)牢的瞬間丽惶,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工爬立, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留钾唬,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像抡秆,于是被迫代替她去往敵國和親奕巍。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容