接著上篇的文章來。
負(fù)載均衡算法
- 代碼出處括蝠,使用了一個工具類完成了負(fù)載均衡后端節(jié)點(diǎn)的選取
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
- 看下這里的源碼
public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {
LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
return loadBalance.select(upstreamList, ip);
}
- 可以看出這里是通過
SPI
的機(jī)制,根據(jù)傳入算法名稱跷叉,運(yùn)行時(shí)動態(tài)得到具體的負(fù)載均衡實(shí)現(xiàn),由擴(kuò)展加載器加載到的對應(yīng)的負(fù)載均衡實(shí)例 - 先看看這里的
SPI
擴(kuò)展點(diǎn)機(jī)制,然后再去看具體的負(fù)載均衡算法
SPI擴(kuò)展點(diǎn)機(jī)制
-
org.dromara.soul.spi.ExtensionLoader
關(guān)鍵代碼
public static <T> ExtensionLoader<T> getExtensionLoader(final Class<T> clazz) {
if (clazz == null) {
throw new NullPointerException("extension clazz is null");
}
if (!clazz.isInterface()) {
throw new IllegalArgumentException("extension clazz (" + clazz + ") is not interface!");
}
if (!clazz.isAnnotationPresent(SPI.class)) {
throw new IllegalArgumentException("extension clazz (" + clazz + ") without @" + SPI.class + " Annotation");
}
ExtensionLoader<T> extensionLoader = (ExtensionLoader<T>) LOADERS.get(clazz);
if (extensionLoader != null) {
return extensionLoader;
}
LOADERS.putIfAbsent(clazz, new ExtensionLoader<>(clazz));
return (ExtensionLoader<T>) LOADERS.get(clazz);
}
- 使用
SPI
機(jī)制是soul
提供出來的擴(kuò)展點(diǎn)性芬,如果使用用戶想要實(shí)現(xiàn)一套自己的負(fù)載均衡算法峡眶,則只要根據(jù)soul
的規(guī)范剧防,并做相應(yīng)的配置就可以完成了植锉。
// TODO 寫一個簡單例子演示
- 我們了解下 擴(kuò)展點(diǎn)加載器
ExtensionLoader
,該加載器參考了dubbo
中的ExtensionLoader
峭拘,為其縮減版俊庇;對比之下,主要減少了依賴注入功能鸡挠。跟jdk
原生的SPI
機(jī)制相比辉饱,實(shí)現(xiàn)了按需加載,因jdk
會一次性將所有的擴(kuò)展實(shí)現(xiàn)一次性加載的拣展。 - 關(guān)于
SPI
擴(kuò)展點(diǎn)機(jī)制 -
soul
中SPI
擴(kuò)展點(diǎn)機(jī)制就先介紹到這里彭沼,我們看下soul
提供的幾種負(fù)載均衡算法
負(fù)載均衡算法實(shí)現(xiàn)
- 從源碼得出,負(fù)載均衡算法實(shí)現(xiàn)默認(rèn)???3種
random
备埃、roundRobin
,hash
random
-加權(quán)隨機(jī)
- 看代碼實(shí)際上是一個加權(quán)的隨機(jī)算法
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 計(jì)算所有后端節(jié)點(diǎn)的權(quán)重值
int totalWeight = calculateTotalWeight(upstreamList);
// 判斷是否所有的節(jié)點(diǎn)為相同權(quán)重的情況
boolean sameWeight = isAllUpStreamSameWeight(upstreamList);
if (totalWeight > 0 && !sameWeight) {
// 不是姓惑,則要加權(quán)隨機(jī)
return random(totalWeight, upstreamList);
}
// If the weights are the same or the weights are 0 then random
// 如果各節(jié)點(diǎn)權(quán)重都相同,則按照總的節(jié)點(diǎn)數(shù)N按脚,生成一個1-N之間的隨機(jī)數(shù)X于毙,然后選取出該X節(jié)點(diǎn)
return random(upstreamList);
}
- 加權(quán)隨機(jī)的實(shí)現(xiàn)稍稍復(fù)雜點(diǎn),看下源碼
private DivideUpstream random(final int totalWeight, final List<DivideUpstream> upstreamList) {
// If the weights are not the same and the weights are greater than 0, then random by the total number of weights
// 先拿到隨機(jī)數(shù)為區(qū)間辅搬,為1-總權(quán)重值N之間
int offset = RANDOM.nextInt(totalWeight);
// Determine which segment the random value falls on
// 遍歷所有的節(jié)點(diǎn)唯沮,每次循環(huán)都用區(qū)間=區(qū)間-當(dāng)前權(quán)重值,然后再看區(qū)間是否小于0堪遂;
// 若小于0介蛉,則證明剛好位于當(dāng)前節(jié)點(diǎn)的區(qū)間,返回當(dāng)前節(jié)點(diǎn)溶褪;若沒有币旧,則繼續(xù)循環(huán)
for (DivideUpstream divideUpstream : upstreamList) {
offset -= getWeight(divideUpstream);
if (offset < 0) {
return divideUpstream;
}
}
// 最后返回第一個兜底
return upstreamList.get(0);
}
roundRobin
-加權(quán)輪詢
- 加權(quán)輪詢的算法會有些難懂,我們先看下關(guān)鍵方法
doSelect
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
//TODO question 如果這批upstream第一個節(jié)點(diǎn)經(jīng)常變更竿滨,可能會導(dǎo)致前面節(jié)點(diǎn)生成的WeightedRoundRobin數(shù)據(jù)無法釋放
String key = upstreamList.get(0).getUpstreamUrl();
// 取出此批后端服務(wù)的權(quán)重對象 key -> 后端server佳恬,為ip:port value -> 后端server對應(yīng)的權(quán)重對象
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
// 總權(quán)重值
int totalWeight = 0;
// 當(dāng)前請求權(quán)重的最大值,每次請求都會置為MIN_VALUE
long maxCurrent = Long.MIN_VALUE;
// 當(dāng)前系統(tǒng)時(shí)間
long now = System.currentTimeMillis();
// 選中節(jié)點(diǎn)
DivideUpstream selectedInvoker = null;
// 選中節(jié)點(diǎn)的權(quán)重
WeightedRoundRobin selectedWRR = null;
//TODO question 輪詢這里為什么沒有break于游,應(yīng)該找到選中節(jié)點(diǎn)后就跳出循環(huán)毁葱,不需要繼續(xù)循環(huán)下去
for (DivideUpstream upstream : upstreamList) {
// rkey -> 后端server,ip:port
String rKey = upstream.getUpstreamUrl();
// 后端server對應(yīng)的權(quán)重對象
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
// 當(dāng)前后端節(jié)點(diǎn)的權(quán)重值
int weight = getWeight(upstream);
// 如果當(dāng)前后端server權(quán)重對象不存在贰剥,則需生成當(dāng)前后端server的權(quán)重對象倾剿,并添加到內(nèi)存
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
// 節(jié)點(diǎn)權(quán)重值發(fā)生了變化,則需要重新設(shè)置
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
// 權(quán)重值發(fā)生變化后,將內(nèi)存中后端server權(quán)重更新為最新權(quán)重值
// 同時(shí)其內(nèi)部current值也將更新為0前痘,回到初始狀態(tài)
weightedRoundRobin.setWeight(weight);
}
// 內(nèi)部序列從0開始增凛捏,每次進(jìn)來都+自己的權(quán)重值
long cur = weightedRoundRobin.increaseCurrent();
// 重新設(shè)置更新時(shí)間
weightedRoundRobin.setLastUpdate(now);
// 如果節(jié)點(diǎn)當(dāng)前輪詢權(quán)重大于此次調(diào)用的最大權(quán)重值,則滿足項(xiàng)芹缔,可以作為選中節(jié)點(diǎn)
// 這里選中節(jié)點(diǎn)后坯癣,并不會停止循環(huán),會遍歷所有的節(jié)點(diǎn)最欠,將最后一個滿足條件的節(jié)點(diǎn)作為選中節(jié)點(diǎn)
// 這里是為了找到當(dāng)前權(quán)重最大的那個后端server示罗,將其作為選中節(jié)點(diǎn)
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
// 總權(quán)重
totalWeight += weight;
}
// 更新標(biāo)志鎖位未更新 后端節(jié)點(diǎn)數(shù)不等于原有節(jié)點(diǎn)數(shù) 采用compareAndSet取鎖,獲取到鎖之后才更新
if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
// 新的map
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
// 如果從輪詢開始芝硬,到結(jié)束蚜点,中間間隔了1min鐘,則需要移除該后端server的權(quán)重對象數(shù)據(jù)
// 這里是為了移除掉那些下線的節(jié)點(diǎn)拌阴,因?yàn)橹灰斜惠喸兩芑妫銵astUpdate=now;而超過1min鐘迟赃,都沒有輪詢到的陪拘,則就是已下線的節(jié)點(diǎn)
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
// 將新的后端節(jié)點(diǎn)的權(quán)重?cái)?shù)據(jù)替換舊有的
methodWeightMap.put(key, newMap);
} finally {
// 最終將鎖的標(biāo)志為設(shè)置為false
updateLock.set(false);
}
}
// 選中節(jié)點(diǎn)的處理
if (selectedInvoker != null) {
// 選中節(jié)點(diǎn)的內(nèi)部計(jì)數(shù)的當(dāng)前權(quán)重current要減去總的權(quán)重值,降低其當(dāng)前權(quán)重捺氢,也就意味著其下次被選中的優(yōu)先級被降低
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return upstreamList.get(0);
}
- 因不懂其思想藻丢,在
debug
源碼過程中,將關(guān)鍵變量變化的過程用表格記錄了下來摄乒,方便跟蹤邏輯
- 從上圖可以看出:當(dāng)權(quán)重為50:50時(shí)悠反,4次調(diào)用其最終選取出來的結(jié)果為1:1;當(dāng)權(quán)重為10:20時(shí)馍佑,6次調(diào)用其最終選中出來的結(jié)果為1:2斋否,前3次調(diào)用最終選取出來的結(jié)果也為1:2。
-
//TODO
很是神奇拭荤,不知曉其中原理 實(shí)現(xiàn)邏輯看起來比較復(fù)雜茵臭,簡單說來就是 對于每個 DivideUpstream 都維護(hù)一個權(quán)重對象,每次遍歷所有權(quán)重對象獲取到權(quán)重最大的那個 DivideUpstream舅世,同時(shí)減去被選中的 DivideUpstream 的權(quán)重(減去總權(quán)重旦委,優(yōu)先級降為最低)
- 參考:【高性能網(wǎng)關(guān)soul學(xué)習(xí)】11. 插件之DevidePlugin
hash
-IP哈希
- 實(shí)際上是基于遠(yuǎn)程客戶端
IP
的一個hash
負(fù)載均衡算法 -
doSelect
的源碼
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 并發(fā)跳表map key->后端server的hash值 value->后端節(jié)點(diǎn)
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
for (DivideUpstream address : upstreamList) {
// 每個實(shí)際后端節(jié)點(diǎn)會生成5個虛擬節(jié)點(diǎn),用于更大范圍映射雏亚,類似于一致性hash
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
// 后端server的hash
long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
treeMap.put(addressHash, address);
}
}
// 遠(yuǎn)程客戶端ip的hash
long hash = hash(String.valueOf(ip));
// 如果 請求地址的Hash值 右邊存在節(jié)點(diǎn)缨硝,返回右邊第一個
// treeMap.tailMap(hash) 會返回所有(key >= hash)的映射集合,同時(shí)是有序的罢低。
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
// 如果找到了查辩,則從找到的entry拿出后端節(jié)點(diǎn)
if (!lastRing.isEmpty()) {
// 這里也就是取 url hash 值右邊的第一個節(jié)點(diǎn)
return lastRing.get(lastRing.firstKey());
}
// 沒找到,則直接去第一個節(jié)點(diǎn)兜底
return treeMap.firstEntry().getValue();
}
- 有點(diǎn)高級,內(nèi)部使用了
ConcurrentSkipListMap
結(jié)構(gòu)宜岛,以及一致性hash
中虛擬節(jié)點(diǎn)的技術(shù)长踊。 - 關(guān)于
ConcurrentSkipListMap
- 關(guān)于
一致性hash
算法
總結(jié)
-
Divide
插件這塊的內(nèi)容還挺多的萍倡,后面還需要將負(fù)載均衡這塊的邏輯重點(diǎn)看一看身弊,搞清楚roundRobin
與hash
算法的實(shí)現(xiàn),同時(shí)也去看看其他框架中這兩種算法的實(shí)現(xiàn) - 已分析
Divide
插件中重點(diǎn)塊:后端節(jié)點(diǎn)的探活機(jī)制遣铝、負(fù)載均衡算法 - 后續(xù)還有
http
服務(wù)的請求轉(zhuǎn)發(fā)和websocket
服務(wù)的支持 - 關(guān)于負(fù)載均衡分析的好文章