歡迎關(guān)注公眾號“Tim在路上”
在Spark3.2中引入了領(lǐng)英設(shè)計的一種新的shuffle方案右钾,今天我們先來了解下其大致的設(shè)計原理蚁吝,之后會再分析其具體的代碼實現(xiàn)。
當(dāng)我們在Yarn上部署Spark時霹粥,通常會使用ESS來管理shuffle數(shù)據(jù)(具體可見什么是ESS的文章)灭将。我們先來回顧下基于ESS進(jìn)行shuffle的過程。
- 每個 Spark Executor 在啟動后都會和位于同一個節(jié)點上面的 Spark External Shuffle Service (ESS)進(jìn)行注冊后控。此類注冊允許 Spark ESS 了解來自每個注冊 Executor 的本地 Map 任務(wù)產(chǎn)生的物化 Shuffle 數(shù)據(jù)的位置庙曙。請注意,Spark ESS 的實例在 Spark Executor 的外部浩淘,并且可以在多個 Spark 應(yīng)用程序中共享捌朴。
- Shuffle Map Stage 中的每個任務(wù)都會處理部分?jǐn)?shù)據(jù)吴攒。在 Map 任務(wù)結(jié)束時,它會產(chǎn)生 2 個文件砂蔽,一個用來存儲 Shuffle 數(shù)據(jù)洼怔,另一個用來索引前者的 Shuffle 塊。為了這樣做左驾, Map 任務(wù)會根據(jù)分區(qū)鍵的散列值對所有轉(zhuǎn)換的記錄進(jìn)行排序镣隶。在此過程中,如果無法在內(nèi)存中對整個數(shù)據(jù)進(jìn)行排序诡右,則 Map 任務(wù)會溢出中間數(shù)據(jù)到磁盤安岂。一旦排序,將生成 Shuffle 數(shù)據(jù)文件帆吻,其中屬于相同 Shuffle 分區(qū)的所有記錄都會被組合到一起域那,放到一個 Shuffle 塊中。還會生成匹配的 Shuffle 索引文件猜煮,用來記錄塊邊界的偏移量次员。
- 當(dāng)下一個 Stage 的 Reduce 任務(wù)開始運行時,它們會查詢 Spark 的Driver 以獲取輸入的 Shuffle 塊的位置王带。一旦此信息變?yōu)榭捎檬缥担總€Reduce 任務(wù)將會建立和對應(yīng)的 Spark ESS 實例的連接,以便獲取其輸入數(shù)據(jù)愕撰。 Spark ESS 在接收到這樣的請求時束倍,會利用 Shuffle 索引文件來跳到 Shuffle 數(shù)據(jù)文件中對應(yīng)塊數(shù)據(jù),從磁盤讀取它盟戏,并將其發(fā)送回 Reduce 任務(wù)。
然而在實踐中任然存在些問題甥桂,使得spark任務(wù)的穩(wěn)定性不高柿究。
- Spark ESS 每個 FETCH 請求只會讀取一個 Shuffle 塊,因此Shuffle 塊的平均大小決定了每次盤讀的平均數(shù)據(jù)量黄选,如果存在大量小 Shuffle 塊導(dǎo)致磁盤 I/O 低效蝇摸。
- Reduce 任務(wù)在建立與遠(yuǎn)程 Spark ESS 的連接時出現(xiàn)失敗的情況,它會立即失敗整個的 Shuffle Reduce Stage办陷,導(dǎo)致前面的 Stage 重試貌夕,來重新生成拉取不到的 Shuffle 數(shù)據(jù)。
- 如果 Shuffle 塊在 Reduce 任務(wù)中本地可用民镜,則任務(wù)可以直接從磁盤讀取啡专,繞過 Shuffle 服務(wù),這有助于減少 Shuffle 期間的 RPC 連接數(shù)制圈。但是 Spark 當(dāng)前的 Shuffle 機制會導(dǎo)致 Reduce 任務(wù)的數(shù)據(jù)本地性很少们童,因為它們的任務(wù)輸入數(shù)據(jù)分散在所有的 Map 任務(wù)中畔况。
Push-based shuffle架構(gòu)流程
PBS主要結(jié)構(gòu)和流程:
- Spark driver組件,協(xié)調(diào)整體的shuffle操作;
- map任務(wù)的shuffle writer過程完成后慧库,增加了一個額外的操作push-merge跷跪,將數(shù)據(jù)復(fù)制一份推到遠(yuǎn)程shuffle服務(wù)上;
- magnet shuffle service是一個強化版的ESS齐板。將隸屬于同一個shuffle partition的block吵瞻,會在遠(yuǎn)程傳輸?shù)絤agnet 后被merge到一個文件中;
- reduce任務(wù)從magnet shuffle service 接收合并好的shuffle數(shù)據(jù)甘磨,不同reduce任務(wù)可以共享shuffle數(shù)據(jù)來提升shuffle傳輸效率橡羞。
幾個重要的特性:
Push-Merge Shuffle - Magnet采用 Push-Merge Shuffle 機制,其中 Mapper 生成的 Shuffle 數(shù)據(jù)被推送到遠(yuǎn)程的 Magnet Shuffle Service宽档,從而實現(xiàn)每個 shuffle 分區(qū)都能被合并尉姨。這允許Magnet將小的 Shuffle 塊的隨機讀取轉(zhuǎn)化成 MB 大小塊的順序讀取。此外吗冤,此推送操作與 Mapper 分離又厉,這樣的話,如果操作失敗椎瘟,也不會增加 Map Task 的運行時間或者導(dǎo)致 Map Task 失敗覆致。
最給力的兜底方法 - Magnet不需要塊 push 操作完成的那么完美。通過執(zhí)行Push-Merge Shuffle肺蔚,Magnet有效地復(fù)制了 shuffle 數(shù)據(jù)煌妈。Magnet允許 reducer 獲取合并的和未合并的 shuffle 數(shù)據(jù)都作為任務(wù)輸入。這使得Magnet能夠容忍塊 push 操作的部分完成宣羊。
靈活的部署策略 - Magnet 通過在頂層構(gòu)建的方式集成了 Spark 原生的 shuffle璧诵。這使得Magnet可以部署在具有相同位置的計算和存儲節(jié)點的 on-prem 集群中與disaggrecated存儲層的cloud-based的集群中。在前一種情況下仇冯,隨著每次 Reduce Task 的大部分都合并在一個位置之宿,Magnet利用這種本地性來調(diào)度 Reduce Task 并實現(xiàn)更好的 Reducer 數(shù)據(jù)本地性。在后一種情況下苛坚,代替數(shù)據(jù)本地性比被,Magnet可以選擇較少負(fù)載的遠(yuǎn)程 shuffle 服務(wù),從而更好的優(yōu)化了負(fù)載均衡泼舱。
緩解落后/數(shù)據(jù)傾斜 - Magnet可以處理落后和數(shù)據(jù)傾斜等缀。由于Magnet可以容忍塊 push 操作的部分完成,因此可以通過停止慢速 push 操作或跳過 push 大/傾斜的 block 塊來緩解落后和數(shù)據(jù)傾斜娇昙。
為遠(yuǎn)程 push 準(zhǔn)備 block 塊
push-merge 的根本目的是減少reduce側(cè)的隨機IO, 在Magnet上把小文件block合并后, 將隨機IO轉(zhuǎn)變?yōu)轫樞騃O尺迂。reduce task可以讀取連續(xù)存儲的、大小在MB級別的文件。
為了解決map端的小文件問題枪狂,提高磁盤 I/O 效率危喉,我們需要增加每次 I/O 操作的數(shù)據(jù)量。這里提出了采用合并屬于同一個 Shuffle 分區(qū)的 Shuffle block 塊州疾,以創(chuàng)建更大的數(shù)據(jù)塊的方式辜限。
下面我們來詳細(xì)解釋下:
首先,push-merge的基本單位是chunk严蓖,map task輸出block后薄嫡,首先要將block以算法的方式分配到chunk中去。
這里的算法的簡單思想就是將block塊合并為chunk颗胡,當(dāng)chunk的長度超過超限之后又push到magent上的過程毫深。具體的實現(xiàn)在方法ShuffleBlockPusher.prepareBlockPushRequests方法中:
for (reduceId <- 0 until numPartitions) {
val blockSize = partitionLengths(reduceId)
if (blockSize > 0) {
// [1] 通過以下公式,更新一下merge service機器編號毒姨,把chunk發(fā)送到下一臺機器上
val mergerId = math.min(math.floor(reduceId * 1.0 / numPartitions * numMergers),
numMergers - 1).asInstanceOf[Int]
// [2] 當(dāng)chunk長度沒有超過限制maxBlockSizeToPush哑蔫,將block append到chunk中,更新chunk長度
// service, and does not go beyond existing limitations.
if (currentReqSize + blockSize <= maxBlockBatchSize
&& blocks.size < maxBlocksInFlightPerAddress
&& mergerId == currentMergerId && blockSize <= maxBlockSizeToPush) {
// Add current block to current batch
currentReqSize += blockSize.toInt
// [3] 當(dāng)chunk長度超過限制弧呐,將chunk推到編號為currentMergerId的Magnet機器上闸迷,之后寫入新的block進(jìn)去(重新初始化)
} else {
if (blocks.nonEmpty) {
// Convert the previous batch into a PushRequest
requests +=PushRequest(mergerLocs(currentMergerId), blocks.toSeq,
createRequestBuffer(transportConf, dataFile, currentReqOffset, currentReqSize))
blocks = new ArrayBuffer[(BlockId, Int)]
}
// Start a new batch
currentReqSize = 0
// Set currentReqOffset to -1 so we are able to distinguish between the initial value
// of currentReqOffset and when we are about to start a new batch
currentReqOffset = -1
currentMergerId = mergerId
}
// push的blocks長度都是小于maxBlockSizeToPush
// Only push blocks under the size limit
if (blockSize <= maxBlockSizeToPush) {
val blockSizeInt = blockSize.toInt
blocks += ((ShufflePushBlockId(shuffleId, shuffleMergeId, partitionId,
reduceId), blockSizeInt))
// Only update currentReqOffset if the current block is the first in the request
if (currentReqOffset == -1) {
currentReqOffset = offset
}
if (currentReqSize == 0) {
currentReqSize += blockSizeInt
}
}
}
offset += blockSize
}
可見這里的算法的流程為:
- [1] 通過math.min(math.floor(reduceId * 1.0 / numPartitions * numMergers),numMergers - 1)公式,更新一下merge service機器編號俘枫,把chunk發(fā)送到下一臺機器上腥沽;
- [2] 當(dāng)chunk長度沒有超過限制maxBlockSizeToPush,將block append到chunk中鸠蚪,更新chunk長度
- [3] 當(dāng)chunk長度超過限制今阳,將chunk推到編號為currentMergerId的Magnet機器上,之后寫入新的block進(jìn)去(重新初始化)
同時需要注意這里push的blocks的大小都是小于maxBlockSizeToPush茅信,這里用于跳過數(shù)據(jù)傾斜的分區(qū)塊盾舌。
// Randomize the orders of the PushRequest, so different mappers pushing blocks at the same
// time won't be pushing the same ranges of shuffle partitions.
pushRequests ++= Utils.randomize(requests)
另外為了避免順序的構(gòu)造push chunk,導(dǎo)致Magnet上資源的熱點和嚴(yán)重的爭用沖突蘸鲸。在完成準(zhǔn)備將shuffle data轉(zhuǎn)換為push request后矿筝,將chunk按照編號進(jìn)行了隨機化處理,來避免所有map task按照相同次序push chunk棚贾。
合并Magnet shuffle service的 block 塊
在 Magnet shuffle service一側(cè),對于正在主動合并的每個 Shuffle 分區(qū)榆综,Magnet shuffle service會生成合并的 Shuffle 文件妙痹,用來添加所有接收的相應(yīng) block 塊。
它還為每個主動合并的 Shuffle 分區(qū)維護(hù)一些元數(shù)據(jù)鼻疮。
這份元數(shù)據(jù)的唯一鍵由 applicationID怯伊,shuffle ID和 shuffle partition ID混合組成,并且放到一個 ConcurrentHashMap 中判沟。
Magnet機器上需要維護(hù)一些重要的元信息耿芹,如上圖所示崭篡,包括:
- bitmap: 存儲已merge的mapper id,防止重復(fù)merge吧秕;
- position offset: 如果本次block沒有正常merge琉闪,可以恢復(fù)到上一個block的位置;
- currentMapId:標(biāo)識當(dāng)前正在append的block砸彬,保證不同mapper 的block能依次append颠毙。
當(dāng)Magnet shuffle service接收到 block 塊時,在嘗試添加到對應(yīng)的 shuffle 合并文件之前砂碉,它首先要檢索相應(yīng)的 Shuffle 分區(qū)元數(shù)據(jù)蛀蜜。元數(shù)據(jù)可以幫助Magnet shuffle service正確處理一些潛在的異常場景。
例如增蹭,bitmap可幫助Magnet shuffle service識別任何潛在的重復(fù)塊滴某,因此沒有多余的數(shù)據(jù)會被寫入 Shuffle 合并文件中。
currentMapId用于保證當(dāng)前正在append的block滋迈,即使Magnet shuffle service可以從不同的 Map 任務(wù)中接收同一個 shuffle 分區(qū)的多個 block 塊霎奢,只有當(dāng)currentMapId的 block 塊完整地添加到 Shuffle 合并文件中,下一次寫入才可開始杀怠。
并且椰憋,在遇到足以損壞整個 shuffle 合并文件的故障之前,可以將 block 塊部分地添加到 Shuffle 合并文件中赔退。當(dāng)發(fā)生這種情況時橙依,position offset會有助于將 Shuffle 合并文件帶回到健康狀態(tài)。下一個 block 塊會從位置偏移量處開始添加硕旗,這可以有效地覆蓋損壞的部分窗骑。如果損壞的 block 塊是最后一個的話,block 合并操作結(jié)束之后將截斷損壞的部分漆枚。通過追蹤這份元數(shù)據(jù)创译,Magnet shuffle service可以在 block 塊合并操作期間適宜地去處理重復(fù),沖撞和故障的情況墙基。
提升 Shuffle 的可靠性
magnet shuffle服務(wù)通過Best-effort的方式來解決海量連接可靠性低的問題软族。在該體系上,所有連接異常都是non-fatal的残制,可以理解為每個環(huán)節(jié)上的連接斷開或異常立砸,都有一個對應(yīng)的備選和兜底方案:
- 如果Map task輸出的Block沒有成功Push到magnet上,并且反復(fù)重試仍然失敗初茶,則reduce task直接從ESS上拉取原始block數(shù)據(jù)颗祝;
- 如果magnet上的block因為重復(fù)或者沖突等原因,沒有正常完成merge的過程,則reduce task直接拉取未完成merge的block螺戳;
- 如果reduce拉取已經(jīng)merge好的block失敗搁宾,則會直接拉取merge前的原始block。
對于一個有著 M 個 Map 任務(wù)和 R 個 Reduce 任務(wù)的 Shuffle 來說倔幼,Spark Driver 會收集 M 個 MapStatus和 R 個 MergeStatus盖腿。
這些元數(shù)據(jù)會告訴 Spark Driver 每個未合并的 Shuffle block 塊和已合并的 Shuffle 文件的位置和大小,還有哪些 block 塊會合并到每一個 Shuffle 合并文件中凤藏。
因此奸忽,Spark Driver 可以完整的看到,怎樣去獲取每個 Reduce 任務(wù)已合并的Shuffle 文件和未合并的 Shuffle 塊揖庄。當(dāng) Reduce 任務(wù)沒能獲取到 Shuffle 合并 block 塊時栗菜,元數(shù)據(jù)便會能夠回過頭來獲取原始的未合并的 block 塊。
Magnet 盡最大可能有效地維護(hù)了兩份 Shuffle 數(shù)據(jù)的副本蹄梢。
靈活的部署策略
Magnet允許 Spark原生地去管理 Shuffle 的各個方面疙筹,包括存儲 Shuffle 數(shù)據(jù),提供容錯能力禁炒,還有可以追蹤 Shuffle 數(shù)據(jù)的位置元數(shù)據(jù)信息而咆。
在這種情況下,Spark 不依賴于外部的系統(tǒng)進(jìn)行 Shuffle幕袱。
這允許靈活地將Magnet部署在計算/存儲同一節(jié)點的 on-prem 集群和具有disaggregated storage layer的cloud-based的集群暴备。對于計算和存儲同一個節(jié)點的on prem數(shù)據(jù)中心,Shuffle Reduce 任務(wù)的數(shù)據(jù)本地性可以帶來很多好處们豌。
其中包括提高 I/O 效率涯捻,并且由于繞過網(wǎng)絡(luò)傳輸減少了 Shuffle 獲取失敗的情況。
通過利用 Spark 的位置感知任務(wù)調(diào)度并且基于 Spark Executor 的位置信息選擇 Magnet shuffle service來 push Shuffle block 塊望迎,實現(xiàn) Shuffle 數(shù)據(jù)本地性似乎微不足道障癌。
動態(tài)分配的功能使得 Spark 在一段時間內(nèi)如果沒有任務(wù)運行,則釋放空閑的 Executor辩尊,并且如果任務(wù)再次待辦涛浙,則可以稍后重新啟動 Executor。
這使得 Spark 應(yīng)用程序在多租戶集群中資源更加富裕摄欲。
通過 Spark 動態(tài)分配轿亮,當(dāng) Driver 在 Shuffle Map Stage 的開頭選擇Magnet shuffle service列表時,由于 Executor 在前一個 Stage 的結(jié)尾會釋放胸墙,活躍的 Spark Executor 的數(shù)量可能小于需求的數(shù)量我注。如果我們選擇基于 Spark Executor 位置信息的 Magnet shuffle service,我們最終可能比需求的 Shuffle 服務(wù)更少劳秋。
為了解決這個問題,我們選擇在活躍 Spark Executor 之外位置的Magnet shuffle service,并通過基于所選Magnet shuffle service位置信息的動態(tài)分配機制來啟動 Spark Executor玻淑。這樣的話嗽冒,我們基于Magnet shuffle service的位置信息來啟動 Spark Executor,而不會去基于 Spark Executor 的位置信息來選擇 Magnet shuffle service补履。由于Magnet和 Spark 原生的 Shuffle 集成添坊,因此可以進(jìn)行這種優(yōu)化。
對于cloud-based
的集群部署箫锤,計算和存儲節(jié)點通常是分開的贬蛙。
在這樣的部署中, Shuffle 中間數(shù)據(jù)可以通過快速網(wǎng)絡(luò)連接在disaggregated storage
中物化谚攒。
Shuffle Reduce 任務(wù)的數(shù)據(jù)本地性在這種設(shè)置中不再重要阳准。然而,Magnet仍然適合這種cloud-based的部署馏臭。Magnet shuffle service在計算節(jié)點上運行野蝇,在 disaggregated storage 節(jié)點上面存儲合并的 shuffle 文件。通過讀取更大的數(shù)據(jù) chunk 塊而不是橫跨網(wǎng)絡(luò)的細(xì)碎的 shuffle block 塊括儒,Magnet有助于更好地利用可用網(wǎng)絡(luò)帶寬绕沈。
此外,Spark Executor 在選擇Magnet shuffle service的時候可以選擇優(yōu)化更好的負(fù)載均衡而不是數(shù)據(jù)本地性帮寻。Spark Driver 可以查詢可用Magnet shuffle service的負(fù)載乍狐,以便選擇負(fù)載低的。在我們的Magnet實現(xiàn)中固逗,我們允許通過靈活的政策來選擇Magnet shuffle service的位置浅蚪。
因此,我們可以選擇根據(jù)集群的部署模式要么優(yōu)化數(shù)據(jù)本地性要么優(yōu)化負(fù)載均衡抒蚜,或者兩者都有也行掘鄙。
處理落后和數(shù)據(jù)傾斜
- 解決Task Straggler問題
當(dāng)所有的 Map 任務(wù)在 Shuffle Map Stage 結(jié)尾完成的時候,Shuffle block 塊推送操作可能還沒有完全完成嗡髓。
此時有一批 Map 任務(wù)剛剛開始推送 block 塊操漠,也可能有落后者做不到足夠快地推送 block 塊。不同于 Reduce 任務(wù)中的落后者饿这,我們在 Shuffle Map Stage 結(jié)尾經(jīng)歷的任何延遲都將直接影響作業(yè)的運行時間浊伙。
為了緩解這樣的落后,Magnet允許 Spark Driver 設(shè)置期望等待 block 塊推送/合并操作的時間上限长捧。
magnet服務(wù)設(shè)置了push-merge超時時間嚣鄙,如果block沒有在超時時間內(nèi)完成push-merge,magnet服務(wù)會停止繼續(xù)接受block串结,提前讓reduce task開始執(zhí)行哑子;而未完成push-merge的block舅列,根據(jù)上面中提到的Best-effort方案,reduce task會從MapStatus中獲取狀態(tài)與位置信息卧蜓,直接拉取沒有merge的block數(shù)據(jù)帐要。
然而,它確保Magnet可以提供 push/merge shuffle 的大部分益處弥奸,同時將落后者的負(fù)面影響限制在 Spark 應(yīng)用程序的運行時間內(nèi)榨惠。
- 解決數(shù)據(jù)傾斜
在Spark shuffle過程中,如果某個partition的shuffle數(shù)據(jù)量遠(yuǎn)高于其他partition盛霎,則會出現(xiàn)數(shù)據(jù)傾斜(data skew)問題赠橙。 data skew 不是magnet特有的問題,而是在Spark上已經(jīng)有成熟解決方案愤炸,即 AQE期揪。
magnet需要適配Spark 的adaptive execution特性,同時防止一個magnet服務(wù)上因data skew而導(dǎo)致有 100GB / 1TB級別的數(shù)據(jù)需要merge摇幻。為此横侦,針對上文的算法可以看出,push的blocks的大小都是小于maxBlockSizeToPush绰姻,通過限制 size超過閾值的block被并入到chunk中枉侧;如果超過閾值,則會利用上節(jié)中的Best-effort方案狂芋,直接拉取未完成merge的block數(shù)據(jù)榨馁。而普通的、未有data skew情況的block帜矾,則會走正常的push-merge流程翼虫。
push-based shuffle 配置
服務(wù)器端配置(yarn-site.xml)
# 默認(rèn)的push based shuffle是關(guān)閉的。如果需要開啟請設(shè)置為:org.apache.spark.network.shuffle.RemoteBlockPushResolver屡萤。
spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager
# 在push-based shuffle期間將合并的shuffle文件劃分為多個塊時最小的大小珍剑,默認(rèn)為2m。
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile=2m
# 緩存大小死陆,可以存儲合并的索引文件
spark.shuffle.push.server.mergedIndexCacheSize=100m
客戶端配置
接下來我們將從源碼的角度進(jìn)行進(jìn)一步的分析招拙。