TM、RM和TC之間如何通信壤巷。(源碼持續(xù)更新邑彪,本文僅供參考)
- Fescar源碼閱讀-解決分布式事務(wù)的利器
- Fescar源碼閱讀-RPC和消息
- Fescar源碼閱讀-全自動(dòng)的分布式事務(wù)AT
- Fescar源碼閱讀-神奇的UndoLog(一)
Fescar處理分布式事務(wù),本身也是分布式系統(tǒng)胧华。其中TM和RM可理解為客戶端Client寄症,TC事務(wù)協(xié)調(diào)器作為服務(wù)端Server。
Fescar中兩者之間的通信矩动,毫不意外是通過(guò)Netty實(shí)現(xiàn)的有巧。從以理解Fescar作為目標(biāo)這一角度來(lái)說(shuō),單純的網(wǎng)絡(luò)通信的模塊悲没,其實(shí)沒(méi)有必要去刨根究底篮迎,關(guān)注著一細(xì)節(jié)。
但是目前Fescar的實(shí)現(xiàn)中,請(qǐng)求甜橱、響應(yīng)逊笆、編碼解碼、消息處理等等聯(lián)系的比較緊密(耦合了岂傲。难裆。),如果想更好的跟蹤譬胎、理解Fescar是如何處理不同類(lèi)型的事務(wù)消息的,還是有必要把Fescar的rpc實(shí)現(xiàn)梳理一下命锄。
消息
Fescar為每一種交互都定義了一個(gè)消息類(lèi)(request和response為對(duì)應(yīng)一組)堰乔,為了容易理解,此圖只包含Request脐恩,镐侯,從上圖可以看出,
消息主要分三大類(lèi)
- AbstractTransactionRequestToTC
RM/TM發(fā)送到TC的消息 - AbstractTransactionRequestToRM
TC發(fā)送到TM的消息 - AbstractIdentifyRequest
TM/RM和TC連接時(shí)的注冊(cè)消息
另外一個(gè)MergedWarpMessage
是一個(gè)消息包裝類(lèi)驶冒,內(nèi)部包含多個(gè)消息苟翻,用于消息的異步批量發(fā)送。
RPC核心類(lèi)
核心類(lèi)圖如下:
主要實(shí)現(xiàn)類(lèi):
- TC實(shí)現(xiàn)
RpcServer extends AbstractRpcRemotingServer
- TM實(shí)現(xiàn)
TmRpcClient extends AbstractRpcRemotingClient
- RM實(shí)現(xiàn)
RMRpcClient extends AbstractRpcRemotingClient
他們擁有共同父類(lèi)AbstractRpcRemoting
。
AbstractRpcRemoting提供了基本的消息發(fā)送和消息處理實(shí)現(xiàn)需忿。
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof RpcMessage) {
final RpcMessage rpcMessage = (RpcMessage)msg;
if (rpcMessage.isRequest()) {
try {
AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
@Override
public void run() {
try {
dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.errCode, th.getMessage(), th);
}
}
});
} catch (RejectedExecutionException e) {
//...
}
} else {
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
messageFuture.setResultMessage(rpcMessage.getBody());
} else {
try {
AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
@Override
public void run() {
try {
dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.errCode, th.getMessage(), th);
}
}
});
} catch (RejectedExecutionException e) {
//...
}
}
}
}
}
public abstract void dispatch(long msgId, ChannelHandlerContext ctx, Object msg);
可以看到RpcMessage
就是Fescar系統(tǒng)間交互的信息載體诅炉,所有的消息都是在此進(jìn)行處理(RpcServer
中Overide
了此方法,對(duì)一些特殊消息進(jìn)行了提前處理屋厘,但最終還是調(diào)用此處的方法)涕烧。
所有的消息都是異步消費(fèi),交由dispatch
方法處理汗洒。
TC處理消息
RpcServer
的dispatch實(shí)現(xiàn)议纯,所有消息都委托給ServerMessageListener
處理。
// RpcServer
@Override
public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
if (msg instanceof RegisterRMRequest) {
serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this,
checkAuthHandler);
} else {
if (ChannelManager.isRegistered(ctx.channel())) {
serverMessageListener.onTrxMessage(msgId, ctx, msg, this);
} else {
closeChannelHandlerContext(ctx);
}
}
}
- 注冊(cè)類(lèi)消息
對(duì)注冊(cè)TM和注冊(cè)RM的消息的處理主要是將channel和請(qǐng)求綁定(IP溢谤,PORT瞻凤,ResourceId等等),綁定關(guān)系使用ChannelManager
和RpcContext
管理世杀。最終ChannelManager
將TM和RM的Channel緩存在兩個(gè)很復(fù)雜的map中鲫构。
具體綁定過(guò)程比較復(fù)雜(有點(diǎn)亂。玫坛。)先忽略
// resourceId -> applicationId -> ip -> port -> RpcContext
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<>();
//ip+appname,port
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS = new ConcurrentHashMap<>();
- 事務(wù)消息
@Override
public void onTrxMessage(long msgId, ChannelHandlerContext ctx, Object message, ServerMessageSender sender) {
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (message instanceof MergedWarpMessage) {
AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage)message).msgs.size()];
for (int i = 0; i < results.length; i++) {
final AbstractMessage subMessage = ((MergedWarpMessage)message).msgs.get(i);
results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
}
MergeResultMessage resultMessage = new MergeResultMessage();
resultMessage.setMsgs(results);
sender.sendResponse(msgId, ctx.channel(), resultMessage);
} else if (message instanceof AbstractResultMessage) {
transactionMessageHandler.onResponse((AbstractResultMessage)message, rpcContext);
}
}
層層委托后结笨,所有消息最終委托給DefaultCoordinator
和DefaultCore
處理,具體邏輯不在此文分析。
TM處理消息
準(zhǔn)確的說(shuō)TM主要是發(fā)送消息到TC(register炕吸、commit和rollback等)伐憾,對(duì)于入站消息的處理沒(méi)有特殊實(shí)現(xiàn)。
RM處理消息
RM除了需要發(fā)送消息到TC外赫模,需要處理TC的branchCommit和branchRollback消息树肃,參見(jiàn)RmMessageListener
@Override
public void onMessage(long msgId, String serverAddress, Object msg, ClientMessageSender sender) {
if (msg instanceof BranchCommitRequest) {
handleBranchCommit(msgId, serverAddress, (BranchCommitRequest)msg, sender);
} else if (msg instanceof BranchRollbackRequest) {
handleBranchRollback(msgId, serverAddress, (BranchRollbackRequest)msg, sender);
}
}
合并消息
發(fā)送消息時(shí),為了提高吞吐量瀑罗,F(xiàn)escar通過(guò)MergedSendRunnable
來(lái)合并消息胸嘴,批量異步發(fā)送;合并后的Request消息為MergedWarpMessage
啟動(dòng)Fescar
最后提一下Fescar的啟動(dòng)方式斩祭。
public interface RemotingService {
void start();
void shutdown();
}
RemotingService
定義了TM劣像、RM、TC的啟動(dòng)和關(guān)閉摧玫;不過(guò)實(shí)際的實(shí)現(xiàn)中耳奕,F(xiàn)escar是通過(guò)AbstractRpcRemoting
的init()方法去完成服務(wù)的初始化,各種任務(wù)線程池的啟動(dòng),以及服務(wù)長(zhǎng)連接的建立诬像。
以RmCpcCLient
為例:
//RmRpcClient
@Override
public void init() {
if (initialized.compareAndSet(false, true)) {
super.init();
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
reconnect(); //讀取配置屋群,獲取TC地址,連接TC
}
}, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
ExecutorService mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory(getThreadPrefix(MERGE_THREAD_PREFIX), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable()); //啟動(dòng)消息合并發(fā)送的任務(wù)
}
}
OK坏挠,對(duì)Fescar的RPC和消息模塊大致有了總體認(rèn)識(shí)了芍躏,后面再去看看Fescar是具體是怎么流程化去處理各種事務(wù)消息的。