springboot rabbitmq高可用消息確認消費實戰(zhàn)

RabbitMQ的高可用主要體現(xiàn)在消息的發(fā)送专缠、傳輸和接收的過程中同廉,可以保證消息成功發(fā)送、不會丟失瞎惫,以及被確認消費/不重復(fù)消費。

  • 對于消息是否發(fā)送成功钱豁,主要是針對生產(chǎn)者端的消息生產(chǎn)確認機制;
  • 對于消息不會丟失疯汁,主要是rabbitmq消息持久化機制牲尺;
  • 對于消息確認消費/不重復(fù)消費,主要是針對消費者端對消息的確認消費機制涛目。
一秸谢、消息生產(chǎn)確認機制

對于消息是否發(fā)送成功,在rabbitmq自定義操作組件中可以統(tǒng)一設(shè)置消息生產(chǎn)確認相關(guān)邏輯rabbitTemplate.setConfirmCallback和rabbitTemplate.setReturnCallback霹肝。

@Slf4j
@Configuration
public class RabbitmqConfig {
    //自定義配置RabbitMQ發(fā)送消息的操作組件RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(){
        //設(shè)置“發(fā)送消息后進行確認”
        connectionFactory.setPublisherConfirms(true);
        //設(shè)置“發(fā)送消息后返回確認信息”
        connectionFactory.setPublisherReturns(true);
        //構(gòu)造發(fā)送消息組件實例對象
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        //發(fā)送消息后估蹄,如果發(fā)送成功,則輸出“消息發(fā)送成功”的反饋信息
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息發(fā)送成功:correlationData({}),ack({}),cause({})", correlationData,ack,cause));
        //發(fā)送消息后沫换,如果發(fā)送失敗臭蚁,則輸出“消息發(fā)送失敗-消息丟失”的反饋信息
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message));
        //定義消息傳輸?shù)母袷綖镴SON字符串格式
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //最終返回RabbitMQ的操作組件實例RabbitTemplate
        return rabbitTemplate;
    }
}
二、消息持久化
  1. 在創(chuàng)建交換機和隊列的時候讯赏,有個durable的參數(shù)垮兑,即是否持久化,如果設(shè)置為true漱挎,當(dāng)rabbitmq服務(wù)器重啟的時候系枪,創(chuàng)建的交換機和隊列均還存在著,不會丟失磕谅;
  2. 在發(fā)送消息的時候可以選擇為該消息設(shè)置持久化私爷,即消息體Message的deliveryMode設(shè)置為MessageDeliveryMode.PERSISTENT持久化雾棺,當(dāng)消息來不及消費rabbitmq服務(wù)器重啟,那么消息依舊存在衬浑,如果將所有消息都設(shè)置持久化捌浩,那么會影響性能,內(nèi)存和磁盤的讀寫速度差異很大工秩。
三尸饺、消息確認消費機制
  • 如何保證消息能夠被準(zhǔn)備消費、不重復(fù)消費助币,RabbitMQ提供了消息確認機制浪听,即ACK模式。RabbitMQ的消息確認機制有3種奠支,分別是NONE(無須確認)馋辈、AUTO(自動確認)和MANUAL(手動確認)。

  • 無須確認流程圖如下圖所示倍谜,對于該模式,消息是否消費成功生產(chǎn)者端是不知道的叉抡,存在可能重復(fù)消費/消息消費失敗的情況:


    無需確認.jpeg
  • 代碼目錄如圖所示尔崔,演示自動確認和手動確認:


    自動確認和手動確認.png

    對于設(shè)置ACK模式,可以在yaml配置文件中設(shè)置spring.rabbitmq.listener.simple.acknowledge-mode: xxx褥民,也可以在聲明的監(jiān)聽器Bean中設(shè)置季春,用簡單監(jiān)聽器SimpleRabbitListenerContainerFactory即可:

@Slf4j
@Configuration
public class RabbitmqConfig {
    /**
     * 確認消費模式為自動確認機制-AUTO,采用直連傳輸directExchange消息模型
     */
    @Bean
    public SimpleRabbitListenerContainerFactory singleListenerContainerAuto(){
        //定義消息監(jiān)聽器所在的容器工廠
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //設(shè)置容器工廠所用的實例
        factory.setConnectionFactory(connectionFactory);
        //設(shè)置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //設(shè)置并發(fā)消費者實例的初始數(shù)量消返。在這里為1個
        factory.setConcurrentConsumers(1);
        //設(shè)置并發(fā)消費者實例的最大數(shù)量载弄。在這里為1個
        factory.setMaxConcurrentConsumers(1);
        //設(shè)置并發(fā)消費者實例中每個實例拉取的消息數(shù)量-在這里為1個
        factory.setPrefetchCount(1);
        //確認消費模式為自動確認機制
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /**
     * 確認消費模式為手動確認機制-MANUAL,采用直連傳輸directExchange消息模型
     */
    @Bean
    public SimpleRabbitListenerContainerFactory singleListenerContainerManual(){
        //定義消息監(jiān)聽器所在的容器工廠
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //設(shè)置容器工廠所用的實例
        factory.setConnectionFactory(connectionFactory);
        //設(shè)置消息在傳輸中的格式,在這里采用JSON的格式進行傳輸
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //設(shè)置并發(fā)消費者實例的初始數(shù)量撵颊。在這里為1個
        factory.setConcurrentConsumers(1);
        //設(shè)置并發(fā)消費者實例的最大數(shù)量宇攻。在這里為1個
        factory.setMaxConcurrentConsumers(1);
        //設(shè)置并發(fā)消費者實例中每個實例拉取的消息數(shù)量-在這里為1個
        factory.setPrefetchCount(1);
        //確認消費模式為自動確認機制
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

(1)自動確認模式
自動確認模式流程圖如圖所示,RabbitMQ內(nèi)置組件通知生產(chǎn)者端倡勇,當(dāng)消息成功消費/消費失敗都會通知:


auto確認.jpeg

對于自動確認模式逞刷,在消費者端可以看到和普通的消息隊列沒什么區(qū)別,而手工確認消費模式則比較靈活妻熊。

  • 確認消費模式為自動確認機制-AUTO,采用直連傳輸directExchange消息模型-生產(chǎn)者
@Slf4j
@Component
public class AutoAckPublisher {
    //定義RabbitMQ消息操作組件RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送消息
     */
    public void sendMsg(Order order) {
        try {
            //設(shè)置交換機
            rabbitTemplate.setExchange(RabbitMqConstants.AUTO_ACKNOWLEDGE_EXCHANGE);
            //設(shè)置路由
            rabbitTemplate.setRoutingKey(RabbitMqConstants.AUTO_ACKNOWLEDGE_ROUTING_KEY);
            //發(fā)送消息
            rabbitTemplate.convertAndSend(order);
            log.info("確認消費模式為自動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{} ",order);
        }catch (Exception e){
            log.error("確認消費模式為自動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ",order, e);
        }
    }
}
  • 確認消費模式為自動確認機制-AUTO,采用直連傳輸directExchange消息模型-消費者
@Slf4j
@Component
public class AutoAckConsumer {

    @RabbitListener(queues = RabbitMqConstants.AUTO_ACKNOWLEDGE_QUEUE, containerFactory = "singleListenerContainerAuto")
    public void consumeMsg(Order order) {
        try {
            log.info("基于AUTO的自動確認消費模式-消費者監(jiān)聽消費消息-內(nèi)容為:{} ",order);
        }catch (Exception e){
            log.error("基于AUTO的自動確認消費模式-消費者監(jiān)聽消費消息:{},發(fā)生異常:", order, e);
        }
    }
}

(2)手工確認流程圖如圖所示夸浅,當(dāng)消息處理過程中出現(xiàn)異常的時候,需要手工確認處理該異常消息扔役,該消息是否重新歸入隊列等處理帆喇。


manual確認.jpeg
  • 確認消費模式為手動確認機制-MANUAL,采用直連傳輸directExchange消息模型-生產(chǎn)者
@Slf4j
@Component
public class ManualAckPublisher {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送消息
     */
    public void sendMsg(Order order) {
        try {
            rabbitTemplate.setExchange(RabbitMqConstants.MANUAL_ACKNOWLEDGE_EXCHANGE);
            rabbitTemplate.setRoutingKey(RabbitMqConstants.MANUAL_ACKNOWLEDGE_ROUTING_KEY);
            rabbitTemplate.convertAndSend(order);
            log.info("確認消費模式為手動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{} ", order);
        }catch (Exception e){
            log.error("確認消費模式為手動確認機制-消息模型DirectExchange-one-生產(chǎn)者-發(fā)送消息:{},發(fā)生異常:{} ", order, e);
        }
    }
}
  • 確認消費模式為手動確認機制-MANUAL,采用直連傳輸directExchange消息模型-消費者
    在監(jiān)聽到消息并且消息成功處理完之后,通過basicAck來確認消息成功消費亿胸,當(dāng)捕獲到異常的時候即該消息處理失敗的時候坯钦,有兩種方式预皇,一種是拒絕該消息并且消息重新歸入隊列中,另一種是拒絕該消息并且丟棄掉葫笼,一般情況下重新歸入隊列深啤,還是會出現(xiàn)異常沒法消費掉,除非把異常修復(fù)了才行路星,并且在未修復(fù)該異常的情況下溯街,后面的消息會被堵塞住沒辦法消費,將消息重新歸入隊列中或許不是一個好的選擇洋丐。
    一般情況下可以保留該消息的信息然后把消息丟棄掉呈昔,最后重新發(fā)送消息;或者把該消息丟入到死信隊列中友绝,不對該死信隊列進行監(jiān)聽堤尾,最后在rabbitmq管理后臺取出該消息/重新監(jiān)聽該消息重新發(fā)送到原先隊列進行消費,修復(fù)好異常情況再發(fā)送消息進行處理迁客,保證消息成功消費郭宝。
@Slf4j
@Component
public class ManualAckConsumer {
    @RabbitListener(queues = RabbitMqConstants.MANUAL_ACKNOWLEDGE_QUEUE, containerFactory = "singleListenerContainerManual")
    public void consumeMsg(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
        try {
            log.info("基于MANUAL的手工確認消費模式-消費者監(jiān)聽消費消息,消息投遞標(biāo)記:{},內(nèi)容為:{} ", tag, order);
            //拋異常,歸入使得消息重新歸入隊列
            //int num = 1 / 0;
            //執(zhí)行完業(yè)務(wù)邏輯后,手動進行確認消費掷漱,其中第一個參數(shù)為:消息的分發(fā)標(biāo)識(全局唯一);第二個參數(shù):是否允許批量確認消費
            channel.basicAck(tag, false);
        }catch (Exception e){
            //第二個參數(shù)reueue重新歸入隊列,true的話會重新歸入隊列,需要人為地處理此次異常消息,重新歸入隊列也會繼續(xù)異常
            channel.basicReject(tag, true);
            log.error("基于MANUAL的手工確認消費模式-消費者監(jiān)聽消費消息:{},消息投遞標(biāo)簽:{},發(fā)生異常:", order, tag, e);
        }
    }
}

出現(xiàn)異常重新歸入隊列的情況粘室,如圖所示,顯示有unacked 1條消息卜范,下面有g(shù)et messages衔统,當(dāng)點擊的時候發(fā)現(xiàn)提示queue is empty隊列為空,確實準(zhǔn)備消費的消息為0條海雪,正在消費的消息一直是unacked狀態(tài)無法取出锦爵。


unacked消息.png
取不出來.png

這個時候只能停止監(jiān)聽重啟項目,這個在線上不是好的辦法奥裸,停止監(jiān)聽之后消息變?yōu)閞eady狀態(tài)险掀,這個時候可以取出,可以看到提示“取出消息是毀滅性的操作”刺彩。


ready狀態(tài)消息.png
取出消息.png

四種取出消息的模式迷郑,分別為:不確認消息重新歸入隊列、確認消息不重新歸入隊列创倔、拒絕該消息重新歸入隊列嗡害、拒絕該消息不重新歸入隊列。當(dāng)取出消息可以看到消息的內(nèi)容畦攘。


取出消息模式.png

消息內(nèi)容.png

對于確認消息消費霸妹,避免消息異常出現(xiàn)上述情況,可以用死信隊列來處理知押,捕獲異常消息叹螟,發(fā)送消息到死信隊列鹃骂,不監(jiān)聽該隊列的消息,最后修復(fù)異常重新發(fā)送消息到原先隊列進行消費罢绽,詳情請看下篇博文

參考資料:
《分布式中間件實戰(zhàn)》
《rabbitmq實戰(zhàn)指南》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末畏线,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子良价,更是在濱河造成了極大的恐慌寝殴,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,000評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件明垢,死亡現(xiàn)場離奇詭異蚣常,居然都是意外死亡,警方通過查閱死者的電腦和手機痊银,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評論 3 399
  • 文/潘曉璐 我一進店門抵蚊,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人溯革,你說我怎么就攤上這事贞绳。” “怎么了致稀?”我有些...
    開封第一講書人閱讀 168,561評論 0 360
  • 文/不壞的土叔 我叫張陵熔酷,是天一觀的道長。 經(jīng)常有香客問我豺裆,道長,這世上最難降的妖魔是什么号显? 我笑而不...
    開封第一講書人閱讀 59,782評論 1 298
  • 正文 為了忘掉前任臭猜,我火速辦了婚禮,結(jié)果婚禮上押蚤,老公的妹妹穿的比我還像新娘蔑歌。我一直安慰自己,他們只是感情好揽碘,可當(dāng)我...
    茶點故事閱讀 68,798評論 6 397
  • 文/花漫 我一把揭開白布次屠。 她就那樣靜靜地躺著,像睡著了一般雳刺。 火紅的嫁衣襯著肌膚如雪劫灶。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,394評論 1 310
  • 那天掖桦,我揣著相機與錄音本昏,去河邊找鬼。 笑死枪汪,一個胖子當(dāng)著我的面吹牛涌穆,可吹牛的內(nèi)容都是我干的怔昨。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼宿稀,長吁一口氣:“原來是場噩夢啊……” “哼趁舀!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起祝沸,我...
    開封第一講書人閱讀 39,852評論 0 276
  • 序言:老撾萬榮一對情侶失蹤矮烹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后奋隶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體擂送,經(jīng)...
    沈念sama閱讀 46,409評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,483評論 3 341
  • 正文 我和宋清朗相戀三年唯欣,在試婚紗的時候發(fā)現(xiàn)自己被綠了嘹吨。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,615評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡境氢,死狀恐怖蟀拷,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情萍聊,我是刑警寧澤问芬,帶...
    沈念sama閱讀 36,303評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站寿桨,受9級特大地震影響此衅,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜亭螟,卻給世界環(huán)境...
    茶點故事閱讀 41,979評論 3 334
  • 文/蒙蒙 一挡鞍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧预烙,春花似錦墨微、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,470評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至谴分,卻和暖如春锈麸,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背狸剃。 一陣腳步聲響...
    開封第一講書人閱讀 33,571評論 1 272
  • 我被黑心中介騙來泰國打工掐隐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個月前我還...
    沈念sama閱讀 49,041評論 3 377
  • 正文 我出身青樓虑省,卻偏偏與公主長得像硝枉,于是被迫代替她去往敵國和親投慈。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,630評論 2 359

推薦閱讀更多精彩內(nèi)容