Spark?Shuffle
以Shuffle為邊界盾舌,Spark將一個Job劃分為不同的Stage隅熙,這些Stage構成了一個大粒度的DAG稽煤。Spark的Shuffle分為Write和Read兩個階段,分屬于兩個不同的Stage囚戚,前者是Parent Stage的最后一步酵熙,后者是Child Stage的第一步。
MapReduce shuffle機制
在MapReduce框架中驰坊,shuffle是連接Map和Reduce之間的橋梁匾二,Map的輸出要用到Reduce中必須經(jīng)過shuffle這個環(huán)節(jié),shuffle的性能高低直接影響了整個程序的性能和吞吐量。
Shuffle是MapReduce框架中的一個特定的phase察藐,介于Map phase和Reduce phase之間借嗽,當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希转培,并且分發(fā)到每一個Reducer上去恶导,這個過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡的傳輸浸须,因此shuffle性能的高低直接影響到了整個程序的運行效率惨寿。
下圖描述了MapReduce算法的整個流程,其中shuffle phase是介于Map phase和Reduce phase之間:
在Hadoop, 在mapper端每次當memory buffer中的數(shù)據(jù)快滿的時候, 先將memory中的數(shù)據(jù), 按partition進行劃分, 然后各自存成小文件, 這樣當buffer不斷的spill的時候, 就會產(chǎn)生大量的小文件。
所以Hadoop后面直到reduce之前做的所有的事情其實就是不斷的merge, 基于文件的多路并歸排序,在map端的將相同partition的merge到一起, 在reduce端, 把從mapper端copy來的數(shù)據(jù)文件進行merge, 以用于最終的reduce多路歸并排序, 達到兩個目的。
merge, 把相同key的value都放到一個arraylist里面魔招;sort, 最終的結果是按key排序的。
正式的Spark Shuffle原理
在Spark2.0前蕉拢,還是使用的 Hash Shuffle, 而目前大多使用Sort Shuffle
這里的每一個 mapTask 只會產(chǎn)生一個文件(解決了 ReduceTask 導致文件過多的問題,文件數(shù)只跟 maptask 相關)這些數(shù)據(jù)是有序的诚亚,同時為這個文件建立一個索引晕换,他會指定相同 key 的數(shù)據(jù)的索引位置,最終每個 Reducetask 會從每個文件中讀取所需要的那一片數(shù)據(jù)(即它所要聚集的 key 的數(shù)據(jù))站宗。
為什么顆粒度是一個mapTask呢闸准?因為之前文章有提到,Stage是按task從前往后梢灭,按Task進行劃分夷家,所Shuffle Read根據(jù)task來讀取進行下一步操作。
shuffle相關參數(shù)調優(yōu)
以下是Shffule過程中的一些主要參數(shù)敏释,這里詳細講解了各個參數(shù)的功能库快、默認值以及基于實踐經(jīng)驗給出的調優(yōu)建議。
spark.shuffle.file.buffer
默認值:32k
參數(shù)說明:該參數(shù)用于設置shuffle write task的BufferedOutputStream的buffer緩沖大小钥顽。將數(shù)據(jù)寫到磁盤文件之前义屏,會先寫入buffer緩沖中,待緩沖寫滿之后耳鸯,才會溢寫到磁盤湿蛔。
調優(yōu)建議:如果作業(yè)可用的內存資源較為充足的話,可以適當增加這個參數(shù)的大邢嘏馈(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數(shù)添谊,也就可以減少磁盤IO次數(shù)财喳,進而提升性能。在實踐中發(fā)現(xiàn),合理調節(jié)該參數(shù)耳高,性能會有1%~5%的提升扎瓶。
spark.reducer.maxSizeInFlight
默認值:48m
參數(shù)說明:該參數(shù)用于設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)泌枪。
調優(yōu)建議:如果作業(yè)可用的內存資源較為充足的話概荷,可以適當增加這個參數(shù)的大小(比如96m)碌燕,從而減少拉取數(shù)據(jù)的次數(shù)误证,也就可以減少網(wǎng)絡傳輸?shù)拇螖?shù),進而提升性能修壕。在實踐中發(fā)現(xiàn)愈捅,合理調節(jié)該參數(shù),性能會有1%~5%的提升慈鸠。
spark.shuffle.io.maxRetries
默認值:3
參數(shù)說明:shuffle read task從shuffle write task所在節(jié)點拉取屬于自己的數(shù)據(jù)時蓝谨,如果因為網(wǎng)絡異常導致拉取失敗,是會自動進行重試的青团。該參數(shù)就代表了可以重試的最大次數(shù)譬巫。如果在指定次數(shù)之內拉取還是沒有成功,就可能會導致作業(yè)執(zhí)行失敗督笆。
調優(yōu)建議:對于那些包含了特別耗時的shuffle操作的作業(yè)缕题,建議增加重試最大次數(shù)(比如60次),以避免由于JVM的full gc或者網(wǎng)絡不穩(wěn)定等因素導致的數(shù)據(jù)拉取失敗胖腾。在實踐中發(fā)現(xiàn)烟零,對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程,調節(jié)該參數(shù)可以大幅度提升穩(wěn)定性咸作。
spark.shuffle.io.retryWait
默認值:5s
參數(shù)說明:具體解釋同上锨阿,該參數(shù)代表了每次重試拉取數(shù)據(jù)的等待間隔,默認是5s记罚。
調優(yōu)建議:建議加大間隔時長(比如60s)墅诡,以增加shuffle操作的穩(wěn)定性。
spark.shuffle.memoryFraction
默認值:0.2
參數(shù)說明:該參數(shù)代表了Executor內存中桐智,分配給shuffle read task進行聚合操作的內存比例末早,默認是20%。
調優(yōu)建議:在資源參數(shù)調優(yōu)中講解過這個參數(shù)说庭。如果內存充足然磷,而且很少使用持久化操作,建議調高這個比例刊驴,給shuffle read的聚合操作更多內存姿搜,以避免由于內存不足導致聚合過程中頻繁讀寫磁盤寡润。在實踐中發(fā)現(xiàn),合理調節(jié)該參數(shù)可以將性能提升10%左右舅柜。
spark.shuffle.manager
默認值:sort
參數(shù)說明:該參數(shù)用于設置ShuffleManager的類型梭纹。Spark 1.5以后,有三個可選項:hash致份、sort和tungsten-sort变抽。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了氮块。tungsten-sort與sort類似绍载,但是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高雇锡。
調優(yōu)建議:由于SortShuffleManager默認會對數(shù)據(jù)進行排序逛钻,因此如果你的業(yè)務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以锰提;而如果你的業(yè)務邏輯不需要對數(shù)據(jù)進行排序曙痘,那么建議參考后面的幾個參數(shù)調優(yōu),通過bypass機制或優(yōu)化的HashShuffleManager來避免排序操作立肘,同時提供較好的磁盤讀寫性能边坤。這里要注意的是,tungsten-sort要慎用谅年,因為之前發(fā)現(xiàn)了一些相應的bug茧痒。
spark.shuffle.sort.bypassMergeThreshold
默認值:200
參數(shù)說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數(shù)量小于這個閾值(默認是200)融蹂,則shuffle write過程中不會進行排序操作旺订,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會將每個task產(chǎn)生的所有臨時磁盤文件都合并成一個文件超燃,并會創(chuàng)建單獨的索引文件区拳。
調優(yōu)建議:當你使用SortShuffleManager時,如果的確不需要排序操作意乓,那么建議將這個參數(shù)調大一些樱调,大于shuffle read task的數(shù)量。那么此時就會自動啟用bypass機制届良,map-side就不會進行排序了笆凌,減少了排序的性能開銷。但是這種方式下士葫,依然會產(chǎn)生大量的磁盤文件乞而,因此shuffle write性能有待提高。
spark.shuffle.consolidateFiles
默認值:false
參數(shù)說明:如果使用HashShuffleManager为障,該參數(shù)有效晦闰。如果設置為true放祟,那么就會開啟consolidate機制鳍怨,會大幅度合并shuffle write的輸出文件呻右,對于shuffle read task數(shù)量特別多的情況下,這種方法可以極大地減少磁盤IO開銷鞋喇,提升性能声滥。
調優(yōu)建議:如果的確不需要SortShuffleManager的排序機制,那么除了使用bypass機制侦香,還可以嘗試將spark.shffle.manager參數(shù)手動指定為hash落塑,使用HashShuffleManager,同時開啟consolidate機制罐韩。在實踐中嘗試過憾赁,發(fā)現(xiàn)其性能比開啟了bypass機制的SortShuffleManager要高出10%~30%。