ActiveMQ

什么是activeMQ

activeMQ是一種開源的移层,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件赫粥,為應(yīng)用程序提供高效的观话、可擴(kuò)展的、穩(wěn)定的和安全的企業(yè)級(jí)消息通信越平。

ActiveMQ的作用以及原理

Activemq 的作用就是系統(tǒng)之間進(jìn)行通信剃毒。 當(dāng)然可以使用其他方式進(jìn)行系統(tǒng)間通信暴备, 如果使用 Activemq 的話可以對(duì)系統(tǒng)之間的調(diào)用進(jìn)行解耦, 實(shí)現(xiàn)系統(tǒng)間的異步通信。 原理就是生產(chǎn)者生產(chǎn)消息胰伍, 把消息發(fā)送給activemq台汇。 Activemq 接收到消息衷咽, 然后查看有多少個(gè)消費(fèi)者悬嗓, 然后把消息轉(zhuǎn)發(fā)給消費(fèi)者, 此過程中生產(chǎn)者無需參與。 消費(fèi)者接收到消息后做相應(yīng)的處理和生產(chǎn)者沒有任何關(guān)系

activemq的幾種通信方式

(1)publish(發(fā)布)-subscribe(訂閱)(發(fā)布-訂閱方式)

發(fā)布/訂閱方式用于多接收客戶端的方式.作為發(fā)布訂閱的方式舟肉,可能存在多個(gè)接收客戶端修噪,并且接收端客戶端與發(fā)送客戶端存在時(shí)間上的依賴。一個(gè)接收端只能接收他創(chuàng)建以后發(fā)送客戶端發(fā)送的信息路媚。作為subscriber ,在接收消息時(shí)有兩種方法割按,destination的receive方法,和實(shí)現(xiàn)message listener 接口的onMessage 方法磷籍。

(2)p2p(point-to-point)(點(diǎn)對(duì)點(diǎn))

p2p的過程則理解起來比較簡(jiǎn)單适荣。它好比是兩個(gè)人打電話,這兩個(gè)人是獨(dú)享這一條通信鏈路的院领。一方發(fā)送消息弛矛,另外一方接收,就這么簡(jiǎn)單比然。

ActiveMQ服務(wù)器宕機(jī)怎么辦丈氓?

這得從ActiveMQ的儲(chǔ)存機(jī)制說起。在通常的情況下强法,非持久化消息是存儲(chǔ)在內(nèi)存中的万俗,持久化消息是存儲(chǔ)在文件中的,它們的最大限制在配置文件的<systemUsage>節(jié)點(diǎn)中配置饮怯。但是闰歪,在非持久化消息堆積到一定程度,內(nèi)存告急的時(shí)候蓖墅,ActiveMQ會(huì)將內(nèi)存中的非持久化消息寫入臨時(shí)文件中库倘,以騰出內(nèi)存。雖然都保存到了文件里论矾,但它和持久化消息的區(qū)別是教翩,重啟后持久化消息會(huì)從文件中恢復(fù),非持久化的臨時(shí)文件會(huì)直接刪除贪壳。

解決方案:盡量不要用非持久化消息饱亿,非要用的話,將臨時(shí)文件限制盡可能的調(diào)大闰靴。

丟消息怎么辦彪笼?

這得從java的java.net.SocketException異常說起。簡(jiǎn)單點(diǎn)說就是當(dāng)網(wǎng)絡(luò)發(fā)送方發(fā)送一堆數(shù)據(jù)传黄,然后調(diào)用close關(guān)閉連接之后杰扫。這些發(fā)送的數(shù)據(jù)都在接收者的緩存里队寇,接收者如果調(diào)用read方法仍舊能從緩存中讀取這些數(shù)據(jù)膘掰,盡管對(duì)方已經(jīng)關(guān)閉了連接。但是當(dāng)接收者嘗試發(fā)送數(shù)據(jù)時(shí),由于此時(shí)連接已關(guān)閉识埋,所以會(huì)發(fā)生異常凡伊,這個(gè)很好理解。不過需要注意的是窒舟,當(dāng)發(fā)生SocketException后系忙,原本緩存區(qū)中數(shù)據(jù)也作廢了,此時(shí)接收者再次調(diào)用read方法去讀取緩存中的數(shù)據(jù)惠豺,就會(huì)報(bào)Software caused connection abort: recv failed錯(cuò)誤银还。

通過抓包得知,ActiveMQ會(huì)每隔10秒發(fā)送一個(gè)心跳包洁墙,這個(gè)心跳包是服務(wù)器發(fā)送給客戶端的蛹疯,用來判斷客戶端死沒死。如果你看過上面第一條热监,就會(huì)知道非持久化消息堆積到一定程度會(huì)寫到文件里捺弦,這個(gè)寫的過程會(huì)阻塞所有動(dòng)作,而且會(huì)持續(xù)20到30秒孝扛,并且隨著內(nèi)存的增大而增大列吼。當(dāng)客戶端發(fā)完消息調(diào)用connection.close()時(shí),會(huì)期待服務(wù)器對(duì)于關(guān)閉連接的回答苦始,如果超過15秒沒回答就直接調(diào)用socket層的close關(guān)閉tcp連接了寞钥。這時(shí)客戶端發(fā)出的消息其實(shí)還在服務(wù)器的緩存里等待處理,不過由于服務(wù)器心跳包的設(shè)置陌选,導(dǎo)致發(fā)生了java.net.SocketException異常凑耻,把緩存里的數(shù)據(jù)作廢了,沒處理的消息全部丟失柠贤。

解決方案:用持久化消息香浩,或者非持久化消息及時(shí)處理不要堆積,或者啟動(dòng)事務(wù)臼勉,啟動(dòng)事務(wù)后邻吭,commit()方法會(huì)負(fù)責(zé)任的等待服務(wù)器的返回,也就不會(huì)關(guān)閉連接導(dǎo)致消息丟失了宴霸。

持久化消息非常慢

默認(rèn)的情況下囱晴,非持久化的消息是異步發(fā)送的,持久化的消息是同步發(fā)送的瓢谢,遇到慢一點(diǎn)的硬盤畸写,發(fā)送消息的速度是無法忍受的。但是在開啟事務(wù)的情況下氓扛,消息都是異步發(fā)送的枯芬,效率會(huì)有2個(gè)數(shù)量級(jí)的提升论笔。所以在發(fā)送持久化消息時(shí),請(qǐng)務(wù)必開啟事務(wù)模式千所。其實(shí)發(fā)送非持久化消息時(shí)也建議開啟事務(wù)狂魔,因?yàn)楦静粫?huì)影響性能。

消息的不均勻消費(fèi)

有時(shí)在發(fā)送一些消息之后淫痰,開啟2個(gè)消費(fèi)者去處理消息最楷。會(huì)發(fā)現(xiàn)一個(gè)消費(fèi)者處理了所有的消息,另一個(gè)消費(fèi)者根本沒收到消息待错。原因在于ActiveMQ的prefetch機(jī)制籽孙。當(dāng)消費(fèi)者去獲取消息時(shí),不會(huì)一條一條去獲取火俄,而是一次性獲取一批蚯撩,默認(rèn)是1000條。這些預(yù)獲取的消息烛占,在還沒確認(rèn)消費(fèi)之前胎挎,在管理控制臺(tái)還是可以看見這些消息的,但是不會(huì)再分配給其他消費(fèi)者忆家,此時(shí)這些消息的狀態(tài)應(yīng)該算作“已分配未消費(fèi)”犹菇,如果消息最后被消費(fèi),則會(huì)在服務(wù)器端被刪除芽卿,如果消費(fèi)者崩潰揭芍,則這些消息會(huì)被重新分配給新的消費(fèi)者。但是如果消費(fèi)者既不消費(fèi)確認(rèn)卸例,又不崩潰称杨,那這些消息就永遠(yuǎn)躺在消費(fèi)者的緩存區(qū)里無法處理。更通常的情況是筷转,消費(fèi)這些消息非常耗時(shí)姑原,你開了10個(gè)消費(fèi)者去處理,結(jié)果發(fā)現(xiàn)只有一臺(tái)機(jī)器吭哧吭哧處理呜舒,另外9臺(tái)啥事不干锭汛。

解決方案:將prefetch設(shè)為1,每次處理1條消息袭蝗,處理完再去取唤殴,這樣也慢不了多少。

死信隊(duì)列

如果你想在消息處理失敗后到腥,不被服務(wù)器刪除朵逝,還能被其他消費(fèi)者處理或重試,可以關(guān)閉AUTO_ACKNOWLEDGE乡范,將ack交由程序自己處理配名。那如果使用了AUTO_ACKNOWLEDGE啤咽,消息是什么時(shí)候被確認(rèn)的,還有沒有阻止消息確認(rèn)的方法段誊?有闰蚕!

消費(fèi)消息有2種方法栈拖,一種是調(diào)用consumer.receive()方法连舍,該方法將阻塞直到獲得并返回一條消息。這種情況下涩哟,消息返回給方法調(diào)用者之后就自動(dòng)被確認(rèn)了索赏。另一種方法是采用listener回調(diào)函數(shù),在有消息到達(dá)時(shí)贴彼,會(huì)調(diào)用listener接口的onMessage方法潜腻。在這種情況下,在onMessage方法執(zhí)行完畢后器仗,消息才會(huì)被確認(rèn)融涣,此時(shí)只要在方法中拋出異常,該消息就不會(huì)被確認(rèn)精钮。那么問題來了威鹿,如果一條消息不能被處理,會(huì)被退回服務(wù)器重新分配轨香,如果只有一個(gè)消費(fèi)者忽你,該消息又會(huì)重新被獲取,重新拋異常臂容。就算有多個(gè)消費(fèi)者科雳,往往在一個(gè)服務(wù)器上不能處理的消息,在另外的服務(wù)器上依然不能被處理脓杉。難道就這么退回--獲取--報(bào)錯(cuò)死循環(huán)了嗎糟秘?

在重試6次后,ActiveMQ認(rèn)為這條消息是“有毒”的球散,將會(huì)把消息丟到死信隊(duì)列里蚌堵。如果你的消息不見了,去ActiveMQ.DLQ里找找沛婴,說不定就躺在那里吼畏。

java消息服務(wù)

不同系統(tǒng)之間的信息交換,是我們開發(fā)中比較常見的場(chǎng)景嘁灯,比如系統(tǒng)A要把數(shù)據(jù)發(fā)送給系統(tǒng)B泻蚊,這個(gè)問題我們應(yīng)該如何去處理? 1999年丑婿,原來的SUN公司領(lǐng)銜提出了一種面向消息的中間件服務(wù)--JMS規(guī)范(標(biāo)準(zhǔn))性雄;常用的幾種信息交互技術(shù)(httpClient没卸、hessian、dubbo秒旋、jms约计、webservice 五種).

JMS概述

JMS即Java消息服務(wù)(Java Message Service的簡(jiǎn)稱),是Java EE 的標(biāo)準(zhǔn)/規(guī)范之一迁筛。這種規(guī)范(標(biāo)準(zhǔn))指出:消息的發(fā)送應(yīng)該是異步的煤蚌、非阻塞的。也就是說消息的發(fā)送者發(fā)送完消息后就直接返回了细卧,不需要等待接收者返回后才能返回尉桩,發(fā)送者和接收者可以說是互不影響。所以這種規(guī)范(標(biāo)準(zhǔn))能夠減輕或消除系統(tǒng)瓶頸贪庙,實(shí)現(xiàn)系統(tǒng)之間去除耦合蜘犁,提高系統(tǒng)的整體可伸縮性和靈活性。JMS只是Java EE中定義的一組標(biāo)準(zhǔn)API止邮,它自身并不是一個(gè)消息服務(wù)系統(tǒng)这橙,它是消息傳送服務(wù)的一個(gè)抽象,也就是說它定義了消息傳送的接口而并沒有具體實(shí)現(xiàn)导披。

ActiveMQ概述:

我們知道JMS只是消息服務(wù)的一組規(guī)范和接口屈扎,并沒有具體的實(shí)現(xiàn),而ActiveMQ就是JMS規(guī)范的具體實(shí)現(xiàn)盛卡;它是Apache下的一個(gè)項(xiàng)目助隧,采用Java語言開發(fā);是一款非常流行的開源消息服務(wù)器.

ActiveMQ與JMS關(guān)系

我們知道滑沧,JMS只是定義了一組有關(guān)消息傳送的規(guī)范和標(biāo)準(zhǔn)并村,并沒有真正實(shí)現(xiàn),也就說JMS只是定義了一組接口而已滓技;就像JDBC抽象了關(guān)系數(shù)據(jù)庫訪問哩牍、JPA抽象了對(duì)象與關(guān)系數(shù)據(jù)庫映射、JNDI抽象了命名目錄服務(wù)訪問一樣令漂,JMS具體的實(shí)現(xiàn)由不同的消息中間件廠商提供膝昆,比如Apache ActiveMQ就是JMS規(guī)范的具體實(shí)現(xiàn),Apache ActiveMQ才是一個(gè)消息服務(wù)系統(tǒng)叠必,而JMS不是荚孵。

基本要素

1、生產(chǎn)者producer ; 2纬朝、消費(fèi)者consumer ; 3收叶、消息服務(wù)broker

JMS兩種消息傳送模式

點(diǎn)對(duì)點(diǎn)( Point-to-Point):專門用于使用隊(duì)列Queue傳送消息;基于隊(duì)列Queue的點(diǎn)對(duì)點(diǎn)消息只能被一個(gè)消費(fèi)者消費(fèi)共苛,如多個(gè)消費(fèi)者都注冊(cè)到同一個(gè)消息隊(duì)列上判没,當(dāng)生產(chǎn)者發(fā)送一條消息后蜓萄,而只有其中一個(gè)消費(fèi)者會(huì)接收到該消息,而不是所有消費(fèi)者都能接收到該消息澄峰。

發(fā)布/訂閱(Publish/Subscribe):專門用于使用主題Topic傳送消息嫉沽。基于主題的發(fā)布與訂閱消息能被多個(gè)消費(fèi)者消費(fèi)俏竞,生產(chǎn)者發(fā)送的消息绸硕,所有訂閱了該topic的消費(fèi)者都能接收到。

JMS API 概覽

JMS API可以分為3個(gè)主要部分:

1胞此、公共API:可用于向一個(gè)隊(duì)列或主題發(fā)送消息或從其中接收消息臣咖;

2跃捣、點(diǎn)對(duì)點(diǎn)API:專門用于使用隊(duì)列Queue傳送消息漱牵;

3、發(fā)布/訂閱API:專門用于使用主題Topic傳送消息疚漆。

JMS公共API

在JMS公共API內(nèi)部酣胀,和發(fā)送與接收消息有關(guān)的JMS API接口主要是:ConnectionFactory / Connection / Session / Message / Destination / MessageProducer / MessageConsumer . 它們的關(guān)系是:一旦有了ConnectionFactory,就可以創(chuàng)建Connection娶聘,一旦有了Connection闻镶,就可以創(chuàng)建Session,而一旦有了Session丸升,就可以創(chuàng)建 Message 铆农、MessageProducer 和 MessageConsumer 。

JMS點(diǎn)對(duì)點(diǎn)API

點(diǎn)對(duì)點(diǎn)(p2p)消息傳送模型API是指JMS API之內(nèi)基于隊(duì)列(Queue)的接口:QueueConnectionFactory / QueueConnection / QueueSession / Message / Queue / QueueSender / QueueReceiver .

從接口的命名可以看出狡耻,大多數(shù)接口名稱僅僅是在公共API接口

名稱之前添加Queue一詞墩剖。一般來說,使用點(diǎn)對(duì)點(diǎn)消息傳送模型的應(yīng)用程序?qū)⑹褂没陉?duì)列的API夷狰,而不使用公共API 岭皂。

JMS發(fā)布/訂閱API發(fā)布/訂閱消息傳送模型API是指JMS

API之內(nèi)基于主題(Topic)的接口:TopicConnectionFactory / TopicConnection /

TopicSession / Message / Topic / TopicPublisher / TopicSubscriber . 由于基于主題(Topic)的JMS API類似于基于隊(duì)列(Queue)

的API,因此在大多數(shù)情況下沼头,Queue這個(gè)詞會(huì)由Topic取代爷绘。

ActiveMQ點(diǎn)對(duì)點(diǎn)發(fā)送與接收消息示例

簡(jiǎn)單示例

發(fā)送

package com.kinglong.activemq.queue; 

import org.apache.activemq.ActiveMQConnectionFactory; 

import javax.jms.*; 

/** 
* 消息發(fā)送者 
* 
*/ 
public class Sender { 

/**消息服務(wù)器的連接地址**/ 
public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 

public static void main(String[] args) { 
Sender sender = new Sender(); 
sender.sendMessage("Hello ActiveMQ."); 
} 

/** 
* 發(fā)送消息 
* 
* @param msg 
*/ 
public void sendMessage (String msg) { 

Connection connection = null; 
Session session = null; 
MessageProducer messageProducer = null; 

try { 
//1.創(chuàng)建一個(gè)連接工廠 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 

//2.創(chuàng)建一個(gè)連接 
connection = connectionFactory.createConnection(); 

//3.創(chuàng)建一個(gè)Session 
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 

//4.創(chuàng)建消息,此處創(chuàng)建了一個(gè)文本消息 
Message message = session.createTextMessage(msg); 

//5.創(chuàng)建一個(gè)目的地 
Destination destination = session.createQueue("myQueue"); 

//6.創(chuàng)建一個(gè)消息的生產(chǎn)者(發(fā)送者) 
messageProducer = session.createProducer(destination); 

//7.發(fā)送消息 
messageProducer.send(message); 

} catch (JMSException e) { 
e.printStackTrace(); 
} finally { 
try { 
//關(guān)閉連接釋放資源 
if (null != messageProducer) { 
messageProducer.close(); 
} 
if (null != session) { 
session.close(); 
} 
if (null != connection) { 
connection.close(); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} 
} 
} 
}

接收

package com.kinglong.activemq.queue; 

import org.apache.activemq.ActiveMQConnectionFactory; 

import javax.jms.*; 

public class Receiver { 

/**消息服務(wù)器的連接地址**/ 
public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 

public static void main(String[] args) { 
Receiver receiver = new Receiver(); 
receiver.receiveMessage(); 
} 

/** 
* 接收消息 
* 
*/ 
public void receiveMessage () { 

Connection connection = null; 
Session session = null; 
MessageConsumer messageConsumer = null; 

try { 
//1.創(chuàng)建一個(gè)連接工廠 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 

//2.創(chuàng)建一個(gè)連接 
connection = connectionFactory.createConnection(); 

//3.創(chuàng)建一個(gè)Session 
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 

//4.創(chuàng)建一個(gè)目的地 
Destination destination = session.createQueue("myQueue"); 

//5.創(chuàng)建一個(gè)消息的消費(fèi)者(接收者) 
messageConsumer = session.createConsumer(destination); 

//接收消息之前进倍,需要把連接啟動(dòng)一下 
connection.start(); 

//6.接收消息 
Message message = messageConsumer.receive(); 

//判斷消息的類型 
if (message instanceof TextMessage) { //判斷是否是文本消息 
String text = ((TextMessage) message).getText(); 
System.out.println("接收到的消息內(nèi)容是:" + text); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} finally { 
try { 
//關(guān)閉連接釋放資源 
if (null != messageConsumer) { 
messageConsumer.close(); 
} 
if (null != session) { 
session.close(); 
} 
if (null != connection) { 
connection.close(); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} 
} 
} 
}

總結(jié)

1. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 
// 其中:Boolean.FALSE表示本次會(huì)話不開啟事務(wù)管理,假如需要開啟事務(wù)管理,將其改為Boolean.TRUE即可 
//同時(shí)需要在發(fā)送消息后添加session.commit(),否則,消息是不會(huì)被提交的. 
//Session.AUTO_ACKNOWLEDGE表示消息確認(rèn)機(jī)制 
AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn) 
CLIENT_ACKNOWLEDGE:客戶端確認(rèn) 
SESSION_TRANSACTED:事務(wù)確認(rèn),如果使用事務(wù)推薦使用該確認(rèn)機(jī)制 
AUTO_ACKNOWLEDGE:懶散式確認(rèn),消息偶爾不會(huì)被確認(rèn),也就是消息可能會(huì)被重復(fù)發(fā)送.但發(fā)生的概率很小 
2. connection.start(); 
//在消息接收端,接受消息前需要加入這段代碼,開啟連接,否則一樣無法獲取消息. 
3. Destination destination = session.createQueue("myQueue"); 
//創(chuàng)建目的地時(shí),如果做測(cè)試收不到信息,可以將目的地名稱修改一下,我用的是IDEA,不清楚為何, 
//有時(shí)候收不到信息,修改一下就好了,猜測(cè)可能是緩存的原因吧

發(fā)布與訂閱的topic方式實(shí)際與點(diǎn)對(duì)點(diǎn)的queue方式,代碼通用很多,只是在創(chuàng)建目的地Destination時(shí)候創(chuàng)建為

Destination destination = session.createTopic("myTopic

Queue與Topic比較

比較項(xiàng) 隊(duì)列 主題
概要說明 point-to-ponit點(diǎn)對(duì)點(diǎn)小欻 publish/subscribe發(fā)布訂閱消息
消息完整性保障 隊(duì)列保證每條數(shù)據(jù)都能被receiver接收 不保證publisher發(fā)布的每條數(shù)據(jù)土至,subscriber都能接收到
消息是否會(huì)丟失 sender發(fā)送消息到目標(biāo)隊(duì)列,receiver可以同步或者異步接收到這個(gè)隊(duì)列上的消息猾昆,隊(duì)列上的消息如果暫時(shí)沒有被receiver接收到陶因,也不會(huì)丟失 publisher發(fā)布消息到目標(biāo)主題時(shí),只有正在監(jiān)聽該主題的subscriber能夠接收到消息毡庆,如果目標(biāo)topic沒有subscriber在監(jiān)聽坑赡,該主題上的消息就會(huì)丟失烙如。
消息傳送策略 一對(duì)一的消息發(fā)布接受策略,一個(gè)sender發(fā)送的消息毅否,只能有一個(gè)receiver接收亚铁,receiver接收到后,通知mq服務(wù)器已接收螟加,mq服務(wù)器對(duì)隊(duì)列里的消息采取刪除或其他操作徘溢。 一對(duì)多的消息發(fā)布接收策略,監(jiān)聽同一個(gè)主題地址的多個(gè)subscriber都能接收到publisher發(fā)送的消息捆探,subscriber接收完通知mq服務(wù)器然爆。

拉模式與推模式

a.點(diǎn)對(duì)點(diǎn)消息,如果沒有消費(fèi)者在監(jiān)聽隊(duì)列黍图,消息將保留在隊(duì)列中曾雕,直至消息消費(fèi)者連接到隊(duì)列為止。這種消息傳遞模型是

傳統(tǒng)意義上的懶模型或輪詢模型助被。在此模型中剖张,消息不是自動(dòng)推動(dòng)給消息消費(fèi)者的,而是要由消息消費(fèi)者從隊(duì)列中請(qǐng)求獲得(拉模式)揩环。

b.pub/sub消息傳遞模型基本上是一個(gè)推模型搔弄。在該模型中,消息會(huì)自動(dòng)廣播丰滑,消息消費(fèi)者無須通過主動(dòng)請(qǐng)求或輪詢主題的方法來獲得新的消息顾犹。

ActiveMQ消息類型

1、TextMessage 文本消息:攜帶一個(gè)java.lang.String作為有效數(shù)據(jù)(負(fù)載)的消息褒墨,可用于字符串類型的信息交換炫刷;

2、ObjectMessage 對(duì)象消息:攜帶一個(gè)可以序列化的Java對(duì)象作為有效負(fù)載的消息貌亭,可用于Java對(duì)象類型的信息交換柬唯;

3、MapMessage 映射消息:攜帶一組鍵值對(duì)的數(shù)據(jù)作為有效負(fù)載的消息圃庭,有效數(shù)據(jù)值必須是Java原始數(shù)據(jù)類型(或者它們的包裝類)及String锄奢。即:byte , short , int , long , float , double , char , boolean , String

4、BytesMessage 字節(jié)消息 :攜帶一組原始數(shù)據(jù)類型的字節(jié)流作為有效負(fù)載的消息剧腻;

5拘央、StreamMessage 流消息:攜帶一個(gè)原始數(shù)據(jù)類型流作為有效負(fù)載的消息,它保持了寫入流時(shí)的數(shù)據(jù)類型书在,寫入什么類型灰伟,則讀取也需要是相同的類型;

ActiveMQ事務(wù)消息和非事務(wù)消息

消息分為事務(wù)消息和非事務(wù)消息

1、事務(wù)消息:創(chuàng)建會(huì)話Session使用transacted=true

connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

2栏账、非事務(wù)消息:創(chuàng)建會(huì)話Session使用transacted=false

connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

事務(wù)消息必須在發(fā)送和接收完消息后顯式地調(diào)用session.commit();

事務(wù)性消息帖族,不管設(shè)置何種消息確認(rèn)模式,都會(huì)自動(dòng)被確認(rèn)挡爵;與設(shè)置的確認(rèn)機(jī)制無關(guān),但官方推薦事務(wù)性消息使用事務(wù)確認(rèn)機(jī)制.

ActiveMQ消息確認(rèn)機(jī)制

消息只有在被確認(rèn)之后竖般,才認(rèn)為已經(jīng)被成功消費(fèi),然后消息才會(huì)從隊(duì)列或主題中刪除茶鹃。消息的成功消費(fèi)通常包含三個(gè)階段:


(1)涣雕、客戶接收消息;(2)闭翩、客戶處理消息;(3)挣郭、消息被確認(rèn);

確認(rèn)機(jī)制前面提過一下,共有四種:

(1)疗韵、Session.AUTO_ACKNOWLEDGE兑障;客戶(消費(fèi)者)成功從receive方法返回時(shí),或者從MessageListener.onMessage方法成功返回時(shí)伶棒,會(huì)話自動(dòng)確認(rèn)消息,然后自動(dòng)刪除消息.

(2)旺垒、Session.CLIENT_ACKNOWLEDGE彩库;客戶通過顯式調(diào)用消息的acknowledge方法確認(rèn)消息,肤无。 即在接收端調(diào)用message.acknowledge();方法,否則,消息是不會(huì)被刪除的.

(3)、Session. DUPS_OK_ACKNOWLEDGE 骇钦;不是必須確認(rèn)宛渐,是一種“懶散的”消息確認(rèn),消息可能會(huì)重復(fù)發(fā)送眯搭,在第二次重新傳送消息時(shí)窥翩,消息頭的JMSRedelivered會(huì)被置為true標(biāo)識(shí)當(dāng)前消息已經(jīng)傳送過一次,客戶端需要進(jìn)行消息的重復(fù)處理控制鳞仙。

(4)寇蚊、 Session.SESSION_TRANSACTED;事務(wù)提交并確認(rèn)棍好。

ActiveMQ持久化消息與非持久化消息

messageProducer.setDeliveryMode(DeliveryMode. NON_PERSISTENT);//不持久化
messageProducer.setDeliveryMode(DeliveryMode.
PERSISTENT);//持久化的仗岸,當(dāng)然activemq發(fā)送消息默認(rèn)都是持久化的

說明:

設(shè)置完后,如果為持久化,那么消息在沒有被消費(fèi)前都會(huì)被寫入本地磁盤kahadb文件中保存起來,即使服務(wù)器宕機(jī),也不會(huì)影響

消息.如果是非持久化的,那么,服務(wù)一旦宕機(jī)之類的情況發(fā)生,消息即會(huì)被刪除.

ActiveMQ默認(rèn)是持久化的.

ActiveMQ消息過濾

ActiveMQ提供了一種機(jī)制,可根據(jù)消息選擇器中的標(biāo)準(zhǔn)來執(zhí)行消息過濾借笙,只接收符合過濾標(biāo)準(zhǔn)的消息扒怖;

生產(chǎn)者可在消息中放入特有的標(biāo)志,而消費(fèi)者使用基于這些特定的標(biāo)志來接收消息业稼;

1盗痒、發(fā)送消息放入特殊標(biāo)志:message . setString Property ( name , value ) ;

2、接收消息使用基于特殊標(biāo)志的消息選擇器:

MessageConsumer createConsumer(Destination destination, String messageSelector);

注:消息選擇器是一個(gè)字符串低散,語法與數(shù)據(jù)庫的SQL相似俯邓,相當(dāng)于SQL語句where條件后面的內(nèi)容骡楼;

具體代碼如下:

發(fā)送端代碼:

package com.bjpowernode.activemq.selector; 

import org.apache.activemq.Active

MQConnectionFactory; 

import javax.jms.*; 

/** 
* 消息發(fā)送者 
* 
*/ 
public class Sender { 

/**消息服務(wù)器的連接地址**/ 
public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 

public static void main(String[] args) { 
Sender sender = new Sender(); 
sender.sendMessage("Hello ActiveMQ."); 
} 

/** 
* 發(fā)送消息 
* 
* @param msg 
*/ 
public void sendMessage (String msg) { 

Connection connection = null; 
Session session = null; 
MessageProducer messageProducer = null; 

try { 
//1.創(chuàng)建一個(gè)連接工廠 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 

//2.創(chuàng)建一個(gè)連接 
connection = connectionFactory.

createConnection(); 

//3.創(chuàng)建一個(gè)Session 
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 



//5.創(chuàng)建一個(gè)目的地 
Destination destination = session.create

Queue("myQueue"); 

//6.創(chuàng)建一個(gè)消息的生產(chǎn)者(發(fā)送者) 
messageProducer = session.createProducer

(destination); 

//設(shè)置發(fā)送的消息是否需要持久化 
messageProducer.setDeliveryMode(Delivery

Mode.NON_PERSISTENT);//這里使用不持久化 

//創(chuàng)建一個(gè)循環(huán),測(cè)試消息標(biāo)識(shí)的使用 
for (int i=0; i<20; i++) { 
//4.創(chuàng)建消息,此處創(chuàng)建了一個(gè)文本消息 
Message message = session.createText

Message(msg+i); 

//將消息設(shè)置一個(gè)特有的標(biāo)識(shí) 
message.setIntProperty("id", i); 

//7.發(fā)送消息 
messageProducer.send(message); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} finally { 
try { 
//關(guān)閉連接釋放資源 
if (null != messageProducer) { 
messageProducer.close(); 
} 
if (null != session) { 
session.close(); 
} 
if (null != connection) { 
connection.close(); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} 
} 
} 
}

接收端代碼:

package com.bjpowernode.activemq.selector; 

import org.apache.activemq.ActiveMQ

ConnectionFactory; 

import javax.jms.*; 

public class Receiver { 

/**消息服務(wù)器的連接地址**/ 
public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 

public static void main(String[] args) { 
Receiver receiver = new Receiver(); 
receiver.receiveMessage(); 
} 

/** 
* 接收消息 
* 
*/ 
public void receiveMessage () { 

Connection connection = null; 
Session session = null; 
MessageConsumer messageConsumer = null; 

try { 
//1.創(chuàng)建一個(gè)連接工廠 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 

//2.創(chuàng)建一個(gè)連接 
connection = connectionFactory.

createConnection(); 

//3.創(chuàng)建一個(gè)Session 
session = connection.createSession

(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 

//4.創(chuàng)建一個(gè)目的地 
Destination destination = session.createQueue("myQueue"); 

//5.創(chuàng)建一個(gè)消息的消費(fèi)者(接收者),

selector即為消息選擇器,通過選擇需要的標(biāo)識(shí),過濾消息接受id為10-15之 //間的消息 
String selector = "id >=10 and id<=15"; 
messageConsumer = session.createConsumer

(destination, selector); 

//接收消息之前稽鞭,需要把連接啟動(dòng)一下 
connection.start(); 

while (true) { 
//6.接收消息 同步接收 
Message message = messageConsumer.

receive(); 

//判斷消息的類型 
if (message instanceof TextMessage)

{ //判斷是否是文本消息 
String text = ((TextMessage) message).getText(); 
System.out.println("接收到的消息內(nèi)容是:" + text); 
} 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} finally { 
try { 
//關(guān)閉連接釋放資源 
if (null != messageConsumer) { 
messageConsumer.close(); 
} 
if (null != session) { 
session.close(); 
} 
if (null != connection) { 
connection.close(); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
} 
} 
} 
}

ActiveMQ消息接收方式

同步接收:receive()方法接收消息叫同步接收,就是之前的Demo代碼使用的接收方式.在不使用循環(huán)方法時(shí)接收端代碼執(zhí)行

一次即結(jié)束.

異步接收:使用監(jiān)聽器接收消息君编,這種接收方式叫異步接收,接收端會(huì)一直處于監(jiān)聽狀態(tài),只要有消息產(chǎn)生,即會(huì)接收消息.

下面是異步接收代碼:

package com.bjpowernode.activemq.listener; 

import org.apache.activemq.ActiveMQConnectionFactory; 

import javax.jms.*; 

public class Receiver { 

/**消息服務(wù)器的連接地址**/ 
public static final String BROKER_URL = "tcp://192.168.174.129:61616"; 

public static void main(String[] args) { 
Receiver receiver = new Receiver(); 
receiver.receiveMessage(); 
} 

/** 
* 接收消息 
* 
*/ 
public void receiveMessage () { 

Connection connection = null; 
Session session = null; 
MessageConsumer messageConsumer = null; 

try { 
//1.創(chuàng)建一個(gè)連接工廠 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); 

//2.創(chuàng)建一個(gè)連接 
connection = connectionFactory.createConnection(); 

//3.創(chuàng)建一個(gè)Session 
session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 

//4.創(chuàng)建一個(gè)目的地 
Destination destination = session.createQueue("myQueue"); 

//5.創(chuàng)建一個(gè)消息的消費(fèi)者(接收者) 
messageConsumer = session.createConsumer(destination); 

//接收消息之前,需要把連接啟動(dòng)一下 
connection.start(); 

//6.接收消息 同步接收 
//Message message = messageConsumer.receive(); 

//異步接收川慌,使用監(jiān)聽器接收消息 
messageConsumer.setMessageListener(new MessageListener(){ 
public void onMessage(Message message) { 
//判斷消息的類型 
if (message instanceof TextMessage) { //判斷是否是文本消息 
String text = null; 
try { 
text = ((TextMessage) message).getText(); 
} catch (JMSException e) { 
e.printStackTrace(); 
} 
System.out.println("接收到的消息內(nèi)容是:" + text); 
} 
} 
}); 
} catch (JMSException e) { 
e.printStackTrace(); 
} finally { 
/*try { 
//關(guān)閉連接釋放資源 
if (null != messageConsumer) { 
messageConsumer.close(); 
} 
if (null != session) { 
session.close(); 
} 
if (null != connection) { 
connection.close(); 
} 
} catch (JMSException e) { 
e.printStackTrace(); 
}*/ 
} 
} 
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末吃嘿,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子梦重,更是在濱河造成了極大的恐慌兑燥,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,113評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件琴拧,死亡現(xiàn)場(chǎng)離奇詭異降瞳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)蚓胸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,644評(píng)論 2 381
  • 文/潘曉璐 我一進(jìn)店門挣饥,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人沛膳,你說我怎么就攤上這事扔枫。” “怎么了锹安?”我有些...
    開封第一講書人閱讀 153,340評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵短荐,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我叹哭,道長(zhǎng)忍宋,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,449評(píng)論 1 279
  • 正文 為了忘掉前任风罩,我火速辦了婚禮糠排,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘超升。我一直安慰自己入宦,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,445評(píng)論 5 374
  • 文/花漫 我一把揭開白布廓俭。 她就那樣靜靜地躺著云石,像睡著了一般。 火紅的嫁衣襯著肌膚如雪研乒。 梳的紋絲不亂的頭發(fā)上汹忠,一...
    開封第一講書人閱讀 49,166評(píng)論 1 284
  • 那天,我揣著相機(jī)與錄音,去河邊找鬼宽菜。 笑死谣膳,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的铅乡。 我是一名探鬼主播继谚,決...
    沈念sama閱讀 38,442評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼阵幸!你這毒婦竟也來了花履?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,105評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤挚赊,失蹤者是張志新(化名)和其女友劉穎诡壁,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荠割,經(jīng)...
    沈念sama閱讀 43,601評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡妹卿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,066評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蔑鹦。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片夺克。...
    茶點(diǎn)故事閱讀 38,161評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖嚎朽,靈堂內(nèi)的尸體忽然破棺而出铺纽,到底是詐尸還是另有隱情,我是刑警寧澤火鼻,帶...
    沈念sama閱讀 33,792評(píng)論 4 323
  • 正文 年R本政府宣布室囊,位于F島的核電站,受9級(jí)特大地震影響魁索,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜盼铁,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,351評(píng)論 3 307
  • 文/蒙蒙 一粗蔚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧饶火,春花似錦鹏控、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,352評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至鲤看,卻和暖如春缘揪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,584評(píng)論 1 261
  • 我被黑心中介騙來泰國打工找筝, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蹈垢,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,618評(píng)論 2 355
  • 正文 我出身青樓袖裕,卻偏偏與公主長(zhǎng)得像曹抬,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子急鳄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,916評(píng)論 2 344

推薦閱讀更多精彩內(nèi)容