spark 2.2.0 源碼閱讀-foreach算子-到stage劃分

Spark 的RDD 支持兩種類型的操作:

  • transformations : 從一個(gè)已經(jīng)存在的數(shù)據(jù)集創(chuàng)建一個(gè)新的數(shù)據(jù)集
  • actions:在數(shù)據(jù)集上進(jìn)行計(jì)算后返回driver程序一個(gè)值
    今天從foreach 算子學(xué)習(xí)一下spark的源碼:

class RDD.scala

// Actions (launch a job to return a value to the user program)

  * Applies a function f to all elements of this RDD.
 def foreach(f: T => Unit): Unit = withScope {
 //首先執(zhí)行清除閉包锡移,使得能夠被序列化發(fā)送到tasks上砸逊,如果設(shè)置了checkSerializable 侣滩,會(huì)      檢查是否可以序列化
   val cleanF = sc.clean(f) 
   sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) 



   * Run a job on all partitions in an RDD and return the results in an array.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length) // 入?yún)⑹莚dd义桂,傳入的方法漱竖,和分區(qū)的數(shù)組

這個(gè)函數(shù)執(zhí)行的runJob 入?yún)⒍嗔艘粋€(gè)TaskContext

   * Run a function on a given set of partitions in an RDD and return the results as an array.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
   * Run a function on a given set of partitions in an RDD and return the results as an array.
   * The function that is run against each partition additionally takes `TaskContext` argument.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @return in-memory collection with a result of the job (each collection element will contain
   * a result from one partition)
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)


   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      //判斷sparkContext是否已經(jīng)退出抗斤,這是一個(gè)AtomicBoolean 類型
      throw new IllegalStateException("SparkContext has been shutdown")
    // 獲取代碼的位置囚企,就是代碼的某一行,打印如下 reduce at SparkAccumulatorTest.scala:43
    val callSite = getCallSite

    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    //dagScheduluer 開(kāi)始執(zhí)行
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    //doCheckPoint 會(huì)被父RDD遞歸調(diào)用龙宏,通過(guò)保存的形式執(zhí)行checkpoint
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   * @note Throws `Exception` when the job fails
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)//提交job,返回一個(gè)waiter
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception


   * Submit an action job to the scheduler.
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   * @return a JobWaiter object that can be used to block until the job finishes executing
   *         or can be used to cancel the job.
   * @throws IllegalArgumentException when partitions ids are illegal
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      jobId, rdd, func2, partitions.toArray, callSite, waiter,



  • onReceive 入口方法厨姚,執(zhí)行 doOnReceive 以及關(guān)閉timerContext
  • doOnReceive 具體的每種事件的執(zhí)行邏輯
  • onError
  • onStop
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

   * The main event loop of the DAG scheduler.
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
    } finally {

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
//job 提交
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
//Stage 取消
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
// Job取消
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>

    case AllJobsCancelled =>

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>

    case completion: CompletionEvent =>

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>

  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
    } catch {
      case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)

  override def onStop(): Unit = {
    // Cancel any active jobs in postStop hook

著重看下 DAGScheduler 的 handleJobSubmitted衅澈,首先createResultStage

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //根據(jù)當(dāng)前的Rdd和分區(qū) 創(chuàng)建Stage  這兒與spark 1.6.3不一樣 newResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))



   * Create a ResultStage associated with the provided jobId.
  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)


   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)



   * Returns shuffle dependencies that are immediate parents of the given RDD.
   * This function will not return more distant ancestors.  For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   * A <-- B <-- C
   * calling this function with rdd C will only return the B <-- C dependency.
   * This function is scheduler-visible for the purpose of unit testing.
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>


   * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
   * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
   * addition to any missing ancestor shuffle map stages.如果在shuffleIdToMapStage已經(jīng)存在則返回shufflemapstage犁功,否則的話這個(gè)方法將會(huì)創(chuàng)建一個(gè)shuffle map stage
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            createShuffleMapStage(dep, firstJobId)
        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)

如果shuffleIdToMapStage中存在一個(gè)shuffle map stage 則返回,如果不存在則創(chuàng)建一個(gè)新的shuffle map stage,什么是 ShuffleMapStages

 * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
 * They occur right before each shuffle operation, and might contain multiple pipelined operations
 * before that (e.g. map and filter). When executed, they save map output files that can later be
 * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
 * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
 * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
 * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
 * there can be multiple ActiveJobs trying to compute the same shuffle map stage.

ShuffleMapStage 就是shuflle 過(guò)程中DAG執(zhí)行的中間狀態(tài)的stage婚夫,發(fā)生在shuffle操作之前浸卦,可能包含多個(gè)pipeline操作。一旦執(zhí)行結(jié)束案糙,它就會(huì)保存map輸出文件限嫌。這些文件會(huì)被后續(xù)的reduce task 獲取。


rdd1 = rdd0.reduce
rdd2 =
rdd3 = rdd2.reduce
rdd4 = rdd3.foreach
則根據(jù)以下邏輯 rdd4 入棧稚叹,進(jìn)入循環(huán),未訪問(wèn)過(guò)禽笑,獲取shuffle依賴入录,返回rdd2,然后將ShuffleDependency壓入ancestors,壓入rdd2,然后rdd2出棧佳镜,rdd2獲得shuffle依賴 rdd0,壓入ancestors僚稿。

 /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val ancestors = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
          } // Otherwise, the dependency and its ancestors have already been registered.



   * Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
   * previously run stage generated the same shuffle data, this function will copy the output
   * locations that are still available from the previous shuffle to avoid unnecessarily
   * regenerating data.
  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    //taskNums 等于paritions的數(shù)量
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // A previously run stage generated partitions for this shuffle, so for each output
      // that's still available, copy information about that output location to the new stage
      // (so we don't unnecessarily re-compute that data).
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  • 序言:七十年代末蠢络,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子迟蜜,更是在濱河造成了極大的恐慌刹孔,老刑警劉巖,帶你破解...
    沈念sama閱讀 219,427評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件娜睛,死亡現(xiàn)場(chǎng)離奇詭異髓霞,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)畦戒,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,551評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門方库,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人障斋,你說(shuō)我怎么就攤上這事纵潦⌒旌祝” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 165,747評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵邀层,是天一觀的道長(zhǎng)返敬。 經(jīng)常有香客問(wèn)我,道長(zhǎng)被济,這世上最難降的妖魔是什么救赐? 我笑而不...
    開(kāi)封第一講書人閱讀 58,939評(píng)論 1 295
  • 正文 為了忘掉前任涧团,我火速辦了婚禮只磷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘泌绣。我一直安慰自己钮追,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,955評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布阿迈。 她就那樣靜靜地躺著元媚,像睡著了一般。 火紅的嫁衣襯著肌膚如雪苗沧。 梳的紋絲不亂的頭發(fā)上刊棕,一...
    開(kāi)封第一講書人閱讀 51,737評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音待逞,去河邊找鬼甥角。 笑死,一個(gè)胖子當(dāng)著我的面吹牛识樱,可吹牛的內(nèi)容都是我干的嗤无。 我是一名探鬼主播,決...
    沈念sama閱讀 40,448評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼怜庸,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼当犯!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起割疾,我...
    開(kāi)封第一講書人閱讀 39,352評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤嚎卫,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后宏榕,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體拓诸,經(jīng)...
    沈念sama閱讀 45,834評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,992評(píng)論 3 338
  • 正文 我和宋清朗相戀三年担扑,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了恰响。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,133評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡涌献,死狀恐怖胚宦,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤枢劝,帶...
    沈念sama閱讀 35,815評(píng)論 5 346
  • 正文 年R本政府宣布井联,位于F島的核電站,受9級(jí)特大地震影響您旁,放射性物質(zhì)發(fā)生泄漏烙常。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,477評(píng)論 3 331
  • 文/蒙蒙 一鹤盒、第九天 我趴在偏房一處隱蔽的房頂上張望蚕脏。 院中可真熱鬧,春花似錦侦锯、人聲如沸驼鞭。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 32,022評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)挣棕。三九已至,卻和暖如春亲桥,著一層夾襖步出監(jiān)牢的瞬間洛心,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,147評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工题篷, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留词身,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,398評(píng)論 3 373
  • 正文 我出身青樓悼凑,卻偏偏與公主長(zhǎng)得像偿枕,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子户辫,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,077評(píng)論 2 355
