kafka流處理平臺

Kafka

Apache Kafka起源于LinkedIn苟蹈,后來于2011年成為開源Apache項目,然后于2012年成為First-class Apache項目胳蛮。Kafka是用Scala和Java編寫的销凑。 Apache Kafka是基于發(fā)布訂閱的容錯消息系統。 它是快速仅炊,可擴展和設計分布斗幼。

kafka介紹

主要功能

根據官網的介紹,ApacheKafka?是一個分布式流媒體平臺抚垄,它主要有3種功能:

  1. 發(fā)布和訂閱消息流蜕窿,這個功能類似于消息隊列谋逻,這也是kafka歸類為消息隊列框架的原因

  2. 以容錯的方式記錄消息流,kafka以文件的方式來存儲消息流

  3. 可以再消息發(fā)布的時候進行處理

使用場景

  • 在系統或應用程序之間構建可靠的用于傳輸實時數據的管道桐经,消息隊列功能
  • 構建實時的流數據處理程序來變換或處理數據流毁兆,數據處理功能

基本概念

Producer: 消息和數據的生產者,向kafka的一個Topic發(fā)布消息的進程/代碼/服務

Consumer:消息和數據的消費者阴挣,訂閱數據(topic)并且處理其發(fā)布的消息的進程/代碼/服務

Consumer Group:邏輯概念气堕,對于同一個topic,會廣播給不同的group畔咧,一個group中茎芭,只有一個consumer可以消費該消息

Broker:物理概念,kafka集群中的每個kafka節(jié)點

Topic:邏輯概念誓沸,kafka消息的類別梅桩,對數據進行區(qū)分、隔離

Partition:物理概念拜隧,kafka下數據存儲的基本單元宿百。一個topic數據,會被分散存儲到多個Partition虹蓄,每一個Partition是有序的

Replication:同一個Partition可能會多個Replica犀呼,多個Replica之間數據是一樣的

Replication Leader:一個Partition的多個Replica上,需要一個Leader負責該Partition上與Producer和Consumer交互

ReplicaManager:負責管理當前broker所有分區(qū)和副本的信息薇组,處理KafkaController發(fā)起的一些請求,副本狀態(tài)的切換坐儿、添加/讀取消息等

基本概念延伸

Partition

  • 每一個Topic被切分為多個Partitions
  • 消費者數目少于或等于Partition的數目
  • Broker Group中的每一個Broker保存Topic的一個或多個Partitions
  • Consumer Group中的僅有一個Consumer讀取Topic的一個或多個Partitions律胀,并且是唯一的Consumer

Replication

  • 當集群中有Broker掛掉的情況,系統可以主動地使Replicas提供服務
  • 系統默認設置每一個Topic的Replication系數為1貌矿,可以在創(chuàng)建Topic時單獨設置

Replication特點

  • Replication的基本單位是Topic的Partition
  • 所有的讀和寫都從Leader進炭菌,Followers只是作為備份
  • Follower必須能夠及時復制Leader的數據
  • 增加容錯性和可擴展性

kafka基本結構

消息傳輸流程

消息傳輸流程
  • Producers API
  • Consumers API
  • Streams API
  • Connectors API
    下圖更好的展示了kafka消息隊列模式的運作


    kafka消息隊列模式

kafka的消息結構

消息結構

kafka特點

分布式

  • 多分區(qū)
  • 多副本
  • 多訂閱者
  • 基于ZooKeeper調度

高性能

  • 高吞吐量
  • 低延遲
  • 高并發(fā)
  • 時間復雜度為O(1)

持久性和擴展性

  • 數據可持久化
  • 容錯性
  • 支持水平在線擴展
  • 消息自動平衡

應用場景

  • 消息隊列
  • 行為跟蹤
  • 元信息監(jiān)控
  • 日志收集
  • 流處理
  • 事件源
  • 持久性日志(commit log)

kafka簡單案例

下載與安裝

ZooKeeper下載

http://zookeeper.apache.org/releases.html#download

Kafka下載

http://kafka.apache.org/downloads

安裝

解壓、配置環(huán)境變量
PS:如果是Mac的話逛漫,有一個便捷安裝方法黑低,brew install kafka

啟動

運行Zookeeper

運行cmd命令窗口,輸入zkServer回車酌毡,出現下圖的就表示zookeeper啟動成功克握,也表明安裝成功了。


Zookeeper

啟動kafka

在新的cmd命令行用cd命令切換到kafka根目錄..\kafka_2.11-2.2.0枷踏,輸入命令

.\bin\windows\kafka-server-start.bat .\config\server.properties
kafka

出現started (kafka.server.KafkaServer)字樣表示啟動成功

創(chuàng)建一個Topic

運行新的cmd命令行菩暗,進入..\kafka_2.11-2.2.0\bin\windows,創(chuàng)建主題:test旭蠕,輸入命令

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

創(chuàng)建一個Producer

運行新的cmd命令行停团,進入..\kafka_2.11-2.2.0\bin\windows旷坦,輸入命令

kafka-console-producer.bat --broker-list localhost:9092 --topic test

創(chuàng)建一個Consumer

運行新的cmd命令行,進入..\kafka_2.11-2.2.0\bin\windows佑稠,輸入命令

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

測試

在Producer窗口下輸入信息進行測試 秒梅,輸入的消息立馬就會出現在Consumer中,表明kafka已經安裝測試成功


success

kafka代碼案例

項目結構

結構

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.cxy</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>kafka_learning</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

application.properties

kafka.consumer.zookeeper.connect=127.0.0.1:2181
kafka.consumer.servers=127.0.0.1:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=kafkaTestTopic
kafka.consumer.group.id=kafkaTest
kafka.consumer.concurrency=10

kafka.producer.servers=127.0.0.1:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

kafka.topic.default=kafkaTestTopic

log4j.properties

log4j.rootLogger=DEBUG, stdout
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

ErrorCode.java

package com.cxy.kafka.common;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
public class ErrorCode {
    public final static int SUCCESS = 200;
    public final static int EXCEPTION = 500;
}

MessageEntity.java

package com.cxy.kafka.common;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Getter
@Setter
@EqualsAndHashCode
public class MessageEntity {
    private String title;
    private String body;

    @Override
    public String toString(){
        return "MessageEntity{" +
                "title='" + title + '\'' +
                ",body='" + body + '\'' +
                '}';
    }
}

Response.java

package com.cxy.kafka.common;

import lombok.Getter;
import lombok.Setter;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Getter
@Setter
public class Response {
    private int code;
    private String message;

    public Response(int code, String message){
        this.code = code;
        this.message = message;
    }
}

KafkaProducerConfig.java

package com.cxy.kafka.config;

import com.cxy.kafka.common.MessageEntity;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;

    @Value("${kafka.producer.retries}")
    private int retries;

    @Value("${kafka.producer.batch.size}")
    private int batchSize;

    @Value("${kafka.producer.linger}")
    private int linger;

    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String,Object> producerConfigs() {
        Map<String,Object> props = new HashMap<String, Object>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, MessageEntity> producerFactory(){
        return new DefaultKafkaProducerFactory<String, MessageEntity>(
                producerConfigs(),
                new StringSerializer(),
                new JsonSerializer<MessageEntity>());
    }

    @Bean
    public KafkaTemplate<String, MessageEntity> kafkaTemplate(){
        return new KafkaTemplate<String, MessageEntity>(producerFactory());
    }
}

KafkaConsumerConfig.java

package com.cxy.kafka.config;

import com.cxy.kafka.common.MessageEntity;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;

    //服務是否自動提交
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    //session超時時間
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    //提交的間隔
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;

    //生產者groupId
    @Value("${kafka.consumer.group.id}")
    private String groupId;

    //自動將offset重置到某位置
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    //并發(fā)數目
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    private Map<String,Object> consumerConfigs() {
        Map<String,Object> props = new HashMap<String, Object>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    private ConsumerFactory<String, MessageEntity> consumerFactory() {
        return new DefaultKafkaConsumerFactory<String, MessageEntity>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<MessageEntity>(MessageEntity.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageEntity>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, MessageEntity> factory = new ConcurrentKafkaListenerContainerFactory<String, MessageEntity>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }
}

ProducerCallback.java

package com.cxy.kafka.producer;

import com.cxy.kafka.common.MessageEntity;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.Nullable;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Slf4j
public class ProducerCallback implements ListenableFutureCallback<SendResult<String, MessageEntity>> {
    public static final Logger logger = Logger.getLogger(ProducerCallback.class);

    private final long startTime;

    private final String key;

    private final MessageEntity messageEntity;

    private final Gson gson = new Gson();

    public ProducerCallback(long startTime, String key, MessageEntity messageEntity){
        this.startTime = startTime;
        this.key = key;
        this.messageEntity = messageEntity;
    }

    @Override
    public void onFailure(Throwable throwable) {
        throwable.printStackTrace();
    }

    @Override
    public void onSuccess(@Nullable SendResult<String, MessageEntity> stringMessageEntitySendResult) {
        if(stringMessageEntitySendResult == null){
            return;
        }
        long elapsedTime = System.currentTimeMillis() - startTime;

        RecordMetadata metadata = stringMessageEntitySendResult.getRecordMetadata();
        if(metadata!=null){
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("message(")
                    .append("key= ").append(key).append(",")
                    .append("message= ").append(gson.toJson(messageEntity)).append(")")
                    .append("sent to partition(").append(metadata.partition()).append(")")
                    .append("with offset(").append(metadata.offset()).append(")")
                    .append("in ").append(elapsedTime).append(" ms");
            logger.info(stringBuilder.toString());
        }
    }
}

SimpleProducer.java

package com.cxy.kafka.producer;

import com.cxy.kafka.common.MessageEntity;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Component
public class SimpleProducer {

    @Autowired
    @Qualifier("kafkaTemplate")
    private KafkaTemplate<String, MessageEntity> kafkaTemplate;

    public void send(String topic, MessageEntity messageEntity){
        kafkaTemplate.send(topic,messageEntity);
    }

    public void send(String topic, String key, MessageEntity messageEntity){
        ProducerRecord<String, MessageEntity> record = new ProducerRecord<String, MessageEntity>(topic,key,messageEntity);
        long startTime = System.currentTimeMillis();
        ListenableFuture<SendResult<String, MessageEntity>> future = kafkaTemplate.send(record);
        future.addCallback(new ProducerCallback(startTime,key,messageEntity));
    }
}

SimpleConsumer.java

package com.cxy.kafka.consumer;

import com.cxy.kafka.common.MessageEntity;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Slf4j
@Component
public class SimpleConsumer {
    public static final Logger logger = Logger.getLogger(SimpleConsumer.class);

    private final Gson gson = new Gson();

    @KafkaListener(topics = "${kafka.topic.default}",containerFactory = "kafkaListenerContainerFactory")
    public void receive(MessageEntity messageEntity){
        logger.info(gson.toJson(messageEntity));
    }
}

ProducerController.java

package com.cxy.kafka.controller;

import com.cxy.kafka.common.ErrorCode;
import com.cxy.kafka.common.MessageEntity;
import com.cxy.kafka.common.Response;
import com.cxy.kafka.producer.SimpleProducer;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Auther: cxy
 * @Date: 2019/4/9
 * @Description:
 */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class ProducerController {
    public static final Logger logger = Logger.getLogger(ProducerController.class);

    @Autowired
    private SimpleProducer simpleProducer;

    @Value("${kafka.topic.default}")
    private String topic;

    private Gson gson = new Gson();

    @RequestMapping(value="/hello",method = RequestMethod.GET, produces = {"application/json"})
    public Response sendKafka(){
        return new Response(ErrorCode.SUCCESS,"SUCCESS");
    }

    @RequestMapping(value="/send",method = RequestMethod.POST, produces = {"application/json"})
    public Response sendKafka(@RequestBody MessageEntity messageEntity){
        try{
            logger.info("kafka消息:{}"+gson.toJson(messageEntity));
            simpleProducer.send(topic,"key",messageEntity);
            logger.info("發(fā)送kafka成功舌胶!");
            return new Response(ErrorCode.SUCCESS,"發(fā)送kafka成功番电!");
        }catch(Exception e){
            logger.error("發(fā)送kafka失敗辆琅!",e);
            return new Response(ErrorCode.EXCEPTION,"發(fā)送kafka失斒臁!");
        }
    }
}

啟動
啟動KafkaApplication.java婉烟。(必須打開zookeeper和kafka)
測試
我這邊用的是Restlet Client插件娩井,你們也可以根據自己喜好選擇測試工具

  • 先測試下http://localhost:8080/kafka/hello
    hello
  • 測試http://localhost:8080/kafka/send
    send

    發(fā)送kafka消息成功,去看下控制臺似袁,成功接收到kafka消息
    success

源代碼

源代碼

Kafka高級特性——消息事物

為什么要支持事物

  • 滿足“讀取-處理-寫入”模式
  • 流處理需求不斷增強
  • 不準確的數據處理的容忍度

數據傳輸的事物定義

  • 最多一次:消息不會被重復發(fā)送洞辣,最多被傳輸一次,但也有可能一次都不傳輸
  • 最少一次:消息不會被漏發(fā)送昙衅,最少被傳輸一次扬霜,但也有可能被重復傳輸
  • 精確的一次(Exactly once):不會被漏發(fā)送也不會被重復發(fā)送,每個消息都被傳輸一次而且僅僅被傳輸一次而涉,這是大家所期望的

事物保證

  • 內部重試問題:Procedure冪等處理
  • 多分區(qū)原子寫入


    多分區(qū)原子寫入

事物保證-避免僵尸實例

  • 每個事物Producer分配一個transactional.id著瓶,在進程重新啟動時能識別相同的Producer實例
  • Kafka增加了一個與transactional.id相關的epoch,存儲每個transactional.id元數據
  • 一旦epoch被觸發(fā)啼县,任何具有相同transactional.id和更舊的epoch的Producer被視為僵尸材原,Kafka會拒絕來自這些Procedure的后續(xù)事務性寫入

Kafka高級特性——零拷貝

零拷貝簡介

  • 網絡傳輸持久性日志塊
  • Java Nio channel.transforTo()方法
  • Linux sendfile系統調用

文件傳輸到網絡的公共數據路徑

  • 操作系統將數據從磁盤讀入到內核空間的頁緩存
  • 應用程序將數據從內核空間讀入到用戶空間緩存中
  • 應用程序將數據寫回到內核空間到socket緩存中
  • 操作系統將數據從socket緩存中復制到網卡緩沖區(qū),以便將數據經網絡發(fā)出

零拷貝路徑

  • 操作系統將數據從磁盤讀入到內核空間的頁緩存
  • 將數據的位置和長度的信息的描述符增加至內核空間(socket緩沖區(qū))
  • 操作系統將數據從內核復制到網卡緩沖區(qū)季眷,以便將數據經網絡發(fā)出
    零拷貝是指內核空間和用戶空間的交互的拷貝次數為零


    文件傳輸到網絡的公共數據路徑演變
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
  • 序言:七十年代末余蟹,一起剝皮案震驚了整個濱河市,隨后出現的幾起案子子刮,更是在濱河造成了極大的恐慌威酒,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件挺峡,死亡現場離奇詭異葵孤,居然都是意外死亡,警方通過查閱死者的電腦和手機沙郭,發(fā)現死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門佛呻,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人病线,你說我怎么就攤上這事吓著±鸬眨” “怎么了?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵绑莺,是天一觀的道長暖眼。 經常有香客問我,道長纺裁,這世上最難降的妖魔是什么诫肠? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮欺缘,結果婚禮上栋豫,老公的妹妹穿的比我還像新娘。我一直安慰自己谚殊,他們只是感情好丧鸯,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著嫩絮,像睡著了一般丛肢。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上剿干,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天蜂怎,我揣著相機與錄音,去河邊找鬼置尔。 笑死杠步,一個胖子當著我的面吹牛,可吹牛的內容都是我干的撰洗。 我是一名探鬼主播篮愉,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼差导!你這毒婦竟也來了?” 一聲冷哼從身側響起猪勇,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤设褐,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后泣刹,有當地人在樹林里發(fā)現了一具尸體助析,經...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年椅您,在試婚紗的時候發(fā)現自己被綠了外冀。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡掀泳,死狀恐怖雪隧,靈堂內的尸體忽然破棺而出西轩,到底是詐尸還是另有隱情,我是刑警寧澤脑沿,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布藕畔,位于F島的核電站,受9級特大地震影響庄拇,放射性物質發(fā)生泄漏注服。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一措近、第九天 我趴在偏房一處隱蔽的房頂上張望溶弟。 院中可真熱鬧,春花似錦瞭郑、人聲如沸辜御。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽我抠。三九已至,卻和暖如春袜茧,著一層夾襖步出監(jiān)牢的瞬間菜拓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工笛厦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留纳鼎,地道東北人。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓裳凸,卻偏偏與公主長得像贱鄙,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子姨谷,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內容