The bin directory under the Kafka installation contains many shell scripts useful for operations work, listed below:
Script Name | Purpose |
---|---|
connect-distributed.sh | run Kafka Connect in distributed mode |
connect-standalone.sh | run Kafka Connect in standalone mode |
kafka-acls.sh | ACL management |
kafka-broker-api-versions.sh | list the API versions supported by each broker |
kafka-configs.sh | configuration management |
kafka-console-consumer.sh | Kafka console consumer |
kafka-console-producer.sh | Kafka console producer |
kafka-consumer-groups.sh | consumer group information |
kafka-consumer-perf-test.sh | Kafka consumer performance test |
kafka-delegation-tokens.sh | delegation token management |
kafka-delete-records.sh | delete records below a given offset (raises the partition's low watermark) |
kafka-log-dirs.sh | Kafka message log directory information |
kafka-mirror-maker.sh | replicate data between Kafka clusters, e.g. across data centers |
kafka-preferred-replica-election.sh | trigger a preferred replica election |
kafka-producer-perf-test.sh | Kafka producer performance test |
kafka-reassign-partitions.sh | partition reassignment |
kafka-replay-log-producer.sh | consume messages from one topic and replay them to another |
kafka-replica-verification.sh | verify replication progress |
kafka-run-class.sh | generic launcher for a Kafka class; used internally by the other scripts |
kafka-server-start.sh | start a Kafka broker |
kafka-server-stop.sh | stop the Kafka broker(s) on this machine |
kafka-simple-consumer-shell.sh | deprecated; use kafka-console-consumer.sh instead |
kafka-streams-application-reset.sh | reset the state of a Kafka Streams application |
kafka-topics.sh | topic management |
kafka-verifiable-consumer.sh | consumer that emits verifiable consumption events |
kafka-verifiable-producer.sh | producer that emits verifiable send events |
trogdor.sh | launcher for Trogdor, Kafka's test framework |
zookeeper-security-migration.sh | migrate ZooKeeper znode ACLs when enabling or disabling security |
zookeeper-server-start.sh | start the bundled ZooKeeper server |
zookeeper-server-stop.sh | stop the ZooKeeper server |
zookeeper-shell.sh | ZooKeeper client shell |
The following sections describe how to use each script.
- connect-distributed.sh & connect-standalone.sh
Kafka Connect was added in 0.9. It is used to import data from other systems into Kafka and to export data from Kafka to other systems, which makes it suitable for real-time ETL-style data synchronization and real-time data analysis.
It runs in two modes: Standalone and Distributed.
Standalone mode is mainly for development and testing; distributed mode is for production.
Usage is fairly involved; see the tutorial Kafka Connect教程詳解 at https://3gods.com/bigdata/Kafka-Connect-Details.html. A minimal standalone example is sketched below.
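To get a feel for standalone mode, a worker can be started with the sample property files that ship with the Kafka distribution (a sketch; connect-standalone.properties and connect-file-source.properties are the samples under config/):
# Start a standalone Connect worker running the sample file-source connector
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties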
- kafka-broker-api-versions.sh
Usage: bin/kafka-broker-api-versions.sh --bootstrap-server 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094. It prints the supported version range of each API on every broker.
- kafka-configs.sh
This configuration management script has two main modes of use: describe and alter.
describe usage:
Describe the configuration of every topic: bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics
Partial output:
Configs for topic 'afei' are
Configs for topic 'TOPIC-TEST-AFEI' are retention.ms=600000
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Describe a broker's configuration: bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0
Note: 0 is the broker.id; because entity-type is brokers, entity-name is interpreted as a broker.id.
alter usage:
Add a config entry to a topic: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --add-config retention.ms=600000
Remove a config entry from a topic: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --delete-config max.message.bytes
The full set of properties this script can manage is listed in the description of --add-config in the output of running bin/kafka-configs.sh with no arguments.
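To confirm that an alter took effect, describe just that topic by scoping with --entity-name (same syntax as the describe commands above):
# Show only TOPIC-TEST-AFEI's config overrides
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name TOPIC-TEST-AFEI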
- kafka-console-consumer.sh
Usage: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic afei [--group groupName] [--partition targetPartition]
This command accepts many more parameters:
--key-deserializer: key deserialization class; defaults to org.apache.kafka.common.serialization.StringDeserializer
--value-deserializer: value deserialization class; defaults to org.apache.kafka.common.serialization.StringDeserializer
--from-beginning: consume from the earliest message; the default is to consume from the latest message.
--offset: start consuming from the given offset. If set, --partition must also be given, otherwise the tool complains: The partition is required when offset is specified.
--timeout-ms: if the consumer receives no message within this interval, it exits and throws kafka.consumer.ConsumerTimeoutException.
--whitelist: a whitelist of topics to consume; use either this or --topic, not both. Examples: --whitelist "afei.*" (topics starting with afei), --whitelist "afei" (just the topic afei), "afei|fly" (either afei or fly). The --blacklist parameter works the same way.
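For example, to replay a single partition from a fixed offset, combining the flags above (a sketch; the host and offsets are illustrative):
# Consume partition 2 of topic afei starting at offset 5,
# exiting if no message arrives within 10 seconds
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic afei --partition 2 --offset 5 --timeout-ms 10000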
- kafka-console-producer.sh
Usage: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic afei. When connecting to a cluster, the broker-list parameter takes the form HOST1:PORT1,HOST2:PORT2,HOST3:PORT3.
This command accepts many more parameters:
--key-serializer: key serialization class; defaults to org.apache.kafka.common.serialization.StringSerializer
--value-serializer: value serialization class; defaults to org.apache.kafka.common.serialization.StringSerializer
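A handy pattern is sending keyed messages (a sketch; it relies on the parse.key and key.separator properties of the tool's default line reader):
# Each input line is split into key and value on ':'
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic afei --property parse.key=true --property key.separator=: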
- kafka-consumer-groups.sh
List all consumer groups: bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Describe one consumer group: bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AfeiGroup --describe. Output:
Note: This will not show information about old Zookeeper-based consumers.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
afei 0 8 8 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 4 10 10 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 1 8 8 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 3 6 6 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
afei 2 9 9 0 consumer-1-7a46c647-8221-4aca-b6bf-ed14571fb0f1 /172.18.36.203 consumer-1
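Since 0.11, the same script can also rewind a group's offsets; a sketch (the group must have no active members while resetting):
# Reset group AfeiGroup to the earliest offsets of topic afei
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AfeiGroup --topic afei --reset-offsets --to-earliest --execute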
- kafka-consumer-perf-test.sh
perf is short for performance, so this is the Kafka consumer performance test script.
Usage: bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --group testGroup --topic afei --messages 1024
Output:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-07-02 22:49:10:068, 2018-07-02 22:49:12:077, 0.0001, 0.0001, 41, 20.4082, 19, 1990, 0.0001, 20.6030
- kafka-delete-records.sh
Usage: bin/kafka-delete-records.sh --bootstrap-server 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --offset-json-file offset.json, where offset.json contains:
{
"partitions": [{
"topic": "afei",
"partition": 3,
"offset": 10
}],
"version": 1
}
The result is shown below; it means every message in partition 3 of topic afei with an offset below 10 was deleted (the message at offset 10 itself is kept):
Executing records delete operation
Records delete operation completed:
partition: afei-3 low_watermark: 10
- kafka-log-dirs.sh
Usage: bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list afei[,topicName2,topicNameN]. Without --topic-list, the output covers every Kafka message log directory and every topic in it. With --topic-list, the output looks like the following, which shows that the message logs live in /data/kafka-logs and that topic afei has 3 partitions:
{
"version": 1,
"brokers": [{
"broker": 0,
"logDirs": [{
"logDir": "/data/kafka-logs",
"error": null,
"partitions": [{
"partition": "afei-1",
"size": 567,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "afei-2",
"size": 639,
"offsetLag": 0,
"isFuture": false
}, {
"partition": "afei-0",
"size": 561,
"offsetLag": 0,
"isFuture": false
}]
}]
}]
}
- kafka-preferred-replica-election.sh
Usage: bin/kafka-preferred-replica-election.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --path-to-json-file afei-preferred.json (without --path-to-json-file, the preferred replica election runs for all topics). The JSON file contains:
{
"partitions": [{
"topic": "afei",
"partition": 0
},
{
"topic": "afei",
"partition": 1
},
{
"topic": "afei",
"partition": 2
}]
}
Scenario: when a topic is created, Kafka spreads its partitions across all brokers as evenly as it can, and likewise spreads each partition's replicas across different brokers. All replicas of a partition are called its "assigned replicas", and the first of the assigned replicas is the "preferred replica"; for a freshly created topic, the preferred replica is normally the leader. The leader replica handles all reads and writes, while the other replicas are cold standbys that take no read or write requests. Over time, however, brokers are shut down for maintenance or simply crash, leader elections move leadership around, and the cluster's load becomes unbalanced. At that point we want to rebalance the topic's leaders by having each partition elect its preferred replica as the leader.
Kafka has a parameter, auto.leader.rebalance.enable, that does this automatically; it defaults to true and works by having a background thread check and trigger leader rebalancing. Leaving it enabled is nevertheless not recommended, because the automatic election can fire during peak business hours and hurt live traffic.
Verification:
The procedure is simple: create a topic with 3 partitions and 3 replicas, then kill one broker and let it restart and rejoin. The topic then looks like this; note that the broker with broker.id 0 leads two partitions:
Topic:afei PartitionCount:3 ReplicationFactor:3 Configs:
Topic: afei Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: afei Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: afei Partition: 2 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2
After running the kafka-preferred-replica-election.sh script, the topic looks like this, with the leaders spread evenly across the 3 brokers:
Topic:afei PartitionCount:3 ReplicationFactor:3 Configs:
Topic: afei Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: afei Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: afei Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
- kafka-producer-perf-test.sh
perf is short for performance, so this is the Kafka producer performance test script.
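A typical invocation looks like this sketch (the host is illustrative):
# Send 1024 records of 100 bytes each, unthrottled (--throughput -1)
bin/kafka-producer-perf-test.sh --topic afei --num-records 1024 --record-size 100 --throughput -1 --producer-props bootstrap.servers=localhost:9092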
- kafka-reassign-partitions.sh
Scenario: move the partitions of some topics from their current brokers to other brokers, e.g. newly added ones. Suppose a topic named ORDER-DETAIL lives on the broker with broker.id 2:
Topic:ORDER-DETAIL PartitionCount:1 ReplicationFactor:1 Configs:
Topic: ORDER-DETAIL Partition: 0 Leader: 2 Replicas: 2 Isr: 2
To move it to the broker with broker.id 1, run: bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --topics-to-move-json-file move.json --broker-list "1" --generate
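The move.json file referenced above names the topics to move; a minimal sketch of its contents (the tool expects a topics array plus a version field):
{
  "topics": [{ "topic": "ORDER-DETAIL" }],
  "version": 1
}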
The --generate flag only generates a proposed reassignment configuration; it does not actually execute anything. The command prints:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"ORDER-DETAIL","partition":0,"replicas":[2],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ORDER-DETAIL","partition":0,"replicas":[1],"log_dirs":["any"]}]}
Save the second JSON block into a new file, say move_final.json (if you know how to write this JSON yourself, the first command can be skipped), then run: bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file move_final.json --execute. This time the command carries --execute, so the partition reassignment is actually performed.
This command can also be used to add replicas to a topic. For example, a topic named ORDER-DETAIL has 3 partitions but only 1 replica; for high availability we want to raise the replica count to 2. Write replica.json as follows:
{
"version": 1,
"partitions": [{
"topic": "ORDER-DETAIL",
"partition": 0,
"replicas": [1, 2]
},
{
"topic": "ORDER-DETAIL",
"partition": 1,
"replicas": [0, 2]
},
{
"topic": "ORDER-DETAIL",
"partition": 2,
"replicas": [1, 0]
}]
}
Then run: bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file replica.json --execute
- kafka-replica-verification.sh
Usage: bin/kafka-replica-verification.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 [--topic-white-list afei]; the --topic-white-list parameter selects the topics to check. The output looks like the following; a line of the form max lag is 0 for ... means replication of that topic has no lag at all:
2018-07-03 15:04:46,889: verification process is started.
2018-07-03 15:05:16,811: max lag is 0 for partition multi-1 at offset 0 among 5 partitions
2018-07-03 15:05:46,812: max lag is 0 for partition multi-1 at offset 0 among 5 partitions
... ...
- kafka-server-start.sh
Usage: bin/kafka-server-start.sh -daemon config/server.properties, which starts the broker as a daemon with the given configuration file.
- kafka-server-stop.sh
Usage: bin/kafka-server-stop.sh. Note that this command kills every Kafka broker on the current machine. It can, however, fail with: No kafka server to stop
Root cause: the kafka-server-stop.sh script, shown below, is very simple; it collects the IDs of all processes whose command line contains kafka.Kafka. But Kafka starts with a long list of jars on its classpath, so the broker's line in the ps output is very long, and that output is limited by PAGE_SIZE (whose value you can check with getconf PAGE_SIZE). As a result, kafka\.Kafka may be truncated out of the ps output, and the broker is never killed:
SIGNAL=${SIGNAL:-TERM}
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
if [ -z "$PIDS" ]; then
echo "No kafka server to stop"
exit 1
else
kill -s $SIGNAL $PIDS
fi
To make kafka-server-stop.sh work reliably, one fix is to locate the process ID via the deployment directory (the parent directory of bin/) instead, and kill that process:
cd `dirname $0`
BIN_DIR=`pwd`
cd ..
DEPLOY_DIR=`pwd`
SIGNAL=${SIGNAL:-TERM}
PIDS=$(ps ax | grep -i "${DEPLOY_DIR}" | grep java | grep -v grep | awk '{print $1}')
if [ -z "$PIDS" ]; then
echo "No kafka server to stop"
exit 1
else
kill -s $SIGNAL $PIDS
fi
- kafka-simple-consumer-shell.sh
deprecated静盅,用法:bin/kafka-simple-consumer-shell.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei
- kafka-topics.sh
Create a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic afei --partitions 3 --replication-factor 1
Delete a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Alter a topic: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic afei --partitions 5. When altering a topic, the partition count can only be increased.
Describe topics: bin/kafka-topics.sh --zookeeper localhost:2181 --describe [--topic afei]. Passing --topic topicName restricts the query to that topic's details. You may additionally pass either --unavailable-partitions or --under-replicated-partitions, as in the example below.
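For instance, when troubleshooting replication you might list only the problem partitions (a sketch combining the flags above):
# Show only partitions whose ISR is smaller than the replica set
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions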
- kafka-verifiable-consumer.sh
Usage: bin/kafka-verifiable-consumer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --group-id groupName
This script consumes messages from the given topic and emits consumer events, e.g. offset commits.
- kafka-verifiable-producer.sh
Usage: bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei [--max-messages 64]. It is advisable to always pass --max-messages, otherwise the script keeps sending messages indefinitely.
The script continuously sends messages to the given topic, with --max-messages capping the number sent. Every message sent produces a response line, which is the biggest difference from kafka-console-producer.sh:
[mwopr@jtcrtvdra35 kafka_2.12-1.1.0]$ bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --max-messages 9
{"timestamp":1530515959900,"name":"startup_complete"}
{"timestamp":1530515960310,"name":"producer_send_success","key":null,"value":"1","offset":5,"topic":"afei","partition":0}
{"timestamp":1530515960315,"name":"producer_send_success","key":null,"value":"4","offset":6,"topic":"afei","partition":0}
{"timestamp":1530515960315,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"afei","partition":0}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"0","offset":5,"topic":"afei","partition":1}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"3","offset":6,"topic":"afei","partition":1}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"6","offset":7,"topic":"afei","partition":1}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"2","offset":6,"topic":"afei","partition":2}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"5","offset":7,"topic":"afei","partition":2}
{"timestamp":1530515960316,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"afei","partition":2}
{"timestamp":1530515960333,"name":"shutdown_complete"}
{"timestamp":1530515960334,"name":"tool_data","sent":9,"acked":9,"target_throughput":-1,"avg_throughput":20.689655172413794}
The topic afei has 3 partitions, and kafka-verifiable-producer.sh was asked to send 9 messages; the output shows that 3 messages went to each partition. Setting --max-messages to a large value also turns this script into a quick load test for a cluster, as sketched below.
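A load-test sketch (assuming the tool's --throughput flag, which caps the send rate; the target_throughput field in the output above reflects it, and -1 means unthrottled):
# Send 100000 messages at no more than 1000 msg/s as a smoke load test
bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --max-messages 100000 --throughput 1000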
- zookeeper-shell.sh
Usage: bin/zookeeper-shell.sh localhost:2181[/path]. If the Kafka cluster's ZooKeeper is configured with a chroot path, append it, e.g. bin/zookeeper-shell.sh localhost:2181/mykafka. Once logged in to ZK, you can inspect the znodes Kafka maintains there, for example listing the brokers and their details:
ls /brokers/ids
[0]
get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://izwz91rhzhed2c54e6yx87z:9092"],"jmx_port":-1,"host":"izwz91rhzhed2c54e6yx87z","timestamp":"1530435834272","port":9092,"version":4}
cZxid = 0x2d3
ctime = Sun Jul 01 17:03:54 CST 2018
mZxid = 0x2d3
mtime = Sun Jul 01 17:03:54 CST 2018
pZxid = 0x2d3
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1642cb09421006c
dataLength = 216
numChildren = 0
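A couple of other znodes worth inspecting (standard Kafka layout, relative to the chroot if one is configured):
ls /brokers/topics            # all topics in the cluster
get /controller               # which broker is currently the controller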
- Final remarks
Among the operations scripts above, some take the --zookeeper parameter, some take --broker-list, and some take --bootstrap-server.
This is really a historical artifact. broker-list denotes a plain list of broker addresses, while bootstrap-server denotes a bootstrap entry point from which the full broker list can be fetched (as analyzed earlier in [4. Kafka producers & consumers]); bootstrap-server is the more refined naming. And then there are the scripts that go through ZooKeeper: early Kafka kept a lot of its metadata in ZK, whose role was gradually reduced in later releases. These three parameters thus stand for three eras of Kafka. Put kindly, they let you witness how Kafka's design philosophy evolved; put bluntly: what a confusing mess (per 廝大大's answer), ha.