ClickHouse數(shù)據(jù)導(dǎo)入

一 概述

目前Kafka數(shù)據(jù)導(dǎo)入ClickHouse的常用方案有兩種,一種是通過ClickHouse內(nèi)置的Kafka表引擎實現(xiàn),另一種是借助數(shù)據(jù)流組件,如Logstash盯捌。

以下會分別介紹這兩種方案。

二 數(shù)據(jù)導(dǎo)入方案

方案一 Kafka表引擎

方案介紹

Kafka表引擎基于librdkafka庫實現(xiàn)與Kafka的通信蘑秽,但它只充當(dāng)一個數(shù)據(jù)管道的角色饺著,負(fù)責(zé)拉取Kafka中的數(shù)據(jù);所以還需要一張物化視圖將Kafka引擎表中的數(shù)據(jù)實時同步到本地MergeTree系列表中肠牲。

為了提高性能幼衰,接受的消息被分組為 maxinsertblocksize 大小(由kafkamax_block_size參數(shù)空值缀雳,默認(rèn)值為65536)的塊渡嚣。如果未在 streamflushinterval_ms 毫秒(默認(rèn)500 ms)內(nèi)形成塊,則不關(guān)心塊的完整性肥印,都會將數(shù)據(jù)刷新到表中识椰。


方案一.png

相關(guān)配置參數(shù):
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

<kafka>
  <debug>cgrp</debug>
  <auto_offset_reset>smallest</auto_offset_reset>
</kafka>
 
<kafka_logs>
  <retry_backoff_ms>250</retry_backoff_ms>
  <fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>

使用示例

1)部署Kafka

# 部署ZK(略)

# 部署Kafka
docker run -d --name=kafka21 --hostname=node21 --network=host -e KAFKA_ADVERTISED_HOST_NAME=node21 -e KAFKA_ZOOKEEPER_CONNECT=node21:2181,node22:2181,node23:2181 wurstmeister/kafka
 
# 創(chuàng)建Topic
./bin/kafka-topics.sh --zookeeper node21:2181 --create --topic test --partitions 1 --replication-factor 1
 
# 測試
./bin/kafka-console-producer.sh --broker-list node21:9092 --topic test
>{"id":1,"name":"tom"}
>{"id":2,"name":"jerry"}
 
./bin/kafka-console-consumer.sh --bootstrap-server node21:9092 --topic test --from-beginning
{"id":1,"name":"tom"}
{"id":2,"name":"jerry"}

2)創(chuàng)建Kafka引擎表

CREATE TABLE kafka_queue
(
  warehouse_id Int64,
  product_id  Int64,
  product_name String
)
ENGINE = Kafka()
SETTINGS
  kafka_broker_list='node21:9092',
  kafka_topic_list='test',
  kafka_group_name='test',
  kafka_format='JSONEachRow',
  kafka_skip_broken_messages=100

必選參數(shù):

  • kafka_broker_list:brokers列表
  • kafka_topic_list:Topic列表
  • kafka_group_name:消費組名稱
  • kafka_format:消息格式

可選參數(shù):

  • kafka_row_delimiter:每個消息體(記錄)之間的分隔符,默認(rèn)為‘‘\0’’
  • kafka_schema:對應(yīng)Kafka的schema參數(shù)
  • kafka_num_consumers:消費者數(shù)量(即線程數(shù))深碱,默認(rèn)值為1
  • kafka_skip_broken_messages:數(shù)據(jù)解析失敗時腹鹉,允許跳過的失敗的數(shù)據(jù)行數(shù),默認(rèn)值為0
  • kafka_max_block_size:每次可發(fā)送的最大數(shù)據(jù)量敷硅,默認(rèn)值等于max_block_size大小
  • kafka_commit_every_batch:執(zhí)行kafka commit的頻率功咒,默認(rèn)值為0,即當(dāng)一整個Block數(shù)據(jù)塊完全寫入數(shù)據(jù)表后才執(zhí)行commit绞蹦;如果設(shè)置為1力奋,則每寫完一個Batch批次的數(shù)據(jù)就會執(zhí)行一次commit(一次Block寫入操作由多次Batch寫入操作組成)

3)創(chuàng)建數(shù)據(jù)表
使用已有的數(shù)據(jù)表,以下只給出了分布表的創(chuàng)建語句坦辟。

CREATE TABLE warehouse_dist ON CLUSTER cluster_1
(
    warehouse_id Int64,
    product_id Int64,
    product_name String
)
ENGINE = Distributed(cluster_1, default, warehouse_local, warehouse_id)

4)創(chuàng)建物化視圖

CREATE MATERIALIZED VIEW kafka_view TO warehouse_dist AS SELECT * FROM kafka_queue

方案二 Logstash

方案介紹

與Elasticsearch寫入類似刊侯,通過Logstash的ClickHouse插件,訂閱Kafka中的數(shù)據(jù)并寫入CH中锉走。其中,ClickHouse插件調(diào)用HTTP接口完成數(shù)據(jù)寫入藕届。


方案二.png

使用示例

1)部署Logstash
部署Logstash挪蹭,并安裝ClickHouse插件:

bin/logstash-plugin install logstash-output-clickhouse

2)創(chuàng)建Logstash配置文件

input {
  stdin {
      codec => "json"
  }
}
 
output {
  stdout {
    codec => "json"
  }
  clickhouse {
    http_hosts => ["http://192.168.167.21:8123"]
    table => "warehouse_dist"
    request_tolerance => 1
    flush_size => 1000
    pool_max => 1000
  }
}
 
filter {
  mutate {
    remove_field => [ "@timestamp", "host", "@version" ]
  }
}

相關(guān)參數(shù):

  • save_on_failure:發(fā)送失敗是否保存數(shù)據(jù),默認(rèn)值為true
  • save_dir:發(fā)送失敗的數(shù)據(jù)的存儲路徑休偶,默認(rèn)為/tmp
  • automatic_retries:連接的重試次數(shù)梁厉,默認(rèn)值為1
  • request_tolerance:發(fā)送失敗(響應(yīng)碼不是200)的重試次數(shù),默認(rèn)值為5
  • backoff_time:下一次重試連接或發(fā)送請求的時間词顾,默認(rèn)值為3s
  • flush_size:每次批量發(fā)送的數(shù)據(jù)大小八秃,默認(rèn)值為50
  • idle_flush_time:每次數(shù)據(jù)批量發(fā)送的時間間隔

3)啟動Logstash

./bin/logstash -f config/logstash.conf

三 總結(jié)

Kafka引擎表和Logstash都是常見的數(shù)據(jù)導(dǎo)入方式,

  • Logstash可以將Kafka和ClickHouse解耦肉盹,與Elasticsearch數(shù)據(jù)的導(dǎo)入方式保持一直
  • Kafka引擎表是CH官方提供的數(shù)據(jù)導(dǎo)入方式昔驱,可靠性上會更好;Logstash的ClickHouse插件為第三方貢獻(xiàn)上忍,且已經(jīng)一年多沒有更新了
  • Kafka引擎表是基于librdkafka實現(xiàn)的骤肛,有更豐富的配置參數(shù)

參考:《ClickHouse原理解析與應(yīng)用實踐》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市窍蓝,隨后出現(xiàn)的幾起案子腋颠,更是在濱河造成了極大的恐慌,老刑警劉巖吓笙,帶你破解...
    沈念sama閱讀 212,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件淑玫,死亡現(xiàn)場離奇詭異,居然都是意外死亡面睛,警方通過查閱死者的電腦和手機絮蒿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,755評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來侮穿,“玉大人歌径,你說我怎么就攤上這事∏酌” “怎么了回铛?”我有些...
    開封第一講書人閱讀 158,369評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長克锣。 經(jīng)常有香客問我茵肃,道長,這世上最難降的妖魔是什么袭祟? 我笑而不...
    開封第一講書人閱讀 56,799評論 1 285
  • 正文 為了忘掉前任验残,我火速辦了婚禮,結(jié)果婚禮上巾乳,老公的妹妹穿的比我還像新娘您没。我一直安慰自己,他們只是感情好胆绊,可當(dāng)我...
    茶點故事閱讀 65,910評論 6 386
  • 文/花漫 我一把揭開白布氨鹏。 她就那樣靜靜地躺著,像睡著了一般压状。 火紅的嫁衣襯著肌膚如雪仆抵。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,096評論 1 291
  • 那天,我揣著相機與錄音镣丑,去河邊找鬼舔糖。 笑死,一個胖子當(dāng)著我的面吹牛莺匠,可吹牛的內(nèi)容都是我干的金吗。 我是一名探鬼主播,決...
    沈念sama閱讀 39,159評論 3 411
  • 文/蒼蘭香墨 我猛地睜開眼慨蛙,長吁一口氣:“原來是場噩夢啊……” “哼辽聊!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起期贫,我...
    開封第一講書人閱讀 37,917評論 0 268
  • 序言:老撾萬榮一對情侶失蹤跟匆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后通砍,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玛臂,經(jīng)...
    沈念sama閱讀 44,360評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,673評論 2 327
  • 正文 我和宋清朗相戀三年封孙,在試婚紗的時候發(fā)現(xiàn)自己被綠了迹冤。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,814評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡虎忌,死狀恐怖泡徙,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情膜蠢,我是刑警寧澤堪藐,帶...
    沈念sama閱讀 34,509評論 4 334
  • 正文 年R本政府宣布,位于F島的核電站挑围,受9級特大地震影響礁竞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜杉辙,卻給世界環(huán)境...
    茶點故事閱讀 40,156評論 3 317
  • 文/蒙蒙 一模捂、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蜘矢,春花似錦狂男、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至珍昨,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背镣典。 一陣腳步聲響...
    開封第一講書人閱讀 32,123評論 1 267
  • 我被黑心中介騙來泰國打工兔毙, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人兄春。 一個月前我還...
    沈念sama閱讀 46,641評論 2 362
  • 正文 我出身青樓澎剥,卻偏偏與公主長得像,于是被迫代替她去往敵國和親赶舆。 傳聞我的和親對象是個殘疾皇子哑姚,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,728評論 2 351

推薦閱讀更多精彩內(nèi)容