通過文章“Spark Scheduler內(nèi)部原理剖析”我們知道,Spark在DAG調(diào)度階段會(huì)將一個(gè)Job劃分為多個(gè)Stage锅纺,上游Stage做map工作纳寂,下游Stage做reduce工作傻谁,其本質(zhì)上還是MapReduce計(jì)算框架龄砰。Shuffle是連接map和reduce之間的橋梁肛宋,它將map的輸出對(duì)應(yīng)到reduce輸入中舰罚,這期間涉及到序列化反序列化微谓、跨節(jié)點(diǎn)網(wǎng)絡(luò)IO以及磁盤讀寫IO等森篷,所以說Shuffle是整個(gè)應(yīng)用程序運(yùn)行過程中非常昂貴的一個(gè)階段,理解Spark Shuffle原理有助于優(yōu)化Spark應(yīng)用程序豺型。
Spark Shuffle的基本原理與特性
與MapReduce計(jì)算框架一樣仲智,Spark的Shuffle實(shí)現(xiàn)大致如下圖所示,在DAG階段以shuffle為界姻氨,劃分stage钓辆,上游stage做map task,每個(gè)map task將計(jì)算結(jié)果數(shù)據(jù)分成多份肴焊,每一份對(duì)應(yīng)到下游stage的每個(gè)partition中前联,并將其臨時(shí)寫到磁盤,該過程叫做shuffle write娶眷;下游stage做reduce task似嗤,每個(gè)reduce task通過網(wǎng)絡(luò)拉取上游stage中所有map task的指定分區(qū)結(jié)果數(shù)據(jù),該過程叫做shuffle read届宠,最后完成reduce的業(yè)務(wù)邏輯烁落。舉個(gè)栗子,假如上游stage有100個(gè)map task豌注,下游stage有1000個(gè)reduce task伤塌,那么這100個(gè)map task中每個(gè)map task都會(huì)得到1000份數(shù)據(jù),而1000個(gè)reduce task中的每個(gè)reduce task都會(huì)拉取上游100個(gè)map task對(duì)應(yīng)的那份數(shù)據(jù)幌羞,即第一個(gè)reduce task會(huì)拉取所有map task結(jié)果數(shù)據(jù)的第一份寸谜,以此類推竟稳。
在map階段属桦,除了map的業(yè)務(wù)邏輯外熊痴,還有shuffle write的過程,這個(gè)過程涉及到序列化聂宾、磁盤IO等耗時(shí)操作果善;在reduce階段,除了reduce的業(yè)務(wù)邏輯外系谐,還有前面shuffle read過程巾陕,這個(gè)過程涉及到網(wǎng)絡(luò)IO、反序列化等耗時(shí)操作纪他。所以整個(gè)shuffle過程是極其昂貴的鄙煤,spark在shuffle的實(shí)現(xiàn)上也做了很多優(yōu)化改進(jìn),隨著版本的迭代發(fā)布茶袒,spark shuffle的實(shí)現(xiàn)也逐步得到改進(jìn)梯刚。下面詳細(xì)介紹spark shuffle的實(shí)現(xiàn)演進(jìn)過程。
Spark Shuffle實(shí)現(xiàn)演進(jìn)
Spark在1.1以前的版本一直是采用Hash Shuffle的實(shí)現(xiàn)的方式薪寓,到1.1版本時(shí)參考Hadoop MapReduce的實(shí)現(xiàn)開始引入Sort Shuffle亡资,在1.5版本時(shí)開始Tungsten鎢絲計(jì)劃,引入U(xiǎn)nSafe Shuffle優(yōu)化內(nèi)存及CPU的使用向叉,在1.6中將Tungsten統(tǒng)一到Sort Shuffle中锥腻,實(shí)現(xiàn)自我感知選擇最佳Shuffle方式,到最近的2.0版本母谎,Hash Shuffle已被刪除瘦黑,所有Shuffle方式全部統(tǒng)一到Sort Shuffle一個(gè)實(shí)現(xiàn)中。下圖是spark shuffle實(shí)現(xiàn)的一個(gè)版本演進(jìn)销睁。
Hash Shuffle v1
在spark-1.1版本以前供璧,spark內(nèi)部實(shí)現(xiàn)的是Hash Shuffle,其大致原理與前面基本原理介紹中提到的基本一樣冻记,如下圖所示睡毒。
在map階段(shuffle write),每個(gè)map都會(huì)為下游stage的每個(gè)partition寫一個(gè)臨時(shí)文件冗栗,假如下游stage有1000個(gè)partition演顾,那么每個(gè)map都會(huì)生成1000個(gè)臨時(shí)文件,一般來說一個(gè)executor上會(huì)運(yùn)行多個(gè)map task隅居,這樣下來钠至,一個(gè)executor上會(huì)有非常多的臨時(shí)文件,假如一個(gè)executor上運(yùn)行M個(gè)map task胎源,下游stage有N個(gè)partition棉钧,那么一個(gè)executor上會(huì)生成MN個(gè)文件。另一方面涕蚤,如果一個(gè)executor上有K個(gè)core宪卿,那么executor同時(shí)可運(yùn)行K個(gè)task的诵,這樣一來,就會(huì)同時(shí)申請(qǐng)KN個(gè)文件描述符佑钾,一旦partition數(shù)較多西疤,勢(shì)必會(huì)耗盡executor上的文件描述符,同時(shí)生成K*N個(gè)write handler也會(huì)帶來大量內(nèi)存的消耗休溶。
在reduce階段(shuffle read)代赁,每個(gè)reduce task都會(huì)拉取所有map對(duì)應(yīng)的那部分partition數(shù)據(jù),那么executor會(huì)打開所有臨時(shí)文件準(zhǔn)備網(wǎng)絡(luò)傳輸兽掰,這里又涉及到大量文件描述符芭碍,另外,如果reduce階段有combiner操作孽尽,那么它會(huì)把網(wǎng)絡(luò)中拉到的數(shù)據(jù)保存在一個(gè)HashMap
中進(jìn)行合并操作豁跑,如果數(shù)據(jù)量較大,很容易引發(fā)OOM操作泻云。
綜上所述艇拍,Hash Shuffle實(shí)現(xiàn)簡單但是特別naive,在小數(shù)據(jù)量下運(yùn)行比較快宠纯,一旦數(shù)據(jù)量較大卸夕,基本就垮了。當(dāng)然這個(gè)版本的shuffle也是在spark早期版本中婆瓜,隨著版本迭代的進(jìn)行快集,shuffle的實(shí)現(xiàn)也越來越成熟。
Hash Shuffle v2
在上一節(jié)講到每個(gè)map task都要生成N個(gè)partition文件廉白,為了減少文件數(shù)个初,后面引進(jìn)了,目的是減少單個(gè)executor上的文件數(shù)猴蹂。如下圖所示院溺,一個(gè)executor上所有的map task生成的分區(qū)文件只有一份,即將所有的map task相同的分區(qū)文件合并磅轻,這樣每個(gè)executor上最多只生成N個(gè)分區(qū)文件珍逸。
表面上看是減少了文件數(shù),但是假如下游stage的分區(qū)數(shù)N很大聋溜,還是會(huì)在每個(gè)executor上生成N個(gè)文件谆膳,同樣,如果一個(gè)executor上有K個(gè)core撮躁,還是會(huì)開K*N個(gè)writer handler漱病,總體上來說基本沒太解決問題。對(duì)于shuffle read階段跟v1版一樣沒改進(jìn),仍然容易導(dǎo)致OOM杨帽。
Sort Shuffle v1
針對(duì)上述Hash Shuffle的弊端凝果,在spark 1.1.0版本中引入Sort Shuffle,它參考了Hadoop MapReduce中的shuffle實(shí)現(xiàn)睦尽,對(duì)記錄進(jìn)行排序來做shuffle,如下圖所示型雳。
在map階段(shuffle write)当凡,會(huì)按照partition id以及key對(duì)記錄進(jìn)行排序,將所有partition的數(shù)據(jù)寫在同一個(gè)文件中纠俭,該文件中的記錄首先是按照partition id排序一個(gè)一個(gè)分區(qū)的順序排列沿量,每個(gè)partition內(nèi)部是按照key進(jìn)行排序存放,map task運(yùn)行期間會(huì)順序?qū)懨總€(gè)partition的數(shù)據(jù)冤荆,并通過一個(gè)索引文件記錄每個(gè)partition的大小和偏移量朴则。這樣一來,每個(gè)map task一次只開兩個(gè)文件描述符钓简,一個(gè)寫數(shù)據(jù)乌妒,一個(gè)寫索引,大大減輕了Hash Shuffle大量文件描述符的問題外邓,即使一個(gè)executor有K個(gè)core撤蚊,那么最多一次性開K*2個(gè)文件描述符。
在reduce階段(shuffle read)损话,reduce task拉取數(shù)據(jù)做combine時(shí)不再是采用HashMap
侦啸,而是采用ExternalAppendOnlyMap
,該數(shù)據(jù)結(jié)構(gòu)在做combine時(shí)丧枪,如果內(nèi)存不足光涂,會(huì)刷寫磁盤,很大程度的保證了魯棒性拧烦,避免大數(shù)據(jù)情況下的OOM忘闻。
總體上看來Sort Shuffle解決了Hash Shuffle的所有弊端,但是因?yàn)樾枰鋝huffle過程需要對(duì)記錄進(jìn)行排序恋博,所以在性能上有所損失服赎。
Unsafe Shuffle
從spark 1.5.0開始,spark開始了鎢絲計(jì)劃(Tungsten)交播,目的是優(yōu)化內(nèi)存和CPU的使用重虑,進(jìn)一步提升spark的性能。為此秦士,引入U(xiǎn)nsafe Shuffle缺厉,它的做法是將數(shù)據(jù)記錄用二進(jìn)制的方式存儲(chǔ),直接在序列化的二進(jìn)制數(shù)據(jù)上sort而不是在java 對(duì)象上,這樣一方面可以減少memory的使用和GC的開銷提针,另一方面避免shuffle過程中頻繁的序列化以及反序列化命爬。在排序過程中,它提供cache-efficient sorter辐脖,使用一個(gè)8 bytes的指針饲宛,把排序轉(zhuǎn)化成了一個(gè)指針數(shù)組的排序,極大的優(yōu)化了排序性能嗜价。更多Tungsten詳細(xì)介紹請(qǐng)移步databricks博客艇抠。
但是使用Unsafe Shuffle有幾個(gè)限制,shuffle階段不能有aggregate操作久锥,分區(qū)數(shù)不能超過一定大小( 家淤,這是可編碼的最大parition id),所以像reduceByKey這類有aggregate操作的算子是不能使用Unsafe Shuffle瑟由,它會(huì)退化采用Sort Shuffle絮重。
Sort Shuffle v2
從spark-1.6.0開始,把Sort Shuffle和Unsafe Shuffle全部統(tǒng)一到Sort Shuffle中歹苦,如果檢測(cè)到滿足Unsafe Shuffle條件會(huì)自動(dòng)采用Unsafe Shuffle青伤,否則采用Sort Shuffle。從spark-2.0.0開始殴瘦,spark把Hash Shuffle移除潮模,可以說目前spark-2.0中只有一種Shuffle,即為Sort Shuffle痴施。
Spark Shuffle相關(guān)調(diào)優(yōu)
從上述shuffle的原理介紹可以知道擎厢,shuffle是一個(gè)涉及到CPU(序列化反序列化)、網(wǎng)絡(luò)IO(跨節(jié)點(diǎn)數(shù)據(jù)傳輸)以及磁盤IO(shuffle中間結(jié)果落地)的操作辣吃,用戶在編寫spark應(yīng)用程序的時(shí)候應(yīng)當(dāng)盡可能考慮shuffle相關(guān)的優(yōu)化动遭,提升spark應(yīng)用程序的性能。下面簡單列舉幾點(diǎn)關(guān)于spark shuffle調(diào)優(yōu)的參考神得。
* 盡量減少shuffle次數(shù)
// 兩次shuffle
rdd.map(...).repartition(1000).reduceByKey(_ + _, 3000)
// 一次shuffle
rdd.map(...).repartition(3000).reduceByKey(_ + _)
* 必要時(shí)主動(dòng)shuffle厘惦,通常用于改變并行度,提高后續(xù)分布式運(yùn)行速度
* 使用treeReduce & treeAggregate替換reduce & aggregate哩簿。數(shù)據(jù)量較大時(shí)宵蕉,reduce & aggregate一次性聚合,shuffle量太大节榜,而treeReduce & treeAggregate是分批聚合羡玛,更為保險(xiǎn)。
小結(jié)
本文詳細(xì)闡述了spark shuffle的原理以及實(shí)現(xiàn)演進(jìn)宗苍,清楚地知道shuffle原理有助于調(diào)優(yōu)應(yīng)用程序稼稿,并了解應(yīng)用程序執(zhí)行的每個(gè)過程薄榛。
轉(zhuǎn)載來自于Spark Shuffle原理及相關(guān)調(diào)優(yōu)