ActiveMq簡單介紹
一般常見的消息中間件有:RabbitMQ,ActiveMq,RocketMQ(阿里)等森爽,都稱之為MQ(Message Queue督函,消息隊列)嘀粱,這里介紹ActiveMQ。
ActiveMQ是Apache出品辰狡,最流行的锋叨,能力強勁的開源消息總線。從設(shè)計上保證了高性能的集群宛篇,客戶端-服務(wù)器娃磺,點對點。完全支持JMS1.1和J2EE1.4規(guī)范的JMSProvider實現(xiàn)叫倍,盡管JMS規(guī)范出臺已經(jīng)是很久的事情了偷卧,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
特性:
⒈多種語言和協(xié)議編寫客戶端吆倦。語言:Java,C,C++,C#,Ruby,Perl,Python,PHP听诸。應(yīng)用協(xié)議:OpenWire,StompREST,WSNotification,XMPP,AMQP
⒉完全支持JMS1.1和J2EE1.4規(guī)范(持久化,XA消息蚕泽,事務(wù))
⒊對Spring的支持晌梨,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
⒋通過了常見J2EE服務(wù)器(如Geronimo,JBoss4,GlassFish,WebLogic)的測試赛糟,其中通過JCA1.5resourceadaptors的配置派任,可以讓ActiveMQ可以自動的部署到任何兼容J2EE1.4商業(yè)服務(wù)器上
⒌支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍支持通過JDBC和journal提供高速的消息持久化
⒎從設(shè)計上保證了高性能的集群,客戶端-服務(wù)器璧南,點對點
⒏支持Ajax
⒐支持與Axis的整合
⒑可以很容易得調(diào)用內(nèi)嵌JMSprovider掌逛,進行測試
消息形式:
一種是點對點的,即一個生產(chǎn)者和一個消費者一一對應(yīng)司倚;
另一種是發(fā)布/訂閱模式豆混,即一個生產(chǎn)者產(chǎn)生消息并進行發(fā)送后,可以由多個消費者進行接收动知。
JMS定義了五種不同的消息正文格式皿伺,以及調(diào)用的消息類型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù)盒粮,提供現(xiàn)有消息格式的一些級別的兼容性鸵鸥。
? StreamMessage – Java原始值的數(shù)據(jù)流
? MapMessage–一套名稱-值對
? TextMessage–一個字符串對象
? ObjectMessage–一個序列化的 Java對象
? BytesMessage–一個字節(jié)的數(shù)據(jù)流
ActiveMq的安裝啟動
ActiveMQ的安裝啟動很簡單。
下載:進入http://activemq.apache.org/下載ActiveMQ
這里在linux下進行安裝。
1妒穴、將下載的ActiveMQ壓縮包上傳到linux(我是上傳到了/usr目錄下)宋税,用遠程連接工具(比如ssh)。
2讼油、解壓縮 :tar zxvf apache-activemq-5.15.4-bin.tar.gz
3杰赛、啟動:進入bin目錄 ./activemq start
查看是否啟動成功,./activemq status
如果沒有啟動成功矮台,那么就要修改hosts文件了乏屯,配置機器名和本機(127.0.0.1)的映射關(guān)系。比如本機名是laowang(可以使用命令/etc/sysconfig/network查看機器名稱)瘦赫。那么需要做如下的修改:
[root@laowang bin]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
127.0.0.1 laowang.localdomain itheima32//加上這一行
然后瀏覽器訪問:192.168.25.128:8161(8161是固定端口號)
那么說明啟動成功了辰晕。
注:點擊Manage ActiveMqQ broker輸入用戶名密碼為admin。
ActiveMQ在java中的使用
對照著上面兩種模式圖耸彪,編寫代碼的時候后會更容易理解伞芹。
創(chuàng)建個maven項目,導(dǎo)入的包是
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.14.0</version>
</dependency>
包版本超過5.14.0會和mybatis沖突
1蝉娜、點對點模式:生產(chǎn)者
/*
* 測試點對點模式:生產(chǎn)者
*/
@Test
public void testQueueProducer() throws JMSException{
//1.創(chuàng)建一個連接工廠對象,需要指定服務(wù)的ip及端口號
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
//2.使用工廠對象來創(chuàng)建一個Connection對象
Connection connection = factory.createConnection();
//3.開啟連接,調(diào)用Connection對象的start方法
connection.start();
//4.創(chuàng)建一個Session對象
//第一個參數(shù):是否開啟事務(wù),如果開啟事務(wù)true帶二個參數(shù)無意義扎唾,一般不開啟事務(wù)false
//第二個參數(shù):應(yīng)答模式召川。一般自動應(yīng)答或者手動應(yīng)答。一般自動應(yīng)答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.使用Session對象創(chuàng)建一個Destination對象胸遇。兩種形式queue,topic,現(xiàn)在應(yīng)該使用queue
Queue queue = session.createQueue("測試消息隊列First");
//6.使用Session對象創(chuàng)建一個Producer對象
MessageProducer producer = session.createProducer(queue);
//7.創(chuàng)建一個Message對象荧呐,可以使用TextMessage
Message message = session.createTextMessage("hello activemq!");
//8.發(fā)送消息
producer.send(message);
//9.關(guān)閉資源
producer.close();
session.close();
connection.close();
}
運行方法后,會發(fā)送一個消息纸镊,可以在頁面查看倍阐。
一條掛起的消息,一條入隊的消息,也能看到目的地跟消息詳情逗威,以及消息被持久化峰搪。
2、點對點模式:消費者
前面生產(chǎn)了一條消息凯旭,這里就測試消費者去消費這條消息概耻。
/*
* 測試點對點模式:消費者
*/
@Test
public void testQueueConsumer() throws Exception{
//1.創(chuàng)建連接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.開啟連接
connection.start();
//4.創(chuàng)建session對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建目的地,需要跟生產(chǎn)者發(fā)送消息的目的地一致不然不能接收到消息
Queue queue = session.createQueue("測試消息隊列First");
//使用Session對象創(chuàng)建一個消費者對象
MessageConsumer consumer = session.createConsumer(queue);
//接收消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待接收消息,消費完了就沒了
System.in.read();//等待我們敲擊鍵盤才會向下執(zhí)行
//關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
運行后
被掛起的消息已經(jīng)被消費了罐呼,一個消費者鞠柄,一個入隊消息一個出隊消息。如果再發(fā)生消息的話控制臺會持續(xù)(也就是消費者能持續(xù)接收生產(chǎn)者發(fā)送的消息)輸出直至按回下回車鍵才結(jié)束嫉柴。這個時候消費者數(shù)量也變?yōu)?了厌杜。
3、發(fā)布/訂閱模式:生產(chǎn)者
跟點對點模式的生產(chǎn)者很類似计螺,主要是創(chuàng)建的目的地為Topic而不是Queue了夯尽。
/*
* 發(fā)布訂閱模式:生產(chǎn)者
*/
@Test
public void testTopicProducer() throws Exception{
//1.創(chuàng)建連接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.開啟連接
connection.start();
//4.創(chuàng)建session對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建目的地Destination瞧壮,該模式創(chuàng)建的是Topic
Topic topic = session.createTopic("測試消息隊列First");
//6.創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(topic);
//7.創(chuàng)建消息
TextMessage message = session.createTextMessage("hello topic");
//8.發(fā)送消息
producer.send(message);
//9.關(guān)閉資源
producer.close();
session.close();
connection.close();
}
運行后去頁面點擊Topics查看
image.png
0個消費者,該模式默認(rèn)消息不被持久化呐萌,但是可以修改為持久化馁痴,只有一條入隊消息。
4肺孤、發(fā)布/訂閱模式:消費者
/*
* 發(fā)布訂閱模式:消費者
*/
@SuppressWarnings("unused")
@Test
public void testTopicConsumer() throws Exception{
//1.創(chuàng)建連接工廠
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.開啟連接
connection.start();
//4.創(chuàng)建session對象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建目的地Destination
Topic topic = session.createTopic("測試消息隊列First");
//6.創(chuàng)建消費者
MessageConsumer consumer = session.createConsumer(topic);
//7.接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage)message;
//獲取信息并打印
try {
String text = textMessage.getText();
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("----消費者1準(zhǔn)備就緒----");
System.in.read();
//9.關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
雖然前面發(fā)布了一條消息但是既然不能被持久化罗晕,那么這里自然就不能去接收該消息了。
所以需要先執(zhí)行該方法赠堵,使消費者處于持續(xù)能接收消息的狀態(tài)小渊,這樣生產(chǎn)者每次發(fā)送消息都能被接收到。
這里該測試方法運行三次茫叭。打印依次為:
System.out.println(“—-消費者1準(zhǔn)備就緒—-“);
System.out.println(“—-消費者2準(zhǔn)備就緒—-“);
System.out.println(“—-消費者3準(zhǔn)備就緒—-“);
就等于有三個消費者了酬屉。這個時候再執(zhí)行測試生產(chǎn)者的方法(執(zhí)行三次,等于發(fā)了三次消息)揍愁,就能發(fā)現(xiàn)三個消費者都接收到了三條消息呐萨。
查看控制臺:
----消費者1準(zhǔn)備就緒----
hello topic
hello topic
hello topic
----消費者2準(zhǔn)備就緒----
hello topic
hello topic
hello topic
----消費者3準(zhǔn)備就緒----
hello topic
hello topic
hello topic
再看頁面:
4條入隊消息(其中一條沒被消費),9條出隊消息(3個消費者每個消費3條消息)莽囤。