ActiveMQ
[toc]
簡書不支持 toc 目錄模式方仿,截圖一張蝗羊。
什么是ActiveMQ
ActiveMQ 是Apache出品藏澳,最流行的,能力強勁的開源消息總線耀找。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn),盡管JMS規(guī)范出臺已經(jīng)是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位翔悠。
主要特點:
- 多種語言和協(xié)議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP野芒。應用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化,XA消息,事務)
- 對Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
- 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業(yè)服務器上
- 支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通過JDBC和journal提供高速的消息持久化
- 從設(shè)計上保證了高性能的集群,客戶端-服務器,點對點
- 支持Ajax
- 支持與Axis的整合
- 可以很容易得調(diào)用內(nèi)嵌JMS provider,進行測試
ActiveMQ的消息形式
- 對于消息的傳遞有兩種類型:
- 一種是點對點的蓄愁,即一個生產(chǎn)者和一個消費者一一對應;
- 另一種是發(fā)布/訂閱模式狞悲,即一個生產(chǎn)者產(chǎn)生消息并進行發(fā)送后撮抓,可以由多個消費者進行接收。
JMS定義了五種不同的消息正文格式摇锋,以及調(diào)用的消息類型丹拯,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級別的兼容性荸恕。
- StreamMessage -- Java原始值的數(shù)據(jù)流
- MapMessage--一套名稱-值對
- TextMessage--一個字符串對象
- ObjectMessage--一個序列化的 Java對象
- BytesMessage--一個字節(jié)的數(shù)據(jù)流
ActiveMQ的安裝
進入http://activemq.apache.org/ 下載ActiveMQ
最新版本: 5.14.5
安裝環(huán)境:
需要jdk
安裝Linux系統(tǒng)咽笼。生產(chǎn)環(huán)境都是Linux系統(tǒng)。
安裝步驟
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統(tǒng)戚炫。
第二步:解壓縮。
第三步:啟動媳纬。
使用bin目錄下的activemq命令啟動:
[root@localhost bin]# ./activemq start
關(guān)閉:
[root@localhost bin]# ./activemq stop
查看狀態(tài):
[root@localhost bin]# ./activemq status
注意:如果ActiveMQ整合spring使用双肤,不要使用activemq-all-5.14.5.jar包(spring 可能少方法)。建議使用5.11.2
進入管理后臺:
http://192.168.25.168:8161/admin
用戶名:admin
密碼:admin
可能出現(xiàn)的問題:
405的問題:機器名和 ip 沒有對上钮惠,修改 host 文件茅糜。
查看 hostname,然后檢查 hosts 文件中是否相同素挽。
/etc/sysconfig/network
vim /etc/h/hosts
ActiveMQ的使用方法
Queue
點對點(point-to-point蔑赘,簡稱PTP)Queue消息傳遞模型
Producer
生產(chǎn)者:生產(chǎn)消息,發(fā)送端预明。
把jar包添加到工程中缩赛。使用5.11.2版本的jar包。
第一步:創(chuàng)建ConnectionFactory對象撰糠,需要指定服務端ip及端口號酥馍。
第二步:使用ConnectionFactory對象創(chuàng)建一個Connection對象。
第三步:開啟連接阅酪,調(diào)用Connection對象的start方法旨袒。
第四步:使用Connection對象創(chuàng)建一個Session對象汁针。
第五步:使用Session對象創(chuàng)建一個Destination對象(topic、queue)砚尽,此處創(chuàng)建一個Queue對象施无。
第六步:使用Session對象創(chuàng)建一個Producer對象。
第七步:創(chuàng)建一個Message對象必孤,創(chuàng)建一個TextMessage對象猾骡。
第八步:使用Producer對象發(fā)送消息。
第九步:關(guān)閉資源隧魄。
@Test
public void testQueueProducer() throws Exception {
// 第一步:創(chuàng)建ConnectionFactory對象卓练,需要指定服務端ip及端口號。
//brokerURL服務器的ip及端口號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:使用ConnectionFactory對象創(chuàng)建一個Connection對象购啄。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接襟企,調(diào)用Connection對象的start方法。
connection.start();
// 第四步:使用Connection對象創(chuàng)建一個Session對象狮含。
//第一個參數(shù):是否開啟事務顽悼。true:開啟事務,第二個參數(shù)忽略几迄。
//第二個參數(shù):當?shù)谝粋€參數(shù)為false時蔚龙,才有意義。消息的應答模式映胁。1木羹、自動應答2、手動應答解孙。一般是自動應答坑填。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象創(chuàng)建一個Destination對象(topic、queue)弛姜,此處創(chuàng)建一個Queue對象脐瑰。
//參數(shù):隊列的名稱。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session對象創(chuàng)建一個Producer對象廷臼。
MessageProducer producer = session.createProducer(queue);
// 第七步:創(chuàng)建一個Message對象苍在,創(chuàng)建一個TextMessage對象。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer對象發(fā)送消息荠商。
producer.send(textMessage);
// 第九步:關(guān)閉資源寂恬。
producer.close();
session.close();
connection.close();
}
Consumer
消費者:接收消息。
第一步:創(chuàng)建一個ConnectionFactory對象莱没。
第二步:從ConnectionFactory對象中獲得一個Connection對象掠剑。
第三步:開啟連接。調(diào)用Connection對象的start方法郊愧。
第四步:使用Connection對象創(chuàng)建一個Session對象朴译。
第五步:使用Session對象創(chuàng)建一個Destination對象井佑。和發(fā)送端保持一致queue,并且隊列的名稱一致眠寿。
第六步:使用Session對象創(chuàng)建一個Consumer對象躬翁。
第七步:接收消息。
第八步:打印消息盯拱。
第九步:關(guān)閉資源
@Test
public void testQueueConsumer() throws Exception {
// 第一步:創(chuàng)建一個ConnectionFactory對象盒发。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:從ConnectionFactory對象中獲得一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接狡逢。調(diào)用Connection對象的start方法宁舰。
connection.start();
// 第四步:使用Connection對象創(chuàng)建一個Session對象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象創(chuàng)建一個Destination對象奢浑。和發(fā)送端保持一致queue蛮艰,并且隊列的名稱一致。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session對象創(chuàng)建一個Consumer對象雀彼。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收消息壤蚜。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取消息的內(nèi)容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入
System.in.read();
// 第九步:關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
Topic
發(fā)布/訂閱(publish/subscribe徊哑,簡稱pub/sub)Topic消息傳遞模型
Producer
使用步驟:
第一步:創(chuàng)建ConnectionFactory對象袜刷,需要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象創(chuàng)建一個Connection對象莺丑。
第三步:開啟連接著蟹,調(diào)用Connection對象的start方法。
第四步:使用Connection對象創(chuàng)建一個Session對象梢莽。
第五步:使用Session對象創(chuàng)建一個Destination對象(topic草则、queue),此處創(chuàng)建一個Topic對象蟹漓。
第六步:使用Session對象創(chuàng)建一個Producer對象。
第七步:創(chuàng)建一個Message對象源内,創(chuàng)建一個TextMessage對象葡粒。
第八步:使用Producer對象發(fā)送消息。
第九步:關(guān)閉資源膜钓。
@Test
public void testTopicProducer() throws Exception {
// 第一步:創(chuàng)建ConnectionFactory對象嗽交,需要指定服務端ip及端口號。
// brokerURL服務器的ip及端口號
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:使用ConnectionFactory對象創(chuàng)建一個Connection對象颂斜。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接夫壁,調(diào)用Connection對象的start方法。
connection.start();
// 第四步:使用Connection對象創(chuàng)建一個Session對象沃疮。
// 第一個參數(shù):是否開啟事務盒让。true:開啟事務梅肤,第二個參數(shù)忽略。
// 第二個參數(shù):當?shù)谝粋€參數(shù)為false時邑茄,才有意義姨蝴。消息的應答模式。1肺缕、自動應答2左医、手動應答。一般是自動應答同木。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象創(chuàng)建一個Destination對象(topic浮梢、queue),此處創(chuàng)建一個topic對象彤路。
// 參數(shù):話題的名稱秕硝。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象創(chuàng)建一個Producer對象。
MessageProducer producer = session.createProducer(topic);
// 第七步:創(chuàng)建一個Message對象斩萌,創(chuàng)建一個TextMessage對象缝裤。
/*
* TextMessage message = new ActiveMQTextMessage(); message.setText(
* "hello activeMq,this is my first test.");
*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
// 第八步:使用Producer對象發(fā)送消息。
producer.send(textMessage);
// 第九步:關(guān)閉資源颊郎。
producer.close();
session.close();
connection.close();
}
Consumer
消費者:接收消息憋飞。
第一步:創(chuàng)建一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中獲得一個Connection對象姆吭。
第三步:開啟連接榛做。調(diào)用Connection對象的start方法。
第四步:使用Connection對象創(chuàng)建一個Session對象内狸。
第五步:使用Session對象創(chuàng)建一個Destination對象检眯。和發(fā)送端保持一致topic,并且話題的名稱一致昆淡。
第六步:使用Session對象創(chuàng)建一個Consumer對象锰瘸。
第七步:接收消息。
第八步:打印消息昂灵。
第九步:關(guān)閉資源
@Test
public void testTopicConsumer() throws Exception {
// 第一步:創(chuàng)建一個ConnectionFactory對象避凝。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:從ConnectionFactory對象中獲得一個Connection對象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接眨补。調(diào)用Connection對象的start方法管削。
connection.start();
// 第四步:使用Connection對象創(chuàng)建一個Session對象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對象創(chuàng)建一個Destination對象撑螺。和發(fā)送端保持一致topic含思,并且話題的名稱一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象創(chuàng)建一個Consumer對象。
MessageConsumer consumer = session.createConsumer(topic);
// 第七步:接收消息含潘。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
// 取消息的內(nèi)容
text = textMessage.getText();
// 第八步:打印消息饲做。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic的消費端03。调鬓。艇炎。。腾窝。");
// 等待鍵盤輸入
System.in.read();
// 第九步:關(guān)閉資源
consumer.close();
session.close();
connection.close();
}