To secure a Kafka cluster, we usually enable authentication. Kafka supports several authentication mechanisms (see http://kafka.apache.org/documentation/#security for details); this article covers only SASL authentication.
1. kafka的認(rèn)證范圍
- kafka client 與 kafka server(broker)
- broker與broker之間
- broker與zookeeper之間
2. 開(kāi)啟認(rèn)證的步驟
2.1 zookeeper的認(rèn)證配置
- 在zookeeper安裝根目錄的conf目錄下妻坝,創(chuàng)建zk_server_jaas.conf, 文件內(nèi)容如下
```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_kafka="kafka-secret";
};
```
user_kafka="kafka-secret" creates a user named kafka with password "kafka-secret"; this account is used by Kafka brokers to authenticate when connecting to ZooKeeper.
- Edit zoo.cfg and add the following:
```
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```
- 因?yàn)檎J(rèn)證的時(shí)候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 這個(gè)是kafka-client.jar里面敷扫,所有需要將相應(yīng)的jar拷貝到 zookeeper安裝根目錄的lib目錄下, 大概要copy這些jar
```
kafka-clients-2.0.0.jar
lz4-java-1.4.1.jar
osgi-resource-locator-1.0.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.1.jar
```
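Assuming a standard layout where the Kafka distribution keeps its jars in libs/, the copy step can be sketched in Python (the helper name and prefix list are illustrative, not part of any Kafka tooling):

```python
import shutil
from pathlib import Path

# Jar name prefixes that PlainLoginModule needs on ZooKeeper's classpath
# (version numbers vary; match your Kafka release).
NEEDED = ["kafka-clients", "lz4-java", "osgi-resource-locator",
          "slf4j-api", "snappy-java"]

def copy_auth_jars(kafka_libs: str, zk_lib: str) -> list:
    """Copy the jars required for SASL auth from Kafka's libs/ to ZooKeeper's lib/."""
    copied = []
    for jar in Path(kafka_libs).glob("*.jar"):
        if any(jar.name.startswith(prefix) for prefix in NEEDED):
            shutil.copy(jar, zk_lib)
            copied.append(jar.name)
    return sorted(copied)
```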
- 然后就是修改zk的啟動(dòng)參數(shù), 修改 bin/zkEnv.sh, 在文件尾加上
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
- 然后重新啟動(dòng)zookeeper服務(wù)就好(如果是集群,每個(gè)集群都進(jìn)行相應(yīng)的操作)
2.2 kafka broker的認(rèn)證配置
- 在安裝根目錄的config目錄下, 創(chuàng)建kafka_server_jaas.conf, 內(nèi)容如下
```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka-secret";
};
```
The KafkaServer section configures both broker-to-broker and client-to-broker authentication.
KafkaServer.username and KafkaServer.password are used for mutual authentication between brokers.
KafkaServer.user_admin and KafkaServer.user_alice define users for client-to-broker authentication; the client examples below all authenticate as user alice.
The username and password in the Client section are used when the broker authenticates its connection to ZooKeeper.
- Edit config/server.properties:
```
# this example uses SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://127.0.0.1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# ACL authorizer entry class
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true
# make admin a super user in this example
super.users=User:admin
```
- 修改kafka啟動(dòng)腳本, 添加 java.security.auth.login.config 環(huán)境變量诚卸。打開(kāi) bin/kafka-server-start.sh
將最后一句
修改為exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_server_jaas.conf kafka.Kafka "$@"
- Then restart the Kafka server (for a cluster, apply the same changes on every broker).
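The script change above simply injects a -D system property in front of the main class on the exec line. As an illustration only (this helper is hypothetical; editing the file by hand as described works just as well), the transformation can be expressed precisely:

```python
def add_jaas_property(exec_line: str, jaas_path: str) -> str:
    """Insert -Djava.security.auth.login.config before the main class
    in a kafka-run-class.sh exec line, keeping "$@" as the last token."""
    prop = "-Djava.security.auth.login.config=" + jaas_path
    parts = exec_line.split()
    # The main class (e.g. kafka.Kafka) is the second-to-last token.
    return " ".join(parts[:-2] + [prop] + parts[-2:])
```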
2.3 kafka client的認(rèn)證配置
- 在kakfka根目錄/config/下創(chuàng)建kafka_client_jaas.conf, 內(nèi)容如下
```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice-secret";
};
```
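Programmatic clients do not need the JAAS file; they can pass the same credentials directly. As an example (an assumption, not shown in the source), the kafka-python library accepts them as constructor keyword arguments:

```python
def sasl_plain_config(username: str, password: str, brokers: str) -> dict:
    """Keyword arguments for kafka-python's KafkaProducer/KafkaConsumer
    that mirror the KafkaClient JAAS section above."""
    return {
        "bootstrap_servers": brokers,
        "security_protocol": "SASL_PLAINTEXT",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }

# Usage (requires kafka-python and a running broker):
#   from kafka import KafkaProducer
#   producer = KafkaProducer(**sasl_plain_config("alice", "alice-secret", "127.0.0.1:9092"))
```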
- Producer configuration changes
- Modify the producer startup script: open bin/kafka-console-producer.sh and change the last line from
```
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
```
to
```
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
```
- 創(chuàng)建一個(gè)producer.config 為 console producer指定下面兩個(gè)屬性
```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```
- 測(cè)試
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
- Consumer configuration changes
- Modify the consumer startup script: open bin/kafka-console-consumer.sh and change the last line from
```
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
```
to
```
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
```
- 創(chuàng)建一個(gè)consumer.config 為 console consumer指定下面三個(gè)屬性
```
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group
```
- 測(cè)試
After modifying the console producer and console consumer configuration and startup scripts as above, you will find that messages still cannot be produced or consumed; an authentication failure is reported. At this point the user must be granted permissions.
- Grant user alice producer rights on topic test_tp1
```
[root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --producer --topic test_tp1
Adding ACLs for resource `Topic:LITERAL:test_tp1`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Create from hosts: *
    User:alice has Allow permission for operations: Write from hosts: *

Current ACLs for resource `Topic:LITERAL:test_tp1`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Create from hosts: *
    User:alice has Allow permission for operations: Write from hosts: *
```
- 為topic test_tp1添加消費(fèi)者用戶alice, 消費(fèi)組位 test_group
```
[root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --consumer --topic test_tp1 --group test_group
Adding ACLs for resource `Topic:LITERAL:test_tp1`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Read from hosts: *

Adding ACLs for resource `Group:LITERAL:test_group`:
    User:alice has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:LITERAL:test_tp1`:
    User:alice has Allow permission for operations: Describe from hosts: *
    User:alice has Allow permission for operations: Create from hosts: *
    User:alice has Allow permission for operations: Write from hosts: *
    User:alice has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Group:LITERAL:test_group`:
    User:alice has Allow permission for operations: Read from hosts: *
```
- 生產(chǎn)者測(cè)試
```
[root@test kafka_cluster_ss]# k1/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --producer.config producer.config
>s
>ss1
>ss2
>
```
- 消費(fèi)者測(cè)試
```
[root@test kafka_cluster_ss]# k1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --from-beginning --consumer.config consumer.config
s1
ss1
ss2
```
- 為topic test_tp1 添加生產(chǎn)者用戶alice
2.4 ACL examples
- Adding ACLs
Suppose you want to add an ACL "allowing principals User:Bob and User:Alice Read and Write access to topic Test-topic from hosts 198.51.100.0 and 198.51.100.1". You can do so with the following command:
```
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
```
默認(rèn)情況下,所有的principal在沒(méi)有一個(gè)明確的對(duì)資源操作訪問(wèn)的acl都是拒絕訪問(wèn)的鼎俘。在極少的情況下哲身,acl
允許訪問(wèn)所有的資源,但一些principal我們可以使用 --deny-principal 和 --deny-host來(lái)拒絕訪問(wèn)贸伐。例如勘天,如
果我們想讓所有用戶讀取Test-topic,只拒絕IP為198.51.100.3的User:BadBob捉邢,我們可以使用下面的命令:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
Note that --allow-host and --deny-host only accept IP addresses (hostnames are not supported). The example above adds an ACL to a topic by specifying --topic [topic-name] as the resource option; likewise, a user can add ACLs to the cluster with --cluster and to a consumer group with --group [group-name].
- Removing ACLs
Removal works just like adding, with --add replaced by --remove. To remove the ACLs added in the first example above:
```
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
```
- Listing ACLs
We can list the ACLs on a resource with the --list option. To list all ACLs on Test-topic:
```
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
```
- Adding or removing a principal as producer or consumer
Adding or removing a producer or consumer principal is the most common ACL-management task, so convenience options exist for these cases. To add User:Bob as a producer on topic Test-topic:
```
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
```
Similarly, to add a consumer on topic test-topic with consumer group Group-1, we just use the --consumer option:
```
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
```
Note that with the --consumer option we must also specify the consumer group. To remove a principal from the producer or consumer role, we just pass --remove instead of --add.
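These convenience options can also be driven from code. A hypothetical Python helper that assembles the kafka-acls.sh argument list for the --producer/--consumer cases (flag names taken verbatim from the commands shown above):

```python
def acls_argv(zk: str, principal: str, topic: str,
              role: str, group: str = None, remove: bool = False) -> list:
    """Build the argv for kafka-acls.sh using the --producer/--consumer
    convenience options. role is "producer" or "consumer"."""
    argv = ["bin/kafka-acls.sh",
            "--authorizer-properties", "zookeeper.connect=" + zk,
            "--remove" if remove else "--add",
            "--allow-principal", "User:" + principal,
            "--" + role, "--topic", topic]
    if role == "consumer":
        # The consumer role always requires a consumer group.
        if group is None:
            raise ValueError("--consumer requires a --group")
        argv += ["--group", group]
    return argv
```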
2.6 kafka的一些client的設(shè)置
- librdkafka (C++)
```cpp
std::string broker_list = "localhost:9092,localhost:9093,localhost:9094";
RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
global_conf->set("metadata.broker.list", broker_list, err_string);
global_conf->set("security.protocol", "sasl_plaintext", err_string);
global_conf->set("sasl.mechanisms", "PLAIN", err_string);
global_conf->set("sasl.username", username.c_str(), err_string);
global_conf->set("sasl.password", password.c_str(), err_string);
```
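The same librdkafka configuration keys work unchanged in its other language bindings. For instance (an assumption, not covered by the source), the Python binding confluent-kafka accepts an identical property dict:

```python
def rdkafka_sasl_config(brokers: str, username: str, password: str) -> dict:
    """librdkafka-style configuration, with the same keys as the C++ snippet above."""
    return {
        "metadata.broker.list": brokers,
        "security.protocol": "sasl_plaintext",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": username,
        "sasl.password": password,
    }

# Usage (requires confluent-kafka and a running broker):
#   from confluent_kafka import Producer
#   p = Producer(rdkafka_sasl_config("localhost:9092", "alice", "alice-secret"))
```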