dubbo之Cluster(容錯)

在介紹dubbo的cluster之前炊豪,先來看一下cluster在dubbo整體設計中的位置湃缎。按照官網(wǎng)的說法岁忘,Cluster作為路由層,封裝多個提供者的路由及負載均衡深员,并橋接注冊中心负蠕,以 Invoker 為中心,核心擴展接口為 Cluster, Directory, Router, LoadBalance,接口間的依賴關系如下:

# 其中 A->B 表示 A依賴B
Cluster -> Directory & LoadBalance
Directory -> Router 

虛擬Invoker暴露流程程:Cluster => (Directory => Router) => LoadBalance => Invoker,依照這個順序倦畅,我們先來看Cluster遮糖。Cluster不屬于核心層,目的是將多個 Invoker 偽裝成一個 Invoker叠赐,這樣其它人只要關注 Protocol 層 Invoker 即可欲账,加上 Cluster 或者去掉 Cluster 對其它層都不會造成影響,因為只有一個提供者時芭概,是不需要 Cluster 的赛不。本文主要關注Cluster層的容錯及其核心接口(LoadBalance在之前的文章已經(jīng)做過介紹)。

dubbo-framework.jpg

先來看Cluster層中的Cluster接口罢洲,支持SPI擴展踢故、自適應擴展,默認SPI實現(xiàn)是FailOverCluster惹苗,核心只有一個join接口

<T> Invoker<T> join(Directory<T> directory) throws RpcException;

比較好理解殿较,把Directory中的所有原始Invoker合并成一個虛擬Invoker,虛擬Invoker是一系列通過合并原始Invoker鸽粉,并在此基礎上擴展帶有容錯機制的Invoker斜脂。以FailOverCluster為例抓艳,join返回FailoverClusterInvoker触机,具體的invoke邏輯由虛擬Invoker(FailoverClusterInboker)實現(xiàn),構造方法(這里以FailoverClsterInvoker為例,其他虛擬Invoker的構造方法大同小異)通過繼承父類AbstractClusterInvoker實現(xiàn),只有一個Directory參數(shù):

public FailoverClusterInvoker(Directory<T> directory) {
    super(directory);
}

當前dubbo版本提供的虛擬Invoker主要有下面幾種儡首,下面來分別介紹:

  1. 失效轉(zhuǎn)移:FailoverCluster -> FailoverClusterInvoker (Cluster默認SPI實現(xiàn)

    若當前Invoker不可用片任,則重試調(diào)用其他Invoker,重試次數(shù)可以通過URL參數(shù)retries指定蔬胯;假設retries=n对供,那么也就是說,最多重新調(diào)用n次不同的Invoker氛濒。邏輯比較簡單产场,直接來看核心代碼:

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        //線程封閉,保證并發(fā)安全
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
         // 默認重試3次舞竿,至少重試1一次
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            //重試的時候京景,從directory拉取最新的Invoker列表
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            //調(diào)用AbstractClusterInvoker.select方法
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 若調(diào)用出現(xiàn)異常,異常處理之后骗奖,重試
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        // 重試失敗确徙,直接拋異常
    }
    
  2. 失效恢復:FailbackCluster -> FailbackClusterInvoker

    調(diào)用失敗,則記錄失敗記錄执桌,然后利用HashedWheelTimer定時重試鄙皇,對通知類服務比較有效。核心代碼如下:可以看到仰挣,每次失敗伴逸,都會往定時器的bucket加一條重試任務

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            Invoker<T> invoker = null;
            try {
                checkInvokers(invokers, invocation);
                invoker = select(loadbalance, invocation, invokers, null);
                return invoker.invoke(invocation);
            } catch (Throwable e) {
                 //調(diào)用失敗,把當前Invoker包裝成RetryTask椎木,放入HashedWheelTimer的bucket
                logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                        + e.getMessage() + ", ", e);
                addFailed(loadbalance, invocation, invokers, invoker);
                return new RpcResult(); // ignore
            }
        }
    
    // 關注RetryTask的核心run方法
    public void run(Timeout timeout) {
                try {
                     //同樣根據(jù)負載均衡策略违柏,選擇重試的Invoker
                    Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                    lastInvoker = retryInvoker;
                     // 重試
                    retryInvoker.invoke(invocation);
                } catch (Throwable e) {
                    logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                    if ((++retryTimes) >= retries) {
                        logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                    } else {
                         // 再次失敗會重新放進bucket
                        rePut(timeout);
                    }
                }
            }
    
    // 調(diào)用失敗的Invoker,放進定時器的bucket
    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
         //初始化HashedWheelTimer定時器
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    failTimer = new HashedWheelTimer(
                            new NamedThreadFactory("failback-cluster-timer", true),1,TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }
    
  3. 快速失斚阕怠:FailfastCluster -> FailfastClusterInvoker

    僅執(zhí)行一次漱竖,也就是說,若當前調(diào)用失敗畜伐,則直接拋異常馍惹,通常用于非冪等的寫操作。邏輯比較簡單玛界,如下:

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        //調(diào)用父類select方法選擇Invoker万矾,并調(diào)用,失敗則直接拋異常
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 直接拋一場慎框,忽略
        }
    }
    
  4. 失效安全:FailsafeCluster -> FailsafeClusterInvoker

    調(diào)用失敗良狈,則只做日志記錄,并返回空的RpcResult笨枯,邏輯同樣比較簡單薪丁,如下:

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            //調(diào)用父類select方法選擇Invoker遇西,并調(diào)用,失敗則返回空的RpcResult
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult(); // ignore
        }
    }
    
  5. Available :AvailableCluster-> AvailableClusterInvoker(無需負載均衡

    與上面4種機制不同严嗜,AvailableClusterInvoker不涉及LoadBalance粱檀,直接調(diào)用第一個可用的Invoker;若無可用Invoker漫玄,直接拋異常茄蚯。核心邏輯如下:

    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        for (Invoker<T> invoker : invokers) {
             //比較簡單,拿到可用的Invoker睦优,直接調(diào)用渗常,成功則成功,失敗則拋RpcException汗盘;
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        throw new RpcException("No provider available in " + invokers);
    }
    
  6. Forking : ForkingCluster -> ForkingClusterInvoker

    支持并發(fā)調(diào)用多個invoker凳谦,內(nèi)置cached線程池,同時支持超時時間衡未,超時時間由URL參數(shù)timeout指定尸执;并發(fā)數(shù)由URL參數(shù)forks指定,假設fork=n缓醋,那么會往cached線程池丟n個Runnable執(zhí)行對應的invoke操作如失,最終結果存放在阻塞隊列,適用于實時性要求比較高的操作送粱,但是相對比較耗資源褪贵。下面是核心邏輯:

    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                 //選擇Invoker做備用
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // TODO. Add some comment here, refer chinese version for more details.
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    if (!selected.contains(invoker)) {
                        //Avoid add the same invoker several times.
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
             //阻塞隊列,用于存放異步結果
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
             // 調(diào)用備選Inboker抗俄,結果存放隊列
            for (final Invoker<T> invoker : selected) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Result result = invoker.invoke(invocation);
                            ref.offer(result);
                        } catch (Throwable e) {
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                 //有結果則直接返回
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            // clear attachments which is binding to current thread.
            RpcContext.getContext().clearAttachments();
        }
    }
    
  7. Mergeable :MergeableCluster -> MergeableClusterInvoker (無需負載均衡

    主要用于URL中Method帶merge參數(shù)的Invoker脆丁,無需負載均衡;若URL的Method參數(shù)不帶merger动雹,則退化為availebleClusterInvoke槽卫;內(nèi)置cached線程池,用于執(zhí)行異步invoker調(diào)用胰蝠,結果緩存于list歼培,用于后面的merge;約定merger值為".xxx"或者"xxx",若以.xxx開頭茸塞,則會直接調(diào)用xxx方法進行merge(可以理解為用戶自定義merge邏輯躲庄,而不采用dubbo自身提供的Merger接口SPI實現(xiàn));否則钾虐,根據(jù)merger值噪窘,找到對應Merger的SPI實現(xiàn)對結果list進行merge;

    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        //方法是否支持merger
        String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
         //不支持merger效扫,則退化為availaClusterInvoker
        if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
            for (final Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    try {
                        return invoker.invoke(invocation);
                    } catch (RpcException e) {
                      // 異常處理倔监,略過
                    }
                }
            }
            return invokers.iterator().next().invoke(invocation);
        }
    
        //方法返回類型
        Class<?> returnType;
        try {
            returnType = getInterface().getMethod(
                    invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
        } catch (NoSuchMethodException e) {
            returnType = null;
        }
    
        //異步調(diào)用結果map无切,<invoker.getUrl,Future<Result>>
        Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
        for (final Invoker<T> invoker : invokers) {
             // 線程池處理異步調(diào)用
            Future<Result> future = executor.submit(new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return invoker.invoke(new RpcInvocation(invocation, invoker));
                }
            });
            results.put(invoker.getUrl().getServiceKey(), future);
        }
    
        Object result = null;
        List<Result> resultList = new ArrayList<Result>(results.size());
    
        //獲取結果列表,用于后續(xù)合并
        int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
            Future<Result> future = entry.getValue();
            try {
                Result r = future.get(timeout, TimeUnit.MILLISECONDS);
                if (r.hasException()) {
                    log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
                                    " failed: " + r.getException().getMessage(),
                            r.getException());
                } else {
                    resultList.add(r);
                }
            } catch (Exception e) {
                throw new RpcException("Failed to invoke service " + entry.getKey() + ": " + e.getMessage(), e);
            }
        }
    
        //異步invoker調(diào)用結果resultList
        if (resultList.isEmpty()) {
            return new RpcResult((Object) null);
        } else if (resultList.size() == 1) {
            return resultList.iterator().next();
        }
    
        //方法返回類類型為 void丐枉,則直接返回
        if (returnType == void.class) {
            return new RpcResult((Object) null);
        }
    
        //自定義merger值,以".merger"開頭
        if (merger.startsWith(".")) {
            merger = merger.substring(1);
            Method method;
            try {
                //獲取方法
                method = returnType.getMethod(merger, returnType);
            } catch (NoSuchMethodException e) {
                throw new RpcException("Can not merge result because missing method [ " + merger + " ] in class [ " +
                        returnType.getClass().getName() + " ]");
            }
            //設置方法訪問權限
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            //拿到result中的第一個掘托,拿到result的值
            result = resultList.remove(0).getValue();
            try {
                if (method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom(result.getClass())) {
                    //根據(jù)自定義merge方法瘦锹,合并resultList的結果
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                } else {
                    //無返回值,則只做merge
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException("Can not merge result: " + e.getMessage(), e);
            }
        } else {
            Merger resultMerger;
            //merger == default,則使用與returnType類型相匹配的默認merger
            if (ConfigUtils.isDefault(merger)) {
                resultMerger = MergerFactory.getMerger(returnType);
            } else {
                //否則闪盔,使用指定merger
                resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
            }
            if (resultMerger != null) {
                List<Object> rets = new ArrayList<Object>(resultList.size());
                for (Result r : resultList) {
                    rets.add(r.getValue());
                }
                result = resultMerger.merge(
                        rets.toArray((Object[]) Array.newInstance(returnType, 0)));
            } else {
                throw new RpcException("There is no merger to merge result.");
            }
        }
        return new RpcResult(result);
    }
    
  8. 廣播弯院。 :BroadcastCluster -> BroadcastClusterInvoker (無需負載均衡

    所有原始invoker都會被調(diào)用,無需負載均衡泪掀,適用于notify場景听绳,邏輯比較簡單,

    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
         // 依次調(diào)用所有Invoker异赫,異常則記錄日志椅挣,返回結果以最后一個Invoker調(diào)用結果為準
        for (Invoker<T> invoker : invokers) {
            try {
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        if (exception != null) {
            throw exception;
        }
        return result;
    }
    

所有8種cluster中,除去AvailableCluster塔拳、MergeableCluster鼠证、BroadcastCluster,其他都需要根據(jù)LoadBalance選取可用Invoker靠抑,具體邏輯在AbstractClusterInvoker.select量九。先來看AbstractClusterInvoker的構造方法:

public AbstractClusterInvoker(Directory<T> directory, URL url) {
    if (directory == null) {
        throw new IllegalArgumentException("service directory == null");
    }

    this.directory = directory;
    //sticky: invoker.isAvailable() should always be checked before using when availablecheck is true.
    this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK);
}

兩個核心參數(shù),Directory和URL颂碧,Directory本節(jié)先不做介紹荠列,這里的URL是被調(diào)用服務的URL;availableCheck為了與服務的availableCheck做區(qū)分载城,這里的參數(shù)名是cluster.availablecheck肌似;核心關注上面提到的select方法,先來看邏輯:

  1. 先判斷是否開啟粘性策略()诉瓦,值取自URL參數(shù)sticky锈嫩;
  2. 當前粘性Invoker是否在可用列表,不可用則置空垦搬;
  3. 若采用粘性策略呼寸,當前stickyInvoker可用,且該stickyInvoker未被使用過(虛擬Invoker執(zhí)行單次invoke猴贰,當前Invoker從未被選中過对雪;盡可能保證平均調(diào)用每個原始Invoker),直接返回stickyInvoker
  4. 否則采用負載均衡策略選擇一個原始Invoker返回(詳情參考后面的doSelect方法
  5. 若采用粘性策略米绕,則把4中的Invoker賦值給stickInvoker;
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    // stickyInvoker不包含在invokers中瑟捣,則stickyInvoker置空
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    // 啟用sticky馋艺,且stickyInvoker非空,stickyInvoker未被使用過迈套,且stickyInvoker可用的情況下捐祠,返回stickyInvoker
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }
    // 否則利用負載均衡策略選擇一個invoker,重點關注
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

整體select方法都是為了盡可能保證每次選出的Invoker不重復桑李,也就是說最大限度的保證負載均衡踱蛀;doSelect方法在處理的時候,通過loadBalance選出的Invoker贵白,還會對其進一步判斷是否已被選中過率拒,步驟如下:

  1. invokers.size = 1,則直接返回禁荒,否則執(zhí)行步驟2猬膨;
  2. 利用負載均衡選擇一個invoker,然后執(zhí)行步驟3呛伴;
  3. 若selected非空勃痴,且2中的invoker已在selected中,則執(zhí)行步驟4進行重新選擇热康;
  4. 重新選擇召耘,結果非空則直接返回,否則執(zhí)行步驟5褐隆;
  5. 重新選擇結果為空垃喊,則根據(jù)hash規(guī)則趾疚,直接從invokers中直接返回一個結果
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    // selected非空骂删,且通過負載均衡得到的invoker已在selected中尺碰,或者選中的invoker不可用則重新選擇。
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // 重新選擇,重點關注
            Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
             //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                int index = invokers.indexOf(invoker);
                // 重新選擇失敗歇攻,則利用mod重新選擇一個invoker
                try {
                    //Avoid collision
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

doSelect方法中的loadbalance.select已經(jīng)在LoadBalance部分做了分析固惯,這里不再冗述,重點關注reSelect方法缴守;先把備選Invoker中葬毫,未被選中過的Invoker過濾出來,優(yōu)先從中選取可用Invoker屡穗,步驟如下:

  1. 初始化reselectInvokers列表贴捡,size= 1 或者 invokers.size -1,用于緩存未被選中過的Invoker村砂;
  2. reselectInvokers非空烂斋,則根據(jù)負載均衡策略,選擇一個invoker,直接返回汛骂,否則執(zhí)行3罕模;
  3. reselectInvokers為空,即invokers中所有invoker都在selected中帘瞭,則從selected中過濾可用invoer淑掌,存放至reselectInvokers;
  4. 重復步驟2蝶念,否則返回null
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {

    //Allocating one in advance, this list is certain to be used.
    List<Invoker<T>> reselectInvokers = new ArrayList<>(
            invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // First, try picking a invoker not in `selected`.
    // 過濾未被selected的invoker抛腕,存放至reselectInvoker
    for (Invoker<T> invoker : invokers) {
        if (availablecheck && !invoker.isAvailable()) {
            continue;
        }

        if (selected == null || !selected.contains(invoker)) {
            reselectInvokers.add(invoker);
        }
    }

    //reselectInvokers非空,則利用負載均衡重新選擇
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    // Just pick an available invoker using loadbalance policy
    // 若reselectInvokers為空祸轮,則從selected中過濾可用invoker,存放至reselectInvokers
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // available first
                    && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    if (!reselectInvokers.isEmpty()) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }

    return null;
}

Cluster層的容錯主要通過幾種常用的容錯機制配合負載均衡侥钳,保證最終通過Cluster暴露可用的Invoker适袜;而且,dubbo在保證Invoker可用性前提下舷夺,要求盡可能均衡負載苦酱,過程會多次執(zhí)行負載均衡策略。

注:dubbo源碼版本2.7.1给猾,歡迎指正疫萤。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市敢伸,隨后出現(xiàn)的幾起案子扯饶,更是在濱河造成了極大的恐慌,老刑警劉巖池颈,帶你破解...
    沈念sama閱讀 217,084評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件尾序,死亡現(xiàn)場離奇詭異,居然都是意外死亡躯砰,警方通過查閱死者的電腦和手機每币,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,623評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來琢歇,“玉大人兰怠,你說我怎么就攤上這事±蠲#” “怎么了揭保?”我有些...
    開封第一講書人閱讀 163,450評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長魄宏。 經(jīng)常有香客問我掖举,道長,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,322評論 1 293
  • 正文 為了忘掉前任塔次,我火速辦了婚禮方篮,結果婚禮上,老公的妹妹穿的比我還像新娘励负。我一直安慰自己藕溅,他們只是感情好,可當我...
    茶點故事閱讀 67,370評論 6 390
  • 文/花漫 我一把揭開白布继榆。 她就那樣靜靜地躺著巾表,像睡著了一般。 火紅的嫁衣襯著肌膚如雪略吨。 梳的紋絲不亂的頭發(fā)上集币,一...
    開封第一講書人閱讀 51,274評論 1 300
  • 那天,我揣著相機與錄音翠忠,去河邊找鬼鞠苟。 笑死,一個胖子當著我的面吹牛秽之,可吹牛的內(nèi)容都是我干的当娱。 我是一名探鬼主播,決...
    沈念sama閱讀 40,126評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼考榨,長吁一口氣:“原來是場噩夢啊……” “哼跨细!你這毒婦竟也來了?” 一聲冷哼從身側響起河质,我...
    開封第一講書人閱讀 38,980評論 0 275
  • 序言:老撾萬榮一對情侶失蹤冀惭,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后掀鹅,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體云头,經(jīng)...
    沈念sama閱讀 45,414評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,599評論 3 334
  • 正文 我和宋清朗相戀三年淫半,在試婚紗的時候發(fā)現(xiàn)自己被綠了溃槐。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,773評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡科吭,死狀恐怖昏滴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情对人,我是刑警寧澤谣殊,帶...
    沈念sama閱讀 35,470評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站牺弄,受9級特大地震影響姻几,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,080評論 3 327
  • 文/蒙蒙 一蛇捌、第九天 我趴在偏房一處隱蔽的房頂上張望抚恒。 院中可真熱鬧,春花似錦络拌、人聲如沸俭驮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,713評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽混萝。三九已至,卻和暖如春萍恕,著一層夾襖步出監(jiān)牢的瞬間逸嘀,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,852評論 1 269
  • 我被黑心中介騙來泰國打工允粤, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留崭倘,地道東北人。 一個月前我還...
    沈念sama閱讀 47,865評論 2 370
  • 正文 我出身青樓维哈,卻偏偏與公主長得像绳姨,于是被迫代替她去往敵國和親登澜。 傳聞我的和親對象是個殘疾皇子阔挠,可洞房花燭夜當晚...
    茶點故事閱讀 44,689評論 2 354

推薦閱讀更多精彩內(nèi)容

  • 前言 本文繼續(xù)分析dubbo的cluster層,此層封裝多個提供者的路由及負載均衡脑蠕,并橋接注冊中心购撼,以Invoke...
    Java大生閱讀 991評論 0 0
  • 先看官網(wǎng)一張圖image.png這就是dubbo的集群設計了,本章主要解析的就是圖中的主要幾個藍色點谴仙,簡單堆土做個...
    致慮閱讀 393評論 0 1
  • 6.7再談 Invoker 在前面的服務注冊與發(fā)現(xiàn)中迂求,我們發(fā)現(xiàn),服務在訂閱過程中晃跺,把 notify 過來的 url...
    Curtain_call閱讀 277評論 0 1
  • Dubbo的分支: 3.0服務消費者發(fā)出請求與提供者返回響應的過程揩局,包括了代理與傳輸部分,主要的內(nèi)容都在第2篇里面...
    whslowly閱讀 755評論 0 0
  • 我是黑夜里大雨紛飛的人啊 1 “又到一年六月掀虎,有人笑有人哭凌盯,有人歡樂有人憂愁,有人驚喜有人失落烹玉,有的覺得收獲滿滿有...
    陌忘宇閱讀 8,536評論 28 53