ReplicatedMergeTree是ClickHouse最常用的表引擎之一沙庐,該引擎和MergeTree一樣都繼承自MergeTreeData, 和MergeTree共享相同的底層存儲層實現(xiàn)俊嗽。ReplicatedMergeTree區(qū)別于MergeTree引擎的地方在于其實現(xiàn)了多副本數(shù)據(jù)存儲滴某,并借助zookeeper進行多副本之間的數(shù)據(jù)同步。
本文針對ReplicatedMergeTree引擎如何實現(xiàn)副本間數(shù)據(jù)同步,結(jié)合部分源代碼(20.10版本)做一個簡單剖析。
0 術(shù)語解釋
data part : clickhouse中的數(shù)據(jù)組織單位,一次INSERT會生成一個或多個data part, 一個data part里的數(shù)據(jù)存放在同一個文件夾里辙诞。
mutation : 對一個或多個data parts的內(nèi)容數(shù)據(jù)的修改操作,最典型的觸發(fā)mutation的操作是:ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE(詳見mutations)轻抱。
1 哪些操作會引發(fā)數(shù)據(jù)同步飞涂?
這里說的數(shù)據(jù)同步包括了元數(shù)據(jù)和內(nèi)容數(shù)據(jù)的同步。在ClickHouse中祈搜,數(shù)據(jù)插入(INSERT)较店,元數(shù)據(jù)變更(ALTER),內(nèi)容數(shù)據(jù)變更(MUTATION)容燕,合并(MERGE)等操作都會引發(fā)數(shù)據(jù)同步梁呈。
1.1 INSERT
clickhouse執(zhí)行ReplicatedMergeTree表的插入操作時,會將數(shù)據(jù)以data part為單位寫入到磁盤/內(nèi)存里蘸秘,每個data part寫入完成后都會做一個commit操作, 這個commit操作會生成一個GET_PART類型的log entry并上傳到zookeeper中供其他副本拉取同步官卡,詳見源碼蝗茁。
由此可見,INSERT語句引發(fā)的是data part級別的內(nèi)容數(shù)據(jù)的同步寻咒。
1.2 ALTER METADATA
這里之所以稱為'ALTER METADATA'而不直接稱'ALTER'哮翘,是因為,在ClickHouse中update和delete也是通過ALTER語句實現(xiàn)的毛秘,即ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE.? ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE語句觸發(fā)的是mutations饭寺,所以它們被歸為mutation的范疇。
這里說的是會對表的元數(shù)據(jù)進行修改的ALTER語句叫挟。ClickHouse將ReplicatedMergeTree表的元數(shù)據(jù)信息存放在zookeeper和本地文件中艰匙,ALTER語句會觸發(fā)對這兩個地方的元數(shù)據(jù)信息進行修改。注意抹恳,在zookeeper上旬薯,表的元數(shù)據(jù)信息會在表級別和副本級別的znodes(即下文說的zookeeper_path和replica_path)下都進行存儲。
alter語句首先會修改掉zookeeper上zookeeper_path下的元數(shù)據(jù)适秩,然后將修改后的元數(shù)據(jù)包裝生成一個ALTER_METADATA類型的ReplicatedMergeTreeLogEntryData對象,并將其上傳到zookeeper上硕舆,供所有副本(包括當前副本)拉取更新秽荞。詳見源碼。
注意抚官,這里說的”會對表的元數(shù)據(jù)進行修改的ALTER語句“ 有一些同時也會觸發(fā)mutation扬跋,下文將進一步介紹。
1.3 MERGE
merge操作用于將若干個data parts合并為單個data part凌节,ClickHouse會在后臺選擇需要做merge的data parts钦听,然后創(chuàng)建一個MERGE_PARTS類型的ReplicatedMergeTreeLogEntryData對象,并將其上傳到zookeeper上倍奢,供所有副本(包括當前副本)拉取同步朴上。詳見源碼StorageReplicatedMergeTree::mergeSelectingTask()?.
1.4 MUTATION
mutation是對data parts數(shù)據(jù)的修改操作,一個mutation可能包含了對很多個data parts的修改卒煞。
除了ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE痪宰,以下類型的ALTER語句也會觸發(fā)mutation :
? ? 1. DROP_COLUMN
? ? 2. DROP_INDEX
? ? 3. RENAME_COLUMN
? ? 4. 非meta only的MODIFY_COLUMN
對于會觸發(fā)mutation的ALTER語句,ClickHouse會生成一個ReplicatedMergeTreeMutationEntry對象畔裕,并將其上傳到zookeeper衣撬,供所有副本(包括當前副本)拉取同步。
詳見源碼StorageReplicatedMergeTree::alter?和 AlterCommand::isRequireMutationStage.
2 ZooKeeper的結(jié)構(gòu)和作用
zookeeper是ReplicateMergeTree表副本之間進行數(shù)據(jù)同步的橋梁扮饶,存儲了復制表的各類元數(shù)據(jù)信息具练。這里介紹幾個和數(shù)據(jù)同步相關(guān)的主要信息。
2.1 zookeeper_path和replica_path
一張ReplicatedMergeTree表的所有副本在zookeeper上共享一個znode路徑甜无,我們稱之為zookeeper_path扛点,這個路徑在建表時指定哥遮,我們對該路徑的命名規(guī)范是:
????????/clickhouse/tables/{layer}-{shard}/<table_name>
其中l(wèi)ayer和shard都是macro,各個副本節(jié)點取值可能不同占键。
zookeeper_path是表級別的昔善,其下有一個replicas節(jié)點,內(nèi)部包含了各個副本的元信息節(jié)點畔乙,即:
????? ? /clickhouse/tables/{layer}-{shard}/<table_name>/replicas/<replica>
我們稱副本的元信息節(jié)點為replica_path.
2.2 表的元信息(metadata, columns)
在zookeeper_path和replica_path下都保存了表的metadata和columns信息君仆,元數(shù)據(jù)變更時會先修改zookeeper_path下的metadata和columns,然后會異步地將更新同步到replica_path下牲距。
除此之外返咱,replica_path下面還會有一個metadata_version用于記錄當前元數(shù)據(jù)信息的版本,這個值就是zookeeper_path下的metadata節(jié)點的dataVersion.
zookeeper_path下的metadata和columns信息的更新詳見源碼牍鞠。
replica_path下的metadata, columns和metadata_version的更新見源碼咖摹。
2.3 同步日志和副本隊列(log, queue)
2.3.1 同步日志(log)
log位于zookeeper_path下面,用于保存所有副本間的數(shù)據(jù)同步日志(稱為log entry)难述,ClickHouse支持的LogEntry類型有:
從各個副本觸發(fā)的數(shù)據(jù)變更操作都會上傳對應的log entry到log節(jié)點下萤晴,比如:
? ? ? ? 1. 在一個副本上執(zhí)行alter table ... add column操作會上傳一個ALTER_METADATA類型的log entry到log節(jié)點下。
? ? ? ? 2. 在一個副本上執(zhí)行insert into ...操作會上傳一個GET_PART類型的log entry到log節(jié)點下胁后。
zookeeper_path/log下的log entry在zookeeper上的znode命名規(guī)范是log-seqNum, 其中seqNum為創(chuàng)建sequential znode生成的自增序號店读,如log-0000000001。
2.3.2 副本隊列 (queue)
queue位于replica_path下面攀芯,保存了當前副本需要同步的log entries. queue里面的log entries是從log中拉取的(由下文說的queue updating task負責拉韧投稀),被所有副本都拉取過的log entry就可以從log中刪除掉, 刪除操作由后臺清理線程執(zhí)行侣诺。
replica_path下還有一個log_pointer節(jié)點殖演,保存了當前副本從log拉取到的最大log entry id + 1,即下一個需要拉取的log entry的id.
replica_path/queue下的entry在zookeeper上的znode命名規(guī)范是queue-seqNum, 其中seqNum為創(chuàng)建sequential znode生成的自增序號年鸳,如queue-0000000001趴久。
2.4 數(shù)據(jù)變更(mutations)
在zookeeper_path下面有一個mutations節(jié)點,ALTER操作觸發(fā)的mutation操作都會先封裝成mutation entry 后被上傳到這個節(jié)點搔确。mutation entry保存了mutation操作的相關(guān)信息朋鞍,包括:
????a. source replica : 觸發(fā)mutation的副本節(jié)點。
? ? b. mutation commands : 需要apply到data parts上的mutation指令妥箕。
? ? c. block numbers :這是一個partition_id -> block_number的映射滥酥,保存了每個partition的mutation version。mutation只會被apply到partition中block number小于mutation version的blocks(因為merge的關(guān)系畦幢,每個data part可能包含一個或多個blocks坎吻,并且data part包含的block numbers不一定是連續(xù)的)。
????d. alter version : mutation操作對應的metadata version宇葱,只有ALTER MODIFY/DROP 操作觸發(fā)的mutation才會有非-1的值瘦真,對于ALTER TABLE ... DELETE 和 ALTER TABLE ... UPDATE操作觸發(fā)的mutation操作刊头,這個值都為-1。這個值是為了將ALTER MODIFY/DROP的ALTER METADATA操作和MUTATION操作關(guān)聯(lián)起來诸尽。
mutation entry的結(jié)構(gòu)詳見ReplicatedMergeTreeMutationEntry原杂。
一個mutation entry會被轉(zhuǎn)換成若干個MUTATE_PART類型的log entry被投放到同步日志(zookeeper_path/log)中,下文介紹merge/mutation selecting task時會介紹其轉(zhuǎn)換過程您机。
另外穿肄,在replica_path下面有一個mutation_pointer節(jié)點,用于記錄副本上最后完成的mutation的id(也就是znode name).
3 負責數(shù)據(jù)同步的tasks
每個副本節(jié)點都有一系列的負責數(shù)據(jù)同步的任務际看,這些任務會與zookeeper交互咸产,獲取最新的同步任務,然后在本地內(nèi)存中維護任務隊列并借助任務池調(diào)度執(zhí)行仲闽,執(zhí)行完成后會更新zookeer上的對應狀態(tài)脑溢。下面我們逐一介紹各類同步任務。
3.1 queue updating task
queue updating task負責當前副本上同步日志(log entry)隊列的更新赖欣。
ReplicatedMergeTree表對應的StoreageReplicatedMergeTree類中有一個queue_updating_task對象屑彻,它是由后臺調(diào)度池(BackgroundSchedulePool)調(diào)度執(zhí)行的任務,其執(zhí)行的函數(shù)是StorageReplicatedMergeTree::queueUpdatingTask().
3.1.1 工作內(nèi)容
queue updating task做的事情包括:
1. 從zookeeper拉取zookeeper_path/log下的所有l(wèi)og entries顶吮,并從replica_path/log_pointer獲取當前副本的log_pointer (即下一個需要拉取的log entry的id)社牲。
2. 根據(jù)log_pointer在log entries中定位到需要拉取的entries,如果log_pointer為空云矫,則從log entries中最小的entry開始拉取。
3. 將需要拉取的log entries都同步到replica_path/queue下面汗菜,這里的同步就是以選定的log entries的內(nèi)容為節(jié)點內(nèi)容让禀,一個一個地在replica_path/queue下創(chuàng)建新的sequential znode. 注意,這里創(chuàng)建的znode名稱并不會和zookeeper_path/log下的一致陨界。
4. 更新replica_path/log_pointer的內(nèi)容為拉取到的log entries中最大的entry id + 1.
5. 將拉取到的log entries插入到內(nèi)存中的同步任務隊列里巡揍,即插入到ReplicatedMergeTreeQueue::queue。
6. 觸發(fā)queue executing task(下文會介紹)執(zhí)行菌瘪。
這些步驟的實現(xiàn)細節(jié)請見源碼ReplicatedMergeTreeQueue::pullLogsToQueue.
3.1.2 觸發(fā)時機
queue updating task被觸發(fā)執(zhí)行的時機有:
1. 每張ReplicatedMergeTree表都有一個restarting thread腮敌,這個thread會被周期性調(diào)度執(zhí)行,默認間隔是1分鐘(由參數(shù)zookeeper_session_expiration_check_period控制)俏扩。當復制表首次被加載或者zookeeper session過期后被重新加載時糜工,restarting thread會激活并調(diào)度queue updating task執(zhí)行。
2. queue updating task每次被執(zhí)行時, 會在從zookeeper_path/log獲取log entries時添加一個watch callback (詳見源碼)录淡,這個callback會調(diào)度執(zhí)行queue updating task, 所以每當zookeeper_path/log下有新的log entry生成時捌木,queue updating task就會被調(diào)度執(zhí)行。
3. 如果queue updating task在拉取和更新隊列時發(fā)生異常嫉戚,在異常處理中會再次調(diào)度執(zhí)行queue updating task刨裆,調(diào)度執(zhí)行的間隔時間是QUEUE_UPDATE_ERROR_SLEEP_MS(1秒)澈圈。
3.1.3 為什么既要拉取到replica_path/queue又要拉取到本地內(nèi)存中?
1. 拉取到replica_path/queue是為了方便記錄每個副本有哪些是待執(zhí)行的entries帆啃,執(zhí)行完成的直接從replica_path/queue里刪除即可瞬女。同時,當某個副本重啟或者某張ReplicatedMergeTree表被detach后再attach時努潘,ClickHouse可以從replica_path/queue快速加載待執(zhí)行的entries (詳見源碼ReplicatedMergeTreeQueue::load)诽偷。
2. 拉取到本地內(nèi)存相當于做了一個緩存,避免每次遍歷待執(zhí)行entries都需要從zookeeper拉取慈俯,提升了性能渤刃,也可以降低zookeeper的負載。
3.2 queue executing task
queue updating task把同步日志(log entries)拉取到本地內(nèi)存后贴膘,我們需要一個任務去執(zhí)行這些log entries. 這個任務就是queue executing task.?
queue executing task在StoreageReplicatedMergeTree類中被實現(xiàn)為一個通過BackgroundProcessingPool調(diào)度執(zhí)行的task卖子,即 StorageReplicatedMergeTree::queue_task_handle.
queue_task_handle執(zhí)行的函數(shù)是StorageReplicatedMergeTree::queueTask.
下面介紹queueTask函數(shù)的主要執(zhí)行步驟。
3.2.1 選擇要處理的log entry
從ReplicatedMergeTreeQueue::queue中選擇一個log entry, 選擇邏輯如下(實現(xiàn)細節(jié)詳見ReplicatedMergeTreeQueue::shouldExecuteLogEntry):
1. 對于GET_PART類型的log entry刑峡,如果它生成的data part被某個正在執(zhí)行的log entry的resulting data parts所包含洋闽,則當前entry此輪不被選擇。
2. 對于MERGE_PARTS類型的log entry:
????2.1 如果它生成的data part被某個正在執(zhí)行的log entry的resulting data parts所包含突梦,則當前entry此輪不被選擇诫舅。
????2.2 如果它的source data parts中存在某個data part還處于被生成過程中,則當前entry此輪不被選擇宫患。
????2.3 如果它是TTL類型merge(包括TTL_DELETE, TTL_RECOMPRESS刊懈,詳見MergeType)且在運行的TTL類型merges的個數(shù)(全局值,不是針對當前表的)>= max_number_of_merges_with_ttl_in_pool (配置)娃闲,則當前entry此輪不被選擇虚汛。
????2.4 如果source data parts的總數(shù)據(jù)量大于max_source_parts_size(max_source_parts_size是運行時確定的變量,確定邏輯詳見源碼)且background pool中沒有足夠的free threads做large merges (如果有足夠free threads則不考慮source data parts的數(shù)據(jù)量是否超過max_source_parts_size皇帮,詳見源碼1和源碼2)卷哩,則當前entry此輪不被選擇。
3. 對于MUTATE_PART類型的log entry:
????3.1 如果它生成的data part被某個正在執(zhí)行的log entry的resulting data parts所包含属拾,則當前entry此輪不被選擇将谊。
????3.2 如果它的source data parts中存在某個data part還處于被生成過程中,則當前entry此輪不被選擇渐白。
????3.3 如果source data parts的總數(shù)據(jù)量大于max_source_parts_size(max_source_parts_size是運行時確定的變量尊浓,確定邏輯詳見源碼),則當前entry此輪不被選擇纯衍。
????3.4 如果它是某個alter modify/drop query的一部分眠砾,則需要按alter語句的發(fā)生順序依次執(zhí)行,如果存在比它早的alter語句尚未執(zhí)行完成,則當前entry此輪不被選擇褒颈。詳見源碼柒巫。
4. 對于ALTER_METADATA類型的log entry, 需要按照alter語句的發(fā)生順序依次執(zhí)行,如果存在比它早的alter語句尚未執(zhí)行完成谷丸,則當前entry此輪不被選擇堡掏。詳見源碼。
5. 對于DROP_RANGE 和 REPLACE_RANGE類型的log entry刨疼,因為它們會等待生成指定范圍內(nèi)的data parts的log entries執(zhí)行完成泉唁,為了避免死鎖,如果已經(jīng)有DROP_RANGE 和 REPLACE_RANGE類型的log entry正在執(zhí)行揩慕,則當前entry此輪不被選擇亭畜。詳見源碼。
6. 對于MERGE_PARTS 和 MUTATE_PART類型的log entry迎卤,如果merge/mutation/TTL類型merge被cancel了(如果被cancel則后續(xù)不能提交對應類別的操作拴鸵,在運行的該類操作也會拋異常),則對應操作的log entry也不會被選中執(zhí)行蜗搔。詳見源碼1和源碼2.
如果上述條件都不滿足劲藐,則該log entry被選中執(zhí)行。每次queue executing task執(zhí)行只會選擇一個log entry進行處理樟凄。
3.2.2 處理選中的log entry
選中l(wèi)og entry后聘芜,StorageReplicatedMergeTree::queueTask函數(shù)會調(diào)用ReplicatedMergeTreeQueue::processEntry對log entry進行處理。processEntry函數(shù)的第三個參數(shù)是用于執(zhí)行l(wèi)og entry的函數(shù)缝龄,此處傳入的是StorageReplicatedMergeTree::executeLogEntry.
3.2.2.1 execute log entry
log entry的處理邏輯如下:
1. 對于DROP_RANGE和REPLACE_RANGE類型的log entry, 調(diào)用對應的StorageReplicatedMergeTree::executeDropRange 和?StorageReplicatedMergeTree::executeReplaceRange函數(shù)進行處理汰现,這里對executeDropRange的執(zhí)行過程做簡要介紹(另一個讀者可自行分析源碼):
????1.1 從zookeeper上的replica_path/queue節(jié)點和內(nèi)存中移出所有會生成對應range(這里的range是指定partition內(nèi)指定的block number的范圍)中的data part的log entries (這里會刪除zookeeper上replica_path/queue節(jié)點下的entry),如果有正在運行中的叔壤,則等待其運行完成瞎饲。詳見源碼ReplicatedMergeTreeQueue::removePartProducingOpsInRange。
????1.2 移除當前working set(即MergeTreeData::data_parts_by_info)中屬于對應range的data parts(詳見MergeTreeData::removePartsInRangeFromWorkingSet)百新。注意企软,這里并沒有從磁盤刪除這些data parts庐扫,只是修改了內(nèi)存中的一些狀態(tài)饭望,比如,將data part的state改成IMergeTreeDataPart::State::Outdated(詳見MergeTreeData::removePartsFromWorkingSet)形庭,真正從磁盤刪除data parts的操作是通過喚醒cleanup線程完成的铅辞。
? ? 1.3 從zookeeper上的replica_path/parts節(jié)點中刪除在1.2中被移除的data parts,這里如果刪除失敗則會重試萨醒,默認重試5次斟珊。
? ? 1.4 喚醒cleanup線程,從磁盤清理掉已被移除的data parts富纸。
2. 對于GET_PART類型的log entry囤踩,處理邏輯如下:
????2.1 判斷目標data part在當前副本是否已經(jīng)存在或被已存在的data part所包含旨椒,這里不僅會檢查active data parts,也會檢查處于MergeTreeDataPartState::PreCommitted狀態(tài)的data parts堵漱,詳見源碼综慎。
? ? 2.2 判斷目標data part在zookeeper的replica_path/parts節(jié)點下是否已經(jīng)存在。
? ? 2.3 如果2.1和2.2的判斷結(jié)果都是已存在勤庐,則直接跳過當前l(fā)og entry示惊,不做處理(此時executeLogEntry返回true垄提,表示處理成功)罩扇。
? ? 2.4 否則自脯,進一步判斷目標data part是否是某次失敗的write with quorum操作的resulting data part华坦,判斷方法是查看zookeeper上zookeeper_path/quorum/failed_parts節(jié)點下是否存在目標data part锅知。如果存在阴幌,則和2.3一樣寄纵,直接跳過當前l(fā)og entry立美,不做處理(此時executeLogEntry返回true类嗤,表示處理成功)糊肠。
? ? 2.5 否則,從其他有目標data part的副本節(jié)點去拉取目標data part遗锣。拉取是調(diào)用StorageReplicatedMergeTree::executeFetch實現(xiàn)的货裹。具體拉取過程暫不剖析,后續(xù)data fetcher相關(guān)文章中再介紹精偿。
3. 對于MERGE_PARTS類型的log entry弧圆,處理邏輯如下:
? ? 3.1 同GET_PART的2.1 ~ 2.4操作。
? ? 3.2 調(diào)用StorageReplicatedMergeTree::tryExecuteMerge函數(shù)笔咽,該函數(shù)會根據(jù)各種情況/狀態(tài)及相關(guān)配置而決定是執(zhí)行該merge操作還是建議從其他副本節(jié)點拉取目標data part搔预。具體merge邏輯暫不剖析,后續(xù)merge相關(guān)文章中再介紹叶组。
? ? 3.3 如果tryExecuteMerge返回false拯田,即建議從其他副本拉取data part,則調(diào)用StorageReplicatedMergeTree::executeFetch進行拉取甩十。
4. 對于MUTATE_PART類型的log entry船庇,處理邏輯如下:
? ? 4.1?同GET_PART的2.1 ~ 2.4操作。
? ? 4.2 調(diào)用StorageReplicatedMergeTree::tryExecutePartMutation函數(shù)侣监,和StorageReplicatedMergeTree::tryExecuteMerge一樣鸭轮,該函數(shù)也會根據(jù)各種情況/狀態(tài)及相關(guān)配置而決定是執(zhí)行該mutate操作還是建議從其他副本節(jié)點拉取目標data part。具體mutate過程此處不做剖析橄霉,讀者可自行分析源碼窃爷。
? ? 4.3?如果tryExecutePartMutation返回false,即建議從其他副本拉取data part,則調(diào)用StorageReplicatedMergeTree::executeFetch進行拉取按厘。
5. 對于ALTER_METADATA類型的log entry, 調(diào)用StorageReplicatedMergeTree::executeMetadataAlter函數(shù)進行處理医吊,處理邏輯如下:
? ? 5.1 從該log entry中解析出要更新的metadata和columns信息。
? ? 5.2 更新zookeeper上的replica_path/metadata 和 replica_path/columns節(jié)點的內(nèi)容為5.1中的metadata和columns信息逮京。
? ? 5.3 更新當前副本本地的元信息(對于ordinary database下的表遮咖,會修改本地保存的sql文件),詳見StorageReplicatedMergeTree::setTableStructure造虏。
3.2.2.2 remove processed entry
executeLogEntry執(zhí)行完成后會返回一個bool值御吞,表示是否執(zhí)行成功。如果執(zhí)行成功漓藕,則processEntry會調(diào)用ReplicatedMergeTreeQueue::removeProcessedEntry從內(nèi)存中的隊列和zookeeper上的replica_path/queue節(jié)點中移除剛執(zhí)行完成的log entry陶珠。
3.3 mutations updating task
mutations updating task做的事情是:基于zookeeper上的zookeeper_path/mutations節(jié)點更新內(nèi)存中的mutation狀態(tài)數(shù)據(jù)。這里說的狀態(tài)數(shù)據(jù)主要是mutations_by_znode 和 mutations_by_partition享钞。
其中揍诽,mutations_by_znode保存了mutation id(也就是zookeeper_path/mutations下的節(jié)點名稱)到MutationStatus對象的映射,MutationStatus中保存了單個mutation的狀態(tài)信息栗竖,包括entry, parts_to_do, is_done等暑脆。
mutations_by_partition的類型是std::unordered_map<String, std::map<Int64, MutationStatus *>>, 保存的是Partition -> (block_number -> MutationStatus)的雙層映射關(guān)系狐肢,這樣我們通過partition id和data part包含的block number就可以找到某個data part需要執(zhí)行的mutation操作添吗。
注意,mutations_by_partition中的第二層key(ie. block_number)就是mutation version份名,每個data part的MergeTreePartInfo::getDataVersion(如果data part被mutate過或者包含mutated part碟联,則返回的是對應的mutation version,否則返回改data part中最小的block number)會和mutation version做比較僵腺,如果小于這個version鲤孵,則說明data part需要執(zhí)行這個mutation。
mutations updating task是一個通過BackgroundSchedulePool調(diào)度執(zhí)行的task辰如,即StorageReplicatedMergeTree::mutations_updating_task普监。mutations_updating_task執(zhí)行的函數(shù)是StorageReplicatedMergeTree::mutationsUpdatingTask,其主要工作是通過調(diào)用ReplicatedMergeTreeQueue::updateMutations函數(shù)完成的琉兜。
下面我們看看mutations updating task的主要工作內(nèi)容有哪些凯正。
3.3.1 工作內(nèi)容
1. 從zookeeper_path/mutations節(jié)點拉取得到所有的mutation entries的id (即對應的znode name)。
2. 從mutations_by_znode和mutations_by_partition中移除掉zookeeper_path/mutations中不存在的mutation呕童,出現(xiàn)這種mutation的原因可能是:
? ? 2.1 被KILL MUTATION語句殺掉的漆际。
? ? 2.2 已經(jīng)指向完成后被mutations finalizing task (下文會介紹)移除掉的淆珊。
3. 將zookeeper_path/mutations中新的mutations(即在mutations_by_znode和mutations_by_partition中不存在的)加入到mutations_by_znode和mutations_by_partition中夺饲。
4. 對每個新增的mutation,填充其parts_to_do (即需要mutate的data parts),填充的步驟如下:
? ? 4.1 遍歷mutation entry中的block_numbers往声,對于其中的每個partition擂找, 從當前副本當前表的現(xiàn)存data parts(ReplicatedMergeTreeQueue::current_parts)中找到屬于這個partition并且MergeTreePartInfo::getDataVersion小于對應mutation version的data parts(詳見getPartNamesToMutate),將這些data parts添加到parts_to_do中浩销。
? ? 4.2 遍歷內(nèi)存中的同步任務隊列(ReplicatedMergeTreeQueue::queue)中的每個log entry贯涎,在每個log entry的resulting data parts中找到partition存在于mutation entry的block_numbers中且MergeTreePartInfo::getDataVersion小于對應mutation version的data parts,將這些data parts添加到parts_to_do中慢洋。
5. 如果步驟3中獲取到的新的mutations不為空塘雳,則調(diào)度merge/mutation selecting task(下文會介紹)執(zhí)行。
6. 如果在步驟4中發(fā)現(xiàn)某些新增的mutation的parts_to_do為空(表名這個mutation可能可以終止了)普筹,則調(diào)度mutations finalizing task執(zhí)行败明。
3.3.2 觸發(fā)時機
mutations updating task被觸發(fā)執(zhí)行的時機和queue updating task基本一致:
1. 和queue updating task一樣,當復制表首次被加載或者zookeeper session過期后被重新加載時太防,restarting thread會激活并調(diào)度mutations updating task執(zhí)行妻顶。
2. mutations updating task每次被執(zhí)行時, 會在從zookeeper_path/mutations獲取mutation entries時添加一個watch callback (詳見源碼),這個callback會調(diào)度執(zhí)行mutations updating task, 所以每當zookeeper_path/mutations下有新的mutations entry生成時蜒车,mutations updating task就會被調(diào)度執(zhí)行讳嘱。
3. 如果mutations updating task在拉取和更新mutations時發(fā)生異常,在異常處理中會再次調(diào)度執(zhí)行mutations updating task酿愧,調(diào)度執(zhí)行的間隔時間是QUEUE_UPDATE_ERROR_SLEEP_MS(1秒)沥潭。
3.4 merge/mutation selecting task
上文中曾提到,zookeeper_path/muations中的一個mutation entry會被轉(zhuǎn)換成多個MUTATE_PART類型的log entry并投放到zookeeper_path/log中嬉挡。這個轉(zhuǎn)換工作是由merge/mutation select task完成的叛氨。顧名思義,這個task會負責merge和mutation任務的選擇和提交棘伴。
merge/mutation selecting task是一個通過BackgroundSchedulePool調(diào)度執(zhí)行的task寞埠,即StorageReplicatedMergeTree::merge_selecting_task。注意焊夸,這里的命名雖然是merge_selecting_task, 但該task涵蓋了merge和mutation任務的選擇和提交仁连。
merge_selecting_task執(zhí)行的函數(shù)是StorageRreplicatedMergeTree::mergeSelectingTask,我們看看merge/mutate selecting task做了哪些工作阱穗。
3.4.1 工作內(nèi)容
1. 統(tǒng)計同步任務隊列(ReplicatedMergeTreeQueue::queue)中MERGE_PARTS和MUTATE_PART類型的log entry的數(shù)量饭冬,詳見源碼。
2. 如果步驟1中統(tǒng)計得到的merge + mutation的任務總數(shù)大于或等于max_replicated_merges_in_queue(配置)揪阶,則ClickHouse認為隊列中merge和mutation任務已經(jīng)夠多了昌抠,不會選擇和提交新的merge/mutation任務,跳至步驟5鲁僚,否則繼續(xù)執(zhí)行步驟3炊苫。
3. 根據(jù)配置和一些runtime狀態(tài)(比如裁厅,磁盤空閑空間)計算出max_source_parts_size_for_merge 和?max_source_part_size_for_mutation,即最大可合并的source parts大小 和 最大可mutate的source part大小侨艾。這兩個值的計算邏輯見MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge 和?MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation执虹。相關(guān)配置有:
? ? ????a.?max_bytes_to_merge_at_min_space_in_pool
? ? ? ? b.?max_bytes_to_merge_at_max_space_in_pool
? ? ? ? c.?number_of_free_entries_in_pool_to_lower_max_size_of_merge
? ? ? ? d.?number_of_free_entries_in_pool_to_execute_mutation
4. 選擇是提交MERGE_PARTS任務,還是MUTATE_PART任務唠梨,或者都不執(zhí)行:
? ? 4.1 如果max_source_parts_size_for_merge為0(即沒空間做merge了)袋励,或者沒有需要merge且可以merge的data parts(選取過程詳見MergeTreeDataMergerMutator::selectPartsToMerge),則不提交MERGE_PARTS任務当叭,跳至步驟4.3茬故,否則繼續(xù)執(zhí)行步驟4.2。
? ? 4.2 調(diào)用StorageReplicatedMergeTree::createLogEntryToMergeParts構(gòu)建并提交一個MERGE_PARTS log entry到zookeeper_path/log中蚁鳖。然后跳至步驟5均牢,否則繼續(xù)執(zhí)行步驟4.3。
? ? 4.3 如果滿足以下條件之一才睹,則不提交MUTATE_PART徘跪,跳至步驟5,否則繼續(xù)執(zhí)行步驟4.4琅攘。
? ? ? ? a.?max_source_part_size_for_mutation為0垮庐。?
? ? ? ? b.?mutations_by_partition的size為0,即沒有需要執(zhí)行的mutation操作坞琴。
? ? ? ? c. 同步任務隊列中的MUTATE_PART任務數(shù)大于等于max_replicated_mutations_in_queue(配置)哨查。
? ? 4.4 遍歷當前表的committed data parts(MergeTreeData::getDataPartsVector),選擇一個滿足以下全部條件的data part 剧辐。如果沒有滿足條件的data part則跳至步驟5寒亥,否則繼續(xù)執(zhí)行步驟4.5。
? ? ? ? a. size on disk <=?max_source_part_size_for_mutation
? ? ? ? b. 在mutations_by_partition中存在data part所在partition的mutation操作荧关,并且對應的mutation version高于data part當前的mutation version溉奕。詳見ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion和?ReplicatedMergeTreeQueue::getCurrentMutationVersionImpl。
? ? 4.5 在步驟4.4中選中一個data part后忍啤,調(diào)用StorageReplicatedMergeTree::createLogEntryToMutatePart構(gòu)建并提交一個MUTATE_PART log entry到zookeeper_path/log中加勤。
5. 如果上述步驟沒有成功提交log entry,則調(diào)度merge_selecting_task在MERGE_SELECTING_SLEEP_MS(5秒)后執(zhí)行同波。如果成功提交了鳄梅,則立刻調(diào)度merge_selecting_task執(zhí)行。
說明:
1. 每次merge_selecting_task的調(diào)用執(zhí)行未檩,只會提交一個MERGE_PARTS或MUTATE_PART任務到zookeeper_path/log中戴尸。
2. 根據(jù)MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation的實現(xiàn)可知,只有在空閑線程充足的情況下冤狡,才會考慮提交MUTATE_PART孙蒙。這是為了留更多線程給MERGE_PARTS项棠。而且,在步驟4中做選擇時马篮,也是先判斷是否可以提交MERGE_PARTS,如果有可提交的MERE_PARTS怜奖,則不會考慮MUTATE_PART浑测。 由此可見,在ClickHouse中歪玲,MERGE_PARTS任務的優(yōu)先級是高于MUTATE_PART任務的迁央。
3.?merge/mutation selecting task只在leader副本上執(zhí)行(常見所有副本都是leader的情況)。
3.4.2 觸發(fā)時機
merge/mutation selecting task被觸發(fā)執(zhí)行的時機有:
1. 當前副本贏得leader election后滥崩,激活并調(diào)度merge/mutation selecting task執(zhí)行岖圈。
2.?merge/mutation selecting task在執(zhí)行完成后,如果結(jié)果成功钙皮,則立刻調(diào)度merge/mutation selecting task執(zhí)行蜂科,如果失敗,則調(diào)度merge/mutation selecting task延遲MERGE_SELECTING_SLEEP_MS(5秒)后執(zhí)行短条。
3.?ReplicatedMergeTree表的插入操作在完成commit data part后, 會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行导匣,詳見源碼。
4. 在mutations updating task發(fā)現(xiàn)新的mutation并完成狀態(tài)更新后茸时,會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行贡定。
5. queue executing task在成功執(zhí)行MERG_PARTS log entry后,會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行可都,詳見StorageReplicatedMergeTree::tryExecuteMerge缓待。
6. queue executing task在成功執(zhí)行MUTATE_PART log entry后,會觸發(fā)merge/mutation selecting task調(diào)度執(zhí)行渠牲,詳見StorageReplicatedMergeTree::tryExecutePartMutation旋炒。
3.5 mutations finalizing task
merge/mutation selecting task將zookeeper_path/mutations下的mutation entry轉(zhuǎn)換成一個個MUTATE_PART log entry并投放到zookeeper_path/log下后,queue executing task會執(zhí)行這些MUTATE_PART任務签杈。當一個mutation entry相關(guān)的所有MUTATE_PART任務都完成了国葬,那么這個mutation entry就可以被結(jié)束了。
mutations finalizing task就是用來結(jié)束mutation entry的任務芹壕。mutations finalizing task是一個通過BackgroundSchedulePool調(diào)度執(zhí)行的任務汇四,即StorageReplicatedMergeTree::mutations_finalizing_task,其調(diào)用的函數(shù)是StorageReplicatedMergeTree::mutationsFinalizingTask踢涌,該函數(shù)的主要功能是通過調(diào)用ReplicatedMergeTreeQueue::tryFinalizeMutations實現(xiàn)的通孽。
下面介紹一下mutations finalizing task的工作內(nèi)容。
3.5.1 工作內(nèi)容
1. 遍歷mutations_by_znode中的所有mutations睁壁,對于is_done標志位false的mutations:
? ? 1.1 如果znode小于等于mutation_pointer(mutation已經(jīng)完成)背苦,則執(zhí)行以下操作:
? ? ? ? a. is_done置true互捌。
? ? ? ? b. 清空parts_to_do。
? ? ? ? c. 調(diào)用ReplicatedMergeTreeAltersSequence::finishDataAlter更新alter的狀態(tài)信息(如果對應的metadata alter也完成了行剂,則會從alter sequence移除該alter)秕噪。
? ? 1.2 如果znode大于mutation_pointer,且parts_to_do為空(mutation可能已完成)厚宰,則將該mutation放入candidates中腌巾。
2. 對于每個candidate,調(diào)用ReplicatedMergeTreeMergePredicate::isMutationFinished判斷其是否已經(jīng)結(jié)束铲觉,判斷邏輯是:
????2.1 如果mutation涉及的blocks還有在committing階段的澈蝙,則判定mutation為unfinished。
????2.2 如果現(xiàn)存data parts 或者 同步任務隊列(ReplicatedMergeTreeQueue::queue)的resulting data parts中還有需要執(zhí)行改mutation的data parts (詳見getPartNamesToMutate)撵幽,則判定mutation為unfinished灯荧。
????2.3 如果2.1和2.2都不成立,則判定mutation為finished盐杂。
3. 如果存在finished candidates逗载,則:
? ? 3.1 更新mutation_pointer : 將zookeeper上的replica_path/mutation_pointer和內(nèi)存里的ReplicatedMergeTreeQueue::mutation_pointer都設置為finished candidates中最大的znode name。
? ? 3.2 更新每個finished candidate的內(nèi)存狀態(tài)(MutationStatus):
? ? ? ? a.?is_done置true链烈。
? ? ? ? b.?調(diào)用ReplicatedMergeTreeAltersSequence::finishDataAlter更新alter的狀態(tài)信息撕贞。
說明:對于mutation finalizing的邏輯尚未完全理解,這里只是根據(jù)源碼列出執(zhí)行過程测垛,后續(xù)會在mutation相關(guān)文章做進一步解釋捏膨。
3.5.2 觸發(fā)時機
1. 當復制表首次被加載或者zookeeper session過期后被重新加載時,restarting thread會激活并調(diào)度mutations finalizing task執(zhí)行食侮。
2. 在更新mutations相關(guān)狀態(tài)時号涯,如果發(fā)現(xiàn)some_mutations_are_probably_done(即parts_to_do為空),則調(diào)度mutations finalizing task執(zhí)行锯七。
3. 如果mutations finalizing task在finalize mutations發(fā)生異常 或者 成功地finalize了一個或多個mutations链快,那么,則調(diào)度mutations finalizing task延遲MUTATIONS_FINALIZING_SLEEP_MS(1秒)執(zhí)行眉尸。
4. 如果mutations finalizing task順利運行后沒有finalize任何mutation域蜗,則調(diào)度mutations finalizing task延遲MUTATIONS_FINALIZING_IDLE_SLEEP_MS(5秒)執(zhí)行。
4 總結(jié)
1. 復制表的數(shù)據(jù)同步包括元數(shù)據(jù)同步和內(nèi)容數(shù)據(jù)同步噪猾。
2. 引發(fā)數(shù)據(jù)同步任務的操作包括INSERT霉祸,ALTER METADATA,MERGE袱蜡,MUTATION等丝蹭。
3. zookeeper在復制表的數(shù)據(jù)同步中起到橋梁作用,znode路徑分為表級別zookeeper_path和表副本級別的replica_path, 關(guān)鍵節(jié)點包括log坪蚁,log_pointer, queue, mutations, mutation_pointer等奔穿。
4. 各個副本節(jié)點都運行著負責數(shù)據(jù)同步的各類任務镜沽,包括queue updating task, queue executing task, mutations updating task, merge/mutation selecting task和mutations finalizing task。