ActiveMQ 即時(shí)通訊服務(wù) 淺析http://www.cnblogs.com/hoojo/p/active_mq_jms_apache_activeMQ.html
**一咧织、 ****概述與介紹******
ActiveMQ 是Apache出品,最流行的籍救、功能強(qiáng)大的即時(shí)通訊和集成模式的開源服務(wù)器拯爽。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。提供客戶端支持跨語(yǔ)言和協(xié)議钧忽,帶有易于在充分支持JMS 1.1和1.4使用J2EE企業(yè)集成模式和許多先進(jìn)的功能毯炮。
**二、 ****特性******
1耸黑、 多種語(yǔ)言和協(xié)議編寫客戶端桃煎。語(yǔ)言: Java、C大刊、C++为迈、C#、Ruby、Perl葫辐、Python搜锰、PHP。應(yīng)用協(xié)議:OpenWire耿战、Stomp REST蛋叼、WS Notification、XMPP剂陡、AMQP
2狈涮、完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化,XA消息鸭栖,事務(wù))
3歌馍、對(duì)Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去晕鹊,而且也支持Spring2.0的特性
4松却、通過了常見J2EE服務(wù)器(如 Geronimo、JBoss 4溅话、GlassFish玻褪、WebLogic)的測(cè)試,其中通過JCA 1.5 resource adaptors的配置公荧,可以讓ActiveMQ可以自動(dòng)的部署到任何兼容J2EE 1.4 商業(yè)服務(wù)器上
5、支持多種傳送協(xié)議:in-VM同规、TCP循狰、SSL、NIO券勺、UDP绪钥、JGroups、JXTA
6关炼、支持通過JDBC和journal提供高速的消息持久化
7程腹、從設(shè)計(jì)上保證了高性能的集群,客戶端-服務(wù)器儒拂,點(diǎn)對(duì)點(diǎn)
8寸潦、支持Ajax
9、支持與Axis的整合
10社痛、可以很容易得調(diào)用內(nèi)嵌JMS provider见转,進(jìn)行測(cè)試
三、 ****安裝
開發(fā)環(huán)境:
System:Windows
JDK:1.6+
IDE:eclipse
apache ActiveMQ 5.8
Email:hoojo_@126.com
Blog:http://blog.csdn.net/IBM_hoojo
http://hoojo.cnblogs.com/
1蒜哀、 下載ActiveMQ斩箫,下載地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.zip
2、 解壓apache-activemq-5.8.0.zip即可完成ActiveMQ的安裝
3、 解壓后目錄結(jié)構(gòu)如下
+bin (windows下面的bat和unix/linux下面的sh) 啟動(dòng)ActiveMQ的啟動(dòng)服務(wù)就在這里
+conf (activeMQ配置目錄乘客,包含最基本的activeMQ配置文件)
+data (默認(rèn)是空的)
+docs (index,replease版本里面沒有文檔)
+example (幾個(gè)例子)
+lib (activeMQ使用到的lib)
+webapps (系統(tǒng)管理員控制臺(tái)代碼)
+webapps-demo(系統(tǒng)示例代碼)
-activemq-all-5.8.0.jar (ActiveMQ的binary)
-user-guide.html (部署指引)
-LICENSE.txt
-NOTICE.txt
-README.txt
其他文件就不相信介紹了狐血,搞Java的應(yīng)該都知道干什么用的。
你可以進(jìn)入bin目錄易核,使用activemq.bat雙擊啟動(dòng)(windows用戶可以選擇系統(tǒng)位數(shù)匈织,如果你是linux的話,就用命令行的發(fā)送去啟動(dòng))耸成,如果一切順利报亩,你就會(huì)看見類似下面的信息:
如果你看到這個(gè),那么恭喜你成功了井氢。如果你啟動(dòng)看到了異常信息:
Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind
那么我告訴你弦追,很不幸,你的端口被占用了花竞。接下來(lái)你大概想知道是哪個(gè)程序占用了你的端口劲件,并kill掉該進(jìn)程或服務(wù)≡技保或者你要嘗試修改ActiveMQ的默認(rèn)端口61616(ActiveMQ使用的默認(rèn)端口是61616)零远,在大多數(shù)情況下,占用61616端口的是Internet Connection Sharing (ICS) 這個(gè)Windows服務(wù)厌蔽,你只需停止它就可以啟動(dòng)ActiveMQ了牵辣。
4、 啟動(dòng)成功就可以訪問管理員界面:http://localhost:8161/admin奴饮,默認(rèn)用戶名和密碼admin/admin纬向。如果你想修改用戶名和密碼的話,在conf/jetty-realm.properties中修改即可戴卜。
其中在導(dǎo)航菜單中逾条,Queues是隊(duì)列方式消息。Topics是主題方式消息投剥。Subscribers消息訂閱監(jiān)控查詢师脂。Connections可以查看鏈接數(shù),分別可以查看xmpp江锨、ssl吃警、stomp、openwire啄育、ws和網(wǎng)絡(luò)鏈接汤徽。Network是網(wǎng)絡(luò)鏈接數(shù)監(jiān)控。Send可以發(fā)送消息數(shù)據(jù)灸撰。
5谒府、 運(yùn)行demo示例拼坎,在dos控制臺(tái)輸入activemq.bat xbean:../conf/activemq-demo.xml 即可啟動(dòng)demo示例。官方提供的user-guide.html中的access the web console中是提示輸入:activemq.bat console xbean:conf/activemq-demo.xml完疫,我用這種方式不成功泰鸡。
當(dāng)然你還可以用絕對(duì)的文件目錄方式:activemq.bat xbean:file:D:/mq/conf/activemq-demo.xml
如果提示conf/activemq-demo.xml沒有找到,你可以嘗試改變下路徑壳鹤,也就是去掉上面的“..”盛龄。通過http://localhost:8161/demo/ 就可以訪問示例了。
**四芳誓、 ****消息示例******
1余舶、ActiviteMQ消息有3中形式
JMS ****公共
點(diǎn)對(duì)點(diǎn)域
發(fā)布/訂閱域
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue
Topic
Session
QueueSession
TopicSession
MessageProducer
QueueSender
TopicPublisher
MessageConsumer
QueueReceiver
TopicSubscriber
(1)、點(diǎn)對(duì)點(diǎn)方式(point-to-point)
點(diǎn)對(duì)點(diǎn)的消息發(fā)送方式主要建立在 Message Queue,Sender,reciever上锹淌,Message Queue 存貯消息匿值,Sneder 發(fā)送消息,receive接收消息.具體點(diǎn)就是Sender Client發(fā)送Message Queue ,而 receiver Cliernt從Queue中接收消息和"發(fā)送消息已接受"到Quere,確認(rèn)消息接收赂摆。消息發(fā)送客戶端與接收客戶端沒有時(shí)間上的依賴挟憔,發(fā)送客戶端可以在任何時(shí)刻發(fā)送信息到Queue,而不需要知道接收客戶端是不是在運(yùn)行
(2)烟号、發(fā)布/訂閱 方式(publish/subscriber Messaging)
發(fā)布/訂閱方式用于多接收客戶端的方式.作為發(fā)布訂閱的方式绊谭,可能存在多個(gè)接收客戶端,并且接收端客戶端與發(fā)送客戶端存在時(shí)間上的依賴汪拥。一個(gè)接收端只能接收他創(chuàng)建以后發(fā)送客戶端發(fā)送的信息达传。作為subscriber ,在接收消息時(shí)有兩種方法,destination的receive方法迫筑,和實(shí)現(xiàn)message listener 接口的onMessage 方法宪赶。
發(fā)送消息的基本步驟:
(1)铣焊、創(chuàng)建連接使用的工廠類JMS ConnectionFactory
(2)、使用管理對(duì)象JMS ConnectionFactory建立連接Connection罕伯,并啟動(dòng)
(3)曲伊、使用連接Connection 建立會(huì)話Session
(4)、使用會(huì)話Session和管理對(duì)象Destination創(chuàng)建消息生產(chǎn)者M(jìn)essageSender
(5)追他、使用消息生產(chǎn)者M(jìn)essageSender發(fā)送消息
消息接收者從JMS接受消息的步驟
(1)坟募、創(chuàng)建連接使用的工廠類JMS ConnectionFactory
(2)、使用管理對(duì)象JMS ConnectionFactory建立連接Connection邑狸,并啟動(dòng)
(3)懈糯、使用連接Connection 建立會(huì)話Session
(4)、使用會(huì)話Session和管理對(duì)象Destination創(chuàng)建消息接收者M(jìn)essageReceiver
(5)单雾、使用消息接收者M(jìn)essageReceiver接受消息赚哗,需要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實(shí)現(xiàn)了MessageListener接口她紫,需要定義onMessage事件方法。
**五屿储、 ****代碼示例******
在代碼開始贿讹,我們先建一個(gè)project,在這個(gè)project中添加如下jar包
添加完jar包后就可以開始實(shí)際的代碼工作了够掠。
1民褂、 使用JMS方式發(fā)送接收消息
消息發(fā)送者
package com.hoo.mq.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- <b>function:</b> 消息發(fā)送者
- @author hoojo
- @createDate 2013-6-19 上午11:26:43
- @file MessageSender.java
- @package com.hoo.mq.jms
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class MessageSender {
// 發(fā)送次數(shù)
public static final int SEND_NUM = 5;
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標(biāo),在ActiveMQ管理員控制臺(tái)創(chuàng)建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION = "hoo.mq.queue";
/**
- <b>function:</b> 發(fā)送消息
- @author hoojo
- @createDate 2013-6-19 下午12:05:42
- @param session
- @param producer
- @throws Exception
*/
public static void sendMessage(Session session, MessageProducer producer) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "發(fā)送消息第" + (i + 1) + "條";
TextMessage text = session.createTextMessage(message);
System.out.println(message);
producer.send(text);
}
}
public static void run() throws Exception {
Connection connection = null;
Session session = null;
try {
// 創(chuàng)建鏈接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創(chuàng)建一個(gè)連接
connection = factory.createConnection();
// 啟動(dòng)連接
connection.start();
// 創(chuàng)建一個(gè)session會(huì)話
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
Destination destination = session.createQueue(DESTINATION);
// 創(chuàng)建消息制作者
MessageProducer producer = session.createProducer(destination);
// 設(shè)置持久化模式
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, producer);
// 提交會(huì)話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關(guān)閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
MessageSender.run();
}
}
接受者
package com.hoo.mq.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- <b>function:</b> 消息接收者
- @author hoojo
- @createDate 2013-6-19 下午01:34:27
- @file MessageReceiver.java
- @package com.hoo.mq.jms
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class MessageReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標(biāo)疯潭,在ActiveMQ管理員控制臺(tái)創(chuàng)建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION = "hoo.mq.queue";
public static void run() throws Exception {
Connection connection = null;
Session session = null;
try {
// 創(chuàng)建鏈接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創(chuàng)建一個(gè)連接
connection = factory.createConnection();
// 啟動(dòng)連接
connection.start();
// 創(chuàng)建一個(gè)session會(huì)話
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
Destination destination = session.createQueue(DESTINATION);
// 創(chuàng)建消息制作者
MessageConsumer consumer = session.createConsumer(destination);
while (true) {
// 接收數(shù)據(jù)的時(shí)間(等待) 100 ms
Message message = consumer.receive(1000 * 100);
TextMessage text = (TextMessage) message;
if (text != null) {
System.out.println("接收:" + text.getText());
} else {
break;
}
}
// 提交會(huì)話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關(guān)閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
MessageReceiver.run();
}
}
2赊堪、 Queue隊(duì)列方式發(fā)送點(diǎn)對(duì)點(diǎn)消息數(shù)據(jù)
發(fā)送方
package com.hoo.mq.queue;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- <b>function:</b> Queue 方式消息發(fā)送者
- @author hoojo
- @createDate 2013-6-19 下午04:34:36
- @file QueueSender.java
- @package com.hoo.mq.queue
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class QueueSender {
// 發(fā)送次數(shù)
public static final int SEND_NUM = 5;
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標(biāo),在ActiveMQ管理員控制臺(tái)創(chuàng)建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION = "hoo.mq.queue";
/**
- <b>function:</b> 發(fā)送消息
- @author hoojo
- @createDate 2013-6-19 下午12:05:42
- @param session
- @param sender
- @throws Exception
*/
public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "發(fā)送消息第" + (i + 1) + "條";
MapMessage map = session.createMapMessage();
map.setString("text", message);
map.setLong("time", System.currentTimeMillis());
System.out.println(map);
sender.send(map);
}
}
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 創(chuàng)建鏈接工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創(chuàng)建一個(gè)連接
connection = factory.createQueueConnection();
// 啟動(dòng)連接
connection.start();
// 創(chuàng)建一個(gè)session會(huì)話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
Queue queue = session.createQueue(DESTINATION);
// 創(chuàng)建消息發(fā)送者
javax.jms.QueueSender sender = session.createSender(queue);
// 設(shè)置持久化模式
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, sender);
// 提交會(huì)話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關(guān)閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueSender.run();
}
}
接收方
package com.hoo.mq.queue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- <b>function:</b> 消息接收者竖哩; 依賴hawtbuf-1.9.jar
- @author hoojo
- @createDate 2013-6-19 下午01:34:27
- @file MessageReceiver.java
- @package com.hoo.mq.queue
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class QueueReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標(biāo)哭廉,在ActiveMQ管理員控制臺(tái)創(chuàng)建 http://localhost:8161/admin/queues.jsp
public static final String TARGET = "hoo.mq.queue";
public static void run() throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 創(chuàng)建鏈接工廠
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創(chuàng)建一個(gè)連接
connection = factory.createQueueConnection();
// 啟動(dòng)連接
connection.start();
// 創(chuàng)建一個(gè)session會(huì)話
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
Queue queue = session.createQueue(TARGET);
// 創(chuàng)建消息制作者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠100ms再關(guān)閉
Thread.sleep(1000 * 100);
// 提交會(huì)話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關(guān)閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueReceiver.run();
}
}
3、 Topic主題發(fā)布和訂閱消息
消息發(fā)送方
package com.hoo.mq.topic;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- <b>function:</b> Queue 方式消息發(fā)送者
- @author hoojo
- @createDate 2013-6-19 下午04:34:36
- @file QueueSender.java
- @package com.hoo.mq.topic
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class TopicSender {
// 發(fā)送次數(shù)
public static final int SEND_NUM = 5;
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標(biāo)期丰,在ActiveMQ管理員控制臺(tái)創(chuàng)建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION = "hoo.mq.topic";
/**
- <b>function:</b> 發(fā)送消息
- @author hoojo
- @createDate 2013-6-19 下午12:05:42
- @param session 會(huì)話
- @param publisher 發(fā)布者
- @throws Exception
*/
public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
String message = "發(fā)送消息第" + (i + 1) + "條";
MapMessage map = session.createMapMessage();
map.setString("text", message);
map.setLong("time", System.currentTimeMillis());
System.out.println(map);
publisher.send(map);
}
}
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 創(chuàng)建鏈接工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創(chuàng)建一個(gè)連接
connection = factory.createTopicConnection();
// 啟動(dòng)連接
connection.start();
// 創(chuàng)建一個(gè)session會(huì)話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
Topic topic = session.createTopic(DESTINATION);
// 創(chuàng)建消息發(fā)送者
TopicPublisher publisher = session.createPublisher(topic);
// 設(shè)置持久化模式
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, publisher);
// 提交會(huì)話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關(guān)閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicSender.run();
}
}
接收方
package com.hoo.mq.topic;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
- <b>function:</b> 消息接收者群叶; 依賴hawtbuf-1.9.jar
- @author hoojo
- @createDate 2013-6-19 下午01:34:27
- @file MessageReceiver.java
- @package com.hoo.mq.topic
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class TopicReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目標(biāo),在ActiveMQ管理員控制臺(tái)創(chuàng)建 http://localhost:8161/admin/queues.jsp
public static final String TARGET = "hoo.mq.topic";
public static void run() throws Exception {
TopicConnection connection = null;
TopicSession session = null;
try {
// 創(chuàng)建鏈接工廠
TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通過工廠創(chuàng)建一個(gè)連接
connection = factory.createTopicConnection();
// 啟動(dòng)連接
connection.start();
// 創(chuàng)建一個(gè)session會(huì)話
session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個(gè)消息隊(duì)列
Topic topic = session.createTopic(TARGET);
// 創(chuàng)建消息制作者
TopicSubscriber subscriber = session.createSubscriber(topic);
subscriber.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠100ms再關(guān)閉
Thread.sleep(1000 * 100);
// 提交會(huì)話
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 關(guān)閉釋放資源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
TopicReceiver.run();
}
}
4钝荡、 整合Spring實(shí)現(xiàn)消息發(fā)送和接收街立,在整合之前我們需要添加jar包,需要的jar包如下
這些jar包可以在D:\apache-activemq-5.8.0\lib這個(gè)lib目錄中找到埠通,添加完jar包后就開始編碼工作赎离。
消息發(fā)送者
package com.hoo.mq.spring.support;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
/**
- <b>function:</b> Spring JMSTemplate 消息發(fā)送者
- @author hoojo
- @createDate 2013-6-24 下午02:18:48
- @file Sender.java
- @package com.hoo.mq.spring.support
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class Sender {
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("message", "current system time: " + new Date().getTime());
return message;
}
});
}
}
消息接收者
package com.hoo.mq.spring.support;
import java.util.Map;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
- <b>function:</b> Spring JMSTemplate 消息接收者
- @author hoojo
- @createDate 2013-6-24 下午02:22:32
- @file Receiver.java
- @package com.hoo.mq.spring.support
- @project ActiveMQ-5.8
- @blog http://blog.csdn.net/IBM_hoojo
- @email hoojo_@126.com
- @version 1.0
*/
public class Receiver {
@SuppressWarnings("unchecked")
public static void main(String[] args) {
ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext-*.xml");
JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
while(true) {
Map<String, Object> map = (Map<String, Object>) jmsTemplate.receiveAndConvert();
System.out.println("收到消息:" + map.get("message"));
}
}
}
這里主要是用到了JmsTemplate這個(gè)消息模板,這個(gè)對(duì)象在spring的IoC容器中管理端辱,所以要從spring的容器上下文中獲取梁剔。下面看看spring的配置文件applicationContext-beans.xml內(nèi)容:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
</property>
</bean>
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="hoo.mq.queue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="activeMQConnectionFactory" />
<property name="defaultDestination" ref="destination" />
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
</beans>
這里的整合就比較簡(jiǎn)單了,如果你是web工程舞蔽,那你在需要用jms的時(shí)候荣病,只需用注入jmsTemplate即可。