dubbo之Registry(注冊(cè)中心)

前言

Registry是dubbo對(duì)注冊(cè)中心的抽象分唾,提供服務(wù)的注冊(cè)抗碰、注銷、查找绽乔、訂閱弧蝇、取消訂閱等功能。本文按照dubbo中Registry的組織形式折砸,分析Registry的核心邏輯看疗。首先來(lái)看注冊(cè)中心的創(chuàng)建,RegistryFactory接口定義注冊(cè)中心的創(chuàng)建(工廠模式實(shí)現(xiàn))睦授,支持SPI擴(kuò)展鹃觉,默認(rèn)SPI實(shí)現(xiàn)是DubboRegistryFactory。注冊(cè)中心Registry接口繼承Node睹逃、RegistryService盗扇,抽象基類AbstractRegistry直接實(shí)現(xiàn)Registry,F(xiàn)ailbackRegistry繼承自基類AbstractRegistry沉填,所有注冊(cè)中心實(shí)現(xiàn)均繼承自FailbackRegistry疗隶,這里可以看出,所有注冊(cè)中心均支持失敗重試(Failback)翼闹。

一斑鼻、RegistryFactory(注冊(cè)中心工廠)

RegistryFactory的UML類圖如下:


注冊(cè)中心工廠UML (1).jpg

先來(lái)看RegistryFactory接口定義,比較簡(jiǎn)單:

@SPI("dubbo")
public interface RegistryFactory {
    /**
     * Connect to the registry
     *  1猎荠、check=false時(shí)坚弱,連接無(wú)需check,否則連接斷開(kāi)時(shí)直接拋異常
     *  2关摇、支持URL的用戶名荒叶、密碼權(quán)限校驗(yàn)
     *  3、支持注冊(cè)中心族備份地址:10.20.153.10
     *  4输虱、支持注冊(cè)中心本地緩存文件
     *  5些楣、支持超時(shí)設(shè)置
     *  6、支持session 60s過(guò)期
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);

1.1 AbstractRegistryFactory

接著是抽象基類AbstractRegistryFactory宪睹,實(shí)現(xiàn)RegistryFactory接口的getRegistry方法愁茁,同時(shí)定義模板方法createRegistry供子類實(shí)現(xiàn);AbstractRegistryFactory的getRegistry方法先從Map緩存查詢注冊(cè)中心亭病,查不到則執(zhí)行模板方法createRegistry創(chuàng)建注冊(cè)中心鹅很,并放入緩存,然后返回該registry

@Override
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    String key = url.toServiceStringWithoutResolving();
    //加鎖罪帖,保證單例
    LOCK.lock();
    try {
        // 緩存查詢注冊(cè)中心
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 不存在則通過(guò)spi促煮、ioc方式創(chuàng)建registry
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // 鎖釋放
        LOCK.unlock();
    }
}

1.2 其他實(shí)現(xiàn)

基類AbstractRegistryFactory的子類實(shí)現(xiàn)中食听,比較重要的是DubboRegistryFactory(介紹Protocol時(shí)已經(jīng)做了解析),其他實(shí)現(xiàn)比如RedisRegistryFactory污茵、ZookeeperRegistryFactory樱报、MulticastRegistryFactory的邏輯非常簡(jiǎn)單,直接返回對(duì)應(yīng)的注冊(cè)中心實(shí)現(xiàn)泞当,代碼就省略了迹蛤。

二、Registry(注冊(cè)中心)

來(lái)看dubbo中Registry實(shí)現(xiàn)襟士。UML類圖如下:


注冊(cè)中心UML.jpg

方便理解起見(jiàn)盗飒,這里把注冊(cè)中實(shí)現(xiàn)分為三個(gè)層次,分別是Registry接口陋桂、FailbackRegistry實(shí)現(xiàn)逆趣、注冊(cè)中心實(shí)現(xiàn)。

2.1嗜历、Registry接口

上面UML類圖中可以看出宣渗,Registry繼承Node和RegistryService接口(Registry接口內(nèi)部無(wú)新增方法),重點(diǎn)關(guān)注RegistryService接口梨州。RegistryService抽象了服務(wù)的注冊(cè)痕囱、注銷、訂閱暴匠、取消訂閱鞍恢、查找等核心功能,來(lái)看接口定義:

public interface RegistryService {
    /**
     * 注冊(cè)數(shù)據(jù)每窖,比如提供者服務(wù)帮掉、消費(fèi)者地址、路由規(guī)則窒典、override規(guī)則以及其他數(shù)據(jù)
     * 1蟆炊、若URL中check=false,那么注冊(cè)失敗會(huì)進(jìn)行重試且不會(huì)拋出異常崇败,否則直接拋異常
     * 2盅称、若URL中dynamic=false肩祥,那么URL中信息會(huì)被持久化存儲(chǔ)后室,否則注冊(cè)過(guò)程異常退出,URL信息應(yīng)當(dāng)被刪除
     * 3混狠、若URL中category=routers岸霹,那么意味著分類存儲(chǔ),默認(rèn)catetory=providers将饺,且會(huì)根據(jù)分類區(qū)域進(jìn)行通知更新
     * 4贡避、當(dāng)注冊(cè)中心重啟,網(wǎng)絡(luò)波動(dòng),數(shù)據(jù)不能被丟棄蟆盐,包括刪除損壞的流水線中數(shù)據(jù)的刪除
     * 5啄踊、參數(shù)不同的URL可以共存,不能相互覆蓋
     * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
     */
    void register(URL url);

    /**
     * 注銷
     * 1杀捻、若是dynamic=false的持久化存儲(chǔ)井厌,如果找不到注冊(cè)數(shù)據(jù),那么會(huì)拋出非法狀態(tài)異常致讥,否則會(huì)忽略
     * 2仅仆、根據(jù)完整URL進(jìn)行匹配注銷
     * @param url  Registration information , is not allowed to be empty, e.g: dubbo://10.20.153.10/org.apache.dubbo.foo.BarService?version=1.0.0&application=kylin
     */
    void unregister(URL url);

    /**
     * 訂閱注冊(cè)數(shù)據(jù),并在注冊(cè)數(shù)據(jù)更新的時(shí)候自動(dòng)推送
     * 服務(wù)訂閱
     * 1垢袱、若URL中check=false墓拜,那么當(dāng)注冊(cè)失敗時(shí),會(huì)直接在后臺(tái)重試不會(huì)拋異常
     * 2请契、若URL中category=routers咳榜,那么只會(huì)通知特定分類數(shù)據(jù);多個(gè)分類用逗號(hào)隔開(kāi)爽锥;允許使用*全部匹配
     * 3贿衍、允許接口、組救恨、版本以及分類作為查詢條件
     * 4贸辈、查詢條件允許使用*進(jìn)行匹配,意味著訂閱接口的所有版本
     * 5肠槽、若注冊(cè)中心重啟擎淤、網(wǎng)絡(luò)波動(dòng),必須自動(dòng)保存訂閱請(qǐng)求
     * 6秸仙、參數(shù)不同的URL可以共存嘴拢,不能相互覆蓋
     * 7、訂閱過(guò)程必須是阻塞的
     */
    void subscribe(URL url, NotifyListener listener);

    /**
    * 1寂纪、沒(méi)有訂閱席吴,則直接忽略
     * 2、根據(jù)URL完全匹配
     **/
    void unsubscribe(URL url, NotifyListener listener);

    /**
     * 根據(jù)條件匹配捞蛋,查詢注冊(cè)數(shù)據(jù)孝冒;對(duì)應(yīng)訂閱的推模式,提供拉模式拟杉,且僅返回一個(gè)結(jié)果
    */
    List<URL> lookup(URL url);

從接口定義可以看出庄涡,RegsitryService對(duì)數(shù)據(jù)的注冊(cè)、注銷搬设、訂閱穴店、取消訂閱撕捍、查找等功能做了定義約束,所有對(duì)RegistryService的實(shí)現(xiàn)都必須滿足這個(gè)約束泣洞。

2.2忧风、AbstractRegistry & FailbackRegistry

接下來(lái)看Registry接口的基類實(shí)現(xiàn)AbstractRegistry、FailbackRegistry球凰。

2.2.1 AbstractRegistry

先來(lái)看AbstractRegistry阀蒂,重點(diǎn)關(guān)注構(gòu)造方法,AbstractRegistry基類的構(gòu)造過(guò)程的核心邏輯分為三部分:1、創(chuàng)建注冊(cè)中心緩存文件弟蚀;2蚤霞、cache文件加載至properties;3义钉、同步backUpUrl信息昧绣。我們先來(lái)看構(gòu)造方法的定義,然后再分步來(lái)看:

public AbstractRegistry(URL url) {
    // 注冊(cè)中心URL負(fù)載
    setUrl(url);
    // Start file save timer
    // 1捶闸、注冊(cè)中心文件同步保存開(kāi)關(guān)夜畴,默認(rèn)關(guān)閉,即默認(rèn)異步保存
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
    // 默認(rèn)文件路徑: user.home/.dubbo/dubbo-registry-applicationName-ip.cache
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        // 文件目錄創(chuàng)建失敗删壮,直接拋異常
        file = new File(filename);
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
            }
        }
    }
    this.file = file;
    // When starting the subscription center,we need to read the local cache file for future Registry fault tolerance processing.
    // 2贪绘、加載注冊(cè)中心cache文件到內(nèi)存,用于容錯(cuò)
    loadProperties();
    // 3央碟、backUpUrl數(shù)據(jù)同步税灌,同步所有訂閱者、更新properties緩存亿虽。
    notify(url.getBackupUrls());
}
2.2.1.1菱涤、創(chuàng)建注冊(cè)中心緩存

這一步比較簡(jiǎn)單,首先根據(jù)URL信息洛勉,確定緩存文件保存方式(異步粘秆、同步);然后收毫,拼接緩存文件名稱攻走,根據(jù)文件名創(chuàng)建緩存文件,創(chuàng)建失敗則直接拋異常此再,否則初始化緩存文件file昔搂。

2.2.1.2、注冊(cè)中心cache加載

加載本地cache文件到內(nèi)存緩存properties引润,將上一步中的cache文件內(nèi)容巩趁,加載到properties。比較容易理解淳附,啟動(dòng)時(shí)先讀本地緩存议慰,用于容錯(cuò)。

private void loadProperties() {
    if (file != null && file.exists()) {
        InputStream in = null;
        // 省略try-catch
            in = new FileInputStream(file);
            properties.load(in);
            if (logger.isInfoEnabled()) {
                logger.info("Load registry cache file " + file + ", data: " + properties);
            }
    }
}
2.2.1.3奴曙、backupUrl數(shù)據(jù)同步

重點(diǎn)來(lái)看backupUrl的數(shù)據(jù)同步别凹,backupUrl的生成方式前面我們已經(jīng)講過(guò)了,這一步主要是將生成的backupUrl列表同步給各訂閱者以及內(nèi)存緩存Properties洽糟,來(lái)看代碼:

protected void notify(List<URL> urls) {
    if (CollectionUtils.isEmpty(urls)) {
        return;
    }

    // 根據(jù)已訂閱的URL炉菲,以及訂閱者listener進(jìn)行同步
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();
                
        //過(guò)濾掉不匹配的URL
        if (!UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    // 核心邏輯,執(zhí)行URL信息同步
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

繼續(xù)來(lái)看notify方法:

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
            && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    // 按照category分組
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
  
    //緩存分組后的Notified結(jié)果
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // 同步URL信息坤溃,這里有兩種實(shí)現(xiàn)拍霜,分別是RegistryDirectory和RegistryProtocol$OverrideListener,listener通過(guò)subscribe方法被注冊(cè)到subscribed緩存,
        listener.notify(categoryList);
        // 每次notify都會(huì)更新cache緩存文件(同步或者異步)薪介,保證注冊(cè)中心properties內(nèi)容與各訂閱者拿到的信息一致祠饺。
        saveProperties(url);
    }
}

同步URL信息到訂閱者的邏輯主要在RegistryDirectory和RegistryProtocol$OverrideListener,這里就不再做解析了汁政。來(lái)看同步到內(nèi)存properties的邏輯道偷,把所有待同步的URL序列化為字符串(每個(gè)url中間用空格隔開(kāi)),然后將URL數(shù)據(jù)保存(同步或異步)至緩存文件(即2.2.1.1中創(chuàng)建的緩存文件)记劈,注意勺鸦,這里異步保存實(shí)際執(zhí)行的邏輯與同步保存完全一致,核心邏輯在doSaveProperties方法目木。

private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        //  url緩存信息組裝,每個(gè)url中間用空格隔開(kāi)
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        // 更新內(nèi)存緩存
        properties.setProperty(url.getServiceKey(), buf.toString());
        // 版本控制换途,可以看出,MVCC并非DB專有
        long version = lastCacheChanged.incrementAndGet();
        // 同步保存文件則直接執(zhí)行save,將properties緩存內(nèi)容同步至緩存文件刽射,否則放入線程池異步調(diào)度
        if (syncSaveFile) {
            doSaveProperties(version);
        } else {
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

繼續(xù)來(lái)看doSaveProperties方法怀跛,方法參數(shù)是當(dāng)前文件的版本號(hào),可以看到柄冲,防止并發(fā)操作吻谋,版本號(hào)用于版本控制

// 保存配置到本地緩存文件,文件版本
public void doSaveProperties(long version) {
    // 防止版本回退
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // Save
    try {
        // 創(chuàng)建本地緩存文件鎖,不存在則直接創(chuàng)建现横;
        File lockfile = new File(file.getAbsolutePath() + ".lock");
        if (!lockfile.exists()) {
            lockfile.createNewFile();
        }
        try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
             FileChannel channel = raf.getChannel()) {
            FileLock lock = channel.tryLock();
            //拿到鎖才可以操作
            if (lock == null) {
                throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
            }
            // Save
            try {
                if (!file.exists()) {
                    file.createNewFile();
                }
                try (FileOutputStream outputFile = new FileOutputStream(file)) {
                    properties.store(outputFile, "Dubbo Registry Cache");
                }
            } finally {
                lock.release();
            }
        }
    } catch (Throwable e) {
        if (version < lastCacheChanged.get()) {
            return;
        } else {
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry cache file, cause: " + e.getMessage(), e);
    }
}

為了進(jìn)一步加深理解漓拾,我們把AbstractRegistry構(gòu)建過(guò)程中的數(shù)據(jù)流圖示如下:


AbstractRegistry init.jpg
2.2.1.4、數(shù)據(jù)注冊(cè) - register

基類中的注冊(cè)等方法邏輯非常簡(jiǎn)單戒祠,只是將URL放入緩存,非常簡(jiǎn)單骇两,不做過(guò)多說(shuō)明。

2.2.1.5姜盈、數(shù)據(jù)注銷 - unRegister

同樣的低千,注銷方法邏輯也非常簡(jiǎn)單,將URL從緩存中刪除。

2.2.1.6示血、數(shù)據(jù)訂閱 - subscribe

數(shù)據(jù)訂閱需要注意棋傍,訂閱的邏輯核心是注冊(cè)監(jiān)聽(tīng)器,以便數(shù)據(jù)變更時(shí)难审,同步更新瘫拣;subscribed緩存的結(jié)構(gòu)比較特殊,來(lái)看代碼:

@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    // subscribed緩存結(jié)構(gòu):ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>()
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}
2.2.1.7告喊、取消數(shù)據(jù)訂閱 - unSubscribe

取消訂閱即刪除監(jiān)聽(tīng)器麸拄,從subscribed緩存中刪除對(duì)應(yīng)監(jiān)聽(tīng)器。

2.2.1.8黔姜、數(shù)據(jù)查找 - lookup

查找邏輯即拢切,從已同步過(guò)的URL列表(notified緩存)中,過(guò)濾滿足要求的URL秆吵;若當(dāng)前已同步過(guò)的URL集合為空淮椰,則

@Override
public List<URL> lookup(URL url) {
    List<URL> result = new ArrayList<>();
    Map<String, List<URL>> notifiedUrls = getNotified().get(url);
    if (notifiedUrls != null && notifiedUrls.size() > 0) {
        for (List<URL> urls : notifiedUrls.values()) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    } else {
        final AtomicReference<List<URL>> reference = new AtomicReference<>();
        NotifyListener listener = reference::set;
        // 即注冊(cè)監(jiān)聽(tīng)器,保證首次notify有數(shù)據(jù)返回
        subscribe(url, listener); // Subscribe logic guarantees the first notify to return
        List<URL> urls = reference.get();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                    result.add(u);
                }
            }
        }
    }
    return result;
}
2.2.1.9帮毁、注冊(cè)中心銷毀- destory

銷毀的邏輯比較簡(jiǎn)單实苞,分為注銷數(shù)據(jù)和注銷監(jiān)聽(tīng)器兩部分,來(lái)看代碼:

@Override
// 主要做兩件事情:1烈疚、注銷URL黔牵,從registered列表中剔除所有URL;2爷肝、移除所有NotifyListener猾浦,也就是說(shuō)不再接受配置變更同步消息。
public void destroy() {
    if (logger.isInfoEnabled()) {
        logger.info("Destroy registry:" + getUrl());
    }
    // 將所有URL從registered移除,即注銷全部數(shù)據(jù)
    Set<URL> destroyRegistered = new HashSet<>(getRegistered());
    if (!destroyRegistered.isEmpty()) {
        for (URL url : new HashSet<>(getRegistered())) {
            if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                try {
                    unregister(url);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unregister url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }

    // 注銷監(jiān)聽(tīng)器,即從subscribed移除所有訂閱URL的listener
    Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<>(getSubscribed());
    if (!destroySubscribed.isEmpty()) {
        for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                try {
                    unsubscribe(url, listener);
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy unsubscribe url " + url);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                }
            }
        }
    }
}

AbstractRegistry基類的邏輯就分析到這里灯抛,接著來(lái)看FailbackRegistry金赦。

2.2.2、FailbackRegistry

FailbackRegistry 顧名思義对嚼,支持失敗恢復(fù)的注冊(cè)中心夹抗,繼承自基類AbstractRegistry,可以看到在基類基礎(chǔ)上擴(kuò)展了失敗恢復(fù)功能纵竖,先來(lái)看幾個(gè)緩存變量:

// 重試任務(wù)map
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();

幾個(gè)重試任務(wù)map漠烧,用于緩存失敗的任務(wù)以便于重試;接著來(lái)看構(gòu)造方法,在父類構(gòu)造方法的基礎(chǔ)上靡砌,新增了HashedWheelTimer實(shí)例已脓,用于定時(shí)重試失敗任務(wù),默認(rèn)重試時(shí)間間隔5s通殃。

public FailbackRegistry(URL url) {
    super(url);
    // 默認(rèn)重試時(shí)間間隔 5s
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

    // 利用hashTimer實(shí)現(xiàn)定時(shí)重試,
    retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}

除此之外度液,F(xiàn)ailbackRegistry還定義了幾個(gè)關(guān)鍵的模板方法,由子類實(shí)現(xiàn):

public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
2.2.2.1、數(shù)據(jù)注冊(cè)-register

注冊(cè)邏輯在父類基礎(chǔ)上新增了失敗以后的操作堕担,邏輯比較簡(jiǎn)單已慢,直接來(lái)看代碼:

@Override
public void register(URL url) {
    // 父類注冊(cè)方法,保存url到已注冊(cè)列表
    super.register(url);
    // 將url從注冊(cè)失敗列表中剔除
    removeFailedRegistered(url);
    // 將url從注銷失敗列表中剔除
    removeFailedUnregistered(url);
    try {
        // 執(zhí)行模板方法邏輯
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;
                // 如果啟動(dòng)檢測(cè)開(kāi)關(guān)開(kāi)啟(默認(rèn)開(kāi)啟)照宝,失敗會(huì)直接拋異常蛇受,否則加入失敗列表句葵,用于重試
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // 加入注冊(cè)失敗列表厕鹃,用于重試
        addFailedRegistered(url);
    }
}
2.2.2.2、數(shù)據(jù)注銷-unRegister

注銷邏輯與注冊(cè)類似乍丈,直接來(lái)看代碼:

@Override
public void unregister(URL url) {
    // 父類注銷邏輯
    super.unregister(url);
    // 從注冊(cè)失敗緩存中刪除
    removeFailedRegistered(url);
    // 從注銷失敗列表中刪除
    removeFailedUnregistered(url);
    try {
        // 執(zhí)行模板方法
        doUnregister(url);
    } catch (Exception e) {
        Throwable t = e;
                // 如果啟動(dòng)檢測(cè)開(kāi)關(guān)開(kāi)啟(默認(rèn)開(kāi)啟)剂碴,失敗會(huì)直接拋異常,否則加入失敗列表轻专,用于重試
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to unregister " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to unregister " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        //加入失敗重試緩存
        addFailedUnregistered(url);
    }
}
2.2.2.3忆矛、數(shù)據(jù)訂閱-subscribe

與注冊(cè)、注銷邏輯類似请垛,數(shù)據(jù)訂閱邏輯也非常簡(jiǎn)單催训,代碼就不再展示了

2.2.2.4、取消數(shù)據(jù)訂閱-unSubscribe

與注冊(cè)宗收、注銷邏輯類似匈勋,不做過(guò)多解析璃俗。

2.3喊衫、注冊(cè)中心實(shí)現(xiàn)

好了充尉,前面的鋪墊結(jié)束了鲜锚,本節(jié)來(lái)看具體的注冊(cè)中心實(shí)現(xiàn)周拐,下面按照DubboRegistry、MulticastRegistry吏够、RedisRegistry勾给、ZookeeperRegistry的順序依次分析滩报。

2.3.1、DubboRegistry

DubboRegistry作為dubbo的默認(rèn)注冊(cè)中心實(shí)現(xiàn)(RegistryFactory默認(rèn)SPI實(shí)現(xiàn)是DubboRegistryFactory)播急,嚴(yán)格意義上來(lái)講脓钾,DubboRegistry實(shí)際上是一個(gè)Registry代理,核心邏輯全部由代理也即registryService實(shí)現(xiàn)桩警。DubboRegistry的創(chuàng)建在DubboRegistryFactory中已經(jīng)做了解析可训,這里不做過(guò)多說(shuō)明。重點(diǎn)關(guān)注DubboRegistry的構(gòu)造方法

public DubboRegistry(Invoker<RegistryService> registryInvoker, RegistryService registryService) {
    super(registryInvoker.getUrl());
    this.registryInvoker = registryInvoker;
    // Registry代理捶枢,核心邏輯借助registryService實(shí)現(xiàn)
    this.registryService = registryService;
    // 重連定時(shí)器握截,默認(rèn)重連間隔時(shí)間3s
    this.reconnectPeriod = registryInvoker.getUrl().getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, RECONNECT_PERIOD_DEFAULT);
    // 初始化調(diào)度任務(wù)邏輯,具體邏輯參考recover方法
    reconnectFuture = reconnectTimer.scheduleWithFixedDelay(() -> {
        try {
            // 重連邏輯
            connect();
        } catch (Throwable t) { // Defensive fault tolerance
            logger.error("Unexpected error occur at reconnect, cause: " + t.getMessage(), t);
        }
    }, reconnectPeriod, reconnectPeriod, TimeUnit.MILLISECONDS);
}

介紹FailbackRegistry時(shí)說(shuō)過(guò)柱蟀,所有的注冊(cè)中心實(shí)現(xiàn)都支持自動(dòng)恢復(fù)川蒙,DubboRegistry的自動(dòng)恢復(fù)實(shí)現(xiàn)原理是若當(dāng)前注冊(cè)中心不可用蚜厉,則直接將該URL注冊(cè)信息加入到注冊(cè)失敗长已、訂閱失敗緩存(借助父類FailbackRegistry的addFailedRegistered、addFailedSubscribed方法實(shí)現(xiàn))昼牛,由父類的HashedWheelTimer重新調(diào)度术瓮,進(jìn)行恢復(fù),來(lái)看失敗后加入緩存的邏輯(外層方法是connect贰健,內(nèi)部實(shí)際上執(zhí)行的recover方法):

// 核心邏輯:注冊(cè)失敗胞四、訂閱失敗的url分別放入對(duì)應(yīng)列表,用于hashedWheelTimer調(diào)度
@Override
protected void recover() throws Exception {
    // register
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        // 放入注冊(cè)失敗列表
        for (URL url : recoverRegistered) {
            addFailedRegistered(url);
        }
    }
    // subscribe
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}

2.3.2伶椿、MulticastRegistry

MulticastRegistry即多播注冊(cè)中心辜伟,顧名思義,采用多播實(shí)現(xiàn)脊另;注冊(cè)导狡、注銷、訂閱偎痛、取消訂閱等均通過(guò)多播方式實(shí)現(xiàn)旱捧。核心邏輯在構(gòu)造方法,包括多播組的創(chuàng)建以及多播消息的處理踩麦;創(chuàng)建多播組比較容易理解枚赡,重點(diǎn)關(guān)注多播消息的處理,在構(gòu)造方法中創(chuàng)建并啟動(dòng)一個(gè)守護(hù)線程谓谦,用于接收并處理多播消息(注冊(cè)消息贫橙、注銷消息、訂閱消息)反粥,處理多播消息的入口是receive方法卢肃。

2.3.2.1谓松、構(gòu)造方法

先來(lái)看構(gòu)造方法:

// 創(chuàng)建并啟動(dòng)daemon線程,用于接收廣播消息践剂,對(duì)接收到的消息處理邏輯在receive方法
public MulticastRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    try {
        // 創(chuàng)建并加入多播組
        multicastAddress = InetAddress.getByName(url.getHost());
        checkMulticastAddress(multicastAddress);
        multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
        multicastSocket = new MulticastSocket(multicastPort);
        // 注冊(cè)中心URL地址加入多播組
        NetUtils.joinMulticastGroup(multicastSocket, multicastAddress);
        // 啟動(dòng)daemon線程鬼譬,用于接收廣播socket消息
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                byte[] buf = new byte[2048];
                // UDP包封裝
                DatagramPacket recv = new DatagramPacket(buf, buf.length);
                while (!multicastSocket.isClosed()) {
                    try {
                        // 接收UDP報(bào)文
                        multicastSocket.receive(recv);
                        String msg = new String(recv.getData()).trim();
                        int i = msg.indexOf('\n');
                        if (i > 0) {
                            msg = msg.substring(0, i).trim();
                        }
                        // 多播消息接收處理
                        MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                        Arrays.fill(buf, (byte) 0);
                    } catch (Throwable e) {
                        if (!multicastSocket.isClosed()) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }, "DubboMulticastRegistryReceiver");
        thread.setDaemon(true);
        thread.start();
    } catch (IOException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    // 這里利用定時(shí)調(diào)度線程池,定時(shí)清理received緩存中不可用socket逊脯;默認(rèn)清理時(shí)間間隔60s;
    this.cleanPeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    if (url.getParameter("clean", true)) {
        this.cleanFuture = cleanExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    clean(); // Remove the expired
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected exception occur at clean expired provider, cause: " + t.getMessage(), t);
                }
            }
        }, cleanPeriod, cleanPeriod, TimeUnit.MILLISECONDS);
    } else {
        this.cleanFuture = null;
    }
}

receive方法實(shí)現(xiàn)比較簡(jiǎn)單优质,根據(jù)msg類型(通過(guò)消息前綴判斷)執(zhí)行相應(yīng)的注冊(cè)、注銷军洼、訂閱操作巩螃;重點(diǎn)關(guān)注registered、unregistered匕争、multicast三個(gè)方法(MulticastRegistry的注冊(cè)避乏、注銷、訂閱邏輯均通過(guò)這三個(gè)方法實(shí)現(xiàn))甘桑。

private void receive(String msg, InetSocketAddress remoteAddress) {
    if (logger.isInfoEnabled()) {
        logger.info("Receive multicast message: " + msg + " from " + remoteAddress);
    }
    // 廣播注冊(cè)消息
    if (msg.startsWith(Constants.REGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.REGISTER.length()).trim());
        registered(url);
    // 廣播注銷消息
    } else if (msg.startsWith(Constants.UNREGISTER)) {
        URL url = URL.valueOf(msg.substring(Constants.UNREGISTER.length()).trim());
        unregistered(url);
    // 廣播訂閱消息
    } else if (msg.startsWith(Constants.SUBSCRIBE)) {
        URL url = URL.valueOf(msg.substring(Constants.SUBSCRIBE.length()).trim());
        // 根據(jù)注冊(cè)的地址拍皮,發(fā)送單播、多播消息
        Set<URL> urls = getRegistered();
        if (CollectionUtils.isNotEmpty(urls)) {
            for (URL u : urls) {
                if (UrlUtils.isMatch(url, u)) {
                    String host = remoteAddress != null && remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : url.getIp();
                    // 發(fā)送單播跑杭,多播消息
                    if (url.getParameter("unicast", true) // Whether the consumer's machine has only one process
                            && !NetUtils.getLocalHost().equals(host)) { // Multiple processes in the same machine cannot be unicast with unicast or there will be only one process receiving information
                        unicast(Constants.REGISTER + " " + u.toFullString(), host);
                    } else {
                        multicast(Constants.REGISTER + " " + u.toFullString());
                    }
                }
            }
        }
    }/* else if (msg.startsWith(UNSUBSCRIBE)) {
    }*/
}

先來(lái)看registered方法铆帽,核心邏輯是將URL與subscried緩存中的URL進(jìn)行匹配,匹配成功則加入received德谅,同時(shí)同步給訂閱者(通過(guò)NotifyListener的notify方法),然后通知當(dāng)前l(fā)istener(在subscribe時(shí)wait*)

protected void registered(URL url) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            if (urls == null) {
                received.putIfAbsent(key, new ConcurrentHashSet<URL>());
                urls = received.get(key);
            }
            urls.add(url);
            List<URL> list = toList(urls);
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
                synchronized (listener) {
                    listener.notify();
                }
            }
        }
    }
}

再來(lái)看unRegistered爹橱,邏輯上大體與registered類似,將多播消息中的URL與subscried中URL匹配窄做,匹配成功則將該URL從received中剔除愧驱;若當(dāng)前received中該URL對(duì)應(yīng)URL列表為空,則重置該URL的protocol為empty椭盏;最后同步變更給訂閱者组砚。

protected void unregistered(URL url) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL key = entry.getKey();
        if (UrlUtils.isMatch(key, url)) {
            Set<URL> urls = received.get(key);
            if (urls != null) {
                urls.remove(url);
            }
            // received中url對(duì)應(yīng)URL列表為空,則直接重置該url協(xié)議為empty
            if (urls == null || urls.isEmpty()) {
                if (urls == null) {
                    urls = new ConcurrentHashSet<URL>();
                }
                URL empty = url.setProtocol(Constants.EMPTY_PROTOCOL);
                urls.add(empty);
            }
            List<URL> list = toList(urls);
            // 同步變更消息到各訂閱者
            for (NotifyListener listener : entry.getValue()) {
                notify(key, listener, list);
            }
        }
    }
}

最后來(lái)看multicast方法庸汗,邏輯比較簡(jiǎn)單惫确,即為發(fā)送多播消息到多播消息組(多播消息由構(gòu)造方法中創(chuàng)建的線程異步處理

private void multicast(String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("Send multicast message: " + msg + " to " + multicastAddress + ":" + multicastPort);
    }
    try {
        byte[] data = (msg + "\n").getBytes();
        DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort);
        multicastSocket.send(hi);
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}
2.3.2.2、其他邏輯

MulticastRegistry的其他邏輯包括doRegister蚯舱、doUnregister改化、doSubscribe、doUnsubscribe枉昏、destroy陈肛,前面四個(gè)方法的實(shí)現(xiàn)方式完全一致,即拼接并發(fā)送多播消息到多播組兄裂,這里以doRegister為例句旱,代碼如下:

@Override
public void doRegister(URL url) {
    multicast(Constants.REGISTER + " " + url.toFullString());
}

最后來(lái)看destroy阳藻,銷毀需要處理多播組、關(guān)閉所有線程池谈撒、關(guān)閉所有socket腥泥,來(lái)看代碼:

@Override
public void destroy() {
    super.destroy();
    try {
        // 取消定時(shí)清理任務(wù)
        ExecutorUtil.cancelScheduledFuture(cleanFuture);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // 退出多播組,并關(guān)閉socket
        multicastSocket.leaveGroup(multicastAddress);
        multicastSocket.close();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    // 關(guān)閉定時(shí)清理線程池,這里關(guān)注下線程池的優(yōu)雅關(guān)閉
    ExecutorUtil.gracefulShutdown(cleanExecutor, cleanPeriod);
}

來(lái)看下dubbo線程池的優(yōu)雅關(guān)閉:

public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        // 新任務(wù)不能再提交
        es.shutdown();
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        // 等待隊(duì)列內(nèi)剩余任務(wù)結(jié)束啃匿,立即關(guān)閉線程池
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    // 若線程池仍未關(guān)閉蛔外,則新建線程用于線程池的關(guān)閉
    if (!isTerminated(es)) {
        newThreadToCloseExecutor(es);
    }
}
// 專門用于關(guān)閉線程池的線程池shutdownExecutor
private static void newThreadToCloseExecutor(final ExecutorService es) {
        if (!isTerminated(es)) {
            shutdownExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        for (int i = 0; i < 1000; i++) {
                            es.shutdownNow();
                            if (es.awaitTermination(10, TimeUnit.MILLISECONDS)) {
                                break;
                            }
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            });
        }
    }

了解完MulticastRegistry的所有方法,我們來(lái)看整個(gè)MulticastRegistry中的數(shù)據(jù)流溯乒,執(zhí)行注冊(cè)操作(這里以register為例夹厌,其他操作類似)將數(shù)據(jù)多播至多播組,然后由deamon線程異步將數(shù)據(jù)同步至訂閱者裆悄,如下圖所示:

MulticastRegistry-dataflow (1).jpg

2.3.3矛纹、RedisRegistry

RedisRegistry,即使用Redis存儲(chǔ)URL數(shù)據(jù)的注冊(cè)中心光稼,這里從初始化或南、數(shù)據(jù)流以及核心方法等幾個(gè)方面進(jìn)行解析。RedisRegisty除了使用Redis緩存之外钟哥,還使用了Redis的消息隊(duì)列迎献,用于doSubscribe過(guò)程中的URL數(shù)據(jù)變更消息處理。

2.3.3.1腻贰、初始化

先來(lái)看RedisRegistry的初始化,大致可以分為父類構(gòu)造方法扒秸、注冊(cè)中心參數(shù)初始化播演、RedisPool連接池創(chuàng)建、過(guò)期調(diào)度線程池啟動(dòng)伴奥。父類構(gòu)造方法主要是AbstractRegistry構(gòu)造方法的調(diào)用写烤;參數(shù)初始化主要包括RedisPool連接池參數(shù)初始化、注冊(cè)中心重連時(shí)間間隔初始化拾徙、過(guò)期線程池調(diào)度時(shí)間間隔初始化等洲炊;RedisPool的創(chuàng)建比較簡(jiǎn)單,即直接取上一步中的參數(shù)創(chuàng)建RedisPool連接池(這里需要注意尼啡,會(huì)對(duì)同一個(gè)URL的多個(gè)backup地址單獨(dú)創(chuàng)建RedisPool暂衡,多個(gè)地址之間進(jìn)行了隔離,保證URL的可用性)崖瞭;過(guò)期調(diào)度線程池的調(diào)度比較容易理解狂巢,也就是說(shuō)在注冊(cè)中心構(gòu)造過(guò)程中已經(jīng)啟動(dòng)了對(duì)過(guò)期URL數(shù)據(jù)的定時(shí)清理,默認(rèn)調(diào)度間隔30s书聚。

RedisRegistry-init.jpg

然后來(lái)看構(gòu)造方法:

public RedisRegistry(URL url) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 借用對(duì)象池管理配置
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
    config.setTestOnReturn(url.getParameter("test.on.return", false));
    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
    // config參數(shù)配置唧领,略去

    // 支持的redis集群模式藻雌,failover和replicate
    String cluster = url.getParameter("cluster", "failover");
    if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
        throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
    }
    replicate = "replicate".equals(cluster);

    List<String> addresses = new ArrayList<>();
    addresses.add(url.getAddress());
    String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
    if (ArrayUtils.isNotEmpty(backups)) {
        addresses.addAll(Arrays.asList(backups));
    }

    for (String address : addresses) {
        int i = address.indexOf(':');
        String host;
        int port;
        if (i > 0) {
            host = address.substring(0, i);
            port = Integer.parseInt(address.substring(i + 1));
        } else {
            host = address;
            port = DEFAULT_REDIS_PORT;
        }
        // 每個(gè)地址對(duì)應(yīng)一個(gè)jedis連接池,URL的各地址之間互不影響斩个,保證可用性
        this.jedisPools.put(address, new JedisPool(config, host, port,
                url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                url.getParameter("db.index", 0)));
    }

    this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
        group = group + Constants.PATH_SEPARATOR;
    }
    // group = "/group/"
    this.root = group;

    // 配置過(guò)期定時(shí)調(diào)度
    this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    this.expireFuture = expireExecutor.scheduleWithFixedDelay(() -> {
        try {
            // 延遲過(guò)期
            deferExpired(); // Extend the expiration time
        } catch (Throwable t) { // Defensive fault tolerance
            logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
        }
    }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

// 延遲過(guò)期
private void deferExpired() {
        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
            JedisPool jedisPool = entry.getValue();
            try {
                try (Jedis jedis = jedisPool.getResource()) {
                    for (URL url : new HashSet<>(getRegistered())) {
                        if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                            String key = toCategoryPath(url);
                            // 延長(zhǎng)緩存過(guò)期時(shí)間胯杭,并發(fā)布隊(duì)列消息
                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                                jedis.publish(key, Constants.REGISTER);
                            }
                        }
                    }
                    // 如果開(kāi)啟了強(qiáng)制清理開(kāi)關(guān);則直接清理redis中數(shù)據(jù),并發(fā)布注銷消息
                    if (admin) {
                        clean(jedis);
                    }
                    // 無(wú)需創(chuàng)建副本受啥,則只寫單臺(tái)redis歉摧,直接break;
                    if (!replicate) {
                        break;//  If the server side has synchronized data, just write a single machine
                    }
                }
            } catch (Throwable t) {
                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
            }
        }
    }

再來(lái)看整個(gè)RedisRegistry中的數(shù)據(jù)流腔呜,可以發(fā)現(xiàn)叁温,RedisRegistry通過(guò)Redis緩存與Redis消息隊(duì)列,異步+同步的方式將數(shù)據(jù)同步至本地cache文件核畴、內(nèi)存緩存Properties以及具體的數(shù)據(jù)訂閱者膝但,如RegistryDirectory:


RedisRegistry-dataflow (1).jpg
2.3.3.2、注冊(cè)(doRegister)

了解完RedisRegistry中的數(shù)據(jù)流谤草,再來(lái)看注冊(cè)過(guò)程就比較容易理解了跟束。主要包括兩個(gè)核心操作:將當(dāng)前URL緩存至Redis;然后將URL信息發(fā)布至Redis消息隊(duì)列丑孩。直接來(lái)看代碼:

public void doRegister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    // 當(dāng)前URL的所有可用地址冀宴,遍歷,存入redis温学,并發(fā)布注冊(cè)消息
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.hset(key, value, expire);
                jedis.publish(key, Constants.REGISTER);
                success = true;
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}
2.3.3.3略贮、注銷(doUnregister)

注銷邏輯即注冊(cè)邏輯的反向操作,先刪除redis緩存仗岖,再發(fā)布注銷消息:

public void doUnregister(URL url) {
    String key = toCategoryPath(url);
    String value = url.toFullString();
    RpcException exception = null;
    boolean success = false;
    // 遍歷當(dāng)前URL的所有可用節(jié)點(diǎn)地址逃延,遍歷從redis中刪除,并發(fā)布注銷消息
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                jedis.hdel(key, value);
                jedis.publish(key, Constants.UNREGISTER);
                success = true;
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}
2.3.3.4轧拄、訂閱(doSubscribe)

訂閱邏輯稍微復(fù)雜揽祥,支持同步和異步方式將URL信息同步至訂閱者,如RegistryDirectory檩电;同步邏輯比較簡(jiǎn)單拄丰,將被訂閱的URL與redis中緩存URL進(jìn)行交叉過(guò)濾,最終通過(guò)父類notify方法(AbstractRegistry的notify方法)俐末,完成URL數(shù)據(jù)同步料按;異步邏輯由Notifier線程實(shí)現(xiàn),Notifier依照線性退避規(guī)則執(zhí)行鹅搪,執(zhí)行邏輯除了同步URL信息之外站绪,還會(huì)訂閱redis消息隊(duì)列并消費(fèi)隊(duì)列中消息,當(dāng)然丽柿,消費(fèi)的核心邏輯也是執(zhí)行doNotify方法恢准。

public void doSubscribe(final URL url, final NotifyListener listener) {
    String service = toServicePath(url);
    // 每個(gè)service對(duì)應(yīng)一個(gè)notifier線程
    Notifier notifier = notifiers.get(service);
    // 異步方式魂挂,創(chuàng)建Notifier線程,并啟動(dòng)馁筐,線性回避執(zhí)行涂召,數(shù)據(jù)流 : redis緩存、redis消息隊(duì)列 -> 訂閱者
    if (notifier == null) {
        Notifier newNotifier = new Notifier(service);
        notifiers.putIfAbsent(service, newNotifier);
        notifier = notifiers.get(service);
        // notifier線程創(chuàng)建后即啟動(dòng)
        if (notifier == newNotifier) {
            notifier.start();
        }
    }
    boolean success = false;
    RpcException exception = null;
    // 同步方式訂閱 數(shù)據(jù)流 redis緩存 -> 訂閱者敏沉,如RegistryDirector
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            try (Jedis jedis = jedisPool.getResource()) {
                // 區(qū)分所有服務(wù)與單個(gè)服務(wù)果正。
                if (service.endsWith(Constants.ANY_VALUE)) {
                    admin = true;
                    Set<String> keys = jedis.keys(service);
                    if (CollectionUtils.isNotEmpty(keys)) {
                        Map<String, Set<String>> serviceKeys = new HashMap<>();
                        for (String key : keys) {
                            String serviceKey = toServicePath(key);
                            Set<String> sk = serviceKeys.computeIfAbsent(serviceKey, k -> new HashSet<>());
                            sk.add(key);
                        }
                        for (Set<String> sk : serviceKeys.values()) {
                            doNotify(jedis, sk, url, Collections.singletonList(listener));
                        }
                    }
                } else {
                    doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Collections.singletonList(listener));
                }
                success = true;
                break; // Just read one server's data
            }
        } catch (Throwable t) { // Try the next server
            exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    // 異常處理邏輯略去
}

來(lái)看doNotify實(shí)現(xiàn)

private void doNotify(Jedis jedis, String key) {
    for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<>(getSubscribed()).entrySet()) {
        doNotify(jedis, Collections.singletonList(key), entry.getKey(), new HashSet<>(entry.getValue()));
    }
}

private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {
        if (keys == null || keys.isEmpty()
                || listeners == null || listeners.isEmpty()) {
            return;
        }
        long now = System.currentTimeMillis();
        List<URL> result = new ArrayList<>();
        List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
        String consumerService = url.getServiceInterface();
        for (String key : keys) {
            if (!Constants.ANY_VALUE.equals(consumerService)) {
                String providerService = toServiceName(key);
                if (!providerService.equals(consumerService)) {
                    continue;
                }
            }
            String category = toCategoryName(key);
            if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
                continue;
            }
            List<URL> urls = new ArrayList<>();
            // redis緩存URL信息與內(nèi)存中被訂閱的URL信息進(jìn)行交叉過(guò)濾
            Map<String, String> values = jedis.hgetAll(key);
            if (CollectionUtils.isNotEmptyMap(values)) {
                for (Map.Entry<String, String> entry : values.entrySet()) {
                    URL u = URL.valueOf(entry.getKey());
                    if (!u.getParameter(Constants.DYNAMIC_KEY, true)
                            || Long.parseLong(entry.getValue()) >= now) {
                        if (UrlUtils.isMatch(url, u)) {
                            // 與url匹配的緩存URL放入待通知列表
                            urls.add(u);
                        }
                    }
                }
            }
            // 若無(wú)有效URL,則填充urls地址為任意值*
            if (urls.isEmpty()) {
                urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
                        .setAddress(Constants.ANYHOST_VALUE)
                        .setPath(toServiceName(key))
                        .addParameter(Constants.CATEGORY_KEY, category));
            }
            result.addAll(urls);
            if (logger.isInfoEnabled()) {
                logger.info("redis notify: " + key + " = " + urls);
            }
        }
        if (CollectionUtils.isEmpty(result)) {
            return;
        }
            // 由父類nofity方法完成最終URL數(shù)據(jù)的同步
        for (NotifyListener listener : listeners) {
            notify(url, listener, result);
        }
    }

再來(lái)看Notifier實(shí)現(xiàn)

private class Notifier extends Thread {

    private final String service;
    private final AtomicInteger connectSkip = new AtomicInteger();
    private final AtomicInteger connectSkipped = new AtomicInteger();
    private volatile Jedis jedis;
    private volatile boolean first = true;
    private volatile boolean running = true;
    private volatile int connectRandom;

    public Notifier(String service) {
        super.setDaemon(true);
        super.setName("DubboRedisSubscribe");
        this.service = service;
    }

    private void resetSkip() {
        connectSkip.set(0);
        connectSkipped.set(0);
        connectRandom = 0;
    }

    // 首次結(jié)果為false盟迟;線性退避算法
    private boolean isSkip() {
        // 初始值均為0
        int skip = connectSkip.get(); // Growth of skipping times
        if (skip >= 10) { 
            if (connectRandom == 0) {
                connectRandom = ThreadLocalRandom.current().nextInt(10);
            }
            skip = 10 + connectRandom;
        }
        // 初始值 false秋泳,
        // 第一次:false,0-1
        // 第二次:true攒菠,1-1
        // 第三次:false迫皱,0-2
        // 第四次:true,1-2
        // 第五次:true,2-2
        // 第五次:false辖众,0-3
        if (connectSkipped.getAndIncrement() < skip) { // Check the number of skipping times
            return true;
        }

        // skip 自增
        connectSkip.incrementAndGet();
        // skiped重置0
        connectSkipped.set(0);
        connectRandom = 0;
        return false;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // 線性回避卓起,是否跳過(guò)
                if (!isSkip()) {
                    try {
                        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {
                            JedisPool jedisPool = entry.getValue();
                            try {
                                jedis = jedisPool.getResource();
                                try {
                                    // service是具體類型還是所有服務(wù)
                                    if (service.endsWith(Constants.ANY_VALUE)) {
                                        if (!first) {
                                            first = false;
                                            Set<String> keys = jedis.keys(service);
                                            if (CollectionUtils.isNotEmpty(keys)) {
                                                for (String s : keys) {
                                                    //  同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
                                                    doNotify(jedis, s);
                                                }
                                            }
                                            // 同步完之后,重置skip
                                            resetSkip();
                                        }
                                        // 處理Redis消息隊(duì)列中所有與service匹配的消息
                                        jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                    } else {
                                        if (!first) {
                                            first = false;
                                            //  同步redis中緩存數(shù)據(jù)至各訂閱者以及本地cache文件
                                            doNotify(jedis, service);
                                            resetSkip();
                                        }
                                        // 處理Redis消息隊(duì)列中所有與service匹配的消息凹炸,同樣的同步至各數(shù)據(jù)訂閱者以及本地cache文件戏阅。
                                        jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
                                    }
                                    break;
                                } finally {
                                    jedis.close();
                                }
                            } catch (Throwable t) { // Retry another server
                                logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                                // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                                sleep(reconnectPeriod);
                            }
                        }
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                        sleep(reconnectPeriod);
                    }
                }
            } catch (Throwable t) {
                logger.error(t.getMessage(), t);
            }
        }
    }

    public void shutdown() {
        try {
            running = false;
            jedis.disconnect();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}

這里順便提一下NotifySub,繼承自JedisPubSub啤它,用于redis消息隊(duì)列中消息的消費(fèi)奕筐,核心邏輯在onMessage方法:

@Override
public void onMessage(String key, String msg) {
    if (logger.isInfoEnabled()) {
        logger.info("redis event: " + key + " = " + msg);
    }
    // 只處理注冊(cè)、注銷類消息
    if (msg.equals(Constants.REGISTER)
            || msg.equals(Constants.UNREGISTER)) {
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                doNotify(jedis, key);
            } finally {
                jedis.close();
            }
        } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
            logger.error(t.getMessage(), t);
        }
    }
}

2.3.4蚕键、ZookeeperRegistry

ZookeeperRegistry救欧,即zk注冊(cè)中心,也是dubbo官方默認(rèn)采用的注冊(cè)中心锣光。先來(lái)看構(gòu)造方法:

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 初始化zkClient
    zkClient = zookeeperTransporter.connect(url);
    // 若是重連狀態(tài),執(zhí)行恢復(fù)邏輯铝耻,即將已注冊(cè)過(guò)的URL誊爹,放入retry列表,重新注冊(cè)瓢捉;
    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

構(gòu)造方法里有一個(gè)參數(shù)频丘,ZookeeperTransporter,先來(lái)看一下這個(gè)ZookeeperTransporter泡态。

2.3.4.1甥材、ZookeeperTransporter

ZookeeperTransporter是dubbo對(duì)zk客戶端的適配接口蚕涤,支持SPI(方法級(jí)),內(nèi)部只有一個(gè)connect方法鱼鸠,返回ZookeeperClient實(shí)例。我們知道厢呵,zk客戶端有常用的兩個(gè)實(shí)現(xiàn),分別是:

  1. Curator,Netflix公司開(kāi)源的一套zookeeper客戶端框架,jar包gav如下:

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>4.0.1</version>
    </dependency>
    
  2. Zkclient,Github上一個(gè)開(kāi)源的Zookeeper客戶端,在Zookeeper原生 API接口之上進(jìn)行了包裝员萍,是一個(gè)更加易用的Zookeeper客戶端,jar包gav如下:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.2</version>
    </dependency>
    

    dubbo中借助這兩種客戶端組件,ZookeeperClient的實(shí)現(xiàn)有CuratorZookeeperClient拣度、ZkclientZookeeperClient碎绎,內(nèi)部通過(guò)客戶端組件完成zk的連接、監(jiān)聽(tīng)等動(dòng)作抗果,具體邏輯這里不做詳細(xì)解析筋帖。繼續(xù)來(lái)看ZookeeperTransporter,基于接口的基類實(shí)現(xiàn)AbstractZookeeperTransporter冤馏,實(shí)現(xiàn)了connect方法日麸,同時(shí)定義模板方法createZookeeperClient由具體的Transporter(CuratorZookeeperTransporter、ZkclientZookeeperTransporter宿接,邏輯比較簡(jiǎn)單赘淮,省略)實(shí)現(xiàn)。直接來(lái)看基類的connect方法:

    @Override
    public ZookeeperClient connect(URL url) {
        ZookeeperClient zookeeperClient;
        // backUrl解析
        List<String> addressList = getURLBackupAddress(url);
        // The field define the zookeeper server , including protocol, host, port, username, password
        // fetchAndUpdateZookeeperClientCache 邏輯比較簡(jiǎn)單睦霎,從緩存里取梢卸,取不到則返回null;
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        // avoid creating too many connections副女, so add lock蛤高,加鎖,防并發(fā)碑幅。
        synchronized (zookeeperClientMap) {
            if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
                logger.info("find valid zookeeper client from the cache for address: " + url);
                return zookeeperClient;
            }
            // 緩存沒(méi)取到戴陡,創(chuàng)建zkClient,并緩存到map
            zookeeperClient = createZookeeperClient(toClientURL(url));
            logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
            writeToClientMap(addressList, zookeeperClient);
        }
        return zookeeperClient;
    }
    
2.3.4.2沟涨、doRegister方法

基類FailbackRegistry的模板方法實(shí)現(xiàn)恤批,邏輯非常簡(jiǎn)單:

@Override
public void doRegister(URL url) {
    try {
        // 創(chuàng)建zk臨時(shí)節(jié)點(diǎn),節(jié)點(diǎn)目錄類似 /dubbo/xxx/xxx/xxx.xxx.xxxService
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.3裹赴、doUnregister方法

基類FailbackRegistry的模板方法實(shí)現(xiàn)喜庞,邏輯同樣非常簡(jiǎn)單:

@Override
public void doUnregister(URL url) {
    try {
        // 刪除zk節(jié)點(diǎn)
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.4、doSubscribe方法

基類FailbackRegistry的模板方法實(shí)現(xiàn)棋返,邏輯相對(duì)復(fù)雜延都,主要分為幾部分1)zkListener的初始化,2)遞歸訂閱url變更信息睛竣,創(chuàng)建zk永久節(jié)點(diǎn)晰房,3)執(zhí)行父類notify(邏輯參考AbstractRegistry解析部分),代碼如下:

@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 無(wú)指定服務(wù)
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // 根節(jié)點(diǎn)/dubbo
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                listeners = zkListeners.get(url);
            }
            // 初始化zkListener
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                // 初始化anyServices 注冊(cè) ChildListener到listener內(nèi)存緩存,訂閱childChange變更殊者,即當(dāng)執(zhí)行childChanged時(shí)与境,完成對(duì)所有child的訂閱。
                listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            // 遞歸訂閱url變更消息幽污,最終會(huì)走到else嚷辅,結(jié)束
                            subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), listener);
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            // 創(chuàng)建永久節(jié)點(diǎn)/dubbo
            zkClient.create(root, false);
            // 訂閱URL變更信息
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 遞歸訂閱url變更消息,最終會(huì)走到else距误,結(jié)束
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            //指定URL變更處理
            List<URL> urls = new ArrayList<>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    // childChange事件簸搞,只做notify到指定listener
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                // 監(jiān)聽(tīng)該path
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 父類AbstractRegistry的notify邏輯,略去
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
2.3.4.5准潭、doUnsubscribe方法

基類FailbackRegistry的模板方法實(shí)現(xiàn)趁俊,直接上代碼:

@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if (listeners != null) {
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                // 邏輯比較簡(jiǎn)單,即不再訂閱對(duì)應(yīng)的listener
                zkClient.removeChildListener(root, zkListener);
            } else {
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}
2.3.4.6刑然、lookup方法

基類AbstractRegistry的模板方法實(shí)現(xiàn),核心邏輯是從zk中查詢指定URL對(duì)應(yīng)地址的所有可用URL(不同Category)寺擂,直接來(lái)看代碼:

@Override
public List<URL> lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        List<String> providers = new ArrayList<>();
        for (String path : toCategoriesPath(url)) {
            List<String> children = zkClient.getChildren(path);
            if (children != null) {
                providers.addAll(children);
            }
        }
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

小結(jié)

本文重點(diǎn)解析了dubbo中Registry的相關(guān)核心實(shí)現(xiàn),從RegistryFactory到Registry泼掠,dubbo抽象了一系列注冊(cè)中心怔软,為用戶自定義擴(kuò)展提供了非常優(yōu)良的入口;同時(shí)择镇,dubbo為我們提供了基于緩存挡逼、多播、zk等的注冊(cè)中心實(shí)現(xiàn)腻豌,非常方便家坎,不得不感嘆設(shè)計(jì)的非常好。

注:源碼版本 2.7.1吝梅。8月以來(lái)虱疏,接連經(jīng)歷了迎接小生命的驚喜,跳槽后的適應(yīng)期苏携,中間耽誤了挺長(zhǎng)一段時(shí)間做瞪,后面我會(huì)努力持續(xù)更新的,come on右冻。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末穿扳,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子国旷,更是在濱河造成了極大的恐慌,老刑警劉巖茫死,帶你破解...
    沈念sama閱讀 217,084評(píng)論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件跪但,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)屡久,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門忆首,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人被环,你說(shuō)我怎么就攤上這事糙及。” “怎么了筛欢?”我有些...
    開(kāi)封第一講書人閱讀 163,450評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵浸锨,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我版姑,道長(zhǎng)柱搜,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書人閱讀 58,322評(píng)論 1 293
  • 正文 為了忘掉前任剥险,我火速辦了婚禮聪蘸,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘表制。我一直安慰自己健爬,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,370評(píng)論 6 390
  • 文/花漫 我一把揭開(kāi)白布么介。 她就那樣靜靜地躺著娜遵,像睡著了一般。 火紅的嫁衣襯著肌膚如雪夭拌。 梳的紋絲不亂的頭發(fā)上魔熏,一...
    開(kāi)封第一講書人閱讀 51,274評(píng)論 1 300
  • 那天,我揣著相機(jī)與錄音鸽扁,去河邊找鬼蒜绽。 笑死,一個(gè)胖子當(dāng)著我的面吹牛桶现,可吹牛的內(nèi)容都是我干的躲雅。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼骡和,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼相赁!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起慰于,我...
    開(kāi)封第一講書人閱讀 38,980評(píng)論 0 275
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤钮科,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后婆赠,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體绵脯,經(jīng)...
    沈念sama閱讀 45,414評(píng)論 1 313
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,599評(píng)論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了蛆挫。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片赃承。...
    茶點(diǎn)故事閱讀 39,773評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖悴侵,靈堂內(nèi)的尸體忽然破棺而出瞧剖,到底是詐尸還是另有隱情,我是刑警寧澤可免,帶...
    沈念sama閱讀 35,470評(píng)論 5 344
  • 正文 年R本政府宣布抓于,位于F島的核電站,受9級(jí)特大地震影響巴元,放射性物質(zhì)發(fā)生泄漏毡咏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,080評(píng)論 3 327
  • 文/蒙蒙 一逮刨、第九天 我趴在偏房一處隱蔽的房頂上張望呕缭。 院中可真熱鬧,春花似錦修己、人聲如沸恢总。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,713評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)片仿。三九已至,卻和暖如春尤辱,著一層夾襖步出監(jiān)牢的瞬間砂豌,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,852評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工光督, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留阳距,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,865評(píng)論 2 370
  • 正文 我出身青樓结借,卻偏偏與公主長(zhǎng)得像筐摘,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子船老,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,689評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容