1.規(guī)則定義:
DegradeRule rule = new DegradeRule();
rule.setResource(resourceName);
//RuleConstant.DEGRADE_GRADE_RT
//平均相應(yīng)時(shí)間,當(dāng)1s內(nèi)持續(xù)進(jìn)入5個(gè)請(qǐng)求,對(duì)應(yīng)時(shí)刻平均時(shí)間(秒級(jí))均超過閾值,那么接下來的timewindow 秒之內(nèi),對(duì)這個(gè)方法熔斷
//DEGRADE_GRADE_EXCEPTION_RATIO
//當(dāng)資源每秒請(qǐng)求超過5次,且每秒異常總數(shù)占通過量的比值超過閾值之后,資源在接下來的timewindow秒進(jìn)入降級(jí),異常比率[0.0,1.0]
//DEGRADE_GRADE_EXCEPTION_COUNT
//當(dāng)資源近1分鐘的異常數(shù)目超過閾值之后會(huì)進(jìn)行熔斷,因?yàn)槭?分鐘級(jí)別,若timewindow小于60s,則結(jié)束熔斷妝后仍可能進(jìn)入熔斷
rule.setGrade(grade);
rule.setCount(count);
rule.setTimeWindow(timeWindow);
rule.setLimitApp(limitName);
2.規(guī)則加載 DegradeRuleManager.loadRules
//DegradeRuleManager的內(nèi)部類
private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {
@Override
public void configUpdate(List<DegradeRule> conf) {
//校驗(yàn) 熔斷規(guī)則
Map<String, Set<DegradeRule>> rules = loadDegradeConf(conf);
if (rules != null) {
//清空原規(guī)則Map
degradeRules.clear();
//將所有規(guī)則放入Map
degradeRules.putAll(rules);
}
RecordLog.info("[DegradeRuleManager] Degrade rules received: " + degradeRules);
}
private Map<String, Set<DegradeRule>> loadDegradeConf(List<DegradeRule> list) {
Map<String, Set<DegradeRule>> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
for (DegradeRule rule : list) {
//校驗(yàn)熔斷規(guī)則
if (!isValidRule(rule)) {
RecordLog.warn(
"[DegradeRuleManager] Ignoring invalid degrade rule when loading new rules: " + rule);
continue;
}
if (StringUtil.isBlank(rule.getLimitApp())) {
//賦值limitApp 初始值 default
rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
String identity = rule.getResource();
Set<DegradeRule> ruleSet = newRuleMap.get(identity);
if (ruleSet == null) {
ruleSet = new HashSet<>();
newRuleMap.put(identity, ruleSet);
}
ruleSet.add(rule);
}
return newRuleMap;
}
}
//校驗(yàn)規(guī)則
public static boolean isValidRule(DegradeRule rule) {
boolean baseValid = rule != null && !StringUtil.isBlank(rule.getResource())
&& rule.getCount() >= 0 && rule.getTimeWindow() > 0;
if (!baseValid) {
return false;
}
// Warn for RT mode that exceeds the {@code TIME_DROP_VALVE}.
int maxAllowedRt = Constants.TIME_DROP_VALVE;
//超出此閾值的都會(huì)算作 4900 ms帕膜,若需要變更此上限可以通過啟動(dòng)配置項(xiàng) -Dcsp.sentinel.statistic.max.rt=xxx 來配置。
//SentinelConfig 這個(gè)類進(jìn)行一些這種默認(rèn)值的初始化以及讀取JVM這些數(shù)據(jù)配置的值
if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_RT && rule.getCount() > maxAllowedRt) {
RecordLog.warn(String.format("[DegradeRuleManager] WARN: setting large RT threshold (%.1f ms) in RT mode"
+ " will not take effect since it exceeds the max allowed value (%d ms)", rule.getCount(),
maxAllowedRt));
}
// Check exception ratio mode.
if (rule.getGrade() == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO && rule.getCount() > 1) {
return false;
}
return true;
}
}
3.熔斷校驗(yàn)DegradeSlot DegradeRuleManager DegradeRule
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
//校驗(yàn)是否熔斷
DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
//下一個(gè)節(jié)點(diǎn)
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
public final class DegradeRuleManager {
private static final Map<String, Set<DegradeRule>> degradeRules = new ConcurrentHashMap<>();
//校驗(yàn)熔斷
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
//獲取規(guī)則
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
//遍歷規(guī)則,如果有一個(gè)規(guī)則命中,則拋出熔斷異常
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
}
public class DegradeRule extends AbstractRule {
private static final int RT_MAX_EXCEED_N = 5;
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
//熔斷后cut會(huì)被設(shè)置成true,timewindow之后由定時(shí)任務(wù)改為false
if (cut.get()) {
return false;
}
//獲取ClusterBuilderSlot賦值的ClusterNode節(jié)點(diǎn)
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//RT熔斷
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//獲取平均RT(一秒)
double rt = clusterNode.avgRt();
if (rt < this.count) {
//平均RT小于配置值count時(shí),則通過校驗(yàn)
//如果校驗(yàn)通過則,設(shè)置passCount為0,以重新計(jì)算當(dāng)前秒校驗(yàn)失敗的此數(shù)
passCount.set(0);
return true;
}
// 如果校驗(yàn)失敗的此數(shù)少于5次,則算做校驗(yàn)通過 RT_MAX_EXCEED_N:默認(rèn)值5
if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {
return true;
}
//按異常比率熔斷
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
//當(dāng)前秒的異常數(shù)
double exception = clusterNode.exceptionQps();
//當(dāng)前秒的成功數(shù)
double success = clusterNode.successQps();
//當(dāng)前秒的總數(shù)(校驗(yàn)成功(pass)+校驗(yàn)失敗的(block))MetricEvent
double total = clusterNode.totalQps();
// 總數(shù)小于5次 不降級(jí)
if (total < RT_MAX_EXCEED_N) {
return true;
}
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) {
return true;
}
if (exception / success < count) {
return true;
}
//按異常次數(shù)降級(jí)
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
//獲取當(dāng)前一分鐘的異常總次數(shù)
//因?yàn)榇颂幨且环昼? 所以,如果timeWindow的時(shí)間結(jié)束后,仍然處于當(dāng)前這一分鐘范圍,則應(yīng)該會(huì)直接降級(jí),因?yàn)楫?dāng)前分鐘異常次數(shù)超過了規(guī)則配置次數(shù)
double exception = clusterNode.totalException();
//總次數(shù)小于配置count 則校驗(yàn)通過
if (exception < count) {
return true;
}
}
//已經(jīng)校驗(yàn)失敗,應(yīng)該降級(jí)
//將cut 設(shè)置為true,為true時(shí)直接回降級(jí)
//unsafe.compareAndSwapInt(this, valueOffset, e, u)
//expect表示期望的值,即遇到這個(gè)值,則將value改為update值,在這里就是遇到false便改成true,更新成功返回true
//如果遇到的值不是expect值則不做任何處理,并返回個(gè)false
if (cut.compareAndSet(false, true)) {
//創(chuàng)建定時(shí)任務(wù)
ResetTask resetTask = new ResetTask(this);
//timeWindow時(shí)間后,執(zhí)行resetTask這個(gè)定時(shí)任務(wù)
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
//定時(shí)任務(wù)
private static final class ResetTask implements Runnable {
private DegradeRule rule;
ResetTask(DegradeRule rule) {
this.rule = rule;
}
@Override
public void run() {
//將校驗(yàn)失敗次數(shù)設(shè)置為0,用來設(shè)定5次失敗內(nèi)不降級(jí)
rule.getPassCount().set(0);
//將cut設(shè)置為false,表示進(jìn)行正常校驗(yàn)流程
rule.cut.set(false);
}
}
}