[TOC]
對于kafka消費者來說晾浴,我們通常需要監(jiān)控或者檢查一下consumer的狀態(tài),除了進程本身的狀態(tài)和系統(tǒng)狀態(tài),最關(guān)心的可能就是消費者的消費速度是不是太慢溶握,有沒有消息滯后/堆積了,對于高版本的kafka蒸播,kafka自身就提供了比較方便的命令睡榆,而對于低版本的kafka,可能需要自己通過命令組合查看袍榆,或者安裝第三方監(jiān)控應(yīng)用胀屿。
本文提供了高低版本通過命令的方式查看consumer消費狀態(tài)的方法,供參考包雀。
1. kafka_0.8.x系列
1.1 要求
- 消費者存儲offset在zk
- 需要放一個zk運行包到腳本執(zhí)行的環(huán)境宿崭,執(zhí)行時需要zk客戶端;即需要將zk的bin加入到PATH
- 需要放一個kafka運行包到腳本執(zhí)行環(huán)境才写,執(zhí)行時需要kafka客戶端葡兑;即需要將kakfa的bin加入到PATH
最簡單的做法就是在腳本部署機器上放上zk和kafka的安裝包,然后配置好各自的環(huán)境變量赞草,也就是要能夠直接執(zhí)行zkCli.sh/kafka-topics.sh等命令
1.2 腳本
#kafka broker節(jié)點地址讹堤,如果有多個,以逗號隔開
KAFKA_BROKER_LIST=172.21.37.178:39091
#監(jiān)控的consumer所消費的kafka topic
KAFKA_TOPIC=risk_info
#topic的分區(qū)數(shù)量
KAFKA_TOPIC_PARTITION_COUNT=12
#zk地址和端口列表房资,蜕劝,如果有多個,以逗號隔開
ZOOKEEPER_LIST=172.21.34.92:32181
#consumer的group id
CONSUMER_GROUP_ID=kafka0822_test1
CHECK_RES_FILE=consumer_check.log
echo "ATTENTION: this script is useful for kafka 0.8.x and consumer with zookeeper."
echo "start to fetch kafka consumer status now. please wait for about 10s. result will be put in file $CHECK_RES_FILE"
offsets[0]=0
consumed[0]=0
#kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -2
echo "START----------------------------------------------------------------------------" >> $CHECK_RES_FILE
echo "check kafka latest offset command: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -1" >> $CHECK_RES_FILE
eval $(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -1|grep -v kafka|grep -v grep|awk -F: '{print "offsets["(FNR-1)"]="$3}')
#echo "kafka latest offset:" >> $CHECK_RES_FILE
#for var in ${offsets[@]};
#do
# echo $var >> $CHECK_RES_FILE
#done
for ((i=0; i<$KAFKA_TOPIC_PARTITION_COUNT; i++))
do
if [ $i == 0 ]; then
echo "check kafka consumed offset command:zkCli.sh -server $ZOOKEEPER_LIST get /consumers/$CONSUMER_GROUP_ID/offsets/$KAFKA_TOPIC/$i|tail -1" >> $CHECK_RES_FILE
fi
consumed[$i]=`zkCli.sh -server $ZOOKEEPER_LIST get /consumers/$CONSUMER_GROUP_ID/offsets/$KAFKA_TOPIC/$i|tail -1`
done
for ((i=0; i<$KAFKA_TOPIC_PARTITION_COUNT; i++))
do
LAG=`expr ${offsets[$i]} - ${consumed[$i]}`
echo "topic:$KAFKA_TOPIC, partition:$i, consumer group:$CONSUMER_GROUP_ID, latest offset:${offsets[$i]}, consumed offset:${consumed[$i]}, lag:$LAG" >> $CHECK_RES_FILE
done
echo "END----------------------------------------------------------------------------" >> $CHECK_RES_FILE
執(zhí)行時轰异,需要修改的地方為
#kafka 節(jié)點host和端口岖沛,如果寫多個,用逗號隔開
KAFKA_BROKER_LIST=172.21.37.178:39091
#要查看的topic
KAFKA_TOPIC=risk_info
#topic的分區(qū)數(shù)量搭独,需要確定好
KAFKA_TOPIC_PARTITION_COUNT=12
#zookeeper的host和端口婴削,如果寫多個,用逗號隔開
ZOOKEEPER_LIST=172.21.34.92:32181
#consumer group
CONSUMER_GROUP_ID=kafka0822_test1
執(zhí)行結(jié)果為:
結(jié)果文件默認保存在kafka_consumer_check_res.log中
cat kafka_consumer_check_res.log
顯示:
START----------------------------------------------------------------------------
check kafka latest offset command: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.178:39091 --topic risk_info --time -1
check kafka consumed offset command:zkCli.sh -server 172.21.34.92:32181 get /consumers/kafka0822_test1/offsets/risk_info/0|tail -1
topic:risk_info, partition:0, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:1, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:2, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:3, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1082, lag:1
topic:risk_info, partition:4, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:5, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1081, lag:2
topic:risk_info, partition:6, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:7, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1082, lag:1
topic:risk_info, partition:8, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:9, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:10, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:11, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1080, lag:2
END----------------------------------------------------------------------------
具體只需要關(guān)注結(jié)果文件最后的lag值牙肝,以確定是否滯后
1.3 檢查建議
- 如果每個分區(qū)的堆積在兩位數(shù)以內(nèi)唉俗,且沒有增長趨勢嗤朴,則表示consumer沒有大問題
- 如果堆積數(shù)量不斷增長,則需要檢查消費者是否有問題
由于命令行的zk客戶端執(zhí)行較慢虫溜,因此如果分區(qū)較多的時候雹姊,在kafka吞吐量高峰時期,靠后的分區(qū)檢查結(jié)果可能顯示LAG為負數(shù)衡楞,這是因為高峰時期offset更新的比較快吱雏,我們從kafka里查詢offset只在腳本的開頭執(zhí)行了一次,而zk命令的執(zhí)行過程卻要很久(每個分區(qū)要執(zhí)行一次查詢請求命令)瘾境,此時zk里的offset已經(jīng)被consumer提交成相對更新的歧杏,因此出現(xiàn)了二者相減出現(xiàn)了負數(shù)
2. kafka 0.9.x系列
具體是不是0.9不記得了,kafka的版本比較多迷守,如果0.9.x不支持犬绒,可以繼續(xù)使用上一節(jié)的腳本
命令:
//首先我們需要知道當(dāng)前有哪些消費者group,如果已知兑凿,此步驟可忽略
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --list
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --describe
- BROKER_HOST是kafka server的ip地址凯力,PORTt是server的監(jiān)聽端口。多個host port之間用逗號隔開
- 第一條命令是獲取group列表急膀,一般而言沮协,應(yīng)用是知道消費者group的,通常在應(yīng)用的配置里卓嫂,如果已知慷暂,該步驟可以省略
- 第二條命令是查看具體的消費者group的詳情信息,需要給出group的名稱
例如晨雳,首先列出消費者group列表
bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
console-consumer-89764
console-consumer-45728
我們以console-consumer-8976為例行瑞,查看詳情
bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --group console-consumer-89764 --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tttttttt_topic 0 641 641 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /172.21.37.194 consumer-1
tttttttt_topic 1 632 632 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /172.21.37.194 consumer-1
tttttttt_topic 2 699 699 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /172.21.37.194 consumer-1
其中
- TOPIC:該group里消費的topic名稱
- PARTITION:分區(qū)編號
- CURRENT-OFFSET:該分區(qū)當(dāng)前消費到的offset
- LOG-END-OFFSET:該分區(qū)當(dāng)前l(fā)atest offset
- LAG:消費滯后區(qū)間,為LOG-END-OFFSET-CURRENT-OFFSET餐禁,具體大小需要看應(yīng)用消費速度和生產(chǎn)者速度血久,一般過大則可能出現(xiàn)消費跟不上,需要引起應(yīng)用注意
- CONSUMER-ID:server端給該分區(qū)分配的consumer編號
- HOST:消費者所在主機
- CLIENT-ID:消費者id帮非,一般由應(yīng)用指定