Sentinel之集群限流源碼分析(二)

集群限流源碼分析(二)

接下來分析sentinel-cluster-server-default模塊中server包下的內(nèi)容。

1悟狱、源碼目錄結(jié)構(gòu)

源碼結(jié)構(gòu)

2、源碼分析

分析內(nèi)容

2.1 編解碼codec

2.1.1 ServerEntityCodecProvider

編解碼提供工具類击儡,用于獲取RequestEntityDecoder與ResponseEntityWriter闷哆;
源碼如下:

public final class ServerEntityCodecProvider {
    private static RequestEntityDecoder requestEntityDecoder = null;
    private static ResponseEntityWriter responseEntityWriter = null;

    static {
        resolveInstance();
    }

    private static void resolveInstance() {
        ResponseEntityWriter writer = SpiLoader.loadFirstInstance(ResponseEntityWriter.class);
        if (writer == null) {
            RecordLog.warn("[ServerEntityCodecProvider] No existing response entity writer, resolve failed");
        } else {
            responseEntityWriter = writer;
            RecordLog.info(
                "[ServerEntityCodecProvider] Response entity writer resolved: " + responseEntityWriter.getClass()
                    .getCanonicalName());
        }
        RequestEntityDecoder decoder = SpiLoader.loadFirstInstance(RequestEntityDecoder.class);
        if (decoder == null) {
            RecordLog.warn("[ServerEntityCodecProvider] No existing request entity decoder, resolve failed");
        } else {
            requestEntityDecoder = decoder;
            RecordLog.info(
                "[ServerEntityCodecProvider] Request entity decoder resolved: " + requestEntityDecoder.getClass()
                    .getCanonicalName());
        }
    }

    public static RequestEntityDecoder getRequestEntityDecoder() {
        return requestEntityDecoder;
    }

    public static ResponseEntityWriter getResponseEntityWriter() {
        return responseEntityWriter;
    }

}

實(shí)例變量

  • requestEntityDecoder:客戶端請(qǐng)求解碼器
  • responseEntityWriter:服務(wù)端響應(yīng)編碼器

靜態(tài)方法

  • resolveInstance:通過SPI機(jī)制獲取requestEntityDecoder與responseEntityWriter的實(shí)現(xiàn)類腰奋;并保存在實(shí)例變量中。
  • requestEntityDecoder與responseEntityWriter實(shí)現(xiàn)類分別是DefaultRequestEntityDecoder抱怔、DefaultResponseEntityWriter氛堕;實(shí)現(xiàn)類的指定配置META-INF.services目錄下。

2.1.2 DefaultRequestEntityDecoder

默認(rèn)的ClusterRequest對(duì)象解碼器
解碼格式如下:

  • +--------+---------+---------+
  • | xid(4) | type(1) | data... |
  • +--------+---------+---------+

源碼

@Override
public ClusterRequest decode(ByteBuf source) {

    //1. 判斷可讀的字節(jié)數(shù)是否大于5個(gè)字節(jié)
    if (source.readableBytes() >= 5) {

        //2. 獲取xid:4個(gè)字節(jié)
        int xid = source.readInt();

        //3. 獲取type 1個(gè)字節(jié)野蝇;三種類型:PING讼稚、FLOW、PARAM_FLOW:下文會(huì)講
        int type = source.readByte();

        //4. 從注冊(cè)器中獲取具體類型的解碼器
        EntityDecoder<ByteBuf, ?> dataDecoder = RequestDataDecodeRegistry.getDecoder(type);
        if (dataDecoder == null) {
            RecordLog.warn("Unknown type of request data decoder: {0}", type);
            return null;
        }
        Object data;

        //5. 再次確認(rèn)下可讀取的字節(jié)數(shù)
        if (source.readableBytes() == 0) {
            data = null;
        } else {

            // 6. 讀取數(shù)據(jù)
            data = dataDecoder.decode(source);
        }
        return new ClusterRequest<>(xid, type, data);
    }
    return null;
}

2.1.3 DefaultResponseEntityWriter

默認(rèn)的ClusterResponse響應(yīng)編碼器
源碼

@Override
public void writeTo(ClusterResponse response, ByteBuf out) {

    //1. 獲取響應(yīng)類型及具體的響應(yīng)數(shù)據(jù)編碼器
    int type = response.getType();
    EntityWriter<Object, ByteBuf> responseDataWriter = ResponseDataWriterRegistrygetWriter(type);
    if (responseDataWriter == null) {
        writeHead(response.setStatus(ClusterConstants.RESPONSE_STATUS_BAD), out);
        RecordLog.warn("[NettyResponseEncoder] Cannot find matching writer for type <{0>", response.getType());
        return;
    }

    // 2. 寫入頭部數(shù)據(jù)
    writeHead(response, out);

    // 3. 寫入數(shù)據(jù)包
    responseDataWriter.writeTo(response.getData(), out);
}
private void writeHead(Response response, ByteBuf out) {
    out.writeInt(response.getId());
    out.writeByte(response.getType());

    //狀態(tài)绕沈,如錯(cuò)誤狀態(tài)會(huì)寫入
    out.writeByte(response.getStatus());
}

2.1.3 registry

  • RequestDataDecodeRegistry:RequestData解碼注冊(cè)器
  • ResponseDataWriterRegistry:ResponseData編碼注冊(cè)器
    源碼
public final class RequestDataDecodeRegistry {

    //保存EntityDecoder的Map锐想,key是類型type
    private static final Map<Integer, EntityDecoder<ByteBuf, ?>> DECODER_MAP = new HashMap<>();

    public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder) {
        if (DECODER_MAP.containsKey(type)) {
            return false;
        }
        DECODER_MAP.put(type, decoder);
        return true;
    }

    public static EntityDecoder<ByteBuf, Object> getDecoder(int type) {
        return (EntityDecoder<ByteBuf, Object>)DECODER_MAP.get(type);
    }

    public static boolean removeDecoder(int type) {
        return DECODER_MAP.remove(type) != null;
    }
}
public final class ResponseDataWriterRegistry {

    //保存EntityWriter的Map,key是類型type
    private static final Map<Integer, EntityWriter<Object, ByteBuf>> WRITER_MAP = new HashMap<>();

    public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer) {
        if (WRITER_MAP.containsKey(type)) {
            return false;
        }
        WRITER_MAP.put(type, (EntityWriter<Object, ByteBuf>)writer);
        return true;
    }

    public static EntityWriter<Object, ByteBuf> getWriter(int type) {
        return WRITER_MAP.get(type);
    }

    public static boolean remove(int type) {
        return WRITER_MAP.remove(type) != null;
    }
}

源碼很簡(jiǎn)單乍狐,可以理解就是一個(gè)工具類赠摇;那么Map的中編碼、解碼器數(shù)據(jù)在哪個(gè)地方設(shè)置的呢浅蚪;在后面的init包中源碼會(huì)分析藕帜。

2.1.4 netty

這個(gè)包下其實(shí)就是handler類了,會(huì)放在ChannelPipeline中惜傲;在NettyTransportServer啟動(dòng)類中可以看到洽故。
源碼分析

public class NettyRequestDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        //1. 獲取RequestEntityDecoder
        RequestEntityDecoder<ByteBuf, Request> requestDecoder = ServerEntityCodecProvider.getRequestEntityDecoder();
        if (requestDecoder == null) {
            RecordLog.warn("[NettyRequestDecoder] Cannot resolve the global request entity decoder, "
                + "dropping the request");
            return;
        }

        // TODO: handle decode error here.
        // 2. 請(qǐng)求數(shù)據(jù)解碼
        Request request = requestDecoder.decode(in);
        if (request != null) {
            out.add(request);
        }
    }
}

public class NettyResponseEncoder extends MessageToByteEncoder<ClusterResponse> {

    @Override
    protected void encode(ChannelHandlerContext ctx, ClusterResponse response, ByteBuf out) throws Exception {

        //1. 獲取ResponseEntityWriter
        ResponseEntityWriter<ClusterResponse, ByteBuf> responseEntityWriter = ServerEntityCodecProvider.getResponseEntityWriter();
        if (responseEntityWriter == null) {
            RecordLog.warn("[NettyResponseEncoder] Cannot resolve the global response entity writer, reply bad status");

            //寫入錯(cuò)誤狀態(tài)數(shù)據(jù)
            writeBadStatusHead(response, out);
            return;
        }

        //2. 寫入數(shù)據(jù)
        responseEntityWriter.writeTo(response, out);
    }


    private void writeBadStatusHead(Response response, ByteBuf out) {
        out.writeInt(response.getId());
        out.writeByte(ClusterConstants.RESPONSE_STATUS_BAD);
        out.writeByte(response.getStatus());
    }
}
  • NettyRequestDecoder類繼承ByteToMessageDecoder,而MessageToByteEncoder又繼承ChannelInboundHandlerAdapter盗誊,熟悉netty的知道时甚,通過繼承ChannelInboundHandlerAdapter可以自定義攔截器
  • NettyResponseEncoder繼承MessageToByteEncoder,MessageToByteEncoder繼承ChannelOutboundHandlerAdapter

2.1.5 data

該包下就是EntityWriter和EntityDecoder的實(shí)現(xiàn)類了哈踱;其中EntityDecoder的子類有FlowRequestDataDecoder荒适,ParamFlowRequestDataDecoder,PingRequestDataDecoder开镣;EntityWriter的子類有FlowResponseDataWriter刀诬,PingResponseDataWriter。

2.1.5.1 對(duì)請(qǐng)求數(shù)據(jù)解碼

主要可以看到FlowRequestDataDecoder邪财、ParamFlowRequestDataDecoder陕壹、PingRequestDataDecoder類
對(duì)ClusterResponse響應(yīng)的數(shù)據(jù)進(jìn)行解碼质欲。

  • FlowRequestDataDecoder
    通用限流響應(yīng)數(shù)據(jù)解碼器,解碼后對(duì)象FlowRequestData,配合FlowRequestDataWriter查看

| flow ID (8) | count (4) | priority flag (1) |

源碼

public class FlowRequestDataDecoder implements EntityDecoder<ByteBuf, FlowRequestData> {

    @Override
    public FlowRequestData decode(ByteBuf source) {

        // 字節(jié)數(shù)判斷帐要,需要大于12
        if (source.readableBytes() >= 12) {

            FlowRequestData requestData = new FlowRequestData()
                .setFlowId(source.readLong())
                .setCount(source.readInt());

            //判斷是否有priority屬性
            if (source.readableBytes() >= 1) {
                requestData.setPriority(source.readBoolean());
            }
            return requestData;
        }
        return null;
    }
}
  • ParamFlowRequestDataDecoder
    熱點(diǎn)參數(shù)解碼器,解碼后的對(duì)象是ParamFlowRequestData弥奸,配合ParamFlowRequestDataWriter查看

| flow ID (8) | count (4) | param count (4) |

源碼

public class ParamFlowRequestDataDecoder implements EntityDecoder<ByteBuf, ParamFlowRequestData> {

    @Override
    public ParamFlowRequestData decode(ByteBuf source) {

        //讀取16個(gè)字符
        //| flow ID (8) | count (4) | param count (4) |
        if (source.readableBytes() >= 16) {
            ParamFlowRequestData requestData = new ParamFlowRequestData()
                .setFlowId(source.readLong())
                .setCount(source.readInt());

            //熱點(diǎn)參數(shù)個(gè)數(shù)
            int amount = source.readInt();
            if (amount > 0) {
                List<Object> params = new ArrayList<>(amount);
                for (int i = 0; i < amount; i++) {

                    //解析熱點(diǎn)參數(shù)
                    decodeParam(source, params);
                }

                requestData.setParams(params);
                return requestData;
            }
        }
        return null;
    }

    private boolean decodeParam(ByteBuf source, List<Object> params) {

        //熱點(diǎn)參數(shù)類型
        byte paramType = source.readByte();

        switch (paramType) {

            //int
            case ClusterConstants.PARAM_TYPE_INTEGER:
                params.add(source.readInt());
                return true;

            //string
            case ClusterConstants.PARAM_TYPE_STRING:

                //先讀取字符長(zhǎng)度
                int length = source.readInt();
                byte[] bytes = new byte[length];

                //讀取字符
                source.readBytes(bytes);
                // TODO: take care of charset?
                params.add(new String(bytes));
                return true;
            
            //boolean
            case ClusterConstants.PARAM_TYPE_BOOLEAN:
                params.add(source.readBoolean());
                return true;
            
            //double
            case ClusterConstants.PARAM_TYPE_DOUBLE:
                params.add(source.readDouble());
                return true;

            //long
            case ClusterConstants.PARAM_TYPE_LONG:
                params.add(source.readLong());
                return true;

            //float
            case ClusterConstants.PARAM_TYPE_FLOAT:
                params.add(source.readFloat());
                return true;

            //byte
            case ClusterConstants.PARAM_TYPE_BYTE:
                params.add(source.readByte());
                return true;

            //short
            case ClusterConstants.PARAM_TYPE_SHORT:
                params.add(source.readShort());
                return true;
            default:
                return false;
        }
    }
}
  • PingRequestDataDecoder
    測(cè)試數(shù)據(jù)
2.1.5.2 響應(yīng)數(shù)據(jù)編碼

有FlowResponseDataWriter榨惠、PingRequestDataWriter

  • FlowResponseDataWriter:Server端響應(yīng)編碼器,需要配合FlowResponseDataDecoder查看

源碼

public class FlowResponseDataWriter implements EntityWriter<FlowTokenResponseData, ByteBuf> {

    @Override
    public void writeTo(FlowTokenResponseData entity, ByteBuf out) {

        //剩下的數(shù)據(jù)
        out.writeInt(entity.getRemainingCount());
        //等待時(shí)間
        out.writeInt(entity.getWaitInMs());
    }
}
  • PingRequestDataWriter
    測(cè)試數(shù)據(jù)

2.2 配置config

這個(gè)包下主要是server的一些配置盛霎,包括常量配置赠橙、動(dòng)態(tài)配置等。

2.2.1 ServerTransportConfigObserver接口

源碼

public interface ServerTransportConfigObserver {

    /**
     * Callback on server transport config (e.g. port) change.
     *
     * @param config new server transport config
     */
     //定義了一個(gè)回調(diào)方法愤炸,用于傳輸配置變更時(shí)通知
    void onTransportConfigChange(ServerTransportConfig config);
}

2.2.2 ServerTransportConfig

實(shí)例變量

  • port:端口
  • idleSeconds:活躍時(shí)間

2.2.3 ServerFlowConfig

服務(wù)流控規(guī)則配置及默認(rèn)值期揪,看源碼
源碼

    public static final double DEFAULT_EXCEED_COUNT = 1.0d;
    public static final double DEFAULT_MAX_OCCUPY_RATIO = 1.0d;

    public static final int DEFAULT_INTERVAL_MS = 1000;
    public static final int DEFAULT_SAMPLE_COUNT= 10;
    public static final double DEFAULT_MAX_ALLOWED_QPS= 30000;

    private final String namespace;

    //超過數(shù)
    private double exceedCount = DEFAULT_EXCEED_COUNT;
    //最大比例
    private double maxOccupyRatio = DEFAULT_MAX_OCCUPY_RATIO;
    //間隔ms
    private int intervalMs = DEFAULT_INTERVAL_MS;
    //采樣個(gè)數(shù)
    private int sampleCount = DEFAULT_SAMPLE_COUNT;
    //最大允許qps
    private double maxAllowedQps = DEFAULT_MAX_ALLOWED_QPS;

2.2.4 ClusterServerConfigManager

集群流控配置管理器
實(shí)例變量并賦值默認(rèn)值

private static boolean embedded = false;

    /**
     * Server global transport and scope config.
     * 全局的服務(wù)端傳送配置
     */
    private static volatile int port = ClusterConstants.DEFAULT_CLUSTER_SERVER_PORT;
    private static volatile int idleSeconds = ServerTransportConfig.DEFAULT_IDLE_SECONDS;
    private static volatile Set<String> namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);

    /**
     * Server global flow config.
     * 服務(wù)端流控配置
     */
    private static volatile double exceedCount = ServerFlowConfig.DEFAULT_EXCEED_COUNT;
    private static volatile double maxOccupyRatio = ServerFlowConfig.DEFAULT_MAX_OCCUPY_RATIO;
    private static volatile int intervalMs = ServerFlowConfig.DEFAULT_INTERVAL_MS;
    private static volatile int sampleCount = ServerFlowConfig.DEFAULT_SAMPLE_COUNT;
    private static volatile double maxAllowedQps = ServerFlowConfig.DEFAULT_MAX_ALLOWED_QPS;

動(dòng)態(tài)配置初始化

    /**
     * Namespace-specific flow config for token server.
     * Format: (namespace, config).
     */
    //token server的命名配置
    private static final Map<String, ServerFlowConfig> NAMESPACE_CONF = new ConcurrentHashMap<>();

    //服務(wù)端Transport觀察者
    private static final List<ServerTransportConfigObserver> TRANSPORT_CONFIG_OBSERVERS = new ArrayList<>();

    /**
     * Property for cluster server global transport configuration.
     */
    //傳輸動(dòng)態(tài)配置
    private static SentinelProperty<ServerTransportConfig> transportConfigProperty = new DynamicSentinelProperty<>();
    /**
     * Property for cluster server namespace set.
     */
    //namespace動(dòng)態(tài)配置
    private static SentinelProperty<Set<String>> namespaceSetProperty = new DynamicSentinelProperty<>();
    /**
     * Property for cluster server global flow control configuration.
     */
    //流控規(guī)則動(dòng)態(tài)配置
    private static SentinelProperty<ServerFlowConfig> globalFlowProperty = new DynamicSentinelProperty<>();

    //配置監(jiān)聽者
    private static final PropertyListener<ServerTransportConfig> TRANSPORT_PROPERTY_LISTENER
        = new ServerGlobalTransportPropertyListener();
    private static final PropertyListener<ServerFlowConfig> GLOBAL_FLOW_PROPERTY_LISTENER
        = new ServerGlobalFlowPropertyListener();
    private static final PropertyListener<Set<String>> NAMESPACE_SET_PROPERTY_LISTENER
        = new ServerNamespaceSetPropertyListener();

    //啟動(dòng)時(shí)加載,增加監(jiān)聽器
    static {
        transportConfigProperty.addListener(TRANSPORT_PROPERTY_LISTENER);
        globalFlowProperty.addListener(GLOBAL_FLOW_PROPERTY_LISTENER);
        namespaceSetProperty.addListener(NAMESPACE_SET_PROPERTY_LISTENER);
    }

    //省略部分代碼
    
    //增加傳送變更配置變更者
    public static void addTransportConfigChangeObserver(ServerTransportConfigObserver observer) {
        AssertUtil.notNull(observer, "observer cannot be null");
        TRANSPORT_CONFIG_OBSERVERS.add(observer);
    }

監(jiān)聽器內(nèi)部類

//namespace 監(jiān)聽器
private static class ServerNamespaceSetPropertyListener implements PropertyListener<Set<String>> {

    @Override
    public synchronized void configLoad(Set<String> set) {
        if (set == null || set.isEmpty()) {
            RecordLog.warn("[ClusterServerConfigManager] WARN: empty initial server namespace set");
            return;
        }
        //更新
        applyNamespaceSetChange(set);
    }
    @Override
    public synchronized void configUpdate(Set<String> set) {
        // TODO: should debounce?
        applyNamespaceSetChange(set);
    }
}

private static void applyNamespaceSetChange(Set<String> newSet) {
    if (newSet == null) {
        return;
    }
    RecordLog.info("[ClusterServerConfigManager] Server namespace set will be update to: " + newSet);
    if (newSet.isEmpty()) {
        ClusterServerConfigManager.namespaceSet = Collections.singleton(ServerConstants.DEFAULT_NAMESPACE);
        return;
    }
    newSet = new HashSet<>(newSet);
    // Always add the `default` namespace to the namespace set.
    newSet.add(ServerConstants.DEFAULT_NAMESPACE);
    if (embedded) {
        // In embedded server mode, the server itself is also a part of service,
        // so it should be added to namespace set.
        // By default, the added namespace is the appName.
        //嵌入模式也需要增加
        newSet.add(ConfigSupplierRegistry.getNamespaceSupplier().get());
    }
    //更新
    Set<String> oldSet = ClusterServerConfigManager.namespaceSet;
    if (oldSet != null && !oldSet.isEmpty()) {
        for (String ns : oldSet) {
            // Remove the cluster rule property for deprecated namespace set.
            if (!newSet.contains(ns)) {
                ClusterFlowRuleManager.removeProperty(ns);
                ClusterParamFlowRuleManager.removeProperty(ns);
            }
        }
    }
    ClusterServerConfigManager.namespaceSet = newSet;
    //注冊(cè)規(guī)則屬性规个,在后面會(huì)講解
    for (String ns : newSet) {
        // Register the rule property if needed.
        ClusterFlowRuleManager.registerPropertyIfAbsent(ns);
        ClusterParamFlowRuleManager.registerPropertyIfAbsent(ns);
        // Initialize the global QPS limiter for the namespace.
        GlobalRequestLimiter.initIfAbsent(ns);
    }
} 
//globaTransport監(jiān)聽器
private static class ServerGlobalTransportPropertyListener implements PropertyListener<ServerTransportConfig> {
    @Override
    public void configLoad(ServerTransportConfig config) {
        if (config == null) {
            RecordLog.warn("[ClusterServerConfigManager] Empty initial server transport config");
            return;
        }
        applyConfig(config);
    }
    @Override
    public void configUpdate(ServerTransportConfig config) {
        //應(yīng)用配置
        applyConfig(config);
    }
    private synchronized void applyConfig(ServerTransportConfig config) {
        //校驗(yàn)配置
        if (!isValidTransportConfig(config)) {
            RecordLog.warn(
                "[ClusterServerConfigManager] Invalid cluster server transport config, ignoring: " + config);
            return;
        }
        RecordLog.info("[ClusterServerConfigManager] Updating new server transport config: " + config);
        if (config.getIdleSeconds() != idleSeconds) {
            idleSeconds = config.getIdleSeconds();
        }
        //更新server的token
        updateTokenServer(config);
    }
}

private static void updateTokenServer(ServerTransportConfig config) {
    int newPort = config.getPort();
    AssertUtil.isTrue(newPort > 0, "token server port should be valid (positive)");
    if (newPort == port) {
        return;
    }
    ClusterServerConfigManager.port = newPort;
    for (ServerTransportConfigObserver observer : TRANSPORT_CONFIG_OBSERVERS) {
        //更新凤薛,通過觀察者模式更新
        observer.onTransportConfigChange(config);
    }
}

//globalFlow流控監(jiān)聽器  
private static class ServerGlobalFlowPropertyListener implements PropertyListener<ServerFlowConfig> {
    @Override
    public void configUpdate(ServerFlowConfig config) {
        //更新
        applyGlobalFlowConfig(config);
    }
    @Override
    public void configLoad(ServerFlowConfig config) {
        applyGlobalFlowConfig(config);
    }
    private synchronized void applyGlobalFlowConfig(ServerFlowConfig config) {

        //校驗(yàn)規(guī)則
        if (!isValidFlowConfig(config)) {
            RecordLog.warn(
                "[ClusterServerConfigManager] Invalid cluster server global flow config, ignoring: " + config);
            return;
        }
        RecordLog.info("[ClusterServerConfigManager] Updating new server global flow config: " + config);

        //判斷有沒有更新
        if (config.getExceedCount() != exceedCount) {
            exceedCount = config.getExceedCount();
        }
        if (config.getMaxOccupyRatio() != maxOccupyRatio) {
            maxOccupyRatio = config.getMaxOccupyRatio();
        }
        if (config.getMaxAllowedQps() != maxAllowedQps) {
            maxAllowedQps = config.getMaxAllowedQps();
            //調(diào)用GlobalRequestLimiter設(shè)置qps變更
            GlobalRequestLimiter.applyMaxQpsChange(maxAllowedQps);
        }

        int newIntervalMs = config.getIntervalMs();
        int newSampleCount = config.getSampleCount();
        if (newIntervalMs != intervalMs || newSampleCount != sampleCount) {
            if (newIntervalMs <= 0 || newSampleCount <= 0 || newIntervalMs % newSampleCount != 0) {
                RecordLog.warn("[ClusterServerConfigManager] Ignoring invalid flow interval or sample count");
            } else {
                intervalMs = newIntervalMs;
                sampleCount = newSampleCount;
                // Reset all the metrics.
                //重置統(tǒng)計(jì)指標(biāo)
                ClusterMetricStatistics.resetFlowMetrics();
                ClusterParamMetricStatistics.resetFlowMetrics();
            }
        }
    }
}
public static boolean isValidTransportConfig(ServerTransportConfig config) {
    return config != null && config.getPort() > 0 && config.getPort() <= 65535;
}

public static boolean isValidFlowConfig(ServerFlowConfig config) {
    return config != null && config.getMaxOccupyRatio() >= 0 && config.getExceedCount() >= 0
        && config.getMaxAllowedQps() >= 0
        && FlowRuleUtil.isWindowConfigValid(config.getSampleCount(), config.getIntervalMs());
}

2.3 連接器connection

這個(gè)包下主要就是集群鏈接器、連接器組的管理诞仓。

2.3.1 Connection接口

源碼

public interface Connection extends AutoCloseable {

    //獲取SocketAddress
    SocketAddress getLocalAddress();

    //port
    int getRemotePort();

    //ip
    String getRemoteIP();

    //刷新readTime
    void refreshLastReadTime(long lastReadTime);

    //獲取lastReadTime
    long getLastReadTime();

    //獲取鏈接的key
    String getConnectionKey();
}

Connection繼承AutoCloseable接口后就可以自動(dòng)釋放資源了缤苫,JDK中的文件流操作在1.7版本后也實(shí)現(xiàn)了。
定義了6個(gè)方法墅拭,在子類實(shí)現(xiàn)活玲。

2.3.2 NettyConnection

實(shí)例變量

  • remoteIp:遠(yuǎn)程ip
  • remotePort:遠(yuǎn)程port
  • channel:渠道Channel
  • lastReadTime:上傳刷新事件
  • pool:鏈接池

源碼

//Netty鏈接器
public class NettyConnection implements Connection {

    private String remoteIp;
    private int remotePort;
    private Channel channel;

    private long lastReadTime;

    private ConnectionPool pool;

    //構(gòu)造器,需要傳入channel以及pool谍婉;ConnectionPool下面會(huì)說
    public NettyConnection(Channel channel, ConnectionPool pool) {
        this.channel = channel;
        this.pool = pool;

        //獲取socketAddress
        InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
        this.remoteIp = socketAddress.getAddress().getHostAddress();
        this.remotePort = socketAddress.getPort();
        this.lastReadTime = System.currentTimeMillis();
    }

     @Override
    public SocketAddress getLocalAddress() {
        return channel.localAddress();
    }

    //省略部分代碼

    @Override
    public String getConnectionKey() {
        //ip:port
        return remoteIp + ":" + remotePort;
    }

    @Override
    //實(shí)現(xiàn)了AutoCloseable的close方法舒憾,可以自動(dòng)關(guān)閉資源
    public void close() {
        // Remove from connection pool.
        pool.remove(channel);
        // Close the connection.
        if (channel != null && channel.isActive()){
            channel.close();
        }
    }

}

2.3.3 ConnectionPool

通用連接池連接管理。
實(shí)例變量

  • TIMER:初始化了一個(gè)可定時(shí)執(zhí)行的線程執(zhí)行器穗熬,核心線程2個(gè)
  • CONNECTION_MAP:鏈接器保存對(duì)象:Format: ("ip:port", connection)
  • scanTaskFuture:定期掃描任務(wù)

源碼

public class ConnectionPool {

    @SuppressWarnings("PMD.ThreadPoolCreationRule")
    private static final ScheduledExecutorService TIMER = Executors.newScheduledThreadPool(2);

    /**
     * Format: ("ip:port", connection)
     */
    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();

    /**
     * Periodic scan task.
     */
    private ScheduledFuture scanTaskFuture = null;

    //創(chuàng)建連接器
    public void createConnection(Channel channel) {
        if (channel != null) {

            //通過構(gòu)造方法創(chuàng)建
            Connection connection = new NettyConnection(channel, this);

            //獲取connKey并保存在CONNECTION_MAP中
            String connKey = getConnectionKey(channel);
            CONNECTION_MAP.put(connKey, connection);
        }
    }

    /**
     * Start the scan task for long-idle connections.
     */
    //啟動(dòng)一個(gè)定時(shí)任務(wù)镀迂,定時(shí)任務(wù)下面講解
    private synchronized void startScan() {
        if (scanTaskFuture == null
            || scanTaskFuture.isCancelled()
            || scanTaskFuture.isDone()) {
            scanTaskFuture = TIMER.scheduleAtFixedRate(
                new ScanIdleConnectionTask(this), 10, 30, TimeUnit.SECONDS);
        }
    }

    /**
     * Format to "ip:port".
     *
     * @param channel channel
     * @return formatted key
     */
    private String getConnectionKey(Channel channel) {
        InetSocketAddress socketAddress = (InetSocketAddress)channel.remoteAddress();
        String remoteIp = socketAddress.getAddress().getHostAddress();
        int remotePort = socketAddress.getPort();
        return remoteIp + ":" + remotePort;
    }

    private String getConnectionKey(String ip, int port) {
        return ip + ":" + port;
    }

    //刷新readTime,readTime為當(dāng)前時(shí)間
    public void refreshLastReadTime(Channel channel) {
        if (channel != null) {
            String connKey = getConnectionKey(channel);
            Connection connection = CONNECTION_MAP.get(connKey);
            if (connection != null) {
                connection.refreshLastReadTime(System.currentTimeMillis());
            }
        }
    }

    //獲取鏈接,直接從map中獲取
    public Connection getConnection(String remoteIp, int remotePort) {
        String connKey = getConnectionKey(remoteIp, remotePort);
        return CONNECTION_MAP.get(connKey);
    }

    public void remove(Channel channel) {
        String connKey = getConnectionKey(channel);
        CONNECTION_MAP.remove(connKey);
    }

    public List<Connection> listAllConnection() {
        List<Connection> connections = new ArrayList<Connection>(CONNECTION_MAP.values());
        return connections;
    }

    public int count() {
        return CONNECTION_MAP.size();
    }

    public void clear() {
        CONNECTION_MAP.clear();
    }

    //shoudownAll
    public void shutdownAll() throws Exception {
        for (Connection c : CONNECTION_MAP.values()) {
            c.close();
        }
    }

    //刷新定時(shí)任務(wù)
    public void refreshIdleTask() {
        if (scanTaskFuture == null || scanTaskFuture.cancel(false)) {
            startScan();
        } else {
            RecordLog.info("The result of canceling scanTask is error.");
        }
    }
}

2.3.4 ScanIdleConnectionTask

ScanIdleConnectionTask是定時(shí)任務(wù)執(zhí)行實(shí)現(xiàn)了Runnable的方法唤蔗。
源碼

public class ScanIdleConnectionTask implements Runnable {

    private final ConnectionPool connectionPool;

    //構(gòu)造方法
    public ScanIdleConnectionTask(ConnectionPool connectionPool) {
        this.connectionPool = connectionPool;
    }

    @Override
    public void run() {
        try {
            
            //獲取鏈接獲取時(shí)間招拙,默認(rèn)600s
            int idleSeconds = ClusterServerConfigManager.getIdleSeconds();
            long idleTimeMillis = idleSeconds * 1000;
            if (idleTimeMillis < 0) {
                idleTimeMillis = ServerTransportConfig.DEFAULT_IDLE_SECONDS * 1000;
            }

            long now = System.currentTimeMillis();

            //拿到所有的連接
            List<Connection> connections = connectionPool.listAllConnection();
            for (Connection conn : connections) {

                //如果當(dāng)前時(shí)間-上次readTime大于活躍時(shí)間,說明鏈接可以關(guān)閉了
                if ((now - conn.getLastReadTime()) > idleTimeMillis) {
                    RecordLog.info(
                        String.format("[ScanIdleConnectionTask] The connection <%s:%d> has been idle for <%d>s. "
                            + "It will be closed now.", conn.getRemoteIP(), conn.getRemotePort(), idleSeconds)
                    );
                    //關(guān)閉鏈接
                    conn.close();
                }
            }
        } catch (Throwable t) {
            RecordLog.warn("[ScanIdleConnectionTask] Failed to clean-up idle tasks", t);
        }
    }
}

2.3.5 連接組ConnectionGroup

連接組可以理解就是連接connection的集合措译。
實(shí)例變量

  • namespace:命名空間
  • connectionSet:ConnectionDescriptor的集合别凤,ConnectionDescriptor的實(shí)例變量是address、host领虹。
  • connectedCount:連接次數(shù)规哪,定義的是AtomicInteger類型

源碼分析

public class ConnectionGroup {

    private final String namespace;

    private final Set<ConnectionDescriptor> connectionSet = Collections.synchronizedSet(new HashSet<ConnectionDescriptor>());
    private final AtomicInteger connectedCount = new AtomicInteger();

    //帶namespace構(gòu)造函數(shù)
    public ConnectionGroup(String namespace) {
        AssertUtil.notEmpty(namespace, "namespace cannot be empty");
        this.namespace = namespace;
    }

    //無參構(gòu)造函數(shù)
    public ConnectionGroup() {
        this(ServerConstants.DEFAULT_NAMESPACE);
    }

    //增加連接
    public ConnectionGroup addConnection(String address) {
        AssertUtil.notEmpty(address, "address cannot be empty");

        //獲取host,若是ip:port形式塌衰,就只需要ip
        String[] ip = address.split(":");
        String host;
        if (ip != null && ip.length >= 1) {
            host = ip[0];
        } else {
            host = address;
        }
        
        //已經(jīng)重寫的equals诉稍、hashCode方法
        boolean newAdded = connectionSet.add(new ConnectionDescriptor().setAddress(address).setHost(host));

        //增加成功蝠嘉,連接數(shù)加1
        if (newAdded) {
            connectedCount.incrementAndGet();
        }

        return this;
    }

    //移除連接
    public ConnectionGroup removeConnection(String address) {
        AssertUtil.notEmpty(address, "address cannot be empty");

        if (connectionSet.remove(new ConnectionDescriptor().setAddress(address))) {
            connectedCount.decrementAndGet();
        }

        return this;
    }
}

2.3.6 連接管理ConnectionManager

顧名思義ConnectionManager是對(duì)連接的管
實(shí)例變量

  • CONN_MAP:Connection map (namespace, connection).
  • NAMESPACE_MAP:namespace map (address, namespace).

源碼分析

public final class ConnectionManager {

    /**
     * Connection map (namespace, connection).
     */
    private static final Map<String, ConnectionGroup> CONN_MAP = new ConcurrentHashMap<>();
    /**
     * namespace map (address, namespace).
     */
    private static final Map<String, String> NAMESPACE_MAP = new ConcurrentHashMap<>();

    /**
     * Get connected count for specific namespace.
     *
     * @param namespace namespace to check
     * @return connected count for specific namespace
     */
    //獲取namespace的連接數(shù)
    public static int getConnectedCount(String namespace) {
        AssertUtil.notEmpty(namespace, "namespace should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        return group == null ? 0 : group.getConnectedCount();
    }

    //查詢獲取創(chuàng)建連接組,注意:連接沒有創(chuàng)建
    public static ConnectionGroup getOrCreateGroup(String namespace) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        if (group == null) {

            //synchronized鎖住杯巨,防止并發(fā)問題
            synchronized (CREATE_LOCK) {
                if ((group = CONN_MAP.get(namespace)) == null) {

                    //創(chuàng)建并保存在CONN_MAP方便獲取
                    group = new ConnectionGroup(namespace);
                    CONN_MAP.put(namespace, group);
                }
            }
        }
        return group;
    }

    //移除連接
    public static void removeConnection(String address) {
        AssertUtil.assertNotBlank(address, "address should not be empty");
        String namespace = NAMESPACE_MAP.get(address);
        if (namespace != null) {
            ConnectionGroup group = CONN_MAP.get(namespace);
            if (group == null) {
                return;
            }
            
            //調(diào)用ConnectionGroup的方法蚤告,上面講解過,其實(shí)就是移除保存在set里面的方法服爷,
            group.removeConnection(address);
            RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
        }

        //map中移除
        NAMESPACE_MAP.remove(address);
    }

    //傳入兩個(gè)參數(shù)移除杜恰,少走一步獲取namespace
    public static void removeConnection(String namespace, String address) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        AssertUtil.assertNotBlank(address, "address should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        if (group == null) {
            return;
        }
        group.removeConnection(address);
        NAMESPACE_MAP.remove(address);
        RecordLog.info("[ConnectionManager] Client <{0}> disconnected and removed from namespace <{1}>", address, namespace);
    }

    //增加連接,有address參數(shù)仍源,需要把a(bǔ)ddress增加到NAMESPACE_MAP map中心褐,并增加連接次數(shù)
    public static ConnectionGroup addConnection(String namespace, String address) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        AssertUtil.assertNotBlank(address, "address should not be empty");
        ConnectionGroup group = getOrCreateGroup(namespace);
        group.addConnection(address);
        NAMESPACE_MAP.put(address, namespace);
        RecordLog.info("[ConnectionManager] Client <{0}> registered with namespace <{1}>", address, namespace);
        return group;
    }

    //增加連接
    public static ConnectionGroup getOrCreateConnectionGroup(String namespace) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        ConnectionGroup group = getOrCreateGroup(namespace);
        return group;
    }

    // 拿到連接組
    public static ConnectionGroup getConnectionGroup(String namespace) {
        AssertUtil.assertNotBlank(namespace, "namespace should not be empty");
        ConnectionGroup group = CONN_MAP.get(namespace);
        return group;
    }

    static void clear() {
        CONN_MAP.clear();
        NAMESPACE_MAP.clear();
    }

    private static final Object CREATE_LOCK = new Object();

    private ConnectionManager() {}
}

2.4 處理器processor

這個(gè)包下主要是流控的處理器,分別有普通限流和熱點(diǎn)限流處理器

2.4.1 接口RequestProcessor

public interface RequestProcessor<T, R> {

    /**
     * Process the cluster request.
     *
     * @param request Sentinel cluster request
     * @return the response after processed
     */
     //處理請(qǐng)求
     //有兩個(gè)實(shí)現(xiàn)類笼踩,分別是FlowRequestProcessor逗爹,ParamFlowRequestProcessor
    ClusterResponse<R> processRequest(ClusterRequest<T> request);
}

2.4.2 RequestProcessorProvider

請(qǐng)求流控提供者,類似于工廠類
實(shí)例變量

  • PROCESSOR_MAP:請(qǐng)求類型對(duì)應(yīng)的請(qǐng)求處理器
  • SERVICE_LOADER:通過ServiceLoader加載流控實(shí)現(xiàn)類

源碼分析

public final class RequestProcessorProvider {

    private static final Map<Integer, RequestProcessor> PROCESSOR_MAP = new ConcurrentHashMap<>();

    //默認(rèn)配置的實(shí)現(xiàn)類有FlowRequestProcessor嚎于,ParamFlowRequestProcessor
    private static final ServiceLoader<RequestProcessor> SERVICE_LOADER = ServiceLoaderUtil.getServiceLoader(
        RequestProcessor.class);

    //靜態(tài)代碼快掘而,類啟動(dòng)是會(huì)加載
    static {
        loadAndInit();
    }

    private static void loadAndInit() {
        for (RequestProcessor processor : SERVICE_LOADER) {
            Integer type = parseRequestType(processor);
            if (type != null) {

                //放入map中
                PROCESSOR_MAP.put(type, processor);
            }
        }
    }

    //獲取RequestType
    private static Integer parseRequestType(RequestProcessor processor) {

        //配置在注解上
        RequestType requestType = processor.getClass().getAnnotation(RequestType.class);
        if (requestType != null) {
            return requestType.value();
        } else {
            return null;
        }
    }

    //獲取RequestProcessor
    public static RequestProcessor getProcessor(int type) {
        return PROCESSOR_MAP.get(type);
    }

    static void addProcessorIfAbsent(int type, RequestProcessor processor) {
        // TBD: use putIfAbsent in JDK 1.8.
        if (PROCESSOR_MAP.containsKey(type)) {
            return;
        }
        PROCESSOR_MAP.put(type, processor);
    }

    static void addProcessor(int type, RequestProcessor processor) {
        AssertUtil.notNull(processor, "processor cannot be null");
        PROCESSOR_MAP.put(type, processor);
    }

    private RequestProcessorProvider() {}
}

2.4.3 FlowRequestProcessor

源碼分析

@RequestType(ClusterConstants.MSG_TYPE_FLOW)
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {

    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {

        //獲取TokenService,配置的是DefaultTokenService
        TokenService tokenService = TokenServiceProvider.getService();

        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        boolean prioritized = request.getData().isPriority();

        //獲取請(qǐng)求token
        TokenResult result = tokenService.requestToken(flowId, count, prioritized);

        //解析響應(yīng)結(jié)果
        return toResponse(result, request);
    }

    private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
        return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
            new FlowTokenResponseData()
                .setRemainingCount(result.getRemaining())
                .setWaitInMs(result.getWaitInMs())
        );
    }
}

2.4.4 ParamFlowRequestProcessor

源碼分析

@RequestType(ClusterConstants.MSG_TYPE_PARAM_FLOW)
public class ParamFlowRequestProcessor implements RequestProcessor<ParamFlowRequestData, FlowTokenResponseData> {

    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<ParamFlowRequestData> request) {


        TokenService tokenService = TokenServiceProvider.getService();

        long flowId = request.getData().getFlowId();
        int count = request.getData().getCount();
        Collection<Object> args = request.getData().getParams();

        //請(qǐng)求熱點(diǎn)參數(shù)
        TokenResult result = tokenService.requestParamToken(flowId, count, args);

        //流控接口解析
        return toResponse(result, request);
    }

    private ClusterResponse<FlowTokenResponseData> toResponse(TokenResult result, ClusterRequest request) {
        return new ClusterResponse<>(request.getId(), request.getType(), result.getStatus(),
            new FlowTokenResponseData()
                .setRemainingCount(result.getRemaining())
                .setWaitInMs(0)
        );
    }
}

流控解析于购,下一面講解镣屹。

2.5 啟動(dòng)加載器DefaultClusterServerInitFunc

源碼

//實(shí)現(xiàn)了InitFunc,InitFunc會(huì)在系統(tǒng)啟動(dòng)時(shí)加載
public class DefaultClusterServerInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {

        //初始化Decoders
        initDefaultEntityDecoders();
        //初始化wriders
        initDefaultEntityWriters();
        //初始化processors
        initDefaultProcessors();

        // Eagerly-trigger the SPI pre-load of token service.
        // 這個(gè)時(shí)候就把TokenService加載好了
        TokenServiceProvider.getService();

        RecordLog.info("[DefaultClusterServerInitFunc] Default entity codec and processors registered");
    }

    private void initDefaultEntityWriters() {

        //ping
        ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PING, new PingResponseDataWriter());
        //流控Writer
        ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_FLOW, new FlowResponseDataWriter());
        //熱點(diǎn)參數(shù)Writer和Flow一樣
        ResponseDataWriterRegistry.addWriter(ClusterConstants.MSG_TYPE_PARAM_FLOW, new FlowResponseDataWriter());
    }

    private void initDefaultEntityDecoders() {
        //ping
        RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PING, new PingRequestDataDecoder());
        //普通Flow
        RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_FLOW, new FlowRequestDataDecoder());
        //熱點(diǎn)參數(shù)
        RequestDataDecodeRegistry.addDecoder(ClusterConstants.MSG_TYPE_PARAM_FLOW, new ParamFlowRequestDataDecoder());
    }

    private void initDefaultProcessors() {
        // Eagerly-trigger the SPI pre-load.
        //獲取默認(rèn),實(shí)際上就是加載了价涝,這個(gè)類在上面已經(jīng)講過了
        RequestProcessorProvider.getProcessor(0);
    }
}

2.6 TokenServiceProvider

類似于RequestProcessor女蜈,可以了解到sentinel的源碼作者,習(xí)慣于使用Provider作為一個(gè)工廠使用色瘩。
源碼

public final class TokenServiceProvider {

    private static TokenService service = null;

    static {
        resolveTokenServiceSpi();
    }

    public static TokenService getService() {
        return service;
    }

    private static void resolveTokenServiceSpi() {
        //加載TokenServeice伪窖,若不存在的則使用默認(rèn)的DefalutTokenService,這里使用的就是默認(rèn)的
        service = SpiLoader.loadFirstInstanceOrDefault(TokenService.class, DefaultTokenService.class);
        if (service != null) {
            RecordLog.info("[TokenServiceProvider] Global token service resolved: "
                + service.getClass().getCanonicalName());
        } else {
            RecordLog.warn("[TokenServiceProvider] Unable to resolve TokenService: no SPI found");
        }
    }
}

3居兆、其他內(nèi)容

  • ClusterTokenServer:TokenServer相關(guān)的在后面與client關(guān)聯(lián)時(shí)再講解覆山,這邊有NettyTransportServer、SentinelDefaultTokenServer泥栖、DefaultEmbeddedTokenServer類簇宽,以及handler包下的TokenServerHandler。
  • log下的ClusterServerStatLogUtil主要用于集群流控記錄日志的
  • command.handler包下類主要用于管理中心動(dòng)態(tài)配置的查詢與變更的交互:如流控規(guī)則吧享、監(jiān)控指標(biāo)魏割、集群配置等
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市钢颂,隨后出現(xiàn)的幾起案子钞它,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,273評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件遭垛,死亡現(xiàn)場(chǎng)離奇詭異尼桶,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)锯仪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,349評(píng)論 3 398
  • 文/潘曉璐 我一進(jìn)店門泵督,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人庶喜,你說我怎么就攤上這事小腊。” “怎么了溃卡?”我有些...
    開封第一講書人閱讀 167,709評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵溢豆,是天一觀的道長(zhǎng)蜒简。 經(jīng)常有香客問我瘸羡,道長(zhǎng),這世上最難降的妖魔是什么搓茬? 我笑而不...
    開封第一講書人閱讀 59,520評(píng)論 1 296
  • 正文 為了忘掉前任犹赖,我火速辦了婚禮,結(jié)果婚禮上卷仑,老公的妹妹穿的比我還像新娘峻村。我一直安慰自己,他們只是感情好锡凝,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,515評(píng)論 6 397
  • 文/花漫 我一把揭開白布粘昨。 她就那樣靜靜地躺著,像睡著了一般窜锯。 火紅的嫁衣襯著肌膚如雪张肾。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,158評(píng)論 1 308
  • 那天锚扎,我揣著相機(jī)與錄音吞瞪,去河邊找鬼。 笑死驾孔,一個(gè)胖子當(dāng)著我的面吹牛芍秆,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播翠勉,決...
    沈念sama閱讀 40,755評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼妖啥,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了对碌?” 一聲冷哼從身側(cè)響起迹栓,我...
    開封第一講書人閱讀 39,660評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后克伊,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酥郭,經(jīng)...
    沈念sama閱讀 46,203評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,287評(píng)論 3 340
  • 正文 我和宋清朗相戀三年愿吹,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了不从。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,427評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡犁跪,死狀恐怖椿息,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情坷衍,我是刑警寧澤寝优,帶...
    沈念sama閱讀 36,122評(píng)論 5 349
  • 正文 年R本政府宣布,位于F島的核電站枫耳,受9級(jí)特大地震影響乏矾,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜迁杨,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,801評(píng)論 3 333
  • 文/蒙蒙 一钻心、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧铅协,春花似錦捷沸、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,272評(píng)論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至骏全,卻和暖如春苍柏,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背吟温。 一陣腳步聲響...
    開封第一講書人閱讀 33,393評(píng)論 1 272
  • 我被黑心中介騙來泰國(guó)打工序仙, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人鲁豪。 一個(gè)月前我還...
    沈念sama閱讀 48,808評(píng)論 3 376
  • 正文 我出身青樓潘悼,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親爬橡。 傳聞我的和親對(duì)象是個(gè)殘疾皇子治唤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,440評(píng)論 2 359

推薦閱讀更多精彩內(nèi)容