Dubbo中的服務(wù)引用

本系列主要參考官網(wǎng)文檔基跑、芋道源碼的源碼解讀和《深入理解Apache Dubbo與實(shí)戰(zhàn)》一書糜烹。Dubbo版本為2.6.1养泡。

文章內(nèi)容順序:
1.服務(wù)引用的介紹
2.服務(wù)引用的入口方法getObject()=>createProxy()方法介紹

3.本地引用

  • 3.1是否為本地引用的判別方法璧微,InjvmProtocol#isInjvmRefer
  • 3.2createProxy()中的本地引用鏈路
  • 3.3InjvmIvoker介紹,引出proxyFactory

4.proxyFactory擴(kuò)展點(diǎn)

  • 4.1proxyFactory的包裝類StubProxyFactoryWrapper,本地存根的作用
  • 4.2擴(kuò)展類JavassistProxyFactory的getProxy(Invoker)
  • 4.3Proxy 實(shí)例中傳入了InvokerInvocationHandler類的意義
  • 4.4JavassistProxyFactory生成的代碼樣例及作用
  • 4.5服務(wù)引用存在哪邑雅?

5.遠(yuǎn)程引用

  • 5.1 createProxy()中的遠(yuǎn)程引用鏈路
  • 5.2只配置了一個(gè)注冊(cè)中心的遠(yuǎn)程引用
  • 5.3RegistryProtocol#refer()
  • 5.4直連時(shí)的遠(yuǎn)程引用
  • 5.5new DubboInvoker時(shí)的getClients(url)
  • 5.6getClients(url)中的getSharedClient(url)
  • 5.7getClients(url)中的initClient(url)
  • 5.8initClient(url)中的自適應(yīng)Exchangers#connect()
  • 5.9多注冊(cè)中心時(shí)鏈路

1.服務(wù)引用的介紹

來自官網(wǎng)的服務(wù)引用介紹:

  • Dubbo 服務(wù)引用的時(shí)機(jī)有兩個(gè)嗦玖,第一個(gè)是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet方法時(shí)引用服務(wù)患雇,第二個(gè)是在 ReferenceBean對(duì)應(yīng)的服務(wù)被注入到其他類中時(shí)引用。這兩個(gè)引用服務(wù)的時(shí)機(jī)區(qū)別在于宇挫,第一個(gè)是餓漢式的苛吱,第二個(gè)是懶漢式的。
  • 默認(rèn)情況下器瘪,Dubbo 使用懶漢式引用服務(wù)翠储。如果需要使用餓漢式,可通過配置 <dubbo:reference> 的 init 屬性開啟橡疼。下面我們按照 Dubbo 默認(rèn)配置進(jìn)行分析援所,整個(gè)分析過程從 ReferenceBean 的 getObject方法開始。當(dāng)我們的服務(wù)被注入到其他類中時(shí)欣除,Spring 會(huì)第一時(shí)間調(diào)用 getObject 方法住拭,并由該方法執(zhí)行服務(wù)引用邏輯。
  • 按照慣例历帚,在進(jìn)行具體工作之前滔岳,需先進(jìn)行配置檢查與收集工作。接著根據(jù)收集到的信息決定服務(wù)用的方式挽牢,有三種谱煤,第一種是引用本地 (JVM) 服務(wù),第二是通過直連方式引用遠(yuǎn)程服務(wù)卓研,第三是通過注冊(cè)中心引用遠(yuǎn)程服務(wù)趴俘。不管是哪種引用方式睹簇,最后都會(huì)得到一個(gè) Invoker 實(shí)例。
  • 如果有多個(gè)注冊(cè)中心寥闪,多個(gè)服務(wù)提供者太惠,這個(gè)時(shí)候會(huì)得到一組 Invoker 實(shí)例,此時(shí)需要通過集群管理類 Cluster 將多個(gè) Invoker 合并成一個(gè)實(shí)例疲憋。合并后的 Invoker 實(shí)例已經(jīng)具備調(diào)用本地或遠(yuǎn)程服務(wù)的能力了凿渊,但并不能將此實(shí)例暴露給用戶使用,這會(huì)對(duì)用戶業(yè)務(wù)代碼造成侵入缚柳。此時(shí)框架還需要通過代理工廠類 (ProxyFactory) 為服務(wù)接口生成代理類埃脏,并讓代理類去調(diào)用 Invoker 邏輯。避免了 Dubbo 框架代碼對(duì)業(yè)務(wù)代碼的侵入秋忙,同時(shí)也讓框架更容易使用彩掐。
image.png

本圖暫不考慮集群容錯(cuò)、網(wǎng)絡(luò)調(diào)用灰追、序列化反序列

2.服務(wù)引用的入口,getObject()=>createProxy()方法介紹

那么就從ReferenceBean的getObject開始吧:

public Object getObject() throws Exception {
    return get();
}

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    // 檢測(cè) ref (即service對(duì)象)是否為空堵幽,為空則通過 init 方法創(chuàng)建
    if (ref == null) {
        // init 方法主要用于處理配置,以及調(diào)用 createProxy 生成代理類
        init();
    }
    return ref;
}

注意這個(gè)ReferenceConfig#init()弹澎,主要邏輯就是配置解析封裝(實(shí)在太長就不貼了朴下,而且解析配置不在討論的點(diǎn)),該實(shí)例包含了事件通知配置苦蒿,比如 onreturn殴胧、onthrow、oninvoke 等佩迟。并在最后調(diào)用 createProxy ()創(chuàng)建代理對(duì)象团滥。

從createProxy 我們也將開始真正的引用鏈路。

  //map為 `side`报强,`dubbo`惫撰,`timestamp`,`pid`等 參數(shù)
    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        // 是否本地引用
        final boolean isJvmRefer;
        // injvm 屬性為空躺涝,不通過該屬性判斷
        if (isInjvm() == null) {
            // 直連服務(wù)提供者,參見文檔《直連提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            // 通過 `tmpUrl` 判斷扼雏,是否需要本地引用
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            // 默認(rèn)不是
            } else {
                isJvmRefer = false;
            }
        // 通過 injvm 屬性坚嗜。
        } else {
            isJvmRefer = isInjvm();
        }

        // 本地引用
        if (isJvmRefer) {
            // 創(chuàng)建本地服務(wù)引用 URL 對(duì)象。
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 引用服務(wù)诗充,返回 Invoker 對(duì)象
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        // 正常流程苍蔬,一般為遠(yuǎn)程引用
        } else {
            // 定義直連地址,可以是服務(wù)提供者的地址蝴蜓,也可以是注冊(cè)中心的地址
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                // 拆分地址成數(shù)組碟绑,使用 ";" 分隔俺猿。
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                // 循環(huán)數(shù)組,添加到 `url` 中格仲。
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        // 創(chuàng)建 URL 對(duì)象
                        URL url = URL.valueOf(u);
                        // 設(shè)置默認(rèn)路徑
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        // 注冊(cè)中心的地址押袍,帶上服務(wù)引用的配置參數(shù)
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        // 服務(wù)提供者的地址
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            // 注冊(cè)中心
            } else { // assemble URL from register center's configuration
                // 加載注冊(cè)中心 URL 數(shù)組
                List<URL> us = loadRegistries(false);
                // 循環(huán)數(shù)組火本,添加到 `url` 中扰才。
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        // 加載監(jiān)控中心 URL
                        URL monitorUrl = loadMonitor(u);
                        // 服務(wù)引用配置對(duì)象 `map`,帶上監(jiān)控中心的 URL
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 注冊(cè)中心的地址凌停,帶上服務(wù)引用的配置參數(shù)
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注冊(cè)中心侮东,帶上服務(wù)引用的配置參數(shù)
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            // 單 `urls` 時(shí)圈盔,引用服務(wù),返回 Invoker 對(duì)象
            if (urls.size() == 1) {
                // 引用服務(wù)
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // 循環(huán) `urls` 悄雅,引用服務(wù)驱敲,返回 Invoker 對(duì)象
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 引用服務(wù)
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    // 使用最后一個(gè)注冊(cè)中心的 URL
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                // 有注冊(cè)中心
                if (registryURL != null) { // registry url is available
                    // 對(duì)有注冊(cè)中心的 Cluster 只用 AvailableCluster
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                // 無注冊(cè)中心,全部都是服務(wù)直連
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        // 啟動(dòng)時(shí)檢查
        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }

        // 創(chuàng)建 Service 代理對(duì)象
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

3.createProxy()中的本地引用鏈路

3.1是否為本地引用的判別方法宽闲,InjvmProtocol#isInjvmRefer

同樣的众眨,服務(wù)引用也分為本地引用和遠(yuǎn)程引用,本地引用還是遠(yuǎn)程引用是從URL來辨別的便锨,先從本地引用來講起围辙。
這邊來簡(jiǎn)單介紹下他的判別方法InjvmProtocol#isInjvmRefer,也比較簡(jiǎn)單

public boolean isInjvmRefer(URL url) {
        final boolean isJvmRefer;
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
        // 當(dāng) `protocol = injvm` 時(shí)放案,本身已經(jīng)是 jvm 協(xié)議了姚建,走正常流程就是了。
        if (Constants.LOCAL_PROTOCOL.toString().equals(url.getProtocol())) {
            isJvmRefer = false;
        // 當(dāng) `scope = local` 或者 `injvm = true` 時(shí)吱殉,本地引用
        } else if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter("injvm", false))) {
            // if it's declared as local reference
            // 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
            isJvmRefer = true;
        // 當(dāng) `scope = remote` 時(shí)掸冤,遠(yuǎn)程引用
        } else if (Constants.SCOPE_REMOTE.equals(scope)) {
            // it's declared as remote reference
            isJvmRefer = false;
        // 當(dāng) `generic = true` 時(shí),即使用泛化調(diào)用友雳,遠(yuǎn)程引用稿湿。
        } else if (url.getParameter(Constants.GENERIC_KEY, false)) {
            // generic invocation is not local reference
            isJvmRefer = false;
        // 當(dāng)本地已經(jīng)有該 Exporter 時(shí),本地引用
        } else if (getExporter(exporterMap, url) != null) {
            // by default, go through local reference if there's the service exposed locally
            isJvmRefer = true;
        // 默認(rèn)押赊,遠(yuǎn)程引用
        } else {
            isJvmRefer = false;
        }
        return isJvmRefer;
    }
}

上面的方法沒什么好說的饺藤,都已經(jīng)注釋好啦。

createProxy()中的本地引用鏈路

這邊我們先走本地引用的鏈路流礁,再貼一下createProxy()的部分代碼如下涕俗,注意到在創(chuàng)建本地服務(wù)引用 URL 對(duì)象時(shí),已經(jīng)把Protocol設(shè)置成InjvmProtocol了神帅,

  // 本地引用
        if (isJvmRefer) {
            // 創(chuàng)建本地服務(wù)引用 URL 對(duì)象再姑。
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 引用服務(wù),返回 Invoker 對(duì)象
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }

看到直接調(diào)用了Protocol#refer(interface, url)找御,根據(jù)url獲得對(duì)應(yīng) Protocol 拓展實(shí)現(xiàn)為 InjvmProtocol 元镀。同樣的還是SPI機(jī)制绍填,調(diào)用鏈路是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => InjvmProtocol,已經(jīng)在SPI和服務(wù)暴露一文提過很多次了栖疑,包裝類里export()refer()的邏輯都差不多讨永,就不再介紹。直接來看Protocol#refer(interface, url)做了什么吧蔽挠。

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }

非常樸素的new了InjvmInvoker,在進(jìn)去看看他干了什么住闯。

3.3InjvmIvoker介紹,引出proxyFactory

    /**
     * Exporter 集合
     *
     * key: 服務(wù)鍵
     *
     * 該值實(shí)際就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
     */
    private final Map<String, Exporter<?>> exporterMap;

    InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
        super(type, url);
        this.key = key;
        this.exporterMap = exporterMap;
    }
  • 最后返回的就是這個(gè)InjvmIvoker,Invoker 是 Dubbo 的核心模型澳淑,代表一個(gè)可執(zhí)行體比原,這個(gè)InjvmIvoker他將一些屬性和本地的緩存聚合到一起形成了一個(gè)Invoker。
    每個(gè)InjvmInvoker都會(huì)持有一個(gè)指向本地緩存的指針杠巡。
  • 拿到這個(gè)Invoker后量窘,在ReferenceConfig#createProxy最后調(diào)用
// 創(chuàng)建 Service 代理對(duì)象
       // create service proxy
       return (T) proxyFactory.getProxy(invoker);

調(diào)用 ProxyFactory#getProxy(invoker)方法,創(chuàng)建 Service 代理對(duì)象氢拥。這邊的proxyFactory同樣也是擴(kuò)展點(diǎn)蚌铜,由傳入的invoker.url來決定調(diào)用哪個(gè)。
順帶一提 Service 代理對(duì)象的內(nèi)部嫩海,可以調(diào)用 Invoker#invoke(Invocation)方法冬殃,進(jìn)行 Dubbo 服務(wù)的調(diào)用。
那么最后叁怪,getProxy()到底是怎么創(chuàng)建代理對(duì)象的呢审葬?

4.proxyFactory擴(kuò)展點(diǎn)

image.png

ProxyFactory同樣也是個(gè)擴(kuò)展類,有Stub包裝類奕谭,還有兩個(gè)擴(kuò)展類實(shí)現(xiàn)涣觉,可以自由決定用哪個(gè)Factory。Invoker通過Javassist動(dòng)態(tài)代理或者JDK動(dòng)態(tài)代理血柳,兩者都是通過生成字節(jié)碼實(shí)現(xiàn)的官册。

而技術(shù)的選型可以看下這張圖作者的解釋


image.png

4.1proxyFactory的包裝類StubProxyFactoryWrapper

  • ProxyFactory#getProxy(invoker)`鏈路其實(shí)是這樣的,StubProxyFactoryWrapper的作用就是生成本地存根。


    image.png

在此之前我們先要知道本地存根的作用(也就是為什么要調(diào)用StubProxyFactoryWrapper):
遠(yuǎn)程服務(wù)后难捌,客戶端通常只剩下接口膝宁,而實(shí)現(xiàn)全在服務(wù)器端,但提供方有些時(shí)候想在客戶端也執(zhí)行部分邏輯根吁,比如:做 ThreadLocal 緩存昆汹,提前驗(yàn)證參數(shù),調(diào)用失敗后偽造容錯(cuò)數(shù)據(jù)等等婴栽,此時(shí)就需要在 API 中帶上 Stub,客戶端生成 Proxy 實(shí)例辈末,會(huì)把 Proxy 通過構(gòu)造函數(shù)傳給 Stub 愚争,然后把 Stub 暴露給用戶映皆,Stub 可以決定要不要去調(diào) Proxy。
說多了不如看一個(gè)簡(jiǎn)單的實(shí)例:Dubbo本地存根

StubProxyFactoryWrapper#getProxy(invoker)代碼如下

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        // 獲得 Service Proxy 對(duì)象,這邊調(diào)用的就是指定的proxyFactory了
        T proxy = proxyFactory.getProxy(invoker);
        if (GenericService.class != invoker.getInterface()) { // 非泛化引用
            // 獲得 `stub` 配置項(xiàng)
            String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
            if (ConfigUtils.isNotEmpty(stub)) {
                Class<?> serviceType = invoker.getInterface();
                // `stub = true` 的情況轰枝,使用接口 + `Stub` 字符串捅彻。
                if (ConfigUtils.isDefault(stub)) {
                    if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                        stub = serviceType.getName() + "Stub";
                    } else {
                        stub = serviceType.getName() + "Local";
                    }
                }
                try {
                    // 加載 Stub 類
                    Class<?> stubClass = ReflectUtils.forName(stub);
                    if (!serviceType.isAssignableFrom(stubClass)) {
                        throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
                    }
                    try {
                        Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                          // 創(chuàng)建 Stub 對(duì)象,使用帶 Service Proxy 對(duì)象的構(gòu)造方法
                        proxy = (T) constructor.newInstance(new Object[]{proxy});

                        // 【TODO 8033】參數(shù)回調(diào)
                        //export stub service
                        URL url = invoker.getUrl();
                        if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                            url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                            url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                            try {
                                export(proxy, (Class) invoker.getInterface(), url);
                            } catch (Exception e) {
                                LOGGER.error("export a stub service error.", e);
                            }
                        }
                    } catch (NoSuchMethodException e) {
                        throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
                    }
                } catch (Throwable t) {
                    LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
                    // ignore
                }
            }
        }
        return proxy;
    }
  • 動(dòng)態(tài)的用proxyFactory(默認(rèn)是JavassistFactory)來獲取Service Proxy 對(duì)象
  • 獲取配置中的stub 配置項(xiàng)鞍陨,而后調(diào)用 ReflectUtils#forName(stub) 方法步淹,加載我們自己寫的 Stub 類,注意是加載哦,就是拿到Class诚撵,初始化對(duì)象是在下面完成的缭裆。
  • 之后創(chuàng)建 Stub 對(duì)象,使用帶 Service Proxy 對(duì)象作為參數(shù)的構(gòu)造方法寿烟。例如澈驼,public DemoServiceStub(DemoService demoService)。通過這樣的方式筛武,我們就擁有了一個(gè)內(nèi)部有 Proxy Service 對(duì)象的 Stub 對(duì)象啦缝其,可以實(shí)現(xiàn)各種 OOXX 啦。最后將這個(gè)Stub對(duì)象返回徘六。
  • 再次提醒内边,所以我們有的,是這個(gè)Stub對(duì)象待锈,可以用這個(gè)對(duì)象來對(duì)執(zhí)行方法進(jìn)行一些本地的AOP攔截

4.2擴(kuò)展類JavassistProxyFactory的getProxy(Invoker)

StubProxyFactoryWrapper#getProxy(invoker)第一行就調(diào)用了擴(kuò)展類的getProxy(invoker)
既然默認(rèn)實(shí)現(xiàn)是javassist漠其,那么我們就來看看javassist都干了什么吧
先來看看他的父類,AbstractProxyFactory,

public abstract class AbstractProxyFactory implements ProxyFactory {

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        Class<?>[] interfaces = null;
        // TODO 8022 芋艿
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);
                }
            }
        }
        // 增加 EchoService 接口炉擅,用于回生測(cè)試辉懒。參見文檔《回聲測(cè)試》https://dubbo.gitbooks.io/dubbo-user-book/demos/echo-service.html
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);
    }

    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

}

可以看到,該抽象類谍失,主要是實(shí)現(xiàn)了 #getProxy(invoker)方法眶俩,獲得需要生成代理的接口們,而后調(diào)用了我們的子類JavassistProxyFactory#getProxy(invoker, types)。接著往下看

public class JavassistProxyFactory extends AbstractProxyFactory {

    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
      /// 生成 Proxy 子類(Proxy 是抽象類)快鱼。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        // TODO Wrapper類不能正確處理帶$的類名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

JavassistProxyFactory#getProxy(invoker, types)return的時(shí)候生成 Proxy 子類(Proxy 是抽象類)颠印。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實(shí)例。注意:其中傳入的參數(shù)是 InvokerInvocationHandler類抹竹,通過這樣的方式线罕,讓 proxy 和真正的邏輯代碼解耦。
那我們來看看這個(gè)InvokerInvocationHandler類到底是什么

4.3Proxy 實(shí)例中傳入了InvokerInvocationHandler類

public class InvokerInvocationHandler implements InvocationHandler {

    /**
     * Invoker 對(duì)象
     */
    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // wait 等Object類的方法窃判,直接反射調(diào)用
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // 基礎(chǔ)方法钞楼,不使用 RPC 調(diào)用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // RPC 調(diào)用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}
  • 這個(gè)類就是攔截一些接口類調(diào)用的用途。將一些簡(jiǎn)單的方法本地調(diào)用袄琳,就不用浪費(fèi)網(wǎng)絡(luò)了通信了询件。
  • 這里注意InvokerInvocationHandler#invoke return的調(diào)用 invoker.invoke(new RpcInvocation(method, args)).recreate()燃乍,如果消費(fèi)者調(diào)用invoke的話,會(huì)在這里終于開始了網(wǎng)絡(luò)調(diào)用

4.4JavassistProxyFactory生成的代碼樣例及作用

最終JavassistProxyFactory生成的代碼樣例如下:

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

  • 這個(gè)proxy實(shí)現(xiàn)了我們的業(yè)務(wù)接口方法宛琅,使得我們自己調(diào)用業(yè)務(wù)方法刻蟹,比如sayHello的時(shí)候,可以進(jìn)行這樣的鏈路:
    InvokerInvocationHandler對(duì)象(攔截基本方法嘿辟,使其不進(jìn)行網(wǎng)絡(luò)調(diào)用舆瘪,如toString()) => 網(wǎng)絡(luò)調(diào)用
  • 而使用方對(duì)此是沒有感覺的,他只需要調(diào)用消費(fèi)者的接口红伦,Dubbo幫他實(shí)現(xiàn)了一切英古。

4.5服務(wù)引用存在哪?

別急色建,這就來
XML中的服務(wù)引用配置樣例如下:

<dubbo:reference interface="cn.com.wang.service.UserService" id="userService">
</dubbo:reference>
  • createProxy()方法最終返回的一個(gè)Stub(如果實(shí)現(xiàn)了本地存根的話)哺呜,getObject()的返回值正是這個(gè)Stub,存儲(chǔ)在ReferenceBean中。
  • ReferenceBean 有個(gè)特殊之處箕戳,實(shí)現(xiàn)了FactoryBean ,FactoryBean就是spring的工廠bean某残,工廠bean也就是說當(dāng)我們要獲取dubbo:reference中interface時(shí)也就是我們的UserService,我們會(huì)直接注入到使用類中陵吸,spring就得從容器中找玻墅。
  • 也因?yàn)樗枪Sbean,才會(huì)有上文我們提到的壮虫,調(diào)用FactoryBean的getObject()方法澳厢,這個(gè)方法返回的對(duì)象就會(huì)作為標(biāo)簽配置返回的對(duì)象。所以我們的引用囚似,也就是最后返回Service 代理對(duì)象剩拢,的實(shí)際上是存在Spring容器里的。

至此饶唤,我們簡(jiǎn)單分析了本地引用和proxyFactory的動(dòng)態(tài)代理以及本地存根的作用徐伐,接下來來看看遠(yuǎn)程引用與本地引用的區(qū)別吧。

5.遠(yuǎn)程調(diào)用

5.1 createProx()中的遠(yuǎn)程引入的鏈路

 private T createProxy(Map<String, String> map) {
     URL tmpUrl = new URL("temp", "localhost", 0, map);
     // 【省略代碼】是否本地引用
     final boolean isJvmRefer;

     if (isJvmRefer) {
    // 【省略代碼】本地引用
    } else {// 遠(yuǎn)程引用
        // url 不為空募狂,表明用戶可能想進(jìn)行點(diǎn)對(duì)點(diǎn)調(diào)用
        if (url != null && url.length() > 0) {
            // 當(dāng)需要配置多個(gè) url 時(shí)办素,可用分號(hào)進(jìn)行分割,這里會(huì)進(jìn)行切分
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        // 設(shè)置接口全限定名為 url 路徑
                        url = url.setPath(interfaceName);
                    }
                    
                    // 檢測(cè) url 協(xié)議是否為 registry祸穷,若是性穿,表明用戶想使用指定的注冊(cè)中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        // 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        // 合并 url雷滚,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性)需曾,
                        // 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置,比如版本胯舷,group刻蚯,時(shí)間戳等
                        // 最后將合并后的配置設(shè)置為 url 查詢字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else {
            // url為空桑嘶,加載注冊(cè)中心 url,
            //注意是這里開始將不同的注冊(cè)中心循環(huán)添加到urls
            List<URL> us = loadRegistries(false);
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    // 加載監(jiān)控中心 URL
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        // 服務(wù)引用配置對(duì)象 `map`躬充,帶上監(jiān)控中心的 URL
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    // 添加 refer 參數(shù)到 url 中逃顶,并將 url 添加到 urls 中
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }

            // 未配置注冊(cè)中心,拋出異常
            if (urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference...");
            }
        }

        // 單個(gè)注冊(cè)中心或服務(wù)提供者(服務(wù)直連充甚,下同)
        if (urls.size() == 1) {
            // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
            
        // 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者以政,或者兩者混合
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;

            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 調(diào)用 refer 構(gòu)建 Invoker,refprotocol 會(huì)在運(yùn)行時(shí)
                // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實(shí)例伴找,并調(diào)用實(shí)例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url;
                }
            }
            if (registryURL != null) {
                // 如果注冊(cè)中心鏈接不為空盈蛮,則將使用 AvailableCluster
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對(duì)多個(gè) Invoker 進(jìn)行合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else {
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true;
    }
    
    // invoker 可用性檢查
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("No provider available for the service...");
    }

    // 生成代理類
    return (T) proxyFactory.getProxy(invoker);
}
  • 簡(jiǎn)單概括下遠(yuǎn)程引用的邏輯:讀取直連配置項(xiàng)技矮,或注冊(cè)中心 url抖誉,并將讀取到的 url 存儲(chǔ)到 urls 中。然后根據(jù) urls 元素?cái)?shù)量進(jìn)行后續(xù)操作衰倦。
  • 若 urls 元素?cái)?shù)量為1袒炉,則直接通過 Protocol 自適應(yīng)拓展類構(gòu)建 Invoker 實(shí)例接口。
  • 若 urls 元素?cái)?shù)量大于1樊零,即存在多個(gè)注冊(cè)中心或服務(wù)直連 url我磁,此時(shí)先根據(jù) url 構(gòu)建 Invoker。然后再通過 Cluster 合并多個(gè) Invoker驻襟,最后調(diào)用 ProxyFactory 生成代理類夺艰。
  • 對(duì)于上述的判斷,可以看下多注冊(cè)中心官網(wǎng)的配置樣例來輔助理解沉衣。多注冊(cè)中心

Invoker 的構(gòu)建過程以及代理類的過程比較重要郁副,就是refprotocol.refer(type, url)這一方法的調(diào)用鏈路。

5.2只配置了一個(gè)注冊(cè)中心的遠(yuǎn)程引用:

截取上面的方法部分如下:

  // 單個(gè)注冊(cè)中心或服務(wù)提供者
        if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));

             // 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者厢蒜,或者兩者混合
            } else {
            //…………
        }

在上述方法中霞势,如果只配置了一個(gè)注冊(cè)中心的話,urls屬性樣例如下(配置了直連會(huì)有不同斑鸦,下面會(huì)介紹)
registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=26540&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D26540%26qos.port%3D33333%26register.ip%3D192.168.1.102%26side%3Dconsumer%26timestamp%3D1594622397172&registry=multicast&timestamp=1594622397254

image.png

調(diào)用 refprotocol.refer(type, url)方法愕贡,引用服務(wù),返回 Invoker 對(duì)象巷屿。又到了我們喜聞樂見的SPI自適應(yīng)特性固以,自動(dòng)根據(jù) URL 參數(shù),獲得對(duì)應(yīng)的拓展實(shí)現(xiàn)。
在這里我們根據(jù)url可以得到調(diào)用的是RegistryProtocol憨琳,#refer(...)方法的調(diào)用順序是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol

與服務(wù)暴露時(shí)類似诫钓,ProtocolFilterWrapper 用于創(chuàng)建過濾鏈,ProtocolListenerWrapper返回了一個(gè)ListenerInvokerWrapper對(duì)象,ListenerInvokerWrapper裝飾invoker, 在構(gòu)造器中遍歷listeners構(gòu)建referer的監(jiān)聽鏈篙螟,這兩個(gè)類都會(huì)放行RegistryProtocol菌湃。也就是什么都不做。

5.35.3RegistryProtocol#refer()

直接來看RegistryProtocol的refer(type, url)做了什么吧

 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 獲得真實(shí)的注冊(cè)中心的 URL
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // 獲得注冊(cè)中心
        Registry registry = registryFactory.getRegistry(url);
        // TODO 芋艿
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // 將 url 查詢字符串轉(zhuǎn)為 Map,獲得服務(wù)引用配置參數(shù)集合
        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        // 分組聚合遍略,參見文檔 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                // 通過 SPI 加載 MergeableCluster 實(shí)例惧所,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務(wù)引用邏輯
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        // 執(zhí)行服務(wù)引用
        return doRefer(cluster, registry, type, url);
    }

上面代碼首先為 url 設(shè)置協(xié)議頭,然后根據(jù) url 參數(shù)從registryFactory加載注冊(cè)中心實(shí)例绪杏,如果用的是zk的協(xié)議下愈,那么注冊(cè)器就是zk的注冊(cè)器。然后獲取 group 配置蕾久,根據(jù) group 配置決定doRefer()第一個(gè)參數(shù)的類型势似。這里的重點(diǎn)是 doRefer()方法

 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 創(chuàng)建 RegistryDirectory 對(duì)象,并設(shè)置注冊(cè)中心和協(xié)議
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
       // 服務(wù)引用配置集合
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 
      // 生成服務(wù)消費(fèi)者鏈接
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 向注冊(cè)中心注冊(cè)自己(服務(wù)消費(fèi)者)
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false))); 
        }
        // 向注冊(cè)中心訂閱服務(wù)提供者 + 路由規(guī)則 + 配置規(guī)則
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                        Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 創(chuàng)建 Invoker 對(duì)象
        Invoker invoker = cluster.join(directory);
        // 向本地注冊(cè)表僧著,注冊(cè)消費(fèi)者
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
  • doRefer 方法創(chuàng)建一個(gè)RegistryDirectory實(shí)例履因,然后生成服務(wù)者消費(fèi)者鏈接,并向注冊(cè)中心進(jìn)行注冊(cè)霹抛。
  • 注冊(cè)完畢后搓逾,緊接著訂閱 providers、configurators杯拐、routers 等節(jié)點(diǎn)下的數(shù)據(jù)霞篡。完成訂閱后,RegistryDirectory 會(huì)收到這幾個(gè)節(jié)點(diǎn)下的子節(jié)點(diǎn)信息端逼。
  • 由于一個(gè)服務(wù)可能部署在多臺(tái)服務(wù)器上朗兵,這樣就會(huì)在 providers 產(chǎn)生多個(gè)節(jié)點(diǎn),這個(gè)時(shí)候就需要 Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè)顶滩,并生成一個(gè) Invoker余掖。
    注意這個(gè)cluster,他也是通過SPI機(jī)制依賴注入來的礁鲁,(關(guān)于Cluster盐欺,RegistryDirectory的內(nèi)容本篇暫不討論,等我下一篇再說仅醇! )

5.45.4直連時(shí)的遠(yuǎn)程引用

上面講的是沒有配置直連的情況冗美,這次我們改一下配置,將引用改成直連看看會(huì)咋樣析二。

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" url="dubbo://localhost:20880" />

同樣是到這一行代碼粉洼,

// 單個(gè)注冊(cè)中心或服務(wù)提供者
        if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));

             // 多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者节预,或者兩者混合
            } else {
            //…………
        }

此時(shí)的urls屬性如下
dubbo://localhost:20880/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2144&qos.port=33333&register.ip=192.168.1.102&side=consumer&timestamp=1594651670644

image.png

注意到頭部已經(jīng)被換成dubbo了,與上文的registry類似属韧,這邊就是調(diào)用DubboProtocol來進(jìn)行引用了安拟,鏈路如下
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
ProtocolFilterWrapper創(chuàng)建好過濾器鏈,ProtocolListenerWrapper開啟監(jiān)聽后宵喂,就開始調(diào)用DubboProtocol#refer()了糠赦,來看一下這個(gè)方法做了什么吧

5.5DubboProtocol#refer()

  public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // 初始化序列化優(yōu)化器
        optimizeSerialization(url);
        // 獲得遠(yuǎn)程通信客戶端數(shù)組
        // 創(chuàng)建 DubboInvoker 對(duì)象
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        // 添加到 `invokers`
        invokers.add(invoker);
        return invoker;
    }

上面方法看起來比較簡(jiǎn)單,不過這里有一個(gè)調(diào)用需要我們注意一下锅棕,即 getClients(url)愉棱。這個(gè)方法用于獲取客戶端實(shí)例,實(shí)例類型為ExchangeClient哲戚。ExchangeClient實(shí)際上并不具備通信能力,它需要基于更底層的客戶端實(shí)例進(jìn)行通信艾岂。比如 NettyClient顺少、MinaClient 等,默認(rèn)情況下王浴,Dubbo 使用 NettyClient進(jìn)行通信脆炎。接下來,我們簡(jiǎn)單看一下 getClients() 方法的邏輯氓辣。

5.5new DubboInvoker時(shí)的getClients(url)

private ExchangeClient[] getClients(URL url) {
    // 是否共享連接
    boolean service_share_connect = false;
    // 獲取連接數(shù)秒裕,默認(rèn)為0,表示未配置
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // 如果未配置 connections钞啸,則共享連接
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // 獲取共享客戶端
            clients[i] = getSharedClient(url);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }
    return clients;
}

這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實(shí)例几蜻,默認(rèn)情況下,使用共享客戶端實(shí)例体斩。getSharedClient 方法中也會(huì)調(diào)用 initClient 方法梭稚,因此下面我們一起看一下這兩個(gè)方法,看看他是怎么共享絮吵,怎么初始化的弧烤。

5.6getClients(url)中的getSharedClient(url)

 private ExchangeClient getSharedClient(URL url) {
        // 從集合中,查找 ReferenceCountExchangeClient 對(duì)象
        String key = url.getAddress();
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            // 若未關(guān)閉蹬敲,增加指向該 Client 的數(shù)量暇昂,并返回它
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            // 若已關(guān)閉,移除
            } else {
                referenceClientMap.remove(key);
            }
        }
        // 同步伴嗡,創(chuàng)建 ExchangeClient 對(duì)象急波。
        synchronized (key.intern()) {
            // 創(chuàng)建 ExchangeClient 對(duì)象
            ExchangeClient exchangeClient = initClient(url);
            // 將 `exchangeClient` 包裝,創(chuàng)建 ReferenceCountExchangeClient 對(duì)象
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            // 添加到集合
            referenceClientMap.put(key, client);
            // 從 `ghostClientMap`移除
            ghostClientMap.remove(key);
            return client;
        }
    }
  • 上面方法先訪問緩存闹究,若緩存未命中幔崖,則通過initClient 方法創(chuàng)建新的 ExchangeClient 實(shí)例,并將該實(shí)例傳給 ReferenceCountExchangeClient構(gòu)造方法創(chuàng)建一個(gè)帶有引用計(jì)數(shù)功能的 ExchangeClient實(shí)例。
  • ReferenceCountExchangeClient 類內(nèi)部使用引用計(jì)數(shù)的方式記錄共享的數(shù)量赏寇。
    ghostClientMap吉嫩,這個(gè)名字說實(shí)話確實(shí)有點(diǎn)難理解,這個(gè)實(shí)際上是一個(gè)存儲(chǔ)LazyConnectExchangeClient的集合嗅定。
    這個(gè)時(shí)候我們就要理解LazyConnectExchangeClient的作用自娩,當(dāng)服務(wù)引用時(shí),我們并不想此時(shí)就是開始通信渠退,而是在調(diào)用的時(shí)候再與服務(wù)端通信忙迁,LazyConnectExchangeClient就像是一個(gè)緩存,在服務(wù)調(diào)用的時(shí)候才會(huì)創(chuàng)建真正的Client去連接碎乃,節(jié)省了資源.
  • LazyConnectExchangeClient 在每次數(shù)據(jù)傳輸前姊扔,先判斷tcp連接狀態(tài),若連接斷開則先執(zhí)行connect建立連接梅誓。

5.7getClients(url)中的initClient(url)

DubboProtocol#getSharedClient()也也調(diào)用了DubboProtocol#initClient()恰梢,看來是繞不過這方法了,來繼續(xù)看他的代碼梗掰。

private ExchangeClient initClient(URL url) {
        // 獲取客戶端類型嵌言,默認(rèn)為 netty
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
        // 校驗(yàn) Client 的 Dubbo SPI 拓展是否存在,不存在則拋出異常
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        // 設(shè)置編解碼器為 Dubbo 及穗,即 DubboCountCodec
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

        // 默認(rèn)開啟 heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // 連接服務(wù)器摧茴,創(chuàng)建客戶端
        ExchangeClient client;
        try {
            // 懶連接,創(chuàng)建 LazyConnectExchangeClient 對(duì)象
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            // 直接連接埂陆,創(chuàng)建 HeaderExchangeClient 對(duì)象
            } else {
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
  • DubboProtocol的requestHandlerExchangeHandler的實(shí)現(xiàn)苛白,是remoting層接收數(shù)據(jù)后的回調(diào)。只定義了一個(gè)回復(fù)請(qǐng)求結(jié)果的方法猜惋,返回的是請(qǐng)求結(jié)果
  • initClient() 方法首先獲取用戶配置的客戶端類型丸氛,默認(rèn)為 netty。然后檢測(cè)用戶配置的客戶端類型是否存在著摔,不存在則拋出異常缓窜。最后根據(jù) lazy 配置決定創(chuàng)建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼并不是很復(fù)雜谍咆,該類會(huì)在 request 方法被調(diào)用時(shí)通過 Exchangers#connect 方法創(chuàng)建 ExchangeClient 客戶端禾锤,該類的代碼本節(jié)就不分析了。下面我們分析一下 Exchangers#connect() 方法摹察。

5.8initClient(url)中的自適應(yīng)Exchangers#connect()

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實(shí)例恩掷,默認(rèn)為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);

Exchangers#getExchanger 會(huì)通過 SPI 加載 HeaderExchanger 實(shí)例(在Dubbo中只有這一個(gè)實(shí)例),來就來看看HeaderExchanger#connect()

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
  // 這里包含了多個(gè)調(diào)用供嚎,分別如下:
    // 1. 創(chuàng)建 HeaderExchangeHandler 對(duì)象
    // 2. 創(chuàng)建 DecodeHandler 對(duì)象
    // 3. 通過 Transporters 構(gòu)建 Client 實(shí)例
    // 4. 創(chuàng)建 HeaderExchangeClient 對(duì)象
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

我們這里重點(diǎn)看一下 Transporters#connect 方法

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數(shù)量大于1黄娘,則創(chuàng)建一個(gè) ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    
    // 獲取 Transporter 自適應(yīng)拓展類峭状,并調(diào)用 connect 方法生成 Client 實(shí)例
    return getTransporter().connect(url, handler);
}

如上,getTransporter() 方法返回的是自適應(yīng)拓展類逼争,該類會(huì)在運(yùn)行時(shí)根據(jù)客戶端類型加載指定的 Transporter 實(shí)現(xiàn)類优床。若用戶未配置客戶端類型,則默認(rèn)加載 NettyTransporter誓焦,并調(diào)用該類的 connect() 方法胆敞。如下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyClient 對(duì)象
    return new NettyClient(url, listener);
}

到這里就不繼續(xù)跟下去了,在往下就是通過 Netty 提供的 API 構(gòu)建 Netty 客戶端了杂伟。

5.9多注冊(cè)中心時(shí)鏈路

else {
                // 循環(huán) `urls` 移层,引用服務(wù),返回 Invoker 對(duì)象
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    // 引用服務(wù)
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    // 使用最后一個(gè)注冊(cè)中心的 URL
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                // 有注冊(cè)中心
                if (registryURL != null) { // registry url is available
                    // 對(duì)有注冊(cè)中心的 Cluster 只用 AvailableCluster
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                // 無注冊(cè)中心赫粥,全部都是服務(wù)直連
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }

可以看到多注冊(cè)中心的時(shí)候多了一步 cluster.join()观话,其他與上文分析的單注冊(cè)中心并無區(qū)別,僅僅是Cluster 將多個(gè)服務(wù)節(jié)點(diǎn)合并為一個(gè)越平,并生成一個(gè) Invoker匪燕,這個(gè)我們留待下面章節(jié)來主要分析Cluster。

至此喧笔,我們的DubboProcotol#refer()也分析完畢告一段落,我們知道了他里面創(chuàng)建了一個(gè)DubboInvoker,其中有我們的Clients實(shí)例龟再,基于此书闸,我們才可以通過它來進(jìn)行遠(yuǎn)程調(diào)用了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末利凑,一起剝皮案震驚了整個(gè)濱河市浆劲,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌哀澈,老刑警劉巖牌借,帶你破解...
    沈念sama閱讀 221,430評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異割按,居然都是意外死亡膨报,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,406評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門适荣,熙熙樓的掌柜王于貴愁眉苦臉地迎上來现柠,“玉大人陡厘,你說我怎么就攤上這事仓手「嵛叮” “怎么了榜旦?”我有些...
    開封第一講書人閱讀 167,834評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵亡容,是天一觀的道長局冰。 經(jīng)常有香客問我植兰,道長焚挠,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,543評(píng)論 1 296
  • 正文 為了忘掉前任湾笛,我火速辦了婚禮饮怯,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘迄本。我一直安慰自己硕淑,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,547評(píng)論 6 397
  • 文/花漫 我一把揭開白布嘉赎。 她就那樣靜靜地躺著置媳,像睡著了一般。 火紅的嫁衣襯著肌膚如雪公条。 梳的紋絲不亂的頭發(fā)上拇囊,一...
    開封第一講書人閱讀 52,196評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音靶橱,去河邊找鬼寥袭。 笑死,一個(gè)胖子當(dāng)著我的面吹牛关霸,可吹牛的內(nèi)容都是我干的传黄。 我是一名探鬼主播,決...
    沈念sama閱讀 40,776評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼队寇,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼膘掰!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起佳遣,我...
    開封第一講書人閱讀 39,671評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤识埋,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后零渐,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體窒舟,經(jīng)...
    沈念sama閱讀 46,221評(píng)論 1 320
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,303評(píng)論 3 340
  • 正文 我和宋清朗相戀三年诵盼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了惠豺。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,444評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡风宁,死狀恐怖耕腾,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情杀糯,我是刑警寧澤扫俺,帶...
    沈念sama閱讀 36,134評(píng)論 5 350
  • 正文 年R本政府宣布,位于F島的核電站固翰,受9級(jí)特大地震影響狼纬,放射性物質(zhì)發(fā)生泄漏羹呵。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,810評(píng)論 3 333
  • 文/蒙蒙 一疗琉、第九天 我趴在偏房一處隱蔽的房頂上張望冈欢。 院中可真熱鬧,春花似錦盈简、人聲如沸凑耻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,285評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽香浩。三九已至,卻和暖如春臼勉,著一層夾襖步出監(jiān)牢的瞬間邻吭,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,399評(píng)論 1 272
  • 我被黑心中介騙來泰國打工宴霸, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留囱晴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,837評(píng)論 3 376
  • 正文 我出身青樓瓢谢,卻偏偏與公主長得像畸写,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子氓扛,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,455評(píng)論 2 359