前言
RabbitMQ雖然有對(duì)隊(duì)列及消息等的一些持久化設(shè)置锨匆,但其實(shí)光光只是這一個(gè)是不能夠保障數(shù)據(jù)的可靠性的吩愧,下面我們提出這樣的質(zhì)疑:
(1)RabbitMQ生產(chǎn)者是不知道自己發(fā)布的消息是否已經(jīng)正確達(dá)到服務(wù)器呢昂灵,如果中間發(fā)生網(wǎng)絡(luò)異常等情況呢?消息必然會(huì)丟失!
(2)RabbitMQ如果沒(méi)有設(shè)置隊(duì)列持久化,RabbitMQ服務(wù)器重后隊(duì)列的元數(shù)據(jù)會(huì)丟失亚兄,消息自然也會(huì)丟失混稽!
(3)RabbitMQ如果消費(fèi)者設(shè)置自動(dòng)確認(rèn)采驻,即autoAck為true,那么不管消費(fèi)者發(fā)生什么情況匈勋,該消息會(huì)自動(dòng)從隊(duì)列中移除礼旅,實(shí)際上消費(fèi)者有可能掛掉,消息必然會(huì)丟失洽洁!
(4)RabbitMQ中的消息如果沒(méi)有匹配到隊(duì)列時(shí)痘系,那么消息也會(huì)丟失!
一饿自、設(shè)置mandotory參數(shù)汰翠、AE備份交換器
針對(duì)前言中的第(4)個(gè)問(wèn)題,我們可以通過(guò)設(shè)置mandotory參數(shù)與AE備份交換器來(lái)解決
1昭雌、mandotory參數(shù)
1)當(dāng)為true時(shí)复唤,交換器無(wú)法根據(jù)自身的類型和路由鍵找到一個(gè)符合條件的隊(duì)列,此時(shí)RabbitMQ會(huì)調(diào)用Basic.Return命令將消息返回給生產(chǎn)者烛卧,消息將不會(huì)丟失
2)當(dāng)為false時(shí)佛纫,消息將會(huì)被直接丟棄。
3)RabbitMQ通過(guò)addReturnListener添加ReturnLisener監(jiān)聽器監(jiān)聽獲取沒(méi)有被正確路由到合適隊(duì)列的消息。
如果想學(xué)習(xí)Java工程化呈宇、高性能及分布式好爬、深入淺出。微服務(wù)甥啄、Spring存炮,MyBatis,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135蜈漓,群里有阿里大牛直播講解技術(shù)僵蛛,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。
channel.basicPublish(EXCHANGE NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener(){
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties basicProperties,
byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return 返回的結(jié)果是: " + message);
}
});
2迎变、AE備份交換器
Alternate Exchange充尉,簡(jiǎn)稱AE,不設(shè)置mandatory參數(shù)衣形,那么消息將會(huì)被丟失驼侠,設(shè)置mandatory參數(shù)的話,需要添加ReturnListner監(jiān)聽器谆吴,增加復(fù)雜代碼倒源,如果既不想增加代碼又不想消息丟失,則使用AE句狼,將沒(méi)有被路由的消息存儲(chǔ)于RabbitMQ中笋熬。當(dāng)mandatory參數(shù)用AE一起使用時(shí),mandatory將失效腻菇。在介紹AE之前胳螟,也認(rèn)識(shí)RabbitMQ對(duì)于消息的過(guò)期時(shí)間TTL設(shè)置以及隊(duì)列的過(guò)期時(shí)間TTL設(shè)置
2.1 TTL過(guò)期時(shí)間設(shè)置
可以對(duì)隊(duì)列設(shè)置TTL與消息設(shè)置TTL,其中消息設(shè)置TTL經(jīng)常用于死信隊(duì)列筹吐、延遲隊(duì)列等高級(jí)應(yīng)用中糖耸。
1)設(shè)置消息TTL
設(shè)置TTL過(guò)期時(shí)間一般有兩種當(dāng)時(shí):一是通過(guò)隊(duì)列屬性,對(duì)隊(duì)列中所有消息設(shè)置相同的TTL丘薛。二就是對(duì)消息本身單獨(dú)設(shè)置嘉竟,每條消息TTL不同。如果一起使用時(shí)候洋侨,TTL小的為準(zhǔn)舍扰,當(dāng)一旦超過(guò)設(shè)置的TTL時(shí)間時(shí),就會(huì)變成“死信”希坚。
方式一:針對(duì)每條消息設(shè)置TTL是通過(guò)增加expiration的屬性參數(shù)實(shí)現(xiàn)的边苹,不可能像方式二一樣掃描整個(gè)隊(duì)列再判斷是否過(guò)期,只有當(dāng)該消息即將被消費(fèi)時(shí)再判定是否過(guò)期即可刪除吏够,也就是消息即使已經(jīng)過(guò)期勾给,但不一定立馬被刪除滩报!
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
// 持久化消息
builder deliveryMode(2);
// 設(shè)置 TTL=60000ms
builder expiration( 60000 );
AMQP.BasicProperties properties = builder. build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "ttlTestMessage".getBytes());
方式二:通過(guò)隊(duì)列屬性設(shè)置消息TTL是增加x-message-ttl參數(shù)實(shí)現(xiàn)的,只需要掃描整個(gè)隊(duì)列頭部即可立即刪除播急,也就是消息一旦過(guò)期就會(huì)被刪除脓钾!
Map argss = new HashMap();
argss.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss) ;
2)設(shè)置隊(duì)列TTL
通過(guò)在隊(duì)列中添加參數(shù)x-message-ttl參數(shù)實(shí)現(xiàn),設(shè)置隊(duì)列被自動(dòng)刪除前處于未被使用狀態(tài)的時(shí)間桩警,注意是隊(duì)列的使用狀態(tài)可训,并不是消息是否被消費(fèi)的狀態(tài)
設(shè)置ttl=30min的隊(duì)列,時(shí)間一到RabbitMQ會(huì)保證隊(duì)列被刪除捶枢,但是不會(huì)保證刪除的速度有多快握截。
Map args = new HashMap{);
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
2.2 AE備份交換器的使用
聲明交換器的時(shí)候,添加alternate-exchange參數(shù)實(shí)現(xiàn)烂叔,或通過(guò)策略實(shí)現(xiàn)谨胞。前者優(yōu)先級(jí)高。從代碼角度需要以下三個(gè)步驟蒜鸡,具體代碼如下:
Map args = new HashMap();
args.put("a1ternate-exchange", "myAe");
channe1.exchangeDec1are("norma1Exchange", "direct", true, fa1se, args);
channe1.exchangeDec1are("myAe", "fanout", true, fa1se, nu11) ;
channe1.queueDec1are( "norma1Queue", true, fa1se, fa1se, nu11);
channe1.queueB nd("norma1Queue", "norma1Exchange", "norma1Key");
channe1.queueDec1are("unroutedQueue", true, fa1se, fa1se, nu11);
1)聲明normalExchange類型為direct的交換器胯努、類型為fanout的myAe備份交換器;并且normalExchange的備份交換器為myAe(備份交換器建議使用fanout類型交換器)
2)聲明normalQueue隊(duì)列逢防,聲明unrouteQueue隊(duì)列叶沛;
3)通過(guò)路由鍵normalKey綁定normalExchange與normalQueue,不適用路由鍵綁定unrouteQueue與myAe
二忘朝、消費(fèi)者手動(dòng)確認(rèn)
針對(duì)前言中第(3)個(gè)問(wèn)題灰署,我們需要在消費(fèi)者消費(fèi)完消息后手動(dòng)進(jìn)行確認(rèn),保證消息數(shù)據(jù)不丟失局嘁!
1溉箕、autoAck參數(shù)設(shè)置
1) 當(dāng)autoAck參數(shù)為false時(shí),手動(dòng)確認(rèn):
RabbitMQ會(huì)等待消費(fèi)者顯式地回復(fù)確認(rèn)信號(hào)后從內(nèi)存中移去消息(實(shí)際上是先標(biāo)示刪除標(biāo)記导狡,之后再刪除)约巷,這是一般推薦使用的方式偎痛,因?yàn)槭褂檬謩?dòng)確認(rèn)有足夠的時(shí)間處理消息旱捧,不需要擔(dān)心消費(fèi)者進(jìn)程掛掉之后消息丟失問(wèn)題。此時(shí)的消息就會(huì)分為兩個(gè)部分:一是等待投遞給消費(fèi)者的消息踩麦;二是已經(jīng)投遞給消費(fèi)者但還沒(méi)有收到消費(fèi)者確認(rèn)信號(hào)的消息枚赡。
2) 當(dāng)autoAck為true時(shí),自動(dòng)確認(rèn):
RabbitMQ會(huì)自動(dòng)隱式地回復(fù)確認(rèn)信號(hào)后從內(nèi)存中移去消息谓谦, RabbitMQ不需要管消費(fèi)者是否真正消費(fèi)了這些消息贫橙,RabbitMQ會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn),然后直接從內(nèi)存中刪除反粥。
2卢肃、重新投遞
問(wèn):如果選擇手動(dòng)確認(rèn)疲迂,即autoAck為false時(shí),消費(fèi)者由于某些原因斷開了莫湘,那么消息的確認(rèn)會(huì)受到影響尤蒿,那么此時(shí)的消息會(huì)丟失嗎?
這也就是一開始提出來(lái)的問(wèn)題幅垮,其實(shí)是不必?fù)?dān)心消息會(huì)被丟失腰池,因?yàn)镽abbitMQ如果一直沒(méi)收到消費(fèi)者的確認(rèn)信號(hào),并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開忙芒,則RabbitMQ會(huì)重新安排消息進(jìn)入隊(duì)列等待給下一個(gè)消費(fèi)者示弓。也就是RabbitMQ不會(huì)設(shè)置消息的過(guò)期時(shí)間(當(dāng)然也可以設(shè)置過(guò)期時(shí)間,但與之有關(guān)系方式消息丟失的特性是死信隊(duì)列)呵萨,它只判斷是否需要重新安排入隊(duì)列重新投遞奏属,而判斷的唯一標(biāo)準(zhǔn)是消費(fèi)此消息的消費(fèi)者連接是否已經(jīng)斷開,即RabbitMQ會(huì)允許消費(fèi)一條消息的時(shí)間很久很久潮峦。
3拍皮、消費(fèi)者拒絕消息
1)使用channel.basicReject方法,但只能拒絕一條跑杭。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:消息的唯一標(biāo)識(shí)
requeue:表示是否可以拒絕的消息重新存入隊(duì)列
2)使用channel.basicNack铆帽。不同于前者,此方法可以批量拒絕德谅。
void basicNack(long deliveryTag, boolean multiple , boolean requeue) throws IOException;
multiple:設(shè)置為true則表示拒絕deliveryTag編號(hào)之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息爹橱。
3)問(wèn):關(guān)鍵在于,消費(fèi)者拒絕消費(fèi)消息后怎么處理窄做?是丟棄愧驱,還是重新回到隊(duì)列呢?
當(dāng)參數(shù)requeue設(shè)置為true時(shí)候椭盏,可以重新進(jìn)入隊(duì)列组砚,投遞給下一個(gè)消費(fèi)者。如果為false掏颊,消息就會(huì)把隊(duì)列中消息立馬移除糟红,再結(jié)合啟用“死信隊(duì)列”,防止消息丟失并且可以分析異常情況的發(fā)生乌叶。
最后盆偿,由于剩下的兩種方式涉及的內(nèi)容較多,所以在此將分成兩篇繼續(xù)介紹准浴,請(qǐng)看下篇
如果想學(xué)習(xí)Java工程化事扭、高性能及分布式、深入淺出乐横。微服務(wù)求橄、Spring今野,MyBatis,Netty源碼分析的朋友可以加我的Java高級(jí)交流:854630135罐农,群里有阿里大牛直播講解技術(shù)腥泥,以及Java大型互聯(lián)網(wǎng)技術(shù)的視頻免費(fèi)分享給大家。