轉(zhuǎn)自:https://yq.aliyun.com/articles/461770?spm=a2c4e.11163080.searchblog.129.49792ec1bgg2MF
目錄
3.3.2 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
4.2.1 使用Hive ETL預(yù)處理數(shù)據(jù)
4.4.2 將reduce join 轉(zhuǎn)化為map join
Apache Spark廣泛用于大規(guī)模數(shù)據(jù)處理方面,憑借支持交互查詢慧邮、流式計算的性能快速嶄露頭角。Spark需要快速的處理海量數(shù)據(jù)邻储,由此針對Spark進行性能調(diào)優(yōu)顯得十分必要赋咽。Spark調(diào)優(yōu)包含眾多方面旧噪,本文中僅針對于:資源吨娜、程序開發(fā)、數(shù)據(jù)傾斜三方面的調(diào)優(yōu)進行闡述淘钟。資源優(yōu)化方面宦赠,針對特定生產(chǎn)環(huán)境設(shè)置集群配置參數(shù);程序開發(fā)優(yōu)化方面米母,對于RDD的使用勾扭、算子的使用、數(shù)據(jù)及數(shù)據(jù)結(jié)構(gòu)等方面進行了闡述铁瞒;數(shù)據(jù)傾斜優(yōu)化方面妙色,在簡單了解數(shù)據(jù)傾斜之后,提出了解決數(shù)據(jù)傾斜的三類方法:優(yōu)化數(shù)據(jù)源慧耍、優(yōu)化并行度身辨、優(yōu)化數(shù)據(jù)結(jié)構(gòu)。
關(guān)鍵字:?Spark調(diào)優(yōu)芍碧、數(shù)據(jù)傾斜煌珊、開發(fā)調(diào)優(yōu)
Spark調(diào)優(yōu)綜述
Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎泌豆,在非常短的時間內(nèi)嶄露頭角定庵,它的API更豐富,且支持交互式查詢、流式計算蔬浙、機器學(xué)習(xí)等猪落。與曾經(jīng)引爆大數(shù)據(jù)產(chǎn)業(yè)革命的Hadoop Mapreduce相比它具有更快的速度。然而畴博,想要讓Spark作業(yè)擁有更好的性能需要一定的技巧许布。如果沒有對于Spark作業(yè)進行合理的優(yōu)化,那么Spark作業(yè)的執(zhí)行速度可能大大降低绎晃,這樣Spark的速度優(yōu)勢就不能完全體現(xiàn)蜜唾。由此可見,對于Spark作業(yè)進行優(yōu)化十分有必要庶艾。
想要對Spark作業(yè)進行優(yōu)化袁余,了解Spark作業(yè)的執(zhí)行原理十分有必要。通過對于Spark作業(yè)的執(zhí)行流程的分析咱揍,有助于做出適合的優(yōu)化操作颖榜。同時,Spark的表現(xiàn)實際上是由很多方面決定的煤裙,對于Spark性能調(diào)優(yōu)也是由很多部分組成掩完,不是調(diào)節(jié)幾個參數(shù)就可以大幅度提升作業(yè)性能的。我們需要結(jié)合實際應(yīng)用場景對Spark作業(yè)進行綜合分析硼砰,調(diào)節(jié)多個方面且蓬,才能獲得更好的性能。在本文中我們將對資源题翰、開發(fā)恶阴、數(shù)據(jù)傾斜三個方面進行原理介紹及優(yōu)化。
2.1?優(yōu)化資源參數(shù)
了解了Spark作業(yè)的基本原理之后冯事,對于資源相關(guān)的參數(shù)進行調(diào)優(yōu)就比較容易理解了。對資源參數(shù)進行調(diào)優(yōu)就是對Spark作業(yè)運行過程中的需要使用到資源的地方血公,通過調(diào)節(jié)各種參數(shù)昵仅,來優(yōu)化資源的使用率,從而提升Spark作業(yè)的執(zhí)行性能累魔。
各個參數(shù)對應(yīng)于作業(yè)運行原理中的某個部分摔笤。在Spark中我們有三種方式來設(shè)置資源參數(shù),按照優(yōu)先級排序依是:(1)用戶代碼中顯示調(diào)用set()方法設(shè)置薛夜;(2)通過Spark-submit傳遞參數(shù)籍茧;(3)配置文件。當(dāng)以上三種方法均沒有設(shè)置參數(shù)的值時梯澜,Spark將使用系統(tǒng)默認值寞冯,下面對主要參數(shù)的配置進行產(chǎn)闡述渴析。
(1)num-executors。該參數(shù)用于設(shè)計Spark作業(yè)總的Executor的個數(shù)吮龄。YARN集群管理器盡可能根據(jù)num-executor設(shè)置在工作節(jié)點上啟動Executor俭茧。這個參數(shù)十分重要,Spark默認只會啟動很少的進程漓帚,這時并行度不夠母债,任務(wù)執(zhí)行速度十分緩慢。一般為每個Spark作業(yè)設(shè)置50~100個Executor尝抖,設(shè)置Executor太多大部分隊列無法給予充分的資源毡们;設(shè)置Executor太少無法充分利用集群性能。
(2)executor-memory昧辽。這個參數(shù)設(shè)置每個Executor進程的內(nèi)存衙熔,Executor內(nèi)存的大小,很多程度上直接決定了Spark作業(yè)的性能搅荞,而且跟很常見的java虛擬機內(nèi)存溢出異常也有關(guān)系饵逐。這里executor-memory最大可以設(shè)置為式(3.1)所示菇怀,其中rMemory表示資源隊列的最大內(nèi)存限制煤墙。
executor-memory=rMemory/num-executor?? ???????公式(3.1)
(3)executor-core距误,該參數(shù)用于設(shè)置每個Executor進程的CPU core數(shù)量。因為每個CPU core同一時間只能執(zhí)行一個task線程茉贡,所以executor-core的個數(shù)決定了Executor進程的并發(fā)線程能力塞栅。該參數(shù)設(shè)置為2-4比較合適。
(4)driver-memory块仆,該參數(shù)用于設(shè)置Driver進程的內(nèi)存构蹬。這個參數(shù)通常不設(shè)置王暗,但是要注意的一點是悔据,使用collect算子時,一定要保證Driver內(nèi)存足夠大俗壹,否則會出現(xiàn)內(nèi)存溢出的錯誤科汗。
(5)Spark.default.parallelism,該參數(shù)用于設(shè)置每個Stage默認的task數(shù)量绷雏。該參數(shù)使用默認值時头滔,Spark會根據(jù)底層HDFS的block數(shù)量設(shè)置task數(shù)量,通常一個block對應(yīng)一個task涎显,這樣task的數(shù)量通常是偏少的坤检。由于task是真正執(zhí)行Spark作業(yè)的線程,如果task數(shù)量太少期吓,那么Executor中將面臨有足夠資源卻沒有偶task執(zhí)行的窘境早歇,針對Executor所做的優(yōu)化也將前功盡棄。這里Spark.default.parallelism的大小可以用式(3.2)計算。
Spark.default.parallelism=[2,3]*num-executors * executor-cores ????公式(3.2)
(6)Spark.storage.memoryFraction箭跳,該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例晨另,默認是0.6。也就是說谱姓,默認Executor 60%的內(nèi)存借尿,可以用來保存持久化的RDD數(shù)據(jù)。當(dāng)Spark作業(yè)中有較多RDD需要進行持久化操作時屉来,可以將該參數(shù)值調(diào)高路翻;當(dāng)Spark作業(yè)中有較少RDD需要進行持久化操作時,可以將該參數(shù)值調(diào)低茄靠。
(7)Spark.Shuffle.memoryFraction帚桩,該參數(shù)用于設(shè)置Shuffle過程中一個task拉取到上個Stage的task的輸出后,進行聚合操作時能夠使用的Executor內(nèi)存的比例嘹黔,默認是0.2账嚎。如果Spark作業(yè)中RDD持久化操作較少,Shuffle操作較多時儡蔓,可以將該參數(shù)調(diào)高郭蕉;如果Spark作業(yè)中RDD持久化操作較多,Shuffle操作較少時喂江,可以將該參數(shù)調(diào)低召锈。
在編寫Spark程序之初我們就要注意性能優(yōu)化获询。實現(xiàn)同一個目的的Spark程序往往因為:使用的算子不同涨岁、RDD的復(fù)用程度不同、序列化方式不同等表現(xiàn)出性能方面的差異吉嚣。開發(fā)調(diào)優(yōu)就是優(yōu)化RDD梢薪、優(yōu)化算子、優(yōu)化數(shù)據(jù)的過程尝哆,通過遵循開發(fā)調(diào)優(yōu)的原則并將這些原則根據(jù)具體的業(yè)務(wù)應(yīng)用到實際中秉撇,可能很好的實現(xiàn)Spark作業(yè)的性能提升。
3.1?優(yōu)化RDD
3.1.1避免創(chuàng)建重復(fù)的RDD
通常來說秋泄,一個Spark作業(yè)就是對某個數(shù)據(jù)源創(chuàng)建RDD琐馆,然后對這個RDD進行轉(zhuǎn)化和行為操作。通過轉(zhuǎn)化操作恒序,得到下一個RDD瘦麸;通過行為操作,得到處理結(jié)果歧胁。在開發(fā)過程中需要注意滋饲,對于同一份數(shù)據(jù)彤敛,只應(yīng)該創(chuàng)建一個RDD,不能創(chuàng)建多個RDD代表同一份數(shù)據(jù)了赌。使用多個RDD代表同一份數(shù)據(jù)源時常常會增加作業(yè)的性能開銷墨榄,這些開銷包括:(1)創(chuàng)建新RDD的開銷;(2)從外部系統(tǒng)中加載數(shù)據(jù)到RDD中的開銷勿她;(3)重復(fù)計算的開銷袄秩。
3.1.2?盡可能復(fù)用一個RDD
在對不同的數(shù)據(jù)執(zhí)行算子操作時應(yīng)該盡量復(fù)用一個RDD。例如逢并,當(dāng)RDD A的數(shù)據(jù)格式是key-value類型的之剧,RDD B的數(shù)據(jù)格式是value類型的,但是這兩個RDD的value數(shù)據(jù)完全相同砍聊;那么背稼,RDD A包含了RDD B中的所有信息,理論上來說RDD B可以被替代玻蝌,而實際開發(fā)中也應(yīng)該盡量減少多個RDD數(shù)據(jù)有重復(fù)或者包含的情況蟹肘,這樣可以盡可能減少RDD的數(shù)量從而減少算子執(zhí)行的次數(shù)。
3.1.3?對多次使用的RDD進行持久化
Spark使用懶惰計算俯树,執(zhí)行轉(zhuǎn)化操作時并不馬上執(zhí)行命令而是維護一張記錄了執(zhí)行RDD轉(zhuǎn)化關(guān)系的譜系圖帘腹,如圖3.1所示。每次同一個RDD執(zhí)行多個算子運算時都會重新從源頭處計算一次许饿,得到該RDD后阳欲,在對這個RDD執(zhí)行算子操作,這種方式極大的損耗了Spark集群的資源陋率。對于這種情況球化,應(yīng)該對于多次使用的RDD進行持久化操作,持久化操作會將RDD數(shù)據(jù)保存到內(nèi)存或者磁盤中瓦糟,以后每次使用這個RDD時都不必重新計算筒愚,而是從內(nèi)存或磁盤中直接取出該持久化RDD。
圖3.1 RDD轉(zhuǎn)化譜系圖
RDD的持有化有幾種不同的級別狸页,分別是:MEMORY_ONLY锨能、MEMORY_AND_DISK、MEMORY_ONLY_SER芍耘、MEMORY_AND_DISK_SER、DISK_ONLY熄阻、MEMORY_ONLY_2等斋竞,表3.1中對各種級別的含義進行了簡單的介紹。這幾種持久化級別使用的優(yōu)先級排序如下:
(1)MEMORY_ONLY性能最高秃殉,直接將RDD存儲在內(nèi)存中坝初,省區(qū)了序列化及反序列化浸剩、從磁盤讀取的時間,從但是對于內(nèi)存的容量有較高的要求鳄袍;
(2)MEMORY_ONLY_SER會將數(shù)據(jù)序列化后保存在內(nèi)存中绢要,通過序列化壓縮了RDD的大小,但是相較于MEMORY_ONLY多出了序列化及反序列化的時間拗小;
(3)MEMORY_AND_DISK_SER優(yōu)先將RDD緩存在內(nèi)存中重罪,內(nèi)存緩存不下時才會存在磁盤中;
(4)DISK_ONLY和后綴為_2的級別通常不建議使用,完全基于磁盤文件的讀寫會導(dǎo)致性能的極具降低哀九;后綴為2的級別會將所有數(shù)據(jù)都復(fù)制一份副本到其他節(jié)點上剿配,數(shù)據(jù)復(fù)制及網(wǎng)絡(luò)傳輸會導(dǎo)致較大的性能開銷。
表3.1 RDD持久化級別
持久化級別含義
MEMORY_ONLY使用未序列化的Java對象格式阅束,將數(shù)據(jù)保存在內(nèi)存中呼胚。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會進行持久化息裸。那么下次對這個RDD執(zhí)行算子操作時蝇更,那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計算一遍呼盆。這是默認的持久化策略簿寂,使用cache()方法時,實際就是使用的這種持久化策略宿亡。
MEMORY_AND_DISK使用未序列化的Java對象格式常遂,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù)挽荠,會將數(shù)據(jù)寫入磁盤文件中克胳,下次對這個RDD執(zhí)行算子時,持久化在磁盤文件中的數(shù)據(jù)會被讀取出來使用圈匆。
MEMORY_ONLY_SER基本含義同MEMORY_ONLY漠另。區(qū)別是會將RDD中的數(shù)據(jù)進行序列化,RDD的每個partition會被序列化成一個字節(jié)數(shù)組跃赚。這種方式更加節(jié)省內(nèi)存笆搓,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。
MEMORY_AND_DISK_SER基本含義同MEMORY_AND_DISK纬傲。區(qū)別是會將RDD中的數(shù)據(jù)進行序列化满败,RDD的每個partition會被序列化成一個字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存叹括,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC算墨。
DISK_ONLY使用未序列化的Java對象格式,將數(shù)據(jù)全部寫入磁盤文件中汁雷。
MEMORY_ONLY_2,
MEMORY_AND_DISK_2
對于上述任意一種持久化策略净嘀,如果加上后綴_2报咳,代表的是將每個持久化的數(shù)據(jù),都復(fù)制一份副本挖藏,并將副本保存到其他節(jié)點上暑刃。
3.2?優(yōu)化算子
3.2.1?盡量避免使用Shuffle算子
Spark作業(yè)最消耗性能的部分就是Shuffle過程,應(yīng)盡量避免使用Shuffle算子膜眠。Shuffle過程就是將分布在集群中多個節(jié)點上的同一個key岩臣,拉取到同一個節(jié)點上,進行聚合或者join操作柴底,在操作過程中可能會因為一個節(jié)點上處理的key過多導(dǎo)致數(shù)據(jù)溢出到磁盤婿脸。由此可見,Shuffle過程可能會發(fā)生大量的磁盤文件讀寫的IO操作柄驻,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作狐树,Shuffle過程如圖3.2所示。Shuffle類算子有:distinct鸿脓、groupByKey抑钟、reduceByKey、aggregateByKey野哭、join在塔、cogroup、repartition等拨黔,編寫Spark作業(yè)程序時蛔溃,應(yīng)該盡量使用map類算子替代Shuffle算子。
圖3.2 Shuffle過程
3.2.1?使用高性能算子
Spark提供了幾十種算子篱蝇,使用這些算子可以讓Spark作業(yè)不同的性能贺待,在編寫Spark程序時應(yīng)盡量使用性能高的算子替換性能低的算子。這里給出幾種算子的替換方案:
(1)使用mapPartitions替代普通map零截。mapPartition類的算子每次會處理一個分區(qū)的數(shù)據(jù)麸塞,而普通map每次只會處理一條數(shù)據(jù)。一次處理一個分區(qū)的數(shù)據(jù)節(jié)省了多次建立連接涧衙、多次打開數(shù)據(jù)流的時間哪工;但是,mapPartitions也有可能會出現(xiàn)內(nèi)存溢出問題弧哎,需要謹慎使用雁比。
(2)foreachPartitions替代foreach。原理類似于“使用mapPartitions替代普通map”傻铣,foreachPartitions函數(shù)也是每次處理一個分區(qū)章贞,而foreach函數(shù)每次只處理一條數(shù)據(jù)。比如在foreach函數(shù)中非洲,將RDD中所有數(shù)據(jù)寫MySQL鸭限,那么如果是普通的foreach算子,就會一條數(shù)據(jù)一條數(shù)據(jù)地寫两踏,每次函數(shù)調(diào)用可能就會創(chuàng)建一個數(shù)據(jù)庫連接败京,此時就勢必會頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下梦染;但是如果用foreachPartitions算子一次性處理一個partition的數(shù)據(jù)赡麦,那么對于每個partition,只要創(chuàng)建一個數(shù)據(jù)庫連接即可帕识,然后執(zhí)行批量插入操作泛粹,此時性能是比較高的。實踐中發(fā)現(xiàn)肮疗,對于1萬條左右的數(shù)據(jù)量寫MySQL晶姊,性能可以提升30%以上。
(3)使用filter之后進行coalesce操作伪货。對于一個RDD使用filter進行過濾后们衙,分區(qū)中的數(shù)據(jù)量可能會大為縮減,每個task任務(wù)處理的數(shù)據(jù)量大為減少碱呼,有些浪費資源蒙挑,這時考慮將RDD的分區(qū)縮減。coalesce函數(shù)可以重新劃分分區(qū)愚臀,但是要注意使用該函數(shù)會引起Shuffle忆蚀。
(4)使用repartitionAndSortWithinPartitions替代repartition與sort類操作。repartitionAndSortWithinPartitions函數(shù)是Spark官方網(wǎng)站推薦的一個函數(shù)姑裂,如果需要現(xiàn)對RDD進行分區(qū)操作然后排序馋袜,那么不必使用repartition與sort的組合,直接使用repartitionAndSortWithinPartitions函數(shù)性能會更好炭分。因為該算子可以在分區(qū)的同時進行排序操作桃焕,Shuffle操作與sort操作同時進行。
3.3?優(yōu)化數(shù)據(jù)
3.3.1?使用Kryo優(yōu)化序列化性能
在一個Spark作業(yè)中捧毛,有三處涉及到了序列化:(1)在算子函數(shù)中使用到外部變量時观堂,該變量將會被序列化后進行網(wǎng)絡(luò)傳輸;(2)將自定的類型作為RDD的泛型類型是呀忧,所有自定義類型對象都會進行序列化师痕。此時,自定義的類必須要實現(xiàn)Serializable接口而账;(3)使用可序列化的持久化策略時胰坟,RDD的每個分區(qū)都會被序列化成為一個大的字節(jié)數(shù)組。
對于這三種涉及到序列化的地方泞辐,可以使用java提供的序列化機制笔横,這也是Spark作業(yè)默認的序列化機制竞滓,但是這種序列化機制要保存全類名,效率較低吹缔。這里提供一種性能更好的序列化方法:Kryo序列化類庫商佑,這種方法較java序列化方法速度快了10倍作業(yè)。但是使用Kryo時需要自行注冊需要序列化的自定義類厢塘,比較有難度茶没。
3.3.2?優(yōu)化數(shù)據(jù)結(jié)構(gòu)
在java中有三種類型比較耗費內(nèi)存:(1)對象;(2)字符串晚碾;(3)集合類型抓半。因此Spark編碼時應(yīng)盡量不要使用以上三種數(shù)據(jù)結(jié)構(gòu)。盡量使用字符串代替對象格嘁;使用原始類型代替字符串笛求;使用數(shù)組代替集合,這樣可以減少內(nèi)存的占用讥蔽,降低垃圾回收的頻率提高性能涣易。
4.1走近數(shù)據(jù)傾斜
4.1.1?數(shù)據(jù)傾斜的原因
數(shù)據(jù)傾斜是在進行數(shù)據(jù)計算時冶伞,數(shù)據(jù)分散度不夠大量數(shù)據(jù)集中到少數(shù)機器上計算新症,這些數(shù)據(jù)的計算速度遠遠低于平均計算速度,導(dǎo)致整個計算過程過慢响禽。發(fā)生數(shù)據(jù)清晰時常有以下現(xiàn)象:(1)Executor lost徒爹,OOM,Shuffle過程出錯芋类;(2)Driver OOM隆嗅;(3)單個Executor執(zhí)行時間特別久,整體任務(wù)卡在某個階段不能結(jié)束侯繁;(4)正常運行的任務(wù)突然失敗胖喳。
數(shù)據(jù)傾斜的產(chǎn)生的原理十分簡單:在Spark作業(yè)進行Shuffle時會將個個節(jié)點上相同的key拉取到某個節(jié)點的一個task上進行操作,此時贮竟,如果某一個key對應(yīng)的數(shù)據(jù)量特別大時丽焊,該key對應(yīng)的task就要處理非常龐大的數(shù)據(jù)量,這就發(fā)生了數(shù)據(jù)傾斜咕别。通過圖4.1可以很好的理解這一過程:在三個節(jié)點上技健,hello對應(yīng)7條數(shù)據(jù),world對應(yīng)1條數(shù)據(jù)惰拱,you對應(yīng)1條數(shù)據(jù)雌贱,執(zhí)行Shuffle操作時,第一個task需要處理7條數(shù),其運行時間遠大于其他兩個task欣孤。由于木桶效應(yīng)馋没,整個Stage的運行速度是由運行最慢的task決定的。
圖4.1 Shuffle過程中數(shù)據(jù)傾斜的產(chǎn)生
4.1.2?定位數(shù)據(jù)傾斜的位置
數(shù)據(jù)傾斜現(xiàn)象只會發(fā)生在Shuffle過程中导街,當(dāng)出現(xiàn)數(shù)據(jù)傾斜時應(yīng)檢查代碼中可能會觸發(fā)Shuffle操作的算子披泪。對于不同的數(shù)據(jù)傾斜現(xiàn)象纤子,有不同的定位方法:
(1)某個task執(zhí)行特別慢的情況
對于這種情況搬瑰,首先要確定數(shù)據(jù)傾斜發(fā)生在第幾個Stage中。如果是用yarn-client模式提交控硼,那么本地是直接可以看到log的泽论,可以在log中找到當(dāng)前運行到了第幾個Stage;如果是用yarn-cluster模式提交卡乾,則可以通過Spark Web UI來查看當(dāng)前運行到了第幾個Stage翼悴。此外,無論是使用yarn-client模式還是yarn-cluster模式幔妨,我們都可以在Spark Web UI上深入看一下當(dāng)前這個Stage各個task分配的數(shù)據(jù)量鹦赎,從而進一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。
知道數(shù)據(jù)傾斜發(fā)生在哪一個Stage之后误堡,接著根據(jù)Stage劃分原理古话,推算出來發(fā)生傾斜的那個Stage對應(yīng)代碼中的哪一部分。精準推算Stage與代碼的對應(yīng)關(guān)系锁施,需要對Spark的源碼有深入的理解陪踩,這里有一個相對簡單實用的推算方法:只要看到Spark代碼中出現(xiàn)了一個Shuffle類算子或者是Spark SQL的SQL語句中出現(xiàn)了會導(dǎo)致Shuffle的語句,那么就可以判定悉抵,以此為界限劃分出了前后兩個Stage肩狂。
(2)程序異常報錯
這種情況比較容易定位有問題的代碼,可以直接查看yarn-client模式下本地log的異常信息姥饰,或通過yarn-cluster模式下的log中的異常信息傻谁。一般來說,通過異常棧信息就可以定位到你的代碼中哪一行發(fā)生了內(nèi)存溢出列粪。然后在那行代碼附近找找审磁,一般也會有Shuffle類算子,此時很可能就是這個算子導(dǎo)致了數(shù)據(jù)傾斜篱竭。
要注意的是力图,不能單純靠偶然的內(nèi)存溢出就判定發(fā)生了數(shù)據(jù)傾斜。因為代碼的bug掺逼、偶然出現(xiàn)的數(shù)據(jù)異常等吃媒,也可能會導(dǎo)致內(nèi)存溢出。通過Spark Web UI查看報錯的那個Stage的各個task的運行時間以及分配的數(shù)據(jù)量,才能確定是否是由于數(shù)據(jù)傾斜才導(dǎo)致了這次內(nèi)存溢出赘那。
4.2?優(yōu)化數(shù)據(jù)源
4.2.1?使用Hive ETL預(yù)處理數(shù)據(jù)
當(dāng)Spark作業(yè)的數(shù)據(jù)來自Hive刑桑,且Hive表中數(shù)據(jù)不均勻,即少量的key對應(yīng)了大多數(shù)的數(shù)據(jù)時募舟,對于Hive中的數(shù)據(jù)進行處理是解決數(shù)據(jù)傾斜的一種辦法祠斧。
該方法的思路是在Spark作業(yè)之前先對Hive中的數(shù)據(jù)進行聚合、join等處理拱礁,然后Spark作業(yè)處理的數(shù)據(jù)就是解決了數(shù)據(jù)傾斜問題的數(shù)據(jù)琢锋。該方法從根本上解決了數(shù)據(jù)傾斜,但是這也是一種危機轉(zhuǎn)移方案呢灶,雖然Spark作業(yè)避免了數(shù)據(jù)傾斜吴超,但是在Hive中的預(yù)操作中依舊存在數(shù)據(jù)傾斜問題。當(dāng)對于Spark作業(yè)的實時性要求很高時可以采用這種方案鸯乃,將數(shù)據(jù)傾斜提前在上游的Hive ETL中完成鲸阻,周期內(nèi)僅執(zhí)行一次,周期內(nèi)其他時間的操作都會提速缨睡。
4.2.2?過濾少數(shù)導(dǎo)致傾斜的key
當(dāng)導(dǎo)致數(shù)據(jù)傾斜的key很少鸟悴,且少量的key對作業(yè)結(jié)果影響并不大,那么過濾掉少數(shù)導(dǎo)致傾斜的key是一種不錯的處理方法奖年。
如果我們判斷那少數(shù)幾個數(shù)據(jù)量特別多的key细诸,對作業(yè)的執(zhí)行和計算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)幾個key拾并。比如揍堰,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執(zhí)行filter算子過濾掉這些key。如果需要每次作業(yè)執(zhí)行時嗅义,動態(tài)判定哪些key的數(shù)據(jù)量最多然后再進行過濾屏歹,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數(shù)量之碗,取數(shù)據(jù)量最多的key過濾掉即可蝙眶。這種方法實現(xiàn)簡單,效果也很好褪那,可以完全規(guī)避掉數(shù)據(jù)傾斜幽纷。但是在實際應(yīng)用場景中,導(dǎo)致數(shù)據(jù)傾斜的key往往較多博敬,所以該方法適用范圍較小友浸。
4.3?優(yōu)化集群并行度
4.3.1?提高Shuffle操作的并行度
當(dāng)必須要正面解決數(shù)據(jù)傾斜問題時,該方案較為適合偏窝,這也是處理數(shù)據(jù)傾斜最簡單的方法之一收恢。
通過增加Shuffle read task的數(shù)量武学,可以讓原本分配給一個task的key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)伦意,原理如圖4.2所示火窒。這種方案實現(xiàn)起來比較簡單,可以有效的緩解數(shù)據(jù)傾斜的影響驮肉。但是這種方法通常無法從根本解決問題熏矿,因為如果有一些極端情況出現(xiàn),如:某個key對應(yīng)的數(shù)據(jù)量占整個數(shù)據(jù)集的90%离钝,那么該key所對應(yīng)的90%的數(shù)據(jù)還是會分配到一個task中票编,這是數(shù)據(jù)傾斜現(xiàn)象還是是產(chǎn)生。
圖4.2?提高Shuffle操作的并行度
4.4?優(yōu)化算法
4.4.1?兩階段聚合
對RDD執(zhí)行reduceByKey等聚合類Shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時奈辰,比較適用這種方案栏妖。將原本相同的key通過附加隨機前綴的方式,變成多個不同的key奖恰,就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合,進而解決單個task處理數(shù)據(jù)量過多的問題宛裕。接著去除掉隨機前綴瑟啃,再次進行全局聚合,就可以得到最終的結(jié)果揩尸。
這種方案的核心思想是將會產(chǎn)生數(shù)據(jù)傾斜的一次聚合作業(yè)蛹屿,分為兩個階段進行聚合。第一次聚合岩榆,先為每個key標記一個隨機數(shù)错负,隨機數(shù)范圍為[0,n],此時一個key被分為n份勇边,對標記后的key進行局部聚合犹撒;第二個階段,將局部聚合后的key所標記的隨機數(shù)去除粒褒,然后在對key進行聚合识颊。最終,Spark聚合作業(yè)就完成了奕坟,具體原理如圖4.3所示祥款。這種方案對于聚合類的Shuffle操作導(dǎo)致的數(shù)據(jù)傾斜效果很不錯,但是僅僅適用于聚合類的操作月杉。
圖4.3?兩階段聚合原理圖
4.4.2?將reduce join 轉(zhuǎn)化為map join
本方案適合于以下情況使用:RDD中使用join類型的操做或Spark SQL中使用join語句刃跛,且join操作中的一個RDD表數(shù)據(jù)量較小。普通的join是會走Shuffle過程的苛萎,而一旦Shuffle桨昙,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個Shuffle read task中再進行join跌帐,此時就是reduce join。但是如果一個RDD是比較小的绊率,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果谨敛,也就是map join,此時就不會發(fā)生Shuffle操作滤否,也就不會發(fā)生數(shù)據(jù)傾斜脸狸。
本方案使用廣播變量與map類算子代替了join操作,從而完全規(guī)避掉Shuffle操作藐俺,徹底避免了數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)炊甲。首先,將較小的RDD中的數(shù)據(jù)通過collect算子拉去到Driver端內(nèi)存中欲芹,然后將該RDD的數(shù)據(jù)通過Broadcast變量廣播出去卿啡;然后,對另一個RDD執(zhí)行map操作菱父,在算子函數(shù)內(nèi)颈娜,從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進行比對浙宜,如果連接key相同的話官辽,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來,具體原理如圖4.4所示粟瞬。本方案對于join操作導(dǎo)致的數(shù)據(jù)傾斜十分有效同仆,但是本方案僅僅局限于執(zhí)行join操作的兩個RDD中有一個數(shù)據(jù)量較小時。
圖4.4?將reduce join?轉(zhuǎn)化為map join
4.4.3?采樣傾斜key并分拆join操作
本方案適合于以下情況使用:RDD中使用join類型的操做或Spark SQL中使用join語句裙品,且兩個RDD數(shù)據(jù)集的數(shù)據(jù)量都比較大俗批,且出現(xiàn)數(shù)據(jù)傾斜的原因是一個RDD中少數(shù)幾個key的數(shù)據(jù)量過大,另一個RDD的key分布均勻市怎。對于join導(dǎo)致的數(shù)據(jù)傾斜岁忘,如果只是某幾個key導(dǎo)致了傾斜,可以將少數(shù)幾個key分拆成獨立RDD焰轻,并附加隨機前綴打散成n份去進行join臭觉,此時這幾個key對應(yīng)的數(shù)據(jù)就不會集中在少數(shù)幾個task上,而是分散到多個task進行join辱志。
本方案的操作過程如下:
(1)對包含少數(shù)幾個數(shù)據(jù)量過大的key的那個RDD蝠筑,通過sample算子采樣出一份樣本來,然后統(tǒng)計一下每個key的數(shù)量揩懒,計算出來數(shù)據(jù)量最大的是哪幾個key什乙;
(2)將這幾個key對應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個單獨的RDD A已球,并給每個key都打上n以內(nèi)的隨機數(shù)作為前綴臣镣,而不會導(dǎo)致傾斜的大部分key形成另外一個RDD B辅愿;
(3)將需要join的另一個RDD也過濾出傾斜key對應(yīng)的數(shù)據(jù)并形成一個單獨的RDD C,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù)忆某,這n條數(shù)據(jù)按順序附加一個0~n的前綴点待,不會導(dǎo)致傾斜的大部分key形成另外一個RDD D;
(4)將RDD A與RDD C進行join弃舒,此時導(dǎo)致數(shù)據(jù)傾斜的key分成n份癞埠,分散到多個task中去進行join,得到RDD E聋呢;
(5)將RDD B和RDD D進行join操作苗踪,得到RDD F;
(6)對RDD E與RDD F執(zhí)行union操作削锰,得到 RDD G通铲,RDD G即為最終結(jié)果。
以上操作步驟可以用圖4.5來表示器贩,該方案針對于某幾個key導(dǎo)致的數(shù)據(jù)傾斜十分有效颅夺,只需要針對少數(shù)導(dǎo)致數(shù)據(jù)傾斜的key進行擴容n倍,不需要對全量數(shù)據(jù)進行擴容磨澡,避免了占用過多內(nèi)存碗啄;但對于導(dǎo)致數(shù)據(jù)傾斜的key的數(shù)量特別多時,這種方案是無能為力的稳摄。
圖4.5?采樣傾斜key并
分拆join操作
4.4.4?使用隨機前綴和擴容RDD進行join
本方案適合于以下情況使用:RDD中使用join類型的操做或Spark SQL中使用join語句,且兩個RDD數(shù)據(jù)集的數(shù)據(jù)量都比較大饲宿,且出現(xiàn)數(shù)據(jù)傾斜的原因是一個RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜厦酬。該方案將導(dǎo)致數(shù)據(jù)傾斜的RDD中的所有key均附加隨機前綴,然后將處理后的key分散到不同的key中進行處理瘫想,而不是讓一個task處理大量的數(shù)據(jù)仗阅。該方案與4.4.3小節(jié)方案類似,但是本方案需要更多的內(nèi)存資源国夜。
本方案的操作過程如下:
(1)查看RDD/Hive表中的數(shù)據(jù)分布情況减噪,找到那個造成數(shù)據(jù)傾斜的RDD/Hive表;
(2)將該RDD的每條數(shù)據(jù)都打上一個n以內(nèi)的隨機前綴车吹;
(3)對另外一個正常的RDD進行擴容筹裕,將每條數(shù)據(jù)都擴容成n條數(shù)據(jù),擴容出來的每條數(shù)據(jù)都依次打上一個0~n的前綴窄驹;
(4)將兩個處理后的RDD進行join朝卒。
該方案對于所有join類型的數(shù)據(jù)傾斜都可以處理,效果較好乐埠,同時本方案更注重與緩解數(shù)據(jù)傾斜抗斤,而不是徹底避免數(shù)據(jù)傾斜囚企,對于內(nèi)存資源的要求很高。
Spark性能優(yōu)化是一項繁復(fù)的任務(wù)龙宏,需要結(jié)合實際生產(chǎn)情況對于多個方面級進行優(yōu)化,僅僅對于某一方面的調(diào)整很難取得集群性能上巨大的提升伤疙。本文對于Spark調(diào)優(yōu)主要的三個方面:資源優(yōu)化银酗、開發(fā)程序優(yōu)化、數(shù)據(jù)傾斜優(yōu)化進行了闡述掩浙,并在每個部分給出了常見的調(diào)優(yōu)方法花吟。在撰寫本文的過程中,我對于Spark整體有了更深入的理解厨姚,希望能夠砥礪前行衅澈。