本人很懶,筆記這些都寫在有道云忘衍;此外逾苫,感覺kafka的資料是真的多,且官網(wǎng)關(guān)于命令是真的很詳細(xì)了枚钓,把自己常用的命令記錄與分享下铅搓。
備注:命令基于kafka-2.1.1,如有雷同搀捷,聯(lián)系侵刪
# topic 相關(guān)
./kafka-topics.sh --zookeeper=zk_server --describe --topic test_topic
./kafka-topics.sh --zookeeper=zk_server --delete --topic test_topic
./kafka-topics.sh --zookeeper=zk_server --create --partitions 1 --replication-factor 3 --topic test_topic
# 可以將副本指定到對應(yīng)的節(jié)點,結(jié)合寫個小工具生成分布字符串,方便做隔離,
./kafka-topics.sh --zookeeper=zk_server --create --replica-assignment 1:3,2:1,3:2 --topic test_topic
# 查看topic列表星掰,看是否創(chuàng)建成功
./kafka-topics.sh --zookeeper=zk_server --list
./kafka-topics.sh --zookeeper=zk_server --alter --topic test_topic --partitions 3
./kafka-topics.sh --zookeeper=zk_server --alter --partitions 3 --topic test_topic --replica-assignment 1:3,2:1,3:2
./kafka-configs.sh --zookeeper=zk_server --alter --entity-name test_topic --entity-type topics --delete-config retention.ms
# 遷移相關(guān),可先修改保存時間避免大量數(shù)據(jù)同步
./kafka-configs.sh --zookeeper=zk_server --alter --entity-type topics --add-config retention.ms=86400000 --entity-name test_topic
# 第一步:指定topic
echo {\"topics\": [{\"topic\": \"test_topic\"}],\"version\": 1} > ../conf/topic.json
# 第二步:生成分配策略
./kafka-reassign-partitions.sh --zookeeper=zk_server --topics-to-move-json-file ../conf/topic.json --broker-list "1,2,3" --generate
# 第三步:不限速執(zhí)行遷移(沒有流量推薦使用,有流量慎重使用)
./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute
# 第四步:限速執(zhí)行遷移(推薦使用)嫩舟,遷移完成需執(zhí)行下面的命令
./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --execute --throttle 52400000
# 切換目錄遷移:any同時限速(如果是鑒權(quán)集群可能要加一些賬密配置)
./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --execute --replica-alter-log-dirs-throttle 52400000
# 第五步:查看遷移進度氢烘,如果全部遷移完成執(zhí)行該命令移除限速
./kafka-reassign-partitions.sh --zookeeper=zk_server --reassignment-json-file ../conf/reassignment.json --verify
./kafka-reassign-partitions.sh --zookeeper=zk_server --bootstrap-server broker:port --reassignment-json-file ../conf/reassignment.json --verify
# 指定均衡
./kafka-preferred-replica-election.sh --zookeeper=zk_server --path-to-json-file ../conf/reassignment.json
# 全局均衡(生產(chǎn)慎用)
./kafka-preferred-replica-election.sh --zookeeper=zk_server
# 授權(quán)相關(guān),查看授權(quán)列表
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --list
# 查看用戶信息
./kafka-configs.sh --zookeeper=zk_server --describe --entity-name test_user --entity-type users
# 添加賬戶
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=test_user]' --entity-type users --entity-name test_user
# 添加寫
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Write --topic 'test_topic'
# 添加讀
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation Read --topic 'test_topic' --group 'group_test_topic'
# 事務(wù)-賬戶冪等寫入集群
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation IdempotentWrite --cluster kafka-cluster
# 事務(wù)-賬戶授事務(wù)ID的所有權(quán)限
./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --add --allow-principal User:'test_user' --allow-host '*' --operation ALL --transactional-id '*'
# 移除授權(quán)(生產(chǎn)慎用)
# ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Read --topic 'test_topic' --group 'group_test_topic' --allow-host '*'
# ./kafka-acls.sh --authorizer-properties zookeeper.connect=zk_server --remove --allow-principal User:'test_user' --operation Write --topic 'test_topic' --allow-host '*'
# LAG
./kafka-consumer-groups.sh --bootstrap-server broker:port --describe --group group_test_topic
# 消費內(nèi)部topic數(shù)據(jù)
./kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"? --max-messages 10000 > ./group.txt
./kafka-console-consumer.sh --topic __consumer_offsets --partition 1 --bootstrap-server broker:port --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
# 設(shè)置到某個時間-新加坡不用調(diào)整時差,國內(nèi)有8h時差家厌,比如:國內(nèi)需要設(shè)置為14:00開始播玖,則:2021-02-25T06:00:00.000
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-datetime 2021-06-25T23:00:00.000 --group group_test_topic --execute
# 控制臺生產(chǎn)
./kafka-console-producer.sh --topic resource_author_pond --broker-list broker:port
# 控制臺消費
./kafka-console-consumer.sh --bootstrap-server broker:port --topic test_topic --from-beginning
./kafka-console-consumer.sh --bootstrap-server broker:port --max-messages 100 --topic test_topic > ./records.txt
# 限速相關(guān)
# 用戶-clientId
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=700,request_percentage=200' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id
# 刪除
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type users --entity-name test_user --entity-type clients --entity-name my_client_id
# clientId
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type clients --entity-name clientA
# 刪除
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'producer_byte_rate,consumer_byte_rate,request_percentage' --entity-type clients --entity-name my_client_id
# 添加限速[關(guān)于broker建的副本同步]
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.rate=629145600,follower.replication.throttled.rate=629145600' --entity-type brokers --entity-name 1
# 必須同步設(shè)置topic
./kafka-configs.sh --zookeeper=zk_server --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test_topic
# 移除限速
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-type brokers --entity-name 1
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-type topics --entity-name test_topic
./kafka-configs.sh --zookeeper=zk_server --alter --delete-config 'consumer_byte_rate' --entity-type users --entity-name test_user --entity-type clients --entity-name 'my_client_id'
# 查看文件內(nèi)容
./kafka-run-class.sh kafka.tools.DumpLogSegments --files ./xxxxx.log --print-data-log | less
## 修改消費組偏移量,最近
./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server broker:port --describe --group group_test_topic
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-earliest --execute --group group_test_topic
## 修改消費組偏移量,任意
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-offset 174000 --execute --group group_test_topic
## 修改消費組偏移量,最開始
./kafka-consumer-groups.sh --bootstrap-server broker:port --topic test_topic --reset-offsets --to-latest --execute --group group_test_topic
# 壓測
./kafka-producer-perf-test.sh --topic tiger_test --num-records 5000000000000 --record-size 10240 --throughput 500000 --producer-props bootstrap.servers=broker:port
./kafka-consumer-perf-test.sh --broker-list broker:port --topic test_topic --fetch-size 1048576 --messages 10000000 --threads 1