概述
????ActiveMQ是由Apache出品的犹撒,一款最流行的鞭达,能力強(qiáng)勁的開源消息總線凭涂。ActiveMQ是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),它非呈唇快速早芭,支持多種語(yǔ)言的客戶端和協(xié)議典蝌,而且可以非常容易的嵌入到企業(yè)的應(yīng)用環(huán)境中本橙,并有許多高級(jí)功能埋凯。
特性
- 遵循JMS規(guī)范:ActiveMQ的各種特性是JMS1.1規(guī)范的實(shí)現(xiàn)握础。它們包括同步和異步消息傳遞辐董,一次和只有一次的消息傳遞,對(duì)于預(yù)訂者的持久消息等等禀综。依附于JMS規(guī)范意味著简烘,不論JMS消息提供者是誰(shuí),同樣的基本特性都是有效的定枷。(JMS可查看前篇博文(ActiveMQ基礎(chǔ)教程(一)JMS概述http://www.reibang.com/p/639627f88a6e)孤澎。
- 連接:ActiveMQ提供各種連接選擇,包括HTTP依鸥,HTTPS亥至,IP多點(diǎn)傳送,SSL贱迟,STOMP姐扮,TCP,UDP衣吠,XMPP等茶敏。大量的連接協(xié)議支持使之具有更好的靈活性。
- 支持多種語(yǔ)言客戶端:ActiveMQ對(duì)多種語(yǔ)言提供客戶端API缚俏,除了Java之外還有C/C++惊搏、.NET、Perl忧换、PHP恬惯、Ruby、Python等亚茬。這使得ActiveMQ能用在Java之外的其它語(yǔ)言中酪耳。很多其它語(yǔ)言都可以通過(guò)ActiveMQ提供的客戶端API使用ActiveMQ的全部特性。當(dāng)然刹缝,ActiveMQ代理器(broker)仍然是運(yùn)行在java虛擬機(jī)上碗暗,但是客戶端能夠使用其它的被支持的語(yǔ)言。
- 可插拔的持久性和安全:ActiveMQ提供多種持久性方案可供選擇梢夯,也可以完全按自己需求定制驗(yàn)證和授權(quán)言疗。例如,ActiveMQ通過(guò)KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence)颂砸,但也支持標(biāo)準(zhǔn)的JDBC方案噪奄。ActiveMQ可以通過(guò)配置文件提供簡(jiǎn)單的驗(yàn)證和授權(quán)死姚,也提供標(biāo)準(zhǔn)的JAAS登陸模塊。
- 簡(jiǎn)單的管理:ActiveMQ是為開發(fā)者設(shè)計(jì)的勤篮。它并不需要專門的管理工具知允,因?yàn)樗峁└鞣N易用且強(qiáng)大的管理特性。有很多方法去監(jiān)控ActiveMQ的各個(gè)方面叙谨,可以通過(guò)JMX使用JConsole或ActiveMQ web console;可以運(yùn)行ActiveMQ消息報(bào)告保屯;可以用命令行腳本手负;可以通過(guò)日志。
- 支持集群:為了利于擴(kuò)展姑尺,多個(gè)ActiveMQ broker能夠聯(lián)合工作竟终。這個(gè)方式就是network of brokers并且能支持多種拓?fù)浣Y(jié)構(gòu)。
安裝與管理后臺(tái)
安裝
????ActiveMQ官網(wǎng)下載地址:http://activemq.apache.org/download.html
????ActiveMQ 提供了Windows 和Linux切蟋、Unix 等幾個(gè)版本统捶。具體安裝方法請(qǐng)自行查找資料進(jìn)行安裝,博主這邊就不多敘述柄粹。
管理后臺(tái)
????安裝成功啟動(dòng)ActiveMQ服務(wù)后喘鸟,在瀏覽器輸入http://localhost:8161,用戶名密碼默認(rèn)都是 admin。下面為登陸成功后的頁(yè)面:
Queues頁(yè)面
????Queues是隊(duì)列方式消息驻右,從菜單欄中點(diǎn)擊Queues可以進(jìn)入到Queues頁(yè)面什黑,頁(yè)面主要內(nèi)容包括:
- Name:消息隊(duì)列的名稱。
- Number Of Pending Messages:未被消費(fèi)的消息數(shù)目堪夭。
- Number Of Consumers:消費(fèi)者的數(shù)量愕把。
- Messages Enqueued:進(jìn)入隊(duì)列的消息 ;進(jìn)入隊(duì)列的總消息數(shù)目森爽,包括已經(jīng)被消費(fèi)的和未被消費(fèi)的恨豁。 這個(gè)數(shù)量只增不減。
- Messages Dequeued:出了隊(duì)列的消息爬迟,可以理解為是被消費(fèi)掉的消息數(shù)量橘蜜。在Queues里它和進(jìn)入隊(duì)列的總數(shù)量相等(因?yàn)橐粋€(gè)消息只會(huì)被成功消費(fèi)一次),如果暫時(shí)不等是因?yàn)橄M(fèi)者還沒(méi)來(lái)得及消費(fèi)。
Topics頁(yè)面
????Topics是主題方式消息雕旨,從菜單欄中點(diǎn)擊Topics可以進(jìn)入到Topics頁(yè)面扮匠,頁(yè)面主要內(nèi)容包括:
- Name:主題名稱。
- Number Of Pending Messages:未被消費(fèi)的消息數(shù)目凡涩。
- Number Of Consumers:消費(fèi)者的數(shù)量棒搜。
- Messages Enqueued:進(jìn)入隊(duì)列的消息 ;進(jìn)入隊(duì)列的總消息數(shù)目活箕,包括已經(jīng)被消費(fèi)的和未被消費(fèi)的力麸。 這個(gè)數(shù)量只增不減。
- Messages Dequeued:出了隊(duì)列的消息,可以理解為是被消費(fèi)掉的消息數(shù)量克蚂。在Topics里闺鲸,因?yàn)槎嘞M(fèi)者從而導(dǎo)致數(shù)量會(huì)比入隊(duì)列數(shù)高。
Subscribers頁(yè)面
????Subscribers 是查看訂閱者的頁(yè)面埃叭,可以查看訂閱者的信息等摸恍。只在Topics消息類型中這個(gè)頁(yè)面才會(huì)有數(shù)據(jù)。
Connections頁(yè)面
????Connections頁(yè)面可以查看到所有的連接數(shù)赤屋。
使用
Queue消息模式
????點(diǎn)對(duì)點(diǎn)的模式主要建立在一個(gè)隊(duì)列上面立镶,當(dāng)連接一個(gè)列隊(duì)的時(shí)候,發(fā)送端不需要知道接收端是否正在接收类早,可以直接向ActiveMQ發(fā)送消息媚媒,發(fā)送的消息將會(huì)先進(jìn)入隊(duì)列中,如果有接收端在監(jiān)聽涩僻,則會(huì)發(fā)向接收端缭召,如果沒(méi)有接收端接收,則會(huì)保存在activemq服務(wù)器逆日,直到接收端接收消息嵌巷,點(diǎn)對(duì)點(diǎn)的消息模式可以有多個(gè)發(fā)送端,多個(gè)接收端室抽,但是一條消息晴竞,只會(huì)被一個(gè)接收端給接收到,哪個(gè)接收端先連上ActiveMQ狠半,則會(huì)先接收到噩死,而后來(lái)的接收端則接收不到那條消息。
生產(chǎn)者
public class JmsProducer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生產(chǎn)者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地神年,其實(shí)就是連接到哪個(gè)隊(duì)列已维,如果是點(diǎn)對(duì)點(diǎn),那么它的實(shí)現(xiàn)是Queue已日,如果是訂閱模式垛耳,那它的實(shí)現(xiàn)是Topic。這里我們創(chuàng)建一個(gè)名為queue.test的消息隊(duì)列飘千。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消息生產(chǎn)者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:發(fā)送消息堂鲜,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發(fā)送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/**
* 創(chuàng)建一條文本消息
*/
TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" + i);
System.out.println("發(fā)送消息:Activemq 發(fā)送消息" + i);
/**
* 通過(guò)消息生產(chǎn)者發(fā)出消息
*/
messageProducer.send(message);
}
}
}
運(yùn)行結(jié)果圖:
????我們可以看的护奈,當(dāng)運(yùn)行JmsProducer程序時(shí)缔莲,在ActiveMQ控制臺(tái),可以看到生產(chǎn)者往queue.test的隊(duì)列中發(fā)送了10條消息霉旗,因?yàn)檫@時(shí)還沒(méi)有消費(fèi)者痴奏,所以這邊的Number Of Pending Messages顯示的是10蛀骇,
Number Of Consumers顯示的是0,Messages Enqueued顯示的也是10读拆。
消費(fèi)者
public class JmsConsumer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地擅憔,其實(shí)就是連接到哪個(gè)隊(duì)列,如果是點(diǎn)對(duì)點(diǎn)檐晕,那么它的實(shí)現(xiàn)是Queue暑诸,如果是訂閱模式,那它的實(shí)現(xiàn)是Topic辟灰。這里我們創(chuàng)建一個(gè)名為queue.test的消息隊(duì)列屠列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
while (true) {
/**
* 接收數(shù)據(jù)的時(shí)間(等待) 100 ms
*/
TextMessage textMessage = (TextMessage) messageConsumer.receive(1000 * 100);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
運(yùn)行結(jié)果圖:
????我們可以看到,但運(yùn)行JmsConsumer程序時(shí)伞矩,在運(yùn)行程序的控制臺(tái)中我們可以看到消費(fèi)者消費(fèi)了剛剛生產(chǎn)者生產(chǎn)的消息。在ActiveMQ控制臺(tái)夏志,可以看到所以這邊的Number Of Pending Messages顯示的是0乃坤,Number Of Consumers顯示的是1,Messages Enqueued顯示的是10沟蔑,Messages Dequeued顯示的也是10湿诊,即消息被消費(fèi)。
???? 在前面的消費(fèi)者例子中瘦材,我們這邊使用while (true) 死循環(huán)來(lái)不停接受消息厅须。這樣很浪費(fèi)cpu資源,實(shí)際生產(chǎn)中不會(huì)這么做食棕。下面朗和,我們采用注冊(cè)一個(gè)監(jiān)聽器的方法,當(dāng)監(jiān)聽到有消息入隊(duì)列后簿晓,才去接收消息眶拉。
public class JmsConsumerMessageListener {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "queue.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地,其實(shí)就是連接到哪個(gè)隊(duì)列憔儿,如果是點(diǎn)對(duì)點(diǎn)忆植,那么它的實(shí)現(xiàn)是Queue,如果是訂閱模式谒臼,那它的實(shí)現(xiàn)是Topic朝刊。這里我們創(chuàng)建一個(gè)名為queue.test的消息隊(duì)列。
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:創(chuàng)建監(jiān)聽器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
運(yùn)行結(jié)果圖:
????當(dāng)生產(chǎn)者一生產(chǎn)消息到隊(duì)列中時(shí)蜈缤,我們的消費(fèi)者就馬上進(jìn)行消費(fèi)拾氓,注意程序中我們沒(méi)有將會(huì)話和連接關(guān)閉,因?yàn)楸O(jiān)聽器是異步的底哥,如果關(guān)閉后就無(wú)法接收到消息田盈。
Topic消息模式
????訂閱/發(fā)布模式掀虎,同樣可以有著多個(gè)發(fā)送端與多個(gè)接收端村斟,但是接收端與發(fā)送端存在時(shí)間上的依賴,就是如果發(fā)送端發(fā)送消息的時(shí)候易阳,接收端并沒(méi)有監(jiān)聽消息,那么ActiveMQ將不會(huì)保存消息吃粒,將會(huì)認(rèn)為消息已經(jīng)發(fā)送潦俺,換一種說(shuō)法,就是發(fā)送端發(fā)送消息的時(shí)候徐勃,接收端不在線事示,是接收不到消息的,哪怕以后監(jiān)聽消息僻肖,同樣也是接收不到的肖爵。這個(gè)模式還有一個(gè)特點(diǎn),那就是發(fā)送端發(fā)送的消息臀脏,將會(huì)被所有的接收端給接收到劝堪,不類似點(diǎn)對(duì)點(diǎn),一條消息只會(huì)被一個(gè)接收端給接收到揉稚。
發(fā)布者
public class JmsProducer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地秒啦,其實(shí)就是連接到哪個(gè)隊(duì)列,如果是點(diǎn)對(duì)點(diǎn)搀玖,那么它的實(shí)現(xiàn)是Queue余境,如果是訂閱模式,那它的實(shí)現(xiàn)是Topic
*/
Destination destination = null;
/**
* 消息生產(chǎn)者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地灌诅,這里我們創(chuàng)建一個(gè)名為topic.test的主題
*/
destination = session.createTopic(TOPIC_NAME);
/**
* 第五步:創(chuàng)建消息生產(chǎn)者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:發(fā)送消息芳来,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發(fā)送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
for (int i = 0; i < 10; i++) {
/**
* 創(chuàng)建一條文本消息
*/
TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" + i);
System.out.println("發(fā)送消息:Activemq 發(fā)送消息" + i);
/**
* 通過(guò)消息生產(chǎn)者發(fā)出消息
*/
messageProducer.send(message);
}
}
}
運(yùn)行結(jié)果圖:
訂閱者
public class JmsConsumer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String TOPIC_NAME = "topic.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地猜拾,其實(shí)就是連接到哪個(gè)隊(duì)列绣张,如果是點(diǎn)對(duì)點(diǎn),那么它的實(shí)現(xiàn)是Queue关带,如果是訂閱模式侥涵,那它的實(shí)現(xiàn)是Topic
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地,這里我們創(chuàng)建一個(gè)名為topic.test的主題
*/
destination = session.createTopic(TOPIC_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
運(yùn)行結(jié)果圖:
????我們可以發(fā)現(xiàn)宋雏,Topic消息模式的代碼跟Queue消息模式的代碼基本是一樣的芜飘,除了在創(chuàng)建消息目的地的時(shí)候,一個(gè)是queue一個(gè)是topic磨总;還有一點(diǎn)區(qū)別就是嗦明,Topic消息模式,訂閱者需要先訂閱蚪燕,才能接收到發(fā)布者發(fā)布的消息娶牌。
談?wù)凷ession
????在通過(guò)Connection創(chuàng)建Session的時(shí)候奔浅,需要設(shè)置2個(gè)參數(shù),一個(gè)是否支持事務(wù)诗良,另一個(gè)是簽收的模式汹桦。
????簽收就是消費(fèi)者接受到消息后,需要告訴消息服務(wù)器鉴裹,我收到消息了舞骆。當(dāng)消息服務(wù)器收到回執(zhí)后,本條消息將失效径荔。因此簽收將對(duì)PTP模式產(chǎn)生很大影響督禽。如果消費(fèi)者收到消息后,并不簽收总处,那么本條消息繼續(xù)有效狈惫,很可能會(huì)被其他消費(fèi)者消費(fèi)掉!
????簽收方式有三種:
- AUTO_ACKNOWLEDGE:表示在消費(fèi)者receive消息的時(shí)候自動(dòng)的簽收鹦马‰侍福客戶端發(fā)送和接收消息不需要做額外的工作。哪怕是接收端發(fā)生異常菠红,也會(huì)被當(dāng)作正常發(fā)送成功。
- CLIENT_ACKNOWLEDGE:表示消費(fèi)者receive消息后必須手動(dòng)的調(diào)用acknowledge()方法進(jìn)行簽收难菌。
- DUPS_OK_ACKNOWLEDGE:允許副本的確認(rèn)模式试溯。一旦接收方應(yīng)用程序的方法調(diào)用從處理消息處返回,會(huì)話對(duì)象就會(huì)確認(rèn)消息的接收郊酒;而且允許重復(fù)確認(rèn)遇绞。
發(fā)送消息的數(shù)據(jù)類型
????我們上面演示的全都是字符串的消息類型,但ActiveMQ支持的還有ObjectMessage燎窘,StreamMessage摹闽,MapMessage,BytesMessage等消息類型褐健。下面我們來(lái)看看其他消息類型是如何編寫的付鹿,以下都是以隊(duì)列的消息模式進(jìn)行。
ObjectMessage
傳輸對(duì)象
public class User implements Serializable {
private static final long serialVersionUID = 2504467948968634865L;
private String userName;
private String password;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"userName='" + userName + '\'' +
", password='" + password + '\'' +
'}';
}
}
生產(chǎn)者
public class JmsProducer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 設(shè)置所有對(duì)所有序列化包都信任
*/
connectionFactory.setTrustAllPackages(true);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生產(chǎn)者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地蚜迅,這里我們創(chuàng)建一個(gè)名為object.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消息生產(chǎn)者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:發(fā)送消息舵匾,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發(fā)送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 創(chuàng)建一條Object消息
*/
ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) session.createObjectMessage();
for (int i = 0; i < 10; i++) {
User user = new User();
user.setUserName("hyn" + i);
user.setPassword("qwe" + i);
System.out.println("發(fā)送消息:Activemq 發(fā)送消息" + user.toString());
/**
* 對(duì)象需要序列化
*/
objectMessage.setObject(user);
/**
* 通過(guò)消息生產(chǎn)者發(fā)出消息
*/
messageProducer.send(objectMessage);
}
}
}
運(yùn)行結(jié)果圖:
消費(fèi)者
public class JmsConsumerMessageListener {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "object.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 設(shè)置所有對(duì)所有序列化包都信任
*/
connectionFactory.setTrustAllPackages(true);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地谁不,這里我們創(chuàng)建一個(gè)名為object.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:創(chuàng)建監(jiān)聽器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
User user = (User) ((ActiveMQObjectMessage) message).getObject();
System.out.println("收到的消息:" + user.toString());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
運(yùn)行結(jié)果圖:
????從代碼中我們可以看的坐梯,ObjectMessage跟TextMessage代碼差不多,只不過(guò)有兩個(gè)地方需要注意:
- 所傳輸?shù)膶?duì)象必須是序列化的刹帕,也就是要實(shí)現(xiàn)Serializable接口吵血;
- 在創(chuàng)建連接工廠時(shí)谎替,需要添加對(duì)所有或需要傳輸?shù)男蛄谢瘜?duì)象所在的包為白名單,這個(gè)是從ActiveMQ 5.12.2 開始為了增強(qiáng)這個(gè)框架的安全性蹋辅,ActiveMQ將強(qiáng)制用戶配置可序列化的包名钱贯;
BytesMessage
????首先我們項(xiàng)目的資源目錄下新建兩個(gè)文件,producer.txt 和 consumer.txt晕翠,在producer.txt輸入如下內(nèi)容喷舀,consumer.txt為空。
生產(chǎn)者
public class JmsProducer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生產(chǎn)者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地淋肾,這里我們創(chuàng)建一個(gè)名為bytes.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消息生產(chǎn)者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:發(fā)送消息硫麻,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發(fā)送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 創(chuàng)建一條Byte消息
*/
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(getFileByte(System.getProperty("user.dir")+"/src/main/resources/producer.txt"));
messageProducer.send(bytesMessage);
}
/**
* 讀取文件
*
* @param fileUrl
* @return
*/
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
運(yùn)行結(jié)果圖:
消費(fèi)者
public class JmsConsumerMessageListener {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "bytes.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地樊卓,這里我們創(chuàng)建一個(gè)名為bytes.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:創(chuàng)建監(jiān)聽器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
BytesMessage bytesMessage = (BytesMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = bytesMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
運(yùn)行結(jié)果圖:
????從結(jié)果可以看出拿愧,consumer.txt的內(nèi)容結(jié)果跟product.txt內(nèi)容是一致的,即消息接收成功碌尔。當(dāng)然浇辜,發(fā)送文件的話我們也可以使用StreamMessage,下面我們來(lái)看看StreamMessage的使用唾戚。
StreamMessage
????同樣需要在項(xiàng)目中新建producer.txt 和 consumer.txt兩個(gè)文件柳洋;
生產(chǎn)者
public class JmsProducer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生產(chǎn)者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地,這里我們創(chuàng)建一個(gè)名為stream.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消息生產(chǎn)者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:發(fā)送消息叹坦,這個(gè)步驟包括創(chuàng)建消息熊镣,然后發(fā)送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發(fā)送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 創(chuàng)建一條streamMessage消息
*/
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeBytes(getFileByte(System.getProperty("user.dir") + "/src/main/resources/producer.txt"));
messageProducer.send(streamMessage);
}
/**
* 讀取文件
*
* @param fileUrl
* @return
*/
public static byte[] getFileByte(String fileUrl) {
byte[] buffer = null;
FileInputStream fileInputStream = null;
try {
fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
buffer = new byte[fileInputStream.available()];
fileInputStream.read(buffer);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return buffer;
}
}
運(yùn)行結(jié)果圖:
消費(fèi)者
public class JmsConsumerMessageListener {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "stream.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地,這里我們創(chuàng)建一個(gè)名為stream.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:創(chuàng)建監(jiān)聽器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
FileOutputStream fileOutputStream = null;
try {
StreamMessage streamMessage = (StreamMessage) message;
fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
byte[] content = new byte[1024];
int len;
while ((len = streamMessage.readBytes(content)) != -1) {
fileOutputStream.write(content, 0, len);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
運(yùn)行結(jié)果圖:
MapMessage
生產(chǎn)者
public class JmsProducer {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息生產(chǎn)者
*/
MessageProducer messageProducer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地募书,這里我們創(chuàng)建一個(gè)名為map.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消息生產(chǎn)者
*/
messageProducer = session.createProducer(destination);
/**
* 第六步:發(fā)送消息绪囱,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
*/
sendMessage(session, messageProducer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (null != session) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (null != connection) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發(fā)送消息
*
* @param session
* @param messageProducer
* @throws JMSException
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
/**
* 創(chuàng)建一條mapMessage消息
*/
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","hyn");
mapMessage.setInt("age",27);
messageProducer.send(mapMessage);
}
}
運(yùn)行結(jié)果圖:
消費(fèi)者
public class JmsConsumerMessageListener {
/**
* 默認(rèn)連接用戶名
*/
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默認(rèn)用戶密碼
*/
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默認(rèn)連接地址
*/
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final String QUEUE_NAME = "map.test";
public static void main(String[] args) {
/**
* 第一步:創(chuàng)建連接工廠
*/
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
/**
* 連接
*/
Connection connection = null;
/**
* 會(huì)話
*/
Session session = null;
/**
* 消息目的地
*/
Destination destination = null;
/**
* 消息消費(fèi)者
*/
MessageConsumer messageConsumer = null;
try {
/**
* 第二步:創(chuàng)建連接
*/
connection = connectionFactory.createConnection();
/**
* 啟動(dòng)連接
*/
connection.start();
/**
* 第三步:創(chuàng)建會(huì)話
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
/**
* 第四步:創(chuàng)建消息目的地莹捡,這里我們創(chuàng)建一個(gè)名為stream.test的消息隊(duì)列
*/
destination = session.createQueue(QUEUE_NAME);
/**
* 第五步:創(chuàng)建消費(fèi)者
*/
messageConsumer = session.createConsumer(destination);
/**
* 第六步:創(chuàng)建監(jiān)聽器
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
System.out.println("name:" + mapMessage.getString("name"));
System.out.println("age:" + mapMessage.getInt("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
// if (null != session) {
// try {
// session.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
// if (null != connection) {
// try {
// connection.close();
// } catch (JMSException e) {
// e.printStackTrace();
// }
// }
}
}
}
運(yùn)行結(jié)果圖:
ActiveMQ的應(yīng)用
保證消息的成功處理
????消息發(fā)送成功后鬼吵,接收端接收到了消息。然后進(jìn)行處理篮赢,但是可能由于某種原因齿椅,高并發(fā)也好,IO阻塞也好启泣,反正這條消息在接收端處理失敗了媒咳。而點(diǎn)對(duì)點(diǎn)的特性是一條消息,只會(huì)被一個(gè)接收端給接收种远,只要接收端A接收成功了涩澡,接收端B,就不可能接收到這條消息,如果是一些普通的消息還好妙同,但是如果是一些很重要的消息射富,比如說(shuō)用戶的支付訂單,用戶的退款粥帚,這些與金錢相關(guān)的胰耗,是必須保證成功的,那么這個(gè)時(shí)候要怎么處理呢芒涡?
????我們可以在創(chuàng)建session的時(shí)候使用 CLIENT_ACKNOWLEDGE 模式柴灯。創(chuàng)建session的時(shí)候是需要指定事務(wù)以及消息的處理模式的。我們之前是這樣創(chuàng)建session:
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
????AUTO_ACKNOWLEDGE的消息處理模式是當(dāng)消息發(fā)送給接收端之后费尽,就自動(dòng)確認(rèn)成功了赠群,而不管接收端有沒(méi)有處理成功,而一旦確認(rèn)成功后旱幼,就會(huì)把隊(duì)列里面的消息給清除掉查描,避免下一個(gè)接收端接收到同樣的消息。
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
而當(dāng)我們使用CLIENT_ACKNOWLEDGE的消息處理模式時(shí)柏卤,如果接收端不確認(rèn)消息冬三,那么activemq將會(huì)把這條消息一直保留,直到有一個(gè)接收端確定了消息缘缚。那么要怎么確認(rèn)消息呢勾笆?具體代碼如下:
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到的消息:" + textMessage.getText());
//確認(rèn)接收,并成功處理了消息
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
避免消息隊(duì)列的并發(fā)
主動(dòng)接收隊(duì)列消息
????之前的代碼里面桥滨,實(shí)現(xiàn)了一個(gè)監(jiān)聽器窝爪,監(jiān)聽消息的傳遞,這樣只要每有一個(gè)消息该园,都會(huì)即時(shí)的傳遞到程序中酸舍。但是帅韧,這樣的處理里初,在高并發(fā)的時(shí)候,因?yàn)樗潜粍?dòng)接收忽舟,并沒(méi)有考慮到程序的處理能力双妨,可能會(huì)壓跨系統(tǒng),那要怎么辦呢?
????答案就是把被動(dòng)變?yōu)橹鲃?dòng)叮阅,當(dāng)程序有著處理消息的能力時(shí)刁品,主動(dòng)去接收一條消息進(jìn)行處理
if(當(dāng)程序有能力處理){//當(dāng)程序有能力處理時(shí)接收
Message receive = consumer.receive();
//這個(gè)可以設(shè)置超時(shí)時(shí)間,超過(guò)則不等待消息
recieve.receive(10000);
//其實(shí)receive是一個(gè)阻塞式方法浩姥,一定會(huì)拿到值的
if(null != receive){
String text = ((TextMessage)receive).getText();
receive.acknowledge();
System.out.println(text);
}else{
//沒(méi)有值
}
}
使用多個(gè)接收端
????ActiveMQ是支持多個(gè)接收端的挑随,如果當(dāng)程序無(wú)法處理這么多數(shù)據(jù)的時(shí)候,可以考慮多個(gè)線程勒叠,或者增加服務(wù)器來(lái)處理兜挨。
消息有效期的管理
????這樣的場(chǎng)景也是有的膏孟,一條消息的有效時(shí)間,當(dāng)發(fā)送一條消息的時(shí)候拌汇,可能希望這條消息在指定的時(shí)間被處理柒桑,如果超過(guò)了指定的時(shí)間,那么這條消息就失效了噪舀,就不需要進(jìn)行處理了魁淳,那么我們可以使用ActiveMQ的設(shè)置有效期來(lái)實(shí)現(xiàn)。具體設(shè)置如下:
producer.setTimeToLive(long l);
過(guò)期消息与倡,處理失敗的消息如何處理
????過(guò)期的界逛、處理失敗的消息,將會(huì)被ActiveMQ置入“ActiveMQ.DLQ”這個(gè)隊(duì)列中蒸走。這個(gè)隊(duì)列是ActiveMQ自動(dòng)創(chuàng)建的仇奶。如果需要查看這些未被處理的消息,可以進(jìn)入這個(gè)隊(duì)列中查看:
//指定一個(gè)目的地比驻,也就是一個(gè)隊(duì)列的位置
destination = session.createQueue("ActiveMQ.DLQ");
????這樣就可以進(jìn)入隊(duì)列中该溯,然后實(shí)現(xiàn)接口,或者通過(guò)receive()方法别惦,就可以拿到未被處理的消息狈茉,從而保證正確的處理。
????整理文章主要為了自己日后復(fù)習(xí)用掸掸,文章中可能會(huì)引用到別的博主的文章氯庆,如涉及到博主的版權(quán)問(wèn)題,請(qǐng)博主聯(lián)系我扰付。