目前,spark shuffle write有三種方法:hash shuffle靠汁、sort shuffle和tungsten-sort shuffle。從1.2版本開始默認為sort shuffle兢孝, 之前采用hash shuffle局劲。在1.4版本以后可以通過(spark.shuffle.manager = tungsten-sort)開啟tungsten-sort shuffle。
Hash shuffle
Hash shuffle 經(jīng)歷了有兩個階段晚胡,前一階段的過程如圖:
- 每一個Mapper創(chuàng)建出和Reducer數(shù)目相同的bucket灵奖,bucket實際上是一個buffer,其大小為spark.shuffle.file.buffer.kb(默認32KB)估盘。
- Mapper產(chǎn)生的結(jié)果會根據(jù)設(shè)置的partition算法填充到每個bucket中去瓷患,然后再寫入到磁盤文件。
- Reducer從遠端或是本地的block manager中找到相應(yīng)的文件讀取數(shù)據(jù)遣妥。
這一階段的問題:
- 當Mapper數(shù)量和Reducer數(shù)量比較大時擅编,產(chǎn)生輸出大量文件(M * R),這對文件系統(tǒng)是一個非常大的負擔。同時在shuffle數(shù)據(jù)量不大而shuffle文件又非常多的情況下爱态,隨機寫也會嚴重降低IO的性能谭贪。
- 緩存空間占用比較大,一個 worker node 上同時存在的 bucket 個數(shù)可以達到 cores*R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask)锦担。
第二階段改善了第一階段出現(xiàn)的中間shuffle文件數(shù)量多的問題俭识。
與上一階段相比,在同一個 core 上連續(xù)運行的 ShuffleMapTasks 共用一個輸出文件洞渔,這樣產(chǎn)生shuffle文件的數(shù)量是cores*R套媚,比上一階段減少。
Sort shuffle
與hash shuffle相比磁椒,sort shuffle中每個Mapper只產(chǎn)生一個數(shù)據(jù)文件和一個索引文件堤瘤,數(shù)據(jù)文件中的數(shù)據(jù)按照Reducer排序,但屬于同一個Reducer的數(shù)據(jù)不排序浆熔。Mapper產(chǎn)生的數(shù)據(jù)先放到AppendOnlyMap這個數(shù)據(jù)結(jié)構(gòu)中本辐,如果內(nèi)存不夠,數(shù)據(jù)則會spill到磁盤医增,最后合并成一個文件慎皱。
與Hash shuffle相比,shuffle文件數(shù)量減少调窍,內(nèi)存使用更加可控宝冕。但排序會影響速度。In case you use SSD drives for the temporary data of Spark shuffles, hash shuffle might work better for you(來自于https://0x0fff.com/spark-architecture-shuffle/)邓萨。
Tungsten-sort shuffle
與Sort shuffle相比地梨,Tungsten最大不同在于內(nèi)存管理機制。Tungsten采用獨特的內(nèi)存模型來存儲數(shù)據(jù)缔恳,而Sort shuffle采用Java的數(shù)據(jù)結(jié)構(gòu)AppendOnlyMap來存儲數(shù)據(jù)宝剖,并且存儲的數(shù)據(jù)是序列化的。這種獨特的內(nèi)存模型叫做page歉甚。序列化后的數(shù)據(jù)放在page中万细,當page滿后,spill到磁盤文件纸泄,然后從新allocate一個新的page(如果spark.unsafe.offHeap=true赖钞,會從off-heap分配內(nèi)存,否則聘裁,從in-heap分配內(nèi)存)雪营。最后將page里數(shù)據(jù)和spilled磁盤文件merge到一個文件里。注意merge的時候不需要反序列化(sort shuffle需要)衡便。
為了數(shù)據(jù)record在page中尋址献起,定義了PackedRecordPointer對象用一個64bit的long型變量來記錄如下信息:
[24 bit partition number][13 bit memory page number][27 bit offset in page]洋访。
注意這些信息是用來將數(shù)據(jù)按照partition進行排序。從這些信息中谴餐,我們得到如下的約束姻政。
一是partition 的數(shù)量(Reducer的數(shù)目)最多為2^24=16777216。
二是單條記錄不能大于 2^27=128 MB岂嗓,加上page數(shù)目限制汁展,一個task 能管理到的內(nèi)存最多是 2^13 * 128M 也就是1TB左右。
數(shù)據(jù)是序列化后放在內(nèi)存厌殉,所以占據(jù)的內(nèi)存空間小善镰,減少了spill的次數(shù)。sort是在序列化的數(shù)據(jù)上進行年枕,效率更高。merge時不需要反序列數(shù)據(jù)乎完。