Spring集成ActiveMQ

ActiveMQ基礎(chǔ)

JMS(JAVA Message Service,java消息服務(wù))API是一個(gè)消息服務(wù)的標(biāo)準(zhǔn)或者說是規(guī)范闹啦,允許應(yīng)用程序組件基于JavaEE平臺(tái)創(chuàng)建特愿、發(fā)送、接收和讀取消息。它使分布式通信耦合度更低,消息服務(wù)更加可靠以及異步性。

基本概念

JMS是java的消息服務(wù)圃郊,JMS的客戶端之間可以通過JMS服務(wù)進(jìn)行異步的消息傳輸。

消息模型

○ Point-to-Point(P2P)

○ Publish/Subscribe(Pub/Sub)

即點(diǎn)對(duì)點(diǎn)和發(fā)布訂閱模型

JMS編程模型

(1) ConnectionFactory

創(chuàng)建Connection對(duì)象的工廠女蜈,針對(duì)兩種不同的jms消息模型持舆,分別有QueueConnectionFactory和TopicConnectionFactory兩種色瘩。可以通過JNDI來查找ConnectionFactory對(duì)象逸寓。

(2) Destination

Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費(fèi)者的消息來源居兆。對(duì)于消息生產(chǎn)者來說,它的Destination是某個(gè)隊(duì)列(Queue)或某個(gè)主題(Topic);對(duì)于消息消費(fèi)者來說竹伸,它的Destination也是某個(gè)隊(duì)列或主題(即消息來源)泥栖。

所以,Destination實(shí)際上就是兩種類型的對(duì)象:Queue勋篓、Topic可以通過JNDI來查找Destination吧享。

(3) Connection

Connection表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對(duì)TCP/IP socket的包裝)。Connection可以產(chǎn)生一個(gè)或多個(gè)Session譬嚣。跟ConnectionFactory一樣钢颂,Connection也有兩種類型:QueueConnection和TopicConnection。

(4) Session

Session是我們操作消息的接口拜银∈獗蓿可以通過session創(chuàng)建生產(chǎn)者、消費(fèi)者尼桶、消息等操灿。Session提供了事務(wù)的功能。當(dāng)我們需要使用session發(fā)送/接收多個(gè)消息時(shí)泵督,可以將這些發(fā)送/接收動(dòng)作放到一個(gè)事務(wù)中趾盐。同樣,也分QueueSession和TopicSession幌蚊。

(5) 消息的生產(chǎn)者

消息生產(chǎn)者由Session創(chuàng)建谤碳,并用于將消息發(fā)送到Destination。同樣溢豆,消息生產(chǎn)者分兩種類型:QueueSender和TopicPublisher∪诚郏可以調(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息漩仙。

(6) 消息消費(fèi)者

消息消費(fèi)者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息犹赖。兩種類型:QueueReceiver和TopicSubscriber队他。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建峻村。當(dāng)然麸折,也可以session的creatDurableSubscriber方法來創(chuàng)建持久化的訂閱者。

(7) MessageListener

消息監(jiān)聽器粘昨。如果注冊(cè)了消息監(jiān)聽器垢啼,一旦消息到達(dá)窜锯,將自動(dòng)調(diào)用監(jiān)聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener芭析。

ActiveMQ下載及安裝

這個(gè)網(wǎng)上例子比較多锚扎,不同操作系統(tǒng)方式不同,這里不做說明馁启。
Mac可以使用brew命令來進(jìn)行安裝驾孔,也比較簡(jiǎn)單

brew install activemq
image.png

下載安裝成功以后,啟動(dòng)activemq

activemq start

然后本地瀏覽器中輸入http://localhost:8161/查看是否啟動(dòng)成功惯疙,登錄用戶及密碼為admin/admin

image.png

Spring集成

1.使用maven引入activemq包

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.2</version>
</dependency>

2.配置spring文件

<!-- 真正可以產(chǎn)生Connection的ConnectionFactory翠勉,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
    </bean>
<!-- Spring Caching連接工廠 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session緩存數(shù)量 -->
        <property name="sessionCacheSize" value="10"></property>
    </bean>

消息提供者/發(fā)布者配置

jmsTemplate配置

<!-- Spring提供的JMS工具類,它可以進(jìn)行消息發(fā)送霉颠、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個(gè)connectionFactory對(duì)應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對(duì)象 -->
        <constructor-arg ref="cachingConnectionFactory"/>
    </bean>

消息模型配置

   <!-- 點(diǎn)對(duì)點(diǎn)隊(duì)列 -->   
    <bean id="mqQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="MessageQueue" />
    </bean>
   <!-- 發(fā)布/消費(fèi) -->
    <bean id="mqTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="MessageTopic" />
    </bean>

Java代碼

消息提供者/發(fā)布者

package com.permission.core.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.jms.*;

/**
 * @Description: 消息提供者/發(fā)布者
 * @Author: 
 * @Date: Created in 下午5:44 2018/1/11
 */
@Component
public class MessageQueueSender {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Destination mqQueue;

    @Autowired
    private Destination mqTopic;


    public void sendMessage(String message) {
        jmsTemplate.send(mqTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                return textMessage;
            }
        });
    }
}

如果是點(diǎn)對(duì)點(diǎn)模式对碌,jmsTemplate.send(mqQueue,****)
如果是發(fā)布/訂閱模式,jmsTemplate.send(mqTopic,****)

消息接收者/消費(fèi)者配置

如果跟消息提供者不在一個(gè)工程下掉分,也需要配置connectionFactory

    <!-- 真正可以產(chǎn)生Connection的ConnectionFactory俭缓,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"></property>
    </bean>

    <!-- Spring Caching連接工廠 -->
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session緩存數(shù)量 -->
        <property name="sessionCacheSize" value="10"></property>
    </bean>

配置消息監(jiān)聽

    <!-- 消息監(jiān)聽器 -->
    <bean id="consumerMessageListener" class="com.bigdata.core.activemq.ConsumerMessageListener" />

    <jms:listener-container container-type="default" destination-type="topic" connection-factory="cachingConnectionFactory" acknowledge="auto">
        <jms:listener destination="MessageTopic" ref="consumerMessageListener"/>
    </jms:listener-container>

這里需要注意,如果是點(diǎn)對(duì)點(diǎn)模式 jms:listener-container標(biāo)簽屬性destination-type需要配置為queue酥郭,默認(rèn)情況為queue华坦;如果是發(fā)布/訂閱模式jms:listener-container標(biāo)簽屬性destination-type需要配置為topic

消息接收者/訂閱者 java代碼

package com.bigdata.core.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @Description: TODO
 * @Author: 
 * @Date: Created in 下午4:28 2018/1/11
 */
public class ConsumerMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try{
            System.out.println("接收到的消息內(nèi)容是:" + textMessage.getText());
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

java代碼監(jiān)聽需要實(shí)現(xiàn)接口MessageListener

此外可以在amqConnectionFactory中配置消息傳輸監(jiān)聽器,用以處理網(wǎng)絡(luò)異常及服務(wù)器異常,配置信息如下:

   <!-- 真正可以產(chǎn)生Connection的ConnectionFactory不从,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
   <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
       <property name="brokerURL" value="tcp://localhost:61616"></property>
       <!-- 消息傳輸監(jiān)聽器 處理網(wǎng)絡(luò)及服務(wù)器異常 -->
       <property name="transportListener">
           <bean class="com.permission.core.activemq.ActiveMQTransportListener" />
       </property>
   </bean>

ActiveMQTransportListener代碼

package com.permission.core.activemq;

import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @Description: TODO
 * @Author: 
 * @Date: Created in 下午4:31 2018/1/11
 */
public class ActiveMQTransportListener implements TransportListener {

    private final static Logger log = LoggerFactory.getLogger(ActiveMQTransportListener.class);

    @Override
    public void onCommand(Object o) {
        log.info("onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...");
    }

    @Override
    public void onException(IOException e) {
        log.error("onException -> 消息服務(wù)器連接錯(cuò)誤......", e);
    }

    @Override
    public void transportInterupted() {
        log.warn("transportInterupted -> 消息服務(wù)器連接發(fā)生中斷...");
    }

    @Override
    public void transportResumed() {
        log.info("transportResumed -> 消息服務(wù)器連接已恢復(fù)...");
    }
}

訂閱著/接收者消息傳送監(jiān)聽器會(huì)對(duì)所有數(shù)據(jù)進(jìn)行監(jiān)控惜姐,如下:

接收到的消息內(nèi)容是:發(fā)布消息:0
[2018-01-16 15:06:35.202] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:1
[2018-01-16 15:06:35.206] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:2
[2018-01-16 15:06:35.209] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:3
[2018-01-16 15:06:35.214] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:4
[2018-01-16 15:06:35.216] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:5
[2018-01-16 15:06:35.220] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:6
[2018-01-16 15:06:35.222] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:7
[2018-01-16 15:06:35.223] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:8
[2018-01-16 15:06:35.225] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...
接收到的消息內(nèi)容是:發(fā)布消息:9
[2018-01-16 15:06:35.226] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0   at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控  ...

如果activemq服務(wù)器down掉,消息傳輸會(huì)輸出

2018-01-16 15:09:26,467 WARN [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] org.springframework.jms.connection.CachingConnectionFactory#onException[SingleConnectionFactory.java:322] Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: java.io.EOFException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1952)
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1971)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
    at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    ... 1 common frames omitted
2018-01-16 15:09:26,479 ERROR [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] com.permission.core.activemq.ActiveMQTransportListener#onException[ActiveMQTransportListener.java:26] onException -> 消息服務(wù)器連接錯(cuò)誤......
java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末椿息,一起剝皮案震驚了整個(gè)濱河市歹袁,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌寝优,老刑警劉巖条舔,帶你破解...
    沈念sama閱讀 212,383評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異乏矾,居然都是意外死亡孟抗,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門钻心,熙熙樓的掌柜王于貴愁眉苦臉地迎上來凄硼,“玉大人,你說我怎么就攤上這事捷沸√粒” “怎么了?”我有些...
    開封第一講書人閱讀 157,852評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵痒给,是天一觀的道長(zhǎng)说墨。 經(jīng)常有香客問我骏全,道長(zhǎng),這世上最難降的妖魔是什么婉刀? 我笑而不...
    開封第一講書人閱讀 56,621評(píng)論 1 284
  • 正文 為了忘掉前任吟温,我火速辦了婚禮,結(jié)果婚禮上突颊,老公的妹妹穿的比我還像新娘鲁豪。我一直安慰自己,他們只是感情好律秃,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,741評(píng)論 6 386
  • 文/花漫 我一把揭開白布爬橡。 她就那樣靜靜地躺著,像睡著了一般棒动。 火紅的嫁衣襯著肌膚如雪糙申。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,929評(píng)論 1 290
  • 那天船惨,我揣著相機(jī)與錄音柜裸,去河邊找鬼。 笑死粱锐,一個(gè)胖子當(dāng)著我的面吹牛疙挺,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播怜浅,決...
    沈念sama閱讀 39,076評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼铐然,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了恶座?” 一聲冷哼從身側(cè)響起搀暑,我...
    開封第一講書人閱讀 37,803評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎跨琳,沒想到半個(gè)月后自点,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,265評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡脉让,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,582評(píng)論 2 327
  • 正文 我和宋清朗相戀三年樟氢,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片侠鳄。...
    茶點(diǎn)故事閱讀 38,716評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖死宣,靈堂內(nèi)的尸體忽然破棺而出伟恶,到底是詐尸還是另有隱情,我是刑警寧澤毅该,帶...
    沈念sama閱讀 34,395評(píng)論 4 333
  • 正文 年R本政府宣布博秫,位于F島的核電站潦牛,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏挡育。R本人自食惡果不足惜巴碗,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,039評(píng)論 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望即寒。 院中可真熱鬧橡淆,春花似錦、人聲如沸母赵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凹嘲。三九已至师倔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間周蹭,已是汗流浹背趋艘。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評(píng)論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留凶朗,地道東北人瓷胧。 一個(gè)月前我還...
    沈念sama閱讀 46,488評(píng)論 2 361
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像俱尼,于是被迫代替她去往敵國和親抖单。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,612評(píng)論 2 350