看負(fù)載均衡器這源碼,好繞兔魂,看的好累。
雖然Spring Cloud
中定義了LoadBalancerClient
作為負(fù)載均衡器的通用接口,并且針對(duì)Ribbon實(shí)現(xiàn)了RibbonLoadBalancerClient
初肉,但是它作為具體實(shí)現(xiàn)客戶端負(fù)載均衡時(shí),是通過(guò)Ribbon的com.netflix.loadbalancer.ILoadBalancer
接口實(shí)現(xiàn)的郊愧。
總結(jié)一下:
ILoadBalancer接口實(shí)現(xiàn)類(lèi)做了以下的一些事情:
1.維護(hù)了存儲(chǔ)服務(wù)實(shí)例Server對(duì)象的二個(gè)列表朴译。一個(gè)用于存儲(chǔ)所有服務(wù)實(shí)例的清單,一個(gè)用于存儲(chǔ)正常服務(wù)的實(shí)例清單
2.初始化得到可用的服務(wù)列表属铁,啟動(dòng)定時(shí)任務(wù)去實(shí)時(shí)的檢測(cè)服務(wù)列表中的服務(wù)的可用性眠寿,并且間斷性的去更新服務(wù)列表,結(jié)合注冊(cè)中心焦蘑。
3.選擇可用的服務(wù)進(jìn)行調(diào)用(這個(gè)一般交給IRule去實(shí)現(xiàn)盯拱,不同的輪詢策略)三個(gè)很重要的概念
- ServerList接口:定義用于獲取服務(wù)器列表的方法的接口,主要實(shí)現(xiàn)DomainExtractingServerList接口,每隔30s種執(zhí)行g(shù)etUpdatedListOfServers方法進(jìn)行服務(wù)列表的更新例嘱。
- ServerListUpdater接口:主要實(shí)現(xiàn)類(lèi)EurekaNotificationServerListUpdater和PollingServerListUpdater(默認(rèn)使用的是PollingServerListUpdater狡逢,結(jié)合Eureka注冊(cè)中心,定時(shí)任務(wù)的方式進(jìn)行服務(wù)列表的更新)
- ServerListFilter接口:根據(jù)LoadBalancerStats然后根據(jù)一些規(guī)則去過(guò)濾部分服務(wù)拼卵,比如根據(jù)zone(區(qū)域感知)去過(guò)濾奢浑。(主要實(shí)現(xiàn)類(lèi)ZonePreferenceServerListFilter的getFilteredListOfServers會(huì)在更新服務(wù)列表的時(shí)候去執(zhí)行)。
com.netflix.loadbalancer.AbstractLoadBalancer
AbstractLoadBalancer contains features required for most loadbalancing
implementations.
An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
that are potentially bucketed based on a specific criteria. 2. A Class that
defines and implements a LoadBalacing Strategy via IRule 3. A
Class that defines and implements a mechanism to determine the
suitability/availability of the nodes/servers in the List.
AbstractLoadBalancer包含大多數(shù)負(fù)載均衡實(shí)現(xiàn)的特征腋腮。
典型的LoadBalancer(負(fù)載均衡器)包括
1.一個(gè)基于某些特征的服務(wù)列表雀彼。
2.一個(gè)通過(guò)IRule定義和實(shí)現(xiàn)負(fù)載均衡戰(zhàn)略的類(lèi)。
3.一個(gè)用來(lái)確定列表節(jié)點(diǎn)/服務(wù)是否可用的類(lèi)即寡。
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public enum ServerGroup{
//所有服務(wù)實(shí)例
ALL,
//正常服務(wù)的實(shí)例
STATUS_UP,
//停止服務(wù)的實(shí)例
STATUS_NOT_UP
}
/**
* 選擇具體的服務(wù)實(shí)例徊哑,key為null,忽略key的條件判斷
*/
public Server chooseServer() {
return chooseServer(null);
}
/**
* 定義了根據(jù)分組類(lèi)型來(lái)獲取不同的服務(wù)實(shí)例的列表聪富。
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);
/**
* 定義了獲取LoadBalancerStats對(duì)象的方法莺丑,LoadBalancerStats對(duì)象被用來(lái)存儲(chǔ)負(fù)載均衡器中
* 各個(gè)服務(wù)實(shí)例當(dāng)前的屬性和統(tǒng)計(jì)信息。這些信息非常有用墩蔓,我們可以利用這些信息來(lái)觀察負(fù)載均衡
* 的運(yùn)行情況梢莽,同時(shí)這些信息也是用來(lái)制定負(fù)載均衡策略的重要依據(jù)萧豆。
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
com.netflix.loadbalancer.BaseLoadBalancer
com.netflix.loadbalancer.BaseLoadBalancer
類(lèi)是Ribbon負(fù)載均衡器的基礎(chǔ)實(shí)現(xiàn)類(lèi),在該類(lèi)中定義了很多關(guān)于負(fù)載均衡器相關(guān)的基礎(chǔ)內(nèi)容蟹漓。
定義并維護(hù)了兩種存儲(chǔ)服務(wù)實(shí)例Server對(duì)象的列表炕横。一個(gè)用于存儲(chǔ)所有服務(wù)實(shí)例的清單,一個(gè)用于存儲(chǔ)正常服務(wù)的實(shí)例清單葡粒。
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
- 定義了之前我們提到的用來(lái)存儲(chǔ)負(fù)載均衡實(shí)例屬性和統(tǒng)計(jì)信息的
LoadBalancerStates
對(duì)象份殿。 - 定義了檢查服務(wù)實(shí)例是否正常服務(wù)的
IPing
對(duì)象,在BaseLoadBalancer
中默認(rèn)為null嗽交,需要在構(gòu)造時(shí)注入它的具體實(shí)現(xiàn)卿嘲。
protected IPing ping = null;
- 定義了檢查服務(wù)實(shí)例操作的執(zhí)行策略對(duì)象
IPingStrategy
,在BaseLoadBalancer
中默認(rèn)使用了該類(lèi)中定義的靜態(tài)內(nèi)部類(lèi)SerialPingStrategy
實(shí)現(xiàn)夫壁。根據(jù)源碼拾枣,我們可以看到該策略采用線性遍歷ping
服務(wù)實(shí)例的方式實(shí)現(xiàn)檢查。該策略在當(dāng)IPing
對(duì)象的實(shí)現(xiàn)速度不理想盒让,或者是Server
列表過(guò)大時(shí)梅肤,可能會(huì)影響到系統(tǒng)性能,這時(shí)候需要通過(guò)實(shí)現(xiàn)IPingStrategy
接口并重寫(xiě)pingServer(IPing ping Server[] servers)
函數(shù)去擴(kuò)展ping
的執(zhí)行策略邑茄。
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
- 定義了負(fù)載均衡的處理原則
IRule
對(duì)象姨蝴,從BaseLoadBalancer
中chooseServer(Object key)
的實(shí)現(xiàn)源碼可以知道,負(fù)載均衡器實(shí)際將服務(wù)實(shí)例選擇的任務(wù)委托給IRule
實(shí)例中的choose
函數(shù)來(lái)實(shí)現(xiàn)肺缕。
默認(rèn)初始化了RoundRobinRule
實(shí)現(xiàn)左医,RoundRobinRule
實(shí)現(xiàn)了最基本且常用的線性負(fù)載均衡規(guī)則。
protected IRule rule = DEFAULT_RULE;
private final static IRule DEFAULT_RULE = new RoundRobinRule();
- 啟動(dòng)ping任務(wù):在
BaseLoadBalancer
的默認(rèn)構(gòu)造函數(shù)中同木,會(huì)直接啟動(dòng)一個(gè)用于定時(shí)檢查Server是否健康的任務(wù)浮梢。該任務(wù)默認(rèn)的執(zhí)行間隔式10s。
/**
* Default constructor which sets name as "default", sets null ping, and
* {@link RoundRobinRule} as the rule.
* <p>
* This constructor is mainly used by {@link ClientFactory}. Calling this
* constructor must be followed by calling {@link #init()} or
* {@link #initWithNiwsConfig(IClientConfig)} to complete initialization.
* This constructor is provided for reflection. When constructing
* programatically, it is recommended to use other constructors.
*/
public BaseLoadBalancer() {
this.name = DEFAULT_NAME;
this.ping = null;
setRule(DEFAULT_RULE);
setupPingTask();
lbStats = new LoadBalancerStats(DEFAULT_NAME);
}
定時(shí)任務(wù)
void setupPingTask() {
if (canSkipPing()) {
return;
}
if (lbTimer != null) {
lbTimer.cancel();
}
lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
true);
lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
forceQuickPing();
}
- 實(shí)現(xiàn)了
ILoadBalancer
接口定義的一系列的基本操作:
addServers(List<Server> newServers)
:向負(fù)載均衡器中增加新的服務(wù)實(shí)例列表彤路,該實(shí)現(xiàn)將原本已經(jīng)維護(hù)的所有服務(wù)實(shí)例清單allServerList
和新傳入的服務(wù)實(shí)例清單newServers
都加入了newList
中秕硝,然后通過(guò)調(diào)用setServersList
函數(shù)對(duì)newList
進(jìn)行處理,在BaseLoadBalancer
中實(shí)現(xiàn)的時(shí)候會(huì)使用新的列表覆蓋舊的列表洲尊。
/**
* Add a list of servers to the 'allServer' list; does not verify
* uniqueness, so you could give a server a greater share by adding it more
* than once
*/
@Override
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("Exception while adding Servers", e);
}
}
}
chooseServer(Object key) :挑選一個(gè)具體的服務(wù)實(shí)例缝裤,
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Throwable t) {
return null;
}
}
}
markServerDown(Server server):標(biāo)記某個(gè)服務(wù)實(shí)例為暫停服務(wù)。
public void markServerDown(Server server) {
if (server == null) {
return;
}
if (!server.isAlive()) {
return;
}
logger.error("LoadBalancer: markServerDown called on ["
+ server.getId() + "]");
server.setAlive(false);
//forceQuickPing();
notifyServerStatusChangeListener(singleton(server));
}
getReachableServers():獲取可用的服務(wù)實(shí)例列表颊郎。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個(gè)正常服務(wù)的實(shí)例清單,所以直接返回即可
@Override
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}
getAllServers():獲取所有的服務(wù)實(shí)例列表霎苗。由于BaseLoadBalancer中單獨(dú)維護(hù)了一個(gè)正常服務(wù)的實(shí)例清單姆吭,所以直接返回即可。
@Override
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
com.netflix.loadbalancer.DynamicServerListLoadBalancer
com.netflix.loadbalancer.DynamicServerListLoadBalancer
類(lèi)繼承com.netflix.loadbalancer.BaseLoadBalancer
類(lèi)唁盏,它是對(duì)基礎(chǔ)負(fù)載均衡器的擴(kuò)展内狸。
該負(fù)載均衡器中检眯,實(shí)現(xiàn)了服務(wù)實(shí)例清單在運(yùn)行期的動(dòng)態(tài)更新能力;同時(shí)昆淡,它還具備了對(duì)服務(wù)實(shí)例清單的過(guò)濾功能锰瘸,也就是說(shuō),我們可以通過(guò)過(guò)濾器來(lái)選擇性的獲取一批服務(wù)實(shí)例清單昂灵。
ServerList
從DynamicServerListLoadBalancer
的成員定義中避凝,我們馬上可以發(fā)現(xiàn)新增了一個(gè)關(guān)于服務(wù)列表的操作對(duì)象ServerList<T> serverListImpl
。從類(lèi)名DynamicServerListLoadBalancer<T extends Server>
發(fā)現(xiàn)T泛型是Server
子類(lèi)眨补,即代表了一個(gè)具體的服務(wù)實(shí)例的擴(kuò)展類(lèi)管削,而ServerList
接口定義如下:
volatile ServerList<T> serverListImpl;
ServerList接口定義如下
/**
* Interface that defines the methods sed to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
//用于獲取初始化的服務(wù)實(shí)例清單
public List<T> getInitialListOfServers();
//獲取更新的服務(wù)實(shí)例清單,每隔30s更新一次
public List<T> getUpdatedListOfServers();
}
其實(shí)現(xiàn)類(lèi):
DynamicServerListLoadBalancer
中的ServerList
默認(rèn)配置到底使用了哪些具體的實(shí)現(xiàn)呢撑螺?既然是該負(fù)載均衡器中實(shí)現(xiàn)服務(wù)實(shí)例的動(dòng)態(tài)更新含思,那么勢(shì)必需要Ribbon
訪問(wèn)Eureka
來(lái)獲取服務(wù)實(shí)例的能力,可以從Ribbon
整合Eureka
的包下去尋找甘晤,
org.springframework.cloud.netflix.ribbon.eureka.EurekaRibbonClientConfiguration
中的含潘,
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
可以看出創(chuàng)建了DomainExtractingServerList
實(shí)例,其內(nèi)部也維護(hù)了ServerList list
线婚,同時(shí)DomainExtractingServerList
類(lèi)中對(duì)getInitialListOfServers
和getUpdatedListOfServers
的具體實(shí)現(xiàn)遏弱,其實(shí)是委托給內(nèi)部定義的ServerList<DiscoveryEnabledServer> list
對(duì)象,而該對(duì)象是通過(guò)創(chuàng)建DiscoveryEnabledNIWSServerList
實(shí)例傳遞進(jìn)去的
org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList
的源碼:
那么DiscoveryEnabledNIWSServerList
是如何實(shí)現(xiàn)這兩個(gè)服務(wù)實(shí)例獲取的呢酌伊?從DiscoveryEnabledNIWSServerList
其源碼的私有方法obtainServersViaDiscovery
通過(guò)服務(wù)發(fā)現(xiàn)機(jī)制來(lái)實(shí)現(xiàn)服務(wù)實(shí)例的獲取的腾窝,
com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList的obtainServersViaDiscovery
方法,
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
//vipAddresses可以理解為邏輯上的服務(wù)名居砖,對(duì)這些服務(wù)名進(jìn)行遍歷奏候,將狀態(tài)為UP(正常服務(wù))的實(shí)例轉(zhuǎn)換成DiscoveryEnabledServer對(duì)象
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
// if targetRegion is null, it will be interpreted as the same region of client
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
// copy is necessary since the InstanceInfo builder just uses the original reference,
// and we don't want to corrupt the global eureka copy of the object which may be
// used by other clients in our system
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
//將這些實(shí)例組織成列表返回。
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
ServerListUpdater
com.netflix.loadbalancer.DynamicServerListLoadBalancer
負(fù)載均衡器使用不同的策略進(jìn)行列表更新的策略蔗草。
上面分析了如何從Eureka Server中獲取服務(wù)實(shí)例清單,那么它又是如何觸發(fā)向Eureka Server去獲取服務(wù)實(shí)例清單以及如何在獲取到服務(wù)實(shí)例清單后更新本地實(shí)例清單呢?
回到com.netflix.loadbalancer.DynamicServerListLoadBalancer
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
//實(shí)現(xiàn)對(duì)服務(wù)列表的更新
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;
updateListOfServers方法
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
ServerListUpdater
的二種實(shí)現(xiàn):
PollingServerListUpdater
:動(dòng)態(tài)服務(wù)列表更新的默認(rèn)策略,也就是說(shuō),DynamicServerListLoadBalancer
負(fù)載均衡器中的默認(rèn)實(shí)現(xiàn)就是它邮弹,它通過(guò)定時(shí)任務(wù)的方式進(jìn)行服務(wù)列表的更新。
EurekaNotificationServerListUpdater
:該更新器也可以服務(wù)于DynamicServerListLoadBalancer
負(fù)載均衡器勺阐,但是它的觸發(fā)機(jī)制與PollingServerListUpdater
不同,它需要利用Eureka的事件監(jiān)聽(tīng)來(lái)驅(qū)動(dòng)服務(wù)列表的更新操作织阅。
查看PollingServerListUpdater
的實(shí)現(xiàn),
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
public PollingServerListUpdater() {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}
public PollingServerListUpdater(IClientConfig clientConfig) {
this(LISTOFSERVERS_CACHE_UPDATE_DELAY, getRefreshIntervalMs(clientConfig));
}
public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
this.initialDelayMs = initialDelayMs;
this.refreshIntervalMs = refreshIntervalMs;
}
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
initialDelayMs
和refreshIntervalMs
的默認(rèn)定義是1000和30*1000震捣,單位是毫秒荔棉。就是說(shuō),更新服務(wù)實(shí)例在初始化之后延遲1s后開(kāi)始執(zhí)行蒿赢,并以30s位周期重復(fù)執(zhí)行润樱,還會(huì)記錄最后更新時(shí)間,是否存活等信息羡棵。
ServerListFilter
volatile ServerListFilter<T> filter;
ServerListFilter接口的定義:
This interface allows for filtering the configured or dynamically obtained List of candidate servers with desirable characteristics.
該接口允許用配置或動(dòng)態(tài)獲取的具有所需特性的候選服務(wù)器列表進(jìn)行過(guò)濾壹若。
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
其中,除了org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter
的實(shí)現(xiàn)是Spring Cloud Ribbon
中對(duì)Netflix Ribbon
的擴(kuò)展實(shí)現(xiàn)外皂冰,其他均是Netflix Ribbon
中的原生實(shí)現(xiàn)類(lèi)店展,
-
com.netflix.loadbalancer.AbstractServerListFilter
:這是一個(gè)抽象的接口,接收一個(gè)重要的依據(jù)對(duì)象LoadBalancerStats
-
com.netflix.loadbalancer.ZoneAffinityServerListFilter
:該過(guò)濾器基于"區(qū)域感知(Zone Affinity)"的方式實(shí)現(xiàn)服務(wù)實(shí)例的過(guò)濾秃流,也就說(shuō)赂蕴,它會(huì)根據(jù)提供服務(wù)的實(shí)例所處于的區(qū)域(Zone)與消費(fèi)者自身所處區(qū)域(Zone)進(jìn)行比較,過(guò)濾掉那些不是同處一個(gè)區(qū)域的實(shí)例
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
com.netflix.niws.loadbalancer.DefaultNIWSServerListFilter
:
該過(guò)濾器完全繼承自ZoneAffinityServerListFilter
舶胀,是默認(rèn)的NIWS(Netfilx Internal Web Service)
過(guò)濾器概说。com.netflix.loadbalancer.ServerListSubsetFilter
:
該過(guò)濾器也繼承自ZoneAffinityServerListFilter
,它非常適用于擁有大規(guī)模服務(wù)器集群(上百或者更多)的系統(tǒng)嚣伐。因?yàn)樗梢援a(chǎn)生一個(gè)“區(qū)域感知”org.springframework.cloud.netflix.ribbon.ZonePreferenceServerListFilter
:
spring cloud
整合時(shí)新增的過(guò)濾器糖赔,若使用Spring Cloud
整合Eureka
和Ribbon
時(shí)會(huì)默認(rèn)使用該過(guò)濾器,它實(shí)現(xiàn)了通過(guò)配置或者Eureka實(shí)例元數(shù)據(jù)的所屬區(qū)域(Zone)來(lái)過(guò)濾同區(qū)域的服務(wù)實(shí)例纤控,它的實(shí)現(xiàn)非常簡(jiǎn)單挂捻,首先通過(guò)ZoneAffinityServerListFilter
的過(guò)濾器來(lái)獲得"區(qū)域感知"的服務(wù)實(shí)例列表,然后遍歷這個(gè)結(jié)果船万,取出根據(jù)消費(fèi)者配置預(yù)設(shè)的區(qū)域Zone來(lái)進(jìn)行過(guò)濾刻撒,如果過(guò)濾的結(jié)果是空就直接返回父類(lèi)的結(jié)果,如果不為空就返回通過(guò)消費(fèi)者的Zone過(guò)濾后的結(jié)果耿导。
@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
List<Server> output = super.getFilteredListOfServers(servers);
if (this.zone != null && output.size() == servers.size()) {
List<Server> local = new ArrayList<Server>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}
com.netflix.loadbalancer.ZoneAwareLoadBalancer
ZoneAwareLoadBalancer
負(fù)載均衡器是對(duì)DynamicServerListLoadBalancer
的擴(kuò)展声怔。在DynamicServerListLoadBalancer
中,我們可以看到它并沒(méi)有重寫(xiě)選擇具體服務(wù)實(shí)例的chooseServer
函數(shù)舱呻,所以它依然會(huì)采用在BaseLoadBalancer
中實(shí)現(xiàn)的算法醋火。使用RoundRobinRule
規(guī)則悠汽,以線性輪詢的方式來(lái)選擇調(diào)用的服務(wù)實(shí)例,該算法實(shí)現(xiàn)簡(jiǎn)單并沒(méi)有區(qū)域(Zone)的概念芥驳,所以它會(huì)把所有實(shí)例視為一個(gè)Zone下的節(jié)點(diǎn)來(lái)看待柿冲,這樣就會(huì)周期性的跨區(qū)域(Zone)訪問(wèn)的情況,由于跨區(qū)域會(huì)產(chǎn)生更高的延遲兆旬,這些實(shí)例主要以防止區(qū)域性故障實(shí)現(xiàn)高可用為目的而不能作為常規(guī)訪問(wèn)的實(shí)例跑杭,所以在多區(qū)域部署的情況會(huì)出現(xiàn)一定的性能問(wèn)題普监,而該負(fù)載均衡器則可以規(guī)避這樣的問(wèn)題剔宪。
在ZoneAwareLoadBalancer
中胀瞪,并沒(méi)有重寫(xiě)setServersList
方法,說(shuō)明實(shí)現(xiàn)服務(wù)實(shí)例清單的更新的主要邏輯沒(méi)有變化脚祟。但是重寫(xiě)了setServerListForZones
方法谬以,DynamicServerListLoadBalancer
中的定義:
setServerListForZones
函數(shù)的調(diào)用位于更新服務(wù)實(shí)例清單函數(shù)setServersList
最后,根據(jù)區(qū)域Zone
分組的實(shí)例列表由桌,為負(fù)載均衡器中的LoadBalancerStats
對(duì)象創(chuàng)建ZoneStats
并放入Map zoneServersMap
集合中为黎,每一個(gè)區(qū)域Zone
對(duì)應(yīng)一個(gè)ZoneStats
,它用于存儲(chǔ)每個(gè)Zone
的一些狀態(tài)和統(tǒng)計(jì)信息沥寥。
在ZoneAwareLoadBalancer
中對(duì)setServerListForZones
重寫(xiě)如下:
@Override
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);碍舍,它存儲(chǔ)每個(gè)Zone區(qū)域?qū)?yīng)的負(fù)載均衡器。
if (balancers == null) {
//創(chuàng)建一個(gè)ConcurrentHashMap類(lèi)型的balancers對(duì)象
balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();
}
//具體的負(fù)載均衡器在getLoadBalancer函數(shù)中完成邑雅,同時(shí)在創(chuàng)建負(fù)載均衡器的時(shí)候會(huì)創(chuàng)建它的規(guī)則
for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
//創(chuàng)建完負(fù)載均衡器的時(shí)候會(huì)馬上調(diào)用setServersList函數(shù)為其設(shè)置對(duì)應(yīng)Zone區(qū)域的實(shí)例清單
getLoadBalancer(zone).setServersList(entry.getValue());
}
// check if there is any zone that no longer has a server
// and set the list to empty so that the zone related metrics does not
// contain stale data
//對(duì)Zone區(qū)域中實(shí)例清單的檢查片橡,看看是否有Zone區(qū)域的實(shí)例清單已經(jīng)沒(méi)有實(shí)例了,是的話就將balancers中對(duì)應(yīng)的Xone區(qū)域的實(shí)例列表清空淮野,該操作
//的作用是為了后續(xù)選擇節(jié)點(diǎn)時(shí)捧书,防止過(guò)多的Zone區(qū)域統(tǒng)計(jì)信息干擾具體實(shí)例的選擇算法
for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}
getLoadBalancer方法
//如果當(dāng)前實(shí)現(xiàn)中沒(méi)有IRule的實(shí)例,就創(chuàng)建一個(gè)AvailabilityFilteringRule規(guī)則骤星,如果有實(shí)現(xiàn)克隆一個(gè)
@VisibleForTesting
BaseLoadBalancer getLoadBalancer(String zone) {
zone = zone.toLowerCase();
BaseLoadBalancer loadBalancer = balancers.get(zone);
if (loadBalancer == null) {
// We need to create rule object for load balancer for each zone
IRule rule = cloneRule(this.getRule());
loadBalancer = new BaseLoadBalancer(this.getName() + "_" + zone, rule, this.getLoadBalancerStats());
BaseLoadBalancer prev = balancers.putIfAbsent(zone, loadBalancer);
if (prev != null) {
loadBalancer = prev;
}
}
return loadBalancer;
}
在了解了負(fù)載均衡器如何擴(kuò)展服務(wù)實(shí)例清單的時(shí)候经瓷,看其怎樣挑選服務(wù)實(shí)例,來(lái)實(shí)現(xiàn)對(duì)區(qū)域的識(shí)別的洞难,
@Override
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
//只有當(dāng)負(fù)載均衡器中維護(hù)的實(shí)例所屬的Zone區(qū)域的個(gè)數(shù)大于1的時(shí)候才會(huì)執(zhí)行這里的策略
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
//調(diào)用ZoneAvoidanceRule.createSnapshot方法舆吮,當(dāng)前的負(fù)載均衡器中所有的Zone區(qū)域分布創(chuàng)建快照,保存在Map zoneSnapshot中
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}
if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
//調(diào)用ZoneAvoidanceRule.getAvailableZones方法队贱,來(lái)獲取可用Zone區(qū)域集合色冀,在該函數(shù)中會(huì)通過(guò)Zone區(qū)域快照的統(tǒng)計(jì)數(shù)據(jù)實(shí)現(xiàn)可用區(qū)的挑選。
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
//當(dāng)獲得的可用Zone區(qū)域集合不為空柱嫌,并且個(gè)數(shù)小于Zone區(qū)域總數(shù)锋恬,就隨機(jī)選擇一個(gè)Zone區(qū)域
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
//在確定了某個(gè)Zone區(qū)域后,則獲取了對(duì)應(yīng)Zone區(qū)域服務(wù)均衡器编丘,并調(diào)用zoneLoadBalancer.chooseServer來(lái)選擇具體的服務(wù)實(shí)例与学,而在
//zoneLoadBalancer.chooseServer中將使用IRule接口的choose函數(shù)來(lái)選擇具體的服務(wù)實(shí)例彤悔,在這里,IRule接口的實(shí)現(xiàn)會(huì)使用ZoneAvoidanceRule來(lái)挑選具體的服務(wù)實(shí)例索守。
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Throwable e) {
logger.error("Unexpected exception when choosing server using zone aware logic", e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
//否則實(shí)現(xiàn)父類(lèi)的策略
return super.chooseServer(key);
}
}