【MQ】可靠消息


title: 【MQ】可靠消息
date: 2017-12-08 21:55:53
tags: MQ
categories: MQ


初始【MQ】最后說到默認(rèn)情況下柒爸,消息發(fā)送后 MQ 不會(huì)向發(fā)送方確認(rèn)消息到達(dá)准浴,也不會(huì)進(jìn)行持久化處理。即在發(fā)送方眼里消息只要發(fā)出去捎稚,就不再關(guān)心消息消息了乐横。這確實(shí)做到了生產(chǎn)者與 MQ 的解耦求橄,并且效率很高。但缺點(diǎn)也非常明顯葡公,無法確定消息投遞是可靠的:

  • 正在運(yùn)行的 MQ 宕機(jī)后罐农,無法恢復(fù)已發(fā)送的消息(持久化問題)
  • 沒有匹配的 queue,那么消息將被 exchange 直接丟棄催什,而發(fā)送方對此毫不知情(確認(rèn)問題)
  • 消息發(fā)送過程中在網(wǎng)絡(luò)中丟失涵亏,發(fā)送方毫不知情(確認(rèn)問題)

Rabbit MQ 是被設(shè)計(jì)為金融行業(yè)服務(wù)的,在這些方面當(dāng)然有考慮蒲凶。本文將從持久化和消息確認(rèn)兩方面來了解 Rabbit MQ 的可靠消息實(shí)踐气筋。

持久化

為了確保消息在 MQ 各個(gè)環(huán)節(jié)的不丟失,需要將 exchange, queue, 投遞方式都進(jìn)行持久化聲明旋圆。具體持久化的方式很簡單宠默,調(diào)用 API 就可以了。

exchange 持久化

exchange 聲明時(shí)灵巧,將 durable 設(shè)置為 true 就可以了搀矫。這順便看一下 exchange 創(chuàng)建方法

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) 
  throws IOException;

Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, 
                                   boolean autoDelete,Map<String, Object> arguments) 
  throws IOException;

Exchange.DeclareOk exchangeDeclare(String exchange, String type) 
  throws IOException;

Exchange.DeclareOk exchangeDeclare(String exchange,  // 交換器名稱
                                   String type,  // 交換器類型
                                   boolean durable, // 是否持久化
                                   boolean autoDelete,  // 是否自動(dòng)刪除
                                   boolean internal,  // 內(nèi)部
                                   Map<String, Object> arguments  // 其他構(gòu)造參數(shù)
                                  ) throws IOException;

// 等價(jià)于 exchangeDeclare 方法設(shè)置 nowait 參數(shù)
void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete,
                           boolean internal, Map<String, Object> arguments) 
  throws IOException;

// 被動(dòng)聲明隊(duì)列,聲明前先檢查
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

exchange 聲明持久化后只能確保重啟后 exchange 重新創(chuàng)建刻肄。否則 exchange 將丟失瓤球,生產(chǎn)者就無法正常發(fā)送消息了。

queue 持久化

queue 持久化也是一樣的套路敏弃,將 durable 設(shè)置為 true 就可以了卦羡。queue 創(chuàng)建的 AIP:

Queue.DeclareOk queueDeclare() throws IOException;

Queue.DeclareOk queueDeclare(String queue,  // queue 名稱 
                             boolean durable,  // 持久化
                             boolean exclusive,  // 排他隊(duì)列
                             boolean autoDelete,  // 自動(dòng)刪除
                             Map<String, Object> arguments  // 其他構(gòu)造參數(shù)
                            ) throws IOException;

void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, 
                        Map<String, Object> arguments) throws IOException;

Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

對 durable 沒什么好說的,確保重啟后 queue 重新創(chuàng)建权她,但消息無法恢復(fù)虹茶,消息的持久化依賴于投遞方式的持久化。

注意一下 exclusive 參數(shù):一個(gè)隊(duì)列被聲明為排他隊(duì)列隅要,該隊(duì)列僅對首次申明它的連接可見蝴罪,并在連接斷開時(shí)自動(dòng)刪除:

  1. 排他隊(duì)列是基于連接可見的,同一連接的不同信道是可以同時(shí)訪問同一連接創(chuàng)建的排他隊(duì)列步清;
  2. “首次”要门,如果一個(gè)連接已經(jīng)聲明了一個(gè)排他隊(duì)列,其他連接是不允許建立同名的排他隊(duì)列的廓啊,這個(gè)與普通隊(duì)列不同欢搜;
  3. 即使該隊(duì)列是持久化的,一旦連接關(guān)閉或者客戶端退出谴轮,該排他隊(duì)列都會(huì)被自動(dòng)刪除的炒瘟,這種隊(duì)列適用于一個(gè)客戶端發(fā)送讀取消息的應(yīng)用場景。

投遞方式持久化聲明

套路基本一致第步,還是看 API:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 
  throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
                  byte[] body)throws IOException;

void basicPublish(String exchange,  // 交換器
                  String routingKey,  // routing key
                  boolean mandatory,  // 消息確認(rèn)
                  boolean immediate,  // 廢棄
                  BasicProperties props,  // 參數(shù)
                  byte[] body  // 消息有效負(fù)載
                 ) throws IOException;

持久化的參數(shù)包含在 BasicProperties 定義中:

public static class BasicProperties extends AMQBasicProperties {
    private String contentType;  // 消息類型
    private String contentEncoding;  // 編碼
    private Map<String, Object> headers;
    private Integer deliveryMode;  // 持久化疮装。1:非持久化缘琅;2:持久化
    private Integer priority;  // 優(yōu)先級
    private String correlationId;
    private String replyTo;  // 反饋隊(duì)列
    private String expiration;  // expiration到期時(shí)間
    private String messageId;
    private Date timestamp;
    private String type;
    private String userId;
    private String appId;
    private String clusterId;
    // 省略方法   
}    

BasicProperties 的構(gòu)造除了提供默認(rèn)的方法外,對常用的參數(shù)可以直接獲得廓推,還支持使用 builder 模式構(gòu)造刷袍。

如果單獨(dú)持久化投遞方式,重啟后因?yàn)榻粨Q器樊展、隊(duì)列已不存在所以毫無意義

持久化的影響

  • 性能

    《Rabbit MQ 實(shí)戰(zhàn)》 一書在說明持久化對性能影響時(shí)呻纹,舉例:“使用持久化機(jī)制而導(dǎo)致消息吞吐量降低至少 10 倍的情況并不少見”。這個(gè)說法還是很讓我震驚的专缠,很好奇 Rabbit MQ 的持久化策略是怎么做的影響這么大雷酪,還是說非持久化策略太優(yōu)秀了,以至于磁盤性能極大影響了整體吞吐量藤肢。這里挖個(gè)坑太闺,爭取以后看看內(nèi)部實(shí)現(xiàn)吧,畢竟 erlang 對我是個(gè)大問題嘁圈。

  • 集群模式下工作的不好

    暫時(shí)不清楚集群模式下的影響,先 mark 一下

  • 依舊無法 100% 數(shù)據(jù)不丟失

    即使 exchange蟀淮,queue最住,投遞方式都進(jìn)行持久化聲明依舊不能做到 100% 數(shù)據(jù)不丟失,原因有二:

    1. Rabbit MQ 不是為每條消息進(jìn)行 fsync(同步 IO) 處理

      依舊可能出現(xiàn)掛掉時(shí)有消息沒有持久化的情況怠惶,解決有兩種方式:鏡像隊(duì)列和消息確認(rèn)

    2. 看到網(wǎng)上有提到 erlang 寫文件的實(shí)時(shí)問題涨缚,不懂,先 mark策治,待求證

消息確認(rèn)

消息確認(rèn)可以分為生產(chǎn)者確認(rèn)消息正確投遞和消費(fèi)者確認(rèn)消息正確接收脓魏,對 Rabbit MQ 有三種更具體的情況:

  • confire/事務(wù):確認(rèn)消息到達(dá) broker,避免消息在生產(chǎn)者發(fā)出后丟失
  • 客戶端 ACK:確認(rèn)消費(fèi)者接收消息通惫,避免消息在消息隊(duì)列發(fā)出后丟失
  • mandatory/immediate:確認(rèn)消息到達(dá)隊(duì)列茂翔,避免到達(dá)交換器后找不到隊(duì)列而丟棄

事務(wù)/confire

事務(wù)

確認(rèn)消息成功被 exchange 接收。事務(wù)是 AMQP 協(xié)議內(nèi)定義的履腋, Rabbit MQ 也做了相應(yīng)的實(shí)現(xiàn)珊燎。與事務(wù)相關(guān)有三個(gè)方法,具體使用的模板:

try {
  channel.txSelect();
  channel.basicPublish(...);
  channel.txCommit();
} catch (Exception e) {
  e.printStackTrace();
  channel.txRollback();
}

事務(wù)缺點(diǎn):最大的問題是執(zhí)行前后需要開啟事務(wù)遵湖,提交/回滾事務(wù)悔政,而這幾個(gè)過程又必須是同步的因此會(huì)造成很大的性能問題

confire

confire 是 Rabbit MQ 為解決事務(wù)性能問題設(shè)計(jì)的確認(rèn)機(jī)制,主要的做法是為每條消息都設(shè)置唯一 ID 且 ID 以 1 為步長生序延旧,MQ 通過發(fā)送 ACK, NACK 異步確認(rèn)消息是否到達(dá)交換器谋国。

網(wǎng)上普遍對 confire 的描述都集中在異步性上。除了異步迁沫,可以設(shè)置 basic.ack 的 multiple 域進(jìn)行累計(jì)確認(rèn)芦瘾,這有點(diǎn) TCP 的確認(rèn)方式捌蚊。

confire 最大的問題是無法回滾,導(dǎo)致生產(chǎn)者本身也不確定消息是否放成功旅急。如果程序需要實(shí)現(xiàn)類似回滾功能逢勾,則維護(hù)一個(gè) unconfire 消息的集合,每次收到 ACK/NACK 時(shí)更新集合(還需要考慮是否是累計(jì)確認(rèn))

我使用了三種方式實(shí)現(xiàn) confire 并進(jìn)行對比:

  • 對每條消息要求接收對應(yīng)的 confire 消息
  • 對一組消息要求接收一條 confire 消息
  • 使用監(jiān)聽器完全異步的接收 confire 消息

不出意外的第三種方式的性能是最好的藐吮。

客戶端 ACK

聲明隊(duì)列時(shí)指定 noAck 參數(shù):

  • noAck=false:Rabbit MQ 向消費(fèi)者發(fā)出消息后等待消費(fèi)者顯式發(fā)出 ack 信號后才移除消息
  • noAck=true:Rabbit MQ 向消費(fèi)者發(fā)出消息后立即移除消息

當(dāng)設(shè)置隊(duì)列 noAck 為 false 時(shí)溺拱,客戶端必須根據(jù)消息的處理情況向 MQ 反饋,默認(rèn)情況下 會(huì)自動(dòng)確認(rèn)谣辞。如果希望手動(dòng)確認(rèn)需要關(guān)閉自動(dòng)確認(rèn)迫摔。

客戶端除了 ACK 為還可以向 MQ 反饋其他信息,反饋的 API 分別有:

  • channel.basicAck:向 MQ 確認(rèn)消息正確接收
  • channel.basicRecover:向 MQ 確認(rèn)消息需要重發(fā)泥从,可以根據(jù)參數(shù)重發(fā)給當(dāng)前消費(fèi)者或重新入隊(duì)
  • channel.basicReject:向 MQ 確認(rèn)消息退回
  • channel.basicNack:向 MQ 確認(rèn)批量退回消息句占,可以根據(jù)參數(shù)選擇是否批量

mandatory/immediate

mandatory

mandatory 設(shè)置為 true 時(shí):MQ 至少將該消息路由到至少一個(gè)隊(duì)列中,否則將消息返還給生產(chǎn)者

mandatory 實(shí)現(xiàn)時(shí)只需要:

  1. 投遞消息時(shí)設(shè)置 mandatory 參數(shù)為true

    void basicPublish(String exchange,  // 交換器
                  String routingKey,  // routing key
                  boolean mandatory,  // 消息確認(rèn)
                  boolean immediate,  // 廢棄
                  BasicProperties props,  // 參數(shù)
                  byte[] body  // 消息有效負(fù)載
                 ) throws IOException;
    
  2. 設(shè)置監(jiān)聽器

    channel.addReturnListener(new ReturnListener() {
        public void handleReturn(int replyCode, String replyText, String exchange,
                                 String routingKey, AMQP.BasicProperties basicProperties,
                                 byte[] body) throws IOException {
                                   // TODO
                                 }
    });
    

當(dāng)消息沒有被正確路由到至少一個(gè)隊(duì)列時(shí)躯嫉,AMQP協(xié)議會(huì)返回對應(yīng)消息纱烘,監(jiān)聽器內(nèi)的代碼將被調(diào)用;

當(dāng)消息正確投遞祈餐,什么也不發(fā)生

immediate

Rabbit MQ 3.0 之后已移除擂啥。設(shè)置為 true 時(shí):消息路由到 queue 前,如果 queue 有消費(fèi)者帆阳,則馬上將消息投遞給 queue哺壶,否則直接把消息返還給生產(chǎn)者,消息不再入隊(duì)蜒谤。


參考:

《Rabbit MQ 實(shí)戰(zhàn)》

RabbitMQ(二):mandatory標(biāo)志的作用

RabbitMQ:Publisher的消息確認(rèn)機(jī)制

RabbitMQ之mandatory和immediate

RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)

rabbitMq生產(chǎn)者角度:消息持久化山宾、事務(wù)機(jī)制、PublisherConfirm鳍徽、mandatory

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末资锰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子旬盯,更是在濱河造成了極大的恐慌台妆,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,451評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胖翰,死亡現(xiàn)場離奇詭異接剩,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)萨咳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,172評論 3 394
  • 文/潘曉璐 我一進(jìn)店門懊缺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事鹃两∫抛” “怎么了?”我有些...
    開封第一講書人閱讀 164,782評論 0 354
  • 文/不壞的土叔 我叫張陵俊扳,是天一觀的道長途蒋。 經(jīng)常有香客問我,道長馋记,這世上最難降的妖魔是什么号坡? 我笑而不...
    開封第一講書人閱讀 58,709評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮梯醒,結(jié)果婚禮上宽堆,老公的妹妹穿的比我還像新娘。我一直安慰自己茸习,他們只是感情好畜隶,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,733評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著号胚,像睡著了一般籽慢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上猫胁,一...
    開封第一講書人閱讀 51,578評論 1 305
  • 那天嗡综,我揣著相機(jī)與錄音,去河邊找鬼杜漠。 笑死,一個(gè)胖子當(dāng)著我的面吹牛察净,可吹牛的內(nèi)容都是我干的驾茴。 我是一名探鬼主播,決...
    沈念sama閱讀 40,320評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼氢卡,長吁一口氣:“原來是場噩夢啊……” “哼锈至!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起译秦,我...
    開封第一講書人閱讀 39,241評論 0 276
  • 序言:老撾萬榮一對情侶失蹤峡捡,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后筑悴,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體们拙,經(jīng)...
    沈念sama閱讀 45,686評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,878評論 3 336
  • 正文 我和宋清朗相戀三年阁吝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了砚婆。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,992評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡突勇,死狀恐怖装盯,靈堂內(nèi)的尸體忽然破棺而出坷虑,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 35,715評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站讼溺,受9級特大地震影響厘惦,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜洪乍,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,336評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧党窜,春花似錦、人聲如沸借宵。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,912評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壤玫。三九已至豁护,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間欲间,已是汗流浹背楚里。 一陣腳步聲響...
    開封第一講書人閱讀 33,040評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留猎贴,地道東北人班缎。 一個(gè)月前我還...
    沈念sama閱讀 48,173評論 3 370
  • 正文 我出身青樓她渴,卻偏偏與公主長得像达址,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子趁耗,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,947評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化嘀韧、事務(wù)乳蛾、擁塞控...
    jiangmo閱讀 10,361評論 2 34
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 15,914評論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理激率,服務(wù)發(fā)現(xiàn),斷路器勿决,智...
    卡卡羅2017閱讀 134,657評論 18 139
  • 利用RabbitMQ集群橫向擴(kuò)展能力乒躺,均衡流量壓力,讓消息集群的秒級服務(wù)能力達(dá)到百萬低缩,Google曾做過此類實(shí)驗(yàn)嘉冒;...
    有貨技術(shù)閱讀 3,468評論 0 1
  • 每一次的上公開課對上課者是一種折磨也是一種歷練。你會(huì)逼著自己一遍又一遍地捋自己的思路,推敲板塊標(biāo)題,布置師生互動(dòng)...
    21春雨無聲閱讀 594評論 0 1