[spark] TaskScheduler task submission and scheduling: a source-code walkthrough

After the DAGScheduler has split the job into Stages and submitted them to the TaskScheduler as TaskSets, the TaskScheduler schedules and executes the tasks of each TaskSet through a TaskSetManager.

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

The submitTasks method is implemented in TaskSchedulerImpl, the implementation class of TaskScheduler. Here is the full implementation:

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
val manager = createTaskSetManager(taskSet, maxTaskFailures)

A TaskSetManager is first created for the current TaskSet. The TaskSetManager manages every task of a single TaskSet: it decides whether a given task should be launched on a particular executor, retries a failed task until the maximum number of task failures is reached, and achieves locality-aware placement of tasks through delay scheduling.
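
The maximum number of retries mentioned above (the maxTaskFailures passed into createTaskSetManager) comes from the spark.task.maxFailures setting. As a minimal sketch, an application could raise it like this (the value 8 is purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Allow each task up to 8 attempts before the whole job fails
// (spark.task.maxFailures defaults to 4; 8 is only an example value).
val conf = new SparkConf()
  .setAppName("task-retry-demo")
  .set("spark.task.maxFailures", "8")
val sc = new SparkContext(conf)

Back in submitTasks, the newly created manager is then registered under its stage and attempt: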

val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager

taskSetsByStageIdAndAttempt is keyed by stageId; its value is a HashMap whose key is the stageAttemptId and whose value is the corresponding TaskSetManager.
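
To make the shape of taskSetsByStageIdAndAttempt concrete, here is a small self-contained sketch of the same nested-HashMap pattern; a plain String stands in for the real TaskSetManager:

import scala.collection.mutable.HashMap

// stageId -> (stageAttemptId -> manager); a String replaces TaskSetManager for illustration.
val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, String]]

def register(stageId: Int, stageAttemptId: Int, manager: String): Unit = {
  // getOrElseUpdate creates the inner map the first time a stage is seen.
  val stageTaskSets =
    taskSetsByStageIdAndAttempt.getOrElseUpdate(stageId, new HashMap[Int, String])
  stageTaskSets(stageAttemptId) = manager
}

register(3, 0, "manager for stage 3, attempt 0")
register(3, 1, "manager for stage 3, attempt 1")
// taskSetsByStageIdAndAttempt(3) now holds both attempts of stage 3.

Back in submitTasks, the next check guards against two active TaskSets for one stage: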

val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }

isZombie is a flag indicating that none of the tasks in a TaskSetManager need to run any more (they have all finished successfully, or the stage has been removed). If another TaskSet for the same stage already exists that is not yet finished (not a zombie) and differs from the incoming one, an exception is thrown. This guarantees that a stage never has two TaskSets running at the same time.

schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

The current TaskSetManager is then added to the scheduling pool. schedulableBuilder is of type SchedulableBuilder, a trait with two implementations, FIFOSchedulableBuilder and FairSchedulableBuilder; FIFO is used by default.
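
As a hedged sketch, switching an application to the FAIR scheduler only takes a configuration change (the allocation file path and pool name below are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .set("spark.scheduler.mode", "FAIR") // default is FIFO
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // optional pool definitions
val sc = new SparkContext(conf)

// Jobs submitted from this thread are placed into the named pool (the name is illustrative).
sc.setLocalProperty("spark.scheduler.pool", "high-priority")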

schedulableBuilder is instantiated when SparkContext creates the TaskSchedulerImpl (new TaskSchedulerImpl(sc)) and then calls scheduler.initialize(backend); it is the initialize method that instantiates schedulableBuilder.

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
backend.reviveOffers()

Next, the reviveOffers method of SchedulerBackend is called to request resources from the scheduler backend. backend is also passed in through scheduler.initialize(backend); it is created in SparkContext:

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

Back to requesting resources from the scheduler backend: CoarseGrainedSchedulerBackend's reviveOffers method is invoked, which sends a ReviveOffers message to the driverEndpoint.

 override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

On receiving the ReviveOffers message, the driverEndpoint calls the makeOffers method.

private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

This method first filters out the alive executors and wraps each of them as a WorkerOffer, which carries three pieces of information: the executorId, the host, and the number of available cores. executorDataMap here is a HashMap[String, ExecutorData]; the key is the executorId and the value holds that executor's information, including its host, RPC endpoint, totalCores and freeCores.

When the client registers the Application with the Master, the Master has already allocated and started the Executors for that Application; the Executors then register with CoarseGrainedSchedulerBackend, and their registration information is what gets stored in executorDataMap.

launchTasks(scheduler.resourceOffers(workOffers))

Start with the inner call, scheduler.resourceOffers(workOffers): TaskSchedulerImpl's resourceOffers method turns the prepared resources into the Seq[Seq[TaskDescription]] to be executed, which CoarseGrainedSchedulerBackend then dispatches to the individual executors. The implementation is as follows:

def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark whether a new executor has joined
    var newExecAvail = false
    // Update executor, host and rack bookkeeping
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle the offers so that tasks are not piled up on a few nodes (load balancing)
    val shuffledOffers = Random.shuffle(offers)
    // One buffer per executor, holding the tasks that will be assigned to it
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    // Available cores on each executor
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // The sorted queue of TaskSets; sorted by FIFO or FAIR rules, FIFO by default
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) { // a new executor has just been added
        taskSet.executorAdded() // recompute this TaskSetManager's locality levels
      }
    }

    // For each TaskSet in scheduling order, try to launch tasks level by locality level.
    // For a given TaskSet and locality level, walk all available executors, find the tasks
    // that can be launched on each of them, and append them to tasks: Seq[Seq[TaskDescription]].
    // Locality levels, best to worst: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        // Offer the resources to this TaskSet at the current locality level
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

Stepping into the resourceOfferSingleTaskSet method:

private def resourceOfferSingleTaskSet(
      taskSet: TaskSetManager,
      maxLocality: TaskLocality,
      shuffledOffers: Seq[WorkerOffer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
    var launchedTask = false
    // Walk over all executors
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      // Only if this executor still has enough free cores for one task
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          // Ask the TaskSet which task, if any, can be launched on this executor
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            // Record the task to be launched on this executor
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet // task -> taskSetManager
            taskIdToExecutorId(tid) = execId      // task -> executorId
            executorIdToTaskCount(execId) += 1    // one more running task on this executor
            executorsByHost(host) += execId       // host -> executorId
            availableCpus(i) -= CPUS_PER_TASK     // deduct the cores this task needs
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
    }
    return launchedTask
  }

This method walks over all available executors; whenever an executor still has enough free cores for one task, it calls resourceOffer to find out which tasks of the TaskSet can be launched on that executor and appends them to tasks, which is eventually returned. Now let's look at the implementation of resourceOffer:

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription] =
  {
    if (!isZombie) {
      val curTime = clock.getTimeMillis()

      var allowedLocality = maxLocality

      if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

      dequeueTask(execId, host, allowedLocality) match {
        case Some((index, taskLocality, speculative)) =>
          // Found a task; do some bookkeeping and return a task description
          val task = tasks(index)
          val taskId = sched.newTaskId()
          // Do various bookkeeping
          copiesRunning(index) += 1
          val attemptNum = taskAttempts(index).size
          val info = new TaskInfo(taskId, index, attemptNum, curTime,
            execId, host, taskLocality, speculative)
          taskInfos(taskId) = info
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling
          // NO_PREF will not affect the variables related to delay scheduling
          if (maxLocality != TaskLocality.NO_PREF) {
            currentLocalityIndex = getLocalityIndex(taskLocality)
            lastLaunchTime = curTime
          }
          // Serialize and return the task
          val startTime = clock.getTimeMillis()
          val serializedTask: ByteBuffer = try {
            Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
          } catch {
            // If the task cannot be serialized, then there's no point to re-attempt the task,
            // as it will always fail. So just abort the whole task-set.
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
              logError(msg, e)
              abort(s"$msg Exception during serialization: $e")
              throw new TaskNotSerializableException(e)
          }
          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
              !emittedTaskSizeWarning) {
            emittedTaskSizeWarning = true
            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
          }
          addRunningTask(taskId)

          // We used to log the time it takes to serialize the task, but task size is already
          // a good proxy to task serialization time.
          // val timeTaken = clock.getTime() - startTime
          val taskName = s"task ${info.id} in stage ${taskSet.id}"
          logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
            s" $taskLocality, ${serializedTask.limit} bytes)")

          sched.dagScheduler.taskStarted(task, info)
          return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
            taskName, index, serializedTask))
        case _ =>
      }
    }
    None
  }
 if (maxLocality != TaskLocality.NO_PREF) {
        allowedLocality = getAllowedLocalityLevel(curTime)
        if (allowedLocality > maxLocality) {
          // We're not allowed to search for farther-away tasks
          allowedLocality = maxLocality
        }
      }

getAllowedLocalityLevel(curTime) adjusts the locality level according to delay scheduling; the aim is always to launch every task with the best locality it can get. It returns the best locality level at which the TaskSet's still-unscheduled tasks may currently be launched; that level is treated as the worst locality this scheduling round will tolerate, so the subsequent search only considers levels at least that good. allowedLocality therefore ends up as the stricter (more local) of getAllowedLocalityLevel(curTime) and maxLocality.
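
Delay scheduling is driven by the spark.locality.wait family of settings: how long the TaskSet waits at one locality level before it is allowed to fall back to the next. A minimal sketch (the values shown are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("locality-wait-demo")
  .set("spark.locality.wait", "3s")          // base wait applied at each level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL before falling back to NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL before falling back to RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL before falling back to ANY
val sc = new SparkContext(conf)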

Based on allowedLocality, a suitable task is looked up with dequeueTask. A non-empty result means a task has been assigned to this executor; the bookkeeping is then updated: the taskId is added to the running tasks, the delay-scheduling state is refreshed, the task is serialized, the DAGScheduler is notified, and finally a TaskDescription is returned. Let's look at the implementation of dequeueTask:

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
      for {
        rack <- sched.getRackForHost(host)
        index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))
      } {
        return Some((index, TaskLocality.RACK_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
      for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
        return Some((index, TaskLocality.ANY, false))
      }
    }

    // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
  }

It first checks whether there is a pending PROCESS_LOCAL task for this execId and, if so, dequeues and schedules it. If not, the remaining levels (NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY) are examined in order, but only those permitted by maxLocality, and a task for this executor or host is dequeued at the first level where one exists.

dequeueTaskFromList takes a task from the tail of the pending list for the given category (e.g. the PROCESS_LOCAL list for this execId) and returns that task's index within the TaskSet. Stepping into the method:

private def dequeueTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      if (!executorIsBlacklisted(execId, index)) {
        // This should almost always be list.trimEnd(1) to remove tail
        list.remove(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return Some(index)
        }
      }
    }
    None
  }

There is a blacklist mechanism here: executorIsBlacklisted checks whether this executor is on the task's blacklist. The blacklist records the Executor Id and Host on which the task last failed, together with a "blackout" time, i.e. the period during which this task must not be scheduled onto that node again (a configuration sketch follows the method below).

private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
    if (failedExecutors.contains(taskId)) {
      val failed = failedExecutors.get(taskId).get
      return failed.contains(execId) &&
        clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
    }
    false
  }
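
Assuming this walkthrough targets a Spark 1.x/2.0-era build, EXECUTOR_TASK_BLACKLIST_TIMEOUT is read from spark.scheduler.executorTaskBlacklistTime (in milliseconds, 0 by default, i.e. no blacklisting); later releases replace this with the spark.blacklist.* settings. A hedged sketch:

import org.apache.spark.{SparkConf, SparkContext}

// Assumption: spark.scheduler.executorTaskBlacklistTime is the key backing
// EXECUTOR_TASK_BLACKLIST_TIMEOUT in this Spark version; 5000 ms is an example value.
val conf = new SparkConf()
  .setAppName("task-blacklist-demo")
  .set("spark.scheduler.executorTaskBlacklistTime", "5000") // don't retry a failed task on the same executor for 5s
val sc = new SparkContext(conf)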

At the end of the dequeueTask method we can see the following code:

 // find a speculative task if all others tasks have been scheduled
    dequeueSpeculativeTask(execId, host, maxLocality).map {
      case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}

This is where speculative execution comes in. A speculative task means running multiple instances of the same Task on different Executors; as soon as one instance succeeds, the instances running on the other Executors are killed. Speculative tasks are only launched for tasks that are running slowly.
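
Speculative execution is off by default and is controlled by the spark.speculation settings; a minimal sketch (the tuning values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("speculation-demo")
  .set("spark.speculation", "true")            // enable speculative execution (disabled by default)
  .set("spark.speculation.interval", "100ms")  // how often to check for slow tasks
  .set("spark.speculation.multiplier", "1.5")  // a task is slow if it runs 1.5x the median duration
  .set("spark.speculation.quantile", "0.75")   // only speculate once 75% of tasks have finished
val sc = new SparkContext(conf)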

After scheduler.resourceOffers(workOffers) has returned the Seq[Seq[TaskDescription]] describing which tasks to launch on which executors, launchTasks is called to actually launch them. The implementation is as follows:

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

Each task is serialized first. If the serialized size of a task exceeds maxRpcMessageSize (configured by spark.rpc.message.maxSize), the task is skipped and the corresponding TaskSetManager is aborted, which puts it into zombie mode; otherwise a LaunchTask message is sent to the executor to start the task.
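
The error message in launchTasks points at two remedies: raise spark.rpc.message.maxSize, or keep large values out of the task closure by broadcasting them. A minimal sketch of the broadcast approach (the lookup table and the 256 MB limit are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("broadcast-demo")
  .set("spark.rpc.message.maxSize", "256") // in MB; raising the limit is a last resort
val sc = new SparkContext(conf)

// A large value captured directly in a closure is serialized into every task;
// broadcasting ships it to each executor once instead.
val bigLookupTable: Map[Int, String] = (1 to 1000000).map(i => i -> s"value-$i").toMap
val lookupBc = sc.broadcast(bigLookupTable)

val hits = sc.parallelize(1 to 100)
  .map(i => lookupBc.value.getOrElse(i, "missing"))
  .count()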
