Kafka 源碼解析之 Producer 發(fā)送

轉(zhuǎn)載: http://matt33.com/2017/06/25/kafka-producer-send-module/
[TOC]
Kafka躺酒,作為目前在大數(shù)據(jù)領(lǐng)域應(yīng)用最為廣泛的消息隊(duì)列,其內(nèi)部實(shí)現(xiàn)和設(shè)計(jì)有很多值得深入研究和分析的地方阳似。

再 0.10.2 的 Kafka 中,其 Client 端是由 Java 實(shí)現(xiàn)誓酒,Server 端是由 Scala 來(lái)實(shí)現(xiàn)的咖为,在使用 Kafka 時(shí)柒凉,Client 是用戶最先接觸到部分苞笨,因此债朵,計(jì)劃寫(xiě)的源碼分析也會(huì)從 Client 端開(kāi)始,會(huì)先從 Producer 端開(kāi)始瀑凝,今天講的是 Producer 端的發(fā)送模型的實(shí)現(xiàn)序芦。

Producer 使用

在分析 Producer 發(fā)送模型之前,先看一下用戶是如何使用 Producer 向 Kafka 寫(xiě)數(shù)據(jù)的粤咪,下面是一個(gè)關(guān)于 Producer 最簡(jiǎn)單的應(yīng)用示例谚中。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

public class ProducerTest {
    private static String topicName;
    private static int msgNum;
    private static int key;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        topicName = "test";
        msgNum = 10; // 發(fā)送的消息數(shù)

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < msgNum; i++) {
            String msg = i + " This is matt's blog.";
            producer.send(new ProducerRecord<String, String>(topicName, msg));
        }
        producer.close();
    }
}

從上面的代碼可以看出 Kafka 為用戶提供了非常簡(jiǎn)單的 API,在使用時(shí)寥枝,只需要如下兩步:

  1. 初始化 KafkaProducer 實(shí)例藏杖;
  2. 調(diào)用 send 接口發(fā)送數(shù)據(jù)。

本文主要是圍繞著 Producer 在內(nèi)部是如何實(shí)現(xiàn) send 接口而展開(kāi)的脉顿。

Producer 數(shù)據(jù)發(fā)送流程

下面通過(guò)對(duì) send 源碼分析來(lái)一步步剖析 Producer 數(shù)據(jù)的發(fā)送流程。

Producer 的 send 實(shí)現(xiàn)

用戶是直接使用 producer.send() 發(fā)送的數(shù)據(jù)点寥,先看一下 send() 接口的實(shí)現(xiàn)

// 異步向一個(gè) topic 發(fā)送數(shù)據(jù)
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

// 向 topic 異步地發(fā)送數(shù)據(jù)艾疟,當(dāng)發(fā)送確認(rèn)后喚起回調(diào)函數(shù)
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

數(shù)據(jù)發(fā)送的最終實(shí)現(xiàn)還是調(diào)用了 Producer 的 doSend() 接口。

Producer 的 doSend 實(shí)現(xiàn)

下面是 doSend() 的具體實(shí)現(xiàn)

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
       TopicPartition tp = null;
       try {
           // 1.確認(rèn)數(shù)據(jù)要發(fā)送到的 topic 的 metadata 是可用的
           ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
           long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
           Cluster cluster = clusterAndWaitTime.cluster;
           // 2.序列化 record 的 key 和 value
           byte[] serializedKey;
           try {
               serializedKey = keySerializer.serialize(record.topic(), record.key());
           } catch (ClassCastException cce) {
               throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                       " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                       " specified in key.serializer");
           }
           byte[] serializedValue;
           try {
               serializedValue = valueSerializer.serialize(record.topic(), record.value());
           } catch (ClassCastException cce) {
               throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                       " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                       " specified in value.serializer");
           }

           // 3. 獲取該 record 的 partition 的值(可以指定,也可以根據(jù)算法計(jì)算)
           int partition = partition(record, serializedKey, serializedValue, cluster);
           int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
           ensureValidRecordSize(serializedSize); // record 的字節(jié)超出限制或大于內(nèi)存限制時(shí),就會(huì)拋出 RecordTooLargeException 異常
           tp = new TopicPartition(record.topic(), partition);
           long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); // 時(shí)間戳
           log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
           Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
           // 4. 向 accumulator 中追加數(shù)據(jù)
           RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
           // 5. 如果 batch 已經(jīng)滿了,喚醒 sender 線程發(fā)送數(shù)據(jù)
           if (result.batchIsFull || result.newBatchCreated) {
               log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
               this.sender.wakeup();
           }
           return result.future;
       } catch (ApiException e) {
           log.debug("Exception occurred during message send:", e);
           if (callback != null)
               callback.onCompletion(null, e);
           this.errors.record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           return new FutureFailure(e);
       } catch (InterruptedException e) {
           this.errors.record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw new InterruptException(e);
       } catch (BufferExhaustedException e) {
           this.errors.record();
           this.metrics.sensor("buffer-exhausted-records").record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw e;
       } catch (KafkaException e) {
           this.errors.record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw e;
       } catch (Exception e) {
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw e;
       }
   }

在 dosend() 方法的實(shí)現(xiàn)上敢辩,一條 Record 數(shù)據(jù)的發(fā)送蔽莱,可以分為以下五步:

  1. 確認(rèn)數(shù)據(jù)要發(fā)送到的 topic 的 metadata 是可用的(如果該 partition 的 leader 存在則是可用的,如果開(kāi)啟權(quán)限時(shí)戚长,client 有相應(yīng)的權(quán)限)盗冷,如果沒(méi)有 topic 的 metadata 信息,就需要獲取相應(yīng)的 metadata同廉;
  2. 序列化 record 的 key 和 value仪糖;
  3. 獲取該 record 要發(fā)送到的 partition(可以指定柑司,也可以根據(jù)算法計(jì)算);
  4. 向 accumulator 中追加 record 數(shù)據(jù)锅劝,數(shù)據(jù)會(huì)先進(jìn)行緩存攒驰;
  5. 如果追加完數(shù)據(jù)后,對(duì)應(yīng)的 RecordBatch 已經(jīng)達(dá)到了 batch.size 的大泄示簟(或者batch 的剩余空間不足以添加下一條 Record)玻粪,則喚醒 sender 線程發(fā)送數(shù)據(jù)。

數(shù)據(jù)的發(fā)送過(guò)程诬垂,可以簡(jiǎn)單總結(jié)為以上五點(diǎn)劲室,下面會(huì)這幾部分的具體實(shí)現(xiàn)進(jìn)行詳細(xì)分析。

發(fā)送過(guò)程詳解

獲取 topic 的 metadata 信息

Producer 通過(guò) waitOnMetadata() 方法來(lái)獲取對(duì)應(yīng) topic 的 metadata 信息结窘,這部分后面會(huì)單獨(dú)抽出一篇文章來(lái)介紹很洋,這里就不再詳述,總結(jié)起來(lái)就是:在數(shù)據(jù)發(fā)送前晦鞋,需要先該 topic 是可用的蹲缠。

key 和 value 的序列化
Producer 端對(duì) record 的 key 和 value 值進(jìn)行序列化操作,在 Consumer 端再進(jìn)行相應(yīng)的反序列化悠垛,Kafka 內(nèi)部提供的序列化和反序列化算法如下圖所示:

image.png

當(dāng)然我們也是可以自定義序列化的具體實(shí)現(xiàn)线定,不過(guò)一般情況下,Kafka 內(nèi)部提供的這些方法已經(jīng)足夠使用确买。

獲取 partition 值
關(guān)于 partition 值的計(jì)算斤讥,分為三種情況:

  1. 指明 partition 的情況下,直接將指明的值直接作為 partiton 值湾趾;
  2. 沒(méi)有指明 partition 值但有 key 的情況下芭商,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值;
  3. 既沒(méi)有 partition 值又沒(méi)有 key 值的情況下搀缠,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增)铛楣,將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說(shuō)的 round-robin 算法艺普。

具體實(shí)現(xiàn)如下:

// 當(dāng) record 中有 partition 值時(shí)簸州,直接返回,沒(méi)有的情況下調(diào)用 partitioner 的類的 partition 方法去計(jì)算(KafkaProducer.class)
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

Producer 默認(rèn)使用的 partitioner 是 org.apache.kafka.clients.producer.internals.DefaultPartitioner歧譬,用戶也可以自定義 partition 的策略岸浑,下面是這個(gè)類兩個(gè)方法的具體實(shí)現(xiàn):

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {// 沒(méi)有指定 key 的情況下
            int nextValue = nextValue(topic); // 第一次的時(shí)候產(chǎn)生一個(gè)隨機(jī)整數(shù),后面每次調(diào)用在之前的基礎(chǔ)上自增;
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            // leader 不為 null,即為可用的 partition
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {// 有 key 的情況下,使用 key 的 hash 值進(jìn)行計(jì)算
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // 選擇 key 的 hash 值
        }
    }

    // 根據(jù) topic 獲取對(duì)應(yīng)的整數(shù)變量
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) { // 第一次調(diào)用時(shí),隨機(jī)產(chǎn)生
            counter = new AtomicInteger(new Random().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement(); // 后面再調(diào)用時(shí)瑰步,根據(jù)之前的結(jié)果自增
    }

這就是 Producer 中默認(rèn)的 partitioner 實(shí)現(xiàn)矢洲。

向 accumulator 寫(xiě)數(shù)據(jù)

Producer 會(huì)先將 record 寫(xiě)入到 buffer 中,當(dāng)達(dá)到一個(gè) batch.size 的大小時(shí)缩焦,再喚起 sender 線程去發(fā)送 RecordBatch(第五步)读虏,這里先詳細(xì)分析一下 Producer 是如何向 buffer 中寫(xiě)入數(shù)據(jù)的责静。

Producer 是通過(guò) RecordAccumulator 實(shí)例追加數(shù)據(jù),RecordAccumulator 模型如下圖所示掘譬,一個(gè)重要的變量就是 ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches泰演,每個(gè) TopicPartition 都會(huì)對(duì)應(yīng)一個(gè) Deque<RecordBatch>,當(dāng)添加數(shù)據(jù)時(shí)葱轩,會(huì)向其 topic-partition 對(duì)應(yīng)的這個(gè) queue 最新創(chuàng)建的一個(gè) RecordBatch 中添加 record睦焕,而發(fā)送數(shù)據(jù)時(shí),則會(huì)先從 queue 中最老的那個(gè) RecordBatch 開(kāi)始發(fā)送靴拱。

image.png
/ org.apache.kafka.clients.producer.internals.RecordAccumulator
     // 向 accumulator 添加一條 record垃喊,并返回添加后的結(jié)果(結(jié)果主要包含: future metadata靴寂、batch 是否滿的標(biāo)志以及新 batch 是否創(chuàng)建)其中牛柒, maxTimeToBlock 是 buffer.memory 的 block 的最大時(shí)間
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        try {
            Deque<RecordBatch> dq = getOrCreateDeque(tp);// 每個(gè) topicPartition 對(duì)應(yīng)一個(gè) queue
            synchronized (dq) {// 在對(duì)一個(gè) queue 進(jìn)行操作時(shí),會(huì)保證線程安全
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); // 追加數(shù)據(jù)
                if (appendResult != null)// 這個(gè) topic-partition 已經(jīng)有記錄了
                    return appendResult;
            }

            // 為 topic-partition 創(chuàng)建一個(gè)新的 RecordBatch, 需要初始化相應(yīng)的 RecordBatch熄攘,要為其分配的大小是: max(batch.size, 加上頭文件的本條消息的大屑岵取)
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);// 給這個(gè) RecordBatch 初始化一個(gè) buffer
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {// 如果突然發(fā)現(xiàn)這個(gè) queue 已經(jīng)存在,那么就釋放這個(gè)已經(jīng)分配的空間
                    free.deallocate(buffer);
                    return appendResult;
                }
                // 給 topic-partition 創(chuàng)建一個(gè) RecordBatch
                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
                // 向新的 RecordBatch 中追加數(shù)據(jù)
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);// 將 RecordBatch 添加到對(duì)應(yīng)的 queue 中
                incomplete.add(batch);// 向未 ack 的 batch 集合添加這個(gè) batch
                // 如果 dp.size()>1 就證明這個(gè) queue 有一個(gè) batch 是可以發(fā)送了
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

總結(jié)一下其 record 寫(xiě)入的具體流程如下圖所示:

image.png
  1. 獲取該 topic-partition 對(duì)應(yīng)的 queue购公,沒(méi)有的話會(huì)創(chuàng)建一個(gè)空的 queue呻率;
  2. 向 queue 中追加數(shù)據(jù)很澄,先獲取 queue 中最新加入的那個(gè) RecordBatch陌知,如果不存在或者存在但剩余空余不足以添加本條 record 則返回 null他托,成功寫(xiě)入的話直接返回結(jié)果,寫(xiě)入成功仆葡;
  3. 創(chuàng)建一個(gè)新的 RecordBatch赏参,初始化內(nèi)存大小根據(jù) max(batch.size, Records.LOG_OVERHEAD + Record.recordSize(key, value)) 來(lái)確定(防止單條 record 過(guò)大的情況);
  4. 向新建的 RecordBatch 寫(xiě)入 record沿盅,并將 RecordBatch 添加到 queue 中把篓,返回結(jié)果,寫(xiě)入成功腰涧。

發(fā)送 RecordBatch

當(dāng) record 寫(xiě)入成功后韧掩,如果發(fā)現(xiàn) RecordBatch 已滿足發(fā)送的條件(通常是 queue 中有多個(gè) batch,那么最先添加的那些 batch 肯定是可以發(fā)送了)窖铡,那么就會(huì)喚醒 sender 線程揍很,發(fā)送 RecordBatch。

sender 線程對(duì) RecordBatch 的處理是在 run() 方法中進(jìn)行的万伤,該方法具體實(shí)現(xiàn)如下:

void run(long now) {
        Cluster cluster = metadata.fetch();
        // 獲取那些已經(jīng)可以發(fā)送的 RecordBatch 對(duì)應(yīng)的 nodes
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // 如果有 topic-partition 的 leader 是未知的,就強(qiáng)制 metadata 更新
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // 如果與node 沒(méi)有連接(如果可以連接,同時(shí)初始化該連接),就證明該 node 暫時(shí)不能發(fā)送數(shù)據(jù),暫時(shí)移除該 node
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // 返回該 node 對(duì)應(yīng)的所有可以發(fā)送的 RecordBatch 組成的 batches(key 是 node.id),并將 RecordBatch 從對(duì)應(yīng)的 queue 中移除
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            //記錄將要發(fā)送的 RecordBatch
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // 將由于元數(shù)據(jù)不可用而導(dǎo)致發(fā)送超時(shí)的 RecordBatch 移除
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);

        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
        // 發(fā)送 RecordBatch
        sendProduceRequests(batches, now);

        this.client.poll(pollTimeout, now); // 關(guān)于 socket 的一些實(shí)際的讀寫(xiě)操作(其中包括 meta 信息的更新)
    }

這段代碼前面有很多是其他的邏輯處理,如:移除暫時(shí)不可用的 node呜袁、處理由于元數(shù)據(jù)不可用導(dǎo)致的超時(shí) RecordBatch敌买,真正進(jìn)行發(fā)送發(fā)送 RecordBatch 的是 sendProduceRequests(batches, now) 這個(gè)方法,具體是:

/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}

/**
 * Create a produce request from the given record batches
 */
// 發(fā)送 produce 請(qǐng)求
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records());
        recordsByPartition.put(tp, batch);
    }

    ProduceRequest.Builder requestBuilder =
            new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

這段代碼就簡(jiǎn)單很多阶界,總來(lái)起來(lái)就是虹钮,將 batches 中 leader 為同一個(gè) node 的所有 RecordBatch 放在一個(gè)請(qǐng)求中進(jìn)行發(fā)送聋庵。這里并沒(méi)有真正進(jìn)行發(fā)送,真正網(wǎng)絡(luò)發(fā)送是在后面的poll方法里面芙粱。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末祭玉,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子春畔,更是在濱河造成了極大的恐慌脱货,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,888評(píng)論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件律姨,死亡現(xiàn)場(chǎng)離奇詭異振峻,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)择份,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,677評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門扣孟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人荣赶,你說(shuō)我怎么就攤上這事凤价。” “怎么了拔创?”我有些...
    開(kāi)封第一講書(shū)人閱讀 168,386評(píng)論 0 360
  • 文/不壞的土叔 我叫張陵利诺,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我伏蚊,道長(zhǎng)立轧,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 59,726評(píng)論 1 297
  • 正文 為了忘掉前任躏吊,我火速辦了婚禮氛改,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘比伏。我一直安慰自己胜卤,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,729評(píng)論 6 397
  • 文/花漫 我一把揭開(kāi)白布赁项。 她就那樣靜靜地躺著葛躏,像睡著了一般。 火紅的嫁衣襯著肌膚如雪悠菜。 梳的紋絲不亂的頭發(fā)上舰攒,一...
    開(kāi)封第一講書(shū)人閱讀 52,337評(píng)論 1 310
  • 那天,我揣著相機(jī)與錄音悔醋,去河邊找鬼摩窃。 笑死,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的猾愿。 我是一名探鬼主播鹦聪,決...
    沈念sama閱讀 40,902評(píng)論 3 421
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼蒂秘!你這毒婦竟也來(lái)了泽本?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,807評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤姻僧,失蹤者是張志新(化名)和其女友劉穎规丽,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體段化,經(jīng)...
    沈念sama閱讀 46,349評(píng)論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嘁捷,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,439評(píng)論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了显熏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雄嚣。...
    茶點(diǎn)故事閱讀 40,567評(píng)論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖喘蟆,靈堂內(nèi)的尸體忽然破棺而出缓升,到底是詐尸還是另有隱情,我是刑警寧澤蕴轨,帶...
    沈念sama閱讀 36,242評(píng)論 5 350
  • 正文 年R本政府宣布港谊,位于F島的核電站,受9級(jí)特大地震影響橙弱,放射性物質(zhì)發(fā)生泄漏歧寺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,933評(píng)論 3 334
  • 文/蒙蒙 一棘脐、第九天 我趴在偏房一處隱蔽的房頂上張望斜筐。 院中可真熱鬧,春花似錦蛀缝、人聲如沸顷链。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,420評(píng)論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)嗤练。三九已至,卻和暖如春在讶,著一層夾襖步出監(jiān)牢的瞬間煞抬,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,531評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工构哺, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留此疹,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,995評(píng)論 3 377
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像蝗碎,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子旗扑,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,585評(píng)論 2 359