RabbitMQ 結(jié)合業(yè)務(wù)實(shí)現(xiàn)消息確認(rèn)

嘮個(gè)嗑

網(wǎng)絡(luò)上搜羅了多次想知道 RabbitMQ 現(xiàn)實(shí)業(yè)務(wù)種怎么實(shí)現(xiàn)消息的可靠性的,但是大多都不太理想谦屑,站在各位大佬巨人的肩膀上研究了一段時(shí)間,我也整理了一套簡(jiǎn)單可行性的方案伞辛,包括消息異常處理币绩。這篇文章想主要講一些業(yè)務(wù)處理方案,項(xiàng)目中加入 RabbitMQ 中間件很簡(jiǎn)單瘸羡,但是根據(jù)具體業(yè)務(wù)實(shí)現(xiàn)消息的可靠性漩仙,這個(gè)需要多加考慮。當(dāng)然下面也會(huì)通過測(cè)試代碼來分析犹赖,文末也會(huì)附上源碼地址队他。

1、準(zhǔn)備

1.1峻村、環(huán)境準(zhǔn)備

之前博客上寫過一篇編譯安裝的方法 地址麸折,大家可以參考,因?yàn)?RabbitMQ 底層語言的原因可能稍微麻煩點(diǎn)粘昨,那就沒有辦法了嗎垢啼?如果你是先搞測(cè)試,再在項(xiàng)目中使用的話张肾,那可以使用 docker 安裝芭析,2 行代碼,如下

docker pull rabbitmq

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

1.2吞瞪、理論準(zhǔn)備

RabbitMQ 的類型包括:direct馁启、topic、fanout芍秆、headers惯疙、system(翻源碼看到的)

這里主要通過 topic 來分析,bindingkey 可以通過通配符 # 和 * 來匹配多個(gè) 路由鍵 (routingKey)浪听,
bindingkey 是綁定交換機(jī)(exchange)和隊(duì)列(queue)的螟碎, 生產(chǎn)者(publisher)發(fā)消息的時(shí)候會(huì)攜帶 routingKey、exchange 和 消息發(fā)送給 RabbitMQ迹栓,
連接成功后實(shí)際是組件 exchange 接收了生產(chǎn)者的消息掉分,然后通過 bindingkey 匹配 routingKey,決定送給哪個(gè) queue克伊,每個(gè)消費(fèi)者都會(huì)有 queue酥郭,所以 queue接收到消息 后就可以確保消費(fèi)者接可以收到消息了,最后消費(fèi)者再消費(fèi)愿吹。

再詳細(xì)的內(nèi)容可以查看大佬 erlie 的總結(jié) 地址

2不从、消息確認(rèn)

RabbitMQ 基礎(chǔ)配置

pom

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.5.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
    
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    host: 192.168.1.105
    port: 5672
    username: guest
    password: guest
    virtual-host: /

# 交換機(jī)、隊(duì)列和綁定鍵聲明
test:
  exchange: test.exchange
  one:
    queue: one.test.queue
    binding-key: one.test.key

consumer

@RabbitListener(bindings=@QueueBinding(
                //配置交換器
                exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
                //配置路由鍵
                key="${test.one.binding-key}",
                //配置隊(duì)列名稱
                value=@Queue(value="${test.one.queue}",autoDelete="true")
))
public void test(String msg) {
    log.info("test 收到的消息為:[{}]", msg);
    //業(yè)務(wù)代碼...
}

publisher

@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${test.exchange}")
private String exchange;
@Value("${test.one.binding-key}")
private String routingKey;

public void test() {
    rabbitTemplate.convertAndSend(exchange, routingKey, "test msg");
}

通過上面默認(rèn)的配置基本上就可以使用 RabbitMQ 了,但是這不是本篇的重點(diǎn).我們要知道消息發(fā)送和到消費(fèi)的過程中出現(xiàn)問題怎么辦犁跪?這就需要我們分段確認(rèn)消息是否接收成功椿息,如果失敗了該如何處理歹袁,先想想可以怎么做。
我們先捋一下思路寝优,消息發(fā)送給 RabbitMQ 条舔,如果連接 RabbitMQ 失敗,則記錄該消息乏矾,如果連接成功但是 exchange 接收失敗則記錄下該消息孟抗,如果 exchange 接收成功但是 queue 接收 exchange 的消息失敗則記錄下該消息,消息從生產(chǎn)者到 queue 有 3 個(gè)位置可能因網(wǎng)絡(luò)抖動(dòng)或其他原因出現(xiàn)問題钻心,那我們?cè)谶@三個(gè)位置記錄下問題后凄硼,統(tǒng)一通過計(jì)劃定時(shí)獲取記錄的消息并且重新發(fā)送,如果重發(fā)三次還沒有成功則標(biāo)記該記錄為異常消息捷沸。

2.1摊沉、生產(chǎn)者消息確認(rèn)和回調(diào)

這里分兩步:

  1. 消息確認(rèn)指的是 RabbitMQ(exchange) 確認(rèn)接收到了消費(fèi)者發(fā)送的消息
  2. 消息回調(diào)指的是 queue 接收 exchange 的消息失敗,則回調(diào)告訴 RabbitMQ 失敗的消息

2.1.1亿胸、消息確認(rèn)

開啟配置

spring:
  rabbitmq:
    #publisher-confirms: true #已過時(shí)
    publisher-confirm-type: correlated #開啟生產(chǎn)者消息確認(rèn)

還有另外 2 種模式:

  1. none 值是禁用發(fā)布確認(rèn)模式坯钦,是默認(rèn)值;
  2. simple 值經(jīng)測(cè)試有兩種效果,其一效果和 correlated 值一樣會(huì)觸發(fā)回調(diào)方法侈玄,其二在發(fā)布消息成功后使用 rabbitTemplate 調(diào)用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 節(jié)點(diǎn)返回發(fā)送結(jié)果婉刀,根據(jù)返回結(jié)果來判定下一步的邏輯,要注意的點(diǎn)是 waitForConfirmsOrDie 方法如果返回false則會(huì)關(guān)閉 channel序仙,則接下來無法發(fā)送消息到 broker突颊。

當(dāng)然還需新建一個(gè)類實(shí)現(xiàn) RabbitTemplate.ConfirmCallback,重寫方法 confirm潘悼,該方法有三個(gè)參數(shù) correlationData律秃、ack、cause治唤,主要說下 ack棒动,值為 true 表示 exchange 成功接收到消息,false 表示 exchange 接收消息失敗宾添,這里 2 種結(jié)果可以分別處理船惨,比如 false 可以把接收失敗的消息入庫,然后通過定時(shí)器來處理缕陕,比較懂的同學(xué)現(xiàn)在可能就有疑問了粱锐,correlationData 只能得到 msgId,根本沒有具體的消息,這里可以發(fā)揮你出色的想象力扛邑,可以通過對(duì)象封裝得到怜浅,也可以通過存內(nèi)存或者磁盤存儲(chǔ)得到,方法總比困難多蔬崩。

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   String msgId = correlationData.getId();
   if (ack) {
       log.info("成功發(fā)送給 mq, msgId:[{}]", msgId);
   } else {
       log.error("發(fā)送給 mq 失敗, msgId:[{}], 失敗理由cause:[{}]", msgId, cause);
       //消息從生產(chǎn)者沒有到 exchange恶座,那存庫
       saveToDB(msgId,...);
   }
}

這里還差一步搀暑,就是原生的 rabbitTemplate 怎么知道消息確認(rèn)時(shí)使用剛建的類呢,注入即可
rabbitTemplate.setConfirmCallback(剛建的類);如有疑問可看文末源碼奥裸。

2.1.2险掀、消息回調(diào)

spring:
  rabbitmq:
    publisher-returns: true #開啟生產(chǎn)者消息回調(diào)

同上沪袭,需要新建類并實(shí)現(xiàn) RabbitTemplate.ReturnCallback,并且重寫方法 returnedMessage湾宙, 最后需要注入如下內(nèi)容

rabbitTemplate.setReturnCallback(剛建的類);
// 要想使 returnCallback 生效,必須設(shè)置為true
rabbitTemplate.setMandatory(true);
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
    String msgId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
    String msg = new String(message.getBody());
    log.error("消息回調(diào) msgId:[{}], msg:[{}] 不能被正確路由冈绊,replyCode:[{}], replyText:[{}], exchange:[{}], routingKey:[{}]", msgId, msg, replyCode, replyText, exchange, routingKey);
    //消息從 exchange 沒有到 queue侠鳄, 那存庫
    saveToDB(msgId, exchange, routingKey, msg);
}

如果消息發(fā)送時(shí)走到了回調(diào)方法 returnedMessage 中,說明目前的消息有問題是需要處理的死宣,同上伟恶,入庫。定時(shí)器來處理毅该。

當(dāng)然消息的發(fā)送方法 rabbitTemplate.convertAndSend() 會(huì)多一個(gè)參數(shù) correlationData

具體處理方法可以參考源碼博秫,這里只提供思路。

2.2眶掌、消費(fèi)者消息確認(rèn)

消費(fèi)者消息確認(rèn)是確認(rèn)消費(fèi)了隊(duì)列中的消息挡育,如果出現(xiàn)問題 RabbitMQ 會(huì)有重試機(jī)制,長時(shí)間失敗則需要人工干預(yù)朴爬,這個(gè)和生產(chǎn)者的確認(rèn)是先后關(guān)系即寒,實(shí)際是沒有關(guān)聯(lián)關(guān)系的,說到這召噩,說下我之前轉(zhuǎn)的牛角尖母赵,一直想尋找 exchange 如何確認(rèn)消費(fèi)者成功消費(fèi)消息,但是無果具滴,后來細(xì)想凹嘲,RabbitMQ 應(yīng)該設(shè)計(jì)的就是消費(fèi)者和 queue 交互,沒必要和 exchange 交互构韵。如有大佬知道 exchange 如何 ack 消費(fèi)者消費(fèi)消息可以告訴小弟周蹭,不勝感激。
圓規(guī)正轉(zhuǎn)贞绳,上消費(fèi)者代碼

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #開啟消費(fèi)者消息確認(rèn); none:自動(dòng)確認(rèn)谷醉、auto:根據(jù)情況確認(rèn)
@RabbitListener(bindings=@QueueBinding(
                    //配置交換器
                    exchange=@Exchange(value="${test.exchange}",type= ExchangeTypes.TOPIC),
                    //配置路由鍵
                    key="${test.one.binding-key}",
                    //配置隊(duì)列名稱
                    value=@Queue(value="${test.one.queue}",autoDelete="true")
))
    public void test(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("test 收到的消息為:[{}], msgId:[{}]", msg, message.getMessageProperties().getHeaders().get("spring_returned_message_correlation"));
        try {
            //業(yè)務(wù)處理
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

主要說 3 個(gè)方法:

1、basicAck 是確認(rèn)消息冈闭,需要傳遞兩個(gè)參數(shù)

  1. deliveryTag(唯一標(biāo)識(shí) ID):當(dāng)一個(gè)消費(fèi)者向 RabbitMQ 注冊(cè)后俱尼,會(huì)建立起一個(gè) Channel ,RabbitMQ 會(huì)用 basic.deliver 方法向消費(fèi)者推送消息萎攒,這個(gè)方法攜帶了一個(gè) delivery tag遇八, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標(biāo)識(shí) ID矛绘,是一個(gè)單調(diào)遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
  2. multiple:為了減少網(wǎng)絡(luò)流量刃永,手動(dòng)確認(rèn)可以被批處理货矮,當(dāng)該參數(shù)為 true 時(shí),則可以一次性確認(rèn) delivery_tag 小于等于傳入值的所有消息斯够,false則只確認(rèn)傳入值等于 delivery_tag 的消息

2囚玫、basicNack 是拒絕消息,可以拒絕多條读规,比 basicAck 多一個(gè)布爾值的參數(shù)抓督,如果為 true,被 nack 后重新入隊(duì)列然后重新消費(fèi)消費(fèi)束亏;如果為 false 被 nack 就丟了瘸恼。

3肯污、basicReject 只能拒絕一條消息铐料,reject 后消息直接丟了改基。

總結(jié)

這里簡(jiǎn)單實(shí)現(xiàn) RabbitMQ 消息可靠的方式是通過把消息發(fā)送時(shí)出現(xiàn)問題后直接入庫,然后通過計(jì)劃定時(shí)查詢?cè)僦匦掳l(fā)送給 RabbitMQ,如果 exchange 成功 ack 后則標(biāo)記為重發(fā)成功怕敬,如果重發(fā) 3 次還是失敗則標(biāo)記異常揣炕,需要人工處理。

討論

這種處理其實(shí)不算是最優(yōu)的方案赖捌,技術(shù)上還可以有如下方案

  1. 生產(chǎn)者發(fā)消息時(shí)記錄該條消息祝沸,并設(shè)該記錄 1 分鐘后生效,留 1 分鐘給exchange 確認(rèn)并直接標(biāo)記該消息記錄為成功越庇,然后計(jì)劃任務(wù)定時(shí)掃有效且未確認(rèn)的消息罩锐,再發(fā)送給 RabbitMQ ,如果確認(rèn)后則標(biāo)記為成功卤唉,否則 3 次后標(biāo)記為失敗涩惑。
  2. 還和小編寫的方案類似,記錄失敗的消息桑驱,但是定時(shí)任務(wù)獲取到失敗的消息后竭恬,直接調(diào)用消費(fèi)者的服務(wù)處理,不通過 RabbitMQ熬的, 但是這就需要維護(hù)消息和消費(fèi)者服務(wù)的關(guān)系了痊硕,稍微復(fù)雜些。

最后希望可以幫到看官押框,如果記錄的不對(duì)煩請(qǐng)?jiān)u論指出岔绸,一起討論

https://github.com/charmsongo/springboot-samples/tree/master/springboot-rabbitmq-songo

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子盒揉,更是在濱河造成了極大的恐慌晋被,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,290評(píng)論 6 491
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件刚盈,死亡現(xiàn)場(chǎng)離奇詭異羡洛,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)藕漱,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,107評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門欲侮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人谴分,你說我怎么就攤上這事锈麸。” “怎么了牺蹄?”我有些...
    開封第一講書人閱讀 156,872評(píng)論 0 347
  • 文/不壞的土叔 我叫張陵,是天一觀的道長薄翅。 經(jīng)常有香客問我沙兰,道長,這世上最難降的妖魔是什么翘魄? 我笑而不...
    開封第一講書人閱讀 56,415評(píng)論 1 283
  • 正文 為了忘掉前任鼎天,我火速辦了婚禮,結(jié)果婚禮上暑竟,老公的妹妹穿的比我還像新娘斋射。我一直安慰自己,他們只是感情好但荤,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,453評(píng)論 6 385
  • 文/花漫 我一把揭開白布罗岖。 她就那樣靜靜地躺著,像睡著了一般腹躁。 火紅的嫁衣襯著肌膚如雪桑包。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,784評(píng)論 1 290
  • 那天纺非,我揣著相機(jī)與錄音哑了,去河邊找鬼。 笑死烧颖,一個(gè)胖子當(dāng)著我的面吹牛弱左,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播炕淮,決...
    沈念sama閱讀 38,927評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼拆火,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起榜掌,我...
    開封第一講書人閱讀 37,691評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤优妙,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后憎账,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體套硼,經(jīng)...
    沈念sama閱讀 44,137評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,472評(píng)論 2 326
  • 正文 我和宋清朗相戀三年胞皱,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了邪意。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,622評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡反砌,死狀恐怖雾鬼,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宴树,我是刑警寧澤策菜,帶...
    沈念sama閱讀 34,289評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站酒贬,受9級(jí)特大地震影響又憨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜锭吨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,887評(píng)論 3 312
  • 文/蒙蒙 一蠢莺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧零如,春花似錦躏将、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,741評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至辕翰,卻和暖如春夺衍,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背喜命。 一陣腳步聲響...
    開封第一講書人閱讀 31,977評(píng)論 1 265
  • 我被黑心中介騙來泰國打工沟沙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人壁榕。 一個(gè)月前我還...
    沈念sama閱讀 46,316評(píng)論 2 360
  • 正文 我出身青樓矛紫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親牌里。 傳聞我的和親對(duì)象是個(gè)殘疾皇子颊咬,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,490評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容