架構師方案-業(yè)務角度下保證消息的可靠性的投遞

前言:

消息隊列的主要作用是實現(xiàn)系統(tǒng)間的解耦缓呛、異步處理和削峰填谷扮授。 由于消息隊列的異步使用特性闸天,天然的會存在一定概率消息丟失的情況犬绒。


image.png

方案1:消息落庫

消息落庫重發(fā)是基于MQ的confirm機制,在消息發(fā)送失敗后自動重發(fā)挂签。

image.png
  • Step 1: 首先把消息信息(業(yè)務數(shù)據(jù))存儲到數(shù)據(jù)庫中疤祭,緊接著,我們再把這個消息記錄也存儲到一張消息記錄表里(或者另外一個同源數(shù)據(jù)庫的消息記錄表)

  • Step 2:發(fā)送消息到MQ Broker節(jié)點(采用confirm方式發(fā)送饵婆,會有異步的返回結果)

  • Step 3勺馆、4:生產者端接受MQ Broker節(jié)點返回的Confirm確認消息結果,然后進行更新消息記錄表里的消息狀態(tài)啦辐。比如默認Status = 0 當收到消息確認成功后谓传,更新為1即可!

  • Step 5:但是在消息確認這個過程中可能由于網絡閃斷芹关、MQ Broker端異常等原因導致 回送消息失敗或者異常续挟。這個時候就需要發(fā)送方(生產者)對消息進行可靠性投遞了,保障消息不丟失侥衬,100%的投遞成功J觥(有一種極限情況是閃斷,Broker返回的成功確認消息轴总,但是生產端由于網絡閃斷沒收到直颅,這個時候重新投遞可能會造成消息重復,需要消費端去做冪等處理)所以我們需要有一個定時任務怀樟,(比如每5分鐘拉取一下處于中間狀態(tài)的消息功偿,當然這個消息可以設置一個超時時間,比如超過1分鐘 Status = 0 往堡,也就說明了1分鐘這個時間窗口內械荷,我們的消息沒有被確認,那么會被定時任務拉取出來)

  • Step 6:接下來我們把中間狀態(tài)的消息進行重新投遞 retry send虑灰,繼續(xù)發(fā)送消息到MQ 吨瞎,當然也可能有多種原因導致發(fā)送失敗

  • Step 7:我們可以采用設置最大努力嘗試次數(shù),比如投遞了3次穆咐,還是失敗颤诀,那么我們可以將最終狀態(tài)設置為Status = 2 字旭,最后 交由人工解決處理此類問題(或者把消息轉儲到失敗表中)。

表結構和代碼示例


CREATE TABLE IF NOT EXISTS `message_log`
(
    `message_id`      varchar(30) NOT NULL COMMENT '消息唯一ID',
    `message`         varchar(1000)  DEFAULT '' COMMENT '消息內容',
    `business_id`     varchar(40) NOT NULL COMMENT '業(yè)務id崖叫,比如記錄訂單號',
    `try_count`       int(4)       DEFAULT '0' COMMENT '重試次數(shù)',
    `status`          tinyint(2)   DEFAULT '0' COMMENT ' 消息投遞狀態(tài)  0:投遞中 1:投遞成功   2:投遞失敗',
    `next_retry_time` datetime     DEFAULT CURRENT_TIMESTAMP COMMENT '下一次投遞時間',
    `create_time`     datetime     DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時間',
    `update_time`     datetime     DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后一次更新時間',
    PRIMARY KEY (`message_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8mb4;
創(chuàng)建訂單方法
@Service
@RequiredArgsConstructor
public class OrderService {
        
        private final OrderMapper orderMapper;
        
        private final MessageLogMapper messageLogMapper;

        private final RocketMQProducer rocketMQProducer;
        
        
        //創(chuàng)建訂單
        public void createOrder(Order order) {
                //插入業(yè)務數(shù)據(jù)
                orderMapper.insert(order);
                //插入消息記錄表數(shù)據(jù)
                MessageLog messageLog = new MessageLog();
                //消息唯一ID
                messageLog.setMessageId(messageId);
                //保存消息整體
                messageLog.setMessage(JSONObject.toJSONString(order));
                //設置消息狀態(tài)為0 表示發(fā)送中
                messageLog.setStatus(0);
                //設置下一次執(zhí)行時間
                messageLog.setNextRetryTime(nextRetryTime);
                messageLogMapper.insert(brokerMessageLog);
                
                //發(fā)送消息
                rocketMQProducer.sendOrder(order);

        }
}
消息生產者
@Component
public class RocketMQProducer {

    public void sendOrder(Order order) {
        //1.創(chuàng)建消息
        Message message = new Message("test_quick_topic",// 主題
                "TagA",// 標簽
                "KeyA",// 用戶自定義的key遗淳,唯一的標識
                FastJsonConvertUtil.convertObjectToJSON(order).getBytes()); //消息內容實體(byte[])

        try {
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
             
                    //如果confirm返回成功 則進行更新
                    messageLogMapper.changeMessageLogStatus();
                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                    //失敗則進行具體的后續(xù)操作:重試 或者補償?shù)仁侄?                   System.err.println("-----------異常處理-----------");
                }
            });
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
定時任務

@Component
public class RetryMessageTasker {

    @Scheduled(initialDelay = 5000, fixedDelay = 10000)
    public void reSend() {
        System.out.println("----------------定時任務開始----------------");
        //pull status = 0 and timeout message 
        List<MessageLog> list = getNeedReSendMsgList();
        for (MessageLog messageLog : list) {
            if (messageLog.getTryCount() > maxTryCount) {
                //update fail message 
                continue;
            }
        
            //更新try_count
            // resend
            try {
                sendOrder(getMessage());
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println("-----------異常處理-----------");
            }
            
        }

    }
}
該方案只能保證消息從生產者到MQ之間的可靠性投遞,解決辦法:

方式1. 在消息表中新增 消費成功狀態(tài),下游消費者變更消費狀態(tài)(要考慮多個業(yè)務消費的情況)
方式2. 使用業(yè)務正確性校驗平臺BCP檢查上下游業(yè)務數(shù)據(jù)是否一致,進行修復

方案2:二次確認檢測

二次確認檢測是基于延時投遞機制實現(xiàn)的心傀,主要目的是為了減少數(shù)據(jù)庫操作洲脂,提高并發(fā)量。

image.png
  • Step 1:先將業(yè)務數(shù)據(jù)進行入庫剧包,然后上游服務將消息M1發(fā)送出去

  • Step 2:在發(fā)送消息M1之后,緊接著生產端再次發(fā)送一條延遲消息(Second Send Delay Check)往果,即延遲檢查投遞消息M3

  • Step 3:消費端去監(jiān)聽指定隊列疆液,將收到的消息進行處理

  • Step 4:處理完成之后,發(fā)送一個confirm消息M2陕贮,也就是回送響應堕油,但是這里響應不是正常的ACK,而是重新生成一條消息肮之,投遞到MQ中

  • Step 5:下游Callback Check Service是一個單獨的服務掉缺,其實它扮演了方案一的存儲消息的DB角色,它通過MQ去監(jiān)聽下游服務發(fā)送的confirm消息M2戈擒,如果下游Callback Check Service收到下游服務的confirm消息M2眶明,那么就對消息做持久化存儲,即將消息持久化到DB中

  • Step 6:10分鐘之后MQ Server推送了延遲消息發(fā)送M3

  • Step 7:下游Callback Check Service收到延遲消息發(fā)送M3后筐高,Check消息后去檢查DB中是否存在消息M2搜囱,如果存在,則不需要做任何處理柑土,如果不存在或者消費失敗了蜀肘,那么下游Callback Check Service就需要主動發(fā)起RPC通信給上游服務,上游服務收到信息后就會重新查詢業(yè)務消息然后將消息M1發(fā)送出去

該方案能夠保證消息從生成者端到消費者的可靠性投遞稽屏,消費者都能消費到扮宠,生產者也就自然而然是可靠性的投遞。

方案對比

方案 優(yōu)點 缺點
消息落庫 實現(xiàn)簡單 發(fā)送消息前需要2次DB操作狐榔,影響并發(fā)性能
二次確認檢測 減少了數(shù)據(jù)庫操作坛增,提高并發(fā)量 不一定能保障百分百投遞成功,但是基本上可以保障大概99.9%的消息是OK的荒叼,有些特別極端的情況只能是使用定時任務去轿偎、BCP或人工去做補償了,

參考:

阿神-RabbitMQ消息可靠性投遞解決方案
美團業(yè)務正確性校驗平臺 BCP的設計與實踐

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末被廓,一起剝皮案震驚了整個濱河市坏晦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌,老刑警劉巖昆婿,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件球碉,死亡現(xiàn)場離奇詭異,居然都是意外死亡仓蛆,警方通過查閱死者的電腦和手機睁冬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來看疙,“玉大人豆拨,你說我怎么就攤上這事∧芮欤” “怎么了施禾?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長搁胆。 經常有香客問我弥搞,道長,這世上最難降的妖魔是什么渠旁? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任攀例,我火速辦了婚禮,結果婚禮上顾腊,老公的妹妹穿的比我還像新娘粤铭。我一直安慰自己,他們只是感情好投慈,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布承耿。 她就那樣靜靜地躺著,像睡著了一般伪煤。 火紅的嫁衣襯著肌膚如雪加袋。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天抱既,我揣著相機與錄音职烧,去河邊找鬼。 笑死防泵,一個胖子當著我的面吹牛蚀之,可吹牛的內容都是我干的。 我是一名探鬼主播捷泞,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼足删,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了锁右?” 一聲冷哼從身側響起失受,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤讶泰,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后拂到,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體痪署,經...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年兄旬,在試婚紗的時候發(fā)現(xiàn)自己被綠了狼犯。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡领铐,死狀恐怖,靈堂內的尸體忽然破棺而出绪撵,到底是詐尸還是另有隱情呐馆,我是刑警寧澤,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布莲兢,位于F島的核電站,受9級特大地震影響续膳,放射性物質發(fā)生泄漏改艇。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一坟岔、第九天 我趴在偏房一處隱蔽的房頂上張望谒兄。 院中可真熱鬧,春花似錦社付、人聲如沸承疲。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽燕鸽。三九已至,卻和暖如春啼辣,著一層夾襖步出監(jiān)牢的瞬間啊研,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工鸥拧, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留党远,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓富弦,卻偏偏與公主長得像沟娱,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子腕柜,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內容