ActiveMQ

ActiveMQ 使用版本: 5.13.5
下載地址:
鏈接:https://pan.baidu.com/s/1jIvN0gu 密碼:am2r

jdk 版本1.8
Mac版下載地址:
鏈接:https://pan.baidu.com/s/1gfnLWAZ 密碼:tj6t

啟動mq步驟:

  1. 首先進到mq 的bin 文件夾下
    cd + 文件路徑/bin
  2. 啟動 activemq
    ./activemq start
啟動mq.png
  1. 在啟動時可能會報錯, 應該是沒有執(zhí)行權限
    使用 ls -l 命令查看權限
    修改權限命令自行百度
查看權限.png
  1. 驗證是否啟動成功:
    http://127.0.0.1:8161/admin/queues.jsp
    賬號和密碼一般都是admin
    image.png

編寫mq代碼

創(chuàng)建maven項目

pom文件 添加依賴:

    <dependencies>
        <!-- spring核心配置-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${org.springframework.version}</version>
        </dependency>
        <!--spring test 結合 junit 進行測試-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>3.2.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
    </dependencies>

點對點消息模式

如果有兩個消費者同時開啟, 兩個消費者都會收到部分數(shù)據(jù), 加起來的數(shù)據(jù)為一份. 因此要保證消息目的地唯一.

  1. 創(chuàng)建生產(chǎn)者:
    // 獲取默認的用戶名, 密碼, 地址
    private static final String UserName = ActiveMQConnection.DEFAULT_USER;
    private static final String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory(UserName, password, url);
        try {
            // 創(chuàng)建連接
            Connection connection = factory.createConnection();
            // 啟動連接
            connection.start();

            // 是否支持事務咬摇,如果為true管削,則第二個參數(shù)被設置為SESSION_TRANSACTED
            //Session.AUTO_ACKNOWLEDGE為自動確認,不管成功失敗
            //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端調(diào)用acknowledge方法時服務器刪除消息
            //DUPS_OK_ACKNOWLEDGE允許重復確認模式桥狡。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 消息的目的地
            Destination destination = session.createQueue("FirstQueue");

            // 消費生產(chǎn)者
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 5; i++) {
                // 創(chuàng)建文本消息
                TextMessage message = session.createTextMessage("message" + i);
                producer.send(message);
            }
            // 如果創(chuàng)建session時為true, 則放開下面語句
//            session.commit();
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  1. 創(chuàng)建消費者
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
        try {
            Connection connection = factory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 點對點模式
            Destination destination = session.createQueue("test1");
            // 創(chuàng)建消費者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true){
                TextMessage message = (TextMessage) consumer.receive();
                if (message!=null){
                    System.out.println("收到的信息為: "+ message.getText());
                }else {
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

訂閱模式

與點對點模式代碼雷同, 只寫不同部分
只是生產(chǎn)者和消費者中的消息目的地創(chuàng)建的方式不同

    // 創(chuàng)建消息目的地時將createQueue("name") 換成createTopic("name")
    Destination destination = session.createTopic("test1");

持久化訂閱

生產(chǎn)者模塊代碼不變, 消費者添加身份識別
消費者代碼修改如下:

 ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
        try {
            Connection connection = factory.createConnection();

            // 持久化訂閱時添加
            connection.setClientID("bbb");
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 持久化訂閱時添加
            Topic topic = session.createTopic("test1");
            MessageConsumer consumer = session.createDurableSubscriber(topic, "bbb");

            while (true){
                TextMessage message = (TextMessage) consumer.receive();
                if (message!=null){
                    System.out.println("收到的信息為: "+ message.getText());
                }else {
                    break;
                }
            }
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }

注:該模式下一定要先注冊消費者, 然后再發(fā)送消息

消息過濾

  1. 生產(chǎn)者
// 設置超時時間
producer.setTimeToLive(10 * 1000);
MapMessage message1 = session.createMapMessage();
message1.setString("name", "laowei");
message1.setIntProperty("age", 19);

MapMessage message2 = session.createMapMessage();
message2.setString("name", "xiaowang");
message2.setIntProperty("age", 10);

/**
 *message : 發(fā)送的消息
 * DeliveryMode: 是否持久化
 * priority優(yōu)先級
 * timeToLive 消息過期時間
 */
producer.send(message1, DeliveryMode.NON_PERSISTENT, 4,1000*60*10);
producer.send(message2, DeliveryMode.NON_PERSISTENT, 4,1000*60*10);
  1. 消費者:
String condition = "age>=20";
Destination destination = session.createTopic("test1");
MessageConsumer consumer = session.createConsumer(destination, condition);

activeMQ 與spring 相結合

配置:

  1. activemq.properties
## ActiveMQ Config
activemq.brokerURL=tcp\://127.0.0.1\:61616
activemq.userName=admin
activemq.password=admin
activemq.pool.maxConnections=10
#queueName
activemq.queueName=myspringqueue
  1. spring-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd"
       default-autowire="byName">
       <!-- 讀入配置屬性文件 -->
       <context:property-placeholder location="classpath:activemq.properties" />
       <!-- 注釋配置 -->
       <context:annotation-config />
       <!-- 掃描包起始位置 -->
       <context:component-scan base-package="com.laowei.springmq" />
       <import resource="classpath:spring-activemq.xml" />
</beans>
  1. sping-activemq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
       <!-- 第三方MQ工廠: ConnectionFactory -->
       <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
              <!-- ActiveMQ Address -->
              <property name="brokerURL" value="${activemq.brokerURL}" />
              <property name="userName" value="${activemq.userName}"></property>
              <property name="password" value="${activemq.password}"></property>
       </bean>

       <!--
           ActiveMQ為我們提供了一個PooledConnectionFactory恒界,通過往里面注入一個ActiveMQConnectionFactory
           可以用來將Connection烟阐、Session和MessageProducer池化辣卒,這樣可以大大的減少我們的資源消耗,要依賴于 activemq-pool包
        -->
       <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
              <property name="connectionFactory" ref="targetConnectionFactory" />
              <property name="maxConnections" value="${activemq.pool.maxConnections}" />
       </bean>
       <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
       <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
              <!-- 目標ConnectionFactory對應真實的可以產(chǎn)生JMS Connection的ConnectionFactory -->
              <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
       </bean>

       <!--這個是目的地-->
       <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg>
                     <value>${activemq.queueName}</value>
              </constructor-arg>
       </bean>
       <!-- Spring提供的JMS工具類,它可以進行消息發(fā)送倦春、接收等 -->
       <!-- 隊列模板 -->
       <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
              <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
              <property name="connectionFactory" ref="connectionFactory"/>
              <property name="defaultDestinationName" value="${activemq.queueName}"></property>
       </bean>
       <!-- 配置自定義監(jiān)聽:MessageListener -->
       <bean id="msgQueueMessageListener" class="com.laowei.springmq.Consumer"></bean>

       <!-- 將連接工廠户敬、目標對了、自定義監(jiān)聽注入jms模板 -->
       <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
              <property name="connectionFactory" ref="connectionFactory" />
              <property name="destination" ref="msgQueue" />
              <property name="messageListener" ref="msgQueueMessageListener" />
       </bean>
</beans>
  1. 生產(chǎn)者代碼:
@Service("activeMQProducer")
public class Product {
    private JmsTemplate jmsTemplate;

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessage(final String info){
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(info);
            }
        });
    }
}
  1. 消費者
public class Consumer implements SessionAwareMessageListener<Message> {
    public void onMessage(Message message, Session session) throws JMSException {
        if (message instanceof TextMessage){
            System.out.println("*******" +((TextMessage) message).getText());
        }
    }
}
  1. 測試代碼:
public class TestProduct extends BaseJunit4Test{
    @Autowired
    private Product product;
    @Test
    public void sendmessage() throws InterruptedException {
        while (true){
            Thread.sleep(3000);
            product.sendMessage("hahaha");
        }
    }
}

代碼地址:
https://github.com/weijun8687/ActiveMQ.git

參考文章:
http://blog.csdn.net/fulai0_0/article/details/52127320
http://www.mytju.com/classcode/news_readNews.asp?newsID=486

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末睁本,一起剝皮案震驚了整個濱河市尿庐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌添履,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,123評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脑又,死亡現(xiàn)場離奇詭異暮胧,居然都是意外死亡,警方通過查閱死者的電腦和手機问麸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,031評論 2 384
  • 文/潘曉璐 我一進店門往衷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人严卖,你說我怎么就攤上這事席舍。” “怎么了哮笆?”我有些...
    開封第一講書人閱讀 156,723評論 0 345
  • 文/不壞的土叔 我叫張陵来颤,是天一觀的道長。 經(jīng)常有香客問我稠肘,道長福铅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,357評論 1 283
  • 正文 為了忘掉前任项阴,我火速辦了婚禮滑黔,結果婚禮上,老公的妹妹穿的比我還像新娘环揽。我一直安慰自己略荡,他們只是感情好,可當我...
    茶點故事閱讀 65,412評論 5 384
  • 文/花漫 我一把揭開白布歉胶。 她就那樣靜靜地躺著汛兜,像睡著了一般。 火紅的嫁衣襯著肌膚如雪通今。 梳的紋絲不亂的頭發(fā)上序无,一...
    開封第一講書人閱讀 49,760評論 1 289
  • 那天验毡,我揣著相機與錄音,去河邊找鬼帝嗡。 笑死晶通,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的哟玷。 我是一名探鬼主播狮辽,決...
    沈念sama閱讀 38,904評論 3 405
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼巢寡!你這毒婦竟也來了喉脖?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 37,672評論 0 266
  • 序言:老撾萬榮一對情侶失蹤抑月,失蹤者是張志新(化名)和其女友劉穎树叽,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谦絮,經(jīng)...
    沈念sama閱讀 44,118評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡题诵,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,456評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了层皱。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片性锭。...
    茶點故事閱讀 38,599評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖叫胖,靈堂內(nèi)的尸體忽然破棺而出草冈,到底是詐尸還是另有隱情,我是刑警寧澤瓮增,帶...
    沈念sama閱讀 34,264評論 4 328
  • 正文 年R本政府宣布怎棱,位于F島的核電站,受9級特大地震影響绷跑,放射性物質發(fā)生泄漏蹄殃。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,857評論 3 312
  • 文/蒙蒙 一你踩、第九天 我趴在偏房一處隱蔽的房頂上張望诅岩。 院中可真熱鬧,春花似錦带膜、人聲如沸吩谦。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,731評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至华弓,卻和暖如春芭挽,著一層夾襖步出監(jiān)牢的瞬間滑废,已是汗流浹背蝗肪。 一陣腳步聲響...
    開封第一講書人閱讀 31,956評論 1 264
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蠕趁,地道東北人薛闪。 一個月前我還...
    沈念sama閱讀 46,286評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像俺陋,于是被迫代替她去往敵國和親豁延。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,465評論 2 348

推薦閱讀更多精彩內(nèi)容