Spark Streaming Execution Flow Analysis and Checkpoint Operations

This article walks through the Spark Streaming source code to follow the full execution flow from job generation to job completion, along with the checkpoint operations that accompany it.

Note: the snippets below only include the code relevant to this analysis; the rest is elided.

1 Conclusions First

The startup, job generation, job completion, and checkpoint flow of Spark Streaming is as follows:

  • StreamingContext.start() starts the JobScheduler
  • JobScheduler startup
    • JobScheduler creates an EventLoop[JobSchedulerEvent] to handle JobStarted and JobCompleted events
    • It starts the JobGenerator
  • JobGenerator startup
    • JobGenerator creates an EventLoop[JobGeneratorEvent] to handle GenerateJobs, ClearMetadata, DoCheckpoint, and ClearCheckpointData events
    • It creates a RecurringTimer that periodically posts GenerateJobs events, driving the periodic creation and execution of jobs
  • Job generation in JobGenerator
    • generateJobs() builds a JobSet and submits it via JobScheduler.submitJobSet
      • submitJobSet wraps each Job in a JobHandler and hands the JobHandler to the jobExecutor thread pool for execution
    • A DoCheckpoint event is posted to checkpoint the metadata
  • Job completion
    • When a job in a JobHandler finishes, a JobCompleted event is posted. JobScheduler's EventLoop handles it and, once the whole JobSet has completed, posts a ClearMetadata event, which triggers a checkpoint or deletion of the data
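The flow above can be condensed into a minimal, single-threaded simulation. This is only an illustrative sketch: the event case classes and the handle function below are simplified stand-ins, not Spark's actual classes, and the real system dispatches these events asynchronously through EventLoops.

```scala
// Minimal sketch of the control flow described above (illustrative names).
sealed trait Event
case class GenerateJobs(time: Long) extends Event
case class JobCompleted(time: Long) extends Event
case class DoCheckpoint(time: Long) extends Event
case class ClearMetadata(time: Long) extends Event

val processed = scala.collection.mutable.ListBuffer[String]()

def handle(e: Event): Unit = e match {
  case GenerateJobs(t) =>
    processed += s"generate-$t"   // JobGenerator builds and submits a JobSet
    handle(DoCheckpoint(t))       // metadata checkpoint right after submission
    handle(JobCompleted(t))       // the jobs then run and complete
  case JobCompleted(t) =>
    processed += s"complete-$t"   // JobScheduler sees the JobSet finish
    handle(ClearMetadata(t))
  case DoCheckpoint(t)  => processed += s"checkpoint-$t"
  case ClearMetadata(t) => processed += s"clear-$t"
}

// Simulate two batch intervals, as the RecurringTimer would trigger them.
Seq(1000L, 2000L).foreach(t => handle(GenerateJobs(t)))
```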

2 Detailed Analysis

Application entry point:

val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
ssc.start()
ssc.awaitTermination()
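The entry point above does not enable checkpointing. When the checkpoint mechanism discussed later is wanted for recovery, the entry point is typically written with StreamingContext.getOrCreate, which rebuilds the context from the checkpoint directory if one exists. A sketch (the checkpoint path is illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // illustrative path

def createContext(): StreamingContext = {
  val sparkConf = new SparkConf().setAppName("SparkStreaming")
  val ssc = new StreamingContext(sparkConf, Seconds(batchDuration.toLong))
  ssc.checkpoint(checkpointDir) // enable periodic checkpointing to this directory
  // ... define DStream operations here ...
  ssc
}

// Recover from the checkpoint if present, otherwise build a fresh context
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```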

Once ssc.start() is called, the program actually begins to run.

Step 1:
StreamingContext.start() performs the following main work:

  • Calls JobScheduler.start()
  • Posts a StreamingListenerStreamingStarted message
StreamingContext.start():

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            ...
            scheduler.start()
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            ...
          }
          StreamingContext.setActiveContext(this)
        }
        ...
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

Step 2:
JobScheduler.start() performs the following main operations:

  • Creates an EventLoop to handle incoming JobSchedulerEvents; processEvent contains the actual handling logic
  • Calls jobGenerator.start()
JobScheduler.start():

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    // Create an event loop and start it
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()
    ...
    // Start the JobGenerator
    jobGenerator.start()
    ...
  }

Step 3:
JobGenerator.start() performs the following main operations:

  • Creates an EventLoop[JobGeneratorEvent] to handle JobGeneratorEvents
  • Kicks off job generation:
    • A timer periodically executes eventLoop.post(GenerateJobs(new Time(longTime)))
    • When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it executes generateJobs(time)
    • generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint
JobGenerator.start()

def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started
    // Reference the lazy checkpointWriter here so it gets initialized; it persists checkpoint data
    checkpointWriter
    // Create the event loop that listens for JobGeneratorEvents and handles them
    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      // Recover from an existing checkpoint
      restart()
    } else {
      // First-time start
      startFirstTime()
    }
}

On a first start, startFirstTime() is called. It mainly starts the already-initialized RecurringTimer, which periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent() and thus generateJobs(time), which does the actual job generation.

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

generateJobs does two main things:

  • Builds a JobSet and submits it via jobScheduler.submitJobSet()
  • Posts a DoCheckpoint event to trigger a checkpoint
  private def generateJobs(time: Time) {
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

First operation: jobScheduler.submitJobSet() iterates over the jobs in the JobSet, wraps each one in a JobHandler, and hands the JobHandler to jobExecutor for execution. JobHandler implements Runnable, and its run() does three things:

  • Posts a JobStarted event
  • Executes Job.run()
  • Posts a JobCompleted event
def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
}

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._
    def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post a JobStarted event
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post a JobCompleted event
          }
        } else {
          // JobScheduler has been stopped
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }
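jobExecutor above is a thread pool sized by spark.streaming.concurrentJobs (default 1), so by default the jobs of a batch execute one at a time. The hand-off can be sketched with a plain Java executor standing in for Spark's pool (names here are illustrative):

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

val order = new ConcurrentLinkedQueue[Int]()
val jobExecutor = Executors.newFixedThreadPool(1) // concurrentJobs = 1

// Hand three "JobHandlers" to the pool; with one thread they run in order
(1 to 3).foreach { i =>
  jobExecutor.execute(new Runnable {
    override def run(): Unit = order.add(i)
  })
}
jobExecutor.shutdown()
jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
```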

Second operation: posting a DoCheckpoint event

The core of the EventLoop created in JobScheduler.start() is its processEvent(event) method, and its events come in three kinds: JobStarted, JobCompleted, and ErrorReported. The DoCheckpoint event, by contrast, is handled by the JobGenerator's EventLoop, which calls doCheckpoint() on receiving it:

  //JobGenerator.processEvent()
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      ...
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      ...
    }
  }

doCheckpoint() calls graph.updateCheckpointData to update the checkpoint data, then calls checkpointWriter.write to persist it:

  private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      // Update the checkpoint data for this batch
      ssc.graph.updateCheckpointData(time)
      // Persist the checkpoint to the file system
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    } else if (clearCheckpointDataLater) {
      markBatchFullyProcessed(time)
    }
  }

The checkpoint update mainly calls DStreamGraph.updateCheckpointData:

def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }

outputStreams.foreach(_.updateCheckpointData(time)) invokes updateCheckpointData on each output DStream, which mainly calls checkpointData.update(currentTime) to perform the update and then recursively updates the DStreams it depends on.

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

private[streaming] def updateCheckpointData(currentTime: Time) {
    logDebug(s"Updating checkpoint data for time $currentTime")
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
    logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
  }

Next let's look at checkpointData.update(currentTime). In DStream we find the following:

private[streaming] val checkpointData = new DStreamCheckpointData(this)

That leads us to DStreamCheckpointData.update. DStreamCheckpointData has subclasses that customize what is saved and how:

  // key: checkpoint time, value: checkpoint file path
  @transient private var timeToCheckpointFile = new HashMap[Time, String]
  // key: batch time, value: time of the earliest checkpointed RDD in that batch
  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
    // Get the RDDs to checkpoint from the DStream; generatedRDDs is a HashMap[Time, RDD[T]]
    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                       .map(x => (x._1, x._2.getCheckpointFile.get))
    logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))

    // Add the checkpoint files to the HashMap that will eventually be serialized
    if (!checkpointFiles.isEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles
      // Update the checkpoint files
      timeToCheckpointFile ++= currentCheckpointFiles
      // key: the given time, value: time of the earliest checkpointed RDD
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
    }
  }
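The filter/map in update() can be illustrated with plain collections: only RDDs that actually have a checkpoint file survive, and the earliest surviving time is what gets recorded. A sketch with types simplified to Long and String:

```scala
import scala.collection.mutable.HashMap

// Simplified stand-in for generatedRDDs: batch time -> optional checkpoint file
val generatedRDDs = HashMap[Long, Option[String]](
  1000L -> None,                        // this RDD was not checkpointed
  2000L -> Some("hdfs:///cp/rdd-2000"), // illustrative paths
  3000L -> Some("hdfs:///cp/rdd-3000")
)

// Same shape as update(): keep only checkpointed RDDs, map to their files
val checkpointFiles = generatedRDDs
  .filter(_._2.isDefined)
  .map { case (t, f) => (t, f.get) }

// Time of the earliest checkpointed RDD for this batch
val oldest = checkpointFiles.keys.min
```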

Step 4: job completion
As noted for jobScheduler.submitJobSet() called from generateJobs above, each Job is wrapped in a JobHandler and handed to jobExecutor for execution. The JobHandler posts a JobStarted event, runs the job, and afterwards posts a JobCompleted event. When JobScheduler's EventLoop receives that event it runs handleJobCompletion:

 //JobScheduler.processEvent()
 private def processEvent(event: JobSchedulerEvent) {
    try {
      event match {
        case JobStarted(job, startTime) => handleJobStart(job, startTime)
        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
        case ErrorReported(m, e) => handleError(m, e)
      }
    } catch {
      case e: Throwable =>
        reportError("Error in job scheduler", e)
    }
  }

handleJobCompletion calls jobSet.handleJobCompletion(job) and, at the end, checks whether the whole JobSet has completed; if so, it executes jobGenerator.onBatchCompletion(jobSet.time):

private def handleJobCompletion(job: Job, completedTime: Long) {
    val jobSet = jobSets.get(job.time)
    jobSet.handleJobCompletion(job)
    job.setEndTime(completedTime)
    listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
    if (jobSet.hasCompleted) {
      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
    }
    job.result match {
      case Failure(e) =>
        reportError("Error running job " + job, e)
      case _ => // if the whole JobSet has completed, do the following
        if (jobSet.hasCompleted) {
          jobSets.remove(jobSet.time)
          jobGenerator.onBatchCompletion(jobSet.time)
          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
            jobSet.totalDelay / 1000.0, jobSet.time.toString,
            jobSet.processingDelay / 1000.0
          ))
        }
    }
  }

Now, over in JobGenerator, we find onBatchCompletion():

def onBatchCompletion(time: Time) {
    eventLoop.post(ClearMetadata(time))
}

JobGenerator.processEvent() then executes clearMetadata(time):

private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
}

clearMetadata() either checkpoints the metadata or clears it:

private def clearMetadata(time: Time) {
    ssc.graph.clearMetadata(time)

    // If checkpointing is enabled, then checkpoint,
    // else mark batch to be fully processed
    if (shouldCheckpoint) {
      // If checkpointing is enabled, post a DoCheckpoint event
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      // Otherwise just mark the batch as fully processed
    }
}

Appendix: the source of RecurringTimer and EventLoop, with brief comments

RecurringTimer:

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {
  // Create a thread to run loop()
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  // start() mainly starts the thread, which runs loop()
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  def start(): Long = {
    start(getStartTime())
  }

  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // Periodically run the callback, i.e. post a GenerateJobs event
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
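getStartTime() aligns the first tick to the next multiple of the period, and getRestartTime() does the same relative to the original start time. The arithmetic can be checked in isolation; the functions below copy the timer's formulas but take the current time as a parameter instead of reading a Clock:

```scala
// Same arithmetic as RecurringTimer.getStartTime / getRestartTime,
// with the current time passed in explicitly instead of read from a Clock.
def startTime(now: Long, period: Long): Long =
  (math.floor(now.toDouble / period) + 1).toLong * period

def restartTime(now: Long, originalStartTime: Long, period: Long): Long = {
  val gap = now - originalStartTime
  (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
}
```

For example, with a 2000 ms batch interval and a current time of 4500 ms, the first tick lands at 6000 ms.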

EventLoop source: its core is a background thread that takes events off a queue and dispatches each one to the handler.

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  // Put the event into eventQueue; eventThread picks it up and processes it
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  // Whether eventThread is alive
  def isActive: Boolean = eventThread.isAlive

  // Called before eventThread starts
  protected def onStart(): Unit = {}

  // Called after eventThread exits
  protected def onStop(): Unit = {}

  // Subclasses implement the event-handling logic here
  protected def onReceive(event: E): Unit

  // Error-handling logic
  protected def onError(e: Throwable): Unit

}
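A concrete subclass only needs to implement onReceive (and onError). The sketch below uses a trimmed-down copy of the class above (MiniEventLoop, an illustrative name) and counts the events it handles; a CountDownLatch makes the asynchronous hand-off observable:

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Trimmed-down copy of EventLoop: a daemon thread drains a queue into onReceive
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)
  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try onReceive(event) catch { case NonFatal(e) => onError(e) }
        }
      } catch { case _: InterruptedException => } // exit on interrupt
  }
  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped.set(true); eventThread.interrupt(); eventThread.join() }
  def post(event: E): Unit = eventQueue.put(event)
  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

// Concrete subclass: count down once per String event handled
val latch = new CountDownLatch(3)
val loop = new MiniEventLoop[String]("demo") {
  override protected def onReceive(event: String): Unit = latch.countDown()
  override protected def onError(e: Throwable): Unit = ()
}
loop.start()
Seq("a", "b", "c").foreach(loop.post)
latch.await() // wait until all three events have been handled
loop.stop()
```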