Java連接Kafka Kerberos
平臺:
Ambari hdp 2.6.2.0
開啟 Kerberos
1.配置kafka_client_jaas.conf
- 注意
keyTab
和principal
兩個配置項
// JAAS login contexts for a Kerberos-secured Kafka deployment (HDP).
// NOTE(review): all three contexts reuse the kafka *service* keytab and the
// broker principal "kafka/yamb2@EXAMPLE.COM"; a real client would normally
// authenticate with its own user keytab/principal — confirm for production.
// KafkaServer: broker-side login context (only required in the broker's JAAS
// file, not in a pure client config — presumably copied here for reference).
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
principal="kafka/yamb2@EXAMPLE.COM";
};
// KafkaClient: used by Kafka producers/consumers when security.protocol is a
// SASL variant; keytab login (useTicketCache=false) so no kinit is needed.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
principal="kafka/yamb2@EXAMPLE.COM";
};
// Client: typically used for the ZooKeeper connection — confirm against the
// cluster's documentation.
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="/etc/security/keytabs/kafka.service.keytab"
principal="kafka/yamb2@EXAMPLE.COM";
};
2.kafka Producer Java Demo
- 在kafka中創建一個topic:
cw_test2019042301
-
kafka_client_jaas.conf
為上一步配置的 -
krb5.conf
為集群上的配置文件。默認目錄為 /etc/krb5.conf
package com.caiw.nuwapi;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @Author: caiwei
* @Description:
* @Date: create in 2019/4/12 14:54
*/
public class TestProducer {

    /**
     * Demo producer: sends one UUID-tagged test message per second to a
     * Kerberos-secured Kafka topic, 10000 messages in total.
     *
     * @param args unused
     * @throws InterruptedException if the inter-message sleep is interrupted
     */
    public static void main(String... args) throws InterruptedException {
        String topic = "cw_test2019042301";

        // Point the JVM at the local copies of the JAAS login config and the
        // cluster's krb5.conf (see the setup notes above the demo).
        System.setProperty("java.security.auth.login.config", "D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\tmp\\161hdp\\krb5.conf");
        // System.setProperty("security.auth.useSubjectCredsOnly","false");

        Properties props = new Properties();
        props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // HDP's name for SASL over plaintext (elsewhere called SASL_PLAINTEXT);
        // this is what triggers the KafkaClient JAAS context above.
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name", "kafka");

        // try-with-resources: close() flushes any buffered records and releases
        // network resources. The original never closed the producer, so records
        // still in the send buffer could be silently lost on exit.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10000; i++) {
                String s = UUID.randomUUID() + " " + i + " Test Date: " + new Date();
                System.out.println(s);
                producer.send(new ProducerRecord<>(topic, s));
                Thread.sleep(1000);
            }
        }
    }
}
3.kafka Consumer Java Demo
- 在kafka中創建一個topic:
cw_test2019042301
-
kafka_client_jaas.conf
為上一步配置的 -
krb5.conf
為集群上的配置文件。默認目錄為 /etc/krb5.conf
package com.caiw.nuwapi;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
/**
* @Author: caiwei
* @Description:
* @Date: create in 2019/4/12 14:54
*/
public class TestConsumer {

    /**
     * Next offset to commit per partition: last consumed offset + 1.
     * java.util.HashMap replaces the raw commons-collections HashedMap the
     * original used (raw type, unchecked put, needless extra dependency).
     */
    private static final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    /**
     * Demo consumer: reads the test topic from the beginning on a
     * Kerberos-secured cluster and prints every record until interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Point the JVM at the local copies of the JAAS login config and the
        // cluster's krb5.conf (see the setup notes above the demo).
        System.setProperty("java.security.auth.login.config", "D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\tmp\\161hdp\\krb5.conf");

        Properties props = new Properties();
        props.put("group.id", "test_2019042301"); // consumer group id
        // Auto-commit is ON (the original comment wrongly said "disable");
        // the manual commitSync calls below are left commented out for that reason.
        props.put("enable.auto.commit", "true");
        // props.put("auto.commit.interval.ms", "1000"); // auto-commit interval
        // Start from the earliest offset when the group has no committed position.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("bootstrap.servers", "192.168.23.162:6667,192.168.23.163:6667,192.168.23.164:6667");
        // HDP's name for SASL over plaintext; triggers the KafkaClient JAAS context.
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name", "kafka");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("cw_test2019042301"));

        // Poll forever; the finally block guarantees the consumer is closed
        // (the original's close() call was unreachable behind an always-true loop).
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Remember the next offset to commit for this partition.
                    currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                    // Process the record; a real job would e.g. write it to HBase.
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                    // consumer.commitSync(currentOffsets); // manual offset commit
                }
            }
        } finally {
            consumer.close();
        }
    }
}