ActiveMQ基礎(chǔ)
JMS(JAVA Message Service,java消息服務(wù))API是一個(gè)消息服務(wù)的標(biāo)準(zhǔn)或者說是規(guī)范闹啦,允許應(yīng)用程序組件基于JavaEE平臺(tái)創(chuàng)建特愿、發(fā)送、接收和讀取消息。它使分布式通信耦合度更低,消息服務(wù)更加可靠以及異步性。
基本概念
JMS是java的消息服務(wù)圃郊,JMS的客戶端之間可以通過JMS服務(wù)進(jìn)行異步的消息傳輸。
消息模型
○ Point-to-Point(P2P)
○ Publish/Subscribe(Pub/Sub)
即點(diǎn)對(duì)點(diǎn)和發(fā)布訂閱模型
JMS編程模型
(1) ConnectionFactory
創(chuàng)建Connection對(duì)象的工廠女蜈,針對(duì)兩種不同的jms消息模型持舆,分別有QueueConnectionFactory和TopicConnectionFactory兩種色瘩。可以通過JNDI來查找ConnectionFactory對(duì)象逸寓。
(2) Destination
Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費(fèi)者的消息來源居兆。對(duì)于消息生產(chǎn)者來說,它的Destination是某個(gè)隊(duì)列(Queue)或某個(gè)主題(Topic);對(duì)于消息消費(fèi)者來說竹伸,它的Destination也是某個(gè)隊(duì)列或主題(即消息來源)泥栖。
所以,Destination實(shí)際上就是兩種類型的對(duì)象:Queue勋篓、Topic可以通過JNDI來查找Destination吧享。
(3) Connection
Connection表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對(duì)TCP/IP socket的包裝)。Connection可以產(chǎn)生一個(gè)或多個(gè)Session譬嚣。跟ConnectionFactory一樣钢颂,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是我們操作消息的接口拜银∈獗蓿可以通過session創(chuàng)建生產(chǎn)者、消費(fèi)者尼桶、消息等操灿。Session提供了事務(wù)的功能。當(dāng)我們需要使用session發(fā)送/接收多個(gè)消息時(shí)泵督,可以將這些發(fā)送/接收動(dòng)作放到一個(gè)事務(wù)中趾盐。同樣,也分QueueSession和TopicSession幌蚊。
(5) 消息的生產(chǎn)者
消息生產(chǎn)者由Session創(chuàng)建谤碳,并用于將消息發(fā)送到Destination。同樣溢豆,消息生產(chǎn)者分兩種類型:QueueSender和TopicPublisher∪诚郏可以調(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息漩仙。
(6) 消息消費(fèi)者
消息消費(fèi)者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息犹赖。兩種類型:QueueReceiver和TopicSubscriber队他。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建峻村。當(dāng)然麸折,也可以session的creatDurableSubscriber方法來創(chuàng)建持久化的訂閱者。
(7) MessageListener
消息監(jiān)聽器粘昨。如果注冊(cè)了消息監(jiān)聽器垢啼,一旦消息到達(dá)窜锯,將自動(dòng)調(diào)用監(jiān)聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener芭析。
ActiveMQ下載及安裝
這個(gè)網(wǎng)上例子比較多锚扎,不同操作系統(tǒng)方式不同,這里不做說明馁启。
Mac可以使用brew命令來進(jìn)行安裝驾孔,也比較簡(jiǎn)單
brew install activemq
下載安裝成功以后,啟動(dòng)activemq
activemq start
然后本地瀏覽器中輸入http://localhost:8161/查看是否啟動(dòng)成功惯疙,登錄用戶及密碼為admin/admin
Spring集成
1.使用maven引入activemq包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
2.配置spring文件
<!-- 真正可以產(chǎn)生Connection的ConnectionFactory翠勉,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<!-- Spring Caching連接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session緩存數(shù)量 -->
<property name="sessionCacheSize" value="10"></property>
</bean>
消息提供者/發(fā)布者配置
jmsTemplate配置
<!-- Spring提供的JMS工具類,它可以進(jìn)行消息發(fā)送霉颠、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個(gè)connectionFactory對(duì)應(yīng)的是我們定義的Spring提供的那個(gè)ConnectionFactory對(duì)象 -->
<constructor-arg ref="cachingConnectionFactory"/>
</bean>
消息模型配置
<!-- 點(diǎn)對(duì)點(diǎn)隊(duì)列 -->
<bean id="mqQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="MessageQueue" />
</bean>
<!-- 發(fā)布/消費(fèi) -->
<bean id="mqTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="MessageTopic" />
</bean>
Java代碼
消息提供者/發(fā)布者
package com.permission.core.activemq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import javax.jms.*;
/**
* @Description: 消息提供者/發(fā)布者
* @Author:
* @Date: Created in 下午5:44 2018/1/11
*/
@Component
public class MessageQueueSender {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination mqQueue;
@Autowired
private Destination mqTopic;
public void sendMessage(String message) {
jmsTemplate.send(mqTopic, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
}
}
如果是點(diǎn)對(duì)點(diǎn)模式对碌,jmsTemplate.send(mqQueue,****)
如果是發(fā)布/訂閱模式,jmsTemplate.send(mqTopic,****)
消息接收者/消費(fèi)者配置
如果跟消息提供者不在一個(gè)工程下掉分,也需要配置connectionFactory
<!-- 真正可以產(chǎn)生Connection的ConnectionFactory俭缓,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<!-- Spring Caching連接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標(biāo)ConnectionFactory對(duì)應(yīng)真實(shí)的可以產(chǎn)生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session緩存數(shù)量 -->
<property name="sessionCacheSize" value="10"></property>
</bean>
配置消息監(jiān)聽
<!-- 消息監(jiān)聽器 -->
<bean id="consumerMessageListener" class="com.bigdata.core.activemq.ConsumerMessageListener" />
<jms:listener-container container-type="default" destination-type="topic" connection-factory="cachingConnectionFactory" acknowledge="auto">
<jms:listener destination="MessageTopic" ref="consumerMessageListener"/>
</jms:listener-container>
這里需要注意,如果是點(diǎn)對(duì)點(diǎn)模式 jms:listener-container
標(biāo)簽屬性destination-type
需要配置為queue
酥郭,默認(rèn)情況為queue
华坦;如果是發(fā)布/訂閱模式jms:listener-container
標(biāo)簽屬性destination-type
需要配置為topic
消息接收者/訂閱者 java代碼
package com.bigdata.core.activemq;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* @Description: TODO
* @Author:
* @Date: Created in 下午4:28 2018/1/11
*/
public class ConsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try{
System.out.println("接收到的消息內(nèi)容是:" + textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}
}
java代碼監(jiān)聽需要實(shí)現(xiàn)接口MessageListener
此外可以在amqConnectionFactory中配置消息傳輸監(jiān)聽器,用以處理網(wǎng)絡(luò)異常及服務(wù)器異常,配置信息如下:
<!-- 真正可以產(chǎn)生Connection的ConnectionFactory不从,由對(duì)應(yīng)的 JMS服務(wù)廠商提供-->
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"></property>
<!-- 消息傳輸監(jiān)聽器 處理網(wǎng)絡(luò)及服務(wù)器異常 -->
<property name="transportListener">
<bean class="com.permission.core.activemq.ActiveMQTransportListener" />
</property>
</bean>
ActiveMQTransportListener代碼
package com.permission.core.activemq;
import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @Description: TODO
* @Author:
* @Date: Created in 下午4:31 2018/1/11
*/
public class ActiveMQTransportListener implements TransportListener {
private final static Logger log = LoggerFactory.getLogger(ActiveMQTransportListener.class);
@Override
public void onCommand(Object o) {
log.info("onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...");
}
@Override
public void onException(IOException e) {
log.error("onException -> 消息服務(wù)器連接錯(cuò)誤......", e);
}
@Override
public void transportInterupted() {
log.warn("transportInterupted -> 消息服務(wù)器連接發(fā)生中斷...");
}
@Override
public void transportResumed() {
log.info("transportResumed -> 消息服務(wù)器連接已恢復(fù)...");
}
}
訂閱著/接收者消息傳送監(jiān)聽器會(huì)對(duì)所有數(shù)據(jù)進(jìn)行監(jiān)控惜姐,如下:
接收到的消息內(nèi)容是:發(fā)布消息:0
[2018-01-16 15:06:35.202] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:1
[2018-01-16 15:06:35.206] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:2
[2018-01-16 15:06:35.209] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:3
[2018-01-16 15:06:35.214] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:4
[2018-01-16 15:06:35.216] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:5
[2018-01-16 15:06:35.220] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:6
[2018-01-16 15:06:35.222] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:7
[2018-01-16 15:06:35.223] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:8
[2018-01-16 15:06:35.225] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
接收到的消息內(nèi)容是:發(fā)布消息:9
[2018-01-16 15:06:35.226] [INFO] [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@56196] [Caller+0 at com.bigdata.core.activemq.ActiveMQTransportListener.onCommand(ActiveMQTransportListener.java:21)
] >>> onCommand -> 對(duì)消息傳輸命令進(jìn)行監(jiān)控 ...
如果activemq服務(wù)器down掉,消息傳輸會(huì)輸出
2018-01-16 15:09:26,467 WARN [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] org.springframework.jms.connection.CachingConnectionFactory#onException[SingleConnectionFactory.java:322] Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: java.io.EOFException
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1952)
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1971)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
... 1 common frames omitted
2018-01-16 15:09:26,479 ERROR [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616@56068] com.permission.core.activemq.ActiveMQTransportListener#onException[ActiveMQTransportListener.java:26] onException -> 消息服務(wù)器連接錯(cuò)誤......
java.io.EOFException: null
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)