RabbitMQ:訂閱模式Publish/Subscribe

9824247-e9d1c4b88ced1491.jpg

我們之前學(xué)習(xí)的都是一個(gè)消息只能被一個(gè)消費(fèi)者消費(fèi),那么如果我想發(fā)一個(gè)消息 能被多個(gè)消費(fèi)者消費(fèi),這時(shí)候怎么辦? 這時(shí)候我們就得用到了消息中的發(fā)布訂閱模型膀斋。

1速缆、解讀

在前面的教程中,我們創(chuàng)建了一個(gè)工作隊(duì)列拆魏,都是一個(gè)任務(wù)只交給一個(gè)消費(fèi)者。 這次我們做 將消息發(fā)送給多個(gè)消費(fèi)者。這種模式叫做“發(fā)布/訂閱”嘹害。

類似微信訂閱號(hào) 發(fā)布文章消息 就可以廣播給所有的接收者。(訂閱者)
那么咱們來(lái)看一下圖,我們學(xué)過(guò)前兩種有一些不一樣,work模式 是不是同一個(gè)隊(duì)列 多個(gè)消費(fèi)者,而ps這種模式呢,是一個(gè)隊(duì)列對(duì)應(yīng)一個(gè)消費(fèi)者,Publish模式還多了一個(gè)exchange(交換機(jī) 轉(zhuǎn)發(fā)器) ,這時(shí)候我們要獲取消息 就需要隊(duì)列綁定到交換機(jī)上,交換機(jī)把消息發(fā)送到隊(duì)列 , 消費(fèi)者才能獲取隊(duì)列的消息

python-three-overall.png

解讀:

1吮便、1個(gè)生產(chǎn)者笔呀,多個(gè)消費(fèi)者
2、每一個(gè)消費(fèi)者都有自己的一個(gè)隊(duì)列
3髓需、生產(chǎn)者沒(méi)有將消息直接發(fā)送到隊(duì)列许师,而是發(fā)送到了交換機(jī)(轉(zhuǎn)發(fā)器)
4、每個(gè)隊(duì)列都要綁定到交換機(jī)
5僚匆、生產(chǎn)者發(fā)送的消息微渠,經(jīng)過(guò)交換機(jī),到達(dá)隊(duì)列咧擂,實(shí)現(xiàn)逞盆,一個(gè)消息被多個(gè)消費(fèi)者獲取的目的

2、生產(chǎn)者

package com.hrabbit.rabbitmq.publish.send;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-28 下午2:40
 * @Description:
 */
public class Send {
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "hrabbit_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //聲明一個(gè)交換機(jī)松申,一個(gè)參數(shù)為交換機(jī)名稱云芦,第二個(gè)參數(shù)為模式
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂
        // 消息內(nèi)容
        String message = "hello rabbitMQ!";
        //發(fā)送消息
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("Send '" + message + "'");
        channel.close();
        connection.close();
    }
}

當(dāng)我們執(zhí)行發(fā)送消息之后,打開RabbitMQ的后臺(tái)管理界面贸桶,我們可以看到舅逸,交換機(jī)已經(jīng)存在了,但是這時(shí)候皇筛,大家思考一個(gè)問(wèn)題琉历,消息內(nèi)容跑到哪里去了呢旷痕?


image.png

這是因?yàn)榻粨Q機(jī)沒(méi)有存儲(chǔ)消息的能力,在rabbitmq中只有隊(duì)列存儲(chǔ)消息的能力.因?yàn)檫@時(shí)還沒(méi)有隊(duì)列,所以就會(huì)丟失;
所以,消息發(fā)送到了一個(gè)沒(méi)有綁定隊(duì)列的交換機(jī)時(shí),消息就會(huì)丟失!

3蹂析、消費(fèi)者1

聲明隊(duì)列為hrabbit_queue_fanout_phone,將隊(duì)列也綁定到交換機(jī)hrabbit_exchange_fanout,代碼如下:

package com.hrabbit.rabbitmq.publish.recove;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-28 下午2:41
 * @Description:
 */
public class Recover {

    private final static String QUEUE_NAME = "hrabbit_queue_fanout_phone";
    private final static String EXCHANGE_NAME = "hrabbit_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        //------------下面邏輯和work模式一樣-----
        // 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息到達(dá) 觸發(fā)這個(gè)方法
                String msg = new String(body, "utf-8");
                System.out.println("消費(fèi)者1號(hào):" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("消費(fèi)者1號(hào)執(zhí)行完畢翔始!");
                    // 手動(dòng)回執(zhí)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}

4斩箫、消費(fèi)者2

聲明隊(duì)列為hrabbit_queue_fanout_email,并且將隊(duì)列也綁定到交換機(jī)hrabbit_exchange_fanout,代碼如下:

package com.hrabbit.rabbitmq.publish.recove;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-28 下午2:41
 * @Description:
 */
public class Recove2 {


    private final static String QUEUE_NAME = "hrabbit_queue_fanout_email";
    private final static String EXCHANGE_NAME = "hrabbit_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        //------------下面邏輯和work模式一樣-----
        // 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息到達(dá) 觸發(fā)這個(gè)方法
                String msg = new String(body, "utf-8");
                System.out.println("消費(fèi)者2號(hào):" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("消費(fèi)者2號(hào)執(zhí)行完畢!");
                    // 手動(dòng)回執(zhí)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }

}

5. 測(cè)試結(jié)果

消費(fèi)者1輸出:

image.png

消費(fèi)者2輸出:
image.png

從上圖中我們可以看到换团,消費(fèi)者1和消費(fèi)者2都可以接收到生產(chǎn)者發(fā)送過(guò)來(lái)的消息悉稠。
而從這張圖中我們可以發(fā)現(xiàn)hrabbit_queue_fanout_emailhrabbit_queue_fanout_phone都綁定到這個(gè)交換機(jī)上了,因此都可以同時(shí)接收到消息艘包。
image.png

系列文章:

RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:路由Routing
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
RabbitMQ:spring整合RabbitMQ

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末的猛,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子想虎,更是在濱河造成了極大的恐慌卦尊,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件舌厨,死亡現(xiàn)場(chǎng)離奇詭異岂却,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)裙椭,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門躏哩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人揉燃,你說(shuō)我怎么就攤上這事扫尺。” “怎么了炊汤?”我有些...
    開封第一講書人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵正驻,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我抢腐,道長(zhǎng)姑曙,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任迈倍,我火速辦了婚禮伤靠,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘啼染。我一直安慰自己醋界,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開白布提完。 她就那樣靜靜地躺著,像睡著了一般丘侠。 火紅的嫁衣襯著肌膚如雪徒欣。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,696評(píng)論 1 312
  • 那天蜗字,我揣著相機(jī)與錄音打肝,去河邊找鬼脂新。 笑死,一個(gè)胖子當(dāng)著我的面吹牛粗梭,可吹牛的內(nèi)容都是我干的争便。 我是一名探鬼主播,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼断医,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼滞乙!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起鉴嗤,我...
    開封第一講書人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤斩启,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后醉锅,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體兔簇,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年硬耍,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了垄琐。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡经柴,死狀恐怖狸窘,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情口锭,我是刑警寧澤朦前,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站鹃操,受9級(jí)特大地震影響韭寸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜荆隘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一恩伺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧椰拒,春花似錦晶渠、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至缆毁,卻和暖如春番川,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工颁督, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留践啄,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓沉御,卻偏偏與公主長(zhǎng)得像屿讽,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子吠裆,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容

  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器伐谈。支持消息的持久化、事務(wù)硫痰、擁塞控...
    jiangmo閱讀 10,369評(píng)論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理衩婚,服務(wù)發(fā)現(xiàn),斷路器效斑,智...
    卡卡羅2017閱讀 134,716評(píng)論 18 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,943評(píng)論 2 11
  • 轉(zhuǎn)載2017年11月01日 09:54:03 2595 RabbitMQ 即一個(gè)消息隊(duì)列非春,主要是用來(lái)實(shí)現(xiàn)應(yīng)用程序的...
    楊傳池chris閱讀 6,352評(píng)論 1 0
  • RabbitMQ 即一個(gè)消息隊(duì)列,主要是用來(lái)實(shí)現(xiàn)應(yīng)用程序的異步和解耦缓屠,同時(shí)也能起到消息緩沖奇昙,消息分發(fā)的作用。 消息...
    彩虹之夢(mèng)閱讀 1,088評(píng)論 2 1