本文命令均在kafka2.11-1.0.1下使用,如果端口沖突挪蹭,此時(shí)需要加JMX_PORT=10111(或者其他數(shù)字) 來更改端口號(hào)
1. 查看kafka下有多少topic
bin/kafka-topics.sh --list --zookeeper header-1:2181
2.創(chuàng)建topic
bin/kafka-topics.sh --create --topic test-topic --zookeeper header-1:2181
\ --config max.message.bytes=10485760 --partitions 1 --replication-factor 2
--config后可以添加許多參數(shù)亭饵,比如
參數(shù)名稱 | 含義 |
---|---|
max.message.bytes | 接收消息的最大字節(jié)數(shù) |
partitions | 分區(qū)數(shù) |
replication-factor | 副本個(gè)數(shù) |
3.往topic寫消息
/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list worker-ip:9092 --topic test-topic
4.消費(fèi)topic里的消息
/usr/lib/kafka-current/bin/kafka-console-consumer.sh --bootstrap-server worker-ip:9092 --topic test-topic --from-beginning
增加--from-beginning表示從頭消費(fèi),否則去掉表示最新消費(fèi)
5.查看topic信息
bin/kafka-topics.sh --describe --zookeeper header-1:2181 --topic test-topic
6.修改topic分區(qū)數(shù)
bin/kafka-topics.sh --zookeeper header-1:2181 -alter --partitions 10 --topic test-topic
只增不減
7.為topic增加時(shí)間戳
kafka-topics.sh --zookeeper -header-1:2181 --topic test-topic --alter --config message.timestamp.type=LogAppendTime