《RabbitMQ》如何保證消息的可靠性

一條消費成功被消費經歷了生產者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。

一 消息生產者沒有把消息成功發(fā)送到MQ

1.1 事務機制

AMQP協(xié)議提供了事務機制邑狸,在投遞消息時開啟事務支持,如果消息投遞失敗闸翅,則回滾事務舟铜。

自定義事務管理器

@Configuration
public class RabbitTranscation {
    
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        return new RabbitTemplate(connectionFactory);
    }
}

修改yml

spring:
  rabbitmq:
    # 消息在未被隊列收到的情況下返回
    publisher-returns: true

開啟事務支持

rabbitTemplate.setChannelTransacted(true);

消息未接收時調用ReturnCallback

rabbitTemplate.setMandatory(true);

生產者投遞消息

@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        // 設置channel開啟事務
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?);
    }
    
    @Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
    public void publishMessage(String message) throws Exception {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}

但是,很少有人這么干仇穗,因為這是同步操作流部,一條消息發(fā)送之后會使發(fā)送端阻塞,以等待RabbitMQ-Server的回應纹坐,之后才能繼續(xù)發(fā)送下一條消息枝冀,生產者生產消息的吞吐量和性能都會大大降低。

1.2 發(fā)送方確認機制

發(fā)送消息時將信道設置為confirm模式耘子,消息進入該信道后果漾,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列后谷誓,RabbitMQ就會發(fā)送給生產者一個確認绒障。

開啟消息確認機制

spring:
  rabbitmq:
    # 消息在未被隊列收到的情況下返回
    publisher-returns: true
    # 開啟消息確認機制
    publisher-confirm-type: correlated

消息未接收時調用ReturnCallback

rabbitTemplate.setMandatory(true);

生產者投遞消息

@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            System.out.println("確認了這條消息:"+correlationData);
        }else{
            System.out.println("確認失敗了:"+correlationData+";出現(xiàn)異常:"+cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("這條消息發(fā)送失敗了"+message+",請?zhí)幚?);
    }

    public void publisMessage(String message){
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.convertAndSend("javatrip",message);
    }
}

如果消息確認失敗后捍歪,我們可以進行消息補償户辱,也就是消息的重試機制鸵钝。當未收到確認信息時進行消息的重新投遞。設置如下配置即可完成焕妙。

spring:
  rabbitmq:
    # 支持消息發(fā)送失敗后重返隊列
    publisher-returns: true
    # 開啟消息確認機制
    publisher-confirm-type: correlated
    listener:
      simple:
        retry:
          # 開啟重試
          enabled: true
          # 最大重試次數
          max-attempts: 5
          # 重試時間間隔
          initial-interval: 3000

二 消息發(fā)送到MQ后蒋伦,MQ宕機導致內存中的消息丟失

消息在MQ中有可能發(fā)生丟失,這時候我們就需要將隊列和消息都進行持久化焚鹊。

@Queue注解為我們提供了隊列相關的一些屬性痕届,具體如下:

  1. name: 隊列的名稱;
  2. durable: 是否持久化末患;
  3. exclusive: 是否獨享研叫、排外的;
  4. autoDelete: 是否自動刪除璧针;
  5. arguments:隊列的其他屬性參數嚷炉,有如下可選項,可參看圖2的arguments:
    • x-message-ttl:消息的過期時間探橱,單位:毫秒申屹;
    • x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除隧膏,單位:毫秒哗讥;
    • x-max-length:隊列最大長度,超過該最大值胞枕,則將從隊列頭部開始刪除消息杆煞;
    • x-max-length-bytes:隊列消息內容占用最大空間,受限于內存大小腐泻,超過該閾值則從隊列頭部開始刪除消息决乎;
    • x-overflow:設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發(fā)生什么派桩。有效值是drop-head构诚、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head铆惑;
    • x-dead-letter-exchange:死信交換器名稱唤反,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
    • x-dead-letter-routing-key:死信消息路由鍵鸭津,在消息發(fā)送到死信交換器時會使用該路由鍵,如果不設置肠缨,則使用消息的原來的路由鍵值
    • x-single-active-consumer:表示隊列是否是單一活動消費者逆趋,true時,注冊的消費組內只有一個消費者消費消息晒奕,其他被忽略闻书,false時消息循環(huán)分發(fā)給所有消費者(默認false)
    • x-max-priority:隊列要支持的最大優(yōu)先級數;如果未設置名斟,隊列將不支持消息優(yōu)先級;
    • x-queue-mode(Lazy mode):將隊列設置為延遲模式魄眉,在磁盤上保留盡可能多的消息砰盐,以減少RAM的使用;如果未設置,隊列將保留內存緩存以盡可能快地傳遞消息坑律;
    • x-queue-master-locator:在集群模式下設置鏡像隊列的主節(jié)點信息岩梳。

持久化隊列

創(chuàng)建隊列的時候將持久化屬性durable設置為true,同時要將autoDelete設置為false

@Queue(value = "javatrip",durable = "false",autoDelete = "false")

持久化消息

發(fā)送消息的時候將消息的deliveryMode設置為2晃择,在Spring Boot中消息默認就是持久化的冀值。

三 消費者消費消息的時候,未消費完畢就出現(xiàn)了異常

消費者剛消費了消息宫屠,還沒有處理業(yè)務列疗,結果發(fā)生異常。這時候就需要關閉自動確認浪蹂,改為手動確認消息抵栈。

修改yml為手動簽收模式

spring:
  rabbitmq:
    listener:
      simple:
        # 手動簽收模式
        acknowledge-mode: manual
        # 每次簽收一條消息
        prefetch: 1

消費者手動簽收

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{

        System.out.println(message);
        // 唯一的消息ID
        Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 確認該條消息
        if(...){
            channel.basicAck(deliverTag,false);
        }else{
            // 消費失敗,消息重返隊列
            channel.basicNack(deliverTag,false,true);
        }
      
    }
}

四 總結

消息丟失的原因坤次?

生產者古劲、MQ、消費者都有可能造成消息丟失

如何保證消息的可靠性浙踢?

  • 發(fā)送方采取發(fā)送者確認模式
  • MQ進行隊列及消息的持久化
  • 消費者消費成功后手動確認消息


    image
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末绢慢,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子洛波,更是在濱河造成了極大的恐慌胰舆,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,858評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蹬挤,死亡現(xiàn)場離奇詭異缚窿,居然都是意外死亡,警方通過查閱死者的電腦和手機焰扳,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,372評論 3 395
  • 文/潘曉璐 我一進店門倦零,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人吨悍,你說我怎么就攤上這事扫茅。” “怎么了育瓜?”我有些...
    開封第一講書人閱讀 165,282評論 0 356
  • 文/不壞的土叔 我叫張陵葫隙,是天一觀的道長。 經常有香客問我躏仇,道長恋脚,這世上最難降的妖魔是什么腺办? 我笑而不...
    開封第一講書人閱讀 58,842評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮糟描,結果婚禮上怀喉,老公的妹妹穿的比我還像新娘。我一直安慰自己船响,他們只是感情好躬拢,可當我...
    茶點故事閱讀 67,857評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著灿意,像睡著了一般估灿。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上缤剧,一...
    開封第一講書人閱讀 51,679評論 1 305
  • 那天馅袁,我揣著相機與錄音,去河邊找鬼荒辕。 笑死汗销,一個胖子當著我的面吹牛,可吹牛的內容都是我干的抵窒。 我是一名探鬼主播弛针,決...
    沈念sama閱讀 40,406評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼李皇!你這毒婦竟也來了削茁?” 一聲冷哼從身側響起蹬耘,我...
    開封第一講書人閱讀 39,311評論 0 276
  • 序言:老撾萬榮一對情侶失蹤兄淫,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后悲幅,有當地人在樹林里發(fā)現(xiàn)了一具尸體卓囚,經...
    沈念sama閱讀 45,767評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡瘾杭,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,945評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了哪亿。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片粥烁。...
    茶點故事閱讀 40,090評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蝇棉,靈堂內的尸體忽然破棺而出讨阻,到底是詐尸還是另有隱情,我是刑警寧澤篡殷,帶...
    沈念sama閱讀 35,785評論 5 346
  • 正文 年R本政府宣布变勇,位于F島的核電站,受9級特大地震影響,放射性物質發(fā)生泄漏搀绣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,420評論 3 331
  • 文/蒙蒙 一戳气、第九天 我趴在偏房一處隱蔽的房頂上張望链患。 院中可真熱鬧,春花似錦瓶您、人聲如沸麻捻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,988評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贸毕。三九已至,卻和暖如春夜赵,著一層夾襖步出監(jiān)牢的瞬間明棍,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,101評論 1 271
  • 我被黑心中介騙來泰國打工寇僧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留摊腋,地道東北人。 一個月前我還...
    沈念sama閱讀 48,298評論 3 372
  • 正文 我出身青樓嘁傀,卻偏偏與公主長得像兴蒸,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子细办,可洞房花燭夜當晚...
    茶點故事閱讀 45,033評論 2 355