一秩铆、ActiveMQ消息中間件
在傳統(tǒng)的消息發(fā)送和接收模式上逝撬,一般是以同步的方式來發(fā)送接收消息,以同步的方式來推送消息對我們的服務(wù)有時(shí)造成了很大的影響巾陕,比如當(dāng)我們的服務(wù)器出現(xiàn)了故障魂奥,客戶端推送消息到服務(wù)器菠剩,但是服務(wù)器此時(shí)出現(xiàn)問題它不會返回給我們的客戶端它任何接收到消息的信息,這是我們的客戶端一直處于等待狀態(tài)耻煤,這樣不僅浪費(fèi)資源而且有時(shí)會對我們的產(chǎn)品造成很大的影響具壮,這是我們有一個好的主意,退出了JMS消息服務(wù)哈蝇,作為開源的JMS Provider棺妓,ActiveMQ幫助我們解決了這一麻煩的事情,首先我們來看一下使用消息中間件的圖解炮赦。
在這里我們可以看到我們?nèi)绻捎孟⒅虚g件的方式來推送我們的消息怜跑,這是當(dāng)我們的客戶端想給服務(wù)器發(fā)送消息的時(shí)候,我們只要將消息 發(fā)送出去吠勘,經(jīng)過消息中間件性芬,至于消息中間件什么時(shí)候推送消息給服務(wù)器,這些客戶端并不關(guān)系剧防,它直接返回植锉,因?yàn)椴捎玫氖钱惒降姆绞剑词宫F(xiàn)在服務(wù)器我們的服務(wù)器忙碌或者出現(xiàn)問題峭拘,也不影響我們消息的推送俊庇,當(dāng)下次服務(wù)器啟動的時(shí)候自然可以接收到消息搏熄。這里舉個簡單的例子就會對消息中間件有一個更好的理解,假如你要向你的老板請假暇赤,但此時(shí)你的boss正在開會,而你又急著趕飛機(jī)宵凌,這是該怎么辦呢鞋囊?當(dāng)然你可以給你的boss打電話,但是也許你一個電話導(dǎo)致會議中斷瞎惫,這時(shí)造成了不必要的麻煩溜腐,所以打電話似乎不太好,但是如果要你等的話瓜喇,飛機(jī)不等你挺益,這是我們想到一個更好的辦法,我們可以給boss留言乘寒,這樣你就可以去趕你的飛機(jī)望众,而老板開完會自然會看到你的留言,這樣都不耽誤伞辛,所以烂翰,這樣就輕松解決了我們的問題。采用消息中間件的平臺比如微信等等蚤氏。
這里我們主要介紹ActiveMQ消息中間件,它有兩種消息模式,一種是隊(duì)列模式试浙,一種是主題模式徒役。下面我們分別來介紹。
二于游、隊(duì)列模式
什么是隊(duì)列模式呢毁葱?所謂隊(duì)列模式,就是我們的消息消息生產(chǎn)者(Producer)向一個目的地(Desitinate)存放我們的消息贰剥,如果是多個消費(fèi)者我們可以它們會分別消費(fèi)這些消息头谜,沒個消息只能供一個消費(fèi)者使用,下面我們用圖解來清除表示:
從上面的圖示我們可以很清除的看出隊(duì)列模式的含義鸠澈,即每一個消息只能被一個消費(fèi)者消費(fèi)柱告,當(dāng)有一個消費(fèi)者使用該消息后,即從我們的隊(duì)列中取出該消息笑陈。
三际度、主題模式
如果我們上面理解了隊(duì)列模式,這里我們很容易理解主題模式涵妥,與之相反的乖菱,每個消息可以供多個消費(fèi)者使用,如圖所示。
四窒所、ActiveMQ實(shí)戰(zhàn)
經(jīng)過上面的一番講解鹉勒,相信大家都對兩種模式有了一個清楚的概念,下面吵取,我們來看一個ActiveMQ的工作流程圖禽额。
這里使用一個工廠類來創(chuàng)建一個連接,通過連接來創(chuàng)建一個會話皮官,然后通過會話來創(chuàng)建消費(fèi)者和生產(chǎn)者脯倒,這里有一個消息監(jiān)聽器用來監(jiān)聽消息隊(duì)列的,生產(chǎn)者和消費(fèi)者通過這個監(jiān)聽器來手法消息捺氢,先這樣簡單了解一下藻丢,我們通過例子來詳細(xì)了解。這里引用一句話摄乒,學(xué)習(xí)最好的方式就是實(shí)戰(zhàn)(這是spring3.x這本書中的一句話悠反,原話記不太清了,大概就是這樣馍佑,哈哈)问慎。下面我們來創(chuàng)建項(xiàng)目編寫我們的代碼。
1.首先挤茄,我們創(chuàng)建一個maven工程如叼,下面是所需要的依賴。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
首先穷劈,我們創(chuàng)建一個隊(duì)列模式下的生產(chǎn)者和消費(fèi)者
Producer
public class AppProducter {
private static final String URL="tcp://127.0.0.1:61616";
private static final String QueueName="QueueTest";
public static void main(String[] args) throws JMSException {
//創(chuàng)建工廠類
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connection conn=connectionFactory.createConnection();
//啟動連接
conn.start();
//創(chuàng)建會話
Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地(消息發(fā)送的目標(biāo))
Destination destination=session.createQueue(QueueName);
//創(chuàng)建消息發(fā)布者
MessageProducer producer= session.createProducer(destination);
for(int i=0;i<100;i++) {
//創(chuàng)建消息
TextMessage message=session.createTextMessage("test"+i);
producer.send(message);
System.out.println("發(fā)送消息"+i+" "+message.getText());
}
//關(guān)閉連接
conn.close();
}
}
Consumer
package cn.shinelon.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class AppConsumer {
private static final String URL="tcp://127.0.0.1:61616";
private static final String QueueName="QueueTest";
public static void main(String[] args) throws JMSException {
//創(chuàng)建工廠類
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connection conn=connectionFactory.createConnection();
//啟動連接
conn.start();
//創(chuàng)建會話
Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地(消息發(fā)送的目標(biāo))
Destination destination=session.createQueue(QueueName);
//創(chuàng)建消息發(fā)送者
MessageConsumer consumer=session.createConsumer(destination);
//創(chuàng)建監(jiān)聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
TextMessage message=(TextMessage) msg;
try {
System.out.println("接收消息:"+" "+message.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//因?yàn)榻邮障⑹钱惒降牧。圆荒荜P(guān)閉連接
}
}
上面代碼就不用多加解釋了,可以看代碼注釋歇终,運(yùn)行上面兩個程序社证,我們會發(fā)現(xiàn)一個消息只能被一個消費(fèi)者使用(自己動手試試)
接著,我們創(chuàng)建主題模式下的生產(chǎn)者和消費(fèi)者的運(yùn)行代碼评凝,基本和上面一樣追葡,這里貼出完成的代碼。
Producer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class AppProducter {
private static final String URL="tcp://127.0.0.1:61616";
private static final String topicName="topicTest";
public static void main(String[] args) throws JMSException {
//創(chuàng)建工廠類
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connection conn=connectionFactory.createConnection();
//啟動連接
conn.start();
//創(chuàng)建會話
Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地(消息發(fā)送的目標(biāo))
Destination destination=session.createTopic(topicName);
//創(chuàng)建消息發(fā)布者
MessageProducer producer= session.createProducer(destination);
for(int i=0;i<100;i++) {
//創(chuàng)建消息
TextMessage message=session.createTextMessage("test"+i);
producer.send(message);
System.out.println("發(fā)送消息"+i+" "+message.getText());
//關(guān)閉連接
conn.close();
}
}
Consumer
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import jvax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 與隊(duì)列模式不同的是主題模式要提前訂閱消息(即先啟動消費(fèi)者)奕短,不然消費(fèi)者不會受到任何消息
*
*/
public class AppConsumer {
private static final String URL="tcp://127.0.0.1:61616";
private static final String topicName="topicTest";
public static void main(String[] args) throws JMSException {
//創(chuàng)建工廠類
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(URL);
//創(chuàng)建連接
Connection conn=connectionFactory.createConnection();
//啟動連接
conn.start();
//創(chuàng)建會話
Session session=conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建目的地(消息發(fā)送的目標(biāo))
Destination destination=session.createTopic(topicName);
//創(chuàng)建消息發(fā)送者
MessageConsumer consumer=session.createConsumer(destination);
//創(chuàng)建監(jiān)聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
TextMessage message=(TextMessage) msg;
try {
System.out.println("接收消息:"+" "+message.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//因?yàn)榻邮障⑹钱惒降囊巳猓圆荒荜P(guān)閉連接
}
}
這時(shí)我們運(yùn)行代碼會發(fā)送每一個消息可以被多個消費(fèi)者使用,當(dāng)一個消息被一個消費(fèi)者使用戶翎碑,它不會取出來谬返,而是等待被下一個消費(fèi)者使用(與隊(duì)列模式恰好相反)。
通過上面的介紹日杈,我們了解到了ActiveMQ在兩種不同模式下實(shí)現(xiàn)簡單消息的推送遣铝,在后面的文章中我們會實(shí)現(xiàn)與spring的整合佑刷,歡迎討論。
文章來源于網(wǎng)絡(luò)酿炸。
感謝大家閱讀瘫絮,歡迎大家私信討論。給大家推薦一個Java技術(shù)交流群:473984645里面會分享一些資深架構(gòu)師錄制的視頻資料:有Spring填硕,MyBatis麦萤,Netty源碼分析,高并發(fā)廷支、高性能、分布式栓辜、微服務(wù)架構(gòu)的原理恋拍,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系藕甩。還能領(lǐng)取免費(fèi)的學(xué)習(xí)資源施敢,目前受益良多!
推薦大家閱讀:
Java高級架構(gòu)學(xué)習(xí)資料分享+架構(gòu)師成長之路?
個人整理了更多資料以PDF文件的形式分享給大家狭莱,需要查閱的程序員朋友可以來免費(fèi)領(lǐng)取僵娃。還有我的學(xué)習(xí)筆記PDF文件也免費(fèi)分享給有需要朋友!