Kafka生產(chǎn)者:寫消息到Kafka

本章我們將會討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的翩瓜。Kafka項目有一個生產(chǎn)者客戶端,我們可以通過這個客戶端的API來發(fā)送消息物独。

概要

當我們發(fā)送消息之前挤土,先問幾個問題:每條消息都是很關(guān)鍵且不能容忍丟失么容劳?偶爾重復(fù)消息可以么?我們關(guān)注的是消息延遲還是寫入消息的吞吐量?
舉個例子此再,有一個信用卡交易處理系統(tǒng),當交易發(fā)生時會發(fā)送一條消息到Kafka句惯,另一個服務(wù)來讀取消息并根據(jù)規(guī)則引擎來檢查交易是否通過土辩,將結(jié)果通過Kafka返回。對于這樣的業(yè)務(wù)抢野,消息既不能丟失也不能重復(fù)拷淘,由于交易量大因此吞吐量需要盡可能大,延遲可以稍微高一點指孤。
再舉個例子启涯,假如我們需要收集用戶在網(wǎng)頁上的點擊數(shù)據(jù)贬堵,對于這樣的場景,少量消息丟失或者重復(fù)是可以容忍的结洼,延遲多大都不重要只要不影響用戶體驗黎做,吞吐則根據(jù)實時用戶數(shù)來決定。
不同的業(yè)務(wù)需要使用不同的寫入方式和配置松忍。后面我們將會討論這些API蒸殿,現(xiàn)在先看下生產(chǎn)者寫消息的基本流程:

image

流程如下:

  1. 首先,我們需要創(chuàng)建一個ProducerRecord鸣峭,這個對象需要包含消息的主題(topic)和值(value)宏所,可以選擇性指定一個鍵值(key)或者分區(qū)(partition)。
  2. 發(fā)送消息時摊溶,生產(chǎn)者會對鍵值和值序列化成字節(jié)數(shù)組爬骤,然后發(fā)送到分配器(partitioner)。
  3. 如果我們指定了分區(qū)莫换,那么分配器返回該分區(qū)即可霞玄;否則,分配器將會基于鍵值來選擇一個分區(qū)并返回浓镜。
  4. 選擇完分區(qū)后溃列,生產(chǎn)者知道了消息所屬的主題和分區(qū),它將這條記錄添加到相同主題和分區(qū)的批量消息中膛薛,另一個線程負責發(fā)送這些批量消息到對應(yīng)的Kafka broker听隐。
  5. 當broker接收到消息后,如果成功寫入則返回一個包含消息的主題哄啄、分區(qū)及位移的RecordMetadata對象雅任,否則返回異常。
  6. 生產(chǎn)者接收到結(jié)果后咨跌,對于異郴γ矗可能會進行重試。

創(chuàng)建Kafka生產(chǎn)者

創(chuàng)建Kafka生產(chǎn)者有三個基本屬性:

  • bootstrap.servers:屬性值是一個host:port的broker列表锌半。這個屬性指定了生產(chǎn)者建立初始連接的broker列表禽车,這個列表不需要包含所有的broker,因為生產(chǎn)者建立初始連接后會從相應(yīng)的broker獲取到集群信息刊殉。但建議指定至少包含兩個broker殉摔,這樣一個broker宕機后生產(chǎn)者可以連接到另一個broker。
  • key.serializer:屬性值是類的名稱记焊。這個屬性指定了用來序列化鍵值(key)的類逸月。Kafka broker只接受字節(jié)數(shù)組,但生產(chǎn)者的發(fā)送消息接口允許發(fā)送任何的Java對象遍膜,因此需要將這些對象序列化成字節(jié)數(shù)組碗硬。key.serializer指定的類需要實現(xiàn)org.apache.kafka.common.serialization.Serializer接口瓤湘,Kafka客戶端包中包含了幾個默認實現(xiàn),例如ByteArraySerializer恩尾、StringSerializer和IntegerSerializer弛说。
  • value.serializer:屬性值是類的名稱。這個屬性指定了用來序列化消息記錄的類特笋,與key.serializer差不多剃浇。

Maven依賴

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>

下面是一個樣例代碼:

private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);

創(chuàng)建完生產(chǎn)者后,我們可以發(fā)送消息猎物。Kafka中有三種發(fā)送消息的方式:

  • 只發(fā)不管結(jié)果(fire-and-forget):只調(diào)用接口發(fā)送消息到Kafka服務(wù)器虎囚,但不管成功寫入與否。由于Kafka是高可用的蔫磨,因此大部分情況下消息都會寫入淘讥,但在異常情況下會丟消息。
  • 同步發(fā)送(Synchronous send):調(diào)用send()方法返回一個Future對象堤如,我們可以使用它的get()方法來判斷消息發(fā)送成功與否蒲列。
  • 異步發(fā)送(Asynchronous send):調(diào)用send()時提供一個回調(diào)方法,當接收到broker結(jié)果后回調(diào)此方法搀罢。

本章的例子都是單線程發(fā)送的蝗岖,但生產(chǎn)者對象是線程安全的,它支持多線程發(fā)送消息來提高吞吐榔至。需要的話抵赢,我們可以使用多個生產(chǎn)者對象來進一步提高吞吐。

發(fā)送消息到Kafka

最簡單的發(fā)送消息方式如下:

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

try {
  producer.send(record);
} catch (Exception e) {
  e.printStackTrace();
}

這里做了如下幾件事:

  1. 我們創(chuàng)建了一個ProducerRecord唧取,并且指定了主題以及消息的key/value铅鲤。主題總是字符串類型的,但key/value則可以是任意類型枫弟,在本例中也是字符串邢享。需要注意的是,這里的key/value的類型需要與serializer和生產(chǎn)者的類型匹配淡诗。
  2. 使用send()方法來發(fā)送消息骇塘,該方法會返回一個RecordMetadata的Future對象,但由于我們沒有跟蹤Future對象韩容,因此并不知道發(fā)送結(jié)果绪爸。如前所述,這種方式可能會丟失消息宙攻。
  3. 雖然我們忽略了發(fā)送消息到broker的異常,但是我們調(diào)用send()方法時仍然可能會遇到一些異常介褥,例如序列化異常座掘、發(fā)送緩沖區(qū)溢出異常等等递惋。

同步發(fā)送消息

同步發(fā)送方式可以簡單修改如下:

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

try {
  producer.send(record).get();
} catch (Exception e) {
  e.printStackTrace();
}

注意到,這里使用了Future.get()來獲取發(fā)送結(jié)果溢陪,如果發(fā)送消息失敗則會拋出異常萍虽,否則返回一個RecordMetadata對象。發(fā)送失敗異常包含:1)broker返回不可恢復(fù)異常形真,生產(chǎn)者直接拋出該異常杉编;2)對于broker其他異常,生產(chǎn)者會進行重試咆霜,如果重試超過一定次數(shù)仍不成功則拋出異常邓馒。

可恢復(fù)異常指的是,如果生產(chǎn)者進行重試可能會成功蛾坯,例如連接異常光酣;不可恢復(fù)異常則是進行重試也不會成功的異常,例如消息內(nèi)容過大脉课。

異步發(fā)送消息

首先了解下什么場景下需要異步發(fā)送消息救军。假如生產(chǎn)者與broker之間的網(wǎng)絡(luò)延時為10ms,我們發(fā)送100條消息倘零,發(fā)送每條消息都等待結(jié)果唱遭,那么需要1秒的時間。而如果我們采用異步的方式呈驶,幾乎沒有任何耗時拷泽,而且我們還可以通過回調(diào)知道消息的發(fā)送結(jié)果。

異步發(fā)送消息的樣例如下:

public class DemoProducerCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    }
  }
}

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");

producer.send(record, new DemoProducerCallback());

異步回調(diào)的類需要實現(xiàn)org.apache.kafka.clients.producer.Callback接口俐东,這個接口只有一個onCompletion方法跌穗。當Kafka返回異常時,異常值不為null虏辫,代碼中只是簡單的打印蚌吸,但我們可以采取其他處理方式。

kafka生產(chǎn)者 配置

  1. ackstimeout.ms
  • timeout.ms(0.9.0.0版本中就被棄用)
    指定了 broker 等待同步副本返回消息確認的時間砌庄,與 asks 的配置相匹配——如果在指定時間內(nèi)沒有收到同步副本的確認羹唠,那么 broker 就會返回一個錯誤。

  • acks = 1
    指定了必須要有多少個分區(qū)副本收到消息娄昆,生產(chǎn)者才會認為消息寫入是成功的佩微。這個參數(shù)對消
    息丟失的可能性有重要影響。該參數(shù)有如下選項:

    • acks=0萌焰,生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)哺眯。也就是說,如果當中
      出現(xiàn)了問題扒俯,導致服務(wù)器沒有收到消息奶卓,那么生產(chǎn)者就無從得知一疯,消息也就丟失了。不過夺姑,因為
      生產(chǎn)者不需要等待服務(wù)器的響應(yīng)墩邀,所以它可以以網(wǎng)絡(luò)能夠支持的最大速度發(fā)送消息,從而達到很
      高的吞吐量盏浙。

    • acks=1眉睹,只要集群的 Leader 節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器的成功響應(yīng)废膘。如果消息無法到達 Leader 節(jié)點(比如首領(lǐng)節(jié)點崩潰竹海,新的 Leader 還沒有被選舉出來),生產(chǎn)者會收到一個錯誤響應(yīng)殖卑,為了避免數(shù)據(jù)丟失站削,生產(chǎn)者會重發(fā)消息。不過孵稽,如果一個沒有收到消息的節(jié)點成為新Leader许起,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送菩鲜。如果讓發(fā)送客戶端等待服務(wù)器的響應(yīng)(通過調(diào)用 Future 對象的 get() 方法)园细,顯然會增加延遲(在網(wǎng)絡(luò)上傳輸一個來回的延遲)。如果客戶端使用回調(diào)接校,延遲問題就可以得到緩解猛频,不過吞吐量還是會受發(fā)送中消息數(shù)量的限制(比如,生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個消息)蛛勉。

    • 如果 acks=all鹿寻,只有當所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)诽凌。這種模式是最安全的毡熏,它可以保證不止一個服務(wù)器收到消息,就算有服務(wù)器發(fā)生崩潰侣诵,整個集群仍然可以運行痢法。不過,它的延遲比 acks=1 時更高杜顺,因為我們要等待不只一個服務(wù)器節(jié)點接收消息财搁。

  1. buffer.memory=33554432
    該參數(shù)用來設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息躬络。如果生產(chǎn)消息的速度超過發(fā)送的速度尖奔,會導致生產(chǎn)者空間不足。這個時候, send()方法調(diào)用要么被阻塞提茁,要么拋出異常仗嗦,取決于如何設(shè)置 block.on.buffer.full 參數(shù)(在 0.9.0.0 版本里被替換成了max.block.ms,表示在拋出異常之前可以阻塞一段時間)

  2. compression.type=none
    默認情況下甘凭,消息發(fā)送時不會被壓縮。該參數(shù)可以設(shè)置為 snappy火邓、gziplz4丹弱,它指定了消息被發(fā)送給 broker 之前使用哪一種壓縮算法進行壓縮。

  • snappy 壓縮算法由 Google 發(fā)明铲咨,占用較少的 CPU躲胳,卻能提供較好的性能和相當可觀的壓縮比,如果比較關(guān)注性能和網(wǎng)絡(luò)帶寬纤勒,可以使用這種算法坯苹。
  • gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比摇天,所以如果網(wǎng)絡(luò)帶寬比較有限粹湃,可以使用這種算法。

使用壓縮可以降低網(wǎng)絡(luò)傳輸開銷和存儲開銷泉坐,而這往往是向 Kafka 發(fā)送消息的瓶頸所在为鳄。

  1. retriesretry.backoff.ms
  • retries=0
    生產(chǎn)者從服務(wù)器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到 Leader)。在這種情況下腕让,retries
    參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù)孤钦,如果達到這個次數(shù),生產(chǎn)者會放棄重試并返回錯誤纯丸。

  • retry.backoff.ms=100
    默認情況下偏形,生產(chǎn)者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms參數(shù)來改變這個時間間隔觉鼻。建議在設(shè)置重試次數(shù)和重試時間間隔之前俊扭,先測試一下恢復(fù)一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出 Leader 需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復(fù)的時間長滑凉,否則生產(chǎn)者會過早地放棄重試统扳。

  • 不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如“消息太大”錯誤)畅姊。一般情況下咒钟,因為生產(chǎn)者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤若未。你只需要處理那些不可重試的錯誤和重試次數(shù)超出上限的情況朱嘴。

  1. batch.sizelinger.ms
  • batch.size:=16384
    當有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小萍嬉,按照字節(jié)數(shù)計算(而不是消息個數(shù))乌昔。
  • linger.ms:=0
    指定了生產(chǎn)者在每次發(fā)送消息的時間間隔

當批次被填滿 或者 等待時間達到 linger.ms設(shè)置的間隔時間,批次里的所有消息會被發(fā)送出去壤追,哪怕此時該批次只有一條消息磕道。
所以就算把批次大小設(shè)置得很大,也不會造成延遲行冰,只是會占用更多的內(nèi)存而已溺蕉。但如果設(shè)置得太小,因為生產(chǎn)者需要更頻繁地發(fā)送消息悼做,會增加一些額外的開銷疯特。

  1. client.id=''
    該參數(shù)可以是任意的字符串,服務(wù)器會用它來識別消息的來源

  2. max.in.flight.requests.per.connection=5
    該參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個消息肛走。它的值越高漓雅,就會占用越多的內(nèi)存,不過也會提升吞吐量朽色。把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的邻吞,即使發(fā)生了重試。

如何保證順序性:如果把 retries 設(shè)為非零整數(shù)纵搁,同時把 max.in.flight.requests.per.connection 設(shè)為比 1 大的數(shù)吃衅,那么,如果第一個批次消息寫入失敗腾誉,而第二個批次寫入成功徘层,broker 會重試寫入第一個批次。如果此時第一個批次也寫入成功利职,那么兩個批次的順序就反過來了趣效。

一般來說,如果某些場景要求消息是有序的猪贪,那么消息是否寫入成功也是很關(guān)鍵的跷敬,所以不建議把retries設(shè)為 0∪妊海可以把 max.in.flight.requests.per.connection設(shè)為 1西傀,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時,就不會有其他的消息發(fā)送給broker桶癣。不過這樣會嚴重影響生產(chǎn)者的吞吐量拥褂,所以只有在對消息的順序有嚴格要求的情況下才能這么做。

  1. request.timeout.msmetadata.fetch.timeout.ms
  • request.timeout.ms=305000
    指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務(wù)器返回響應(yīng)的時間
  • metadata.fetch.timeout.ms (0.9.0.0版本中就被棄用)
    指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標分區(qū)的 Leader 是誰)時等待服務(wù)器返回響應(yīng)的時間牙寞。如果等待響應(yīng)超時饺鹃,那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù)莫秆,要么返回一個錯誤(拋出異常或執(zhí)行回調(diào))悔详。
  1. max.request.size=1048576
    該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小镊屎。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小茄螃。例如缝驳,假設(shè)這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB归苍,或者生產(chǎn)者可以在單個請求里發(fā)送一個批次党巾,該批次包含了 1000 個消息,每個消息大小為 1KB霜医。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes)驳规,所以兩邊的配置最好可以匹配肴敛,避免生產(chǎn)者發(fā)送的消息被 broker 拒絕。

注意區(qū)分 batch.size只是針對一個 topic 的 partition吗购,而 max.request.size針對單次請求的医男。

  1. receive.buffer.bytes=32768 和 send.buffer.bytes=131072
    這兩個參數(shù)分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果它們被設(shè)為 -1捻勉,就使用操作系統(tǒng)的默認值镀梭。如果生產(chǎn)者或消費者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當增大這些值踱启,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬报账。

關(guān)于更多的配置信息,可以查看:http://kafka.apachecn.org/documentation.html#configuration

完整實例

package com.neuedu;

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

public class Producer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop03:9092,hadoop05:9092,hadoop06:9092");//該地址是集群的子集埠偿,用來探測集群透罢。
        props.put("acks", "all");// 記錄完整提交,最慢的但是最大可能的持久化
        props.put("retries", 3);// 請求失敗重試的次數(shù)
        props.put("batch.size", 16384);// batch的大小
        props.put("linger.ms", 1);// 默認情況即使緩沖區(qū)有剩余的空間冠蒋,也會立即發(fā)送請求羽圃,設(shè)置一段時間用來等待從而將緩沖區(qū)填的更多,單位為毫秒抖剿,producer發(fā)送數(shù)據(jù)會延遲1ms朽寞,可以減少發(fā)送到kafka服務(wù)器的請求數(shù)據(jù)
        props.put("buffer.memory", 33554432);// 提供給生產(chǎn)者緩沖內(nèi)存總量
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");// 序列化的方式,
        // ByteArraySerializer或者StringSerializer
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++)
        {
            producer.send(new ProducerRecord<String, String>("payment", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();

    }
}

通過上面的一些講解斩郎,應(yīng)該已經(jīng)可以比較友好的使用 kafka生產(chǎn)者了脑融,接下來我們還剩下最后一個部分,kafka的分區(qū)

分區(qū)
我們創(chuàng)建消息的時候孽拷,必須要提供主題和消息的內(nèi)容吨掌,而消息的key是可選的,當不指定key時默認為null。消息的key有兩個重要的作用:1)提供描述消息的額外信息膜宋;2)用來決定消息寫入到哪個分區(qū)窿侈,所有具有相同key的消息會分配到同一個分區(qū)中。
如果key為null秋茫,那么生產(chǎn)者會使用默認的分配器史简,該分配器使用輪詢(round-robin)算法來將消息均衡到所有分區(qū)。
如果key不為null而且使用的是默認的分配器肛著,那么生產(chǎn)者會對key進行哈希并根據(jù)結(jié)果將消息分配到特定的分區(qū)圆兵。注意的是,在計算消息與分區(qū)的映射關(guān)系時枢贿,使用的是全部的分區(qū)數(shù)而不僅僅是可用的分區(qū)數(shù)殉农。這也意味著,如果某個分區(qū)不可用(雖然使用復(fù)制方案的話這極少發(fā)生)局荚,而消息剛好被分配到該分區(qū)超凳,那么將會寫入失敗。另外耀态,如果需要增加額外的分區(qū)轮傍,那么消息與分區(qū)的映射關(guān)系將會發(fā)生改變,因此盡量避免這種情況首装。
自定義分配器
在kafka配置參數(shù)時設(shè)置分區(qū)器的類
//設(shè)置自定義分區(qū)
kafkaProps.put("partitioner.class", "com.chb.partitioner.MyPartitioner");

現(xiàn)在來看下如何自定義一個分配器创夜,下面將key為Banana的消息單獨放在一個分區(qū),與其他的消息進行分區(qū)隔離:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Banana"))
        return numPartitions - 1; // Banana will always go to last partition

     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % numPartitions)
    }

    public void close() {}

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末仙逻,一起剝皮案震驚了整個濱河市驰吓,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌系奉,老刑警劉巖棚瘟,帶你破解...
    沈念sama閱讀 219,039評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異喜最,居然都是意外死亡偎蘸,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,426評論 3 395
  • 文/潘曉璐 我一進店門瞬内,熙熙樓的掌柜王于貴愁眉苦臉地迎上來迷雪,“玉大人,你說我怎么就攤上這事虫蝶≌逻郑” “怎么了?”我有些...
    開封第一講書人閱讀 165,417評論 0 356
  • 文/不壞的土叔 我叫張陵能真,是天一觀的道長赁严。 經(jīng)常有香客問我扰柠,道長,這世上最難降的妖魔是什么疼约? 我笑而不...
    開封第一講書人閱讀 58,868評論 1 295
  • 正文 為了忘掉前任卤档,我火速辦了婚禮,結(jié)果婚禮上程剥,老公的妹妹穿的比我還像新娘劝枣。我一直安慰自己,他們只是感情好织鲸,可當我...
    茶點故事閱讀 67,892評論 6 392
  • 文/花漫 我一把揭開白布舔腾。 她就那樣靜靜地躺著,像睡著了一般搂擦。 火紅的嫁衣襯著肌膚如雪稳诚。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,692評論 1 305
  • 那天瀑踢,我揣著相機與錄音采桃,去河邊找鬼。 笑死丘损,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的工扎。 我是一名探鬼主播徘钥,決...
    沈念sama閱讀 40,416評論 3 419
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼肢娘!你這毒婦竟也來了呈础?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,326評論 0 276
  • 序言:老撾萬榮一對情侶失蹤橱健,失蹤者是張志新(化名)和其女友劉穎而钞,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體拘荡,經(jīng)...
    沈念sama閱讀 45,782評論 1 316
  • 正文 獨居荒郊野嶺守林人離奇死亡臼节,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,957評論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了珊皿。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片网缝。...
    茶點故事閱讀 40,102評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖蟋定,靈堂內(nèi)的尸體忽然破棺而出粉臊,到底是詐尸還是另有隱情,我是刑警寧澤驶兜,帶...
    沈念sama閱讀 35,790評論 5 346
  • 正文 年R本政府宣布扼仲,位于F島的核電站远寸,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏屠凶。R本人自食惡果不足惜驰后,卻給世界環(huán)境...
    茶點故事閱讀 41,442評論 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望阅畴。 院中可真熱鬧倡怎,春花似錦、人聲如沸贱枣。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,996評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽纽哥。三九已至钠乏,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間春塌,已是汗流浹背晓避。 一陣腳步聲響...
    開封第一講書人閱讀 33,113評論 1 272
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留只壳,地道東北人俏拱。 一個月前我還...
    沈念sama閱讀 48,332評論 3 373
  • 正文 我出身青樓,卻偏偏與公主長得像吼句,于是被迫代替她去往敵國和親锅必。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,044評論 2 355

推薦閱讀更多精彩內(nèi)容