Spring Boot整kafka的極簡(jiǎn)操作

kafka應(yīng)用最少需要兩部分柒室,一部分是producer履磨,另外一部分是consumer,這兩部分可以在一個(gè)應(yīng)用中蛉抓,也可以不在一個(gè)應(yīng)用中。在通常情況下剃诅,為了消費(fèi)性能巷送,可能需要多個(gè)消費(fèi)者,也可能需要多個(gè)生產(chǎn)者矛辕,而消費(fèi)者和生產(chǎn)者可能處于不同的位置或者環(huán)境笑跛,所以本示例將生產(chǎn)者和消費(fèi)者放在不同的應(yīng)用中付魔。

生產(chǎn)者端

引入依賴(lài)

在spring boot中使用kafka生產(chǎn)者端,需要引入如下依賴(lài)

 <dependencies>
    ......
    <!--Spring 的kafka依賴(lài)-->
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    ......
  </dependencies>

啟用kafka支持

在項(xiàng)目中的配置類(lèi)飞蹂,或者啟動(dòng)類(lèi)上增加@EnableKafka,它會(huì)幫助我們創(chuàng)建一些必要的Bean几苍,包括KafkaTemplate,KafkaMessageListenerContainer等

@EnableKafka
public class KafkaConfig{
}

修改連接配置

修改producer的配置文件,配置連接地址

spring:
  kafka:
    # 指定kafka集群地址陈哑,多為地址用逗號(hào)分割
    bootstrap-servers: dm105:9092,dm106:9092,dm107:9092

創(chuàng)建Topic

如果你的topic還未在kafka中創(chuàng)建妻坝,則可以使用spring-boot自動(dòng)創(chuàng)建主題,只需創(chuàng)建一個(gè)類(lèi)型為NewTopic的bean,并指定topic相關(guān)的信息即可

    /**
     * 新建一個(gè)主題
     *
     * @return
     */
    @Bean
    public NewTopic testTopic() {
        return TopicBuilder.name("test")// 指定主題名稱(chēng)
            .partitions(30) // 指定分區(qū)數(shù)量惊窖,這個(gè)數(shù)量通常要大于消費(fèi)者的數(shù)量刽宪,按消費(fèi)者線(xiàn)程數(shù)計(jì)算
            .replicas(2) // 指定副本數(shù)量
            .compact()
            .build();
    }

基本的配置已經(jīng)完成,接下來(lái)就是發(fā)送消息了界酒。

使用KafkaTemplate發(fā)送消息

接下來(lái)使用kafkaTempalte發(fā)送消息到服務(wù)端,以下是一個(gè)極簡(jiǎn)示例

/**
 * 使用Spring boot test測(cè)試消息發(fā)送
 */
@SpringBootTest
class KafkaDemoApplicationTests {

    /**
     * 注入KafkaTemplate,用于發(fā)送消息
     */
    @Autowired
    private KafkaTemplate template;

    @Test
    public void newMessage() {
        System.out.println("start at " + ZonedDateTime.now() + "");

        for (int i = 0; i < 1000000; i++) {
            long now = System.currentTimeMillis();
            // 調(diào)用template,將消息發(fā)送到kafka
            // 第一個(gè)參數(shù)是topic名稱(chēng)圣拄,第二個(gè)參數(shù)是要發(fā)送的消息內(nèi)容
            template.send("test", "adg" + now);
        }
    }
}

消費(fèi)者端

根據(jù)上面的生產(chǎn)者,需要一個(gè)消費(fèi)者來(lái)消費(fèi)生產(chǎn)者生產(chǎn)的數(shù)據(jù)毁欣。spring boot整合kafka的消費(fèi)者也非常方便

引入依賴(lài)

生產(chǎn)者和消費(fèi)者的依賴(lài)是一致的庇谆。在此不再贅述

啟用kafka支持

該操作和生產(chǎn)者應(yīng)用一致,不再贅述署辉。

修改連接配置

消費(fèi)者的配置需要除了需要指定連之外族铆,最好指定一些額外的配置參數(shù),以便提高消費(fèi)者性能

spring:
  kafka:
    # 指定kafka集群地址
    bootstrap-servers: dm105:9092,dm106:9092,dm107:9092
    consumer:
      # 如果兩個(gè)應(yīng)用程序?yàn)椴⑿邢M(fèi)某個(gè)topic的消息哭尝,需要將兩個(gè)應(yīng)用的group-id指定一致
      group-id: "message-group"
    listener:
      # 指定消息消費(fèi)的模式,type=batch代表可以批量消費(fèi)
      type: batch
      # 指定消費(fèi)者的并發(fā)數(shù)剖煌,也就是可以同時(shí)有多少個(gè)消費(fèi)者線(xiàn)程在監(jiān)聽(tīng)數(shù)據(jù)材鹦,默認(rèn)為1,
      # 更具情況設(shè)置并行數(shù)據(jù)耕姊,通常建議最小為Cpu的核心數(shù)
      concurrency: 16

創(chuàng)建消費(fèi)者

消費(fèi)者的就是一個(gè)普通的Spring bean.在對(duì)應(yīng)的方法上添加@KafkaListener注解桶唐,并指定需要消費(fèi)的topic即可開(kāi)始消費(fèi)者監(jiān)聽(tīng)。

@Component
public class Consumer {
    
    /**
     * 注入repository,用戶(hù)數(shù)據(jù)持久化(略)
     */
    @Autowired
    private MessageRepository repository;

    /**
     * 使用@KafkaListener注解標(biāo)記消費(fèi)方法茉兰,指定topics屬性指定監(jiān)聽(tīng)的待消費(fèi)topic
     *
     * @param messages 待消費(fèi)的數(shù)據(jù)尤泽,由于啟用了批量消費(fèi)模式,所以監(jiān)聽(tīng)獲取到的是一個(gè)集合
     */
    @KafkaListener(topics = {"test"})
    @Transactional
    public void test(List<String> messages) {
        List<Message> result = messages.stream().map(Message::new).collect(Collectors.toList());
        repository.saveAll(result);
        System.out.println("save message [" + messages.size() + "] 條 at" + ZonedDateTime.now().toString());
    }
}

測(cè)試項(xiàng)目

當(dāng)項(xiàng)目構(gòu)建完成之后规脸,可以按照如下步驟來(lái)測(cè)試項(xiàng)目

  1. 啟動(dòng)消費(fèi)者程序
  2. 執(zhí)行生產(chǎn)者測(cè)試代碼坯约,觀察生產(chǎn)者執(zhí)行結(jié)果

項(xiàng)目倉(cāng)庫(kù)地址

完整項(xiàng)目參考https://github.com/ldwqh0/hadoop-demo

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市莫鸭,隨后出現(xiàn)的幾起案子闹丐,更是在濱河造成了極大的恐慌,老刑警劉巖被因,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件卿拴,死亡現(xiàn)場(chǎng)離奇詭異衫仑,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)堕花,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)文狱,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人缘挽,你說(shuō)我怎么就攤上這事如贷。” “怎么了到踏?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵杠袱,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我窝稿,道長(zhǎng)楣富,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任伴榔,我火速辦了婚禮纹蝴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘踪少。我一直安慰自己塘安,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布援奢。 她就那樣靜靜地躺著兼犯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪集漾。 梳的紋絲不亂的頭發(fā)上切黔,一...
    開(kāi)封第一講書(shū)人閱讀 51,708評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音具篇,去河邊找鬼纬霞。 笑死,一個(gè)胖子當(dāng)著我的面吹牛驱显,可吹牛的內(nèi)容都是我干的诗芜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼埃疫,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼伏恐!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起熔恢,我...
    開(kāi)封第一講書(shū)人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤脐湾,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后叙淌,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體秤掌,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡愁铺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了闻鉴。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片茵乱。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖孟岛,靈堂內(nèi)的尸體忽然破棺而出瓶竭,到底是詐尸還是另有隱情,我是刑警寧澤渠羞,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布斤贰,位于F島的核電站,受9級(jí)特大地震影響次询,放射性物質(zhì)發(fā)生泄漏荧恍。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一屯吊、第九天 我趴在偏房一處隱蔽的房頂上張望送巡。 院中可真熱鬧,春花似錦盒卸、人聲如沸骗爆。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)摘投。三九已至,卻和暖如春屉佳,著一層夾襖步出監(jiān)牢的瞬間谷朝,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工武花, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人杈帐。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓体箕,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親挑童。 傳聞我的和親對(duì)象是個(gè)殘疾皇子累铅,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355