springboot 之集成kafka

前言

一直沒(méi)機(jī)會(huì)做spring生態(tài)圈的框架,公司選擇的是一些小眾的微服務(wù),鑒于此考慮筷登,豐富自己的技術(shù)棧魂爪,花了兩天時(shí)間從網(wǎng)上各網(wǎng)站上學(xué)習(xí)了springboot一些基礎(chǔ)知識(shí)先舷。
本章只介紹springboot微服務(wù)集成kafka,跟rabbitmq用法相同滓侍,作為一個(gè)消息中間件收發(fā)消息使用蒋川,本章僅介紹集成后的基礎(chǔ)用法,研究不深撩笆,請(qǐng)各位諒解捺球。

環(huán)境準(zhǔn)備

  • IntelliJ IDEA
  • 前一章中搭建的微服務(wù)框架
  • 前一章之后缸浦,對(duì)目錄結(jié)構(gòu)進(jìn)行了優(yōu)化,將config相關(guān)類都放到demo.config包下

開(kāi)始集成

  1. pom.xml中增加依賴包


    依賴包.png
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

加入依賴包后最好先執(zhí)行mvn clean install編一把氮兵,把所需依賴包下下來(lái)裂逐,后續(xù)寫(xiě)代碼的時(shí)候直接就可以引了。

  1. application.yml中引入kafka相關(guān)配置


    kafka服務(wù)配置.png
spring:
  kafka:
    bootstrap-servers: 172.101.203.33:9092
    producer:
      # 發(fā)生錯(cuò)誤后泣栈,消息重發(fā)的次數(shù)卜高。
      retries: 0
      #當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里南片。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小掺涛,按照字節(jié)數(shù)計(jì)算。
      batch-size: 16384
      # 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小疼进。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生產(chǎn)者在成功寫(xiě)入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)薪缆。
      # acks=1 : 只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到一個(gè)來(lái)自服務(wù)器成功響應(yīng)伞广。
      # acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí)矮燎,生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)。
      acks: 1
    consumer:
      # 自動(dòng)提交的時(shí)間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式赔癌,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費(fèi)者在讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下該作何處理:
      # latest(默認(rèn)值)在偏移量無(wú)效的情況下诞外,消費(fèi)者將從最新的記錄開(kāi)始讀取數(shù)據(jù)(在消費(fèi)者啟動(dòng)之后生成的記錄)
      # earliest :在偏移量無(wú)效的情況下,消費(fèi)者將從起始位置讀取分區(qū)的記錄
      auto-offset-reset: earliest
      # 是否自動(dòng)提交偏移量灾票,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失峡谊,可以把它設(shè)置為false,然后手動(dòng)提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在偵聽(tīng)器容器中運(yùn)行的線程數(shù)。
      concurrency: 5
      #listner負(fù)責(zé)ack刊苍,每調(diào)用一次既们,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

該配置位于spring下,其中可以配置kafka server的IP:port正什,producer啥纸、consumer、listener的一些配置婴氮,可以參考中文注釋了解其作用

  1. 開(kāi)始寫(xiě)代碼了:demo下新增kafka包斯棒,并在其下面新增producer和consumer


    consumer.png

    producer.png
package com.example.demo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * 類功能描述:<br>
 * <ul>
 * <li>類功能描述1<br>
 * <li>類功能描述2<br>
 * <li>類功能描述3<br>
 * </ul>
 * 修改記錄:<br>
 * <ul>
 * <li>修改記錄描述1<br>
 * <li>修改記錄描述2<br>
 * <li>修改記錄描述3<br>
 * </ul>
 *
 * @author xuefl
 * @version 5.0 since 2020-01-13
 */
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test 消費(fèi)了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP2)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test1 消費(fèi)了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
}
package com.example.demo.kafka;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * 類功能描述:<br>
 * <ul>
 * <li>類功能描述1<br>
 * <li>類功能描述2<br>
 * <li>類功能描述3<br>
 * </ul>
 * 修改記錄:<br>
 * <ul>
 * <li>修改記錄描述1<br>
 * <li>修改記錄描述2<br>
 * <li>修改記錄描述3<br>
 * </ul>
 *
 * @author xuefl
 * @version 5.0 since 2020-01-13
 */
@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    //自定義topic
    public static final String TOPIC_TEST = "topic.test";

    //
    public static final String TOPIC_GROUP1 = "topic.group1";

    //
    public static final String TOPIC_GROUP2 = "topic.group2";

    public void send(Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("準(zhǔn)備發(fā)送消息為:{}", obj2String);
        //發(fā)送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //發(fā)送失敗的處理
                log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失敗:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的處理
                log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
            }
        });


    }
}
  1. 增加測(cè)試controller類主经,在controller下新建KafkaController類


    controller.png
  2. 測(cè)試結(jié)果


    swagger接口調(diào)用.png

    測(cè)試結(jié)果.png

    測(cè)試結(jié)果.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末荣暮,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子罩驻,更是在濱河造成了極大的恐慌穗酥,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異砾跃,居然都是意外死亡骏啰,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)抽高,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)判耕,“玉大人,你說(shuō)我怎么就攤上這事厨内∑盹酰” “怎么了渺贤?”我有些...
    開(kāi)封第一講書(shū)人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵雏胃,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我志鞍,道長(zhǎng)瞭亮,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任固棚,我火速辦了婚禮统翩,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘此洲。我一直安慰自己厂汗,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布呜师。 她就那樣靜靜地躺著娶桦,像睡著了一般。 火紅的嫁衣襯著肌膚如雪汁汗。 梳的紋絲不亂的頭發(fā)上衷畦,一...
    開(kāi)封第一講書(shū)人閱讀 49,950評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音知牌,去河邊找鬼祈争。 笑死,一個(gè)胖子當(dāng)著我的面吹牛角寸,可吹牛的內(nèi)容都是我干的菩混。 我是一名探鬼主播,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼扁藕,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼墨吓!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起纹磺,我...
    開(kāi)封第一講書(shū)人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤帖烘,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后橄杨,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體秘症,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡照卦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了乡摹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片役耕。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖聪廉,靈堂內(nèi)的尸體忽然破棺而出瞬痘,到底是詐尸還是另有隱情,我是刑警寧澤板熊,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布框全,位于F島的核電站,受9級(jí)特大地震影響干签,放射性物質(zhì)發(fā)生泄漏津辩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一容劳、第九天 我趴在偏房一處隱蔽的房頂上張望喘沿。 院中可真熱鬧,春花似錦竭贩、人聲如沸蚜印。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)窄赋。三九已至,卻和暖如春肪获,著一層夾襖步出監(jiān)牢的瞬間寝凌,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工孝赫, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留较木,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓青柄,卻偏偏與公主長(zhǎng)得像伐债,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子致开,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容