spark源碼分析之Executor啟動與任務提交篇

任務提交流程

概述

在闡明了Spark的Master的啟動流程與Worker啟動流程芹橡。接下繼續(xù)執(zhí)行的就是Worker上的Executor進程了韩玩,本文繼續(xù)分析整個Executor的啟動與任務提交流程

Spark-submit

提交一個任務到集群通過的是Spark-submit
通過啟動腳本的方式啟動它的主類佑吝,這里以WordCount為例子
`spark-submit --class cn.itcast.spark.WordCount``

  1. bin/spark-clas -> org.apache.spark.deploy.SparkSubmit 調(diào)用這個類的main方法

  2. doRunMain方法中傳進來一個自定義spark應用程序的main方法
    class cn.kinge.spark.WordCount

  3. 通過反射拿到類的實例的引用mainClass = Utils.classForName(childMainClass)

  4. 在通過反射調(diào)用class cn.kinge.spark.WordCountmain方法

我們來看SparkSubmit的main方法

  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      printStream.println(appArgs)
    }
    //匹配任務類型
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

這里的類型是submit骨饿,調(diào)用submit方法

  private[spark] def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

   
    def doRunMain(): Unit = {
     围段。顾翼。。奈泪。适贸。。
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              //childMainClass這個你自己定義的App的main所在的全類名
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
            涝桅。拜姿。。冯遂。蕊肥。。        
        }
      }        
        蛤肌。壁却。批狱。。儒洛。精耐。。
         //掉用上面的doRunMain
        doRunMain()
    }

submit里調(diào)用了doRunMain()琅锻,然后調(diào)用了runMain卦停,來看runMain

  private def runMain(
    。恼蓬。惊完。。处硬。小槐。

    try {
      //通過反射
      mainClass = Class.forName(childMainClass, true, loader)
    } catch {
        。荷辕。凿跳。。疮方。控嗜。
    }

    //反射拿到面方法實例
    val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    。骡显。疆栏。。惫谤。壁顶。

    try {
      //調(diào)用App的main方法
      mainMethod.invoke(null, childArgs.toArray)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
  }

最主要的流程就在這里了,上面的代碼注釋很清楚溜歪,通過反射調(diào)用我們寫的類的main方法若专,大體的流程到此

SparkSubmit時序圖

Executor啟動流程

SparkSubmit通過反射調(diào)用了我們程序的main方法后,就開始執(zhí)行我們的代碼
蝴猪,一個Spark程序中需要創(chuàng)建SparkContext對象调衰,我們就從這個對象開始

SparkContext的構(gòu)造方法代碼很長,主要關(guān)注的地方如下

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
  拯腮。窖式。。动壤。萝喘。。
    
 private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    //通過SparkEnv來創(chuàng)建createDriverEnv
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }
  //在這里調(diào)用了createSparkEnv,返回一個SparkEnv對象阁簸,這個對象里面有很多重要屬性爬早,最重要的ActorSystem
  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
  SparkEnv.set(env)

  //創(chuàng)建taskScheduler
  // Create and start the scheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  //創(chuàng)建DAGScheduler
  dagScheduler = new DAGScheduler(this)

  //啟動TaksScheduler
  taskScheduler.start()
    。启妹。筛严。。饶米。
}

Spark的構(gòu)造方法主要干三件事桨啃,創(chuàng)建了一個SparkEnv,taskScheduler檬输,dagScheduler照瘾,我們先來看createTaskScheduler里干了什么

 //通過給定的URL創(chuàng)建TaskScheduler
  private def createTaskScheduler(
     .....

    //匹配URL選擇不同的方式
    master match {
         。丧慈。析命。。逃默。鹃愤。

      //這個是Spark的Standalone模式
      case SPARK_REGEX(sparkUrl) =>
        //首先創(chuàng)建TaskScheduler
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)

        //很重要
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        //初始化了一個調(diào)度器,默認是FIFO
        scheduler.initialize(backend)
        (backend, scheduler)

        完域。软吐。。筒主。关噪。
    }        
}

通過master的url來匹配到Standalone模式:然后初始化了SparkDeploySchedulerBackendTaskSchedulerImpl鸟蟹,這兩個對象很重要乌妙,是啟動任務調(diào)度的核心,然后調(diào)用了scheduler.initialize(backend)進行初始化

啟動TaksScheduler初始化完成建钥,回到我們的SparkContext構(gòu)造方法后面繼續(xù)調(diào)用了
taskScheduler.start() 啟動TaksScheduler
來看start方法

override def start() {
    //調(diào)用backend的實現(xiàn)的start方法
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }

這里的backend是SparkDeploySchedulerBackend調(diào)用了它的start

override def start() {
    //CoarseGrainedSchedulerBackend的start方法藤韵,在這個方法里面創(chuàng)建了一個DriverActor
    super.start()

    // The endpoint for executors to talk to us
    //下面是為了啟動java子進程做準備,準備一下參數(shù)
    val driverUrl = AkkaUtils.address(
      AkkaUtils.protocol(actorSystem),
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts

    //用command拼接參數(shù)熊经,最終會啟動org.apache.spark.executor.CoarseGrainedExecutorBackend子進程
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")

    //用ApplicationDescription封裝了一些重要的參數(shù)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)

    //在這里面創(chuàng)建ClientActor
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    //啟動ClientActor
    client.start()

    waitForRegistration()
  }

這里是拼裝了啟動Executor的一些參數(shù)泽艘,類名+參數(shù) 封裝成ApplicationDescription。最后傳給并創(chuàng)建AppClient并調(diào)用它的start方法

AppClient創(chuàng)建時序圖

AppClient的start方法

接來下關(guān)注start方法

  def start() {
    // Just launch an actor; it will call back into the listener.
   
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

在start方法里創(chuàng)建了與Master通信的ClientActor,然后會調(diào)用它的preStart方法向Master注冊,接下來看它的preStart

  override def preStart() {
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      try {
        //ClientActor向Master注冊
        registerWithMaster()
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          context.stop(self)
      }
    }

最后會調(diào)用該方法向所有Master注冊

    def tryRegisterAllMasters() {
      for (masterAkkaUrl <- masterAkkaUrls) {
        logInfo("Connecting to master " + masterAkkaUrl + "...")
        //t通過actorSelection拿到了Master的引用
        val actor = context.actorSelection(masterAkkaUrl)
        //向Master發(fā)送異步的注冊App的消息
        actor ! RegisterApplication(appDescription)
      }
    }

ClientActor發(fā)送來的注冊App的消息镐依,ApplicationDescription匹涮,他包含了需求的資源,要求啟動的Executor類名和一些參數(shù)
Master的Receiver

 case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        //創(chuàng)建App  sender:ClientActor
        val app = createApplication(description, sender)
        //注冊App
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        //持久化App
        persistenceEngine.addApplication(app)
        //向ClientActor反饋信息槐壳,告訴他app注冊成功了
        sender ! RegisteredApplication(app.id, masterUrl)
        //TODO 調(diào)度任務
        schedule()
      }
    }

registerApplication(app)

 def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.path.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }
    //把App放到集合里面
    applicationMetricsSystem.registerSource(app.appSource)
    apps += app
    idToApp(app.id) = app
    actorToApp(app.driver) = app
    addressToApp(appAddress) = app
    waitingApps += app
  }

Master將接受的信息保存到集合并序列化后發(fā)送一個RegisteredApplication消息通知反饋給ClientActor然低,接著執(zhí)行schedule()方法,該方法中會遍歷workers集合,并執(zhí)行l(wèi)aunchExecutor

  def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    //記錄該worker上使用了多少資源
    worker.addExecutor(exec)
    //Master向Worker發(fā)送啟動Executor的消息
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    //Master向ClientActor發(fā)送消息,告訴ClientActor executor已經(jīng)啟動了
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

這里Master向Worker發(fā)送啟動Executor的消息
worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
application.desc里包含了Executor類的啟動信息

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
         雳攘。带兜。。吨灭。刚照。

          appDirectories(appId) = appLocalDirs
          //創(chuàng)建一個ExecutorRunner,這個很重要喧兄,保存了Executor的執(zhí)行配置和參數(shù)
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            akkaUrl,
            conf,
            appLocalDirs, ExecutorState.LOADING)
          executors(appId + "/" + execId) = manager
          //TODO 開始啟動ExecutorRunner
          manager.start()
         
            无畔。。吠冤。檩互。。咨演。
          }
        }
      }

Worker的Receiver接受到了啟動Executor的消息闸昨,appDesc對象保存了Command命令、Executor的實現(xiàn)類和參數(shù)

manager.start()里會創(chuàng)建一個線程

  def start() {
    //啟動一個線程
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      //用一個子線程來幫助Worker啟動Executor子進程
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = new Thread() {
      override def run() {
        killProcess(Some("Worker shutting down"))
      }
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
  }

在線程中調(diào)用了fetchAndRunExecutor()方法薄风,我們來看該方法

def fetchAndRunExecutor() {
    try {
      // Launch the process
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
        sparkHome.getAbsolutePath, substituteVariables)
      //構(gòu)建命令
      val command = builder.command()
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

      builder.directory(executorDir)
      builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      //啟動子進程
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        command.mkString("\"", "\" \"", "\""), "=" * 40)

      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      //開始執(zhí)行饵较,等待結(jié)束信號
      val exitCode = process.waitFor()
      。遭赂。循诉。。
    }
  }

這里面進行了類名和參數(shù)的拼裝撇他,具體拼裝過程不用關(guān)心茄猫,最終builder.start()會以SystemRuntime的方式啟動一個子進程,這個是進程的類名是CoarseGrainedExecutorBackend
到此Executor進程就啟動起來了

Executor創(chuàng)建時序圖

Executor任務調(diào)度對象啟動

Executor進程后困肩,就首先要執(zhí)行main方法划纽,main的代碼如下

  //Executor進程啟動的入口
  def main(args: Array[String]) {
      。锌畸。勇劣。。
    
    //拼裝參數(shù)
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case Nil =>
        case tail =>
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          printUsageAndExit()
      }
    }

    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }
    //開始執(zhí)行Executor
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
  }

執(zhí)行了run方法

  private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL])   
        
        潭枣。比默。。盆犁。命咐。
        
      //通過actorSystem創(chuàng)建CoarseGrainedExecutorBackend -> Actor
      //CoarseGrainedExecutorBackend -> DriverActor通信
      env.actorSystem.actorOf(
        Props(classOf[CoarseGrainedExecutorBackend],
          driverUrl, executorId, sparkHostPort, cores, userClassPath, env),
        name = "Executor")
     
            。谐岁。醋奠。瓮下。。钝域。
      }
      env.actorSystem.awaitTermination()
    }
  }

run方法中創(chuàng)建了CoarseGrainedExecutorBackend的Actor對象用于準備和DriverActor通信讽坏,接著會繼續(xù)調(diào)用preStart生命周期方法

  override def preStart() {
    logInfo("Connecting to driver: " + driverUrl)
    //Executor跟DriverActor建立連接
    driver = context.actorSelection(driverUrl)
    //Executor向DriverActor發(fā)送消息
    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }

Executor向DriverActor發(fā)送注冊的消息
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)

DriverActor的receiver收到消息后

def receiveWithLogging = {
      //Executor發(fā)送給DriverActor的注冊消息
      case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
        Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
        if (executorDataMap.contains(executorId)) {
          sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
        } else {
          logInfo("Registered executor: " + sender + " with ID " + executorId)
          //DriverActor向Executor發(fā)送注冊成功的消息
          sender ! RegisteredExecutor
            
          addressToExecutorId(sender.path.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val (host, _) = Utils.parseHostPort(hostPort)
         
         //將Executor的信息封裝起來
          val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            //往集合添加Executor的信息對象
            executorDataMap.put(executorId, data)
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))

          //將來用來執(zhí)行真正的業(yè)務邏輯
          makeOffers()
        }

DriverActor的receiver里將Executor信息封裝到Map中保存起來,并發(fā)送反饋消息 sender ! RegisteredExecutor
給CoarseGrainedExecutorBackend

override def receiveWithLogging = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      val (hostname, _) = Utils.parseHostPort(hostPort)
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

CoarseGrainedExecutorBackend收到消息后創(chuàng)建一個Executor對象用于準備任務的執(zhí)行例证,到此Executor 就已經(jīng)成功啟動了路呜,接下來就是等待任務的調(diào)度與執(zhí)行。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末织咧,一起剝皮案震驚了整個濱河市胀葱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌笙蒙,老刑警劉巖抵屿,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異捅位,居然都是意外死亡轧葛,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進店門艇搀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尿扯,“玉大人,你說我怎么就攤上這事焰雕∩” “怎么了袱讹?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵栓撞,是天一觀的道長舔琅。 經(jīng)常有香客問我,道長吝秕,這世上最難降的妖魔是什么泊脐? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮郭膛,結(jié)果婚禮上晨抡,老公的妹妹穿的比我還像新娘氛悬。我一直安慰自己则剃,他們只是感情好,可當我...
    茶點故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布如捅。 她就那樣靜靜地躺著棍现,像睡著了一般。 火紅的嫁衣襯著肌膚如雪镜遣。 梳的紋絲不亂的頭發(fā)上己肮,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天士袄,我揣著相機與錄音,去河邊找鬼谎僻。 笑死娄柳,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的艘绍。 我是一名探鬼主播赤拒,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼诱鞠!你這毒婦竟也來了挎挖?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤航夺,失蹤者是張志新(化名)和其女友劉穎蕉朵,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體阳掐,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡始衅,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了缭保。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片觅闽。...
    茶點故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖涮俄,靈堂內(nèi)的尸體忽然破棺而出蛉拙,到底是詐尸還是另有隱情,我是刑警寧澤彻亲,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布孕锄,位于F島的核電站,受9級特大地震影響苞尝,放射性物質(zhì)發(fā)生泄漏畸肆。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一宙址、第九天 我趴在偏房一處隱蔽的房頂上張望轴脐。 院中可真熱鬧,春花似錦抡砂、人聲如沸大咱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽碴巾。三九已至,卻和暖如春丑搔,著一層夾襖步出監(jiān)牢的瞬間厦瓢,已是汗流浹背提揍。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留煮仇,地道東北人劳跃。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像浙垫,于是被迫代替她去往敵國和親售碳。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容