Spark Streaming (3) - Receiver and ReceiverTracker

This article is based on Spark 2.11.

1. Introduction

In Spark Streaming (1), while walking through the Spark Streaming workflow, I gave a rough outline of how a running streaming job reads data from the stream:

  1. A Receiver runs on an executor and receives data, handing it over to a ReceiverSupervisor; the ReceiverSupervisor stores the data through the BlockManager and reports the block information to the ReceiverTracker.
  2. The ReceiverTracker runs on the driver and records the reported block information; later, when the JobGenerator generates new jobs, it allocates this data as the input for those jobs.

This article describes the above flow in detail.

2. ReceiverTracker

The ReceiverTracker has the following core members:

private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
private val receiverInputStreamIds = receiverInputStreams.map { _.id }
private val receivedBlockTracker = ...
  1. receiverInputStreams: all DStreams in the DStreamGraph that implement ReceiverInputDStream. These DStreams hold receivers, which means they can launch tasks on executors to receive stream data.
  2. receiverInputStreamIds: every receiver has an id and reports it along with its data; the ReceiverInputDStream at the head of the DStream DAG uses its own id when asking the receiverTracker which data belongs to it.
  3. receivedBlockTracker: covered later.

2.1 Starting the ReceiverTracker

The ReceiverTracker is started on the driver as part of starting the JobScheduler, via the following call sequence:

StreamingContext#start
   ->JobScheduler#start
      ->ReceiverTracker#start

Below is the ReceiverTracker's start method:

def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

As the start code shows, it registers an RPC endpoint on rpcEnv (for Spark's RPC internals, see the earlier post on Spark network communication and the RPC implementation).

Looking at the registered ReceiverTrackerEndpoint's receive and receiveAndReply methods shows which messages it accepts and which services it provides:

override def receive: PartialFunction[Any, Unit] = {
      // Local messages
      case StartAllReceivers(receivers) =>
          ...
          // This message is sent by the tracker to itself; it starts receivers on executors
          startReceiver(receiver, executors)
        }
      case RestartReceiver(receiver) =>
        ...
        startReceiver(receiver, scheduledLocations)
      case c: CleanupOldBlocks =>
        // Blocks whose batches have been processed can be cleaned up
        ...
      case ReportError(streamId, message, error) =>
        reportError(streamId, message, error)
    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Remote messages
      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
         ...
        // After a receiver starts on an executor, it registers itself with the ReceiverTracker so the tracker is aware of it
      case DeregisterReceiver(streamId, message, error) =>
        deregisterReceiver(streamId, message, error)
        context.reply(true)

      // Local messages
     case AddBlock(receivedBlockInfo) =>
      // Receives block information reported by receivers
     ...
      case AllReceiverIds =>
        context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
      case GetAllReceiverInfo =>
        context.reply(receiverTrackingInfos.toMap)
      case StopAllReceivers =>
        assert(isTrackerStopping || isTrackerStopped)
        stopReceivers()
        context.reply(true)
    }

From the above, the services it provides include launching and registering receivers, and registering and cleaning up block data.
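The dispatch pattern above is ordinary Scala: a PartialFunction over message types. Below is a minimal, self-contained sketch of the same pattern; the message and field names merely imitate the real ones, and this is not Spark's actual endpoint implementation:

```scala
// Toy model of ReceiverTrackerEndpoint's message handling. receive handles
// fire-and-forget messages; receiveAndReply handles messages expecting an answer.
sealed trait TrackerMessage
case class AddBlock(streamId: Int, numRecords: Long) extends TrackerMessage
case class ReportError(streamId: Int, message: String) extends TrackerMessage
case object AllReceiverIds extends TrackerMessage

class ToyEndpoint {
  var blockCounts = Map.empty[Int, Long].withDefaultValue(0L)
  var errors = List.empty[String]

  // Analogous to receive: no reply is sent back.
  val receive: PartialFunction[TrackerMessage, Unit] = {
    case AddBlock(id, n)     => blockCounts = blockCounts.updated(id, blockCounts(id) + n)
    case ReportError(_, msg) => errors = msg :: errors
  }

  // Analogous to receiveAndReply: the handler produces a value to reply with.
  def receiveAndReply(msg: TrackerMessage): Any = msg match {
    case AllReceiverIds => blockCounts.keys.toSeq.sorted
  }
}
```

In the real endpoint, the reply goes back through an RpcCallContext rather than a return value, but the pattern-match dispatch is the same.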

Back in the start method, launchReceivers() is called to start the receivers.

2.2 How the ReceiverTracker handles data reported by receivers

When the ReceiverTracker's RPC endpoint receives an AddBlock message, a receiver has reported block information.

First, the structure of the AddBlock message:

// The AddBlock message wraps a ReceivedBlockInfo, which carries the details of the data reported by the receiver
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)

// The report carries the streamId; numRecords is the number of records in this block.
// blockStoreResult has two implementations:
//   1. BlockManagerBasedStoreResult: the receiver side stored the block with the BlockManager
//   2. WriteAheadLogBasedStoreResult: the receiver side also saved the block to a write-ahead log
// Both are explained in the Receiver section below.
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    numRecords: Option[Long],
    metadataOption: Option[Any],
    blockStoreResult: ReceivedBlockStoreResult
  ) {...}

Back to the RPC handling of AddBlock, which enters the following call sequence:

case AddBlock =>                         AddBlock message received
    -> ReceiverTracker#addBlock
     -> ReceivedBlockTracker#addBlock    the receivedBlockTracker manages the reported data

ReceivedBlockTracker
Reported blocks are ultimately managed by the ReceivedBlockTracker. The following members are involved in tracking the reported block information:

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
  private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  1. ReceivedBlockQueue: a type alias; all reported block information is kept in queues of this type.
  2. streamIdToUnallocatedBlockQueues: reported blocks are grouped by streamId; this map holds blocks that have been reported but not yet allocated to any job.
  3. timeToAllocatedBlocks: when the JobGenerator generates a job, it asks the receiverTracker to allocate data to it, and the receiverTracker delegates to the ReceivedBlockTracker. Allocated blocks are indexed by batch time (the job's generation time) and streamId, so the job's RDD DAG can fetch its data from here by time and streamId.

With that, ReceivedBlockTracker#addBlock is straightforward: it simply appends the reported block information to streamIdToUnallocatedBlockQueues.

2.3 How the ReceiverTracker allocates data to jobs

Section 3 of Spark Streaming (2) showed that JobGenerator#generateJobs calls receiverTracker.allocateBlocksToBatch to allocate input data to a job. The allocation is likewise delegated to the ReceivedBlockTracker; here is its allocateBlocksToBatch method:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
     // Drain everything from streamIdToUnallocatedBlockQueues, grouped by streamId
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      // Save into timeToAllocatedBlocks; the input DStream at the head of the job's DAG fetches data by its batch time and streamId and turns it into a BlockRDD.
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
   
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }
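The addBlock and allocateBlocksToBatch bookkeeping can be modeled with plain Scala collections. A minimal sketch follows; the class and field names imitate the real ones, and WAL writing and recovery are omitted:

```scala
import scala.collection.mutable

// Toy model of ReceivedBlockTracker's two maps: blocks arrive per stream into
// an "unallocated" queue; allocateBlocksToBatch drains every queue into a map
// indexed by batch time, from which the job's input DStream reads.
class ToyBlockTracker(streamIds: Seq[Int]) {
  private val streamIdToUnallocatedBlockQueues =
    mutable.HashMap(streamIds.map(id => id -> mutable.Queue.empty[String]): _*)
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
  private var lastAllocatedBatchTime = -1L

  // Corresponds to addBlock: append the reported block to the unallocated queue.
  def addBlock(streamId: Int, blockId: String): Unit =
    streamIdToUnallocatedBlockQueues(streamId) += blockId

  // Corresponds to allocateBlocksToBatch: drain all queues into this batch.
  def allocateBlocksToBatch(batchTime: Long): Unit =
    if (batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { id =>
        id -> streamIdToUnallocatedBlockQueues(id).dequeueAll(_ => true).toSeq
      }.toMap
      timeToAllocatedBlocks.put(batchTime, streamIdToBlocks)
      lastAllocatedBatchTime = batchTime
    }
}
```

Note how a block reported after one allocation pass simply waits in the queue and lands in the next batch, which is exactly the behavior the real tracker relies on.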

3. Receiver and ReceiverSupervisor

Section 2.1 mentioned that ReceiverTracker's start method calls launchReceivers to start the receivers. The call sequence leading up to receiver startup is:

ReceiverTracker#start
   ->ReceiverTracker#launchReceivers
         -> ReceiverTrackerEndpoint#send(StartAllReceivers(receivers))
                                               | sends a StartAllReceivers message to the RPC endpoint
                                               v
    endpoint receives it    ReceiverTrackerEndpoint#receive
                      ->ReceiverTracker#startReceiver  starts the receiver on an executor

Obtaining the receivers
Section 2.2 of Spark Streaming (1) mentioned that a ReceiverInputDStream must return a receiver.
Starting the receivers

  1. launchReceivers obtains all the receivers from the ReceiverTracker#receiverInputStreams member.
  2. It sends a StartAllReceivers message to the RPC endpoint it holds.
  3. On receiving the message, the endpoint calls ReceiverTracker#startReceiver.

The core is startReceiver, shown below:

 private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // This function is shipped, together with the receiver, to an executor and runs there
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            // Start the ReceiverSupervisor
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // An RDD is created whose data is the receivers themselves; startReceiverFunc above
      // runs over the RDD's data, i.e. it receives a receiver as its argument
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

     // Submit the RDD as a job
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

As described in the post on Spark job submission, startReceiverFunc is eventually wrapped into a ResultTask that runs on an executor, and the ResultTask invokes startReceiverFunc, which creates the ReceiverSupervisor.
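The future.onComplete callback in the code above is what makes the receiver job effectively permanent: however the one-task job ends, the tracker resubmits it until it is stopped. A simplified, Spark-free model of that supervision loop follows; maxRestarts is added only so the sketch terminates, and submitReceiverJob stands in for sc.submitJob:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Toy model of ReceiverTracker's restart loop: each time the submitted receiver
// "job" completes, it is resubmitted, until the tracker decides to stop.
class ToySupervisionLoop(maxRestarts: Int) {
  val starts = new AtomicInteger(0)
  private val done = Promise[Int]()

  // Stands in for ssc.sparkContext.submitJob(receiverRDD, startReceiverFunc, ...)
  private def submitReceiverJob(): Future[Unit] =
    Future { starts.incrementAndGet(); () }

  def start(): Future[Int] = { loop(); done.future }

  private def loop(): Unit =
    submitReceiverJob().onComplete { _ =>
      if (starts.get() < maxRestarts) loop()   // like self.send(RestartReceiver(receiver))
      else done.success(starts.get())          // like onReceiverJobFinish(receiverId)
    }
}
```

In the real code the decision is shouldStartReceiver, driven by the tracker's state, rather than a counter.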

3.1 Receiver

From the code above, the Receiver startup sequence is:

ReceiverSupervisor#start
   -> ReceiverSupervisor#startReceiver
      -> Receiver#onStart

A Receiver's main members and methods are:

@transient private var _supervisor: ReceiverSupervisor = null
def storeXXX()
def onStart()
def streamId: Int = id
  1. _supervisor: the Receiver hands the data it receives over to the ReceiverSupervisor.
  2. storeXXX: a family of store methods; internally they call ReceiverSupervisor methods to hand the data over for storage and for reporting to the ReceiverTracker.
  3. onStart: called when the receiver starts; this is typically where reading from the stream begins.
  4. streamId: the unique id of each input stream.

Take SocketReceiver as an example:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  private var socket: Socket = _

  def onStart() {

    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // in case restart thread close it twice
    synchronized {
      if (socket != null) {
        socket.close()
        socket = null
        logInfo(s"Closed socket to $host:$port")
      }
    }
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}
  1. The onStart method starts a background thread that calls receive() to pull in data.
  2. receive() calls store for each record it reads.
    Here is the store method:
 def store(dataItem: T) {
   // The data is handed over to the ReceiverSupervisor
    supervisor.pushSingle(dataItem)
  }

3.2 ReceiverSupervisor

ReceiverSupervisor has a single implementation, ReceiverSupervisorImpl. It runs on the executor and starts its Receiver when it starts; it stores the received data and then reports the block information to the ReceiverTracker. Its main methods and fields are:

private val receivedBlockHandler: ReceivedBlockHandler
private val trackerEndpoint 
private val endpoint 
private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
private val defaultBlockGeneratorListener
  1. receivedBlockHandler
    There are two main implementations:
  • WriteAheadLogBasedBlockHandler: blocks handed over by the receiver are also written to a write-ahead log, so they can be recovered after a failure and restart, ensuring reliability.
  • BlockManagerBasedBlockHandler: blocks are managed with the BlockManager.

With the WAL approach the data is written to HDFS, so it survives an unexpected exit of the driver application; with the BlockManager alone, data can be lost if the driver application fails or the executor's node goes down.
The WAL approach is enabled with spark.streaming.receiver.writeAheadLog.enable; even with WAL enabled, the data is still also managed by the BlockManager.
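As a concrete illustration, enabling the WAL path is a single configuration flag. A sketch, assuming Spark on the classpath and a checkpoint directory set on the StreamingContext:

```scala
import org.apache.spark.SparkConf

// Sketch: enable write-ahead logging for receiver data. With this set,
// received blocks are written both to the WAL (under the checkpoint
// directory, typically on HDFS) and to the BlockManager.
val conf = new SparkConf()
  .setAppName("wal-example")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
// The WAL files live under the checkpoint directory, so one must be set:
// ssc.checkpoint("hdfs://...")
```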

  2. trackerEndpoint: a reference to the ReceiverTracker's RPC endpoint, used to communicate with the ReceiverTracker (for Spark RPC internals, see the post on the Spark RPC implementation).

  3. endpoint: an RPC endpoint exposed by the ReceiverSupervisor itself. The main messages it handles are:

    • StopReceiver: stop the Receiver and the ReceiverSupervisor
    • CleanupOldBlocks: clean up blocks previously saved by the receivedBlockHandler
  4. registeredBlockGenerators
    A receiver sometimes reports one record at a time, and creating a block id per record would clearly be inefficient. The registered BlockGenerators batch these individually reported records and, once enough have accumulated, hand them to the ReceiverSupervisor to store as a single block.

  5. defaultBlockGeneratorListener: this listener is discussed with BlockGenerator below; when the BlockGenerator hands an assembled block back to the ReceiverSupervisor, it does so through callbacks on this listener.

3.2.1 BlockGenerator

As noted in item 4 above, the ReceiverSupervisor registers a defaultBlockGenerator when it starts; its class is BlockGenerator.
This class has the following members:

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {
...
private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
@volatile private var currentBuffer = new ArrayBuffer[Any]
...
  1. listener: passed in by the ReceiverSupervisor at creation time; this is the defaultBlockGeneratorListener from item 5 above.
  2. blockIntervalTimer: as mentioned, the BlockGenerator batches individually handed-over records into blocks; this timer performs one batching pass every blockIntervalMs.
  3. blocksForPushing: once records have been batched into a block, it is staged here, waiting to be handed to the ReceiverSupervisor for storage and reporting.
  4. blockPushingThread: a thread that continuously hands blocks from blocksForPushing over to the ReceiverSupervisor.
  5. currentBuffer: records sent over by the receiver are first staged here, waiting for blockIntervalTimer to batch them into a block.
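The interplay of currentBuffer, blockIntervalTimer, blocksForPushing and blockPushingThread can be sketched without Spark. A minimal model follows; the names imitate the real members, while RateLimiter, the listener interface, and error handling are omitted:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Toy model of BlockGenerator's double-buffering: records accumulate in
// currentBuffer; a timer thread periodically swaps the buffer out as one block
// into blocksForPushing; a pusher thread drains blocks to a "push" callback.
class ToyBlockGenerator(blockIntervalMs: Long, pushBlock: Seq[Any] => Unit) {
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](10)
  @volatile private var timerStopped = false
  @volatile private var pushStopped = false

  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // What updateCurrentBuffer does: swap the buffer out as one block.
  private def updateCurrentBuffer(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      blocksForPushing.put(currentBuffer.toSeq)
      currentBuffer = new ArrayBuffer[Any]
    }
  }

  // Stands in for blockIntervalTimer (a RecurringTimer in the real code).
  private val timerThread = new Thread {
    override def run(): Unit =
      while (!timerStopped) { Thread.sleep(blockIntervalMs); updateCurrentBuffer() }
  }
  // Stands in for blockPushingThread / keepPushingBlocks().
  private val pushThread = new Thread {
    override def run(): Unit =
      while (!pushStopped || !blocksForPushing.isEmpty) {
        val block = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
        if (block != null) pushBlock(block)
      }
  }

  def start(): Unit = { timerThread.start(); pushThread.start() }

  def stop(): Unit = {
    timerStopped = true; timerThread.join() // no more timer-driven swaps
    updateCurrentBuffer()                   // flush any remaining records
    pushStopped = true; pushThread.join()   // drain the queue, then exit
  }
}
```

Here pushBlock stands in for the listener callback through which the assembled block reaches the ReceiverSupervisor, i.e. the role played by defaultBlockGeneratorListener.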

The diagram below describes the flow of data from the Receiver to the ReceiverTracker.
