In fescar source analysis - AbstractRpcRemoting we looked at how fescar encapsulates handling responses and sending requests. This article analyzes its subclass AbstractRpcRemotingClient, which builds on the parent class to further encapsulate the client side of RPC calls.
Constructor
```java
public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
                                 final ThreadPoolExecutor messageExecutor) {
    super(messageExecutor);
    if (null == nettyClientConfig) {
        nettyClientConfig = new NettyClientConfig();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("use default netty client config.");
        }
    }
    this.nettyClientConfig = nettyClientConfig;//1
    int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
    this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(
        getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));//2
    this.defaultEventExecutorGroup = eventExecutorGroup;
}
```
- 1. Initialize the RPC client configuration from the config file, falling back to a default NettyClientConfig when none is given.
- 2. Create the EventLoopGroup with the thread count and thread-name prefix taken from the configuration.
- 3. Set the EventExecutorGroup to the eventExecutorGroup object created by the subclass.

The EventLoopGroup property is mainly used to handle the accept, read and write operations of NIO, while the EventExecutorGroup is used to execute the Handlers, as shown in the figure below:
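This division of labor can be sketched with plain JDK executors (a hypothetical analogy, not fescar or Netty code): `ioLoop` plays the role of the EventLoopGroup that performs accept/read/write, while `handlerPool` plays the role of the EventExecutorGroup that runs the ChannelHandlers, so a slow handler never stalls the I/O thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative analogy only: plain executors standing in for Netty's
// EventLoopGroup (I/O) and EventExecutorGroup (handler execution).
public class LoopVsHandlerDemo {
    static final ExecutorService ioLoop = Executors.newSingleThreadExecutor();  // ~ EventLoopGroup
    static final ExecutorService handlerPool = Executors.newFixedThreadPool(2); // ~ EventExecutorGroup

    // The "I/O thread" receives a message and immediately hands it off to the
    // handler pool, so further reads are never blocked by handler work.
    static String readAndDispatch(String msg) throws Exception {
        Future<Future<String>> pending =
            ioLoop.submit(() -> handlerPool.submit(() -> "handled:" + msg));
        return pending.get().get();
    }

    static String demo() throws Exception {
        String result = readAndDispatch("PING");
        ioLoop.shutdown();
        handlerPool.shutdown();
        return result;
    }
}
```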
Overriding parent class methods

Overriding the init() method

First, AbstractRpcRemotingClient overrides AbstractRpcRemoting's init() method (which initializes a thread pool that cleans up timed-out cached data of asynchronous RPC requests), adding some new processing:
```java
@Override
public void init() {
    NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
    nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
    nettyClientKeyPool.setConfig(getNettyPoolConfig());//1
    serviceManager = new ServiceManagerStaticConfigImpl();//2
    super.init();
}
```
- 1. Sets the nettyClientKeyPool property of the AbstractRpcRemoting class, which is of type GenericKeyedObjectPool.

GenericKeyedObjectPool provides robust pooling functionality for keyed objects.
A GenericKeyedObjectPool can be viewed as a map of pools, keyed on the (unique) key values provided to the preparePool, addObject or borrowObject methods.
Each time a new key value is provided to one of these methods, a new pool is created under the given key to be managed by the containing GenericKeyedObjectPool.

- 2. Sets the serviceManager property of the AbstractRpcRemoting class, which is of type ServiceManager.
This property is presumably intended for the future service registry and discovery feature, but it is not implemented yet.

ServiceManager: Service Registry and Discovery
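The "map of pools" behavior quoted above can be illustrated with a tiny keyed pool. This is only a sketch of the idea, not commons-pool's actual implementation: the real GenericKeyedObjectPool additionally enforces size limits, eviction and blocking borrows, and in fescar's case the key is a server address while NettyPoolableFactory plays the factory role.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative sketch of the "map of pools" idea only.
public class TinyKeyedPool<K, V> {
    private final Map<K, Deque<V>> pools = new HashMap<>();
    private final Function<K, V> factory; // creates a new object for a key

    public TinyKeyedPool(Function<K, V> factory) {
        this.factory = factory;
    }

    // A sub-pool is created lazily the first time a key is seen.
    public synchronized V borrowObject(K key) {
        Deque<V> pool = pools.computeIfAbsent(key, k -> new ArrayDeque<>());
        return pool.isEmpty() ? factory.apply(key) : pool.pop();
    }

    // Returned objects go back to the sub-pool of their key for reuse.
    public synchronized void returnObject(K key, V obj) {
        pools.computeIfAbsent(key, k -> new ArrayDeque<>()).push(obj);
    }

    public synchronized int idleCount(K key) {
        Deque<V> pool = pools.get(key);
        return pool == null ? 0 : pool.size();
    }
}
```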
Overriding the dispatch() method

dispatch() is an abstract method in the parent class; the concrete logic is left to subclasses. AbstractRpcRemotingClient handles each message through the registered ClientMessageListener event listener.
```java
protected ClientMessageListener clientMessageListener;
......
@Override
public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
    if (clientMessageListener != null) {
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        clientMessageListener.onMessage(msgId, remoteAddress, msg, this);
    }
}
```
Overriding the channelRead() method

Before running the parent class's channelRead() logic, it first checks whether the received message is a heartbeat reply; if so, the remaining processing is skipped.
```java
if (rpcMessage.getBody() == HeartbeatMessage.PONG) {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("received PONG from " + ctx.channel().remoteAddress());
    }
    return;
}
```
If it is a merged reply message, the replies are processed as a batch.
```java
if (((RpcMessage) msg).getBody() instanceof MergeResultMessage) {
    MergeResultMessage results = (MergeResultMessage) ((RpcMessage) msg).getBody();
    MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(((RpcMessage) msg).getId());
    int num = mergeMessage.msgs.size();
    for (int i = 0; i < num; i++) {
        long msgId = mergeMessage.msgIds.get(i);
        MessageFuture future = futures.remove(msgId);
        if (future == null) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("msg:" + msgId + " is not found in futures.");
            }
        } else {
            future.setResultMessage(results.getMsgs()[i]);
        }
    }
    return;
}
```
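The pattern in the batch handling above, recording a future per msgId when the batch is sent and then completing each future from the merged reply, can be sketched with CompletableFuture. This is a simplified stand-in: String results replace fescar's result message types, and the class and method names here are illustrative, not fescar's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of fanning a merged reply back out to per-message futures.
public class MergeReplyDemo {
    // msgId -> future the caller is blocked on (fescar uses MessageFuture)
    static final Map<Long, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // When the merged reply arrives, the i-th result belongs to the i-th
    // msgId recorded at send time; complete that caller's future with it.
    static void onMergedReply(List<Long> msgIds, String[] results) {
        for (int i = 0; i < msgIds.size(); i++) {
            CompletableFuture<String> future = futures.remove(msgIds.get(i));
            if (future != null) {
                future.complete(results[i]);
            } // else: the caller already timed out and gave up; drop the result
        }
    }

    public static void main(String[] args) throws Exception {
        futures.put(1L, new CompletableFuture<>());
        futures.put(2L, new CompletableFuture<>());
        CompletableFuture<String> f1 = futures.get(1L);
        CompletableFuture<String> f2 = futures.get(2L);
        onMergedReply(Arrays.asList(1L, 2L), new String[] {"OK-1", "OK-2"});
        System.out.println(f1.get() + " " + f2.get()); // OK-1 OK-2
    }
}
```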
Besides overriding methods of the parent class, this class mainly implements three interfaces: RemotingService, RegisterMsgListener and ClientMessageSender, as shown in the figure below:

The following sections describe each interface and how AbstractRpcRemotingClient implements it:
Implementing the RemotingService interface

This interface covers starting and stopping the RPC client. The AbstractRpcRemotingClient class does this mainly by operating on its internal Bootstrap property (as covered in "Netty: bootstrapping clients and connectionless protocols", Bootstrap is Netty's bootstrap class for creating clients).

start()

Implements startup of the RPC client.
```java
private final Bootstrap bootstrap = new Bootstrap();
```

Bootstrap mainly needs the following properties to be set:

* EventLoopGroup
* Channel
* ChannelHandler
* and some option parameters

To make these parameters configurable, AbstractRpcRemotingClient uses the NettyClientConfig class, and it sets the Bootstrap properties as follows:
->EventLoopGroup

AbstractRpcRemotingClient specifies a NioEventLoopGroup instance as the EventLoopGroup at construction time.

```java
this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(
    getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));
```
->Channel

The Channel implementation type is specified by a property of the nettyClientConfig object.

```java
.channel(nettyClientConfig.getClientChannelClazz())//
```
->ChannelHandler

A configuration property first decides whether a connection pool is used. If so, a FixedChannelPool object is created for each remote address; as the name suggests, it is a pool of channels.
```java
if (nettyClientConfig.isUseConnPool()) {
    clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {//0
        @Override
        protected FixedChannelPool newPool(InetSocketAddress key) {
            FixedChannelPool fixedClientChannelPool = new FixedChannelPool(bootstrap.remoteAddress(key),
                new DefaultChannelPoolHandler() {
                    @Override
                    public void channelCreated(Channel ch) throws Exception {
                        super.channelCreated(ch);
                        final ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(defaultEventExecutorGroup,//1
                            new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                nettyClientConfig.getChannelMaxAllIdleSeconds()));//2.1
                        pipeline.addLast(defaultEventExecutorGroup, new RpcClientHandler());//2.2
                    }
                }, ChannelHealthChecker.ACTIVE, AcquireTimeoutAction.FAIL,
                nettyClientConfig.getMaxAcquireConnMills(), nettyClientConfig.getPerHostMaxConn(),
                nettyClientConfig.getPendingConnSize(), false);
            return fixedClientChannelPool;
        }
    };
}
```
- 0. Extends the AbstractChannelPoolMap class and implements the newPool(InetSocketAddress key) method to create a FixedChannelPool object.
- 1. Channels in the pool use the created DefaultEventExecutorGroup object to execute their ChannelHandlers.
```java
if (this.defaultEventExecutorGroup == null) {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
        new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
            nettyClientConfig.getClientWorkerThreads()));
}
```
- 2. The ChannelPipeline of each pooled Channel is configured with two ChannelHandlers: IdleStateHandler and RpcClientHandler.

(Note: RpcClientHandler is a fescar-defined ChannelHandler, but judging from the code, connection-pool support on the RPC client is not finished yet. The channel-pool related code will not be analyzed further below.)

If the connection pool is not used, two ChannelHandlers are set on the Bootstrap object instead: IdleStateHandler and MessageCodecHandler.
```java
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
            nettyClientConfig.getChannelMaxAllIdleSeconds()))//
            .addLast(new MessageCodecHandler());// add the message encoder/decoder
        if (null != channelHandlers) {
            addChannelPipelineLast(ch, channelHandlers);
        }
    }
});
```
shutdown()

Closes the related resources when the RPC client shuts down.
```java
public void shutdown() {
    try {
        if (null != clientChannelPool) {
            clientChannelPool.close();//1 close the channel pool
        }
        this.eventLoopGroupWorker.shutdownGracefully();//2 shut down the EventLoopGroup used by the channels
        if (this.defaultEventExecutorGroup != null) {
            this.defaultEventExecutorGroup.shutdownGracefully();//3 shut down the EventExecutorGroup that runs the handlers
        }
        super.destroy();
    } catch (Exception exx) {
        LOGGER.error("shutdown error:" + exx.getMessage());
    }
}
```
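The ordering in shutdown(), closing the channel pool first, then the I/O threads, then the handler threads, can be mimicked with plain JDK executors. ShutdownDemo and shutdownInOrder are illustrative names, not fescar's API; the point is only the dependency order of the teardown.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: shut down resources in dependency order.
public class ShutdownDemo {
    static String shutdownInOrder(ExecutorService ioGroup, ExecutorService handlerGroup) throws InterruptedException {
        StringBuilder order = new StringBuilder();
        // 1. the channel pool would be closed first (omitted in this sketch)
        // 2. stop accepting new I/O work and wait for in-flight tasks
        ioGroup.shutdown();
        if (ioGroup.awaitTermination(5, TimeUnit.SECONDS)) {
            order.append("io;");
        }
        // 3. only then stop the executor that runs the handlers
        handlerGroup.shutdown();
        if (handlerGroup.awaitTermination(5, TimeUnit.SECONDS)) {
            order.append("handlers");
        }
        return order.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(shutdownInOrder(
            Executors.newSingleThreadExecutor(),
            Executors.newSingleThreadExecutor())); // io;handlers
    }
}
```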
RegisterMsgListener

This interface is mainly used to trigger callback methods based on whether the registration RPC call succeeded or failed, judged from the returned response, so that follow-up business logic can run:
NettyPoolableFactory.java

```java
......
try {
    response = rpcRemotingClient.sendAsyncRequestWithResponse(null, tmpChannel, key.getMessage());
    if (!isResponseSuccess(response, key.getTransactionRole())) {
        rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
    } else {
        channelToServer = tmpChannel;
        rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
            key.getMessage());
    }
} catch (Exception exx) {
......
```
AbstractRpcRemotingClient does not implement the RegisterMsgListener interface itself; that is left to its subclasses.

ClientMessageSender

This interface defines the logic of sending a request and returning its result.
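A send-and-return-result contract like this is typically built as an asynchronous send plus a bounded wait on a future. The sketch below shows only that idea under stated assumptions: sendAsync/sendSync and the echo "server" are hypothetical, and fescar's actual ClientMessageSender methods additionally deal with Channels and timeout parameters.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: a synchronous send is an async send plus a bounded wait.
public class SyncOverAsyncDemo {
    // Pretend remote call: the "server" echoes the request on another thread.
    static CompletableFuture<String> sendAsync(String request) {
        return CompletableFuture.supplyAsync(() -> "echo:" + request);
    }

    // Synchronous variant: block the caller until the reply future completes
    // or the timeout elapses (a TimeoutException would surface to the caller).
    static String sendSync(String request, long timeoutMillis) throws Exception {
        return sendAsync(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("BranchCommit", 2000)); // echo:BranchCommit
    }
}
```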