更多大數(shù)據(jù)技術(shù)干貨,歡迎關(guān)注“大數(shù)據(jù)技術(shù)進階”微信公眾號胡陪。
Shuffle簡介
Shuffle的本意是洗牌沥寥、混洗的意思,把一組有規(guī)則的數(shù)據(jù)盡量打亂成無規(guī)則的數(shù)據(jù)柠座。而在MapReduce中邑雅,Shuffle更像是洗牌的逆過程,指的是將map端的無規(guī)則輸出按指定的規(guī)則“打亂”成具有一定規(guī)則的數(shù)據(jù)妈经,以便reduce端接收處理淮野。其在MapReduce中所處的工作階段是map輸出后到reduce接收前,具體可以分為map端和reduce端前后兩個部分吹泡。
在Shuffle之前骤星,也就是在map階段,MapReduce會對要處理的數(shù)據(jù)進行分片(split)操作爆哑,為每一個分片分配一個MapTask任務(wù)洞难。接下來map會對每一個分片中的每一行數(shù)據(jù)進行處理得到鍵值對(key,value)此時得到的鍵值對又叫做“中間結(jié)果”。此后便進入reduce階段揭朝,由此可以看出Shuffle階段的作用是處理“中間結(jié)果”队贱。
由于Shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此Shuffle性能的高低直接影響到了整個程序的運行效率潭袱。
MapReduce Shuffle
Hadoop的核心思想是MapReduce柱嫌,但Shuffle又是MapReduce的核心。Shuffle的主要工作是從Map結(jié)束到Reduce開始之間的過程屯换。Shuffle階段又可以分為Map端的Shuffle和Reduce端的Shuffle编丘。
Map端的Shuffle
下圖是MapReduce Shuffle的官方流程:
因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結(jié)果”不會立馬寫入磁盤彤悔,而是優(yōu)先存儲到map節(jié)點的“環(huán)形內(nèi)存緩沖區(qū)”嘉抓,在寫入的過程中進行分區(qū)(partition),也就是對于每個鍵值對來說蜗巧,都增加了一個partition屬性值千绪,然后連同鍵值對一起序列化成字節(jié)數(shù)組寫入到緩沖區(qū)(緩沖區(qū)采用的就是字節(jié)數(shù)組怔软,默認大小為100M)悔橄。
當(dāng)寫入的數(shù)據(jù)量達到預(yù)先設(shè)置的闕值后便會啟動溢寫出線程將緩沖區(qū)中的那部分數(shù)據(jù)溢出寫(spill)到磁盤的臨時文件中,并在寫入前根據(jù)key進行排序(sort)和合并(combine黄虱,可選操作)。
溢出寫過程按輪詢方式將緩沖區(qū)中的內(nèi)容寫到mapreduce.cluster.local.dir屬性指定的本地目錄中无拗。當(dāng)整個map任務(wù)完成溢出寫后梁只,會對磁盤中這個map任務(wù)產(chǎn)生的所有臨時文件(spill文件)進行歸并(merge)操作生成最終的正式輸出文件,此時的歸并是將所有spill文件中的相同partition合并到一起说敏,并對各個partition中的數(shù)據(jù)再進行一次排序(sort)鸥跟,生成key和對應(yīng)的value-list,文件歸并時盔沫,如果溢寫文件數(shù)量超過參數(shù)min.num.spills.for.combine的值(默認為3)時医咨,可以再次進行合并。
至此map端的工作已經(jīng)全部結(jié)束架诞,最終生成的文件也會存儲在TaskTracker能夠訪問的位置拟淮。每個reduce task不間斷的通過RPC從JobTracker那里獲取map task是否完成的信息,如果得到的信息是map task已經(jīng)完成谴忧,那么Shuffle的后半段開始啟動很泊。
Reduce端的shuffle
當(dāng)mapreduce任務(wù)提交后,reduce task就不斷通過RPC從JobTracker那里獲取map task是否完成的信息沾谓,如果獲知某臺TaskTracker上的map task執(zhí)行完成委造,Shuffle的后半段過程就開始啟動。Reduce端的shuffle主要包括三個階段均驶,copy昏兆、merge和reduce。
每個reduce task負責(zé)處理一個分區(qū)的文件妇穴,以下是reduce task的處理流程:
reduce task從每個map task的結(jié)果文件中拉取對應(yīng)分區(qū)的數(shù)據(jù)亮垫。因為數(shù)據(jù)在map階段已經(jīng)是分好區(qū)了,并且會有一個額外的索引文件記錄每個分區(qū)的起始偏移量伟骨。所以reducetask取數(shù)的時候直接根據(jù)偏移量去拉取數(shù)據(jù)就ok饮潦。
reduce task從每個map task拉取分區(qū)數(shù)據(jù)的時候會進行再次合并,排序携狭,按照自定義的reducer的邏輯代碼去處理继蜡。
最后就是Reduce過程了,在這個過程中產(chǎn)生了最終的輸出結(jié)果逛腿,并將其寫到HDFS上稀并。
為什么要排序
key存在combine操作,排序之后相同的key放到一塊顯然方便做合并操作单默。
reduce task是按key去處理數(shù)據(jù)的碘举。 如果沒有排序那必須從所有數(shù)據(jù)中把當(dāng)前相同key的所有value數(shù)據(jù)拿出來,然后進行reduce邏輯處理搁廓。顯然每個key到這個邏輯都需要做一次全量數(shù)據(jù)掃描引颈,影響性能耕皮,有了排序很方便的得到一個key對于的value集合。
reduce task按key去處理數(shù)據(jù)時蝙场,如果key按順序排序凌停,那么reduce task就按key順序去讀取,顯然當(dāng)讀到的key是文件末尾的key那么就標志數(shù)據(jù)處理完畢售滤。如果沒有排序那還得有其他邏輯來記錄哪些key處理完了罚拟,哪些key沒有處理完。
雖有千萬種理由需要這么做完箩,但是很耗資源赐俗,并且像排序其實我們有些業(yè)務(wù)并不需要排序。
為什么要文件合并
因為內(nèi)存放不下就會溢寫文件弊知,就會發(fā)生多次溢寫阻逮,形成很多小文件,如果不合并吉捶,顯然會小文件泛濫夺鲜,集群需要資源開銷去管理這些小文件數(shù)據(jù)。
任務(wù)去讀取文件的數(shù)增多呐舔,打開的文件句柄數(shù)也會增多币励。
mapreduce是全局有序。單個文件有序珊拼,不代表全局有序食呻,只有把小文件合并一起排序才會全局有序。
Spark的Shuffle
Spark的Shuffle是在MapReduce Shuffle基礎(chǔ)上進行的調(diào)優(yōu)澎现。其實就是對排序仅胞、合并邏輯做了一些優(yōu)化。在Spark中Shuffle write相當(dāng)于MapReduce 的map剑辫,Shuffle read相當(dāng)于MapReduce 的reduce干旧。
Spark豐富了任務(wù)類型,有些任務(wù)之間數(shù)據(jù)流轉(zhuǎn)不需要通過Shuffle妹蔽,但是有些任務(wù)之間還是需要通過Shuffle來傳遞數(shù)據(jù)椎眯,比如寬依賴的group by key以及各種by key算子。寬依賴之間會劃分stage胳岂,而Stage之間就是Shuffle编整,如下圖中的stage0,stage1和stage3之間就會產(chǎn)生Shuffle乳丰。
在Spark的中掌测,負責(zé)shuffle過程的執(zhí)行、計算和處理的組件主要就是ShuffleManager产园,也即shuffle管理器汞斧。ShuffleManager隨著Spark的發(fā)展有兩種實現(xiàn)的方式夜郁,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種断箫。
Spark Shuffle發(fā)展史
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle拂酣,但默認仍為Hash Based Shuffle
Spark 1.2 默認的Shuffle方式改為Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出歷史舞臺
在Spark的版本的發(fā)展秋冰,ShuffleManager在不斷迭代仲义,變得越來越先進。
在Spark 1.2以前剑勾,默認的shuffle計算引擎是HashShuffleManager埃撵。該ShuffleManager而HashShuffleManager有著一個非常嚴重的弊端,就是會產(chǎn)生大量的中間磁盤文件虽另,進而由大量的磁盤IO操作影響了性能暂刘。因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager捂刺。
SortShuffleManager相較于HashShuffleManager來說谣拣,有了一定的改進。主要就在于族展,每個Task在進行shuffle操作時森缠,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件仪缸,因此每個Task就只有一個磁盤文件贵涵。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分數(shù)據(jù)即可恰画。
Hash Shuffle
HashShuffleManager的運行機制主要分成兩種宾茂,一種是普通運行機制,另一種是合并的運行機制拴还。合并機制主要是通過復(fù)用buffer來優(yōu)化Shuffle過程中產(chǎn)生的小文件的數(shù)量跨晴。Hashshuffle是不具有排序的Shuffle。
普通機制的Hash Shuffle
最開始使用的Hash Based Shuffle片林,每個Mapper會根據(jù)Reducer的數(shù)量創(chuàng)建對應(yīng)的bucket端盆,bucket的數(shù)量是M * R,M是map的數(shù)量拇厢,R是Reduce的數(shù)量爱谁。
如下圖所示:2個core 4個map task 3 個reduce task,會產(chǎn)生4*3=12個小文件孝偎。
優(yōu)化后的Hash Shuffle
普通機制Hash Shuffle會產(chǎn)生大量的小文件(M * R)访敌,對文件系統(tǒng)的壓力也很大,也不利于IO的吞吐量衣盾,后來做了優(yōu)化(設(shè)置spark.shuffle.consolidateFiles=true開啟寺旺,默認false)爷抓,把在同一個core上的多個Mapper輸出到同一個文件,這樣文件數(shù)就變成core * R個了阻塑。
如下圖所示:2個core 4個map task 3 個reduce task蓝撇,會產(chǎn)生2*3=6個小文件。
Hash shuffle合并機制的問題:
如果 Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過多的話則 Core * Reducer Task 依舊過大陈莽,也會產(chǎn)生很多小文件渤昌。進而引出了更優(yōu)化的sort shuffle。
在Spark 1.2以后的版本中走搁,默認的ShuffleManager改成了SortShuffleManager独柑。
Sort Shuffle
SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制私植,另一種是bypass運行機制忌栅。當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(默認為200),就會啟用bypass機制曲稼。
普通機制的Sort Shuffle
這種機制和mapreduce差不多索绪,在該模式下,數(shù)據(jù)會先寫入一個內(nèi)存數(shù)據(jù)結(jié)構(gòu)中贫悄,此時根據(jù)不同的shuffle算子瑞驱,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是reduceByKey這種聚合類的shuffle算子清女,那么會選用Map數(shù)據(jù)結(jié)構(gòu)钱烟,一邊通過Map進行聚合,一邊寫入內(nèi)存嫡丙;如果是join這種普通的shuffle算子拴袭,那么會選用Array數(shù)據(jù)結(jié)構(gòu),直接寫入內(nèi)存曙博。接著拥刻,每寫一條數(shù)據(jù)進入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會判斷一下父泳,是否達到了某個臨界閾值般哼。如果達到臨界閾值的話,那么就會嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤惠窄,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)蒸眠。
在溢寫到磁盤文件之前,會先根據(jù)key對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進行排序杆融。排序過后楞卡,會分批將數(shù)據(jù)寫入磁盤文件。默認的batch數(shù)量是10000條,也就是說蒋腮,排序好的數(shù)據(jù)淘捡,會以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件。
一個task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中池摧,會發(fā)生多次磁盤溢寫操作焦除,也會產(chǎn)生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并作彤,由于一個task就只對應(yīng)一個磁盤文件因此還會單獨寫一份索引文件膘魄,其中標識了下游各個task的數(shù)據(jù)在文件中的start offset與endoffset。
SortShuffleManager由于有一個磁盤文件merge的過程宦棺,因此大大減少了文件數(shù)量瓣距,由于每個task最終只有一個磁盤文件所以文件個數(shù)等于上游shuffle write個數(shù)黔帕。
bypass機制的Sort Shuffle
bypass運行機制的觸發(fā)條件如下:
1)shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值代咸,默認值200。
2)不是聚合類的shuffle算子(比如reduceByKey)成黄。
此時task會為每個reduce端的task都創(chuàng)建一個臨時磁盤文件呐芥,并將數(shù)據(jù)按key進行hash然后根據(jù)key的hash值,將key寫入對應(yīng)的磁盤文件之中奋岁。當(dāng)然思瘟,寫入磁盤文件時也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的闻伶。最后滨攻,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創(chuàng)建一個單獨的索引文件蓝翰。
該過程的磁盤寫機制其實跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的光绕,因為都要創(chuàng)建數(shù)量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已畜份。因此少量的最終磁盤文件诞帐,也讓該機制相對未經(jīng)優(yōu)化的HashShuffleManager來說,shuffle read的性能會更好爆雹。
而該機制與普通SortShuffleManager運行機制的不同在于:
第一停蕉,磁盤寫機制不同;
第二,不會進行排序钙态。也就是說慧起,啟用該機制的最大好處在于,shuffle write過程中册倒,不需要進行數(shù)據(jù)的排序操作蚓挤,也就節(jié)省掉了這部分的性能開銷。
Spark Shuffle總結(jié)
Shuffle 過程本質(zhì)上都是將 Map 端獲得的數(shù)據(jù)使用分區(qū)器進行劃分,并將數(shù)據(jù)發(fā)送給對應(yīng)的Reducer 的過程屈尼。
Shuffle作為處理連接map端和reduce端的樞紐册着,其shuffle的性能高低直接影響了整個程序的性能和吞吐量。map端的shuffle一般為shuffle的Write階段脾歧,reduce端的shuffle一般為shuffle的read階段甲捏。Hadoop和spark的shuffle在實現(xiàn)上面存在很大的不同,spark的shuffle分為兩種實現(xiàn)鞭执,分別為HashShuffle和SortShuffle司顿。
HashShuffle又分為普通機制和合并機制,普通機制因為其會產(chǎn)生M * R個數(shù)的巨量磁盤小文件而產(chǎn)生大量性能低下的Io操作兄纺,從而性能較低大溜,因為其巨量的磁盤小文件還可能導(dǎo)致OOM,HashShuffle的合并機制通過重復(fù)利用buffer從而將磁盤小文件的數(shù)量降低到Core * R個估脆,但是當(dāng)Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過多的時候钦奋,依然會產(chǎn)生大量的磁盤小文件。
SortShuffle也分為普通機制和bypass機制疙赠,普通機制在內(nèi)存數(shù)據(jù)結(jié)構(gòu)(默認為5M)完成排序付材,會產(chǎn)生2M個磁盤小文件。而當(dāng)shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值圃阳⊙嵯危或者算子不是聚合類的shuffle算子(比如reduceByKey)的時候會觸發(fā)SortShuffle的bypass機制,SortShuffle的bypass機制不會進行排序捍岳,極大的提高了其性能富寿。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager锣夹,因為HashShuffleManager會產(chǎn)生大量的磁盤小文件而性能低下页徐,在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager晕城。
SortShuffleManager相較于HashShuffleManager來說泞坦,有了一定的改進。主要就在于砖顷,每個Task在進行shuffle操作時贰锁,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件滤蝠,因此每個Task就只有一個磁盤文件豌熄。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分數(shù)據(jù)即可物咳。
Spark與MapReduce Shuffle的異同
從整體功能上看锣险,兩者并沒有大的差別。 都是將 mapper(Spark 里是ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask芯肤,也可能是ResultTask)巷折。Reducer 以內(nèi)存作緩沖區(qū),邊 shuffle 邊 aggregate 數(shù)據(jù)崖咨,等到數(shù)據(jù)aggregate 好以后進行 reduce(Spark 里可能是后續(xù)的一系列操作)锻拘。
從流程的上看,兩者差別不小击蹲。 Hadoop MapReduce 是 sort-based署拟,進入 combine和 reduce的 records 必須先 sort。這樣的好處在于 combine/reduce可以處理大規(guī)模的數(shù)據(jù)歌豺,因為其輸入數(shù)據(jù)可以通過外排得到(mapper 對每段數(shù)據(jù)先做排序推穷,reducer 的shuffle 對排好序的每段數(shù)據(jù)做歸并)。以前 Spark 默認選擇的是 hash-based类咧,通常使用 HashMap 來對 shuffle 來的數(shù)據(jù)進行合并馒铃,不會對數(shù)據(jù)進行提前排序。如果用戶需要經(jīng)過排序的數(shù)據(jù)轮听,那么需要自己調(diào)用類似 sortByKey的操作骗露。在Spark 1.2之后,sort-based變?yōu)槟J的Shuffle實現(xiàn)血巍。
從流程實現(xiàn)角度來看,兩者也有不少差別珊随。 Hadoop MapReduce 將處理流程劃分出明顯的幾個階段:map, spill, merge, shuffle, sort, reduce等述寡。每個階段各司其職,可以按照過程式的編程思想來逐一實現(xiàn)每個階段的功能叶洞。在 Spark 中鲫凶,沒有這樣功能明確的階段,只有不同的 stage 和一系列的 transformation衩辟,所以 spill, merge, aggregate等操作需要蘊含在 transformation中螟炫。