13.dubbo源碼-集群容錯(cuò)

集群容錯(cuò)

在集群調(diào)用失敗時(shí),Dubbo 提供了多種容錯(cuò)方案弃理,缺省為 failover 想罕,即失敗重試∪Τ海可通過接口com.alibaba.dubbo.rpc.cluster.Cluster的SPI注解可知:

/**
 * Cluster. (SPI, Singleton, ThreadSafe)
 * 
 * <a >Cluster</a>
 * <a >Fault-Tolerant</a>
 * 
 * @author william.liangf
 */
@SPI(FailoverCluster.NAME)
public interface Cluster {
    ...
}

接下來通過對dubbo源碼的分析惫周,一一講解這些集群容錯(cuò)模式的具體實(shí)現(xiàn);

集群調(diào)用關(guān)系

cluster.jpg

圖片來源: https://dubbo.gitbooks.io/dubbo-user-book/demos/fault-tolerent-strategy.html
由圖可知康栈,通過Cluster的調(diào)用過程如下:

  1. 調(diào)用list()從Directory中取得可用Invoker集合递递;
  2. 根據(jù)路由規(guī)則過濾一些Invoker,得到可用Invoker集合啥么;
  3. 根據(jù)負(fù)載均衡機(jī)制得到一個(gè)合適的Invoker登舞,負(fù)載均衡機(jī)制參考
  4. 調(diào)用最終選出來的這個(gè)Invoker。

集群模式配置

按照以下示例在服務(wù)提供方和消費(fèi)方配置集群模式

<dubbo:service cluster="failover" />

<dubbo:reference cluster="failsafe" />

集群模式概覽

dubbo支持的集群模式如下圖所示悬荣,由于dubbo通過SPI實(shí)現(xiàn)微內(nèi)核菠秒,集群模式也不例外,所以想擴(kuò)展自己對集群容錯(cuò)的處理方式氯迂,非常簡單践叠;


dubbo集群容錯(cuò)總覽

接下來通過對源碼的閱讀,一一分析各個(gè)集群容錯(cuò)模式的實(shí)現(xiàn)嚼蚀;

Failover Cluster

dubbo默認(rèn)集群模式禁灼,失敗自動(dòng)切換,當(dāng)出現(xiàn)失敗轿曙,重試其它服務(wù)器弄捕。通常用于讀操作,但重試會(huì)帶來更長延遲导帝,且使集群的壓力更大守谓。可通過 retries="2" 來設(shè)置重試次數(shù)(默認(rèn)為2舟扎,這個(gè)值是重試次數(shù)分飞,所以不包括第一次調(diào)用,而是第一次調(diào)用失敗后最大可重試次數(shù))睹限。重試次數(shù)配置示例如下:

<dubbo:service retries="2" />

<dubbo:reference retries="2" />

<dubbo:reference><dubbo:method name="findFoo" retries="2" /></dubbo:reference>

核心實(shí)現(xiàn)源碼:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    // 檢查copyinvokers即可用Invoker集合是否為空譬猫,如果為空讯檐,那么拋出異常
    checkInvokers(copyinvokers, invocation);
    // 得到最大可調(diào)用次數(shù):最大可重試次數(shù)+1,默認(rèn)最大可重試次數(shù)Constants.DEFAULT_RETRIES=2
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    // 如果用戶設(shè)置reties為負(fù)數(shù)染服,那么也要調(diào)用至少1次
    if (len <= 0) {
        len = 1;
    }
    // 保存最后一次調(diào)用的異常
    RpcException le = null;
    // 保存已經(jīng)調(diào)用過的Invoker
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    // failover機(jī)制核心實(shí)現(xiàn):如果出現(xiàn)調(diào)用失敗别洪,那么重試其他服務(wù)器
    for (int i = 0; i < len; i++) {
        //重試時(shí),進(jìn)行重新選擇柳刮,避免重試時(shí)invoker列表已發(fā)生變化.
        //注意:如果列表發(fā)生了變化挖垛,那么invoked判斷會(huì)失效,因?yàn)閕nvoker示例已經(jīng)改變
        if (i > 0) {
            checkWheatherDestoried();
            // 根據(jù)Invocation調(diào)用信息從Directory中獲取所有可用Invoker
            copyinvokers = list(invocation);
            //重新檢查一下
            checkInvokers(copyinvokers, invocation);
        }
        // 根據(jù)負(fù)載均衡機(jī)制從copyinvokers中選擇一個(gè)Invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 保存每次調(diào)用的Invoker
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List)invoked);
        try {
            // RPC調(diào)用得到Result
            Result result = invoker.invoke(invocation);
            // 重試過程中秉颗,將最后一次調(diào)用的異常信息以warn級別日志輸出
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers 
                        + " (" + providers.size() + "/" + copyinvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            // 如果是業(yè)務(wù)性質(zhì)的異常痢毒,不再重試,直接拋出
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            // 其他性質(zhì)的異常統(tǒng)一封裝成RpcException
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 最大可調(diào)用次數(shù)用完還得到Result的話蚕甥,拋出RpcException異常:重試了N次還是失敗哪替,并輸出最后一次異常信息
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
            + invocation.getMethodName() + " in the service " + getInterface().getName() 
            + ". Tried " + len + " times of the providers " + providers 
            + " (" + providers.size() + "/" + copyinvokers.size() 
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}

Failfast Cluster

快速失敗,只發(fā)起一次調(diào)用菇怀,失敗立即報(bào)錯(cuò)凭舶。通常用于非冪等性的寫操作,比如新增記錄爱沟。
核心實(shí)現(xiàn)源碼:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
    try {
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
            throw (RpcException) e;
        }
        throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
    }
}

FailfastCluster實(shí)現(xiàn)比較簡單帅霜,根據(jù)負(fù)載均衡機(jī)制選擇一個(gè)Invoker后只調(diào)用1次,不管結(jié)果如何呼伸,不再進(jìn)行任何重試:如果調(diào)用正常就返回Result身冀,否則返回<u>記錄了詳細(xì)異常信息的RpcException</u>;

Failsafe Cluster

失敗安全蜂大,出現(xiàn)異常時(shí)闽铐,直接忽略蝶怔。通常用于寫入審計(jì)日志等操作奶浦。
核心實(shí)現(xiàn)源碼:

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failsafe ignore exception: " + e.getMessage(), e);
        return new RpcResult(); // ignore
    }
}

FailsafeCluster實(shí)現(xiàn)比較簡單,根據(jù)負(fù)載均衡機(jī)制選擇一個(gè)Invoker后只調(diào)用1次踢星,不管結(jié)果如何澳叉,不再進(jìn)行任何重試:如果調(diào)用正常就返回Result,否則返回<u>一個(gè)空的RpcResult</u>沐悦,這是和FailfastCluster的唯一區(qū)別成洗,不會(huì)把任何異常信息返回給consumer;

Failback Cluster

失敗自動(dòng)恢復(fù)藏否,后臺記錄失敗請求瓶殃,定時(shí)重發(fā)。通常用于消息通知操作副签。
核心實(shí)現(xiàn)源碼:

protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    try {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        return invoker.invoke(invocation);
    } catch (Throwable e) {
        logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
                             + e.getMessage() + ", ", e);
        // failback實(shí)現(xiàn)的核心遥椿,如果調(diào)用失敗基矮,后臺記錄失敗請求,并定時(shí)重發(fā)
        addFailed(invocation, this);
        return new RpcResult(); // ignore
    }
}

定時(shí)重發(fā)核心實(shí)現(xiàn)源碼:

// 處理重試任務(wù)的線程池
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));

private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
    if (retryFuture == null) {
        // double-check保證線程安全
        synchronized (this) {
            if (retryFuture == null) {
                // 一個(gè)獨(dú)立的線程池處理冠场,執(zhí)行周期是5s
                retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        // 收集統(tǒng)計(jì)信息
                        try {
                            // 重試失敗的請求家浇,如果重試成功,把請求從remove掉碴裙;
                            retryFailed();
                        } catch (Throwable t) { // 防御性容錯(cuò)
                            logger.error("Unexpected error occur at collect statistic", t);
                        }
                    }
                }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
            }
        }
    }
    failed.put(invocation, router);
}

Forking Cluster

并行調(diào)用多個(gè)服務(wù)器钢悲,只要一個(gè)成功即返回。通常用于實(shí)時(shí)性要求較高的讀操作舔株,但需要浪費(fèi)更多服務(wù)資源莺琳。可通過 forks="2" 來設(shè)置最大并行數(shù)载慈。
核心實(shí)現(xiàn)源碼:

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    final List<Invoker<T>> selected;
    // forks數(shù)芦昔,默認(rèn)為2
    final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
    // 請求超時(shí)
    final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // 如果設(shè)置的forks值為負(fù)數(shù),或者超過了可用Invoker數(shù)娃肿,那么選擇所有可用Invoker咕缎,即invokers
    if (forks <= 0 || forks >= invokers.size()) {
        selected = invokers;
    } else {
        selected = new ArrayList<Invoker<T>>();
        // 只選擇forks值指定的Invoker數(shù)量
        for (int i = 0; i < forks; i++) {
            //在invoker列表(排除selected)后,如果沒有選夠,則存在重復(fù)循環(huán)問題.見select實(shí)現(xiàn).
            Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
            if(!selected.contains(invoker)){//防止重復(fù)添加invoker
                selected.add(invoker);
            }
        }
    }
    RpcContext.getContext().setInvokers((List)selected);
    final AtomicInteger count = new AtomicInteger();
    final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
    for (final Invoker<T> invoker : selected) {
        // ForkingCluster核心實(shí)現(xiàn),多線程并行調(diào)用
        executor.execute(new Runnable() {
            public void run() {
                try {
                    Result result = invoker.invoke(invocation);
                    // 把結(jié)果放到BlockingQueue中
                    ref.offer(result);
                } catch(Throwable e) {
                    int value = count.incrementAndGet();
                    if (value >= selected.size()) {
                        ref.offer(e);
                    }
                }
            }
        });
    }
    try {
        // 從BlockingQueue中取結(jié)果:即并行調(diào)用最先返回的結(jié)果
        Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
        // 如果取得的是異常料扰,那么將異常封裝成RpcException并拋給Consumer
        if (ret instanceof Throwable) {
            Throwable e = (Throwable) ret;
            throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
        return (Result) ret;
    } catch (InterruptedException e) {
        // 如果timeout指定超時(shí)時(shí)間內(nèi)還沒有返回結(jié)果凭豪,那么將異常封裝成RpcException并拋給Consumer
        throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
    }
}

Broadcast Cluster

廣播調(diào)用所有提供者,逐個(gè)調(diào)用晒杈,任意一臺報(bào)錯(cuò)則報(bào)錯(cuò) 嫂伞。通常用于通知所有提供者更新緩存或日志等本地資源信息。
核心實(shí)現(xiàn)源碼:

public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    checkInvokers(invokers, invocation);
    RpcContext.getContext().setInvokers((List)invokers);
    // 保存最后一個(gè)調(diào)用的異常
    RpcException exception = null;
    Result result = null;
    for (Invoker<T> invoker: invokers) {
        try {
            // 遍歷所有Invoker拯钻,每個(gè)Invoker都會(huì)被調(diào)用(不管某個(gè)Invoker是否拋出異常)
            result = invoker.invoke(invocation);
        } catch (RpcException e) {
            exception = e;
            logger.warn(e.getMessage(), e);
        } catch (Throwable e) {
            exception = new RpcException(e.getMessage(), e);
            logger.warn(e.getMessage(), e);
        }
    }
    // 如果調(diào)用過程有異常帖努,那么拋出該異常
    if (exception != null) {
        throw exception;
    }
    return result;
}

Available Cluster

遍歷所有從Directory中l(wèi)ist出來的Invoker集合,調(diào)用第一個(gè)isAvailable()Invoker粪般,只發(fā)起一次調(diào)用拼余,失敗立即報(bào)錯(cuò)。
isAvailable()判斷邏輯如下--Client處理連接狀態(tài)亩歹,且不是READONLY:

@Override
public boolean isAvailable() {
    if (!super.isAvailable())
        return false;
    for (ExchangeClient client : clients){
        if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){
            //cannot write == not Available ?
            return true ;
        }
    }
    return false;
}

Mergeable Cluster

請戳鏈接14. dubbo源碼-集群容錯(cuò)之MergeableCluster

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末匙监,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子小作,更是在濱河造成了極大的恐慌亭姥,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,651評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件顾稀,死亡現(xiàn)場離奇詭異达罗,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)静秆,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,468評論 3 392
  • 文/潘曉璐 我一進(jìn)店門粮揉,熙熙樓的掌柜王于貴愁眉苦臉地迎上來绍载,“玉大人,你說我怎么就攤上這事滔蝉』骼埽” “怎么了?”我有些...
    開封第一講書人閱讀 162,931評論 0 353
  • 文/不壞的土叔 我叫張陵蝠引,是天一觀的道長阳谍。 經(jīng)常有香客問我,道長螃概,這世上最難降的妖魔是什么矫夯? 我笑而不...
    開封第一講書人閱讀 58,218評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮吊洼,結(jié)果婚禮上训貌,老公的妹妹穿的比我還像新娘。我一直安慰自己冒窍,他們只是感情好递沪,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,234評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著综液,像睡著了一般款慨。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上谬莹,一...
    開封第一講書人閱讀 51,198評論 1 299
  • 那天檩奠,我揣著相機(jī)與錄音,去河邊找鬼附帽。 笑死埠戳,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蕉扮。 我是一名探鬼主播整胃,決...
    沈念sama閱讀 40,084評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼慢显!你這毒婦竟也來了爪模?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,926評論 0 274
  • 序言:老撾萬榮一對情侶失蹤荚藻,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后洁段,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體应狱,經(jīng)...
    沈念sama閱讀 45,341評論 1 311
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,563評論 2 333
  • 正文 我和宋清朗相戀三年祠丝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了疾呻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片除嘹。...
    茶點(diǎn)故事閱讀 39,731評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖岸蜗,靈堂內(nèi)的尸體忽然破棺而出尉咕,到底是詐尸還是另有隱情,我是刑警寧澤璃岳,帶...
    沈念sama閱讀 35,430評論 5 343
  • 正文 年R本政府宣布年缎,位于F島的核電站,受9級特大地震影響铃慷,放射性物質(zhì)發(fā)生泄漏单芜。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,036評論 3 326
  • 文/蒙蒙 一犁柜、第九天 我趴在偏房一處隱蔽的房頂上張望洲鸠。 院中可真熱鬧,春花似錦馋缅、人聲如沸扒腕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,676評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽袜匿。三九已至,卻和暖如春稚疹,著一層夾襖步出監(jiān)牢的瞬間居灯,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,829評論 1 269
  • 我被黑心中介騙來泰國打工内狗, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留怪嫌,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,743評論 2 368
  • 正文 我出身青樓柳沙,卻偏偏與公主長得像岩灭,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子赂鲤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,629評論 2 354

推薦閱讀更多精彩內(nèi)容