1. kafka-0.8.x常用命令
目錄
- 1.1 啟停kafkaserver
- 1.2 建立和刪除topic
- 1.3 查看topic的最大最小offset
- 1.4 通過控制臺命令生產(chǎn)和消費消息
- 1.5 查看消費者狀態(tài)和消費詳情
- 1.6 重置消費者offset
- 1.7 查看topic的狀態(tài)和分區(qū)負(fù)載詳情
- 此文檔適用于kafka-0.10.1及之后的版本
- 此文檔所有命令默認(rèn)的路徑都是kafka的home府适,即kafka安裝目錄
1.1 啟停kafkaserver
命令:
//啟動kafka
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
//關(guān)閉kafka,由于是優(yōu)雅啟停,在進程真的結(jié)束之前可能有一些清理工作栅葡,所以不會進程不會立刻消失盖桥,等待數(shù)秒
bin/kafka-server-stop.sh config/server.properties
//等待數(shù)秒勃痴,如果kafka進程仍無法停止恬惯,執(zhí)行
kill pid
//如果仍然無法停止
kill -9 pid
啟動之后誉尖,可以查看進程狭郑、日志是否正常
1.2 建立和刪除topic
kafka server端需要配置delete.topic.enable=true才可以刪除腹暖,否則執(zhí)行刪除無效,只會將topic標(biāo)記為刪除翰萨,不會執(zhí)行真正的刪除
命令:
bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --create --replication-factor REPLICA_NUM --partitions PARTITION_NUM --topic TOPIC_NAME
- ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址脏答,PORT是zookeeper監(jiān)聽的端口。多個host port之間用逗號隔開
- TOPIC_NAME是要創(chuàng)建的topic的名稱
- PARTITION_NUM是要創(chuàng)建的topic的分區(qū)數(shù)
- REPLICA_NUM是要創(chuàng)建的topic的每個分區(qū)的副本數(shù)
- zookeeper集群不需要全部列上亩鬼,給出一個可用的zk地址和端口即可
例如殖告,新建一個名為new_created_topic的topic
//新建topic,topic name為new_created_topic雳锋,zk為筆者的kafka集群使用的zk
bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --create --replication-factor 3 --partitions 16 --topic new_created_topic
//查看建立的topic的狀態(tài)
bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --describe --topic new_created_topic
顯示:
Topic:new_created_topic PartitionCount:16 ReplicationFactor:3 Configs:
Topic: new_created_topic Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: new_created_topic Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: new_created_topic Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: new_created_topic Partition: 3 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: new_created_topic Partition: 4 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: new_created_topic Partition: 5 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: new_created_topic Partition: 6 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: new_created_topic Partition: 7 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: new_created_topic Partition: 8 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: new_created_topic Partition: 9 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: new_created_topic Partition: 10 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: new_created_topic Partition: 11 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: new_created_topic Partition: 12 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: new_created_topic Partition: 13 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: new_created_topic Partition: 14 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: new_created_topic Partition: 15 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
其中:
第一列是topic名稱黄绩;第二列是partition序號;第三列是leader副本所在的kafka broker id玷过,和kafka配置的id一致
第三列是副本分配在哪些broker上爽丹,其值是broker id列表筑煮;第四列是處于同步狀態(tài)的副本所在的broker id列表
//查看topic列表,將顯示server上所有的topic的列表粤蝎,不顯示詳細信息
bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --list
//刪除topic咆瘟,刪除后執(zhí)行上一步的list,可以看到topic已經(jīng)被刪除诽里;如果server沒有配置允許刪除袒餐,則只會標(biāo)記marked for deleted
bin/kafka-topics.sh --zookeeper 172.21.37.197:22181 --delete --topic new_created_topic
1.3 查看topic的最大最小offset
查看最大最小offset可以讓我們知道某個topic的數(shù)據(jù)量是多少,并且知道每個partition的earliest和latest offset谤狡。這里解釋下earliest offset和latest offset:
- earliest offset:最早的offset灸眼,也就是最小的offset,在早期的kafka客戶端api中,稱之為smallest墓懂。假設(shè)partition 0的消息是從1127到7892焰宣,那么1127就是該partition的earliest offset
- latest offset:最新offset,也就是最大的offset,在早期的kafka客戶端api中捕仔,稱之為largest匕积。。假設(shè)partition 0的消息是從1127到7892榜跌,那么7892就是該partition的latest offset
該命令在查看kafka消息的存量數(shù)量或者手工調(diào)整消費者的offset時需要用到闪唆。如果kafka消費了無效的offset,即消費的offset小于實際offset最小值或者大于實際offset最大值钓葫,將返回一個錯誤悄蕾。
命令:
//查看最小offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME --time -2
//查看最大offset
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME --time -1
例如
//這里筆者事先建立了一個名為tttttttt_topic的topic
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.194:39092 --topic tttttttt_topic --time -2
tttttttt_topic:2:1
tttttttt_topic:1:2
tttttttt_topic:0:7
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.194:39092 --topic tttttttt_topic --time -1
tttttttt_topic:2:698
tttttttt_topic:1:630
tttttttt_topic:0:639
其中partition 2的earlist offset是1,latest offset是698础浮,則里面有698條消息帆调;所有的加起來,便是該topic的可用信息總量豆同。
1.4 通過控制臺命令生產(chǎn)和消費消息
在開發(fā)測試的過程中番刊,有時候也在生產(chǎn)上,為了驗證我們的topic可以正常被寫入消息或者可以被正常消費影锈,通常我們需要一個簡單芹务、直接的生產(chǎn)/消費工具。kafka在其bin里為我們提供了這樣的腳本精居,可以直接在kafka server上通過命令的方式模擬生產(chǎn)者和消費者锄禽。
1.4.1 控制臺生產(chǎn)者-console producer
命令:
bin/kafka-console-producer.sh --broker-list BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --topic TOPIC_NAME
- BROKER_HOST是kafka server的ip地址,PORT是server的監(jiān)聽端口靴姿。多個host port之間用逗號隔開
- TOPIC_NAME是要發(fā)送消息的topic名稱
- 不用將所有的kafka server的ip和port都列在后面沃但,事實上只要有一個或者的broker的ip和port配上就可以了,客戶端連接上server之后會自動獲取其他節(jié)點的信息
例如佛吓,發(fā)送消息到topic為tttttttt_topic
bin/kafka-console-producer.sh --zookeeper 172.21.37.194:39092 --topic tttttttt_topic
>testmessage-111
>testmessage-222
>
1.4.2 控制臺消費者-console consumer
命令:
bin/kafka-console-consumer.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --topic TOPIC_NAME [--from-beginning]
- ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址宵晚,PORT是zookeeper監(jiān)聽的端口垂攘。多個host port之間用逗號隔開
- zookeeper集群不需要全部列上,給出一個可用的zk地址和端口即可
- TOPIC_NAME是要發(fā)送消息的topic名稱
- --from-beginning是可選項淤刃,如果不加晒他,則consumer從當(dāng)前的latest offset開始消費,也就是說逸贾,執(zhí)行了命令之后陨仅,需要有新的消息寫入,這里才能消費到铝侵;如果加上了該參數(shù)灼伤,則從earliest offset開始消費。具體需不需要看場合咪鲜,但是如果加了該參數(shù)狐赡,注意大量消息的瘋狂刷屏,一般我們會結(jié)合grep或者定向到文件里
例如疟丙,從topic為tttttttt_topic的主題里消費消息
bin/kafka-console-consumer.sh --zookeeper 172.21.37.197:22181 --topic tttttttt_topic
testmessage-444
testmessage-555
1.5 查看消費者狀態(tài)和消費詳情
注意本節(jié)的命令僅適用于將消費者信息存放在zookeeper上的情況颖侄。kafka-0.8.x系列的版本,都是默認(rèn)將consumer的狀態(tài)信息寫在zookeeper上享郊。如果應(yīng)用自己修改了消費者信息存儲的位置览祖,那么此章節(jié)可能不適用。
有時候我們需要關(guān)心消費者應(yīng)用的狀態(tài)拂蝎,一般消費者應(yīng)用會自己通過日志獲知當(dāng)前消費到了哪個topic的哪個partition的哪個offset穴墅,但當(dāng)消費者出問題之后惶室,或者出于監(jiān)控的原因温自,我們需要知道消費者的狀態(tài)和詳情,那么需要借助kafka提供的相關(guān)命令皇钞。
命令:
首先我們要連接到zookeeper悼泌,在一臺有zookeeper客戶端的機器上執(zhí)行
bin/zkCli.sh -server ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2
- ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址,PORT是zookeeper監(jiān)聽的端口夹界。多個host port之間用逗號隔開
- zookeeper集群不需要全部列上馆里,給出一個可用的zk地址和端口即可
接著,通過在zk客戶端上執(zhí)行命令可柿,查看消費者信息鸠踪,
get /consumers/CONSUMER_GROUP_NAME/offsets/TOPIC_NAME/PARTITION_INDEX
- CONSUMER_GROUP_NAME是消費者組的名稱
- TOPIC_NAME是消費的topic的名稱
- PARTITION_INDEX是要查看的partition的編號
例如,查看topic為dcs_async_redis_to_db的主題复斥,group為consumer_buffer_20181025_2的消費者信息:
[zk: 172.21.132.66:22181,172.21.132.67:22181(CONNECTED) 0] get /consumers/consumer_buffer_20181025_2/offsets/dcs_async_redis_to_db/2
119901
cZxid = 0x40069bbb3
ctime = Thu Oct 25 15:34:43 CST 2018
mZxid = 0x500007194
mtime = Mon Oct 29 13:55:54 CST 2018
pZxid = 0x40069bbb3
cversion = 0
dataVersion = 386
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
類似的营密,可以更換partition編號,去查看其它partition被消費到了什么位置
1.6 重置消費者offset
注意本節(jié)的命令僅適用于將消費者信息存放在broker上的情況目锭。對于kafka 0.10.1之后评汰,默認(rèn)的java api都是將消費信息保存在broker上纷捞,那么適用于以下命令;如果是早期版本被去,或者是人為將消費者信息保存在zk或者其他地方主儡,那么此處的命令將無效。如果是保存在zk上惨缆,可以參照第二章的相關(guān)命令糜值。
通過上一節(jié),我們知道了如何查看消費者詳情坯墨,那么在生產(chǎn)實踐中臀玄,有時我們可能希望認(rèn)為修改消費者消費到的offset位置,以達到重新消費畅蹂,或者跳過一部分消息的目的健无,這時候重置offset的工具就非常實用。
命令:
與上一節(jié)類似液斜,重置消費者的offset累贤,也是僅限于基于zookeeper存儲消費者信息的模式。那么第一步是登錄zookeeper
bin/zkCli.sh -server ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2
- ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址少漆,PORT是zookeeper監(jiān)聽的端口臼膏。多個host port之間用逗號隔開
- zookeeper集群不需要全部列上,給出一個可用的zk地址和端口即可
登錄之后示损,執(zhí)行set即可
set /consumers/CONSUMER_GROUP_NAME/offsets/TOPIC_NAME/PARTITION_INDEX NEW_OFFSET
- CONSUMER_GROUP_NAME是消費者組的名稱
- TOPIC_NAME是消費的topic的名稱
- PARTITION_INDEX是要查看的partition的編號
- NEW_OFFSET是想設(shè)置的offset值
注意NEW_OFFSET應(yīng)該被設(shè)置成合理的offset值渗磅,不可以超過partition的earlist offset和largest offset,否則消費者可能會接收到OffsetOutofRange的異常检访,當(dāng)收到該異常時始鱼,消費者可能會根據(jù)不同的策略選擇從earliest開始繼續(xù)消費或者從latest開始消費
例如,消費者將dcs_async_redis_to_db的信息全部都消費了脆贵,假設(shè)此時我想將每個分區(qū)的offset都拉到200去医清,從200往后重新消費,則通過命令
[zk: 172.21.132.66:22181,172.21.132.67:22181(CONNECTED) 1] set /consumers/consumer_buffer_20181025_2/offsets/dcs_async_redis_to_db/2 100
cZxid = 0x40069bbb3
ctime = Thu Oct 25 15:34:43 CST 2018
mZxid = 0x6000289a7
mtime = Wed Oct 31 21:54:58 CST 2018
pZxid = 0x40069bbb3
cversion = 0
dataVersion = 387
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0
即可卖氨,這時候可以使用上一節(jié)的命令進行檢查
[zk: 172.21.132.66:22181,172.21.132.67:22181(CONNECTED) 2] get /consumers/consumer_buffer_20181025_2/offsets/dcs_async_redis_to_db/2
100
cZxid = 0x40069bbb3
ctime = Thu Oct 25 15:34:43 CST 2018
mZxid = 0x6000289a7
mtime = Wed Oct 31 21:54:58 CST 2018
pZxid = 0x40069bbb3
cversion = 0
dataVersion = 387
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 0 -
可以看到offset已經(jīng)被調(diào)整到了200
在生產(chǎn)實踐中会烙,如果需要修改消費者消費的offset,可能需要編寫一個腳本來達到批量的設(shè)置筒捺,否則在分區(qū)數(shù)很多的情況下柏腻,一個個設(shè)置比較低效,且可能出錯系吭。
注意:需要先停掉消費者五嫂,才可以成功執(zhí)行該命令
另,可以通過直接更換消費者group id的方式村斟,配合消費者默認(rèn)的消費策略贫导,可以達到類似的效果抛猫,反而更加簡單、高效和安全孩灯。
1.7 查看topic的狀態(tài)和分區(qū)負(fù)載詳情
當(dāng)broker出現(xiàn)宕機闺金,恢復(fù)之后,我們可以看下topic的leader是否負(fù)載均衡峰档。因為kafka的所有讀寫消息的請求败匹,都是發(fā)送到partition leader上的,因此在生產(chǎn)環(huán)境讥巡,負(fù)載均衡顯得尤其重要掀亩。
命令:
bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --describe --topic TOPIC_NAME
- ZOOKEEPER_HOST是kafka所使用的zookeeper的ip地址,PORT是zookeeper監(jiān)聽的端口欢顷。多個host port之間用逗號隔開
- 類似的槽棍,zookeeper集群不需要全部列上,給出一個可用的zk地址和端口即可
例如抬驴,查看topic為dcs_storm_collect_info_ios的負(fù)載信息:
bin/kafka-topics.sh --zookeeper 172.21.22.161:2181 --describe --topic dcs_storm_collect_info_ios
顯示信息 :
Topic:dcs_storm_collect_info_ios PartitionCount:16 ReplicationFactor:2 Configs:
Topic: dcs_storm_collect_info_ios Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 1 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 3 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 5 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 7 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 9 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 11 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 13 Leader: 1 Replicas: 1,0 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: dcs_storm_collect_info_ios Partition: 15 Leader: 1 Replicas: 1,0 Isr: 0,1
本例中查看的是topic dcs_storm_collect_info_ios炼七,其分區(qū)數(shù)是16(PartitionCount:16),副本數(shù)是2(ReplicationFactor:2)
根據(jù)第一行的分區(qū)信息
Topic: dcs_storm_collect_info_ios Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
我們知道編號為0的分區(qū)布持,其副本在broker id 0 和id 1上(Replicas: 0,1)
其分區(qū)的首領(lǐng)也就是leader是broker 0豌拙,也就是編號為0的那個kafka節(jié)點(Leader: 0)
在其所有副本(分布在0和1上)中,處于同步著的狀態(tài)副本是0和1(Isr: 0,1)
假設(shè)此時broker 0宕機了题暖,那么應(yīng)該看到的信息會是
Topic: dcs_storm_collect_info_ios Partition: 0 Leader: 1 Replicas: 1 Isr: 1
如果發(fā)現(xiàn)以下現(xiàn)象說明kafka異常:
- 某個topic的每個分區(qū)按傅,同步副本數(shù)量和設(shè)定的副本數(shù)量不一致
- 某個topic的每個分區(qū),leader的id數(shù)值是-1或者none