Dubbo的線程模型待诅、handler

本系列參考官網(wǎng)文檔统求、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實戰(zhàn)》一書乘碑。Dubbo版本為2.6.1

這篇文章是為了理清Dubbo里的線程模型挖息、Handler機制,順便解釋在服務(wù)暴露時候最后遺留下來的問題兽肤,即在最后的DubboProtocol#createServer(url)方法里調(diào)用了Exchangers.bind(url, requestHandler)做了什么套腹。
我們將討論當收到請求的時候是怎么從Netty轉(zhuǎn)到Dubbo的邏輯來的,再介紹Handler的調(diào)用鏈路轿衔,并且分析如何將解碼這一IO操作Dubbo的業(yè)務(wù)線程池做的

文章內(nèi)容順序:
1. Dubbo的線程模型
    1.1 Dubbo的線程模型調(diào)用流程
    1.2 Netty線程模型
    1.3 Dubbo中的線程池
    1.4 那么什么業(yè)務(wù)會由Dubbo自己的線程池來實現(xiàn)呢沉迹?
2. Handler的包裝流程
    2.1 Handler包裝圖解和描述
    2.2 創(chuàng)建一個requestHandler
    2.3 Exchange層做兩次包裝new DecodeHandler(new HeaderExchangeHandler(requestHandler))
    2.4 HeaderExchangeHandler#received
    2.5 DecodeHandler#received
    2.6 Transporters#bind
    2.6.1 ChannelHandlerDispatcher#received
    2.7 NettyTransporter(netty3)    
    2.8 NettyServer構(gòu)造方法
    2.9 ChannelHandlers#wrap 又進行三層包裝
    2.10 AllChannelHandler 與Dubbo線程池耦合
         2.10.1 ChannelEventRunnable 線程的執(zhí)行邏輯
     2.11 HeartbeatHandler 處理心跳請求
    2.12 MultiMessageHandler 處理批量請求
    2.13 NettyServer
    2.14 NettyServer#doOpen創(chuàng)建Netty的執(zhí)行鏈睦疫,包裝到NettyHandler里
    2.15 NettyHandler做了什么害驹,為什么要封裝?
3. Handler的調(diào)用順序
4. 需要注意的幾個類
5. Dubbo是怎么用到Netty的

1. Dubbo的線程模型模型

首先來講講Dubbo的線程模型蛤育,可以看下這篇博客的介紹宛官,下面的圖也是來此博客。
dubbo線程模型

1.1Dubbo的線程調(diào)用流程

image.png

客戶端的主線程發(fā)出一個請求后獲得future瓦糕,在執(zhí)行get時進行阻塞等待底洗;
服務(wù)端使用worker線程(netty通信模型)接收到請求后,將請求提交到server線程池(Dubbo線程池)中進行處理
server線程處理完成之后咕娄,將相應(yīng)結(jié)果返回給客戶端的worker線程池(netty通信模型)亥揖,最后,worker線程將響應(yīng)結(jié)果提交到client線程池進行處理
client線程將響應(yīng)結(jié)果填充到future中,然后喚醒等待的主線程费变,主線程獲取結(jié)果摧扇,返回給客戶端

這邊再簡單概括下博客的內(nèi)容:

1.2Netty線程模型

Dubbo使用netty作為網(wǎng)絡(luò)傳輸框架,所以我們先來簡單了解下Netty挚歧,下圖為Netty的線程模型扛稽。


image.png

Netty中存在兩種線程:boss線程worker線程

boss線程的作用:
accept客戶端的連接滑负;將接收到的連接注冊到一個worker線程上
個數(shù):通常情況下在张,服務(wù)端每綁定一個端口,開啟一個boss線程

worker線程的作用:
處理注冊在其身上的連接connection上的各種io事件
個數(shù):默認是核數(shù)+1
注意:
一個worker線程可以注冊多個connection
一個connection只能注冊在一個worker線程上

1.3Dubbo中的線程池

為了配合worker線程工作矮慕,在Dubbo中還實現(xiàn)了自己的線程池來執(zhí)行各種業(yè)務(wù)帮匾。
Dubbo擴展接口 ThreadPool 的SPI實現(xiàn)有如下幾種:

  • fixed:固定大小線程池,啟動時建立線程痴鳄,不關(guān)閉辟狈,一直持有(默認實現(xiàn))。
    coresize:200
    maxsize:200
    隊列:SynchronousQueue
    回絕策略:AbortPolicyWithReport - 打印線程信息jstack夏跷,之后拋出異常
  • cached:緩存線程池哼转,空閑一分鐘自動刪除,需要時重建槽华。
  • limited:可伸縮線程池壹蔓,但池中的線程數(shù)只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然帶來大流量引起性能問題猫态。

1.4那么什么業(yè)務(wù)會由Dubbo自己的線程池來實現(xiàn)呢佣蓉?

有5種派發(fā)策略:

  • 默認是all:所有消息都派發(fā)到Dubbo線程池,包括請求亲雪,響應(yīng)勇凭,連接事件,斷開事件义辕,心跳等虾标。 即worker線程接收到事件后,將該事件提交到業(yè)務(wù)線程池中灌砖,自己再去處理其他事璧函。
  • direct:worker線程接收到事件后,由worker執(zhí)行到底基显。
  • message:只有請求響應(yīng)消息派發(fā)到Dubbo線程池蘸吓,其它連接斷開事件,心跳等消息撩幽,直接在 IO線程上執(zhí)行
  • execution:只請求消息派發(fā)到Dubbo線程池库继,不含響應(yīng)(客戶端線程池),響應(yīng)和其它連接斷開事件,心跳等消息宪萄,直接在 IO 線程上執(zhí)行
  • connection:在 IO 線程上舅桩,將連接斷開事件放入隊列,有序逐個執(zhí)行雨膨,其它消息派發(fā)到Dubbo線程池擂涛。

image.png

如圖所示切揭,各個策略的實現(xiàn)也很簡單从橘,直接返回了一個對應(yīng)名稱的XXXChannelHandler,說明具體代碼邏輯在這個Handler里面實現(xiàn)运褪。
以上就是基本的Dubbo線程模型了排监。

2. Handler的包裝流程

接下來來講講Dubbo里的Handler狰右,會一步步來分析他是怎么包裝起來的。
參考如下:dubbo的handler機制

2.1 Handler包裝圖解和描述

這是使用Dubbo Protocol并且使用Netty作為服務(wù)器的情況下Handler的整個包裝過程

image.png

上圖是來自參考鏈接的圖的一部分舆床,他的Transport層部分畫錯了棋蚌,我就自己畫了一個,如下圖挨队,比較簡陋谷暮,下圖就是完整的handler包裝。(我也知道你們喜歡彩色的圖啊盛垦,可惜我懶)

image.png

1.在DubboProtocol中構(gòu)建ExchangeHandler命名requestHandler

2.在Exchange層做兩次包裝new DecodeHandler(new HeaderExchangeHandler(requestHandler))湿弦,具體參考類:HeaderExchanger
? ① 使用HeaderExchangeHandler做一次包裝,HeaderExchangeHandler的作用是實現(xiàn)了Request和Response的概念腾夯,當接到received請求后颊埃,將請求轉(zhuǎn)為reply。請參考類HeaderExchangeHandler蝶俱,
? ② 使用DecodeHandler做一次包裝班利,DecodeHandler的作用是用來對Request MessageResponse Message做解碼操作,解碼完成后才能給HeaderExchangeHandler使用榨呆。

3.在Exchange層包裝后的Handler會被傳遞到Transporter層(NettyTransporter)并且把類型轉(zhuǎn)換成ChannelHandler罗标,因為ChannelHandler更為抽象。

4.HandlerTransporter層流轉(zhuǎn)愕提,會被傳遞到NettyServer

5.在NettyServer中被AllChannelHandler包裝馒稍,其作用是把NettyServer接收到的請求轉(zhuǎn)移給Transporter層的線程池來處理皿哨。同步轉(zhuǎn)異步浅侨。

6.接著就先被HeartbeatHandler包裝用以處理心跳請求,接著被MultiMessageHandler包裝用以處理批量請求

7.這個MultiMessageHandler會在NettyServer中以構(gòu)造函數(shù)的方式注入進來

8.NettyServer被再次NettyHandler包裝证膨,NettyHandler的父類是SimpleChannelHandler如输。它屬于NettyHandler。由Netty來管理和調(diào)用其中的回調(diào)方法。Netty在接受到channelActive不见,channelRead等方法后澳化,會把請求轉(zhuǎn)移給DubboHandler,這樣每當請求過來稳吮,NettyHandler接到請求就立馬把數(shù)據(jù)和相關(guān)信息轉(zhuǎn)交給DubboHandler缎谷,由DubboHandler來管理了。

上面是簡單的概括灶似,我們來一個個上代碼

2.2創(chuàng)建一個requestHandler

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 獲得請求對應(yīng)的 Invoker 對象
                Invoker<?> invoker = getInvoker(channel, inv);
                // 如果是callback 需要處理高版本調(diào)用低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                // 設(shè)置調(diào)用方的地址
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 執(zhí)行調(diào)用
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, message.getClass().getName() + ": " + message
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                this.reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) {
            this.invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            this.invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        /**
         * 調(diào)用方法
         *
         * @param channel 通道
         * @param methodKey 方法名
         */
        private void invoke(Channel channel, String methodKey) {
            // 創(chuàng)建 Invocation 對象
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            // 調(diào)用 received 方法列林,執(zhí)行對應(yīng)的方法
            if (invocation != null) {
                try {
                    this.received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };
image.png

他實現(xiàn)了ChannelHandler接口5個關(guān)鍵方法,連接酪惭,斷開連接希痴,發(fā)送消息,接受消息和異常處理方法春感。也是rpc調(diào)用的常用處理方法砌创。 同時也是線程派發(fā)處理關(guān)注的方法。

requestHandler第一次用到鲫懒,也就是在服務(wù)暴露那篇最后提的DubboProtocol#Exchangers.bind(url, requestHandler)方法中

2.3 Exchange層做兩次包裝new DecodeHandler(new HeaderExchangeHandler(requestHandler))

進去后發(fā)現(xiàn)Exchangers是個門面類嫩实,調(diào)用的是HeaderExchanger#bind方法,事實上也只有HeaderExchanger這一個實現(xiàn)窥岩。如下圖

image.png

image.png

接著來看HeaderExchanger#bind

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

注意舶赔,這里我們的requestHandler直接被包裝了兩層,對應(yīng)的是最上面圖里的Exchange層的包裝谦秧,從里到外看看這兩個包裝的Handler做了什么(主要看他的received方法)

2.4 HeaderExchangeHandler#received

/**
 * ExchangeReceiver
 *
 * 基于消息頭部( Header )的信息交換處理器實現(xiàn)類
 */
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    public void received(Channel channel, Object message) throws RemotingException {
        // 設(shè)置最后的讀時間
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        // 創(chuàng)建 ExchangeChannel 對象
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求( Request )
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                // 處理事件請求
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    // 處理普通請求竟纳,判斷是否要響應(yīng)(即雙向通信)
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                    // 將調(diào)用結(jié)果返回給服務(wù)消費端
                        channel.send(response);
                    // 如果是單向通信,僅向后調(diào)用指定服務(wù)即可疚鲤,無需返回調(diào)用結(jié)果
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            // 處理響應(yīng)( Response )
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            // 處理 String
            } else if (message instanceof String) {
                // 客戶端側(cè)锥累,不支持 String
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                // 服務(wù)端側(cè),目前是 telnet 命令
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
                // 提交給裝飾的 `handler`集歇,繼續(xù)處理
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            // 移除 ExchangeChannel 對象桶略,若已斷開
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
//省略其他代碼
}

代碼注釋已經(jīng)比較清晰,簡單來說就是會辨別收到的message是服務(wù)端收到的Request還是消費端發(fā)起請求后得到的Reponse進行一系列業(yè)務(wù)判斷操作(比如對于不同的請求诲宇,有事件請求际歼、需要響應(yīng)的和不需要響應(yīng)的,都有不同的執(zhí)行邏輯)
如果需要響應(yīng)還會傳到更里面一層也就是我們的requestHandler執(zhí)行received方法

再來看看在更外面一層的DecodeHandler

2.5DecodeHandler#received

public class DecodeHandler extends AbstractChannelHandlerDelegate {
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
 // 對 Decodeable 接口實現(xiàn)類對象進行解碼
            decode(message);
        }

        if (message instanceof Request) {
// 對 Request 的 data 字段進行解碼
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
// 對 Request 的 result 字段進行解碼
            decode(((Response) message).getResult());
        }
// 執(zhí)行后續(xù)邏輯
        handler.received(channel, message);
    }

    private void decode(Object message) {
        if (message != null && message instanceof Decodeable) {
            try {
                ((Decodeable) message).decode(); // 解析消息
                if (log.isDebugEnabled()) {
                    log.debug(new StringBuilder(32).append("Decode decodeable message ").append(message.getClass().getName()).toString());
                }
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn(new StringBuilder(32).append("Call Decodeable.decode failed: ").append(e.getMessage()).toString(), e);
                }
            } // ~ end of catch
        } // ~ end of if
    } // ~ end of method decode
//省略其他代碼
}

注意看這個received中的方法姑蓝,很明顯這個DecodeHandler就是用來解碼的鹅心,根據(jù)不同的message類型,執(zhí)行不同的邏輯纺荧,,這里的if (message instanceof Decodeable)判斷下的代碼就是我們解碼篇提到的交由業(yè)務(wù)線程池來執(zhí)行解碼操作旭愧。
每個Handler都會執(zhí)行到這一步颅筋,如果被解碼過自然不用再解碼了(在DecodeableRpcInvocation或者DecodeableRpcResult中會有是否解碼已完成的標志位),如果還未解碼输枯,當前線程就會進行解碼操作

接下來進入到Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))中的Transporters#bind方法

2.6 Transporters#bind

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        // 創(chuàng)建 handler
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 創(chuàng)建 Server 對象
        return getTransporter().bind(url, handler);
    }

同樣是門面類议泵,會調(diào)用真正的Transporter來執(zhí)行bind方法,
這里我們只傳了一個ChannelHandler對象桃熄,所以直接到了rerun先口,如果ChannelHandler有多個的情況下,說明這些handler是同級的瞳收,new ChannelHandlerDispatcher(handlers)每個實現(xiàn)的方法池充,都會循環(huán)調(diào)用 channelHandlers的方法

2.6.1ChannelHandlerDispatcher#received

如下面的代碼例子所示:

public class ChannelHandlerDispatcher implements ChannelHandler {
    public void received(Channel channel, Object message) {
        for (ChannelHandler listener : channelHandlers) {
            try {
                listener.received(channel, message);
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }
//省略其他代碼
}

那我們便直接來到真正的Transporter#bind方法中一探究竟。

image.png

這個Transporter有四種實現(xiàn)缎讼,其中又有netty3netty4收夸,默認實現(xiàn)為netty3,我們就來看看netty3的實現(xiàn)吧

netty3的NettyTransporter 實現(xiàn)如下

2.7 NettyTransporter(netty3)

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}

接著來看NettyServer的實現(xiàn)

2.8 NettyServer構(gòu)造方法

public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

又是一層對handler的包裝血崭,調(diào)用了ChannelHandlers#wrap方法卧惜,跟進去看看

這里注意一下!<腥摇咽瓷!我們先關(guān)注ChannelHandlers#warp的調(diào)用,這里的super方法我們最后會分析舰讹。

2.9 ChannelHandlers#wrap 又進行三層包裝

public class ChannelHandlers {

    /**
     * 單例
     */
    private static ChannelHandlers INSTANCE = new ChannelHandlers();



    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(
                new HeartbeatHandler(
                        ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                     .getAdaptiveExtension().dispatch(handler, url)
                )
        );
    }
//省略其他方法

}

wrap方法直接調(diào)用了他自己實現(xiàn)的wrapInternal方法茅姜,套了一層又一層。
注意ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url)這一段代碼月匣,dispatch(handler, url)前面很好理解钻洒,就是拿到Dispatcher這個接口的擴展類,我們的默認擴展類是AllDispatcher锄开,在前面已經(jīng)介紹過了素标,我們就以默認的類來繼續(xù)講,后面直接調(diào)用了這個AllDispatcher#dispatch方法萍悴,這個我們前面也同樣介紹過头遭,再來簡單復(fù)習下下面的圖,就是直接把我們的handler又又又封裝了一層癣诱,封裝到了AllChannelHandler中计维。那么就來看看這個AllChannelHandler又做了什么吧

image.png

2.10 AllChannelHandler 與Dubbo線程池耦合

/**
 * `all` 所有消息都派發(fā)到線程池,包括請求撕予,響應(yīng)剑按,連接事件艺蝴,斷開事件,心跳等。
 */
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

 public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
          //用來解決線程池已滿后無法將異常信息發(fā)送到另一端的問題的臨時解決方案,仍然需要重構(gòu)
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

這個類還調(diào)用了父類的有參構(gòu)造,父類的構(gòu)造方法里面就是通過SPI機制拿到url后創(chuàng)建對應(yīng)的線程池(還記得我們Dubbo中的三種線程池不,默認是fixed)疗杉,這里就不貼代碼了蚕礼。
還是來看看這個AllChannelHandler#received方法,直接調(diào)用了線程池來執(zhí)行方法奠蹬。

2.10.1 ChannelEventRunnable 線程的執(zhí)行邏輯

熟悉線程池的小伙伴肯定猜出來了玖翅,這個執(zhí)行的ChannelEventRunnable肯定是實現(xiàn)了Runnable接口的類了金度。再來進去看看他的執(zhí)行邏輯吧

public class ChannelEventRunnable implements Runnable {
    public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }
//省略其他代碼
}

這里的線程池很明顯丢胚,用了case語句來判斷到底執(zhí)行的我們傳入的包裝類DecodeHandler里的什么方法。(AllChannelHandler里面的一層是DecodeHandler)

到了這里峡蟋,我們別忘了在ChannelHandlers#wrapInternal還沒分析完,外面還有一層HeartbeatHandler和一層MultiMessageHandler赖舟。先來看HeartbeatHandler

2.11 HeartbeatHandler 處理心跳請求

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
    public void received(Channel channel, Object message) throws RemotingException {
        // 設(shè)置最后的讀時間
        setReadTimestamp(channel);
        // 如果是心跳事件請求,返回心跳事件的響應(yīng)
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(Response.HEARTBEAT_EVENT);
                channel.send(res);
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        // 如果是心跳事件響應(yīng)宾抓,返回
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug(new StringBuilder(32).append("Receive heartbeat response in thread ").append(Thread.currentThread().getName()).toString());
            }
            return;
        }
        // 提交給裝飾的 `handler`,繼續(xù)處理
        handler.received(channel, message);
    }
//省略其他代碼
}

可以看到HeartbeatHandlerreceived方法進行了處理痛单,所以心跳的消息的接受和發(fā)送是不會派發(fā)到Dubbo線程池的。

接著是外面的那層MultiMessageHandler啦

2.12 MultiMessageHandler 處理批量請求

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) { // 多消息
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }

}

從他的名字就可以看出來主要是處理多消息的劲腿,主要完成多消息類型的循環(huán)解析接收。

這里來做下收尾:還記得我當時三個!:鲂佟!的NettyServer構(gòu)造方法調(diào)用的父類構(gòu)造方法嘛袋倔,一起來看看批狐。

2.13 NettyServer

public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        // 服務(wù)地址
        localAddress = getUrl().toInetSocketAddress();
        // 綁定地址
        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        // 服務(wù)器最大可接受連接數(shù)
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        // 空閑超時時間
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

        // 開啟服務(wù)器
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }

        // 獲得線程池
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }
//省略其他代碼
}

這個AbstractServer就是NettyServer的父類,可以看到這個構(gòu)造方法中前塔,開啟服務(wù)器時調(diào)用了doOpen()方法嚣艇,這是交由子類自己的實現(xiàn)的方法承冰。(注意,我們這里只關(guān)注handler的傳遞過程食零,其他代碼的介紹就略過啦)

2.14 NettyServer#doOpen創(chuàng)建Netty的執(zhí)行鏈困乒,包裝到NettyHandler里

public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    @Override
 protected void doOpen() {
        // 設(shè)置日志工廠
        NettyHelper.setNettyLoggerFactory();

        // 創(chuàng)建線程池
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));

        // 創(chuàng)建 ChannelFactory 對象
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        // 實例化 ServerBootstrap
        bootstrap = new ServerBootstrap(channelFactory);

        // 創(chuàng)建 NettyHandler 對象,著重注意;藕椤6パ唷凑保!
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        // 設(shè)置 `channels` 屬性
        channels = nettyHandler.getChannels();
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                // 創(chuàng)建 NettyCodecAdapter 對象
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder()); // 解碼
                pipeline.addLast("encoder", adapter.getEncoder()); // 解碼
                pipeline.addLast("handler", nettyHandler); // 處理器冈爹,著重注意!E芬频伤!
                return pipeline;
            }
        });
        // 服務(wù)器綁定端口監(jiān)聽
        // bind
        channel = bootstrap.bind(getBindAddress());
    }
//省略其他代碼
}

這個就是NettyServer#doOpen()的代碼啦,注意代碼打上著重注意Vゴ恕1镄ぁ!的兩行婚苹,
一行是final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
將這個NettyServer又包裝進了NettyHandler里岸更,
還有一行是pipeline.addLast("handler", nettyHandler);,這是Netty的機制膊升,
可以參考詳細講解Netty中Pipeline責任鏈做一個簡單的了解怎炊,這里解釋了為什么收到消息后能調(diào)用到我們的Handler。
順帶一提廓译,NettyServer其實也實現(xiàn)了ChannelHandler评肆,他也算是個"handler",看下圖非区。

image.png

閑話至此瓜挽,我們來看下NettyHandler做了什么,為什么要封裝征绸?

2.15 NettyHandler做了什么久橙,為什么要封裝?

@Sharable
public class NettyHandler extends SimpleChannelHandler {

    public NettyHandler(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
//省略其他代碼
}

這個SimpleChannelHandlerNetty的接口管怠,說明這個NettyHandlerNetty handler服務(wù)Dubbo handler職責交接的地方淆衷,Netty首先接收到信息后在內(nèi)部進行一連串調(diào)用,然后調(diào)用到Dubbo自己實現(xiàn)的Netty接口的方法排惨,從此開始在Dubbo中進行方法的調(diào)用吭敢。
再順帶一提,當我們debug的時到NettyHandler#messageReceived這行的時候暮芭,
這里的handler一個個分解下去鹿驼,就是包裝的順序啦欲低,看下圖。

image.png

這里再再順便提一嘴畜晰,我們介紹的都是以Netty3砾莱,如果換成Netty4實現(xiàn)的話,最外面的handler類名不是現(xiàn)在的NettyHandler而是NettyServerHandler凄鼻。(一開始我迷糊了后來才知道所以在這一提腊瑟,可能會有跟我一樣迷糊的人呢)

3. Handler的調(diào)用順序

至此,我們的Handler終于分析完畢了块蚌!闰非,再來捋一遍調(diào)用的順序:

  • netty處理接收請求
  • ->NioServerSocketPipelineSink$Boss
  • ->注冊NioWorker線程用于處理長連接和IO操作
  • ->DefaultChannelPipeline(處理請求的反序列化和分發(fā)handler,初始化過程見NettyServer峭范,會往pipeline加入decoder财松、encoder和NettyHandler)
  • ->decoder做反序列化
  • ->NettyHandler處理請求
  • ->NettyServer處理請求(NettyServer初始化封裝了MultiMessageHandler、HeartbeatHandler以及根據(jù)dispatcher得到的channelHandler(里面還有封裝纱控,就不往下說了)辆毡,默認是AllDispatcher)
  • ->MultiMessageHandler for each處理批量請求
  • ->HeartbeatHandler處理心跳請求
  • ->AllDispatcher得到的AllChannelHandler處理將通道所有狀態(tài)變更用新線程處理(包括建立連接、斷開連接甜害、接收請求舶掖、異常)
  • ->從指定的線程池拿到線程執(zhí)行rpc請求
  • ->DecodeHandler進行對message的解碼,此操作可能會被放在work線程進行尔店,如果work線程已經(jīng)解碼過了眨攘,得到的message內(nèi)部會有標識,解碼的方法會直接返回進行下一步操作
  • ->HeaderExchangeHandler判斷雙向和單向處理請求闹获,雙向則將結(jié)果使用當前channel回寫
  • ->到DubboProtocol$ExchangeHandlerAdapter處理請求調(diào)用
  • ->到此服務(wù)端網(wǎng)絡(luò)傳輸層結(jié)束

4. 需要注意的幾個類

1.DubboProtocol類期犬,Dubbo Handler初始化創(chuàng)建的地方
2.HeaderExchangeHandler類,Request和Response概念重點提現(xiàn)的地方
3.DecodeHandler類避诽,Dubbo線程池幫忙解碼的地方
3.NettyHandler類龟虎,Netty Handler服務(wù)與Dubbo Handler職責交接的地方

5. Dubbo是怎么用到Netty的?

可以看到沙庐,dubbo 使用Netty還是挺簡單的鲤妥,消費者使用 NettyClient,提供者使用 NettyServer拱雏,Provider 啟動的時候棉安,會開啟端口監(jiān)聽,使用我們平時啟動 Netty一樣的方式铸抑。而ClientSpring getBean的時候贡耽,會創(chuàng)建Client,當調(diào)用遠程方法的時候,將數(shù)據(jù)通過 dubbo 協(xié)議編碼發(fā)送到 NettyServer蒲赂,然后 NettServer收到數(shù)據(jù)后解碼阱冶,并調(diào)用本地方法,并返回數(shù)據(jù)滥嘴,完成一次完美的 RPC 調(diào)用木蹬。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市若皱,隨后出現(xiàn)的幾起案子镊叁,更是在濱河造成了極大的恐慌,老刑警劉巖走触,帶你破解...
    沈念sama閱讀 217,826評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件晦譬,死亡現(xiàn)場離奇詭異,居然都是意外死亡饺汹,警方通過查閱死者的電腦和手機蛔添,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,968評論 3 395
  • 文/潘曉璐 我一進店門痰催,熙熙樓的掌柜王于貴愁眉苦臉地迎上來兜辞,“玉大人,你說我怎么就攤上這事夸溶∫莩常” “怎么了?”我有些...
    開封第一講書人閱讀 164,234評論 0 354
  • 文/不壞的土叔 我叫張陵缝裁,是天一觀的道長扫皱。 經(jīng)常有香客問我,道長捷绑,這世上最難降的妖魔是什么韩脑? 我笑而不...
    開封第一講書人閱讀 58,562評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮粹污,結(jié)果婚禮上段多,老公的妹妹穿的比我還像新娘。我一直安慰自己壮吩,他們只是感情好进苍,可當我...
    茶點故事閱讀 67,611評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著鸭叙,像睡著了一般觉啊。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上沈贝,一...
    開封第一講書人閱讀 51,482評論 1 302
  • 那天杠人,我揣著相機與錄音,去河邊找鬼。 笑死嗡善,一個胖子當著我的面吹牛市俊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播滤奈,決...
    沈念sama閱讀 40,271評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼摆昧,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蜒程?” 一聲冷哼從身側(cè)響起绅你,我...
    開封第一講書人閱讀 39,166評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎昭躺,沒想到半個月后忌锯,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,608評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡领炫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,814評論 3 336
  • 正文 我和宋清朗相戀三年偶垮,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片帝洪。...
    茶點故事閱讀 39,926評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡似舵,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出葱峡,到底是詐尸還是另有隱情砚哗,我是刑警寧澤,帶...
    沈念sama閱讀 35,644評論 5 346
  • 正文 年R本政府宣布砰奕,位于F島的核電站弃鸦,受9級特大地震影響洗鸵,放射性物質(zhì)發(fā)生泄漏帕翻。R本人自食惡果不足惜屏箍,卻給世界環(huán)境...
    茶點故事閱讀 41,249評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望胸哥。 院中可真熱鬧涯竟,春花似錦、人聲如沸烘嘱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,866評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蝇庭。三九已至醉鳖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間哮内,已是汗流浹背盗棵。 一陣腳步聲響...
    開封第一講書人閱讀 32,991評論 1 269
  • 我被黑心中介騙來泰國打工壮韭, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人纹因。 一個月前我還...
    沈念sama閱讀 48,063評論 3 370
  • 正文 我出身青樓喷屋,卻偏偏與公主長得像,于是被迫代替她去往敵國和親瞭恰。 傳聞我的和親對象是個殘疾皇子屯曹,可洞房花燭夜當晚...
    茶點故事閱讀 44,871評論 2 354