ActiveMQ學(xué)習(xí)筆記

一、ActiveMQ簡(jiǎn)介
1.什么是ActiveMQ

ActiveMQ是Apache出品,最流行的顽爹,能力強(qiáng)勁的開(kāi)源消息總線朱灿。ActiveMQ是一個(gè)完全支持JMS1.1 和J2EE1.4規(guī)范的JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。

2.什么是消息

“消息”是在兩臺(tái)計(jì)算機(jī)間傳送的數(shù)據(jù)單位拐云。消息可以非常簡(jiǎn)單完疫,例如只包含文本字符串库正,也可以更復(fù)雜,可以包含嵌入對(duì)象追他。

3.什么是隊(duì)列

是一種有序的,先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu)钝荡,例如:生活中的排隊(duì)

4.什么是消息隊(duì)列

“消息隊(duì)列”是在消息的傳輸過(guò)程中保存消息的容器

5.常見(jiàn)消息服務(wù)應(yīng)用
  • ActiveMQ
  • RabbitMQ
  • RocketMQ
二街立、消息服務(wù)的應(yīng)用場(chǎng)景

消息隊(duì)列的特點(diǎn)主要是異步處理,主要作用是減少消息請(qǐng)求和響應(yīng)的時(shí)間以及解耦埠通。所以主要用于比較耗時(shí)并且不需要即時(shí)(同步)返回結(jié)果的操作赎离。

image.png

2.1 異步處理
2.1.1 用戶注冊(cè)
用戶注冊(cè)流程:
  • 注冊(cè)處理及寫(xiě)入數(shù)據(jù)庫(kù)
  • 發(fā)送注冊(cè)成功的手機(jī)短信
  • 發(fā)送注冊(cè)成功的郵件信息

如果使用消息中間件,則可以創(chuàng)建兩個(gè)線程來(lái)做這些事情植阴,直接發(fā)送消息給消息中間件蟹瘾,然后讓郵件服務(wù)和短信服務(wù)去消息中間件中取消息圾浅,取到消息后自己再做對(duì)應(yīng)的操作掠手。

2.2 應(yīng)用的解耦
2.2.1 訂單處理
生成訂單流程:
  • 在購(gòu)物車中點(diǎn)擊結(jié)算
  • 完成支付
  • 創(chuàng)建訂單
  • 調(diào)用庫(kù)存系統(tǒng)

訂單完成后,訂單系統(tǒng)不用直接取調(diào)用庫(kù)存系統(tǒng)狸捕,而是發(fā)送消息到消息中間件喷鸽,寫(xiě)入一個(gè)訂單信息。庫(kù)存系統(tǒng)自己去消息中間件中獲取灸拍,然后做發(fā)貨處理做祝,并更新庫(kù)存。

2.3流量的削峰
2.3.1 秒殺功能
秒殺流程
  • 用戶點(diǎn)擊秒殺
  • 發(fā)送請(qǐng)求到秒殺應(yīng)用
  • 在請(qǐng)求秒殺應(yīng)用之前將請(qǐng)求放入到消息隊(duì)列
  • 秒殺應(yīng)用從消息隊(duì)列中獲取請(qǐng)求并處理

系統(tǒng)舉行秒殺活動(dòng)鸡岗,流量蜂擁而至100件商品混槐,10萬(wàn)人擠進(jìn)來(lái)怎么辦?
將10萬(wàn)秒殺的操作轩性,放入消息隊(duì)列声登。秒殺應(yīng)用將10萬(wàn)個(gè)請(qǐng)求中的前100個(gè)進(jìn)行處理,其它的駁回通知失敗。這樣將流量控制在了消息隊(duì)列處悯嗓。秒殺應(yīng)用不會(huì)被懟死件舵。

三、JMS
1.什么是JMS

JMS(Java Message Service)是Java平臺(tái)上面向消息中間件的技術(shù)規(guī)范脯厨,它便于消息系統(tǒng)中的Java應(yīng)用程序進(jìn)行消息交換铅祸,并且提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送合武、接收消息的接口临梗,簡(jiǎn)化企業(yè)應(yīng)用的開(kāi)發(fā)。

2.JMS模型
2.1 點(diǎn)對(duì)點(diǎn)模型(Point To Point)

生產(chǎn)者發(fā)送一條消息到queue眯杏,只有一個(gè)消費(fèi)者能收到夜焦。


image.png
2.2 發(fā)布訂閱模型(Publish/Subscribe)

發(fā)布者發(fā)送到topic的消息,只有訂閱了topic的訂閱者才會(huì)收到消息岂贩。


image.png
四茫经、ActiveMQ安裝
1.下載資源

ActiveMQ官網(wǎng):http://activemq.apache.org

1.1 版本說(shuō)明

ActiveMQ5.10.x 以上版本必須使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用萎津。

2.上傳至Linux服務(wù)器
3.解壓安裝文件
tar -zxf apache-activemq-5.9.0-bin.tar.gz
4.檢查權(quán)限
ls -al apache-activemq-5.9.0/bin

如果權(quán)限不足卸伞,則無(wú)法執(zhí)行,需要修改文件權(quán)限:

chmod 755 activemq
5.復(fù)制應(yīng)用至本地目錄
cp apache-activemq-5.9.0 /usr/local/activemq -r
6.啟動(dòng)ActiveMQ
/usr/local/activemq/bin/activemq start
7.測(cè)試ActiveMQ
7.1檢查進(jìn)程
ps aux|grep activemq
7.2管理界面

使用瀏覽器訪問(wèn)ActiveMQ管理應(yīng)用锉屈,地址如下:
http://ip:8161/admin/
用戶名:admin
密碼:admin
AcitveMQ使用的是Jetty提供的HTTP服務(wù)荤傲。啟動(dòng)稍慢,建議短暫等待再訪問(wèn)測(cè)試颈渊。
見(jiàn)到如下界面代表服務(wù)啟動(dòng)成功

image.png

7.3 修改訪問(wèn)端口(管理應(yīng)用監(jiān)聽(tīng)的端口)

修改ActiveMQ配置文件:

/usr/local/activemq/conf/jetty.xml
image.png
7.4 修改用戶名和密碼

修改conf/users.properties配置文件遂黍,內(nèi)容為:用戶名=密碼
保存并啟動(dòng)ActiveMQ服務(wù)即可。


image.png
8.重啟ActiveMQ
/usr/local/activemq/bin/activemq restart
9.關(guān)閉ActiveMQ
/usr/local/activemq/bin/activemq stop
10.配置文件activemq.xml

配置文件中俊嗽,配置的是ActiveMQ的核心配置信息雾家,是提供服務(wù)時(shí)使用的配置,可以修改啟動(dòng)的訪問(wèn)端口绍豁,即Java編程中訪問(wèn)ActiveMQ的訪問(wèn)端口


image.png

默認(rèn)端口:61616(編程時(shí)使用的端口)
使用協(xié)議:TCP協(xié)議
修改端口后芯咧,保存并重啟ActiveMQ服務(wù)即可

11.ActiveMQ目錄介紹

bin:可執(zhí)行的腳本文件
conf:相關(guān)的配置文件
data:存放的是日志文件
docs:存放的是相關(guān)文檔
examples:存放的是簡(jiǎn)單的實(shí)例
lib:相關(guān)的jar包
webapps:用于存放項(xiàng)目的目錄

五、ActiveMQ術(shù)語(yǔ)
1.Destination

目的地竹揍,JMS Provider(消息中間件)負(fù)責(zé)維護(hù)敬飒,用于對(duì)Message進(jìn)行管理的對(duì)象。MessageProducer需要指定Destination才能發(fā)送消息芬位,MessageReceiver需要指定Destination才能接收消息无拗。

2.Producer

消息生成者,負(fù)責(zé)發(fā)送Message到目的地昧碉。

3.Consumer|Receiver

消息消費(fèi)者英染,負(fù)責(zé)從目的地中消費(fèi)(處理/監(jiān)聽(tīng)/訂閱)Message

4.Message

消息阴孟,用于封裝一次通信的內(nèi)容

六、ActiveMQ應(yīng)用
1.ActiveMQ常用API簡(jiǎn)介

下述API都是接口類型税迷,定義在javax.jms包中

1.1 ConnectionFactory

連接工廠:用于創(chuàng)建連接的工廠類型

1.2 Connection

連接:用于建立訪問(wèn)ActiveMQ連接的類型永丝,由連接工廠創(chuàng)建

1.3 Session

會(huì)話:一次持久、有效箭养、有狀態(tài)的訪問(wèn)慕嚷,由連接創(chuàng)建

1.4 Destination & Queue

目的地:用于描述本次訪問(wèn)ActiveMQ的消息訪問(wèn)目的地,即ActiveMQ服務(wù)中的具體隊(duì)列毕泌,由會(huì)話創(chuàng)建
Interface Queue extends Destination

1.5 MessageProducer

消息生產(chǎn)者:在一次有效會(huì)話中喝检,用于發(fā)送消息給ActiveMQ的服務(wù)工具,由會(huì)話創(chuàng)建

1.6 MessageConsumer

消息消費(fèi)者:在一次有效會(huì)話中撼泛,用于從ActiveMQ中獲取消息的工具挠说,由會(huì)話創(chuàng)建

1.7 Message

消息:通過(guò)消息生產(chǎn)者向ActiveMQ服務(wù)發(fā)送消息時(shí)使用的數(shù)據(jù)載體對(duì)象或消息消費(fèi)者從ActiveMQ服務(wù)中獲取消息時(shí)使用的數(shù)據(jù)載體對(duì)象,是所有消息(文本消息愿题、對(duì)象消息等)具體類型的頂級(jí)接口损俭,可以通過(guò)會(huì)話創(chuàng)建或通過(guò)會(huì)話從ActiveMQ服務(wù)中獲取

2.JMS-HelloWorld
2.1 處理文本消息
2.1.1 創(chuàng)建消息生產(chǎn)者
創(chuàng)建工程

mq-producer

添加坐標(biāo)
    <!--activeMQ-->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq-all.version}</version>
    </dependency>
編寫(xiě)消息生產(chǎn)者
public class HelloWorldProducer {

    public void sendHelloWorldActiveMQ(String msgText){
        //定義連接工廠
        ConnectionFactory connectionFactory = null;

        //定義連接對(duì)象
        Connection connection = null;

        //定義會(huì)話
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生產(chǎn)者
        MessageProducer producer = null;

        //定義消息
        Message message = null;

        try {
            //傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
            //brokerURL:訪問(wèn)activeMQ的連接地址,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();

            //啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
            connection.start();

            /**
             * 創(chuàng)建會(huì)話
             * transacted:是否使用事務(wù)潘酗,可選值為true杆兵,false
             *              true:使用事務(wù),設(shè)置此變量值仔夺,Session.SESSION.TRANSACTION
             *              false:不使用事務(wù)琐脏,設(shè)置此變量 則acknowledgeMode必須設(shè)置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
             * Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除,這個(gè)則不會(huì))
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //創(chuàng)建目的地缸兔,即隊(duì)列的名稱日裙,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
            destination = session.createQueue("helloworld-destination");

            //創(chuàng)建消息生產(chǎn)者
            producer = session.createProducer(destination);

            //創(chuàng)建消息對(duì)象
            message = session.createTextMessage(msgText);

            //發(fā)送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息發(fā)送者資源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.1.2 創(chuàng)建消息消費(fèi)者
創(chuàng)建工程

mq-consumer

添加坐標(biāo)
<!--activeMQ-->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>
創(chuàng)建消息生產(chǎn)者
public class HelloWorldConsumer {

    public void receiveHelloWorldActiveMQ() {
        //定義連接工廠
        ConnectionFactory connectionFactory = null;
        //定義連接
        Connection connection = null;
        //定義會(huì)話
        Session session = null;
        //定義目的地
        Destination destination = null;
        //定義消息消費(fèi)者
        MessageConsumer consumer = null;
        //定義消息
        Message message = null;

        try {
            //創(chuàng)建連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();
            //開(kāi)啟連接
            connection.start();
            //創(chuàng)建會(huì)話
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //創(chuàng)建目的地
            destination = session.createQueue("helloworld-destination");
            //創(chuàng)建消息消費(fèi)者
            consumer = session.createConsumer(destination);
            //接收消息
            message = consumer.receive();

            //獲取文本消息
            String msg = ((TextMessage) message).getText();
            System.out.println("從ActiveMQ中獲取的文本信息:" + msg);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.1.3 測(cè)試
測(cè)試生產(chǎn)者
public class Test {
    public static void main(String[] args) {
        HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");
    }
}
測(cè)試消費(fèi)者
public class Test {
    public static void main(String[] args) {
        HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();
    }
}
image.png
2.2 處理對(duì)象消息
2.2.1 創(chuàng)建對(duì)象
public class User implements Serializable {

    private Integer userId;
    private String userName;
    private Integer userAge;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public Integer getUserAge() {
        return userAge;
    }

    public void setUserAge(Integer userAge) {
        this.userAge = userAge;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", userName='" + userName + '\'' +
                ", userAge=" + userAge +
                '}';
    }
}
2.2.2 創(chuàng)建生產(chǎn)者
public class HelloWorldProducer2 {

    public void sendHelloWorldActiveMQ(User user){
        //定義連接工廠
        ConnectionFactory connectionFactory = null;

        //定義連接對(duì)象
        Connection connection = null;

        //定義會(huì)話
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生產(chǎn)者
        MessageProducer producer = null;

        //定義消息
        Message message = null;

        try {
            //傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
            //brokerURL:訪問(wèn)activeMQ的連接地址,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();

            //啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
            connection.start();

            /**
             * 創(chuàng)建會(huì)話
             * transacted:是否使用事務(wù)惰蜜,可選值為true昂拂,false
             *              true:使用事務(wù),設(shè)置此變量值蝎抽,Session.SESSION.TRANSACTION
             *              false:不使用事務(wù)政钟,設(shè)置此變量 則acknowledgeMode必須設(shè)置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
             * Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除路克,這個(gè)則不會(huì))
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //創(chuàng)建目的地樟结,即隊(duì)列的名稱,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
            destination = session.createQueue("my-user");

            //創(chuàng)建消息生產(chǎn)者
            producer = session.createProducer(destination);

            //創(chuàng)建消息對(duì)象
            message = session.createObjectMessage(user);

            //發(fā)送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息發(fā)送者資源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.3 創(chuàng)建消費(fèi)者
public class HelloWorldConsumer2 {

    public void receiveHelloWorldActiveMQ() {
        //定義連接工廠
        ConnectionFactory connectionFactory = null;
        //定義連接
        Connection connection = null;
        //定義會(huì)話
        Session session = null;
        //定義目的地
        Destination destination = null;
        //定義消息消費(fèi)者
        MessageConsumer consumer = null;
        //定義消息
        Message message = null;

        try {
            //創(chuàng)建連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();
            //開(kāi)啟連接
            connection.start();
            //創(chuàng)建會(huì)話
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //創(chuàng)建目的地
            destination = session.createQueue("my-user");
            //創(chuàng)建消息消費(fèi)者
            consumer = session.createConsumer(destination);
            //接收消息
            message = consumer.receive();

            Serializable obj = ((ObjectMessage) message).getObject();

            User user = (User) obj;
            System.out.println("從ActiveMQ中獲取的對(duì)象信息:" + user);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
2.2.4 測(cè)試
public class Test {
    public static void main(String[] args) {
        /*HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");*/

        HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));
    }
}
public class Test {
    public static void main(String[] args) {
        /*HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();*/

        HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.receiveHelloWorldActiveMQ();
    }
}
image.png
3.JMS-實(shí)現(xiàn)隊(duì)列服務(wù)監(jiān)聽(tīng)
隊(duì)列監(jiān)聽(tīng)使用了觀察者模式
3.1 創(chuàng)建消息生產(chǎn)者
public class HelloWorldProducer3 {

    public void sendHelloWorldActiveMQ(User user){
        //定義連接工廠
        ConnectionFactory connectionFactory = null;

        //定義連接對(duì)象
        Connection connection = null;

        //定義會(huì)話
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生產(chǎn)者
        MessageProducer producer = null;

        //定義消息
        Message message = null;

        try {
            //傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
            //brokerURL:訪問(wèn)activeMQ的連接地址精算,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();

            //啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
            connection.start();

            /**
             * 創(chuàng)建會(huì)話
             * transacted:是否使用事務(wù)瓢宦,可選值為true,false
             *              true:使用事務(wù)灰羽,設(shè)置此變量值驮履,Session.SESSION.TRANSACTION
             *              false:不使用事務(wù)鱼辙,設(shè)置此變量 則acknowledgeMode必須設(shè)置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
             * Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除,這個(gè)則不會(huì))
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //創(chuàng)建目的地玫镐,即隊(duì)列的名稱倒戏,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
            destination = session.createQueue("my-destination");

            //創(chuàng)建消息生產(chǎn)者
            producer = session.createProducer(destination);

            //創(chuàng)建消息對(duì)象
            message = session.createObjectMessage(user);

            //發(fā)送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息發(fā)送者資源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
3.2 創(chuàng)建消息消費(fèi)者
public class HelloWorldConsumer3 {

    public void receiveHelloWorldActiveMQ() {
        //定義連接工廠
        ConnectionFactory connectionFactory = null;
        //定義連接
        Connection connection = null;
        //定義會(huì)話
        Session session = null;
        //定義目的地
        Destination destination = null;
        //定義消息消費(fèi)者
        MessageConsumer consumer = null;
        //定義消息
        Message message = null;

        try {
            //創(chuàng)建連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();
            //開(kāi)啟連接
            connection.start();
            //創(chuàng)建會(huì)話
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //創(chuàng)建目的地
            destination = session.createQueue("my-destination");
            //創(chuàng)建消息消費(fèi)者
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {

                //ActiveMQ的回調(diào)方法,通過(guò)該方法將消息傳遞到consumer中
                @Override
                public void onMessage(Message message) {
                    Serializable obj = null;
                    try {
                        obj = ((ObjectMessage) message).getObject();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }

                    User user = (User) obj;
                    System.out.println("從ActiveMQ中獲取的對(duì)象信息:" + user);
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
4.Topic模型
4.1 Publish/Subscribe 處理模式(Topic)

消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中恐似,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息杜跷。
和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)
當(dāng)生產(chǎn)者發(fā)布消息矫夷,不管是否有消費(fèi)者葛闷,都不會(huì)保存消息
一定要先有消息的消費(fèi)者,后有消息生產(chǎn)者

image.png

4.2 創(chuàng)建消息生產(chǎn)者
public class HelloWorldProducerTopic {

    public void sendHelloWorldActiveMQ(String msgText){
        //定義連接工廠
        ConnectionFactory connectionFactory = null;

        //定義連接對(duì)象
        Connection connection = null;

        //定義會(huì)話
        Session session = null;

        //目的地
        Destination destination = null;

        //消息生產(chǎn)者
        MessageProducer producer = null;

        //定義消息
        Message message = null;

        try {
            //傳入的用戶名和密碼可以通過(guò)jetty-realm.properties文件修改
            //brokerURL:訪問(wèn)activeMQ的連接地址双藕,路徑結(jié)構(gòu)為:協(xié)議://主機(jī)地址:端口號(hào)
            connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.254.128:61616");

            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();

            //啟動(dòng)連接(此時(shí)才是真正創(chuàng)建連接)
            connection.start();

            /**
             * 創(chuàng)建會(huì)話
             * transacted:是否使用事務(wù)淑趾,可選值為true,false
             *              true:使用事務(wù)忧陪,設(shè)置此變量值扣泊,Session.SESSION.TRANSACTION
             *              false:不使用事務(wù),設(shè)置此變量 則acknowledgeMode必須設(shè)置
             * acknowledgeMode:
             * Session.AUTO_ACKNOWLEDGE:自動(dòng)確認(rèn)機(jī)制
             * Session.CLIENT_ACKNOWLEDGE:客戶端確認(rèn)機(jī)制(需手動(dòng)調(diào)用API)
             * Session.DUPS_OK_ACKNOWLEDGE:有副本的客戶端確認(rèn)機(jī)制(前兩種一旦收到消息確認(rèn)就會(huì)進(jìn)行刪除嘶摊,這個(gè)則不會(huì))
             */
            session = connection.createSession(false,Session.DUPS_OK_ACKNOWLEDGE);

            //創(chuàng)建目的地旷赖,即隊(duì)列的名稱,消息消費(fèi)者需要通過(guò)此名稱訪問(wèn)對(duì)應(yīng)的隊(duì)列
            destination = session.createTopic("test-topic");

            //創(chuàng)建消息生產(chǎn)者
            producer = session.createProducer(destination);

            //創(chuàng)建消息對(duì)象
            message = session.createTextMessage(msgText);

            //發(fā)送消息
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            //回收消息發(fā)送者資源
            if (producer != null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (session != null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
4.3 創(chuàng)建消息消費(fèi)者

創(chuàng)建三份

public class HelloWorldConsumerTopic1 implements Runnable{

    public void receiveHelloWorldActiveMQ() {
        //定義連接工廠
        ConnectionFactory connectionFactory = null;
        //定義連接
        Connection connection = null;
        //定義會(huì)話
        Session session = null;
        //定義目的地
        Destination destination = null;
        //定義消息消費(fèi)者
        MessageConsumer consumer = null;
        //定義消息
        Message message = null;

        try {
            //創(chuàng)建連接工廠
            connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.254.128:61616");
            //創(chuàng)建連接對(duì)象
            connection = connectionFactory.createConnection();
            //開(kāi)啟連接
            connection.start();
            //創(chuàng)建會(huì)話
            session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            //創(chuàng)建目的地
            destination = session.createTopic("test-topic");
            //創(chuàng)建消息消費(fèi)者
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    //獲取文本消息
                    String msg = null;
                    try {
                        msg = ((TextMessage) message).getText();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    System.out.println("從ActiveMQ中獲取的文本信息----topic1:" + msg);
                }
            });

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        receiveHelloWorldActiveMQ();
    }
}
4.4 測(cè)試
public class Test {
    public static void main(String[] args) {
        /*HelloWorldProducer producer = new HelloWorldProducer();
        producer.sendHelloWorldActiveMQ("HelloWorld");*/

        /*HelloWorldProducer2 producer2 = new HelloWorldProducer2();
        producer2.sendHelloWorldActiveMQ(new User(1, "tom", 21));*/

        /*HelloWorldProducer3 producer3 = new HelloWorldProducer3();
        producer3.sendHelloWorldActiveMQ(new User(2,"alice",19));*/

        HelloWorldProducerTopic topic = new HelloWorldProducerTopic();
        topic.sendHelloWorldActiveMQ("Hello Topic");
    }
}
public class Test {
    public static void main(String[] args) {
        /*HelloWorldConsumer consumer = new HelloWorldConsumer();
        consumer.receiveHelloWorldActiveMQ();*/

        /*HelloWorldConsumer2 consumer2 = new HelloWorldConsumer2();
        consumer2.receiveHelloWorldActiveMQ();*/

        /*HelloWorldConsumer3 consumer3 = new HelloWorldConsumer3();
        consumer3.receiveHelloWorldActiveMQ();*/

        HelloWorldConsumerTopic1 topic1 = new HelloWorldConsumerTopic1();
        Thread thread1 = new Thread(topic1);
        thread1.start();

        HelloWorldConsumerTopic2 topic2 = new HelloWorldConsumerTopic2();
        Thread thread2 = new Thread(topic2);
        thread2.start();

        HelloWorldConsumerTopic3 topic3 = new HelloWorldConsumerTopic3();
        Thread thread3 = new Thread(topic3);
        thread3.start();
    }
}
image.png
七更卒、Spring整合ActiveMQ
1.創(chuàng)建項(xiàng)目

創(chuàng)建spring-activemq-producer

1.1 添加坐標(biāo)
    <dependencies>
        <!--activeMQ-->
        <!--ActiveMQ客戶端完整jar包依賴-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--ActiveMQ和Spring整合配置文件標(biāo)簽處理jar包依賴-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!--Spring JMS插件相關(guān)的jar包依賴-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!--Active Pool-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>

        <!-- 單元測(cè)試 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志處理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
        </dependency>

        <!--javaee-->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jsp-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
        </dependency>
    </dependencies>
1.2 整合ActiveMQ
  • web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
          http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <servlet>
        <servlet-name>springmvc</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:spring-*.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>springmvc</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>
    
    <filter>
        <filter-name>encodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>UTF-8</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>encodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>
</web-app>
  • spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--掃包-->
    <context:component-scan base-package="com.hxx.web.controller"/>
    
    <!--添加注解驅(qū)動(dòng)-->
    <mvc:annotation-driven/>

    <!--配置視圖解析器-->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/WEB-INF/jsp/"/>
        <property name="suffix" value=".jsp"/>
    </bean>

    <!--放行靜態(tài)資源-->
</beans>
  • spring-service
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--加載資源文件-->
    <context:property-placeholder location="classpath:resource.properties"/>

    <!--掃描bean對(duì)象-->
    <context:component-scan base-package="com.hxx.service.impl"/>
</beans>
  • spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!--創(chuàng)建一個(gè)連接工廠等孵,連接ActiveMQ,ActiveMQConnectionFactory,需要依賴ActiveMQ提供的amq標(biāo)簽-->
    <!--amq:connectionFactory是bean的子標(biāo)簽蹂空,會(huì)在Spring容器中創(chuàng)建一個(gè)bean對(duì)象俯萌,
    可以為對(duì)象名,類似:<bean id="" class="ActiveMQConnectionFactory"/>-->
    <amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!--spring管理JMS相關(guān)代碼的時(shí)候上枕,必須依賴jms標(biāo)簽庫(kù)咐熙,Spring-jms提供標(biāo)簽庫(kù)-->
    <!--
        定義Spring-jms中的連接工廠對(duì)象
        CachingConnectionFactory - spring框架提供的連接工廠對(duì)象,不能真正訪問(wèn)MOM容器辨萍,
        類似一個(gè)工廠的代理對(duì)象棋恼,需要提供一個(gè)真實(shí)工廠,實(shí)現(xiàn)MOM容器的連接訪問(wèn)
    -->
    <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="maxConnections" value="10"/>
    </bean>

    <!--配置有緩存的ConnectionFactory锈玉,Session的緩存大小可定制-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--jmsTemplate配置-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!--給定連接工廠-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--默認(rèn)目的地命名-->
        <property name="defaultDestinationName" value="test-spring"/>
    </bean>
</beans>
2.創(chuàng)建項(xiàng)目

spring-activemq-consumer

2.1 添加依賴
    <dependencies>
        <!--activeMQ-->
        <!--ActiveMQ客戶端完整jar包依賴-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-context</artifactId>
                    <groupId>org.springframework</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.geronimo.specs</groupId>
                    <artifactId>geronimo-jms_1.1_spec</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--ActiveMQ和Spring整合配置文件標(biāo)簽處理jar包依賴-->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
        </dependency>
        <!--Spring JMS插件相關(guān)的jar包依賴-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
        </dependency>

        <!--Active Pool-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-jms-pool</artifactId>
        </dependency>

        <!-- 單元測(cè)試 -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <!-- 日志處理 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </dependency>
        <!-- spring -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.jms</groupId>
            <artifactId>javax.jms-api</artifactId>
        </dependency>
    </dependencies>
2.2 整合ActiveMQ
  • spring-service.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--掃描bean對(duì)象-->
    <context:component-scan base-package="com.hxx.service,com.hxx.listener"/>
</beans>
  • spring-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!--創(chuàng)建一個(gè)連接工廠爪飘,連接ActiveMQ络断,ActiveMQConnectionFactory,需要依賴ActiveMQ提供的amq標(biāo)簽-->
    <!--amq:connectionFactory是bean的子標(biāo)簽统锤,會(huì)在Spring容器中創(chuàng)建一個(gè)bean對(duì)象桩蓉,
    可以為對(duì)象名扔水,類似:<bean id="" class="ActiveMQConnectionFactory"/>-->
    <amq:connectionFactory brokerURL="tcp://192.168.254.128:61616"
                           userName="admin" password="admin" id="amqConnectionFactory"/>

    <!--spring管理JMS相關(guān)代碼的時(shí)候遭商,必須依賴jms標(biāo)簽庫(kù)攘残,Spring-jms提供標(biāo)簽庫(kù)-->
    <!--
        定義Spring-jms中的連接工廠對(duì)象
        CachingConnectionFactory - spring框架提供的連接工廠對(duì)象插佛,不能真正訪問(wèn)MOM容器产弹,
        類似一個(gè)工廠的代理對(duì)象,需要提供一個(gè)真實(shí)工廠床估,實(shí)現(xiàn)MOM容器的連接訪問(wèn)
    -->
    <!--配置有緩存的ConnectionFactory含滴,Session的緩存大小可定制-->
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <property name="sessionCacheSize" value="3"/>
    </bean>

    <!--注冊(cè)監(jiān)聽(tīng)器-->
    <!--
        開(kāi)始注冊(cè)監(jiān)聽(tīng)
        需要的參數(shù)有:
            acknowledge:消息確認(rèn)機(jī)制
            container-type:simple|default
            simple:SimpleMessageListenerContainer最簡(jiǎn)單的消息監(jiān)聽(tīng)器容器,只能處理固定數(shù)量的JMS會(huì)話
            default:DefaultMessageListenerContainer是一種用于異步消息監(jiān)聽(tīng)的管理類丐巫,且支持事務(wù)
            destination-type:目的地類型蛙吏,使用隊(duì)列作為目的地,
            connection-factory:連接工廠鞋吉,spring-jms使用的工廠鸦做,必須是spring自主創(chuàng)建的
                                不能使用第三方工具創(chuàng)建工程,如:ActiveMQConnectionFactory
    -->
    <jms:listener-container acknowledge="auto" container-type="default"
                            destination-type="queue" connection-factory="cachingConnectionFactory">
        <!--
            在監(jiān)聽(tīng)器容器中注冊(cè)某監(jiān)聽(tīng)對(duì)象谓着,
            destination - 設(shè)置目的地命名
            ref - 指定監(jiān)聽(tīng)器對(duì)象
        -->
        <jms:listener destination="test-spring" ref="myListener"/>
    </jms:listener-container>
</beans>
  • 創(chuàng)建MyMessageListener
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {

    }
}
  • 測(cè)試
public class TestActiveMQ {

    public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext(new String[]{"classpath:spring-jms.xml"
                ,"classpath:spring-service.xml"});
        ac.start();
        System.out.println("spring容器啟動(dòng)");

        System.in.read();
    }
}
image.png
3.測(cè)試整合
需求:

1.在Producer中創(chuàng)建User類
2.將User對(duì)象傳遞到ActiveMQ中
3.在Consumer中獲取User對(duì)象并在控制臺(tái)打印

3.1 Producer發(fā)送消息
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Override
    public void addUser(final User user) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                //發(fā)送消息
                return session.createObjectMessage(user);
            }
        });
    }
}

發(fā)送成功


image.png
3.2 Consumer接收消息
  • userServiceImpl.java
@Service
public class UserServiceImpl implements UserService {
    @Override
    public void showUser(User user) {
        System.out.println(user);
    }
}
  • MyMessageListener.java
@Component(value = "myListener")
public class MyMessageListener implements MessageListener {

    @Autowired
    private UserService userService;

    @Override
    public void onMessage(Message message) {
        Serializable obj = null;
        try {
            obj = ((ObjectMessage) message).getObject();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        User user = (User) obj;
        userService.showUser(user);
    }
}
image.png

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末泼诱,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子赊锚,更是在濱河造成了極大的恐慌治筒,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件舷蒲,死亡現(xiàn)場(chǎng)離奇詭異耸袜,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)牲平,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門堤框,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人纵柿,你說(shuō)我怎么就攤上這事蜈抓。” “怎么了昂儒?”我有些...
    開(kāi)封第一講書(shū)人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵沟使,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我渊跋,道長(zhǎng)腊嗡,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任拾酝,我火速辦了婚禮燕少,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘微宝。我一直安慰自己棺亭,他們只是感情好虎眨,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開(kāi)白布蟋软。 她就那樣靜靜地躺著镶摘,像睡著了一般。 火紅的嫁衣襯著肌膚如雪岳守。 梳的紋絲不亂的頭發(fā)上凄敢,一...
    開(kāi)封第一講書(shū)人閱讀 52,696評(píng)論 1 312
  • 那天,我揣著相機(jī)與錄音湿痢,去河邊找鬼涝缝。 笑死,一個(gè)胖子當(dāng)著我的面吹牛譬重,可吹牛的內(nèi)容都是我干的拒逮。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼臀规,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼滩援!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起塔嬉,我...
    開(kāi)封第一講書(shū)人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤玩徊,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后谨究,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體恩袱,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年胶哲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了畔塔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鸯屿,死狀恐怖俩檬,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情碾盟,我是刑警寧澤棚辽,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站冰肴,受9級(jí)特大地震影響屈藐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜熙尉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一联逻、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧检痰,春花似錦包归、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)换可。三九已至,卻和暖如春厦幅,著一層夾襖步出監(jiān)牢的瞬間沾鳄,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工确憨, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留译荞,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓休弃,卻偏偏與公主長(zhǎng)得像吞歼,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子塔猾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361