主要流程
- springboot集成包
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2.2.3.RELEASE</version>
</dependency>
該包的spring.factories里導(dǎo)入了這些組件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration
主要是通過(guò)SentinelWebAutoConfiguration這個(gè)類(lèi)添加一個(gè)MVC攔截器
@Override
public void addInterceptors(InterceptorRegistry registry) {
if (!sentinelWebInterceptorOptional.isPresent()) {
return;
}
SentinelProperties.Filter filterConfig = properties.getFilter();
registry.addInterceptor(sentinelWebInterceptorOptional.get())
.order(filterConfig.getOrder())
//攔截全路徑/*
.addPathPatterns(filterConfig.getUrlPatterns());
}
- 入口:在web請(qǐng)求過(guò)來(lái)的時(shí)候攔截来破,調(diào)用AbstractSentinelInterceptor接口
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
//獲取請(qǐng)求的路徑 比如訪問(wèn)的說(shuō)localhost:8085/area/list币厕,resourceName 為 area/list
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
String origin = parseOrigin(request);
String contextName = getContextName(request);
ContextUtil.enter(contextName, origin);
//主要是進(jìn)入到這個(gè)方法
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
//處理流控的exception返回給前端喷兼,可以自己定制返回內(nèi)容
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
- 主要攔截方法:entry方法一直點(diǎn)進(jìn)來(lái)會(huì)進(jìn)到com.alibaba.csp.sentinel.CtSph#entryWithPriority
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
...省略一堆校驗(yàn)
//獲取該資源的攔截鏈
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
//將資源妆兑,攔截鏈環(huán)境包裝為一個(gè)entry
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//真正執(zhí)行攔截鏈的方法(下面詳解)
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//獲取該資源上的攔截鏈
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
//當(dāng)?shù)谝淮芜M(jìn)來(lái)的時(shí)候回?cái)r截鏈chain為空
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//初始化攔截鏈,(下面單獨(dú)開(kāi)出來(lái)講)
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
- chain = SlotChainProvider.newSlotChain();創(chuàng)建攔截鏈
這邊主要是創(chuàng)建了8個(gè)攔截鏈每强,分別對(duì)應(yīng)我們sentinel控制臺(tái)配的8個(gè),主要我們常用的說(shuō)流控規(guī)則跟降級(jí)規(guī)則
表頭 | 表頭 |
---|---|
鏈路節(jié)點(diǎn)生成 | NodeSelectorSlot |
集群流控 | ClusterBuilderSlot |
規(guī)則限制日志相關(guān) | LogSlot |
統(tǒng)計(jì)相關(guān)(重要) | StatisticSlot |
熱點(diǎn)規(guī)則 | ParamFlowSlot |
授權(quán)規(guī)則 | AuthoritySlot |
系統(tǒng)規(guī)則 | SystemSlot |
流控規(guī)則(重要) | FlowSlot |
降級(jí)規(guī)則(重要) | DegradeSlot |
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
...省略spi等代碼
}
public class HotParamSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new ParamFlowSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
構(gòu)建后形成這個(gè)鏈表
- 真正執(zhí)行攔截鏈的方法 chain.entry(...);
調(diào)用chain里面的first.transformEntry方法稿静,也就是他的處理方法,因?yàn)榈谝粋€(gè)是默認(rèn)的DefaultProcessorSlotChain诸蚕,他沒(méi)有做任何處理步势,直接調(diào)用 next.transformEntry給下面的責(zé)任鏈處理
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
幾個(gè)責(zé)任鏈的主要代碼
- NodeSelectorSlot(負(fù)責(zé)請(qǐng)求鏈路資源的歸納)
負(fù)責(zé)收集資源的路徑氧猬,并將這些資源的調(diào)用路徑,以樹(shù)狀結(jié)構(gòu)存儲(chǔ)起來(lái)坏瘩,用于根據(jù)凋用路徑來(lái)限流降級(jí)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
//context.getName()的結(jié)果是sentinel_spring_web_context形成下面那個(gè)節(jié)點(diǎn)
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
//調(diào)用下個(gè)Slot處理
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- ClusterBuilderSlot(集群相關(guān))
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- LogSlot(日志相關(guān))
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
//當(dāng)發(fā)生限制時(shí)記錄信息盅抚。
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
-
StatisticSlot(重要,統(tǒng)計(jì)類(lèi))
用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息倔矾,例如該資源的 RT QPS, thread count等等這些信息將用作為多維度限流妄均,降級(jí)的依據(jù)
統(tǒng)計(jì)用的是滑動(dòng)時(shí)間算法,篇幅有點(diǎn)長(zhǎng)哪自,放到滑動(dòng)時(shí)間算法與sentinel實(shí)踐
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
//執(zhí)行后面的檢查
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 如果其他的校驗(yàn)都成功沒(méi)有問(wèn)題
//增加單位時(shí)間成功線程數(shù)量
node.increaseThreadNum();
//增加單位時(shí)間成功請(qǐng)求,(滑動(dòng)時(shí)間窗口計(jì)數(shù)實(shí)現(xiàn))
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected error, set error to current entry.
context.getCurEntry().setError(e);
// This should not happen.
node.increaseExceptionQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseExceptionQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseExceptionQps(count);
}
throw e;
}
}
- ParamFlowSlot(熱點(diǎn)流控)
//對(duì)有添加熱點(diǎn)流控規(guī)則的資源進(jìn)行限制
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics.
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
- SystemSlot
當(dāng)配置了系統(tǒng)規(guī)則后丰包,會(huì)根據(jù)配置的系統(tǒng)規(guī)則進(jìn)行校驗(yàn)
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
...
// qps限制數(shù)
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// thread限制
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
//rt限制
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// CPU使用率
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
- AuthoritySlot(根據(jù)配置的授權(quán)規(guī)則來(lái)限制)
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
-
FlowSlot(重要,限流相關(guān),sentinel拉取配置應(yīng)用nacos的動(dòng)態(tài)加載配置com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener主要調(diào)用該方法)
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//獲取當(dāng)前資源下的所有的流控規(guī)則
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
//----------------------------------?------------------------------------------
獲取flowRules流控規(guī)則,規(guī)則naco
public Collection<FlowRule> apply(String resource) {
//getFlowRuleMap里面代碼是 return flowRules; 返回本地的規(guī)則map文件
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
//----------------------------------?------------------------------------------
判斷是否能通過(guò)該規(guī)則
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//這邊的getRater()方法會(huì)根據(jù)你配置時(shí)的 1壤巷、快速失敗 2邑彪、Warm Up 3、 排隊(duì)等待 三個(gè)類(lèi)型分別調(diào)用不同的方法
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
- 快速失敗
主要是調(diào)用com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass()
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//獲取當(dāng)前平均訪問(wèn)次數(shù),avgUsedTokens是獲取node.passQps()胧华,調(diào)用StatisticNode#passQps
int curCount = avgUsedTokens(node);
//如果超過(guò)限制則直接返回false
if (curCount + acquireCount > count) {
//DefaultController這邊的 prioritized 為false里面的方法不提示
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
...
}
}
return false;
}
return true;
}
- Warm Up 調(diào)用了WarmUpController#canPass(...) 這邊主要使用的是令牌桶算法
//構(gòu)建函數(shù)
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
//冷因子不能小于1,默認(rèn)為3
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
//設(shè)置閾值
this.count = count;
//設(shè)置冷因子
this.coldFactor = coldFactor;
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
//預(yù)警token=(預(yù)熱時(shí)長(zhǎng)*閾值)/(冷因子-1)
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
//最大token=(預(yù)熱預(yù)警token*閾值)/(冷因子-1)
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
// - thresholdPermits);
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 開(kāi)始計(jì)算它的斜率
// 如果進(jìn)入了警戒線寄症,開(kāi)始調(diào)整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
- 排隊(duì)等待
主要是用到RateLimiterController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//當(dāng)請(qǐng)求通過(guò)量小于等于0時(shí)矩动,直接返回通過(guò)
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
// 這個(gè)count是你控制臺(tái)設(shè)置的閾值
if (count <= 0) {
return false;
}
//獲取當(dāng)前時(shí)間
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
// 計(jì)算兩個(gè)請(qǐng)求之間需要花費(fèi)的時(shí)間
// 假設(shè)acquireCount=1,count =10, 則((1*1)/10*1000)取整=1000
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
// 預(yù)期通過(guò)這個(gè)請(qǐng)求的時(shí)間
long expectedTime = costTime + latestPassedTime.get();
// 判斷預(yù)期通過(guò)時(shí)間是否小于當(dāng)前時(shí)間
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// Calculate the time to wait.
// 計(jì)算預(yù)計(jì)需要等待的時(shí)間(當(dāng)前預(yù)期需要花費(fèi)的時(shí)間+最后一次成功的時(shí)間-當(dāng)前時(shí)間)
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 如果預(yù)計(jì)需要等待的時(shí)間大于后臺(tái)配置的等待時(shí)間有巧,則直接拒絕
if (waitTime > maxQueueingTimeMs(后臺(tái)配置為毫秒)) {
return false;
} else {
// 設(shè)置latestPassedTime,用atomic變量防止并發(fā)
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 再做一次超時(shí)判斷
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 休眠等待時(shí)間
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
-
DegradeSlot(重要,降級(jí)相關(guān))
@Override
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
if (cut.get()) {
return false;
}
ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
if (clusterNode == null) {
return true;
}
//熔斷策略為RT(響應(yīng)時(shí)間)慢調(diào)用比例 這邊三個(gè)規(guī)則對(duì)應(yīng)后臺(tái)的配置
if (grade == RuleConstant.DEGRADE_GRADE_RT) {
//平均響應(yīng)時(shí)間
double rt = clusterNode.avgRt();
if (rt < this.count) {
passCount.set(0);
return true;
}
// Sentinel will degrade the service only if count exceeds.
// 只有在通過(guò)數(shù)超過(guò)設(shè)置的最小值的時(shí)候才會(huì)降級(jí)
if (passCount.incrementAndGet() < rtSlowRequestAmount) {
return true;
}
}
// 熔斷策略為異常比例時(shí)
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
// 單位時(shí)間內(nèi)的異常數(shù)
double exception = clusterNode.exceptionQps();
// 單位時(shí)間內(nèi)的成功數(shù)
double success = clusterNode.successQps();
// 單位時(shí)間內(nèi)的總共請(qǐng)求數(shù)
double total = clusterNode.totalQps();
// If total amount is less than minRequestAmount, the request will pass.
// 當(dāng)總共請(qǐng)求數(shù)達(dá)不到最小請(qǐng)求數(shù)(后臺(tái)配置)時(shí)直接放行
if (total < minRequestAmount) {
return true;
}
// In the same aligned statistic time window,
// "success" (aka. completed count) = exception count + non-exception count (realSuccess)
// success數(shù)包含了有異常跟無(wú)異常的悲没,所以要求realSuc要減去異常數(shù)
double realSuccess = success - exception;
if (realSuccess <= 0 && exception < minRequestAmount) {
return true;
}
if (exception / success < count) {
return true;
}
}
// 熔斷策略為異常數(shù)
else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
//一分鐘內(nèi)的異常數(shù) totalException() { return rollingCounterInMinute.exception(); }
double exception = clusterNode.totalException();
if (exception < count) {
return true;
}
}
if (cut.compareAndSet(false, true)) {
ResetTask resetTask = new ResetTask(this);
pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
}
return false;
}
集成nacos后配置更新相關(guān)源碼
主要利用nacos配置讀取更新等,可參考nacos配置中心源碼Nacos配置中心源碼
- 參考nacos配置更新篮迎,他主要是注冊(cè)一個(gè)listener監(jiān)聽(tīng)nacos config(當(dāng)有數(shù)據(jù)發(fā)生改變的時(shí)候調(diào)用listener)
主要是通過(guò)CacheData#safeNotifyListener這個(gè)方法來(lái)修改的,我們重點(diǎn)看這個(gè)方法示姿,具體nacos配置更新流程可查看
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
@Override
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader甜橱,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異常或錯(cuò)用(多應(yīng)用部署才會(huì)有該問(wèn)題)峻凫。
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
//主要是調(diào)用這個(gè)方法渗鬼,下面其他代碼省略
listener.receiveConfigInfo(contentTmp);
...
}
//----------------------------------?------------------------------------------
這個(gè)是nacos sentinel實(shí)現(xiàn)的監(jiān)聽(tīng)器
public NacosDataSource(final Properties properties, final String groupId, final String dataId,
Converter<String, T> parser) {
super(parser);
if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
throw new IllegalArgumentException(String.format("Bad argument: groupId=[%s], dataId=[%s]",
groupId, dataId));
}
AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
this.groupId = groupId;
this.dataId = dataId;
this.properties = properties;
this.configListener = new Listener() {
@Override
public Executor getExecutor() {
return pool;
}
@Override
public void receiveConfigInfo(final String configInfo) {
RecordLog.info(String.format("[NacosDataSource] New property value received for (properties: %s) (dataId: %s, groupId: %s): %s",
properties, dataId, groupId, configInfo));
T newValue = NacosDataSource.this.parser.convert(configInfo);
//主要是調(diào)用這個(gè)方法 Update the new value to the property.
getProperty().updateValue(newValue);
}
};
initNacosListener();
loadInitialConfig();
}
//----------------------------------?------------------------------------------
public boolean updateValue(T newValue) {
if (isEqual(value, newValue)) {
return false;
}
RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);
value = newValue;
for (PropertyListener<T> listener : listeners) {
//找到對(duì)當(dāng)前規(guī)則感興趣的listener,并修改值
listener.configUpdate(newValue);
}
return true;
}
//----------------------------------?------------------------------------------
private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
@Override
public void configUpdate(List<FlowRule> value) {
//組建新的規(guī)則map
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
//替換map
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
}
@Override
public void configLoad(List<FlowRule> conf) {
Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules.clear();
flowRules.putAll(rules);
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
}
}
sentinel集成nacos總結(jié)
客戶(hù)端啟動(dòng)時(shí),會(huì)根據(jù)配置地址去取nacos上取sentinel配置荧琼,然后保存在本地的rules譬胎,每次校次檢驗(yàn)直接用本地rules去檢驗(yàn)。
啟動(dòng)sentinel控制臺(tái)命锄,當(dāng)在控制臺(tái)修改的時(shí)候會(huì)同步到nacos配置堰乔。當(dāng)修改為nacos獲取的時(shí)候也是一樣的,也會(huì)從nacos去取配置脐恩,
客戶(hù)端方面镐侯,當(dāng)nacos配置發(fā)生變化,sentinel會(huì)有監(jiān)聽(tīng)器去執(zhí)行,修改掉本地的配置苟翻。
問(wèn)題:
如果修改nacos的配置韵卤,dashboard會(huì)同步嗎
實(shí)測(cè),直接在nacos的控制臺(tái)修改配置崇猫,dashboard也會(huì)同步沈条,原理也是用的nacos的配置動(dòng)態(tài)更新
引入的包為sentinel-datasource-nacos在這里面