Dubbo-服務引入(5)

1. 簡介

上一篇文章詳細分析了服務導出的過程胞此,本篇文章我們趁熱打鐵沥阳,繼續(xù)分析服務引用過程署咽。在 Dubbo 中杂抽,我們可以通過兩種方式引用遠程服務诈唬。第一種是使用服務直連的方式引用服務,第二種方式是基于注冊中心進行引用默怨。服務直連的方式僅適合在調(diào)試或測試服務的場景下使用讯榕,不適合在線上環(huán)境使用。因此匙睹,本文我將重點分析通過注冊中心引用服務的過程愚屁。從注冊中心中獲取服務配置只是服務引用過程中的一環(huán),除此之外痕檬,服務消費者還需要經(jīng)歷 Invoker 創(chuàng)建霎槐、代理類創(chuàng)建等步驟。這些步驟梦谜,將在后續(xù)章節(jié)中一一進行分析丘跌。

2.服務引用原理

Dubbo 服務引用的時機有兩個,第一個是在 Spring 容器調(diào)用 ReferenceBean 的 afterPropertiesSet 方法時引用服務唁桩,第二個是在 ReferenceBean 對應的服務被注入到其他類中時引用闭树。這兩個引用服務的時機區(qū)別在于,第一個是餓漢式的荒澡,第二個是懶漢式的报辱。默認情況下,Dubbo 使用懶漢式引用服務单山。如果需要使用餓漢式碍现,可通過配置 <dubbo:reference> 的 init 屬性開啟。下面我們按照 Dubbo 默認配置進行分析米奸,整個分析過程從 ReferenceBean 的 getObject 方法開始昼接。當我們的服務被注入到其他類中時,Spring 會第一時間調(diào)用 getObject 方法悴晰,并由該方法執(zhí)行服務引用邏輯慢睡。按照慣例,在進行具體工作之前铡溪,需先進行配置檢查與收集工作一睁。接著根據(jù)收集到的信息決定服務用的方式,有三種佃却,第一種是引用本地 (JVM) 服務者吁,第二是通過直連方式引用遠程服務,第三是通過注冊中心引用遠程服務饲帅。不管是哪種引用方式复凳,最后都會得到一個 Invoker 實例瘤泪。如果有多個注冊中心,多個服務提供者育八,這個時候會得到一組 Invoker 實例对途,此時需要通過集群管理類 Cluster 將多個 Invoker 合并成一個實例。合并后的 Invoker 實例已經(jīng)具備調(diào)用本地或遠程服務的能力了髓棋,但并不能將此實例暴露給用戶使用实檀,這會對用戶業(yè)務代碼造成侵入。此時框架還需要通過代理工廠類 (ProxyFactory) 為服務接口生成代理類按声,并讓代理類去調(diào)用 Invoker 邏輯膳犹。避免了 Dubbo 框架代碼對業(yè)務代碼的侵入,同時也讓框架更容易使用签则。

以上就是服務引用的大致原理须床,下面我們深入到代碼中,詳細分析服務引用細節(jié)渐裂。

3.源碼分析

我們先看一下ReferenceBean 的依賴結構:

image

服務引用的入口方法為 ReferenceBean 的 getObject 方法豺旬,該方法定義在 Spring 的 FactoryBean 接口中,在調(diào)用afterPropertiesSet 方法時引用服務柒凉,ReferenceBean 實現(xiàn)了這個方法族阅。實現(xiàn)代碼如下:

// 這個方法重寫自  InitializingBean,該方法用于當bean set完屬性后膝捞,用戶進行自定義的邏輯
public void afterPropertiesSet() throws Exception {

    // Initializes Dubbo's Config Beans before @Reference bean autowiring
    // 初始化 dubbo 配置 bean
    prepareDubboConfigBeans();

    // 默認懶漢模式
    if (init == null) {
        init = false;
    }

    // 通過配置文件可以設置init屬性
    if (shouldInit()) {
        getObject();
    }
}

public Object getObject() {
        return get();
    }

ReferenceBean 中get()方法在父類ReferenceConfig中實現(xiàn)坦刀,代碼如下:

public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
    }
    // 檢測 ref 是否為空,為空則通過 init 方法創(chuàng)建
    if (ref == null) {
        // init 方法主要用于處理配置绑警,以及調(diào)用 createProxy 生成代理類
        init();
    }
    // 返回接口代理引用
    return ref;
}

Dubbo 提供了豐富的配置求泰,用于調(diào)整和優(yōu)化框架行為央渣,性能等计盒。Dubbo 在引用或?qū)С龇諘r,首先會對這些配置進行檢查和處理芽丹,以保證配置的正確性北启。配置解析邏輯封裝在 ReferenceConfig 的 init 方法中,這部分就不展開了拔第。

public synchronized void init() {
        // 避免重復初始化
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        // 1咕村、配置檢查
        checkAndUpdateSubConfigs();
        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);

        // 2、添加 side蚊俺、協(xié)議版本信息懈涛、時間戳和進程號等等信息到 map 中
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
        // 檢測是否為泛化接口
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                // 獲取接口方法列表,并添加到 map 中
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        // 將 ApplicationConfig泳猬、ConsumerConfig批钠、ReferenceConfig 等對象的字段信息添加到 map 中
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    // 添加 MethodConfig 中的“屬性”字段到 attributes
                    // 比如 onreturn宇植、onthrow、oninvoke 等
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }

        // 獲取服務消費者 ip 地址
        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        serviceMetadata.getAttachments().putAll(map);

        // 3埋心、創(chuàng)建代理類
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        // dispatch ReferenceConfigInitializedEvent事件
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

核心流程主要是三個:

  1. 配置檢查
  2. 構建map
  3. 創(chuàng)建代理類

繼續(xù)指郁,我們進入createProxy方法:

private T createProxy(Map<String, String> map) {
        
    // 本地引用
    if (shouldJvmRefer(map)) {
        // 生成本地引用 URL,協(xié)議為 injvm
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        // 調(diào)用 refer 方法構建 InjvmInvoker 實例
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
    // 遠程引用    
    } else {
        urls.clear();
        // url 不為空拷呆,表明用戶可能想進行點對點調(diào)用
        if (url != null && url.length() > 0) { 
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    // 設置接口全限定名為 url 路徑
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (UrlUtils.isRegistry(url)) {
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center's configuration
            // 檢測 url 協(xié)議是否為 registry闲坎,若是,表明用戶想使用指定的注冊中心
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();
                List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 添加 refer 參數(shù)到 url 中茬斧,并將 url 添加到 urls 中
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                 // 未配置注冊中心腰懂,拋出異常
                if (urls.isEmpty()) {
                    // 拋出異常
                }
            }
        }

        // 單個注冊中心或服務提供者(服務直連,下同)
        if (urls.size() == 1) {
             // 調(diào)用 RegistryProtocol 的 refer 構建 Invoker 實例
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        // 多個注冊中心或多個服務提供者啥供,或者兩者混合    
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            // 獲取所有的 Invoker
            for (URL url : urls) {
                // 通過 refprotocol 調(diào)用 refer 構建 Invoker悯恍,refprotocol 會在運行時
                // 根據(jù) url 協(xié)議頭加載指定的 Protocol 實例,并調(diào)用實例的 refer 方法
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (UrlUtils.isRegistry(url)) {
                    registryURL = url; // use last registry url
                }
            }
            // 如果注冊中心鏈接不為空伙狐,則將使用 AvailableCluster
            if (registryURL != null) {
                // for multi-subscription scenario, use 'zone-aware' policy by default
                URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                // 創(chuàng)建 StaticDirectory 實例涮毫,并由 Cluster 對多個 Invoker 進行合并
                // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }

    // invoker 可用性檢查
    if (shouldCheck() && !invoker.isAvailable()) {
        invoker.destroy();
        // throw new IllegalStateException
    }
    
    String metadata = map.get(METADATA_KEY);
    WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
    if (metadataService != null) {
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        metadataService.publishServiceDefinition(consumerURL);
    }
    // 生成代理類
    return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}

上面代碼很多,不過邏輯比較清晰贷屎。首先根據(jù)配置檢查是否為本地調(diào)用罢防,若是,則調(diào)用 InjvmProtocol 的 refer 方法生成 InjvmInvoker 實例唉侄。若不是咒吐,則讀取直連配置項,或注冊中心 url属划,并將讀取到的 url 存儲到 urls 中恬叹。然后根據(jù) urls 元素數(shù)量進行后續(xù)操作。若 urls 元素數(shù)量為1同眯,則直接通過 Protocol 自適應拓展類構建 Invoker 實例接口绽昼。若 urls 元素數(shù)量大于1,即存在多個注冊中心或服務直連 url须蜗,此時先根據(jù) url 構建 Invoker硅确。然后再通過 Cluster 合并多個 Invoker,最后調(diào)用 ProxyFactory 生成代理類明肮。

3.1 創(chuàng)建 Invoker

Invoker 是 Dubbo 的核心模型菱农,代表一個可執(zhí)行體。在服務提供方柿估,Invoker 用于調(diào)用服務提供類循未。在服務消費方,Invoker 用于執(zhí)行遠程調(diào)用秫舌。Invoker 是由 Protocol 實現(xiàn)類構建而來的妖。Protocol 實現(xiàn)類有很多烙丛,本節(jié)會分析最常用的兩個,分別是 RegistryProtocol 和 DubboProtocol羔味,其他的大家自行分析河咽。下面先來分析 RegistryProtocol 的 refer 方法源碼。如下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // 取 registry 參數(shù)值赋元,并將其設置為協(xié)議頭
    url = getRegistryUrl(url);
    // 獲取注冊中心實例
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // group="a,b" or group="*"
    // 將 url 查詢字符串轉(zhuǎn)為 Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
    // 獲取 group 配置
    String group = qs.get(GROUP_KEY);
    if (group != null && group.length() > 0) {
        if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            // 通過 SPI 加載 MergeableCluster 實例忘蟹,并調(diào)用 doRefer 繼續(xù)執(zhí)行服務引用邏輯
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    // 調(diào)用 doRefer 繼續(xù)執(zhí)行服務引用邏輯
    return doRefer(cluster, registry, type, url);
}

上面代碼首先為 url 設置協(xié)議頭,然后根據(jù) url 參數(shù)加載注冊中心實例搁凸。然后獲取 group 配置媚值,根據(jù) group 配置決定 doRefer 第一個參數(shù)的類型。這里的重點是 doRefer 方法护糖,如下:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 創(chuàng)建 RegistryDirectory 實例, 這個 RegistryDirectory 存放注冊信息的資料褥芒,當注冊信息發(fā)生變化的時候通過 subscribe 方法分發(fā),內(nèi)部存在一些監(jiān)聽器處理信息時間
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // 設置注冊中心和協(xié)議
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
    // 生成服務消費者鏈接
    URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        // 注冊服務消費者嫡良,在 consumers 目錄下新節(jié)點
        registry.register(directory.getRegisteredConsumerUrl());
    }
    directory.buildRouterChain(subscribeUrl);
    // 訂閱 providers锰扶、configurators、routers 等節(jié)點數(shù)據(jù)
    directory.subscribe(toSubscribeUrl(subscribeUrl));

    // 這才是真正生成 invoker 的地方
    // 一個注冊中心可能有多個服務提供者寝受,因此這里需要將多個服務提供者合并為一個
    Invoker<T> invoker = cluster.join(directory);
    List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
    if (CollectionUtils.isEmpty(listeners)) {
        return invoker;
    }

    RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
    for (RegistryProtocolListener listener : listeners) {
        listener.onRefer(this, registryInvokerWrapper);
    }
    return registryInvokerWrapper;
}

如上坷牛,doRefer 方法創(chuàng)建一個 RegistryDirectory 實例,然后生成服務者消費者鏈接很澄,并向注冊中心進行注冊京闰。注冊完畢后,緊接著訂閱 providers甩苛、configurators蹂楣、routers 等節(jié)點下的數(shù)據(jù)。完成訂閱后讯蒲,RegistryDirectory 會收到這幾個節(jié)點下的子節(jié)點信息痊土。由于一個服務可能部署在多臺服務器上,這樣就會在 providers 產(chǎn)生多個節(jié)點爱葵,這個時候就需要 Cluster 將多個服務節(jié)點合并為一個施戴,并生成一個 Invoker反浓。我們發(fā)現(xiàn)在返回 inovker 之前的 萌丈,會調(diào)用 RegistryDirectory 的 subscribe 方法,這個方法正是就是同個服務提供者生成多個 invoker 的地方雷则,調(diào)用棧如下辆雾,可以看見該例中調(diào)用的是 DubboProtocol 。

服務導入-1

我們稍微講解一下月劈,然后再分析 DubboProtocol 生成 invoker 的過程度迂。 首先藤乙,多個提供者生成 invoker 的步驟在 AbstractRegistry 的 notify 方法中(可以從前面的調(diào)用棧找到)

/**
 * Notify changes from the Provider side.
 */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    // 省略 check...
    
    
    // keep every provider's category.
    // 找到提供者的目錄(提供者信息)
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        //更新操作!2涯埂L沉骸!要是第一次腊凶,必然多個提供者創(chuàng)建多個invoker,從調(diào)用椈溃可以看出下面的 notity 方法最終也調(diào)用到了 DubboProtocol 生成 DubboInvoker 的過程。 
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

最后 AbstractProtocol 的 refer 被調(diào)用

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

調(diào)用鏈如下:

服務導入2

此后DubboProtocol 的 protocolBindingRefer 被調(diào)用 :

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

到了這里我們钧萍,邏輯不難懂褐缠,就是創(chuàng)建一個 DubboInvoker 然后返回。其中有個 getClients 的方法风瘦,這個方法用于獲取客戶端實例队魏,實例類型為 ExchangeClient。ExchangeClient 實際上并不具備通信能力万搔,它需要基于更底層的客戶端實例進行通信胡桨。比如 NettyClient、MinaClient 等瞬雹,默認情況下登失,Dubbo 使用 NettyClient 進行通信。接下來挖炬,我們簡單看一下 getClients 方法的邏輯揽浙。

private ExchangeClient[] getClients(URL url) {
    
    // 是否共享連接
    boolean useShareConnect = false;

    // 獲取連接數(shù),默認為0意敛,表示未配置
    int connections = url.getParameter(CONNECTIONS_KEY, 0);
    List<ReferenceCountExchangeClient> shareClients = null;
    // 如果未配置 connections馅巷,則共享連接
    if (connections == 0) {
        useShareConnect = true;

        /*
         * The xml configuration should have a higher priority than properties.
         */
        String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
        connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
        // 獲取共享客戶端
        shareClients = getSharedClient(url, connections);
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (useShareConnect) {
            clients[i] = shareClients.get(i);
        } else {
            // 初始化新的客戶端
            clients[i] = initClient(url);
        }
    }

    return clients;
}

這里根據(jù) connections 數(shù)量決定是獲取共享客戶端還是創(chuàng)建新的客戶端實例,默認情況下草姻,使用共享客戶端實例钓猬。getSharedClient 方法中也會調(diào)用 initClient 方法,因此下面我們一起看一下這兩個方法撩独。

private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
    String key = url.getAddress();
    // 獲取帶有“引用計數(shù)”功能的 ExchangeClient
    List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

    if (checkClientCanUse(clients)) {
        // 增加引用計數(shù)
        batchClientRefIncr(clients);
        return clients;
    }

    locks.putIfAbsent(key, new Object());
    synchronized (locks.get(key)) {
        clients = referenceClientMap.get(key);
        // dubbo check
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        // connectNum must be greater than or equal to 1
        connectNum = Math.max(connectNum, 1);

        // If the clients is empty, then the first initialization is
        if (CollectionUtils.isEmpty(clients)) {
            // 創(chuàng)建 ExchangeClient 客戶端
            clients = buildReferenceCountExchangeClientList(url, connectNum);
            referenceClientMap.put(key, clients);

        } else {
            for (int i = 0; i < clients.size(); i++) {
                ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                // If there is a client in the list that is no longer available, create a new one to replace him.
                if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                    clients.set(i, buildReferenceCountExchangeClient(url));
                    continue;
                }

                referenceCountExchangeClient.incrementAndGetCount();
            }
        }

        /*
         * I understand that the purpose of the remove operation here is to avoid the expired url key
         * always occupying this memory space.
         */
        locks.remove(key);

        return clients;
    }
}

上面方法先訪問緩存敞曹,若緩存未命中,則通過 initClient 方法創(chuàng)建新的 ExchangeClient 實例综膀,并將該實例傳給 ReferenceCountExchangeClient 構造方法創(chuàng)建一個帶有引用計數(shù)功能的 ExchangeClient 實例澳迫。ReferenceCountExchangeClient 內(nèi)部實現(xiàn)比較簡單,就不分析了剧劝。下面我們再來看一下 initClient 方法的代碼橄登。

    private ExchangeClient initClient(URL url) {

    // 獲取客戶端類型,默認為 netty
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));

    // 添加編解碼和心跳包參數(shù)到 url 中
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

    // 檢測客戶端類型是否存在,不存在則拋出異常
    // BIO is not allowed since it has severe performance issue.
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // connection should be lazy
        // 獲取 lazy 配置拢锹,并根據(jù)配置值決定創(chuàng)建的客戶端類型
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
            // 創(chuàng)建懶加載 ExchangeClient 實例
            client = new LazyConnectExchangeClient(url, requestHandler);

        } else {
            // 創(chuàng)建普通 ExchangeClient 實例
            client = Exchangers.connect(url, requestHandler);
        }

    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }

    return client;
}

initClient 方法首先獲取用戶配置的客戶端類型谣妻,默認為 netty。然后檢測用戶配置的客戶端類型是否存在卒稳,不存在則拋出異常蹋半。最后根據(jù) lazy 配置決定創(chuàng)建什么類型的客戶端。這里的 LazyConnectExchangeClient 代碼并不是很復雜充坑,該類會在 request 方法被調(diào)用時通過 Exchangers 的 connect 方法創(chuàng)建 ExchangeClient 客戶端湃窍,該類的代碼本節(jié)就不分析了。下面我們分析一下 Exchangers 的 connect 方法匪傍。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    // 獲取 Exchanger 實例您市,默認為 HeaderExchangeClient
    return getExchanger(url).connect(url, handler);
}

如上,getExchanger 會通過 SPI 加載 HeaderExchangeClient 實例役衡,這個方法比較簡單茵休,大家自己看一下吧。接下來分析 HeaderExchangeClient 的實現(xiàn)手蝎。

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";
    
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // 這里包含了多個調(diào)用榕莺,分別如下:
    // 1. 創(chuàng)建 HeaderExchangeHandler 對象
    // 2. 創(chuàng)建 DecodeHandler 對象
    // 3. 通過 Transporters 構建 Client 實例
    // 4. 創(chuàng)建 HeaderExchangeClient 對象
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
}

這里的調(diào)用比較多,我們這里重點看一下 Transporters 的 connect 方法棵介。如下:

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handler 數(shù)量大于1钉鸯,則創(chuàng)建一個 ChannelHandler 分發(fā)器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 獲取 Transporter 自適應拓展類,并調(diào)用 connect 方法生成 Client 實例
    return getTransporter().connect(url, handler);
}

如上邮辽,getTransporter 方法返回的是自適應拓展類唠雕,該類會在運行時根據(jù)客戶端類型加載指定的 Transporter 實現(xiàn)類。若用戶未配置客戶端類型吨述,則默認加載 NettyTransporter岩睁,并調(diào)用該類的 connect 方法。如下:

public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    // 創(chuàng)建 NettyClient 對象
    return new NettyClient(url, listener);
}

到這里就不繼續(xù)跟下去了揣云,在往下就是通過 Netty 提供的 API 構建 Netty 客戶端了捕儒,大家有興趣自己看看。到這里邓夕,關于 DubboProtocol 的 refer 方法就分析完了刘莹。

3.2 創(chuàng)建代理

Invoker 創(chuàng)建完畢后,接下來要做的事情是為服務接口生成代理對象焚刚。有了代理對象点弯,即可進行遠程調(diào)用。默認代理工廠是JavassistProxyFactory汪榔,接下來進行分析蒲拉。

public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    // 生成 Proxy 子類(Proxy 是抽象類)。并調(diào)用 Proxy 子類的 newInstance 方法創(chuàng)建 Proxy 實例
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

上面代碼并不多痴腌,首先是通過 Proxy 的 getProxy 方法獲取 Proxy 子類雌团,然后創(chuàng)建 InvokerInvocationHandler 對象,并將該對象傳給 newInstance 生成 Proxy 實例士聪。InvokerInvocationHandler 實現(xiàn) JDK 的 InvocationHandler 接口锦援,具體的用途是攔截接口類調(diào)用。該類邏輯比較簡單剥悟,這里就不分析了灵寺。下面我們重點關注一下 Proxy 的 getProxy 方法,如下区岗。

public static Proxy getProxy(Class<?>... ics) {
    return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    // 遍歷接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        // 檢測類型是否為接口
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");

        Class<?> tmp = null;
        try {
            // 重新加載接口類
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }

        // 檢測接口是否相同略板,這里 tmp 有可能為空
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

        // 拼接接口全限定名,分隔符為 ;
        sb.append(itf).append(';');
    }

    // 使用拼接后的接口名作為 key
    String key = sb.toString();

    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            ProxyCacheMap.put(cl, cache);
        }
    }

    Proxy proxy = null;
    synchronized (cache) {
        do {
            // 從緩存中獲取 Reference<Proxy> 實例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null) {
                    return proxy;
                }
            }

            // 并發(fā)控制慈缔,保證只有一個線程可以進行后續(xù)操作
            if (value == PendingGenerationMarker) {
                try {
                    // 其他線程在此處進行等待
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                // 放置標志位到緩存中叮称,并跳出 while 循環(huán)進行后續(xù)操作
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    ClassGenerator ccp = null, ccm = null;
    try {
        // 創(chuàng)建 ClassGenerator 對象
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();

        for (int i = 0; i < ics.length; i++) {
            // 檢測接口訪問級別是否為 protected 或 privete
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                // 獲取接口包名
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        // 非 public 級別的接口必須在同一個包下,否者拋出異常
                        throw new IllegalArgumentException("non-public interfaces from different packages");
                }
            }
            
            // 添加接口到 ClassGenerator 中
            ccp.addInterface(ics[i]);

            // 遍歷接口方法
            for (Method method : ics[i].getMethods()) {
                // 獲取方法描述藐鹤,可理解為方法簽名
                String desc = ReflectUtils.getDesc(method);
                // 如果方法描述字符串已在 worked 中瓤檐,則忽略∮榻冢考慮這種情況挠蛉,
                // A 接口和 B 接口中包含一個完全相同的方法
                if (worked.contains(desc))
                    continue;
                worked.add(desc);

                int ix = methods.size();
                // 獲取方法返回值類型
                Class<?> rt = method.getReturnType();
                // 獲取參數(shù)列表
                Class<?>[] pts = method.getParameterTypes();

                // 生成 Object[] args = new Object[1...N]
                StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                for (int j = 0; j < pts.length; j++)
                    // 生成 args[1...N] = ($w)$1...N;
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                // 生成 InvokerHandler 接口的 invoker 方法調(diào)用語句,如下:
                // Object ret = handler.invoke(this, methods[1...N], args);
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");

                // 返回值不為 void
                if (!Void.TYPE.equals(rt))
                    // 生成返回語句肄满,形如 return (java.lang.String) ret;
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");

                methods.add(method);
                // 添加方法名谴古、訪問控制符、參數(shù)列表稠歉、方法代碼等信息到 ClassGenerator 中 
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
            }
        }

        if (pkg == null)
            pkg = PACKAGE_NAME;

        // 構建接口代理類名稱:pkg + ".proxy" + id讥电,比如 org.apache.dubbo.proxy0
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        // 生成 private java.lang.reflect.InvocationHandler handler;
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");

        // 為接口代理類添加帶有 InvocationHandler 參數(shù)的構造方法,比如:
        // porxy0(java.lang.reflect.InvocationHandler arg0) {
        //     handler=$1;
        // }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
        // 為接口代理類添加默認構造方法
        ccp.addDefaultConstructor();
        
        // 生成接口代理類
        Class<?> clazz = ccp.toClass();
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        // 構建 Proxy 子類名稱轧抗,比如 Proxy1恩敌,Proxy2 等
        String fcn = Proxy.class.getName() + id;
        ccm = ClassGenerator.newInstance(cl);
        ccm.setClassName(fcn);
        ccm.addDefaultConstructor();
        ccm.setSuperClass(Proxy.class);
        // 為 Proxy 的抽象方法 newInstance 生成實現(xiàn)代碼,形如:
        // public Object newInstance(java.lang.reflect.InvocationHandler h) { 
        //     return new org.apache.dubbo.proxy0($1);
        // }
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
        // 生成 Proxy 實現(xiàn)類
        Class<?> pc = ccm.toClass();
        // 通過反射創(chuàng)建 Proxy 實例
        proxy = (Proxy) pc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        if (ccp != null)
            // 釋放資源
            ccp.release();
        if (ccm != null)
            ccm.release();
        synchronized (cache) {
            if (proxy == null)
                cache.remove(key);
            else
                // 寫緩存
                cache.put(key, new WeakReference<Proxy>(proxy));
            // 喚醒其他等待線程
            cache.notifyAll();
        }
    }
    return proxy;
}

上面代碼比較復雜横媚,我們寫了大量的注釋纠炮。大家在閱讀這段代碼時,要搞清楚 ccp 和 ccm 的用途灯蝴,不然會被搞暈恢口。ccp 用于為服務接口生成代理類,比如我們有一個 DemoService 接口穷躁,這個接口代理類就是由 ccp 生成的耕肩。ccm 則是用于為 org.apache.dubbo.common.bytecode.Proxy 抽象類生成子類,主要是實現(xiàn) Proxy 類的抽象方法。下面以 org.apache.dubbo.demo.DemoService 這個接口為例猿诸,來看一下該接口代理類代碼大致是怎樣的(忽略 EchoService 接口)婚被。

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = $1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = ($w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

好了,到這里代理類生成邏輯就分析完了梳虽。

4.總結

我們可以說服務引用就是Dubbo進行 :

  • 消費者注冊
  • 封裝 invoker 返回給消費者址芯,供消費者透明使用 的過程。

5.參考資料

本文參考于Dubbo官網(wǎng)窜觉,詳情以官網(wǎng)最新文檔為準谷炸。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市禀挫,隨后出現(xiàn)的幾起案子旬陡,更是在濱河造成了極大的恐慌,老刑警劉巖语婴,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件季惩,死亡現(xiàn)場離奇詭異,居然都是意外死亡腻格,警方通過查閱死者的電腦和手機画拾,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來菜职,“玉大人青抛,你說我怎么就攤上這事〕旰耍” “怎么了蜜另?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長嫡意。 經(jīng)常有香客問我举瑰,道長,這世上最難降的妖魔是什么蔬螟? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任此迅,我火速辦了婚禮,結果婚禮上旧巾,老公的妹妹穿的比我還像新娘耸序。我一直安慰自己,他們只是感情好鲁猩,可當我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布坎怪。 她就那樣靜靜地躺著,像睡著了一般廓握。 火紅的嫁衣襯著肌膚如雪搅窿。 梳的紋絲不亂的頭發(fā)上嘁酿,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天,我揣著相機與錄音男应,去河邊找鬼闹司。 笑死,一個胖子當著我的面吹牛殉了,可吹牛的內(nèi)容都是我干的开仰。 我是一名探鬼主播拟枚,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼薪铜,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了恩溅?” 一聲冷哼從身側響起隔箍,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎脚乡,沒想到半個月后蜒滩,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡奶稠,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年俯艰,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锌订。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡竹握,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出辆飘,到底是詐尸還是另有隱情啦辐,我是刑警寧澤,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布蜈项,位于F島的核電站芹关,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏紧卒。R本人自食惡果不足惜侥衬,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望跑芳。 院中可真熱鬧浇冰,春花似錦、人聲如沸聋亡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽坡倔。三九已至漂佩,卻和暖如春脖含,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背投蝉。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工养葵, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瘩缆。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓关拒,卻偏偏與公主長得像,于是被迫代替她去往敵國和親庸娱。 傳聞我的和親對象是個殘疾皇子着绊,可洞房花燭夜當晚...
    茶點故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內(nèi)容