什么是activeMQ
activeMQ是一種開源的移层,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件赫粥,為應(yīng)用程序提供高效的观话、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級(jí)消息通信越平。
ActiveMQ的作用以及原理
Activemq 的作用就是系統(tǒng)之間進(jìn)行通信剃毒。 當(dāng)然可以使用其他方式進(jìn)行系統(tǒng)間通信暴备, 如果使用 Activemq 的話可以對(duì)系統(tǒng)之間的調(diào)用進(jìn)行解耦, 實(shí)現(xiàn)系統(tǒng)間的異步通信。 原理就是生產(chǎn)者生產(chǎn)消息胰伍, 把消息發(fā)送給activemq台汇。 Activemq 接收到消息衷咽, 然后查看有多少個(gè)消費(fèi)者悬嗓, 然后把消息轉(zhuǎn)發(fā)給消費(fèi)者, 此過程中生產(chǎn)者無需參與。 消費(fèi)者接收到消息后做相應(yīng)的處理和生產(chǎn)者沒有任何關(guān)系
activemq的幾種通信方式
(1)publish(發(fā)布)-subscribe(訂閱)(發(fā)布-訂閱方式)
發(fā)布/訂閱方式用于多接收客戶端的方式.作為發(fā)布訂閱的方式舟肉,可能存在多個(gè)接收客戶端修噪,并且接收端客戶端與發(fā)送客戶端存在時(shí)間上的依賴。一個(gè)接收端只能接收他創(chuàng)建以后發(fā)送客戶端發(fā)送的信息路媚。作為subscriber ,在接收消息時(shí)有兩種方法割按,destination的receive方法,和實(shí)現(xiàn)message listener 接口的onMessage 方法磷籍。
(2)p2p(point-to-point)(點(diǎn)對(duì)點(diǎn))
p2p的過程則理解起來比較簡(jiǎn)單适荣。它好比是兩個(gè)人打電話,這兩個(gè)人是獨(dú)享這一條通信鏈路的院领。一方發(fā)送消息弛矛,另外一方接收,就這么簡(jiǎn)單比然。
ActiveMQ服務(wù)器宕機(jī)怎么辦丈氓?
這得從ActiveMQ的儲(chǔ)存機(jī)制說起。在通常的情況下强法,非持久化消息是存儲(chǔ)在內(nèi)存中的万俗,持久化消息是存儲(chǔ)在文件中的,它們的最大限制在配置文件的<systemUsage>節(jié)點(diǎn)中配置饮怯。但是闰歪,在非持久化消息堆積到一定程度,內(nèi)存告急的時(shí)候蓖墅,ActiveMQ會(huì)將內(nèi)存中的非持久化消息寫入臨時(shí)文件中库倘,以騰出內(nèi)存。雖然都保存到了文件里论矾,但它和持久化消息的區(qū)別是教翩,重啟后持久化消息會(huì)從文件中恢復(fù),非持久化的臨時(shí)文件會(huì)直接刪除贪壳。
解決方案:盡量不要用非持久化消息饱亿,非要用的話,將臨時(shí)文件限制盡可能的調(diào)大闰靴。
丟消息怎么辦彪笼?
這得從java的java.net.SocketException異常說起。簡(jiǎn)單點(diǎn)說就是當(dāng)網(wǎng)絡(luò)發(fā)送方發(fā)送一堆數(shù)據(jù)传黄,然后調(diào)用close關(guān)閉連接之后杰扫。這些發(fā)送的數(shù)據(jù)都在接收者的緩存里队寇,接收者如果調(diào)用read方法仍舊能從緩存中讀取這些數(shù)據(jù)膘掰,盡管對(duì)方已經(jīng)關(guān)閉了連接。但是當(dāng)接收者嘗試發(fā)送數(shù)據(jù)時(shí),由于此時(shí)連接已關(guān)閉识埋,所以會(huì)發(fā)生異常凡伊,這個(gè)很好理解。不過需要注意的是窒舟,當(dāng)發(fā)生SocketException后系忙,原本緩存區(qū)中數(shù)據(jù)也作廢了,此時(shí)接收者再次調(diào)用read方法去讀取緩存中的數(shù)據(jù)惠豺,就會(huì)報(bào)Software caused connection abort: recv failed錯(cuò)誤银还。
通過抓包得知,ActiveMQ會(huì)每隔10秒發(fā)送一個(gè)心跳包洁墙,這個(gè)心跳包是服務(wù)器發(fā)送給客戶端的蛹疯,用來判斷客戶端死沒死。如果你看過上面第一條热监,就會(huì)知道非持久化消息堆積到一定程度會(huì)寫到文件里捺弦,這個(gè)寫的過程會(huì)阻塞所有動(dòng)作,而且會(huì)持續(xù)20到30秒孝扛,并且隨著內(nèi)存的增大而增大列吼。當(dāng)客戶端發(fā)完消息調(diào)用connection.close()時(shí),會(huì)期待服務(wù)器對(duì)于關(guān)閉連接的回答苦始,如果超過15秒沒回答就直接調(diào)用socket層的close關(guān)閉tcp連接了寞钥。這時(shí)客戶端發(fā)出的消息其實(shí)還在服務(wù)器的緩存里等待處理,不過由于服務(wù)器心跳包的設(shè)置陌选,導(dǎo)致發(fā)生了java.net.SocketException異常凑耻,把緩存里的數(shù)據(jù)作廢了,沒處理的消息全部丟失柠贤。
解決方案:用持久化消息香浩,或者非持久化消息及時(shí)處理不要堆積,或者啟動(dòng)事務(wù)臼勉,啟動(dòng)事務(wù)后邻吭,commit()方法會(huì)負(fù)責(zé)任的等待服務(wù)器的返回,也就不會(huì)關(guān)閉連接導(dǎo)致消息丟失了宴霸。
持久化消息非常慢
默認(rèn)的情況下囱晴,非持久化的消息是異步發(fā)送的,持久化的消息是同步發(fā)送的瓢谢,遇到慢一點(diǎn)的硬盤畸写,發(fā)送消息的速度是無法忍受的。但是在開啟事務(wù)的情況下氓扛,消息都是異步發(fā)送的枯芬,效率會(huì)有2個(gè)數(shù)量級(jí)的提升论笔。所以在發(fā)送持久化消息時(shí),請(qǐng)務(wù)必開啟事務(wù)模式千所。其實(shí)發(fā)送非持久化消息時(shí)也建議開啟事務(wù)狂魔,因?yàn)楦静粫?huì)影響性能。
消息的不均勻消費(fèi)
有時(shí)在發(fā)送一些消息之后淫痰,開啟2個(gè)消費(fèi)者去處理消息最楷。會(huì)發(fā)現(xiàn)一個(gè)消費(fèi)者處理了所有的消息,另一個(gè)消費(fèi)者根本沒收到消息待错。原因在于ActiveMQ的prefetch機(jī)制籽孙。當(dāng)消費(fèi)者去獲取消息時(shí),不會(huì)一條一條去獲取火俄,而是一次性獲取一批蚯撩,默認(rèn)是1000條。這些預(yù)獲取的消息烛占,在還沒確認(rèn)消費(fèi)之前胎挎,在管理控制臺(tái)還是可以看見這些消息的,但是不會(huì)再分配給其他消費(fèi)者忆家,此時(shí)這些消息的狀態(tài)應(yīng)該算作“已分配未消費(fèi)”犹菇,如果消息最后被消費(fèi),則會(huì)在服務(wù)器端被刪除芽卿,如果消費(fèi)者崩潰揭芍,則這些消息會(huì)被重新分配給新的消費(fèi)者。但是如果消費(fèi)者既不消費(fèi)確認(rèn)卸例,又不崩潰称杨,那這些消息就永遠(yuǎn)躺在消費(fèi)者的緩存區(qū)里無法處理。更通常的情況是筷转,消費(fèi)這些消息非常耗時(shí)姑原,你開了10個(gè)消費(fèi)者去處理,結(jié)果發(fā)現(xiàn)只有一臺(tái)機(jī)器吭哧吭哧處理呜舒,另外9臺(tái)啥事不干锭汛。
解決方案:將prefetch設(shè)為1,每次處理1條消息袭蝗,處理完再去取唤殴,這樣也慢不了多少。
死信隊(duì)列
如果你想在消息處理失敗后到腥,不被服務(wù)器刪除朵逝,還能被其他消費(fèi)者處理或重試,可以關(guān)閉AUTO_ACKNOWLEDGE乡范,將ack交由程序自己處理配名。那如果使用了AUTO_ACKNOWLEDGE啤咽,消息是什么時(shí)候被確認(rèn)的,還有沒有阻止消息確認(rèn)的方法段誊?有闰蚕!
消費(fèi)消息有2種方法栈拖,一種是調(diào)用consumer.receive()方法连舍,該方法將阻塞直到獲得并返回一條消息。這種情況下涩哟,消息返回給方法調(diào)用者之后就自動(dòng)被確認(rèn)了索赏。另一種方法是采用listener回調(diào)函數(shù),在有消息到達(dá)時(shí)贴彼,會(huì)調(diào)用listener接口的onMessage方法潜腻。在這種情況下,在onMessage方法執(zhí)行完畢后器仗,消息才會(huì)被確認(rèn)融涣,此時(shí)只要在方法中拋出異常,該消息就不會(huì)被確認(rèn)精钮。那么問題來了威鹿,如果一條消息不能被處理,會(huì)被退回服務(wù)器重新分配轨香,如果只有一個(gè)消費(fèi)者忽你,該消息又會(huì)重新被獲取,重新拋異常臂容。就算有多個(gè)消費(fèi)者科雳,往往在一個(gè)服務(wù)器上不能處理的消息,在另外的服務(wù)器上依然不能被處理脓杉。難道就這么退回--獲取--報(bào)錯(cuò)死循環(huán)了嗎糟秘?
在重試6次后,ActiveMQ認(rèn)為這條消息是“有毒”的球散,將會(huì)把消息丟到死信隊(duì)列里蚌堵。如果你的消息不見了,去ActiveMQ.DLQ里找找沛婴,說不定就躺在那里吼畏。
java消息服務(wù)
不同系統(tǒng)之間的信息交換,是我們開發(fā)中比較常見的場(chǎng)景嘁灯,比如系統(tǒng)A要把數(shù)據(jù)發(fā)送給系統(tǒng)B泻蚊,這個(gè)問題我們應(yīng)該如何去處理? 1999年丑婿,原來的SUN公司領(lǐng)銜提出了一種面向消息的中間件服務(wù)--JMS規(guī)范(標(biāo)準(zhǔn))性雄;常用的幾種信息交互技術(shù)(httpClient没卸、hessian、dubbo秒旋、jms约计、webservice 五種).
JMS概述
JMS即Java消息服務(wù)(Java Message Service的簡(jiǎn)稱),是Java EE 的標(biāo)準(zhǔn)/規(guī)范之一迁筛。這種規(guī)范(標(biāo)準(zhǔn))指出:消息的發(fā)送應(yīng)該是異步的煤蚌、非阻塞的。也就是說消息的發(fā)送者發(fā)送完消息后就直接返回了细卧,不需要等待接收者返回后才能返回尉桩,發(fā)送者和接收者可以說是互不影響。所以這種規(guī)范(標(biāo)準(zhǔn))能夠減輕或消除系統(tǒng)瓶頸贪庙,實(shí)現(xiàn)系統(tǒng)之間去除耦合蜘犁,提高系統(tǒng)的整體可伸縮性和靈活性。JMS只是Java EE中定義的一組標(biāo)準(zhǔn)API止邮,它自身并不是一個(gè)消息服務(wù)系統(tǒng)这橙,它是消息傳送服務(wù)的一個(gè)抽象,也就是說它定義了消息傳送的接口而并沒有具體實(shí)現(xiàn)导披。
ActiveMQ概述:
我們知道JMS只是消息服務(wù)的一組規(guī)范和接口屈扎,并沒有具體的實(shí)現(xiàn),而ActiveMQ就是JMS規(guī)范的具體實(shí)現(xiàn)盛卡;它是Apache下的一個(gè)項(xiàng)目助隧,采用Java語言開發(fā);是一款非常流行的開源消息服務(wù)器.
ActiveMQ與JMS關(guān)系
我們知道滑沧,JMS只是定義了一組有關(guān)消息傳送的規(guī)范和標(biāo)準(zhǔn)并村,并沒有真正實(shí)現(xiàn),也就說JMS只是定義了一組接口而已滓技;就像JDBC抽象了關(guān)系數(shù)據(jù)庫訪問哩牍、JPA抽象了對(duì)象與關(guān)系數(shù)據(jù)庫映射、JNDI抽象了命名目錄服務(wù)訪問一樣令漂,JMS具體的實(shí)現(xiàn)由不同的消息中間件廠商提供膝昆,比如Apache ActiveMQ就是JMS規(guī)范的具體實(shí)現(xiàn),Apache ActiveMQ才是一個(gè)消息服務(wù)系統(tǒng)叠必,而JMS不是荚孵。
基本要素
1、生產(chǎn)者producer ; 2纬朝、消費(fèi)者consumer ; 3收叶、消息服務(wù)broker
JMS兩種消息傳送模式
點(diǎn)對(duì)點(diǎn)( Point-to-Point):專門用于使用隊(duì)列Queue傳送消息;基于隊(duì)列Queue的點(diǎn)對(duì)點(diǎn)消息只能被一個(gè)消費(fèi)者消費(fèi)共苛,如多個(gè)消費(fèi)者都注冊(cè)到同一個(gè)消息隊(duì)列上判没,當(dāng)生產(chǎn)者發(fā)送一條消息后蜓萄,而只有其中一個(gè)消費(fèi)者會(huì)接收到該消息,而不是所有消費(fèi)者都能接收到該消息澄峰。
發(fā)布/訂閱(Publish/Subscribe):專門用于使用主題Topic傳送消息嫉沽。基于主題的發(fā)布與訂閱消息能被多個(gè)消費(fèi)者消費(fèi)俏竞,生產(chǎn)者發(fā)送的消息绸硕,所有訂閱了該topic的消費(fèi)者都能接收到。
JMS API 概覽
JMS API可以分為3個(gè)主要部分:
1胞此、公共API:可用于向一個(gè)隊(duì)列或主題發(fā)送消息或從其中接收消息臣咖;
2跃捣、點(diǎn)對(duì)點(diǎn)API:專門用于使用隊(duì)列Queue傳送消息漱牵;
3、發(fā)布/訂閱API:專門用于使用主題Topic傳送消息疚漆。
JMS公共API
在JMS公共API內(nèi)部酣胀,和發(fā)送與接收消息有關(guān)的JMS API接口主要是:ConnectionFactory / Connection / Session / Message / Destination / MessageProducer / MessageConsumer . 它們的關(guān)系是:一旦有了ConnectionFactory,就可以創(chuàng)建Connection娶聘,一旦有了Connection闻镶,就可以創(chuàng)建Session,而一旦有了Session丸升,就可以創(chuàng)建 Message 铆农、MessageProducer 和 MessageConsumer 。
JMS點(diǎn)對(duì)點(diǎn)API
點(diǎn)對(duì)點(diǎn)(p2p)消息傳送模型API是指JMS API之內(nèi)基于隊(duì)列(Queue)的接口:QueueConnectionFactory / QueueConnection / QueueSession / Message / Queue / QueueSender / QueueReceiver .
從接口的命名可以看出狡耻,大多數(shù)接口名稱僅僅是在公共API接口
名稱之前添加Queue一詞墩剖。一般來說,使用點(diǎn)對(duì)點(diǎn)消息傳送模型的應(yīng)用程序?qū)⑹褂没陉?duì)列的API夷狰,而不使用公共API 岭皂。
JMS發(fā)布/訂閱API發(fā)布/訂閱消息傳送模型API是指JMS
API之內(nèi)基于主題(Topic)的接口:TopicConnectionFactory / TopicConnection /
TopicSession / Message / Topic / TopicPublisher / TopicSubscriber . 由于基于主題(Topic)的JMS API類似于基于隊(duì)列(Queue)
的API,因此在大多數(shù)情況下沼头,Queue這個(gè)詞會(huì)由Topic取代爷绘。
ActiveMQ點(diǎn)對(duì)點(diǎn)發(fā)送與接收消息示例
簡(jiǎn)單示例
發(fā)送
package com.kinglong.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 消息發(fā)送者
*
*/
public class Sender {
/**消息服務(wù)器的連接地址**/
public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) {
Sender sender = new Sender();
sender.sendMessage("Hello ActiveMQ.");
}
/**
* 發(fā)送消息
*
* @param msg
*/
public void sendMessage (String msg) {
Connection connection = null;
Session session = null;
MessageProducer messageProducer = null;
try {
//1.創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.創(chuàng)建一個(gè)連接
connection = connectionFactory.createConnection();
//3.創(chuàng)建一個(gè)Session
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建消息,此處創(chuàng)建了一個(gè)文本消息
Message message = session.createTextMessage(msg);
//5.創(chuàng)建一個(gè)目的地
Destination destination = session.createQueue("myQueue");
//6.創(chuàng)建一個(gè)消息的生產(chǎn)者(發(fā)送者)
messageProducer = session.createProducer(destination);
//7.發(fā)送消息
messageProducer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉連接釋放資源
if (null != messageProducer) {
messageProducer.close();
}
if (null != session) {
session.close();
}
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
接收
package com.kinglong.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Receiver {
/**消息服務(wù)器的連接地址**/
public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) {
Receiver receiver = new Receiver();
receiver.receiveMessage();
}
/**
* 接收消息
*
*/
public void receiveMessage () {
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//1.創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.創(chuàng)建一個(gè)連接
connection = connectionFactory.createConnection();
//3.創(chuàng)建一個(gè)Session
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建一個(gè)目的地
Destination destination = session.createQueue("myQueue");
//5.創(chuàng)建一個(gè)消息的消費(fèi)者(接收者)
messageConsumer = session.createConsumer(destination);
//接收消息之前进倍,需要把連接啟動(dòng)一下
connection.start();
//6.接收消息
Message message = messageConsumer.receive();
//判斷消息的類型
if (message instanceof TextMessage) { //判斷是否是文本消息
String text = ((TextMessage) message).getText();
System.out.println("接收到的消息內(nèi)容是:" + text);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉連接釋放資源
if (null != messageConsumer) {
messageConsumer.close();
}
if (null != session) {
session.close();
}
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
總結(jié)
1. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
// 其中:Boolean.FALSE表示本次會(huì)話不開啟事務(wù)管理,假如需要開啟事務(wù)管理,將其改為Boolean.TRUE即可
//同時(shí)需要在發(fā)送消息后添加session.commit(),否則,消息是不會(huì)被提交的.
//Session.AUTO_ACKNOWLEDGE表示消息確認(rèn)機(jī)制
AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)
CLIENT_ACKNOWLEDGE:客戶端確認(rèn)
SESSION_TRANSACTED:事務(wù)確認(rèn),如果使用事務(wù)推薦使用該確認(rèn)機(jī)制
AUTO_ACKNOWLEDGE:懶散式確認(rèn),消息偶爾不會(huì)被確認(rèn),也就是消息可能會(huì)被重復(fù)發(fā)送.但發(fā)生的概率很小
2. connection.start();
//在消息接收端,接受消息前需要加入這段代碼,開啟連接,否則一樣無法獲取消息.
3. Destination destination = session.createQueue("myQueue");
//創(chuàng)建目的地時(shí),如果做測(cè)試收不到信息,可以將目的地名稱修改一下,我用的是IDEA,不清楚為何,
//有時(shí)候收不到信息,修改一下就好了,猜測(cè)可能是緩存的原因吧
發(fā)布與訂閱的topic方式實(shí)際與點(diǎn)對(duì)點(diǎn)的queue方式,代碼通用很多,只是在創(chuàng)建目的地Destination時(shí)候創(chuàng)建為
Destination destination = session.createTopic("myTopic
Queue與Topic比較
比較項(xiàng) | 隊(duì)列 | 主題 |
---|---|---|
概要說明 | point-to-ponit點(diǎn)對(duì)點(diǎn)小欻 | publish/subscribe發(fā)布訂閱消息 |
消息完整性保障 | 隊(duì)列保證每條數(shù)據(jù)都能被receiver接收 | 不保證publisher發(fā)布的每條數(shù)據(jù)土至,subscriber都能接收到 |
消息是否會(huì)丟失 | sender發(fā)送消息到目標(biāo)隊(duì)列,receiver可以同步或者異步接收到這個(gè)隊(duì)列上的消息猾昆,隊(duì)列上的消息如果暫時(shí)沒有被receiver接收到陶因,也不會(huì)丟失 | publisher發(fā)布消息到目標(biāo)主題時(shí),只有正在監(jiān)聽該主題的subscriber能夠接收到消息毡庆,如果目標(biāo)topic沒有subscriber在監(jiān)聽坑赡,該主題上的消息就會(huì)丟失烙如。 |
消息傳送策略 | 一對(duì)一的消息發(fā)布接受策略,一個(gè)sender發(fā)送的消息毅否,只能有一個(gè)receiver接收亚铁,receiver接收到后,通知mq服務(wù)器已接收螟加,mq服務(wù)器對(duì)隊(duì)列里的消息采取刪除或其他操作徘溢。 | 一對(duì)多的消息發(fā)布接收策略,監(jiān)聽同一個(gè)主題地址的多個(gè)subscriber都能接收到publisher發(fā)送的消息捆探,subscriber接收完通知mq服務(wù)器然爆。 |
拉模式與推模式
a.點(diǎn)對(duì)點(diǎn)消息,如果沒有消費(fèi)者在監(jiān)聽隊(duì)列黍图,消息將保留在隊(duì)列中曾雕,直至消息消費(fèi)者連接到隊(duì)列為止。這種消息傳遞模型是
傳統(tǒng)意義上的懶模型或輪詢模型助被。在此模型中剖张,消息不是自動(dòng)推動(dòng)給消息消費(fèi)者的,而是要由消息消費(fèi)者從隊(duì)列中請(qǐng)求獲得(拉模式)揩环。
b.pub/sub消息傳遞模型基本上是一個(gè)推模型搔弄。在該模型中,消息會(huì)自動(dòng)廣播丰滑,消息消費(fèi)者無須通過主動(dòng)請(qǐng)求或輪詢主題的方法來獲得新的消息顾犹。
ActiveMQ消息類型
1、TextMessage 文本消息:攜帶一個(gè)java.lang.String作為有效數(shù)據(jù)(負(fù)載)的消息褒墨,可用于字符串類型的信息交換炫刷;
2、ObjectMessage 對(duì)象消息:攜帶一個(gè)可以序列化的Java對(duì)象作為有效負(fù)載的消息貌亭,可用于Java對(duì)象類型的信息交換柬唯;
3、MapMessage 映射消息:攜帶一組鍵值對(duì)的數(shù)據(jù)作為有效負(fù)載的消息圃庭,有效數(shù)據(jù)值必須是Java原始數(shù)據(jù)類型(或者它們的包裝類)及String锄奢。即:byte , short , int , long , float , double , char , boolean , String
4、BytesMessage 字節(jié)消息 :攜帶一組原始數(shù)據(jù)類型的字節(jié)流作為有效負(fù)載的消息剧腻;
5拘央、StreamMessage 流消息:攜帶一個(gè)原始數(shù)據(jù)類型流作為有效負(fù)載的消息,它保持了寫入流時(shí)的數(shù)據(jù)類型书在,寫入什么類型灰伟,則讀取也需要是相同的類型;
ActiveMQ事務(wù)消息和非事務(wù)消息
消息分為事務(wù)消息和非事務(wù)消息
1、事務(wù)消息:創(chuàng)建會(huì)話Session使用transacted=true
connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
2栏账、非事務(wù)消息:創(chuàng)建會(huì)話Session使用transacted=false
connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
事務(wù)消息必須在發(fā)送和接收完消息后顯式地調(diào)用session.commit();
事務(wù)性消息帖族,不管設(shè)置何種消息確認(rèn)模式,都會(huì)自動(dòng)被確認(rèn)挡爵;與設(shè)置的確認(rèn)機(jī)制無關(guān),但官方推薦事務(wù)性消息使用事務(wù)確認(rèn)機(jī)制.
ActiveMQ消息確認(rèn)機(jī)制
消息只有在被確認(rèn)之后竖般,才認(rèn)為已經(jīng)被成功消費(fèi),然后消息才會(huì)從隊(duì)列或主題中刪除茶鹃。消息的成功消費(fèi)通常包含三個(gè)階段:
(1)涣雕、客戶接收消息;(2)闭翩、客戶處理消息;(3)挣郭、消息被確認(rèn);
確認(rèn)機(jī)制前面提過一下,共有四種:
(1)疗韵、Session.AUTO_ACKNOWLEDGE兑障;客戶(消費(fèi)者)成功從receive方法返回時(shí),或者從MessageListener.onMessage方法成功返回時(shí)伶棒,會(huì)話自動(dòng)確認(rèn)消息,然后自動(dòng)刪除消息.
(2)旺垒、Session.CLIENT_ACKNOWLEDGE彩库;客戶通過顯式調(diào)用消息的acknowledge方法確認(rèn)消息,肤无。 即在接收端調(diào)用message.acknowledge();方法,否則,消息是不會(huì)被刪除的.
(3)、Session. DUPS_OK_ACKNOWLEDGE 骇钦;不是必須確認(rèn)宛渐,是一種“懶散的”消息確認(rèn),消息可能會(huì)重復(fù)發(fā)送眯搭,在第二次重新傳送消息時(shí)窥翩,消息頭的JMSRedelivered會(huì)被置為true標(biāo)識(shí)當(dāng)前消息已經(jīng)傳送過一次,客戶端需要進(jìn)行消息的重復(fù)處理控制鳞仙。
(4)寇蚊、 Session.SESSION_TRANSACTED;事務(wù)提交并確認(rèn)棍好。
ActiveMQ持久化消息與非持久化消息
messageProducer.setDeliveryMode(DeliveryMode. NON_PERSISTENT);//不持久化
messageProducer.setDeliveryMode(DeliveryMode.
PERSISTENT);//持久化的仗岸,當(dāng)然activemq發(fā)送消息默認(rèn)都是持久化的
說明:
設(shè)置完后,如果為持久化,那么消息在沒有被消費(fèi)前都會(huì)被寫入本地磁盤kahadb文件中保存起來,即使服務(wù)器宕機(jī),也不會(huì)影響
消息.如果是非持久化的,那么,服務(wù)一旦宕機(jī)之類的情況發(fā)生,消息即會(huì)被刪除.
ActiveMQ默認(rèn)是持久化的.
ActiveMQ消息過濾
ActiveMQ提供了一種機(jī)制,可根據(jù)消息選擇器中的標(biāo)準(zhǔn)來執(zhí)行消息過濾借笙,只接收符合過濾標(biāo)準(zhǔn)的消息扒怖;
生產(chǎn)者可在消息中放入特有的標(biāo)志,而消費(fèi)者使用基于這些特定的標(biāo)志來接收消息业稼;
1盗痒、發(fā)送消息放入特殊標(biāo)志:message . setString Property ( name , value ) ;
2、接收消息使用基于特殊標(biāo)志的消息選擇器:
MessageConsumer createConsumer(Destination destination, String messageSelector);
注:消息選擇器是一個(gè)字符串低散,語法與數(shù)據(jù)庫的SQL相似俯邓,相當(dāng)于SQL語句where條件后面的內(nèi)容骡楼;
具體代碼如下:
發(fā)送端代碼:
package com.bjpowernode.activemq.selector;
import org.apache.activemq.Active
MQConnectionFactory;
import javax.jms.*;
/**
* 消息發(fā)送者
*
*/
public class Sender {
/**消息服務(wù)器的連接地址**/
public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) {
Sender sender = new Sender();
sender.sendMessage("Hello ActiveMQ.");
}
/**
* 發(fā)送消息
*
* @param msg
*/
public void sendMessage (String msg) {
Connection connection = null;
Session session = null;
MessageProducer messageProducer = null;
try {
//1.創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.創(chuàng)建一個(gè)連接
connection = connectionFactory.
createConnection();
//3.創(chuàng)建一個(gè)Session
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建一個(gè)目的地
Destination destination = session.create
Queue("myQueue");
//6.創(chuàng)建一個(gè)消息的生產(chǎn)者(發(fā)送者)
messageProducer = session.createProducer
(destination);
//設(shè)置發(fā)送的消息是否需要持久化
messageProducer.setDeliveryMode(Delivery
Mode.NON_PERSISTENT);//這里使用不持久化
//創(chuàng)建一個(gè)循環(huán),測(cè)試消息標(biāo)識(shí)的使用
for (int i=0; i<20; i++) {
//4.創(chuàng)建消息,此處創(chuàng)建了一個(gè)文本消息
Message message = session.createText
Message(msg+i);
//將消息設(shè)置一個(gè)特有的標(biāo)識(shí)
message.setIntProperty("id", i);
//7.發(fā)送消息
messageProducer.send(message);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉連接釋放資源
if (null != messageProducer) {
messageProducer.close();
}
if (null != session) {
session.close();
}
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
接收端代碼:
package com.bjpowernode.activemq.selector;
import org.apache.activemq.ActiveMQ
ConnectionFactory;
import javax.jms.*;
public class Receiver {
/**消息服務(wù)器的連接地址**/
public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) {
Receiver receiver = new Receiver();
receiver.receiveMessage();
}
/**
* 接收消息
*
*/
public void receiveMessage () {
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//1.創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.創(chuàng)建一個(gè)連接
connection = connectionFactory.
createConnection();
//3.創(chuàng)建一個(gè)Session
session = connection.createSession
(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建一個(gè)目的地
Destination destination = session.createQueue("myQueue");
//5.創(chuàng)建一個(gè)消息的消費(fèi)者(接收者),
selector即為消息選擇器,通過選擇需要的標(biāo)識(shí),過濾消息接受id為10-15之 //間的消息
String selector = "id >=10 and id<=15";
messageConsumer = session.createConsumer
(destination, selector);
//接收消息之前稽鞭,需要把連接啟動(dòng)一下
connection.start();
while (true) {
//6.接收消息 同步接收
Message message = messageConsumer.
receive();
//判斷消息的類型
if (message instanceof TextMessage)
{ //判斷是否是文本消息
String text = ((TextMessage) message).getText();
System.out.println("接收到的消息內(nèi)容是:" + text);
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
//關(guān)閉連接釋放資源
if (null != messageConsumer) {
messageConsumer.close();
}
if (null != session) {
session.close();
}
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
ActiveMQ消息接收方式
同步接收:receive()方法接收消息叫同步接收,就是之前的Demo代碼使用的接收方式.在不使用循環(huán)方法時(shí)接收端代碼執(zhí)行
一次即結(jié)束.
異步接收:使用監(jiān)聽器接收消息君编,這種接收方式叫異步接收,接收端會(huì)一直處于監(jiān)聽狀態(tài),只要有消息產(chǎn)生,即會(huì)接收消息.
下面是異步接收代碼:
package com.bjpowernode.activemq.listener;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Receiver {
/**消息服務(wù)器的連接地址**/
public static final String BROKER_URL = "tcp://192.168.174.129:61616";
public static void main(String[] args) {
Receiver receiver = new Receiver();
receiver.receiveMessage();
}
/**
* 接收消息
*
*/
public void receiveMessage () {
Connection connection = null;
Session session = null;
MessageConsumer messageConsumer = null;
try {
//1.創(chuàng)建一個(gè)連接工廠
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
//2.創(chuàng)建一個(gè)連接
connection = connectionFactory.createConnection();
//3.創(chuàng)建一個(gè)Session
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//4.創(chuàng)建一個(gè)目的地
Destination destination = session.createQueue("myQueue");
//5.創(chuàng)建一個(gè)消息的消費(fèi)者(接收者)
messageConsumer = session.createConsumer(destination);
//接收消息之前,需要把連接啟動(dòng)一下
connection.start();
//6.接收消息 同步接收
//Message message = messageConsumer.receive();
//異步接收川慌,使用監(jiān)聽器接收消息
messageConsumer.setMessageListener(new MessageListener(){
public void onMessage(Message message) {
//判斷消息的類型
if (message instanceof TextMessage) { //判斷是否是文本消息
String text = null;
try {
text = ((TextMessage) message).getText();
} catch (JMSException e) {
e.printStackTrace();
}
System.out.println("接收到的消息內(nèi)容是:" + text);
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {
/*try {
//關(guān)閉連接釋放資源
if (null != messageConsumer) {
messageConsumer.close();
}
if (null != session) {
session.close();
}
if (null != connection) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}*/
}
}
}