[用官方文檔學(xué)習(xí)RabbitMQ]——3.RabbitMQ的發(fā)布訂閱模式——Publish/Subscribe

繼續(xù)翻譯第一次嘗試進行這樣模式的學(xué)習(xí)系洛,感覺好難進行螃征,不過還是要堅持住搪桂!

簡介

在之前的教程中,我們創(chuàng)建了一個工作隊列盯滚,工作隊列使用情況的假設(shè)是:每個人物都交付給一個Worker踢械,也就是消費者。在這部分中魄藕,我們將做一些完全不同的事情——我們將向多個消費者傳遞消息内列。這樣的模式被稱為“發(fā)布/訂閱"模式,檢查P/S模式背率。
為了說明這個模式话瞧,我們將會構(gòu)建一個簡單的日志記錄系統(tǒng)嫩与。它將由兩個程序組成:1.第一個程序發(fā)送日志消息。2.第二個程序?qū)⒔邮艽蛴∵@些日志交排。
在我們的日志系統(tǒng)中划滋,有接收功能的程序都將得到消息。所以埃篓,我們就可以運行一個接收器处坪,將這些日志引導(dǎo)到磁盤。同時我們再運行另一個接收器架专,功能是讓我們在屏幕上看到日志同窘。
本質(zhì)是:生產(chǎn)者發(fā)送的消息,會被傳播到所有消費者那里去部脚。

Exchange——交換器

在之前的教程之中塞椎,我們只是通過隊列發(fā)送和接受消息。接下來我們需要了解一下完整的消息傳遞模型睛低。
讓我們快速的回顧一下前面教程中介紹的內(nèi)容:

  • 生產(chǎn)者是一個應(yīng)用程序,它的任務(wù)是發(fā)送消息
  • 隊列是存儲消息的緩沖區(qū)
  • 消費者是一個應(yīng)用程序服傍,它的任務(wù)是接收消息

完整的RabbitMQ消息傳遞模型的核心思想是——生產(chǎn)者不會直接向隊列發(fā)送任何消息钱雷!甚至消費者都不知道消息是否會被傳遞到哪些隊列。
那么這些消息發(fā)送給誰了呢吹零?
生產(chǎn)者只能將消息發(fā)送到Exchange罩抗,也就是交換器里面,交換是一個很簡單的事情灿椅。一方面套蒂,它接受來自生產(chǎn)者的消息,另一方面則把消息推送到隊列里面茫蛹。交換器必須知道如何處理它收到的消息——它是否應(yīng)該推送到特定的隊列中操刀?它是否應(yīng)該被推送到N個隊列里面?獲取它應(yīng)該被拋棄婴洼?
答:這個規(guī)則是由交換類型定義的骨坑。

Exchange交換器

有一些可用的交換類型:directtopic柬采、headersfanout欢唾。我們這次主要關(guān)注點在fanout上,我們將會創(chuàng)建這種類型的交換粉捻,并調(diào)用它的日志礁遣。

channel.exchangeDeclare("logs","fanout");

fanout類型很簡單,它會將接收到的所有消息肩刃,傳播到它知道的所有隊列中去祟霍。對于我們的系統(tǒng)來說杏头,這正是我們需要的。
簡單說一下這幾個交換類型

  • direct : 所有發(fā)送到direct類型的交換器中的消息都會被轉(zhuǎn)發(fā)到”RouteKey“中指定的隊列浅碾。

  • fanout : 所有發(fā)送到fanout類型的交換器中的消息都會被轉(zhuǎn)發(fā)到所有與該交換器綁定的隊列大州。

  • topic : 所有發(fā)送到topic類型的交換器中的消息都會被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定的話題的隊列。

  • header : 這個用的比較少垂谢,忽略了RouteKey的路由方式厦画,使用Headers來匹配。Headers是個鍵值對滥朱。

(我自己也是在學(xué)習(xí)中根暑,不是很熟悉,以后我研究研究徙邻,明白了單寫一個番外)

交換器列表
我們可以通過rabbitmqctl語句列出我們可以運行的可用的交換器列表:

rabbitmqctl list_exchanges
listing Exchanges

出來這些東西排嫌,莫方!很多帶著amq做開頭的交換器和沒有命名的交換器缰犁,這些都是默認(rèn)創(chuàng)建的淳地,而且我們目前用不上他們。

沒有名字的交換器
教程的前面幾個部分里面帅容,我們對交換器一無所知颇象,但是仍然能夠?qū)⑾l(fā)送到隊列里面去。這是為啥并徘?
我們使用的是默認(rèn)的交換器遣钳,我們用空字符串("")去識別它。
回憶一下前面我們是如何發(fā)送消息的:

channel.basicPublish("","hello",null,message.getBytes());

第一個參數(shù)是空字符串麦乞,這個就是我們使用的交換器的名稱蕴茴。空字符串表示默認(rèn)或者匿名的交換器姐直。如果消息存在倦淀,那么則使用RoutingKey指定的名稱將消息放到隊列中去。

現(xiàn)在声畏,我們可以發(fā)布到我們自己命名的交換器啦:

channel.basicPublish("logs","",null,message.getBytes());

臨時隊列——Temporyary Queues

你可能記得我們使用過有指定名稱的隊列(還記得"hello"和task_queue"嗎晃听?)。對于我們來說砰识,能夠給隊列命名能扒,是至關(guān)重要的,因為我們需要把Worker(消費者)指向相同的隊列辫狼。當(dāng)我們想要在消費者和生產(chǎn)者之間共享隊列的時候初斑,給隊列命名就會尤為重要。(我也不知道為啥官方把這句話說了兩遍膨处,可能很重要吧ㄟ( ▔, ▔ )ㄏ...)
但是<印I笆!對于我們的Log來說鹃答,情況就不一樣啦乎澄。我們希望拿到所有關(guān)于日志的消息,而不只是它們中的一部分测摔。我們也只對當(dāng)前流動的消息感興趣置济,而不是舊消息。想要解決這個問題锋八,我們需要理清兩件事:
首先浙于,每當(dāng)我們連接到RabbitMQ,我們都需要一個新的挟纱,空的隊列羞酗。要做到這一點,我們可以創(chuàng)建一個帶有隨機名稱的隊列紊服,或者檀轨,我們選擇更好的方式——讓服務(wù)器為我們選擇一個隨機的隊列名稱。其次欺嗤,一旦我們斷開了消費者的連接裤园,應(yīng)該自動刪除隊列。在Java客戶端剂府,當(dāng)我們沒有向queueDeclare()提供參數(shù)的時候,我們會創(chuàng)建一個非持久的剃盾,獨占的腺占,自動刪除的隊列,并會生成一個名稱痒谴。

String queueName = channel.queueDeclare().getQueue();

此時衰伯,queueName包含一個隨機隊列名稱。比如积蔚,他可能看起來像amq.gen-jzty20brgko-hjmuj0wlg (總之是亂七八糟的)

綁定——Bindings

Bindings

我們已經(jīng)創(chuàng)建了一個fanout類型的交換器意鲸,和一個隊列,現(xiàn)在我們需要告訴交換器尽爆,讓他將消息發(fā)送到我們的隊列中區(qū)怎顾。交換器和隊列之間的關(guān)系叫做綁定。

channel.queueBind(queueName,"logs","")

從現(xiàn)在開始漱贱,我們的log交換器將會向我們隊列添加消息了槐雾。

完整示例

這次我沒有像之前,先貼例子幅狮。這次我選擇先寫小部分募强,最后寫完整的株灸,這樣就和官方基本上一模一樣了,會在某些地方加上自己的理解擎值。還有注釋慌烧!我還是會寫的很完整的!


p/s

發(fā)布日志消息的生產(chǎn)者程序與前一期沒啥太大的不同鸠儿,不過有些變化比較重要屹蚊。我們現(xiàn)在想要將消息發(fā)布到日志交換器中,而不是無名的交換器捆交。我們需要在發(fā)送的時候提供一個路由鍵(RoutingKey),但是我們忽略了它的value淑翼,因為我們的交換類型是fanout。

EmitLog.java

public class EmitLog {
    //設(shè)置交換器的名字
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException {

        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明交換器,給它名字,設(shè)置交換類型為fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //待傳遞的消息內(nèi)容
        String message = getMessage(args);
        //傳消息
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        System.out.println("[x] Sent '"+message+"'");
        //關(guān)閉連接通道
        channel.close();
        connection.close();
    }

    private static String getMessage(String[] strings) {
        if (strings.length<1){
            return "hello world";
        }
        return joinStrings(strings," ");
    }

    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0){
            return "";
        }
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1;i < length; i++){
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

(我貼心的把官方?jīng)]給你們的工具函數(shù)也寫上了品追!快夸我(☆▽☆))
正如你們看到的玄括,建立連接之后,我們聲明了交換器肉瓦,這個步驟是必要的遭京,因為把消息發(fā)布到一個不存在的交換器上是禁止的!
如果沒有隊列綁定到交換中泞莉,消息將消失哪雕,但是對于我們是允許的,因為如果沒有消費者在收集消息鲫趁,我們可以安全的丟棄信息斯嚎。
ReceiveLogs.java

public class ReceiveLogs {
    //設(shè)置交換器的名字
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException {
        //獲取連接
        Connection connection = ConnectionUtil.getConnection();
        //創(chuàng)建通道
        Channel channel = connection.createChannel();
        //聲明交換器,給它名字,設(shè)置交換類型為fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //得到隊列的名字
        String queueName = channel.queueDeclare().getQueue();
        //隊列和交換器進行綁定
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("[*] Waiting for message.To exit press CTRL+C");
        //接收
        Consumer consumer =new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"UTF-8");
                System.out.println("[x]Receive '"+ message+"'");
            }
        };
        boolean autoAck = true;
        channel.basicConsume(queueName,autoAck,consumer);
    }
}

這里可以自己試試,打開兩個或者三個ReceiveLogs挨厚,然后嘗試看看發(fā)送一條信息堡僻,會發(fā)生什么?

結(jié)果

本來想你們自己嘗試來著疫剃,不過我還是回來把結(jié)果圖貼上钉疫。具體怎么做請看第二期工作隊列模式里面教的方式。
生產(chǎn)者:

生產(chǎn)者

打開的三個消費者:

生產(chǎn)者1
生產(chǎn)者2

第三個反正長得一樣就不貼了巢价,╭(╯^╰)╮

試驗成功牲阁,一個發(fā)送,三個同時接受成功壤躲!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末城菊,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子碉克,更是在濱河造成了極大的恐慌役电,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件棉胀,死亡現(xiàn)場離奇詭異法瑟,居然都是意外死亡冀膝,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門霎挟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來窝剖,“玉大人,你說我怎么就攤上這事酥夭〈蜕矗” “怎么了?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵熬北,是天一觀的道長疙描。 經(jīng)常有香客問我,道長讶隐,這世上最難降的妖魔是什么起胰? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮巫延,結(jié)果婚禮上效五,老公的妹妹穿的比我還像新娘。我一直安慰自己炉峰,他們只是感情好畏妖,可當(dāng)我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著疼阔,像睡著了一般戒劫。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上婆廊,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天迅细,我揣著相機與錄音,去河邊找鬼否彩。 笑死,一個胖子當(dāng)著我的面吹牛嗦随,可吹牛的內(nèi)容都是我干的列荔。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼枚尼,長吁一口氣:“原來是場噩夢啊……” “哼贴浙!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起署恍,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤崎溃,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后盯质,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體袁串,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡概而,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了囱修。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赎瑰。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖破镰,靈堂內(nèi)的尸體忽然破棺而出餐曼,到底是詐尸還是另有隱情,我是刑警寧澤鲜漩,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布源譬,位于F島的核電站,受9級特大地震影響孕似,放射性物質(zhì)發(fā)生泄漏踩娘。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一鳞青、第九天 我趴在偏房一處隱蔽的房頂上張望霸饲。 院中可真熱鬧,春花似錦臂拓、人聲如沸厚脉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽傻工。三九已至,卻和暖如春孵滞,著一層夾襖步出監(jiān)牢的瞬間中捆,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工坊饶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留泄伪,地道東北人。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓匿级,卻偏偏與公主長得像蟋滴,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子痘绎,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理津函,服務(wù)發(fā)現(xiàn),斷路器孤页,智...
    卡卡羅2017閱讀 134,637評論 18 139
  • 在前面的教程里尔苦,我們構(gòu)建了一個簡單的日志記錄系統(tǒng)。我們已經(jīng)能夠向許多消費者傳送日志消息啦。在本期允坚,我們將會做一些修...
    AceCream佳閱讀 533評論 0 2
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器魂那。支持消息的持久化、事務(wù)屋讶、擁塞控...
    jiangmo閱讀 10,353評論 2 34
  • RabbitMQ筆記 本文參考資料:http://blog.csdn.net/chwshuang/article/...
    wangxiaoda閱讀 2,819評論 0 11
  • RabbitMQ 原理介紹及安裝部署 標(biāo)簽:RabbitMQ 安裝 簡介 RabbitMQ 是一個用 Erlang...
    神仙CGod閱讀 8,564評論 0 60