實(shí)戰(zhàn)Spring4+ActiveMQ整合實(shí)現(xiàn)消息隊(duì)列(生產(chǎn)者+消費(fèi)者)

引言:

最近公司做了一個(gè)以信息安全為主的項(xiàng)目,其中有一個(gè)業(yè)務(wù)需求就是,項(xiàng)目定時(shí)監(jiān)控操作用戶的行為,對(duì)于一些違規(guī)操作嚴(yán)重的行為馋记,以發(fā)送郵件(FoxMail)的形式進(jìn)行郵件告警,可能是多人懊烤,也可能是一個(gè)人梯醒,第一次是以單人的形式,,直接在業(yè)務(wù)層需要告警的地方發(fā)送郵件即可腌紧,可是后邊需求變更了茸习,對(duì)于某些告警郵件可能會(huì)發(fā)送多人,這其中可能就會(huì)有阻塞發(fā)郵件的可能壁肋,直到把所有郵件發(fā)送完畢后再繼續(xù)做下邊的業(yè)務(wù)号胚,領(lǐng)導(dǎo)說這樣會(huì)影響用戶體驗(yàn),發(fā)郵件的時(shí)候用戶一直處于等待狀態(tài)浸遗,不能干別的事情猫胁。最后研究說用消息隊(duì)列,當(dāng)有需要發(fā)送郵件告警的時(shí)候,就向隊(duì)列中添加一個(gè)標(biāo)識(shí)消息跛锌,ActiveMQ通過監(jiān)聽器的形式弃秆,實(shí)時(shí)監(jiān)聽隊(duì)列里邊的小時(shí),收到消息后髓帽,判斷是不是需要發(fā)送告警的標(biāo)識(shí)菠赚,是的話就自行就行發(fā)送郵件!這是就研究的消息隊(duì)列ActiveMQ,下邊就是具體內(nèi)容:

一郑藏、ActiveMQ

1.1). ActiveMQ

ActiveMQ是Apache所提供的一個(gè)開源的消息系統(tǒng)衡查,完全采用Java來實(shí)現(xiàn),因此必盖,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務(wù))規(guī)范拌牲。JMS是一組Java應(yīng)用程序接口俱饿,它提供消息的創(chuàng)建、發(fā)送们拙、讀取等一系列服務(wù)稍途。JMS提供了一組公共應(yīng)用程序接口和響應(yīng)的語法,類似于Java數(shù)據(jù)庫的統(tǒng)一訪問接口JDBC,它是一種與廠商無關(guān)的API砚婆,使得Java程序能夠與不同廠商的消息組件很好地進(jìn)行通信。

1. 2). Java Message Service(JMS)

JMS支持兩種消息發(fā)送和接收模型突勇。

  • 一種稱為P2P(Ponit to Point)模型(點(diǎn)對(duì)點(diǎn)一對(duì)一)装盯,即采用點(diǎn)對(duì)點(diǎn)的方式發(fā)送消息。P2P模型是基于隊(duì)列的甲馋,消息生產(chǎn)者發(fā)送消息到隊(duì)列埂奈,消息消費(fèi)者從隊(duì)列中接收消息,隊(duì)列的存在使得消息的異步傳輸稱為可能定躏,P2P模型在點(diǎn)對(duì)點(diǎn)的情況下進(jìn)行消息傳遞時(shí)采用账磺。
  • 另一種稱為Pub/Sub(Publish/Subscribe,即發(fā)布-訂閱)模型痊远,發(fā)布-訂閱模型定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息垮抗,這個(gè)內(nèi)容節(jié)點(diǎn)稱為topic(主題)。主題可以認(rèn)為是消息傳遞的中介碧聪,消息發(fā)布這將消息發(fā)布到某個(gè)主題冒版,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發(fā)布者互相保持獨(dú)立逞姿,不需要進(jìn)行接觸即可保證消息的傳遞辞嗡,發(fā)布-訂閱模型在消息的一對(duì)多廣播時(shí)采用。
1.3). JMS術(shù)語
  1. Provider/MessageProvider:生產(chǎn)者
  2. Consumer/MessageConsumer:消費(fèi)者
  3. PTP:Point To Point滞造,點(diǎn)對(duì)點(diǎn)通信消息模型
  4. Pub/Sub:Publish/Subscribe续室,發(fā)布訂閱消息模型
  5. Queue:隊(duì)列,目標(biāo)類型之一谒养,和PTP結(jié)合
  6. Topic:主題挺狰,目標(biāo)類型之一,和Pub/Sub結(jié)合
  7. ConnectionFactory:連接工廠蝴光,JMS用它創(chuàng)建連接
  8. Connnection:JMS Client到JMS Provider的連接
  9. Destination:消息目的地她渴,由Session創(chuàng)建
  10. Session:會(huì)話,由Connection創(chuàng)建蔑祟,實(shí)質(zhì)上就是發(fā)送趁耗、接受消息的一個(gè)線程,因此生產(chǎn)者疆虚、消費(fèi)者都是 Session創(chuàng)建的
1.4). ActiveMQ應(yīng)用場(chǎng)景

類似送快遞苛败,快遞員(producer)將快遞(Message)放到指定地點(diǎn)(destination)后满葛,就可以離開了,拿快遞的人(customer)在接收到通知后罢屈,到指定地點(diǎn)(destination)去取快遞(Message)就可以了嘀韧。當(dāng)然,取快遞時(shí)可能要進(jìn)行身份驗(yàn)證缠捌,這就涉及到創(chuàng)建連接(connection)時(shí)锄贷,需要指定用戶名和密碼了。還有就是曼月,實(shí)際生活中谊却,當(dāng)快遞員把快遞放好之后,照理說應(yīng)該通知客戶去哪里取快遞哑芹,而ActiveMq幫我們做好了一切炎辨,通知的工作Activemq會(huì)幫我們實(shí)現(xiàn),而無需我們親自編碼通知消費(fèi)者聪姿,生產(chǎn)者只需要將Message放到Mq中即可碴萧,通知消費(fèi)者的工作,mq會(huì)幫我們處理

用途就是用來處理消息末购,也就是處理JMS在大型電子商務(wù)類網(wǎng)站破喻,如京東、淘寶招盲、去哪兒等網(wǎng)站有著深入的應(yīng)用低缩,隊(duì)列的主要作用是消除高并發(fā)訪問高峰,加快網(wǎng)站的響應(yīng)速度曹货。

在不使用消息隊(duì)列的情況下咆繁,用戶的請(qǐng)求數(shù)據(jù)直接寫入數(shù)據(jù)庫,高發(fā)的情況下顶籽,會(huì)對(duì)數(shù)據(jù)庫造成巨大的壓力玩般,同時(shí)也使得系統(tǒng)響應(yīng)延遲加劇,但使用隊(duì)列后礼饱,用戶的請(qǐng)求發(fā)給隊(duì)列后立即返回坏为。

1.5). ActiveMQ下載
1.6). 啟動(dòng)

/apache-activemq-5.15.3/bin/win64/目錄下雙擊activemq.bat文件,在瀏覽器中輸入http://localhost:8161/admin/, 用戶名和密碼輸入admin即可

二镊绪、Srping+ActiveMQ應(yīng)用實(shí)例

2,1). 項(xiàng)目結(jié)構(gòu)
** 2,2). 導(dǎo)入maven依賴,pom.xml文件**
  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4   <modelVersion>4.0.0</modelVersion>
  5 
  6   <groupId>www.cnblogs.com.hongmoshu</groupId>
  7   <artifactId>test_actmq</artifactId>
  8   <version>0.0.1-SNAPSHOT</version>
  9   <packaging>war</packaging>
 10   <name>test_actmq Maven Webapp</name>
 11   <url>http://www.example.com</url>
 12   
 13    <!-- 版本管理 -->
 14   <properties>
 15     <springframework>4.1.8.RELEASE</springframework>
 16   </properties>
 17  
 18 
 19    <dependencies>
 20    
 21      <!-- junit單元測(cè)試 -->
 22     <dependency>
 23       <groupId>junit</groupId>
 24       <artifactId>junit</artifactId>
 25       <version>4.11</version>
 26       <scope>test</scope>
 27     </dependency>
 28     
 29     <!-- JSP相關(guān) -->
 30   <dependency>
 31     <groupId>jstl</groupId>
 32     <artifactId>jstl</artifactId>
 33     <version>1.2</version>
 34   </dependency>
 35   <dependency>
 36     <groupId>javax.servlet</groupId>
 37     <artifactId>servlet-api</artifactId>
 38     <scope>provided</scope>
 39     <version>2.5</version>
 40   </dependency>
 41 
 42      <!-- spring -->
 43     <dependency>
 44       <groupId>org.springframework</groupId>
 45       <artifactId>spring-core</artifactId>
 46       <version>${springframework}</version>
 47     </dependency>
 48     <dependency>
 49       <groupId>org.springframework</groupId>
 50       <artifactId>spring-context</artifactId>
 51       <version>${springframework}</version>
 52     </dependency>
 53     <dependency>
 54       <groupId>org.springframework</groupId>
 55       <artifactId>spring-tx</artifactId>
 56       <version>${springframework}</version>
 57     </dependency>
 58     <dependency>
 59       <groupId>org.springframework</groupId>
 60       <artifactId>spring-webmvc</artifactId>
 61       <version>${springframework}</version>
 62     </dependency>
 63     <dependency>
 64       <groupId>org.springframework</groupId>
 65       <artifactId>spring-jms</artifactId>
 66       <version>${springframework}</version>
 67     </dependency>
 68     
 69     <!-- xbean 如<amq:connectionFactory /> -->
 70     <dependency>
 71       <groupId>org.apache.xbean</groupId>
 72       <artifactId>xbean-spring</artifactId>
 73       <version>3.16</version>
 74     </dependency>
 75     
 76     <!-- activemq -->
 77     <dependency>
 78       <groupId>org.apache.activemq</groupId>
 79       <artifactId>activemq-core</artifactId>
 80       <version>5.7.0</version>
 81     </dependency>
 82     <dependency>
 83       <groupId>org.apache.activemq</groupId>
 84       <artifactId>activemq-pool</artifactId>
 85       <version>5.12.1</version>
 86     </dependency>
 87     
 88     <!-- gson -->
 89     <dependency>
 90       <groupId>com.google.code.gson</groupId>
 91       <artifactId>gson</artifactId>
 92       <version>1.7.1</version>
 93     </dependency>
 94     
 95       <!-- JSON -->
 96     <dependency>
 97         <groupId>net.sf.json-lib</groupId>
 98         <artifactId>json-lib</artifactId>
 99         <version>2.4</version>
100         <classifier>jdk15</classifier>
101     </dependency>
102     
103   </dependencies>
104 
105   <build>
106     <finalName>test_actmq</finalName>
107   
108   </build>
109 </project>
2,3). ActiveMQ的配置文件ActiveMQ.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:amq="http://activemq.apache.org/schema/core"
 5        xmlns:jms="http://www.springframework.org/schema/jms"
 6        xmlns:context="http://www.springframework.org/schema/context"
 7        xmlns:mvc="http://www.springframework.org/schema/mvc"
 8        xsi:schemaLocation="
 9         http://www.springframework.org/schema/beans
10         http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
11         http://www.springframework.org/schema/context
12         http://www.springframework.org/schema/context/spring-context-4.1.xsd
13         http://www.springframework.org/schema/mvc
14         http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
15         http://www.springframework.org/schema/jms
16         http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
17         http://activemq.apache.org/schema/core
18         http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
19 >
20 
21     <context:component-scan base-package="com.svse.service" />
22     <mvc:annotation-driven />
23 
24     <!-- jms.useAsyncSend=true 允許異步接收消息 -->
25     <amq:connectionFactory id="amqConnectionFactory"
26                            brokerURL="tcp://192.168.6.111:61616?jms.useAsyncSend=true"
27                            userName="admin"
28                            password="admin" />
29 
30     <!-- 配置JMS連接工 廠 -->
31     <bean id="connectionFactory"
32           class="org.springframework.jms.connection.CachingConnectionFactory">
33         <constructor-arg ref="amqConnectionFactory" />
34         <property name="sessionCacheSize" value="100" />
35     </bean>
36 
37     <!-- 定義消息隊(duì)列名稱(Queue) -->
38     <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
39         <!-- 設(shè)置消息隊(duì)列的名字 -->
40         <constructor-arg>
41             <value>Jaycekon</value>
42         </constructor-arg>
43     </bean>
44 
45     <!-- 配置JMS模板(Queue)匀伏,Spring提供的JMS工具類,它發(fā)送蝴韭、接收消息够颠。 -->
46     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
47         <property name="connectionFactory" ref="connectionFactory" />
48         <property name="defaultDestination" ref="demoQueueDestination" />
49         <property name="receiveTimeout" value="10000" />
50         <!-- true是topic,false是queue榄鉴,默認(rèn)是false履磨,此處顯示寫出false -->
51         <property name="pubSubDomain" value="false" />
52         <!-- 消息轉(zhuǎn)換器 -->    
53         <property name="messageConverter" ref="userMessageConverter"/>  
54     </bean>
55 
56      <!-- 類型轉(zhuǎn)換器 -->    
57     <bean id="userMessageConverter" class="com.svse.util.ObjectMessageConverter"/>    
58 
59 
60     <!-- 配置消息隊(duì)列監(jiān)聽者(Queue) -->
61      <bean id="queueMessageListener" class="com.svse.util.QueueMessageListener" /> 
62 
63     <!-- 顯示注入消息監(jiān)聽容器(Queue)蛉抓,配置連接工廠,監(jiān)聽的目標(biāo)是demoQueueDestination剃诅,監(jiān)聽器是上面定義的監(jiān)聽器 -->
64     <bean id="queueListenerContainer"
65           class="org.springframework.jms.listener.DefaultMessageListenerContainer">
66         <property name="connectionFactory" ref="connectionFactory" />
67         <property name="destination" ref="demoQueueDestination" />
68         <property name="messageListener" ref="queueMessageListener" />
69     </bean> 
70     
71 </beans>
2,4). Spring的配置文件 spring-mvc.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xmlns:context="http://www.springframework.org/schema/context"
 5     xmlns:mvc="http://www.springframework.org/schema/mvc"
 6     xsi:schemaLocation="http://www.springframework.org/schema/beans 
 7         http://www.springframework.org/schema/beans/spring-beans.xsd
 8         http://www.springframework.org/schema/context
 9         http://www.springframework.org/schema/context/spring-context-4.1.xsd
10         http://www.springframework.org/schema/mvc 
11         http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">
12          
13     <context:component-scan base-package="com.svse.controller" />
14     <mvc:annotation-driven />
15     <bean id="viewResolver" class="org.springframework.web.servlet.view.UrlBasedViewResolver">
16         <property name="viewClass"
17             value="org.springframework.web.servlet.view.JstlView" />
18         <property name="prefix" value="/WEB-INF/views/" />
19         <property name="suffix" value=".jsp" />
20     </bean>
21      
22 </beans>
2,5). web.xml
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
 3          xmlns="http://xmlns.jcp.org/xml/ns/javaee" 
 4          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" 
 5          id="WebApp_ID" version="3.1">
 6   <display-name>mydemo</display-name>
 7   
 8   <welcome-file-list>
 9     <welcome-file>index.jsp</welcome-file>
10   </welcome-file-list>
11   
12   <!-- 加載spring及active的配置文件巷送,classpath為項(xiàng)目src下的路徑 -->
13   <context-param>
14     <param-name>contextConfigLocation</param-name>
15     <param-value>
16           classpath:spring-mvc.xml;
17           classpath:ActiveMQ.xml;
18     </param-value>
19     </context-param>
20 
21 
22  <listener>
23     <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
24   </listener>
25 
26   <servlet>
27     <servlet-name>springMVC</servlet-name>
28     <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
29     <init-param>
30       <param-name>contextConfigLocation</param-name>
31       <param-value>classpath:spring-mvc.xml</param-value>
32     </init-param>
33     <load-on-startup>1</load-on-startup>
34   </servlet>
35   <servlet-mapping>
36     <servlet-name>springMVC</servlet-name>
37     <url-pattern>/</url-pattern>
38   </servlet-mapping>
39 
40   <!-- 處理編碼格式 -->
41   <filter>
42     <filter-name>characterEncodingFilter</filter-name>
43     <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
44     <init-param>
45       <param-name>encoding</param-name>
46       <param-value>UTF-8</param-value>
47     </init-param>
48     <init-param>
49       <param-name>forceEncoding</param-name>
50       <param-value>true</param-value>
51     </init-param>
52   </filter>
53   <filter-mapping>
54     <filter-name>characterEncodingFilter</filter-name>
55     <url-pattern>/*</url-pattern>
56   </filter-mapping>
57   
58 </web-app>
2,6). 實(shí)體類Users對(duì)象
 1 package com.svse.entity;
 2 import java.io.Serializable;
 3 
 4 public class Users implements Serializable{
 5 
 6     private String userId;
 7     private String userName;
 8     private String sex;
 9     private String age;
10     private String type;
11     
12     
13     public Users() {
14         super();
15     }
16     public Users(String userId, String userName, String sex, String age,
17             String type) {
18         super();
19         this.userId = userId;
20         this.userName = userName;
21         this.sex = sex;
22         this.age = age;
23         this.type = type;
24     }
25     public String getUserId() {
26         return userId;
27     }
28     public void setUserId(String userId) {
29         this.userId = userId;
30     }
31     public String getUserName() {
32         return userName;
33     }
34     public void setUserName(String userName) {
35         this.userName = userName;
36     }
37     public String getSex() {
38         return sex;
39     }
40     public void setSex(String sex) {
41         this.sex = sex;
42     }
43     public String getAge() {
44         return age;
45     }
46     public void setAge(String age) {
47         this.age = age;
48     }
49     public String getType() {
50         return type;
51     }
52     public void setType(String type) {
53         this.type = type;
54     }
55     @Override
56     public String toString() {
57         return "Users [userId=" + userId + ", userName=" + userName + ", sex="
58                 + sex + ", age=" + age + ", type=" + type + "]";
59     }
60     
61     
62 }
2,7). 核心代碼(生產(chǎn)者ProducerService)
 1 package com.svse.service;
 2 
 3 import javax.annotation.Resource;
 4 import javax.jms.Destination;
 5 import javax.jms.JMSException;
 6 import javax.jms.Message;
 7 import javax.jms.Session;
 8 
 9 import org.springframework.jms.core.JmsTemplate;
10 import org.springframework.jms.core.MessageCreator;
11 import org.springframework.stereotype.Service;
12 
13 import com.svse.entity.Users;
14 
15 @Service
16 public class ProducerService {
17 
18     @Resource(name="jmsTemplate")
19     private JmsTemplate jmsTemplate;
20     
21     
22     /**
23      * 向指定隊(duì)列發(fā)送消息 (發(fā)送文本消息)
24      */
25     public void sendMessage(Destination destination,final String msg){
26         
27         jmsTemplate.setDeliveryPersistent(true);
28         
29         System.out.println(Thread.currentThread().getName()+" 向隊(duì)列"+destination.toString()+"發(fā)送消息---------------------->"+msg);
30         jmsTemplate.send(destination, new MessageCreator() {
31             public Message createMessage(Session session) throws JMSException {
32                 return session.createTextMessage(msg);
33             }
34         });
35     }
36     
37     /**
38      * 向指定隊(duì)列發(fā)送消息以對(duì)象的方式 (發(fā)送對(duì)象消息)
39      */
40     public void sendMessageNew(Destination destination,Users user){
41         System.out.println(Thread.currentThread().getName()+" 向隊(duì)列"+destination.toString()+"發(fā)送消息---------------------->"+user);
42         jmsTemplate.convertAndSend(user);
43     }
44 
45     /**
46      * 向默認(rèn)隊(duì)列發(fā)送消息
47      */
48     public void sendMessage(final String msg){
49         String destination = jmsTemplate.getDefaultDestinationName();
50         System.out.println(Thread.currentThread().getName()+" 向隊(duì)列"+destination+"發(fā)送消息---------------------->"+msg);
51         jmsTemplate.send(new MessageCreator() {
52             public Message createMessage(Session session) throws JMSException {
53                 return session.createTextMessage(msg);
54             }
55         });
56     }
57 }
2,8). 核心代碼(消費(fèi)產(chǎn)者ConsumerService)
 1 package com.svse.service;
 2 
 3 import javax.annotation.Resource;
 4 import javax.jms.Destination;
 5 import javax.jms.JMSException;
 6 import javax.jms.ObjectMessage;
 7 import javax.jms.TextMessage;
 8 
 9 import net.sf.json.JSONObject;
10 
11 import org.springframework.jms.core.JmsTemplate;
12 import org.springframework.stereotype.Service;
13 
14 import com.svse.entity.Users;
15 
16 @Service
17 public class ConsumerService {
18 
19      @Resource(name="jmsTemplate")
20      private JmsTemplate jmsTemplate;
21      //接收文本消息
22      public TextMessage receive(Destination destination){
23             TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
24             try{
25                 JSONObject json=JSONObject.fromObject(textMessage.getText());
26                 System.out.println("name:"+json.getString("userName"));
27                 System.out.println("從隊(duì)列" + destination.toString() + "收到了消息:\t"
28                         + textMessage.getText());
29             } catch (JMSException e) {
30                 e.printStackTrace();
31             }
32             return textMessage;
33         }
34      //接收對(duì)象消息
35      public ObjectMessage receiveNew(Destination destination){
36              ObjectMessage objMsg=(ObjectMessage) jmsTemplate.receive(destination);
38              try{
39                 Users users= (Users) objMsg.getObject();
44                 System.out.println("name:"+users.getUserName());
47                 System.out.println("從隊(duì)列" + destination.toString() + "收到了消息:\t"
48                         + users);
49             } catch (JMSException e) {
50                 e.printStackTrace();
51             }
52             return objMsg;
53         }
54 }
2,9). 核心代碼(控制器ConsumerService)
  1 package com.svse.controller.mq;
  2 
  3 import java.io.IOException;
  4 import java.text.SimpleDateFormat;
  5 import java.util.Date;
  7 import javax.annotation.Resource;
  8 import javax.jms.DeliveryMode;
  9 import javax.jms.Destination;
 10 import javax.jms.JMSException;
 11 import javax.jms.ObjectMessage;
 12 import javax.jms.TextMessage;
 13 import javax.management.MBeanServerConnection;
 14 import javax.management.remote.JMXConnector;
 15 import javax.management.remote.JMXConnectorFactory;
 16 import javax.management.remote.JMXServiceURL;
 18 import org.springframework.stereotype.Controller;
 19 import org.springframework.web.bind.annotation.RequestMapping;
 20 import org.springframework.web.bind.annotation.RequestMethod;
 21 import org.springframework.web.bind.annotation.RequestParam;
 22 import org.springframework.web.servlet.ModelAndView;
 24 import com.google.gson.Gson;
 25 import com.svse.entity.Users;
 26 import com.svse.service.ConsumerService;
 27 import com.svse.service.ProducerService;
 28 
 29 @Controller
 30 public class DemoController {
 35     
 36      //隊(duì)列名Jaycekon (ActiveMQ中設(shè)置的隊(duì)列的名稱)
 37     @Resource(name="demoQueueDestination")
 38     private Destination demoQueueDestination;
 39 
 40     //隊(duì)列消息生產(chǎn)者
 41     @Resource(name="producerService")
 42     private ProducerService producer;
 43     
 44    //隊(duì)列消息消費(fèi)者
 45     @Resource(name="consumerService")
 46     private ConsumerService consumer;
 47     
 48     /*
 49      * 準(zhǔn)備發(fā)消息
 50      */
 51     @RequestMapping(value="/producer",method=RequestMethod.GET)
 52     public ModelAndView producer(){
 53         System.out.println("------------go producer");
 54         
 55         Date now = new Date(); 
 56         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 57         String time = dateFormat.format( now ); 
 58         System.out.println(time);
 59         
 60         ModelAndView mv = new ModelAndView();
 61         mv.addObject("time", time);
 62         mv.setViewName("producer");        
 63         return mv;
 64     }
 65     
 66     /*
 67      * 發(fā)消息
 68      */
 69     @RequestMapping(value="/onsend",method=RequestMethod.POST)
 70     public ModelAndView producer(@RequestParam("message") String message) {
 71         System.out.println("------------send to jms");
 72         ModelAndView mv = new ModelAndView();
 73         for(int i=0;i<5;i++){
 74             try {
 75                 Users users=new Users("10"+(i+1),"趙媛媛"+(i+1),"女","27","影視演員");
 76                 Gson gson=new Gson();
 77                 String sendMessage=gson.toJson(users);
 78                 System.out.println("發(fā)送的消息sendMessage:"+sendMessage.toString());
 79              // producer.sendMessage(demoQueueDestination,sendMessage.toString());//以文本的形式
 80               producer.sendMessageNew(demoQueueDestination, users);//以對(duì)象的方式
 81              
 82             } catch (Exception e) {
 83                 e.printStackTrace();
 84             }
 85         }
 86         mv.setViewName("index");
 87         return mv;
 88     }
 89     /*
 90      * 手動(dòng)接收消息
 91      */
 92     @RequestMapping(value="/receive",method=RequestMethod.GET)
 93     public ModelAndView queue_receive() throws JMSException {
 94         System.out.println("------------receive message");
 95         ModelAndView mv = new ModelAndView();
 96         
 97       //TextMessage tm = consumer.receive(demoQueueDestination);//接收文本消息
 98         
 99         ObjectMessage objMsg=consumer.receiveNew(demoQueueDestination);//接收對(duì)象消息
100         Users users= (Users) objMsg.getObject();
101         //mv.addObject("textMessage", tm.getText());
102         mv.addObject("textMessage", users.getUserId()+" || "+users.getUserName());
103         mv.setViewName("receive");
104         return mv;
105     }
106     
107     /*
108      * ActiveMQ Manager Test
109      */
110     @RequestMapping(value="/jms",method=RequestMethod.GET)
111     public ModelAndView jmsManager() throws IOException {
112         System.out.println("------------jms manager");
113         ModelAndView mv = new ModelAndView();
114         mv.setViewName("index");
115         
116         JMXServiceURL url = new JMXServiceURL("");
117         JMXConnector connector = JMXConnectorFactory.connect(url);
118         connector.connect();
119         MBeanServerConnection connection = connector.getMBeanServerConnection();
120         
121         return mv;
122     }
123     
124 }

三、.對(duì)象轉(zhuǎn)換器MessageConverter和消息監(jiān)聽器MessageListener

在上邊的ProducerService和ConsumerService中,不論是發(fā)送消息還是接收消息,都可以以文本TextMessage的方式和ObjectMessage的方式.如果是簡(jiǎn)單的文本消息可以以TextMessage,但是如果需要發(fā)送的內(nèi)容比較多,結(jié)構(gòu)比較復(fù)雜,這時(shí)候就建議用對(duì)象文本ObjectMessage的方式向隊(duì)列queue中發(fā)送消息了.但是這時(shí)候就需要用到對(duì)象消息轉(zhuǎn)換器MessageConverter.

3,1). 消息轉(zhuǎn)換器MessageageConverte

MessageConverter的作用主要有兩方面矛辕,一方面它可以把我們的非標(biāo)準(zhǔn)化Message對(duì)象轉(zhuǎn)換成我們的目標(biāo)Message對(duì)象笑跛,這主要是用在發(fā)送消息的時(shí)候;另一方面它又可以把我們的Message對(duì)象
轉(zhuǎn)換成對(duì)應(yīng)的目標(biāo)對(duì)象聊品,這主要是用在接收消息的時(shí)候堡牡。

 1 package com.svse.util;
 2 
 3 import java.io.Serializable;
 4 
 5 import javax.jms.JMSException;
 6 import javax.jms.Message;
 7 import javax.jms.ObjectMessage;
 8 import javax.jms.Session;
 9 
10 import org.springframework.jms.support.converter.MessageConversionException;
11 import org.springframework.jms.support.converter.MessageConverter;
12 
13 /**
14  *功能說明:通用的消息對(duì)象轉(zhuǎn)換類
15  *@author:zsq
16  *create date:2019年7月12日 上午9:28:31
17  *修改人   修改時(shí)間  修改描述
18  *Copyright (c)2019北京智華天成科技有限公司-版權(quán)所有
19  */
20 public class ObjectMessageConverter implements MessageConverter {
21 
22     
23     //把一個(gè)Java對(duì)象轉(zhuǎn)換成對(duì)應(yīng)的JMS Message (生產(chǎn)消息的時(shí)候)
24     public Message toMessage(Object object, Session session)
25             throws JMSException, MessageConversionException {
26         
27         return session.createObjectMessage((Serializable) object);  
28     }
29 
30     //把一個(gè)JMS Message轉(zhuǎn)換成對(duì)應(yīng)的Java對(duì)象 (消費(fèi)消息的時(shí)候)
31     public Object fromMessage(Message message) throws JMSException,
32             MessageConversionException {
33         ObjectMessage objMessage = (ObjectMessage) message;  
34         return objMessage.getObject();  
35     }
36 
37 }

注意:寫了消息轉(zhuǎn)化器之后還需要的ActiveMQ.xml中進(jìn)行配置

3,2). 消息監(jiān)聽器MessageageListe

MessageageListe作用就是動(dòng)態(tài)的自行監(jiān)聽消息隊(duì)列的生產(chǎn)者發(fā)送的消息,不需要人工手動(dòng)接收!

 1 package com.svse.util;
 2 import javax.jms.JMSException;
 3 import javax.jms.Message;
 4 import javax.jms.MessageListener;
 5 import javax.jms.ObjectMessage;
 6 import javax.jms.TextMessage;
 7 
 8 import com.svse.entity.Users;
 9 
10 
11 public class QueueMessageListener implements MessageListener {
12 
13    //添加了監(jiān)聽器,只要生產(chǎn)者發(fā)布了消息,監(jiān)聽器監(jiān)聽到有消息消費(fèi)者就會(huì)自動(dòng)消費(fèi)(獲取消息)
14     public void onMessage(Message message) {
15          //(第1種方式)沒加轉(zhuǎn)換器之前接收到的是文本消息
16         //TextMessage tm = (TextMessage) message;
17         
18         //(第2種方式)加了轉(zhuǎn)換器之后接收到的ObjectMessage對(duì)象消息
19         ObjectMessage objMsg=(ObjectMessage) message;
20          Users users;
21         try {
22             users = (Users) objMsg.getObject();
23             //System.out.println("QueueMessageListener監(jiān)聽到了文本消息:\t" + tm.getText());
24             System.out.println("QueueMessageListener監(jiān)聽到了文本消息:\t" + users);      
25             //do something ...
26         } catch (JMSException e1) {
27             // TODO Auto-generated catch block
28             e1.printStackTrace();
29         }
30     }
31     
32 }

同樣寫好監(jiān)聽器后也是需在ActiveMQ.xml中進(jìn)行配置注冊(cè)的

總結(jié)

(1)注冊(cè)JmsTemplate時(shí)杨刨,pubSubDomain這個(gè)屬性的值要特別注意。默認(rèn)值是false擦剑,也就是說默認(rèn)只是支持queue模式妖胀,不支持topic模式。但是惠勒,如果將它改為true赚抡,則不支持queue模式。因此如果項(xiàng)目需要同時(shí)支持queue和topic模式纠屋,那么需要注冊(cè)2個(gè)JmsTemplate涂臣,同時(shí)監(jiān)聽容器也需要注冊(cè)2個(gè)

(2)使用Queue時(shí),生產(chǎn)者只要將Message發(fā)送到MQ服務(wù)器端售担,消費(fèi)者就可以進(jìn)行消費(fèi)赁遗,而無需生產(chǎn)者程序一直運(yùn)行;

(3)消息是按照先入先出的順序族铆,一旦有消費(fèi)者將Message消費(fèi)岩四,該Message就會(huì)從MQ服務(wù)器隊(duì)列中刪去;

(4)有文章說哥攘,“生產(chǎn)者”<-->"消費(fèi)者"是一對(duì)一的關(guān)系剖煌,其實(shí)并不準(zhǔn)確,從應(yīng)用中可以看出逝淹,一個(gè)生產(chǎn)者產(chǎn)生的消息耕姊,可以被多個(gè)消費(fèi)者進(jìn)行消費(fèi),只不過多個(gè)消費(fèi)者在消費(fèi)消息時(shí)是競(jìng)爭(zhēng)的關(guān)系栅葡,先得到的先消費(fèi)茉兰,一旦消費(fèi)完成,該消息就會(huì)出隊(duì)列妥畏,
就不能被其他消費(fèi)者再消費(fèi)了邦邦,即“一次性消費(fèi)”安吁。就是我們熟悉的“點(diǎn)對(duì)點(diǎn)”通信了;

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末燃辖,一起剝皮案震驚了整個(gè)濱河市鬼店,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌黔龟,老刑警劉巖妇智,帶你破解...
    沈念sama閱讀 221,635評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異氏身,居然都是意外死亡巍棱,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,543評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門蛋欣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來航徙,“玉大人,你說我怎么就攤上這事陷虎〉教ぃ” “怎么了?”我有些...
    開封第一講書人閱讀 168,083評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵尚猿,是天一觀的道長(zhǎng)窝稿。 經(jīng)常有香客問我,道長(zhǎng)凿掂,這世上最難降的妖魔是什么伴榔? 我笑而不...
    開封第一講書人閱讀 59,640評(píng)論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮庄萎,結(jié)果婚禮上踪少,老公的妹妹穿的比我還像新娘。我一直安慰自己惨恭,他們只是感情好秉馏,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,640評(píng)論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著脱羡,像睡著了一般萝究。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上锉罐,一...
    開封第一講書人閱讀 52,262評(píng)論 1 308
  • 那天帆竹,我揣著相機(jī)與錄音,去河邊找鬼脓规。 笑死栽连,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播秒紧,決...
    沈念sama閱讀 40,833評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼绢陌,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了熔恢?” 一聲冷哼從身側(cè)響起脐湾,我...
    開封第一講書人閱讀 39,736評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎叙淌,沒想到半個(gè)月后秤掌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,280評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡鹰霍,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,369評(píng)論 3 340
  • 正文 我和宋清朗相戀三年闻鉴,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片茂洒。...
    茶點(diǎn)故事閱讀 40,503評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡孟岛,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出督勺,到底是詐尸還是另有隱情蚀苛,我是刑警寧澤,帶...
    沈念sama閱讀 36,185評(píng)論 5 350
  • 正文 年R本政府宣布玷氏,位于F島的核電站,受9級(jí)特大地震影響腋舌,放射性物質(zhì)發(fā)生泄漏盏触。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,870評(píng)論 3 333
  • 文/蒙蒙 一块饺、第九天 我趴在偏房一處隱蔽的房頂上張望赞辩。 院中可真熱鬧,春花似錦授艰、人聲如沸辨嗽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,340評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽糟需。三九已至,卻和暖如春谷朝,著一層夾襖步出監(jiān)牢的瞬間洲押,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,460評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工圆凰, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留杈帐,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,909評(píng)論 3 376
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像挑童,于是被迫代替她去往敵國(guó)和親累铅。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,512評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容