Dubbo中的Cluster可以將多個(gè)服務(wù)提供方偽裝成一個(gè)提供方,具體也就是將Directory中的多個(gè)Invoker偽裝成一個(gè)Invoker录粱,在偽裝的過程中包含了容錯(cuò)的處理腻格,負(fù)載均衡的處理和路由的處理。這篇文章介紹下集群相關(guān)的東西啥繁,開始先對(duì)著文檔解釋下容錯(cuò)模式菜职,負(fù)載均衡,路由等概念旗闽,然后解析下源碼的處理酬核。(稍微有點(diǎn)亂,心情不太好宪睹,不適合分析源碼。)
集群的容錯(cuò)模式
Failover Cluster
這是dubbo中默認(rèn)的集群容錯(cuò)模式
- 失敗自動(dòng)切換蚕钦,當(dāng)出現(xiàn)失敗亭病,重試其它服務(wù)器。
- 通常用于讀操作嘶居,但重試會(huì)帶來更長延遲罪帖。
- 可通過retries="2"來設(shè)置重試次數(shù)(不含第一次)促煮。
Failfast Cluster
- 快速失敗,只發(fā)起一次調(diào)用整袁,失敗立即報(bào)錯(cuò)菠齿。
- 通常用于非冪等性的寫操作,比如新增記錄坐昙。
Failsafe Cluster
- 失敗安全绳匀,出現(xiàn)異常時(shí),直接忽略炸客。
- 通常用于寫入審計(jì)日志等操作疾棵。
Failback Cluster
- 失敗自動(dòng)恢復(fù),后臺(tái)記錄失敗請(qǐng)求痹仙,定時(shí)重發(fā)是尔。
- 通常用于消息通知操作。
Forking Cluster
- 并行調(diào)用多個(gè)服務(wù)器开仰,只要一個(gè)成功即返回拟枚。
- 通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源众弓。
- 可通過forks="2"來設(shè)置最大并行數(shù)恩溅。
Broadcast Cluster
- 廣播調(diào)用所有提供者,逐個(gè)調(diào)用田轧,任意一臺(tái)報(bào)錯(cuò)則報(bào)錯(cuò)暴匠。(2.1.0開始支持)
- 通常用于通知所有提供者更新緩存或日志等本地資源信息。
負(fù)載均衡
dubbo默認(rèn)的負(fù)載均衡策略是random傻粘,隨機(jī)調(diào)用每窖。
Random LoadBalance
- 隨機(jī),按權(quán)重設(shè)置隨機(jī)概率弦悉。
- 在一個(gè)截面上碰撞的概率高窒典,但調(diào)用量越大分布越均勻,而且按概率使用權(quán)重后也比較均勻稽莉,有利于動(dòng)態(tài)調(diào)整提供者權(quán)重瀑志。
RoundRobin LoadBalance
- 輪循,按公約后的權(quán)重設(shè)置輪循比率污秆。
- 存在慢的提供者累積請(qǐng)求問題劈猪,比如:第二臺(tái)機(jī)器很慢,但沒掛良拼,當(dāng)請(qǐng)求調(diào)到第二臺(tái)時(shí)就卡在那战得,久而久之,所有請(qǐng)求都卡在調(diào)到第二臺(tái)上庸推。
LeastActive LoadBalance
- 最少活躍調(diào)用數(shù)常侦,相同活躍數(shù)的隨機(jī)浇冰,活躍數(shù)指調(diào)用前后計(jì)數(shù)差。
- 使慢的提供者收到更少請(qǐng)求聋亡,因?yàn)樵铰奶峁┱叩恼{(diào)用前后計(jì)數(shù)差會(huì)越大肘习。
ConsistentHash LoadBalance
- 一致性Hash,相同參數(shù)的請(qǐng)求總是發(fā)到同一提供者坡倔。
- 當(dāng)某一臺(tái)提供者掛時(shí)漂佩,原本發(fā)往該提供者的請(qǐng)求,基于虛擬節(jié)點(diǎn)致讥,平攤到其它提供者仅仆,不會(huì)引起劇烈變動(dòng)。
- 缺省只對(duì)第一個(gè)參數(shù)Hash垢袱。
- 缺省用160份虛擬節(jié)點(diǎn)墓拜。
集群相關(guān)源碼解析
回想一下在服務(wù)消費(fèi)者初始化的過程中,在引用遠(yuǎn)程服務(wù)的那一步请契,也就是RegistryProtocol的refer方法中咳榜,調(diào)用了doRefer方法,doRefer方法中第一個(gè)參數(shù)就是cluster爽锥,我們就從這里開始解析涌韩。RegistryProtocol的refer方法:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
//根據(jù)url獲取注冊(cè)中心實(shí)例
//這一步連接注冊(cè)中心,并把消費(fèi)者注冊(cè)到注冊(cè)中心
Registry registry = registryFactory.getRegistry(url);
//對(duì)注冊(cè)中心服務(wù)的處理
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
//以下是我們自己定義的業(yè)務(wù)的服務(wù)處理
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
//服務(wù)需要合并不同實(shí)現(xiàn)
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
//這里參數(shù)cluster是集群的適配類氯夷,代碼在下面
return doRefer(cluster, registry, type, url);
}
接著看doRefer臣樱,真正去做服務(wù)引用的方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//Directory中是Invoker的集合,相當(dāng)于一個(gè)List
//也就是說這里面存放了多個(gè)Invoker腮考,那么我們?cè)撜{(diào)用哪一個(gè)呢雇毫?
//該調(diào)用哪一個(gè)Invoker的工作就是Cluster來處理的
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//到注冊(cè)中心注冊(cè)服務(wù) registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//訂閱服務(wù),注冊(cè)中心會(huì)推送服務(wù)消息給消費(fèi)者踩蔚,消費(fèi)者會(huì)再次進(jìn)行服務(wù)的引用棚放。 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
//服務(wù)的引用和變更全部由Directory異步完成
//Directory中可能存在多個(gè)Invoker
//而Cluster會(huì)把多個(gè)Invoker偽裝成一個(gè)Invoker
//這一步就是做這個(gè)事情的
return cluster.join(directory);
}
集群處理的入口
入口就是在doRefer的時(shí)候最后一步:cluster.join(directory);
。
首先解釋下cluster馅闽,這個(gè)是根據(jù)dubbo的擴(kuò)展機(jī)制生成的飘蚯,在RegistryProtocol中有一個(gè)setCluster方法,根據(jù)擴(kuò)展機(jī)制可以知道福也,這是注入Cluster的地方局骤,代碼如下:
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster {
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
可以看到,如果我們沒有配置集群策略的話暴凑,默認(rèn)是用failover模式峦甩,在Cluster接口的注解上@SPI(FailoverCluster.NAME)
也可以看到默認(rèn)是failover。
繼續(xù)執(zhí)行cluster.join方法搬设,會(huì)首先進(jìn)入MockClusterWrapper的join方法:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
//先執(zhí)行FailoverCluster的join方法處理
//然后將Directory和返回的Invoker封裝成一個(gè)MockCluster
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
看下Failover的join方法:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
//直接返回一個(gè)FailoverClusterInvoker的實(shí)例
return new FailoverClusterInvoker<T>(directory);
}
到這里就算把Invoker都封裝好了穴店,返回的Invoker是一個(gè)MockClusterInvoker,MockClusterInvoker內(nèi)部包含一個(gè)Directory和一個(gè)FailoverClusterInvoker拿穴。
Invoker都封裝好了之后泣洞,就是創(chuàng)建代理,然后使用代理調(diào)用我們的要調(diào)用的方法默色。
調(diào)用方法時(shí)集群的處理
在進(jìn)行具體方法調(diào)用的時(shí)候球凰,代理中會(huì)invoker.invoke()
,這里Invoker就是我們上面封裝好的MockClusterInvoker腿宰,所以首先進(jìn)入MockClusterInvoker的invoke方法:
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//我們沒配置mock呕诉,所以這里為false
//Mock通常用于服務(wù)降級(jí)
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
//沒有使用mock
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//這里的invoker是FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
//mock=force:return+null
//表示消費(fèi)方對(duì)方法的調(diào)用都直接返回null,不發(fā)起遠(yuǎn)程調(diào)用
//可用于屏蔽不重要服務(wù)不可用的時(shí)候吃度,對(duì)調(diào)用方的影響
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//mock=fail:return+null
//表示消費(fèi)方對(duì)該服務(wù)的方法調(diào)用失敗后甩挫,再返回null,不拋異常
//可用于對(duì)不重要服務(wù)不穩(wěn)定的時(shí)候椿每,忽略對(duì)調(diào)用方的影響
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
我們這里么有配置mock屬性伊者。首先進(jìn)入的是AbstractClusterInvoker的incoke方法:
public Result invoke(final Invocation invocation) throws RpcException {
//檢查是否已經(jīng)被銷毀
checkWheatherDestoried();
//可以看到這里該處理負(fù)載均衡的問題了
LoadBalance loadbalance;
//根據(jù)invocation中的信息從Directory中獲取Invoker列表
//這一步中會(huì)進(jìn)行路由的處理
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
//使用擴(kuò)展機(jī)制,加載LoadBalance的實(shí)現(xiàn)類间护,默認(rèn)使用的是random
//我們這里得到的就是RandomLoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
//異步操作默認(rèn)添加invocation id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//調(diào)用具體的實(shí)現(xiàn)類的doInvoke方法亦渗,這里是FailoverClusterInvoker
return doInvoke(invocation, invokers, loadbalance);
}
看下FailoverClusterInvoker的invoke方法:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//Invoker列表
List<Invoker<T>> copyinvokers = invokers;
//確認(rèn)下Invoker列表不為空
checkInvokers(copyinvokers, invocation);
//重試次數(shù)
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重試時(shí),進(jìn)行重新選擇汁尺,避免重試時(shí)invoker列表已發(fā)生變化.
//注意:如果列表發(fā)生了變化法精,那么invoked判斷會(huì)失效,因?yàn)閕nvoker示例已經(jīng)改變
if (i > 0) {
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新檢查一下
checkInvokers(copyinvokers, invocation);
}
//使用loadBalance選擇一個(gè)Invoker返回
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List)invoked);
try {
//使用選擇的結(jié)果Invoker進(jìn)行調(diào)用痴突,返回結(jié)果
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {搂蜓。。苞也。} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(洛勉。。如迟。);
}
先看下使用loadbalance選擇invoker的select方法:
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
//sticky收毫,滯連接用于有狀態(tài)服務(wù),盡可能讓客戶端總是向同一提供者發(fā)起調(diào)用殷勘,除非該提供者掛了此再,再連另一臺(tái)。
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;
{
//ignore overloaded method
if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
stickyInvoker = null;
}
//ignore cucurrent problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){
if (availablecheck && stickyInvoker.isAvailable()){
return stickyInvoker;
}
}
}
Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
if (sticky){
stickyInvoker = invoker;
}
return invoker;
}
doselect方法:
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
//只有一個(gè)invoker玲销,直接返回输拇,不需要處理
if (invokers.size() == 1)
return invokers.get(0);
// 如果只有兩個(gè)invoker,退化成輪循
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
//使用loadBalance進(jìn)行選擇
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果 selected中包含(優(yōu)先判斷) 或者 不可用&&availablecheck=true 則重試.
if( (selected != null && selected.contains(invoker))
||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
try{
//重新選擇
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if(rinvoker != null){
invoker = rinvoker;
}else{
//看下第一次選的位置贤斜,如果不是最后策吠,選+1位置.
int index = invokers.indexOf(invoker);
try{
//最后在避免碰撞
invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
}catch (Exception e) {逛裤。。猴抹。 }
}
}catch (Throwable t){带族。。蟀给。}
}
return invoker;
}
接著看使用loadBalance進(jìn)行選擇蝙砌,首先進(jìn)入AbstractLoadBalance的select方法:
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 進(jìn)行選擇,具體的子類實(shí)現(xiàn)跋理,我們這里是RandomLoadBalance
return doSelect(invokers, url, invocation);
}
接著去RandomLoadBalance中查看:
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 總個(gè)數(shù)
int totalWeight = 0; // 總權(quán)重
boolean sameWeight = true; // 權(quán)重是否都一樣
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累計(jì)總權(quán)重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 計(jì)算所有權(quán)重是否一樣
}
}
if (totalWeight > 0 && ! sameWeight) {
// 如果權(quán)重不相同且權(quán)重大于0則按總權(quán)重?cái)?shù)隨機(jī)
int offset = random.nextInt(totalWeight);
// 并確定隨機(jī)值落在哪個(gè)片斷上
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果權(quán)重相同或權(quán)重為0則均等隨機(jī)
return invokers.get(random.nextInt(length));
}
上面根據(jù)權(quán)重之類的來進(jìn)行選擇一個(gè)Invoker返回择克。接下來reselect的方法不在說明,是先從非selected的列表中選擇前普,沒有在從selected列表中選擇肚邢。
選擇好了Invoker之后,就回去FailoverClusterInvoker的doInvoke方法拭卿,接著就是根據(jù)選中的Invoker調(diào)用invoke方法進(jìn)行返回結(jié)果道偷,接著就是到具體的Invoker進(jìn)行調(diào)用的過程了。這部分的解析在消費(fèi)者和提供者請(qǐng)求響應(yīng)過程已經(jīng)解析過了记劈,不再重復(fù)勺鸦。
路由
回到AbstractClusterInvoker的invoke方法中,這里有一步是List<Invoker<T>> invokers = list(invocation);
獲取Invoker列表目木,這里同時(shí)也進(jìn)行了路由的操作换途,看下list方法:
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
接著看AbstractDirectory的list方法:
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed){
throw new RpcException("Directory already destroyed .url: "+ getUrl());
}
//RegistryDirectory中的doList實(shí)現(xiàn)
List<Invoker<T>> invokers = doList(invocation);
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router: localRouters){
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
//路由選擇
//MockInvokersSelector中
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {。刽射。军拟。}
}
}
return invokers;
}
路由來過濾之后,進(jìn)行負(fù)載均衡的處理誓禁。