This article is based on Kafka 0.10.0.
Basic operations
List all topics:
kafka-topics.sh --zookeeper localhost:2181 --list
Create a topic:
kafka-topics.sh --zookeeper localhost:2181 \
--create \
--topic earth \
--partitions 1 \
--replication-factor 1
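To verify that the topic was created with the expected partition count and replication factor, you can describe it; a minimal sketch using the same earth topic:
kafka-topics.sh --zookeeper localhost:2181 \
--describe \
--topic earth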
Producing data
Send a message to earth:
echo "The first record" | kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic earth
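The console producer reads one record per line from stdin, so you can also pipe in a file to send several records at once; a sketch assuming a hypothetical local file records.txt with one record per line:
kafka-console-producer.sh --broker-list localhost:9092 \
--topic earth < records.txt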
Send a keyed message to earth (the console producer splits each line at the first occurrence of key.separator, so the commas inside the JSON value remain part of the value):
echo '00000,{"name":"Steve", "title":"Captain America"}' | kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic earth \
--property parse.key=true \
--property key.separator=,
Consuming data
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--from-beginning
Print the message key as well:
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--from-beginning \
--property print.key=true \
--property key.separator=,
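If you only want a quick look at the data, you can cap the number of records with --max-messages; a minimal sketch that reads a single record from the beginning:
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--from-beginning \
--max-messages 1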
To consume the contents of __consumer_offsets, you first need to set exclude.internal.topics=false in consumer.properties.
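A minimal sketch of that configuration step, assuming the file lives at ~/consumer.properties (the path passed to --consumer.config below):
echo "exclude.internal.topics=false" >> ~/consumer.properties
With that in place, run: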
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic __consumer_offsets \
--from-beginning \
--formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
--consumer.config ~/consumer.properties
Topic offset statistics
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic earth \
--time -1
The trailing -1 means fetch the current maximum (latest) offset for each partition; -2 means fetch the minimum (earliest) offset.
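For example, to fetch the earliest offsets instead, a sketch against the same topic:
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic earth \
--time -2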
If you run into data skew, you can use kafka-simple-consumer-shell.sh to inspect the contents of a specific partition, for example:
kafka-simple-consumer-shell.sh --broker-list localhost:9092 \
--topic earth \
--partition 1 \
--print-offsets \
--offset 18 \
--clientId test \
--property print.key=true
High-level consumers and groups
Create a consumer.properties config file that specifies a group.id:
echo "group.id=Avengers" > consumer.properties
Then send another record:
echo "The second record" | kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic earth
Now verify the topic's current data with a consumer:
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--from-beginning \
--consumer.config consumer.properties
The result is:
The first record
The second record
Now take a look at what is stored in ZooKeeper:
[zk: localhost:2181(CONNECTED) 0] get /consumers/Avengers/offsets/earth/0
2
cZxid = 0x8200012d1d
ctime = Fri May 05 17:10:02 CST 2017
mZxid = 0x8200012d1d
mtime = Fri May 05 17:10:02 CST 2017
pZxid = 0x8200012d1d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0
The 2 on the first line is the offset committed by the group we configured, i.e. the position right after the last record it consumed. If you now run the consumer again (note: without --from-beginning):
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--consumer.config consumer.properties
there is no output at all, because the group resumes from its committed offset, which is already at the end of the partition.
To re-read the data you can reset the offset with UpdateOffsetsInZK. First add the ZooKeeper connection string to the config file created earlier:
echo "zookeeper.connect=localhost:2181" >> consumer.properties
Then run:
kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest consumer.properties earth
which prints:
updating partition 0 with new offset: 0
updated the offset for 1 partitions
Now rerunning the earlier command:
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--consumer.config consumer.properties
reads the topic from the first offset again, printing:
The first record
The second record
However, if you run the following command, i.e. with --from-beginning added:
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--from-beginning \
--consumer.config consumer.properties
you get this warning:
Found previous offset information for this group Avengers. Please use --delete-consumer-offsets to delete previous offsets metadata
You must add --delete-consumer-offsets for this to work, like so:
kafka-console-consumer.sh --zookeeper localhost:2181 \
--topic earth \
--delete-consumer-offsets \
--from-beginning \
--consumer.config consumer.properties
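For reference, UpdateOffsetsInZK also accepts latest in place of earliest if you want the group to skip to the end of the topic instead of replaying it; a sketch with the same config:
kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest consumer.properties earth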