Dubbo中消費者初始化的過程解析

首先還是Spring碰到dubbo的標簽之后穿撮,會使用parseCustomElement解析dubbo標簽纳账,使用的解析器是dubbo的DubboBeanDefinitionParser,解析完成之后返回BeanDefinition給Spring管理烦周。

服務(wù)消費者端對應(yīng)的是ReferenceBean,實現(xiàn)了ApplicationContextAware接口,Spring會在Bean的實例化那一步回調(diào)setApplicationContext方法穆律。也實現(xiàn)了InitializingBean接口,接著會回調(diào)afterPropertySet方法导俘。還實現(xiàn)了FactoryBean接口峦耘,實現(xiàn)FactoryBean可以在后期獲取bean的時候做一些操作,dubbo在這個時候做初始化旅薄。另外ReferenceBean還實現(xiàn)了DisposableBean辅髓,會在bean銷毀的時候調(diào)用destory方法。

消費者的初始化是在ReferenceBean的init方法中執(zhí)行少梁,分為兩種情況:

  • reference標簽中沒有配置init屬性洛口,此時是延遲初始化的,也就是只有等到bean引用被注入到其他Bean中猎莲,或者調(diào)用getBean獲取這個Bean的時候绍弟,才會初始化。比如在這里的例子里reference沒有配置init屬性著洼,只有等到HelloService helloService = (HelloService) applicationContext.getBean("helloService");這句getBean的時候樟遣,才會開始調(diào)用init方法進行初始化。
  • 另外一種情況是立即初始化身笤,即是如果reference標簽中init屬性配置為true豹悬,會立即進行初始化(也就是上面說到的實現(xiàn)了FactoryBean接口)。

初始化開始

這里以沒有配置init的reference為例液荸,只要不注入bean或者不調(diào)用getBean獲取bean的時候瞻佛,就不會被初始化。HelloService helloService = (HelloService) applicationContext.getBean("helloService");

另外在ReferenceBean這個類在Spring中初始化的時候,有幾個靜態(tài)變量會被初始化:

private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

這幾個變量的初始化是根據(jù)dubbo的SPI擴展機制動態(tài)生成的代碼:

refprotocol:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
  public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
    if (arg1 == null) throw new IllegalArgumentException("url == null");

    com.alibaba.dubbo.common.URL url = arg1;
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    
    return extension.refer(arg0, arg1);
  }
  
  public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
    if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    
    if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
    
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
    
    com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    
    return extension.export(arg0);
  }
  
  public void destroy() {
    throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  }
  
  public int getDefaultPort() {
    throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
  }
}

cluster:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {

  public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory {
    if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
    
    if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
    
    String extName = url.getParameter("cluster", "failover");
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
    
    com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
    
    return extension.join(arg0);
  }
}

proxyFactory:

import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {

  public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
    if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
    
    if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
    
    String extName = url.getParameter("proxy", "javassist");
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    
    com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    
    return extension.getProxy(arg0);
  }
  
  public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
    if (arg2 == null) throw new IllegalArgumentException("url == null");
    
    com.alibaba.dubbo.common.URL url = arg2;
    String extName = url.getParameter("proxy", "javassist");
    
    if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
    
    com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
    
    return extension.getInvoker(arg0, arg1, arg2);
  }
}

初始化入口

初始化的入口在ReferenceConfig的get()方法:

public synchronized T get() {
  if (destroyed){
    throw new IllegalStateException("Already destroyed!");
  }
  if (ref == null) {
    init();
  }
  return ref;
}

init()方法會先檢查初始化所有的配置信息伤柄,然后調(diào)用ref = createProxy(map);創(chuàng)建代理绊困,消費者最終得到的是服務(wù)的代理。初始化主要做的事情就是引用對應(yīng)的遠程服務(wù)适刀,大概的步驟:

  • 監(jiān)聽注冊中心
  • 連接服務(wù)提供者端進行服務(wù)引用
  • 創(chuàng)建服務(wù)代理并返回

文檔上關(guān)于Zookeeper作為注冊中心時秤朗,服務(wù)消費者啟動時要做的事情有:

訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址。
并向/dubbo/com.foo.BarService/consumers目錄下寫入自己的URL地址笔喉。

創(chuàng)建代理

  • 引用遠程服務(wù)
  • 創(chuàng)建代理

init()中createProxy方法:

private T createProxy(Map<String, String> map) {
    //先判斷是否是本地服務(wù)引用injvm
    //判斷是否是點對點直連
    //判斷是否是通過注冊中心連接
    //然后是服務(wù)的引用
    //這里url為
    //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
    //application=dubbo-consumer&dubbo=2.5.3&pid=12272&
    //refer=application%3Ddubbo-consumer%26dubbo%3D2.5.3%26
    //interface%3Ddubbo.common.hello.service.HelloService%26
    //methods%3DsayHello%26pid%3D12272%26side%3D
    //consumer%26timeout%3D100000%26timestamp%3D1489318676447&
    //registry=zookeeper&timestamp=1489318676641
    //引用遠程服務(wù)由Protocol的實現(xiàn)來處理
    refprotocol.refer(interfaceClass, url);
    //最后返回服務(wù)代理
     return (T) proxyFactory.getProxy(invoker);
}

這里refprotocol是上面生成的代碼取视,會根據(jù)協(xié)議不同選擇不同的Protocol協(xié)議。

引用遠程服務(wù)

對于服務(wù)引用refprotocol.refer(interfaceClass, url)會首先進入ProtocolListenerWrapper的refer方法常挚,然后在進入ProtocolFilterWrapper的refer方法作谭,然后再進入RegistryProtocol的refer方法,這里的url協(xié)議是registry奄毡,所以上面兩個Wrapper中不做處理折欠,直接進入了RegistryProtocol,看下RegistryProtocol中:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    //這里獲得的url是
    //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
    //application=dubbo-consumer&dubbo=2.5.3&pid=12272&
    //refer=application%3Ddubbo-consumer%26dubbo%3D2.5.3%26
    //interface%3Ddubbo.common.hello.service.HelloService%26
    //methods%3DsayHello%26pid%3D12272%26side%3D
    //consumer%26timeout%3D100000%26
    //timestamp%3D1489318676447&timestamp=1489318676641
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    //根據(jù)url獲取Registry對象
    //先連接注冊中心秧倾,把消費者注冊到注冊中心
    Registry registry = registryFactory.getRegistry(url);
    //判斷引用是否是注冊中心RegistryService怨酝,如果是直接返回剛得到的注冊中心服務(wù)
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    //以下是普通服務(wù)傀缩,需要進入注冊中心和集群下面的邏輯
    // group="a,b" or group="*"
    //獲取ref的各種屬性
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    //獲取分組屬性
    String group = qs.get(Constants.GROUP_KEY);
    //先判斷引用服務(wù)是否需要合并不同實現(xiàn)的返回結(jié)果
    if (group != null && group.length() > 0 ) {
        if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
                || "*".equals( group ) ) {
                //使用默認的分組聚合集群策略
            return doRefer( getMergeableCluster(), registry, type, url );
        }
    }
    //選擇配置的集群策略(cluster="failback")或者默認策略
    return doRefer(cluster, registry, type, url);
}

獲取注冊中心

連接注冊中心Registry registry = registryFactory.getRegistry(url);首先會到AbstractRegistryFactory的getRegistry方法:

public Registry getRegistry(URL url) {
    //這里url是
    //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?
    //application=dubbo-consumer&dubbo=2.5.3&
    //interface=com.alibaba.dubbo.registry.RegistryService&
    //pid=12272&timestamp=1489318676641
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    //這里key是
    //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService
    String key = url.toServiceString();
    // 鎖定注冊中心獲取過程那先,保證注冊中心單一實例
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //這里用的是ZookeeperRegistryFactory
        //返回的Registry中封裝了已經(jīng)連接到Zookeeper的zkClient實例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        //放到緩存中
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 釋放鎖
        LOCK.unlock();
    }
}

ZookeeperRegistryFactory的createRegistry方法:

public Registry createRegistry(URL url) {
    //直接返回一個新的ZookeeperRegistry實例
    //這里的zookeeperTransporter代碼在下面,動態(tài)生成的適配類
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

zookeeperTransporter代碼:

package com.alibaba.dubbo.remoting.zookeeper;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ZookeeperTransporter$Adpative implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter {
    public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));
        
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        
        com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
        
        return extension.connect(arg0);
    }
}

上面代碼中可以看到赡艰,如果我們沒有指定Zookeeper的client屬性售淡,默認使用zkClient,所以上面的zookeeperTransporter是ZkclientZookeeperTransporter慷垮。

繼續(xù)看new ZookeeperRegistry(url, zookeeperTransporter);

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    //這里會先經(jīng)過AbstractRegistry的處理揖闸,然后經(jīng)過FailbackRegistry的處理(解釋在下面)
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    //服務(wù)分組,默認dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (! group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    //注冊中心的節(jié)點
    this.root = group;
    //ZkclientZookeeperTransporter的connect方法
    //直接返回一個ZkclientZookeeperClient實例
    //具體的步驟是料身,new一個ZkClient實例汤纸,然后訂閱了一個狀態(tài)變化的監(jiān)聽器
    zkClient = zookeeperTransporter.connect(url);
    //添加一個狀態(tài)改變的監(jiān)聽器
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

AbstractRegistry的處理:

public AbstractRegistry(URL url) {
    //設(shè)置registryUrl
    setUrl(url);
    // 啟動文件保存定時器
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    //會先去用戶主目錄下的.dubbo目錄下加載緩存注冊中心的緩存文件比如:dubbo-registry-127.0.0.1.cache
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
            if(! file.getParentFile().mkdirs()){
                throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    //緩存文件存在的話就把文件讀進內(nèi)存中
    loadProperties();
    //先獲取backup url
    //然后通知訂閱
    notify(url.getBackupUrls());
}

獲取注冊中心時的通知方法

notify方法:

protected void notify(List<URL> urls) {
    if(urls == null || urls.isEmpty()) return;
    //getSubscribed()方法獲取訂閱者列表
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();

        if(! UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    //通知每個監(jiān)聽器
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) { }
            }
        }
    }
}

notify(url, listener, filterEmpty(url, urls));代碼:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            //分類
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        //通知
        listener.notify(categoryList);
    }
}

AbstractRegistry構(gòu)造完,接著是FailbackRegistry的處理:

public FailbackRegistry(URL url) {
    super(url);
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    //啟動失敗重試定時器
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        public void run() {
            // 檢測并連接注冊中心
            try {
                //重試方法由每個具體子類實現(xiàn)
                //獲取到注冊失敗的芹血,然后嘗試注冊
                retry();
            } catch (Throwable t) { // 防御性容錯 }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

這里會啟動一個新的定時線程贮泞,主要是有連接失敗的話,會進行重試連接retry()幔烛,啟動完之后返回ZookeeperRegistry中繼續(xù)處理啃擦。接下來下一步是服務(wù)的引用。

引用遠程服務(wù)

繼續(xù)看ref方法中最后一步饿悬,服務(wù)的引用令蛉,返回的是一個Invoker,return doRefer(cluster, registry, type, url)狡恬;

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    //初始化Directory
    //組裝Directory珠叔,可以看成一個消費端的List蝎宇,可以隨著注冊中心的消息推送而動態(tài)的變化服務(wù)的Invoker
    //封裝了所有服務(wù)真正引用邏輯,覆蓋配置祷安,路由規(guī)則等邏輯
    //初始化時只需要向注冊中心發(fā)起訂閱請求夫啊,其他邏輯均是異步處理,包括服務(wù)的引用等
    //緩存接口所有的提供者端Invoker以及注冊中心接口相關(guān)的配置等
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    //此處的subscribeUrl為
    //consumer://192.168.1.100/dubbo.common.hello.service.HelloService?
    //application=dubbo-consumer&dubbo=2.5.3&
    //interface=dubbo.common.hello.service.HelloService&
    //methods=sayHello&pid=16409&
    //side=consumer&timeout=100000&timestamp=1489322133987
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
    if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
            //到注冊中心注冊服務(wù)
            //此處regist是上面一步獲得的registry辆憔,即是ZookeeperRegistry撇眯,包含zkClient的實例
            //會先經(jīng)過AbstractRegistry的處理,然后經(jīng)過FailbackRegistry的處理(解析在下面)
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                Constants.CHECK_KEY, String.valueOf(false)));
    }
    //訂閱服務(wù)
    //有服務(wù)提供的時候虱咧,注冊中心會推送服務(wù)消息給消費者熊榛,消費者再進行服務(wù)的引用。
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
            Constants.PROVIDERS_CATEGORY 
            + "," + Constants.CONFIGURATORS_CATEGORY 
            + "," + Constants.ROUTERS_CATEGORY));
    //服務(wù)的引用與變更全部由Directory異步完成
    //集群策略會將Directory偽裝成一個Invoker返回
    //合并所有相同的invoker
    return cluster.join(directory);
}

注冊中心接收到消費者發(fā)送的訂閱請求后腕巡,會根據(jù)提供者注冊服務(wù)的列表莺戒,推送服務(wù)消息給消費者。消費者端接收到注冊中心發(fā)來的提供者列表后元旬,進行服務(wù)的引用岂昭。觸發(fā)Directory監(jiān)聽器的可以是訂閱請求,覆蓋策略消息车伞,路由策略消息择懂。

注冊到注冊中心

AbstractRegistry的register方法:

public void register(URL url) {
    //此時url是
    //consumer://192.168.1.100/dubbo.common.hello.service.HelloService?
    //application=dubbo-consumer&
    //category=consumers&check=false&dubbo=2.5.3&
    //interface=dubbo.common.hello.service.HelloService&methods=sayHello
    //&pid=16409&side=consumer&timeout=100000&timestamp=1489322133987
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()){
        logger.info("Register: " + url);
    }
    registered.add(url);
}

上面只是把url添加到registered這個set中。

接著看FailbackRegistry的register方法:

public void register(URL url) {
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // 向服務(wù)器端發(fā)送注冊請求
        //這里調(diào)用的是ZookeeperRegistry中的doRegister方法
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // 如果開啟了啟動時檢測另玖,則直接拋出異常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if(skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 將失敗的注冊請求記錄到失敗列表困曙,定時重試
        failedRegistered.add(url);
    }
}

接著看下doRegister(url);方法,向服務(wù)器端發(fā)送注冊請求谦去,在ZookeeperRegistry中:

protected void doRegister(URL url) {
    try {
        //直接調(diào)用create慷丽,在AbstractZookeeperClient類中
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

zkClient.create()方法:

//path為
///dubbo/dubbo.common.hello.service.HelloService/consumers/
//consumer%3A%2F%2F192.168.1.100%2F
//dubbo.common.hello.service.HelloService%3Fapplication%3D
//dubbo-consumer%26category%3Dconsumers%26check%3Dfalse%26
//dubbo%3D2.5.3%26interface%3D
//dubbo.common.hello.service.HelloService%26
//methods%3DsayHello%26pid%3D28819%26
//side%3Dconsumer%26timeout%3D100000%26timestamp%3D1489332839677
public void create(String path, boolean ephemeral) {
    int i = path.lastIndexOf('/');
    if (i > 0) {
        create(path.substring(0, i), false);
    }
    //循環(huán)完得到的path為/dubbo
    //dynamic=false 表示該數(shù)據(jù)為持久數(shù)據(jù),當注冊方退出時鳄哭,數(shù)據(jù)依然保存在注冊中心
    if (ephemeral) {
        //創(chuàng)建臨時的節(jié)點
        createEphemeral(path);
    } else {
        //創(chuàng)建持久的節(jié)點要糊,/dubbo/dubbo.common.hello.service.HelloService/consumers/
        //consumer%3A%2F%2F192.168.110.197%2F
        //dubbo.common.hello.service.HelloService%3Fapplication%3Ddubbo-consumer%26
        //category%3Dconsumers%26check%3Dfalse%26
        //dubbo%3D2.5.3%26interface%3D
        //dubbo.common.hello.service.HelloService%26
        //methods%3DsayHello%26pid%3D6370%26side%3D
        //consumer%26timeout%3D100000%26timestamp%3D1489367959659
        createPersistent(path);
    }
}

經(jīng)過上面create之后,Zookeeper中就存在了消費者需要訂閱的服務(wù)的節(jié)點:

/dubbo
    /dubbo.common.hello.service.HelloService
        /consumers
            /http://0.0.0.0:4550/?path=dubbo%2F
            dubbo.common.hello.service.HelloService%2F
            consumers%2Fconsumer%253A%252F%252F192.168.110.197%252F
            dubbo.common.hello.service.HelloService%253F
            application%253Ddubbo-consumer%2526category%253D
            consumers%2526check%253Dfalse%2526
            dubbo%253D2.5.3%2526interface%253D
            dubbo.common.hello.service.HelloService%2526
            methods%253DsayHello%2526pid%253D22392%2526side%253D
            consumer%2526timeout%253D100000%2526timestamp%253D1490063394184

訂閱服務(wù)提供者

消費者自己注冊到注冊中心之后妆丘,接著是訂閱服務(wù)提供者锄俄,directory.subscribe():

public void subscribe(URL url) {
    //設(shè)置消費者url
    setConsumerUrl(url);
    //這里的registry是ZookeeperRegistry
    registry.subscribe(url, this);
}

看下registry.subscribe(url, this);,這里registry是ZookeeperRegistry飘痛,會先經(jīng)過AbstractRegistry的處理珊膜,然后是FailbackRegistry的處理。

在AbstractRegistry中:

//此時url為consumer://192.168.1.100/dubbo.common.hello.service.HelloService?application=dubbo-consumer&
//category=providers,configurators,routers&dubbo=2.5.3&interface=dubbo.common.hello.service.HelloService&methods=
//sayHello&pid=28819&side=consumer&timeout=100000&timestamp=1489332839677
public void subscribe(URL url, NotifyListener listener) {
    //先根據(jù)url獲取已注冊的監(jiān)聽器
    Set<NotifyListener> listeners = subscribed.get(url);
    //沒有監(jiān)聽器宣脉,就創(chuàng)建车柠,并添加進去
    if (listeners == null) {
        subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
        listeners = subscribed.get(url);
    }
    //有監(jiān)聽器,直接把當前RegistryDirectory添加進去
    listeners.add(listener);
}

然后是FailbackRegistry中:

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // 向服務(wù)器端發(fā)送訂閱請求
        doSubscribe(url, listener);
    } catch (Exception e) {...}
}

繼續(xù)看doSubscribe(url, listener);向服務(wù)端發(fā)送訂閱請求,在ZookeeperRegistry中:

protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {... } else {
            List<URL> urls = new ArrayList<URL>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                //將zkClient的事件IZkChildListener轉(zhuǎn)換到registry事件NotifyListener
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //創(chuàng)建三個節(jié)點
                // /dubbo/dubbo.common.hello.service.HelloService/providers/
                // /dubbo/dubbo.common.hello.service.HelloService/configurators/
                // /dubbo/dubbo.common.hello.service.HelloService/routers/
                //上面三個路徑會被消費者端監(jiān)聽竹祷,當提供者谈跛,配置,路由發(fā)生變化之后塑陵,
                //注冊中心會通知消費者刷新本地緩存感憾。
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

服務(wù)訂閱完之后的通知

服務(wù)訂閱完成之后,接著就是notify(url, listener, urls);:

會先經(jīng)過FailbackRegistry將失敗的通知請求記錄到失敗列表令花,定時重試阻桅。

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    try {
        doNotify(url, listener, urls);
    } catch (Exception t) {
        // 將失敗的通知請求記錄到失敗列表,定時重試
        Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
        if (listeners == null) {
            failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
            listeners = failedNotified.get(url);
        }
        listeners.put(listener, urls);
        logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }
}

doNotify(url, listener, urls);:

protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    //父類實現(xiàn)
    super.notify(url, listener, urls);
}

AbstractRegistry中的doNotify實現(xiàn):

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            //不同類型的數(shù)據(jù)分開通知兼都,providers嫂沉,consumers,routers扮碧,overrides
            //允許只通知其中一種類型趟章,但該類型的數(shù)據(jù)必須是全量的,不是增量的慎王。
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
        categoryNotified = notified.get(url);
    }
    //對這里得到的providers蚓土,configurators,routers分別進行通知
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        //這里的listener是RegistryDirectory
        listener.notify(categoryList);
    }
}

到RegistryDirectory中查看notify方法:

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category) 
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) 
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators 更新緩存的服務(wù)提供方配置
    if (configuratorUrls != null && configuratorUrls.size() >0 ){
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers//更新緩存的路由規(guī)則配置
    if (routerUrls != null && routerUrls.size() >0 ){
        List<Router> routers = toRouters(routerUrls);
        if(routers != null){ // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override參數(shù)
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    //重建invoker實例
    refreshInvoker(invokerUrls);
}

重建invoker實例

refreshInvoker(invokerUrls);:

/**
 * 根據(jù)invokerURL列表轉(zhuǎn)換為invoker列表赖淤。轉(zhuǎn)換規(guī)則如下:
 * 1.如果url已經(jīng)被轉(zhuǎn)換為invoker蜀漆,則不在重新引用,直接從緩存中獲取漫蛔,注意如果url中任何一個參數(shù)變更也會重新引用
 * 2.如果傳入的invoker列表不為空嗜愈,則表示最新的invoker列表
 * 3.如果傳入的invokerUrl列表是空,則表示只是下發(fā)的override規(guī)則或route規(guī)則莽龟,需要重新交叉對比,決定是否需要重新引用锨天。
 * @param invokerUrls 傳入的參數(shù)不能為null
 */
private void refreshInvoker(List<URL> invokerUrls){
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // 禁止訪問
        this.methodInvokerMap = null; // 置空列表
        destroyAllInvokers(); // 關(guān)閉所有Invoker
    } else {
        this.forbidden = false; // 允許訪問
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表毯盈,便于交叉對比
        }
        if (invokerUrls.size() ==0 ){
            return;
        }
        //會重新走一遍服務(wù)的引用過程
        //給每個提供者創(chuàng)建一個Invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉(zhuǎn)成Invoker列表
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表
        // state change
        //如果計算錯誤,則不進行處理.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString()));
            return ;
        }
        //服務(wù)提供者Invoker保存在這個map中
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try{
            destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關(guān)閉未使用的Invoker
        }catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

toInvokers(invokerUrls) 方法:

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if(urls == null || urls.size() == 0){
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        //此時url是dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?anyhost=true&
        //application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product&
        //interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=china&
        //owner=cheng.xi&pid=5631&side=provider&timestamp=1489367571986
        //從注冊中心獲取到的攜帶提供者信息的url
        //如果reference端配置了protocol病袄,則只選擇匹配的protocol
        if (queryProtocols != null && queryProtocols.length() >0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() 
                    + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        URL url = mergeUrl(providerUrl);

        String key = url.toFullString(); // URL參數(shù)是排序的
        if (keys.contains(key)) { // 重復(fù)URL
            continue;
        }
        keys.add(key);
        // 緩存key為沒有合并消費端參數(shù)的URL搂赋,不管消費端如何合并參數(shù),如果服務(wù)端URL發(fā)生變化益缠,則重新refer
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // 緩存中沒有脑奠,重新refer
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    //根據(jù)擴展點加載機制,這里使用的protocol是DubboProtocol
                    invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
            }
            if (invoker != null) { // 將新的引用放入緩存
                newUrlInvokerMap.put(key, invoker);
            }
        }else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

創(chuàng)建invoker invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);

  • 先使用DubboProtocol的refer方法幅慌,這一步會依次調(diào)用ProtocolFIlterListenerWrapper宋欺,ProtocolFilterWrapper,DubboProtocol中的refer方法。經(jīng)過兩個Wrapper中齿诞,會添加對應(yīng)的InvokerListener并構(gòu)建Invoker Filter鏈酸休,在DubboProtocol中會創(chuàng)建一個DubboInvoker對象,該Invoker對象持有服務(wù)Class祷杈,providerUrl斑司,負責和服務(wù)提供端通信的ExchangeClient。
  • 接著使用得到的Invoker創(chuàng)建一個InvokerDelegete

創(chuàng)建invoker

在DubboProtocol中創(chuàng)建DubboInvoker的時候代碼如下:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    //這里有一個getClients方法
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

查看getClients方法:

private ExchangeClient[] getClients(URL url){
    //是否共享連接
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    //如果connections不配置但汞,則共享連接宿刮,否則每服務(wù)每連接
    if (connections == 0){
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect){
            //這里沒有配置connections,就使用getSharedClient
            //getSharedClient中先去緩存中查找私蕾,沒有的話就會新建糙置,也是調(diào)用initClient方法
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

直接看initClient方法:

//創(chuàng)建新連接
private ExchangeClient initClient(URL url) {
        
    // client type setting.
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
    boolean compatible = (version != null && version.startsWith("1.0."));
    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    //默認開啟heartbeat
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO存在嚴重性能問題,暫時不允許使用
    if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client ;
    try {
        //如果lazy屬性沒有配置為true(我們沒有配置是目,默認為false)ExchangeClient會馬上和服務(wù)端建立連接
        //設(shè)置連接應(yīng)該是lazy的 
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
            client = new LazyConnectExchangeClient(url ,requestHandler);
        } else {
            //立即和服務(wù)端建立連接
            client = Exchangers.connect(url ,requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url
                + "): " + e.getMessage(), e);
    }
    return client;
}

和服務(wù)端建立連接谤饭,Exchangers.connect(url ,requestHandler);,其實最后使用的是HeaderExchanger懊纳,Exchanger目前只有這一個實現(xiàn):

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    //先經(jīng)過HeaderExchangeHandler包裝
    //然后是DecodeHandler
    //然后是Transporters.connect
    //返回一個HeaderExchangerClient揉抵,這里封裝了client,channel嗤疯,啟動心跳的定時器等
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

Transporters.connect中也是根據(jù)SPI擴展獲取Transport的具體實現(xiàn)冤今,這里默認使用NettyTransporter.connect(),在NettyTransporter的connect方法中直接返回一個NettyClient(url, listener);茂缚,下面看下具體的NettyClient初始化細節(jié)戏罢,會先初始化AbstractPeer這里只是吧url和handler賦值;然后是AbstractEndpoint初始化:

public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler);
    //獲取編解碼器脚囊,這里是DubboCountCodec
    this.codec = getChannelCodec(url);
    this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}

接著是AbstractClient的初始化:

public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
    //默認重連間隔2s龟糕,1800表示1小時warning一次.
    reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

    try {
        //具體實現(xiàn)在子類中
        doOpen();
    } catch (Throwable t) {。悔耘。讲岁。 }
    try {
        // 連接
        connect();
    } catch (RemotingException t) {。衬以。缓艳。} 
    // TODO暫沒理解
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
        .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class)
        .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

看下在NettyClient中doOpen()的實現(xiàn):

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory);
    // config
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("connectTimeoutMillis", getTimeout());
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
}

這里是Netty3中的客戶端連接的一些常規(guī)步驟,暫不做具體解析看峻。open之后阶淘,就是真正連接服務(wù)端的操作了,connect():

protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) {
            return;
        }
        //初始化重連的線程
        initConnectStatusCheckCommand();
        //連接互妓,在子類中實現(xiàn)
        doConnect();
        reconnect_count.set(0);
        reconnect_error_log_flag.set(false);
    } catch (RemotingException e) {溪窒。坤塞。。} finally {
        connectLock.unlock();
    }
}

NettyClient中的doConnect方法:

protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis();
    //消費者端開始連接霉猛,這一步的時候尺锚,服務(wù)提供者端就接到了連接請求,開始處理了
    ChannelFuture future = bootstrap.connect(getConnectAddress());
    try{
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
        if (ret && future.isSuccess()) {
            Channel newChannel = future.getChannel();
            newChannel.setInterestOps(Channel.OP_READ_WRITE);
            try {
                // 關(guān)閉舊的連接
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        oldChannel.close();
                    } finally {
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                if (NettyClient.this.isClosed()) {
                    try {
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.getCause() != null) { throw惜浅。瘫辩。。  } else {throw 坛悉。伐厌。。 }
    }finally{
        if (! isConnected()) {
            future.cancel();
        }
    }
}

這里連接的細節(jié)都交給了netty裸影。

NettyClient初始化完成之后挣轨,返回給Transporters,再返回給HeaderExchanger轩猩,HeaderExchanger中將NettyClient包裝成HeaderExchangeClient返回給DubboProtocol的initClient方法中卷扮,到此在getSharedClient中就獲取到了一個ExchangeClient,然后包裝一下返回client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);均践。

到這里在DubboProtocol的refer方法中這句DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);創(chuàng)建DubboInvoker就已經(jīng)解析完成晤锹,創(chuàng)建過程中連接了服務(wù)端,包含一個ExchangeClient等:

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    //將invoker緩存
    invokers.add(invoker);
    //返回invoker
    return invoker;
}

接著返回ProtocolFilterWrapper的refer方法彤委,在這里會構(gòu)建invoker鏈:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

接著再返回到ProtocolListenerWrapper的refer方法鞭铆,這里會初始化監(jiān)聽器,包裝:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return new ListenerInvokerWrapper<T>(protocol.refer(type, url), 
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                    .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}

接著在返回到toInvokers方法焦影,然后返回refreshInvoker方法的Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;這就獲得了Invoker车遂,接著就是方法名映射Invoker列表:Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);這里將invokers列表轉(zhuǎn)成與方法的映射關(guān)系。到這里refreshInvoker方法就完成了斯辰,在往上就返回到AbstractRegistry的notify方法舶担,到這里也完成了。

創(chuàng)建服務(wù)代理

到這里有關(guān)消費者端注冊到注冊中心和訂閱注冊中心就完事兒了椒涯,這部分是在RegistryProtocol.doRefer方法中柄沮,這個方法最后一句是return cluster.join(directory);,這里由Cluster組件創(chuàng)建一個Invoker并返回废岂,這里的cluster默認是用FailoverCluster,最后返回的是經(jīng)過MockClusterInvoker包裝過的FailoverCluster狱意。繼續(xù)返回到ReferenceConfig中createProxy方法湖苞,這時候我們已經(jīng)完成了消費者端引用服務(wù)的Invoker。然后最后返回的是根據(jù)我們得到的invoker創(chuàng)建的服務(wù)代理return (T) proxyFactory.getProxy(invoker);详囤。這里proxyFactory是我們在最上面列出的動態(tài)生成的代碼财骨。

首先經(jīng)過AbstractProxyFactory的處理:

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    Class<?>[] interfaces = null;
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
            interfaces = new Class<?>[types.length + 2];
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i ++) {
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    if (interfaces == null) {
        interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class};
    }
    //這里默認使用的是JavassistProxyFactory的實現(xiàn)
    return getProxy(invoker, interfaces);
}

然后經(jīng)過StubProxyFactoryWrapper的處理:

public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    T proxy = proxyFactory.getProxy(invoker);
    if (GenericService.class != invoker.getInterface()) {
        String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
        if (ConfigUtils.isNotEmpty(stub)) {
            Class<?> serviceType = invoker.getInterface();
            if (ConfigUtils.isDefault(stub)) {
                if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                    stub = serviceType.getName() + "Stub";
                } else {
                    stub = serviceType.getName() + "Local";
                }
            }
            try {
                Class<?> stubClass = ReflectUtils.forName(stub);
                if (! serviceType.isAssignableFrom(stubClass)) {
                    throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + serviceType.getName());
                }
                try {
                    Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                    proxy = (T) constructor.newInstance(new Object[] {proxy});
                    //export stub service
                    URL url = invoker.getUrl();
                    if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)){
                        url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                        url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                        try{
                            export(proxy, (Class)invoker.getInterface(), url);
                        }catch (Exception e) {
                            LOGGER.error("export a stub service error.", e);
                        }
                    }
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implemention class " + stubClass.getName(), e);
                }
            } catch (Throwable t) {
                LOGGER.error("Failed to create stub implemention class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
                // ignore
            }
        }
    }
    return proxy;
}

返回代理镐作。到此HelloService helloService = (HelloService) applicationContext.getBean("helloService");就解析完成了,得到了服務(wù)的代理隆箩,代理會被注冊到Spring容器中该贾,可以調(diào)用服務(wù)方法了。接下來的方法調(diào)用過程捌臊,是消費者發(fā)送請求杨蛋,提供者處理,然后消費者接受處理結(jié)果的請求理澎。

初始化的過程:主要做了注冊到注冊中心逞力,監(jiān)聽注冊中心,連接到服務(wù)提供者端糠爬,創(chuàng)建代理寇荧。這些都是為了下面消費者和提供者之間的通信做準備。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末执隧,一起剝皮案震驚了整個濱河市揩抡,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌镀琉,老刑警劉巖峦嗤,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異滚粟,居然都是意外死亡寻仗,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門凡壤,熙熙樓的掌柜王于貴愁眉苦臉地迎上來署尤,“玉大人,你說我怎么就攤上這事亚侠〔芴澹” “怎么了?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵硝烂,是天一觀的道長箕别。 經(jīng)常有香客問我,道長滞谢,這世上最難降的妖魔是什么串稀? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮狮杨,結(jié)果婚禮上母截,老公的妹妹穿的比我還像新娘。我一直安慰自己橄教,他們只是感情好清寇,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布喘漏。 她就那樣靜靜地躺著,像睡著了一般华烟。 火紅的嫁衣襯著肌膚如雪翩迈。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天盔夜,我揣著相機與錄音,去河邊找鬼比吭。 笑死绽族,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的衩藤。 我是一名探鬼主播吧慢,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼赏表!你這毒婦竟也來了检诗?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤瓢剿,失蹤者是張志新(化名)和其女友劉穎逢慌,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體间狂,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡攻泼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了鉴象。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片忙菠。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖纺弊,靈堂內(nèi)的尸體忽然破棺而出牛欢,到底是詐尸還是另有隱情,我是刑警寧澤淆游,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布傍睹,位于F島的核電站,受9級特大地震影響犹菱,放射性物質(zhì)發(fā)生泄漏拾稳。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一腊脱、第九天 我趴在偏房一處隱蔽的房頂上張望熊赖。 院中可真熱鬧,春花似錦虑椎、人聲如沸震鹉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽传趾。三九已至,卻和暖如春泥技,著一層夾襖步出監(jiān)牢的瞬間浆兰,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工珊豹, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留簸呈,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓店茶,卻偏偏與公主長得像蜕便,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子贩幻,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容

  • dubbo暴露服務(wù)有兩種情況轿腺,一種是設(shè)置了延遲暴露(比如delay="5000"),另外一種是沒有設(shè)置延遲暴露或者...
    加大裝益達閱讀 21,270評論 5 36
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理丛楚,服務(wù)發(fā)現(xiàn)族壳,斷路器,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • 噢~星期五了~哦趣些,還有40分鐘就下班了仿荆。唉~又過了一個禮拜~嗯~是熬過去的吧... 今天看到幾句話很喜歡 盛年不再...
    3624db714e15閱讀 231評論 0 0
  • 眨眼的樣子拢操,2015就變成2016了,離開學(xué)校一年有半功茴,患上鼠標手已近兩歲庐冯,而且還有,跑男我都看到第三季了坎穿。 大多...
    框框之上閱讀 306評論 0 0