Dubbo2.6.x—注冊(cè)中心源碼分析 dubbo-registry模塊 (api and zookeeper)

文章有點(diǎn)長(zhǎng)兄渺,親侵浸,要慢慢看!

1. 概述

1.1 注冊(cè)中心作用

  • 在Dubbo中础嫡,注冊(cè)中心為核心模塊指么,Dubbo通過(guò)注冊(cè)中心實(shí)現(xiàn)各個(gè)服務(wù)之間的注冊(cè)與發(fā)現(xiàn)等功能,而本次源碼的分析為registry模塊的api和zookeeper的實(shí)現(xiàn)榴鼎。
  • 服務(wù)的提供者和消費(fèi)者都需要把自己注冊(cè)到注冊(cè)中心伯诬,提供者讓消費(fèi)者感知到服務(wù)存在,從而消費(fèi)者發(fā)起遠(yuǎn)程調(diào)用巫财,也讓服務(wù)治理中心感知到有服務(wù)提供者上線盗似;消費(fèi)者則是讓服務(wù)治理中心可以發(fā)現(xiàn)自己。

1.2 Zookeeper

  • Zookeeper是一個(gè)提供分布式協(xié)調(diào)服務(wù)的開(kāi)源軟件平项,常用于解決分布式應(yīng)用中經(jīng)常遇到的一些數(shù)據(jù)管理問(wèn)題赫舒。Zookeeper功能非常強(qiáng)大,可以實(shí)現(xiàn)如分布式應(yīng)用配置管理闽瓢、統(tǒng)一命名服務(wù)接癌、狀態(tài)同步服務(wù)、集群管理等功能扣讼。關(guān)于Zookeeper缺猛,大家如果想了解可以關(guān)注一下自行去搜索一下。

1.3 registry模塊

  • 整個(gè)registry下的模塊


    dubbo-registry
  • api是注冊(cè)中心所有的API和抽象類(lèi)實(shí)現(xiàn)

  • default是注冊(cè)中心的內(nèi)存實(shí)現(xiàn)

  • zookeeper、redis荔燎、nacos就是基于不同的組件的實(shí)現(xiàn)

  • multicast是通過(guò)廣播實(shí)現(xiàn)

1.4 注冊(cè)中心工作流程

image

這張圖相信只要是用過(guò)的都不陌生耻姥,掛在dubbo.io的官網(wǎng)掛了很久很久了。那么這個(gè)流程主要是說(shuō)了什么呢有咨?

  • 0.是生產(chǎn)者(服務(wù)提供方)初始化咏闪,就好比你寫(xiě)了個(gè)服務(wù)實(shí)現(xiàn)然后啟動(dòng)起來(lái)。
  • 1.是服務(wù)提供方向啟動(dòng)器起來(lái)過(guò)后摔吏,就會(huì)向注冊(cè)中心提交自己的服務(wù)信息
  • 2.是消費(fèi)者(服務(wù)消費(fèi)方)向注冊(cè)中心提交訂閱請(qǐng)求鸽嫂。就是你寫(xiě)了一個(gè)業(yè)務(wù)需要用到一個(gè)生產(chǎn)者服務(wù),這個(gè)時(shí)候你需要提前打招呼征讲,我需要它据某,有它的消息的時(shí)候讓注冊(cè)中心告訴你他的信息。
  • 3.這個(gè)時(shí)候當(dāng)服務(wù)提供者離開(kāi)或者是有新的服務(wù)提供者加入诗箍,注冊(cè)中心就會(huì)將變化的信息發(fā)送給消費(fèi)者癣籽。
  • 4.消費(fèi)者知道了生產(chǎn)者的信息,要用的時(shí)候就直接調(diào)用滤祖,注意這里的調(diào)用是不經(jīng)過(guò)注冊(cè)中心的筷狼,而是直接同步的網(wǎng)絡(luò)調(diào)用。

2. dubbo-registry-api

  • api層主要是注冊(cè)中心所有API的抽象實(shí)現(xiàn)類(lèi)匠童,并不是實(shí)際提供服務(wù)的組件埂材。
  • 模塊關(guān)系圖


    image
  • 類(lèi)關(guān)系圖


    image
  • 目錄結(jié)構(gòu)


    image

2.1 Registry的相關(guān)實(shí)現(xiàn)

  • 由類(lèi)的關(guān)系圖科看到Registry的實(shí)現(xiàn)關(guān)系,我們接下來(lái)就分析下各個(gè)接口和這個(gè)類(lèi)
image

2.1.1 RegistryService

  • 注冊(cè)中心模塊的服務(wù)接口:提供了注冊(cè)汤求、取消注冊(cè)俏险、訂閱、取消訂閱扬绪、查詢(xún)符合條件的已注冊(cè)數(shù)據(jù)竖独。
  • 雖然官方有解釋這個(gè)的地方但是還是復(fù)制一下方法解釋如下,官方地址是:http://dubbo.apache.org/zh-cn/docs/dev/impls/registry.html
public interface RegistryService {
    /**
     * 注冊(cè)服務(wù).
     * @param url 注冊(cè)信息挤牛,不允許為空莹痢,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     */
    void register(URL url);
 
    /**
     * 取消注冊(cè)服務(wù).
     * @param url 注冊(cè)信息,不允許為空墓赴,如:dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     */
    void unregister(URL url);
 
    /**
     * 訂閱服務(wù).
     * @param listener 變更事件監(jiān)聽(tīng)器竞膳,不允許為空
     */
    void subscribe(URL url, NotifyListener listener);
 
    /**
     * 取消訂閱服務(wù).
     * @param url 訂閱條件,不允許為空竣蹦,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @param listener 變更事件監(jiān)聽(tīng)器顶猜,不允許為空
     */
    void unsubscribe(URL url, NotifyListener listener);
 
    /**
     * 查詢(xún)注冊(cè)列表,與訂閱的推模式相對(duì)應(yīng)痘括,這里為拉模式,只返回一次結(jié)果。
     * 
     * @see org.apache.dubbo.registry.NotifyListener#notify(List)
     * @param url 查詢(xún)條件纲菌,不允許為空挠日,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @return 已注冊(cè)信息列表,可能為空翰舌,含義同{@link org.apache.dubbo.registry.NotifyListener#notify(List<URL>)}的參數(shù)嚣潜。
     */
    List<URL> lookup(URL url);
}

2.1.2 Node (不在api中定義,在common模塊中)

  • 節(jié)點(diǎn)的接口 里面聲明了一些關(guān)于節(jié)點(diǎn)的操作方法
public interface Node {

    /**
     * 獲取節(jié)點(diǎn)Url
     */
    URL getUrl();

    /**
     * 是否可用
     */
    boolean isAvailable();

    /**
     * 銷(xiāo)毀節(jié)點(diǎn)
     */
    void destroy();

}

2.1.2 Registry

  • 這個(gè)接口其實(shí)就是把節(jié)點(diǎn)以及注冊(cè)中心服務(wù)的方法放在了一起
public interface Registry extends Node, RegistryService {
}

2.1.3 AbstractRegistry

  • AbstractRegistry實(shí)現(xiàn)了Registry接口,為減輕注冊(cè)中心的壓力椅贱,在該類(lèi)中實(shí)現(xiàn)了把本地URL緩存到property文件中的機(jī)制懂算,并且實(shí)現(xiàn)了注冊(cè)中心的注冊(cè)、訂閱等方法庇麦。
  • 看下類(lèi)圖
image
  • 首先是抽象類(lèi)的屬性
    // url地址分隔符计技,用于文件緩存,服務(wù)提供程序url分隔
    private static final char URL_SEPARATOR = ' ';
    // URL地址分隔的正則表達(dá)式山橄,用于分析文件緩存中的服務(wù)提供程序URL列表
    private static final String URL_SPLIT = "\\s+";
    // 日志輸出
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // 本地磁盤(pán)緩存垮媒,其中的特殊key為registies是記錄注冊(cè)表中心列表,其他是服務(wù)提供者的李彪
    private final Properties properties = new Properties();
    // 文件緩存寫(xiě)入執(zhí)行器 提供一個(gè)線程的線程池 
    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
    // 是否同步保存文件標(biāo)志
    private final boolean syncSaveFile;
    // 這個(gè)是緩存的版本號(hào)
    private final AtomicLong lastCacheChanged = new AtomicLong();
    // 這個(gè)是已經(jīng)注冊(cè)的URL集合航棱,不僅僅是服務(wù)提供者的睡雇,也可以是服務(wù)消費(fèi)者的
    private final Set<URL> registered = new ConcurrentHashSet<URL>();

    // 已訂閱的url 值為url的監(jiān)聽(tīng)器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
   
    // 消費(fèi)者或服務(wù)治理服務(wù)獲取注冊(cè)信息后的緩存對(duì)象
    // 內(nèi)存中服務(wù)器緩存的notified對(duì)象是ConcurrentHashMap里面嵌套了一個(gè)Map,
    // 外層Map的Key是消費(fèi)者的URL饮醇,
    // 內(nèi)層的Map的key是分類(lèi)它抱,包括provider,consumer朴艰,routes抗愁,configurators四種,
    // value則對(duì)應(yīng)服務(wù)列表呵晚,沒(méi)有服務(wù)提供者提供服務(wù)的URL蜘腌,會(huì)以一個(gè)特別的empty://前綴開(kāi)頭
    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
    // 注冊(cè)中心的URL
    private URL registryUrl;
    // 本地磁盤(pán)緩存文件保存的是注冊(cè)中心的數(shù)據(jù)
    private File file;
2.1.3.1 構(gòu)造方法
    public AbstractRegistry(URL url) {
        // 設(shè)置注冊(cè)中心的地址URL
        setUrl(url);
        // 從URL參數(shù)中獲取是否同步保存的狀態(tài),URL中如果不包含饵隙,那就設(shè)置默認(rèn)值為false
        syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
        // 獲取文件路徑
        String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");

        // 開(kāi)始讀入文件
        File file = null;
        // 不存在就拋出異常
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        // 把文件對(duì)象放到屬性上
        this.file = file;
        // 加載文件中的參數(shù)放入Properties撮珠,Properties繼承HashTable蛮放。
        loadProperties();
        // 通知監(jiān)聽(tīng)器 URL變化 見(jiàn)下面notify的源碼
        notify(url.getBackupUrls());
    }
private void loadProperties() {
        if (file != null && file.exists()) {
            InputStream in = null;
            try {
                // 把文件中的key-value讀進(jìn)來(lái)
                in = new FileInputStream(file);
                // Properties是一個(gè)繼承HashTable的類(lèi).
                // 這個(gè)地方就是按行讀入涧卵,util里面的類(lèi)锅棕,里面調(diào)用了一個(gè)load0 方法會(huì)把key和value做分割然后放入Properties中袭景,汗菜。
                properties.load(in);
                if (logger.isInfoEnabled()) {
                    logger.info("Load registry store file " + file + ", data: " + properties);
                }
            } catch (Throwable e) {
                logger.warn("Failed to load registry store file " + file, e);
            } finally {
                // 關(guān)閉流
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    }
2.1.3.2 lookup
  • 獲得消費(fèi)者url訂閱的服務(wù)URL列表
    @Override
    public List<URL> lookup(URL url) {
        // 查找的結(jié)果數(shù)據(jù)
        List<URL> result = new ArrayList<URL>();
        // 獲取注冊(cè)信息中的分類(lèi)服務(wù)列表信息
        Map<String, List<URL>> notifiedUrls = getNotified().get(url);
        // 如果該消費(fèi)者訂閱了服務(wù)
        if (notifiedUrls != null && notifiedUrls.size() > 0) {
            for (List<URL> urls : notifiedUrls.values()) {
                for (URL u : urls) {
                    // 把非空的加入結(jié)果集中
                    if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                        result.add(u);
                    }
                }
            }
        } else {
            // 如果沒(méi)有訂閱服務(wù)
            // 使用原子類(lèi)以保證在獲取注冊(cè)在注冊(cè)中心的服務(wù)url時(shí)能夠保證是最新的url集合
            final AtomicReference<List<URL>> reference = new AtomicReference<List<URL>>();
            // 通知監(jiān)聽(tīng)器地淀。當(dāng)收到服務(wù)變更通知時(shí)觸發(fā)
            NotifyListener listener = new NotifyListener() {
                @Override
                public void notify(List<URL> urls) {
                    reference.set(urls);
                }
            };
            // 添加這個(gè)服務(wù)的監(jiān)聽(tīng)器
            subscribe(url, listener); // Subscribe logic guarantees the first notify to return
            List<URL> urls = reference.get();
            // 然后把非空結(jié)果放入結(jié)果集中
            if (urls != null && !urls.isEmpty()) {
                for (URL u : urls) {
                    if (!Constants.EMPTY_PROTOCOL.equals(u.getProtocol())) {
                        result.add(u);
                    }
                }
            }
        }
        return result;
    }
2.1.3.3 register and unregister
  • url注冊(cè)和取消注冊(cè)代碼很簡(jiǎn)單梧疲,就是向registered中add或者remove url
    @Override
    public void register(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("register url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Register: " + url);
        }
        registered.add(url);
    }

    @Override
    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
        registered.remove(url);
    }
2.1.3.4 notify
    protected void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) return;
        // 遍歷已訂閱的URL
        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
            URL url = entry.getKey();

            if (!UrlUtils.isMatch(url, urls.get(0))) {
                continue;
            }
            // 通知URL對(duì)應(yīng)的監(jiān)聽(tīng)器
            Set<NotifyListener> listeners = entry.getValue();
            if (listeners != null) {
                for (NotifyListener listener : listeners) {
                    try {
                        // 通知監(jiān)聽(tīng)器擦耀,看下方代碼注釋
                        notify(url, listener, filterEmpty(url, urls));
                    } catch (Throwable t) {
                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.isEmpty())
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        Map<String, List<URL>> result = new HashMap<String, List<URL>>();
        // 將url進(jìn)行分類(lèi)
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                // 根據(jù)不同的category分別放到不同List中處理 以category的值做分類(lèi)
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        // 沒(méi)有分類(lèi)結(jié)果就直接return
        if (result.size() == 0) {
            return;
        }
        // 獲得消費(fèi)者被通知的url的Map
        Map<String, List<URL>> categoryNotified = notified.get(url);
        // 如果沒(méi)有 就創(chuàng)建一個(gè)
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            // 創(chuàng)建過(guò)后再獲取
            categoryNotified = notified.get(url);
        }
        // 發(fā)送URL變化給監(jiān)聽(tīng)器
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            // 把分類(lèi)標(biāo)實(shí)和分類(lèi)后的列表放入notified的value中 覆蓋到 `notified`
            // 當(dāng)分類(lèi)的數(shù)據(jù)為空饼酿,依然有urls 榕酒。不過(guò)其中的urls[0].protocol是empty胚膊,以此來(lái)處理所有服務(wù)提供者為空時(shí)的情況。
            categoryNotified.put(category, categoryList);
            // 保存一份到文件緩存中 中間做的 就是解析出參數(shù)然后同步或者異步保存到文件中
            saveProperties(url);
            // 通知監(jiān)聽(tīng)器
            listener.notify(categoryList);
        }
    }
2.1.3.5 subscribe and unsubscribe
  • 注冊(cè)中心服務(wù)實(shí)現(xiàn)的訂閱和取消訂閱

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        //  獲得url已經(jīng)訂閱的服務(wù)的監(jiān)聽(tīng)器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = subscribed.get(url);
        }
        // 然后把listener添加到上
        listeners.add(listener);
    }

    @Override
    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        //  獲得url已經(jīng)訂閱的服務(wù)的監(jiān)聽(tīng)器集合
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            // 然后移除
            listeners.remove(listener);
        }
    }
2.1.3.6 recover
  • 注冊(cè)中心的連接斷開(kāi)后恢復(fù)時(shí)調(diào)用的方法想鹰,里面其實(shí)就是注冊(cè)和訂閱
    protected void recover() throws Exception {
        // register
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                //調(diào)用的上面的注冊(cè)方法
                register(url);
            }
        }
        // subscribe
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    // 調(diào)用上面的訂閱方法
                    subscribe(url, listener);
                }
            }
        }
    }
2.1.3.7 destory
  • 這個(gè)方法是在進(jìn)程關(guān)閉時(shí)紊婉,去取消注冊(cè)和訂閱,實(shí)際上就是調(diào)用unregister和unsubscribe
    @Override
    public void destroy() {
        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        // 獲取以注冊(cè)的URL
        Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<URL>(getRegistered())) {
                if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                    try {
                        // 取消注冊(cè)
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // 獲取已訂閱的URL以及監(jiān)聽(tīng)器
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        // 去取消訂閱
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }

2.1.4 FailbackRegistry

  • 這個(gè)類(lèi)其實(shí)是為AbstractRegistry增加了失敗重試的機(jī)制作為抽象能力辑舷,后面不同的注冊(cè)中心具體實(shí)現(xiàn)繼承了這個(gè)類(lèi)就可以直接使用這個(gè)能力喻犁。
  • 類(lèi)圖


    image
  • 常規(guī)套路 類(lèi)的屬性
    // Scheduled executor service
    // 經(jīng)過(guò)固定時(shí)間后(默認(rèn)是5s),調(diào)用FailbackRegistry#retry方法
    private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

    // Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
    // 失敗重試計(jì)時(shí)器何缓,定期檢查是否有失敗請(qǐng)求肢础,如果有,則無(wú)限制重試
    private final ScheduledFuture<?> retryFuture;
    // 注冊(cè)失敗的集合
    private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
    // 取消注冊(cè)失敗的集合
    private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
    // 發(fā)起訂閱失敗的監(jiān)聽(tīng)器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    // 取消訂閱失敗的監(jiān)聽(tīng)器集合
    private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
    // 通知失敗的URL集合
    private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
    /**
     * The time in milliseconds the retryExecutor will wait
     * RetryExecutor將等待的時(shí)間(毫秒)
     */
    private final int retryPeriod;
2.1.4.1 構(gòu)造方法
    public FailbackRegistry(URL url) {
        super(url);
        // 獲取重試的時(shí)間 如果沒(méi)有就設(shè)置成默認(rèn)的 DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 設(shè)置重試任務(wù) 里面就是調(diào)用retry方法 見(jiàn)下方retry方法的解析
        this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                // Check and connect to the registry
                try {
                    retry();
                } catch (Throwable t) { // Defensive fault tolerance
                    logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
                }
            }
        }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
    }
2.1.4.2 register and unregister 碌廓、 subscribe and unsubscribe
  • 注冊(cè)和取消注冊(cè)
    @Override
    public void register(URL url) {
        // 緩存等注冊(cè)操作 見(jiàn)AbstractRegistry
        super.register(url);
        // 在失敗集合中將這個(gè)移除
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // Sending a registration request to the server side
            // 向服務(wù)器發(fā)送注冊(cè)請(qǐng)求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 開(kāi)啟了啟動(dòng)時(shí)就檢測(cè)传轰,直接拋異常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // Record a failed registration request to a failed list, retry regularly
            // 記錄失敗的url
            failedRegistered.add(url);
        }
    }
  • 后面的unregister方法,subscribe unsubscribe都類(lèi)似 可以看下源碼氓皱, 中間的doXXXX這幾個(gè)方法都是abstract方法等著后面不同的服務(wù)來(lái)實(shí)現(xiàn)路召。
2.1.4.3 notify
  • notify則與上面的 四個(gè)方法不同,它是默認(rèn)調(diào)用的父類(lèi)AbstractRegistry的notify方法

    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // Record a failed registration request to a failed list, retry regularly
            // 將失敗的注冊(cè)請(qǐng)求記錄到失敗列表波材,定期重試
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
            if (listeners == null) {
                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                listeners = failedNotified.get(url);
            }
            listeners.put(listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        // 注意 這個(gè)是調(diào)用父類(lèi)的
        super.notify(url, listener, urls);
    }

2.1.4.4 recover
  • recover方法也區(qū)別于AbstractRegistry股淡,他是直接添加到失敗重試的集合中,讓定時(shí)任務(wù)自己去重新注冊(cè)和訂閱
@Override
    protected void recover() throws Exception {
        // register
        // 把已注冊(cè)的添加到失敗重試的列表中
        Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
        if (!recoverRegistered.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover register url " + recoverRegistered);
            }
            for (URL url : recoverRegistered) {
                // 添加到失敗重試注冊(cè)列表
                failedRegistered.add(url);
            }
        }
        // subscribe
        // 把已訂閱的添加到失敗重試的列表中
        Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!recoverSubscribed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Recover subscribe url " + recoverSubscribed.keySet());
            }
            for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    // 添加到失敗重試訂閱列表
                    addFailedSubscribed(url, listener);
                }
            }
        }
    }
2.1.4.5 retry
  • 重試的方法廷区,其實(shí)也比較簡(jiǎn)單唯灵,就是把集合中的數(shù)據(jù)拿出來(lái),該做注冊(cè)做注冊(cè)隙轻,該訂閱就訂閱埠帕,成功了就從失敗重試集合中移除,失敗了就等下次再來(lái)玖绿。簡(jiǎn)單看下對(duì)注冊(cè)列表的代碼就明白了敛瓷。其他代碼都是類(lèi)似的
// Retry the failed actions
    protected void retry() {
        if (!failedRegistered.isEmpty()) {
            // 不為空就把他URL拿到
            Set<URL> failed = new HashSet<URL>(failedRegistered);
            if (failed.size() > 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Retry register " + failed);
                }
                try {
                    // 然后遍歷它 做對(duì)應(yīng)的操作
                    for (URL url : failed) {
                        try {
                            // 做注冊(cè)操作
                            doRegister(url);
                            // 移除失敗集合中URL
                            failedRegistered.remove(url);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                        }
                    }
                } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                    logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
                }
            }
        }
        ......
    }
2.1.4.6 destroy
    @Override
    public void destroy() {
        // 調(diào)用父類(lèi)的方法
        super.destroy();
        try {
            // 取消執(zhí)行任務(wù)
            retryFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
        ExecutorUtil.gracefulShutdown(retryExecutor, retryPeriod);
    }
2.1.4.7 待實(shí)現(xiàn)的方法
  • 這些方法都是交給不同的服務(wù)提供組件去自己實(shí)現(xiàn)的,后面的Zookeeper就針對(duì)這些方法做了實(shí)現(xiàn)斑匪。

    // ==== Template method ====

    protected abstract void doRegister(URL url);

    protected abstract void doUnregister(URL url);

    protected abstract void doSubscribe(URL url, NotifyListener listener);

    protected abstract void doUnsubscribe(URL url, NotifyListener listener);

2.2 Registry的相關(guān)Factory的實(shí)現(xiàn)

  • 注冊(cè)中心的工廠類(lèi)呐籽,顧名思義就是生產(chǎn)上面的Registry的實(shí)現(xiàn)。
image

2.2.1 RegistryFactory

@SPI("dubbo")
public interface RegistryFactory {

    // 這個(gè)接口方法實(shí)際上就是獲取對(duì)注冊(cè)中心的連接蚀瘸,然后返回不同注冊(cè)中心的不同Regsitry的實(shí)現(xiàn)對(duì)象狡蝶,
    // 注解就是根據(jù)設(shè)置不同的protocol(協(xié)議)來(lái)選擇不同的實(shí)現(xiàn),
    // 比如Zookeeper贮勃,就會(huì)去使用Zookeeper的ZookeeperRegistryFactory,具體怎么選擇贪惹,后續(xù)博客再寫(xiě)
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);
}

2.2.2 AbstractRegistryFactory

  • 類(lèi)圖
image
  • 這個(gè)抽象類(lèi)還是相對(duì)來(lái)說(shuō)比較簡(jiǎn)答的。咱們看一下他的類(lèi)屬性

    // 注冊(cè)中心獲取過(guò)程的鎖
    private static final ReentrantLock LOCK = new ReentrantLock();

    // 注冊(cè)中心Map<注冊(cè)地址寂嘉,registry> 一個(gè)類(lèi)的緩存奏瞬。
    private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

2.2.2.1 getRegistryies
  • 獲取所有的registry對(duì)象
    /**
     * Get all registries
     * 獲取所有的registry對(duì)象
     * @return all registries
     */
    public static Collection<Registry> getRegistries() {
        //得到一個(gè)集合的鏡像,它的返回結(jié)果不可直接被改變,否則會(huì)報(bào)錯(cuò)
        return Collections.unmodifiableCollection(REGISTRIES.values());
    }

2.2.2.2 destoryAll
  • 關(guān)閉所有已創(chuàng)建的registry對(duì)象
/**
     * Close all created registries
     * 關(guān)閉所有已創(chuàng)建的registry對(duì)象
     */
    // TODO: 2017/8/30 to move somewhere else better
    public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        //對(duì)注冊(cè)中心關(guān)閉操作加鎖
        LOCK.lock();
        try {
            // 遍歷所有的注冊(cè)中心的操作類(lèi)枫绅,然后調(diào)用destroy來(lái)銷(xiāo)毀。
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            // 然后清除集合
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
2.2.2.3 getRegistry
  • 獲取對(duì)應(yīng)注冊(cè)中心的操作實(shí)現(xiàn)類(lèi)
    @Override
    public Registry getRegistry(URL url) {
        // 通過(guò)URL來(lái)獲取到注冊(cè)中心的類(lèi)型
        url = url.setPath(RegistryService.class.getName())
                .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
        String key = url.toServiceStringWithoutResolving();
        // 鎖定注冊(cè)中心訪問(wèn)進(jìn)程以確保注冊(cè)表的單個(gè)實(shí)例
        LOCK.lock();
        try {
            // 通過(guò)key來(lái)拿到對(duì)應(yīng)的注冊(cè)中心的操作類(lèi)
            Registry registry = REGISTRIES.get(key);
            // 有就直接返回
            if (registry != null) {
                return registry;
            }
            // 沒(méi)有就創(chuàng)建對(duì)應(yīng)的注冊(cè)中心操作類(lèi)
            registry = createRegistry(url);
            // 如果創(chuàng)建失敗丝格,報(bào)錯(cuò)
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            // 創(chuàng)建成功就放到結(jié)合中
            REGISTRIES.put(key, registry);
            // 然后再返回
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
2.2.2.4 createRegistry
  • 抽象方法撑瞧,沒(méi)有實(shí)現(xiàn)棵譬,需要不同的服務(wù)提供工廠對(duì)象來(lái)自己實(shí)現(xiàn)對(duì)應(yīng)的創(chuàng)建方法
    protected abstract Registry createRegistry(URL url);

2.3 Consumer And Provider InvokerWrapper

  • 實(shí)現(xiàn)Invoker接口显蝌,主要包裝消費(fèi)者和服務(wù)提供者的屬性
  • 主要為QOS提供服務(wù) 官方地址:http://dubbo.apache.org/zh-cn/docs/user/references/qos.html
  • 什么是QOS? qos-server,是dubbo在線運(yùn)維命令服務(wù)订咸,默認(rèn)端口號(hào)為:2222曼尊,用于接口命令,運(yùn)維dubbo脏嚷。

2.3.1 ConsumerInvokerWrapper

    // invoker對(duì)象
    private Invoker<T> invoker;
    // 原始的URL地址
    private URL originUrl;
    // 注冊(cè)中心的地址
    private URL registryUrl;
    // 消費(fèi)者的地址
    private URL consumerUrl;
    // 注冊(cè)中心的Directory
    private RegistryDirectory registryDirectory;

2.3.2 ProviderInvokerWrapper

    // invoker對(duì)象
    private Invoker<T> invoker;
    // 原始的URL地址
    private URL originUrl;
    // 注冊(cè)中心的地址
    private URL registryUrl;
    // 提供者的地址
    private URL providerUrl;
    // 是否注冊(cè)
    private volatile boolean isReg;

2.4 ProviderConsumerRegTable

  • 這個(gè)類(lèi)是消費(fèi)者和服務(wù)提供者的注冊(cè)表操作骆撇,也是用在QOS中。
  • 主要類(lèi)屬性
    // 服務(wù)提供者的Invokers集合
    public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
    // 服務(wù)消費(fèi)者的Invokers集合
    public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
  • 類(lèi)圖
image
  • 里面就是一些對(duì)類(lèi)屬性集合的操作父叙,主要是QOS會(huì)用神郊。

2.5 RegistryStatusChecker

  • 這個(gè)類(lèi)就一個(gè)方法 check方法,主要是做狀態(tài)校驗(yàn)趾唱。做注冊(cè)中心相關(guān)的狀態(tài)檢查校驗(yàn)
  • 類(lèi)上面的@Activate 注解 使這個(gè)類(lèi)自動(dòng)被激活加載涌乳。
    @Override
    public Status check() {
        // 獲取所有的注冊(cè)中心的對(duì)象
        Collection<Registry> registries = AbstractRegistryFactory.getRegistries();
        if (registries.isEmpty()) {
            return new Status(Status.Level.UNKNOWN);
        }
        Status.Level level = Status.Level.OK;
        StringBuilder buf = new StringBuilder();
        // 遍歷
        for (Registry registry : registries) {
            if (buf.length() > 0) {
                buf.append(",");
            }
            // 把地址拼接到一起
            buf.append(registry.getUrl().getAddress());
            // 如果注冊(cè)中心的某個(gè)節(jié)點(diǎn)不可用就把狀態(tài)設(shè)置成error
            if (!registry.isAvailable()) {
                level = Status.Level.ERROR;
                buf.append("(disconnected)");
            } else {
                buf.append("(connected)");
            }
        }
        // 然后返回價(jià)差的結(jié)果對(duì)象
        return new Status(level, buf.toString());
    }

2.5 RegistryDirectory and RegistryProtocol

  • 這兩個(gè)類(lèi)后續(xù)再說(shuō)。牽涉到其他地方的一些東西甜癞。

3. dubbo-registry-zookeeper

  • 不知道大家看到這里有沒(méi)有忘記這張圖

  • 模塊關(guān)系圖


    image
  • 所有的注冊(cè)中心實(shí)現(xiàn)FailbackRegistry 和 AbstractRegistryFactory來(lái)實(shí)現(xiàn)對(duì)應(yīng)的功能夕晓。

  • 那么Zookeeper也是如此。Zookeeper主要就只有兩個(gè)類(lèi)

image
    1. 是ZookeeperRegistry
    1. 是ZookeeperRegistryFactory來(lái)實(shí)現(xiàn)對(duì)應(yīng)的功能

3.1 Dubbo在Zookeeper中的數(shù)據(jù)結(jié)構(gòu)

  • dubbo在使用Zookeeper時(shí)只會(huì)創(chuàng)建永久節(jié)點(diǎn)和臨時(shí)節(jié)點(diǎn)悠咱。
image
  • 根節(jié)點(diǎn)是注冊(cè)中心分組蒸辆,下面是很多的服務(wù)接口,分組來(lái)自用戶(hù)配置的<dubbo:registry>中的group屬性析既,默認(rèn)是/dubbo躬贡。
  • 服務(wù)接口下是如圖所示的四種服務(wù)目錄,都是持久節(jié)點(diǎn)眼坏。
  • 服務(wù)提供者路徑/dubbo/service/providers (這里方便標(biāo)識(shí)全部都用service替代接口com.demo.DemoService)拂玻,下面包含接口的多個(gè)服務(wù)提供者者的URL元數(shù)據(jù)信息。
  • 服務(wù)提供者路徑/dubbo/service/consumers空骚,下面包含接口有多個(gè)消費(fèi)者的URL元數(shù)據(jù)信息
  • 服務(wù)提供者路徑/dubbo/service/routers纺讲,下面包含多個(gè)用于消費(fèi)者路由策略URL元數(shù)據(jù)信息。
  • 服務(wù)提供者路徑/dubbo/service/configurators囤屹,下面包含多個(gè)用于服務(wù)提供者動(dòng)態(tài)配置的URL元數(shù)據(jù)信息熬甚。

在Dubbo框架啟動(dòng)時(shí)會(huì)根據(jù)我們所寫(xiě)的服務(wù)相關(guān)的配置在注冊(cè)中心創(chuàng)建4個(gè)目錄,在providers和consumers目錄中分別存儲(chǔ)服務(wù)提供方肋坚、消費(fèi)方元數(shù)據(jù)信息乡括。包括:IP肃廓、端口、權(quán)重和應(yīng)用名等數(shù)據(jù)诲泌。

  • 目錄包含信息
目錄名稱(chēng) 存儲(chǔ)值樣例
/dubbo/service/providers dubbo://192.168.1.1.20880/com.demo.DemoService?key=value&...
/dubbo/service/consumers dubbo://192.168.1.1.5002/com.demo.DemoService?key=value&...
/dubbo/service/routers condition://0.0.0.0/com.demo.DemoService?category=routers&key=value&...
/dubbo/service/configurators override://0.0.0.0/com.demo.DemoService?category=configurators&key=value&...
  • 在Dubbo中啟用注冊(cè)中心:
<beans>
    <!-- 適用于Zookeeper一個(gè)集群有多個(gè)節(jié)點(diǎn)盲赊,多個(gè)IP和端口用逗號(hào)分隔-->
    <dubbo:registry protocol="zookeeper" address="ip:port;ip:port">
    <!-- 適用于Zookeeper多個(gè)集群有多個(gè)節(jié)點(diǎn),多個(gè)IP和端口用豎線分隔-->
    <dubbo:registry protocol="zookeeper" address="ip:port|ip:port">
</beans>

3.2 ZookeeperRegistry

  • 慣例給大家一張類(lèi)圖
image
  • 然后看下屬性
    // Zookeeper的默認(rèn)端口號(hào)
    private final static int DEFAULT_ZOOKEEPER_PORT = 2181;

    // Dubbo在Zookeeper中注冊(cè)的默認(rèn)根節(jié)點(diǎn)
    private final static String DEFAULT_ROOT = "dubbo";

    // 組的名稱(chēng) 或者說(shuō)是 根節(jié)點(diǎn)的值
    private final String root;

    // 服務(wù)集合
    private final Set<String> anyServices = new ConcurrentHashSet<String>();

    // zk節(jié)點(diǎn)的監(jiān)聽(tīng)器
    // Dubbo底層封裝了2套Zookeeper API敷扫,所以通過(guò)ChildListener抽象了監(jiān)聽(tīng)器哀蘑,
    // 但是在實(shí)際調(diào)用時(shí)會(huì)通過(guò)createTargetChildListener轉(zhuǎn)為對(duì)應(yīng)框架的監(jiān)聽(tīng)器實(shí)現(xiàn)
    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

    // zk的客戶(hù)端, 對(duì)節(jié)點(diǎn)進(jìn)行一些刪改等操作
    private final ZookeeperClient zkClient;
  • 關(guān)于Dubbo中的Zookeeper客戶(hù)端葵第,Dubbo實(shí)現(xiàn)了一個(gè)統(tǒng)一的Client API绘迁,但是用兩種不同的Zookeeper開(kāi)源庫(kù)來(lái)實(shí)現(xiàn),一個(gè)是Apache的Curator卒密,另一個(gè)是zkClient 如果用戶(hù)不設(shè)置缀台,則默認(rèn)使用Curator實(shí)現(xiàn)。

3.2.1 構(gòu)造方法

  • 構(gòu)造方法比較簡(jiǎn)單哮奇,就是獲取組名膛腐,連接Zookeeper

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // 調(diào)用FailbackRegistry的構(gòu)造方法
        super(url);

        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 獲取組名稱(chēng) 并復(fù)制給root
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        // 連接上Zookeeper
        zkClient = zookeeperTransporter.connect(url);
        // 添加連接狀態(tài)監(jiān)聽(tīng)器
        zkClient.addStateListener(new StateListener() {
            @Override
            public void stateChanged(int state) {
                if (state == RECONNECTED) {
                    try {
                        // 重連恢復(fù)
                        recover();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        });
    }

3.2.2 服務(wù)注冊(cè)發(fā)布與服務(wù)下線取消注冊(cè)

  • 也比較簡(jiǎn)單就是創(chuàng)建節(jié)點(diǎn)和刪除節(jié)點(diǎn)
    // 發(fā)布
    @Override
    protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    // 取消發(fā)布
    @Override
    protected void doUnregister(URL url) {
        try {
            zkClient.delete(toUrlPath(url));
        } catch (Throwable e) {
            throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

3.2.3 服務(wù)訂閱和取消訂閱

  • 訂閱有pull和push兩種方式,一種是客戶(hù)端定時(shí)輪詢(xún)注冊(cè)中心拉去配置鼎俘,另一種是注冊(cè)中心主動(dòng)推送數(shù)據(jù)給客戶(hù)端哲身。Dubbo目前采用的是第一次啟動(dòng)拉取然后接受事件再重新拉取。
  • 再暴露服務(wù)的時(shí)候而芥,服務(wù)端會(huì)訂閱configurators監(jiān)聽(tīng)動(dòng)態(tài)配置律罢,消費(fèi)端啟動(dòng)的時(shí)候回訂閱providers、routers棍丐、configurators類(lèi)接收這三者的變更通知误辑。
  • Dubbo在實(shí)現(xiàn)Zookeeper注冊(cè)中心的時(shí)候是,客戶(hù)端第一次連接獲取全量數(shù)據(jù)歌逢,然后在訂閱節(jié)點(diǎn)上注冊(cè)一個(gè)watcher巾钉,客戶(hù)端與注冊(cè)中心之間保持TCP長(zhǎng)連接,后續(xù)有節(jié)點(diǎn)發(fā)生變化則會(huì)觸發(fā)watcher事件來(lái)把對(duì)應(yīng)節(jié)點(diǎn)下的全量數(shù)據(jù)拉取過(guò)來(lái)秘案。
3.2.3.1 doSubscribe
    @Override
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 訂閱所有數(shù)據(jù)
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    // 為空則把listeners放入到緩存的Map中
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }

                ChildListener zkListener = listeners.get(listener);
                // 創(chuàng)建子節(jié)點(diǎn)監(jiān)聽(tīng)器砰苍,對(duì)root下的子節(jié)點(diǎn)做監(jiān)聽(tīng),一旦有子節(jié)點(diǎn)發(fā)生改變阱高,
                // 那么就對(duì)這個(gè)節(jié)點(diǎn)進(jìn)行訂閱.
                if (zkListener == null) {
                    // zkListener為空說(shuō)明是第一次拉取赚导,則新建一個(gè)listener
                    listeners.putIfAbsent(listener, new ChildListener() {
                        // 節(jié)點(diǎn)變更時(shí),觸發(fā)通知時(shí)執(zhí)行
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                // 遍歷所有節(jié)點(diǎn)
                                child = URL.decode(child);
                                // 如果有子節(jié)點(diǎn)還未被訂閱賊說(shuō)明是新節(jié)點(diǎn)赤惊,
                                if (!anyServices.contains(child)) {
                                    // 加入到集合中
                                    anyServices.add(child);
                                    //就訂閱之
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // 創(chuàng)建持久節(jié)點(diǎn)root吼旧,接下來(lái)訂閱持久節(jié)點(diǎn)的子節(jié)點(diǎn)
                zkClient.create(root, false);
                // 添加root節(jié)點(diǎn)的子節(jié)點(diǎn)監(jiān)聽(tīng)器,并返回當(dāng)前的services
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    // 遍歷所有的子節(jié)點(diǎn)進(jìn)行訂閱
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        // 增加當(dāng)前節(jié)點(diǎn)的訂閱未舟,并且會(huì)返回改節(jié)點(diǎn)下所有子節(jié)點(diǎn)的列表
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }

                // 訂閱類(lèi)別服務(wù)
            } else {
                List<URL> urls = new ArrayList<URL>();
                // 將url轉(zhuǎn)變成
                //  /dubbo/com.demo.DemoService/providers
                // /dubbo/com.demo.DemoService/configurators
                //  /dubbo/com.demo.DemoService/routers
                // 根據(jù)url類(lèi)別獲取一組要訂閱的路徑 
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    // 如果緩存沒(méi)有圈暗,則添加到緩存中
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    // 同樣如果監(jiān)聽(tīng)器緩存中沒(méi)有 則放入緩存
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                // 通知節(jié)點(diǎn)變化
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    // 訂閱并返回該節(jié)點(diǎn)下的子路徑并緩存
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        // 有子節(jié)點(diǎn)組裝掂为,沒(méi)有那么就將消費(fèi)者的協(xié)議變成empty作為url。
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 回調(diào)NotifyListener员串,更新本地緩存信息
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
3.2.3.2 doUnsubscribe
    @Override
    protected void doUnsubscribe(URL url, NotifyListener listener) {
        // 通過(guò)url把監(jiān)聽(tīng)器全部拿到
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners != null) {
            ChildListener zkListener = listeners.get(listener);
            if (zkListener != null) {
                // 直接刪除group下所有的
                if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                    String root = toRootPath();
                    // 移除監(jiān)聽(tīng)器
                    zkClient.removeChildListener(root, zkListener);
                } else {
                     // 移除類(lèi)別服務(wù)下的監(jiān)聽(tīng)器
                    for (String path : toCategoriesPath(url)) {
                        zkClient.removeChildListener(path, zkListener);
                    }
                }
            }
        }
    }
3.2.3.3 其他
  • 其他代碼相對(duì)來(lái)說(shuō)不是很復(fù)雜可以自行看一下勇哗。

3.3 ZookeeperRegistryFactory

  • 工廠類(lèi)的代碼極其短,隨意看下寸齐。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

3.3.1 關(guān)于ZookeeperTransporter

@SPI("curator")
public interface ZookeeperTransporter {

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

  • 上面我提到過(guò)欲诺,Dubbo用Zookeeper的時(shí)候用了兩種方式實(shí)現(xiàn),一個(gè)是Apache Curator访忿,另一個(gè)是zkClient瞧栗,這個(gè)類(lèi)就是做看了一個(gè)轉(zhuǎn)換斯稳。如下圖


    image
  • 兩個(gè)類(lèi)都實(shí)現(xiàn)了該接口來(lái)向外提供統(tǒng)一的ZookeeperClient海铆。

  • 這個(gè)實(shí)現(xiàn)在remoting模塊。暫時(shí)就不講了挣惰。

4. 結(jié)語(yǔ)

  • 整個(gè)模塊卧斟,其他的Redis、Nacos等實(shí)現(xiàn)都是根據(jù)不同組件的特點(diǎn)來(lái)實(shí)現(xiàn)憎茂。功能都一樣珍语,只是實(shí)現(xiàn)不一樣,大家可以自己去探索一下竖幔。
  • 整個(gè)模塊中我們單獨(dú)看的話主要是就是一個(gè)實(shí)現(xiàn)板乙,一個(gè)工廠,里面牽涉到了本地緩存拳氢、重試這些機(jī)制募逞。代碼量不是很大。認(rèn)真看還是不難的馋评。其中特別需要注意的就是注冊(cè)中心的數(shù)據(jù)結(jié)構(gòu) 和 發(fā)布訂閱這些的實(shí)現(xiàn)了放接。
  • 結(jié)語(yǔ)有點(diǎn)亂。就這樣留特,不足之處希望留言指出纠脾,后續(xù)優(yōu)化。蜕青!感謝9兜浮!右核!

關(guān)于我

  • 坐標(biāo)杭州慧脱,普通本科在讀,計(jì)算機(jī)科學(xué)與技術(shù)專(zhuān)業(yè)蒙兰,20年畢業(yè)磷瘤,目前處于實(shí)習(xí)階段芒篷。
  • 主要做Java開(kāi)發(fā),會(huì)寫(xiě)點(diǎn)Golang采缚、Shell针炉。對(duì)微服務(wù)、大數(shù)據(jù)比較感興趣扳抽,預(yù)備做這個(gè)方向篡帕。
  • 目前處于菜鳥(niǎo)階段,各位大佬輕噴贸呢,小弟正在瘋狂學(xué)習(xí)镰烧。
  • 歡迎大家和我交流鴨!@阆荨怔鳖!
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市固蛾,隨后出現(xiàn)的幾起案子结执,更是在濱河造成了極大的恐慌,老刑警劉巖艾凯,帶你破解...
    沈念sama閱讀 222,000評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件献幔,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡趾诗,警方通過(guò)查閱死者的電腦和手機(jī)蜡感,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,745評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)恃泪,“玉大人郑兴,你說(shuō)我怎么就攤上這事∥虮茫” “怎么了杈笔?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,561評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)糕非。 經(jīng)常有香客問(wèn)我蒙具,道長(zhǎng),這世上最難降的妖魔是什么朽肥? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,782評(píng)論 1 298
  • 正文 為了忘掉前任禁筏,我火速辦了婚禮,結(jié)果婚禮上衡招,老公的妹妹穿的比我還像新娘篱昔。我一直安慰自己驶悟,他們只是感情好膨桥,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,798評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布桶雀。 她就那樣靜靜地躺著勋眯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪穗椅。 梳的紋絲不亂的頭發(fā)上辨绊,一...
    開(kāi)封第一講書(shū)人閱讀 52,394評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音匹表,去河邊找鬼门坷。 笑死,一個(gè)胖子當(dāng)著我的面吹牛袍镀,可吹牛的內(nèi)容都是我干的默蚌。 我是一名探鬼主播,決...
    沈念sama閱讀 40,952評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼苇羡,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼绸吸!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起宣虾,我...
    開(kāi)封第一講書(shū)人閱讀 39,852評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤惯裕,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后绣硝,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,409評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡撑刺,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,483評(píng)論 3 341
  • 正文 我和宋清朗相戀三年鹉胖,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片够傍。...
    茶點(diǎn)故事閱讀 40,615評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡甫菠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出冕屯,到底是詐尸還是另有隱情寂诱,我是刑警寧澤,帶...
    沈念sama閱讀 36,303評(píng)論 5 350
  • 正文 年R本政府宣布安聘,位于F島的核電站痰洒,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏浴韭。R本人自食惡果不足惜丘喻,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,979評(píng)論 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望念颈。 院中可真熱鬧泉粉,春花似錦、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,470評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至讨彼,卻和暖如春财边,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背点骑。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,571評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工酣难, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人黑滴。 一個(gè)月前我還...
    沈念sama閱讀 49,041評(píng)論 3 377
  • 正文 我出身青樓憨募,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親袁辈。 傳聞我的和親對(duì)象是個(gè)殘疾皇子菜谣,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,630評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容