一欢搜、簡介
1.1 什么是 JMS
JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口苹丸,是一個(gè)Java平臺中關(guān)于面向消息中間件(MOM)的API圣贸,用于在兩個(gè)應(yīng)用程序之間艾扮,或分布式系統(tǒng)中發(fā)送消息风钻,進(jìn)行異步通信活喊。Java 消息服務(wù)是一個(gè)與具體平臺無關(guān)的 API云头,絕大多數(shù)MOM提供商都對JMS提供支持捐友。
1.2 什么是 ActiveMQ
ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在為應(yīng)用程序提供高效溃槐、可擴(kuò)展匣砖、穩(wěn)定、安全的企業(yè)級消息通信竿痰。
它的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的脆粥、面向消息的、多語言的應(yīng)用集成消息通信中間件影涉。ActiveMQ 實(shí)現(xiàn)了 JMS 1.1 并提供了很多附加的特性变隔,比如 JMX 管理、主從管理蟹倾、消息組通信匣缘、消息優(yōu)先級、延遲接收消息鲜棠、虛擬接收者肌厨、消息持久化、消息隊(duì)列監(jiān)控等
1.3 ActiveMQ 特點(diǎn)
- 支持包括 Java豁陆、C柑爸、C++、C#盒音、Ruby表鳍、Perl馅而、Python、PHP 等多種語言的客戶端和協(xié)議譬圣。協(xié)議包含 OpenWire瓮恭、Stomp、AMQP厘熟、MQTT 屯蹦。
- 提供了像消息組通信、消息優(yōu)先級绳姨、延遲接收消息登澜、虛擬接收者、消息持久化之類的高級特性
- 完全支持 JMS 1.1 和 J2EE 1.4規(guī)范(包括持久化就缆、分布式事務(wù)消息帖渠、事務(wù))
- 對 Spring 框架的支持,ActiveMQ 可以通過 Spring 的配置文件方式很容易嵌入到 Spring 應(yīng)用中
- 通過了常見的 J2EE 服務(wù)器測試竭宰,比如 TomEE空郊、Geronimo、JBoss切揭、GlassFish狞甚、WebLogic
- 連接方式的多樣化,ActiveMQ 提供了多種連接模式廓旬,例如 in-VM哼审、TCP、SSL孕豹、NIO涩盾、UDP、多播励背、JGroups春霍、JXTA
- 支持通過使用 JDBC 和 journal 實(shí)現(xiàn)消息的快速持久化
- 為高性能集群、客戶端-服務(wù)器叶眉、點(diǎn)對點(diǎn)通信等場景而設(shè)計(jì)
- 提供了技術(shù)和語言中立的 REST API 接口
- 支持 Ajax 方式調(diào)用 ActiveMQ
- ActiveMQ 可以輕松地與 CXF址儒、Axis 等 Web Service 技術(shù)整合,以提供可靠的消息傳遞
- 可用作為內(nèi)存中的 JMS 提供者衅疙,非常適合 JMS 單元測試
1.4 基本組件
ActiveMQ 使用時(shí)包含的基本組件如下(與 JMS 相同):
- Broker:消息代理莲趣,表示消息隊(duì)列服務(wù)器實(shí)體,接受客戶端連接饱溢,提供消息通信的核心服務(wù)喧伞;
- Producer:消息生產(chǎn)者,業(yè)務(wù)的發(fā)起方,負(fù)責(zé)生產(chǎn)消息并傳輸給 Broker絮识;
- Consumer:消息消費(fèi)者绿聘,業(yè)務(wù)的處理方,負(fù)責(zé)從 Broker 獲取消息并進(jìn)行業(yè)務(wù)邏輯處理次舌;
- Topic:主題,發(fā)布訂閱模式下的消息統(tǒng)一匯集地兽愤,不同生產(chǎn)者向 Topic 發(fā)送消息彼念,由 Broker 分發(fā)到不同的訂閱者,實(shí)現(xiàn)消息的廣播浅萧;
- Queue:隊(duì)列逐沙,點(diǎn)對點(diǎn)模式下特定生產(chǎn)者向特定隊(duì)列發(fā)送消息,消費(fèi)者訂閱特定隊(duì)列接收消息并進(jìn)行業(yè)務(wù)邏輯處理洼畅;
- Message:消息體吩案,根據(jù)不同通信協(xié)議定義的固定格式進(jìn)行編碼的數(shù)據(jù)包,來封裝業(yè)務(wù) 數(shù)據(jù)帝簇,實(shí)現(xiàn)消息的傳輸徘郭。
1.5 消息傳送模型
ActiveMQ 支持兩種截然不同的消息傳送模型:PTP(即點(diǎn)對點(diǎn)模型)和Pub/Sub(即發(fā)布 /訂閱模型),分別稱作:PTP Domain 和Pub/Sub Domain丧肴。
PTP(即點(diǎn)對點(diǎn)模型)
PTP 消息域使用 queue(隊(duì)列) 作為 Destination(消息被尋址残揉、發(fā)送以及接收的對象),消息可以被同步或異步的發(fā)送和接收芋浮,每個(gè)消息只會給一個(gè) Consumer 傳送一次抱环。Consumer 可以使用 MessageConsumer.receive()
同步地接收消息,也可以通過使用MessageConsumer.setMessageListener()
注冊一個(gè) MessageListener 實(shí)現(xiàn)異步接收纸巷。多個(gè) Consumer 可以注冊到同一個(gè) queue 上镇草,但一個(gè)消息只能被一個(gè) Consumer 所接收,然后由該 Consumer 來確認(rèn)消息瘤旨。并且在這種情況下梯啤,Provider 對所有注冊的 Consumer 以輪詢的方式發(fā)送消息。
Pub/Sub 發(fā)布訂閱模型
Pub/Sub(發(fā)布/訂閱,Publish/Subscribe)消息域使用 topic (主題) 作為 Destination(消息被尋址流昏、發(fā)送以及接收的對象)媒熊,發(fā)布者向 topic 發(fā)送消息,訂閱者注冊接收來自 topic 的消息羽嫡。發(fā)送到 topic 的任何消息都將自動傳遞給所有訂閱者。接收方式(同步和異步)與 P2P 域相同肩袍。除非顯式指定杭棵,否則 topic 不會為訂閱者保留消息。當(dāng)然,這可以通過持久化(Durable)訂閱來實(shí)現(xiàn)消息的保存魂爪。這種情況下先舷,當(dāng)訂閱者與 Provider 斷開時(shí),Provider 會為它存儲消息滓侍。當(dāng)持久化訂閱者重新連接時(shí)蒋川,將會受到所有的斷連期間未消費(fèi)的消息。
1.6 消息存儲
JMS 規(guī)范中消息的分發(fā)方式有兩種:非持久化和持久化撩笆。對于非持久化消息 JMS 實(shí)現(xiàn)者須保證盡最大努力分發(fā)消息捺球,但消息不會持久化存儲;而持久化方式分發(fā)的消息則必須進(jìn)行持久化存儲夕冲。非持久化消息常用于發(fā)送通知或?qū)崟r(shí)數(shù)據(jù)氮兵,當(dāng)你比較看重系統(tǒng)性能并且即使丟失一些消息并不影響業(yè)務(wù)正常運(yùn)作時(shí)可選擇非持久化消息。持久化消息被發(fā)送到消息服務(wù)器后如果當(dāng)前消息的消費(fèi)者并沒有運(yùn)行則該消息繼續(xù)存在歹鱼,只有等到消息被處理并被消息消費(fèi)者確認(rèn)之后泣栈,消息才會從消息服務(wù)器中刪除。具體的消息存儲方式如下:
- AMQ弥姻,是 ActiveMQ 5.0及以前版本默認(rèn)的消息存儲方式南片,它是一個(gè)基于文件的、支持事務(wù)的消息存儲解決方案蚁阳。在此方案下消息本身以日志的形式實(shí)現(xiàn)持久化铃绒,存放在 Data Log 里。并且還對日志里的消息做了引用索引螺捐,方便快速取回消息
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/amq" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter> </broker>
- KahaDB颠悬,也是一種基于文件并具有支持事務(wù)的消息存儲方式,從5.3開始推薦使用 KahaDB 存儲消息定血,它提供了比 AMQ 消息存儲更好的可擴(kuò)展性和可恢復(fù)性
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter> </broker>
- JDBC赔癌,基于 JDBC 方式將消息存儲在數(shù)據(jù)庫中,將消息存到數(shù)據(jù)庫相對來說比較慢澜沟,所以 ActiveMQ 建議結(jié)合 journal 來存儲灾票,它使用了快速的緩存寫入技術(shù),大大提高了性能茫虽;
<beans> <broker brokerName="broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
- 內(nèi)存存儲刊苍,是指將所有要持久化的消息放到內(nèi)存中,因?yàn)檫@里沒有動態(tài)的緩存濒析,所以需要注意設(shè)置消息服務(wù)器的 JVM 和內(nèi)存大小
<broker brokerName="broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker>
- LevelDB正什,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定義的索引代替常用的 BTree 索引号杏,其持久化性能高于 KahaDB婴氮,雖然默認(rèn)的持久化方式還是 KahaDB,但是 LevelDB 將是趨勢。在5.9版本還提供了基于 LevelDB 和 Zookeeper 的數(shù)據(jù)復(fù)制方式主经,作為 Master-Slave 方式的首選數(shù)據(jù)復(fù)制方案
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker" dataDirectory="${activemq.data}"> <persistenceAdapter> <levelDB directory="${activemq.data}/levelDB"/> </persistenceAdapter> </broker>
1.7 連接器
ActiveMQ Broker (消息代理) 的主要作用是為客戶端應(yīng)用提供一種通信機(jī)制荣暮,為此 ActiveMQ 提供了一種連接機(jī)制,并用連接器(connector)來描述這種連接機(jī)制罩驻。ActiveMQ 中的連接器有兩種穗酥,一種是用于客戶端與消息代理服務(wù)器(client-to-broker)之間通信的傳輸連接器(transport connector),一種是用于消息代理服務(wù)器之間(broker-to-broker)通信的網(wǎng)絡(luò)連接器(network connector)惠遏。connector 使用 URI(統(tǒng)一資源定位符)來表示迷扇,URI 格式為: <schema name>:<hierarchical part>[?<query>][#<fragment>]
, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose
- schema name: fool
- hierarchical part: username:password@example.com:8042/over/there/index.dtb
- query: type=animal&name=narwhal
- fragment: nose
1.7.1 傳輸連接器(transport connector)
為了交換消息,消息生產(chǎn)者和消息消費(fèi)者(統(tǒng)稱為客戶端)都需要連接到消息代理服務(wù)器爽哎,這種客戶端和消息代理服務(wù)器之間的通信就是通過傳輸連接器(Transport connectors)完成的。很多情況下用戶連接消息代理時(shí)的需求側(cè)重點(diǎn)不同器一,有的更關(guān)注性能课锌,有的更注重安全性,因此 ActiveMQ 提供了一系列l(wèi)連接協(xié)議供選擇祈秕,來覆蓋這些使用場景渺贤。從消息代理的角度看,傳輸連接器就是用來處理和監(jiān)聽客戶端連接的请毛,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml)志鞍,傳輸連接的相關(guān)配置如下:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
傳輸連接器定義在<transportConnectors>
元素中,一個(gè)<transportConnector>
元素定義一個(gè)特定的連接器方仿,一個(gè)連接器必須有自己唯一的名字和URI
屬性固棚,但discoveryUri
屬性是可選的。目前在 ActiveMQ 最新的5.15版本中常用的傳輸連接器連接協(xié)議有:vm仙蚜、tcp此洲、udp、multicast委粉、nio呜师、ssl、http贾节、https汁汗、websocket、amqp栗涂、mqtt知牌、stomp 等等
- VM:允許客戶端和消息服務(wù)器直接在 VM 內(nèi)部通信,采用的連接不是 Socket 連接戴差,而是直接的虛擬機(jī)本地方法調(diào)用送爸,從而避免網(wǎng)絡(luò)傳輸?shù)拈_銷。應(yīng)用場景僅限于服務(wù)器和客戶端在同一 JVM 中。如使用代碼啟動嵌入式的ActiveMQ Broker實(shí)例袭厂,通常用于單元測試墨吓。因?yàn)槭乔度胧剑圆恍枰渲肁ctiveMQ的配置文件纹磺,只要在連接Broker的URI中直接使用即可
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker"> <property name="brokerURL" value="vm://localhost"/> </bean>
- TCP:ActiveMQ默認(rèn)的傳輸連接帖烘,也是最常用的使用方式。長連接橄杨,每個(gè)客戶端實(shí)例都會與服務(wù)器維持一個(gè)連接秘症。每個(gè)連接一個(gè)線程。TCP的優(yōu)點(diǎn)是:
- 性能高:ActiveMQ使用默認(rèn)協(xié)議OpenWire序列化和反序列化消息式矫。OpenWire是一個(gè)性能很高的序列化協(xié)議
- 可用性高:TCP是使用最廣泛的技術(shù)乡摹,幾乎所有的開發(fā)語言都支持TCP協(xié)議
- 可靠性高:TCP協(xié)議確保消息不會在網(wǎng)絡(luò)傳說的過程中丟失
<transportConnector name="tcp" uri="tcp://localhost:61616"/>
- UDP:與面向連接,可靠的字節(jié)流服務(wù)的TCP不同采转,UDP是一個(gè)面向數(shù)據(jù)的簡單傳輸連接聪廉,沒有TCP的三次握手,所以性能大大強(qiáng)于TCP故慈,但是是以犧牲可靠性為前提板熊。適用于丟失也無所謂的消息,如統(tǒng)計(jì)uv察绷,pv干签。(當(dāng)然如果真是統(tǒng)計(jì)uv什么的,有Kafka這樣專門的消息中間件)
<transportConnector name="udp" uri="udp://localhost:8123"/>
- NIO:使用Java的NIO方式對連接進(jìn)行改進(jìn)拆撼,因?yàn)镹IO使用線程池容劳,可以復(fù)用線程,所以可以用更少的線程維持更多的連接情萤。如果有大量的客戶端鸭蛙,或者性能瓶頸在網(wǎng)絡(luò)傳輸上,可以考慮使用NIO的連接方式筋岛。也可以根據(jù)不同的場景選擇不用的傳輸連接娶视,比如:Producer有很多,但是Consumer很少睁宰,可以Producer用NIO協(xié)議肪获,Consumer用TCP協(xié)議。從ActiveMQ 5.6版本開始柒傻,NIO可以支持和SSL搭配使用的傳輸連接孝赫。
- NIO配置:
<transportConnector name="nio" uri="nio://localhost:61616"/>
- NIO+SSL配置:
<transportConnector name="nio+ssl" uri="nio+ssl://localhost:61616"/>
- SSL:需要一個(gè)安全連接的時(shí)候可以考慮使用SSL,適用于client和broker在公網(wǎng)的情況红符,如使用aws云平臺等
<transportConnector name="ssl" uri="ssl://localhost:8123"/>
- HTTP(S):需要穿越防火墻青柄,可以考慮使用HTTP(S)伐债,但由于HTTP(S)是短連接,每次創(chuàng)建連接的成本較高致开,所以性能最差峰锁。允許客戶端使用 REST 或 Ajax 的方式進(jìn)行連接,這意味著可以直接使用 Javascript 向 ActiveMQ 發(fā)送消息
- HTTP配置
<transportConnector name="http" uri="http://localhost:8080"/>
- HTTPS配置
<transportConnector name="https" uri="http://localhost:8080"/>
- AMQP:ActiveMQ 5.8新增加的傳輸連接双戳。用于支持AMQP(高級消息隊(duì)列協(xié)議)虹蒋。因?yàn)锳MQP是消息隊(duì)列的標(biāo)準(zhǔn)協(xié)議,而且已經(jīng)越來越被廣泛使用飒货,所以ActiveMQ也支持了此協(xié)議魄衅。AMQP協(xié)議可以搭配NIO或SSL協(xié)議使用,amqp+nio用于提升系統(tǒng)的延展性和性能塘辅。amqp+ssl可以創(chuàng)建安全連接晃虫。
- amqp配置:
<transportConnector name="amqp" uri="amqp://localhost:5672"/>
- amqp+nio配置:
<transportConnector name="amqp+nio" uri="amqp://localhost:5672"/>
- amqp+ssl配置:
<transportConnector name="amqp+ssl" uri="amqp://localhost:5672"/>
- MQTT:ActiveMQ 5.8新增加的傳輸連接。是一個(gè)輕量級的消息訂閱/發(fā)布協(xié)議扣墩。和AMQP一樣傲茄,同樣支持搭配NIO或SSL使用
<transportConnector name="mqtt" uri="mqtt://localhost:1883"/>
1.7.2 網(wǎng)絡(luò)連接器(network connector)
很多情況下,我們要處理的數(shù)據(jù)可能是海量的沮榜,這種場景單臺服務(wù)器很難支撐,這就要用到集群功能喻粹,為此 ActiveMQ 提供了網(wǎng)絡(luò)連接的模式蟆融,簡單說就是通過把多個(gè)消息服務(wù)器實(shí)例連接在一起作為一個(gè)整體對外提供服務(wù),從而提高整體對外的消息服務(wù)能力守呜。通過這種方式連接在一起的服務(wù)器實(shí)例之間可共享隊(duì)列和消費(fèi)者列表型酥,從而達(dá)到分布式隊(duì)列的目的,網(wǎng)絡(luò)連接器就是用來配置服務(wù)器之間的通信查乒。
如圖所示弥喉,服務(wù)器S1 和 S2 通過 NewworkConnector 相連,生產(chǎn)者 P1 發(fā)送的消息玛迄,消費(fèi)者 C3 和 C4 都可以接收到由境,而生產(chǎn)者 P3 發(fā)送的消息,消費(fèi)者 C1 和 C2 也可以接收到蓖议。要使用網(wǎng)絡(luò)連接器的功能需要在服務(wù)器 S1 的 activemq.xml 中的 broker 節(jié)點(diǎn)下添加如下配置(假設(shè)192.168.11.23:61617 為 S2 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>
如果只是這樣虏杰,S1 可以將消息發(fā)送到 S2,但這只是單方向的通信勒虾,發(fā)送到 S2 上的的消息還不能發(fā)送到 S1 上纺阔。如果想 S1 也收到從 S2 發(fā)來的消息需要在 S2 的 activemq.xml 中的 broker 節(jié)點(diǎn)下也添加如下配置(假設(shè)192.168.11.45:61617為 S1 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>
這樣,S1和S2就可以雙向通信了修然。目前在 ActiveMQ 最新的5.15版本中常用的網(wǎng)絡(luò)連接器協(xié)議有 static
和 multicast
兩種:
- static笛钝,靜態(tài)協(xié)議质况,用于為一個(gè)網(wǎng)絡(luò)中多個(gè)代理創(chuàng)建靜態(tài)配置,這種配置協(xié)議支持復(fù)合的 URI (即包含其他 URI 的 URI)玻靡。例如:
static://(tcp://ip:61616,tcp://ip2:61616)
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerA" dataDirectory="${activemq.base}/data"> <networkConnectors> <networkConnector uri="static:(tcp://localhost:61617)" /> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" /> </transportConnectors> </broker>
- multicast结榄,多點(diǎn)傳送協(xié)議,消息服務(wù)器會廣播自己的服務(wù)啃奴,也會定位其他代理潭陪。這種方式用于服務(wù)器之間實(shí)現(xiàn)動態(tài)識別,而不是配置靜態(tài)的 IP 組最蕾。默認(rèn)配置:
multicast://default
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.base}/data"> <networkConnectors> <networkConnector name="default-nc" uri="multicast://default"/> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/> </transportConnectors> </broker>
2.1 添加依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>priv.simon.boot</groupId>
<artifactId>activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemq</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.2 點(diǎn)對點(diǎn)配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
2.3 發(fā)布/訂閱配置
Spring Boot 默認(rèn)開啟點(diǎn)對點(diǎn)模式依溯,發(fā)布訂閱模式需要手動開啟
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.pub-sub-domain=true
2.4 消息提供者
@Service
public class ProviderService {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
/**
* 點(diǎn)對點(diǎn)消息發(fā)送
*/
public void sendPTPMessage(String message){
ActiveMQQueue queue = new ActiveMQQueue("queue");
jmsMessagingTemplate.convertAndSend(queue,message);
}
/**
* 發(fā)布訂閱消息發(fā)送
*/
public void sendPubSubMesage(String message){
ActiveMQTopic topic = new ActiveMQTopic("topic");
jmsMessagingTemplate.convertAndSend(topic,message);
}
}
2.5 消息消費(fèi)者
@Service
public class ConsumerService {
/**
* 監(jiān)聽點(diǎn)對點(diǎn)消息
*/
@JmsListener(destination = "queue")
public void receiveQueue(String message){
System.err.println("queue收到的消息:"+message);
}
/**
* 監(jiān)聽發(fā)布訂閱消息
*/
@JmsListener(destination = "topic")
public void receiveTopic(String message){
System.err.println("topic收到的消息:"+message);
}
}
2.6 測試類
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {
@Autowired
private ProviderService providerService;
@Test
public void ptpTest() {
providerService.sendPTPMessage("ptp hello");
}
@Test
public void pubSubTest() {
providerService.sendPubSubMesage("pub/sub hello");
}
}