概述
????本文主要介紹一下RabbitMQ中的備份交換器煌妈、死信隊(duì)列勾怒、延遲隊(duì)列以及優(yōu)先級(jí)隊(duì)列
備份交換器
mandatory參數(shù)
????在介紹備份交換器之前我們?cè)诨仡櫼幌耤hannel.basicPublish中的mandatory參數(shù),它有當(dāng)消息傳遞過(guò)程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者的功能:mandatory參數(shù)如果設(shè)為true声旺,當(dāng)交換器無(wú)法根據(jù)自身的類(lèi)型和路由鍵找到一個(gè)符合條件的隊(duì)列時(shí),那么RabbitMQ會(huì)調(diào)用Basic.Return命令將消息返回給生產(chǎn)者 段只。當(dāng)mandatory參數(shù)設(shè)置為false時(shí)腮猖,出現(xiàn)上述情形,則消息直接被丟棄赞枕。
????那么生產(chǎn)者如何知道發(fā)布的消息有沒(méi)有被正確的路由到合適的隊(duì)列呢澈缺?這時(shí)候可以通過(guò)調(diào)用channel.addReturnListener監(jiān)聽(tīng)器實(shí)現(xiàn):
channel.basicPublish(EXCHANGE_NAME, "", true,
MessageProperties.PERSISTENT_TEXT_PLAIN,
"mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return 返回的結(jié)果是: "+ message);
}
});
????上面代碼中生產(chǎn)者沒(méi)有成功地將消息路由到隊(duì)列,此時(shí)RabbitMQ會(huì)通過(guò)Basic.Return返回"mandatory test"這條消息炕婶,之后生產(chǎn)者客戶端通過(guò)ReturnListener監(jiān)昕到了這個(gè)事件姐赡,上面代碼的最后輸出應(yīng)該是:"Basic.Return 返回的結(jié)果是 : mandatory test"。
????備份交換器柠掂,英文名稱為:Altemate Exchange(簡(jiǎn)稱AE)项滑,生產(chǎn)者在發(fā)送消息的時(shí)候如果不設(shè)置mandatory參數(shù) ,那么消息在未被路由的情況下將會(huì)丟失 : 如果設(shè)置了mandatory參數(shù)涯贞,那么需要添加ReturnListener的編程邏輯枪狂,生產(chǎn)者的代碼將變得復(fù)雜。如果既不想復(fù)雜化生產(chǎn)者的編程邏輯宋渔,又不想消息丟失州疾,那么可以使用備份交換器,這樣可以將未被路由的消息存儲(chǔ)在RabbitMQ中皇拣,再在需要的時(shí)候去處理這些消息严蓖。
聲明備份交換器
????聲明備份交換器可以在調(diào)用channel.exchangeDeclare方法時(shí)添加alternate-exchange參數(shù)來(lái)實(shí)現(xiàn),也可以通過(guò)策略Policy的方式實(shí)現(xiàn)氧急。如果兩者同時(shí)使用颗胡,則前者的優(yōu)先級(jí)更高,會(huì)覆蓋掉Policy的設(shè)置吩坝。
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "myAe");
//聲明交換器
channel.exchangeDeclare("normalExchange", "direct", true, false, args);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
//聲明隊(duì)列
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("unroutedQueue", true, false, false, null);
//交換器與隊(duì)列綁定
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueBind( "unroutedQueue", "myAe", "" ) ;
????上面的代碼中聲明了兩個(gè)交換器:normalExchange和myAe杭措,分別綁定了normalQueue和uroutedQueue這兩個(gè)隊(duì)列,同時(shí)將myAe設(shè)置為normalExchange的備份交換器(注意myAe的交換器類(lèi)型為fanout)钾恢。
????如果此時(shí)發(fā)送一條消息到normalExchange上手素,當(dāng)路由鍵等于"normalKey"的時(shí)候鸳址,消息能正確路由到normalQueue這個(gè)隊(duì)列中。如果路由鍵設(shè)為其他值泉懦,比如"errorKey" 則消息不能被正確地路由到與normalExchange綁定的任何隊(duì)列上稿黍,此時(shí)就會(huì)發(fā)送給myAe,進(jìn)而發(fā)送到unroutedQueue這個(gè)隊(duì)列崩哩。
????備份交換器其實(shí)和普通的交換器沒(méi)有太大的區(qū)別巡球,為了方便使用,建議設(shè)置為fanout類(lèi)型邓嘹,如若想設(shè)置為direct或者topic的類(lèi)型也沒(méi)有什么不妥酣栈,個(gè)人建議使用fanout類(lèi)型:考慮這樣一種情況,如果備份交換器的類(lèi)型是direct汹押,并且有一個(gè)與其綁定的隊(duì)列矿筝,假設(shè)綁定的路由鍵是key1,當(dāng)某條攜帶路由鍵為key2的消息被轉(zhuǎn)發(fā)到這個(gè)備份交換器的時(shí)候棚贾,備份交換器沒(méi)有匹配到合適的隊(duì)列窖维,則消息仍丟失,這樣備份交換器只能存儲(chǔ)攜帶路由鍵key1的消息妙痹。
????需要注意的是铸史,消息被重新發(fā)送到備份交換器時(shí)的路由鍵和從生產(chǎn)者發(fā)出的路由鍵是一樣的。
????對(duì)于備份交換器怯伊,總結(jié)了以下幾種特殊情況:
- 如果設(shè)置的備份交換器不存在琳轿,客戶端和RabbitMQ服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失耿芹。
- 如果備份交換器沒(méi)有綁定任何隊(duì)列利赋,客戶端和RabbitMQ 服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失猩系。
- 如果備份交換器沒(méi)有任何匹配的隊(duì)列媚送,客戶端和RabbitMQ服務(wù)端都不會(huì)有異常出現(xiàn),此時(shí)消息會(huì)丟失寇甸。
- 如果備份交換器和mandatory參數(shù)一起使用塘偎,那么mandatory參數(shù)無(wú)效。
設(shè)置消息過(guò)期時(shí)間(TTL:Time to Live)
????RabbitMQ支持對(duì)消息和隊(duì)列設(shè)置過(guò)期時(shí)間
????設(shè)置過(guò)期時(shí)間的方式可以在聲明隊(duì)列時(shí)給隊(duì)列設(shè)置過(guò)期時(shí)間屬性拿霉,如果一個(gè)隊(duì)列設(shè)置了過(guò)期時(shí)間吟秩,那么隊(duì)列中的所有消息都有相同的過(guò)期時(shí)間,也可以在發(fā)送消息時(shí)對(duì)消息本身進(jìn)行單獨(dú)設(shè)置
????如果我們同時(shí)設(shè)置了隊(duì)列過(guò)期時(shí)間和消息過(guò)期時(shí)間绽淘,那么消息最終的TTL取最小值涵防,消息在隊(duì)列中生存時(shí)間如果超過(guò)設(shè)置的TTL值就會(huì)變成"死信"(Dead Message),消費(fèi)者將無(wú)法再收到該條消息(不是絕對(duì)的沪铭,可以使用死信隊(duì)列)
????對(duì)于給隊(duì)列設(shè)置過(guò)期時(shí)間這種方式一旦消息過(guò)期壮池,就會(huì)從隊(duì)列中移除偏瓤,而給消息設(shè)置過(guò)期時(shí)間這種方式,即使消息過(guò)期椰憋,也不會(huì)馬上從隊(duì)列中移除厅克,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的。
????為什么這兩種方法處理的方式不一樣?因?yàn)殛?duì)列設(shè)置過(guò)期時(shí)間這種方式橙依,隊(duì)列中己過(guò)期的消息肯定在隊(duì)列頭部证舟,RabbitMQ只要定期從隊(duì)頭開(kāi)始掃描是否有過(guò)期的消息即可。而給消息設(shè)置過(guò)期時(shí)間窗骑,每條消息的過(guò)期時(shí)間不同女责,如果要?jiǎng)h除所有過(guò)期消息勢(shì)必要掃描整個(gè)隊(duì)列,所以不如等到此消息即將被消費(fèi)時(shí)再判定是否過(guò)期创译,如果過(guò)期再進(jìn)行刪除即可抵知。
-
給隊(duì)列設(shè)置過(guò)期時(shí)間
在隊(duì)列聲明channel.queueDeclare方法中加入x-message-ttl可以實(shí)現(xiàn)給隊(duì)列設(shè)置過(guò)期時(shí)間,參數(shù)的單位是毫秒昔榴。
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);
?????也可以通過(guò)Policy的方式來(lái)設(shè)置TTL,示例如下 :
./rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
?????如果不設(shè)置TTL則表示此消息不會(huì)過(guò)期
?????如果將TTL設(shè)置為0碘橘,則表示除非此時(shí)可以直接將消息投遞到消費(fèi)者互订,否則該消息會(huì)被立即丟棄。
-
給消息單獨(dú)設(shè)置過(guò)期時(shí)間
在發(fā)送消息channel.basicPublish方法中加入expiration的屬性參數(shù)痘拆,單位為毫秒仰禽。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
//持久化消息
builder.deliveryMode(2);
//設(shè)置消息過(guò)期時(shí)間為60000ms
builder.expiration("60000");
AMQP.BasicProperties properties = builder.build() ;
channel.basicPublish(exchangeName, routingKey, mandatory, properties,"Hello RabbitMQ".getBytes());
死信隊(duì)列
????死信隊(duì)列全稱為Dead-Letter-Exchange(DLX),也可以稱為死信交換器或死信郵箱,當(dāng)消息在一個(gè)隊(duì)列中變成死信 (dead message) 之后纺蛆,它能被重新被發(fā)送到另一個(gè)交換器中吐葵,這個(gè)交換器就是DLX,綁定DLX的隊(duì)列就稱之為死信隊(duì)列桥氏。
????消息變成死信的情形:
????????-- 消息被拒絕温峭,并且設(shè)置requeque參數(shù)為false
????????-- 消息過(guò)期
????????-- 隊(duì)列長(zhǎng)度達(dá)到最大值
????DLX也是一個(gè)正常的交換器,和一般的交換器沒(méi)有區(qū)別字支,它能在任何的隊(duì)列上被指定 凤藏,實(shí)際上就是設(shè)置某個(gè)隊(duì)列的屬性。當(dāng)這個(gè)隊(duì)列中存在死信時(shí)堕伪,RabbitMQ就會(huì)自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的DLX上去揖庄,進(jìn)而被路由到另一個(gè)隊(duì)列,即死信隊(duì)列欠雌√闵遥可以監(jiān)聽(tīng)這個(gè)隊(duì)列中的消息以進(jìn)行相應(yīng)的處理。
配置死信隊(duì)列
- 方式一:通過(guò)在隊(duì)列聲明channel.queueDeclare方法中設(shè)置x-dead-letter-exchange參數(shù)可以為一個(gè)隊(duì)列添加一個(gè)死信隊(duì)列DLX
//先聲明一個(gè)死信隊(duì)列: dlx_exchange
channel.exchangeDeclare("dlx_exchange", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "dlx_exchange ");
//為隊(duì)列myqueue添加死信隊(duì)列
channel.queueDeclare("myqueue", false, false, false, args);
????RabbitMQ允許我們?yōu)樗佬抨?duì)列指定路由鍵富俄,如果沒(méi)有特殊指定禁炒,則使用原隊(duì)列的路由鍵而咆。
- 方式二:當(dāng)然這里也可以通過(guò)Policy的方式設(shè)置:
./rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":" dlx_exchange"}' --apply-to queues
????對(duì)于 RabbitMQ 來(lái)說(shuō),死信隊(duì)列是一個(gè)非常有用的特性齐苛,消息不能被消費(fèi)者正確消費(fèi)(消費(fèi)者調(diào)用了Basic.Nack 或者 Basic.Reject) 而被置入死信隊(duì)列中的情況翘盖,后續(xù)分析程序可以通過(guò)消費(fèi)這個(gè)死信隊(duì)列中的內(nèi)容來(lái)分析當(dāng)時(shí)所遇到的異常情況,進(jìn)而可以改善和優(yōu)化系統(tǒng)凹蜂。
延遲隊(duì)列
????延遲隊(duì)列存儲(chǔ)的對(duì)象是對(duì)應(yīng)的延遲消息馍驯,所謂延遲消息就是指消息被發(fā)送后,并不想讓消費(fèi)者立即拿到消息玛痊,而是在等待特定時(shí)間后消費(fèi)者才可以拿到這個(gè)消息進(jìn)行消費(fèi)汰瘫。
????在AMQP協(xié)議中,或者RabbitMQ本身沒(méi)有直接支持延遲隊(duì)列的功能擂煞,但是可以通過(guò)前面介紹的DLX和TTL模擬出延遲隊(duì)列的功能:假設(shè)一個(gè)應(yīng)用中需要將每條消息都設(shè)置為10秒的延遲混弥,生產(chǎn)者通過(guò) exchange.normal這個(gè)交換器將發(fā)送的消息存儲(chǔ)在 queue.normal這個(gè)隊(duì)列中。消費(fèi)者訂閱的并非是 queue.normal 這個(gè)隊(duì)列对省,而是它的死信隊(duì)列queue.dlx 蝗拿。當(dāng) 消息從queue.normal 這個(gè)隊(duì)列中過(guò)期之后被存入queue.dlx這個(gè)隊(duì)列中,消費(fèi)者就恰巧消費(fèi)到了延遲10秒的這條消息蒿涎。
優(yōu)先級(jí)隊(duì)列
????優(yōu)先級(jí)隊(duì)列哀托,顧名思義,具有高優(yōu)先級(jí)的隊(duì)列具有高的優(yōu)先權(quán)劳秋,優(yōu)先級(jí)高的消息具備優(yōu)先被消費(fèi)的特權(quán)仓手。
????可以通過(guò)設(shè)置隊(duì)列的x-max-priority參數(shù)來(lái)實(shí)現(xiàn)
Map<String, Object> args = new HashMap<String, Object>() ;
args.put( "x-max-priority", 10) ;
channel.queueDeclare("queue.priority", true, fa1se, false, args) ;
上面的代碼演示的是如何配置一個(gè)隊(duì)列的最大優(yōu)先級(jí)。在此之后玻淑,需要在發(fā)送時(shí)在消息中設(shè)置消息當(dāng)前的優(yōu)先級(jí):
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5) ;
AMQP.BasicProperties properties = builder.build() ;
channel.basicPublish("exchange_priority", "rk_priority", properties, "Hello RabbitMQ".getBytes());
????上面的代碼中設(shè)置消息的優(yōu)先級(jí)為5嗽冒。默認(rèn)最低為0,最高為隊(duì)列設(shè)置的最大優(yōu)先級(jí)补履。優(yōu)先級(jí)高的消息可以被優(yōu)先消費(fèi)添坊,這個(gè)也是有前提的 : 如果在消費(fèi)者的消費(fèi)速度大于生產(chǎn)者的速度,且 Broker中沒(méi)有消息堆積的情況下箫锤,對(duì)發(fā)送的消息設(shè)置優(yōu)先級(jí)也就沒(méi)有什么實(shí)際意義帅腌。因?yàn)樯a(chǎn)者剛發(fā)送完一條消息就被消費(fèi)者消費(fèi)了,那么就相當(dāng)于 Broker中至多只有一條消息麻汰,對(duì)于單條消息來(lái)說(shuō)優(yōu)先級(jí)是沒(méi)有什么意義的速客。