RabbitMQ——如何保證消息發(fā)送的可靠性

[TOC]

一鸭巴、RabbitMQ消息發(fā)送機(jī)制

消息由生產(chǎn)者生產(chǎn)鹃祖,首先并發(fā)送到交換機(jī)(Exchange),然后由交換機(jī)根據(jù)指定的路由規(guī)則將消息路由到不同的隊(duì)列(Queue)中校读。最后由不同的消費(fèi)者去消費(fèi)處理歉秫。

RabbitMQ

根據(jù)RabbitMQ的機(jī)制分析雁芙,想要確保消息發(fā)送的可靠性兔甘,需要保證兩個(gè)方面:

  • 確認(rèn)消息到達(dá)交換機(jī)洞焙,publish
  • 確認(rèn)消息到達(dá)隊(duì)列,routes

這兩步如果全成功了熔任,說明消息已經(jīng)成功發(fā)送笋敞;任何一步出現(xiàn)問題荠瘪,消息就沒發(fā)送成功。

二喷兼、方案1: 開啟事務(wù)機(jī)制

在SpringBoot項(xiàng)目中可以通過開啟RabbitMQ事務(wù)機(jī)制的方式解決季惯,具體操作如下勉抓。

  1. 提供一個(gè)事物管理器

    @Bean
    RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
    
  2. 生產(chǎn)者上添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式

    @Service
    public class MsgService {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        // 添加事務(wù)注解
        @Transactional
        public void send() {
            // 開啟事務(wù)模式
            rabbitTemplate.setChannelTransacted(true);
            rabbitTemplate.convertAndSend("ecchange", "queue", "Hello World!");
            throw new RuntimeException();
        }
    }
    

開啟事務(wù)模式以后纵散,RabbitMQ生產(chǎn)者發(fā)送消息要經(jīng)過如下4步:

  1. 客戶端請(qǐng)求伍掀,將信道設(shè)置為事務(wù)模式蜜笤;
  2. 服務(wù)端響應(yīng)瘩例,設(shè)置完成垛贤;
  3. 客戶端發(fā)送消息趣倾;
  4. 客戶端提交事務(wù);
  5. 服務(wù)端響應(yīng)善绎,確認(rèn)事務(wù)提交禀酱。

正常情況我們發(fā)送消息只需要第三步剂跟,可以看出上面的步驟比較復(fù)雜曹洽,所以事務(wù)模式效率不是很高送淆。

三辟拷、方案2: 發(fā)送方確認(rèn)

添加配置阐斜,啟用發(fā)送方確認(rèn)衫冻。注意,發(fā)送方確認(rèn)和事務(wù)不能同時(shí)存在智听,會(huì)報(bào)錯(cuò)羽杰。

spring:
  rabbitmq:
    # 消息到達(dá)交換器確認(rèn)回調(diào)
    publisher-confirm-type: correlated
    # 消息到達(dá)隊(duì)列回調(diào)
    publisher-returns: true

publisher-confirm-type枚舉如下:

public enum ConfirmType {

  /**
    * Use {@code RabbitTemplate#waitForConfirms()} (or {@code waitForConfirmsOrDie()}
    * within scoped operations.
    */
    SIMPLE,

    /**
  * Use with {@code CorrelationData} to correlate confirmations with sent
  * messsages.
  */
    CORRELATED,

  /**
    * Publisher confirms are disabled (default).
    */
    NONE

}

配置監(jiān)聽,實(shí)現(xiàn)回調(diào)

@Slf4j
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    public static final String EXCHANGE_NAME = "exchange_name";
    public static final String QUEUE_NAME = "queue.name";
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue()).to(directExchange()).with(QUEUE_NAME);
    }

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * Confirmation callback.
     *
     * @param correlationData correlation data for the callback.
     * @param ack             true for ack, false for nack
     * @param cause           An optional cause, for nack, when available, otherwise null.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("{}:消息成功到達(dá)交換器", correlationData.getId());
        } else {
            log.error("{}:消息發(fā)送失敗", correlationData.getId());
        }
    }


    /**
     * Returned message callback.
     *
     * @param message    the returned message.
     * @param replyCode  the reply code.
     * @param replyText  the reply text.
     * @param exchange   the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("{}:消息未成功路由到隊(duì)列", message.getMessageProperties().getMessageId());

    }
}

三到推、方案3: 失敗重試

失敗重試分兩種情況考赛,沒鏈接上mq的鏈接重試和鏈接上了發(fā)送失敗的發(fā)送重試。

1. 鏈接重試

利用Spring Boot自帶的重試機(jī)制莉测,直接通過配置開啟即可:

spring:
  rabbitmq:
    template:
      retry:
        enabled: true #開啟重試機(jī)制
        initial-interval: 1000ms #重試起始時(shí)間間隔
        max-attempts: 10 #最大重試次數(shù)
        max-interval: 10000ms #最大重試時(shí)間間隔
        multiplier: 2 #時(shí)間間隔系數(shù)

配置完以后颜骤,當(dāng)MQ鏈接斷開后,Spring會(huì)進(jìn)行retry連接。retry時(shí)間間隔為起始時(shí)間間隔*系數(shù)

2. 發(fā)送重試

發(fā)送重試主要針對(duì)的是消息沒有到達(dá)交換器祟绊。總體思路就是:利用消息確認(rèn)機(jī)制,當(dāng)消息沒有到達(dá)交換器時(shí),就會(huì)走失敗回調(diào)厚满,在回調(diào)方法中進(jìn)行相應(yīng)的業(yè)務(wù)補(bǔ)償處理即可丰榴。

利用數(shù)據(jù)庫(kù)存儲(chǔ)發(fā)送的消息記錄,創(chuàng)建數(shù)據(jù)表,大致字段內(nèi)容如下:

字段名 字段類型 字段是否為空 默認(rèn)值 備注
id bigint(20) unsigned N 主鍵仆邓,消息id
content text N 消息內(nèi)容
state tinyint(1) N 狀態(tài):0-發(fā)送中嗓蘑,1-成功,2-失敗
route_key varchar(255) N 路由key
exchange varcher(255) N 交換機(jī)
count tinyint N 重試次數(shù)
try_time datetime N 重試時(shí)間
create_time datetime N CURRENT_TIMESTAMP 創(chuàng)建時(shí)間
update_time datetime N CURRENT_TIMESTAMP 更新時(shí)間
del_flag tinyint(1) N 邏輯刪除

操作步驟如下:

  1. 消息發(fā)送前巷燥,向表中插入消息發(fā)送記錄,狀態(tài)為0抛姑,try_time根據(jù)實(shí)際情況設(shè)置即可蔬啡。
  2. 在confirm回調(diào)方法中顽腾,收到發(fā)送成功的回調(diào)漓摩,則將該消息的狀態(tài)修改為1啃炸。
  3. 通過定時(shí)job掃描發(fā)送記錄,篩選出狀態(tài)為0融击,并且過了重試時(shí)間的消息匣屡,重新發(fā)送。重試次數(shù)根據(jù)實(shí)際情況判斷怔接。

注意瓦侮,在發(fā)送的時(shí)候會(huì)出現(xiàn)重復(fù)發(fā)送的情況斋泄,所以在消費(fèi)的時(shí)候需要做好冪等摔认。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末抹蚀,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌薄扁,老刑警劉巖九默,帶你破解...
    沈念sama閱讀 217,509評(píng)論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)题山,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人舔琅,你說我怎么就攤上這事”蛤荆” “怎么了课蔬?”我有些...
    開封第一講書人閱讀 163,875評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)郊尝。 經(jīng)常有香客問我二跋,道長(zhǎng)流昏,這世上最難降的妖魔是什么吞获? 我笑而不...
    開封第一講書人閱讀 58,441評(píng)論 1 293
  • 正文 為了忘掉前任谚鄙,我火速辦了婚禮,結(jié)果婚禮上烤黍,老公的妹妹穿的比我還像新娘。我一直安慰自己速蕊,他們只是感情好娘赴,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著筝闹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪糊秆。 梳的紋絲不亂的頭發(fā)上议双,一...
    開封第一講書人閱讀 51,365評(píng)論 1 302
  • 那天,我揣著相機(jī)與錄音平痰,去河邊找鬼。 笑死昂芜,一個(gè)胖子當(dāng)著我的面吹牛赔蒲,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播舞虱,決...
    沈念sama閱讀 40,190評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼损趋!你這毒婦竟也來了椅寺?” 一聲冷哼從身側(cè)響起浑槽,我...
    開封第一講書人閱讀 39,062評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤括荡,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后畸冲,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,500評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡算行,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評(píng)論 3 335
  • 正文 我和宋清朗相戀三年苫耸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片褪子。...
    茶點(diǎn)故事閱讀 39,834評(píng)論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡嫌褪,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出笼痛,到底是詐尸還是另有隱情,我是刑警寧澤缨伊,帶...
    沈念sama閱讀 35,559評(píng)論 5 345
  • 正文 年R本政府宣布刻坊,位于F島的核電站枷恕,受9級(jí)特大地震影響紧唱,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜漏益,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評(píng)論 3 328
  • 文/蒙蒙 一绰疤、第九天 我趴在偏房一處隱蔽的房頂上張望舞终。 院中可真熱鬧轻庆,春花似錦、人聲如沸纷宇。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)桩砰。三九已至,卻和暖如春亚隅,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背懂鸵。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工行疏, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人隘擎。 一個(gè)月前我還...
    沈念sama閱讀 47,958評(píng)論 2 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像采幌,于是被迫代替她去往敵國(guó)和親毁欣。 傳聞我的和親對(duì)象是個(gè)殘疾皇子怠噪,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容