ActiveMQ 簡介:ActiveMQ 是Apache出品,最流行的习勤,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)焙格,盡管JMS規(guī)范出臺已經(jīng)是很久的事情了图毕,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
在學(xué)習(xí)之前我們先要知道一下幾個問題
ActiveMQ特性
⒈ 多種語言和協(xié)議編寫客戶端间螟。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP吴旋。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
⒉ 完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化损肛,XA消息厢破,事務(wù))
⒊ 對Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去治拿,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE服務(wù)器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試摩泪,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業(yè)服務(wù)器上
⒌ 支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設(shè)計(jì)上保證了高性能的集群劫谅,客戶端-服務(wù)器见坑,點(diǎn)對點(diǎn)
⒏ 支持Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易的調(diào)用內(nèi)嵌JMS provider嚷掠,進(jìn)行測試
為什么要用ActiveMQ?
高性能的數(shù)據(jù)分發(fā):ActiveMQ的這個特性主要關(guān)注的是消息的吞吐率以及高效的消息投遞路由荞驴,中心思想就是在一個大的網(wǎng)絡(luò)中盡可能快的傳遞大量的并且快速改變的消息數(shù)據(jù)不皆。
鑒于大量的數(shù)據(jù)和頻繁的數(shù)據(jù)數(shù)據(jù)交換負(fù)荷很高,所以這種情況下很少使用數(shù)據(jù)持久化熊楼,在失敗時丟失幾條數(shù)據(jù)也是可以接受的因?yàn)槔系臄?shù)據(jù)通常都不再被需要了霹娄,最新的數(shù)據(jù)才是真正我們關(guān)心的。
集群和通用的異步消息模型:這種特性重點(diǎn)在網(wǎng)絡(luò)延遲和速度鲫骗,當(dāng)實(shí)現(xiàn)一個web或者EJB集群的時候犬耻,目的是維護(hù)一個node集群,典型的是使用多點(diǎn)廣播來discovery&keep-alive然后使用socket直接連接這些node來進(jìn)行高效的通信执泰。
這和使用JMS provider在EJB-Style或者WS-style的服務(wù)中作為RMI層是很相似的枕磁,都能使用多點(diǎn)廣播來discovery&keep-alive并且使用socket直接連接通信以減少延遲。所以與其使用不同的服務(wù)器來協(xié)調(diào)client之間的通信术吝,不如讓client直接和彼此通信來減少延遲计济。
Ps: 此段主要講的是activeMQ的node之間會有高效的異步通信機(jī)制,網(wǎng)絡(luò)延遲小并且高效
網(wǎng)絡(luò)數(shù)據(jù)流:這種特性關(guān)注點(diǎn)是activeMQ的ajax支持排苍,越來越多的人希望數(shù)據(jù)流能實(shí)時的傳遞到網(wǎng)絡(luò)瀏覽器中峭咒,例如金融行業(yè)的股票價格數(shù)據(jù),實(shí)時的在展示IM會話纪岁,實(shí)時拍賣并且動態(tài)更新內(nèi)容和消息凑队。
鑒于這種情況,我們把ActiveMQ集成到了web容器中來提供封閉的網(wǎng)絡(luò)集成幔翰,使用HTTP POSTS來發(fā)布消息并且在js中通過HTTP GET來接受并展示消息漩氨。
簡易的使用HTTP來傳遞消息的API:ActiveMQ這種特性主要關(guān)注跨語言跨技術(shù)的連接能力,我們?yōu)閙essage broker提供了一個HTTP接口允許跨語言或者技術(shù)來進(jìn)行簡單的發(fā)送和接受消息遗增。使用HTTP POST將消息發(fā)送到message broker叫惊,使用HTTP GET從message broker獲取消息,使用URI并且指定參數(shù)來決定接受/發(fā)送的目的地做修。
什么場景下使用了ActiveMQ霍狰?
在分布式應(yīng)用的環(huán)境中,多個獨(dú)立運(yùn)行的系統(tǒng)之間饰及,需要相互調(diào)用蔗坯,相互通訊的場景下使用。
在我們的商城中有很多地方需要用到ActiveMQ通訊的燎含,我簡單列舉幾個吧:
商品系統(tǒng)在確認(rèn)訂單后宾濒,發(fā)送一個“生產(chǎn)訂單”的主題消息到ActiveMQ,訂單系統(tǒng)收到該主題消息后會做相應(yīng)的生產(chǎn)訂單及相關(guān)的所有業(yè)務(wù)操作屏箍。
在支付界面中绘梦,點(diǎn)擊確認(rèn)支付橘忱,調(diào)用第三方支付接口,支付成功后回調(diào)我們的支付系統(tǒng)卸奉,然后支付系統(tǒng)會發(fā)送一個“支付成功”的主題消息到ActiveMQ钝诚,倉儲系統(tǒng)收到該主題消息后會做庫存的相關(guān)操作及其相應(yīng)的業(yè)務(wù)。
在網(wǎng)頁靜態(tài)化中榄棵,我們修改了某些數(shù)據(jù)敲长,那么會發(fā)送一個“生成靜態(tài)化”的隊(duì)列消息到ActiveMQ,負(fù)責(zé)生成靜態(tài)化頁面的方法接收到該消息后秉继,從新生成靜態(tài)化頁面祈噪,從而達(dá)到更新頁面數(shù)據(jù)的目的。
JMS
JMS(Java Messaging Service)應(yīng)用程序接口,是Java平臺上有關(guān)面向消息中間件(MOM)的技術(shù)規(guī)范API尚辑,它便于消息系統(tǒng)中的Java應(yīng)用程序進(jìn)行消息交換,并且通過提供標(biāo)準(zhǔn)的產(chǎn)生辑鲤、發(fā)送、接收消息的接口簡化企業(yè)應(yīng)用的開發(fā)杠茬,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息瓢喉,進(jìn)行異步通信。翻譯為Java消息服務(wù)栓票。
JMS規(guī)范:
JMS定義了Java中訪問消息中間件的接口,并沒有給予實(shí)現(xiàn)走贪,實(shí)現(xiàn)JSM接口的消息中間件稱為JMS Provider,例如ActiveMQ
幾個關(guān)于JMS的概念:
1坠狡、JMS Provider:實(shí)現(xiàn)JMS接口和規(guī)范的消息中間件
2、JMS Message:JMS消息逃沿,分3個部分:
(1)消息頭:每個消息頭字段都有相應(yīng)的getter和setter方法
(2)消息屬性:如果需要除消息頭字段以外的值,那么可以使用消息屬性
(3)消息體:封裝具體的消息數(shù)據(jù)
3凯亮、JMS Producer:消息生產(chǎn)者,創(chuàng)建和發(fā)送JMS消息的客戶端應(yīng)用
4触幼、JMS Consumer:消息消費(fèi)者,接收和處理JMS消息的客戶端應(yīng)用
5置谦、JMS Domain:消息傳遞域,JMS規(guī)范中定義了兩種消息傳遞域:
(1)點(diǎn)對點(diǎn)(point-to-point瘟栖,簡寫PTP或P2P)消息傳遞域,該消息傳遞域發(fā)送的消息目的地稱為隊(duì)列(queue)
特點(diǎn):
a半哟、每個消息只能有一個消費(fèi)者
b、消息的生產(chǎn)者和消費(fèi)者之間沒有時間上的相關(guān)性签餐,無論消息消費(fèi)者在提取消息的時候寓涨,消息生產(chǎn)者是否處于運(yùn)行狀態(tài),消息消費(fèi)者還是可以提取消息
(2)發(fā)布/訂閱(publish/subscribe氯檐,簡寫pub/sub)消息傳遞域戒良,該消息傳遞域發(fā)送的消息目的地稱為主題(topic)
特點(diǎn):
a、每個消息可以有多個消費(fèi)者
b冠摄、生產(chǎn)者和消費(fèi)者之間有時間上的相關(guān)性糯崎,訂閱一個主題的消費(fèi)者只能消費(fèi)自它訂閱之后發(fā)布的消息。
ActiveMQ和JMS是什么關(guān)系河泳?
JMS和ActiveMQ是接口和實(shí)現(xiàn)的關(guān)系沃呢,ActiveMQ完全支持JMS1.1規(guī)范的JMS Provider實(shí)現(xiàn)的消息中間件(MOM)
JMS的消息傳遞域有什么?各自有什么特點(diǎn)拆挥?簡單列舉一下各自的一些使用場景
JMS的消息傳遞域有點(diǎn)對點(diǎn)(point-to-point薄霜,簡寫PTP或P2P)消息傳遞域和發(fā)布/訂閱(publish/subscribe,簡寫pub/sub)消息傳遞域纸兔。
點(diǎn)對點(diǎn)消息傳遞域的特點(diǎn):每個消息只能有一個消費(fèi)者并且消息的生產(chǎn)者和消費(fèi)者之間沒有時間上的相關(guān)性黄锤,無論消息消費(fèi)者在提取消息的時候,消息生產(chǎn)者是否處于運(yùn)行狀態(tài)食拜,消息消費(fèi)者還是可以提取消息
發(fā)布/訂閱消息傳遞域的特點(diǎn):每個消息可以有多個消費(fèi)者并且生產(chǎn)者和消費(fèi)者之間有時間上的相關(guān)性鸵熟,訂閱一個主題的消費(fèi)者只能消費(fèi)自它訂閱之后發(fā)布的消息。
ActiveMQ的原理是什么负甸?
首先需要創(chuàng)建連接工廠(ConnectionFactory )流强,用于連接ActiveMQ的服務(wù)端,其中ConnectionFactory 又有兩個子類分別是隊(duì)列連接工廠(QueueConnectionFactory)和主題連接工廠(TopicConnectionFactory)呻待,在我們發(fā)送消息的時候具體使用哪個連接工廠是由消息發(fā)送的目的地(Destination)來決定的打月,Destination是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費(fèi)者的消息來源。對于消息生產(chǎn)者來說蚕捉,它的Destination是某個隊(duì)列(Queue)或某個主題(Topic)奏篙,對于消息消費(fèi)者來說,它的Destination也是某個隊(duì)列或主題(即消息來源)
然后在確定了使用哪種連接工廠后,就通過該連接工廠創(chuàng)建出連接對象(Connection)秘通,該連接對象是對TCP/IP socket的包裝为严。連接對象可以產(chǎn)生多個會話(session),通過會話來發(fā)送消息(Message)肺稀,這就是我們發(fā)送和接受的消息第股。
消息中間件
消息中間件MOM(Message Oriented Middleware)是在分布式環(huán)境中夕吻,兩個或多個獨(dú)立運(yùn)行的系統(tǒng)之間涉馅,提供消息通訊作用的中介稚矿。
如圖所示,假設(shè)當(dāng)我們客戶端現(xiàn)在需要確認(rèn)商品的訂單,首先需要調(diào)用商品服務(wù)的方法,商品系統(tǒng)服務(wù)不能直接與訂單系統(tǒng)通信.這時商品系統(tǒng)只能通過發(fā)送消息msg1(id=xxx)到消息中間件,當(dāng)訂單系統(tǒng)接收到消息中間件的消息msg1時就會立刻執(zhí)行生成訂單的方法.這就是我們消息中間件的角色
消息中間件的作用:
把各個系統(tǒng)之間服務(wù)的調(diào)用以消息通訊的方式交互
消息中間件的特點(diǎn):
1盐捷、消息異步接收碉渡,消息發(fā)送者不需要等待消息接收著的響應(yīng)滞诺,提高整個應(yīng)用程序的性能
2习霹、消息可靠接收:消息發(fā)送出去后保存在一個中間容器內(nèi)淋叶,只有消息接收者收到消息后才刪除消息
消息中間件的主要應(yīng)用場景:
在多個系統(tǒng)間進(jìn)行通訊的時候煞檩,通常會要求:
(1)可靠傳輸栅贴,數(shù)據(jù)不能丟檐薯,有的時候,也會要求不能重復(fù)傳輸
(2)異步傳輸捆昏,否則各個系統(tǒng)同步發(fā)送接收數(shù)據(jù)屡立,相互等待搀军,造成系統(tǒng)性能低下
比較流行的消息中間件有哪些:
收費(fèi):IBM MQSeries罩句,BEA WebLogic JMS Server门烂,Oracle AQ
開源:ActiveMQ屯远,RocketMQ捕虽,Kafka
ActiveMQ安裝使用
下載ActiveMQ運(yùn)行程序:
1泄私、從官網(wǎng)上下載運(yùn)行程序:http://activemq.apache.org/download.html
2晌端、拷貝到你要安裝的位置直接解壓就好了
啟動ActiveMQ
Windows版:雙擊bin目錄下activemq.bat
Unix/Linux/Cygwin版本:1.需要使用終端命令(cd +apache-activemq-5.10.0的根目錄 )
cd /Users/zhangshuai/Library/apache-activemq-5.10.0
- bin/activemq start
然后訪問地址為localhost:8161/admin
如果需要輸入賬號與密碼可以直接都輸入admin
在conf目錄中有個activemq.xml文件蓬痒,該文件有3個重要設(shè)置:
1梧奢、設(shè)置ActiveMQ的消息監(jiān)聽端口粹断,默認(rèn)為61616
2瓶埋、設(shè)置ActiveMQ的管理界面的端口,默認(rèn)為8161曾撤。ActiveMQ使用的是內(nèi)嵌的jetty服務(wù)器來運(yùn)行它提供的管理界面的挤悉,該管理界面的訪問地址為localhost:8161/admin
3装悲、設(shè)置ActiveMQ的訪問用戶名和密碼诀诊,默認(rèn)用戶名和密碼都為admin阅嘶。設(shè)置方法讯柔,添加如下配置:
<!-- 添加訪問ActiveMQ的賬號密碼 -->
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="zhangshuai" password="123456" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
Springboot整合ActiveMQ
在這里我以一個簡單商城為案例
1.向商品的服務(wù)端shop-server-goods和shop-server-order的pom.xml中添加springboot-ActiveMQ依賴
<!--spring-boot的activeMQ依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.向商品的服務(wù)端shop-server-goods和shop-server-order的application.properties添加屬性配置
在springboot應(yīng)用程序中的主配置文件application.properties文件中添加如下ActiveMQ的屬性配置
#activeMQ部署地址
spring.activemq.broker-url=tcp://127.0.0.1:61616
#activeMQ訪問用戶名
spring.activemq.user=admin
#activeMQ訪問密碼
spring.activemq.password=admin
主要我們的兩個服務(wù)端都連接到同一個ActiveMQ上了
接下來我們先演示發(fā)送Queue模型的消息(以創(chuàng)建訂單為例子)
發(fā)送queue模型的消息的關(guān)鍵是創(chuàng)建一個ActiveMQQueue對象
創(chuàng)建一個消息生產(chǎn)者類CreateOrderInfoProducer.java
/**
* 創(chuàng)建訂單的消息生產(chǎn)類
*/
@Component//交給spring容器管理
public class CreateOrderInfoProducer {
//確定消息發(fā)送到哪個地點(diǎn)
Destination destination = new ActiveMQQueue("shop.queue.createOrderInfo");
//注入spirng幫我們封裝好的JMS模板類
@Autowired
private JmsTemplate jmsTemplate;
/**
* 發(fā)送創(chuàng)建訂單消息的方法
* userId 哪個用戶下的訂單
*/
public void sendMessage(Long userId){
//實(shí)用JMS模板類發(fā)送消息
jmsTemplate.convertAndSend(destination,userId);
}
}
在需要發(fā)送該消息的業(yè)務(wù)調(diào)用:
//調(diào)用發(fā)送創(chuàng)建訂單消息的方法
createOrderInfoProducer.sendMessage(1L);
在默認(rèn)情況下消息消費(fèi)者監(jiān)聽的是Queue消息域的模型
創(chuàng)建訂單消息的消費(fèi)對象
/**
* 創(chuàng)建訂單消息的消費(fèi)對象
*/
//把消息接收者交給spring容器管理
@Component
public class CreateOrderInfoConsumer {
@Autowired
OrderInfoMapper dao;
/**
* 使用spring給我們提供的JMS監(jiān)聽注解,監(jiān)聽一個消息地點(diǎn)
* 注意极祸,監(jiān)聽的消息地點(diǎn)要和消息發(fā)送到的地點(diǎn)一致
* @param userId 消息生產(chǎn)者發(fā)送過來的userId
*/
@JmsListener(destination = "shop.queue.createOrderInfo")
public void receiveMessage(Long userId) {
//做具體的業(yè)務(wù)
OrderInfo orderInfo = new OrderInfo();
orderInfo.setUserId(userId);
orderInfo.setOrderSn("222222");
orderInfo.setOrderStatus(Byte.valueOf("0"));
orderInfo.setPayStatus(Byte.valueOf("0"));
orderInfo.setShippingStatus(Byte.valueOf("0"));
dao.insert(orderInfo);
}
}
發(fā)送Topic模型的消息
消息生產(chǎn)者:發(fā)送Topic模型的消息的關(guān)鍵是創(chuàng)建一個ActiveMQTopic對象,還有把消息發(fā)送到的地方修改一下浴捆,其余的代碼跟Queue模型是一樣的
消息消費(fèi)者:因?yàn)樵谀J(rèn)情況下消息消費(fèi)者監(jiān)聽的是Queue消息域的模型稿械,所以想要springboot應(yīng)用監(jiān)聽Topic消息域模型的消息美莫,就必須修改springboot的默認(rèn)配置
修改方法,在消息消費(fèi)者應(yīng)用的application.properties文件添加一個配置屬性:
是否使用訂閱和發(fā)布的模型監(jiān)聽器窝撵,默認(rèn)為false碌奉,也就是說默認(rèn)使用的是點(diǎn)對點(diǎn)的模型監(jiān)聽器
spring.jms.pub-sub-domain=true
還有把監(jiān)聽的消費(fèi)接收地方修改一下,其余的代碼跟Queue消息域的模型一樣
Queue和Topic兩種模型同時存在
新建一個配置類:springboot配置類嫉拐,使ActiveMQ同時支持監(jiān)聽Queue和Topic模型的消息
自定義模型的監(jiān)聽容器bean:
在新建的配置類中添加兩個bean婉徘,分別用于監(jiān)聽Queue和Topic兩個模型的消息盖呼,并且注入到spring容器中
/**
* 用于監(jiān)聽Topic模型消息的容器bean
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
//是否使用訂閱和發(fā)布的模型監(jiān)聽器塌计,默認(rèn)為false侯谁,也就是說默認(rèn)使用的是點(diǎn)對點(diǎn)的模型監(jiān)聽器
bean.setPubSubDomain(true);
bean.setConnectionFactory(connectionFactory);
return bean;
}
/**
* 用于監(jiān)聽Queue模型消息的容器bean
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(connectionFactory);
return bean;
}
按需選擇監(jiān)聽器容器
現(xiàn)在我們已經(jīng)定義出了兩個監(jiān)聽器的容器墙贱,分別用于監(jiān)聽Queue和Topic模型的消息了惨撇,現(xiàn)在要做的就是在消息消費(fèi)者中選擇自己需要監(jiān)聽的是哪一種模型
@JmsListener(destination = "shop.topic.createOrderInfo",containerFactory="jmsListenerContainerTopic")
在原本的@JmsListener基礎(chǔ)上加一個屬性:containerFactory魁衙,該屬性值就是我們自定義監(jiān)聽器的Bean的方法名