第十篇SparkStreaming手動(dòng)維護(hù)Kafka Offset的幾種方式

Spark Streaming No Receivers 方式的createDirectStream 方法不使用接收器痢站,而是創(chuàng)建輸入流直接從Kafka 集群節(jié)點(diǎn)拉取消息绿渣。輸入流保證每個(gè)消息從Kafka 集群拉取以后只完全轉(zhuǎn)換一次撩轰,保證語(yǔ)義一致性膀哲。但是當(dāng)作業(yè)發(fā)生故障或重啟時(shí)讳侨,要保障從當(dāng)前的消費(fèi)位點(diǎn)去處理數(shù)據(jù)(即Exactly Once語(yǔ)義)行冰,單純的依靠SparkStreaming本身的機(jī)制是不太理想的溺蕉,生產(chǎn)環(huán)境中通常借助手動(dòng)管理offset的方式來(lái)維護(hù)kafka的消費(fèi)位點(diǎn)。本文分享將介紹如何手動(dòng)管理Kafka的Offset悼做,希望對(duì)你有所幫助疯特。本文主要包括以下內(nèi)容:

  • 如何使用MySQL管理Kafka的Offset
  • 如何使用Redis管理Kafka的OffSet

如何使用MySQL管理Kafka的Offset

我們可以從Spark Streaming 應(yīng)用程序中編寫代碼來(lái)手動(dòng)管理Kafka偏移量,偏移量可以從每一批流處理中生成的RDDS偏移量來(lái)獲取肛走,獲取方式為:

KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
// 獲取偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 ...
  }

當(dāng)獲取到偏移量之后漓雅,可以將將其保存到外部存儲(chǔ)設(shè)備中(MySQL、Redis朽色、Zookeeper邻吞、HBase等)。

使用案例代碼

  • MySQL中用于保存偏移量的表
CREATE TABLE `topic_par_group_offset` (
  `topic` varchar(255) NOT NULL,
  `partition` int(11) NOT NULL,
  `groupid` varchar(255) NOT NULL,
  `offset` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`topic`,`partition`,`groupid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ;
  • 常量配置類:ConfigConstants
object ConfigConstants {
  // Kafka配置
  val kafkaBrokers = "kms-2:9092,kms-3:9092,kms-4:9092"
  val groupId = "group_test"
  val kafkaTopics = "test"
  val batchInterval = Seconds(5)
  val streamingStorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  val kafkaKeySer = "org.apache.kafka.common.serialization.StringSerializer"
  val kafkaValueSer = "org.apache.kafka.common.serialization.StringSerializer"
  val sparkSerializer = "org.apache.spark.serializer.KryoSerializer"
  val batchSize = 16384
  val lingerMs = 1
  val bufferMemory = 33554432
  // MySQL配置
  val user = "root"
  val password = "123qwe"
  val url = "jdbc:mysql://localhost:3306/kafka_offset"
  val driver = "com.mysql.jdbc.Driver"
  // 檢查點(diǎn)配置
  val checkpointDir = "file:///e:/checkpoint"
  val checkpointInterval = Seconds(10)
  // Redis配置
  val redisAddress = "192.168.10.203"
  val redisPort = 6379
  val redisAuth = "123qwe"
  val redisTimeout = 3000
}
  • JDBC連接工具類:JDBCConnPool
object JDBCConnPool {

  val log: Logger = Logger.getLogger(JDBCConnPool.getClass)
  var dataSource: BasicDataSource = null
  /**
    * 創(chuàng)建數(shù)據(jù)源
    *
    * @return
    */
  def getDataSource(): BasicDataSource = {
    if (dataSource == null) {
      dataSource = new BasicDataSource()
      dataSource.setDriverClassName(ConfigConstants.driver)
      dataSource.setUrl(ConfigConstants.url)
      dataSource.setUsername(ConfigConstants.user)
      dataSource.setPassword(ConfigConstants.password)
      dataSource.setMaxTotal(50)
      dataSource.setInitialSize(3)
      dataSource.setMinIdle(3)
      dataSource.setMaxIdle(10)
      dataSource.setMaxWaitMillis(2 * 10000)
      dataSource.setRemoveAbandonedTimeout(180)
      dataSource.setRemoveAbandonedOnBorrow(true)
      dataSource.setRemoveAbandonedOnMaintenance(true)
      dataSource.setTestOnReturn(true)
      dataSource.setTestOnBorrow(true)
    }
    return dataSource
  }
  /**
    * 釋放數(shù)據(jù)源
    */
  def closeDataSource() = {
    if (dataSource != null) {
      dataSource.close()
    }
  }
  /**
    * 獲取數(shù)據(jù)庫(kù)連接
    *
    * @return
    */
  def getConnection(): Connection = {
    var conn: Connection = null
    try {
      if (dataSource != null) {
        conn = dataSource.getConnection()
      } else {
        conn = getDataSource().getConnection()
      }
    } catch {
      case e: Exception =>
        log.error(e.getMessage(), e)
    }
    conn
  }

  /**
    * 關(guān)閉連接
    */
 def closeConnection (ps:PreparedStatement , conn:Connection ) {
    if (ps != null) {
      try {
        ps.close();
      } catch  {
        case e:Exception =>
          log.error("預(yù)編譯SQL語(yǔ)句對(duì)象PreparedStatement關(guān)閉異常葫男!" + e.getMessage(), e);
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch  {
        case e:Exception =>
        log.error("關(guān)閉連接對(duì)象Connection異常抱冷!" + e.getMessage(), e);
      }
    }
  }
}
  • Kafka生產(chǎn)者:KafkaProducerTest
object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val  props : Properties = new Properties()
    props.put("bootstrap.servers", ConfigConstants.kafkaBrokers)
    props.put("batch.size", ConfigConstants.batchSize.asInstanceOf[Integer])
    props.put("linger.ms", ConfigConstants.lingerMs.asInstanceOf[Integer])
    props.put("buffer.memory", ConfigConstants.bufferMemory.asInstanceOf[Integer])
    props.put("key.serializer",ConfigConstants.kafkaKeySer)
    props.put("value.serializer", ConfigConstants.kafkaValueSer)
   val  producer :  Producer[String, String] = new KafkaProducer[String, String](props)
    val startTime : Long  = System.currentTimeMillis()
    for ( i <- 1 to 100) {
      producer.send(new ProducerRecord[String, String](ConfigConstants.kafkaTopics, "Spark", Integer.toString(i)))
    }
  println("消耗時(shí)間:" + (System.currentTimeMillis() - startTime))
    producer.close()
  }
}
  • 讀取和保存Offset:

該對(duì)象的作用是從外部設(shè)備中讀取和寫入Offset,包括MySQL和Redis

object OffsetReadAndSave {

  /**
    * 從MySQL中獲取偏移量
    *
    * @param groupid
    * @param topic
    * @return
    */

  def getOffsetMap(groupid: String, topic: String): mutable.Map[TopicPartition, Long] = {

    val conn = JDBCConnPool.getConnection()
    val selectSql = "select * from topic_par_group_offset where groupid = ? and topic = ?"
    val ppst = conn.prepareStatement(selectSql)
    ppst.setString(1, groupid)
    ppst.setString(2, topic)

    val result: ResultSet = ppst.executeQuery()

    // 主題分區(qū)偏移量
    val topicPartitionOffset = mutable.Map[TopicPartition, Long]()

    while (result.next()) {

      val topicPartition: TopicPartition = new TopicPartition(result.getString("topic"), result.getInt("partition"))

      topicPartitionOffset += (topicPartition -> result.getLong("offset"))
    }

    JDBCConnPool.closeConnection(ppst, conn)
    topicPartitionOffset
  }

  /**
    * 從Redis中獲取偏移量
    *
    * @param groupid
    * @param topic
    * @return
    */
  def getOffsetFromRedis(groupid: String, topic: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = JedisConnPool.getConnection()
    var offsets = mutable.Map[TopicPartition, Long]()

    val key = s"${topic}_${groupid}"
    val fields : java.util.Map[String, String] = jedis.hgetAll(key)
    for (partition <- JavaConversions.mapAsScalaMap(fields)) {

      offsets.put(new TopicPartition(topic, partition._1.toInt), partition._2.toLong)
    }

    offsets.toMap

  }
  /**
    * 將偏移量寫入MySQL
    *
    * @param groupid     消費(fèi)者組ID
    * @param offsetRange 消息偏移量范圍
    */

  def saveOffsetRanges(groupid: String, offsetRange: Array[OffsetRange]) = {

    val conn = JDBCConnPool.getConnection()
    val insertSql = "replace into topic_par_group_offset(`topic`, `partition`, `groupid`, `offset`) values(?,?,?,?)"
    val ppst = conn.prepareStatement(insertSql)

    for (offset <- offsetRange) {

      ppst.setString(1, offset.topic)
      ppst.setInt(2, offset.partition)
      ppst.setString(3, groupid)
      ppst.setLong(4, offset.untilOffset)
      ppst.executeUpdate()
    }
    JDBCConnPool.closeConnection(ppst, conn)

  }
  /**
    * 將偏移量保存到Redis中
    * @param groupid
    * @param offsetRange
    */
  def saveOffsetToRedis(groupid: String, offsetRange: Array[OffsetRange]) = {
    val jedis :Jedis = JedisConnPool.getConnection()
    for(offsetRange<-offsetRange){
      val topic=offsetRange.topic
      val partition=offsetRange.partition
      val offset=offsetRange.untilOffset
      // key為topic_groupid,field為partition梢褐,value為offset
      jedis.hset(s"${topic}_${groupid}",partition.toString,offset.toString)
    }
  }
}

  • 業(yè)務(wù)處理類

該對(duì)象是業(yè)務(wù)處理邏輯旺遮,主要是消費(fèi)Kafka數(shù)據(jù),再處理之后進(jìn)行手動(dòng)將偏移量保存到MySQL中盈咳。在啟動(dòng)程序時(shí)耿眉,會(huì)判斷外部存儲(chǔ)設(shè)備中是否存在偏移量,如果是首次啟動(dòng)則從最初的消費(fèi)位點(diǎn)消費(fèi)鱼响,如果存在Offset鸣剪,則從當(dāng)前的Offset去消費(fèi)。

觀察現(xiàn)象:當(dāng)首次啟動(dòng)時(shí)會(huì)從頭消費(fèi)數(shù)據(jù),手動(dòng)停止程序西傀,然后再次啟動(dòng)斤寇,會(huì)發(fā)現(xiàn)會(huì)從當(dāng)前提交的偏移量消費(fèi)數(shù)據(jù)。

object ManualCommitOffset {
  
  def main(args: Array[String]): Unit = {

    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer",ConfigConstants.sparkSerializer)

    val ssc = new StreamingContext(conf, batchInterval)
    // 必須開啟checkpoint,否則會(huì)報(bào)錯(cuò)
    ssc.checkpoint(ConfigConstants.checkpointDir)

    ssc.sparkContext.setLogLevel("OFF")
    //使用broker和topic創(chuàng)建direct kafka stream
    val topicSet = topics.split(" ").toSet

    // kafka連接參數(shù)
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )


    // 從MySQL中讀取該主題對(duì)應(yīng)的消費(fèi)者組的分區(qū)偏移量
    val offsetMap = OffsetReadAndSave.getOffsetMap(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果MySQL中已經(jīng)存在了偏移量,則應(yīng)該從該偏移量處開始消費(fèi)
    if (offsetMap.size > 0) {
      println("存在偏移量拥褂,從該偏移量處進(jìn)行消費(fèi)D锼!")

      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

    } else {
      //如果MySQL中沒有存在了偏移量饺鹃,從最早開始消費(fèi)
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    }
    // checkpoint時(shí)間間隔莫秆,必須是batchInterval的整數(shù)倍
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // 保存batch的offset
    var offsetRanges = Array[OffsetRange]()
    // 獲取當(dāng)前DS的消息偏移量
    val transformDS = inputDStream.transform { rdd =>
      // 獲取offset
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /**
      * 狀態(tài)更新函數(shù)
      * @param newValues:新的value值
      * @param stateValue:狀態(tài)值
      * @return
      */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // 獲取狀態(tài)值
      // 遍歷當(dāng)前數(shù)據(jù),并更新狀態(tài)
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的狀態(tài)
      Option(oldvalue)
    }
    // 業(yè)務(wù)邏輯處理
    // 該示例統(tǒng)計(jì)消息key的個(gè)數(shù)悔详,用于查看是否是從已經(jīng)提交的偏移量消費(fèi)數(shù)據(jù)
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // 打印偏移量和數(shù)據(jù)信息镊屎,觀察輸出的結(jié)果
    transformDS.foreachRDD { (rdd, time) =>
      // 遍歷打印該RDD數(shù)據(jù)
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // 打印消費(fèi)偏移量信息
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")

      }

      //將偏移量保存到到MySQL中
      OffsetReadAndSave.saveOffsetRanges(groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

如何使用Redis管理Kafka的OffSet

  • Redis連接類
object JedisConnPool {
  val config = new JedisPoolConfig
  //最大連接數(shù)
  config.setMaxTotal(60)
  //最大空閑連接數(shù)
  config.setMaxIdle(10)
  config.setTestOnBorrow(true)

  //服務(wù)器ip
  val redisAddress :String = ConfigConstants.redisAddress.toString
  // 端口號(hào)
  val redisPort:Int = ConfigConstants.redisPort.toInt
  //訪問密碼
  val redisAuth :String = ConfigConstants.redisAuth.toString
  //等待可用連接的最大時(shí)間
  val redisTimeout:Int = ConfigConstants.redisTimeout.toInt

  val pool = new JedisPool(config,redisAddress,redisPort,redisTimeout,redisAuth)

  def getConnection():Jedis = {
    pool.getResource
  }

}
  • 業(yè)務(wù)邏輯處理

該對(duì)象與上面的基本類似,只不過(guò)使用的是Redis來(lái)進(jìn)行存儲(chǔ)Offset茄螃,存儲(chǔ)到Redis的數(shù)據(jù)類型是Hash缝驳,基本格式為:[key field value] -> [ topic_groupid partition offset],即 key為topic_groupid,field為partition归苍,value為offset用狱。

object ManualCommitOffsetToRedis {

  def main(args: Array[String]): Unit = {

    val brokers = ConfigConstants.kafkaBrokers
    val groupId = ConfigConstants.groupId
    val topics = ConfigConstants.kafkaTopics
    val batchInterval = ConfigConstants.batchInterval

    val conf = new SparkConf()
      .setAppName(ManualCommitOffset.getClass.getSimpleName)
      .setMaster("local[1]")
      .set("spark.serializer", ConfigConstants.sparkSerializer)


    val ssc = new StreamingContext(conf, batchInterval)
    // 必須開啟checkpoint,否則會(huì)報(bào)錯(cuò)
    ssc.checkpoint(ConfigConstants.checkpointDir)

    ssc.sparkContext.setLogLevel("OFF")
    //使用broker和topic創(chuàng)建direct kafka stream
    val topicSet = topics.split(" ").toSet

    // kafka連接參數(shù)
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )


    // 從Redis中讀取該主題對(duì)應(yīng)的消費(fèi)者組的分區(qū)偏移量
    val offsetMap = OffsetReadAndSave.getOffsetFromRedis(groupId, topics)
    var inputDStream: InputDStream[ConsumerRecord[String, String]] = null

    //如果Redis中已經(jīng)存在了偏移量,則應(yīng)該從該偏移量處開始消費(fèi)
    if (offsetMap.size > 0) {
      println("存在偏移量,從該偏移量處進(jìn)行消費(fèi)F雌夏伊!")

      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams, offsetMap))

    } else {
      //如果Redis中沒有存在了偏移量,從最早開始消費(fèi)
      inputDStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    }
    // checkpoint時(shí)間間隔吻氧,必須是batchInterval的整數(shù)倍
    inputDStream.checkpoint(ConfigConstants.checkpointInterval)

    // 保存batch的offset
    var offsetRanges = Array[OffsetRange]()
    // 獲取當(dāng)前DS的消息偏移量
    val transformDS = inputDStream.transform { rdd =>
      // 獲取offset
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }


    /**
      * 狀態(tài)更新函數(shù)
      *
      * @param newValues  :新的value值
      * @param stateValue :狀態(tài)值
      * @return
      */
    def updateFunc(newValues: Seq[Int], stateValue: Option[Int]): Option[Int] = {
      var oldvalue = stateValue.getOrElse(0) // 獲取狀態(tài)值
      // 遍歷當(dāng)前數(shù)據(jù)溺忧,并更新狀態(tài)
      for (newValue <- newValues) {
        oldvalue += newValue
      }
      // 返回最新的狀態(tài)
      Option(oldvalue)
    }
    // 業(yè)務(wù)邏輯處理
    // 該示例統(tǒng)計(jì)消息key的個(gè)數(shù),用于查看是否是從已經(jīng)提交的偏移量消費(fèi)數(shù)據(jù)
    transformDS.map(meg => ("spark", meg.value().toInt)).updateStateByKey(updateFunc).print()

    // 打印偏移量和數(shù)據(jù)信息盯孙,觀察輸出的結(jié)果
    transformDS.foreachRDD { (rdd, time) =>
      // 遍歷打印該RDD數(shù)據(jù)
      rdd.foreach { record =>
        println(s"key=${record.key()},value=${record.value()},partition=${record.partition()},offset=${record.offset()}")
      }
      // 打印消費(fèi)偏移量信息
      for (o <- offsetRanges) {
        println(s"topic=${o.topic},partition=${o.partition},fromOffset=${o.fromOffset},untilOffset=${o.untilOffset},time=${time}")

      }

      //將偏移量保存到到Redis中
      OffsetReadAndSave.saveOffsetToRedis(groupId, offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
  }

}

總結(jié)

本文介紹了如何使用外部存儲(chǔ)設(shè)備來(lái)保存Kafka的消費(fèi)位點(diǎn)鲁森,通過(guò)詳細(xì)的代碼示例說(shuō)明了使用MySQL和Redis管理消費(fèi)位點(diǎn)的方式。當(dāng)然振惰,外部存儲(chǔ)設(shè)備很多刀森,用戶也可以使用其他的存儲(chǔ)設(shè)備進(jìn)行管理Offset,比如Zookeeper和HBase等报账,其基本處理思路都十分相似研底。

大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市透罢,隨后出現(xiàn)的幾起案子榜晦,更是在濱河造成了極大的恐慌,老刑警劉巖羽圃,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件乾胶,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)识窿,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門斩郎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人喻频,你說(shuō)我怎么就攤上這事缩宜。” “怎么了甥温?”我有些...
    開封第一講書人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵锻煌,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我姻蚓,道長(zhǎng)宋梧,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任狰挡,我火速辦了婚禮捂龄,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘加叁。我一直安慰自己跺讯,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開白布殉农。 她就那樣靜靜地躺著,像睡著了一般局荚。 火紅的嫁衣襯著肌膚如雪超凳。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,696評(píng)論 1 312
  • 那天耀态,我揣著相機(jī)與錄音轮傍,去河邊找鬼。 笑死首装,一個(gè)胖子當(dāng)著我的面吹牛创夜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播仙逻,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼驰吓,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了系奉?” 一聲冷哼從身側(cè)響起檬贰,我...
    開封第一講書人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎缺亮,沒想到半個(gè)月后翁涤,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年葵礼,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了号阿。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡鸳粉,死狀恐怖扔涧,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情赁严,我是刑警寧澤扰柠,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站疼约,受9級(jí)特大地震影響卤档,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜程剥,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一劝枣、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧织鲸,春花似錦舔腾、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至瀑踢,卻和暖如春扳还,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背橱夭。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來(lái)泰國(guó)打工氨距, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人棘劣。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓俏让,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親茬暇。 傳聞我的和親對(duì)象是個(gè)殘疾皇子首昔,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容