Dubbo-服務(wù)字典(6)

1. 簡介

本篇文章皿伺,將開始分析 Dubbo 集群容錯方面的源碼。集群容錯源碼包含四個部分盒粮,分別是服務(wù)目錄 Directory心傀、服務(wù)路由 Router、集群 Cluster 和負(fù)載均衡 LoadBalance拆讯。這幾個部分的源碼邏輯相對比較獨立脂男,我們將會分四篇文章進(jìn)行分析。本篇文章作為集群容錯的開篇文章种呐,將和大家一起分析服務(wù)目錄相關(guān)的源碼宰翅。在進(jìn)行深入分析之前,我們先來了解一下服務(wù)目錄是什么爽室。服務(wù)目錄中存儲了一些和服務(wù)提供者有關(guān)的信息汁讼,通過服務(wù)目錄,服務(wù)消費者可獲取到服務(wù)提供者的信息阔墩,比如 ip嘿架、端口、服務(wù)協(xié)議等啸箫。通過這些信息耸彪,服務(wù)消費者就可通過 Netty 等客戶端進(jìn)行遠(yuǎn)程調(diào)用。在一個服務(wù)集群中忘苛,服務(wù)提供者數(shù)量并不是一成不變的蝉娜,如果集群中新增了一臺機器,相應(yīng)地在服務(wù)目錄中就要新增一條服務(wù)提供者記錄扎唾≌俅ǎ或者,如果服務(wù)提供者的配置修改了胸遇,服務(wù)目錄中的記錄也要做相應(yīng)的更新荧呐。如果這樣說,服務(wù)目錄和注冊中心的功能不就雷同了嗎纸镊?確實如此倍阐,這里這么說是為了方便大家理解。實際上服務(wù)目錄在獲取注冊中心的服務(wù)配置信息后薄腻,會為每條配置信息生成一個 Invoker 對象收捣,并把這個 Invoker 對象存儲起來,這個 Invoker 才是服務(wù)目錄最終持有的對象庵楷。Invoker 有什么用呢?看名字就知道了,這是一個具有遠(yuǎn)程調(diào)用功能的對象尽纽。講到這大家應(yīng)該知道了什么是服務(wù)目錄了咐蚯,它可以看做是 Invoker 集合,且這個集合中的元素會隨注冊中心的變化而進(jìn)行動態(tài)調(diào)整弄贿。

2. 繼承體系

服務(wù)目錄目前內(nèi)置的實現(xiàn)有兩個春锋,分別為 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類差凹。AbstractDirectory 實現(xiàn)了 Directory 接口期奔,這個接口包含了一個重要的方法定義,即 list(Invocation)危尿,用于列舉 Invoker呐萌。下面我們來看一下他們的繼承體系圖。

<img src="https://i.loli.net/2020/04/19/IOH7DwRMLErui9j.png" style="zoom: 50%;" />

如上谊娇,Directory 繼承自 Node 接口肺孤,Node 這個接口繼承者比較多,像 Registry济欢、Monitor赠堵、Invoker 等均繼承了這個接口。這個接口包含了一個獲取配置信息的方法 getUrl法褥,實現(xiàn)該接口的類可以向外提供配置信息茫叭。另外,大家注意看 RegistryDirectory 實現(xiàn)了 NotifyListener 接口半等,當(dāng)注冊中心節(jié)點信息發(fā)生變化后杂靶,RegistryDirectory 可以通過此接口方法得到變更信息,并根據(jù)變更信息動態(tài)調(diào)整內(nèi)部 Invoker 列表酱鸭。

3. 源碼分析

本章將分析 AbstractDirectory 和它兩個子類的源碼吗垮。AbstractDirectory 封裝了 Invoker 列舉流程,具體的列舉邏輯則由子類實現(xiàn)凹髓,這是典型的模板模式烁登。所以,接下來我們先來看一下 AbstractDirectory 的源碼蔚舀。

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }

    return doList(invocation);
}

protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;

關(guān)于 AbstractDirectory 就分析這么多饵沧,下面開始分析子類的源碼。

3.1 StaticDirectory

StaticDirectory 即靜態(tài)服務(wù)目錄赌躺,顧名思義狼牺,它內(nèi)部存放的 Invoker 是不會變動的。所以礼患,理論上它和不可變 List 的功能很相似是钥。

public class StaticDirectory<T> extends AbstractDirectory<T> {
    
    // Invoker 列表
    private final List<Invoker<T>> invokers;

    // 省略構(gòu)造方法

    @Override
    public Class<T> getInterface() {
        // 獲取接口類
        return invokers.get(0).getInterface();
    }

    @Override
    public List<Invoker<T>> getAllInvokers() {
        return invokers;
    }

    // 檢測服務(wù)目錄是否可用
    @Override
    public boolean isAvailable() {
        if (isDestroyed()) {
            return false;
        }
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                // 只要有一個 Invoker 是可用的掠归,就認(rèn)為當(dāng)前目錄是可用的
                return true;
            }
        }
        return false;
    }

    @Override
    public void destroy() {
        if (isDestroyed()) {
            return;
        }
        // 調(diào)用父類銷毀邏輯
        super.destroy();
        // 遍歷 Invoker 列表,并執(zhí)行相應(yīng)的銷毀邏輯
        for (Invoker<T> invoker : invokers) {
            invoker.destroy();
        }
        invokers.clear();
    }

    public void buildRouterChain() {
        RouterChain<T> routerChain = RouterChain.buildChain(getUrl());
        routerChain.setInvokers(invokers);
        this.setRouterChain(routerChain);
    }

    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        List<Invoker<T>> finalInvokers = invokers;
        if (routerChain != null) {
            try {
                //調(diào)用父類的路由信息查找對應(yīng)的 invoker 
                finalInvokers = routerChain.route(getConsumerUrl(), invocation);
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
        return finalInvokers == null ? Collections.emptyList() : finalInvokers;
    }

}

3.2 RegistryDirectory

RegistryDirectory 是一種動態(tài)服務(wù)目錄悄泥,實現(xiàn)了 NotifyListener 接口虏冻。當(dāng)注冊中心服務(wù)配置發(fā)生變化后,RegistryDirectory 可收到與當(dāng)前服務(wù)相關(guān)的變化弹囚。收到變更通知后厨相,RegistryDirectory 可根據(jù)配置變更信息刷新 Invoker 列表。

public interface NotifyListener {

    /**
     * Triggered when a service change notification is received.
     * <p>
     * Notify needs to support the contract: <br>
     * 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br>
     * 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>
     * 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
     * 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
     * 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>
     *
     * @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
     */
    void notify(List<URL> urls);

}

RegistryDirectory 中有幾個比較重要的邏輯:

  1. Invoker 的列舉
  2. 接收服務(wù)配置變更
  3. Invoker 列表的刷新
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 服務(wù)提供者關(guān)閉或禁用了服務(wù)鸥鹉,此時拋出 No provider 異常
    }

    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers;
    }

    List<Invoker<T>> invokers = null;
    try {
        // Get invokers from cache, only runtime routers will be executed.
        invokers = routerChain.route(getConsumerUrl(), invocation);
    } catch (Throwable t) {
        // 異常日志
    }

    return invokers == null ? Collections.emptyList() : invokers;
}

后面兩個邏輯可以查看上一節(jié)的方法調(diào)用棧蛮穿。

服務(wù)導(dǎo)入2

我們看一下 RD 的類結(jié)構(gòu),可以看到兩個listener 內(nèi)部類:

image
private static class ReferenceConfigurationListener extends AbstractConfiguratorListener {
    private RegistryDirectory directory;
    private URL url;

    ReferenceConfigurationListener(RegistryDirectory directory, URL url) {
        this.directory = directory;
        this.url = url;
        this.initWith(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
    }

    void stop() {
        this.stopListen(DynamicConfiguration.getRuleKey(url) + CONFIGURATORS_SUFFIX);
    }

    @Override
    protected void notifyOverrides() {
        // to notify configurator/router changes
        directory.refreshInvoker(Collections.emptyList());
    }
}

private static class ConsumerConfigurationListener extends AbstractConfiguratorListener {
    List<RegistryDirectory> listeners = new ArrayList<>();

    ConsumerConfigurationListener() {
        this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
    }

    void addNotifyListener(RegistryDirectory listener) {
        this.listeners.add(listener);
    }

    void removeNotifyListener(RegistryDirectory listener) {
        this.listeners.remove(listener);
    }

    @Override
    protected void notifyOverrides() {
        listeners.forEach(listener -> listener.refreshInvoker(Collections.emptyList()));
    }
}

可以看到更新的操作最終都是調(diào)用了 refreshInvoker 方法 毁渗。

/**
 *
 * 將invokerURL列表轉(zhuǎn)換為調(diào)用程序映射践磅。轉(zhuǎn)換的規(guī)則如下:
 * 1、如果URL已轉(zhuǎn)換為invoker祝蝠,它將不再被重新引用并直接從緩存中獲得音诈,請注意URL中的任何參數(shù)更改都將被重新引用。
 * 2绎狭、如果輸入的調(diào)用者列表不是空的细溅,這意味著它是最新的調(diào)用者列表。
 * 3儡嘶、如果傳入的invokerUrl列表為空喇聊,則意味著該規(guī)則只是一個覆蓋規(guī)則或路由規(guī)則,需要對其進(jìn)行重新對比蹦狂,以決定是否重新引用誓篱。
 *
 * @param invokerUrls 參數(shù)不能為空
 */
private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");

    // invokerUrls 僅有一個元素,且 url 協(xié)議頭為 empty凯楔,此時表示禁用所有服務(wù)
    if (invokerUrls.size() == 1
            && invokerUrls.get(0) != null
            && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        //刪除所有的 invokers
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<>();
            // 添加緩存 url 到 invokerUrls 中
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 將 url 轉(zhuǎn)成 Invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

        /**
         * If the calculation is wrong, it is not processed.
         *
         * 1. The protocol configured by the client is inconsistent with the protocol of the server.
         *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
         * 2. The registration center is not robust and pushes illegal specification data.
         *
         */
        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                    .toString()));
            return;
        }

        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        routerChain.setInvokers(newInvokers);
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;

        try {
            //刪除沒有使用的 invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

refreshInvoker 方法首先會根據(jù)入?yún)?invokerUrls 的數(shù)量和協(xié)議頭判斷是否禁用所有的服務(wù)窜骄,如果禁用,則將 forbidden 設(shè)為 true摆屯,并銷毀所有的 Invoker邻遏。若不禁用,則將 url 轉(zhuǎn)成 Invoker虐骑,得到 <url, Invoker> 的映射關(guān)系准验。然后進(jìn)一步進(jìn)行轉(zhuǎn)換,得到 <methodName, Invoker 列表> 映射關(guān)系廷没。之后進(jìn)行多組 Invoker 合并操作糊饱,并將合并結(jié)果賦值給 methodInvokerMap。methodInvokerMap 變量在 doList 方法中會被用到颠黎,doList 會對該變量進(jìn)行讀操作另锋,在這里是寫操作滞项。當(dāng)新的 Invoker 列表生成后,還要一個重要的工作要做砰蠢,就是銷毀無用的 Invoker蓖扑,避免服務(wù)消費者調(diào)用已下線的服務(wù)的服務(wù)唉铜。

到此關(guān)于 Invoker 列表的刷新邏輯就分析了台舱,這里對整個過程進(jìn)行簡單總結(jié)。如下:

  1. 檢測入?yún)⑹欠駜H包含一個 url潭流,且 url 協(xié)議頭為 empty
  2. 若第一步檢測結(jié)果為 true竞惋,表示禁用所有服務(wù),此時銷毀所有的 Invoker
  3. 若第一步檢測結(jié)果為 false灰嫉,此時將入?yún)⑥D(zhuǎn)為 Invoker 列表
  4. 對上一步邏輯生成的結(jié)果進(jìn)行進(jìn)一步處理拆宛,得到方法名到 Invoker 的映射關(guān)系表
  5. 合并多組 Invoker
  6. 銷毀無用 Invoker

Invoker 的刷新邏輯還是比較復(fù)雜的,大家在看的過程中多寫點 demo 進(jìn)行調(diào)試讼撒,以加深理解浑厚。

看完了底層更新的刷新 invoker 操作,RegistryDirectory 是一個動態(tài)服務(wù)目錄根盒,會隨注冊中心配置的變化進(jìn)行動態(tài)調(diào)整钳幅。因此 RegistryDirectory 實現(xiàn)了 NotifyListener 接口,通過這個接口獲取注冊中心變更通知炎滞。下面我們來看一下具體的邏輯敢艰。

public synchronized void notify(List<URL> urls) {
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));

    // 定義三個集合,分別用于存放配置器 url册赛,路由 url钠导,服務(wù)提供者 url
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}

    private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

    private void overrideDirectoryUrl() {
        // merge override parameters
        this.overrideDirectoryUrl = directoryUrl;
        List<Configurator> localConfigurators = this.configurators; // local reference
        doOverrideUrl(localConfigurators);
        List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
        doOverrideUrl(localAppDynamicConfigurators);
        if (serviceConfigurationListener != null) {
            List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
            doOverrideUrl(localDynamicConfigurators);
        }
    }

    private void doOverrideUrl(List<Configurator> configurators) {
        if (CollectionUtils.isNotEmpty(configurators)) {
            for (Configurator configurator : configurators) {
                this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
            }
        }
    }

4.參考資料

本文參考于Dubbo官網(wǎng),詳情以官網(wǎng)最新文檔為準(zhǔn)森瘪。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牡属,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子扼睬,更是在濱河造成了極大的恐慌逮栅,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痰驱,死亡現(xiàn)場離奇詭異证芭,居然都是意外死亡,警方通過查閱死者的電腦和手機担映,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門废士,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蝇完,你說我怎么就攤上這事官硝〈H铮” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵氢架,是天一觀的道長傻咖。 經(jīng)常有香客問我,道長岖研,這世上最難降的妖魔是什么卿操? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮孙援,結(jié)果婚禮上害淤,老公的妹妹穿的比我還像新娘。我一直安慰自己拓售,他們只是感情好窥摄,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著础淤,像睡著了一般崭放。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上鸽凶,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天币砂,我揣著相機與錄音,去河邊找鬼吱瘩。 笑死道伟,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的使碾。 我是一名探鬼主播蜜徽,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼票摇!你這毒婦竟也來了拘鞋?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤矢门,失蹤者是張志新(化名)和其女友劉穎盆色,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體祟剔,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡隔躲,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了物延。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片宣旱。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖叛薯,靈堂內(nèi)的尸體忽然破棺而出浑吟,到底是詐尸還是另有隱情笙纤,我是刑警寧澤,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布组力,位于F島的核電站省容,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏燎字。R本人自食惡果不足惜腥椒,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望轩触。 院中可真熱鬧寞酿,春花似錦家夺、人聲如沸脱柱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽榨为。三九已至,卻和暖如春煌茴,著一層夾襖步出監(jiān)牢的瞬間随闺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工蔓腐, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留矩乐,地道東北人。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓回论,卻偏偏與公主長得像散罕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子傀蓉,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容