為什么要限流
眾所周知呀癣,互聯(lián)網(wǎng)電商的各類活動(dòng)是越來(lái)越多,例如削減男同胞錢包厚度的雙十一弦赖、618项栏、雙十二、各類秒殺活動(dòng)等蹬竖,幾乎所有的互聯(lián)網(wǎng)電商企業(yè)都會(huì)參與其中沼沈,沖擊GMV,會(huì)電商平臺(tái)帶來(lái)巨大的流量與可觀的利潤(rùn)币厕。
作為互聯(lián)網(wǎng)電商中的一員列另,我自己所屬的公司雖然遠(yuǎn)比不上淘寶、京東等旦装,但作為社交電商領(lǐng)域的領(lǐng)頭羊页衙,我們?cè)谏鲜鰧?duì)于電商企業(yè)及其特殊的日子,流量也是不容小覷的阴绢。
嗯店乐,畢竟我們的注冊(cè)用戶數(shù)已經(jīng)超過(guò)了6000W了,
供應(yīng)商已經(jīng)超過(guò)1.5W家了呻袭,去年雙十一單天的GMV也突破了2億RMB眨八,
嘿嘿,強(qiáng)行給自己公司打Call
好了左电,讓我們進(jìn)入這期的主題廉侧。例如在雙十一、或者周年慶等這種特殊的日子篓足,當(dāng)12點(diǎn)剛到那一刻段誊,巨大的用戶流量涌入你們的系統(tǒng),訪問(wèn)量突然劇增時(shí)纷纫,我們是如何保證系統(tǒng)的可用性枕扫、穩(wěn)定性。我們的解決方案主要是通過(guò)Sentinel的限流辱魁、降級(jí)烟瞧、熔斷(增加服務(wù)器數(shù)量就不說(shuō)了)以及消息中間件的削峰(我會(huì)專門寫一期關(guān)于消息中間件的文章诗鸭,到時(shí)候大家可以看看)。沒(méi)錯(cuò)参滴,本期的主角出現(xiàn)了强岸,他就是Sentinel,阿里開(kāi)源的面向分布式服務(wù)框架的輕量級(jí)流量控制框架砾赔。官網(wǎng)如下:https://github.com/alibaba/Sentinel
為什么選擇Sentinel
以下是另一個(gè)開(kāi)源的流量控制框架hystrix與Sentinel的對(duì)比
由上圖顯而易見(jiàn)蝌箍,Sentinel相比于hystrix有更加強(qiáng)大的功能,它支持hystrix不具有的系統(tǒng)負(fù)載保護(hù)與限流以及強(qiáng)大的監(jiān)控API等暴心。更加適配的Dubbo(我們公司用的RCP就是它)的也是Sentinel妓盲;更加重要的一點(diǎn)就是我們技術(shù)部的管理層都是阿里出身,技術(shù)選型也就都是阿里那一套专普,畢竟國(guó)際大廠悯衬,品質(zhì)還是有保證的,哈哈檀夹。
Sentinel名詞解釋
資源
分布式系統(tǒng)中筋粗,限流的資源可以是一個(gè)http接口,也可使是某個(gè)分布式應(yīng)用中的API炸渡;一般我們針對(duì)C端的http接口進(jìn)行限流娜亿,針對(duì)API進(jìn)行熔斷降級(jí)。
限流
限制請(qǐng)求的數(shù)量蚌堵,限制某段時(shí)間內(nèi)的請(qǐng)求總量對(duì)于超出的總量的請(qǐng)求买决,可以直接拒絕,也可以在請(qǐng)求的時(shí)候?qū)φ?qǐng)求分組吼畏,允許特殊請(qǐng)求進(jìn)來(lái)策州,剩下的拒絕,也可以放入消息隊(duì)列宫仗,削峰填谷。
限流的實(shí)現(xiàn)方式:
- 計(jì)數(shù)器(滑動(dòng)窗口):維護(hù)一個(gè)counter旁仿,每個(gè)時(shí)間段清零藕夫,對(duì)時(shí)間段內(nèi)的請(qǐng)求進(jìn)行計(jì)數(shù),計(jì)數(shù)前判斷counter是否達(dá)到闕值枯冈,如果沒(méi)有就加一毅贮,達(dá)到則拒絕
- 漏斗算法:一個(gè)固定容量的桶,當(dāng)桶為空的時(shí)候尘奏,不會(huì)漏出水滴滩褥,流入桶的水的速率是任意的,漏出水的速率是固定的炫加,如果流入桶的水超出桶的容量瑰煎,進(jìn)行拒絕
一般的實(shí)現(xiàn)方法是隊(duì)列铺然,隊(duì)列模擬漏斗,空的時(shí)候不再出隊(duì)酒甸,滿的時(shí)候拒絕 - 令牌桶算法:和漏斗算法很類似魄健,不過(guò)除了一個(gè)隊(duì)列以外,還加入了一個(gè)中間人插勤,它會(huì)以一定的速率發(fā)放令牌(token)到桶內(nèi)沽瘦,隊(duì)列中的的等待著只有拿到token才能通過(guò)漏斗限制了傳輸速率,而令牌桶在限制的同時(shí)农尖,還允許突然的大流量析恋,即:在大流量到來(lái)的時(shí)候,有足夠空間的情況下(足夠的隊(duì)列和桶內(nèi)有足夠的令牌)盛卡,就允許進(jìn)入
降級(jí)
服務(wù)降級(jí)是從整個(gè)系統(tǒng)的負(fù)荷情況出發(fā)和考慮的助隧,對(duì)某些負(fù)荷會(huì)比較高的情況,為了預(yù)防某些功能(業(yè)務(wù)場(chǎng)景)出現(xiàn)負(fù)荷過(guò)載或者響應(yīng)慢的情況窟扑,在其內(nèi)部暫時(shí)舍棄對(duì)一些非核心的接口和數(shù)據(jù)的請(qǐng)求喇颁,而直接返回一個(gè)提前準(zhǔn)備好的fallback(退路)錯(cuò)誤處理信息。這樣嚎货,雖然提供的是一個(gè)有損的服務(wù)橘霎,但卻保證了整個(gè)系統(tǒng)的穩(wěn)定性和可用性。例如:當(dāng)雙11活動(dòng)時(shí)殖属,把無(wú)關(guān)交易的服務(wù)統(tǒng)統(tǒng)降級(jí)姐叁,如查看歷史訂單、工單等等洗显。
熔斷
在微服務(wù)架構(gòu)中外潜,微服務(wù)是完成一個(gè)單一的業(yè)務(wù)功能,這樣做的好處是可以做到解耦挠唆,每個(gè)微服務(wù)可以獨(dú)立演進(jìn)处窥。但是,一個(gè)應(yīng)用可能會(huì)有多個(gè)微服務(wù)組成玄组,微服務(wù)之間的數(shù)據(jù)交互通過(guò)遠(yuǎn)程過(guò)程調(diào)用完成滔驾。這就帶來(lái)一個(gè)問(wèn)題,假設(shè)微服務(wù)A調(diào)用微服務(wù)B和微服務(wù)C俄讹,微服務(wù)B和微服務(wù)C又調(diào)用其它的微服務(wù)哆致。如果調(diào)用鏈路上某個(gè)微服務(wù)的調(diào)用響應(yīng)時(shí)間過(guò)長(zhǎng)或者不可用,對(duì)微服務(wù)A的調(diào)用就會(huì)占用越來(lái)越多的系統(tǒng)資源患膛,進(jìn)而引起系統(tǒng)崩潰摊阀,所謂的“雪崩效應(yīng)”。
熔斷機(jī)制是應(yīng)對(duì)雪崩效應(yīng)的一種微服務(wù)鏈路保護(hù)機(jī)制。服務(wù)熔斷的作用類似于我們家用的保險(xiǎn)絲胞此,當(dāng)某服務(wù)出現(xiàn)不可用或響應(yīng)超時(shí)的情況時(shí)臣咖,為了防止整個(gè)系統(tǒng)出現(xiàn)雪崩,暫時(shí)停止對(duì)該服務(wù)的調(diào)用豌鹤。熔段解決如下幾個(gè)問(wèn)題:
- 當(dāng)所依賴的對(duì)象不穩(wěn)定時(shí)亡哄,能夠起到快速失敗的目的;
- 快速失敗后布疙,能夠根據(jù)一定的算法動(dòng)態(tài)試探所依賴對(duì)象是否恢復(fù)
Sentinel源碼解析
本源碼解析以限流為例蚊惯,降級(jí)具體實(shí)現(xiàn)可自行參考源碼Sentinel采用滑動(dòng)窗口算法來(lái)實(shí)現(xiàn)限流的。限流的直接表現(xiàn)是在執(zhí)行 Entry nodeA = SphU.entry(資源名字) 的時(shí)候拋出 FlowException 異常灵临。FlowException 是BlockException 的子類截型,您可以捕捉 BlockException 來(lái)自定義被限流之后的處理邏輯。
public static void main(String[] args) throws Exception {
//初始化一個(gè)規(guī)則
initFlowRule();
// 觸發(fā)內(nèi)部初始化
Entry entry = null;
try {
entry = SphU.entry(KEY);
} catch (BlockException e){
//如果被限流了儒溉,那么會(huì)拋出這個(gè)異常
e.printStackTrace();
} finally {
if (entry != null) {
entry.exit();
}
}
}
由上可知宦焦,會(huì)先初始化一個(gè)限流規(guī)則,initFlowRule方法中將創(chuàng)建一個(gè)限流規(guī)則FlowRule對(duì)象顿涣,主要限流參數(shù)如下
public class FlowRule extends AbstractRule {
/**
* 限流閾值類型波闹,QPS 或線程數(shù),默認(rèn)QPS
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* 限流閾值
*/
private double count;
/**
* 根據(jù)調(diào)用關(guān)系選擇策略
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
/**
* 資源名涛碑,即限流規(guī)則的作用對(duì)象
* 1-預(yù)熱/冷啟動(dòng)
* 2-速率限制
* 3-預(yù)熱/冷啟動(dòng)+速率限制
*/
private String refResource;
/**
* 限流控制行為精堕,默認(rèn)0-直接拒絕
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
}
并設(shè)置其相應(yīng)的限流規(guī)則屬性,最后通過(guò)FlowRuleManager.loadRules(rules)加載限流規(guī)則蒲障。
public class FlowRuleManager {
/**
* Load {@link FlowRule}s, former rules will be replaced.
* @param rules new rules to load.
*/
public static void loadRules(List<FlowRule> rules) {
currentProperty.updateValue(rules);
}
}
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
@Override
public boolean updateValue(T newValue) {
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
//監(jiān)聽(tīng)修改限流規(guī)則
listener.configUpdate(newValue);
}
return true;
}
}
限流規(guī)則初始化之后歹篓,通過(guò)entry= SphU.entry(resource)觸發(fā)內(nèi)部初始化。
從 SphU.entry() 方法往下執(zhí)行會(huì)進(jìn)入到 Sph.entry() 揉阎,Sph的默認(rèn)實(shí)現(xiàn)類是 CtSph,而最終會(huì)進(jìn)入CtSph 的entry 方法
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
//封裝一個(gè)資源對(duì)象
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
通過(guò)我們給定的資源去封裝了一個(gè) StringResourceWrapper 庄撮,然后傳入自己的重載方法,繼而調(diào)用 entryWithPriority(resourceWrapper, count, false, args):
public class CtSph implements Sph {
//......
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// 使用默認(rèn)上下文
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 全局開(kāi)關(guān)關(guān)閉毙籽,沒(méi)有規(guī)則檢查洞斯,返回CtEntry對(duì)象
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 獲取該資源對(duì)應(yīng)的 chain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 獲取的chain對(duì)象為空,返回CtEntry對(duì)象
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//執(zhí)行chain的 entry方法
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
}
由上述方法可知坑赡,主要是為了獲取該資源對(duì)應(yīng)的資源處理鏈巡扇,讓我們來(lái)看下slotChain是如何獲取的
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//這里與dubbo(雙重檢查鎖)中如出一轍,采用本地Map緩存機(jī)制
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// SlotChain最大閾值:6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 構(gòu)造SlotChain對(duì)象
chain = SlotChainProvider.newSlotChain();
// 資源 --> 處理鏈
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
// 放入本地map緩存
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
當(dāng)Map緩存中不存在ProcessorSlotChain實(shí)例垮衷,則具體通過(guò) SlotChainProvider 去構(gòu)造處理鏈
public final class SlotChainProvider {
// SlotChain的構(gòu)造器
private static volatile SlotChainBuilder slotChainBuilder = null;
/**
* 構(gòu)造SlotChain對(duì)象
*/
public static ProcessorSlotChain newSlotChain() {
// 若slotChainBuilder存在,直接調(diào)用構(gòu)造方法
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// 通過(guò)SpiLoader加載SlotChainBuilder
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
}
繼續(xù)讓我們來(lái)看下slotChainBuilder的build方法中做了些什么
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 獲取ProcessorSlot實(shí)例集合
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
// 過(guò)濾非AbstractLinkedProcessorSlot類型的Slot
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
// 加入鏈路中
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
我們可以看出上述底層源碼是一個(gè)標(biāo)準(zhǔn)的責(zé)任鏈設(shè)計(jì)模式乖坠,通過(guò)查看ProcessorSlot的具體實(shí)現(xiàn)類搀突,我們可以知道該責(zé)任鏈中的具體節(jié)點(diǎn)如圖所示
執(zhí)行對(duì)應(yīng)的這些節(jié)點(diǎn),具有有不同的職責(zé)熊泵,例如:
- NodeSelectorSlot :收集資源的路徑仰迁,并將這些資源的調(diào)用路徑甸昏,以樹(shù)狀結(jié)構(gòu)存儲(chǔ)起來(lái),用于根據(jù)調(diào)用路徑來(lái)限流降級(jí)徐许;
- ClusterBuilderSlot :用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息施蜜,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流雌隅,降級(jí)的依據(jù)翻默;
- StatisticSlot :用于記錄、統(tǒng)計(jì)不同緯度的 runtime 指標(biāo)監(jiān)控信息恰起;
- SystemSlot :通過(guò)系統(tǒng)的狀態(tài)修械,例如 load1 等,來(lái)控制總的入口流量检盼;
- AuthoritySlot :根據(jù)配置的黑白名單和調(diào)用來(lái)源信息肯污,來(lái)做黑白名單控制;
- FlowSlot :用于根據(jù)預(yù)設(shè)的限流規(guī)則以及前面 slot 統(tǒng)計(jì)的狀態(tài)吨枉,來(lái)進(jìn)行流量控制蹦渣;
- DegradeSlot :通過(guò)統(tǒng)計(jì)信息以及預(yù)設(shè)的規(guī)則,來(lái)做熔斷降級(jí)貌亭;
slot執(zhí)行鏈路可參考如下
slot執(zhí)行框架
上面獲得的ProcessorSlotChain的實(shí)例是DefaultProcessorSlotChain柬唯,那么執(zhí)行chain.entry方法,就會(huì)執(zhí)行DefaultProcessorSlotChain.first的entry方法属提,而DefaultProcessorSlotChain.first的entry方法是這樣的:
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};
}
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
下圖所示是各個(gè)slot對(duì)應(yīng)的entry方法的具體實(shí)現(xiàn)
我們以StatisticSlot為例权逗,來(lái)看看這些具體實(shí)現(xiàn)類內(nèi)部的邏輯是怎樣的。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 傳播到下一個(gè)Slot.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 執(zhí)行到這里表示通過(guò)檢查冤议,不被限流
node.increaseThreadNum();
// 請(qǐng)求通過(guò)了sentinel的流控等規(guī)則斟薇,記錄當(dāng)次請(qǐng)求
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// 增加線程統(tǒng)計(jì)
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setBlockError(e);
// 增加QPS統(tǒng)計(jì)
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
請(qǐng)求通過(guò)了sentinel的流控等規(guī)則,再通過(guò)node.addPassRequest() 將當(dāng)次請(qǐng)求記錄下來(lái)
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
addPassRequest方法如下
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
addPass方法如下
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
WindowWrap主要屬性如下
public class WindowWrap<T> {
/**
* 時(shí)間窗口的長(zhǎng)度
*/
private final long windowLengthInMs;
/**
* 時(shí)間窗口的開(kāi)始時(shí)間恕酸,單位是毫秒
*/
private long windowStart;
/**
* S時(shí)間窗口的內(nèi)容堪滨,在 WindowWrap 中是用泛型表示這個(gè)值的,但實(shí)際上就是 MetricBucket 類
*/
private T value;
}
我們?cè)倏纯传@取當(dāng)前窗口的方法 data.currentWindow()
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
我們?cè)倩氐?/p>
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
獲取到窗口以后通過(guò) wrap.value().addPass(count)增加統(tǒng)計(jì)的 QPS蕊温。而這里的 wrap.value() 得到的是之前提到的 MetricBucket 袱箱,在 Sentinel 中QPS相關(guān)數(shù)據(jù)的統(tǒng)計(jì)結(jié)果是維護(hù)在這個(gè)類的 LongAdder[] 中,最終由這個(gè)指標(biāo)來(lái)與我們實(shí)現(xiàn)設(shè)置好的規(guī)則進(jìn)行匹配义矛,查看是否限流发笔,也就是 StatisticSlot的entry 方法中的。在執(zhí)行StatisticSlot的entry前都要先進(jìn)入到FlowSlot的entry方法進(jìn)行限流過(guò)濾:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 限流校驗(yàn)
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
讓我們進(jìn)入checkFlow的內(nèi)部
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
再此處我們拿到了設(shè)置的 FlowRule 凉翻,循環(huán)匹配資源進(jìn)行限流過(guò)濾了讨。這就是Sentinel 能做到限流的原因。
Sentinel配置介紹
我們可以通過(guò)Sentinel的客戶端查看接入了sentinel的各個(gè)系統(tǒng)∏凹疲可針對(duì)系統(tǒng)中的各個(gè)資源設(shè)置相應(yīng)的限流規(guī)則胞谭,如QPS或者線程數(shù);或者設(shè)置相應(yīng)的降級(jí)規(guī)則男杈,如平均RT丈屹,異常比例以及異常數(shù)。
[{
"resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
"count": 1000,//RT threshold or exception ratio threshold count
"grade": 0, //Degrade strategy (0: average RT, 1: exception ratio)
"passCount": 0,
"timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
},
{
"resource": "com.aifocus.itemplatform.api.productcenter.CategoryApi",
"count": 0.5,//RT threshold or exception ratio threshold count
"grade": 1,// Degrade strategy (0: average RT, 1: exception ratio)
"passCount": 0,
"timeWindow": 10 // Degrade recover timeout (in seconds) when degradation occurs
}]