Kafka源碼深度解析-序列2 -Producer -Metadata的數(shù)據(jù)結(jié)構(gòu)與讀取毯侦、更新策略

在上一篇牢硅,我們從使用方式和策略上窥淆,對消息隊(duì)列做了一個(gè)宏觀描述澎胡。從本篇開始猜年,我們將深入到源碼內(nèi)部根欧,仔細(xì)分析Kafka到底是如何實(shí)現(xiàn)一個(gè)分布式消息隊(duì)列羽杰。我們的分析將從Producer端開始渡紫。

從Kafka 0.8.2開始,發(fā)布了一套新的Java版的client api, KafkaProducer/KafkaConsumer考赛,替代之前的scala版的api惕澎。本系列的分析將只針對這套Java版的api。

多線程異步發(fā)送模型

下圖是經(jīng)過源碼分析之后颜骤,整理出來的Producer端的架構(gòu)圖:

<figure style="margin: 24px 0px; color: rgb(26, 26, 26); font-family: -apple-system, system-ui, "Helvetica Neue", "PingFang SC", "Microsoft YaHei", "Source Han Sans SC", "Noto Sans CJK SC", "WenQuanYi Micro Hei", sans-serif; font-size: 16px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; white-space: pre-wrap; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-style: initial; text-decoration-color: initial;">
image

</figure>

在上一篇我們講過唧喉,Producer有同步發(fā)送和異步發(fā)送2種策略。在以前的Kafka client api實(shí)現(xiàn)中忍抽,同步和異步是分開實(shí)現(xiàn)的八孝。而在0.9中,同步發(fā)送其實(shí)是通過異步發(fā)送間接實(shí)現(xiàn)鸠项,其接口如下:

public class KafkaProducer<K, V> implements Producer<K, V> {
...
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)  //異步發(fā)送接口
     {
     ...
     }
}

要實(shí)現(xiàn)同步發(fā)送干跛,只要在拿到返回的Future對象之后,直接調(diào)用get()就可以了祟绊。

基本思路

從上圖我們可以看出楼入,異步發(fā)送的基本思路就是:send的時(shí)候,KafkaProducer把消息放到本地的消息隊(duì)列RecordAccumulator牧抽,然后一個(gè)后臺(tái)線程Sender不斷循環(huán)嘉熊,把消息發(fā)給Kafka集群。

要實(shí)現(xiàn)這個(gè)扬舒,還得有一個(gè)前提條件:就是KafkaProducer/Sender都需要獲取集群的配置信息Metadata阐肤。所謂Metadata,也就是在上一篇所講的,Topic/Partion與broker的映射關(guān)系:每一個(gè)Topic的每一個(gè)Partion孕惜,得知道其對應(yīng)的broker列表是什么愧薛,其中l(wèi)eader是誰,follower是誰诊赊。

2個(gè)數(shù)據(jù)流

所以在上圖中厚满,有2個(gè)數(shù)據(jù)流:
Metadata流(A1,A2,A3):Sender從集群獲取信息,然后更新Metadata碧磅; KafkaProducer先讀取Metadata碘箍,然后把消息放入隊(duì)列。

消息流(B1, B2, B3):這個(gè)很好理解鲸郊,不再詳述丰榴。

本篇著重講述Metadata流,消息流秆撮,將在后續(xù)詳細(xì)講述四濒。

Metadata的線程安全性

從上圖可以看出,Metadata是多個(gè)producer線程讀职辨,一個(gè)sender線程更新盗蟆,因此它必須是線程安全的。

Kafka的官方文檔上也有說明舒裤,KafkaProducer是線程安全的喳资,可以在多線程中調(diào)用:

The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

從下面代碼也可以看出,它的所有public方法都是synchronized:

public final class Metadata {
  腾供。仆邓。。
    public synchronized Cluster fetch() {
        return this.cluster;
    }
    public synchronized long timeToNextUpdate(long nowMs) {
       伴鳖。节值。。
    }
    public synchronized int requestUpdate() {
      榜聂。搞疗。。
    }
    须肆。贴汪。。    
}

Metadata的數(shù)據(jù)結(jié)構(gòu)

下面代碼列舉了Metadata的主要數(shù)據(jù)結(jié)構(gòu):一個(gè)Cluster對象 + 1堆狀態(tài)變量休吠。前者記錄了集群的配置信息,后者用于控制Metadata的更新策略业簿。

public final class Metadata {
...
    private final long refreshBackoffMs;  //更新失敗的情況下瘤礁,下1次更新的補(bǔ)償時(shí)間(這個(gè)變量在代碼中意義不是太大)
    private final long metadataExpireMs; //關(guān)鍵值:每隔多久,更新一次梅尤。缺省是600*1000柜思,也就是10分種
    private int version;         //每更新成功1次岩调,version遞增1。這個(gè)變量主要用于在while循環(huán)赡盘,wait的時(shí)候号枕,作為循環(huán)判斷條件
    private long lastRefreshMs;  //上一次更新時(shí)間(也包含更新失敗的情況)
    private long lastSuccessfulRefreshMs; //上一次成功更新的時(shí)間(如果每次都成功的話,則2者相等陨享。否則葱淳,lastSuccessulRefreshMs < lastRefreshMs)
    private Cluster cluster;   //集群配置信息
    private boolean needUpdate;  //是否強(qiáng)制刷新
、
  ...
}

public final class Cluster {
...
    private final List<Node> nodes;   //Node也就是Broker
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;  //Topic/Partion和broker list的映射關(guān)系
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
}

public class PartitionInfo {
    private final String topic;
    private final int partition;
    private final Node leader;
    private final Node[] replicas;
    private final Node[] inSyncReplicas;
}

producer讀取Metadata

下面是send函數(shù)的源碼抛姑,可以看到赞厕,在send之前,會(huì)先讀取metadata定硝。如果metadata讀不到皿桑,會(huì)一直阻塞在那,直到超時(shí)蔬啡,拋出TimeoutException

//KafkaProducer
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        try {
     long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);  //拿不到topic的配置信息诲侮,會(huì)一直阻塞在這,直到拋異常

     ... //拿到了箱蟆,執(zhí)行下面的send邏輯
     } catch()
     {}
 }

//KafkaProducer
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        if (!this.metadata.containsTopic(topic))
            this.metadata.add(topic);

        if (metadata.fetch().partitionsForTopic(topic) != null)
            return 0;   //取到topic的配置信息沟绪,直接返回

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        while (metadata.fetch().partitionsForTopic(topic) == null) { //取不到topic的配置信息,一直死循環(huán)wait顽腾,直到超時(shí)近零,拋TimeoutException
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate(); //把needUpdate置為true
            sender.wakeup(); //喚起sender

            metadata.awaitUpdate(version, remainingWaitMs); //metadata的關(guān)鍵函數(shù)
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return time.milliseconds() - begin;
    }

//Metadata
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs < 0) {
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
        }
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {  //當(dāng)Sender成功更新meatadata之后,version加1抄肖。否則會(huì)循環(huán)久信,一直wait
            if (remainingWaitMs != 0
                wait(remainingWaitMs);  //線程的wait機(jī)制,wait和synchronized的配合使用
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)  //wait時(shí)間超出了最長等待時(shí)間
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

總結(jié):從上面代碼可以看出漓摩,producer wait metadata的時(shí)候裙士,有2個(gè)條件:
(1) while (metadata.fetch().partitionsForTopic(topic) == null)
(2)while (this.version <= lastVersion)

有wait就會(huì)有notify,notify在Sender更新Metadata的時(shí)候發(fā)出管毙。

Sender的創(chuàng)建

下面是KafkaProducer的構(gòu)造函數(shù)腿椎,從代碼可以看出,Sender就是KafkaProducer中創(chuàng)建的一個(gè)Thread.

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
        ...
                    this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); //構(gòu)造metadata

this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); //往metadata中夭咬,填入初始的啃炸,配置的node列表

            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());

            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", metricTags, channelBuilder),
                    this.metadata,
                    clientId,
                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),

            this.sender = new Sender(client,  //構(gòu)造一個(gè)sender。sender本身實(shí)現(xiàn)的是Runnable接口
                    this.metadata,
                    this.accumulator,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                    config.getInt(ProducerConfig.RETRIES_CONFIG),
                    this.metrics,
                    new SystemTime(),
                    clientId,
                    this.requestTimeoutMs);

            String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();  //一個(gè)線程卓舵,開啟sender

Sender poll()更新Metadata

public void run() {
        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
       南用。。。
    }

    public void run(long now) {
        Cluster cluster = metadata.fetch();
裹虫。肿嘲。。
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);   //遍歷消息隊(duì)列中所有的消息筑公,找出對應(yīng)的雳窟,已經(jīng)ready的Node

        if (result.unknownLeadersExist)  //如果一個(gè)ready的node都沒有,請求更新metadata
            this.metadata.requestUpdate();

  匣屡。封救。。

     //client的2個(gè)關(guān)鍵函數(shù)耸采,一個(gè)發(fā)送ClientRequest兴泥,一個(gè)接收ClientResponse。底層調(diào)用的是NIO的poll虾宇。關(guān)于nio, 后面會(huì)詳細(xì)介紹
        for (ClientRequest request : requests)
            client.send(request, now);

        this.client.poll(pollTimeout, now);
    }

//NetworkClient
    public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now); //關(guān)鍵點(diǎn):每次poll的時(shí)候判斷是否要更新metadata

        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);   //在返回的handler中搓彻,會(huì)處理metadata的更新
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

 //DefaultMetadataUpdater
         @Override
        public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);  //找到負(fù)載最小的Node
                maybeUpdate(now, node); //把更新Metadata的請求,發(fā)給這個(gè)Node
            }

            return metadataTimeout;
        }

        private void maybeUpdate(long now, Node node) {
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            String nodeConnectionId = node.idString();

            if (canSendRequest(nodeConnectionId)) {
                Set<String> topics = metadata.needMetadataForAllTopics() ? new HashSet<String>() : metadata.topics();
                this.metadataFetchInProgress = true;
                ClientRequest metadataRequest = request(now, nodeConnectionId, topics);  //關(guān)鍵點(diǎn):發(fā)送更新Metadata的Request
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                doSend(metadataRequest, now); //這里只是異步發(fā)送嘱朽,返回的response在上面的handleCompletedReceives里面處理
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                log.debug("Initialize connection to node {} for sending metadata request", node.id());
                initiateConnect(node, now);

            } else { // connected, but can't send more OR connecting
                this.lastNoNodeAvailableMs = now;
            }
        }

     private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            ClientRequest req = inFlightRequests.completeNext(source);
            ResponseHeader header = ResponseHeader.parse(receive.payload());
            // Always expect the response version id to be the same as the request version id
            short apiKey = req.request().header().apiKey();
            short apiVer = req.request().header().apiVersion();
            Struct body = (Struct) ProtoUtils.responseSchema(apiKey, apiVer).read(receive.payload());
            correlate(req.request().header(), header);
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                responses.add(new ClientResponse(req, now, false, body));
        }
    }

        @Override
        public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }

//關(guān)鍵函數(shù)
        private void handleResponse(RequestHeader header, Struct body, long now) {
            this.metadataFetchInProgress = false;
            MetadataResponse response = new MetadataResponse(body);
            Cluster cluster = response.cluster();   //從response中旭贬,拿到一個(gè)新的cluster對象
            if (response.errors().size() > 0) {
                log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
            }

            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, now);   //更新metadata,用新的cluster覆蓋舊的cluster
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);  //更新metadata失敗搪泳,做失敗處理邏輯
            }
        }

//更新成功稀轨,version+1, 同時(shí)更新其它字段
    public synchronized void update(Cluster cluster, long now) {
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;

        for (Listener listener: listeners)
            listener.onMetadataUpdate(cluster);  //如果有人監(jiān)聽了metadata的更新,通知他們

        this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;    //新的cluster覆蓋舊的cluster

        notifyAll();  //通知所有的阻塞的producer線程

        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
    }

//更新失敗岸军,只更新lastRefreshMs
    public synchronized void failedUpdate(long now) {
        this.lastRefreshMs = now;
    }

從上面可以看出奋刽,Metadata的更新,是在while循環(huán)艰赞,每次調(diào)用client.poll()的時(shí)候更新的佣谐。
更新機(jī)制又有以下2種:

Metadata的2種更新機(jī)制

(1)周期性的更新: 每隔一段時(shí)間更新一次,這個(gè)通過 Metadata的lastRefreshMs, lastSuccessfulRefreshMs 這2個(gè)字段來實(shí)現(xiàn)

對應(yīng)的ProducerConfig配置項(xiàng)為:
metadata.max.age.ms //缺省300000方妖,即10分鐘1次

(2) 失效檢測狭魂,強(qiáng)制更新:檢查到metadata失效以后,調(diào)用metadata.requestUpdate()強(qiáng)制更新党觅。 requestUpdate()函數(shù)里面其實(shí)什么都沒做雌澄,就是把needUpdate置成了false

每次poll的時(shí)候,都檢查這2種更新機(jī)制杯瞻,達(dá)到了镐牺,就觸發(fā)更新。

那如何判定Metadata失效了呢魁莉?這個(gè)在代碼中很分散任柜,有很多地方卒废,會(huì)判定Metadata失效。

Metadata失效檢測

條件1:initConnect的時(shí)候

private void initiateConnect(Node node, long now) {
        String nodeConnectionId = node.idString();
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(nodeConnectionId, now);
            selector.connect(nodeConnectionId,
                             new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer,
                             this.socketReceiveBuffer);
        } catch (IOException e) {
            connectionStates.disconnected(nodeConnectionId, now);
            metadataUpdater.requestUpdate(); //判定metadata失效
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }

條件2:poll里面IO的時(shí)候宙地,連接斷掉了

private void handleDisconnections(List<ClientResponse> responses, long now) {
        for (String node : this.selector.disconnected()) {
            log.debug("Node {} disconnected.", node);
            processDisconnection(responses, node, now);
        }
        if (this.selector.disconnected().size() > 0)
            metadataUpdater.requestUpdate();  //判定metadata失效
    }

條件3:有請求超時(shí)

private void handleTimedOutRequests(List<ClientResponse> responses, long now) {
        List<String> nodeIds = this.inFlightRequests.getNodesWithTimedOutRequests(now, this.requestTimeoutMs);
        for (String nodeId : nodeIds) {
            this.selector.close(nodeId);
            log.debug("Disconnecting from node {} due to request timeout.", nodeId);
            processDisconnection(responses, nodeId, now);
        }

        if (nodeIds.size() > 0)
            metadataUpdater.requestUpdate();  //判定metadata失效
    }

條件4:發(fā)消息的時(shí)候,有partition的leader沒找到

public void run(long now) {
        Cluster cluster = metadata.fetch();
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

條件5:返回的response和請求對不上的時(shí)候

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) {
        int correlationId = response.request().request().header().correlationId();
        if (response.wasDisconnected()) {
            log.trace("Cancelled request {} due to node {} being disconnected", response, response.request()
                                                                                                  .request()
                                                                                                  .destination());
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlationId, now);

總之1句話:發(fā)生各式各樣的異常逆皮,數(shù)據(jù)不同步宅粥,都認(rèn)為metadata可能出問題了,要求更新电谣。

Metadata其他的更新策略

除了上面所述秽梅,Metadata的更新,還有以下幾個(gè)特點(diǎn):

1.更新請求MetadataRequest是nio異步發(fā)送的剿牺,在poll的返回中企垦,處理MetadataResponse的時(shí)候,才真正更新Metadata晒来。

這里有個(gè)關(guān)鍵點(diǎn):Metadata的cluster對象钞诡,每次是整個(gè)覆蓋的,而不是局部更新湃崩。所以cluster內(nèi)部不用加鎖荧降。

2.更新的時(shí)候,是從metadata保存的所有Node攒读,或者說Broker中朵诫,選負(fù)載最小的那個(gè),也就是當(dāng)前接收請求最少的那個(gè)薄扁。向其發(fā)送MetadataRequest請求剪返,獲取新的Cluster對象。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末邓梅,一起剝皮案震驚了整個(gè)濱河市脱盲,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌震放,老刑警劉巖宾毒,帶你破解...
    沈念sama閱讀 219,110評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異殿遂,居然都是意外死亡诈铛,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,443評論 3 395
  • 文/潘曉璐 我一進(jìn)店門墨礁,熙熙樓的掌柜王于貴愁眉苦臉地迎上來幢竹,“玉大人,你說我怎么就攤上這事恩静』篮粒” “怎么了蹲坷?”我有些...
    開封第一講書人閱讀 165,474評論 0 356
  • 文/不壞的土叔 我叫張陵,是天一觀的道長邑飒。 經(jīng)常有香客問我循签,道長,這世上最難降的妖魔是什么疙咸? 我笑而不...
    開封第一講書人閱讀 58,881評論 1 295
  • 正文 為了忘掉前任县匠,我火速辦了婚禮,結(jié)果婚禮上撒轮,老公的妹妹穿的比我還像新娘乞旦。我一直安慰自己,他們只是感情好题山,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,902評論 6 392
  • 文/花漫 我一把揭開白布兰粉。 她就那樣靜靜地躺著,像睡著了一般顶瞳。 火紅的嫁衣襯著肌膚如雪玖姑。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,698評論 1 305
  • 那天浊仆,我揣著相機(jī)與錄音客峭,去河邊找鬼。 笑死抡柿,一個(gè)胖子當(dāng)著我的面吹牛舔琅,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播洲劣,決...
    沈念sama閱讀 40,418評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼备蚓,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了囱稽?” 一聲冷哼從身側(cè)響起郊尝,我...
    開封第一講書人閱讀 39,332評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎战惊,沒想到半個(gè)月后流昏,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,796評論 1 316
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡吞获,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,968評論 3 337
  • 正文 我和宋清朗相戀三年况凉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片各拷。...
    茶點(diǎn)故事閱讀 40,110評論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡刁绒,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出烤黍,到底是詐尸還是另有隱情知市,我是刑警寧澤傻盟,帶...
    沈念sama閱讀 35,792評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站嫂丙,受9級特大地震影響娘赴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜奢入,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,455評論 3 331
  • 文/蒙蒙 一筝闹、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧腥光,春花似錦、人聲如沸糊秆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,003評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽痘番。三九已至捉片,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間汞舱,已是汗流浹背伍纫。 一陣腳步聲響...
    開封第一講書人閱讀 33,130評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留昂芜,地道東北人莹规。 一個(gè)月前我還...
    沈念sama閱讀 48,348評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像泌神,于是被迫代替她去往敵國和親良漱。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,047評論 2 355

推薦閱讀更多精彩內(nèi)容