RabbitMQ(五) - 路由(Routing)

路由(routing)

上一個(gè)教程中,我們實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的日志系統(tǒng)买喧。我們將日志消息廣播到很多個(gè)消費(fèi)者。

在這個(gè)教程我們將給它加一個(gè)特性 —— 我們將使它可以只訂閱消息的一個(gè)子集。例如:我們直接將致命的錯(cuò)誤信息打印到日志(保存在磁盤中)旨巷,同時(shí)能將所有的日志信息打印在控制臺(tái)上。

綁定(Bindings)

在前面的教程中我們已經(jīng)用到了綁定添忘,可以重新調(diào)用那段代碼:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個(gè)exchange和queue之間的綁定關(guān)系采呐。換句話說(shuō):這個(gè)queue對(duì)exchange中的消息感興趣。

綁定需要增加一個(gè)額外的參數(shù)routingKey搁骑。為了避免和推送消息中basic_publish的參數(shù)名造成混亂斧吐,消費(fèi)者中我們叫它binding key。下面展示創(chuàng)建一個(gè)binding key:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

binding key的關(guān)鍵在于依賴的exchange類型仲器。類型為fanout的exchange煤率,上個(gè)教程用到的,忽略了它的值乏冀。

直接交換(direct exchange)

上個(gè)教程我們的日志系統(tǒng)直接將所有的信息廣播給所有的消費(fèi)者蝶糯。我們想在這個(gè)的基礎(chǔ)上根據(jù)消息的嚴(yán)重程度進(jìn)行過(guò)濾。例如:我們想有一個(gè)程序用來(lái)將重要的錯(cuò)誤信息存儲(chǔ)到磁盤當(dāng)中辆沦,而不像浪費(fèi)空間去存儲(chǔ)warning或者是info級(jí)別的日志消息昼捍。

我們使用的fanout類型的exchange,并不能給我們提供這樣的靈活性 —— 它只能盲目的進(jìn)行廣播肢扯。

我們將用direct類型的exchange來(lái)代替它妒茬。direct exchange背后的路由算法很簡(jiǎn)單 —— 生產(chǎn)者的routing key 完全匹配消費(fèi)者中的binding key

為了說(shuō)明這些蔚晨,請(qǐng)看下圖中的配置:

rabbit-direct

在上圖的配置中乍钻,我們看到有兩個(gè)隊(duì)列綁定了類型為direct的exchange X。第一個(gè)隊(duì)列綁定了的key為orange,第二個(gè)隊(duì)列有兩個(gè)綁定团赁,一個(gè)綁定的key為black育拨,另一個(gè)是green

在這樣的一個(gè)配置中欢摄,推送到exchange的消息熬丧,routing key為orange的消息將路由到隊(duì)列1(Q1)中,routing key為black或者為green的消息將路由到隊(duì)列2(Q2)中怀挠。其他的消息將被丟棄析蝴。

多個(gè)綁定(multiple bindings)

多個(gè)隊(duì)列綁定相同的key是完全允許的。在我們的例子當(dāng)中绿淋,我們可以在XQ1之間添加一個(gè)綁定key為black的關(guān)系闷畸。這樣,類型為direct的exchange就類似于fanout了吞滞,能將消息廣播到所有匹配的隊(duì)列中佑菩。當(dāng)消息的routing key為black時(shí),將分發(fā)到Q1Q2中裁赠。

發(fā)送日志(emitting logs)

我們將在我們的日志系統(tǒng)中使用這個(gè)模式殿漠。將消息發(fā)送到類型為direct的exchange中,而不是fanout類型的exchange佩捞。這樣接收程序就能選擇重要的消息接收了绞幌。
像以前一樣,我們需要?jiǎng)?chuàng)建一個(gè)exchange先:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后發(fā)送一個(gè)消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

簡(jiǎn)單起見一忱,我們將嚴(yán)重的級(jí)別的定義為:info莲蜘、warning、error帘营。

訂閱(subscribing)

接收消息將和前面的教程差不多票渠,但有一點(diǎn)除外 —— 我們將給我們所有感興趣的每種嚴(yán)重的消息創(chuàng)建綁定關(guān)系。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

信息匯總

rabbitmq-routing

官網(wǎng)的使用命令行執(zhí)行的仪吧。這里我們將一次性的向exchange發(fā)送6條消息庄新,info、warn薯鼠、error三個(gè)級(jí)別各兩條择诈。如上圖,我們創(chuàng)建兩個(gè)消費(fèi)者出皇,這里我們創(chuàng)建兩個(gè)類羞芍,一個(gè)接收routing key為error的消息,并將其打印到文件中郊艘;另一個(gè)接收所有消息并打印到控制臺(tái)荷科。

EmitLogDirect.java

package com.roachfu.tutorial.rabbitmq.website.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct.log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        System.out.println(" [*] begin sent message to exchange");
        /* 分別發(fā)送兩條 info,warn,error基本的消息 */
        channel.basicPublish(EXCHANGE_NAME, "error", null, "[error] - this is first error message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "warn", null, "[warn ] - this is first warn message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "info", null, "[info ] - this is first info message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "error", null, "[error] - this is second error message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "warn", null, "[warn ] - this is second warn message.".getBytes("UTF-8"));
        channel.basicPublish(EXCHANGE_NAME, "info", null, "[info ] - this is second info message.".getBytes("UTF-8"));
        System.out.println( " [x] done. . . ");

        channel.close();
        connection.close();
    }
}

ReceiveLogDirectToConsole

package com.roachfu.tutorial.rabbitmq.website.direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogDirectToConsole {

    private static final String EXCHANGE_NAME = "direct.log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warn");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] Waiting for message and handle it to console . . . ");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] receive " + envelope.getRoutingKey() + " : '" + message + "'");
            }
        };

        channel.basicConsume(queueName,consumer);
    }
}

ReceiveLogDirectToFile

package com.roachfu.tutorial.rabbitmq.website.direct;

import com.rabbitmq.client.*;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveLogDirectToFile {

    private static final String EXCHANGE_NAME = "direct.log";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println(" [*] Waiting for message and handle it to file . . . ");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                File file = new File("/temp/direct.log");

                FileOutputStream out = new FileOutputStream(file, true);
                out.write(body);
                out.write(("\r\n").getBytes());
                out.flush();
                out.close();
            }
        };

        channel.basicConsume(queueName,consumer);
    }
}

我們先運(yùn)行兩個(gè)消費(fèi)者唯咬,然后運(yùn)行生產(chǎn)者∥方看輸出結(jié)果:

ReceiveLogDirectToConsole 消費(fèi)者

rabbitmq-direct-console

ReceiveLogDierctToFile 消費(fèi)者

rabbitmq-direct-file
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末胆胰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子刻获,更是在濱河造成了極大的恐慌蜀涨,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,183評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蝎毡,死亡現(xiàn)場(chǎng)離奇詭異厚柳,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)沐兵,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門别垮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人扎谎,你說(shuō)我怎么就攤上這事碳想。” “怎么了簿透?”我有些...
    開封第一講書人閱讀 168,766評(píng)論 0 361
  • 文/不壞的土叔 我叫張陵移袍,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我老充,道長(zhǎng),這世上最難降的妖魔是什么螟左? 我笑而不...
    開封第一講書人閱讀 59,854評(píng)論 1 299
  • 正文 為了忘掉前任啡浊,我火速辦了婚禮,結(jié)果婚禮上胶背,老公的妹妹穿的比我還像新娘巷嚣。我一直安慰自己,他們只是感情好钳吟,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,871評(píng)論 6 398
  • 文/花漫 我一把揭開白布廷粒。 她就那樣靜靜地躺著,像睡著了一般红且。 火紅的嫁衣襯著肌膚如雪坝茎。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,457評(píng)論 1 311
  • 那天暇番,我揣著相機(jī)與錄音嗤放,去河邊找鬼。 笑死壁酬,一個(gè)胖子當(dāng)著我的面吹牛次酌,可吹牛的內(nèi)容都是我干的恨课。 我是一名探鬼主播,決...
    沈念sama閱讀 40,999評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼岳服,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼剂公!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起吊宋,我...
    開封第一講書人閱讀 39,914評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤纲辽,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后贫母,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體文兑,經(jīng)...
    沈念sama閱讀 46,465評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡泵琳,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,543評(píng)論 3 342
  • 正文 我和宋清朗相戀三年闲勺,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片炫刷。...
    茶點(diǎn)故事閱讀 40,675評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡橘原,死狀恐怖籍铁,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情趾断,我是刑警寧澤拒名,帶...
    沈念sama閱讀 36,354評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站芋酌,受9級(jí)特大地震影響增显,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜脐帝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,029評(píng)論 3 335
  • 文/蒙蒙 一同云、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧堵腹,春花似錦炸站、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至腿堤,卻和暖如春阀坏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背释液。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工全释, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人误债。 一個(gè)月前我還...
    沈念sama閱讀 49,091評(píng)論 3 378
  • 正文 我出身青樓浸船,卻偏偏與公主長(zhǎng)得像妄迁,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子李命,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,685評(píng)論 2 360

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理登淘,服務(wù)發(fā)現(xiàn),斷路器封字,智...
    卡卡羅2017閱讀 134,707評(píng)論 18 139
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器黔州。支持消息的持久化、事務(wù)阔籽、擁塞控...
    jiangmo閱讀 10,367評(píng)論 2 34
  • 1 RabbitMQ安裝部署 這里是ErLang環(huán)境的下載地址http://www.erlang.org/down...
    Bobby0322閱讀 2,243評(píng)論 0 11
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,097評(píng)論 3 51
  • RabbitMQ 即一個(gè)消息隊(duì)列流妻,主要是用來(lái)實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖笆制,消息分發(fā)的作用绅这。 消息...
    彩虹之夢(mèng)閱讀 1,088評(píng)論 2 1