集群調(diào)用
為了提供性能與保持高可用屿岂,通常一個(gè)接口能力會(huì)有多個(gè)提供者,也就是集群(Cluster)爷怀。
下面分析一下,Dubbo中一次invoke(調(diào)用)的流程磺浙。
首先介紹一下幾個(gè)關(guān)鍵類徒坡,可以結(jié)合具體流程在進(jìn)行消化瘤缩。
cluster
各節(jié)點(diǎn)關(guān)系(參考Dubbo官網(wǎng))
-
Cluster
將Directory
中的多個(gè)Invoker
偽裝成一個(gè)Invoker
,對(duì)上層透明锦溪,偽裝過程包含了容錯(cuò)邏輯府怯,調(diào)用失敗后牺丙,重試另一個(gè)
-
Invoker
是Provider
的一個(gè)可調(diào)用Service
的抽象,Invoker
封裝了Provider
地址及Service
接口信息亿昏。
image
-
Directory
代表多個(gè)Invoker
角钩,可以把它看成List<Invoker>
呻澜,但與List
不同的是羹幸,它的值可能是動(dòng)態(tài)變化的,比如注冊(cè)中心推送變更 -
Router
負(fù)責(zé)從多個(gè)Invoker
中按路由規(guī)則選出子集供炼,比如讀寫分離窘疮,應(yīng)用隔離等 -
LoadBalance
負(fù)責(zé)從多個(gè)Invoker
中選出具體的一個(gè)用于本次調(diào)用袋哼,選的過程包含了負(fù)載均衡算法,調(diào)用失敗后闸衫,需要重選
invoke流程
使用的模板方法設(shè)計(jì)模式,可以看到一些主流程都在父類的AbstractClusterInvoker中蔚出,一些特殊實(shí)現(xiàn)如doXxx(比如doList、doInvoke)根據(jù)配置的不同子類骄酗,或者根據(jù)SPI定制化實(shí)現(xiàn)稀余。
AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException {
//校驗(yàn)服務(wù)可用性,是否已經(jīng)被摧毀
checkWhetherDestroyed();
//將一些上下文信息綁定到invocation中睛琳,可以傳遞到遠(yuǎn)程服務(wù)器
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
//將可用的invokers全部列出來(DynamicDirectory、RegistryDirectory)踏烙,再根據(jù)router規(guī)則進(jìn)行過濾選出子集师骗,如讀寫分離等
List<Invoker<T>> invokers = list(invocation);
//進(jìn)行負(fù)載均衡,選擇真正調(diào)用的一個(gè)提供者讨惩,默認(rèn)使用random
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
//若是異步調(diào)用則記錄調(diào)用id辟癌,為了冪等
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//根據(jù)集群容錯(cuò)策略進(jìn)行實(shí)際調(diào)用根據(jù)荐捻,默認(rèn)failover
return doInvoke(invocation, invokers, loadbalance);
}
可以看到很多使用默認(rèn)值黍少,默認(rèn)實(shí)現(xiàn)是如何選擇的呢仍侥?可以看到是通過SPI機(jī)制指定缺省實(shí)現(xiàn)的
@SPI(Cluster.DEFAULT)
public interface Cluster {
String DEFAULT = FailoverCluster.NAME;
}
public class FailoverCluster extends AbstractCluster {
public final static String NAME = "failover";
}
集群容錯(cuò)
容錯(cuò)策略有很多,這里分析缺省實(shí)現(xiàn)Failover
Failover:當(dāng)調(diào)用失敗時(shí),記錄初始錯(cuò)誤并重試其他調(diào)用者(重試n次传于,這意味著最多將調(diào)用n個(gè)不同的調(diào)用者)請(qǐng)注意,重試會(huì)導(dǎo)致延遲沼溜。默認(rèn)延遲為5s系草。適合實(shí)時(shí)性不高的讀操作。
FailoverClusterInvoker#doInvoke也就是父類最后一步的具體調(diào)用實(shí)現(xiàn)唆涝。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
//校驗(yàn)invokers是否為空
checkInvokers(copyInvokers, invocation);
//獲取方法名
String methodName = RpcUtils.getMethodName(invocation);
//從url中獲取配置的重試次數(shù)并進(jìn)行+1找都,若配置重試次數(shù)為2,那么最多調(diào)用3次廊酣。
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
//防御性編程能耻,若配置的為負(fù)數(shù)或0也會(huì)進(jìn)行一次調(diào)用
if (len <= 0) {
len = 1;
}
// retry loop.
// last exception.
RpcException le = null;
//已經(jīng)調(diào)用過的會(huì)在后續(xù)負(fù)載均衡進(jìn)行過濾。
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
//調(diào)用過的服務(wù)進(jìn)行地址記錄
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
//校驗(yàn)是否被銷毀
checkWhetherDestroyed();
//列出提供者
copyInvokers = list(invocation);
// check again是否為空
checkInvokers(copyInvokers, invocation);
}
//進(jìn)行負(fù)載均衡選擇亡驰,如果選擇出來invoker已經(jīng)被調(diào)用過則reselect一把
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
//將調(diào)用過的放入list晓猛,日志記錄使用
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
//業(yè)務(wù)異常則拋出
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
//網(wǎng)絡(luò)等異常則進(jìn)行重試并記錄上一次的異常
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
整體流程
image