dubbo源碼閱讀(三) -- 服務(wù)引用

簡(jiǎn)介

dubbo服務(wù)引用有兩種方式召川,餓漢式和懶漢式。餓漢式指在Spring容器調(diào)用ReferenceBean的afterPropertiesSet方法時(shí)引用服務(wù)娇掏,懶漢式指在ReferenceBean對(duì)應(yīng)的服務(wù)被注入到其他類(lèi)中時(shí)引用蛮浑。dubbo默認(rèn)懶漢式,我們按照dubbo的默認(rèn)配置進(jìn)行分析瞎访,看到服務(wù)引用的入口方法 -- ReferenceBean的getObject。

    public Object getObject() {
        return get();
    }

這個(gè)方法重寫(xiě)了FactoryBean的getObject吁恍,用于返回本工廠(chǎng)創(chuàng)建的對(duì)象實(shí)例扒秸。接下來(lái)看具體的源碼分析。

1.獲取對(duì)象實(shí)例

獲取對(duì)象實(shí)例獲得的是一個(gè)代理類(lèi)冀瓦,目的是避免框架的代碼對(duì)用戶(hù)代碼造成入侵伴奥,其方法調(diào)用關(guān)系如下。


獲取對(duì)象實(shí)例

ReferenceBean的getObject方法調(diào)用了其父類(lèi)ReferenceConfig的get方法翼闽。

    public synchronized T get() {
        checkAndUpdateSubConfigs();

        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }

注意這是一個(gè)synchronized方法渔伯,防止ref(類(lèi)型為泛型)被重復(fù)初始化。首先判斷ref是否為null肄程,若為空則調(diào)用初始化方法。下面看初始化方法init选浑。

    private void init() {
        if (initialized) {
            return;
        }
        initialized = true;
        checkStubAndLocal(interfaceClass);
        checkMock(interfaceClass);
        Map<String, String> map = new HashMap<String, String>();

        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        appendRuntimeParameters(map);
        if (!isGeneric()) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        map.put(Constants.INTERFACE_KEY, interfaceName);
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, consumer, Constants.DEFAULT_KEY);
        appendParameters(map, this);
        Map<String, Object> attributes = null;
        if (CollectionUtils.isNotEmpty(methods)) {
            attributes = new HashMap<String, Object>();
            for (MethodConfig methodConfig : methods) {
                appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        ref = createProxy(map);

        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), interfaceClass, ref, interfaceClass.getMethods(), attributes);
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

init方法的主要目的是初始化ref變量蓝厌。首先判斷是否被初始化,若已初始化則直接返回古徒,否則開(kāi)始初始化拓提。將ApplicationConfig、ModuleConfig等配置信息寫(xiě)入map隧膘,然后基于map創(chuàng)建代理類(lèi)代态,賦值給ref寺惫。關(guān)注給ref賦值這步操作,ref獲得的其實(shí)是Invoker的代理類(lèi)蹦疑。這里調(diào)用了createProxy方法創(chuàng)建代理類(lèi)西雀,下面分析這個(gè)方法。

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else {
                // by default, reference local service if there is
                isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
            }
        } else {
            isJvmRefer = isInjvm();
        }

        if (isJvmRefer) {
            URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                checkRegistry();
                List<URL> us = loadRegistries(false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use RegistryAwareCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
                    // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if (c && !invoker.isAvailable()) {
            // make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false;
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        MetadataReportService metadataReportService = null;
        if ((metadataReportService = getMetadataReportService()) != null) {
            URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
            metadataReportService.publishConsumer(consumerURL);
        }
        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }

注意這個(gè)方法的參數(shù)map歉摧,其包含了上文提到的ApplicationConfig等配置信息艇肴。createProxy方法包含兩步:

  • 1.獲得invoker實(shí)例;
  • 2.創(chuàng)建Invoker的代理類(lèi)叁温;

首先分析獲得invoker實(shí)例再悼。判斷服務(wù)引用的方式是否為本地引用:injvm值不為null,就根據(jù)injvm的值判斷膝但;否則冲九,有指定的url,說(shuō)明不是本地引用跟束。

  • case1:是本地引用莺奸,根據(jù)本地協(xié)議、本地主機(jī)等新建URL泳炉,根據(jù)接口和URL通過(guò)協(xié)議引用服務(wù)憾筏,獲得invoker;
  • case2:不是本地引用
    -- case2.1:有指定的url花鹅,此時(shí)可能是服務(wù)直連也可能是通過(guò)注冊(cè)中心連接氧腰。解析url并且遍歷,判斷每個(gè)url的協(xié)議配置是否是registry刨肃,然后加入urls古拴。
    -- case2.2:沒(méi)有指定的url,此時(shí)是通過(guò)注冊(cè)中心連接真友。加載RegistryConfig配置獲得注冊(cè)中心的url黄痪,遍歷url加入urls。
    根據(jù)urls的列表長(zhǎng)度分為兩種情況:
    -- case1:urls的列表長(zhǎng)度為1盔然,說(shuō)明只有一個(gè)服務(wù)提供者桅打,直接引用服務(wù)獲得invoker即可;
    -- case2:urls的列表長(zhǎng)度大于1愈案,說(shuō)明存在多個(gè)服務(wù)提供者挺尾,遍歷urls的列表,引用每個(gè)服務(wù)獲得invoker并加入invokers列表站绪。然后通過(guò)集群管理將invokers列表生成invoker遭铺;

然后分析創(chuàng)建Invoker的代理類(lèi),代碼中調(diào)用了proxyFactory.getProxy來(lái)創(chuàng)建服務(wù)代理。

2.創(chuàng)建Invoker

Invoker 是 Dubbo 的核心模型魂挂,代表一個(gè)可執(zhí)行體甫题。在服務(wù)提供方,Invoker 用于調(diào)用服務(wù)提供類(lèi)涂召。在服務(wù)消費(fèi)方坠非,Invoker 用于執(zhí)行遠(yuǎn)程調(diào)用。

createProxy代碼中通過(guò)調(diào)用refprotocol.refer創(chuàng)建Invoker芹扭,這邊分析兩種常見(jiàn)的Protocol接口的實(shí)現(xiàn)麻顶,DubboProtocol和RegistryProtocol。

2.1 DubboProtocol.refer

方法調(diào)用關(guān)系如下圖舱卡。


從DubboProtocol的refer開(kāi)始分析辅肾。

    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

該方法創(chuàng)建了一個(gè)DubboInvoker實(shí)例并返回,這里關(guān)注一下getClients方法轮锥。

    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean useShareConnect = false;

        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            useShareConnect = true;

            /**
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY,
                    Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);

            } else {
                clients[i] = initClient(url);
            }
        }

        return clients;
    }

第一步判斷是否共享連接矫钓,默認(rèn)不共享。根據(jù)url的connections參數(shù)值判斷舍杜,若connections為0新娜,此時(shí)共享連接。先從xml配置或者配置文件獲取connection既绩,xml配置優(yōu)先于配置文件概龄,然后獲取共享的client。

  • 共享連接饲握,返回的ExchangeClient數(shù)組的值從共享的client獲人蕉拧;
  • 不共享連接救欧,返回的ExchangeClient數(shù)組的值由初始化client得到衰粹;

接下來(lái)先看getSharedClient方法。

    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);

        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }

        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            clients = referenceClientMap.get(key);
            // dubbo check
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            connectNum = Math.max(connectNum, 1);

            // If the clients is empty, then the first initialization is
            if (CollectionUtils.isEmpty(clients)) {
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                referenceClientMap.put(key, clients);

            } else {
                for (int i = 0; i < clients.size(); i++) {
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // If there is a client in the list that is no longer available, create a new one to replace him.
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }

                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }

            /**
             * I understand that the purpose of the remove operation here is to avoid the expired url key
             * always occupying this memory space.
             */
            locks.remove(key);

            return clients;
        }
    }

根據(jù)url的address獲取clients笆怠。

  • case1:clients可用铝耻,增加clients服務(wù)引用的數(shù)量,直接返回clients即可蹬刷;
  • case2:clients不可用瓢捉,加鎖雙重檢查
    -- case2.1 clients為空,創(chuàng)建clients并存入緩存办成;
    -- case2.2 clients非空泊柬,遍歷clients,遇到空的client則新建诈火,非空的client則增加服務(wù)引用計(jì)數(shù);

接著看initClient。

    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);

            } else {
                client = Exchangers.connect(url, requestHandler);
            }

        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }

        return client;
    }

step1:獲取url的client參數(shù)冷守,即客戶(hù)端配置刀崖。
step2:url添加codec和heartbeat參數(shù)。
step3:客戶(hù)端配置對(duì)應(yīng)的Transporter是否存在拍摇,不存在則拋出異常亮钦。
step4:創(chuàng)建client實(shí)例。這邊有兩種情況:

  • lazy參數(shù)為true充活,創(chuàng)建懶加載ExchangeClient實(shí)例蜂莉;
  • lazy參數(shù)為false,創(chuàng)建普通ExchangeClient實(shí)例混卵;

看下創(chuàng)建普通ExchangeClient實(shí)例的情況映穗,是通過(guò)Exchangers.connect創(chuàng)建的。

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

這個(gè)方法獲取Exchanger實(shí)例幕随,然后調(diào)用其connect方法蚁滋。這邊分析Exchanger的默認(rèn)實(shí)現(xiàn)HeaderExchanger

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

該方法返回了HeaderExchangeClient實(shí)例,初始化client完成赘淮。然后再關(guān)注一下調(diào)用的Transporters.connect方法辕录。

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        ChannelHandler handler;
        if (handlers == null || handlers.length == 0) {
            handler = new ChannelHandlerAdapter();
        } else if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().connect(url, handler);
    }

判斷handlers的數(shù)量:

  • handlers數(shù)量為0,新建一個(gè)ChannelHandlerAdapter實(shí)例賦值給handler梢卸;
  • handlers數(shù)量為1走诞,將該handler賦值;
  • handlers數(shù)量大于1蛤高,創(chuàng)建handler分發(fā)器蚣旱;

最后獲取Transporter實(shí)例,調(diào)用其connect方法襟齿,這邊分析Transporter接口的NettyTransporter實(shí)現(xiàn)姻锁。

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

該方法的作用是創(chuàng)建并返回NettyClient實(shí)例。

這邊疑惑的是創(chuàng)建了HeaderExchangeClient實(shí)例猜欺,初始化客戶(hù)端的任務(wù)應(yīng)該就完成了位隶,為什么還要?jiǎng)?chuàng)建NettyClient實(shí)例。因?yàn)镠eaderExchangeClient不具備通信能力开皿,需要基于底層客戶(hù)端通信涧黄,譬如dubbo默認(rèn)的NettyClient。

2.2 RegistryProtocol.refer

RegistryProtocol的refer方法相對(duì)沒(méi)那么復(fù)雜赋荆,這邊簡(jiǎn)單分析一下笋妥。

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = url.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)).removeParameter(REGISTRY_KEY);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

step1:獲取url的registry參數(shù),將其值賦給protocol參數(shù)窄潭,移除registry參數(shù)春宣。
step2:獲取Registry實(shí)例(注冊(cè)中心)。
step3:獲取Invoker,有以下幾種情況:

  • case1:type為RegistryService月帝,從proxyFactory中獲取Invoker躏惋;
  • case2:通過(guò)調(diào)用doRefer方法獲取Invoker實(shí)例:
    -- case2.1 group="*",doRefer方法的第一個(gè)參數(shù)Cluster通過(guò)getMergeableCluster()獲得嚷辅;
    -- case2.2 group="a,b"簿姨,doRefer方法的第一個(gè)參數(shù)Cluster即為cluster;

下面分析doRefer方法簸搞。

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

step1:新建RegistryDirectory實(shí)例扁位,設(shè)置registry和protocol;
step2:從url獲取參數(shù)配置趁俊,新建服務(wù)消費(fèi)者url域仇;
step3:注冊(cè)服務(wù)消費(fèi)者,訂閱providers/configurators/routers等節(jié)點(diǎn)數(shù)據(jù)则酝;
step4:通過(guò)cluster將多個(gè)服務(wù)提供者合并成一個(gè)殉簸,生成Invoker;

3.創(chuàng)建代理

方法調(diào)用關(guān)系如下圖沽讹。


創(chuàng)建代理

創(chuàng)建代理的入口方法是ProxyFactory的getProxy(Invoker<T> invoker)般卑,這里以AbstractProxyFactory為例進(jìn)行分析。

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        return getProxy(invoker, false);
    }

調(diào)用了重載方法getProxy(Invoker<T> invoker, boolean generic)爽雄,接著來(lái)看重載方法蝠检。

    @Override
    public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
        Class<?>[] interfaces = null;
        String config = invoker.getUrl().getParameter(Constants.INTERFACES);
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
                interfaces[0] = invoker.getInterface();
                interfaces[1] = EchoService.class;
                for (int i = 0; i < types.length; i++) {
                    // TODO can we load successfully for a different classloader?.
                    interfaces[i + 2] = ReflectUtils.forName(types[i]);
                }
            }
        }
        if (interfaces == null) {
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }

        if (!GenericService.class.isAssignableFrom(invoker.getInterface()) && generic) {
            int len = interfaces.length;
            Class<?>[] temp = interfaces;
            interfaces = new Class<?>[len + 1];
            System.arraycopy(temp, 0, interfaces, 0, len);
            interfaces[len] = com.alibaba.dubbo.rpc.service.GenericService.class;
        }

        return getProxy(invoker, interfaces);
    }

這個(gè)方法的主要作用是生成接口數(shù)組Class<?>[]。首先獲取url的interfaces參數(shù)挚瘟,分為兩種情況:

  • case1:url的interfaces參數(shù)值不為空叹谁。接口數(shù)組的第一個(gè)元素是從invoker獲取的Interface,第二個(gè)元素是EchoService乘盖,其他元素是從interfaces參數(shù)值中解析得到的Class焰檩。
  • case2:url的interfaces參數(shù)值為空。接口數(shù)組包含兩個(gè)元素订框,第一個(gè)元素是從invoker獲取的Interface析苫,第二個(gè)元素是EchoService。

然后判斷是否是泛化調(diào)用穿扳,若是則增加接口數(shù)組的最后一個(gè)元素GenericService衩侥。最后調(diào)用抽象方法getProxy,這里分析子類(lèi)JavassistProxyFactory矛物。

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

這個(gè)方法先獲取Proxy的子類(lèi)茫死,然后調(diào)用其newInstance方法新建實(shí)例÷男撸看到Proxy.getProxy

    public static Proxy getProxy(Class<?>... ics) {
        return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
    }

這個(gè)方法調(diào)用了重載方法getProxy(ClassLoader cl, Class<?>... ics)

    public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        if (ics.length > Constants.MAX_PROXY_COUNT) {
            throw new IllegalArgumentException("interface limit exceeded");
        }

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ics.length; i++) {
            String itf = ics[i].getName();
            if (!ics[i].isInterface()) {
                throw new RuntimeException(itf + " is not a interface.");
            }

            Class<?> tmp = null;
            try {
                tmp = Class.forName(itf, false, cl);
            } catch (ClassNotFoundException e) {
            }

            if (tmp != ics[i]) {
                throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
            }

            sb.append(itf).append(';');
        }

        // use interface class name list as key.
        String key = sb.toString();

        // get cache by class loader.
        Map<String, Object> cache;
        synchronized (ProxyCacheMap) {
            cache = ProxyCacheMap.computeIfAbsent(cl, k -> new HashMap<>());
        }

        Proxy proxy = null;
        synchronized (cache) {
            do {
                Object value = cache.get(key);
                if (value instanceof Reference<?>) {
                    proxy = (Proxy) ((Reference<?>) value).get();
                    if (proxy != null) {
                        return proxy;
                    }
                }

                if (value == PendingGenerationMarker) {
                    try {
                        cache.wait();
                    } catch (InterruptedException e) {
                    }
                } else {
                    cache.put(key, PendingGenerationMarker);
                    break;
                }
            }
            while (true);
        }

        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try {
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<>();
            List<Method> methods = new ArrayList<>();

            for (int i = 0; i < ics.length; i++) {
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg)) {
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                        }
                    }
                }
                ccp.addInterface(ics[i]);

                for (Method method : ics[i].getMethods()) {
                    String desc = ReflectUtils.getDesc(method);
                    if (worked.contains(desc)) {
                        continue;
                    }
                    worked.add(desc);

                    int ix = methods.size();
                    Class<?> rt = method.getReturnType();
                    Class<?>[] pts = method.getParameterTypes();

                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++) {
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    }
                    code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                    if (!Void.TYPE.equals(rt)) {
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");
                    }

                    methods.add(method);
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            if (pkg == null) {
                pkg = PACKAGE_NAME;
            }

            // create ProxyInstance class.
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // release ClassGenerator
            if (ccp != null) {
                ccp.release();
            }
            if (ccm != null) {
                ccm.release();
            }
            synchronized (cache) {
                if (proxy == null) {
                    cache.remove(key);
                } else {
                    cache.put(key, new WeakReference<Proxy>(proxy));
                }
                cache.notifyAll();
            }
        }
        return proxy;
    }

step1:遍歷Class數(shù)組峦萎,獲取Class的類(lèi)名并通過(guò)類(lèi)加載器加載該類(lèi)屡久,驗(yàn)證加載的Class和Class數(shù)組中的是否是同一個(gè)Class,然后將類(lèi)名拼接成字符串骨杂。
step2:將上一步拼接的字符串作為key涂身,根據(jù)類(lèi)加載器從ProxyCacheMap中獲取cache,根據(jù)key從cache中獲取value
-- case1:value屬于Reference類(lèi)搓蚪,獲取proxy并返回;
-- case2:value等于PendingGenerationMarker丁鹉,等待(并發(fā)控制)妒潭;
-- case3:value為空,將PendingGenerationMarker放入cache揣钦;
step3:動(dòng)態(tài)生成服務(wù)接口代理類(lèi)和Proxy的子類(lèi)雳灾。ccp用于生成服務(wù)接口代理類(lèi)活逆,ccm用于生成Proxy的子類(lèi)钧嘶。

總結(jié)

至此,服務(wù)引用分析完成蹬铺。值得注意的是獲得的服務(wù)引用是Invoker的代理類(lèi)宇姚,因此本文的重點(diǎn)放在了Invoker的創(chuàng)建和代理的創(chuàng)建匈庭。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市浑劳,隨后出現(xiàn)的幾起案子阱持,更是在濱河造成了極大的恐慌,老刑警劉巖魔熏,帶你破解...
    沈念sama閱讀 211,194評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件衷咽,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蒜绽,警方通過(guò)查閱死者的電腦和手機(jī)镶骗,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,058評(píng)論 2 385
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)躲雅,“玉大人鼎姊,你說(shuō)我怎么就攤上這事±艉唬” “怎么了此蜈?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,780評(píng)論 0 346
  • 文/不壞的土叔 我叫張陵,是天一觀(guān)的道長(zhǎng)噪生。 經(jīng)常有香客問(wèn)我裆赵,道長(zhǎng),這世上最難降的妖魔是什么跺嗽? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,388評(píng)論 1 283
  • 正文 為了忘掉前任战授,我火速辦了婚禮页藻,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘植兰。我一直安慰自己份帐,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,430評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布楣导。 她就那樣靜靜地躺著废境,像睡著了一般。 火紅的嫁衣襯著肌膚如雪筒繁。 梳的紋絲不亂的頭發(fā)上噩凹,一...
    開(kāi)封第一講書(shū)人閱讀 49,764評(píng)論 1 290
  • 那天,我揣著相機(jī)與錄音毡咏,去河邊找鬼驮宴。 笑死,一個(gè)胖子當(dāng)著我的面吹牛呕缭,可吹牛的內(nèi)容都是我干的堵泽。 我是一名探鬼主播,決...
    沈念sama閱讀 38,907評(píng)論 3 406
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼恢总,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼迎罗!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起离熏,我...
    開(kāi)封第一講書(shū)人閱讀 37,679評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤佳谦,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后滋戳,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體钻蔑,經(jīng)...
    沈念sama閱讀 44,122評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,459評(píng)論 2 325
  • 正文 我和宋清朗相戀三年奸鸯,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了咪笑。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,605評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡娄涩,死狀恐怖窗怒,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蓄拣,我是刑警寧澤扬虚,帶...
    沈念sama閱讀 34,270評(píng)論 4 329
  • 正文 年R本政府宣布,位于F島的核電站球恤,受9級(jí)特大地震影響辜昵,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜咽斧,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,867評(píng)論 3 312
  • 文/蒙蒙 一堪置、第九天 我趴在偏房一處隱蔽的房頂上張望躬存。 院中可真熱鬧,春花似錦舀锨、人聲如沸岭洲。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,734評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)盾剩。三九已至,卻和暖如春替蔬,著一層夾襖步出監(jiān)牢的瞬間彪腔,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,961評(píng)論 1 265
  • 我被黑心中介騙來(lái)泰國(guó)打工进栽, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人恭垦。 一個(gè)月前我還...
    沈念sama閱讀 46,297評(píng)論 2 360
  • 正文 我出身青樓快毛,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親番挺。 傳聞我的和親對(duì)象是個(gè)殘疾皇子唠帝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,472評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容