【Spark】Spark 存儲原理--讀數(shù)據(jù)過程

本篇結構:

  • 讀取數(shù)據(jù)塊過程
  • 內(nèi)存讀取
  • 磁盤讀取
  • 遠程讀取

一遏佣、讀取數(shù)據(jù)塊過程

BlockManager 的 get 方法是讀數(shù)據(jù)的入口點平酿,有本地讀取和遠程讀取兩個分叉口塞椎。本地讀取使用 getLocalValues 方法,根據(jù)存儲級別的不同喜庞,使用 MemoryStore.getValues 或者 DiskStore.getBytes 讀取數(shù)據(jù)。

遠程讀取使用 getRemoteValues 方法棋返,調用遠程數(shù)據(jù)傳輸服務類 BlockTransferService 的 fetchBlockSync 獲取數(shù)據(jù)延都。

完整的數(shù)據(jù)讀取過程如下:

二、內(nèi)存讀取

根據(jù)緩存的數(shù)據(jù)是否反序列化睛竣,getLocalValues 讀取內(nèi)存中的數(shù)據(jù)方法不同晰房,如果反序列化,則調用 MemoryStore 的 getValues 方法射沟,如果沒有反序列化殊者,則調用 MemoryStore 的 getBytes 方法。

BlockManager # getLocalValues:

if (level.useMemory && memoryStore.contains(blockId)) {
  // 如果反序列化验夯,則直接讀取內(nèi)存中的數(shù)據(jù)
  val iter: Iterator[Any] = if (level.deserialized) {
    memoryStore.getValues(blockId).get
  } else {
   // 否則讀取字節(jié)數(shù)組猖吴,并需要做反序列化處理
    serializerManager.dataDeserializeStream(
      blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
  }
  // We need to capture the current taskId in case the iterator completion is triggered
  // from a different thread which does not have TaskContext set; see SPARK-18406 for
  // discussion.
  // 返回數(shù)據(jù)及數(shù)據(jù)塊大小、讀取方法等
  val ci = CompletionIterator[Any, Iterator[Any]](iter, {
    releaseLock(blockId, taskAttemptId)
  })
  Some(new BlockResult(ci, DataReadMethod.Memory, info.size))

在 MemoryStore 中挥转, getValues 和 getBytes 都根據(jù) BlockId 獲取內(nèi)存中的數(shù)據(jù)塊海蔽。

MemoryStore # getValues:

def getValues(blockId: BlockId): Option[Iterator[_]] = {
  val entry = entries.synchronized { entries.get(blockId) }
  entry match {
    case null => None
    case e: SerializedMemoryEntry[_] =>
      throw new IllegalArgumentException("should only call getValues on deserialized blocks")
    case DeserializedMemoryEntry(values, _, _) =>
      val x = Some(values)
      x.map(_.iterator)
  }
}

MemoryStore # getBytes:

def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
  val entry = entries.synchronized { entries.get(blockId) }
  entry match {
    case null => None
    case e: DeserializedMemoryEntry[_] =>
      throw new IllegalArgumentException("should only call getBytes on serialized blocks")
    case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
  }
}

觀察 entries,發(fā)現(xiàn)其實就是一個 LinkedHashMap绑谣。所以緩存在內(nèi)存里的數(shù)據(jù)都是放入 LinkedHashMap 中党窜。

private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)

LinkedHashMap 保存了插入的順序,遍歷 LinkedHashMap 時借宵,先得到的記錄是先插入的幌衣。如果內(nèi)存不夠,先保存的數(shù)據(jù)會被先清除壤玫。

三泼掠、磁盤讀取

getLocalValues 方法中怔软,根據(jù)緩存級別,如果使用磁盤緩存择镇,則調用 DiskStore 的 getBytes 方法挡逼。

BlockManager # getLocalValues:

else if (level.useDisk && diskStore.contains(blockId)) {
    // 從磁盤中獲取數(shù)據(jù),由于保存到磁盤的數(shù)據(jù)是序列化的腻豌,讀取到的數(shù)據(jù)也是序列化后的
    val diskData = diskStore.getBytes(blockId)
  val iterToReturn: Iterator[Any] = {
    if (level.deserialized) {
      // 如果儲存級別需要反序列化家坎,則先反序列化,然后根據(jù)是否 level.useMemory 的值吝梅,判斷是否存儲到內(nèi)存中
      val diskValues = serializerManager.dataDeserializeStream(
        blockId,
        diskData.toInputStream())(info.classTag)
      maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
    } else {
      // 如果不需要反序列化虱疏,則直接判斷是否需要將這些序列化數(shù)據(jù)緩存到內(nèi)存中
      val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
        .map { _.toInputStream(dispose = false) }
        .getOrElse { diskData.toInputStream() }
      // 返回的數(shù)據(jù)需做反序列化處理
      serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
    }
  }
  val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
    releaseLockAndDispose(blockId, diskData, taskAttemptId)
  })
  // 返回數(shù)據(jù)及數(shù)據(jù)塊大小、讀取方法等
  Some(new BlockResult(ci, DataReadMethod.Disk, info.size))

重點看 DiskStore # getBytes:

def getBytes(blockId: BlockId): BlockData = {
  val file = diskManager.getFile(blockId.name)
  val blockSize = getSize(blockId)

  securityManager.getIOEncryptionKey() match {
    case Some(key) =>
      // Encrypted blocks cannot be memory mapped; return a special object that does decryption
      // and provides InputStream / FileRegion implementations for reading the data.
      new EncryptedBlockData(file, blockSize, conf, key)

    case _ =>
      new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
  }
}

3.1苏携、獲取磁盤存儲目錄

DiskStore 通過 DiskBlockManager 管理 Block 和相應磁盤文件的映射關系做瞪,從而將 Block 存儲到磁盤的文件中。

val file = diskManager.getFile(blockId.name)

DiskBlockManager 根據(jù) LOCAL_DIRS(yarn模式)右冻,SPARK_LOCAL_DIRS 或 spark.local.dir(其他模式装蓬,默認值 System.getProperty(“java.io.tmpdir“))配置的本地根目錄(可能有多個,以逗號分隔)來生成 DiskStore 存放 Block 的根目錄(與配置的根目錄對應纱扭,也有可能有多個):

  • …/blockmgr-UUID.randomUUID.toString(yarn模式)
  • …/spark-UUID.randomUUID.toString/blockmgr-UUID.randomUUID.toString(其他模式)

同時 DiskBlockManager 會為每個根目錄生成conf.getInt(“spark.diskStore.subDirectories“, 64) 個子目錄用來存放 Block 對應的文件牍帚,每個 Block 會根據(jù)它的 name 哈希到相應的子目錄,然后以 Block 的 name 為文件名來生成文件存儲乳蛾。

具體過程參看 DiskBlockManager 的 localDirs 屬性賦值過程:

private[spark] val localDirs: Array[File] = createLocalDirs(conf)

DiskBlockManager # createLocalDirs :

/**
 * Create local directories for storing block data. These directories are
 * located inside configured local directories and won't
 * be deleted on JVM exit when using the external shuffle service.
 */
private def createLocalDirs(conf: SparkConf): Array[File] = {
  Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
    try {
      val localDir = Utils.createDirectory(rootDir, "blockmgr")
      logInfo(s"Created local directory at $localDir")
      Some(localDir)
    } catch {
      case e: IOException =>
        logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
        None
    }
  }
}

DiskBlockManager # getConfiguredLocalDirs:

def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
  val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
  if (isRunningInYarnContainer(conf)) {
    // If we are in yarn mode, systems can have different disk layouts so we must set it
    // to what Yarn on this system said was available. Note this assumes that Yarn has
    // created the directories already, and that they are secured so that only the
    // user has access to them.
    getYarnLocalDirs(conf).split(",")
  } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
    conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
  } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
    conf.getenv("SPARK_LOCAL_DIRS").split(",")
  } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
    // Mesos already creates a directory per Mesos task. Spark should use that directory
    // instead so all temporary files are automatically cleaned up when the Mesos task ends.
    // Note that we don't want this if the shuffle service is enabled because we want to
    // continue to serve shuffle files after the executors that wrote them have already exited.
    Array(conf.getenv("MESOS_DIRECTORY"))
  } else {
    if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
      logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
        "spark.shuffle.service.enabled is enabled.")
    }
    // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
    // configuration to point to a secure directory. So create a subdirectory with restricted
    // permissions under each listed directory.
    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
  }
}

3.2暗赶、從文件讀取數(shù)據(jù)塊

參看 DiskBlockData 源碼:

private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {

  override def toInputStream(): InputStream = new FileInputStream(file)

  /**
  * Returns a Netty-friendly wrapper for the block's data.
  *
  * Please see `ManagedBuffer.convertToNetty()` for more details.
  */
  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        // For small files, directly read rather than memory map.
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}

提供 toInputStream、toChunkedByteBuffer肃叶、 toByteBuffer 的方式讀取數(shù)據(jù)蹂随。

四、遠程讀取

Spark 讀取遠程節(jié)點的數(shù)據(jù)因惭,依賴 Netty 實現(xiàn)的 Spark Rpc 框架糙及,涉及兩個重要的類:

  • NettyBlockTransferService:為 Shuffle、存儲模塊提供了數(shù)據(jù)存取的接口實現(xiàn)筛欢,接收到數(shù)據(jù)存取的命令時浸锨,通過 Netty RPC 框架發(fā)送消息給指定節(jié)點,請求進行數(shù)據(jù)存取操作版姑。
  • NettyBlockRpcServer:Executor啟動時柱搜,會啟動 RPC 監(jiān)聽器,當監(jiān)聽到消息時將消息傳遞到該類進行處理剥险,消息包括讀取數(shù)據(jù) OpenBlocks 和寫入數(shù)據(jù) uploadBlock 兩種聪蘸。

4.1、獲取數(shù)據(jù)塊位置

入口為 BlockManager # getRemoteValues,接著調用 getRemoteBytes 方法健爬。在 getRemoteBytes 方法中調用 getLocationsAndStatus 方法向 BlockManagerMasterEndpoint 發(fā)送 GetLocationsAndStatus 消息控乾,請求數(shù)據(jù)塊所在的位置和狀態(tài)。

/**
 * Get block from remote block managers.
 *
 * This does not acquire a lock on this block in this JVM.
 */
private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val ct = implicitly[ClassTag[T]]
  getRemoteBytes(blockId).map { data =>
    val values =
      serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
    new BlockResult(values, DataReadMethod.Network, data.size)
  }
}

BlockManagerMaster # getLocationsAndStatus:

/** Get locations as well as status of the blockId from the driver */
def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
  driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
    GetLocationsAndStatus(blockId))
}

獲取到 Block 的位置列表后娜遵,BlockManager 的 getRemoteBytes 方法中調用 BlockTransferService 的 fetchBlockSync 方法蜕衡。

4.2、向數(shù)據(jù)塊所在節(jié)點發(fā)送 OpenBlocks 消息

BlockTransferService 的 fetchBlockSync 調用其實現(xiàn) NettyBlockTransferService 的fetchBlocks 方法设拟。

/**
 * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
 *
 * It is also only available after [[init]] is invoked.
 */
def fetchBlockSync(
    host: String,
    port: Int,
    execId: String,
    blockId: String,
    tempFileManager: TempFileManager): ManagedBuffer = {
  // A monitor for the thread to wait on.
  val result = Promise[ManagedBuffer]()
  fetchBlocks(host, port, execId, Array(blockId),
    new BlockFetchingListener {
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
        result.failure(exception)
      }
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        data match {
          case f: FileSegmentManagedBuffer =>
            result.success(f)
          case _ =>
            val ret = ByteBuffer.allocate(data.size.toInt)
            ret.put(data.nioByteBuffer())
            ret.flip()
            result.success(new NioManagedBuffer(ret))
        }
      }
    }, tempFileManager)
  ThreadUtils.awaitResult(result.future, Duration.Inf)
}

NettyBlockTransferService # fetchBlocks:

override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener,
    tempFileManager: TempFileManager): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
        // 根據(jù)遠程節(jié)點的地址和端口創(chuàng)建通信客戶端 
        val client = clientFactory.createClient(host, port)
        // 通過該客戶端向指定節(jié)點發(fā)送讀取數(shù)據(jù)消息
        new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
          transportConf, tempFileManager).start()
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}

fetchBlocks 中慨仿,根據(jù)遠程節(jié)點的地址和端口創(chuàng)建通信客戶端 TransportClient,通過該客戶端向指定節(jié)點發(fā)送讀取數(shù)據(jù)消息纳胧。

消息的具體發(fā)送是在 OneForOneBlockFetcher 的 start 方法中镰吆。

public void start() {
  if (blockIds.length == 0) {
    throw new IllegalArgumentException("Zero-sized blockIds array");
  }

  client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
    @Override
    public void onSuccess(ByteBuffer response) {
        ...
    }

    @Override
    public void onFailure(Throwable e) {
        ...
    }
  });
}

openMessage 是 OpenBlocks 類型。

this.openMessage = new OpenBlocks(appId, execId, blockIds);

4.3跑慕、遠程節(jié)點響應并傳輸對應的數(shù)據(jù)塊

對應的遠程節(jié)點監(jiān)聽消息万皿,當接收到消息后,在 NettyBlockRpcServer 中進行消息匹配核行。

override def receive(
    client: TransportClient,
    rpcMessage: ByteBuffer,
    responseContext: RpcResponseCallback): Unit = {
  val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
  logTrace(s"Received request: $message")

  message match {
    case openBlocks: OpenBlocks =>
      val blocksNum = openBlocks.blockIds.length
      val blocks = for (i <- (0 until blocksNum).view)
        yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
      // 注冊 ManagedBuffer牢硅,利用 Netty 傳輸
      val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
      logTrace(s"Registered streamId $streamId with $blocksNum buffers")
      responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

    case uploadBlock: UploadBlock =>
      // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
      val (level: StorageLevel, classTag: ClassTag[_]) = {
        serializer
          .newInstance()
          .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
          .asInstanceOf[(StorageLevel, ClassTag[_])]
      }
      val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
      val blockId = BlockId(uploadBlock.blockId)
      blockManager.putBlockData(blockId, data, level, classTag)
      responseContext.onSuccess(ByteBuffer.allocate(0))
  }
}

如上源碼,當匹配到 OpenBlocks 時钮科,調用 BlockManager 的 getBlockData 方法讀取該節(jié)點上的數(shù)據(jù)唤衫。讀取的數(shù)據(jù)塊封裝為 ManagedBuffer 婆赠,然后使用 Netty 傳輸通道绵脯,把數(shù)據(jù)傳遞到請求節(jié)點上,完成數(shù)據(jù)傳輸休里。

BlockManager # getBlockData:

/**
 * Interface to get local block data. Throws an exception if the block cannot be found or
 * cannot be read successfully.
 */
override def getBlockData(blockId: BlockId): ManagedBuffer = {
  if (blockId.isShuffle) {
    shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
  } else {
    getLocalBytes(blockId) match {
      case Some(blockData) =>
        new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
      case None =>
        // If this block manager receives a request for a block that it doesn't have then it's
        // likely that the master has outdated block statuses for this block. Therefore, we send
        // an RPC so that this block is marked as being unavailable from this block manager.
        reportBlockStatus(blockId, BlockStatus.empty)
        throw new BlockNotFoundException(blockId.toString)
    }
  }
}
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末蛆挫,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子妙黍,更是在濱河造成了極大的恐慌悴侵,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,482評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拭嫁,死亡現(xiàn)場離奇詭異可免,居然都是意外死亡,警方通過查閱死者的電腦和手機做粤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,377評論 2 382
  • 文/潘曉璐 我一進店門浇借,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人怕品,你說我怎么就攤上這事妇垢。” “怎么了?”我有些...
    開封第一講書人閱讀 152,762評論 0 342
  • 文/不壞的土叔 我叫張陵闯估,是天一觀的道長灼舍。 經(jīng)常有香客問我,道長涨薪,這世上最難降的妖魔是什么骑素? 我笑而不...
    開封第一講書人閱讀 55,273評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮尤辱,結果婚禮上砂豌,老公的妹妹穿的比我還像新娘。我一直安慰自己光督,他們只是感情好阳距,可當我...
    茶點故事閱讀 64,289評論 5 373
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著结借,像睡著了一般筐摘。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上船老,一...
    開封第一講書人閱讀 49,046評論 1 285
  • 那天咖熟,我揣著相機與錄音,去河邊找鬼柳畔。 笑死馍管,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的薪韩。 我是一名探鬼主播确沸,決...
    沈念sama閱讀 38,351評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼俘陷!你這毒婦竟也來了罗捎?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 36,988評論 0 259
  • 序言:老撾萬榮一對情侶失蹤拉盾,失蹤者是張志新(化名)和其女友劉穎桨菜,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體捉偏,經(jīng)...
    沈念sama閱讀 43,476評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡倒得,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,948評論 2 324
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了夭禽。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片霞掺。...
    茶點故事閱讀 38,064評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖驻粟,靈堂內(nèi)的尸體忽然破棺而出根悼,到底是詐尸還是另有隱情凶异,我是刑警寧澤,帶...
    沈念sama閱讀 33,712評論 4 323
  • 正文 年R本政府宣布挤巡,位于F島的核電站剩彬,受9級特大地震影響,放射性物質發(fā)生泄漏矿卑。R本人自食惡果不足惜喉恋,卻給世界環(huán)境...
    茶點故事閱讀 39,261評論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望母廷。 院中可真熱鬧轻黑,春花似錦、人聲如沸琴昆。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,264評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽业舍。三九已至抖拦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間舷暮,已是汗流浹背态罪。 一陣腳步聲響...
    開封第一講書人閱讀 31,486評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留下面,地道東北人复颈。 一個月前我還...
    沈念sama閱讀 45,511評論 2 354
  • 正文 我出身青樓,卻偏偏與公主長得像沥割,于是被迫代替她去往敵國和親耗啦。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 42,802評論 2 345

推薦閱讀更多精彩內(nèi)容