dubbo技術(shù)內(nèi)幕十一 Endpoint 之 Client

image.png

如上所示伟葫,在dubbo的自介里面拧晕,是這樣介紹exchange層與transport層的
exchange 信息交換層:封裝請求響應(yīng)模式十酣,同步轉(zhuǎn)異步,以 Request, Response 為中心蚊丐,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 網(wǎng)絡(luò)傳輸層:抽象 mina 和 netty 為統(tǒng)一接口,以 Message 為中心艳吠,擴(kuò)展接口為 Channel, Transporter, Client, Server, Codec

exchange封裝語義信息麦备,等于說操作的是Request Response的語義封裝,而transport層做為傳輸層是不care任何的語義信息的昭娩,它只負(fù)責(zé)單向的數(shù)據(jù)傳輸凛篙。

為了理解上面所述,我們先看下dubbo里面提供的Endpoint接口题禀。

public interface Endpoint {

    //當(dāng)前Endpoint對應(yīng)的url(這個(gè)url是dubbo提供的一個(gè)信息載體)
    URL getUrl();

    //當(dāng)前Endpoint對應(yīng)的ChannelHandler
    ChannelHandler getChannelHandler();

    //當(dāng)前Endpoint對應(yīng)的本地地址
    InetSocketAddress getLocalAddress();

    //使用Endpoint發(fā)送消息
    void send(Object message) throws RemotingException;

    //與上相同鞋诗,添加的sent用于冪等校驗(yàn)
    void send(Object message, boolean sent) throws RemotingException;

   //關(guān)閉Endpoint
    void close();

    //在timeout時(shí)間內(nèi)關(guān)閉Endpoint
    void close(int timeout);
    //開始關(guān)閉Endpoint
    void startClose();
    //Endpoint是否已經(jīng)關(guān)閉
    boolean isClosed();

}

dubbo為什么抽象出Endpoint的概念,因?yàn)閷τ贑lient和Service來說迈嘹,其實(shí)都是一個(gè)Endpoint削彬,只是語義上的不同,兩者在物理上是p2p的概念秀仲,所以用Endpoint來統(tǒng)一抽象一下融痛。如下


image.png

Client和Service都extends于Endpoint,這里有個(gè)很關(guān)鍵的概念要說下神僵,dubbo里面的Client到底是個(gè)啥雁刷,Client就是對應(yīng)一個(gè)虛擬機(jī)嗎?如果你這樣理解的話保礼,會(huì)有很多不理解的地方沛励,在dubbo里面,每個(gè)DubboInvoker都內(nèi)置了一個(gè)ExchangeClient[] clients;默認(rèn)情況下是一個(gè)client炮障,所以如果refer的dubbo Provider 服務(wù)處于不同的虛擬機(jī)的話目派,一個(gè)虛擬機(jī)里面會(huì)有多個(gè)Client(根據(jù)refer的地址來緩存共享Client)。
所以看到上面的圖我們就明白了胁赢,為啥Client是繼承Channel的企蹭,因?yàn)橐粋€(gè)Client只關(guān)聯(lián)到一個(gè)遠(yuǎn)程的Provider的server,那么其關(guān)聯(lián)的Channel也就是一對一的智末,所以我們可以這樣的繼承和實(shí)現(xiàn)谅摄。
我們接著看下Channel接口的實(shí)現(xiàn),Channel是連接p2p的通道系馆,

public interface Channel extends Endpoint {

    //拿到遠(yuǎn)程地址送漠,從這個(gè)方法的語義出發(fā),我們可以看大Channel是關(guān)聯(lián)到Client上的
    InetSocketAddress getRemoteAddress();

    //是否已經(jīng)連接由蘑,那么肯定是針對服務(wù)端來說的螺男,就是是否已經(jīng)連接上了服務(wù)端
    boolean isConnected();

    
    boolean hasAttribute(String key);

   
    Object getAttribute(String key);

   
    void setAttribute(String key, Object value);

   
    void removeAttribute(String key);

}

關(guān)于Attribute的幾個(gè)簡單方法我們先不談棒厘,從Channel的方法的語義上分析,我們可以定位到Channel是為Client而準(zhǔn)備的下隧。
故而我們可以看到Client直接的實(shí)現(xiàn)了對Channel的繼承,源碼如下

public interface Client extends Endpoint, Channel, Resetable {

   //從新連接
    void reconnect() throws RemotingException;

    @Deprecated
    void reset(com.alibaba.dubbo.common.Parameters parameters);

}

我們可以看到Client只是新增了一個(gè)reconnect方法谓媒,因?yàn)槟J(rèn)是使用tcp來打開keepalive連接的,那么如果發(fā)生網(wǎng)絡(luò)抖動(dòng)句惯,我們是需要重連的土辩,保證通道的可用。
而接口Resetable抢野,表明實(shí)現(xiàn)類可以通過url進(jìn)行屬性的重新設(shè)置拷淘,在前面說過,dubbo里面的peer端都是通過url這個(gè)對象進(jìn)行屬性的更新和設(shè)置的指孤。

public interface Resetable {

    void reset(URL url);

}

我們再看下Server接口的定義

public interface Server extends Endpoint, Resetable {

   //是否已經(jīng)綁定到本地启涯,也就是是否啟動(dòng)成功,因?yàn)槭荢erver恃轩,所以不用connect到遠(yuǎn)程
   //只需要監(jiān)聽bind的本地port就可以
    boolean isBound();

    //拿到所有連接到此Server的Channel结洼,可以認(rèn)為這個(gè)Channel是對Client的抽象
    Collection<Channel> getChannels();

     //根據(jù)遠(yuǎn)程地址(也就是Client的地址)返回綁定的Channel
    Channel getChannel(InetSocketAddress remoteAddress);

    @Deprecated //作廢,不介紹叉跛,使用Resetable里面的reset方法松忍。
    void reset(com.alibaba.dubbo.common.Parameters parameters);

}

由于exchange 信息交換層:封裝請求響應(yīng)模式,同步轉(zhuǎn)異步筷厘,以 Request, Response 為中心鸣峭,擴(kuò)展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
我們通過看前面的接口發(fā)現(xiàn)Channel只有單向send能力(繼承于Endpoint接口),而ExchangeChannel位于exchange層酥艳,具有了接收response的能力摊溶,我們看接口源碼

public interface ExchangeChannel extends Channel {

    //抽象出request方法,在Endpont的簡單的send的基礎(chǔ)上抽象出了request和response的能力
    ResponseFuture request(Object request) throws RemotingException;

    //帶超時(shí)時(shí)間的request方法
    ResponseFuture request(Object request, int timeout) throws RemotingException;

    //后面再分析這個(gè)
    ExchangeHandler getExchangeHandler();

    //指定時(shí)間內(nèi)關(guān)閉這個(gè)close玖雁。
    void close(int timeout);

}

而針對 Channel的不同的狀態(tài)事件更扁,我們綁定ChannelHandler來進(jìn)行邏輯上的處理,接口定義如下

@SPI
public interface ChannelHandler {

    //當(dāng)channel connected時(shí)的處理方法
    void connected(Channel channel) throws RemotingException;

    //當(dāng)channel disconnected時(shí)的處理方法
    void disconnected(Channel channel) throws RemotingException;

    //當(dāng)channel send message時(shí)的處理方法
    void sent(Channel channel, Object message) throws RemotingException;

    //當(dāng) channel  received message時(shí)的處理方法
    void received(Channel channel, Object message) throws RemotingException;

    //當(dāng)channel caught exception時(shí)的處理方法
    void caught(Channel channel, Throwable exception) throws RemotingException;

}

ChannelHandler不關(guān)心語義的處理赫冬,而ExchangeHandler關(guān)心語義浓镜,所以其添加了reply方法,如下

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {

    //其中的request是具有語義的request或是response
    Object reply(ExchangeChannel channel, Object request) throws RemotingException;

}

好劲厌,看了如上之后膛薛,我們看下transport層的Channel的繼承體系,如下


image.png

可以看到transport的Channel對Netty补鼻,Grizzly哄啄,Mina的Channel做了統(tǒng)一的封裝雅任,這樣用戶可以靈活的使用spi進(jìn)行配置。
我們看下AbstractChannel的源碼如下

public abstract class AbstractChannel extends AbstractPeer implements Channel {

    public AbstractChannel(URL url, ChannelHandler handler) {
        super(url, handler);
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (isClosed()) {
            throw new RemotingException(this, "Failed to send message "
                    + (message == null ? "" : message.getClass().getName()) + ":" + message
                    + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress());
        }
    }

    @Override
    public String toString() {
        return getLocalAddress() + " -> " + getRemoteAddress();
    }

}

基本沒有實(shí)現(xiàn)什么功能咨跌,只是簡單的做了一個(gè)Channel的狀態(tài)的判斷沪么。所有的實(shí)現(xiàn)都由子類來實(shí)現(xiàn)。AbstractPeer只是為了抽象出p2p的概念锌半,很簡單禽车,不用介紹。
由于最出名的nio框架還是netty刊殉,我們選取NettyChannel進(jìn)行分析殉摔,如下

final class NettyChannel extends AbstractChannel {

    private static final Logger logger = LoggerFactory.getLogger(NettyChannel.class);
    //這里封裝了netty的Channel與dubbo自己封裝的NettyChannel的緩存關(guān)系
    private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
    //這個(gè)是實(shí)際引用的netty的Channel
    private final Channel channel;
    //屬性
    private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
    //構(gòu)造函數(shù)為私有,所以只能自己調(diào)用 
    private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel; //典型的適配器模式记焊,將netty的Channel適配成了dubbo的NettyChannel
    }
    //將netty的Channel封裝成dubbo的NettyChannel
    static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
            if (ch.isActive()) {
                ret = channelMap.putIfAbsent(ch, nettyChannel);
            }
            if (ret == null) {
                ret = nettyChannel;
            }
        }
        return ret;
    }
    //如果Channel  Disconnected 逸月,對緩存進(jìn)行移除
    static void removeChannelIfDisconnected(Channel ch) {
        if (ch != null && !ch.isActive()) {
            channelMap.remove(ch);
        }
    }

    //典型的代理模式
    public InetSocketAddress getLocalAddress() {
        return (InetSocketAddress) channel.localAddress();
    }

    //拿到遠(yuǎn)程Server的地址
    public InetSocketAddress getRemoteAddress() {
        return (InetSocketAddress) channel.remoteAddress();
    }

    //是否Connected
    public boolean isConnected() {
        return !isClosed() && channel.isActive();
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);

        boolean success = true;
        int timeout = 0;
        try {
            ChannelFuture future = channel.writeAndFlush(message);
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.cause();
            if (cause != null) {
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if (!success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

    @Override
    public void close() {
        try {
            super.close();
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            removeChannelIfDisconnected(channel);
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            attributes.clear();
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (logger.isInfoEnabled()) {
                logger.info("Close netty channel " + channel);
            }
            channel.close();
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
    }

    @Override
    public boolean hasAttribute(String key) {
        return attributes.containsKey(key);
    }

    @Override
    public Object getAttribute(String key) {
        return attributes.get(key);
    }

    @Override
    public void setAttribute(String key, Object value) {
        if (value == null) { // The null value unallowed in the ConcurrentHashMap.
            attributes.remove(key);
        } else {
            attributes.put(key, value);
        }
    }

    @Override
    public void removeAttribute(String key) {
        attributes.remove(key);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((channel == null) ? 0 : channel.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        NettyChannel other = (NettyChannel) obj;
        if (channel == null) {
            if (other.channel != null) return false;
        } else if (!channel.equals(other.channel)) return false;
        return true;
    }

    @Override
    public String toString() {
        return "NettyChannel [channel=" + channel + "]";
    }

}

如上可以看到NettyChannel的能力都是借用了netty的Channel的能力。NettyChannel只是做了簡單的封裝遍膜。

由于在客戶端碗硬,Channel與Client一一對應(yīng),我們看下transport層的Client的繼承體系捌归,如下


image.png

從繼承關(guān)系看由于AbstractClient繼承于ChannelHandler肛响,所以其擁有了自我handler的能力。
我們先看下AbstractPeer的源碼如下

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    //代理的ChannelHandler
    private final ChannelHandler handler;
    //url就是配置信息
    private volatile URL url;

    //是否在關(guān)閉中
    private volatile boolean closing;
    //是否已經(jīng)關(guān)閉
    private volatile boolean closed;

    public AbstractPeer(URL url, ChannelHandler handler) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.url = url;
        this.handler = handler;
    }

    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }

    @Override
    public void close() {
        closed = true;
    }

    @Override
    public void close(int timeout) {
        close();
    }

    @Override
    public void startClose() {
        if (isClosed()) {
            return;
        }
        closing = true;
    }

    @Override
    public URL getUrl() {
        return url;
    }

    protected void setUrl(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        this.url = url;
    }

    @Override
    public ChannelHandler getChannelHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        } else {
            return handler;
        }
    }

    /**
     * @return ChannelHandler
     */
    @Deprecated
    public ChannelHandler getHandler() {
        return getDelegateHandler();
    }

    /**
     * Return the final handler (which may have been wrapped). This method should be distinguished with getChannelHandler() method
     *
     * @return ChannelHandler
     */
    public ChannelHandler getDelegateHandler() {
        return handler;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }

    public boolean isClosing() {
        return closing && !closed;
    }

    @Override
    public void connected(Channel ch) throws RemotingException {
        if (closed) {
            return;
        }
        handler.connected(ch);
    }

    @Override
    public void disconnected(Channel ch) throws RemotingException {
        handler.disconnected(ch);
    }

    @Override
    public void sent(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.sent(ch, msg);
    }

    @Override
    public void received(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.received(ch, msg);
    }

    @Override
    public void caught(Channel ch, Throwable ex) throws RemotingException {
        handler.caught(ch, ex);
    }
}

如上可以看到AbstractPeer內(nèi)部引用了一個(gè)ChannelHandler來實(shí)現(xiàn)ChannelHandler的功能惜索,典型的裝飾模式特笋。
我們再看看AbstractEndpoint的源碼如下

public abstract class AbstractEndpoint extends AbstractPeer implements Resetable {

    private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class);
    //內(nèi)部封裝的編解碼器,對于transport層的Endpoint來說巾兆,我操作的都是字符數(shù)組猎物,怎么編解碼需要Codec2來負(fù)責(zé)黏包,半包的問題角塑。
    private Codec2 codec;
    //超時(shí)時(shí)間
    private int timeout;
    //鏈接超時(shí)時(shí)間
    private int connectTimeout;

    public AbstractEndpoint(URL url, ChannelHandler handler) {
        super(url, handler);
        //通過url拿到自定義的(如果有的話)的codec
        this.codec = getChannelCodec(url);
       //通過url拿到自定義的(如果有的話)的timeout
        this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // //通過url拿到自定義的(如果有的話)的connectTimeout
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
    }

    protected static Codec2 getChannelCodec(URL url) {
        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
        if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
            return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
        } else {
            return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                    .getExtension(codecName));
        }
    }
   //根據(jù)url重置timeout與connectTimeout屬性
    @Override
    public void reset(URL url) {
        if (isClosed()) {
            throw new IllegalStateException("Failed to reset parameters "
                    + url + ", cause: Channel closed. channel: " + getLocalAddress());
        }
        try {
            if (url.hasParameter(Constants.TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.timeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.connectTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.CODEC_KEY)) {
                this.codec = getChannelCodec(url);
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    @Deprecated
    public void reset(com.alibaba.dubbo.common.Parameters parameters) {
        reset(getUrl().addParameters(parameters.getParameters()));
    }

    protected Codec2 getCodec() {
        return codec;
    }

    protected int getTimeout() {
        return timeout;
    }

    protected int getConnectTimeout() {
        return connectTimeout;
    }

}

如上可以看到AbstractEndpoint封裝了Codec2和兩個(gè)超時(shí)的屬性蔫磨。
我們再看下AbstractClient。

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    //對應(yīng)的線程池的名字的前綴
    protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
     //對應(yīng)的線程池的名字id
    private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
   //定時(shí)任務(wù)圃伶,用于client的connect狀態(tài)的檢測和重連
    private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
   //connect時(shí)的鎖
    private final Lock connectLock = new ReentrantLock();
   //在send的時(shí)候堤如,如果send_reconnect為ture,那么會(huì)檢查下client的狀態(tài)窒朋,如果是disconnect狀態(tài)搀罢,那么會(huì)發(fā)起連接
    private final boolean send_reconnect;
   //重連的次數(shù)的統(tǒng)計(jì)
    private final AtomicInteger reconnect_count = new AtomicInteger(0);
    // Reconnection error log has been called before?
    private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
    // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
    private final int reconnect_warning_period;
    private final long shutdown_timeout;
    //client對應(yīng)的線程池
    protected volatile ExecutorService executor;
    //重連返回的結(jié)果的占位符Future
    private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
    // the last successed connected time
    private long lastConnectedTime = System.currentTimeMillis();


    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        //默認(rèn)是false
        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
        //默認(rèn)15分鐘
        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        try {
            //子類實(shí)現(xiàn)
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
        try {
            // connect.
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close();
                throw t;
            } else {
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }
       //得到這個(gè)Client關(guān)聯(lián)的線程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    }

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

    /**
     * @param url
     * @return 0-false  Reconnect間隔時(shí)間
     */
    private static int getReconnectParam(URL url) {
        int reconnect;
        String param = url.getParameter(Constants.RECONNECT_KEY);
        if (param == null || param.length() == 0 || "true".equalsIgnoreCase(param)) {
            reconnect = Constants.DEFAULT_RECONNECT_PERIOD;
        } else if ("false".equalsIgnoreCase(param)) {
            reconnect = 0;
        } else {
            try {
                reconnect = Integer.parseInt(param);
            } catch (Exception e) {
                throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
            }
            if (reconnect < 0) {
                throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
            }
        }
        return reconnect;
    }

    /**
     * init reconnect thread 這個(gè)重連線程會(huì)在client斷開的時(shí)候發(fā)起重連
     */
    private synchronized void initConnectStatusCheckCommand() {
        //reconnect=false to close reconnect
        int reconnect = getReconnectParam(getUrl());
        if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) {
            Runnable connectStatusCheckCommand = new Runnable() {
                @Override
                public void run() {
                    try {
                        if (!isConnected()) {
                            connect();
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) {
                        String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                            if (!reconnect_error_log_flag.get()) {
                                reconnect_error_log_flag.set(true);
                                logger.error(errorMsg, t);
                                return;
                            }
                        }
                        if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0) {
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }
   //不進(jìn)行重連的定時(shí)操作
    private synchronized void destroyConnectStatusCheckCommand() {
        try {
            if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) {
                reconnectExecutorFuture.cancel(true);
                reconnectExecutorService.purge();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }
    //創(chuàng)建新的線程池
    protected ExecutorService createExecutor() {
        return Executors.newCachedThreadPool(new NamedThreadFactory(CLIENT_THREAD_POOL_NAME + CLIENT_THREAD_POOL_ID.incrementAndGet() + "-" + getUrl().getAddress(), true));
    }

    public InetSocketAddress getConnectAddress() {
        return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort());
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        Channel channel = getChannel();
        if (channel == null)
            return getUrl().toInetSocketAddress();
        return channel.getRemoteAddress();
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        Channel channel = getChannel();
        if (channel == null)
            return InetSocketAddress.createUnresolved(NetUtils.getLocalHost(), 0);
        return channel.getLocalAddress();
    }
   //還是拋給了channel去判斷
    @Override
    public boolean isConnected() {
        Channel channel = getChannel();
        if (channel == null)
            return false;
        return channel.isConnected();
    }

    @Override
    public Object getAttribute(String key) {
        Channel channel = getChannel();
        if (channel == null)
            return null;
        return channel.getAttribute(key);
    }

    @Override
    public void setAttribute(String key, Object value) {
        Channel channel = getChannel();
        if (channel == null)
            return;
        channel.setAttribute(key, value);
    }

    @Override
    public void removeAttribute(String key) {
        Channel channel = getChannel();
        if (channel == null)
            return;
        channel.removeAttribute(key);
    }

    @Override
    public boolean hasAttribute(String key) {
        Channel channel = getChannel();
        if (channel == null)
            return false;
        return channel.hasAttribute(key);
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        //如果send_reconnect為true,那么進(jìn)行狀態(tài)檢測侥猩,如果狀態(tài)是非連接態(tài)榔至,進(jìn)行重連
        if (send_reconnect && !isConnected()) {
            connect();
        }
        //留給子類實(shí)現(xiàn)
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        //通過channel將消息發(fā)出去
        channel.send(message, sent);
    }
    //發(fā)起連接
    protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            if (isConnected()) {
                return;
            }
            initConnectStatusCheckCommand();
            //核心方法由子類實(shí)現(xiàn)
            doConnect();
            if (!isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", channel is " + this.getChannel());
                }
            }
            reconnect_count.set(0);
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: " + e.getMessage(), e);
        } finally {
            connectLock.unlock();
        }
    }

    public void disconnect() {
        connectLock.lock();
        try {
            destroyConnectStatusCheckCommand();
            try {
                Channel channel = getChannel();
                if (channel != null) {
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                //核心方法由子類實(shí)現(xiàn)
                doDisConnect();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        } finally {
            connectLock.unlock();
        }
    }
    
    //提供重連的方法,先斷開欺劳,再連接
    @Override
    public void reconnect() throws RemotingException {
        disconnect();
        connect();
    }
    

//close的時(shí)候先銷毀外圍資源
    @Override
    public void close() {
        try {
            if (executor != null) {
                ExecutorUtil.shutdownNow(executor, 100);
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            super.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            disconnect();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            doClose();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    @Override
    public void close(int timeout) {
        ExecutorUtil.gracefulShutdown(executor, timeout);
        close();
    }

    @Override
    public String toString() {
        return getClass().getName() + " [" + getLocalAddress() + " -> " + getRemoteAddress() + "]";
    }

    /**
     * Open client.
     *
     * @throws Throwable
     */
    protected abstract void doOpen() throws Throwable;

    /**
     * Close client.
     *
     * @throws Throwable
     */
    protected abstract void doClose() throws Throwable;

    /**
     * Connect to server.
     *
     * @throws Throwable
     */
    protected abstract void doConnect() throws Throwable;

    /**
     * disConnect to server.
     *
     * @throws Throwable
     */
    protected abstract void doDisConnect() throws Throwable;

    /**
     * Get the connected channel.
     *
     * @return channel
     */
    protected abstract Channel getChannel();

}

如上我們可以看到AbstractClient對Client的鏈接態(tài)做了判斷和重連機(jī)制唧取,所有的核心實(shí)現(xiàn)還是由子類來實(shí)現(xiàn)我們看下子類是怎么實(shí)現(xiàn)的铅鲤,由于netty最為有名,我們選取NettyClient做分析枫弟。
源碼如下

public class NettyClient extends AbstractClient {

    private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

    private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
    //netty客戶端啟動(dòng)類
    private Bootstrap bootstrap;
    //volatile,使用引用的方式進(jìn)行使用
    private volatile Channel channel; // volatile, please copy reference to use

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
         //wrapChannelHandler我們后面分析
        super(url, wrapChannelHandler(url, handler));
    }
    
    //開啟客戶端邢享,其實(shí)就是對Client進(jìn)行設(shè)置,為connect前做準(zhǔn)備
    @Override
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        bootstrap = new Bootstrap();
        bootstrap.group(nioEventLoopGroup)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                //緩存使用池化技術(shù)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
                .channel(NioSocketChannel.class);
       //鏈接超時(shí)時(shí)間設(shè)置成3秒之上
        if (getConnectTimeout() < 3000) {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        } else {
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
        }

        bootstrap.handler(new ChannelInitializer() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                       //設(shè)置編碼器
                        .addLast("decoder", adapter.getDecoder())
                       //設(shè)置解碼器
                        .addLast("encoder", adapter.getEncoder())
                        //設(shè)置handler媒区,其實(shí)就是本身
                        .addLast("handler", nettyClientHandler);
            }
        });
    }

    @Override
    protected void doConnect() throws Throwable {
        long start = System.currentTimeMillis();
        ChannelFuture future = bootstrap.connect(getConnectAddress());
        try {
            boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
            //發(fā)起連接
            if (ret && future.isSuccess()) {
                Channel newChannel = future.channel();
                try {
                   //銷毀原來關(guān)聯(lián)的老的channel
                    // Close old channel
                    Channel oldChannel = NettyClient.this.channel; // copy reference
                    if (oldChannel != null) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                            }
                            oldChannel.close();
                        } finally {
                            NettyChannel.removeChannelIfDisconnected(oldChannel);
                        }
                    }
                } finally {
                    if (NettyClient.this.isClosed()) {
                        try {
                            if (logger.isInfoEnabled()) {
                                logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                            }
                            newChannel.close();
                        } finally {
                            NettyClient.this.channel = null;
                            NettyChannel.removeChannelIfDisconnected(newChannel);
                        }
                    } else {
                       //將新生成的channl賦值給當(dāng)前驼仪,這樣就完成了整個(gè)的初始化
                        NettyClient.this.channel = newChannel;
                    }
                }
            } else if (future.cause() != null) {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
            } else {
                throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                        + getRemoteAddress() + " client-side timeout "
                        + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
            }
        } finally {
            if (!isConnected()) {
                //future.cancel(true);
            }
        }
    }
  
  //關(guān)閉連接,其實(shí)就是移除緩存袜漩,為啥不把這個(gè)nioEventLoopGroup也干掉呢,因?yàn)檫@個(gè)是共享的湾碎,別人還在用
    @Override
    protected void doDisConnect() throws Throwable {
        try {
            NettyChannel.removeChannelIfDisconnected(channel);
        } catch (Throwable t) {
            logger.warn(t.getMessage());
        }
    }

    @Override
    protected void doClose() throws Throwable {
        //can't shutdown nioEventLoopGroup
        //nioEventLoopGroup.shutdownGracefully();
    }
  
    //返回當(dāng)前的channel并將其緩存并封裝成dubbo的channel進(jìn)行返回
    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isActive())
            return null;
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }

}

如上的代碼還是比較容易的宙攻。如上是對transport層的client的分析,對于transport來說沒有任何的業(yè)務(wù)含義在里面介褥,所以接下來我們繼續(xù)的分析exchange層的client座掘。
整個(gè)exchange層的client的繼承關(guān)系如下


image.png

前面我們分析過,Client與Channel是一一對應(yīng)的柔滔,所以dubbo抽象出了個(gè)接口ExchangeClient溢陪,源碼如下

public interface ExchangeClient extends Client, ExchangeChannel {

}

而例外一個(gè)實(shí)現(xiàn)類HeaderExchangeChannel是在exchange層具有業(yè)務(wù)語義的Channel,我們看下其源碼實(shí)現(xiàn)。

final class HeaderExchangeChannel implements ExchangeChannel {

    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);

    private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";
   //內(nèi)部封裝的channel睛廊,我們可以看到這是個(gè)裝飾模式形真,給沒有業(yè)務(wù)語義的channel
  //添加業(yè)務(wù)語義
    private final Channel channel;

    private volatile boolean closed = false;

    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        this.channel = channel;
    }
    //
    static HeaderExchangeChannel getOrAddChannel(Channel ch) {
        if (ch == null) {
            return null;
        }
        HeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);
        if (ret == null) {
            ret = new HeaderExchangeChannel(ch);
            if (ch.isConnected()) {
                ch.setAttribute(CHANNEL_KEY, ret);
            }
        }
        return ret;
    }

    static void removeChannelIfDisconnected(Channel ch) {
        if (ch != null && !ch.isConnected()) {
            ch.removeAttribute(CHANNEL_KEY);
        }
    }

    @Override
    public void send(Object message) throws RemotingException {
        send(message, getUrl().getParameter(Constants.SENT_KEY, false));
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
       //如果message是Request 或者 Response 或者String(兼容telnet)直接的發(fā)送
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            //不然將其封裝成Request進(jìn)行發(fā)送
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            //單向發(fā)送,不關(guān)注返回結(jié)果超全,即時(shí)的返回即可
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }
   
  //發(fā)送咆霜,并拿到返回結(jié)果,這里就有了response的信息了
    @Override
    public ResponseFuture request(Object request) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    }

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        //需要返回結(jié)果嘶朱,會(huì)進(jìn)行超時(shí)等待返回結(jié)果
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    // graceful close
    @Override
    public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
            long start = System.currentTimeMillis();
            while (DefaultFuture.hasFuture(channel)
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }

    @Override
    public void startClose() {
        channel.startClose();
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        return channel.getLocalAddress();
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        return channel.getRemoteAddress();
    }

    @Override
    public URL getUrl() {
        return channel.getUrl();
    }

    @Override
    public boolean isConnected() {
        return channel.isConnected();
    }

    @Override
    public ChannelHandler getChannelHandler() {
        return channel.getChannelHandler();
    }

    @Override
    public ExchangeHandler getExchangeHandler() {
        return (ExchangeHandler) channel.getChannelHandler();
    }

    @Override
    public Object getAttribute(String key) {
        return channel.getAttribute(key);
    }

    @Override
    public void setAttribute(String key, Object value) {
        channel.setAttribute(key, value);
    }

    @Override
    public void removeAttribute(String key) {
        channel.removeAttribute(key);
    }

    @Override
    public boolean hasAttribute(String key) {
        return channel.hasAttribute(key);
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((channel == null) ? 0 : channel.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null) return false;
        if (getClass() != obj.getClass()) return false;
        HeaderExchangeChannel other = (HeaderExchangeChannel) obj;
        if (channel == null) {
            if (other.channel != null) return false;
        } else if (!channel.equals(other.channel)) return false;
        return true;
    }

    @Override
    public String toString() {
        return channel.toString();
    }

}

如上我們可以看到HeaderExchangeChannel已經(jīng)有了Request和Response的業(yè)務(wù)語義了蛾坯。
即時(shí)是在exchange層,Channel也是底層的實(shí)現(xiàn)疏遏,我們業(yè)務(wù)需要關(guān)心的是Client脉课。
我們看下ExchangeClient的幾個(gè)實(shí)現(xiàn)。
ReferenceCountExchangeClient略過财异,只是簡單的記錄對某個(gè)ExchangeClient的引用倘零。
HeaderExchangeClient底層也是使用了ExchangeChannel但是擁有了heartbeat的能力。
LazyConnectExchangeClient 只有在真正的發(fā)起連接的時(shí)候宝当,才初始化客戶端视事。
這3個(gè)Client其實(shí)都是針對Channel的封裝,使用很簡單庆揩,我們就不說了俐东。

最后跌穗,我們看下exchange與transport層是如何關(guān)聯(lián)起來的,其實(shí)在LazyConnectExchangeClient已經(jīng)有了虏辫,其有個(gè)initClient方法蚌吸,我們看下

 private void initClient() throws RemotingException {
        if (client != null)
            return;
        if (logger.isInfoEnabled()) {
            logger.info("Lazy connect to " + url);
        }
        connectLock.lock();
        try {
            if (client != null)
                return;
            //這一句是重點(diǎn)
            this.client = Exchangers.connect(url, requestHandler);
        } finally {
            connectLock.unlock();
        }
    }

我們再看Exchangers里面的connect方法,如下

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
//
public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

Exchanger其實(shí)就是對exchange層的最高級(jí)的封裝砌庄,如下

public interface Exchanger {

    //生成服務(wù)端的ExchangeServer 
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;

    //生成客戶端的ExchangeClient
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;

}

其具體實(shí)現(xiàn)是HeaderExchanger
如下

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
       //返回的就是我們前面講的HeaderExchangeClient
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

照這樣分析羹唠,ExchangeClient應(yīng)該是封裝了一個(gè)Transporter層的client,果不其然娄昆,
我們看

Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))

的具體返回佩微。
Transporters是對Transporter層的最高級(jí)封裝,如下

public class Transporters {

    static {
        // check duplicate jar package 防癡呆設(shè)計(jì)萌焰,防止重復(fù)的jar沖突
        Version.checkDuplicate(Transporters.class);
        Version.checkDuplicate(RemotingException.class);
    }

    private Transporters() {
    }

 
    public static Client connect(String url, ChannelHandler... handler) throws RemotingException {
        return connect(URL.valueOf(url), handler);
    }

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            //
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

}

而getTransporter方法哺眯,默認(rèn)返回的就是我們前面分析的NettyClient,這樣整個(gè)就串起來扒俯。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末奶卓,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子撼玄,更是在濱河造成了極大的恐慌夺姑,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件掌猛,死亡現(xiàn)場離奇詭異盏浙,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)留潦,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門只盹,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人兔院,你說我怎么就攤上這事殖卑。” “怎么了坊萝?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵孵稽,是天一觀的道長。 經(jīng)常有香客問我十偶,道長菩鲜,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任惦积,我火速辦了婚禮接校,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己蛛勉,他們只是感情好鹿寻,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著诽凌,像睡著了一般毡熏。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上侣诵,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天痢法,我揣著相機(jī)與錄音,去河邊找鬼杜顺。 笑死财搁,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的躬络。 我是一名探鬼主播妇拯,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼洗鸵!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起仗嗦,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤膘滨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后稀拐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體火邓,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年德撬,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了铲咨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,117評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蜓洪,死狀恐怖纤勒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情隆檀,我是刑警寧澤摇天,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站恐仑,受9級(jí)特大地震影響泉坐,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜裳仆,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一腕让、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧歧斟,春花似錦纯丸、人聲如沸偏形。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽壳猜。三九已至,卻和暖如春滑凉,著一層夾襖步出監(jiān)牢的瞬間统扳,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工畅姊, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留咒钟,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓若未,卻偏偏與公主長得像朱嘴,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子粗合,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容