一晋被、概述
上篇文章介紹過(guò)了NodeSelectorSlot和ClusterBuilderSlot兩種插槽,接下來(lái)我們沿著默認(rèn)的插槽鏈繼續(xù)分析LogSlot和StatisticSlot杏糙。
二、LogSlot
我們看代碼:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.info("Entry exception", e);
}
}
可以發(fā)現(xiàn),如果在接下來(lái)的插槽鏈中發(fā)生BlockException異常的話,LogSlot會(huì)記錄日志信息。
繼續(xù)看EagleEyeLogUtil類:
public class EagleEyeLogUtil {
public static final String FILE_NAME = "sentinel-block.log";
private static StatLogger statLogger;
static {
String path = LogBase.getLogBaseDir() + FILE_NAME;
statLogger = EagleEye.statLoggerBuilder("sentinel-block-log")
.intervalSeconds(1)
.entryDelimiter('|')
.keyDelimiter(',')
.valueDelimiter(',')
.maxEntryCount(6000)
.configLogFilePath(path)
.maxFileSizeMB(300)
.maxBackupIndex(3)
.buildSingleton();
}
public static void log(String resource, String exceptionName, String ruleLimitApp, String origin, int count) {
statLogger.stat(resource, exceptionName, ruleLimitApp, origin).count(count);
}
}
可以發(fā)現(xiàn)block日志是記錄在sentienl-block.log中饮亏,這里block日志的格式化及本地文件的寫(xiě)入功能在eagleeye包中,如圖:
個(gè)人覺(jué)得這個(gè)異常日志記錄相對(duì)于sentienl的record日志開(kāi)發(fā)有點(diǎn)過(guò)于復(fù)雜阅爽,鑒于不是sentienl的核心功能路幸,我也只大概看了里面的代碼。
大致就是在StatEntry中調(diào)用count的方法付翁,然后在StatRollingData中調(diào)用StatLogController中scheduleWriteTask方法進(jìn)而創(chuàng)建一個(gè)定時(shí)任務(wù)線程池劝赔,任務(wù)是StatLogWriteTask,在這個(gè)任務(wù)中寫(xiě)入日志到本地文件中胆敞。
三、StatisticSlot
StatisticSlot是sentienl的指標(biāo)數(shù)據(jù)統(tǒng)計(jì)插槽杂伟,也是sentienl種非常重要的一個(gè)模塊移层,sentienl后續(xù)的限流,降級(jí)赫粥,熔斷都是根據(jù)這一階段的統(tǒng)計(jì)數(shù)據(jù)進(jìn)行观话。
前面文章在介紹sentinel的滑動(dòng)時(shí)間窗口時(shí),已經(jīng)知道sentienl的指標(biāo)統(tǒng)計(jì)是基于滑動(dòng)時(shí)間窗口的越平,可以看文章 Sentinel之滑動(dòng)時(shí)間窗口設(shè)計(jì)频蛔。
下面主要看在StatisticSlot插槽中,sentinel做了哪些數(shù)據(jù)的統(tǒng)計(jì)的秦叛。
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps();
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps();
}
throw e;
}
}
在方法entry中晦溪,這里是先調(diào)用fireEntry方法繼續(xù)資源保護(hù)檢查。
請(qǐng)求通過(guò)
增加node的線程數(shù)increaseThreadNum挣跋。
在DefaultNode類中increaseThreadNum方法中:
@Override
public void increaseThreadNum() {
super.increaseThreadNum();
this.clusterNode.increaseThreadNum();
}
然后調(diào)用StatisticNode中increaseThreadNum方法三圆,在increaseThreadNum方法中執(zhí)行curThreadNum.incrementAndGet()。
這里線程數(shù)增加是通過(guò)定義一個(gè)原子變量的避咆,curThreadNum是一個(gè)AtomicInteger原子變量舟肉,初始值是0 ,執(zhí)行incrementAndGet方法加1查库,并返回當(dāng)前值路媚。
增加node的通過(guò)請(qǐng)求數(shù)addPassRequest。
在DefaultNode類中addPassRequest方法中:
@Override
public void addPassRequest() {
super.addPassRequest();
this.clusterNode.addPassRequest();
}
在StatisticNode中的addPassRequest方法:
@Override
public void addPassRequest() {
rollingCounterInSecond.addPass();
rollingCounterInMinute.addPass();
}
根據(jù)滑動(dòng)時(shí)間窗口統(tǒng)計(jì)了兩個(gè)時(shí)間窗口的數(shù)據(jù)樊销。
- rollingCounterInSecond:時(shí)間窗口是1s
- rollingCounterInMinute:時(shí)間窗口是1分鐘
源節(jié)點(diǎn)originNode
如果originNode存在整慎,則也需要增加originNode的線程數(shù)和請(qǐng)求通過(guò)數(shù)脏款。
ENTRY_NODE
如果資源包裝類型是IN的話,則需要ENTRY_NODE的線程數(shù)和請(qǐng)求通過(guò)數(shù)院领。
ENTRY_NODE是sentinel全局的統(tǒng)計(jì)節(jié)點(diǎn)弛矛,用于后續(xù)系統(tǒng)規(guī)則檢查。
ProcessorSlotEntryCallback
然后再循環(huán)處理注冊(cè)了ProcessorSlotEntryCallback的StatisticSlot比然。
StatisticSlotCallbackRegistry是一個(gè)StatisticSlot回調(diào)注冊(cè)器丈氓,目前只有兩種回調(diào)支持,ProcessorSlotEntryCallback和ProcessorSlotExitCallback强法。
ProcessorSlotEntryCallback是在資源正常調(diào)用時(shí)處理万俗,ProcessorSlotExitCallback是在發(fā)送BlockException異常或者資源退出時(shí)調(diào)用饮怯。
目前只有熱點(diǎn)限流的統(tǒng)計(jì)把ParamFlowStatisticSlotCallbackInit注冊(cè)到ProcessorSlotEntryCallback中闰歪,看下面代碼。
public class ParamFlowStatisticSlotCallbackInit implements InitFunc {
@Override
public void init() {
StatisticSlotCallbackRegistry.addEntryCallback(ParamFlowStatisticEntryCallback.class.getName(),
new ParamFlowStatisticEntryCallback());
}
}
這里系統(tǒng)若是引用了熱點(diǎn)參數(shù)限流模塊蓖墅,則客戶端系統(tǒng)會(huì)在啟動(dòng)時(shí)把ProcessorSlotEntryCallback注冊(cè)到StatisticSlotCallbackRegistry中库倘。
在ProcessorSlotEntryCallback中
public class ParamFlowStatisticEntryCallback implements ProcessorSlotEntryCallback<DefaultNode> {
@Override
public void onPass(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
throws Exception {
// The "hot spot" parameter metric is present only if parameter flow rules for the resource exist.
ParameterMetric parameterMetric = ParamFlowSlot.getParamMetric(resourceWrapper);
if (parameterMetric != null) {
parameterMetric.addPass(count, args);
}
}
@Override
public void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, DefaultNode param,
int count, Object... args) {
// Here we don't add block count here because checking the type of block exception can affect performance.
// We add the block count when throwing the ParamFlowException instead.
}
}
onPass方法就是用來(lái)熱點(diǎn)參數(shù)的指標(biāo)的,熱點(diǎn)參數(shù)限流可以專門一個(gè)模塊來(lái)學(xué)習(xí)论矾,這里后面會(huì)再專門講解教翩。
拋出BlockException
SetError
這里會(huì)設(shè)置Context的CurEntry的Error屬性,error屬性可以在資源退出的時(shí)候判斷使用贪壳,若不存在error饱亿,會(huì)統(tǒng)計(jì)響應(yīng)時(shí)間rt等。
增加block線程數(shù)
在DefaultNode類中increaseBlockQps方法中:
@Override
public void increaseBlockQps() {
super.increaseBlockQps();
this.clusterNode.increaseBlockQps();
}
在StatisticNode中的increaseBlockQps方法:
@Override
public void increaseBlockQps() {
rollingCounterInSecond.addBlock();
rollingCounterInMinute.addBlock();
}
根據(jù)滑動(dòng)時(shí)間窗口統(tǒng)計(jì)了兩個(gè)時(shí)間窗口的數(shù)據(jù)闰靴。
rollingCounterInSecond:時(shí)間窗口是1s
rollingCounterInMinute:時(shí)間窗口是1分鐘
源節(jié)點(diǎn)originNode
如果originNode存在彪笼,則也需要增加originNode請(qǐng)求Block數(shù)。
ENTRY_NODE
如果資源包裝類型是IN的話蚂且,則需要ENTRY_NODE的Block數(shù)配猫。
然后再進(jìn)行ProcessorSlotEntryCallback的onBlocke方法,這個(gè)也是熱點(diǎn)參數(shù)限流才會(huì)有杏死。
拋出Throwable
先設(shè)置curEntry的error
增加node的異常數(shù)
node.increaseExceptionQps();
DefaultNode中:
@Override
public void increaseExceptionQps() {
super.increaseExceptionQps();
this.clusterNode.increaseExceptionQps();
}
然后判斷originNode和包裝類型章姓,增加對(duì)應(yīng)的exception數(shù)。
在Exit中
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
DefaultNode node = (DefaultNode)context.getCurNode();
if (context.getCurEntry().getError() == null) {
// Calculate response time (max RT is TIME_DROP_VALVE).
long rt = TimeUtil.currentTimeMillis() - context.getCurEntry().getCreateTime();
if (rt > Constants.TIME_DROP_VALVE) {
rt = Constants.TIME_DROP_VALVE;
}
// Record response time and success count.
node.rt(rt);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().rt(rt);
}
node.decreaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().decreaseThreadNum();
}
if (resourceWrapper.getType() == EntryType.IN) {
Constants.ENTRY_NODE.rt(rt);
Constants.ENTRY_NODE.decreaseThreadNum();
}
} else {
// Error may happen.
}
// Handle exit event with registered exit callback handlers.
Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
for (ProcessorSlotExitCallback handler : exitCallbacks) {
handler.onExit(context, resourceWrapper, count, args);
}
fireExit(context, resourceWrapper, count);
}
這里主要統(tǒng)計(jì)這次請(qǐng)求的響應(yīng)時(shí)間rt识埋,
public final static int TIME_DROP_VALVE = 4900;
如果rt大于TIME_DROP_VALVE凡伊,則設(shè)置rt為TIME_DROP_VALVE。
- 設(shè)置node的rt:node.rt(rt);
- 設(shè)置originNode的rt:context.getCurEntry().getOriginNode().rt(rt);
- 減掉node的線程數(shù) node.decreaseThreadNum();
- 減掉originNode線程數(shù) context.getCurEntry().getOriginNode().decreaseThreadNum();
- Constants.ENTRY_NODE的設(shè)置
- 最后處理ProcessorSlotExitCallback的的onExit方法窒舟。
四系忙、我的總結(jié)
1、介紹LogSlot和StatisticSlot插槽的代碼設(shè)計(jì)惠豺,其中LogSlot沒(méi)有詳細(xì)深入银还。
2风宁、LogSlot代碼設(shè)計(jì)較為復(fù)雜,個(gè)人感覺(jué)功能可以重新設(shè)計(jì)蛹疯。
3戒财、StatisticSlot的統(tǒng)計(jì)是根據(jù)滑動(dòng)時(shí)間窗口,但是線程數(shù)的統(tǒng)計(jì)是設(shè)置一個(gè)AtomicInteger原子變量捺弦。
3饮寞、如果StatisticSlotCallbackRegistry中注冊(cè)ProcessorSlotEntryCallback和ProcessorSlotExitCallback回調(diào)器,則在回調(diào)器中也會(huì)統(tǒng)計(jì)數(shù)據(jù)列吼,目前只有熱點(diǎn)參數(shù)限流會(huì)使用幽崩。
4、Sentinel后續(xù)的插槽就是根據(jù)StatisticSlot統(tǒng)計(jì)的數(shù)據(jù)進(jìn)行資源的保護(hù)的寞钥。
以上內(nèi)容若有不當(dāng)之處慌申,請(qǐng)指正,謝謝理郑!