一 概述
目前Kafka數(shù)據(jù)導(dǎo)入ClickHouse的常用方案有兩種,一種是通過ClickHouse內(nèi)置的Kafka表引擎實現(xiàn),另一種是借助數(shù)據(jù)流組件,如Logstash盯捌。
以下會分別介紹這兩種方案。
二 數(shù)據(jù)導(dǎo)入方案
方案一 Kafka表引擎
方案介紹
Kafka表引擎基于librdkafka庫實現(xiàn)與Kafka的通信蘑秽,但它只充當(dāng)一個數(shù)據(jù)管道的角色饺著,負(fù)責(zé)拉取Kafka中的數(shù)據(jù);所以還需要一張物化視圖將Kafka引擎表中的數(shù)據(jù)實時同步到本地MergeTree系列表中肠牲。
為了提高性能幼衰,接受的消息被分組為 maxinsertblocksize 大小(由kafkamax_block_size參數(shù)空值缀雳,默認(rèn)值為65536)的塊渡嚣。如果未在 streamflushinterval_ms 毫秒(默認(rèn)500 ms)內(nèi)形成塊,則不關(guān)心塊的完整性肥印,都會將數(shù)據(jù)刷新到表中识椰。
相關(guān)配置參數(shù):
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<kafka_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>
使用示例
1)部署Kafka
# 部署ZK(略)
# 部署Kafka
docker run -d --name=kafka21 --hostname=node21 --network=host -e KAFKA_ADVERTISED_HOST_NAME=node21 -e KAFKA_ZOOKEEPER_CONNECT=node21:2181,node22:2181,node23:2181 wurstmeister/kafka
# 創(chuàng)建Topic
./bin/kafka-topics.sh --zookeeper node21:2181 --create --topic test --partitions 1 --replication-factor 1
# 測試
./bin/kafka-console-producer.sh --broker-list node21:9092 --topic test
>{"id":1,"name":"tom"}
>{"id":2,"name":"jerry"}
./bin/kafka-console-consumer.sh --bootstrap-server node21:9092 --topic test --from-beginning
{"id":1,"name":"tom"}
{"id":2,"name":"jerry"}
2)創(chuàng)建Kafka引擎表
CREATE TABLE kafka_queue
(
warehouse_id Int64,
product_id Int64,
product_name String
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list='node21:9092',
kafka_topic_list='test',
kafka_group_name='test',
kafka_format='JSONEachRow',
kafka_skip_broken_messages=100
必選參數(shù):
- kafka_broker_list:brokers列表
- kafka_topic_list:Topic列表
- kafka_group_name:消費組名稱
- kafka_format:消息格式
可選參數(shù):
- kafka_row_delimiter:每個消息體(記錄)之間的分隔符,默認(rèn)為‘‘\0’’
- kafka_schema:對應(yīng)Kafka的schema參數(shù)
- kafka_num_consumers:消費者數(shù)量(即線程數(shù))深碱,默認(rèn)值為1
- kafka_skip_broken_messages:數(shù)據(jù)解析失敗時腹鹉,允許跳過的失敗的數(shù)據(jù)行數(shù),默認(rèn)值為0
- kafka_max_block_size:每次可發(fā)送的最大數(shù)據(jù)量敷硅,默認(rèn)值等于max_block_size大小
- kafka_commit_every_batch:執(zhí)行kafka commit的頻率功咒,默認(rèn)值為0,即當(dāng)一整個Block數(shù)據(jù)塊完全寫入數(shù)據(jù)表后才執(zhí)行commit绞蹦;如果設(shè)置為1力奋,則每寫完一個Batch批次的數(shù)據(jù)就會執(zhí)行一次commit(一次Block寫入操作由多次Batch寫入操作組成)
3)創(chuàng)建數(shù)據(jù)表
使用已有的數(shù)據(jù)表,以下只給出了分布表的創(chuàng)建語句坦辟。
CREATE TABLE warehouse_dist ON CLUSTER cluster_1
(
warehouse_id Int64,
product_id Int64,
product_name String
)
ENGINE = Distributed(cluster_1, default, warehouse_local, warehouse_id)
4)創(chuàng)建物化視圖
CREATE MATERIALIZED VIEW kafka_view TO warehouse_dist AS SELECT * FROM kafka_queue
方案二 Logstash
方案介紹
與Elasticsearch寫入類似刊侯,通過Logstash的ClickHouse插件,訂閱Kafka中的數(shù)據(jù)并寫入CH中锉走。其中,ClickHouse插件調(diào)用HTTP接口完成數(shù)據(jù)寫入藕届。
使用示例
1)部署Logstash
部署Logstash挪蹭,并安裝ClickHouse插件:
bin/logstash-plugin install logstash-output-clickhouse
2)創(chuàng)建Logstash配置文件
input {
stdin {
codec => "json"
}
}
output {
stdout {
codec => "json"
}
clickhouse {
http_hosts => ["http://192.168.167.21:8123"]
table => "warehouse_dist"
request_tolerance => 1
flush_size => 1000
pool_max => 1000
}
}
filter {
mutate {
remove_field => [ "@timestamp", "host", "@version" ]
}
}
相關(guān)參數(shù):
- save_on_failure:發(fā)送失敗是否保存數(shù)據(jù),默認(rèn)值為true
- save_dir:發(fā)送失敗的數(shù)據(jù)的存儲路徑休偶,默認(rèn)為/tmp
- automatic_retries:連接的重試次數(shù)梁厉,默認(rèn)值為1
- request_tolerance:發(fā)送失敗(響應(yīng)碼不是200)的重試次數(shù),默認(rèn)值為5
- backoff_time:下一次重試連接或發(fā)送請求的時間词顾,默認(rèn)值為3s
- flush_size:每次批量發(fā)送的數(shù)據(jù)大小八秃,默認(rèn)值為50
- idle_flush_time:每次數(shù)據(jù)批量發(fā)送的時間間隔
3)啟動Logstash
./bin/logstash -f config/logstash.conf
三 總結(jié)
Kafka引擎表和Logstash都是常見的數(shù)據(jù)導(dǎo)入方式,
- Logstash可以將Kafka和ClickHouse解耦肉盹,與Elasticsearch數(shù)據(jù)的導(dǎo)入方式保持一直
- Kafka引擎表是CH官方提供的數(shù)據(jù)導(dǎo)入方式昔驱,可靠性上會更好;Logstash的ClickHouse插件為第三方貢獻(xiàn)上忍,且已經(jīng)一年多沒有更新了
- Kafka引擎表是基于librdkafka實現(xiàn)的骤肛,有更豐富的配置參數(shù)
參考:《ClickHouse原理解析與應(yīng)用實踐》