降級(jí)方式
Alibaba Sentinel 支持多種降級(jí)方式:
- 根據(jù)響應(yīng)時(shí)間:判斷單位時(shí)間內(nèi)平均響應(yīng)時(shí)間是否達(dá)到閾值参淹;
- 根據(jù)異常比例:判斷單位時(shí)間內(nèi)宁炫,異常數(shù)量和異常比例是否達(dá)到閾值;
- 根據(jù)異常數(shù)量:判斷單位時(shí)間內(nèi)異常數(shù)量是否達(dá)到閾值州既;
一旦觸發(fā)熔斷谜洽,熔斷開(kāi)關(guān)將會(huì)打開(kāi),這時(shí)將拒絕所有請(qǐng)求易桃,拒絕時(shí)間為設(shè)置的降級(jí)時(shí)間間隔褥琐。通過(guò)源碼我們可以發(fā)現(xiàn),Sentinel直接使用的是ScheduledExecutorService
開(kāi)啟的一個(gè)延遲任務(wù)來(lái)實(shí)現(xiàn)降級(jí)時(shí)間間隔晤郑。
如:響應(yīng)時(shí)間達(dá)到閾值敌呈,并且熔斷時(shí)間間隔配置為5S,這時(shí)熔斷開(kāi)關(guān)會(huì)打開(kāi)造寝,并且在5S內(nèi)拒絕所有請(qǐng)求磕洪,當(dāng)5S后熔斷開(kāi)關(guān)再次關(guān)閉,這時(shí)會(huì)放行請(qǐng)求诫龙,如果放行請(qǐng)求后又觸發(fā)了熔斷析显,那么又需要等5S鐘。
熔斷降級(jí)規(guī)則說(shuō)明
熔斷降級(jí)規(guī)則(DegradeRule)包含下面幾個(gè)重要的屬性:
Field | 說(shuō)明 | 默認(rèn)值 |
---|---|---|
resource | 資源名签赃,即規(guī)則的作用對(duì)象 | |
grade | 熔斷策略谷异,支持慢調(diào)用比例/異常比例/異常數(shù)策略 | 慢調(diào)用比例 |
count | 慢調(diào)用比例模式下為慢調(diào)用臨界 RT(超出該值計(jì)為慢調(diào)用)分尸;異常比例/異常數(shù)模式下為對(duì)應(yīng)的閾值 | |
timeWindow | 熔斷時(shí)長(zhǎng),單位為 s | |
minRequestAmount | 熔斷觸發(fā)的最小請(qǐng)求數(shù)歹嘹,請(qǐng)求數(shù)小于該值時(shí)即使異常比率超出閾值也不會(huì)熔斷(1.7.0 引入) | 5 |
statIntervalMs | 統(tǒng)計(jì)時(shí)長(zhǎng)(單位為 ms)箩绍,如 60*1000 代表分鐘級(jí)(1.8.0 引入) | 1000 ms |
slowRatioThreshold | 慢調(diào)用比例閾值,僅慢調(diào)用比例模式有效(1.8.0 引入) |
最新參數(shù)請(qǐng)看官網(wǎng) 尺上。
源碼
public class DegradeRule extends AbstractRule {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(
Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("sentinel-degrade-reset-task", true));
public DegradeRule() {}
public DegradeRule(String resourceName) {
setResource(resourceName);
}
/**
* RT threshold or exception ratio threshold count.
*/
private double count;
/**
* Degrade recover timeout (in seconds) when degradation occurs.
*/
private int timeWindow;
/**
* Degrade strategy (0: average RT, 1: exception ratio, 2: exception count).
*/
private int grade = RuleConstant.DEGRADE_GRADE_RT;
/**
* Minimum number of consecutive slow requests that can trigger RT circuit breaking.
*
* @since 1.7.0
*/
private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;
/**
* Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
*
* @since 1.7.0
*/
private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;
...
// Internal implementation (will be deprecated and moved outside).
private AtomicLong passCount = new AtomicLong(0);
private final AtomicBoolean cut = new AtomicBoolean(false);
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
// 判斷熔斷開(kāi)關(guān)的狀態(tài)
if (cut.get()) {
return false;
}
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
if (grade == RuleConstant.DEGRADE_GRADE_RT) { // 基于RT的熔斷模式
// 獲取平均RT
double rt = clusterNode.avgRt();
// 判斷平均RT是否達(dá)到閾值材蛛,如果沒(méi)有就放行
if (rt < this.count) {
passCount.set(0);
return true;
}
// 如果RT達(dá)到閾值,還需要判斷單位時(shí)間內(nèi)請(qǐng)求量是否達(dá)到閾值(默認(rèn)是5)
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { // 基于異常比例的熔斷策略
double exception = clusterNode.exceptionQps();
double success = clusterNode.successQps();
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
// 判斷單位時(shí)間內(nèi)請(qǐng)求數(shù)量是否發(fā)到熔斷閾值(默認(rèn)5)
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
double realSuccess = success - exception;
// 判斷單位時(shí)間內(nèi)異常請(qǐng)求數(shù)量是否達(dá)到閾值
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
// 判斷異常比例是否達(dá)到閾值
if (exception / success < count) {
return true;
}
} else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { // 基于異常數(shù)量的熔斷策略
double exception = clusterNode.totalException();
// 判斷異常數(shù)量是否達(dá)到閾值
if (exception < count) {
return true;
}
}
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
private static final class ResetTask implements Runnable {
private DegradeRule rule;
ResetTask(DegradeRule rule) {
this.rule = rule;
}
@Override
public void run() {
rule.passCount.set(0);
rule.cut.set(false);
}
}
}
基于RT的熔斷流程圖:
總結(jié)
- 有一個(gè)值得借鑒的地方怎抛,使用
ScheduledExecutorService
來(lái)實(shí)現(xiàn)延遲任務(wù)的執(zhí)行卑吭。 - 基于RT的熔斷,框架是基于所有請(qǐng)求的平均響應(yīng)時(shí)間來(lái)實(shí)現(xiàn)的马绝,這種方式不會(huì)產(chǎn)生上下文切換豆赏。還有一種簡(jiǎn)單的方式,這種方式采用
FutureTask
機(jī)制迹淌,但是會(huì)產(chǎn)生上下文切換河绽,如:
ExecutorService executorService = Executors.newFixedThreadPool(10);
FutureTask<String> futureTask = new FutureTask<String>(() -> {
// 業(yè)務(wù)邏輯
return "處理結(jié)果";
});
executorService.submit(futureTask);
futureTask.get(5, TimeUnit.SECONDS);