調(diào)優(yōu)的思路依賴平時(shí)工作中不斷總結(jié)所形成的豐富經(jīng)驗(yàn)。而這些是很難直接從知識(shí)文檔中獲取的,應(yīng)當(dāng)具體問題具體分析阵具,本文對(duì)Spark調(diào)優(yōu)進(jìn)行歸納總結(jié)碍遍,縮短了你摸爬滾打的時(shí)間。
常規(guī)調(diào)優(yōu)
- 并行度調(diào)節(jié)
理想的并行度設(shè)置怔昨,應(yīng)該是讓并行度與資源相匹配雀久,一般來說,task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2-3倍趁舀,task總數(shù)量盡量使并行執(zhí)行partition上限的3-5倍赖捌,可以避免task數(shù)量過少,Executor分配過多CPU core所造成的資源浪費(fèi)矮烹。 - 廣播變量
如果多個(gè)Executor都要使用同一數(shù)據(jù)越庇,每一個(gè)Executor都要去Driver端多次讀取會(huì)浪費(fèi)大量資源,此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中嘗試獲取變量奉狈,如果本地沒有卤唉,BlockManager就會(huì)從Driver或者其他節(jié)點(diǎn)的BlockManager上遠(yuǎn)程拉取變量的副本,并由本地的BlockManager進(jìn)行管理仁期;之后此Executor的所有task都會(huì)直接從本地的BlockManager中獲取變量桑驱。 - Kryo序列化
默認(rèn)情況下,Spark使用Java的序列化機(jī)制跛蛋。Java的序列化機(jī)制使用方便熬的,不需要額外的配置,在算子中使用的變量實(shí)現(xiàn)Serializable接口即可赊级,但是押框,Java序列化機(jī)制的效率不高,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大理逊。
Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右橡伞,Spark之所以沒有默認(rèn)使用Kryo作為序列化類庫(kù),是因?yàn)樗恢С炙袑?duì)象的序列化晋被,同時(shí)Kryo需要用戶在使用前注冊(cè)需要序列化的類型兑徘,不夠方便,但從Spark 2.0.0版本開始羡洛,簡(jiǎn)單類型道媚、簡(jiǎn)單類型數(shù)組、字符串類型的Shuffling RDDs 已經(jīng)默認(rèn)使用Kryo序列化方式了翘县。 - 調(diào)節(jié)本地等待時(shí)長(zhǎng)
根據(jù)Spark的task分配算法最域,Spark希望task能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn)上,但是這些節(jié)點(diǎn)可用的資源可能已經(jīng)用盡锈麸,此時(shí)Spark會(huì)等待一段時(shí)間镀脂,默認(rèn)3s,如果等待指定時(shí)間后仍然無法在指定節(jié)點(diǎn)運(yùn)行忘伞,那么會(huì)自動(dòng)降級(jí)薄翅,嘗試將task分配到比較差的本地化級(jí)別所對(duì)應(yīng)的節(jié)點(diǎn)上沙兰,比如將task分配到離它要計(jì)算的數(shù)據(jù)比較近的一個(gè)節(jié)點(diǎn),然后進(jìn)行計(jì)算翘魄,如果當(dāng)前級(jí)別仍然不行鼎天,那么繼續(xù)降級(jí)。 - RDD持久化緩存
算子調(diào)優(yōu)
- mapPartitions/foreachPartition
mapPartitions和foreachPartition算子針對(duì)每個(gè)分區(qū)只進(jìn)行一次操作暑竟,相比map算子對(duì)所有數(shù)據(jù)都要執(zhí)行一次操作斋射,效率更高,但是如果數(shù)據(jù)量比較大的時(shí)候但荤,一旦內(nèi)存不足罗岖,容易出現(xiàn)OOM,也就是內(nèi)存溢出腹躁。 - filter與coalesce的配合使用
通常filter之后桑包,每個(gè)分區(qū)內(nèi)數(shù)據(jù)不一致,即數(shù)據(jù)傾斜纺非,如果還按照之前每個(gè)partition分配的task數(shù)哑了,就會(huì)出現(xiàn)運(yùn)算速度的差異。
這個(gè)時(shí)候我們可以對(duì)數(shù)據(jù)進(jìn)行重新分區(qū)烧颖,如果是分區(qū)合并弱左,最好采用coalesce算子;如果是分區(qū)分解倒信,采用repartition算子。 - repartition解決SparkSQL低并行度問題
SparkSQL的并行度不允許用戶自己指定泳梆,所以前面所說的并行度調(diào)節(jié)對(duì)SparkSQL無效鳖悠,我們可以使用repertition算子,對(duì)SparkSQL查詢出來的結(jié)果重新分區(qū)优妙,stage的并行度就等于你手動(dòng)重新分區(qū)之后的值乘综。 - reduceByKey本地聚合
reduceByKey會(huì)進(jìn)行本地的map聚合,效率比groupByKey高套硼,所有我們可以考慮將
groupByKey替換成reduceByKey卡辰。
Shuffle調(diào)優(yōu)
- 調(diào)節(jié)map端緩沖區(qū)大小
map端緩沖的默認(rèn)配置是32KB,導(dǎo)致溢寫次數(shù)過多邪意,對(duì)性能影響比較大九妈,適當(dāng)增大map端緩沖區(qū)。 - 調(diào)節(jié)reduce端拉取數(shù)據(jù)緩沖區(qū)的大小
Spark Shuffle過程中雾鬼,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量萌朱,也就是每次能夠拉取的數(shù)據(jù)量,適當(dāng)增加緩沖區(qū)大小策菜,可以減
少拉取數(shù)據(jù)的次數(shù)晶疼,也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù)酒贬,進(jìn)而提升性能。 - 增加reduce端拉取數(shù)據(jù)重試次數(shù)
Spark Shuffle過程中翠霍,reduce task拉取屬于自己的數(shù)據(jù)時(shí)锭吨,如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試。建議增加重試最大次數(shù)寒匙,可以大幅提升穩(wěn)定性零如。 - 調(diào)節(jié)reduce端拉取數(shù)據(jù)等待間隔
Spark Shuffle過程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí)蒋情,如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試埠况,在一次失敗后,會(huì)等待一定的時(shí)間間隔再進(jìn)行重試棵癣,可以通過加大間隔時(shí)長(zhǎng)辕翰,以增加shuffle操作的穩(wěn)定性。 - 調(diào)節(jié)SortShuffle排序操作閾值
對(duì)于SortShuffleManager狈谊,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過程中不會(huì)進(jìn)行排序操作喜命,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤文件都合并成一個(gè)文件河劝,并會(huì)創(chuàng)建單獨(dú)的索引文件壁榕。當(dāng)你使用SortShuffleManager時(shí),如果的確不需要排序操作赎瞎,那么建議將這個(gè)參數(shù)調(diào)大一些牌里,大于shuffle read task的數(shù)量,那么此時(shí)map-side就不會(huì)進(jìn)行排序了务甥,減少了排序的性能開銷牡辽,但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤文件敞临,因此shuffle write性能有待提高态辛。
Spark數(shù)據(jù)傾斜
- 聚合原始數(shù)據(jù)
如果Spark作業(yè)的數(shù)據(jù)來源于Hive表,那么可以先在Hive表中對(duì)數(shù)據(jù)進(jìn)行聚合挺尿,比如說將同一key對(duì)應(yīng)的所有value用一種特殊的格式拼接到一個(gè)字符串里去奏黑,這樣,一個(gè)key就只有一條數(shù)據(jù)了编矾;之后熟史,對(duì)一個(gè)key的所有value進(jìn)行處理時(shí),只需要進(jìn)行map操作即可窄俏,無需再進(jìn)行任何的shuffle操作以故。通過上述方式就避免了執(zhí)行shuffle操作,也就不可能會(huì)發(fā)生任何的數(shù)據(jù)傾斜問題裆操。
還可以通過增大粒度的方式怒详,減少key的數(shù)量炉媒,key之間的數(shù)據(jù)量差異也有可
能會(huì)減少,由此可以減輕數(shù)據(jù)傾斜的現(xiàn)象和問題昆烁。 - 提高shuffle操作中的reduce并行度
增加shuffle read task的數(shù)量吊骤,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來更少的數(shù)據(jù)静尼,在一定程度上緩解數(shù)據(jù)傾斜白粉。 - 使用隨機(jī)key實(shí)現(xiàn)雙重聚合
通過map算子給每個(gè)數(shù)據(jù)的key添加隨機(jī)數(shù)前綴,將原先一樣的key變成不一樣的key鼠渺,然后進(jìn)行第一次聚合鸭巴,這樣就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合;隨后拦盹,去除掉每個(gè)key的前綴鹃祖,再次進(jìn)行聚合。 - 將reducejoin轉(zhuǎn)換為mapjoin
將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來普舆,然后對(duì)其創(chuàng)建一個(gè)Broadcast變量恬口;接著對(duì)另外一個(gè)RDD執(zhí)行map類算子,在算子函數(shù)內(nèi)沼侣,從Broadcast變量中獲取較小RDD的全量數(shù)據(jù)祖能,與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對(duì),如果連接key相同的話蛾洛,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來养铸。