引言
在分析Sentinel的上一篇文章中,我們知道了它是基于滑動(dòng)窗口做的流量統(tǒng)計(jì),那么在當(dāng)我們能夠根據(jù)流量統(tǒng)計(jì)算法拿到流量的實(shí)時(shí)數(shù)據(jù)后秸滴,下一步要做的事情自然就是基于這些數(shù)據(jù)做流控柑土。在介紹Sentinel
的流控模型之前,我們先來(lái)簡(jiǎn)單看下 Sentinel 后臺(tái)是如何去定義一個(gè)流控規(guī)則的
對(duì)于上圖的配置Sentinel
把它抽象成一個(gè)FlowRule
類(lèi),與其屬性一一對(duì)應(yīng)
- resource 資源名
- limitApp 限流來(lái)源,默認(rèn)為default不區(qū)分來(lái)源
- grade 限流類(lèi)型,有QPS和并發(fā)線程數(shù)兩種類(lèi)型
- count 限流閾值
- strategy 流控策略 1. 直接 2. 關(guān)聯(lián) 3.鏈路
- controlBehavior 流控效果 1.快速失敗 2.預(yù)熱啟動(dòng) 3.排隊(duì)等待 4. 預(yù)熱啟動(dòng)排隊(duì)等待
- warmUpPeriodSec 流控效果為預(yù)熱啟動(dòng)時(shí)的預(yù)熱時(shí)長(zhǎng)(秒)
- maxQueueingTimeMs 流控效果為排隊(duì)等待時(shí)的等待時(shí)長(zhǎng) (毫秒)
下面我們來(lái)看下選擇流控策略和流控效果的核心代碼
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {
// 根據(jù)流控策略選擇需要流控的Node維度節(jié)點(diǎn)
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 獲取配置的流控效果 控制器 (1. 直接拒絕 2. 預(yù)熱啟動(dòng) 3. 排隊(duì) 4. 預(yù)熱啟動(dòng)排隊(duì)等待)
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
上面的代碼比較簡(jiǎn)單流程也很清晰笋轨,首先根據(jù)我們配置的流控策略獲取到合適維度的 Node 節(jié)點(diǎn)(Node節(jié)點(diǎn)是Sentinel做流量統(tǒng)計(jì)的基本單位),然后再獲取到規(guī)則中配置的流控效果控制器(1. 直接拒絕 2. 預(yù)熱啟動(dòng) 3. 排隊(duì)等待 4.預(yù)熱啟動(dòng)排隊(duì)等待)。
流控策略
下面我們來(lái)看下選擇流控策略的源碼分析
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// 獲取限流來(lái)源 limitApp
String limitApp = rule.getLimitApp();
// 獲取限流策略
int strategy = rule.getStrategy();
// 獲取當(dāng)前 上下文的 來(lái)源
String origin = context.getOrigin();
// 如果規(guī)則配置的限流來(lái)源 limitApp 等于 當(dāng)前上下文來(lái)源
if (limitApp.equals(origin) && filterOrigin(origin)) {
// 且配置的流控策略是 直接關(guān)聯(lián)策略
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接返回當(dāng)前來(lái)源 origin 節(jié)點(diǎn)
return context.getOriginNode();
}
// 配置的策略為關(guān)聯(lián)或則鏈路
return selectReferenceNode(rule, context, node);
// 如果規(guī)則配置的限流來(lái)源 limitApp 等于 default
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
// 且配置的流控策略是 直接關(guān)聯(lián)策略
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接返回當(dāng)前資源的 clusterNode
return node.getClusterNode();
}
// 配置的策略為關(guān)聯(lián)或則鏈路
return selectReferenceNode(rule, context, node);
// 如果規(guī)則配置的限流來(lái)源 limitApp 等于 other爵政,且當(dāng)前上下文origin不在流控規(guī)則策略中
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
// 且配置的流控策略是 直接關(guān)聯(lián)策略
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
// 配置的策略為關(guān)聯(lián)或則鏈路
return selectReferenceNode(rule, context, node);
}
return null;
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
// 關(guān)聯(lián)資源名稱(chēng) (如果策略是關(guān)聯(lián) 則是關(guān)聯(lián)的資源名稱(chēng)仅讽,如果策略是鏈路 則是上下文名稱(chēng))
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
// 策略是關(guān)聯(lián)
if (strategy == RuleConstant.STRATEGY_RELATE) {
// 返回關(guān)聯(lián)的資源ClusterNode
return ClusterBuilderSlot.getClusterNode(refResource);
}
// 策略是鏈路
if (strategy == RuleConstant.STRATEGY_CHAIN) {
// 當(dāng)前上下文名稱(chēng)不是規(guī)則配置的name 直接返回null
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
這段代碼的邏輯判斷比較多,我們稍微理一下整個(gè)過(guò)程
-
LimitApp
的作用域只在配置的流控策略為RuleConstant.STRATEGY_DIRECT
(直接關(guān)聯(lián))時(shí)起作用钾挟。其有三種配置洁灵,分別為default
,origin_name
掺出,other
- default 如果配置為default徽千,表示統(tǒng)計(jì)不區(qū)分來(lái)源,當(dāng)前資源的任何來(lái)源流量都會(huì)被統(tǒng)計(jì)(其實(shí)就是選擇 Node 為 clusterNode 維度)
- origin_name 如果配置為指定名稱(chēng)的 origin_name汤锨,則只會(huì)對(duì)當(dāng)前配置的來(lái)源流量做統(tǒng)計(jì)
- other 如果配置為other 則會(huì)對(duì)其他全部來(lái)源生效但不包括第二條配置的來(lái)源
- 當(dāng)策略配置為 RuleConstant.STRATEGY_RELATE 或 RuleConstant.STRATEGY_CHAIN 時(shí)
- STRATEGY_RELATE 關(guān)聯(lián)其他的指定資源双抽,如資源A想以資源B的流量狀況來(lái)決定是否需要限流,這時(shí)資源A規(guī)則配置可以使用 STRATEGY_RELATE 策略
- STRATEGY_CHAIN 對(duì)指定入口的流量限流闲礼,因?yàn)榱髁靠梢杂卸鄠€(gè)不同的入口(EntranceNode)
- 對(duì)于上面幾點(diǎn)節(jié)點(diǎn)之前的關(guān)系不清楚的可以去看我這篇文章開(kāi)頭的總覽圖 https://www.cnblogs.com/taromilk/p/11750962.html
流控效果
關(guān)于流控效果的配置有四種牍汹,我們來(lái)看下它們的初始化代碼
/**
* class com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil
*/
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
// 只有Grade為統(tǒng)計(jì) QPS時(shí) 才可以選擇除默認(rèn)流控效果外的 其他流控效果控制器
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
// 預(yù)熱啟動(dòng)
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
// 超過(guò) 閾值 排隊(duì)等待 控制器
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
// 上面兩個(gè)的結(jié)合體
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 默認(rèn)控制器 超過(guò) 閾值 直接拒絕
return new DefaultController(rule.getCount(), rule.getGrade());
}
可以比較清晰的看到總共對(duì)應(yīng)有四種流控器的初始化
直接拒絕
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 獲取當(dāng)前qps
int curCount = avgUsedTokens(node);
// 判斷是否已經(jīng)大于閾值
if (curCount + acquireCount > count) {
// 如果當(dāng)前流量具有優(yōu)先級(jí),則會(huì)提前去獲取未來(lái)的通過(guò)資格
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
此種策略比較簡(jiǎn)單粗暴柬泽,超過(guò)流量閾值的會(huì)直接拒絕慎菲。不過(guò)這里有一個(gè)小細(xì)節(jié),如果入口流量prioritized為true锨并,也就是優(yōu)先級(jí)比較高露该,則會(huì)通過(guò)占用未來(lái)時(shí)間窗口的名額來(lái)實(shí)現(xiàn)。這個(gè)在上一篇文章有介紹到
預(yù)熱啟動(dòng)
WarmUpController
主要是用來(lái)防止流量的突然上升琳疏,使系統(tǒng)本在穩(wěn)定狀態(tài)下能處理的有决,但是由于許多資源沒(méi)有預(yù)熱闸拿,導(dǎo)致處理不了了空盼。注意這里的預(yù)熱并不是指系統(tǒng)啟動(dòng)之后的一次性預(yù)熱,而是指系統(tǒng)在運(yùn)行的任何時(shí)候流量從低峰到突增的預(yù)熱階段新荤。
下面我們來(lái)看下WarmUpController
的具體實(shí)現(xiàn)類(lèi)
/**
* WarmUpController 構(gòu)造方法
* @param count 當(dāng)前qps閾值
* @param warmUpPeriodInSec 預(yù)熱時(shí)長(zhǎng) 秒
* @param coldFactor 冷啟動(dòng)系數(shù) 默認(rèn)為3
*/
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// 剩余Token的警戒值揽趾,小于警戒值系統(tǒng)就進(jìn)入正常運(yùn)行期
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// 系統(tǒng)最冷時(shí)候的剩余Token數(shù)
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 系統(tǒng)預(yù)熱的速率(斜率)
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
// 計(jì)算當(dāng)前的 剩余 token 數(shù)
syncToken(previousQps);
// 如果進(jìn)入了警戒線,開(kāi)始調(diào)整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
// 計(jì)算剩余token超出警戒值的值
long aboveToken = restToken - warningToken;
// 計(jì)算當(dāng)前允許通過(guò)的最大 qps
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// 不在預(yù)熱階段苛骨,則直接判斷當(dāng)前qps是否大于閾值
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
首先是構(gòu)造方法篱瞎,主要關(guān)注2個(gè)重要參數(shù)
- warningToken 剩余token的警戒值
- maxToken 剩余的最大token數(shù),如果剩余token數(shù)等于maxToken痒芝,則說(shuō)明系統(tǒng)處于最冷階段
要理解這兩個(gè)參數(shù)的含義俐筋,可以參考令牌桶算法,每通過(guò)一個(gè)請(qǐng)求严衬,就會(huì)從令牌桶中取走一個(gè)令牌澄者。那么試想一下,當(dāng)令牌桶中的令牌達(dá)到最大值是,是不是意味著系統(tǒng)目前處于最冷階段粱挡,因?yàn)橥袄锏牧钆剖冀K處于一個(gè)非常飽和的狀態(tài)赠幕。這里的令牌最大值對(duì)應(yīng)的就是maxToken
,而warningToken
询筏,則是對(duì)應(yīng)了一個(gè)警戒值榕堰,當(dāng)桶中的令牌數(shù)減少到一個(gè)指定的值時(shí),說(shuō)明系統(tǒng)已經(jīng)度過(guò)了預(yù)熱階段
當(dāng)一個(gè)請(qǐng)求進(jìn)來(lái)時(shí)嫌套,首先需要計(jì)算當(dāng)前桶中剩余的token數(shù)逆屡,具體邏輯在syncToken
方法中
當(dāng)系統(tǒng)剩余Token大于warningToken時(shí),說(shuō)明系統(tǒng)仍處于預(yù)熱階段灌危,故需要調(diào)整當(dāng)前所能通過(guò)的最大qps閾值
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
// 獲取秒級(jí)別時(shí)間(去除毫秒)
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
// 判斷是否需要往桶中添加令牌
long newValue = coolDownTokens(currentTime, passQps);
// 設(shè)置新的token數(shù)
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 如果設(shè)置成功的話則減去上次通過(guò)的qps數(shù)量康二,就得到當(dāng)前的實(shí)際token數(shù)
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
- 獲取當(dāng)前時(shí)間
- coolDownTokens 方法會(huì)判斷是否需要往桶中放 token,并返回最新的token數(shù)
- 如果返回了最新的token數(shù)勇蝙,則將當(dāng)前剩余的token數(shù)減去已經(jīng)通過(guò)的qps沫勿,得到最新的剩余token數(shù)
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的幾種情況
// 1. 系統(tǒng)初始啟動(dòng)階段,oldvalue = 0味混,lastFilledTime也等于0产雹,此時(shí)得到一個(gè)非常大的newValue,會(huì)取maxToken為當(dāng)前token數(shù)量值
// 2. 系統(tǒng)處于預(yù)熱階段 且 當(dāng)前qps小于 count / coldFactor
// 3. 系統(tǒng)處于完成預(yù)熱階段
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
這里看一下會(huì)添加令牌的幾種情況
- 系統(tǒng)初始啟動(dòng)階段翁锡,oldvalue = 0蔓挖,lastFilledTime也等于0,此時(shí)得到一個(gè)非常大的newValue馆衔,會(huì)取maxToken為當(dāng)前token數(shù)量值
- 系統(tǒng)處于完成預(yù)熱階段瘟判,需要補(bǔ)充 token 使其穩(wěn)定在一個(gè)范圍內(nèi)
- 系統(tǒng)處于預(yù)熱階段 且 當(dāng)前qps小于 count / coldFactor
前2種情況比較好理解,這里主要解釋一下第三種情況角溃,為何 當(dāng)前qps
小于count / coldFactor
時(shí)拷获,需要往桶中添加Token?試想一下如果沒(méi)有這一步會(huì)怎么樣减细,如果沒(méi)有這一步在比較低的qps情況下補(bǔ)充Token匆瓜,系統(tǒng)最終也會(huì)慢慢度過(guò)預(yù)熱階段,但實(shí)際上這么低的qps(小于 count / coldFactor時(shí)
)不應(yīng)該完成預(yù)熱未蝌。所以這里才會(huì)在 qps低于count / coldFactor
時(shí)補(bǔ)充剩余token數(shù)驮吱,來(lái)讓系統(tǒng)在低qps情況下始終處于預(yù)熱狀態(tài)下
排隊(duì)等待
排隊(duì)等待的實(shí)現(xiàn)相對(duì)預(yù)熱啟動(dòng)實(shí)現(xiàn)比較簡(jiǎn)單
首先會(huì)通過(guò)我們的配置,計(jì)算出相鄰兩個(gè)請(qǐng)求允許通過(guò)的最小時(shí)間萧吠,然后會(huì)記錄最近一個(gè)通過(guò)的時(shí)間左冬。兩者相加即是下一次請(qǐng)求允許通過(guò)的最小時(shí)間。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
if (acquireCount <= 0) {
return true;
}
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 計(jì)算相隔兩個(gè)請(qǐng)求 需要相隔多長(zhǎng)時(shí)間
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 本次期望通過(guò)的最小時(shí)間
long expectedTime = costTime + latestPassedTime.get();
// 如果當(dāng)前時(shí)間大于期望時(shí)間纸型,說(shuō)明qps還未超過(guò)閾值拇砰,直接通過(guò)
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
// 當(dāng)前時(shí)間小于于期望時(shí)間九昧,請(qǐng)求過(guò)快了,需要排隊(duì)等待指定時(shí)間
// 計(jì)算等待時(shí)間
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待時(shí)長(zhǎng)大于我們?cè)O(shè)置的最大時(shí)長(zhǎng)毕匀,則不通過(guò)
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 否則則排隊(duì)等待铸鹰,占用下通過(guò)時(shí)間
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
// 判斷等待時(shí)間是否已經(jīng)大于最大值
if (waitTime > maxQueueingTimeMs) {
// 大于則將上一步加的值重新減去
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 占用等待時(shí)間成功,直接sleep costTime
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
排隊(duì)等待控制器的核心策略其實(shí)就是圍繞了latestPassedTime
進(jìn)行的皂岔,latestPassedTime
指的是上一次請(qǐng)求通過(guò)的時(shí)間蹋笼,通過(guò)latestPassedTime
+ costTime
來(lái)與當(dāng)前時(shí)間做比較,來(lái)判斷當(dāng)前請(qǐng)求是否可以通過(guò)躁垛,無(wú)法通過(guò)的請(qǐng)求則會(huì)優(yōu)先占用latestPassedTime
時(shí)間剖毯,直到sleep到可以通過(guò)的時(shí)間。當(dāng)然我們也可以配置排隊(duì)等待的最大時(shí)間教馆,來(lái)限制目前排隊(duì)等待通過(guò)的請(qǐng)求數(shù)量逊谋。
預(yù)熱啟動(dòng)排隊(duì)等待
預(yù)熱排隊(duì)等待,WarmUpRateLimiterController
實(shí)現(xiàn)類(lèi)我們發(fā)現(xiàn)其繼承了WarmUpController
土铺,這是Sentinel在1.4版本后新加的一種控制器胶滋,其實(shí)就是預(yù)熱啟動(dòng)和排隊(duì)等待的結(jié)合體,具體源碼我們就不做分析悲敷。
尾言
Sentinel
的流控策略和流控效果的相結(jié)合使用還是非常巧妙的究恤,當(dāng)中的一些設(shè)計(jì)思想還是非常有借鑒意義的
Sentinel系列