1严衬、前言
之前我們通過(guò)兩篇文章(架構(gòu)設(shè)計(jì):系統(tǒng)間通信(19)——MQ:消息協(xié)議(上)齐苛、架構(gòu)設(shè)計(jì):系統(tǒng)間通信(20)——MQ:消息協(xié)議(下))從理論層面上為大家介紹了消息協(xié)議的基本定義盛龄,并花了較大篇幅向讀者介紹了三種典型的消息協(xié)議:XMPP協(xié)議、Stomp協(xié)議和AMQP協(xié)議爱只。本小節(jié)開(kāi)始浪规,我們基于之前的知識(shí)點(diǎn)講解這些協(xié)議在具體的“消息隊(duì)列中間件”中是如何被我們操作的。由于本人在實(shí)際工作中經(jīng)常使用ActiveMQ和RabbitMQ脯倚,所以就選取這兩個(gè)“消息隊(duì)列中間件”進(jìn)行講解渔彰。如果讀者可以補(bǔ)充其他“消息隊(duì)列中間件”的使用,那當(dāng)然是再好不過(guò)了推正。
ActiveMQ是Apache軟件基金會(huì)的開(kāi)源產(chǎn)品,支持AMQP協(xié)議植榕、MQTT協(xié)議(和XMPP協(xié)議作用類似)再沧、Openwire協(xié)議和Stomp協(xié)議等多種消息協(xié)議。并且ActiveMQ完整支持JMS API接口規(guī)范(當(dāng)然Apache也提供多種其他語(yǔ)言的客戶端尊残,例如:C炒瘸、C++、C#寝衫、Ruby顷扩、Perl)。
在本文發(fā)布之時(shí)隘截,ActiveMQ最新的版本號(hào)是5.13.2(版本號(hào)升級(jí)很快,不過(guò)并不推薦使用最新的版本)。由ActiveMQ的安裝是很簡(jiǎn)單婶芭,所以這個(gè)過(guò)程并不值得我們花很大篇幅進(jìn)行討論东臀。具體的過(guò)程就是:下載->解壓->配置環(huán)境變量->運(yùn)行:
下載軟件
您可以Apache ActiveMQ的官網(wǎng)下載安裝包:https://activemq.apache.org/download-archives.html。這里我們示例在CentOS下的安裝過(guò)程雕擂,所以下載Linux下的壓縮包即可(http://www.apache.org/dyn/closer.cgi?path=/activemq/5.13.2/apache-activemq-5.13.2-bin.tar.gz)啡邑。
解壓安裝
將下載的安裝包放置在root用戶的home目錄內(nèi),解壓即可(當(dāng)然您可以根據(jù)自己的需要加壓到不同的文件路徑下)井赌。如下所示:
[root@localhost~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz
1
1
以上解壓使用的是root用戶谤逼,這是為了演示方便。正式環(huán)境中還是建議禁用root用戶仇穗,為activeMQ的運(yùn)行專門創(chuàng)建一個(gè)用戶和用戶組流部。
配置環(huán)境變量(不是必須的)
如果您只是在測(cè)試環(huán)境使用Apache ActiveMQ,以便熟悉消息中間件本身的特性和使用方式纹坐。那么您無(wú)需對(duì)解壓后的軟件進(jìn)行任何配置枝冀,所有可運(yùn)行的命令都在軟件安裝目錄的./bin目錄下。為了使用方便耘子,最好配置一下環(huán)境變量果漾,如下所示(注意,根據(jù)您自己的軟件安裝位置谷誓,環(huán)境變量的設(shè)置是不一樣的绒障,請(qǐng)不要盲目粘貼復(fù)制):
設(shè)置該次會(huì)話的環(huán)境變量:[root@localhost~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;永久設(shè)置環(huán)境變量:[root@localhost~]# echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile
在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 針對(duì)操作系統(tǒng)進(jìn)行了更深入的優(yōu)化捍歪,所以您可以看到./bin目錄下户辱,有一個(gè)針對(duì)32位linux運(yùn)行命令的./linux-x86-32目錄,和針對(duì)64位Linux運(yùn)行命令的./linux-x86-64目錄糙臼。請(qǐng)按照您自己的情況進(jìn)行環(huán)境變量設(shè)置和命令運(yùn)行庐镐。
運(yùn)行程序
現(xiàn)在您可以在任何目錄,運(yùn)行activemq命令了变逃。注意activemq命令一共有6個(gè)參數(shù)(console | start | stop | restart | status | dump)必逆,啟動(dòng)Apache ActiveMQ使用的命令是activemq start:
[root@localhost~]# activemq start
如啟動(dòng)成功,就可以在瀏覽器上訪問(wèn)服務(wù)節(jié)點(diǎn)在8161端口的管理頁(yè)面了(例如http://localhost:8161):
點(diǎn)擊‘manage ActiveMQ broker’連接揽乱,可以進(jìn)入管理主界面(默認(rèn)的用戶和密碼都是admin)末患。以上就是Apache ActiveMQ消息中間件最簡(jiǎn)的安裝和運(yùn)行方式。在后續(xù)的文章中锤窑,我們會(huì)陸續(xù)討論ActiveMQ的集群和高性能優(yōu)化,那時(shí)會(huì)介紹對(duì)應(yīng)的ActiveMQ的配置問(wèn)題嚷炉。
如同上文講到的,activemq命令除了start參數(shù)用于啟動(dòng)activemq程序以外,還有另外5個(gè)參數(shù)可以使用:console | stop | restart | status | dump绘证。他們代表的使用意義是:
stop:停止當(dāng)前ActiveMQ節(jié)點(diǎn)的運(yùn)行隧膏。
restart:重新啟動(dòng)當(dāng)前ActiveMQ節(jié)點(diǎn)。
status:查看當(dāng)前ActiveMQ節(jié)點(diǎn)的運(yùn)行狀態(tài)嚷那。如果當(dāng)前ActiveMQ節(jié)點(diǎn)沒(méi)有運(yùn)行胞枕,那么將返回“ActiveMQ Broker is not running”的提示信息。注意魏宽,status命令只能告訴開(kāi)發(fā)人員當(dāng)前節(jié)點(diǎn)時(shí)停止的還是運(yùn)行的腐泻,除此之外不能從status命令獲取更多的信息。例如队询,ActiveMQ為什么創(chuàng)建Queue失斉勺?當(dāng)前ActiveMQ使用了多少內(nèi)存蚌斩?而要獲取這些信息铆惑,需要使用以下參數(shù)啟動(dòng)ActiveMQ節(jié)點(diǎn)。
console:使用控制臺(tái)模式啟動(dòng)ActiveMQ節(jié)點(diǎn)送膳;在這種模式下员魏,開(kāi)發(fā)人員可以調(diào)試、監(jiān)控當(dāng)前ActivieMQ節(jié)點(diǎn)的實(shí)時(shí)情況叠聋,并獲取實(shí)時(shí)狀態(tài)。
dump:如果您采用console模式運(yùn)行ActiveMQ晒奕,那么就可以使用dump參數(shù)闻书,在console控制臺(tái)上獲取當(dāng)前ActiveMQ節(jié)點(diǎn)的線程狀態(tài)快照。
好吧魄眉,既然我們已經(jīng)討論過(guò)如何安裝和運(yùn)行ActiveMQ,也討論了Stomp協(xié)議的組織結(jié)構(gòu)闷袒,為什么我們不立即動(dòng)手試一試操作ActiveMQ承載Stomp協(xié)議的消息呢坑律?
下面我們使用ActiveMQ提供的Java客戶端(實(shí)際上就是ActiveMQ對(duì)JMS規(guī)范的實(shí)現(xiàn)),向ActiveMQ中的Queue(示例代碼中將這個(gè)Queue命名為’test’)發(fā)送一條Stomp協(xié)議消息囊骤,然后再使用JAVA語(yǔ)言的客戶端晃择,從ActiveMQ上接受這條消息:
使用ActiveMQ的API發(fā)送Stomp協(xié)議消息:
packagemq.test.stomp;importjava.net.Socket;importjava.util.Date;importorg.apache.activemq.transport.stomp.StompConnection;// 消息生產(chǎn)者publicclassTestProducer{publicstaticvoidmain(String[] args) {try{// 建立Stomp協(xié)議的連接StompConnection con =newStompConnection();? ? ? ? ? ? Socket so =newSocket("192.168.61.138",61613);? ? ? ? ? ? con.open(so);// 注意,協(xié)議版本可以是1.2也物,也可以是1.1con.setVersion("1.2");// 用戶名和密碼宫屠,這個(gè)不必多說(shuō)了con.connect("admin","admin");// 以下發(fā)送一條信息(您也可以使用“事務(wù)”方式)con.send("/test","234543"+newDate().getTime());? ? ? ? }catch(Exception e) {? ? ? ? ? ? e.printStackTrace(System.out);? ? ? ? }? ? }}
使用ActiveMQ的API接收Stomp協(xié)議消息:
package mq.test.stomp;import java.net.Socket;import java.net.SocketTimeoutException;import java.util.Map;import org.apache.activemq.transport.stomp.StompConnection;import org.apache.activemq.transport.stomp.StompFrame;public class TestConsumer {? ? public static void main(String[] args) throws Exception {? ? ? ? // 建立連接? ? ? ? StompConnection con = new StompConnection();Socket so = new Socket("192.168.61.138",61613);con.open(so);con.setVersion("1.2");con.connect("admin","admin");String ack ="client";con.subscribe("/test","client");// 接受消息(使用循環(huán)進(jìn)行)? ? ? ? for(;;) {StompFrame frame = null;try {? ? ? ? ? ? ? ? // 注意,如果沒(méi)有接收到消息滑蚯,? ? ? ? ? ? ? ? // 這個(gè)消費(fèi)者線程會(huì)停在這里浪蹂,直到本次等待超時(shí)? ? ? ? ? ? ? ? frame = con.receive();} catch(SocketTimeoutException e) {? ? ? ? ? ? ? ? continue;}? ? ? ? ? ? // 打印本次接收到的消息? ? ? ? ? ? System.out.println("frame.getAction() = "+ frame.getAction());Map headers = frame.getHeaders();String meesage_id = headers.get("message-id");System.out.println("frame.getBody() = "+ frame.getBody());System.out.println("frame.getCommandId() = "+ frame.getCommandId());// 在ack是client標(biāo)記的情況下抵栈,確認(rèn)消息? ? ? ? ? ? if("client".equals(ack)) {? ? ? ? ? ? ? ? con.ack(meesage_id);}? ? ? ? }? ? }}
以上分別是使用Activie提供的Stomp協(xié)議的消息生產(chǎn)端和Stomp協(xié)議的消息消費(fèi)端的代碼(如果您不清楚Stomp協(xié)議的細(xì)節(jié),可以參考我另一篇文章:《架構(gòu)設(shè)計(jì):系統(tǒng)間通信(19)——MQ:消息協(xié)議(上)》)坤次。請(qǐng)注意在代碼片段中古劲,并沒(méi)有出現(xiàn)任何一個(gè)帶有jms名稱的包或者類——這是因?yàn)锳ctiveMQ為Stomp協(xié)議提供的JavaAPI在內(nèi)部進(jìn)行了JMS規(guī)范的封裝。
您可以查看activemq-stomp中關(guān)于協(xié)議轉(zhuǎn)換部分的源代碼:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父級(jí)接口:org.apache.activemq.transport.stomp.FrameTranslator來(lái)驗(yàn)證這件事情(關(guān)于ActiveMQ對(duì)JMS規(guī)范的實(shí)現(xiàn)設(shè)計(jì)缰猴,如果后續(xù)有時(shí)間再回頭進(jìn)行講解)产艾。
以下是Stomp協(xié)議的消費(fèi)者端的運(yùn)行效果(在生產(chǎn)者端已經(jīng)向ActiveMQ插入了一條消息之后):
frame.getAction() = MESSAGEframe.getBody() =2345431458460073204frame.getCommandId() =0
注意,由于消息體中插入了一個(gè)時(shí)間戳滑绒,所以您復(fù)制粘貼代碼后運(yùn)行效果并不會(huì)和我的演示程序完全一致闷堡。
如果您細(xì)心的話蹬挤,在ActiveMQ提供的管理頁(yè)面上已經(jīng)看到有兩個(gè)功能頁(yè)面:Queue和Topic缚窿。Queue和Topic是JMS為開(kāi)發(fā)人員提供的兩種不同工作機(jī)制的消息隊(duì)列。在ActiveMQ官方的解釋是:
Topics
In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.
中文的可以譯做:JMS-Topic 隊(duì)列基于“訂閱-發(fā)布”模式焰扳,當(dāng)操作者發(fā)布一條消息后倦零,所有對(duì)這條消息感興趣的訂閱者都可以收到它——也就是說(shuō)這條消息會(huì)被拷貝成多份,進(jìn)行分發(fā)吨悍。只有當(dāng)前“活動(dòng)的”訂閱者能夠收到消息(換句話說(shuō)扫茅,如果當(dāng)前JMS-Topic隊(duì)列中沒(méi)有訂閱者,這條消息將被丟棄)育瓜。
Queue
A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.
So Queues implement a reliable load balancer in JMS.
中文的可以譯做:JMS-Queue是一種“負(fù)載均衡模式”的實(shí)現(xiàn)葫隙。一個(gè)消息能且只能被一個(gè)消費(fèi)者接受。如果當(dāng)前JMS-Queue中沒(méi)有任何的消費(fèi)者躏仇,那么這條消息將會(huì)被Queue存儲(chǔ)起來(lái)(實(shí)際應(yīng)用中可以存儲(chǔ)在磁盤上恋脚,也可以存儲(chǔ)在數(shù)據(jù)庫(kù)中,看軟件的配置)焰手,直到有一個(gè)消費(fèi)者連接上糟描。另外,如果消費(fèi)者在接受到消息后书妻,在他斷開(kāi)與JMS-Queue連接之前船响,沒(méi)有發(fā)送ack信息(可以是客戶端手動(dòng)發(fā)送,也可以是自動(dòng)發(fā)送)躲履,那么這條消息將被發(fā)送給其他消費(fèi)者见间。
以下表格摘自互聯(lián)網(wǎng)上的資料,基本上把Queue和Topic這兩種隊(duì)列的不同特性說(shuō)清楚了:
比較項(xiàng)目Topic 模式隊(duì)列Queue 模式隊(duì)列
工作模式“訂閱-發(fā)布”模式工猜,如果當(dāng)前沒(méi)有訂閱者米诉,消息將會(huì)被丟棄。如果有多個(gè)訂閱者篷帅,那么這些訂閱者都會(huì)收到消息“負(fù)載均衡”模式史侣,如果當(dāng)前沒(méi)有消費(fèi)者汗销,消息也不會(huì)丟棄;如果有多個(gè)消費(fèi)者抵窒,那么一條消息也只會(huì)發(fā)送給其中一個(gè)消費(fèi)者,并且要求消費(fèi)者ack信息叠骑。
有無(wú)狀態(tài)無(wú)狀態(tài)Queue數(shù)據(jù)默認(rèn)會(huì)在mq服務(wù)器上以文件形式保存李皇,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲(chǔ)宙枷。
傳遞完整性如果沒(méi)有訂閱者掉房,消息會(huì)被丟棄消息不會(huì)丟棄
處理效率由于消息要按照訂閱者的數(shù)量進(jìn)行復(fù)制,所以處理性能會(huì)隨著訂閱者的增加而明顯降低慰丛,并且還要結(jié)合不同消息協(xié)議自身的性能差異由于一條消息只發(fā)送給一個(gè)消費(fèi)者卓囚,所以就算消費(fèi)者再多,性能也不會(huì)有明顯降低诅病。當(dāng)然不同消息協(xié)議的具體性能也是有差異的
上文已經(jīng)說(shuō)到,JMS這套面向消息通信的javaAPI 是一個(gè)和廠商無(wú)關(guān)的規(guī)范贤笆。通過(guò)JMS蝇棉,我們能實(shí)現(xiàn)不同消息中間件廠商、不同協(xié)議間的轉(zhuǎn)換和交互芥永。這一小節(jié)我們就來(lái)討論一下這個(gè)問(wèn)題篡殷。如果用一張圖來(lái)表示JMS在消息中間件中的作用話,那么就可以這么來(lái)畫:
首先您使用的MQ消息中間件需要實(shí)現(xiàn)了JMS規(guī)范埋涧;那么通過(guò)JMS規(guī)范板辽,開(kāi)發(fā)人員可以忽略各種消息協(xié)議的細(xì)節(jié),只要消息在同一隊(duì)列中棘催,就能夠保證各種消息協(xié)議間實(shí)現(xiàn)互相轉(zhuǎn)換劲弦。下面我們首先來(lái)看一個(gè)使用JMS API在ActiveMQ中操作openwire協(xié)議消息的簡(jiǎn)單示例,然后再給出一個(gè)通過(guò)JMS巧鸭,實(shí)現(xiàn)Stomp消息協(xié)議和Openwire消息協(xié)議間的互轉(zhuǎn)示例瓶您。
以下代碼使用向某個(gè)Queue(命名為test)中發(fā)送一條消息:
packagejms;importjavax.jms.Connection;importjavax.jms.Destination;importjavax.jms.MessageProducer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnectionFactory;/** * 測(cè)試使用JMS API連接ActiveMQ *@authoryinwenjie */publicclassJMSProducer{/**? ? * 由于是測(cè)試代碼纲仍,這里忽略了異常處理呀袱。? ? * 正是代碼可不能這樣做? ? *@paramargs? ? *@throwsRuntimeException? ? */publicstaticvoidmain(String[] args)throwsException {// 定義JMS-ActiveMQ連接信息(默認(rèn)為Openwire協(xié)議)ActiveMQConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://192.168.61.138:61616");? ? ? ? Session session =null;? ? ? ? Destination sendQueue;? ? ? ? Connection connection =null;//進(jìn)行連接connection = connectionFactory.createQueueConnection();? ? ? ? connection.start();//建立會(huì)話(設(shè)置一個(gè)帶有事務(wù)特性的會(huì)話)session = connection.createSession(true, Session.SESSION_TRANSACTED);//建立queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對(duì)象MessageProducer sender = session.createProducer(sendQueue);? ? ? ? TextMessage outMessage = session.createTextMessage();? ? ? ? outMessage.setText("這是發(fā)送的消息內(nèi)容");//發(fā)送(JMS是支持事務(wù)的)sender.send(outMessage);? ? ? ? session.commit();//關(guān)閉sender.close();? ? ? ? connection.close();? ? }}
當(dāng)以上代碼運(yùn)行到“start”的位置時(shí),我們可以通過(guò)觀察ActiveMQ管理界面中connection列表中的連接信息郑叠,發(fā)現(xiàn)消息生產(chǎn)者已經(jīng)建立了一個(gè)Openwire協(xié)議的連接:
從而確定我們通過(guò)JMS API建立了一個(gè)openwire協(xié)議的通訊連接夜赵。接著我們使用以下代碼,建立一個(gè)基于openwire協(xié)議的“消費(fèi)者”乡革。注意:消息生產(chǎn)者和消息消費(fèi)者寇僧,映射的隊(duì)列必須一致摊腋。(在示例代碼中,它們都映射名稱為test的JMS-Queue)
以下代碼使用JMS從某個(gè)Queue中接收消息:
packagejms;importjavax.jms.Connection;importjavax.jms.Destination;importjavax.jms.Message;importjavax.jms.MessageConsumer;importjavax.jms.MessageListener;importjavax.jms.Session;importorg.apache.activemq.ActiveMQConnectionFactory;/** * 測(cè)試使用JMS API連接ActiveMQ *@authoryinwenjie */publicclassJMSConsumer{/**? ? * 由于是測(cè)試代碼嘁傀,這里忽略了異常處理兴蒸。? ? * 正是代碼可不能這樣做? ? *@paramargs? ? *@throwsRuntimeException? ? */publicstaticvoidmain(String[] args)throwsException {// 定義JMS-ActiveMQ連接信息ActiveMQConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://192.168.61.138:61616");? ? ? ? Session session =null;? ? ? ? Destination sendQueue;? ? ? ? Connection connection =null;//進(jìn)行連接connection = connectionFactory.createQueueConnection();? ? ? ? connection.start();//建立會(huì)話(設(shè)置為自動(dòng)ack)session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//建立Queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對(duì)象MessageConsumer consumer = session.createConsumer(sendQueue);? ? ? ? consumer.setMessageListener(newMessageListener() {@OverridepublicvoidonMessage(Message arg0) {// 接收到消息后,不需要再發(fā)送ack了细办。System.out.println("Message = "+ arg0);? ? ? ? ? ? }? ? ? ? });synchronized(JMSConsumer.class) {? ? ? ? ? ? JMSConsumer.class.wait();? ? ? ? }//關(guān)閉consumer.close();? ? ? ? connection.close();? ? }}
當(dāng)以上“消費(fèi)者”代碼運(yùn)行到start的位置時(shí)橙凳,我們通過(guò)ActiveMQ提供的管理界面可以看到,基于Openwire協(xié)議的連接增加到了兩條:
注意笑撞,您在運(yùn)行以上測(cè)試代碼時(shí)岛啸,不用和我的運(yùn)行順序一致。由于Queue模式的隊(duì)列是要進(jìn)行消息狀態(tài)保存的茴肥,所以無(wú)論您是先運(yùn)行“消費(fèi)者”端坚踩,還是先運(yùn)行“生產(chǎn)者”端,最后“消費(fèi)者”都會(huì)收到一條消息瓤狐。類似如下的效果:
Message=ActiveMQTextMessage{commandId =6, responseRequired =false, messageId =ID:yinwenjie-240-60482-1458616972423-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId =ID:yinwenjie-240-60482-1458616972423-1:1:1:1, destination =queue:///test, transactionId =TX:ID:yinwenjie-240-60482-1458616972423-1:1:1, expiration =0, timestamp =1458617840154, arrival =0, brokerInTime =1458617840166, brokerOutTime =1458617840187, correlationId = null, replyTo = null, persistent =true, type = null, priority =4, groupID = null, groupSequence =0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter =0, size =0, properties = null, readOnlyProperties =true, readOnlyBody =true, droppable =false, jmsXGroupFirstForConsumer =false, text = 這是發(fā)送的消息內(nèi)容}
下面我們將Openwire協(xié)議的消息通過(guò)JMS送入Queue隊(duì)列,并且讓基于Stomp協(xié)議的消費(fèi)者接收到這條消息芬首。為了節(jié)約篇幅赴捞,基于Openwire協(xié)議的生產(chǎn)者的代碼請(qǐng)參考上一小節(jié)2-5-1中“生產(chǎn)者”的代碼片段。這里只列出Stomp消息的接受者代碼(實(shí)際上這段代碼在上文中也可以找到):
Stomp協(xié)議的消息消費(fèi)者(消息接收者):
packagejms;importjavax.jms.Connection;importjavax.jms.Destination;importjavax.jms.MessageProducer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnectionFactory;/** * 測(cè)試使用JMS API連接ActiveMQ *@authoryinwenjie */publicclassJMSProducer{/**? ? * 由于是測(cè)試代碼郁稍,這里忽略了異常處理赦政。? ? * 正是代碼可不能這樣做? ? *@paramargs? ? *@throwsRuntimeException? ? */publicstaticvoidmain(String[] args)throwsException {// 定義JMS-ActiveMQ連接信息(默認(rèn)為Openwire協(xié)議)ActiveMQConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://192.168.61.138:61616");
Session session =null;
Destination sendQueue;
Connection connection =null;//進(jìn)行連接connection = connectionFactory.createQueueConnection();
connection.start();//建立會(huì)話(設(shè)置一個(gè)帶有事務(wù)特性的會(huì)話)session = connection.createSession(true, Session.SESSION_TRANSACTED);//建立queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對(duì)象MessageProducer sender = session.createProducer(sendQueue);
TextMessage outMessage = session.createTextMessage();
outMessage.setText("這是發(fā)送的消息內(nèi)容");//發(fā)送(JMS是支持事務(wù)的)sender.send(outMessage);
session.commit();//關(guān)閉sender.close();
connection.close();
}
}
當(dāng)您同時(shí)運(yùn)行Openwire消息發(fā)送者和Stomp消息接收者時(shí),您可以在ActiveMQ的管理界面看到這兩種協(xié)議的連接信息:
以下是Stomp協(xié)議消費(fèi)者接收到的消息內(nèi)容(經(jīng)過(guò)轉(zhuǎn)換的openwire協(xié)議消息):
frame.getAction() = MESSAGEframe.getBody() = 這是發(fā)送的消息內(nèi)容frame.getCommandId() =0
接下文