RocketMQ offers two technical approaches to high availability, that is, to keeping messages stable and preventing their loss. The first is the classic Master/Slave design. Its most serious drawback is that when the master fails, no slave can be promoted to take its place, leaving that broker group unusable until the master is restored. The second uses the DLedger framework, which not only replicates message data but also supports failover, so the broker group keeps working.
This article walks through how Master/Slave is implemented. RocketMQ implements it in its own code, without pulling in a third-party framework.
A Broker runs in one of two roles: Master or Slave. Producers talk mainly to the Master, while consumers can read from either. Backing messages up on another server is an important way to keep them stable, safe, and durable, so a Slave's main job is to keep pulling messages from the Master and store them on its local disk. Messages are not the only thing a Slave synchronizes from the Master; there is a good deal of other data as well.
SlaveSynchronize: the Slave synchronization service
In BrokerController, the start method handles the broker according to its role:
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
}
This code only runs in Master/Slave mode. The handleSlaveSynchronize method looks like this:
private void handleSlaveSynchronize(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
When the role is Slave, a scheduled task is started that calls syncAll on the slaveSynchronize object, first after 3 seconds and then every 10 seconds:
public void syncAll() {
this.syncTopicConfig();
this.syncConsumerOffset();
this.syncDelayOffset();
this.syncSubscriptionGroupConfig();
}
This method synchronizes several things: topic configuration, consumer offsets, delay-queue offsets, and subscription group configuration. Topic configuration synchronization serves as the example:
private void syncTopicConfig() {
String masterAddrBak = this.masterAddr;
if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
try {
TopicConfigSerializeWrapper topicWrapper =
this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
if (!this.brokerController.getTopicConfigManager().getDataVersion()
.equals(topicWrapper.getDataVersion())) {
this.brokerController.getTopicConfigManager().getDataVersion()
.assignNewOne(topicWrapper.getDataVersion());
this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
this.brokerController.getTopicConfigManager().getTopicConfigTable()
.putAll(topicWrapper.getTopicConfigTable());
this.brokerController.getTopicConfigManager().persist();
log.info("Update slave topic config from master, {}", masterAddrBak);
}
} catch (Exception e) {
log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
}
}
}
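The slave fetches the topic configuration from the master, wrapped in topicWrapper. If the data version differs from its own, it updates the topic data held by its brokerController and persists it. The other kinds of data are handled the same way and are also persisted to local files. So far only peripheral data such as configuration and consumer offsets has been synchronized; nothing yet explains how the messages themselves are synchronized.
Message synchronization
Message synchronization involves storage, so its core code lives in the store module, with HAService as the central service.
Start with HAClient, an inner class of HAService. It acts as the Slave-side client and initiates requests to synchronize message data from the Master.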
HAClient
private final AtomicReference<String> masterAddress = new AtomicReference<>(); // master address
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
private SocketChannel socketChannel;
private Selector selector;
private long lastWriteTimestamp = System.currentTimeMillis();
private long currentReportedOffset = 0;
private int dispatchPosition = 0; // keeps advancing; reset to 0 when byteBufferRead fills up
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
These are the client's basic fields: masterAddress holds the master's address, currentReportedOffset is the offset most recently reported, and dispatchPosition tracks how far into the read buffer the data has been processed. HAClient extends ServiceThread, so it also runs as a thread:
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
if (this.connectMaster()) {
if (this.isTimeToReportOffset()) {
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
if (!result) {
this.closeMaster();
}
}
this.selector.select(1000);
boolean ok = this.processReadEvent();
if (!ok) {
this.closeMaster();
}
// report the slave's max offset progress
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
As the client, it must initiate the connection to the master, so before trying to fetch data it checks that the connection is established. RocketMQ implements this communication itself on top of JDK NIO.
private boolean connectMaster() throws ClosedChannelException {
if (null == socketChannel) {
String addr = this.masterAddress.get();
if (addr != null) {
SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
if (socketAddress != null) {
this.socketChannel = RemotingUtil.connect(socketAddress);
if (this.socketChannel != null) {
this.socketChannel.register(this.selector, SelectionKey.OP_READ);
}
}
}
this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
this.lastWriteTimestamp = System.currentTimeMillis();
}
return this.socketChannel != null;
}
When connecting, it checks that masterAddress is set, connects to that address to obtain a channel, and registers the channel with the selector for read events (OP_READ). When the connection is initialized, currentReportedOffset is set to the slave's current maximum stored message offset. Once connected, the slave periodically reports its current offset to the master:
private boolean reportSlaveMaxOffset(final long maxOffset) {
this.reportOffset.position(0);
this.reportOffset.limit(8);
this.reportOffset.putLong(maxOffset);
this.reportOffset.position(0);
this.reportOffset.limit(8);
// report the slave's max offset
for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
try {
this.socketChannel.write(this.reportOffset);
} catch (IOException e) {
log.error(this.getServiceName()
+ "reportSlaveMaxOffset this.socketChannel.write exception", e);
return false;
}
}
lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
return !this.reportOffset.hasRemaining();
}
The report itself is simple: the reportOffset byte buffer is reset, the 8-byte maxOffset value is written into it, and the buffer is written to socketChannel. The master then knows how far the slave has progressed. Periodic reporting has a second benefit: it doubles as a heartbeat, which keeps the channel alive over long periods.
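The 8-byte report can be sketched in isolation. This is a minimal illustration, not RocketMQ code: the class name OffsetReportSketch and its two methods are hypothetical, and the only thing taken from the article is that the frame is a single long written into an 8-byte buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of RocketMQ: shows the 8-byte offset frame only.
final class OffsetReportSketch {
    /** Encodes a slave's max offset as an 8-byte frame (ByteBuffer is big-endian by default). */
    static ByteBuffer encode(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxOffset);
        buf.flip(); // position = 0, limit = 8: ready to be written to a channel
        return buf;
    }

    /** Reads the offset back with an absolute get, leaving the buffer position unchanged. */
    static long decode(ByteBuffer frame) {
        return frame.getLong(frame.position());
    }
}
```

Because the frame has a fixed size and no header, the receiver never needs to know where one report ends and the next begins beyond counting in steps of 8 bytes.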
When the master pushes data, the slave sees a read event and reads the byte stream from the channel.
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
processReadEvent handles read events. byteBufferRead holds the bytes read from the channel. When data arrives, readSize is greater than 0, and after each such read the dispatchReadRequest method is called to dispatch the data.
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= msgHeaderSize) {
// read the physical offset
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// read the body size
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
if (diff >= (msgHeaderSize + bodySize)) { // at least one complete packet is available
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
this.byteBufferRead.get(bodyData);
// append the body to the slave's CommitLog
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize; // dispatchPosition counts the bytes of all complete packets consumed so far
if (!reportSlaveMaxOffsetPlus()) {
// reporting the slave's max offset to the master failed
return false;
}
continue;
}
}
if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}
break;
}
return true;
}
The method defines msgHeaderSize, the length of the message header. readSocketPos is the current position of byteBufferRead. Note that as byteBufferRead reads from the channel its position advances, so readSocketPos is not where the next message starts but where the received data currently ends. To find where to start reading, HAClient keeps the field dispatchPosition, which records where the previous dispatch stopped. The master sends data in a simple protocol: a header (physical offset plus body length) followed by the body.
Before reading, the method checks that the distance between dispatchPosition and the current position of byteBufferRead is at least one header. It then reads 8 bytes from dispatchPosition as masterPhyOffset, the physical offset at which this batch starts on the master, and the next 4 bytes as bodySize, the body length. Next it checks whether the slave's current maximum physical offset, slavePhyOffset, equals the masterPhyOffset pushed by the master. This keeps slave and master data consistent and guards against loss. The comparison diff >= (msgHeaderSize + bodySize) determines whether a complete packet is present. If so, a byte array bodyData of length bodySize is allocated, the position of byteBufferRead is moved to the start of the body, the data is copied into bodyData, and bodyData is appended to the commitLog. Afterwards the position of byteBufferRead is restored to readSocketPos, dispatchPosition advances by the length of one complete packet, and the slave promptly reports its new maximum physical offset to the master.
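The framing logic can be tried out on its own. The sketch below is a simplified illustration under stated assumptions: FrameParserSketch and parse are hypothetical names, the bodies are collected into a list instead of being appended to a CommitLog, and the offset consistency check is left out. Only the layout (8-byte offset, 4-byte size, body) comes from the article.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical parser, not RocketMQ code: extracts complete frames from a fill buffer.
final class FrameParserSketch {
    static final int HEADER_SIZE = 8 + 4; // physical offset + body size

    /**
     * Extracts every complete frame between dispatchPos and buf.position()
     * and returns the new dispatch position. Only absolute reads are used,
     * so the buffer's write position is never disturbed.
     */
    static int parse(ByteBuffer buf, int dispatchPos, List<byte[]> bodies) {
        while (buf.position() - dispatchPos >= HEADER_SIZE) {
            int bodySize = buf.getInt(dispatchPos + 8);
            if (buf.position() - dispatchPos < HEADER_SIZE + bodySize) {
                break; // the body has not fully arrived yet
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                body[i] = buf.get(dispatchPos + HEADER_SIZE + i);
            }
            bodies.add(body);
            dispatchPos += HEADER_SIZE + bodySize;
        }
        return dispatchPos;
    }
}
```

Using absolute reads avoids the save-and-restore of the buffer position that the original method performs with readSocketPos; that is a design choice of this sketch, not a claim about how RocketMQ should be written.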
byteBufferRead carries the incoming data and is 4 MB long. As data arrives its position keeps growing until position equals limit and nothing more can be added. To keep receiving, byteBufferRead has to be reset before reading continues, which is what the reallocateByteBuffer method does:
private void reallocateByteBuffer() {
int remain = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
if (remain > 0) {
this.byteBufferRead.position(this.dispatchPosition);
this.byteBufferBackup.position(0);
this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
this.byteBufferBackup.put(this.byteBufferRead); // copy the part of read that has not been dispatched into backup
}
this.swapByteBuffer();
this.byteBufferRead.position(remain);
this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
this.dispatchPosition = 0;
}
When byteBufferRead is full, the last packet in it may still be incomplete. That trailing fragment must be kept for the next round. A spare buffer, byteBufferBackup, is used for this. remain is the length of the partial packet. When remain is greater than 0 the packet is incomplete, so the undispatched tail of byteBufferRead is copied into the reset byteBufferBackup, which then holds data with its position at remain. The swapByteBuffer method exchanges the two references, so byteBufferRead now refers to what was byteBufferBackup. The limit of byteBufferRead is then reset and dispatchPosition returns to 0. The next read from socketChannel into byteBufferRead starts at remain.
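The carry-over of a partial packet between two buffers can be sketched as follows. CarryOverSketch and its methods are hypothetical names introduced for illustration. Unlike the original, which only runs when the buffer is completely full, this version works for any fill level by flipping the buffer first.

```java
import java.nio.ByteBuffer;

// Hypothetical double-buffer holder, not RocketMQ code.
final class CarryOverSketch {
    private ByteBuffer read;
    private ByteBuffer backup;
    private int dispatchPos;

    CarryOverSketch(int capacity) {
        read = ByteBuffer.allocate(capacity);
        backup = ByteBuffer.allocate(capacity);
    }

    ByteBuffer readBuffer() { return read; }
    int dispatchPos() { return dispatchPos; }
    void advance(int bytes) { dispatchPos += bytes; }

    /** Moves the undispatched tail to the front of the spare buffer and swaps the two. */
    void carryOver() {
        read.flip();                 // limit = bytes written so far, position = 0
        read.position(dispatchPos);  // skip everything already dispatched
        backup.clear();              // position = 0, limit = capacity
        backup.put(read);            // copy the tail; backup.position() is now the tail length
        ByteBuffer tmp = read;
        read = backup;
        backup = tmp;
        dispatchPos = 0;             // parsing restarts at the front of the new read buffer
    }
}
```

After carryOver, the read buffer's position equals the number of carried-over bytes, so the next channel read appends right behind the saved fragment.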
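That covers the slave side: how it initiates the connection, reports its maximum offset, and receives and stores message data.
Master side
As the server, the master may have several slaves synchronizing message data from it at once, so it needs a service that manages those clients.
AcceptSocketService
This inner class of HAService listens for and manages connections from slaves.
Setting up the server side of the channel: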
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
this.serverSocketChannel.socket().bind(this.socketAddressListen);
this.serverSocketChannel.configureBlocking(false);
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
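It binds the port, switches to non-blocking mode, and registers for accept events, that is, incoming client connection requests.
When the thread runs: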
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
for (SelectionKey k : selected) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
// a connection request is pending
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// create a connection object for this client
HAConnection conn = new HAConnection(HAService.this, sc);
conn.start();
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
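The loop listens for connection requests. selected holds all keys with pending events; for each key with an OP_ACCEPT event, the client's SocketChannel is wrapped in an HAConnection and added to the connectionList collection in HAService. AcceptSocketService only manages client connections; the server's actual read and write logic lives in HAConnection.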
HAConnection
public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {
this.haService = haService;
this.socketChannel = socketChannel;
this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
this.socketChannel.configureBlocking(false);
this.socketChannel.socket().setSoLinger(false, -1);
this.socketChannel.socket().setTcpNoDelay(true);
this.socketChannel.socket().setReceiveBufferSize(1024 * 64);
this.socketChannel.socket().setSendBufferSize(1024 * 64);
this.writeSocketService = new WriteSocketService(this.socketChannel);
this.readSocketService = new ReadSocketService(this.socketChannel);
this.haService.getConnectionCount().incrementAndGet();
}
The constructor creates the read service readSocketService and the write service writeSocketService. The read service mainly reads the maximum offset reported by the slave, while the write service writes message data to the channel.
ReadSocketService
The read service only needs to watch for read events. It runs in its own thread, listens for data, and processes each read:
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
while (this.byteBufferRead.hasRemaining()) {
try {
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// each offset reported by the slave is 8 bytes, so position is normally a multiple of 8, but a partial packet may have arrived
// so pos is rounded down to a multiple of 8 to make sure a complete value is read
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
long readOffset = this.byteBufferRead.getLong(pos - 8);
this.processPosition = pos;
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
}
byteBufferRead holds at most 1 MB. When it fills up it is simply reset, with no handling for an incomplete packet. Why? Each offset report from the slave is exactly 8 bytes and the buffer size is a multiple of 8, so a full buffer always ends on a packet boundary. processPosition is the position up to which data was last processed. Data read from socketChannel lands in byteBufferRead, and its position is compared with processPosition; since a packet is only 8 bytes, a difference of at least 8 is enough. When extracting the reported offset, only the most recent packet is used: even if this.byteBufferRead.position() - this.processPosition spans several multiples of 8, only the last complete value is read and the intermediate packets are ignored.
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); computes pos this way so that it is a multiple of 8, discarding any incomplete packet at the tail. readOffset is then the 8 bytes read at pos - 8, and processPosition is updated to pos. slaveAckOffset is the offset up to which the slave is taken to have stored the data successfully; haService.notifyTransferSome is then called (why this notification is needed is covered later). slaveRequestOffset is a field of HAConnection; when it is below 0, the readOffset just read is assigned to it.
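The rounding step is easy to check with concrete numbers. The sketch below uses a hypothetical class LastAckSketch; it mirrors only the arithmetic described above and returns -1 when no complete report is available, which is a convention of this example rather than of RocketMQ.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not RocketMQ code: picks the latest complete 8-byte report.
final class LastAckSketch {
    /**
     * Returns the most recent complete 8-byte offset written into buf,
     * or -1 if fewer than 8 unprocessed bytes are available.
     */
    static long lastCompleteOffset(ByteBuffer buf, int processPos) {
        if (buf.position() - processPos < 8) {
            return -1;
        }
        int pos = buf.position() - (buf.position() % 8); // round down to a multiple of 8
        return buf.getLong(pos - 8);                      // the last complete report
    }
}
```

With two full reports followed by 3 stray bytes, position is 19, pos becomes 16, and the value read at index 8 is the second report; the first report and the 3-byte fragment are both skipped.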
This read service both tells the master how far the slave has persisted messages and serves as the heartbeat for the connection.
WriteSocketService
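This service pushes message data to the slave and registers for write events. From the slave's read path we already know the packet layout: a header (starting offset plus body length) followed by the body. The run method is explained in two parts: first, what happens initially; second, how data is written and what happens if a write does not complete.
Part 1: initialization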
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
if (-1 == this.nextTransferFromWhere) {
// initialize
if (0 == HAConnection.this.slaveRequestOffset) {
// the slave reported offset 0, so start from the beginning of the master's last file
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
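slaveRequestOffset equal to -1 means the slave has not yet reported its maximum offset. The master cannot know where to start pushing message data, so it cannot proceed.
nextTransferFromWhere equal to -1 means the master has not started pushing and must determine the starting position. If slaveRequestOffset is 0, the slave is brand new with no locally stored messages, so the master starts from the beginning of the last file in which it stores messages. masterOffset is the master's current maximum offset; subtracting its remainder modulo the file size gives the start of that last file.
If slaveRequestOffset is greater than 0, the master starts pushing from slaveRequestOffset, the slave's current maximum position.
That is the initialization; its key job is to determine where the master starts pushing data.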
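The starting-position rule reduces to a few lines of arithmetic. In this sketch the class StartOffsetSketch and the method firstTransferOffset are hypothetical, and the 1 GiB file size used in the note below is an assumed example value rather than something stated in the article.

```java
// Hypothetical helper, not RocketMQ code: computes where the master starts pushing.
final class StartOffsetSketch {
    /**
     * A slave that reports 0 starts at the beginning of the master's last file;
     * any other reported offset is used as is.
     */
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, int fileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % fileSize); // align down to a file boundary
        return Math.max(start, 0);
    }
}
```

For instance, with an assumed file size of 1073741824 bytes and a master maximum offset of 2684354560, a fresh slave would start at 2147483648, the first byte of the third file, and would not receive the two older files through this path.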
Part 2: pushing data
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
} else {
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
A SelectMappedBufferResult is fetched starting at nextTransferFromWhere. Because each push is capped, size is at most 32 KB by default. The byteBuffer's limit is set to size, and selectResult is assigned to the selectMappedBufferResult field. The header byteBufferHeader is built first: 8 bytes for the starting offset (the value of nextTransferFromWhere before it is advanced) and 4 bytes for the body length size. Finally the transferData method is called to send the data:
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
Writing happens in two parts: the header, then the message data. byteBufferHeader must be fully written before the body is written. One thing to note: once the body has been written, selectMappedBufferResult is released. When it was obtained from the commitLog it was marked as in use, so it must be released explicitly when done. If the write does not finish this time, it continues in the next loop iteration.
One question is worth considering: is the data the master pushes always made up of complete messages?
The answer is no. Each push is capped at 32 KB. If the slave is a brand-new server or has not synchronized for a long time, the gap between its maximum offset and the master's is large, so a push is cut at the 32 KB boundary and may end in the middle of a message. Once the gap is under 32 KB, pushes generally contain complete messages.
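A small calculation makes the cut points visible. BatchSketch below is a hypothetical helper; the 32768-byte cap corresponds to the 32 KB mentioned above, and the 70,000-byte backlog in the note is an invented figure used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not RocketMQ code: splits a backlog into capped pushes.
final class BatchSketch {
    /** Returns the size of each push needed to drain pendingBytes with a per-push cap. */
    static List<Integer> batchSizes(int pendingBytes, int maxBatch) {
        List<Integer> sizes = new ArrayList<>();
        while (pendingBytes > 0) {
            int size = Math.min(pendingBytes, maxBatch);
            sizes.add(size);
            pendingBytes -= size;
        }
        return sizes;
    }
}
```

A 70,000-byte backlog with a 32768-byte cap is sent as pushes of 32768, 32768, and 4464 bytes. The cuts fall at fixed byte counts with no regard for message boundaries, which is why the slave appends raw bytes at a given offset instead of parsing individual messages.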
GroupTransferService
When the master reads a slave's maximum offset, it calls the notifyTransferSome method of HAService:
public void notifyTransferSome(final long offset) {
for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
// a slave has advanced the max offset; notify so that pending synchronous waits can confirm the slave succeeded
boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
if (ok) {
this.groupTransferService.notifyTransferSome();
break;
} else {
value = this.push2SlaveMaxOffset.get();
}
}
}
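A master keeps connections to several slaves at once, and the offsets they report may differ, so this method uses the atomic long push2SlaveMaxOffset to keep the update atomic. groupTransferService.notifyTransferSome() is called only when offset is actually greater than value and the update of push2SlaveMaxOffset succeeds. If the update fails, the value is re-read and the check is retried.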
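The compare-and-set loop is a general pattern for keeping a value that only ever grows. A minimal sketch, with the hypothetical class MonotonicMaxSketch standing in for the push2SlaveMaxOffset handling:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class, not RocketMQ code: a thread-safe "maximum seen so far".
final class MonotonicMaxSketch {
    private final AtomicLong max = new AtomicLong(0);

    /** Raises the stored value to offset if offset is larger; returns true if this call raised it. */
    boolean raiseTo(long offset) {
        long current = max.get();
        while (offset > current) {
            if (max.compareAndSet(current, offset)) {
                return true;        // this caller won the race and should notify waiters
            }
            current = max.get();    // another thread changed the value; re-check
        }
        return false;               // someone already reported an equal or larger offset
    }

    long get() {
        return max.get();
    }
}
```

The return value plays the role of the ok flag in the original: only the caller that actually moved the maximum forward needs to wake the waiting threads.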
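So what is GroupTransferService for? As covered earlier under message storage, when the broker is in synchronous mode a message counts as stored only when, first, it has been persisted on the local broker and, second, if a Slave exists, the slave has persisted it as well.
The handleHA method of the CommitLog class: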
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
boolean flushOK =
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
if (!flushOK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
HAService is obtained from DefaultMessageStore. If the slave is healthy, a GroupCommitRequest is created and put into HAService, that is, into GroupTransferService, and the caller then waits. The wait is implemented with a CountDownLatch.
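The latch-based wait can be sketched with a stand-in request class. AckRequestSketch and its method names are hypothetical; the sketch only assumes what the text states, namely that the writing thread blocks on a CountDownLatch until another thread reports the outcome or a timeout expires.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a commit request, not RocketMQ code.
final class AckRequestSketch {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean ok;

    AckRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long nextOffset() {
        return nextOffset;
    }

    /** Called by the transfer thread once the outcome is known. */
    void complete(boolean success) {
        ok = success;
        latch.countDown();
    }

    /** Called by the writing thread; returns false on timeout, failure, or interruption. */
    boolean await(long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS) && ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A false result from await is what the caller would translate into a status such as FLUSH_SLAVE_TIMEOUT.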
GroupTransferService also extends ServiceThread:
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
requestsWrite collects submitted requests and requestsRead is the list that gets processed; the swapRequests method exchanges the two. swapRequests is invoked from onWaitEnd, which is called when a wait ends.
First, the method that submits a request:
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
It locks requestsWrite and adds the request. If the service thread has not yet been notified, hasNotified is set to true and the thread is woken, since the thread waits on waitPoint.
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
for (int i = 0; !transferOK && i < 5; i++) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
req.wakeupCustomer(transferOK);
}
this.requestsRead.clear();
}
}
}
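doWaitTransfer is the core method. It locks requestsRead and iterates over it. When push2SlaveMaxOffset is greater than or equal to the request's physical offset (the message's starting physical offset plus its length), the writing thread is woken and the request is marked successful. If transferOK is false, it waits, for at most 5 s in total (five waits of up to 1 s each). If it still fails, the writing thread is woken anyway but the request is marked failed. After waking, the writing thread reports the outcome through the status: if flushOK is false, putMessageResult is given the status FLUSH_SLAVE_TIMEOUT.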
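Could a request ever be lost? No. Consider the extreme case: a put thread has picked up the current write list, and at that moment the GroupTransferService thread finishes waiting and swaps the lists. The read list becomes the write list and the write list becomes the read list, while the put thread still holds the list that is now being iterated as the read list. Both adding and iterating lock the list object, so no other thread can modify it in the meantime. One situation can still arise: the reading thread finishes iterating, clears the read list, and releases the lock; the put thread then acquires that same object and adds its request. When is that request picked up? Only after two more swaps, but it is not cleared away in between.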
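The two-list idea can be sketched in a simplified form. SwapQueueSketch is a hypothetical class that guards both lists with a single lock, which is a deliberate simplification: the original locks each list separately, and that is exactly what makes the late-add case described above possible. With one lock that case cannot occur, at the cost of slightly more contention.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified queue, not RocketMQ code: producers add, one consumer drains.
final class SwapQueueSketch<T> {
    private List<T> write = new ArrayList<>();
    private List<T> read = new ArrayList<>();

    /** Producers append to the current write list. */
    synchronized void put(T item) {
        write.add(item);
    }

    /** Swaps the lists and returns a copy of everything queued before the swap. */
    synchronized List<T> swapAndDrain() {
        List<T> tmp = write;
        write = read;
        read = tmp;
        List<T> batch = new ArrayList<>(read);
        read.clear(); // the emptied list becomes the write list on the next swap
        return batch;
    }
}
```

The point of swapping instead of copying under a long-held lock is that producers are blocked only for the brief swap, not for the whole time the consumer spends processing the batch.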
A broker in the slave role synchronizes a lot of data, but two kinds are not synchronized: the consume queue data (ConsumeQueue) and the index data (IndexFile). Recall how these two are built: a thread service, ReputMessageService (an inner class of DefaultMessageStore), continuously reads messages from the commitLog and dispatches them, which produces the consume queue and index data. The slave therefore rebuilds both locally from its replicated commitLog.