第四章----SpringBoot+RabbitMQ發(fā)送確認(rèn)和消費(fèi)手動(dòng)確認(rèn)機(jī)制

1. 配置RabbitMQ

# 發(fā)送確認(rèn)
spring.rabbitmq.publisher-confirms=true
# 發(fā)送回調(diào)
spring.rabbitmq.publisher-returns=true
# 消費(fèi)手動(dòng)確認(rèn)
spring.rabbitmq.listener.simple.acknowledge-mode=manual

2. 生產(chǎn)者發(fā)送消息確認(rèn)機(jī)制

  • 其實(shí)這個(gè)也不能叫確認(rèn)機(jī)制,只是起到一個(gè)監(jiān)聽的作用唐含,監(jiān)聽生產(chǎn)者是否發(fā)送消息到exchange和queue。
  • 生產(chǎn)者和消費(fèi)者代碼不改變彻亲。
  • 新建配置類 MQProducerAckConfig.java 實(shí)現(xiàn)ConfirmCallback和ReturnCallback接口堕担,@Component注冊(cè)成組件。
  • ConfirmCallback只確認(rèn)消息是否到達(dá)exchange座掘,已實(shí)現(xiàn)方法confirm中ack屬性為標(biāo)準(zhǔn)递惋,true到達(dá),反之進(jìn)入黑洞溢陪。
  • ReturnCallback消息沒有正確到達(dá)隊(duì)列時(shí)觸發(fā)回調(diào)萍虽,如果正確到達(dá)隊(duì)列不執(zhí)行。
package com.fzb.rabbitmq.config;

import org.apache.commons.lang3.SerializationUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @Description 消息發(fā)送確認(rèn)
 * <p>
 * ConfirmCallback  只確認(rèn)消息是否正確到達(dá) Exchange 中
 * ReturnCallback   消息沒有正確到達(dá)隊(duì)列時(shí)觸發(fā)回調(diào)形真,如果正確到達(dá)隊(duì)列不執(zhí)行
 * <p>
 * 1. 如果消息沒有到exchange,則confirm回調(diào),ack=false
 * 2. 如果消息到達(dá)exchange,則confirm回調(diào),ack=true
 * 3. exchange到queue成功,則不回調(diào)return
 * 4. exchange到queue失敗,則回調(diào)return
 * @Author jxb
 * @Date 2019-04-04 16:57:04
 */
@Component
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallback
        rabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback

    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息發(fā)送成功" + correlationData);
        } else {
            System.out.println("消息發(fā)送失敗:" + cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化對(duì)象輸出
        System.out.println("消息主體: " + SerializationUtils.deserialize(message.getBody()));
        System.out.println("應(yīng)答碼: " + replyCode);
        System.out.println("描述:" + replyText);
        System.out.println("消息使用的交換器 exchange : " + exchange);
        System.out.println("消息使用的路由鍵 routing : " + routingKey);
    }
}

3. 消費(fèi)者消息手動(dòng)確認(rèn)

  • SpringBoot集成RabbitMQ確認(rèn)機(jī)制分為三種:none杉编、auto(默認(rèn))、manual

Auto:
1. 如果消息成功被消費(fèi)(成功的意思是在消費(fèi)的過程中沒有拋出異常)咆霜,則自動(dòng)確認(rèn)
2. 當(dāng)拋出 AmqpRejectAndDontRequeueException 異常的時(shí)候邓馒,則消息會(huì)被拒絕,且 requeue = false(不重新入隊(duì)列)
3. 當(dāng)拋出 ImmediateAcknowledgeAmqpException 異常蛾坯,則消費(fèi)者會(huì)被確認(rèn)
4. 其他的異常光酣,則消息會(huì)被拒絕,且 requeue = true脉课,此時(shí)會(huì)發(fā)生死循環(huán)挂疆,可以通過 setDefaultRequeueRejected(默認(rèn)是true)去設(shè)置拋棄消息

  • 如設(shè)置成manual手動(dòng)確認(rèn),一定要對(duì)消息做出應(yīng)答下翎,否則rabbit認(rèn)為當(dāng)前隊(duì)列沒有消費(fèi)完成缤言,將不再繼續(xù)向該隊(duì)列發(fā)送消息。

  • channel.basicAck(long,boolean); 確認(rèn)收到消息视事,消息將被隊(duì)列移除胆萧,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息。

  • channel.basicNack(long,boolean,boolean); 確認(rèn)否定消息跌穗,第一個(gè)boolean表示一個(gè)consumer還是所有订晌,第二個(gè)boolean表示requeue是否重新回到隊(duì)列,true重新入隊(duì)蚌吸。

  • channel.basicReject(long,boolean); 拒絕消息锈拨,requeue=false 表示不再重新入隊(duì),如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列羹唠。

  • 當(dāng)消息回滾到消息隊(duì)列時(shí)奕枢,這條消息不會(huì)回到隊(duì)列尾部,而是仍是在隊(duì)列頭部佩微,這時(shí)消費(fèi)者會(huì)又接收到這條消息缝彬,如果想消息進(jìn)入隊(duì)尾,須確認(rèn)消息后再次發(fā)送消息哺眯。

channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), 
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBody());
  • 延續(xù)上一章direct類型隊(duì)列為例谷浅,當(dāng)消息出現(xiàn)異常,判斷是否回滾過消息奶卓,如否則消息從新入隊(duì)一疯,反之拋棄消息。其中一個(gè)消費(fèi)者模擬一個(gè)異常夺姑。
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.queue"), exchange = @Exchange(value = "direct.exchange"), key = "HelloWorld")})
    public void getDirectMessage(User user, Channel channel, Message message) throws IOException {
        try {
            // 模擬執(zhí)行任務(wù)
            Thread.sleep(1000);
            // 模擬異常
            String is = null;
            is.toString();
            // 確認(rèn)收到消息墩邀,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到,true確認(rèn)所有consumer獲得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("消息已重復(fù)處理失敗,拒絕再次接收" + user.getName());
                // 拒絕消息瑟幕,requeue=false 表示不再重新入隊(duì)磕蒲,如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("消息即將再次返回隊(duì)列處理" + user.getName());
                // requeue為是否重新回到隊(duì)列,true重新入隊(duì)
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            //e.printStackTrace();
        }
    }

@RabbitListener(queues = "direct.queue")
    public void getDirectMessageCopy(User user, Channel channel, Message message) throws IOException {
        try {
            // 模擬執(zhí)行任務(wù)
            Thread.sleep(1000);
            System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
            // 確認(rèn)收到消息只盹,false只確認(rèn)當(dāng)前consumer一個(gè)消息收到辣往,true確認(rèn)所有consumer獲得的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("消息已重復(fù)處理失敗,拒絕再次接收!");
                // 拒絕消息殖卑,requeue=false 表示不再重新入隊(duì)站削,如果配置了死信隊(duì)列則進(jìn)入死信隊(duì)列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                System.out.println("消息即將再次返回隊(duì)列處理!");
                // requeue為是否重新回到隊(duì)列孵稽,true重新入隊(duì)
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }

從執(zhí)行結(jié)果來看许起,三條消息都調(diào)用了confirm方法,說明消息發(fā)送到了exchange菩鲜,且沒有調(diào)用return方法园细,說明消息成功到達(dá)相應(yīng)隊(duì)列。

getDirectMessageCopy方法成功消費(fèi)掉“張三”這條消息接校,由于getDirectMessage方法模擬異常猛频,所以第一次把“李四”從新入隊(duì),此時(shí)getDirectMessageCopy繼續(xù)消費(fèi)“王五”成功,getDirectMessage方法因李四已經(jīng)從新入隊(duì)過鹿寻,再次發(fā)生異常則拋棄消息睦柴。

輪詢分發(fā)

進(jìn)一步挖掘你會(huì)發(fā)現(xiàn),開始一共3條消息毡熏,有一條回滾消息總數(shù)變成了4條坦敌,每個(gè)消費(fèi)者消費(fèi)2條,所以兩個(gè)消費(fèi)者是輪詢分配的痢法。

  • 工作隊(duì)列有兩種工作方式:輪詢分發(fā)(默認(rèn))狱窘、公平分發(fā)即當(dāng)某個(gè)消費(fèi)者沒有消費(fèi)完成之前不用再分發(fā)消息。
  • 修改配置文件
# 消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量疯暑。此屬性當(dāng)不設(shè)置時(shí)為:輪詢分發(fā)训柴,設(shè)置為1為:公平分發(fā)
spring.rabbitmq.listener.simple.prefetch=1

將第一個(gè)消費(fèi)者模擬執(zhí)行5秒哑舒,然后向數(shù)據(jù)庫(kù)增加一條數(shù)據(jù)妇拯,執(zhí)行結(jié)果為:

公平分發(fā)

可以看到,getDirectMessageCopy執(zhí)行了4次洗鸵,getDirectMessage執(zhí)行了1次越锈,根據(jù)他們的消費(fèi)能力來公平分發(fā)消息。


如果可以膘滨,我想回到10年前...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末甘凭,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子火邓,更是在濱河造成了極大的恐慌丹弱,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件铲咨,死亡現(xiàn)場(chǎng)離奇詭異躲胳,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)纤勒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門坯苹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人摇天,你說我怎么就攤上這事粹湃。” “怎么了泉坐?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵为鳄,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我腕让,道長(zhǎng)孤钦,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮司训,結(jié)果婚禮上构捡,老公的妹妹穿的比我還像新娘。我一直安慰自己壳猜,他們只是感情好勾徽,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著统扳,像睡著了一般喘帚。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上咒钟,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天吹由,我揣著相機(jī)與錄音,去河邊找鬼朱嘴。 笑死倾鲫,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的萍嬉。 我是一名探鬼主播乌昔,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼壤追!你這毒婦竟也來了磕道?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤行冰,失蹤者是張志新(化名)和其女友劉穎溺蕉,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體悼做,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡疯特,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了贿堰。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片辙芍。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖羹与,靈堂內(nèi)的尸體忽然破棺而出故硅,到底是詐尸還是另有隱情,我是刑警寧澤纵搁,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布吃衅,位于F島的核電站,受9級(jí)特大地震影響腾誉,放射性物質(zhì)發(fā)生泄漏徘层。R本人自食惡果不足惜峻呕,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望趣效。 院中可真熱鬧瘦癌,春花似錦、人聲如沸跷敬。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽西傀。三九已至斤寇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間拥褂,已是汗流浹背娘锁。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留饺鹃,地道東北人莫秆。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像尤慰,于是被迫代替她去往敵國(guó)和親馏锡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子雷蹂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354