Flink Source Code Analysis 1: RichSinkFunction

Sink to MySQL demo

Let's start with a custom sink demo that writes data from nc (netcat) into MySQL.


import myflink.learn.model.Student;
import myflink.learn.sink.SinkToMySQL;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author wtx
 * @Date 2019/1/23
 */
public class Flink2MysqlDemo {
    public static void main(String[] args) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        // set up the data source: read lines from a local netcat (nc -lk 9000)
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        // assign an incrementing id to each line; fine here because parallelism is 1
        DataStream<Student> studentDataStream = text.map((str) -> {
            Student student = new Student();
            student.setName(str);
            student.setId(atomicInteger.addAndGet(1));
            return student;
        });

        studentDataStream.addSink(new SinkToMySQL());
        env.execute();
    }
}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import myflink.learn.model.Student;

@Slf4j
public class SinkToMySQL extends RichSinkFunction<Student> {
    private PreparedStatement ps;
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into student(id, name) values(?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement first, then the connection, to release resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        // bind the fields and execute the insert
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "");
        } catch (Exception e) {
            log.error("exception e:", e);
        }
        return con;
    }
}
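
For completeness, Student is just a plain POJO with the fields used above (a minimal sketch; the article does not show the original myflink.learn.model.Student, and it assumes a table like create table student (id int primary key, name varchar(50))):

public class Student {

    private int id;
    private String name;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}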

The class hierarchy of RichSinkFunction

As you can see, the custom sink extends RichSinkFunction. Let's look at the class hierarchy of RichSinkFunction:

(figure: RichSinkFunction class hierarchy)


/**
 * The base interface for all user-defined functions.
 *
 * <p>This interface is empty in order to allow extending interfaces to
 * be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.</p>
 */
@Public
public interface Function extends java.io.Serializable {
}

public interface SinkFunction<IN> extends Function, Serializable {

    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    @Public
    interface Context<T> {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

The SinkFunction interface above effectively has just one method, invoke(), which writes a value of type IN to the sink. Context carries the context of the write: the current processing time, the current watermark, and the record's timestamp.
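
As a quick illustration, here is a minimal, hypothetical inline sink for the nc stream above that overrides the Context variant of invoke() and just prints each record with its context:

text.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) {
        // Context exposes processing time, the current watermark, and the record timestamp
        System.out.println("processingTime=" + context.currentProcessingTime()
                + ", watermark=" + context.currentWatermark()
                + ", value=" + value);
    }
});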

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {

    private transient RuntimeContext runtimeContext;

    // getter/setter for runtimeContext omitted here

    @Override
    public void open(Configuration parameters) throws Exception {}

    @Override
    public void close() throws Exception {}
}

AbstractRichFunction provides only empty default implementations of the lifecycle methods open() and close(); overriding them is left to us, as SinkToMySQL does above for MySQL. A sink class for Redis can be written the same way, and checking flink-connector-redis shows that a RedisSink class already exists.
Let's look at its basic usage first. All we need is to swap new SinkToMySQL() for a new RedisSink; since RedisExampleMapper below consumes Tuple2<String, Integer>, the Student stream is first mapped to tuples:

studentDataStream.addSink(new SinkToMySQL());

               ==>

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
studentDataStream
        .map(s -> Tuple2.of(s.getName(), s.getId()))
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .addSink(new RedisSink<>(conf, new RedisExampleMapper()));


public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {

    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "flink");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString();
    }
}

Back to RedisSink, which also extends RichSinkFunction:

public class RedisSink<IN> extends RichSinkFunction<IN> {

    public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) {
        Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
        Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null");
        Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null");

        this.flinkJedisConfigBase = flinkJedisConfigBase;

        this.redisSinkMapper = redisSinkMapper;
        RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription();
        this.redisCommand = redisCommandDescription.getCommand();
        this.additionalKey = redisCommandDescription.getAdditionalKey();
    }
}

A RedisSink is thus constructed from the conf and the redisSinkMapper; it then overrides invoke():

@Override
    public void invoke(IN input) throws Exception {
        String key = redisSinkMapper.getKeyFromData(input);
        String value = redisSinkMapper.getValueFromData(input);

        switch (redisCommand) {
            case RPUSH:
                this.redisCommandsContainer.rpush(key, value);
                break;
            case LPUSH:
                this.redisCommandsContainer.lpush(key, value);
                break;
            case SADD:
                this.redisCommandsContainer.sadd(key, value);
                break;
            case SET:
                this.redisCommandsContainer.set(key, value);
                break;
            case PFADD:
                this.redisCommandsContainer.pfadd(key, value);
                break;
            case PUBLISH:
                this.redisCommandsContainer.publish(key, value);
                break;
            case ZADD:
                this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                break;
            case HSET:
                this.redisCommandsContainer.hset(this.additionalKey, key, value);
                break;
            default:
                throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
        }
    }

The switch above tells us that the Flink → Redis connector currently supports only these eight basic Redis commands; to issue any other command, you still have to implement it yourself, e.g. with a hand-rolled sink as sketched below.
RedisSink also overrides the open() and close() methods to establish and release the Redis connection.
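
A minimal sketch of such a hand-rolled sink, assuming a plain jedis dependency; the class name and the INCRBY use case are hypothetical:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

public class RedisIncrBySink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        jedis = new Jedis("127.0.0.1", 6379);
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) {
        // f0 = counter key, f1 = delta; INCRBY is not one of the eight supported commands
        jedis.incrBy(value.f0, value.f1);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}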

The RedisSink open() method

Let's look at open():

@Override
public void open(Configuration parameters) throws Exception {
    this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
}

Here a redisCommandsContainer is built via RedisCommandsContainerBuilder. RedisCommandsContainer is an interface ("The container for all available Redis commands."), and its methods correspond exactly to the switch (redisCommand) above:

void hset(String key, String hashField, String value);
void rpush(String listName, String value);
void lpush(String listName, String value);
void sadd(String setName, String value);
void publish(String channelName, String message);
void set(String key, String value);
void pfadd(String key, String element);
void zadd(String key, String score, String element);
void close() throws IOException;

Next, RedisCommandsContainerBuilder: it builds the redisCommandsContainer from a flinkJedisConfigBase. FlinkJedisConfigBase defines four properties commonly tuned for Redis connections: connectionTimeout, maxTotal, maxIdle, and minIdle.

public abstract class FlinkJedisConfigBase implements Serializable {
    private static final long serialVersionUID = 1L;

    protected final int maxTotal;
    protected final int maxIdle;
    protected final int minIdle;
    protected final int connectionTimeout;

    protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){
        Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative");
        Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative");
        Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative");
        Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative");
        this.connectionTimeout = connectionTimeout;
        this.maxTotal = maxTotal;
        this.maxIdle = maxIdle;
        this.minIdle = minIdle;
    }
}
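
For reference, these knobs are set through the config builders. A sketch with illustrative values (these Builder setters exist in flink-connector-redis, but verify the names against your connector version):

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
        .setHost("127.0.0.1")
        .setPort(6379)
        .setTimeout(2000)   // connectionTimeout, in milliseconds
        .setMaxTotal(8)
        .setMaxIdle(8)
        .setMinIdle(0)
        .build();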

Back in RedisCommandsContainerBuilder, we can see that FlinkJedisConfigBase has three implementations, corresponding to:

FlinkJedisPoolConfig: a single Redis instance accessed through a Jedis connection pool
FlinkJedisClusterConfig: Redis cluster mode
FlinkJedisSentinelConfig: Redis sentinel mode

public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){
        if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){
            FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase;
            return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig);
        } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) {
            FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase;
            return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig);
        } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
            FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase;
            return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

Looking at the Jedis pool flavor: it ultimately produces a RedisContainer constructed from a JedisPool.

public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
        Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());

        JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(),
            jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(),
            jedisPoolConfig.getDatabase());
        return new RedisContainer(jedisPool);
    }

This RedisContainer implements the RedisCommandsContainer interface above and provides the real implementations of hset and the other seven methods through the Jedis pool.
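
Each command method follows a borrow-use-return pattern against the pool. A simplified sketch of hset (error handling trimmed; the container's actual pool helpers are paraphrased):

@Override
public void hset(String key, String hashField, String value) {
    Jedis jedis = null;
    try {
        jedis = jedisPool.getResource();    // borrow a connection from the pool
        jedis.hset(key, hashField, value);  // run the actual Redis command
    } finally {
        if (jedis != null) {
            jedis.close();                  // hand the connection back to the pool
        }
    }
}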

Similarly, the Redis cluster flavor ultimately produces a RedisClusterContainer:

public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) {
        Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null");

        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle());
        genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal());
        genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle());

        JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(),
            jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig);
        return new RedisClusterContainer(jedisCluster);
    }

Again, RedisClusterContainer implements the RedisCommandsContainer interface above and provides the real implementations of hset and the other seven methods via a JedisCluster.

Sink to Kafka

Flink likewise ships a sink that writes to Kafka. First, swap SinkToMySQL for a FlinkKafkaProducer:

Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
text.addSink(new FlinkKafkaProducer<>("flink_2_kafka_demo", new SimpleStringSchema(),
                properties));

Besides the properties Kafka requires, a SimpleStringSchema is passed in. After SinkToMySQL and RedisSink above, it is natural to assume that FlinkKafkaProducer also builds on RichSinkFunction. Looking at the source:

public class FlinkKafkaProducer<IN>
        extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
    // ...
}
(figure: FlinkKafkaProducer class hierarchy; TwoPhaseCommitSinkFunction itself extends RichSinkFunction)

The Kafka sink is more involved: besides writing messages to Kafka, TwoPhaseCommitSinkFunction also implements the two-phase commit protocol (essentially on top of Kafka's transaction support; the next article will analyze this in detail).

Let's start with the constructor:

public FlinkKafkaProducer(
  String defaultTopicId,
  KeyedSerializationSchema<IN> serializationSchema,
  Properties producerConfig,
  Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
  FlinkKafkaProducer.Semantic semantic,
  int kafkaProducersPoolSize) {
  super(new FlinkKafkaProducer.TransactionStateSerializer(), new FlinkKafkaProducer.ContextStateSerializer());

  this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null");
  this.schema = checkNotNull(serializationSchema, "serializationSchema is null");
  this.producerConfig = checkNotNull(producerConfig, "producerConfig is null");
  this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null);
  this.semantic = checkNotNull(semantic, "semantic is null");
  this.kafkaProducersPoolSize = kafkaProducersPoolSize;
  checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty");

  ClosureCleaner.clean(this.flinkKafkaPartitioner, true);
  ClosureCleaner.ensureSerializable(serializationSchema);

  // set the producer configuration properties for kafka record key value serializers.
  if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
    this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  } else {
    LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
  }

  if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
    this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  } else {
    LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
  }

  // eagerly ensure that bootstrap servers are set.
  if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
    throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
  }

  if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) {
    long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds();
    checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer");
    this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout);
    LOG.warn("Property [{}] not specified. Setting it to {}", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT);
  }

  // Enable transactionTimeoutWarnings to avoid silent data loss
  // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1):
  // The KafkaProducer may not throw an exception if the transaction failed to commit
  if (semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
    final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
    final long transactionTimeout;
    if (object instanceof String && StringUtils.isNumeric((String) object)) {
      transactionTimeout = Long.parseLong((String) object);
    } else if (object instanceof Number) {
      transactionTimeout = ((Number) object).longValue();
    } else {
      throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG
        + " must be numeric, was " + object);
    }
    super.setTransactionTimeout(transactionTimeout);
    super.enableTransactionTimeoutWarnings(0.8);
  }

  this.topicPartitionsMap = new HashMap<>();
}

The constructor looks long, but it is mostly assigning fields. Among them:

defaultTopicId: the default Kafka topic to write to
serializationSchema: serializes incoming IN records into Kafka messages
producerConfig: the properties handed to the underlying KafkaProducer
customPartitioner: an optional partitioner deciding which partition each record goes to
semantic: the delivery guarantee (see the enum below)
kafkaProducersPoolSize: the default KafkaProducers pool size (used by EXACTLY_ONCE)

Note the enum Semantic:

public enum Semantic {
    EXACTLY_ONCE,
    AT_LEAST_ONCE,
    NONE
}

Semantic.EXACTLY_ONCE (exactly once): the Flink producer writes all messages in a Kafka transaction that is committed to Kafka on a checkpoint.
Semantic.AT_LEAST_ONCE (at least once): the Flink producer waits, on a checkpoint, for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer.
Semantic.NONE: no delivery guarantee; messages may be lost or duplicated on failure.
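
A sketch of choosing the semantic explicitly when constructing the producer (constructor overloads vary across connector versions, so treat this as illustrative; KeyedSerializationSchemaWrapper adapts the simple schema):

Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
// for EXACTLY_ONCE this must not exceed the broker's transaction.max.timeout.ms
properties.put("transaction.timeout.ms", "60000");

text.addSink(new FlinkKafkaProducer<>(
        "flink_2_kafka_demo",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        properties,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));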

There is a lot of code here, so let's first look at the two simpler methods, close() and open(); even these run noticeably longer than their MySQL and Redis counterparts.
close() first fetches the currentTransaction and, if it is non-null, flushes it to avoid exceptions when aborting a transaction that still has pending records. For the AT_LEAST_ONCE and NONE semantics, a normal abort does not clean up resources because the producer is reused, so currentTransaction.producer.close() has to be called manually. Finally, the producers of all pendingTransactions are closed quietly.

@Override
    public void close() throws FlinkKafkaException {
        final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
        if (currentTransaction != null) {
            // to avoid exceptions on aborting transactions with some pending records
            flush(currentTransaction);

            // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
            // we need to close it manually
            switch (semantic) {
                case EXACTLY_ONCE:
                    break;
                case AT_LEAST_ONCE:
                case NONE:
                    currentTransaction.producer.close();
                    break;
            }
        }
        try {
            super.close();
        }
        catch (Exception e) {
            asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
        }
        // make sure we propagate pending errors
        checkErroneous();
        pendingTransactions().forEach(transaction ->
            IOUtils.closeQuietly(transaction.getValue().producer)
        );
    }

open() is simpler. Depending on logFailuresOnly, it builds one of two Callbacks that Kafka invokes whenever a send completes: the first merely logs a failed send, while the second stores the exception in asyncException so it can be rethrown later via checkErroneous().

@Override
    public void open(Configuration configuration) throws Exception {
        if (logFailuresOnly) {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                    }
                    acknowledgeMessage();
                }
            };
        }
        else {
            callback = new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null && asyncException == null) {
                        asyncException = exception;
                    }
                    acknowledgeMessage();
                }
            };
        }

        super.open(configuration);
    }

Now look at invoke(), which ultimately sends the message to Kafka through the producer held by the transaction: transaction.producer.send(record, callback).

@Override
    public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
        checkErroneous();

        byte[] serializedKey = schema.serializeKey(next);
        byte[] serializedValue = schema.serializeValue(next);
        String targetTopic = schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = defaultTopicId;
        }

        Long timestamp = null;
        if (this.writeTimestampToKafka) {
            timestamp = context.timestamp();
        }

        ProducerRecord<byte[], byte[]> record;
        int[] partitions = topicPartitionsMap.get(targetTopic);
        if (null == partitions) {
            partitions = getPartitionsByTopic(targetTopic, transaction.producer);
            topicPartitionsMap.put(targetTopic, partitions);
        }
        if (flinkKafkaPartitioner != null) {
            record = new ProducerRecord<>(
                targetTopic,
                flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
                timestamp,
                serializedKey,
                serializedValue);
        } else {
            record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
        }
    /**
     * pendingRecords counts the in-flight records; it is an AtomicLong:
     * private final AtomicLong pendingRecords = new AtomicLong();
     */
        pendingRecords.incrementAndGet();
        transaction.producer.send(record, callback);
    }

invoke() relies on a flinkKafkaPartitioner:

public abstract class FlinkKafkaPartitioner<T> implements Serializable {

    public void open(int parallelInstanceId, int parallelInstances) {
        // overwrite this method if needed.
    }

    // Determines which partition the record should be written to and returns that partition's id.
    public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions);
}

At present Flink ships just one concrete implementation, FlinkFixedPartitioner, which pins each sink subtask to one partition via partitions[parallelInstanceId % partitions.length]:

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private int parallelInstanceId;

    @Override
    public void open(int parallelInstanceId, int parallelInstances) {
        this.parallelInstanceId = parallelInstanceId;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return partitions[parallelInstanceId % partitions.length];
    }
}
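
Other strategies are easy to plug in by subclassing FlinkKafkaPartitioner. A hypothetical sketch that routes by key hash instead of by subtask id:

import java.util.Arrays;

public class KeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // floorMod keeps the index non-negative even when the hash is negative
        int hash = (key == null) ? 0 : Arrays.hashCode(key);
        return partitions[Math.floorMod(hash, partitions.length)];
    }
}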

Summary

Flink implements sinks to different storage systems by extending RichSinkFunction; a custom sink only needs to override its three methods: open(), close(), and invoke().
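
Putting it together, the skeleton of any custom sink looks like this (a generic template rather than Flink code; MyStorageClient is a hypothetical stand-in for your storage system's client):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyStorageSink extends RichSinkFunction<String> {

    private transient MyStorageClient client;  // hypothetical client class

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new MyStorageClient();        // open the connection once per subtask
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        client.write(value);                   // write a single record
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.shutdown();                 // release the connection
        }
    }
}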
