10 Spark Streaming源碼解讀之流數(shù)據(jù)不斷接收全生命周期徹底研究和思考

在上一篇中介紹了Receiver在Driver的精妙實(shí)現(xiàn),本篇內(nèi)容主要介紹Receiver在Executor中的啟動(dòng)蚣驼,數(shù)據(jù)接收和存儲(chǔ)

  1. 從ReceiverTracker的start方法開始,調(diào)用launchReceivers()方法,給endpoint發(fā)送消息匣屡,endpoint.send(StartAllReceivers(receivers)),endpoint就是ReceiverTrackerEndpoint拇涤,也可以說是給自己的消息通訊體發(fā)送了一條消息捣作。看接收到的消息
case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        // 循環(huán)啟動(dòng)receiver
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          //啟動(dòng)receiver
          startReceiver(receiver, executors)
}

startReceiver(receiver, executors)循環(huán)調(diào)用鹅士,每一個(gè)receiver會(huì)啟動(dòng)一個(gè)job券躁。startReceiver的代碼如下

private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf = new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      // 在worker節(jié)點(diǎn)啟動(dòng)receiver的方法,(就是action中的方法)
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException("Could not start receiver as object not found.")
          }
          //判斷task的重試次數(shù)為0如绸,就是沒有task失敗后嘱朽,重試運(yùn)行不執(zhí)行以下代碼
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            //這里創(chuàng)建接收器管理者,在start方法里啟動(dòng)receiver接收數(shù)據(jù)
            val supervisor = new ReceiverSupervisorImpl(receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      // 創(chuàng)建接收數(shù)據(jù)的RDD
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          //
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          // 根據(jù)數(shù)據(jù)本地性創(chuàng)建receiverRDD
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      // 對(duì)job進(jìn)行一些配置
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
      // 到這里就提交了receiverRDD到集群中
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            // 重啟receiver
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            // 重啟receiver
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
}

在startReceiverFunc函數(shù)中定義了從iterator中取一條記錄怔接,也就是receiver,然后實(shí)例化一個(gè)ReceiverSupervisorImpl,把receiver傳遞進(jìn)入搪泳,然后調(diào)用ReceiverSupervisorImpl的start方法。當(dāng)然這里并沒有啟動(dòng)ReceiverSupervisorImpl扼脐,只是定義了操作而已岸军,真正的執(zhí)行是在Executor中奋刽。
然后提交ReceiverRDD到集群運(yùn)行,代碼如下

val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  1. 通過startReceiverFunc函數(shù) 來看ReceiverSupervisorImpl在Executor上的運(yùn)行艰赞。
    從supervisor.start()開始佣谐,start方法代碼如下
def start() {  
      onStart()  
      startReceiver()
}

onStart方法代碼如下

/** 
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure 
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }

重點(diǎn)是看onStart的注釋,注釋內(nèi)容說在receiver.onStart()之前方妖,必須BlockGenerator先啟動(dòng)狭魂,以保證接收到的數(shù)據(jù)能夠被存儲(chǔ)起來〉趁伲看onStart方法的子類實(shí)現(xiàn)雌澄,代碼如下

  override protected def onStart() {
    registeredBlockGenerators.foreach { _.start() }
  }

registeredBlockGenerators在ReceiverSupervisorImpl實(shí)例化的時(shí)候創(chuàng)建,代碼如下

private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]

registeredBlockGenerators在createBlockGenerator方法中添加了BlockGenerator杯瞻,代碼如下

override def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
    // Cleanup BlockGenerators that have already been stopped
    registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

    // 每一個(gè)receiver創(chuàng)建一個(gè)BlockGenerator镐牺,因?yàn)閟treamId一一對(duì)應(yīng)receiver
    val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
    registeredBlockGenerators += newBlockGenerator
    newBlockGenerator
}

那么createBlockGenerator在什么時(shí)候被調(diào)用呢?看代碼

private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

registeredBlockGenerators的BlockGenerator已經(jīng)有了魁莉,看BlockGenerator的start()方法,代碼如下

def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
}

這里啟動(dòng)了blockIntervalTimer和blockPushingThread睬涧,blockIntervalTimer就是一個(gè)定時(shí)器,默認(rèn)每200ms回調(diào)一下updateCurrentBuffer方法旗唁,回調(diào)時(shí)間通過參數(shù)spark.streaming.blockInterval設(shè)置畦浓,這也是一個(gè)性能調(diào)優(yōu)的參數(shù),時(shí)間過短太造成block碎片太多检疫,時(shí)間過長(zhǎng)可能導(dǎo)致block塊過大宅粥,具體時(shí)間長(zhǎng)短要根據(jù)實(shí)際業(yè)務(wù)而定,updateCurrentBuffer方法作用就是將接收到的數(shù)據(jù)包裝到block存儲(chǔ)电谣,代碼后面再看;blockPushingThread作用是定時(shí)從blocksForPushing隊(duì)列中取block,然后存儲(chǔ)抹蚀,并向ReceiverTrackerEndpoint匯報(bào)剿牺,代碼后面再看

  1. BlockGenerator啟動(dòng)之后接著看 supervisor.start()方法中的 startReceiver()方法, startReceiver()代碼如下
def startReceiver(): Unit = synchronized {
    try {
      if (onReceiverStart()) {
        logInfo("Starting receiver")
        receiverState = Started
        receiver.onStart()
        logInfo("Called receiver onStart")
      } else {
        // The driver refused us
        stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
      }
    } catch {
      case NonFatal(t) =>
        stop("Error starting receiver " + streamId, Some(t))
    }
}

首先判斷onReceiverStart()的返回值环壤,onReceiverStart()代碼在子類中的實(shí)現(xiàn)如下

override protected def onReceiverStart(): Boolean = {
    val msg = RegisterReceiver(
      streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
    trackerEndpoint.askWithRetry[Boolean](msg)
}

onReceiverStart內(nèi)部向trackerEndpoint發(fā)送了一條RegisterReceiver注冊(cè)receiver的消息晒来,在trackerEndpoint內(nèi)部收到消息后,將注冊(cè)信息包裝到一個(gè)ReceiverTrackingInfo的case class類中郑现,然后把ReceiverTrackingInfo按照k-v的方式put到receiverTrackingInfos中湃崩,key就是streamId,再次說明一個(gè)inputDstream對(duì)應(yīng)一個(gè)receiver接箫。
回到上面的調(diào)用返回true,將receiverState 標(biāo)記為Started攒读,然后調(diào)用了receiver的onStart方法。

  1. 以SocketReceiver為例辛友,看SocketReceiver的onStart方法 ,啟動(dòng)了一條后臺(tái)線程薄扁,調(diào)用receive()方法接收數(shù)據(jù)剪返,代碼如下
def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
}

接著看receive()方法,代碼如下

def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
}

receiver方法的內(nèi)容就很簡(jiǎn)單了邓梅,啟動(dòng)一個(gè)socket接收數(shù)據(jù)脱盲,接收一行就調(diào)用store方法存儲(chǔ)起來,store方法的代碼如下

def store(dataItem: T) {  
      supervisor.pushSingle(dataItem)
}

調(diào)用supervisor的pushSingle方法日缨,supervisor就是ReceiverSupervisor的實(shí)現(xiàn)類ReceiverSupervisorImpl的方法钱反,代碼如下

def pushSingle(data: Any) { 
       defaultBlockGenerator.addData(data)
}

defaultBlockGenerator在上面說過,他是ReceiverSupervisorImpl的一個(gè)成員變量匣距,接著看他的addData方法面哥,代碼如下

def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
}

currentBuffer += data,在currentBuffer 上不斷的累加數(shù)據(jù),那么currentBuffer 的數(shù)據(jù)是怎樣存儲(chǔ)起來的呢墨礁,這時(shí)候就用到了前面介紹的 blockIntervalTimer和blockPushingThread

  1. 首先看blockIntervalTimer定時(shí)回調(diào)的updateCurrentBuffer()方法幢竹,代碼如下
private def updateCurrentBuffer(time: Long): Unit = {
   try {
     var newBlock: Block = null
     synchronized {
       if (currentBuffer.nonEmpty) {
         val newBlockBuffer = currentBuffer
         currentBuffer = new ArrayBuffer[Any]
         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
         listener.onGenerateBlock(blockId)
         newBlock = new Block(blockId, newBlockBuffer)
       }
     }

     if (newBlock != null) {
       blocksForPushing.put(newBlock)  // put is blocking when queue is full
     }
   } catch {
     case ie: InterruptedException =>
       logInfo("Block updating timer thread was interrupted")
     case e: Exception =>
       reportError("Error in block updating thread", e)
   }
}

將currentBuffer交給newBlockBuffer ,然后實(shí)例化一個(gè)空的ArrayBuffer給currentBuffer恩静,接著實(shí)例化一個(gè)Block把newBlockBuffer 傳遞進(jìn)去焕毫,最后把newBlock 放入到blocksForPushing隊(duì)列中

  1. 接下來就是blockPushingThread干的活了,在blockPushingThread線程中調(diào)用keepPushingBlocks方法驶乾,代碼如下
private def keepPushingBlocks() {
    logInfo("Started block pushing thread")

    def areBlocksBeingGenerated: Boolean = synchronized {
      state != StoppedGeneratingBlocks
    }

    try {
      // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
      while (areBlocksBeingGenerated) {
        Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }

      // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        val block = blocksForPushing.take()
        logDebug(s"Pushing block $block")
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
}

從blocksForPushing隊(duì)列中定時(shí)取出block然后pushBlock邑飒,代碼如下

Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
       case Some(block) => pushBlock(block)
       case None =>
}

接著看pushBlock(block)方法,代碼如下

listener.onPushBlock(block.id, block.buffer)

這里調(diào)用了listener的onPushBlock方法级乐,那么listener是從哪來的疙咸,查詢一下listener變量,listener是在BlockGenerator實(shí)例化的時(shí)候傳遞進(jìn)來的风科,找BlockGenerator的實(shí)例化撒轮,是通過createBlockGenerator方法接收的參數(shù)并傳遞給BlockGenerator。找createBlockGenerator方法的調(diào)用贼穆,終于看到了defaultBlockGeneratorListener的實(shí)例化题山,代碼如下

private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }

    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
}

原來onPushBlock方法在這里,看pushArrayBuffer的調(diào)用 故痊,pushArrayBuffer方法的代碼如下

def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}

重磅性的一行代碼出現(xiàn)了 pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption),代碼如下

def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
}

這里面做了幾事件事顶瞳,第一調(diào)用receivedBlockHandler來存儲(chǔ)block
第二向trackerEndpoint匯報(bào)block的存儲(chǔ)結(jié)果blockInfo

  1. receivedBlockHandler是在ReceiverSupervisorImpl實(shí)例化的時(shí)候創(chuàng)建的,代碼如下
private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
}

有兩種類型愕秫,一種的WAL方式慨菱,還有一種普通的方式。WAL的方式以后再看戴甩,這里看BlockManagerBasedBlockHandler符喝,代碼如下

private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }

  def cleanupOldBlocks(threshTime: Long) {
    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
    // of BlockRDDs.
  }
}

這里就是借助BlockManager來存儲(chǔ)block并返回block存儲(chǔ)的元數(shù)據(jù),終于看完了receiver的整個(gè)數(shù)據(jù)接收和存儲(chǔ)甜孤。

  1. 整個(gè)過程還是很清晰的洲劣,如果有張流程圖就最好了备蚓,流程圖以后補(bǔ)上,謝謝
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末囱稽,一起剝皮案震驚了整個(gè)濱河市郊尝,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌战惊,老刑警劉巖流昏,帶你破解...
    沈念sama閱讀 210,914評(píng)論 6 490
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異吞获,居然都是意外死亡况凉,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 89,935評(píng)論 2 383
  • 文/潘曉璐 我一進(jìn)店門各拷,熙熙樓的掌柜王于貴愁眉苦臉地迎上來刁绒,“玉大人,你說我怎么就攤上這事烤黍≈校” “怎么了?”我有些...
    開封第一講書人閱讀 156,531評(píng)論 0 345
  • 文/不壞的土叔 我叫張陵速蕊,是天一觀的道長(zhǎng)嫂丙。 經(jīng)常有香客問我,道長(zhǎng)规哲,這世上最難降的妖魔是什么跟啤? 我笑而不...
    開封第一講書人閱讀 56,309評(píng)論 1 282
  • 正文 為了忘掉前任,我火速辦了婚禮唉锌,結(jié)果婚禮上隅肥,老公的妹妹穿的比我還像新娘。我一直安慰自己袄简,他們只是感情好武福,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,381評(píng)論 5 384
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著痘番,像睡著了一般。 火紅的嫁衣襯著肌膚如雪平痰。 梳的紋絲不亂的頭發(fā)上汞舱,一...
    開封第一講書人閱讀 49,730評(píng)論 1 289
  • 那天,我揣著相機(jī)與錄音宗雇,去河邊找鬼昂芜。 笑死,一個(gè)胖子當(dāng)著我的面吹牛赔蒲,可吹牛的內(nèi)容都是我干的泌神。 我是一名探鬼主播良漱,決...
    沈念sama閱讀 38,882評(píng)論 3 404
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼欢际!你這毒婦竟也來了母市?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,643評(píng)論 0 266
  • 序言:老撾萬榮一對(duì)情侶失蹤损趋,失蹤者是張志新(化名)和其女友劉穎患久,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體浑槽,經(jīng)...
    沈念sama閱讀 44,095評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蒋失,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,448評(píng)論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了桐玻。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片篙挽。...
    茶點(diǎn)故事閱讀 38,566評(píng)論 1 339
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖镊靴,靈堂內(nèi)的尸體忽然破棺而出铣卡,到底是詐尸還是另有隱情,我是刑警寧澤邑闲,帶...
    沈念sama閱讀 34,253評(píng)論 4 328
  • 正文 年R本政府宣布算行,位于F島的核電站,受9級(jí)特大地震影響苫耸,放射性物質(zhì)發(fā)生泄漏州邢。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,829評(píng)論 3 312
  • 文/蒙蒙 一褪子、第九天 我趴在偏房一處隱蔽的房頂上張望量淌。 院中可真熱鬧,春花似錦嫌褪、人聲如沸呀枢。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,715評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽裙秋。三九已至,卻和暖如春缨伊,著一層夾襖步出監(jiān)牢的瞬間摘刑,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,945評(píng)論 1 264
  • 我被黑心中介騙來泰國(guó)打工刻坊, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留枷恕,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 46,248評(píng)論 2 360
  • 正文 我出身青樓谭胚,卻偏偏與公主長(zhǎng)得像徐块,于是被迫代替她去往敵國(guó)和親未玻。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,440評(píng)論 2 348

推薦閱讀更多精彩內(nèi)容