何為熱點(diǎn)
熱點(diǎn)即經(jīng)常訪(fǎng)問(wèn)的數(shù)據(jù)毒租。很多時(shí)候我們希望統(tǒng)計(jì)某個(gè)熱點(diǎn)數(shù)據(jù)中訪(fǎng)問(wèn)頻次最高的 Top K 數(shù)據(jù),并對(duì)其訪(fǎng)問(wèn)進(jìn)行限制澈魄,比如:
- 商品 ID 為參數(shù)吵瞻,統(tǒng)計(jì)一段時(shí)間內(nèi)最常購(gòu)買(mǎi)的商品 ID 并進(jìn)行限制
- 用戶(hù) ID 為參數(shù),針對(duì)一段時(shí)間內(nèi)頻繁訪(fǎng)問(wèn)的用戶(hù) ID 進(jìn)行限制
版本
本文基于 1.8.0
如何使用
- pom 中引入如下
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>${sentinel.version}</version>
</dependency>
<!-- 熱點(diǎn)參數(shù)限流 -->
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<version>${sentinel.version}</version>
</dependency>
- 定義 ParamFlowRule
private static void loadRules() {
ParamFlowRule rule = new ParamFlowRule(RESOURCE_KEY)
.setParamIdx(0) // 指定當(dāng)前 rule 對(duì)應(yīng)的熱點(diǎn)參數(shù)索引
.setGrade(RuleConstant.FLOW_GRADE_QPS) // 限流的維度从媚,該策略針對(duì) QPS 限流
.setDurationInSec(1) // 限流的單位時(shí)間
.setCount(50) // 未使用指定熱點(diǎn)參數(shù)時(shí)逞泄,該資源限流大小為50
.setParamFlowItemList(new ArrayList<>());
// item1 設(shè)置了對(duì) goods_id = goods_uuid1 的限流,單位時(shí)間(DurationInSec)內(nèi)只能訪(fǎng)問(wèn)10次
ParamFlowItem item1 = new ParamFlowItem().setObject("goods_uuid1") // 熱點(diǎn)參數(shù) value
.setClassType(String.class.getName()) // 熱點(diǎn)參數(shù)數(shù)據(jù)類(lèi)型
.setCount(10); // 針對(duì)該value的限流值
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
}
這里的配置屬性后文講源碼的時(shí)候都會(huì)看到拜效,所以要重點(diǎn)關(guān)注一下
- Rule 本身可以定義一個(gè)限流閾值喷众,每個(gè)熱點(diǎn)參數(shù)也可以定義自己的限流閾值
- 還可以為限流閥值設(shè)置一個(gè)單位時(shí)間
- 調(diào)用
try {
// 調(diào)用限流
entry = SphU.entry(RESOURCE_KEY, EntryType.IN, 1, hotParamValue);
// 業(yè)務(wù)代碼...
} catch (BlockException e) {
// 當(dāng)前請(qǐng)求被限流
e.printStackTrace();
} finally {
if (entry != null) {
entry.exit(1, hotParamValue);
}
}
之前有用過(guò) Sentinel 的同學(xué)的話(huà)其實(shí)很好理解。配置方面的話(huà) Rule 屬性有些不同紧憾,調(diào)用方面到千,需要添加上本次調(diào)用相關(guān)的參數(shù)
舉個(gè)例子,我們配置了對(duì)商品 ID = 1 的限流規(guī)則赴穗,每次請(qǐng)求商品接口之前調(diào)用 Sentinel 的限流 API憔四,指定 Resource
并傳入當(dāng)前要訪(fǎng)問(wèn)的商品 ID膀息。
如果 Sentinel 能找到 Resource 對(duì)應(yīng)的 Rule,則根據(jù) Rule 進(jìn)行限流了赵。Rule 中如果找到 arg
對(duì)應(yīng)的熱點(diǎn)參數(shù)配置潜支,則使用熱點(diǎn)參數(shù)的閾值進(jìn)行限流。找不到的話(huà)柿汛,則使用 Rule 中的閾值冗酿。
實(shí)現(xiàn)原理
Sentinel 整體采用了責(zé)任鏈的設(shè)計(jì)模式(類(lèi)似 Servlet Filter),每次調(diào)用 SphU.entry
時(shí)络断,都會(huì)經(jīng)歷一系列功能插槽(slot chain)已烤。不同的 Slot 職責(zé)不同,有的是負(fù)責(zé)收集信息妓羊,有的是負(fù)責(zé)根據(jù)不同的算法策略進(jìn)行熔斷限流操作胯究,關(guān)于整體流程大家可以閱讀下 官網(wǎng) 中對(duì) Sentinel 工作流程的介紹。
ParamFlowSlot
關(guān)于熱點(diǎn)參數(shù)限流的邏輯在 com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
中
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// ParamFlowManager 中沒(méi)有對(duì)應(yīng)的 Rule躁绸,則執(zhí)行下一個(gè)Slot
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// 限流檢查
checkFlow(resourceWrapper, count, args);
// 執(zhí)行下一個(gè)Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
// 執(zhí)行下一個(gè)Slot
fireExit(context, resourceWrapper, count, args);
}
void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
int paramIdx = rule.getParamIdx();
if (paramIdx < 0) {
if (-paramIdx <= length) {
rule.setParamIdx(length + paramIdx);
} else {
// Illegal index, give it a illegal positive value, latter rule checking will pass.
rule.setParamIdx(-paramIdx);
}
}
}
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
// 獲取 resource 對(duì)應(yīng)的全部 ParamFlowRule
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// 初始化該 Rule 需要的限流指標(biāo)數(shù)據(jù)
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
// 如果不滿(mǎn)足某個(gè) Rule 則拋出異常裕循,代表當(dāng)前請(qǐng)求被限流
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
}
ParamFlowSlot 中代碼不多,也沒(méi)做什么事净刮。參考注釋的話(huà)應(yīng)該很好理解剥哑。咱們直接挑干的講,來(lái)看下 ParamFlowChecker 中是如何實(shí)現(xiàn)限流的
ParamFlowChecker 數(shù)據(jù)結(jié)構(gòu)
熱點(diǎn)參數(shù)限流使用的算法為令牌桶算法淹父,首先來(lái)看一下數(shù)據(jù)結(jié)構(gòu)是如何存儲(chǔ)的
public class ParameterMetric {
/**
* Format: (rule, (value, timeRecorder))
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTimeCounters = new HashMap<>();
/**
* Format: (rule, (value, tokenCounter))
*
* @since 1.6.0
*/
private final Map<ParamFlowRule, CacheMap<Object, AtomicLong>> ruleTokenCounter = new HashMap<>();
private final Map<Integer, CacheMap<Object, AtomicInteger>> threadCountMap = new HashMap<>();
// 省略...
}
Sentinel 中 Resource 代表當(dāng)前要訪(fǎng)問(wèn)的資源(方法或者api接口)株婴,一個(gè) Resource 可以對(duì)應(yīng)多個(gè) Rule,這些 Rule 可以是相同的 class暑认。
現(xiàn)在再來(lái)看 ParameterMetric 的結(jié)構(gòu)困介,每個(gè) Resource 對(duì)應(yīng)一個(gè) ParameterMetric 對(duì)象,上述 CacheMap<Object, AtomicLong>
的 Key 代表熱點(diǎn)參數(shù)的值蘸际,Value 則是對(duì)應(yīng)的計(jì)數(shù)器座哩。
所以這里數(shù)據(jù)結(jié)構(gòu)的關(guān)系是這樣的
- 一個(gè) Resource 有一個(gè) ParameterMetric
- 一個(gè) ParameterMetric 統(tǒng)計(jì)了多個(gè) Rule 所需要的限流指標(biāo)數(shù)據(jù)
- 每個(gè) Rule 又可以配置多個(gè)熱點(diǎn)參數(shù)
CacheMap 的默認(rèn)實(shí)現(xiàn),包裝了
com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap
使用該類(lèi)的主要原因是為了實(shí)現(xiàn)熱點(diǎn)參數(shù)的 LRU
詳細(xì)解釋一下粮彤,這三個(gè)變量
- ruleTimeCounters :記錄令牌桶的最后添加時(shí)間根穷,用于 QPS 限流
- ruleTokenCounter :記錄令牌桶的令牌數(shù)量,用于 QPS 限流
- threadCountMap :用于線(xiàn)程級(jí)別限流导坟,這個(gè)其實(shí)和令牌桶算法沒(méi)有關(guān)系了屿良,線(xiàn)程限流只是在 Rule 中定義了最大線(xiàn)程數(shù),請(qǐng)求時(shí)判斷一下當(dāng)前的線(xiàn)程數(shù)是否大于最大線(xiàn)程惫周,具體的應(yīng)用在
ParamFlowChecker#passSingleValueCheck
實(shí)際使用 ParameterMetric 時(shí)尘惧,使用 ParameterMetricStorage 獲取 Resource 對(duì)應(yīng)的 ParameterMetric
public final class ParameterMetricStorage {
// Format (Resource, ParameterMetric)
private static final Map<String, ParameterMetric> metricsMap = new ConcurrentHashMap<>();
// 省略相關(guān)代碼
}
ParamFlowChecker 執(zhí)行邏輯
ParamFlowChecker 中 QPS 級(jí)限流支持兩種策略
- CONTROL_BEHAVIOR_RATE_LIMITER :請(qǐng)求速率限制,對(duì)應(yīng)的方法
ParamFlowChecker#passThrottleLocalCheck
- DEFAULT :只要桶中還有令牌闯两,就可以通過(guò)褥伴,對(duì)應(yīng)的方法
ParamFlowChecker#passDefaultLocalCheck
接下來(lái)我們將以 passDefaultLocalCheck 為例谅将,進(jìn)行分析。但是在這之前重慢,先來(lái)捋一下饥臂,從 ParamFlowSlot#checkFlow
到 ParamFlowChecker#passDefaultLocalCheck
這中間都經(jīng)歷了什么,詳見(jiàn)??
// 偽代碼似踱,忽略了一些參數(shù)傳遞
checkFlow() {
// if 沒(méi)有對(duì)應(yīng)的 rule隅熙,跳出 ParamFlowSlot 邏輯
// if args == null,跳出 ParamFlowSlot 邏輯
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
rules.forEach(r -> {
// 初始化該 Rule 需要的限流指標(biāo)數(shù)據(jù)
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
// 拋出限流異常
}
})
}
passCheck() {
// 從 args 中獲取本次限流需要使用的 value
int paramIdx = rule.getParamIdx();
Object value = args[paramIdx];
// 根據(jù) rule 判斷是該請(qǐng)求使用集群限流還是本地限流
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
return passLocalCheck(resourceWrapper, rule, count, value);
}
passLocalCheck() {
// 如果 value 是 Collection 或者 Array
// Sentinel 認(rèn)為這一組數(shù)據(jù)都需要經(jīng)過(guò)熱點(diǎn)參數(shù)限流校驗(yàn)
// 遍歷所有值調(diào)用熱點(diǎn)參數(shù)限流校驗(yàn)
if (isCollectionOrArray(value)) {
value.forEach(v -> {
// 當(dāng)數(shù)組中某個(gè) value 無(wú)法通過(guò)限流校驗(yàn)時(shí)核芽,return false 外部會(huì)拋出限流異常
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
})
}
}
passSingleValueCheck() {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
// 速率限制
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {
// 默認(rèn)限流
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
// 線(xiàn)程級(jí)限流邏輯
}
}
上面提到了一個(gè)集群限流囚戚,和上一篇中說(shuō)到的集群限流實(shí)現(xiàn)原理是一樣的,選出一臺(tái) Server 來(lái)做限流決策轧简,所有客戶(hù)端的限流請(qǐng)求都咨詢(xún) Server驰坊,由 Server 來(lái)決定。由于不是本文重點(diǎn)哮独,就不多說(shuō)了拳芙。
ParamFlowChecker 限流核心代碼
鋪墊了這么多,終于迎來(lái)了我們的主角 ParamFlowChecker#passDefaultLocalCheck
皮璧,該方法中實(shí)現(xiàn)了簡(jiǎn)單的令牌桶算法舟扎,用于熱點(diǎn)參數(shù)限流
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
// 根據(jù) resource 獲取 ParameterMetric
ParameterMetric metric = getParameterMetric(resourceWrapper);
// 根據(jù) rule 從 metric 中獲取當(dāng)前 rule 的計(jì)數(shù)器
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold)
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();
// 如果熱點(diǎn)參數(shù)中包含當(dāng)前 value,則使用熱點(diǎn)參數(shù)配置的count悴务,否則使用 rule 中定義的 count
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
long maxCount = tokenCount + rule.getBurstCount();
// 當(dāng)前申請(qǐng)的流量 和 最大流量比較
if (acquireCount > maxCount) {
return false;
}
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
// 這里相當(dāng)于對(duì)當(dāng)前 value 對(duì)應(yīng)的令牌桶進(jìn)行初始化
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
if (passTime > rule.getDurationInSec() * 1000) {
// 補(bǔ)充 token
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {
long restQps = oldQps.get();
// 每毫秒應(yīng)該生成的 token = tokenCount / (rule.getDurationInSec() * 1000)
// 再 * passTime 即等于應(yīng)該補(bǔ)充的 token
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
// 補(bǔ)充的 token 不會(huì)超過(guò)最大值
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
if (newQps < 0) {
return false;
}
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);
return true;
}
Thread.yield();
}
} else {
// 直接操作計(jì)數(shù)器扣減即可
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
if (oldQpsValue - acquireCount >= 0) {
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
令牌桶算法核心思想如下圖所示睹限,結(jié)合這個(gè)圖咱們?cè)賮?lái)理解理解代碼
核心邏輯在 while 循環(huán)中,咱們直接挑干的講
先回顧一下上面說(shuō)過(guò) tokenCounters 和 timeCounters讯檐,在默認(rèn)限流實(shí)現(xiàn)中羡疗,這兩個(gè)參數(shù)分別代表最后添加令牌時(shí)間,令牌剩余數(shù)量
while 邏輯:
- 首先如果當(dāng)前 value 對(duì)應(yīng)的令牌桶為空裂垦,則執(zhí)行初始化
- 計(jì)算當(dāng)前時(shí)間到上次添加 token 時(shí)間經(jīng)歷了多久顺囊,即
passTime = currentTime - lastAddTokenTime.get()
用于判斷是否需要添加 token
2.1if (pass > rule 中設(shè)定的限流單位時(shí)間)
,則使用原子操作為令牌桶補(bǔ)充 token(具體補(bǔ)充 token 的邏輯詳見(jiàn)上面代碼注釋?zhuān)?br> 2.2else 不需要補(bǔ)充 token
蕉拢,使用原子操作扣減令牌
可以看到關(guān)于 token 的操作全是使用原子操作(CAS),保證了線(xiàn)程安全诚亚。如果原子操作更新失敗晕换,則會(huì)繼續(xù)執(zhí)行。
速率限制的實(shí)現(xiàn)
再順便叨咕下上面說(shuō)過(guò)CONTROL_BEHAVIOR_RATE_LIMITER
速率限制策略是如何實(shí)現(xiàn)的站宗,只簡(jiǎn)單說(shuō)說(shuō)思路闸准,具體細(xì)節(jié)大家可以自己看下源碼
該策略中邑遏,僅使用 timeCounters捻撑,該參數(shù)存儲(chǔ)的數(shù)據(jù)變成了 lastPassTime
(最后通過(guò)時(shí)間)悔橄,所以這個(gè)實(shí)現(xiàn)和令牌桶也沒(méi)啥關(guān)系了
新的請(qǐng)求到來(lái)時(shí)帝美,首先根據(jù) Rule 中定義時(shí)間范圍,count 計(jì)算 costTime
库快,代表每隔多久才能通過(guò)一個(gè)請(qǐng)求
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
只有 lastPassTime + costTime <= currentTime
摸袁,請(qǐng)求才有可能成功通過(guò),lastPassTime + costTime 過(guò)大會(huì)導(dǎo)致限流义屏。
最后
如果覺(jué)得我的文章對(duì)你有幫助靠汁,動(dòng)動(dòng)小手點(diǎn)下關(guān)注,你的支持是對(duì)我最大的幫助