第8章 Dubbo 服務(wù)引用流程的設(shè)計與實現(xiàn)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    <!-- 應(yīng)用名毡证,通常與 artifactId 相同即可 -->
    <dubbo:application name="demo-consumer"/>
    <!-- 注冊中心 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
    <!-- 生成遠(yuǎn)端服務(wù)的代理對象, 之后可以向調(diào)用本地服務(wù)一樣調(diào)用遠(yuǎn)端服務(wù) -->
    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
</beans>
服務(wù)消費(fèi)者創(chuàng)建服務(wù)代理
// demoService 是代理對象瘪贱,代理對象是 DemoService 接口的實現(xiàn)類
DemoService demoService = (DemoService) context.getBean("demoService");

一卷雕、服務(wù)消費(fèi)者創(chuàng)建服務(wù)代理簡圖

Alt pic
總體流程:(默認(rèn)配置情況下)
  1. 首先 ReferenceConfig 類的 init 方法調(diào)用 Protocol#refer 方法生成 Invoker 實例(如上圖中的紅色部分)抑堡,這是服務(wù)消費(fèi)的關(guān)鍵浸踩。
  2. 然后使用 JavassistProxyFactory#getProxy 生成接口(DemoService)的代理對象 ref
image.png
  • 服務(wù)引用第一步掌敬,有注冊中心的情況下(最常用)會調(diào)用 RegistryProtocol#refer(Class<T> type, URL url) 匀借,RegistryProtocol 實際上是其他具體 Protocol(eg. DubboProtocol)的 AOP 類碴倾,在 refer(...) 中:
  1. 獲取注冊中心
  2. 創(chuàng)建 RegistryDirectory(AOP)
  3. 首先會獲取注冊中心 Registry逗噩,然后進(jìn)行服務(wù)注冊;(AOP)
  4. 訂閱providers跌榔、configurators异雁、routers

4.1. 做第一次服務(wù)發(fā)現(xiàn),獲取到 provider 節(jié)點(provider 以 URL 進(jìn)行表示)后僧须;(AOP)
4.2. 之后使用具體的 DubboProtocol 將這些表示 provider 的 URL 轉(zhuǎn)化為 DubboInvoker纲刀,并且為每一個 provider 創(chuàng)建 nettyClient,與 nettyServer 進(jìn)行連接具體的 Protocol 做的事
4.3担平、進(jìn)行 DubboInvoker 的緩存(AOP)示绊,其中 RegistryDirectory#Map<String, List<Invoker<T>>> methodInvokerMap 是后續(xù)發(fā)起調(diào)用時獲取 Invoker 的真正容器(重要

  1. 將directory封裝成一個ClusterInvoker(MockClusterInvoker)。
public class RegistryProtocol implements Protocol {
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        ...
        // 1. 獲取注冊中心:創(chuàng)建ZkClient實例暂论,連接zk
        Registry registry = registryFactory.getRegistry(url);
        ...
        // 2.
        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 1. 創(chuàng)建 RegistryDirectory 實例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        ...
        // 2. 向注冊中心注冊服務(wù)
        registry.register(registeredConsumerUrl);
        // 3. 訂閱providers面褐、configurators、routers(訂閱時取胎,調(diào)用了具體的Protocol,eg. DubboProtocol 的 refer(RegistryDirectory)展哭,方法,在該方法中,創(chuàng)建了 nettyClient 端匪傍,并建立了長連接)
        directory.subscribe(subscribeUrl.addParameter("category","providers,configurators,routers"));
        // 4. 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
        Invoker invoker = cluster.join(directory);
        ...
        return invoker;
    }
}

大致看下后續(xù)發(fā)起調(diào)用時坝咐,是怎么從 RegistryDirectory 中獲取可執(zhí)行的 Invoker 的。

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    // 以注釋處的例子為例析恢,初始化之后:{"sayHello":[A,B], "sayBye":[B], "*":[router過濾后的provider]}
    private volatile Map<String, List<Invoker<T>>> methodInvokerMap;
    
    /************************* 初始化更新 newMethodInvokerMap *************************/
    // 訂閱providers墨坚、configurators、routers時映挂,執(zhí)行的通知邏輯
    @Override
    public synchronized void notify(List<URL> urls) {
        // 服務(wù)提供者URL
        List<URL> invokerUrls = new ArrayList<URL>();
        ...
        // 初始化 invokerUrls
        for (URL url : urls) {
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            ...
            if (Constants.PROVIDERS_CATEGORY.equals(category)) {
                invokerUrls.add(url);
            } 
        }
        ...
        // 只針對 providers 進(jìn)行調(diào)用
        refreshInvoker(invokerUrls);
    }

    private void refreshInvoker(List<URL> invokerUrls) {
        ...
        // Translate url list to Invoker map => 將url轉(zhuǎn)換為InvokerDelegate
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        // Change method name to map Invoker Map => 構(gòu)造{"sayHello":InvokerDelegate}這樣的鍵值對
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); 
        ...
        // 初始化實例屬性 methodInvokerMap
        this.methodInvokerMap = newMethodInvokerMap;
        ...
    }

    private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
        Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
        ...
        for (Invoker<T> invoker : invokersMap.values()) {
            // 1. 獲取 provider 的所有方法 methods=xxx,yyy,zzz
            // (同一個接口的不同 provider 的 methods 參數(shù)可能不同泽篮,例如 DemoService#sayHello() 在 providerA 中有,后續(xù)添加了 DemoService#sayBye() 之后柑船,部署到了 providerB帽撑,
            // 此時 providerA 還沒部署,這一時刻鞍时,進(jìn)行的服務(wù)發(fā)現(xiàn)根據(jù) serviceKey 會發(fā)現(xiàn) providerA 和 providerB亏拉,但是二者所擁有的方法卻是不同的,那么經(jīng)過如下邏輯后逆巍,
            // newMethodInvokerMap={"sayHello":[A,B], "sayBye":[B]})
            String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
            String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
            for (String method : methods) {
                ...
                newMethodInvokerMap.put(method, methodInvokers);
                ...
            }
        }
        // newMethodInvokerMap={"sayHello":[A,B], "sayBye":[B], "*":[router過濾后的provider]}
        newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
        ...
        return Collections.unmodifiableMap(newMethodInvokerMap);
    }

    /************************* 從 newMethodInvokerMap 中選擇 Invoker *************************/
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {
        ...
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            ...
            // 1. 根據(jù)方法名進(jìn)行 Invoker 的獲燃疤痢:從 {"sayHello":List<InvokerDelegate實例>} 中根據(jù) methodName("sayHello")獲取List<InvokerDelegate實例>
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName); 
            }
            // 2. 根據(jù) key=* 進(jìn)行獲取 List<Invoker>
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); 
            }
            // 3. 遍歷獲取一個 List<Invoker>
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator(); 
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
    ...
}

Q: 為什么要按照方法名進(jìn)行 provider 的緩存而不是直接按照 interfaceName/group/version 這樣的格式?
A: 同一個接口的不同 provider 的 methods 參數(shù)可能不同锐极,例如 DemoService#sayHello() 在 providerA 中有笙僚,后續(xù)添加了 DemoService#sayBye() 之后,部署到了 providerB灵再,同時將新包(包含sayBye())打包給 consumer肋层,consumer 部署完成后,假設(shè)此時 providerA 還沒部署翎迁,這一時刻栋猖,consumer 進(jìn)行的服務(wù)發(fā)現(xiàn)根據(jù) serviceKey 會發(fā)現(xiàn) providerA 和 providerB,但是二者所擁有的方法卻是不同的汪榔,那么經(jīng)過如下邏輯后蒲拉,newMethodInvokerMap={"sayHello":[A,B], "sayBye":[B]}),這樣后續(xù)如果執(zhí)行 DemoService#sayBye() 揍异,就會直接獲取到 B全陨。
注意:在 2.7.x 中去掉了 newMethodInvokerMap 屬性,不再使用方法名作為 key衷掷,直接存儲 List<Invoker>辱姨,代碼雖然簡化了,但是也丟失了 A 中描述的好處戚嗅。

服務(wù)引用的第二步雨涛,見下文第二小節(jié)分析和 第10章 Dubbo 代理層的設(shè)計與實現(xiàn)

二枢舶、服務(wù)消費(fèi)者創(chuàng)建服務(wù)代理源碼梯形圖

ReferenceConfig.init()
-->createProxy(Map<String, String> map)
  //一 獲取Invoker
  -->RegistryProtocol.refer(Class<T> type, URL url)
    //1 獲取注冊中心:創(chuàng)建ZkClient實例,連接zk
    -->Registry registry = registryFactory.getRegistry(url)
      -->AbstractRegistryFactory.getRegistry(URL url)
        -->ZookeeperRegistryFactory.createRegistry(URL url)
          -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
            -->ZkclientZookeeperTransporter.connect(URL url)
              -->new ZkclientZookeeperClient(URL url)
                -->new ZkClient(url.getBackupAddress())
            -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)
    -->doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
      -->new RegistryDirectory<T>(type, url)
      //2 向注冊中心注冊服務(wù)
      -->registry.register(url)
        -->ZookeeperRegistry.doRegister(URL url)
          -->AbstractZookeeperClient.create(String path, boolean ephemeral)
    //3 訂閱providers替久、configurators凉泄、routers
      -->RegistryDirectory.subscribe(URL url)
        -->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
          //3.1 會獲取當(dāng)前節(jié)點下已經(jīng)存在的子節(jié)點(第一次服務(wù)發(fā)現(xiàn)發(fā)生在這里),添加子節(jié)點變化監(jiān)聽器
          -->List<String> children = zkClient.addChildListener(path, zkListener)
          -->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
            -->saveProperties(url)
            -->RegistryDirectory.notify(List<URL> urls)
              //僅僅針對的是providers
              -->refreshInvoker(List<URL> invokerUrls)
                -->toInvokers(List<URL> urls)
                  -->ProtocolFilterWrapper.refer(Class<T> type, URL url)
                    -->DubboProtocol.refer(Class<T> serviceType, URL url)
                      //3.1.1 創(chuàng)建ExchangeClient蚯根,對第一次服務(wù)發(fā)現(xiàn)providers路徑下的相關(guān)url建立長連接
                      -->getClients(URL url)
                        -->getSharedClient(URL url)
                          -->ExchangeClient exchangeClient = initClient(url)
                            -->Exchangers.connect(url, requestHandler)
                              -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
                                -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                                  -->Transporters.connect(URL url, ChannelHandler... handlers)
                                    -->NettyTransporter.connect(URL url, ChannelHandler listener)
                                      -->new NettyClient(url, listener)
                                        -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                                        -->getChannelCodec(url)//獲取Codec2后众,這里是DubboCountCodec實例
                                        -->doOpen()//開啟netty客戶端
                                        -->doConnect()//連接服務(wù)端,建立長連接
                                -->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient實例颅拦,needHeartbeat:true
                                  -->startHeatbeatTimer()//啟動心跳計數(shù)器
                          -->ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap)/
                          -->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("10.10.10.10:20880", 上邊的ReferenceCountExchangeClient實例)
                      //3.2 創(chuàng)建DubboInvoker(nettyClient的持有者蒂誉,真正發(fā)起netty調(diào)用的Invoker,做兩件事:選擇nettyClient + 處理單向/異步/同步調(diào)用模板)
                      -->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers)
                      -->DubboProtocol.Set<Invoker<?>> invokers.add(上邊的DubboInvoker實例)
                    -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
                  -->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl)
                  //3.3 將創(chuàng)建出來的Invoker緩存起來
                  -->newUrlInvokerMap.put("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16001&register.ip=10.10.10.10&remote.timestamp=1510127991625&side=consumer&timestamp=1510128022123", 上邊的InvokerDelegate實例)
                -->toMethodInvokers(newUrlInvokerMap)
                -->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete實例], *=[InvokerDelegete實例]}
      //4 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
      -->cluster.join(directory)
        -->Cluster$Adaptive.join(directory)
          -->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")//MockClusterWrapper包裝FailoverCluster
            -->MockClusterWrapper.join(Directory<T> directory)
              -->FailoverCluster.join(Directory<T> directory)
                -->new FailoverClusterInvoker<T>(directory)
              -->MockClusterInvoker(Directory<T> directory, Invoker<T> invoker)//invoker:上邊的FailoverClusterInvoker實例
  //二 獲取代理
  -->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)//invoker:上邊的MockClusterInvoker實例距帅, interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
    -->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
      -->Proxy.getProxy(ClassLoader cl, Class<?>... ics)//使用javassist獲取一個動態(tài)類
      -->new InvokerInvocationHandler(invoker)//invoker:上邊的MockClusterInvoker實例

消費(fèi)者發(fā)布的時候總體做了兩件事:創(chuàng)建 Invoker創(chuàng)建 API 接口的代理對象右锨。

2.1 創(chuàng)建Invoker

1. 獲取注冊中心:創(chuàng)建 ZkClient 實例,連接 zk
    //1 獲取注冊中心:創(chuàng)建ZkClient實例碌秸,連接zk
    -->Registry registry = registryFactory.getRegistry(url)
      -->AbstractRegistryFactory.getRegistry(URL url)
        -->ZookeeperRegistryFactory.createRegistry(URL url)
          -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
            -->ZkclientZookeeperTransporter.connect(URL url)
              -->new ZkclientZookeeperClient(URL url)
                -->new ZkClient(url.getBackupAddress())
            -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService", 上邊的ZookeeperRegistry實例)

與provider相同绍移,不再贅述。

2. 向注冊中心注冊 consumer 服務(wù)
      //2 向注冊中心注冊服務(wù)
      -->registry.register(url)
        -->ZookeeperRegistry.doRegister(URL url)
          -->AbstractZookeeperClient.create(String path, boolean ephemeral)

consumer 完成注冊后讥电,會在 zk 上創(chuàng)建節(jié)點(url 解碼后):

/dubbo
- /com.alibaba.dubbo.demo.DemoService
-- /consumers
--- /consumer://10.10.10.10/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=25267&side=consumer&timestamp=1510225913509
3. 使用 RegistryDirectory 訂閱 providers蹂窖、configurators、routers 節(jié)點

其中允趟,configurators 節(jié)點用于覆蓋配置(實現(xiàn)“熱配置”)恼策,routers 節(jié)點用于配置路由信息。這里重點說一下 providers節(jié)點潮剪,該節(jié)點下存儲著 DemoService 的服務(wù)提供者列表,當(dāng)該列表發(fā)生變化時(添加 provider 機(jī)器或者宕機(jī))分唾,會通知 consumer 進(jìn)行 refreshInvoker 操作抗碰。看一下 RegistryDirectory 訂閱 providers 的邏輯绽乔。

          //3.1 會獲取當(dāng)前節(jié)點下已經(jīng)存在的字節(jié)點(第一次服務(wù)發(fā)現(xiàn)發(fā)生在這里)弧蝇,添加子節(jié)點變化監(jiān)聽器
          -->List<String> children = zkClient.addChildListener(path, zkListener)
          -->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
            -->saveProperties(url)
            -->RegistryDirectory.notify(List<URL> urls)
              //僅僅針對的是providers
              -->refreshInvoker(List<URL> invokerUrls)
                -->toInvokers(List<URL> urls
                  -->ProtocolFilterWrapper.refer(Class<T> type, URL url)
                    -->DubboProtocol.refer(Class<T> serviceType, URL url)
                      //3.1.1 創(chuàng)建ExchangeClient,對第一次服務(wù)發(fā)現(xiàn)providers路徑下的相關(guān)url建立長連接
                      -->getClients(URL url)
                        -->getSharedClient(URL url)
                          -->ExchangeClient exchangeClient = initClient(url)
                            -->Exchangers.connect(url, requestHandler)
                              -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
                                -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                                  -->Transporters.connect(URL url, ChannelHandler... handlers)
                                    -->NettyTransporter.connect(URL url, ChannelHandler listener)
                                      -->new NettyClient(url, listener)
                                        -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                                        -->getChannelCodec(url)//獲取Codec2折砸,這里是DubboCountCodec實例
                                        -->doOpen()//開啟netty客戶端
                                        -->doConnect()//連接服務(wù)端看疗,建立長連接
                                -->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient實例,needHeartbeat:true
                                  -->startHeatbeatTimer()//啟動心跳計數(shù)器
                          -->ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap)/
                          -->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("10.10.10.10:20880", 上邊的ReferenceCountExchangeClient實例)
                      //3.2 創(chuàng)建DubboInvoker
                      -->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers)
                      -->DubboProtocol.Set<Invoker<?>> invokers.add(上邊的DubboInvoker實例)
                    -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
                  -->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl)
                  //3.3 將創(chuàng)建出來的Invoker緩存起來
                  -->newUrlInvokerMap.put("dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16001&register.ip=10.10.10.10&remote.timestamp=1510127991625&side=consumer&timestamp=1510128022123", 上邊的InvokerDelegate實例)
                -->toMethodInvokers(newUrlInvokerMap)
                -->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete實例], *=[InvokerDelegete實例]}
總體流程 :
  1. 首先添加 provider 子節(jié)點監(jiān)聽器睦授,同時進(jìn)行 第一次服務(wù)發(fā)現(xiàn)两芳,找出當(dāng)前注冊在 zk 上的 provider 節(jié)點。
  2. 然后創(chuàng)建 ReferenceCountExchangeClient去枷,為每一個 provider 創(chuàng)建一條 Netty 長連接(開啟了 Netty 客戶端并連接 provider 的 Netty 服務(wù)端)
  3. 開啟心跳定時器
  4. 緩存 ReferenceCountExchangeClient
  5. 將創(chuàng)建出來的 ReferenceCountExchangeClient 封裝到 DubboInvoker 實例中
  6. 根據(jù)條件獲取相關(guān)的 filter怖辆,之后使用這些 fiter 對 DubboInvoker 實例進(jìn)行鏈?zhǔn)桨b是复,將包裝后的 DubboInvoker 封裝到 InvokerDelegete 中
  7. 最后將該 InvokerDelegete 實例按照方法名封裝到 Map<String, List<Invoker<T>>> newMethodInvokerMap 中。(該 map 也是 consumer 調(diào)用 provider 時竖螃,獲取 provider 實例的地方淑廊,該 map 是 RegistryDirectory 的一個屬性,所以我們可以將 RegistryDirectory 看做是一個 provider 的客戶端緩存器

Directory.List<Router> routers 會在兩個地方被設(shè)置

  • 初次創(chuàng)建 Directory 實例時特咆,在 Directory 構(gòu)造器中進(jìn)行設(shè)置季惩;
  • notify 的時候會進(jìn)行 routers 的重新設(shè)置。

關(guān)于 Directory 的設(shè)計腻格,后續(xù)分析

4. 將 directory 封裝成一個 ClusterInvoker(MockClusterInvoker)
      //4 將directory封裝成一個ClusterInvoker(MockClusterInvoker)
      -->cluster.join(directory)
        -->Cluster$Adaptive.join(directory)
          -->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover")//MockClusterWrapper包裝FailoverCluster
            -->MockClusterWrapper.join(Directory<T> directory)
              -->FailoverCluster.join(Directory<T> directory)
                -->new FailoverClusterInvoker<T>(directory)
              -->MockClusterInvoker(Directory<T> directory, Invoker<T> invoker)//invoker:上邊的FailoverClusterInvoker實例

Dubbo 實現(xiàn)了集群容錯功能画拾。在 consumer 發(fā)布的最后流程中,實現(xiàn)了集群容錯荒叶。

  1. 首先根據(jù) Dubbo SPI 機(jī)制獲取指定類型的 Cluster 實現(xiàn)碾阁,這里默認(rèn)是 FailoverCluster(失敗重試機(jī)制);
  2. 之后將上邊的 RegistryDirectory 封裝到 FailoverClusterInvoker 實例中些楣;
  3. 最后創(chuàng)建一個 MockClusterInvoker 實例脂凶,封裝了 RegistryDirectoryFailoverClusterInvoker。(MockClusterInvoker 用于實現(xiàn)服務(wù)降級功能)

到這里愁茁,consumer 創(chuàng)建 Invoker 的源碼就結(jié)束了蚕钦。總結(jié)一下

  1. 獲取注冊中心:創(chuàng)建 ZkClient 實例,連接 zk
  2. 向注冊中心注冊 consumer 服務(wù)
  3. 使用 RegistryDirectory 訂閱 providers鹅很、configurators嘶居、routers 節(jié)點

3.1. 添加 provider 子節(jié)點監(jiān)聽器,同時進(jìn)行第一次服務(wù)發(fā)現(xiàn)促煮,找出當(dāng)前注冊在 zk 上的 provider 節(jié)點

3.1.1. 創(chuàng)建 ReferenceCountExchangeClient邮屁,為每一個 provider 創(chuàng)建一條 Netty 長連接(開啟了 Netty 客戶端并連接 provider 的 Netty 服務(wù)端)
3.1.2. 開啟心跳定時器
3.2. 根據(jù)條件獲取相關(guān)的 filter,之后使用這些 fiter 對 DubboInvoker 實例進(jìn)行鏈?zhǔn)桨b菠齿,將包裝后的 DubboInvoker 封裝到 InvokerDelegete 中
3.3. 最后將該 InvokerDelegete 實例按照方法名封裝到 Map<String, List<Invoker<T>>> newMethodInvokerMap

  1. 將 directory 封裝成一個 ClusterInvoker(MockClusterInvoker)

4.1 首先根據(jù) Dubbo SPI 機(jī)制獲取指定類型的 Cluster 實現(xiàn)佑吝,這里默認(rèn)是 FailoverCluster(失敗重試機(jī)制);
4.2 之后將上邊的 RegistryDirectory 封裝到 FailoverClusterInvoker 實例中绳匀;
4.3 最后創(chuàng)建一個 MockClusterInvoker 實例芋忿,封裝了 RegistryDirectory 和 FailoverClusterInvoker。(MockClusterInvoker用于實現(xiàn)服務(wù)降級功能)

我們可以看到疾棵,最終獲取到的 Invoker 是 MockClusterInvoker 實例戈钢。

關(guān)于 Cluster 的設(shè)計,后續(xù)分析

2.2 創(chuàng)建API接口的代理對象

  //二 獲取代理
  -->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces)//invoker:上邊的MockClusterInvoker實例是尔, interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
    -->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
      -->Proxy.getProxy(ClassLoader cl, Class<?>... ics)//使用javassist獲取一個動態(tài)類
      -->new InvokerInvocationHandler(invoker)//invoker:上邊的MockClusterInvoker實例

獲取 API 接口代理的邏輯比較簡單殉了,注意這里的 Proxy 是 com.alibaba.dubbo.common.bytecode.Proxy,而非 JDK 的 Proxy嗜历。
這里首先調(diào)用 Proxy.getProxy(interfaces) 獲取到一個創(chuàng)建代理的工廠類 com.alibaba.dubbo.common.bytecode.Proxy0宣渗,如下:

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.Proxy;
import com.alibaba.dubbo.common.bytecode.proxy0;
import java.lang.reflect.InvocationHandler;

public class Proxy0 extends Proxy implements ClassGenerator.DC {
    public Object newInstance(InvocationHandler invocationHandler) {
        return new proxy0(invocationHandler);
    }
}

之后調(diào)用了 Proxy0#newInstance 方法抖所,創(chuàng)建了一個 com.alibaba.dubbo.common.bytecode.proxy0 實例,該實例就是最終的 DemoService 的代理對象痕囱。

DemoService demoService = (DemoService) context.getBean("demoService");

這里的 demoService 就是上述的 com.alibaba.dubbo.common.bytecode.proxy0 實例田轧。

package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class proxy0 implements EchoService, DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    public String sayHello(String string) {
        Object[] arrobject = new Object[]{string};
        Object object = this.handler.invoke(this, methods[0], arrobject);
        return (String)object;
    }

    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }

    public proxy0() {
    }

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }
}

可以看到,當(dāng)調(diào)用 proxy0#sayHello 時鞍恢,實際上其內(nèi)部執(zhí)行的是 InvocationHandler#invoke傻粘,來看一下 InvocationHandler。

public class InvokerInvocationHandler implements InvocationHandler {
    private final Invoker<?> invoker; //上邊的MockClusterInvoker實例

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ...
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

InvocationHandler#invoke 調(diào)用的又是 MockClusterInvoker#invoke 方法帮掉。到此為止弦悉,整個服務(wù)引用的源碼分析就完成了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蟆炊,一起剝皮案震驚了整個濱河市稽莉,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌涩搓,老刑警劉巖污秆,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異昧甘,居然都是意外死亡良拼,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進(jìn)店門充边,熙熙樓的掌柜王于貴愁眉苦臉地迎上來庸推,“玉大人,你說我怎么就攤上這事浇冰”崦剑” “怎么了?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵肘习,是天一觀的道長掖蛤。 經(jīng)常有香客問我,道長井厌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任致讥,我火速辦了婚禮仅仆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘垢袱。我一直安慰自己墓拜,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布请契。 她就那樣靜靜地躺著咳榜,像睡著了一般夏醉。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上涌韩,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天畔柔,我揣著相機(jī)與錄音,去河邊找鬼臣樱。 笑死靶擦,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的雇毫。 我是一名探鬼主播玄捕,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼棚放!你這毒婦竟也來了枚粘?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤飘蚯,失蹤者是張志新(化名)和其女友劉穎馍迄,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體孝冒,經(jīng)...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡柬姚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了庄涡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片量承。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖穴店,靈堂內(nèi)的尸體忽然破棺而出撕捍,到底是詐尸還是另有隱情,我是刑警寧澤泣洞,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布忧风,位于F島的核電站,受9級特大地震影響球凰,放射性物質(zhì)發(fā)生泄漏狮腿。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一呕诉、第九天 我趴在偏房一處隱蔽的房頂上張望缘厢。 院中可真熱鬧,春花似錦甩挫、人聲如沸贴硫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽英遭。三九已至间护,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間挖诸,已是汗流浹背汁尺。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留税灌,地道東北人均函。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像菱涤,于是被迫代替她去往敵國和親苞也。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內(nèi)容