Sentinel流程源碼

主要流程

  • springboot集成包
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
            <version>2.2.3.RELEASE</version>
        </dependency>

該包的spring.factories里導(dǎo)入了這些組件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration

主要是通過(guò)SentinelWebAutoConfiguration這個(gè)類(lèi)添加一個(gè)MVC攔截器

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        if (!sentinelWebInterceptorOptional.isPresent()) {
            return;
        }
        SentinelProperties.Filter filterConfig = properties.getFilter();
        registry.addInterceptor(sentinelWebInterceptorOptional.get())
                .order(filterConfig.getOrder())
//攔截全路徑/*
                .addPathPatterns(filterConfig.getUrlPatterns());
    }
  • 入口:在web請(qǐng)求過(guò)來(lái)的時(shí)候攔截来破,調(diào)用AbstractSentinelInterceptor接口
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
        throws Exception {
        try {
//獲取請(qǐng)求的路徑 比如訪問(wèn)的說(shuō)localhost:8085/area/list币厕,resourceName 為 area/list
            String resourceName = getResourceName(request);

            if (StringUtil.isEmpty(resourceName)) {
                return true;
            }
            
            if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
                return true;
            }
            
            String origin = parseOrigin(request);
            String contextName = getContextName(request);
            ContextUtil.enter(contextName, origin);
//主要是進(jìn)入到這個(gè)方法
            Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
            return true;
        } catch (BlockException e) {
            try {
//處理流控的exception返回給前端喷兼,可以自己定制返回內(nèi)容
                handleBlockException(request, response, e);
            } finally {
                ContextUtil.exit();
            }
            return false;
        }
    }

  • 主要攔截方法:entry方法一直點(diǎn)進(jìn)來(lái)會(huì)進(jìn)到com.alibaba.csp.sentinel.CtSph#entryWithPriority
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
...省略一堆校驗(yàn)
//獲取該資源的攔截鏈
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
//將資源妆兑,攔截鏈環(huán)境包裝為一個(gè)entry
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
//真正執(zhí)行攔截鏈的方法(下面詳解)
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//獲取該資源上的攔截鏈
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
//當(dāng)?shù)谝淮芜M(jìn)來(lái)的時(shí)候回?cái)r截鏈chain為空
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
//初始化攔截鏈,(下面單獨(dú)開(kāi)出來(lái)講)
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
  • chain = SlotChainProvider.newSlotChain();創(chuàng)建攔截鏈
    這邊主要是創(chuàng)建了8個(gè)攔截鏈每强,分別對(duì)應(yīng)我們sentinel控制臺(tái)配的8個(gè),主要我們常用的說(shuō)流控規(guī)則跟降級(jí)規(guī)則
表頭 表頭
鏈路節(jié)點(diǎn)生成 NodeSelectorSlot
集群流控 ClusterBuilderSlot
規(guī)則限制日志相關(guān) LogSlot
統(tǒng)計(jì)相關(guān)(重要) StatisticSlot
熱點(diǎn)規(guī)則 ParamFlowSlot
授權(quán)規(guī)則 AuthoritySlot
系統(tǒng)規(guī)則 SystemSlot
流控規(guī)則(重要) FlowSlot
降級(jí)規(guī)則(重要) DegradeSlot
    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }
...省略spi等代碼
    }

public class HotParamSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new ParamFlowSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}

構(gòu)建后形成這個(gè)鏈表


image.png
  • 真正執(zhí)行攔截鏈的方法 chain.entry(...);
    調(diào)用chain里面的first.transformEntry方法稿静,也就是他的處理方法,因?yàn)榈谝粋€(gè)是默認(rèn)的DefaultProcessorSlotChain诸蚕,他沒(méi)有做任何處理步势,直接調(diào)用 next.transformEntry給下面的責(zé)任鏈處理
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

幾個(gè)責(zé)任鏈的主要代碼

  • NodeSelectorSlot(負(fù)責(zé)請(qǐng)求鏈路資源的歸納)
    負(fù)責(zé)收集資源的路徑氧猬,并將這些資源的調(diào)用路徑,以樹(shù)狀結(jié)構(gòu)存儲(chǔ)起來(lái)坏瘩,用于根據(jù)凋用路徑來(lái)限流降級(jí)
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
//context.getName()的結(jié)果是sentinel_spring_web_context形成下面那個(gè)節(jié)點(diǎn)
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
            }
        }
        context.setCurNode(node);
//調(diào)用下個(gè)Slot處理
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  • ClusterBuilderSlot(集群相關(guān))
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
  • LogSlot(日志相關(guān))
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException e) {
//當(dāng)發(fā)生限制時(shí)記錄信息盅抚。
            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                context.getOrigin(), count);
            throw e;
        } catch (Throwable e) {
            RecordLog.warn("Unexpected entry exception", e);
        }
    }
  • StatisticSlot(重要,統(tǒng)計(jì)類(lèi))

用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息倔矾,例如該資源的 RT QPS, thread count等等這些信息將用作為多維度限流妄均,降級(jí)的依據(jù)
統(tǒng)計(jì)用的是滑動(dòng)時(shí)間算法,篇幅有點(diǎn)長(zhǎng)哪自,放到滑動(dòng)時(shí)間算法與sentinel實(shí)踐

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            //執(zhí)行后面的檢查
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            // 如果其他的校驗(yàn)都成功沒(méi)有問(wèn)題
            //增加單位時(shí)間成功線程數(shù)量
            node.increaseThreadNum();
            //增加單位時(shí)間成功請(qǐng)求,(滑動(dòng)時(shí)間窗口計(jì)數(shù)實(shí)現(xiàn))
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected error, set error to current entry.
            context.getCurEntry().setError(e);

            // This should not happen.
            node.increaseExceptionQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseExceptionQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                Constants.ENTRY_NODE.increaseExceptionQps(count);
            }
            throw e;
        }
    }
  • ParamFlowSlot(熱點(diǎn)流控)
//對(duì)有添加熱點(diǎn)流控規(guī)則的資源進(jìn)行限制
    void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        if (args == null) {
            return;
        }
        if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
            return;
        }
        List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());

        for (ParamFlowRule rule : rules) {
            applyRealParamIdx(rule, args.length);

            // Initialize the parameter metrics.
            ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);

            if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
                String triggeredParam = "";
                if (args.length > rule.getParamIdx()) {
                    Object value = args[rule.getParamIdx()];
                    triggeredParam = String.valueOf(value);
                }
                throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
            }
        }
    }
  • SystemSlot
    當(dāng)配置了系統(tǒng)規(guī)則后丰包,會(huì)根據(jù)配置的系統(tǒng)規(guī)則進(jìn)行校驗(yàn)
    public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
 ...
        // qps限制數(shù)
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
        if (currentQps > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }
        // thread限制
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }
        //rt限制
        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }
        // CPU使用率
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }
  • AuthoritySlot(根據(jù)配置的授權(quán)規(guī)則來(lái)限制)
    void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
            return;
        }

        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }
  • FlowSlot(重要,限流相關(guān),sentinel拉取配置應(yīng)用nacos的動(dòng)態(tài)加載配置com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.FlowPropertyListener主要調(diào)用該方法)
    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        //獲取當(dāng)前資源下的所有的流控規(guī)則
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
  //----------------------------------?------------------------------------------
獲取flowRules流控規(guī)則,規(guī)則naco
        public Collection<FlowRule> apply(String resource) {
            //getFlowRuleMap里面代碼是 return flowRules; 返回本地的規(guī)則map文件
            Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
            return flowRules.get(resource);
        }

//----------------------------------?------------------------------------------
判斷是否能通過(guò)該規(guī)則
    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
//這邊的getRater()方法會(huì)根據(jù)你配置時(shí)的 1壤巷、快速失敗  2邑彪、Warm Up 3、 排隊(duì)等待 三個(gè)類(lèi)型分別調(diào)用不同的方法
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
  1. 快速失敗
    主要是調(diào)用com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController#canPass()
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //獲取當(dāng)前平均訪問(wèn)次數(shù),avgUsedTokens是獲取node.passQps()胧华,調(diào)用StatisticNode#passQps
        int curCount = avgUsedTokens(node);
        //如果超過(guò)限制則直接返回false
        if (curCount + acquireCount > count) {
        //DefaultController這邊的 prioritized 為false里面的方法不提示
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
      ...
                }
            }
            return false;
        }
        return true;
    }
  1. Warm Up 調(diào)用了WarmUpController#canPass(...) 這邊主要使用的是令牌桶算法
    //構(gòu)建函數(shù)
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        //冷因子不能小于1,默認(rèn)為3
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }
        //設(shè)置閾值
        this.count = count;
        //設(shè)置冷因子
        this.coldFactor = coldFactor;
        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        //預(yù)警token=(預(yù)熱時(shí)長(zhǎng)*閾值)/(冷因子-1)
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // maxToken = 200
        //最大token=(預(yù)熱預(yù)警token*閾值)/(冷因子-1)
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // 開(kāi)始計(jì)算它的斜率
        // 如果進(jìn)入了警戒線寄症,開(kāi)始調(diào)整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }
  1. 排隊(duì)等待
    主要是用到RateLimiterController#canPass(com.alibaba.csp.sentinel.node.Node, int, boolean)
   @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //當(dāng)請(qǐng)求通過(guò)量小于等于0時(shí)矩动,直接返回通過(guò)
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
        // 這個(gè)count是你控制臺(tái)設(shè)置的閾值
        if (count <= 0) {
            return false;
        }
        //獲取當(dāng)前時(shí)間
        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        // 計(jì)算兩個(gè)請(qǐng)求之間需要花費(fèi)的時(shí)間
        // 假設(shè)acquireCount=1,count =10, 則((1*1)/10*1000)取整=1000
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        // 預(yù)期通過(guò)這個(gè)請(qǐng)求的時(shí)間
        long expectedTime = costTime + latestPassedTime.get();
        // 判斷預(yù)期通過(guò)時(shí)間是否小于當(dāng)前時(shí)間
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            // 計(jì)算預(yù)計(jì)需要等待的時(shí)間(當(dāng)前預(yù)期需要花費(fèi)的時(shí)間+最后一次成功的時(shí)間-當(dāng)前時(shí)間)
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            // 如果預(yù)計(jì)需要等待的時(shí)間大于后臺(tái)配置的等待時(shí)間有巧,則直接拒絕
            if (waitTime > maxQueueingTimeMs(后臺(tái)配置為毫秒)) {
                return false;
            } else {
            // 設(shè)置latestPassedTime,用atomic變量防止并發(fā)
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
            // 再做一次超時(shí)判斷
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
            // 休眠等待時(shí)間
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }
  • DegradeSlot(重要,降級(jí)相關(guān))
    @Override
    public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        if (cut.get()) {
            return false;
        }

        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
        if (clusterNode == null) {
            return true;
        }
        //熔斷策略為RT(響應(yīng)時(shí)間)慢調(diào)用比例 這邊三個(gè)規(guī)則對(duì)應(yīng)后臺(tái)的配置
        if (grade == RuleConstant.DEGRADE_GRADE_RT) {
           //平均響應(yīng)時(shí)間
            double rt = clusterNode.avgRt();
            if (rt < this.count) {
                passCount.set(0);
                return true;
            }
            // Sentinel will degrade the service only if count exceeds.
            // 只有在通過(guò)數(shù)超過(guò)設(shè)置的最小值的時(shí)候才會(huì)降級(jí)
            if (passCount.incrementAndGet() < rtSlowRequestAmount) {
                return true;
            }
        }
            // 熔斷策略為異常比例時(shí)
         else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
            // 單位時(shí)間內(nèi)的異常數(shù)
            double exception = clusterNode.exceptionQps();
            // 單位時(shí)間內(nèi)的成功數(shù)
            double success = clusterNode.successQps();
            // 單位時(shí)間內(nèi)的總共請(qǐng)求數(shù)
            double total = clusterNode.totalQps();
            // If total amount is less than minRequestAmount, the request will pass.
            // 當(dāng)總共請(qǐng)求數(shù)達(dá)不到最小請(qǐng)求數(shù)(后臺(tái)配置)時(shí)直接放行
            if (total < minRequestAmount) {
                return true;
            }

            // In the same aligned statistic time window,
            // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
            // success數(shù)包含了有異常跟無(wú)異常的悲没,所以要求realSuc要減去異常數(shù)
            double realSuccess = success - exception;
            if (realSuccess <= 0 && exception < minRequestAmount) {
                return true;
            }
            if (exception / success < count) {
                return true;
            }
        }
            // 熔斷策略為異常數(shù)
          else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
            //一分鐘內(nèi)的異常數(shù)  totalException() {    return rollingCounterInMinute.exception();   }
            double exception = clusterNode.totalException();
            if (exception < count) {
                return true;
            }
        }

        if (cut.compareAndSet(false, true)) {
            ResetTask resetTask = new ResetTask(this);
            pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
        }

        return false;
    }

集成nacos后配置更新相關(guān)源碼

主要利用nacos配置讀取更新等,可參考nacos配置中心源碼Nacos配置中心源碼

  • 參考nacos配置更新篮迎,他主要是注冊(cè)一個(gè)listener監(jiān)聽(tīng)nacos config(當(dāng)有數(shù)據(jù)發(fā)生改變的時(shí)候調(diào)用listener)
    主要是通過(guò)CacheData#safeNotifyListener這個(gè)方法來(lái)修改的,我們重點(diǎn)看這個(gè)方法示姿,具體nacos配置更新流程可查看
    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
            final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        
        Runnable job = new Runnable() {
            @Override
            public void run() {
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader甜橱,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異常或錯(cuò)用(多應(yīng)用部署才會(huì)有該問(wèn)題)峻凫。
                    Thread.currentThread().setContextClassLoader(appClassLoader);
                    
                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();
                    //主要是調(diào)用這個(gè)方法渗鬼,下面其他代碼省略
                    listener.receiveConfigInfo(contentTmp);
                    ...
}
//----------------------------------?------------------------------------------
這個(gè)是nacos sentinel實(shí)現(xiàn)的監(jiān)聽(tīng)器

    public NacosDataSource(final Properties properties, final String groupId, final String dataId,
                           Converter<String, T> parser) {
        super(parser);
        if (StringUtil.isBlank(groupId) || StringUtil.isBlank(dataId)) {
            throw new IllegalArgumentException(String.format("Bad argument: groupId=[%s], dataId=[%s]",
                groupId, dataId));
        }
        AssertUtil.notNull(properties, "Nacos properties must not be null, you could put some keys from PropertyKeyConst");
        this.groupId = groupId;
        this.dataId = dataId;
        this.properties = properties;
        this.configListener = new Listener() {
            @Override
            public Executor getExecutor() {
                return pool;
            }
            @Override
            public void receiveConfigInfo(final String configInfo) {
                RecordLog.info(String.format("[NacosDataSource] New property value received for (properties: %s) (dataId: %s, groupId: %s): %s",
                    properties, dataId, groupId, configInfo));
                T newValue = NacosDataSource.this.parser.convert(configInfo);
                //主要是調(diào)用這個(gè)方法 Update the new value to the property.
                getProperty().updateValue(newValue);
            }
        };
        initNacosListener();
        loadInitialConfig();
    }
//----------------------------------?------------------------------------------

    public boolean updateValue(T newValue) {
        if (isEqual(value, newValue)) {
            return false;
        }
        RecordLog.info("[DynamicSentinelProperty] Config will be updated to: " + newValue);

        value = newValue;
        for (PropertyListener<T> listener : listeners) {
            //找到對(duì)當(dāng)前規(guī)則感興趣的listener,并修改值
            listener.configUpdate(newValue);
        }
        return true;
    }           
//----------------------------------?------------------------------------------

    private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {

        @Override
        public void configUpdate(List<FlowRule> value) {
            //組建新的規(guī)則map
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
            if (rules != null) {
                //替換map
                flowRules.clear();
                flowRules.putAll(rules);
            }
            RecordLog.info("[FlowRuleManager] Flow rules received: " + flowRules);
        }

        @Override
        public void configLoad(List<FlowRule> conf) {
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
            if (rules != null) {
                flowRules.clear();
                flowRules.putAll(rules);
            }
            RecordLog.info("[FlowRuleManager] Flow rules loaded: " + flowRules);
        }
    }

sentinel集成nacos總結(jié)

客戶(hù)端啟動(dòng)時(shí),會(huì)根據(jù)配置地址去取nacos上取sentinel配置荧琼,然后保存在本地的rules譬胎,每次校次檢驗(yàn)直接用本地rules去檢驗(yàn)。
啟動(dòng)sentinel控制臺(tái)命锄,當(dāng)在控制臺(tái)修改的時(shí)候會(huì)同步到nacos配置堰乔。當(dāng)修改為nacos獲取的時(shí)候也是一樣的,也會(huì)從nacos去取配置脐恩,
客戶(hù)端方面镐侯,當(dāng)nacos配置發(fā)生變化,sentinel會(huì)有監(jiān)聽(tīng)器去執(zhí)行,修改掉本地的配置苟翻。

問(wèn)題:
如果修改nacos的配置韵卤,dashboard會(huì)同步嗎
實(shí)測(cè),直接在nacos的控制臺(tái)修改配置崇猫,dashboard也會(huì)同步沈条,原理也是用的nacos的配置動(dòng)態(tài)更新
引入的包為sentinel-datasource-nacos在這里面

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市诅炉,隨后出現(xiàn)的幾起案子蜡歹,更是在濱河造成了極大的恐慌,老刑警劉巖涕烧,帶你破解...
    沈念sama閱讀 211,042評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件月而,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡议纯,警方通過(guò)查閱死者的電腦和手機(jī)父款,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,996評(píng)論 2 384
  • 文/潘曉璐 我一進(jìn)店門(mén),熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)瞻凤,“玉大人铛漓,你說(shuō)我怎么就攤上這事■旯梗” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 156,674評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵玫坛,是天一觀的道長(zhǎng)结笨。 經(jīng)常有香客問(wèn)我,道長(zhǎng)湿镀,這世上最難降的妖魔是什么炕吸? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,340評(píng)論 1 283
  • 正文 為了忘掉前任,我火速辦了婚禮勉痴,結(jié)果婚禮上赫模,老公的妹妹穿的比我還像新娘。我一直安慰自己蒸矛,他們只是感情好瀑罗,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,404評(píng)論 5 384
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著雏掠,像睡著了一般斩祭。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上乡话,一...
    開(kāi)封第一講書(shū)人閱讀 49,749評(píng)論 1 289
  • 那天摧玫,我揣著相機(jī)與錄音,去河邊找鬼绑青。 笑死诬像,一個(gè)胖子當(dāng)著我的面吹牛屋群,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播坏挠,決...
    沈念sama閱讀 38,902評(píng)論 3 405
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼芍躏,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了癞揉?” 一聲冷哼從身側(cè)響起纸肉,我...
    開(kāi)封第一講書(shū)人閱讀 37,662評(píng)論 0 266
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎喊熟,沒(méi)想到半個(gè)月后柏肪,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,110評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡芥牌,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,451評(píng)論 2 325
  • 正文 我和宋清朗相戀三年烦味,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片壁拉。...
    茶點(diǎn)故事閱讀 38,577評(píng)論 1 340
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡谬俄,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出弃理,到底是詐尸還是另有隱情溃论,我是刑警寧澤,帶...
    沈念sama閱讀 34,258評(píng)論 4 328
  • 正文 年R本政府宣布痘昌,位于F島的核電站钥勋,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏辆苔。R本人自食惡果不足惜算灸,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,848評(píng)論 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望驻啤。 院中可真熱鬧菲驴,春花似錦、人聲如沸骑冗。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,726評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)贼涩。三九已至森逮,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間磁携,已是汗流浹背褒侧。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,952評(píng)論 1 264
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人闷供。 一個(gè)月前我還...
    沈念sama閱讀 46,271評(píng)論 2 360
  • 正文 我出身青樓烟央,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親歪脏。 傳聞我的和親對(duì)象是個(gè)殘疾皇子疑俭,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,452評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容