Dubbo源碼分析----暴露服務(wù)

暴露服務(wù)的過(guò)程中齿诞,會(huì)涉及到兩個(gè)Protocol

  1. DubboProtocol主要是做網(wǎng)絡(luò)通信相關(guān)初始化
  2. RegistryProtocol主要是做zk的注冊(cè)和訂閱相關(guān)

在提供一個(gè)服務(wù)的時(shí)候搅吁,需要在配置文件里聲明如下xml

<dubbo:service....

????然后Spring會(huì)根據(jù)對(duì)應(yīng)關(guān)系執(zhí)行對(duì)應(yīng)的BeanDefinitionParser尊蚁,然后實(shí)例化對(duì)應(yīng)的類(lèi)职烧,提供一個(gè)服務(wù)的時(shí)候會(huì)實(shí)例化ServiceBean(具體對(duì)應(yīng)關(guān)系看DubboNamespaceHandler類(lèi);spring解析自定義標(biāo)簽可以看下spring源碼關(guān)于標(biāo)簽的處理慰于,這里就不說(shuō)了)

????ServiceBean實(shí)現(xiàn)了InitializingBean和ApplicationContextAware接口,所以會(huì)執(zhí)行afterPropertiesSet和onApplicationEvent方法,這里就是入口单起,然后就會(huì)執(zhí)行export方法暴露服務(wù)

????一路跟下去,都是設(shè)置一下屬性值劣坊,然后到了doExportUrls方法便開(kāi)始主要的邏輯

    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);//獲取注冊(cè)中心的url
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

????在Zookeeper為注冊(cè)中心的情況下嘀倒,registryURLs值如下
[registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=test&dubbo=2.0.0&owner=william&pid=4444&registry=zookeeper&timestamp=1488886235790]

????進(jìn)入doExportUrlsFor1Protocol方法,看下暴露服務(wù)的主要邏輯

        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情況下做本地暴露 (配置為remote局冰,則表示只暴露遠(yuǎn)程服務(wù))
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local則暴露為遠(yuǎn)程服務(wù).(配置為local测蘑,則表示只暴露遠(yuǎn)程服務(wù))
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }

1.本地暴露

????會(huì)先使用exportLocal暴露本地服務(wù)

    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
        }
    }

????看了Dubbo的擴(kuò)展機(jī)制會(huì)知道,ProxyFactory默認(rèn)會(huì)使用接口上@Spi注解聲明的服務(wù)康二,為了容易理解碳胳,我把注解上的@SPI設(shè)置成jdk,那么就會(huì)使用jdk對(duì)應(yīng)的實(shí)現(xiàn)類(lèi)沫勿,即JdkProxyFactory
????JdkProxyFactory的getInvoker方法如下:

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

????返回一個(gè)AbstractProxyInvoker類(lèi)的對(duì)象挨约,這個(gè)AbstractProxyInvoker主要是接收消費(fèi)方的請(qǐng)求后,執(zhí)行本地方法的一個(gè)Invoker产雹,其中是使用了反射機(jī)制來(lái)調(diào)用了本地方法
????獲取到Invoker之后诫惭,需要使用Protocol的export來(lái)暴露這個(gè)服務(wù),在講Dubbo擴(kuò)展機(jī)制的時(shí)候蔓挖,Protocol外面有兩個(gè)裝飾類(lèi)夕土,那么export會(huì)先調(diào)用ProtocolListenerWrapper

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker), 
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

????這時(shí)的url為
injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=4444&side=provider&timestamp=1488887324869
????對(duì)象為

image.png

????那么if條件不滿(mǎn)足,將調(diào)用ProtocolFilterWrapper的export方法

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

????If條件還是不滿(mǎn)足瘟判,執(zhí)行下面的代碼
????注意:這里的protocol就是InjvmProtocol了

????然后看下buildInvokerChain方法怨绣,這個(gè)方法建立了一個(gè)個(gè)的filter,使用了責(zé)任鏈模式拷获,一個(gè)普通的Invoker調(diào)用也會(huì)經(jīng)歷這些filter篮撑,每個(gè)filter都有自己特殊的功能

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

????這里的group是provider,key是service.filter
????看下getActivateExtension方法實(shí)現(xiàn)

    public List<T> getActivateExtension(URL url, String key, String group) {
        String value = url.getParameter(key);
        return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
    }

    public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
        if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            getExtensionClasses();
            for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Activate activate = entry.getValue();
                if (isMatchGroup(group, activate.group())) {
                    T ext = getExtension(name);
                    if (! names.contains(name)
                            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                            && isActive(activate, url)) {
                        exts.add(ext);
                    }
                }
            }
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i ++) {
            String name = names.get(i);
            if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (usrs.size() > 0) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        if (usrs.size() > 0) {
            exts.addAll(usrs);
        }
        return exts;
    }

????由于url中沒(méi)有service.filter的key匆瓜,所以values為[]
????cachedActivates記錄了接口實(shí)現(xiàn)中帶有Activate注解的類(lèi)咽扇,需要篩選出group為provider的實(shí)現(xiàn)類(lèi),最后進(jìn)行排序


filter.png

????過(guò)濾器調(diào)用的順序和上圖的順序一樣

????最后會(huì)調(diào)用InjvmProtocol的export方法陕壹,將invoker封裝成InjvmExporter返回质欲,得到Exporter之后,放到List里

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }

2.遠(yuǎn)程暴露

回到doExportUrlsFor1Protocol方法糠馆,暴露本地服務(wù)之后嘶伟,根據(jù)注冊(cè)中心的地址暴露遠(yuǎn)程服務(wù)

    for (URL registryURL : registryURLs) {
        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
        URL monitorUrl = loadMonitor(registryURL);
        if (monitorUrl != null) {
            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
        }
        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

        Exporter<?> exporter = protocol.export(invoker);
        exporters.add(exporter);
    }

獲取Invoker的方式和之前一樣
而實(shí)際調(diào)用的是哪個(gè)Protocol對(duì)象呢?根據(jù)Invoker中protocol屬性的值(值為registry)和Dubbo擴(kuò)展機(jī)制可以知道調(diào)用的是RegistryProtocol

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 訂閱override數(shù)據(jù)
        // FIXME 提供者訂閱時(shí)又碌,會(huì)影響同一JVM即暴露服務(wù)九昧,又引用同一服務(wù)的的場(chǎng)景绊袋,因?yàn)閟ubscribed以服務(wù)名為緩存的key,導(dǎo)致訂閱信息覆蓋铸鹰。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保證每次export都返回一個(gè)新的exporter實(shí)例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }

2.1 doLocalExport

先看下doLocalExport方法

    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

originInvoker的url值為:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=test&dubbo=2.0.0&export=dubbo%3A%2F%2F10.1.87.36%3A20888%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Dtest%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26loadbalance%3Droundrobin%26methods%3DsayHello%26owner%3Dwilliam%26pid%3D14204%26side%3Dprovider%26threads%3D1%26timestamp%3D1518580683573&owner=william&pid=14204&registry=zookeeper&timestamp=1518580683551
url是代表注冊(cè)中心相關(guān)信息癌别,getCacheKey是獲取url中的provider屬性,即provider的url:
dubbo://10.1.87.36:20888/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=test&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=william&pid=14204&side=provider&threads=1&timestamp=1518580683573
第一次進(jìn)來(lái)的時(shí)候蹋笼,bounds中該key對(duì)應(yīng)的值為空展姐,所以根據(jù)provider的url和originInvoker封裝成新的Invoker,此時(shí)Invoker的url就是provider的url剖毯,其中protocol的值為dubbo圾笨,那么將調(diào)用DubboProtocol的export方法,一開(kāi)始已經(jīng)介紹逊谋,DubboProtocol主要是做網(wǎng)絡(luò)通信相關(guān)初始化
DubboProtocol的export方法如下:

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        
        return exporter;
    }

key=全類(lèi)名+dubbo監(jiān)聽(tīng)端口號(hào)
然后將Invoker封裝成Export擂达,然后放到map中
所以看下來(lái),Export和invoker是一對(duì)一的關(guān)系
exporterMap主要是將服務(wù)名和export進(jìn)行關(guān)聯(lián)胶滋,看到這里其實(shí)已經(jīng)差不多結(jié)束了板鬓,Dubbo接收到consumer的請(qǐng)求時(shí),會(huì)在exporterMap中找到對(duì)應(yīng)的exporter究恤,然后找到對(duì)應(yīng)的Invoker俭令,有了invoker就可以調(diào)用本地的服務(wù)

2.2 getRegistry

再回到RegistryProtocol的exprot方法中,接下來(lái)就是執(zhí)行g(shù)etRegistry(originInvoker)這句代碼了丁溅,主要是做注冊(cè)中心的初始化

    /**
     * 根據(jù)invoker的地址獲取registry實(shí)例
     * @param originInvoker
     * @return
     */
    private Registry getRegistry(final Invoker<?> originInvoker){
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryFactory.getRegistry(registryUrl);
    }

在上面我們看到invoker的url中唤蔗,registry屬性是zookeeper探遵,根據(jù)spi機(jī)制窟赏,該RegistryFactory為ZookeeperRegistryFactory,getRegistry會(huì)調(diào)用到父類(lèi)AbstractRegistryFactory的方法

    public Registry getRegistry(URL url) {
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceString();
        // 鎖定注冊(cè)中心獲取過(guò)程箱季,保證注冊(cè)中心單一實(shí)例
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // 釋放鎖
            LOCK.unlock();
        }
    }

createRegistry在父類(lèi)中沒(méi)有實(shí)現(xiàn)涯穷,調(diào)用到ZookeeperRegistryFactory中

    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

先看下Registry的繼承結(jié)構(gòu)


image.png

可以看到ZookeeperRegistry有兩個(gè)父類(lèi),所以ZookeeperRegistry會(huì)先調(diào)用父類(lèi)的構(gòu)造方法藏雏,這個(gè)過(guò)程主要做了幾件事:

  1. 加載服務(wù)緩存文件(AbstractRegistry)
  2. 異步(默認(rèn))更新緩存文件(AbstractRegistry)
  3. 定時(shí)重試失敗的動(dòng)作:注冊(cè)失敗拷况,取消注冊(cè)失敗,訂閱失敗掘殴,取消訂閱失敗赚瘦,通知失敗(FailbackRegistry)
  4. 初始化zk通信相關(guān)(ZookeeperRegistry)
加載服務(wù)緩存文件:
    public AbstractRegistry(URL url) {
        setUrl(url);
        // 啟動(dòng)文件保存定時(shí)器
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){
                if(! file.getParentFile().mkdirs()){
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        loadProperties();
        notify(url.getBackupUrls());
    }

loadProperties主要是將dubbo的服務(wù)緩存文件加載進(jìn)來(lái),轉(zhuǎn)換成Properties對(duì)象
Key=服務(wù)名 Value=url

更新緩存文件:

AbstractRegistry類(lèi)中還有一個(gè)doSaveProperties方法奏寨,主要用來(lái)更新緩存文件起意,在更新前會(huì)先把本地文件的內(nèi)容先更新到properties對(duì)象中,然后再進(jìn)行更新操作

    public void doSaveProperties(long version) {
        if(version < lastCacheChanged.get()){
            return;
        }
        if (file == null) {
            return;
        }
        Properties newProperties = new Properties();
        // 保存之前先讀取一遍病瞳,防止多個(gè)注冊(cè)中心之間沖突
        InputStream in = null;
        try {
            if (file.exists()) {
                in = new FileInputStream(file);
                newProperties.load(in);
            }
        } catch (Throwable e) {
        } finally {
            ....
        }     
     // 保存
        try {
            newProperties.putAll(properties);
            File lockfile = new File(file.getAbsolutePath() + ".lock");
            if (!lockfile.exists()) {
                lockfile.createNewFile();
            }
            RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
            try {
                FileChannel channel = raf.getChannel();
                try {
                    FileLock lock = channel.tryLock();
                    if (lock == null) {
                        throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                    }
                    // 保存
                    try {
                        if (! file.exists()) {
                            file.createNewFile();
                        }
                        FileOutputStream outputFile = new FileOutputStream(file);  
                        try {
                            newProperties.store(outputFile, "Dubbo Registry Cache");
                        } finally {
                            outputFile.close();
                        }
                    } finally {
                        lock.release();
                    }
                } finally {
                    channel.close();
                }
            } finally {揽咕,
            }
        } catch (Throwable e) {
            if (version < lastCacheChanged.get()) {
                return;
            } else {
                registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
            }
        }
    }

更新的時(shí)候會(huì)對(duì).lock文件進(jìn)行加鎖悲酷,這樣其他線(xiàn)程/進(jìn)程就無(wú)法修改緩存文件,如果tryLock返回空亲善,證明該文件正在被修改设易,那么增加版本號(hào)進(jìn)行重試
doSaveProperties方法開(kāi)頭的時(shí)候有句代碼:

if(version < lastCacheChanged.get()){return;}

可以看到保存的時(shí)候有版本號(hào)的判斷,lastCacheChanged這個(gè)是在saveProperties方法中增加的蛹头,如果當(dāng)條件滿(mǎn)足顿肺,那么證明在這個(gè)線(xiàn)程執(zhí)行saveProperties方法之后還沒(méi)執(zhí)行doSaveProperties操作的時(shí)候,又有線(xiàn)程執(zhí)行了saveProperties操作掘而,把version+1了挟冠,那么當(dāng)前線(xiàn)程直接return,讓后續(xù)線(xiàn)程繼續(xù)

定時(shí)重試失敗的動(dòng)作

構(gòu)造方法里起了一個(gè)定時(shí)器袍睡,定時(shí)進(jìn)行重試操作知染,代碼簡(jiǎn)化如下:

    // 重試失敗的動(dòng)作
    protected void retry() {
        if (! failedRegistered.isEmpty()) {
            Set<URL> failed = new HashSet<URL>(failedRegistered);
            for (URL url : failed) {
                    doRegister(url);
                    failedRegistered.remove(url);
            }
        }
        if (! failedUnregistered.isEmpty()) {
            Set<URL> failed = new HashSet<URL>(failedUnregistered);
            if (logger.isInfoEnabled()) {
                logger.info("Retry unregister " + failed);
            }
            for (URL url : failed) {
                    doUnregister(url);
                    failedUnregistered.remove(url);
            }
        }
        if (! failedSubscribed.isEmpty()) {
            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    doSubscribe(url, listener);
                    listeners.remove(listener);
                }
            }
        }
        if (! failedUnsubscribed.isEmpty()) {
            Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
            for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                URL url = entry.getKey();
                Set<NotifyListener> listeners = entry.getValue();
                for (NotifyListener listener : listeners) {
                    try {
                        doUnsubscribe(url, listener);
                        listeners.remove(listener);
                    } catch (Throwable t) { // 忽略所有異常,等待下次重試
                        logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        if (! failedNotified.isEmpty()) {
            Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
            for (Map<NotifyListener, List<URL>> values : failed.values()) {
                for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                    NotifyListener listener = entry.getKey();
                    List<URL> urls = entry.getValue();
                    listener.notify(urls);
                    values.remove(listener);
                }
            }
        }
    }

從對(duì)面失敗的集合從取數(shù)據(jù)執(zhí)行相應(yīng)的重試

初始化zk通信相關(guān)

這里會(huì)連接zk斑胜,然后增加監(jiān)聽(tīng)器

2.3 register

再回到RegistryProtocol的export方法控淡,registry初始化完成之后,接下來(lái)會(huì)調(diào)用register方法止潘,仍然是先調(diào)用父類(lèi)的方法
AbstractRegistry.register:將url加入到registered集合中
FailbackRegistry.register:

    @Override
    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 向服務(wù)器端發(fā)送注冊(cè)請(qǐng)求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // 如果開(kāi)啟了啟動(dòng)時(shí)檢測(cè)掺炭,則直接拋出異常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if(skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
            // 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表,定時(shí)重試
            failedRegistered.add(url);
        }
    }

從代碼上看到凭戴,可以分為幾步

  1. failedRegistered和failedUnregistered中移除該url
  2. 發(fā)送注冊(cè)請(qǐng)求

可以看到注冊(cè)會(huì)失敗涧狮,如果設(shè)置了check為false,那么放到重試隊(duì)列中重試

doRegister方法是在ZookeeperRegistry中實(shí)現(xiàn)的么夫,主要是在zk上創(chuàng)建節(jié)點(diǎn)者冤,創(chuàng)建邏輯如下:

    public void create(String path, boolean ephemeral) {
        int i = path.lastIndexOf('/');
        if (i > 0) {
            create(path.substring(0, i), false);
        }
        if (ephemeral) {
            createEphemeral(path);
        } else {
            createPersistent(path);
        }
    }

Path格式為:
/dubbo/服務(wù)名/providers(可能是configurators或者router節(jié)點(diǎn))/url
該方法會(huì)遞歸把每個(gè)父節(jié)點(diǎn)都創(chuàng)建完畢,可以看到除了dubbo這個(gè)節(jié)點(diǎn)是持久化節(jié)點(diǎn)档痪,其他都是臨時(shí)節(jié)點(diǎn)涉枫,那么當(dāng)服務(wù)與zk斷開(kāi)連接一段時(shí)間后,zk會(huì)刪除該節(jié)點(diǎn)腐螟,服務(wù)消費(fèi)方就會(huì)得到通知愿汰,知道該提供者下線(xiàn),做相應(yīng)操作

2.4 subscribe

接下來(lái)看訂閱方法subscribe乐纸,同理會(huì)先調(diào)用父類(lèi)的方法
AbstractRegistry.subscribe:將listener保存到subscribed中
FailbackRegistry.subscribe:

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // 向服務(wù)器端發(fā)送訂閱請(qǐng)求
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // 如果開(kāi)啟了啟動(dòng)時(shí)檢測(cè)衬廷,則直接拋出異常
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if(skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // 將失敗的訂閱請(qǐng)求記錄到失敗列表,定時(shí)重試
            addFailedSubscribed(url, listener);
        }
    }

和注冊(cè)類(lèi)似汽绢,區(qū)別是訂閱失敗的時(shí)候吗跋,首先會(huì)調(diào)用getCacheUrls獲取url,這個(gè)方法就是從緩存的properties對(duì)象里獲取服務(wù)的url庶喜,如果沒(méi)有數(shù)據(jù)才執(zhí)行和注冊(cè)一樣的操作小腊,有則調(diào)用notify方法救鲤,這個(gè)后面會(huì)說(shuō)到

看下核心的ZookeeperRegistry的doSubscribe方法

    List<URL> urls = new ArrayList<URL>();
    for (String path : toCategoriesPath(url)) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners == null) {
            zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
            listeners = zkListeners.get(url);
        }
        ChildListener zkListener = listeners.get(listener);
        if (zkListener == null) {
            listeners.putIfAbsent(listener, new ChildListener() {
                public void childChanged(String parentPath, List<String> currentChilds) {
                    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                }
            });
            zkListener = listeners.get(listener);
        }
        zkClient.create(path, false);
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    notify(url, listener, urls);

toCategoriesPath方法是從url中獲取category屬性的值,轉(zhuǎn)換成zk上路徑的形式秩冈,由于provider只有一個(gè)configurators本缠,所以path如下
/dubbo/服務(wù)名/configurators,接下來(lái)就是為該節(jié)點(diǎn)添加監(jiān)聽(tīng)回調(diào)入问,然后返回子節(jié)點(diǎn)
如果子節(jié)點(diǎn)為空丹锹,那么toUrlsWithEmpty方法會(huì)返回empty://....格式的url,即protocol為empty芬失,這個(gè)協(xié)議后面會(huì)用到
接下來(lái)notify一路會(huì)調(diào)用到AbstractRegistry的notify方法

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
       ....
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);
            listener.notify(categoryList);
        }
    }

主要看下最下面的循環(huán)楣黍,即遍歷每個(gè)類(lèi)目下的url,執(zhí)行一遍saveProperties方法(把服務(wù)信息保存到文件)棱烂,調(diào)用監(jiān)聽(tīng)器的notify方法租漂,listener最初初始化為OverrideListener

       public void notify(List<URL> urls) {
            List<URL> result = null;
            for (URL url : urls) {
                URL overrideUrl = url;
                if (url.getParameter(Constants.CATEGORY_KEY) == null
                        && Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
                    // 兼容舊版本
                    overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
                }
                if (! UrlUtils.isMatch(subscribeUrl, overrideUrl)) {
                    if (result == null) {
                        result = new ArrayList<URL>(urls);
                    }
                    result.remove(url);
                }
            }
            if (result != null) {
                urls = result;
            }
            this.configurators = RegistryDirectory.toConfigurators(urls);
            List<ExporterChangeableWrapper<?>> exporters = new ArrayList<ExporterChangeableWrapper<?>>(bounds.values());
            for (ExporterChangeableWrapper<?> exporter : exporters){
                Invoker<?> invoker = exporter.getOriginInvoker();
                final Invoker<?> originInvoker ;
                if (invoker instanceof InvokerDelegete){
                    originInvoker = ((InvokerDelegete<?>)invoker).getInvoker();
                }else {
                    originInvoker = invoker;
                }
                
                URL originUrl = RegistryProtocol.this.getProviderUrl(originInvoker);
                URL newUrl = getNewInvokerUrl(originUrl, urls);
                
                if (! originUrl.equals(newUrl)){
                    RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
                }
            }
        }

主要看下最下面的循環(huán),會(huì)比對(duì)invoker的url是新的url是否是一樣颊糜,如果不一樣哩治,那么更新invoker的url

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市衬鱼,隨后出現(xiàn)的幾起案子业筏,更是在濱河造成了極大的恐慌,老刑警劉巖鸟赫,帶你破解...
    沈念sama閱讀 216,324評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件蒜胖,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡抛蚤,警方通過(guò)查閱死者的電腦和手機(jī)台谢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)霉颠,“玉大人对碌,你說(shuō)我怎么就攤上這事荆虱≥镔耍” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,328評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵怀读,是天一觀的道長(zhǎng)诉位。 經(jīng)常有香客問(wèn)我,道長(zhǎng)菜枷,這世上最難降的妖魔是什么苍糠? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,147評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮啤誊,結(jié)果婚禮上岳瞭,老公的妹妹穿的比我還像新娘拥娄。我一直安慰自己,他們只是感情好瞳筏,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布稚瘾。 她就那樣靜靜地躺著,像睡著了一般姚炕。 火紅的嫁衣襯著肌膚如雪摊欠。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,115評(píng)論 1 296
  • 那天柱宦,我揣著相機(jī)與錄音些椒,去河邊找鬼。 笑死掸刊,一個(gè)胖子當(dāng)著我的面吹牛免糕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播忧侧,決...
    沈念sama閱讀 40,025評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼说墨,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了苍柏?” 一聲冷哼從身側(cè)響起尼斧,我...
    開(kāi)封第一講書(shū)人閱讀 38,867評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎试吁,沒(méi)想到半個(gè)月后棺棵,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡熄捍,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評(píng)論 2 332
  • 正文 我和宋清朗相戀三年烛恤,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片余耽。...
    茶點(diǎn)故事閱讀 39,688評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡缚柏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出碟贾,到底是詐尸還是另有隱情币喧,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評(píng)論 5 343
  • 正文 年R本政府宣布袱耽,位于F島的核電站杀餐,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏朱巨。R本人自食惡果不足惜史翘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評(píng)論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧琼讽,春花似錦必峰、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,657評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至脉让,卻和暖如春桂敛,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背溅潜。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,811評(píng)論 1 268
  • 我被黑心中介騙來(lái)泰國(guó)打工术唬, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人滚澜。 一個(gè)月前我還...
    沈念sama閱讀 47,685評(píng)論 2 368
  • 正文 我出身青樓粗仓,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親设捐。 傳聞我的和親對(duì)象是個(gè)殘疾皇子借浊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • dubbo服務(wù)發(fā)布 dubbo服務(wù)發(fā)布只需在spring.xml中如下配置即可:<dubbo:service in...
    阿飛的博客閱讀 1,618評(píng)論 6 10
  • dubbo暴露服務(wù)有兩種情況,一種是設(shè)置了延遲暴露(比如delay="5000")萝招,另外一種是沒(méi)有設(shè)置延遲暴露或者...
    加大裝益達(dá)閱讀 21,266評(píng)論 5 36
  • 我們以dubbo 的xml配置為例: dubbo服務(wù)發(fā)布只需在spring.xml中如下配置即可: <dubbo:...
    匡和閱讀 1,416評(píng)論 0 0
  • Dubbo是什么 Dubbo是Alibaba開(kāi)源的分布式服務(wù)框架蚂斤,它最大的特點(diǎn)是按照分層的方式來(lái)架構(gòu),使用這種方式...
    Coselding閱讀 17,206評(píng)論 3 196
  • 陽(yáng)光灑落 點(diǎn)亮櫥窗微渺塵埃 窗外黑白斑馬線(xiàn) 多少人來(lái)來(lái)往往 衣冠整齊槐沼,眼眉低垂 像遺失了玩具的孩子 走不出傷悲 大...
    燕純閱讀 341評(píng)論 0 1