引子
在上篇文章中我們介紹了sentinel中的滑動窗口算法,發(fā)現(xiàn)限流的準確度依賴于劃分的子窗口數(shù)量。而在很多情況下拣凹,我們的限流更多的是需要限制到參數(shù)級別,比如我們需要限制每個用戶調(diào)用接口的頻次恨豁,這種就需要精確到接口參數(shù)級別嚣镜。而如果用滑動窗口來實現(xiàn),就需要統(tǒng)計每個窗口內(nèi)不同參數(shù)對應的請求數(shù)量橘蜜,這樣過于復雜菊匿,對于這種熱點參數(shù)的限流,sentinel根據(jù)限流效果的不同计福,分別使用了漏桶算法和令牌桶算法來進行實現(xiàn)跌捆。
漏桶算法原理
主要目的是控制數(shù)據(jù)注入到網(wǎng)絡的速率,平滑網(wǎng)絡上的突發(fā)流量象颖。漏桶算法提供了一種機制疹蛉,通過它,突發(fā)流量可以被整形以便為網(wǎng)絡提供一個穩(wěn)定的流量力麸】煽睿看下漏桶算法的原理圖
請求先進入到漏桶里,漏桶以一定的速度出水克蚂,當水請求過大會直接溢出闺鲸,可以看出漏桶算法能強行限制數(shù)據(jù)的傳輸速率。
sentinel中的漏桶算法實現(xiàn)
@Spi(order = -3000)
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode>{
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
//獲取熱點參數(shù)配置
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
//初始化熱點參數(shù)
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
//passCheck 校驗請求是否通過
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
}
public final class ParamFlowChecker{
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// Get parameter value.
Object value = args[paramIdx];
// Assign value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
// If value is null, then pass
if (value == null) {
return true;
}
//集群參數(shù)流控
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
//單機參數(shù)流控
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try {
//參數(shù)類型如果為Collection埃叭,循環(huán)校驗每個參數(shù)
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
//參數(shù)類型如果為Collection摸恍,循環(huán)校驗每個參數(shù)
else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
}
//單個元素校驗
else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
重點來了,passThrottleLocalCheck方法就是漏桶算法的具體實現(xiàn),主要原理就是:根據(jù)rule配置的每多少秒可以通過多少請求來計算出一個請求需要多少毫秒costTime立镶,取出上次請求通過的時間+costTime壁袄,就能得到我們期望此次請求到來的時間(expectedTime),如果請求到達時間大于期望時間媚媒,則放行嗜逻;如果小于期望時間,說明流量過于密集缭召,需要限制其速率栈顷,等待一段時間后(expectedTime-now)再放行。
/**
* 勻速排隊限流
* @param resourceWrapper
* @param rule
* @param acquireCount
* @param value
* @return
*/
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//根據(jù)rule配置的每多少秒可以通過多少請求來計算出一個請求需要多少毫秒
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
//上次請求通過的時間
long lastPassTime = timeRecorder.get();
//期望的請求到達時間
long expectedTime = lastPassTime + costTime;
/**
* 實際請求到達時間如果大于期望的請求到達時間 并且 多出來的這部分時間大于設置的等待時間嵌巷,則限流
* 否則請求
*/
if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
/**
* 等待時間大于0萄凤,說明請求在期望時間之前到達,流量太過密集搪哪,等待之后放行靡努;小于0,說明請求在期望時間之后到達晓折,直接放行
*/
if (waitTime > 0) {
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
}
令牌桶算法原理
對于很多應用場景來說惑朦,除了要求能夠限制數(shù)據(jù)的平均傳輸速率外,還要求允許某種程度的突發(fā)傳輸已维。這時候漏桶算法可能就不合適了,令牌桶算法更為適合已日。令牌桶算法是網(wǎng)絡流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一種算法垛耳。典型情況下,令牌桶算法用來控制發(fā)送到網(wǎng)絡上的數(shù)據(jù)的數(shù)目飘千,并允許突發(fā)數(shù)據(jù)的發(fā)送堂鲜。如圖所示,令牌桶算法的原理是系統(tǒng)會以一個恒定的速度往桶里放入令牌护奈,而如果請求需要被處理缔莲,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時霉旗,則拒絕服務痴奏。
sentinel-令牌桶算法實現(xiàn)
public final class ParamFlowChecker {
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
ParameterMetric metric = getParameterMetric(resourceWrapper);
//令牌計數(shù)器
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
//時間計數(shù)器
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold)
//針對參數(shù)例外項單獨限流
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//最大令牌數(shù) = 設置的閾值 + 額外允許的突發(fā)流量數(shù)
long maxCount = tokenCount + rule.getBurstCount();
if (acquireCount > maxCount) {
return false;
}
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
//獲取上一次令牌更新時間
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
//表示系統(tǒng)啟動后的第一次請求,直接返回校驗通過
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
//計算在時間窗口內(nèi)有多少次請求可以通過
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
//計算此次請求距離上一次請求(這里的上一次請求指的是一個統(tǒng)計窗口周期內(nèi)的第一個請求)過去了多久
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
//一個統(tǒng)計時間窗口已經(jīng)過去厌秒,會重新計算在此時間窗口內(nèi)有多少次請求可以通過
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
//獲取請求通過時剩余令牌數(shù)
long restQps = oldQps.get();
/** * 舉例說明读拆,比如1s 允許通過100個請求,當前時間是1900ms鸵闪,
* 上一次請求時間是800ms * 那么totalCount = 1100 * 100 / 1 * 1000 = 110
*/
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
/**
* 最新的qps需要之前剩余的加上間隔時間內(nèi)新增的檐晕,但是不能超過最大值
*/
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
//這里需要更新lastAddTokenTime
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
//未超過時間窗口,獲取剩余令牌數(shù)
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
//這里不需要更新lastAddTokenTime,因為還在一個統(tǒng)計窗口時間周期內(nèi)
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
}
代碼較為冗長辟灰,我們來看一下整體的流程圖:
令牌桶算法最重要的就是關注令牌生成的部分个榕。而在sentinel的實現(xiàn)中,令牌生成是在 當前請求時間距離上一次請求時間已經(jīng)大于設置的窗口時間芥喇。那么該算法是怎么支持突發(fā)流量的呢西采。
我們以一個具體示例來看:
假定我們設置一分鐘請求不能超過1000次。時間窗口durationInSec = 60s
,tokenCount = 1000;允許突發(fā)的流量為burstCount = 2000; maxCount = tokenCount + burstCount = 3000;
假設第1s來了一個請求乃坤,因為首次請求苛让,通過,剩余令牌數(shù)為 2999湿诊。在第10s-20s 流量增大陸續(xù)來了2900次請求狱杰,因為都在一個窗口時間內(nèi)且令牌數(shù)足夠,所有請求均通過厅须,剩余令牌數(shù)為9仿畸。隨后教長一段時間沒有請求,一直到第90s請求陸續(xù)到達朗和,這時90s距離上一次請求時間20s已經(jīng)過去了一個時間窗口错沽,我們需要計算在這 70s內(nèi)我們需要新增多少個令牌數(shù)
toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000)
=70s * 1000ms * 1000 / 60s * 1000ms = 1166.67
newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
上次剩余的令牌數(shù) restQps = 9
newQps = 1166.67 + 9 -1 = 1174.67 < maxCount
所有在接下來的 60s-120s 時間段內(nèi),請求總數(shù)不可以超過1174.67眶拉。
但是隨著時間的流逝千埃,只要在一段時間內(nèi)沒有請求過來,令牌又會達到maxCount個忆植,理論上只要3分鐘內(nèi)沒有請求到達放可,令牌數(shù)最少又會恢復到maxCount=3000個。
我們發(fā)現(xiàn)朝刊,sentinel應對突發(fā)流量耀里,有個緩沖的效果,首次流量突發(fā)之后拾氓,如果再有流量突發(fā)的情況冯挎,必須給系統(tǒng)足夠的緩沖時間,否則依然會被限流咙鞍。
總結(jié):
漏桶算法和令牌桶算法兩者主要區(qū)別在于“漏桶算法”能夠強行限制數(shù)據(jù)的傳輸速率房官,而“令牌桶算法”在能夠限制數(shù)據(jù)的平均傳輸速率外,還允許某種程度的突發(fā)傳輸续滋。在“令牌桶算法”中易阳,只要令牌桶中存在令牌,那么就允許突發(fā)地傳輸數(shù)據(jù)直到達到用戶配置的門限吃粒,所以它適合于具有突發(fā)特性的流量