Reposted from: http://www.reibang.com/p/011567554f0f
SASL/SCRAM + ACL: dynamic user creation and access control
This walkthrough uses a self-deployed ZooKeeper; ZooKeeper needs no special configuration.
Authentication with SASL/SCRAM
First, start Kafka without any authentication configured.
1. Create SCRAM credentials
Kafka's SCRAM implementation uses ZooKeeper as the credential store. Credentials can be created in ZooKeeper with kafka-configs.sh. For each enabled SCRAM mechanism, a credential must be created by adding a config with the mechanism's name. Credentials for inter-broker communication must be created before the Kafka brokers start; client credentials can be created and updated dynamically, and updated credentials are used to authenticate new connections.
Create the inter-broker communication user (i.e., the super user):
bin/kafka-configs.sh --zookeeper node1:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
Create a client user, fanboshi:
bin/kafka-configs.sh --zookeeper node1:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=fanboshi],SCRAM-SHA-512=[password=fanboshi]' --entity-type users --entity-name fanboshi
View the SCRAM credentials:
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name fanboshi
Configs for user-principal 'fanboshi' are SCRAM-SHA-512=salt=MWwwdWJqcjBncmUwdzY1Mzdoa2NwNXppd3A=,stored_key=mGCJy5k3LrE2gs6Dp4ALRhgy37l1WYPUIdoOncCF+B3Ti3wL2sQNmzg8oEz3tUs9DFsclFCygjbysb0S0BU9bA==,server_key=iTyX0U0Jt02dkddUm6QrVwNf3lJk72dBNs9EDHTqe8kLlNGIp9ypzRkcgkc+WVMd1bkAF3cg8vk9Q1LrJ/2i/A==,iterations=4096,SCRAM-SHA-256=salt=ZDg5MHVlYW40dW9jbXJ6MndvZDVlazd3ag==,stored_key=cgX1ldpXnDL1+TlLHJ3IHn7tAQS/7pQ7BVZUtECpQ3A=,server_key=i7Mcnb5sPUqfIFs6qKWWHZ2ortoKiRc7oabHOV5dawI=,iterations=8192
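The salt, stored_key, server_key, and iterations fields in the output above follow the SCRAM key derivation defined in RFC 5802. As a minimal sketch of that derivation (my own illustration with a made-up salt, not Kafka's code — your salt and key values will differ):

```python
import base64
import hashlib
import hmac

def scram_sha256_keys(password: str, salt: bytes, iterations: int):
    """Derive SCRAM-SHA-256 stored_key/server_key per RFC 5802."""
    # SaltedPassword := PBKDF2-HMAC-SHA256(password, salt, iterations)
    salted = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()  # what the server stores
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()
    return base64.b64encode(stored_key).decode(), base64.b64encode(server_key).decode()

# Made-up salt for illustration; Kafka generates a random salt per credential.
stored_key, server_key = scram_sha256_keys("fanboshi", b"example-salt", 8192)
print(stored_key)
print(server_key)
```

Note that the broker stores only these derived keys, never the plaintext password.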
Delete SCRAM credentials
Shown here for reference only; we do not actually run it:
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name fanboshi
2. Configure the Kafka brokers
- In each Kafka broker's config directory, add a JAAS file like the one below; we will call it kafka_server_jaas.conf:
[root@node002229 config]# cat kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
Note: do not leave out the semicolons.
- Pass the JAAS config file location to each Kafka broker as a JVM parameter:
Edit /usr/local/kafka/bin/kafka-server-start.sh: comment out the line
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
and add the following:
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
Alternatively, leave kafka-server-start.sh unchanged and add the following to ~/.bashrc:
export KAFKA_PLAIN_PARAMS="-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
export KAFKA_OPTS="$KAFKA_PLAIN_PARAMS $KAFKA_OPTS"
- Configure the SASL port and SASL mechanisms in server.properties as described in the documentation. For example:
# authentication settings
listeners=SASL_PLAINTEXT://node002229:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
# ACL settings
allow.everyone.if.no.acl.found=false
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
The official documentation shows:
listeners=SASL_SSL://host.name:port
security.inter.broker.protocol=SASL_SSL
There is actually no need to use SASL_SSL here; choose SSL or PLAINTEXT according to your own requirements. I chose PLAINTEXT (unencrypted, plaintext transport): it is simpler and performs somewhat better.
- Restart ZK/Kafka
Restart the ZK and Kafka services. Every broker references kafka_server_jaas.conf before connecting.
On all ZooKeeper nodes:
[root@node002229 zookeeper]# zkServer.sh stop /usr/local/zookeeper/bin/../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo1.cfg
Stopping zookeeper ... STOPPED
[root@node002229 zookeeper]# zkServer.sh start /usr/local/zookeeper/bin/../conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo1.cfg
On all Kafka brokers:
cd /usr/local/kafka/;bin/kafka-server-stop.sh
cd /usr/local/kafka/;bin/kafka-server-start.sh -daemon config/server.properties
Client configuration
First, test with kafka-console-producer and kafka-console-consumer.
kafka-console-producer
- Create config/client-sasl.properties:
[root@node002229 kafka]# vim config/client-sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
- Create config/kafka_client_jaas_admin.conf:
[root@node002229 kafka]# vim config/kafka_client_jaas_admin.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};
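The broker and client JAAS files share the same shape, and the trailing semicolons are easy to forget. As a small hypothetical helper (my own sketch, not part of Kafka) that renders such an entry with the semicolons in place:

```python
def jaas_scram_entry(section: str, username: str, password: str) -> str:
    """Render a JAAS entry for Kafka's ScramLoginModule.

    Note the semicolon after the password and after the closing brace;
    omitting either makes the JAAS file unparseable.
    """
    return (
        f"{section} {{\n"
        f"    org.apache.kafka.common.security.scram.ScramLoginModule required\n"
        f'    username="{username}"\n'
        f'    password="{password}";\n'
        f"}};\n"
    )

print(jaas_scram_entry("KafkaClient", "admin", "admin-secret"), end="")
```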
- Modify the kafka-console-producer.sh script
I make a copy first and edit the copy:
cp bin/kafka-console-producer.sh bin/kafka-console-producer-admin.sh
vim bin/kafka-console-producer-admin.sh
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_admin.conf kafka.tools.ConsoleProducer "$@"
- Create a test topic:
bin/kafka-topics.sh --create --zookeeper node1:2181 --partitions 1 --replication-factor 1 --topic test
- Test producing messages:
bin/kafka-console-producer-admin.sh --broker-list node1:9092 --topic test --producer.config config/client-sasl.properties
>1
>
As you can see, the admin user can produce messages without any ACL configuration.
- Test the fanboshi user
In the same way, create bin/kafka-console-producer-fanboshi.sh, changing only kafka_client_jaas_admin.conf to kafka_client_jaas_fanboshi.conf:
vim config/kafka_client_jaas_fanboshi.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="fanboshi"
password="fanboshi";
};
cp bin/kafka-console-producer-admin.sh bin/kafka-console-producer-fanboshi.sh
vi bin/kafka-console-producer-fanboshi.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_fanboshi.conf kafka.tools.ConsoleProducer "$@"
Produce a message:
[root@node002229 kafka]# bin/kafka-console-producer-fanboshi.sh --broker-list node1:9092 --topic test --producer.config config/client-sasl.properties
>1
[2019-01-26 18:07:50,099] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-01-26 18:07:50,100] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
It fails: the fanboshi user does not have any permissions yet.
kafka-console-consumer
- Create config/consumer-fanboshi.properties:
[root@node002229 kafka]# vim config/consumer-fanboshi.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
group.id=fanboshi-group
- Create bin/kafka-console-consumer-fanboshi.sh:
cp bin/kafka-console-consumer.sh bin/kafka-console-consumer-fanboshi.sh
vim bin/kafka-console-consumer-fanboshi.sh
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_fanboshi.conf kafka.tools.ConsoleConsumer "$@"
- Test the consumer:
bin/kafka-console-consumer-fanboshi.sh --bootstrap-server node1:9092 --topic test --consumer.config config/consumer-fanboshi.properties --from-beginning
This also fails with an authorization error; the output is omitted here.
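For a programmatic client, the same properties map onto client-library settings. A hedged sketch using the kafka-python package (assumed installed; the host name is the one used in this walkthrough, and the connection line is commented out because it needs a live broker):

```python
# kafka-python equivalent of client-sasl.properties + consumer-fanboshi.properties
consumer_config = {
    "bootstrap_servers": "node1:9092",
    "security_protocol": "SASL_PLAINTEXT",
    "sasl_mechanism": "SCRAM-SHA-256",
    "sasl_plain_username": "fanboshi",
    "sasl_plain_password": "fanboshi",
    "group_id": "fanboshi-group",  # checked against Group ACLs, not just Topic ACLs
}
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("test", **consumer_config)  # needs a reachable broker
print(consumer_config["group_id"])
```

Under ACLs a consumer needs Read permission on both the Topic and its Group resource, which is why the group id matters here.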
ACL configuration
Grant the fanboshi user Write permission on the test topic, restricted to the 192.168.2.* network:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --operation Write --topic test --allow-host 192.168.2.*
Grant the fanboshi user Read permission on the test topic, restricted to the 192.168.2.* network:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --operation Read --topic test --allow-host 192.168.2.*
Grant the fanboshi user Read permission on the fanboshi-group consumer group, restricted to the 192.168.2.* network:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:fanboshi --operation Read --group fanboshi-group --allow-host 192.168.2.*
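The three commands above can be thought of as (principal, operation, resource, host) allow entries: with allow.everyone.if.no.acl.found=false everything not explicitly allowed is denied, and super.users bypass the check entirely. A toy Python model of that evaluation (my own illustration, not Kafka's SimpleAclAuthorizer):

```python
from fnmatch import fnmatch

# Allow rules created by the kafka-acls.sh commands above.
RULES = [
    ("User:fanboshi", "Write", "Topic:test", "192.168.2.*"),
    ("User:fanboshi", "Read", "Topic:test", "192.168.2.*"),
    ("User:fanboshi", "Read", "Group:fanboshi-group", "192.168.2.*"),
]
SUPER_USERS = {"User:admin"}  # from super.users in server.properties

def allowed(principal: str, operation: str, resource: str, host: str) -> bool:
    """Deny by default, modeling allow.everyone.if.no.acl.found=false."""
    if principal in SUPER_USERS:
        return True  # super users skip ACL evaluation entirely
    return any(
        p == principal and o == operation and r == resource and fnmatch(host, h)
        for p, o, r, h in RULES
    )

print(allowed("User:fanboshi", "Write", "Topic:test", "192.168.2.10"))  # True
print(allowed("User:fanboshi", "Write", "Topic:test", "10.0.0.5"))      # False
print(allowed("User:admin", "Delete", "Topic:test", "10.0.0.5"))        # True
```

This also makes the earlier console test failures easy to read: before any rules existed, every fanboshi request fell through to the default deny.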
View the ACL configuration:
[root@node002229 kafka]# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --list
Current ACLs for resource `Group:LITERAL:fanboshi-group`:
User:fanboshi has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:test`:
User:fanboshi has Allow permission for operations: Write from hosts: *
User:fanboshi has Allow permission for operations: Read from hosts: *
Delete an ACL entry:
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --remove --allow-principal User:fanboshi --operation Write --topic test --allow-host *
Test again
Producer:
[root@node002229 kafka]# bin/kafka-console-producer-fanboshi.sh --broker-list node1:9092 --topic test --producer.config config/client-sasl.properties
>1
>1
Consumer:
[root@node002229 kafka]# bin/kafka-console-consumer-fanboshi.sh --bootstrap-server node1:9092 --topic test --consumer.config config/consumer-fanboshi.properties --from-beginning
1
1
Everything works now.
How do we see which "users" we have created?
It seems the only way is to look in ZooKeeper:
zkCli.sh -server node1:2181
ls /config/users
[admin, alice, fanboshi]
Try deleting alice:
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name alice
Configs for user-principal 'alice' are SCRAM-SHA-512=salt=MWt1OHRhZnd3cWZvZ2I4bXcwdTM0czIyaTQ=,stored_key=JYeud1Cx5Z2+FaJgJsZGbMcIi63B9XtA9Wyc+KEm2gXK8+2IxxAVvi1CfSjlkqeupfeIMFJ7/EUkOw+zqvYz6w==,server_key=O4NIgjleroia7puK01/ZZoagFeoxh+zHzckGXXooBsWTdx/7Shb0pMHniMu4IY2jb5orWB2t9K8MZkxCliJDsg==,iterations=4096,SCRAM-SHA-256=salt=MTJ3bXRod3EyN3FtZWdsNHk0NXoyeWdlNjE=,stored_key=chQX35reoBYtfg/U5HBtkzvBAk+gSCgskNzUiScOrUE=,server_key=rRTbUzAehwVMUDTMuoOMumGEuvc7wDecKcqK6yYlbWY=,iterations=8192
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'SCRAM-SHA-512' --entity-type users --entity-name alice
Completed Updating config for entity: user-principal 'alice'.
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name alice
Configs for user-principal 'alice' are SCRAM-SHA-256=salt=MTJ3bXRod3EyN3FtZWdsNHk0NXoyeWdlNjE=,stored_key=chQX35reoBYtfg/U5HBtkzvBAk+gSCgskNzUiScOrUE=,server_key=rRTbUzAehwVMUDTMuoOMumGEuvc7wDecKcqK6yYlbWY=,iterations=8192
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name alice
Completed Updating config for entity: user-principal 'alice'.
[root@node002229 kafka]# bin/kafka-configs.sh --zookeeper node1:2181 --describe --entity-type users --entity-name alice
Configs for user-principal 'alice' are
Check in ZK (note that alice still appears under /config/users even after both of its SCRAM credentials were deleted):
[zk: node1:2181(CONNECTED) 0] ls /config/users
[admin, alice, fanboshi]
Common Kafka ACL operations
Creating a topic
Create it with bin/kafka-topics.sh.
Note that bin/kafka-topics.sh talks to ZooKeeper, not Kafka: it is a ZooKeeper client, not a Kafka client, so all of its authentication goes through ZooKeeper.
Case 1: ZooKeeper does not have ACLs enabled:
/opt/kafka/bin/kafka-topics.sh --create \
--zookeeper node1:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafkaclient-topic
Case 2: ZooKeeper has ACLs enabled:
The command is the same as above, but java.security.auth.login.config must point to a jaas.conf file. For example:
$ cat $ZOOKEEPER_HOME/conf/zookeeper_client_jaas_admin.conf
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd";
};
You can either modify the JVM startup script directly or set it via an environment variable:
export KAFKA_OPTS=-Djava.security.auth.login.config=$ZOOKEEPER_HOME/conf/zookeeper_client_jaas_admin.conf
The user configured here must be a client user that the ZooKeeper server is configured to accept. For example, with the following ZooKeeper server configuration:
$ cat $ZOOKEEPER_HOME/conf/zookeeper_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd";
};
a client connecting as the admin user is allowed to access ZooKeeper.
Querying topics
kafka-topics.sh --list --zookeeper node1:2181
ACL authentication for querying topics works the same as for creating topics above; see that section.
Deleting a topic
bin/kafka-topics.sh \
--delete \
--zookeeper node1:2181 \
--topic kafkaclient-topic
ACL authentication for deleting a topic is the same as for creating one; see above.