Java連接Kafka Kerberos

Java連接Kafka Kerberos

平臺:Ambari hdp 2.6.2.0
開啟keberos

1.配置kafka_client_jaas.conf

  • 注意keyTabprincipal兩個配置項
KafkaServer {  
    com.sun.security.auth.module.Krb5LoginModule required  
    useKeyTab=true 
    keyTab="/etc/security/keytabs/kafka.service.keytab"  
    storeKey=true  
    useTicketCache=false  
    principal="kafka/yamb2@EXAMPLE.COM"; 
};
KafkaClient {  
    com.sun.security.auth.module.Krb5LoginModule required  
    useKeyTab=true  
    keyTab="/etc/security/keytabs/kafka.service.keytab"  
    storeKey=true  
    useTicketCache=false  
    principal="kafka/yamb2@EXAMPLE.COM"; 
};
Client {  
    com.sun.security.auth.module.Krb5LoginModule required  
    useKeyTab=true  
    storeKey=true 
    useTicketCache=false  
    keyTab="/etc/security/keytabs/kafka.service.keytab"  
    principal="kafka/yamb2@EXAMPLE.COM"; 
};

2.kafka Producer Java Demo

  • 在kafka中創(chuàng)建一個topic:cw_test2019042301
  • kafka_client_jaas.conf為上一步配置的
  • krb5.conf為集群上的配置文件垫竞。默認目錄為/etc/krb5.conf
package com.caiw.nuwapi;

import java.util.Date;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * @Author: caiwei
 * @Description:
 * @Date: create in 2019/4/12 14:54
 */
public class TestProducer {

    public static void main(String... args) throws InterruptedException {
        String topic = "cw_test2019042301";

        System.setProperty("java.security.auth.login.config","D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf","D:\\tmp\\161hdp\\krb5.conf");
//        System.setProperty("security.auth.useSubjectCredsOnly","false");

        Properties props = new Properties();
        props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name","kafka");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10000; i++){
            String s = UUID.randomUUID().toString() +" " + i + " Test Date: " + new Date();
            System.out.println(s);
            producer.send(new ProducerRecord<>(topic,s ));
            Thread.sleep(1000);
        }
    }
}

3.kafka Consumer Java Demo

  • 在kafka中創(chuàng)建一個topic:cw_test2019042301
  • kafka_client_jaas.conf為上一步配置的
  • krb5.conf為集群上的配置文件拉盾。默認目錄為/etc/krb5.conf
package com.caiw.nuwapi;

import org.apache.commons.collections.map.HashedMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
 * @Author: caiwei
 * @Description:
 * @Date: create in 2019/4/12 14:54
 */
public class TestConsumer {
    private static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashedMap();
    int count = 0;

    public static void main(String[] args) {
        System.setProperty("java.security.auth.login.config","D:\\tmp\\161hdp\\kafka_client_jaas.conf");
        System.setProperty("java.security.krb5.conf","D:\\tmp\\161hdp\\krb5.conf");
        Properties props = new Properties();
        props.put("group.id", "test_2019042301");// 指定消費者組
        props.put("enable.auto.commit", "true");// 關閉自動提交
        //props.put("auto.commit.interval.ms", "1000");// 自動提交的時間間隔
        // 反序列化消息主鍵
        props.put("auto.offset.reset", "earliest"); // 緩沖大小
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 反序列化消費記錄
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        props.put("log4j.appender.kafkaAppender.Threshold", "ERROR");

//        props.put("bootstrap.servers", "yamb2:6667,yamb3:6667,yamb4:6667");
        props.put("bootstrap.servers", "192.168.23.162:6667,192.168.23.163:6667,192.168.23.164:6667");
//        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");
        props.put("sasl.kerberos.service.name","kafka");

        // 創(chuàng)建一個消費者實例對象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 訂閱消費主題集合
        consumer.subscribe(Collections.singletonList("cw_test2019042301"));
        // 實時消費標識
        boolean flag = true;
        while (flag) {
            // 獲取主題消息數據
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records){
                // 循環(huán)打印消息記錄
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
                //處理消息
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                //解析消息將消息存儲到Hbase上的表中;
                // consumer.commitSync(currentOffsets);
                //手動提交偏移量
            }


        }
        // 出現異常關閉消費者對象
//      consumer.commitAsync();
//      consumer.commitSync();
        consumer.close();
    }
}
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末岔霸,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子默责,更是在濱河造成了極大的恐慌议经,老刑警劉巖威彰,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現場離奇詭異悴了,居然都是意外死亡,警方通過查閱死者的電腦和手機违寿,發(fā)現死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進店門湃交,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人藤巢,你說我怎么就攤上這事搞莺。” “怎么了掂咒?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵才沧,是天一觀的道長迈喉。 經常有香客問我,道長温圆,這世上最難降的妖魔是什么挨摸? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮捌木,結果婚禮上油坝,老公的妹妹穿的比我還像新娘。我一直安慰自己刨裆,他們只是感情好澈圈,可當我...
    茶點故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著帆啃,像睡著了一般瞬女。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上努潘,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天诽偷,我揣著相機與錄音,去河邊找鬼疯坤。 笑死报慕,一個胖子當著我的面吹牛,可吹牛的內容都是我干的压怠。 我是一名探鬼主播眠冈,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼菌瘫!你這毒婦竟也來了蜗顽?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤雨让,失蹤者是張志新(化名)和其女友劉穎雇盖,沒想到半個月后,有當地人在樹林里發(fā)現了一具尸體栖忠,經...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡崔挖,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現自己被綠了庵寞。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片虚汛。...
    茶點故事閱讀 39,795評論 1 347
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖皇帮,靈堂內的尸體忽然破棺而出卷哩,到底是詐尸還是另有隱情,我是刑警寧澤属拾,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布将谊,位于F島的核電站冷溶,受9級特大地震影響,放射性物質發(fā)生泄漏尊浓。R本人自食惡果不足惜逞频,卻給世界環(huán)境...
    茶點故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望栋齿。 院中可真熱鬧苗胀,春花似錦、人聲如沸瓦堵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽菇用。三九已至澜驮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間惋鸥,已是汗流浹背杂穷。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留卦绣,地道東北人耐量。 一個月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓,卻偏偏與公主長得像滤港,于是被迫代替她去往敵國和親拴鸵。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內容