Ubuntu version
Installing Java
- sudo apt-get update
- sudo apt-get install openjdk-8-jdk
- java -version  (Java 8 only accepts the single-dash form; `java --version` exists only on Java 9+)
Installing ZooKeeper
Download
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
Extract
tar -zxvf zookeeper-3.4.6.tar.gz
Enter the zookeeper-3.4.6/conf directory and copy zoo_sample.cfg to zoo.cfg:
cp zoo_sample.cfg zoo.cfg
Start ZooKeeper:
bin/zkServer.sh start
Check ZooKeeper status:
bin/zkServer.sh status
Installing Kafka
Download
wget https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz
Extract
tar -zxvf kafka_2.11-2.4.1.tgz
Enter the kafka_2.11-2.4.1 directory and start Kafka:
bin/kafka-server-start.sh config/server.properties
Start in the background: nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
Normally Kafka starts fine; if it fails with the following error:
Caused by: java.net.UnknownHostException: xxx: Name or service not known
Edit config/server.properties and add: listeners=PLAINTEXT://10.0.0.3:8090
Produce messages
./bin/kafka-console-producer.sh --broker-list 10.0.0.3:8090 --topic mytest --producer.config config/producer.properties
Consume messages
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.3:8090 --topic mytest --from-beginning --consumer.config config/consumer.properties
Connecting from an external network
With the configuration above, connections from the public internet will time out:
Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic XXX_TOPIC not present in metadata after 30000 ms.
Connection to node 0 (/10.0.0.3:8090) could not be established. Broker may not be available.
[Producer clientId=producer-1] Connection to node 0 (/10.0.0.3:8090) could not be established. Broker may not be available.
Adding the following setting fixes it (substitute your public IP): advertised.listeners=PLAINTEXT://<public_ip>:8090
SASL/SCRAM configuration
- Edit the server.properties file under kafka_2.11-2.4.1/config
# Enable SASL authentication; to enable ACLs (per-topic permission control), also add the settings below
listeners=SASL_PLAINTEXT://10.0.0.3:8090
advertised.listeners=SASL_PLAINTEXT://<public_ip>:8090
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# false: only users explicitly granted ACLs may access; true: all users may access except those explicitly denied (blacklist mode)
allow.everyone.if.no.acl.found=false
super.users=User:admin
# ACL settings: enables topic- and user-level read/write control (legacy authorizer class)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
- Create a kafka_server_jaas.conf file in the config directory (any name works) with the following content:
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin-sec";
};
KafkaServer holds the username and password that Kafka brokers use to talk to each other.
- Register the account credentials through ZooKeeper; the admin account from the JAAS file above is created the same way:
# Create the admin account
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin-sec]' --entity-type users --entity-name admin
# Create the producer account ptest
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=ptest]' --entity-type users --entity-name ptest
# Create the consumer account ctest
./bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=ctest]' --entity-type users --entity-name ctest
# If ACLs are enabled, topics must be authorized explicitly; the following grants Write on mytest to ptest
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:ptest --operation Write --topic mytest
# Grant Read on mytest to ctest
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:ctest --operation Read --topic mytest
# Grant the consumer group Read permission on the topic
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:ctest --operation Read --group test-consumer-group
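The kafka-configs.sh commands above do not store the plaintext password in ZooKeeper: SCRAM stores a salted, iterated hash. A minimal stdlib-only sketch of the salted-password derivation (SCRAM's Hi() function is PBKDF2 with HMAC-SHA-512 for SCRAM-SHA-512, per RFC 5802); the salt and the 4096 iteration count here are illustrative, not read from a real broker:

```java
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.security.SecureRandom;
import java.util.Base64;

public class ScramSketch {
    // Hi(password, salt, i) from RFC 5802 == PBKDF2-HMAC-SHA-512 for SCRAM-SHA-512
    static byte[] saltedPassword(String password, byte[] salt, int iterations) throws Exception {
        PBEKeySpec spec = new PBEKeySpec(password.toCharArray(), salt, iterations, 64 * 8);
        return SecretKeyFactory.getInstance("PBKDF2WithHmacSHA512")
                .generateSecret(spec).getEncoded();
    }

    public static void main(String[] args) throws Exception {
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);   // the broker generates a random salt per user
        int iterations = 4096;                // RFC 5802 minimum; illustrative here
        byte[] sp = saltedPassword("ptest", salt, iterations);
        System.out.println("salted password (base64): " + Base64.getEncoder().encodeToString(sp));
        System.out.println("length: " + sp.length);  // 64 bytes for SHA-512
    }
}
```

Only this derived material (plus iteration count and keys derived from it) ends up under the user's znode, which is why the broker can verify clients without ever seeing the password again.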
- Make Kafka load the credentials at startup: edit bin/kafka-server-start.sh and add the following line above the existing commands
export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-2.4.1/config/kafka_server_jaas.conf"
- Java client test
// Add these settings on top of the usual client configuration
Map<String, Object> props = new HashMap<>();
// ... bootstrap.servers and the other usual settings ...
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-512");
// The JAAS value requires double quotes and a trailing semicolon
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"ctest\" password=\"ctest\";");
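The settings above can be wrapped in a small helper that builds the complete SASL client configuration. This is a stdlib-only sketch (java.util.Properties, which KafkaProducer/KafkaConsumer constructors accept); the host, user, and password come from the examples in this note. Note that the JAAS value must use double quotes and end with a semicolon:

```java
import java.util.Properties;

public class SaslClientConfig {
    // Build the SASL/SCRAM client properties shared by producers and consumers.
    static Properties saslProps(String bootstrap, String user, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        // JAAS entries require double-quoted values and a trailing semicolon
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required"
            + " username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties p = saslProps("10.0.0.3:8090", "ctest", "ctest");
        p.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Serializer/deserializer and group.id settings would be added on top, exactly as for a non-SASL client.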
- To test with the console tools on the server, configure the following
# Producer: add the following to config/producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="ptest" \
password="ptest";
# Start the console producer
./bin/kafka-console-producer.sh --broker-list 10.0.0.3:8090 --topic mytest --producer.config config/producer.properties
# Consumer: add the following to config/consumer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="ctest" \
password="ctest";
# Start the console consumer
./bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.3:8090 --topic mytest --from-beginning --consumer.config config/consumer.properties
Questions and concepts
1. listeners vs. advertised.listeners
listeners: the address and port the Kafka broker binds and listens on locally.
advertised.listeners: the address and port published to clients; it is shared through ZooKeeper and stored in the endpoints field of /brokers/ids/0.
(Use case: when the public IP belongs to a proxy/NAT rather than a local network card, the broker cannot bind it via listeners and must bind 0.0.0.0 instead; external clients then fetch every broker's public address from the ZooKeeper metadata and connect to those.)
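As an illustration of that discovery flow (a toy model, not Kafka code): the broker binds one address locally but registers another in the shared metadata, and clients only ever see the registered one. The addresses below are made-up examples (203.0.113.7 is a documentation IP):

```java
import java.util.HashMap;
import java.util.Map;

public class AdvertisedListenerDemo {
    // Toy stand-in for ZooKeeper: broker id -> endpoint exposed at /brokers/ids/<id>
    static Map<Integer, String> registry = new HashMap<>();

    // The broker binds bindAddr locally but publishes advertisedAddr;
    // if nothing is advertised, the bind address is published instead.
    static void registerBroker(int id, String bindAddr, String advertisedAddr) {
        registry.put(id, advertisedAddr.isEmpty() ? bindAddr : advertisedAddr);
    }

    // Clients never see the bind address; they connect to whatever was advertised.
    static String endpointFor(int id) {
        return registry.get(id);
    }

    public static void main(String[] args) {
        // Bind all interfaces internally, advertise the public IP to clients
        registerBroker(0, "0.0.0.0:8090", "203.0.113.7:8090");
        System.out.println(endpointFor(0)); // the advertised address, not 0.0.0.0
    }
}
```

This is why fixing only listeners does not help external clients: they bootstrap to whatever address you give them, but every subsequent connection uses the advertised endpoints from the metadata.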