四種策略確保 RabbitMQ 消息發(fā)送可靠性嘴纺!你用哪種败晴?

微服務(wù)可以設(shè)計(jì)成消息驅(qū)動(dòng)的微服務(wù)栽渴,響應(yīng)式系統(tǒng)也可以基于消息中間件來做尖坤,從這個(gè)角度來說,在互聯(lián)網(wǎng)應(yīng)用開發(fā)中闲擦,消息中間件真的是太重要了慢味。

今天,以 RabbitMQ 為例佛致,來和大家聊一聊消息中間消息發(fā)送可靠性的問題贮缕。

注意,以下內(nèi)容我主要和大家討論如何確保消息生產(chǎn)者將消息發(fā)送成功俺榆,并不涉及消息消費(fèi)的問題感昼。

1. RabbitMQ 消息發(fā)送機(jī)制

大家知道,RabbitMQ 中的消息發(fā)送引入了 Exchange(交換機(jī))的概念罐脊,消息的發(fā)送首先到達(dá)交換機(jī)上定嗓,然后再根據(jù)既定的路由規(guī)則,由交換機(jī)將消息路由到不同的 Queue(隊(duì)列)中萍桌,再由不同的消費(fèi)者去消費(fèi)宵溅。

大致的流程就是這樣,所以要確保消息發(fā)送的可靠性上炎,主要從兩方面去確認(rèn):

  1. 消息成功到達(dá) Exchange
  2. 消息成功到達(dá) Queue

如果能確認(rèn)這兩步恃逻,那么我們就可以認(rèn)為消息發(fā)送成功了。

如果這兩步中任一步驟出現(xiàn)問題藕施,那么消息就沒有成功送達(dá)寇损,此時(shí)我們可能要通過重試等方式去重新發(fā)送消息,多次重試之后裳食,如果消息還是不能到達(dá)矛市,則可能就需要人工介入了。

經(jīng)過上面的分析诲祸,我們可以確認(rèn)浊吏,要確保消息成功發(fā)送而昨,我們只需要做好三件事就可以了:

  1. 確認(rèn)消息到達(dá) Exchange。
  2. 確認(rèn)消息到達(dá) Queue找田。
  3. 開啟定時(shí)任務(wù)歌憨,定時(shí)投遞那些發(fā)送失敗的消息。

2. RabbitMQ 的努力

上面提出的三個(gè)步驟午阵,第三步需要我們自己實(shí)現(xiàn)躺孝,前兩步 RabbitMQ 則有現(xiàn)成的解決方案。

如何確保消息成功到達(dá) RabbitMQ底桂?RabbitMQ 給出了兩種方案:

  1. 開啟事務(wù)機(jī)制
  2. 發(fā)送方確認(rèn)機(jī)制

這是兩種不同的方案,不可以同時(shí)開啟惧眠,只能選擇其中之一籽懦,如果兩者同時(shí)開啟,則會報(bào)如下錯(cuò)誤:

我們分別來看氛魁。以下所有案例都在 Spring Boot 中展開暮顺,文末可以下載相關(guān)源碼。

2.1 開啟事務(wù)機(jī)制

Spring Boot 中開啟 RabbitMQ 事務(wù)機(jī)制的方式如下:

首先需要先提供一個(gè)事務(wù)管理器秀存,如下:

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

接下來捶码,在消息生產(chǎn)者上面做兩件事:添加事務(wù)注解并設(shè)置通信信道為事務(wù)模式:

@Service
public class MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Transactional
    public void send() {
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
        int i = 1 / 0;
    }
}

這里注意兩點(diǎn):

  1. 發(fā)送消息的方法上添加 @Transactional 注解標(biāo)記事務(wù)。
  2. 調(diào)用 setChannelTransacted 方法設(shè)置為 true 開啟事務(wù)模式米同。

這就 OK 了希俩。

在上面的案例中纫雁,我們在結(jié)尾來了個(gè) 1/0 ,這在運(yùn)行時(shí)必然拋出異常祈纯,我們可以嘗試運(yùn)行該方法,發(fā)現(xiàn)消息并未發(fā)送成功叼耙。

當(dāng)我們開啟事務(wù)模式之后腕窥,RabbitMQ 生產(chǎn)者發(fā)送消息會多出四個(gè)步驟:

  1. 客戶端發(fā)出請求,將信道設(shè)置為事務(wù)模式筛婉。
  2. 服務(wù)端給出回復(fù)簇爆,同意將信道設(shè)置為事務(wù)模式。
  3. 客戶端發(fā)送消息爽撒。
  4. 客戶端提交事務(wù)入蛆。
  5. 服務(wù)端給出響應(yīng),確認(rèn)事務(wù)提交匆浙。

上面的步驟安寺,除了第三步是本來就有的,其他幾個(gè)步驟都是平白無故多出來的首尼。所以大家看到挑庶,事務(wù)模式其實(shí)效率有點(diǎn)低言秸,這并非一個(gè)最佳解決方案。我們可以想想迎捺,什么項(xiàng)目會用到消息中間件举畸?一般來說都是一些高并發(fā)的項(xiàng)目,這個(gè)時(shí)候并發(fā)性能尤為重要凳枝。

所以抄沮,RabbitMQ 還提供了發(fā)送方確認(rèn)機(jī)制(publisher confirm)來確保消息發(fā)送成功,這種方式岖瑰,性能要遠(yuǎn)遠(yuǎn)高于事務(wù)模式叛买,一起來看下。

2.2 發(fā)送方確認(rèn)機(jī)制

2.2.1 單條消息處理

首先我們移除剛剛關(guān)于事務(wù)的代碼蹋订,然后在 application.properties 中配置開啟消息發(fā)送方確認(rèn)機(jī)制率挣,如下:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

第一行是配置消息到達(dá)交換器的確認(rèn)回調(diào),第二行則是配置消息到達(dá)隊(duì)列的回調(diào)露戒。

第一行屬性的配置有三個(gè)取值:

  1. none:表示禁用發(fā)布確認(rèn)模式椒功,默認(rèn)即此。
  2. correlated:表示成功發(fā)布消息到交換器后會觸發(fā)的回調(diào)方法智什。
  3. simple:類似 correlated动漾,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的調(diào)用。

接下來我們要開啟兩個(gè)監(jiān)聽荠锭,具體配置如下:

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    Queue queue() {
        return new Queue(JAVABOY_QUEUE_NAME);
    }
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(JAVABOY_EXCHANGE_NAME);
    }
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(directExchange())
                .with(JAVABOY_QUEUE_NAME);
    }

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("{}:消息成功到達(dá)交換器",correlationData.getId());
        }else{
            logger.error("{}:消息發(fā)送失敗", correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        logger.error("{}:消息未成功路由到隊(duì)列",returned.getMessage().getMessageProperties().getMessageId());
    }
}

關(guān)于這個(gè)配置類旱眯,我說如下幾點(diǎn):

  1. 定義配置類,實(shí)現(xiàn) RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個(gè)接口节沦,這兩個(gè)接口键思,前者的回調(diào)用來確定消息到達(dá)交換器,后者則會在消息路由到隊(duì)列失敗時(shí)被調(diào)用甫贯。
  2. 定義 initRabbitTemplate 方法并添加 @PostConstruct 注解吼鳞,在該方法中為 rabbitTemplate 分別配置這兩個(gè) Callback。

這就可以了叫搁。

接下來我們對消息發(fā)送進(jìn)行測試赔桌。

首先我們嘗試將消息發(fā)送到一個(gè)不存在的交換機(jī)中,像下面這樣:

rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

注意第一個(gè)參數(shù)是一個(gè)字符串渴逻,不是變量疾党,這個(gè)交換器并不存在,此時(shí)控制臺會報(bào)如下錯(cuò)誤:

接下來我們給定一個(gè)真實(shí)存在的交換器惨奕,但是給一個(gè)不存在的隊(duì)列雪位,像下面這樣:

rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));

注意此時(shí)第二個(gè)參數(shù)是一個(gè)字符串,不是變量梨撞。

可以看到雹洗,消息雖然成功達(dá)到交換器了香罐,但是沒有成功路由到隊(duì)列(因?yàn)殛?duì)列不存在)。

這是一條消息的發(fā)送时肿,我們再來看看消息的批量發(fā)送庇茫。

2.2.2 消息批量處理

如果是消息批量處理,那么發(fā)送成功的回調(diào)監(jiān)聽是一樣的螃成,這里不再贅述旦签。

這就是 publisher-confirm 模式。

相比于事務(wù)寸宏,這種模式下的消息吞吐量會得到極大的提升宁炫。

3. 失敗重試

失敗重試分兩種情況,一種是壓根沒找到 MQ 導(dǎo)致的失敗重試击吱,另一種是找到 MQ 了淋淀,但是消息發(fā)送失敗了。

兩種重試我們分別來看覆醇。

3.1 自帶重試機(jī)制

前面所說的事務(wù)機(jī)制和發(fā)送方確認(rèn)機(jī)制,都是發(fā)送方確認(rèn)消息發(fā)送成功的辦法炭臭。如果發(fā)送方一開始就連不上 MQ永脓,那么 Spring Boot 中也有相應(yīng)的重試機(jī)制,但是這個(gè)重試機(jī)制就和 MQ 本身沒有關(guān)系了鞋仍,這是利用 Spring 中的 retry 機(jī)制來完成的常摧,具體配置如下:

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2

從上往下配置含義依次是:

  • 開啟重試機(jī)制。
  • 重試起始間隔時(shí)間威创。
  • 最大重試次數(shù)落午。
  • 最大重試間隔時(shí)間。
  • 間隔時(shí)間乘數(shù)肚豺。(這里配置間隔時(shí)間乘數(shù)為 2溃斋,則第一次間隔時(shí)間 1 秒,第二次重試間隔時(shí)間 2 秒吸申,第三次 4 秒梗劫,以此類推)

配置完成后,再次啟動(dòng) Spring Boot 項(xiàng)目截碴,然后關(guān)掉 MQ梳侨,此時(shí)嘗試發(fā)送消息,就會發(fā)送失敗日丹,進(jìn)而導(dǎo)致自動(dòng)重試走哺。

3.2 業(yè)務(wù)重試

業(yè)務(wù)重試主要是針對消息沒有到達(dá)交換器的情況。

如果消息沒有成功到達(dá)交換器哲虾,根據(jù)我們第二小節(jié)的講解丙躏,此時(shí)就會觸發(fā)消息發(fā)送失敗回調(diào)择示,在這個(gè)回調(diào)中,我們就可以做文章了彼哼!

整體思路是這樣:

  1. 首先創(chuàng)建一張表对妄,用來記錄發(fā)送到中間件上的消息,像下面這樣:

每次發(fā)送消息的時(shí)候敢朱,就往數(shù)據(jù)庫中添加一條記錄剪菱。這里的字段都很好理解,有三個(gè)我額外說下:

  • status:表示消息的狀態(tài)拴签,有三個(gè)取值孝常,0,1蚓哩,2 分別表示消息發(fā)送中构灸、消息發(fā)送成功以及消息發(fā)送失敗。
  • tryTime:表示消息的第一次重試時(shí)間(消息發(fā)出去之后岸梨,在 tryTime 這個(gè)時(shí)間點(diǎn)還未顯示發(fā)送成功喜颁,此時(shí)就可以開始重試了)。
  • count:表示消息重試次數(shù)曹阔。

其他字段都很好理解半开,我就不一一啰嗦了。

  1. 在消息發(fā)送的時(shí)候赃份,我們就往該表中保存一條消息發(fā)送記錄寂拆,并設(shè)置狀態(tài) status 為 0,tryTime 為 1 分鐘之后抓韩。
  2. 在 confirm 回調(diào)方法中纠永,如果收到消息發(fā)送成功的回調(diào),就將該條消息的 status 設(shè)置為1(在消息發(fā)送時(shí)為消息設(shè)置 msgId谒拴,在消息發(fā)送成功回調(diào)時(shí)尝江,通過 msgId 來唯一鎖定該條消息)。
  3. 另外開啟一個(gè)定時(shí)任務(wù)彪薛,定時(shí)任務(wù)每隔 10s 就去數(shù)據(jù)庫中撈一次消息茂装,專門去撈那些 status 為 0 并且已經(jīng)過了 tryTime 時(shí)間記錄,把這些消息拎出來后善延,首先判斷其重試次數(shù)是否已超過 3 次少态,如果超過 3 次,則修改該條消息的 status 為 2易遣,表示這條消息發(fā)送失敗彼妻,并且不再重試。對于重試次數(shù)沒有超過 3 次的記錄,則重新去發(fā)送消息侨歉,并且為其 count 的值+1屋摇。

大致的思路就是上面這樣,松哥這里就不給出代碼了幽邓,松哥的 vhr 里邊郵件發(fā)送就是這樣的思路來處理的炮温,完整代碼大家可以參考 vhr 項(xiàng)目(
https://github.com/lenve/vhr)。

當(dāng)然這種思路有兩個(gè)弊端:

  1. 去數(shù)據(jù)庫走一遭牵舵,可能拖慢 MQ 的 Qos柒啤,不過有的時(shí)候我們并不需要 MQ 有很高的 Qos,所以這個(gè)應(yīng)用時(shí)要看具體情況畸颅。
  2. 按照上面的思路担巩,可能會出現(xiàn)同一條消息重復(fù)發(fā)送的情況,不過這都不是事没炒,我們在消息消費(fèi)時(shí)涛癌,解決好冪等性問題就行了。

當(dāng)然送火,大家也要注意拳话,消息是否要確保 100% 發(fā)送成功,也要看具體情況种吸。

4. 小結(jié)

好啦假颇,這就是關(guān)于消息生產(chǎn)者的一些常見問題以及對應(yīng)的解決方案!

本文涉及到的相關(guān)源代碼大家可以在這里下載:
https://github.com/lenve/javaboy-code-samples骨稿。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市姜钳,隨后出現(xiàn)的幾起案子坦冠,更是在濱河造成了極大的恐慌,老刑警劉巖哥桥,帶你破解...
    沈念sama閱讀 217,509評論 6 504
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件辙浑,死亡現(xiàn)場離奇詭異,居然都是意外死亡拟糕,警方通過查閱死者的電腦和手機(jī)判呕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,806評論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來送滞,“玉大人侠草,你說我怎么就攤上這事±缧幔” “怎么了边涕?”我有些...
    開封第一講書人閱讀 163,875評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長。 經(jīng)常有香客問我功蜓,道長园爷,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,441評論 1 293
  • 正文 為了忘掉前任式撼,我火速辦了婚禮童社,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘著隆。我一直安慰自己扰楼,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,488評論 6 392
  • 文/花漫 我一把揭開白布旅东。 她就那樣靜靜地躺著灭抑,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抵代。 梳的紋絲不亂的頭發(fā)上腾节,一...
    開封第一講書人閱讀 51,365評論 1 302
  • 那天,我揣著相機(jī)與錄音荤牍,去河邊找鬼案腺。 笑死,一個(gè)胖子當(dāng)著我的面吹牛康吵,可吹牛的內(nèi)容都是我干的劈榨。 我是一名探鬼主播,決...
    沈念sama閱讀 40,190評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼晦嵌,長吁一口氣:“原來是場噩夢啊……” “哼同辣!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起惭载,我...
    開封第一講書人閱讀 39,062評論 0 276
  • 序言:老撾萬榮一對情侶失蹤旱函,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后描滔,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體棒妨,經(jīng)...
    沈念sama閱讀 45,500評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,706評論 3 335
  • 正文 我和宋清朗相戀三年含长,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了券腔。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,834評論 1 347
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡拘泞,死狀恐怖纷纫,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情田弥,我是刑警寧澤涛酗,帶...
    沈念sama閱讀 35,559評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站,受9級特大地震影響商叹,放射性物質(zhì)發(fā)生泄漏燕刻。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,167評論 3 328
  • 文/蒙蒙 一剖笙、第九天 我趴在偏房一處隱蔽的房頂上張望卵洗。 院中可真熱鬧,春花似錦弥咪、人聲如沸过蹂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,779評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽酷勺。三九已至,卻和暖如春扳躬,著一層夾襖步出監(jiān)牢的瞬間脆诉,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,912評論 1 269
  • 我被黑心中介騙來泰國打工贷币, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留击胜,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,958評論 2 370
  • 正文 我出身青樓役纹,卻偏偏與公主長得像偶摔,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子促脉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,779評論 2 354

推薦閱讀更多精彩內(nèi)容