Reading the Spark 2.2.0 source: from the foreach operator to stage division

Spark RDDs support two types of operations:

  • transformations: create a new dataset from an existing dataset
  • actions: run a computation on the dataset and return a value to the driver program (a minimal example of both is shown below)

Before reading the code it helps to first review the relevant Spark concepts, for example how stages are divided. Today we walk through the Spark source code starting from the foreach operator:
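
A minimal sketch of the difference, assuming an existing SparkContext `sc` (the dataset is illustrative):

val rdd = sc.parallelize(1 to 4)      // create an RDD
val doubled = rdd.map(_ * 2)          // transformation: lazily defines a new RDD, no job runs yet
doubled.foreach(x => println(x))      // action: triggers sc.runJob and computes on the executors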

In RDD.scala:

// Actions (launch a job to return a value to the user program)

 /**
  * Applies a function f to all elements of this RDD.
  */
 def foreach(f: T => Unit): Unit = withScope {
 //首先執(zhí)行清除閉包锡移,使得能夠被序列化發(fā)送到tasks上砸逊,如果設(shè)置了checkSerializable 侣滩,會(huì)      檢查是否可以序列化
   val cleanF = sc.clean(f) 
  //對(duì)一個(gè)rdd的所有partitons執(zhí)行cleanF并將結(jié)果作為一個(gè)數(shù)組返回
   sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) 
 }
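
A quick usage note (a sketch, assuming a SparkContext `sc`): because foreach goes through sc.runJob, the cleaned function runs inside tasks on the executors, not on the driver, so side effects such as println end up in the executor logs when running on a cluster:

val nums = sc.parallelize(1 to 10, 4)
nums.foreach(n => println(s"element: $n"))   // printed by the executors, not on the driver console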

進(jìn)入SparkContext

再看下runJob

  /**
   * Run a job on all partitions in an RDD and return the results in an array.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length) // arguments: the RDD, the function to run, and the sequence of partition ids
  }

這個(gè)函數(shù)執(zhí)行的runJob 入?yún)⒍嗔艘粋€(gè)TaskContext

  /**
   * Run a function on a given set of partitions in an RDD and return the results as an array.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }
  /**
   * Run a function on a given set of partitions in an RDD and return the results as an array.
   * The function that is run against each partition additionally takes `TaskContext` argument.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    //添加了一個(gè)結(jié)果數(shù)組
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  }
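
To make the (index, res) => results(index) = res pattern concrete, here is a hedged sketch of calling this overload directly (the RDD and the per-partition sum are illustrative assumptions):

import org.apache.spark.TaskContext

val data = sc.parallelize(1 to 100, 4)
val partial = new Array[Int](data.partitions.length)
sc.runJob[Int, Int](
  data,
  (ctx: TaskContext, it: Iterator[Int]) => it.sum,   // func: one result per partition
  0 until data.partitions.length,                    // run on every partition
  (index: Int, res: Int) => partial(index) = res)    // resultHandler fills the slot for that partition
val total = partial.sum                              // combine the per-partition results on the driver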

下面這個(gè)開(kāi)始進(jìn)入比較核心的了org.apache.spark.scheduler.DAGScheduler

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      // stopped is an AtomicBoolean; refuse to run if the SparkContext has already been shut down
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    // Get the call site in the user code, i.e. the line that triggered the job; it prints like: reduce at SparkAccumulatorTest.scala:43
    val callSite = getCallSite

    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    // Hand the job over to the DAGScheduler
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    //#進(jìn)度條相關(guān)瑞眼,標(biāo)記所有結(jié)束的stage
    progressBar.foreach(_.finishAll())
    // doCheckpoint() recurses into parent RDDs and performs the checkpoint by saving their data
    rdd.doCheckpoint()
  }
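
Note that rdd.doCheckpoint() is only invoked here, after the job has completed, which is why checkpointing actually happens on the first action following rdd.checkpoint(). A small sketch (the checkpoint directory is an illustrative assumption):

sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumed path; must be set before checkpoint()
val toSave = sc.parallelize(1 to 1000).map(_ * 2)
toSave.checkpoint()                             // only marks the RDD; nothing is written yet
toSave.count()                                  // action -> runJob -> rdd.doCheckpoint() writes the data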

Next, dagScheduler.runJob, defined in org.apache.spark.scheduler.DAGScheduler:

  /**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @note Throws `Exception` when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) // submit the job and get back a JobWaiter
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }
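
DAGScheduler.runJob blocks the calling thread on the JobWaiter's completionFuture. The user-facing asynchronous counterparts (foreachAsync, countAsync, ... in AsyncRDDActions) ride on the same machinery but hand back a FutureAction instead of blocking; a hedged sketch:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val data = sc.parallelize(1 to 10, 2)
val action = data.foreachAsync(n => println(n))   // submits the job, returns a FutureAction[Unit]
Await.result(action, Duration.Inf)                // block only when/if the result is needed
// action.cancel() could be used instead to cancel the running job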

繼續(xù)看submitJob

 /**
   * Submit an action job to the scheduler.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   *
   * @throws IllegalArgumentException when partitions ids are illegal
   */
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    //校驗(yàn)分區(qū)數(shù)是否合法
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
    // Get the next job id via an atomic increment (backed by Unsafe under the hood)
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    // Cast the function to its type-erased form so it can be carried in the event (a neat trick)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    // Create a JobWaiter that waits for the DAGScheduler to finish this job; as each task
    // completes, resultHandler is called with its result
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // Then post a JobSubmitted event to the eventProcessLoop
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

First, let's look at eventProcessLoop, the event-notification mechanism.

It has four main methods:

  • onReceive: the entry point; runs doOnReceive and then stops the timerContext (see the sketch after this list)
  • doOnReceive: the concrete handling logic for each kind of event
  • onError
  • onStop
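
DAGSchedulerEventProcessLoop extends Spark's generic EventLoop, which owns a blocking queue and a single daemon thread: post() enqueues an event, and the thread drains the queue and dispatches every event to onReceive. A simplified, stand-alone sketch of that pattern (not Spark's actual implementation; the names below are illustrative):

import java.util.concurrent.LinkedBlockingDeque

// Simplified sketch of the EventLoop pattern: post() enqueues, a single daemon
// thread drains the queue and hands every event to onReceive.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (true) {
          val event = queue.take()                 // block until an event is posted
          try onReceive(event) catch { case t: Throwable => onError(t) }
        }
      } catch { case _: InterruptedException => }  // stop() interrupts this thread
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = queue.put(event)      // e.g. eventProcessLoop.post(JobSubmitted(...))

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}

The real subclass used by the DAGScheduler looks like this:
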
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // A job was submitted
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    // A map stage was submitted on its own (via submitMapStage)
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    // A stage was cancelled
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
    // A job was cancelled
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
    }
    dagScheduler.sc.stopInNewThread()
  }

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}

Now let's focus on DAGScheduler.handleJobSubmitted, which starts by calling createResultStage.

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //根據(jù)當(dāng)前的Rdd和分區(qū) 創(chuàng)建Stage  這兒與spark 1.6.3不一樣 newResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

org.apache.spark.scheduler.DAGScheduler#createResultStage

創(chuàng)建ResultStage,也就是job的最后一個(gè)Stage

  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    //關(guān)鍵的地方:這里需要獲取或者創(chuàng)建父Stage,這里只是最后一個(gè)stage的父stage谬墙,其實(shí)就是為了構(gòu)建下面stage的parents的入?yún)⒔癫肌tage的入?yún)⒂腥缦聨讉€(gè),id拭抬,rdd部默,function,partition造虎,parrents傅蹂,jobid
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages

  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies

返回第一個(gè)shuffer依賴的RDD的Dependency,就是找到最后一個(gè)stage的依賴

/**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors.  For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    //創(chuàng)建一個(gè)stack,如果是非shuffer依賴則push,shuffer依賴的話則pop份蝴,最后返回父shuffer依賴
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          // A shuffle dependency: record it as an immediate parent
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            // A narrow dependency: push its RDD and keep searching behind it
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

org.apache.spark.scheduler.DAGScheduler#getOrCreateShuffleMapStage

/**
   * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
   * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
   * addition to any missing ancestor shuffle map stages.
   */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        stage

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

The doc comment is fairly detailed; in short: if a shuffle map stage for this shuffle already exists in shuffleIdToMapStage it is returned, otherwise this method creates it (along with any missing ancestor shuffle map stages). So what exactly is a ShuffleMapStage?

/**
 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 * They occur right before each shuffle operation, and might contain multiple pipelined operations
 * before that (e.g. map and filter). When executed, they save map output files that can later be
 * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
 * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 *
 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
 */

A ShuffleMapStage is an intermediate stage in the execution DAG that produces data for a shuffle. It sits right before a shuffle operation and may contain several pipelined operations before it (e.g. map and filter). Once it finishes, it saves its map output files, which are later fetched by the reduce tasks.

org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies

This pushes every ancestor shuffle (wide) dependency that has not yet been registered onto `ancestors`. The code is essentially the same traversal as in getShuffleDependencies; the difference is that it keeps walking backwards from the given RDD all the way to the first one, and that is how the DAG gets divided into stages. Take an illustrative lineage (reduceByKey/groupByKey stand in for any wide transformation):
rdd1 = rdd0.reduceByKey(...)   // wide (shuffle) dependency
rdd2 = rdd1.map(...)           // narrow dependency
rdd3 = rdd2.groupByKey(...)    // wide (shuffle) dependency
rdd3.foreach(...)              // the action that triggers the job
getShuffleDependencies(rdd3) first finds the rdd2 <- rdd3 shuffle dependency. getMissingAncestorShuffleDependencies is then called on its map-side RDD, rdd2: rdd2 is pushed onto waitingForVisit and popped as unvisited; walking through the narrow map dependency, its ancestor shuffle dependency rdd0 <- rdd1 is found and pushed onto ancestors, and rdd0 is pushed onto the stack; rdd0 has no further shuffle dependencies, so the traversal ends.

 /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val ancestors = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
    ancestors
  }

org.apache.spark.scheduler.DAGScheduler#createShuffleMapStage

Each ShuffleDependency that was pushed onto that stack is now turned into a stage. createShuffleMapStage creates a ShuffleMapStage that produces the partitions of the given shuffle dependency. If a previously run stage already generated the same shuffle data, this method copies the output locations that are still available from the previous shuffle, so the data is not needlessly regenerated.

/**
   * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
   * previously run stage generated the same shuffle data, this function will copy the output
   * locations that are still available from the previous shuffle to avoid unnecessarily
   * regenerating data.
   */
  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    // the number of tasks equals the number of partitions of the map-side RDD
    val numTasks = rdd.partitions.length
    // recursively get or create the parent stages of this new stage
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // A previously run stage generated partitions for this shuffle, so for each output
      // that's still available, copy information about that output location to the new stage
      // (so we don't unnecessarily re-compute that data).
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }