SOFABolt 源碼分析2 - RpcServer 服務(wù)端啟動的設(shè)計

 RpcServer server = new RpcServer(8888);
 server.registerUserProcessor(new MyServerUserProcessor());
 server.start();

一、代碼執(zhí)行流程梯形圖

<!-- 一、創(chuàng)建 RpcServer -->
new RpcServer(port)
-->EventLoopGroup workerGroup
-->EventLoopGroup bossGroup
-->new RpcCodec
-->new ConnectionEventListener()
-->new ConcurrentHashMap<String, UserProcessor<?>> userProcessors

<!-- 二琐谤、添加用戶自定義業(yè)務(wù)邏輯處理器 -->
RpcServer.registerUserProcessor(UserProcessor<?> processor)

<!-- 三、初始化并啟動 RpcServer 實例 -->
AbstractRemotingServer.start()
-->RpcServer.doInit()
  <!-- 3.1 關(guān)聯(lián)連接事件處理器與連接事件監(jiān)聽器,并由連接事件處理器中的連接執(zhí)行器來執(zhí)行連接監(jiān)聽器中的 processor -->
  -->new ConnectionEventHandler(GlobalSwitch globalSwitch) // 連接事件處理器
  -->ConnectionEventHandler.setConnectionEventListener(ConnectionEventListener listener) // 關(guān)聯(lián)連接事件處理器與連接事件監(jiān)聽器
  -->new ConnectionEventExecutor() // 創(chuàng)建連接事件執(zhí)行器(后續(xù) ConnectionEventListener 中的 ConnectionEventProcessor 的執(zhí)行都由該線程池來完成)
  <!-- 3.2 創(chuàng)建真正的請求執(zhí)行客戶端(發(fā)起調(diào)用類) -->
  -->initRpcRemoting()
    <!-- 3.2.1 創(chuàng)建兩種協(xié)議實例 + 注冊到 RpcProtocolManager -->
    -->RpcProtocolManager.initProtocols()
      -->new RpcProtocol()
        -->this.encoder = new RpcCommandEncoder(); // 真正的最底層的編碼器
        -->this.decoder = new RpcCommandDecoder(); // 真正的最底層的解碼器
        -->this.commandFactory = new RpcCommandFactory(); // 創(chuàng)建請求信息和返回信息包裝體的工廠
        -->this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory); // 最底層的連接心跳處理器
        -->this.commandHandler = new RpcCommandHandler(this.commandFactory); 
          -->this.processorManager = new ProcessorManager() // 創(chuàng)建 processor 管理器
            -->Map<CommandCode, RemotingProcessor<?>> cmd2processors
            -->ExecutorService defaultExecutor
          -->this.processorManager.registerProcessor(RpcCommandCode.RPC_REQUEST, new RpcRequestProcessor(this.commandFactory))
          -->this.processorManager.registerProcessor(RpcCommandCode.RPC_RESPONSE, new RpcResponseProcessor())
          -->this.processorManager.registerProcessor(CommonCommandCode.HEARTBEAT, new RpcHeartBeatProcessor());
          -->this.processorManager.registerDefaultProcessor(直接 logger.error)
      -->RpcProtocolManager.registerProtocol(Protocol protocol, byte... protocolCodeBytes) // 將 RpcProtocol 實例添加到 RpcProtocolManager 的 Map<ProtocolCode, Protocol> protocols 中
      -->new RpcProtocol2()
        -->this.encoder = new RpcCommandEncoderV2();
        -->this.decoder = new RpcCommandDecoderV2();
        -->this.commandFactory = new RpcCommandFactory();
        -->this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
        -->this.commandHandler = new RpcCommandHandler(this.commandFactory);
      -->RpcProtocolManager.registerProtocol(Protocol protocol, byte... protocolCodeBytes) // 將 RpcProtocolV2 實例添加到 RpcProtocolManager 的 Map<ProtocolCode, Protocol> protocols 中
    <!-- 3.2.2 創(chuàng)建請求信息和返回信息包裝體的工廠 -->
    -->new RpcCommandFactory()
    <!-- 3.2.3 創(chuàng)建 RpcServerRemoting (發(fā)起底層調(diào)用實現(xiàn)類) -->
    -->new RpcServerRemoting(CommandFactory commandFactory, RemotingAddressParser addressParser, DefaultConnectionManager connectionManager)
  <!-- 3.3 配置 netty 服務(wù)端 -->
  -->new ServerBootstrap() 并做一系列配置
  // netty 業(yè)務(wù)邏輯處理器
  -->new RpcHandler(boolean serverSide, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) // {"com.alipay.remoting.mydemo.MyRequest" : MyServerUserProcessor}
  -->netty 連接 channel 創(chuàng)建成功后锻弓,將 channel 包裝為 Connection 對象
-->RpcServer.doStart()
  -->this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync()

總結(jié):

關(guān)于連接 Connection 相關(guān)的辣辫,放在《Connection 連接設(shè)計》章節(jié)分析,此處跳過忧便;
關(guān)于心跳 HeartBeat 相關(guān)的族吻,放在《HeartBeat 心跳設(shè)計》章節(jié)分析,此處跳過珠增。

  1. 創(chuàng)建 RpcServer 實例
  • 創(chuàng)建 workerGroup(static類變量超歌,實現(xiàn)多個 RpcServer 實例共享 workerGroup)與 bossGroup
  • 創(chuàng)建 Codec 的實現(xiàn)類 RpcCodec 實例,用于創(chuàng)建 netty 的編解碼器蒂教,實質(zhì)上是一個工廠類
  • 創(chuàng)建用戶處理器 UserProcessor 實現(xiàn)類容器 Map<String, UserProcessor<?>> userProcessors
  1. 添加 UserProcessor 實現(xiàn)類 到 userProcessors

{ "感興趣的請求數(shù)據(jù)類型" :UserProcessor實現(xiàn)類 }
eg. key = "com.alipay.remoting.mydemo.MyRequest"巍举,value = MyServerUserProcessor 實例

  1. 初始化并啟動 RpcServer 實例
  • 初始化 RpcServer
  • 創(chuàng)建兩種協(xié)議 RpcProtocol 和 RpcProtocolV2 實例,添加到 RpcProtocolManager 的 Map<ProtocolCode, Protocol> protocols 協(xié)議容器中凝垛;每一種協(xié)議都有以下5個屬性:

CommandEncoder:編碼器實例
CommandDecoder:解碼器實例
HeartbeatTrigger:心跳觸發(fā)器實例
CommandFactory:創(chuàng)建 Remote 層請求和響應(yīng)封裝實體的創(chuàng)建工廠實例
CommandHandler:Remote 層的消息處理器懊悯,包含一個 ProcessorManager 實例, 其包含ConcurrentHashMap<CommandCode, RemotingProcessor<?>> cmd2processors容器梦皮,存儲了三個 Processor 處理器

請求消息處理器:{ RpcCommandCode.RPC_REQUEST : RpcRequestProcessor 實例 }
響應(yīng)消息處理器:{ RpcCommandCode.RPC_RESPONSE : RpcResponseProcessor 實例 }
心跳消息處理器(心跳發(fā)送與心跳響應(yīng)消息):{ CommonCommandCode.HEARTBEAT : RpcHeartBeatProcessor 實例 }
默認(rèn)處理器:AbstractRemotingProcessor 匿名內(nèi)部類定枷,直接 logger.error。該處理器用于婁底届氢,當(dāng)要處理的消息不是上述三種的任何一個欠窒,則使用該處理器

  • 創(chuàng)建 Remote 層請求和響應(yīng)封裝實體的創(chuàng)建工廠 RpcCommandFactory 實例
  • 創(chuàng)建 RpcServerRemoting (發(fā)起底層調(diào)用實現(xiàn)類) 實例

SOFABolt 可以進(jìn)行雙向調(diào)用,server 端也可以調(diào)用 client 端退子,所以此處構(gòu)建了 RpcServerRemoting 實例

  • 創(chuàng)建 ServerBootstrap 實例并設(shè)置一系列 netty 服務(wù)端配置
  • 創(chuàng)建 RpcHandler 實例作為 netty 的業(yè)務(wù)邏輯處理器岖妄,之后會有一個 Processor 處理鏈進(jìn)行處理

請求鏈 RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
響應(yīng)鏈 RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
心跳鏈 RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor

  • 啟動 RpcServer

this.bootstrap.bind

二、從 netty 的角度看執(zhí)行鏈

        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                ChannelPipeline pipeline = channel.pipeline();
                pipeline.addLast("decoder", codec.newDecoder());
                pipeline.addLast("encoder", codec.newEncoder());
                if (idleSwitch) {
                    pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                        TimeUnit.MILLISECONDS));
                    pipeline.addLast("serverIdleHandler", serverIdleHandler);
                }
                pipeline.addLast("connectionEventHandler", connectionEventHandler);
                pipeline.addLast("handler", rpcHandler);
                createConnection(channel);
            }
        }

跳過心跳 HeartBeat 邏輯寂祥、跳過連接 Connection 邏輯荐虐,我們只看編解碼器和業(yè)務(wù)處理器邏輯。
這里大概給出編解碼鏈丸凭,真正的編解碼過程在《Codec 編解碼設(shè)計》的時候再詳細(xì)分析福扬。
這里大概給出調(diào)用鏈,真正的調(diào)用處理過程在分析四種調(diào)用模式設(shè)計的時候再詳細(xì)分析惜犀。

2.1 編解碼器 RpcCodec

編碼鏈 ProtocolCodeBasedEncoder -> CommandEncoder
解碼鏈 RpcProtocolDecoder -> CommandDecoder

-------------------------------- netty 編解碼器 構(gòu)造工廠 ------------------------------------------
public class RpcCodec implements Codec {
    @Override
    public ChannelHandler newEncoder() {
        return new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE));
    }
    @Override
    public ChannelHandler newDecoder() {
        return new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH);
    }
}

--------------------------- netty 編碼器:CommandEncoder的包裝類 -----------------------------------
public class ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable> {
    ...

    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out)
                                                                                   throws Exception {
        Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
        ...
        Protocol protocol = ProtocolManager.getProtocol(protocolCode);
        // 使用protocol的編碼器進(jìn)行編碼
        protocol.getEncoder().encode(ctx, msg, out);
    }
}

--------------------------- netty 解碼器:CommandDecoder的包裝類 -----------------------------------
public class RpcProtocolDecoder extends ProtocolCodeBasedDecoder {
    ...

    @Override
    protected byte decodeProtocolVersion(ByteBuf in) {
       ...
    }
}

public class ProtocolCodeBasedDecoder extends AbstractBatchDecoder {
    ...

   protected byte decodeProtocolVersion(ByteBuf in) {
        ...
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        ProtocolCode protocolCode = decodeProtocolCode(in);
        if (null != protocolCode) {
            byte protocolVersion = decodeProtocolVersion(in);
            ...
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            if (null != protocol) {
                in.resetReaderIndex();
                // 使用protocol的解碼器進(jìn)行解碼
                protocol.getDecoder().decode(ctx, in, out);
            } else {
                throw new CodecException("Unknown protocol code: [" + protocolCode
                                         + "] while decode in ProtocolDecoder.");
            }
        }
    }
}

2.2 業(yè)務(wù)處理器 RpcHandler

請求鏈 RpcHandler -> RpcCommandHandler -> RpcRequestProcessor -> UserProcessor
響應(yīng)鏈 RpcHandler -> RpcCommandHandler -> RpcResponseProcessor
心跳鏈 RpcHandler -> RpcCommandHandler -> RpcHeartBeatProcessor

----------------- RpcHandler.channelRead(ChannelHandlerContext ctx, Object msg) -------------
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 每一個請求都會在連接上添加 ProtocolCode 屬性
        ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
        // 使用 ProtocolCode 獲取 Protocol
        Protocol protocol = ProtocolManager.getProtocol(protocolCode);
        // 使用 Protocol 獲取其 RpcCommandHandler 實例铛碑,使用 RpcCommandHandler 實例進(jìn)行消息處理
        protocol.getCommandHandler().handleCommand(
            new RemotingContext(ctx, new InvokeContext(), serverSide, userProcessors), msg);
    }

三、RpcServer 類結(jié)構(gòu)圖

image.png

其中虽界,AbstractConfigurableInstance 是可配置實例抽象類汽烦,包含配置容器和全局開關(guān)(在《Config 配置設(shè)計》中進(jìn)行分析);

AbstractRemotingServer 使用 模板模式 實現(xiàn)了 父類 RemotingServer 的方法莉御,并提供了三個 doXxx() 方法由子類來覆蓋撇吞;
RpcServer 中前6個屬性除了 userProcessors 都是 netty 相關(guān)俗冻;后三個與 Connection 相關(guān);RpcRemoting 是調(diào)用客戶端的工具類牍颈。
注意:在 RemotingServer 中有 void registerDefaultExecutor(byte protocolCode, ExecutorService executor)迄薄,在《線程池設(shè)計》部分進(jìn)行分析。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末煮岁,一起剝皮案震驚了整個濱河市噪奄,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌人乓,老刑警劉巖勤篮,帶你破解...
    沈念sama閱讀 216,496評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異色罚,居然都是意外死亡碰缔,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,407評論 3 392
  • 文/潘曉璐 我一進(jìn)店門戳护,熙熙樓的掌柜王于貴愁眉苦臉地迎上來金抡,“玉大人,你說我怎么就攤上這事腌且」8危” “怎么了?”我有些...
    開封第一講書人閱讀 162,632評論 0 353
  • 文/不壞的土叔 我叫張陵铺董,是天一觀的道長巫击。 經(jīng)常有香客問我,道長精续,這世上最難降的妖魔是什么坝锰? 我笑而不...
    開封第一講書人閱讀 58,180評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮重付,結(jié)果婚禮上顷级,老公的妹妹穿的比我還像新娘。我一直安慰自己确垫,他們只是感情好弓颈,可當(dāng)我...
    茶點故事閱讀 67,198評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著删掀,像睡著了一般翔冀。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上爬迟,一...
    開封第一講書人閱讀 51,165評論 1 299
  • 那天橘蜜,我揣著相機與錄音菊匿,去河邊找鬼付呕。 笑死计福,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的徽职。 我是一名探鬼主播象颖,決...
    沈念sama閱讀 40,052評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼姆钉!你這毒婦竟也來了说订?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,910評論 0 274
  • 序言:老撾萬榮一對情侶失蹤潮瓶,失蹤者是張志新(化名)和其女友劉穎陶冷,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體毯辅,經(jīng)...
    沈念sama閱讀 45,324評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡埂伦,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,542評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了思恐。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片沾谜。...
    茶點故事閱讀 39,711評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖胀莹,靈堂內(nèi)的尸體忽然破棺而出基跑,到底是詐尸還是另有隱情,我是刑警寧澤描焰,帶...
    沈念sama閱讀 35,424評論 5 343
  • 正文 年R本政府宣布媳否,位于F島的核電站,受9級特大地震影響荆秦,放射性物質(zhì)發(fā)生泄漏逆日。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,017評論 3 326
  • 文/蒙蒙 一萄凤、第九天 我趴在偏房一處隱蔽的房頂上張望室抽。 院中可真熱鬧,春花似錦靡努、人聲如沸坪圾。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,668評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽兽泄。三九已至,卻和暖如春漾月,著一層夾襖步出監(jiān)牢的瞬間病梢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,823評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蜓陌,地道東北人觅彰。 一個月前我還...
    沈念sama閱讀 47,722評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像钮热,于是被迫代替她去往敵國和親填抬。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,611評論 2 353

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理隧期,服務(wù)發(fā)現(xiàn)飒责,斷路器,智...
    卡卡羅2017閱讀 134,651評論 18 139
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架仆潮,建立于...
    Hsinwong閱讀 22,394評論 1 92
  • Netty的簡單介紹 Netty 是一個 NIO client-server(客戶端服務(wù)器)框架宏蛉,使用 Netty...
    AI喬治閱讀 8,402評論 1 101
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,097評論 1 32
  • #只有感覺好,才能做的好# 女兒第一個30天目標(biāo):晚9:30早6:50 媽媽第一個30天目標(biāo):晚10早5 加油寶貝...
    美悠美媽閱讀 187評論 0 0