【RabbitMQ的那點事】消息的可靠消費

  • Producer發(fā)送消息到Broker减拭,Broker向Producer確認消息蔽豺,這個步驟叫Publisher Confirms,詳細在前面文章中有介紹——【RabbitMQ的那點事】發(fā)送方確認機制(Publisher Confirms):http://www.reibang.com/p/15f0c1a105fb
  • 消息從Broker再發(fā)送至Consumer拧粪,Consumer向Broker確認消息修陡,這個步驟叫acknowledgements,也就是消費端的消息可靠性可霎,即本文內(nèi)容魄鸦。

1. 關(guān)于消費端確認模式(acknowledgements modes),官網(wǎng)文章:

2. 當注冊一個Consumer程序時癣朗,可以選擇多種投遞方式:

  • 無ack模式(AcknowledgeMode.NONE)拾因,默認模式,意思是發(fā)送方不需要確認旷余,即fire and forget(發(fā)后即忘绢记,在消息發(fā)出后就立即將這條消息刪除,而不管消費端是否接收到正卧,是否處理完)
  • 發(fā)送方需要消費端確認消息模式蠢熄,(消費端需要告知消息已經(jīng)收到)。這其中分兩種:
    • 一種是自動確認:AcknowledgeMode.AUTO模式下炉旷,由Container自動應(yīng)答签孔,正確處理發(fā)出ack信息,處理失敗發(fā)出nack信息砾跃,rabbitmq發(fā)出消息后將會等待consumer端的應(yīng)答骏啰,只有收到ack確認信息才會把消息清除掉,收到nack信息的處理辦法由setDefaultRequeueRejected()方法設(shè)置抽高,所以在這種模式下,發(fā)生錯誤的消息是可以恢復的透绩。
    • 另一種是手動確認翘骂,AcknowledgeMode.MANUAL≈愫溃基本同AUTO模式碳竟,區(qū)別是需要人為調(diào)用方法給應(yīng)答。

3. 想要消息的可靠消息狸臣,需要采用手動確認模式莹桅。

以下是manual mode的代碼示例,參考了文章:https://javamana.com/2021/09/20210912132719181S.html

3.1 配置:
spring:
  rabbitmq:
    port: 5672
    host: localhost
    virtual-host: spring-boot-test
    listener:
      simple:
        acknowledge-mode: manual

如果我們使用了manual(需要手動在消費端確認消息模式)烛亦,然后使用了之前的RabbitListener去接收消息:

    @RabbitListener(queues = "direct.queue")
    public void listen(String in) {
        System.out.println("Direct Message Listener: " + in);
    }

可以看到確實也能接收到消息诈泼,但看控制臺懂拾,消息依舊還是處于Ready的狀態(tài),也就是消息雖然被投遞過铐达,但Broker并沒有收到確認:
UI Console:Queue
3.2 那么怎樣進行手動確認岖赋?

還是使用@RabbitListener,但需要在參數(shù)中加上:

  • message瓮孙,用來查看是否是再次投遞過(getRedelivered)
  • channel用來回復ack
  • deliveryTag:消息投遞序號唐断,每個channel對應(yīng)一個(long類型),從1開始到9223372036854775807范圍杭抠,在手動消息確認時可以對指定delivery_tag的消息進行ack脸甘、nack、reject等操作偏灿。例如上面Ready中有4個消息斤程,那么deliveryTag分別是:1, 2菩混, 3忿墅, 4。
示例做了什么事情:

a. 接收消息
b. 處理(1除以0沮峡,業(yè)務(wù)處理報錯)疚脐,即沒來得及做act
c. 在異常處理中嘗試重新投遞,如果發(fā)現(xiàn)已經(jīng)重新投遞過一次(通過判斷messga的redelivered flag是否為true)邢疙,如果是棍弄,那么拒絕消息。
d. 在異常中嘗試再次返回隊列處理

以上的處理涉及到的方法:
  • 告知Broker消息已收到疟游,使用void basicAck(long deliveryTag, boolean multiple)荔睹,其中deliveryTag在上述有介紹孔飒,是消息投遞的序號,每個channel的唯一值。multiple如果設(shè)為true哨鸭,則表示會確實該channel中的所有delivery tags。例如這個channel中有5爆班, 6南吮, 7, 8需要確定笋籽,如果調(diào)用basicAct(8, true)蹦漠,則會把5-8的delivery tag的消息都確認掉。如果multiple為false车海,那么例子中只會確認delivery tag = 8的消息笛园。
  • 告知Broker消息消費失敗:basicNack(long deliveryTag, boolean multiple, boolean requeue),此方法有三個參數(shù)研铆,前兩個和basicAct相似埋同,最后一個是requeue,如果為true蚜印,則會將消息放回原隊列頭部莺禁。如果為false,在配置了dead letter exchange(死信通道)那么則會放這里窄赋,否則會丟掉哟冬。
  • 另一個方法basicReject(long deliveryTag, boolean requeue),和basicNack類似忆绰,也是告知Broker消息消費失敗浩峡,只不過不能multiple確認。
@Slf4j
@Component
public class RabbitMQListener {
    @RabbitListener(queues = {"direct.queue"})
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            log.info("Get message: {}", message.toString());
            log.info("DeliveryTag: {}", tag);

            int a = 1/0; // 主動拋出錯誤
            channel.basicAck(tag, false);

        } catch (Exception e) {
            log.error("met exception......{}", e.getMessage());
            // 當前的消息是否重新投遞的消息,也就是該消息是重新回到隊列里的消息
            log.info("messageProperties.getRedelivered(): {}", messageProperties.getRedelivered());

            if (messageProperties.getRedelivered()) {
                log.info("消息已重復處理失敗,拒絕再次接收...");
                // 拒絕消息
                channel.basicReject(tag, false);

            } else {
                log.info("消息即將再次返回隊列處理...");
                channel.basicNack(tag, false, true);
            }
        }
    }
}

測試結(jié)果:

2022-05-08 15:17:07.232 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=1, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.235 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 1
2022-05-08 15:17:07.236 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): false
2022-05-08 15:17:07.236 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息即將再次返回隊列處理...
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : Get message: (Body:'hello, i am direct message!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=direct.exchange, receivedRoutingKey=direct-routing-key, deliveryTag=2, consumerTag=amq.ctag-i3qgOdtD8H4bm7_5OUQXTQ, consumerQueue=direct.queue])
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : DeliveryTag: 2
2022-05-08 15:17:07.238 ERROR 32107 --- [ntContainer#4-1] RabbitMQListener : met exception....../ by zero
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : messageProperties.getRedelivered(): true
2022-05-08 15:17:07.238 INFO 32107 --- [ntContainer#4-1] RabbitMQListener : 消息已重復處理失敗,拒絕再次接收...

關(guān)于配置:

有些文章用的是listener.direct.acknowledge-mode: manual
參考Spring Boot 2.x的文檔:https://docs.spring.io/spring-boot/docs/2.0.0.RC1/reference/html/common-application-properties.html
原因是Spring AMQP現(xiàn)在支持兩種container type了错敢。
關(guān)于兩種type的區(qū)別翰灾,可以看網(wǎng)友的文章——RabbitMQ筆記(七)-SimpleMessageListenerContainer和DirectMessageListenerContainer:https://blog.csdn.net/yingziisme/article/details/86418580

listener type

4. 總結(jié)

消費方的ACK機制可以有效的解決消息從Broker到Consumer丟失的問題。但也要注意一點:消息的無限消費稚茅。


5. 額外的一些測試

5.1 上述關(guān)于deliveryTag纸淮,范圍是每個channel,例如:

啟動兩個Consumer亚享,都去訂閱同一個queue
這時候Producer往Broker中發(fā)10個消息咽块,那么每個Consumer都能收到5個消息,DeliveryTag都是1欺税,2侈沪,3,4晚凿,5

首先是Consumer端:
啟動兩個Instance:

@Slf4j
@Component
public class MultipleRabbitMQListener {
    @RabbitListener(queues = {"direct.queue"})
    public void receiveMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("Get message: {}", new String(message.getBody()));
        log.info("DeliveryTag: {}", tag);

        ThreadUtils.sleep(30000); // 模擬業(yè)務(wù)處理時間
        channel.basicAck(tag, false);
    }
}

其次是Producer端亭罪,在兩個Consumer instance啟動后再啟動:

    @Test
    public void sendMessageToDirectExchange1() {
        for (int i = 0; i < 10; i ++) {
            rabbitTemplate.convertAndSend("direct.exchange", "direct-routing-key", "message - " + i);
        }
    }

測試結(jié)果:可以看到每個Channel的DeliveryTag都是自增的。
另外歼秽,就算業(yè)務(wù)需要處理30秒应役,Broker在沒有收到Consumer的ack之前,不會再把消息給另一個消費者哲银。即:保證了同一個Queue消息只會被消費一次扛吞。

Consumer-1的log:

Consumer-1的log

Consumer-2的log:

Consumer-2的log


參考:
spring-rabbit消費過程解析及AcknowledgeMode選擇https://blog.csdn.net/weixin_38380858/article/details/84963944

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市荆责,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌亚脆,老刑警劉巖做院,帶你破解...
    沈念sama閱讀 223,126評論 6 520
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡键耕,警方通過查閱死者的電腦和手機寺滚,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,421評論 3 400
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來屈雄,“玉大人村视,你說我怎么就攤上這事【颇蹋” “怎么了蚁孔?”我有些...
    開封第一講書人閱讀 169,941評論 0 366
  • 文/不壞的土叔 我叫張陵,是天一觀的道長惋嚎。 經(jīng)常有香客問我杠氢,道長,這世上最難降的妖魔是什么另伍? 我笑而不...
    開封第一講書人閱讀 60,294評論 1 300
  • 正文 為了忘掉前任鼻百,我火速辦了婚禮,結(jié)果婚禮上摆尝,老公的妹妹穿的比我還像新娘温艇。我一直安慰自己,他們只是感情好堕汞,可當我...
    茶點故事閱讀 69,295評論 6 398
  • 文/花漫 我一把揭開白布勺爱。 她就那樣靜靜地躺著,像睡著了一般臼朗。 火紅的嫁衣襯著肌膚如雪邻寿。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,874評論 1 314
  • 那天视哑,我揣著相機與錄音绣否,去河邊找鬼。 笑死挡毅,一個胖子當著我的面吹牛蒜撮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播跪呈,決...
    沈念sama閱讀 41,285評論 3 424
  • 文/蒼蘭香墨 我猛地睜開眼段磨,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了耗绿?” 一聲冷哼從身側(cè)響起苹支,我...
    開封第一講書人閱讀 40,249評論 0 277
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎误阻,沒想到半個月后债蜜,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體晴埂,經(jīng)...
    沈念sama閱讀 46,760評論 1 321
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,840評論 3 343
  • 正文 我和宋清朗相戀三年寻定,在試婚紗的時候發(fā)現(xiàn)自己被綠了儒洛。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,973評論 1 354
  • 序言:一個原本活蹦亂跳的男人離奇死亡狼速,死狀恐怖琅锻,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情向胡,我是刑警寧澤恼蓬,帶...
    沈念sama閱讀 36,631評論 5 351
  • 正文 年R本政府宣布,位于F島的核電站捷枯,受9級特大地震影響滚秩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜淮捆,卻給世界環(huán)境...
    茶點故事閱讀 42,315評論 3 336
  • 文/蒙蒙 一郁油、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧攀痊,春花似錦桐腌、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,797評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至棘街,卻和暖如春蟆盐,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背遭殉。 一陣腳步聲響...
    開封第一講書人閱讀 33,926評論 1 275
  • 我被黑心中介騙來泰國打工石挂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人险污。 一個月前我還...
    沈念sama閱讀 49,431評論 3 379
  • 正文 我出身青樓痹愚,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蛔糯。 傳聞我的和親對象是個殘疾皇子拯腮,可洞房花燭夜當晚...
    茶點故事閱讀 45,982評論 2 361

推薦閱讀更多精彩內(nèi)容