一友浸、producer 相關(guān)命令
1. kafka-console-producer 生產(chǎn)消息
使用kafka-console-producer我們可以快速往某個topic推送消息。kafka-console-producer使用的也是KafkaProducer類進(jìn)行消息的推送,因此KafkaProducer支持的參數(shù)kafka-console-producer都可以配置。
有關(guān)KafkaProducer的相關(guān)原理可以看我的這篇博客:
https://blog.csdn.net/u013332124/article/details/81321942
# 執(zhí)行下面這條命令后會進(jìn)入producer的交互界面,輸入字符串就會將消息推送到kafka集群
kafka-console-producer --broker-list 127.0.0.1:9092 --topic test
# 推送10條消息 分別是1唾琼、2、3澎剥、...锡溯、10
seq 10 | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb
# 推送hello world 到kafka集群
echo "nihao world" | kafka-console-producer --broker-list 127.0.0.1:9092 --topic yangjb
2. 使用 kafka-producer-perf-test 進(jìn)行producer的基準(zhǔn)測試
我們要修改某個配置時,經(jīng)常想知道這個配置的修改對kafka的性能會有哪些影響哑姚,這時候就可以來個基準(zhǔn)測試來衡量配置修改對producer性能的影響祭饭。kafka官方就提供了這樣一個工具,讓我們很方面的對produer的性能進(jìn)行測試叙量。
下面是kafka-producer-perf-test支持的一些參數(shù)
--topic TOPIC 指定topic
--num-records NUM-RECORDS 要推送多少條數(shù)據(jù)
--payload-delimiter PAYLOAD-DELIMITER 當(dāng)使用payload文件生成數(shù)據(jù)時甜癞,指定每條消息的之間的分割符,默認(rèn)是換行符
--throughput THROUGHPUT 推送消息時的吞吐量宛乃,單位是 messages/sec悠咱。必須指定
--producer-props PROP-NAME=PROP-VALUE [PROP-NAME=PROP-VALUE ...] 指定producer的一些配置
--producer.config CONFIG-FILE 直接指定配置文件
--print-metrics 是否要在最后輸出度量指標(biāo),默認(rèn)是false
# 生成數(shù)據(jù)的方式有兩種征炼,一種是我們指定一個消息大小析既,該工具會隨機(jī)生成一個指定大小的字符串推送,一個是我們指定一個文件谆奥,工具會從該文件中隨機(jī)選取一條消息推送
# 下面兩種方式只能選擇一種
--record-size RECORD-SIZE 指定每條消息的大小,大小是bytes
--payload-file PAYLOAD-FILE 指定文件存放目錄
下面我們測試使用producer一次推送100條數(shù)據(jù)
# 通過 --producer-props指定要連接的broker地址
# --num-records 指定一共要推送100條
# --throughput 表示吞吐量眼坏,限制每秒20
# --record-size 表示每條消息的大小是20B
kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --num-records 100 --throughput 10 --topic test --record-size 20
最后輸出報(bào)告:
52 records sent, 10.4 records/sec (0.00 MB/sec), 5.2 ms avg latency, 137.0 max latency.
100 records sent, 9.993005 records/sec (0.00 MB/sec), 3.78 ms avg latency, 137.00 ms max latency, 2 ms 50th, 4 ms 95th, 137 ms 99th, 137 ms 99.9th.
我們可以編輯一個payload.txt,輸入
hello
world
producer
perf
test
接著使用該payload.txt進(jìn)行測試
# --payload-file 指定文件地址
kafka-producer-perf-test --producer-props bootstrap.servers=127.0.0.1:9092 client.id=perftest --payload-file payload.txt --num-records 100 --throughput 100 --topic test
該工具在執(zhí)行時酸些,會讀取payload.txt的內(nèi)容宰译,然后根據(jù)--payload-delimiter
將文本分成一條條消息,接著測試的時候會隨機(jī)發(fā)送這些消息魄懂。
3. 使用 kafka-verifiable-producer 批量推送消息
kafka提供了kafka-verifiable-producer工具用于快速的推送一批消息到producer沿侈,并且可以打印出各條推送消息的元信息。推送的消息是從0開始不斷往上遞增市栗。
支持參數(shù)
--topic TOPIC 指定topic
--broker-list HOST1:PORT1[,HOST2:PORT2[...]] 指定kafka broker地址
--max-messages MAX-MESSAGES 一共要推送多少條缀拭,默認(rèn)為-1咳短,-1表示一直推送到進(jìn)程關(guān)閉位置
--throughput THROUGHPUT 推送消息時的吞吐量,單位messages/sec蛛淋。默認(rèn)為-1咙好,表示沒有限制
--acks ACKS 每次推送消息的ack值,默認(rèn)是-1
--producer.config CONFIG_FILE 指定producer的配置文件
--value-prefix VALUE-PREFIX 推送的消息默認(rèn)是遞增的數(shù)字褐荷,我們可以在這些消息前面加上指定的前綴勾效。這個前綴好像也必須是數(shù)字
demo:
# --max-messages 10 總共推送10條
# 每秒推送2條
kafka-verifiable-producer --broker-list 127.0.0.1:9092 --topic test --max-messages 10 --throughput 2
輸出:
{"timestamp":1544327879247,"name":"startup_complete"}
{"timestamp":1544327879413,"name":"producer_send_success","key":null,"value":"0","offset":91029,"partition":0,"topic":"test"}
{"timestamp":1544327879415,"name":"producer_send_success","key":null,"value":"1","offset":91030,"partition":0,"topic":"test"}
{"timestamp":1544327879904,"name":"producer_send_success","key":null,"value":"2","offset":91031,"partition":0,"topic":"test"}
{"timestamp":1544327880406,"name":"producer_send_success","key":null,"value":"3","offset":91032,"partition":0,"topic":"test"}
{"timestamp":1544327880913,"name":"producer_send_success","key":null,"value":"4","offset":91033,"partition":0,"topic":"test"}
{"timestamp":1544327881414,"name":"producer_send_success","key":null,"value":"5","offset":91034,"partition":0,"topic":"test"}
{"timestamp":1544327881918,"name":"producer_send_success","key":null,"value":"6","offset":91035,"partition":0,"topic":"test"}
{"timestamp":1544327882422,"name":"producer_send_success","key":null,"value":"7","offset":91036,"partition":0,"topic":"test"}
{"timestamp":1544327882924,"name":"producer_send_success","key":null,"value":"8","offset":91037,"partition":0,"topic":"test"}
{"timestamp":1544327883430,"name":"producer_send_success","key":null,"value":"9","offset":91038,"partition":0,"topic":"test"}
{"timestamp":1544327883942,"name":"shutdown_complete"}
{"timestamp":1544327883943,"name":"tool_data","sent":10,"acked":10,"target_throughput":2,"avg_throughput":2.1294718909710393}
4. 使用kafka-replay-log-producer進(jìn)行topic之間的消息復(fù)制
使用kafka-replay-log-producer可以將一個topic的消息復(fù)制到另外一個topic上。它的流程是先從topic拉取消息叛甫,然后推送到另一個topic葵第。
支持的參數(shù):
--broker-list <String: hostname:port> 指定broker的地址
--inputtopic <String: input-topic> 要讀取的topic名稱
--messages <Integer: count> 要復(fù)制的消息數(shù)量,默認(rèn)是-1合溺,也就是全部
--outputtopic <String: output-topic> 要復(fù)制到哪個topic
--property <String: producer properties> 可以指定producer的一些參數(shù)
--reporting-interval <Integer: size> 匯報(bào)進(jìn)度的頻率卒密,默認(rèn)是5000匯報(bào)一次
--sync 是否開啟同步模式
--threads <Integer: threads> 復(fù)制消息的線程數(shù)
--zookeeper <String: zookeeper url> zk地址
demo:
# 把topic-test的消息復(fù)制給topic-aaaa
# --messages 表示只復(fù)制前50條
kafka-replay-log-producer --broker-list 127.0.0.1:9092 --zookeeper 127.0.0.1:2181/kafka --inputtopic test --outputtopic aaaa --messages 50
二、consumer相關(guān)命令
1. kafka-console-consumer 消費(fèi)消息
使用kafka-console-consumer可以消費(fèi)指定topic的消息棠赛。底層也是使用KafkaConsumer進(jìn)行消費(fèi)的哮奇。相關(guān)消費(fèi)原理可以看我的這兩篇博客:
# 指定消費(fèi)topic-test的消息
# --from-beginning 表示如果之前沒有過消費(fèi)記錄,就從第一條開始消費(fèi)
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --group hahaeh --topic test --from-beginning
kafka-console-consumer 還支持配置一些其他的參數(shù)睛约,用戶可以自行通過 —help 參數(shù)查看鼎俘。
2. 使用 kafka-consumer-perf-test 進(jìn)行consumer的基準(zhǔn)測試
和producer一樣雏节,kafka也會consumer提供了一個命令來進(jìn)行基準(zhǔn)測試止状。
# --fetch-size 表示一次請求拉取多少條數(shù)據(jù)
# --messages 表示總共要拉取多少條數(shù)據(jù)
kafka-consumer-perf-test --broker-list 127.0.0.1:9092 --fetch-size 200 --group oka --topic test --messages 200
輸出:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-12-09 13:37:19:118, 2018-12-09 13:37:20:393, 0.0003, 0.0002, 162, 127.0588, 21, 1254, 0.0002, 129.1866
kafka-consumer-perf-test還支持其他參數(shù):
--consumer.config <String: config file> 指定Consumer使用的配置文件
--date-format 定義輸出的日志格式
--from-latest 如果之前沒有消費(fèi)記錄剂府,是否從之前消費(fèi)過的地方開始消費(fèi)
--num-fetch-threads 拉取消息的線程數(shù)
--threads 處理消息的線程數(shù)
--reporting-interval 多久輸出一次執(zhí)行過程信息
3. 使用kafka-verifiable-consumer批量拉取消息
kafka-verifiable-consumer可以批量的拉取消息娇未,其實(shí)和kafka-console-consumer命令差不多。不過使用kafka-verifiable-consumer消費(fèi)消息輸出的內(nèi)容更豐富锅知,還包括offset等信息康震,并且可以設(shè)置只讀取幾條消息等吮螺。kafka-console-consumer是有多少讀多少商膊。
# --max-messages 5 表示只拉取5條
# --verbose 表示輸出每一條消息的內(nèi)容
kafka-verifiable-consumer --broker-list 127.0.0.1:9092 --max-messages 5 --group-id hello --topic test --verbose
輸出:
{"timestamp":1544335112709,"name":"startup_complete"}
{"timestamp":1544335112862,"name":"partitions_revoked","partitions":[]}
{"timestamp":1544335112883,"name":"partitions_assigned","partitions":[{"topic":"test","partition":0}]}
{"timestamp":1544335112919,"name":"record_data","key":null,"value":"90218","topic":"test","partition":0,"offset":90877}
{"timestamp":1544335112920,"name":"record_data","key":null,"value":"90219","topic":"test","partition":0,"offset":90878}
{"timestamp":1544335112920,"name":"record_data","key":null,"value":"0","topic":"test","partition":0,"offset":90879}
{"timestamp":1544335112921,"name":"record_data","key":null,"value":"1","topic":"test","partition":0,"offset":90880}
{"timestamp":1544335112921,"name":"record_data","key":null,"value":"2","topic":"test","partition":0,"offset":90881}
{"timestamp":1544335112921,"name":"records_consumed","count":162,"partitions":[{"topic":"test","partition":0,"count":5,"minOffset":90877,"maxOffset":90881}]}
{"timestamp":1544335112928,"name":"offsets_committed","offsets":[{"topic":"test","partition":0,"offset":90882}],"success":true}
{"timestamp":1544335112943,"name":"shutdown_complete"}
kafka-verifiable-consumer命令還支持以下參數(shù):
--session-timeout consumer的超時時間
--enable-autocommit 是否開啟自動offset提交伏伐,默認(rèn)是false
--reset-policy 當(dāng)以前沒有消費(fèi)記錄時,選擇要拉取offset的策略晕拆,可以是'earliest', 'latest','none'藐翎。默認(rèn)是earliest
--assignment-strategy consumer分配分區(qū)策略,默認(rèn)是RoundRobinAssignor
--consumer.config 指定consumer的配置
4. 使用kafka-consumer-groups命令管理ConsumerGroup
?实幕、列出所有的ConsumerGroup(新舊版本api區(qū)別)
由于kafka consumer api有新版本和舊版本的區(qū)別吝镣,因此使用kafka-consumer-groups進(jìn)行g(shù)roup的管理時,內(nèi)部使用的機(jī)制也不一樣昆庇。如果我們使用—zookeeper
來連接集群末贾,則使用的是舊版本的consumer group管理規(guī)則,也就是ConsumerGroup的一些元數(shù)據(jù)是存儲在zk上的凰锡。如果使用--bootstrap-server
來連接未舟,則是面向新版本的consumer group規(guī)則圈暗。
列出使用舊版本的所有consumer group
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --list
列出新版本的所有consumer group
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --list
?掂为、刪除ConsumerGroup
刪除指定的group
# 刪除 helo和hahah 這兩個group
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --delete --group helo --group hahah
刪除指定group的指定topic的消費(fèi)記錄(topic級別的刪除僅在舊版本api中支持)
# 舊版本api 必須指定zk地址
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --delete --group helo --topic test
刪除指定topic在所有g(shù)roup中的消費(fèi)記錄(topic級別的刪除僅在舊版本api中支持)
# 舊版本api 必須指定zk地址
kafka-consumer-groups --zookeeper 127.0.0.1:2181/kafka --delete --topic test
?裕膀、列出ConsumerGroup詳情
通過—describe可以從不同維度觀察group的信息。
查看group的offset消費(fèi)記錄
# --offset是--describe的默認(rèn)選項(xiàng)勇哗,可以不傳
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --offset
輸出
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 796670 796676 6 consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13 consumer-1
查看group的member信息
# --members 表示輸出member信息
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --members
輸出
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer-1-00ab2315-e3f3-4261-8392-f9fae4668f87 /172.20.16.13 consumer-1 1
查看group的狀態(tài)
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group mytest --state
輸出:
COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS
172.20.16.13:9092 (0) range Stable 1
?昼扛、重置group的消費(fèi)記錄
當(dāng)選擇重置消費(fèi)記錄操作時,目標(biāo)Group的狀態(tài)一定不能是活躍的欲诺。也就是該group中不能有consumer在消費(fèi)抄谐。
通過 --reset-offsets
可以重置指定group的消費(fèi)記錄。和--reset-offsets
搭配的有兩個選項(xiàng)扰法,--dry-run
和--execute
蛹含,默認(rèn)是--dry-run
。
dry-run 模式
當(dāng)運(yùn)行在--dry-run
模式下塞颁,重置操作不會真正的執(zhí)行浦箱,只會預(yù)演重置offset的結(jié)果。該模式也是為了讓用戶謹(jǐn)慎的操作祠锣,否則直接重置消費(fèi)記錄會造成各個consumer消息讀取的異常酷窥。
# --shift-by -1 表示將消費(fèi)的offset重置成當(dāng)前消費(fèi)的offset-1
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --dry-run
輸出
TOPIC PARTITION NEW-OFFSET
test 0 797054
此時如果去查詢該group的消費(fèi)offset,會發(fā)現(xiàn)該group的消費(fèi)offset其實(shí)還是797055伴网,并沒有發(fā)生改變蓬推。
—execute 模式
通過--execute
參數(shù)可以直接執(zhí)行重置操作。
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --shift-by -1 --topic test --group mytest --execute
重置offset的幾種策略
該命令提供了多種offset重置策略給我們選擇如何重置offset
--to-current 直接重置offset到當(dāng)前的offset澡腾,也就是LOE
--to-datetime <String: datetime> 重置offset到指定時間的offset處
--to-earliest 重置offset到最開始的那條offset
--to-offset <Long: offset> 重置offset到目標(biāo)的offset
--shift-by <Long:n> 根據(jù)當(dāng)前的offset進(jìn)行重置沸伏,n可以是正負(fù)數(shù)
--from-file <String: path to CSV file> 通過外部的csv文件描述來進(jìn)行重置
Demo:
# 將group mytest的test 這個topic的消費(fèi)offset重置到666
# 注意如果topic分區(qū)中的最小offset比666還大的話,就會直接使用該最小offset作為消費(fèi)offset
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-offset 666
# 重置到最早的那條offset
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --topic test --group mytest --execute --to-earliest
我們再看下如何使用--from-file
來重置offset动分。首先先編輯一個文件 reset.csv
test,0,796000
3列分別是topicName,partition,offset馋评。最后輸入重置命令
kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --reset-offsets --group mytest --execute --from-file reset.csv
三、replica數(shù)據(jù)一致性校驗(yàn)
通過kafka-replica-verification命令可以檢查指定topic的各個partition的replic的數(shù)據(jù)是否一致刺啦。
kafka-replica-verification --broker-list 127.0.0.1:9092
默認(rèn)是檢查全部topic留特,可以通過指定topic-white-list
來指定只檢查一些topic。
replica一致性檢查主要是根據(jù)partition的HW來檢查的玛瘸,大概原理是在所有的broker都開啟一個fetcher蜕青,然后拉取數(shù)據(jù)做檢查各個replica的數(shù)據(jù)是否一致。因此糊渊,進(jìn)行該檢查時要保證所有的broker都在線右核,否則該工具會一直阻塞直到broker全部啟動。