前言
在上一章節(jié)侈玄,我們?cè)岬竭@樣一個(gè)問(wèn)題:
當(dāng)調(diào)用服務(wù)失敗后古掏,我們?cè)趺刺幚懋?dāng)前的請(qǐng)求宜肉?拋出異常亦或是重試匀钧?
為了解決這個(gè)問(wèn)題,Dubbo 定義了集群接口 Cluster 以及 Cluster Invoker谬返。集群 Cluster 用途是將多個(gè)服務(wù)提供者合并為一個(gè) Cluster Invoker之斯,并將這個(gè) Invoker 暴露給服務(wù)消費(fèi)者。這樣一來(lái)遣铝,服務(wù)消費(fèi)者只需通過(guò)這個(gè) Invoker 進(jìn)行遠(yuǎn)程調(diào)用即可佑刷,至于具體調(diào)用哪個(gè)服務(wù)提供者,以及調(diào)用失敗后如何處理等問(wèn)題酿炸,現(xiàn)在都交給集群模塊去處理瘫絮。
一、合并
在服務(wù)引用的過(guò)程中填硕,我們最終會(huì)將一個(gè)或多個(gè)服務(wù)提供者Invoker封裝成服務(wù)目錄對(duì)象麦萤,但最后還要將它合并轉(zhuǎn)換成Cluster Invoker對(duì)象鹿鳖。
Invoker invoker = cluster.join(directory);
這里的cluster就是擴(kuò)展點(diǎn)自適應(yīng)類,在Dubbo中默認(rèn)是Failover壮莹,所以上面代碼會(huì)調(diào)用到:
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
上面的代碼很簡(jiǎn)單翅帜,所以最后的Invoker對(duì)象指向的是FailoverClusterInvoker
實(shí)例。它也是一個(gè)Invoker命满,它繼承了抽象的AbstractClusterInvoker
涝滴。
我們看下AbstractClusterInvoker
類中的invoke方法。
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
LoadBalance loadbalance = null;
//調(diào)用服務(wù)目錄胶台,獲取所有的服務(wù)提供者Invoker對(duì)象
List<Invoker<T>> invokers = directory.list(invocation);
if (invokers != null && !invokers.isEmpty()) {
//加載負(fù)載均衡組件
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).
getExtension(invokers.get(0).getUrl().
getMethodParameter(invocation.getMethodName(), "loadbalance", "random"));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//調(diào)用子類實(shí)現(xiàn) 歼疮,不同的集群容錯(cuò)機(jī)制
return doInvoke(invocation, invokers, loadbalance);
}
}
以上代碼也很簡(jiǎn)單,我們分為三個(gè)步驟來(lái)看
- 調(diào)用服務(wù)目錄概作,獲取所有的服務(wù)提供者列表
- 加載負(fù)載均衡組件
- 調(diào)用子類實(shí)現(xiàn)腋妙,轉(zhuǎn)發(fā)請(qǐng)求
關(guān)于負(fù)載均衡我們后續(xù)再深入了解,這是只知道它負(fù)責(zé)從多個(gè)Invoker中選取一個(gè)返回就行讯榕。
二骤素、集群容錯(cuò)策略
Dubbo為我們提供了多種集群容錯(cuò)機(jī)制。主要如下:
- Failover Cluster - 失敗自動(dòng)切換
FailoverClusterInvoker在調(diào)用失敗時(shí)愚屁,會(huì)自動(dòng)切換 Invoker 進(jìn)行重試济竹。默認(rèn)配置下,Dubbo 會(huì)使用這個(gè)類作為缺省 Cluster Invoker霎槐。
- Failfast Cluster - 快速失敗
FailfastClusterInvoker 只會(huì)進(jìn)行一次調(diào)用送浊,失敗后立即拋出異常。
- Failsafe Cluster - 失敗安全
FailsafeClusterInvoker 當(dāng)調(diào)用過(guò)程中出現(xiàn)異常時(shí)丘跌,僅會(huì)打印異常袭景,而不會(huì)拋出異常。
- Failback Cluster - 失敗自動(dòng)恢復(fù)
FailbackClusterInvoker 會(huì)在調(diào)用失敗后闭树,返回一個(gè)空結(jié)果給服務(wù)提供者耸棒。并通過(guò)定時(shí)任務(wù)對(duì)失敗的調(diào)用進(jìn)行重傳,適合執(zhí)行消息通知等操作报辱。
- Forking Cluster - 并行調(diào)用多個(gè)服務(wù)提供者
ForkingClusterInvoker 會(huì)在運(yùn)行時(shí)通過(guò)線程池創(chuàng)建多個(gè)線程与殃,并發(fā)調(diào)用多個(gè)服務(wù)提供者。只要有一個(gè)服務(wù)提供者成功返回了結(jié)果碍现,doInvoke 方法就會(huì)立即結(jié)束運(yùn)行幅疼。ForkingClusterInvoker 的應(yīng)用場(chǎng)景是在一些對(duì)實(shí)時(shí)性要求比較高讀操作(注意是讀操作,并行寫操作可能不安全)下使用昼接,但這將會(huì)耗費(fèi)更多的資源爽篷。
- BroadcastClusterInvoker - 廣播
BroadcastClusterInvoker 會(huì)逐個(gè)調(diào)用每個(gè)服務(wù)提供者,如果其中一臺(tái)報(bào)錯(cuò)慢睡,在循環(huán)調(diào)用結(jié)束后狼忱,BroadcastClusterInvoker 會(huì)拋出異常膨疏。該類通常用于通知所有提供者更新緩存或日志等本地資源信息。
三钻弄、自動(dòng)切換
FailoverClusterInvoker 在調(diào)用失敗時(shí),會(huì)自動(dòng)切換 Invoker 進(jìn)行重試者吁。我們重點(diǎn)看它的doInvoke
方法窘俺。
public Result doInvoke(Invocation invocation,
final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
//檢查invokers是否為空
checkInvokers(copyinvokers, invocation);
//獲取重試次數(shù) 這里默認(rèn)是3次
int len = getUrl().getMethodParameter(invocation.getMethodName(), "retries",2) + 1;
if (len <= 0) {
len = 1;
}
//異常信息對(duì)象
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
Set<String> providers = new HashSet<String>(len);
//循環(huán)調(diào)用 失敗重試len次
for (int i = 0; i < len; i++) {
if (i > 0) {
checkWhetherDestroyed();
//重新獲取服務(wù)提供者列表
copyinvokers = list(invocation);
//再次檢查
checkInvokers(copyinvokers, invocation);
}
//通過(guò)loadbalance選取一個(gè)Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//調(diào)用服務(wù)
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("");
}
return result;
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
//重試失敗
throw new RpcException("");
}
我們可以看到,它的重點(diǎn)是invoker的調(diào)用是在一個(gè)循環(huán)方法中复凳。只要不return瘤泪,就會(huì)一直調(diào)用,重試 len 次育八。我們總結(jié)下它的過(guò)程:
- 檢查invokers是否為空
- 獲取重試次數(shù)对途,默認(rèn)為3
- 進(jìn)入循環(huán)
- 如果是重試,再次獲取服務(wù)提供者列表髓棋,并校驗(yàn)
- 選取Invoker实檀,并調(diào)用
- 無(wú)異常,返回結(jié)果按声,循環(huán)結(jié)束
- 捕獲到異常膳犹,繼續(xù)循環(huán)調(diào)用直至重試最大次數(shù)
四、快速失敗
FailfastClusterInvoker就很簡(jiǎn)單了签则,它只會(huì)進(jìn)行一次調(diào)用须床,失敗后立即拋出異常。
public Result doInvoke(Invocation invocation,
List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException) e).isBiz()) {
throw (RpcException) e;
}
throw new RpcException("....");
}
}
五渐裂、失敗安全
FailsafeClusterInvoker跟上面這個(gè)差異不大豺旬,它調(diào)用失敗后并不拋出異常。而是打印異常信息并返回一個(gè)空的結(jié)果對(duì)象柒凉。
public Result doInvoke(Invocation invocation,
List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult();
}
}
六族阅、自動(dòng)恢復(fù)
FailbackClusterInvoker 會(huì)在調(diào)用失敗后,也是打印異常信息并返回一個(gè)空的結(jié)果對(duì)象扛拨,但是還沒(méi)結(jié)束耘分,它還會(huì)偷偷開(kāi)啟一個(gè)定時(shí)任務(wù),再次去調(diào)用绑警。
protected Result doInvoke(Invocation invocation,
List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ",
wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
//添加失敗信息
addFailed(invocation, this);
return new RpcResult();
}
}
我們可以看到求泰,調(diào)用失敗后,除了打印異常信息和返回空結(jié)果對(duì)象之外计盒,還有一個(gè)方法addFailed
它就是開(kāi)啟定時(shí)任務(wù)的地方渴频。
1、開(kāi)啟定時(shí)任務(wù)
首先北启,定義一個(gè)包含2個(gè)線程的線程池對(duì)象卜朗。
Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));
然后拔第,延遲5秒后,每隔5秒調(diào)用retryFailed
方法场钉,直到調(diào)用成功蚊俺。
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
//重試方法
retryFailed();
} catch (Throwable t) {
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, 5000, 5000, TimeUnit.MILLISECONDS);
}
}
}
//ConcurrentHashMap 添加失敗任務(wù)
failed.put(invocation, router);
}
最后,我們需要注意failed.put(invocation, router);
它將當(dāng)前失敗的任務(wù)添加到failed逛万,它是一個(gè)ConcurrentHashMap對(duì)象泳猬。
2、重試
重試的邏輯也不復(fù)雜宇植,從failed對(duì)象中獲取失敗的記錄得封,調(diào)用即可。
void retryFailed() {
//如果為空指郁,說(shuō)明已經(jīng)沒(méi)有了失敗的任務(wù)
if (failed.size() == 0) {
return;
}
//遍歷failed忙上,對(duì)失敗的調(diào)用進(jìn)行重試
Set<Entry<Invocation, AbstractClusterInvoker<?>>> failedSet = failed.entrySet();
for (Entry<Invocation, AbstractClusterInvoker<?>> entry : failedSet) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
// 再次進(jìn)行調(diào)用
invoker.invoke(invocation);
// 調(diào)用成功后,從 failed 中移除 invoker
failed.remove(invocation);
} catch (Throwable e) {
logger.error("......", e);
}
}
}
如上代碼闲坎,其中的重點(diǎn)是調(diào)用成功后疫粥,要將invocation移除。當(dāng)再次調(diào)用到這個(gè)方法箫柳,開(kāi)頭的條件判斷成立手形,就直接返回,不再繼續(xù)調(diào)用悯恍。
3库糠、問(wèn)題
實(shí)際上,這套自動(dòng)恢復(fù)的機(jī)制是有點(diǎn)小問(wèn)題的涮毫。只要有一次調(diào)用失敗瞬欧,就會(huì)開(kāi)啟定時(shí)任務(wù)不斷重試調(diào)用,直至成功罢防。但問(wèn)題是艘虎,即便重試調(diào)用成功后,定時(shí)任務(wù)并不會(huì)關(guān)閉咒吐,會(huì)持續(xù)的調(diào)用retryFailed
方法野建。雖然這個(gè)方法有個(gè)判斷,會(huì)直接返回恬叹。
如果服務(wù)調(diào)用失敗次數(shù)多了之后候生,就會(huì)有大量的線程以5s的間隔,不斷調(diào)用這個(gè)方法绽昼。
這句話不嚴(yán)謹(jǐn)唯鸭。當(dāng)時(shí)筆者是新開(kāi)的消費(fèi)者端項(xiàng)目,才看到有大量的新建線程硅确;但如果是同一個(gè)服務(wù)中目溉,自始至終就是一開(kāi)始創(chuàng)建的2個(gè)線程在運(yùn)行明肮。不過(guò)空跑的情況依然存在。
筆者建議缭付,如果有此類需求柿估,不要直接用Dubbo中的這個(gè)Cluster。最好利用SPI機(jī)制重寫一個(gè)方法來(lái)實(shí)現(xiàn)蛉腌。
七官份、并行調(diào)用
ForkingClusterInvoker 會(huì)在運(yùn)行時(shí)通過(guò)線程池創(chuàng)建多個(gè)線程,并發(fā)調(diào)用多個(gè)服務(wù)提供者烙丛。只要有一個(gè)服務(wù)提供者成功返回了結(jié)果,doInvoke 方法就會(huì)立即結(jié)束運(yùn)行羔味。
public Result doInvoke(final Invocation invocation,
List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
final List<Invoker<T>> selected;
//獲取最大并行數(shù) 默認(rèn)為2
final int forks = getUrl().getParameter("forks", 2);
//超時(shí)時(shí)間
final int timeout = getUrl().getParameter("timeout", 1000);
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
selected = new ArrayList<Invoker<T>>();
//選擇Invoker 并添加到selected
for (int i = 0; i < forks; i++) {
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List) selected);
final AtomicInteger count = new AtomicInteger();
//阻塞隊(duì)列 先進(jìn)先出
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
//調(diào)用服務(wù) 將結(jié)果放入隊(duì)列
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch (Throwable e) {
//如果異常調(diào)用次數(shù)大于等于最大并行數(shù)
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}
try {
//從隊(duì)列中獲取結(jié)果
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {
Throwable e = (Throwable) ret;
throw new RpcException("....");
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException(e.getMessage(), e);
}
}
以上代碼的重點(diǎn)就是阻塞隊(duì)列LinkedBlockingQueue河咽。如果有結(jié)果放入,poll方法會(huì)立即返回赋元,完成整個(gè)調(diào)用忘蟹。我們?cè)倏偨Y(jié)下整體流程:
- 獲取最大并行數(shù),默認(rèn)為2搁凸;獲取超時(shí)時(shí)間
- 選擇Invoker媚值,并添加到selected
- 通過(guò)newCachedThreadPool創(chuàng)建多個(gè)線程,調(diào)用服務(wù)护糖。
- 正常返回后褥芒,將結(jié)果offer到隊(duì)列。此時(shí)調(diào)用流程結(jié)束嫡良,返回正常信息锰扶。
- 調(diào)用服務(wù)異常后,判斷異常次數(shù)是否大于等于最大并行數(shù)寝受,條件成立則將異常信息offer到隊(duì)列坷牛,此時(shí)調(diào)用流程結(jié)束,返回異常信息很澄。
八京闰、廣播
BroadcastClusterInvoker 會(huì)逐個(gè)調(diào)用每個(gè)服務(wù)提供者,如果其中一臺(tái)報(bào)錯(cuò)甩苛,在循環(huán)調(diào)用結(jié)束后蹂楣,BroadcastClusterInvoker 會(huì)拋出異常。
public Result doInvoke(final Invocation invocation,
List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List) invokers);
RpcException exception = null;
Result result = null;
//循環(huán)調(diào)用服務(wù)
for (Invoker<T> invoker : invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
//異常
if (exception != null) {
throw exception;
}
return result;
}