ActiveMQ基礎(chǔ)教程(二)簡(jiǎn)單介紹與基礎(chǔ)使用

概述

????ActiveMQ是由Apache出品的犹撒,一款最流行的鞭达,能力強(qiáng)勁的開源消息總線凭涂。ActiveMQ是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),它非呈唇快速早芭,支持多種語(yǔ)言的客戶端和協(xié)議典蝌,而且可以非常容易的嵌入到企業(yè)的應(yīng)用環(huán)境中本橙,并有許多高級(jí)功能埋凯。

特性

  • 遵循JMS規(guī)范:ActiveMQ的各種特性是JMS1.1規(guī)范的實(shí)現(xiàn)握础。它們包括同步和異步消息傳遞辐董,一次和只有一次的消息傳遞,對(duì)于預(yù)訂者的持久消息等等禀综。依附于JMS規(guī)范意味著简烘,不論JMS消息提供者是誰(shuí),同樣的基本特性都是有效的定枷。(JMS可查看前篇博文(ActiveMQ基礎(chǔ)教程(一)JMS概述http://www.reibang.com/p/639627f88a6e)孤澎。
  • 連接:ActiveMQ提供各種連接選擇,包括HTTP依鸥,HTTPS亥至,IP多點(diǎn)傳送,SSL贱迟,STOMP姐扮,TCP,UDP衣吠,XMPP等茶敏。大量的連接協(xié)議支持使之具有更好的靈活性。
  • 支持多種語(yǔ)言客戶端:ActiveMQ對(duì)多種語(yǔ)言提供客戶端API缚俏,除了Java之外還有C/C++惊搏、.NET、Perl忧换、PHP恬惯、Ruby、Python等亚茬。這使得ActiveMQ能用在Java之外的其它語(yǔ)言中酪耳。很多其它語(yǔ)言都可以通過(guò)ActiveMQ提供的客戶端API使用ActiveMQ的全部特性。當(dāng)然刹缝,ActiveMQ代理器(broker)仍然是運(yùn)行在java虛擬機(jī)上碗暗,但是客戶端能夠使用其它的被支持的語(yǔ)言。
  • 可插拔的持久性和安全:ActiveMQ提供多種持久性方案可供選擇梢夯,也可以完全按自己需求定制驗(yàn)證和授權(quán)言疗。例如,ActiveMQ通過(guò)KahaDB提供自己的超快速消息持久方案(ultra-fast message persistence)颂砸,但也支持標(biāo)準(zhǔn)的JDBC方案噪奄。ActiveMQ可以通過(guò)配置文件提供簡(jiǎn)單的驗(yàn)證和授權(quán)死姚,也提供標(biāo)準(zhǔn)的JAAS登陸模塊。
  • 簡(jiǎn)單的管理:ActiveMQ是為開發(fā)者設(shè)計(jì)的勤篮。它并不需要專門的管理工具知允,因?yàn)樗峁└鞣N易用且強(qiáng)大的管理特性。有很多方法去監(jiān)控ActiveMQ的各個(gè)方面叙谨,可以通過(guò)JMX使用JConsole或ActiveMQ web console;可以運(yùn)行ActiveMQ消息報(bào)告保屯;可以用命令行腳本手负;可以通過(guò)日志。
  • 支持集群:為了利于擴(kuò)展姑尺,多個(gè)ActiveMQ broker能夠聯(lián)合工作竟终。這個(gè)方式就是network of brokers并且能支持多種拓?fù)浣Y(jié)構(gòu)。

安裝與管理后臺(tái)

安裝

????ActiveMQ官網(wǎng)下載地址:http://activemq.apache.org/download.html
????ActiveMQ 提供了Windows 和Linux切蟋、Unix 等幾個(gè)版本统捶。具體安裝方法請(qǐng)自行查找資料進(jìn)行安裝,博主這邊就不多敘述柄粹。

管理后臺(tái)

????安裝成功啟動(dòng)ActiveMQ服務(wù)后喘鸟,在瀏覽器輸入http://localhost:8161,用戶名密碼默認(rèn)都是 admin。下面為登陸成功后的頁(yè)面:

登陸后頁(yè)面.png

Queues頁(yè)面

Queues頁(yè)面.png

????Queues是隊(duì)列方式消息驻右,從菜單欄中點(diǎn)擊Queues可以進(jìn)入到Queues頁(yè)面什黑,頁(yè)面主要內(nèi)容包括:

  • Name:消息隊(duì)列的名稱。
  • Number Of Pending Messages:未被消費(fèi)的消息數(shù)目堪夭。
  • Number Of Consumers:消費(fèi)者的數(shù)量愕把。
  • Messages Enqueued:進(jìn)入隊(duì)列的消息 ;進(jìn)入隊(duì)列的總消息數(shù)目森爽,包括已經(jīng)被消費(fèi)的和未被消費(fèi)的恨豁。 這個(gè)數(shù)量只增不減。
  • Messages Dequeued:出了隊(duì)列的消息爬迟,可以理解為是被消費(fèi)掉的消息數(shù)量橘蜜。在Queues里它和進(jìn)入隊(duì)列的總數(shù)量相等(因?yàn)橐粋€(gè)消息只會(huì)被成功消費(fèi)一次),如果暫時(shí)不等是因?yàn)橄M(fèi)者還沒(méi)來(lái)得及消費(fèi)。

Topics頁(yè)面

Topics頁(yè)面.png

????Topics是主題方式消息雕旨,從菜單欄中點(diǎn)擊Topics可以進(jìn)入到Topics頁(yè)面扮匠,頁(yè)面主要內(nèi)容包括:

  • Name:主題名稱。
  • Number Of Pending Messages:未被消費(fèi)的消息數(shù)目凡涩。
  • Number Of Consumers:消費(fèi)者的數(shù)量棒搜。
  • Messages Enqueued:進(jìn)入隊(duì)列的消息 ;進(jìn)入隊(duì)列的總消息數(shù)目活箕,包括已經(jīng)被消費(fèi)的和未被消費(fèi)的力麸。 這個(gè)數(shù)量只增不減。
  • Messages Dequeued:出了隊(duì)列的消息,可以理解為是被消費(fèi)掉的消息數(shù)量克蚂。在Topics里闺鲸,因?yàn)槎嘞M(fèi)者從而導(dǎo)致數(shù)量會(huì)比入隊(duì)列數(shù)高。

Subscribers頁(yè)面

????Subscribers 是查看訂閱者的頁(yè)面埃叭,可以查看訂閱者的信息等摸恍。只在Topics消息類型中這個(gè)頁(yè)面才會(huì)有數(shù)據(jù)。


Subscribers頁(yè)面.png

Connections頁(yè)面

????Connections頁(yè)面可以查看到所有的連接數(shù)赤屋。


Connections頁(yè)面.png

使用

Queue消息模式

????點(diǎn)對(duì)點(diǎn)的模式主要建立在一個(gè)隊(duì)列上面立镶,當(dāng)連接一個(gè)列隊(duì)的時(shí)候,發(fā)送端不需要知道接收端是否正在接收类早,可以直接向ActiveMQ發(fā)送消息媚媒,發(fā)送的消息將會(huì)先進(jìn)入隊(duì)列中,如果有接收端在監(jiān)聽涩僻,則會(huì)發(fā)向接收端缭召,如果沒(méi)有接收端接收,則會(huì)保存在activemq服務(wù)器逆日,直到接收端接收消息嵌巷,點(diǎn)對(duì)點(diǎn)的消息模式可以有多個(gè)發(fā)送端,多個(gè)接收端室抽,但是一條消息晴竞,只會(huì)被一個(gè)接收端給接收到,哪個(gè)接收端先連上ActiveMQ狠半,則會(huì)先接收到噩死,而后來(lái)的接收端則接收不到那條消息。

生產(chǎn)者

public class JmsProducer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "queue.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息生產(chǎn)者
         */
        MessageProducer messageProducer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地神年,其實(shí)就是連接到哪個(gè)隊(duì)列已维,如果是點(diǎn)對(duì)點(diǎn),那么它的實(shí)現(xiàn)是Queue已日,如果是訂閱模式垛耳,那它的實(shí)現(xiàn)是Topic。這里我們創(chuàng)建一個(gè)名為queue.test的消息隊(duì)列飘千。
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消息生產(chǎn)者
             */
            messageProducer = session.createProducer(destination);
            /**
             * 第六步:發(fā)送消息堂鲜,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
             */
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param messageProducer
     * @throws JMSException
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i = 0; i < 10; i++) {
            /**
             * 創(chuàng)建一條文本消息
             */
            TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" + i);
            System.out.println("發(fā)送消息:Activemq 發(fā)送消息" + i);
            /**
             * 通過(guò)消息生產(chǎn)者發(fā)出消息
             */
            messageProducer.send(message);
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png

ActiveMQ控制臺(tái)截圖.png

????我們可以看的护奈,當(dāng)運(yùn)行JmsProducer程序時(shí)缔莲,在ActiveMQ控制臺(tái),可以看到生產(chǎn)者往queue.test的隊(duì)列中發(fā)送了10條消息霉旗,因?yàn)檫@時(shí)還沒(méi)有消費(fèi)者痴奏,所以這邊的Number Of Pending Messages顯示的是10蛀骇,
Number Of Consumers顯示的是0,Messages Enqueued顯示的也是10读拆。

消費(fèi)者

public class JmsConsumer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "queue.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地擅憔,其實(shí)就是連接到哪個(gè)隊(duì)列,如果是點(diǎn)對(duì)點(diǎn)檐晕,那么它的實(shí)現(xiàn)是Queue暑诸,如果是訂閱模式,那它的實(shí)現(xiàn)是Topic辟灰。這里我們創(chuàng)建一個(gè)名為queue.test的消息隊(duì)列屠列。
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            while (true) {
                /**
                 * 接收數(shù)據(jù)的時(shí)間(等待) 100 ms
                 */
                TextMessage textMessage = (TextMessage) messageConsumer.receive(1000 * 100);
                if (textMessage != null) {
                    System.out.println("收到的消息:" + textMessage.getText());
                } else {
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png

ActiveMQ控制臺(tái)截圖.png

????我們可以看到,但運(yùn)行JmsConsumer程序時(shí)伞矩,在運(yùn)行程序的控制臺(tái)中我們可以看到消費(fèi)者消費(fèi)了剛剛生產(chǎn)者生產(chǎn)的消息。在ActiveMQ控制臺(tái)夏志,可以看到所以這邊的Number Of Pending Messages顯示的是0乃坤,Number Of Consumers顯示的是1,Messages Enqueued顯示的是10沟蔑,Messages Dequeued顯示的也是10湿诊,即消息被消費(fèi)。
???? 在前面的消費(fèi)者例子中瘦材,我們這邊使用while (true) 死循環(huán)來(lái)不停接受消息厅须。這樣很浪費(fèi)cpu資源,實(shí)際生產(chǎn)中不會(huì)這么做食棕。下面朗和,我們采用注冊(cè)一個(gè)監(jiān)聽器的方法,當(dāng)監(jiān)聽到有消息入隊(duì)列后簿晓,才去接收消息眶拉。

public class JmsConsumerMessageListener {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "queue.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地,其實(shí)就是連接到哪個(gè)隊(duì)列憔儿,如果是點(diǎn)對(duì)點(diǎn)忆植,那么它的實(shí)現(xiàn)是Queue,如果是訂閱模式谒臼,那它的實(shí)現(xiàn)是Topic朝刊。這里我們創(chuàng)建一個(gè)名為queue.test的消息隊(duì)列。
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            /**
             * 第六步:創(chuàng)建監(jiān)聽器
             */
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("收到的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
//            if (null != session) {
//                try {
//                    session.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (null != connection) {
//                try {
//                    connection.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png
ActiveMQ控制臺(tái)截圖.png

????當(dāng)生產(chǎn)者一生產(chǎn)消息到隊(duì)列中時(shí)蜈缤,我們的消費(fèi)者就馬上進(jìn)行消費(fèi)拾氓,注意程序中我們沒(méi)有將會(huì)話和連接關(guān)閉,因?yàn)楸O(jiān)聽器是異步的底哥,如果關(guān)閉后就無(wú)法接收到消息田盈。

Topic消息模式

????訂閱/發(fā)布模式掀虎,同樣可以有著多個(gè)發(fā)送端與多個(gè)接收端村斟,但是接收端與發(fā)送端存在時(shí)間上的依賴,就是如果發(fā)送端發(fā)送消息的時(shí)候易阳,接收端并沒(méi)有監(jiān)聽消息,那么ActiveMQ將不會(huì)保存消息吃粒,將會(huì)認(rèn)為消息已經(jīng)發(fā)送潦俺,換一種說(shuō)法,就是發(fā)送端發(fā)送消息的時(shí)候徐勃,接收端不在線事示,是接收不到消息的,哪怕以后監(jiān)聽消息僻肖,同樣也是接收不到的肖爵。這個(gè)模式還有一個(gè)特點(diǎn),那就是發(fā)送端發(fā)送的消息臀脏,將會(huì)被所有的接收端給接收到劝堪,不類似點(diǎn)對(duì)點(diǎn),一條消息只會(huì)被一個(gè)接收端給接收到揉稚。

發(fā)布者

public class JmsProducer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String TOPIC_NAME = "topic.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地秒啦,其實(shí)就是連接到哪個(gè)隊(duì)列,如果是點(diǎn)對(duì)點(diǎn)搀玖,那么它的實(shí)現(xiàn)是Queue余境,如果是訂閱模式,那它的實(shí)現(xiàn)是Topic
         */
        Destination destination = null;
        /**
         * 消息生產(chǎn)者
         */
        MessageProducer messageProducer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地灌诅,這里我們創(chuàng)建一個(gè)名為topic.test的主題
             */
            destination = session.createTopic(TOPIC_NAME);
            /**
             * 第五步:創(chuàng)建消息生產(chǎn)者
             */
            messageProducer = session.createProducer(destination);
            /**
             * 第六步:發(fā)送消息芳来,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
             */
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param messageProducer
     * @throws JMSException
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        for (int i = 0; i < 10; i++) {
            /**
             * 創(chuàng)建一條文本消息
             */
            TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" + i);
            System.out.println("發(fā)送消息:Activemq 發(fā)送消息" + i);
            /**
             * 通過(guò)消息生產(chǎn)者發(fā)出消息
             */
            messageProducer.send(message);
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png
ActiveMQ控制臺(tái)截圖.png

訂閱者

public class JmsConsumer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String TOPIC_NAME = "topic.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地猜拾,其實(shí)就是連接到哪個(gè)隊(duì)列绣张,如果是點(diǎn)對(duì)點(diǎn),那么它的實(shí)現(xiàn)是Queue关带,如果是訂閱模式侥涵,那它的實(shí)現(xiàn)是Topic
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地,這里我們創(chuàng)建一個(gè)名為topic.test的主題
             */
            destination = session.createTopic(TOPIC_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("收到的消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
//            if (null != session) {
//                try {
//                    session.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (null != connection) {
//                try {
//                    connection.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png
ActiveMQ控制臺(tái)截圖.png

????我們可以發(fā)現(xiàn)宋雏,Topic消息模式的代碼跟Queue消息模式的代碼基本是一樣的芜飘,除了在創(chuàng)建消息目的地的時(shí)候,一個(gè)是queue一個(gè)是topic磨总;還有一點(diǎn)區(qū)別就是嗦明,Topic消息模式,訂閱者需要先訂閱蚪燕,才能接收到發(fā)布者發(fā)布的消息娶牌。

談?wù)凷ession

????在通過(guò)Connection創(chuàng)建Session的時(shí)候奔浅,需要設(shè)置2個(gè)參數(shù),一個(gè)是否支持事務(wù)诗良,另一個(gè)是簽收的模式汹桦。
????簽收就是消費(fèi)者接受到消息后,需要告訴消息服務(wù)器鉴裹,我收到消息了舞骆。當(dāng)消息服務(wù)器收到回執(zhí)后,本條消息將失效径荔。因此簽收將對(duì)PTP模式產(chǎn)生很大影響督禽。如果消費(fèi)者收到消息后,并不簽收总处,那么本條消息繼續(xù)有效狈惫,很可能會(huì)被其他消費(fèi)者消費(fèi)掉!
????簽收方式有三種:

  • AUTO_ACKNOWLEDGE:表示在消費(fèi)者receive消息的時(shí)候自動(dòng)的簽收鹦马‰侍福客戶端發(fā)送和接收消息不需要做額外的工作。哪怕是接收端發(fā)生異常菠红,也會(huì)被當(dāng)作正常發(fā)送成功。
  • CLIENT_ACKNOWLEDGE:表示消費(fèi)者receive消息后必須手動(dòng)的調(diào)用acknowledge()方法進(jìn)行簽收难菌。
  • DUPS_OK_ACKNOWLEDGE:允許副本的確認(rèn)模式试溯。一旦接收方應(yīng)用程序的方法調(diào)用從處理消息處返回,會(huì)話對(duì)象就會(huì)確認(rèn)消息的接收郊酒;而且允許重復(fù)確認(rèn)遇绞。

發(fā)送消息的數(shù)據(jù)類型

????我們上面演示的全都是字符串的消息類型,但ActiveMQ支持的還有ObjectMessage燎窘,StreamMessage摹闽,MapMessage,BytesMessage等消息類型褐健。下面我們來(lái)看看其他消息類型是如何編寫的付鹿,以下都是以隊(duì)列的消息模式進(jìn)行。

ObjectMessage

傳輸對(duì)象

public class User implements Serializable {
    private static final long serialVersionUID = 2504467948968634865L;
    private String userName;
    private String password;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "userName='" + userName + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

生產(chǎn)者

public class JmsProducer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "object.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 設(shè)置所有對(duì)所有序列化包都信任
         */
        connectionFactory.setTrustAllPackages(true);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息生產(chǎn)者
         */
        MessageProducer messageProducer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地蚜迅,這里我們創(chuàng)建一個(gè)名為object.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消息生產(chǎn)者
             */
            messageProducer = session.createProducer(destination);
            /**
             * 第六步:發(fā)送消息舵匾,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
             */
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param messageProducer
     * @throws JMSException
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        /**
         * 創(chuàng)建一條Object消息
         */
        ActiveMQObjectMessage objectMessage = (ActiveMQObjectMessage) session.createObjectMessage();
        for (int i = 0; i < 10; i++) {
            User user = new User();
            user.setUserName("hyn" + i);
            user.setPassword("qwe" + i);
            System.out.println("發(fā)送消息:Activemq 發(fā)送消息" + user.toString());
            /**
             * 對(duì)象需要序列化
             */
            objectMessage.setObject(user);
            /**
             * 通過(guò)消息生產(chǎn)者發(fā)出消息
             */
            messageProducer.send(objectMessage);
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png

ActiveMq控制臺(tái)截圖.png

消費(fèi)者

public class JmsConsumerMessageListener {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "object.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 設(shè)置所有對(duì)所有序列化包都信任
         */
        connectionFactory.setTrustAllPackages(true);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地谁不,這里我們創(chuàng)建一個(gè)名為object.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            /**
             * 第六步:創(chuàng)建監(jiān)聽器
             */
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        User user = (User) ((ActiveMQObjectMessage) message).getObject();
                        System.out.println("收到的消息:" + user.toString());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
//            if (null != session) {
//                try {
//                    session.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (null != connection) {
//                try {
//                    connection.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png

ActiveMq控制臺(tái)截圖.png

????從代碼中我們可以看的坐梯,ObjectMessage跟TextMessage代碼差不多,只不過(guò)有兩個(gè)地方需要注意:

  • 所傳輸?shù)膶?duì)象必須是序列化的刹帕,也就是要實(shí)現(xiàn)Serializable接口吵血;
  • 在創(chuàng)建連接工廠時(shí)谎替,需要添加對(duì)所有或需要傳輸?shù)男蛄谢瘜?duì)象所在的包為白名單,這個(gè)是從ActiveMQ 5.12.2 開始為了增強(qiáng)這個(gè)框架的安全性蹋辅,ActiveMQ將強(qiáng)制用戶配置可序列化的包名钱贯;

BytesMessage

????首先我們項(xiàng)目的資源目錄下新建兩個(gè)文件,producer.txt 和 consumer.txt晕翠,在producer.txt輸入如下內(nèi)容喷舀,consumer.txt為空。


項(xiàng)目.png

producer.txt.png

consumer.txt.png

生產(chǎn)者

public class JmsProducer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "bytes.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息生產(chǎn)者
         */
        MessageProducer messageProducer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地淋肾,這里我們創(chuàng)建一個(gè)名為bytes.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消息生產(chǎn)者
             */
            messageProducer = session.createProducer(destination);
            /**
             * 第六步:發(fā)送消息硫麻,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
             */
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param messageProducer
     * @throws JMSException
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        /**
         * 創(chuàng)建一條Byte消息
         */
        BytesMessage bytesMessage = session.createBytesMessage();
        bytesMessage.writeBytes(getFileByte(System.getProperty("user.dir")+"/src/main/resources/producer.txt"));
        messageProducer.send(bytesMessage);
    }

    /**
     * 讀取文件
     *
     * @param fileUrl
     * @return
     */
    public static byte[] getFileByte(String fileUrl) {
        byte[] buffer = null;
        FileInputStream fileInputStream = null;
        try {
            fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
            buffer = new byte[fileInputStream.available()];
            fileInputStream.read(buffer);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (fileInputStream != null) {
                try {
                    fileInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return buffer;
    }
}

運(yùn)行結(jié)果圖:


ActiveMQ控制臺(tái)截圖.png

消費(fèi)者

public class JmsConsumerMessageListener {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "bytes.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地樊卓,這里我們創(chuàng)建一個(gè)名為bytes.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            /**
             * 第六步:創(chuàng)建監(jiān)聽器
             */
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    FileOutputStream fileOutputStream = null;
                    try {
                        BytesMessage bytesMessage = (BytesMessage) message;
                        fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
                        byte[] content = new byte[1024];
                        int len;
                        while ((len = bytesMessage.readBytes(content)) != -1) {
                            fileOutputStream.write(content, 0, len);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    } catch (FileNotFoundException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (fileOutputStream != null) {
                            try {
                                fileOutputStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
//            if (null != session) {
//                try {
//                    session.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (null != connection) {
//                try {
//                    connection.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
        }
    }
}

運(yùn)行結(jié)果圖:


ActiveMQ控制圖截圖.png

consumer.txt結(jié)果圖.png

????從結(jié)果可以看出拿愧,consumer.txt的內(nèi)容結(jié)果跟product.txt內(nèi)容是一致的,即消息接收成功碌尔。當(dāng)然浇辜,發(fā)送文件的話我們也可以使用StreamMessage,下面我們來(lái)看看StreamMessage的使用唾戚。

StreamMessage

????同樣需要在項(xiàng)目中新建producer.txt 和 consumer.txt兩個(gè)文件柳洋;

生產(chǎn)者

public class JmsProducer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "stream.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息生產(chǎn)者
         */
        MessageProducer messageProducer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地,這里我們創(chuàng)建一個(gè)名為stream.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消息生產(chǎn)者
             */
            messageProducer = session.createProducer(destination);
            /**
             * 第六步:發(fā)送消息叹坦,這個(gè)步驟包括創(chuàng)建消息熊镣,然后發(fā)送消息
             */
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param messageProducer
     * @throws JMSException
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        /**
         * 創(chuàng)建一條streamMessage消息
         */
        StreamMessage streamMessage = session.createStreamMessage();
        streamMessage.writeBytes(getFileByte(System.getProperty("user.dir") + "/src/main/resources/producer.txt"));
        messageProducer.send(streamMessage);
    }

    /**
     * 讀取文件
     *
     * @param fileUrl
     * @return
     */
    public static byte[] getFileByte(String fileUrl) {
        byte[] buffer = null;
        FileInputStream fileInputStream = null;
        try {
            fileInputStream = new FileInputStream(new File(ResourceUtils.getURL(fileUrl).getPath()));
            buffer = new byte[fileInputStream.available()];
            fileInputStream.read(buffer);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (fileInputStream != null) {
                try {
                    fileInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return buffer;
    }
}

運(yùn)行結(jié)果圖:


ActiveMQ控制臺(tái)截圖.png

消費(fèi)者

public class JmsConsumerMessageListener {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "stream.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地,這里我們創(chuàng)建一個(gè)名為stream.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            /**
             * 第六步:創(chuàng)建監(jiān)聽器
             */
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    FileOutputStream fileOutputStream = null;
                    try {
                        StreamMessage streamMessage = (StreamMessage) message;
                        fileOutputStream = new FileOutputStream(new File((System.getProperty("user.dir") + "/src/main/resources/consumer.txt")));
                        byte[] content = new byte[1024];
                        int len;
                        while ((len = streamMessage.readBytes(content)) != -1) {
                            fileOutputStream.write(content, 0, len);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    } catch (FileNotFoundException e) {
                        e.printStackTrace();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        if (fileOutputStream != null) {
                            try {
                                fileOutputStream.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
//            if (null != session) {
//                try {
//                    session.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (null != connection) {
//                try {
//                    connection.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
        }
    }
}

運(yùn)行結(jié)果圖:


ActiveMQ控制臺(tái)結(jié)果圖.png

consumer.txt結(jié)果.png

MapMessage

生產(chǎn)者

public class JmsProducer {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "map.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息生產(chǎn)者
         */
        MessageProducer messageProducer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地募书,這里我們創(chuàng)建一個(gè)名為map.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消息生產(chǎn)者
             */
            messageProducer = session.createProducer(destination);
            /**
             * 第六步:發(fā)送消息绪囱,這個(gè)步驟包括創(chuàng)建消息,然后發(fā)送消息
             */
            sendMessage(session, messageProducer);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (null != session) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 發(fā)送消息
     *
     * @param session
     * @param messageProducer
     * @throws JMSException
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws JMSException {
        /**
         * 創(chuàng)建一條mapMessage消息
         */
        MapMessage mapMessage = session.createMapMessage();
        mapMessage.setString("name","hyn");
        mapMessage.setInt("age",27);
        messageProducer.send(mapMessage);
    }
}

運(yùn)行結(jié)果圖:


ActiveMQ控制臺(tái)截圖.png

消費(fèi)者

public class JmsConsumerMessageListener {
    /**
     * 默認(rèn)連接用戶名
     */
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默認(rèn)用戶密碼
     */
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默認(rèn)連接地址
     */
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static final String QUEUE_NAME = "map.test";

    public static void main(String[] args) {
        /**
         * 第一步:創(chuàng)建連接工廠
         */
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
        /**
         * 連接
         */
        Connection connection = null;
        /**
         * 會(huì)話
         */
        Session session = null;
        /**
         * 消息目的地
         */
        Destination destination = null;
        /**
         * 消息消費(fèi)者
         */
        MessageConsumer messageConsumer = null;
        try {
            /**
             * 第二步:創(chuàng)建連接
             */
            connection = connectionFactory.createConnection();
            /**
             * 啟動(dòng)連接
             */
            connection.start();
            /**
             * 第三步:創(chuàng)建會(huì)話
             */
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             * 第四步:創(chuàng)建消息目的地莹捡,這里我們創(chuàng)建一個(gè)名為stream.test的消息隊(duì)列
             */
            destination = session.createQueue(QUEUE_NAME);
            /**
             * 第五步:創(chuàng)建消費(fèi)者
             */
            messageConsumer = session.createConsumer(destination);
            /**
             * 第六步:創(chuàng)建監(jiān)聽器
             */
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("name:" + mapMessage.getString("name"));
                        System.out.println("age:" + mapMessage.getInt("age"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
//            if (null != session) {
//                try {
//                    session.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//            if (null != connection) {
//                try {
//                    connection.close();
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
        }
    }
}

運(yùn)行結(jié)果圖:


程序運(yùn)行截圖.png

ActiveMQ控制臺(tái)截圖.png

ActiveMQ的應(yīng)用

保證消息的成功處理

????消息發(fā)送成功后鬼吵,接收端接收到了消息。然后進(jìn)行處理篮赢,但是可能由于某種原因齿椅,高并發(fā)也好,IO阻塞也好启泣,反正這條消息在接收端處理失敗了媒咳。而點(diǎn)對(duì)點(diǎn)的特性是一條消息,只會(huì)被一個(gè)接收端給接收种远,只要接收端A接收成功了涩澡,接收端B,就不可能接收到這條消息,如果是一些普通的消息還好妙同,但是如果是一些很重要的消息射富,比如說(shuō)用戶的支付訂單,用戶的退款粥帚,這些與金錢相關(guān)的胰耗,是必須保證成功的,那么這個(gè)時(shí)候要怎么處理呢芒涡?
????我們可以在創(chuàng)建session的時(shí)候使用 CLIENT_ACKNOWLEDGE 模式柴灯。創(chuàng)建session的時(shí)候是需要指定事務(wù)以及消息的處理模式的。我們之前是這樣創(chuàng)建session:

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

????AUTO_ACKNOWLEDGE的消息處理模式是當(dāng)消息發(fā)送給接收端之后费尽,就自動(dòng)確認(rèn)成功了赠群,而不管接收端有沒(méi)有處理成功,而一旦確認(rèn)成功后旱幼,就會(huì)把隊(duì)列里面的消息給清除掉查描,避免下一個(gè)接收端接收到同樣的消息。

session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

而當(dāng)我們使用CLIENT_ACKNOWLEDGE的消息處理模式時(shí)柏卤,如果接收端不確認(rèn)消息冬三,那么activemq將會(huì)把這條消息一直保留,直到有一個(gè)接收端確定了消息缘缚。那么要怎么確認(rèn)消息呢勾笆?具體代碼如下:

messageConsumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
              TextMessage textMessage = (TextMessage) message;
              try {
                   System.out.println("收到的消息:" + textMessage.getText());
                   //確認(rèn)接收,并成功處理了消息
                   textMessage.acknowledge();
              } catch (JMSException e) {
                   e.printStackTrace();
              }
        }
});

避免消息隊(duì)列的并發(fā)

主動(dòng)接收隊(duì)列消息

????之前的代碼里面桥滨,實(shí)現(xiàn)了一個(gè)監(jiān)聽器窝爪,監(jiān)聽消息的傳遞,這樣只要每有一個(gè)消息该园,都會(huì)即時(shí)的傳遞到程序中酸舍。但是帅韧,這樣的處理里初,在高并發(fā)的時(shí)候,因?yàn)樗潜粍?dòng)接收忽舟,并沒(méi)有考慮到程序的處理能力双妨,可能會(huì)壓跨系統(tǒng),那要怎么辦呢?
????答案就是把被動(dòng)變?yōu)橹鲃?dòng)叮阅,當(dāng)程序有著處理消息的能力時(shí)刁品,主動(dòng)去接收一條消息進(jìn)行處理

if(當(dāng)程序有能力處理){//當(dāng)程序有能力處理時(shí)接收
    Message receive = consumer.receive();
    //這個(gè)可以設(shè)置超時(shí)時(shí)間,超過(guò)則不等待消息 
    recieve.receive(10000);
    //其實(shí)receive是一個(gè)阻塞式方法浩姥,一定會(huì)拿到值的
    if(null != receive){
        String text = ((TextMessage)receive).getText();
        receive.acknowledge();
        System.out.println(text);
    }else{
        //沒(méi)有值
    }
}

使用多個(gè)接收端

????ActiveMQ是支持多個(gè)接收端的挑随,如果當(dāng)程序無(wú)法處理這么多數(shù)據(jù)的時(shí)候,可以考慮多個(gè)線程勒叠,或者增加服務(wù)器來(lái)處理兜挨。

消息有效期的管理

????這樣的場(chǎng)景也是有的膏孟,一條消息的有效時(shí)間,當(dāng)發(fā)送一條消息的時(shí)候拌汇,可能希望這條消息在指定的時(shí)間被處理柒桑,如果超過(guò)了指定的時(shí)間,那么這條消息就失效了噪舀,就不需要進(jìn)行處理了魁淳,那么我們可以使用ActiveMQ的設(shè)置有效期來(lái)實(shí)現(xiàn)。具體設(shè)置如下:

producer.setTimeToLive(long l);

過(guò)期消息与倡,處理失敗的消息如何處理

????過(guò)期的界逛、處理失敗的消息,將會(huì)被ActiveMQ置入“ActiveMQ.DLQ”這個(gè)隊(duì)列中蒸走。這個(gè)隊(duì)列是ActiveMQ自動(dòng)創(chuàng)建的仇奶。如果需要查看這些未被處理的消息,可以進(jìn)入這個(gè)隊(duì)列中查看:

//指定一個(gè)目的地比驻,也就是一個(gè)隊(duì)列的位置
destination = session.createQueue("ActiveMQ.DLQ");

????這樣就可以進(jìn)入隊(duì)列中该溯,然后實(shí)現(xiàn)接口,或者通過(guò)receive()方法别惦,就可以拿到未被處理的消息狈茉,從而保證正確的處理。


????整理文章主要為了自己日后復(fù)習(xí)用掸掸,文章中可能會(huì)引用到別的博主的文章氯庆,如涉及到博主的版權(quán)問(wèn)題,請(qǐng)博主聯(lián)系我扰付。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末堤撵,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子羽莺,更是在濱河造成了極大的恐慌实昨,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,651評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件盐固,死亡現(xiàn)場(chǎng)離奇詭異荒给,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)刁卜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門志电,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人蛔趴,你說(shuō)我怎么就攤上這事挑辆。” “怎么了路呜?”我有些...
    開封第一講書人閱讀 162,931評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵房交,是天一觀的道長(zhǎng)产徊。 經(jīng)常有香客問(wèn)我律想,道長(zhǎng)焊唬,這世上最難降的妖魔是什么蒋纬? 我笑而不...
    開封第一講書人閱讀 58,218評(píng)論 1 292
  • 正文 為了忘掉前任源梭,我火速辦了婚禮铺呵,結(jié)果婚禮上吉挣,老公的妹妹穿的比我還像新娘派撕。我一直安慰自己,他們只是感情好睬魂,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評(píng)論 6 388
  • 文/花漫 我一把揭開白布终吼。 她就那樣靜靜地躺著,像睡著了一般氯哮。 火紅的嫁衣襯著肌膚如雪际跪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,198評(píng)論 1 299
  • 那天喉钢,我揣著相機(jī)與錄音姆打,去河邊找鬼。 笑死肠虽,一個(gè)胖子當(dāng)著我的面吹牛幔戏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播税课,決...
    沈念sama閱讀 40,084評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼闲延,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了韩玩?” 一聲冷哼從身側(cè)響起垒玲,我...
    開封第一講書人閱讀 38,926評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎找颓,沒(méi)想到半個(gè)月后合愈,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,341評(píng)論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡叮雳,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評(píng)論 2 333
  • 正文 我和宋清朗相戀三年想暗,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了妇汗。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帘不。...
    茶點(diǎn)故事閱讀 39,731評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖杨箭,靈堂內(nèi)的尸體忽然破棺而出寞焙,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,430評(píng)論 5 343
  • 正文 年R本政府宣布捣郊,位于F島的核電站辽狈,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏呛牲。R本人自食惡果不足惜刮萌,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望娘扩。 院中可真熱鬧着茸,春花似錦、人聲如沸琐旁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)灰殴。三九已至敬特,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間牺陶,已是汗流浹背伟阔。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留掰伸,地道東北人减俏。 一個(gè)月前我還...
    沈念sama閱讀 47,743評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像碱工,于是被迫代替她去往敵國(guó)和親娃承。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容

  • 1怕篷、前言 之前我們通過(guò)兩篇文章(架構(gòu)設(shè)計(jì):系統(tǒng)間通信(19)——MQ:消息協(xié)議(上)历筝、架構(gòu)設(shè)計(jì):系統(tǒng)間通信(20)...
    境里婆娑閱讀 1,881評(píng)論 0 4
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn)廊谓,斷路器梳猪,智...
    卡卡羅2017閱讀 134,652評(píng)論 18 139
  • ActiveMQ 即時(shí)通訊服務(wù) 淺析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk閱讀 1,488評(píng)論 0 11
  • 現(xiàn)在是北京時(shí)間下午14:14春弥,這本該是我上課的時(shí)間,但我也不清楚為什么自己會(huì)下定決心打開簡(jiǎn)書完成我的首作叠荠。 現(xiàn)在是...
    ZhongwenClaus閱讀 316評(píng)論 3 1
  • 積壓 一棵樹被剝?nèi)ケ砥?纖維的張力在撐 幾縷枯草 幾片瓦 沉默 不是無(wú)法表達(dá) 咽下的雪雨 澆灌心里的嫩芽 讓日子變...
    張百酒閱讀 227評(píng)論 8 9