1.簡(jiǎn)介
1.1. ActiveMQ 由Apache出品的開源消息總線狼钮。是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。
1.2 它的優(yōu)點(diǎn)是:
支持多種語(yǔ)言編寫客戶端围橡;
對(duì)spring的支持,很容易和spring整合 缕贡;
支持多種傳輸協(xié)議:TCP,SSL,NIO,UDP等 翁授;
支持AJAX ;
非常成熟晾咪,功能強(qiáng)大收擦,在早些年業(yè)內(nèi)大量的公司以及項(xiàng)目中都有應(yīng)用 ;
單機(jī)吞吐量:萬(wàn)級(jí)
時(shí)效性:ms級(jí)
可用性:高谍倦,基于主從架構(gòu)實(shí)現(xiàn)高可用性
消息可靠性:有較低的概率丟失數(shù)據(jù)
功能支持:MQ領(lǐng)域的功能極其完備
1.3 它的缺點(diǎn)是
偶爾會(huì)有較低概率丟失消息塞赂;
現(xiàn)在社區(qū)以及國(guó)內(nèi)應(yīng)用都越來(lái)越少,官方社區(qū)現(xiàn)在對(duì)ActiveMQ 5.x維護(hù)越來(lái)越少昼蛀,幾個(gè)月才發(fā)布一個(gè)版本宴猾;
1.4 使用場(chǎng)景
主要是基于解耦和異步來(lái)用的,較少在大規(guī)模吞吐的場(chǎng)景中使用
2.安裝
2.1 下載安裝包
到官網(wǎng)下載:https://activemq.apache.org/components/classic/
2.2 啟動(dòng)activeMQ
解壓縮叼旋,進(jìn)入bin目錄仇哆,啟動(dòng)/停止/查看狀態(tài):./activemq [start|stop|status]
? 啟動(dòng)信息如下
INFO | Apache ActiveMQ 5.15.9 (localhost, ID:LAPTOP-BRJ9A33I-60490-1563860323722-0:1) is starting
INFO | Listening for connections at: tcp://LAPTOP-BRJ9A33I:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector openwire started
INFO | Listening for connections at: amqp://LAPTOP-BRJ9A33I:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector amqp started
INFO | Listening for connections at: stomp://LAPTOP-BRJ9A33I:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector stomp started
INFO | Listening for connections at: mqtt://LAPTOP-BRJ9A33I:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector mqtt started
INFO | Starting Jetty server
INFO | Creating Jetty connector
WARN | ServletContext@o.e.j.s.ServletContextHandler@f107c50{/,null,STARTING} has uncovered http methods for path: /
INFO | Listening for connections at ws://LAPTOP-BRJ9A33I:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600
INFO | Connector ws started
INFO | Apache ActiveMQ 5.15.9 (localhost, ID:LAPTOP-BRJ9A33I-60490-1563860323722-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
INFO | No Spring WebApplicationInitializer types detected on classpath
INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/
INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/
INFO | Initializing Spring FrameworkServlet 'dispatcher'
2.3 查看控制臺(tái)
進(jìn)入管理后臺(tái)查看消息:http://ip:8161/admin 用戶名:admin 密碼:admin
3.編碼實(shí)現(xiàn)
3.1 發(fā)送端編碼
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
package com.qhs.mq.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MsgProducer {
public static void main(String[] args) {
testQueue();
testTopic();
}
private static void testQueue() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("qhs-queue");
producer = session.createProducer(queue);
TextMessage textMessage = null;
for (int i = 0; i < 50; i++) {
textMessage = session.createTextMessage("字符串queue消息:" + i);
producer.send(textMessage);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
private static void testTopic() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("qhs-topic");
producer = session.createProducer(topic);
TextMessage textMessage = null;
for (int i = 0; i < 50; i++) {
textMessage = session.createTextMessage("字符串topic消息:" + i);
producer.send(textMessage);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
3.2 接收端編碼
package com.qhs.mq.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MsgConsumer {
public static void main(String[] args) {
testQueue();
testTopic();
}
public static void testQueue() {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");
Connection connection = null;
Session session = null;
MessageConsumer queueConsumer = null;
MessageConsumer topicConsumer = null;
try {
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("qhs-queue");
queueConsumer = session.createConsumer(queue);
queueConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Topic topic = session.createTopic("qhs-topic");
topicConsumer = session.createConsumer(topic);
topicConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
queueConsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
topicConsumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public static void testTopic() {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
運(yùn)行程序,先運(yùn)行接收端夫植,再運(yùn)行發(fā)送端即可(topic消息默認(rèn)不持久化)讹剔。
通過(guò)web控制臺(tái)可以查看到對(duì)應(yīng)的消息。
4.一些特征
- 點(diǎn)對(duì)點(diǎn)模式不會(huì)造成重復(fù)消費(fèi)详民,一個(gè)生產(chǎn)者的消息只能給一個(gè)消費(fèi)者消費(fèi)延欠;
- 發(fā)布訂閱模型和點(diǎn)對(duì)點(diǎn)模型不一樣,若發(fā)布消息之前未啟動(dòng)訂閱者阐斜,則訂閱者啟動(dòng)時(shí)衫冻,訂閱者也無(wú)法接收到消息(未持久化狀態(tài))。另外谒出,發(fā)布訂閱模型隅俘,消費(fèi)者是可以重新消費(fèi)的;
- 消息保序方式是broker維持了一個(gè)consumer列表笤喳,對(duì)于同一session的消息取列表中的第一個(gè)來(lái)發(fā)送消息为居;
5.實(shí)現(xiàn)機(jī)制
5.1 可靠性保證
- 自動(dòng)簽收,生產(chǎn)者向隊(duì)列發(fā)送消息后杀狡,只要消費(fèi)者監(jiān)聽了消息隊(duì)列蒙畴,消費(fèi)者將立刻獲得消息,不管消費(fèi)者是否成功取得消息,過(guò)程是否拋出異常導(dǎo)致消費(fèi)者無(wú)法獲得消息膳凝,都不會(huì)觸發(fā)重試機(jī)制碑隆。--沒(méi)有事務(wù)機(jī)制,沒(méi)有補(bǔ)償機(jī)制
- 事務(wù)簽收蹬音,對(duì)于生產(chǎn)者而言上煤,生產(chǎn)者要想向消息隊(duì)列發(fā)送消息,必須提交事務(wù)著淆。對(duì)于消費(fèi)者而言劫狠,如果消費(fèi)沒(méi)有提交事務(wù),則默認(rèn)表示沒(méi)有消費(fèi)永部,會(huì)觸發(fā)重試機(jī)制独泞。-- 雙方事務(wù)提交
- 手動(dòng)簽收,需要消費(fèi)者手動(dòng)簽收苔埋,如果消費(fèi)者沒(méi)有進(jìn)行簽收懦砂,則默認(rèn)消息沒(méi)有被消費(fèi)。--單方事務(wù)提交
代碼范例:
//場(chǎng)景1
//生產(chǎn)者不開啟事務(wù)讲坎,客戶端必須有手動(dòng)簽收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
//消費(fèi)者不開啟事務(wù)孕惜,客戶端必須有手動(dòng)簽收模式
TextMessage textMessage = (TextMessage) createConsumer.receive();
//接受消息
textMessage.acknowledge();
//場(chǎng)景2
//生產(chǎn)者不開啟事務(wù),客戶端自動(dòng)簽收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//消費(fèi)者不開啟事務(wù)晨炕,自動(dòng)簽收消息
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//場(chǎng)景3
//在支持事務(wù)的session中衫画,producer發(fā)送message時(shí)在message中帶有transactionID。broker收到message后判斷是否有transactionID瓮栗,如果有就把message保存在transaction store中削罩,等待commit或者rollback消息
//事務(wù)消息 生產(chǎn)者以事務(wù)形式,必須要將消息提交事務(wù)费奸,才可以提交到隊(duì)列中弥激。
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
//消費(fèi)者 消費(fèi)完后必須提交,不提交生產(chǎn)者不知道消費(fèi)者消費(fèi)了
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
5.2 事務(wù)機(jī)制
- 消息事務(wù)是在生產(chǎn)者producer到broker或broker到consumer過(guò)程中同一個(gè)session中發(fā)生的,保證幾條消息在發(fā)送過(guò)程中的原子性愿阐。(Broker:消息隊(duì)列核心微服,相當(dāng)于一個(gè)控制中心,負(fù)責(zé)路由消息缨历、保存訂閱和連接以蕴、消息確認(rèn)和控制事務(wù))
- 消息生產(chǎn)者-異步發(fā)送
消息生產(chǎn)者使用持久(persistent)傳遞模式發(fā)送消息的時(shí)候,Producer.send() 方法會(huì)被阻塞辛孵,直到 broker 發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)者(ProducerAck)丛肮,這個(gè)確認(rèn)消息暗示broker已經(jīng)成功接收到消息并把消息保存到二級(jí)存儲(chǔ)中。這個(gè)過(guò)程通常稱為同步發(fā)送魄缚。
如果應(yīng)用程序能夠容忍一些消息的丟失宝与,那么可以使用異步發(fā)送。異步發(fā)送不會(huì)在受到 broker 的確認(rèn)之前一直阻塞 Producer.send 方法。但有一個(gè)例外习劫,當(dāng)發(fā)送方法在一個(gè)事務(wù)上下文中時(shí)咆瘟,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味著所有的持久消息都以被寫到二級(jí)存儲(chǔ)中诽里。
想要使用異步搞疗,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true
如果設(shè)置了alwaysSyncSend=true系統(tǒng)將會(huì)忽略u(píng)seAsyncSend設(shè)置的值都采用同步
- 當(dāng)alwaysSyncSend=false時(shí),“NON_PERSISTENT”(非持久化)须肆、事務(wù)中的消息將使用“異步發(fā)送”;
- 當(dāng)alwaysSyncSend=false時(shí),如果指定了useAsyncSend=true桩皿,“PERSISTENT”類型的消息使用異步發(fā)送豌汇。如果useAsyncSend=false,“PERSISTENT”類型的消息使用同步發(fā)送泄隔。
總結(jié):
- 默認(rèn)情況(alwaysSyncSend=false,useAsyncSend=false)拒贱,非持久化消息、事務(wù)內(nèi)的消息均采用異步發(fā)送佛嬉;對(duì)于持久化消息采用同步發(fā)送逻澳。
jms.sendTimeout:發(fā)送超時(shí)時(shí)間,默認(rèn)等于0暖呕,如果jms.sendTimeout>0將會(huì)忽略(alwaysSyncSend斜做、useAsyncSend、消息是否持久化)所有的消息都是用同步發(fā)送湾揽!- 即使使用異步發(fā)送瓤逼,也可以通過(guò)producerWindowSize來(lái)控制發(fā)送端無(wú)節(jié)制的向broker發(fā)送消息:
producerWindowSize:窗口尺寸,用來(lái)約束在異步發(fā)送時(shí)producer端允許積壓的(尚未ACK)的消息的尺寸库物,且只對(duì)異步發(fā)送有意義霸旗。每次發(fā)送消息之后,都將會(huì)導(dǎo)致memoryUsage尺寸增加(+message.size)戚揭,當(dāng)broker返回producerAck時(shí)诱告,如果達(dá)到了producerWindowSize上限,即使是異步調(diào)用也會(huì)被阻塞民晒,防止不停向broker發(fā)送消息精居。
通過(guò)jms.producerWindowSize=。镀虐。箱蟆。來(lái)設(shè)置
5.3 持久化機(jī)制
ActiveMQ的消息持久化機(jī)制有JDBC,AMQ刮便,KahaDB和LevelDB空猜,無(wú)論使用哪種持久化方式,消息的存儲(chǔ)邏輯都是一致的。在發(fā)送者將消息發(fā)送出去后辈毯,消息中心首先將消息存儲(chǔ)到本地?cái)?shù)據(jù)文件坝疼、內(nèi)存數(shù)據(jù)庫(kù)或者遠(yuǎn)程數(shù)據(jù)庫(kù)等,然后試圖將消息發(fā)送給接收者谆沃,發(fā)送成功則將消息從存儲(chǔ)中刪除钝凶,失敗則繼續(xù)嘗試。消息中心啟動(dòng)以后首先要檢查指定的存儲(chǔ)位置唁影,如果有未發(fā)送成功的消息耕陷,則需要把消息發(fā)送出去。
5.4 消息冪等性
消費(fèi)者保證消息冪等性据沈,不被重復(fù)消費(fèi)
網(wǎng)絡(luò)延遲傳輸中哟沫,會(huì)造成進(jìn)行MQ重試中,在重試過(guò)程中锌介,可能會(huì)造成重復(fù)消費(fèi)
使用全局MessageID判斷消費(fèi)方使用同一個(gè)嗜诀,解決冪等性
activeMQ提供了textMessageId,使用業(yè)務(wù)ID(訂單號(hào))也可以
String jmsMessageId = textMessage.getJMSMessageID();
網(wǎng)絡(luò)延遲環(huán)境下,第二次請(qǐng)求過(guò)來(lái),應(yīng)該使用全局ID判斷該消息是否被使用過(guò)
if(jmsMessageId == redis內(nèi)的id){
//把消息簽收掉,否則將繼續(xù)重試,有些人覺(jué)得難理解,我解釋一下:
textMessage.acknowledge(); //避免第三次重試
// 消息可以重發(fā),但是消息不能做重復(fù)操作,如重復(fù)向數(shù)據(jù)庫(kù)中做插入操作,造成冪等性問(wèn)題,
//所以當(dāng)?shù)诙伟l(fā)送過(guò)來(lái)的時(shí)候,就可能造成重復(fù)提交問(wèn)題,我們使用手動(dòng)提交(業(yè)務(wù)中一般也是使用手動(dòng)提交多)
//手動(dòng)提交可以把重復(fù)發(fā)送的消息從隊(duì)列中移除,那么接下來(lái)就不會(huì)觸發(fā)重試了。
}
//將拿到的消息做業(yè)務(wù)處理,如插入修改操作等...
// 消費(fèi)成功,把jmsMessageId放入redis
5.5 隊(duì)列消息過(guò)濾收取
不同消息可以放到不同的隊(duì)列孔祸,也可以放到一個(gè)隊(duì)列中隆敢,通過(guò)增加消息的屬性,在發(fā)送message的時(shí)候崔慧,可以設(shè)置它的消息屬性值拂蝎,然后在消費(fèi)端建立消費(fèi)隊(duì)列時(shí),傳入過(guò)濾條件即可尊浪。
發(fā)送端:
MapMessage message1 = session.createMapMessage();
message1.setString("name","張1");//設(shè)置了name值
message1.setIntProperty("age",23);//設(shè)置了age屬性
接受端:
MessageConsumer consumer = session.createConsumer(queue,"age > 30");
這樣匣屡,這個(gè)consumer就只消費(fèi)這個(gè)queue中age>30的消息。
5.6 消息保序性
利用Activemq的高級(jí)特性:consumer之獨(dú)有消費(fèi)者(exclusive consumer)
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
這樣會(huì)確保consumer列表中拇涤,始終取一個(gè)主消費(fèi)者來(lái)消費(fèi)捣作。但是缺點(diǎn)在于將會(huì)始終發(fā)往這個(gè)消費(fèi)者,導(dǎo)致無(wú)法負(fù)載均衡鹅士。比如券躁,訂單消息,希望一個(gè)訂單內(nèi)的消息是保序的掉盅,但各個(gè)訂單之間可以并發(fā)也拜,這就需要用到下文消息組 messageGroup 的概念。
Message Groups特性是一種負(fù)載均衡的機(jī)制趾痘。在一個(gè)消息被分發(fā)到consumer之前慢哈,broker首先檢查消息JMSXGroupID屬性。如果存在永票,那么broker會(huì)檢查是否有某個(gè)consumer擁有這個(gè)message group卵贱。如果沒(méi)有滥沫,那么broker會(huì)選擇一個(gè)consumer,并將它關(guān)聯(lián)到這個(gè)message group键俱。此后兰绣,這個(gè)consumer會(huì)接收這個(gè)message group的所有消息,直到:
- Consumer被關(guān)閉
- Message group被關(guān)閉编振,通過(guò)發(fā)送一個(gè)消息缀辩,并設(shè)置這個(gè)消息的JMSXGroupSeq為-1
例如:
message.setStringProperty("JMSXGroupID", "constact-20100000002");
.....
message.setIntProperty("JMSXGroupSeq", -1);
5.6 死信隊(duì)列
在使用Message Queue的過(guò)程中,總會(huì)由于種種原因而導(dǎo)致消息失敗踪央。一個(gè)經(jīng)典的場(chǎng)景是一個(gè)生成者向Queue中發(fā)消息臀玄,里面包含了一組郵件地址和郵件內(nèi)容。而消費(fèi)者從Queue中將消息一條條讀出來(lái)畅蹂,向指定郵件地址發(fā)送郵件镐牺。消費(fèi)者在發(fā)送消息的過(guò)程中由于種種原因會(huì)導(dǎo)致失敗,比如網(wǎng)絡(luò)超時(shí)魁莉、當(dāng)前郵件服務(wù)器不可用等。這樣我們就希望建立一種機(jī)制募胃,對(duì)于未發(fā)送成功的郵件再重新發(fā)送旗唁,也就是重新處理。重新處理超過(guò)一定次數(shù)還不成功痹束,就放棄對(duì)該消息的處理检疫,記錄下來(lái),繼續(xù)對(duì)剩余消息進(jìn)行處理祷嘶。
ActiveMQ為我們實(shí)現(xiàn)了這一功能屎媳,叫做ReDelivery(重新投遞)。當(dāng)消費(fèi)者在處理消息時(shí)有異常發(fā)生论巍,會(huì)將消息重新放回Queue里烛谊,進(jìn)行下一次處理。當(dāng)超過(guò)重試次數(shù)時(shí)嘉汰,會(huì)給broker發(fā)送一個(gè)"Poison ack"丹禀,這個(gè)消息被認(rèn)為是a poison pill(毒丸),這時(shí)broker會(huì)將這個(gè)消息發(fā)送到DLQ鞋怀。
在以下四種情況中双泪,ActiveMQ消息會(huì)被重發(fā)給客戶端/消費(fèi)者:
- 在一個(gè)事務(wù)session中,并且調(diào)用了session.rollback()方法密似。
- 在一個(gè)事務(wù)session中焙矛,session.commit()之前調(diào)用了commit.close()。
- 在session中使用CLIENT_ACKNOWLEDGE簽收模式残腌,并且調(diào)用了session.recover()方法村斟。
- 在session中使用AUTO_ACKNOWLEDGE簽收模式贫导,在異步(messageListener)消費(fèi)消息情況下,如果onMessage方法異常且沒(méi)有被catch邓梅,此消息會(huì)被redelivery脱盲。
缺省情況下:持久消息過(guò)期,會(huì)被送到DLQ日缨,非持久消息不會(huì)送到DLQ(不會(huì)redelivery)钱反。
可以在connectionFactory中注入自定義的redeliveryPolicy來(lái)改變?nèi)笔?shù)。
activeMQ默認(rèn)是發(fā)送六次匣距,每次間隔1秒面哥。
6.常見(jiàn)問(wèn)題
6.1 消息堆積導(dǎo)致服務(wù)不可用
- ActiveMQ分為持久化和非持久化。非持久化消息是存儲(chǔ)在內(nèi)存中的毅待,持久化消息是存儲(chǔ)在文件中的尚卫,它們的最大限制在配置文件的<systemUsage>節(jié)點(diǎn)中配置。
- 非持久化消息堆積到內(nèi)存告急時(shí)尸红,ActiveMQ會(huì)將內(nèi)存中的非持久化消息寫入臨時(shí)文件中吱涉,以騰出內(nèi)存。(重啟后持久化消息會(huì)從文件中恢復(fù)外里,非持久化的臨時(shí)文件會(huì)直接刪除)
- 當(dāng)文件增大到達(dá)了配置中的最大限制的時(shí):
- 持久化情況怎爵,到達(dá)文件限額時(shí),生產(chǎn)者阻塞盅蝗,消費(fèi)者可連接可消費(fèi)鳖链,消費(fèi)后生產(chǎn)者繼續(xù)運(yùn)行:
設(shè)置2G左右的持久化文件限制,大量生產(chǎn)持久化消息直到文件達(dá)到最大限制墩莫,此時(shí)生產(chǎn)者阻塞芙委,但消費(fèi)者可正常連接并消費(fèi)消息,等消息消費(fèi)掉一部分狂秦,文件刪除又騰出空間之后灌侣,生產(chǎn)者又可繼續(xù)發(fā)送消息,服務(wù)自動(dòng)恢復(fù)正常裂问。- 非持久化情況顶瞳,到達(dá)文件限額時(shí),生產(chǎn)者阻塞愕秫,消費(fèi)者可連接但不能消費(fèi)慨菱,系統(tǒng)崩潰:
設(shè)置2G左右的臨時(shí)文件限制,大量生產(chǎn)非持久化消息并寫入臨時(shí)文件戴甩,在達(dá)到最大限制時(shí)符喝,生產(chǎn)者阻塞,消費(fèi)者可正常連接但不能消費(fèi)消息甜孤,或者原本慢速消費(fèi)的消費(fèi)者协饲,消費(fèi)突然停止畏腕。整個(gè)系統(tǒng)可連接,但是無(wú)法提供服務(wù)茉稠,就這樣掛了描馅。
解決方案:盡量不要用非持久化消息,非要用的話而线,將臨時(shí)文件限制盡可能的調(diào)大
6.2 消息丟失
6.3 消息持久化慢
默認(rèn)情況非持久化的消息是異步發(fā)送的铭污,持久化的消息是同步發(fā)送的,遇到慢一點(diǎn)的硬盤膀篮,發(fā)送消息的速度很慢嘹狞。
但是在開啟事務(wù)的情況下,消息都是異步發(fā)送的誓竿,效率會(huì)有2個(gè)數(shù)量級(jí)的提升磅网。所以在發(fā)送持久化消息時(shí),請(qǐng)務(wù)必開啟事務(wù)模式筷屡。發(fā)送非持久化消息時(shí)也建議開啟事務(wù)涧偷,因?yàn)楦静粫?huì)影響性能;
解決方案:消息發(fā)送啟用事務(wù)毙死;
6.4 消息不均勻消費(fèi)(存在無(wú)法處理消息嫂丙,prefech機(jī)制)
ActiveMQ的一個(gè)主要的設(shè)計(jì)目標(biāo)是:提供一個(gè)高性能的消息中間件。它使用了SEDA(Staged Event Driven Architecture)架構(gòu)及異步傳輸规哲。為了提供更高的性能,很重要的一點(diǎn)是 盡快地將消息傳送給消費(fèi)者诽表,這樣消費(fèi)者利用消息緩沖區(qū)等待處理唉锌,而不是等待消息。
然后竿奏,這樣也有很大風(fēng)險(xiǎn):不斷地向 消費(fèi)者 傳送消息可能使得其消息緩沖溢出袄简,因?yàn)閭魉偷乃俣缺认M(fèi)者真正“消費(fèi)”消息更快,這是很可能泛啸。
因此绿语,ActiveMQ使用了 消息”預(yù)取限制“(prefetch limit):表示在某個(gè)時(shí)間段內(nèi),可能向消費(fèi)者傳輸?shù)淖畲笙⒘亢蛑罚绻_(dá)到該上限吕粹,那么停止發(fā)送,直到ActiveMQ收到消費(fèi)者的acknowledgements(確認(rèn)岗仑,表示已經(jīng)處理了該消息)匹耕。prefetch limit可以針對(duì)每個(gè)不同的consumer來(lái)設(shè)置。
為了獲取更高的性能荠雕,prefetch limit當(dāng)然是越大越好稳其,只要consumer有足夠大的消息緩沖區(qū)(messagevolume)驶赏。如果消息的總量非常少,而且每個(gè)消息的處理時(shí)間非常的長(zhǎng)既鞠,那么煤傍,可以將prefetch設(shè)置為1,這樣嘱蛋,每次向consumer發(fā)送一個(gè)消息蚯姆,等其確認(rèn)已經(jīng)處理完畢后,再發(fā)送第二個(gè)浑槽。
特別地蒋失,如果prefetch設(shè)置為0,表示consumer每次 主動(dòng)向activeMQ要求傳輸最大的數(shù)據(jù)量桐玻,而不是被動(dòng)地接收消息
原因在于ActiveMQ的prefetch機(jī)制:每個(gè)消費(fèi)者獲取消息時(shí)篙挽,是批量獲取的,相當(dāng)于預(yù)定镊靴。因?yàn)閍ctiveMQ提倡是盡量消費(fèi)理念铣卡,所以會(huì)采取消費(fèi)者批量獲取信息的方式來(lái)提高消費(fèi)效率。prefech的值默認(rèn)是1000偏竟,即表示一個(gè)消費(fèi)者在取消息時(shí)煮落,會(huì)一次取1000條到自己的消息緩沖區(qū)中(不溢出),然后再一條一條的處理踊谋,每處理完一條蝉仇,mq就會(huì)刪除這一條,沒(méi)處理的在mq中依然可見(jiàn)殖蚕。
在發(fā)送一些消息之后轿衔,開啟2個(gè)消費(fèi)者去處理消息。會(huì)發(fā)現(xiàn)一個(gè)消費(fèi)者處理了所有的消息睦疫,另一個(gè)消費(fèi)者根本沒(méi)收到消息害驹。原因在于ActiveMQ的prefetch機(jī)制。當(dāng)消費(fèi)者去獲取消息時(shí)蛤育,不會(huì)一條一條去獲取宛官,而是一次性獲取一批,默認(rèn)是1000條瓦糕。這些預(yù)獲取的消息底洗,在還沒(méi)確認(rèn)消費(fèi)之前,在管理控制臺(tái)還是可以看見(jiàn)這些消息的咕娄,但是不會(huì)再分配給其他消費(fèi)者枷恕,此時(shí)這些消息的狀態(tài)應(yīng)該算作“已分配未消費(fèi)”,如果消息最后被消費(fèi)谭胚,則會(huì)在服務(wù)器端被刪除徐块,如果消費(fèi)者崩潰未玻,則這些消息會(huì)被重新分配給新的消費(fèi)者。但是如果消費(fèi)者既不消費(fèi)確認(rèn)胡控,又不崩潰扳剿,那這些消息就永遠(yuǎn)躺在消費(fèi)者的緩存區(qū)里無(wú)法處理。更通常的情況是昼激,消費(fèi)這些消息非常耗時(shí)庇绽,你開了10個(gè)消費(fèi)者去處理,結(jié)果發(fā)現(xiàn)只有一臺(tái)機(jī)器吭哧吭哧處理橙困,另外9臺(tái)啥事不干瞧掺。
解決方案:將prefetch設(shè)為1,每次處理1條消息凡傅,處理完再去取辟狈,這樣也慢不了多少.
另外,當(dāng)連接來(lái)自一個(gè)連接池中夏跷,消費(fèi)消息可能出現(xiàn)一些由于“prefetch”而產(chǎn)生的問(wèn)題:預(yù)取的消息(還未被處理)當(dāng)連接關(guān)閉時(shí)會(huì)被釋放(即哼转,可以在activeMQ中再次讀取到該消息)。而連接池中的連接只有在連接池關(guān)閉后才真正的銷毀槽华。這使得 預(yù)取的消息直到連接被重用時(shí)才會(huì)被處理(或者等連接池關(guān)閉壹蔓,可再次從activeMQ中讀取)猫态。這樣導(dǎo)致了消息可能丟失佣蓉,或者當(dāng)連接池中有多個(gè)連接時(shí),消息亂序(out-of-sequence)亲雪!
- 消費(fèi)者則不放入連接池勇凭!當(dāng)然,這樣在消費(fèi)者的性能上匆光,會(huì)受到影響(當(dāng)有多個(gè)線程快速的消費(fèi)消息時(shí),對(duì)象被不斷的創(chuàng)建銷毀)酿联。
- 將消費(fèi)者的連接池中數(shù)量設(shè)為1终息。
- 在使用連接池的情況下,將prefetch設(shè)為1或者0贞让。當(dāng)使用Spring JMS和MessageDrivenPojo時(shí)周崭,只能將prefetch設(shè)為1,而不能為0喳张;