activeMQ 入門指南

1.簡(jiǎn)介

1.1. ActiveMQ 由Apache出品的開源消息總線狼钮。是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。


1.2 它的優(yōu)點(diǎn)是:
支持多種語(yǔ)言編寫客戶端围橡;
對(duì)spring的支持,很容易和spring整合 缕贡;
支持多種傳輸協(xié)議:TCP,SSL,NIO,UDP等 翁授;
支持AJAX ;
非常成熟晾咪,功能強(qiáng)大收擦,在早些年業(yè)內(nèi)大量的公司以及項(xiàng)目中都有應(yīng)用 ;
單機(jī)吞吐量:萬(wàn)級(jí)
時(shí)效性:ms級(jí)
可用性:高谍倦,基于主從架構(gòu)實(shí)現(xiàn)高可用性
消息可靠性:有較低的概率丟失數(shù)據(jù)
功能支持:MQ領(lǐng)域的功能極其完備


1.3 它的缺點(diǎn)是
偶爾會(huì)有較低概率丟失消息塞赂;
現(xiàn)在社區(qū)以及國(guó)內(nèi)應(yīng)用都越來(lái)越少,官方社區(qū)現(xiàn)在對(duì)ActiveMQ 5.x維護(hù)越來(lái)越少昼蛀,幾個(gè)月才發(fā)布一個(gè)版本宴猾;


1.4 使用場(chǎng)景
主要是基于解耦和異步來(lái)用的,較少在大規(guī)模吞吐的場(chǎng)景中使用

2.安裝

2.1 下載安裝包

到官網(wǎng)下載:https://activemq.apache.org/components/classic/

2.2 啟動(dòng)activeMQ

解壓縮叼旋,進(jìn)入bin目錄仇哆,啟動(dòng)/停止/查看狀態(tài):./activemq [start|stop|status]

? 啟動(dòng)信息如下


INFO | Apache ActiveMQ 5.15.9 (localhost, ID:LAPTOP-BRJ9A33I-60490-1563860323722-0:1) is starting

INFO | Listening for connections at: tcp://LAPTOP-BRJ9A33I:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600

INFO | Connector openwire started

INFO | Listening for connections at: amqp://LAPTOP-BRJ9A33I:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600

INFO | Connector amqp started

INFO | Listening for connections at: stomp://LAPTOP-BRJ9A33I:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600

INFO | Connector stomp started

INFO | Listening for connections at: mqtt://LAPTOP-BRJ9A33I:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600

INFO | Connector mqtt started

INFO | Starting Jetty server

INFO | Creating Jetty connector

WARN | ServletContext@o.e.j.s.ServletContextHandler@f107c50{/,null,STARTING} has uncovered http methods for path: /

INFO | Listening for connections at ws://LAPTOP-BRJ9A33I:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600

INFO | Connector ws started

INFO | Apache ActiveMQ 5.15.9 (localhost, ID:LAPTOP-BRJ9A33I-60490-1563860323722-0:1) started

INFO | For help or more information please see: http://activemq.apache.org

INFO | No Spring WebApplicationInitializer types detected on classpath

INFO | ActiveMQ WebConsole available at http://0.0.0.0:8161/

INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8161/api/jolokia/

INFO | Initializing Spring FrameworkServlet 'dispatcher'

2.3 查看控制臺(tái)

進(jìn)入管理后臺(tái)查看消息:http://ip:8161/admin 用戶名:admin 密碼:admin

3.編碼實(shí)現(xiàn)

3.1 發(fā)送端編碼

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.3</version>
        </dependency>


package com.qhs.mq.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MsgProducer {

    public static void main(String[] args) {

        testQueue();

        testTopic();

    }

    private static void testQueue() {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");

        Connection connection = null;

        Session session = null;

        MessageProducer producer = null;

        try {

            connection = factory.createConnection();

            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Queue queue = session.createQueue("qhs-queue");

            producer = session.createProducer(queue);

            TextMessage textMessage = null;

            for (int i = 0; i < 50; i++) {

                textMessage = session.createTextMessage("字符串queue消息:" + i);

                producer.send(textMessage);

            }

        } catch (JMSException e) {

            e.printStackTrace();

        } finally {

            try {

                producer.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                session.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                connection.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

        }

    }

    private static void testTopic() {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");

        Connection connection = null;

        Session session = null;

        MessageProducer producer = null;

        try {

            connection = factory.createConnection();

            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("qhs-topic");

            producer = session.createProducer(topic);

            TextMessage textMessage = null;

            for (int i = 0; i < 50; i++) {

                textMessage = session.createTextMessage("字符串topic消息:" + i);

                producer.send(textMessage);

            }

        } catch (JMSException e) {

            e.printStackTrace();

        } finally {

            try {

                producer.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                session.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                connection.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

        }

    }

}

3.2 接收端編碼


package com.qhs.mq.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MsgConsumer {

    public static void main(String[] args) {

        testQueue();

        testTopic();

    }

    public static void testQueue() {

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");

        Connection connection = null;

        Session session = null;

        MessageConsumer queueConsumer = null;

        MessageConsumer topicConsumer = null;

        try {

            connection = factory.createConnection();

            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Queue queue = session.createQueue("qhs-queue");

            queueConsumer = session.createConsumer(queue);

            queueConsumer.setMessageListener(new MessageListener() {

                @Override

                public void onMessage(Message message) {

                    try {

                        System.out.println(((TextMessage)message).getText());

                    } catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            });

            Topic topic = session.createTopic("qhs-topic");

            topicConsumer = session.createConsumer(topic);

            topicConsumer.setMessageListener(new MessageListener() {

                @Override

                public void onMessage(Message message) {

                    try {

                        System.out.println(((TextMessage)message).getText());

                    } catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            });

            System.in.read();

        } catch (Exception e) {

            e.printStackTrace();

        }finally {

            try {

                queueConsumer.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                topicConsumer.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                session.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                connection.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

        }

    }

    public static void testTopic() {

        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.137.1:61616");

        Connection connection = null;

        Session session = null;

        MessageConsumer consumer = null;

        try {

            connection = factory.createConnection();

            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            System.in.read();

        } catch (Exception e) {

            e.printStackTrace();

        }finally {

            try {

                consumer.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                session.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

            try {

                connection.close();

            } catch (JMSException e) {

                e.printStackTrace();

            }

        }

    }

}

運(yùn)行程序,先運(yùn)行接收端夫植,再運(yùn)行發(fā)送端即可(topic消息默認(rèn)不持久化)讹剔。
通過(guò)web控制臺(tái)可以查看到對(duì)應(yīng)的消息。

4.一些特征

  • 點(diǎn)對(duì)點(diǎn)模式不會(huì)造成重復(fù)消費(fèi)详民,一個(gè)生產(chǎn)者的消息只能給一個(gè)消費(fèi)者消費(fèi)延欠;
  • 發(fā)布訂閱模型和點(diǎn)對(duì)點(diǎn)模型不一樣,若發(fā)布消息之前未啟動(dòng)訂閱者阐斜,則訂閱者啟動(dòng)時(shí)衫冻,訂閱者也無(wú)法接收到消息(未持久化狀態(tài))。另外谒出,發(fā)布訂閱模型隅俘,消費(fèi)者是可以重新消費(fèi)的;
  • 消息保序方式是broker維持了一個(gè)consumer列表笤喳,對(duì)于同一session的消息取列表中的第一個(gè)來(lái)發(fā)送消息为居;

5.實(shí)現(xiàn)機(jī)制

5.1 可靠性保證

  1. 自動(dòng)簽收,生產(chǎn)者向隊(duì)列發(fā)送消息后杀狡,只要消費(fèi)者監(jiān)聽了消息隊(duì)列蒙畴,消費(fèi)者將立刻獲得消息,不管消費(fèi)者是否成功取得消息,過(guò)程是否拋出異常導(dǎo)致消費(fèi)者無(wú)法獲得消息膳凝,都不會(huì)觸發(fā)重試機(jī)制碑隆。--沒(méi)有事務(wù)機(jī)制,沒(méi)有補(bǔ)償機(jī)制
  2. 事務(wù)簽收蹬音,對(duì)于生產(chǎn)者而言上煤,生產(chǎn)者要想向消息隊(duì)列發(fā)送消息,必須提交事務(wù)著淆。對(duì)于消費(fèi)者而言劫狠,如果消費(fèi)沒(méi)有提交事務(wù),則默認(rèn)表示沒(méi)有消費(fèi)永部,會(huì)觸發(fā)重試機(jī)制独泞。-- 雙方事務(wù)提交
  3. 手動(dòng)簽收,需要消費(fèi)者手動(dòng)簽收苔埋,如果消費(fèi)者沒(méi)有進(jìn)行簽收懦砂,則默認(rèn)消息沒(méi)有被消費(fèi)。--單方事務(wù)提交

代碼范例:


//場(chǎng)景1 

//生產(chǎn)者不開啟事務(wù)讲坎,客戶端必須有手動(dòng)簽收模式

Session session = createConnection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);

//消費(fèi)者不開啟事務(wù)孕惜,客戶端必須有手動(dòng)簽收模式

TextMessage textMessage = (TextMessage) createConsumer.receive();

//接受消息

textMessage.acknowledge();

//場(chǎng)景2

//生產(chǎn)者不開啟事務(wù),客戶端自動(dòng)簽收模式

Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

//消費(fèi)者不開啟事務(wù)晨炕,自動(dòng)簽收消息

Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

//場(chǎng)景3

//在支持事務(wù)的session中衫画,producer發(fā)送message時(shí)在message中帶有transactionID。broker收到message后判斷是否有transactionID瓮栗,如果有就把message保存在transaction store中削罩,等待commit或者rollback消息

//事務(wù)消息 生產(chǎn)者以事務(wù)形式,必須要將消息提交事務(wù)费奸,才可以提交到隊(duì)列中弥激。

Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

session.commit();

//消費(fèi)者 消費(fèi)完后必須提交,不提交生產(chǎn)者不知道消費(fèi)者消費(fèi)了

Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

session.commit();

5.2 事務(wù)機(jī)制

  • 消息事務(wù)是在生產(chǎn)者producer到broker或broker到consumer過(guò)程中同一個(gè)session中發(fā)生的,保證幾條消息在發(fā)送過(guò)程中的原子性愿阐。(Broker:消息隊(duì)列核心微服,相當(dāng)于一個(gè)控制中心,負(fù)責(zé)路由消息缨历、保存訂閱和連接以蕴、消息確認(rèn)和控制事務(wù))
  • 消息生產(chǎn)者-異步發(fā)送
    消息生產(chǎn)者使用持久(persistent)傳遞模式發(fā)送消息的時(shí)候,Producer.send() 方法會(huì)被阻塞辛孵,直到 broker 發(fā)送一個(gè)確認(rèn)消息給生產(chǎn)者(ProducerAck)丛肮,這個(gè)確認(rèn)消息暗示broker已經(jīng)成功接收到消息并把消息保存到二級(jí)存儲(chǔ)中。這個(gè)過(guò)程通常稱為同步發(fā)送魄缚。
    如果應(yīng)用程序能夠容忍一些消息的丟失宝与,那么可以使用異步發(fā)送。異步發(fā)送不會(huì)在受到 broker 的確認(rèn)之前一直阻塞 Producer.send 方法。但有一個(gè)例外习劫,當(dāng)發(fā)送方法在一個(gè)事務(wù)上下文中時(shí)咆瘟,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味著所有的持久消息都以被寫到二級(jí)存儲(chǔ)中诽里。
    想要使用異步搞疗,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true
    如果設(shè)置了alwaysSyncSend=true系統(tǒng)將會(huì)忽略u(píng)seAsyncSend設(shè)置的值都采用同步
  1. 當(dāng)alwaysSyncSend=false時(shí),“NON_PERSISTENT”(非持久化)须肆、事務(wù)中的消息將使用“異步發(fā)送”;
  2. 當(dāng)alwaysSyncSend=false時(shí),如果指定了useAsyncSend=true桩皿,“PERSISTENT”類型的消息使用異步發(fā)送豌汇。如果useAsyncSend=false,“PERSISTENT”類型的消息使用同步發(fā)送泄隔。

總結(jié):

  • 默認(rèn)情況(alwaysSyncSend=false,useAsyncSend=false)拒贱,非持久化消息、事務(wù)內(nèi)的消息均采用異步發(fā)送佛嬉;對(duì)于持久化消息采用同步發(fā)送逻澳。
    jms.sendTimeout:發(fā)送超時(shí)時(shí)間,默認(rèn)等于0暖呕,如果jms.sendTimeout>0將會(huì)忽略(alwaysSyncSend斜做、useAsyncSend、消息是否持久化)所有的消息都是用同步發(fā)送湾揽!
  • 即使使用異步發(fā)送瓤逼,也可以通過(guò)producerWindowSize來(lái)控制發(fā)送端無(wú)節(jié)制的向broker發(fā)送消息:
    producerWindowSize:窗口尺寸,用來(lái)約束在異步發(fā)送時(shí)producer端允許積壓的(尚未ACK)的消息的尺寸库物,且只對(duì)異步發(fā)送有意義霸旗。每次發(fā)送消息之后,都將會(huì)導(dǎo)致memoryUsage尺寸增加(+message.size)戚揭,當(dāng)broker返回producerAck時(shí)诱告,如果達(dá)到了producerWindowSize上限,即使是異步調(diào)用也會(huì)被阻塞民晒,防止不停向broker發(fā)送消息精居。
    通過(guò)jms.producerWindowSize=。镀虐。箱蟆。來(lái)設(shè)置

5.3 持久化機(jī)制

ActiveMQ的消息持久化機(jī)制有JDBC,AMQ刮便,KahaDB和LevelDB空猜,無(wú)論使用哪種持久化方式,消息的存儲(chǔ)邏輯都是一致的。在發(fā)送者將消息發(fā)送出去后辈毯,消息中心首先將消息存儲(chǔ)到本地?cái)?shù)據(jù)文件坝疼、內(nèi)存數(shù)據(jù)庫(kù)或者遠(yuǎn)程數(shù)據(jù)庫(kù)等,然后試圖將消息發(fā)送給接收者谆沃,發(fā)送成功則將消息從存儲(chǔ)中刪除钝凶,失敗則繼續(xù)嘗試。消息中心啟動(dòng)以后首先要檢查指定的存儲(chǔ)位置唁影,如果有未發(fā)送成功的消息耕陷,則需要把消息發(fā)送出去。

5.4 消息冪等性

消費(fèi)者保證消息冪等性据沈,不被重復(fù)消費(fèi)
網(wǎng)絡(luò)延遲傳輸中哟沫,會(huì)造成進(jìn)行MQ重試中,在重試過(guò)程中锌介,可能會(huì)造成重復(fù)消費(fèi)
使用全局MessageID判斷消費(fèi)方使用同一個(gè)嗜诀,解決冪等性
activeMQ提供了textMessageId,使用業(yè)務(wù)ID(訂單號(hào))也可以

String jmsMessageId = textMessage.getJMSMessageID();

網(wǎng)絡(luò)延遲環(huán)境下,第二次請(qǐng)求過(guò)來(lái),應(yīng)該使用全局ID判斷該消息是否被使用過(guò)

if(jmsMessageId == redis內(nèi)的id){

    //把消息簽收掉,否則將繼續(xù)重試,有些人覺(jué)得難理解,我解釋一下:

    textMessage.acknowledge();  //避免第三次重試

   // 消息可以重發(fā),但是消息不能做重復(fù)操作,如重復(fù)向數(shù)據(jù)庫(kù)中做插入操作,造成冪等性問(wèn)題,

    //所以當(dāng)?shù)诙伟l(fā)送過(guò)來(lái)的時(shí)候,就可能造成重復(fù)提交問(wèn)題,我們使用手動(dòng)提交(業(yè)務(wù)中一般也是使用手動(dòng)提交多)

    //手動(dòng)提交可以把重復(fù)發(fā)送的消息從隊(duì)列中移除,那么接下來(lái)就不會(huì)觸發(fā)重試了。

}

    //將拿到的消息做業(yè)務(wù)處理,如插入修改操作等...

   // 消費(fèi)成功,把jmsMessageId放入redis

5.5 隊(duì)列消息過(guò)濾收取

不同消息可以放到不同的隊(duì)列孔祸,也可以放到一個(gè)隊(duì)列中隆敢,通過(guò)增加消息的屬性,在發(fā)送message的時(shí)候崔慧,可以設(shè)置它的消息屬性值拂蝎,然后在消費(fèi)端建立消費(fèi)隊(duì)列時(shí),傳入過(guò)濾條件即可尊浪。

發(fā)送端:


MapMessage message1 = session.createMapMessage();

          message1.setString("name","張1");//設(shè)置了name值

          message1.setIntProperty("age",23);//設(shè)置了age屬性

接受端:

MessageConsumer consumer = session.createConsumer(queue,"age > 30");

這樣匣屡,這個(gè)consumer就只消費(fèi)這個(gè)queue中age>30的消息。

5.6 消息保序性

利用Activemq的高級(jí)特性:consumer之獨(dú)有消費(fèi)者(exclusive consumer)

  queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

這樣會(huì)確保consumer列表中拇涤,始終取一個(gè)主消費(fèi)者來(lái)消費(fèi)捣作。但是缺點(diǎn)在于將會(huì)始終發(fā)往這個(gè)消費(fèi)者,導(dǎo)致無(wú)法負(fù)載均衡鹅士。比如券躁,訂單消息,希望一個(gè)訂單內(nèi)的消息是保序的掉盅,但各個(gè)訂單之間可以并發(fā)也拜,這就需要用到下文消息組 messageGroup 的概念。

Message Groups特性是一種負(fù)載均衡的機(jī)制趾痘。在一個(gè)消息被分發(fā)到consumer之前慢哈,broker首先檢查消息JMSXGroupID屬性。如果存在永票,那么broker會(huì)檢查是否有某個(gè)consumer擁有這個(gè)message group卵贱。如果沒(méi)有滥沫,那么broker會(huì)選擇一個(gè)consumer,并將它關(guān)聯(lián)到這個(gè)message group键俱。此后兰绣,這個(gè)consumer會(huì)接收這個(gè)message group的所有消息,直到:

  • Consumer被關(guān)閉
  • Message group被關(guān)閉编振,通過(guò)發(fā)送一個(gè)消息缀辩,并設(shè)置這個(gè)消息的JMSXGroupSeq為-1

例如:

message.setStringProperty("JMSXGroupID", "constact-20100000002");
.....
message.setIntProperty("JMSXGroupSeq", -1);

5.6 死信隊(duì)列

在使用Message Queue的過(guò)程中,總會(huì)由于種種原因而導(dǎo)致消息失敗踪央。一個(gè)經(jīng)典的場(chǎng)景是一個(gè)生成者向Queue中發(fā)消息臀玄,里面包含了一組郵件地址和郵件內(nèi)容。而消費(fèi)者從Queue中將消息一條條讀出來(lái)畅蹂,向指定郵件地址發(fā)送郵件镐牺。消費(fèi)者在發(fā)送消息的過(guò)程中由于種種原因會(huì)導(dǎo)致失敗,比如網(wǎng)絡(luò)超時(shí)魁莉、當(dāng)前郵件服務(wù)器不可用等。這樣我們就希望建立一種機(jī)制募胃,對(duì)于未發(fā)送成功的郵件再重新發(fā)送旗唁,也就是重新處理。重新處理超過(guò)一定次數(shù)還不成功痹束,就放棄對(duì)該消息的處理检疫,記錄下來(lái),繼續(xù)對(duì)剩余消息進(jìn)行處理祷嘶。

ActiveMQ為我們實(shí)現(xiàn)了這一功能屎媳,叫做ReDelivery(重新投遞)。當(dāng)消費(fèi)者在處理消息時(shí)有異常發(fā)生论巍,會(huì)將消息重新放回Queue里烛谊,進(jìn)行下一次處理。當(dāng)超過(guò)重試次數(shù)時(shí)嘉汰,會(huì)給broker發(fā)送一個(gè)"Poison ack"丹禀,這個(gè)消息被認(rèn)為是a poison pill(毒丸),這時(shí)broker會(huì)將這個(gè)消息發(fā)送到DLQ鞋怀。

在以下四種情況中双泪,ActiveMQ消息會(huì)被重發(fā)給客戶端/消費(fèi)者:

  • 在一個(gè)事務(wù)session中,并且調(diào)用了session.rollback()方法密似。
  • 在一個(gè)事務(wù)session中焙矛,session.commit()之前調(diào)用了commit.close()。
  • 在session中使用CLIENT_ACKNOWLEDGE簽收模式残腌,并且調(diào)用了session.recover()方法村斟。
  • 在session中使用AUTO_ACKNOWLEDGE簽收模式贫导,在異步(messageListener)消費(fèi)消息情況下,如果onMessage方法異常且沒(méi)有被catch邓梅,此消息會(huì)被redelivery脱盲。

缺省情況下:持久消息過(guò)期,會(huì)被送到DLQ日缨,非持久消息不會(huì)送到DLQ(不會(huì)redelivery)钱反。
可以在connectionFactory中注入自定義的redeliveryPolicy來(lái)改變?nèi)笔?shù)。
activeMQ默認(rèn)是發(fā)送六次匣距,每次間隔1秒面哥。

6.常見(jiàn)問(wèn)題

6.1 消息堆積導(dǎo)致服務(wù)不可用

  • ActiveMQ分為持久化和非持久化。非持久化消息是存儲(chǔ)在內(nèi)存中的毅待,持久化消息是存儲(chǔ)在文件中的尚卫,它們的最大限制在配置文件的<systemUsage>節(jié)點(diǎn)中配置。
  • 非持久化消息堆積到內(nèi)存告急時(shí)尸红,ActiveMQ會(huì)將內(nèi)存中的非持久化消息寫入臨時(shí)文件中吱涉,以騰出內(nèi)存。(重啟后持久化消息會(huì)從文件中恢復(fù)外里,非持久化的臨時(shí)文件會(huì)直接刪除)
  • 當(dāng)文件增大到達(dá)了配置中的最大限制的時(shí):
  • 持久化情況怎爵,到達(dá)文件限額時(shí),生產(chǎn)者阻塞盅蝗,消費(fèi)者可連接可消費(fèi)鳖链,消費(fèi)后生產(chǎn)者繼續(xù)運(yùn)行:
    設(shè)置2G左右的持久化文件限制,大量生產(chǎn)持久化消息直到文件達(dá)到最大限制墩莫,此時(shí)生產(chǎn)者阻塞芙委,但消費(fèi)者可正常連接并消費(fèi)消息,等消息消費(fèi)掉一部分狂秦,文件刪除又騰出空間之后灌侣,生產(chǎn)者又可繼續(xù)發(fā)送消息,服務(wù)自動(dòng)恢復(fù)正常裂问。
  • 非持久化情況顶瞳,到達(dá)文件限額時(shí),生產(chǎn)者阻塞愕秫,消費(fèi)者可連接但不能消費(fèi)慨菱,系統(tǒng)崩潰:
    設(shè)置2G左右的臨時(shí)文件限制,大量生產(chǎn)非持久化消息并寫入臨時(shí)文件戴甩,在達(dá)到最大限制時(shí)符喝,生產(chǎn)者阻塞,消費(fèi)者可正常連接但不能消費(fèi)消息甜孤,或者原本慢速消費(fèi)的消費(fèi)者协饲,消費(fèi)突然停止畏腕。整個(gè)系統(tǒng)可連接,但是無(wú)法提供服務(wù)茉稠,就這樣掛了描馅。
    解決方案:盡量不要用非持久化消息,非要用的話而线,將臨時(shí)文件限制盡可能的調(diào)大

6.2 消息丟失

6.3 消息持久化慢

默認(rèn)情況非持久化的消息是異步發(fā)送的铭污,持久化的消息是同步發(fā)送的,遇到慢一點(diǎn)的硬盤膀篮,發(fā)送消息的速度很慢嘹狞。

但是在開啟事務(wù)的情況下,消息都是異步發(fā)送的誓竿,效率會(huì)有2個(gè)數(shù)量級(jí)的提升磅网。所以在發(fā)送持久化消息時(shí),請(qǐng)務(wù)必開啟事務(wù)模式筷屡。發(fā)送非持久化消息時(shí)也建議開啟事務(wù)涧偷,因?yàn)楦静粫?huì)影響性能;

解決方案:消息發(fā)送啟用事務(wù)毙死;

6.4 消息不均勻消費(fèi)(存在無(wú)法處理消息嫂丙,prefech機(jī)制)

ActiveMQ的一個(gè)主要的設(shè)計(jì)目標(biāo)是:提供一個(gè)高性能的消息中間件。它使用了SEDA(Staged Event Driven Architecture)架構(gòu)及異步傳輸规哲。為了提供更高的性能,很重要的一點(diǎn)是 盡快地將消息傳送給消費(fèi)者诽表,這樣消費(fèi)者利用消息緩沖區(qū)等待處理唉锌,而不是等待消息。
   然后竿奏,這樣也有很大風(fēng)險(xiǎn):不斷地向 消費(fèi)者 傳送消息可能使得其消息緩沖溢出袄简,因?yàn)閭魉偷乃俣缺认M(fèi)者真正“消費(fèi)”消息更快,這是很可能泛啸。
  因此绿语,ActiveMQ使用了 消息”預(yù)取限制“(prefetch limit):表示在某個(gè)時(shí)間段內(nèi),可能向消費(fèi)者傳輸?shù)淖畲笙⒘亢蛑罚绻_(dá)到該上限吕粹,那么停止發(fā)送,直到ActiveMQ收到消費(fèi)者的acknowledgements(確認(rèn)岗仑,表示已經(jīng)處理了該消息)匹耕。prefetch limit可以針對(duì)每個(gè)不同的consumer來(lái)設(shè)置。
  為了獲取更高的性能荠雕,prefetch limit當(dāng)然是越大越好稳其,只要consumer有足夠大的消息緩沖區(qū)(messagevolume)驶赏。如果消息的總量非常少,而且每個(gè)消息的處理時(shí)間非常的長(zhǎng)既鞠,那么煤傍,可以將prefetch設(shè)置為1,這樣嘱蛋,每次向consumer發(fā)送一個(gè)消息蚯姆,等其確認(rèn)已經(jīng)處理完畢后,再發(fā)送第二個(gè)浑槽。
  特別地蒋失,如果prefetch設(shè)置為0,表示consumer每次 主動(dòng)向activeMQ要求傳輸最大的數(shù)據(jù)量桐玻,而不是被動(dòng)地接收消息

原因在于ActiveMQ的prefetch機(jī)制:每個(gè)消費(fèi)者獲取消息時(shí)篙挽,是批量獲取的,相當(dāng)于預(yù)定镊靴。因?yàn)閍ctiveMQ提倡是盡量消費(fèi)理念铣卡,所以會(huì)采取消費(fèi)者批量獲取信息的方式來(lái)提高消費(fèi)效率。prefech的值默認(rèn)是1000偏竟,即表示一個(gè)消費(fèi)者在取消息時(shí)煮落,會(huì)一次取1000條到自己的消息緩沖區(qū)中(不溢出),然后再一條一條的處理踊谋,每處理完一條蝉仇,mq就會(huì)刪除這一條,沒(méi)處理的在mq中依然可見(jiàn)殖蚕。

在發(fā)送一些消息之后轿衔,開啟2個(gè)消費(fèi)者去處理消息。會(huì)發(fā)現(xiàn)一個(gè)消費(fèi)者處理了所有的消息睦疫,另一個(gè)消費(fèi)者根本沒(méi)收到消息害驹。原因在于ActiveMQ的prefetch機(jī)制。當(dāng)消費(fèi)者去獲取消息時(shí)蛤育,不會(huì)一條一條去獲取宛官,而是一次性獲取一批,默認(rèn)是1000條瓦糕。這些預(yù)獲取的消息底洗,在還沒(méi)確認(rèn)消費(fèi)之前,在管理控制臺(tái)還是可以看見(jiàn)這些消息的咕娄,但是不會(huì)再分配給其他消費(fèi)者枷恕,此時(shí)這些消息的狀態(tài)應(yīng)該算作“已分配未消費(fèi)”,如果消息最后被消費(fèi)谭胚,則會(huì)在服務(wù)器端被刪除徐块,如果消費(fèi)者崩潰未玻,則這些消息會(huì)被重新分配給新的消費(fèi)者。但是如果消費(fèi)者既不消費(fèi)確認(rèn)胡控,又不崩潰扳剿,那這些消息就永遠(yuǎn)躺在消費(fèi)者的緩存區(qū)里無(wú)法處理。更通常的情況是昼激,消費(fèi)這些消息非常耗時(shí)庇绽,你開了10個(gè)消費(fèi)者去處理,結(jié)果發(fā)現(xiàn)只有一臺(tái)機(jī)器吭哧吭哧處理橙困,另外9臺(tái)啥事不干瞧掺。

解決方案:將prefetch設(shè)為1,每次處理1條消息凡傅,處理完再去取辟狈,這樣也慢不了多少.

另外,當(dāng)連接來(lái)自一個(gè)連接池中夏跷,消費(fèi)消息可能出現(xiàn)一些由于“prefetch”而產(chǎn)生的問(wèn)題:預(yù)取的消息(還未被處理)當(dāng)連接關(guān)閉時(shí)會(huì)被釋放(即哼转,可以在activeMQ中再次讀取到該消息)。而連接池中的連接只有在連接池關(guān)閉后才真正的銷毀槽华。這使得 預(yù)取的消息直到連接被重用時(shí)才會(huì)被處理(或者等連接池關(guān)閉壹蔓,可再次從activeMQ中讀取)猫态。這樣導(dǎo)致了消息可能丟失佣蓉,或者當(dāng)連接池中有多個(gè)連接時(shí),消息亂序(out-of-sequence)亲雪!

  • 消費(fèi)者則不放入連接池勇凭!當(dāng)然,這樣在消費(fèi)者的性能上匆光,會(huì)受到影響(當(dāng)有多個(gè)線程快速的消費(fèi)消息時(shí),對(duì)象被不斷的創(chuàng)建銷毀)酿联。
  • 將消費(fèi)者的連接池中數(shù)量設(shè)為1终息。
  • 在使用連接池的情況下,將prefetch設(shè)為1或者0贞让。當(dāng)使用Spring JMS和MessageDrivenPojo時(shí)周崭,只能將prefetch設(shè)為1,而不能為0喳张;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末续镇,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子销部,更是在濱河造成了極大的恐慌摸航,老刑警劉巖制跟,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異酱虎,居然都是意外死亡雨膨,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門读串,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)聊记,“玉大人,你說(shuō)我怎么就攤上這事恢暖∨偶啵” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵杰捂,是天一觀的道長(zhǎng)舆床。 經(jīng)常有香客問(wèn)我,道長(zhǎng)琼娘,這世上最難降的妖魔是什么峭弟? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮脱拼,結(jié)果婚禮上瞒瘸,老公的妹妹穿的比我還像新娘。我一直安慰自己熄浓,他們只是感情好情臭,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著赌蔑,像睡著了一般俯在。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上娃惯,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天跷乐,我揣著相機(jī)與錄音,去河邊找鬼趾浅。 笑死愕提,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的皿哨。 我是一名探鬼主播浅侨,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼溃列,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼千诬!你這毒婦竟也來(lái)了诱渤?” 一聲冷哼從身側(cè)響起送悔,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤硼婿,失蹤者是張志新(化名)和其女友劉穎寺滚,沒(méi)想到半個(gè)月后蚀乔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體持寄,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年脖祈,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了肆捕。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡盖高,死狀恐怖慎陵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情喻奥,我是刑警寧澤席纽,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布,位于F島的核電站撞蚕,受9級(jí)特大地震影響润梯,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜甥厦,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一纺铭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧刀疙,春花似錦舶赔、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至疚鲤,卻和暖如春锥累,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背集歇。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工桶略, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人诲宇。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓际歼,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親焕窝。 傳聞我的和親對(duì)象是個(gè)殘疾皇子蹬挺,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 個(gè)人專題目錄[http://www.reibang.com/p/140e2a59db2c] 一维贺、JMS簡(jiǎn)介 全稱...
    Java及SpringBoot閱讀 2,084評(píng)論 0 10
  • 什么是activeMQ activeMQ是一種開源的它掂,實(shí)現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用...
    趙鐵柱啊閱讀 1,896評(píng)論 1 6
  • 此刻正獨(dú)自坐在一家簡(jiǎn)易的快餐店虐秋。 由于還沒(méi)到飯點(diǎn)榕茧,來(lái)往用餐的人并不多。 我只點(diǎn)了一杯飲料客给,坐等時(shí)間的流逝用押。 天還是...
    枝角閱讀 613評(píng)論 4 5
  • 感情很奇妙,我竟然看著你的笑臉靶剑,說(shuō)傻姑娘蜻拨。
    飛利浦凈水鄭凱閱讀 190評(píng)論 0 0
  • 《奇雙會(huì)》又名《販馬記》:包括《哭監(jiān)》、《寫狀》桩引、《三拉》三折戲缎讼。 主要?jiǎng)∏槭牵盒氯侮兾靼强h令趙...
    象浦閱讀 2,036評(píng)論 3 21