Preface
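Sentinel is an open-source traffic-shaping framework (rate limiting, circuit breaking) from Alibaba, currently with 15k+ stars on GitHub. Taking traffic as its entry point, Sentinel protects service stability along several dimensions: flow control, circuit breaking and degradation, and system load protection.
We start from Sentinel's main flow and analyze how it collects traffic metrics and performs traffic shaping.
First, a simple Sentinel usage demo: call SphU.entry to obtain an Entry, and call entry.exit once the business logic has finished.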
Entry entry = null;
// Make sure the finally block is always executed
try {
    // The resource name can be any string with business meaning. Keep the number of names small (not more than ~1K);
    // if you would need thousands, pass the value as a parameter instead of using it as the resource name
    // EntryType is the traffic direction (inbound/outbound); system rules only apply to IN entries
    entry = SphU.entry("customResourceName", EntryType.IN);
    // Protected business logic
    // do something...
} catch (BlockException ex) {
    // Access to the resource was blocked: the request was flow-limited or degraded
    // Handle it accordingly
} catch (Exception ex) {
    // For degrade (circuit-breaking) rules to work, business exceptions must be recorded this way
    Tracer.traceEntry(ex, entry);
} finally {
    // Always exit, and always pair every entry with an exit
    if (entry != null) {
        entry.exit();
    }
}
SphU.entry calls Env.sph.entry (with count 1), which wraps the name and the traffic direction in a StringResourceWrapper and continues with entry(resource, count, args).
public static Entry entry(String name, EntryType trafficType) throws BlockException {
return Env.sph.entry(name, trafficType, 1, OBJECTS0);
}
@Override
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
StringResourceWrapper resource = new StringResourceWrapper(name, type);
return entry(resource, count, args);
}
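Inside CtSph, entry delegates to entryWithPriority. It first takes the Context bound to the current thread (ContextUtil.getContext). If there is none, InternalContextUtil.internalEnter creates a default one and stores it in the ThreadLocal. It then calls lookProcessChain to get (or build) the slot chain for this resource, and finally calls chain.entry to run the chain of responsibility. Along the way it returns a bare CtEntry, skipping all rule checks, when the context limit has been exceeded (NullContext), when the global switch is off, or when the number of slot chains has reached MAX_SLOT_CHAIN_SIZE.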
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
return entryWithPriority(resourceWrapper, count, false, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
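InternalContextUtil.internalEnter calls trueEnter. If the current thread has no Context yet, trueEnter looks up (or creates) the EntranceNode, a DefaultNode subclass, for the context name in contextNameNodeMap, attaches newly created entrance nodes under Constants.ROOT, then creates a Context and stores it in the ThreadLocal contextHolder. Once the number of context names exceeds MAX_CONTEXT_NAME_SIZE, it returns NULL_CONTEXT instead, which disables rule checking for that call.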
static Context internalEnter(String name) {
return trueEnter(name, "");
}
protected static Context trueEnter(String name, String origin) {
Context context = contextHolder.get();
if (context == null) {
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
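trueEnter (and lookProcessChain below) uses a copy-on-write map: reads go to a plain HashMap without locking, while a writer re-checks under the lock, copies the map, adds the new entry and swaps the reference, refusing to create more entries once a size cap is reached. A minimal generic sketch of that pattern follows; `CowCache` is a hypothetical class, not a Sentinel API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/** Hypothetical copy-on-write cache mirroring the pattern in trueEnter/lookProcessChain. */
class CowCache<K, V> {
    private final ReentrantLock lock = new ReentrantLock();
    private final int maxSize;
    // volatile so readers always see the most recently published snapshot
    private volatile Map<K, V> map = new HashMap<>();

    CowCache(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns the cached value, creating it if absent; returns null once the cap is reached. */
    V getOrCreate(K key, Function<K, V> factory) {
        V value = map.get(key);          // lock-free fast path
        if (value != null) {
            return value;
        }
        lock.lock();
        try {
            value = map.get(key);        // re-check under the lock
            if (value == null) {
                if (map.size() >= maxSize) {
                    return null;         // like NULL_CONTEXT or a null chain: stop creating
                }
                value = factory.apply(key);
                Map<K, V> newMap = new HashMap<>(map.size() + 1);
                newMap.putAll(map);
                newMap.put(key, value);
                map = newMap;            // publish the new snapshot
            }
            return value;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        return map.size();
    }
}
```

Copying on every write is expensive, but writes only happen once per new context name or resource, while reads happen on every call, which is why this trade-off fits here.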
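lookProcessChain looks up the slot chain for the resource in chainMap and builds one on first use. Building goes through SlotChainProvider, which supports loading a custom SlotChainBuilder via SPI; if none is defined, the default DefaultSlotChainBuilder is used, which loads all ProcessorSlot implementations via SPI in sorted order. The default slots and their order are shown in the figure at the top of the article, so I won't repeat them here.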
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// Note: the instances of ProcessorSlot should be different, since they are not stateless.
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
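The chain built above is a singly linked list of slots: each AbstractLinkedProcessorSlot holds a `next` reference and fireEntry simply forwards to it. The stripped-down sketch below uses hypothetical types (`Slot`, `TracingSlot`, `SlotChain`) in place of Sentinel's ProcessorSlot classes, and a trace list instead of a Context.

```java
import java.util.List;

/** Hypothetical simplified slot, modelled on AbstractLinkedProcessorSlot. */
abstract class Slot {
    private Slot next;

    void setNext(Slot next) { this.next = next; }

    /** Each slot does its own work, then calls fireEntry to hand over to the next slot. */
    abstract void entry(List<String> trace, String resource);

    protected void fireEntry(List<String> trace, String resource) {
        if (next != null) {
            next.entry(trace, resource);
        }
    }
}

/** A named slot that records when it runs before and after the rest of the chain. */
class TracingSlot extends Slot {
    private final String name;

    TracingSlot(String name) { this.name = name; }

    @Override
    void entry(List<String> trace, String resource) {
        trace.add(name + ":before");
        fireEntry(trace, resource);      // pre-processing above, post-processing below
        trace.add(name + ":after");
    }
}

/** Hypothetical chain: a dummy head plus addLast, similar in spirit to DefaultProcessorSlotChain. */
class SlotChain extends Slot {
    private Slot last = this;

    void addLast(Slot slot) {
        last.setNext(slot);
        last = slot;
    }

    @Override
    void entry(List<String> trace, String resource) {
        fireEntry(trace, resource);      // the head itself does no work
    }
}
```

Adding `TracingSlot("NodeSelector")` and then `TracingSlot("Statistic")` and calling `chain.entry` records NodeSelector:before, Statistic:before, Statistic:after, NodeSelector:after. Code after fireEntry runs once all later slots have finished, which is what lets StatisticSlot count outcomes.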
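Now for the main event: chain.entry runs the chain of responsibility. Below, each slot is analyzed in turn.
The first slot is NodeSelectorSlot. Each resource has its own chain and therefore its own slot instances, so this slot's map is keyed by context name. The slot gets or creates the DefaultNode for this resource in the current context and caches it, links it into the invocation tree under the context's last node, sets it as the context's current node, and then passes control to the next slot.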
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
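The next slot is ClusterBuilderSlot. It creates, once per resource, the ClusterNode that aggregates the resource's statistics across all contexts, and sets it on the DefaultNode. If the context has an origin, it also gets or creates the origin node for that origin and sets it on the current entry. It then passes control to the next slot.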
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
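In concrete terms, for one resource there is one DefaultNode per context name and a single ClusterNode shared by all of them. DefaultNode also forwards its counters to that ClusterNode, which is how StatisticSlot ends up updating the ClusterNode. The toy model below counts only passes and uses hypothetical classes (`ToyClusterNode`, `ToyDefaultNode`, `ToyResource`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-resource aggregate, standing in for ClusterNode. */
class ToyClusterNode {
    final LongAdder pass = new LongAdder();
}

/** Hypothetical per-(resource, context) node, standing in for DefaultNode. */
class ToyDefaultNode {
    final LongAdder pass = new LongAdder();
    final ToyClusterNode clusterNode;

    ToyDefaultNode(ToyClusterNode clusterNode) { this.clusterNode = clusterNode; }

    void addPassRequest(int count) {
        pass.add(count);
        clusterNode.pass.add(count);   // every context's traffic also lands in the shared node
    }
}

/** One resource: one ClusterNode, one DefaultNode per context name. */
class ToyResource {
    final ToyClusterNode clusterNode = new ToyClusterNode();
    private final Map<String, ToyDefaultNode> byContext = new HashMap<>();

    ToyDefaultNode nodeFor(String contextName) {
        return byContext.computeIfAbsent(contextName, c -> new ToyDefaultNode(clusterNode));
    }
}
```

After 2 passes in context "ctxA" and 3 in "ctxB", each DefaultNode holds its own count while the ClusterNode holds 5. Rules with limitApp default (checked against the ClusterNode) therefore see the resource's total traffic.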
The next slot is LogSlot. It only logs blocked requests (via EagleEyeLogUtil) and rethrows, so I won't go into detail.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
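The next slot is StatisticSlot, a post-processing slot: it first lets the subsequent slots run, and only then records the outcome:
1. If no exception is thrown, it increments the thread count and pass count on node, the origin node and, for IN traffic, ENTRY_NODE. The ClusterNode is updated as well, because DefaultNode forwards these calls to its ClusterNode.
2. If the exception is a PriorityWaitException, only the thread counts are incremented.
3. If the exception is a BlockException, it records the block error on the current entry, increments the block counts, and rethrows.
4. For any other exception, it records the error on the current entry and rethrows.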
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
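What makes StatisticSlot work is its try/catch shape: counting happens only after fireEntry returns or throws, so the counters reflect the verdict of every later slot. A reduced sketch follows; `ToyBlockException` stands in for BlockException and a `Runnable` stands in for the rest of the chain (both hypothetical).

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for Sentinel's BlockException. */
class ToyBlockException extends RuntimeException {
    ToyBlockException(String msg) { super(msg); }
}

/** Hypothetical reduction of StatisticSlot: run the rest of the chain, then record the outcome. */
class ToyStatisticSlot {
    final LongAdder threads = new LongAdder();
    final LongAdder passed = new LongAdder();
    final LongAdder blocked = new LongAdder();

    void entry(Runnable restOfChain, int count) {
        try {
            restOfChain.run();           // FlowSlot, DegradeSlot, ... decide first
            threads.increment();         // passed: one more concurrent thread
            passed.add(count);
        } catch (ToyBlockException e) {
            blocked.add(count);          // blocked by a later rule
            throw e;                     // still propagated to the caller
        }
    }
}
```

In the real slot the thread count is decremented again in exit(), which is why the entry/exit pairing in the demo matters.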
Next is FlowSlot, the core flow-control slot. On entry it calls checker.checkFlow to apply the flow rules.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
FlowRuleChecker.checkFlow calls ruleProvider.apply to get the FlowRules of the resource, then iterates over them and calls canPassCheck for each one; the first rule that fails throws a FlowException.
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
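canPassCheck skips rules whose limitApp is null. Otherwise, depending on the rule's mode, it performs either cluster flow control or local flow control. Both are analyzed below.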
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
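passLocalCheck is the entry point for local flow control. It first calls selectNodeByRequesterAndStrategy to pick the node whose statistics the rule applies to. If no node is selected, the rule is skipped and the request passes. Otherwise it calls canPass on the rule's controller (rule.getRater()).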
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
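selectNodeByRequesterAndStrategy selects the node by the following rules. If none applies, it returns null and the rule is skipped:
1. The strategy is STRATEGY_DIRECT.
1.1. limitApp is neither other nor default and equals the origin: select the origin node.
1.2. limitApp is other and no rule of this resource names the origin explicitly: select the origin node.
1.3. limitApp is default: select the cluster node.
2. The strategy is STRATEGY_RELATE: select the cluster node of the related resource (refResource), not that of the current resource.
3. The strategy is STRATEGY_CHAIN: select the current node, but only if refResource equals the current context name (the entrance); otherwise the rule does not apply.
For strategies 2 and 3, the same limitApp/origin matching as in 1.1 to 1.3 must succeed first.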
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
return ClusterBuilderSlot.getClusterNode(refResource);
}
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
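The selection rules above fit into one small decision function. In this sketch, `FlowStrategy` and `SelectedNode` are hypothetical enums describing which node would be checked (NONE means no node, so the rule is skipped). The result of FlowRuleManager.isOtherOrigin is passed in as a boolean instead of being looked up.

```java
enum FlowStrategy { DIRECT, RELATE, CHAIN }

enum SelectedNode { ORIGIN_NODE, CLUSTER_NODE, REF_CLUSTER_NODE, CURRENT_NODE, NONE }

/** Hypothetical pure version of selectNodeByRequesterAndStrategy + selectReferenceNode. */
final class NodeSelection {
    private NodeSelection() { }

    static SelectedNode select(String limitApp, FlowStrategy strategy, String origin,
                               boolean isOtherOrigin, String refResource, String contextName) {
        // filterOrigin: the origin itself must not be the reserved names "default" / "other"
        boolean specificOrigin = !"default".equals(origin) && !"other".equals(origin);
        if (limitApp.equals(origin) && specificOrigin) {
            return strategy == FlowStrategy.DIRECT ? SelectedNode.ORIGIN_NODE
                    : reference(strategy, refResource, contextName);
        }
        if ("default".equals(limitApp)) {
            return strategy == FlowStrategy.DIRECT ? SelectedNode.CLUSTER_NODE
                    : reference(strategy, refResource, contextName);
        }
        if ("other".equals(limitApp) && isOtherOrigin) {
            return strategy == FlowStrategy.DIRECT ? SelectedNode.ORIGIN_NODE
                    : reference(strategy, refResource, contextName);
        }
        return SelectedNode.NONE;
    }

    private static SelectedNode reference(FlowStrategy strategy, String refResource, String contextName) {
        if (refResource == null || refResource.isEmpty()) {
            return SelectedNode.NONE;
        }
        if (strategy == FlowStrategy.RELATE) {
            return SelectedNode.REF_CLUSTER_NODE;       // cluster node of the related resource
        }
        if (strategy == FlowStrategy.CHAIN && refResource.equals(contextName)) {
            return SelectedNode.CURRENT_NODE;           // only when entering through refResource
        }
        return SelectedNode.NONE;
    }
}
```

For example, a default-app RELATE rule with refResource "res2" checks res2's cluster node, and a CHAIN rule for entrance "entrance1" is ignored when the call comes through "entrance2".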
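Once the node is selected, canPass checks the rule. Sentinel's local flow control has three basic behaviors: default (reject immediately), rate limiting (uniform queueing), and warm up (cold start). A fourth one, WarmUpRateLimiterController, combines warm up with queueing.
Default flow control is implemented by DefaultController. It takes the current thread count or QPS (depending on the rule's grade), adds the number of tokens requested, and passes the request if the sum does not exceed the threshold; otherwise it blocks. The one exception is a prioritized QPS request, which may borrow capacity from an upcoming window: it sleeps for the computed wait and then throws PriorityWaitException, which StatisticSlot counts as a pass.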
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
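passQps() above comes from sliding-window statistics (Sentinel's StatisticNode builds them on LeapArray). The simplified sketch below shows a bucketed sliding window plus the DefaultController-style comparison. The millisecond clock is passed in so the behavior is deterministic, and `SlidingWindowCounter` and `SimpleQpsLimiter` are hypothetical, not Sentinel's LeapArray API. The sketch is also not thread-safe, and the 2-bucket, 1000 ms layout is only illustrative.

```java
/** Hypothetical sliding-window counter: windowMs split into `buckets` equal buckets. */
class SlidingWindowCounter {
    private final int buckets;
    private final long bucketMs;
    private final long[] bucketStart;   // start time of the data held in each slot (-1 = empty)
    private final long[] counts;

    SlidingWindowCounter(int buckets, long windowMs) {
        this.buckets = buckets;
        this.bucketMs = windowMs / buckets;
        this.bucketStart = new long[buckets];
        this.counts = new long[buckets];
        java.util.Arrays.fill(bucketStart, -1);
    }

    private int slot(long nowMs) {
        int idx = (int) ((nowMs / bucketMs) % buckets);
        long start = nowMs - nowMs % bucketMs;
        if (bucketStart[idx] != start) {    // slot holds data from an older round: reset it
            bucketStart[idx] = start;
            counts[idx] = 0;
        }
        return idx;
    }

    void add(long nowMs, int n) {
        counts[slot(nowMs)] += n;
    }

    /** Sum of the buckets that still fall inside the window ending at nowMs. */
    long sum(long nowMs) {
        long total = 0;
        for (int i = 0; i < buckets; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < bucketMs * buckets) {
                total += counts[i];
            }
        }
        return total;
    }
}

/** DefaultController-style check: pass if current + requested does not exceed the threshold. */
class SimpleQpsLimiter {
    private final SlidingWindowCounter counter = new SlidingWindowCounter(2, 1000);
    private final double count;

    SimpleQpsLimiter(double count) { this.count = count; }

    boolean tryPass(long nowMs, int acquireCount) {
        long current = counter.sum(nowMs);
        if (current + acquireCount > count) {
            return false;               // would exceed the threshold: block (no priority borrowing here)
        }
        counter.add(nowMs, acquireCount);
        return true;
    }
}
```

With a threshold of 3, five requests at t = 100 ms let exactly three through. A request at t = 700 ms is still blocked because the first bucket is inside the window, and one at t = 1200 ms passes once that bucket has aged out.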
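Rate limiting (uniform queueing) is implemented by RateLimiterController, which behaves like a leaky bucket. Each request reserves a slot costTime = acquireCount / count * 1000 ms after the previously reserved one, tracked in latestPassedTime. If that slot is already in the past, the request passes at once. Otherwise it has to wait latestPassedTime + costTime - currentTime ms, and it is rejected if that wait exceeds maxQueueingTimeMs. Because the reservation goes through AtomicLong.addAndGet, concurrent requests claim consecutive slots, so requests leave the queue at uniform intervals.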
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
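The core of RateLimiterController is arithmetic on latestPassedTime. The sketch below keeps that arithmetic but takes the clock as a parameter and returns the computed wait (or -1 for a rejection) instead of sleeping. It is single-threaded, `PacedQueue` is a hypothetical name, and it treats "nothing passed yet" explicitly rather than relying on an initial value.

```java
/** Hypothetical single-threaded version of the RateLimiterController arithmetic. */
class PacedQueue {
    private final double count;            // permitted requests per second
    private final long maxQueueingTimeMs;  // longest a request may wait
    private long latestPassedTime = -1;    // -1: nothing has passed yet

    PacedQueue(double count, long maxQueueingTimeMs) {
        this.count = count;
        this.maxQueueingTimeMs = maxQueueingTimeMs;
    }

    /** Returns how long the caller should wait in ms (0 = pass now), or -1 if rejected. */
    long acquire(long nowMs, int acquireCount) {
        // interval reserved for this request, e.g. 1 request at 10 QPS -> 100 ms
        long costTime = Math.round(1.0 * acquireCount / count * 1000);
        long expectedTime = costTime + latestPassedTime;
        if (latestPassedTime < 0 || expectedTime <= nowMs) {
            latestPassedTime = nowMs;      // the slot is free: pass immediately
            return 0;
        }
        long waitTime = expectedTime - nowMs;
        if (waitTime > maxQueueingTimeMs) {
            return -1;                     // would queue too long: reject
        }
        latestPassedTime = expectedTime;   // reserve the next slot in the queue
        return waitTime;
    }
}
```

At 10 QPS with a 250 ms queueing limit, four simultaneous requests at t = 0 get waits of 0, 100 and 200 ms, and the fourth is rejected. A request at t = 500 ms passes immediately again.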
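Warm up (cold start) is implemented by WarmUpController, the hardest flow controller in Sentinel to understand (its model comes from Guava's warm-up rate limiter). You don't need to follow every formula to see the idea:
1. When the system is warm (stored tokens below warningToken), tokens are refilled and consumed normally, and the full threshold count applies.
2. When traffic is cold (pass QPS below count / coldFactor), tokens keep being refilled, so the bucket fills toward maxToken and the system stays cold.
3. As QPS rises past that cold boundary, tokens are no longer added while the bucket is above warningToken. Consumption slowly drains the bucket, which gradually raises the number of requests allowed per unit time until the system is warm again.
In short, the allowed QPS falls as the number of tokens left in the bucket grows, which is what produces the warm-up effect.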
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// Start computing the slope
// If we are above the warning line, adjust the allowed QPS
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// The consumption rate should be faster than at the warning line, but slower than
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// Precondition for adding tokens:
// token consumption is far below the warning line
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
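The formulas are easier to read with numbers plugged in. The hypothetical helper below copies the construct() arithmetic and the canPass branch logic exactly, so it can print the allowed QPS for a given token level. The parameters (count = 10, a 10 s warm-up, coldFactor = 3) are chosen only for illustration.

```java
/** Hypothetical helper reproducing the WarmUpController formulas for a quick table. */
class WarmUpMath {
    final double count;
    final int coldFactor;
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpMath(double count, int warmUpPeriodInSec, int coldFactor) {
        this.count = count;
        this.coldFactor = coldFactor;
        // same expressions as construct()
        this.warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    /** QPS allowed when the bucket holds restToken tokens (same branches as canPass). */
    double allowedQps(long restToken) {
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            return Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        }
        return count;
    }
}
```

With these parameters warningToken = 50, maxToken = 100 and slope = 0.004. A full (cold) bucket allows about 3.33 QPS, which is exactly count / coldFactor. At 75 tokens the limit is 5 QPS, and below 50 tokens the full 10 QPS applies.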
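Next is cluster flow control. passClusterCheck is its entry point: it picks the cluster TokenService and requests acquireCount tokens for the rule's flowId. Depending on the result, it passes, sleeps and then passes, falls back to local flow control, or blocks. If no TokenService is available or the request throws, it also falls back to local flow control (or passes directly if fallback is disabled).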
private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
TokenService clusterService = pickClusterService();
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
long flowId = rule.getClusterConfig().getFlowId();
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
// If client is absent, then fallback to local mode.
} catch (Throwable ex) {
RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
}
// Fallback to local flow control when token client or server for this rule is not available.
// If fallback is not enabled, then directly pass.
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
DefaultNode node,
int acquireCount, boolean prioritized) {
switch (result.getStatus()) {
case TokenResultStatus.OK:
return true;
case TokenResultStatus.SHOULD_WAIT:
// Wait for next tick.
try {
Thread.sleep(result.getWaitInMs());
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
case TokenResultStatus.NO_RULE_EXISTS:
case TokenResultStatus.BAD_REQUEST:
case TokenResultStatus.FAIL:
case TokenResultStatus.TOO_MANY_REQUEST:
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
case TokenResultStatus.BLOCKED:
default:
return false;
}
}
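Now the token server side: requestToken looks up the FlowRule by ruleId and calls ClusterFlowChecker.acquireClusterToken. acquireClusterToken follows the same idea as default flow control (remaining = global threshold - current QPS - requested count). The difference is that requests from the whole cluster are counted in one token server, which is what makes the limit cluster-wide. I won't go into further detail.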
@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
// The rule should be valid.
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
Long id = rule.getClusterConfig().getFlowId();
if (!allowProceed(id)) {
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
}
ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;
if (nextRemaining >= 0) {
// TODO: checking logic and metric operation should be separated.
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
} else {
if (prioritized) {
// Try to occupy incoming buckets.
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
// waitInMs > 0 indicates pre-occupy incoming buckets successfully.
if (waitInMs > 0) {
ClusterServerStatLogUtil.log("flow|waiting|" + id);
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
// Or else occupy failed, should be blocked.
}
}
// Blocked.
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}
return blockedResult();
}
}
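Stripped of priority occupation and the exceedCount factor, the server's decision is a global version of the local check. The minimal sketch below uses a hypothetical fixed one-second window per flowId; the real server uses ClusterMetric sliding windows. `ToyTokenServer` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical token-server core: one counter per flowId, checked against a global threshold. */
class ToyTokenServer {
    enum Status { OK, BLOCKED, NO_RULE_EXISTS }

    private final Map<Long, Double> thresholds = new HashMap<>();   // flowId -> global QPS threshold
    private final Map<Long, Long> windowStart = new HashMap<>();    // flowId -> current second
    private final Map<Long, Long> passed = new HashMap<>();         // flowId -> passes this second

    synchronized void loadRule(long flowId, double globalThreshold) {
        thresholds.put(flowId, globalThreshold);
    }

    synchronized Status requestToken(long flowId, int acquireCount, long nowMs) {
        Double threshold = thresholds.get(flowId);
        if (threshold == null) {
            return Status.NO_RULE_EXISTS;           // the client would fall back to local mode
        }
        long second = nowMs / 1000;
        if (!Long.valueOf(second).equals(windowStart.get(flowId))) {
            windowStart.put(flowId, second);        // a new one-second window starts
            passed.put(flowId, 0L);
        }
        long latest = passed.get(flowId);
        double nextRemaining = threshold - latest - acquireCount;
        if (nextRemaining >= 0) {
            passed.put(flowId, latest + acquireCount);
            return Status.OK;
        }
        return Status.BLOCKED;
    }
}
```

Because all clients ask the same server, a global threshold of 2 admits two requests per second in total, no matter which instance they arrive at.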
After FlowSlot comes DegradeSlot, the circuit-breaking slot. On entry it calls performChecking, which gets the resource's CircuitBreaker list and calls tryPass on each one; if any breaker refuses, a DegradeException is thrown.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
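AbstractCircuitBreaker.tryPass mainly checks the breaker state. CLOSED passes directly. OPEN checks whether the retry timeout has elapsed; if it has and this request wins the CAS from OPEN to HALF_OPEN, the request goes through as the probe. Every other case is blocked, including HALF_OPEN, where a probe is already in flight. fromOpenToHalfOpen also registers a hook that moves the breaker back to OPEN if the probe request ends up blocked by a later rule.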
@Override
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
// Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
// Without the hook, the circuit breaker won't recover from half-open state in some circumstances
// when the request is actually blocked by upcoming rules (not only degrade rules).
if (entry.getBlockError() != null) {
// Fallback to OPEN due to detecting request is blocked
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
return true;
}
return false;
}
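The state handling in tryPass and in the completion callbacks described next comes down to a three-state machine driven by compareAndSet. The compact sketch below takes the clock as a parameter and covers only the transitions described in this section. The class and field names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical three-state breaker modelled on AbstractCircuitBreaker's transitions. */
class ToyBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
    private final long recoveryTimeoutMs;
    private volatile long nextRetryTimestamp;

    ToyBreaker(long recoveryTimeoutMs) { this.recoveryTimeoutMs = recoveryTimeoutMs; }

    State state() { return state.get(); }

    boolean tryPass(long nowMs) {
        State s = state.get();
        if (s == State.CLOSED) {
            return true;
        }
        if (s == State.OPEN) {
            // only the thread that wins the CAS gets to send the single probe request
            return nowMs >= nextRetryTimestamp && state.compareAndSet(State.OPEN, State.HALF_OPEN);
        }
        return false;                      // HALF_OPEN: a probe is already in flight
    }

    void open(long nowMs) {
        state.set(State.OPEN);
        nextRetryTimestamp = nowMs + recoveryTimeoutMs;
    }

    /** Called when the probe completes: success closes the breaker, failure re-opens it. */
    void onProbeComplete(boolean success, long nowMs) {
        if (success) {
            state.compareAndSet(State.HALF_OPEN, State.CLOSED);
        } else if (state.compareAndSet(State.HALF_OPEN, State.OPEN)) {
            nextRetryTimestamp = nowMs + recoveryTimeoutMs;   // wait a full timeout again
        }
    }
}
```

With a 5 s recovery timeout, a breaker opened at t = 0 blocks at t = 1 s and lets one probe through at t = 5 s. A failed probe restarts the 5 s wait, and a successful probe closes the breaker.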
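So how does the breaker go from CLOSED to OPEN, and from HALF_OPEN to CLOSED or OPEN? Sentinel has two kinds of circuit breakers: ExceptionCircuitBreaker (error count or error ratio) and ResponseTimeCircuitBreaker (slow-request ratio). Let's go through both.
When the business method throws, Tracer.traceEntry records the exception on the entry, after shouldTrace has filtered out exceptions that should not be counted.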
public static void traceEntry(Throwable e, Entry entry) {
if (!shouldTrace(e)) {
return;
}
traceEntryInternal(e, entry);
}
private static void traceEntryInternal(/*@NeedToTrace*/ Throwable e, Entry entry) {
if (entry == null) {
return;
}
entry.setError(e);
}
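When entry.exit is called, the exit travels along the chain to DegradeSlot.exit. If the request was blocked, the slot simply passes the exit on; otherwise it calls onRequestComplete on each circuit breaker of the resource.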
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
if (curEntry.getBlockError() == null) {
// passed request
for (CircuitBreaker circuitBreaker : circuitBreakers) {
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
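ExceptionCircuitBreaker.onRequestComplete records the error count and total request count in the current window, then calls handleStateChangeWhenThresholdExceeded:
1. If the current state is OPEN, the transition is not this method's job (recovery goes through tryPass), so it returns directly.
2. If the current state is HALF_OPEN, this request was the probe: without an error the breaker goes from HALF_OPEN to CLOSED, otherwise from HALF_OPEN to OPEN.
3. If the current state is CLOSED, it checks whether the total request count has reached minRequestAmount. If so, it compares the error count (or error ratio, depending on the strategy) with the threshold and switches to OPEN if the value is greater.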
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1);
}
counter.getTotalCount().add(1);
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
if (error == null) {
fromHalfOpenToClose();
} else {
fromHalfOpenToOpen(1.0d);
}
return;
}
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// Use errorRatio
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}
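Without the sliding window, the CLOSED-state check above is a few lines of arithmetic. The hypothetical helper below assumes the error and total counts have already been summed over the statistic window.

```java
/** Hypothetical pure function mirroring the CLOSED-state check of ExceptionCircuitBreaker. */
final class ErrorThreshold {
    private ErrorThreshold() { }

    /**
     * @param useRatio true for the exception-ratio strategy, false for exception count
     * @return true if the breaker should transition to OPEN
     */
    static boolean shouldOpen(long errCount, long totalCount, long minRequestAmount,
                              boolean useRatio, double threshold) {
        if (totalCount < minRequestAmount) {
            return false;              // not enough samples yet: never trip
        }
        double cur = useRatio ? errCount * 1.0d / totalCount : errCount;
        return cur > threshold;        // strictly greater, as in the original
    }
}
```

With minRequestAmount = 5, four failures out of four requests do not trip the breaker. An error ratio of exactly 0.5 against a 0.5 threshold does not trip it either, because the comparison is strict.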
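ResponseTimeCircuitBreaker.onRequestComplete records the slow request count (requests with RT greater than maxAllowedRt) and the total request count, then calls handleStateChangeWhenThresholdExceeded:
1. If the current state is OPEN, it returns directly, as above.
2. If the current state is HALF_OPEN, the breaker goes from HALF_OPEN to CLOSED if the probe's RT does not exceed maxAllowedRt, and otherwise from HALF_OPEN to OPEN.
3. If the current state is CLOSED, it checks whether the total request count has reached minRequestAmount. If so, it computes the slow request ratio and switches to OPEN if the ratio exceeds maxSlowRequestRatio, or if it equals the threshold when the threshold is the maximum value SLOW_REQUEST_RATIO_MAX_VALUE.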
@Override
public void onRequestComplete(Context context) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
counter.totalCount.add(1);
handleStateChangeWhenThresholdExceeded(rt);
}
private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
} else {
fromHalfOpenToClose();
}
return;
}
List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
slowCount += counter.slowCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double currentRatio = slowCount * 1.0d / totalCount;
if (currentRatio > maxSlowRequestRatio) {
transformToOpen(currentRatio);
}
if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
transformToOpen(currentRatio);
}
}
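The three cases above can be condensed into a pure function that returns the next state, which makes the transitions easy to unit-test. This is a minimal sketch under stated assumptions. SlowRatioBreakerSketch and its parameters are hypothetical. The sketch ignores the sliding window and the atomic state updates, and it assumes SLOW_REQUEST_RATIO_MAX_VALUE is 1.0.

```java
// Hypothetical sketch of the slow-ratio breaker's state decision; not a Sentinel class.
final class SlowRatioBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    // Assumed value of the constant used in the source above (100%).
    static final double SLOW_REQUEST_RATIO_MAX_VALUE = 1.0d;

    static State next(State current, long rt, long maxAllowedRt,
                      long slowCount, long totalCount,
                      long minRequestAmount, double maxSlowRequestRatio) {
        if (current == State.OPEN) {
            return State.OPEN;                    // case 1: not changed here
        }
        if (current == State.HALF_OPEN) {         // case 2: the probe decides
            return rt > maxAllowedRt ? State.OPEN : State.CLOSED;
        }
        if (totalCount < minRequestAmount) {      // case 3: not enough samples yet
            return State.CLOSED;
        }
        double ratio = slowCount * 1.0d / totalCount;
        if (ratio > maxSlowRequestRatio) {
            return State.OPEN;
        }
        // A 100% threshold can never be exceeded, so equality trips the breaker.
        if (Double.compare(ratio, maxSlowRequestRatio) == 0
                && Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
            return State.OPEN;
        }
        return State.CLOSED;
    }
}
```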
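Next up is AuthoritySlot, the access-control slot. It checks the caller's origin against the black/white-list rules for the resource and throws an AuthorityException if the origin is not allowed. It is straightforward, so we won't go into detail.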
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
static boolean passCheck(AuthorityRule rule, Context context) {
String requester = context.getOrigin();
// Empty origin or empty limitApp will pass.
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
// Do exact match with origin name.
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
if (contain) {
boolean exactlyMatch = false;
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
int strategy = rule.getStrategy();
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
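The indexOf call in passCheck is only a cheap prefilter. The split-and-equals loop is what actually decides. A pure substring match would wrongly treat origin "app" as listed in "appA,appB". The sketch below reproduces just that decision. AuthorityCheckSketch and its Strategy enum are hypothetical stand-ins for the rule object and RuleConstant.AUTHORITY_WHITE / AUTHORITY_BLACK.

```java
// Hypothetical sketch of the black/white-list decision in passCheck; not a Sentinel class.
final class AuthorityCheckSketch {
    enum Strategy { WHITE, BLACK }

    /** limitApp is a comma-separated list of origin names, as in AuthorityRule. */
    static boolean passCheck(String origin, String limitApp, Strategy strategy) {
        // Empty origin or empty limitApp always passes.
        if (origin == null || origin.isEmpty() || limitApp == null || limitApp.isEmpty()) {
            return true;
        }
        boolean contain = false;
        for (String app : limitApp.split(",")) {  // exact match only, no substring
            if (origin.equals(app)) {
                contain = true;
                break;
            }
        }
        if (strategy == Strategy.BLACK && contain) {
            return false;                         // listed in a blacklist
        }
        if (strategy == Strategy.WHITE && !contain) {
            return false;                         // missing from a whitelist
        }
        return true;
    }
}
```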
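Finally we come to SystemSlot, the last slot covered here. It is the adaptive system-protection slot. It decides whether a request may pass based on the system's own load: total QPS, concurrent thread count, average response time, system load (combined with a BBR-style check), and CPU usage. The goal is to keep the system in a safe state where it can process requests stably. These checks apply only to inbound (EntryType.IN) traffic.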
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}
// for inbound traffic only
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// total qps
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load. BBR algorithm.
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
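The sketch below shows the order in which checkSystem evaluates its conditions. It is simplified: it returns the name of the first failing check instead of throwing SystemBlockException. Metrics, Limits and firstViolation are hypothetical. Double.MAX_VALUE and Long.MAX_VALUE stand for "not configured" in place of the highestSystemLoadIsSet / highestCpuUsageIsSet flags. The metric values are passed in rather than read from Constants.ENTRY_NODE and the OS.

```java
// Hypothetical sketch of checkSystem's evaluation order; not a Sentinel class.
final class SystemCheckSketch {
    // Hypothetical container for the values checkSystem reads.
    static final class Metrics {
        double successQps, avgRt, systemLoad, cpuUsage, maxSuccessQps, minRt;
        int threads;
    }

    // Hypothetical thresholds; MAX_VALUE means "not configured".
    static final class Limits {
        double qps = Double.MAX_VALUE;
        long maxThread = Long.MAX_VALUE;
        double maxRt = Double.MAX_VALUE;
        double highestSystemLoad = Double.MAX_VALUE;
        double highestCpuUsage = Double.MAX_VALUE;
    }

    /** Returns the first failing check, in checkSystem's order, or null if the request passes. */
    static String firstViolation(Metrics m, Limits l) {
        if (m.successQps > l.qps) return "qps";
        if (m.threads > l.maxThread) return "thread";
        if (m.avgRt > l.maxRt) return "rt";
        if (m.systemLoad > l.highestSystemLoad) {
            // Same condition as checkBbr: too many threads for the estimated capacity.
            boolean overCapacity = m.threads > 1 && m.threads > m.maxSuccessQps * m.minRt / 1000;
            if (overCapacity) return "load";
        }
        if (m.cpuUsage > l.highestCpuUsage) return "cpu";
        return null;
    }
}
```

Because the checks run in a fixed order, a request that violates several limits is reported against the first one: qps before thread, thread before rt, and so on.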
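The BBR part deserves special mention. Sentinel borrows the idea from TCP BBR. Instead of a crude fixed thread limit, it computes the admissible number of threads dynamically from the maximum QPS and the minimum response time. Why are these two values enough? The intuition is Little's Law: concurrency ≈ throughput × latency. So maxSuccessQps × minRt / 1000 (minRt in milliseconds) estimates how many requests the system can keep in flight at its best, and extra threads beyond that only queue up. This check runs only when the system load already exceeds highestSystemLoad. Write-ups of the TCP BBR algorithm found online explain the idea in more depth. It is very elegant, but we won't go into further detail here.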
private static boolean checkBbr(int currentThread) {
if (currentThread > 1 &&
currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
return false;
}
return true;
}
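Here is a worked example of the estimate in checkBbr. It is a minimal sketch with the ENTRY_NODE statistics passed in as parameters. BbrSketch and its methods are hypothetical names. minRt is in milliseconds, hence the division by 1000.

```java
// Hypothetical sketch of the BBR-style capacity estimate; not a Sentinel class.
final class BbrSketch {
    /** Little's Law: in-flight requests ≈ throughput (req/s) × latency (s). */
    static double estimatedConcurrency(double maxSuccessQps, double minRtMs) {
        return maxSuccessQps * minRtMs / 1000;
    }

    /** Same condition as checkBbr above: false means the request should be blocked. */
    static boolean checkBbr(int currentThread, double maxSuccessQps, double minRtMs) {
        return !(currentThread > 1
                && currentThread > estimatedConcurrency(maxSuccessQps, minRtMs));
    }
}
```

Suppose the peak is 1,000 successful requests per second and the best-case RT is 20 ms. The system can then keep about 1000 × 20 / 1000 = 20 requests in flight. When load is above highestSystemLoad, a 21st concurrent thread is rejected, while 20 or fewer still pass.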