[SPARK][CORE] 面試問題之 3.2新的特性Push-based Shuffle源碼解析

歡迎關(guān)注公眾號(hào)“Tim在路上”


Spark 3.2為spark shuffle帶來了重大的改變窘行,其中新增了push-based shuffle機(jī)制。但其實(shí)在push-based shuffle 之前佑惠,業(yè)界也有人提出了remote shuffle service的實(shí)踐,不過由于它們是依賴于外部組件實(shí)現(xiàn)的所以一直不被社區(qū)所接收。

在上一講我們先來了解push-based shuffle機(jī)制的實(shí)現(xiàn)原理循狰,這里我們來通過源碼分析下其實(shí)現(xiàn)的過程咧擂。

首先逞盆,Push-based shuffle機(jī)制是不依賴于外部組件的方案,但使用升級(jí)版的ESS進(jìn)行shuffle data的合并松申,所以PBS(Push-based shuffle)只支持Yarn方式的實(shí)現(xiàn)云芦。

其次,引入PBS新特性的主要原因是為了解決大shuffle的場(chǎng)景存在的問題:

  • 第一個(gè)挑戰(zhàn)是可靠性問題贸桶。由于計(jì)算節(jié)點(diǎn)數(shù)據(jù)量大和 shuffle 工作負(fù)載的規(guī)模舅逸,可能會(huì)導(dǎo)致 shuffle fetch 失敗,從而導(dǎo)致昂貴的 stage 重試皇筛。
  • 第二個(gè)挑戰(zhàn)是效率問題琉历。由于 reducer 的 shuffle fetch 請(qǐng)求是隨機(jī)到達(dá)的,因此 shuffle 服務(wù)也會(huì)隨機(jī)訪問 shuffle 文件中的數(shù)據(jù)水醋。如果單個(gè) shuffle 塊大小較小旗笔,則 shuffle 服務(wù)產(chǎn)生的小隨機(jī)讀取會(huì)嚴(yán)重影響磁盤吞吐量,從而延長 shuffle fetch 等待時(shí)間拄踪。
  • 第三個(gè)挑戰(zhàn)是擴(kuò)展問題蝇恶。由于 external shuffle service 是我們基礎(chǔ)架構(gòu)中的共享服務(wù),因此一些對(duì) shuffle services 錯(cuò)誤調(diào)優(yōu)的作業(yè)也會(huì)影響其他作業(yè)惶桐。當(dāng)一個(gè)作業(yè)錯(cuò)誤地配置導(dǎo)致產(chǎn)生許多小的 shuffle blocks 將會(huì)給 shuffle 服務(wù)帶來壓力時(shí)撮弧,它不僅會(huì)給自身帶來性能下降潘懊,還會(huì)使共享相同 shuffle 服務(wù)的所有相鄰作業(yè)的性能下降。這可能會(huì)導(dǎo)致原本正常運(yùn)行的作業(yè)出現(xiàn)不可預(yù)測(cè)的運(yùn)行時(shí)延遲贿衍,尤其是在集群高峰時(shí)段授舟。

此外,PBS不僅適用于大shuffle的場(chǎng)景舌厨,對(duì)于大量小shuffle文件岂却,這種嚴(yán)重影響磁盤IO性能的情況下, 也有很好的性能提升。push-based shuffle并不是來替換sort-based shuffle, 它是通過補(bǔ)充的方式來優(yōu)化shuffle裙椭。

接下來我們將從以下shuffle service 準(zhǔn)備躏哩、Map端push shuffle數(shù)據(jù)、shuffle service merge數(shù)據(jù)揉燃、更新MergeStatues和reducer拉取merge shuffle 數(shù)據(jù)五部分進(jìn)行分析代碼的實(shí)現(xiàn)扫尺。

shuffle service 準(zhǔn)備

push-based shuffle依賴于driver節(jié)點(diǎn)的行為,并將其作為中心的協(xié)調(diào)節(jié)點(diǎn)炊汤,為其協(xié)調(diào)資源正驻、記錄mergeLocs信息和記錄mergeStatues等。

push-based shuffle雖然有很多的性能的提升抢腐,但是社區(qū)在其使用上還是比較保守姑曙,默認(rèn)pbs是關(guān)閉的。如果要開啟它還需要滿足比較嚴(yán)格的條件迈倍,下面我們首先了解下開啟PBS需要滿足什么伤靠。

我們從DAGScheduler類中pushBasedShuffleEnabled可以看出,開啟pbs需要滿足以下條件限制:

// 標(biāo)志開啟push-based shuffle, push based shuffle 只能在以下的情況下開啟
private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf, isDriver = true)

def isPushBasedShuffleEnabled(conf: SparkConf,
      isDriver: Boolean,
      checkSerializer: Boolean = true): Boolean = {
    // [1] spark.shuffle.push.enabled 設(shè)置為true
    val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
    if (pushBasedShuffleEnabled) {
      val canDoPushBasedShuffle = {
        val isTesting = conf.get(IS_TESTING).getOrElse(false)
        // [2] spark.shuffle.service.enabled 必須設(shè)置為true, shuffle merge 就是在ess上進(jìn)行合并的
        // [3] 目前resource manager資源管理的方式啼染,只支持yarn模式
        val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
            conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
        // [4] 序列化程序支持對(duì)象重定位relocation
        lazy val serializerIsSupported = {
          if (checkSerializer) {
            Option(SparkEnv.get)
              .map(_.serializer)
              .filter(_ != null)
              .getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
              .supportsRelocationOfSerializedObjects
          } else {
            // if no need to check Serializer, always set serializerIsSupported as true
            true
          }
        }
        // [5] spark.io.encryption.enabled 需要關(guān)閉
        // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
        val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
        (isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
      }
      if (!canDoPushBasedShuffle) {
        logWarning("Push-based shuffle can only be enabled when the application is submitted " +
          "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, " +
          "and relocation of serialized objects supported.")
      }

      canDoPushBasedShuffle
    } else {
      false
    }
  }

從上述代碼可以看出宴合,開啟push-based shuffle 需要滿足以下條件:

  • [1] spark.shuffle.push.enabled 設(shè)置為true
  • [2] spark.shuffle.service.enabled 必須設(shè)置為true, shuffle merge 就是在ess上進(jìn)行合并的
  • [3] 目前resource manager資源管理的方式,只支持yarn模式
  • [4] 序列化程序支持對(duì)象重定位relocation
  • [5] spark.io.encryption.enabled 需要關(guān)閉

如果以上條件滿足并開啟了PBS迹鹅,那么在Driver節(jié)點(diǎn)會(huì)發(fā)生哪些行為呢卦洽?這些行為的作用是什么?

我們回到DAGScheduler中斜棚,在DAGScheduler 進(jìn)行submitTasks時(shí)會(huì)為pbs做以下準(zhǔn)備:

// DAGScheduler
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  stage match {
    // 在提交shuffleMapTask節(jié)點(diǎn)會(huì)
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions- 1)
      // Only generate merger location for a given shuffle dependency once.
      // [1] 如果shuffle merge 開啟阀蒂,同時(shí)shuffle merge沒有完成,準(zhǔn)備shuffleservice為ShuffleMapStage
      if (s.shuffleDep.shuffleMergeEnabled) {
        if (!s.shuffleDep.shuffleMergeFinalized) {
          prepareShuffleServicesForShuffleMapStage(s)
        } else {
          s.shuffleDep.setShuffleMergeEnabled(false)
          logInfo("Push-based shuffle disabled for $stage (${stage.name}) since it" +
            " is already shuffle merge finalized")
        }
      }
// [2] 在prepareShuffleServicesForShuffleMapStage弟蚀,通過schedulerBackend獲取ShufflePushMergerLocations
// prepareShuffleServicesForShuffleMapStage
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
        stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)

// [3] 向主節(jié)點(diǎn)塊管理器(BlockManagerMasterEndpoint 類)發(fā)送GetShufflePushMergerLocations消息
def getShufflePushMergerLocations(
      numMergersNeeded: Int,
      hostsToFilter: Set[String]): Seq[BlockManagerId] = {
    driverEndpoint.askSync[Seq[BlockManagerId]](
      GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
  }

從上面可以看出脂新,當(dāng)DAGScheduler 進(jìn)行submitTasks時(shí),如果stage是ShuffleMapStage粗梭,同時(shí)shuffle merge沒有完成,那么shuffleMerge會(huì)向resource manager資源管理器后端詢問可用于shuffle merge services的Executor列表级零。

在返回用于托管 shuffle merge 服務(wù)的可用節(jié)點(diǎn)后断医,DAGScheduler 將它們記錄在ShuffleDependency的mergerLocs 屬性中滞乙。下面是其詳細(xì)的過程:

  • [1] 如果shuffle merge 開啟,同時(shí)shuffle merge沒有完成鉴嗤,準(zhǔn)備shuffleService為ShuffleMapStage斩启。
  • [2] 在prepareShuffleServicesForShuffleMapStage,通過schedulerBackend獲取ShufflePushMergerLocations醉锅,返回mergerLocs兔簇。
  • [3] 向主節(jié)點(diǎn)塊管理器(BlockManagerMasterEndpoint 類)發(fā)送GetShufflePushMergerLocations消息

現(xiàn)在我們進(jìn)入BlockManagerMasterEndpoint類的getShufflePushMergerLocations方法中,進(jìn)一步看看shuffleMerge是如何獲取足夠的可用于合并的Executor列表的硬耍。

// BlockManagerMasterEndpoint
// 獲取足夠的executor進(jìn)行合并
private def getShufflePushMergerLocations(
    numMergersNeeded: Int,
    hostsToFilter: Set[String]): Seq[BlockManagerId] = {
  // [1] 通過blockManagerIdByExecutor過濾非driver的Executor如果滿足numMergersNeeded則直接返回
  val blockManagerHosts = blockManagerIdByExecutor
.filterNot(_._2.isDriver).values.map(_.host).toSet
  val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
  val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _,externalShuffleServicePort))
  // Enough mergers are available as part of active executors list
  if (filteredMergersWithExecutors.size >= numMergersNeeded) {
    filteredMergersWithExecutors.toSeq
  } else {
    // [2] 否則需要激活過去使用的Executor(最多 500 個(gè))用于進(jìn)行合并
    // Delta mergers added from inactive mergers list to the active mergers list
    val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
    val filteredMergersWithoutExecutors =shuffleMergerLocations.values
      .filterNot(x => hostsToFilter.contains(x.host))
      .filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
    val randomFilteredMergersLocations =
      if (filteredMergersWithoutExecutors.size >
        numMergersNeeded - filteredMergersWithExecutors.size) {
        Utils.randomize(filteredMergersWithoutExecutors)
          .take(numMergersNeeded - filteredMergersWithExecutors.size)
      } else {
        filteredMergersWithoutExecutors
      }
    filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
  }
}

[1] 通過blockManagerIdByExecutor過濾非driver的Executor如果滿足numMergersNeeded則直接返回

[2] 否則需要激活過去使用的Executor(最多 500 個(gè))用于進(jìn)行合并垄琐。

從中可以看出,如果executor數(shù)不滿足numMergersNeeded经柴,會(huì)從過去使用executor中選擇進(jìn)行激活狸窘,直到獲取到足夠的可用于合并的Executor列表。

這時(shí)shuffle merge service已經(jīng)準(zhǔn)備好了坯认,同時(shí)其被記錄在shuffleDependency的mergerLocs 屬性中翻擒。

總而言之,這個(gè)階段在Driver的DAGScheduler中主要做了兩件事:

  1. 獲取足夠的可用于shuffle merge services的Executor列表牛哺。
  2. 將它們記錄在ShuffleDependency的mergerLocs屬性中陋气。

那么shuffle data 是如何被push到shuffle service中的呢?

Map端push shuffle數(shù)據(jù)

乍一看引润,shuffle Writer中的代碼并沒有變化巩趁,沒有增加一種新的shuffle Writer。但PBS的實(shí)現(xiàn)主要是shuffle data生成后推送出去進(jìn)行合并椰拒。

還記的在介紹getWriter時(shí)(參考bypass的文章)晶渠,在ShuffleWriteProcessor.write 中,在 ShuffleWriter.write 成功后, 曾有段shuffleMerge處理的代碼燃观。

下面我們來詳細(xì)介紹下push-based shuffle 是怎樣處理getWriter返回的結(jié)果數(shù)據(jù)的褒脯。

def write(
    ...
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
      dep.shuffleHandle,
      mapId,
      context,
      createMetricsReporter(context))
    writer.write(
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    val mapStatus = writer.stop(success = true)
    if (mapStatus.isDefined) {
      // 創(chuàng)建了一個(gè)ShuffleBlockPusher實(shí)例并調(diào)用了它的initialBlockPush方法。
      // 在該方法內(nèi)部缆毁,推送器通過獲取連續(xù)的 shuffle 數(shù)據(jù)塊來創(chuàng)建 shuffle 合并請(qǐng)求
      if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
        manager.shuffleBlockResolver match {
          case resolver: IndexShuffleBlockResolver =>
            val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
            new ShuffleBlockPusher(SparkEnv.get.conf)
              .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
          case _ =>
        }
      }
    }
    mapStatus.get
  } catch {
  ...
  }
}

從上面的代碼可見番川,在執(zhí)行完map端的writer后,會(huì)判斷shuffleMergeEnabled是否開啟, 要求dependency中MergerLocs不為空脊框,其次就是shuffleMerge還未執(zhí)行完成颁督。如果滿足這些條件,則會(huì)創(chuàng)建ShuffleBlockPusher類浇雹,并調(diào)用其initiateBlockPush方法沉御。

看來具體的push實(shí)現(xiàn)位于initiateBlockPush方法中。

private[shuffle] def initiateBlockPush(
    dataFile: File,
    partitionLengths: Array[Long],
    dep: ShuffleDependency[_, _, _],
    mapIndex: Int): Unit = {
  val numPartitions = dep.partitioner.numPartitions
  val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
  // [1] 將map task的shuffle數(shù)據(jù)轉(zhuǎn)換為PushRequest請(qǐng)求
  val requests = prepareBlockPushRequests(numPartitions, mapIndex, dep.shuffleId,
    dep.shuffleMergeId, dataFile, partitionLengths, dep.getMergerLocs, transportConf)
  // [2] 將PushRequest請(qǐng)求列表變?yōu)殡S機(jī)請(qǐng)求昭灵,這樣不同的mapper同時(shí)推送塊不會(huì)推送相同范圍的 shuffle 分區(qū)
  // Randomize the orders of the PushRequest, so different mappers pushing blocks at the same
  // time won't be pushing the same ranges of shuffle partitions.
pushRequests++= Utils.randomize(requests)

  // [3] 盡力而為的push shuffle數(shù)據(jù)到ess
  submitTask(() => {
    tryPushUpToMax()
  })
}

從上面的代碼可以看出吠裆,在initiateBlockPush主要做了以下三步工作:

  • [1] 將map task的shuffle數(shù)據(jù)轉(zhuǎn)換為PushRequest請(qǐng)求
  • [2] 將PushRequest請(qǐng)求列表變?yōu)殡S機(jī)請(qǐng)求伐谈,這樣不同的mapper同時(shí)推送塊不會(huì)推送相同范圍的 shuffle 分區(qū)
  • [3] 盡力而為的push shuffle數(shù)據(jù)到shuffle merge service

顧名思義,prepareBlockPushRequests方法的作用是將map端生成的shuffle data封裝轉(zhuǎn)換為PushRequest請(qǐng)求试疙。不過除此以外還會(huì)將shuffle中連續(xù)的塊分到同一個(gè)請(qǐng)求中诵棵,可以允許更有效的數(shù)據(jù)讀取。如下圖所示:

// prepareBlockPushRequests 方法
for (reduceId <-0 until numPartitions) {
// 分區(qū)塊進(jìn)行合并并非直接按分區(qū)發(fā)送祝旷,而是通過以下公式
  val mergeId = math.min(math.floor(reduceId *1.0 / numPartitions * numMergers),
          numMergers -1 ).asInstanceOf[Int ]

在prepareBlockPushRequests履澳,分區(qū)塊進(jìn)行合并按照chunk進(jìn)行發(fā)送,通過上面的公式進(jìn)行劃分合并塊的怀跛,同時(shí)會(huì)跳過空的分區(qū)塊和超過maxBlockSizeToPush距贷,從而避免數(shù)據(jù)傾斜。這里已經(jīng)在上一講講過了敌完,就不再過多贅述了储耐,具體可以看上一講push-based shuffle初探。

在封裝好PushRequest請(qǐng)求后滨溉,最后通過調(diào)用tryPushUpToMax方法將數(shù)據(jù)塊推送出去什湘。

tryPushUpToMax方法調(diào)用的是我們?cè)趕huffle read中使用的pushUpToMax方法。這個(gè)方法在shuffle reader文章中也已經(jīng)介紹過了晦攒,這里只簡單總結(jié)下闽撤。這里的工作就是將shuffle data push到對(duì)應(yīng)的shuffle merge service。在發(fā)送時(shí)將數(shù)據(jù)封裝為PushBlockStream脯颜,push 的時(shí)候使用的是 streamUpload 的方式哟旗,通過 OneForOneBlockPusher ,利用 RetryingBlockFetcher 進(jìn)行發(fā)送栋操。

在Map端push data的階段闸餐,主要做了三件事:

  1. 將相同分區(qū)shuffle data block合并拆分到chunk中,并將其封裝為PushRequest矾芙;
  2. 隨機(jī)打亂PushRequest舍沙,避免順序的構(gòu)造push chunk,導(dǎo)致熱點(diǎn)和嚴(yán)重的爭(zhēng)用沖突剔宪;
  3. 通過pushUpToMax方法拂铡,將數(shù)據(jù)封裝為PushBlockStream,通過 OneForOneBlockPusher葱绒,利用 RetryingBlockFetcher 進(jìn)行發(fā)送感帅。

Shuffle Service merge數(shù)據(jù)

shuffle service 上使用 TransportRequestHandler.processStreamUpload處理上傳的shuffle數(shù)據(jù)塊流。一個(gè) block 的數(shù)據(jù)會(huì)被拆成若干個(gè) bytebuffer 進(jìn)行處理地淀,最后 onComplete 的時(shí)候進(jìn)行合并失球。

if (msgObj instanceof PushBlockStream) {
  PushBlockStream message = (PushBlockStream) msgObj;
  checkAuth(client, message.appId);
  return mergeManager.receiveBlockDataAsStream(message);
....
}

如上代碼所示:在合并過程中,會(huì)調(diào)用ExternalBlockHandler.receiveStream方法中操作請(qǐng)求帮毁。它將推送消息傳遞給RemoteBlockPushResolver的receiveBlockDataAsStream方法她倘。

那么shuffle data具體是如何被合并的璧微,這里涉及到一個(gè)重要的數(shù)據(jù)結(jié)構(gòu)AppShufflePartitionInfo。

在ShuffleService上會(huì)保存appId到AppShuffleInfo的map映射硬梁,每個(gè)AppShuffleInfo內(nèi)會(huì)保存shuffleId到AppShuffleMergePartitionsInfo 的map映射在appShuffleMergePartitionsInfo內(nèi)會(huì)保存reduceId到AppShufflePartitionInfo 的映射胞得,最終在AppShufflePartitionInfo 內(nèi)部會(huì)保存三個(gè)File;


public static class AppShufflePartitionInfo {

  private final String appId;
  private final int shuffleId;
  private final int shuffleMergeId;
  private final int reduceId;
  private final File dataFile;
  // The merged shuffle data file channel
  public final FileChannel dataChannel;
  // The index file for a particular merged shuffle contains the chunk offsets.
  private final MergeShuffleFile indexFile;
  // The meta file for a particular merged shuffle contains all the map indices that belong to
  // every chunk. The entry per chunk is a serialized bitmap.
  private final MergeShuffleFile metaFile;
...
}

如上代碼所示荧止,AppShufflePartitionInfo 中包含 3 個(gè) FileChannel,分別用于 data/index/meta 信息的保存阶剑。


當(dāng)shuffle service接收到 block 塊時(shí)跃巡,在嘗試添加到對(duì)應(yīng)的 shuffle 合并文件之前,它首先要檢索相應(yīng)的 Shuffle 分區(qū)元數(shù)據(jù)牧愁。保存的元數(shù)據(jù)可以幫助shuffle service正確處理一些潛在的異常場(chǎng)景素邪。

在onData進(jìn)行數(shù)據(jù)處理時(shí),對(duì)于 streamUpload 過來的 ByteBuffer猪半,只會(huì)對(duì) AppShufflePartitionInfo 進(jìn)行加鎖兔朦,如果當(dāng)前 ByteBuffer 的數(shù)據(jù)不屬于currentMergingMapId 的,則加入到一個(gè)列表中磨确。在寫當(dāng)前正在處理的 ByteBuffer 前沽甥,會(huì)將前面列表中的數(shù)據(jù)都寫入到數(shù)據(jù)文件中。

最終的合并時(shí)在onComplete進(jìn)行的乏奥,下面我們?cè)敿?xì)看下合并要滿足的條件:

@Override
public void onComplete(String streamId) throws IOException {
  synchronized (partitionInfo) {
logger.trace("{} onComplete invoked", partitionInfo);

    AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
    // [1] 如果shuffle map任務(wù)終止(或者說reducers已經(jīng)啟動(dòng))摆舟,表明太遲了,則不會(huì)發(fā)生合并
    if (isTooLate(info, partitionInfo.reduceId)) {
     ...
    }
    // [2] stage的狀態(tài)不確定邓了,(stage在重試中), 也不會(huì)進(jìn)行合并
    if (isStale(info, partitionInfo.shuffleMergeId)) {
      ...
    }

    // [3] 校驗(yàn)給定 reducer 的一個(gè)映射流只能與現(xiàn)有文件合并
    // Check if we can commit this block
    if (allowedToWrite()) {
      // [4] 如果是推測(cè)任務(wù)執(zhí)行中發(fā)送重復(fù)的reducer數(shù)據(jù)恨诱,則直接返回 
      if (isDuplicateBlock()) {
        deferredBufs = null;
        return;
      }
      if (partitionInfo.getCurrentMapIndex() < 0) {
        ...
      }
      // [5] 執(zhí)行buffer合并
      long updatedPos = partitionInfo.getDataFilePos() + length;
      boolean indexUpdated = false;
      if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) {
        try {
          partitionInfo.updateChunkInfo(updatedPos, mapIndex);
          indexUpdated = true;
        } catch (IOException ioe) {
          incrementIOExceptionsAndAbortIfNecessary();
          // If the above doesn't throw a RuntimeException, then we do not propagate the
          // IOException to the client. This may increase the chunk size however the increase is
          // still limited because of the limit on the number of IOExceptions for a
          // particular shuffle partition.
        }
      }
      partitionInfo.setDataFilePos(updatedPos);
      partitionInfo.setCurrentMapIndex(-1);

      // update merged results
      partitionInfo.blockMerged(mapIndex);
      if (indexUpdated) {
        partitionInfo.resetChunkTracker();
      }
    } else {
      deferredBufs = null;
      throw new BlockPushNonFatalFailure(
        new BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
          .toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
            streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
    }
  }
  isWriting = false;
}

可見在合并前也需要滿足PushBlockStreamCallback定義的條件:

  • [1] 如果shuffle map任務(wù)終止(或者說reducers已經(jīng)啟動(dòng)),表明太遲了骗炉,則不會(huì)發(fā)生合并
  • [2] stage的狀態(tài)不確定照宝,(stage在重試中), 也不會(huì)進(jìn)行合并
  • [3] 校驗(yàn)給定 reducer 的一個(gè)映射流只能與現(xiàn)有文件合并
  • [4] 如果是推測(cè)任務(wù)執(zhí)行中發(fā)送重復(fù)的reducer數(shù)據(jù),則直接返回

最后再進(jìn)行合并時(shí)痕鳍,會(huì)將 shuffle 字節(jié)添加到數(shù)據(jù)文件后硫豆,合并器首先將合并后的偏移量寫入索引文件,然后才將映射器信息添加到元文件中笼呆。

這里的邏輯有點(diǎn)復(fù)雜熊响,為了避免錯(cuò)誤,只總結(jié)下要點(diǎn):

  1. shuffle service 上使用 TransportRequestHandler.processStreamUpload處理上傳的shuffle數(shù)據(jù)塊流诗赌。一個(gè) block 的數(shù)據(jù)會(huì)被拆成若干個(gè) bytebuffer 進(jìn)行處理汗茄。
  2. 在onData進(jìn)行數(shù)據(jù)處理時(shí),對(duì)于 streamUpload 過來的 ByteBuffer铭若,只會(huì)對(duì) AppShufflePartitionInfo 進(jìn)行加鎖洪碳,如果當(dāng)前 ByteBuffer 的數(shù)據(jù)不屬于currentMergingMapId 的递览,則加入到一個(gè)列表中。在寫當(dāng)前正在處理的 ByteBuffer 前瞳腌,會(huì)將前面列表中的數(shù)據(jù)都寫入到數(shù)據(jù)文件中绞铃。
  3. 在onComplete進(jìn)行合并時(shí),會(huì)先判斷是否滿足合并的條件嫂侍。合并時(shí)儿捧,會(huì)將 shuffle 字節(jié)append到數(shù)據(jù)data文件后,合并器首先將合并后的偏移量寫入索引index文件挑宠,然后才將映射器信息添加到元meta文件中菲盾。

獲取更新MergeStatues

當(dāng)每個(gè) ShuffleMapTask 結(jié)束的時(shí)候,DAGScheduler都會(huì)去判斷 ShuffleMapStage 是否 pending partitions 為空各淀,如果為空說明 stage 結(jié)束了懒鉴,此時(shí)開始向 shuffle service 上發(fā)送 finalize 信息,并將信息返回給 driver 并添加到 merge statuses 信息中碎浇。

private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
     ...
        if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
              if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
                shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
                scheduleShuffleMergeFinalize(shuffleStage)
              } else {
                processShuffleMapStageCompletion(shuffleStage)
              }
            }
     ...

上面是DAGScheduler.handleTaskCompletion中的代碼临谱,可以從中看出在TaskCompletion時(shí),當(dāng)DAGScheduler收到有關(guān)執(zhí)行的最后一個(gè)map task的通知時(shí), 它會(huì)向所有 shuffle 服務(wù)發(fā)送FinalizeShuffleMerge消息南捂。服務(wù)攔截消息并從MergerShuffleFileManager完成合并過程吴裤。任何正在進(jìn)行的合并都會(huì)被中斷并取消,以避免合并文件中有部分?jǐn)?shù)據(jù)溺健。

同時(shí)DAGScheduler 等待spark.shuffle.push.result.timeout來獲取響應(yīng)麦牺。如果 shuffle 服務(wù)在此延遲內(nèi)響應(yīng),DAGScheduler會(huì)攔截包含以下屬性的響應(yīng):

public class MergeStatuses extends BlockTransferMessage {
  /** Shuffle ID **/
  public final int shuffleId;
  /**
   * shuffleMergeId is used to uniquely identify merging process of shuffle by
   * an indeterminate stage attempt.
   */
  public final int shuffleMergeId;
  /**
   * Array of bitmaps tracking the set of mapper partition blocks merged for each
   * reducer partition
   */
  public final RoaringBitmap[] bitmaps;
  /** Array of reducer IDs **/
  public final int[] reduceIds;
  /**
   * Array of merged shuffle partition block size. Each represents the total size of all
   * merged shuffle partition blocks for one reducer partition.
   * **/
  public final long[] sizes;

獲取MergeStatues的過程和MapStatus的過程類似鞭缭,其是通過getPushBasedShuffleMapSizesByExecutorId進(jìn)行獲取的剖膳,具體可以參考shuffle reader 的文章。

在更新Merge Status階段岭辣,主要做了下面的工作:

  1. 向shuffle service上發(fā)送 FinalizeShuffleMerge 信息吱晒。注意:任何正在進(jìn)行的合并都會(huì)被中斷并取消,以避免合并文件中有部分?jǐn)?shù)據(jù)沦童。
  2. 等待spark.shuffle.push.result.timeout來獲取響應(yīng)仑濒,攔截獲取MergeStatuses。

reducer拉取merge shuffle數(shù)據(jù)

reduce task 開始之后偷遗,從 driver 上獲取 merge statuses 信息墩瞳,并在數(shù)據(jù)劃分的時(shí)候,如果是 merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息氏豌,獲取到 meta 信息后喉酌,利用 shuffle service 上的 index 文件信息,讀取文件中 block 數(shù)據(jù)。

通過get reader獲取shuffle數(shù)據(jù)泪电,這塊已經(jīng)在shuffle reader中講過了般妙,這里就不大段的貼代碼了,只講解涉及到pbs的地方相速,具體的可以再復(fù)習(xí)shuffle reader的文章碟渺。

// 在劃分?jǐn)?shù)據(jù)源的請(qǐng)求:本地、主機(jī)本地和遠(yuǎn)程塊突诬, 同時(shí)劃分出pbs的blocks
val remoteRequests = partitionBlocksByFetchMode(
  blocksByAddress, localBlocks, hostLocalBlocksByExecutor, pushMergedLocalBlocks)

我們具體的收集pbs的遠(yuǎn)程blocks地址的實(shí)現(xiàn):

blockId match {
        // 獲取數(shù)據(jù)請(qǐng)求
        case ShuffleBlockChunkId(_, _, _, _) =>
          if (curRequestSize >= targetRemoteRequestSize ||
            curBlocks.size >= maxBlocksInFlightPerAddress) {
            curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
              collectedRemoteRequests, enableBatchFetch = false)
            curRequestSize = curBlocks.map(_.size).sum
          }
       // 從forMergedMetas可以看出這里會(huì)為獲取元數(shù)據(jù)構(gòu)建單獨(dú)的請(qǐng)求
        case ShuffleMergedBlockId(_, _, _) =>
          if (curBlocks.size >= maxBlocksInFlightPerAddress) {
            curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
              collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
          }
        case _ =>
          // For batch fetch, the actual block in flight should count for merged block.
          val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
          if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
            curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
              collectedRemoteRequests, doBatchFetch)
            curRequestSize = curBlocks.map(_.size).sum
          }
      }

從這里可以看出如果是 merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息止状。

接下來,我們?cè)賮砜聪掳l(fā)送請(qǐng)求的方法:

def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
  if (request.forMergedMetas) {
    pushBasedFetchHelper.sendFetchMergedStatusRequest(request)
  } else {
    sendRequest(request)
  }
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
}

iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,
            reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address))

在發(fā)送請(qǐng)求時(shí)攒霹,會(huì)區(qū)分請(qǐng)求元數(shù)據(jù)的請(qǐng)求和數(shù)據(jù)的獲取請(qǐng)求。元數(shù)據(jù)請(qǐng)求返回PushMergedRemoteMetaFetchResult 浆洗。

最后在ShuffleBlockFetcherIterator.next讀取數(shù)據(jù)時(shí)催束,將獲取元數(shù)據(jù)進(jìn)行模式匹配:

 case PushMergedRemoteMetaFetchResult(
          shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps, address) =>
// ...
          val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
            shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps)
          val additionalRemoteReqs = new ArrayBuffer[FetchRequest]
          collectFetchRequests(address, blocksToFetch.toSeq, additionalRemoteReqs)
          fetchRequests ++= additionalRemoteReqs
          // Set result to null to force another iteration.
          result = null

從這里可以看出, 在獲取取到 PushMergedRemoteMetaFetchResult 信息后,利用 shuffle service 上的 index 文件信息伏社,再次發(fā)起讀取文件中 block 數(shù)據(jù)的請(qǐng)求抠刺。這次的請(qǐng)求blockId是ShuffleBlockChunkId類型, 從上面的代碼可以看出這意味著請(qǐng)求類型會(huì)將forMergedMetas標(biāo)志設(shè)置為 false摘昌。

獲取merge shuffle數(shù)據(jù)速妖,主要有以下步驟:

  1. 從driver上獲取 merge statuses 信息;
  2. merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息聪黎;
  3. 獲取到 meta 信息后罕容,利用shuffle service 上的 index 文件信息,讀取文件中 block 數(shù)據(jù)稿饰。

在最后锦秒,我們?cè)賲R總下push-based shuffle的詳細(xì)過程:

  1. 在Driver端,當(dāng)dagScheduler提交ShuffleMapStage類型的任務(wù)時(shí)喉镰,會(huì)向資源管理器后端詢問可用于shuffle merge services的Executor列表旅择。在資源管理器返回用于托管 shuffle merge 服務(wù)的可用節(jié)點(diǎn)后,dagScheduler 將它們記錄在shuffleDependency的mergerLocs屬性中侣姆。
  2. 在 map端生真,當(dāng)ShuffleWriter.write 成功后,會(huì)調(diào)用 ShuffleWriter.initiateBlockPush 捺宗,將已經(jīng)落盤的 ShuffleBlock push 到遠(yuǎn)端的 shuffle service 上柱蟀。將數(shù)據(jù)封裝為PushBlockStream,push 的時(shí)候使用的是 streamUpload 的方式偿凭,通過 OneForOneBlockPusher产弹,利用 RetryingBlockFetcher 進(jìn)行發(fā)送。
  3. 在shuffle service中,使用TransportRequestHandler.processStreamUpload處理上傳的shuffle數(shù)據(jù)塊流痰哨。一個(gè) block 的數(shù)據(jù)會(huì)被拆成若干個(gè) bytebuffer 進(jìn)行處理胶果。在shuffle service中,每個(gè)reduceid會(huì)維護(hù)一個(gè)AppShufflePartitionInfo 斤斧,在其中包含 3 個(gè) FileChannel早抠,分別用于 data/index/meta 信息的保存。在onData進(jìn)行數(shù)據(jù)處理時(shí)撬讽,對(duì)于 streamUpload 過來的 ByteBuffer蕊连,只會(huì)對(duì) AppShufflePartitionInfo 進(jìn)行加鎖,如果當(dāng)前 ByteBuffer 的數(shù)據(jù)不屬于currentMergingMapId 的游昼,則加入到一個(gè)列表中甘苍。最后 onComplete 的時(shí)候進(jìn)行合并合并時(shí),會(huì)將 shuffle 字節(jié)添加到數(shù)據(jù)文件后,合并器首先將合并后的偏移量寫入索引文件,然后才將映射器信息添加到元文件中榨惠。
  4. 當(dāng)每個(gè) ShuffleMapTask 結(jié)束的時(shí)候隆判,DAGScheduler都會(huì)去判斷 ShuffleMapStage 是否 pending partitions 為空,如果為空說明 stage 結(jié)束了,此時(shí)開始向 shuffle service 上發(fā)送 finalize 信息,并將信息返回給 driver 并添加到 merge statuses 信息中。同時(shí)DAGScheduler 等待spark.shuffle.push.result.timeout來獲取響應(yīng)顽铸。
  5. reduce task 開始之后,從 driver 上獲取 merge statuses 信息料皇,并在數(shù)據(jù)劃分的時(shí)候谓松,如果是 merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息,獲取到 meta 信息后瓶蝴,利用 shuffle service 上的 index 文件信息毒返,讀取文件中 block 數(shù)據(jù)。

再我們了解完P(guān)ush-based shuffle代碼后舷手,我們來回答下以下幾個(gè)問題:

  1. push-based shuffle是在shuffle write結(jié)束后追加了push與合并操作拧簸,那么是否只有在發(fā)生FetchFail的情況下(導(dǎo)致stage重試)push-base shuffle的性能更好?
  2. push-based shuffle 能否進(jìn)行精簡下男窟?例如取消掉driver端的行為盆赤。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市歉眷,隨后出現(xiàn)的幾起案子牺六,更是在濱河造成了極大的恐慌,老刑警劉巖汗捡,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件淑际,死亡現(xiàn)場(chǎng)離奇詭異畏纲,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)春缕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門盗胀,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人锄贼,你說我怎么就攤上這事票灰。” “怎么了宅荤?”我有些...
    開封第一講書人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵屑迂,是天一觀的道長。 經(jīng)常有香客問我冯键,道長惹盼,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任惫确,我火速辦了婚禮逻锐,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘雕薪。我一直安慰自己,他們只是感情好晓淀,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開白布所袁。 她就那樣靜靜地躺著,像睡著了一般凶掰。 火紅的嫁衣襯著肌膚如雪燥爷。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,079評(píng)論 1 285
  • 那天懦窘,我揣著相機(jī)與錄音前翎,去河邊找鬼。 笑死畅涂,一個(gè)胖子當(dāng)著我的面吹牛港华,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播午衰,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼立宜,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了臊岸?” 一聲冷哼從身側(cè)響起橙数,我...
    開封第一講書人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎帅戒,沒想到半個(gè)月后灯帮,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年钟哥,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了迎献。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡瞪醋,死狀恐怖忿晕,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情银受,我是刑警寧澤践盼,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站宾巍,受9級(jí)特大地震影響咕幻,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜顶霞,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一肄程、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧选浑,春花似錦蓝厌、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至隧膘,卻和暖如春代态,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背疹吃。 一陣腳步聲響...
    開封第一講書人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來泰國打工蹦疑, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人萨驶。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓歉摧,卻偏偏與公主長得像,于是被迫代替她去往敵國和親腔呜。 傳聞我的和親對(duì)象是個(gè)殘疾皇子判莉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容