Dubbo源碼之網(wǎng)絡通信

服務暴露將做哪些事情愤诱?

  1. 注冊ZK芜果,監(jiān)聽動態(tài)配置節(jié)點
  2. 開啟Server端
  3. 創(chuàng)建代理服務
  4. Exporter -> Invoker -> proxyService

服務引用將做哪些事情妆兑?

  1. 注冊ZK榜贴,監(jiān)聽動態(tài)配置節(jié)點鉴分、providr節(jié)點、路由節(jié)點
  2. 開啟Client端
  3. 創(chuàng)建代理服務
  4. proxyService -> Invoker

客戶端請求

ConsumerProxyService -> Invoker【DubboInvoker】 -> Exchanger【HeaderExchangeClient】 -> Transporter【NettyClient】 -> 編碼 -> SEND-TO-SERVER (創(chuàng)建了DefaultFuture惧蛹,Request帶唯一標識)

服務端響應

解碼 -> Transporter【NettyServer】-> 系列Handlers -> 線程池 -> Exporter#getInvoker -> Invoker#invoke -> ProviderProxyService -> callback 

最開始看的就是覺得這一塊涉及到的類好多啊扇救,傻傻分不清楚,沒事多看幾遍吧

Exchanger

Exchangers

門面類香嗓,提供各種便捷方法迅腔,先通過SPI獲取Exchanger,然后調(diào)用Exchanger的相關方法創(chuàng)建ExchangeServer靠娱、ExchangeClient

Exchanger

SPI接口钾挟,默認實現(xiàn)類HeaderExchanger,提供了兩個快捷方法創(chuàng)建ExchangeServer饱岸、ExchangeClient

@SPI(HeaderExchanger.NAME)
public interface Exchanger {
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
}

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

ExchangeServer

Server端使用,默認實現(xiàn)類HeaderExchangeServer徽千,內(nèi)部調(diào)用Transporter開啟Server服務

public interface ExchangeServer extends Server {
    Collection<ExchangeChannel> getExchangeChannels();

    ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);
}

ExchangeClient

Client端使用苫费,默認實現(xiàn)類HeaderExchangeClient,核心request方法双抽,內(nèi)部調(diào)用Transporter發(fā)送請求

public interface ExchangeClient extends Client, ExchangeChannel {
}

ExchangeChannel

默認實現(xiàn)類 HeaderExchangeChannel百框,作為HeaderExchangeClient的一個屬性

Transporter

Transporters

門面類,提供各種便捷方法牍汹,先通過SPI獲取Transporter铐维,然后調(diào)用Transporter的相關方法創(chuàng)建Server柬泽、Client

Transporter

SPI接口,默認實現(xiàn)類NettyTransporter嫁蛇,提供了兩個快捷方法創(chuàng)建Server锨并、Client

@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

public class NettyTransporter implements Transporter {
    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

Server

Server端使用,默認實現(xiàn)類NettyServer睬棚,用于開啟Server服務第煮,核心方法doOpen

public class NettyServer extends AbstractServer implements Server {
}

Client

Client端使用,默認實現(xiàn)類NettyClient抑党,核心request方法用于發(fā)送請求包警,doOpen用于與服務端建立連接

public class NettyClient extends AbstractClient {
}

服務端啟動服務

DubboProtocol#export =>
DubboProtocol#openServer => 
DubboProtocol#createServer =>
Exchangers#bind => 
NettyServer#doOpen

最終,在NettyServer#doOpen中通過Netty開啟了一個Server端

DubboProtocol#createServer
    => Exchangers#bind(url, requestHandler)
        => HeaderExchanger#bind(url, requestHandler)
            => return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))))
                
// Transporters#bind 語句可以拆解為 
Transporters#bind
    => NettyTransporter#bind(url, handler)
        => return new NettyServer(url, handler)
            =>  NettyServer#doOpen【NettyServer構造函數(shù)中調(diào)用了doOpen方法】

NettyServer中的hander屬性底靠,最終指向的是new DecodeHandler(new HeaderExchangeHandler(handler))害晦。最終Server端返回HeaderExchangeServer,然后在NettyServer的構造函數(shù)中暑中,對handle其實還做了一些封裝

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {}

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

所以壹瘟,最終NettyServer中的hander屬性指向MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler

客戶端連接服務

調(diào)用鏈太長了,而且隱藏的非常深痒芝,重點省略了一些俐筋,在應用啟動時為Reference對象生成Invoker時創(chuàng)建的

RegistryProtocol#doRefer =>
RegistryDirectory#subscribe =>
RegistryDirectory#toInvokers => 
ProtocolFilterWrapper#refer =>
AbstractProtocol#refer =>
DubboProtocol#protocolBindingRefer =>
DubboProtocol#getClients =>
DubboProtocol#getSharedClient =>
DubboProtocol#buildReferenceCountExchangeClientList =>
DubboProtocol#buildReferenceCountExchangeClient =>
DubboProtocol#initClient =>
Exchangers#connect =>
HeaderExchanger#connect =>
Transporters#connect =>
NettyTransporter#connect =>
NettyClient#<init> =>
NettyClient#doOpen

最終,在NettyClient#doOpen中通過Netty與Server建立連接

Exchangers#connect
    => HeaderExchanger#connect(url, handler)
        => return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true)
                
// Transporters#connect 語句可以拆解為 
Transporters#connect
    => NettyTransporter#connect(url, handler)
        => return new NettyClient(url, handler)
            =>  NettyClient#doOpen【NettyClient構造函數(shù)中調(diào)用了doOpen方法】

NettyClient中的hander屬性严衬,最終指向的是new DecodeHandler(new HeaderExchangeHandler(handler))澄者。最終Client端返回HeaderExchangeClient,其中的client屬性也對NettyClient做了包裝處理

不過在DubboProtocol#buildReferenceCountExchangeClient方法中對HeaderExchangeClient包裝了一層请琳,最終Invoker中的Client類型是ReferenceCountExchangeClient

private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
    ExchangeClient exchangeClient = initClient(url);

    return new ReferenceCountExchangeClient(exchangeClient);
}

ReferenceCountExchangeClientHeaderExchangeClient沒什么區(qū)別花鹅,只不過包裝了一層,然后還有一個比較重要的屬性referenceCount买猖,用于記錄客戶端的個數(shù)膨蛮?

客戶端發(fā)送請求

調(diào)用方代理類 ->
InvokerInvocationHandler#invoke ->
MockClusterInvoker#invoke ->
AbstractClusterInvoker#invoke【獲取LoadBalance】 -> 
FailoverClusterInvoker#doInvoke【處理重試次數(shù)】 ->
ProtocolFilterWrapper#invoke【處理Filter鏈路】 ->
AbstractInvoker#invoke【設置Attachments參數(shù)】 ->
DubboInvoker#doInvoke【Exchange交接層】 ->
ReferenceCountExchangeClient#request ->
HeaderExchangeClient#request ->
HeaderExchangeChannel#request【return CompletableFuture】 ->
AbstractPeer#send ->
AbstractClient#send ->
NettyChannel#send ->
Channel#writeAndFlush【發(fā)消息給服務端】

DubboInvoker#doInvoke開始與Exchange層交互,核心代碼如下

protected Result doInvoke(final Invocation invocation) throws Throwable {
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    // return = false竖慧,即oneWay 嫌套,可以減少不必要的Future對象創(chuàng)建
    if (isOneway) {
        // send=true,即客戶端發(fā)送之后再返回圾旨,否則直接返回
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return AsyncRpcResult.newDefaultAsyncResult(invocation);
    } else {
        AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
        CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
        asyncRpcResult.subscribeTo(responseFuture);
        RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
        return asyncRpcResult;
    }
}
ReferenceCountExchangeClient#request => 
HeaderExchangeClient#request =>  
HeaderExchangeChannel#request
// HeaderExchangeChannel.java
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

在這個方法中踱讨,有以下幾個需要注意的點:

  1. Request構造函數(shù)內(nèi)部,會為Request生成一個遞增唯一的ID砍的,用于標識該請求
  2. channel#send調(diào)用過程中痹筛,涉及到NettyChannel#getOrAddChannel方法的調(diào)用,NettyChannel中有一個ConcurrentMap<Channel, NettyChannel> CHANNEL_MAP緩存,用于維護io.netty.channel.ChannelNettyChannel 的關系
  3. channel#send調(diào)用過程中帚稠,最終會調(diào)用到NettyChannel#send方法谣旁,該方法真正的將消息發(fā)給Server端
  4. 返回的DefaultFuture是一個CompletableFuture
// NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
    boolean success = true;
    int timeout = 0;
    try {
        // 將消息發(fā)給Server
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            // 如果配置了 send=true 參數(shù),客戶端需要等待消息發(fā)出之后再返回
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

從上面消息發(fā)送的流程中滋早,好像沒有看到對消息的編碼工作榄审,那是因為在Netty客戶端初始化的時候,已經(jīng)設置了編解碼器

// NettyClient.java 
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);
    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ch.pipeline()
                    .addLast("decoder", adapter.getDecoder())
                    .addLast("encoder", adapter.getEncoder())
                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
                    .addLast("handler", nettyClientHandler);
            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);
            if(socksProxyHost != null) {
                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
                ch.pipeline().addFirst(socks5ProxyHandler);
            }
        }
    });
}

先經(jīng)過編碼器馆衔,即InternalEncoder#encode方法瘟判,InternalEncoder實現(xiàn)了MessageToByteEncoder接口,該方法內(nèi)部調(diào)用了Codec2的相關方法角溃,而Codec2是一個SPI接口拷获,默認實現(xiàn)DubboCodec

NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}

服務端響應請求

上面提到了NettyServer中的hander屬性指向 MultiMessageHandler -> HeartbeatHandler -> AllDispatcher -> DecodeHandler -> HeaderExchangeHandler
NettyServer開啟Server端的代碼如下

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
  1. 先經(jīng)過解碼器,即InternalDecoder#decode方法减细,InternalDecoder實現(xiàn)了ByteToMessageDecoder接口匆瓜,該方法內(nèi)部調(diào)用了Codec2的相關方法,而Codec2是一個SPI接口未蝌,默認實現(xiàn)DubboCodec
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class).getExtension(codecName));
    }
}
  1. MultiMessageHandler用于處理數(shù)組消息驮吱,如果是消息是MultiMessage類型,MultiMessage實現(xiàn)了Iterable數(shù)組萧吠,則遍歷調(diào)用handle的received方法左冬;否則直接調(diào)用下一個handle的received方法
  2. AllChannelHandler收到消息,將 channel handler message封裝成state為ChannelState.RECEIVED類型的ChannelEventRunnable對象纸型,然后交給線程池執(zhí)行
  3. ChannelEventRunnable#run方法中判斷state為ChannelState.RECEIVED類型拇砰,直接執(zhí)行下一個handler的received方法,即DecodeHandler狰腌,這個過程是由線程池執(zhí)行
  4. DecodeHandler#received方法中除破,如果消息是Decodeable類型,對整個消息進行解碼琼腔;如果消息是Request類型瑰枫,對Request.getData()進行解碼;如果消息是Response類型丹莲,對Response.getResult()進行解碼
  5. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleRequest -> requestHandler#reply 光坝,requestHandlerDubboProtocol中的一個屬性,ExchangeHandlerAdapter類型
  6. HeaderExchangeHandler#handleRequest中會創(chuàng)建一個Response對象甥材,它的ID屬性值盯另,就是Request對象的ID值,這樣請求和響應就關聯(lián)起來了
  7. requestHandler#reply方法中擂达,從exporterMap緩存中獲取對應的DubboExporter對象,然后從DubboExporter獲取Invoker,最后執(zhí)行Invoker#invoke方法板鬓,然后返回一個CompletableFuture對象
  8. HeaderExchangeHandler#handleRequest方法中接收返回的CompletableFuture對象悲敷,對它添加回調(diào)處理,在回調(diào)中將返回結果封裝到Response對象中俭令,然后通過channel將Response發(fā)出
// ChannelEventRunnable.java
public void run() {
    if (state == ChannelState.RECEIVED) {
        try {
            // RECEIVED 類型后德,直接執(zhí)行下一個handle的received方法,即 DecodeHandler
            handler.received(channel, message);
        } catch (Exception e) {}
    } else {
        switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
           } catch (Exception e) {}
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
           } catch (Exception e) {}
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {}
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
           } catch (Exception e) {}
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
        }
    }

}
InternalDecoder#decode
    => NettyServerHandler#channelRead
        => AbstractPeer#received
            => MultiMessageHandler#received
                => HeartbeatHandler#received
                    => AllChannelHandler#received
                    
                    ------------------ 異步執(zhí)行抄腔,放到線程池 ----------------------
                    => ChannelEventRunnable#run
                        => DecodeHandler#received
                            => DecodeHandler#decode
                                => DecodeableRpcInvocation#decode
                        => HeaderExchangeHandler#received
                            => HeaderExchangeHandler#handleRequest
                                => DubboProtocol.requestHandler#reply
                    ------------------ 異步執(zhí)行 -----------------------

                                    ----------------擴展點-------------------
                                    => ProtocolFilterWrapper.invoke
                                    => EchoFilter.invoke
                                        => ClassLoaderFilter.invoke
                                        => GenericFilter.invoke
                                            => TraceFilter.invoke
                                            => MonitorFilter.invoke
                                                => TimeoutFilter.invoke
                                                => ExceptionFilter.invoke
                                                    => InvokerWrapper.invoke
                                    -----------------擴展點-------------------
                                                        => AbstractProxyInvoker#invoke
                                                            => JavassistProxyFactory.AbstractProxyInvoker#doInvoke
                                                                => 代理類#invokeMethod
                                                                    => 真正的service方法


                            //把接收處理的結果瓢湃,數(shù)據(jù)發(fā)回consumer  future#whenComplete                                                              
                            => channel.send(response)
                                => HeaderExchangeChannel
                                    => NettyChannel.send
                                        => NioSocketChannel#writeAndFlush(message)                                                  

服務端發(fā)送結果

HeaderExchangeChannel#send =>
NettyChannel#send => 
NioSocketChannel#writeAndFlush(message) 

客戶端響應結果

在客戶端啟動的時候,入?yún)andler和服務端的handler是同一個

// DubboProtocol#initClient
Exchangers.connect(url, requestHandler);

// HeaderExchanger#connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

Transporters#connect =>
    NettyTransporter#connect
        return NettyClient

NettyClient構造函數(shù)中赫蛇,對handler做了包裝

ChannelHandlers.wrap(handler, url)

public class ChannelHandlers {
    private static ChannelHandlers INSTANCE = new ChannelHandlers();
    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }
    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

所以绵患,最終NettyClient中的handler屬性指向 MultiMessageHandler -> HeartbeatHandler -> AllChannelHandler -> DecodeHandler -> HeaderExchangeHandler -> requestHandler ,和服務端處理流程一樣一樣

  1. 接收消息悟耘,經(jīng)過MultiMessageHandler落蝙、HeartbeatHandler 處理,到達 AllDispatcher
  2. AllChannelHandler中將消息封裝成new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)類型暂幼,交由線程池執(zhí)行
  3. 線程池執(zhí)行任務筏勒,經(jīng)過DecodeHandler到達HeaderExchangeHandler
  4. HeaderExchangeHandler#received -> HeaderExchangeHandler#handleResponse -> DefaultFuture#receivedDefaultFuture中維護了一個請求ID和DefaultFuture的映射關系旺嬉,Request和Response通過請求ID可以一一對應
public static void received(Channel channel, Response response, boolean timeout) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            Timeout t = future.timeoutCheckTask;
            if (!timeout) {
                t.cancel();
            }
            future.doReceived(response);
        } else {
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

private void doReceived(Response res) {
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        this.complete(res.getResult());
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
    } else {
        this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
    }
}
  1. 通過response.Id獲取DefaultFuture
  2. 執(zhí)行CompletableFuture#complete方法可以讓 執(zhí)行了CompletableFuture#get的用戶線程得到響應管行,獲取結果返回。至此整個調(diào)用過程完成

同步轉異步

可是我們在代碼中很多時候都是同步調(diào)用邪媳,很少自己去調(diào)用CompletableFuture#get方法捐顷,這一部分邏輯又是怎么處理的。在DubboInvoker#doInvoke方法中悲酷,返回的是一個AsyncRpcResult

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(PATH_KEY, getUrl().getPath());
    inv.setAttachment(VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // return = false套菜,即oneWay ,可以減少不必要的Future對象創(chuàng)建
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {c
            AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
            CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
            // 訂閱 responseFuture 设易,當 responseFuture 完成的之后逗柴,執(zhí)行 asyncRpcResult 的complete方法, 這樣用戶線程就可以響應了
            asyncRpcResult.subscribeTo(responseFuture);

            RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
            return asyncRpcResult;
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

AsyncToSyncInvoker

AsyncToSyncInvoker#invoke方法中顿肺,會判斷是同步調(diào)用還是異步調(diào)用戏溺,如果是同步調(diào)用,將調(diào)用AsyncRpcResult#get方法阻塞用戶線程屠尊,以達到同步效果

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = invoker.invoke(invocation);
    try {
        if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
            // 如果是同步調(diào)用旷祸,調(diào)用 asyncResult#get 阻塞用戶線程
            asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    } catch (InterruptedException e) {
        throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return!  method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (ExecutionException e) {
        Throwable t = e.getCause();
        if (t instanceof TimeoutException) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } else if (t instanceof RemotingException) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    } catch (Throwable e) {
        throw new RpcException(e.getMessage(), e);
    }
    return asyncResult;
}
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市讼昆,隨后出現(xiàn)的幾起案子托享,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件闰围,死亡現(xiàn)場離奇詭異赃绊,居然都是意外死亡,警方通過查閱死者的電腦和手機羡榴,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門碧查,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人校仑,你說我怎么就攤上這事忠售。” “怎么了迄沫?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵稻扬,是天一觀的道長。 經(jīng)常有香客問我邢滑,道長腐螟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任困后,我火速辦了婚禮乐纸,結果婚禮上,老公的妹妹穿的比我還像新娘摇予。我一直安慰自己汽绢,他們只是感情好,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布侧戴。 她就那樣靜靜地躺著宁昭,像睡著了一般。 火紅的嫁衣襯著肌膚如雪酗宋。 梳的紋絲不亂的頭發(fā)上积仗,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天,我揣著相機與錄音蜕猫,去河邊找鬼寂曹。 笑死,一個胖子當著我的面吹牛回右,可吹牛的內(nèi)容都是我干的隆圆。 我是一名探鬼主播,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼翔烁,長吁一口氣:“原來是場噩夢啊……” “哼渺氧!你這毒婦竟也來了?” 一聲冷哼從身側響起蹬屹,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤侣背,失蹤者是張志新(化名)和其女友劉穎白华,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贩耐,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡衬鱼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了憔杨。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡蒜胖,死狀恐怖消别,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情台谢,我是刑警寧澤寻狂,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布,位于F島的核電站朋沮,受9級特大地震影響蛇券,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜樊拓,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一纠亚、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧筋夏,春花似錦蒂胞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至赴叹,卻和暖如春鸿染,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背乞巧。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工涨椒, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摊欠。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓丢烘,卻偏偏與公主長得像,于是被迫代替她去往敵國和親些椒。 傳聞我的和親對象是個殘疾皇子播瞳,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容