RabbitMQ系列-4.備份交換器侄旬、死信隊(duì)列、延遲隊(duì)列與優(yōu)先級(jí)隊(duì)列

概述

????本文主要介紹一下RabbitMQ中的備份交換器煌妈、死信隊(duì)列勾怒、延遲隊(duì)列以及優(yōu)先級(jí)隊(duì)列

備份交換器

mandatory參數(shù)

????在介紹備份交換器之前我們?cè)诨仡櫼幌耤hannel.basicPublish中的mandatory參數(shù),它有當(dāng)消息傳遞過(guò)程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者的功能:mandatory參數(shù)如果設(shè)為true声旺,當(dāng)交換器無(wú)法根據(jù)自身的類(lèi)型和路由鍵找到一個(gè)符合條件的隊(duì)列時(shí),那么RabbitMQ會(huì)調(diào)用Basic.Return命令將消息返回給生產(chǎn)者 段只。當(dāng)mandatory參數(shù)設(shè)置為false時(shí)腮猖,出現(xiàn)上述情形,則消息直接被丟棄赞枕。
????那么生產(chǎn)者如何知道發(fā)布的消息有沒(méi)有被正確的路由到合適的隊(duì)列呢澈缺?這時(shí)候可以通過(guò)調(diào)用channel.addReturnListener監(jiān)聽(tīng)器實(shí)現(xiàn):

channel.basicPublish(EXCHANGE_NAME, "", true, 
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     "mandatory test".getBytes()); 

channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode, String replyText, String exchange,               
                             String routingKey, AMQP.BasicProperties basicProperties,
                             byte[] body) throws IOException {
           String message = new String(body);
           System.out.println("Basic.Return 返回的結(jié)果是: "+ message);
    }
});

????上面代碼中生產(chǎn)者沒(méi)有成功地將消息路由到隊(duì)列,此時(shí)RabbitMQ會(huì)通過(guò)Basic.Return返回"mandatory test"這條消息炕婶,之后生產(chǎn)者客戶端通過(guò)ReturnListener監(jiān)昕到了這個(gè)事件姐赡,上面代碼的最后輸出應(yīng)該是:"Basic.Return 返回的結(jié)果是 : mandatory test"。
????備份交換器柠掂,英文名稱為:Altemate Exchange(簡(jiǎn)稱AE)项滑,生產(chǎn)者在發(fā)送消息的時(shí)候如果不設(shè)置mandatory參數(shù) ,那么消息在未被路由的情況下將會(huì)丟失 : 如果設(shè)置了mandatory參數(shù)涯贞,那么需要添加ReturnListener的編程邏輯枪狂,生產(chǎn)者的代碼將變得復(fù)雜。如果既不想復(fù)雜化生產(chǎn)者的編程邏輯宋渔,又不想消息丟失州疾,那么可以使用備份交換器,這樣可以將未被路由的消息存儲(chǔ)在RabbitMQ中皇拣,再在需要的時(shí)候去處理這些消息严蓖。

聲明備份交換器

????聲明備份交換器可以在調(diào)用channel.exchangeDeclare方法時(shí)添加alternate-exchange參數(shù)來(lái)實(shí)現(xiàn),也可以通過(guò)策略Policy的方式實(shí)現(xiàn)氧急。如果兩者同時(shí)使用颗胡,則前者的優(yōu)先級(jí)更高,會(huì)覆蓋掉Policy的設(shè)置吩坝。

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("alternate-exchange", "myAe");
//聲明交換器
channel.exchangeDeclare("normalExchange", "direct", true, false, args); 
channel.exchangeDeclare("myAe", "fanout", true, false, null); 
//聲明隊(duì)列
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("unroutedQueue", true, false, false, null);
//交換器與隊(duì)列綁定
channel.queueBind("normalQueue", "normalExchange",  "normalKey"); 
channel.queueBind( "unroutedQueue", "myAe", "" ) ;

????上面的代碼中聲明了兩個(gè)交換器:normalExchange和myAe杭措,分別綁定了normalQueue和uroutedQueue這兩個(gè)隊(duì)列,同時(shí)將myAe設(shè)置為normalExchange的備份交換器(注意myAe的交換器類(lèi)型為fanout)钾恢。
????如果此時(shí)發(fā)送一條消息到normalExchange上手素,當(dāng)路由鍵等于"normalKey"的時(shí)候鸳址,消息能正確路由到normalQueue這個(gè)隊(duì)列中。如果路由鍵設(shè)為其他值泉懦,比如"errorKey" 則消息不能被正確地路由到與normalExchange綁定的任何隊(duì)列上稿黍,此時(shí)就會(huì)發(fā)送給myAe,進(jìn)而發(fā)送到unroutedQueue這個(gè)隊(duì)列崩哩。

備份交換器

????備份交換器其實(shí)和普通的交換器沒(méi)有太大的區(qū)別巡球,為了方便使用,建議設(shè)置為fanout類(lèi)型邓嘹,如若想設(shè)置為direct或者topic的類(lèi)型也沒(méi)有什么不妥酣栈,個(gè)人建議使用fanout類(lèi)型:考慮這樣一種情況,如果備份交換器的類(lèi)型是direct汹押,并且有一個(gè)與其綁定的隊(duì)列矿筝,假設(shè)綁定的路由鍵是key1,當(dāng)某條攜帶路由鍵為key2的消息被轉(zhuǎn)發(fā)到這個(gè)備份交換器的時(shí)候棚贾,備份交換器沒(méi)有匹配到合適的隊(duì)列窖维,則消息仍丟失,這樣備份交換器只能存儲(chǔ)攜帶路由鍵key1的消息妙痹。
????需要注意的是铸史,消息被重新發(fā)送到備份交換器時(shí)的路由鍵和從生產(chǎn)者發(fā)出的路由鍵是一樣的
????對(duì)于備份交換器怯伊,總結(jié)了以下幾種特殊情況:

  • 如果設(shè)置的備份交換器不存在琳轿,客戶端和RabbitMQ服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失耿芹。
  • 如果備份交換器沒(méi)有綁定任何隊(duì)列利赋,客戶端和RabbitMQ 服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失猩系。
  • 如果備份交換器沒(méi)有任何匹配的隊(duì)列媚送,客戶端和RabbitMQ服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失寇甸。
  • 如果備份交換器和mandatory參數(shù)一起使用塘偎,那么mandatory參數(shù)無(wú)效。

設(shè)置消息過(guò)期時(shí)間(TTL:Time to Live)

????RabbitMQ支持對(duì)消息和隊(duì)列設(shè)置過(guò)期時(shí)間
????設(shè)置過(guò)期時(shí)間的方式可以在聲明隊(duì)列時(shí)給隊(duì)列設(shè)置過(guò)期時(shí)間屬性拿霉,如果一個(gè)隊(duì)列設(shè)置了過(guò)期時(shí)間吟秩,那么隊(duì)列中的所有消息都有相同的過(guò)期時(shí)間,也可以在發(fā)送消息時(shí)對(duì)消息本身進(jìn)行單獨(dú)設(shè)置
????如果我們同時(shí)設(shè)置了隊(duì)列過(guò)期時(shí)間和消息過(guò)期時(shí)間绽淘,那么消息最終的TTL取最小值涵防,消息在隊(duì)列中生存時(shí)間如果超過(guò)設(shè)置的TTL值就會(huì)變成"死信"(Dead Message),消費(fèi)者將無(wú)法再收到該條消息(不是絕對(duì)的沪铭,可以使用死信隊(duì)列)
????對(duì)于給隊(duì)列設(shè)置過(guò)期時(shí)間這種方式一旦消息過(guò)期壮池,就會(huì)從隊(duì)列中移除偏瓤,而給消息設(shè)置過(guò)期時(shí)間這種方式,即使消息過(guò)期椰憋,也不會(huì)馬上從隊(duì)列中移除厅克,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的。
????為什么這兩種方法處理的方式不一樣?因?yàn)殛?duì)列設(shè)置過(guò)期時(shí)間這種方式橙依,隊(duì)列中己過(guò)期的消息肯定在隊(duì)列頭部证舟,RabbitMQ只要定期從隊(duì)頭開(kāi)始掃描是否有過(guò)期的消息即可。而給消息設(shè)置過(guò)期時(shí)間窗骑,每條消息的過(guò)期時(shí)間不同女责,如果要?jiǎng)h除所有過(guò)期消息勢(shì)必要掃描整個(gè)隊(duì)列,所以不如等到此消息即將被消費(fèi)時(shí)再判定是否過(guò)期创译,如果過(guò)期再進(jìn)行刪除即可抵知。

  • 給隊(duì)列設(shè)置過(guò)期時(shí)間
    在隊(duì)列聲明channel.queueDeclare方法中加入x-message-ttl可以實(shí)現(xiàn)給隊(duì)列設(shè)置過(guò)期時(shí)間,參數(shù)的單位是毫秒昔榴。
Map<String, Object> argss = new HashMap<String, Object>(); 
argss.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);

?????也可以通過(guò)Policy的方式來(lái)設(shè)置TTL,示例如下 :

./rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues

?????如果不設(shè)置TTL則表示此消息不會(huì)過(guò)期
?????如果將TTL設(shè)置為0碘橘,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者互订,否則該消息會(huì)被立即丟棄。

  • 給消息單獨(dú)設(shè)置過(guò)期時(shí)間
    在發(fā)送消息channel.basicPublish方法中加入expiration的屬性參數(shù)痘拆,單位為毫秒仰禽。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
//持久化消息
builder.deliveryMode(2);
//設(shè)置消息過(guò)期時(shí)間為60000ms
builder.expiration("60000");
AMQP.BasicProperties properties = builder.build() ; 
channel.basicPublish(exchangeName, routingKey, mandatory, properties,"Hello RabbitMQ".getBytes());

死信隊(duì)列

????死信隊(duì)列全稱為Dead-Letter-Exchange(DLX),也可以稱為死信交換器或死信郵箱,當(dāng)消息在一個(gè)隊(duì)列中變成死信 (dead message) 之后纺蛆,它能被重新被發(fā)送到另一個(gè)交換器中吐葵,這個(gè)交換器就是DLX,綁定DLX的隊(duì)列就稱之為死信隊(duì)列桥氏。
????消息變成死信的情形:
????????-- 消息被拒絕温峭,并且設(shè)置requeque參數(shù)為false
????????-- 消息過(guò)期
????????-- 隊(duì)列長(zhǎng)度達(dá)到最大值
????DLX也是一個(gè)正常的交換器,和一般的交換器沒(méi)有區(qū)別字支,它能在任何的隊(duì)列上被指定 凤藏,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。當(dāng)這個(gè)隊(duì)列中存在死信時(shí)堕伪,RabbitMQ就會(huì)自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的DLX上去揖庄,進(jìn)而被路由到另一個(gè)隊(duì)列,即死信隊(duì)列欠雌√闵遥可以監(jiān)聽(tīng)這個(gè)隊(duì)列中的消息以進(jìn)行相應(yīng)的處理。
配置死信隊(duì)列

  • 方式一:通過(guò)在隊(duì)列聲明channel.queueDeclare方法中設(shè)置x-dead-letter-exchange參數(shù)可以為一個(gè)隊(duì)列添加一個(gè)死信隊(duì)列DLX
//先聲明一個(gè)死信隊(duì)列: dlx_exchange 
channel.exchangeDeclare("dlx_exchange", "direct"); 
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange ");
//為隊(duì)列myqueue添加死信隊(duì)列
channel.queueDeclare("myqueue", false, false, false, args); 

????RabbitMQ允許我們?yōu)樗佬抨?duì)列指定路由鍵富俄,如果沒(méi)有特殊指定禁炒,則使用原隊(duì)列的路由鍵而咆。

  • 方式二:當(dāng)然這里也可以通過(guò)Policy的方式設(shè)置:
./rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":" dlx_exchange"}' --apply-to queues

????對(duì)于 RabbitMQ 來(lái)說(shuō),死信隊(duì)列是一個(gè)非常有用的特性齐苛,消息不能被消費(fèi)者正確消費(fèi)(消費(fèi)者調(diào)用了Basic.Nack 或者 Basic.Reject) 而被置入死信隊(duì)列中的情況翘盖,后續(xù)分析程序可以通過(guò)消費(fèi)這個(gè)死信隊(duì)列中的內(nèi)容來(lái)分析當(dāng)時(shí)所遇到的異常情況,進(jìn)而可以改善和優(yōu)化系統(tǒng)凹蜂。

延遲隊(duì)列

????延遲隊(duì)列存儲(chǔ)的對(duì)象是對(duì)應(yīng)的延遲消息馍驯,所謂延遲消息就是指消息被發(fā)送后,并不想讓消費(fèi)者立即拿到消息玛痊,而是在等待特定時(shí)間后消費(fèi)者才可以拿到這個(gè)消息進(jìn)行消費(fèi)汰瘫。
????在AMQP協(xié)議中,或者RabbitMQ本身沒(méi)有直接支持延遲隊(duì)列的功能擂煞,但是可以通過(guò)前面介紹的DLX和TTL模擬出延遲隊(duì)列的功能:假設(shè)一個(gè)應(yīng)用中需要將每條消息都設(shè)置為10秒的延遲混弥,生產(chǎn)者通過(guò) exchange.normal這個(gè)交換器將發(fā)送的消息存儲(chǔ)在 queue.normal這個(gè)隊(duì)列中。消費(fèi)者訂閱的并非是 queue.normal 這個(gè)隊(duì)列对省,而是它的死信隊(duì)列queue.dlx 蝗拿。當(dāng) 消息從queue.normal 這個(gè)隊(duì)列中過(guò)期之后被存入queue.dlx這個(gè)隊(duì)列中,消費(fèi)者就恰巧消費(fèi)到了延遲10秒的這條消息蒿涎。

優(yōu)先級(jí)隊(duì)列

????優(yōu)先級(jí)隊(duì)列哀托,顧名思義,具有高優(yōu)先級(jí)的隊(duì)列具有高的優(yōu)先權(quán)劳秋,優(yōu)先級(jí)高的消息具備優(yōu)先被消費(fèi)的特權(quán)仓手。
????可以通過(guò)設(shè)置隊(duì)列的x-max-priority參數(shù)來(lái)實(shí)現(xiàn)

Map<String, Object> args = new HashMap<String, Object>() ;
args.put( "x-max-priority", 10) ; 
channel.queueDeclare("queue.priority", true, fa1se, false, args) ;

上面的代碼演示的是如何配置一個(gè)隊(duì)列的最大優(yōu)先級(jí)。在此之后玻淑,需要在發(fā)送時(shí)在消息中設(shè)置消息當(dāng)前的優(yōu)先級(jí):

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); 
builder.priority(5) ;
AMQP.BasicProperties properties = builder.build() ; 
channel.basicPublish("exchange_priority", "rk_priority", properties, "Hello RabbitMQ".getBytes());

????上面的代碼中設(shè)置消息的優(yōu)先級(jí)為5嗽冒。默認(rèn)最低為0,最高為隊(duì)列設(shè)置的最大優(yōu)先級(jí)补履。優(yōu)先級(jí)高的消息可以被優(yōu)先消費(fèi)添坊,這個(gè)也是有前提的 : 如果在消費(fèi)者的消費(fèi)速度大于生產(chǎn)者的速度,且 Broker中沒(méi)有消息堆積的情況下箫锤,對(duì)發(fā)送的消息設(shè)置優(yōu)先級(jí)也就沒(méi)有什么實(shí)際意義帅腌。因?yàn)樯a(chǎn)者剛發(fā)送完一條消息就被消費(fèi)者消費(fèi)了,那么就相當(dāng)于 Broker中至多只有一條消息麻汰,對(duì)于單條消息來(lái)說(shuō)優(yōu)先級(jí)是沒(méi)有什么意義的速客。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市五鲫,隨后出現(xiàn)的幾起案子溺职,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,265評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件浪耘,死亡現(xiàn)場(chǎng)離奇詭異乱灵,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)七冲,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén)痛倚,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人澜躺,你說(shuō)我怎么就攤上這事蝉稳。” “怎么了掘鄙?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,852評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵耘戚,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我操漠,道長(zhǎng)收津,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,408評(píng)論 1 283
  • 正文 為了忘掉前任浊伙,我火速辦了婚禮撞秋,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嚣鄙。我一直安慰自己吻贿,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布拗慨。 她就那樣靜靜地躺著廓八,像睡著了一般奉芦。 火紅的嫁衣襯著肌膚如雪赵抢。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,772評(píng)論 1 290
  • 那天声功,我揣著相機(jī)與錄音烦却,去河邊找鬼。 笑死先巴,一個(gè)胖子當(dāng)著我的面吹牛其爵,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播伸蚯,決...
    沈念sama閱讀 38,921評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼摩渺,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了剂邮?” 一聲冷哼從身側(cè)響起摇幻,我...
    開(kāi)封第一講書(shū)人閱讀 37,688評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后绰姻,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體枉侧,經(jīng)...
    沈念sama閱讀 44,130評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評(píng)論 2 325
  • 正文 我和宋清朗相戀三年狂芋,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了榨馁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片精堕。...
    茶點(diǎn)故事閱讀 38,617評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡膛虫,死狀恐怖录豺,靈堂內(nèi)的尸體忽然破棺而出陆错,到底是詐尸還是另有隱情毡庆,我是刑警寧澤壮啊,帶...
    沈念sama閱讀 34,276評(píng)論 4 329
  • 正文 年R本政府宣布坎藐,位于F島的核電站遇西,受9級(jí)特大地震影響灭衷,放射性物質(zhì)發(fā)生泄漏次慢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評(píng)論 3 312
  • 文/蒙蒙 一翔曲、第九天 我趴在偏房一處隱蔽的房頂上張望迫像。 院中可真熱鬧,春花似錦瞳遍、人聲如沸闻妓。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,740評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)由缆。三九已至,卻和暖如春猾蒂,著一層夾襖步出監(jiān)牢的瞬間均唉,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,967評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工肚菠, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舔箭,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,315評(píng)論 2 360
  • 正文 我出身青樓蚊逢,卻偏偏與公主長(zhǎng)得像层扶,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子烙荷,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容

  • 概述 RabbitMQ是目前非常熱門(mén)的一款消息中間件镜会,不管是互聯(lián)網(wǎng)行業(yè)還是傳統(tǒng)行業(yè)都在大量地使用 。 Rabbit...
    Tian_Peng閱讀 1,667評(píng)論 1 4
  • 品茶時(shí)的最后一杯是白水终抽,又名太平湯戳表,留在舌間是淡淡的甜焰薄,原味能容百味…… 2018年12月6日,迎來(lái)了今冬的第一場(chǎng)...
    嘻哈姥閱讀 385評(píng)論 0 4
  • “當(dāng)你的爸爸媽媽開(kāi)始對(duì)你小心翼翼的時(shí)候扒袖,不要以為那是出于一份恐懼塞茅,那是出于一份愛(ài),請(qǐng)好好珍惜吧季率∫笆荩”這是最近看朗讀者...
    bing_0486閱讀 242評(píng)論 1 1
  • I've dropped hints that i am into music、reading. it's a h...
    無(wú)與倫比呀閱讀 170評(píng)論 0 0