4-dubbo源碼分析之集群設(shè)計

  • 先看官網(wǎng)一張圖


    image.png

    這就是dubbo的集群設(shè)計了滤淳,本章主要解析的就是圖中的主要幾個藍色點梧喷,簡單堆土做個說明:

  • Cluster對于dubbo集群整個管控,會有各種方案脖咐,比如快速失敗铺敌、安全失敗等

  • Directory在consumer章節(jié)中就已經(jīng)接觸過,主要維護的invoker的動態(tài)管理

  • Router一看名詞就是路由相關(guān)了

  • LoadBalance講的就是負(fù)載均衡

    整體結(jié)合起來就是:consumer去遠程調(diào)用一個具體的provider時屁擅,會通過集群中路由偿凭、負(fù)載均衡等策略選取最終一個具體的服務(wù)完成具體調(diào)用。


具體解析

一.Cluster
  • 看下官網(wǎng)描述:

Cluster 將 Directory 中的多個 Invoker 偽裝成一個 Invoker派歌,對上層透明弯囊,偽裝過程包含了容錯邏輯,調(diào)用失敗后胶果,重試另一個

  • 具體接口

    /**
     * Cluster. (SPI, Singleton, ThreadSafe)
     * <p>
     * <a >Cluster</a>
     * <a >Fault-Tolerant</a>
     *
     * Cluster 將 Directory 中的多個 Invoker 偽裝成一個 Invoker匾嘱,對上層透明,偽裝過程包含了容錯邏輯稽物,調(diào)用失敗后奄毡,重試另一個
     * 應(yīng)對出錯情況采取的策略-9種實現(xiàn)
     */
    @SPI(FailoverCluster.NAME)
    public interface Cluster {
    
        /**
         * Merge the directory invokers to a virtual invoker.
         *
         * @param <T>
         * @param directory
         * @return cluster invoker
         * @throws RpcException
         */
        @Adaptive
        <T> Invoker<T> join(Directory<T> directory) throws RpcException;
    
    }
    

    很明顯默認(rèn)擴展是FailoverCluster,里面就一個很熟悉的方法贝或,join吼过,在consumer中已經(jīng)出現(xiàn)過,那么跟蹤一下咪奖;


    image.png

    MockClusterInvoker里面構(gòu)造的是FailoverClusterInvoker盗忱,因此最終的invoker不斷調(diào)用下傳,繼續(xù):


    image.png
image.png

image.png

image.png

image.png

image.png

這個代碼無非就是將相關(guān)的consumer調(diào)用信息進行構(gòu)造封裝羊赵,返回趟佃,但真正發(fā)揮作用的地方就是那個返回的Invoker: MockClusterInvoker-->FailoverClusterInvoker,為什么想罕?因為這一步直接決定最終發(fā)起遠程調(diào)用時所使用的ClusterInvoker寞钥,也就是如下的doInvoker方法:

  • 先看MockClusterInvoker

    /**
     * 降級處理方案
     * 原理就是改變注冊在zookeeper上的節(jié)點信息.從而zookeeper通知重新生成invoker
     */
    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
    
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            /**
             * 無降級: no mock
             * 這里的invoker是FailoverClusterInvoker
             */
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            /**
             * 屏蔽: force:direct mock
             * mock=force:return+null
             * 表示消費方對方法的調(diào)用都直接返回null赠橙,不發(fā)起遠程調(diào)用
             * 可用于屏蔽不重要服務(wù)不可用的時候蜜氨,對調(diào)用方的影響
             */
            //
            result = doMockInvoke(invocation, null);
        } else {
            /**
             * 容錯: fail-mock
             * mock=fail:return+null
             * 表示消費方對該服務(wù)的方法調(diào)用失敗后,再返回null捏膨,不拋異常
             * 可用于對不重要服務(wù)不穩(wěn)定的時候树瞭,忽略對調(diào)用方的影響 */
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    

    這里出現(xiàn)了consumer配置項中的一個重要配置:mock;代碼邏輯很清楚启绰,講的是容器的容錯與降級方案跋破。

  • 繼續(xù)跟著看FailoverClusterInvoker

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    
        // Invoker列表
        List<Invoker<T>> copyinvokers = invokers;
    
        //確認(rèn)下Invoker列表不為空
        checkInvokers(copyinvokers, invocation);
    
        //重試次數(shù)
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            /**
             * 重試時簸淀,進行重新選擇瓶蝴,避免重試時invoker列表已發(fā)生變化.
             * 注意:如果列表發(fā)生了變化,那么invoked判斷會失效租幕,因為invoker示例已經(jīng)改變
             */
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                //重新檢查一下
                checkInvokers(copyinvokers, invocation);
            }
    
            /** 使用loadBalance選擇一個Invoker返回 */
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
    
                /** 使用選擇的結(jié)果Invoker進行調(diào)用舷手,返回結(jié)果 */
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyinvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
    

    看看發(fā)起遠程調(diào)用的debug情況:


    image.png

    恩 確實進來了,既然FailoverCluster的策略是:失敗自動切換劲绪,當(dāng)出現(xiàn)失敗男窟,重試其它服務(wù)器,那么這個策略的體現(xiàn)邏輯就在這個doInvoker的for循環(huán)重試?yán)?/p>

    image.png

    len的取值就是配置項retries珠叔,即重試次數(shù)蝎宇,默認(rèn)是3次;注意:重試時祷安,進行重新選擇,避免重試時invoker列表已發(fā)生變化.
    至于當(dāng)前invoker節(jié)點失敗后重試的機制如何兔乞,就是select如何再次選擇的問題了
    Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
    

    次實現(xiàn)在父類AbstractClusterInvoker中:

     /**
     *
     * 使用loadbalance選擇invoker.</br>
     * a)先lb選擇汇鞭,如果在selected列表中 或者 不可用且做檢驗時,進入下一步(重選),否則直接返回</br>
     * b)重選驗證規(guī)則:selected > available .保證重選出的結(jié)果盡量不在select中庸追,并且是可用的
     *
     * @param selected 已選過的invoker.注意:輸入保證不重復(fù)
     *
     * Select a invoker using loadbalance policy.</br>
     * a)Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or, 
     * if this invoker is unavailable, then continue step b (reselect), otherwise return the first selected invoker</br>
     * b)Reslection, the validation rule for reselection: selected > available. This rule guarantees that
     * the selected invoker has the minimum chance to be one in the previously selected list, and also 
     * guarantees this invoker is available.
     *
     * @param loadbalance load balance policy
     * @param invocation
     * @param invokers invoker candidates
     * @param selected  exclude selected invokers or not
     * @return
     * @throws RpcException
     */
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.isEmpty()) return null;
    
        String methodName = invocation == null ? "" : invocation.getMethodName();
    
        // sticky霍骄,滯連接用于有狀態(tài)服務(wù),盡可能讓客戶端總是向同一提供者發(fā)起調(diào)用淡溯,除非該提供者掛了读整,再連另一臺。
        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
        {
            //ignore overloaded method
            if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                stickyInvoker = null;
            }
            //ignore concurrency problem
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                if (availablecheck && stickyInvoker.isAvailable()) {
                    return stickyInvoker;
                }
            }
        }
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
    
        if (sticky) {
            stickyInvoker = invoker;
        }
        return invoker;
    }
    

    繼續(xù)看核心方法:doSelect

    private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    
        if (invokers == null || invokers.isEmpty()) return null;
    
        // 只有一個invoker咱娶,直接返回米间,不需要處理
        if (invokers.size() == 1)  return invokers.get(0);
    
        if (loadbalance == null) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
    
        /** 通過具體的負(fù)載均衡的算法得到一個invoker,最后調(diào)用 */
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
        //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
        /** 如果 selected中包含(優(yōu)先判斷) 或者 不可用&&availablecheck=true 則重試. */
        if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
    
                /**
                 * 重新選擇
                 * 先從非selected的列表中選擇,沒有在從selected列表中選擇
                 */
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                    /** 看下第一次選的位置膘侮,如果不是最后屈糊,選+1位置. */
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        //最后在避免碰撞
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                    } catch (Exception e) {
                        logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                    }
                }
            } catch (Throwable t) {
                logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
            }
        }
        return invoker;
    }
    

    大致如下:

    • 通過具體的負(fù)載均衡的算法得到一個invoker(后面詳細說負(fù)債均衡)
    • 如果 selected中包含(優(yōu)先判斷) 或者 不可用&&availablecheck=true 則重試

    這里有個重要的細節(jié):sticky配置


    image.png

    看代碼就知道其作用:

    滯連接用于有狀態(tài)服務(wù),盡可能讓客戶端總是向同一提供者發(fā)起調(diào)用琼了,除非該提供者掛了逻锐,再連另一臺。

    FailoverCluster失敗重試策略就差不多講完了雕薪,大概回顧下:

  • 選擇Cluster

  • 決定ClusterInvoker

  • 執(zhí)行doInvoker時實現(xiàn)具體策略

    這里不是很難昧诱,下面各種策略幾乎類似方式處理,就簡單根據(jù)官網(wǎng)介紹下其實現(xiàn)的效果:

  • FailfastCluster

    快速失敗所袁,只發(fā)起一次調(diào)用盏档,失敗立即報錯。通常用于非冪等性的寫操作纲熏,比如新增記錄妆丘。

   public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {

   public FailfastClusterInvoker(Directory<T> directory) {
       super(directory);
   }

   @Override
   public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
       checkInvokers(invokers, invocation);
       Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
       try {
           return invoker.invoke(invocation);
       } catch (Throwable e) {
           if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
               throw (RpcException) e;
           }
           throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
       }
     }
   }

代碼邏輯一目倆然

  • FailsafeCluster

    失敗安全锄俄,出現(xiàn)異常時,直接忽略勺拣。通常用于寫入審計日志等操作奶赠。

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
       private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
   
       public FailsafeClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       @Override
       public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           try {
               checkInvokers(invokers, invocation);
               Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
               return invoker.invoke(invocation);
           } catch (Throwable e) {
               logger.error("Failsafe ignore exception: " + e.getMessage(), e);
               return new RpcResult(); // ignore
           }
       }
}   
  • FailbackCluster

    失敗自動恢復(fù),后臺記錄失敗請求药有,定時重發(fā)毅戈。通常用于消息通知操作。

   public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {

       private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
   
       private static final long RETRY_FAILED_PERIOD = 5 * 1000;
   
       /**
        * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
        * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
        */
       private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
               new NamedInternalThreadFactory("failback-cluster-timer", true));
   
       private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
       private volatile ScheduledFuture<?> retryFuture;
   
       public FailbackClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
           if (retryFuture == null) {
               synchronized (this) {
                   if (retryFuture == null) {
                       retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
   
                           @Override
                           public void run() {
                               // collect retry statistics
                               try {
                                   retryFailed();
                               } catch (Throwable t) { // Defensive fault tolerance
                                   logger.error("Unexpected error occur at collect statistic", t);
                               }
                           }
                       }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                   }
               }
           }
           failed.put(invocation, router);
       }
   
       void retryFailed() {
           if (failed.size() == 0) {
               return;
           }
           for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
               Invocation invocation = entry.getKey();
               Invoker<?> invoker = entry.getValue();
               try {
                   invoker.invoke(invocation);
                   failed.remove(invocation);
               } catch (Throwable e) {
                   logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
               }
           }
       }
   
       @Override
       protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           try {
               checkInvokers(invokers, invocation);
               Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
               return invoker.invoke(invocation);
           } catch (Throwable e) {
               logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
               addFailed(invocation, this);
               return new RpcResult(); // ignore
           }
       }
}
  • ForkingCluster

    并行調(diào)用多個服務(wù)器愤惰,只要一個成功即返回苇经。通常用于實時性要求較高的讀操作,但需要浪費更多服務(wù)資源宦言∩鹊ィ可通過 forks="2" 來設(shè)置最大并行數(shù)。

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
       /**
        * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
        * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
        */
       private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
   
       public ForkingClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       @Override
       @SuppressWarnings({"unchecked", "rawtypes"})
       public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           checkInvokers(invokers, invocation);
           final List<Invoker<T>> selected;
           final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
           final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
           if (forks <= 0 || forks >= invokers.size()) {
               selected = invokers;
           } else {
               selected = new ArrayList<Invoker<T>>();
               for (int i = 0; i < forks; i++) {
                   // TODO. Add some comment here, refer chinese version for more details.
                   Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                   if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                       selected.add(invoker);
                   }
               }
           }
           RpcContext.getContext().setInvokers((List) selected);
           final AtomicInteger count = new AtomicInteger();
           final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
           for (final Invoker<T> invoker : selected) {
               executor.execute(new Runnable() {
                   @Override
                   public void run() {
                       try {
                           Result result = invoker.invoke(invocation);
                           ref.offer(result);
                       } catch (Throwable e) {
                           int value = count.incrementAndGet();
                           if (value >= selected.size()) {
                               ref.offer(e);
                           }
                       }
                   }
               });
           }
           try {
               Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
               if (ret instanceof Throwable) {
                   Throwable e = (Throwable) ret;
                   throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
               }
               return (Result) ret;
           } catch (InterruptedException e) {
               throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
           }
       }
}
  • BroadcastCluster

    廣播調(diào)用所有提供者奠旺,逐個調(diào)用蜘澜,任意一臺報錯則報錯。通常用于通知所有提供者更新緩存或日志等本地資源信息响疚。

   public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {

       private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
   
       public BroadcastClusterInvoker(Directory<T> directory) {
           super(directory);
       }
   
       @Override
       @SuppressWarnings({"unchecked", "rawtypes"})
       public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           checkInvokers(invokers, invocation);
           RpcContext.getContext().setInvokers((List) invokers);
           RpcException exception = null;
           Result result = null;
           for (Invoker<T> invoker : invokers) {
               try {
                   result = invoker.invoke(invocation);
               } catch (RpcException e) {
                   exception = e;
                   logger.warn(e.getMessage(), e);
               } catch (Throwable e) {
                   exception = new RpcException(e.getMessage(), e);
                   logger.warn(e.getMessage(), e);
               }
           }
           if (exception != null) {
               throw exception;
           }
           return result;
       }
}

2.LoadBalance
  • 看下主接口
   /**
* 負(fù)載均衡-四種負(fù)載均衡策略
* LoadBalance. (SPI, Singleton, ThreadSafe)
* <p>
* <a >Load-Balancing</a>
*
* @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
*/
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

       /**
        * select one invoker in list.
        *
        * @param invokers   invokers.
        * @param url        refer url
        * @param invocation invocation.
        * @return selected invoker.
        */
       @Adaptive("loadbalance")
       <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

默認(rèn)取的是RandomLoadBalance鄙信,那我們就以消費流程去詳細解析下這個負(fù)債均衡策略。

  • RandomLoadBalance
    既然是負(fù)債均衡忿晕,那就是發(fā)起遠程調(diào)用時選擇provider服務(wù)時發(fā)揮作用装诡,那我們從默認(rèn)的FailoverClusterInvoker.doInvoke進入:


    image.png

    出現(xiàn)了loadbalance,那就繼續(xù)跟蹤


    image.png

    image.png

    因為我本地就啟了一個provider践盼,因此就無需走負(fù)債均衡了鸦采,直接返回,但這里如果provider大于1的話宏侍,看上面畫出的重點:
    先找到AbstractLoadBalance的select方法:
@Override
   public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {

       if (invokers == null || invokers.isEmpty()) return null;

       if (invokers.size() == 1) return invokers.get(0);

       // 進行選擇赖淤,具體的子類實現(xiàn),我們這里是RandomLoadBalance
       return doSelect(invokers, url, invocation);
   }

又是鉤子谅河,具體就順其到子類了:

   /**
* random load balance.
* 默認(rèn)的策略
*
* 隨機咱旱,按權(quán)重設(shè)置隨機概率。
* 在一個截面上碰撞的概率高绷耍,但調(diào)用量越大分布越均勻吐限,而且按概率使用權(quán)重后也比較均勻,有利于動態(tài)調(diào)整提供者權(quán)重褂始。
*
* 1.獲取invokers的個數(shù),并遍歷累加權(quán)重
* 2.若不為第0個,則將當(dāng)前權(quán)重與上一個進行比較,只要有一個不等則認(rèn)為不等,即:sameWeight=false
* 3.若總權(quán)重>0 且 sameWeight=false 按權(quán)重獲取隨機數(shù),根據(jù)隨機數(shù)合權(quán)重相減確定調(diào)用節(jié)點
* 4.sameWeight=true,則均等隨機調(diào)用
*
* eg:假設(shè)有四個集群節(jié)點A,B,C,D,對應(yīng)的權(quán)重分別是1,2,3,4,那么請求到A節(jié)點的概率就為1/(1+2+3+4) = 10%.B,C,D節(jié)點依次類推為20%,30%,40%.
*/
public class RandomLoadBalance extends AbstractLoadBalance {

   public static final String NAME = "random";

   private final Random random = new Random();

   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       int length = invokers.size(); // Number of invokers 總個數(shù)
       int totalWeight = 0; // The sum of weights 總權(quán)重
       boolean sameWeight = true; // Every invoker has the same weight? 權(quán)重是否都一樣
       for (int i = 0; i < length; i++) {
           int weight = getWeight(invokers.get(i), invocation);
           totalWeight += weight; // Sum 累計總權(quán)重
           if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
               sameWeight = false; // 計算所有權(quán)重是否都一樣
           }
       }

       // eg: 總權(quán)重為10(1+2+3+4),那么怎么做到按權(quán)重隨機呢?根據(jù)10隨機出一個整數(shù),假如為隨機出來的是2.然后依次和權(quán)重相減,比如2(隨機數(shù))-1(A的權(quán)重) = 1,然后1(上一步計算的結(jié)果)-2(B的權(quán)重) = -1,此時-1 < 0,那么則調(diào)用B,其他的以此類推
       if (totalWeight > 0 && !sameWeight) {
           // 如果權(quán)重不相同且權(quán)重大于0.則按總權(quán)重數(shù)隨機
           // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
           int offset = random.nextInt(totalWeight);
           // 確定隨機值落在那個片段上
           // Return a invoker based on the random value.
           for (int i = 0; i < length; i++) {
               offset -= getWeight(invokers.get(i), invocation);
               if (offset < 0) {
                   return invokers.get(i);
               }
           }
       }
       // 如果權(quán)重相同或權(quán)重為0則均等隨機
       // If all invokers have the same weight value or totalWeight=0, return evenly.
       return invokers.get(random.nextInt(length));
   }
}

當(dāng)前策略的算法在注釋中很清楚了诸典,這里不在細說。其他三種負(fù)債均衡其實處理方式大致相同崎苗,簡單列一下:

  • RoundRobinLoadBalance

輪循狐粱,按公約后的權(quán)重設(shè)置輪循比率舀寓。

   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
       int length = invokers.size(); // Number of invokers invokers的個數(shù)
       int maxWeight = 0; // The maximum weight // 最大權(quán)重
       int minWeight = Integer.MAX_VALUE; // The minimum weight 最小權(quán)重
       final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
       int weightSum = 0;
       for (int i = 0; i < length; i++) {
           int weight = getWeight(invokers.get(i), invocation);
           maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight  累計最大權(quán)重
           minWeight = Math.min(minWeight, weight); // Choose the minimum weight  累計最小權(quán)重
           if (weight > 0) {
               invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
               weightSum += weight;
           }
       }
       AtomicPositiveInteger sequence = sequences.get(key);
       if (sequence == null) {
           sequences.putIfAbsent(key, new AtomicPositiveInteger());
           sequence = sequences.get(key);
       }
       int currentSequence = sequence.getAndIncrement();
       if (maxWeight > 0 && minWeight < maxWeight) {  // 如果權(quán)重不一樣
           int mod = currentSequence % weightSum;
           for (int i = 0; i < maxWeight; i++) {
               for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                   final Invoker<T> k = each.getKey();
                   final IntegerWrapper v = each.getValue();
                   if (mod == 0 && v.getValue() > 0) {
                       return k;
                   }
                   if (v.getValue() > 0) {
                       v.decrement();
                       mod--;
                   }
               }
           }
       }
       // Round robin 取模循環(huán)
       return invokers.get(currentSequence % length);
   }
  • LeastActiveLoadBalance

最少活躍調(diào)用數(shù),相同活躍數(shù)的隨機肌蜻,活躍數(shù)指調(diào)用前后計數(shù)差互墓。
使慢的提供者收到更少請求,因為越慢的提供者的調(diào)用前后計數(shù)差會越大

舉個實際的例子:
A請求接受一個請求時計數(shù)+1蒋搜,請求完再-1篡撵;B請求接受一個請求時,計數(shù)+1,請求完計數(shù)-1豆挽;按照這種邏輯育谬,如果請求中的節(jié)點肯定比沒有請求的計數(shù)低,因此找計數(shù)低的服務(wù)處理帮哈。場景就是:處理越慢的服務(wù)膛檀,計數(shù)越容易高,因此將后面請求分發(fā)給計數(shù)低的服務(wù)會更加友好娘侍。

   @Override
   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       int length = invokers.size(); // Number of invokers ,invoker總數(shù)
       int leastActive = -1; // The least active value of all invokers ,所有invoker的最小活躍數(shù)
       int leastCount = 0; // The number of invokers having the same least active value (leastActive)  擁有最小活躍數(shù)的Invoker是的個數(shù)
       int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)  擁有最小活躍數(shù)的Invoker的下標(biāo),也就是將最小活躍的invoker集中放入新數(shù)組,以便后續(xù)遍歷
       int totalWeight = 0; // The sum of weights  總權(quán)重
       int firstWeight = 0; // Initial value, used for comparision  初始權(quán)重,用于計算是否相同
       boolean sameWeight = true; // Every invoker has the same weight value?  是否所有invoker的權(quán)重都相同
       for (int i = 0; i < length; i++) {
           Invoker<T> invoker = invokers.get(i);
           int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number 活躍數(shù)
           int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
           if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.  如果發(fā)現(xiàn)更小的活躍數(shù)則重新開始
               leastActive = active; // Record the current least active value 記錄下最小的活躍數(shù)
               leastCount = 1; // Reset leastCount, count again based on current leastCount 重新統(tǒng)計最小活躍數(shù)的個數(shù)
               leastIndexs[0] = i; // Reset  重置小標(biāo)
               totalWeight = weight; // Reset
               firstWeight = weight; // Record the weight the first invoker 重置第一個權(quán)重
               sameWeight = true; // Reset, every invoker has the same weight value?  重置是否權(quán)重相同標(biāo)識
           } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.  累計相同的最小活躍數(shù)
               leastIndexs[leastCount++] = i; // Record index number of this invoker  累計相同的最小活躍invoker的小標(biāo)
               totalWeight += weight; // Add this invoker's weight to totalWeight. 累加總權(quán)重
               // If every invoker has the same weight?  是否所有權(quán)重一樣
               if (sameWeight && i > 0
                       && weight != firstWeight) {
                   sameWeight = false;
               }
           }
       }
       // assert(leastCount > 0)
       if (leastCount == 1) {
           // 如果只有一個最小則直接返回
           // If we got exactly one invoker having the least active value, return this invoker directly.
           return invokers.get(leastIndexs[0]);
       }
       if (!sameWeight && totalWeight > 0) {
           // 如果權(quán)重不相同且總權(quán)重大于0,則按總權(quán)重隨機
           // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
           int offsetWeight = random.nextInt(totalWeight);
           // 按隨機數(shù)去值
           // Return a invoker based on the random value.
           for (int i = 0; i < leastCount; i++) {
               int leastIndex = leastIndexs[i];
               offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
               if (offsetWeight <= 0)
                   return invokers.get(leastIndex);
           }
       }
       // 如果權(quán)重相同或總權(quán)重為0,則均等隨機
       // If all invokers have the same weight value or totalWeight=0, return evenly.
       return invokers.get(leastIndexs[random.nextInt(leastCount)]);
   }
  • ConsistentHashLoadBalance

一致性 Hash宿刮,相同參數(shù)的請求總是發(fā)到同一提供者。當(dāng)某一臺提供者掛時私蕾,原本發(fā)往該提供者的請求,基于虛擬節(jié)點胡桃,平攤到其它提供者踩叭,不會引起劇烈變動。

   protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
       String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
       int identityHashCode = System.identityHashCode(invokers);
       ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
       if (selector == null || selector.identityHashCode != identityHashCode) {
           selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
           selector = (ConsistentHashSelector<T>) selectors.get(key);
       }
       return selector.select(invocation);
   }

具體相關(guān)算法:
http://en.wikipedia.org/wiki/Consistent_hashing


3.Router
  • 請求被路由到哪個服務(wù)器翠胰,靠的就是路由啦容贝,先看下主接口:
   /**
* Router. (SPI, Prototype, ThreadSafe)
* <p>
* <a >Routing</a>
*
* @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
* @see com.alibaba.dubbo.rpc.cluster.Directory#list(Invocation)
*/
public interface Router extends Comparable<Router> {

       /**
        * get the router url.
        *
        * @return url
        */
       URL getUrl();
   
       /**
        * route.
        *
        * @param invokers
        * @param url        refer url
        * @param invocation
        * @return routed invokers
        * @throws RpcException
        */
       <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
核心方法已經(jīng)出現(xiàn)了。我們還是按照原有思路debug一下:
image.png
image.png

image.png

OK 之景,路由核心出現(xiàn)了斤富,上面方法做了兩件事:
- 1.RegistryDirectory doList(invocation)將所有可用的invokers根據(jù)參數(shù)條件篩選出來;
- 2.根據(jù)路由規(guī)則锻狗,將directory中篩選出來的invokers進行過濾满力,比如MockInvokersSelector將所有mock invokers過濾掉。

image.png

image.png

過濾出來的invokers再返回即完成路由操作轻纪。路由執(zhí)行大體流程就是如此油额,接下來列一下幾個路由策略:

  • ScriptRouter

    腳本路由規(guī)則 支持 JDK 腳本引擎的所有腳本,比如:javascript, jruby, groovy 等刻帚,通過 type=javascript 參數(shù)設(shè)置腳本類型潦嘶,缺省為 javascript。

    @Override
    @SuppressWarnings("unchecked")
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        try {
            List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);
            Compilable compilable = (Compilable) engine;
            Bindings bindings = engine.createBindings();
            bindings.put("invokers", invokersCopy);
            bindings.put("invocation", invocation);
            bindings.put("context", RpcContext.getContext());
            CompiledScript function = compilable.compile(rule);
            Object obj = function.eval(bindings);
            if (obj instanceof Invoker[]) {
                invokersCopy = Arrays.asList((Invoker<T>[]) obj);
            } else if (obj instanceof Object[]) {
                invokersCopy = new ArrayList<Invoker<T>>();
                for (Object inv : (Object[]) obj) {
                    invokersCopy.add((Invoker<T>) inv);
                }
            } else {
                invokersCopy = (List<Invoker<T>>) obj;
            }
            return invokersCopy;
        } catch (ScriptException e) {
            //fail then ignore rule .invokers.
            logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
            return invokers;
        }
    }
    
  • ConditionRouter

條件路由: 根據(jù)dubbo管理控制臺配置的路由規(guī)則來過濾相關(guān)的invoker,這里會實時觸發(fā)RegistryDirectory類的notify方法崇众,通知本地重建invokers

```
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    if (invokers == null || invokers.isEmpty()) {
        return invokers;
    }
    try {
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
            return result;
        }
        for (Invoker<T> invoker : invokers) {
            if (matchThen(invoker.getUrl(), url)) {
                result.add(invoker);
            }
        }
        if (!result.isEmpty()) {
            return result;
        } else if (force) {
            logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
            return result;
        }
    } catch (Throwable t) {
        logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
    }
    return invokers;
}
```

OK 路由基本就分析到這里掂僵;


4.Directory
  • 這個在consumer中已經(jīng)分析過了航厚,簡單看看官網(wǎng)描述:

    Directory 代表多個 Invoker,可以把它看成 List<Invoker> ,但與 List 不同的是锰蓬,它的值可能是動態(tài)變化的幔睬,比如注冊中心推送變更

   /**
* Directory. (SPI, Prototype, ThreadSafe)
* <p>
* <a >Directory Service</a>
*
* Directory 代表多個 Invoker,可以把它看成 List<Invoker> 互妓,但與 List 不同的是溪窒,它的值可能是動態(tài)變化的,比如注冊中心推送變更
*
* @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
*/
public interface Directory<T> extends Node {

       /**
        * get service type.
        *
        * @return service type.
        */
       Class<T> getInterface();
   
       /**
        * list invokers.
        *
        * @return invokers
        */
       List<Invoker<T>> list(Invocation invocation) throws RpcException;
}

而此處list方法的核心邏輯也是在分析Route中就已經(jīng)見過了冯勉,不在分析澈蚌;
Directory能夠動態(tài)根據(jù)注冊中心維護Invokers列表,是因為相關(guān)Listener在被notify之后會觸發(fā)methodInvokerMap和urlInvokerMap等緩存的相關(guān)變動灼狰;最后在list方法中也就實時取出了最新的invokers宛瞄;看下之前的流程就清楚了;

  • StaticDirectory

構(gòu)造方法傳入invokers,因此這個Directory的invokers是不會動態(tài)變化的,使用場景不多交胚;

public StaticDirectory(List<Invoker<T>> invokers) {
       this(null, invokers, null);
   }

   public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers) {
       this(null, invokers, routers);
   }

   public StaticDirectory(URL url, List<Invoker<T>> invokers) {
       this(url, invokers, null);
   }

   public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
       super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
       if (invokers == null || invokers.isEmpty())
           throw new IllegalArgumentException("invokers == null");
       this.invokers = invokers;
   }
  • RegistryDirectory

根據(jù)注冊中心的推送變更份汗,動態(tài)維護invokers列表;

整個集群大致模塊就到這里蝴簇。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末杯活,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子熬词,更是在濱河造成了極大的恐慌旁钧,老刑警劉巖,帶你破解...
    沈念sama閱讀 211,948評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件互拾,死亡現(xiàn)場離奇詭異歪今,居然都是意外死亡,警方通過查閱死者的電腦和手機颜矿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,371評論 3 385
  • 文/潘曉璐 我一進店門寄猩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人骑疆,你說我怎么就攤上這事田篇。” “怎么了封断?”我有些...
    開封第一講書人閱讀 157,490評論 0 348
  • 文/不壞的土叔 我叫張陵斯辰,是天一觀的道長。 經(jīng)常有香客問我坡疼,道長彬呻,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,521評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮闸氮,結(jié)果婚禮上剪况,老公的妹妹穿的比我還像新娘。我一直安慰自己蒲跨,他們只是感情好译断,可當(dāng)我...
    茶點故事閱讀 65,627評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著或悲,像睡著了一般孙咪。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上巡语,一...
    開封第一講書人閱讀 49,842評論 1 290
  • 那天翎蹈,我揣著相機與錄音,去河邊找鬼男公。 笑死荤堪,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的枢赔。 我是一名探鬼主播澄阳,決...
    沈念sama閱讀 38,997評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼踏拜!你這毒婦竟也來了碎赢?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,741評論 0 268
  • 序言:老撾萬榮一對情侶失蹤速梗,失蹤者是張志新(化名)和其女友劉穎揩抡,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體镀琉,經(jīng)...
    沈念sama閱讀 44,203評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,534評論 2 327
  • 正文 我和宋清朗相戀三年蕊唐,在試婚紗的時候發(fā)現(xiàn)自己被綠了屋摔。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,673評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡替梨,死狀恐怖钓试,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情副瀑,我是刑警寧澤弓熏,帶...
    沈念sama閱讀 34,339評論 4 330
  • 正文 年R本政府宣布,位于F島的核電站糠睡,受9級特大地震影響挽鞠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,955評論 3 313
  • 文/蒙蒙 一信认、第九天 我趴在偏房一處隱蔽的房頂上張望材义。 院中可真熱鬧,春花似錦嫁赏、人聲如沸其掂。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,770評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽款熬。三九已至,卻和暖如春攘乒,著一層夾襖步出監(jiān)牢的瞬間贤牛,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,000評論 1 266
  • 我被黑心中介騙來泰國打工持灰, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留盔夜,地道東北人。 一個月前我還...
    沈念sama閱讀 46,394評論 2 360
  • 正文 我出身青樓堤魁,卻偏偏與公主長得像喂链,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子妥泉,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,562評論 2 349

推薦閱讀更多精彩內(nèi)容