查看consumer組內(nèi)消費的offset
在${KAFKA_HOME}/bin下
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper host:2181 --group [消費者組名] --topic [topic名]
其中的group可去zookeeper中查看:
[hadoop@h71 zookeeper-3.4.5-cdh5.5.2]$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 2] ls /consumers
[console-consumer-74653, WordcountConsumerGroup]
注:(console-consumer-74653這個組當我在另一個窗口啟動消費者[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper h71:2181,h72:2181,h73:2181 --topic test --from-beginning時才會有沦童,關(guān)閉消費者進程該組會自動消失)
我們在使用kafka消費信息的過程中泉粉,不同group的consumer是可以消費相同的信息的胆数,group是在創(chuàng)建consumer時指定的冷尉,如果group不存在倒源,會自動創(chuàng)建评也。其實簡單點說就是每個group都會在zk中注冊遵湖,區(qū)別就是注冊過還是沒注冊過塑娇。每個group內(nèi)的consumer只能消費在group注冊過之后生產(chǎn)的信息。
執(zhí)行結(jié)果如下:列出了所有消費者組的所有信息形庭,包括Group(消費者組)铅辞、Topic、Pid(分區(qū)id)萨醒、Offset(當前已消費的條數(shù))斟珊、LogSize(總條數(shù))、Lag(未消費的條數(shù))富纸、Owner
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.8.71:2181,192.168.8.72:2181,192.168.8.73:2181 --group WordcountConsumerGroup --topic test
或者:
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper=192.168.8.71:2181,192.168.8.72:2181,192.168.8.73:2181 --group=WordcountConsumerGroup --topic=test
再或者:
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper=h71:2181 --group=WordcountConsumerGroup
Group Topic Pid Offset logSize Lag Owner
test-consumer-group test 0 4 40 36 none
test-consumer-group test 1 14 57 43 none
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.8.71:2181,192.168.8.72:2181,192.168.8.73:2181 --group=WordcountConsumerGroup
Group Topic Pid Offset logSize Lag Owner
WordcountConsumerGroup test 0 9 40 31 none
WordcountConsumerGroup test 1 21 57 36 none
[hadoop@h71 kafka_2.10-0.8.2.0]$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper 192.168.8.71:2181,192.168.8.72:2181,192.168.8.73:2181 --group=console-consumer-42639
Group Topic Pid Offset logSize Lag Owner
console-consumer-42639 test 0 40 40 0 console-consumer-42639_h71-1498000434797-f1c703cf-0
console-consumer-42639 test 1 57 57 0 console-consumer-42639_h71-1498000434797-f1c703cf-0
kafka-run-class.sh腳本囤踩,它是調(diào)用了ConsumerOffsetChecker的main方法,所以晓褪,我們也可以通過java代碼來訪問scala的ConsumerOffsetChecker類堵漱,代碼如下:
import kafka.tools.ConsumerOffsetChecker;
public class hui
{
public static void main(String[] args)
{
//適用于kafka0.8.2.0
String[] arr = new String[]{"--zookeeper=h71:2181,h72:2181,h73:2181","--group=test-consumer-group"};
//適用于kafka0.8.1
// String[] arr = new String[]{"--zkconnect=h71:2181,h72:2181,h73:2181","--group=test-consumer-group"};
ConsumerOffsetChecker.main(arr);
}
}
運行結(jié)果如下:
跟通過kafa-run-class.sh執(zhí)行的結(jié)果是一樣的