SpringBoot集成Kafka

本篇主要講解SpringBoot 如何集成Kafka 榨崩,并且簡單的 編寫了一個(gè)Demo 來測試 發(fā)送和消費(fèi)功能

前言

選擇的版本如下:

springboot : 2.3.4.RELEASE

spring-kafka : 2.5.6.RELEASE

kafka : 2.5.1

zookeeper : 3.4.14

本Demo 使用的是 SpringBoot 比較高的版本 SpringBoot 2.3.4.RELEASE 它會(huì)引入 spring-kafka 2.5.6 RELEASE 谴垫,對應(yīng)了版本關(guān)系中的

Spring Boot 2.3 users should use 2.5.x (Boot dependency management will use the correct version).

spring和 kafka 的版本 關(guān)系

https://spring.io/projects/spring-kafka

1.搭建Kafka 和 Zookeeper 環(huán)境

搭建kafka 和 zookeeper 環(huán)境 并且啟動(dòng) 它們

2.創(chuàng)建Demo 項(xiàng)目引入spring-kafka

2.1 pom 文件

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

2.2 配置application.yml

spring:
  kafka:
    bootstrap-servers: 192.168.25.6:9092  #bootstrap-servers:連接kafka的地址,多個(gè)地址用逗號(hào)分隔
    consumer:
      group-id: myGroup
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0 #若設(shè)置大于0的值母蛛,客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
      batch-size: 16384 #當(dāng)將多個(gè)記錄被發(fā)送到同一個(gè)分區(qū)時(shí)翩剪, Producer 將嘗試將記錄組合到更少的請求中。這有助于提升客戶端和服務(wù)器端的性能。這個(gè)配置控制一個(gè)批次的默認(rèn)大小(以字節(jié)為單位)妻枕。16384是缺省的配置
      buffer-memory: 33554432 #Producer 用來緩沖等待被發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)杭棵,33554432是缺省配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #關(guān)鍵字的序列化類
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化類

2.3 定義消息體Message

/**
 * @author johnny
 * @create 2020-09-23 上午9:21
 **/
@Data
public class Message {


    private Long id;

    private String msg;

    private Date sendTime;
}

2.4 定義KafkaSender

主要利用 KafkaTemplate 來發(fā)送消息 ,將消息封裝成Message 并且進(jìn)行 轉(zhuǎn)化成Json串 發(fā)送到Kafka中

@Component
@Slf4j
public class KafkaSender {

    private final KafkaTemplate<String, String> kafkaTemplate;

    //構(gòu)造器方式注入  kafkaTemplate
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    private Gson gson = new GsonBuilder().create();

    public void send(String msg) {
        Message message = new Message();

        message.setId(System.currentTimeMillis());
        message.setMsg(msg);
        message.setSendTime(new Date());
        log.info("【++++++++++++++++++ message :{}】", gson.toJson(message));
        //對 topic =  hello2 的發(fā)送消息
        kafkaTemplate.send("hello2",gson.toJson(message));
    }

}

2.5 定義KafkaConsumer

在監(jiān)聽的方法上通過注解配置一個(gè)監(jiān)聽器即可椿胯,另外就是指定需要監(jiān)聽的topic

kafka的消息再接收端會(huì)被封裝成ConsumerRecord對象返回筷登,它內(nèi)部的value屬性就是實(shí)際的消息。

@Component
@Slf4j
public class KafkaConsumer {


    @KafkaListener(topics = {"hello2"})
    public void listen(ConsumerRecord<?, ?> record) {

        Optional.ofNullable(record.value())
                .ifPresent(message -> {
                    log.info("【+++++++++++++++++ record = {} 】", record);
                    log.info("【+++++++++++++++++ message = {}】", message);
                });
    }

}

3.測試 效果

提供一個(gè) Http接口調(diào)用 KafkaSender 去發(fā)送消息

3.1 提供Http 測試接口

@RestController
@Slf4j
public class TestController {


    @Autowired
    private KafkaSender kafkaSender;


    @GetMapping("sendMessage/{msg}")
    public void sendMessage(@PathVariable("msg") String msg){
        kafkaSender.send(msg);
    }
}

3.2 啟動(dòng)項(xiàng)目

監(jiān)聽8080 端口

KafkaMessageListenerContainer中有 consumer group = myGroup 有一個(gè) 監(jiān)聽 hello2-0 topic 的 消費(fèi)者

1609896741848.jpg

3.3 調(diào)用Http接口

http://localhost:8080/sendMessage/KafkaTestMsg

1609896783695.jpg

至此 SpringBoot集成Kafka 結(jié)束 哩盲。前方。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末狈醉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子惠险,更是在濱河造成了極大的恐慌苗傅,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件班巩,死亡現(xiàn)場離奇詭異渣慕,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)抱慌,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門逊桦,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人抑进,你說我怎么就攤上這事强经。” “怎么了寺渗?”我有些...
    開封第一講書人閱讀 164,133評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵匿情,是天一觀的道長。 經(jīng)常有香客問我信殊,道長炬称,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評(píng)論 1 293
  • 正文 為了忘掉前任鸡号,我火速辦了婚禮转砖,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鲸伴。我一直安慰自己府蔗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,585評(píng)論 6 392
  • 文/花漫 我一把揭開白布汞窗。 她就那樣靜靜地躺著姓赤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪仲吏。 梳的紋絲不亂的頭發(fā)上不铆,一...
    開封第一講書人閱讀 51,462評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音裹唆,去河邊找鬼誓斥。 笑死,一個(gè)胖子當(dāng)著我的面吹牛许帐,可吹牛的內(nèi)容都是我干的劳坑。 我是一名探鬼主播,決...
    沈念sama閱讀 40,262評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼成畦,長吁一口氣:“原來是場噩夢啊……” “哼距芬!你這毒婦竟也來了涝开?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評(píng)論 0 276
  • 序言:老撾萬榮一對情侶失蹤框仔,失蹤者是張志新(化名)和其女友劉穎舀武,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體离斩,經(jīng)...
    沈念sama閱讀 45,587評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡银舱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,792評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了捐腿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片纵朋。...
    茶點(diǎn)故事閱讀 39,919評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖茄袖,靈堂內(nèi)的尸體忽然破棺而出操软,到底是詐尸還是另有隱情,我是刑警寧澤宪祥,帶...
    沈念sama閱讀 35,635評(píng)論 5 345
  • 正文 年R本政府宣布聂薪,位于F島的核電站,受9級(jí)特大地震影響蝗羊,放射性物質(zhì)發(fā)生泄漏藏澳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,237評(píng)論 3 329
  • 文/蒙蒙 一耀找、第九天 我趴在偏房一處隱蔽的房頂上張望翔悠。 院中可真熱鬧,春花似錦野芒、人聲如沸蓄愁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撮抓。三九已至,卻和暖如春摇锋,著一層夾襖步出監(jiān)牢的瞬間丹拯,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評(píng)論 1 269
  • 我被黑心中介騙來泰國打工荸恕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留乖酬,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,048評(píng)論 3 370
  • 正文 我出身青樓融求,卻偏偏與公主長得像剑刑,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子双肤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,864評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容