Sentinel之Slots插槽源碼分析熱點參數(shù)限流(七)

一捐康、引子

該篇是插槽分析的最后一篇仇矾。
何為熱點?熱點即經常訪問的數(shù)據(jù)解总。很多時候我們希望統(tǒng)計某個熱點數(shù)據(jù)中訪問頻次最高的 Top K 數(shù)據(jù)贮匕,并對其訪問進行限制。比如:

  • 商品 ID 為參數(shù)倾鲫,統(tǒng)計一段時間內最常購買的商品 ID 并進行限制
  • 用戶 ID 為參數(shù)粗合,針對一段時間內頻繁訪問的用戶 ID 進行限制

熱點參數(shù)限流會統(tǒng)計傳入?yún)?shù)中的熱點參數(shù)萍嬉,并根據(jù)配置的限流閾值與模式,對包含熱點參數(shù)的資源調用進行限流隙疚。熱點參數(shù)限流可以看做是一種特殊的流量控制壤追,僅對包含熱點參數(shù)的資源調用生效。

Sentinel 利用 LRU 策略供屉,結合底層的滑動窗口機制來實現(xiàn)熱點參數(shù)統(tǒng)計行冰。LRU 策略可以統(tǒng)計單位時間內,最近最常訪問的熱點參數(shù)伶丐,而滑動窗口機制可以幫助統(tǒng)計每個參數(shù)的 QPS悼做。

二、使用

使用熱點限流功能哗魂,需要引入以下依賴:

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-parameter-flow-control</artifactId>
    <version>x.y.z</version>
</dependency>

然后為對應的資源配置熱點參數(shù)限流規(guī)則肛走,并在 entry的時候傳入相應的參數(shù),即可使熱點參數(shù)限流生效录别。

注:若自行擴展并注冊了自己實現(xiàn)的 SlotChainBuilder朽色,并希望使用熱點參數(shù)限流功能,則可以在 chain 里面合適的地方插入ParamFlowSlot组题。

public static Entry entry(String name, EntryType type, int count, Object... args) throws BlockException
public static Entry entry(Method method, EntryType type, int count, Object... args) throws BlockException

其中最后的一串 args 就是要傳入的參數(shù)葫男,有多個就按照次序依次傳入。比如要傳入兩個參數(shù) paramA 和 paramB崔列,則可以:

// paramA in index 0, paramB in index 1.
SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);

三梢褐、熱點參數(shù)規(guī)則

熱點參數(shù)規(guī)則(ParamFlowRule)類似于流量控制規(guī)則(FlowRule):

屬性 說明 默認值
resource 資源名,必填
count 限流閾值赵讯,必填
grade 限流模式 QPS 模式
paramIdx 熱點參數(shù)的索引盈咳,必填,對應 SphU.entry(xxx, args) 中的參數(shù)索引位置
paramFlowItemList 參數(shù)例外項瘦癌,可以針對指定的參數(shù)值單獨設置限流閾值猪贪,不受前面 count 閾值的限制。僅支持基本類型

可以通過 ParamFlowRuleManagerloadRules 方法更新熱點參數(shù)規(guī)則讯私,下面是一個示例:

ParamFlowRule rule = new ParamFlowRule(resourceName)
    .setParamIdx(0)
    .setCount(5);
// 針對 int 類型的參數(shù) PARAM_B,單獨設置限流 QPS 閾值為 10西傀,而不是全局的閾值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
    .setClassType(int.class.getName())
    .setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));

ParamFlowRuleManager.loadRules(Collections.singletonList(rule));

四斤寇、源碼分析

1、ParamFlowSlot

    @Override
 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
        throws Throwable {

        //檢查該資源是否存在限流資源
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            fireEntry(context, resourceWrapper, node, count, args);
            return;
        }

        //限流規(guī)則檢測
        checkFlow(resourceWrapper, count, args);
        fireEntry(context, resourceWrapper, node, count, args);
    }

 void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args)
        throws BlockException {
        //若存在限流規(guī)則
        if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
           //獲取限流規(guī)則
            List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
            if (rules == null) {
                return;
            }

            for (ParamFlowRule rule : rules) {
                // Initialize the parameter metrics.
                //初始化參數(shù)統(tǒng)計
                initHotParamMetricsFor(resourceWrapper, rule.getParamIdx());
                //若熱點參數(shù)限流符合拥褂,則進行限流處理
                if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {

                    // Here we add the block count.
                    addBlockCount(resourceWrapper, count, args);

                    String message = "";
                    if (args.length > rule.getParamIdx()) {
                        Object value = args[rule.getParamIdx()];
                        message = String.valueOf(value);
                    }
                    throw new ParamFlowException(resourceWrapper.getName(), message);
                }
            }
        }
    }

通過checkFlow進行限流規(guī)則檢測:

  1. 若該資源存在限流規(guī)則娘锁,并獲取該資源的限流規(guī)則;否則跳過檢測饺鹃;
  2. 循環(huán)判斷規(guī)則莫秆,并先初始化熱點參數(shù)統(tǒng)計窗口间雀,如下:
  void initHotParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ int index) {
        ParameterMetric metric;
        // Assume that the resource is valid.
        if ((metric = metricsMap.get(resourceWrapper)) == null) {
            synchronized (LOCK) {
                if ((metric = metricsMap.get(resourceWrapper)) == null) {
                    metric = new ParameterMetric();
                    metricsMap.put(resourceWrapper, metric);
                    RecordLog.info("[ParamFlowSlot] Creating parameter metric for: " + resourceWrapper.getName());
                }
            }
        }
        metric.initializeForIndex(index);
    }

通過ParameterMetric的initialiazeForIndex初始了一個時間窗口。

  1. 通過ParamFlowChecker的passCheck方法檢測規(guī)則镊屎。

2惹挟、ParamFlowChecker

  static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
                             Object... args) {
        // 如果參數(shù)不存在直接返回
        if (args == null) {
            return true;
        }

        int paramIdx = rule.getParamIdx();
        //參數(shù)的個數(shù)小于規(guī)則的索引直接返回
        if (args.length <= paramIdx) {
            return true;
        }

        Object value = args[paramIdx];
        
        //規(guī)則調用
        return passLocalCheck(resourceWrapper, rule, count, value);
    }

  private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
        try {
            if (Collection.class.isAssignableFrom(value.getClass())) {
                for (Object param : ((Collection)value)) {
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            } else if (value.getClass().isArray()) {
                int length = Array.getLength(value);
                for (int i = 0; i < length; i++) {
                    Object param = Array.get(value, i);
                    if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
                        return false;
                    }
                }
            } else {
                return passSingleValueCheck(resourceWrapper, rule, count, value);
            }
        } catch (Throwable e) {
            RecordLog.info("[ParamFlowChecker] Unexpected error", e);
        }

        return true;
    }

在passLocalCheck方法中:
1.依次對Collection類型,Array類型缝驳,非數(shù)組集合類型進行處理连锯,進入到passSingleValueCheck中;


    static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object value) {
        Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
         //獲取當前參數(shù)在該資源時間窗口內通過的數(shù)量
            double curCount = getHotParameters(resourceWrapper).getPassParamQps(rule.getParamIdx(), value);

            if (exclusionItems.contains(value)) {
                // Pass check for exclusion items.
                int itemQps = rule.getParsedHotItems().get(value);
                return curCount + count <= itemQps;
            } else if (curCount + count > rule.getCount()) {
                if ((curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0) {
                    return true;
                }
                return false;
            }
        }

        return true;
    }
  1. 獲取rule的解析參數(shù)集合HotItems用狱;
  2. 獲取當前參數(shù)在該資源時間窗口內通過的數(shù)量运怖;
  3. 如果exclusionItems(參數(shù)例外項)包含該值,判斷curCount + count 大于itemQps夏伊,itemQps是該參數(shù)對應的count摇展,若大于則說明超過限流閾值,則返回false溺忧,反之返回true咏连;
  4. 如果exclusionItems不包含該值,并且curCount + count 大于rule.getCount()時砸狞,并判斷(curCount - rule.getCount()) < 1 && (curCount - rule.getCount()) > 0是否滿足捻勉,若滿足則返回true,反之返回false刀森。

3踱启、ParameterMetric

   //保存了對應資源的時間窗口數(shù)據(jù)
   private Map<Integer, HotParameterLeapArray> rollingParameters =
        new ConcurrentHashMap<Integer, HotParameterLeapArray>();

    public Map<Integer, HotParameterLeapArray> getRollingParameters() {
        return rollingParameters;
    }

    public synchronized void clear() {
        rollingParameters.clear();
    }

    // 初始化一個時間窗口
    public void initializeForIndex(int index) {
        if (!rollingParameters.containsKey(index)) {
            synchronized (this) {
                // putIfAbsent
                if (rollingParameters.get(index) == null) {
                    rollingParameters.put(index, new HotParameterLeapArray(
                        1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL));
                }
            }
        }
    }

熱點參數(shù)統(tǒng)計

//增加通過數(shù)
 public void addPass(int count, Object... args) {
        add(RollingParamEvent.REQUEST_PASSED, count, args);
    }
//增加阻塞數(shù)
public void addBlock(int count, Object... args) {
        add(RollingParamEvent.REQUEST_BLOCKED, count, args);
}

@SuppressWarnings("rawtypes")
private void add(RollingParamEvent event, int count, Object... args) {
        if (args == null) {
            return;
        }
        try {
            for (int index = 0; index < args.length; index++) {
                HotParameterLeapArray param = rollingParameters.get(index);
                if (param == null) {
                    continue;
                }

                Object arg = args[index];
                if (arg == null) {
                    continue;
                }
                if (Collection.class.isAssignableFrom(arg.getClass())) {
                    for (Object value : ((Collection)arg)) {
                        param.addValue(event, count, value);
                    }
                } else if (arg.getClass().isArray()) {
                    int length = Array.getLength(arg);
                    for (int i = 0; i < length; i++) {
                        Object value = Array.get(arg, i);
                        param.addValue(event, count, value);
                    }
                } else {
                    param.addValue(event, count, arg);
                }

            }
        } catch (Throwable e) {
            RecordLog.warn("[ParameterMetric] Param exception", e);
        }
    }
  1. 可以發(fā)現(xiàn)熱點參數(shù)的統(tǒng)計也是基于滑動時間窗口統(tǒng)計,這個就不具體分析了研底,滑動時間窗口前面有講解埠偿,見滑動時間窗口
    2.有點不同的是:在限流規(guī)則里指標的是通過LongAdder分段統(tǒng)計的榜晦,而熱點參數(shù)的指標是通過AtomicInteger的addAndGet方法統(tǒng)計的冠蒋。
 public ParamMapBucket add(RollingParamEvent event, int count, Object value) {
        data[event.ordinal()].putIfAbsent(value, new AtomicInteger());
        AtomicInteger counter = data[event.ordinal()].get(value);
        counter.addAndGet(count);
        return this;
    }

3.熱點參數(shù)獲取的實際上是統(tǒng)計范圍內一個平均值。如下:

 public double getPassParamQps(int index, Object value) {
        try {
            HotParameterLeapArray parameter = rollingParameters.get(index);
            if (parameter == null || value == null) {
                return -1;
            }
            return parameter.getRollingAvg(RollingParamEvent.REQUEST_PASSED, value);
        } catch (Throwable e) {
            RecordLog.info(e.getMessage(), e);
        }

        return -1;
    }

public double getRollingAvg(RollingParamEvent event, Object value) {
        return ((double) getRollingSum(event, value)) / getIntervalInSec();
 }

public long getRollingSum(RollingParamEvent event, Object value) {
        currentWindow();

        long sum = 0;

        List<ParamMapBucket> buckets = this.values();
        for (ParamMapBucket b : buckets) {
            sum += b.get(event, value);
        }

        return sum;
    }

五乾胶、我的總結

  1. 本文介紹熱點參數(shù)的概念抖剿、使用、及部分源碼分析识窿。
  2. 熱點參數(shù)限流使用需要單獨引用Jar包斩郎,并設置規(guī)則后才會啟動限流效果。
  3. 目前熱點參數(shù)限流只是支持QPS模式喻频,且還支持額外參數(shù)項進行限流缩宜。
  4. 熱點參數(shù)的參數(shù)限流統(tǒng)計也是基于滑動窗口統(tǒng)計的,內部使用了AtomicInteger的原子結構統(tǒng)計及ConcurrentLinkedHashMap結構作為數(shù)據(jù)存儲。

以上內容锻煌,如有不當之處妓布,請指正

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市宋梧,隨后出現(xiàn)的幾起案子匣沼,更是在濱河造成了極大的恐慌,老刑警劉巖乃秀,帶你破解...
    沈念sama閱讀 206,214評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件肛著,死亡現(xiàn)場離奇詭異,居然都是意外死亡跺讯,警方通過查閱死者的電腦和手機枢贿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,307評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刀脏,“玉大人局荚,你說我怎么就攤上這事∮郏” “怎么了耀态?”我有些...
    開封第一講書人閱讀 152,543評論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長暂雹。 經常有香客問我首装,道長,這世上最難降的妖魔是什么杭跪? 我笑而不...
    開封第一講書人閱讀 55,221評論 1 279
  • 正文 為了忘掉前任仙逻,我火速辦了婚禮,結果婚禮上涧尿,老公的妹妹穿的比我還像新娘系奉。我一直安慰自己,他們只是感情好姑廉,可當我...
    茶點故事閱讀 64,224評論 5 371
  • 文/花漫 我一把揭開白布缺亮。 她就那樣靜靜地躺著,像睡著了一般桥言。 火紅的嫁衣襯著肌膚如雪萌踱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,007評論 1 284
  • 那天号阿,我揣著相機與錄音虫蝶,去河邊找鬼。 笑死倦西,一個胖子當著我的面吹牛,可吹牛的內容都是我干的赁严。 我是一名探鬼主播扰柠,決...
    沈念sama閱讀 38,313評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼粉铐,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了卤档?” 一聲冷哼從身側響起蝙泼,我...
    開封第一講書人閱讀 36,956評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎劝枣,沒想到半個月后汤踏,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經...
    沈念sama閱讀 43,441評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡舔腾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 35,925評論 2 323
  • 正文 我和宋清朗相戀三年溪胶,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片稳诚。...
    茶點故事閱讀 38,018評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡哗脖,死狀恐怖,靈堂內的尸體忽然破棺而出扳还,到底是詐尸還是另有隱情才避,我是刑警寧澤,帶...
    沈念sama閱讀 33,685評論 4 322
  • 正文 年R本政府宣布氨距,位于F島的核電站桑逝,受9級特大地震影響,放射性物質發(fā)生泄漏俏让。R本人自食惡果不足惜楞遏,卻給世界環(huán)境...
    茶點故事閱讀 39,234評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望舆驶。 院中可真熱鬧橱健,春花似錦、人聲如沸沙廉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,240評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽撬陵。三九已至珊皿,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間巨税,已是汗流浹背蟋定。 一陣腳步聲響...
    開封第一講書人閱讀 31,464評論 1 261
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留草添,地道東北人驶兜。 一個月前我還...
    沈念sama閱讀 45,467評論 2 352
  • 正文 我出身青樓,卻偏偏與公主長得像,于是被迫代替她去往敵國和親抄淑。 傳聞我的和親對象是個殘疾皇子屠凶,可洞房花燭夜當晚...
    茶點故事閱讀 42,762評論 2 345

推薦閱讀更多精彩內容