RocketMQ源碼-主從同步復(fù)制和異步復(fù)制


1 概述
2 相關(guān)類介紹
3 同步復(fù)制原理
4 異步復(fù)制原理
5 注意事項

1 概述

為了提供系統(tǒng)的可靠性付材,RocketMQ采用了主從復(fù)制機(jī)制赞别,對于每個主Broker捅僵,可配置多個從Broker慷妙,主Broker接收生產(chǎn)者消息保存后眷昆,可通過同步或者異步的方式復(fù)制到從Broker演训,以此實現(xiàn)高可用弟孟。

采用同步復(fù)制,可以最大程度保證消息的可靠性样悟,但是每次寫消息都必須等待至少該消息被同步至一個從Broker中拂募,因此也會影響系統(tǒng)吞吐量。異步復(fù)制則和同步復(fù)制正好相反窟她,主Broker寫完消息之后立刻返回陈症,不用管該消息是否已經(jīng)被復(fù)制到從Broker,因此吞吐量會高一點(diǎn)震糖,但是如果在消息被復(fù)制到從Broker之前主Broker發(fā)生故障录肯,那么有可能會造成未來得及復(fù)制到從Broker的信息丟失。

本文會簡單介紹下RocketMQ主從同步的實現(xiàn)吊说。

2 相關(guān)類介紹

  • HAService

是主從復(fù)制服務(wù)的主要實現(xiàn)類论咏,通過內(nèi)部相關(guān)組件實現(xiàn)接受從Broker連接請求、記錄從Broker上報的復(fù)制進(jìn)度等功能颁井。

  • AcceptSocketService

主要負(fù)責(zé)接受從Broker的連接請求厅贪,接受到的每個從Broker的連接之后會新建HAConnection對象實例。

  • HAConnection

主Broker管理的一系列從Broker連接雅宾,內(nèi)部持有一個ReadSocketService對象實例养涮,負(fù)責(zé)接收從Broker定時上報的自己當(dāng)前復(fù)制進(jìn)度,也持有一個WriteSocketService負(fù)責(zé)在通道可寫時向從Broker發(fā)送需要復(fù)制的數(shù)據(jù),完成主從復(fù)制单寂。

  • HAClient

一個ServiceThread實現(xiàn)類贬芥,如果是從Broker,則會嘗試向主Broker建立連接宣决,并定時向主Broker匯報自己的復(fù)制進(jìn)度蘸劈,之后監(jiān)聽OP_READ事件(可參考筆者文章NIO SelectionKey事件理解),處理主Broker通過HAConnection.WriteSocketService發(fā)送過來的需要復(fù)制的數(shù)據(jù)尊沸。

  • GroupTransferService

主從同步復(fù)制的實現(xiàn)類威沫,如果是主從同步復(fù)制,則會向該類提交一個復(fù)制任務(wù)請求洼专,并進(jìn)入阻塞等待狀態(tài)棒掠,該任務(wù)主要封裝了當(dāng)前主Broker的消息寫進(jìn)度,GroupTransferService也是一個ServiceThread屁商,會定時獲取HAConnection.ReadSocketService接受到的從Broker的最大復(fù)制進(jìn)度烟很,然后對比所有的復(fù)制任務(wù)請求,如果已經(jīng)從Broker最大復(fù)制進(jìn)度已經(jīng)大于請求內(nèi)的進(jìn)度要求蜡镶,則喚醒該同步復(fù)制阻塞雾袱,阻塞和喚醒是通過CountDownLatch實現(xiàn)的。

3 同步復(fù)制原理

BrokerController通過DefaultMessageStore.putMessage存儲消息官还,后者則通過調(diào)用CommitLog.putMessage進(jìn)行實際的消息存儲處理芹橡,CommitLog.putMessage在將消息寫入內(nèi)存緩沖之后會先調(diào)用handleDiskFlush進(jìn)行同步或異步刷盤,之后會調(diào)用handleHA進(jìn)行主從復(fù)制處理望伦。

handlerHA方法定義如下:

//CommitLog
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    //住過服務(wù)器狀態(tài)為同步主Broker林说,則需要向HAService的
    //GroupTransferService提交復(fù)制等待請求
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (messageExt.isWaitStoreMsgOK()) {
            // Determine whether to wait
            //isSlaveOK則判斷當(dāng)前是否有從Broker連接,如果沒有返回
            //失敗屯伞,所以如果集群只有一個主Broker腿箩,沒有從Broker,但是
            //配置該主Broker為同步模式劣摇,則會一直報
            //SLAVE_NOT_AVAILABLE錯誤
            //同時如果當(dāng)前從Broker復(fù)制的最大進(jìn)度離當(dāng)前寫入的位置偏差
            //大于配置的指定值珠移,則也會報錯,這個判斷就是為了保證
            //從Broker不會落后主Broker太多
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                //等待請求的復(fù)制進(jìn)度為當(dāng)前寫開始位置+寫入字節(jié)數(shù)
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                service.putRequest(request);
                service.getWaitNotifyObject().wakeupAll();
                //等待復(fù)制到該進(jìn)度完成
                boolean flushOK =
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
                        + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }
}

GroupTransferService接受了復(fù)制等待請求任務(wù)之后會在run方法中定時查看從Broker上報的最大復(fù)制進(jìn)度饵撑,會喚醒那些等待進(jìn)度已達(dá)到的等待任務(wù)。

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //定時調(diào)用doWaitTransfer方法
            this.waitForRunning(10);
            this.doWaitTransfer();
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

private void doWaitTransfer() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            //對于提交上來的每個等待復(fù)制請求唆貌,判斷是否達(dá)到
            //預(yù)期的復(fù)制進(jìn)度
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                //如果從Broker上報的賦值進(jìn)度大于請求期望的偏移
                //則表示傳輸完成滑潘,也就是等待完成
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                //否則的話,再嘗試五次判斷锨咙,每次間隔一秒
                for (int i = 0; !transferOK && i < 5; i++) {
                    this.notifyTransferObject.waitForRunning(1000);
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                //如果五次依然失敗语卤,則記錄日志
                if (!transferOK) {
                    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
                }
                //令等待同步復(fù)制的handleHA方法返回,并通過transferOK
                //告訴其等待結(jié)果
                req.wakeupCustomer(transferOK);
            }

            this.requestsRead.clear();
        }
    }
}

4 異步復(fù)制原理

其實異步復(fù)制原理比較簡單,如果Broker配置為異步復(fù)制粹舵,則在CommitLog.putMessage寫入消息之后钮孵,調(diào)用handleHA方法不會做任何操作,寫入之后根本不用管從Broker的復(fù)制進(jìn)度眼滤,復(fù)制完全是由后臺HAConnection.WriteSocketService服務(wù)在監(jiān)聽到有從Broker連接可寫時巴席,向其寫待復(fù)制的數(shù)據(jù)。每個從Broker發(fā)送進(jìn)度則由從Broker定時匯報的自身當(dāng)前已復(fù)制進(jìn)度控制诅需,該匯報由HAConnection.ReadSocketService負(fù)責(zé)處理漾唉,從Broker匯報上來的最大復(fù)制進(jìn)度則用于第3節(jié)介紹的同步復(fù)制的等待復(fù)制任務(wù)阻塞的線程。

5 注意事項

RocketMQ各組件通信比如Broker和Namesrv堰塌、Producer和Namesrv赵刑、Consumer和Namesrv、Producer以及Consumer和Broker之間的通信都是基于Netty實現(xiàn)的场刑,但是本文介紹的主從復(fù)制實現(xiàn)中的網(wǎng)絡(luò)交互都是基于原生Java NIO實現(xiàn)的般此。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市牵现,隨后出現(xiàn)的幾起案子铐懊,更是在濱河造成了極大的恐慌,老刑警劉巖施籍,帶你破解...
    沈念sama閱讀 219,427評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件居扒,死亡現(xiàn)場離奇詭異,居然都是意外死亡丑慎,警方通過查閱死者的電腦和手機(jī)喜喂,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來竿裂,“玉大人玉吁,你說我怎么就攤上這事∧逡欤” “怎么了进副?”我有些...
    開封第一講書人閱讀 165,747評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長悔常。 經(jīng)常有香客問我影斑,道長,這世上最難降的妖魔是什么机打? 我笑而不...
    開封第一講書人閱讀 58,939評論 1 295
  • 正文 為了忘掉前任矫户,我火速辦了婚禮,結(jié)果婚禮上残邀,老公的妹妹穿的比我還像新娘皆辽。我一直安慰自己柑蛇,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評論 6 392
  • 文/花漫 我一把揭開白布驱闷。 她就那樣靜靜地躺著耻台,像睡著了一般。 火紅的嫁衣襯著肌膚如雪空另。 梳的紋絲不亂的頭發(fā)上盆耽,一...
    開封第一講書人閱讀 51,737評論 1 305
  • 那天,我揣著相機(jī)與錄音痹换,去河邊找鬼征字。 笑死,一個胖子當(dāng)著我的面吹牛娇豫,可吹牛的內(nèi)容都是我干的匙姜。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼冯痢,長吁一口氣:“原來是場噩夢啊……” “哼氮昧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起浦楣,我...
    開封第一講書人閱讀 39,352評論 0 276
  • 序言:老撾萬榮一對情侶失蹤袖肥,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后振劳,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體椎组,經(jīng)...
    沈念sama閱讀 45,834評論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評論 3 338
  • 正文 我和宋清朗相戀三年历恐,在試婚紗的時候發(fā)現(xiàn)自己被綠了寸癌。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡弱贼,死狀恐怖蒸苇,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情吮旅,我是刑警寧澤溪烤,帶...
    沈念sama閱讀 35,815評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站庇勃,受9級特大地震影響檬嘀,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜责嚷,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評論 3 331
  • 文/蒙蒙 一鸳兽、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧再层,春花似錦贸铜、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,022評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至蛋济,卻和暖如春棍鳖,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背碗旅。 一陣腳步聲響...
    開封第一講書人閱讀 33,147評論 1 272
  • 我被黑心中介騙來泰國打工渡处, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人祟辟。 一個月前我還...
    沈念sama閱讀 48,398評論 3 373
  • 正文 我出身青樓医瘫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親旧困。 傳聞我的和親對象是個殘疾皇子醇份,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評論 2 355