Spark Streaming Data Computation Phase Analysis

——————————————————————————————
Spark Streaming Overview
Spark Streaming Initialization Process
Spark Streaming Receiver Startup Process Analysis
Spark Streaming Data Preparation Phase Analysis (Receiver Mode)
Spark Streaming Data Computation Phase Analysis
Spark Streaming Backpressure Analysis
Spark Streaming Executor DynamicAllocation Mechanism Analysis

——————————————————————————————

The end-to-end Spark Streaming pipeline consists of two phases: the data preparation phase and the data computation phase. The two phases are functionally independent and are connected only through the data itself. "Spark Streaming Data Preparation Phase Analysis" examined the data preparation phase from the source-code perspective; this article analyzes the data computation phase in the same way.
The data computation phase consists of three parts: dividing data into batches, generating batch jobs, and submitting batch jobs.

1. JobGenerator Startup

JobGenerator periodically generates Jobs and submits them. As mentioned in "Spark Streaming Initialization Process Analysis", when the JobScheduler is started it calls JobGenerator's start method, which starts the JobGenerator.
The start method of JobGenerator is implemented as follows:

  /** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

By analyzing the code above, we can see that when JobGenerator.start() is called, it will:

  • Create and start the eventLoop object; events received by eventLoop are handed to processEvent(event), which handles each event differently according to its type.
  • Call the startFirstTime() method. From the implementation of startFirstTime, it performs two main tasks:
      • calling timer.start, which periodically generates Jobs;
      • calling graph.start.

  /** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

The following sections analyze each of these in turn.

2. Job Generation and Submission

2.1 Periodically Triggering the Job Generation Event

The startFirstTime() method calls timer.start, where timer is a RecurringTimer, the same kind of timer that was introduced for block slicing in "Spark Streaming Data Preparation Phase Analysis". It repeatedly executes a scheduled task at the configured interval. Here the timer is defined as:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

Every batchDuration, it posts a GenerateJobs event to the eventLoop. When the eventLoop receives the GenerateJobs event, it handles it through processEvent, which in this case calls the generateJobs() method to generate the jobs.

  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

2.2 Job Generation in Detail

As mentioned above, the timer periodically triggers the Job generation event, and Jobs are generated through generateJobs.
In each batch interval, JobGenerator creates one Job for every output stream in the application; all Jobs of the batch form a JobSet, which is submitted in bulk via JobScheduler's submitJobSet.
Let's now look at the implementation of generateJobs.

  /** Generate jobs and perform checkpointing for the given `time`.  */
  private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

When generateJobs generates the Jobs, it first obtains the registered but not-yet-allocated block information from the ReceiverTracker, and then generates the Jobs through the DStreamGraph.

2.2.1 Allocating Block Information to a Batch

This step takes the unallocated blocks produced in "Spark Streaming Data Preparation Phase Analysis" and assigns them to a specific batch for processing. The process is as follows:

When generating Jobs, the following statement is called first:

jobScheduler.receiverTracker.allocateBlocksToBatch(time)

This statement determines which data the batch identified by time will process. The allocation process is described in detail below.

The implementation of allocateBlocksToBatch is:


  /** Allocate all unallocated blocks to the given batch. */
  def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
      receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
  }

This in turn calls receivedBlockTracker's allocateBlocksToBatch method, which takes the unallocated block information and assigns it to the batch identified by batchTime. The receivedBlockTracker dequeues the unallocated block information from streamIdToUnallocatedBlockQueues, wraps it as AllocatedBlocks, and registers it in the timeToAllocatedBlocks map, where it waits to be bound to the Job when that batch (batchTime) generates its Job.

  /**
   * Allocate all unallocated blocks to the given batch.
   * This event will get written to the write ahead log (if enabled).
   */
  def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      // This situation occurs when:
      // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
      // possibly processed batch job or half-processed batch job need to be processed again,
      // so the batchTime will be equal to lastAllocatedBatchTime.
      // 2. Slow checkpointing makes recovered batch time older than WAL recovered
      // lastAllocatedBatchTime.
      // This situation will only occurs in recovery time.
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }

2.2.2 Batch Job Generation

graph.generateJobs(time) converts each OutputStream in the DStreamGraph into a Job (if the application contains multiple output operators, a single batch will generate multiple Jobs). Its implementation is as follows:

def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

As the source shows, it calls the generateJob method of each OutputStream in turn to convert it into a Job. What distinguishes an OutputStream from other DStreams is that it overrides generateJob. Taking the ForEachDStream created by the print operator in the WordCount program as an example, its generateJob is implemented as follows:

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

As the code shows, it calls getOrCompute on the parent DStream to generate the RDD and then wraps it into a Job.

2.2.2.1 RDD Generation

Taking WordCount as an example, let's first look at the DStream transformations in the WordCount application. The transformation chain is as follows:

[Figure: DStream transformation chain of the WordCount application]
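
Since the original figure is not reproduced here, the minimal WordCount driver below sketches the application assumed throughout this walkthrough (the host name and port are placeholder values); the comments note which DStream each step produces.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the receiver, at least one for processing
    val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines  = ssc.socketTextStream("localhost", 9999)  // SocketInputDStream
    val words  = lines.flatMap(_.split(" "))               // FlatMappedDStream
    val pairs  = words.map((_, 1))                         // MappedDStream
    val counts = pairs.reduceByKey(_ + _)                  // ShuffledDStream
    counts.print()                                         // registers a ForEachDStream as an output stream

    ssc.start()
    ssc.awaitTermination()
  }
}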

getOrCompute (the compute method is treated similarly) is defined in the DStream base class; if a subclass overrides it, the subclass's version is executed, otherwise the base-class version is. Looking at ShuffledDStream, the parent of ForEachDStream in the transformation chain above, we find that it does not override getOrCompute, so the getOrCompute inherited from the DStream base class is used. The code is as follows.

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

This code calls ShuffledDStream's compute to generate the RDD; its compute is implemented as:

override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](
          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }

Compute in turn calls the compute method of its parent DStream, and the parents keep calling compute recursively up the chain until the source DStream (SocketInputDStream) is reached. SocketInputDStream inherits its compute method from ReceiverInputDStream; this compute generates the source RDD, and the RDD graph is then built back down the DStream chain as the recursion unwinds.

The compute defined by ReceiverInputDStream is implemented as follows:

 /**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

Here, the following lines

val receiverTracker = ssc.scheduler.receiverTracker 
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

retrieve the block information (blockInfos) that was allocated to this batch in section 2.2.1, wrap it into a StreamInputInfo, and then generate the RDD through the createBlockRDD method. If blockInfos is non-empty, a normal RDD is generated; if it is empty, an empty BlockRDD with no blocks is created (new BlockRDD(ssc.sc, Array.empty)).
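
As a reference for how the RDD is built from these blocks, the snippet below is a simplified sketch of the idea behind the non-WAL path of createBlockRDD; the real method additionally handles write-ahead-log-backed blocks and filters out blocks that no longer exist in any BlockManager. The classes involved are Spark-internal, so this is an illustration rather than a standalone program.

import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo

// Simplified sketch: build a BlockRDD over the block ids allocated to this batch,
// or an empty BlockRDD when no data was allocated.
def createBlockRDDSketch[T: ClassTag](
    sc: SparkContext,
    blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map(_.blockId.asInstanceOf[BlockId]).toArray
    new BlockRDD[T](sc, blockIds)     // partitions correspond to the received blocks
  } else {
    new BlockRDD[T](sc, Array.empty)  // no blocks for this batch: an empty RDD
  }
}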

2.2.3 Job Submission

Once the Jobs have been generated successfully, the JobSet is submitted through the JobScheduler.

case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

The submitJobSet method is implemented as follows:

  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

  • First, the JobSet is added to the jobSets map so that the monitoring system can track it.
  • Each Job is wrapped in a JobHandler and passed to the ThreadPoolExecutor's execute method, which adds it to the executor's workQueue to wait for scheduling; if the pool has an idle thread, the Job is scheduled immediately. (This is standard Java Executor behaviour.)
    The thread pool is defined as follows:
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

As the code shows, JobScheduler creates a fixed-size daemon thread pool, jobExecutor, whose size is set by "spark.streaming.concurrentJobs" and defaults to 1. The pool can run as many Jobs concurrently as it has threads, so by default only one Job is executed at a time. When Jobs cannot be executed fast enough they pile up, and the backlog is held in the ThreadPoolExecutor's workQueue until a thread becomes free.
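
If more than one batch needs to run at a time, the pool size can be raised through this setting before the StreamingContext is created. A minimal sketch follows (note that spark.streaming.concurrentJobs is not an officially documented setting, and concurrent batches may interleave their output operations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ConcurrentJobsExample")
  .set("spark.streaming.concurrentJobs", "2")  // jobExecutor will have 2 threads
val ssc = new StreamingContext(conf, Seconds(5))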

  • JobHandler is the task that the jobExecutor threads actually run. It processes the submitted Job, as shown below: it manages the Job's state through the EventLoop and starts the Job by calling job.run.
    def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }

Job.run is implemented as follows:

 def run() {
    _result = Try(func())
  }

It executes the function func supplied when the Job was created. In the WordCount application the Job is created in ForEachDStream, whose generateJob method was already shown above:

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

As the code shows, it invokes foreachFunc, the function passed in when the ForEachDStream was created, which is defined by the print method:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

Inside foreachFunc, the rdd.take() operator is invoked. take is an action, so it triggers the submission of a Spark job, and from this point on the processing flow is the same as in Spark batch processing.
Note that the Job generated earlier is only a Streaming-level abstraction; it is not the same thing as a Spark job, which is what actually gets scheduled and produces tasks.
