轉(zhuǎn): 架構(gòu)設(shè)計(jì):系統(tǒng)間通信——ActiveMQ的安裝與使用

1严衬、前言

之前我們通過(guò)兩篇文章(架構(gòu)設(shè)計(jì):系統(tǒng)間通信(19)——MQ:消息協(xié)議(上)齐苛、架構(gòu)設(shè)計(jì):系統(tǒng)間通信(20)——MQ:消息協(xié)議(下))從理論層面上為大家介紹了消息協(xié)議的基本定義盛龄,并花了較大篇幅向讀者介紹了三種典型的消息協(xié)議:XMPP協(xié)議、Stomp協(xié)議和AMQP協(xié)議爱只。本小節(jié)開(kāi)始浪规,我們基于之前的知識(shí)點(diǎn)講解這些協(xié)議在具體的“消息隊(duì)列中間件”中是如何被我們操作的。由于本人在實(shí)際工作中經(jīng)常使用ActiveMQ和RabbitMQ脯倚,所以就選取這兩個(gè)“消息隊(duì)列中間件”進(jìn)行講解渔彰。如果讀者可以補(bǔ)充其他“消息隊(duì)列中間件”的使用,那當(dāng)然是再好不過(guò)了推正。

2恍涂、ActiveMQ的安裝和使用

ActiveMQ是Apache軟件基金會(huì)的開(kāi)源產(chǎn)品,支持AMQP協(xié)議植榕、MQTT協(xié)議(和XMPP協(xié)議作用類似)再沧、Openwire協(xié)議和Stomp協(xié)議等多種消息協(xié)議。并且ActiveMQ完整支持JMS API接口規(guī)范(當(dāng)然Apache也提供多種其他語(yǔ)言的客戶端尊残,例如:C炒瘸、C++、C#寝衫、Ruby顷扩、Perl)。

2-1慰毅、ActiveMQ的安裝

在本文發(fā)布之時(shí)隘截,ActiveMQ最新的版本號(hào)是5.13.2(版本號(hào)升級(jí)很快,不過(guò)并不推薦使用最新的版本)。由ActiveMQ的安裝是很簡(jiǎn)單婶芭,所以這個(gè)過(guò)程并不值得我們花很大篇幅進(jìn)行討論东臀。具體的過(guò)程就是:下載->解壓->配置環(huán)境變量->運(yùn)行:

下載軟件

您可以Apache ActiveMQ的官網(wǎng)下載安裝包:https://activemq.apache.org/download-archives.html。這里我們示例在CentOS下的安裝過(guò)程雕擂,所以下載Linux下的壓縮包即可(http://www.apache.org/dyn/closer.cgi?path=/activemq/5.13.2/apache-activemq-5.13.2-bin.tar.gz)啡邑。

解壓安裝

將下載的安裝包放置在root用戶的home目錄內(nèi),解壓即可(當(dāng)然您可以根據(jù)自己的需要加壓到不同的文件路徑下)井赌。如下所示:

[root@localhost~]# tar -zxvf ./apache-activemq-5.13.2-bin.tar.gz

1

1

以上解壓使用的是root用戶谤逼,這是為了演示方便。正式環(huán)境中還是建議禁用root用戶仇穗,為activeMQ的運(yùn)行專門創(chuàng)建一個(gè)用戶和用戶組流部。

配置環(huán)境變量(不是必須的)

如果您只是在測(cè)試環(huán)境使用Apache ActiveMQ,以便熟悉消息中間件本身的特性和使用方式纹坐。那么您無(wú)需對(duì)解壓后的軟件進(jìn)行任何配置枝冀,所有可運(yùn)行的命令都在軟件安裝目錄的./bin目錄下。為了使用方便耘子,最好配置一下環(huán)境變量果漾,如下所示(注意,根據(jù)您自己的軟件安裝位置谷誓,環(huán)境變量的設(shè)置是不一樣的绒障,請(qǐng)不要盲目粘貼復(fù)制):

設(shè)置該次會(huì)話的環(huán)境變量:[root@localhost~]# export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;永久設(shè)置環(huán)境變量:[root@localhost~]# echo "export PATH=/usr/apache-activemq-5.13.1/bin/linux-x86-64:$PATH;" >> /etc/profile

在ActiveMQ Version 5.9+的版本中,Apache ActiveMQ 針對(duì)操作系統(tǒng)進(jìn)行了更深入的優(yōu)化捍歪,所以您可以看到./bin目錄下户辱,有一個(gè)針對(duì)32位linux運(yùn)行命令的./linux-x86-32目錄,和針對(duì)64位Linux運(yùn)行命令的./linux-x86-64目錄糙臼。請(qǐng)按照您自己的情況進(jìn)行環(huán)境變量設(shè)置和命令運(yùn)行庐镐。

運(yùn)行程序

現(xiàn)在您可以在任何目錄,運(yùn)行activemq命令了变逃。注意activemq命令一共有6個(gè)參數(shù)(console | start | stop | restart | status | dump)必逆,啟動(dòng)Apache ActiveMQ使用的命令是activemq start:

[root@localhost~]# activemq start


如啟動(dòng)成功,就可以在瀏覽器上訪問(wèn)服務(wù)節(jié)點(diǎn)在8161端口的管理頁(yè)面了(例如http://localhost:8161):

點(diǎn)擊‘manage ActiveMQ broker’連接揽乱,可以進(jìn)入管理主界面(默認(rèn)的用戶和密碼都是admin)末患。以上就是Apache ActiveMQ消息中間件最簡(jiǎn)的安裝和運(yùn)行方式。在后續(xù)的文章中锤窑,我們會(huì)陸續(xù)討論ActiveMQ的集群和高性能優(yōu)化,那時(shí)會(huì)介紹對(duì)應(yīng)的ActiveMQ的配置問(wèn)題嚷炉。

2-2渊啰、ActiveMQ的其他命令參數(shù)

如同上文講到的,activemq命令除了start參數(shù)用于啟動(dòng)activemq程序以外,還有另外5個(gè)參數(shù)可以使用:console | stop | restart | status | dump绘证。他們代表的使用意義是:

stop:停止當(dāng)前ActiveMQ節(jié)點(diǎn)的運(yùn)行隧膏。

restart:重新啟動(dòng)當(dāng)前ActiveMQ節(jié)點(diǎn)。

status:查看當(dāng)前ActiveMQ節(jié)點(diǎn)的運(yùn)行狀態(tài)嚷那。如果當(dāng)前ActiveMQ節(jié)點(diǎn)沒(méi)有運(yùn)行胞枕,那么將返回“ActiveMQ Broker is not running”的提示信息。注意魏宽,status命令只能告訴開(kāi)發(fā)人員當(dāng)前節(jié)點(diǎn)時(shí)停止的還是運(yùn)行的腐泻,除此之外不能從status命令獲取更多的信息。例如队询,ActiveMQ為什么創(chuàng)建Queue失斉勺?當(dāng)前ActiveMQ使用了多少內(nèi)存蚌斩?而要獲取這些信息铆惑,需要使用以下參數(shù)啟動(dòng)ActiveMQ節(jié)點(diǎn)。

console:使用控制臺(tái)模式啟動(dòng)ActiveMQ節(jié)點(diǎn)送膳;在這種模式下员魏,開(kāi)發(fā)人員可以調(diào)試、監(jiān)控當(dāng)前ActivieMQ節(jié)點(diǎn)的實(shí)時(shí)情況叠聋,并獲取實(shí)時(shí)狀態(tài)。

dump:如果您采用console模式運(yùn)行ActiveMQ晒奕,那么就可以使用dump參數(shù)闻书,在console控制臺(tái)上獲取當(dāng)前ActiveMQ節(jié)點(diǎn)的線程狀態(tài)快照。

2-3脑慧、在ActiveMQ中傳遞Stomp消息

好吧魄眉,既然我們已經(jīng)討論過(guò)如何安裝和運(yùn)行ActiveMQ,也討論了Stomp協(xié)議的組織結(jié)構(gòu)闷袒,為什么我們不立即動(dòng)手試一試操作ActiveMQ承載Stomp協(xié)議的消息呢坑律?

下面我們使用ActiveMQ提供的Java客戶端(實(shí)際上就是ActiveMQ對(duì)JMS規(guī)范的實(shí)現(xiàn)),向ActiveMQ中的Queue(示例代碼中將這個(gè)Queue命名為’test’)發(fā)送一條Stomp協(xié)議消息囊骤,然后再使用JAVA語(yǔ)言的客戶端晃择,從ActiveMQ上接受這條消息:

使用ActiveMQ的API發(fā)送Stomp協(xié)議消息:

packagemq.test.stomp;importjava.net.Socket;importjava.util.Date;importorg.apache.activemq.transport.stomp.StompConnection;// 消息生產(chǎn)者publicclassTestProducer{publicstaticvoidmain(String[] args) {try{// 建立Stomp協(xié)議的連接StompConnection con =newStompConnection();? ? ? ? ? ? Socket so =newSocket("192.168.61.138",61613);? ? ? ? ? ? con.open(so);// 注意,協(xié)議版本可以是1.2也物,也可以是1.1con.setVersion("1.2");// 用戶名和密碼宫屠,這個(gè)不必多說(shuō)了con.connect("admin","admin");// 以下發(fā)送一條信息(您也可以使用“事務(wù)”方式)con.send("/test","234543"+newDate().getTime());? ? ? ? }catch(Exception e) {? ? ? ? ? ? e.printStackTrace(System.out);? ? ? ? }? ? }}

使用ActiveMQ的API接收Stomp協(xié)議消息:

package mq.test.stomp;import java.net.Socket;import java.net.SocketTimeoutException;import java.util.Map;import org.apache.activemq.transport.stomp.StompConnection;import org.apache.activemq.transport.stomp.StompFrame;public class TestConsumer {? ? public static void main(String[] args) throws Exception {? ? ? ? // 建立連接? ? ? ? StompConnection con = new StompConnection();Socket so = new Socket("192.168.61.138",61613);con.open(so);con.setVersion("1.2");con.connect("admin","admin");String ack ="client";con.subscribe("/test","client");// 接受消息(使用循環(huán)進(jìn)行)? ? ? ? for(;;) {StompFrame frame = null;try {? ? ? ? ? ? ? ? // 注意,如果沒(méi)有接收到消息滑蚯,? ? ? ? ? ? ? ? // 這個(gè)消費(fèi)者線程會(huì)停在這里浪蹂,直到本次等待超時(shí)? ? ? ? ? ? ? ? frame = con.receive();} catch(SocketTimeoutException e) {? ? ? ? ? ? ? ? continue;}? ? ? ? ? ? // 打印本次接收到的消息? ? ? ? ? ? System.out.println("frame.getAction() = "+ frame.getAction());Map headers = frame.getHeaders();String meesage_id = headers.get("message-id");System.out.println("frame.getBody() = "+ frame.getBody());System.out.println("frame.getCommandId() = "+ frame.getCommandId());// 在ack是client標(biāo)記的情況下抵栈,確認(rèn)消息? ? ? ? ? ? if("client".equals(ack)) {? ? ? ? ? ? ? ? con.ack(meesage_id);}? ? ? ? }? ? }}

以上分別是使用Activie提供的Stomp協(xié)議的消息生產(chǎn)端和Stomp協(xié)議的消息消費(fèi)端的代碼(如果您不清楚Stomp協(xié)議的細(xì)節(jié),可以參考我另一篇文章:《架構(gòu)設(shè)計(jì):系統(tǒng)間通信(19)——MQ:消息協(xié)議(上)》)坤次。請(qǐng)注意在代碼片段中古劲,并沒(méi)有出現(xiàn)任何一個(gè)帶有jms名稱的包或者類——這是因?yàn)锳ctiveMQ為Stomp協(xié)議提供的JavaAPI在內(nèi)部進(jìn)行了JMS規(guī)范的封裝。

您可以查看activemq-stomp中關(guān)于協(xié)議轉(zhuǎn)換部分的源代碼:org.apache.activemq.transport.stomp.JmsFrameTranslator和其父級(jí)接口:org.apache.activemq.transport.stomp.FrameTranslator來(lái)驗(yàn)證這件事情(關(guān)于ActiveMQ對(duì)JMS規(guī)范的實(shí)現(xiàn)設(shè)計(jì)缰猴,如果后續(xù)有時(shí)間再回頭進(jìn)行講解)产艾。

以下是Stomp協(xié)議的消費(fèi)者端的運(yùn)行效果(在生產(chǎn)者端已經(jīng)向ActiveMQ插入了一條消息之后):

frame.getAction() = MESSAGEframe.getBody() =2345431458460073204frame.getCommandId() =0


注意,由于消息體中插入了一個(gè)時(shí)間戳滑绒,所以您復(fù)制粘貼代碼后運(yùn)行效果并不會(huì)和我的演示程序完全一致闷堡。

2-4、ActiveMQ中的Queue和Topics

如果您細(xì)心的話蹬挤,在ActiveMQ提供的管理頁(yè)面上已經(jīng)看到有兩個(gè)功能頁(yè)面:Queue和Topic缚窿。Queue和Topic是JMS為開(kāi)發(fā)人員提供的兩種不同工作機(jī)制的消息隊(duì)列。在ActiveMQ官方的解釋是:

Topics

In JMS a Topic implements publish and subscribe semantics. When you publish a message it goes to all the subscribers who are interested - so zero to many subscribers will receive a copy of the message. Only subscribers who had an active subscription at the time the broker receives the message will get a copy of the message.

中文的可以譯做:JMS-Topic 隊(duì)列基于“訂閱-發(fā)布”模式焰扳,當(dāng)操作者發(fā)布一條消息后倦零,所有對(duì)這條消息感興趣的訂閱者都可以收到它——也就是說(shuō)這條消息會(huì)被拷貝成多份,進(jìn)行分發(fā)吨悍。只有當(dāng)前“活動(dòng)的”訂閱者能夠收到消息(換句話說(shuō)扫茅,如果當(dāng)前JMS-Topic隊(duì)列中沒(méi)有訂閱者,這條消息將被丟棄)育瓜。

Queue

A JMS Queue implements load balancer semantics. A single message will be received by exactly one consumer. If there are no consumers available at the time the message is sent it will be kept until a consumer is available that can process the message. If a consumer receives a message and does not acknowledge it before closing then the message will be redelivered to another consumer. A queue can have many consumers with messages load balanced across the available consumers.

So Queues implement a reliable load balancer in JMS.

中文的可以譯做:JMS-Queue是一種“負(fù)載均衡模式”的實(shí)現(xiàn)葫隙。一個(gè)消息能且只能被一個(gè)消費(fèi)者接受。如果當(dāng)前JMS-Queue中沒(méi)有任何的消費(fèi)者躏仇,那么這條消息將會(huì)被Queue存儲(chǔ)起來(lái)(實(shí)際應(yīng)用中可以存儲(chǔ)在磁盤上恋脚,也可以存儲(chǔ)在數(shù)據(jù)庫(kù)中,看軟件的配置)焰手,直到有一個(gè)消費(fèi)者連接上糟描。另外,如果消費(fèi)者在接受到消息后书妻,在他斷開(kāi)與JMS-Queue連接之前船响,沒(méi)有發(fā)送ack信息(可以是客戶端手動(dòng)發(fā)送,也可以是自動(dòng)發(fā)送)躲履,那么這條消息將被發(fā)送給其他消費(fèi)者见间。

以下表格摘自互聯(lián)網(wǎng)上的資料,基本上把Queue和Topic這兩種隊(duì)列的不同特性說(shuō)清楚了:

比較項(xiàng)目Topic 模式隊(duì)列Queue 模式隊(duì)列

工作模式“訂閱-發(fā)布”模式工猜,如果當(dāng)前沒(méi)有訂閱者米诉,消息將會(huì)被丟棄。如果有多個(gè)訂閱者篷帅,那么這些訂閱者都會(huì)收到消息“負(fù)載均衡”模式史侣,如果當(dāng)前沒(méi)有消費(fèi)者汗销,消息也不會(huì)丟棄;如果有多個(gè)消費(fèi)者抵窒,那么一條消息也只會(huì)發(fā)送給其中一個(gè)消費(fèi)者,并且要求消費(fèi)者ack信息叠骑。

有無(wú)狀態(tài)無(wú)狀態(tài)Queue數(shù)據(jù)默認(rèn)會(huì)在mq服務(wù)器上以文件形式保存李皇,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲(chǔ)宙枷。

傳遞完整性如果沒(méi)有訂閱者掉房,消息會(huì)被丟棄消息不會(huì)丟棄

處理效率由于消息要按照訂閱者的數(shù)量進(jìn)行復(fù)制,所以處理性能會(huì)隨著訂閱者的增加而明顯降低慰丛,并且還要結(jié)合不同消息協(xié)議自身的性能差異由于一條消息只發(fā)送給一個(gè)消費(fèi)者卓囚,所以就算消費(fèi)者再多,性能也不會(huì)有明顯降低诅病。當(dāng)然不同消息協(xié)議的具體性能也是有差異的

2-5哪亿、JMS和協(xié)議間轉(zhuǎn)換

上文已經(jīng)說(shuō)到,JMS這套面向消息通信的javaAPI 是一個(gè)和廠商無(wú)關(guān)的規(guī)范贤笆。通過(guò)JMS蝇棉,我們能實(shí)現(xiàn)不同消息中間件廠商、不同協(xié)議間的轉(zhuǎn)換和交互芥永。這一小節(jié)我們就來(lái)討論一下這個(gè)問(wèn)題篡殷。如果用一張圖來(lái)表示JMS在消息中間件中的作用話,那么就可以這么來(lái)畫:

首先您使用的MQ消息中間件需要實(shí)現(xiàn)了JMS規(guī)范埋涧;那么通過(guò)JMS規(guī)范板辽,開(kāi)發(fā)人員可以忽略各種消息協(xié)議的細(xì)節(jié),只要消息在同一隊(duì)列中棘催,就能夠保證各種消息協(xié)議間實(shí)現(xiàn)互相轉(zhuǎn)換劲弦。下面我們首先來(lái)看一個(gè)使用JMS API在ActiveMQ中操作openwire協(xié)議消息的簡(jiǎn)單示例,然后再給出一個(gè)通過(guò)JMS巧鸭,實(shí)現(xiàn)Stomp消息協(xié)議和Openwire消息協(xié)議間的互轉(zhuǎn)示例瓶您。

2-5-1、JMS操作

以下代碼使用向某個(gè)Queue(命名為test)中發(fā)送一條消息:

packagejms;importjavax.jms.Connection;importjavax.jms.Destination;importjavax.jms.MessageProducer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnectionFactory;/** * 測(cè)試使用JMS API連接ActiveMQ *@authoryinwenjie */publicclassJMSProducer{/**? ? * 由于是測(cè)試代碼纲仍,這里忽略了異常處理呀袱。? ? * 正是代碼可不能這樣做? ? *@paramargs? ? *@throwsRuntimeException? ? */publicstaticvoidmain(String[] args)throwsException {// 定義JMS-ActiveMQ連接信息(默認(rèn)為Openwire協(xié)議)ActiveMQConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://192.168.61.138:61616");? ? ? ? Session session =null;? ? ? ? Destination sendQueue;? ? ? ? Connection connection =null;//進(jìn)行連接connection = connectionFactory.createQueueConnection();? ? ? ? connection.start();//建立會(huì)話(設(shè)置一個(gè)帶有事務(wù)特性的會(huì)話)session = connection.createSession(true, Session.SESSION_TRANSACTED);//建立queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對(duì)象MessageProducer sender = session.createProducer(sendQueue);? ? ? ? TextMessage outMessage = session.createTextMessage();? ? ? ? outMessage.setText("這是發(fā)送的消息內(nèi)容");//發(fā)送(JMS是支持事務(wù)的)sender.send(outMessage);? ? ? ? session.commit();//關(guān)閉sender.close();? ? ? ? connection.close();? ? }}

當(dāng)以上代碼運(yùn)行到“start”的位置時(shí),我們可以通過(guò)觀察ActiveMQ管理界面中connection列表中的連接信息郑叠,發(fā)現(xiàn)消息生產(chǎn)者已經(jīng)建立了一個(gè)Openwire協(xié)議的連接:

從而確定我們通過(guò)JMS API建立了一個(gè)openwire協(xié)議的通訊連接夜赵。接著我們使用以下代碼,建立一個(gè)基于openwire協(xié)議的“消費(fèi)者”乡革。注意:消息生產(chǎn)者和消息消費(fèi)者寇僧,映射的隊(duì)列必須一致摊腋。(在示例代碼中,它們都映射名稱為test的JMS-Queue)

以下代碼使用JMS從某個(gè)Queue中接收消息:

packagejms;importjavax.jms.Connection;importjavax.jms.Destination;importjavax.jms.Message;importjavax.jms.MessageConsumer;importjavax.jms.MessageListener;importjavax.jms.Session;importorg.apache.activemq.ActiveMQConnectionFactory;/** * 測(cè)試使用JMS API連接ActiveMQ *@authoryinwenjie */publicclassJMSConsumer{/**? ? * 由于是測(cè)試代碼嘁傀,這里忽略了異常處理兴蒸。? ? * 正是代碼可不能這樣做? ? *@paramargs? ? *@throwsRuntimeException? ? */publicstaticvoidmain(String[] args)throwsException {// 定義JMS-ActiveMQ連接信息ActiveMQConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://192.168.61.138:61616");? ? ? ? Session session =null;? ? ? ? Destination sendQueue;? ? ? ? Connection connection =null;//進(jìn)行連接connection = connectionFactory.createQueueConnection();? ? ? ? connection.start();//建立會(huì)話(設(shè)置為自動(dòng)ack)session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//建立Queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對(duì)象MessageConsumer consumer = session.createConsumer(sendQueue);? ? ? ? consumer.setMessageListener(newMessageListener() {@OverridepublicvoidonMessage(Message arg0) {// 接收到消息后,不需要再發(fā)送ack了细办。System.out.println("Message = "+ arg0);? ? ? ? ? ? }? ? ? ? });synchronized(JMSConsumer.class) {? ? ? ? ? ? JMSConsumer.class.wait();? ? ? ? }//關(guān)閉consumer.close();? ? ? ? connection.close();? ? }}

當(dāng)以上“消費(fèi)者”代碼運(yùn)行到start的位置時(shí)橙凳,我們通過(guò)ActiveMQ提供的管理界面可以看到,基于Openwire協(xié)議的連接增加到了兩條:

注意笑撞,您在運(yùn)行以上測(cè)試代碼時(shí)岛啸,不用和我的運(yùn)行順序一致。由于Queue模式的隊(duì)列是要進(jìn)行消息狀態(tài)保存的茴肥,所以無(wú)論您是先運(yùn)行“消費(fèi)者”端坚踩,還是先運(yùn)行“生產(chǎn)者”端,最后“消費(fèi)者”都會(huì)收到一條消息瓤狐。類似如下的效果:

Message=ActiveMQTextMessage{commandId =6, responseRequired =false, messageId =ID:yinwenjie-240-60482-1458616972423-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId =ID:yinwenjie-240-60482-1458616972423-1:1:1:1, destination =queue:///test, transactionId =TX:ID:yinwenjie-240-60482-1458616972423-1:1:1, expiration =0, timestamp =1458617840154, arrival =0, brokerInTime =1458617840166, brokerOutTime =1458617840187, correlationId = null, replyTo = null, persistent =true, type = null, priority =4, groupID = null, groupSequence =0, targetConsumerId = null, compressed =false, userID = null, content = org.apache.activemq.util.ByteSequence@66968df8, marshalledProperties = null, dataStructure = null, redeliveryCounter =0, size =0, properties = null, readOnlyProperties =true, readOnlyBody =true, droppable =false, jmsXGroupFirstForConsumer =false, text = 這是發(fā)送的消息內(nèi)容}

2-5-2瞬铸、協(xié)議間轉(zhuǎn)換

下面我們將Openwire協(xié)議的消息通過(guò)JMS送入Queue隊(duì)列,并且讓基于Stomp協(xié)議的消費(fèi)者接收到這條消息芬首。為了節(jié)約篇幅赴捞,基于Openwire協(xié)議的生產(chǎn)者的代碼請(qǐng)參考上一小節(jié)2-5-1中“生產(chǎn)者”的代碼片段。這里只列出Stomp消息的接受者代碼(實(shí)際上這段代碼在上文中也可以找到):

Stomp協(xié)議的消息消費(fèi)者(消息接收者):

packagejms;importjavax.jms.Connection;importjavax.jms.Destination;importjavax.jms.MessageProducer;importjavax.jms.Session;importjavax.jms.TextMessage;importorg.apache.activemq.ActiveMQConnectionFactory;/** * 測(cè)試使用JMS API連接ActiveMQ *@authoryinwenjie */publicclassJMSProducer{/**? ? * 由于是測(cè)試代碼郁稍,這里忽略了異常處理赦政。? ? * 正是代碼可不能這樣做? ? *@paramargs? ? *@throwsRuntimeException? ? */publicstaticvoidmain(String[] args)throwsException {// 定義JMS-ActiveMQ連接信息(默認(rèn)為Openwire協(xié)議)ActiveMQConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://192.168.61.138:61616");

Session session =null;

Destination sendQueue;

Connection connection =null;//進(jìn)行連接connection = connectionFactory.createQueueConnection();

connection.start();//建立會(huì)話(設(shè)置一個(gè)帶有事務(wù)特性的會(huì)話)session = connection.createSession(true, Session.SESSION_TRANSACTED);//建立queue(當(dāng)然如果有了就不會(huì)重復(fù)建立)sendQueue = session.createQueue("/test");//建立消息發(fā)送者對(duì)象MessageProducer sender = session.createProducer(sendQueue);

TextMessage outMessage = session.createTextMessage();

outMessage.setText("這是發(fā)送的消息內(nèi)容");//發(fā)送(JMS是支持事務(wù)的)sender.send(outMessage);

session.commit();//關(guān)閉sender.close();

connection.close();

}

}

當(dāng)您同時(shí)運(yùn)行Openwire消息發(fā)送者和Stomp消息接收者時(shí),您可以在ActiveMQ的管理界面看到這兩種協(xié)議的連接信息:

以下是Stomp協(xié)議消費(fèi)者接收到的消息內(nèi)容(經(jīng)過(guò)轉(zhuǎn)換的openwire協(xié)議消息):

frame.getAction() = MESSAGEframe.getBody() = 這是發(fā)送的消息內(nèi)容frame.getCommandId() =0


接下文

轉(zhuǎn):http://blog.csdn.net/yinwenjie/article/details/50916518

? ? ? ? ? http://blog.csdn.net/yinwenjie/article/details/50955502

? ? ? ? ? http://blog.csdn.net/yinwenjie/article/details/50991443

? ? ? ? ? http://blog.csdn.net/yinwenjie/article/details/51064242

? ? ? ? ? http://blog.csdn.net/yinwenjie/article/details/51124749

? ? ? ? ? http://blog.csdn.net/yinwenjie/article/details/51205822

? ? ? ? ? http://blog.csdn.net/yinwenjie/article/details/51247935

? ? ??

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末耀怜,一起剝皮案震驚了整個(gè)濱河市恢着,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌财破,老刑警劉巖掰派,帶你破解...
    沈念sama閱讀 206,013評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異左痢,居然都是意外死亡靡羡,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門俊性,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)略步,“玉大人,你說(shuō)我怎么就攤上這事定页√吮。” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 152,370評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵典徊,是天一觀的道長(zhǎng)杭煎。 經(jīng)常有香客問(wèn)我恩够,道長(zhǎng),這世上最難降的妖魔是什么羡铲? 我笑而不...
    開(kāi)封第一講書人閱讀 55,168評(píng)論 1 278
  • 正文 為了忘掉前任蜂桶,我火速辦了婚禮,結(jié)果婚禮上也切,老公的妹妹穿的比我還像新娘屎飘。我一直安慰自己,他們只是感情好贾费,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,153評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著檐盟,像睡著了一般褂萧。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上葵萎,一...
    開(kāi)封第一講書人閱讀 48,954評(píng)論 1 283
  • 那天导犹,我揣著相機(jī)與錄音,去河邊找鬼羡忘。 笑死谎痢,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的卷雕。 我是一名探鬼主播节猿,決...
    沈念sama閱讀 38,271評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼漫雕!你這毒婦竟也來(lái)了滨嘱?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 36,916評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤浸间,失蹤者是張志新(化名)和其女友劉穎太雨,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體魁蒜,經(jīng)...
    沈念sama閱讀 43,382評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡囊扳,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,877評(píng)論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了兜看。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锥咸。...
    茶點(diǎn)故事閱讀 37,989評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖铣减,靈堂內(nèi)的尸體忽然破棺而出她君,到底是詐尸還是另有隱情,我是刑警寧澤葫哗,帶...
    沈念sama閱讀 33,624評(píng)論 4 322
  • 正文 年R本政府宣布缔刹,位于F島的核電站球涛,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏校镐。R本人自食惡果不足惜亿扁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,209評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鸟廓。 院中可真熱鬧从祝,春花似錦、人聲如沸引谜。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,199評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)员咽。三九已至毒涧,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間贝室,已是汗流浹背契讲。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 31,418評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留滑频,地道東北人捡偏。 一個(gè)月前我還...
    沈念sama閱讀 45,401評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像峡迷,于是被迫代替她去往敵國(guó)和親银伟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,700評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容

  • ActiveMQ 即時(shí)通訊服務(wù) 淺析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk閱讀 1,479評(píng)論 0 11
  • ActiveMQ入門教程 本博客內(nèi)容皆為網(wǎng)絡(luò)搜集而來(lái)绘搞,不保證任何版權(quán)問(wèn)題枣申,不保證長(zhǎng)期有效性(即具有時(shí)效性),如有侵...
    龍圣賢閱讀 33,897評(píng)論 6 48
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理看杭,服務(wù)發(fā)現(xiàn)忠藤,斷路器,智...
    卡卡羅2017閱讀 134,599評(píng)論 18 139
  • 消息中間件 消息中間件有很多的用途和優(yōu)點(diǎn): 1. 將數(shù)據(jù)從一個(gè)應(yīng)用程序傳送到另一個(gè)應(yīng)用程序楼雹,或者從軟件的一個(gè)模塊傳...
    錯(cuò)位的季節(jié)閱讀 784評(píng)論 0 1
  • 文/小伊 圖/網(wǎng)絡(luò) 01. 生活中總能聽(tīng)到一些人哭訴著抱怨說(shuō),生活過(guò)得...
    小伊讀書閱讀 503評(píng)論 4 2