前言
image
上一篇降到movk本地偽裝其實是對Cluster集群容錯類實例的再次包裝,調(diào)完mockInvoker的invoker方法后,會調(diào)到被包裝的Cluster實例的invoker,對真正的遠程調(diào)用執(zhí)行各種集群容錯的策略。
image
集群容錯策略的選擇
- 配置方式 :指定集群容錯策略為available`
@Reference(cluster="AvailableCluster")
private UserService userService;
- 默認策略 :failover
Cluster實例都是從SPI工廠里加載而來的,如果沒有主動配置,默認得到的就是接口上@SPI的值
image
Cluster$Adaptive源代碼 :
public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
// 從url拿cluster的配置,沒有配就failover
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
FailoverClusterInvoker
當首次調(diào)用失敗后,還會重試指定次數(shù)
循環(huán)最大調(diào)用次數(shù)( 1+重試次數(shù)),每次都去發(fā)起遠程調(diào)用,如果其中一次調(diào)用成功就return,結(jié)束循環(huán),否則繼續(xù)下一次調(diào)用如果超出重試次數(shù)還沒有調(diào)用成功,就拋rpc異常
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
//獲取重試次數(shù)
int len = getUrl().getMethodParameter(methodName,Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
//已經(jīng)調(diào)用過了的服務(wù)列表
List<Invoker<T>> invoked = newArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
//如果掉完一次后变逃,服務(wù)列表更新了奢米,再次獲取服務(wù)列表
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//根據(jù)負載均衡算法,選擇一個服務(wù)調(diào)用
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers,invoked);
//記錄已經(jīng)調(diào)用過的invoker
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//具體的服務(wù)調(diào)用邏輯
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " +invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " +invoker.getUrl().getAddress()
+ ", but there have been failed providers " +providers
+ " (" + providers.size() + "/" +copyinvokers.size()
+ ") from the registry " +directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " +Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
// 期間有一次調(diào)用成功就返回結(jié)果,結(jié)束循環(huán)
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
// 如果超出重試次數(shù)還沒有調(diào)用成功,就拋rpc異常
throw new RpcException(le != null ? le.getCode() : 0, "Failed toinvoke the method "
+ invocation.getMethodName() + " in the service " +getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using thedubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
FailBackClusterInvoker
出錯返回空結(jié)果,異步重試蜗侈,起一個線程朝蜘,等待配置的時間,進行重試
catch到異常
image
image
FailfastClusterInvoker
catch到異常并直接拋出RpcException胞此,不進行重試,以快速響應(yīng)
image
FailsafeClusterInvoker
直接吞掉異常,不進行任何處理,返回空的結(jié)果,適用于對結(jié)果不敏感的業(yè)務(wù)
image
ForkingClusterInvoker
同時調(diào)n臺(默認為2臺)主機臣咖,選擇最快返回結(jié)果的那臺主機返回的結(jié)果
負載均衡后選出與fork數(shù)相同的invoker
image
selecte出幾個invoker就起對應(yīng)數(shù)量的縣城,去發(fā)起遠程調(diào)用,并異步把結(jié)果放到LinkedBlockingQueue 阻塞隊列中
image
一旦從隊列poll到第一個result,就return 結(jié)果
image
BroadcastClusterInvoker
服務(wù)列表里所有主機都會被調(diào),原子廣播
image
AvailableClusterInvoker
沒有負載均衡沒有重試,調(diào)服務(wù)之前會先查看tcp存在有效的長連接,確保后端服務(wù)正常,如果有,再調(diào)用具體的業(yè)務(wù)方法漱牵。用于對數(shù)據(jù)安全要求比較高的業(yè)務(wù)
image
image