1. 服務(wù)端
如果RabbitMQ集群中只有一個Broker節(jié)點匙瘪,那么該節(jié)點的失效將導(dǎo)致整體服務(wù)的臨時性不可用,并且入偷,也可能會導(dǎo)致消息的丟失「停可以將所有消息都設(shè)置為持久化慢蜓,并且對應(yīng)隊列的durable屬性也設(shè)置為true。但是這樣仍然無法避免由于緩存導(dǎo)致的問題:因為消息在發(fā)送之后和被寫入磁盤并執(zhí)行刷盤動作之間存在一個短暫卻會產(chǎn)生問題的時間窗鼠锈。
1.1 鏡像隊列
如果RabbitMQ集群是由多個Broker節(jié)點組成的闪檬,盡管交換器和綁定關(guān)系能夠在單點故障問題上幸免于難,但是隊列和其上的存儲的消息卻不行购笆,這是因為隊列進程及其內(nèi)容僅僅維持在單個節(jié)點之上粗悯,所以一個節(jié)點的失效表現(xiàn)為其對應(yīng)的隊列不可用。
引入鏡像隊列(Mirror Queue)的機制由桌,可以將隊列鏡像到集群中的其他Broker節(jié)點之上为黎,如果集群中的一一個節(jié)點失效了,隊列能自動地切換到鏡像中的另一個節(jié)點上以保證服務(wù)的可用性行您。在通常的用法中铭乾,針對每一個配置鏡像的隊列(以下簡稱鏡像隊列)都包含一個主節(jié)點(master)和若干個從節(jié)點(slave)。
slave會準(zhǔn)確地按照master執(zhí)行命令的順序進行動作(如下圖)娃循,故slave與master上維護的狀態(tài)應(yīng)該是相同的炕檩。如果master由于某種原因失效,那么“資歷最老”的slave會被提升為新的master.根據(jù)slave加入的時間排序捌斧,時間最長的slave即為“資歷最老”笛质。發(fā)送到鏡像隊列的所有消息會被同時發(fā)往master和所有的slave上,如果此時master掛掉了捞蚂,消息還會在slave上妇押,這樣slave提升為master的時候消息也不會丟失。
2. 生產(chǎn)端
通過消息持久化和鏡像隊列來解決因為服務(wù)器的異常奔潰導(dǎo)致的消息丟失姓迅。當(dāng)消息的發(fā)布者在將消息發(fā)送出去之后敲霍,消息到底有沒有正確到達(dá)broker代理服務(wù)器呢?默認(rèn)情況下生產(chǎn)者是不知道消息有沒有正確到達(dá)broker的丁存,如果在消息到達(dá)broker之前已經(jīng)丟失的話肩杈,消息根本就沒到達(dá)代理服務(wù)器,那么這個問題該怎么解決呢解寝?
RabbitMQ為我們提供了兩種方式:
- 通過AMQP事務(wù)機制實現(xiàn)扩然,這也是AMQP協(xié)議層面提供的解決方案;
- 通過將channel設(shè)置成confirm模式來實現(xiàn)聋伦;
2.1 消息持久化
發(fā)送時將deliveryMode
設(shè)置為2即可實現(xiàn)消息的持久化夫偶。
關(guān)鍵代碼:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("bob")
.build()),
messageBodyBytes);
Spring的RabbitTemplate
發(fā)送消息時默認(rèn)將deliveryMode
其設(shè)置為2界睁,從而實現(xiàn)了消息的持久化。詳見:RabbitTemplate#convertMessageIfNecessary
方法索守。
2.2 事物機制
RabbitMQ中與事務(wù)機制有關(guān)的方法有三個:
方法 | 描述 |
---|---|
txSelect | 用于將當(dāng)前channel設(shè)置成transaction模式 |
txCommit | 用于提交事務(wù) |
txRollback | 用于回滾事務(wù) |
在通過txSelect開啟事務(wù)之后晕窑,我們便可以發(fā)布消息給broker代理服務(wù)器了,如果txCommit提交成功了卵佛,則消息一定到達(dá)了broker了杨赤,如果在txCommit執(zhí)行之前broker異常崩潰或者由于其他原因拋出異常,這個時候我們便可以捕獲異常通過txRollback回滾事務(wù)了截汪。
關(guān)鍵代碼:
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
事務(wù)確實能夠解決producer與broker之間消息確認(rèn)的問題疾牲,只有消息成功被broker接受,事務(wù)提交才能成功衙解,否則我們便可以在捕獲異常進行事務(wù)回滾操作同時進行消息重發(fā)阳柔,但是使用事務(wù)機制的話會降低RabbitMQ的性能。
2.3 Confirm機制
通過publisher confirm機制能夠確彬韭停客戶端知道哪些消息已經(jīng)存入磁盤舌剂。與事務(wù)機制
互斥。
生產(chǎn)者將信道設(shè)置成confirm模式暑椰,一旦信道進入confirm模式霍转,所有在該信道上面發(fā)布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后一汽,broker就會發(fā)送一個確認(rèn)給生產(chǎn)者(包含消息的唯一ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊列了避消,如果消息和隊列是可持久化的,那么確認(rèn)消息會將消息寫入磁盤之后發(fā)出召夹,broker回傳給生產(chǎn)者的確認(rèn)消息中deliver-tag域包含了確認(rèn)消息的序列號岩喷,此外broker也可以設(shè)置basic.ack的multiple域,表示到這個序列號之前的所有消息都已經(jīng)得到了處理监憎。
confirm模式最大的好處在于他是異步的纱意,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時繼續(xù)發(fā)送下一條消息鲸阔,當(dāng)消息最終得到確認(rèn)之后偷霉,生產(chǎn)者應(yīng)用便可以通過回調(diào)方法來處理該確認(rèn)消息,如果RabbitMQ因為自身內(nèi)部錯誤導(dǎo)致消息丟失隶债,就會發(fā)送一條nack消息腾它,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該nack消息跑筝。
在channel 被設(shè)置成 confirm 模式之后死讹,所有被 publish 的后續(xù)消息都將被 confirm(即 ack) 或者被nack一次。但是沒有對消息被 confirm 的快慢做任何保證曲梗,并且同一條消息不會既被 confirm又被nack赞警。
客戶端實現(xiàn)生產(chǎn)者confirm有三種編程方式:
- 普通confirm模式:每發(fā)送一條消息后妓忍,調(diào)用waitForConfirms()方法,等待服務(wù)器端confirm愧旦。實際上是一種串行confirm了世剖。
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
-
批量confirm模式:每發(fā)送一批消息后,調(diào)用waitForConfirms()方法笤虫,等待服務(wù)器端confirm旁瘫。
批量confirm模式稍微復(fù)雜一點,客戶端程序需要定期(每隔多少秒)或者定量(達(dá)到多少條)或者兩則結(jié)合起來publish消息琼蚯,然后等待服務(wù)器端confirm, 相比普通confirm模式酬凳,批量極大提升confirm效率,但是問題在于一旦出現(xiàn)confirm返回false或者超時的情況時遭庶,客戶端需要將這一批次的消息全部重發(fā)宁仔,這會帶來明顯的重復(fù)消息數(shù)量,并且峦睡,當(dāng)消息經(jīng)常丟失時翎苫,批量confirm性能應(yīng)該是不升反降的。
關(guān)鍵代碼:
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
System.out.println("send message failed.");
}
-
異步confirm模式:提供一個回調(diào)方法榨了,服務(wù)端confirm了一條或者多條消息后Client端會回調(diào)這個方法煎谍。
異步confirm模式的編程實現(xiàn)最復(fù)雜,Channel對象提供的ConfirmListener()回調(diào)方法只包含deliveryTag(當(dāng)前Chanel發(fā)出的消息序號)阻逮,我們需要自己為每一個Channel維護一個unconfirm的消息序號集合粱快,每publish一條數(shù)據(jù),集合中元素加1叔扼,每回調(diào)一次handleAck方法事哭,unconfirm集合刪掉相應(yīng)的一條(multiple=false)或多條(multiple=true)記錄。從程序運行效率上看瓜富,這個unconfirm集合最好采用有序集合SortedSet存儲結(jié)構(gòu)鳍咱。實際上,SDK中的waitForConfirms()方法也是通過SortedSet維護消息序號的与柑。
關(guān)鍵代碼:
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo);
}
在使用SpringRabbitTemplate
時谤辜,通過為其設(shè)置ConfirmCallback
即可實現(xiàn)異步confirm機制。
關(guān)鍵代碼:
<rabbit:connection-factory id="connectionFactory"
addresses="${rabbit.addresses}"
username="${rabbit.username}"
password="${rabbit.password}"
virtual-host="${rabbit.vhost}"
publisher-confirms="true"
/>
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause)
);
3. 消費端
3.1 Ack機制
為了保證消息從隊列可靠地到達(dá)消費者价捧,RabbitMQ提供消息確認(rèn)機制(message acknowledgment)丑念。消費者在聲明隊列時,可以指定noAck參數(shù)结蟋,當(dāng)autoAck=false時脯倚,RabbitMQ會等待消費者顯式發(fā)回ack信號后才從內(nèi)存(和磁盤,如果是持久化消息的話)中移去消息。否則推正,RabbitMQ會在隊列中消息被消費后立即刪除它恍涂。
采用消息確認(rèn)機制后,只要令autoAck=false植榕,消費者就有足夠的時間處理消息(任務(wù))再沧,不用擔(dān)心處理消息過程中消費者進程掛掉后消息丟失的問題,因為RabbitMQ會一直持有消息直到消費者顯式調(diào)用basicAck為止尊残。
RabbitTemplate
發(fā)送消息時炒瘸,已經(jīng)默認(rèn)將autoAck設(shè)置為false;
// 處理消息...
// 手動發(fā)送ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
sequenceDiagram
服務(wù)器->>消費者: 消息
消費者-->>服務(wù)器: Ack