fescar源碼分析-AbstractRpcRemotingClient

fescar源碼分析-AbstractRpcRemoting介紹了fescar對響應請求及向送請求的封裝艾帐。下面分析AbstractRpcRemoting的子類AbstractRpcRemotingClient暑刃,AbstractRpcRemotingClient在父類基礎針對RPC調用的客戶端做進一步的封裝玲躯。

構造方法

    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
            final ThreadPoolExecutor messageExecutor) {
        super(messageExecutor);
        if (null == nettyClientConfig) {
            nettyClientConfig = new NettyClientConfig();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("use default netty client config.");
            }
        }
        this.nettyClientConfig = nettyClientConfig;//1
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(
                getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));//2
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }
  • 1根據(jù)配置文件界弧,初始化RPC客戶端配置凿跳。
  • 2根據(jù)配置文件中的參數(shù)良拼,設置EventLoopGroup
  • 3由子類創(chuàng)建的eventExecutorGroup對象就斤,設置EventExecutorGroup悍募。

其中關于EventLoopGroup類型的屬性,主要是用于處理NIO中的accept,read及write操作洋机,而EventExecutorGroup則是用于執(zhí)行Handler坠宴,如下圖所示:


image
image

重載父類方法

重載init()方法

首先AbstractRpcRemoting重載了AbstractRpcRemotinginit()方法(初始化一個線程池,處理異步RPC請求中的超時緩存數(shù)據(jù))绷旗,添加了一些新的處理:


    @Override
    public void init() {
        NettyPoolableFactory keyPoolableFactory = new NettyPoolableFactory(this);
        nettyClientKeyPool = new GenericKeyedObjectPool(keyPoolableFactory);
        nettyClientKeyPool.setConfig(getNettyPoolConfig());//1
        serviceManager = new ServiceManagerStaticConfigImpl();//2
        super.init();
    }
  • 1.設置了AbstractRpcRemoting類中屬性nettyClientKeyPool喜鼓,該屬性是GenericKeyedObjectPool類型副砍。
GenericKeyedObjectPool provides robust pooling functionality for keyed objects. 
A GenericKeyedObjectPool can be viewed as a map of pools, keyed on the (unique) key values provided to the prepare Pool, add Object or borrow Object methods. 
Each time a new key value is provided to one of these methods, a new pool is created under the given key to be managed by the containing GenericKeyedObjectPool. 
  • 2.設置了AbstractRpcRemoting類中屬性serviceManager,該屬性是ServiceManager類型庄岖。
    該屬性應該是為了后期實現(xiàn)服務的注冊與發(fā)現(xiàn)功能豁翎,但現(xiàn)在暫時沒有實現(xiàn)。
ServiceManager :Service Registry and Discovery

重載dispatch()方法

dispatch在父類是一個抽象方法顿锰,具體邏輯需要子類來實現(xiàn)谨垃。AbstractRpcRemotingClient通過注冊的ClientMessageListener事件監(jiān)聽器實現(xiàn)對消息做具體的處理。

 protected ClientMessageListener clientMessageListener;
......
    @Override
    public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
        if (clientMessageListener != null) {
            String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
            clientMessageListener.onMessage(msgId, remoteAddress, msg, this);
        }
    }

重載channelRead()方法

在處理父類channelRead()方法邏輯之前硼控,先判斷一下接收到的消息是不是心跳回復消息,如果是則不再處理下面的請求胳赌。

            if (rpcMessage.getBody() == HeartbeatMessage.PONG) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("received PONG from " + ctx.channel().remoteAddress());
                }
                return;
            }

如果是合并回復消息,則批量處理回復信息牢撼。

        if (((RpcMessage) msg).getBody() instanceof MergeResultMessage) {
            MergeResultMessage results = (MergeResultMessage) ((RpcMessage) msg).getBody();
            MergedWarpMessage mergeMessage = (MergedWarpMessage) mergeMsgMap.remove(((RpcMessage) msg).getId());
            int num = mergeMessage.msgs.size();
            for (int i = 0; i < num; i++) {
                long msgId = mergeMessage.msgIds.get(i);
                MessageFuture future = futures.remove(msgId);
                if (future == null) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("msg:" + msgId + " is not found in futures.");
                    }
                } else {
                    future.setResultMessage(results.getMsgs()[i]);
                }
            }
            return;
        }

除了重載父類方法,該類主要實現(xiàn)了RemotingService, RegisterMsgListener, ClientMessageSender三個接口疑苫。圖下所示:

image.png

下面具體講一下各個接口的定義以及AbstractRpcRemotingClient對接口的具體實現(xiàn):

實現(xiàn)RemotingService接口

實現(xiàn)RPC客戶端的啟停操作熏版。在AbstractRpcRemotingClient類中主要是通過操作內部的Bootstrap屬性(可以通過Netty引導客戶端和無連接協(xié)議了解到,Bootstrap是Netty創(chuàng)建客戶端的引導類)捍掺,

image.png

start()

實現(xiàn)RPC客戶端的啟動撼短。

    private final Bootstrap bootstrap = new Bootstrap();

Bootstrap主要需要設置以下屬性:

* EventLoopGroup 
* Channel
* ChannelHandler
* 及一些參數(shù)配置等

為了實現(xiàn)這些參數(shù)的可配置化,AbstractRpcRemotingClient使用了NettyClientConfig類挺勿。
AbstractRpcRemotingClient分別通過以下方式來設置Bootstrap的這些屬性:

->EventLoopGroup

AbstractRpcRemotingClient在創(chuàng)建時指定EventLoopGroupNioEventLoopGroup的實例曲横。

        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(
                getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));

->Channel

Channel實例類型將通過nettyClientConfig對象的屬性值來指定。

                .channel(nettyClientConfig.getClientChannelClazz())//

->ChannelHandler

首先會通過配置屬性判斷是否使用連接池不瓶,如果使用連接池禾嫉。則為每個連接地址創(chuàng)建一個FixedChannelPool對象。從字面意思理解就是一個通道池子蚊丐。

        if (nettyClientConfig.isUseConnPool()) {
            clientChannelPool = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {//0
                @Override
                protected FixedChannelPool newPool(InetSocketAddress key) {
                    FixedChannelPool fixedClientChannelPool = new FixedChannelPool(bootstrap.remoteAddress(key),
                            new DefaultChannelPoolHandler() {
                                @Override
                                public void channelCreated(Channel ch) throws Exception {
                                    super.channelCreated(ch);
                                    final ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast(defaultEventExecutorGroup,//1
                                            new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                                                    nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                                                    nettyClientConfig.getChannelMaxAllIdleSeconds()));//2.1
                                    pipeline.addLast(defaultEventExecutorGroup, new RpcClientHandler());//2.2
                                }
                            }, ChannelHealthChecker.ACTIVE, AcquireTimeoutAction.FAIL,
                            nettyClientConfig.getMaxAcquireConnMills(), nettyClientConfig.getPerHostMaxConn(),
                            nettyClientConfig.getPendingConnSize(), false);
                    return fixedClientChannelPool;

                }
            };
  • 0.繼承AbstractChannelPoolMap類熙参,實現(xiàn)newPool(InetSocketAddress key)方法,創(chuàng)建一個FixedChannelPool對象。
    1. 池中的Channel對象會使用創(chuàng)建的DefaultEventExecutorGroup對象來執(zhí)行ChannelHandler麦备。
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(),
                    new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()),
                            nettyClientConfig.getClientWorkerThreads()));
        }
  • 2.池中的Channel對象的ChannelPipeline設置了:IdleStateHandlerRpcClientHandler兩個ChannelHandler孽椰。
    (注:ChannelHandler為fescar自定義ChannelHandler,但從代碼來看目前并沒有對RPC客戶端對線程池的支持實現(xiàn)完凛篙。后面將不再分析Channel池相關的代碼)黍匾。

如果不使用連接池,則為Bootstrap對象設置了:IdleStateHandlerMessageCodecHandler兩個ChannelHandler鞋诗。

bootstrap.handler(new ChannelInitializer<SocketChannel>() {

                @Override
                public void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                            nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                            nettyClientConfig.getChannelMaxAllIdleSeconds()))//
                    .addLast(new MessageCodecHandler());//添加消息加解碼器
                    if (null != channelHandlers) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });

shutdown()

就是在PRC客戶端關閉的時候膀捷,關閉相關資源。

    public void shutdown() {
        try {
            if (null != clientChannelPool) {
                clientChannelPool.close();//1關閉通道池削彬。
            }
            this.eventLoopGroupWorker.shutdownGracefully();//2關閉通道使用的EventLoopGroup全庸。
            if (this.defaultEventExecutorGroup != null) {
                this.defaultEventExecutorGroup.shutdownGracefully();//3關閉通道池使用的EventLoopGroup池秀仲。
            }
            super.destroy();
        } catch (Exception exx) {
            LOGGER.error("shutdown error:" + exx.getMessage());
        }
    }

RegisterMsgListener

image.png

該接口主要用于,根據(jù)RPC請求返回的信息壶笼,判斷調用成功或失敗的狀態(tài)觸發(fā)接口方法神僵,實現(xiàn)后續(xù)業(yè)務邏輯處理:

NettyPoolableFactory.java
......
        try {
            response = rpcRemotingClient.sendAsyncRequestWithResponse(null, tmpChannel, key.getMessage());
            if (!isResponseSuccess(response, key.getTransactionRole())) {
                rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
            } else {
                channelToServer = tmpChannel;
                rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response,
                    key.getMessage());
            }
        } catch (Exception exx) {
......

AbstractRpcRemotingClient并沒有實現(xiàn)RegisterMsgListener接口,而是交由子類來實現(xiàn)覆劈。

ClientMessageSender

image.png

該接口主要用于實現(xiàn)發(fā)送請求保礼,并返回請求結果的邏輯實現(xiàn)。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末责语,一起剝皮案震驚了整個濱河市炮障,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌坤候,老刑警劉巖胁赢,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異白筹,居然都是意外死亡智末,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門徒河,熙熙樓的掌柜王于貴愁眉苦臉地迎上來系馆,“玉大人,你說我怎么就攤上這事顽照∮赡ⅲ” “怎么了?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵棒厘,是天一觀的道長纵穿。 經(jīng)常有香客問我,道長奢人,這世上最難降的妖魔是什么谓媒? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮何乎,結果婚禮上句惯,老公的妹妹穿的比我還像新娘。我一直安慰自己支救,他們只是感情好抢野,可當我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著各墨,像睡著了一般指孤。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天恃轩,我揣著相機與錄音结洼,去河邊找鬼。 笑死叉跛,一個胖子當著我的面吹牛松忍,可吹牛的內容都是我干的。 我是一名探鬼主播筷厘,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼鸣峭,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了酥艳?” 一聲冷哼從身側響起摊溶,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎充石,沒想到半個月后更扁,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡赫冬,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了溃列。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片劲厌。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖听隐,靈堂內的尸體忽然破棺而出补鼻,到底是詐尸還是另有隱情,我是刑警寧澤雅任,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布风范,位于F島的核電站,受9級特大地震影響沪么,放射性物質發(fā)生泄漏硼婿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一禽车、第九天 我趴在偏房一處隱蔽的房頂上張望寇漫。 院中可真熱鬧,春花似錦殉摔、人聲如沸州胳。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽栓撞。三九已至,卻和暖如春碗硬,著一層夾襖步出監(jiān)牢的瞬間瓤湘,已是汗流浹背瓢颅。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留岭粤,地道東北人惜索。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像剃浇,于是被迫代替她去往敵國和親巾兆。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內容