image.png
- LoadBalance:負(fù)載均衡 SPI 接口箍邮;
- AbstractLoadBalance:負(fù)載均衡
模板基類
茉帅;提供了 “獲取一個(gè) Invoker(filtered) 的權(quán)重” 的方式:
- 獲取當(dāng)前Invoker設(shè)置的權(quán)重weight和預(yù)熱時(shí)間warmup,并且計(jì)算啟動(dòng)至今時(shí)間uptime
- 如果uptime<warmup锭弊,則重新計(jì)算當(dāng)前Invoker的weight(uptime/warmup*weight)堪澎,否則直接返回設(shè)置的weight。
eg. 假設(shè)設(shè)置的權(quán)重是 100味滞, 預(yù)熱時(shí)間 10min,
- 第一分鐘的時(shí)候:權(quán)重變?yōu)?(1/10)*100=10, 也就是承擔(dān) 10/100 = 10% 的流量樱蛤;
- 第二分鐘的時(shí)候:權(quán)重變?yōu)?(2/10)*100=20, 也就是承擔(dān) 20/100 = 20% 的流量;
- 第十分鐘的時(shí)候:權(quán)重變?yōu)?(10/10)*100=100, 也就是承擔(dān) 100/100 = 100% 的流量剑鞍;
- 超過十分鐘之后(即 uptime>warmup昨凡,表示預(yù)熱期過了,則直接返回 weight=100蚁署,不再計(jì)算)
- RandomLoadBalance(
默認(rèn)
):帶權(quán)重的隨機(jī)負(fù)載均衡器便脊;
- 通過計(jì)算每一個(gè) Invoker 的權(quán)重來計(jì)算總權(quán)重 totalWeight,并判斷是否所有的 Invoker 都有相同的權(quán)重光戈;
- 如果總權(quán)重=0或者所有的 Invoker 都有相同的權(quán)重哪痰,則直接隨機(jī)獲人煸;
- 如果總權(quán)重>0并且不是所有的 Invoker 都有相同的權(quán)重晌杰,則根據(jù)權(quán)重進(jìn)行隨機(jī)獲取跷睦,算法如下:
假設(shè)有 4 個(gè) Invoker,權(quán)重分別是1,2,3,4乎莉,則總權(quán)重是 1+2+3+4=10送讲,說明每個(gè) Invoker 被選中的概率為1/10,2/10,3/10,4/10。先隨機(jī)生成一個(gè) [0,10) 的值 index惋啃,比如 5哼鬓,從前向后讓 index 遞減權(quán)重,直到差值<0边灭,那么最后那個(gè)使差值<0的 Invoker 就是當(dāng)前選擇的 Invoker - RandomLoadBalance 采用該種算法
- 減第一個(gè) Invoker异希,5-1=4>0,繼續(xù)減
- 減第二個(gè) Invoker绒瘦,4-2=2>0称簿,繼續(xù)減
- 減第三個(gè) Invoker,2-3<0惰帽,則獲取第三個(gè) Invoker
- RoundRobinLoadBalance:帶權(quán)重的輪詢負(fù)載均衡器憨降;
平滑權(quán)重輪詢算法:
- 每次做負(fù)載均衡時(shí),遍歷所有的服務(wù)端(Invoker)列表该酗。對(duì)每個(gè) Invoker授药,
a) current = current + weight
b) 計(jì)算總權(quán)重 totalWeight = totalWeight + weight
- 遍歷完所有的 Invoker 后,current 最大的節(jié)點(diǎn)就是本次要選擇的節(jié)點(diǎn)呜魄。最后悔叽,該節(jié)點(diǎn)的 current = current - totalWeight
- LeastActiveLoadBalance:帶權(quán)重的最小活躍數(shù)負(fù)載均衡器;
- 需要與 ActiveLimitFilter 配合使用爵嗅,后者用于記錄當(dāng)前客戶端對(duì)當(dāng)前 Invoker 的活躍數(shù)及其當(dāng)前調(diào)用方法的活躍數(shù)娇澎。(注意:actives如果設(shè)置為0,則不會(huì)加載睹晒;設(shè)置為<0趟庄,只會(huì)記錄活躍數(shù),不會(huì)進(jìn)行并發(fā)數(shù)限流册招;設(shè)置為>0岔激,則會(huì)進(jìn)行每個(gè)客戶端的并發(fā)限制邏輯)
- 總體步驟(都是針對(duì)當(dāng)前客戶端對(duì)指定 Invoker 的并發(fā)執(zhí)行數(shù))
- 初始化最小活躍數(shù)的 Invoker 列表:leastIndexs[]
遍歷所有的 Invoker,1.1. 獲取每一個(gè) Invoker 的當(dāng)前被調(diào)用方法的活躍數(shù) active 及其權(quán)重是掰;
1.2. 如果遍歷到的 Invoker 是第一個(gè)遍歷的 Invoker 或者有更小的活躍數(shù)的 Invoker,所有的計(jì)數(shù)清空辱匿,重新進(jìn)行初始化键痛;
1.3. 如果遍歷到的 Invoker 的活躍數(shù) active 與之前記錄的 leastActive 相同炫彩,則將當(dāng)前的 Invoker 記錄到 leastIndexs[] 中
判斷所有的 Invoker 是否都有相同的權(quán)重。
- 如果 leastIndexs[] 中只有一個(gè)值絮短,則直接獲取對(duì)應(yīng)索引的 Invoker江兢;否則按照 RandomLoadBalance 的邏輯進(jìn)行選擇:如果總權(quán)重=0或者所有的 Invoker 都有相同的權(quán)重,則直接隨機(jī)獲榷∑怠杉允;如果總權(quán)重>0并且不是所有的 Invoker 都有相同的權(quán)重,則根據(jù)權(quán)重進(jìn)行隨機(jī)獲取席里。
- ConsistentHashLoadBalance:一致性 Hash 負(fù)載均衡器(與權(quán)重?zé)o關(guān)):
- 組裝
serviceKey.methodName
=> {group/}interface{:version}.methodName叔磷,獲取或創(chuàng)建(第一次或者 invokers 發(fā)生了變化
)該 serviceKey 對(duì)應(yīng)的的 selector。
為每一個(gè)Invoker創(chuàng)建160個(gè)虛擬節(jié)點(diǎn)奖磁,存儲(chǔ)到TreeMap
中:
- key 的計(jì)算(需要將每個(gè)虛擬節(jié)點(diǎn)的 key 打散到整個(gè)環(huán)上:0 ~ 232)
- value:當(dāng)前遍歷的 Invoker
- 根據(jù)請(qǐng)求參數(shù)值 invocation.getArguments() 使用 selector 獲取 Invoker
a) 獲取argumentIndex[](默認(rèn)只是用第一個(gè)參數(shù)值改基,可配置)中指定的參數(shù)值,連接起來作為key
b) 對(duì)該key進(jìn)行md5咖为,得到長(zhǎng)度為16的字節(jié)數(shù)組 byte[] digest秕狰,對(duì) digest[0~3] 進(jìn)行 hash
c) 從 TreeMap 中獲取第一個(gè) >= 該hash 的 Entry,如果沒有獲取到躁染,則直接獲取 TreeMap 的第一個(gè) Entry 元素
d) 返回該 Entry 的 value 值鸣哀,即 Invoker
一、LoadBalance
@SPI(RandomLoadBalance.NAME) // 默認(rèn)是 random=RandomLoadBalance
public interface LoadBalance {
/** 從 List<Invoker> 中選擇一個(gè) Invoker */
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
二吞彤、AbstractLoadBalance
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty()) {
return null;
}
// 1. 如果只有一個(gè) Invoker我衬,直接返回
if (invokers.size() == 1) {
return invokers.get(0);
}
// 2. 調(diào)用子類進(jìn)行選擇
return doSelect(invokers, url, invocation);
}
/**
* 子類重寫的方法:真正選擇 Invoker(filtered) 的方法
*/
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
/**
* 獲取一個(gè) Invoker(filtered) 的權(quán)重
* 1、獲取當(dāng)前Invoker設(shè)置的權(quán)重weight和預(yù)熱時(shí)間warmup备畦,并且計(jì)算啟動(dòng)至今時(shí)間uptime
* 2低飒、如果uptime<warmup,則重新計(jì)算當(dāng)前Invoker的weight(uptime/warmup*weight)懂盐,否則直接返回設(shè)置的weight
*/
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 1. 獲取當(dāng)前Invoker設(shè)置的權(quán)重:weight=100(該值配置在provider端)
// 全局:<dubbo:provider warmup="100000" weight="10"/>
// 單個(gè)服務(wù):<dubbo:service interface="..." warmup="6000000" weight="10"/>
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 2. 獲取啟動(dòng)的時(shí)間點(diǎn)褥赊,該值服務(wù)啟動(dòng)時(shí)會(huì)存儲(chǔ)在注冊(cè)的URL上(timestamp):dubbo://10.213.11.98:20880/com.alibaba.dubbo.demo.DemoService?...×tamp=1565775720703&warmup=10000&weight=10
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 3. 計(jì)算啟動(dòng)至今時(shí)間
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 4. 獲取當(dāng)前Invoker設(shè)置的預(yù)熱時(shí)間,默認(rèn) warmup=10*60*1000=10min
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); //
// 5. 如果沒有過完預(yù)熱時(shí)間莉恼,則計(jì)算預(yù)熱權(quán)重
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
/**
* 計(jì)算預(yù)熱權(quán)重
* 預(yù)熱公式:uptime/warmup*weight => 啟動(dòng)至今時(shí)間/設(shè)置的預(yù)熱總時(shí)間*權(quán)重
*/
private int calculateWarmupWeight(int uptime, int warmup, int weight) {
/**
* eg. 設(shè)置的權(quán)重是 100拌喉, 預(yù)熱時(shí)間 10min,
* 第一分鐘的時(shí)候:權(quán)重變?yōu)?(1/10)*100=10, 也就是承擔(dān) 10/100 = 10% 的流量;
* 第二分鐘的時(shí)候:權(quán)重變?yōu)?(2/10)*100=20, 也就是承擔(dān) 20/100 = 20% 的流量俐银;
* 第十分鐘的時(shí)候:權(quán)重變?yōu)?(10/10)*100=100, 也就是承擔(dān) 100/100 = 100% 的流量尿背;
* 超過十分鐘之后(即 uptime>warmup,表示預(yù)熱期過了捶惜,則直接返回 weight=100田藐,不再計(jì)算)
*/
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
}
預(yù)熱和權(quán)重的設(shè)置
- 配置在 provider 端,注冊(cè)在 provider 的 URL 上:dubbo://10.213.11.98:20880/com.alibaba.dubbo.demo.DemoService?...&
timestamp
=1565775720703&warmup
=10000&weight
=10- consumer 服務(wù)發(fā)現(xiàn)時(shí),會(huì)拉取該 URL汽久,獲取其參數(shù)鹤竭,其中 timestamp 會(huì)用來計(jì)算啟動(dòng)至今時(shí)間 uptime。
全局:<dubbo:provider warmup="100000" weight="10"/>
單個(gè)服務(wù):<dubbo:service interface="..." warmup="6000000" weight="10"/>
三景醇、RandomLoadBalance
- 隨機(jī)臀稚,按權(quán)重設(shè)置隨機(jī)概率。
- 在一個(gè)截面上碰撞的概率高三痰,但調(diào)用量越大分布越均勻吧寺,而且按概率使用權(quán)重后也比較均勻,有利于動(dòng)態(tài)調(diào)整提供者權(quán)重散劫。
三種使用姿勢(shì)
<!-- 1. 所有的消費(fèi)者使用 RandomLoadBalance -->
<dubbo:consumer loadbalance="random" />
<!-- 2. 指定的消費(fèi)者的所有方法使用 RandomLoadBalance -->
<dubbo:reference ... loadbalance="random" />
<!-- 3. 指定的消費(fèi)者的指定方法 sayHello 使用 RandomLoadBalance稚机,其余方法使用默認(rèn)值 -->
<dubbo:reference id="demoService" ...>
<dubbo:method name="sayHello" loadbalance="random" />
</dubbo:reference>
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
// 計(jì)算每一個(gè)Invoker的權(quán)重
int weight = getWeight(invokers.get(i), invocation);
// 計(jì)算總權(quán)重
totalWeight += weight; // Sum
// 計(jì)算所有Invoker的權(quán)重是否相同
// 判斷方法:每次遍歷一個(gè)Invoker,都與其前一個(gè)Invoker的權(quán)重作比較舷丹,如果不相等抒钱,則設(shè)置sameWeight=false,一旦sameWeight=false后颜凯,后續(xù)的遍歷就不必再進(jìn)行判斷了
if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 如果總權(quán)重>0&&不是所有的Invoker都有相同的權(quán)重谋币,則根據(jù)權(quán)重進(jìn)行隨機(jī)獲取
// eg. 4個(gè)Invoker,權(quán)重分別是1,2,3,4症概,則總權(quán)重是1+2+3+4=10蕾额,說明每個(gè)Invoker被選中的概率為1/10,2/10,3/10,4/10。此時(shí)有兩種算法可以實(shí)現(xiàn)帶概率的選擇:
// 1. 想象有這樣的一個(gè)數(shù)組 [1,2,2,3,3,3,4,4,4,4], 先隨機(jī)生成一個(gè)[0,10)的值彼城,比如5诅蝶,該值作為數(shù)組的index,此時(shí)獲取到的是3募壕,即使用第三個(gè)Invoker
// 2. 先隨機(jī)生成一個(gè)[0,10)的值调炬,比如5,從前向后讓索引遞減權(quán)重舱馅,直到差值<0缰泡,那么最后那個(gè)使差值<0的Invoker就是當(dāng)前選擇的Invoker,5-1-2-3<0代嗤,那么最終獲取的就是第三個(gè)Invoker(Dubbo 使用了該算法)
if (totalWeight > 0 && !sameWeight) {
int offset = random.nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果所有的Invokers都有相同的權(quán)重 or 總權(quán)重=0棘钞,則直接隨機(jī)獲取
return invokers.get(random.nextInt(length));
}
}
- 總體步驟:
- 通過計(jì)算每一個(gè) Invoker 的權(quán)重來計(jì)算總權(quán)重 totalWeight,并判斷是否所有的 Invoker 都有相同的權(quán)重干毅;
- 如果總權(quán)重=0或者所有的 Invoker 都有相同的權(quán)重宜猜,則直接隨機(jī)獲取硝逢;
- 如果總權(quán)重>0并且不是所有的 Invoker 都有相同的權(quán)重姨拥,則根據(jù)權(quán)重進(jìn)行隨機(jī)獲取绅喉,算法如下:
- 根據(jù)權(quán)重進(jìn)行隨機(jī)獲取,有兩種算法:假設(shè)有 4 個(gè) Invoker垫毙,權(quán)重分別是1,2,3,4霹疫,則總權(quán)重是 1+2+3+4=10拱绑,說明每個(gè) Invoker 被選中的概率為1/10,2/10,3/10,4/10综芥。此時(shí)有兩種算法可以實(shí)現(xiàn)帶概率的選擇:
- 構(gòu)造這樣的一個(gè)數(shù)組 a = [1,2,2,3,3,3,4,4,4,4], 先隨機(jī)生成一個(gè) [0,10) 的值,比如 5猎拨,該值作為數(shù)組的 index膀藐,此時(shí)獲取到的是 a[5] = 3,即使用第三個(gè) Invoker红省;(該算法占用空間大)
- 先隨機(jī)生成一個(gè) [0,10) 的值 index额各,比如 5,從前向后讓 index 遞減權(quán)重吧恃,直到差值<0虾啦,那么最后那個(gè)使差值<0的 Invoker 就是當(dāng)前選擇的 Invoker - RandomLoadBalance 采用該種算法
- 減第一個(gè) Invoker,5-1=4>0痕寓,繼續(xù)減
- 減第二個(gè) Invoker傲醉,4-2=2>0,繼續(xù)減
- 減第三個(gè) Invoker呻率,2-3<0硬毕,則獲取第三個(gè) Invoker
- 判斷所有 Invoker 的權(quán)重是否相同,有兩種算法:
- 每次遍歷一個(gè) Invoker礼仗,都與其前一個(gè) Invoker 的權(quán)重作比較吐咳,如果不相等,則設(shè)置 sameWeight=false元践,一旦 sameWeight=false 后韭脊,后續(xù)的遍歷就不必再進(jìn)行判斷了 - RandomLoadBalance 采用這種;
- 將所有的 Invoker 與第一個(gè) Invoker 的權(quán)重作比較单旁,如果都相等沪羔,則sameWeight=true,否則 sameWeight=false - LeastActiveLoadBalance 采用這種慎恒。
四任内、RoundRobinLoadBalance
- 輪詢,按公約后的權(quán)重設(shè)置輪詢比率融柬。
- 存在慢的提供者累積請(qǐng)求的問題死嗦,比如:第二臺(tái)機(jī)器很慢,但沒掛粒氧,當(dāng)請(qǐng)求調(diào)到第二臺(tái)時(shí)就卡在那越除,久而久之,所有請(qǐng)求都卡在調(diào)到第二臺(tái)上。
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
private static int RECYCLE_PERIOD = 60000;
protected static class WeightedRoundRobin {
private int weight; // Invoker的權(quán)重
private AtomicLong current = new AtomicLong(0); // 同時(shí)被多個(gè)線程選中的權(quán)重之后摘盆,假設(shè)同時(shí)被4個(gè)線程選中翼雀,weight=100,那么current=400
private long lastUpdate; // 用于緩存超時(shí)的判斷
...
// 增加一個(gè)權(quán)重值
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
...
}
/**
* 外層 key: serviceKey.methodName
* 內(nèi)層 key: url = {protocol://username:password@ip:port/path?xx=yy&uu=ii}
*/
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
private AtomicBoolean updateLock = new AtomicBoolean();
...
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 創(chuàng)建緩存 Map<String, Map<String, WeightedRoundRobin>> methodWeightMap
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// url = {protocol://username:password@ip:port/path?xx=yy&uu=ii}
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
// 獲取當(dāng)前 Invoker 的權(quán)重
int weight = getWeight(invoker, invocation);
if (weight < 0) {
weight = 0;
}
// 創(chuàng)建 WeightedRoundRobin
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
weightedRoundRobin = map.get(identifyString);
}
// 權(quán)重發(fā)生了變化孩擂,eg. 在預(yù)熱期間狼渊,預(yù)熱權(quán)重隨時(shí)間發(fā)生變化
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
// 1. 將weight加到current上:current=current+weight
long cur = weightedRoundRobin.increaseCurrent();
// 設(shè)置最后更新的時(shí)間
weightedRoundRobin.setLastUpdate(now);
// 2. 最后選出current最大的Invoker作為最終要調(diào)用的Invoker
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 計(jì)算總權(quán)重
totalWeight += weight;
}
// 加鎖做緩存清除操作:
// invokers.size() != map.size() 說明 invokers 發(fā)生了變化(新增或下線)
// 新增:下次循環(huán)會(huì)增加到map
// 下線:只能靠如下的緩存清除策略從map中進(jìn)行刪除
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// CopyOnWrite
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
Entry<String, WeightedRoundRobin> item = it.next();
// 如果該緩存已經(jīng)有60s沒有使用了,則清除
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
// 返回current最大的Invoker作為最終要調(diào)用的Invoker
if (selectedInvoker != null) {
// 3. 當(dāng)前的Invoker的current減去總權(quán)重:current=current-totalWeight
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
}
}
平滑權(quán)重輪詢算法:
- 每次做負(fù)載均衡時(shí)类垦,遍歷所有的服務(wù)端(Invoker)列表狈邑。對(duì)每個(gè) Invoker,
a) current = current + weight
b) 計(jì)算總權(quán)重 totalWeight = totalWeight + weight
- 遍歷完所有的 Invoker 后蚤认,current 最大的節(jié)點(diǎn)就是本次要選擇的節(jié)點(diǎn)米苹。最后,該節(jié)點(diǎn)的 current = current - totalWeight
舉例說明:
eg. 假設(shè)有3個(gè)Invoker:A,B,C, 權(quán)重是1砰琢,2蘸嘶,3,調(diào)用如下陪汽,我們發(fā)現(xiàn)最后調(diào)用次數(shù) A:B:C=1:2:3 與權(quán)重相符训唱。而且A,B,C的調(diào)用也是穿插的(平滑權(quán)重輪詢的好處,而普通權(quán)重輪詢是會(huì)出現(xiàn) [C,C,C,B,B,A] 這樣短時(shí)間內(nèi)不斷調(diào)用同一個(gè)節(jié)點(diǎn)的問題-會(huì)導(dǎo)致該節(jié)點(diǎn)壓力驟增)
image.png
五掩缓、LeastActiveLoadBalance
- 最少活躍調(diào)用數(shù)雪情,相同活躍數(shù)的隨機(jī),活躍數(shù)指調(diào)用前后計(jì)數(shù)差(通過
ActiveLimitFilter
計(jì)算每個(gè)接口方法的活躍數(shù))- 使慢的提供者收到更少請(qǐng)求你辣,因?yàn)樵铰奶峁┱叩恼{(diào)用前后計(jì)數(shù)差會(huì)越大
三種使用姿勢(shì)
<!-- 第一步:指定負(fù)載均衡器 -->
<!-- 1. 所有的消費(fèi)者使用 LeastActiveLoadBalance -->
<dubbo:consumer loadbalance="leastactive" />
<!-- 2. 指定的消費(fèi)者的所有方法使用 LeastActiveLoadBalance -->
<dubbo:reference ... loadbalance="leastactive" />
<!-- 3. 指定的消費(fèi)者調(diào)用的指定方法 sayHello 使用 LeastActiveLoadBalance巡通,其余方法使用默認(rèn)值 -->
<dubbo:reference id="demoService" ...>
<dubbo:method name="sayHello" loadbalance="leastactive" />
</dubbo:reference>
<!-- 第二步:激活 ActiveLimitFilter -->
<!-- 2. 指定的消費(fèi)者的所有方法使用 actives -->
<dubbo:reference ... actives="-1"/>
<!-- 3. 指定的消費(fèi)者調(diào)用的指定方法 sayHello 使用 actives,其余方法不用 -->
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" actives="-1"/>
</dubbo:reference>
注意
LeastActiveLoadBalance 需要與 ActiveLimitFilter 配合使用舍哄,后者用于記錄當(dāng)前客戶端對(duì)當(dāng)前 Invoker 的活躍數(shù)及其當(dāng)前調(diào)用方法的活躍數(shù)宴凉。(注意:actives如果設(shè)置為0,則不會(huì)加載表悬;設(shè)置為<0弥锄,只會(huì)記錄活躍數(shù),不會(huì)進(jìn)行并發(fā)數(shù)限流蟆沫;設(shè)置為>0籽暇,則會(huì)進(jìn)行每個(gè)客戶端的并發(fā)限制邏輯)
/**
* LeastActiveLoadBalance
*
* 需要與ActiveLimitFilter配合使用(ActiveLimitFilter用于記錄當(dāng)前的Invoker的當(dāng)前方法的活躍數(shù)active)
* @see com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
*/
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int leastActive = -1; // 所有Invoker的最小活躍數(shù)
int leastCount = 0; // 有最小活躍數(shù)leastActive的Invoker個(gè)數(shù)(leastCount=2,表示有兩個(gè)Invoker其leastActive相同)
int[] leastIndexs = new int[length]; // 存儲(chǔ)leastActive的Invoker在List<Invoker<T>> invokers列表中的索引值
int totalWeight = 0; // 總權(quán)重
int firstWeight = 0; // 第一個(gè)被遍歷的Invoker的權(quán)重饭庞,用于比較來計(jì)算是否所有的Invoker都有相同的權(quán)重
boolean sameWeight = true; // 是否所有的Invoker都有相同的權(quán)重
/**
* 1戒悠、初始化最小活躍數(shù)的Invoker列表:leastIndexs[]
* 遍歷所有的Invoker,
* a) 獲取每一個(gè)方法的活躍數(shù)active及其權(quán)重舟山;
* b) 如果遍歷到的Invoker是第一個(gè)遍歷的Invoker或者有更小的活躍數(shù)的Invoker绸狐,所有的計(jì)數(shù)清空卤恳,重新進(jìn)行初始化;
* c) 如果遍歷到的Invoker的活躍數(shù)active與之前記錄的leastActive相同寒矿,則將當(dāng)前的Invoker記錄到 leastIndexs[] 中
* 判斷所有的Invoker是否都有相同的權(quán)重突琳。
* 2、如果leastIndexs[]中只有一個(gè)值符相,則直接獲取對(duì)應(yīng)索引的Invoker拆融;否則按照 RandomLoadBalance 的邏輯進(jìn)行選擇
*/
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 獲取當(dāng)前Invoker當(dāng)前方法的活躍數(shù),該活躍數(shù)由 ActiveLimitFilter 進(jìn)行記錄
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
int afterWarmup = getWeight(invoker, invocation); // Weight
if (leastActive == -1 || active < leastActive) { // 如果遍歷的是第一個(gè)Invoker或者有更小的活躍數(shù)主巍,所有的計(jì)數(shù)清空冠息,重新進(jìn)行初始化
leastActive = active; // 記錄最小活躍數(shù)
leastCount = 1; // Reset leastCount, count again based on current leastCount
leastIndexs[0] = i; // Reset
totalWeight = afterWarmup; // Reset
firstWeight = afterWarmup; // 記錄第一個(gè)被遍歷的Invoker的權(quán)重
sameWeight = true; // Reset, every invoker has the same weight value?
} else if (active == leastActive) { // 如果遍歷到的Invoker的活躍數(shù)active與之前記錄的leastActive相同
leastIndexs[leastCount++] = i; // 則將當(dāng)前的Invoker記錄到 leastIndexs[] 中
totalWeight += afterWarmup; // Add this invoker's weight to totalWeight.
// 判斷所有的Invoker是否都有相同的權(quán)重?
if (sameWeight && i > 0 && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}
// 后續(xù)的邏輯與 RandomLoadBalance 相同
if (!sameWeight && totalWeight > 0) {
int offsetWeight = random.nextInt(totalWeight) + 1;
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0) {
return invokers.get(leastIndex);
}
}
}
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
}
- 總體步驟(都是針對(duì)當(dāng)前客戶端對(duì)指定 Invoker 的并發(fā)執(zhí)行數(shù))
- 初始化最小活躍數(shù)的 Invoker 列表:leastIndexs[]
遍歷所有的 Invoker,
1.1. 獲取每一個(gè) Invoker 的當(dāng)前被調(diào)用方法的活躍數(shù) active 及其權(quán)重孕索;
1.2. 如果遍歷到的 Invoker 是第一個(gè)遍歷的 Invoker 或者有更小的活躍數(shù)的 Invoker,所有的計(jì)數(shù)清空躏碳,重新進(jìn)行初始化搞旭;
1.3. 如果遍歷到的 Invoker 的活躍數(shù) active 與之前記錄的 leastActive 相同,則將當(dāng)前的 Invoker 記錄到 leastIndexs[] 中
判斷所有的 Invoker 是否都有相同的權(quán)重菇绵。
- 如果 leastIndexs[] 中只有一個(gè)值肄渗,則直接獲取對(duì)應(yīng)索引的 Invoker;否則按照 RandomLoadBalance 的邏輯進(jìn)行選擇:如果總權(quán)重=0或者所有的 Invoker 都有相同的權(quán)重咬最,則直接隨機(jī)獲若岬铡;如果總權(quán)重>0并且不是所有的 Invoker 都有相同的權(quán)重永乌,則根據(jù)權(quán)重進(jìn)行隨機(jī)獲取惑申。
最后看下 ActiveLimitFilter
的相關(guān)邏輯:(關(guān)于 ActiveLimitFilter
后續(xù)進(jìn)行分析)
/**
* 1. 僅用于 consumer 端
* 2. 需要配置 actives 參數(shù)
* <0,只記錄活躍數(shù)(并發(fā)度)
* >0, 記錄活躍數(shù)(并發(fā)度)+ 限流(限制每個(gè)客戶端的并發(fā)執(zhí)行數(shù))
*/
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 獲取最大并發(fā)數(shù) actives=10
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 獲取當(dāng)前調(diào)用方法的RpcStatus
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
// 只有當(dāng)配置的 actives>0翅雏,才會(huì)做并發(fā)度限流圈驼,否則只是簡(jiǎn)單的計(jì)數(shù)
if (max > 0) {
// 限流操作
...
}
long begin = System.currentTimeMillis();
// 當(dāng)前活躍數(shù) + 1
RpcStatus.beginCount(url, methodName);
try {
// 真正調(diào)用
Result result = invoker.invoke(invocation);
// 正常結(jié)束:當(dāng)前活躍數(shù)-1
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
// 發(fā)生異常:當(dāng)前活躍數(shù)-1,拋出異常
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
}
}
六望几、ConsistentHashLoadBalance
關(guān)于一致性 hash 的介紹及其優(yōu)點(diǎn)绩脆,見 https://www.cnblogs.com/java-zhao/p/5158034.html
- 一致性 Hash涛舍,相同參數(shù)的請(qǐng)求總是發(fā)到同一提供者懊亡。
- 當(dāng)某一臺(tái)提供者掛時(shí)涂滴,原本發(fā)往該提供者的請(qǐng)求屈嗤,基于虛擬節(jié)點(diǎn)鬼癣,平攤到其它提供者凡桥,不會(huì)引起劇烈變動(dòng)抢肛。(https://www.cnblogs.com/java-zhao/p/5158034.html)
- 缺省只對(duì)第一個(gè)參數(shù)值 hash甸各,如果要修改慌随,請(qǐng)配置
<dubbo:parameter key="hash.arguments" value="0,1" />
(表示對(duì)前兩個(gè)參數(shù)值進(jìn)行 hash)- 缺省用
160
份虛擬節(jié)點(diǎn)芬沉,如果要修改躺同,請(qǐng)配置<dubbo:parameter key="hash.nodes" value="320" />
/**
* ConsistentHashLoadBalance
* 1. 組裝 serviceKey.methodName => {group/}interface{:version}.methodName,獲取或創(chuàng)建(第一次或者invokers發(fā)生了變化)該 serviceKey 對(duì)應(yīng)的的 selector
* 為每一個(gè)Invoker創(chuàng)建160個(gè)虛擬節(jié)點(diǎn)丸逸,存儲(chǔ)到 TreeMap 中
* key的計(jì)算(需要將key打散): 對(duì)于每一個(gè)Invoker蹋艺,
* a) 40次外層循環(huán):md5(ip:port+i)(i=[0~39]),此時(shí)生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組 byte[] digest
* b) 4層內(nèi)層循環(huán):分別對(duì) digest 按照每四個(gè)字節(jié)(h=0=>digest[0~3],h=1=>digest[4~7],...)進(jìn)行hash(公式:hash(md5(ip:port+i),h)(h=[0~3]))黄刚,計(jì)算出4個(gè)不同的Long捎谨,該值作為TreeMap的key
* value:當(dāng)前遍歷的 Invoker
*
* 2. 根據(jù)請(qǐng)求參數(shù)值 invocation.getArguments() 使用 selector 獲取 Invoker
* a) 獲取argumentIndex[](默認(rèn)只是用第一個(gè)參數(shù)值,可配置)中指定的參數(shù)值憔维,連接起來作為key
* b) 對(duì)該key進(jìn)行md5涛救,得到長(zhǎng)度為16的字節(jié)數(shù)組 byte[] digest,對(duì)digest[0~3]進(jìn)行hash
* c) 從 TreeMap 中獲取第一個(gè) >= 該hash 的Entry业扒,如果沒有獲取到检吆,則直接獲取 TreeMap 的第一個(gè) Entry 元素
* d) 返回該 Entry 的 value 值,即 Invoker
*/
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
/**
* key: serviceKey.methodName => {group/}interface{:version}.methodName
*/
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 1. 獲取方法名
String methodName = RpcUtils.getMethodName(invocation);
// 2. 組裝key:serviceKey.methodName => {group/}interface{:version}.methodName
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 3. 計(jì)算 invokers 的 hash 值
int identityHashCode = System.identityHashCode(invokers);
// 4. 根據(jù) key 獲取相應(yīng)的 selector 實(shí)例
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 5. 如果是第一次創(chuàng)建selector || invokers已經(jīng)發(fā)生了變化(宕機(jī)或者添加了新的機(jī)器 - 此時(shí)identityHashCode發(fā)生了變化)程储,
// 則新建selector蹭沛,并存儲(chǔ)到緩存中
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 6. 根據(jù)請(qǐng)求參數(shù)invocation使用selector獲取Invoker(一致性hash:相同的請(qǐng)求參數(shù)值會(huì)請(qǐng)求到同一臺(tái)機(jī)器Invoker上去)
return selector.select(invocation);
}
/**
* 該 selector 是針對(duì) key:serviceKey.methodName => {group/}interface{:version}.methodName 的存儲(chǔ)器
*/
private static final class ConsistentHashSelector<T> {
/**
* 虛擬節(jié)點(diǎn)
*/
private final TreeMap<Long, Invoker<T>> virtualInvokers;
/**
* 虛擬節(jié)點(diǎn)數(shù),默認(rèn)是 160
* <dubbo:consumer>
* <dubbo:parameter key="hash.nodes" value="200"/>
* </dubbo:consumer>
*/
private final int replicaNumber;
/**
* 默認(rèn)只對(duì)第一個(gè)參數(shù)進(jìn)行hash章鲤,配置 hash.arguments=0,1摊灭,表示對(duì)前兩個(gè)參數(shù)進(jìn)行hash
* <dubbo:consumer>
* <dubbo:parameter key="hash.arguments" value="0,1"/>
* </dubbo:consumer>
*/
private final int[] argumentIndex;
/**
* 如上key的全部Invoker列表的hash值
* 如果發(fā)生了變化,說明新添加了如上key的機(jī)器Invoker或者有如上key的機(jī)器Invoker下線败徊,此時(shí)需要重建selector
*/
private final int identityHashCode;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 默認(rèn)的虛擬節(jié)點(diǎn)分片數(shù)為 160
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 默認(rèn)只對(duì)第一個(gè)參數(shù)進(jìn)行 hash帚呼,配置 hash.arguments=0,1
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
// 1. 獲取真實(shí)節(jié)點(diǎn): ip:port
String address = invoker.getUrl().getAddress();
// 2. 生成虛擬節(jié)點(diǎn)
for (int i = 0; i < replicaNumber / 4; i++) {
// 2.1. 對(duì)ip:port+遞增數(shù)字做md5 -> 4個(gè)虛擬節(jié)點(diǎn)的總標(biāo)識(shí)digest(16字節(jié)長(zhǎng)度)
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
// 2.2. 再對(duì)digest進(jìn)行hash(每四位)得到最終的每個(gè)虛擬節(jié)點(diǎn)的標(biāo)識(shí)m,m作為TreeMap的key皱蹦,Invoker作為value煤杀,存儲(chǔ)起來
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
// 1. 獲取argumentIndex中指定的參數(shù)值,連接起來作為key
String key = toKey(invocation.getArguments());
// 2. 對(duì)參數(shù)值連接key做md5
byte[] digest = md5(key);
// 3. 對(duì)digest[0~3]進(jìn)行hash根欧,然后進(jìn)行選擇
return selectForKey(hash(digest, 0));
}
// 獲取 argumentIndex[](默認(rèn)只是用第一個(gè)參數(shù)值怜珍,可配置)中指定的參數(shù)值,連接起來作為 key
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
// TreeMap 是有序的樹形結(jié)構(gòu)凤粗。
// 1. 首先獲取至少大于或者等于當(dāng)前key的Entry - 即完成順時(shí)針查找的目的
// 2. 如果沒有找到酥泛,則直接獲取第一個(gè)Entry
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
// number=0,對(duì) digest[0~3] 進(jìn)行 hash
// number=1嫌拣,對(duì) digest[4~7] 進(jìn)行 hash
// number=2柔袁,對(duì) digest[8~11] 進(jìn)行 hash
// number=3,對(duì) digest[12~15] 進(jìn)行 hash
private long hash(byte[] digest, int number) {
...
}
// 返回 16 個(gè)字節(jié)長(zhǎng)度的字節(jié)數(shù)組
private byte[] md5(String value) {
...
}
}
}
- 總體步驟
- 組裝
serviceKey.methodName
=> {group/}interface{:version}.methodName异逐,獲取或創(chuàng)建(第一次或者 invokers 發(fā)生了變化
)該 serviceKey 對(duì)應(yīng)的的 selector捶索。
為每一個(gè)Invoker創(chuàng)建160個(gè)虛擬節(jié)點(diǎn),存儲(chǔ)到 TreeMap 中:
- key 的計(jì)算(需要將 key 打散): 對(duì)于每一個(gè) Invoker灰瞻,
- a) 40次外層循環(huán):md5(ip:port+i)(i=[0~39])腥例,此時(shí)生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組 byte[] diges
- b) 4次內(nèi)層循環(huán):分別對(duì) digest 按照每四個(gè)字節(jié)(h=0=>digest[0 ~ 3]辅甥,h=1=>digest[4 ~ 7],...)進(jìn)行hash(公式:hash(md5(ip:port+i),h)(h=[0~3])),計(jì)算出4個(gè)不同的Long燎竖,該值作為TreeMap的key
- value:當(dāng)前遍歷的 Invoker
- 根據(jù)請(qǐng)求參數(shù)值 invocation.getArguments() 使用 selector 獲取 Invoker
a) 獲取argumentIndex[](默認(rèn)只是用第一個(gè)參數(shù)值璃弄,可配置)中指定的參數(shù)值,連接起來作為key
b) 對(duì)該key進(jìn)行md5构回,得到長(zhǎng)度為16的字節(jié)數(shù)組 byte[] digest夏块,對(duì) digest[0~3] 進(jìn)行 hash
c) 從 TreeMap 中獲取第一個(gè) >= 該hash 的 Entry,如果沒有獲取到纤掸,則直接獲取 TreeMap 的第一個(gè) Entry 元素
d) 返回該 Entry 的 value 值脐供,即 Invoker