Spark 2.0 中已經(jīng)移除 Hash Based Shuffle,但作為曾經(jīng)的默認 Shuffle 機制竿裂,還是值得進行分析
Spark 最開始只有 Hash Based Shuffle纵苛,因為在很多場景中并不需要排序剿涮,在這些場景中多余的排序反而會損耗性能。
Hash Based Shuffle Write
該過程實現(xiàn)的核心是在 HashShuffleWriter#write(records: Iterator[Product2[K, V]]): Unit
其主要流程如下:
該函數(shù)的輸入是一個 Shuffle Map Task 計算得到的結(jié)果(對應(yīng)的迭代器)攻人,若在寬依賴中定義了 map 端的聚合則會先進行聚合取试,隨后對于迭代器(若要聚合則為聚合后的迭代器)的每一項先通過計算 key 的 hash 值來確定要寫到哪個文件,然后將 key怀吻、value 寫入文件瞬浓。
寫入的文件名的格式是:shuffle_$shuffleId_$mapId_$reduceId
。寫入時蓬坡,若文件已存在會刪除會創(chuàng)建新文件猿棉。
上圖描述了如何處理一個 Shuffle Map Task 計算結(jié)果,在實際應(yīng)用中屑咳,往往有很多 Shuffle Map Tasks 及下游 tasks萨赁,即如下情況(圖摘自:JerryLead/SparkInternals-Shuffle 過程):
存在的問題
這種簡單的實現(xiàn)會有幾個問題,為說明方便兆龙,這里設(shè) M = Shuffle Map Task 數(shù)量
杖爽,R = 下游 tasks 數(shù)量
:
- 產(chǎn)生過多文件:由于每個 Shuffle Map Task 需要為每個下游的 Task 創(chuàng)建一個單獨的文件,因此文件的數(shù)量就是
M * R
紫皇。如果 Shuffle Map Tasks 數(shù)量是 1000慰安,下游的 tasks 數(shù)是 800,那么理論上會產(chǎn)生 80w 個文件(對于 size 為 0的文件會特殊處理) - 打開多個文件對于系統(tǒng)來說意味著隨機寫聪铺,尤其是每個文件較小且文件特別多的情況化焕。機械硬盤在隨機讀寫方面的性能很差,如果是固態(tài)硬盤铃剔,會改善很多
- 緩沖區(qū)占用內(nèi)存空間大:每個 Shuffle Map Task 需要開 R 個 bucket(為減少寫文件次數(shù)的緩沖區(qū))撒桨,N 個 Shuffle Map Task 就會產(chǎn)生
N * R
個 bucket脂倦。雖然一個 Shuffle Map Task,對應(yīng)的 buckets 會被回收元莫,但一個節(jié)點上的 bucket 個數(shù)最多可以達到cores * R
個赖阻,每個 bucket 默認為 32KB。對于 24 核 1000 個 reducer 來說踱蠢,占用內(nèi)存就是 750MB
改進:Shuffle Consolidate Writer
在上面提到的幾個問題火欧,Spark 提供了 Shuffle Consolidate Files 機制進行優(yōu)化。該機制的手段是減少 Shuffle 過程產(chǎn)生的文件茎截,若使用這個功能苇侵,則需要置 spark.shuffle.consolidateFiles
為 true
,其實現(xiàn)可用下圖來表示(圖摘自:JerryLead/SparkInternals-Shuffle 過程)
即:對于運行在同一個 core 的 Shuffle Map Tasks企锌,對于將要被同一個 reducer read 的數(shù)據(jù)榆浓,第一個 Shuffle Map Task 會創(chuàng)建一個文件,之后的就會將數(shù)據(jù)追加到這個文件而不是新建一個文件(相當于同一個 core 上的 Shuffle Map Task 寫了文件不同的部分)撕攒。因此文件數(shù)就從原來的 M * R
個變成了 cores * R
個陡鹃。當 M / cores
的值越大,減少文件數(shù)的效果越顯著抖坪。需要注意的是萍鲸,該機制雖然在很多時候能緩解上述的幾個問題,但是并不能徹底解決擦俐。
參考
- 《Spark 技術(shù)內(nèi)幕》
- JerryLead/SparkInternals - Shuffle 過程