RpcServer server = new RpcServer(8888); server.registerUserProcessor(new MyServerUserProcessor()); server.start();
一、代碼執(zhí)行流程梯形圖
<!-- 一、創(chuàng)建 RpcServer -->
new RpcServer(port)
-->EventLoopGroup workerGroup
-->EventLoopGroup bossGroup
-->new RpcCodec
-->new ConnectionEventListener()
-->new ConcurrentHashMap<String, UserProcessor<?>> userProcessors
<!-- 二琐谤、添加用戶自定義業(yè)務(wù)邏輯處理器 -->
RpcServer.registerUserProcessor(UserProcessor<?> processor)
<!-- 三、初始化并啟動 RpcServer 實例 -->
AbstractRemotingServer.start()
-->RpcServer.doInit()
<!-- 3.1 關(guān)聯(lián)連接事件處理器與連接事件監(jiān)聽器,并由連接事件處理器中的連接執(zhí)行器來執(zhí)行連接監(jiān)聽器中的 processor -->
-->new ConnectionEventHandler(GlobalSwitch globalSwitch) // 連接事件處理器
-->ConnectionEventHandler.setConnectionEventListener(ConnectionEventListener listener) // 關(guān)聯(lián)連接事件處理器與連接事件監(jiān)聽器
-->new ConnectionEventExecutor() // 創(chuàng)建連接事件執(zhí)行器(后續(xù) ConnectionEventListener 中的 ConnectionEventProcessor 的執(zhí)行都由該線程池來完成)
<!-- 3.2 創(chuàng)建真正的請求執(zhí)行客戶端(發(fā)起調(diào)用類) -->
-->initRpcRemoting()
<!-- 3.2.1 創(chuàng)建兩種協(xié)議實例 + 注冊到 RpcProtocolManager -->
-->RpcProtocolManager.initProtocols()
-->new RpcProtocol()
-->this.encoder = new RpcCommandEncoder(); // 真正的最底層的編碼器
-->this.decoder = new RpcCommandDecoder(); // 真正的最底層的解碼器
-->this.commandFactory = new RpcCommandFactory(); // 創(chuàng)建請求信息和返回信息包裝體的工廠
-->this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory); // 最底層的連接心跳處理器
-->this.commandHandler = new RpcCommandHandler(this.commandFactory);
-->this.processorManager = new ProcessorManager() // 創(chuàng)建 processor 管理器
-->Map<CommandCode, RemotingProcessor<?>> cmd2processors
-->ExecutorService defaultExecutor
-->this.processorManager.registerProcessor(RpcCommandCode.RPC_REQUEST, new RpcRequestProcessor(this.commandFactory))
-->this.processorManager.registerProcessor(RpcCommandCode.RPC_RESPONSE, new RpcResponseProcessor())
-->this.processorManager.registerProcessor(CommonCommandCode.HEARTBEAT, new RpcHeartBeatProcessor());
-->this.processorManager.registerDefaultProcessor(直接 logger.error)
-->RpcProtocolManager.registerProtocol(Protocol protocol, byte... protocolCodeBytes) // 將 RpcProtocol 實例添加到 RpcProtocolManager 的 Map<ProtocolCode, Protocol> protocols 中
-->new RpcProtocol2()
-->this.encoder = new RpcCommandEncoderV2();
-->this.decoder = new RpcCommandDecoderV2();
-->this.commandFactory = new RpcCommandFactory();
-->this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
-->this.commandHandler = new RpcCommandHandler(this.commandFactory);
-->RpcProtocolManager.registerProtocol(Protocol protocol, byte... protocolCodeBytes) // 將 RpcProtocolV2 實例添加到 RpcProtocolManager 的 Map<ProtocolCode, Protocol> protocols 中
<!-- 3.2.2 創(chuàng)建請求信息和返回信息包裝體的工廠 -->
-->new RpcCommandFactory()
<!-- 3.2.3 創(chuàng)建 RpcServerRemoting (發(fā)起底層調(diào)用實現(xiàn)類) -->
-->new RpcServerRemoting(CommandFactory commandFactory, RemotingAddressParser addressParser, DefaultConnectionManager connectionManager)
<!-- 3.3 配置 netty 服務(wù)端 -->
-->new ServerBootstrap() 并做一系列配置
// netty 業(yè)務(wù)邏輯處理器
-->new RpcHandler(boolean serverSide, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) // {"com.alipay.remoting.mydemo.MyRequest" : MyServerUserProcessor}
-->netty 連接 channel 創(chuàng)建成功后锻弓,將 channel 包裝為 Connection 對象
-->RpcServer.doStart()
-->this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync()
總結(jié):
關(guān)于連接 Connection 相關(guān)的辣辫,放在《Connection 連接設(shè)計》章節(jié)分析,此處跳過忧便;
關(guān)于心跳 HeartBeat 相關(guān)的族吻,放在《HeartBeat 心跳設(shè)計》章節(jié)分析,此處跳過珠增。
- 創(chuàng)建 RpcServer 實例
- 創(chuàng)建 workerGroup(static類變量超歌,實現(xiàn)多個 RpcServer 實例共享 workerGroup)與 bossGroup
- 創(chuàng)建 Codec 的實現(xiàn)類 RpcCodec 實例,用于創(chuàng)建 netty 的編解碼器蒂教,實質(zhì)上是一個工廠類
- 創(chuàng)建用戶處理器 UserProcessor 實現(xiàn)類容器
Map<String, UserProcessor<?>> userProcessors
- 添加 UserProcessor 實現(xiàn)類 到 userProcessors
{ "感興趣的請求數(shù)據(jù)類型" :UserProcessor實現(xiàn)類 }
eg. key = "com.alipay.remoting.mydemo.MyRequest"巍举,value = MyServerUserProcessor 實例
- 初始化并啟動 RpcServer 實例
- 初始化 RpcServer
- 創(chuàng)建兩種協(xié)議 RpcProtocol 和 RpcProtocolV2 實例,添加到 RpcProtocolManager 的
Map<ProtocolCode, Protocol> protocols
協(xié)議容器中凝垛;每一種協(xié)議都有以下5個屬性:CommandEncoder:編碼器實例
CommandDecoder:解碼器實例
HeartbeatTrigger:心跳觸發(fā)器實例
CommandFactory:創(chuàng)建 Remote 層請求和響應(yīng)封裝實體的創(chuàng)建工廠實例
CommandHandler:Remote 層的消息處理器懊悯,包含一個 ProcessorManager 實例, 其包含ConcurrentHashMap<CommandCode, RemotingProcessor<?>> cmd2processors
容器梦皮,存儲了三個 Processor 處理器請求消息處理器:{ RpcCommandCode.RPC_REQUEST : RpcRequestProcessor 實例 }
響應(yīng)消息處理器:{ RpcCommandCode.RPC_RESPONSE : RpcResponseProcessor 實例 }
心跳消息處理器(心跳發(fā)送與心跳響應(yīng)消息):{ CommonCommandCode.HEARTBEAT : RpcHeartBeatProcessor 實例 }
默認(rèn)處理器:AbstractRemotingProcessor 匿名內(nèi)部類定枷,直接 logger.error。該處理器用于婁底届氢,當(dāng)要處理的消息不是上述三種的任何一個欠窒,則使用該處理器
- 創(chuàng)建 Remote 層請求和響應(yīng)封裝實體的創(chuàng)建工廠 RpcCommandFactory 實例
- 創(chuàng)建 RpcServerRemoting (發(fā)起底層調(diào)用實現(xiàn)類) 實例
SOFABolt 可以進(jìn)行雙向調(diào)用,server 端也可以調(diào)用 client 端退子,所以此處構(gòu)建了 RpcServerRemoting 實例
- 創(chuàng)建 ServerBootstrap 實例并設(shè)置一系列 netty 服務(wù)端配置
- 創(chuàng)建 RpcHandler 實例作為 netty 的業(yè)務(wù)邏輯處理器岖妄,之后會有一個 Processor 處理鏈進(jìn)行處理
請求鏈
RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
響應(yīng)鏈RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
心跳鏈RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor
- 啟動 RpcServer
this.bootstrap.bind
二、從 netty 的角度看執(zhí)行鏈
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", codec.newDecoder());
pipeline.addLast("encoder", codec.newEncoder());
if (idleSwitch) {
pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
TimeUnit.MILLISECONDS));
pipeline.addLast("serverIdleHandler", serverIdleHandler);
}
pipeline.addLast("connectionEventHandler", connectionEventHandler);
pipeline.addLast("handler", rpcHandler);
createConnection(channel);
}
}
跳過心跳 HeartBeat 邏輯寂祥、跳過連接 Connection 邏輯荐虐,我們只看編解碼器和業(yè)務(wù)處理器邏輯。
這里大概給出編解碼鏈丸凭,真正的編解碼過程在《Codec 編解碼設(shè)計》的時候再詳細(xì)分析福扬。
這里大概給出調(diào)用鏈,真正的調(diào)用處理過程在分析四種調(diào)用模式設(shè)計的時候再詳細(xì)分析惜犀。
2.1 編解碼器 RpcCodec
編碼鏈
ProtocolCodeBasedEncoder -> CommandEncoder
解碼鏈RpcProtocolDecoder -> CommandDecoder
-------------------------------- netty 編解碼器 構(gòu)造工廠 ------------------------------------------
public class RpcCodec implements Codec {
@Override
public ChannelHandler newEncoder() {
return new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE));
}
@Override
public ChannelHandler newDecoder() {
return new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH);
}
}
--------------------------- netty 編碼器:CommandEncoder的包裝類 -----------------------------------
public class ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable> {
...
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out)
throws Exception {
Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
...
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 使用protocol的編碼器進(jìn)行編碼
protocol.getEncoder().encode(ctx, msg, out);
}
}
--------------------------- netty 解碼器:CommandDecoder的包裝類 -----------------------------------
public class RpcProtocolDecoder extends ProtocolCodeBasedDecoder {
...
@Override
protected byte decodeProtocolVersion(ByteBuf in) {
...
}
}
public class ProtocolCodeBasedDecoder extends AbstractBatchDecoder {
...
protected byte decodeProtocolVersion(ByteBuf in) {
...
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.markReaderIndex();
ProtocolCode protocolCode = decodeProtocolCode(in);
if (null != protocolCode) {
byte protocolVersion = decodeProtocolVersion(in);
...
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
if (null != protocol) {
in.resetReaderIndex();
// 使用protocol的解碼器進(jìn)行解碼
protocol.getDecoder().decode(ctx, in, out);
} else {
throw new CodecException("Unknown protocol code: [" + protocolCode
+ "] while decode in ProtocolDecoder.");
}
}
}
}
2.2 業(yè)務(wù)處理器 RpcHandler
請求鏈
RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
響應(yīng)鏈RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
心跳鏈RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor
----------------- RpcHandler.channelRead(ChannelHandlerContext ctx, Object msg) -------------
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 每一個請求都會在連接上添加 ProtocolCode 屬性
ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
// 使用 ProtocolCode 獲取 Protocol
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 使用 Protocol 獲取其 RpcCommandHandler 實例铛碑,使用 RpcCommandHandler 實例進(jìn)行消息處理
protocol.getCommandHandler().handleCommand(
new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
}
三、RpcServer 類結(jié)構(gòu)圖
其中虽界,AbstractConfigurableInstance 是可配置實例抽象類汽烦,包含配置容器和全局開關(guān)(在《Config 配置設(shè)計》中進(jìn)行分析);
AbstractRemotingServer 使用
模板模式
實現(xiàn)了 父類 RemotingServer 的方法莉御,并提供了三個 doXxx() 方法由子類來覆蓋撇吞;
RpcServer 中前6個屬性除了 userProcessors 都是 netty 相關(guān)俗冻;后三個與 Connection 相關(guān);RpcRemoting 是調(diào)用客戶端的工具類牍颈。
注意:在 RemotingServer 中有void registerDefaultExecutor(byte protocolCode, ExecutorService executor)
迄薄,在《線程池設(shè)計》部分進(jìn)行分析。