分布式限流Sentinel

為什么要限流

眾所周知呀癣,互聯(lián)網(wǎng)電商的各類活動(dòng)是越來(lái)越多,例如削減男同胞錢包厚度的雙十一弦赖、618项栏、雙十二、各類秒殺活動(dòng)等蹬竖,幾乎所有的互聯(lián)網(wǎng)電商企業(yè)都會(huì)參與其中沼沈,沖擊GMV,會(huì)電商平臺(tái)帶來(lái)巨大的流量與可觀的利潤(rùn)币厕。

作為互聯(lián)網(wǎng)電商中的一員列另,我自己所屬的公司雖然遠(yuǎn)比不上淘寶、京東等旦装,但作為社交電商領(lǐng)域的領(lǐng)頭羊页衙,我們?cè)谏鲜鰧?duì)于電商企業(yè)及其特殊的日子,流量也是不容小覷的阴绢。

嗯店乐,畢竟我們的注冊(cè)用戶數(shù)已經(jīng)超過(guò)了6000W了,
供應(yīng)商已經(jīng)超過(guò)1.5W家了呻袭,去年雙十一單天的GMV也突破了2億RMB眨八,
嘿嘿,強(qiáng)行給自己公司打Call

好了左电,讓我們進(jìn)入這期的主題廉侧。例如在雙十一、或者周年慶等這種特殊的日子篓足,當(dāng)12點(diǎn)剛到那一刻段誊,巨大的用戶流量涌入你們的系統(tǒng),訪問(wèn)量突然劇增時(shí)纷纫,我們是如何保證系統(tǒng)的可用性枕扫、穩(wěn)定性。我們的解決方案主要是通過(guò)Sentinel的限流辱魁、降級(jí)烟瞧、熔斷(增加服務(wù)器數(shù)量就不說(shuō)了)以及消息中間件的削峰(我會(huì)專門寫一期關(guān)于消息中間件的文章诗鸭,到時(shí)候大家可以看看)。沒(méi)錯(cuò)参滴,本期的主角出現(xiàn)了强岸,他就是Sentinel,阿里開(kāi)源的面向分布式服務(wù)框架的輕量級(jí)流量控制框架砾赔。官網(wǎng)如下:https://github.com/alibaba/Sentinel

為什么選擇Sentinel

以下是另一個(gè)開(kāi)源的流量控制框架hystrix與Sentinel的對(duì)比

Sentinel與hystrix對(duì)比

由上圖顯而易見(jiàn)蝌箍,Sentinel相比于hystrix有更加強(qiáng)大的功能,它支持hystrix不具有的系統(tǒng)負(fù)載保護(hù)與限流以及強(qiáng)大的監(jiān)控API等暴心。更加適配的Dubbo(我們公司用的RCP就是它)的也是Sentinel妓盲;更加重要的一點(diǎn)就是我們技術(shù)部的管理層都是阿里出身,技術(shù)選型也就都是阿里那一套专普,畢竟國(guó)際大廠悯衬,品質(zhì)還是有保證的,哈哈檀夹。

Sentinel名詞解釋

資源

分布式系統(tǒng)中筋粗,限流的資源可以是一個(gè)http接口,也可使是某個(gè)分布式應(yīng)用中的API炸渡;一般我們針對(duì)C端的http接口進(jìn)行限流娜亿,針對(duì)API進(jìn)行熔斷降級(jí)。

限流

限制請(qǐng)求的數(shù)量蚌堵,限制某段時(shí)間內(nèi)的請(qǐng)求總量對(duì)于超出的總量的請(qǐng)求买决,可以直接拒絕,也可以在請(qǐng)求的時(shí)候?qū)φ?qǐng)求分組吼畏,允許特殊請(qǐng)求進(jìn)來(lái)策州,剩下的拒絕,也可以放入消息隊(duì)列宫仗,削峰填谷。
限流的實(shí)現(xiàn)方式:

  • 計(jì)數(shù)器(滑動(dòng)窗口):維護(hù)一個(gè)counter旁仿,每個(gè)時(shí)間段清零藕夫,對(duì)時(shí)間段內(nèi)的請(qǐng)求進(jìn)行計(jì)數(shù),計(jì)數(shù)前判斷counter是否達(dá)到闕值枯冈,如果沒(méi)有就加一毅贮,達(dá)到則拒絕
  • 漏斗算法:一個(gè)固定容量的桶,當(dāng)桶為空的時(shí)候尘奏,不會(huì)漏出水滴滩褥,流入桶的水的速率是任意的,漏出水的速率是固定的炫加,如果流入桶的水超出桶的容量瑰煎,進(jìn)行拒絕
    一般的實(shí)現(xiàn)方法是隊(duì)列铺然,隊(duì)列模擬漏斗,空的時(shí)候不再出隊(duì)酒甸,滿的時(shí)候拒絕
  • 令牌桶算法:和漏斗算法很類似魄健,不過(guò)除了一個(gè)隊(duì)列以外,還加入了一個(gè)中間人插勤,它會(huì)以一定的速率發(fā)放令牌(token)到桶內(nèi)沽瘦,隊(duì)列中的的等待著只有拿到token才能通過(guò)漏斗限制了傳輸速率,而令牌桶在限制的同時(shí)农尖,還允許突然的大流量析恋,即:在大流量到來(lái)的時(shí)候,有足夠空間的情況下(足夠的隊(duì)列和桶內(nèi)有足夠的令牌)盛卡,就允許進(jìn)入
降級(jí)

服務(wù)降級(jí)是從整個(gè)系統(tǒng)的負(fù)荷情況出發(fā)和考慮的助隧,對(duì)某些負(fù)荷會(huì)比較高的情況,為了預(yù)防某些功能(業(yè)務(wù)場(chǎng)景)出現(xiàn)負(fù)荷過(guò)載或者響應(yīng)慢的情況窟扑,在其內(nèi)部暫時(shí)舍棄對(duì)一些非核心的接口和數(shù)據(jù)的請(qǐng)求喇颁,而直接返回一個(gè)提前準(zhǔn)備好的fallback(退路)錯(cuò)誤處理信息。這樣嚎货,雖然提供的是一個(gè)有損的服務(wù)橘霎,但卻保證了整個(gè)系統(tǒng)的穩(wěn)定性和可用性。例如:當(dāng)雙11活動(dòng)時(shí)殖属,把無(wú)關(guān)交易的服務(wù)統(tǒng)統(tǒng)降級(jí)姐叁,如查看歷史訂單、工單等等洗显。

熔斷

在微服務(wù)架構(gòu)中外潜,微服務(wù)是完成一個(gè)單一的業(yè)務(wù)功能,這樣做的好處是可以做到解耦挠唆,每個(gè)微服務(wù)可以獨(dú)立演進(jìn)处窥。但是,一個(gè)應(yīng)用可能會(huì)有多個(gè)微服務(wù)組成玄组,微服務(wù)之間的數(shù)據(jù)交互通過(guò)遠(yuǎn)程過(guò)程調(diào)用完成滔驾。這就帶來(lái)一個(gè)問(wèn)題,假設(shè)微服務(wù)A調(diào)用微服務(wù)B和微服務(wù)C俄讹,微服務(wù)B和微服務(wù)C又調(diào)用其它的微服務(wù)哆致。如果調(diào)用鏈路上某個(gè)微服務(wù)的調(diào)用響應(yīng)時(shí)間過(guò)長(zhǎng)或者不可用,對(duì)微服務(wù)A的調(diào)用就會(huì)占用越來(lái)越多的系統(tǒng)資源患膛,進(jìn)而引起系統(tǒng)崩潰摊阀,所謂的“雪崩效應(yīng)”。
熔斷機(jī)制是應(yīng)對(duì)雪崩效應(yīng)的一種微服務(wù)鏈路保護(hù)機(jī)制。服務(wù)熔斷的作用類似于我們家用的保險(xiǎn)絲胞此,當(dāng)某服務(wù)出現(xiàn)不可用或響應(yīng)超時(shí)的情況時(shí)臣咖,為了防止整個(gè)系統(tǒng)出現(xiàn)雪崩,暫時(shí)停止對(duì)該服務(wù)的調(diào)用豌鹤。熔段解決如下幾個(gè)問(wèn)題:

  • 當(dāng)所依賴的對(duì)象不穩(wěn)定時(shí)亡哄,能夠起到快速失敗的目的;
  • 快速失敗后布疙,能夠根據(jù)一定的算法動(dòng)態(tài)試探所依賴對(duì)象是否恢復(fù)

Sentinel源碼解析

本源碼解析以限流為例蚊惯,降級(jí)具體實(shí)現(xiàn)可自行參考源碼Sentinel采用滑動(dòng)窗口算法來(lái)實(shí)現(xiàn)限流的。限流的直接表現(xiàn)是在執(zhí)行 Entry nodeA = SphU.entry(資源名字) 的時(shí)候拋出 FlowException 異常灵临。FlowException 是BlockException 的子類截型,您可以捕捉 BlockException 來(lái)自定義被限流之后的處理邏輯。

public static void main(String[] args) throws Exception {
        //初始化一個(gè)規(guī)則       
        initFlowRule();
        // 觸發(fā)內(nèi)部初始化
        Entry entry = null;
        try {
            entry = SphU.entry(KEY);
        } catch  (BlockException e){
             //如果被限流了儒溉,那么會(huì)拋出這個(gè)異常
             e.printStackTrace();
        } finally {
            if (entry != null) {
                entry.exit();
            }
        } 
    }

由上可知宦焦,會(huì)先初始化一個(gè)限流規(guī)則,initFlowRule方法中將創(chuàng)建一個(gè)限流規(guī)則FlowRule對(duì)象顿涣,主要限流參數(shù)如下

public class FlowRule extends AbstractRule {
   /**
     * 限流閾值類型波闹,QPS 或線程數(shù),默認(rèn)QPS
   */
    private int grade = RuleConstant.FLOW_GRADE_QPS;

    /**
     *  限流閾值
     */
    private double count;

    /**
     * 根據(jù)調(diào)用關(guān)系選擇策略
     */
    private int strategy = RuleConstant.STRATEGY_DIRECT;

    /**
     * 資源名涛碑,即限流規(guī)則的作用對(duì)象
     * 1-預(yù)熱/冷啟動(dòng)
     * 2-速率限制
     * 3-預(yù)熱/冷啟動(dòng)+速率限制
     */
    private String refResource;

    /**
     * 限流控制行為精堕,默認(rèn)0-直接拒絕
     */
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
}

并設(shè)置其相應(yīng)的限流規(guī)則屬性,最后通過(guò)FlowRuleManager.loadRules(rules)加載限流規(guī)則蒲障。

public class FlowRuleManager {
   /**
     * Load {@link FlowRule}s, former rules will be replaced.
     * @param rules new rules to load.
   */
    public static void loadRules(List<FlowRule> rules) {
          currentProperty.updateValue(rules);
    }
}
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
  @Override
    public boolean updateValue(T newValue) {
        if (isEqual(value, newValue)) {
            return false;
        }
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
        value = newValue;
        for (PropertyListener<T> listener : listeners) {
            //監(jiān)聽(tīng)修改限流規(guī)則
            listener.configUpdate(newValue);
        }
        return true;
    }
}

限流規(guī)則初始化之后歹篓,通過(guò)entry= SphU.entry(resource)觸發(fā)內(nèi)部初始化。
從 SphU.entry() 方法往下執(zhí)行會(huì)進(jìn)入到 Sph.entry() 揉阎,Sph的默認(rèn)實(shí)現(xiàn)類是 CtSph,而最終會(huì)進(jìn)入CtSph 的entry 方法

@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
   //封裝一個(gè)資源對(duì)象
    StringResourceWrapper resource = new StringResourceWrapper(name, type);
    return entry(resource, count, args);
}

通過(guò)我們給定的資源去封裝了一個(gè) StringResourceWrapper 庄撮,然后傳入自己的重載方法,繼而調(diào)用 entryWithPriority(resourceWrapper, count, false, args):

public class CtSph implements Sph {
  //......
  private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            return new CtEntry(resourceWrapper, null, context);
        }
        if (context == null) {
            // 使用默認(rèn)上下文
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        // 全局開(kāi)關(guān)關(guān)閉毙籽,沒(méi)有規(guī)則檢查洞斯,返回CtEntry對(duì)象
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        // 獲取該資源對(duì)應(yīng)的 chain
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        // 獲取的chain對(duì)象為空,返回CtEntry對(duì)象
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            //執(zhí)行chain的 entry方法
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
}

由上述方法可知坑赡,主要是為了獲取該資源對(duì)應(yīng)的資源處理鏈巡扇,讓我們來(lái)看下slotChain是如何獲取的

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        //這里與dubbo(雙重檢查鎖)中如出一轍,采用本地Map緩存機(jī)制
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // SlotChain最大閾值:6000
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    // 構(gòu)造SlotChain對(duì)象
                    chain = SlotChainProvider.newSlotChain();
                    //  資源 -->  處理鏈
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    // 放入本地map緩存
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

當(dāng)Map緩存中不存在ProcessorSlotChain實(shí)例垮衷,則具體通過(guò) SlotChainProvider 去構(gòu)造處理鏈

public final class SlotChainProvider {
    //  SlotChain的構(gòu)造器
    private static volatile SlotChainBuilder slotChainBuilder = null;
    /**
     * 構(gòu)造SlotChain對(duì)象
     */
    public static ProcessorSlotChain newSlotChain() {
        // 若slotChainBuilder存在,直接調(diào)用構(gòu)造方法
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
        // 通過(guò)SpiLoader加載SlotChainBuilder
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
                + slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }
}

繼續(xù)讓我們來(lái)看下slotChainBuilder的build方法中做了些什么

public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        // 獲取ProcessorSlot實(shí)例集合
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            // 過(guò)濾非AbstractLinkedProcessorSlot類型的Slot
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            // 加入鏈路中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
        return chain;
    }
}

我們可以看出上述底層源碼是一個(gè)標(biāo)準(zhǔn)的責(zé)任鏈設(shè)計(jì)模式乖坠,通過(guò)查看ProcessorSlot的具體實(shí)現(xiàn)類搀突,我們可以知道該責(zé)任鏈中的具體節(jié)點(diǎn)如圖所示


責(zé)任鏈中的節(jié)點(diǎn)

執(zhí)行對(duì)應(yīng)的這些節(jié)點(diǎn),具有有不同的職責(zé)熊泵,例如:

  • NodeSelectorSlot :收集資源的路徑仰迁,并將這些資源的調(diào)用路徑甸昏,以樹(shù)狀結(jié)構(gòu)存儲(chǔ)起來(lái),用于根據(jù)調(diào)用路徑來(lái)限流降級(jí)徐许;
  • ClusterBuilderSlot :用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息施蜜,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流雌隅,降級(jí)的依據(jù)翻默;
  • StatisticSlot :用于記錄、統(tǒng)計(jì)不同緯度的 runtime 指標(biāo)監(jiān)控信息恰起;
  • SystemSlot :通過(guò)系統(tǒng)的狀態(tài)修械,例如 load1 等,來(lái)控制總的入口流量检盼;
  • AuthoritySlot :根據(jù)配置的黑白名單和調(diào)用來(lái)源信息肯污,來(lái)做黑白名單控制;
  • FlowSlot :用于根據(jù)預(yù)設(shè)的限流規(guī)則以及前面 slot 統(tǒng)計(jì)的狀態(tài)吨枉,來(lái)進(jìn)行流量控制蹦渣;
  • DegradeSlot :通過(guò)統(tǒng)計(jì)信息以及預(yù)設(shè)的規(guī)則,來(lái)做熔斷降級(jí)貌亭;
    slot執(zhí)行鏈路可參考如下
    slot執(zhí)行框架

    上面獲得的ProcessorSlotChain的實(shí)例是DefaultProcessorSlotChain柬唯,那么執(zhí)行chain.entry方法,就會(huì)執(zhí)行DefaultProcessorSlotChain.first的entry方法属提,而DefaultProcessorSlotChain.first的entry方法是這樣的:
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }
        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }

    };
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    private AbstractLinkedProcessorSlot<?> next = null;
    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }

下圖所示是各個(gè)slot對(duì)應(yīng)的entry方法的具體實(shí)現(xiàn)


entry方法具體實(shí)現(xiàn)

我們以StatisticSlot為例权逗,來(lái)看看這些具體實(shí)現(xiàn)類內(nèi)部的邏輯是怎樣的。

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 傳播到下一個(gè)Slot.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 執(zhí)行到這里表示通過(guò)檢查冤议,不被限流
            node.increaseThreadNum();
            // 請(qǐng)求通過(guò)了sentinel的流控等規(guī)則斟薇,記錄當(dāng)次請(qǐng)求
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            // 增加線程統(tǒng)計(jì)
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {

            context.getCurEntry().setBlockError(e);

            // 增加QPS統(tǒng)計(jì)
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

請(qǐng)求通過(guò)了sentinel的流控等規(guī)則,再通過(guò)node.addPassRequest() 將當(dāng)次請(qǐng)求記錄下來(lái)

public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }

addPassRequest方法如下

public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

addPass方法如下

public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

WindowWrap主要屬性如下

public class WindowWrap<T> {
    /**
     * 時(shí)間窗口的長(zhǎng)度
     */
    private final long windowLengthInMs;
    /**
     * 時(shí)間窗口的開(kāi)始時(shí)間恕酸,單位是毫秒
     */
    private long windowStart;
    /**
     * S時(shí)間窗口的內(nèi)容堪滨,在 WindowWrap 中是用泛型表示這個(gè)值的,但實(shí)際上就是 MetricBucket 類
     */
    private T value;
}

我們?cè)倏纯传@取當(dāng)前窗口的方法 data.currentWindow()

public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                return old;
            } else if (windowStart > old.windowStart()) {
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

我們?cè)倩氐?/p>

public void addPass(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

獲取到窗口以后通過(guò) wrap.value().addPass(count)增加統(tǒng)計(jì)的 QPS蕊温。而這里的 wrap.value() 得到的是之前提到的 MetricBucket 袱箱,在 Sentinel 中QPS相關(guān)數(shù)據(jù)的統(tǒng)計(jì)結(jié)果是維護(hù)在這個(gè)類的 LongAdder[] 中,最終由這個(gè)指標(biāo)來(lái)與我們實(shí)現(xiàn)設(shè)置好的規(guī)則進(jìn)行匹配义矛,查看是否限流发笔,也就是 StatisticSlot的entry 方法中的。在執(zhí)行StatisticSlot的entry前都要先進(jìn)入到FlowSlot的entry方法進(jìn)行限流過(guò)濾:

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        // 限流校驗(yàn)
        checkFlow(resourceWrapper, context, node, count, prioritized);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

讓我們進(jìn)入checkFlow的內(nèi)部

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

再此處我們拿到了設(shè)置的 FlowRule 凉翻,循環(huán)匹配資源進(jìn)行限流過(guò)濾了讨。這就是Sentinel 能做到限流的原因。

Sentinel配置介紹

我們可以通過(guò)Sentinel的客戶端查看接入了sentinel的各個(gè)系統(tǒng)∏凹疲可針對(duì)系統(tǒng)中的各個(gè)資源設(shè)置相應(yīng)的限流規(guī)則胞谭,如QPS或者線程數(shù);或者設(shè)置相應(yīng)的降級(jí)規(guī)則男杈,如平均RT丈屹,異常比例以及異常數(shù)。

[{
    "resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
    "count": 1000,//RT threshold or exception ratio threshold count
    "grade": 0,  //Degrade strategy (0: average RT, 1: exception ratio)
    "passCount": 0,
    "timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
  },
  {
    "resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
    "count": 0.5,//RT threshold or exception ratio threshold count
    "grade": 1,// Degrade strategy (0: average RT, 1: exception ratio)
    "passCount": 0,
    "timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
  }]
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末伶棒,一起剝皮案震驚了整個(gè)濱河市旺垒,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌苞冯,老刑警劉巖袖牙,帶你破解...
    沈念sama閱讀 219,539評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異舅锄,居然都是意外死亡鞭达,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,594評(píng)論 3 396
  • 文/潘曉璐 我一進(jìn)店門皇忿,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)畴蹭,“玉大人,你說(shuō)我怎么就攤上這事鳍烁∵督螅” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 165,871評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵幔荒,是天一觀的道長(zhǎng)糊闽。 經(jīng)常有香客問(wèn)我,道長(zhǎng)爹梁,這世上最難降的妖魔是什么右犹? 我笑而不...
    開(kāi)封第一講書人閱讀 58,963評(píng)論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮姚垃,結(jié)果婚禮上念链,老公的妹妹穿的比我還像新娘。我一直安慰自己积糯,他們只是感情好掂墓,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,984評(píng)論 6 393
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著看成,像睡著了一般君编。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上川慌,一...
    開(kāi)封第一講書人閱讀 51,763評(píng)論 1 307
  • 那天啦粹,我揣著相機(jī)與錄音偿荷,去河邊找鬼。 笑死唠椭,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的忍饰。 我是一名探鬼主播贪嫂,決...
    沈念sama閱讀 40,468評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼艾蓝!你這毒婦竟也來(lái)了力崇?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,357評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤赢织,失蹤者是張志新(化名)和其女友劉穎亮靴,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體于置,經(jīng)...
    沈念sama閱讀 45,850評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡茧吊,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,002評(píng)論 3 338
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了八毯。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片搓侄。...
    茶點(diǎn)故事閱讀 40,144評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖话速,靈堂內(nèi)的尸體忽然破棺而出讶踪,到底是詐尸還是另有隱情,我是刑警寧澤泊交,帶...
    沈念sama閱讀 35,823評(píng)論 5 346
  • 正文 年R本政府宣布乳讥,位于F島的核電站,受9級(jí)特大地震影響廓俭,放射性物質(zhì)發(fā)生泄漏云石。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,483評(píng)論 3 331
  • 文/蒙蒙 一白指、第九天 我趴在偏房一處隱蔽的房頂上張望留晚。 院中可真熱鬧,春花似錦告嘲、人聲如沸错维。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,026評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)赋焕。三九已至,卻和暖如春仰楚,著一層夾襖步出監(jiān)牢的瞬間隆判,已是汗流浹背犬庇。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,150評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留侨嘀,地道東北人臭挽。 一個(gè)月前我還...
    沈念sama閱讀 48,415評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像咬腕,于是被迫代替她去往敵國(guó)和親欢峰。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,092評(píng)論 2 355