三端可靠
- 發(fā)送方和mq保證消息送達到mq
- mq保證保存的消息不丟失
- 消費方和mq一起保證消息被成功消費
發(fā)送方和mq保證消息送達到mq
方案一挤渔、rabbitmq如果是用spring boot提供的模版接口發(fā)送 需要調(diào)用rabbitTemplate.convertSendAndReceive()方法發(fā)送 這個是當消息成功到隊列了才會返回結(jié)果 如果失敗則會拋異常 不過這就會導致等待時間比較長 適合高可靠場景
不過一般在業(yè)務開發(fā)都是完成業(yè)務以后再發(fā)消息 比如插入訂單表一筆訂單 發(fā)送訂單創(chuàng)建的消息 這兩步是需要保證原子性的 要么都成功要么都失敗 rabbitmq支持事務消息 不過如果出現(xiàn)下面情況
- 開啟事務
- 插入訂單表
- 發(fā)送mq消息
- 提交數(shù)據(jù)庫事務成功
- 提交mq事務失敗
- 消息丟失
所以如果是用rabbitmq的事務消息來做 其實在極端情況是會丟失消息的 在這里可以采用一個異步命令組件提供的方案https://github.com/bojiw/asyncmd
- 開啟事務
- 插入訂單表
- 插入異步命令表
- 提交數(shù)據(jù)庫事務
- 線程掃描異步命令表撈取消息
- 通過rabbitTemplate.convertSendAndReceive()方法發(fā)送
- 如果失敗 則重試 并且報警
方案二卧波、如采用rabbitTemplate.convertAndSend和confirms(消費回調(diào))加Return(錯誤回調(diào))模式
- convertAndSend 發(fā)送到mq 立刻返回 不管交換機是否成功處理 所以并發(fā)會高
- confirms(消費回調(diào)) 實現(xiàn)接口ConfirmCallback 消息成功發(fā)送到rabbitmq交換機上則會回調(diào)接口 入?yún)ck為true代表成功發(fā)送到交換機 false代表異常
- Return(錯誤回調(diào)) 實現(xiàn)接口ReturnCallback 消息從交換機到隊列 成功不會回調(diào) 如果發(fā)送到隊列失敗 則會調(diào)用回調(diào)
上面這種方式如果在回調(diào)中處理消息發(fā)送失敗的邏輯時出現(xiàn)異陈饽校或者應用服務器掛了 則會導致消息丟失 因為只會回調(diào)一次
這種情況可以采用加一張消息表 先插入消息表 然后掃表發(fā)送消息 confirms回調(diào)成功 則更新表狀態(tài) 如果回調(diào)的時候異常 則消息表會重新發(fā)送 這種就會出現(xiàn)消息重發(fā)的情況 不過一般消息消費者都要保證冪等 所以這個問題不大 不過如果出現(xiàn)以下情況
- 數(shù)據(jù)庫有兩個字段 confirms默認0 和 return 默認0
- 回調(diào)成功confirms=1 回調(diào)失敗confirms=2 錯誤回調(diào)return=2
- 當回調(diào)成功 confirms=1 錯誤回調(diào)處理失敗沒有成功更新表 則return還是0
- 這個時候你掃表就不確定需不需要重發(fā)消息 因為如果消息成功到隊列 表的狀態(tài)也是confirms=1 return=0
- 無法對發(fā)送隊列成功和發(fā)送隊列失敗可在回調(diào)異常這兩種情況做區(qū)分
這里邏輯就會出問題 所以只能處理消息成功到交換機 是否到隊列則不管 因為一般都是成功的 除了極端情況 比如隊列被人誤刪除
方案二和方案一其實從整個流程來講 發(fā)送消息速度其實差不多的 不過可靠性還是方案一高一點
mq保證保存的消息不丟失
消息、交換機夭谤、隊列都需要設置持久化
消費方和mq一起保證消息被成功消費
消費者開啟手動確認
acknowledge="manual"
在業(yè)務代碼里 成功處理業(yè)務 才返回給rabbitmq消費成功的確認
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
如果業(yè)務處理失敗則重新放到隊列重新消費
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
由消費者 只有業(yè)務成功處理才進行ack 記得需要做好冪等
不過這里rabbitmq在重試這塊沒有做好 如果不確定會一直重試 如果因為依賴的一個系統(tǒng)掛了 要一個小時以后才會啟動成功 在這一個小時里會一直重試 這就會對rabbitmq和消費者帶來一定的壓力 這塊也可以采用異步命令組件提供的方案https://github.com/bojiw/asyncmd
- 接收消息
- 把消息插入異步命令
- 返回rabbitmq成功
- 異步組件執(zhí)行業(yè)務邏輯
- 調(diào)用接口失敗重試
- 重試一定次數(shù)則不重試 由人工進行處理 也可以把重試間隔設置的長一點 比如前三次每隔1s重試 第四次隔一個小時重試