歡迎關(guān)注公眾號(hào)“Tim在路上”
Spark 3.2為spark shuffle帶來了重大的改變窘行,其中新增了push-based shuffle機(jī)制。但其實(shí)在push-based shuffle 之前佑惠,業(yè)界也有人提出了remote shuffle service的實(shí)踐,不過由于它們是依賴于外部組件實(shí)現(xiàn)的所以一直不被社區(qū)所接收。
在上一講我們先來了解push-based shuffle機(jī)制的實(shí)現(xiàn)原理循狰,這里我們來通過源碼分析下其實(shí)現(xiàn)的過程咧擂。
首先逞盆,Push-based shuffle機(jī)制是不依賴于外部組件的方案,但使用升級(jí)版的ESS進(jìn)行shuffle data的合并松申,所以PBS(Push-based shuffle)只支持Yarn方式的實(shí)現(xiàn)云芦。
其次,引入PBS新特性的主要原因是為了解決大shuffle的場(chǎng)景存在的問題:
- 第一個(gè)挑戰(zhàn)是可靠性問題贸桶。由于計(jì)算節(jié)點(diǎn)數(shù)據(jù)量大和 shuffle 工作負(fù)載的規(guī)模舅逸,可能會(huì)導(dǎo)致 shuffle fetch 失敗,從而導(dǎo)致昂貴的 stage 重試皇筛。
- 第二個(gè)挑戰(zhàn)是效率問題琉历。由于 reducer 的 shuffle fetch 請(qǐng)求是隨機(jī)到達(dá)的,因此 shuffle 服務(wù)也會(huì)隨機(jī)訪問 shuffle 文件中的數(shù)據(jù)水醋。如果單個(gè) shuffle 塊大小較小旗笔,則 shuffle 服務(wù)產(chǎn)生的小隨機(jī)讀取會(huì)嚴(yán)重影響磁盤吞吐量,從而延長 shuffle fetch 等待時(shí)間拄踪。
- 第三個(gè)挑戰(zhàn)是擴(kuò)展問題蝇恶。由于 external shuffle service 是我們基礎(chǔ)架構(gòu)中的共享服務(wù),因此一些對(duì) shuffle services 錯(cuò)誤調(diào)優(yōu)的作業(yè)也會(huì)影響其他作業(yè)惶桐。當(dāng)一個(gè)作業(yè)錯(cuò)誤地配置導(dǎo)致產(chǎn)生許多小的 shuffle blocks 將會(huì)給 shuffle 服務(wù)帶來壓力時(shí)撮弧,它不僅會(huì)給自身帶來性能下降潘懊,還會(huì)使共享相同 shuffle 服務(wù)的所有相鄰作業(yè)的性能下降。這可能會(huì)導(dǎo)致原本正常運(yùn)行的作業(yè)出現(xiàn)不可預(yù)測(cè)的運(yùn)行時(shí)延遲贿衍,尤其是在集群高峰時(shí)段授舟。
此外,PBS不僅適用于大shuffle的場(chǎng)景舌厨,對(duì)于大量小shuffle文件岂却,這種嚴(yán)重影響磁盤IO性能的情況下, 也有很好的性能提升。push-based shuffle并不是來替換sort-based shuffle, 它是通過補(bǔ)充的方式來優(yōu)化shuffle裙椭。
接下來我們將從以下shuffle service 準(zhǔn)備躏哩、Map端push shuffle數(shù)據(jù)、shuffle service merge數(shù)據(jù)揉燃、更新MergeStatues和reducer拉取merge shuffle 數(shù)據(jù)五部分進(jìn)行分析代碼的實(shí)現(xiàn)扫尺。
shuffle service 準(zhǔn)備
push-based shuffle依賴于driver節(jié)點(diǎn)的行為,并將其作為中心的協(xié)調(diào)節(jié)點(diǎn)炊汤,為其協(xié)調(diào)資源正驻、記錄mergeLocs信息和記錄mergeStatues等。
push-based shuffle雖然有很多的性能的提升抢腐,但是社區(qū)在其使用上還是比較保守姑曙,默認(rèn)pbs是關(guān)閉的。如果要開啟它還需要滿足比較嚴(yán)格的條件迈倍,下面我們首先了解下開啟PBS需要滿足什么伤靠。
我們從DAGScheduler類中pushBasedShuffleEnabled可以看出,開啟pbs需要滿足以下條件限制:
// 標(biāo)志開啟push-based shuffle, push based shuffle 只能在以下的情況下開啟
private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(sc.getConf, isDriver = true)
def isPushBasedShuffleEnabled(conf: SparkConf,
isDriver: Boolean,
checkSerializer: Boolean = true): Boolean = {
// [1] spark.shuffle.push.enabled 設(shè)置為true
val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
if (pushBasedShuffleEnabled) {
val canDoPushBasedShuffle = {
val isTesting = conf.get(IS_TESTING).getOrElse(false)
// [2] spark.shuffle.service.enabled 必須設(shè)置為true, shuffle merge 就是在ess上進(jìn)行合并的
// [3] 目前resource manager資源管理的方式啼染,只支持yarn模式
val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
// [4] 序列化程序支持對(duì)象重定位relocation
lazy val serializerIsSupported = {
if (checkSerializer) {
Option(SparkEnv.get)
.map(_.serializer)
.filter(_ != null)
.getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
.supportsRelocationOfSerializedObjects
} else {
// if no need to check Serializer, always set serializerIsSupported as true
true
}
}
// [5] spark.io.encryption.enabled 需要關(guān)閉
// TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
(isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
}
if (!canDoPushBasedShuffle) {
logWarning("Push-based shuffle can only be enabled when the application is submitted " +
"to run in YARN mode, with external shuffle service enabled, IO encryption disabled, " +
"and relocation of serialized objects supported.")
}
canDoPushBasedShuffle
} else {
false
}
}
從上述代碼可以看出宴合,開啟push-based shuffle 需要滿足以下條件:
- [1] spark.shuffle.push.enabled 設(shè)置為true
- [2] spark.shuffle.service.enabled 必須設(shè)置為true, shuffle merge 就是在ess上進(jìn)行合并的
- [3] 目前resource manager資源管理的方式,只支持yarn模式
- [4] 序列化程序支持對(duì)象重定位relocation
- [5] spark.io.encryption.enabled 需要關(guān)閉
如果以上條件滿足并開啟了PBS迹鹅,那么在Driver節(jié)點(diǎn)會(huì)發(fā)生哪些行為呢卦洽?這些行為的作用是什么?
我們回到DAGScheduler中斜棚,在DAGScheduler 進(jìn)行submitTasks時(shí)會(huì)為pbs做以下準(zhǔn)備:
// DAGScheduler
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
stage match {
// 在提交shuffleMapTask節(jié)點(diǎn)會(huì)
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions- 1)
// Only generate merger location for a given shuffle dependency once.
// [1] 如果shuffle merge 開啟阀蒂,同時(shí)shuffle merge沒有完成,準(zhǔn)備shuffleservice為ShuffleMapStage
if (s.shuffleDep.shuffleMergeEnabled) {
if (!s.shuffleDep.shuffleMergeFinalized) {
prepareShuffleServicesForShuffleMapStage(s)
} else {
s.shuffleDep.setShuffleMergeEnabled(false)
logInfo("Push-based shuffle disabled for $stage (${stage.name}) since it" +
" is already shuffle merge finalized")
}
}
// [2] 在prepareShuffleServicesForShuffleMapStage弟蚀,通過schedulerBackend獲取ShufflePushMergerLocations
// prepareShuffleServicesForShuffleMapStage
val mergerLocs = sc.schedulerBackend.getShufflePushMergerLocations(
stage.shuffleDep.partitioner.numPartitions, stage.resourceProfileId)
// [3] 向主節(jié)點(diǎn)塊管理器(BlockManagerMasterEndpoint 類)發(fā)送GetShufflePushMergerLocations消息
def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
driverEndpoint.askSync[Seq[BlockManagerId]](
GetShufflePushMergerLocations(numMergersNeeded, hostsToFilter))
}
從上面可以看出脂新,當(dāng)DAGScheduler 進(jìn)行submitTasks時(shí),如果stage是ShuffleMapStage粗梭,同時(shí)shuffle merge沒有完成,那么shuffleMerge會(huì)向resource manager資源管理器后端詢問可用于shuffle merge services的Executor列表级零。
在返回用于托管 shuffle merge 服務(wù)的可用節(jié)點(diǎn)后断医,DAGScheduler 將它們記錄在ShuffleDependency的mergerLocs 屬性中滞乙。下面是其詳細(xì)的過程:
- [1] 如果shuffle merge 開啟,同時(shí)shuffle merge沒有完成鉴嗤,準(zhǔn)備shuffleService為ShuffleMapStage斩启。
- [2] 在prepareShuffleServicesForShuffleMapStage,通過schedulerBackend獲取ShufflePushMergerLocations醉锅,返回mergerLocs兔簇。
- [3] 向主節(jié)點(diǎn)塊管理器(BlockManagerMasterEndpoint 類)發(fā)送GetShufflePushMergerLocations消息
現(xiàn)在我們進(jìn)入BlockManagerMasterEndpoint類的getShufflePushMergerLocations方法中,進(jìn)一步看看shuffleMerge是如何獲取足夠的可用于合并的Executor列表的硬耍。
// BlockManagerMasterEndpoint
// 獲取足夠的executor進(jìn)行合并
private def getShufflePushMergerLocations(
numMergersNeeded: Int,
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
// [1] 通過blockManagerIdByExecutor過濾非driver的Executor如果滿足numMergersNeeded則直接返回
val blockManagerHosts = blockManagerIdByExecutor
.filterNot(_._2.isDriver).values.map(_.host).toSet
val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _,externalShuffleServicePort))
// Enough mergers are available as part of active executors list
if (filteredMergersWithExecutors.size >= numMergersNeeded) {
filteredMergersWithExecutors.toSeq
} else {
// [2] 否則需要激活過去使用的Executor(最多 500 個(gè))用于進(jìn)行合并
// Delta mergers added from inactive mergers list to the active mergers list
val filteredMergersWithExecutorsHosts = filteredMergersWithExecutors.map(_.host)
val filteredMergersWithoutExecutors =shuffleMergerLocations.values
.filterNot(x => hostsToFilter.contains(x.host))
.filterNot(x => filteredMergersWithExecutorsHosts.contains(x.host))
val randomFilteredMergersLocations =
if (filteredMergersWithoutExecutors.size >
numMergersNeeded - filteredMergersWithExecutors.size) {
Utils.randomize(filteredMergersWithoutExecutors)
.take(numMergersNeeded - filteredMergersWithExecutors.size)
} else {
filteredMergersWithoutExecutors
}
filteredMergersWithExecutors.toSeq ++ randomFilteredMergersLocations
}
}
[1] 通過blockManagerIdByExecutor過濾非driver的Executor如果滿足numMergersNeeded則直接返回
[2] 否則需要激活過去使用的Executor(最多 500 個(gè))用于進(jìn)行合并垄琐。
從中可以看出,如果executor數(shù)不滿足numMergersNeeded经柴,會(huì)從過去使用executor中選擇進(jìn)行激活狸窘,直到獲取到足夠的可用于合并的Executor列表。
這時(shí)shuffle merge service已經(jīng)準(zhǔn)備好了坯认,同時(shí)其被記錄在shuffleDependency的mergerLocs 屬性中翻擒。
總而言之,這個(gè)階段在Driver的DAGScheduler中主要做了兩件事:
- 獲取足夠的可用于shuffle merge services的Executor列表牛哺。
- 將它們記錄在ShuffleDependency的mergerLocs屬性中陋气。
那么shuffle data 是如何被push到shuffle service中的呢?
Map端push shuffle數(shù)據(jù)
乍一看引润,shuffle Writer中的代碼并沒有變化巩趁,沒有增加一種新的shuffle Writer。但PBS的實(shí)現(xiàn)主要是shuffle data生成后推送出去進(jìn)行合并椰拒。
還記的在介紹getWriter時(shí)(參考bypass的文章)晶渠,在ShuffleWriteProcessor.write 中,在 ShuffleWriter.write 成功后, 曾有段shuffleMerge處理的代碼燃观。
下面我們來詳細(xì)介紹下push-based shuffle 是怎樣處理getWriter返回的結(jié)果數(shù)據(jù)的褒脯。
def write(
...
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
dep.shuffleHandle,
mapId,
context,
createMetricsReporter(context))
writer.write(
rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
val mapStatus = writer.stop(success = true)
if (mapStatus.isDefined) {
// 創(chuàng)建了一個(gè)ShuffleBlockPusher實(shí)例并調(diào)用了它的initialBlockPush方法。
// 在該方法內(nèi)部缆毁,推送器通過獲取連續(xù)的 shuffle 數(shù)據(jù)塊來創(chuàng)建 shuffle 合并請(qǐng)求
if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
manager.shuffleBlockResolver match {
case resolver: IndexShuffleBlockResolver =>
val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
new ShuffleBlockPusher(SparkEnv.get.conf)
.initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
case _ =>
}
}
}
mapStatus.get
} catch {
...
}
}
從上面的代碼可見番川,在執(zhí)行完map端的writer后,會(huì)判斷shuffleMergeEnabled是否開啟, 要求dependency中MergerLocs不為空脊框,其次就是shuffleMerge還未執(zhí)行完成颁督。如果滿足這些條件,則會(huì)創(chuàng)建ShuffleBlockPusher類浇雹,并調(diào)用其initiateBlockPush方法沉御。
看來具體的push實(shí)現(xiàn)位于initiateBlockPush方法中。
private[shuffle] def initiateBlockPush(
dataFile: File,
partitionLengths: Array[Long],
dep: ShuffleDependency[_, _, _],
mapIndex: Int): Unit = {
val numPartitions = dep.partitioner.numPartitions
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
// [1] 將map task的shuffle數(shù)據(jù)轉(zhuǎn)換為PushRequest請(qǐng)求
val requests = prepareBlockPushRequests(numPartitions, mapIndex, dep.shuffleId,
dep.shuffleMergeId, dataFile, partitionLengths, dep.getMergerLocs, transportConf)
// [2] 將PushRequest請(qǐng)求列表變?yōu)殡S機(jī)請(qǐng)求昭灵,這樣不同的mapper同時(shí)推送塊不會(huì)推送相同范圍的 shuffle 分區(qū)
// Randomize the orders of the PushRequest, so different mappers pushing blocks at the same
// time won't be pushing the same ranges of shuffle partitions.
pushRequests++= Utils.randomize(requests)
// [3] 盡力而為的push shuffle數(shù)據(jù)到ess
submitTask(() => {
tryPushUpToMax()
})
}
從上面的代碼可以看出吠裆,在initiateBlockPush主要做了以下三步工作:
- [1] 將map task的shuffle數(shù)據(jù)轉(zhuǎn)換為PushRequest請(qǐng)求
- [2] 將PushRequest請(qǐng)求列表變?yōu)殡S機(jī)請(qǐng)求伐谈,這樣不同的mapper同時(shí)推送塊不會(huì)推送相同范圍的 shuffle 分區(qū)
- [3] 盡力而為的push shuffle數(shù)據(jù)到shuffle merge service
顧名思義,prepareBlockPushRequests方法的作用是將map端生成的shuffle data封裝轉(zhuǎn)換為PushRequest請(qǐng)求试疙。不過除此以外還會(huì)將shuffle中連續(xù)的塊分到同一個(gè)請(qǐng)求中诵棵,可以允許更有效的數(shù)據(jù)讀取。如下圖所示:
// prepareBlockPushRequests 方法
for (reduceId <-0 until numPartitions) {
// 分區(qū)塊進(jìn)行合并并非直接按分區(qū)發(fā)送祝旷,而是通過以下公式
val mergeId = math.min(math.floor(reduceId *1.0 / numPartitions * numMergers),
numMergers -1 ).asInstanceOf[Int ]
在prepareBlockPushRequests履澳,分區(qū)塊進(jìn)行合并按照chunk進(jìn)行發(fā)送,通過上面的公式進(jìn)行劃分合并塊的怀跛,同時(shí)會(huì)跳過空的分區(qū)塊和超過maxBlockSizeToPush距贷,從而避免數(shù)據(jù)傾斜。這里已經(jīng)在上一講講過了敌完,就不再過多贅述了储耐,具體可以看上一講push-based shuffle初探。
在封裝好PushRequest請(qǐng)求后滨溉,最后通過調(diào)用tryPushUpToMax方法將數(shù)據(jù)塊推送出去什湘。
tryPushUpToMax方法調(diào)用的是我們?cè)趕huffle read中使用的pushUpToMax方法。這個(gè)方法在shuffle reader文章中也已經(jīng)介紹過了晦攒,這里只簡單總結(jié)下闽撤。這里的工作就是將shuffle data push到對(duì)應(yīng)的shuffle merge service。在發(fā)送時(shí)將數(shù)據(jù)封裝為PushBlockStream脯颜,push 的時(shí)候使用的是 streamUpload 的方式哟旗,通過 OneForOneBlockPusher ,利用 RetryingBlockFetcher 進(jìn)行發(fā)送栋操。
在Map端push data的階段闸餐,主要做了三件事:
- 將相同分區(qū)shuffle data block合并拆分到chunk中,并將其封裝為PushRequest矾芙;
- 隨機(jī)打亂PushRequest舍沙,避免順序的構(gòu)造push chunk,導(dǎo)致熱點(diǎn)和嚴(yán)重的爭(zhēng)用沖突剔宪;
- 通過pushUpToMax方法拂铡,將數(shù)據(jù)封裝為PushBlockStream,通過 OneForOneBlockPusher葱绒,利用 RetryingBlockFetcher 進(jìn)行發(fā)送感帅。
Shuffle Service merge數(shù)據(jù)
shuffle service 上使用 TransportRequestHandler.processStreamUpload處理上傳的shuffle數(shù)據(jù)塊流。一個(gè) block 的數(shù)據(jù)會(huì)被拆成若干個(gè) bytebuffer 進(jìn)行處理地淀,最后 onComplete 的時(shí)候進(jìn)行合并失球。
if (msgObj instanceof PushBlockStream) {
PushBlockStream message = (PushBlockStream) msgObj;
checkAuth(client, message.appId);
return mergeManager.receiveBlockDataAsStream(message);
....
}
如上代碼所示:在合并過程中,會(huì)調(diào)用ExternalBlockHandler.receiveStream方法中操作請(qǐng)求帮毁。它將推送消息傳遞給RemoteBlockPushResolver的receiveBlockDataAsStream方法她倘。
那么shuffle data具體是如何被合并的璧微,這里涉及到一個(gè)重要的數(shù)據(jù)結(jié)構(gòu)AppShufflePartitionInfo。
在ShuffleService上會(huì)保存appId到AppShuffleInfo的map映射硬梁,每個(gè)AppShuffleInfo內(nèi)會(huì)保存shuffleId到AppShuffleMergePartitionsInfo 的map映射,在appShuffleMergePartitionsInfo內(nèi)會(huì)保存reduceId到AppShufflePartitionInfo 的映射胞得,最終在AppShufflePartitionInfo 內(nèi)部會(huì)保存三個(gè)File;
public static class AppShufflePartitionInfo {
private final String appId;
private final int shuffleId;
private final int shuffleMergeId;
private final int reduceId;
private final File dataFile;
// The merged shuffle data file channel
public final FileChannel dataChannel;
// The index file for a particular merged shuffle contains the chunk offsets.
private final MergeShuffleFile indexFile;
// The meta file for a particular merged shuffle contains all the map indices that belong to
// every chunk. The entry per chunk is a serialized bitmap.
private final MergeShuffleFile metaFile;
...
}
如上代碼所示荧止,AppShufflePartitionInfo 中包含 3 個(gè) FileChannel,分別用于 data/index/meta 信息的保存阶剑。
當(dāng)shuffle service接收到 block 塊時(shí)跃巡,在嘗試添加到對(duì)應(yīng)的 shuffle 合并文件之前,它首先要檢索相應(yīng)的 Shuffle 分區(qū)元數(shù)據(jù)牧愁。保存的元數(shù)據(jù)可以幫助shuffle service正確處理一些潛在的異常場(chǎng)景素邪。
在onData進(jìn)行數(shù)據(jù)處理時(shí),對(duì)于 streamUpload 過來的 ByteBuffer猪半,只會(huì)對(duì) AppShufflePartitionInfo 進(jìn)行加鎖兔朦,如果當(dāng)前 ByteBuffer 的數(shù)據(jù)不屬于currentMergingMapId 的,則加入到一個(gè)列表中磨确。在寫當(dāng)前正在處理的 ByteBuffer 前沽甥,會(huì)將前面列表中的數(shù)據(jù)都寫入到數(shù)據(jù)文件中。
最終的合并時(shí)在onComplete進(jìn)行的乏奥,下面我們?cè)敿?xì)看下合并要滿足的條件:
@Override
public void onComplete(String streamId) throws IOException {
synchronized (partitionInfo) {
logger.trace("{} onComplete invoked", partitionInfo);
AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
// [1] 如果shuffle map任務(wù)終止(或者說reducers已經(jīng)啟動(dòng))摆舟,表明太遲了,則不會(huì)發(fā)生合并
if (isTooLate(info, partitionInfo.reduceId)) {
...
}
// [2] stage的狀態(tài)不確定邓了,(stage在重試中), 也不會(huì)進(jìn)行合并
if (isStale(info, partitionInfo.shuffleMergeId)) {
...
}
// [3] 校驗(yàn)給定 reducer 的一個(gè)映射流只能與現(xiàn)有文件合并
// Check if we can commit this block
if (allowedToWrite()) {
// [4] 如果是推測(cè)任務(wù)執(zhí)行中發(fā)送重復(fù)的reducer數(shù)據(jù)恨诱,則直接返回
if (isDuplicateBlock()) {
deferredBufs = null;
return;
}
if (partitionInfo.getCurrentMapIndex() < 0) {
...
}
// [5] 執(zhí)行buffer合并
long updatedPos = partitionInfo.getDataFilePos() + length;
boolean indexUpdated = false;
if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) {
try {
partitionInfo.updateChunkInfo(updatedPos, mapIndex);
indexUpdated = true;
} catch (IOException ioe) {
incrementIOExceptionsAndAbortIfNecessary();
// If the above doesn't throw a RuntimeException, then we do not propagate the
// IOException to the client. This may increase the chunk size however the increase is
// still limited because of the limit on the number of IOExceptions for a
// particular shuffle partition.
}
}
partitionInfo.setDataFilePos(updatedPos);
partitionInfo.setCurrentMapIndex(-1);
// update merged results
partitionInfo.blockMerged(mapIndex);
if (indexUpdated) {
partitionInfo.resetChunkTracker();
}
} else {
deferredBufs = null;
throw new BlockPushNonFatalFailure(
new BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
.toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
}
}
isWriting = false;
}
可見在合并前也需要滿足PushBlockStreamCallback定義的條件:
- [1] 如果shuffle map任務(wù)終止(或者說reducers已經(jīng)啟動(dòng)),表明太遲了骗炉,則不會(huì)發(fā)生合并
- [2] stage的狀態(tài)不確定照宝,(stage在重試中), 也不會(huì)進(jìn)行合并
- [3] 校驗(yàn)給定 reducer 的一個(gè)映射流只能與現(xiàn)有文件合并
- [4] 如果是推測(cè)任務(wù)執(zhí)行中發(fā)送重復(fù)的reducer數(shù)據(jù),則直接返回
最后再進(jìn)行合并時(shí)痕鳍,會(huì)將 shuffle 字節(jié)添加到數(shù)據(jù)文件后硫豆,合并器首先將合并后的偏移量寫入索引文件,然后才將映射器信息添加到元文件中笼呆。
這里的邏輯有點(diǎn)復(fù)雜熊响,為了避免錯(cuò)誤,只總結(jié)下要點(diǎn):
- shuffle service 上使用 TransportRequestHandler.processStreamUpload處理上傳的shuffle數(shù)據(jù)塊流诗赌。一個(gè) block 的數(shù)據(jù)會(huì)被拆成若干個(gè) bytebuffer 進(jìn)行處理汗茄。
- 在onData進(jìn)行數(shù)據(jù)處理時(shí),對(duì)于 streamUpload 過來的 ByteBuffer铭若,只會(huì)對(duì) AppShufflePartitionInfo 進(jìn)行加鎖洪碳,如果當(dāng)前 ByteBuffer 的數(shù)據(jù)不屬于currentMergingMapId 的递览,則加入到一個(gè)列表中。在寫當(dāng)前正在處理的 ByteBuffer 前瞳腌,會(huì)將前面列表中的數(shù)據(jù)都寫入到數(shù)據(jù)文件中绞铃。
- 在onComplete進(jìn)行合并時(shí),會(huì)先判斷是否滿足合并的條件嫂侍。合并時(shí)儿捧,會(huì)將 shuffle 字節(jié)append到數(shù)據(jù)data文件后,合并器首先將合并后的偏移量寫入索引index文件挑宠,然后才將映射器信息添加到元meta文件中菲盾。
獲取更新MergeStatues
當(dāng)每個(gè) ShuffleMapTask 結(jié)束的時(shí)候,DAGScheduler都會(huì)去判斷 ShuffleMapStage 是否 pending partitions 為空各淀,如果為空說明 stage 結(jié)束了懒鉴,此時(shí)開始向 shuffle service 上發(fā)送 finalize 信息,并將信息返回給 driver 并添加到 merge statuses 信息中碎浇。
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {
...
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
if (!shuffleStage.shuffleDep.shuffleMergeFinalized &&
shuffleStage.shuffleDep.getMergerLocs.nonEmpty) {
scheduleShuffleMergeFinalize(shuffleStage)
} else {
processShuffleMapStageCompletion(shuffleStage)
}
}
...
上面是DAGScheduler.handleTaskCompletion中的代碼临谱,可以從中看出在TaskCompletion時(shí),當(dāng)DAGScheduler收到有關(guān)執(zhí)行的最后一個(gè)map task的通知時(shí), 它會(huì)向所有 shuffle 服務(wù)發(fā)送FinalizeShuffleMerge消息南捂。服務(wù)攔截消息并從MergerShuffleFileManager完成合并過程吴裤。任何正在進(jìn)行的合并都會(huì)被中斷并取消,以避免合并文件中有部分?jǐn)?shù)據(jù)溺健。
同時(shí)DAGScheduler 等待spark.shuffle.push.result.timeout來獲取響應(yīng)麦牺。如果 shuffle 服務(wù)在此延遲內(nèi)響應(yīng),DAGScheduler會(huì)攔截包含以下屬性的響應(yīng):
public class MergeStatuses extends BlockTransferMessage {
/** Shuffle ID **/
public final int shuffleId;
/**
* shuffleMergeId is used to uniquely identify merging process of shuffle by
* an indeterminate stage attempt.
*/
public final int shuffleMergeId;
/**
* Array of bitmaps tracking the set of mapper partition blocks merged for each
* reducer partition
*/
public final RoaringBitmap[] bitmaps;
/** Array of reducer IDs **/
public final int[] reduceIds;
/**
* Array of merged shuffle partition block size. Each represents the total size of all
* merged shuffle partition blocks for one reducer partition.
* **/
public final long[] sizes;
獲取MergeStatues的過程和MapStatus的過程類似鞭缭,其是通過getPushBasedShuffleMapSizesByExecutorId進(jìn)行獲取的剖膳,具體可以參考shuffle reader 的文章。
在更新Merge Status階段岭辣,主要做了下面的工作:
- 向shuffle service上發(fā)送 FinalizeShuffleMerge 信息吱晒。注意:任何正在進(jìn)行的合并都會(huì)被中斷并取消,以避免合并文件中有部分?jǐn)?shù)據(jù)沦童。
- 等待spark.shuffle.push.result.timeout來獲取響應(yīng)仑濒,攔截獲取MergeStatuses。
reducer拉取merge shuffle數(shù)據(jù)
reduce task 開始之后偷遗,從 driver 上獲取 merge statuses 信息墩瞳,并在數(shù)據(jù)劃分的時(shí)候,如果是 merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息氏豌,獲取到 meta 信息后喉酌,利用 shuffle service 上的 index 文件信息,讀取文件中 block 數(shù)據(jù)。
通過get reader獲取shuffle數(shù)據(jù)泪电,這塊已經(jīng)在shuffle reader中講過了般妙,這里就不大段的貼代碼了,只講解涉及到pbs的地方相速,具體的可以再復(fù)習(xí)shuffle reader的文章碟渺。
// 在劃分?jǐn)?shù)據(jù)源的請(qǐng)求:本地、主機(jī)本地和遠(yuǎn)程塊突诬, 同時(shí)劃分出pbs的blocks
val remoteRequests = partitionBlocksByFetchMode(
blocksByAddress, localBlocks, hostLocalBlocksByExecutor, pushMergedLocalBlocks)
我們具體的收集pbs的遠(yuǎn)程blocks地址的實(shí)現(xiàn):
blockId match {
// 獲取數(shù)據(jù)請(qǐng)求
case ShuffleBlockChunkId(_, _, _, _) =>
if (curRequestSize >= targetRemoteRequestSize ||
curBlocks.size >= maxBlocksInFlightPerAddress) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests, enableBatchFetch = false)
curRequestSize = curBlocks.map(_.size).sum
}
// 從forMergedMetas可以看出這里會(huì)為獲取元數(shù)據(jù)構(gòu)建單獨(dú)的請(qǐng)求
case ShuffleMergedBlockId(_, _, _) =>
if (curBlocks.size >= maxBlocksInFlightPerAddress) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests, enableBatchFetch = false, forMergedMetas = true)
}
case _ =>
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
collectedRemoteRequests, doBatchFetch)
curRequestSize = curBlocks.map(_.size).sum
}
}
從這里可以看出如果是 merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息止状。
接下來,我們?cè)賮砜聪掳l(fā)送請(qǐng)求的方法:
def send(remoteAddress: BlockManagerId, request: FetchRequest): Unit = {
if (request.forMergedMetas) {
pushBasedFetchHelper.sendFetchMergedStatusRequest(request)
} else {
sendRequest(request)
}
numBlocksInFlightPerAddress(remoteAddress) =
numBlocksInFlightPerAddress.getOrElse(remoteAddress, 0) + request.blocks.size
}
iterator.addToResultsQueue(PushMergedRemoteMetaFetchResult(shuffleId, shuffleMergeId,
reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address))
在發(fā)送請(qǐng)求時(shí)攒霹,會(huì)區(qū)分請(qǐng)求元數(shù)據(jù)的請(qǐng)求和數(shù)據(jù)的獲取請(qǐng)求。元數(shù)據(jù)請(qǐng)求返回PushMergedRemoteMetaFetchResult 浆洗。
最后在ShuffleBlockFetcherIterator.next讀取數(shù)據(jù)時(shí)催束,將獲取元數(shù)據(jù)進(jìn)行模式匹配:
case PushMergedRemoteMetaFetchResult(
shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps, address) =>
// ...
val blocksToFetch = pushBasedFetchHelper.createChunkBlockInfosFromMetaResponse(
shuffleId, shuffleMergeId, reduceId, blockSize, bitmaps)
val additionalRemoteReqs = new ArrayBuffer[FetchRequest]
collectFetchRequests(address, blocksToFetch.toSeq, additionalRemoteReqs)
fetchRequests ++= additionalRemoteReqs
// Set result to null to force another iteration.
result = null
從這里可以看出, 在獲取取到 PushMergedRemoteMetaFetchResult 信息后,利用 shuffle service 上的 index 文件信息伏社,再次發(fā)起讀取文件中 block 數(shù)據(jù)的請(qǐng)求抠刺。這次的請(qǐng)求blockId是ShuffleBlockChunkId類型, 從上面的代碼可以看出這意味著請(qǐng)求類型會(huì)將forMergedMetas標(biāo)志設(shè)置為 false摘昌。
獲取merge shuffle數(shù)據(jù)速妖,主要有以下步驟:
- 從driver上獲取 merge statuses 信息;
- merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息聪黎;
- 獲取到 meta 信息后罕容,利用shuffle service 上的 index 文件信息,讀取文件中 block 數(shù)據(jù)稿饰。
在最后锦秒,我們?cè)賲R總下push-based shuffle的詳細(xì)過程:
- 在Driver端,當(dāng)dagScheduler提交ShuffleMapStage類型的任務(wù)時(shí)喉镰,會(huì)向資源管理器后端詢問可用于shuffle merge services的Executor列表旅择。在資源管理器返回用于托管 shuffle merge 服務(wù)的可用節(jié)點(diǎn)后,dagScheduler 將它們記錄在shuffleDependency的mergerLocs屬性中侣姆。
- 在 map端生真,當(dāng)ShuffleWriter.write 成功后,會(huì)調(diào)用 ShuffleWriter.initiateBlockPush 捺宗,將已經(jīng)落盤的 ShuffleBlock push 到遠(yuǎn)端的 shuffle service 上柱蟀。將數(shù)據(jù)封裝為PushBlockStream,push 的時(shí)候使用的是 streamUpload 的方式偿凭,通過 OneForOneBlockPusher产弹,利用 RetryingBlockFetcher 進(jìn)行發(fā)送。
- 在shuffle service中,使用TransportRequestHandler.processStreamUpload處理上傳的shuffle數(shù)據(jù)塊流痰哨。一個(gè) block 的數(shù)據(jù)會(huì)被拆成若干個(gè) bytebuffer 進(jìn)行處理胶果。在shuffle service中,每個(gè)reduceid會(huì)維護(hù)一個(gè)AppShufflePartitionInfo 斤斧,在其中包含 3 個(gè) FileChannel早抠,分別用于 data/index/meta 信息的保存。在onData進(jìn)行數(shù)據(jù)處理時(shí)撬讽,對(duì)于 streamUpload 過來的 ByteBuffer蕊连,只會(huì)對(duì) AppShufflePartitionInfo 進(jìn)行加鎖,如果當(dāng)前 ByteBuffer 的數(shù)據(jù)不屬于currentMergingMapId 的游昼,則加入到一個(gè)列表中甘苍。最后 onComplete 的時(shí)候進(jìn)行合并合并時(shí),會(huì)將 shuffle 字節(jié)添加到數(shù)據(jù)文件后,合并器首先將合并后的偏移量寫入索引文件,然后才將映射器信息添加到元文件中榨惠。
- 當(dāng)每個(gè) ShuffleMapTask 結(jié)束的時(shí)候隆判,DAGScheduler都會(huì)去判斷 ShuffleMapStage 是否 pending partitions 為空,如果為空說明 stage 結(jié)束了,此時(shí)開始向 shuffle service 上發(fā)送 finalize 信息,并將信息返回給 driver 并添加到 merge statuses 信息中。同時(shí)DAGScheduler 等待spark.shuffle.push.result.timeout來獲取響應(yīng)顽铸。
- reduce task 開始之后,從 driver 上獲取 merge statuses 信息料皇,并在數(shù)據(jù)劃分的時(shí)候谓松,如果是 merged block 則先向 shuffle service 上請(qǐng)求一次 meta 信息,獲取到 meta 信息后瓶蝴,利用 shuffle service 上的 index 文件信息毒返,讀取文件中 block 數(shù)據(jù)。
再我們了解完P(guān)ush-based shuffle代碼后舷手,我們來回答下以下幾個(gè)問題:
- push-based shuffle是在shuffle write結(jié)束后追加了push與合并操作拧簸,那么是否只有在發(fā)生FetchFail的情況下(導(dǎo)致stage重試)push-base shuffle的性能更好?
- push-based shuffle 能否進(jìn)行精簡下男窟?例如取消掉driver端的行為盆赤。