前言
正常情況下臼婆,當(dāng)我們進(jìn)行系統(tǒng)設(shè)計(jì)時(shí)候,不僅要考慮正常邏輯下代碼該如何走,還要考慮異常情況下代碼邏輯應(yīng)該怎么走裹粤。當(dāng)服務(wù)消費(fèi)方調(diào)用服務(wù)提供方的服務(wù)出現(xiàn)錯(cuò)誤時(shí)候,Dubbo提供了多種容錯(cuò)方案蜂林,缺省模式為failover遥诉,也就是失敗重試。
Cluster 接口提供了我們常說的集群容錯(cuò)功能噪叙。
集群中的單個(gè)節(jié)點(diǎn)有一定概率出現(xiàn)一些問題矮锈,例如,磁盤損壞睁蕾、系統(tǒng)崩潰等苞笨,導(dǎo)致節(jié)點(diǎn)無法對(duì)外提供服務(wù),因此在分布式 RPC 框架中子眶,必須要重視這種情況瀑凝。為了避免單點(diǎn)故障,我們的 Provider 通常至少會(huì)部署在兩臺(tái)服務(wù)器上臭杰,以集群的形式對(duì)外提供服務(wù)粤咪,對(duì)于一些負(fù)載比較高的服務(wù),則需要部署更多 Provider 來抗住流量渴杆。
在 Dubbo 中寥枝,通過 Cluster 這個(gè)接口把一組可供調(diào)用的 Provider 信息組合成為一個(gè)統(tǒng)一的 Invoker 供調(diào)用方進(jìn)行調(diào)用。經(jīng)過 Router 過濾磁奖、LoadBalance 選址之后囊拜,選中一個(gè)具體 Provider 進(jìn)行調(diào)用,如果調(diào)用失敗点寥,則會(huì)按照集群的容錯(cuò)策略進(jìn)行容錯(cuò)處理艾疟。
Dubbo 默認(rèn)內(nèi)置了若干容錯(cuò)策略,并且每種容錯(cuò)策略都有自己獨(dú)特的應(yīng)用場(chǎng)景敢辩,我們可以通過配置選擇不同的容錯(cuò)策略。如果這些內(nèi)置容錯(cuò)策略不能滿足需求戚长,我們還可以通過自定義容錯(cuò)策略進(jìn)行配置盗冷。
Cluster 接口與容錯(cuò)機(jī)制
Cluster 的工作流程大致可以分為兩步(如下圖所示):①創(chuàng)建 Cluster Invoker 實(shí)例(在 Consumer 初始化時(shí),Cluster 實(shí)現(xiàn)類會(huì)創(chuàng)建一個(gè) Cluster Invoker 實(shí)例同廉,即下圖中的 merge 操作)仪糖;②使用 Cluster Invoker 實(shí)例(在 Consumer 服務(wù)消費(fèi)者發(fā)起遠(yuǎn)程調(diào)用請(qǐng)求的時(shí)候柑司,Cluster Invoker 會(huì)依賴前面課時(shí)介紹的 Directory、Router锅劝、LoadBalance 等組件得到最終要調(diào)用的 Invoker 對(duì)象)攒驰。
Cluster Invoker 獲取 Invoker 的流程大致可描述為如下:
1、通過 Directory 獲取 Invoker 列表故爵,以 RegistryDirectory 為例玻粪,會(huì)感知注冊(cè)中心的動(dòng)態(tài)變化,實(shí)時(shí)獲取當(dāng)前 Provider 對(duì)應(yīng)的 Invoker 集合诬垂。
2劲室、調(diào)用 Router 的 route() 方法進(jìn)行路由,過濾掉不符合路由規(guī)則的 Invoker 對(duì)象结窘。
3很洋、通過 LoadBalance 從 Invoker 列表中選擇一個(gè) Invoker。
4隧枫、ClusterInvoker 會(huì)將參數(shù)傳給 LoadBalance 選擇出的 Invoker 實(shí)例的 invoke 方法喉磁,進(jìn)行真正的遠(yuǎn)程調(diào)用。
這個(gè)過程是一個(gè)正常流程悠垛,沒有涉及容錯(cuò)處理线定。Dubbo 中常見的容錯(cuò)方式有如下幾個(gè):
Failover Cluster:失敗自動(dòng)切換。它是 Dubbo 的默認(rèn)容錯(cuò)機(jī)制确买,在請(qǐng)求一個(gè) Provider 節(jié)點(diǎn)失敗的時(shí)候斤讥,自動(dòng)切換其他 Provider 節(jié)點(diǎn),默認(rèn)執(zhí)行 3 次湾趾,適合冪等操作芭商。當(dāng)然,重試次數(shù)越多搀缠,在故障容錯(cuò)的時(shí)候帶給 Provider 的壓力就越大铛楣,在極端情況下甚至可能造成雪崩式的問題。
Failback Cluster:失敗自動(dòng)恢復(fù)艺普。失敗后記錄到隊(duì)列中簸州,通過定時(shí)器重試。
Failfast Cluster:快速失敗歧譬。請(qǐng)求失敗后返回異常岸浑,不進(jìn)行任何重試。
Failsafe Cluster:失敗安全瑰步。請(qǐng)求失敗后忽略異常矢洲,不進(jìn)行任何重試。
Forking Cluster:并行調(diào)用多個(gè) Provider 節(jié)點(diǎn)缩焦,只要有一個(gè)成功就返回读虏。
Broadcast Cluster:廣播多個(gè) Provider 節(jié)點(diǎn)责静,只要有一個(gè)節(jié)點(diǎn)失敗就失敗。
Available Cluster:遍歷所有的 Provider 節(jié)點(diǎn)盖桥,找到每一個(gè)可用的節(jié)點(diǎn)屠升,就直接調(diào)用凝化。如果沒有可用的 Provider 節(jié)點(diǎn)褐鸥,則直接拋出異常狠毯。
Mergeable Cluster:請(qǐng)求多個(gè) Provider 節(jié)點(diǎn)并將得到的結(jié)果進(jìn)行合并。
下面再來看 Cluster 接口靴拱。Cluster 接口是一個(gè)擴(kuò)展接口,通過 @SPI 注解的參數(shù)我們知道其使用的默認(rèn)實(shí)現(xiàn)是 FailoverCluster猾普,它只定義了一個(gè) join() 方法袜炕,在其上添加了 @Adaptive 注解,會(huì)動(dòng)態(tài)生成適配器類初家,其中會(huì)優(yōu)先根據(jù) Directory.getUrl() 方法返回的 URL 中的 cluster 參數(shù)值選擇擴(kuò)展實(shí)現(xiàn)偎窘,若無 cluster 參數(shù)則使用默認(rèn)的 FailoverCluster 實(shí)現(xiàn)。Cluster 接口的具體定義如下所示:
@SPI(Cluster.DEFAULT)
public interface Cluster {
String DEFAULT = FailoverCluster.NAME;
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
static Cluster getCluster(String name) {
return getCluster(name, true);
}
static Cluster getCluster(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
name = Cluster.DEFAULT;
}
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
}
}
Cluster 接口的實(shí)現(xiàn)類如下圖所示溜在,分別對(duì)應(yīng)前面提到的多種容錯(cuò)策略:
在每個(gè) Cluster 接口實(shí)現(xiàn)中陌知,都會(huì)創(chuàng)建對(duì)應(yīng)的 Invoker 對(duì)象,這些都繼承自 AbstractClusterInvoker 抽象類掖肋,如下圖所示:
通過上面兩張繼承關(guān)系圖我們可以看出仆葡,Cluster 接口和 Invoker 接口都會(huì)有相應(yīng)的抽象實(shí)現(xiàn)類,這些抽象實(shí)現(xiàn)類都實(shí)現(xiàn)了一些公共能力志笼。下面我們就來深入介紹 AbstractClusterInvoker 和 AbstractCluster 這兩個(gè)抽象類沿盅。
AbstractClusterInvoker
了解了 Cluster Invoker 的繼承關(guān)系之后,首先來看 AbstractClusterInvoker纫溃,它有兩點(diǎn)核心功能:一個(gè)是實(shí)現(xiàn)的 Invoker 接口腰涧,對(duì) Invoker.invoke() 方法進(jìn)行通用的抽象實(shí)現(xiàn);另一個(gè)是實(shí)現(xiàn)通用的負(fù)載均衡算法紊浩。
在 AbstractClusterInvoker.invoke() 方法中窖铡,會(huì)通過 Directory 獲取 Invoker 列表,然后通過 SPI 初始化 LoadBalance坊谁,最后調(diào)用 doInvoke() 方法執(zhí)行子類的邏輯费彼。在 Directory.list() 方法返回 Invoker 集合之前,已經(jīng)使用 Router 進(jìn)行了一次篩選呜袁。
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public Result invoke(final Invocation invocation) throws RpcException {
// 檢測(cè)當(dāng)前Invoker是否已銷毀
checkWhetherDestroyed();
// 將RpcContext中的attachment添加到Invocation中
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 通過Directory獲取Invoker對(duì)象列表敌买,通過對(duì)RegistryDirectory的介紹我們知道,其中已經(jīng)調(diào)用了Router進(jìn)行過濾
List<Invoker<T>> invokers = list(invocation);
// 通過SPI加載LoadBalance
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 調(diào)用doInvoke()方法阶界,該方法是個(gè)抽象方法
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
// 調(diào)用Directory.list()方法
return directory.list(invocation);
}
}
下面來看一下 AbstractClusterInvoker 是如何按照不同的 LoadBalance 算法從 Invoker 集合中選取最終 Invoker 對(duì)象的虹钮。
AbstractClusterInvoker 并沒有簡單粗暴地使用 LoadBalance.select() 方法完成負(fù)載均衡聋庵,而是做了進(jìn)一步的封裝,具體實(shí)現(xiàn)在 select() 方法中芙粱。在 select() 方法中會(huì)根據(jù)配置決定是否開啟粘滯連接特性祭玉,如果開啟了,則需要將上次使用的 Invoker 緩存起來春畔,只要 Provider 節(jié)點(diǎn)可用就直接調(diào)用脱货,不會(huì)再進(jìn)行負(fù)載均衡。如果調(diào)用失敗律姨,才會(huì)重新進(jìn)行負(fù)載均衡振峻,并且排除已經(jīng)重試過的 Provider 節(jié)點(diǎn)。
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
// 第一個(gè)參數(shù)是此次使用的LoadBalance實(shí)現(xiàn)择份,第二個(gè)參數(shù)Invocation是此次服務(wù)調(diào)用的上下文信息扣孟,
// 第三個(gè)參數(shù)是待選擇的Invoker集合,第四個(gè)參數(shù)用來記錄負(fù)載均衡已經(jīng)選出來荣赶、嘗試過的Invoker集合
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
// 獲取調(diào)用方法名
String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
// 獲取sticky配置凤价,sticky表示粘滯連接,所謂粘滯連接是指Consumer會(huì)盡可能地
// 調(diào)用同一個(gè)Provider節(jié)點(diǎn)拔创,除非這個(gè)Provider無法提供服務(wù)
boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
// 檢測(cè)invokers列表是否包含sticky Invoker利诺,如果不包含,
// 說明stickyInvoker代表的服務(wù)提供者掛了剩燥,此時(shí)需要將其置空
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
// 如果開啟了粘滯連接特性慢逾,需要先判斷這個(gè)Provider節(jié)點(diǎn)是否已經(jīng)重試過了
if (sticky && stickyInvoker != null // 表示粘滯連接
&& (selected == null || !selected.contains(stickyInvoker))) {// 表示stickyInvoker未重試過
// 檢測(cè)當(dāng)前stickyInvoker是否可用,如果可用灭红,直接返回stickyInvoker
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
// 執(zhí)行到這里氛改,說明前面的stickyInvoker為空,或者不可用
// 這里會(huì)繼續(xù)調(diào)用doSelect選擇新的Invoker對(duì)象
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
// 是否開啟粘滯比伏,更新stickyInvoker字段
stickyInvoker = invoker;
}
return invoker;
}
}
doSelect() 方法主要做了兩件事:
一是通過 LoadBalance 選擇 Invoker 對(duì)象胜卤。
二是如果選出來的 Invoker 不穩(wěn)定或不可用,會(huì)調(diào)用 reselect() 方法進(jìn)行重選赁项。
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
// 判斷是否需要進(jìn)行負(fù)載均衡葛躏,Invoker集合為空,直接返回null
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
// 只有一個(gè)Invoker對(duì)象悠菜,直接返回即可
return invokers.get(0);
}
// 通過LoadBalance實(shí)現(xiàn)選擇Invoker對(duì)象
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
// 如果LoadBalance選出的Invoker對(duì)象舰攒,已經(jīng)嘗試過請(qǐng)求了或不可用,則需要調(diào)用reselect()方法重選
if ((selected != null && selected.contains(invoker)) // Invoker已經(jīng)嘗試調(diào)用過了悔醋,但是失敗了
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {// Invoker不可用
try {
// 調(diào)用reselect()方法重選
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
// 如果重選的Invoker對(duì)象不為空摩窃,則直接返回這個(gè) rInvoker
if (rInvoker != null) {
invoker = rInvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
// 如果重選的Invoker對(duì)象為空,則返回該Invoker的下一個(gè)Invoker對(duì)象
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
}
reselect() 方法會(huì)重新進(jìn)行一次負(fù)載均衡,首先對(duì)未嘗試過的可用 Invokers 進(jìn)行負(fù)載均衡猾愿,如果已經(jīng)全部重試過了鹦聪,則將嘗試過的 Provider 節(jié)點(diǎn)過濾掉,然后在可用的 Provider 節(jié)點(diǎn)中重新進(jìn)行負(fù)載均衡蒂秘。
public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck) throws RpcException {
// 用于記錄要重新進(jìn)行負(fù)載均衡的Invoker集合
List<Invoker<T>> reselectInvokers = new ArrayList<>(
invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
// 將不在selected集合中的Invoker過濾出來進(jìn)行負(fù)載均衡
for (Invoker<T> invoker : invokers) {
if (availablecheck && !invoker.isAvailable()) {
continue;
}
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
// reselectInvokers不為空時(shí)泽本,才需要通過負(fù)載均衡組件進(jìn)行選擇
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
// 只能對(duì)selected集合中可用的Invoker再次進(jìn)行負(fù)載均衡
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
return null;
}
}
AbstractCluster
常用的 ClusterInvoker 實(shí)現(xiàn)都繼承了 AbstractClusterInvoker 類型,對(duì)應(yīng)的 Cluster 擴(kuò)展實(shí)現(xiàn)都繼承了 AbstractCluster 抽象類姻僧。AbstractCluster 抽象類的核心邏輯是在 ClusterInvoker 外層包裝一層 ClusterInterceptor规丽,從而實(shí)現(xiàn)類似切面的效果。
下面是 ClusterInterceptor 接口的定義:
@SPI
public interface ClusterInterceptor {
// 前置攔截方法
void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
// 后置攔截方法
void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
// 調(diào)用ClusterInvoker的invoke()方法完成請(qǐng)求
default Result intercept(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) throws RpcException {
return clusterInvoker.invoke(invocation);
}
// 這個(gè)Listener用來監(jiān)聽請(qǐng)求的正常結(jié)果以及異常
interface Listener {
void onMessage(Result appResponse, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
void onError(Throwable t, AbstractClusterInvoker<?> clusterInvoker, Invocation invocation);
}
}
在 AbstractCluster 抽象類的 join() 方法中撇贺,首先會(huì)調(diào)用 doJoin() 方法獲取最終要調(diào)用的 Invoker 對(duì)象赌莺,doJoin() 是個(gè)抽象方法,由 AbstractCluster 子類根據(jù)具體的策略進(jìn)行實(shí)現(xiàn)松嘶。之后雄嚣,AbstractCluster.join() 方法會(huì)調(diào)用 buildClusterInterceptors() 方法加載 ClusterInterceptor 擴(kuò)展實(shí)現(xiàn)類,對(duì) Invoker 對(duì)象進(jìn)行包裝喘蟆。具體實(shí)現(xiàn)如下:
public abstract class AbstractCluster implements Cluster {
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
AbstractClusterInvoker<T> last = clusterInvoker;
// 通過SPI方式加載ClusterInterceptor擴(kuò)展實(shí)現(xiàn)
List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key);
if (!interceptors.isEmpty()) {
for (int i = interceptors.size() - 1; i >= 0; i--) {
// 將InterceptorInvokerNode收尾連接到一起,形成調(diào)用鏈
final ClusterInterceptor interceptor = interceptors.get(i);
final AbstractClusterInvoker<T> next = last;
last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
}
}
return last;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// 擴(kuò)展名稱由reference.interceptor參數(shù)確定
return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}
}
InterceptorInvokerNode 會(huì)將底層的 AbstractClusterInvoker 對(duì)象以及關(guān)聯(lián)的 ClusterInterceptor 對(duì)象封裝到一起鼓鲁,還會(huì)維護(hù)一個(gè) next 引用蕴轨,指向下一個(gè) InterceptorInvokerNode 對(duì)象。
在 InterceptorInvokerNode.invoke() 方法中骇吭,會(huì)先調(diào)用 ClusterInterceptor 的前置邏輯橙弱,然后執(zhí)行 intercept() 方法調(diào)用 AbstractClusterInvoker 的 invoke() 方法完成遠(yuǎn)程調(diào)用,最后執(zhí)行 ClusterInterceptor 的后置邏輯燥狰。具體實(shí)現(xiàn)如下:
public abstract class AbstractCluster implements Cluster {
protected class InterceptorInvokerNode<T> extends AbstractClusterInvoker<T> {
private AbstractClusterInvoker<T> clusterInvoker;
private ClusterInterceptor interceptor;
private AbstractClusterInvoker<T> next;
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// 前置邏輯
interceptor.before(next, invocation);
// 執(zhí)行invoke()方法完成遠(yuǎn)程調(diào)用
asyncResult = interceptor.intercept(next, invocation);
} catch (Exception e) {
// onError callback
if (interceptor instanceof ClusterInterceptor.Listener) {
// 出現(xiàn)異常時(shí)棘脐,會(huì)觸發(fā)監(jiān)聽器的onError()方法
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
listener.onError(e, clusterInvoker, invocation);
}
throw e;
} finally {
// 執(zhí)行后置邏輯
interceptor.after(next, invocation);
}
return asyncResult.whenCompleteWithContext((r, t) -> {
// onResponse callback
if (interceptor instanceof ClusterInterceptor.Listener) {
ClusterInterceptor.Listener listener = (ClusterInterceptor.Listener) interceptor;
if (t == null) {
// 正常返回時(shí),會(huì)調(diào)用onMessage()方法觸發(fā)監(jiān)聽器
listener.onMessage(r, clusterInvoker, invocation);
} else {
listener.onError(t, clusterInvoker, invocation);
}
}
});
}
}
}
Dubbo 提供了兩個(gè) ClusterInterceptor 實(shí)現(xiàn)類龙致,分別是 ConsumerContextClusterInterceptor 和 ZoneAwareClusterInterceptor蛀缝,如下圖所示:
ConsumerContextClusterInterceptor
在 ConsumerContextClusterInterceptor 的 before() 方法中,會(huì)在 RpcContext 中設(shè)置當(dāng)前 Consumer 地址目代、此次調(diào)用的 Invoker 等信息屈梁,同時(shí)還會(huì)刪除之前與當(dāng)前線程綁定的 Server Context。在 after() 方法中榛了,會(huì)刪除本地 RpcContext 的信息在讶。ConsumerContextClusterInterceptor 的具體實(shí)現(xiàn)如下:
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
@Override
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
// 獲取當(dāng)前線程綁定的RpcContext
RpcContext context = RpcContext.getContext();
// 設(shè)置Invoker、Consumer地址等信息
context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
RpcContext.removeServerContext();
}
@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
// 刪除本地RpcContext的信息
RpcContext.removeContext(true);
}
}
ConsumerContextClusterInterceptor 同時(shí)繼承了 ClusterInterceptor.Listener 接口霜大,在其 onMessage() 方法中构哺,會(huì)獲取響應(yīng)中的 attachments 并設(shè)置到 RpcContext 中的 SERVER_LOCAL 之中,具體實(shí)現(xiàn)如下:
@Activate
public class ConsumerContextClusterInterceptor implements ClusterInterceptor, ClusterInterceptor.Listener {
@Override
public void onMessage(Result appResponse, AbstractClusterInvoker<?> invoker, Invocation invocation) {
// 從AppResponse中獲取attachment战坤,并設(shè)置到SERVER_LOCAL這個(gè)RpcContext中
RpcContext.getServerContext().setObjectAttachments(appResponse.getObjectAttachments());
}
}
介紹完 ConsumerContextClusterInterceptor曙强,再來看 ZoneAwareClusterInterceptor残拐。
ZoneAwareClusterInterceptor
在 ZoneAwareClusterInterceptor 的 before() 方法中,會(huì)從 RpcContext 中獲取多注冊(cè)中心相關(guān)的參數(shù)并設(shè)置到 Invocation 中(主要是 registry_zone 參數(shù)和 registry_zone_force 參數(shù)旗扑,這兩個(gè)參數(shù)的具體含義蹦骑,在后面分析 ZoneAwareClusterInvoker 時(shí)詳細(xì)介紹),ZoneAwareClusterInterceptor 的 after() 方法為空實(shí)現(xiàn)臀防。ZoneAwareClusterInterceptor 的具體實(shí)現(xiàn)如下:
@Activate(value = "cluster:zone-aware")
public class ZoneAwareClusterInterceptor implements ClusterInterceptor {
@Override
public void before(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
RpcContext rpcContext = RpcContext.getContext();
// 從RpcContext中獲取registry_zone參數(shù)和registry_zone_force參數(shù)
String zone = (String) rpcContext.getAttachment(REGISTRY_ZONE);
String force = (String) rpcContext.getAttachment(REGISTRY_ZONE_FORCE);
// 檢測(cè)用戶是否提供了ZoneDetector接口的擴(kuò)展實(shí)現(xiàn)
ExtensionLoader<ZoneDetector> loader = ExtensionLoader.getExtensionLoader(ZoneDetector.class);
if (StringUtils.isEmpty(zone) && loader.hasExtension("default")) {
ZoneDetector detector = loader.getExtension("default");
zone = detector.getZoneOfCurrentRequest(invocation);
force = detector.isZoneForcingEnabled(invocation, zone);
}
// 將registry_zone參數(shù)和registry_zone_force參數(shù)設(shè)置到Invocation中
if (StringUtils.isNotEmpty(zone)) {
invocation.setAttachment(REGISTRY_ZONE, zone);
}
if (StringUtils.isNotEmpty(force)) {
invocation.setAttachment(REGISTRY_ZONE_FORCE, force);
}
}
@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
}
}
需要注意的是眠菇,ZoneAwareClusterInterceptor 沒有實(shí)現(xiàn) ClusterInterceptor.Listener 接口,也就是不提供監(jiān)聽響應(yīng)的功能袱衷。
總結(jié)
本文主要介紹的是 Dubbo Cluster 層中容錯(cuò)機(jī)制相關(guān)的內(nèi)容捎废。首先,了解了集群容錯(cuò)機(jī)制的作用致燥。然后登疗,介紹了 Cluster 接口的定義以及其各個(gè)實(shí)現(xiàn)類的核心功能。之后嫌蚤,深入講解了 AbstractClusterInvoker 的實(shí)現(xiàn)辐益,其核心是實(shí)現(xiàn)了一套通用的負(fù)載均衡算法。最后脱吱,還分析了 AbstractCluster 抽象實(shí)現(xiàn)類以及其中涉及的 ClusterInterceptor 接口的內(nèi)容智政。