Spark3-AQE-數(shù)據(jù)傾斜Join優(yōu)化

Adaptive Query Exection(自適應(yīng)查詢計劃)簡稱AQE,在最早在spark 1.6版本就已經(jīng)有了AQE;到了spark 2.x版本囊扳,intel大數(shù)據(jù)團隊進(jìn)行了相應(yīng)的原型開發(fā)和實踐;到了spark 3.0時代,AQE終于面向用戶可以使用了

注:以下代碼分析基于Spark3.0.1版本

1 Join的自適應(yīng)數(shù)據(jù)傾斜處理

代碼位于sql.core模塊的org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin
主要原理就是基于需要進(jìn)行join的兩個RDD的每個partition信息,將數(shù)據(jù)量傾斜的分區(qū)進(jìn)行切分出來再Join蔽介。

首先,是否能進(jìn)行傾斜優(yōu)化煮寡,有幾點硬性要求:

  • 必須是SortMergeJoin
  • 必須是[Inner,Cross,LeftSemi,LeftAnti,LeftOuter,RightOuter]中的一種Join
    • left表:Inner,Cross,LeftSemi,LeftAnti,LeftOuter
    • right表:Inner,Cross,RightOuter
  • left和right的分區(qū)數(shù)必須一致(這個只要不是異常情況一定可以保證虹蓄,sortMergeJoin在Mapper端會確保左右兩個rdd的partition函數(shù)一致,生成的分區(qū)數(shù)也一定是一致的)

主要流程為:

1.1 計算優(yōu)化后的partition大小

根據(jù)left或right所有partition的數(shù)據(jù)分布情況洲押,分別計算出left和right在優(yōu)化后的partition大小

調(diào)用targetSize方法武花,sizes是每個partition的bytes大小圆凰,medianSize表示整個rdd中partition大小的中位數(shù)杈帐。

變量advisorySize通過spark.sql.adaptive.advisoryPartitionSizeInBytes設(shè)置,表示優(yōu)化后的partition標(biāo)準(zhǔn)大小专钉,默認(rèn)64MB挑童。

private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
  val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
  val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
  // It's impossible that all the partitions are skewed, as we use median size to define skew.
  assert(nonSkewSizes.nonEmpty) #要求必須有不傾斜的分片數(shù)
  math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
}

首先通過調(diào)用isSkewed方法來過濾出不傾斜的分片,之后取advisorySize和整個分片的平均值大小作為優(yōu)化后的分片大小跃须,所以說targetSize也不一定就是我們設(shè)置的spark.sql.adaptive.advisoryPartitionSizeInBytes大小站叼。

private def isSkewed(size: Long, medianSize: Long): Boolean = {
  size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
    size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}

isSkewed用來判斷一個分片是否傾斜,當(dāng)前分片必須滿足大小大于medianSize*spark.sql.adaptive.skewJoin.skewedPartitionFactor(默認(rèn)5)菇民,并且size>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes(默認(rèn)256MB)尽楔。medianSize表示所有partition的中位數(shù)大小。

1.2 判斷partition是否進(jìn)行傾斜處理

首先判斷partition是否可切分第练,left的一個分片為例

val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft

直接調(diào)用isSkewed阔馋,canSplitLeft表示left是否滿足[Inner,Cross,LeftSemi,LeftAnti,LeftOuter]中的一種Join類型。right表判斷同理娇掏。

然后需要判斷一個partition是否是經(jīng)過coalesce操作的:

val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

如果一個partition需要讀取的reduceId>=2個呕寝,那么認(rèn)為這個Partition經(jīng)過AQE的coalesce操作,Spark3.0.1版本對于這種情況不再考慮傾斜處理婴梧。

1.3 partition的切分操作

以left的一個分區(qū)為例下梢,必須滿足isLeftSkew && !isLeftCoalesced才會進(jìn)行split分區(qū)操作,否則返回leftPartSpec(原始的分區(qū)規(guī)則)塞蹭。

// A skewed partition should never be coalesced, but skip it here just to be safe.
val leftParts:Seq[CoalescedPartitionSpec] = if (isLeftSkew && !isLeftCoalesced) {//傾斜 &非coalesc才進(jìn)行自適應(yīng)分區(qū)
  val reducerId = leftPartSpec.startReducerIndex
  val skewSpecs:Option[Seq[PartialReducerPartitionSpec]] = createSkewPartitionSpecs(left.mapStats.shuffleId, reducerId, leftTargetSize)
  if (skewSpecs.isDefined) {
    logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
      s"${skewSpecs.get.length} parts.")
    leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
  }
  skewSpecs.getOrElse(Seq(leftPartSpec))
} else {
  Seq(leftPartSpec)
}

首先會調(diào)用createSkewPartitionSpecs函數(shù)來進(jìn)行嘗試split處理,如果返回Some表示能進(jìn)行split番电,將該分區(qū)的大小add到leftSkewDesc竟坛,用于統(tǒng)計AQE信息。之后返回當(dāng)前分區(qū)信息,如果skewSpecs=None担汤,那么返回Seq長度為1的原始分區(qū)規(guī)則涎跨。

/**
 * Splits the skewed partition based on the map size and the target partition size
 * after split, and create a list of `PartialMapperPartitionSpec`. Returns None if can't split.
  * 為什么這里代碼看起來像是做合并呢,因為mapPartitionSizes對應(yīng)一個reducePart在上游需要讀取的part分區(qū)崭歧,但是這里將其合并為多個子分區(qū)隅很,
  * 每個子分區(qū)在AQE之后,都會單獨啟動一個分區(qū)率碾;
  * 合并分片:[0,1,2,3,4,5]->[Part[0,3],Part[3,5]]
 */
private def createSkewPartitionSpecs(
    shuffleId: Int,
    reducerId: Int,
    targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
  val mapPartitionSizes:Array[Long] = getMapSizesForReduceId(shuffleId, reducerId) //獲取每個分區(qū)的字節(jié)數(shù)
  // 嘗試進(jìn)行分區(qū)合并:比如[0,1,2,3,4,5]->[0,3,5]叔营,012合并為一個分片,34合并為一個分片
  val mapStartIndices:Array[Int] = ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
  if (mapStartIndices.length > 1) { //如果合并后的分區(qū)數(shù)大于1個所宰,則轉(zhuǎn)換為合并后的partitions绒尊,分別對應(yīng)了mapstatus中的起始和結(jié)束索引
    //mapStartIndices.sliding(2).map(t=>PartialReducerPartitionSpec(reducerId, t(0),t(1)))//這么寫不是更好嗎?
    Some(mapStartIndices.indices.map { i =>
      val startMapIndex = mapStartIndices(i)
      val endMapIndex = if (i == mapStartIndices.length - 1) {
        mapPartitionSizes.length
      } else {
        mapStartIndices(i + 1)
      }
      PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex)
    })
  } else {//如果合并后只有一個索引仔粥,則不使用
    None
  }
}

判斷一個split到底切分為多少片其實也是個費勁的過程婴谱。其中g(shù)etMapSizesForReduceId這個方法需要解釋一下,我們知道Shuffle過程中躯泰,上游的Mapper端生成數(shù)據(jù)后谭羔,是按照reduceId來排序的,并整體放在一個data文件中麦向,同時生成一個索引文件瘟裸。Reduce端可以根據(jù)索引文件中起始的reduceId來讀取data中對應(yīng)片段的數(shù)據(jù),由于reduce端會依賴多個mapper诵竭,所以這個方法返回了一個Array[Long]類型话告,代表需要從對應(yīng)mapId拉取的bytes大小,mapId就是對應(yīng)數(shù)組下標(biāo)卵慰。通過這個方法我們起其實也可以知道沙郭,通過shuffleId+reduceId即可知道當(dāng)前reduce都需要拉取哪些數(shù)據(jù)了。

ShufflePartitionsUtil.splitSizeListByTargetSize我們這里不深入講解呵燕,有興趣的可以自己閱讀源碼棠绘。具體就是按照targetSize來將一個reduce中需要拉取的mapPartitionSizes切分為適合的大小,方法內(nèi)部主要是一個循環(huán)函數(shù)再扭,計算相鄰的mapSize氧苍,是否能合并為targetSize的大小。最終返回mapStartIndices:Array[Int]泛范,表示哪些mapId放到一個task中進(jìn)行處理让虐,比如[0,3,5],0-1-2合并為一個分片罢荡,3-4合并為一個分片赡突,而5單獨處理对扶。

之后的流程就比較簡單了,如果split完的分片數(shù)大于1個惭缰,則獲取這個分片需要讀取的startMapIndex和endMapIndex浪南,封裝為PartialReducerPartitionSpec,這樣漱受,在實際計算的時候络凿,通過shuffleId+reduceId+startMapIndex+endMapIndex就可以得到對應(yīng)task需要拉取的數(shù)據(jù)了。

通過1.2和1.3的處理昂羡,可以分別將獲取left和right切分后的part

1.4 分配另一半Partition

小標(biāo)題表述不準(zhǔn)確絮记,想表示的是left的某個partition假如split為3份,那么right需要將對應(yīng)的partition復(fù)制3份虐先,分別和left經(jīng)過split的分片做join怨愤。

代碼很精簡,雙層for循環(huán)蛹批,相當(dāng)于做了個笛卡爾積:

for {
  leftSidePartition <- leftParts
  rightSidePartition <- rightParts
} {
  leftSidePartitions += leftSidePartition
  rightSidePartitions += rightSidePartition
}

如果left和right都沒有經(jīng)過傾斜優(yōu)化撰洗,那么這段代碼中l(wèi)eftSidePartitions和rightSidePartitions分別只有各自原始的分區(qū)“忝迹看下面的例子了赵,p1代表left或這right的index=1的分片潜支。

如果left split為3份甸赃,那么leftSidePartitions=[p1_0,p1_1,p1_2],rightSidePartitions=[p1,p1,p1]冗酿。

如果left split為3份埠对,right split為2份,那么leftSidePartitions=[p1_0,p1_0,p1_1,p1_1,p1_2,p1_2]裁替,rightSidePartitions=[p1_0,p1_1,p1_0,p1_1,p1_0,p1_1]项玛。left將被fetch和處理2次,而right是3次弱判。

是不是看到了什么缺點襟沮?

1.5 更新Join計劃

新的執(zhí)行計劃建立在left或right有經(jīng)過傾斜優(yōu)化的分區(qū),smj代表SortMergeJoin昌腰,將優(yōu)化后的分區(qū)規(guī)則更新到執(zhí)行計劃中即可开伏。

if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
  val newLeft = CustomShuffleReaderExec(
    left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
  val newRight = CustomShuffleReaderExec(
    right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
  smj.copy(
    left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
} else {
  smj
}

2 補充

貼上optimizeSkewJoin的核心方法,推薦大家看看源代碼:

/*
 * This method aim to optimize the skewed join with the following steps:
 * 1. Check whether the shuffle partition is skewed based on the median size
 *    and the skewed partition threshold in origin smj.
 * 2. Assuming partition0 is skewed in left side, and it has 5 mappers (Map0, Map1...Map4).
 *    And we may split the 5 Mappers into 3 mapper ranges [(Map0, Map1), (Map2, Map3), (Map4)]
 *    based on the map size and the max split number.
 * 3. Wrap the join left child with a special shuffle reader that reads each mapper range with one
 *    task, so total 3 tasks.
 * 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
 *    3 tasks separately.
 */
def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
  case smj @ SortMergeJoinExec(_, _, joinType, _,
      s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
      s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
      if supportedJoinTypes.contains(joinType) =>
    // 要求left和right分片數(shù)必須相同遭商,這在sortMergeJoin中是可以實現(xiàn)的
    assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
    val numPartitions = left.partitionsWithSizes.length
    // 獲取兩個rdd的所有part的bytes大小的中位數(shù)
    // Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
    val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
    val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
    logDebug(
      s"""
        |Optimizing skewed join.
        |Left side partitions size info:
        |${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
        |Right side partitions size info:
        |${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
      """.stripMargin)
    // 是否可以切分:滿足一定的join條件
    val canSplitLeft = canSplitLeftSide(joinType)
    val canSplitRight = canSplitRightSide(joinType)
    // We use the actual partition sizes (may be coalesced) to calculate target size, so that
    // the final data distribution is even (coalesced partitions + split partitions).
    // 分別對兩個rdd的每個partition進(jìn)行數(shù)據(jù)傾斜優(yōu)化后的大小
    val leftActualSizes = left.partitionsWithSizes.map(_._2)
    val rightActualSizes = right.partitionsWithSizes.map(_._2)
    val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
    val rightTargetSize = targetSize(rightActualSizes, rightMedSize)

    val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
    val leftSkewDesc = new SkewDesc
    val rightSkewDesc = new SkewDesc
    // 遍歷所有partition固灵,進(jìn)行合并或者切分
    for (partitionIndex <- 0 until numPartitions) {
          /** 遍歷每個part,如果是傾斜劫流,就按照map端分區(qū)的index進(jìn)行split巫玻,子分區(qū)不會超過targetSize*1.2的大小
            * 左右rdd都需要執(zhí)行這樣的操作丛忆,
            * */
      //是否傾斜及是否可切分
      val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
      val leftPartSpec:CoalescedPartitionSpec = left.partitionsWithSizes(partitionIndex)._1
      //如果一個分片是經(jīng)過coalesc操作的,那么他的startIndex+1 != endIndex也就是至少讀取兩個分片
      val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex

      val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
      val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
      val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val leftParts = if (isLeftSkew && !isLeftCoalesced) {//傾斜 &非coalesc才進(jìn)行自適應(yīng)分區(qū)
        val reducerId = leftPartSpec.startReducerIndex
        val skewSpecs:Option[Seq[PartialReducerPartitionSpec]] = createSkewPartitionSpecs(left.mapStats.shuffleId, reducerId, leftTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(leftPartSpec))
      } else {
        Seq(leftPartSpec)
      }

      // A skewed partition should never be coalesced, but skip it here just to be safe.
      val rightParts:Seq[CoalescedPartitionSpec] = if (isRightSkew && !isRightCoalesced) {
        val reducerId = rightPartSpec.startReducerIndex
        val skewSpecs :Option[Seq[PartialReducerPartitionSpec]]= createSkewPartitionSpecs(
          right.mapStats.shuffleId, reducerId, rightTargetSize)
        if (skewSpecs.isDefined) {
          logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
            s"${skewSpecs.get.length} parts.")
          rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
        }
        skewSpecs.getOrElse(Seq(rightPartSpec))
      } else {
        Seq(rightPartSpec)
      }

          /** 這里做了一個笛卡爾積操作:
            * leftParts和rightParts分別存放了經(jīng)過split的分區(qū)或未經(jīng)過split的分區(qū)
            * 1 left right都沒有skew:則結(jié)果各自只有一個SidePartition
            * 2 left.skew right.notskew:rightSidePartition會將相同的part進(jìn)行復(fù)制(left.skew切分的part數(shù)量)
            * 3 left.notSkew +right.skew:leftSidePartition會將相同的part進(jìn)行復(fù)制(right.skew切分的part數(shù)量)
            * 4 left.skew+right.skew: 笛卡爾積仍秤,若left有3個part熄诡,right有2個part,那么left每個part會復(fù)制2份诗力,right每個part復(fù)制3份
            * */
      for {
        leftSidePartition <- leftParts
        rightSidePartition <- rightParts
      } {
        leftSidePartitions += leftSidePartition
        rightSidePartitions += rightSidePartition
      }
    }
    logDebug("number of skewed partitions: " +
      s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
    //修改經(jīng)過自適應(yīng)的join計劃
    if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
      val newLeft = CustomShuffleReaderExec(
        left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
      val newRight = CustomShuffleReaderExec(
        right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
      smj.copy(
        left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
    } else {
      smj
    }
}

其實整體看下來粮彤,邏輯不是那么的復(fù)雜。自適應(yīng)傾斜Join優(yōu)化并沒有使用我們熟知的分配隨機前綴Key來進(jìn)行姜骡,可能是需要進(jìn)行二次Join引入的復(fù)雜度無法預(yù)料导坟,所以這里采用split分片,進(jìn)行partition級別笛卡爾積的操作圈澈,這可能會導(dǎo)致數(shù)據(jù)傳輸量變大的問題惫周,但是整體來說還是比較可控的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末康栈,一起剝皮案震驚了整個濱河市递递,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌啥么,老刑警劉巖登舞,帶你破解...
    沈念sama閱讀 218,755評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異悬荣,居然都是意外死亡菠秒,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評論 3 395
  • 文/潘曉璐 我一進(jìn)店門氯迂,熙熙樓的掌柜王于貴愁眉苦臉地迎上來践叠,“玉大人,你說我怎么就攤上這事嚼蚀〗疲” “怎么了?”我有些...
    開封第一講書人閱讀 165,138評論 0 355
  • 文/不壞的土叔 我叫張陵轿曙,是天一觀的道長弄捕。 經(jīng)常有香客問我,道長导帝,這世上最難降的妖魔是什么守谓? 我笑而不...
    開封第一講書人閱讀 58,791評論 1 295
  • 正文 為了忘掉前任,我火速辦了婚禮舟扎,結(jié)果婚禮上分飞,老公的妹妹穿的比我還像新娘。我一直安慰自己睹限,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,794評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著丽旅,像睡著了一般。 火紅的嫁衣襯著肌膚如雪别洪。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,631評論 1 305
  • 那天柳刮,我揣著相機與錄音挖垛,去河邊找鬼。 笑死秉颗,一個胖子當(dāng)著我的面吹牛痢毒,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播蚕甥,決...
    沈念sama閱讀 40,362評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼哪替,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了菇怀?” 一聲冷哼從身側(cè)響起凭舶,我...
    開封第一講書人閱讀 39,264評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎爱沟,沒想到半個月后帅霜,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,724評論 1 315
  • 正文 獨居荒郊野嶺守林人離奇死亡呼伸,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,900評論 3 336
  • 正文 我和宋清朗相戀三年身冀,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蜂大。...
    茶點故事閱讀 40,040評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡闽铐,死狀恐怖蝶怔,靈堂內(nèi)的尸體忽然破棺而出奶浦,到底是詐尸還是另有隱情,我是刑警寧澤踢星,帶...
    沈念sama閱讀 35,742評論 5 346
  • 正文 年R本政府宣布澳叉,位于F島的核電站,受9級特大地震影響沐悦,放射性物質(zhì)發(fā)生泄漏成洗。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,364評論 3 330
  • 文/蒙蒙 一藏否、第九天 我趴在偏房一處隱蔽的房頂上張望瓶殃。 院中可真熱鬧,春花似錦副签、人聲如沸遥椿。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽冠场。三九已至家浇,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間碴裙,已是汗流浹背钢悲。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留舔株,地道東北人莺琳。 一個月前我還...
    沈念sama閱讀 48,247評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像载慈,于是被迫代替她去往敵國和親芦昔。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,979評論 2 355

推薦閱讀更多精彩內(nèi)容