一捐康、引子
該篇是插槽分析的最后一篇仇矾。
何為熱點?熱點即經常訪問的數(shù)據(jù)解总。很多時候我們希望統(tǒng)計某個熱點數(shù)據(jù)中訪問頻次最高的 Top K 數(shù)據(jù)贮匕,并對其訪問進行限制。比如:
- 商品 ID 為參數(shù)倾鲫,統(tǒng)計一段時間內最常購買的商品 ID 并進行限制
- 用戶 ID 為參數(shù)粗合,針對一段時間內頻繁訪問的用戶 ID 進行限制
熱點參數(shù)限流會統(tǒng)計傳入?yún)?shù)中的熱點參數(shù)萍嬉,并根據(jù)配置的限流閾值與模式,對包含熱點參數(shù)的資源調用進行限流隙疚。熱點參數(shù)限流可以看做是一種特殊的流量控制壤追,僅對包含熱點參數(shù)的資源調用生效。
Sentinel 利用 LRU 策略供屉,結合底層的滑動窗口機制來實現(xiàn)熱點參數(shù)統(tǒng)計行冰。LRU 策略可以統(tǒng)計單位時間內,最近最常訪問的熱點參數(shù)伶丐,而滑動窗口機制可以幫助統(tǒng)計每個參數(shù)的 QPS悼做。
二、使用
使用熱點限流功能哗魂,需要引入以下依賴:
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-parameter-flow-control</artifactId>
<version>x.y.z</version>
</dependency>
然后為對應的資源配置熱點參數(shù)限流規(guī)則肛走,并在 entry
的時候傳入相應的參數(shù),即可使熱點參數(shù)限流生效录别。
注:若自行擴展并注冊了自己實現(xiàn)的
SlotChainBuilder
朽色,并希望使用熱點參數(shù)限流功能,則可以在chain
里面合適的地方插入ParamFlowSlot
组题。
public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException
public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException
其中最后的一串 args 就是要傳入的參數(shù)葫男,有多個就按照次序依次傳入。比如要傳入兩個參數(shù) paramA 和 paramB崔列,則可以:
// paramA in index 0, paramB in index 1.
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
三梢褐、熱點參數(shù)規(guī)則
熱點參數(shù)規(guī)則(ParamFlowRule
)類似于流量控制規(guī)則(FlowRule
):
屬性 | 說明 | 默認值 |
---|---|---|
resource | 資源名,必填 | |
count | 限流閾值赵讯,必填 | |
grade | 限流模式 | QPS 模式 |
paramIdx | 熱點參數(shù)的索引盈咳,必填,對應 SphU.entry(xxx, args) 中的參數(shù)索引位置 |
|
paramFlowItemList | 參數(shù)例外項瘦癌,可以針對指定的參數(shù)值單獨設置限流閾值猪贪,不受前面 count 閾值的限制。僅支持基本類型
|
可以通過 ParamFlowRuleManager
的 loadRules
方法更新熱點參數(shù)規(guī)則讯私,下面是一個示例:
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(0)
.setCount(5);
// 針對 int 類型的參數(shù) PARAM_B,單獨設置限流 QPS 閾值為 10西傀,而不是全局的閾值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
四斤寇、源碼分析
1、ParamFlowSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Throwable {
//檢查該資源是否存在限流資源
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, args);
return;
}
//限流規(guī)則檢測
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, args);
}
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args)
throws BlockException {
//若存在限流規(guī)則
if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
//獲取限流規(guī)則
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
if (rules == null) {
return;
}
for (ParamFlowRule rule : rules) {
// Initialize the parameter metrics.
//初始化參數(shù)統(tǒng)計
initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());
//若熱點參數(shù)限流符合拥褂,則進行限流處理
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
// Here we add the block count.
addBlockCount(resourceWrapper, count, args);
String message = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
message = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), message);
}
}
}
}
通過checkFlow進行限流規(guī)則檢測:
- 若該資源存在限流規(guī)則娘锁,并獲取該資源的限流規(guī)則;否則跳過檢測饺鹃;
- 循環(huán)判斷規(guī)則莫秆,并先初始化熱點參數(shù)統(tǒng)計窗口间雀,如下:
void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
ParameterMetric metric;
// Assume that the resource is valid.
if ((metric = metricsMap.get(resourceWrapper)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceWrapper)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper, metric);
RecordLog.info("[ParamFlowSlot] Creating parameter metric for: " + resourceWrapper.getName());
}
}
}
metric.initializeForIndex(index);
}
通過ParameterMetric的initialiazeForIndex初始了一個時間窗口。
- 通過ParamFlowChecker的passCheck方法檢測規(guī)則镊屎。
2惹挟、ParamFlowChecker
static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
// 如果參數(shù)不存在直接返回
if (args == null) {
return true;
}
int paramIdx = rule.getParamIdx();
//參數(shù)的個數(shù)小于規(guī)則的索引直接返回
if (args.length <= paramIdx) {
return true;
}
Object value = args[paramIdx];
//規(guī)則調用
return passLocalCheck(resourceWrapper, rule, count, value);
}
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
try {
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else {
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.info("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
在passLocalCheck方法中:
1.依次對Collection類型,Array類型缝驳,非數(shù)組集合類型進行處理连锯,進入到passSingleValueCheck中;
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//獲取當前參數(shù)在該資源時間窗口內通過的數(shù)量
double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
// Pass check for exclusion items.
int itemQps = rule.getParsedHotItems().get(value);
return curCount + count <= itemQps;
} else if (curCount + count > rule.getCount()) {
if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
return true;
}
return false;
}
}
return true;
}
- 獲取rule的解析參數(shù)集合HotItems用狱;
- 獲取當前參數(shù)在該資源時間窗口內通過的數(shù)量运怖;
- 如果exclusionItems(參數(shù)例外項)包含該值,判斷
curCount + count
大于itemQps
夏伊,itemQps
是該參數(shù)對應的count摇展,若大于則說明超過限流閾值,則返回false溺忧,反之返回true咏连; - 如果exclusionItems不包含該值,并且
curCount + count
大于rule.getCount()
時砸狞,并判斷(curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0
是否滿足捻勉,若滿足則返回true,反之返回false刀森。
3踱启、ParameterMetric
//保存了對應資源的時間窗口數(shù)據(jù)
private Map<Integer, HotParameterLeapArray> rollingParameters =
new ConcurrentHashMap<Integer, HotParameterLeapArray>();
public Map<Integer, HotParameterLeapArray> getRollingParameters() {
return rollingParameters;
}
public synchronized void clear() {
rollingParameters.clear();
}
// 初始化一個時間窗口
public void initializeForIndex(int index) {
if (!rollingParameters.containsKey(index)) {
synchronized (this) {
// putIfAbsent
if (rollingParameters.get(index) == null) {
rollingParameters.put(index, new HotParameterLeapArray(
1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL));
}
}
}
}
熱點參數(shù)統(tǒng)計
//增加通過數(shù)
public void addPass(int count, Object... args) {
add(RollingParamEvent.REQUEST_PASSED, count, args);
}
//增加阻塞數(shù)
public void addBlock(int count, Object... args) {
add(RollingParamEvent.REQUEST_BLOCKED, count, args);
}
@SuppressWarnings("rawtypes")
private void add(RollingParamEvent event, int count, Object... args) {
if (args == null) {
return;
}
try {
for (int index = 0; index < args.length; index++) {
HotParameterLeapArray param = rollingParameters.get(index);
if (param == null) {
continue;
}
Object arg = args[index];
if (arg == null) {
continue;
}
if (Collection.class.isAssignableFrom(arg.getClass())) {
for (Object value : ((Collection)arg)) {
param.addValue(event, count, value);
}
} else if (arg.getClass().isArray()) {
int length = Array.getLength(arg);
for (int i = 0; i < length; i++) {
Object value = Array.get(arg, i);
param.addValue(event, count, value);
}
} else {
param.addValue(event, count, arg);
}
}
} catch (Throwable e) {
RecordLog.warn("[ParameterMetric] Param exception", e);
}
}
- 可以發(fā)現(xiàn)熱點參數(shù)的統(tǒng)計也是基于滑動時間窗口統(tǒng)計,這個就不具體分析了研底,滑動時間窗口前面有講解埠偿,見滑動時間窗口。
2.有點不同的是:在限流規(guī)則里指標的是通過LongAdder分段統(tǒng)計的榜晦,而熱點參數(shù)的指標是通過AtomicInteger的addAndGet方法統(tǒng)計的冠蒋。
public ParamMapBucket add(RollingParamEvent event, int count, Object value) {
data[event.ordinal()].putIfAbsent(value, new AtomicInteger());
AtomicInteger counter = data[event.ordinal()].get(value);
counter.addAndGet(count);
return this;
}
3.熱點參數(shù)獲取的實際上是統(tǒng)計范圍內一個平均值。如下:
public double getPassParamQps(int index, Object value) {
try {
HotParameterLeapArray parameter = rollingParameters.get(index);
if (parameter == null || value == null) {
return -1;
}
return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
} catch (Throwable e) {
RecordLog.info(e.getMessage(), e);
}
return -1;
}
public double getRollingAvg(RollingParamEvent event, Object value) {
return ((double) getRollingSum(event, value)) / getIntervalInSec();
}
public long getRollingSum(RollingParamEvent event, Object value) {
currentWindow();
long sum = 0;
List<ParamMapBucket> buckets = this.values();
for (ParamMapBucket b : buckets) {
sum += b.get(event, value);
}
return sum;
}
五乾胶、我的總結
- 本文介紹熱點參數(shù)的概念抖剿、使用、及部分源碼分析识窿。
- 熱點參數(shù)限流使用需要單獨引用Jar包斩郎,并設置規(guī)則后才會啟動限流效果。
- 目前熱點參數(shù)限流只是支持QPS模式喻频,且還支持額外參數(shù)項進行限流缩宜。
- 熱點參數(shù)的參數(shù)限流統(tǒng)計也是基于滑動窗口統(tǒng)計的,內部使用了AtomicInteger的原子結構統(tǒng)計及ConcurrentLinkedHashMap結構作為數(shù)據(jù)存儲。
以上內容锻煌,如有不當之處妓布,請指正