<blockquote><h4>認(rèn)識(shí)消息隊(duì)列</h4></blockquote>
“消息”是在兩臺(tái)計(jì)算機(jī)間傳送的數(shù)據(jù)單位贱案。消息可以非常簡(jiǎn)單摩泪,例如只包含文本字符串族购;也可以更復(fù)雜,可能包含對(duì)象……
消息被發(fā)送到隊(duì)列中直撤。“消息隊(duì)列”是在消息的傳輸過(guò)程中保存消息的容器蜕着。消息隊(duì)列管理器在將消息從它的源中繼到它的目標(biāo)時(shí)充當(dāng)中間人谋竖。隊(duì)列的主要目的是提供路由并保證消息的傳遞;如果發(fā)送消息時(shí)接收者不可用侮东,消息隊(duì)列會(huì)保留消息圈盔,直到可以成功地傳遞它。
<blockquote><h4>JMS消息隊(duì)列</h4></blockquote>
Jms即[Java消息服務(wù)](http://baike.baidu.com/view/3292569.htm)(Java Message Service)[應(yīng)用程序](http://baike.baidu.com/view/330120.htm)接口是一個(gè)[Java平臺(tái)](http://baike.baidu.com/view/209634.htm)中關(guān)于面向[消息中間件](http://baike.baidu.com/view/3118541.htm)(MOM)的API悄雅,用于在兩個(gè)應(yīng)用程序之間驱敲,或[分布式系統(tǒng)](http://baike.baidu.com/view/991489.htm)中發(fā)送消息,進(jìn)行異步通信宽闲。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API众眨,絕大多數(shù)MOM提供[商都](http://baike.baidu.com/view/19763.htm)對(duì)JMS提供支持。
JMS(Java Messaging Service)是[Java](http://baike.baidu.com/view/29.htm)平臺(tái)上有關(guān)面向消息中間件(MOM)的技術(shù)規(guī)范容诬,它便于消息系統(tǒng)中的Java[應(yīng)用程序](http://baike.baidu.com/view/330120.htm)進(jìn)行消息交換,并且通過(guò)提供標(biāo)準(zhǔn)的產(chǎn)生娩梨、發(fā)送、接收消息的接口簡(jiǎn)化[企業(yè)](http://baike.baidu.com/view/38340.htm)應(yīng)用的開(kāi)發(fā)览徒,翻譯為[Java](http://baike.baidu.com/view/29.htm)消息[服務(wù)](http://baike.baidu.com/view/133203.htm)狈定。
<blockquote><h4>JMS對(duì)象模型</h4></blockquote>
1)連接工廠。連接工廠(ConnectionFactory)是由管理員創(chuàng)建习蓬,并綁定到JNDI樹(shù)中纽什。客戶(hù)端使用JNDI查找連接工廠躲叼,然后利用連接工廠創(chuàng)建一個(gè)JMS連接芦缰。
2)JMS連接。JMS連接(Connection)表示JMS客戶(hù)端和服務(wù)器端之間的一個(gè)活動(dòng)的連接枫慷,是由客戶(hù)端通過(guò)調(diào)用連接工廠的方法建立的让蕾。
3)JMS會(huì)話(huà)。JMS會(huì)話(huà)(Session)表示JMS客戶(hù)與JMS服務(wù)器之間的會(huì)話(huà)狀態(tài)或听。JMS會(huì)話(huà)建立在JMS連接上探孝,表示客戶(hù)與服務(wù)器之間的一個(gè)會(huì)話(huà)線(xiàn)程。
4)JMS目的神帅。JMS目的(Destination)再姑,又稱(chēng)為消息隊(duì)列,是實(shí)際的消息源找御。
5)JMS生產(chǎn)者和消費(fèi)者元镀。生產(chǎn)者(Message Producer)和消費(fèi)者(Message Consumer)對(duì)象由Session對(duì)象創(chuàng)建绍填,用于發(fā)送和接收消息。
6)JMS消息通常有兩種類(lèi)型: ∑芤伞① 點(diǎn)對(duì)點(diǎn)(Point-to-Point)讨永。在點(diǎn)對(duì)點(diǎn)的消息系統(tǒng)中,消息分發(fā)給一個(gè)單獨(dú)的使用者遇革。點(diǎn)對(duì)點(diǎn)消息往往與隊(duì)列(javax.jms.Queue)相關(guān)聯(lián)卿闹。 ② 發(fā)布/訂閱(Publish/Subscribe)萝快。發(fā)布/訂閱消息系統(tǒng)支持一個(gè)事件驅(qū)動(dòng)模型锻霎,消息生產(chǎn)者和消費(fèi)者都參與消息的傳遞。生產(chǎn)者發(fā)布事件揪漩,而使用者訂閱感興趣的事件旋恼,并使用事件。該類(lèi)型消息一般與特定的主題(javax.jms.Topic)關(guān)聯(lián)奄容。
<blockquote><h4>ActiveMQ代碼實(shí)現(xiàn)</h4></blockquote>
1)下載ActiveMQ
去官方網(wǎng)站下載:http://activemq.apache.org/
2)解壓運(yùn)行AciveMQ
解壓apache-activemq-5.13.3-bin.zip文件冰更,運(yùn)行apache-activemq-5.13.3\bin\win64\activemq.bat,啟動(dòng)ActiveMQ昂勒,登錄http://localhost:8161/admin/蜀细,創(chuàng)建一個(gè)Queues,命名為my-activemq
3)創(chuàng)建Maven項(xiàng)目戈盈,添加依賴(lài)POM.xml
dependencies>
<!-- activemq 相關(guān)maven依賴(lài) -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.7.0</version>
</dependency>
<!-- 日志相關(guān)依賴(lài) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
4)創(chuàng)建消息發(fā)送者(生產(chǎn)者)Sender.java
package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 發(fā)送者
* 2016年5月6日 下午3:00:57
* @author zhangxiaoping
*/
public class Sender {
private static final Logger LOGGER=LoggerFactory.getLogger(Sender.class);
//默認(rèn)代理地址 "failover://tcp://localhost:61616" 服務(wù)器地址不同IP修改不同的IP
private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
//消息隊(duì)列名稱(chēng)
private static final String SUBJECT="my-activemq";
private static int i=1;
public static void main(String[] args) throws JMSException, InterruptedException {
//初始化連接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//建立連接
Connection conn= connectionFactory.createConnection();
//啟動(dòng)連接
conn.start();
//創(chuàng)建Session奠衔,此方法第一個(gè)參數(shù)表示會(huì)話(huà)是否在事務(wù)中執(zhí)行,第二個(gè)參數(shù)設(shè)定會(huì)話(huà)的應(yīng)答模式
Session session= conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目標(biāo)隊(duì)列
Destination dest = session.createQueue(SUBJECT);
//通過(guò)session創(chuàng)建消息的發(fā)送者
MessageProducer producer=session.createProducer(dest);
while(true){
//定義要發(fā)送的消息
TextMessage message= session.createTextMessage("======ActiveMQ發(fā)送消息===="+i+"===");
LOGGER.debug(message.getText());
//發(fā)送消息
producer.send(message);
//休眠2秒
Thread.sleep(2000);
i++;
}
// conn.close();
}
}
5)創(chuàng)建消息的接收者(消費(fèi)者)Receiver.java
package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 接收者
* 2016年5月6日 下午3:03:16
* @author zhangxiaoping
*/
public class Receiver implements MessageListener{
private static final Logger LOGGER=LoggerFactory.getLogger(Receiver.class);
//默認(rèn)代理地址 "failover://tcp://localhost:61616" 服務(wù)器地址不同IP修改不同的IP
private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
//消息隊(duì)列名稱(chēng)
private static final String SUBJECT="my-activemq";
public static void main(String[] args) throws JMSException {
//初始化連接工廠
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
//建立連接
Connection conn= connectionFactory.createConnection();
//啟動(dòng)連接
conn.start();
//創(chuàng)建Session塘娶,此方法第一個(gè)參數(shù)表示會(huì)話(huà)是否在事務(wù)中執(zhí)行涣觉,第二個(gè)參數(shù)設(shè)定會(huì)話(huà)的應(yīng)答模式
Session session= conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目標(biāo)隊(duì)列
Destination dest=session.createQueue(SUBJECT);
//通過(guò)session創(chuàng)建消息的接收者
MessageConsumer consumer= session.createConsumer(dest);
//初始化監(jiān)聽(tīng)
Receiver receiver=new Receiver();
//給接收者添加監(jiān)聽(tīng)對(duì)象
consumer.setMessageListener(receiver);
}
public void onMessage(Message arg0) {
TextMessage message=(TextMessage) arg0;
try {
LOGGER.debug("接收到消息"+message.getText());
Thread.sleep(4000);
} catch (Exception e) {
LOGGER.error("error"+e.getMessage());
}
}
}
6)運(yùn)行Sender.java、Receiver.java登錄http://localhost:8161/admin/查看隊(duì)列信息血柳。