阿里sentinel源碼解析

寫在前面

sentinel是阿里巴巴開源的流量整形(限流黎泣、熔斷)框架,目前在github擁有15k+的star桑嘶,sentinel以流量為切入點(diǎn),從流量控制躬充、熔斷降級(jí)不翩、系統(tǒng)負(fù)載保護(hù)等多個(gè)維度保護(hù)服務(wù)的穩(wěn)定性。

我們以sentinel的主流程入手麻裳,分析sentinel是怎么搜集流量指標(biāo)口蝠,完成流量整形的。


sentinel處理鏈圖

首先我們先看一個(gè)sentinel的簡單使用demo津坑,只需要調(diào)用SphU.entry獲取到entry妙蔗,然后在完成業(yè)務(wù)方法之后調(diào)用entry.exit即可。

Entry entry = null;
// 務(wù)必保證 finally 會(huì)被執(zhí)行
try {
  // 資源名可使用任意有業(yè)務(wù)語義的字符串疆瑰,注意數(shù)目不能太多(超過 1K)眉反,超出幾千請(qǐng)作為參數(shù)傳入而不要直接作為資源名
  // EntryType 代表流量類型(inbound/outbound),其中系統(tǒng)規(guī)則只對(duì) IN 類型的埋點(diǎn)生效
  entry = SphU.entry("自定義資源名",EntryType.IN);
  // 被保護(hù)的業(yè)務(wù)邏輯
  // do something...
} catch (BlockException ex) {
  // 資源訪問阻止穆役,被限流或被降級(jí)
  // 進(jìn)行相應(yīng)的處理操作
} catch (Exception ex) {
  // 若需要配置降級(jí)規(guī)則寸五,需要通過這種方式記錄業(yè)務(wù)異常
  Tracer.traceEntry(ex, entry);
} finally {
  // 務(wù)必保證 exit,務(wù)必保證每個(gè) entry 與 exit 配對(duì)
  if (entry != null) {
    entry.exit();
  }
}

SphU.entry會(huì)調(diào)用Env.sph.entry耿币,將name和流量流向封裝成StringResourceWrapper梳杏,然后繼續(xù)調(diào)用entry處理。

    public static Entry entry(String name, EntryType trafficType) throws BlockException {
        return Env.sph.entry(name, trafficType, 1, OBJECTS0);
    }

   @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }

進(jìn)入CtSph的entry方法,最終來到entryWithPriority十性,調(diào)用InternalContextUtil.internalEnter初始化ThreadLocal的Context叛溢,然后調(diào)用lookProcessChain初始化責(zé)任鏈,最終調(diào)用chain.entry進(jìn)入責(zé)任鏈進(jìn)行處理劲适。

    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args);
    }

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

InternalContextUtil.internalEnter會(huì)調(diào)用trueEnter方法楷掉,主要是生成DefaultNode到contextNameNodeMap,然后生成Context設(shè)置到contextHolder的過程霞势。

   static Context internalEnter(String name) {
            return trueEnter(name, "");
        }

 protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                Constants.ROOT.addChild(node);

                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }

lookProcessChain已經(jīng)做過優(yōu)化烹植,支持spi加載自定義的責(zé)任鏈bulider,如果沒有定義則使用默認(rèn)的DefaultSlotChainBuilder進(jìn)行加載愕贡。默認(rèn)加載的slot和順序可見鎮(zhèn)樓圖草雕,不再細(xì)說。

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

    public static ProcessorSlotChain newSlotChain() {
        if (slotChainBuilder != null) {
            return slotChainBuilder.build();
        }

        // Resolve the slot chain builder SPI.
        slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);

        if (slotChainBuilder == null) {
            // Should not go through here.
            RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
            slotChainBuilder = new DefaultSlotChainBuilder();
        } else {
            RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
                slotChainBuilder.getClass().getCanonicalName());
        }
        return slotChainBuilder.build();
    }

 @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // Note: the instances of ProcessorSlot should be different, since they are not stateless.
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }

最后來到重頭戲chain.entry進(jìn)入責(zé)任鏈進(jìn)行處理颂鸿,下面會(huì)按照順序分別對(duì)每個(gè)處理器進(jìn)行分析促绵。
首先來到NodeSelectorSlot,主要是獲取到name對(duì)應(yīng)的DefaultNode并緩存起來嘴纺,設(shè)置為context的當(dāng)前節(jié)點(diǎn)败晴,然后通知下一個(gè)節(jié)點(diǎn)。

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

下一個(gè)節(jié)點(diǎn)是ClusterBuilderSlot栽渴,繼續(xù)對(duì)DefaultNode設(shè)置ClusterNode與OriginNode尖坤,然后通知下一節(jié)點(diǎn)。

  @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        node.setClusterNode(clusterNode);

        /*
         * if context origin is set, we should get or create a new {@link Node} of
         * the specific origin.
         */
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

下一個(gè)節(jié)點(diǎn)是LogSlot闲擦,只是單純的打印日志慢味,不再細(xì)說。

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        try {
            fireEntry(context, resourceWrapper, obj, count, prioritized, args);
        } catch (BlockException e) {
            EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
                context.getOrigin(), count);
            throw e;
        } catch (Throwable e) {
            RecordLog.warn("Unexpected entry exception", e);
        }
    }

下一個(gè)節(jié)點(diǎn)是StatisticSlot墅冷,是一個(gè)后置節(jié)點(diǎn)纯路,先通知下一個(gè)節(jié)點(diǎn)處理完后,
1.如果沒有報(bào)錯(cuò)寞忿,則對(duì)node驰唬、clusterNode、originNode腔彰、ENTRY_NODE的線程數(shù)叫编、通過請(qǐng)求數(shù)進(jìn)行增加。
2.如果報(bào)錯(cuò)是PriorityWaitException霹抛,則只對(duì)線程數(shù)進(jìn)行增加搓逾。
3.如果報(bào)錯(cuò)是BlockException,設(shè)置報(bào)錯(cuò)到node杯拐,然后對(duì)阻擋請(qǐng)求數(shù)進(jìn)行增加霞篡。
4.如果是其他報(bào)錯(cuò)世蔗,設(shè)置報(bào)錯(cuò)到node即可。

  @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking.
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            node.addPassRequest(count);

            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // Add block count.
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

下一個(gè)節(jié)點(diǎn)是FlowSlot寇损,這個(gè)節(jié)點(diǎn)就是重要的限流處理節(jié)點(diǎn)凸郑,進(jìn)入此節(jié)點(diǎn)是調(diào)用checker.checkFlow進(jìn)行限流處理裳食。

 @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

來到FlowRuleChecker的checkFlow方法矛市,調(diào)用ruleProvider.apply獲取到資源對(duì)應(yīng)的FlowRule列表,然后遍歷FlowRule調(diào)用canPassCheck校驗(yàn)限流規(guī)則诲祸。

 public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

canPassCheck會(huì)根據(jù)rule的限流模式浊吏,選擇集群限流或者本地限流,這里分別作出分析救氯。

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }

        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }

        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

passLocalCheck是本地限流的入口找田,首先會(huì)調(diào)用selectNodeByRequesterAndStrategy選出限流的node,然后調(diào)用canPass進(jìn)行校驗(yàn)着憨。

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

selectNodeByRequesterAndStrategy會(huì)根據(jù)以下規(guī)則選中node墩衙。
1.strategy是STRATEGY_DIRECT。
1.1.limitApp不是other和default甲抖,并且等于orgin時(shí)漆改,選擇originNode。
1.2.limitApp是other准谚,選擇originNode挫剑。
1.3.limitApp是default,選擇clusterNode柱衔。
2.strategy是STRATEGY_RELATE樊破,選擇clusterNode。
3.strategy是STRATEGY_CHAIN唆铐,選擇node哲戚。

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }

static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
        String refResource = rule.getRefResource();
        int strategy = rule.getStrategy();

        if (StringUtil.isEmpty(refResource)) {
            return null;
        }

        if (strategy == RuleConstant.STRATEGY_RELATE) {
            return ClusterBuilderSlot.getClusterNode(refResource);
        }

        if (strategy == RuleConstant.STRATEGY_CHAIN) {
            if (!refResource.equals(context.getName())) {
                return null;
            }
            return node;
        }
        // No node.
        return null;
    }

選擇好對(duì)應(yīng)的node后就是調(diào)用canPass校驗(yàn)限流規(guī)則,目前sentinel有三種本地限流規(guī)則:普通限流艾岂、勻速限流顺少、冷啟動(dòng)限流。

普通限流的實(shí)現(xiàn)是DefaultController澳盐,就是統(tǒng)計(jì)當(dāng)前的線程數(shù)或者qps加上需要通過的數(shù)量有沒有大于限定值祈纯,小于等于則直接通過,否則阻擋叼耙。

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }

勻速限流的實(shí)現(xiàn)是RateLimiterController腕窥,使用了AtomicLong保證了latestPassedTime的原子增長,因此停頓的時(shí)間是根據(jù)latestPassedTime-currentTime計(jì)算出來筛婉,得到一個(gè)勻速的睡眠時(shí)間簇爆。

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            // Calculate the time to wait.
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                return false;
            } else {
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

冷啟動(dòng)限流的實(shí)現(xiàn)是WarmUpController癞松,是sentinel中最難懂的限流方式,其實(shí)不太需要關(guān)注這些復(fù)雜公式的計(jì)算入蛆,也可以得出冷啟動(dòng)的限流思路:
1.當(dāng)qps已經(jīng)達(dá)到溫?zé)釥顟B(tài)時(shí)响蓉,按照正常的添加令牌消耗令牌即可。
2.當(dāng)qps處于過冷狀態(tài)時(shí)哨毁,會(huì)添加令牌使得算法繼續(xù)降溫枫甲。
3.當(dāng)qps逐漸回升,大于過冷的邊界qps值時(shí)扼褪,不再添加令牌想幻,慢慢消耗令牌使得逐漸增大單位時(shí)間可通過的請(qǐng)求數(shù),讓算法繼續(xù)回溫话浇。
總結(jié)出一點(diǎn)脏毯,可通過的請(qǐng)求數(shù)跟令牌桶剩余令牌數(shù)量成反比,以達(dá)到冷啟動(dòng)的作用幔崖。

 private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;

        this.coldFactor = coldFactor;

        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        // warningToken = 100;
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
        // / maxPermits = thresholdPermits + 2 * warmupPeriod /
        // (stableInterval + coldInterval)
        // maxToken = 200
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // slope
        // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
        // - thresholdPermits);
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

    }

@Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // 開始計(jì)算它的斜率
        // 如果進(jìn)入了警戒線食店,開始調(diào)整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }

 protected void syncToken(long passQps) {
        long currentTime = TimeUtil.currentTimeMillis();
        currentTime = currentTime - currentTime % 1000;
        long oldLastFillTime = lastFilledTime.get();
        if (currentTime <= oldLastFillTime) {
            return;
        }

        long oldValue = storedTokens.get();
        long newValue = coolDownTokens(currentTime, passQps);

        if (storedTokens.compareAndSet(oldValue, newValue)) {
            long currentValue = storedTokens.addAndGet(0 - passQps);
            if (currentValue < 0) {
                storedTokens.set(0L);
            }
            lastFilledTime.set(currentTime);
        }
    }

    private long coolDownTokens(long currentTime, long passQps) {
        long oldValue = storedTokens.get();
        long newValue = oldValue;

        // 添加令牌的判斷前提條件:
        // 當(dāng)令牌的消耗程度遠(yuǎn)遠(yuǎn)低于警戒線的時(shí)候
        if (oldValue < warningToken) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        } else if (oldValue > warningToken) {
            if (passQps < (int)count / coldFactor) {
                newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
            }
        }
        return Math.min(newValue, maxToken);
    }

接下來是集群限流赏寇,passClusterCheck是集群限流的入口吉嫩,會(huì)根據(jù)flowId調(diào)用clusterSerivce獲取指定數(shù)量的token,然后根據(jù)其結(jié)果判斷是否通過蹋订、睡眠率挣、降級(jí)到本地限流、阻擋露戒。

 private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                            boolean prioritized) {
        try {
            TokenService clusterService = pickClusterService();
            if (clusterService == null) {
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            }
            long flowId = rule.getClusterConfig().getFlowId();
            TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
            return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
            // If client is absent, then fallback to local mode.
        } catch (Throwable ex) {
            RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
        }
        // Fallback to local flow control when token client or server for this rule is not available.
        // If fallback is not enabled, then directly pass.
        return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
    }

private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                                         DefaultNode node,
                                                         int acquireCount, boolean prioritized) {
        switch (result.getStatus()) {
            case TokenResultStatus.OK:
                return true;
            case TokenResultStatus.SHOULD_WAIT:
                // Wait for next tick.
                try {
                    Thread.sleep(result.getWaitInMs());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return true;
            case TokenResultStatus.NO_RULE_EXISTS:
            case TokenResultStatus.BAD_REQUEST:
            case TokenResultStatus.FAIL:
            case TokenResultStatus.TOO_MANY_REQUEST:
                return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
            case TokenResultStatus.BLOCKED:
            default:
                return false;
        }
    }

接下來看一下ClusterService的處理椒功,會(huì)根據(jù)ruleId獲取到對(duì)應(yīng)的FlowRule,然后調(diào)用ClusterFlowChecker.acquireClusterToken獲取結(jié)果返回智什。ClusterFlowChecker.acquireClusterToken的處理方式跟普通限流是一樣的动漾,只是會(huì)將集群的請(qǐng)求都集中在一個(gè)service中處理,來達(dá)到集群限流的效果荠锭,不再細(xì)說旱眯。

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        if (notValidRequest(ruleId, acquireCount)) {
            return badRequest();
        }
        // The rule should be valid.
        FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
        if (rule == null) {
            return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
        }

        return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
    }

    static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
        Long id = rule.getClusterConfig().getFlowId();

        if (!allowProceed(id)) {
            return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
        }

        ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
        if (metric == null) {
            return new TokenResult(TokenResultStatus.FAIL);
        }

        double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
        double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
        double nextRemaining = globalThreshold - latestQps - acquireCount;

        if (nextRemaining >= 0) {
            // TODO: checking logic and metric operation should be separated.
            metric.add(ClusterFlowEvent.PASS, acquireCount);
            metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
            if (prioritized) {
                // Add prioritized pass.
                metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
            }
            // Remaining count is cut down to a smaller integer.
            return new TokenResult(TokenResultStatus.OK)
                .setRemaining((int) nextRemaining)
                .setWaitInMs(0);
        } else {
            if (prioritized) {
                // Try to occupy incoming buckets.
                double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
                if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
                    int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
                    // waitInMs > 0 indicates pre-occupy incoming buckets successfully.
                    if (waitInMs > 0) {
                        ClusterServerStatLogUtil.log("flow|waiting|" + id);
                        return new TokenResult(TokenResultStatus.SHOULD_WAIT)
                            .setRemaining(0)
                            .setWaitInMs(waitInMs);
                    }
                    // Or else occupy failed, should be blocked.
                }
            }
            // Blocked.
            metric.add(ClusterFlowEvent.BLOCK, acquireCount);
            metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
            ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
            ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
            if (prioritized) {
                // Add prioritized block.
                metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
                ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
            }

            return blockedResult();
        }
    }

FlowSlot的下一個(gè)節(jié)點(diǎn)是DegradeSlot,是熔斷處理器证九,進(jìn)入時(shí)會(huì)調(diào)用performChecking删豺,進(jìn)而獲取到CircuitBreaker列表,然后調(diào)用其tryPass校驗(yàn)是否熔斷愧怜。

   @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        performChecking(context, resourceWrapper);

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }

來到AbstractCircuitBreaker的tryPass方法呀页,主要是判斷熔斷器狀態(tài),如果是close直接放行拥坛,如果是open則會(huì)校驗(yàn)是否到達(dá)開啟halfopen的時(shí)間蓬蝶,如果成功將狀態(tài)cas成halfopen則繼續(xù)放行尘分,其他情況都是阻攔。

    @Override
    public boolean tryPass(Context context) {
        // Template implementation.
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        if (currentState.get() == State.OPEN) {
            // For half-open state we allow a request for probing.
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

protected boolean fromOpenToHalfOpen(Context context) {
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
                @Override
                public void accept(Context context, Entry entry) {
                    // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
                    // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
                    // when the request is actually blocked by upcoming rules (not only degrade rules).
                    if (entry.getBlockError() != null) {
                        // Fallback to OPEN due to detecting request is blocked
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });
            return true;
        }
        return false;
    }

那怎么將熔斷器的狀態(tài)從close變成open呢丸氛?怎么將halfopen變成close或者open呢培愁?sentinel由兩種熔斷器:錯(cuò)誤數(shù)熔斷器ExceptionCircuitBreaker、響應(yīng)時(shí)間熔斷器ResponseTimeCircuitBreaker缓窜,都分析一遍定续。
當(dāng)業(yè)務(wù)方法報(bào)錯(cuò)時(shí)會(huì)調(diào)用Tracer.traceEntry將報(bào)錯(cuò)設(shè)置到entry上。

 public static void traceEntry(Throwable e, Entry entry) {
        if (!shouldTrace(e)) {
            return;
        }
        traceEntryInternal(e, entry);
    }

    private static void traceEntryInternal(/*@NeedToTrace*/ Throwable e, Entry entry) {
        if (entry == null) {
            return;
        }

        entry.setError(e);
    }

當(dāng)調(diào)用entry.exit時(shí)雹洗,會(huì)隨著責(zé)任鏈來到DegradeSlot的exit方法香罐,會(huì)遍歷熔斷器列表調(diào)用其onRequestComplete方法卧波。

    @Override
    public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }

ExceptionCircuitBreaker的onRequestComplete會(huì)記錄錯(cuò)誤數(shù)和總請(qǐng)求數(shù)时肿,然后調(diào)用handleStateChangeWhenThresholdExceeded繼續(xù)處理。
1.當(dāng)前狀態(tài)是open時(shí)港粱,不應(yīng)該由熔斷器底層去轉(zhuǎn)換狀態(tài)螃成,直接退出。
2.當(dāng)前狀態(tài)是halfopen時(shí)查坪,如果沒有報(bào)錯(cuò)寸宏,則將halfopen變成close,否則將halfopen變成open偿曙。
3.當(dāng)前狀態(tài)時(shí)close時(shí)氮凝,則根據(jù)是否總請(qǐng)求達(dá)到了最低請(qǐng)求數(shù),如果達(dá)到了話再比較錯(cuò)誤數(shù)/錯(cuò)誤比例是否大于限定值望忆,如果大于則直接轉(zhuǎn)換成open罩阵。

  @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);
        }
        counter.getTotalCount().add(1);

        handleStateChangeWhenThresholdExceeded(error);
    }

    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            curCount = errCount * 1.0d / totalCount;
        }
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }

ExceptionCircuitBreaker的onRequestComplete會(huì)記錄慢響應(yīng)數(shù)和總請(qǐng)求數(shù),然后調(diào)用handleStateChangeWhenThresholdExceeded繼續(xù)處理启摄。
1.當(dāng)前狀態(tài)是open時(shí)稿壁,不應(yīng)該由熔斷器底層去轉(zhuǎn)換狀態(tài),直接退出歉备。
2.當(dāng)前狀態(tài)是halfopen時(shí)傅是,如果當(dāng)前響應(yīng)時(shí)間小于限定值,則將halfopen變成close蕾羊,否則將halfopen變成open喧笔。
3.當(dāng)前狀態(tài)時(shí)close時(shí),則根據(jù)是否總請(qǐng)求達(dá)到了最低請(qǐng)求數(shù)龟再,如果達(dá)到了話再比較慢請(qǐng)求數(shù)/慢請(qǐng)求比例是否大于限定值书闸,如果大于則直接轉(zhuǎn)換成open。

@Override
    public void onRequestComplete(Context context) {
        SlowRequestCounter counter = slidingCounter.currentWindow().value();
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        long completeTime = entry.getCompleteTimestamp();
        if (completeTime <= 0) {
            completeTime = TimeUtil.currentTimeMillis();
        }
        long rt = completeTime - entry.getCreateTimestamp();
        if (rt > maxAllowedRt) {
            counter.slowCount.add(1);
        }
        counter.totalCount.add(1);

        handleStateChangeWhenThresholdExceeded(rt);
    }

    private void handleStateChangeWhenThresholdExceeded(long rt) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            // TODO: improve logic for half-open recovery
            if (rt > maxAllowedRt) {
                fromHalfOpenToOpen(1.0d);
            } else {
                fromHalfOpenToClose();
            }
            return;
        }

        List<SlowRequestCounter> counters = slidingCounter.values();
        long slowCount = 0;
        long totalCount = 0;
        for (SlowRequestCounter counter : counters) {
            slowCount += counter.slowCount.sum();
            totalCount += counter.totalCount.sum();
        }
        if (totalCount < minRequestAmount) {
            return;
        }
        double currentRatio = slowCount * 1.0d / totalCount;
        if (currentRatio > maxSlowRequestRatio) {
            transformToOpen(currentRatio);
        }
        if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
                Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
            transformToOpen(currentRatio);
        }
    }

下一個(gè)節(jié)點(diǎn)是AuthoritySlot吸申,權(quán)限控制器梗劫,這個(gè)控制器就是看當(dāng)前origin是否被允許進(jìn)入請(qǐng)求享甸,不允許則報(bào)錯(cuò),不再細(xì)說梳侨。

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        checkBlackWhiteAuthority(resourceWrapper, context);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

   void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
        Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

        if (authorityRules == null) {
            return;
        }

        Set<AuthorityRule> rules = authorityRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (AuthorityRule rule : rules) {
            if (!AuthorityRuleChecker.passCheck(rule, context)) {
                throw new AuthorityException(context.getOrigin(), rule);
            }
        }
    }

 static boolean passCheck(AuthorityRule rule, Context context) {
        String requester = context.getOrigin();

        // Empty origin or empty limitApp will pass.
        if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
            return true;
        }

        // Do exact match with origin name.
        int pos = rule.getLimitApp().indexOf(requester);
        boolean contain = pos > -1;

        if (contain) {
            boolean exactlyMatch = false;
            String[] appArray = rule.getLimitApp().split(",");
            for (String app : appArray) {
                if (requester.equals(app)) {
                    exactlyMatch = true;
                    break;
                }
            }

            contain = exactlyMatch;
        }

        int strategy = rule.getStrategy();
        if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
            return false;
        }

        if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
            return false;
        }

        return true;
    }

終于來到最后一個(gè)節(jié)點(diǎn)SystemSlot了蛉威,此節(jié)點(diǎn)是自適應(yīng)處理器,主要是根據(jù)系統(tǒng)自身負(fù)載(qps走哺、最大線程數(shù)蚯嫌、最高響應(yīng)時(shí)間、cpu使用率丙躏、系統(tǒng)bbr)來判斷請(qǐng)求是否能夠通過择示,保證系統(tǒng)處于一個(gè)能穩(wěn)定處理請(qǐng)求的安全狀態(tài)。

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        SystemRuleManager.checkSystem(resourceWrapper);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

 public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
        if (resourceWrapper == null) {
            return;
        }
        // Ensure the checking switch is on.
        if (!checkSystemStatus.get()) {
            return;
        }

        // for inbound traffic only
        if (resourceWrapper.getEntryType() != EntryType.IN) {
            return;
        }

        // total qps
        double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
        if (currentQps > qps) {
            throw new SystemBlockException(resourceWrapper.getName(), "qps");
        }

        // total thread
        int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
        if (currentThread > maxThread) {
            throw new SystemBlockException(resourceWrapper.getName(), "thread");
        }

        double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
        if (rt > maxRt) {
            throw new SystemBlockException(resourceWrapper.getName(), "rt");
        }

        // load. BBR algorithm.
        if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
            if (!checkBbr(currentThread)) {
                throw new SystemBlockException(resourceWrapper.getName(), "load");
            }
        }

        // cpu usage
        if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
            throw new SystemBlockException(resourceWrapper.getName(), "cpu");
        }
    }

尤其值得一提的是bbr算法晒旅,作者參考了tcp bbr的設(shè)計(jì)栅盲,通過最大的qps和最小的響應(yīng)時(shí)間動(dòng)態(tài)計(jì)算出可進(jìn)入的線程數(shù),而不是一個(gè)粗暴的固定可進(jìn)入的線程數(shù)废恋,為什么能通過這兩個(gè)值就能計(jì)算出可進(jìn)入的線程數(shù)谈秫?可以網(wǎng)上搜索一下tcp bbr算法的解析,十分巧妙鱼鼓,不再細(xì)說拟烫。

    private static boolean checkBbr(int currentThread) {
        if (currentThread > 1 &&
            currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
            return false;
        }
        return true;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市迄本,隨后出現(xiàn)的幾起案子硕淑,更是在濱河造成了極大的恐慌,老刑警劉巖嘉赎,帶你破解...
    沈念sama閱讀 206,968評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件置媳,死亡現(xiàn)場離奇詭異,居然都是意外死亡曹阔,警方通過查閱死者的電腦和手機(jī)半开,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來赃份,“玉大人寂拆,你說我怎么就攤上這事∽ズ” “怎么了纠永?”我有些...
    開封第一講書人閱讀 153,220評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長谒拴。 經(jīng)常有香客問我尝江,道長,這世上最難降的妖魔是什么英上? 我笑而不...
    開封第一講書人閱讀 55,416評(píng)論 1 279
  • 正文 為了忘掉前任炭序,我火速辦了婚禮啤覆,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘惭聂。我一直安慰自己窗声,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,425評(píng)論 5 374
  • 文/花漫 我一把揭開白布辜纲。 她就那樣靜靜地躺著笨觅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪耕腾。 梳的紋絲不亂的頭發(fā)上见剩,一...
    開封第一講書人閱讀 49,144評(píng)論 1 285
  • 那天,我揣著相機(jī)與錄音扫俺,去河邊找鬼苍苞。 笑死,一個(gè)胖子當(dāng)著我的面吹牛牵舵,可吹牛的內(nèi)容都是我干的柒啤。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評(píng)論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼畸颅,長吁一口氣:“原來是場噩夢(mèng)啊……” “哼!你這毒婦竟也來了方援?” 一聲冷哼從身側(cè)響起没炒,我...
    開封第一講書人閱讀 37,088評(píng)論 0 261
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎犯戏,沒想到半個(gè)月后送火,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,586評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡先匪,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,028評(píng)論 2 325
  • 正文 我和宋清朗相戀三年种吸,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片呀非。...
    茶點(diǎn)故事閱讀 38,137評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡坚俗,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出岸裙,到底是詐尸還是另有隱情猖败,我是刑警寧澤,帶...
    沈念sama閱讀 33,783評(píng)論 4 324
  • 正文 年R本政府宣布降允,位于F島的核電站恩闻,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏剧董。R本人自食惡果不足惜幢尚,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,343評(píng)論 3 307
  • 文/蒙蒙 一破停、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧尉剩,春花似錦辱挥、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至功蜓,卻和暖如春惶洲,著一層夾襖步出監(jiān)牢的瞬間纷闺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評(píng)論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留添祸,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,595評(píng)論 2 355
  • 正文 我出身青樓狡相,卻偏偏與公主長得像后专,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子美浦,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,901評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容