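Adding account authentication marked a key step for Kafka toward enterprise use. After this feature Kafka finally got a major version number, and it has now evolved to 1.1.0. Development has been so fast that there is little practical material on it in China, or what exists has many problems. After some trial and error I found the setup somewhat complicated, so I am sharing the process to help others avoid detours. Authentication is implemented mainly with SASL_PLAINTEXT.
System requirements:
CentOS 7.4/7.5, JDK 1.8, /data as the data directory, and the firewalls on the servers allow each other access on all ports.
Three servers: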
server1 172.16.99.35
server2 172.16.99.36
server3 172.16.99.37
Download:
wget http://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
Extract:
tar -zxvf kafka_2.11-1.1.0.tgz -C /usr/local/
mv /usr/local/kafka_2.11-1.1.0 /usr/local/kafka
1. ZooKeeper configuration
1. Build the cluster
Run on every server:
mkdir -p /data/app/kafka/data/zookeeper
mkdir -p /data/app/logs/kafka/zookeeper
vi /usr/local/kafka/config/zookeeper.properties
dataDir=/data/app/kafka/data/zookeeper
dataLogDir=/data/app/logs/kafka/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=20
tickTime=2000
initLimit=5
syncLimit=2
server.1=172.16.99.35:2888:3888
server.2=172.16.99.36:2888:3888
server.3=172.16.99.37:2888:3888
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2. Set the ZooKeeper host id
echo 1 > /data/app/kafka/data/zookeeper/myid
Note: this id identifies the ZooKeeper host and is different on each host: 2 on the second server, 3 on the third.
3. Start
Start ZooKeeper on the three machines one after another to form a cluster:
cd /usr/local/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
2. Kafka cluster configuration
Repeat the following on every server.
1. Create the Kafka login authentication (JAAS) files
vi /usr/local/kafka/config/kafka_client_jaas.conf
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="test"
  password="testpwd";
};
vi /usr/local/kafka/config/kafka_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="adminpwd"
  user_admin="adminpwd"
  user_test="testpwd";
};
Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="adminpwd";
};
vi /usr/local/kafka/config/kafka_zoo_jaas.conf
Server {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="adminpwd"
  user_admin="adminpwd";
};
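A login-module entry that lacks its terminating semicolon fails to parse when the JVM loads the file, and a broker username without a matching user_<name> line breaks inter-broker authentication. The linter below is an added example, not part of the original article; lint_jaas and the BROKEN/FIXED samples are constructed for illustration, and it only understands PlainLoginModule sections like the three files above.

```python
import re

# Constructed sample: a broker file whose login-module entry is left
# unterminated (no ';' after the last option) and whose user_admin
# password no longer matches the broker's own password.
BROKEN = '''
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="adminpwd"
  user_admin="changed"
  user_test="testpwd"
};
'''

FIXED = (BROKEN.replace('user_admin="changed"', 'user_admin="adminpwd"')
               .replace('user_test="testpwd"', 'user_test="testpwd";'))

SECTION = re.compile(r'(\w+)\s*\{(.*?)\}\s*;', re.S)
OPTION = re.compile(r'(\w+)\s*=\s*"([^"]*)"')


def lint_jaas(text):
    """Return a list of problems found in a JAAS file using PlainLoginModule."""
    problems = []
    sections = SECTION.findall(text)
    if not sections:
        problems.append("no 'Name { ... };' section found")
    for name, body in sections:
        # Each login-module entry inside the braces must end with ';'.
        if not body.rstrip().endswith(';'):
            problems.append(f"{name}: login module entry is not terminated by ';'")
        opts = dict(OPTION.findall(body))
        users = {k[5:]: v for k, v in opts.items() if k.startswith('user_')}
        user, pwd = opts.get('username'), opts.get('password')
        if user is None or pwd is None:
            problems.append(f"{name}: username/password missing")
        # Where user_<name> entries exist (server side), the section's own
        # identity must be listed among them with the same password.
        elif users and users.get(user) != pwd:
            problems.append(f"{name}: no user_{user} entry with the same password")
    return problems


if __name__ == '__main__':
    for label, text in (('broken', BROKEN), ('fixed', FIXED)):
        print(label, lint_jaas(text))
```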
2. Bind the authentication files
Edit the Kafka .sh scripts and add the binding before the last line of each.
Add to /usr/local/kafka/bin/zookeeper-server-start.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_zoo_jaas.conf"
Add to /usr/local/kafka/bin/kafka-server-start.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
Add to /usr/local/kafka/bin/kafka-console-consumer.sh and kafka-console-producer.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
3. Add SASL_PLAINTEXT support to Kafka
Append to the end of /usr/local/kafka/config/server.properties:
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
auto.create.topics.enable=false
allow.everyone.if.no.acl.found=false
Append to the end of /usr/local/kafka/config/producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
Append to the end of /usr/local/kafka/config/consumer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4. Kafka cluster settings
Change the following values in /usr/local/kafka/config/server.properties:
zookeeper.connect=172.16.99.35:2181,172.16.99.36:2181,172.16.99.37:2181
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://<internal IP of this host>:9092 (when the service must be reachable from the external network, use this host's external IP instead; the firewall must then allow the external IPs to reach each other on port 9092)
log.dirs=/data/app/logs/kafka/kafka-logs
broker.id=0 (a number; each broker must have a unique value)
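An added example (not from the original article) that audits a server.properties for the settings in steps 3 and 4: every listener authenticated, the inter-broker protocol and mechanism consistent, an authorizer set, and no open fallback. It assumes listener names are security protocol names, as on this page; PAGE mirrors the settings above and WEAK is a constructed weakened copy.

```python
# Settings from steps 3 and 4 above, and a constructed weakened variant of them.
PAGE = """\
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
allow.everyone.if.no.acl.found=false
"""
WEAK = (PAGE.replace(":9092", ":9092,PLAINTEXT://:9093")
            .replace("found=false", "found=true")
            .replace("sasl.enabled.mechanisms=PLAIN\n", ""))

def parse_properties(text):
    """Parse key=value lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()
    return props

def audit(props):
    """Return findings for settings that leave the broker open or inconsistent."""
    findings = []
    # Broker defaults when a key is absent: PLAINTEXT listener, GSSAPI mechanism.
    listeners = props.get("listeners", "PLAINTEXT://:9092").split(",")
    protocols = [item.split("://")[0].strip() for item in listeners]
    for proto in protocols:
        if not proto.startswith("SASL_"):
            findings.append(f"listener {proto}: clients connect without authenticating")
    if props.get("security.inter.broker.protocol", "PLAINTEXT") not in protocols:
        findings.append("security.inter.broker.protocol has no matching listener")
    enabled = props.get("sasl.enabled.mechanisms", "GSSAPI").split(",")
    if props.get("sasl.mechanism.inter.broker.protocol", "GSSAPI") not in enabled:
        findings.append("inter-broker SASL mechanism is not in sasl.enabled.mechanisms")
    if not props.get("authorizer.class.name"):
        findings.append("no authorizer.class.name: ACLs are not enforced")
    if props.get("allow.everyone.if.no.acl.found", "false").lower() == "true":
        findings.append("resources without an ACL are open to every user")
    return findings

if __name__ == "__main__":
    for name, text in (("page", PAGE), ("weak", WEAK)):
        print(name, audit(parse_properties(text)))
```

SASL_PLAINTEXT passes this audit but does not encrypt the connection, so PLAIN passwords cross the network in clear text; SASL_SSL is the listener protocol that adds TLS.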
5. Start
Start the brokers one after another:
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
3. Testing Kafka
Create a topic:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test_topic
List all topics:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Create a consumer with permissions:
/usr/local/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Create --cluster kafka-cluster
(The consumer option does not include Create, which makes reads fail, so it has to be added separately, whereas the producer option adds it automatically.)
/usr/local/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --topic test_topic --group test-consumer-group
Create a producer:
/usr/local/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic test_topic
Produce data:
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.16.99.35:9092 --topic test_topic --producer.config=/usr/local/kafka/config/producer.properties
Consume data:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.99.36:9092 --topic test_topic --from-beginning --consumer.config=/usr/local/kafka/config/consumer.properties
4. Kafka load testing
Add before the last line of /usr/local/kafka/bin/kafka-consumer-perf-test.sh and kafka-producer-perf-test.sh:
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_client_jaas.conf"
Run the load-test scripts:
/usr/local/kafka/bin/kafka-consumer-perf-test.sh --consumer.config /usr/local/kafka/config/consumer.properties --broker-list 172.16.99.35:9092,172.16.99.36:9092,172.16.99.37:9092 --messages 50000 --topic test_topic --group=test-consumer-group --threads 1
bin/kafka-producer-perf-test.sh --topic test_topic --num-records 5000000 --record-size 100 --throughput -1 --producer.config config/producer.properties --producer-props acks=1 bootstrap.servers=172.16.108.128:9092,172.16.108.139:9092,172.16.108.136:9092 buffer.memory=67108864 batch.size=8196