Dubbo——集群容錯(cuò)(上)

前言

正常情況下臼婆,當(dāng)我們進(jìn)行系統(tǒng)設(shè)計(jì)時(shí)候,不僅要考慮正常邏輯下代碼該如何走,還要考慮異常情況下代碼邏輯應(yīng)該怎么走裹粤。當(dāng)服務(wù)消費(fèi)方調(diào)用服務(wù)提供方的服務(wù)出現(xiàn)錯(cuò)誤時(shí)候,Dubbo提供了多種容錯(cuò)方案蜂林,缺省模式為failover遥诉,也就是失敗重試。

Cluster 接口提供了我們常說的集群容錯(cuò)功能噪叙。

集群中的單個(gè)節(jié)點(diǎn)有一定概率出現(xiàn)一些問題矮锈,例如,磁盤損壞睁蕾、系統(tǒng)崩潰等苞笨,導(dǎo)致節(jié)點(diǎn)無法對(duì)外提供服務(wù),因此在分布式 RPC 框架中子眶,必須要重視這種情況瀑凝。為了避免單點(diǎn)故障,我們的 Provider 通常至少會(huì)部署在兩臺(tái)服務(wù)器上臭杰,以集群的形式對(duì)外提供服務(wù)粤咪,對(duì)于一些負(fù)載比較高的服務(wù),則需要部署更多 Provider 來抗住流量渴杆。

在 Dubbo 中寥枝,通過 Cluster 這個(gè)接口把一組可供調(diào)用的 Provider 信息組合成為一個(gè)統(tǒng)一的 Invoker 供調(diào)用方進(jìn)行調(diào)用。經(jīng)過 Router 過濾磁奖、LoadBalance 選址之后囊拜,選中一個(gè)具體 Provider 進(jìn)行調(diào)用,如果調(diào)用失敗点寥,則會(huì)按照集群的容錯(cuò)策略進(jìn)行容錯(cuò)處理艾疟。

Dubbo 默認(rèn)內(nèi)置了若干容錯(cuò)策略,并且每種容錯(cuò)策略都有自己獨(dú)特的應(yīng)用場(chǎng)景敢辩,我們可以通過配置選擇不同的容錯(cuò)策略。如果這些內(nèi)置容錯(cuò)策略不能滿足需求戚长,我們還可以通過自定義容錯(cuò)策略進(jìn)行配置盗冷。

Cluster 接口與容錯(cuò)機(jī)制

Cluster 的工作流程大致可以分為兩步(如下圖所示):①創(chuàng)建 Cluster Invoker 實(shí)例(在 Consumer 初始化時(shí),Cluster 實(shí)現(xiàn)類會(huì)創(chuàng)建一個(gè) Cluster Invoker 實(shí)例同廉,即下圖中的 merge 操作)仪糖;②使用 Cluster Invoker 實(shí)例(在 Consumer 服務(wù)消費(fèi)者發(fā)起遠(yuǎn)程調(diào)用請(qǐng)求的時(shí)候柑司,Cluster Invoker 會(huì)依賴前面課時(shí)介紹的 Directory、Router锅劝、LoadBalance 等組件得到最終要調(diào)用的 Invoker 對(duì)象)攒驰。

Cluster 核心流程圖

Cluster Invoker 獲取 Invoker 的流程大致可描述為如下:

  • 1、通過 Directory 獲取 Invoker 列表故爵,以 RegistryDirectory 為例玻粪,會(huì)感知注冊(cè)中心的動(dòng)態(tài)變化,實(shí)時(shí)獲取當(dāng)前 Provider 對(duì)應(yīng)的 Invoker 集合诬垂。

  • 2劲室、調(diào)用 Router 的 route() 方法進(jìn)行路由,過濾掉不符合路由規(guī)則的 Invoker 對(duì)象结窘。

  • 3很洋、通過 LoadBalance 從 Invoker 列表中選擇一個(gè) Invoker。

  • 4隧枫、ClusterInvoker 會(huì)將參數(shù)傳給 LoadBalance 選擇出的 Invoker 實(shí)例的 invoke 方法喉磁,進(jìn)行真正的遠(yuǎn)程調(diào)用。

這個(gè)過程是一個(gè)正常流程悠垛,沒有涉及容錯(cuò)處理线定。Dubbo 中常見的容錯(cuò)方式有如下幾個(gè):

  • Failover Cluster:失敗自動(dòng)切換。它是 Dubbo 的默認(rèn)容錯(cuò)機(jī)制确买,在請(qǐng)求一個(gè) Provider 節(jié)點(diǎn)失敗的時(shí)候斤讥,自動(dòng)切換其他 Provider 節(jié)點(diǎn),默認(rèn)執(zhí)行 3 次湾趾,適合冪等操作芭商。當(dāng)然,重試次數(shù)越多搀缠,在故障容錯(cuò)的時(shí)候帶給 Provider 的壓力就越大铛楣,在極端情況下甚至可能造成雪崩式的問題。

  • Failback Cluster:失敗自動(dòng)恢復(fù)艺普。失敗后記錄到隊(duì)列中簸州,通過定時(shí)器重試。

  • Failfast Cluster:快速失敗歧譬。請(qǐng)求失敗后返回異常岸浑,不進(jìn)行任何重試。

  • Failsafe Cluster:失敗安全瑰步。請(qǐng)求失敗后忽略異常矢洲,不進(jìn)行任何重試。

  • Forking Cluster:并行調(diào)用多個(gè) Provider 節(jié)點(diǎn)缩焦,只要有一個(gè)成功就返回读虏。

  • Broadcast Cluster:廣播多個(gè) Provider 節(jié)點(diǎn)责静,只要有一個(gè)節(jié)點(diǎn)失敗就失敗。

  • Available Cluster:遍歷所有的 Provider 節(jié)點(diǎn)盖桥,找到每一個(gè)可用的節(jié)點(diǎn)屠升,就直接調(diào)用凝化。如果沒有可用的 Provider 節(jié)點(diǎn)褐鸥,則直接拋出異常狠毯。

  • Mergeable Cluster:請(qǐng)求多個(gè) Provider 節(jié)點(diǎn)并將得到的結(jié)果進(jìn)行合并。

下面再來看 Cluster 接口靴拱。Cluster 接口是一個(gè)擴(kuò)展接口,通過 @SPI 注解的參數(shù)我們知道其使用的默認(rèn)實(shí)現(xiàn)是 FailoverCluster猾普,它只定義了一個(gè) join() 方法袜炕,在其上添加了 @Adaptive 注解,會(huì)動(dòng)態(tài)生成適配器類初家,其中會(huì)優(yōu)先根據(jù) Directory.getUrl() 方法返回的 URL 中的 cluster 參數(shù)值選擇擴(kuò)展實(shí)現(xiàn)偎窘,若無 cluster 參數(shù)則使用默認(rèn)的 FailoverCluster 實(shí)現(xiàn)。Cluster 接口的具體定義如下所示:

@SPI(Cluster.DEFAULT)
public interface Cluster {
    String DEFAULT = FailoverCluster.NAME;

    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

    static Cluster getCluster(String name) {
        return getCluster(name, true);
    }

    static Cluster getCluster(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            name = Cluster.DEFAULT;
        }
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
    }
}

Cluster 接口的實(shí)現(xiàn)類如下圖所示溜在,分別對(duì)應(yīng)前面提到的多種容錯(cuò)策略:


Cluster 接口繼承關(guān)系

在每個(gè) Cluster 接口實(shí)現(xiàn)中陌知,都會(huì)創(chuàng)建對(duì)應(yīng)的 Invoker 對(duì)象,這些都繼承自 AbstractClusterInvoker 抽象類掖肋,如下圖所示:


AbstractClusterInvoker 繼承關(guān)系圖

通過上面兩張繼承關(guān)系圖我們可以看出仆葡,Cluster 接口和 Invoker 接口都會(huì)有相應(yīng)的抽象實(shí)現(xiàn)類,這些抽象實(shí)現(xiàn)類都實(shí)現(xiàn)了一些公共能力志笼。下面我們就來深入介紹 AbstractClusterInvoker 和 AbstractCluster 這兩個(gè)抽象類沿盅。

AbstractClusterInvoker

了解了 Cluster Invoker 的繼承關(guān)系之后,首先來看 AbstractClusterInvoker纫溃,它有兩點(diǎn)核心功能:一個(gè)是實(shí)現(xiàn)的 Invoker 接口腰涧,對(duì) Invoker.invoke() 方法進(jìn)行通用的抽象實(shí)現(xiàn);另一個(gè)是實(shí)現(xiàn)通用的負(fù)載均衡算法紊浩。

在 AbstractClusterInvoker.invoke() 方法中窖铡,會(huì)通過 Directory 獲取 Invoker 列表,然后通過 SPI 初始化 LoadBalance坊谁,最后調(diào)用 doInvoke() 方法執(zhí)行子類的邏輯费彼。在 Directory.list() 方法返回 Invoker 集合之前,已經(jīng)使用 Router 進(jìn)行了一次篩選呜袁。

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        // 檢測(cè)當(dāng)前Invoker是否已銷毀
        checkWhetherDestroyed();

        // 將RpcContext中的attachment添加到Invocation中
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }

        // 通過Directory獲取Invoker對(duì)象列表敌买,通過對(duì)RegistryDirectory的介紹我們知道,其中已經(jīng)調(diào)用了Router進(jìn)行過濾
        List<Invoker<T>> invokers = list(invocation);
        // 通過SPI加載LoadBalance
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 調(diào)用doInvoke()方法阶界,該方法是個(gè)抽象方法
        return doInvoke(invocation, invokers, loadbalance);
    }
    
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        // 調(diào)用Directory.list()方法
        return directory.list(invocation);
    }   
}

下面來看一下 AbstractClusterInvoker 是如何按照不同的 LoadBalance 算法從 Invoker 集合中選取最終 Invoker 對(duì)象的虹钮。

AbstractClusterInvoker 并沒有簡單粗暴地使用 LoadBalance.select() 方法完成負(fù)載均衡聋庵,而是做了進(jìn)一步的封裝,具體實(shí)現(xiàn)在 select() 方法中芙粱。在 select() 方法中會(huì)根據(jù)配置決定是否開啟粘滯連接特性祭玉,如果開啟了,則需要將上次使用的 Invoker 緩存起來春畔,只要 Provider 節(jié)點(diǎn)可用就直接調(diào)用脱货,不會(huì)再進(jìn)行負(fù)載均衡。如果調(diào)用失敗律姨,才會(huì)重新進(jìn)行負(fù)載均衡振峻,并且排除已經(jīng)重試過的 Provider 節(jié)點(diǎn)。

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    // 第一個(gè)參數(shù)是此次使用的LoadBalance實(shí)現(xiàn)择份,第二個(gè)參數(shù)Invocation是此次服務(wù)調(diào)用的上下文信息扣孟,
    // 第三個(gè)參數(shù)是待選擇的Invoker集合,第四個(gè)參數(shù)用來記錄負(fù)載均衡已經(jīng)選出來荣赶、嘗試過的Invoker集合
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        // 獲取調(diào)用方法名
        String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
        // 獲取sticky配置凤价,sticky表示粘滯連接,所謂粘滯連接是指Consumer會(huì)盡可能地
        // 調(diào)用同一個(gè)Provider節(jié)點(diǎn)拔创,除非這個(gè)Provider無法提供服務(wù)
        boolean sticky = invokers.get(0).getUrl()
                .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

        // 檢測(cè)invokers列表是否包含sticky Invoker利诺,如果不包含,
        // 說明stickyInvoker代表的服務(wù)提供者掛了剩燥,此時(shí)需要將其置空
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        
        // 如果開啟了粘滯連接特性慢逾,需要先判斷這個(gè)Provider節(jié)點(diǎn)是否已經(jīng)重試過了
        if (sticky && stickyInvoker != null // 表示粘滯連接
            && (selected == null || !selected.contains(stickyInvoker))) {// 表示stickyInvoker未重試過
            // 檢測(cè)當(dāng)前stickyInvoker是否可用,如果可用灭红,直接返回stickyInvoker
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
        
        // 執(zhí)行到這里氛改,說明前面的stickyInvoker為空,或者不可用
        // 這里會(huì)繼續(xù)調(diào)用doSelect選擇新的Invoker對(duì)象
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

        if (sticky) {
            // 是否開啟粘滯比伏,更新stickyInvoker字段
            stickyInvoker = invoker;
        }
        return invoker;
    }
}

doSelect() 方法主要做了兩件事:

  • 一是通過 LoadBalance 選擇 Invoker 對(duì)象胜卤。

  • 二是如果選出來的 Invoker 不穩(wěn)定或不可用,會(huì)調(diào)用 reselect() 方法進(jìn)行重選赁项。

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        // 判斷是否需要進(jìn)行負(fù)載均衡葛躏,Invoker集合為空,直接返回null
        if (CollectionUtils.isEmpty(invokers)) {
            return null;
        }
        if (invokers.size() == 1) {
            // 只有一個(gè)Invoker對(duì)象悠菜,直接返回即可
            return invokers.get(0);
        }
        
        // 通過LoadBalance實(shí)現(xiàn)選擇Invoker對(duì)象
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

        // 如果LoadBalance選出的Invoker對(duì)象舰攒,已經(jīng)嘗試過請(qǐng)求了或不可用,則需要調(diào)用reselect()方法重選
        if ((selected != null && selected.contains(invoker)) // Invoker已經(jīng)嘗試調(diào)用過了悔醋,但是失敗了
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {// Invoker不可用
            try {
                // 調(diào)用reselect()方法重選
                Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                // 如果重選的Invoker對(duì)象不為空摩窃,則直接返回這個(gè) rInvoker
                if (rInvoker != null) {
                    invoker = rInvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    int index = invokers.indexOf(invoker);
                    try {
                        // 如果重選的Invoker對(duì)象為空,則返回該Invoker的下一個(gè)Invoker對(duì)象
                        invoker = invokers.get((index + 1) % invokers.size());
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }
}

reselect() 方法會(huì)重新進(jìn)行一次負(fù)載均衡,首先對(duì)未嘗試過的可用 Invokers 進(jìn)行負(fù)載均衡猾愿,如果已經(jīng)全部重試過了鹦聪,則將嘗試過的 Provider 節(jié)點(diǎn)過濾掉,然后在可用的 Provider 節(jié)點(diǎn)中重新進(jìn)行負(fù)載均衡蒂秘。

public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {

    private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

        // 用于記錄要重新進(jìn)行負(fù)載均衡的Invoker集合
        List<Invoker<T>> reselectInvokers = new ArrayList<>(
                invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

        // 將不在selected集合中的Invoker過濾出來進(jìn)行負(fù)載均衡
        for (Invoker<T> invoker : invokers) {
            if (availablecheck && !invoker.isAvailable()) {
                continue;
            }

            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        // reselectInvokers不為空時(shí)泽本,才需要通過負(fù)載均衡組件進(jìn)行選擇
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        // 只能對(duì)selected集合中可用的Invoker再次進(jìn)行負(fù)載均衡
        if (selected != null) {
            for (Invoker<T> invoker : selected) {
                if ((invoker.isAvailable()) // available first
                        && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }

        return null;
    }
}

AbstractCluster

常用的 ClusterInvoker 實(shí)現(xiàn)都繼承了 AbstractClusterInvoker 類型,對(duì)應(yīng)的 Cluster 擴(kuò)展實(shí)現(xiàn)都繼承了 AbstractCluster 抽象類姻僧。AbstractCluster 抽象類的核心邏輯是在 ClusterInvoker 外層包裝一層 ClusterInterceptor规丽,從而實(shí)現(xiàn)類似切面的效果。

下面是 ClusterInterceptor 接口的定義:

@SPI
public interface ClusterInterceptor {
    
    // 前置攔截方法
    void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    // 后置攔截方法
    void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

    // 調(diào)用ClusterInvoker的invoke()方法完成請(qǐng)求
    default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
        return clusterInvoker.invoke(invocation);
    }

    // 這個(gè)Listener用來監(jiān)聽請(qǐng)求的正常結(jié)果以及異常
    interface Listener {

        void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);

        void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
    }
}

在 AbstractCluster 抽象類的 join() 方法中撇贺,首先會(huì)調(diào)用 doJoin() 方法獲取最終要調(diào)用的 Invoker 對(duì)象赌莺,doJoin() 是個(gè)抽象方法,由 AbstractCluster 子類根據(jù)具體的策略進(jìn)行實(shí)現(xiàn)松嘶。之后雄嚣,AbstractCluster.join() 方法會(huì)調(diào)用 buildClusterInterceptors() 方法加載 ClusterInterceptor 擴(kuò)展實(shí)現(xiàn)類,對(duì) Invoker 對(duì)象進(jìn)行包裝喘蟆。具體實(shí)現(xiàn)如下:

public abstract class AbstractCluster implements Cluster {

    private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
        AbstractClusterInvoker<T> last = clusterInvoker;
        // 通過SPI方式加載ClusterInterceptor擴(kuò)展實(shí)現(xiàn)
        List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);

        if (!interceptors.isEmpty()) {
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                // 將InterceptorInvokerNode收尾連接到一起,形成調(diào)用鏈
                final ClusterInterceptor interceptor = interceptors.get(i);
                final AbstractClusterInvoker<T> next = last;
                last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
            }
        }
        return last;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        // 擴(kuò)展名稱由reference.interceptor參數(shù)確定
        return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
    }
}

InterceptorInvokerNode 會(huì)將底層的 AbstractClusterInvoker 對(duì)象以及關(guān)聯(lián)的 ClusterInterceptor 對(duì)象封裝到一起鼓鲁,還會(huì)維護(hù)一個(gè) next 引用蕴轨,指向下一個(gè) InterceptorInvokerNode 對(duì)象。

在 InterceptorInvokerNode.invoke() 方法中骇吭,會(huì)先調(diào)用 ClusterInterceptor 的前置邏輯橙弱,然后執(zhí)行 intercept() 方法調(diào)用 AbstractClusterInvoker 的 invoke() 方法完成遠(yuǎn)程調(diào)用,最后執(zhí)行 ClusterInterceptor 的后置邏輯燥狰。具體實(shí)現(xiàn)如下:

public abstract class AbstractCluster implements Cluster {

    protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {

        private AbstractClusterInvoker<T> clusterInvoker;
        private ClusterInterceptor interceptor;
        private AbstractClusterInvoker<T> next;
        
        @Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                // 前置邏輯
                interceptor.before(next, invocation);
                // 執(zhí)行invoke()方法完成遠(yuǎn)程調(diào)用
                asyncResult = interceptor.intercept(next, invocation);
            } catch (Exception e) {
                // onError callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    // 出現(xiàn)異常時(shí)棘脐,會(huì)觸發(fā)監(jiān)聽器的onError()方法
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    listener.onError(e, clusterInvoker, invocation);
                }
                throw e;
            } finally {
                // 執(zhí)行后置邏輯
                interceptor.after(next, invocation);
            }
            return asyncResult.whenCompleteWithContext((r, t) -> {
                // onResponse callback
                if (interceptor instanceof ClusterInterceptor.Listener) {
                    ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
                    if (t == null) {
                        // 正常返回時(shí),會(huì)調(diào)用onMessage()方法觸發(fā)監(jiān)聽器
                        listener.onMessage(r, clusterInvoker, invocation);
                    } else {
                        listener.onError(t, clusterInvoker, invocation);
                    }
                }
            });
        }
    }       
}

Dubbo 提供了兩個(gè) ClusterInterceptor 實(shí)現(xiàn)類龙致,分別是 ConsumerContextClusterInterceptor 和 ZoneAwareClusterInterceptor蛀缝,如下圖所示:


ClusterInterceptor 繼承關(guān)系圖

ConsumerContextClusterInterceptor

在 ConsumerContextClusterInterceptor 的 before() 方法中,會(huì)在 RpcContext 中設(shè)置當(dāng)前 Consumer 地址目代、此次調(diào)用的 Invoker 等信息屈梁,同時(shí)還會(huì)刪除之前與當(dāng)前線程綁定的 Server Context。在 after() 方法中榛了,會(huì)刪除本地 RpcContext 的信息在讶。ConsumerContextClusterInterceptor 的具體實(shí)現(xiàn)如下:

@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {

    @Override
    public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 獲取當(dāng)前線程綁定的RpcContext
        RpcContext context = RpcContext.getContext();
        // 設(shè)置Invoker、Consumer地址等信息
        context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        RpcContext.removeServerContext();
    }

    @Override
    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
        // 刪除本地RpcContext的信息
        RpcContext.removeContext(true);
    }
}

ConsumerContextClusterInterceptor 同時(shí)繼承了 ClusterInterceptor.Listener 接口霜大,在其 onMessage() 方法中构哺,會(huì)獲取響應(yīng)中的 attachments 并設(shè)置到 RpcContext 中的 SERVER_LOCAL 之中,具體實(shí)現(xiàn)如下:

@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {

    @Override
    public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
        // 從AppResponse中獲取attachment战坤,并設(shè)置到SERVER_LOCAL這個(gè)RpcContext中
        RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
    }
}

介紹完 ConsumerContextClusterInterceptor曙强,再來看 ZoneAwareClusterInterceptor残拐。

ZoneAwareClusterInterceptor

在 ZoneAwareClusterInterceptor 的 before() 方法中,會(huì)從 RpcContext 中獲取多注冊(cè)中心相關(guān)的參數(shù)并設(shè)置到 Invocation 中(主要是 registry_zone 參數(shù)和 registry_zone_force 參數(shù)旗扑,這兩個(gè)參數(shù)的具體含義蹦骑,在后面分析 ZoneAwareClusterInvoker 時(shí)詳細(xì)介紹),ZoneAwareClusterInterceptor 的 after() 方法為空實(shí)現(xiàn)臀防。ZoneAwareClusterInterceptor 的具體實(shí)現(xiàn)如下:

@Activate(value = "cluster:zone-aware")
public class ZoneAwareClusterInterceptor implements ClusterInterceptor {

    @Override
    public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
        RpcContext rpcContext = RpcContext.getContext();
        // 從RpcContext中獲取registry_zone參數(shù)和registry_zone_force參數(shù)
        String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
        String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
        
        // 檢測(cè)用戶是否提供了ZoneDetector接口的擴(kuò)展實(shí)現(xiàn)
        ExtensionLoader<ZoneDetector> loader = ExtensionLoader.getExtensionLoader(ZoneDetector.class);
        if (StringUtils.isEmpty(zone) && loader.hasExtension("default")) {
            ZoneDetector detector = loader.getExtension("default");
            zone = detector.getZoneOfCurrentRequest(invocation);
            force = detector.isZoneForcingEnabled(invocation, zone);
        }
        
        // 將registry_zone參數(shù)和registry_zone_force參數(shù)設(shè)置到Invocation中
        if (StringUtils.isNotEmpty(zone)) {
            invocation.setAttachment(REGISTRY_ZONE, zone);
        }
        if (StringUtils.isNotEmpty(force)) {
            invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
        }
    }

    @Override
    public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {

    }
}

需要注意的是眠菇,ZoneAwareClusterInterceptor 沒有實(shí)現(xiàn) ClusterInterceptor.Listener 接口,也就是不提供監(jiān)聽響應(yīng)的功能袱衷。

總結(jié)

本文主要介紹的是 Dubbo Cluster 層中容錯(cuò)機(jī)制相關(guān)的內(nèi)容捎废。首先,了解了集群容錯(cuò)機(jī)制的作用致燥。然后登疗,介紹了 Cluster 接口的定義以及其各個(gè)實(shí)現(xiàn)類的核心功能。之后嫌蚤,深入講解了 AbstractClusterInvoker 的實(shí)現(xiàn)辐益,其核心是實(shí)現(xiàn)了一套通用的負(fù)載均衡算法。最后脱吱,還分析了 AbstractCluster 抽象實(shí)現(xiàn)類以及其中涉及的 ClusterInterceptor 接口的內(nèi)容智政。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市箱蝠,隨后出現(xiàn)的幾起案子续捂,更是在濱河造成了極大的恐慌,老刑警劉巖宦搬,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件牙瓢,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡间校,警方通過查閱死者的電腦和手機(jī)矾克,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來憔足,“玉大人聂渊,你說我怎么就攤上這事∷奶保” “怎么了汉嗽?”我有些...
    開封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長找蜜。 經(jīng)常有香客問我饼暑,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任弓叛,我火速辦了婚禮彰居,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘撰筷。我一直安慰自己陈惰,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開白布毕籽。 她就那樣靜靜地躺著抬闯,像睡著了一般。 火紅的嫁衣襯著肌膚如雪关筒。 梳的紋絲不亂的頭發(fā)上溶握,一...
    開封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天,我揣著相機(jī)與錄音蒸播,去河邊找鬼睡榆。 笑死,一個(gè)胖子當(dāng)著我的面吹牛袍榆,可吹牛的內(nèi)容都是我干的胀屿。 我是一名探鬼主播,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼包雀,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼宿崭!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起馏艾,我...
    開封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎奴愉,沒想到半個(gè)月后琅摩,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡锭硼,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年房资,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片檀头。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡轰异,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出暑始,到底是詐尸還是另有隱情搭独,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布廊镜,位于F島的核電站牙肝,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜配椭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一虫溜、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧股缸,春花似錦衡楞、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至替劈,卻和暖如春寄雀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背陨献。 一陣腳步聲響...
    開封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來泰國打工盒犹, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人眨业。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓急膀,卻偏偏與公主長得像,于是被迫代替她去往敵國和親龄捡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子卓嫂,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容