dubbo的調(diào)用過程

一、消費(fèi)者發(fā)起請求

1.1 調(diào)用入口

在@Reference注入的bean的invoke方法,即Invoker.invoke。

public interface Invoker<T> extends Node {
    /**
     * get service interface.
     *
     * @return service interface.
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     */
    Result invoke(Invocation invocation) throws RpcException;
}

然后依次調(diào)用的路徑:
MockClusterInvoker.invoke -> //用來支持mock
AbstractClusterInvoker.invoke -> //用來加載指定的負(fù)載均衡策略
FailoverClusterInvoker.doInvoke -> //用來負(fù)載均衡選擇具體哪個提供者的invoker
AbstractInvoker.invoke -> 用來初始化一些數(shù)據(jù)
DubboInvoker.doInvoke -> 用來執(zhí)行invoke調(diào)用
HeaderExchangeClient.request -> 用來執(zhí)行網(wǎng)絡(luò)調(diào)用請求
HeaderExchangeChannel.request -> 用來執(zhí)行封裝Request請求
AbstractPeer.send ->
NettyChannel.send ->
暫時到這里。想一下為啥會上上面的調(diào)用路徑呢售葡?因?yàn)檎嬲腄ubboInvoke會被包裝為很多層。比如為了滿足服務(wù)治理:使用FailoverClusterInvoker忠藤,為了滿足mock使用MockClusterInvoker挟伙,為了滿足過濾器,使用ProtocolFilterWrapper這里沒有顯示出來模孩,等等尖阔。
這里從DubboInvoker的doInvoke開始看,源碼如下:

 @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

可以看到榨咐,正常調(diào)用都是雙向的介却,且是同步的。所以會走到else里面块茁。如下:

 RpcContext.getContext().setFuture(null);
 return (Result) currentClient.request(inv, timeout).get();

這里有個關(guān)鍵的地方:用戶線程在發(fā)送完request請求后齿坷,使用get()方法阻塞本次調(diào)用的用戶線程,等待ResponseFuture的返回数焊。而該ResponseFuture的實(shí)現(xiàn)類DefaultFuture永淌。下面看下DefaultFuture的get方法:

 public Object get() throws RemotingException {
        return get(timeout);
    }

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

明顯可以看到!isDone的話,線程await超時時間佩耳,阻塞這里等待回來的數(shù)據(jù)仰禀。用戶線程到這里就結(jié)束了。等待request返回蚕愤,并喚醒該線程答恶。
下面我看下request方法:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

到這里看到future=new DefaultFuture()了。然后調(diào)用了channel.send(req)萍诱,后面就是netty的框架了悬嗓,下節(jié)會介紹。

1.2 消費(fèi)者調(diào)用過程---NIO發(fā)送請求

這里主要是介紹:NettyChannel.send方法:

public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

追蹤下去裕坊,到了:

  public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(
                new DownstreamMessageEvent(channel, future, message, remoteAddress));
        return future;
    }

目前到這里包竹,我們的調(diào)用路徑如下:


dubbo調(diào)用棧1.png

Channels.write方法源碼如上,調(diào)用了sendDownstream方法籍凝。在追蹤下去周瞎,sendDownstream源碼如下:

 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        if (e instanceof UpstreamMessageEvent) {
            throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }
        
        try {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } catch (Throwable t) {
            // Unlike an upstream event, a downstream event usually has an
            // incomplete future which is supposed to be updated by ChannelSink.
            // However, if an exception is raised before the event reaches at
            // ChannelSink, the future is not going to be updated, so we update
            // here.
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }

可以看到handleDownstream方法,由指定handler執(zhí)行饵蒂。這里使用OneToOneEncoder執(zhí)行:

 public void handleDownstream(
            ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
        if (!(evt instanceof MessageEvent)) {
            ctx.sendDownstream(evt);
            return;
        }

        MessageEvent e = (MessageEvent) evt;
        Object originalMessage = e.getMessage();
        Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
        if (originalMessage == encodedMessage) {
            ctx.sendDownstream(evt);
        } else if (encodedMessage != null) {
            write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
        }
    }

encode方法就是模板方法了声诸,實(shí)現(xiàn)由子類來實(shí)現(xiàn)。然后退盯,追蹤到在NettyCodecAdapter里面彼乌,有個內(nèi)部類InternalEncoder實(shí)現(xiàn)了encode方法。


    @Sharable
    private class InternalEncoder extends OneToOneEncoder {

        @Override
        protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                    com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }
    }

最后渊迁,這里的codec.encode(channel, buffer, msg)委托給了DubboCountCodec.encode
總結(jié):

這節(jié)內(nèi)容把消費(fèi)者的send請求和編碼慰照,序列化等底層操作結(jié)合起來了。不能追蹤到netty層就不向下了琉朽,其實(shí)dubbo拓展了很多netty的類毒租。導(dǎo)致雖然調(diào)用已經(jīng)走到netty框架,但是很多業(yè)務(wù)處理箱叁,netty還需要回調(diào)netty拓展的功能墅垮。這種細(xì)節(jié)還是不能馬虎,需要搞懂蝌蹂。舉個例子:org.jboss.netty.handler.codec.oneone.OneToOneEncoder是netty的抽象類噩斟,必須由子類實(shí)現(xiàn),然后netty在調(diào)用的時候孤个,會調(diào)用dubbo實(shí)現(xiàn)的子類剃允。這里是InternalEncoder類。環(huán)環(huán)相扣=_=

緊接著上面的encode結(jié)束后齐鲤,調(diào)用了:write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress())方法斥废,繼續(xù)追蹤:

 public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
            if (prev == null) {
                try {
                    getSink().eventSunk(DefaultChannelPipeline.this, e);
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } else {
                DefaultChannelPipeline.this.sendDownstream(prev, e);
            }
        }

該方法屬于DefaultChannelPipeline.java類,prev必然為null给郊,然后調(diào)用 getSink().eventSunk(DefaultChannelPipeline.this, e)方法牡肉,繼續(xù)追蹤到NioClientSocketPipelineSink類,如下:

 public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioClientSocketChannel channel =
                (NioClientSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
                if (value != null) {
                    bind(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case CONNECTED:
                if (value != null) {
                    connect(channel, future, (SocketAddress) value);
                } else {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBuffer.offer(event);
            assert offered;
            channel.worker.writeFromUserCode(channel);
        }
    }

到了這里淆九,終于看到我們想要的事件了统锤。這里根據(jù)事件類型選擇了MessageEvent毛俏。看到調(diào)用了writeFromUserCode方法饲窿,在后面就只有netty的代碼了煌寇,調(diào)用了write0方法。這里就不在向下追蹤了逾雄,有興趣的可以自己看netty阀溶。
調(diào)用鏈路如下,緊接著上面的圖:


dubb調(diào)用棧2.png

總結(jié):到了writeFromUserCode這里鸦泳,總算把dubbo請求發(fā)送給了網(wǎng)絡(luò)银锻。剩下就是網(wǎng)絡(luò)包通過TCP/IP協(xié)議,傳到提供者ip那里去了做鹰。在消費(fèi)者發(fā)送請求的過程中击纬,一直使用了是一個線程,在線程執(zhí)行完request的send操作后誊垢,同步得到一個Future掉弛,然后一直阻塞在Future.get()方法上,等待返回值喂走。其實(shí)這個調(diào)用過程抽象出來就是:
1.服務(wù)治理選出一個最佳的提供者ip
2.執(zhí)行invoker的各種filter殃饿,類似AOP功能
3.構(gòu)造請求request,然后encode請求參數(shù)芋肠,便于網(wǎng)絡(luò)傳輸
4.把序列化好后的二進(jìn)制流傳遞給netty
5.netty把數(shù)據(jù)發(fā)送到網(wǎng)絡(luò)上

二乎芳、提供者接收請求

參考文檔:http://dubbo.apache.org/zh-cn/docs/source_code_guide/service-invoking-process.html

提供者會一直在dubbo指定的端口上,while(true)監(jiān)聽channel中有沒有數(shù)據(jù)達(dá)到帖池。通過netty的reactor模式奈惑,netty的IO線程監(jiān)聽有數(shù)據(jù)達(dá)到,然后看是什么事件睡汹,select喚起對應(yīng)的事件處理器肴甸。事件處理,由單獨(dú)的線程池去完成囚巴。先看看原在,在提供者export服務(wù)的時候,bind的代碼:

 public Channel bind(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }

        final BlockingQueue<ChannelFuture> futureQueue =
            new LinkedBlockingQueue<ChannelFuture>();

        ChannelHandler binder = new Binder(localAddress, futureQueue);
        ChannelHandler parentHandler = getParentHandler();

        ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }

        Channel channel = getFactory().newChannel(bossPipeline);

        // Wait until the future is available.
        ChannelFuture future = null;
        boolean interrupted = false;
        do {
            try {
                future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (future == null);

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        // Wait for the future.
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            future.getChannel().close().awaitUninterruptibly();
            throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
        }

        return channel;
    }

其中有段代碼:

   do {
            try {
                future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        } while (future == null);

futureQueue會阻塞獲取channel上的可讀數(shù)據(jù)彤叉,如果有數(shù)據(jù)達(dá)到庶柿,那么就喚醒監(jiān)聽線程,調(diào)用decode對數(shù)據(jù)進(jìn)行解碼秽浇。

bind前面還有段代碼:

     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());

pipeline會設(shè)置channel的處理器鏈浮庐。在接收請求后,decoder結(jié)束后柬焕,下個處理器是handler审残。然后下面就分析梭域,nettyHandler。

2.1 服務(wù)提供者暴露服務(wù)的Server初始化過程

下面以dubbo為例:
DubboProtocol.export方法的源碼如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

創(chuàng)建server的核心方法在openServer维苔,如下:

   private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

核心方法在createServer碰辅,如下:

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

核心方法在Exchangers.bind方法,其中requestHandler是dubboProtocol類內(nèi)部重寫的內(nèi)部類介时。它是提供者bean接收方法要處理的最底層處理邏輯。下面看看它的核心方法凌彬,reply方法沸柔,如下:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

把message轉(zhuǎn)換為dubbo 語義下的Invocation,該Invocation包含了調(diào)用方法铲敛,調(diào)用參數(shù)等等全面的信息褐澎,足夠運(yùn)行執(zhí)行了。下面我們在看下這個real handler上面包裝了哪些其他handler伐蒋,并成為了一個怎樣的過濾器鏈工三?繼續(xù)看bind方法。一路追蹤下去先鱼,看下HeaderExchanger的bind方法俭正,如下:

  public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

看到第一個構(gòu)造過濾器鏈的handler了,就是HeaderExchangeHandler焙畔。繼續(xù)跟進(jìn)Transporters.bind方法掸读,如下:

 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

繼續(xù)跟蹤下去,到new NettyServer宏多,handler還是上面的HeaderExchangeHandler儿惫。它的構(gòu)造方法如下:

  public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

在創(chuàng)建nettyServer的時候,會調(diào)用ChannelHandlers.wrap方法伸但,構(gòu)造一個handler的過濾器模式肾请。代碼如下:

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

可以看到wrap方法調(diào)用了wrapInternal方法。該方法更胖,先new MultiMessageHandler铛铁,然后在HeartBeatHandler,再Dispatcher的Handler函喉。而它是通過SPI去指定的避归。所以,這個過濾器鏈就構(gòu)成了在執(zhí)行的時候管呵,按照從前向后的順序梳毙,執(zhí)行鏈。
總結(jié):整個過濾器鏈算是構(gòu)造好了捐下,MultiMessageHandler -> HeartBeatHandler -> AllDispatcher -> HeaderExchanger -> dubbo real handler
MultiMessageHandler這些Handler都是使用了裝飾器模式账锹,對傳入的handler進(jìn)行了裝飾行為萌业。

這一節(jié)分析,其實(shí)和第四節(jié)分析的很像奸柬。因?yàn)樗麄兲幚砟P投际菍Φ鹊纳辍L峁┱遰eceived request和消費(fèi)者received reponse,兩者過程其實(shí)都很相似廓奕。都是把realHandler一層一層封裝抱婉,最終給到netty的pepiline管道。

三桌粉、提供者發(fā)送結(jié)果

上面已經(jīng)分析了提供者接收到nettp的tcp數(shù)據(jù)message后蒸绩。給到MultiMessageHandler.received方法,然后在給到HeartBeatHandler.received铃肯,再給到AllChannelHandler.received患亿。我們就來看下AllChannelHandler的received方法。如下:

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executorService = getExecutorService();
        try {
            executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO 臨時解決線程池滿后異常信息無法發(fā)送到對端的問題押逼。待重構(gòu)
            //fix 線程池滿了拒絕調(diào)用不返回步藕,導(dǎo)致消費(fèi)者一直等待超時
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
                            "detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
        }
    }

看到這行:

            executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));

直接把netty線程傳遞過來的message傳遞給線程池的ChannelEventRunnable,然后就返回了挑格。剩下的由dubbo線程池處理咙冗,這里如果dubbo線程池滿了,是無法處理任何請求的恕齐。咱們看下ChannelEventRunnable的run方法乞娄,看看它到底干了啥:

 @Override
    public void run() {
        switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case RECEIVED:
                try {
                    handler.received(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
                break;
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

很明顯,這次事件是接收請求显歧,那么就是RECEIVED了仪或。看到執(zhí)行了handler.received方法士骤。這個handler我們上面分析了范删,是DecodeHandler(初始化的時候指定的)。進(jìn)去看下:

public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

肯定會走到decode(request)這里拷肌。然后解碼完畢后到旦,執(zhí)行handler。這里的handler是HeaderExchangeHandler巨缘,也是初始化就指定的了添忘。進(jìn)去看下它的received方法:

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

對于request,最終會走到:handler.received(exchangeChannel, request.getData())這里若锁。
繼續(xù)handler搁骑,這個handler就是dubboProtocol的內(nèi)部匿名類requestHandler:

@Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

繼續(xù)看reply方法:

 @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要處理高版本調(diào)用低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

getInvoke得到實(shí)現(xiàn)類的代理類,然后調(diào)用實(shí)現(xiàn)類的代理類,執(zhí)行真正的業(yè)務(wù)邏輯得到結(jié)果仲器。那么這個結(jié)果煤率,怎么發(fā)送給消費(fèi)者呢?猜測肯定是用send方法發(fā)送乏冀。但是在哪里呢蝶糯?
答案就在我們上面看到的HeaderExchangeHandler的received方法里面。再看下:

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

看到如下代碼:

 if (request.isTwoWay()) {
      Response response = handleRequest(exchangeChannel, request);
      channel.send(response);
         } else {
             handler.received(exchangeChannel, request.getData());
              }

如果是雙向通行辆沦,表示消費(fèi)者正在等著結(jié)果呢昼捍。所以,這里handleRequest結(jié)束后众辨,就要把response結(jié)果send出去端三。通過channel.send出去。下面我們就要分析這個channel是怎么初始化的鹃彻,以及怎么傳遞進(jìn)來的。
這個channel是在NettyHandler收到message的時候生成的妻献。
NettyHandler.messageReceived方法的源碼如下:

 @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

這個channel肯定就是netty用來和消費(fèi)者通信的socket蛛株。channel是流,里面有數(shù)據(jù)在流淌著育拨,用戶可以從這channel里面拿到數(shù)據(jù)谨履。這個channel也提供發(fā)送接收數(shù)據(jù)的能力。 channel.send(response)方法熬丧,我們就知道了這個channel就是nettyChannel笋粟。源碼如下:

public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.write(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
                    + ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
                    e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

只要通過channel.write(message),把response的數(shù)據(jù)write到channel就完事了析蝴。我們也知道channel是可讀可寫的

到此結(jié)束害捕,提供方接收請求 -> 處理請求 -> 發(fā)送請求,都全部分析結(jié)束闷畸。
這里多一嘴尝盼,其實(shí)消費(fèi)者調(diào)用請求的send也是走這里的。不信可以驗(yàn)證佑菩,為了簡單起盾沫,我們直接定位到DubboInvoker的doInvoke方法:

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

看到最后一段代碼:

 else {
         RpcContext.getContext().setFuture(null);
         return (Result) currentClient.request(inv, timeout).get();
            }

追蹤到HeaderExchangeChannel.request方法:

 public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

channel.send最后又到了nettyChannel.send方法。和剛剛發(fā)送結(jié)果的send是一模一樣殿漠。都是往channel上write數(shù)據(jù)赴精。這個數(shù)據(jù)可以是request也可以是response,對channel來說無所謂绞幌。它僅僅是雙方用來通信的管道蕾哟,里面是啥數(shù)據(jù),兩邊是啥角色,channel都不關(guān)心渐苏。

《完》

四掀潮、消費(fèi)者接收結(jié)果

先看下消費(fèi)者請求接收結(jié)果的堆棧。我們把斷點(diǎn)設(shè)置到AllChannelHandler.received方法的第一行琼富。如下:


image.png

重下往上看仪吧,一直到SimpleChannelHandler.handleUpstream都是netty的代碼,在往上一層就是dubbo的代碼了鞠眉∈硎螅可能有人會問,你為啥就知道斷點(diǎn)到這里呢械蹋?我們看下消費(fèi)者初始化過程就清楚了出皇。這里是網(wǎng)絡(luò)相關(guān),我們猜想建立網(wǎng)絡(luò)連接一定在消費(fèi)者初始化client的時候哗戈。所以我們一下子就可以定位到DubboProtocol的refer方法中的getClients(url)郊艘。看下getClients方法唯咬,最終到initClient方法纱注。看到:

client = Exchangers.connect(url, requestHandler);

好了胆胰,我們現(xiàn)在就要研究這個方法了狞贱。requestHandler肯定就是出站數(shù)據(jù)和入站數(shù)據(jù),我們要如何處理的具體hanlder了蜀涨。你可能有疑問瞎嬉?我們看下這個handler實(shí)現(xiàn)的接口好了,它實(shí)現(xiàn)了ChannelHandler接口厚柳,其中該接口有sent和received方法氧枣,就代表了出站和入站。還有不懂草娜,自己去研究下就會了挑胸。
進(jìn)入connect源碼看下:

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

看到connect方法,繼續(xù)向下:

 public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

這里我們看到realHandler先被HeaderExchangeHandler包裝宰闰,再被DecodeHandler包裝茬贵。有人這里會奇怪,這些handler在上面的堆棧也沒出現(xiàn)啊移袍。對解藻,說明你已經(jīng)明白了。既然上面堆棧沒出現(xiàn)這里包裝的類葡盗,那是不是推測Transporters.connect里面肯定又做了很多包裝螟左。下面我們來驗(yàn)證下啡浊,繼續(xù)向下追蹤。


    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

看到handler原封不動的傳給了getTransporter().connect方法胶背。我們繼續(xù)推測巷嚣,包裝發(fā)生在Transporter.connect里面。繼續(xù)追蹤:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

追蹤到NettyTransporter钳吟,很簡單廷粒。繼續(xù)往下看:

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

是不是有點(diǎn)感覺了?wrapChannelHandler方法是不是做了包裝红且。繼續(xù)往下看:

 protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

繼續(xù)看ChannelHandlers.wrap(handler, url):

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

warp方法調(diào)用了個單例坝茎,然后調(diào)用了wrapInternal∠痉看看里面的handler嗤放,是不是很熟悉。最終傳遞給netty就是這個MultiMessageHandler壁酬,然后是HeartbeatHandler次酌,里面那個動態(tài)SPI就是AllChannelHandler∮咔牵可能有人會問為啥和措?我們就來看下dispather的Adaptive代理類長啥樣子。如下:

package com.alibaba.dubbo.remoting;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Dispatcher$Adpative implements Dispatcher {
    public Dispatcher$Adpative() {
    }

    public ChannelHandler dispatch(ChannelHandler var1, URL var2) {
        if (var2 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var2.getParameter("dispatcher", var2.getParameter("dispather", var2.getParameter("channel.handler", "all")));
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + var2.toString() + ") use keys([dispatcher, dispather, channel.handler])");
            } else {
                Dispatcher var5 = (Dispatcher)ExtensionLoader.getExtensionLoader(Dispatcher.class).getExtension(var4);
                return var5.dispatch(var1, var2);
            }
        }
    }
}

看到all了沒蜕煌,它是默認(rèn)配置,用javassist代理生成的诬留。
到這里只分析道MultiMessageHandler以后的handler斜纪,那么它前面的handler是怎么來的?
繼續(xù)new NettyClient:

public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }

這里super(url, MultiMessageHandler)文兑,進(jìn)入super看下:

 public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);

        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

        //默認(rèn)重連間隔2s盒刚,1800表示1小時warning一次.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        try {
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
        try {
            // connect.
           .....//省略
    }

看到doOpen方法,進(jìn)去看下:

@Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

看到有句代碼:

 final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);

this就是當(dāng)前NettyClient绿贞,它也是ChannelHandler的實(shí)現(xiàn)因块。它被NettyHandler包裝起來,NettyHandler繼承了然后就把它給了SimpleChannelHandler籍铁,最終傳遞給了netty的pipeline管道∥猩希現(xiàn)在終于把dubbo的hanlder和netty的handler銜接起來了。

總結(jié)下handler的包裝過程:
DubboProtocol.requestHandler < HeaderExchangeHandler < DecodeHandler < AllChannelHandler < HeartbeatHandler < MultiMessageHandler < NettyHandler拒名。我們猜測netty調(diào)用dubbo的第一個handler吩愧,一定是NettyHandler。在對比下上面的調(diào)用堆棧驗(yàn)證下增显,netty -> dubbo的第一個調(diào)用handler確實(shí)是nettyHandler雁佳。

到這里就分析結(jié)束了。但是這里我要特別說明下,我當(dāng)時看到這些包裝鏈路的時候糖权,想當(dāng)然的就把斷點(diǎn)設(shè)置到最里面那個handler堵腹,就是我們真正處理邏輯的handler。就是那個requestHandler星澳,在DubboProtocol的內(nèi)部類疚顷。如下:

// 請求處理器
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要處理高版本調(diào)用低版本的問題
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconnected from " + channel.getRemoteAddress() + ", url: " + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

它實(shí)現(xiàn)了ExchangeHandlerAdapter接口,里面當(dāng)然也有received方法募判。我當(dāng)時就把斷點(diǎn)設(shè)置到這里荡含,因?yàn)槲矣X得返回結(jié)果肯定最終會到達(dá)這里得到處理。結(jié)果就是我想錯了届垫,從它這里的實(shí)現(xiàn)也看得出释液,它也沒法處理response戒幔。我們這里來分析下:
上面調(diào)用堆椀挽看到現(xiàn)在請求結(jié)果已經(jīng)到達(dá)了AllChannelHandler雕欺,看下上面我們分析的包裝過程毕骡。下一個handler給到DecodeHandler章贞《贶裕看下DecodeHandler:


    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }

        if (message instanceof Request) {
            decode(((Request) message).getData());
        }

        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }

        handler.received(channel, message);
    }

看到解碼后慧域,繼續(xù)執(zhí)行handler的received方法侄非。繼續(xù)向下看登淘,到HeaderExchangeHandler箫老。

public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

看到message instanceof Response后,里面的handleResponse黔州。進(jìn)去看下:

static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

重點(diǎn)來了耍鬓,它壓根就沒把reponse委托給下面的requestHandler,自己給處理了流妻。牲蜀。。我想當(dāng)然的以為一定會傳遞給最后一個handler處理呢绅这,沒想到到倒數(shù)第二個handler后自己給處理完了涣达,不向下傳遞了。所以解決了我在dubboProtocol那里斷點(diǎn)证薇,死活斷點(diǎn)不上(異步線程的斷點(diǎn)不在這里討論)度苔。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市棕叫,隨后出現(xiàn)的幾起案子林螃,更是在濱河造成了極大的恐慌,老刑警劉巖俺泣,帶你破解...
    沈念sama閱讀 211,265評論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件疗认,死亡現(xiàn)場離奇詭異完残,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)横漏,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,078評論 2 385
  • 文/潘曉璐 我一進(jìn)店門谨设,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人缎浇,你說我怎么就攤上這事扎拣。” “怎么了素跺?”我有些...
    開封第一講書人閱讀 156,852評論 0 347
  • 文/不壞的土叔 我叫張陵二蓝,是天一觀的道長。 經(jīng)常有香客問我指厌,道長刊愚,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,408評論 1 283
  • 正文 為了忘掉前任踩验,我火速辦了婚禮鸥诽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘箕憾。我一直安慰自己牡借,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,445評論 5 384
  • 文/花漫 我一把揭開白布袭异。 她就那樣靜靜地躺著钠龙,像睡著了一般。 火紅的嫁衣襯著肌膚如雪御铃。 梳的紋絲不亂的頭發(fā)上俊鱼,一...
    開封第一講書人閱讀 49,772評論 1 290
  • 那天,我揣著相機(jī)與錄音畅买,去河邊找鬼。 笑死细睡,一個胖子當(dāng)著我的面吹牛谷羞,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播溜徙,決...
    沈念sama閱讀 38,921評論 3 406
  • 文/蒼蘭香墨 我猛地睜開眼湃缎,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了蠢壹?” 一聲冷哼從身側(cè)響起嗓违,我...
    開封第一講書人閱讀 37,688評論 0 266
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎图贸,沒想到半個月后蹂季,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冕广,經(jīng)...
    沈念sama閱讀 44,130評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,467評論 2 325
  • 正文 我和宋清朗相戀三年偿洁,在試婚紗的時候發(fā)現(xiàn)自己被綠了撒汉。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,617評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡涕滋,死狀恐怖睬辐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情宾肺,我是刑警寧澤溯饵,帶...
    沈念sama閱讀 34,276評論 4 329
  • 正文 年R本政府宣布,位于F島的核電站锨用,受9級特大地震影響丰刊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜黔酥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,882評論 3 312
  • 文/蒙蒙 一藻三、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧跪者,春花似錦棵帽、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,740評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至忘衍,卻和暖如春逾苫,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背枚钓。 一陣腳步聲響...
    開封第一講書人閱讀 31,967評論 1 265
  • 我被黑心中介騙來泰國打工铅搓, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人搀捷。 一個月前我還...
    沈念sama閱讀 46,315評論 2 360
  • 正文 我出身青樓星掰,卻偏偏與公主長得像,于是被迫代替她去往敵國和親嫩舟。 傳聞我的和親對象是個殘疾皇子氢烘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,486評論 2 348