消息中間件-ActiveMQ

一欢搜、簡介

1.1 什么是 JMS

JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口苹丸,是一個(gè)Java平臺中關(guān)于面向消息中間件(MOM)的API圣贸,用于在兩個(gè)應(yīng)用程序之間艾扮,或分布式系統(tǒng)中發(fā)送消息风钻,進(jìn)行異步通信活喊。Java 消息服務(wù)是一個(gè)與具體平臺無關(guān)的 API云头,絕大多數(shù)MOM提供商都對JMS提供支持捐友。


jms

1.2 什么是 ActiveMQ

ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在為應(yīng)用程序提供高效溃槐、可擴(kuò)展匣砖、穩(wěn)定、安全的企業(yè)級消息通信竿痰。
它的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的脆粥、面向消息的、多語言的應(yīng)用集成消息通信中間件影涉。ActiveMQ 實(shí)現(xiàn)了 JMS 1.1 并提供了很多附加的特性变隔,比如 JMX 管理、主從管理蟹倾、消息組通信匣缘、消息優(yōu)先級、延遲接收消息鲜棠、虛擬接收者肌厨、消息持久化、消息隊(duì)列監(jiān)控等

1.3 ActiveMQ 特點(diǎn)

  • 支持包括 Java豁陆、C柑爸、C++、C#盒音、Ruby表鳍、Perl馅而、Python、PHP 等多種語言的客戶端和協(xié)議譬圣。協(xié)議包含 OpenWire瓮恭、Stomp、AMQP厘熟、MQTT 屯蹦。
  • 提供了像消息組通信、消息優(yōu)先級绳姨、延遲接收消息登澜、虛擬接收者、消息持久化之類的高級特性
  • 完全支持 JMS 1.1 和 J2EE 1.4規(guī)范(包括持久化就缆、分布式事務(wù)消息帖渠、事務(wù))
  • 對 Spring 框架的支持,ActiveMQ 可以通過 Spring 的配置文件方式很容易嵌入到 Spring 應(yīng)用中
  • 通過了常見的 J2EE 服務(wù)器測試竭宰,比如 TomEE空郊、Geronimo、JBoss切揭、GlassFish狞甚、WebLogic
  • 連接方式的多樣化,ActiveMQ 提供了多種連接模式廓旬,例如 in-VM哼审、TCP、SSL孕豹、NIO涩盾、UDP、多播励背、JGroups春霍、JXTA
  • 支持通過使用 JDBC 和 journal 實(shí)現(xiàn)消息的快速持久化
  • 為高性能集群、客戶端-服務(wù)器叶眉、點(diǎn)對點(diǎn)通信等場景而設(shè)計(jì)
  • 提供了技術(shù)和語言中立的 REST API 接口
  • 支持 Ajax 方式調(diào)用 ActiveMQ
  • ActiveMQ 可以輕松地與 CXF址儒、Axis 等 Web Service 技術(shù)整合,以提供可靠的消息傳遞
  • 可用作為內(nèi)存中的 JMS 提供者衅疙,非常適合 JMS 單元測試

1.4 基本組件

ActiveMQ 使用時(shí)包含的基本組件如下(與 JMS 相同):

  • Broker:消息代理莲趣,表示消息隊(duì)列服務(wù)器實(shí)體,接受客戶端連接饱溢,提供消息通信的核心服務(wù)喧伞;
  • Producer:消息生產(chǎn)者,業(yè)務(wù)的發(fā)起方,負(fù)責(zé)生產(chǎn)消息并傳輸給 Broker絮识;
  • Consumer:消息消費(fèi)者绿聘,業(yè)務(wù)的處理方,負(fù)責(zé)從 Broker 獲取消息并進(jìn)行業(yè)務(wù)邏輯處理次舌;
  • Topic:主題,發(fā)布訂閱模式下的消息統(tǒng)一匯集地兽愤,不同生產(chǎn)者向 Topic 發(fā)送消息彼念,由 Broker 分發(fā)到不同的訂閱者,實(shí)現(xiàn)消息的廣播浅萧;
  • Queue:隊(duì)列逐沙,點(diǎn)對點(diǎn)模式下特定生產(chǎn)者向特定隊(duì)列發(fā)送消息,消費(fèi)者訂閱特定隊(duì)列接收消息并進(jìn)行業(yè)務(wù)邏輯處理洼畅;
  • Message:消息體吩案,根據(jù)不同通信協(xié)議定義的固定格式進(jìn)行編碼的數(shù)據(jù)包,來封裝業(yè)務(wù) 數(shù)據(jù)帝簇,實(shí)現(xiàn)消息的傳輸徘郭。

1.5 消息傳送模型

ActiveMQ 支持兩種截然不同的消息傳送模型:PTP(即點(diǎn)對點(diǎn)模型)和Pub/Sub(即發(fā)布 /訂閱模型),分別稱作:PTP Domain 和Pub/Sub Domain丧肴。

PTP(即點(diǎn)對點(diǎn)模型)

PTP 消息域使用 queue(隊(duì)列) 作為 Destination(消息被尋址残揉、發(fā)送以及接收的對象),消息可以被同步或異步的發(fā)送和接收芋浮,每個(gè)消息只會給一個(gè) Consumer 傳送一次抱环。Consumer 可以使用 MessageConsumer.receive() 同步地接收消息,也可以通過使用MessageConsumer.setMessageListener()注冊一個(gè) MessageListener 實(shí)現(xiàn)異步接收纸巷。多個(gè) Consumer 可以注冊到同一個(gè) queue 上镇草,但一個(gè)消息只能被一個(gè) Consumer 所接收,然后由該 Consumer 來確認(rèn)消息瘤旨。并且在這種情況下梯啤,Provider 對所有注冊的 Consumer 以輪詢的方式發(fā)送消息。

PTP

Pub/Sub 發(fā)布訂閱模型

Pub/Sub(發(fā)布/訂閱,Publish/Subscribe)消息域使用 topic (主題) 作為 Destination(消息被尋址流昏、發(fā)送以及接收的對象)媒熊,發(fā)布者向 topic 發(fā)送消息,訂閱者注冊接收來自 topic 的消息羽嫡。發(fā)送到 topic 的任何消息都將自動傳遞給所有訂閱者。接收方式(同步和異步)與 P2P 域相同肩袍。除非顯式指定杭棵,否則 topic 不會為訂閱者保留消息。當(dāng)然,這可以通過持久化(Durable)訂閱來實(shí)現(xiàn)消息的保存魂爪。這種情況下先舷,當(dāng)訂閱者與 Provider 斷開時(shí),Provider 會為它存儲消息滓侍。當(dāng)持久化訂閱者重新連接時(shí)蒋川,將會受到所有的斷連期間未消費(fèi)的消息。


PUB/SUB

1.6 消息存儲

JMS 規(guī)范中消息的分發(fā)方式有兩種:非持久化和持久化撩笆。對于非持久化消息 JMS 實(shí)現(xiàn)者須保證盡最大努力分發(fā)消息捺球,但消息不會持久化存儲;而持久化方式分發(fā)的消息則必須進(jìn)行持久化存儲夕冲。非持久化消息常用于發(fā)送通知或?qū)崟r(shí)數(shù)據(jù)氮兵,當(dāng)你比較看重系統(tǒng)性能并且即使丟失一些消息并不影響業(yè)務(wù)正常運(yùn)作時(shí)可選擇非持久化消息。持久化消息被發(fā)送到消息服務(wù)器后如果當(dāng)前消息的消費(fèi)者并沒有運(yùn)行則該消息繼續(xù)存在歹鱼,只有等到消息被處理并被消息消費(fèi)者確認(rèn)之后泣栈,消息才會從消息服務(wù)器中刪除。具體的消息存儲方式如下:

  • AMQ弥姻,是 ActiveMQ 5.0及以前版本默認(rèn)的消息存儲方式南片,它是一個(gè)基于文件的、支持事務(wù)的消息存儲解決方案蚁阳。在此方案下消息本身以日志的形式實(shí)現(xiàn)持久化铃绒,存放在 Data Log 里。并且還對日志里的消息做了引用索引螺捐,方便快速取回消息
    <broker brokerName="broker" persistent="true" useShutdownHook="false">
      <persistenceAdapter>
              <amqPersistenceAdapter
                      directory="${activemq.data}/amq"
                      syncOnWrite="true"
                      indexPageSize="16kb"
                      indexMaxBinSize="100"
                      maxFileLength="10mb" />
      </persistenceAdapter>
    </broker>
    
  • KahaDB颠悬,也是一種基于文件并具有支持事務(wù)的消息存儲方式,從5.3開始推薦使用 KahaDB 存儲消息定血,它提供了比 AMQ 消息存儲更好的可擴(kuò)展性和可恢復(fù)性
    <broker brokerName="broker" persistent="true" useShutdownHook="false">
            <persistenceAdapter>
                    <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
            </persistenceAdapter>
    </broker>
    
  • JDBC赔癌,基于 JDBC 方式將消息存儲在數(shù)據(jù)庫中,將消息存到數(shù)據(jù)庫相對來說比較慢澜沟,所以 ActiveMQ 建議結(jié)合 journal 來存儲灾票,它使用了快速的緩存寫入技術(shù),大大提高了性能茫虽;
    <beans>
            <broker brokerName="broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
                    <persistenceAdapter>
                            <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
                    </persistenceAdapter>
            </broker>
            <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
                    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
                    <property name="username" value="activemq"/>
                    <property name="password" value="activemq"/>
                    <property name="maxActive" value="200"/>
                    <property name="poolPreparedStatements" value="true"/>
            </bean>
    </beans>
    
  • 內(nèi)存存儲刊苍,是指將所有要持久化的消息放到內(nèi)存中,因?yàn)檫@里沒有動態(tài)的緩存濒析,所以需要注意設(shè)置消息服務(wù)器的 JVM 和內(nèi)存大小
    <broker brokerName="broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
            <transportConnectors>
                    <transportConnector uri="tcp://localhost:61635"/>
            </transportConnectors>
    </broker>
    
  • LevelDB正什,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定義的索引代替常用的 BTree 索引号杏,其持久化性能高于 KahaDB婴氮,雖然默認(rèn)的持久化方式還是 KahaDB,但是 LevelDB 將是趨勢。在5.9版本還提供了基于 LevelDB 和 Zookeeper 的數(shù)據(jù)復(fù)制方式主经,作為 Master-Slave 方式的首選數(shù)據(jù)復(fù)制方案
    <broker xmlns="http://activemq.apache.org/schema/core"  brokerName="broker" dataDirectory="${activemq.data}">
        <persistenceAdapter>
            <levelDB directory="${activemq.data}/levelDB"/>
        </persistenceAdapter>
    </broker>
    

1.7 連接器

ActiveMQ Broker (消息代理) 的主要作用是為客戶端應(yīng)用提供一種通信機(jī)制荣暮,為此 ActiveMQ 提供了一種連接機(jī)制,并用連接器(connector)來描述這種連接機(jī)制罩驻。ActiveMQ 中的連接器有兩種穗酥,一種是用于客戶端與消息代理服務(wù)器(client-to-broker)之間通信的傳輸連接器(transport connector),一種是用于消息代理服務(wù)器之間(broker-to-broker)通信的網(wǎng)絡(luò)連接器(network connector)惠遏。connector 使用 URI(統(tǒng)一資源定位符)來表示迷扇,URI 格式為: <schema name>:<hierarchical part>[?<query>][#<fragment>], 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose

1.7.1 傳輸連接器(transport connector)

為了交換消息,消息生產(chǎn)者和消息消費(fèi)者(統(tǒng)稱為客戶端)都需要連接到消息代理服務(wù)器爽哎,這種客戶端和消息代理服務(wù)器之間的通信就是通過傳輸連接器(Transport connectors)完成的。很多情況下用戶連接消息代理時(shí)的需求側(cè)重點(diǎn)不同器一,有的更關(guān)注性能课锌,有的更注重安全性,因此 ActiveMQ 提供了一系列l(wèi)連接協(xié)議供選擇祈秕,來覆蓋這些使用場景渺贤。從消息代理的角度看,傳輸連接器就是用來處理和監(jiān)聽客戶端連接的请毛,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml)志鞍,傳輸連接的相關(guān)配置如下:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
    <transportConnector name="ssl" uri="ssl://localhost:61617"/>
    <transportConnector name="stomp" uri="stomp://localhost:61613"/>
    <transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>

傳輸連接器定義在<transportConnectors>元素中,一個(gè)<transportConnector>元素定義一個(gè)特定的連接器方仿,一個(gè)連接器必須有自己唯一的名字和URI屬性固棚,但discoveryUri屬性是可選的。目前在 ActiveMQ 最新的5.15版本中常用的傳輸連接器連接協(xié)議有:vm仙蚜、tcp此洲、udp、multicast委粉、nio呜师、ssl、http贾节、https汁汗、websocket、amqp栗涂、mqtt知牌、stomp 等等

  • VM:允許客戶端和消息服務(wù)器直接在 VM 內(nèi)部通信,采用的連接不是 Socket 連接戴差,而是直接的虛擬機(jī)本地方法調(diào)用送爸,從而避免網(wǎng)絡(luò)傳輸?shù)拈_銷。應(yīng)用場景僅限于服務(wù)器和客戶端在同一 JVM 中。如使用代碼啟動嵌入式的ActiveMQ Broker實(shí)例袭厂,通常用于單元測試墨吓。因?yàn)槭乔度胧剑圆恍枰渲肁ctiveMQ的配置文件纹磺,只要在連接Broker的URI中直接使用即可
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">
        <property name="brokerURL" value="vm://localhost"/>
    </bean>
    
  • TCP:ActiveMQ默認(rèn)的傳輸連接帖烘,也是最常用的使用方式。長連接橄杨,每個(gè)客戶端實(shí)例都會與服務(wù)器維持一個(gè)連接秘症。每個(gè)連接一個(gè)線程。TCP的優(yōu)點(diǎn)是:
    • 性能高:ActiveMQ使用默認(rèn)協(xié)議OpenWire序列化和反序列化消息式矫。OpenWire是一個(gè)性能很高的序列化協(xié)議
    • 可用性高:TCP是使用最廣泛的技術(shù)乡摹,幾乎所有的開發(fā)語言都支持TCP協(xié)議
    • 可靠性高:TCP協(xié)議確保消息不會在網(wǎng)絡(luò)傳說的過程中丟失
    <transportConnector name="tcp" uri="tcp://localhost:61616"/>
    
  • UDP:與面向連接,可靠的字節(jié)流服務(wù)的TCP不同采转,UDP是一個(gè)面向數(shù)據(jù)的簡單傳輸連接聪廉,沒有TCP的三次握手,所以性能大大強(qiáng)于TCP故慈,但是是以犧牲可靠性為前提板熊。適用于丟失也無所謂的消息,如統(tǒng)計(jì)uv察绷,pv干签。(當(dāng)然如果真是統(tǒng)計(jì)uv什么的,有Kafka這樣專門的消息中間件)
    <transportConnector name="udp" uri="udp://localhost:8123"/>
    
  • NIO:使用Java的NIO方式對連接進(jìn)行改進(jìn)拆撼,因?yàn)镹IO使用線程池容劳,可以復(fù)用線程,所以可以用更少的線程維持更多的連接情萤。如果有大量的客戶端鸭蛙,或者性能瓶頸在網(wǎng)絡(luò)傳輸上,可以考慮使用NIO的連接方式筋岛。也可以根據(jù)不同的場景選擇不用的傳輸連接娶视,比如:Producer有很多,但是Consumer很少睁宰,可以Producer用NIO協(xié)議肪获,Consumer用TCP協(xié)議。從ActiveMQ 5.6版本開始柒傻,NIO可以支持和SSL搭配使用的傳輸連接孝赫。
    • NIO配置:
    <transportConnector name="nio" uri="nio://localhost:61616"/>
    
    • NIO+SSL配置:
    <transportConnector name="nio+ssl" uri="nio+ssl://localhost:61616"/>
    
  • SSL:需要一個(gè)安全連接的時(shí)候可以考慮使用SSL,適用于client和broker在公網(wǎng)的情況红符,如使用aws云平臺等
    <transportConnector name="ssl" uri="ssl://localhost:8123"/>
    
  • HTTP(S):需要穿越防火墻青柄,可以考慮使用HTTP(S)伐债,但由于HTTP(S)是短連接,每次創(chuàng)建連接的成本較高致开,所以性能最差峰锁。允許客戶端使用 REST 或 Ajax 的方式進(jìn)行連接,這意味著可以直接使用 Javascript 向 ActiveMQ 發(fā)送消息
    • HTTP配置
    <transportConnector name="http" uri="http://localhost:8080"/>
    
    • HTTPS配置
    <transportConnector name="https" uri="http://localhost:8080"/>
    
  • AMQP:ActiveMQ 5.8新增加的傳輸連接双戳。用于支持AMQP(高級消息隊(duì)列協(xié)議)虹蒋。因?yàn)锳MQP是消息隊(duì)列的標(biāo)準(zhǔn)協(xié)議,而且已經(jīng)越來越被廣泛使用飒货,所以ActiveMQ也支持了此協(xié)議魄衅。AMQP協(xié)議可以搭配NIO或SSL協(xié)議使用,amqp+nio用于提升系統(tǒng)的延展性和性能塘辅。amqp+ssl可以創(chuàng)建安全連接晃虫。
    • amqp配置:
    <transportConnector name="amqp" uri="amqp://localhost:5672"/>
    
    • amqp+nio配置:
    <transportConnector name="amqp+nio" uri="amqp://localhost:5672"/>
    
    • amqp+ssl配置:
    <transportConnector name="amqp+ssl" uri="amqp://localhost:5672"/>
    
  • MQTT:ActiveMQ 5.8新增加的傳輸連接。是一個(gè)輕量級的消息訂閱/發(fā)布協(xié)議扣墩。和AMQP一樣傲茄,同樣支持搭配NIO或SSL使用
<transportConnector name="mqtt" uri="mqtt://localhost:1883"/>

1.7.2 網(wǎng)絡(luò)連接器(network connector)

很多情況下,我們要處理的數(shù)據(jù)可能是海量的沮榜,這種場景單臺服務(wù)器很難支撐,這就要用到集群功能喻粹,為此 ActiveMQ 提供了網(wǎng)絡(luò)連接的模式蟆融,簡單說就是通過把多個(gè)消息服務(wù)器實(shí)例連接在一起作為一個(gè)整體對外提供服務(wù),從而提高整體對外的消息服務(wù)能力守呜。通過這種方式連接在一起的服務(wù)器實(shí)例之間可共享隊(duì)列和消費(fèi)者列表型酥,從而達(dá)到分布式隊(duì)列的目的,網(wǎng)絡(luò)連接器就是用來配置服務(wù)器之間的通信查乒。

如圖所示弥喉,服務(wù)器
網(wǎng)絡(luò)連接器

S1 和 S2 通過 NewworkConnector 相連,生產(chǎn)者 P1 發(fā)送的消息玛迄,消費(fèi)者 C3 和 C4 都可以接收到由境,而生產(chǎn)者 P3 發(fā)送的消息,消費(fèi)者 C1 和 C2 也可以接收到蓖议。要使用網(wǎng)絡(luò)連接器的功能需要在服務(wù)器 S1 的 activemq.xml 中的 broker 節(jié)點(diǎn)下添加如下配置(假設(shè)192.168.11.23:61617 為 S2 的地址):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.23:61617)"/>    
</networkConnectors>

如果只是這樣虏杰,S1 可以將消息發(fā)送到 S2,但這只是單方向的通信勒虾,發(fā)送到 S2 上的的消息還不能發(fā)送到 S1 上纺阔。如果想 S1 也收到從 S2 發(fā)來的消息需要在 S2 的 activemq.xml 中的 broker 節(jié)點(diǎn)下也添加如下配置(假設(shè)192.168.11.45:61617為 S1 的地址):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.45:61617)"/>    
</networkConnectors>

這樣,S1和S2就可以雙向通信了修然。目前在 ActiveMQ 最新的5.15版本中常用的網(wǎng)絡(luò)連接器協(xié)議有 staticmulticast 兩種:

  • static笛钝,靜態(tài)協(xié)議质况,用于為一個(gè)網(wǎng)絡(luò)中多個(gè)代理創(chuàng)建靜態(tài)配置,這種配置協(xié)議支持復(fù)合的 URI (即包含其他 URI 的 URI)玻靡。例如:static://(tcp://ip:61616,tcp://ip2:61616)
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerA" dataDirectory="${activemq.base}/data">   
      <networkConnectors>  
        <networkConnector uri="static:(tcp://localhost:61617)" />  
      </networkConnectors>  
      <transportConnectors>  
        <transportConnector name="openwire" uri="tcp://localhost:61616" />  
      </transportConnectors> 
    </broker> 
    
  • multicast结榄,多點(diǎn)傳送協(xié)議,消息服務(wù)器會廣播自己的服務(wù)啃奴,也會定位其他代理潭陪。這種方式用于服務(wù)器之間實(shí)現(xiàn)動態(tài)識別,而不是配置靜態(tài)的 IP 組最蕾。默認(rèn)配置:multicast://default
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="multicast" dataDirectory="${activemq.base}/data">
        <networkConnectors>
            <networkConnector name="default-nc" uri="multicast://default"/>
        </networkConnectors>
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
        </transportConnectors>
    </broker>
    

2.1 添加依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>priv.simon.boot</groupId>
    <artifactId>activemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>activemq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.2 點(diǎn)對點(diǎn)配置

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin

2.3 發(fā)布/訂閱配置

Spring Boot 默認(rèn)開啟點(diǎn)對點(diǎn)模式依溯,發(fā)布訂閱模式需要手動開啟

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.jms.pub-sub-domain=true

2.4 消息提供者

@Service
public class ProviderService {
  @Autowired
  JmsMessagingTemplate jmsMessagingTemplate;
  /**
   * 點(diǎn)對點(diǎn)消息發(fā)送
   */
  public void sendPTPMessage(String message){
    ActiveMQQueue queue = new ActiveMQQueue("queue");
    jmsMessagingTemplate.convertAndSend(queue,message);
  }
  /**
   * 發(fā)布訂閱消息發(fā)送
   */
  public void sendPubSubMesage(String message){
    ActiveMQTopic topic = new ActiveMQTopic("topic");
    jmsMessagingTemplate.convertAndSend(topic,message);
  }
}

2.5 消息消費(fèi)者

@Service
public class ConsumerService {
  /**
   * 監(jiān)聽點(diǎn)對點(diǎn)消息
   */
  @JmsListener(destination = "queue")
  public void receiveQueue(String message){
    System.err.println("queue收到的消息:"+message);
  }
  /**
   * 監(jiān)聽發(fā)布訂閱消息
   */
  @JmsListener(destination = "topic")
  public void receiveTopic(String message){
    System.err.println("topic收到的消息:"+message);
  }
}

2.6 測試類

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

  @Autowired
  private ProviderService providerService;

  @Test
  public void ptpTest() {
    providerService.sendPTPMessage("ptp hello");
  }

  @Test
  public void pubSubTest() {
    providerService.sendPubSubMesage("pub/sub hello");
  }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市瘟则,隨后出現(xiàn)的幾起案子黎炉,更是在濱河造成了極大的恐慌,老刑警劉巖醋拧,帶你破解...
    沈念sama閱讀 217,406評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件慷嗜,死亡現(xiàn)場離奇詭異,居然都是意外死亡丹壕,警方通過查閱死者的電腦和手機(jī)庆械,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,732評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來菌赖,“玉大人缭乘,你說我怎么就攤上這事×鹩茫” “怎么了堕绩?”我有些...
    開封第一講書人閱讀 163,711評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長邑时。 經(jīng)常有香客問我奴紧,道長,這世上最難降的妖魔是什么晶丘? 我笑而不...
    開封第一講書人閱讀 58,380評論 1 293
  • 正文 為了忘掉前任黍氮,我火速辦了婚禮,結(jié)果婚禮上浅浮,老公的妹妹穿的比我還像新娘滤钱。我一直安慰自己,他們只是感情好脑题,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,432評論 6 392
  • 文/花漫 我一把揭開白布件缸。 她就那樣靜靜地躺著,像睡著了一般叔遂。 火紅的嫁衣襯著肌膚如雪他炊。 梳的紋絲不亂的頭發(fā)上争剿,一...
    開封第一講書人閱讀 51,301評論 1 301
  • 那天,我揣著相機(jī)與錄音痊末,去河邊找鬼蚕苇。 笑死,一個(gè)胖子當(dāng)著我的面吹牛凿叠,可吹牛的內(nèi)容都是我干的涩笤。 我是一名探鬼主播,決...
    沈念sama閱讀 40,145評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼盒件,長吁一口氣:“原來是場噩夢啊……” “哼蹬碧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起炒刁,我...
    開封第一講書人閱讀 39,008評論 0 276
  • 序言:老撾萬榮一對情侶失蹤恩沽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后翔始,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體罗心,經(jīng)...
    沈念sama閱讀 45,443評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,649評論 3 334
  • 正文 我和宋清朗相戀三年城瞎,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了渤闷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,795評論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡脖镀,死狀恐怖肤晓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情认然,我是刑警寧澤,帶...
    沈念sama閱讀 35,501評論 5 345
  • 正文 年R本政府宣布漫萄,位于F島的核電站卷员,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏腾务。R本人自食惡果不足惜毕骡,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,119評論 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望岩瘦。 院中可真熱鬧未巫,春花似錦、人聲如沸启昧。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,731評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽密末。三九已至握爷,卻和暖如春跛璧,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背新啼。 一陣腳步聲響...
    開封第一講書人閱讀 32,865評論 1 269
  • 我被黑心中介騙來泰國打工追城, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人燥撞。 一個(gè)月前我還...
    沈念sama閱讀 47,899評論 2 370
  • 正文 我出身青樓座柱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親物舒。 傳聞我的和親對象是個(gè)殘疾皇子色洞,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,724評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 消息中間件 消息中間件有很多的用途和優(yōu)點(diǎn): 1. 將數(shù)據(jù)從一個(gè)應(yīng)用程序傳送到另一個(gè)應(yīng)用程序,或者從軟件的一個(gè)模塊傳...
    錯(cuò)位的季節(jié)閱讀 792評論 0 1
  • WebSocket入門篇 背景介紹最近突然發(fā)現(xiàn)有一種http協(xié)議不能很好的滿足的一種需求茶鉴。具體來說就是锋玲,頁面的某個(gè)...
    rpf_siwash閱讀 4,407評論 1 4
  • 中間件在中大型的系統(tǒng)中應(yīng)用較為廣泛,主要用來解決系統(tǒng)模塊之間的強(qiáng)耦合關(guān)系涵叮;也就是說消息中間件不需要同步返回結(jié)果惭蹂,也...
    帥可兒妞閱讀 308評論 0 0
  • 中間件 非底層操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件割粮,不能直接給最終用戶使用和帶來價(jià)值的軟件盾碗。 消息中間件 關(guān)注于數(shù)據(jù)的發(fā)送...
    wch853閱讀 1,019評論 0 0
  • 佇立遠(yuǎn)方的河岸 巴山雪兒 遠(yuǎn)方有一條河 依稀里 水草用夢編織一條長長...
    興安居士閱讀 306評論 2 6