3.6 Shuffle機(jī)制
在MapReduce框架中,Shuffle是連接Map和Reduce之間的橋梁柠座,Map的輸出要用到Reduce中必須經(jīng)過(guò)Shuffle這個(gè)環(huán)節(jié),Shuffle的性能高低直接影響了整個(gè)程序的性能和吞吐量片橡。Spark作為MapReduce框架的一種實(shí)現(xiàn)妈经,自然也實(shí)現(xiàn)了Shuffle的邏輯。對(duì)于大數(shù)據(jù)計(jì)算框架而言捧书,Shuffle階段的效率是決定性能好壞的關(guān)鍵因素之一吹泡。
3.6.1 什么是Shuffle
Shuffle是MapReduce框架中的一個(gè)特定的階段,介于Map階段和Reduce階段之間经瓷,當(dāng)Map的輸出結(jié)果要被Reduce使用時(shí)爆哑,輸出結(jié)果需要按關(guān)鍵字值(key)哈希,并且分發(fā)到每一個(gè)Reducer上舆吮,這個(gè)過(guò)程就是Shuffle揭朝。直觀(guān)來(lái)講,Spark Shuffle機(jī)制是將一組無(wú)規(guī)則的數(shù)據(jù)轉(zhuǎn)換為一組具有一定規(guī)則數(shù)據(jù)的過(guò)程色冀。由于Shuffle涉及了磁盤(pán)的讀寫(xiě)和網(wǎng)絡(luò)的傳輸潭袱,因此Shuffle性能的高低直接影響整個(gè)程序的運(yùn)行效率。
在MapReduce計(jì)算框架中锋恬,Shuffle連接了Map階段和Reduce階段屯换,即每個(gè)Reduce Task從每個(gè)Map Task產(chǎn)生的數(shù)據(jù)中讀取一片數(shù)據(jù),極限情況下可能觸發(fā)M*R個(gè)數(shù)據(jù)拷貝通道(M是Map Task數(shù)目,R是Reduce Task數(shù)目)彤悔。通常Shuffle分為兩部分:Map階段的數(shù)據(jù)準(zhǔn)備和Reduce階段的數(shù)據(jù)拷貝嘉抓。首先,Map階段需根據(jù)Reduce階段的Task數(shù)量決定每個(gè)Map Task輸出的數(shù)據(jù)分片數(shù)目晕窑,有多種方式存放這些數(shù)據(jù)分片:
1)保存在內(nèi)存中或者磁盤(pán)上(Spark和MapReduce都存放在磁盤(pán)上)抑片。
2)每個(gè)分片對(duì)應(yīng)一個(gè)文件(現(xiàn)在Spark采用的方式,以及以前MapReduce采用的方式)杨赤,或者所有分片放到一個(gè)數(shù)據(jù)文件中敞斋,外加一個(gè)索引文件記錄每個(gè)分片在數(shù)據(jù)文件中的偏移量(現(xiàn)在MapReduce采用的方式)。
因此可以認(rèn)為Spark Shuffle與Mapreduce Shuffle的設(shè)計(jì)思想相同望拖,但在實(shí)現(xiàn)細(xì)節(jié)和優(yōu)化方式上不同。
在Spark中挫鸽,任務(wù)通常分為兩種说敏,Shuffle mapTask和reduceTask,具體邏輯如圖3-11所示:
[插圖]
圖3-11 Spark Shuffl e
圖3-11中的主要邏輯如下:
1)首先每一個(gè)MapTask會(huì)根據(jù)ReduceTask的數(shù)量創(chuàng)建出相應(yīng)的bucket, bucket的數(shù)量是M×R丢郊,其中M是Map的個(gè)數(shù)盔沫,R是Reduce的個(gè)數(shù)。
2)其次MapTask產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partition算法填充到每個(gè)bucket中枫匾。這里的partition算法是可以自定義的架诞,當(dāng)然默認(rèn)的算法是根據(jù)key哈希到不同的bucket中。
當(dāng)ReduceTask啟動(dòng)時(shí)干茉,它會(huì)根據(jù)自己task的id和所依賴(lài)的Mapper的id從遠(yuǎn)端或本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進(jìn)行處理谴忧。
這里的bucket是一個(gè)抽象概念,在實(shí)現(xiàn)中每個(gè)bucket可以對(duì)應(yīng)一個(gè)文件角虫,可以對(duì)應(yīng)文件的一部分或是其他等沾谓。Spark shuffle可以分為兩部分:
1)將數(shù)據(jù)分成bucket,并將其寫(xiě)入磁盤(pán)的過(guò)程稱(chēng)為Shuffle Write戳鹅。
2)在存儲(chǔ)Shuffle數(shù)據(jù)的節(jié)點(diǎn)Fetch數(shù)據(jù)均驶,并執(zhí)行用戶(hù)定義的聚集操作,這個(gè)過(guò)程稱(chēng)為Shuffle Fetch枫虏。
3.6.2 Shuffle歷史及細(xì)節(jié)
下面介紹Shuffle Write與Fetch妇穴。
1. Shuffle Write
在Spark的早期版本實(shí)現(xiàn)中,Spark在每一個(gè)MapTask中為每個(gè)ReduceTask創(chuàng)建一個(gè)bucket隶债,并將RDD計(jì)算結(jié)果放進(jìn)bucket中腾它。
但早期的Shuffle Write有兩個(gè)比較大的問(wèn)題。
1)Map的輸出必須先全部存儲(chǔ)到內(nèi)存中死讹,然后寫(xiě)入磁盤(pán)携狭。這對(duì)內(nèi)存是非常大的開(kāi)銷(xiāo),當(dāng)內(nèi)存不足以存儲(chǔ)所有的Map輸出時(shí)就會(huì)出現(xiàn)OOM(Out of Memory)回俐。
2)每個(gè)MapTask會(huì)產(chǎn)生與ReduceTask數(shù)量一致的Shuffle文件逛腿,如果MapTask個(gè)數(shù)是1k, ReduceTask個(gè)數(shù)也是1k稀并,就會(huì)產(chǎn)生1M個(gè)Shuffle文件。這對(duì)于文件系統(tǒng)是比較大的壓力单默,同時(shí)在Shuffle數(shù)據(jù)量不大而Shuffle文件又非常多的情況下碘举,隨機(jī)寫(xiě)也會(huì)嚴(yán)重降低IO的性能。
后來(lái)到了Spark 0.8版實(shí)現(xiàn)時(shí)搁廓,顯著減少了Shuffle的內(nèi)存壓力引颈,現(xiàn)在Map輸出不需要先全部存儲(chǔ)在內(nèi)存中,再flush到硬盤(pán)境蜕,而是record-by-record寫(xiě)入磁盤(pán)中蝙场。對(duì)于Shuffle文件的管理也獨(dú)立出新的ShuffleBlockManager進(jìn)行管理,而不是與RDD cache文件在一起了粱年。
但是Spark 0.8版的Shuffle Write仍然有兩個(gè)大的問(wèn)題沒(méi)有解決售滤。
1)Shuffle文件過(guò)多的問(wèn)題。這會(huì)導(dǎo)致文件系統(tǒng)的壓力過(guò)大并降低IO的吞吐量台诗。
2)雖然Map輸出數(shù)據(jù)不再需要預(yù)先存儲(chǔ)在內(nèi)存中然后寫(xiě)入磁盤(pán)完箩,從而顯著減少了內(nèi)存壓力。但是新引入的DiskObjectWriter所帶來(lái)的buffer開(kāi)銷(xiāo)也是不容小視的內(nèi)存開(kāi)銷(xiāo)拉队。假定有1k個(gè)MapTask和1k個(gè)ReduceTask弊知,就會(huì)有1M個(gè)bucket,相應(yīng)地就會(huì)有1M個(gè)write handler粱快,而每一個(gè)write handler默認(rèn)需要100KB內(nèi)存秩彤,那么總共需要100GB內(nèi)存。這樣僅僅是buffer就需要這么多的內(nèi)存事哭。因此當(dāng)ReduceTask數(shù)量很多時(shí)呐舔,內(nèi)存開(kāi)銷(xiāo)會(huì)很大。
為了解決shuffle文件過(guò)多的情況慷蠕,Spark后來(lái)引入了新的Shuffle consolidation珊拼,以期顯著減少Shuffle文件的數(shù)量。
Shuffle consolidation的原理如圖3-12所示:
[插圖]
圖3-12 Shuffl e consolidation
在圖3-12中流炕,假定該job有4個(gè)Mapper和4個(gè)Reducer澎现,有2個(gè)core能并行運(yùn)行兩個(gè)task∶勘伲可以算出Spark的Shuffle Write共需要16個(gè)bucket剑辫,也就有了16個(gè)write handler。在之前的Spark版本中渠欺,每個(gè)bucket對(duì)應(yīng)一個(gè)文件妹蔽,因此在這里會(huì)產(chǎn)生16個(gè)shuffle文件。
而在Shuffle consolidation中,每個(gè)bucket并非對(duì)應(yīng)一個(gè)文件胳岂,而是對(duì)應(yīng)文件中的一個(gè)segment编整。同時(shí)Shuffle consolidation產(chǎn)生的Shuffle文件數(shù)量與Spark core的個(gè)數(shù)也有關(guān)系。在圖3-12中乳丰,job中的4個(gè)Mapper分為兩批運(yùn)行掌测,在第一批2個(gè)Mapper運(yùn)行時(shí)會(huì)申請(qǐng)8個(gè)bucket,產(chǎn)生8個(gè)Shuffle文件产园;而在第二批Mapper運(yùn)行時(shí)汞斧,申請(qǐng)的8個(gè)bucket并不會(huì)再產(chǎn)生8個(gè)新的文件,而是追加寫(xiě)到之前的8個(gè)文件后面什燕,這樣一共就只有8個(gè)Shuffle文件粘勒,而在文件內(nèi)部共有16個(gè)不同的segment。因此從理論上講Shuffle consolidation產(chǎn)生的Shuffle文件數(shù)量為C×R屎即,其中C是Spark集群的core number, R是Reducer的個(gè)數(shù)庙睡。
很顯然,當(dāng)M=C時(shí)剑勾,Shuffle consolidation產(chǎn)生的文件數(shù)和之前的實(shí)現(xiàn)相同埃撵。
Shuffle consolidation顯著減少了Shuffle文件的數(shù)量赵颅,解決了Spark之前實(shí)現(xiàn)中一個(gè)比較嚴(yán)重的問(wèn)題虽另。但是Writer handler的buffer開(kāi)銷(xiāo)過(guò)大依然沒(méi)有減少,若要減少Writer handler的buffer開(kāi)銷(xiāo)饺谬,只能減少Reducer的數(shù)量捂刺,但是這又會(huì)引入新的問(wèn)題。
2. Shuffle Fetch與Aggregator
Shuffle Write寫(xiě)出去的數(shù)據(jù)要被Reducer使用募寨,就需要Shuffle Fetch將所需的數(shù)據(jù)Fetch過(guò)來(lái)族展。這里的Fetch操作包括本地和遠(yuǎn)端,因?yàn)镾huffle數(shù)據(jù)有可能一部分是存儲(chǔ)在本地的拔鹰。在早期版本中仪缸,Spark對(duì)Shuffle Fetcher實(shí)現(xiàn)了兩套不同的框架:NIO通過(guò)socket連接Fetch數(shù)據(jù);OIO通過(guò)netty server去fetch數(shù)據(jù)列肢。分別對(duì)應(yīng)的類(lèi)是Basic-BlockFetcherIterator和NettyBlockFetcherIterator恰画。
目前在Spark1.5.0中做了優(yōu)化。新版本定義了類(lèi)ShuffleBlockFetcherIterator來(lái)完成數(shù)據(jù)的fetch瓷马。對(duì)于local的數(shù)據(jù)拴还,ShuffleBlockFetcherIterator會(huì)通過(guò)local的BlockMan-ager來(lái)fetch。對(duì)于遠(yuǎn)端的數(shù)據(jù)塊欧聘,它通過(guò)BlockTransferService類(lèi)來(lái)完成片林。具體實(shí)現(xiàn)參見(jiàn)如下代碼:
? ? ? ? ? ? ? [ShuffleBlockFetcherIterator.scala]
/* fetch local數(shù)據(jù)塊 */
private[this] def fetchLocalBlocks() {
在MapReduce的Shuffle過(guò)程中,Shuffle fetch過(guò)來(lái)的數(shù)據(jù)會(huì)進(jìn)行歸并排序(merge sort),使得相同key下的不同value按序歸并到一起供Reducer使用费封,這個(gè)過(guò)程如圖3-13所示:
[插圖]
圖3-13 Fetch merge
這些歸并排序都是在磁盤(pán)上進(jìn)行的焕妙,這樣做雖然有效地控制了內(nèi)存使用,但磁盤(pán)IO卻大幅增加了孝偎。雖然Spark屬于MapReduce體系访敌,但是對(duì)傳統(tǒng)的MapReduce算法進(jìn)行了一定的改變。Spark假定在大多數(shù)應(yīng)用場(chǎng)景下衣盾,Shuffle數(shù)據(jù)的排序不是必須的寺旺,如word count。強(qiáng)制進(jìn)行排序只會(huì)使性能變差势决,因此Spark并不在Reducer端做歸并排序阻塑。既然沒(méi)有歸并排序,那Spark是如何進(jìn)行reduce的呢果复?這就涉及下面要講的Shuffle Aggregator了陈莽。
Aggregator本質(zhì)上是一個(gè)hashmap,它是以map output的key為key虽抄,以任意所要combine的類(lèi)型為value的hashmap走搁。
在做word count reduce計(jì)算count值時(shí),它會(huì)將Shuffle fetch到的每一個(gè)key-value對(duì)更新或是插入hashmap中(若在hashmap中沒(méi)有查找到迈窟,則插入其中私植;若查找到,則更新value值)车酣。這樣就不需要預(yù)先把所有的key-value進(jìn)行merge sort曲稼,而是來(lái)一個(gè)處理一個(gè),省去了外部排序這一步驟湖员。但同時(shí)需要注意的是贫悄,reducer的內(nèi)存必須足以存放這個(gè)partition的所有key和count值,因此對(duì)內(nèi)存有一定的要求娘摔。
在上面word count的例子中窄坦,因?yàn)関alue會(huì)不斷地更新,而不需要將其全部記錄在內(nèi)存中凳寺,因此內(nèi)存的使用還是比較少的鸭津。考慮一下如果是groupByKey這樣的操作读第,Reducer需要得到key對(duì)應(yīng)的所有value曙博。在Hadoop MapReduce中,由于有了歸并排序怜瞒,因此給予Reducer的數(shù)據(jù)已經(jīng)是group by key了父泳,而Spark沒(méi)有這一步般哼,因此需要將key和對(duì)應(yīng)的value全部存放在hashmap中,并將value合并成一個(gè)array惠窄≌裘撸可以想象為了能夠存放所有數(shù)據(jù),用戶(hù)必須確保每一個(gè)partition小到內(nèi)存能夠容納杆融,這對(duì)于內(nèi)存是非常嚴(yán)峻的考驗(yàn)楞卡。因此在Spark文檔中,建議用戶(hù)涉及這類(lèi)操作時(shí)盡量增加partition脾歇,也就是增加Mapper和Reducer的數(shù)量蒋腮。
增加Mapper和Reducer的數(shù)量固然可以減小partition的大小,使內(nèi)存可以容納這個(gè)partition藕各。但是在Shuffle write中提到池摧,bucket和對(duì)應(yīng)于bucket的write handler是由Mapper和Reducer的數(shù)量決定的,task越多激况,bucket就會(huì)增加得更多作彤,由此帶來(lái)write handler所需的buffer也會(huì)更多。在一方面我們?yōu)榱藴p少內(nèi)存的使用采取了增加task數(shù)量的策略乌逐,另一方面task數(shù)量增多又會(huì)帶來(lái)buffer開(kāi)銷(xiāo)更大的問(wèn)題竭讳,因此陷入了內(nèi)存使用的兩難境地。
為了減少內(nèi)存的使用浙踢,只能將Aggregator的操作從內(nèi)存移到磁盤(pán)上進(jìn)行绢慢,因此Spark新版本中提供了外部排序的實(shí)現(xiàn),以解決這個(gè)問(wèn)題成黄。
Spark將需要聚集的數(shù)據(jù)分為兩類(lèi):不需要?dú)w并排序和需要?dú)w并排序的數(shù)據(jù)呐芥。對(duì)于前者逻杖,在內(nèi)存中的AppendOnlyMap中對(duì)數(shù)據(jù)聚集奋岁。對(duì)于需要?dú)w并排序的數(shù)據(jù),現(xiàn)在內(nèi)存中進(jìn)行聚集荸百,當(dāng)內(nèi)存數(shù)據(jù)達(dá)到閾值時(shí)闻伶,將數(shù)據(jù)排序后寫(xiě)入磁盤(pán)。事實(shí)上够话,磁盤(pán)上的數(shù)據(jù)只是全部數(shù)據(jù)的一部分蓝翰,最后將磁盤(pán)數(shù)據(jù)全部進(jìn)行歸并排序和聚集。具體Aggregator的邏輯可以參見(jiàn)Aggregator類(lèi)的實(shí)現(xiàn)女嘲。