Saturday night,继续超短文模式。
在ClickHouse集群中,我们可以在DDL语句上附加ON CLUSTER <cluster_name>
的语法,使得该DDL语句执行一次即可在集群中所有实例上都执行,简单方便。每执行一条分布式DDL,会在配置文件中<distributed_ddl><path>
指定的ZooKeeper路径上写一条执行记录(路径默认为/clickhouse/task_queue/ddl
)。如下图所示。
但是,这个队列默认似乎不会自动清理,造成znode不断增长,官方文档中也没有提供对应的参数来控制。考虑到手动删除znode可能会有风险,遂去ClickHouse源码中寻找蛛丝马迹,最终在dbms/src/interpreters/DDLWorker.h里找到如下定义:
/// Minimum interval between two cleanup passes. A pass is triggered by a
/// new-node event, and is skipped if the previous pass ran less than this
/// many seconds ago.
Int64 cleanup_delay_period = 60; // 1 minute (in seconds)
/// A queue entry is deleted once its age exceeds this value.
Int64 task_max_lifetime = 7 * 24 * 60 * 60; // 1 week (in seconds)
/// Soft cap on queue length; when exceeded, the oldest entries are deleted first.
size_t max_tasks_in_queue = 1000;
- cleanup_delay_period:检查DDL记录清理的间隔,单位为秒,默认60秒。
- task_max_lifetime:分布式DDL记录可以保留的最大时长,单位为秒,默认保留7天。
- max_tasks_in_queue:分布式DDL队列中可以保留的最大记录数,默认为1000条。
將以上參數(shù)加入config.xml的<distributed_ddl>
一節(jié)即可涛救。
<distributed_ddl>
<!-- Path in ZooKeeper to queue with DDL queries -->
<path>/clickhouse/task_queue/ddl</path>
<!-- Run the cleanup check at most once per this many seconds -->
<cleanup_delay_period>60</cleanup_delay_period>
<!-- Delete queue entries older than this many seconds (86400 = 1 day) -->
<task_max_lifetime>86400</task_max_lifetime>
<!-- Keep at most this many entries in the queue; oldest are deleted first -->
<max_tasks_in_queue>200</max_tasks_in_queue>
</distributed_ddl>
ClickHouse内部有专门的线程来清理DDL队列,具体逻辑位于DDLWorker.cpp中,不难,代码摘录如下。
void DDLWorker::runCleanupThread()
{
setThreadName("DDLWorkerClnr");
LOG_DEBUG(log, "Started DDLWorker cleanup thread");
Int64 last_cleanup_time_seconds = 0;
while (!stop_flag)
{
try
{
cleanup_event->wait();
if (stop_flag)
break;
Int64 current_time_seconds = Poco::Timestamp().epochTime();
if (last_cleanup_time_seconds && current_time_seconds < last_cleanup_time_seconds + cleanup_delay_period)
{
LOG_TRACE(log, "Too early to clean queue, will do it later.");
continue;
}
auto zookeeper = tryGetZooKeeper();
if (zookeeper->expired())
continue;
cleanupQueue(current_time_seconds, zookeeper);
last_cleanup_time_seconds = current_time_seconds;
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
}
}
/// One cleanup pass over the distributed DDL queue in ZooKeeper.
/// An entry is deleted when either:
///   * its age (derived from the znode's ctime) exceeds task_max_lifetime, or
///   * the queue holds more than max_tasks_in_queue entries (oldest deleted first).
/// Entries with active workers or a lock held elsewhere are skipped; an error on
/// one entry is logged and does not abort the rest of the pass.
void DDLWorker::cleanupQueue(Int64 current_time_seconds, const ZooKeeperPtr & zookeeper)
{
LOG_DEBUG(log, "Cleaning queue");
Strings queue_nodes = zookeeper->getChildren(queue_dir);
/// Presumably sorts entries oldest-first; the "outside max window" check below
/// relies on that ordering — confirm in filterAndSortQueueNodes.
filterAndSortQueueNodes(queue_nodes);
/// Everything before first_non_outdated_node exceeds the max_tasks_in_queue cap.
size_t num_outdated_nodes = (queue_nodes.size() > max_tasks_in_queue) ? queue_nodes.size() - max_tasks_in_queue : 0;
auto first_non_outdated_node = queue_nodes.begin() + num_outdated_nodes;
for (auto it = queue_nodes.cbegin(); it < queue_nodes.cend(); ++it)
{
if (stop_flag)
return;
String node_name = *it;
String node_path = queue_dir + "/" + node_name;
String lock_path = node_path + "/lock";
Coordination::Stat stat;
String dummy;
try
{
/// Already deleted by a concurrent cleaner — nothing to do.
if (!zookeeper->exists(node_path, &stat))
continue;
/// Delete node if its lifetime is expired (according to task_max_lifetime parameter).
/// ZooKeeper reports ctime in milliseconds; convert to seconds before comparing.
constexpr UInt64 zookeeper_time_resolution = 1000;
Int64 zookeeper_time_seconds = stat.ctime / zookeeper_time_resolution;
bool node_lifetime_is_expired = zookeeper_time_seconds + task_max_lifetime < current_time_seconds;
/// If too many nodes in task queue (> max_tasks_in_queue), delete oldest one
bool node_is_outside_max_window = it < first_non_outdated_node;
if (!node_lifetime_is_expired && !node_is_outside_max_window)
continue;
/// Skip if there are active nodes (it is a weak guard: a worker may register
/// under /active after this check).
if (zookeeper->exists(node_path + "/active", &stat) && stat.numChildren > 0)
{
LOG_INFO(log, "Task " << node_name << " should be deleted, but there are active workers. Skipping it.");
continue;
}
/// Usage of the lock is not necessary now (tryRemoveRecursive correctly removes node in a presence of concurrent cleaners)
/// But the lock will be required to implement system.distributed_ddl_queue table
auto lock = createSimpleZooKeeperLock(zookeeper, node_path, "lock", host_fqdn_id);
if (!lock->tryLock())
{
LOG_INFO(log, "Task " << node_name << " should be deleted, but it is locked. Skipping it.");
continue;
}
if (node_lifetime_is_expired)
LOG_INFO(log, "Lifetime of task " << node_name << " is expired, deleting it");
else if (node_is_outside_max_window)
LOG_INFO(log, "Task " << node_name << " is outdated, deleting it");
/// Deleting: first remove all children except the lock we hold, then remove
/// the lock node and the task node itself in one atomic multi-op.
{
Strings childs = zookeeper->getChildren(node_path);
for (const String & child : childs)
{
if (child != "lock")
zookeeper->tryRemoveRecursive(node_path + "/" + child);
}
/// Remove the lock node and its parent atomically
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(lock_path, -1));
ops.emplace_back(zkutil::makeRemoveRequest(node_path, -1));
zookeeper->multi(ops);
/// The lock znode is already gone via the multi-op; tell the lock object so
/// its destructor does not try to remove it again.
lock->unlockAssumeLockNodeRemovedManually();
}
}
catch (...)
{
LOG_INFO(log, "An error occured while checking and cleaning task " + node_name + " from queue: " + getCurrentExceptionMessage(false));
}
}
}
民那晚安晚安。