[kafka系列]之producer端消息發(fā)送

本小節(jié)我們來討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的凯旭, Kafka項目有一個生產(chǎn)者客戶端,我們可以通過這個客戶端的API來發(fā)送消息。生產(chǎn)者客戶端是用Java寫的罐呼,但Kafka寫消息的協(xié)議是支持多語言的鞠柄,其它語言的api可見這個wiki


概要

通過本文,你可以了解到以下內(nèi)容:

  • kafka producer端的整體結(jié)構(gòu),相關(guān)參數(shù)配置,以及性能優(yōu)化;
  • 分區(qū)器,攔截器的擴展;
  • 消息序列化擴展;
  • 分區(qū)器,攔截器,序列化的執(zhí)行順序;

開始

很多做業(yè)務(wù)的同學(xué)都知道,在我們系統(tǒng)中發(fā)送一條消息給kafka 集群,我們只需要簡單的調(diào)一下已經(jīng)封裝好的接口,下面是來于我實際項目中的接口方法:

kafkaProducer.produce(String topic,Object msg)

每次要發(fā)消息,我就是這么簡單的調(diào)用一下就能確保消息能被consumer端正常的消費,但是kafka producer做了哪些工作我卻渾然不知,今天我就跟大家說到底說道這個里面到底有哪些不為人知的操作;

  • 引出第一個問題,kafka消息的發(fā)送是一個什么樣的過程? 這個過程中做了哪些操作?

借助于kafka官網(wǎng)上的API,首先給大家來一張producer端的消息流轉(zhuǎn)圖:

消息發(fā)送.png

接下來,結(jié)合一段代碼,給大家簡單說下流程:

    Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("compresstion.type","snappy");
        props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < MAZ_RETRY_SIZE; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
producer.close();

流程如下:

  • 首先需要指定kafka producer端的配置;
  1. zk的地址和端口;
  2. producer端ack應(yīng)答機制,本demo中ack設(shè)置為all,表示生產(chǎn)者會等待所有副本成功寫入該消息,這種方式是最安全的嫉柴,能夠保證消息不丟失厌杜,但是延遲也是最大的;
  3. retries設(shè)置標(biāo)示消息發(fā)送失敗,生產(chǎn)者可以自動重試,但是此刻設(shè)置為0標(biāo)示不重試;這個參數(shù)需要結(jié)合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比集群重新選舉群首的時間長,這樣可以避免生產(chǎn)者過早結(jié)束重試導(dǎo)致失敗;
  4. batch.size參數(shù)標(biāo)示生產(chǎn)者為每個分區(qū)維護了一個未發(fā)送記錄的緩沖區(qū),這個緩沖區(qū)的大小由batch.size配置指定,配置的很大可能會導(dǎo)致更多的批處理,也需要更多的內(nèi)存(但是對于每個活動分區(qū)计螺,我們通常都有一個這樣的緩沖區(qū)),默認是16384Bytes;
  5. linger.ms 指定生產(chǎn)者在發(fā)送批量消息前等待的時間,當(dāng)設(shè)置此參數(shù)后,即便沒有達到批量消息的指定大小,到達時間后生產(chǎn)者也會發(fā)送批量消息到broker.默認情況下,生產(chǎn)者的發(fā)送消息線程只要空閑了就會發(fā)送消息,即便只有一條消息.設(shè)置這個參數(shù)后,發(fā)送線程會等待一定的時間,這樣可以批量發(fā)送消息增加吞吐量,但同時也會增加延遲;
  6. buffer.memory控制生產(chǎn)者可用于緩沖的內(nèi)存總量;消息的發(fā)送速度超過了它們可以傳輸?shù)椒?wù)器的速度,那么這個緩沖空間將被耗盡.
    當(dāng)緩沖區(qū)空間耗盡時,額外的發(fā)送調(diào)用將阻塞.阻止時間的閾值由max.block.ms確定夯尽,在此之后它將引發(fā)TimeoutException.這個緩存是針對每個producerThread,不應(yīng)設(shè)置高以免影響內(nèi)存;

生產(chǎn)者如果每發(fā)送一條消息都直接通過網(wǎng)絡(luò)發(fā)送到服務(wù)端,勢必會造成過多 的網(wǎng)絡(luò)請求登馒。如果我們能夠?qū)⒍鄺l消息按照分區(qū)進行分組匙握,并采用批量的方式一次發(fā)送一個消息集,并且對消息集進行壓縮谊娇,就可以減少網(wǎng)絡(luò)傳輸?shù)膸挿喂拢M一步提高數(shù)據(jù)的傳輸效率。

  1. key.serializervalue.serializer指定了如何將key和value序列化成二進制碼流的方式,也就是上圖中的序列化方式;
  2. compresstion.type:默認情況下消息是不壓縮的济欢,這個參數(shù)可以指定使用消息壓縮,參數(shù)可以取值為snappy小渊、gzip或者lz4;
  • 接下來,我們需要創(chuàng)建一個ProducerRecord法褥,這個對象需要包含消息的topic和值value,可以選擇性指定一個鍵值key或者分區(qū)partition酬屉。
  • 發(fā)送消息時半等,生產(chǎn)者會根據(jù)配置的key.serializervalue.serializer對鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器partitioner呐萨。
  • 如果我們指定了分區(qū)杀饵,那么分配器返回該分區(qū)即可;否則,分配器將會基于鍵值來選擇一個分區(qū)并返回。
  • 選擇完分區(qū)后谬擦,生產(chǎn)者知道了消息所屬的主題和分區(qū)切距,它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個線程負責(zé)發(fā)送這些批量消息到對應(yīng)的Kafka broker惨远。
  • 當(dāng)broker接收到消息后谜悟,如果成功寫入則返回一個包含消息的主題、分區(qū)及位移的RecordMetadata對象北秽,否則返回異常.
  • 生產(chǎn)者接收到結(jié)果后葡幸,對于異常可能會進行重試,根據(jù)參數(shù)reties的配置決定.

kafka發(fā)送端文件存儲原理

我們普遍認為一旦涉及到磁盤的訪問贺氓,數(shù)據(jù)的讀寫就會變得很慢蔚叨,其實不然,操作系統(tǒng)已經(jīng)針對磁盤的訪問速率做了很大的優(yōu)化;比如,預(yù)讀會提前將一個比較大的磁盤讀入內(nèi)存蔑水,后寫會把很多小的邏輯寫操作合并起來組合成一個大的物理寫操作邢锯;并且,操作系統(tǒng)還會將主內(nèi)存剩余的所有空間都用作磁盤緩存肤粱,所有的磁盤讀寫都會經(jīng)過統(tǒng)一的磁盤緩存弹囚,綜上所述,如果針對磁盤的順序讀寫领曼,某些情況它可能比隨機的內(nèi)存訪問都要快鸥鹉。

文件寫入的邏輯無外乎一下這兩種,但kafka選擇了第一種庶骄,也就是a圖的邏輯:

image.png

b圖是首先在內(nèi)存中保存盡可能多的數(shù)據(jù)毁渗,并在需要時將這些數(shù)據(jù)刷新進磁盤;
a圖是所有數(shù)據(jù)立即寫入磁盤单刁,但不進行刷新數(shù)據(jù)的調(diào)用灸异,數(shù)據(jù)首先會被傳輸?shù)酱疟P緩存,操作系統(tǒng)隨后會將數(shù)據(jù)定期自動刷新到磁盤羔飞。

發(fā)送端優(yōu)化

新的API中肺樟,生產(chǎn)者要發(fā)送消息,并不是直接發(fā)送給服務(wù)器逻淌,而是在客戶端先把消息放入一個緩沖隊列中么伯,然后由一個消息發(fā)送線程從隊列中拉取消息,以批鹽的方式發(fā)送消息給服務(wù)端卡儒。 Kafka的記錄收集器RecordAccumulator 負責(zé)緩存生產(chǎn)者客戶端產(chǎn)生的消息田柔,發(fā)送線程( Sender)負責(zé)讀取記錄收集器的批量消息, 通過網(wǎng)絡(luò)發(fā)送給服務(wù)端骨望。

開篇我們便列出了kafka 發(fā)送端的流程圖硬爆,消息發(fā)送之初,首先會為消息指定一個分區(qū)(發(fā)送消息時未指定分區(qū)的情況下)擎鸠,對于沒有鍵的消息缀磕,通過計數(shù)器自增輪詢的方式依次將消息分配到不同的分區(qū)上;對于有鍵的消息,對鍵計算散列值糠亩,然后和主題的分區(qū)數(shù)進行取模得到分區(qū)編號虐骑,具體的客戶端代碼實現(xiàn):

public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
       //獲取集群中所有的分區(qū)
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        //如果指定分區(qū)
        if (record.partition() != null) {
            // they have given us a partition, use it
            if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return record.partition();
      //  如果沒有key,則負載均衡的分布
        } else if (record.key() == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
            if (availablePartitions.size() > 0) {
                int part = Utils.abs(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.abs(nextValue) % numPartitions;
            }
        } else {
            // 如果有key赎线,則對key 進行hash取模運算
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }
    }

}

在客戶端就為消息選擇分區(qū)的目的是什么? 只有為消息選擇分區(qū)廷没,我們才能知道應(yīng)該發(fā)送到哪個節(jié)點,如果隨便找一個服務(wù)端節(jié)點垂寥,再由那個節(jié)點去決定如何將消息轉(zhuǎn)發(fā)給其他正確的節(jié)點來保存颠黎。這種方式增加了服務(wù)端的負擔(dān)另锋,多了不必要的數(shù)據(jù)傳輸。

序列化

在上述代碼中,我們看到了kafka producer在發(fā)送消息的時候會將key和value進行序列化,上面的程序中使用的是Kafka客戶端自帶的org.apache.kafka.common.serialization.StringSerializer狭归,除了用于String類型的序列化器之外還有:ByteArray夭坪、ByteBufferBytes过椎、Double室梅、IntegerLong這幾種類型,這幾個序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer接口接下來,此接口有三種方法:

  • public void configure(Map<String, ?> configs, boolean isKey):用來配置當(dāng)前類疚宇。
  • public byte[] serialize(String topic, T data):用來執(zhí)行序列化亡鼠。
  • public void close():用來關(guān)閉當(dāng)前序列化器。一般情況下這個方法都是個空方法敷待,如果實現(xiàn)了此方法间涵,必須確保此方法的冪等性,因為這個方法很可能會被KafkaProducer調(diào)用多次榜揖。

業(yè)界用的多的序列化框架無外乎如Avro勾哩、JSON、Thrift举哟、ProtoBuf或者Protostuff等工具,這里我就不擴展開了,讀者如果感興趣可以搜索相關(guān)的資料,下面就以一個簡單的例子來介紹下如何自定義序列化方式.

假設(shè)我們有一個自定義的Company類:

@Data
public class Company {
    private String name;
    private String address;
}

接下來我們Company的name和address屬性進行序列化,實現(xiàn)下Serializer接口:

public class CompanySerializer implements Serializer<Customer> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用自定義的序列化類的方式也簡單,在前面的代碼中替換下properties中的序列化類即可:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.xxx.kafka. CompanySerializer");

分區(qū)器

在上文的demo中,我們創(chuàng)建消息的時候,必須要提供主題和消息的內(nèi)容,而消息的key是可選的(也就是我們平時工作中,發(fā)送消息時只需要指定topic和message),當(dāng)不指定key時默認為null.消息的key有兩個重要的作用:

  • 提供描述消息的額外信息;
  • 用來決定消息寫入到哪個分區(qū),所有具有相同key的消息會分配到同一個分區(qū)中.

如果key為null,那么生產(chǎn)者會使用默認的分配器,該分配器使用輪詢round-robin)算法來將消息均衡到所有分區(qū).

如果key不為null且使用的是默認的分配器,那么生產(chǎn)者會對key進行哈希并根據(jù)結(jié)果將消息分配到特定的分區(qū).注意的是,在計算消息與分區(qū)的映射關(guān)系時,使用的是全部的分區(qū)數(shù)而不僅僅是可用的分區(qū)數(shù).這也意味著,如果某個分區(qū)不可用(雖然使用復(fù)制方案的話這極少發(fā)生),而消息剛好被分配到該分區(qū),那么將會寫入失敗.另外,如果需要增加額外的分區(qū),那么消息與分區(qū)的映射關(guān)系將會發(fā)生改變,因此盡量避免這種情況,具體的信息可以查看DefaultPartitioner中的代碼實現(xiàn):

    /**
     * Compute the partition for the given record.
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //獲取指定topic的partitions
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //key=null 
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            //可用分區(qū)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                //消息隨機分布到topic可用的partition中
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 無分區(qū)可利用, 給定一個不可用的分區(qū)
                return Utils.toPositive(nextValue) % numPartitions;
            }
            //如果 key 不為 null思劳,并且使用了默認的分區(qū)器,kafka 會使用自己的 hash 算法對 key 取 hash 值
        } else {//通過hash獲取partition
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

現(xiàn)在來看下如何自定義一個分配器,下面將key為Test的消息單獨放在一個分區(qū),與其他的消息進行分區(qū)隔離:

public class TestPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Test"))
        return numPartitions; // Banana will always go to last partition
   
     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }
    
    public void close() {}
 
}

使用自定義的分區(qū)器

使用很簡單,在配置文件中或者properties文件中指定分區(qū)器的類即可;

props.put("partitioner.class", "com.xxx.kafka.TestPartitioner");

攔截器

Producer攔截器是個相當(dāng)新的功能.對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等.同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈,Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  • onSend(ProducerRecord):該方法封裝進KafkaProducer.send方法中妨猩,即它運行在用戶主線程中的芳室。Producer確保在消息被序列化以計算分區(qū)前調(diào)用該方法哟绊。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū)坐梯,否則會影響目標(biāo)分區(qū)的計算
  • onAcknowledgement(RecordMetadata, Exception e):該方法會在消息被應(yīng)答之前或消息發(fā)送失敗時調(diào)用震嫉,并且通常都是在producer回調(diào)邏輯觸發(fā)之前森瘪。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯票堵,否則會拖慢producer的消息發(fā)送效率
  • close:關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作,一般不作實現(xiàn);

interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全.另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞.這在使用過程中要特別留意.

下面我們簡單演示一個雙interceptor組成的攔截鏈,第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù).

第一個,在send方法中,我們會創(chuàng)建一個新的message,把時間戳寫入消息體的最前部.

public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
 
    @Override
    public ProducerRecord onSend(ProducerRecord msg) {
        return new ProducerRecord(
                msg(), msg(), record.timestamp(), msg(), System.currentTimeMillis() + "," + msg().toString());
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 
    }
 
    @Override
    public void close() {
    }
}

定義第二個interceptor:CounterInterceptor,該interceptor會在消息發(fā)送后更新"發(fā)送成功消息數(shù)"和"發(fā)送失敗消息數(shù)"兩個計數(shù)器,并在producer關(guān)閉時打印這兩個計數(shù)器;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
 
    private int errorCounter = 0;
    private int successCounter = 0;
 
    @Override
    public void configure(Map<String, ?> configs) {
    }
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }
 
    @Override
    public void close() {
        // 保存結(jié)果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

定義好interceptor之后,我們需要在producer中這樣指定即可,代碼如下:

List<String> interceptors = new ArrayList<>();
interceptors.add("com.xxx.kafka.TimeStampPrependerInterceptor"); // interceptor 1
interceptors.add("com.xxx.kafka.CounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

// 一定要關(guān)閉producer扼睬,這樣才會調(diào)用interceptors中的close方法
producer.close();

寫了這么多,基本上將一個簡單消息從kafka producer發(fā)送時可以做的事情弄清了,但是我還是有一個疑問,分區(qū)器,攔截器,序列化他們之間有順序?
這個疑問留給大家自己去解決!!!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末悴势,一起剝皮案震驚了整個濱河市窗宇,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌特纤,老刑警劉巖军俊,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異捧存,居然都是意外死亡粪躬,警方通過查閱死者的電腦和手機担败,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來镰官,“玉大人提前,你說我怎么就攤上這事∮具耄” “怎么了狈网?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長笨腥。 經(jīng)常有香客問我拓哺,道長,這世上最難降的妖魔是什么扇雕? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任拓售,我火速辦了婚禮,結(jié)果婚禮上镶奉,老公的妹妹穿的比我還像新娘础淤。我一直安慰自己,他們只是感情好哨苛,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布鸽凶。 她就那樣靜靜地躺著,像睡著了一般建峭。 火紅的嫁衣襯著肌膚如雪玻侥。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天亿蒸,我揣著相機與錄音凑兰,去河邊找鬼。 笑死边锁,一個胖子當(dāng)著我的面吹牛姑食,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播茅坛,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼音半,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了贡蓖?” 一聲冷哼從身側(cè)響起曹鸠,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎斥铺,沒想到半個月后彻桃,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡仅父,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年叛薯,在試婚紗的時候發(fā)現(xiàn)自己被綠了浑吟。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡耗溜,死狀恐怖组力,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情抖拴,我是刑警寧澤燎字,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布,位于F島的核電站阿宅,受9級特大地震影響候衍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜洒放,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一蛉鹿、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧往湿,春花似錦妖异、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至绒窑,卻和暖如春棕孙,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背些膨。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工蟀俊, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人订雾。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓欧漱,卻偏偏與公主長得像,于是被迫代替她去往敵國和親葬燎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345

推薦閱讀更多精彩內(nèi)容

  • 一天晚上,在樓下奶茶店擅威,偶然間看到這個乖巧的小可愛壕探,就把它定格了下來。因為最近考研的事情郊丛,真的好煩人李请,總是覺得時間...
    突然就想你了閱讀 170評論 0 0
  • 喜歡的女生終于發(fā)消息給我导盅,我卻果斷拉黑 大學(xué)喜歡一個女生四年较幌,被拒絕了4年 畢業(yè)后她去了外省,我留在本地白翻,從此不再...
    迷茫懶惰君閱讀 573評論 0 0
  • 想有一首歌 我們在歌里 冬天入夜猩紅的天空 再來一場細碎的大雪 空靜無人的廣場 出現(xiàn)一個你 陪我看完這雪景 若真有...
    竹七君閱讀 343評論 0 0
  • 一乍炉、10月整體分析 月計劃60%完成,重要事項取得的效果比預(yù)期更好滤馍!優(yōu)化了工作上的石墨協(xié)作工作集流程岛琼,建立了家庭石...
    揚頭望月亮閱讀 226評論 0 0