Adaptive Query Execution (AQE) dates back as far as Spark 1.6; in the Spark 2.x era, Intel's big data team built a prototype and put it into practice; with Spark 3.0, AQE finally became available to end users.
Note: the code analysis below is based on Spark 3.0.1.
1 Adaptive skew handling for joins
The code lives in the sql.core module, at org.apache.spark.sql.execution.adaptive.OptimizeSkewedJoin.
The core idea: based on the per-partition statistics of the two RDDs being joined, carve the data-skewed partitions into smaller pieces and join those pieces instead.
First, a few hard requirements must hold before the skew optimization applies:
- The join must be a SortMergeJoin
- The join type must be one of [Inner, Cross, LeftSemi, LeftAnti, LeftOuter, RightOuter]
- the left side may be split for: Inner, Cross, LeftSemi, LeftAnti, LeftOuter
- the right side may be split for: Inner, Cross, RightOuter (the checks behind these two bullets are sketched right after this list)
- left and right must have the same number of partitions (barring abnormal cases this always holds: sort-merge join makes the mapper side use the same partitioning function for both RDDs, so both produce the same number of partitions)
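For reference, the side-specific checks are simple predicates on the join type; paraphrased from my reading of the 3.0.1 source of OptimizeSkewedJoin (method names as in the source), they look roughly like this:
private def canSplitLeftSide(joinType: JoinType) = {
  // splitting the left side is safe only when it does not duplicate
  // unmatched rows coming from the right side
  joinType == Inner || joinType == Cross || joinType == LeftSemi ||
    joinType == LeftAnti || joinType == LeftOuter
}
private def canSplitRightSide(joinType: JoinType) = {
  joinType == Inner || joinType == Cross || joinType == RightOuter
}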
The main flow:
1.1 Computing the optimized partition size
Based on the size distribution across all partitions of each side, compute the post-optimization partition size for left and right separately.
This is the targetSize method, where sizes holds the byte size of each partition and medianSize is the median partition size of the whole RDD.
advisorySize comes from spark.sql.adaptive.advisoryPartitionSizeInBytes and represents the advisory partition size after optimization, 64MB by default.
private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
  val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
  val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
  // It's impossible that all the partitions are skewed, as we use median size to define skew.
  assert(nonSkewSizes.nonEmpty) // there must be at least one non-skewed partition
  math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
}
isSkewed is used to filter out the skewed partitions first; the target is then the larger of advisorySize and the average size of the remaining non-skewed partitions. So targetSize is not necessarily the spark.sql.adaptive.advisoryPartitionSizeInBytes we configured: for example, if the non-skewed partitions average 100MB, the target becomes 100MB rather than the 64MB default.
private def isSkewed(size: Long, medianSize: Long): Boolean = {
  size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
    size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}
isSkewed decides whether a partition is skewed: its size must exceed medianSize * spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) and also exceed spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256MB), where medianSize is the median of all partition sizes. Both conditions must hold, as the standalone sketch below makes visible.
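A minimal self-contained illustration with the default factor (5) and threshold (256MB) hard-coded; the object name and constants here are mine, not Spark's:
object IsSkewedDemo {
  private val skewFactor = 5L                          // default skewedPartitionFactor
  private val skewThresholdBytes = 256L * 1024 * 1024  // default skewedPartitionThresholdInBytes

  def isSkewed(size: Long, medianSize: Long): Boolean =
    size > medianSize * skewFactor && size > skewThresholdBytes

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    println(isSkewed(600 * mb, 100 * mb)) // true: 6x the median and above 256MB
    println(isSkewed(60 * mb, 10 * mb))   // false: 6x the median but only 60MB
  }
}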
1.2 Deciding whether a partition gets skew handling
First, determine whether a partition is skewed and splittable, taking one left-side partition as the example:
val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
This is a direct call to isSkewed; canSplitLeft is true when the join type is one of [Inner, Cross, LeftSemi, LeftAnti, LeftOuter]. The right side is judged symmetrically.
Next, check whether the partition has already been coalesced:
val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
If a partition reads two or more reducer ids, it is considered the product of AQE's coalescing step, and Spark 3.0.1 no longer considers such partitions for skew handling.
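To make the check concrete, here is a self-contained sketch; the case class mimics Spark's CoalescedPartitionSpec (the real one lives in Spark's sql execution package) and the values are hypothetical:
// mimic of Spark's CoalescedPartitionSpec, for illustration only
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)

val spec = CoalescedPartitionSpec(startReducerIndex = 2, endReducerIndex = 5)
// this spec reads reducer ids 2, 3 and 4, i.e. more than one, so AQE treats
// it as already coalesced and skips skew splitting for it
val isCoalesced = spec.startReducerIndex + 1 < spec.endReducerIndex // true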
1.3 Splitting a partition
Taking one left-side partition as the example: the split happens only when isLeftSkew && !isLeftCoalesced holds; otherwise leftPartSpec (the original partition spec) is returned.
// A skewed partition should never be coalesced, but skip it here just to be safe.
val leftParts: Seq[ShufflePartitionSpec] = if (isLeftSkew && !isLeftCoalesced) { // split only when skewed and not coalesced
  val reducerId = leftPartSpec.startReducerIndex
  val skewSpecs: Option[Seq[PartialReducerPartitionSpec]] =
    createSkewPartitionSpecs(left.mapStats.shuffleId, reducerId, leftTargetSize)
  if (skewSpecs.isDefined) {
    logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
      s"${skewSpecs.get.length} parts.")
    leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
  }
  skewSpecs.getOrElse(Seq(leftPartSpec))
} else {
  Seq(leftPartSpec)
}
createSkewPartitionSpecs is called first to attempt the split. If it returns Some, the split succeeded: the partition's size is added to leftSkewDesc (used for AQE statistics) and the split specs are returned. If skewSpecs is None, a Seq of length 1 holding the original partition spec is returned instead.
/**
 * Splits the skewed partition based on the map size and the target partition size
 * after split, and create a list of `PartialReducerPartitionSpec`. Returns None if can't split.
 * Why does this code look like merging rather than splitting? mapPartitionSizes holds the
 * map-side outputs that one reduce partition has to read, and they are grouped here into
 * several sub-partitions; after AQE, each sub-partition is handled by its own task.
 * Grouping: [0,1,2,3,4,5] -> [Part[0,3], Part[3,5]]
 */
private def createSkewPartitionSpecs(
    shuffleId: Int,
    reducerId: Int,
    targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = {
  // byte size of each map-side output block feeding this reducer
  val mapPartitionSizes: Array[Long] = getMapSizesForReduceId(shuffleId, reducerId)
  // group the map outputs by targetSize: e.g. [0,1,2,3,4,5] -> start indices [0,3,5],
  // meaning maps 0-1-2 form one piece and maps 3-4 form another
  val mapStartIndices: Array[Int] = ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
  if (mapStartIndices.length > 1) {
    // more than one group: turn each [start, end) map-index range into a partition spec
    // aside: (mapStartIndices :+ mapPartitionSizes.length).sliding(2)
    //   .map(t => PartialReducerPartitionSpec(reducerId, t(0), t(1))) would read more nicely
    Some(mapStartIndices.indices.map { i =>
      val startMapIndex = mapStartIndices(i)
      val endMapIndex = if (i == mapStartIndices.length - 1) {
        mapPartitionSizes.length
      } else {
        mapStartIndices(i + 1)
      }
      PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex)
    })
  } else {
    // only one group: not worth splitting
    None
  }
}
Working out how many pieces a partition should be split into is itself a bit of work. getMapSizesForReduceId deserves an explanation. During a shuffle, each mapper writes its output sorted by reduce id into a single data file and generates an index file alongside it. The reduce side uses the index file to locate and read its segment of each data file. Since a reducer depends on multiple mappers, the method returns an Array[Long]: the number of bytes to fetch from each map output, with mapId as the array index. This also tells us that shuffleId + reduceId is enough to know exactly which data the current reducer needs to fetch.
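If memory serves, the 3.0.1 implementation is a thin wrapper over the driver-side MapOutputTrackerMaster, roughly as follows (treat the exact calls as a recollection of the source rather than gospel):
private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
  val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  // one MapStatus per mapper; ask each for the block size destined for this reducer
  mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map(_.getSizeForBlock(partitionId))
}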
We won't dig into ShufflePartitionsUtil.splitSizeListByTargetSize here; read the source if you're interested. In short, it partitions the mapPartitionSizes a reducer has to fetch into groups of roughly targetSize: the core is a loop that accumulates adjacent map sizes and checks whether they can still be merged under targetSize. It returns mapStartIndices: Array[Int], indicating which mapIds are handled by one task; e.g. [0,3,5] means maps 0-1-2 form one piece, maps 3-4 form another, and map 5 is handled alone.
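To make the grouping concrete, here is a deliberately simplified greedy sketch; it is not Spark's splitSizeListByTargetSize (the real one also avoids emitting very small groups), just the core accumulate-and-cut idea:
// returns the start index of each group; consecutive sizes are accumulated
// until adding the next one would exceed targetSize
def splitByTargetSize(sizes: Array[Long], targetSize: Long): Array[Int] = {
  val starts = scala.collection.mutable.ArrayBuffer(0)
  var current = 0L
  for (i <- sizes.indices) {
    if (current + sizes(i) > targetSize && current > 0) {
      starts += i // cut here: a new group begins at map index i
      current = 0L
    }
    current += sizes(i)
  }
  starts.toArray
}

// sizes [10,10,10,25,5,40] with targetSize 30 -> start indices [0,3,5],
// i.e. groups (0,1,2), (3,4) and (5), matching the example above
println(splitByTargetSize(Array(10L, 10L, 10L, 25L, 5L, 40L), 30L).mkString(","))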
The rest is straightforward: if the split yields more than one piece, each piece's startMapIndex and endMapIndex are wrapped in a PartialReducerPartitionSpec. At execution time, shuffleId + reduceId + startMapIndex + endMapIndex is all a task needs to know which data to fetch.
With 1.2 and 1.3 done, we have the split pieces for both left and right.
1.4 Pairing up the other side's partitions
The subheading is imprecise; what it means is: if a left partition is split into 3 pieces, the corresponding right partition must be duplicated 3 times, so each of the left pieces can be joined against it.
The code is very compact: a nested for comprehension, effectively a Cartesian product:
for {
  leftSidePartition <- leftParts
  rightSidePartition <- rightParts
} {
  leftSidePartitions += leftSidePartition
  rightSidePartitions += rightSidePartition
}
If neither left nor right was skew-optimized, leftSidePartitions and rightSidePartitions each hold just their original partition. Consider the examples below, where p1 denotes the partition at index 1 of left or right.
If left is split into 3 pieces, then leftSidePartitions = [p1_0, p1_1, p1_2] and rightSidePartitions = [p1, p1, p1].
If left is split into 3 pieces and right into 2, then leftSidePartitions = [p1_0, p1_0, p1_1, p1_1, p1_2, p1_2] and rightSidePartitions = [p1_0, p1_1, p1_0, p1_1, p1_0, p1_1]: each left piece is fetched and processed 2 times, each right piece 3 times.
Do you spot the drawback here?
1.5 Updating the join plan
A new plan is built only when left or right has at least one skew-optimized partition; smj is the SortMergeJoin node, and the optimized partition specs just need to be written into the plan.
if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
  val newLeft = CustomShuffleReaderExec(
    left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
  val newRight = CustomShuffleReaderExec(
    right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
  smj.copy(
    left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
} else {
  smj
}
2 Supplement
Here is the core method of OptimizeSkewedJoin in full; I recommend reading the source yourself:
/*
* This method aim to optimize the skewed join with the following steps:
* 1. Check whether the shuffle partition is skewed based on the median size
* and the skewed partition threshold in origin smj.
* 2. Assuming partition0 is skewed in left side, and it has 5 mappers (Map0, Map1...Map4).
* And we may split the 5 Mappers into 3 mapper ranges [(Map0, Map1), (Map2, Map3), (Map4)]
* based on the map size and the max split number.
* 3. Wrap the join left child with a special shuffle reader that reads each mapper range with one
* task, so total 3 tasks.
* 4. Wrap the join right child with a special shuffle reader that reads partition0 3 times by
* 3 tasks separately.
*/
def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp {
case smj @ SortMergeJoinExec(_, _, joinType, _,
s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _),
s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _)
if supportedJoinTypes.contains(joinType) =>
      // left and right must have the same number of partitions, which sort-merge join guarantees
assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length)
val numPartitions = left.partitionsWithSizes.length
      // median byte size across each side's partitions
// Use the median size of the actual (coalesced) partition sizes to detect skewed partitions.
val leftMedSize = medianSize(left.partitionsWithSizes.map(_._2))
val rightMedSize = medianSize(right.partitionsWithSizes.map(_._2))
logDebug(
s"""
|Optimizing skewed join.
|Left side partitions size info:
|${getSizeInfo(leftMedSize, left.partitionsWithSizes.map(_._2))}
|Right side partitions size info:
|${getSizeInfo(rightMedSize, right.partitionsWithSizes.map(_._2))}
""".stripMargin)
      // whether each side may be split, determined by the join type
val canSplitLeft = canSplitLeftSide(joinType)
val canSplitRight = canSplitRightSide(joinType)
// We use the actual partition sizes (may be coalesced) to calculate target size, so that
// the final data distribution is even (coalesced partitions + split partitions).
      // per-side actual sizes and the target size used when splitting skewed partitions
val leftActualSizes = left.partitionsWithSizes.map(_._2)
val rightActualSizes = right.partitionsWithSizes.map(_._2)
val leftTargetSize = targetSize(leftActualSizes, leftMedSize)
val rightTargetSize = targetSize(rightActualSizes, rightMedSize)
val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
val leftSkewDesc = new SkewDesc
val rightSkewDesc = new SkewDesc
      // walk over every partition, splitting the skewed ones
for (partitionIndex <- 0 until numPartitions) {
        /* For each partition: if it is skewed, split it by map-side output index; the
         * sub-partitions will not exceed roughly targetSize * 1.2 in size.
         * Both the left and the right RDD get this treatment. */
        // skewed, and splittable given the join type?
val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft
val leftPartSpec:CoalescedPartitionSpec = left.partitionsWithSizes(partitionIndex)._1
        // a coalesced partition reads at least two reducer ids, i.e. startReducerIndex + 1 < endReducerIndex
val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex
val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight
val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1
val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex
// A skewed partition should never be coalesced, but skip it here just to be safe.
        val leftParts = if (isLeftSkew && !isLeftCoalesced) { // split only when skewed and not coalesced
val reducerId = leftPartSpec.startReducerIndex
          val skewSpecs: Option[Seq[PartialReducerPartitionSpec]] =
            createSkewPartitionSpecs(left.mapStats.shuffleId, reducerId, leftTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Left side partition $partitionIndex is skewed, split it into " +
s"${skewSpecs.get.length} parts.")
leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex))
}
skewSpecs.getOrElse(Seq(leftPartSpec))
} else {
Seq(leftPartSpec)
}
// A skewed partition should never be coalesced, but skip it here just to be safe.
        val rightParts: Seq[ShufflePartitionSpec] = if (isRightSkew && !isRightCoalesced) {
val reducerId = rightPartSpec.startReducerIndex
          val skewSpecs: Option[Seq[PartialReducerPartitionSpec]] = createSkewPartitionSpecs(
right.mapStats.shuffleId, reducerId, rightTargetSize)
if (skewSpecs.isDefined) {
logDebug(s"Right side partition $partitionIndex is skewed, split it into " +
s"${skewSpecs.get.length} parts.")
rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex))
}
skewSpecs.getOrElse(Seq(rightPartSpec))
} else {
Seq(rightPartSpec)
}
        /** This is a Cartesian product:
         * leftParts and rightParts each hold either the split pieces or the single original partition.
         * 1 neither side skewed: one SidePartition each
         * 2 left skewed, right not: the right partition is duplicated once per left piece
         * 3 left not skewed, right skewed: the left partition is duplicated once per right piece
         * 4 both skewed: full Cartesian product; with 3 left pieces and 2 right pieces,
         *   each left piece is copied 2 times and each right piece 3 times
         */
for {
leftSidePartition <- leftParts
rightSidePartition <- rightParts
} {
leftSidePartitions += leftSidePartition
rightSidePartitions += rightSidePartition
}
}
logDebug("number of skewed partitions: " +
s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}")
      // rewrite the join plan with the adaptive partition specs
if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) {
val newLeft = CustomShuffleReaderExec(
left.shuffleStage, leftSidePartitions, leftSkewDesc.toString)
val newRight = CustomShuffleReaderExec(
right.shuffleStage, rightSidePartitions, rightSkewDesc.toString)
smj.copy(
left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true)
} else {
smj
}
}
Taken as a whole, the logic isn't all that complicated. The adaptive skew-join optimization does not use the familiar trick of salting keys with random prefixes, probably because the complexity introduced by the required second join would be hard to predict. Instead it splits partitions and performs a partition-level Cartesian product, which can increase the amount of data transferred, but overall the cost stays fairly controlled.