啟動(dòng)zookeeper:
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
啟動(dòng)kafka server:
./kafka-server-start.sh -daemon ../config/server.properties
新建topic:
./kafka-topics.sh --create --topic topic_test --replication-factor 3 --partitions 3 --zookeeper localhost:2181
啟動(dòng)kafka server:
[圖片上傳中...(image.png-cb22c8-1563602345967-0)]
查詢topic列表:
./kafka-topics.sh --list --zookeeper localhost:2181
查詢topic信息:
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_test
向topic生產(chǎn)數(shù)據(jù):
./kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test
消費(fèi)topic數(shù)據(jù):
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning
增加topic分區(qū)數(shù):
./kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 6
刪除topic:
./kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topic_test_1
注:如果server.properties中沒(méi)有把delete.topic.enable設(shè)為true埋嵌,那么此時(shí)的刪除并不是真正的刪除评汰,而是把topic標(biāo)記為:marked for deletion
查看消費(fèi)offset情況:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
展示的幾個(gè)參數(shù):
TOPIC: 該消費(fèi)者組消費(fèi)的是哪些topic, 本例是只創(chuàng)建了一個(gè)topic, test, 所以只消費(fèi)了test。
PARTITION: 表示該消費(fèi)者消費(fèi)的是哪些分區(qū), 本例創(chuàng)建topic時(shí)只有一個(gè)分區(qū), 所以輸出只有一行, 分區(qū)號(hào)為0煤禽。
CURRENT-OFFSET: 表示消費(fèi)者組最新消費(fèi)的位移值, 此值在消費(fèi)過(guò)程中是變化的。
LOG-END-OFFSET: 表示topic所有分區(qū)當(dāng)前的日志終端位移值, 因?yàn)槲覀兩a(chǎn)了11185243萬(wàn)數(shù)據(jù), 所以此處是11185243萬(wàn)肝箱。
LAG: 表示滯后進(jìn)度, 此值為L(zhǎng)OG-END-OFFSET 與 CURRENT-OFFSET的差值, 代表的是滯后情況, 此值越大表示滯后嚴(yán)重, 本例最終LAG為0 說(shuō)明沒(méi)有消費(fèi)滯后暗膜。