RabbitMQ進(jìn)階(一)--備份交換器\死信\延時隊列

上一節(jié)講述了RabbitMQ的基礎(chǔ)Api和參數(shù)詳解.本節(jié)將深入學(xué)習(xí)使用一些RabbitMQ的高階用法.比如死信隊列\(zhòng)延時隊列等


1. mandatory參數(shù)和備份交換器

  • mandatory參數(shù)
    在聲明交換器的exchangeDeclare方法中,有一個mandatory參數(shù),當(dāng)設(shè)置為true時,交換器無法根據(jù)自身類型和RoutingKey找到隊列的時候,RabbitMQ會調(diào)用Basic.Return命令將消息返回給生產(chǎn)者.生產(chǎn)者可以在channel中添加一個ReturnListener監(jiān)聽器回調(diào)服務(wù)器返回的消息.代碼如下:
public void mandatoryTest() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("mandatory_exchange", BuiltinExchangeType.DIRECT);
        channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
            System.out.println(replyCode + "\t" + replyText);
            System.out.println(new String(body));
        });
        channel.basicPublish("mandatory_exchange", "", true, MessageProperties.TEXT_PLAIN, "mandatory message".getBytes());
        RabbitMQUtils.close(channel, connection);
    }

控制臺運(yùn)行結(jié)果如下:


  • 備份交換器
    備份交換器(Alternate Exchange),可以通過聲明備份交換器的方式處理未被路由的消息,從而避免消息丟失.
    在聲明交換器時,添加alternate-exchange參數(shù),添加交換器的備份交換器.也可以通過策略Policy,同時使用時,前者的優(yōu)先級更高.
    代碼如下:
public void alternateExchangeTest() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        // 聲明一個備份交換器, 類型通常為fanout的.從而使任何沒有去處的消息都發(fā)送到備份交換器綁定的隊列中.
        channel.exchangeDeclare("alternate_exchange_test", BuiltinExchangeType.FANOUT, true, false, null);
        // 聲明交換器并添加備份交換器參數(shù)
        Map<String, Object> args = new HashMap<>();
        args.put("alternate-exchange", "alternate_exchange_test");
        channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT, true, false, args);
        
        channel.queueDeclare("alternate_queue_test", true, false, false, null);
        channel.queueDeclare("normal_queue", true, false, false, null);
        channel.queueBind("normal_queue", "normal_exchange", "normal_routing");
        // 綁定備份交換器和對應(yīng)的隊列
        channel.queueBind("alternate_queue_test", "alternate_exchange_test", "alternate_routing");
        channel.basicPublish("normal_exchange", "wrong_routing",new AMQP.BasicProperties.Builder().build(), "testMessage".getBytes());
        RabbitMQUtils.close(channel, connection);
    }

消息發(fā)送后,由于沒有對應(yīng)的路由鍵,因此消息將會發(fā)送到備份交換器,通過備份交換器發(fā)送到綁定的隊列中.可以在管理頁面看到:



對于備份交換器有以下幾點:

  1. 如果設(shè)置的備份交換器不存在,則消息丟失,沒有異常
  2. 如果備份交換器沒有綁定任何隊列,則消息丟失,沒有異常
  3. 如果備份交換器沒有匹配的RoutingKey的隊列,則消息丟失,沒有異常
  4. 如果備份交換器和mandatory參數(shù)同事使用,則mandatory參數(shù)不生效

2.過期時間(TTL)

RabbitMQ可以對消息和隊列設(shè)置過期時間

  • 對消息設(shè)置過期時間
  1. 通過隊列設(shè)置消息的過期時間.
    在聲明隊列時,加入x-message-ttl參數(shù)實現(xiàn)的,參數(shù)單位是ms
    代碼實例如下:
 public void messageTTLByQueueTest() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000);
        channel.queueDeclare("ttl_queue", true, false, false, map);
        channel.queueBind("ttl_queue", RabbitMQUtils.TEST_EXCHANGE, "ttl_routing");
        channel.basicPublish(RabbitMQUtils.TEST_EXCHANGE, "ttl_routing", null, "ttl_message".getBytes());
        RabbitMQUtils.close(channel, connection);

    }
  1. 通過消息屬性設(shè)置消息的過期時間
    在channel.basicPublish方法中加入expiration的屬性參數(shù),單位為ms.示例代碼如下:
public void messageTTLByMessageTest() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish(RabbitMQUtils.TEST_EXCHANGE, RabbitMQUtils.TEST_ROUTING,
                new AMQP.BasicProperties().builder().expiration("30000").build(), "message_ttl_test".getBytes());
        RabbitMQUtils.close(channel, connection);
    }

對于第一種隊列設(shè)置TTL屬性的方法,一旦消息過期,就會從隊列中抹去.而在第二種方法中,即使消息過期,也不會馬上從隊列中抹去.因為每條消息是否過期是在即將投遞前判定的.而第二種方法中消息過期時間不同.

  • 設(shè)置隊列的ttl
    聲明隊列時加入x-expires參數(shù).如下:
public void queueTTLTest() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        Map<String, Object> args = new HashMap<>();
        args.put("x-expires", 60000);
        channel.queueDeclare("ttl_queue_test", true, false, false, args);
        RabbitMQUtils.close(channel, connection);
    }

隊列到期后會自動被刪除


3. 死信隊列

DLX: Dead-Letter-Exchange 死信交換器.當(dāng)消息在一個隊列中變成死信之后,能夠被重新投遞到另一個交換器中.這個交換器就是DLX.消息變成死信一般由于以下幾種情況:

  • 消息被拒絕 basic.reject basic.nack 并且設(shè)置requeue為false
  • 消息過期
  • 隊列達(dá)到最大長度
    代碼示例如下:
public void DLXTest() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        // 聲明一個作為死信隊列的路由和隊列并綁定
        channel.exchangeDeclare("dlx_test", BuiltinExchangeType.DIRECT);
        channel.queueDeclare("dlx_queue", true, false, false, null);
        channel.queueBind("dlx_queue", "dlx_test", "dlx_routing");
        // 聲明一個隊列,設(shè)置死信交換器和消息過期時間
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        args.put("x-dead-letter-exchange", "dlx_test");
        args.put("x-dead-letter-exchange-routing-key", "dlx_routing");
        channel.queueDeclare("normal_queue", true, false, false, args);

        channel.queueBind("normal_queue", RabbitMQUtils.TEST_EXCHANGE, "dlx_routing");

        channel.basicPublish(RabbitMQUtils.TEST_EXCHANGE, "dlx_routing", null, "dead letter queue test".getBytes());

        RabbitMQUtils.close(channel, connection);
}

4.延遲隊列

上面的死信隊列+過期時間配合使用就實現(xiàn)了一個延遲隊列.
消息到達(dá)隊列后,一段時間沒有被消費(fèi),就進(jìn)入到死信隊列.

5. 優(yōu)先級隊列

可以通過隊列的x-max-priority屬性設(shè)置隊列的最大優(yōu)先級.
發(fā)布消息的時候設(shè)置priority屬性設(shè)置消息的優(yōu)先級.
如果節(jié)點中消息有堆積.則高優(yōu)先級的消息會被優(yōu)先消費(fèi).

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末篷扩,一起剝皮案震驚了整個濱河市拧烦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖凤壁,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異荷科,居然都是意外死亡笔链,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進(jìn)店門趣惠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狸棍,“玉大人,你說我怎么就攤上這事味悄〔莞辏” “怎么了?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵侍瑟,是天一觀的道長唐片。 經(jīng)常有香客問我丙猬,道長,這世上最難降的妖魔是什么费韭? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任茧球,我火速辦了婚禮,結(jié)果婚禮上星持,老公的妹妹穿的比我還像新娘抢埋。我一直安慰自己,他們只是感情好督暂,可當(dāng)我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布揪垄。 她就那樣靜靜地躺著,像睡著了一般逻翁。 火紅的嫁衣襯著肌膚如雪饥努。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天八回,我揣著相機(jī)與錄音肪凛,去河邊找鬼。 笑死辽社,一個胖子當(dāng)著我的面吹牛伟墙,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播滴铅,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼戳葵,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了汉匙?” 一聲冷哼從身側(cè)響起拱烁,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎噩翠,沒想到半個月后戏自,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡伤锚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年擅笔,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片屯援。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡猛们,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出狞洋,到底是詐尸還是另有隱情弯淘,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布吉懊,位于F島的核電站庐橙,受9級特大地震影響假勿,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜态鳖,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一废登、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧郁惜,春花似錦、人聲如沸甲锡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽缤沦。三九已至虎韵,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缸废,已是汗流浹背包蓝。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留企量,地道東北人测萎。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像届巩,于是被迫代替她去往敵國和親硅瞧。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容