下面我們來分析一下Dubbo的集群容錯機(jī)制。我們先來看一下各個節(jié)點(diǎn)之間的關(guān)系
這里的Invoker是Provider的一個可調(diào)用Service的抽象蓝晒,Invoker封裝了Provider地址及Service接口信息腮出。
Directory代表多個Invoker,可以把它看成List<Invoker>芝薇,但與List不同的是利诺,它的值可能是動態(tài)變化的,比如
注冊中心推送變更剩燥。
Cluster將Directory中的多個Invoker偽裝成一個Invoker慢逾,對上層透明,偽裝過程包含了容錯邏輯灭红,調(diào)用失敗
后侣滩,重試另一個。
Router負(fù)責(zé)從多個Invoker中按路由規(guī)則選出子集变擒,比如讀寫分離君珠,應(yīng)用隔離等。
LoadBalance負(fù)責(zé)從多個Invoker中選出具體的一個用于本次調(diào)用娇斑,選的過程包含了負(fù)載均衡算法策添,調(diào)用失敗
后,需要重選毫缆。
再來說一下Dubbo的集群的容錯模式
Failover Cluster
- 失敗自動切換唯竹,當(dāng)出現(xiàn)失敗,重試其它服務(wù)器苦丁。(缺省)
- 通常用于讀操作浸颓,但重試會帶來更長延遲。
- 可通過retries="2"來設(shè)置重試次數(shù)(不含第一次)旺拉。
Failfast Cluster
- 快速失敗产上,只發(fā)起一次調(diào)用,失敗立即報(bào)錯蛾狗。
- 通常用于非冪等性的寫操作晋涣,比如新增記錄。
Failsafe Cluster
- 失敗安全沉桌,出現(xiàn)異常時谢鹊,直接忽略算吩。
- 通常用于寫入審計(jì)日志等操作。
Failback Cluster
- 失敗自動恢復(fù)撇贺,后臺記錄失敗請求赌莺,定時重發(fā)冰抢。
- 通常用于消息通知操作松嘶。
Forking Cluster
- 并行調(diào)用多個服務(wù)器,只要一個成功即返回挎扰。
- 通常用于實(shí)時性要求較高的讀操作翠订,但需要浪費(fèi)更多服務(wù)資源。
- 可通過forks="2"來設(shè)置最大并行數(shù)遵倦。
Broadcast Cluster
廣播調(diào)用所有提供者尽超,逐個調(diào)用,任意一臺報(bào)錯則報(bào)錯梧躺。(2.1.0開始支持)
-
通常用于通知所有提供者更新緩存或日志等本地資源信息似谁。
重試次數(shù)配置如:(failover集群模式生效)<dubbo:service retries="2" />
或:
<dubbo:reference retries="2" />
或:
<dubbo:reference>
<dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
集群模式配置如:
<dubbo:service cluster="failsafe" />
或:
<dubbo:reference cluster="failsafe" />
那么Dubbo的集群容錯是怎么實(shí)現(xiàn)的呢,現(xiàn)在我們來看一下掠哥。
先看一下Cluster接口
/**
* Merge the directory invokers to a virtual invoker.
*
* 基于 Directory 巩踏,創(chuàng)建 Invoker 對象,實(shí)現(xiàn)統(tǒng)一续搀、透明的 Invoker 調(diào)用過程
*
* @param directory Directory 對象
* @param <T> 泛型
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
下面我們來介紹幾個Cluster的實(shí)現(xiàn)類
AvailableCluster塞琼,失敗自動切換,當(dāng)出現(xiàn)失敗禁舷,重試其它服務(wù)器彪杉。通常用于讀操作,但重試會帶來更長延遲牵咙∨山可通過 retries="2" 來設(shè)置重試次數(shù)(不含第一次)。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AvailableClusterInvoker<T>(directory);
}
看一下AvailableClusterInvoker這個類
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 循環(huán)候選的 Invoker 集合洁桌,調(diào)用首個可用的 Invoker 對象构哺。
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) { // 可用
// 發(fā)起 RPC 調(diào)用
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
BroadcastCluster,廣播調(diào)用所有提供者战坤,逐個調(diào)用曙强,任意一臺報(bào)錯則報(bào)錯。通常用于通知所有提供者更新緩存或日志等本地資源信息途茫。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new BroadcastClusterInvoker<T>(directory);
}
看一下BroadcastClusterInvoker的實(shí)現(xiàn)
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 檢查 invokers 即可用Invoker集合是否為空碟嘴,如果為空,那么拋出異常
checkInvokers(invokers, invocation);
// 設(shè)置已經(jīng)調(diào)用的 Invoker 集合囊卜,到 Context 中
RpcContext.getContext().setInvokers((List) invokers);
// 保存最后一次調(diào)用的異常
RpcException exception = null;
// 保存最后一次調(diào)用的結(jié)果
Result result = null;
// 循環(huán)候選的 Invoker 集合娜扇,調(diào)用所有 Invoker 對象错沃。
for (Invoker<T> invoker : invokers) {
try {
// 發(fā)起 RPC 調(diào)用
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e); // 封裝成 RpcException 異常
logger.warn(e.getMessage(), e);
}
}
// 若存在一個異常,拋出該異常
if (exception != null) {
throw exception;
}
return result;
}
FailbackCluster雀瓢,失敗自動恢復(fù)枢析,后臺記錄失敗請求,定時重發(fā)刃麸。通常用于消息通知操作醒叁。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailbackClusterInvoker<T>(directory);
}
看一下FailbackClusterInvoker的實(shí)現(xiàn)
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
// 檢查 invokers 即可用Invoker集合是否為空,如果為空泊业,那么拋出異常
checkInvokers(invokers, invocation);
// 根據(jù)負(fù)載均衡機(jī)制從 invokers 中選擇一個Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// RPC 調(diào)用得到 Result
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
// 添加到失敗任務(wù)
addFailed(invocation, this);
return new RpcResult(); // ignore
}
}
FailfastCluster把沼,快速失敗,只發(fā)起一次調(diào)用吁伺,失敗立即報(bào)錯饮睬。通常用于非冪等性的寫操作,比如新增記錄篮奄。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailfastClusterInvoker<T>(directory);
}
看一下FailfastClusterInvoker的實(shí)現(xiàn)
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 檢查 invokers 即可用Invoker集合是否為空捆愁,如果為空,那么拋出異常
checkInvokers(invokers, invocation);
// 根據(jù)負(fù)載均衡機(jī)制從 invokers 中選擇一個Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
// RPC 調(diào)用得到 Result
return invoker.invoke(invocation);
} catch (Throwable e) {
// 若是業(yè)務(wù)性質(zhì)的異常窟却,直接拋出
if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
throw (RpcException) e;
}
// 封裝 RpcException 異常昼丑,并拋出
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
"Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
}
FailoverCluster,敗自動切換间校,當(dāng)出現(xiàn)失敗矾克,重試其它服務(wù)器。通常用于讀操作憔足,但重試會帶來更長延遲胁附。可通過 retries="2" 來設(shè)置重試次數(shù)(不含第一次)滓彰。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
看下FailoverClusterInvoker的實(shí)現(xiàn)
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
// 檢查copyinvokers即可用Invoker集合是否為空控妻,如果為空,那么拋出異常
checkInvokers(copyinvokers, invocation);
// 得到最大可調(diào)用次數(shù):最大可重試次數(shù)+1揭绑,默認(rèn)最大可重試次數(shù)Constants.DEFAULT_RETRIES=2
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// 保存最后一次調(diào)用的異常
// retry loop.
RpcException le = null; // last exception.
// 保存已經(jīng)調(diào)用過的Invoker
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// failover機(jī)制核心實(shí)現(xiàn):如果出現(xiàn)調(diào)用失敗弓候,那么重試其他服務(wù)器
for (int i = 0; i < len; i++) {
// Reselect before retry to avoid a change of candidate `invokers`.
// NOTE: if `invokers` changed, then `invoked` also lose accuracy.
// 重試時,進(jìn)行重新選擇他匪,避免重試時invoker列表已發(fā)生變化.
// 注意:如果列表發(fā)生了變化菇存,那么invoked判斷會失效,因?yàn)閕nvoker示例已經(jīng)改變
if (i > 0) {
checkWhetherDestroyed();
// 根據(jù)Invocation調(diào)用信息從Directory中獲取所有可用Invoker
copyinvokers = list(invocation);
// check again
// 重新檢查一下
checkInvokers(copyinvokers, invocation);
}
// 根據(jù)負(fù)載均衡機(jī)制從copyinvokers中選擇一個Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 保存每次調(diào)用的Invoker
invoked.add(invoker);
// 設(shè)置已經(jīng)調(diào)用的 Invoker 集合邦蜜,到 Context 中
RpcContext.getContext().setInvokers((List) invoked);
try {
// RPC 調(diào)用得到 Result
Result result = invoker.invoke(invocation);
// 重試過程中依鸥,將最后一次調(diào)用的異常信息以 warn 級別日志輸出
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 如果是業(yè)務(wù)性質(zhì)的異常,不再重試悼沈,直接拋出
if (e.isBiz()) { // biz exception.
throw e;
}
// 其他性質(zhì)的異常統(tǒng)一封裝成RpcException
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 最大可調(diào)用次數(shù)用完還得到Result的話贱迟,拋出RpcException異常:重試了N次還是失敗姐扮,并輸出最后一次異常信息
throw new RpcException(le.getCode(), "Failed to invoke the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers "
+ providers + " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion()
+ ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
FailsafeCluster,失敗安全衣吠,出現(xiàn)異常時茶敏,直接忽略。通常用于寫入審計(jì)日志等操作缚俏。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailsafeClusterInvoker<T>(directory);
}
看一下FailsafeClusterInvoker的實(shí)現(xiàn)
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
// 檢查 invokers 即可用Invoker集合是否為空惊搏,如果為空,那么拋出異常
checkInvokers(invokers, invocation);
// 根據(jù)負(fù)載均衡機(jī)制從 invokers 中選擇一個Invoker
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
// RPC 調(diào)用得到 Result
return invoker.invoke(invocation);
} catch (Throwable e) {
// 打印異常日志
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
// 忽略異常
return new RpcResult(); // ignore
}
}
ForkingCluster袍榆,并行調(diào)用多個服務(wù)器胀屿,只要一個成功即返回塘揣。通常用于實(shí)時性要求較高的讀操作包雀,但需要浪費(fèi)更多服務(wù)資源∏渍。可通過 forks="2" 來設(shè)置最大并行數(shù)才写。
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new ForkingClusterInvoker<T>(directory);
}
看一下ForkingClusterInvoker的實(shí)現(xiàn)
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// 檢查 invokers 即可用Invoker集合是否為空,如果為空奖蔓,那么拋出異常
checkInvokers(invokers, invocation);
// 保存選擇的 Invoker 集合
final List<Invoker<T>> selected;
// 得到最大并行數(shù)赞草,默認(rèn)為 Constants.DEFAULT_FORKS = 2
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
// 獲得調(diào)用超時時間,默認(rèn)為 DEFAULT_TIMEOUT = 1000 毫秒
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 若最大并行書小于等于 0吆鹤,或者大于 invokers 的數(shù)量厨疙,直接使用 invokers
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
// 循環(huán),根據(jù)負(fù)載均衡機(jī)制從 invokers疑务,中選擇一個個Invoker 沾凄,從而組成 Invoker 集合。
// 注意知允,因?yàn)樵黾恿伺胖剡壿嬋鲶埃圆荒鼙WC獲得的 Invoker 集合的大小,小于最大并行數(shù)
selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
// 在invoker列表(排除selected)后,如果沒有選夠,則存在重復(fù)循環(huán)問題.見select實(shí)現(xiàn).
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) { //Avoid add the same invoker several times. //防止重復(fù)添加invoker
selected.add(invoker);
}
}
}
// 設(shè)置已經(jīng)調(diào)用的 Invoker 集合温鸽,到 Context 中
RpcContext.getContext().setInvokers((List) selected);
// 異常計(jì)數(shù)器
final AtomicInteger count = new AtomicInteger();
// 創(chuàng)建阻塞隊(duì)列
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
// 循環(huán) selected 集合保屯,提交線程池,發(fā)起 RPC 調(diào)用
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
public void run() {
try {
// RPC 調(diào)用涤垫,獲得 Result 結(jié)果
Result result = invoker.invoke(invocation);
// 添加 Result 到 `ref` 阻塞隊(duì)列
ref.offer(result);
} catch (Throwable e) {
// 異常計(jì)數(shù)器 + 1
int value = count.incrementAndGet();
// 若 RPC 調(diào)結(jié)果都是異常姑尺,則添加異常到 `ref` 阻塞隊(duì)列
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
// 從 `ref` 隊(duì)列中,阻塞等待結(jié)果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
// 若是異常結(jié)果蝠猬,拋出 RpcException 異常
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
// 若是正常結(jié)果切蟋,直接返回
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
}
MergeableCluster,分組聚合 Cluster 實(shí)現(xiàn)類
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MergeableClusterInvoker<T>(directory);
}
看一下MergeableClusterInvoker的實(shí)現(xiàn)
public Result invoke(final Invocation invocation) throws RpcException {
// 獲得 Invoker 集合
List<Invoker<T>> invokers = directory.list(invocation);
// 獲得 Merger 拓展名
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
// 若果未配置拓展吱雏,直接調(diào)用首個可用的 Invoker 對象
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}
// 通過反射敦姻,獲得返回類型
Class<?> returnType;
try {
returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}
// 提交線程池瘾境,并行執(zhí)行,發(fā)起 RPC 調(diào)用镰惦,并添加到 results 中
Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
public Result call() {
// RPC 調(diào)用
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
}
// 阻塞等待執(zhí)行執(zhí)行結(jié)果迷守,并添加到 resultList 中
List<Result> resultList = new ArrayList<Result>(results.size());
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) { // 異常 Result ,打印錯誤日志旺入,忽略
log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());
} else { // 正常 Result 兑凿,添加到 resultList 中
resultList.add(r);
}
} catch (Exception e) { // 異常,拋出 RpcException 異常
throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);
}
}
// 結(jié)果大小為空茵瘾,返回空的 RpcResult
if (resultList.isEmpty()) {
return new RpcResult((Object) null);
// 結(jié)果大小為 1 礼华,返回首個 RpcResult
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}
// 返回類型為 void ,返回空的 RpcResult
if (returnType == void.class) {
return new RpcResult((Object) null);
}
Object result;
// 【第 1 種】基于合并方法
if (merger.startsWith(".")) {
// 獲得合并方法 Method
merger = merger.substring(1);
Method method;
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
}
// 有 Method 拗秘,進(jìn)行合并
if (method != null) {
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
result = resultList.remove(0).getValue();
try {
// 方法返回類型匹配床嫌,合并時,修改 result
if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result, r.getValue());
}
// 方法返回類型不匹配诱告,合并時悲柱,不修改 result
} else {
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} catch (Exception e) {
throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);
}
// 無 Method ,拋出 RpcException 異常
} else {
throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
}
// 【第 2 種】基于 Merger
} else {
Merger resultMerger;
// 【第 2.1 種】根據(jù)返回值類型自動匹配 Merger
if (ConfigUtils.isDefault(merger)) {
resultMerger = MergerFactory.getMerger(returnType);
// 【第 2.2 種】指定 Merger
} else {
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
// 有 Merger 凡涩,進(jìn)行合并
if (resultMerger != null) {
List<Object> rets = new ArrayList<Object>(resultList.size());
for (Result r : resultList) {
rets.add(r.getValue());
}
result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
// 無 Merger 棒搜,拋出 RpcException 異常
} else {
throw new RpcException("There is no merger to merge result.");
}
}
// 返回 RpcResult 結(jié)果
return new RpcResult(result);
}
Dubbo的集群容錯就介紹到這里了。