對比 Hadoop MapReduce 和 Spark 的 Shuffle 過程
如果熟悉 Hadoop MapReduce 中的 shuffle 過程,可能會按照 MapReduce 的思路去想象 Spark 的 shuffle 過程。然而,它們之間有一些區(qū)別和聯(lián)系适刀。
從 high-level 的角度來看衰粹,兩者并沒有大的差別劫流。都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition捏卓,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)否纬。Reducer 以內存作緩沖區(qū)吕晌,邊 shuffle 邊 aggregate 數(shù)據(jù),等到數(shù)據(jù) aggregate 好以后進行 reduce() (Spark 里可能是后續(xù)的一系列操作)临燃。
從 low-level 的角度來看睛驳,兩者差別不小。Hadoop MapReduce 是 sort-based谬俄,進入 combine() 和 reduce() 的 records 必須先 sort。這樣的好處在于 combine/reduce() 可以處理大規(guī)模的數(shù)據(jù)弃理,因為其輸入數(shù)據(jù)可以通過外排得到(mapper 對每段數(shù)據(jù)先做排序溃论,reducer 的 shuffle 對排好序的每段數(shù)據(jù)做歸并)。目前的 Spark 默認選擇的是 hash-based痘昌,通常使用 HashMap 來對 shuffle 來的數(shù)據(jù)進行 aggregate钥勋,不會對數(shù)據(jù)進行提前排序。如果用戶需要經(jīng)過排序的數(shù)據(jù)辆苔,那么需要自己調用類似 sortByKey() 的操作算灸;如果你是Spark 1.1的用戶,可以將spark.shuffle.manager設置為sort驻啤,則會對數(shù)據(jù)進行排序菲驴。在Spark 1.2中,sort將作為默認的Shuffle實現(xiàn)骑冗。
從實現(xiàn)角度來看赊瞬,兩者也有不少差別。Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map(), spill, merge, shuffle, sort, reduce() 等贼涩。每個階段各司其職巧涧,可以按照過程式的編程思想來逐一實現(xiàn)每個階段的功能。在 Spark 中遥倦,沒有這樣功能明確的階段谤绳,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蘊含在 transformation() 中袒哥。
如果我們將 map 端劃分數(shù)據(jù)缩筛、持久化數(shù)據(jù)的過程稱為 shuffle write,而將 reducer 讀入數(shù)據(jù)堡称、aggregate 數(shù)據(jù)的過程稱為 shuffle read歪脏。那么在 Spark 中,問題就變?yōu)樵趺丛?job 的邏輯或者物理執(zhí)行圖中加入 shuffle write 和 shuffle read 的處理邏輯粮呢?以及兩個處理邏輯應該怎么高效實現(xiàn)婿失?
shuffle-write
上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運行钞艇,CPU core 數(shù)為 2,可以同時運行兩個 task豪硅。每個 task 的執(zhí)行結果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上哩照。每個 task 包含 R 個緩沖區(qū),R = reducer 個數(shù)(也就是下一個 stage 中 task 的個數(shù))懒浮,緩沖區(qū)被稱為 bucket飘弧,其大小為spark.shuffle.file.buffer.kb,默認是 32KB(Spark 1.1 版本以前是 100KB)砚著。
其實 bucket 是一個廣義的概念次伶,代表 ShuffleMapTask 輸出結果經(jīng)過 partition 后要存放的地方,這里為了細化數(shù)據(jù)存放位置和數(shù)據(jù)名稱稽穆,僅僅用 bucket 表示緩沖區(qū)冠王。
ShuffleMapTask 的執(zhí)行過程很簡單:先利用 pipeline 計算得到 finalRDD 中對應 partition 的 records。每得到一個 record 就將其送到對應的 bucket 里舌镶,具體是哪個 bucket 由partitioner.partition(record.getKey()))決定柱彻。每個 bucket 里面的數(shù)據(jù)會不斷被寫到本地磁盤上,形成一個 ShuffleBlockFile餐胀,或者簡稱FileSegment哟楷。之后的 reducer 會去 fetch 屬于自己的 FileSegment,進入 shuffle read 階段否灾。
這樣的實現(xiàn)很簡單卖擅,但有幾個問題:
**產(chǎn)生的 FileSegment 過多。**每個 ShuffleMapTask 產(chǎn)生 R(reducer 個數(shù))個 FileSegment墨技,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個文件磨镶。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數(shù)據(jù)文件健提。
**緩沖區(qū)占用內存空間大琳猫。**每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個 bucket私痹。雖然一個 ShuffleMapTask 結束后脐嫂,對應的緩沖區(qū)可以被回收,但一個 worker node 上同時存在的 bucket 個數(shù)可以達到 cores * R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask)紊遵,占用的內存空間也就達到了cores * R * 32 KB账千。對于 8 核 1000 個 reducer 來說,占用內存就是 256MB暗膜。
目前來看匀奏,第二個問題還沒有好的方法解決,因為寫磁盤終究是要開緩沖區(qū)的学搜,緩沖區(qū)太小會影響 IO 速度娃善。但第一個問題有一些方法去解決论衍,下面介紹已經(jīng)在 Spark 里面實現(xiàn)的 FileConsolidation 方法。
未完待續(xù)-
參考:https://github.com/JerryLead/SparkInternals/blob/master/markdown/4-shuffleDetails.md