Spark 源碼分析(二): Driver 注冊及啟動

上一篇文章已經(jīng)已經(jīng)執(zhí)行到 Client 向 masterEndpoint 發(fā)送了 RequestSubmitDriver 信息印屁,下面就看看 master 怎么注冊 driver 信息,并且怎么讓 worker 去啟動 driver 的留量。

一,org.apache.spark.deploy.master.Master

這個 Master 就是前面 Client 發(fā)送的對象哟冬,是一個 ThreadSafeRpcEndpoint楼熄。內(nèi)部的 receiveAndReply 這個方法在監(jiān)聽外部發(fā)來信息。下面就來看這個方法浩峡。

1孝赫,receiveAndReply 方法

這個方法內(nèi)部會根據(jù)發(fā)送過來的消息做模式匹配,我們找到 Client 發(fā)送過來的 RequestSubmitDriver 這個消息對應(yīng)代碼红符,如下:

// 匹配到 Client 發(fā)送過來的消息
case RequestSubmitDriver(description) =>
        // 判斷當(dāng)前 master 的狀態(tài)是否為 alive
      if (state != RecoveryState.ALIVE) {
        // 如果不是 alive 則回復(fù) driver 提交失敗
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        // 根據(jù) client 發(fā)過來的 driver 信息創(chuàng)建 driver青柄,然后持久化 driver
        // 然后將 driver 加入到等待隊(duì)列中去
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        // 將 driver 加入到 HashSet 中去
        drivers.add(driver)
          
        // 開始調(diào)度
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        //       the current status of the driver. For now it's simply "fire and forget".

        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }

這段代碼伐债,做了這么一些操作:判斷當(dāng)前 master 的狀態(tài)是否為 alive ,如果不是則回復(fù)消息說:提交失敗致开,如果是則根據(jù)傳遞過來的 driver 信息創(chuàng)建 driver 對象(通過 createDriver 方法創(chuàng)建)并將其持久化峰锁,加入到等待隊(duì)列中去,然后開始執(zhí)行調(diào)度算法 schduler双戳。

這里涉及到連個方法虹蒋,分別可以看一下,一個是 createDriver 方法飒货,一個是 schduler 方法魄衅。

2,createDriver 方法

// 創(chuàng)建 driver 對象
private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    // 通過系統(tǒng)當(dāng)前時間生成一個 driverId
    // 然后將系統(tǒng)當(dāng)前時間塘辅,driverId晃虫,DriverDescription,日期 這些信息封裝成一個 DriverInfo
    new DriverInfo(now, newDriverId(date), desc, date)
  }

這個方法主要是通過當(dāng)前時間生成一個 driverId扣墩,然后將當(dāng)前時間哲银,DriverDescription 等參數(shù)封裝成一個 DriverInfo 對象。

3呻惕,schduler 方法

該方法在 master 中會被多次調(diào)用荆责,每當(dāng) driver 的等待隊(duì)列中數(shù)據(jù)發(fā)生變動或者集群資源發(fā)生變化都會掉用這個方法。這個方法主要是為當(dāng)前 driver 的等待隊(duì)列分配資源的亚脆。

private def schedule(): Unit = {
    // 首先判斷當(dāng)前 master 的狀態(tài)是否為 alive 的做院,如果不是 alive 則不往下執(zhí)行
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Random.shuffle 這個方法主要是隨機(jī)分散各個元素,具體代碼可以點(diǎn)進(jìn)去看
    // 這里主要是將集群中 state 為 alive 的 worker 帥選出來濒持,然后隨機(jī)打亂
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    
    // 當(dāng)前 alive 的 worker 數(shù)量
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
      
    // 將等待分配資源的 driver 隊(duì)列中的所有 driver 進(jìn)行遍歷
    // 然后為每個 driver 遍歷一遍所有的 alive worker键耕,當(dāng)碰到 worker 的可用內(nèi)存和比當(dāng)前隊(duì)列中
    // 等待的 driver 所需要的內(nèi)存要大并且 worker 的 core 數(shù)量也滿足 driver 的需求時
    // 就會調(diào)用 launcherDriver 方法去將 driver 發(fā)送對應(yīng)的 worker 上去執(zhí)行
    for (driver <- waitingDrivers.toList) { 
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        // 找到符合條件的 worker
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          // 將該 driver 從等待隊(duì)列中移除
          waitingDrivers -= driver
          // 標(biāo)記當(dāng)前 driver 為 launched
          launched = true
        }
        
        // 移到下一個 driver 上
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    
    // 調(diào)用 startExecutorsOnWorkers 方法
    startExecutorsOnWorkers()
  }

這個 schduler 方法會遍歷等待分配資源的 driver 隊(duì)列,為每個 driver 遍歷一遍 alive 的 worker弥喉,找到資源滿足的 worker郁竟,然后調(diào)用 launchDriver 方法玛迄,將該 driver 在這個 worker 上啟動由境,移除掉等待隊(duì)列中當(dāng)前 driver,然后調(diào)用 startExecutorsOnWorkers 啟動 executor蓖议。

這里又有兩個方法虏杰,一個是 launchDriver 方法,一個是 startExecutorsOnWorkers 方法去啟動 executor勒虾,startExecutorsOnWorkers 這個方法放到下面文章里講纺阔,這篇文章主要講 driver 注冊和啟動。

4修然,launchDriver 方法

這個方法主要是更新一些信息(worker 中的資源變更笛钝,worker 中啟動的 driver 信息記錄质况;driver 中添加上 worker 的信息),然后將向?qū)?yīng)的 worker 發(fā)送 LaunchDriver 的消息玻靡。

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // 這里是將 workerInfo 中添加上啟動 driver 的信息结榄,內(nèi)部也會減去 driver 使用掉的資源
    worker.addDriver(driver)
    // 將 driver 啟動的 worker 信息記錄到 driver 中
    driver.worker = Some(worker)
    // 給 worker 發(fā)送 LaunchDriver 的信息
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    // 標(biāo)記當(dāng)前 driver 狀態(tài)為 running 狀態(tài)
    driver.state = DriverState.RUNNING
  }

通過把啟動的 driver 信息記錄到對應(yīng)的 worker 信息中,再將對應(yīng)的 worker 信息記錄到 driver 里囤捻,然后給 worker 發(fā)送消息讓 worker 啟動 driver臼朗,標(biāo)記當(dāng)前的 driver 狀態(tài)為 running。

這里會給 worker 發(fā)送 LaunchDriver 的消息蝎土,下面去看下 worker 中是怎么處理這個消息的视哑。

二,org.apache.spark.deploy.worker.Worker

private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging

從繼承關(guān)系上可以看出 worker 也是 RpcEndPoint誊涯,所以直接找到它的 receive 方法挡毅,然后根據(jù)模式匹配找到 LaunchDriver 這個匹配下看操作邏輯即可。

case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
        // 將 driver 信息封裝到一個 runner 內(nèi)
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
        // 然后將這個 runner 保存到一個 HashMap 中
      drivers(driverId) = driver
      // 啟動這個 runner
      driver.start()
            // 更新當(dāng)前 worker 的資源信息
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem

這里會將 driver 的信息封裝到一個 DriverRunner 里面醋拧,然后再降這個 runner 保存到內(nèi)存的一個 HashMap 中慷嗜,然后開啟這個 ruuner,更新當(dāng)前 worker 的資源信息丹壕。

到這里我們需要去看 DriverRunner 里是怎么操作的庆械。

三,org.apache.spark.deploy.worker.DriverRunner

DriverRunner 是在 standalone cluster 部署模式下用來執(zhí)行 driver 操作的菌赖,包括當(dāng) driver 掛掉之后的自動重啟缭乘。

1,start 方法

前面調(diào)用的是 runner 的 start 方法琉用,所以我們直接看這個 start 方法:

private[worker] def start() = {
    // 開一個線程
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // 準(zhǔn)備 driver 的 jar 包并且執(zhí)行 driver堕绩,并返回一個 exitCode
          val exitCode = prepareAndRunDriver()

          // 根據(jù) exitCode 設(shè)置 finalState,一共有三種邑时,分別為:FINISHED奴紧,KILLED,F(xiàn)AILED
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // 然后將 driverId 和 driver 執(zhí)行結(jié)果 finalState 以及一些異常信息發(fā)送給 worker
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }

這里主要是調(diào)用了一個 prepareAndRunDriver 這個方法晶丘,返回了一個結(jié)果碼黍氮,然后把結(jié)果碼轉(zhuǎn)換為 finalState ,然后發(fā)送給 worker浅浮。

所以我們直接去找 prepareAndRunDriver 這個方法沫浆。

2,prepareAndRunDriver 方法

private[worker] def prepareAndRunDriver(): Int = {
    
    // 創(chuàng)建 driver 的工作目錄
    val driverDir = createWorkingDirectory()
    // 下載 driver 的 jar 包到工作目錄下
    val localJarFilename = downloadUserJar(driverDir)

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    // 創(chuàng)建 ProcessBuilder
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }

這個方法主要做了這些事:創(chuàng)建 driver 的工作目錄滚秩,將 driver 的 jar 包下載到工作目錄下专执,然后創(chuàng)建 ProcessBuilder,傳入 driver 的執(zhí)行命令郁油,然后調(diào)用 runDriver 方法本股。

下面我們看下 runDriver 方法攀痊。

3,runDriver 方法

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    builder.directory(baseDir)
    // 初始化操作
    def initialize(process: Process): Unit = {
      // 創(chuàng)建 stout 文件
      val stdout = new File(baseDir, "stdout")
      // 將 process 的 InputStream 流重定向?yàn)?stout 文件
      CommandUtils.redirectStream(process.getInputStream, stdout)
            
      // 創(chuàng)建 stderr 文件
      val stderr = new File(baseDir, "stderr")
      // 將 builder 命令格式化處理
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      // 將 process 的 ErrStream 重定向到 stderr 文件
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
    // 調(diào)用 runCommandWithRetry
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }

該方法主要是定義了一個 initialize 方法拄显,里面會將傳入的 process 的輸入流和 err 流重定向到自定義的兩個文件中去蚕苇,然后調(diào)用 runCommandWithRetry 這個方法。

看下 runCommandWithRetry 這個方法凿叠。

4涩笤,runCommandWithRetry 方法

private[worker] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
    // 退出碼  
    var exitCode = -1
    // 提交重試的燈帶時間
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        // 如果被 kill 則返回 exitcode
        if (killed) { return exitCode }
        // 執(zhí)行 command 命令,啟動 driver 進(jìn)程
        process = Some(command.start())
        // 調(diào)用上面定義好的 initialize 方法盒件,將一些流的輸出文件做重定向
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()
      exitCode = process.get.waitFor()

      // check if attempting another run
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }

    exitCode
  }

這里是真正運(yùn)行 driver 進(jìn)程的地方蹬碧,開啟 driver 進(jìn)程后會使用上面 runDriver 中定義好的 initialize 方法去將 driver 進(jìn)程中的一些流的輸出文件做重定向操作,并返回 exitcode炒刁。

至此恩沽,driver 就已經(jīng)在 master 上注冊好了,并且 master 也分配合適的 worker 啟動了該 driver 進(jìn)程翔始。

我們在 DriverRunner start 方法的最后會調(diào)用 worker.send(DriverStateChanged(driverId, finalState.get, finalException)) 這個方法罗心,給 worker 發(fā)送 driver 狀態(tài)變化的消息。

四城瞎,org.apache.spark.deploy.worker.Worker

這里我們看下 worker 是怎么處理的渤闷。

在 woker 的 receive 方法的模式匹配中是這么操作的:

case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
      handleDriverStateChanged(driverStateChanged)

會去調(diào)用 handleDriverStateChanged 這個方法。

1脖镀,handleDriverStateChanged 方法

我們再看下 handleDriverStateChanged 這個方法:

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    // 根據(jù) state 做匹配打印日志
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
    // 向 master 發(fā)送 driverStateChanged 消息
    sendToMaster(driverStateChanged)
    // 將該 driver 從 drivers 移除到 finishedDrivers 中去
    val driver = drivers.remove(driverId).get
    finishedDrivers(driverId) = driver
    trimFinishedDriversIfNecessary()
    // 更新 worker 節(jié)點(diǎn)的資源情況
    memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }

主要是做了這些事:根據(jù)發(fā)送過來的 state 做模式匹配飒箭,打印對應(yīng)的 log。然后把這個 driverStateChanged 消息轉(zhuǎn)發(fā)給 master蜒灰,最后再更新下當(dāng)前 worker 的一些存儲數(shù)據(jù)弦蹂。

最后在看下 master 收到這個 driverStateChanged 消息是怎么處理的。

五强窖,org.apache.spark.deploy.master.Master

在其 recieve 方法中可以找到匹配到 driverStageChanged 消息后的操作:

case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          // 調(diào)用 removeDriver 方法
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }

在這里是調(diào)用了 removeDriver 方法凸椿,我們下面就看下這個方法。

1翅溺,removeDriver 方法

private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
    // 根據(jù) driver id 進(jìn)行模式匹配
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // 從 drivers 集合中移除當(dāng)前 driver
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // 將 driver 添加到 completedDrivers 中去
        completedDrivers += driver
        // 從持久化引擎中移除
        persistenceEngine.removeDriver(driver)
        // 更新 driver 的狀態(tài)和 exception 并從 driver 的 worker 中移除掉當(dāng)前 driver
        driver.state = finalState
        driver.exception = exception
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

這個方法主要是將 master 中資源做恢復(fù)操作脑漫,會根據(jù)當(dāng)前退出的 driver 做模式匹配,找到這個 driver未巫,然后將其從 drivers 的集合中移除窿撬,添加到 completedDrivers 中去启昧,然后從持久化引擎中移除掉叙凡,更新 driver 的狀態(tài),并從 driver 持有的 worker 中移除掉結(jié)束的這個 driver密末。然后再調(diào)用 schedule 方法握爷,讓釋放資源重新調(diào)度跛璧。

至此,driver 的注冊新啼,啟動追城,以及退出后資源回收,都結(jié)束了燥撞。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末座柱,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子物舒,更是在濱河造成了極大的恐慌色洞,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件冠胯,死亡現(xiàn)場離奇詭異火诸,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)荠察,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評論 3 392
  • 文/潘曉璐 我一進(jìn)店門置蜀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人悉盆,你說我怎么就攤上這事盯荤。” “怎么了焕盟?”我有些...
    開封第一講書人閱讀 162,415評論 0 353
  • 文/不壞的土叔 我叫張陵廷雅,是天一觀的道長。 經(jīng)常有香客問我京髓,道長航缀,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評論 1 292
  • 正文 為了忘掉前任堰怨,我火速辦了婚禮芥玉,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘备图。我一直安慰自己灿巧,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評論 6 388
  • 文/花漫 我一把揭開白布揽涮。 她就那樣靜靜地躺著抠藕,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蒋困。 梳的紋絲不亂的頭發(fā)上盾似,一...
    開封第一講書人閱讀 51,125評論 1 297
  • 那天,我揣著相機(jī)與錄音雪标,去河邊找鬼零院。 笑死溉跃,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的告抄。 我是一名探鬼主播撰茎,決...
    沈念sama閱讀 40,028評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼打洼!你這毒婦竟也來了龄糊?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,887評論 0 274
  • 序言:老撾萬榮一對情侶失蹤募疮,失蹤者是張志新(化名)和其女友劉穎绎签,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酝锅,經(jīng)...
    沈念sama閱讀 45,310評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡诡必,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了搔扁。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片爸舒。...
    茶點(diǎn)故事閱讀 39,690評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖稿蹲,靈堂內(nèi)的尸體忽然破棺而出扭勉,到底是詐尸還是另有隱情,我是刑警寧澤苛聘,帶...
    沈念sama閱讀 35,411評論 5 343
  • 正文 年R本政府宣布涂炎,位于F島的核電站,受9級特大地震影響设哗,放射性物質(zhì)發(fā)生泄漏唱捣。R本人自食惡果不足惜恶耽,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評論 3 325
  • 文/蒙蒙 一珊皿、第九天 我趴在偏房一處隱蔽的房頂上張望谓晌。 院中可真熱鬧顷级,春花似錦、人聲如沸只冻。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽烦感。三九已至巡社,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間手趣,已是汗流浹背晌该。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人气笙。 一個月前我還...
    沈念sama閱讀 47,693評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像怯晕,于是被迫代替她去往敵國和親潜圃。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評論 2 353

推薦閱讀更多精彩內(nèi)容