Dubbo-服務(wù)暴露 (一)

講過(guò)了集群容錯(cuò)的原理分析春寿,介些來(lái)我們看看這些服務(wù)是如何暴露出來(lái)供消費(fèi)者使用的喊熟。
這一節(jié)我們主要看看是什么時(shí)候開(kāi)始了服務(wù)暴露污淋,在這個(gè)過(guò)程中發(fā)生了什么嫉晶。

我們從ServiceBean講起,先看一下繼承結(jié)構(gòu):

image.png

可見(jiàn)它實(shí)現(xiàn)了許多的spring的接口敛滋,其中有一個(gè)ApplicationListener许布,熟悉spring的話可以知道這是一個(gè)事件監(jiān)聽(tīng)方法,當(dāng)spring發(fā)出事件的時(shí)候執(zhí)行這個(gè)方法绎晃。在ServiceBean中蜜唾,監(jiān)聽(tīng)的是ContextRefreshedEvent事件。

    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

可以看到這里有個(gè)export方法庶艾,要執(zhí)行此方法需要三個(gè)條件:

  • isDelay(): 這個(gè)要注意是立即導(dǎo)出的意思袁余,不是需要延遲,通過(guò)實(shí)現(xiàn)過(guò)程就可以發(fā)現(xiàn)咱揍。
    private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        } // 獲取到provider的 delay配置
        return supportedApplicationListener && (delay == null || delay == -1);
    }

(delay == null || delay == -1)就知道只有delay無(wú)效的時(shí)候才會(huì)返回true颖榜,所以這個(gè)方法好像寫(xiě)反了。

  • !isExported(): 返回ServiceConfigexported煤裙,通過(guò)查詢賦值過(guò)程可以發(fā)現(xiàn)只有在doExport中賦值true掩完。也就是說(shuō)在服務(wù)暴露后的一個(gè)標(biāo)記。
  • !isUnexported():返回ServiceConfigunexported硼砰,同上可知是在服務(wù)銷(xiāo)毀時(shí)候賦值的標(biāo)記且蓬。
    綜上可見(jiàn)要執(zhí)行export方法的過(guò)程是 是否有延遲導(dǎo)出 && 是否已導(dǎo)出 && 是不是已被取消導(dǎo)出 來(lái)決定的。
    繼續(xù)往下走進(jìn)入export
    public synchronized void export() {
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

處理了是否需要延遲暴露题翰,需要的話放入線程池中稍后執(zhí)行doExport缅疟;否則立即執(zhí)行。
doExport

 protected synchronized void doExport() {
        // 執(zhí)行過(guò)取消暴露的話報(bào)錯(cuò)
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        // 已經(jīng)暴露過(guò)就不繼續(xù)了
        if (exported) {
            return;
        }
        exported = true;
        // 檢查服務(wù)名稱合法性
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        // 檢測(cè)provider是否為空遍愿,空的話新建并且根據(jù)系統(tǒng)變量進(jìn)行初始化。
        checkDefault();
        // 從其他配置中獲取配置耘斩,設(shè)置核心配置類(lèi)對(duì)象
        if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
        if (module != null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        // 設(shè)置泛化服務(wù)類(lèi)型
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
            // 獲取對(duì)應(yīng)的服務(wù)類(lèi)信息
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // 對(duì) interfaceClass沼填,以及 <dubbo:method> 標(biāo)簽中的必要字段進(jìn)行檢查
            checkInterfaceAndMethods(interfaceClass, methods);
            // 對(duì) ref 合法性進(jìn)行檢測(cè)
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        if (stub != null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub";
            }
            Class<?> stubClass;
            try {
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if (!interfaceClass.isAssignableFrom(stubClass)) {
                throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
            }
        }
        // 檢查各模塊配置
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
         // 導(dǎo)出服務(wù)
        doExportUrls();
        
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }

總結(jié)一下在doExport中做的事情:

  • 檢測(cè)當(dāng)前服務(wù)狀態(tài),在未暴露括授,未取消暴露的情況下執(zhí)行坞笙。
  • 檢測(cè)服務(wù)名稱的合法性
  • 建立Provider,初始化
  • 從當(dāng)前配置中獲取核心配置并賦值荚虚。
  • 檢查是否是泛化調(diào)用薛夜,非泛華調(diào)用的進(jìn)行標(biāo)簽配置與服務(wù)類(lèi)型檢查。
  • 本地存根的配置與檢查版述。
  • 對(duì)配置類(lèi)進(jìn)行最后的檢查梯澜。
  • 執(zhí)行暴露
    先不管最后兩行代碼,留個(gè)問(wèn)題渴析,先來(lái)看看doExportUrls中做了什么事情晚伙。
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

先獲取了注冊(cè)中心的url集合吮龄,再遍歷protocols進(jìn)行暴露服務(wù)。
loadRegistries:

   protected List<URL> loadRegistries(boolean provider) {
        checkRegistry();
        List<URL> registryList = new ArrayList<URL>();
        if (registries != null && registries.size() > 0) {
            for (RegistryConfig config : registries) {
                String address = config.getAddress();
                if (address == null || address.length() == 0) {
                    address = Constants.ANYHOST_VALUE;
                }
                String sysaddress = System.getProperty("dubbo.registry.address");
                if (sysaddress != null && sysaddress.length() > 0) {
                    address = sysaddress;
                }
                if (address != null && address.length() > 0
                        && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                    Map<String, String> map = new HashMap<String, String>();
                    appendParameters(map, application);
                    appendParameters(map, config);
                    map.put("path", RegistryService.class.getName());
                    map.put("dubbo", Version.getVersion());
                    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
                    if (ConfigUtils.getPid() > 0) {
                        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
                    }
                    if (!map.containsKey("protocol")) {
                        if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
                            map.put("protocol", "remote");
                        } else {
                            map.put("protocol", "dubbo");
                        }
                    }
                    List<URL> urls = UrlUtils.parseURLs(address, map);
                    for (URL url : urls) {
                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        return registryList;

1.先從配置咆疗,環(huán)境變量獲取注冊(cè)中心的有效地址漓帚。
2.獲取到地址后,獲取應(yīng)用與注冊(cè)中心的配置信息午磁,再設(shè)置

  • path 注冊(cè)中心服務(wù)路徑尝抖,
  • dubbo dubbo版本,
  • timestamp啟動(dòng)時(shí)間戳迅皇,
  • pid應(yīng)用進(jìn)程id昧辽,
  • protocol協(xié)議信息 remote/dubbo
    3.通過(guò)UrlUtils.parseURLs組裝成Url,這里的address可以是多個(gè)喧半,對(duì)應(yīng)多個(gè)url奴迅,用|;分割。
    4.設(shè)置注冊(cè)中心的參數(shù)挺据,協(xié)議等信息取具,檢查有效性并加入結(jié)果中。

現(xiàn)在拿到了所有的注冊(cè)中心與協(xié)議信息扁耐,可以對(duì)協(xié)議與注冊(cè)中心的信息進(jìn)行組裝了暇检。
doExportUrlsFor1Protocol:

   private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
            //                ......
            } // end of methods for
        }

        if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }
        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // export service
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }

        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

這里主要是進(jìn)行一些配置參數(shù)的組裝,對(duì)泛化與非泛化服務(wù)調(diào)用的參數(shù)處理等婉称,最后組裝成com.alibaba.dubbo.common.URL來(lái)持有服務(wù)的信息块仆。太過(guò)于細(xì)節(jié)的地方現(xiàn)在先不看,上面省略了methods配置的處理王暗,下面簡(jiǎn)單來(lái)看一下做了什么事悔据。

if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (arguments != null && !arguments.isEmpty()) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods != null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

這里要對(duì)應(yīng)文檔看一下功能,處理的是這個(gè)標(biāo)簽:

<dubbo:reference interface="com.xxx.XxxService">
    <dubbo:method name="findXxx" timeout="3000" retries="2">
        <dubbo:argument index="0" callback="true" />
    </dubbo:method>
</dubbo:reference>

對(duì)每一個(gè)method標(biāo)簽處理俗壹,對(duì)每一個(gè)參數(shù)進(jìn)行設(shè)置科汗,對(duì)retryretries進(jìn)行處理,統(tǒng)一用retries表示绷雏,不重試的retries = 0头滔。
然后是一個(gè)很深的嵌套循環(huán),其實(shí)不難看懂涎显,兩層for循環(huán)for (ArgumentConfig argument : arguments) {for (int i = 0; i < methods.length; i++) {主要是為了把標(biāo)簽的method與服務(wù)類(lèi)反射獲得的方法進(jìn)行匹配坤检,來(lái)執(zhí)行對(duì)callback的處理。

if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
    appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
    throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}

匹配到了方法中的參數(shù)期吓,并且類(lèi)型相同的化早歇,就把callback的配置放入map中。
如果沒(méi)有設(shè)置參數(shù)的index缺前,那就把所有的參數(shù)都配置上callback的配置蛀醉。

繼續(xù)看

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

加載對(duì)應(yīng)協(xié)議的ConfiguratorFactory拯刁,設(shè)置參數(shù)

下面終于到了暴露服務(wù)的核心了

 String scope = url.getParameter(Constants.SCOPE_KEY);
        // don't export when none is configured
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            // export to local if the config is not remote (export to remote only when config is remote)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { // @1
                exportLocal(url);
            }
            // export to remote if the config is not local (export to local only when config is local)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // @2
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);

首先獲取到scope參數(shù),如果是none就什么也不做逝段,接下來(lái)看到兩個(gè)if代碼塊
@1本地服務(wù)暴露垛玻,@2遠(yuǎn)程服務(wù)暴露
這個(gè)配置的作用就是指定暴露的位置,默認(rèn)暴露本地與遠(yuǎn)程奶躯,不是遠(yuǎn)程就是本地帚桩,不是本地就是遠(yuǎn)程。
@1中的exportLocal單獨(dú)用一節(jié)來(lái)講嘹黔。
@2中有注冊(cè)中心時(shí)账嚎,為每一個(gè)注冊(cè)中心加載監(jiān)聽(tīng)參數(shù),最后生成invoker并導(dǎo)出儡蔓。沒(méi)有注冊(cè)中心時(shí)直接導(dǎo)出郭蕉。
在本地暴露或是遠(yuǎn)程暴露時(shí),都能看到時(shí)先用proxyFactory創(chuàng)建一個(gè)invoker喂江,再使用protocol.export進(jìn)行導(dǎo)出召锈,那我們先看一看invoker時(shí)怎么創(chuàng)建的。

Invoker 是實(shí)體域获询,它是 Dubbo 的核心模型涨岁,其它模型都向它靠擾,或轉(zhuǎn)換成它吉嚣,它代表一個(gè)可執(zhí)行體梢薪,可向它發(fā)起 invoke 調(diào)用,它有可能是一個(gè)本地的實(shí)現(xiàn)尝哆,也可能是一個(gè)遠(yuǎn)程的實(shí)現(xiàn)沮尿,也可能一個(gè)集群實(shí)現(xiàn)。

ProxyFactory的默認(rèn)實(shí)現(xiàn)為JavassistProxyFactory较解,簡(jiǎn)單地看一下里面是什么樣的:

/**
 * JavaassistRpcProxyFactory
 */
public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

getInvoker方法中,先用Wrapper為對(duì)應(yīng)的目標(biāo)類(lèi)創(chuàng)建一個(gè)wrapper赴邻,再返回一個(gè)匿名的Invoker印衔,重寫(xiě)了doInvoke方法,將調(diào)用交給了wrapper.invokeMethod處理姥敛。
這里先簡(jiǎn)單介紹下Wrapper.getWrapper方法奸焙,就是先從緩存中獲取目標(biāo)類(lèi)的wrapper對(duì)象,沒(méi)有的話,通過(guò)字符拼接的方式組裝方法与帆,通過(guò)Dubbo的ClassGenerator來(lái)生成一個(gè)目標(biāo)類(lèi)的Wrapper對(duì)象了赌。細(xì)節(jié)方面在后面放一節(jié)來(lái)講,需要一下javassist知識(shí)玄糟。

Invoker的創(chuàng)建過(guò)程大概知道了勿她,下面看protocol.export方法,通過(guò)SPI可以發(fā)現(xiàn)對(duì)應(yīng)多種實(shí)現(xiàn)阵翎,在調(diào)試過(guò)程中逢并,不能獲取到protocol的具體引用,是一個(gè)這個(gè)東西:

image.png

咱也不知道這是干啥的郭卫,這個(gè)需要了解dubbo spi的機(jī)制砍聊。關(guān)于spi的知識(shí)會(huì)在后面介紹,這里需要知道在調(diào)用InjvmProtocol或者其他的Protocol的時(shí)候贰军,需要先經(jīng)過(guò)QosProtocolWrapper玻蝌,ProtocolFilterWrapperProtocolListenerWrapperexport方法词疼。
先看一下本地暴露exportLocal:

    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

首先是判斷url中的protocol參數(shù)是否設(shè)置了injvm俯树,設(shè)置過(guò)的話表示已經(jīng)暴露過(guò),跳過(guò)寒跳。未設(shè)置的話進(jìn)入暴露流程聘萨。
為本地暴露組裝local的URL,加上protocol = injvm, host = 127.0.0.1, port = 0參數(shù)童太。
根據(jù)ref創(chuàng)建一個(gè)本地暴露用的invoker米辐,通過(guò)protocol.export暴露,并加入到緩存中书释。
暴露的過(guò)程在InjvmProtocol中:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

InjvmProtocol維護(hù)了一個(gè)InjvmExporter的緩存翘贮,暴露工作實(shí)際是就是創(chuàng)建一個(gè)InjvmExporter對(duì)象。本地暴露就是這么簡(jiǎn)單爆惧,繼續(xù)分析遠(yuǎn)程暴露狸页。

遠(yuǎn)程注冊(cè)中心暴露是通過(guò)RegistryProtocol執(zhí)行的,比本地復(fù)雜的多:

    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker @1
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker); // @2

        //registry provider
        final Registry registry = getRegistry(originInvoker); // @3
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        //to judge to delay publish whether or not
        boolean register = registedProviderUrl.getParameter("register", true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); // @4

        if (register) { // @5
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @6
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); //@7
    }

@1通過(guò)doLocalExport對(duì)服務(wù)進(jìn)行導(dǎo)出扯再。
@2獲取注冊(cè)中心的URL芍耘。
@3獲取注冊(cè)中心,并拿到provider的URL熄阻,在ProviderConsumerRegTable注冊(cè)表中進(jìn)行provider的注冊(cè)斋竞。
@5向注冊(cè)中心注冊(cè)服務(wù),并設(shè)置provider的注冊(cè)標(biāo)記秃殉。
@6創(chuàng)建override的監(jiān)聽(tīng)坝初,目前先不看是做什么的浸剩。
@7返回一個(gè)DestroyableExporter

除了@7其他的都比較復(fù)雜,我們來(lái)一個(gè)一個(gè)分析下去鳄袍。
先來(lái)看一下第一步的doLocalExport做了什么

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }

使用雙重檢查鎖來(lái)獲得緩存中的exporter绢要。如果不存在,通過(guò)protocol.export創(chuàng)建拗小。之前講過(guò)在這個(gè)方法前會(huì)有三個(gè)裝飾器wrapper先執(zhí)行重罪,假設(shè)我們使用的是Dubbo的Protocol,那么來(lái)看一下DubboProtocol#export方法:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

開(kāi)頭是創(chuàng)建DubboExporter的過(guò)程十籍,中間是關(guān)于本地存根相關(guān)的處理蛆封,這里不做分析」蠢酰看一下最后兩個(gè)方法openServeroptimizeSerialization是分析的重點(diǎn)惨篱。
openServer:

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

key是url中的進(jìn)程地址與端口號(hào),每一個(gè)key維護(hù)一個(gè)ExchangeServer围俘,沒(méi)有時(shí)進(jìn)行創(chuàng)建createServer砸讳,有的話進(jìn)行重置server.reset(url)
下面分兩個(gè)分支界牡,先講一下創(chuàng)建createServer的過(guò)程:

    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @1
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @2

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler); // @3
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY); // @4
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

在前二行@1中設(shè)置了兩個(gè)參數(shù):

  • channel.readonly.sent表示server關(guān)閉時(shí)是否發(fā)送只讀事件
  • heartbeat 心跳時(shí)間默認(rèn)1分鐘簿寂。
    @2中獲取到server的類(lèi)型,并檢查是不是所支持的netty宿亡,mina常遂,grizzly
    @3設(shè)置編碼解碼器參數(shù)并創(chuàng)建server挽荠。
    @4檢查client參數(shù)是不是支持的克胳。
    主要步驟是檢測(cè)server參數(shù),創(chuàng)建server圈匆,檢測(cè)client參數(shù)漠另。
    繼續(xù)來(lái)看Exchangers.bind
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

看看getExchanger做的事情:


    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

獲取到了HeaderExchanger實(shí)例,看看bind方法:

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

包含了4個(gè)邏輯跃赚,3個(gè)是構(gòu)造方法笆搓,一個(gè)是Transporters.bind,我們主要關(guān)心下bind方法:

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

用默認(rèn)的NettyTransporter纬傲,bind方法

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

直接創(chuàng)建了一個(gè)NettyServer满败,繼續(xù)看它的構(gòu)造函數(shù):


    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

super:

   public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

首先獲取到ip,port等信息賦默認(rèn)值叹括,然后執(zhí)行doOpen方法葫录。doOpen是一個(gè)模板方法,由子類(lèi)實(shí)現(xiàn)领猾。繼續(xù)看看NettyServerdoOpen

    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

創(chuàng)建boss與worker的線程池米同,創(chuàng)建Netty的ServerBootstrap,設(shè)置 PipelineFactory摔竿,綁定ip與端口面粮。
以上就是一個(gè)server的創(chuàng)建過(guò)程,后面的都是netty相關(guān)的源碼了继低。
總結(jié)一下這個(gè)server的創(chuàng)建過(guò)程熬苍,dubbo將netty的創(chuàng)建給封裝在NettyServer中,通過(guò)spi獲取到相關(guān)的Transpoter袁翁,獲取到創(chuàng)建server的相關(guān)參數(shù)來(lái)創(chuàng)建server柴底。

第二個(gè)分支server.reset,具體的調(diào)用路徑不貼了粱胜,主要看下AbstractServerreset方法:

   @Override
    public void reset(URL url) {
        if (url == null) {
            return;
        }
        try {
            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
                if (a > 0) {
                    this.accepts = a;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.idleTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(Constants.THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads > 0 && (threads != max || threads != core)) {
                    if (threads < core) {
                        threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {
                        threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        super.setUrl(getUrl().addParameters(url.getParameters()));
    }

設(shè)置一些參數(shù) 最大可連接的客戶端數(shù)量accepts柄驻,空閑超時(shí)時(shí)間idleTimeout,線程數(shù)threads焙压。
在設(shè)置threads的時(shí)候鸿脓,會(huì)把threads的值與當(dāng)前線程池比較。

  • 如果threads比核心線程數(shù)小涯曲,那么減小核心線程數(shù)野哭。
  • 如果threads比核心線程數(shù)大,那么增大最大線程數(shù)幻件。
    這里這么設(shè)置我想是為了不影響原來(lái)正在執(zhí)行的線程吧拨黔,同時(shí)節(jié)省資源,留個(gè)疑問(wèn)绰沥?篱蝇??
    最后一行是對(duì)url的一個(gè)重置揪利,把當(dāng)前的持有的url與暴露出來(lái)的url參數(shù)進(jìn)行合并态兴,放入AbstractPeer里面的url。
    protected void setUrl(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        this.url = url;
    }

AbstractPeer.url是由volatile修飾的疟位,通過(guò)調(diào)試發(fā)現(xiàn)瞻润,這個(gè)url的基本信息path,host等都是不變的,變的只有parameters甜刻∩茏玻可見(jiàn)這個(gè)url在這里的作用與具體的服務(wù)無(wú)關(guān),只是刷新參數(shù)信息得院,可能是與server相關(guān)的一些參數(shù)調(diào)整傻铣。當(dāng)然參數(shù)調(diào)整也不是任何都會(huì)調(diào)整,回過(guò)來(lái)看AbstractServer#reset剛才講了三個(gè)參數(shù)的賦值祥绞,都是有條件的acceptsidleTimeout只能是單調(diào)遞增的調(diào)整非洲。這就是要把server的這些參數(shù)調(diào)整為暴露服務(wù)中的最大值以保證配置生效鸭限。

講完了createServerserver.reset了,整個(gè)openServer的過(guò)程大概清楚了两踏。

然后調(diào)用optimizeSerialization(URL url)方法,將指定的序列化類(lèi)加載到緩存,通常配置kryo, fst序列化,不配置,默認(rèn)用hessian2序列化败京。
到此為止服務(wù)的遠(yuǎn)程暴露過(guò)程就結(jié)束了∶稳荆回到RegistryProtocol赡麦,知道了doLoaclExport后來(lái)看看是如何注冊(cè)的:

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

        if (register) {
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

首先看一下注冊(cè)表的結(jié)構(gòu):

    public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
    public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();

    public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
        ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
        String serviceUniqueName = providerUrl.getServiceKey();
        Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
        if (invokers == null) {
            providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
            invokers = providerInvokers.get(serviceUniqueName);
        }
        invokers.add(wrapperInvoker);
    }

通過(guò)ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> 分別維護(hù)了provider與consumer的服務(wù)映射,key為服務(wù)的group/serviceName:version組裝帕识,映射了不同的invoker泛粹。invoker由ProviderInvokerWrapper裝飾,封裝了invoker肮疗,注冊(cè)中心url晶姊,invoker的url,provider的url族吻。同時(shí)由isReg來(lái)標(biāo)記是否注冊(cè)成功帽借。
接下來(lái)看注冊(cè)的過(guò)程,主要分析下register方法超歌。

    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

看看AbstractRegistryFactory#getRegistry

    @Override
    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末砍艾,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子巍举,更是在濱河造成了極大的恐慌脆荷,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件懊悯,死亡現(xiàn)場(chǎng)離奇詭異蜓谋,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)炭分,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門(mén)桃焕,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人捧毛,你說(shuō)我怎么就攤上這事观堂。” “怎么了呀忧?”我有些...
    開(kāi)封第一講書(shū)人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵师痕,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我而账,道長(zhǎng)胰坟,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任泞辐,我火速辦了婚禮笔横,結(jié)果婚禮上竞滓,老公的妹妹穿的比我還像新娘。我一直安慰自己吹缔,他們只是感情好虽界,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著涛菠,像睡著了一般。 火紅的嫁衣襯著肌膚如雪撇吞。 梳的紋絲不亂的頭發(fā)上俗冻,一...
    開(kāi)封第一講書(shū)人閱讀 52,158評(píng)論 1 308
  • 那天,我揣著相機(jī)與錄音牍颈,去河邊找鬼迄薄。 笑死,一個(gè)胖子當(dāng)著我的面吹牛煮岁,可吹牛的內(nèi)容都是我干的讥蔽。 我是一名探鬼主播,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼画机,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼冶伞!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起步氏,我...
    開(kāi)封第一講書(shū)人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤响禽,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后荚醒,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體芋类,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年界阁,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了侯繁。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡泡躯,死狀恐怖贮竟,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情精续,我是刑警寧澤坝锰,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站重付,受9級(jí)特大地震影響顷级,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜确垫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一弓颈、第九天 我趴在偏房一處隱蔽的房頂上張望帽芽。 院中可真熱鬧,春花似錦翔冀、人聲如沸导街。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)搬瑰。三九已至,卻和暖如春控硼,著一層夾襖步出監(jiān)牢的瞬間泽论,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工卡乾, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留翼悴,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓幔妨,卻偏偏與公主長(zhǎng)得像鹦赎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子误堡,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容

  • 先看官網(wǎng)兩張圖【引用來(lái)自官網(wǎng)】:image.png 官網(wǎng)說(shuō)明: 1.首先 ServiceConfig 類(lèi)拿到對(duì)外提...
    致慮閱讀 1,439評(píng)論 1 4
  • dubbo暴露服務(wù)有兩種情況古话,一種是設(shè)置了延遲暴露(比如delay="5000"),另外一種是沒(méi)有設(shè)置延遲暴露或者...
    加大裝益達(dá)閱讀 21,279評(píng)論 5 36
  • 先看官網(wǎng)兩張圖【引用來(lái)自官網(wǎng)】:image.png 官網(wǎng)說(shuō)明: 1.首先 ReferenceConfig 類(lèi)的 i...
    致慮閱讀 1,028評(píng)論 0 2
  • 時(shí)序圖 在講解源碼前埂伦,先看下官方文檔提供的時(shí)序圖煞额,后面的講解基本是這個(gè)路線,但是會(huì)更細(xì)節(jié)化 大致邏輯 首先服務(wù)的實(shí)...
    土豆肉絲蓋澆飯閱讀 2,893評(píng)論 2 3
  • 本文將詳細(xì)分析Dubbo的服務(wù)發(fā)布流程沾谜,建議結(jié)合文章Dubbo SPI 機(jī)制解析一起閱讀殖熟。 在開(kāi)始分析之前扣癣,有必須...
    匠丶閱讀 4,091評(píng)論 0 6