1呐芥、ActiveMq安裝啟動(dòng)
1、從官網(wǎng)http://activemq.apache.org/download.html下載對應(yīng)的版本奋岁,并解壓
2思瘟、設(shè)置activeMQ配置環(huán)境.activemqrc :bin/activemq setup ~/.activemqrc
3、到bin目錄下執(zhí)行啟動(dòng)腳本:bin/activemq start
4闻伶、查看端口號是否監(jiān)聽:netstat -an|grep 61616
5滨攻、訪問activeMQ管理界面:如果是本機(jī)就是http://localhost:8161/admin
2、spring整合jms--基于ActiveMQ
1蓝翰、增加依賴(gradle版本)
apply plugin: 'java'
apply plugin: 'jetty'
apply plugin: 'idea'
apply plugin: 'eclipse'
sourceSets {
main {
resources.srcDirs += 'src/main/webapp'
}
test {
resources.srcDirs += 'src/test/webapp'
}
}
repositories {
mavenRepo urls: ["http://maven.lujs.cn/nexus/content/groups/public/"]
mavenCentral()
mavenLocal()
}
dependencies {
spring_version = "3.0.5.RELEASE"
spring = ["org.springframework:spring-core:$spring_version",
"org.springframework:spring-expression:$spring_version",
"org.springframework:spring-beans:$spring_version",
"org.springframework:spring-aop:$spring_version",
"org.springframework:spring-context:$spring_version",
"org.springframework:spring-context-support:$spring_version",
"org.springframework:spring-tx:$spring_version",
"org.springframework:spring-orm:$spring_version",
"org.springframework:spring-web:$spring_version",
"org.springframework:spring-asm:$spring_version",
"org.springframework:spring-jdbc:$spring_version",
"org.springframework:spring-webmvc:$spring_version",
"org.springframework:spring-test:$spring_version",
"org.springframework:spring-jms:$spring_version",
"aopalliance:aopalliance:1.0"]
apache = ["org.apache.velocity:velocity:1.7",
"org.apache.velocity:velocity-tools:2.0",
"commons-logging:commons-logging:1.1.1",
"commons-io:commons-io:2.0.1",
"commons-codec:commons-codec:1.5"]
activemq_version = "5.6.0"
activemq = ["org.apache.activemq:activemq-web:$activemq_version",
"org.apache.activemq:activemq-camel:$activemq_version",
"org.apache.activemq:activemq-pool:$activemq_version",
"org.apache.activemq:activemq-all:$activemq_version",
"org.apache.activemq:activemq-core:$activemq_version",
"org.apache.activemq:activemq-jaas:$activemq_version",
"org.apache.activemq:activeio-core:3.1.4",
"dom4j:dom4j:1.6.1",
"commons-pool:commons-pool:1.5.6",
"org.apache.geronimo.specs:geronimo-j2ee-management_1.1_spec:1.0.1",
"org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1"
]
junit = [
"junit:junit:4.10"
]
gson = ["com.google.code.gson:gson:1.7.2"]
dubbo_version = "2.0.13"
dubbo = ["com.alibaba:dubbo:$dubbo_version"]
zookeeper = ["org.apache.zookeeper:zookeeper:3.4.9"]
log4j = ["log4j:log4j:1.2.16"]
guava = ["com.google.guava:guava:18.0"]
apollo = ["com.ctrip.framework.apollo:apollo-client:0.9.1-SNAPSHOT"]
netty = ["io.netty:netty-all:4.1.8.Final"]
jsr = ["javax.annotation:jsr250-api:1.0"]
compile spring, gson, dubbo, zookeeper, log4j, guava, apollo, netty, apache, activemq, junit
testCompile group: 'junit', name: 'junit', version: '4.12'
}
task "create-dirs" << {
sourceSets*.java.srcDirs*.each { it.mkdirs() }
sourceSets*.resources.srcDirs*.each { it.mkdirs() }
file("release").mkdir()
file("src/main/webapp/WEB-INF").mkdirs()
}
task wrapper(type: Wrapper) {
outputs.upToDateWhen { false }
distributionUrl = 'http://lujs.cn/lts/static/software/gradle-1.0-milestone-4.zip'
}
stopKey = 'stop-jetty'
stopPort = 8881
httpPort = 8880
2光绕、配置ConnectionFactory
ConnectionFactory是用于產(chǎn)生到JMS服務(wù)器的鏈接的,Spring為我們提供了多個(gè)ConnectionFactory畜份,有SingleConnectionFactory和CachingConnectionFactory诞帐。SingleConnectionFactory對于建立JMS服務(wù)器鏈接的請求會(huì)一直返回同一個(gè)鏈接,并且會(huì)忽略Connection的close方法調(diào)用爆雹。CachingConnectionFactory繼承了SingleConnectionFactory停蕉,所以它擁有SingleConnectionFactory的所有功能,同時(shí)它還新增了緩存功能钙态,它可以緩存Session慧起、MessageProducer和MessageConsumer。除此之外驯绎, ActiveMQ為我們提供了一個(gè)PooledConnectionFactory完慧,通過往里面注入一個(gè)ActiveMQConnectionFactory可以用來將Connection、Session和MessageProducer池化剩失,這樣可以大大的減少我們的資源消耗屈尼。
Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正產(chǎn)生到JMS服務(wù)器鏈接的ConnectionFactory還得是由JMS服務(wù)廠商提供拴孤,并且需要把它注入到Spring提供的ConnectionFactory中脾歧。我們這里使用的是ActiveMQ實(shí)現(xiàn)的JMS,所以在我們這里真正的可以產(chǎn)生Connection的就應(yīng)該是由ActiveMQ提供的ConnectionFactory演熟。這里MQ配置的是本地鞭执,實(shí)例如下:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg value="tcp://localhost:61616"></constructor-arg>
</bean>
<!--<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">-->
<!--<property name="connectionFactory" ref="connectionFactory"/>-->
<!--</bean>-->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory"></property>
</bean>
3、配置生產(chǎn)者
生產(chǎn)者負(fù)責(zé)產(chǎn)生消息并發(fā)送到JMS服務(wù)器芒粹,這通常對應(yīng)的是我們的一個(gè)業(yè)務(wù)邏輯服務(wù)實(shí)現(xiàn)類兄纺。我們可以利用Spring為我們提供的JmsTemplate類來實(shí)現(xiàn)的,使用JmsTemplate進(jìn)行消息發(fā)送時(shí)沒有指定destination的時(shí)候?qū)⑹褂媚J(rèn)的Destination化漆。默認(rèn)Destination可以通過在定義jmsTemplate bean對象時(shí)通過屬性defaultDestination或defaultDestinationName來進(jìn)行注入估脆,defaultDestinationName對應(yīng)的就是一個(gè)普通字符串。另一種方式是使用jms提供的messageProducer實(shí)現(xiàn)座云,在ActiveMQ中實(shí)現(xiàn)了兩種類型的Destination疙赠,一個(gè)是點(diǎn)對點(diǎn)的ActiveMQQueue付材,另一個(gè)就是支持訂閱/發(fā)布模式的ActiveMQTopic,本實(shí)例使用topic方式圃阳。
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg>
<value>test.topic1</value>
</constructor-arg>
</bean>
<bean id="mqMessageProducer" class="com.test.activeMq.MqMessageProducer">
<property name="singleConnectionFactory" ref="singleConnectionFactory"/>
</bean>
producer實(shí)現(xiàn)類
public class MqMessageProducer {
// private PooledConnectionFactory pooledConnectionFactory;
//
// public PooledConnectionFactory getPooledConnectionFactory() {
// return pooledConnectionFactory;
// }
//
// public void setPooledConnectionFactory(PooledConnectionFactory pooledConnectionFactory) {
// this.pooledConnectionFactory = pooledConnectionFactory;
// }
private SingleConnectionFactory singleConnectionFactory;
public SingleConnectionFactory getSingleConnectionFactory() {
return singleConnectionFactory;
}
public void setSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) {
this.singleConnectionFactory = singleConnectionFactory;
}
public void sendMessage(Destination destination, String message){
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
Connection connection = singleConnectionFactory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// Destination destination = session.createTopic("test.topic1");
MessageProducer producer = session.createProducer(destination);
TextMessage msg = session.createTextMessage();
msg.setText(message);
producer.send(msg);
System.out.println("往目的地"+destination.toString()+"發(fā)送消息成功:"+ message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
這里我們使用MessageProducer來發(fā)送消息厌衔。
3、配置消費(fèi)者
生產(chǎn)者往指定目的地Destination發(fā)送消息后,接下來就是消費(fèi)者對指定目的地的消息進(jìn)行消費(fèi)了。Spring為我們封裝的消息監(jiān)聽容器MessageListenerContainer它負(fù)責(zé)接收信息买雾,并把接收到的信息分發(fā)給真正的MessageListener進(jìn)行處理困檩。每個(gè)消費(fèi)者對應(yīng)每個(gè)目的地都需要有對應(yīng)的MessageListenerContainer,另外,MessageListenerContainer還需要知道去監(jiān)聽哪個(gè)JMS服務(wù)器,這是通過在配置MessageConnectionFactory的時(shí)候往里面注入一個(gè)ConnectionFactory來實(shí)現(xiàn)的,MessageListenerContainer有三個(gè)屬性必須指定泞坦,一個(gè)是表示從哪里監(jiān)聽的ConnectionFactory;一個(gè)是表示監(jiān)聽什么的Destination砖顷;一個(gè)是接收到消息以后進(jìn)行消息處理的MessageListener贰锁。Spring一共為我們提供了兩種類型的MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer滤蝠。
SimpleMessageListenerContainer會(huì)在一開始的時(shí)候就創(chuàng)建一個(gè)會(huì)話session和消費(fèi)者Consumer豌熄,并且會(huì)使用標(biāo)準(zhǔn)的JMS MessageConsumer.setMessageListener()方法注冊監(jiān)聽器讓JMS提供者調(diào)用監(jiān)聽器的回調(diào)函數(shù)。它不會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要和參與外部的事務(wù)管理物咳。兼容性方面锣险,它非常接近于獨(dú)立的JMS規(guī)范,但一般不兼容Java EE的JMS限制览闰。
大多數(shù)情況下我們還是使用的DefaultMessageListenerContainer芯肤,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer會(huì)動(dòng)態(tài)的適應(yīng)運(yùn)行時(shí)需要压鉴,并且能夠參與外部的事務(wù)管理崖咨。它很好的平衡了對JMS提供者要求低、先進(jìn)功能如事務(wù)參與和兼容Java EE環(huán)境油吭。
定義處理消息的MessageListener
要定義處理消息的MessageListener我們只需要實(shí)現(xiàn)JMS規(guī)范中的MessageListener接口就可以了击蹲。MessageListener接口中只有一個(gè)方法onMessage方法,當(dāng)接收到消息的時(shí)候會(huì)自動(dòng)調(diào)用該方法婉宰。
public class ConsumerMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
System.out.println("接受到一條消息");
//todo可以做一些復(fù)雜業(yè)務(wù)
String receive = ((TextMessage)message).getText();
System.out.println("接受到的消息是:"+receive);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
spring配置如下:
<bean id="consumerMessageListener" class="com.test.activeMq.ConsumerMessageListener"/>
<bean id="springContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="messageListener" ref="consumerMessageListener"/>
<property name="connectionFactory" ref="singleConnectionFactory" />
<property name="destination" ref="queueDestination" />
</bean>
測試代碼如下:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:/WEB-INF/bean.xml")
public class Test {
@Autowired
private MqMessageProducer mqMessageProducer;
@Autowired
private MessageListenerAdapterProducer messageListenerAdapterProducer;
@Autowired
@Qualifier("sessionAwareQueue")
private Destination sessionQueueDestination;
@Autowired
private Destination adapterQueue;
@Autowired
private Destination queueDestination;
// @Autowired
// private DefaultMessageListenerContainer springContainer;springContainer
// @org.junit.Test
// public void testSend(){
// mqMessageProducer.sendMessage(queueDestination,"測試用例");
// }
@org.junit.Test
public void testMessageListenerAdapter(){
//mqMessageProducer.sendMessage(adapterQueue,"測試MessageListenerAdapter");
messageListenerAdapterProducer.sendMessage(adapterQueue,"測試MessageListenerAdapter");
}
}
此外歌豺,在Spring整合JMS的應(yīng)用中我們在定義消息監(jiān)聽器的時(shí)候一共可以定義三種類型的消息監(jiān)聽器,分別是MessageListener心包、SessionAwareMessageListener和MessageListenerAdapter世曾。MessageListener上面例子已經(jīng)介紹。
SessionAwareMessageListener是Spring為我們提供的,它不是標(biāo)準(zhǔn)的JMS MessageListener轮听。MessageListener的設(shè)計(jì)只是純粹用來接收消息的,假如我們在使用MessageListener處理接收到的消息時(shí)我們需要發(fā)送一個(gè)消息通知對方我們已經(jīng)收到這個(gè)消息了岭佳,那么這個(gè)時(shí)候我們就需要在代碼里面去重新獲取一個(gè)Connection或Session血巍。SessionAwareMessageListener的設(shè)計(jì)就是為了方便我們在接收到消息后發(fā)送一個(gè)回復(fù)的消息,它同樣為我們提供了一個(gè)處理接收到的消息的onMessage方法珊随,但是這個(gè)方法可以同時(shí)接收兩個(gè)參數(shù)述寡,一個(gè)是表示當(dāng)前接收到的消息Message,另一個(gè)就是可以用來發(fā)送消息的Session對象叶洞。例子:
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener {
private Destination destination;
public Destination getDestination() {
return destination;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
@Override
public void onMessage(Message message, Session session) throws JMSException {
System.out.println("收到一條消息");
System.out.println("消息內(nèi)容是:"+((TextMessage)message).getText());
MessageProducer producer = session.createProducer(destination);
Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener....");
producer.send(textMessage);
}
}
spring配置:
<bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>sessionAwareQueue</value>
</constructor-arg>
</bean>
<bean id="consumerSessionAwareMessageListener" class="com.lufax.activeMq.ConsumerSessionAwareMessageListener">
<property name="destination" ref="queueDestination"/>
</bean>
<bean id="sessionAwareListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="singleConnectionFactory" />
<property name="destination" ref="sessionAwareQueue" />
<property name="messageListener" ref="consumerSessionAwareMessageListener" />
</bean>
MessageListenerAdapter類實(shí)現(xiàn)了MessageListener接口和SessionAwareMessageListener接口鲫凶,它的主要作用是將接收到的消息進(jìn)行類型轉(zhuǎn)換,然后通過反射的形式把它交給一個(gè)普通的Java類進(jìn)行處理衩辟。
MessageListenerAdapter會(huì)把接收到的消息做如下轉(zhuǎn)換:
- TextMessage轉(zhuǎn)換為String對象螟炫;
- BytesMessage轉(zhuǎn)換為byte數(shù)組;
- MapMessage轉(zhuǎn)換為Map對象艺晴;
- ObjectMessage轉(zhuǎn)換為對應(yīng)的Serializable對象昼钻。
既然前面說了MessageListenerAdapter會(huì)把接收到的消息做一個(gè)類型轉(zhuǎn)換,然后利用反射把它交給真正的目標(biāo)處理器——一個(gè)普通的Java類進(jìn)行處理(如果真正的目標(biāo)處理器是一個(gè)MessageListener或者是一個(gè)SessionAwareMessageListener封寞,那么Spring將直接使用接收到的Message對象作為參數(shù)調(diào)用它們的onMessage方法然评,而不會(huì)再利用反射去進(jìn)行調(diào)用),那么我們在定義一個(gè)MessageListenerAdapter的時(shí)候就需要為它指定這樣一個(gè)目標(biāo)類狈究。這個(gè)目標(biāo)類我們可以通過MessageListenerAdapter的構(gòu)造方法參數(shù)指定碗淌,也可以通過它的delegate屬性來指定,如果指定的目標(biāo)處理器是一個(gè)普通的Java類時(shí)Spring將利用Message進(jìn)行了類型轉(zhuǎn)換之后的對象作為參數(shù)通過反射去調(diào)用真正的目標(biāo)處理器的處理方法抖锥,MessageListenerAdapter的defaultListenerMethod屬性來決定真正的目標(biāo)處理器的處理方法亿眠,當(dāng)我們沒有指定該屬性時(shí),Spring會(huì)默認(rèn)調(diào)用目標(biāo)處理器的handleMessage方法如:
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate">
<bean class="com.test.activeMq.ConsumerMessageListenerAdapter"/>
</property>
<!--<property name="defaultListenerMethod" value="receiveMessage"/>-->
</bean>
<bean id="adapterQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>adapterQueue</value>
</constructor-arg>
</bean>
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="singleConnectionFactory"/>
<property name="destination" ref="adapterQueue"/>
<property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監(jiān)聽器 -->
</bean>
public class ConsumerMessageListenerAdapter {
public String handleMessage(String message){
System.out.println("通過handleMessage接收到純文本消息宁改,消息內(nèi)容是:"+message);
return "這是ConsumerMessageListenerAdapter對象handleMessage方法的返回值缕探。";
}
public void receiveMessage(String message){
System.out.println("通過receiveMessage接受到純文本消息,消息內(nèi)容是"+message);
}
}
MessageListenerAdapter除了會(huì)自動(dòng)的把一個(gè)普通Java類當(dāng)做MessageListener來處理接收到的消息之外还蹲,其另外一個(gè)主要的功能是可以自動(dòng)的發(fā)送返回消息爹耗。
當(dāng)我們用于處理接收到的消息的方法的返回值不為空的時(shí)候,Spring會(huì)自動(dòng)將它封裝為一個(gè)JMS Message谜喊,然后自動(dòng)進(jìn)行回復(fù)潭兽。那么這個(gè)時(shí)候這個(gè)回復(fù)消息將發(fā)送到哪里呢?這主要有兩種方式可以指定斗遏。
第一山卦,可以通過發(fā)送的Message的setJMSReplyTo方法指定該消息對應(yīng)的回復(fù)消息的目的地。這里我們把我們的生產(chǎn)者發(fā)送消息的代碼做一下修改诵次,在發(fā)送消息之前先指定該消息對應(yīng)的回復(fù)目的地為一個(gè)叫responseQueue的隊(duì)列目的地账蓉,具體代碼如下所示:
public class MessageListenerAdapterProducer {
@Autowired
private Destination responseQueue;
private SingleConnectionFactory singleConnectionFactory;
public SingleConnectionFactory getSingleConnectionFactory() {
return singleConnectionFactory;
}
public void setSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) {
this.singleConnectionFactory = singleConnectionFactory;
}
public void sendMessage(Destination destination, final String message){
System.out.println("---------------生產(chǎn)者發(fā)送消息-----------------");
System.out.println("---------------生產(chǎn)者發(fā)了一個(gè)消息:" + message);
Connection connection = null;
try {
connection = singleConnectionFactory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// Destination destination = session.createTopic("test.topic1");
MessageProducer producer = session.createProducer(destination);
TextMessage msg = session.createTextMessage();
msg.setJMSReplyTo(responseQueue);
msg.setText(message);
producer.send(msg);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
接著定義一個(gè)叫responseQueue的隊(duì)列目的地及其對應(yīng)的消息監(jiān)聽器和監(jiān)聽容器枚碗。
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>responseQueue</value>
</constructor-arg>
</bean>
<bean id="responseQueueListener" class="com.test.activeMq.ResponseQueueListener"></bean>
<bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="singleConnectionFactory"/>
<property name="destination" ref="responseQueue"/>
<property name="messageListener" ref="responseQueueListener"/>
</bean>
<bean id="messageListenerAdapterProducer" class="com.test.activeMq.MessageListenerAdapterProducer">
<property name="singleConnectionFactory" ref="singleConnectionFactory"/>
</bean>
responseQueueListener定義如下:
public class ResponseQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到發(fā)送到responseQueue的一個(gè)文本消息,內(nèi)容是:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
上面ConsumerMessageListenerAdapter中的handleMessage有一個(gè)非空的返回值铸本,這樣我們王adapterQueue發(fā)送一個(gè)消息的時(shí)候肮雨,
messageListenerAdapterContainer 監(jiān)聽到消息之后ConsumerMessageListenerAdapter的handleMessage方法處理消息,并往responseQueue發(fā)送一個(gè)消息箱玷,消息內(nèi)容就是handleMessage方法返回的內(nèi)容怨规,最后responseQueueMessageListenerContainer會(huì)監(jiān)聽到消息,觸發(fā)responseQueueListener處理消息锡足,打印內(nèi)容:
接收到發(fā)送到responseQueue的一個(gè)文本消息波丰,內(nèi)容是:XXXXX