spring boot與kafka集成

引入相關(guān)依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.1.RELEASE</version>
</dependency>

從依賴項(xiàng)的引入即可看出林艘,當(dāng)前spring boot(1.4.2)還不支持完全以配置項(xiàng)的配置來實(shí)現(xiàn)與kafka的無縫集成徽惋。也就意味著必須通過java config的方式進(jìn)行手工配置簇搅。

定義kafka基礎(chǔ)配置

與redisTemplate及jdbcTemplate等類似浮驳。spring同樣提供了org.springframework.kafka.core.KafkaTemplate作為kafka相關(guān)api操作的入口。

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

KafkaTemplate依賴于ProducerFactory麸祷,而創(chuàng)建ProducerFactory時則通過一個Map指定kafka相關(guān)配置參數(shù)选脊。通過KafkaTemplate對象即可實(shí)現(xiàn)消息發(fā)送杭抠。

kafkaTemplate.send("test-topic", "hello");
or
kafkaTemplate.send("test-topic", "key-1", "hello");

監(jiān)聽消息配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

實(shí)現(xiàn)消息監(jiān)聽的最終目標(biāo)是得到監(jiān)聽器對象。該監(jiān)聽器對象自行實(shí)現(xiàn)恳啥。

import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    
    import java.util.Optional;
    
    public class Listener {

    @KafkaListener(topics = {"test-topic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("listen1 " + message);
        }
    }
}

只需用@KafkaListener指定哪個方法處理消息即可偏灿。同時指定該方法用于監(jiān)聽kafka中哪些topic。

注意事項(xiàng)

定義監(jiān)聽消息配置時钝的,GROUP_ID_CONFIG配置項(xiàng)的值用于指定消費(fèi)者組的名稱翁垂,如果同組中存在多個監(jiān)聽器對象則只有一個監(jiān)聽器對象能收到消息。

@KafkaListener中topics屬性用于指定kafka topic名稱扁藕,topic名稱由消息生產(chǎn)者指定沮峡,也就是由kafkaTemplate在發(fā)送消息時指定。

KEY_DESERIALIZER_CLASS_CONFIG與VALUE_DESERIALIZER_CLASS_CONFIG指定key和value的編碼亿柑、解碼策略。kafka用key值確定value存放在哪個分區(qū)中棍弄。

后記

時間是解決問題的有效手段之一望薄。

在spring boot 1.5版本中即可實(shí)現(xiàn)spring boot與kafka Auto-configuration

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市呼畸,隨后出現(xiàn)的幾起案子痕支,更是在濱河造成了極大的恐慌,老刑警劉巖蛮原,帶你破解...
    沈念sama閱讀 212,542評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件卧须,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)花嘶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,596評論 3 385
  • 文/潘曉璐 我一進(jìn)店門笋籽,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人椭员,你說我怎么就攤上這事车海。” “怎么了隘击?”我有些...
    開封第一講書人閱讀 158,021評論 0 348
  • 文/不壞的土叔 我叫張陵侍芝,是天一觀的道長。 經(jīng)常有香客問我埋同,道長州叠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,682評論 1 284
  • 正文 為了忘掉前任凶赁,我火速辦了婚禮留量,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘哟冬。我一直安慰自己楼熄,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,792評論 6 386
  • 文/花漫 我一把揭開白布浩峡。 她就那樣靜靜地躺著可岂,像睡著了一般。 火紅的嫁衣襯著肌膚如雪翰灾。 梳的紋絲不亂的頭發(fā)上缕粹,一...
    開封第一講書人閱讀 49,985評論 1 291
  • 那天,我揣著相機(jī)與錄音纸淮,去河邊找鬼平斩。 笑死,一個胖子當(dāng)著我的面吹牛咽块,可吹牛的內(nèi)容都是我干的绘面。 我是一名探鬼主播,決...
    沈念sama閱讀 39,107評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼侈沪,長吁一口氣:“原來是場噩夢啊……” “哼揭璃!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起亭罪,我...
    開封第一講書人閱讀 37,845評論 0 268
  • 序言:老撾萬榮一對情侶失蹤瘦馍,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后应役,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體情组,經(jīng)...
    沈念sama閱讀 44,299評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡燥筷,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,612評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了院崇。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片肆氓。...
    茶點(diǎn)故事閱讀 38,747評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖亚脆,靈堂內(nèi)的尸體忽然破棺而出做院,到底是詐尸還是另有隱情,我是刑警寧澤濒持,帶...
    沈念sama閱讀 34,441評論 4 333
  • 正文 年R本政府宣布键耕,位于F島的核電站,受9級特大地震影響柑营,放射性物質(zhì)發(fā)生泄漏屈雄。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,072評論 3 317
  • 文/蒙蒙 一官套、第九天 我趴在偏房一處隱蔽的房頂上張望酒奶。 院中可真熱鬧,春花似錦奶赔、人聲如沸惋嚎。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,828評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽另伍。三九已至,卻和暖如春绞旅,著一層夾襖步出監(jiān)牢的瞬間摆尝,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,069評論 1 267
  • 我被黑心中介騙來泰國打工因悲, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留堕汞,地道東北人。 一個月前我還...
    沈念sama閱讀 46,545評論 2 362
  • 正文 我出身青樓晃琳,卻偏偏與公主長得像讯检,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蝎土,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,658評論 2 350

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理视哑,服務(wù)發(fā)現(xiàn),斷路器誊涯,智...
    卡卡羅2017閱讀 134,637評論 18 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,778評論 6 342
  • 隨著spring boot 1.5版本的發(fā)布,在spring項(xiàng)目中與kafka集成更為簡便蒜撮。 引入依賴 具體spr...
    SamHxm閱讀 17,374評論 9 40
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語閱讀 10,812評論 4 54
  • kafka的定義:是一個分布式消息系統(tǒng)暴构,由LinkedIn使用Scala編寫跪呈,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,311評論 1 15