文章參考:Rabbit實戰(zhàn)指南
消息何去何從
mandatory和immediate是channel.basicPublish
方法中的兩個參數(shù)奏篙,它們都有當(dāng)消息傳遞過程中不可達(dá)目的地時將消息返回給生產(chǎn)者的功能汹族。RabbitMQ提供的備份交換器(Alternate Exchange)可以將未能被交換器路由的消息(沒有綁定隊列或者沒有匹配的綁定)存儲起來,而不用返回給客戶端。
-
mandatory參數(shù)
當(dāng)mandatory參數(shù)設(shè)置為true時瘸右,交換器無法根據(jù)自身類型和路由鍵找到一個符號條件的隊列妆档,那么RabbitMQ會調(diào)用
Basic.Return
命令將消息返回給生產(chǎn)者蒸痹。當(dāng)mandatory參數(shù)設(shè)置為false時,出現(xiàn)上述情形呛哟,消息直接丟棄叠荠。生產(chǎn)者如何獲取到?jīng)]有被正確路由到合適隊列的消息呢?
可以通過調(diào)用
channel.addReturnListener
來添加ReturnListener監(jiān)聽器來實現(xiàn)扫责。使用mandatory參數(shù)的關(guān)鍵代碼如下
channel.basicPublish(EXCHANGE_NAME,"",true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes()); channel.addReturnListener(new ReturnListener(){ public void handleReturn(int replyCode,String replyText, String exchange,String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ String message = new String(body); System.out.println("Basic.Return返回的結(jié)果是:"+message); } });
上面代碼中生產(chǎn)者沒有成功將消息路由到隊列榛鼎,此時RabbitMQ會通過
Basic.Return
返回“mandatory test”這條消息,之后生產(chǎn)者客戶端通過ReturnListener監(jiān)聽到這個事件鳖孤,上面代碼最后輸出的應(yīng)該是“Basic.Return返回的結(jié)果是:madatory test”者娱。 -
immediate參數(shù)
當(dāng)immediate參數(shù)設(shè)為true時,如果交換器在將消息路由到隊列時苏揣,發(fā)現(xiàn)隊列上并不存在任何消費者黄鳍,那么這條消息不會存入隊列中。當(dāng)與路由鍵匹配的所有隊列都沒有消費者時平匈,該消息會通過
Basic.Return
返回給生產(chǎn)者框沟。概況來說,mandatory參數(shù)告訴服務(wù)器至少將該消息路由到一個隊列中增炭,否則將消息返回給生產(chǎn)者忍燥。
-
備份交換器
備份交換器,英文名為Alternate Exchange隙姿。生產(chǎn)者在發(fā)送消息的時候梅垄,如果沒有設(shè)置mandatory參數(shù),那么消息在未被路由的情況下將會丟失输玷;如果設(shè)置mandatory參數(shù)队丝,那么添加ReturnListener的編程邏輯,生產(chǎn)者的代碼將會變得復(fù)雜饲嗽。如果既不想復(fù)雜炭玫,又不想消息丟失,那么可以使用備份交換器貌虾,這樣可以將未被路由的消息存儲在RabbitMQ中吞加,在需要的時候去處理這些消息。
可以通過在聲明交換器(調(diào)用
channel.exchangeDeclare
方法)的時候添加alternate-exchange參數(shù)來實現(xiàn)尽狠,也可以通過策略(Policy)的方式實現(xiàn)衔憨。如果兩者同時使用,則前者的優(yōu)先級更高袄膏,會覆蓋掉Policy的設(shè)置践图。使用參數(shù)設(shè)置關(guān)鍵代碼如下:
Map<String,Object> args = new HashMap<String,Object>(); args.put("alternate-exchange","myAe"); channel.exchangeDeclare("normalExchange","direct",true,false,args); channel.exchangeDeclare("myAe","fanout",true,false,null); channel.queueDeclare("normalQueue",true,false,false,null); channel.queueBind("normalQueue","normalExchange","normalKey"); channel.queueDeclare("unroutedQueue",true,false,false,null); channel.queueBind("unroutedQueue","myAe","");
上面代碼聲明了兩個交換器normalExchange和myAe,分別綁定了normalQueue和unroutedQueue這兩個隊列沉馆,同時將myAe設(shè)置為normalExchange的備份交換器码党。注意myAe的交換器類型為fanout德崭。
如果此時發(fā)送一條信息到normalExchange上,當(dāng)路由鍵等于“normalKey”的時候揖盘,消息能正確路由到normalQueue這個隊列上眉厨。如果路由鍵設(shè)置為其他值,比如“errorKey”兽狭,即消息不能正確的路由到與normalExchange綁定的任何隊列上憾股,此時,就會發(fā)送給myAe箕慧,進(jìn)而發(fā)送個unroutedQueue這個隊列服球。
同樣,如果采取Policy的方式來設(shè)置備份交換器颠焦,可參考如下:
rabbitmqctl set_policy AE "^noramlExchange$" '{"alternate-exchange":"myAE"}'
備份交換器和普通交換器沒有啥太大區(qū)別斩熊,為了方便使用,建議設(shè)置為fanout類型蒸健。需要注意的是座享,消息被重新發(fā)送到備份交換器時的路由鍵和從生產(chǎn)者發(fā)出的路由鍵是一樣的婉商。
對于備份交換器似忧,總結(jié)了以下幾個情況:
- 如果設(shè)置的備份交換器不存在,客戶端和RabbitMQ服務(wù)端都不會有異常出血丈秩,此時盯捌,消息丟失。
- 如果備份交換器沒有綁定任何隊列蘑秽,客戶端和RabbitMQ服務(wù)端都不會有異常出血饺著,此時,消息丟失肠牲。
- 如果備份交換器沒有任何匹配的隊列幼衰,戶端和RabbitMQ服務(wù)端都不會有異常出血,此時缀雳,消息丟失渡嚣。
- 如果備份交換器和mandatory參數(shù)一起使用,那么mandatory參數(shù)無效肥印。