RabbitMQ生產(chǎn)端消息可靠性投遞方案分析

前言

導(dǎo)文:
1.什么是RabbitMQ
2.Java開發(fā)技術(shù)大雜燴(三)之電商項(xiàng)目?jī)?yōu)化、rabbitmq腾仅、Git乒裆、OSI套利、VIM推励、Intellj IDEA、HTTP肉迫、JS验辞、Java

之前在上面2篇文章中,講到過RabbitMQ的安裝喊衫,基本概念和用法跌造。我們來回顧一下RabbitMQ核心基礎(chǔ)概念。

  • Server:又稱之為Broker族购,接受客戶端的連接壳贪,實(shí)現(xiàn)AMQP實(shí)體服務(wù)。

  • Connection:連接寝杖,應(yīng)用程序與Broker的網(wǎng)絡(luò)連接违施。

  • Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都在Channel中進(jìn)行瑟幕,Channel是進(jìn)行消息讀寫的通道磕蒲×袅剩客戶端可以建立多個(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù)辣往。
    如果每一次訪問RabbitMQ都建立一個(gè)Connection兔院,在消息量大的時(shí)候建立TCP Connection的開銷將是巨大的,效率也較低站削。Channel是在connection內(nèi)部建立的邏輯連接坊萝,如果應(yīng)用程序支持多線程,通常每個(gè)thread創(chuàng)建單獨(dú)的channel進(jìn)行通訊许起,AMQP method包含了channel id幫助客戶端和message broker識(shí)別channel屹堰,所以channel之間是完全隔離的。Channel作為輕量級(jí)的Connection極大減少了操作系統(tǒng)建立TCP connection的開銷街氢。

  • Message:消息扯键,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Message Properties和Body組成珊肃。Properties可以對(duì)消息進(jìn)行修飾荣刑,比如消息的優(yōu)先級(jí),延遲等高級(jí)特性伦乔,Body就是消息體內(nèi)容厉亏。

  • Virtual Host:虛擬地址,用于進(jìn)行邏輯隔離烈和,最上層的消息路由爱只。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或者Queue招刹。

  • Exchange:交換機(jī)恬试,接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列疯暑。

  • Binding:Exchange和Queue之間的虛擬連接训柴,binding中可以包含routing key。

  • Routing key:一個(gè)路由規(guī)則妇拯,虛擬機(jī)可以用它來確定如何路由一個(gè)特定消息幻馁。

  • Queue:也可以稱之為Message Queue(消息隊(duì)列),保存消息并將它們轉(zhuǎn)發(fā)到消費(fèi)者越锈。

通過下面2張圖仗嗦,我們能大概能明白AMQP協(xié)議模型和消息流轉(zhuǎn)過程。在Exchange和Message Queue上面還有Virtual host甘凭。記住同一個(gè)Virtual Host里面不能有相同名稱的ExChange和Message Queue稀拐。


image.png
image.png

接著我們看下面的圖,這是RabbitMQ消息可靠性投遞的解決方案之一对蒲。


image.png

1.將消息落地到業(yè)務(wù)db和Message db钩蚊。
2.采用Confirm方式發(fā)送消息至MQ Broker贡翘,返回結(jié)果的過程是異步的。Confirm消息砰逻,是指生產(chǎn)者投遞消息后鸣驱,如果Broker收到消息后,會(huì)給生產(chǎn)者一個(gè)ACK蝠咆。生產(chǎn)者通過ACK踊东,可以確認(rèn)這條消息是否正常發(fā)送到Broker,這種方式是消息可靠性投遞的核心刚操。
3闸翅、4:在這里將消息分成3種狀態(tài)。status=0表示消息正在投遞中菊霜,status=1表示消息投遞成功坚冀,status=2表示消息投遞了3次還是失敗。生產(chǎn)者接收Broker返回的Confirm確認(rèn)消息結(jié)果鉴逞,然后根據(jù)結(jié)果更新消息的狀態(tài)记某。將status的狀態(tài)從投遞中改成投遞成功即可。
5.在消息Confirm過程中构捡,可能由于網(wǎng)絡(luò)閃斷問題或者是Broker端出現(xiàn)異常液南,導(dǎo)致回送消息失敗或者出現(xiàn)異常。這時(shí)候勾徽,就需要生產(chǎn)者對(duì)消息進(jìn)行可靠性投遞滑凉,保證投遞到Broker的消息可靠不丟失。還有一種極端情況值得我們考慮喘帚,那就是網(wǎng)絡(luò)閃斷畅姊。我們的消息成功投遞到Broker,但是在回送ACK確認(rèn)消息時(shí)啥辨,由于網(wǎng)絡(luò)閃斷涡匀,生產(chǎn)者沒有收到。此時(shí)我們?cè)僦匦峦哆f此消息可能會(huì)造成消費(fèi)端重復(fù)消費(fèi)消息了溉知。這時(shí)候需要消費(fèi)端去做冪等處理(生成全局消息ID,判斷此消息是否消費(fèi)過)腕够。對(duì)于沒有投遞成功的消息级乍,我們可以設(shè)置一個(gè)重新投遞時(shí)間。比如一個(gè)消息在5分鐘內(nèi)帚湘,status狀態(tài)還是0玫荣,也就是這個(gè)消息還沒有成功投遞到Broker端。這時(shí)候我們需要一個(gè)定時(shí)任務(wù)大诸,每隔幾分鐘從Message db中拉取status為0的消息捅厂。
6.將拉取的消息執(zhí)行重新投遞操作贯卦。
7.設(shè)置最大消息投遞次數(shù)。當(dāng)一個(gè)消息被投遞了3次焙贷,還是不成功撵割,那么將status置為2。最后交給人工解決處理此類問題或者將消息轉(zhuǎn)存到失敗表辙芍。

下面講解一下涉及到消息可靠性的知識(shí)點(diǎn)和一些配置了啡彬。

application-dev.properties

#rabbtisMQ配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
#消費(fèi)者數(shù)量
spring.rabbitmq.listener.simple.concurrency=10
#最大消費(fèi)者數(shù)量
spring.rabbitmq.listener.simple.max-concurrency=10
#消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量
spring.rabbitmq.listener.simple.prefetch=1
#消費(fèi)者自動(dòng)啟動(dòng)
spring.rabbitmq.listener.simple.auto-startup=true
#消費(fèi)失敗,自動(dòng)重新入隊(duì)
#重試次數(shù)超過最大限制之后是否丟棄(true不丟棄時(shí)需要寫相應(yīng)代碼將該消息加入死信隊(duì)列)
#true故硅,自動(dòng)重新入隊(duì)庶灿,要寫相應(yīng)代碼將該消息加入死信隊(duì)列
#false,丟棄
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#是否開啟消費(fèi)者重試(為false時(shí)關(guān)閉消費(fèi)者重試,這時(shí)消費(fèi)端代碼異常會(huì)一直重復(fù)收到消息)
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=1000
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.multiplier=1.0
spring.rabbitmq.listener.simple.retry.max-interval=10000

#啟動(dòng)發(fā)送重試策略
spring.rabbitmq.template.retry.enabled=true
#初始重試間隔為1s
spring.rabbitmq.template.retry.initial-interval=1000
#重試的最大次數(shù)
spring.rabbitmq.template.retry.max-attempts=3
#重試間隔最多10s
spring.rabbitmq.template.retry.max-interval=10000
#每次重試的因子是1.0 等差
spring.rabbitmq.template.retry.multiplier=1.0
#
#RabbitMQ的消息確認(rèn)有兩種吃衅。
#一種是消息發(fā)送確認(rèn)往踢。這種是用來確認(rèn)生產(chǎn)者將消息發(fā)送給交換器,交換器傳遞給隊(duì)列的過程中徘层,
# 消息是否成功投遞菲语。
#發(fā)送確認(rèn)分為兩步,一是確認(rèn)是否到達(dá)交換器惑灵,二是確認(rèn)是否到達(dá)隊(duì)列山上。
#第二種是消費(fèi)接收確認(rèn)。這種是確認(rèn)消費(fèi)者是否成功消費(fèi)了隊(duì)列中的消息英支。
# 確認(rèn)消息發(fā)送成功佩憾,通過實(shí)現(xiàn)ConfirmCallBack接口,消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
spring.rabbitmq.publisher-confirms=true
# 實(shí)現(xiàn)ReturnCallback接口干花,如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
# (比如根據(jù)發(fā)送消息時(shí)指定的routingKey找不到隊(duì)列時(shí)會(huì)觸發(fā))
spring.rabbitmq.publisher-returns=true
# 消息消費(fèi)確認(rèn)妄帘,可以手動(dòng)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#在消息沒有被路由到合適隊(duì)列情況下會(huì)將消息返還給消息發(fā)布者
#當(dāng)mandatory標(biāo)志位設(shè)置為true時(shí),如果exchange根據(jù)自身類型和消息routingKey無法找到一個(gè)合適的queue存儲(chǔ)消息池凄,
# 那么broker會(huì)調(diào)用basic.return方法將消息返還給生產(chǎn)者;當(dāng)mandatory設(shè)置為false時(shí)抡驼,
# 出現(xiàn)上述情況broker會(huì)直接將消息丟棄;通俗的講,mandatory標(biāo)志告訴broker代理服務(wù)器至少將消息route到一個(gè)隊(duì)列中肿仑,
# 否則就將消息return給發(fā)送者;
spring.rabbitmq.template.mandatory=true

要確保RabbitMQ消息的可靠要保證以下3點(diǎn):
1.publisher Confirms:要確保生產(chǎn)者的消息到broker的可靠性致盟。可能會(huì)發(fā)生消息投遞到broker過程中,broker掛了的情況。
2.Exchange冀值,Queue,Message持久化:RabbitMQ是典型的內(nèi)存式消息堆積杯道。我們需要把message存儲(chǔ)到磁盤中。如果是未持久化的消息存儲(chǔ)在內(nèi)存中责蝠,broker掛了那么消息會(huì)丟失党巾。
3.consumer acknowledgement:消費(fèi)者確認(rèn)模式有3種:none(沒有消息會(huì)發(fā)送應(yīng)答),auto(自動(dòng)應(yīng)答),manual(手動(dòng)應(yīng)答)萎庭。為了保證消息可靠性,我們?cè)O(shè)置手動(dòng)應(yīng)答齿拂,這是為什么呢驳规?采用自動(dòng)應(yīng)答的方式,每次消費(fèi)端收到消息后创肥,不管是否處理完成达舒,Broker都會(huì)把這條消息置為完成,然后從Queue中刪除叹侄。如果消費(fèi)端消費(fèi)時(shí)巩搏,拋出異常。也就是說消費(fèi)端沒有成功消費(fèi)該消息趾代,從而造成消息丟失贯底。為了確保消息被消費(fèi)者正確處理,我們采用手動(dòng)應(yīng)答(調(diào)用basicAck撒强、basicNack禽捆、basicReject方法),只有在消息得到正確處理下飘哨,再發(fā)送ACK胚想。

RabbitMQ消息確認(rèn)有2種:消息發(fā)送確認(rèn),消費(fèi)接收確認(rèn)芽隆。消息發(fā)送確認(rèn)是確認(rèn)生產(chǎn)者將消息發(fā)送到Exchange浊服,Exchange分發(fā)消息至Queue的過程中,消息是否可靠投遞胚吁。第一步是否到達(dá)Exchange牙躺,第二步確認(rèn)是否到達(dá)Queue。

實(shí)現(xiàn)ConfirmCallBack接口,消息發(fā)送到Exchange后觸發(fā)回調(diào)腕扶。

    // 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("生產(chǎn)端confirm...");
                    log.info("correlationData=" + correlationData);
                    String messageId = correlationData.getId();
                    if (ack) {
                        //confirm返回成功,更新消息投遞狀態(tài)
                        brokerMessageLogMapper.updateMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
                    } else {
                        // 失敗則進(jìn)行具體的后續(xù)操作孽拷,重試或者補(bǔ)償?shù)仁侄巍?                        log.info("異常處理...");
                    }
                }
            };

實(shí)現(xiàn)ReturnCallBack接口,消息從Exchange發(fā)送到指定的Queue失敗觸發(fā)回調(diào)

    // 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
    private final RabbitTemplate.ReturnCallback returnCallback =
            new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.info("message=" + message.toString());
                    log.info("replyCode=" + replyCode);
                    log.info("replyText=" + replyText);
                    log.info("exchange=" + exchange);
                    log.info("routingKey=" + routingKey);
                }
            };

消息確認(rèn)機(jī)制開啟半抱,需要配置以下信息

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.listener.simple.acknowledge-mode=manual

之前說過手動(dòng)應(yīng)答可以調(diào)用basicAck,basicNack,basicReject方法脓恕,下面來講講。

手動(dòng)確認(rèn)消息代虾,當(dāng)multiple為false进肯,只確認(rèn)當(dāng)前的消息。當(dāng)multiple為true棉磨,批量確認(rèn)所有比當(dāng)前deliveryTag小的消息。deliveryTag是用來標(biāo)識(shí)Channel中投遞的消息学辱。RabbitMQ保證在每個(gè)Channel中乘瓤,消息的deliveryTag是從1遞增环形。


image.png

當(dāng)消費(fèi)端處理消息異常時(shí),我們可以選擇處理失敗消息的方式衙傀。如果requeue為true抬吟,失敗消息會(huì)重新進(jìn)入Queue,試想一下统抬,如果消費(fèi)者在消費(fèi)時(shí)發(fā)生異常火本,那么就不會(huì)對(duì)這一次消息進(jìn)行ACK,進(jìn)而發(fā)生回滾消息的操作聪建,使消息始終放在Queue的頭部钙畔,然后不斷的被處理和回滾,導(dǎo)致隊(duì)列陷入死循環(huán)金麸,為了解決這種問題擎析,我們可以引入重試機(jī)制(當(dāng)重試次數(shù)超過最大值,丟棄該消息)或者是死信隊(duì)列+重試隊(duì)列挥下。
requeue為false揍魂,丟棄該消息。


image.png

和basicNack用法一樣棚瘟。


image.png

為了配合Return機(jī)制现斋,我們要配置spring.rabbitmq.template.mandatory=true。它的作用是在消息沒有被路由到合適的隊(duì)列情況下偎蘸,Broker會(huì)將消息返回給生產(chǎn)者庄蹋。當(dāng)mandatory為true時(shí),如果Exchange根據(jù)類型和消息Routing Key無法路由到一個(gè)合適的Queue存儲(chǔ)消息禀苦,那么Broker會(huì)調(diào)用Basic.Return回調(diào)給handleReturn()蔓肯,再回調(diào)給ReturnCallback,將消息返回給生產(chǎn)者振乏。當(dāng)mandatory為false時(shí)蔗包,丟棄該消息。

    @Override
    public void handleReturn(int replyCode,
            String replyText,
            String exchange,
            String routingKey,
            BasicProperties properties,
            byte[] body)
        throws IOException {

        ReturnCallback returnCallback = this.returnCallback;
        if (returnCallback == null) {
            Object messageTagHeader = properties.getHeaders().remove(RETURN_CORRELATION_KEY);
            if (messageTagHeader != null) {
                String messageTag = messageTagHeader.toString();
                final PendingReply pendingReply = this.replyHolder.get(messageTag);
                if (pendingReply != null) {
                    returnCallback = new ReturnCallback() {

                        @Override
                        public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                                String routingKey) {
                            pendingReply.returned(new AmqpMessageReturnedException("Message returned",
                                    message, replyCode, replyText, exchange, routingKey));
                        }
                    };
                }
                else if (logger.isWarnEnabled()) {
                    logger.warn("Returned request message but caller has timed out");
                }
            }
            else if (logger.isWarnEnabled()) {
                logger.warn("Returned message but no callback available");
            }
        }
        if (returnCallback != null) {
            properties.getHeaders().remove(PublisherCallbackChannel.RETURN_CORRELATION_KEY);
            MessageProperties messageProperties = this.messagePropertiesConverter.toMessageProperties(
                    properties, null, this.encoding);
            Message returnedMessage = new Message(body, messageProperties);
            returnCallback.returnedMessage(returnedMessage,
                    replyCode, replyText, exchange, routingKey);
        }
    }

當(dāng)消息路由不到合適的Queue慧邮,會(huì)在回調(diào)給ReturnCallck這些信息调限。


image.png

如果消費(fèi)端忘記了ACK,這些消息會(huì)一直處于Unacked 狀態(tài)误澳。由于RabbitMQ消息消費(fèi)沒有超時(shí)機(jī)制耻矮,也就是程序不重啟,消息會(huì)一直處于Unacked狀態(tài)忆谓。當(dāng)消費(fèi)端程序關(guān)閉時(shí)裆装,這些處于Unack狀態(tài)的消息會(huì)重新恢復(fù)成Ready狀態(tài)。這時(shí)候會(huì)出現(xiàn)一種情況:當(dāng)消費(fèi)端程序開啟時(shí),由于Broker端積壓了大量的消息哨免,又可能會(huì)讓消費(fèi)端崩潰茎活。所以我們要對(duì)消費(fèi)端進(jìn)行限流處理。RabbitMQ提供了一種qos(Quality of Service,服務(wù)質(zhì)量保證)功能琢唾,即在非自動(dòng)ACK前提下载荔,如果一定數(shù)量的消息未被ACK前,不進(jìn)行新消息的消息采桃。

image.png
spring.rabbitmq.listener.simple.prefetch=1
image.png

下面貼消息可靠性解決方案代碼了懒熙。

配置任務(wù)調(diào)度中心

@Configuration
@EnableScheduling
public class TaskSchedulerConfig implements SchedulingConfigurer {

    protected ThreadPoolExecutor threadPoolExecutor;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
    }

    @Bean(destroyMethod = "shutdown")
    public ThreadPoolExecutor taskExecutor() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("task-executor-pool-%d").build();
        this.threadPoolExecutor = new ScheduledThreadPoolExecutor(10, namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());
        return threadPoolExecutor;
    }

}

執(zhí)行重新投遞status為0的消息。這里也可以使用corn表達(dá)式設(shè)置觸發(fā)任務(wù)調(diào)度的時(shí)間普办。關(guān)于fixedRate和fixedDelay概念總有人搞混工扎。fixedRate任務(wù)兩次執(zhí)行時(shí)間間隔是任務(wù)的開始點(diǎn),而fixedDelay的間隔是前次任務(wù)的結(jié)束和下一次任務(wù)開始的間隔泌豆。

@Component
@Slf4j
public class RetryMessageTask {

    @Autowired
    private RabbitmqOrderSender rabbitmqOrderSender;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    @Scheduled(initialDelay = 5000, fixedDelay = 30000)
    public void trySendMessage() {
        log.info("定時(shí)投遞status為0的消息...");
        List<BrokerMessageLog> brokerMessageLogList = brokerMessageLogMapper.listStatusAndTimeoutMessage();
        brokerMessageLogList.forEach(brokerMessageLog -> {
            if (brokerMessageLog.getTryCount() >= 3) {
                log.info("投遞3次還是失敗...");
                brokerMessageLogMapper.updateMessageLogStatus(brokerMessageLog.getMessageId(),
                        Constants.ORDER_SEND_FAIL,
                        new Date());
            } else {
                log.info("投遞失敗...");
                brokerMessageLogMapper.updateReSendMessage(brokerMessageLog.getMessageId(),
                        new Date());
                Order order = JSON.parseObject(brokerMessageLog.getMessage(), Order.class);
                try {
                    rabbitmqOrderSender.sendOrder(order);
                } catch (Exception e) {
                    log.error("重新投遞消息發(fā)送異常...:" + e.getMessage());
                }
            }
        });
    }
}

消息生產(chǎn)端

@Component
@Slf4j
public class RabbitmqOrderSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private BrokerMessageLogMapper brokerMessageLogMapper;

    // 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)
    private final RabbitTemplate.ConfirmCallback confirmCallback =
            new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("生產(chǎn)端confirm...");
                    log.info("correlationData=" + correlationData);
                    String messageId = correlationData.getId();
                    if (ack) {
                        //confirm返回成功,更新消息投遞狀態(tài)
                        brokerMessageLogMapper.updateMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
                    } else {
                        // 失敗則進(jìn)行具體的后續(xù)操作定庵,重試或者補(bǔ)償?shù)仁侄巍?                        log.info("異常處理...");
                    }
                }
            };

    // 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
    private final RabbitTemplate.ReturnCallback returnCallback =
            new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.info("message=" + message.toString());
                    log.info("replyCode=" + replyCode);
                    log.info("replyText=" + replyText);
                    log.info("exchange=" + exchange);
                    log.info("routingKey=" + routingKey);
                }
            };

    public void sendOrder(Order order) {
        log.info("生產(chǎn)端發(fā)送消息...");
        rabbitTemplate.setConfirmCallback(this.confirmCallback);
        rabbitTemplate.setReturnCallback(this.returnCallback);
        CorrelationData correlationData = new CorrelationData(order.getMessageId());
        rabbitTemplate.convertAndSend(MQConfig.ORDER_DIRECT_EXCAHNGE,
                MQConfig.ORDER_QUEUE,  order, correlationData);
    }
}

消息消費(fèi)端

@Component
@Slf4j
public class RabbitmqOrderReceiver {

    @RabbitListener(queues = MQConfig.ORDER_QUEUE)
    public void receive(@Payload Order order, Channel channel,
                        @Headers Map<String, Object> headers,
                        Message message) throws IOException, InterruptedException {
        log.info("消費(fèi)端接收消息...");
        log.info("message=" + message.toString());
        log.info("order=" + order);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        log.info("deliveryTag=" + deliveryTag);
        // 手工ack
        channel.basicAck(deliveryTag, false);
    }
}

當(dāng)我們發(fā)送消息時(shí),故意將Exchange設(shè)置成一個(gè)不存在的值踪危。消息路由不到合適的Exchange蔬浙,Confirm機(jī)制回送的ACK會(huì)返回false,走異常處理贞远。這個(gè)消息的狀態(tài)不會(huì)更新成1畴博。然后定時(shí)任務(wù)會(huì)拉取status為0的消息,進(jìn)行重新投遞蓝仲,投遞了3次消息還未成功俱病,將status置為2。


image.png

接下來袱结,我們測(cè)試一波亮隙。

    @Test
    public void test() {
        Order order = new Order();
        order.setId("36");
        order.setName("cmazxiaoma測(cè)試訂單-36");
        order.setMessageId(UUIDUtil.uuid());

        rabbitmqOrderService.createOrder(order);
    }

消息投遞失敗。


image.png

定時(shí)任務(wù)重新投遞消息失敗垢夹。


image.png

將失敗的消息重新投遞3次還是失敗溢吻。


image.png

更新Message db信息,將重新投遞3次還是失敗的消息狀態(tài)置為2果元。


image.png

接著我們把消費(fèi)端手動(dòng)ACK的代碼注釋掉促王,再讓生產(chǎn)端發(fā)送消息《梗看看會(huì)出現(xiàn)什么情況蝇狼。


image.png

我們會(huì)發(fā)現(xiàn)Queue堆積了該消息。


image.png

我們關(guān)掉RabbitMQ Server倡怎,看看此消息是否會(huì)持久化迅耘。

[root@VM_0_11_centos log]# ps -ef|grep rabbitmq
root     13283 10291  0 13:42 pts/1    00:00:00 grep --color=auto rabbitmq
root     23051     1  1 Nov06 ?        00:09:29 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /root -- -pa /usr/local/rabbitmq/ebin -noshell -noinput -s rabbit boot -sname rabbit@VM_0_11_centos -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos.log"} -rabbit sasl_error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos-sasl.log"} -rabbit enabled_plugins_file "/usr/local/rabbitmq/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/rabbitmq/plugins" -rabbit plugins_expand_dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput
[root@VM_0_11_centos log]# kill -9 23051

[root@VM_0_11_centos sbin]# rabbitmq-server -detached
Warning: PID file not written; -detached was passed.
[root@VM_0_11_centos sbin]# ps -ef|grep rabbitmq
root     13500     1 31 13:44 ?        00:00:02 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -A 64 -P 1048576 -t 5000000 -stbt db -zdbbl 128000 -K true -- -root /usr/lib64/erlang -progname erl -- -home /root -- -pa /usr/local/rabbitmq/ebin -noshell -noinput -s rabbit boot -sname rabbit@VM_0_11_centos -boot start_sasl -kernel inet_default_connect_options [{nodelay,true}] -sasl errlog_type error -sasl sasl_error_logger false -rabbit error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos.log"} -rabbit sasl_error_logger {file,"/usr/local/rabbitmq/var/log/rabbitmq/rabbit@VM_0_11_centos-sasl.log"} -rabbit enabled_plugins_file "/usr/local/rabbitmq/etc/rabbitmq/enabled_plugins" -rabbit plugins_dir "/usr/local/rabbitmq/plugins" -rabbit plugins_expand_dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos-plugins-expand" -os_mon start_cpu_sup false -os_mon start_disksup false -os_mon start_memsup false -mnesia dir "/usr/local/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@VM_0_11_centos" -kernel inet_dist_listen_min 25672 -kernel inet_dist_listen_max 25672 -noshell -noinput
root     13597 10291  0 13:44 pts/1    00:00:00 grep --color=auto rabbitmq

執(zhí)行rabbitmqctl list_queues name messages_ready messages_unacknowledged命令贱枣,查詢Queue情況,發(fā)現(xiàn)Message持久化了豹障。

image.png

image.png

斷開消費(fèi)者程序冯事,我們可以看到消息從Unacked狀態(tài)轉(zhuǎn)換成Ready了焦匈。


image.png

尾言

不管是神還是惡魔都不會(huì)對(duì)不抗?fàn)幍娜松斐鲈?/p>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末血公,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子缓熟,更是在濱河造成了極大的恐慌累魔,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件够滑,死亡現(xiàn)場(chǎng)離奇詭異垦写,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)彰触,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門梯投,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人况毅,你說我怎么就攤上這事分蓖。” “怎么了尔许?”我有些...
    開封第一講書人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵么鹤,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我味廊,道長(zhǎng)蒸甜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任余佛,我火速辦了婚禮柠新,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘辉巡。我一直安慰自己恨憎,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開白布红氯。 她就那樣靜靜地躺著框咙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪痢甘。 梳的紋絲不亂的頭發(fā)上喇嘱,一...
    開封第一講書人閱讀 50,050評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音塞栅,去河邊找鬼者铜。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的作烟。 我是一名探鬼主播愉粤,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼拿撩!你這毒婦竟也來了衣厘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤压恒,失蹤者是張志新(化名)和其女友劉穎影暴,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體探赫,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡型宙,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伦吠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妆兑。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖毛仪,靈堂內(nèi)的尸體忽然破棺而出搁嗓,到底是詐尸還是另有隱情,我是刑警寧澤潭千,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布谱姓,位于F島的核電站,受9級(jí)特大地震影響刨晴,放射性物質(zhì)發(fā)生泄漏屉来。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一狈癞、第九天 我趴在偏房一處隱蔽的房頂上張望茄靠。 院中可真熱鬧,春花似錦蝶桶、人聲如沸慨绳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽脐雪。三九已至,卻和暖如春恢共,著一層夾襖步出監(jiān)牢的瞬間战秋,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來泰國打工讨韭, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留脂信,地道東北人癣蟋。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像狰闪,于是被迫代替她去往敵國和親疯搅。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器埋泵。支持消息的持久化幔欧、事務(wù)、擁塞控...
    jiangmo閱讀 10,353評(píng)論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理秋泄,服務(wù)發(fā)現(xiàn)琐馆,斷路器,智...
    卡卡羅2017閱讀 134,638評(píng)論 18 139
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,092評(píng)論 3 51
  • 1 RabbitMQ安裝部署 這里是ErLang環(huán)境的下載地址http://www.erlang.org/down...
    Bobby0322閱讀 2,228評(píng)論 0 11
  • 本文大綱 RabbitMQ 歷史 RabbitMQ 應(yīng)用場(chǎng)景 RabbitMQ 系統(tǒng)架構(gòu) RabbitMQ 基本概...
    Java_Explorer閱讀 16,331評(píng)論 1 40