目錄
dubbo 拓展機(jī)制 SPI
dubbo 自適應(yīng)拓展機(jī)制
dubbo 服務(wù)導(dǎo)出
dubbo 服務(wù)引用
dubbo 服務(wù)字典
dubbo 服務(wù)路由
dubbo 集群
dubbo 負(fù)載均衡
dubbo 服務(wù)調(diào)用過(guò)程
1. AbstractLoadBalance
所有的負(fù)載均衡都繼承了AbstractLoadBalance,我們先來(lái)看看這個(gè)類
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
主要是調(diào)用子類實(shí)現(xiàn)的doSelect方法激涤,這個(gè)類中還有些其他的方法如:getWeight計(jì)算服務(wù)提供者權(quán)重倦踢。
int getWeight(Invoker<?> invoker, Invocation invocation) {
// 從 url 中獲取權(quán)重 weight 配置值
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
// 獲取服務(wù)提供者啟動(dòng)時(shí)間戳
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 計(jì)算服務(wù)提供者運(yùn)行時(shí)長(zhǎng)
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1;
}
// 獲取服務(wù)預(yù)熱時(shí)間,默認(rèn)為10分鐘
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
// 如果服務(wù)運(yùn)行時(shí)間小于預(yù)熱時(shí)間边涕,則重新計(jì)算服務(wù)權(quán)重,即降權(quán)
if (uptime > 0 && uptime < warmup) {
// 重新計(jì)算服務(wù)權(quán)重
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
return Math.max(weight, 0);
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 計(jì)算權(quán)重宠蚂,下面代碼邏輯上形似于 (uptime / warmup) * weight求厕。
// 隨著服務(wù)運(yùn)行時(shí)間 uptime 增大扰楼,權(quán)重計(jì)算值 ww 會(huì)慢慢接近配置weight
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
上面是權(quán)重的計(jì)算過(guò)程弦赖,該過(guò)程主要用于保證當(dāng)服務(wù)運(yùn)行時(shí)長(zhǎng)小于服務(wù)預(yù)熱時(shí)間時(shí)蹬竖,對(duì)服務(wù)進(jìn)行降權(quán)案腺,避免讓服務(wù)在啟動(dòng)之初就處于高負(fù)載狀態(tài)。服務(wù)預(yù)熱是一個(gè)優(yōu)化手段访递,與此類似的還有 JVM 預(yù)熱拷姿。主要目的是讓服務(wù)啟動(dòng)后“低功率”運(yùn)行一段時(shí)間响巢,使其效率慢慢提升至最佳狀態(tài)棒妨。
2.AbstractLoadBalance的子類
2.1 RandomLoadBalance
按權(quán)重設(shè)置隨機(jī)概率進(jìn)行服務(wù)的調(diào)用
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the weight of every invokers
int[] weights = new int[length];
// the first invoker's weight
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
int totalWeight = firstWeight;
// 下面這個(gè)循環(huán)有兩個(gè)作用券腔,第一是計(jì)算總權(quán)重 totalWeight,
// 第二是檢測(cè)每個(gè)服務(wù)提供者的權(quán)重是否相同
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// Sum
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 使用ThreadLocalRandom線程安全的獲取一個(gè)0-totalWeight之間的隨機(jī)數(shù)
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
// 讓隨機(jī)值 offset 減去權(quán)重值
for (int i = 0; i < length; i++) {
offset -= weights[i];
// 返回相應(yīng)的 Invoker
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果所有服務(wù)提供者權(quán)重值相同,此時(shí)直接隨機(jī)返回一個(gè)即可
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
2.2 LeastActiveLoadBalance
最少活躍調(diào)用數(shù)烟瞧,相同活躍數(shù)的隨機(jī),活躍數(shù)指調(diào)用前后計(jì)數(shù)差参滴。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// invokers的數(shù)量
int length = invokers.size();
// 最近的活躍數(shù)
int leastActive = -1;
// 具有相同“最小活躍數(shù)”的服務(wù)者提供者數(shù)量
int leastCount = 0;
// leastIndexs 用于記錄具有相同“最小活躍數(shù)”的 Invoker 在 invokers 列表中的下標(biāo)信息
int[] leastIndexes = new int[length];
int[] weights = new int[length];
int totalWeight = 0;
// 第一個(gè)最小活躍數(shù)的 Invoker 權(quán)重值,用于與其他具有相同最小活躍數(shù)的 Invoker 的權(quán)重進(jìn)行對(duì)比请唱,
// 以檢測(cè)是否“所有具有相同最小活躍數(shù)的 Invoker 的權(quán)重”均相等
int firstWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 獲取 Invoker 對(duì)應(yīng)的活躍數(shù)
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 獲取invoker的權(quán)重
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
if (leastActive == -1 || active < leastActive) {
// 使用當(dāng)前活躍數(shù) active 更新最小活躍數(shù) leastActive
leastActive = active;
// 更新 leastCount 為 1
leastCount = 1;
leastIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
}
// 當(dāng)前 Invoker 的活躍數(shù) active 與最小活躍數(shù) leastActive 相同
else if (active == leastActive) {
leastIndexes[leastCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 當(dāng)只有一個(gè) Invoker 具有最小活躍數(shù)过蹂,此時(shí)直接返回該 Invoker 即可
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
// 有多個(gè) Invoker 具有相同的最小活躍數(shù)十绑,但它們之間的權(quán)重不同
if (!sameWeight && totalWeight > 0) {
// 隨機(jī)生成一個(gè) [0, totalWeight) 之間的數(shù)字
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// 循環(huán)讓隨機(jī)數(shù)減去具有最小活躍數(shù)的 Invoker 的權(quán)重值,
// 當(dāng) offset 小于等于0時(shí)酷勺,返回相應(yīng)的 Invoker
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
// 如果權(quán)重相同或權(quán)重為0時(shí)本橙,隨機(jī)返回一個(gè) Invoker
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
2.3 ConsistentHashLoadBalance
一致性 Hash,相同參數(shù)的請(qǐng)求總是發(fā)到同一提供者脆诉。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
// 返回 {group}/{interfaceName}:{version}.methodName
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 獲取 invokers 原始的 hashcode
int identityHashCode = System.identityHashCode(invokers);
// 如果 invokers 是一個(gè)新的 List 對(duì)象甚亭,意味著服務(wù)提供者數(shù)量發(fā)生了變化,可能新增也可能減少了击胜。
// 此時(shí) selector.identityHashCode != identityHashCode 條件成立
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 調(diào)用 ConsistentHashSelector 的 select 方法選擇 Invoker
return selector.select(invocation);
}
如上,doSelect 方法主要做了一些前置工作暇唾,比如檢測(cè) invokers 列表是不是變動(dòng)過(guò),以及創(chuàng)建 ConsistentHashSelector。這些工作做完后孽糖,接下來(lái)開(kāi)始調(diào)用 ConsistentHashSelector 的 select 方法執(zhí)行負(fù)載均衡邏輯。在分析 select 方法之前,我們先來(lái)看一下一致性 hash 選擇器 ConsistentHashSelector 的初始化過(guò)程铡恕,如下:
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
// 使用 TreeMap 存儲(chǔ) Invoker 虛擬節(jié)點(diǎn)
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
// 獲取虛擬節(jié)點(diǎn)數(shù),默認(rèn)為160
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
// 獲取參與 hash 計(jì)算的參數(shù)下標(biāo)值,默認(rèn)對(duì)第一個(gè)參數(shù)進(jìn)行 hash 運(yùn)算
String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
// 對(duì) address + i 進(jìn)行 md5 運(yùn)算其垄,得到一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組
byte[] digest = md5(address + i);
// 對(duì) digest 部分字節(jié)進(jìn)行4次 hash 運(yùn)算窟扑,得到四個(gè)不同的 long 型正整數(shù)
for (int h = 0; h < 4; h++) {
// h = 0 時(shí)橘霎,取 digest 中下標(biāo)為 0 ~ 3 的4個(gè)字節(jié)進(jìn)行位運(yùn)算
// h = 1 時(shí)忱辅,取 digest 中下標(biāo)為 4 ~ 7 的4個(gè)字節(jié)進(jìn)行位運(yùn)算
// h = 2, h = 3 時(shí)過(guò)程同上
long m = hash(digest, h);
// 將 hash 到 invoker 的映射關(guān)系存儲(chǔ)到 virtualInvokers 中橡卤,
// virtualInvokers 需要提供高效的查詢操作巧勤,因此選用 TreeMap 作為存儲(chǔ)結(jié)構(gòu)
virtualInvokers.put(m, invoker);
}
}
}
}
完成初始化后就用select方法開(kāi)始選擇invoker
public Invoker<T> select(Invocation invocation) {
// 將參數(shù)轉(zhuǎn)為 key
String key = toKey(invocation.getArguments());
// 對(duì)參數(shù) key 進(jìn)行 md5 運(yùn)算
byte[] digest = md5(key);
// 取 digest 數(shù)組的前四個(gè)字節(jié)進(jìn)行 hash 運(yùn)算,再將 hash 值傳給 selectForKey 方法驹溃,
// 尋找合適的 Invoker
return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
// 到 TreeMap 中查找第一個(gè)節(jié)點(diǎn)值大于或等于當(dāng)前 hash 的 Invoker
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
// 如果 hash 大于 Invoker 在圓環(huán)上最大的位置亡哄,此時(shí) entry = null,
// 需要將 TreeMap 的頭節(jié)點(diǎn)賦值給 entry
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
// 返回 Invoker
return entry.getValue();
}
2.4 RoundRobinLoadBalance
輪詢截型,按公約后的權(quán)重設(shè)置輪詢比率睁搭。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key = 全限定類名 + "." + 方法名,比如 com.xxx.DemoService.sayHello
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (Invoker<T> invoker : invokers) {
// 獲取url的標(biāo)示性字符串
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
// 計(jì)算權(quán)重
int weight = getWeight(invoker, invocation);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
// 設(shè)置 Invoker 權(quán)重
weightedRoundRobin.setWeight(weight);
// 存儲(chǔ) url 唯一標(biāo)識(shí) identifyString 到 weightedRoundRobin 的映射關(guān)系
map.putIfAbsent(identifyString, weightedRoundRobin);
}
// Invoker 權(quán)重不等于 WeightedRoundRobin 中保存的權(quán)重,說(shuō)明權(quán)重變化了,此時(shí)進(jìn)行更新
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
// current + weight
long cur = weightedRoundRobin.increaseCurrent();
// 設(shè)置 lastUpdate,表示近期更新過(guò)
weightedRoundRobin.setLastUpdate(now);
// 找出最大的 current
if (cur > maxCurrent) {
maxCurrent = cur;
// 將具有最大 current 權(quán)重的 Invoker 賦值給 selectedInvoker
selectedInvoker = invoker;
// 將 Invoker 對(duì)應(yīng)的 weightedRoundRobin 賦值給 selectedWRR,留作后用
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
// 對(duì) <identifyString, WeightedRoundRobin> 進(jìn)行檢查厅翔,過(guò)濾掉長(zhǎng)時(shí)間未被更新的節(jié)點(diǎn)。
// 該節(jié)點(diǎn)可能掛了,invokers 中不包含該節(jié)點(diǎn),所以該節(jié)點(diǎn)的 lastUpdate 長(zhǎng)時(shí)間無(wú)法被更新。
// 若未更新時(shí)長(zhǎng)超過(guò)閾值后冰蘑,就會(huì)被移除掉,默認(rèn)閾值為60秒仇箱。
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
if (selectedInvoker != null) {
// 讓 current 減去權(quán)重總和属提,等價(jià)于 current -= totalWeight
selectedWRR.sel(totalWeight);
// 返回具有最大 current 的 Invoker
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}