1 概述
2 相關(guān)類介紹
3 同步復(fù)制原理
4 異步復(fù)制原理
5 注意事項
1 概述
為了提供系統(tǒng)的可靠性付材,RocketMQ采用了主從復(fù)制機(jī)制赞别,對于每個主Broker捅僵,可配置多個從Broker慷妙,主Broker接收生產(chǎn)者消息保存后眷昆,可通過同步或者異步的方式復(fù)制到從Broker演训,以此實現(xiàn)高可用弟孟。
采用同步復(fù)制,可以最大程度保證消息的可靠性样悟,但是每次寫消息都必須等待至少該消息被同步至一個從Broker中拂募,因此也會影響系統(tǒng)吞吐量。異步復(fù)制則和同步復(fù)制正好相反窟她,主Broker寫完消息之后立刻返回陈症,不用管該消息是否已經(jīng)被復(fù)制到從Broker,因此吞吐量會高一點(diǎn)震糖,但是如果在消息被復(fù)制到從Broker之前主Broker發(fā)生故障录肯,那么有可能會造成未來得及復(fù)制到從Broker的信息丟失。
本文會簡單介紹下RocketMQ主從同步的實現(xiàn)吊说。
2 相關(guān)類介紹
HAService
是主從復(fù)制服務(wù)的主要實現(xiàn)類论咏,通過內(nèi)部相關(guān)組件實現(xiàn)接受從Broker連接請求、記錄從Broker上報的復(fù)制進(jìn)度等功能颁井。
AcceptSocketService
主要負(fù)責(zé)接受從Broker的連接請求厅贪,接受到的每個從Broker的連接之后會新建HAConnection
對象實例。
HAConnection
主Broker管理的一系列從Broker連接雅宾,內(nèi)部持有一個ReadSocketService
對象實例养涮,負(fù)責(zé)接收從Broker定時上報的自己當(dāng)前復(fù)制進(jìn)度,也持有一個WriteSocketService
負(fù)責(zé)在通道可寫時向從Broker發(fā)送需要復(fù)制的數(shù)據(jù),完成主從復(fù)制单寂。
HAClient
一個ServiceThread
實現(xiàn)類贬芥,如果是從Broker,則會嘗試向主Broker建立連接宣决,并定時向主Broker匯報自己的復(fù)制進(jìn)度蘸劈,之后監(jiān)聽OP_READ
事件(可參考筆者文章NIO SelectionKey事件理解),處理主Broker通過HAConnection.WriteSocketService
發(fā)送過來的需要復(fù)制的數(shù)據(jù)尊沸。
GroupTransferService
主從同步復(fù)制的實現(xiàn)類威沫,如果是主從同步復(fù)制,則會向該類提交一個復(fù)制任務(wù)請求洼专,并進(jìn)入阻塞等待狀態(tài)棒掠,該任務(wù)主要封裝了當(dāng)前主Broker的消息寫進(jìn)度,GroupTransferService
也是一個ServiceThread
屁商,會定時獲取HAConnection.ReadSocketService
接受到的從Broker的最大復(fù)制進(jìn)度烟很,然后對比所有的復(fù)制任務(wù)請求,如果已經(jīng)從Broker最大復(fù)制進(jìn)度已經(jīng)大于請求內(nèi)的進(jìn)度要求蜡镶,則喚醒該同步復(fù)制阻塞雾袱,阻塞和喚醒是通過CountDownLatch
實現(xiàn)的。
3 同步復(fù)制原理
BrokerController
通過DefaultMessageStore.putMessage
存儲消息官还,后者則通過調(diào)用CommitLog.putMessage
進(jìn)行實際的消息存儲處理芹橡,CommitLog.putMessage
在將消息寫入內(nèi)存緩沖之后會先調(diào)用handleDiskFlush
進(jìn)行同步或異步刷盤,之后會調(diào)用handleHA
進(jìn)行主從復(fù)制處理望伦。
handlerHA
方法定義如下:
//CommitLog
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
//住過服務(wù)器狀態(tài)為同步主Broker林说,則需要向HAService的
//GroupTransferService提交復(fù)制等待請求
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
//isSlaveOK則判斷當(dāng)前是否有從Broker連接,如果沒有返回
//失敗屯伞,所以如果集群只有一個主Broker腿箩,沒有從Broker,但是
//配置該主Broker為同步模式劣摇,則會一直報
//SLAVE_NOT_AVAILABLE錯誤
//同時如果當(dāng)前從Broker復(fù)制的最大進(jìn)度離當(dāng)前寫入的位置偏差
//大于配置的指定值珠移,則也會報錯,這個判斷就是為了保證
//從Broker不會落后主Broker太多
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
//等待請求的復(fù)制進(jìn)度為當(dāng)前寫開始位置+寫入字節(jié)數(shù)
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
//等待復(fù)制到該進(jìn)度完成
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
GroupTransferService
接受了復(fù)制等待請求任務(wù)之后會在run
方法中定時查看從Broker上報的最大復(fù)制進(jìn)度饵撑,會喚醒那些等待進(jìn)度已達(dá)到的等待任務(wù)。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//定時調(diào)用doWaitTransfer方法
this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
//對于提交上來的每個等待復(fù)制請求唆貌,判斷是否達(dá)到
//預(yù)期的復(fù)制進(jìn)度
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
//如果從Broker上報的賦值進(jìn)度大于請求期望的偏移
//則表示傳輸完成滑潘,也就是等待完成
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
//否則的話,再嘗試五次判斷锨咙,每次間隔一秒
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
//如果五次依然失敗语卤,則記錄日志
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
//令等待同步復(fù)制的handleHA方法返回,并通過transferOK
//告訴其等待結(jié)果
req.wakeupCustomer(transferOK);
}
this.requestsRead.clear();
}
}
}
4 異步復(fù)制原理
其實異步復(fù)制原理比較簡單,如果Broker配置為異步復(fù)制粹舵,則在CommitLog.putMessage
寫入消息之后钮孵,調(diào)用handleHA
方法不會做任何操作,寫入之后根本不用管從Broker的復(fù)制進(jìn)度眼滤,復(fù)制完全是由后臺HAConnection.WriteSocketService
服務(wù)在監(jiān)聽到有從Broker連接可寫時巴席,向其寫待復(fù)制的數(shù)據(jù)。每個從Broker發(fā)送進(jìn)度則由從Broker定時匯報的自身當(dāng)前已復(fù)制進(jìn)度控制诅需,該匯報由HAConnection.ReadSocketService
負(fù)責(zé)處理漾唉,從Broker匯報上來的最大復(fù)制進(jìn)度則用于第3節(jié)介紹的同步復(fù)制的等待復(fù)制任務(wù)阻塞的線程。
5 注意事項
RocketMQ各組件通信比如Broker和Namesrv堰塌、Producer和Namesrv赵刑、Consumer和Namesrv、Producer以及Consumer和Broker之間的通信都是基于Netty實現(xiàn)的场刑,但是本文介紹的主從復(fù)制實現(xiàn)中的網(wǎng)絡(luò)交互都是基于原生Java NIO實現(xiàn)的般此。