Background: our company recently moved to cloud servers, so we need to secure Kafka. The good news is that Kafka has supported access control since version 0.9. Chinese-language material on this is scarce, so this post documents Kafka access control based on Kafka 0.9. (Flume needs version 1.7 or later to support Kafka SSL authentication.)
Now, dear readers, follow along and let's begin the Kafka authentication journey!
Introduction: Kafka access control falls into three categories: 1. SSL-based (not supported by CDH 5.8); 2. Kerberos-based (usually done on CDH; not covered in this post); 3. ACL-based (not supported by the Kafka 2.x parcel in CDH 5.8).
This post is based on the Apache distribution and walks through options 1 and 3, which are also the most widely used.
Note: throughout this post, the & symbol marks a comment.
一、Preparation
Component layout:
kafka: centos11, centos12, centos13
zookeeper: centos11, centos12, centos13
二梦湘、在kafka集群任選一臺(tái)機(jī)子 ( 先介紹基于SSL的 )
All passwords are 123456.
&Step 1 Generate SSL key and certificate for each Kafka broker
keytool -keystore server.keystore.jks -alias centos11 -validity 365 -genkey
&Step 2 Creating your own CA
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
&Step 3 Signing the certificate
keytool -keystore server.keystore.jks -alias centos11 -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:123456
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias centos11 -import -file cert-signed
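The keytool and openssl commands above prompt interactively, which is inconvenient when scripting the cluster setup. As an illustration of the Step 2 / Step 3 round trip itself (CA creation, signing, and verifying the chain), here is a sketch using only openssl; the keystore import steps are omitted, the CSR is generated with openssl as a stand-in for keytool's cert-file, and the -subj values are placeholders:

```shell
set -e
work=$(mktemp -d)
cd "$work"

# Step 2 equivalent: create the CA non-interactively (-subj avoids the prompts)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=kafka-ca" -passout pass:123456

# stand-in for keytool's CSR (the article exports cert-file from the keystore)
openssl req -new -newkey rsa:2048 -nodes -keyout broker-key \
  -out cert-file -subj "/CN=centos11"

# Step 3 equivalent: sign the CSR with the CA
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file \
  -out cert-signed -days 365 -CAcreateserial -passin pass:123456

# confirm the signed certificate chains back to the CA
openssl verify -CAfile ca-cert cert-signed
```

The final verify line is a cheap offline check that the signing worked before the certificates are imported back into the keystores.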
三、The other brokers in the Kafka cluster
&on machines centos13 and centos12
keytool -keystore kafka.client.keystore.jks -alias centos13 -validity 365 -genkey
keytool -keystore kafka.client.keystore.jks -alias centos13 -certreq -file cert-file
cp cert-file cert-file-centos13
&on centos11
scp ./ca* ce* server* root@centos13:/opt/kafka_2.10/
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-centos13 -out cert-signed-centos13 -days 365 -CAcreateserial -passin pass:123456
keytool -keystore kafkacentos13.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafkacentos13.client.keystore.jks -alias centos13 -import -file cert-signed-centos13
rm -rf producer.properties
echo "bootstrap.servers=centos13:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=/opt/kafka_2.10/kafkacentos12.client.keystore.jks">> producer.properties
echo "ssl.truststore.password=123456">> producer.properties
echo "ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks">> producer.properties
echo "ssl.keystore.password=123456">> producer.properties
echo "ssl.key.password=123456">> producer.properties
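The echo chain above can be written as a single heredoc, which is easier to read and avoids quoting mistakes; the file names and paths are the ones from the article:

```shell
cd "$(mktemp -d)"   # write somewhere disposable for this sketch

cat > producer.properties <<'EOF'
bootstrap.servers=centos13:9093
security.protocol=SSL
ssl.truststore.location=/opt/kafka_2.10/kafkacentos12.client.keystore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
EOF
```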
4. Verification:
openssl s_client -debug -connect localhost:9093 -tls1
output:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
5. Usage:
bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic test --new-consumer --consumer.config config/producer.properties
bin/kafka-console-producer.sh --broker-list centos11:9093 --topic test --producer.config ./config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server centos11:9093 --topic test --new-consumer --consumer.config ./config/producer.properties
bin/kafka-console-consumer.sh --bootstrap-server centos13:9093 --topic test --new-consumer --consumer.config ./config/producer.properties --from-beginning
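The console consumer commands above reuse producer.properties. A dedicated consumer config (the file name consumer.properties here is hypothetical) would carry the same SSL keys plus the consumer-side settings, using the group.id from the Java demo below:

```properties
bootstrap.servers=centos11:9093
group.id=test
security.protocol=SSL
ssl.truststore.location=/opt/kafka_2.10/client.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
```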
6. ACL-based authorization
Add the following to server.properties:
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
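One detail worth noting: principals listed in super.users bypass ACL checks entirely, and multiple principals are separated by semicolons (User:admin below is a hypothetical second entry):

```properties
super.users=User:root;User:admin
```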
7. Basic ACL usage:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos11:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=centos11:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic test
8. Java demo. Just copy server.keystore.jks and client.truststore.jks down from any of the machines.
SSL is supported only for the new Kafka Producer and Consumer, the older API is not supported
ConsumerDemo
package xmhd.examples;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Arrays;
import java.util.Properties;
/**
 * Created by shengjk1.
 * blog address: http://blog.csdn.net/jsjsjs1789
 * Consumer demo against an SSL-secured cluster.
 * SSL is supported only for the new Kafka Producer and Consumer; the older API is not supported.
 */
public class ConsumerZbdba {
public static void main(String[] args) {
// uses the topic "test" already created on the Kafka cluster
Properties props = new Properties();
/* Kafka broker addresses; there is no need to list every broker */
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos11:9093,centos13:9093,centos12:9093");
/* consumer group */
props.put("group.id", "test");
props.put("auto.offset.reset","earliest");
/* whether to auto-commit offsets */
// props.put("enable.auto.commit", "true");
// props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "F:\\server.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "123456");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "F:\\client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

/* auto-commit interval */
// props.put("auto.commit.interval.ms", "1000");
// props.put("session.timeout.ms", "30000");
/* key deserializer */
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* value deserializer */
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/* create the consumer */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* topics to subscribe to; multiple may be given */
consumer.subscribe(Arrays.asList("test"));

/* poll for data with a 100 ms timeout */
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
ProducerDemo
package xmhd.examples;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import java.util.Properties;
/**
 * Created by shengjk1.
 * blog address: http://blog.csdn.net/jsjsjs1789
 * Producer demo with SSL authentication.
 */
public class ProducerZbdba {
public static void main(String[] args) {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos12:9093");
// producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "F:\\server.keystore.jks");
producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "123456");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "F:\\client.truststore.jks");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "123456");
producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));
System.out.println("test");
producer.close();
}
}
8.1 Flume 1.7 configuration (Kafka SSL authentication)
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F -n+1 /opt/scan.log
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = test
tier1.sinks.sink1.kafka.bootstrap.servers = centos11:9093,centos12:9093,centos13:9093
tier1.channels.channel1.kafka.bootstrap.servers = centos11:9093,centos12:9093,centos13:9093
tier1.sinks.sink1.requiredAcks = 1
tier1.sinks.sink1.batchSize = 100
tier1.sinks.sink1.kafka.producer.security.protocol = SSL
tier1.sinks.sink1.kafka.producer.ssl.truststore.type=JKS
tier1.sinks.sink1.kafka.producer.ssl.truststore.location = /opt/kafka_2.10/server.truststore.jks
tier1.sinks.sink1.kafka.producer.ssl.truststore.password =123456
tier1.sinks.sink1.kafka.producer.security.protocol = SSL
tier1.sinks.sink1.kafka.producer.ssl.keystore.location = /opt/kafka_2.10/server.keystore.jks
tier1.sinks.sink1.kafka.producer.ssl.keystore.password =123456
&tier1.sinks.sink1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
tier1.sinks.sink1.channel = channel1
tier1.channels.channel1.capacity = 100
tier1.channels.channel1.kafka.producer.security.protocol = SSL
tier1.channels.channel1.kafka.producer.ssl.truststore.type=JKS
tier1.channels.channel1.kafka.producer.ssl.truststore.location = /opt/kafka_2.10/server.truststore.jks
tier1.channels.channel1.kafka.producer.ssl.truststore.password =123456
tier1.channels.channel1.kafka.producer.security.protocol = SSL
tier1.channels.channel1.kafka.producer.ssl.keystore.location = /opt/kafka_2.10/server.keystore.jks
tier1.channels.channel1.kafka.producer.ssl.keystore.password =123456
tier1.channels.channel1.kafka.producer.ssl.endpoint.identification.algorithm = HTTPS
tier1.channels.channel1.kafka.consumer.security.protocol = SSL
tier1.channels.channel1.kafka.consumer.ssl.truststore.location = /opt/kafka_2.10/server.truststore.jks
tier1.channels.channel1.kafka.consumer.ssl.truststore.password =123456
tier1.channels.channel1.kafka.consumer.security.protocol = SSL
tier1.channels.channel1.kafka.consumer.ssl.keystore.location = /opt/kafka_2.10/server.keystore.jks
tier1.channels.channel1.kafka.consumer.ssl.keystore.password =123456
tier1.channels.channel1.kafka.consumer.ssl.endpoint.identification.algorithm = HTTPS
9. The final kafka server.properties
This must be identical on all three machines; centos11 is the hostname, adjust as needed.
broker.id=0
############################# Socket Server Settings #############################
&Note: this needs special attention. Once the PLAINTEXT listener is commented out, some of the basic Kafka scripts stop working.
&listeners=PLAINTEXT://centos11:9092,SSL://centos11:9093
listeners=SSL://centos11:9093
advertised.listeners=SSL://centos11:9093
ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/opt/kafka_2.10/server.truststore.jks
ssl.truststore.password=123456
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL
&acl
allow.everyone.if.no.acl.found=true
super.users=User:root
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
host.name=centos11
advertised.host.name=centos11
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/opt/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=centos11:2181,centos12:2181,centos13:2181
zookeeper.connection.timeout.ms=6000
kafka producer.properties (centos11 is the hostname; adjust as needed)
bootstrap.servers=centos11:9093
security.protocol=SSL
ssl.truststore.location=/opt/kafka_2.10/client.truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=/opt/kafka_2.10/server.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
10僻孝、參考網(wǎng)址:具體細(xì)節(jié)可參考官網(wǎng)!
11. Protocol support details:
http://kafka.apache.org/090/documentation.html#security_authz
http://www.cloudera.com/documentation/kafka/latest/topics/kafka_security.html
http://blog.csdn.net/zbdba/article/details/52458654
12. Further reading:
On how SSL works: http://www.linuxde.net/2012/03/8301.html