RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)白翻?

1. 本篇概要

其實(shí)乍炉,還有1種場(chǎng)景需要考慮:當(dāng)消費(fèi)者接收到消息后绢片,還沒(méi)處理完業(yè)務(wù)邏輯,消費(fèi)者掛掉了岛琼,那消息也算丟失了底循?,比如用戶下單槐瑞,訂單中心發(fā)送了1個(gè)消息到RabbitMQ里的隊(duì)列熙涤,積分中心收到這個(gè)消息,準(zhǔn)備給這個(gè)下單的用戶增加20積分困檩,但積分還沒(méi)增加成功呢祠挫,積分中心自己掛掉了,導(dǎo)致數(shù)據(jù)出現(xiàn)問(wèn)題窗看。

那么如何解決這種問(wèn)題呢茸歧?

為了保證消息被消費(fèi)者成功的消費(fèi),RabbitMQ提供了消息確認(rèn)機(jī)制(message acknowledgement)显沈,本文主要講解RabbitMQ中软瞎,如何使用消息確認(rèn)機(jī)制來(lái)保證消息被消費(fèi)者成功的消費(fèi),避免因?yàn)橄M(fèi)者突然宕機(jī)而引起的消息丟失拉讯。

2. 開啟顯式Ack模式

我們開啟一個(gè)消費(fèi)者的代碼是這樣的:

// 創(chuàng)建隊(duì)列消費(fèi)者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received Message '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

這里的重點(diǎn)是channel.basicConsume(QUEUE_NAME, true, consumer);方法的第2個(gè)參數(shù)涤浇,讓我們先看下basicConsume()的源碼:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    return this.basicConsume(queue, autoAck, "", callback);
}

這里的autoAck參數(shù)指的是是否自動(dòng)確認(rèn),如果設(shè)置為ture魔慷,RabbitMQ會(huì)自動(dòng)把發(fā)送出去的消息置為確認(rèn)只锭,然后從內(nèi)存(或者磁盤)中刪除,而不管消費(fèi)者接收到消息是否處理成功院尔;如果設(shè)置為false蜻展,RabbitMQ會(huì)等待消費(fèi)者顯式的回復(fù)確認(rèn)信號(hào)后才會(huì)從內(nèi)存(或者磁盤)中刪除。

建議將autoAck設(shè)置為false邀摆,這樣消費(fèi)者就有足夠的時(shí)間處理消息纵顾,不用擔(dān)心處理消息過(guò)程中消費(fèi)者宕機(jī)造成消息丟失。

此時(shí)栋盹,隊(duì)列里的消息就分成了2個(gè)部分:

  1. 等待投遞給消費(fèi)者的消息(下圖中的Ready部分)
  2. 已經(jīng)投遞給消費(fèi)者施逾,但是還沒(méi)有收到消費(fèi)者確認(rèn)信號(hào)的消息(下圖中的Unacked部分)
RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

如果RabbitMQ一直沒(méi)有收到消費(fèi)者的確認(rèn)信號(hào)例获,并且消費(fèi)此消息的消費(fèi)者已經(jīng)斷開連接汉额,則RabbitMQ會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者榨汤,當(dāng)然也有可能還是原來(lái)的那個(gè)消費(fèi)者蠕搜。

RabbitMQ不會(huì)為未確認(rèn)的消息設(shè)置過(guò)期時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否已經(jīng)斷開件余,這么設(shè)計(jì)的原因是RabbitMQ允許消費(fèi)者消費(fèi)一條消息的時(shí)間可以很久很久讥脐。

為了便于理解遭居,我們舉個(gè)具體的例子,生產(chǎn)者的話的我們延用上文中的DurableProducer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個(gè)連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個(gè)通道
        Channel channel = connection.createChannel();
        // 創(chuàng)建一個(gè)Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發(fā)送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 關(guān)閉頻道和連接
        channel.close();
        connection.close();
    }
}

然后新建一個(gè)消費(fèi)者AckConsumer類:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創(chuàng)建連接
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 的主機(jī)名
        factory.setHost("localhost");
        // 創(chuàng)建一個(gè)連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個(gè)通道
        Channel channel = connection.createChannel();
        // 創(chuàng)建隊(duì)列消費(fèi)者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                int result = 1 / 0;
                System.out.println("Received Message '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我們先將autoAck參數(shù)設(shè)置為ture旬渠,即自動(dòng)確認(rèn)俱萍,并在消費(fèi)消息時(shí)故意寫個(gè)異常,然后先運(yùn)行生產(chǎn)者客戶端將消息寫入隊(duì)列中告丢,然后運(yùn)行消費(fèi)者客戶端枪蘑,發(fā)現(xiàn)消息未消費(fèi)成功但是卻消失了:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?
RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)岖免?

然后我們將autoAck設(shè)置為false:

channel.basicConsume(QUEUE_NAME, false, consumer);

再次運(yùn)行生產(chǎn)者客戶端將消息寫入隊(duì)列中岳颇,然后運(yùn)行消費(fèi)者客戶端,此時(shí)雖然消費(fèi)者客戶端仍然代碼異常颅湘,但是消息仍然在隊(duì)列中:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)话侧?

然后我們刪除掉消費(fèi)者客戶端中的異常代碼,重新啟動(dòng)消費(fèi)者客戶端闯参,發(fā)現(xiàn)消息消費(fèi)成功了瞻鹏,但是消息一直未Ack:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?
RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)鹿寨?

手動(dòng)停掉消費(fèi)者客戶端新博,發(fā)現(xiàn)消息又到了Ready狀態(tài),準(zhǔn)備重新投遞:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)脚草?

之所以消費(fèi)掉消息赫悄,卻一直還是Unacked狀態(tài),是因?yàn)槲覀儧](méi)在代碼中添加顯式的Ack代碼:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

deliveryTag可以看做消息的編號(hào)馏慨,它是一個(gè)64位的長(zhǎng)×××值埂淮。

此時(shí)運(yùn)行消費(fèi)者客戶端,發(fā)現(xiàn)消息消費(fèi)成功写隶,并且在隊(duì)列中被移除:

RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)同诫?
RabbitMQ如何保證隊(duì)列里的消息99.99%被消費(fèi)?

文末彩蛋

[Java學(xué)習(xí)樟澜、面試;文檔叮盘、視頻資源免費(fèi)獲取]

加QQ群:219571750

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末秩贰,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子柔吼,更是在濱河造成了極大的恐慌毒费,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,884評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件愈魏,死亡現(xiàn)場(chǎng)離奇詭異觅玻,居然都是意外死亡想际,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門溪厘,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)胡本,“玉大人,你說(shuō)我怎么就攤上這事畸悬〔喔Γ” “怎么了?”我有些...
    開封第一講書人閱讀 157,435評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵蹋宦,是天一觀的道長(zhǎng)披粟。 經(jīng)常有香客問(wèn)我,道長(zhǎng)冷冗,這世上最難降的妖魔是什么守屉? 我笑而不...
    開封第一講書人閱讀 56,509評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮蒿辙,結(jié)果婚禮上拇泛,老公的妹妹穿的比我還像新娘。我一直安慰自己须板,他們只是感情好碰镜,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,611評(píng)論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著习瑰,像睡著了一般绪颖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上甜奄,一...
    開封第一講書人閱讀 49,837評(píng)論 1 290
  • 那天柠横,我揣著相機(jī)與錄音,去河邊找鬼课兄。 笑死牍氛,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的烟阐。 我是一名探鬼主播搬俊,決...
    沈念sama閱讀 38,987評(píng)論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼蜒茄!你這毒婦竟也來(lái)了唉擂?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,730評(píng)論 0 267
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤檀葛,失蹤者是張志新(化名)和其女友劉穎玩祟,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體屿聋,經(jīng)...
    沈念sama閱讀 44,194評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡空扎,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,525評(píng)論 2 327
  • 正文 我和宋清朗相戀三年藏鹊,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片转锈。...
    茶點(diǎn)故事閱讀 38,664評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡盘寡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出黑忱,到底是詐尸還是另有隱情宴抚,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評(píng)論 4 330
  • 正文 年R本政府宣布甫煞,位于F島的核電站菇曲,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏抚吠。R本人自食惡果不足惜常潮,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,944評(píng)論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望楷力。 院中可真熱鬧喊式,春花似錦、人聲如沸萧朝。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)检柬。三九已至献联,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間何址,已是汗流浹背里逆。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評(píng)論 1 266
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留用爪,地道東北人原押。 一個(gè)月前我還...
    沈念sama閱讀 46,389評(píng)論 2 360
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像偎血,于是被迫代替她去往敵國(guó)和親诸衔。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,554評(píng)論 2 349

推薦閱讀更多精彩內(nèi)容

  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html颇玷,并沒(méi)有及時(shí)更新署隘。 術(shù)語(yǔ)對(duì)...
    joyenlee閱讀 7,645評(píng)論 0 3
  • 利用RabbitMQ集群橫向擴(kuò)展能力,均衡流量壓力亚隙,讓消息集群的秒級(jí)服務(wù)能力達(dá)到百萬(wàn),Google曾做過(guò)此類實(shí)驗(yàn)违崇;...
    有貨技術(shù)閱讀 3,455評(píng)論 0 1
  • 走在清晨的陽(yáng)光里 一路繁花相伴 路邊樹上的鳥兒哼著歌 街邊包子鋪的大爺在狹窄的房間里忙碌著 孩子們背著書包蹦蹦跳跳...
    詩(shī)小情閱讀 415評(píng)論 6 4
  • 喜怒哀樂(lè)是人類最基本的情緒阿弃。研究表明诊霹,很多身體的疾病與心理因素密切相關(guān)。70%以上的人會(huì)遭受到情緒對(duì)身體器官的“攻...
    蔣一多閱讀 357評(píng)論 2 7
  • 自古以來(lái)渣淳,寫字是人們基本要掌握的東西脾还,寫字在我們的生活中起到了必不可少的作用。 小的時(shí)候入愧,媽媽就開始教我寫字了...
    范雯寧閱讀 271評(píng)論 0 0