RabbitMQ:路由模式Routing

9824247-2f04485338ca6443.jpg

上一篇中我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng),我們可以把日志消息廣播給多個(gè)接受者荧关。

這篇中我們將來(lái)添加一個(gè)特性只接收部分消息。例如我只將一些錯(cuò)誤log存到文件中,把所有的log都打印到控制臺(tái)里岭辣。


WX20180630-192459@2x.png

1沦童、綁定(Bindings)

在上篇博文中偷遗,我們已經(jīng)創(chuàng)建了一個(gè)binding氏豌,代碼如下:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個(gè)binding就是exchange和Queue之間的一個(gè)關(guān)系般妙。可以簡(jiǎn)單的理解為:這個(gè)Queue對(duì)其相對(duì)于的exchange的消息之間建立了一個(gè)關(guān)系旺隙。

Binding可以使用一個(gè)已經(jīng)存在的routingKey參數(shù)伏社。為了避免和basic_publish參數(shù)混淆,我們稱之為binding key罕容。下邊就是我們?cè)趺从胟ey來(lái)創(chuàng)建一個(gè)binding:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的意義有時(shí)候取決于exchange的類型旅择。對(duì)于Fanout類型的exchange捺宗,會(huì)忽略binding key。

2、Direct類型的exchange

我們上篇博文中的日志系統(tǒng)會(huì)把所有的log消息廣播給所有的消費(fèi)者。我們想擴(kuò)展來(lái)根據(jù)他們的日志級(jí)別來(lái)過(guò)濾log消息蕊连。例如:我們只想把error級(jí)別的日志寫到磁盤文件中载庭,而其它級(jí)別的日志消息則過(guò)濾掉顽铸。

我們之前使用的fanout類型的exchange鬼譬,但這樣就不會(huì)有太多的靈活性。

在這里我們將要使用direct類型的exchange。Direct類型exchange的路由算法是很簡(jiǎn)單的:要想一個(gè)消息能到達(dá)這個(gè)隊(duì)列颤枪,需要binding key和routing key正好能匹配得上汗捡。

8B1B778212E1B8A1ABDCC136CAB20DB8.jpg

在這樣的結(jié)構(gòu)中扇住,我們可以看到direct類型的exchange X女阀,有兩個(gè)queue綁定到它。第一個(gè)queue是以orange為binding key綁定到exchange X上的,第二個(gè)queue是由兩個(gè)binding key(black和green)綁定到exchange X的晓淀。

在這樣的設(shè)置中所袁,一條消息被推送到exchange,如果使用的routing key是error懦窘,那么消息就會(huì)被路由到C1中前翎;如果使用的routing key是error或者info或者warning,那么該消息將會(huì)被路由到C2中畅涂。其它的消息都將會(huì)被丟棄掉港华。

3、多重綁定(Multiple bindings)

221512183874891.png

用同一個(gè)binding來(lái)把多個(gè)queue綁定到同一個(gè)exchange也是可行的午衰。例如在之前例子的基礎(chǔ)上立宜,在X和Q1之間添加binding key名字為black,這樣的話臊岸,這里的direct類型的exchange就和fanout類型的一樣了橙数,可以把消息推送給所有的queue。帶有routing key為black的消息將會(huì)被推送到Q1和Q2中帅戒。

4灯帮、發(fā)送日志(Emitting logs)

我們將會(huì)使用這種模型,不使用fanout類型的exchange逻住,而是使用direct類型的钟哥。我們使用日志級(jí)別做為routing key,接收端根據(jù)設(shè)置的日志級(jí)別做為binding key來(lái)接收消息瞎访。首先來(lái)看看發(fā)射日志:

如之前一樣腻贰,首先來(lái)創(chuàng)建一個(gè)exchange:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后準(zhǔn)備發(fā)送消息;

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

這里的”severity”可以是”info”、“warning”装诡、”error”等银受。

那么下面我們用代碼實(shí)現(xiàn)以下:

5践盼、生產(chǎn)者

package com.hrabbit.rabbitmq.routing.send;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-30 下午7:41
 * @Description:
 */
public class Send {

    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {

        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //聲明一個(gè)交換機(jī),一個(gè)參數(shù)為交換機(jī)名稱宾巍,第二個(gè)參數(shù)為模式
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息內(nèi)容
        String message = "id=1的商品刪除了";

        channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}

在上面的生產(chǎn)者我發(fā)送了一個(gè)info類型的內(nèi)容咕幻,此時(shí)應(yīng)該C2可以接受到這條消息。

6顶霞、消費(fèi)者1號(hào)

消費(fèi)者定義的routingKey 為error肄程。

package com.hrabbit.rabbitmq.routing.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-30 下午7:42
 * @Description:
 */
public class Recover {

    //隊(duì)列名稱
    private final static String QUEUE_NAME = "hrabbit_queue_direct_1";
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        //------------下面邏輯和work模式一樣-----
        // 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息到達(dá) 觸發(fā)這個(gè)方法
                String msg = new String(body, "utf-8");
                System.out.println("[error]:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("error消息執(zhí)行完畢!");
                    // 手動(dòng)回執(zhí)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }
}

6选浑、消費(fèi)者2號(hào)

消費(fèi)者定義的routingKey 為error蓝厌、infowarning古徒。

package com.hrabbit.rabbitmq.routing.recover;

import com.hrabbit.rabbitmq.utils.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: hrabbit
 * @Date: 2018-06-30 下午7:42
 * @Description:
 */
public class Recover2 {

    //隊(duì)列名稱
    private final static String QUEUE_NAME = "hrabbit_queue_direct_2";
    //交換機(jī)名稱
    private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 獲取到連接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        // 聲明隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 綁定隊(duì)列到交換機(jī)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        //------------下面邏輯和work模式一樣-----
        // 同一時(shí)刻服務(wù)器只會(huì)發(fā)一條消息給消費(fèi)者
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 消息到達(dá) 觸發(fā)這個(gè)方法
                String msg = new String(body, "utf-8");
                System.out.println("[info]:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("info消息執(zhí)行完畢拓提!");
                    // 手動(dòng)回執(zhí)
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);

    }
}

7.測(cè)試結(jié)果

在消費(fèi)者2中輸出了類型info的消息

image.png

總結(jié):
要記住生產(chǎn)者端的routing key,那么在消費(fèi)者端設(shè)置binding key和之前的routing key一樣隧膘,就可以用direct類型的exchange了代态,以此來(lái)獲取到自己需要的消息。

系列文章:

RabbitMQ:RabbitMQ-理論基礎(chǔ)
RabbitMQ:快速入門hello word
RabbitMQ:RabbitMQ:work queues 工作隊(duì)列(Round-robin/Fair dispatch)
RabbitMQ:RabbitMQ:消息應(yīng)答與消息持久化
RabbitMQ:發(fā)布/訂閱 Publish/Subscribe
RabbitMQ:Topic類型的exchange
RabbitMQ:RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
RabbitMQ:spring整合RabbitMQ

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末疹吃,一起剝皮案震驚了整個(gè)濱河市蹦疑,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌萨驶,老刑警劉巖歉摧,帶你破解...
    沈念sama閱讀 218,122評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異腔呜,居然都是意外死亡叁温,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門核畴,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)券盅,“玉大人,你說(shuō)我怎么就攤上這事膛檀。” “怎么了娘侍?”我有些...
    開封第一講書人閱讀 164,491評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵咖刃,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我憾筏,道長(zhǎng)嚎杨,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評(píng)論 1 293
  • 正文 為了忘掉前任氧腰,我火速辦了婚禮,結(jié)果婚禮上酣难,老公的妹妹穿的比我還像新娘孽文。我一直安慰自己,他們只是感情好真友,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,676評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著紧帕,像睡著了一般盔然。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上是嗜,一...
    開封第一講書人閱讀 51,541評(píng)論 1 305
  • 那天愈案,我揣著相機(jī)與錄音,去河邊找鬼鹅搪。 笑死站绪,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的丽柿。 我是一名探鬼主播恢准,決...
    沈念sama閱讀 40,292評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼航厚!你這毒婦竟也來(lái)了顷歌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤幔睬,失蹤者是張志新(化名)和其女友劉穎眯漩,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體麻顶,經(jīng)...
    沈念sama閱讀 45,655評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡赦抖,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,846評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了辅肾。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片队萤。...
    茶點(diǎn)故事閱讀 39,965評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖矫钓,靈堂內(nèi)的尸體忽然破棺而出要尔,到底是詐尸還是另有隱情,我是刑警寧澤新娜,帶...
    沈念sama閱讀 35,684評(píng)論 5 347
  • 正文 年R本政府宣布赵辕,位于F島的核電站,受9級(jí)特大地震影響概龄,放射性物質(zhì)發(fā)生泄漏还惠。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,295評(píng)論 3 329
  • 文/蒙蒙 一私杜、第九天 我趴在偏房一處隱蔽的房頂上張望蚕键。 院中可真熱鬧救欧,春花似錦、人聲如沸锣光。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嫉晶。三九已至骑疆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間替废,已是汗流浹背箍铭。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留椎镣,地道東北人诈火。 一個(gè)月前我還...
    沈念sama閱讀 48,126評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像状答,于是被迫代替她去往敵國(guó)和親冷守。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,914評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,912評(píng)論 2 11
  • 本文大綱 RabbitMQ 歷史 RabbitMQ 應(yīng)用場(chǎng)景 RabbitMQ 系統(tǒng)架構(gòu) RabbitMQ 基本概...
    Java_Explorer閱讀 16,366評(píng)論 1 40
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,096評(píng)論 3 51
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理惊科,服務(wù)發(fā)現(xiàn)拍摇,斷路器,智...
    卡卡羅2017閱讀 134,657評(píng)論 18 139
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器馆截。支持消息的持久化充活、事務(wù)、擁塞控...
    jiangmo閱讀 10,359評(píng)論 2 34