概覽
RocketMQ作為一款優(yōu)秀的分布式消息中間件介牙,分布式系統(tǒng)的一個很重要的特點就是要保證系統(tǒng)的高可用(HA)膘螟,RocketMQ則是通過主從同步機制保證系統(tǒng)的高可用富雅。
下面是兩主兩從的主從同步原理圖缀辩。
主從兩節(jié)點優(yōu)點
- 數據備份:保證了兩/多臺機器上的數據冗余屠列,特別是在主從同步復制的情況下,一定程度上保證了Master出現不可恢復的故障以后罢艾,數據不丟失楣颠。
- 高可用性:即使Master掉線嫁乘, Consumer會自動重連到對應的Slave機器,不會出現消費停滯的情況球碉。
- 提高性能:主要表現為可分擔Master讀的壓力,當從Master拉取消息仓蛆,拉取消息的最大物理偏移與本地存儲的最大物理偏移的差值超過一定值睁冬,會轉向Slave(默認brokerId=1)進行讀取,減輕了Master壓力看疙。
- 消費實時:master宕機后消費者可以從slave上消費保證消息的實時性豆拨,但是slave不能接收producer發(fā)送的消息,slave只能同步master數據(RocketMQ4.5版本之前)能庆,4.5版本開始增加多副本機制施禾,根據RAFT算法,master宕機會自動選擇其中一個副本節(jié)點作為master保證消息可以正常的生產消費搁胆。
主從數據同步有兩種方式同步復制弥搞、異步復制
復制方式 | 優(yōu)點 | 缺點 | 適應場景 |
---|---|---|---|
同步復制 | slave保證了與master一致的數據副本,如果master宕機渠旁,數據依然在slave中找到其數據和master的數據一致 | 由于需要slave確認效率上會有一定的損失 | 數據可靠性要求很高的場景 |
異步復制 | 無需等待slave確認消息是否存儲成功效率上要高于同步復制 | 如果master宕機攀例,由于數據同步有延遲導致slave和master存在一定程度的數據不一致問題 | 數據可靠性要求一般的場景 |
CommitLog復制
主從節(jié)點同步只復制commitlog消息信息,consumequeue顾腊、indexfile所索引文件不會同步粤铭,會由從節(jié)點Broker的commitlog文件重新生成本機的consumequeue、indexfile的索引信息杂靶。
HAService:Master和Slave通信的服務類梆惯;包含Slave作為客戶端的HAClient類對象。
HAClient:Slave向Master建立連接吗垮,發(fā)送Offset數據請求垛吗,并處理Master返回請求的類對象;HAClient作為Slave向Master通信的客戶端抱既,和Master建立socket連接职烧。
AcceptSocketService:Master服務端和Slave建立連接,并監(jiān)聽Slave的IO事件防泵,建立HAConnection對象蚀之。
HAConnection:HAConnection是Master用來和Slave建立連接的類,處理和Slave的交互捷泞。Master(類似服務端)Slave(類似客戶端)
ReadSocketService:ReadSocketService用來讀取Slave向Master發(fā)送的數據足删,采用IO復用的方式處理。
WriteSocketService:WriteSocketService用來Master向Slave寫返回的數據(commitlog的message數據)锁右。
GroupTransferService: GroupTransferService是用來控制Master是否向Slave同步commitlog數據的失受。Master和Slave會進行通信讶泰,Master寫message到內存ByteBuffer,然后調用handleHA()方法拂到,然后構造一個同步請求放入GroupTransferService#requestsWrite的隊列里痪署,等待HAConnection#WriteSocketService處理這個請求,然后將commitlog的message數據,同步到Slave中兄旬。
圖解類關系
HAClient類是Slave節(jié)點使用的狼犯,用來向Master通信的,相當于客戶端的角色领铐。
HAService是類Master節(jié)點使用的悯森,里面包含了AcceptSocketService和GroupTransferService。
AcceptSocketService是同Slave建立連接绪撵,并監(jiān)聽Slave的IO事件瓢姻,建立HAConnection連接對象。
HAConnection包含ReadSocketService和WriteSocketService音诈;ReadSocketService用來讀取Slave向Master發(fā)送的數據幻碱,采用IO復用的方式處理。WriteSocketService用來Master向Slave寫返回的數據(commitlog的message數據)改艇。
GroupTransferService是用來控制Master是否向Slave同步commitlog數據的收班。Master和Slave會進行通信,Master寫message到內存ByteBuffer谒兄,然后調用handleHA()方法摔桦,然后構造一個同步請求放入GroupTransferService#requestsWrite的隊列里,等待HAConnection#WriteSocketService處理這個請求,然后將commitlog的message數據承疲,同步到Slave中邻耕。
Slave向Master通信
HAClient使Slave和Master建立連接,并報告自己同步的offset燕鸽,然后等待Master的返回兄世,并處理Master返回的message信息,寫入到Slave本機的commitlog文件中啊研,并構建consumequeue御滩、indexfile索引文件。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 連接Master
if (this.connectMaster()) {
// slave是否向Master發(fā)送offset消息,默認5秒發(fā)送一次
if (this.isTimeToReportOffset()) {
//Slave向Master發(fā)送當前Slave的commitlog的最大offset
boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
// 沒有寫完關閉Master
if (!result) {
this.closeMaster();
}
}
// selector使socketChannel等待1秒鐘党远,等待
// I/O復用削解,檢查是否有讀事件
this.selector.select(1000);
// 處理Master返回的待處理消息,將返回的消息寫入commitlog文件沟娱,并構建consumequeue氛驮、indexfile索引文件
boolean ok = this.processReadEvent();
if (!ok) {
// 關閉Master
this.closeMaster();
}
// 處理完讀事件后,若slave的offset更新济似,需要再次發(fā)送新的slave的offset
if (!reportSlaveMaxOffsetPlus()) {
continue;
}
long interval =
HAService.this.getDefaultMessageStore().getSystemClock().now()
- this.lastWriteTimestamp;
if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
.getHaHousekeepingInterval()) {
log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
+ "] expired, " + interval);
this.closeMaster();
log.warn("HAClient, master not response some time, so close connection");
}
} else {
// 連接失敗矫废,等待5秒;并不涉及線程之間的wait和notify操作等
this.waitForRunning(1000 * 5);
}
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
// 等待5秒盏缤,并不涉及線程之間的wait和notify操作等,然后再進行while循環(huán)蓖扑,再次連接到master
this.waitForRunning(1000 * 5);
}
}
log.info(this.getServiceName() + " service end");
}
Master和Slave建立連接
AcceptSocketService是用來Master服務端和Slave建立連接唉铜,并監(jiān)聽Slave的IO事件,建立HAConnection對象律杠。
/**
* Starts listening to slave connections.
*/
public void beginAccept() throws Exception {
this.serverSocketChannel = ServerSocketChannel.open();
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 綁定監(jiān)聽
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 非阻塞
this.serverSocketChannel.configureBlocking(false);
// serverSocketChannel注冊到selector
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 線程未停止
while (!this.isStopped()) {
try {
// 等待監(jiān)聽Socket的I/0完成事件通知打毛,超時等待1秒
this.selector.select(1000);
// 被注冊到selector上的key,也就是IO的socket
Set<SelectionKey> selected = this.selector.selectedKeys();
if (selected != null) {
// 遍歷監(jiān)聽俩功,一個監(jiān)聽事件一個HAConnection
for (SelectionKey k : selected) {
// 監(jiān)聽狀態(tài)ok
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
// 獲取SocketChannel
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info("HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 建立和Slave的連接,里面包含了ReadSocketService用來讀取Slave向Master發(fā)送的數據碰声,WriteSocketService用來寫Master向Slave返回的數據诡蜓;
HAConnection conn = new HAConnection(HAService.this, sc);
// 開啟ReadSocketService和WriteSocketService服務,處理Slave發(fā)來的請求和返回給Slave的數據
conn.start();
// 連接添加到集合
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error("new HAConnection exception", e);
sc.close();
}
}
} else {
log.warn("Unexpected ops in select " + k.readyOps());
}
}
// 清空selected
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.", e);
}
}
log.info(this.getServiceName() + " service end");
}
處理Slave發(fā)送的offset
ReadSocketService是Master用來讀取Slave向Master發(fā)送的數據胰挑,采用IO復用的方式處理蔓罚。
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
// 線程不會停止
while (!this.isStopped()) {
try {
// 同步輪詢SocketChannel,等待IO事件通知完成瞻颂,超時等待1秒
this.selector.select(1000);
// Master處理Slave發(fā)送的offset請求豺谈,并返回
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error("processReadEvent error");
break;
}
long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
// Master和Slave連接超時間隔,20秒超時贡这,記錄log
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
break;
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// Broker停機茬末,線程關閉,資源釋放
this.makeStop();
writeSocketService.makeStop();
haService.removeConnection(HAConnection.this);
HAConnection.this.haService.getConnectionCount().decrementAndGet();
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
// Master處理Slave發(fā)送的offset請求盖矫,并返回
private boolean processReadEvent() {
// 讀取到數據為0byte的數據次數
int readSizeZeroTimes = 0;
// byteBufferRead不在包含空余空間丽惭,進行重新開啟
if (!this.byteBufferRead.hasRemaining()) {
this.byteBufferRead.flip();
this.processPosition = 0;
}
// byteBufferRead還有剩余空間
while (this.byteBufferRead.hasRemaining()) {
try {
// 讀取數據到byteBufferRead中
int readSize = this.socketChannel.read(this.byteBufferRead);
// 讀取到數據
if (readSize > 0) {
// 更新readSizeZeroTimes和lastReadTimestamp
readSizeZeroTimes = 0;
this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
// byteBufferRead中讀取到的數據位置>byteBufferRead上次處理過的數據>8;
// 讀取超過8byte:8byte:代表slave向master發(fā)送的offset的大小8byte
if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
// 獲得slave發(fā)送的最大的offset的位置
int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
// 讀取offset
long readOffset = this.byteBufferRead.getLong(pos - 8);
// 更新處理的位置
this.processPosition = pos;
// master接受到slave發(fā)送的offset
HAConnection.this.slaveAckOffset = readOffset;
if (HAConnection.this.slaveRequestOffset < 0) {
HAConnection.this.slaveRequestOffset = readOffset;
log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
}
// 喚醒GroupTransferService#WaitNotifyObject#notifyTransferObject判斷這個offset是否發(fā)送了,沒有發(fā)送進行等待(GroupTransferService#notifyTransferObject.waitForRunning(1000))辈双,
// 等待WriteSocketService寫數據成功责掏,然后再判斷是否寫入成功。
HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}
// 讀取到數據的數據大小為0湃望,3次跳出循環(huán)
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
return false;
}
} catch (IOException e) {
log.error("processReadEvent exception", e);
return false;
}
}
return true;
}
Master向Slave寫message消息
WriteSocketService用來Master向Slave寫返回的數據(commitlog的message數據)换衬。
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 同步輪詢SocketChannel,等待IO事件通知完成证芭,超時等待1秒
this.selector.select(1000);
// slave請求master的offset == -1瞳浦,項目剛開始啟動,master未接收到slave的拉取請求檩帐,sleep
if (-1 == HAConnection.this.slaveRequestOffset) {
Thread.sleep(10);
continue;
}
// nextTransferFromWhere = -1說明第一次進行數據傳輸术幔,需要計算傳輸的物理偏移量
if (-1 == this.nextTransferFromWhere) {
// 如果slaveRequestOffset為0則從當前最后一個commitlog文件傳輸,否則根據slave broker的拉取請求偏移量開始
if (0 == HAConnection.this.slaveRequestOffset) {
// 確定Master的offset
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());
if (masterOffset < 0) {
masterOffset = 0;
}
this.nextTransferFromWhere = masterOffset;
} else {
// 下次開始位置為slave請求位置
this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
}
log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
+ "], and slave request " + HAConnection.this.slaveRequestOffset);
}
// 上次寫slave數據完成
if (this.lastWriteOver) {
// 距上次寫數據間隔
long interval =
HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getHaSendHeartbeatInterval()) {
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(this.nextTransferFromWhere);
this.byteBufferHeader.putInt(0);
this.byteBufferHeader.flip();
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// 上次傳輸未結束則繼續(xù)傳輸湃密,可能是byteBufferHeader有剩余诅挑,也可能是SelectMappedBufferResult.ByteBuffer盛放消息的具體內容的數據還有剩余四敞,沒有被寫完,重新開始寫
} else {
this.lastWriteOver = this.transferData();
if (!this.lastWriteOver)
continue;
}
// 根據offset從master的commitlog文件獲取數據
SelectMappedBufferResult selectResult =
HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
int size = selectResult.getSize();
if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
}
long thisOffset = this.nextTransferFromWhere;
this.nextTransferFromWhere += size;
selectResult.getByteBuffer().limit(size);
this.selectMappedBufferResult = selectResult;
// Build Header
this.byteBufferHeader.position(0);
this.byteBufferHeader.limit(headerSize);
this.byteBufferHeader.putLong(thisOffset);
this.byteBufferHeader.putInt(size);
this.byteBufferHeader.flip();
// 向slave的socket通道寫數據
this.lastWriteOver = this.transferData();
} else {
// 如果沒有獲取到commitlog的數據拔妥,則進行等待忿危;
// 一個Slave到Master的連接,一個HAConnection對象没龙,一個WriteSocketService對象铺厨,一個線程,
// 因為Master沒有最新的commitlog的數據硬纤,所以把所有的等待著數據的HAConnection的WriteSocketService()動作解滓,進行等待;
// 將所有的HAConnection的WriteSocketService()線程被設置為未被通知的狀態(tài)
HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.", e);
break;
}
}
// 正常關機
// 將這個連接線程關閉筝家,移除
HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();
if (this.selectMappedBufferResult != null) {
this.selectMappedBufferResult.release();
}
this.makeStop();
readSocketService.makeStop();
haService.removeConnection(HAConnection.this);
SelectionKey sk = this.socketChannel.keyFor(this.selector);
if (sk != null) {
sk.cancel();
}
try {
this.selector.close();
this.socketChannel.close();
} catch (IOException e) {
HAConnection.log.error("", e);
}
HAConnection.log.info(this.getServiceName() + " service end");
}
// 向slave的socket通道寫數據
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header
// 如果讀到Header數據的大小為0byte>3,跳出這個循環(huán)洼裤,進行下次header的寫入
while (this.byteBufferHeader.hasRemaining()) {
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write header error < 0");
}
}
// 根據offset從master的commitlog文件獲取數據,maser是否有數據
if (null == this.selectMappedBufferResult) {
return !this.byteBufferHeader.hasRemaining();
}
writeSizeZeroTimes = 0;
// Write Body
// header被寫滿溪王,開始寫body腮鞍;header:offset大小+4字節(jié)消息大小莹菱;header寫滿了移国,一定會有message的body,再去小body
if (!this.byteBufferHeader.hasRemaining()) {
while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
if (writeSize > 0) {
writeSizeZeroTimes = 0;
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception("ha master write body error < 0");
}
}
}
// header已經被寫滿 + selectMappedBufferResult里面存儲message的內容的ByteBuffer已經被寫完了道伟,那這次寫數據成功了迹缀。
boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();
// 釋放空間
if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
this.selectMappedBufferResult.release();
this.selectMappedBufferResult = null;
}
return result;
}
Master和Slave同步通知控制
GroupTransferService是用來控制Master是否向Slave同步commitlog數據的。通過WaitNotifyObject來喚醒HAConnection中WriteSocketService向Slave寫commitlog數據蜜徽,同步等待5秒進行判斷是否寫入Slave是否成功裹芝。
Master和Slave會進行通信,Master寫message到內存ByteBuffer娜汁,然后調用handleHA()方法嫂易,然后構造一個同步請求放入GroupTransferService#requestsWrite的隊列里,等待HAConnection#WriteSocketService處理這個請求,然后將commitlog的message數據掐禁,同步到Slave中怜械。
class GroupTransferService extends ServiceThread {
// 用來協調HAConnection中WriteSocketService和ReadSocketService之間的通信的
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
// 寫請求隊列,兩個隊列進行交換
private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>();
// 讀請求隊列傅事,兩個隊列進行交換
private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
// 放入請求到寫隊列
public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
synchronized (this.requestsWrite) {
this.requestsWrite.add(request);
}
// 喚醒處理這個Request的線程缕允,喚醒doWaitTransfer()方法
this.wakeup();
}
// 通知Master的WriteSocketService給Slave傳輸一些數據
public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
// 交換隊列
private void swapRequests() {
List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}
private void doWaitTransfer() {
synchronized (this.requestsRead) {
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
// true,代表這個offset已經被推送過Slave了
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
// request被處理的截止時間蹭越,消息從Master同步到Slave的同步等待時間5秒障本;
long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
// offset沒有被推送過&&now<被處理的截止時間
// 圖中是根據次數進行控制的,現在最新代碼改為了時間判斷
while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
// WaitNotifyObject對象的waitForRunning(),交換讀寫隊列,轉變對象為未被通知的狀態(tài)驾霜,并等待1秒案训,
// 等待WriteSocketService中將數據寫入到Slave中,并更細push2SlaveMaxOffset粪糙,表示已經發(fā)送强霎;具體發(fā)送動作在WriteSocketService中,這里只有一個判斷是否發(fā)送成功蓉冈,然后是等待城舞,等待發(fā)送結果。
this.notifyTransferObject.waitForRunning(1000);
// push2SlaveMaxOffset被更新寞酿,大于request的offset家夺,表示被Slave處理成功。
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}
if (!transferOK) {
log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
// 喚醒等待這個request處理結果的線程伐弹,應答存放這個request的線程秦踪,并返回結果;返回點為HandleHA()方法
req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
this.requestsRead.clear();
}
}
}
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 父類ServiceThread的waitForRunning()方法掸茅,設置hasNotified為false,未被通知柠逞,然后交換寫對隊列和讀隊列昧狮,重置waitPoint為(1),休息200ms板壮,finally設置hasNotified為未被通知逗鸣,交換寫對隊列和讀隊列
this.waitForRunning(10);
//
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info(this.getServiceName() + " service end");
}
@Override
protected void onWaitEnd() {
this.swapRequests();
}
@Override
public String getServiceName() {
return GroupTransferService.class.getSimpleName();
}
}
Message同步入庫
Master和Slave會進行通信,Master寫message到內存ByteBuffer绰精,然后調用handleHA()方法撒璧,然后構造一個同步請求放入GroupTransferService#requestsWrite的隊列里,等待HAConnection#WriteSocketService處理這個請求,然后將commitlog的message數據笨使,同步到Slave中卿樱。
一個Slave到Master的連接,一個HAConnection對象硫椰,一個WriteSocketService對象繁调,一個線程,將這個線程放入waitingThreadTable中靶草,被設置這個線程未被通知的狀態(tài);
service.getWaitNotifyObject().wakeupAll();是喚醒所有等待的Master向Slave寫CommitLog的message線程蹄胰,向Slave同步數據。
/**
* commitlog的高可用奕翔,不同節(jié)點之間的構成commitlog的message復制裕寨,每條消息進行一次方法調用
* @param result 追加消息到ByteBuffer中的返回結果
* @param putMessageResult 放入ByteBuffer這個過程的結果(存放消息的結果)
* @param messageExt 需要存放的消息
*/
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
// 是同步Master的角色
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
// HA服務
HAService service = this.defaultMessageStore.getHaService();
// 是否等待消息落盤完畢
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
// 推送這條消息,Slave是否可以接受這條消息推送
if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
// 構建Master到Slave的同步請求
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
// 放入請求,并喚醒處理這個request的服務:HAService#GroupTransferService的doWaitTransfer()方法宾袜,處理這個request捻艳。
service.putRequest(request);
// HAService#WaitNotifyObject對象
// 一個Slave到Master的連接,一個HAConnection對象试和,一個WriteSocketService對象讯泣,一個線程,將這個線程放入waitingThreadTable中阅悍,被設置這個線程未被通知的狀態(tài);
// 這里的作用是喚醒所有等待的Master向Slave寫CommitLog的message線程好渠,向Slave同步數據。
service.getWaitNotifyObject().wakeupAll();
PutMessageStatus replicaStatus = null;
try {
// 同步等待寫入Slave的commitlog消息返回結果节视,超時等待5秒
replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
}
// 同步失敗拳锚,記錄log
if (replicaStatus != PutMessageStatus.PUT_OK) {
log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
}
// Slave problem
// 存放Slave不可用結果,并返回
else {
// Tell the producer, slave not available
putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
}
線程通知關鍵
- GroupTransferService#WaitNotifyObject#notifyTransferObject
HAConnection的WriteSocketService和ReadSocketService都沒有使用ThreadService的通知和等待系統(tǒng)寻行,WriteSocketService和ReadSocketService通信采用了GroupTransferService#WaitNotifyObject#notifyTransferObject這個協調兩個線程之間的通信霍掺。
- HAService#WaitNotifyObject#waitNotifyObject喚醒
HAService#WaitNotifyObject#waitNotifyObject的喚醒
是用來通知所有的HAConnection的WriteSocketService中Master向Slave寫CommitLog數據的,
調用的地方CommitLog#handleHA()#service.getWaitNotifyObject().wakeupAll()拌蜘。
- HAService#WaitNotifyObject#waitNotifyObject等待
HAService#WaitNotifyObject#waitNotifyObject等待
是用來停止所有的HAConnection的WriteSocketService中Master向Slave寫CommitLog數據的杆烁,
調用的地方是在:HAConnection的#WriteSocketService#HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
- GroupTransferService#ThredService父類
GroupTransferService的ServiceThread的喚醒和等待是用來處理本線程讀隊列和寫隊列之間處理同步數據請求的,和同步刷盤服務GroupCommitService一樣的效果简卧。