簡讀筆記-深入理解kafka-第一部分

第一章 初始kafka

參考書籍: 朱小廝--深入理解Kafka 核心設(shè)計(jì)與實(shí)踐原理

Kafka體系結(jié)構(gòu)

  • Kafka體系架構(gòu)包含若干Producer, 若干Broker , 若干Consumer,以及一個(gè)Zookeeper集群鞭盟。

    • Zookeeper是Kafka用來負(fù)責(zé)集群元數(shù)據(jù)的管理齿诉、控制器的選舉等操作粤剧。
    • Producer:生產(chǎn)者抵恋,即發(fā)送消息的一方弧关。生產(chǎn)者負(fù)責(zé)創(chuàng)建消息世囊,然后將其投遞到Kafka中
    • Broker:一個(gè)獨(dú)立的Kafka服務(wù)節(jié)點(diǎn)株憾。 一個(gè)或多個(gè)Broker組成了一個(gè)Kafka集群
    • Consumer: 消費(fèi)者墙歪,也就是接收消息的一方虹菲。消費(fèi)者連接到Kafka上并拉取消息届惋,進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理
    kafka體系架構(gòu)

主題和分區(qū)

  • Kafka的每條消息都屬于一個(gè)主題郑藏,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題,而消費(fèi)者負(fù)責(zé)訂閱主題并消費(fèi)

  • 一個(gè)主題可以細(xì)分為多個(gè)分區(qū)拌牲,一個(gè)分區(qū)屬于單個(gè)主題。 分區(qū)可以看成是一個(gè)可追加的日志文件失驶, 消息在被追加到分區(qū)日志文件時(shí)會(huì)分配一個(gè)偏移量土居, 偏移量是消息在分區(qū)中的唯一標(biāo)識擦耀,Kafka保證了 偏移量在分區(qū)中是有序的眷蜓。

    消息寫入
  • 每一條消息發(fā)送時(shí)會(huì)根據(jù)分區(qū)規(guī)則選擇存儲(chǔ)到哪一個(gè)分區(qū)吁系,在主題創(chuàng)建之后可以通過修改分區(qū)的數(shù)量實(shí)現(xiàn)水平擴(kuò)展

多副本(Replica機(jī)制)

  • 多副本機(jī)制是通過增加副本數(shù)量進(jìn)行數(shù)據(jù)冗余碧聪,從而提高容災(zāi)能力逞姿。 副本之間是“一主多從”的關(guān)系。其中leader副本負(fù)責(zé)處理讀寫請求栋烤,follower副本只負(fù)責(zé)與leader副本的消息同步 (區(qū)別于讀寫分離) 明郭, 當(dāng)leader副本宕機(jī)時(shí)通過leader選舉和失效轉(zhuǎn)移,保證了Kafka的高可用性话侄。
  • folower副本的消息相對于leader副本具有一定的滯后性
多副本架構(gòu)

幾個(gè)重要名詞概念

  • AR (Assigned Replicas): 分區(qū)中的所有副本

  • ISR (In-Sync-Replicas): 與leader副本保持一定程度同步的副本

  • OSR(Out-of-Sync-Replicas): 與leader副本滯后過多的副本

    即 AR = ISR + OR年堆;

  • HW(High WaterMark): 高水位, 用來標(biāo)記一個(gè)特定的消息偏移量痒蓬,消費(fèi)者只能拉取到這個(gè)offset之前的消息(可見性)

  • LEO( Log End Offset) : 標(biāo)志著當(dāng)前日志文件中下一條待寫入消息的offset 谊却。 分區(qū)ISR集合中的每個(gè)副本都維護(hù)自身的LEO炎辨,而ISR集合中的最小LEO為分區(qū)的HW碴萧,對消費(fèi)者而言只能消費(fèi)HW之前的消息破喻。

HW和LEO的關(guān)系

第二章 生產(chǎn)者

KafkaProducer是線程安全的

public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }
    
    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        producer.send(record);
    }
}
  • 生產(chǎn)邏輯的幾個(gè)步驟
    • 配置生產(chǎn)者客戶端參數(shù)并創(chuàng)建生產(chǎn)者實(shí)例
    • 構(gòu)建待發(fā)送消息
    • 發(fā)送消息
    • 關(guān)閉生產(chǎn)者實(shí)例

發(fā)送消息的三種模式

public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
  • 發(fā)后即忘
    • 只管發(fā)送消息,不關(guān)心信息是否正確到達(dá)几莽。
    • 優(yōu)點(diǎn):性能最高,吞吐量大 缺點(diǎn):會(huì)造成數(shù)據(jù)丟失姨夹,可靠性低
  • 同步
    • 發(fā)送消息后返回Future對象峭沦,調(diào)用get()方法時(shí)阻塞等待熙侍,直到發(fā)送成功或出現(xiàn)異常
    • 優(yōu)點(diǎn):可靠性高,如有異程曜纾可處理或進(jìn)行消息重發(fā) 缺點(diǎn):性能低矛辕,造成阻塞
  • 異步
    • 發(fā)送消息時(shí)指定回調(diào)函數(shù)聊品,Kafka在返回響應(yīng)時(shí)會(huì)調(diào)用該函數(shù)實(shí)現(xiàn)異步的發(fā)送確認(rèn)翻屈。
    • 在同一個(gè)分區(qū)中,如果消息record1比record2先發(fā)送厘贼,那么它會(huì)保證callback1在callback2之前調(diào)用嘴秸。

序列化器

  • 生產(chǎn)者使用序列化器將對象轉(zhuǎn)換為字節(jié)數(shù)組凭疮,才能通過網(wǎng)絡(luò)發(fā)送給Kafka
  • 消費(fèi)者使用反序列化其把Kafka中收到的字節(jié)數(shù)組轉(zhuǎn)換為相應(yīng)的對象哭尝。
  • 因此生產(chǎn)者的序列化器和消費(fèi)者使用的反序列化器要一一對應(yīng)。

分區(qū)器

分區(qū)器 是根據(jù)key這個(gè)字段來計(jì)算partition值桶唐。它的作用是為消息分配分區(qū)

生產(chǎn)者攔截器

生產(chǎn)者攔截器既可用來在消息發(fā)送前做一些準(zhǔn)備工作如 按照某個(gè)規(guī)則過濾掉不符合要求的消息尤泽,修改消息內(nèi)容等坯约。也可以用來在發(fā)送回調(diào)邏輯前做一些定制化需求,如統(tǒng)計(jì)工作卿拴。 還可以指定多個(gè)攔截器形成攔截器鏈

生產(chǎn)者整體架構(gòu)

生產(chǎn)者架構(gòu)
  • 整個(gè)生產(chǎn)者客戶端由 主線程和Sender線程構(gòu)成
  • 在④中,是用于緩存消息梨与,以便Sender線程進(jìn)行批量發(fā)送堕花,進(jìn)而減少網(wǎng)絡(luò)傳輸
  • 在⑤中,是將 <分區(qū)粥鞋,消息集合> 轉(zhuǎn)化為 <brokerId, 消息集合>缘挽。 即邏輯地址到物理地址的轉(zhuǎn)化
  • 在⑦中,用于緩存尚未收到回應(yīng)的消息呻粹,以便異常時(shí)可進(jìn)行重發(fā)
    • 重要參數(shù) max.in.flight.requests.per 默認(rèn)值為5到踏,即每個(gè)連接最多只能緩存5個(gè)未響應(yīng)的請求窝稿。 可類比于TCP連接中的滑動(dòng)窗口大小

元數(shù)據(jù)的更新

元數(shù)據(jù)是指Kafka集群中的元數(shù)據(jù)踪少,這些元數(shù)據(jù)記錄了集群中有哪些主題,這些主題有哪些分區(qū),每個(gè)分區(qū)的leader副本分配在哪個(gè)節(jié)點(diǎn)上,follwer副本分配在哪些節(jié)點(diǎn)上,哪些副本在AR,ISR集合中,集群有哪些節(jié)點(diǎn),控制節(jié)點(diǎn)又是哪一個(gè)等信息愁铺。

元數(shù)據(jù)更新會(huì)挑選 InFlightRequests中當(dāng)前負(fù)載最小的節(jié)點(diǎn)發(fā)送更新元數(shù)據(jù)請求瓶竭。 由于Sender線程需要更新荧恍,而主線程需要讀取。因此數(shù)據(jù)同步問題也要考慮蔽介。使用synchronized和final保證武花。

幾個(gè)重要的參數(shù)

  • acks : 用來指定分區(qū)中必須要有多少個(gè)副本收到這條消息累铅,這樣生產(chǎn)者才認(rèn)為消息寫入成功
    • 取值為1 : 只要leader副本成功寫入消息投储,就會(huì)收到kafka的成功響應(yīng)
    • 取值為0: 不需要等待任何服務(wù)器響應(yīng)勋眯,寫入就認(rèn)為成功
    • 取值為-1或all:需要等待ISR中的所有副本都成功寫入消息讶坯,才會(huì)收到kafka的成功響應(yīng)
  • max.request.size
    • 限制生產(chǎn)者客戶端能發(fā)送消息最大值
  • retires 洼冻、retry.backoff.ms
    • 配置生產(chǎn)者重試次數(shù) 屋彪、 兩次重試的時(shí)間間隔
  • max.in.flight.requests.per.connection
    • 默認(rèn)值為5蟹但,即每個(gè)連接最多只能緩存5個(gè)未響應(yīng)的請求。
    • 當(dāng)此參數(shù) > 1 沙郭,則會(huì)因?yàn)橹匕l(fā)而出現(xiàn)錯(cuò)序的問題

第三章 消費(fèi)者

kafkaConsumer是線程不安全的

消費(fèi)者和消費(fèi)者組

  • 每個(gè)消費(fèi)者只能消費(fèi)所分配到的分區(qū)中的消息再扭,即每一個(gè)分區(qū)只能被一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者所消費(fèi)

  • 當(dāng)消費(fèi)組內(nèi)的消費(fèi)者個(gè)數(shù)變化時(shí)對應(yīng)的分區(qū)分配演變?nèi)缦拢海J(rèn)的RangeAssinor為例)

    消費(fèi)者與消費(fèi)者組
  • 消費(fèi)者與消費(fèi)者組的模型讓整體消費(fèi)能力具有了伸縮性÷缭洌可以增加消費(fèi)者個(gè)數(shù)來提高(或降低)整體消費(fèi)能力

  • 當(dāng)消費(fèi)者過多怨愤,出現(xiàn)消費(fèi)者分配不到任何分區(qū)時(shí)柿汛,那么這些消費(fèi)者將無法消費(fèi)消息貌笨,造成浪費(fèi)

    消費(fèi)組內(nèi)有過多的消費(fèi)者
  • Kafka基于消費(fèi)者和消費(fèi)者組模型 支持了 點(diǎn)對點(diǎn)和發(fā)布/訂閱兩種模式

    • 如果所有消費(fèi)者都隸屬于同一個(gè)消費(fèi)組,那么所有的消息都會(huì)被均衡地投遞給每一個(gè)消費(fèi)者祠汇,即每條消息只會(huì)被一個(gè)消費(fèi)者處理屿良,相當(dāng)于點(diǎn)對點(diǎn)模式
    • 如果所有的消費(fèi)者都隸屬于不同的消費(fèi)組喷橙,那么所有的消息都會(huì)被廣播給所有的消費(fèi)者,即每條消息會(huì)被所有的消費(fèi)者處理登舞,相當(dāng)于發(fā)布/訂閱模式的應(yīng)用
  • 每一個(gè)消費(fèi)者只隸屬于一個(gè)消費(fèi)組。消息發(fā)送時(shí)可指定消費(fèi)者組管挟, 消費(fèi)者客戶端通過group.id配置消費(fèi)者組名稱守谓,默認(rèn)為空字符串。

消費(fèi)者客戶端開發(fā)

KafkaConsumer是非線程安全的

public class KafkaConsumerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic()
                            + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    //do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}
  • Kafka的消費(fèi)邏輯

    • 配置消費(fèi)者客戶端參數(shù)及創(chuàng)建相應(yīng)消費(fèi)者實(shí)例
    • 訂閱主題
    • 拉取消息并消費(fèi)
    • 提交消費(fèi)位移(后面會(huì)講)
    • 關(guān)閉消費(fèi)者實(shí)例
  • 訂閱主題和分區(qū)的細(xì)節(jié)

    • 有三種訂閱的方式。 集合訂閱的方式subscribe(Collection)闽铐、正則表達(dá)式訂閱方式subscribe(Pattern)、

      指定分區(qū)的訂閱方式assign(Collection)

    • subscribe訂閱主題時(shí)具有再平衡(后面會(huì)講)的功能奶浦,而assign沒有兄墅。

反序列化器

將字節(jié)數(shù)組轉(zhuǎn)化為對象,與生產(chǎn)者的序列化器要一一對應(yīng)

消息消費(fèi)

幾個(gè)常用API

#KafkaConsumer
public ConsumerRecords<K, V> poll(Duration timeout);
#ConsumerRecords
public List<ConsumerRecord<K, V>> records(TopicPartition partition);
public Iterable<ConsumerRecord<K, V>> records(String topic);
  • timeout參數(shù)用于控制阻塞時(shí)間澳叉,再消費(fèi)者緩沖區(qū)里沒有數(shù)據(jù)時(shí)會(huì)發(fā)生阻塞
  • 按照分區(qū)的維度隙咸,獲取拉取消息中 某個(gè)分區(qū)的所有記錄
  • 按照主題的維度,獲取拉取消息中 某個(gè)主題的所有記錄

位移提交

  • Consumer會(huì)記錄上一次的消費(fèi)位移成洗,并進(jìn)行持久化保存五督。 存儲(chǔ)到Kafka的內(nèi)部主題__consumer_offset中

    消費(fèi)位移
  • 位移的提交時(shí)機(jī)也有講究,可能會(huì)造成重復(fù)消費(fèi)和消息丟失的現(xiàn)象

    • 拉取到消息之后就進(jìn)行位移提交瓶殃, 若消費(fèi)到一半時(shí)宕機(jī)充包,則造成消失丟失現(xiàn)象

    • 消費(fèi)完所有消息后在進(jìn)行位移提交, 若消費(fèi)到一半時(shí)宕機(jī),則造成重復(fù)消費(fèi)現(xiàn)象

      消費(fèi)位移的提交位置
  • Kafka默認(rèn)的消費(fèi)位移提交方式是自動(dòng)提交(定期)基矮。enable.auto.commit默認(rèn)為true; auto.commit.interval.ms配置提交的周期淆储。自動(dòng)提交的動(dòng)作是在poll()方法的邏輯中完成的,會(huì)在每次拉取請求之間檢查是否可以進(jìn)行位移提交家浇。

  • 自動(dòng)提交會(huì)造成重復(fù)消費(fèi)和消息丟失的現(xiàn)象

    • 重復(fù)消費(fèi): 消費(fèi)到一半時(shí)宕機(jī)本砰,而尚未提交,則造成重復(fù)消費(fèi)

    • 消息丟失:如圖線程A進(jìn)行拉取消息到緩存钢悲,線程B從緩存中處理邏輯点额。 若線程B處理到一半時(shí)宕機(jī),那么下次恢復(fù)時(shí)又從【X+7】開始拉取莺琳,造成了【x+4】-【X+7】消息的丟失

      自動(dòng)位移提交中消息丟失的情況
  • 可以看出自動(dòng)提交編碼簡單但會(huì)出現(xiàn)消息丟失和重復(fù)消費(fèi)現(xiàn)象还棱,并且無法做到精確的位移管理,因此Kafka還提供了 手動(dòng)提交的方式惭等。通常不是拉取到消息就算消費(fèi)完成了诱贿,而是當(dāng)我們通過這條消息完成一系列業(yè)務(wù)處理后,才認(rèn)為消息被成功消費(fèi)咕缎。開啟手動(dòng)提交需要enable.auto.commit設(shè)置為false

  • 手動(dòng)提交可分為 同步提交和異步提交珠十。 即commitSync()和commitAsync()兩種方式

    以下是同步提交和異步提交的一些案例

    #拉取所有消息并處理后進(jìn)行同步提交
    while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
        //do some logical processing.
        }
        consumer.commitSync();
    }
    
    #批量處理+批量提交
    int minBatchSize = 200;
    while (running.get()) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
      buffer.add(record);
      }
      if (buffer.size() >= minBatchSize) {
          //do some logical processing with buffer.
          consumer.commitSync();
          buffer.clear();
      }
    }
    
    #帶參數(shù)的同步位移提交,可控制提交的offset凭豪,該案例為每消費(fèi)一條就提交一次
    while (running.get()) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
          //do some logical processing.
          long offset = record.offset();
          TopicPartition partition =
          new TopicPartition(record.topic(), record.partition());
          consumer.commitSync(Collections
              .singletonMap(partition, new OffsetAndMetadata(offset + 1)));
      }
    }
    
    #按分區(qū)粒度同步提交消費(fèi)位移焙蹭,每處理完一個(gè)分區(qū)就提交一次
    while (running.get()) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (TopicPartition partition : records.partitions()) {
          List<ConsumerRecord<String, String>> partitionRecords =
          records.records(partition);
          for (ConsumerRecord<String, String> record : partitionRecords) {
          //do some logical processing.
          }
          long lastConsumedOffset = partitionRecords
              .get(partitionRecords.size() - 1).offset();
          consumer.commitSync(Collections.singletonMap(partition,
                new OffsetAndMetadata(lastConsumedOffset + 1)));
      }
    }
    
    #異步提交,可指定提交完成后的回調(diào)函數(shù)
    public void commitAsync()嫂伞;
    public void commitAsync(OffsetCommitCallback callback)孔厉;
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
    
  • 異步提交也存在重復(fù)消費(fèi)的問題。如果先提交了【X+2】帖努,再提交【X+8】撰豺。如果后者提交成功而前者提交失敗。 如果此時(shí)前者進(jìn)行重試提交拼余,那么成功后會(huì)造成數(shù)據(jù)的重復(fù)消費(fèi)污桦。

  • 對于異步提交可以設(shè)置一個(gè)遞增的序號維護(hù)異步提交的順序,如當(dāng)位移提交失敗需要重試提交時(shí)匙监,對比所提交的位移和維護(hù)的序號大小凡橱,如果前者小于后者,就不需要再重復(fù)提交了亭姥。

控制或關(guān)閉消費(fèi)

  • KafkaConsumer提供了暫停pause()和恢復(fù)resume()某些分區(qū)的消費(fèi)稼钩;以及關(guān)閉close()的方法'

指定位移消費(fèi)

  • 如用一個(gè)新的消費(fèi)者組來消費(fèi)主題時(shí),由于沒有可查找的消費(fèi)偏移达罗,因此會(huì)按照auto.offset.reset配置來決定從何處開始消費(fèi)消息坝撑。
  • seek()方法為我們提供了從特定位置讀取消息的能力,通過這個(gè)方法來向前跳過若干消息,也可以通過這個(gè)方法來向后回溯若干消息

再均衡

  • 再均衡是指分區(qū)所屬權(quán)從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者的行為巡李。它為消費(fèi)者組具備高可用性和伸縮性提供了保障抚笔。
  • 在再均衡期間,消費(fèi)組內(nèi)的消費(fèi)者是無法讀取消息的。即在再均衡發(fā)生期間的一段時(shí)間內(nèi)击儡,消費(fèi)者會(huì)變得不可用
  • 當(dāng)一個(gè)分區(qū)被重新分配給另一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的狀態(tài)也會(huì)丟失蝠引。比如消費(fèi)者消費(fèi)完某個(gè)分區(qū)的一部分信息但還沒來得及提交消費(fèi)位移就發(fā)生了再均衡操作
  • 調(diào)用subscribe()方法時(shí)可以提供再均衡監(jiān)聽器來設(shè)置 消費(fèi)者停止讀取消息開始讀取消費(fèi) 時(shí)的回調(diào)函數(shù)

消費(fèi)者攔截器

  • 在消費(fèi)者poll()方法返回之前 阳谍、 提交完消息位移之后關(guān)閉消費(fèi)者之前 調(diào)用攔截器中的方法
  • 多個(gè)消費(fèi)者攔截器也能組成攔截器鏈

多線程的實(shí)現(xiàn)

  • 一個(gè)消費(fèi)線程消費(fèi)一個(gè)或多個(gè)分區(qū)

    一個(gè)消費(fèi)線程消費(fèi)一個(gè)或多個(gè)分區(qū)
  • 多個(gè)消費(fèi)線程同時(shí)消費(fèi)同一個(gè)分區(qū) (會(huì)使得提交偏移量異常復(fù)雜)

  • 類似IO多路復(fù)用螃概,一個(gè)線程拉取消息矫夯,拉取消息后提交到線程池中。 (因?yàn)槔∠?huì)比處理業(yè)務(wù)邏輯快)

    • 會(huì)出現(xiàn)消息丟失的情況吊洼,線程A消費(fèi)0-99 训貌,線程B消費(fèi)100-199 后提交。 線程A未提交而掛掉后冒窍,那么0-99這一段數(shù)據(jù)就丟失了
    第三種方式
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末递沪,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子综液,更是在濱河造成了極大的恐慌款慨,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,657評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件谬莹,死亡現(xiàn)場離奇詭異檩奠,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)附帽,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評論 3 394
  • 文/潘曉璐 我一進(jìn)店門埠戳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人蕉扮,你說我怎么就攤上這事整胃。” “怎么了喳钟?”我有些...
    開封第一講書人閱讀 164,057評論 0 354
  • 文/不壞的土叔 我叫張陵爪模,是天一觀的道長。 經(jīng)常有香客問我荚藻,道長屋灌,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,509評論 1 293
  • 正文 為了忘掉前任应狱,我火速辦了婚禮共郭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己除嘹,他們只是感情好写半,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著尉咕,像睡著了一般叠蝇。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上年缎,一...
    開封第一講書人閱讀 51,443評論 1 302
  • 那天悔捶,我揣著相機(jī)與錄音,去河邊找鬼单芜。 笑死蜕该,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的洲鸠。 我是一名探鬼主播堂淡,決...
    沈念sama閱讀 40,251評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼扒腕!你這毒婦竟也來了绢淀?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,129評論 0 276
  • 序言:老撾萬榮一對情侶失蹤瘾腰,失蹤者是張志新(化名)和其女友劉穎更啄,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體居灯,經(jīng)...
    沈念sama閱讀 45,561評論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡祭务,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評論 3 335
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了怪嫌。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片义锥。...
    茶點(diǎn)故事閱讀 39,902評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖岩灭,靈堂內(nèi)的尸體忽然破棺而出拌倍,到底是詐尸還是另有隱情,我是刑警寧澤噪径,帶...
    沈念sama閱讀 35,621評論 5 345
  • 正文 年R本政府宣布柱恤,位于F島的核電站,受9級特大地震影響找爱,放射性物質(zhì)發(fā)生泄漏梗顺。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評論 3 328
  • 文/蒙蒙 一车摄、第九天 我趴在偏房一處隱蔽的房頂上張望寺谤。 院中可真熱鬧仑鸥,春花似錦、人聲如沸变屁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,838評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽粟关。三九已至疮胖,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間闷板,已是汗流浹背澎灸。 一陣腳步聲響...
    開封第一講書人閱讀 32,971評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留蛔垢,地道東北人击孩。 一個(gè)月前我還...
    沈念sama閱讀 48,025評論 2 370
  • 正文 我出身青樓迫悠,卻偏偏與公主長得像鹏漆,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子创泄,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評論 2 354

推薦閱讀更多精彩內(nèi)容