由于consumer在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障兜叨,consumer恢復(fù)后穿扳,需要從故障前的位置的繼續(xù)消費(fèi),所以consumer需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset国旷,以便故障恢復(fù)后繼續(xù)消費(fèi)矛物。
創(chuàng)建一個(gè)topic
kafka-topics.sh --create --topic bigdata --zookeeper zkKafka1:2181 --partitions 2 --replication-factor 2
Created topic "bigdata".
連接上這個(gè)topic準(zhǔn)備發(fā)消息
kafka-console-producer.sh --broker-list zkKafka1:9092 --topic bigdata
> hello
啟動(dòng)一個(gè)消費(fèi)者消費(fèi)消息
kafka-console-consumer.sh --zookeeper zkKafka1:2181 --topic bigdata
查看zk
zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, kafka-manager, latest_producer_id_block, zookeeper]
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, bigdata, first]
[zk: localhost:2181(CONNECTED) 5] ls /consumers
# 消費(fèi)者組
[console-consumer-90976]
[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumer-90976
[ids, offsets, owners]
[zk: localhost:2181(CONNECTED) 7] ls /consumers/console-consumer-90976/offsets
[bigdata]
[zk: localhost:2181(CONNECTED) 8] ls /consumers/console-consumer-90976/offsets/bigdata
[0, 1]
[zk: localhost:2181(CONNECTED) 10] get /consumers/console-consumer-90976/offsets/bigdata/0
1
Kafka 0.9 版本之前,consumer默認(rèn)將offset保存在zookeeper中跪但,從0.9版本開始履羞,consumer默認(rèn)將offset保存在Kafka一個(gè)內(nèi)置的topic中,該topic為__consumer_offsets屡久。
1)修改配置文件consumer.properties
exclude.internal.topics=false
2)讀取offset
# 重新啟動(dòng)一個(gè)消費(fèi)者
kafka-console-consumer.sh --bootstrap-server zkKafka1:9092 --topic bigdata
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper zkKafka1:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.conf
ig config/consumer.properties --from-beginning
[console-consumer-17599,bigdata,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]