Spark-Core源碼精讀(10)、注冊Application及Executors的啟動注冊流程(二)

承接上一篇文章,我們繼續(xù)來分析Executor的啟動過程腻菇,本文主要分為兩部分:

  • 向worker發(fā)送啟動Executor的消息
  • 啟動完成后向driver發(fā)送ExecutorAdded的消息园细,這里的driver就是ClientEndpoint
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

啟動Executor

首先我們分析Worker在接收到LaunchExecutor消息之后所執(zhí)行的操作:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // 首先判斷Master是否為Active狀態(tài)
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      // 創(chuàng)建executor的工作目錄
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }
      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      根據(jù)application創(chuàng)建executor的本地目錄惦积,可以通過SPARK_EXECUTOR_DIRS進行配置
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      // 實例化ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      // 保存在executors中
      executors(appId + "/" + execId) = manager
      // 執(zhí)行ExecutorRunner的start方法
      manager.start()
      // 修改計算資源的使用情況
      coresUsed += cores_
      memoryUsed += memory_
      // 向Master發(fā)送ExecutorStateChanged的消息
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
      }
    }
  }

首先實例化ExecutorRunner,ExecutorRunner就是Standalone模式下用來管理一個executor進程的執(zhí)行的珊肃。然后調(diào)用ExecutorRunner的start()方法:

private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down")) }
}

可以看見內(nèi)部創(chuàng)建了一條線程用來執(zhí)行fetchAndRunExecutor方法荣刑,當調(diào)用線程的start方法時馅笙,線程中的run方法運行,即fetchAndRunExecutor()方法開始執(zhí)行:

private def fetchAndRunExecutor() {
  try {
    // Launch the process
    // 首先構建command
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")
    // 設置Executor的本地目錄并設置一些配置參數(shù)
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // 開啟一個新的進程運行command
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)
    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException => {
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    }
    case e: Exception => {
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
    }
  }
}

這里最重要的就是process = builder.start()厉亏,即開啟一個新的線程來運行我們構建的command董习,也就是說開辟一個新的進程(JVM)來運行"org.apache.spark.executor.CoarseGrainedExecutorBackend"這個類的main方法,還記得這是在哪里設置的嗎爱只,沒錯皿淋,就是SparkDeploySchedulerBackend的start()方法中,所以我們現(xiàn)在進入CoarseGrainedExecutorBackend這個類的main方法:

def main(args: Array[String]) {
  var driverUrl: String = null
  var executorId: String = null
  var hostname: String = null
  var cores: Int = 0
  var appId: String = null
  var workerUrl: Option[String] = None
  val userClassPath = new mutable.ListBuffer[URL]()
  var argv = args.toList
  // 這里就是通過我們構建command的時候傳入的參數(shù)對變量進行初始化操作
  while (!argv.isEmpty) {
    argv match {
      case ("--driver-url") :: value :: tail =>
        driverUrl = value
        argv = tail
      case ("--executor-id") :: value :: tail =>
        executorId = value
        argv = tail
      case ("--hostname") :: value :: tail =>
        hostname = value
        argv = tail
      case ("--cores") :: value :: tail =>
        cores = value.toInt
        argv = tail
      case ("--app-id") :: value :: tail =>
        appId = value
        argv = tail
      case ("--worker-url") :: value :: tail =>
        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
        workerUrl = Some(value)
        argv = tail
      case ("--user-class-path") :: value :: tail =>
        userClassPath += new URL(value)
        argv = tail
      case Nil =>
      case tail =>
        // scalastyle:off println
        System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
        // scalastyle:on println
        printUsageAndExit()
    }
  }
  if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
    appId == null) {
    printUsageAndExit()
  }
  // 如果傳入的參數(shù)沒有問題就執(zhí)行run方法
  run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}

這里要先說明一下恬试,CoarseGrainedExecutorBackend實際上實現(xiàn)的是ExecutorBackend窝趣,而ExecutorBackend根據(jù)集群的運行模式不同有三種不同的實現(xiàn),分別是CoarseGrainedExecutorBackend训柴、LocalBackend哑舒、MesosExecutorBackend,而這里的CoarseGrainedExecutorBackend就是Standalone模式下的具體實現(xiàn)幻馁,而Standalone模式下是通過ExecutorRunner來啟動一個進程運行CoarseGrainedExecutorBackend的main方法的洗鸵。

接下來就是調(diào)用run方法:

private def run(
    driverUrl: String,
    executorId: String,
    hostname: String,
    cores: Int,
    appId: String,
    workerUrl: Option[String],
    userClassPath: Seq[URL]) {
  SignalLogger.register(log)
  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Debug code
    Utils.checkHost(hostname)
    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    val port = executorConf.getInt("spark.executor.port", 0)
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      hostname,
      port,
      executorConf,
      new SecurityManager(executorConf),
      clientMode = true)
    val driver = fetcher.setupEndpointRefByURI(driverUrl)
    val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
      Seq[(String, String)](("spark.app.id", appId))
    fetcher.shutdown()
    // Create SparkEnv using properties we fetched from the driver.
    val driverConf = new SparkConf()
    for ((key, value) <- props) {
      // this is required for SSL in standalone mode
      if (SparkConf.isExecutorStartupConf(key)) {
        driverConf.setIfMissing(key, value)
      } else {
        driverConf.set(key, value)
      }
    }
    if (driverConf.contains("spark.yarn.credentials.file")) {
      logInfo("Will periodically update credentials from: " +
        driverConf.get("spark.yarn.credentials.file"))
      SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
    }
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, port, cores, isLocal = false)
    // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
    // connections (e.g., if it's using akka). Otherwise, the executor is running in
    // client mode only, and does not accept incoming connections.
    val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
        hostname + ":" + port
      }.orNull
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
    workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
    SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
  }
}

上面的源碼主要分為部分:

  • 從Driver上獲得Spark的一些屬性信息
  • 使用得到的信息創(chuàng)建ExecutorEnv即Executor的運行時環(huán)境
  • 然后實例化CoarseGrainedExecutorBackend并向RpcEnv進行注冊
  • 注冊時會調(diào)用CoarseGrainedExecutorBackend的onStart方法

WorkerWatcher部分此處我們不做分析,我們看CoarseGrainedExecutorBackend的onStart方法:

override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    // 向Driver發(fā)送RegisterExecutor消息
    ref.ask[RegisterExecutorResponse](
      RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) => Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
    }
    case Failure(e) => {
      logError(s"Cannot register with driver: $driverUrl", e)
      System.exit(1)
    }
  }(ThreadUtils.sameThread)
}

這里我們需要關心的是這個driver到底是誰仗嗦,即driverUrl到底是什么膘滨?

那么我們追蹤一下:driverUrl是實例化CoarseGrainedExecutorBackend的時候傳入的,而執(zhí)行實例化時候的這個driverUrl又是通過run方法傳入的稀拐,而run方法中的driverUrl又是main方法執(zhí)行的時候傳入的火邓,main方法的driverUrl是根據(jù)傳入的參數(shù)獲得的,即創(chuàng)建新進程時傳入的參數(shù)德撬,即執(zhí)行的command铲咨,而command是通過appDesc的command構建的,而appDesc是在SparkDeploySchedulerBackend中的start方法中構建的砰逻,如下所示:

// The endpoint for executors to talk to us
val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
  RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
val args = Seq(
  "--driver-url", driverUrl,
  "--executor-id", "{{EXECUTOR_ID}}",
  "--hostname", "{{HOSTNAME}}",
  "--cores", "{{CORES}}",
  "--app-id", "{{APP_ID}}",
  "--worker-url", "{{WORKER_URL}}")

這里的CoarseGrainedSchedulerBackend.ENDPOINT_NAME是"CoarseGrainedScheduler":

private[spark] object CoarseGrainedSchedulerBackend {
  val ENDPOINT_NAME = "CoarseGrainedScheduler"
}

而DriverEndpoint注冊的時候就是使用的ENDPOINT_NAME

driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))

所以這里的driverUrl指的就是DriverEndpoint鸣驱,DriverEndpoint在接收到RegisterExecutor消息后執(zhí)行的操作為:

case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(RegisteredExecutor(executorAddress.host))
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()
  }

如果一切正常DriverEndpoint會向CoarseGrainedExecutorBackend回復消息RegisteredExecutor,CoarseGrainedExecutorBackend接收到消息后實例化了Executor蝠咆,具體的實例化過程中比較重要的兩個部分就是初始化運行tasks的線程池和向driver發(fā)送心跳信息踊东,部分源碼如下:

...
// 開啟線程池,用來運行提交的tasks
// Start worker thread pool
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
...
// 可以看到是開辟了一個線程來發(fā)送心跳
// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
// 使用driver中的HeartbeatReceiver來接收心跳刚操,實際上HeartbeatReceiver是SparkContext實例化的時候創(chuàng)建的
// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =
  RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
/**
 * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
 * times, it should kill itself. The default value is 60. It means we will retry to send
 * heartbeats about 10 minutes because the heartbeat interval is 10s.
 */
// 上面的注釋說的很清楚了闸翅,最大的失敗次數(shù)是60次,每隔10s重試一次菊霜,也就是說可以重試10分鐘
private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
/**
 * Count the failure times of heartbeat. It should only be acessed in the heartbeat thread. Each
 * successful heartbeat will reset it to 0.
 */
private var heartbeatFailures = 0
// 開始發(fā)送心跳
startDriverHeartbeater()

具體startDriverHeartbeater()方法的實現(xiàn)這里就不追蹤下去了坚冀,同時本文上述源碼中出現(xiàn)的向Master、Worker鉴逞、Driver回復消息的部分也不進行說明记某,大家可以自行閱讀司训,其實原理都是一樣的,就跟我們平時工作一樣液南,如果公司來了一個新同事壳猜,當他準備完成認為可以工作了,就要向領導匯報滑凉,領導接收到匯報之后就會為其分配具體的工作任務统扳。

附上一副圖,方便大家理解(注意該圖只是畫了主要流程畅姊,為了便于觀看咒钟,Rpc通信的部分只是簡單的畫成了“A發(fā)送消息給B”的形式,特此說明)

向driver發(fā)消息

下面是向driver發(fā)送消息的部分若未,注意這里的driver指的是ClientEndpoint

exec.application.driver.send(
  ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

ClientEndpoint接收到消息后執(zhí)行的操作:

case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  val fullId = appId + "/" + id
  logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
    cores))
  listener.executorAdded(fullId, workerId, hostPort, cores, memory)

這里主要就是日志相關的工作了朱嘴,不再闡述。

至此Application的注冊和Executor的啟動注冊大致的流程我們就走完了陨瘩,接下來就是task的提交和運行的部分了腕够。

本文參照的是Spark 1.6.3版本的源碼,同時給出Spark 2.1.0版本的連接:

Spark 1.6.3 源碼

Spark 2.1.0 源碼

本文為原創(chuàng)舌劳,歡迎轉載,轉載請注明出處玫荣、作者甚淡,謝謝!

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末捅厂,一起剝皮案震驚了整個濱河市贯卦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌焙贷,老刑警劉巖撵割,帶你破解...
    沈念sama閱讀 219,490評論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異辙芍,居然都是意外死亡啡彬,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,581評論 3 395
  • 文/潘曉璐 我一進店門故硅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來庶灿,“玉大人,你說我怎么就攤上這事吃衅⊥撸” “怎么了?”我有些...
    開封第一講書人閱讀 165,830評論 0 356
  • 文/不壞的土叔 我叫張陵徘层,是天一觀的道長峻呕。 經(jīng)常有香客問我利职,道長,這世上最難降的妖魔是什么瘦癌? 我笑而不...
    開封第一講書人閱讀 58,957評論 1 295
  • 正文 為了忘掉前任眼耀,我火速辦了婚禮,結果婚禮上佩憾,老公的妹妹穿的比我還像新娘哮伟。我一直安慰自己,他們只是感情好妄帘,可當我...
    茶點故事閱讀 67,974評論 6 393
  • 文/花漫 我一把揭開白布楞黄。 她就那樣靜靜地躺著,像睡著了一般抡驼。 火紅的嫁衣襯著肌膚如雪鬼廓。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,754評論 1 307
  • 那天致盟,我揣著相機與錄音碎税,去河邊找鬼。 笑死馏锡,一個胖子當著我的面吹牛雷蹂,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播杯道,決...
    沈念sama閱讀 40,464評論 3 420
  • 文/蒼蘭香墨 我猛地睜開眼匪煌,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了党巾?” 一聲冷哼從身側響起萎庭,我...
    開封第一講書人閱讀 39,357評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎齿拂,沒想到半個月后驳规,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,847評論 1 317
  • 正文 獨居荒郊野嶺守林人離奇死亡署海,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,995評論 3 338
  • 正文 我和宋清朗相戀三年吗购,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片叹侄。...
    茶點故事閱讀 40,137評論 1 351
  • 序言:一個原本活蹦亂跳的男人離奇死亡巩搏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出趾代,到底是詐尸還是另有隱情贯底,我是刑警寧澤,帶...
    沈念sama閱讀 35,819評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站禽捆,受9級特大地震影響笙什,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜胚想,卻給世界環(huán)境...
    茶點故事閱讀 41,482評論 3 331
  • 文/蒙蒙 一琐凭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧浊服,春花似錦统屈、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,023評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至孽拷,卻和暖如春吨掌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背脓恕。 一陣腳步聲響...
    開封第一講書人閱讀 33,149評論 1 272
  • 我被黑心中介騙來泰國打工膜宋, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人炼幔。 一個月前我還...
    沈念sama閱讀 48,409評論 3 373
  • 正文 我出身青樓秋茫,卻偏偏與公主長得像,于是被迫代替她去往敵國和親江掩。 傳聞我的和親對象是個殘疾皇子学辱,可洞房花燭夜當晚...
    茶點故事閱讀 45,086評論 2 355

推薦閱讀更多精彩內(nèi)容