在生產(chǎn)過程中誊爹,難免會發(fā)生服務(wù)器宕機(jī)的事情,RabbitMQ也不例外瓢捉,可能由于某種特殊情況下的異常而導(dǎo)致RabbitMQ宕機(jī)從而重啟频丘,那么這個時候?qū)τ谙㈥犃欣锏臄?shù)據(jù),包括交換機(jī)泡态、隊列以及隊列中存在消息恢復(fù)就顯得尤為重要了搂漠。RabbitMQ本身帶有持久化機(jī)制,包括交換機(jī)某弦、隊列以及消息的持久化桐汤。持久化的主要機(jī)制就是將信息寫入磁盤而克,當(dāng)RabbtiMQ服務(wù)宕機(jī)重啟后,從磁盤中讀取存入的持久化信息怔毛,恢復(fù)數(shù)據(jù)员萍。
1、交換機(jī)的持久化
如果使用常規(guī)的聲明交換機(jī)的方法:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
默認(rèn)不是持久化的拣度,在服務(wù)器重啟之后碎绎,交換機(jī)會消失。我們在管理臺的Exchange頁簽下查看交換機(jī)抗果,可以看到使用上述方法聲明的交換機(jī)筋帖,F(xiàn)eatures一列是空的,即沒有任何附加屬性冤馏。
我們換用另一種方法聲明交換機(jī):
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
查看一下方法的說明:
* Actively declare a non-autodelete exchange with no extra arguments
* @see com.rabbitmq.client.AMQP.Exchange.Declare
* @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
* @param exchange the name of the exchange
* @param type the exchange type
* @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
* @throws java.io.IOException if an error is encountered
* @return a declaration-confirm method to indicate the exchange was successfully declared
*/
Exchange.DeclareOk (String exchange, String type, boolean durable) throws IOException;
我們可以看到第三個參數(shù)durable
日麸,如果為true時則表示要做持久化,當(dāng)服務(wù)重啟時宿接,交換機(jī)依然存在赘淮,所以使用該方法聲明的交換機(jī)是下面這個樣子的:
2、隊列的持久化
與交換機(jī)的持久化相同睦霎,隊列的持久化也是通過durable參數(shù)實(shí)現(xiàn)的梢卸,看一下方法的定義:
* Declare a queue
* @see com.rabbitmq.client.AMQP.Queue.Declare
* @see com.rabbitmq.client.AMQP.Queue.DeclareOk
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
第二個參數(shù)跟交換機(jī)方法的參數(shù)一樣,true表示做持久化副女,當(dāng)RabbitMQ服務(wù)重啟時蛤高,隊列依然存在。
這里說一下另外兩個參數(shù):
-
exclusive
:排他隊列碑幅。如果一個隊列被聲明為排他隊列戴陡,那么這個隊列只能被第一次聲明他的連接所見,并在連接斷開的時候自動刪除沟涨。這里有三點(diǎn)需要說明:- 排他隊列是基于連接可見的恤批,同一連接的不同信道是可以同時訪問同一連接創(chuàng)建的排他隊列
- 如果一個連接已經(jīng)聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的裹赴,這個與普通隊列不同
- 即使該隊列是持久化的喜庞,一旦連接關(guān)閉或者客戶端退出,該排他隊列都會被自動刪除的棋返,這種隊列適用于一個客戶端發(fā)送讀取消息的應(yīng)用場景
-
autoDelete
:自動刪除延都,如果該隊列沒有任何訂閱的消費(fèi)者的話,該隊列會被自動刪除睛竣。這種隊列適用于臨時隊列
3晰房、消息的持久化
消息的持久化是指當(dāng)消息從交換機(jī)發(fā)送到隊列之后,被消費(fèi)者消費(fèi)之前,服務(wù)器突然宕機(jī)重啟殊者,消息仍然存在与境。消息持久化的前提是隊列持久化,假如隊列不是持久化幽污,那么消息的持久化毫無意義嚷辅。
通過如下代碼設(shè)置消息的持久化:
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
其中MessageProperties.PERSISTENT_TEXT_PLAIN
是設(shè)置持久化的參數(shù)。
看一下basicPublish方法的定義:
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
再看下BasicProperties的類型:
public BasicProperties(
String contentType,
String contentEncoding,
Map<String,Object> headers,
Integer deliveryMode,
Integer priority,
String correlationId,
String replyTo,
String expiration,
String messageId,
Date timestamp,
String type,
String userId,
String appId,
String clusterId)
其中deliveryMode
是設(shè)置消息持久化的參數(shù)距误,等于1不設(shè)置持久化簸搞,等于2設(shè)置持久化。PERSISTENT_TEXT_PLAIN
是實(shí)例化的一個deliveryMode=2
的對象准潭,便于編程:
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
保證在服務(wù)器重啟的時候可以保持不丟失相關(guān)信息趁俊,重點(diǎn)解決服務(wù)器的異常崩潰而導(dǎo)致的消息丟失問題。但是刑然,將所有的消息都設(shè)置為持久化寺擂,會嚴(yán)重影響RabbitMQ的性能,寫入硬盤的速度比寫入內(nèi)存的速度慢的不只一點(diǎn)點(diǎn)泼掠。對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐率怔软,在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權(quán)衡择镇。
4挡逼、進(jìn)一步思考
將Exchange、Queue腻豌、Message都設(shè)置了持久化之后就能保證100%保證數(shù)據(jù)不丟失了嗎家坎?
答案是否定的。
首先吝梅,從consumer端來說虱疏,如果這時autoAck=true,那么當(dāng)consumer接收到相關(guān)消息之后苏携,還沒來得及處理就crash掉了做瞪,那么這樣也算數(shù)據(jù)丟失,這種情況也好處理右冻,只需將autoAck設(shè)置為false穿扳,然后在正確處理完消息之后進(jìn)行手動ack。
還有要在producer引入事務(wù)機(jī)制或者Confirm機(jī)制來確保消息已經(jīng)正確的發(fā)送至broker端国旷。
其次,關(guān)鍵的問題是消息在正確存入RabbitMQ之后茫死,還需要有一段時間(這個時間很短跪但,但不可忽視)才能存入磁盤之中,RabbitMQ并不是為每條消息都做fsync的處理,可能僅僅保存到cache中而不是物理磁盤上屡久,在這段時間內(nèi)RabbitMQ broker發(fā)生crash忆首,消息保存到cache但是還沒來得及落盤,那么這些消息將會丟失被环。
那么這個怎么解決呢糙及?首先可以引入RabbitMQ的鏡像隊列,這個相當(dāng)于配置了副本筛欢,當(dāng)master在此特殊時間內(nèi)crash掉浸锨,可以自動切換到slave,這樣有效的保障了HA版姑,除非整個集群都掛掉柱搜,這樣也不能完全的100%保障RabbitMQ不丟消息,但比沒有鏡像隊列的要好很多剥险,很多現(xiàn)實(shí)生產(chǎn)環(huán)境下都是配置了鏡像隊列的聪蘸。
RabbitMQ的可靠性除了Exchange、Queue表制、Message的持久化健爬,還涉及producer端的確認(rèn)機(jī)制、broker端的鏡像隊列的配置以及consumer端的確認(rèn)機(jī)制么介,要想確保消息的可靠性越高娜遵,那么性能也會隨之而降,魚和熊掌不可兼得夭拌,關(guān)鍵在于選擇和取舍魔熏。