Kafka 基礎(chǔ)總結(jié)

Kafka是由美國(guó)的職業(yè)網(wǎng)站LinkedIn創(chuàng)造,作為一個(gè)社交企業(yè)漆魔,LinkedIn 有非常多的IT 系統(tǒng)而且日活量非常大硝全, 由此產(chǎn)生的數(shù)據(jù)被許多系統(tǒng)產(chǎn)生及使用。剛開(kāi)始抄腔,LinkedIn使用ActiveMQ作為數(shù)據(jù)通道去處理這些數(shù)據(jù)瓢湃,但是后來(lái)發(fā)現(xiàn)經(jīng)常會(huì)出現(xiàn)阻塞及服務(wù)不可用,然后LinkedIn就開(kāi)發(fā)了kafka赫蛇。由于Kafka解決的是生產(chǎn)環(huán)境中上下游系統(tǒng)的耦合問(wèn)題绵患,所以kafka不僅是一個(gè)消息中間件,還是一個(gè)數(shù)據(jù)引擎悟耘,或者分布式實(shí)時(shí)流處理平臺(tái)落蝙。

使用場(chǎng)景

1. 消息傳遞

消息傳遞就是發(fā)送消息,可以把kafka作為一個(gè)MQ(ActiveMQ作煌、RabbitMQ...)掘殴,實(shí)現(xiàn)異步、解耦粟誓、削峰奏寨,而且吞吐量比這些消息中間件更大。

2. 網(wǎng)站活動(dòng)追蹤

可以把用戶(hù)活動(dòng)信息鹰服,比如登錄病瞳、點(diǎn)擊、瀏覽等各種行為進(jìn)行監(jiān)控悲酷、追蹤套菜,然后分析并發(fā)送到下游系統(tǒng),給用戶(hù)提供更加精確的內(nèi)容推薦设易。

3. 日志聚合

Kafka可以實(shí)現(xiàn)日志聚合逗柴,這樣就不用把日志記錄到磁盤(pán)或者數(shù)據(jù)庫(kù),實(shí)現(xiàn)分布式的日志聚合顿肺。

4. 應(yīng)用指標(biāo)監(jiān)控

還可以用來(lái)做運(yùn)維系統(tǒng)監(jiān)控戏溺。比如監(jiān)控交易系統(tǒng)的訂單信息渣蜗,用戶(hù)的年齡分布,的確分布旷祸,購(gòu)買(mǎi)偏好等耕拷。或者監(jiān)控應(yīng)用服務(wù)器的內(nèi)存托享、CPU骚烧、磁盤(pán)、網(wǎng)絡(luò)等的使用情況闰围,并進(jìn)行緊急情況提醒赃绊。

5. 數(shù)據(jù)集成+流計(jì)算

kafka 內(nèi)置的kafka Streams 可以更加方便的進(jìn)行數(shù)據(jù)流的處理,把數(shù)據(jù)導(dǎo)入的離線(xiàn)數(shù)據(jù)庫(kù)比如Hadoop辫诅、Hbased等凭戴,實(shí)現(xiàn)數(shù)據(jù)分析。所以Kafka不僅僅是一個(gè)MQ中間件炕矮,還是一個(gè)流處理平臺(tái)。在kafka中者冤,消息就是日志肤视。日志就是消息的數(shù)據(jù)文件。

簡(jiǎn)單使用

首先需要搭建一套Kafka的環(huán)境涉枫,在這就不過(guò)多介紹了邢滑,網(wǎng)上一大堆教程。但是要說(shuō)明一點(diǎn)愿汰,kafka需要ZK的服務(wù)困后,Zookeeper做了什么?因?yàn)閆ookeeper的有序節(jié)點(diǎn)衬廷、臨時(shí)節(jié)點(diǎn)和監(jiān)聽(tīng)機(jī)制摇予,所以ZK幫kafka做了這些事情:配置中心(管理Broker、Topic吗跋、Partition侧戴、Consumer的信息,包括元數(shù)據(jù)的變動(dòng))跌宛、負(fù)載均衡酗宋、集群管理和選舉、分布式鎖等疆拘。

1. Jave Basic Demo蜕猫。

導(dǎo)入Kafka的Maven依賴(lài):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

創(chuàng)建一個(gè)Counsumer類(lèi),先初始化Consumer需要的配置哎迄,然后創(chuàng)建一個(gè)KafkaConsumer對(duì)象并訂閱MyTopic主題, 然后隔一段時(shí)間從Kafka中拉取消息回右。

public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // 綁定KafkaServer
        props.put("bootstrap.servers","192.168.1.91:9092");
        // 綁定一個(gè)Consumer Group
        props.put("group.id","yq-test-group");
        // 消費(fèi)Messgae之后自動(dòng)提交到Kafka
        props.put("enable.auto.commit","true");
        // 消費(fèi)Messgae之后自動(dòng)提交的時(shí)間間隔
        props.put("auto.commit.interval.ms","1000");
        // 新來(lái)的消費(fèi)者從哪里開(kāi)始消費(fèi)隆圆,值有:earlist / lastest / none
        props.put("auto.offset.reset","earliest");
        // key 和 value的序列化工具
   props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
   
            // 創(chuàng)建一個(gè)Consumer對(duì)象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList("MyTopic"));
        try {
            while (true){
                    // 從服務(wù)器隔一段時(shí)間去拉取最新消息
                ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String,String> record : records) {
                    System.out.printf("offset = %d, key = %s, value= %s, partition = %s\n",
                            record.offset(),record.key(),record.value(),record.partition());
                }

            }
        }finally {
                // 關(guān)閉消費(fèi)者
            consumer.close();
        }

    }
}

然后創(chuàng)建一個(gè)Producer類(lèi),初始化producer配置楣黍,并創(chuàng)建一個(gè)KafkaProducer對(duì)象匾灶,并發(fā)送MyTopic主題的消息.

public class Producer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // 綁定服務(wù)器
        props.put("bootstrap.servers","192.168.1.91:9092");
        // 重發(fā)次數(shù)
        props.put("ack","1");
        // 批量發(fā)送的大小
        props.put("batch.size",16384);
        // 發(fā)送
        props.put("group.id","yq-test-group");
        props.put("buffer.memory",33554432);        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");    props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i <= 100; i++) {
            producer.send(new ProducerRecord<String, String>("MyTopic",Integer.toString(i),Integer.toString(i)));
        }
        producer.close();

    }
}

先運(yùn)行Consumer類(lèi),Consumer開(kāi)始監(jiān)聽(tīng)服務(wù)器租漂,然后運(yùn)行Producer類(lèi)阶女,然后在Consumer的Console就可以看到類(lèi)似下面的log:

offset = 592, key = 88, value= 88, partition = 0
offset = 592, key = 88, value= 88, partition = 0
offset = 593, key = 89, value= 89, partition = 0
offset = 594, key = 90, value= 90, partition = 0
offset = 595, key = 91, value= 91, partition = 0
offset = 596, key = 92, value= 92, partition = 0
offset = 597, key = 93, value= 93, partition = 0
offset = 598, key = 94, value= 94, partition = 0
offset = 599, key = 95, value= 95, partition = 0
offset = 600, key = 96, value= 96, partition = 0
offset = 601, key = 97, value= 97, partition = 0
offset = 602, key = 98, value= 98, partition = 0
offset = 603, key = 99, value= 99, partition = 0

2. SpringBoot Demo

創(chuàng)建一個(gè)SpringBoot Project,同上先導(dǎo)入Maven依賴(lài)哩治,然后再application.properties中添加如下配置

server.port=7271
spring.kafka.bootstrap-servers=192.168.1.91:9092

spring.kafka.group.id=yq-test-group
spring.kafka.enable.auto.commit=true
spring.kafka.auto.commit.interval.ms=1000
spring.kafka.auto.offset.reset=earliest
spring.kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.ack=1
spring.kafka.retries=1
spring.kafka.batch.size=16384
spring.kafka.linger.ms=5
spring.kafka.buffer.memory=33554432
spring.kafka.max.block.ms=3000
spring.kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Consumer類(lèi):

@Component
public class Consumer {

    @KafkaListener(topics = "SpringBootTopics",groupId = "SpringBootTopic-group")
    public void onMessage(String msg){
        System.out.println("------ 收到消息 : " + msg);
    }

}

Producer類(lèi)

@RestController
public class Producer {

    @Autowired
    private KafkaTemplate template;

    @GetMapping("/send")
    public String sent(@RequestParam("msg") String msg) {
        template.send("SpringBootTopics", msg);
        return "Ok";
    }

}

再瀏覽器打開(kāi)如下鏈接:http://localhost:7271/send?msg=12313秃踩,在編輯器控制臺(tái)就可以看到如下log:

2021-03-02 21:24:44.378  INFO 62448 --- [nio-7271-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 66563e712b0b9f84
2021-03-02 21:24:44.378  INFO 62448 --- [nio-7271-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1614691484378
2021-03-02 21:24:44.386  INFO 62448 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: i815xO16TgeNlXHhQPfEzA
------ 收到消息 : 12313

上面就是Kafka的簡(jiǎn)單使用。

Kafka架構(gòu)

這個(gè)經(jīng)典的圖是kafka的data flow业筏,下面就來(lái)分析一下這個(gè)圖中的信息憔杨。

WX20210302-221200@2x.png
  1. 有2個(gè)生產(chǎn)者,Producer0生產(chǎn)主題為Topic0的消息 蒜胖,Producer1生產(chǎn)Topic0Topic1兩種主題的消息消别。
  2. 有3個(gè)Broker,可以認(rèn)為是有三臺(tái)服務(wù)器台谢。
  3. Topic0 有2個(gè)partition寻狂,3個(gè)replica(副本),Topic1有一個(gè)partition朋沮,也有3個(gè)replica蛇券。紅色字體代表是該分區(qū)的leader節(jié)點(diǎn),剩下兩個(gè)代表該主題的follower節(jié)點(diǎn)樊拓。生產(chǎn)者向leader節(jié)點(diǎn)中寫(xiě)入數(shù)據(jù)纠亚,follower節(jié)點(diǎn)向leader節(jié)點(diǎn)同步最新數(shù)據(jù)。
  4. 有2個(gè)Consumer Group筋夏,Consumer group0中的Consumer 0消費(fèi)Topic0蒂胞;Consumer group1中的Consumer 0消費(fèi)Topic0Topic1Consumer 1消費(fèi)Topic0叁丧,Consumer 2什么也不消費(fèi)啤誊。
1. Producer

Producer就是生產(chǎn)者,就是發(fā)送消息的一方拥娄。Kafka發(fā)消息不是一條一條發(fā)送的蚊锹,是批量發(fā)送的,這樣就能夠提高發(fā)送效率稚瘾。

由這個(gè)字段決定:

props.put("batch.size",16384);

2. Consumer

Consumer就是消費(fèi)者牡昆,就是消費(fèi)消息的一方。Kafka使用pull來(lái)獲取消息,這樣消費(fèi)者就可以控制消費(fèi)的速率丢烘,就不會(huì)出現(xiàn)消息太多消費(fèi)不了的情況柱宦。

3. Broker
WX20210302-224454@2x.png

Broker就是kafka的服務(wù)播瞳。生產(chǎn)者和消費(fèi)者都需要連接broker才能實(shí)現(xiàn)消息的轉(zhuǎn)發(fā)。

4. Message

客戶(hù)端之間傳輸?shù)臄?shù)據(jù)就叫做message忧侧,也叫做record。生產(chǎn)者中為ProducerRecord牌芋,消費(fèi)者中為ConsumerRecord。

消息在傳輸過(guò)程中需要進(jìn)行序列化躺屁,可以使用kafka內(nèi)置的幾種序列化工具,不滿(mǎn)足需求也可以創(chuàng)建自定義的序列化工具犀暑。

5. Topic

生產(chǎn)者和消費(fèi)者需要一個(gè)隊(duì)列才能關(guān)聯(lián)起來(lái)驯击,生產(chǎn)者要把消息發(fā)送到某個(gè)主題的隊(duì)列中,消費(fèi)者接受消息耐亏,也要接收這個(gè)隊(duì)列的消息苹熏。這個(gè)隊(duì)列在kafka中叫做Topic币喧。一個(gè)生產(chǎn)者可以生產(chǎn)多種不同topic的消息,一個(gè)消費(fèi)這個(gè)也可以消費(fèi)不同topic的消息干发。

如果要?jiǎng)h除一個(gè)topic,就要把a(bǔ)uto.create.topics.enable 設(shè)置為false枉长,默認(rèn)為true琼讽。

6. Partition

如果一個(gè)topic中消息過(guò)多钻蹬,就會(huì)產(chǎn)生下面的問(wèn)題:

  1. 并發(fā)或者負(fù)載的問(wèn)題,如果客戶(hù)端的操作都指向一個(gè)topic,在高并發(fā)情況下性能就會(huì)下降问欠。

  2. 還有不方便橫向擴(kuò)展,如果想把數(shù)據(jù)分散到不同的機(jī)器上做集群旗国,而不是升級(jí)硬件,這個(gè)topic就無(wú)法在物理上拆分到各個(gè)機(jī)器上能曾。

為了解決這些問(wèn)題kafka就提出了Partition的概念,其實(shí)就是把Topic分割為不同的分區(qū)(分片思想借浊,類(lèi)似數(shù)據(jù)庫(kù)中的分庫(kù)分表)蚂斤。

舉個(gè)例子, Topic有3個(gè)Partition,發(fā)送了9條消息曙蒸,第一個(gè)分區(qū)存儲(chǔ)1纽窟、4、7森枪,第二個(gè)分區(qū)存儲(chǔ)2审孽、5、8式散,第三個(gè)分區(qū)存儲(chǔ)3打颤、6编饺、9。這樣就實(shí)現(xiàn)了負(fù)載均衡那伐。

在服務(wù)器中的/tmp/kafka-logs/下面就可以看到每個(gè)Partition的數(shù)據(jù),而且kafka的數(shù)據(jù)文件是順序追加寫(xiě)入的畅形,這也是為啥kafka吞吐量很大的一個(gè)原因诉探。

7. Replica機(jī)制

如果Partition的數(shù)據(jù)只有一份,如果這個(gè)broker 掛掉了竖席,就會(huì)出現(xiàn)服務(wù)不可用毕荐,甚至數(shù)據(jù)丟失不可恢復(fù)艳馒。這個(gè)副本機(jī)制就是為了提高kafka的容錯(cuò)率而設(shè)計(jì)的。

8. Segment

kafka的消息是放在.log文件中第美,如果一個(gè)Partition只有一個(gè)log文件陆爽,如果寫(xiě)入很多數(shù)據(jù)后慌闭,檢索效率也會(huì)變得很差,因此這時(shí)候kafka就會(huì)重新拆分Partition文件兔港,切分處理后的每一個(gè).log文件就是一個(gè)segment仔拟。segment的大小可以由log.segment.bytes控制利花。

WX20210303-222409@2x.png

比如這個(gè)圖中就有2個(gè)segment, 每一個(gè)segment都有2個(gè)索引文件和一個(gè)數(shù)據(jù)文件载佳。

xxx.index:記錄的Consumer當(dāng)前讀取消息的偏移量

xxx.log: 數(shù)據(jù)文件

xxx.timeindex : 記錄的是數(shù)據(jù)創(chuàng)建或者寫(xiě)入的時(shí)間

9. Consumer group

Consumer group是為了確定這個(gè)組內(nèi)的消費(fèi)者是不是消費(fèi)同一個(gè)topci然后引入的蔫慧。消費(fèi)同一topic的消費(fèi)者不一定在同一個(gè)組。只有g(shù)roup id相同才是同一個(gè)消費(fèi)者組睡扬。

注意:同一group的消費(fèi)者不能同時(shí)消費(fèi)相同的Partition,這樣會(huì)造成重復(fù)消費(fèi)的問(wèn)題屎开,這也是為什么上面那個(gè)data flow圖中 Consumer 3 沒(méi)有消費(fèi)任何topic的原因马靠,因?yàn)橹挥?個(gè)Partition甩鳄,但是有3個(gè)Consumer,而最多能讓2個(gè)消費(fèi)者去消費(fèi)妙啃。

10. Consumer offset

因?yàn)閗afka是順序?qū)懭氲模绻鸆onsumer掛掉了茁瘦,后來(lái)起來(lái)后我么如何知道它上一次消費(fèi)到哪里了呢? kafka對(duì)消息者進(jìn)行了編號(hào)储笑,而且存儲(chǔ)了每個(gè)消費(fèi)者消費(fèi)每個(gè)partition的offset, 這個(gè)offset就存儲(chǔ)在/tmp/kafka-logs/下的類(lèi)似這樣__consumer_offsets-46的folder中突倍。總共有50個(gè)焊虏,會(huì)根據(jù)cousumer id的hash code 與50取模秕磷,然后放到相應(yīng)的Consumer 索引文件中,這樣即使服務(wù)掛掉疏尿,只要這個(gè)文件不丟失易桃,依然可以確定上次消費(fèi)到那個(gè)offset晤郑。

進(jìn)階功能

1. 消息冪等性

如果生產(chǎn)者無(wú)法確定消費(fèi)者有沒(méi)有收到或者消費(fèi)消息贸宏,如果要重發(fā)消息吭练,這就可能造成消息重復(fù)消費(fèi)的情況褐鸥。消息重復(fù)消費(fèi)要在消費(fèi)者端去處理,kafka干脆在broker端實(shí)現(xiàn)了消息的重復(fù)問(wèn)題處理浑侥,這樣就大大減輕了消費(fèi)者的工作寓落。
在Producer端設(shè)置enable.idempotence 為true后荞下,這個(gè)producer就是冪等性的producer,kafka會(huì)自動(dòng)去重仰税。主要是通過(guò)這兩個(gè)值去確定唯一消息抽诉。

1. PID(Producer ID, 冪等性的每個(gè)客戶(hù)端都有一個(gè)唯一的ID
2. Sequence Number, 冪等性的每個(gè)生產(chǎn)者發(fā)送的消息都有一個(gè)唯一的Seq number

Broker 就是根據(jù)上面這兩個(gè)值來(lái)確保數(shù)據(jù)是否重復(fù)迹淌。但是這也只能保證在單partition上冪等性或者單一回話(huà)上的冪等性,即重啟producer后的冪等性就不能保證了耙饰。如果想要保證全局?jǐn)?shù)據(jù)的冪等性纹份,就要用到kafka的事物機(jī)制了蔓涧。

2. 生產(chǎn)者事務(wù)處理

通過(guò)Kafka的事物機(jī)制就可以保證全局消息的冪等性。在下面這幾種情況就需要使用事務(wù)來(lái)處理了。

  1. 只有1個(gè)broker, 1個(gè)topic 1個(gè)副本昨寞,發(fā)送一組消息,要么全部失敗歼狼,要么全部成功。
  2. 如果發(fā)送到多個(gè)partition或者多個(gè)topic园蝠,它們可能分布在不同的服務(wù)器上,需要它們要么全部失敗鳞贷,要么全部成功搀愧。
  3. 消費(fèi)者后生產(chǎn)者在同一塊代碼中(Consume-Process-Produce),即先消費(fèi)然后再作為生產(chǎn)者發(fā)送給下游系統(tǒng)搓幌,就需要保證接受消息和發(fā)送消息全部成功迅箩。

使用方式:

// 開(kāi)啟事務(wù)一定要設(shè)置transaction.id
props.put("transactional.id", UUID.randomUUID().toString());
Producer<String,String> producer = new KafkaProducer<String,String>(props);
// 初始化事務(wù)
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<String,String>("transaction-test","1","1"));
    producer.send(new ProducerRecord<String,String>("transaction-test","2","2"));
    // Integer i = 1/0;
    producer.send(new ProducerRecord<String,String>("transaction-test","3","3"));
    // 提交事務(wù)
    producer.commitTransaction();
} catch (KafkaException e) {
    // 中止事務(wù)
    producer.abortTransaction();
}

因?yàn)橄⒖赡芸绶?wù)器沙热,所以這里的事務(wù)就是分布式事務(wù)篙贸。采用的是2PC(2階段提交)。大家都可以commit才commit敷鸦,否則就abort寝贡。kafka會(huì)場(chǎng)景一個(gè)Coordinator圃泡,并把事務(wù)的日志用topic_transaction_state記錄下來(lái),就跟消費(fèi)的offset一樣价说。這樣生產(chǎn)者掛了之后下次重啟后還能根據(jù)這個(gè)offset繼續(xù)上次的事務(wù)。

總結(jié)

kafka作為一個(gè)流處理平臺(tái)扮叨,有許多優(yōu)點(diǎn)彻磁。

  • 高吞吐狸捅、低延遲:kakfa的優(yōu)點(diǎn)就是收發(fā)消息特別快,每秒可以處理幾十萬(wàn)條消息恍箭。
  • 高伸縮性:通過(guò)分區(qū)partition的機(jī)制扯夭,不同的分區(qū)可以處在不同的broker中鞍匾。然后通過(guò)ZK去管理橡淑,實(shí)現(xiàn)負(fù)載。
  • 持久性置森、可靠性:通過(guò)把消息持久化在磁盤(pán)中符糊,并在不同broker中的replica機(jī)制男娄,保證數(shù)據(jù)可靠性。
  • 容錯(cuò)性:集群中的某個(gè)節(jié)點(diǎn)掛掉建瘫,整體服務(wù)也可用尸折。
  • 高并發(fā):同時(shí)支持?jǐn)?shù)千個(gè)客戶(hù)端同時(shí)讀寫(xiě)实夹。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市贮配,隨后出現(xiàn)的幾起案子泪勒,更是在濱河造成了極大的恐慌宴猾,老刑警劉巖仇哆,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件讹剔,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡陌兑,警方通過(guò)查閱死者的電腦和手機(jī)由捎,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)狞玛,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)心肪,“玉大人,你說(shuō)我怎么就攤上這事贰镣∩拍” “怎么了蹬音?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵著淆,是天一觀(guān)的道長(zhǎng)拴疤。 經(jīng)常有香客問(wèn)我呐矾,道長(zhǎng)懦砂,這世上最難降的妖魔是什么荞膘? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任羽资,我火速辦了婚禮,結(jié)果婚禮上潮改,老公的妹妹穿的比我還像新娘进陡。我一直安慰自己微服,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著丛肮,像睡著了一般宝与。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上咆瘟,一...
    開(kāi)封第一講書(shū)人閱讀 51,155評(píng)論 1 299
  • 那天袒餐,我揣著相機(jī)與錄音灸眼,去河邊找鬼。 笑死霉囚,一個(gè)胖子當(dāng)著我的面吹牛匕积,可吹牛的內(nèi)容都是我干的闸天。 我是一名探鬼主播斜做,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼瓤逼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼霸旗!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起撵枢,我...
    開(kāi)封第一講書(shū)人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤锄禽,失蹤者是張志新(化名)和其女友劉穎靴姿,沒(méi)想到半個(gè)月后佛吓,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年钝凶,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片掂名。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡饺蔑,死狀恐怖猾警,靈堂內(nèi)的尸體忽然破棺而出隆敢,到底是詐尸還是另有隱情拂蝎,我是刑警寧澤,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布玄货,位于F島的核電站松捉,受9級(jí)特大地震影響馆里,放射性物質(zhì)發(fā)生泄漏隘世。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一鸠踪、第九天 我趴在偏房一處隱蔽的房頂上張望以舒。 院中可真熱鬧,春花似錦慢哈、人聲如沸蔓钟。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)滥沫。三九已至,卻和暖如春键俱,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背编振。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工缀辩, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓臀玄,卻偏偏與公主長(zhǎng)得像瓢阴,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子健无,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容