SpringBoot與RocketMQ客戶端集成原理解讀與示例

????本文將對rocktmq-spring-boot的設(shè)計實現(xiàn)做一個簡單的介紹特纤,讀者可以通過本文了解將RocketMQ Client端集成為spring-boot-starter框架的開發(fā)細節(jié)睡陪,然后通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置,發(fā)送和消費RocketMQ消息宠互。該項目git地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter

  文章主要內(nèi)容包括以下幾個方面:

  1 前言

  2 Spring的消息框架介紹

  Spring Messaging

  Sping Cloud Stream

  3 rocketmq-spring-boot具體實現(xiàn)

  實現(xiàn)步驟

  發(fā)送端實現(xiàn)

  消費端實現(xiàn)

  4 使用示例

  服務(wù)端準備

  編譯rocketmq-spring-boot-starter

  代碼示例

? ? ? ? 前言

  上世紀90年代末随静,隨著Java EE(Enterprise Edition)的出現(xiàn)茅特,特別是Enterprise Java Beans的使用需要復雜的描述符配置和死板復雜的代碼實現(xiàn)哆料,增加了廣大開發(fā)者的學習曲線和開發(fā)成本,由此基于簡單的XML配置和普通Java對象(Plain Old Java Objects)的Spring技術(shù)應運而生策添,依賴注入(Dependency Injection), 控制反轉(zhuǎn)(Inversion of Control)和面向切面編程(AOP)的技術(shù)更加敏捷地解決了傳統(tǒng)Java企業(yè)及版本的不足材部。隨著Spring的持續(xù)演進,基于注解(Annotation)的配置逐漸取代了XML文件配置唯竹, 2014年4月1日乐导,Spring Boot 1.0.0正式發(fā)布,它基于“約定大于配置”(Convention over configuration)這一理念來快速地開發(fā)摩窃,測試兽叮,運行和部署Spring應用,并能通過簡單地與各種啟動器(如 spring-boot-web-starter)結(jié)合猾愿,讓應用直接以命令行的方式運行鹦聪,不需再部署到獨立容器中。這種簡便直接快速構(gòu)建和開發(fā)應用的過程蒂秘,可以使用約定的配置并且簡化部署泽本,受到越來越多的開發(fā)者的歡迎。

  Apache RocketMQ是業(yè)界知名的分布式消息和流處理中間件姻僧,簡單地理解规丽,它由Broker服務(wù)器和客戶端兩部分組成,其中客戶端一個是消息發(fā)布者客戶端(Producer)撇贺,它負責向Broker服務(wù)器發(fā)送消息赌莺;另外一個是消息的消費者客戶端(Consumer),多個消費者可以組成一個消費組松嘶,來訂閱和拉取消費Broker服務(wù)器上存儲的消息艘狭。為了利用Spring Boot的快速開發(fā)和讓用戶能夠更靈活地使用RocketMQ消息客戶端,Apache RocketMQ社區(qū)推出了spring-boot-starter實現(xiàn)翠订。隨著分布式事務(wù)消息功能在RocketMQ 4.3.0版本的發(fā)布巢音,近期升級了相關(guān)的spring-boot代碼,通過注解方式支持分布式事務(wù)的回查和事務(wù)消息的發(fā)送尽超。

  本文將對當前的設(shè)計實現(xiàn)做一個簡單的介紹官撼,讀者可以通過本文了解將RocketMQ Client端集成為spring-boot-starter框架的開發(fā)細節(jié),然后通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置似谁,發(fā)送和消費RocketMQ消息傲绣。

  Spring中的消息框架

  順便在這里討論一下在Spring中關(guān)于消息的兩個主要的框架掠哥,即Spring Messaging和Spring Cloud Stream。它們都能夠與Spring Boot整合并提供了一些參考的實現(xiàn)秃诵。和所有的實現(xiàn)框架一樣龙致,消息框架的目的是實現(xiàn)輕量級的消息驅(qū)動的微服務(wù),可以有效地簡化開發(fā)人員對消息中間件的使用復雜度顷链,讓系統(tǒng)開發(fā)人員可以有更多的精力關(guān)注于核心業(yè)務(wù)邏輯的處理。

  2.1 Spring Messaging

  Spring Messaging是Spring Framework 4中添加的模塊屈梁,是Spring與消息系統(tǒng)集成的一個擴展性的支持嗤练。它實現(xiàn)了從基于JmsTemplate的簡單的使用JMS接口到異步接收消息的一整套完整的基礎(chǔ)架構(gòu),Spring AMQP提供了該協(xié)議所要求的類似的功能集在讶。 在與Spring Boot的集成后煞抬,它擁有了自動配置能力,能夠在測試和運行時與相應的消息傳遞系統(tǒng)進行集成构哺。

  單純對于客戶端而言革答,Spring Messaging提供了一套抽象的API或者說是約定的標準,對消息發(fā)送端和消息接收端的模式進行規(guī)定曙强,不同的消息中間件提供商可以在這個模式下提供自己的Spring實現(xiàn):在消息發(fā)送端需要實現(xiàn)的是一個XXXTemplate形式的Java Bean残拐,結(jié)合Spring Boot的自動化配置選項提供多個不同的發(fā)送消息方法;在消息的消費端是一個XXXMessageListener接口(實現(xiàn)方式通常會使用一個注解來聲明一個消息驅(qū)動的POJO)碟嘴,提供回調(diào)方法來監(jiān)聽和消費消息溪食,這個接口同樣可以使用Spring Boot的自動化選項和一些定制化的屬性。如果有興趣深入的了解Spring Messaging及針對不同的消息產(chǎn)品的使用娜扇,推薦閱讀這個文件错沃。參考Spring Messaging的既有實現(xiàn),RocketMQ的spring-boot-starter中遵循了相關(guān)的設(shè)計模式并結(jié)合RocketMQ自身的功能特點提供了相應的API(如雀瓢,順序枢析,異步和事務(wù)半消息等)。

  2.2 Spring Cloud Stream

  Spring Cloud Stream結(jié)合了Spring Integration的注解和功能刃麸,它的應用模型如下:


? ? ? 該圖片引自spring cloud stream

? ? ? Spring Cloud Stream框架中提供一個獨立的應用內(nèi)核醒叁,它通過輸入(@Input)和輸出(@Output)通道與外部世界進行通信,消息源端(Source)通過輸入通道發(fā)送消息嫌蚤,消費目標端(Sink)通過監(jiān)聽輸出通道來獲取消費的消息辐益。這些通道通過專用的Binder實現(xiàn)與外部代理連接。開發(fā)人員的代碼只需要針對應用內(nèi)核提供的固定的接口和注解方式進行編程脱吱,而不需要關(guān)心運行時具體的Binder綁定的消息中間件智政。在運行時,Spring Cloud Stream能夠自動探測并使用在classpath下找到的Binder箱蝠。這樣開發(fā)人員可以輕松地在相同的代碼中使用不同類型的中間件:僅僅需要在構(gòu)建時包含進不同的Binder续捂。在更加復雜的使用場景中垦垂,也可以在應用中打包多個Binder并讓它自己選擇Binder,甚至在運行時為不同的通道使用不同的Binder牙瓢。

  Binder抽象使得Spring Cloud Stream應用可以靈活的連接到中間件劫拗,加之Spring Cloud Stream使用利用了Spring Boot的靈活配置配置能力,這樣的配置可以通過外部配置的屬性和Spring Boo支持的任何形式來提供(包括應用啟動參數(shù)矾克、環(huán)境變量和application.yml或者application.properties文件)页慷,部署人員可以在運行時動態(tài)選擇通道連接destination(例如,Kafka的topic或者RabbitMQ的exchange)胁附。

????Binder SPI的方式來讓消息中間件產(chǎn)品使用可擴展的API來編寫相應的Binder酒繁,并集成到Spring Cloud Steam環(huán)境,目前RocketMQ還沒有提供相關(guān)的Binder控妻,我們計劃在下一步將完善這一功能州袒,也希望社區(qū)里有這方面經(jīng)驗的同學積極嘗試,貢獻PR或建議弓候。

????rocketmq-spring-boot的實現(xiàn)

????在開始的時候我們已經(jīng)知道郎哭,spring boot starter構(gòu)造的啟動器對于使用者是非常方便的,使用者只要在pom.xml引入starter的依賴定義菇存,相應的編譯夸研,運行和部署功能就全部自動引入。因此常用的開源組件都會為Spring的用戶提供一個spring-boot-starter封裝給開發(fā)者依鸥,讓開發(fā)者非常方便集成和使用陈惰,這里我們詳細的介紹一下RocketMQ(客戶端)的starter實現(xiàn)過程。

????3.1. spring-boot-starter的實現(xiàn)步驟

????對于一個spring-boot-starter實現(xiàn)需要包含如下幾個部分:

????1. 在pom.xml的定義

????定義最終要生成的starter組件信息


定義依賴包毕籽,

它分為兩個部分: A Spring自身的依賴包抬闯; B RocketMQ的依賴包


????2. 配置文件類

????定義應用屬性配置文件類RocketMQProperties,這個Bean定義一組默認的屬性值。用戶在使用最終的starter時关筒,可以根據(jù)這個類定義的屬性來修改取值溶握,當然不是直接修改這個類的配置,而是spring-boot應用中對應的配置文件:src/main/resources/application.properties.

????3. 定義自動加載類

????定義 src/resources/META-INF/spring.factories文件中的自動加載類蒸播, 其目的是讓spring boot更具文中中所指定的自動化配置類來自動初始化相關(guān)的Bean睡榆,Component或Service,它的內(nèi)容如下:

????在RocketMQAutoConfiguration類的具體實現(xiàn)中袍榆,定義開放給用戶直接使用的Bean對象. 包括:

????RocketMQProperties 加載應用屬性配置文件的處理類

????RocketMQTemplate 發(fā)送端用戶發(fā)送消息的發(fā)送模板類

????ListenerContainerConfiguration 容器Bean負責發(fā)現(xiàn)和注冊消費端消費實現(xiàn)接口類胀屿,這個類要求:1. 由@RocketMQMessageListener注解標注;2. 實現(xiàn)RocketMQListener泛化接口包雀。

????4. 最后具體的RocketMQ相關(guān)的封裝

????在發(fā)送端(producer)和消費端(consumer)客戶端分別進行封裝宿崭,在當前的實現(xiàn)版本提供了對Spring Messaging接口的兼容方式。

????3.2. 消息發(fā)送端實現(xiàn)

????1. 普通發(fā)送端

  發(fā)送端的代碼封裝在RocketMQTemplate POJO中才写,下圖是發(fā)送端的相關(guān)代碼的調(diào)用關(guān)系圖:


????為了與Spring Messaging的發(fā)送模板兼容葡兑,在RocketMQTemplate集成了AbstractMessageSendingTemplate抽象類奖蔓,來支持相關(guān)的消息轉(zhuǎn)換和發(fā)送方法,這些方法最終會代理給doSend()方法讹堤;doSend()以及RocoketMQ所特有的一些方法如異步吆鹤,單向和順序等方法直接添加到RoketMQTempalte中,這些方法直接代理調(diào)用到RocketMQ的Producer API來進行消息的發(fā)送洲守。 ??

????2 事務(wù)消息發(fā)送端

????對于事務(wù)消息的處理疑务,在消息發(fā)送端進行了部分的擴展,參考下圖的調(diào)用關(guān)系類圖:


?????RocketMQTemplate里加入了一個發(fā)送事務(wù)消息的方法sendMessageInTransaction(), 并且最終這個方法會代理到RocketMQ的TransactionProducer進行調(diào)用梗醇,在這個Producer上會注冊其關(guān)聯(lián)的TransactionListener實現(xiàn)類暑始,以便在發(fā)送消息后能夠?qū)ransactionListener里的方法實現(xiàn)進行調(diào)用。

????3. 消息消費端實現(xiàn)

????在消費端Spring-Boot應用啟動后婴削,會掃描所有包含@RocketMQMessageListener注解的類(這些類需要集成RocketMQListener接口,并實現(xiàn)onMessage()方法)牙肝,這個Listener會一對一的被放置到DefaultRocketMQListenerContainer容器對象中唉俗,容器對象會根據(jù)消費的方式(并發(fā)或順序),將RocketMQListener封裝到具體的RocketMQ內(nèi)部的并發(fā)或者順序接口實現(xiàn)配椭。在容器中創(chuàng)建RocketMQ Consumer對象虫溜,啟動并監(jiān)聽定制的Topic消息,如果有消費消息股缸,則回調(diào)到Listener的onMessage()方法衡楞。

????使用示例

????上面的一章介紹了RocketMQ在spring-boot-starter方式的實現(xiàn),這里通過一個最簡單的消息發(fā)送和消費的例子來介紹如何使這個rocketmq-spring-boot-starter敦姻。

????4.1 RocketMQ服務(wù)端的準備

????1 啟動NameServer和Broker

????要驗證RocketMQ的Spring-Boot客戶端瘾境,首先要確保RocketMQ服務(wù)正確的下載并啟動×耄可以參考RocketMQ主站的快速開始來進行操作迷守。確保啟動NameServer和Broker已經(jīng)正確啟動。

????2 創(chuàng)建實例中所需要的Topics

????在執(zhí)行啟動命令的目錄下執(zhí)行下面的命令行操作

????bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic

????4.2. 編譯rocketmq-spring-boot-starter

????目前的spring-boot-starter依賴還沒有提交的Maven的中心庫旺入,用戶使用前需要自行下載git源碼兑凿,然后執(zhí)行mvn clean install 安裝到本地倉庫。

????git clone https://github.com/apache/rocketmq-externals.git

????cd rocketmq-spring-boot-starter

????mvn clean install

????4.3. 編寫客戶端代碼

????用戶如果使用它茵瘾,需要在消息的發(fā)布和消費客戶端的maven配置文件pom.xml中添加如下的依賴:

????1.0.0-SNAPSHOT

????org.apache.rocketmq

????spring-boot-starter-rocketmq

????${spring-boot-starter-rocketmq-version}

????屬性spring-boot-starter-rocketmq-version的取值為:1.0.0-SNAPSHOT礼华, 這與上一步驟中執(zhí)行安裝到本地倉庫的版本一致。

????1. 消息發(fā)送端的代碼

????發(fā)送端的配置文件application.properties

????# 定義name-server地址

????spring.rocketmq.name-server=localhost:9876

????# 定義發(fā)布者組名

????spring.rocketmq.producer.group=my-group1

????# 定義要發(fā)送的topic

????spring.rocketmq.topic=string-topic

????發(fā)送端的Java代碼

import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;

...

@SpringBootApplication

public class ProducerApplication implements CommandLineRunner {

// 聲明并引用RocketMQTemplate

@Resource

private RocketMQTemplate rocketMQTemplate;

// 使用application.properties里定義的topic屬性

@Value("${spring.rocketmq.springTopic}")

private String springTopic;

public static void main(String[] args){

SpringApplication.run(ProducerApplication.class, args);

}

public void run(String... args) throws Exception {

// 以同步的方式發(fā)送字符串消息給指定的topic

SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");

// 打印發(fā)送結(jié)果信息

System.out.printf("string-topic syncSend1 sendResult=%s %n", sendResult);

}

}

????2. 消息消費端代碼

????消費端的配置文件application.properties

# 定義name-server地址

spring.rocketmq.name-server=localhost:9876

# 定義發(fā)布者組名

spring.rocketmq.consumer.group=my-customer-group1

# 定義要發(fā)送的topic

spring.rocketmq.topic=string-topic

  消費端的Java代碼

@SpringBootApplication

public class ConsumerApplication {

public static void main(String[] args) {

SpringApplication.run(ConsumerApplication.class, args);

}

}

// 聲明消費消息的類拗秘,并在注解中指定圣絮,相關(guān)的消費信息

@Service

@RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = "${spring.rocketmq.consumer.group}")

class StringConsumer implements RocketMQListener {

@Override

public void onMessage(String message) {

System.out.printf("------- StringConsumer received: %s %f", message);

}

}

????這里只是簡單的介紹了使用spring-boot來編寫最基本的消息發(fā)送和接收的代碼,如果需要了解更多的調(diào)用方式雕旨,如: 異步發(fā)送晨雳,對象消息體行瑞,指定tag標簽以及指定事務(wù)消息,請參看github的說明文檔和詳細的代碼餐禁。我們后續(xù)還會對這些高級功能進行陸續(xù)的介紹血久。

????預告:近期還會推出第二篇文章解讀springboot框架下消息事務(wù)的用法。

  參考文檔

  1.Spring Boot features-Messaging

  2.Enterprise Integration Pattern-組成簡介

  3.Spring Cloud Stream Reference Guide

  4.https://dzone.com/articles/creating-custom-springboot-starter-for-twitter4j

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末帮非,一起剝皮案震驚了整個濱河市氧吐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌末盔,老刑警劉巖筑舅,帶你破解...
    沈念sama閱讀 216,591評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異陨舱,居然都是意外死亡翠拣,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評論 3 392
  • 文/潘曉璐 我一進店門游盲,熙熙樓的掌柜王于貴愁眉苦臉地迎上來误墓,“玉大人,你說我怎么就攤上這事益缎∶栈牛” “怎么了?”我有些...
    開封第一講書人閱讀 162,823評論 0 353
  • 文/不壞的土叔 我叫張陵莺奔,是天一觀的道長欣范。 經(jīng)常有香客問我,道長令哟,這世上最難降的妖魔是什么恼琼? 我笑而不...
    開封第一講書人閱讀 58,204評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮屏富,結(jié)果婚禮上驳癌,老公的妹妹穿的比我還像新娘。我一直安慰自己役听,他們只是感情好颓鲜,可當我...
    茶點故事閱讀 67,228評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著典予,像睡著了一般甜滨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上瘤袖,一...
    開封第一講書人閱讀 51,190評論 1 299
  • 那天衣摩,我揣著相機與錄音,去河邊找鬼。 笑死艾扮,一個胖子當著我的面吹牛既琴,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播泡嘴,決...
    沈念sama閱讀 40,078評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼甫恩,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了酌予?” 一聲冷哼從身側(cè)響起磺箕,我...
    開封第一講書人閱讀 38,923評論 0 274
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎抛虫,沒想到半個月后松靡,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡建椰,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,550評論 2 333
  • 正文 我和宋清朗相戀三年雕欺,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片棉姐。...
    茶點故事閱讀 39,727評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡屠列,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出谅海,到底是詐尸還是另有隱情,我是刑警寧澤蹦浦,帶...
    沈念sama閱讀 35,428評論 5 343
  • 正文 年R本政府宣布扭吁,位于F島的核電站,受9級特大地震影響盲镶,放射性物質(zhì)發(fā)生泄漏侥袜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,022評論 3 326
  • 文/蒙蒙 一溉贿、第九天 我趴在偏房一處隱蔽的房頂上張望枫吧。 院中可真熱鬧,春花似錦宇色、人聲如沸九杂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽例隆。三九已至,卻和暖如春抢蚀,著一層夾襖步出監(jiān)牢的瞬間镀层,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評論 1 269
  • 我被黑心中介騙來泰國打工皿曲, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留唱逢,地道東北人吴侦。 一個月前我還...
    沈念sama閱讀 47,734評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像坞古,于是被迫代替她去往敵國和親备韧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,619評論 2 354

推薦閱讀更多精彩內(nèi)容