Spark調(diào)優(yōu)綜述

轉(zhuǎn)自:https://yq.aliyun.com/articles/461770?spm=a2c4e.11163080.searchblog.129.49792ec1bgg2MF


目錄

摘要

一阳谍、引言

二、資源調(diào)優(yōu)

2.1 優(yōu)化資源參數(shù)

三慌植、程序開發(fā)調(diào)優(yōu)

3.1 優(yōu)化RDD

3.1.1避免創(chuàng)建重復(fù)的RDD

3.1.2 盡可能復(fù)用一個RDD

3.1.3 對多次使用的RDD進行持久化

3.2 優(yōu)化算子

3.2.1 盡量避免使用Shuffle算子

3.2.1 使用高性能算子

3.3 優(yōu)化數(shù)據(jù)

3.3.1 使用Kryo優(yōu)化序列化性能

3.3.2 優(yōu)化數(shù)據(jù)結(jié)構(gòu)

四、數(shù)據(jù)傾斜調(diào)優(yōu)

4.1走近數(shù)據(jù)傾斜

4.1.1 數(shù)據(jù)傾斜的原因

4.1.2 定位數(shù)據(jù)傾斜的位置

4.2 優(yōu)化數(shù)據(jù)源

4.2.1 使用Hive ETL預(yù)處理數(shù)據(jù)

4.2.2 過濾少數(shù)導(dǎo)致傾斜的key

4.3 優(yōu)化集群并行度

4.3.1 提高Shuffle操作的并行度

4.4 優(yōu)化算法

4.4.1 兩階段聚合

4.4.2 將reduce join 轉(zhuǎn)化為map join

4.4.3 采樣傾斜key并分拆join操作

4.4.4 使用隨機前綴和擴容RDD進行join

五、總結(jié)

六、參考文獻


摘要

Apache Spark廣泛用于大規(guī)模數(shù)據(jù)處理方面,憑借支持交互查詢慧邮、流式計算的性能快速嶄露頭角。Spark需要快速的處理海量數(shù)據(jù)邻储,由此針對Spark進行性能調(diào)優(yōu)顯得十分必要赋咽。Spark調(diào)優(yōu)包含眾多方面旧噪,本文中僅針對于:資源吨娜、程序開發(fā)、數(shù)據(jù)傾斜三方面的調(diào)優(yōu)進行闡述淘钟。資源優(yōu)化方面宦赠,針對特定生產(chǎn)環(huán)境設(shè)置集群配置參數(shù);程序開發(fā)優(yōu)化方面米母,對于RDD的使用勾扭、算子的使用、數(shù)據(jù)及數(shù)據(jù)結(jié)構(gòu)等方面進行了闡述铁瞒;數(shù)據(jù)傾斜優(yōu)化方面妙色,在簡單了解數(shù)據(jù)傾斜之后,提出了解決數(shù)據(jù)傾斜的三類方法:優(yōu)化數(shù)據(jù)源慧耍、優(yōu)化并行度身辨、優(yōu)化數(shù)據(jù)結(jié)構(gòu)。


關(guān)鍵字:?Spark調(diào)優(yōu)芍碧、數(shù)據(jù)傾斜煌珊、開發(fā)調(diào)優(yōu)

Spark調(diào)優(yōu)綜述

一、引言

Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計的快速通用的計算引擎泌豆,在非常短的時間內(nèi)嶄露頭角定庵,它的API更豐富,且支持交互式查詢、流式計算蔬浙、機器學(xué)習(xí)等猪落。與曾經(jīng)引爆大數(shù)據(jù)產(chǎn)業(yè)革命的Hadoop Mapreduce相比它具有更快的速度。然而畴博,想要讓Spark作業(yè)擁有更好的性能需要一定的技巧许布。如果沒有對于Spark作業(yè)進行合理的優(yōu)化,那么Spark作業(yè)的執(zhí)行速度可能大大降低绎晃,這樣Spark的速度優(yōu)勢就不能完全體現(xiàn)蜜唾。由此可見,對于Spark作業(yè)進行優(yōu)化十分有必要庶艾。

想要對Spark作業(yè)進行優(yōu)化袁余,了解Spark作業(yè)的執(zhí)行原理十分有必要。通過對于Spark作業(yè)的執(zhí)行流程的分析咱揍,有助于做出適合的優(yōu)化操作颖榜。同時,Spark的表現(xiàn)實際上是由很多方面決定的煤裙,對于Spark性能調(diào)優(yōu)也是由很多部分組成掩完,不是調(diào)節(jié)幾個參數(shù)就可以大幅度提升作業(yè)性能的。我們需要結(jié)合實際應(yīng)用場景對Spark作業(yè)進行綜合分析硼砰,調(diào)節(jié)多個方面且蓬,才能獲得更好的性能。在本文中我們將對資源题翰、開發(fā)恶阴、數(shù)據(jù)傾斜三個方面進行原理介紹及優(yōu)化。

二豹障、資源調(diào)優(yōu)

2.1?優(yōu)化資源參數(shù)

了解了Spark作業(yè)的基本原理之后冯事,對于資源相關(guān)的參數(shù)進行調(diào)優(yōu)就比較容易理解了。對資源參數(shù)進行調(diào)優(yōu)就是對Spark作業(yè)運行過程中的需要使用到資源的地方血公,通過調(diào)節(jié)各種參數(shù)昵仅,來優(yōu)化資源的使用率,從而提升Spark作業(yè)的執(zhí)行性能累魔。

各個參數(shù)對應(yīng)于作業(yè)運行原理中的某個部分摔笤。在Spark中我們有三種方式來設(shè)置資源參數(shù),按照優(yōu)先級排序依是:(1)用戶代碼中顯示調(diào)用set()方法設(shè)置薛夜;(2)通過Spark-submit傳遞參數(shù)籍茧;(3)配置文件。當(dāng)以上三種方法均沒有設(shè)置參數(shù)的值時梯澜,Spark將使用系統(tǒng)默認值寞冯,下面對主要參數(shù)的配置進行產(chǎn)闡述渴析。

(1)num-executors。該參數(shù)用于設(shè)計Spark作業(yè)總的Executor的個數(shù)吮龄。YARN集群管理器盡可能根據(jù)num-executor設(shè)置在工作節(jié)點上啟動Executor俭茧。這個參數(shù)十分重要,Spark默認只會啟動很少的進程漓帚,這時并行度不夠母债,任務(wù)執(zhí)行速度十分緩慢。一般為每個Spark作業(yè)設(shè)置50~100個Executor尝抖,設(shè)置Executor太多大部分隊列無法給予充分的資源毡们;設(shè)置Executor太少無法充分利用集群性能。

(2)executor-memory昧辽。這個參數(shù)設(shè)置每個Executor進程的內(nèi)存衙熔,Executor內(nèi)存的大小,很多程度上直接決定了Spark作業(yè)的性能搅荞,而且跟很常見的java虛擬機內(nèi)存溢出異常也有關(guān)系饵逐。這里executor-memory最大可以設(shè)置為式(3.1)所示菇怀,其中rMemory表示資源隊列的最大內(nèi)存限制煤墙。

executor-memory=rMemory/num-executor?? ???????公式(3.1)

(3)executor-core距误,該參數(shù)用于設(shè)置每個Executor進程的CPU core數(shù)量。因為每個CPU core同一時間只能執(zhí)行一個task線程茉贡,所以executor-core的個數(shù)決定了Executor進程的并發(fā)線程能力塞栅。該參數(shù)設(shè)置為2-4比較合適。

(4)driver-memory块仆,該參數(shù)用于設(shè)置Driver進程的內(nèi)存构蹬。這個參數(shù)通常不設(shè)置王暗,但是要注意的一點是悔据,使用collect算子時,一定要保證Driver內(nèi)存足夠大俗壹,否則會出現(xiàn)內(nèi)存溢出的錯誤科汗。

(5)Spark.default.parallelism,該參數(shù)用于設(shè)置每個Stage默認的task數(shù)量绷雏。該參數(shù)使用默認值時头滔,Spark會根據(jù)底層HDFS的block數(shù)量設(shè)置task數(shù)量,通常一個block對應(yīng)一個task涎显,這樣task的數(shù)量通常是偏少的坤检。由于task是真正執(zhí)行Spark作業(yè)的線程,如果task數(shù)量太少期吓,那么Executor中將面臨有足夠資源卻沒有偶task執(zhí)行的窘境早歇,針對Executor所做的優(yōu)化也將前功盡棄。這里Spark.default.parallelism的大小可以用式(3.2)計算。

Spark.default.parallelism=[2,3]*num-executors * executor-cores ????公式(3.2)

(6)Spark.storage.memoryFraction箭跳,該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例晨另,默認是0.6。也就是說谱姓,默認Executor 60%的內(nèi)存借尿,可以用來保存持久化的RDD數(shù)據(jù)。當(dāng)Spark作業(yè)中有較多RDD需要進行持久化操作時屉来,可以將該參數(shù)值調(diào)高路翻;當(dāng)Spark作業(yè)中有較少RDD需要進行持久化操作時,可以將該參數(shù)值調(diào)低茄靠。

(7)Spark.Shuffle.memoryFraction帚桩,該參數(shù)用于設(shè)置Shuffle過程中一個task拉取到上個Stage的task的輸出后,進行聚合操作時能夠使用的Executor內(nèi)存的比例嘹黔,默認是0.2账嚎。如果Spark作業(yè)中RDD持久化操作較少,Shuffle操作較多時儡蔓,可以將該參數(shù)調(diào)高郭蕉;如果Spark作業(yè)中RDD持久化操作較多,Shuffle操作較少時喂江,可以將該參數(shù)調(diào)低召锈。

三、程序開發(fā)調(diào)優(yōu)

在編寫Spark程序之初我們就要注意性能優(yōu)化获询。實現(xiàn)同一個目的的Spark程序往往因為:使用的算子不同涨岁、RDD的復(fù)用程度不同、序列化方式不同等表現(xiàn)出性能方面的差異吉嚣。開發(fā)調(diào)優(yōu)就是優(yōu)化RDD梢薪、優(yōu)化算子、優(yōu)化數(shù)據(jù)的過程尝哆,通過遵循開發(fā)調(diào)優(yōu)的原則并將這些原則根據(jù)具體的業(yè)務(wù)應(yīng)用到實際中秉撇,可能很好的實現(xiàn)Spark作業(yè)的性能提升。

3.1?優(yōu)化RDD

3.1.1避免創(chuàng)建重復(fù)的RDD

通常來說秋泄,一個Spark作業(yè)就是對某個數(shù)據(jù)源創(chuàng)建RDD琐馆,然后對這個RDD進行轉(zhuǎn)化和行為操作。通過轉(zhuǎn)化操作恒序,得到下一個RDD瘦麸;通過行為操作,得到處理結(jié)果歧胁。在開發(fā)過程中需要注意滋饲,對于同一份數(shù)據(jù)彤敛,只應(yīng)該創(chuàng)建一個RDD,不能創(chuàng)建多個RDD代表同一份數(shù)據(jù)了赌。使用多個RDD代表同一份數(shù)據(jù)源時常常會增加作業(yè)的性能開銷墨榄,這些開銷包括:(1)創(chuàng)建新RDD的開銷;(2)從外部系統(tǒng)中加載數(shù)據(jù)到RDD中的開銷勿她;(3)重復(fù)計算的開銷袄秩。

3.1.2?盡可能復(fù)用一個RDD

在對不同的數(shù)據(jù)執(zhí)行算子操作時應(yīng)該盡量復(fù)用一個RDD。例如逢并,當(dāng)RDD A的數(shù)據(jù)格式是key-value類型的之剧,RDD B的數(shù)據(jù)格式是value類型的,但是這兩個RDD的value數(shù)據(jù)完全相同砍聊;那么背稼,RDD A包含了RDD B中的所有信息,理論上來說RDD B可以被替代玻蝌,而實際開發(fā)中也應(yīng)該盡量減少多個RDD數(shù)據(jù)有重復(fù)或者包含的情況蟹肘,這樣可以盡可能減少RDD的數(shù)量從而減少算子執(zhí)行的次數(shù)。

3.1.3?對多次使用的RDD進行持久化

Spark使用懶惰計算俯树,執(zhí)行轉(zhuǎn)化操作時并不馬上執(zhí)行命令而是維護一張記錄了執(zhí)行RDD轉(zhuǎn)化關(guān)系的譜系圖帘腹,如圖3.1所示。每次同一個RDD執(zhí)行多個算子運算時都會重新從源頭處計算一次许饿,得到該RDD后阳欲,在對這個RDD執(zhí)行算子操作,這種方式極大的損耗了Spark集群的資源陋率。對于這種情況球化,應(yīng)該對于多次使用的RDD進行持久化操作,持久化操作會將RDD數(shù)據(jù)保存到內(nèi)存或者磁盤中瓦糟,以后每次使用這個RDD時都不必重新計算筒愚,而是從內(nèi)存或磁盤中直接取出該持久化RDD。

圖3.1 RDD轉(zhuǎn)化譜系圖

RDD的持有化有幾種不同的級別狸页,分別是:MEMORY_ONLY锨能、MEMORY_AND_DISK、MEMORY_ONLY_SER芍耘、MEMORY_AND_DISK_SER、DISK_ONLY熄阻、MEMORY_ONLY_2等斋竞,表3.1中對各種級別的含義進行了簡單的介紹。這幾種持久化級別使用的優(yōu)先級排序如下:

(1)MEMORY_ONLY性能最高秃殉,直接將RDD存儲在內(nèi)存中坝初,省區(qū)了序列化及反序列化浸剩、從磁盤讀取的時間,從但是對于內(nèi)存的容量有較高的要求鳄袍;

(2)MEMORY_ONLY_SER會將數(shù)據(jù)序列化后保存在內(nèi)存中绢要,通過序列化壓縮了RDD的大小,但是相較于MEMORY_ONLY多出了序列化及反序列化的時間拗小;

(3)MEMORY_AND_DISK_SER優(yōu)先將RDD緩存在內(nèi)存中重罪,內(nèi)存緩存不下時才會存在磁盤中;

(4)DISK_ONLY和后綴為_2的級別通常不建議使用,完全基于磁盤文件的讀寫會導(dǎo)致性能的極具降低哀九;后綴為2的級別會將所有數(shù)據(jù)都復(fù)制一份副本到其他節(jié)點上剿配,數(shù)據(jù)復(fù)制及網(wǎng)絡(luò)傳輸會導(dǎo)致較大的性能開銷。

表3.1 RDD持久化級別

持久化級別含義

MEMORY_ONLY使用未序列化的Java對象格式阅束,將數(shù)據(jù)保存在內(nèi)存中呼胚。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會進行持久化息裸。那么下次對這個RDD執(zhí)行算子操作時蝇更,那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計算一遍呼盆。這是默認的持久化策略簿寂,使用cache()方法時,實際就是使用的這種持久化策略宿亡。

MEMORY_AND_DISK使用未序列化的Java對象格式常遂,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù)挽荠,會將數(shù)據(jù)寫入磁盤文件中克胳,下次對這個RDD執(zhí)行算子時,持久化在磁盤文件中的數(shù)據(jù)會被讀取出來使用圈匆。

MEMORY_ONLY_SER基本含義同MEMORY_ONLY漠另。區(qū)別是會將RDD中的數(shù)據(jù)進行序列化,RDD的每個partition會被序列化成一個字節(jié)數(shù)組跃赚。這種方式更加節(jié)省內(nèi)存笆搓,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC。

MEMORY_AND_DISK_SER基本含義同MEMORY_AND_DISK纬傲。區(qū)別是會將RDD中的數(shù)據(jù)進行序列化满败,RDD的每個partition會被序列化成一個字節(jié)數(shù)組。這種方式更加節(jié)省內(nèi)存叹括,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC算墨。

DISK_ONLY使用未序列化的Java對象格式,將數(shù)據(jù)全部寫入磁盤文件中汁雷。

MEMORY_ONLY_2,

MEMORY_AND_DISK_2

對于上述任意一種持久化策略净嘀,如果加上后綴_2报咳,代表的是將每個持久化的數(shù)據(jù),都復(fù)制一份副本挖藏,并將副本保存到其他節(jié)點上暑刃。

3.2?優(yōu)化算子

3.2.1?盡量避免使用Shuffle算子

Spark作業(yè)最消耗性能的部分就是Shuffle過程,應(yīng)盡量避免使用Shuffle算子膜眠。Shuffle過程就是將分布在集群中多個節(jié)點上的同一個key岩臣,拉取到同一個節(jié)點上,進行聚合或者join操作柴底,在操作過程中可能會因為一個節(jié)點上處理的key過多導(dǎo)致數(shù)據(jù)溢出到磁盤婿脸。由此可見,Shuffle過程可能會發(fā)生大量的磁盤文件讀寫的IO操作柄驻,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作狐树,Shuffle過程如圖3.2所示。Shuffle類算子有:distinct鸿脓、groupByKey抑钟、reduceByKey、aggregateByKey野哭、join在塔、cogroup、repartition等拨黔,編寫Spark作業(yè)程序時蛔溃,應(yīng)該盡量使用map類算子替代Shuffle算子。

圖3.2 Shuffle過程

3.2.1?使用高性能算子

Spark提供了幾十種算子篱蝇,使用這些算子可以讓Spark作業(yè)不同的性能贺待,在編寫Spark程序時應(yīng)盡量使用性能高的算子替換性能低的算子。這里給出幾種算子的替換方案:

(1)使用mapPartitions替代普通map零截。mapPartition類的算子每次會處理一個分區(qū)的數(shù)據(jù)麸塞,而普通map每次只會處理一條數(shù)據(jù)。一次處理一個分區(qū)的數(shù)據(jù)節(jié)省了多次建立連接涧衙、多次打開數(shù)據(jù)流的時間哪工;但是,mapPartitions也有可能會出現(xiàn)內(nèi)存溢出問題弧哎,需要謹慎使用雁比。

(2)foreachPartitions替代foreach。原理類似于“使用mapPartitions替代普通map”傻铣,foreachPartitions函數(shù)也是每次處理一個分區(qū)章贞,而foreach函數(shù)每次只處理一條數(shù)據(jù)。比如在foreach函數(shù)中非洲,將RDD中所有數(shù)據(jù)寫MySQL鸭限,那么如果是普通的foreach算子,就會一條數(shù)據(jù)一條數(shù)據(jù)地寫两踏,每次函數(shù)調(diào)用可能就會創(chuàng)建一個數(shù)據(jù)庫連接败京,此時就勢必會頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫連接,性能是非常低下梦染;但是如果用foreachPartitions算子一次性處理一個partition的數(shù)據(jù)赡麦,那么對于每個partition,只要創(chuàng)建一個數(shù)據(jù)庫連接即可帕识,然后執(zhí)行批量插入操作泛粹,此時性能是比較高的。實踐中發(fā)現(xiàn)肮疗,對于1萬條左右的數(shù)據(jù)量寫MySQL晶姊,性能可以提升30%以上。

(3)使用filter之后進行coalesce操作伪货。對于一個RDD使用filter進行過濾后们衙,分區(qū)中的數(shù)據(jù)量可能會大為縮減,每個task任務(wù)處理的數(shù)據(jù)量大為減少碱呼,有些浪費資源蒙挑,這時考慮將RDD的分區(qū)縮減。coalesce函數(shù)可以重新劃分分區(qū)愚臀,但是要注意使用該函數(shù)會引起Shuffle忆蚀。

(4)使用repartitionAndSortWithinPartitions替代repartition與sort類操作。repartitionAndSortWithinPartitions函數(shù)是Spark官方網(wǎng)站推薦的一個函數(shù)姑裂,如果需要現(xiàn)對RDD進行分區(qū)操作然后排序馋袜,那么不必使用repartition與sort的組合,直接使用repartitionAndSortWithinPartitions函數(shù)性能會更好炭分。因為該算子可以在分區(qū)的同時進行排序操作桃焕,Shuffle操作與sort操作同時進行。

3.3?優(yōu)化數(shù)據(jù)

3.3.1?使用Kryo優(yōu)化序列化性能

在一個Spark作業(yè)中捧毛,有三處涉及到了序列化:(1)在算子函數(shù)中使用到外部變量時观堂,該變量將會被序列化后進行網(wǎng)絡(luò)傳輸;(2)將自定的類型作為RDD的泛型類型是呀忧,所有自定義類型對象都會進行序列化师痕。此時,自定義的類必須要實現(xiàn)Serializable接口而账;(3)使用可序列化的持久化策略時胰坟,RDD的每個分區(qū)都會被序列化成為一個大的字節(jié)數(shù)組。

對于這三種涉及到序列化的地方泞辐,可以使用java提供的序列化機制笔横,這也是Spark作業(yè)默認的序列化機制竞滓,但是這種序列化機制要保存全類名,效率較低吹缔。這里提供一種性能更好的序列化方法:Kryo序列化類庫商佑,這種方法較java序列化方法速度快了10倍作業(yè)。但是使用Kryo時需要自行注冊需要序列化的自定義類厢塘,比較有難度茶没。

3.3.2?優(yōu)化數(shù)據(jù)結(jié)構(gòu)

在java中有三種類型比較耗費內(nèi)存:(1)對象;(2)字符串晚碾;(3)集合類型抓半。因此Spark編碼時應(yīng)盡量不要使用以上三種數(shù)據(jù)結(jié)構(gòu)。盡量使用字符串代替對象格嘁;使用原始類型代替字符串笛求;使用數(shù)組代替集合,這樣可以減少內(nèi)存的占用讥蔽,降低垃圾回收的頻率提高性能涣易。

四、數(shù)據(jù)傾斜調(diào)優(yōu)

4.1走近數(shù)據(jù)傾斜

4.1.1?數(shù)據(jù)傾斜的原因

數(shù)據(jù)傾斜是在進行數(shù)據(jù)計算時冶伞,數(shù)據(jù)分散度不夠大量數(shù)據(jù)集中到少數(shù)機器上計算新症,這些數(shù)據(jù)的計算速度遠遠低于平均計算速度,導(dǎo)致整個計算過程過慢响禽。發(fā)生數(shù)據(jù)清晰時常有以下現(xiàn)象:(1)Executor lost徒爹,OOM,Shuffle過程出錯芋类;(2)Driver OOM隆嗅;(3)單個Executor執(zhí)行時間特別久,整體任務(wù)卡在某個階段不能結(jié)束侯繁;(4)正常運行的任務(wù)突然失敗胖喳。

數(shù)據(jù)傾斜的產(chǎn)生的原理十分簡單:在Spark作業(yè)進行Shuffle時會將個個節(jié)點上相同的key拉取到某個節(jié)點的一個task上進行操作,此時贮竟,如果某一個key對應(yīng)的數(shù)據(jù)量特別大時丽焊,該key對應(yīng)的task就要處理非常龐大的數(shù)據(jù)量,這就發(fā)生了數(shù)據(jù)傾斜咕别。通過圖4.1可以很好的理解這一過程:在三個節(jié)點上技健,hello對應(yīng)7條數(shù)據(jù),world對應(yīng)1條數(shù)據(jù)惰拱,you對應(yīng)1條數(shù)據(jù)雌贱,執(zhí)行Shuffle操作時,第一個task需要處理7條數(shù),其運行時間遠大于其他兩個task欣孤。由于木桶效應(yīng)馋没,整個Stage的運行速度是由運行最慢的task決定的。

圖4.1 Shuffle過程中數(shù)據(jù)傾斜的產(chǎn)生

4.1.2?定位數(shù)據(jù)傾斜的位置

數(shù)據(jù)傾斜現(xiàn)象只會發(fā)生在Shuffle過程中导街,當(dāng)出現(xiàn)數(shù)據(jù)傾斜時應(yīng)檢查代碼中可能會觸發(fā)Shuffle操作的算子披泪。對于不同的數(shù)據(jù)傾斜現(xiàn)象纤子,有不同的定位方法:

(1)某個task執(zhí)行特別慢的情況

對于這種情況搬瑰,首先要確定數(shù)據(jù)傾斜發(fā)生在第幾個Stage中。如果是用yarn-client模式提交控硼,那么本地是直接可以看到log的泽论,可以在log中找到當(dāng)前運行到了第幾個Stage;如果是用yarn-cluster模式提交卡乾,則可以通過Spark Web UI來查看當(dāng)前運行到了第幾個Stage翼悴。此外,無論是使用yarn-client模式還是yarn-cluster模式幔妨,我們都可以在Spark Web UI上深入看一下當(dāng)前這個Stage各個task分配的數(shù)據(jù)量鹦赎,從而進一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜。

知道數(shù)據(jù)傾斜發(fā)生在哪一個Stage之后误堡,接著根據(jù)Stage劃分原理古话,推算出來發(fā)生傾斜的那個Stage對應(yīng)代碼中的哪一部分。精準推算Stage與代碼的對應(yīng)關(guān)系锁施,需要對Spark的源碼有深入的理解陪踩,這里有一個相對簡單實用的推算方法:只要看到Spark代碼中出現(xiàn)了一個Shuffle類算子或者是Spark SQL的SQL語句中出現(xiàn)了會導(dǎo)致Shuffle的語句,那么就可以判定悉抵,以此為界限劃分出了前后兩個Stage肩狂。

(2)程序異常報錯

這種情況比較容易定位有問題的代碼,可以直接查看yarn-client模式下本地log的異常信息姥饰,或通過yarn-cluster模式下的log中的異常信息傻谁。一般來說,通過異常棧信息就可以定位到你的代碼中哪一行發(fā)生了內(nèi)存溢出列粪。然后在那行代碼附近找找审磁,一般也會有Shuffle類算子,此時很可能就是這個算子導(dǎo)致了數(shù)據(jù)傾斜篱竭。

要注意的是力图,不能單純靠偶然的內(nèi)存溢出就判定發(fā)生了數(shù)據(jù)傾斜。因為代碼的bug掺逼、偶然出現(xiàn)的數(shù)據(jù)異常等吃媒,也可能會導(dǎo)致內(nèi)存溢出。通過Spark Web UI查看報錯的那個Stage的各個task的運行時間以及分配的數(shù)據(jù)量,才能確定是否是由于數(shù)據(jù)傾斜才導(dǎo)致了這次內(nèi)存溢出赘那。

4.2?優(yōu)化數(shù)據(jù)源

4.2.1?使用Hive ETL預(yù)處理數(shù)據(jù)

當(dāng)Spark作業(yè)的數(shù)據(jù)來自Hive刑桑,且Hive表中數(shù)據(jù)不均勻,即少量的key對應(yīng)了大多數(shù)的數(shù)據(jù)時募舟,對于Hive中的數(shù)據(jù)進行處理是解決數(shù)據(jù)傾斜的一種辦法祠斧。

該方法的思路是在Spark作業(yè)之前先對Hive中的數(shù)據(jù)進行聚合、join等處理拱礁,然后Spark作業(yè)處理的數(shù)據(jù)就是解決了數(shù)據(jù)傾斜問題的數(shù)據(jù)琢锋。該方法從根本上解決了數(shù)據(jù)傾斜,但是這也是一種危機轉(zhuǎn)移方案呢灶,雖然Spark作業(yè)避免了數(shù)據(jù)傾斜吴超,但是在Hive中的預(yù)操作中依舊存在數(shù)據(jù)傾斜問題。當(dāng)對于Spark作業(yè)的實時性要求很高時可以采用這種方案鸯乃,將數(shù)據(jù)傾斜提前在上游的Hive ETL中完成鲸阻,周期內(nèi)僅執(zhí)行一次,周期內(nèi)其他時間的操作都會提速缨睡。

4.2.2?過濾少數(shù)導(dǎo)致傾斜的key

當(dāng)導(dǎo)致數(shù)據(jù)傾斜的key很少鸟悴,且少量的key對作業(yè)結(jié)果影響并不大,那么過濾掉少數(shù)導(dǎo)致傾斜的key是一種不錯的處理方法奖年。

如果我們判斷那少數(shù)幾個數(shù)據(jù)量特別多的key细诸,對作業(yè)的執(zhí)行和計算結(jié)果不是特別重要的話,那么干脆就直接過濾掉那少數(shù)幾個key拾并。比如揍堰,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執(zhí)行filter算子過濾掉這些key。如果需要每次作業(yè)執(zhí)行時嗅义,動態(tài)判定哪些key的數(shù)據(jù)量最多然后再進行過濾屏歹,那么可以使用sample算子對RDD進行采樣,然后計算出每個key的數(shù)量之碗,取數(shù)據(jù)量最多的key過濾掉即可蝙眶。這種方法實現(xiàn)簡單,效果也很好褪那,可以完全規(guī)避掉數(shù)據(jù)傾斜幽纷。但是在實際應(yīng)用場景中,導(dǎo)致數(shù)據(jù)傾斜的key往往較多博敬,所以該方法適用范圍較小友浸。

4.3?優(yōu)化集群并行度

4.3.1?提高Shuffle操作的并行度

當(dāng)必須要正面解決數(shù)據(jù)傾斜問題時,該方案較為適合偏窝,這也是處理數(shù)據(jù)傾斜最簡單的方法之一收恢。

通過增加Shuffle read task的數(shù)量武学,可以讓原本分配給一個task的key分配給多個task,從而讓每個task處理比原來更少的數(shù)據(jù)伦意,原理如圖4.2所示火窒。這種方案實現(xiàn)起來比較簡單,可以有效的緩解數(shù)據(jù)傾斜的影響驮肉。但是這種方法通常無法從根本解決問題熏矿,因為如果有一些極端情況出現(xiàn),如:某個key對應(yīng)的數(shù)據(jù)量占整個數(shù)據(jù)集的90%离钝,那么該key所對應(yīng)的90%的數(shù)據(jù)還是會分配到一個task中票编,這是數(shù)據(jù)傾斜現(xiàn)象還是是產(chǎn)生。

圖4.2?提高Shuffle操作的并行度

4.4?優(yōu)化算法

4.4.1?兩階段聚合

對RDD執(zhí)行reduceByKey等聚合類Shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時奈辰,比較適用這種方案栏妖。將原本相同的key通過附加隨機前綴的方式,變成多個不同的key奖恰,就可以讓原本被一個task處理的數(shù)據(jù)分散到多個task上去做局部聚合,進而解決單個task處理數(shù)據(jù)量過多的問題宛裕。接著去除掉隨機前綴瑟啃,再次進行全局聚合,就可以得到最終的結(jié)果揩尸。

這種方案的核心思想是將會產(chǎn)生數(shù)據(jù)傾斜的一次聚合作業(yè)蛹屿,分為兩個階段進行聚合。第一次聚合岩榆,先為每個key標記一個隨機數(shù)错负,隨機數(shù)范圍為[0,n],此時一個key被分為n份勇边,對標記后的key進行局部聚合犹撒;第二個階段,將局部聚合后的key所標記的隨機數(shù)去除粒褒,然后在對key進行聚合识颊。最終,Spark聚合作業(yè)就完成了奕坟,具體原理如圖4.3所示祥款。這種方案對于聚合類的Shuffle操作導(dǎo)致的數(shù)據(jù)傾斜效果很不錯,但是僅僅適用于聚合類的操作月杉。

圖4.3?兩階段聚合原理圖

4.4.2?將reduce join 轉(zhuǎn)化為map join

本方案適合于以下情況使用:RDD中使用join類型的操做或Spark SQL中使用join語句刃跛,且join操作中的一個RDD表數(shù)據(jù)量較小。普通的join是會走Shuffle過程的苛萎,而一旦Shuffle桨昙,就相當(dāng)于會將相同key的數(shù)據(jù)拉取到一個Shuffle read task中再進行join跌帐,此時就是reduce join。但是如果一個RDD是比較小的绊率,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來實現(xiàn)與join同樣的效果谨敛,也就是map join,此時就不會發(fā)生Shuffle操作滤否,也就不會發(fā)生數(shù)據(jù)傾斜脸狸。

本方案使用廣播變量與map類算子代替了join操作,從而完全規(guī)避掉Shuffle操作藐俺,徹底避免了數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)炊甲。首先,將較小的RDD中的數(shù)據(jù)通過collect算子拉去到Driver端內(nèi)存中欲芹,然后將該RDD的數(shù)據(jù)通過Broadcast變量廣播出去卿啡;然后,對另一個RDD執(zhí)行map操作菱父,在算子函數(shù)內(nèi)颈娜,從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進行比對浙宜,如果連接key相同的話官辽,那么就將兩個RDD的數(shù)據(jù)用你需要的方式連接起來,具體原理如圖4.4所示粟瞬。本方案對于join操作導(dǎo)致的數(shù)據(jù)傾斜十分有效同仆,但是本方案僅僅局限于執(zhí)行join操作的兩個RDD中有一個數(shù)據(jù)量較小時。

圖4.4?將reduce join?轉(zhuǎn)化為map join

4.4.3?采樣傾斜key并分拆join操作

本方案適合于以下情況使用:RDD中使用join類型的操做或Spark SQL中使用join語句裙品,且兩個RDD數(shù)據(jù)集的數(shù)據(jù)量都比較大俗批,且出現(xiàn)數(shù)據(jù)傾斜的原因是一個RDD中少數(shù)幾個key的數(shù)據(jù)量過大,另一個RDD的key分布均勻市怎。對于join導(dǎo)致的數(shù)據(jù)傾斜岁忘,如果只是某幾個key導(dǎo)致了傾斜,可以將少數(shù)幾個key分拆成獨立RDD焰轻,并附加隨機前綴打散成n份去進行join臭觉,此時這幾個key對應(yīng)的數(shù)據(jù)就不會集中在少數(shù)幾個task上,而是分散到多個task進行join辱志。

本方案的操作過程如下:

(1)對包含少數(shù)幾個數(shù)據(jù)量過大的key的那個RDD蝠筑,通過sample算子采樣出一份樣本來,然后統(tǒng)計一下每個key的數(shù)量揩懒,計算出來數(shù)據(jù)量最大的是哪幾個key什乙;

(2)將這幾個key對應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個單獨的RDD A已球,并給每個key都打上n以內(nèi)的隨機數(shù)作為前綴臣镣,而不會導(dǎo)致傾斜的大部分key形成另外一個RDD B辅愿;

(3)將需要join的另一個RDD也過濾出傾斜key對應(yīng)的數(shù)據(jù)并形成一個單獨的RDD C,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù)忆某,這n條數(shù)據(jù)按順序附加一個0~n的前綴点待,不會導(dǎo)致傾斜的大部分key形成另外一個RDD D;

(4)將RDD A與RDD C進行join弃舒,此時導(dǎo)致數(shù)據(jù)傾斜的key分成n份癞埠,分散到多個task中去進行join,得到RDD E聋呢;

(5)將RDD B和RDD D進行join操作苗踪,得到RDD F;

(6)對RDD E與RDD F執(zhí)行union操作削锰,得到 RDD G通铲,RDD G即為最終結(jié)果。

以上操作步驟可以用圖4.5來表示器贩,該方案針對于某幾個key導(dǎo)致的數(shù)據(jù)傾斜十分有效颅夺,只需要針對少數(shù)導(dǎo)致數(shù)據(jù)傾斜的key進行擴容n倍,不需要對全量數(shù)據(jù)進行擴容磨澡,避免了占用過多內(nèi)存碗啄;但對于導(dǎo)致數(shù)據(jù)傾斜的key的數(shù)量特別多時,這種方案是無能為力的稳摄。

圖4.5?采樣傾斜key并

分拆join操作

4.4.4?使用隨機前綴和擴容RDD進行join

本方案適合于以下情況使用:RDD中使用join類型的操做或Spark SQL中使用join語句,且兩個RDD數(shù)據(jù)集的數(shù)據(jù)量都比較大饲宿,且出現(xiàn)數(shù)據(jù)傾斜的原因是一個RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜厦酬。該方案將導(dǎo)致數(shù)據(jù)傾斜的RDD中的所有key均附加隨機前綴,然后將處理后的key分散到不同的key中進行處理瘫想,而不是讓一個task處理大量的數(shù)據(jù)仗阅。該方案與4.4.3小節(jié)方案類似,但是本方案需要更多的內(nèi)存資源国夜。

本方案的操作過程如下:

(1)查看RDD/Hive表中的數(shù)據(jù)分布情況减噪,找到那個造成數(shù)據(jù)傾斜的RDD/Hive表;

(2)將該RDD的每條數(shù)據(jù)都打上一個n以內(nèi)的隨機前綴车吹;

(3)對另外一個正常的RDD進行擴容筹裕,將每條數(shù)據(jù)都擴容成n條數(shù)據(jù),擴容出來的每條數(shù)據(jù)都依次打上一個0~n的前綴窄驹;

(4)將兩個處理后的RDD進行join朝卒。

該方案對于所有join類型的數(shù)據(jù)傾斜都可以處理,效果較好乐埠,同時本方案更注重與緩解數(shù)據(jù)傾斜抗斤,而不是徹底避免數(shù)據(jù)傾斜囚企,對于內(nèi)存資源的要求很高。

五瑞眼、總結(jié)

Spark性能優(yōu)化是一項繁復(fù)的任務(wù)龙宏,需要結(jié)合實際生產(chǎn)情況對于多個方面級進行優(yōu)化,僅僅對于某一方面的調(diào)整很難取得集群性能上巨大的提升伤疙。本文對于Spark調(diào)優(yōu)主要的三個方面:資源優(yōu)化银酗、開發(fā)程序優(yōu)化、數(shù)據(jù)傾斜優(yōu)化進行了闡述掩浙,并在每個部分給出了常見的調(diào)優(yōu)方法花吟。在撰寫本文的過程中,我對于Spark整體有了更深入的理解厨姚,希望能夠砥礪前行衅澈。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市谬墙,隨后出現(xiàn)的幾起案子今布,更是在濱河造成了極大的恐慌,老刑警劉巖拭抬,帶你破解...
    沈念sama閱讀 216,544評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件部默,死亡現(xiàn)場離奇詭異,居然都是意外死亡造虎,警方通過查閱死者的電腦和手機洒闸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,430評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來犬金,“玉大人忆畅,你說我怎么就攤上這事∶ズ洌” “怎么了婚夫?”我有些...
    開封第一講書人閱讀 162,764評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長署鸡。 經(jīng)常有香客問我案糙,道長,這世上最難降的妖魔是什么靴庆? 我笑而不...
    開封第一講書人閱讀 58,193評論 1 292
  • 正文 為了忘掉前任时捌,我火速辦了婚禮,結(jié)果婚禮上撒穷,老公的妹妹穿的比我還像新娘匣椰。我一直安慰自己,他們只是感情好端礼,可當(dāng)我...
    茶點故事閱讀 67,216評論 6 388
  • 文/花漫 我一把揭開白布禽笑。 她就那樣靜靜地躺著入录,像睡著了一般。 火紅的嫁衣襯著肌膚如雪佳镜。 梳的紋絲不亂的頭發(fā)上僚稿,一...
    開封第一講書人閱讀 51,182評論 1 299
  • 那天,我揣著相機與錄音蟀伸,去河邊找鬼蚀同。 笑死,一個胖子當(dāng)著我的面吹牛啊掏,可吹牛的內(nèi)容都是我干的蠢络。 我是一名探鬼主播,決...
    沈念sama閱讀 40,063評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼迟蜜,長吁一口氣:“原來是場噩夢啊……” “哼刹孔!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起娜睛,我...
    開封第一講書人閱讀 38,917評論 0 274
  • 序言:老撾萬榮一對情侶失蹤髓霞,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后畦戒,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體方库,經(jīng)...
    沈念sama閱讀 45,329評論 1 310
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,543評論 2 332
  • 正文 我和宋清朗相戀三年障斋,在試婚紗的時候發(fā)現(xiàn)自己被綠了纵潦。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,722評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡垃环,死狀恐怖酪穿,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情晴裹,我是刑警寧澤,帶...
    沈念sama閱讀 35,425評論 5 343
  • 正文 年R本政府宣布救赐,位于F島的核電站涧团,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏经磅。R本人自食惡果不足惜泌绣,卻給世界環(huán)境...
    茶點故事閱讀 41,019評論 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望预厌。 院中可真熱鬧阿迈,春花似錦、人聲如沸轧叽。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,671評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至待逞,卻和暖如春甥角,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背识樱。 一陣腳步聲響...
    開封第一講書人閱讀 32,825評論 1 269
  • 我被黑心中介騙來泰國打工嗤无, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人怜庸。 一個月前我還...
    沈念sama閱讀 47,729評論 2 368
  • 正文 我出身青樓当犯,卻偏偏與公主長得像,于是被迫代替她去往敵國和親割疾。 傳聞我的和親對象是個殘疾皇子嚎卫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,614評論 2 353

推薦閱讀更多精彩內(nèi)容