ActiveMq隊列spring實現(xiàn),案列如下
(1)pom.xml引入相關jar
<!-- activeMQ相關 begin-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.1</version></dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.4.RELEASE</version>
</dependency>
(2)添加生產者配置activemq-sender.xml
<description>JMS發(fā)布者應用配置</description>
? ? <!-- CachingConnectionFactory 連接工廠 (有緩存功能)-->
? ? <bean id="cachingConnectionFactory"
? ? ? ? class="org.springframework.jms.connection.CachingConnectionFactory">
? ? ? ? <!-- Session緩存數(shù)量 -->
? ? ? ? <property name="sessionCacheSize" value="20" />
? ? ? ? <property name="targetConnectionFactory">?
? ? ? ? ? ? <bean class="org.apache.activemq.ActiveMQConnectionFactory">?
? ? ? ? ? ? ? ? <!-- MQ地址 賬戶名 密碼-->?
? ? ? ? ? ? ? ? <property name="brokerURL" value="tcp://192.168.56.129:61616" />
? ? ? ? ? ? ? ? <property name="userName" value="parry" />
? ? ? ? ? ? ? ? <property name="password" value="parry123" />
? ? ? ? ? ? ? ? <!-- 是否異步發(fā)送 -->
? ? ? ? ? ? ? ? <property name="useAsyncSend" value="true"/>
? ? ? ? ? ? </bean>?
? ? ? ? </property>?
? ? </bean>
? ? <!-- 接收消息的目的地(一個主題)點對點隊列 -->
? ? <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
? ? ? ? <!-- 設置消息主題的名字 -->
? ? ? ? <constructor-arg index="0" value="messages" />
? ? </bean>
? ? <!-- 接收配置JMS模版 -->
? ? <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
? ? ? ? <property name="connectionFactory" ref="cachingConnectionFactory" />
? ? ? ? <property name="defaultDestination" ref="destination" />
? ? ? ? <!-- value為true為發(fā)布/訂閱模式疹鳄; value為false為點對點模式-->
? ? ? ? <property name="pubSubDomain" value="false"/>
? ? </bean>
(3)添加消費者配置activemq-consumer.xml
<description>JMS訂閱者應用配置</description>
? ? <!-- CachingConnectionFactory 連接工廠 (有緩存功能)-->
? ? <bean id="cachingConnectionFactory"
? ? ? ? class="org.springframework.jms.connection.CachingConnectionFactory">
? ? ? ? <!-- Session緩存數(shù)量 -->
? ? ? ? <property name="sessionCacheSize" value="20" />
? ? ? ? <property name="targetConnectionFactory">?
? ? ? ? ? ? <bean class="org.apache.activemq.ActiveMQConnectionFactory">?
? ? ? ? ? ? ? ? <!-- MQ地址 賬戶名 密碼-->?
? ? ? ? ? ? ? ? <property name="brokerURL" value="tcp://192.168.56.129:61616" />
? ? ? ? ? ? ? ? <property name="userName" value="parry" />
? ? ? ? ? ? ? ? <property name="password" value="parry123" />
? ? ? ? ? ? ? ? <!-- 是否異步發(fā)送 -->
? ? ? ? ? ? ? ? <property name="useAsyncSend" value="true"/>
? ? ? ? ? ? </bean>?
? ? ? ? </property>?
? ? </bean>
? ? <!-- 接收消息的目的地(一個主題)點對點隊列 -->
? ? <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
? ? ? ? <!-- 設置消息主題的名字 -->
? ? ? ? <constructor-arg index="0" value="messages" />
? ? </bean>
? ? <!-- 消費者配置 (自己定義) -->
? ? <bean id="consumer" class="com.parry.MQ.funcion.Listener" />
? ? <!-- 消息監(jiān)聽容器 -->
? ? <bean id="myListenerContainer"
? ? ? ? class="org.springframework.jms.listener.DefaultMessageListenerContainer">
? ? ? ? <property name="connectionFactory" ref="cachingConnectionFactory" />
? ? ? ? <property name="destination" ref="destination" />
? ? ? ? <property name="messageListener" ref="consumer" />
? ? ? ? <!-- 如果消息的接收速率,大于消息處理的速率時,可以采取線程池方式 -->
? ? ? ? <property name="taskExecutor" ref="queueMessageExecutor"/>
? ? ? ? <!-- 設置固定的線程數(shù) -->
? ? ? ? <property name="concurrentConsumers" value="30"/>
? ? ? ? <!-- 設置動態(tài)的線程數(shù) -->
? ? ? ? <property name="concurrency" value="20-50"/>
? ? ? ? <!-- 設置最大的線程數(shù) -->
? ? ? ? <property name="maxConcurrentConsumers" value="80"/>
? ? </bean>
? ? <bean id="queueMessageExecutor"
? ? ? ? class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
? ? ? ? <property name="corePoolSize" value="30" />
? ? ? ? <property name="maxPoolSize" value="80" />
? ? ? ? <property name="daemon" value="true" />
? ? ? ? <property name="keepAliveSeconds" value="120" />
? ? </bean>
∨÷浴(4)新建一個發(fā)送消息的方法
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
* 發(fā)送消息
* @author Administrator
*
*/
@Component
public class QueueSender {
? ? @Autowired
? ? private JmsTemplate myJmsTemplate;
? ? /**
? ? * 發(fā)送一條消息到指定的隊列(目標)
? ? *
? ? * @param queueName
? ? *? ? ? ? ? ? 隊列名稱
? ? * @param message
? ? *? ? ? ? ? ? 消息內容
? ? */
? ? public void send(String queueName, final String message) {
? ? ? ? myJmsTemplate.send(queueName, new MessageCreator() {
? ? ? ? ? ? public Message createMessage(Session session) throws JMSException {
? ? ? ? ? ? ? ? return session.createTextMessage(message);
? ? ? ? ? ? }
? ? ? ? });
? ? }
}
(5)添加監(jiān)聽器
package com.parry.MQ.funcion;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* 接收者監(jiān)聽類
* @author Administrator
*
*/
public class Listener implements MessageListener {
? ? public void onMessage(Message message) {
? ? ? ? // 業(yè)務處理
? ? ? ? try {
? ? ? ? ? ? TextMessage message2 = (TextMessage) message;
? ? ? ? ? ? System.out.println("接收到信息:" + message2.getText());
? ? ? ? } catch (JMSException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }
}
(6)寫個一請求測試一下
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.parry.MQ.funcion.QueueSender;
@Controller
public class App {
? ? @Autowired
? ? private QueueSender sender;
? ? @RequestMapping("test")
? ? @ResponseBody
? ? public String Test() {
? ? ? ? sender.send("messages", "你好瘪弓,這是我的第一條消息垫蛆!");
? ? ? ? return "Hello world";
? ? }
}
(7)開啟服務,訪問路徑測試