title: 【MQ】可靠消息
date: 2017-12-08 21:55:53
tags: MQ
categories: MQ
初始【MQ】最后說到默認(rèn)情況下柒爸,消息發(fā)送后 MQ 不會(huì)向發(fā)送方確認(rèn)消息到達(dá)准浴,也不會(huì)進(jìn)行持久化處理。即在發(fā)送方眼里消息只要發(fā)出去捎稚,就不再關(guān)心消息消息了乐横。這確實(shí)做到了生產(chǎn)者與 MQ 的解耦求橄,并且效率很高。但缺點(diǎn)也非常明顯葡公,無法確定消息投遞是可靠的:
- 正在運(yùn)行的 MQ 宕機(jī)后罐农,無法恢復(fù)已發(fā)送的消息(持久化問題)
- 沒有匹配的 queue,那么消息將被 exchange 直接丟棄催什,而發(fā)送方對此毫不知情(確認(rèn)問題)
- 消息發(fā)送過程中在網(wǎng)絡(luò)中丟失涵亏,發(fā)送方毫不知情(確認(rèn)問題)
Rabbit MQ 是被設(shè)計(jì)為金融行業(yè)服務(wù)的,在這些方面當(dāng)然有考慮蒲凶。本文將從持久化和消息確認(rèn)兩方面來了解 Rabbit MQ 的可靠消息實(shí)踐气筋。
持久化
為了確保消息在 MQ 各個(gè)環(huán)節(jié)的不丟失,需要將 exchange, queue, 投遞方式都進(jìn)行持久化聲明旋圆。具體持久化的方式很簡單宠默,調(diào)用 API 就可以了。
exchange 持久化
exchange 聲明時(shí)灵巧,將 durable 設(shè)置為 true 就可以了搀矫。這順便看一下 exchange 創(chuàng)建方法
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable)
throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
boolean autoDelete,Map<String, Object> arguments)
throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type)
throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, // 交換器名稱
String type, // 交換器類型
boolean durable, // 是否持久化
boolean autoDelete, // 是否自動(dòng)刪除
boolean internal, // 內(nèi)部
Map<String, Object> arguments // 其他構(gòu)造參數(shù)
) throws IOException;
// 等價(jià)于 exchangeDeclare 方法設(shè)置 nowait 參數(shù)
void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments)
throws IOException;
// 被動(dòng)聲明隊(duì)列,聲明前先檢查
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
exchange 聲明持久化后只能確保重啟后 exchange 重新創(chuàng)建刻肄。否則 exchange 將丟失瓤球,生產(chǎn)者就無法正常發(fā)送消息了。
queue 持久化
queue 持久化也是一樣的套路敏弃,將 durable 設(shè)置為 true 就可以了卦羡。queue 創(chuàng)建的 AIP:
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, // queue 名稱
boolean durable, // 持久化
boolean exclusive, // 排他隊(duì)列
boolean autoDelete, // 自動(dòng)刪除
Map<String, Object> arguments // 其他構(gòu)造參數(shù)
) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
對 durable 沒什么好說的,確保重啟后 queue 重新創(chuàng)建权她,但消息無法恢復(fù)虹茶,消息的持久化依賴于投遞方式的持久化。
注意一下 exclusive 參數(shù):一個(gè)隊(duì)列被聲明為排他隊(duì)列隅要,該隊(duì)列僅對首次申明它的連接可見蝴罪,并在連接斷開時(shí)自動(dòng)刪除:
- 排他隊(duì)列是基于連接可見的,同一連接的不同信道是可以同時(shí)訪問同一連接創(chuàng)建的排他隊(duì)列步清;
- “首次”要门,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的廓啊,這個(gè)與普通隊(duì)列不同欢搜;
- 即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出谴轮,該排他隊(duì)列都會(huì)被自動(dòng)刪除的炒瘟,這種隊(duì)列適用于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場景。
投遞方式持久化聲明
套路基本一致第步,還是看 API:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)throws IOException;
void basicPublish(String exchange, // 交換器
String routingKey, // routing key
boolean mandatory, // 消息確認(rèn)
boolean immediate, // 廢棄
BasicProperties props, // 參數(shù)
byte[] body // 消息有效負(fù)載
) throws IOException;
持久化的參數(shù)包含在 BasicProperties 定義中:
public static class BasicProperties extends AMQBasicProperties {
private String contentType; // 消息類型
private String contentEncoding; // 編碼
private Map<String, Object> headers;
private Integer deliveryMode; // 持久化疮装。1:非持久化缘琅;2:持久化
private Integer priority; // 優(yōu)先級
private String correlationId;
private String replyTo; // 反饋隊(duì)列
private String expiration; // expiration到期時(shí)間
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
// 省略方法
}
BasicProperties 的構(gòu)造除了提供默認(rèn)的方法外,對常用的參數(shù)可以直接獲得廓推,還支持使用 builder 模式構(gòu)造刷袍。
如果單獨(dú)持久化投遞方式,重啟后因?yàn)榻粨Q器樊展、隊(duì)列已不存在所以毫無意義
持久化的影響
-
性能
《Rabbit MQ 實(shí)戰(zhàn)》 一書在說明持久化對性能影響時(shí)呻纹,舉例:“使用持久化機(jī)制而導(dǎo)致消息吞吐量降低至少 10 倍的情況并不少見”。這個(gè)說法還是很讓我震驚的专缠,很好奇 Rabbit MQ 的持久化策略是怎么做的影響這么大雷酪,還是說非持久化策略太優(yōu)秀了,以至于磁盤性能極大影響了整體吞吐量藤肢。這里挖個(gè)坑太闺,爭取以后看看內(nèi)部實(shí)現(xiàn)吧,畢竟 erlang 對我是個(gè)大問題嘁圈。
-
集群模式下工作的不好
暫時(shí)不清楚集群模式下的影響,先 mark 一下
-
依舊無法 100% 數(shù)據(jù)不丟失
即使 exchange蟀淮,queue最住,投遞方式都進(jìn)行持久化聲明依舊不能做到 100% 數(shù)據(jù)不丟失,原因有二:
-
Rabbit MQ 不是為每條消息進(jìn)行 fsync(同步 IO) 處理
依舊可能出現(xiàn)掛掉時(shí)有消息沒有持久化的情況怠惶,解決有兩種方式:鏡像隊(duì)列和消息確認(rèn)
看到網(wǎng)上有提到 erlang 寫文件的實(shí)時(shí)問題涨缚,不懂,先 mark策治,待求證
-
消息確認(rèn)
消息確認(rèn)可以分為生產(chǎn)者確認(rèn)消息正確投遞和消費(fèi)者確認(rèn)消息正確接收脓魏,對 Rabbit MQ 有三種更具體的情況:
- confire/事務(wù):確認(rèn)消息到達(dá) broker,避免消息在生產(chǎn)者發(fā)出后丟失
- 客戶端 ACK:確認(rèn)消費(fèi)者接收消息通惫,避免消息在消息隊(duì)列發(fā)出后丟失
- mandatory/immediate:確認(rèn)消息到達(dá)隊(duì)列茂翔,避免到達(dá)交換器后找不到隊(duì)列而丟棄
事務(wù)/confire
事務(wù)
確認(rèn)消息成功被 exchange 接收。事務(wù)是 AMQP 協(xié)議內(nèi)定義的履腋, Rabbit MQ 也做了相應(yīng)的實(shí)現(xiàn)珊燎。與事務(wù)相關(guān)有三個(gè)方法,具體使用的模板:
try {
channel.txSelect();
channel.basicPublish(...);
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
事務(wù)缺點(diǎn):最大的問題是執(zhí)行前后需要開啟事務(wù)遵湖,提交/回滾事務(wù)悔政,而這幾個(gè)過程又必須是同步的因此會(huì)造成很大的性能問題
confire
confire 是 Rabbit MQ 為解決事務(wù)性能問題設(shè)計(jì)的確認(rèn)機(jī)制,主要的做法是為每條消息都設(shè)置唯一 ID 且 ID 以 1 為步長生序延旧,MQ 通過發(fā)送 ACK, NACK 異步確認(rèn)消息是否到達(dá)交換器谋国。
網(wǎng)上普遍對 confire 的描述都集中在異步性上。除了異步迁沫,可以設(shè)置 basic.ack 的 multiple 域進(jìn)行累計(jì)確認(rèn)芦瘾,這有點(diǎn) TCP 的確認(rèn)方式捌蚊。
confire 最大的問題是無法回滾,導(dǎo)致生產(chǎn)者本身也不確定消息是否放成功旅急。如果程序需要實(shí)現(xiàn)類似回滾功能逢勾,則維護(hù)一個(gè) unconfire 消息的集合,每次收到 ACK/NACK 時(shí)更新集合(還需要考慮是否是累計(jì)確認(rèn))
我使用了三種方式實(shí)現(xiàn) confire 并進(jìn)行對比:
- 對每條消息要求接收對應(yīng)的 confire 消息
- 對一組消息要求接收一條 confire 消息
- 使用監(jiān)聽器完全異步的接收 confire 消息
不出意外的第三種方式的性能是最好的藐吮。
客戶端 ACK
聲明隊(duì)列時(shí)指定 noAck 參數(shù):
- noAck=false:Rabbit MQ 向消費(fèi)者發(fā)出消息后等待消費(fèi)者顯式發(fā)出 ack 信號后才移除消息
- noAck=true:Rabbit MQ 向消費(fèi)者發(fā)出消息后立即移除消息
當(dāng)設(shè)置隊(duì)列 noAck 為 false 時(shí)溺拱,客戶端必須根據(jù)消息的處理情況向 MQ 反饋,默認(rèn)情況下 會(huì)自動(dòng)確認(rèn)谣辞。如果希望手動(dòng)確認(rèn)需要關(guān)閉自動(dòng)確認(rèn)迫摔。
客戶端除了 ACK 為還可以向 MQ 反饋其他信息,反饋的 API 分別有:
- channel.basicAck:向 MQ 確認(rèn)消息正確接收
- channel.basicRecover:向 MQ 確認(rèn)消息需要重發(fā)泥从,可以根據(jù)參數(shù)重發(fā)給當(dāng)前消費(fèi)者或重新入隊(duì)
- channel.basicReject:向 MQ 確認(rèn)消息退回
- channel.basicNack:向 MQ 確認(rèn)批量退回消息句占,可以根據(jù)參數(shù)選擇是否批量
mandatory/immediate
mandatory
mandatory 設(shè)置為 true 時(shí):MQ 至少將該消息路由到至少一個(gè)隊(duì)列中,否則將消息返還給生產(chǎn)者
mandatory 實(shí)現(xiàn)時(shí)只需要:
-
投遞消息時(shí)設(shè)置 mandatory 參數(shù)為true
void basicPublish(String exchange, // 交換器 String routingKey, // routing key boolean mandatory, // 消息確認(rèn) boolean immediate, // 廢棄 BasicProperties props, // 參數(shù) byte[] body // 消息有效負(fù)載 ) throws IOException;
-
設(shè)置監(jiān)聽器
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { // TODO } });
當(dāng)消息沒有被正確路由到至少一個(gè)隊(duì)列時(shí)躯嫉,AMQP協(xié)議會(huì)返回對應(yīng)消息纱烘,監(jiān)聽器內(nèi)的代碼將被調(diào)用;
當(dāng)消息正確投遞祈餐,什么也不發(fā)生
immediate
Rabbit MQ 3.0 之后已移除擂啥。設(shè)置為 true 時(shí):消息路由到 queue 前,如果 queue 有消費(fèi)者帆阳,則馬上將消息投遞給 queue哺壶,否則直接把消息返還給生產(chǎn)者,消息不再入隊(duì)蜒谤。
參考:
《Rabbit MQ 實(shí)戰(zhàn)》
RabbitMQ(二):mandatory標(biāo)志的作用
RabbitMQ:Publisher的消息確認(rèn)機(jī)制
RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
rabbitMq生產(chǎn)者角度:消息持久化山宾、事務(wù)機(jī)制、PublisherConfirm鳍徽、mandatory