消息中間件實戰(zhàn)(下)

34.生產(chǎn)案例:從 RocketMQ 全鏈路分析一下為什么用戶支付后沒收到紅包基显?

有用戶反饋說,按照規(guī)則應(yīng)該是在支付之后可以拿到一個現(xiàn)金紅包的学歧,但是他在支付了一個訂單之后挣跋,卻并沒有收到這個現(xiàn)金紅包谴仙,于是就反饋給了客服。
經(jīng)過一通排查逆甜,從訂單系統(tǒng)和紅包系統(tǒng)當天那個時間段的日志來看教藻,居然只看到了訂單系統(tǒng)有推送消息到RocketMQ的日志,但是并沒有看到紅包系統(tǒng)從RocketMQ中接收消息以及發(fā)現(xiàn)金紅包的日志谜叹。
可能原因一:訂單系統(tǒng)推送消息到MQ的過程會丟失消息

網(wǎng)絡(luò)故障導(dǎo)致訂單系統(tǒng)推送消息失敗.png

可能原因二:Broker丟失消息:消息寫入os cache但沒有寫入磁盤
image.png

可能原因三:Broker丟失消息:消息寫入磁盤但磁盤壞了
磁盤故障.png

可能原因四:紅包系統(tǒng)獲取到消息后丟失消息
默認情況下匾寝,MQ的消費者有可能會自動提交已經(jīng)消費的offset,那么如果此時你還沒處理這個消息派發(fā)紅包的情況下荷腊,MQ的消費者可能直接自動給你提交這個消息1的offset到broker去了艳悔,標識為你已經(jīng)成功處理了這個消息。接著恰巧在這個時候女仰,我們的紅包系統(tǒng)突然重啟了猜年,或者是宕機了,或者是可能在派發(fā)紅包的時候更新數(shù)據(jù)庫失敗了疾忍,總之就是他突然故障了乔外,紅包系統(tǒng)的機器重啟了一下,然后此時內(nèi)存里的消息1必然就丟失了一罩,而且紅包也沒發(fā)出去杨幼。
紅包系統(tǒng)故障.png

35.發(fā)送消息零丟失方案:RocketMQ事務(wù)消息的實現(xiàn)流程分析

  1. 首先要讓訂單系統(tǒng)去發(fā)送一條half消息到MQ去,這個half消息本質(zhì)就是一個訂單支付成功的消息擒抛,只不過你可以理解為他這個消息的狀態(tài)是half狀態(tài)推汽,這個時候紅包系統(tǒng)是看不見這個half消息的。


    image.png
  2. 萬一half消息寫入失敗
    這個時候訂單系統(tǒng)就應(yīng)該執(zhí)行一系列的回滾操作歧沪,比如對訂單狀態(tài)做一個更新歹撒,讓狀態(tài)變成“關(guān)閉交易”,同時通知支付系統(tǒng)自動進行退款诊胞。
  3. half消息成功之后暖夭,訂單系統(tǒng)完成自己的任務(wù)
    這個時候訂單系統(tǒng)就應(yīng)該在自己本地的數(shù)據(jù)庫里執(zhí)行一些增刪改操作了,因為一旦half消息寫成功了撵孤,就說明MQ肯定已經(jīng)收到這條消息了迈着,MQ還活著,而且目前你是可以跟MQ正常溝通的邪码。


    image.png
  4. 如果訂單系統(tǒng)的本地事務(wù)執(zhí)行失敗
    這個時候其實也很簡單裕菠,直接就是讓訂單系統(tǒng)發(fā)送一個rollback請求給MQ就可以了。這個意思就是說闭专,你可以把之前我發(fā)給你的half消息給刪除掉了奴潘,因為我自己這里都出問題了旧烧,已經(jīng)無力跟你繼續(xù)后續(xù)的流程了。請求給MQ刪除那個half消息之后画髓,你的訂單系統(tǒng)就必須走后續(xù)的回退流程了掘剪,就是通知支付系統(tǒng)退款。


    image.png
  5. 訂單系統(tǒng)完成了本地事務(wù)
    如果訂單系統(tǒng)成功完成了本地的事務(wù)操作奈虾,比如把訂單狀態(tài)都更新為“已完成”了夺谁,此時你就可以發(fā)送一個commit請求給MQ,要求讓MQ對之前的half消息進行commit操作肉微,讓紅包系統(tǒng)可以看見這個訂單支付成功消息匾鸥。


    image.png
  6. 如果發(fā)送half消息成功了,但是沒收到響應(yīng)呢浪册?
    這個時候我們沒收到響應(yīng)扫腺,可能就會網(wǎng)絡(luò)超時報錯,也可能直接有其他的異常錯誤村象,這個時候訂單系統(tǒng)會誤以為是發(fā)送half消息到MQ失敗了,訂單系統(tǒng)就直接會執(zhí)行退款流程了攒至,訂單狀態(tài)也會標記為“已關(guān)閉”厚者。


    image.png

    其實RocketMQ這里有一個補償流程,他會去掃描自己處于half狀態(tài)的消息迫吐,如果我們一直沒有對這個消息執(zhí)行commit/rollback操作库菲,超過了一定的時間,他就會回調(diào)你的訂單系統(tǒng)的一個接口志膀,系統(tǒng)就得去查一下數(shù)據(jù)庫熙宇,看看這個訂單當前的狀態(tài),一下發(fā)現(xiàn)訂單狀態(tài)是“已關(guān)閉”溉浙,此時就知道烫止,你必然得發(fā)送rollback請求給MQ去刪除之前那個half消息了!


    image.png

    image.png
  7. 如果rollback或者commit發(fā)送失敗了呢戳稽?
    這個時候其實也很簡單馆蠕,因為MQ里的消息一直是half狀態(tài),所以說他過了一定的超時時間會發(fā)現(xiàn)這個half消息有問題惊奇,他會回調(diào)你的訂單系統(tǒng)的接口互躬,此時要判斷一下,這個訂單的狀態(tài)如果更新為了“已完成”颂郎,那就得再次執(zhí)行commit請求吼渡,反之則再次執(zhí)行rollback請求。
    本質(zhì)這個MQ的回調(diào)就是一個補償機制乓序,如果你的half消息響應(yīng)沒收到寺酪,或者rollback舟奠、commit請求沒發(fā)送成功,他都會來找你問問對half消息后續(xù)如何處理房维。
  8. 總結(jié)
    其實很簡單沼瘫,如果你的MQ有問題或者網(wǎng)絡(luò)有問題,half消息根本都發(fā)不出去咙俩,此時half消息肯定是失敗的耿戚,那么訂單系統(tǒng)就不會執(zhí)行后續(xù)流程了!
    如果要是half消息發(fā)送出去了阿趁,但是half消息的響應(yīng)都沒收到膜蛔,然后執(zhí)行了退款流程,那MQ會有補償機制來回調(diào)找你詢問要commit還是rollback脖阵,此時你選擇rollback刪除消息就可以了皂股,不會執(zhí)行后續(xù)流程!
    如果要是訂單系統(tǒng)收到half消息了命黔,結(jié)果訂單系統(tǒng)自己更新數(shù)據(jù)庫失敗了呜呐,那么他也會進行回滾,不會執(zhí)行后續(xù)流程了悍募!
    如果要是訂單系統(tǒng)收到half消息了蘑辑,然后還更新自己數(shù)據(jù)庫成功了,訂單狀態(tài)是“已完成”了坠宴,此時就必然會發(fā)送commit請求給MQ洋魂,一旦消息commit了,那么必然保證紅包系統(tǒng)可以收到這個消息喜鼓!
    而且即使你commit請求發(fā)送失敗了副砍,MQ也會有補償機制,回調(diào)你接口讓你判斷是否重新發(fā)送commit請求庄岖。
    總之豁翎,就是你的訂單系統(tǒng)只要成功了,那么必然要保證MQ里的消息是commit了可以讓紅包系統(tǒng)看到他顿锰!

36.事務(wù)消息機制的底層實現(xiàn)原理

  1. half 消息是如何對消費者不可見的谨垃?
    RocketMQ一旦發(fā)現(xiàn)你發(fā)送的是一個half消息,他不會把這個half消息的offset寫入OrderPaySuccessTopic的ConsumeQueue里去硼控。他會把這條half消息寫入到自己內(nèi)部的“RMQ_SYS_TRANS_HALF_TOPIC”這個Topic對應(yīng)的一個ConsumeQueue里去刘陶,所以你的紅包系統(tǒng)自然無法從OrderPaySuccessTopic的ConsumeQueue中看到這條half消息了。half消息進入到RocketMQ內(nèi)部的RMQ_SYS_TRANS_HALF_TOPIC的ConsumeQueue文件了牢撼,此時就會認為half消息寫入成功了匙隔,然后就會返回響應(yīng)給訂單系統(tǒng)。

    RMQ_SYS_TRANS_HALF_TOPIC.png

  2. 假如因為各種問題熏版,沒有執(zhí)行rollback或者commit會怎么樣纷责?
    其實這個時候他會在后臺有定時任務(wù)捍掺,定時任務(wù)會去掃描RMQ_SYS_TRANS_HALF_TOPIC中的half消息,如果你超過一定時間還是half消息再膳,他會回調(diào)訂單系統(tǒng)的接口挺勿,讓你判斷這個half消息是要rollback還是commit。

    定時任務(wù)掃描.png

  3. 如果執(zhí)行rollback操作的話喂柒,如何標記消息回滾不瓶?
    因為RocketMQ都是順序把消息寫入磁盤文件的,所以在這里如果你執(zhí)行rollback灾杰,他的本質(zhì)就是用一個OP操作來標記half消息的狀態(tài)蚊丐,RocketMQ內(nèi)部有一個OP_TOPIC,此時可以寫一條rollback OP記錄到這個Topic里艳吠,標記某個half消息是rollback了麦备。另外,假設(shè)你一直沒有執(zhí)行commit/rollback昭娩,RocketMQ會回調(diào)訂單系統(tǒng)的接口去判斷half消息的狀態(tài)凛篙,但是他最多就是回調(diào)15次,如果15次之后你都沒法告知他half消息的狀態(tài)题禀,就自動把消息標記為rollback鞋诗。

    如何標記消息回滾.png

  4. 如果執(zhí)行commit操作,如何讓消息對紅包系統(tǒng)可見迈嘹?
    執(zhí)行commit操作之后,RocketMQ就會在OP_TOPIC里寫入一條記錄全庸,標記half消息已經(jīng)是commit狀態(tài)了秀仲。接著需要把放在RMQ_SYS_TRANS_HALF_TOPIC中的half消息給寫入到OrderPaySuccessTopic的ConsumeQueue里去,然后我們的紅包系統(tǒng)可以就可以看到這條消息進行消費了壶笼。

    如何標記消息提交.png

其實本質(zhì)都是基于CommitLog神僵、ConsumeQueue這套存儲機制來做的,只不過中間有一些Topic的變換覆劈,half消息可能就是寫入內(nèi)部Topic的保礼。

37.同步發(fā)送消息 + 反復(fù)多次重試方案 VS RocketMQ事務(wù)消息方案

RocketMQ事務(wù)消息方案雖然能保證消息零丟失壹蔓,但是機制復(fù)雜喊式,完全有可能導(dǎo)致整體性能比較差,而且吞吐量比較低伊者,是否有更加簡單的方法來確保消息一定可以到達MQ呢坤候?能不能基于重試機制來確保消息到達MQ胁赢?
只要我們在代碼中發(fā)送消息到MQ之后,同步等待MQ返回響應(yīng)給我們白筹,一直等待智末,如果半路中有網(wǎng)絡(luò)異沉律悖或者MQ內(nèi)部異常,我們肯定會收到一個異常系馆,比如網(wǎng)絡(luò)錯誤送漠,或者請求超時之類的。
如果我們在收到異常之后由蘑,就認為消息到MQ發(fā)送失敗了闽寡,然后再次重試嘗試發(fā)送消息到MQ,接著再次同步等待MQ返回響應(yīng)給我們纵穿,這樣反復(fù)重試下隧,是否可以確保消息一定會到達MQ?

同步等待+反復(fù)重試.png

先執(zhí)行訂單本地事務(wù)谓媒,還是先發(fā)消息到MQ淆院?
如果我們先執(zhí)行訂單本地事務(wù),接著再發(fā)送消息到MQ的話句惯,偽代碼是這樣的:
image.png

假設(shè)你剛執(zhí)行完成了訂單本地事務(wù)了土辩,結(jié)果還沒等到你發(fā)送消息到MQ,結(jié)果你的訂單系統(tǒng)突然崩潰了抢野!這就導(dǎo)致你的訂單狀態(tài)可能已經(jīng)修改為了“已完成”拷淘,但是消息卻沒發(fā)送到MQ去!這就是這個方案最大的隱患指孤。
image.png

把訂單本地事務(wù)和重試發(fā)送MQ消息放到一個事務(wù)代碼中
偽代碼改成這樣:

image.png

上面這個代碼看起來似乎解決了我們的問題启涯,就是在這個方法上加入事務(wù),在這個事務(wù)方法中恃轩,我們哪怕執(zhí)行了orderService.finishOrderPay()结洼,但是其實也僅僅執(zhí)行了一些增刪改SQL語句,還沒提交訂單本地事務(wù)叉跛。
如果發(fā)送MQ消息失敗了松忍,而且多次重試還不奏效,則我們拋出異常會自動回滾訂單本地事務(wù)筷厘;如果你剛執(zhí)行了orderService.finishOrderPay()鸣峭,結(jié)果訂單系統(tǒng)直接崩潰了,此時訂單本地事務(wù)會回滾酥艳,因為根本沒提交過摊溶。
但是對于這個方案,還是非常的不理想玖雁,原因就出在那個MQ多次重試的地方更扁。
假設(shè)用戶支付成功了,然后支付系統(tǒng)回調(diào)通知你的訂單系統(tǒng)說,有一筆訂單已經(jīng)支付成功了浓镜,這個時候你的訂單系統(tǒng)卡在多次重試MQ的代碼那里溃列,可能耗時了好幾秒種,此時回調(diào)通知你的系統(tǒng)早就等不及可能都超時異常了膛薛。而且你把重試MQ的代碼放在這個邏輯里听隐,可能會導(dǎo)致訂單系統(tǒng)的這個接口性能很差。
image.png

一定可以依靠本地事務(wù)回滾嗎哄啄?
看下面的代碼:

image.png

雖然在方法上加了事務(wù)注解雅任,但是代碼里還有更新Redis緩存和Elasticsearch數(shù)據(jù)的代碼邏輯,如果你要是已
經(jīng)完成了訂單數(shù)據(jù)庫更新咨跌、Redis緩存更新沪么、ES數(shù)據(jù)更新了,結(jié)果沒法送MQ呢訂單系統(tǒng)崩潰了锌半。雖然訂單數(shù)據(jù)庫的操作會回滾禽车,但是Redis、Elasticsearch中的數(shù)據(jù)更新會自動回滾嗎刊殉?不會的殉摔,因為他們根本沒法自動回滾,此時數(shù)據(jù)還是會不一致的记焊。所以說逸月,完全寄希望于本地事務(wù)自動回滾是不現(xiàn)實的。

所以分析完了這個同步發(fā)送消息 + 反復(fù)多次重試的方案之后遍膜,我們會發(fā)現(xiàn)他實際落地的時候是可以的碗硬,但是里面存在一些問題。最后保證業(yè)務(wù)系統(tǒng)一致性的最佳方案還是:基于RocketMQ的事務(wù)消息機制瓢颅。

38.分析RocketMQ事物消息的代碼實現(xiàn)細節(jié)

public class TransactionProducer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {

        // 這個東西就是用來接受RocketMQ回調(diào)的一個監(jiān)聽器接口
        // 這里會實現(xiàn)執(zhí)行訂單本地事務(wù)肛响,commit、rollback惜索,回調(diào)查詢等邏輯
        TransactionListener transactionListener = new TransactionListenerImpl();

        // 創(chuàng)建一個支持事務(wù)消息的Producer
        TransactionMQProducer producer = new TransactionMQProducer("TestProducerGroup");

        // 這個線程池是用來處理RocketMQ回調(diào)你的請求的
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2000),
                (r) -> {
                    Thread thread = new Thread(r);
                    thread.setName("TestThread");
                    return thread;
                });

        // 給事務(wù)生產(chǎn)者設(shè)置對應(yīng)的線程池,負責執(zhí)行RocketMQ回調(diào)請求
        producer.setExecutorService(threadPool);
        // 給事務(wù)生產(chǎn)者設(shè)置對應(yīng)的回調(diào)函數(shù)
        producer.setTransactionListener(transactionListener);
        // 啟動這個事務(wù)消息生產(chǎn)者
        producer.start();

        // 構(gòu)建一條訂單支付成功的消息剃浇,指定Topic
        Message message = new Message("PayOrderSuccessTopic", "TestTag", "TestKey",
                "訂單消息".getBytes(RemotingHelper.DEFAULT_CHARSET));

        try {
            SendResult sendResult = producer.sendMessageInTransaction(message, null);
        } catch (MQClientException e) {
            // half消息發(fā)送失敗
            // 訂單系統(tǒng)執(zhí)行回滾邏輯巾兆,比如說觸發(fā)支付退款,更新訂單狀態(tài)為“已關(guān)閉”
        }
    }
}
public class TransactionListenerImpl implements TransactionListener {

    // 如果half消息發(fā)送成功了虎囚,就會毀掉你的這個函數(shù)角塑,你就可以執(zhí)行本地事務(wù)了
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 執(zhí)行本地事務(wù)
        // 根據(jù)本地事務(wù)一連串執(zhí)行結(jié)果,去選擇commit or rollback
        try {
            // 如果本地事務(wù)都成功了淘讥,返回commit
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事務(wù)都成功了圃伶,回滾一切執(zhí)行過的操作
            // 如果本地事務(wù)執(zhí)行失敗了,返回rollback,標記half消息無效
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 如果因為各種原因窒朋,沒有返回commit或者rollback
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 查詢本地事務(wù)搀罢,是否執(zhí)行成功了
        Integer status = localTrans.get(msg.getTransactionId());
        // 根據(jù)本地事務(wù)的情況取選擇commit or rollback
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

39.Broker消息零丟失方案:同步刷盤 + Raft協(xié)議主從同步

Broker消息丟失的可能原因:

  1. 消息被寫入到os cache,但沒有被寫入到磁盤侥猩,此時Broke宕機榔至;
  2. 消息已經(jīng)被寫入到磁盤,但是磁盤損壞了欺劳,導(dǎo)致磁盤中消息丟失唧取,并且此時消費者還沒有來得及消費。

解決方案:
對于1划提,將異步刷盤調(diào)整為同步刷盤枫弟,這樣就保證了只有消息被刷入到磁盤中,該消息才被認為寫入成功鹏往,返回響應(yīng)給生產(chǎn)者淡诗。比如我們發(fā)送half消息的時候,只要MQ返回響應(yīng)是half消息發(fā)送成功了掸犬,那么就說明消息已經(jīng)進入磁盤文件了袜漩,不會停留在os cache里。具體做法:調(diào)整broker的配置文件湾碎,將其中的flushDiskType配置設(shè)置為:SYNC_FLUSH宙攻,默認他的值是ASYNC_FLUSH,即默認是異步刷盤的介褥。
對于2座掘,通過主從架構(gòu)模式避免磁盤故障導(dǎo)致的數(shù)據(jù)丟失,這樣一來柔滔,你一條消息但凡寫入成功了溢陪,此時主從兩個Broker上都有這條數(shù)據(jù)了,此時如果你的Master Broker的磁盤壞了睛廊,但是Slave Broker上至少還是有數(shù)據(jù)的形真,數(shù)據(jù)是不會因為磁盤故障而丟失的。

40.Consumer消息零丟失方案:手動提交offset + 自動故障轉(zhuǎn)移

Consumer消息丟失原因:紅包系統(tǒng)已經(jīng)拿到了這條消息超全,但是消息目前還在他的內(nèi)存里咆霜,還沒執(zhí)行派發(fā)紅包的邏輯,此時他就直接提交了這條消息的offset到broker去說自己已經(jīng)處理過了嘶朱,接著紅包系統(tǒng)在上圖這個狀態(tài)的時候就直接崩潰了蛾坯,內(nèi)存里的消息就沒了,紅包也沒派發(fā)出去疏遏,結(jié)果Broker已經(jīng)收到他提交的消息offset了脉课,還以為他已經(jīng)處理完這條消息了救军。等紅包系統(tǒng)重啟的時候,就不會再次消費這條消息了倘零。

解決方案:


手動提交offest代碼.png

對于RocketMQ而言唱遭,其實只要你的紅包系統(tǒng)是在這個監(jiān)聽器的函數(shù)中先處理一批消息,基于這批消息都派發(fā)完了紅包视事,然后返回了那個消費成功的狀態(tài)胆萧,接著才會去提交這批消息的offset到broker去。所以在這個情況下俐东,如果你對一批消息都處理完畢了跌穗,然后再提交消息的offset給broker,接著紅包系統(tǒng)崩潰了虏辫,此時是不會丟失消息的蚌吸。


手動提交offest.png

那么如果是紅包系統(tǒng)獲取到一批消息之后,還沒處理完砌庄,也就沒返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS這個狀態(tài)呢羹唠,自然沒提交這批消息的offset給broker呢,此時紅包系統(tǒng)突然掛了娄昆,會怎么樣佩微?
其實在這種情況下,你對一批消息都沒提交他的offset給broker的話萌焰,broker不會認為你已經(jīng)處理完了這批消息哺眯,此時你突然紅包系統(tǒng)的一臺機器宕機了,他其實會感知到你的紅包系統(tǒng)的一臺機器作為一個Consumer掛了扒俯。接著他會把你沒處理完的那批消息交給紅包系統(tǒng)的其他機器去進行處理奶卓,所以在這種情況下,消息也絕對是不會丟失的撼玄。

需要警惕的地方:不能異步消費消息
不能在代碼中對消息進行異步的處理夺姑,如下錯誤的示范,我們開啟了一個子線程去處理這批消息掌猛,然后啟動線程之后盏浙,就直接返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了。

不能異步消費消息.png

如果要是用這種方式來處理消息的話荔茬,那可能就會出現(xiàn)你開啟的子線程還沒處理完消息呢只盹,你已經(jīng)返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS狀態(tài)了,就可能提交這批消息的offset給broker了兔院,認為已經(jīng)處理結(jié)束了。然后此時你紅包系統(tǒng)突然宕機站削,必然會導(dǎo)致你的消息丟失了坊萝!

41.基于 RocketMQ 設(shè)計的全鏈路消息零丟失方案總結(jié)

發(fā)送消息到MQ的零丟失:
方案一(同步發(fā)送消息 + 反復(fù)多次重試)
方案二(事務(wù)消息機制),兩者都有保證消息發(fā)送零丟失的效果,但是經(jīng)過分析十偶,事務(wù)消息方案整體會更好一些
MQ收到消息之后的零丟失:開啟同步刷盤策略 + 主從架構(gòu)同步機制菩鲜,只要讓一個Broker收到消息之后同步寫入磁盤,同時同步復(fù)制給其他Broker惦积,然后再返回響應(yīng)給生產(chǎn)者說寫入成功接校,此時就可以保證MQ自己不會弄丟消息
消費消息的零丟失:采用RocketMQ的消費者天然就可以保證你處理完消息之后,才會提交消息的offset到broker去狮崩,只要記住別采用多線程異步處理消息的方式即可

消息零丟失方案的優(yōu)勢與劣勢
優(yōu)勢:消息零丟失
劣勢:整個從頭到尾的消息流轉(zhuǎn)鏈路的性能大幅度下降蛛勉,MQ的吞吐量大幅度的下降

消息零丟失方案到底適用場景
一般我們建議,對于跟金錢睦柴、交易以及核心數(shù)據(jù)相關(guān)的系統(tǒng)和核心鏈路诽凌,可以上這套消息零丟失方案。
比如支付系統(tǒng)坦敌,他是絕對不能丟失任何一條消息的侣诵,你的性能可以低一些,但是不能有任何一筆支付記錄丟失。
比如訂單系統(tǒng),公司一般是不能輕易丟失一個訂單的叠纹,畢竟一個訂單就對應(yīng)一筆交易牡借,如果訂單丟失,用戶還支付成功了砰奕,你輕則要給用戶賠付損失,重則弄不好要經(jīng)受官司,特別是一些B2B領(lǐng)域的電商洗鸵,一筆線上交易可能多大幾萬幾十萬。
所以對這種非常非常核心的場景和少數(shù)幾條核心鏈路仗嗦,才會建議大家上這套復(fù)雜的消息0丟失方案膘滨。對于非核心的鏈路,非金錢交易的鏈路稀拐,大家可以適當簡化這套方案火邓,用一些方法避免數(shù)據(jù)輕易丟失,但是同時性能整體很高德撬,即使有極個別的數(shù)據(jù)丟失铲咨,對非核心的場景,也不會有太大的影響蜓洪。

42.生產(chǎn)案例:從 RocketMQ 底層原理分析為什么會重復(fù)發(fā)優(yōu)惠券纤勒?

重復(fù)發(fā)優(yōu)惠券.png
  1. 用于發(fā)送消息到MQ的訂單系統(tǒng),如果出現(xiàn)了接口超時等問題隆檀,可能會導(dǎo)致上游的支付系統(tǒng)重試調(diào)用訂單系統(tǒng)的接口摇天,進而導(dǎo)致訂單系統(tǒng)對一個消息重復(fù)發(fā)送兩條到MQ里去粹湃!


    可能原因1.png
  2. 發(fā)送MQ的重試機制可能因為網(wǎng)絡(luò)原因出現(xiàn)超時異常,從而重復(fù)發(fā)送MQ泉坐。


    重試代碼.png

    網(wǎng)絡(luò)原因?qū)е轮匕l(fā)MQ.png
  3. 優(yōu)惠券系統(tǒng)剛剛發(fā)完優(yōu)惠券为鳄,還沒來得及提交消息offset到broker,就宕機了(或者重啟)腕让,這時因為你沒提交這條消息的offset給broker孤钦,broker并不知道你已經(jīng)處理完了這條消息,然后優(yōu)惠券系統(tǒng)重啟之后纯丸,broker就會再次把這條消息交給你偏形,讓你再一次進行處理,然后你會再一次發(fā)送一張優(yōu)惠券液南,導(dǎo)致重復(fù)發(fā)送了兩次優(yōu)惠券壳猜!


    第一次發(fā)送優(yōu)惠券.png

    第二次發(fā)送優(yōu)惠券.png

43.對訂單系統(tǒng)核心流程引入 冪等性機制,保證數(shù)據(jù)不會重復(fù)

什么是冪等性機制滑凉?
這個冪等性機制统扳,其實就是用來避免對同一個請求或者同一條消息進行重復(fù)處理的機制,所謂的冪等畅姊,他的意思就是咒钟,比如你有一個接口,然后如果別人對一次請求重試了多次若未,來調(diào)用你的接口朱嘴,你必須保證自己系統(tǒng)的數(shù)據(jù)是正常的,不能多出來一些重復(fù)的數(shù)據(jù)粗合,這就是冪等性的意思萍嬉。

發(fā)送消息到MQ的時候如何保證冪等性?
1. 業(yè)務(wù)判斷法:當你的訂單系統(tǒng)的接口被重試調(diào)用的時候隙疚,你這個接口上來就應(yīng)該發(fā)送請求到MQ里去查詢一下壤追,比如對訂單id=1100這個訂單的支付成功消息,在你MQ那里有沒有供屉?如果有的話行冰,我就不再重復(fù)發(fā)送消息了!

業(yè)務(wù)判斷法.png

弊端:在這個環(huán)節(jié)你直接從MQ查詢消息是沒這個必要的伶丐,他的性能也不是太好悼做,會影響你的接口的性能。
2. 狀態(tài)判斷法-基于Redis緩存的冪等性機制:這個方法的核心在于哗魂,你需要引入一個Redis緩存來存儲你是否發(fā)送過消息的狀態(tài)肛走,如果你成功發(fā)送了一個消息到MQ里去,你得在Redis緩存里寫一條數(shù)據(jù)录别,標記這個消息已經(jīng)發(fā)送過羹与,那么當你的訂單接口被重復(fù)調(diào)用的時候故硅,你只要根據(jù)訂單id去Redis緩存里查詢一下,這個訂單的支付消息是否已經(jīng)發(fā)送給MQ了纵搁,如果發(fā)送過了,你就別再次發(fā)送了往踢!
基于Redis緩存的冪等性機制.png

弊端:這種方案一般情況下是可以做到冪等性的腾誉,但是如果有時候你剛發(fā)送了消息到MQ,還沒來得及寫Redis峻呕,系統(tǒng)就掛了利职,之后你的接口被重試調(diào)用的時候,你查Redis還以為消息沒發(fā)過瘦癌,就會發(fā)送重復(fù)的消息到MQ去猪贪。

優(yōu)惠券系統(tǒng)如何保證消息處理的冪等性?
其實這里就比較簡單了讯私,直接基于業(yè)務(wù)判斷法就可以了热押,因為優(yōu)惠券系統(tǒng)每次拿到一條消息后給用戶發(fā)一張優(yōu)惠券,實際上核心就是在數(shù)據(jù)庫里給用戶插入一條優(yōu)惠券記錄斤寇。那么如果優(yōu)惠券系統(tǒng)從MQ那里拿到一個訂單的兩條重復(fù)的支付成功消息桶癣,這個時候其實很簡單,他只要先去優(yōu)惠券數(shù)據(jù)庫中查詢一下娘锁,比如對訂單id=1100的訂單牙寞,是否已經(jīng)發(fā)放過優(yōu)惠券了,是否有優(yōu)惠券記錄莫秆,如果有的話间雀,就不要重復(fù)發(fā)券了!

優(yōu)惠券系統(tǒng)保證消息處理的冪等性.png

總結(jié)
一般來說镊屎,對于MQ的重復(fù)消息而言惹挟,往MQ里重復(fù)發(fā)送一樣的消息還是可以接受的,因為MQ里有多條重復(fù)消息杯道,它不會對系統(tǒng)的核心數(shù)據(jù)造成影響匪煌,但是關(guān)鍵要保證的是,從MQ里獲取消息進行處理的時候党巾,必須要保證消息不能重復(fù)處理萎庭。

44.如果優(yōu)惠券系統(tǒng)的數(shù)據(jù)庫宕機,如何用死信隊列解決這種異常場景齿拂?

假設(shè)了一個場景驳规,就是訂單支付成功之后會推送消息到MQ,然后優(yōu)惠券系統(tǒng)署海、紅包系統(tǒng)會從MQ里獲取消息去執(zhí)行后續(xù)的處理吗购,比如發(fā)紅包或者發(fā)優(yōu)惠券医男。那么如果這個時候,優(yōu)惠券系統(tǒng)的數(shù)據(jù)庫宕機了捻勉,針對這樣的一個坑爹的異常場景我們應(yīng)該怎么處理镀梭?
數(shù)據(jù)庫宕機的時候,返回RECONSUME_LATER
實際上如果我們因為數(shù)據(jù)庫宕機等問題踱启,對這批消息的處理是異常的报账,此時沒法處理這批消息,我們就應(yīng)該返回一個RECONSUME_LATER狀態(tài)埠偿,他的意思是透罢,我現(xiàn)在沒法完成這批消息的處理,麻煩你稍后過段時間再次給我這批消息讓我重新試一下冠蒋!

RECONSUME_LATER.png

RocketMQ是如何讓你進行消費重試的羽圃?
簡單來說,RocketMQ會有一個針對你這個ConsumerGroup的重試隊列抖剿。如果你返回了RECONSUME_LATER狀態(tài)朽寞,他會把你這批消息放到你這個消費組的重試隊列中去比如你的消費組的名稱是“VoucherConsumerGroup”,意思是優(yōu)惠券系統(tǒng)的消費組牙躺,那么他會有一個
“%RETRY%VoucherConsumerGroup”這個名字的重試隊列愁憔,然后過一段時間之后,重試隊列中的消息會再次給我們孽拷,讓我們進行處理吨掌。如果再次失敗,又返回了RECONSUME_LATER脓恕,那么會再過一段時間讓我們來進行處理膜宋,默認最多是重試16次!每次重試之間的間隔時間是不一樣的炼幔,這個間隔時間可以如下進行配置:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

重試隊列.png

如果連續(xù)重試16次還是無法處理消息秋茫,然后怎么辦?
其實就是一批消息交給你處理乃秀,你重試了16次還一直沒處理成功肛著,就不要繼續(xù)重試這批消息了,你就認為他們死掉了就可以了跺讯。然后這批消息會自動進入死信隊列枢贿。死信隊列的名字是“%DLQ%VoucherConsumerGroup”
那么對死信隊列中的消息我們怎么處理?
其實這個就看你的使用場景了刀脏,比如我們可以專門開一個后臺線程局荚,就是訂閱“%DLQ%VoucherConsumerGroup”這個死信隊列,對死信隊列中的消息,還是一直不停的重試耀态。
死信隊列.png

消費者底層的一些依賴可能有故障了轮傍,比如數(shù)據(jù)庫宕機,緩存宕機之類的首装,此時你就沒辦法完成消息的處理了创夜,那么可以通過一些返回狀態(tài)去讓消息進入RocketMQ自帶的重試隊列,同時如果反復(fù)重試還是不行仙逻,可以讓消息進入RocketMQ自帶的死信隊列挥下,后續(xù)針對死信隊列中的消息進行單獨的處理就可以了。

45.生產(chǎn)案例:為什么基于 RocketMQ 進行訂單庫數(shù)據(jù)同步時會消息亂序桨醋?

場景再現(xiàn):大數(shù)據(jù)系統(tǒng)在基于Mysql binlog同步訂單數(shù)據(jù)時,binlog里有兩條日志现斋,依次時insert喜最、update操作,但是大數(shù)據(jù)系統(tǒng)在處理消息的時候哦庄蹋,先處理了upodate消息瞬内,后處理insert消息,導(dǎo)致消息亂序限书,數(shù)據(jù)出現(xiàn)問題虫蝶。

消息亂序現(xiàn)象.png

原因分析:原本有順序的消息,完全可能會分發(fā)到不同的MessageQueue中去倦西,然后大數(shù)據(jù)系統(tǒng)的不同機器上部署的Consumer可能會用混亂的順序從不同的MessageQueue里獲取消息然后處理能真。
消息亂序原因.png

46.如何解決訂單數(shù)據(jù)庫同步的消息亂序問題?

  1. 采用取模的方式讓屬于同一個訂單的binlog進入一個MessageQueue
  2. 獲取binlog的時候也得有序
    解決訂單數(shù)據(jù)庫同步的消息亂序問題的方案.png

    萬一消息處理失敗了不可以走重試隊列
    因為如果你的consumer獲取到訂單的一個insert binlog扰柠,結(jié)果處理失敗了粉铐,此時返回了RECONSUME_LATER,那么這條消息會進入重試隊列卤档,過一會兒才會交給你重試蝙泼。但是此時broker會直接把下一條消息,也就是這個訂單的update binlog交給你來處理劝枣,此時萬一你執(zhí)行成功了汤踏,就根本沒有數(shù)據(jù)可以更新!又會出現(xiàn)消息亂序的問題舔腾。
    所以對于有序消息的方案中溪胶,如果你遇到消息處理失敗的場景,就必須返回SUSPEND_CURRENT_QUEUE_A_MOMENT這個狀態(tài)琢唾,意思是先等一會兒载荔,一會兒再繼續(xù)處理這批消息,而不能把這批消息放入重試隊列去采桃,然后直接處理下一批消息懒熙。

RocketMQ的順序消息機制的代碼實現(xiàn)

  1. 讓一個訂單的binlog進入一個MessageQueue


    讓一個訂單的binlog進入一個MessageQueue.png
  2. 消費者按照順序來獲取一個MessageQueue中的消息
    消費者按照順序來獲取一個MessageQueue中的消息.png

    使用的是MessageListenerOrderly這個東西丘损,他里面有Orderly這個名稱,也就是說工扎,Consumer會對每一個ConsumeQueue徘钥,都僅僅用一個線程來處理其中的消息。比如對ConsumeQueue01中的訂單id=1100的多個binlog肢娘,會交給一個線程來按照binlog順序來依次處理呈础。否則如果ConsumeQueue01中的訂單id=1100的多個binlog交給Consumer中的多個線程來處理的話,那還是會有消息亂序的問題橱健。

47.基于RocketMQ的數(shù)據(jù)過濾機制而钞,提升訂單數(shù)據(jù)庫同步的處理效率

一個數(shù)據(jù)庫中可能會包含很多表的數(shù)據(jù),比如訂單數(shù)據(jù)庫拘荡,他里面除了訂單信息表以外臼节,可能還包含很多其他的表。所以我們在進行數(shù)據(jù)庫binlog同步的時候珊皿,很可能是把一個數(shù)據(jù)庫里所有表的binlog都推送到MQ里去的网缝!
假設(shè)我們的大數(shù)據(jù)系統(tǒng)僅僅關(guān)注訂單數(shù)據(jù)庫中的表A的binlog逼蒙,并不關(guān)注其他表的binlog阁谆,那么大數(shù)據(jù)系統(tǒng)可能需要在獲取到所有表的binlog之后,對每條binlog判斷一下遗座,是否是表A的binlog驶兜?
如果不是表A的binlog扼仲,那么就直接丟棄不要處理;如果是表A的binlog促王,才會去進行處理犀盟!但是這樣的話,必然會導(dǎo)致大數(shù)據(jù)系統(tǒng)處理很多不關(guān)注的表的binlog蝇狼,也會很浪費時間阅畴,降低消息的效率.

解決方案:在發(fā)送消息的時候,給消息設(shè)置tag和屬性
針對這個問題迅耘,我們可以采用RocketMQ支持的數(shù)據(jù)過濾機制贱枣,來讓大數(shù)據(jù)系統(tǒng)僅僅關(guān)注他想要的表的binlog數(shù)據(jù)即可。
發(fā)送消息的時候颤专,可以給消息設(shè)置tag和屬性:

給消息設(shè)置tag和屬性.png

在消費數(shù)據(jù)的時候根據(jù)tag和屬性進行過濾:
根據(jù)tag過濾.png

根據(jù)屬性過濾

RocketMQ還是支持比較豐富的數(shù)據(jù)過濾語法的纽哥,如下所示:
(1)數(shù)值比較,比如:>栖秕,>=春塌,<,<=,BETWEEN只壳,=俏拱;
(2)字符比較,比如:=吼句,<>锅必,IN;
(3)IS NULL 或者 IS NOT NULL惕艳;
(4)邏輯符號 AND搞隐,OR,NOT远搪;
(5)數(shù)值劣纲,比如:123,3.1415谁鳍;
(6)字符味廊,比如:'abc',必須用單引號包裹起來棠耕;
(7)NULL,特殊的常量
(8)布爾值柠新,TRUE 或 FALSE

48.生產(chǎn)案例:基于延遲消息機制優(yōu)化大量訂單的定時退款掃描問題窍荧!

場景:在實際情況中,其實APP的大量用戶每天會下很多訂單恨憎,但是不少訂單可能是一直沒有進行支付的蕊退,可能他下單之后猶豫了,可能是他忘了支付了憔恳!所以一般訂單系統(tǒng)都必須設(shè)置一個規(guī)則瓤荔,當一個訂單下單之后,超過比如30分鐘沒有支付钥组,那么就必須訂單系統(tǒng)自動關(guān)閉這個訂單输硝,后續(xù)你如果要購買這個訂單里的商品,就得重新下訂單了程梦。
問題:那么訂單系統(tǒng)就需要有一個后臺線程点把,不停的掃描訂單數(shù)據(jù)庫里所有的未支付狀態(tài)的訂單,看他如果超過30分鐘了還沒支付屿附,那么就必須自動把訂單狀態(tài) 更新為“已關(guān)閉”郎逃。

后臺線程掃描未支付訂單.png

但是這里就引入了一個問題,就是訂單系統(tǒng)的后臺線程必須要不停的掃描各種未支付的訂單挺份,這種實現(xiàn)方式實際上并不是很好褒翰。

  1. 一個原因是未支付狀態(tài)的訂單可能是比較多的,然后你需要不停的掃描他們,可能每個未支付狀態(tài)的訂單要被掃描N多遍优训,才會發(fā)現(xiàn)他已經(jīng)超過30分鐘沒支付了朵你。
  2. 另外一個是很難去分布式并行掃描你的訂單。因為假設(shè)你的訂單數(shù)據(jù)量特別的多型宙,然后你要是打算用多臺機器部署訂單掃描服務(wù)撬呢,但是每臺機器掃描哪些訂單?怎么掃描妆兑?什么時候掃描魂拦?這都是一系列的麻煩問題。
    方案:針對類似這種場景搁嗓,MQ里的延遲消息可以派上用場了芯勘。所謂延遲消息,意思就是說腺逛,我們訂單系統(tǒng)在創(chuàng)建了一個訂單之后荷愕,可以發(fā)送一條消息到MQ里去,我們指定這條消息是延遲消息棍矛,比如要等待30分鐘之后安疗,才能被訂單掃描服務(wù)給消費到,這樣當訂單掃描服務(wù)在30分鐘后消費到了一條消息之后够委,就可以針對這條消息的信息荐类,去訂單數(shù)據(jù)庫里查詢這個訂單,看看他在創(chuàng)建過后都過了30分鐘了茁帽,此時他是否還是未支付狀態(tài)玉罐?如果此時訂單還是未支付狀態(tài),那么就可以關(guān)閉他潘拨,否則訂單如果已經(jīng)支付了吊输,就什么都不用做了。
    延遲消息用法.png

    這種方式就比你用后臺線程掃描訂單的方式要好的多了铁追,一個是對每個訂單你只會在他創(chuàng)建30分鐘后查詢他一次而已季蚂,不會反復(fù)掃描訂單多次。
    另外就是如果你的訂單數(shù)量很多琅束,你完全可以讓訂單掃描服務(wù)多部署幾臺機器癣蟋,然后對于MQ中的Topic可以多指定一個MessageQueue,這樣每個訂單掃描服務(wù)的機器作為一個Consumer都會處理一部分訂單的查詢?nèi)蝿?wù)狰闪。
    延遲消息代碼實現(xiàn):
    延遲消息生產(chǎn)者:
    生產(chǎn)者代碼.png

    發(fā)送延遲消息的核心疯搅,就是設(shè)置消息的delayTimeLevel,也就是延遲級別
    RocketMQ默認支持一些延遲級別如下:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h埋泵。所以上面代碼中設(shè)置延遲級別為3幔欧,意思就是延遲10s罪治,你發(fā)送出去的消息,會過10s被消費者獲取到礁蔗。那么如果是訂單延遲掃描場景觉义,可以設(shè)置延遲級別為16,也就是對應(yīng)上面的30分鐘浴井。
    延遲消息消費者:
    消費者代碼.png

49.在RocketMQ的生產(chǎn)實踐中積累的各種一手經(jīng)驗總結(jié)

1. 靈活的運用 tags來過濾數(shù)據(jù)
在真正的生產(chǎn)項目中晒骇,建議大家合理的規(guī)劃Topic和里面的tags,一個Topic代表了一類業(yè)務(wù)消息數(shù)據(jù)磺浙,然后對于這類業(yè)務(wù)消息數(shù)據(jù)洪囤,如果你希望繼續(xù)劃分一些類別的話,可以在發(fā)送消息的時候設(shè)置tags撕氧。
舉個例子瘤缩,比如我們都知道現(xiàn)在常見的外賣平臺有美團外賣、餓了么外賣還有別的一些外賣伦泥,那么假設(shè)你現(xiàn)在一個系統(tǒng)要發(fā)送外賣訂單數(shù)據(jù)到MQ里去剥啤,就可以針對性的設(shè)置tags,比如不同的外賣數(shù)據(jù)都到一個“WaimaiOrderTopic”里去不脯。但是不同類型的外賣可以有不同的tags:“meituan_waimai”府怯,“eleme_waimai”,“other_waimai”防楷,等等富腊。然后對你消費“WaimaiOrderTopic”的系統(tǒng),可以根據(jù)tags來篩選域帐,可能你就需要某一種類別的外賣數(shù)據(jù)罷了。
2. 基于消息key來定位消息是否丟失
之前我們給大家講過是整,在消息0丟失方案中肖揣,可能要解決的是消息是否丟失的問題,那么如果消息真的丟失了浮入,我們是不是要排查龙优?此時是不是要從MQ里查一下,這個消息是否丟失了事秀?
那么怎么從MQ里查消息是否丟失呢彤断?可以基于消息key來實現(xiàn),比如通過下面的方式設(shè)置一個消息的key為訂單id:message.setKeys(orderId)易迹,這樣這個消息就具備一個key了宰衙。接著這個消息到broker上,會基于key構(gòu)建hash索引睹欲,這個hash索引就存放在IndexFile索引文件里供炼。然后后續(xù)我們可以通過MQ提供的命令去根據(jù)key查詢這個消息一屋,類似下面這樣:mqadmin queryMsgByKey -n 127.0.0.1:9876 -t SCANRECORD -k orderId
3. 消息零丟失方案的補充
之前我們給大家分析過消息零丟失方案,其實在消息零丟失方案中還有一個問題袋哼,那就是MQ集群徹底故障了冀墨,此時就是不可用了,那么怎么辦呢涛贯?
其實對于一些金融級的系統(tǒng)诽嘉,或者跟錢相關(guān)的支付系統(tǒng),或者是廣告系統(tǒng)弟翘,類似這樣的系統(tǒng)虫腋,都必須有超高級別的高可用保障機制。
一般假設(shè)MQ集群徹底崩潰了衅胀,你生產(chǎn)者就應(yīng)該把消息寫入到本地磁盤文件里去進行持久化岔乔,或者是寫入數(shù)據(jù)庫里去暫存起來,等待MQ恢復(fù)之后滚躯,然后再把持久化的消息繼續(xù)投遞到MQ里去雏门。
4. 提高消費者的吞吐量
如果消費的時候發(fā)現(xiàn)消費的比較慢,那么可以提高消費者的并行度掸掏,常見的就是部署更多的consumer機器
但是這里要注意茁影,你的Topic的MessageQueue得是有對應(yīng)的增加,因為如果你的consumer機器有5臺丧凤,然后MessageQueue只有4個募闲,那么意味著有一個consumer機器是獲取不到消息的。
然后就是可以增加consumer的線程數(shù)量愿待,可以設(shè)置consumer端的參數(shù):consumeThreadMin浩螺、consumeThreadMax,這樣一臺consumer機器上的消費線程越多仍侥,消費的速度就越快要出。此外,還可以開啟消費者的批量消費功能农渊,就是設(shè)置consumeMessageBatchMaxSize參數(shù)患蹂,他默認是1,但是你可以設(shè)置的多一些砸紊,那么一次就會交給你的回調(diào)函數(shù)一批消息給你來處理了传于,此時你可以通過SQL語句一次性批量處理一些數(shù)據(jù),比如:update xxx setxxx where id in (xx,xx,xx)醉顽。通過批量處理消息的方式沼溜,也可以大幅度提升消息消費的速度。
5. 要不要消費歷史消息
其實consumer是支持設(shè)置從哪里開始消費消息的游添,常見的有兩種:一個是從Topic的第一條數(shù)據(jù)開始消費盛末,一個是從最后一次消費過的消息之后開始消費弹惦。對應(yīng)的是:CONSUME_FROM_LAST_OFFSETCONSUME_FROM_FIRST_OFFSET悄但。一般來說棠隐,我們都會選擇CONSUME_FROM_FIRST_OFFSET,這樣你剛開始就從Topic的第一條消息開始消費檐嚣,但是以后每次重啟助泽,你都是從上一次消費到的位置繼續(xù)往后進行消費的。

50.企業(yè)級的RocketMQ集群如何進行權(quán)限機制的控制嚎京?

在RocketMQ中實現(xiàn)權(quán)限控制也不難嗡贺,首先我們需要在broker端放一個額外的ACK權(quán)限控制配置文件,里面需要規(guī)定好權(quán)限鞍帝,包括什么用戶對哪些Topic有什么操作權(quán)限诫睬,這樣的話,各個Broker才知道你每個用戶的權(quán)限帕涌。
首先在每個Broker的配置文件里需要設(shè)置aclEnable=true這個配置摄凡,開啟權(quán)限控制
其次,在每個Broker部署機器的${ROCKETMQ_HOME}/store/config目錄下蚓曼,可以放一個plain_acl.yml的配置文件亲澡,這個里面就可以進行權(quán)限配置,類似下面這樣子:

# 這個參數(shù)就是全局性的白名單
# 這里定義的ip地址纫版,都是可以訪問Topic的
globalWhiteRemoteAddresses:
- 13.21.33.*
- 192.168.0.*
# 這個accounts就是說床绪,你在這里可以定義很多賬號
# 每個賬號都可以在這里配置對哪些Topic具有一些操作權(quán)限
accounts:
# 這個accessKey其實就是用戶名的意思,比如我們這里叫做“訂單技術(shù)團隊”
- accessKey: OrderTeam
# 這個secretKey其實就是這個用戶名的密碼
secretKey: 123456
# 下面這個是當前這個用戶名下哪些機器要加入白名單的
whiteRemoteAddress:
# admin指的是這個賬號是不是管理員賬號
admin: false
# 這個指的是默認情況下這個賬號的Topic權(quán)限和ConsumerGroup權(quán)限
defaultTopicPerm: DENY
defaultGroupPerm: SUB
# 這個就是這個賬號具體的堆一些賬號的權(quán)限
# 下面就是說當前這個賬號對兩個Topic其弊,都具備PUB|SUB權(quán)限癞己,就是發(fā)布和訂閱的權(quán)限
# PUB就是發(fā)布消息的權(quán)限,SUB就是訂閱消息的權(quán)限
# DENY就是拒絕你這個賬號訪問這個Topic
topicPerms:
- CreateOrderInformTopic=PUB|SUB
- PaySuccessInformTopic=PUB|SUB
# 下面就是對ConsumerGroup的權(quán)限梭伐,也是同理的
groupPerms:
- groupA=DENY
- groupB=PUB|SUB
- groupC=SUB
# 下面就是另外一個賬號了痹雅,比如是商品技術(shù)團隊的賬號
- accessKey: ProductTeam
secretKey: 12345678
whiteRemoteAddress: 192.168.1.*
# 如果admin設(shè)置為true,就是具備一切權(quán)限
admin: true

如果你一個賬號沒有對某個Topic顯式的指定權(quán)限籽御,那么就是會采用默認Topic權(quán)限。
接著我們看看在你的生產(chǎn)者和消費者里惰匙,如何指定你的團隊分配到的RocketMQ的賬號技掏,當你使用一個賬號的時候,就只能訪問你有權(quán)限的Topic项鬼。


權(quán)限控制代碼.png

上面的代碼中就是在創(chuàng)建Producer的時候后哑梳,傳入進去一個AclClientRPCHook,里面就可以設(shè)置你這個Producer的賬號密碼绘盟,對于創(chuàng)建Consumer也是同理的。通過這樣的方式体啰,就可以在Broker端設(shè)置好每個賬號對Topic的訪問權(quán)限捞烟,然后你不同的技術(shù)團隊就用不同的賬號就可以了。

51.如何對線上生產(chǎn)環(huán)境的RocketMQ集群進行消息軌跡的追蹤锡垄?

首先需要在broker的配置文件里開啟traceTopicEnable=true這個選項,此時就會開啟消息軌跡追蹤的功能祭隔。
接著當我們開啟了上述的選項之后货岭,我們啟動這個Broker的時候會自動創(chuàng)建出來一個內(nèi)部的Topic,就是RMQ_SYS_TRACE_TOPIC疾渴,這個Topic就是用來存儲所有的消息軌跡追蹤的數(shù)據(jù)的千贯。
此時創(chuàng)建Producer的時候要用如下的方式,下面構(gòu)造函數(shù)中的第二個參數(shù)搞坝,就是enableMsgTrace參數(shù)搔谴,他設(shè)置為true,就是說可以對消息開啟軌跡追蹤桩撮,在訂閱消息的時候敦第,對于Consumer也是同理的,在構(gòu)造函數(shù)的第二個參數(shù)設(shè)置為true距境,就是開啟了消費時候的軌跡追蹤申尼。


image.png

接著如果我們想要查詢消息軌跡,也很簡單垫桂,在RocketMQ控制臺里师幕,在導(dǎo)航欄里就有一個消息軌跡,在里面可以創(chuàng)建查詢?nèi)蝿?wù)诬滩,你可以根據(jù)messageId霹粥、message key或者Topic來查詢,查詢?nèi)蝿?wù)執(zhí)行完畢之后疼鸟,就可以看到消息軌跡的界面了后控。

52.由于消費系統(tǒng)故障導(dǎo)致的RocketMQ百萬消息積壓問題,應(yīng)該如何處理空镜?

1. MessageQueue數(shù)量大于消費者系統(tǒng)數(shù)量->增加機器
假如你的Topic有20個MessageQueue浩淘,然后你只有4個消費者系統(tǒng)在消費,那么每個消費者系統(tǒng)會從5個MessageQueue里獲取消息吴攒,所以此時如果你僅僅依靠4個消費者系統(tǒng)是肯定不夠的张抄,畢竟MQ里積壓了百萬消息了。
所以此時你可以臨時申請16臺機器多部署16個消費者系統(tǒng)的實例洼怔,然后20個消費者系統(tǒng)同時消費署惯,每個人消費一個MessageQueue的消息,此時你會發(fā)現(xiàn)你消費的速度提高了5倍镣隶,很快積壓的百萬消息都會被處理完畢极谊。
當你處理完百萬積壓的消息之后诡右,就可以下線多余的16臺機器了。
2. MessageQueue數(shù)量等于消費者系統(tǒng)數(shù)量->寫入臨時隊列
那么如果你的Topic總共就只有4個MessageQueue轻猖,然后你就只有4個消費者系統(tǒng)呢帆吻?
這個時候就沒辦法擴容消費者系統(tǒng)了,因為你加再多的消費者系統(tǒng)蜕依,還是只有4個MessageQueue桅锄,沒法并行消費。
所以此時往往是臨時修改那4個消費者系統(tǒng)的代碼样眠,讓他們獲取到消息然后不寫入NoSQL友瘤,而是直接把消息寫入一個新的Topic,這個速度是很快的檐束,因為僅僅是讀寫MQ而已辫秧。
然后新的Topic有20個MessageQueue,然后再部署20臺臨時增加的消費者系統(tǒng)被丧,去消費新的Topic后寫入數(shù)據(jù)到NoSQL里去盟戏,這樣子也可以迅速的增加消費者系統(tǒng)的并行處理能力,使用一個新的Topic來允許更多的消費者系統(tǒng)并行處理甥桂。

53.金融級的系統(tǒng)如何針對RocketMQ集群崩潰設(shè)計高可用方案柿究?

跟金錢相關(guān)的一些系統(tǒng),他可能需要依賴MQ去傳遞消息黄选,如果你MQ突然崩潰了蝇摸,可能導(dǎo)致很多跟錢相關(guān)的東西就會出問題。
針對這種場景办陷,我們通常都會在你發(fā)送消息到MQ的那個系統(tǒng)中設(shè)計高可用的降級方案貌夕,這個降級方案通常的思路是,你需要在你發(fā)送消息到MQ代碼里去try catch捕獲異常民镜,如果你發(fā)現(xiàn)發(fā)送消息到MQ有異常啡专,此時你需要進行重試
如果你發(fā)現(xiàn)連續(xù)重試了比如超過3次還是失敗制圈,說明此時可能就是你的MQ集群徹底崩潰了们童,此時你必須把這條重要的消息寫入到本地存儲中去,可以是寫入數(shù)據(jù)庫里鲸鹦,也可以是寫入到機器的本地磁盤文件里去慧库,或者是NoSQL存儲中去。
之后你要不停的嘗試發(fā)送消息到MQ去亥鬓,一旦發(fā)現(xiàn)MQ集群恢復(fù)了完沪,你必須有一個后臺線程可以把之前持久化存儲的消息都查詢出來域庇,然后依次按照順序發(fā)送到MQ集群里去嵌戈,這樣才能保證你的消息不會因為MQ徹底崩潰會丟失覆积。
這里要有一個很關(guān)鍵的注意點,就是你把消息寫入存儲中暫存時熟呛,一定要保證他的順序宽档,比如按照順序一條一條的寫入本地磁盤文件去暫存消息。而且一旦MQ集群故障了庵朝,你后續(xù)的所有寫消息的代碼必須嚴格的按照順序把消息寫入到本地磁盤文件里去暫存吗冤,這個順序性是要嚴格保證的。

54.為什么要給RocketMQ增加消息限流功能保證其高可用性九府?

其實本質(zhì)上來說椎瘟,限流功能就是對系統(tǒng)的一個保護功能。
在接收消息這塊侄旬,必須引入一個限流機制肺蔚,也就是說要限制好,你這臺機器每秒鐘最多就只能處理比如3萬條消息儡羔,根據(jù)你的MQ集群的壓測結(jié)果來宣羊,你可以通過壓測看看你的MQ最多可以抗多少Q(mào)PS,然后就做好限流汰蜘。
一般來說仇冯,限流算法可以采取令牌桶算法,也就是說你每秒鐘就發(fā)放多少個令牌族操,然后只能允許多少個請求通過苛坚。關(guān)于限流算法的實現(xiàn),不在我們的討論范圍內(nèi)坪创,大家可以自己查閱一下資料炕婶,也并不是很難。
我們這里主要是給大家講一下莱预,很多互聯(lián)網(wǎng)大廠其實都會改造開源MQ的內(nèi)核源碼柠掂,引入限流機制,然后只能允許指定范圍內(nèi)的消息被在一秒內(nèi)被處理依沮,避免因為一些異常的情況涯贞,導(dǎo)致MQ集群掛掉。

55.設(shè)計一套Kafka到RocketMQ的雙寫+雙讀技術(shù)方案危喉,實現(xiàn)無縫遷移宋渔!

假設(shè)你們公司本來線上的MQ用的主要是Kafka,現(xiàn)在要從Kafka遷移到RocketMQ去辜限,那么這個遷移的過程應(yīng)
該怎么做呢皇拣?應(yīng)該采用什么樣的技術(shù)方案來做遷移呢?
MQ集群遷移過程中的雙寫+雙讀技術(shù)方案

  1. 一般來說,首先你要做到雙寫氧急,也就是說颗胡,在你所有的Producer系統(tǒng)中,要引入一個雙寫的代碼吩坝,讓他同時往Kafka和RocketMQ中去寫入消息毒姨,然后多寫幾天,起碼雙寫要持續(xù)個1周左右钉寝,因為MQ一般都是實時數(shù)據(jù)弧呐,里面數(shù)據(jù)也就最多保留一周。
  2. 當你的雙寫持續(xù)一周過后嵌纲,你會發(fā)現(xiàn)你的Kafka和RocketMQ里的數(shù)據(jù)看起來是幾乎一模一樣了俘枫,因為MQ反正也就保留最近幾天的數(shù)據(jù),當你雙寫持續(xù)超過一周過后逮走,你會發(fā)現(xiàn)Kafka和RocketMQ里的數(shù)據(jù)幾乎一模一樣了崩哩。
  3. 但是光是雙寫還是不夠的,還需要同時進行雙讀言沐,也就是說在你雙寫的同時邓嘹,你所有的Consumer系統(tǒng)都需要同時從Kafka和RocketMQ里獲取消息,分別都用一模一樣的邏輯處理一遍险胰。只不過從Kafka里獲取到的消息還是走核心邏輯去處理汹押,然后可以落入數(shù)據(jù)庫或者是別的存儲什么的,但是對于RocketMQ里獲取到的消息起便,你可以用一樣的邏輯處理棚贾,但是不能把處理結(jié)果具體的落入數(shù)據(jù)庫之類的地方。
  4. 你的Consumer系統(tǒng)在同時從Kafka和RocketMQ進行消息讀取的時候榆综,你需要統(tǒng)計每個MQ當日讀取和處理的消息的數(shù)量妙痹,這點非常的重要,同時對于RocketMQ讀取到的消息處理之后的結(jié)果鼻疮,可以寫入一個臨時的存儲中怯伊。
  5. 同時你要觀察一段時間,當你發(fā)現(xiàn)持續(xù)雙寫和雙讀一段時間之后判沟,如果所有的Consumer系統(tǒng)通過對比發(fā)現(xiàn)耿芹,從Kafka和RocketMQ讀取和處理的消息數(shù)量一致,同時處理之后得到的結(jié)果也都是一致的挪哄,此時就可以判斷說當前Kafka和RocketMQ里的消息是一致的吧秕,而且計算出來的結(jié)果也都是一致的。
    6.這個時候就可以實施正式的切換了迹炼,你可以停機Producer系統(tǒng)砸彬,再重新修改后上線,全部修改為僅僅寫RocketMQ,這個時候他數(shù)據(jù)不會丟砂碉,因為之前已經(jīng)雙寫了一段時間了吟秩,然后所有的Consumer系統(tǒng)可以全部下線后修改代碼再上線,全部基于RocketMQ來獲取消息绽淘,計算和處理,結(jié)果寫入存儲中闹伪』γ基本上對于類似的一些重要中間件的遷移,往往都會采取雙寫的方法偏瓤,雙寫一段時間杀怠,然后觀察兩個方案的結(jié)果都一致了,你再正式下線舊的一套東西厅克。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末赔退,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子证舟,更是在濱河造成了極大的恐慌硕旗,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,734評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件女责,死亡現(xiàn)場離奇詭異漆枚,居然都是意外死亡,警方通過查閱死者的電腦和手機抵知,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,931評論 3 394
  • 文/潘曉璐 我一進店門墙基,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人刷喜,你說我怎么就攤上這事残制。” “怎么了掖疮?”我有些...
    開封第一講書人閱讀 164,133評論 0 354
  • 文/不壞的土叔 我叫張陵初茶,是天一觀的道長。 經(jīng)常有香客問我浊闪,道長纺蛆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,532評論 1 293
  • 正文 為了忘掉前任规揪,我火速辦了婚禮桥氏,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘猛铅。我一直安慰自己字支,他們只是感情好,可當我...
    茶點故事閱讀 67,585評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著堕伪,像睡著了一般揖庄。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上欠雌,一...
    開封第一講書人閱讀 51,462評論 1 302
  • 那天蹄梢,我揣著相機與錄音,去河邊找鬼富俄。 笑死禁炒,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的霍比。 我是一名探鬼主播幕袱,決...
    沈念sama閱讀 40,262評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼悠瞬!你這毒婦竟也來了们豌?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,153評論 0 276
  • 序言:老撾萬榮一對情侶失蹤浅妆,失蹤者是張志新(化名)和其女友劉穎望迎,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體凌外,經(jīng)...
    沈念sama閱讀 45,587評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡擂煞,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,792評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了趴乡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片对省。...
    茶點故事閱讀 39,919評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖晾捏,靈堂內(nèi)的尸體忽然破棺而出蒿涎,到底是詐尸還是另有隱情,我是刑警寧澤惦辛,帶...
    沈念sama閱讀 35,635評論 5 345
  • 正文 年R本政府宣布劳秋,位于F島的核電站,受9級特大地震影響胖齐,放射性物質(zhì)發(fā)生泄漏玻淑。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,237評論 3 329
  • 文/蒙蒙 一呀伙、第九天 我趴在偏房一處隱蔽的房頂上張望补履。 院中可真熱鬧,春花似錦剿另、人聲如沸箫锤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,855評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽谚攒。三九已至阳准,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間馏臭,已是汗流浹背盛卡。 一陣腳步聲響...
    開封第一講書人閱讀 32,983評論 1 269
  • 我被黑心中介騙來泰國打工寝受, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留撩荣,地道東北人道偷。 一個月前我還...
    沈念sama閱讀 48,048評論 3 370
  • 正文 我出身青樓膝昆,卻偏偏與公主長得像孵延,于是被迫代替她去往敵國和親贷腕。 傳聞我的和親對象是個殘疾皇子惹苗,可洞房花燭夜當晚...
    茶點故事閱讀 44,864評論 2 354

推薦閱讀更多精彩內(nèi)容