先看下Dubbo官方的一張圖
Cluster是容錯(cuò)的核心,官方的說(shuō)法是
Cluster 將 Directory 中的多個(gè) Invoker 偽裝成一個(gè) Invoker囤攀,對(duì)上層透明品洛,偽裝過(guò)程包含了容錯(cuò)邏輯裁赠,調(diào)用失敗后问窃,重試另一個(gè)
即Cluster是對(duì)外暴露的一個(gè)接口,內(nèi)部返回一個(gè)集群版的Invoker傻寂,通過(guò)不同的容錯(cuò)策略息尺,對(duì)從Directory中獲取的invoker有不同的調(diào)用方式
下面看下代碼實(shí)現(xiàn)
AvailableCluster
public class AvailableCluster implements Cluster {
public static final String NAME = "available";
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new AbstractClusterInvoker<T>(directory) {
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
throw new RpcException("No provider available in " + invokers);
}
};
}
}
這個(gè)策略很簡(jiǎn)單,就是從List<Invoker<T>>中獲取一個(gè)可用的Invoker來(lái)進(jìn)行調(diào)用
BroadcastCluster
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
RpcContext.getContext().setInvokers((List)invokers);
RpcException exception = null;
Result result = null;
for (Invoker<T> invoker: invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
} catch (Throwable e) {
exception = e;
}
}
if (exception != null) {
throw exception;
}
return result;
}
}
看名字就知道大概的意思疾掰,廣播調(diào)用搂誉。看代碼也很簡(jiǎn)單静檬,遍歷List<Invoker<T>>進(jìn)行調(diào)用炭懊,如果有一個(gè)出現(xiàn)異常則拋出異常
FailbackCluster
public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
public FailbackClusterInvoker(Directory<T> directory){
super(directory);
}
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
public void run() {
// 收集統(tǒng)計(jì)信息
try {
retryFailed();
} catch (Throwable t) { // 防御性容錯(cuò)
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
failed.put(invocation, router);
}
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
addFailed(invocation, this);
return new RpcResult(); // ignore
}
}
}
到這里就和上面的幾種方式不一樣了,上面的都是直接對(duì)List<Invoker<T>>進(jìn)行操作拂檩,而這里首先需要通過(guò)負(fù)載均衡策略獲取到一個(gè)Invoker侮腹,然后才進(jìn)行調(diào)用,這個(gè)方法是普遍的稻励,那么出錯(cuò)怎么處理才是核心父阻。
這里出錯(cuò)的時(shí)候,會(huì)調(diào)用addFailed方法
首先addFailed方法望抽,使用了double-check的方式來(lái)初始化retryFuture加矛,保證其是單例的。如果是已經(jīng)初始化的煤篙,直接放入map中等待定時(shí)重試斟览,如何重試那么要看retryFailed方法
void retryFailed() {
if (failed.size() == 0) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(
failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
invoker.invoke(invocation);
failed.remove(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
}
}
}
重試邏輯也很簡(jiǎn)單,從map中拿出失敗的Invoker進(jìn)行調(diào)用舰蟆,成功則從failed中移除
FailfastCluster
public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T>{
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
}
這個(gè)策略也很簡(jiǎn)單趣惠,從名字也可以猜出是什么功能:快速失敗。
從代碼中看身害,選出一個(gè)Invoker進(jìn)行調(diào)用味悄,如果失敗,那么不重試
FailoverCluster
類上的注釋:
失敗轉(zhuǎn)移塌鸯,當(dāng)出現(xiàn)失敗侍瑟,重試其它服務(wù)器,通常用于讀操作丙猬,但重試會(huì)帶來(lái)更長(zhǎng)延遲涨颜。
看到注釋,有幾個(gè)問(wèn)題:
- 重試其他服務(wù)器茧球,這個(gè)其他服務(wù)器是隨機(jī)挑選的嗎庭瑰?
- 如果所有服務(wù)器都失敗,還會(huì)繼續(xù)重試嗎抢埋?
- 會(huì)重試幾次呢弹灭?
我們看下代碼具體是怎么實(shí)現(xiàn)的:
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // 已經(jīng)調(diào)用過(guò)的Invoker集合.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重試時(shí),進(jìn)行重新選擇揪垄,避免重試時(shí)invoker列表已發(fā)生變化.
//注意:如果列表發(fā)生了變化穷吮,那么invoked判斷會(huì)失效,因?yàn)閕nvoker示例已經(jīng)改變
if (i > 0) {
//進(jìn)行到這里饥努,證明第一次已經(jīng)失敗
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新檢查一下
checkInvokers(copyinvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List)invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(...);
}
}
首先捡鱼,從url中獲取重試次數(shù),在這個(gè)基礎(chǔ)上+1酷愧,進(jìn)行l(wèi)en次調(diào)用驾诈。
調(diào)用的過(guò)程和其他策略,都是先使用LoadBalance策略選出一個(gè)Invoker進(jìn)行調(diào)用伟墙,但是有沒(méi)有注意到翘鸭,select方法在其他策略里傳入的是null,這里傳入的是List戳葵,這會(huì)導(dǎo)致什么不同的結(jié)果就乓,那么需要看下select的實(shí)現(xiàn)
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
{
//ignore overloaded method
if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
stickyInvoker = null;
}
//ignore cucurrent problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){
if (availablecheck && stickyInvoker.isAvailable()){
return stickyInvoker;
}
}
}
Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
if (sticky){
stickyInvoker = invoker;
}
return invoker;
}
select不是核心所在,而是sticky這個(gè)參數(shù)的實(shí)現(xiàn)拱烁,有興趣的可以研究一下生蚁,我們主要看核心方法doselect
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)// 如果只有一個(gè)invoker,那么沒(méi)法負(fù)載均衡了戏自,只能選這個(gè)了
return invokers.get(0);
// 如果只有兩個(gè)invoker邦投,退化成輪循
// 如果有兩個(gè)Invoker,那么不需要進(jìn)行復(fù)雜的負(fù)載均衡計(jì)算擅笔,這個(gè)不行就選另外一個(gè)
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
// 使用LB策略選出一個(gè)Invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果 selected中包含(優(yōu)先判斷) 或者 不可用&&availablecheck=true 則重試.
if( (selected != null && selected.contains(invoker))
||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
try{
// 重新選擇
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if(rinvoker != null){
invoker = rinvoker;
}else{
//看下第一次選的位置志衣,如果不是最后屯援,選+1位置.
int index = invokers.indexOf(invoker);
try{
//最后在避免碰撞
invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
}catch (Exception e) {
logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
}
}
}catch (Throwable t){
logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
}
}
return invoker;
}
通過(guò)LB選出一個(gè)Invoker之后,會(huì)判斷改Invoker是否符合條件念脯,不符合條件會(huì)進(jìn)行重新選擇狞洋,如果選出的不為空那么直接返回,如果重新選擇之后還是返回null绿店,那么如果原來(lái)選中的這個(gè)Invoker是List<Invoker<T>>最后一個(gè)元素吉懊,那么還是使用這個(gè)Invoker,否則在該Invoker后找一個(gè)Invoker(這里為什么是List<Invoker<T>>最后一個(gè)元素那么還是選回原來(lái)的Invoker假勿,而不是另外的元素借嗽,例如第一個(gè),或者)转培,是看下reselect的實(shí)現(xiàn)
private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)
throws RpcException {
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());
//先從非select中選
if( availablecheck ){ //選isAvailable 的非select
for(Invoker<T> invoker : invokers){
if(invoker.isAvailable()){
if(selected ==null || !selected.contains(invoker)){
reselectInvokers.add(invoker);
}
}
}
if(reselectInvokers.size()>0){
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}else{ //選全部非select
for(Invoker<T> invoker : invokers){
if(selected ==null || !selected.contains(invoker)){
reselectInvokers.add(invoker);
}
}
if(reselectInvokers.size()>0){
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
//到了這里恶导,證明 reselectInvokers為空,那么從已選的列表中選擇
{
if(selected != null){
for(Invoker<T> invoker : selected){
if((invoker.isAvailable()) //優(yōu)先選available
&& !reselectInvokers.contains(invoker)){
reselectInvokers.add(invoker);
}
}
}
if(reselectInvokers.size()>0){
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}
總的分3個(gè)分支
- 選擇不在selected中的Invoker
- 選擇不在selected中的Invoker且可用的invoker
- 上面選擇出來(lái)的invokers為空浸须,那么從已選的列表中選擇
如果上面3種情況都不能返回一個(gè)invoker甲锡,那么才會(huì)執(zhí)行這部操作
invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
那么到這里,整個(gè)選擇流程就結(jié)束了羽戒,上面幾個(gè)問(wèn)題也有了答案
- 重試其他的服務(wù)器缤沦,這個(gè)不是隨機(jī)挑選的,需要根據(jù)LB策略易稠,提供方是否可用等策略進(jìn)行判斷缸废,重試過(guò)的在下一次重試的時(shí)候,基本不會(huì)從這個(gè)重試過(guò)的服務(wù)器選擇(看到代碼中驶社,某些情況還是會(huì)選擇已經(jīng)選擇過(guò)的)
- 重試有一定的次數(shù)企量,次數(shù)為retries參數(shù)+1,如果都失敗了亡电,那么只能拋出異常了
- 重試有一定的次數(shù)届巩,次數(shù)為retries參數(shù)+1
另外,重試的時(shí)候有個(gè)注意點(diǎn)
if (i > 0) { //進(jìn)行到這里份乒,證明第一次已經(jīng)失敗 checkWheatherDestoried(); copyinvokers = list(invocation); checkInvokers(copyinvokers, invocation); }
由于invoker會(huì)動(dòng)態(tài)變化(例如某個(gè)服務(wù)器掛了恕汇,或者某個(gè)服務(wù)器被禁用了,甚至整個(gè)Dubbo實(shí)例已經(jīng)關(guān)閉了)或辖,而在一開始的時(shí)候拿到的invokers是當(dāng)時(shí)的可用的invoker列表瘾英,所以可能存在某個(gè)invoker已經(jīng)不可用了,那么需要檢查一下颂暇,然后從Directory中獲取最新的Invoker列表
FailsafeCluster
public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T>{
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
}
}
}
這個(gè)比較簡(jiǎn)單缺谴,失敗了記錄日志....
ForkingCluster
這也是一種比較有意思的容錯(cuò)策略,先看下官方描述
并行調(diào)用耳鸯,只要一個(gè)成功即返回湿蛔,通常用于實(shí)時(shí)性要求較高的操作膀曾,但需要浪費(fèi)更多服務(wù)資源
看了描述,有幾個(gè)問(wèn)題
- 并行調(diào)用是把所有invoker拿來(lái)并行調(diào)用嗎阳啥?
- 如何進(jìn)行并行調(diào)用妓肢?
- 有一個(gè)成功即返回,這個(gè)怎么做苫纤?
- 如果有失敗的怎么處理?
帶著問(wèn)題去看源碼
public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T>{
private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);
final List<Invoker<T>> selected;
final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
//如果并行數(shù)大于invoker的數(shù)量或小于0纲缓,那么直接用并行調(diào)用所有invoker
// 因?yàn)槿绻⑿袨?卷拘,而invoker數(shù)量為3,那么其實(shí)最大并行量也只有3
if (forks <= 0 || forks >= invokers.size()) {
selected = invokers;
} else {
// 如果并行數(shù)大于invoker的數(shù)量祝高,那么就需要挑選invoker了栗弟,例如并行為4,invoker數(shù)量為5工闺,那么從5個(gè)挑4個(gè)進(jìn)行并行調(diào)用
// 這里和failover類似乍赫,通過(guò)select選出一個(gè)invoker,然后放到selected里陆蟆,保證不重復(fù)
selected = new ArrayList<Invoker<T>>();//并行調(diào)用的invoker集合
for (int i = 0; i < forks; i++) {
//在invoker列表(排除selected)后,如果沒(méi)有選夠,則存在重復(fù)循環(huán)問(wèn)題.見(jiàn)select實(shí)現(xiàn).
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if(!selected.contains(invoker)){//防止重復(fù)添加invoker
selected.add(invoker);
}
}
}
RpcContext.getContext().setInvokers((List)selected);
// 記錄失敗次數(shù)
final AtomicInteger count = new AtomicInteger();
//返回結(jié)果放到隊(duì)列中
final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
// 每個(gè)invoker調(diào)用都使用一個(gè)線程調(diào)用
for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch(Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {//當(dāng)所有的invoker都調(diào)用失敗則把異常加入到隊(duì)列中
ref.offer(e);
}
}
}
});
}
try {
// 從隊(duì)列中獲取結(jié)果雷厂,該過(guò)程阻塞
Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
if (ret instanceof Throwable) {// 當(dāng)所有
Throwable e = (Throwable) ret;
throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
}
return (Result) ret;
} catch (InterruptedException e) {
throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
}
}
}
看到源碼,答案就出來(lái)了
- 如果并行數(shù)大于invoker數(shù)或者小于0叠殷,那么拿全部invoker進(jìn)行調(diào)用
- 每個(gè)invoker使用一個(gè)線程執(zhí)行
- 每個(gè)invoker調(diào)用后改鲫,會(huì)返回結(jié)果,結(jié)果會(huì)放到隊(duì)列中林束,主線程會(huì)使用poll從隊(duì)列獲取值像棘,只要有一個(gè)線程從invoker中獲取到數(shù)據(jù),那么就返回結(jié)果
- 當(dāng)所有的invoker都失敗了壶冒,那么隊(duì)列就放的是異常缕题,而不是結(jié)果,主線程poll會(huì)判斷該返回值胖腾,如果是異常則拋出