dubbo之Protocol(協(xié)議)

前言

前面我們講到的服務(wù)暴露流程中,沒(méi)有涉及Protocol相關(guān)操作歹撒,本篇我們來(lái)看Protocol的實(shí)現(xiàn)。 本文將重點(diǎn)分析Protocol的核心功能秋柄,以及Protocol的設(shè)計(jì)與實(shí)現(xiàn)原理庸队。從Protocol接口開(kāi)始积蜻,逐個(gè)分析具體Protocol的實(shí)現(xiàn)闯割。Protocol是dubbo中協(xié)議的抽象,負(fù)責(zé)服務(wù)的暴露竿拆、引用宙拉,在dubbo整個(gè)框架設(shè)計(jì)中位于protocol層(遠(yuǎn)程調(diào)用層),在exchange層(信息交換層)之上(各層間依賴(lài)關(guān)系:protocol -> exchange -> transport -> serialize)如输。Protocol接口支持SPI擴(kuò)展鼓黔,默認(rèn)SPI實(shí)現(xiàn)是DubboProtocol,同時(shí)支持方法級(jí)SPI。接口核心方法包括export(服務(wù)暴露)不见、refer(服務(wù)引用)澳化、destroy(協(xié)議銷(xiāo)毀)。AbstractProtocol作為基類(lèi)實(shí)現(xiàn)稳吮,提供默認(rèn)destroy方法實(shí)現(xiàn)缎谷。方便理解起見(jiàn),我把Protocol的SPI擴(kuò)展實(shí)現(xiàn)分為兩種灶似,第一種直接實(shí)現(xiàn)Protocol接口列林,這里暫且稱(chēng)之為Protocol代理,包括QosProtocolWrapper酪惭、RegistryProtocol希痴、ProtocolFilterWrapper、ProtocolListenerWrapper春感;第二種繼承AbstractProtocol基類(lèi)砌创,具體實(shí)現(xiàn)export、refer方法功能鲫懒,最具代表性的是DubboProtocol嫩实,但是RedisProtocol、MemcachedProtocol窥岩、MockProtocol僅提供服務(wù)引用功能(僅實(shí)現(xiàn)refer接口)甲献,不支持服務(wù)暴露;另外颂翼,有一部分Protocol通過(guò)繼承AbstractProxyProtocol(當(dāng)然晃洒,AbstractProxyProtocol繼承了AbstractProtocol)實(shí)現(xiàn),AbstractProxyProtocol內(nèi)部引入ProxyFactory朦乏,借助Hessian锥累、Spring的httpInvoker、Spring的RMI集歇、apache的cxf(webService框架)等Rpc桶略、RMI、Service框架實(shí)現(xiàn)服務(wù)的暴露和引用。類(lèi)之間的繼承關(guān)系如下

Protocol-UML (2).jpg

一际歼、Protocol接口

上面提到惶翻,Protocol對(duì)外暴露的核心接口有export(服務(wù)暴露)、refer(服務(wù)引用)鹅心、destroy(協(xié)議銷(xiāo)毀)吕粗,dubbo默認(rèn)采用的協(xié)議是DubboProtocol,其中服務(wù)暴露和服務(wù)引用支持方法級(jí)SPI旭愧。核心方法介紹如下:

1.1颅筋、 服務(wù)暴露

服務(wù)暴露,即對(duì)外暴露可用服務(wù)输枯,其本質(zhì)是創(chuàng)建socket連接议泵,并綁定url,對(duì)外暴露服務(wù)端口

export方法有以下幾點(diǎn)需要注意
1桃熄、收到請(qǐng)求后先口,會(huì)記錄請(qǐng)求源地址到 RpcContext,RpcContext.getContext().setRemoteAddress();
2瞳收、export方法要求冪等碉京,即對(duì)于同一個(gè)URL來(lái)說(shuō)調(diào)用一次與調(diào)用n次的結(jié)果一致;
3螟深、參數(shù)Invoker由框架傳遞谐宙,protocol無(wú)需關(guān)注。

1.2界弧、服務(wù)引用

refer方法有以下幾點(diǎn)需要注意
1卧惜、用戶調(diào)用refer方法返回的Invoker對(duì)象的invoke方法時(shí),protocol需要相應(yīng)的執(zhí)行Invoker的invoke方法夹纫;
2、由protocol負(fù)責(zé)refer方法返回的Invoker的具體實(shí)現(xiàn);
3设凹、當(dāng)URL中設(shè)置了check=false時(shí)舰讹,連接失敗時(shí)protocol應(yīng)該嘗試恢復(fù)而不是拋異常。

1.3 闪朱、協(xié)議銷(xiāo)毀

destroy方法有以下幾點(diǎn)需要注意
1月匣、銷(xiāo)毀方法需要取消當(dāng)前協(xié)議暴露和引用的所有服務(wù);
2、釋放所有占用的資源奋姿,比如連接锄开、端口等;
3称诗、當(dāng)被銷(xiāo)毀之后萍悴,protocol可以繼續(xù)暴露或者引用新服務(wù)。

一、Protocol代理

Protocol的代理實(shí)現(xiàn)主要有QosProtocolWrapper癣诱、ProtocolFilterWrapper计维、ProtocolListenerWrapper、RegistryProtocol撕予。其中OosProtocolWrapper提供服務(wù)的Qos鲫惶,即為dubbo服務(wù)提供Qos保障(Qos 指利用各種基礎(chǔ)技術(shù),為指定的網(wǎng)絡(luò)通信提供更好的服務(wù)能力实抡,是網(wǎng)絡(luò)的一種安全機(jī)制欠母, 用來(lái)解決網(wǎng)絡(luò)延遲和阻塞等問(wèn)題,通常Oos的關(guān)鍵指標(biāo)包括可用性吆寨、吞吐量赏淌、時(shí)延、時(shí)延變化(包括抖動(dòng)和漂移)和丟失)鸟废。嚴(yán)格來(lái)講猜敢,RegistryProtocol并非Protocol的代理實(shí)現(xiàn)(dubbo中對(duì)代理類(lèi)的定義是,構(gòu)造方法有且僅有一個(gè)參數(shù)盒延,即被代理類(lèi)對(duì)象)缩擂,但RegistryProtocol的服務(wù)暴露、引用均由內(nèi)部的protocol引用實(shí)現(xiàn)添寺,所以將其歸類(lèi)到代理胯盯,一并進(jìn)行解析。

1计露、QosProtocolWrapper

QosProtocolWrapper是Protocol的代理實(shí)現(xiàn)博脑,提供服務(wù)的Qos保障,需要注意的是票罐,dubbo默認(rèn)開(kāi)啟Qos叉趣,即默認(rèn)qos.enable=true;另外该押,只有協(xié)議采用registry時(shí)疗杉,才會(huì)啟動(dòng)Qos;Qos的實(shí)現(xiàn)借助Netty蚕礼,打開(kāi)本地端口(*默認(rèn)端口22222*)烟具,創(chuàng)建并啟動(dòng)Server。支持telnet使用奠蹬,格式:telnet ip port朝聋。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // protocol使用registry時(shí),才會(huì)啟動(dòng)Qos服務(wù)囤躁,否則直接使用代理的protocol實(shí)現(xiàn)服務(wù)暴露
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // protocol使用registry時(shí)冀痕,才會(huì)啟動(dòng)Qos荔睹,復(fù)雜直接使用代理的protocol實(shí)現(xiàn)服務(wù)引用
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        startQosServer(url);
        return protocol.refer(type, url);
    }
    return protocol.refer(type, url);
}

來(lái)看Qos服務(wù)啟動(dòng)的邏輯,核心邏輯是創(chuàng)建Server并啟動(dòng)(這里的Server是個(gè)單獨(dú)的工具類(lèi)),比較簡(jiǎn)單金度,直接來(lái)看Server的啟動(dòng)邏輯

public void start() throws Throwable {
    if (!started.compareAndSet(false, true)) {
        return;
    }
    boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true));
    worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
    // 創(chuàng)建NettyServer綁定端口
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(boss, worker);
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
        }
    });
    try {
        serverBootstrap.bind(port).sync();
        logger.info("qos-server bind localhost:" + port);
    } catch (Throwable throwable) {
        logger.error("qos-server can not bind localhost:" + port, throwable);
        throw throwable;
    }
}

2应媚、ProtocolFilterWrapper

ProtocolFilterWrapper的核心邏輯就是buildInvokerChain,用于構(gòu)建invoker的調(diào)用鏈猜极,前面介紹buildInvokerChain方法時(shí)中姜,做過(guò)相關(guān)介紹,這里不過(guò)多解析跟伏。

3丢胚、ProtocolListenerWrapper

ProtocolListenerWrapper同樣是Protocol的代理實(shí)現(xiàn),借助ExporterListener(支持SPI擴(kuò)展,無(wú)實(shí)質(zhì)邏輯)受扳、InvokerListener(支持SPI擴(kuò)展携龟,無(wú)實(shí)質(zhì)邏輯),用于提供服務(wù)暴露勘高、引用實(shí)例的代理峡蟋,只有protocol非registry時(shí)才會(huì)返回實(shí)例代理。僅需關(guān)注核心方法export华望、refer蕊蝗,邏輯也比較簡(jiǎn)單

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // 協(xié)議值為registry時(shí),直接執(zhí)行服務(wù)暴露,否則返回服務(wù)暴露后的代理實(shí)例
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 協(xié)議值為registry時(shí)赖舟,直接執(zhí)行服務(wù)的引用,否則返回的代理實(shí)例
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                            .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}

上面的代碼中蓬戚,ListenerExporterWrapper與ListenerInvokerWrapper的邏輯比較簡(jiǎn)單,構(gòu)造方法中分別執(zhí)行ExporterListener.export宾抓、InvokerListener.invoke方法的代理邏輯子漩。

4、RegistryProtocol

RegistryProtocol石洗,即注冊(cè)中心協(xié)議實(shí)現(xiàn)幢泼,服務(wù)暴露多了創(chuàng)建注冊(cè)中心和服務(wù)注冊(cè)邏輯。當(dāng)URL中protocol=registry時(shí)讲衫,采用該協(xié)議缕棵。分析RegisgtryProtocol過(guò)程中,Registry的實(shí)現(xiàn)我們以全部以Registry的默認(rèn)SPI實(shí)現(xiàn)即DubboRegistry為例(后續(xù)會(huì)對(duì)Registry專(zhuān)門(mén)開(kāi)篇解析)焦人。RegistryProtocol的服務(wù)暴露包括:Url構(gòu)建、服務(wù)本地暴露重父、注冊(cè)中心構(gòu)建花椭、服務(wù)注冊(cè)幾個(gè)步驟

4.1 服務(wù)暴露

1、構(gòu)建URL

第一步房午,構(gòu)建registryUrl矿辽、providerUrl、overrideSubscribeUrl,以當(dāng)前Invoker中的URL(暫且叫做originUrl)為參照袋倔,按照原型模式創(chuàng)建雕蔽。registryUrl,即將originUrl中procotol值替換為registry值宾娜,從下面的示例可以清楚看出URL構(gòu)建的過(guò)程及結(jié)果批狐。

// 當(dāng)前Invoker中url值
originUrl = "registry://127.0.0.1:9090?export=dubbo://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?notify=true&methods=test1,test2&side=con&side=consumer"

// registryUrl,即將protocol值由registry變更為dubbo(默認(rèn)注冊(cè)中心實(shí)現(xiàn)DubboRegistry)
registryUrl = "dubbo://127.0.0.1:9090?export=dubbo://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?notify=true&methods=test1,test2&side=con&side=consumer"

//  providerUrl,即originUrl中export值
providerUrl = "dubbo://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?methods=test1,test2&notify=true&side=consumer"

// overrideSubscribeUrl,即將originUrl中protocol值替換為provider,
//并新增category參數(shù)前塔,值為configurators嚣艇,為后面providerUrl的參數(shù)融合做準(zhǔn)備
overrideSubscribeUrl = "provider://127.0.0.1:9453/org.apache.dubbo.registry.protocol.DemoService:1.0.0?category=configurators&check=false&methods=test1,test2&notify=true&side=consumer" 

完成相關(guān)URL構(gòu)建之后,進(jìn)行URL參數(shù)融合华弓,核心代碼如下食零,重點(diǎn)關(guān)注overrideUrlWithConfig方法。providerConfigurationListener寂屏、serviceConfigurationListener構(gòu)建過(guò)程中贰谣,會(huì)加載動(dòng)態(tài)配置(關(guān)于配置的解析詳見(jiàn)dubbo之配置(Configuration))到對(duì)應(yīng)listener的configurators,通過(guò)listener的overrideUrl方法將configurators配置融合到providerUrl參數(shù)

// 構(gòu)建信息融合listener
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 動(dòng)態(tài)配置信息融合到provider
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);

// 配置信息融合
private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
            // 應(yīng)用級(jí)配置信息融合到providerUrl
        providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
        ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
            // 服務(wù)級(jí)配置信息融合到providerUrl
        serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
        return serviceConfigurationListener.overrideUrl(providerUrl);
    }

有一點(diǎn)需要注意迁霎,ServiceConfigurationListener初始化過(guò)程中會(huì)從動(dòng)態(tài)配置拉取最新配置信息吱抚,然后執(zhí)行reExport(重新暴露),入口在process方法,邏輯如下:

protected final void initWith(String key) {
    //動(dòng)態(tài)配置
    DynamicConfiguration dynamicConfiguration = DynamicConfiguration.getDynamicConfiguration();
    // 如果是nopDynamicConfiguration欧引,那么這里什么都不會(huì)做
    dynamicConfiguration.addListener(key, this);
    String rawConfig = dynamicConfiguration.getConfig(key);
    if (!StringUtils.isEmpty(rawConfig)) {
        //直接根據(jù)字符串配置频伤,賦值configurators.并執(zhí)行notifyOverride模板方法
        process(new ConfigChangeEvent(key, rawConfig));
    }
}

configurators的初始化不再過(guò)多解析,重點(diǎn)關(guān)注process方法中調(diào)用的notifyOverrides模板方法芝此,以ProviderConfigurationListener為例憋肖,notifyOverrides實(shí)現(xiàn)如下:

@Override
protected void notifyOverrides() {
    overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
}

繼續(xù)看方法內(nèi)部,執(zhí)行OverrideListener的doOverrideIfNecessary方法婚苹,邏輯比較簡(jiǎn)單岸更,對(duì)比配置覆蓋前后的url,若不一致膊升,則重新暴露

public synchronized void doOverrideIfNecessary() {
    final Invoker<?> invoker;
    // 原始invoker或者invoker代理
    if (originInvoker instanceof InvokerDelegate) {
        invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
    } else {
        invoker = originInvoker;
    }
    //The origin invoker怎炊,從Invoker中解析出原始URL(非注冊(cè)中心url)
    URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<?> exporter = bounds.get(key);
    if (exporter == null) {
        logger.warn(new IllegalStateException("error state, exporter should not be null"));
        return;
    }
    //The current, may have been merged many times
    URL currentUrl = exporter.getInvoker().getUrl();
    //Merged with this configuration,所有configurator參數(shù)合并到原始URL,生成新url
    URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
    newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
            .getConfigurators(), newUrl);
    newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
    // 如果當(dāng)前url與合并后的新url不一致廓译,那么评肆,重新暴露新的url
    if (!currentUrl.equals(newUrl)) {
        RegistryProtocol.this.reExport(originInvoker, newUrl);
        logger.info("exported provider url changed, origin url: " + originUrl +
                ", old export url: " + currentUrl + ", new export url: " + newUrl);
    }
}

重新暴露即reExport方法分為兩部分,服務(wù)重新暴露和服務(wù)重新注冊(cè)

public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
    // update local exporter非区,本地exporter更新,檢查本地Exporter瓜挽,通過(guò)代理protocol重新暴露
    ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);
    // update registry
    URL registryUrl = getRegistryUrl(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(newInvokerUrl, registryUrl);

    //decide if we need to re-publish
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.getProviderWrapper(registeredProviderUrl, originInvoker);
    ProviderInvokerWrapper<T> newProviderInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
    /**
     * Only if the new url going to Registry is different with the previous one should we do unregister and register.
     * 原始provider已注冊(cè),且原始providerUrl與新的providerUrl不一致征绸,執(zhí)行重新注冊(cè)操作(注銷(xiāo)久橙、重新注冊(cè))
     */
    if (providerInvokerWrapper.isReg() && !registeredProviderUrl.equals(providerInvokerWrapper.getProviderUrl())) {
        unregister(registryUrl, providerInvokerWrapper.getProviderUrl());
        register(registryUrl, registeredProviderUrl);
        newProviderInvokerWrapper.setReg(true);
    }
    //更新注冊(cè)的url
    exporter.setRegisterUrl(registeredProviderUrl);
}

2俄占、服務(wù)本地暴露

服務(wù)本地暴露,先從緩存中取之前的暴露結(jié)果淆衷,取通過(guò)protocol引用執(zhí)行服務(wù)暴露缸榄。這里實(shí)際執(zhí)行的是RegistryProtocol中通過(guò)IOC注入的protocol(默認(rèn)是DubboProtocol),也就是說(shuō)實(shí)際執(zhí)行的是DubboProtocol的export祝拯,執(zhí)行完成之后甚带,包裝結(jié)果Invoker到ExporterChangeableWrapper,并返回鹿驼。來(lái)看代碼:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                //本地暴露欲低,即暴露invoker并緩存到bounds
                final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                // 生成代理類(lèi)exporter
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}

3、構(gòu)建注冊(cè)中心

注冊(cè)中心通過(guò)RegistryFactory構(gòu)建畜晰,簡(jiǎn)單介紹一下RegistryFactory砾莱,RegistryFactory接口支持SPI,默認(rèn)SPI實(shí)現(xiàn)是DubboRegistryFactory凄鼻,也就是說(shuō)除非URL中指定了registry值腊瑟,否則默認(rèn)采用DubboRegistry。我們以DubboRegistry為例來(lái)解析注冊(cè)中心的構(gòu)建過(guò)程:

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

getRegistry方法由RegistryFactory的基類(lèi)AbstractRegistryFactory實(shí)現(xiàn)块蚌,基類(lèi)內(nèi)部定義了模板方法createRegistry闰非,由子類(lèi)實(shí)現(xiàn)。直接來(lái)看DubboRegistryFactory的createRegistry邏輯峭范〔扑桑基類(lèi)方法getRegistry中,dubbo會(huì)將當(dāng)前registryUrl中的path參數(shù)替換成org.apache.dubbo.registry.RegistryService纱控,同時(shí)補(bǔ)全methods參數(shù)值辆毡,即RegistryService聲明的方法lookup,unsubscribe,subscribe,unregister,register。那么創(chuàng)建Registry時(shí)甜害,url參數(shù)值為:

dubbo://127.0.0.1:9090/org.apache.dubbo.registry.RegistryService?callbacks=10000&connect.timeout=10000&interface=org.apache.dubbo.registry.RegistryService&lazy=true&methods=lookup,unsubscribe,subscribe,unregister,register&reconnect=false&sticky=true&subscribe.1.callback=true&timeout=10000&unsubscribe.1.callback=false

接著舶掖,根據(jù)backup參數(shù)值,url被拆分為多個(gè)url存放于url列表尔店,列表中url除地址外眨攘,其他參數(shù)完全相同。然后嚣州,借助RegistryDirectory鲫售,完成注冊(cè)中心的創(chuàng)建

@Override
public Registry createRegistry(URL url) {
    //  補(bǔ)全RegistryService相關(guān)參數(shù)
    url = getRegistryURL(url);
    List<URL> urls = new ArrayList<>();
    urls.add(url.removeParameter(Constants.BACKUP_KEY));
    String backup = url.getParameter(Constants.BACKUP_KEY);
    // 根據(jù)backUp地址,拆分成多個(gè)url该肴,url列表作為RegistryDirectory的notify參數(shù)
    if (backup != null && backup.length() > 0) {
        String[] addresses = Constants.COMMA_SPLIT_PATTERN.split(backup);
        for (String address : addresses) {
            urls.add(url.setAddress(address));
        }
    }
    // 創(chuàng)建RegistryDirectory
    RegistryDirectory<RegistryService> directory = new RegistryDirectory<>(RegistryService.class, url.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).addParameterAndEncoded(Constants.REFER_KEY, url.toParameterString()));
    // join生成虛擬Invoke
    Invoker<RegistryService> registryInvoker = cluster.join(directory);
    // 創(chuàng)建RegistryService代理對(duì)象
    RegistryService registryService = proxyFactory.getProxy(registryInvoker);
    // 創(chuàng)建注冊(cè)中心
    DubboRegistry registry = new DubboRegistry(registryInvoker, registryService);
    directory.setRegistry(registry);
    // 這里的protocol就是通過(guò)ExtensionLoader的spi注入的
    directory.setProtocol(protocol);
    directory.setRouterChain(RouterChain.buildChain(url));
    // 刷新Directory內(nèi)invokers緩存
    directory.notify(urls);
    // 訂閱consumer數(shù)據(jù)情竹,刷新內(nèi)部configurators,調(diào)用registry代理的subscribe()
    directory.subscribe(new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, RegistryService.class.getName(), url.getParameters()));
    return registry;
}

4沙庐、服務(wù)注冊(cè)

注冊(cè)中心創(chuàng)建完成后鲤妥,執(zhí)行服務(wù)注冊(cè),邏輯比較簡(jiǎn)單拱雏,直接看代碼

// 注冊(cè)中心服務(wù)URL與服務(wù)提供者URL參數(shù)合并.
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// 服務(wù)提供者注冊(cè)到注冊(cè)表
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
        registryUrl, registeredProviderUrl);
//to judge if we need to delay publish,服務(wù)是否需要注冊(cè)發(fā)布棉安,默認(rèn)true
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
    // 執(zhí)行服務(wù)注冊(cè),核心邏輯即執(zhí)行registry的register铸抑,以DubboRegistry為例贡耽,將服務(wù)url放入registered緩存,然后再執(zhí)行RegistryService代理對(duì)象的register方法鹊汛。
    register(registryUrl, registeredProviderUrl);
    providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

最后蒲赂,創(chuàng)建DestroyableExporter,并返回。

4.2 服務(wù)引用

服務(wù)引用過(guò)程包括創(chuàng)建注冊(cè)中心、創(chuàng)建注冊(cè)中心目錄(RegistryDirectory)撬呢、聚合Invoker幾部分

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY);
    Registry registry = registryFactory.getRegistry(url);
    // type為RegistryService破加,則直接返回當(dāng)前Invoker代理
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*",多個(gè)group情況下,使用mergableCluster
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 真正的引用邏輯
    return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 創(chuàng)建RegistryDirectory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 這里為什么會(huì)有服務(wù)的注冊(cè)?看代碼日志贸人,是為了解決issue#3295,具體什么問(wèn)題已經(jīng)看不到了,屬實(shí)不理解
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

    // cluster聚合走触,
    Invoker invoker = cluster.join(directory);
    // 注冊(cè)表注冊(cè)
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

這里有個(gè)問(wèn)題需要注意,在doRefer方法中有這么一段代碼疤苹,git log顯示是為了解決issue#3295做的修改互广,看不到3295具體是什么問(wèn)題,如果有知道的朋友卧土,勞煩告知惫皱,我及時(shí)修正說(shuō)明。

if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
        directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
        // 這里為什么會(huì)有服務(wù)的注冊(cè)?看代碼日志夸溶,是為了解決issue#3295逸吵,具體什么問(wèn)題已經(jīng)看不到了,屬實(shí)不理解
        registry.register(directory.getRegisteredConsumerUrl());
}

4.3缝裁、協(xié)議銷(xiāo)毀

銷(xiāo)毀比較容易理解扫皱,注銷(xiāo)所有exporter、清理所有exporter緩存捷绑、移除所有配置監(jiān)聽(tīng)器韩脑。

@Override
public void destroy() {
    // 注銷(xiāo)所有exporter
    List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
    for (Exporter<?> exporter : exporters) {
        exporter.unexport();
    }
    // 清除所有exporter緩存
    bounds.clear();
    // 移除所有監(jiān)聽(tīng)器
    DynamicConfiguration.getDynamicConfiguration()
            .removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);
}

二、Protocol實(shí)現(xiàn)

Protocol的直接實(shí)現(xiàn)類(lèi)包括常見(jiàn)的DubboProtocol粹污、HessianProtocol等段多。下面從AbstractProtocol開(kāi)始,逐個(gè)分析.

1壮吩、AbstractProtocol

AbstractProtocol基類(lèi)并沒(méi)有對(duì)Protocol接口做過(guò)多實(shí)現(xiàn)进苍,僅實(shí)現(xiàn)了destroy方法加缘,邏輯也比較簡(jiǎn)單,銷(xiāo)毀所有的Exporter和Invoker觉啊,具體代碼如下:

@Override
public void destroy() {
    // 注意順序:先銷(xiāo)毀所有invoker拣宏,再銷(xiāo)毀所有exporter
    for (Invoker<?> invoker : invokers) {
        if (invoker != null) {
            invokers.remove(invoker);
            // 中間日志打印省略
            invoker.destroy();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
    for (String key : new ArrayList<String>(exporterMap.keySet())) {
        Exporter<?> exporter = exporterMap.remove(key);
        if (exporter != null) {
                    // 日志打印省略
                exporter.unexport();
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}

2、DubboProtocol

DubboProtocol是Protocol的默認(rèn)SPI實(shí)現(xiàn)杠人,默認(rèn)端口20880勋乾;除了transporter依賴(lài)netty之外,其他所有邏輯都是dubbo自己實(shí)現(xiàn)嗡善。開(kāi)文提到辑莫,protocol位于exchange層之上(protocol直接依賴(lài)exchange),所以這里也會(huì)對(duì)exchange層的部分實(shí)現(xiàn)做相關(guān)分析罩引。下面各吨,我們?nèi)砸詄xport、refer袁铐、destroy方法為入口绅你,開(kāi)始解析DubboProtocol。

2.1 export(服務(wù)暴露)

先來(lái)看export方法昭躺,核心邏輯分為兩部分(stubService的邏輯處理比較簡(jiǎn)單)忌锯,創(chuàng)建Exporter并緩存、綁定并啟動(dòng)NettyServer(實(shí)際上是綁定ip端口领炫,建立socket連接)偶垮;exporter的創(chuàng)建和緩存比較簡(jiǎn)單:

// export service.創(chuàng)建exporter并緩存
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
// 對(duì)stubService的處理省略

// 綁定server
openServer(url);
// 無(wú)實(shí)質(zhì)邏輯,可以忽略
optimizeSerialization(url);

重點(diǎn)關(guān)注openServer方法帝洪。首先似舵,dubbo會(huì)先從serverMap緩存里取server實(shí)例,取到則會(huì)執(zhí)行reset操作葱峡,具體邏輯在HeaderExchangeServer#reset方法砚哗;取不到則直接創(chuàng)建,核心邏輯在createServer方法砰奕。createServer方法內(nèi)部有幾個(gè)細(xì)節(jié)需要注意: 1蛛芥、server關(guān)閉發(fā)送只讀事件,開(kāi)關(guān)默認(rèn)開(kāi)啟军援;2仅淑、默認(rèn)心跳間隔60s;3胸哥、默認(rèn)采用NettyTransporter涯竟;4、編解碼器Codec,默認(rèn)采用DubboCodec庐船。createServer方法實(shí)際上通過(guò)Exchanger接口的bind方法實(shí)現(xiàn)银酬,以HeaderExchanger(Exchanger的默認(rèn)SPI實(shí)現(xiàn))為例,核心邏輯如下:

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

可以看到筐钟,bind方法又借助Transporter的bind實(shí)現(xiàn)捡硅,創(chuàng)建Server實(shí)例。以NettyTransporter(Transporter接口的默認(rèn)SPI實(shí)現(xiàn))為例盗棵,直接創(chuàng)建NettyServer:

@Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

Server實(shí)例化邏輯大部分由父類(lèi)AbstractServer構(gòu)造方法完成。另外北发,AbstractServer定義模板方法doOpen纹因,并在其構(gòu)造方法中調(diào)用,doOpen方法邏輯由子類(lèi)Server實(shí)現(xiàn)琳拨,也就說(shuō)在實(shí)例化NettyServer時(shí)會(huì)執(zhí)行NettyServer.doOpen的動(dòng)作瞭恰,具體代碼如下:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
  // 參數(shù)初始化階段
  super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = Constants.ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
  // 執(zhí)行doOpen模板方法,由具體的Server實(shí)現(xiàn)  
  try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method狱庇,初始化線程池
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

NettyServer的doOpen方法:

// 綁定ip與端口
@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    // boss惊畏、worker線程池
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind,最終調(diào)用netty的bootstrap.bind方法建立channel通道
    channel = bootstrap.bind(getBindAddress());
}

到這里,createServer的動(dòng)作就完成了密任,其本質(zhì)是借助netty創(chuàng)建服務(wù)端socket連接颜启,并綁定ip和端口。再來(lái)看浪讳,若serverMap緩存中server已存在缰盏,則執(zhí)行reset方法。核心邏輯是淹遵,若當(dāng)前心跳值與url心跳值不一致或者當(dāng)前空閑超時(shí)時(shí)間與url的空閑超時(shí)時(shí)間不一致口猜,取消關(guān)閉任務(wù)(CloseTimerTask)并重啟空閑連接檢查任務(wù)(借助前面提到的HashedWheelTimer定時(shí)器實(shí)現(xiàn))。代碼如下:

@Override
public void reset(URL url) {
    server.reset(url);
    try {
        int currHeartbeat = UrlUtils.getHeartbeat(getUrl());
        int currIdleTimeout = UrlUtils.getIdleTimeout(getUrl());
        int heartbeat = UrlUtils.getHeartbeat(url);
        int idleTimeout = UrlUtils.getIdleTimeout(url);
        // 當(dāng)前心跳值與url心跳值不一致或者當(dāng)前空閑超時(shí)時(shí)間與url的空閑超時(shí)時(shí)間不一致透揣,取消關(guān)閉任務(wù)并重啟空閑檢查任務(wù)济炎。
        if (currHeartbeat != heartbeat || currIdleTimeout != idleTimeout) {
            cancelCloseTask();
            startIdleCheckTask(url);
        }
    } catch (Throwable t) {
        logger.error(t.getMessage(), t);
    }
}

2.2 refer(服務(wù)引用)

DubboProtocol中服務(wù)引用過(guò)程比較簡(jiǎn)單,大致分為連接Server辐真、創(chuàng)建client數(shù)組须尚,初始化Invoker,緩存至Invoker列表等幾個(gè)步驟侍咱,最后后返回創(chuàng)建的Invoker恨闪。

@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
  // 無(wú)有效邏輯,可以忽略  
  optimizeSerialization(url);
    //初始化Invoker,重點(diǎn)關(guān)注getClients
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
  // 加入invokers列表緩存
  invokers.add(invoker);
  return invoker;
}

創(chuàng)建Client數(shù)據(jù)的過(guò)程比較有意思放坏,首先咙咽,dubbo默認(rèn)是不共享service連接的,即一個(gè)service一個(gè)連接淤年,我們按照共享和不共享連接兩種情況分析:

1钧敞、共享連接

        a蜡豹、當(dāng)用戶沒(méi)有自定義連接數(shù)時(shí),dubbo會(huì)默認(rèn)共享連接溉苛,且連接數(shù)默認(rèn)為1镜廉;

        b、構(gòu)建sharedClient列表愚战,此時(shí)會(huì)優(yōu)先從緩存map中取娇唯,若緩存的sharedClient列表內(nèi)client均可用則直接返回;若緩存結(jié)果為空寂玲,則初始化sharedClient列表并緩存到map塔插,返回sharedClient列表結(jié)果;若緩存結(jié)果非空拓哟,即緩存的sharedClient列表內(nèi)部分client不可用的情況想许,則會(huì)新建client替換列表中不可用的client。注意:這里創(chuàng)建的是ReferenceCountExchangeClient實(shí)例

        c断序、將sharedClient列表內(nèi)client復(fù)制到client數(shù)組流纹,作為最終結(jié)果返回。

 2违诗、不共享連接

      直接執(zhí)行client數(shù)組的初始化(*initClient方法*)

核心代碼如下:

private ExchangeClient[] getClients(URL url) {
    // 默認(rèn)不共享連接
    boolean useShareConnect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    // 若用戶沒(méi)有自定義連接數(shù)漱凝,則dubbo認(rèn)為共享連接,否則每個(gè)服務(wù)獨(dú)享一個(gè)連接
    if (connections == 0) {
        useShareConnect = true;
        /**
         * The xml configuration should have a higher priority than properties.
         */
        String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null);
        // 共享連接數(shù)默認(rèn)是1
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY,Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        // 構(gòu)建共享client列表
        shareClients = getSharedClient(url, connections);
    }
    // clients.size >= 1
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

構(gòu)建共享client列表的邏輯在getSharedClient方法诸迟,篇幅原因碉哑,這里略去。在構(gòu)建client列表過(guò)程中亮蒋,都會(huì)涉及client的初始化扣典,核心邏輯在initClient。initClient方法代碼如下慎玖,其中屬性值配置部分略過(guò)(屬性值配置,同樣默認(rèn)采用NettyTransporter贮尖,默認(rèn)心跳間隔60s,編解碼默認(rèn)采用DubboCodec):

private ExchangeClient initClient(URL url) {
        // 屬性值配置及參數(shù)校驗(yàn)省略
    ExchangeClient client;
    try {
        // 延遲連接趁怔,即延遲到真正使用client時(shí)才會(huì)進(jìn)行初始化
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // 重點(diǎn)關(guān)注湿硝,借助org.apache.dubbo.remoting.exchange.Exchanger的connect方法,創(chuàng)建client
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
     throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
  return client;
}

與2.1中export方法類(lèi)似润努,初始化client方法借助Exchanger接口connect方法實(shí)現(xiàn)关斜,同樣以HeadExchanger為例,

@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

可以看到铺浇,connect方法通過(guò)Transporter的connect方法實(shí)現(xiàn)痢畜,以NettyTransporter為例,直接實(shí)例化NettyClient并返回

@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}

NettyClient實(shí)例化過(guò)程與NettyServer類(lèi)似,也是借助抽象基類(lèi)(AbstractClient)構(gòu)造方法完成丁稀。抽象基類(lèi)AbstractClient定義了doOpen吼拥、doConnect、doClose线衫、doDisconnect等模板方法凿可,留給具體子類(lèi)實(shí)現(xiàn)。在其構(gòu)造方法中授账,除了部分屬性初始化邏輯之外枯跑,會(huì)依次執(zhí)行doOpen、connect白热。

// 篇幅原因敛助,方法內(nèi)異常處理、日志打印均省略
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
        // 是否需要重連棘捣,dubbo默認(rèn)無(wú)需重連
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    // 打開(kāi)Channel
    doOpen();
    // 執(zhí)行連接 connect.邏輯比較簡(jiǎn)單,內(nèi)部實(shí)際還是執(zhí)行具體子類(lèi)的doConnect方法
    connect();
      // 初始化消費(fèi)者端線程池,然后移除緩存中線程池
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

以NettyClient為例休建,來(lái)看doOpen與doConnect方法內(nèi)部究竟做了什么:

@Override
protected void doOpen() throws Throwable {
        // 日志配置
    NettyHelper.setNettyLoggerFactory();
    // 初始化bootStrap
    bootstrap = new ClientBootstrap(channelFactory);
    // config乍恐,完善bootStrap參數(shù)配置
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}

NettyClient的doOpen方法,僅僅是對(duì)bootStrap做了初始化和部分屬性的完善测砂,再來(lái)看doConnect方法:

// 省略異常處理與日志打印
@Override
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    // 通過(guò)netty建立連接茵烈,拿到NettyChannel
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try {
        // 非中斷標(biāo)志
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
        if (ret && future.isSuccess()) {
            Channel newChannel = future.getChannel();
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // Close old channel,關(guān)閉老的channel
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        oldChannel.close();
                    } finally {
                        // 老的channel從緩存中刪除
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
             } finally {
                // 若當(dāng)前通道處于關(guān)閉狀態(tài),那么新建的通道也保持關(guān)閉狀態(tài)
                if (NettyClient.this.isClosed()) {
                    try {
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    // 原本通道處于開(kāi)啟狀態(tài)砌些,那么當(dāng)前通道直接替換為新建的channel
                    NettyClient.this.channel = newChannel;
                }
            }
       } else if (future.getCause() != null) {
        // 異常包裝后拋出
    } finally {
        // 連接失敗呜投,則取消future
        if (!isConnected()) {
           future.cancel();
         }
    }
}

doConnect借助doOpen初始化后的bootStrap創(chuàng)建新的netty通道。有個(gè)細(xì)節(jié)需要注意存璃,每次連接都會(huì)新建Channel仑荐,而且新建的Channel要與連接前的channel狀態(tài)保持一致。到這里為止纵东,采用HeaderExchangerClient粘招、NettyTransporter的DubboProtocol的服務(wù)引用流程結(jié)束。

2.3 destroy(協(xié)議銷(xiāo)毀)

destroy主要用作資源的回收偎球,比如server洒扎、client的關(guān)閉等,直接來(lái)看代碼:

public void destroy() {
    // 關(guān)閉serverMap緩存內(nèi)所有server
    for (String key : new ArrayList<>(serverMap.keySet())) {
        ExchangeServer server = serverMap.remove(key);
        if (server == null) {
            continue;
        }
        try {
            if (logger.isInfoEnabled()) {
                logger.info("Close dubbo server: " + server.getLocalAddress());
            }
                    server.close(ConfigurationUtils.getServerShutdownTimeout());
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
        
    // 關(guān)閉clientMap緩存內(nèi)所有client
    for (String key : new ArrayList<>(referenceClientMap.keySet())) {
        List<ReferenceCountExchangeClient> clients = referenceClientMap.remove(key);
                if (CollectionUtils.isEmpty(clients)) {
            continue;
        }
        for (ReferenceCountExchangeClient client : clients) {
            closeReferenceCountExchangeClient(client);
        }
    }
        // stubService方法緩存清空
    stubServiceMethodsMap.clear();
    // 父類(lèi)destroy方法衰絮,參考AbstractProtocol的destroy方法
    super.destroy();
}

3袍冷、InjvmProtocol

InjvmProtocol即Protocol本地協(xié)議實(shí)現(xiàn),僅支持服務(wù)本地暴露和引用猫牡,默認(rèn)端口0胡诗。InjvmProtocol對(duì)export、refer非常簡(jiǎn)單

// export方法,新建InjvmExporter乃戈,放入父類(lèi)的exporterMap
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
// 順便來(lái)看InjvmExporter的構(gòu)造方法
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
}
// refer方法褂痰,新建InjvmInvoker,直接返回
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}

除此之外症虑,有兩個(gè)地方需要注意缩歪,InjvmProtocol對(duì)外提供單例實(shí)現(xiàn),線程安全由ExtensionLoader.getExtesion保證(內(nèi)部DCL鎖保證線程安全)代碼如下:

private static InjvmProtocol INSTANCE;
public InjvmProtocol() {
        INSTANCE = this;
}

public static InjvmProtocol getInjvmProtocol() {
    if (INSTANCE == null) {
        //注意谍憔,這里如果直接new InjvmProtocol則是非線程安全的單例實(shí)現(xiàn)匪蝙;
        ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(InjvmProtocol.NAME); // load
    }
    return INSTANCE;
}

另一個(gè)需要注意的地方是對(duì)本地引用的判斷,如果url中的參數(shù)scope=local习贫,或者參數(shù)injvm=true會(huì)被判斷為本地引用逛球;另外,若本地有服務(wù)暴露苫昌,則也會(huì)通過(guò)本地引用颤绕,具體代碼如下:

public boolean isInjvmRefer(URL url) {
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // 對(duì)于本地引用來(lái)說(shuō),scope = local 與 injvm = true是完全等價(jià)的
    if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter(Constants.LOCAL_PROTOCOL, false))) {
        return true;
    } else if (Constants.SCOPE_REMOTE.equals(scope)) {
        // 遠(yuǎn)程引用
        return false;
    } else if (url.getParameter(Constants.GENERIC_KEY, false)) {
        // 通用invocation非本地引用
        return false;
    } else if (getExporter(exporterMap, url) != null) {
        // 默認(rèn)情況下祟身,如果有本地服務(wù)暴露奥务,則通過(guò)本地引用
        return true;
    } else {
        return false;
    }
}

4、RedisProtocol

RedisProtocol僅支持refer操作袜硫,協(xié)議默認(rèn)端口6379氯葬,內(nèi)部借助jedis實(shí)現(xiàn),將url中g(shù)et、set婉陷、delete參數(shù)值(默認(rèn)值分別為get帚称、set、delete)分別與會(huì)話域(Invocation)中的的方法名秽澳、參數(shù)(有且僅有一個(gè)參數(shù))匹配闯睹,若匹配成功,則返回一個(gè)Invoker實(shí)現(xiàn)(invoke方法內(nèi)部會(huì)執(zhí)行redis的get(或者set担神、delete方法瞻坝,并返回操作結(jié)果)。核心代碼如下:

// 其他參數(shù)設(shè)置代碼省略
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
    @Override
    protected Result doInvoke(Invocation invocation) throws Throwable {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();

            // get方法名取自u(píng)rl中的get參數(shù),該方法有且只能有一個(gè)參數(shù)
            if (get.equals(invocation.getMethodName())) {
                if (invocation.getArguments().length != 1) {
                    throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                }
                // 執(zhí)行redis get操作
                byte[] value = jedis.get(String.valueOf(invocation.getArguments()[0]).getBytes());
                if (value == null) {
                    return new RpcResult();
                }
                ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
                return new RpcResult(oin.readObject());
            } else if (set.equals(invocation.getMethodName())) {
                if (invocation.getArguments().length != 2) {
                    throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                }
                byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
                ByteArrayOutputStream output = new ByteArrayOutputStream();
                ObjectOutput value = getSerialization(url).serialize(url, output);
                value.writeObject(invocation.getArguments()[1]);
                // 執(zhí)行redis的set操作
                jedis.set(key, output.toByteArray());
                if (expiry > 1000) {
                    jedis.expire(key, expiry / 1000);
                }
                return new RpcResult();
            } else if (delete.equals(invocation.getMethodName())) {
                if (invocation.getArguments().length != 1) {
                    throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                }
                // 執(zhí)行redis的delete操作
                jedis.del(String.valueOf(invocation.getArguments()[0]).getBytes());
                return new RpcResult();
            } else {
                throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
            }
        } catch (Throwable t) {
            RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
            if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
                re.setCode(RpcException.TIMEOUT_EXCEPTION);
            } else if (t instanceof JedisConnectionException || t instanceof IOException) {
                re.setCode(RpcException.NETWORK_EXCEPTION);
            } else if (t instanceof JedisDataException) {
                re.setCode(RpcException.SERIALIZATION_EXCEPTION);
            }
            throw re;
        } finally {
            if (jedis != null) {
                try {
                    // 關(guān)閉redis
                    jedis.close();
                } catch (Throwable t) {
                    logger.warn("returnResource error: " + t.getMessage(), t);
                }
            }
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        try {
            // 銷(xiāo)毀jedis連接池
            jedisPool.destroy();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }
};

5杏瞻、MemcachedProtocol

MemcachedProtocol同樣僅支持refer操作所刀,默認(rèn)端口 11211,內(nèi)部借助google的工具xmemcached實(shí)現(xiàn)(有興趣的同學(xué)可以了解xmemcached的使用捞挥,jar包版本如下),邏輯與RedisProcotol的refer方法類(lèi)似浮创,url中g(shù)et、set砌函、delete參數(shù)值(默認(rèn)值分別為get斩披、set溜族、delete)分別與會(huì)話域中的的方法名、參數(shù)(有且僅有一個(gè)參數(shù))匹配垦沉,若匹配上返回一個(gè)Invoker實(shí)現(xiàn)(Invoke方法會(huì)執(zhí)行memcache的get(或者set煌抒、delete方法),并返回操作結(jié)果)厕倍。

<dependency>
  <groupId>com.googlecode.xmemcached</groupId>
  <artifactId>xmemcached</artifactId>
  <version>1.3.6</version>
</dependency>

核心代碼如下:

// 其他參數(shù)設(shè)置類(lèi)代碼省略
final int expiry = url.getParameter("expiry", 0);
final String get = url.getParameter("get", "get");
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
    @Override
    protected Result doInvoke(Invocation invocation) throws Throwable {
        try {
            if (get.equals(invocation.getMethodName())) {
                if (invocation.getArguments().length != 1) {
                    throw new IllegalArgumentException("The memcached get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                }
                // 執(zhí)行memecache的get操作
                return new RpcResult(memcachedClient.get(String.valueOf(invocation.getArguments()[0])));
            } else if (set.equals(invocation.getMethodName())) {
                // 參數(shù)長(zhǎng)度非法
                if (invocation.getArguments().length != 2) {
                    throw new IllegalArgumentException("The memcached set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                }
                // 執(zhí)行memcache的set操作 
                memcachedClient.set(String.valueOf(invocation.getArguments()[0]), expiry, invocation.getArguments()[1]);
                return new RpcResult();
            } else if (delete.equals(invocation.getMethodName())) {
                if (invocation.getArguments().length != 1) {
                    throw new IllegalArgumentException("The memcached delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
                }
                // 執(zhí)行memcache的delete操作
                memcachedClient.delete(String.valueOf(invocation.getArguments()[0]));
                return new RpcResult();
            } else {
                throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in memcached service.");
            }
        } catch (Throwable t) {
            RpcException re = new RpcException("Failed to invoke memcached service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
            if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
                re.setCode(RpcException.TIMEOUT_EXCEPTION);
            } else if (t instanceof MemcachedException || t instanceof IOException) {
                re.setCode(RpcException.NETWORK_EXCEPTION);
            }
            throw re;
        }
    }

    @Override
    public void destroy() {
        super.destroy();
        try {
            // memcache關(guān)閉操作
            memcachedClient.shutdown();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }
};

小結(jié)

上面介紹的幾種procotol直接繼承基類(lèi)AbstractProtocol寡壮,都比較容易理解。其中DubboProtocol比較復(fù)雜讹弯,完全由dubbo自己實(shí)現(xiàn)况既,僅在tansport層(網(wǎng)絡(luò)傳輸層)依賴(lài)了netty的網(wǎng)絡(luò)通信能力,相關(guān)的分析參考DubboProtocol章節(jié)组民。除了上面介紹的幾種Protocol棒仍,還有一類(lèi)protocol具備代理能力,即下面章節(jié)將要介紹的AbstractProxyProtocol臭胜。

6莫其、AbstractProxyProtocol

6.1、基礎(chǔ)方法實(shí)現(xiàn)

AbstractProxyProtocol在AbstractProtocol基類(lèi)基礎(chǔ)上引入ProxyFactory耸三,同樣實(shí)現(xiàn)了export乱陡、refer方法(內(nèi)部主要借助定義的doExport、doRefer模板方法實(shí)現(xiàn)吕晌,模板方法由具體子類(lèi)實(shí)現(xiàn))蛋褥。AbstractProxyProtocol的子類(lèi)實(shí)現(xiàn)有HessianProtocol临燃、HttpProtocol睛驳、RestProtocol、RmiProtocol膜廊、WebServiceProtocol乏沸。先來(lái)看export、refer方法:

public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
    final String uri = serviceKey(invoker.getUrl());
    // 優(yōu)先從緩存取
    Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
    if (exporter != null) {
        // exporter的url信息不一致時(shí)爪瓜,重新執(zhí)行暴露邏輯;url信息一致則直接返回
        if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
            return exporter;
        }
    }
    // doExport方法由具體子類(lèi)實(shí)現(xiàn)蹬跃,這里會(huì)借助proxyFacotry生成代理實(shí)例
    final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
    // 匿名類(lèi)實(shí)現(xiàn),初始化exporter的Invoker铆铆,重寫(xiě)unExport方法
    exporter = new AbstractExporter<T>(invoker) {
        @Override
        public void unexport() {
            super.unexport();
            exporterMap.remove(uri);
            if (runnable != null) {
                try {
                    runnable.run();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    };
    // exporter放入exporterMap緩存
    exporterMap.put(uri, exporter);
    return exporter;
}

再來(lái)看refer方法:

@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
    // 借助proxyFactory生成Invoker代理實(shí)例蝶缀,內(nèi)部調(diào)用doRefer方法
    final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
    Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
        @Override
        protected Result doInvoke(Invocation invocation) throws Throwable {
            try {
                // 具體代理Invoker對(duì)象執(zhí)行invoke邏輯
                Result result = target.invoke(invocation);
                // 有異常,則包裝為rpcException之后薄货,重新拋出
                Throwable e = result.getException();
                if (e != null) {
                    for (Class<?> rpcException : rpcExceptions) {
                        if (rpcException.isAssignableFrom(e.getClass())) {
                            throw getRpcException(type, url, invocation, e);
                        }
                    }
                }
                return result;
            } catch (RpcException e) {
                if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
                    e.setCode(getErrorCode(e.getCause()));
                }
                throw e;
            } catch (Throwable e) {
                throw getRpcException(type, url, invocation, e);
            }
        }
    };
    // 保存到invokers緩存列表
    invokers.add(invoker);
    return invoker;
}

邏輯比較簡(jiǎn)單翁都,內(nèi)部核心的doExport、doRefer方法由具體子類(lèi)實(shí)現(xiàn)(ProxyFactory部分后面會(huì)開(kāi)篇進(jìn)行分析),來(lái)看這兩個(gè)模板方法的定義:

// 模板方法 doExport
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
// 模板方法 doRefer
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;

6.2谅猾、核心輔助接口

了解完AbstractProxyProtocol內(nèi)部實(shí)現(xiàn)柄慰,介紹具體子類(lèi)實(shí)現(xiàn)之前鳍悠,先來(lái)看幾個(gè)輔助接口:HttpServer、HttpBinder坐搔、HttpHandler藏研。HttpBinder用于綁定HttpServer的具體執(zhí)行動(dòng)作(由HttpHandler實(shí)現(xiàn)),HttpServer則是所有HttpServer的抽象概行。其中蠢挡,HttpBinder支持SPI,默認(rèn)SPI實(shí)現(xiàn)是JettyHttpBinder占锯。

6.2.1袒哥、HttpServer

HttpServer接口繼承Resetable,即支持reset功能消略。除此之外堡称,提供基本的server關(guān)閉等功能。子類(lèi)通過(guò)繼承抽象基類(lèi)AbstractHttpServer艺演,實(shí)現(xiàn)類(lèi)包括JettyHttpServer却紧、ServletHttpServer、TomcatHttpServer胎撤,從名字可以看出晓殊,主要是借助Jetty、Tomcat巫俺、Servlet容器為dubbo的服務(wù)暴露提供支持。下面按照順序依次進(jìn)行分析,核心邏輯都在構(gòu)造方法叹卷,先來(lái)看JettyHttpServer。

6.2.1.1 JettyHttpServer

JettyHttpServer的構(gòu)造方法主要完成JettyServer的初始化和啟動(dòng):1、創(chuàng)建并初始化JettyServer墨技;2扣汪、啟動(dòng)JettyServer:

public JettyHttpServer(URL url, final HttpHandler handler) {
    super(url, handler);
    this.url = url;
    // TODO we should leave this setting to slf4j
    // we must disable the debug logging for production use
    Log.setLog(new StdErrLog());
    Log.getLog().setDebugEnabled(false);
    // handler交給dispatcherServlet統(tǒng)一托管
    DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler);
    // 默認(rèn)線程數(shù)200,使用jetty的隊(duì)列線程池
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    QueuedThreadPool threadPool = new QueuedThreadPool();
    threadPool.setDaemon(true);
    threadPool.setMaxThreads(threads);
    threadPool.setMinThreads(threads);

    // 創(chuàng)建并初始化nettyServer,指定JettyServer線程池大小、connector赫段、以及Handler
    server = new Server(threadPool);
    ServerConnector connector = new ServerConnector(server);
    String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
    if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
        connector.setHost(bindIp);
    }
    connector.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
    server.addConnector(connector);

    // DispatcherServlet 托管給ServletHandler
    ServletHandler servletHandler = new ServletHandler();
    ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, "/*");
    servletHolder.setInitOrder(2);

    // 設(shè)置JettyServer的handler 為ServletContextHandler
    ServletContextHandler context = new ServletContextHandler(server, "/", ServletContextHandler.SESSIONS);
    context.setServletHandler(servletHandler);
    ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext());

    try {
        // 啟動(dòng)nettyServer
        server.start();
    } catch (Exception e) {
        throw new IllegalStateException("Failed to start jetty server on " + url.getParameter(Constants.BIND_IP_KEY) + ":" + url.getParameter(Constants.BIND_PORT_KEY) + ", cause: "
            + e.getMessage(), e);
    }
}

這塊代碼看起來(lái)比較好理解撩银,但是有許多細(xì)節(jié)需要注意够庙,比如自定義的Handler邏輯如何執(zhí)行暮屡?什么時(shí)候執(zhí)行准夷?首先,代碼注釋中做了說(shuō)明结闸,自定義Handler會(huì)交給DispatcherServlet管理;然后,創(chuàng)建ServletHandler實(shí)例图甜,調(diào)用ServertHandler.addServletWithMapping方法將DispatcherServlet交給Jetty的ServeltHandler钦讳;再接著匪凡,創(chuàng)建ServletContextHandler,并將該ServletHandler傳遞給該實(shí)例,最后將JettyServer(父類(lèi)HandlerWrapper的屬性)的_handler引用指向創(chuàng)建的ServletContextHandler實(shí)例,自定義的Handler就完全托管給JettyServer了狮惜,JettyServer的啟動(dòng)流程如下:

DispatcherServlet.addHttpHandler 
-> new QueueThreadPool()
-> 新建JettyServer實(shí)例筏餐,new Server() 
-> 同步執(zhí)行ServletHandler.addServletWithMapping 
-> new ServletContextHandler()當(dāng)前handler托管給JettyServer 
-> 啟動(dòng)JettyServer惠呼,JettyServer.start

下一個(gè)問(wèn)題是handler的handle邏輯什么時(shí)候執(zhí)行呢,可以肯定的一點(diǎn)是,handle一定是通過(guò)DispatcherServlet的service方法來(lái)執(zhí)行,來(lái)看邏輯:

@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
    // 根據(jù)端口獲取具體使用的handler
    HttpHandler handler = handlers.get(request.getLocalPort());
    if (handler == null) {// service not found.
        response.sendError(HttpServletResponse.SC_NOT_FOUND, "Service not found.");
    } else {
        // 具體的handle邏輯在這里執(zhí)行
        handler.handle(request, response);
    }
}

這么簡(jiǎn)單嗎?當(dāng)然不是,下著來(lái)看樟遣,執(zhí)行流程如下:

JettyServer初始化過(guò)程中,初始化線程池
new QueuedThreadPool() -> _runnable = new Runnable() 
-> jetty線程池異步調(diào)度執(zhí)行 -> runJob()-> ChannelEndPoint._runFillalbe -> FillInterest.fillable()
-> AbstractConnection.ReadCallback.succeeded() -> HttpConnection.onFillable() 
-> HttpChannelOverHttp.handle()
-> Server.handle()(JettyServer構(gòu)造過(guò)程中脱篙,會(huì)把傳入的handler塞到Server中) 
-> 執(zhí)行具體Server子類(lèi)的Handler的handle方法

詳細(xì)流程有興趣的話可以參考Jetty的QueueThreadPool實(shí)現(xiàn)适刀。

6.2.1.2 、TomcatHttpServer

與JettyHttpServer類(lèi)似吧彪,TomcatHttpServer為dubbo提供web容器能力崩侠,核心邏輯在構(gòu)造方法,同樣包括兩部分售淡,Tomcat容器初始化和容器的啟動(dòng),代碼如下:

public TomcatHttpServer(URL url, final HttpHandler handler) {
        super(url, handler);

        this.url = url;
            // 同樣的揍堕,自定義handler托管給DispatcherServlet
        DispatcherServlet.addHttpHandler(url.getPort(), handler);
        String baseDir = new File(System.getProperty("java.io.tmpdir")).getAbsolutePath();
            // tomcat屬性配置楞慈,與server.xml中配置項(xiàng)等同
        tomcat = new Tomcat();
        tomcat.setBaseDir(baseDir);
        tomcat.setPort(url.getPort());
        tomcat.getConnector().setProperty("maxThreads",     String.valueOf(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS)));
                tomcat.getConnector().setProperty(
                "maxConnections", String.valueOf(url.getParameter(Constants.ACCEPTS_KEY, -1)));
        tomcat.getConnector().setProperty("URIEncoding", "UTF-8");
        tomcat.getConnector().setProperty("connectionTimeout", "60000");
        tomcat.getConnector().setProperty("maxKeepAliveRequests", "-1");
        tomcat.getConnector().setProtocol("org.apache.coyote.http11.Http11NioProtocol");

            // DisptatcherServlet交給Tomcat的context管理
        Context context = tomcat.addContext("/", baseDir);
        Tomcat.addServlet(context, "dispatcher", new DispatcherServlet());
        context.addServletMapping("/*", "dispatcher");
        ServletManager.getInstance().addServletContext(url.getPort(), context.getServletContext());

        try {
            // 啟動(dòng)tomcat容器
            tomcat.start();
        } catch (LifecycleException e) {
            throw new IllegalStateException("Failed to start tomcat server at " + url.getAddress(), e);
        }
    }

同樣的,我們來(lái)看自定義的handler的執(zhí)行流程运杭,DispatcherServlet入口就不再介紹了,與JettyHttpServer一樣通過(guò)線程池異步執(zhí)行腕巡,直接來(lái)看Tomcat的調(diào)度流程,有興趣的同學(xué)可以自己研究下Tomcat的工作流程

線程池內(nèi)工作線程隊(duì)列,SocketProcessorBase.run() -> NioEndpoint$SocketProcessor.doRun() -> AbstractProtocol.process()
-> AbstractProcessorLight.process() -> Http11Processor.service()
-> CoyoteAdapter.service() -> StandardEngineValve.invoke()
-> ErrorReportValve.invoke() -> StandardHostValve.invoke()
-> AuthenticatorBase.invoke() -> StandardContextValve.invoke()
-> StandardWrapperValve.invoke() -> ApplicationFilterChain.doFilter()
-> ApplicationFilterChain.internalDoFilter() -> HttpServlet.service() -> DispatcherServlet.service()
-> 執(zhí)行具體的handle邏輯
6.2.1.3 表伦、ServletHttpServer

ServletHttpServer比較簡(jiǎn)單翔怎,直接使用HttpServlet作為web容器容握,代碼也比較簡(jiǎn)單,不做過(guò)多解析令花。

public ServletHttpServer(URL url, HttpHandler handler) {
    super(url, handler);
    DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, 8080), handler);
}

綜上趟章,HttpServer接口主要提供web容器北戏,借助HttpBinder將dubbo服務(wù)暴露URL與web容器綁定,由web容器統(tǒng)一管理消費(fèi)者請(qǐng)求搂赋。

6.2.2轰豆、HttpBinder

HttpBinder負(fù)責(zé)dubbo服務(wù)與web容器的綁定,接口支持SPI擴(kuò)展糙置,默認(rèn)實(shí)現(xiàn)是JettyHttpBinder亡容,即默認(rèn)使用Jetty作為web容器龟糕。當(dāng)然,可以通過(guò)URL參數(shù)指定容器類(lèi)型舶治,比如 &server=tomcat指定使用Tomcat坛悉。同時(shí)支持自定義Handler荡澎,用于web容器對(duì)綁定URL的處理衔彻。HttpBinder的邏輯非常簡(jiǎn)單祖搓,這里以JettyHttpBinder為例该贾,代碼如下:

@Override
public HttpServer bind(URL url, HttpHandler handler) {
    // 綁定URL與web容器寇荧,同時(shí)指定handler
    return new JettyHttpServer(url, handler);
}
6.2.3刃泌、HttpHandler

HttpHandler接口只有一個(gè)通用方法hanlde,在dubbo請(qǐng)求過(guò)程中,對(duì)請(qǐng)求做處理母截。

void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException;

HttpHandler主要實(shí)現(xiàn)類(lèi)有HessianProtocol.HessianHandler到忽、HttpProtocol.InternalHandler、WebServiceProtocol.WebServiceHandler清寇,分別位于對(duì)應(yīng)的Protocol實(shí)現(xiàn)中喘漏,下面從HessianHandler開(kāi)始,逐一解析颗管。HessianHandler核心邏輯是借助HessianSkeleton完成單次rpc請(qǐng)求陷遮,代碼如下:

private class HessianHandler implements HttpHandler {

    // 借助HessianSkeleton,實(shí)現(xiàn)rpc請(qǐng)求
    // 什么時(shí)候執(zhí)行該handler垦江?
    // 對(duì)于jetty來(lái)說(shuō)帽馋,構(gòu)建QueuedThreadPool時(shí),會(huì)從自己的任務(wù)隊(duì)列取出任務(wù)比吭,調(diào)用自己的runJob方法绽族,執(zhí)行Runnable邏輯(實(shí)際執(zhí)行的是ChannelEndPoint的_runFillable的run方法)
    // 大致流程: QueuedThreadPool -> QueuedThreadPool.runJob() -> ChannelEndPoint._runFillalbe -> FillInterest.fillable() -> AbstractConnection.ReadCallback.succeeded() -> HttpConnection.onFillable() ->  HttpChannelOverHttp.handle() -> Server.handle()(JettyServer構(gòu)造過(guò)程中,會(huì)把傳入的handler塞到Server中) -> 具體Server子類(lèi)的Handler的handle方法
    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        String uri = request.getRequestURI();
        HessianSkeleton skeleton = skeletonMap.get(uri);
        //僅支持post方法衩藤,貌似2.7.1以上版本支持其他類(lèi)型請(qǐng)求
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            response.setStatus(500);
        } else {
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
            Enumeration<String> enumeration = request.getHeaderNames();
            while (enumeration.hasMoreElements()) {
                String key = enumeration.nextElement();
                if (key.startsWith(Constants.DEFAULT_EXCHANGER)) {
                   RpcContext.getContext().setAttachment(key.substring(Constants.DEFAULT_EXCHANGER.length()),
                            request.getHeader(key));
                }
            }

            try {
                // 調(diào)用Hesssian的invoke方法
                skeleton.invoke(request.getInputStream(), response.getOutputStream());
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }
}

再來(lái)看WebServiceHandler吧慢,借助apache的cxf,使用ServletController 完成rpc請(qǐng)求過(guò)程赏表,代碼如下:

private class WebServiceHandler implements HttpHandler {
    private volatile ServletController servletController;
    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        if (servletController == null) {
            HttpServlet httpServlet = DispatcherServlet.getInstance();
            if (httpServlet == null) {
                response.sendError(500, "No such DispatcherServlet instance.");
                return;
            }
            synchronized (this) {
                if (servletController == null) {
                    servletController = new ServletController(transportFactory.getRegistry(), httpServlet.getServletConfig(), httpServlet);
                }
            }
        }
        RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
        servletController.invoke(request, response);
    }
}

HttpProtocol對(duì)應(yīng)的Handler實(shí)現(xiàn)即InternalHandler检诗,利用spring的httpinvoker包中的HttpInvokerServiceExporter實(shí)現(xiàn)對(duì)請(qǐng)求的處理匈仗。代碼如下

private class InternalHandler implements HttpHandler {
    @Override
    public void handle(HttpServletRequest request, HttpServletResponse response)
            throws IOException, ServletException {
        String uri = request.getRequestURI();
        HttpInvokerServiceExporter skeleton = skeletonMap.get(uri);
        // 同樣僅支持post請(qǐng)求
        if (!request.getMethod().equalsIgnoreCase("POST")) {
            response.setStatus(500);
        } else {
            RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
            try {
                skeleton.handleRequest(request, response);
            } catch (Throwable e) {
                throw new ServletException(e);
            }
        }
    }
}

7、HessianProtocol

在介紹完AbstractProxyProtocol及相關(guān)輔助接口之后逢慌,我們來(lái)看HessianProtocol悠轩,重點(diǎn)關(guān)注doExport、doRefer方法,也是HessianProtocol服務(wù)暴露與服務(wù)引用的主要邏輯所在攻泼。Hessian的doExport核心邏輯火架,可以概括為兩部分,創(chuàng)建HttpServer和創(chuàng)建Exporter并返回忙菠。創(chuàng)建HttpServer的過(guò)程借助HttpBinder實(shí)現(xiàn)何鸡,即調(diào)用HttpBinder的bind方法生成對(duì)應(yīng)server,默認(rèn)的實(shí)現(xiàn)是JettyHttpServer牛欢。生成Exporter則更為簡(jiǎn)單骡男,只是創(chuàng)建了一個(gè)Runnable,用于實(shí)現(xiàn)Exporter的unExport方法氢惋,下面來(lái)看代碼:

protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
    String addr = getAddr(url);
    // 先從server緩存取
    HttpServer server = serverMap.get(addr);
    if (server == null) {
        // 取不到則新建JettyHttpServer洞翩,并啟動(dòng)
        server = httpBinder.bind(url, new HessianHandler());
        serverMap.put(addr, server);
    }
    final String path = url.getAbsolutePath();
    final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
    skeletonMap.put(path, skeleton);

    // 通用服務(wù)
    final String genericPath = path + "/" + Constants.GENERIC_KEY;
    skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));

    return new Runnable() {
        @Override
        public void run() {
            skeletonMap.remove(path);
            skeletonMap.remove(genericPath);
        }
    };
}

再來(lái)看doRefer(),核心邏輯同樣可以分為兩部分稽犁,創(chuàng)建HessianProxyFactory焰望、利用HessianProxyFactory生成接口的代理實(shí)現(xiàn)并返回

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
    String generic = url.getParameter(Constants.GENERIC_KEY);
    boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
    if (isGeneric) {
        RpcContext.getContext().setAttachment(Constants.GENERIC_KEY, generic);
        url = url.setPath(url.getPath() + "/" + Constants.GENERIC_KEY);
    }

    HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
    boolean isHessian2Request = url.getParameter(Constants.HESSIAN2_REQUEST_KEY, Constants.DEFAULT_HESSIAN2_REQUEST);
    hessianProxyFactory.setHessian2Request(isHessian2Request);
    boolean isOverloadEnabled = url.getParameter(Constants.HESSIAN_OVERLOAD_METHOD_KEY, Constants.DEFAULT_HESSIAN_OVERLOAD_METHOD);
    hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
    String client = url.getParameter(Constants.CLIENT_KEY, Constants.DEFAULT_HTTP_CLIENT);
    // 客戶端連接方式,httpclient,默認(rèn)值jdk
    if ("httpclient".equals(client)) {
        HessianConnectionFactory factory = new HttpClientConnectionFactory();
        factory.setHessianProxyFactory(hessianProxyFactory);
        hessianProxyFactory.setConnectionFactory(factory);
    } else if (client != null && client.length() > 0 && !Constants.DEFAULT_HTTP_CLIENT.equals(client)) {
        // 非默認(rèn)值,則直接拋異常
        throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
    } else {
        // 默認(rèn)采用Hessian連接
        HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
        factory.setHessianProxyFactory(hessianProxyFactory);
        hessianProxyFactory.setConnectionFactory(factory);
    }
    int timeout = url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    hessianProxyFactory.setConnectTimeout(timeout);
    hessianProxyFactory.setReadTimeout(timeout);
    // 利用hessianProxyFactory生成引用服務(wù)實(shí)例已亥,實(shí)際上是采用jdk的動(dòng)態(tài)代理熊赖,生成serviceType實(shí)例
    // 前面部分完成HessianProxyFactory的各項(xiàng)初始化工作,執(zhí)行create操作時(shí)虑椎,方法內(nèi)部創(chuàng)建HessianProxy(實(shí)現(xiàn)JDK動(dòng)態(tài)代理的InvocationHandler接口)實(shí)例震鹉,HessianProxy的_factory引用了當(dāng)前的HessianProxyFactory對(duì)象,用于創(chuàng)建hessian連接時(shí)開(kāi)啟連接捆姜。整個(gè)hessian連接開(kāi)啟流程可以表示為:serviceType$Proxy.method -> HessianProxy.invoke ->  HessianProxy.sendRequest -> HessianProxyFactory.getConnectionFactory -> HessianConnectionFactory.open -> AbstractHessianOutput.call(執(zhí)行被代理的方法,結(jié)果存放connection的outputStream) -> invoke方法調(diào)用結(jié)束
    // 這里有個(gè)疑問(wèn)传趾,每次調(diào)用結(jié)束之后HessianConnection都會(huì)直接關(guān)閉,下次請(qǐng)求過(guò)來(lái)再重新開(kāi)啟泥技,如何保證性能浆兰?
    return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
}

8、HttpProtocol

HttpProtocol借助Spring的httpinvoker以及HttpInvokerProxyFactoryBean實(shí)現(xiàn)服務(wù)的暴露和引用珊豹。服務(wù)暴露過(guò)程與HessianProtocol類(lèi)似簸呈,包括兩個(gè)步驟:創(chuàng)建Server、創(chuàng)建Exporter并返回店茶,其中Server的創(chuàng)建過(guò)程同樣借助前面提到的HttpBinder實(shí)現(xiàn)蜕便;服務(wù)引用則借助Spring的FactoryBean實(shí)現(xiàn),即HttpInvokerProxyFactoryBean贩幻,最終返回的引用實(shí)例是FactoryBean.getObject轿腺。代碼比較簡(jiǎn)單两嘴,這里就省略了。

9族壳、RestProtocol

RestProtocol的doExport溶诞、doRefer方法核心邏輯與HttpProtocol大同小異,不同之處在于决侈,1螺垢、dubbo為RestProtocol獨(dú)立抽象出一個(gè)RestServer接口,也就是說(shuō)在doExport過(guò)程中,創(chuàng)建的server是RestServer赖歌;2枉圃、服務(wù)引用過(guò)程借助于resteasy工具實(shí)現(xiàn),最終采用ResteasyWebTarget生成代理服務(wù)實(shí)例(內(nèi)部實(shí)際上還是JDK的動(dòng)態(tài)代理實(shí)現(xiàn))庐冯。這里我們?cè)敿?xì)介紹一下RestServer以及RestProtocol的服務(wù)引用的過(guò)程孽亲,先來(lái)看RestServer接口。

RestServer定義了REST服務(wù)的啟動(dòng)(start)展父、部署(deploy)返劲、解除部署(undeploy)、停止(stop)等方法栖茉,其中篮绿,啟動(dòng)、部署吕漂、解除部署由基類(lèi)BaseRestServer實(shí)現(xiàn)亲配,stop方法則由具體子類(lèi)實(shí)現(xiàn)。另外惶凝,BaseRestServer定義模板方法doStart吼虎、getDeployment,由子類(lèi)具體實(shí)現(xiàn)苍鲜。來(lái)看子類(lèi)DubboHttpServer(名字取得是不是容易讓人誤解)思灰、NettyServer。

9.1混滔、DubboHttpServer

重點(diǎn)關(guān)注DubboHttpServer的doStrart方法洒疚,核心邏輯是創(chuàng)建HttpServer,初始化dispatcher遍坟。借助了resteasy的HttpServletDispatcher和ResteasyDeployment

@Override
protected void doStart(URL url) {
    // TODO jetty will by default enable keepAlive so the xml config has no effect now
    // 這里也借助了dubbo抽象的HttpBinder
    httpServer = httpBinder.bind(url, new RestHandler());

    ServletContext servletContext = ServletManager.getInstance().getServletContext(url.getPort());
    if (servletContext == null) {
        servletContext = ServletManager.getInstance().getServletContext(ServletManager.EXTERNAL_SERVER_PORT);
    }
    if (servletContext == null) {
        throw new RpcException("No servlet context found. If you are using server='servlet', " +
                "make sure that you've configured " + BootstrapListener.class.getName() + " in web.xml");
    }
    servletContext.setAttribute(ResteasyDeployment.class.getName(), deployment);
    try {
        // 初始化dispatcher
        dispatcher.init(new SimpleServletConfig(servletContext));
    } catch (ServletException e) {
        throw new RpcException(e);
    }
}

9.2拳亿、NettyServer

NettyServer比較簡(jiǎn)單,核心邏輯再doStart方法愿伴,負(fù)責(zé)初始化server(*resteasy的NettyJaxrsServer實(shí)例*)肺魁,并啟動(dòng):
@Override
protected void doStart(URL url) {
    String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
    if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
        server.setHostname(bindIp);
    }
    // NettyJaxrsServer實(shí)例與ip地址、端口綁定隔节,核心參數(shù)初始化
    server.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
    Map<ChannelOption, Object> channelOption = new HashMap<ChannelOption, Object>();
    channelOption.put(ChannelOption.SO_KEEPALIVE, url.getParameter(Constants.KEEP_ALIVE_KEY, Constants.DEFAULT_KEEP_ALIVE));
    server.setChildChannelOptions(channelOption);
    server.setExecutorThreadCount(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS));
    server.setIoWorkerCount(url.getParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    server.setMaxRequestSize(url.getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD));
    // 啟動(dòng)serfer  
    server.start();
}

來(lái)看doRefer過(guò)程鹅经,核心邏輯是初始化ResteasyClient寂呛,并將client加入list緩存,然后瘾晃,構(gòu)建ResteasyWebTarget贷痪,再通過(guò)ResteasyWebTarget的代理方法生成引用實(shí)例的代理,并返回蹦误,這里省略代碼劫拢。

10、RmiProtocol

RmiProtocol的doExport邏輯强胰,通過(guò)spring的RmiServiceExporter實(shí)現(xiàn)

@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
    //初始化RmiServiceExporter舱沧,設(shè)置相關(guān)參數(shù)
    final RmiServiceExporter rmiServiceExporter = new RmiServiceExporter();
    rmiServiceExporter.setRegistryPort(url.getPort());
    rmiServiceExporter.setServiceName(url.getPath());
    rmiServiceExporter.setServiceInterface(type);
    rmiServiceExporter.setService(impl);
    try {
        // spring擴(kuò)展邏輯
        rmiServiceExporter.afterPropertiesSet();
    } catch (RemoteException e) {
        throw new RpcException(e.getMessage(), e);
    }
    return new Runnable() {
        @Override
        public void run() {
            try {
                rmiServiceExporter.destroy();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        }
    };
}

doRefer方法則借助spring的RmiProxyFactoryBean實(shí)現(xiàn),兼容2.7.0以下版本邏輯偶洋;方法最終返回RmiProxyFactoryBean所代理的bean實(shí)例熟吏。

protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
    // 初始化RmiProxyFactoryBean
    final RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
    if (isRelease270OrHigher(url.getParameter(Constants.RELEASE_KEY))) {
        rmiProxyFactoryBean.setRemoteInvocationFactory(RmiRemoteInvocation::new);
    } else if (isRelease263OrHigher(url.getParameter(Constants.DUBBO_VERSION_KEY))) {
        rmiProxyFactoryBean.setRemoteInvocationFactory(com.alibaba.dubbo.rpc.protocol.rmi.RmiRemoteInvocation::new);
    }
    rmiProxyFactoryBean.setServiceUrl(url.toIdentityString());
    rmiProxyFactoryBean.setServiceInterface(serviceType);
    rmiProxyFactoryBean.setCacheStub(true);
    rmiProxyFactoryBean.setLookupStubOnStartup(true);
    rmiProxyFactoryBean.setRefreshStubOnConnectFailure(true);
    rmiProxyFactoryBean.afterPropertiesSet();
    // 返回factoryBean代理對(duì)象實(shí)例
    return (T) rmiProxyFactoryBean.getObject();
}

11、WebServiceProtocol

WebServiceProtocol的doExport方法玄窝,借助apache的cxf(webService框架)工具包牵寺,通過(guò)ServerFactoryBean完成服務(wù)與實(shí)現(xiàn)的綁定:

@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
    String addr = getAddr(url);
    HttpServer httpServer = serverMap.get(addr);
    // 借助HttpBinder創(chuàng)建HttpServer
    if (httpServer == null) {
        httpServer = httpBinder.bind(url, new WebServiceHandler());
        serverMap.put(addr, httpServer);
    }
    final ServerFactoryBean serverFactoryBean = new ServerFactoryBean();
    serverFactoryBean.setAddress(url.getAbsolutePath());
    serverFactoryBean.setServiceClass(type);
    serverFactoryBean.setServiceBean(impl);
    serverFactoryBean.setBus(bus);
    serverFactoryBean.setDestinationFactory(transportFactory);
    serverFactoryBean.create();
    // Exporter匿名類(lèi),內(nèi)部邏輯實(shí)現(xiàn)
    return new Runnable() {
        @Override
        public void run() {
            if(serverFactoryBean.getServer()!= null) {
                serverFactoryBean.getServer().destroy();
            }
            if(serverFactoryBean.getBus()!=null) {
                serverFactoryBean.getBus().shutdown(true);
            }
        }
    };
}

doRefer方法也是借助apache的cxf(webService框架)恩脂,通過(guò)ClientProxyFactoryBean完成應(yīng)用服務(wù)的實(shí)例化帽氓,并返回該實(shí)例。

@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(final Class<T> serviceType, final URL url) throws RpcException {
    // 創(chuàng)建ClientProxyFactoryBean實(shí)例
    ClientProxyFactoryBean proxyFactoryBean = new ClientProxyFactoryBean();
    proxyFactoryBean.setAddress(url.setProtocol("http").toIdentityString());
    proxyFactoryBean.setServiceClass(serviceType);
    proxyFactoryBean.setBus(bus);
    // 動(dòng)態(tài)代理創(chuàng)建服務(wù)引用實(shí)例
    T ref = (T) proxyFactoryBean.create();
    Client proxy = ClientProxy.getClient(ref);
    HTTPConduit conduit = (HTTPConduit) proxy.getConduit();
    HTTPClientPolicy policy = new HTTPClientPolicy();
    policy.setConnectionTimeout(url.getParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT));
    policy.setReceiveTimeout(url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
    conduit.setClient(policy);
    return ref;
}

總結(jié)

Protocol在dubbo中的位置非常重要东亦,本文按照接口杏节、代理實(shí)現(xiàn)、直接實(shí)現(xiàn)三個(gè)部分對(duì)Protocol做了解析典阵。總的來(lái)講镊逝,Dubbo中Protocol的實(shí)現(xiàn)分為兩種壮啊,1、代理實(shí)現(xiàn)撑蒜,比如RegistryProtocol歹啼,并不直接實(shí)現(xiàn)Protocol,而是借助內(nèi)部引用實(shí)例完成服務(wù)暴露座菠、引用狸眼;2、直接實(shí)現(xiàn)浴滴,比如DubboProtocol拓萌,Server -> Exchanger -> Transporter -> 利用Netty建立socket連接,執(zhí)行具體的服務(wù)暴露升略、引用微王;直接實(shí)現(xiàn)中還有一類(lèi)屡限,即借助web容器比如Jetty、tomcat炕倘、servlet或者三方框架如apache的cxf實(shí)現(xiàn)server的創(chuàng)建和啟動(dòng),然后將dubbo服務(wù)URL钧大、端口與server綁定,完成服務(wù)的暴露罩旋。

注:源碼版本2.7.1啊央,歡迎指正。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末涨醋,一起剝皮案震驚了整個(gè)濱河市劣挫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌东帅,老刑警劉巖压固,帶你破解...
    沈念sama閱讀 218,036評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異靠闭,居然都是意外死亡帐我,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)愧膀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拦键,“玉大人,你說(shuō)我怎么就攤上這事檩淋》椅” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,411評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵蟀悦,是天一觀的道長(zhǎng)媚朦。 經(jīng)常有香客問(wèn)我,道長(zhǎng)日戈,這世上最難降的妖魔是什么询张? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,622評(píng)論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮浙炼,結(jié)果婚禮上份氧,老公的妹妹穿的比我還像新娘。我一直安慰自己弯屈,他們只是感情好蜗帜,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,661評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著资厉,像睡著了一般厅缺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,521評(píng)論 1 304
  • 那天店归,我揣著相機(jī)與錄音阎抒,去河邊找鬼。 笑死消痛,一個(gè)胖子當(dāng)著我的面吹牛且叁,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播秩伞,決...
    沈念sama閱讀 40,288評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼逞带,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了纱新?” 一聲冷哼從身側(cè)響起展氓,我...
    開(kāi)封第一講書(shū)人閱讀 39,200評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎脸爱,沒(méi)想到半個(gè)月后遇汞,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,644評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡簿废,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,837評(píng)論 3 336
  • 正文 我和宋清朗相戀三年空入,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片族檬。...
    茶點(diǎn)故事閱讀 39,953評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡歪赢,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出单料,到底是詐尸還是另有隱情埋凯,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評(píng)論 5 346
  • 正文 年R本政府宣布扫尖,位于F島的核電站白对,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏藏斩。R本人自食惡果不足惜躏结,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,281評(píng)論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望狰域。 院中可真熱鬧,春花似錦黄橘、人聲如沸兆览。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,889評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)抬探。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間小压,已是汗流浹背线梗。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,011評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留怠益,地道東北人仪搔。 一個(gè)月前我還...
    沈念sama閱讀 48,119評(píng)論 3 370
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蜻牢,于是被迫代替她去往敵國(guó)和親烤咧。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,901評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容