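Ribbon is a client-side load-balancing component open-sourced by Netflix and a very important module in the Spring Cloud family. It is also one of the more complex modules in that family, and it directly affects the quality and performance of service dispatching. A thorough understanding of Ribbon shows every aspect that service dispatching has to take into account when a distributed microservice cluster is running.
This article analyzes Ribbon's design in detail to help you build a better understanding of Spring Cloud.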
1. How Ribbon works under Spring integration
First, an overview diagram of how Spring integrates Ribbon:
Under Spring Cloud integration, Ribbon has the following characteristics:
- How Ribbon services are configured
Each service (client) has its own Spring ApplicationContext, which loads the beans and configuration for that service.
For example, suppose the current Spring Cloud system contains the following services:
Service name | Role | Depends on |
---|---|---|
order | Order module | user |
user | User module | none |
mobile-bff | Mobile BFF | order, user |
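In practice the mobile-bff service calls the order and user modules. Within mobile-bff's Spring context, a separate child ApplicationContext is therefore created for order and another for user, and each one loads the configuration for its own service module. In other words, each client's configuration is independent and is not affected by the others.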
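The per-client isolation described above can be illustrated with a small sketch that does not use any framework. This is not Spring Cloud's code; in Spring Cloud Netflix, SpringClientFactory plays this role. ClientContextRegistry and its methods are hypothetical. The first time a client name is used, it gets its own configuration map, copied from shared defaults. After that, changing the settings for order never affects the settings for user.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: one isolated configuration "context" per client name,
 * created lazily on first use, loosely mirroring Ribbon's per-client child contexts.
 */
final class ClientContextRegistry {
    // Shared defaults every child context starts from (plays the parent-context role).
    private final Map<String, String> parentDefaults;
    // One child map per client name, e.g. "order" and "user".
    private final Map<String, Map<String, String>> children = new ConcurrentHashMap<>();

    ClientContextRegistry(Map<String, String> parentDefaults) {
        this.parentDefaults = parentDefaults;
    }

    /** Returns the context for a client, creating it as a copy of the defaults on first access. */
    Map<String, String> contextFor(String clientName) {
        return children.computeIfAbsent(clientName,
                name -> new ConcurrentHashMap<>(parentDefaults));
    }

    int contextCount() {
        return children.size();
    }
}
```

In this sketch, mobile-bff would call contextFor("order") and contextFor("user"). If it overrides the timeout for order, user still uses the default value.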
- Integration with Feign
When Feign is used as the client, each request is ultimately turned into a URL of the form http://<service-name>/<relative-path-to-service>. LoadBalancerFeignClient extracts the service identifier <service-name> from this URL. It then uses the service name to look up that service's load balancer, FeignLoadBalancer, in the context. The load balancer picks the most suitable service instance based on the statistics it has collected for the existing instances.
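Below is a minimal sketch of the lookup step. It assumes a plain map from service name to balancer. LoadBalancerRegistry and Balancer are hypothetical stand-ins for LoadBalancerFeignClient and FeignLoadBalancer. The sketch reads the service name from the host part of http://<service-name>/<path>, then rewrites the request so that it points at the address of the chosen instance.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a per-service load balancer. */
interface Balancer {
    String chooseHostPort(); // e.g. "10.0.0.5:8080"
}

/** Hypothetical sketch of the Feign-side lookup; not Spring Cloud's real code. */
final class LoadBalancerRegistry {
    private final Map<String, Balancer> balancers = new HashMap<>();

    void register(String serviceName, Balancer balancer) {
        balancers.put(serviceName, balancer);
    }

    /** Rewrites http://<service-name>/<path> into a URL aimed at a concrete instance. */
    URI resolve(URI logical) {
        String serviceName = logical.getHost(); // the "host" is really the service id
        Balancer lb = balancers.get(serviceName);
        if (lb == null) {
            throw new IllegalArgumentException("no load balancer for service " + serviceName);
        }
        String target = lb.chooseHostPort();
        String pathAndQuery = logical.getRawPath()
                + (logical.getRawQuery() == null ? "" : "?" + logical.getRawQuery());
        return URI.create(logical.getScheme() + "://" + target + pathAndQuery);
    }
}
```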
2. How the Feign integration is implemented in Spring Cloud
When Ribbon is combined with Feign, each Feign call is wrapped into a LoadBalancerCommand. The command then sends the request using RxJava's event-driven style. Spring Cloud extracts the service name from the Feign request URL and finds that service's load balancer implementation, FeignLoadBalancer, in the context. It uses the load balancer to pick a suitable Server instance and forwards the call to that instance to complete it. Along the way, it records call statistics for that Server instance. The core of this logic is LoadBalancerCommand.submit:
/**
* Create an {@link Observable} that once subscribed execute network call asynchronously with a server chosen by load balancer.
* If there are any errors that are indicated as retriable by the {@link RetryHandler}, they will be consumed internally by the
* function and will not be observed by the {@link Observer} subscribed to the returned {@link Observable}. If the number of retries
* exceeds the maximum allowed, a final error will be emitted by the returned {@link Observable}. Otherwise, the first successful
* result during execution and retries will be emitted.
*/
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();
if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
// Maximum number of retries on the same server
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// Maximum number of retries on the next server
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// Use the load balancer
// Use the load balancer to pick a suitable server, execute the request on it, and record the request data and behavior in ServerStats
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
// Get the statistics for this server
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);
// Called for each attempt and retry: the actual service call
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount(); // count this attempt
loadBalancerContext.noteOpenConnection(stats); // record an open connection on this server
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
// Start the execution tracer to record how long the call takes
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
// A suitable server has been found: execute the request
// and handle the events once the underlying call produces a result
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// record the statistics for the completed call
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e); // record the failure
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity; // keep the returned value
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop(); // stop timing
// Mark the request as complete and update the server statistics
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
// On failure, retry according to the retry policy:
// Observable.retry re-subscribes while the retryPolicy predicate allows it
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
// Retries on the next server, also driven by the retry operator
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
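The two retry(...) layers are nested. The inner layer retries the same server up to maxRetrysSame times. The outer layer repeats server selection up to maxRetrysNext times. The sketch below shows the resulting attempt count in plain Java without RxJava, assuming that every call fails and that every failure is retriable. RetryModel and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical, synchronous model of the nested retry in LoadBalancerCommand.submit:
 * inner loop = retries on the same server, outer loop = retries on the next server.
 * Assumes every failure is retriable (the real code asks a RetryHandler).
 */
final class RetryModel {
    static <T> T execute(Supplier<String> selectServer,
                         Function<String, T> call,
                         int maxRetriesSame,
                         int maxRetriesNext,
                         List<String> attemptLog) {
        RuntimeException last = null;
        for (int serverAttempt = 0; serverAttempt <= maxRetriesNext; serverAttempt++) {
            String server = selectServer.get();                // like selectServer()
            for (int sameAttempt = 0; sameAttempt <= maxRetriesSame; sameAttempt++) {
                attemptLog.add(server);                        // like context.incAttemptCount()
                try {
                    return call.apply(server);                 // like operation.call(server)
                } catch (RuntimeException e) {
                    last = e;                                  // swallowed and retried, like retry(...)
                }
            }
        }
        throw new IllegalStateException("retries exhausted", last);
    }
}
```

With maxRetrysSame = 1 and maxRetrysNext = 2, each call is tried at most (1 + 1) × (2 + 1) = 6 times across three servers before the final error is emitted.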
Choosing a suitable Server from the ServerList (LoadBalancerContext.getServerFromLoadBalancer):
/**
* Compute the final URI from a partial URI in the request. The following steps are performed:
* <ul>
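* <li> if host is missing and there is a load balancer, get host/port from the server chosen by the load balancer
* <li> if host is missing and there is no load balancer, try to derive host/port from the virtual address
* <li> if host is present, the authority part of the URI is a virtual address, and there is a load balancer, get host/port from the load balancer (the given host is ignored)
* <li> if host is present but neither a load balancer nor a virtual address is configured, use it as the real physical address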
* <li> if host is missing but none of the above applies, throws ClientException
* </ul>
*
* @param original Original URI passed from caller
*/
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}
// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI supplied: the host is missing
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
Server svc = lb.chooseServer(loadBalancerKey); // pick a server with the load balancer
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
// Take the host from the server chosen by the load balancer
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
} else {
// No Full URL - and we dont have a LoadBalancer registered to
// obtain a server
// if we have a vipAddress that came with the registration, we
// can use that else we
// bail out
// Derive host/port from the virtual address configuration
if (vipAddresses != null && vipAddresses.contains(",")) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured."
+ " Also, there are multiple vipAddresses and hence no vip address can be chosen"
+ " to complete this partial uri");
} else if (vipAddresses != null) {
try {
Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
host = hostAndPort.first();
port = hostAndPort.second();
} catch (URISyntaxException e) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured. "
+ " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
}
} else {
throw new ClientException(
ClientException.ErrorType.GENERAL,
this.clientName
+ " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
+ " Also has no vipAddress registered");
}
}
} else {
// Full URL case: the URL carries a full address, which may be a virtual address or a hostAndPort
// This could either be a vipAddress or a hostAndPort or a real DNS
// if vipAddress or hostAndPort, we just have to consult the loadbalancer
// but if it does not return a server, we should just proceed anyways
// and assume its a DNS
// For restClients registered using a vipAddress AND executing a request
// by passing in the full URL (including host and port), we should only
// consult lb IFF the URL passed is registered as vipAddress in Discovery
boolean shouldInterpretAsVip = false;
if (lb != null) {
shouldInterpretAsVip = isVipRecognized(original.getAuthority());
}
if (shouldInterpretAsVip) {
Server svc = lb.chooseServer(loadBalancerKey);
if (svc != null){
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("using LB returned Server: {} for request: {}", svc, original);
return svc;
} else {
// just fall back as real DNS
logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
}
} else {
// consult LB to obtain vipAddress backed instance given full URL
//Full URL execute request - where url!=vipAddress
logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
}
}
// end of creating final URL
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
}
// just verify that at this point we have a full URL
return new Server(host, port);
}
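The branches above reduce to a small decision table. The sketch below rewrites them as a pure function so that each case is easier to see. HostResolver is a hypothetical name. The load balancer is modeled as a Supplier that returns "host:port", or null when no server is available. VIP handling is simplified: the real code parses the host and port from the VIP address and derives the port from the scheme.

```java
import java.net.URI;
import java.util.function.Supplier;

/**
 * Hypothetical, simplified model of the branches in getServerFromLoadBalancer.
 * A null loadBalancer means "no load balancer registered".
 */
final class HostResolver {
    static String resolve(URI original, Supplier<String> loadBalancer,
                          String vipAddresses, boolean authorityIsRegisteredVip) {
        String host = (original == null) ? null : original.getHost();
        if (host == null) {
            // Partial URI: the target must come from the load balancer or a VIP address.
            if (loadBalancer != null) {
                String chosen = loadBalancer.get();
                if (chosen == null) {
                    throw new IllegalStateException("load balancer has no available server");
                }
                return chosen;
            }
            if (vipAddresses != null && vipAddresses.contains(",")) {
                throw new IllegalStateException("multiple vipAddresses, none can be chosen");
            }
            if (vipAddresses != null) {
                return vipAddresses; // the real code parses host and port out of it
            }
            throw new IllegalStateException("no load balancer and no vipAddress registered");
        }
        // Full URL: consult the load balancer only if the authority is a registered VIP.
        if (loadBalancer != null && authorityIsRegisteredVip) {
            String chosen = loadBalancer.get();
            if (chosen != null) {
                return chosen;
            }
            // otherwise fall back to treating the host as a real DNS name
        }
        return original.getAuthority();
    }
}
```

A partial URI such as /users/1 always needs either the load balancer or a VIP address. A full URL goes through the load balancer only when its authority is registered as a VIP. Otherwise, the full URL is used as given.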
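3. LoadBalancer: the core of load balancing
A LoadBalancer has three main responsibilities:
- Maintain the set of servers in the Server list (adding, updating and removing them)
- Maintain the status of the servers in the list (status updates)
- Return the most suitable Server instance when one is requested
This chapter covers each of these three responsibilities in detail.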
3.1 How the load balancer works internally
First, an overview of how the LoadBalancer is put together:
Component | Responsibility | See section |
---|---|---|
Server | Represents a service instance and records its details, such as address, zone, service name and instance ID | |
ServerList | Maintains a list of Server instances. While the application runs, the load balancer chooses from the instances in the ServerList, and the list may change dynamically | 3.2 |
ServerStats | Runtime statistics for one Server, typically its average response time, cumulative request-failure count and circuit-breaker timing. Each ServerStats instance corresponds to exactly one Server instance | |
LoadBalancerStats | Container that maintains the ServerStats instances for all servers | |
ServerListUpdater | Used by the load balancer to refresh the ServerList, for example through a scheduled task that fetches the latest Server list at a fixed interval | 3.2 |
Pinger | Status checker that maintains the status of the servers in the ServerList. Note: Pinger only maintains server status; it cannot decide whether a server is removed | |
PingerStrategy | Defines how servers are checked, for example sequentially or in parallel | |
IPing | The ping itself: how to check whether a server is available, commonly by checking over HTTP or TCP/IP whether it responds normally | |
3.2 How is the Server list maintained (adding, updating, removing)?
When we look only at how the server list is maintained, Ribbon is structured as follows:
Server-list maintenance can be implemented in one of two ways:
- Configuration-based server list
The server list is configured statically, usually in a configuration file. This approach is relatively simple, but it does not mean the list stays fixed while the application runs. Ribbon relies on Netflix's distributed configuration library Archaius, which watches configuration files for changes and pushes those changes to each application. In other words, if we modify a configuration-based server list while the service is running, the list is refreshed without a restart.
- Server list maintained dynamically from the registrations in a service discovery component (such as Eureka)
In Spring Cloud, service registration and discovery is an essential component of a distributed service cluster. It maintains the individual service instances (registration, renewal and deregistration). This article covers the Eureka integration and shows how Ribbon's server list is refreshed dynamically from Eureka's registration information.
Ribbon chooses the implementation through the property <service-name>.ribbon.NIWSServerListClassName:
Strategy | ServerList implementation |
---|---|
Configuration-based | com.netflix.loadbalancer.ConfigurationBasedServerList |
Service-discovery-based | com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList |
The server list may also be updated dynamically at runtime. The update mechanism is determined by <service-name>.ribbon.ServerListUpdaterClassName, which currently has two implementations:
Update strategy | ServerListUpdater implementation |
---|---|
Periodically pull the server list on a scheduled task | com.netflix.loadbalancer.PollingServerListUpdater |
Update on Eureka event notifications | com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater |
- Pulling the server list on a scheduled task
This approach is implemented by com.netflix.loadbalancer.PollingServerListUpdater. The updater runs a periodic scheduled task that pulls the latest server list and applies it to the load balancer. Its core logic is as follows:
public class PollingServerListUpdater implements ServerListUpdater {
private static final Logger logger = LoggerFactory.getLogger(PollingServerListUpdater.class);
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
// Updater thread pool definition and shutdown-hook setup
private static class LazyHolder {
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private static Thread _shutdownThread;
static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
static {
int coreSize = poolSizeProp.get();
ThreadFactory factory = (new ThreadFactoryBuilder())
.setNameFormat("PollingServerListUpdater-%d")
.setDaemon(true)
.build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
shutdownExecutorPool();
}
});
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown();
if (_shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(_shutdownThread);
} catch (IllegalStateException ise) { // NOPMD
// this can happen if we're in the middle of a real
// shutdown,
// and that's 'ok'
}
}
}
}
}
// some code omitted
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// Create the task that runs the update at a fixed interval
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
// Run the update; the UpdateAction itself is defined in the LoadBalancer
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
// Schedule the task
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,    // initial delay
refreshIntervalMs, // refresh interval
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
// some code omitted
}
As the code above shows, ServerListUpdater only defines how updates are triggered. The work of an update is encapsulated in an UpdateAction:
/**
* an interface for the updateAction that actually executes a server list update
*/
public interface UpdateAction {
void doUpdate();
}
// DynamicServerListLoadBalancer implements the concrete update action:
public DynamicServerListLoadBalancer() {
this.isSecure = false;
this.useTunnel = false;
this.serverListUpdateInProgress = new AtomicBoolean(false);
this.updateAction = new UpdateAction() {
public void doUpdate() {
// Refresh the server list
DynamicServerListLoadBalancer.this.updateListOfServers();
}
};
}
@VisibleForTesting
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
// Fetch the latest server list from the ServerList
if (this.serverListImpl != null) {
servers = this.serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
// Run the returned list through the ServerListFilter
if (this.filter != null) {
servers = this.filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers);
}
}
// Push the new list into the load balancer
this.updateAllServerList(servers);
}
protected void updateAllServerList(List<T> ls) {
if (this.serverListUpdateInProgress.compareAndSet(false, true)) {
try {
for (T s : ls) {
// mark as alive so clients can use these servers right away instead of waiting for the ping cycle
s.setAlive(true);
}
this.setServersList(ls);
super.forceQuickPing();
} finally {
this.serverListUpdateInProgress.set(false);
}
}
}
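If we remove Archaius and logging, the polling updater is a ScheduledExecutorService that runs an update action with a fixed delay, guarded by an isActive flag. The sketch below keeps the UpdateAction idea, but all class names are hypothetical. The intervals are parameters here instead of Ribbon's defaults (1 s initial delay, 30 s interval).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical simplification of PollingServerListUpdater. */
final class SimplePollingUpdater {
    interface UpdateAction { void doUpdate(); }

    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "SimplePollingUpdater");
        t.setDaemon(true); // like the real updater, never block JVM shutdown
        return t;
    });
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile long lastUpdated;

    synchronized void start(UpdateAction action, long initialDelayMs, long refreshIntervalMs) {
        if (!isActive.compareAndSet(false, true)) {
            return; // already active: no-op
        }
        Runnable wrapper = () -> {
            try {
                action.doUpdate();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                // scheduleWithFixedDelay stops rescheduling if a run throws,
                // so one failed cycle must be caught here
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdown();
    }

    long getLastUpdated() { return lastUpdated; }
}
```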
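- Updating on Eureka event notifications
The Eureka-based approach works differently. When the local Eureka client refreshes its copy of the registry, it publishes a CacheRefreshedEvent to registered listeners such as EurekaNotificationServerListUpdater. The updater then triggers a refresh of the ServerList: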
public class EurekaNotificationServerListUpdater implements ServerListUpdater {
// some code omitted
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
// Create the Eureka event listener, which runs the update logic when Eureka's data changes
this.updateListener = new EurekaEventListener() {
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
// Coalesce events: allow at most one queued update
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}
if (!refreshExecutor.isShutdown()) {
try {
// Submit the update task to the executor's queue
refreshExecutor.submit(new Runnable() {
@Override
public void run() {
try {
updateAction.doUpdate(); // perform the actual update
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
else {
logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
stop();
}
}
}
};
// EurekaClient instance
if (eurekaClient == null) {
eurekaClient = eurekaClientProvider.get();
}
// Register the event listener with the EurekaClient
if (eurekaClient != null) {
eurekaClient.registerEventListener(updateListener);
} else {
logger.error("Failed to register an updateListener to eureka client, eureka client is null");
throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
}
} else {
logger.info("Update listener already registered, no-op");
}
}
}
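The key detail in EurekaNotificationServerListUpdater is the updateQueued flag. Because of it, a burst of cache-refresh events collapses into at most one pending update. The sketch below isolates that pattern. CoalescingUpdater and onCacheRefreshed are hypothetical names, and a plain callback replaces EurekaEventListener.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of the "at most one queued update" pattern. */
final class CoalescingUpdater {
    private final AtomicBoolean updateQueued = new AtomicBoolean(false);
    private final AtomicLong lastUpdated = new AtomicLong();
    private final ExecutorService refreshExecutor;
    private final Runnable updateAction;

    CoalescingUpdater(ExecutorService refreshExecutor, Runnable updateAction) {
        this.refreshExecutor = refreshExecutor;
        this.updateAction = updateAction;
    }

    /** Called for every registry-change event; returns false if the event was coalesced. */
    boolean onCacheRefreshed() {
        if (!updateQueued.compareAndSet(false, true)) {
            return false; // an update is already queued: no-op
        }
        try {
            refreshExecutor.submit(() -> {
                try {
                    updateAction.run();
                    lastUpdated.set(System.currentTimeMillis());
                } finally {
                    updateQueued.set(false); // let the next event queue a new update
                }
            });
            return true;
        } catch (RuntimeException e) {
            updateQueued.set(false); // submit failed (e.g. rejected): reset the flag
            return false;
        }
    }
}
```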
3.2.1 Related configuration properties
Property | Description | Applies when | Default |
---|---|---|---|
<service-name>.ribbon.NIWSServerListClassName | ServerList implementation (see above) | | ConfigurationBasedServerList |
<service-name>.ribbon.listOfServers | Server list in hostname:port form, comma-separated | The ServerList implementation is configuration-based | |
<service-name>.ribbon.ServerListUpdaterClassName | Server-list update strategy implementation (see above) | | PollingServerListUpdater |
<service-name>.ribbon.ServerListRefreshInterval | Server-list refresh interval, in milliseconds | The list is pulled on a scheduled task | 30000 (30 s) |
3.2.2 Ribbon's default implementation
By default, Ribbon uses the configuration below: a configuration-based server list, refreshed every 30 s by a scheduled task that pulls the list.
<service-name>.ribbon.NIWSServerListClassName=com.netflix.loadbalancer.ConfigurationBasedServerList
<service-name>.ribbon.listOfServers=<ip:port>,<ip:port>
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.loadbalancer.PollingServerListUpdater
<service-name>.ribbon.ServerListRefreshInterval=30000
### Updater thread pool size
DynamicServerListLoadBalancer.ThreadPoolSize=2
3.2.3 Configuration under Spring Cloud integration
Under Spring Cloud integration with Eureka, the server list is maintained from service discovery instead, with a configuration along these lines:
<service-name>.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList
<service-name>.ribbon.ServerListUpdaterClassName=com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater
### Updater thread pool size
EurekaNotificationServerListUpdater.ThreadPoolSize=2
### Notification queue capacity
EurekaNotificationServerListUpdater.queueSize=1000
3.3 How does the load balancer maintain instance status?
The Ribbon load balancer delegates instance-status maintenance to Pinger, PingerStrategy and IPing, which interact as follows:
/**
* Defines the strategy for pinging servers to check whether they are valid: sequentially or in parallel. Either way, the pings must not affect one another.
*
*/
public interface IPingStrategy {
boolean[] pingServers(IPing ping, Server[] servers);
}
/**
* Defines how to ping a server to check whether it is valid
* @author stonse
*
*/
public interface IPing {
/**
* Checks whether the given <code>Server</code> is "alive" i.e. should be
* considered a candidate while loadbalancing
* (i.e. checks whether the server is alive)
*/
public boolean isAlive(Server server);
}
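The IPingStrategy comment mentions serial and parallel pinging. Ribbon's default, shown in 3.3.1, is serial. Below is a sketch of a parallel variant built on CompletableFuture. It uses simplified, hypothetical Server and IPing types so that it compiles on its own. If one server's ping is slow or throws an exception, only that server is marked dead; the others are unaffected.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Trimmed-down stand-ins for Ribbon's types (hypothetical, for a self-contained sketch). */
final class Server {
    final String hostPort;
    Server(String hostPort) { this.hostPort = hostPort; }
}

interface IPing { boolean isAlive(Server server); }

interface IPingStrategy { boolean[] pingServers(IPing ping, Server[] servers); }

/** Hypothetical parallel strategy: pings all servers concurrently, waiting up to timeoutMs per result. */
final class ParallelPingStrategy implements IPingStrategy {
    private final ExecutorService executor;
    private final long timeoutMs;

    ParallelPingStrategy(ExecutorService executor, long timeoutMs) {
        this.executor = executor;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        @SuppressWarnings("unchecked")
        CompletableFuture<Boolean>[] futures = new CompletableFuture[servers.length];
        for (int i = 0; i < servers.length; i++) {
            Server s = servers[i];
            futures[i] = CompletableFuture.supplyAsync(() -> ping != null && ping.isAlive(s), executor);
        }
        boolean[] results = new boolean[servers.length]; // default answer is DEAD (false)
        for (int i = 0; i < futures.length; i++) {
            try {
                results[i] = futures[i].get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // ExecutionException or TimeoutException: leave this server marked as not alive
            }
        }
        return results;
    }
}
```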
3.3.1 Creating the ping scheduled task
By default, the load balancer creates a periodic scheduled task internally. The task is controlled by these properties:
Property | Description | Default |
---|---|---|
<service-name>.ribbon.NFLoadBalancerPingInterval | Interval of the ping task | 30 s |
<service-name>.ribbon.NFLoadBalancerMaxTotalPingTime | Ping timeout (maximum total ping time) | 2 s |
<service-name>.ribbon.NFLoadBalancerPingClassName | IPing implementation class | DummyPing (always returns true) |
The default PingStrategy pings serially and checks each instance's availability in turn:
/**
* Default implementation for <c>IPingStrategy</c>, performs ping
* serially, which may not be desirable, if your <c>IPing</c>
* implementation is slow, or you have large number of servers.
*/
private static class SerialPingStrategy implements IPingStrategy {
@Override
public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false; /* Default answer is DEAD. */
try {
// Check each server in order and record the result in the array
if (ping != null) {
results[i] = ping.isAlive(servers[i]);
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
3.3.2 Ribbon's default IPing implementation: DummyPing
The default IPing implementation always returns true:
public class DummyPing extends AbstractLoadBalancerPing {
public DummyPing() {
}
public boolean isAlive(Server server) {
return true;
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
Besides DummyPing, Ribbon provides other IPing implementations, such as NoOpPing, PingConstant and PingUrl.
3.3.3 IPing under Spring Cloud integration: NIWSDiscoveryPing
With Spring Cloud integration, Eureka handles service registration and discovery, and the default IPing implementation is NIWSDiscoveryPing. NIWSDiscoveryPing does not send an actual ping. Instead, it takes Ribbon's Server status from the instance status that Eureka reports:
/**
* "Ping" Discovery Client
* i.e. we dont do a real "ping". We just assume that the server is up if Discovery Client says so
* @author stonse
*
*/
public class NIWSDiscoveryPing extends AbstractLoadBalancerPing {
BaseLoadBalancer lb = null;
public NIWSDiscoveryPing() {
}
public BaseLoadBalancer getLb() {
return lb;
}
/**
* Non IPing interface method - only set this if you care about the "newServers Feature"
* @param lb
*/
public void setLb(BaseLoadBalancer lb) {
this.lb = lb;
}
public boolean isAlive(Server server) {
boolean isAlive = true;
// Use the Eureka instance status as the Ribbon server status
if (server!=null && server instanceof DiscoveryEnabledServer){
DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
InstanceInfo instanceInfo = dServer.getInstanceInfo();
if (instanceInfo!=null){
InstanceStatus status = instanceInfo.getStatus();
if (status!=null){
isAlive = status.equals(InstanceStatus.UP);
}
}
}
return isAlive;
}
@Override
public void initWithNiwsConfig(
IClientConfig clientConfig) {
}
}
The Spring Cloud bean definition that installs it by default:
@Bean
@ConditionalOnMissingBean
public IPing ribbonPing(IClientConfig config) {
if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
return this.propertiesFactory.get(IPing.class, config, serviceId);
}
NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
ping.initWithNiwsConfig(config);
return ping;
}
3.4 How is a suitable instance chosen from the server list?
3.4.1 The instance container: ServerList
The load balancer manages all service instances through the ServerList, as follows:
The basic interface definition is very simple:
/**
* Interface that defines the methods used to obtain the List of Servers
* @author stonse
*
* @param <T>
*/
public interface ServerList<T extends Server> {
// Get the initial server list
public List<T> getInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
* (i.e. get the updated server list)
*/
public List<T> getUpdatedListOfServers();
}
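In the configuration-based strategy, the list returned by getInitialListOfServers() and getUpdatedListOfServers() comes from <service-name>.ribbon.listOfServers, which contains comma-separated hostname:port entries. The sketch below shows such a ServerList. It reads the property through a Supplier, so a changed value (for example, one refreshed by Archaius) is used on the next call. StaticServerList and the simplified Server and ServerList types are hypothetical. The default port of 80 for entries without a port is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Trimmed-down stand-ins for Ribbon's types (hypothetical). */
final class Server {
    final String host;
    final int port;
    Server(String host, int port) { this.host = host; this.port = port; }
    @Override public String toString() { return host + ":" + port; }
}

interface ServerList<T extends Server> {
    List<T> getInitialListOfServers();
    List<T> getUpdatedListOfServers();
}

/** Hypothetical config-backed ServerList: re-reads "host:port,host:port" on every call. */
final class StaticServerList implements ServerList<Server> {
    private final Supplier<String> listOfServers;

    StaticServerList(Supplier<String> listOfServers) { this.listOfServers = listOfServers; }

    @Override public List<Server> getInitialListOfServers() { return parse(listOfServers.get()); }

    @Override public List<Server> getUpdatedListOfServers() { return parse(listOfServers.get()); }

    private static List<Server> parse(String value) {
        List<Server> servers = new ArrayList<>();
        if (value == null) {
            return servers;
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            // Assumption: an entry without a port defaults to 80.
            String host = colon < 0 ? trimmed : trimmed.substring(0, colon);
            int port = colon < 0 ? 80 : Integer.parseInt(trimmed.substring(colon + 1));
            servers.add(new Server(host, port));
        }
        return servers;
    }
}
```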
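In Ribbon's implementation, the ServerList maintains the Server instances and returns the latest List<Server> for the LoadBalancer to use.
Responsibility of ServerList<Server>:
the ServerList maintains the service instances. The load balancer then applies a ServerListFilter to narrow them down to the List<Server> that meets its requirements.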
3.4.2 The instance-list filter: ServerListFilter
The job of the ServerListFilter is simple:
given a list of service instances, it returns the instances that satisfy the filter conditions.
public interface ServerListFilter<T extends Server> {
public List<T> getFilteredListOfServers(List<T> servers);
}
3.4.2.1 Ribbon's default ServerListFilter implementation: ZoneAffinityServerListFilter
By default Ribbon uses a zone-affinity filtering strategy: from the Server list, it keeps the servers that are in the same zone as the current instance.
Ribbon has the following related configuration parameters:
Property | Description | Default |
---|---|---|
<service-name>.ribbon.EnableZoneAffinity | Whether zone affinity is enabled | false |
<service-name>.ribbon.EnableZoneExclusivity | Whether zone exclusivity is enforced, i.e. only instances in the current zone are returned | false |
<service-name>.ribbon.zoneAffinity.maxLoadPerServer | Threshold for the average number of active requests per server | 0.6 |
<service-name>.ribbon.zoneAffinity.maxBlackOutServesrPercentage | Maximum percentage of circuit-tripped servers | 0.8 |
<service-name>.ribbon.zoneAffinity.minAvailableServers | Minimum number of available instances | 2 |
Its core implementation is shown below:
public class ZoneAffinityServerListFilter<T extends Server> extends
AbstractServerListFilter<T> implements IClientConfigAware {
@Override
public List<T> getFilteredListOfServers(List<T> servers) {
// The zone is known, zone affinity or exclusivity is on, and the server list is not empty
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
// Filter the server list with the zone predicate
List<T> filteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
// If zone affinity should apply, return the filtered list
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
// Decide whether the zone-affinity filter should be applied
private boolean shouldEnableZoneAffinity(List<T> filtered) {
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
// Get the load balancer statistics
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
// Take a zone snapshot of the filtered servers, including their statistics
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
// Average load, i.e. the mean number of active requests per server
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
// Fall back to the unfiltered list if:
// 1. the share of circuit-tripped servers reaches the limit (default 0.8),
// 2. or the load per server reaches the limit (default 0.6),
// 3. or fewer servers are available than the minimum (default 2)
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
}
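The following example writes the same decision as a pure function and applies it to sample numbers, to make the three override conditions concrete. It follows the if statement in shouldEnableZoneAffinity and uses the default thresholds from the table above (0.8, 0.6, 2). ZoneAffinityCheck is a hypothetical name, and the snapshot values in the check are example inputs.

```java
/** Hypothetical restatement of the override test in ZoneAffinityServerListFilter. */
final class ZoneAffinityCheck {
    static final double MAX_BLACKOUT_PERCENTAGE = 0.8; // maxBlackOutServesrPercentage
    static final double MAX_LOAD_PER_SERVER = 0.6;     // maxLoadPerServer
    static final int MIN_AVAILABLE_SERVERS = 2;        // minAvailableServers

    /** Returns true if the zone-filtered list should be used, false to fall back to all servers. */
    static boolean useZoneFilteredList(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        double blackoutRatio = (double) circuitTrippedCount / instanceCount;
        int available = instanceCount - circuitTrippedCount;
        boolean override = blackoutRatio >= MAX_BLACKOUT_PERCENTAGE
                || loadPerServer >= MAX_LOAD_PER_SERVER
                || available < MIN_AVAILABLE_SERVERS;
        return !override;
    }
}
```

With the defaults, take a zone with 4 instances, 1 tripped breaker and 0.3 active requests per server. That zone keeps zone affinity: 25% of its servers are tripped and 3 are available. The same zone falls back to the unfiltered list in two cases: its load reaches 0.6 requests per server, or only one healthy instance is left.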
The decision flow is as follows:
3.4.2.2 ServerListFilter implementation 2: ZonePreferenceServerListFilter
ZonePreferenceServerListFilter is provided by Spring Cloud Netflix and extends ZoneAffinityServerListFilter. It starts from the result of ZoneAffinityServerListFilter and narrows it further to servers in the same zone as the local service.
Core logic: when does it take effect?
It takes effect when the current service's zone is known and ZoneAffinityServerListFilter did not remove any servers. In that case, ZonePreferenceServerListFilter returns the Server list for the current zone, as long as that list is not empty.
public class ZonePreferenceServerListFilter extends ZoneAffinityServerListFilter<Server> {
private String zone;
@Override
public void initWithNiwsConfig(IClientConfig niwsClientConfig) {
super.initWithNiwsConfig(niwsClientConfig);
if (ConfigurationManager.getDeploymentContext() != null) {
this.zone = ConfigurationManager.getDeploymentContext().getValue(
ContextKey.zone);
}
}
@Override
public List<Server> getFilteredListOfServers(List<Server> servers) {
// Get the filtered result from the parent class
List<Server> output = super.getFilteredListOfServers(servers);
// If the parent filtered nothing out, prefer servers in the local zone
if (this.zone != null && output.size() == servers.size()) {
List<Server> local = new ArrayList<>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}
public String getZone() {
return zone;
}
public void setZone(String zone) {
this.zone = zone;
}
}
3.4.2.3 ServerListFilter implementation 3: ServerListSubsetFilter
This filter is meant for very large server lists, for example hundreds of Server instances. Keeping long-lived HTTP connections to all of them is not appropriate, so the filter keeps a subset of the servers and drops the rest, which releases connections that are not needed.
It also extends ZoneAffinityServerListFilter and builds on it. It is rarely used in practice, so this article does not cover it further.
3.4.3 How the LoadBalancer chooses a service instance
The core function of the LoadBalancer is to pick the most suitable service instance from the server list according to load. Internally it uses the components shown in the following figure:
The LoadBalancer chooses an instance as follows:
- Get the currently available service instances from the ServerList;
- Filter that list with the ServerListFilter, keeping the instances that satisfy the filter conditions;
- Apply the Rule, together with the instances' statistics, to return one instance that satisfies the rule.
This flow shows that instance selection has two filtering stages: first the ServerListFilter, then the selection rule of the IRule.
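The three steps can be sketched as a small pipeline: fetch the list, filter it, then let a rule pick one server. The rule here is plain round robin. Ribbon ships a RoundRobinRule, but this sketch is not its code. SelectionPipeline and the string-based server representation are hypothetical simplifications.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

/** Hypothetical sketch of ServerList -> ServerListFilter -> IRule. */
final class SelectionPipeline {
    private final Supplier<List<String>> serverList;             // step 1: ServerList
    private final UnaryOperator<List<String>> filter;             // step 2: ServerListFilter
    private final AtomicInteger nextIndex = new AtomicInteger();  // state for the round-robin rule

    SelectionPipeline(Supplier<List<String>> serverList, UnaryOperator<List<String>> filter) {
        this.serverList = serverList;
        this.filter = filter;
    }

    /** Step 3: a round-robin rule over the filtered list; returns null if nothing is left. */
    String choose() {
        List<String> candidates = filter.apply(serverList.get());
        if (candidates.isEmpty()) {
            return null;
        }
        int i = Math.floorMod(nextIndex.getAndIncrement(), candidates.size());
        return candidates.get(i);
    }

    /** Example filter: keep only servers whose name starts with the given zone prefix. */
    static UnaryOperator<List<String>> zonePrefix(String zone) {
        return servers -> servers.stream().filter(s -> s.startsWith(zone)).collect(Collectors.toList());
    }
}
```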
Filtering with the ServerListFilter has been covered in detail above. Next we look at how a Rule picks one server from a list of candidates.
Before introducing Rule, we need one more concept: Server statistics.
After the LoadBalancer gives the application a suitable Server, the application sends its request to that Server. During the request, the application records statistics such as response time and network connection status. The next time the application asks the LoadBalancer for a Server, the LoadBalancer combines each server's statistics with the Rule to decide which server is most suitable.
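The sketch below shows one way a rule can use these statistics. It picks the server with the fewest active requests and skips servers whose circuit breaker has tripped. It is similar in spirit to Ribbon's BestAvailableRule. However, the Stats type and the LeastActiveRule class are hypothetical simplifications and are not part of Ribbon's API.

```java
import java.util.List;

/** Hypothetical per-server statistics (a tiny subset of what ServerStats tracks). */
final class Stats {
    final String server;
    final int activeRequestsCount;
    final boolean circuitBreakerTripped;

    Stats(String server, int activeRequestsCount, boolean circuitBreakerTripped) {
        this.server = server;
        this.activeRequestsCount = activeRequestsCount;
        this.circuitBreakerTripped = circuitBreakerTripped;
    }
}

/** Hypothetical rule: fewest active requests among servers whose breaker is not tripped. */
final class LeastActiveRule {
    static String choose(List<Stats> candidates) {
        Stats best = null;
        for (Stats s : candidates) {
            if (s.circuitBreakerTripped) {
                continue; // tripped servers are skipped entirely
            }
            if (best == null || s.activeRequestsCount < best.activeRequestsCount) {
                best = s;
            }
        }
        return best == null ? null : best.server;
    }
}
```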
3.4.3.1 What statistics does the LoadBalancer keep about each Server instance?
ServerStats field | Description | Type | Default |
---|---|---|---|
zone | Availability zone the server belongs to | Config | Can be set via eureka.instance.meta.zone |
totalRequests | Total number of requests; incremented on every client call | Runtime | 0 |
activeRequestsCountTimeout | Effective time window for the active request count, configured by niws.loadbalancer.serverStats.activeRequestsCount.effectiveWindowSeconds; if activeRequestsCount does not change within this window, it is reset to 0 | Config | 60*10 (seconds) |
successiveConnectionFailureCount | Number of consecutive connection failures | Runtime | |
connectionFailureThreshold | Connection failure threshold, configured by niws.loadbalancer.default.connectionFailureCountThreshold | Config | 3 |
circuitTrippedTimeoutFactor | Circuit breaker timeout factor, niws.loadbalancer.default.circuitTripTimeoutFactorSeconds | Config | 10 (seconds) |
maxCircuitTrippedTimeout | Maximum circuit breaker timeout in seconds, niws.loadbalancer.default.circuitTripMaxTimeoutSeconds | Config | 30 (seconds) |
totalCircuitBreakerBlackOutPeriod | Cumulative circuit breaker blackout period | Runtime | (ms) |
lastAccessedTimestamp | Time of the last access | Runtime | |
lastConnectionFailedTimestamp | Time of the last connection failure | Runtime | |
firstConnectionTimestamp | Time of the first connection | Runtime | |
activeRequestsCount | Number of currently active requests | Runtime | |
failureCountSlidingWindowInterval | Time window for counting failures | Config | 1000 (ms) |
serverFailureCounts | Number of connection failures in the current time window | Runtime | |
responseTimeDist.mean | Mean request response time | Runtime | (ms) |
responseTimeDist.max | Maximum request response time | Runtime | (ms) |
responseTimeDist.minimum | Minimum request response time | Runtime | (ms) |
responseTimeDist.stddev | Standard deviation of request response time | Runtime | (ms) |
dataDist.sampleSize | Number of QoS sample points | Runtime | |
dataDist.timestamp | Time the QoS statistics were last computed | Runtime | |
dataDist.timestampMillis | Time the QoS statistics were last computed, in milliseconds since 1970-01-01 | Runtime | |
dataDist.mean | QoS: mean request response time within the most recent time window | Runtime | |
dataDist.10thPercentile | QoS: 10th percentile of request processing time | Runtime | (ms) |
dataDist.25thPercentile | QoS: 25th percentile of request processing time | Runtime | (ms) |
dataDist.50thPercentile | QoS: 50th percentile of request processing time | Runtime | (ms) |
dataDist.75thPercentile | QoS: 75th percentile of request processing time | Runtime | (ms) |
dataDist.95thPercentile | QoS: 95th percentile of request processing time | Runtime | (ms) |
dataDist.99thPercentile | QoS: 99th percentile of request processing time | Runtime | (ms) |
dataDist.99.5thPercentile | QoS: 99.5th percentile of request processing time | Runtime | (ms) |
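The activeRequestsCountTimeout row describes a count that expires when it stops changing. The following is a minimal sketch of that idea. ActiveRequestStats is a hypothetical class. Passing the current time in explicitly, instead of reading the clock, is an assumption made so the behavior is easy to test.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an active-request counter that expires when it stops changing.
class ActiveRequestStats {
    private final AtomicInteger activeRequests = new AtomicInteger();
    private final long windowMillis;          // effective window, e.g. 600 s
    private volatile long lastChangeMillis;   // when the count last changed

    ActiveRequestStats(long windowSeconds) {
        this.windowMillis = windowSeconds * 1000L;
    }

    void incrementActive(long nowMillis) {
        activeRequests.incrementAndGet();
        lastChangeMillis = nowMillis;
    }

    void decrementActive(long nowMillis) {
        if (activeRequests.decrementAndGet() < 0) {
            activeRequests.set(0);            // never report a negative count
        }
        lastChangeMillis = nowMillis;
    }

    int getActiveRequestsCount(long nowMillis) {
        int count = activeRequests.get();
        if (count == 0) {
            return 0;
        }
        if (nowMillis - lastChangeMillis > windowMillis) {
            activeRequests.set(0);            // stale: no change within the window
            return 0;
        }
        return count;
    }
}
```

Expiring a stale count protects rules such as BestAvailableRule from a counter that leaked, for example after a request was never marked as finished.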
3.4.3.2 How the server circuit breaker works
When a service has multiple instances, the load balancer records the outcome of every request, such as response time and whether a network error occurred. When requests to one instance keep failing and the failures accumulate, the load balancer marks that service instance and sets a circuit-breaker period for it. During this period the load balancer temporarily removes the instance from the list of available instances and prefers other instances for incoming requests.
The related statistics are:
ServerStats field | Description | Type | Default |
---|---|---|---|
successiveConnectionFailureCount | Number of consecutive connection failures | Runtime | |
connectionFailureThreshold | Connection failure threshold, configured by niws.loadbalancer.default.connectionFailureCountThreshold; once successiveConnectionFailureCount reaches this limit, a blackout period is computed | Config | 3 |
circuitTrippedTimeoutFactor | Circuit breaker timeout factor, niws.loadbalancer.default.circuitTripTimeoutFactorSeconds | Config | 10 (seconds) |
maxCircuitTrippedTimeout | Maximum circuit breaker timeout in seconds, niws.loadbalancer.default.circuitTripMaxTimeoutSeconds | Config | 30 (seconds) |
totalCircuitBreakerBlackOutPeriod | Cumulative circuit breaker blackout period | Runtime | (ms) |
lastAccessedTimestamp | Time of the last access | Runtime | |
lastConnectionFailedTimestamp | Time of the last connection failure | Runtime | |
firstConnectionTimestamp | Time of the first connection | Runtime | |
@Monitor(name="CircuitBreakerTripped", type = DataSourceType.INFORMATIONAL)
public boolean isCircuitBreakerTripped() {
return isCircuitBreakerTripped(System.currentTimeMillis());
}
public boolean isCircuitBreakerTripped(long currentTime) {
// Timestamp at which the current blackout ends (0 if the breaker is not tripped)
long circuitBreakerTimeout = getCircuitBreakerTimeout();
if (circuitBreakerTimeout <= 0) {
return false;
}
return circuitBreakerTimeout > currentTime; // still inside the blackout window: the breaker is tripped
}
// Get the time at which the blackout expires
private long getCircuitBreakerTimeout() {
long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
if (blackOutPeriod <= 0) {
return 0;
}
return lastConnectionFailedTimestamp + blackOutPeriod;
}
// Return the blackout period in milliseconds
private long getCircuitBreakerBlackoutPeriod() {
int failureCount = successiveConnectionFailureCount.get();
int threshold = connectionFailureThreshold.get();
// Consecutive failures below the threshold: the blackout period is 0, i.e. the server is available
if (failureCount < threshold) {
return 0;
}
// Once failures reach the threshold, trip the breaker; the blackout period is computed as follows:
int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
blackOutSeconds = maxCircuitTrippedTimeout.get();
}
return blackOutSeconds * 1000L;
}
Computing the blackout time
- Check whether the consecutive connection failure count successiveConnectionFailureCount has reached the connection failure threshold connectionFailureThreshold. If successiveConnectionFailureCount < connectionFailureThreshold, the limit has not been reached and the blackout time is 0. Otherwise, continue with step 2.
- Compute the failure exponent, capped at 16: diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold)
- Compute the blackout time from the timeout factor circuitTrippedTimeoutFactor: int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
- The blackout time may not exceed the upper bound maxCircuitTrippedTimeout.
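Once consecutive connection failures trip the breaker, the defaults apply: connectionFailureThreshold = 3, circuitTrippedTimeoutFactor = 10 s and maxCircuitTrippedTimeout = 30 s. With these values the request blackout lasts between 10 s and 30 s. During this period the Server is skipped.
That is:
Minimum blackout: (1 << 0) * 10 = 10 s (failureCount equals the threshold, so diff = 0)
Maximum blackout: capped at maxCircuitTrippedTimeout = 30 s (without the cap, the largest value would be (1 << 16) * 10 = 655360 s)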
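The numbers above can be checked with a small standalone version of getCircuitBreakerBlackoutPeriod(). BlackoutCalculator is a hypothetical helper. It takes the threshold, factor and cap as plain parameters instead of reading dynamic properties.

```java
// Hypothetical standalone copy of the blackout computation shown above.
class BlackoutCalculator {
    static long blackoutMillis(int failureCount, int threshold,
                               int timeoutFactorSeconds, int maxTimeoutSeconds) {
        if (failureCount < threshold) {
            return 0L;                                         // below threshold: not tripped
        }
        int diff = Math.min(failureCount - threshold, 16);     // exponent capped at 16
        int blackOutSeconds = (1 << diff) * timeoutFactorSeconds;
        if (blackOutSeconds > maxTimeoutSeconds) {
            blackOutSeconds = maxTimeoutSeconds;               // never exceed the configured cap
        }
        return blackOutSeconds * 1000L;
    }
}
```

With the defaults (threshold 3, factor 10, cap 30), failure counts 3, 4 and 5 give 10 s, 20 s and 30 s. The 5th failure would yield 40 s before the cap is applied.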
When are the circuit breaker statistics reset?
The circuit breaker statistics have their own reset policy. They are cleared to zero in the following cases:
- A request fails with an exception that is not a circuit-tripping exception (which exceptions count as circuit-tripping can be customized).
- A request completes without an exception and returns a result.
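The reset rules can be sketched as follows. FailureTracker and recordOutcome are hypothetical names. Modelling "is this a circuit-tripping exception" as a Predicate<Throwable> is an assumption that stands in for the customizable check mentioned above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical tracker for consecutive connection failures and their reset rules.
class FailureTracker {
    private final AtomicInteger successiveConnectionFailureCount = new AtomicInteger();
    private final Predicate<Throwable> isCircuitTrippingException;
    private volatile long lastConnectionFailedTimestamp;

    FailureTracker(Predicate<Throwable> isCircuitTrippingException) {
        this.isCircuitTrippingException = isCircuitTrippingException;
    }

    // error == null means the request completed normally and returned a result.
    void recordOutcome(Throwable error, long nowMillis) {
        if (error != null && isCircuitTrippingException.test(error)) {
            successiveConnectionFailureCount.incrementAndGet();
            lastConnectionFailedTimestamp = nowMillis;
        } else {
            // Success, or an exception not classified as circuit-tripping: reset to zero.
            successiveConnectionFailureCount.set(0);
        }
    }

    int getSuccessiveConnectionFailureCount() {
        return successiveConnectionFailureCount.get();
    }

    long getLastConnectionFailedTimestamp() {
        return lastConnectionFailedTimestamp;
    }
}
```

Because any non-tripping outcome resets the count, only an unbroken run of connection-level failures can push a server past connectionFailureThreshold.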
3.4.3.3 IRule: choosing the most suitable Server instance from the instance list
As the figure above shows, an IRule returns the most suitable Server instance from the service list according to its own rules. Its interface is defined as follows:
public interface IRule{
/*
* choose one alive server from lb.allServers or
* lb.upServers according to key
*
* @return choosen Server object. NULL is returned if none
* server is available
*/
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
Ribbon provides several common rules:
Implementation | Description | Notes |
---|---|---|
RoundRobinRule | Round-robin selection; the selection loop retries up to 10 times | |
RandomRule | Picks a server from the list at random | |
ZoneAvoidanceRule | Rule based on the ZoneAvoidancePredicate and AvailabilityPredicate predicates. ZoneAvoidancePredicate works out which Zone performs worst and removes that Zone's servers from the list; AvailabilityPredicate filters out servers whose circuit breaker is currently tripped. From the servers that pass both predicates, one is picked round-robin | |
BestAvailableRule | Best-available rule: picks the Server with the fewest concurrent requests from the list | |
RetryRule | Uses the decorator pattern to add a retry mechanism to another Rule | |
WeightedResponseTimeRule | Weights servers by their response times; while the weighting is not yet in effect, it falls back to RoundRobinRule's strategy | |
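The sketch below illustrates one row of the table by following the BestAvailableRule description. It skips servers whose breaker is tripped and picks the one with the fewest active requests. ServerState and BestAvailableSketch are hypothetical. The sketch returns null when nothing qualifies and does not model any fallback the real rule may have.

```java
import java.util.List;

// Hypothetical snapshot of the per-server statistics the rule needs.
class ServerState {
    final String id;
    final int activeRequests;
    final boolean circuitTripped;

    ServerState(String id, int activeRequests, boolean circuitTripped) {
        this.id = id;
        this.activeRequests = activeRequests;
        this.circuitTripped = circuitTripped;
    }
}

class BestAvailableSketch {
    // Picks the non-tripped server with the fewest active requests; null if none qualifies.
    static ServerState choose(List<ServerState> servers) {
        ServerState best = null;
        for (ServerState s : servers) {
            if (s.circuitTripped) {
                continue;                      // skip servers in their blackout period
            }
            if (best == null || s.activeRequests < best.activeRequests) {
                best = s;
            }
        }
        return best;
    }
}
```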
3.4.3.3.1 RoundRobinRule implementation
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
// Retry up to 10 times
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
// Compute the next round-robin index
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
/**
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
}
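One detail of incrementAndGetModulo is easy to miss. The counter starts at 0 but is incremented before use, so the first server chosen is at index 1, not 0. The standalone copy of the CAS loop below makes the sequence visible; RoundRobinCounter is a hypothetical wrapper.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around the same CAS loop as incrementAndGetModulo above.
class RoundRobinCounter {
    private final AtomicInteger counter = new AtomicInteger(0);

    int next(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next;                   // only one thread wins each transition
            }
        }
    }
}
```

With three servers the indexes come out as 1, 2, 0, 1, and so on. The compareAndSet loop lets concurrent callers share the counter without a lock: a thread that loses the race simply rereads the value and tries again.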
3.4.3.3.2 ZoneAvoidanceRule implementation
How ZoneAvoidanceRule works:
- ZoneAvoidancePredicate works out which Zone performs worst and removes that Zone's servers from the list;
- AvailabilityPredicate removes servers whose circuit breaker is tripped;
- From the servers left after these two steps, one instance is picked round-robin, as in RoundRobinRule, and returned.
How ZoneAvoidancePredicate removes the worst Zone:
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
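// If the share of tripped instances reaches triggeringBlackoutPercentage (or the load is invalid), remove the zone from the available zones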
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
// Track the worst zone(s), i.e. those with the highest load per server
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
// If the maximum load is below the trigger and no zone was excluded, return all available zones
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
// Randomly pick one of the worst zones and remove it, to keep the overall service highly available
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
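The following is a simplified, self-contained sketch of the same zone-removal logic. ZoneStats and ZoneAvoidanceSketch are hypothetical. Among the worst zones, the sketch picks the zone to avoid uniformly at random, whereas Ribbon's randomChooseZone may weigh the choice differently. It also returns an empty set instead of null for an empty snapshot.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical per-zone snapshot, mirroring the fields read by getAvailableZones above.
class ZoneStats {
    final int instanceCount;
    final int circuitTrippedCount;
    final double loadPerServer;

    ZoneStats(int instanceCount, int circuitTrippedCount, double loadPerServer) {
        this.instanceCount = instanceCount;
        this.circuitTrippedCount = circuitTrippedCount;
        this.loadPerServer = loadPerServer;
    }
}

class ZoneAvoidanceSketch {
    static Set<String> availableZones(Map<String, ZoneStats> snapshot, double triggeringLoad,
                                      double triggeringBlackoutPercentage, Random random) {
        Set<String> available = new HashSet<>(snapshot.keySet());
        if (available.size() <= 1) {
            return available;                           // nothing to choose between
        }
        List<String> worst = new ArrayList<>();
        double maxLoad = 0;
        boolean limited = false;
        for (Map.Entry<String, ZoneStats> e : snapshot.entrySet()) {
            ZoneStats z = e.getValue();
            if (z.instanceCount == 0
                    || (double) z.circuitTrippedCount / z.instanceCount >= triggeringBlackoutPercentage
                    || z.loadPerServer < 0) {
                available.remove(e.getKey());           // empty, mostly tripped, or invalid load
                limited = true;
            } else if (Math.abs(z.loadPerServer - maxLoad) < 0.000001d) {
                worst.add(e.getKey());                  // ties with the current worst load
            } else if (z.loadPerServer > maxLoad) {
                maxLoad = z.loadPerServer;              // new worst zone
                worst.clear();
                worst.add(e.getKey());
            }
        }
        if (maxLoad < triggeringLoad && !limited) {
            return available;                           // load is fine everywhere
        }
        if (!worst.isEmpty()) {
            available.remove(worst.get(random.nextInt(worst.size())));
        }
        return available;
    }
}
```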
4. Ribbon configuration parameters
Parameter | Description | Default |
---|---|---|
<service-name>.ribbon.NFLoadBalancerPingInterval | Interval of the scheduled ping task | 30 s |
<service-name>.ribbon.NFLoadBalancerMaxTotalPingTime | Ping timeout | 2 s |
<service-name>.ribbon.NFLoadBalancerRuleClassName | IRule implementation class | ZoneAvoidanceRule in Spring Cloud Netflix; a plain BaseLoadBalancer without a configured rule uses RoundRobinRule (round-robin selection) |
<service-name>.ribbon.NFLoadBalancerPingClassName | IPing implementation class | DummyPing, which always returns true |
<service-name>.ribbon.NFLoadBalancerClassName | Load balancer implementation class (can be set to a custom implementation) | ZoneAwareLoadBalancer (Spring Cloud Netflix default) |
<service-name>.ribbon.NIWSServerListClassName | ServerList implementation class | ConfigurationBasedServerList, a server list read from configuration |
<service-name>.ribbon.ServerListUpdaterClassName | Server list updater class | PollingServerListUpdater |
<service-name>.ribbon.NIWSServerListFilterClassName | Service instance filter | ZonePreferenceServerListFilter (Spring Cloud Netflix default) |
<service-name>.ribbon.ServerListRefreshInterval | Server list refresh interval | 30 s |
5. Conclusion
Ribbon is a core module of the Spring Cloud framework and is responsible for load-balanced service calls. It can also be used on its own, outside Spring Cloud.
Ribbon is also a client-side load-balancing framework. Each client maintains its own call statistics independently, isolated from other clients, so Ribbon's load-balancing behavior is not exactly the same on every machine.
Ribbon is also the most complex part of the whole framework. To keep services highly available, its control flow exposes many fine-grained parameters. When using it, you need to understand how each step works, which makes troubleshooting much more efficient.