Dubbo——服務(wù)引用流程

前言

本文接著深入分析服務(wù)引用的核心流程胁后。

Dubbo 支持兩種方式引用遠(yuǎn)程的服務(wù):

  • 1挟阻、服務(wù)直連的方式琼娘,僅適合在調(diào)試服務(wù)的時(shí)候使用。

  • 2赁濒、基于注冊(cè)中心引用服務(wù)轨奄,這是生產(chǎn)環(huán)境中使用的服務(wù)引用方式孟害。

DubboBootstrap 入口

在介紹服務(wù)發(fā)布的時(shí)候拒炎,介紹了 DubboBootstrap.start() 方法的核心流程,其中除了會(huì)調(diào)用 exportServices() 方法完成服務(wù)發(fā)布之外挨务,還會(huì)調(diào)用 referServices() 方法完成服務(wù)引用击你。

在 DubboBootstrap.referServices() 方法中,會(huì)從 ConfigManager 中獲取所有 ReferenceConfig 列表谎柄,并根據(jù) ReferenceConfig 獲取對(duì)應(yīng)的代理對(duì)象丁侄,入口邏輯如下:

public class DubboBootstrap extends GenericEventListener {

    private final ConfigManager configManager;

    private ReferenceConfigCache cache;

    private final ExecutorRepository executorRepository = getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

    private volatile boolean referAsync;
    
    private List<CompletableFuture<Object>> asyncReferringFutures = new ArrayList<>();

    private void referServices() {
        if (cache == null) {
            // 初始ReferenceConfigCache
            cache = ReferenceConfigCache.getCache();
        }
        
        // 遍歷ReferenceConfig列表
        configManager.getReferences().forEach(rc -> {
            
            ReferenceConfig referenceConfig = (ReferenceConfig) rc;
            referenceConfig.setBootstrap(this);
            // 檢測(cè)ReferenceConfig是否已經(jīng)初始化
            if (rc.shouldInit()) {
                 // 異步
                if (referAsync) {
                    CompletableFuture<Object> future = ScheduledCompletableFuture.submit(
                            executorRepository.getServiceExporterExecutor(),
                            () -> cache.get(rc)
                    );
                    asyncReferringFutures.add(future);
                } else {
                    // 同步
                    cache.get(rc);
                }
            }
        });
    }
}

Dubbo 服務(wù)引用的時(shí)機(jī)有兩個(gè),第一個(gè)是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet 方法時(shí)引用服務(wù)朝巫,第二個(gè)是在 ReferenceBean 對(duì)應(yīng)的服務(wù)被注入到其他類中時(shí)引用鸿摇。這兩個(gè)引用服務(wù)的時(shí)機(jī)區(qū)別在于,第一個(gè)是餓漢式的劈猿,第二個(gè)是懶漢式的拙吉。默認(rèn)情況下,Dubbo 使用懶漢式引用服務(wù)揪荣。

新建的 ReferenceConfig 對(duì)象會(huì)通過 DubboBootstrap.reference() 方法添加到 ConfigManager 中進(jìn)行管理筷黔,如下所示:

public class DubboBootstrap extends GenericEventListener {

    private final ConfigManager configManager;

    public DubboBootstrap reference(ReferenceConfig<?> referenceConfig) {
        configManager.addReference(referenceConfig);
        return this;
    }
}

ReferenceConfigCache

服務(wù)引用的核心實(shí)現(xiàn)在 ReferenceConfig 之中,一個(gè) ReferenceConfig 對(duì)象對(duì)應(yīng)一個(gè)服務(wù)接口仗颈,每個(gè) ReferenceConfig 對(duì)象中都封裝了與注冊(cè)中心的網(wǎng)絡(luò)連接佛舱,以及與 Provider 的網(wǎng)絡(luò)連接,這是一個(gè)非常重要的對(duì)象。

為了避免底層連接泄漏造成性能問題请祖,從 Dubbo 2.4.0 版本開始订歪,Dubbo 提供了 ReferenceConfigCache 用于緩存 ReferenceConfig 實(shí)例。

在 dubbo-demo-api-consumer 示例中肆捕,我們可以看到 ReferenceConfigCache 的基本使用方式:

ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setInterface(DemoService.class);
... 
// 這一步在DubboBootstrap.start()方法中完成
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
...
DemoService demoService = ReferenceConfigCache.getCache().get(reference);

在 ReferenceConfigCache 中維護(hù)了一個(gè)靜態(tài)的 Map(CACHE_HOLDER)字段陌粹,其中 Key 是由 Group、服務(wù)接口和 version 構(gòu)成福压,Value 是一個(gè) ReferenceConfigCache 對(duì)象掏秩。在 ReferenceConfigCache 中可以傳入一個(gè) KeyGenerator 用來修改緩存 Key 的生成邏輯,KeyGenerator 接口的定義如下:

public class ReferenceConfigCache {

    public interface KeyGenerator {
        String generateKey(ReferenceConfigBase<?> referenceConfig);
    }
}

默認(rèn)的 KeyGenerator 實(shí)現(xiàn)是 ReferenceConfigCache 中的匿名內(nèi)部類荆姆,其對(duì)象由 DEFAULT_KEY_GENERATOR 這個(gè)靜態(tài)字段引用蒙幻,具體實(shí)現(xiàn)如下:

public class ReferenceConfigCache {

    public static final String DEFAULT_NAME = "_DEFAULT_";

    public static final KeyGenerator DEFAULT_KEY_GENERATOR = referenceConfig -> {
        String iName = referenceConfig.getInterface();
        if (StringUtils.isBlank(iName)) {
            // 獲取服務(wù)接口名稱
            Class<?> clazz = referenceConfig.getInterfaceClass();
            iName = clazz.getName();
        }
        if (StringUtils.isBlank(iName)) {
            throw new IllegalArgumentException("No interface info in ReferenceConfig" + referenceConfig);
        }
        // Key的格式是group/interface:version
        StringBuilder ret = new StringBuilder();
        if (!StringUtils.isBlank(referenceConfig.getGroup())) {
            ret.append(referenceConfig.getGroup()).append("/");
        }
        ret.append(iName);
        if (!StringUtils.isBlank(referenceConfig.getVersion())) {
            ret.append(":").append(referenceConfig.getVersion());
        }
        return ret.toString();
    };
}

在 ReferenceConfigCache 實(shí)例對(duì)象中,會(huì)維護(hù)下面兩個(gè) Map 集合:

  • proxies(ConcurrentMap<Class<?>, ConcurrentMap<String, Object>>類型):該集合用來存儲(chǔ)服務(wù)接口的全部代理對(duì)象胆筒,其中第一層 Key 是服務(wù)接口的類型邮破,第二層 Key 是上面介紹的 KeyGenerator 為不同服務(wù)提供方生成的 Key,Value 是服務(wù)的代理對(duì)象仆救。

  • referredReferences(ConcurrentMap<String, ReferenceConfigBase<?>> 類型):該集合用來存儲(chǔ)已經(jīng)被處理的 ReferenceConfig 對(duì)象抒和。

回到 DubboBootstrap.referServices() 方法中,看一下其中與 ReferenceConfigCache 相關(guān)的邏輯彤蔽。

首先是 ReferenceConfigCache.getCache() 這個(gè)靜態(tài)方法摧莽,會(huì)在 CACHE_HOLDER 集合中添加一個(gè) Key 為“DEFAULT”的 ReferenceConfigCache 對(duì)象(使用默認(rèn)的 KeyGenerator 實(shí)現(xiàn)),它將作為默認(rèn)的 ReferenceConfigCache 對(duì)象顿痪。

接下來镊辕,無論是同步服務(wù)引用還是異步服務(wù)引用,都會(huì)調(diào)用 ReferenceConfigCache.get() 方法蚁袭,創(chuàng)建并緩存代理對(duì)象征懈。下面就是 ReferenceConfigCache.get() 方法的核心實(shí)現(xiàn):

public class ReferenceConfigCache {

    public <T> T get(ReferenceConfigBase<T> referenceConfig) {
        // 生成服務(wù)提供方對(duì)應(yīng)的Key
        String key = generator.generateKey(referenceConfig);
        // 獲取接口類型
        Class<?> type = referenceConfig.getInterfaceClass();
        // 獲取該接口對(duì)應(yīng)代理對(duì)象集合
        proxies.computeIfAbsent(type, _t -> new ConcurrentHashMap<>());

        ConcurrentMap<String, Object> proxiesOfType = proxies.get(type);
        
        // 根據(jù)Key獲取服務(wù)提供方對(duì)應(yīng)的代理對(duì)象
        proxiesOfType.computeIfAbsent(key, _k -> {
            // 服務(wù)引用
            Object proxy = referenceConfig.get();
            // 將ReferenceConfig記錄到referredReferences集合
            referredReferences.put(key, referenceConfig);
            return proxy;
        });

        return (T) proxiesOfType.get(key);
    }
}

ReferenceConfig

通過前面的介紹知道,ReferenceConfig 是服務(wù)引用的真正入口揩悄,其中會(huì)創(chuàng)建相關(guān)的代理對(duì)象卖哎。下面先來看 ReferenceConfig.get() 方法:

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private transient volatile T ref;
    
    public synchronized T get() {
        // 檢測(cè)當(dāng)前ReferenceConfig狀態(tài)
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // 檢測(cè) ref 是否為空,為空則通過 init 方法創(chuàng)建
        if (ref == null) {// ref指向了服務(wù)的代理對(duì)象
            // 啟動(dòng)初始化操作 init 方法主要用于處理配置删性,以及調(diào)用 createProxy 生成代理類
            init();
        }
        return ref;
    }
}

在 ReferenceConfig.init() 方法中亏娜,首先會(huì)對(duì)服務(wù)引用的配置進(jìn)行處理,以保證配置的正確性镇匀。

ReferenceConfig.init() 方法的核心邏輯是調(diào)用 createProxy() 方法照藻,調(diào)用之前會(huì)從配置中獲取 createProxy() 方法需要的參數(shù):

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private transient volatile T ref;
    
    private transient volatile boolean initialized;

    private DubboBootstrap bootstrap;   
    
    public synchronized void init() {
        //避免重復(fù)加載
        if (initialized) {
            return;
        }

        //獲取Dubbo核心容器
        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            //進(jìn)行Dubbo核心配置的加載和檢查
            bootstrap.initialize();
        }
        //在對(duì)象創(chuàng)建后在使用其他配置模塊配置對(duì)象之前檢查對(duì)象配置并重寫默認(rèn)配置
        checkAndUpdateSubConfigs();
        //檢查并生成sub配置和Local配置是否合法
        checkStubAndLocal(interfaceClass);
        //判斷對(duì)象是否有mock并生成mock信息
        ConfigValidationUtils.checkMock(interfaceClass, this);
        //保存對(duì)象屬性map信息
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);
        //添加版本信息,包含dubbo版本汗侵,release版本幸缕,timestamp運(yùn)行時(shí)間戳和sid_key等信息
        ReferenceConfigBase.appendRuntimeParameters(map);
        //添加泛型 revision信息
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }
            //生成服務(wù)的代理對(duì)象群发,跟服務(wù)導(dǎo)出是一樣,通過代理對(duì)象來代理发乔,返回代理方法
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                //添加需要代理的方法
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        //添加interface名
        map.put(INTERFACE_KEY, interfaceName);
        //添加重試信息
        AbstractConfig.appendParameters(map, getMetrics());
        //檢查獲取并添加Application信息
        AbstractConfig.appendParameters(map, getApplication());
        //檢查獲取并添加Module信息
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        //檢查獲取并添加consumer信息
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        //設(shè)置方法重試信息并收集方法異步調(diào)用信息
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }
        //獲取服務(wù)消費(fèi)者 ip 地址
        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        //添加服務(wù)注冊(cè)信息
        map.put(REGISTER_IP_KEY, hostToRegistry);
        //將配置保存如服務(wù)元信息中
        serviceMetadata.getAttachments().putAll(map);
        //創(chuàng)建代理
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        // 根據(jù)服務(wù)名熟妓,ReferenceConfig,代理類構(gòu)建 ConsumerModel栏尚,
        // 并將 ConsumerModel 存入到 ApplicationModel 中      
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;
        //檢查引入的服務(wù)是否可用
        checkInvokerAvailable();

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }
}

ReferenceConfig.createProxy() 方法中處理了多種服務(wù)引用的場(chǎng)景起愈,例如,直連單個(gè)/多個(gè)Provider译仗、單個(gè)/多個(gè)注冊(cè)中心抬虽。下面是 createProxy() 方法的核心流程,大致可以梳理出這么 5 個(gè)步驟:

  • 1纵菌、根據(jù)傳入的參數(shù)集合判斷協(xié)議是否為 injvm 協(xié)議阐污,如果是,直接通過 InjvmProtocol 引用服務(wù)咱圆。

  • 2笛辟、構(gòu)造 urls 集合。Dubbo 支持直連 Provider和依賴注冊(cè)中心兩種服務(wù)引用方式序苏。如果是直連服務(wù)的模式手幢,我們可以通過 url 參數(shù)指定一個(gè)或者多個(gè) Provider 地址,會(huì)被解析并填充到 urls 集合忱详;如果通過注冊(cè)中心的方式進(jìn)行服務(wù)引用围来,則會(huì)調(diào)用 AbstractInterfaceConfig.loadRegistries() 方法加載所有注冊(cè)中心。

  • 3踱阿、如果 urls 集合中只記錄了一個(gè) URL管钳,通過 Protocol 適配器選擇合適的 Protocol 擴(kuò)展實(shí)現(xiàn)創(chuàng)建 Invoker 對(duì)象钦铁。如果是直連 Provider 的場(chǎng)景软舌,則 URL 為 dubbo 協(xié)議,這里就會(huì)使用 DubboProtocol 這個(gè)實(shí)現(xiàn)牛曹;如果依賴注冊(cè)中心佛点,則使用 RegistryProtocol 這個(gè)實(shí)現(xiàn)。

  • 4黎比、如果 urls 集合中有多個(gè)注冊(cè)中心超营,則使用 ZoneAwareCluster 作為 Cluster 的默認(rèn)實(shí)現(xiàn),生成對(duì)應(yīng)的 Invoker 對(duì)象阅虫;如果 urls 集合中記錄的是多個(gè)直連服務(wù)的地址演闭,則使用 Cluster 適配器選擇合適的擴(kuò)展實(shí)現(xiàn)生成 Invoker 對(duì)象。

  • 5颓帝、通過 ProxyFactory 適配器選擇合適的 ProxyFactory 擴(kuò)展實(shí)現(xiàn)米碰,將 Invoker 包裝成服務(wù)接口的代理對(duì)象窝革。

通過上面的流程我們可以看出createProxy() 方法中有兩個(gè)核心:

  • 1、通過 Protocol 適配器選擇合適的 Protocol 擴(kuò)展實(shí)現(xiàn)創(chuàng)建 Invoker 對(duì)象吕座。
  • 2虐译、通過 ProxyFactory 適配器選擇合適的 ProxyFactory 創(chuàng)建代理對(duì)象。

下面我們來看 createProxy() 方法的具體實(shí)現(xiàn):

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {

    private T createProxy(Map<String, String> map) {
        //jvm本地引入
        // 根據(jù)url的協(xié)議吴趴、scope以及injvm等參數(shù)檢測(cè)是否需要本地引用
        if (shouldJvmRefer(map)) {
            // 創(chuàng)建injvm協(xié)議的URL
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            // 本地引用invoker生成
            // 通過Protocol的適配器選擇對(duì)應(yīng)的Protocol實(shí)現(xiàn)創(chuàng)建Invoker對(duì)象
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear();
            // 用戶配置url信息,表明用戶可能想進(jìn)行點(diǎn)對(duì)點(diǎn)調(diào)用
            if (url != null && url.length() > 0) { 
                // 當(dāng)需要配置多個(gè) url 時(shí)漆诽,可用分號(hào)進(jìn)行分割,這里會(huì)進(jìn)行切分
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            // 設(shè)置接口全限定名為 url 路徑
                            url = url.setPath(interfaceName);
                        }
                        // 檢測(cè) url 協(xié)議是否為 registry锣枝,若是厢拭,表明用戶想使用指定的注冊(cè)中心
                        if (UrlUtils.isRegistry(url)) {
                            // 將 map 轉(zhuǎn)換為查詢字符串,并作為 refer 參數(shù)的值添加到 url 中
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            // 合并 url撇叁,移除服務(wù)提供者的一些配置(這些配置來源于用戶配置的 url 屬性)蚪腐,
                            // 比如線程池相關(guān)配置。并保留服務(wù)提供者的部分配置税朴,比如版本回季,group,時(shí)間戳等
                            // 最后將合并后的配置設(shè)置為 url 查詢字符串中正林。                 
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // 從注冊(cè)中心的配置中組裝url信息
                // if protocols not injvm checkRegistry
                // 如果協(xié)議不是在jvm本地中
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    //檢查注冊(cè)中心是否存在(如果當(dāng)前配置不存在則獲取服務(wù)默認(rèn)配置),然后將他們轉(zhuǎn)換到RegistryConfig中
                    checkRegistry();
                    //通過注冊(cè)中心配置信息組裝URL
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            //添加monitor監(jiān)控信息
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            // 將map中的參數(shù)整理成refer參數(shù)泡一,添加到RegistryURL中
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    // 既不是服務(wù)直連,也沒有配置注冊(cè)中心觅廓,拋出異常
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }
            //單個(gè)注冊(cè)中心或服務(wù)提供者(服務(wù)直連鼻忠,下同)
            if (urls.size() == 1) {
                // 調(diào)用 RegistryProtocol 的 refer 構(gòu)建 Invoker 實(shí)例
                // 在單注冊(cè)中心或是直連單個(gè)服務(wù)提供方的時(shí)候,通過Protocol的適配器選擇對(duì)應(yīng)的Protocol實(shí)現(xiàn)創(chuàng)建Invoker對(duì)象
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                //多個(gè)注冊(cè)中心或多個(gè)服務(wù)提供者杈绸,或者兩者混合
                // 多注冊(cè)中心或是直連多個(gè)服務(wù)提供方的時(shí)候帖蔓,會(huì)根據(jù)每個(gè)URL創(chuàng)建Invoker對(duì)象
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                // 獲取所有的 Invoker
                for (URL url : urls) {
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {// 確定是多注冊(cè)中心,還是直連多個(gè)Provider
                        // 保存使用注冊(cè)中心的最新的URL信息
                        registryURL = url; // use last registry url
                    }
                }
                // 注冊(cè)中心URL存在
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default
                    // 多注冊(cè)中心的場(chǎng)景中瞳脓,會(huì)使用ZoneAwareCluster作為Cluster默認(rèn)實(shí)現(xiàn)塑娇,多注冊(cè)中心之間的選擇
                    // 對(duì)于對(duì)區(qū)域訂閱方案,默認(rèn)使用"zone-aware"區(qū)域
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    // invoker 包裝順序: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                } else { // not a registry url, must be direct invoke.
                    // 如果不存在注冊(cè)中心連接劫侧,只能使用直連
                    //如果訂閱區(qū)域未設(shè)置埋酬,則設(shè)置為默認(rèn)區(qū)域"zone-aware"
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    // 創(chuàng)建 StaticDirectory 實(shí)例,并由 Cluster 對(duì)多個(gè) Invoker 進(jìn)行合并        
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                }
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }

        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        MetadataUtils.publishServiceDefinition(consumerURL);

        // 通過ProxyFactory適配器選擇合適的ProxyFactory擴(kuò)展實(shí)現(xiàn)烧栋,創(chuàng)建代理對(duì)象
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
}

RegistryProtocol

在直連 Provider 的場(chǎng)景中写妥,會(huì)使用 DubboProtocol.refer() 方法完成服務(wù)引用,DubboProtocol.refer() 方法的具體實(shí)現(xiàn)在之前已經(jīng)詳細(xì)介紹過了审姓,這里我們重點(diǎn)來看存在注冊(cè)中心的場(chǎng)景中珍特,Dubbo Consumer 是如何通過 RegistryProtocol 完成服務(wù)引用的。

在 RegistryProtocol.refer() 方法中魔吐,會(huì)先根據(jù) URL 獲取注冊(cè)中心的 URL扎筒,再調(diào)用 doRefer 方法生成 Invoker呼猪,在 refer() 方法中會(huì)使用 MergeableCluster 處理多 group 引用的場(chǎng)景。

public class RegistryProtocol implements Protocol {

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 從URL中獲取注冊(cè)中心的URL
        url = getRegistryUrl(url);
        // 獲取Registry實(shí)例,這里的RegistryFactory對(duì)象是通過Dubbo SPI的自動(dòng)裝載機(jī)制注入的
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // 從注冊(cè)中心URL的refer參數(shù)中獲取此次服務(wù)引用的一些參數(shù),其中就包括group
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                // 如果此次可以引用多個(gè)group的服務(wù)器紧,則Cluser實(shí)現(xiàn)使用MergeableCluster實(shí)現(xiàn)捏浊,
                // 這里的getMergeableCluster()方法就會(huì)通過Dubbo SPI方式找到MergeableCluster實(shí)例
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
            }
        }

        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        // 如果沒有g(shù)roup參數(shù)或是只指定了一個(gè)group,則通過Cluster適配器選擇Cluster實(shí)現(xiàn)
        return doRefer(cluster, registry, type, url);
    }
}

在 doRefer() 方法中,首先會(huì)根據(jù) URL 初始化 RegistryDirectory 實(shí)例,然后生成 Subscribe URL 并進(jìn)行注冊(cè),之后會(huì)通過 Registry 訂閱服務(wù)壶唤,最后通過 Cluster 將多個(gè) Invoker 合并成一個(gè) Invoker 返回給上層,具體實(shí)現(xiàn)如下:

public class RegistryProtocol implements Protocol {

    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        return interceptInvoker(getInvoker(cluster, registry, type, url), url);
    }
    
    protected <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 創(chuàng)建RegistryDirectory實(shí)例
        DynamicDirectory<T> directory = createDirectory(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 生成SubscribeUrl棕所,協(xié)議為consumer闸盔,具體的參數(shù)是RegistryURL中refer參數(shù)指定的參數(shù)
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            // 在SubscribeUrl中添加category=consumers和check=false參數(shù)
            directory.setRegisteredConsumerUrl(urlToRegistry);
            // 服務(wù)注冊(cè),在Zookeeper的consumers節(jié)點(diǎn)下琳省,添加該Consumer對(duì)應(yīng)的節(jié)點(diǎn)
            registry.register(directory.getRegisteredConsumerUrl());
        }
        // 根據(jù)SubscribeUrl創(chuàng)建服務(wù)路由
        directory.buildRouterChain(urlToRegistry);
        // 訂閱服務(wù)迎吵,toSubscribeUrl()方法會(huì)將SubscribeUrl中category參數(shù)修改為"providers,configurators,routers"
        // RegistryDirectory的subscribe()在前面詳細(xì)分析過了,其中會(huì)通過Registry訂閱服務(wù)针贬,同時(shí)還會(huì)添加相應(yīng)的監(jiān)聽器
        directory.subscribe(toSubscribeUrl(urlToRegistry));

        // 注冊(cè)中心中可能包含多個(gè)Provider击费,相應(yīng)地,也就有多個(gè)Invoker桦他,
        // 這里通過前面選擇的Cluster將多個(gè)Invoker對(duì)象封裝成一個(gè)Invoker對(duì)象
        return (ClusterInvoker<T>) cluster.join(directory);
    }

    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url) {
        // 根據(jù)URL中的registry.protocol.listener參數(shù)加載相應(yīng)的監(jiān)聽器實(shí)現(xiàn)
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }
    
        //引入了RegistryProtocol偵聽器蔫巩,以使用戶有機(jī)會(huì)自定義或更改導(dǎo)出并引用RegistryProtocol的行為。
        // 例如:在滿足某些條件時(shí)立即重新導(dǎo)出或重新引用快压。
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, invoker);
        }
        return invoker;
    }   
}

總結(jié)

本文重點(diǎn)介紹了 Dubbo 服務(wù)引用的整個(gè)流程:

  • 首先圆仔,我們介紹了 DubboBootStrap 這個(gè)入口門面類與服務(wù)引用相關(guān)的方法,其中涉及 referServices()蔫劣、reference() 等核心方法坪郭。

  • 接下來,我們分析了 ReferenceConfigCache 這個(gè) ReferenceConfig 對(duì)象緩存拦宣,以及 ReferenceConfig 實(shí)現(xiàn)服務(wù)引用的核心流程截粗。

  • 最后,我們還講解了 RegistryProtocol 從注冊(cè)中心引用服務(wù)的核心實(shí)現(xiàn)鸵隧。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市意推,隨后出現(xiàn)的幾起案子豆瘫,更是在濱河造成了極大的恐慌,老刑警劉巖菊值,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件外驱,死亡現(xiàn)場(chǎng)離奇詭異育灸,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)昵宇,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門磅崭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人瓦哎,你說我怎么就攤上這事砸喻。” “怎么了蒋譬?”我有些...
    開封第一講書人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵割岛,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我犯助,道長(zhǎng)癣漆,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任剂买,我火速辦了婚禮惠爽,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘瞬哼。我一直安慰自己疆股,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開白布倒槐。 她就那樣靜靜地躺著旬痹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪讨越。 梳的紋絲不亂的頭發(fā)上两残,一...
    開封第一講書人閱讀 51,274評(píng)論 1 300
  • 那天,我揣著相機(jī)與錄音把跨,去河邊找鬼人弓。 笑死,一個(gè)胖子當(dāng)著我的面吹牛着逐,可吹牛的內(nèi)容都是我干的崔赌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼耸别,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼健芭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起秀姐,我...
    開封第一講書人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬榮一對(duì)情侶失蹤慈迈,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后省有,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體痒留,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡谴麦,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了伸头。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片匾效。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖恤磷,靈堂內(nèi)的尸體忽然破棺而出面哼,到底是詐尸還是另有隱情,我是刑警寧澤碗殷,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布精绎,位于F島的核電站,受9級(jí)特大地震影響锌妻,放射性物質(zhì)發(fā)生泄漏代乃。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一仿粹、第九天 我趴在偏房一處隱蔽的房頂上張望搁吓。 院中可真熱鬧,春花似錦吭历、人聲如沸堕仔。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽摩骨。三九已至,卻和暖如春朗若,著一層夾襖步出監(jiān)牢的瞬間恼五,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工哭懈, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留灾馒,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓遣总,卻偏偏與公主長(zhǎng)得像睬罗,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子旭斥,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容