Spark startup in yarn-cluster mode

Spark supports several deployment modes, including standalone, yarn, and mesos. Standalone mode is mostly used for testing in offline environments, while production clusters usually rely on yarn or mesos for resource management and fault tolerance. This article walks through the full startup flow of Spark in yarn-cluster mode, focusing on the Spark-side implementation; yarn-specific details will be covered in other chapters.


What is yarn

yarn (Yet Another Resource Negotiator) is the framework in the Hadoop ecosystem that handles resource management, scheduling coordination, and task isolation. Other computation frameworks can delegate the scheduling and execution of their tasks to yarn; see the yarn official site for more details.

yarn architecture diagram

spark on yarn

Message flow

Message flow diagram
  1. The client submits the Spark application to the ResourceManager.
  2. Based on the parameters of the submitted application and the current state of the cluster, the ResourceManager sends a request to launch the ApplicationMaster (AM) process to an appropriate NodeManager.
  3. The NodeManager launches the AM process on its own machine using the AM launch command.
  4. The ApplicationMaster applies to the ResourceManager for resources, sending it the ContainerRequests needed to launch executors.
  5. The ResourceManager likewise forwards the requests to launch executors to the appropriate NodeManagers, which start the executor processes using the executor launch command.
  6. Each executor process registers itself with the DriverEndpoint inside the ApplicationMaster; whenever the driver in the AM has tasks to run, it schedules them onto the successfully registered executors.

Class interaction

Class interaction diagram
  1. Set the environment variable HADOOP_CONF_DIR or YARN_CONF_DIR to point to the Hadoop/yarn configuration files such as yarn-site.xml and core-site.xml.
  2. Run sh bin/spark-submit --master yarn --deploy-mode cluster --files xxx --class xxx --jars xxx to submit the Spark application to yarn.
  3. The command line ultimately ends up running the main function of SparkSubmit.class, which parses, validates, and converts the arguments and then runs the main function of a concrete class; in yarn mode that class is org.apache.spark.deploy.yarn.Client.
def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }

Once the arguments are parsed, the corresponding childMainClass is run; in yarn mode this is the main method of the Client class.

private def submit(args: SparkSubmitArguments): Unit = {
    val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              // scalastyle:off println
              printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              // scalastyle:on println
              exitFn(1)
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      }
    }
    // ... (the remainder of submit() ultimately calls doRunMain(); omitted here)
  }
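To make the hand-off concrete, here is a simplified sketch, with purely illustrative argument values, of what runMain() boils down to: it loads childMainClass and invokes its static main() via reflection (the real method also sets up the classpath and system properties first).

val childMainClass = "org.apache.spark.deploy.yarn.Client"                       // yarn-cluster mode
val childArgs = Array("--class", "com.example.MyApp", "--jar", "/path/app.jar")  // hypothetical arguments
val mainClass = Class.forName(childMainClass)
val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, childArgs.asInstanceOf[AnyRef])                          // static method, so the receiver is null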
  4. The Client first asks yarn for an ApplicationId and uploads the required files, jars, and configuration to HDFS; it then submits the application to the ResourceManager. If spark.yarn.submit.waitAppCompletion is set to true, the launching process keeps polling the application status until it becomes FINISHED, KILLED, or FAILED before exiting; otherwise it exits right away.
 private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
    logInfo("Setting up container launch context for our AM")
    val appId = newAppResponse.getApplicationId
    val appStagingDir = getAppStagingDir(appId)
    val pySparkArchives =
      if (sparkConf.getBoolean("spark.yarn.isPython", false)) {
        findPySparkArchives()
      } else {
        Nil
      }
    val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
    // spark.yarn.jar is uploaded to /user/{user.home}/.sparkStaging/{applicationid}/__spark__.jar
    // the user jar is uploaded to /user/{user.home}/.sparkStaging/{applicationid}/__app__.jar
    // jars passed via --jars are uploaded to /user/{user.home}/.sparkStaging/{applicationid}/{#linkname}.jar
    // files passed via --files are uploaded to /user/{user.home}/.sparkStaging/{applicationid}/{#linkname}
    // __spark_conf__.zip is uploaded to /user/{user.home}/.sparkStaging/{applicationid}/__spark_conf__.zip; it contains every
    // file under HADOOP_CONF_DIR and YARN_CONF_DIR plus the __spark_conf__.properties file generated from SparkConf
    val localResources = prepareLocalResources(appStagingDir, pySparkArchives)

    // Set the environment variables to be passed on to the executors.
    distCacheMgr.setDistFilesEnv(launchEnv)
    distCacheMgr.setDistArchivesEnv(launchEnv)

    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setLocalResources(localResources.asJava)
    amContainer.setEnvironment(launchEnv.asJava)

    val javaOpts = ListBuffer[String]()

    // Set the environment variable through a command prefix
    // to append to the existing value of the variable
    var prefixEnv: Option[String] = None
    // ... (JVM options and the AM launch command are assembled here; omitted)
  }
  5. The ApplicationSubmissionContext is then initialized with the queue, the AM launch command, the environment variables it depends on, and so on.
 def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(args.appName)
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")
    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
      .map(StringUtils.getTrimmedStringCollection(_))
      .filter(!_.isEmpty())
      .foreach { tagCollection =>
        try {
          // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
          // reflection to set it, printing a warning if a tag was specified but the YARN version
          // doesn't support it.
          val method = appContext.getClass().getMethod(
            "setApplicationTags", classOf[java.util.Set[String]])
          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
        } catch {
          case e: NoSuchMethodException =>
            logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
              "YARN does not support it")
        }
      }
    sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
      case Some(v) => appContext.setMaxAppAttempts(v)
      case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
          "Cluster's default value will be used.")
    }

    if (sparkConf.contains("spark.yarn.am.attemptFailuresValidityInterval")) {
      try {
        val interval = sparkConf.getTimeAsMs("spark.yarn.am.attemptFailuresValidityInterval")
        val method = appContext.getClass().getMethod(
          "setAttemptFailuresValidityInterval", classOf[Long])
        method.invoke(appContext, interval: java.lang.Long)
      } catch {
        case e: NoSuchMethodException =>
          logWarning("Ignoring spark.yarn.am.attemptFailuresValidityInterval because the version " +
            "of YARN does not support it")
      }
    }

    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(args.amMemory + amMemoryOverhead)
    capability.setVirtualCores(args.amCores)

    if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
      try {
        val amRequest = Records.newRecord(classOf[ResourceRequest])
        amRequest.setResourceName(ResourceRequest.ANY)
        amRequest.setPriority(Priority.newInstance(0))
        amRequest.setCapability(capability)
        amRequest.setNumContainers(1)
        val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
        val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
        method.invoke(amRequest, amLabelExpression)

        val setResourceRequestMethod =
          appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
        setResourceRequestMethod.invoke(appContext, amRequest)
      } catch {
        case e: NoSuchMethodException =>
          logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
            "of YARN does not support it")
          appContext.setResource(capability)
      }
    } else {
      appContext.setResource(capability)
    }

    appContext
  }
  6. Finally, the application context is submitted to the ClientRMService interface of the ResourceManager, which completes the submission; a minimal sketch of this submission path through the yarn client API follows.
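Under the hood the Client uses the yarn client API for this step. The following is a minimal, self-contained sketch of that submission path, not Spark's actual code, and the submission context is left mostly empty here:

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())              // picks up yarn-site.xml from the classpath
yarnClient.start()

val newApp = yarnClient.createApplication()           // asks the RM for a new ApplicationId
val appContext = newApp.getApplicationSubmissionContext
// ... fill in the AM ContainerLaunchContext, queue, and resource capability as shown above ...
val appId = yarnClient.submitApplication(appContext)  // lands in ClientRMService on the RM
println(s"Submitted application $appId")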

ApplicationMaster startup

  1. The NodeManager launches the ApplicationMaster, which runs its main function and installs an AmIpFilter so that the web UI can only be reached from the expected origin IP; other requests are redirected to the proxy URL.
  2. A thread named driver is started to run the main function of the user class and initialize the SparkContext; the main thread then registers the AM with the ResourceManager, telling the RM the address of its Spark UI.
 private def runDriver(securityMgr: SecurityManager): Unit = {
    // Install AmIpFilter so the Spark UI can only be reached from specific IPs; other requests are redirected to the configured URL
    addAmIpFilter()
    // Run the main function of the user class (the class given via --class on the spark-submit command line)
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    val sc = waitForSparkContextInitialized()

    // If there is no SparkContext at this point, just fail the app.
    if (sc == null) {
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
    } else {
      rpcEnv = sc.env.rpcEnv
      val driverRef = runAMEndpoint(
        sc.getConf.get("spark.driver.host"),
        sc.getConf.get("spark.driver.port"),
        isClusterMode = true)
      // Register the successfully started AM with the ResourceManager and report the UI address; when the application
      // finishes, unregistration reports the history UI address instead
      registerAM(rpcEnv, driverRef, sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
      // Wait for the user application thread to finish
      userClassThread.join()
    }
  }
  3. A reporter thread is also started; it periodically requests resources from the ResourceManager (the core logic lives in YarnAllocator) and watches for executor failures. When the number of failed executors exceeds spark.yarn.max.executor.failures, the AM is stopped.
// Start the reporter thread, which monitors the application state and periodically requests resources from the RM
 private def launchReporterThread(): Thread = {
    // The number of failures in a row until Reporter thread give up
    val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)

    val t = new Thread {
      override def run() {
        var failureCount = 0
        while (!finished) {
          try {
            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                s"Max number of executor failures ($maxNumExecutorFailures) reached")
            } else {
              logDebug("Sending progress")
              // trigger the resource allocation flow
              allocator.allocateResources()
            }
            failureCount = 0
          } catch {
            case i: InterruptedException =>
            case e: Throwable => {
              failureCount += 1
              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
                finish(FinalApplicationStatus.FAILED,
                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
                    s"$failureCount time(s) from Reporter thread.")
              } else {
                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
              }
            }
          }
          try {
            val numPendingAllocate = allocator.getPendingAllocate.size
            allocatorLock.synchronized {
              val sleepInterval =
                if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
                  val currentAllocationInterval =
                    math.min(heartbeatInterval, nextAllocationInterval)
                  nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
                  currentAllocationInterval
                } else {
                  nextAllocationInterval = initialAllocationInterval
                  heartbeatInterval
                }
              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
                       s"Sleeping for $sleepInterval.")
              allocatorLock.wait(sleepInterval)
            }
          } catch {
            case e: InterruptedException =>
          }
        }
      }
    }
    // setting to daemon status, though this is usually not a good idea.
    t.setDaemon(true)
    t.setName("Reporter")
    t.start()
    logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
            s"initial allocation : $initialAllocationInterval) intervals")
    t
  }
  4. If dynamic allocation is not enabled, the number of executors requested at startup is given by spark.executor.instances; with dynamic allocation enabled, the initial number comes from spark.dynamicAllocation.initialExecutors.
The allocation logic lives in YarnAllocator:
 def allocateResources(): Unit = synchronized {
    // Based on the total number of executors needed, those already granted, and task locality, work out how many
    // executors to request and where to place them
    updateResourceRequests()

    val progressIndicator = 0.1f
    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container requests.
    // This collects the pending and to-be-released requests tracked in AMRMClient, assembles an AllocateRequest,
    // and calls the ApplicationMasterProtocol.allocate interface to ask the RM for resources
    val allocateResponse = amClient.allocate(progressIndicator)

    val allocatedContainers = allocateResponse.getAllocatedContainers()

    if (allocatedContainers.size > 0) {
      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
        .format(
          allocatedContainers.size,
          numExecutorsRunning,
          allocateResponse.getAvailableResources))

      handleAllocatedContainers(allocatedContainers.asScala)
    }
    // ... (handling of completed containers omitted)
  }
def updateResourceRequests(): Unit = {
    val pendingAllocate = getPendingAllocate
    val numPendingAllocate = pendingAllocate.size
    // work out how many more containers still need to be requested
    val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning

    if (missing > 0) {
      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")

      // Split the pending container request into three groups: locality matched list, locality
      // unmatched list and non-locality list. Take the locality matched container request into
      // consideration of container placement, treat as allocated containers.
      // For locality unmatched and locality free container requests, cancel these container
      // requests, since required locality preference has been changed, recalculating using
      // container placement strategy.
      val (localityMatched, localityUnMatched, localityFree) = splitPendingAllocationsByLocality(
        hostToLocalTaskCounts, pendingAllocate)

      // Remove the outdated container request and recalculate the requested container number
      localityUnMatched.foreach(amClient.removeContainerRequest)
      localityFree.foreach(amClient.removeContainerRequest)
      val updatedNumContainer = missing + localityUnMatched.size + localityFree.size

      // compute locality preferences for the containers, i.e. which hosts or racks they should land on
      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
        updatedNumContainer, numLocalityAwareTasks, hostToLocalTaskCounts,
          allocatedHostToContainersMap, localityMatched)

      for (locality <- containerLocalityPreferences) {
        val request = createContainerRequest(resource, locality.nodes, locality.racks)
        // this only stores the request in a local map inside the AM client; it is sent to the RM on the next allocate() call
        amClient.addContainerRequest(request)
        val nodes = request.getNodes
        val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.asScala.last
        logInfo(s"Container request (host: $hostStr, capability: $resource)")
      }
    } else if (missing < 0) {
      val numToCancel = math.min(numPendingAllocate, -missing)
      logInfo(s"Canceling requests for $numToCancel executor containers")

      val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
      if (!matchingRequests.isEmpty) {
        matchingRequests.iterator().next().asScala
          .take(numToCancel).foreach(amClient.removeContainerRequest)
      } else {
        logWarning("Expected to find pending requests, but found none.")
      }
    }
  }
  5. When executors exit abnormally while the application is running, YarnSchedulerBackend tells YarnAllocator the new total number of executors to request, which triggers the allocation of replacement containers; a short example of driving the same resizing path from user code follows.
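For reference, the same resizing path can also be exercised from user code through SparkContext's developer API; the calls go through YarnSchedulerBackend to the AM and update the target count in YarnAllocator. A rough sketch, where the application name and executor ids are only illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("resize-demo"))  // hypothetical application
sc.requestExecutors(2)                                                // ask the AM for two additional executors
sc.killExecutors(Seq("1", "2"))                                       // release executors with ids "1" and "2"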

Elastic resource scaling

When running Spark on yarn with dynamic allocation enabled, the system decides to add or remove executors based on the current load; the load is derived from the number of running and pending tasks, which determines how many executors are needed. See ExecutorAllocationManager for the details.

  private def maxNumExecutorsNeeded(): Int = {
    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
    // tasksPerExecutor = spark.executor.cores / spark.task.cpus, i.e. how many tasks one executor can run concurrently
    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  }
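As a quick sanity check of the formula, with hypothetical numbers:

// 30 running + pending tasks, executors with 4 cores and spark.task.cpus = 1
val numRunningOrPendingTasks = 30
val tasksPerExecutor = 4
val needed = (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor  // ceil(30 / 4) = 8 executors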

Dynamic scaling policies

  • Scale-up policy: once tasks have been backlogged for longer than a configurable threshold, executors start to be added; the number added grows exponentially from round to round, much like TCP slow start (see the sketch below).
  • Scale-down policy: when an executor has been idle for longer than a configurable threshold, it is released.
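The ramp-up can be pictured with the following minimal sketch. It is a simplification of what ExecutorAllocationManager does (the names and the initial target below are assumptions), not the real implementation:

var numExecutorsToAdd = 1      // step size, doubles like TCP slow start
var numExecutorsTarget = 2     // e.g. spark.dynamicAllocation.initialExecutors

// called each time the scheduler backlog timeout fires
def onBacklogTimeout(maxNeeded: Int, maxExecutors: Int): Int = {
  val newTarget = math.min(math.min(numExecutorsTarget + numExecutorsToAdd, maxNeeded), maxExecutors)
  val delta = newTarget - numExecutorsTarget
  numExecutorsTarget = newTarget
  // double the step only if the whole step was consumed, otherwise reset to 1
  numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
  delta                        // number of extra executors requested in this round
}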
Parameter reference:
  • spark.dynamicAllocation.minExecutors: lower bound on the number of executors
  • spark.dynamicAllocation.maxExecutors: upper bound on the number of executors
  • spark.dynamicAllocation.enabled: whether to enable dynamic allocation (requires spark.executor.instances to be 0/unset)
  • spark.executor.instances: fixed number of executors when dynamic allocation is off
  • spark.dynamicAllocation.initialExecutors: number of executors requested initially
  • spark.dynamicAllocation.schedulerBacklogTimeout: how long tasks may stay backlogged before scale-up starts
  • spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: interval between subsequent scale-up rounds after the first one
  • spark.dynamicAllocation.executorIdleTimeout: how long an executor may stay idle before it is removed
  • spark.dynamicAllocation.cachedExecutorIdleTimeout: idle timeout for executors that hold cached blocks
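A minimal sketch of enabling dynamic allocation on yarn with these parameters; the concrete values are only illustrative, and the external shuffle service must also be enabled so executors can be removed without losing shuffle data:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")                               // hypothetical app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "50")
  .set("spark.dynamicAllocation.initialExecutors", "4")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
val sc = new SparkContext(conf)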