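Preface
Overview
Development environment: IDEA, ActiveMQ
Build tool: Maven
Software stack: Spring, Spring JMS, ActiveMQ
Project description: communication between Spring JMS and ActiveMQ
Goal
Get started with Spring JMS and ActiveMQ by using Spring JMS to send messages to, and read messages from, an ActiveMQ message queue.
PS:
Every step of this demo was tested by hand; coding the steps in order produces a working project.
The project code is commented, which should help clarify the role of each class, method, and property.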
Coding
1. Start the ActiveMQ service
2. Import the dependencies with Maven
pom.xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>4.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.16</version>
    </dependency>
</dependencies>
3. Create application.properties to hold the message queue configuration
application.properties
jms.broker.url=tcp://localhost:61616
jms.queue.name=bar
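To make the two settings concrete, here is a minimal sketch in plain Java that parses the same key=value text and splits the broker URL into host and port. The class name BrokerSettings is hypothetical and is not part of the demo project; in the demo itself Spring reads the file, so this only illustrates what the values mean.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

/** Hypothetical helper for illustration only; the demo lets Spring read the file. */
final class BrokerSettings {
    final String host;
    final int port;
    final String queueName;

    private BrokerSettings(String host, int port, String queueName) {
        this.host = host;
        this.port = port;
        this.queueName = queueName;
    }

    /** Parses text in the same key=value format as application.properties. */
    static BrokerSettings parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // tcp://localhost:61616 -> scheme "tcp", host "localhost", port 61616
        URI brokerUri = URI.create(props.getProperty("jms.broker.url"));
        return new BrokerSettings(
                brokerUri.getHost(),
                brokerUri.getPort(),
                props.getProperty("jms.queue.name"));
    }

    public static void main(String[] args) {
        BrokerSettings s = parse(
                "jms.broker.url=tcp://localhost:61616\n" +
                "jms.queue.name=bar\n");
        System.out.println(s.host + ":" + s.port + " queue=" + s.queueName);
    }
}
```

The broker must be listening on that host and port before either application is started, which is why step 1 comes first.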
4. Global JMS configuration: configure the connection to ActiveMQ
JMSConfiguration.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://activemq.apache.org/schema/core
       http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
    <!-- Configure the connection to ActiveMQ; ${...} values come from application.properties -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="location">
            <value>application.properties</value>
        </property>
    </bean>
    <!-- Activemq connection factory -->
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="${jms.broker.url}" />
        <property name="useAsyncSend" value="true" />
    </bean>
    <!-- ConnectionFactory Definition -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
    </bean>
    <!-- Default Destination Queue Definition -->
    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${jms.queue.name}" />
    </bean>
    <!-- JmsTemplate Definition -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="defaultDestination"/>
    </bean>
    <!-- Message Sender Definition -->
    <bean id="messageSender" class="com.net.jms.MessageSender">
        <constructor-arg index="0" ref="jmsTemplate" />
    </bean>
</beans>
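Analysis:
(1) Configure the ActiveMQConnectionFactory provided by ActiveMQ
(2) Configure a CachingConnectionFactory provided by Spring JMS, wrapping the factory from (1)
(3) Define an ActiveMQ queue as the queue that receives the messages (the Destination)
(4) Create the JmsTemplate
(5) Define a custom MessageSender that uses this JmsTemplate to send messages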
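The connection factory and the queue above take their values from ${jms.broker.url} and ${jms.queue.name}, which the PropertyPlaceholderConfigurer fills in from application.properties. The following is a simplified sketch of that substitution in plain Java. It is not Spring's implementation; the class PlaceholderSketch and its resolve method are hypothetical names used only to illustrate the idea.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified illustration of ${key} substitution; not Spring's actual code. */
final class PlaceholderSketch {
    // Matches ${some.key} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${key} in value with the matching property, failing on unknown keys. */
    static String resolve(String value, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String replacement = props.getProperty(key);
            if (replacement == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            // quoteReplacement keeps '$' and '\' in the property value literal.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("jms.broker.url", "tcp://localhost:61616");
        props.setProperty("jms.queue.name", "bar");
        System.out.println(resolve("${jms.broker.url}", props));
        System.out.println(resolve("${jms.queue.name}", props));
    }
}
```

A misspelled key in either file therefore shows up as a startup failure rather than as a silently wrong broker address, which is worth checking first if the context fails to load.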
5. Create MessageSender, which provides the message-sending service
MessageSender.java
package com.net.jms;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

/**
 * Message-sending service: sends plain string messages.
 */
@Component
public class MessageSender {

    private final JmsTemplate jmsTemplate;

    public MessageSender(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void send(final String text) {
        // Converts the String to a JMS message and sends it to the template's default destination.
        jmsTemplate.convertAndSend(text);
        System.out.println("send: " + text);
    }
}
Purpose: send a string message through the jmsTemplate.
6. Configure a listener to monitor and process the message queue
JMSReceiver.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!-- Configure a listener to monitor and process the message queue -->
    <!-- Message Receiver Definition -->
    <bean id="messageReceiver" class="com.net.jms.MessageReceiver" />
    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destinationName" value="${jms.queue.name}"/>
        <property name="messageListener" ref="messageReceiver"/>
    </bean>
</beans>
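This defines a custom MessageListener and uses the SimpleMessageListenerContainer provided by Spring as its container. The connectionFactory bean and the ${jms.queue.name} placeholder come from JMSConfiguration.xml, so both files must be loaded into the same context.
7. Create the concrete MessageListener implementation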
MessageReceiver.java
package com.net.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Message listener: monitors the message queue,
 * reads messages from it, and prints them to the console.
 */
public class MessageReceiver implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Only text messages are handled; other message types are ignored.
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println(String.format("Received: %s", text));
                try {
                    // Short pause between messages.
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the container can react to it.
                    Thread.currentThread().interrupt();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
Purpose: read messages from the queue and print them to the console.
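The callback style of onMessage can be tried without a broker. The sketch below is an in-memory stand-in, not ActiveMQ or Spring code: the class InMemoryQueueSketch and its methods are hypothetical, and delivery happens synchronously on the calling thread, whereas a real listener container delivers asynchronously. It shows the queue behaviour the demo relies on: messages sent before a listener is attached stay queued, are delivered in order, and are each consumed once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** In-memory stand-in for a point-to-point queue; illustration only. */
final class InMemoryQueueSketch {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    /** Stores the message until some listener consumes it. */
    void send(String text) {
        queue.add(text);
    }

    /** Hands every pending message to the listener in FIFO order and returns the count. */
    int drainTo(Consumer<String> listener) {
        int delivered = 0;
        String text;
        while ((text = queue.poll()) != null) {
            listener.accept(text);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        InMemoryQueueSketch broker = new InMemoryQueueSketch();
        // The sender runs first; no listener exists yet.
        broker.send("hello");
        broker.send("world");

        // The listener attaches later and still receives both messages.
        List<String> received = new ArrayList<>();
        broker.drainTo(text -> {
            System.out.println(String.format("Received: %s", text));
            received.add(text);
        });
    }
}
```

This mirrors the test order used later in the article, where the sender runs before the receiver is started.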
8. Create two test classes: one sends messages to the ActiveMQ message queue, the other reads messages from it
SenderApp.java
package com.net;

import com.net.jms.MessageSender;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.util.StringUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Sends messages to the ActiveMQ message queue.
 */
public class SenderApp {

    public static void main(String[] args) throws IOException {
        start("JMSConfiguration.xml");
    }

    public static void start(String configLocation) throws IOException {
        MessageSender sender = getMessageSender(configLocation);
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
        System.out.println("Please input your message:");
        String text = br.readLine();
        // Send one message per input line; an empty line or end of input stops the loop.
        while (!StringUtils.isEmpty(text)) {
            System.out.println(String.format("send message: %s", text));
            sender.send(text);
            text = br.readLine();
        }
    }

    public static MessageSender getMessageSender(String configLocation) {
        ApplicationContext context = new ClassPathXmlApplicationContext(configLocation);
        return (MessageSender) context.getBean("messageSender");
    }
}
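The input loop in SenderApp stops at the first empty line or at end of input. A broker-free sketch of the same loop makes that stopping rule easy to check. SendLoopSketch and pump are hypothetical names, and the Consumer stands in for MessageSender.send.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Broker-free version of the SenderApp input loop; illustration only. */
final class SendLoopSketch {

    /** Passes each line to the sender until an empty line or end of input; returns the count. */
    static int pump(BufferedReader in, Consumer<String> sender) {
        int sent = 0;
        try {
            String text = in.readLine();
            // Same condition as !StringUtils.isEmpty(text): neither null nor "".
            while (text != null && !text.isEmpty()) {
                sender.accept(text);
                sent++;
                text = in.readLine();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        BufferedReader input = new BufferedReader(
                new StringReader("hello\nworld\n\nignored\n"));
        pump(input, sent::add);
        System.out.println(sent); // prints [hello, world]
    }
}
```

Lines typed after the blank line are never sent, so pressing Enter on an empty line is the way to end a SenderApp session.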
ReceiverApp.java
package com.net;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Reads messages from the message queue.
 */
public class ReceiverApp {

    public static void main(String[] args) {
        // Loading both files starts the listener container, which then receives messages.
        new ClassPathXmlApplicationContext("JMSConfiguration.xml", "JMSReceiver.xml");
    }
}
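9. Test: run SenderApp.java and type messages in the console, then run ReceiverApp.java. Each queued message is printed in the receiver's console in the form Received: <text>.
Summary
With that, the integration of Spring JMS with ActiveMQ is complete.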