服務(wù)端
-
在服務(wù)器節(jié)點(diǎn)配置認(rèn)證文件:
文件路徑:kafka/config/kafka_server_jaas.conf
文件內(nèi)容:KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_rex="123456" user_alice="123456" user_lucy="123456"; };
注意配置文件中的兩個分號的位置,多一不可,缺一不可舅踪。
-
修改服務(wù)器節(jié)點(diǎn)的啟動配置文件:
復(fù)制kafka/config/server.properties
為kafka/config/server-sasl.properties
绕沈,在文件最末尾添加:listeners=SASL_PLAINTEXT://localhost:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin
-
修改服務(wù)器節(jié)點(diǎn)的啟動腳本:
復(fù)制kafka/bin/kafka-server-start.sh
為kafka/bin/kafka-server-start-sasl.sh
,將認(rèn)證信息配置到 kafka 服務(wù)器節(jié)點(diǎn)的 JVM 啟動參數(shù)中:if [ "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_server_jaas.conf" fi
啟動 zookeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties
以安全認(rèn)證的方式啟動 kafka-server:
./bin/kafka-server-start-sasl.sh config/server-sasl.properties
創(chuàng)建一個 topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
生產(chǎn)者 + 消費(fèi)者
-
配置 生產(chǎn)者/消費(fèi)者 的認(rèn)證信息:
文件路徑:kafka/config/kafka_client_jaas.conf
文件內(nèi)容:KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; };
注意配置文件中的兩個分號的位置融蹂,多一不可旺订,缺一不可弄企。
-
修改 生產(chǎn)者 的啟動配置文件:
復(fù)制kafka/config/producer.properties
為kafka/config/producer-sasl.properties
在文件最末尾添加:security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
-
修改 消費(fèi)者 的啟動配置文件:
復(fù)制kafka/config/consumer.properties
為kafka/config/consumer-sasl.properties
在文件最末尾添加:security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
-
修改生產(chǎn)者的啟動腳本:
復(fù)制kafka/bin/kafka-console-producer.sh
為kafka/bin/kafka-console-producer-sasl.sh
,將認(rèn)證信息配置到 kafka 生產(chǎn)者的 JVM 啟動參數(shù)中:if [ "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf" fi
-
修改消費(fèi)者的啟動腳本:
復(fù)制kafka/bin/kafka-console-consumer.sh
為kafka/bin/kafka-console-consumer-sasl.sh
耸峭,將認(rèn)證信息配置到 kafka 生產(chǎn)者的 JVM 啟動參數(shù)中:if [ "x$KAFKA_OPTS" ]; then export KAFKA_OPTS="-Djava.security.auth.login.config=/home/weijie/myprojects/kafka_2.12-2.6.0/config/kafka_client_jaas.conf" fi
-
以安全認(rèn)證的方式啟動 kafka-producer:
./bin/kafka-console-producer-sasl.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties
發(fā)送消息測試:
生產(chǎn)者發(fā)送消息測試.png -
以安全認(rèn)證的方式啟動 kafka-consumer:
./bin/kafka-console-consumer-sasl.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-sasl.properties
消費(fèi)者接收消息.png
java client
- maven
pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.6.0</version>
</dependency>
-
java client 中添加 SASL 設(shè)置信息:
Java client consumer properties配置.png注意
sasl.jaas.config
配置中的分號必不可少桩蓉。package kafka; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestKafkaSasl { private static final Logger logger = LoggerFactory.getLogger(TestKafkaSasl.class); public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // sasl.jaas.config的配置, 結(jié)尾分號必不可少. props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"); props.setProperty("security.protocol", "SASL_PLAINTEXT"); props.setProperty("sasl.mechanism", "PLAIN"); @SuppressWarnings("resource") KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("test")); while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n", record.offset(), record.partition(), record.key(), record.value()); logger.info("offset = {}, partition = {}, key = {}, value = {}", record.offset(), record.partition(), record.key(), record.value()); } } catch (Exception e) { e.printStackTrace(); logger.error(e.getMessage()); } } } }
運(yùn)行測試:
繼續(xù)生產(chǎn)數(shù)據(jù).pngjava client sasl消費(fèi)數(shù)據(jù)測試.png
可能遇到的問題
-
生產(chǎn)者進(jìn)程啟動報錯
生產(chǎn)者啟動報錯.png
解決方案:以安全認(rèn)證的方式啟動 kafka-producer,別忘記添加設(shè)置配置文件參數(shù)劳闹。./bin/kafka-console-producer-sasl.sh --broker-list localhost:9092 --topic test --producer.config config/producer-sasl.properties
-
消費(fèi)者進(jìn)程啟動報錯
消費(fèi)者啟動報錯.png
解決方案:以安全認(rèn)證的方式啟動 kafka-consumer院究,別忘記添加設(shè)置配置文件參數(shù)。./bin/kafka-console-consumer-sasl.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-sasl.properties