常規(guī)性能調(diào)優(yōu)
1. 最優(yōu)資源配置
Spark性能調(diào)優(yōu)的第一步碎税,就是為任務(wù)分配更多的資源贝乎,在一定范圍內(nèi),增加資源的分配與性能的提升是成正比的鱼冀,實(shí)現(xiàn)了最優(yōu)的資源配置后,在此基礎(chǔ)上再考慮進(jìn)行后面論述的性能調(diào)優(yōu)策略悠就。資源的分配在使用腳本提交Spark任務(wù)時(shí)進(jìn)行指定千绪。
名稱 | 說明 |
---|---|
--num-executors | 配置Executor的數(shù)量 |
--driver-memory | 配置Driver內(nèi)存(影響不大) |
--executor-memory | 配置每個(gè)Executor的內(nèi)存大小 |
--executor-cores | 配置每個(gè)Executor的CPU core數(shù)量 |
配置 | 配置作用 |
---|---|
增加Executor·個(gè)數(shù) | 在資源允許的情況下,增加Executor的個(gè)數(shù)可以提高執(zhí)行梗脾,task的并行度荸型。比如有4個(gè)Executor,每個(gè)Executor有2個(gè)CPU core炸茧,那么可以并行執(zhí)行8個(gè)task瑞妇,如果將Executor的個(gè)數(shù)增加到8個(gè)(資源允許的情況下),那么可以并行執(zhí)行16個(gè)task梭冠,此時(shí)的并行能力提升了一倍 |
增加每個(gè)Executor的CPU core個(gè)數(shù) | 在資源允許的情況下辕狰,增加每個(gè)Executor的Cpu core個(gè)數(shù),可以提高執(zhí)行task的并行度控漠。比如有4個(gè)Executor蔓倍,每個(gè)Executor有2個(gè)CPU core,那么可以并行執(zhí)行8個(gè)task盐捷,如果將每個(gè)Executor的CPU core個(gè)數(shù)增加到4個(gè)(資源允許的情況下)偶翅,那么可以并行執(zhí)行16個(gè)task,此時(shí)的并行能力提升了一倍 |
增加每個(gè)Executor的內(nèi)存量 | 在資源允許的情況下碉渡,增加每個(gè)Executor的內(nèi)存量以后聚谁,對(duì)性能的提升有三點(diǎn): 1. 可以緩存更多的數(shù)據(jù)(即對(duì)RDD進(jìn)行cache)蚂夕,寫入磁盤的數(shù)據(jù)相應(yīng)減少洲炊,甚至可以不寫入磁盤绸吸,減少了可能的磁盤IO砌些;2. 可以為shuffle操作提供更多內(nèi)存,即有更多空間來存放reduce端拉取的數(shù)據(jù)朵耕,寫入磁盤的數(shù)據(jù)相應(yīng)減少秦爆,甚至可以不寫入磁盤,減少了可能的磁盤IO憔披;3. 可以為task的執(zhí)行提供更多內(nèi)存,在task的執(zhí)行過程中可能創(chuàng)建很多對(duì)象爸吮,內(nèi)存較小時(shí)會(huì)引發(fā)頻繁的GC芬膝,增加內(nèi)存后,可以避免頻繁的GC形娇,提升整體性能 |
例如:
/usr/local/spark/bin/spark-submit \
--class com.atguigu.spark.WordCount \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
--master yarn-cluster \
--queue root.default \
--conf spark.yarn.executor.memoryOverhead=2048 \
--conf spark.core.connection.ack.wait.timeout=300 \
/usr/local/spark/spark.jar
參數(shù)配置參考值:
--num-executors:50~100
--driver-memory:1G~5G
--executor-memory:6G~10G
--executor-cores:3
--master:實(shí)際生產(chǎn)環(huán)境一般使用yarn-cluster
2. RDD優(yōu)化
- RDD持久化
在Spark中锰霜,當(dāng)多次對(duì)同一個(gè)RDD執(zhí)行算子操作時(shí),每一次都會(huì)對(duì)這個(gè)RDD以之前的父RDD重新計(jì)算一次桐早,這種情況是必須要避免的癣缅,對(duì)同一個(gè)RDD的重復(fù)計(jì)算是對(duì)資源的極大浪費(fèi),因此哄酝,必須對(duì)多次使用的RDD進(jìn)行持久化友存,通過持久化將公共RDD的數(shù)據(jù)緩存到內(nèi)存/磁盤中,之后對(duì)于公共RDD的計(jì)算都會(huì)從內(nèi)存/磁盤中直接獲取RDD數(shù)據(jù)陶衅。
對(duì)于RDD的持久化屡立,有兩點(diǎn)需要說明:
第一,RDD的持久化是可以進(jìn)行序列化的搀军,當(dāng)內(nèi)存無法將RDD的數(shù)據(jù)完整的進(jìn)行存放的時(shí)候膨俐,可以考慮使用序列化的方式減小數(shù)據(jù)體積,將數(shù)據(jù)完整存儲(chǔ)在內(nèi)存中罩句。
第二焚刺,如果對(duì)于數(shù)據(jù)的可靠性要求很高,并且內(nèi)存充足门烂,可以使用副本機(jī)制乳愉,對(duì)RDD數(shù)據(jù)進(jìn)行持久化。當(dāng)持久化啟用了復(fù)本機(jī)制時(shí)诅福,對(duì)于持久化的每個(gè)數(shù)據(jù)單元都存儲(chǔ)一個(gè)副本匾委,放在其他節(jié)點(diǎn)上面,由此實(shí)現(xiàn)數(shù)據(jù)的容錯(cuò)氓润,一旦一個(gè)副本數(shù)據(jù)丟失赂乐,不需要重新計(jì)算,還可以使用另外一個(gè)副本
- RDD盡可能早的filter操作
獲取到初始RDD后咖气,應(yīng)該考慮盡早地過濾掉不需要的數(shù)據(jù)挨措,進(jìn)而減少對(duì)內(nèi)存的占用挖滤,從而提升Spark作業(yè)的運(yùn)行效率
3. 并行度調(diào)節(jié)
Spark作業(yè)中的并行度指各個(gè)stage的task的數(shù)量。
如果并行度設(shè)置不合理而導(dǎo)致并行度過低浅役,會(huì)導(dǎo)致資源的極大浪費(fèi)斩松,例如,20個(gè)Executor觉既,每個(gè)Executor分配3個(gè)CPU core惧盹,而Spark作業(yè)有40個(gè)task,這樣每個(gè)Executor分配到的task個(gè)數(shù)是2個(gè)瞪讼,這就使得每個(gè)Executor有一個(gè)CPU core空閑钧椰,導(dǎo)致資源的浪費(fèi)。
理想的并行度設(shè)置符欠,應(yīng)該是讓并行度與資源相匹配嫡霞,簡(jiǎn)單來說就是在資源允許的前提下,并行度要設(shè)置的盡可能大希柿,達(dá)到可以充分利用集群資源诊沪。合理的設(shè)置并行度,可以提升整個(gè)Spark作業(yè)的性能和運(yùn)行速度曾撤。
Spark官方推薦端姚,task數(shù)量應(yīng)該設(shè)置為Spark作業(yè)總CPU core數(shù)量的2~3倍。之所以沒有推薦task數(shù)量與CPU core總數(shù)相等盾戴,是因?yàn)閠ask的執(zhí)行時(shí)間不同寄锐,有的task執(zhí)行速度快而有的task執(zhí)行速度慢,如果task數(shù)量與CPU core總數(shù)相等尖啡,那么執(zhí)行快的task執(zhí)行完成后橄仆,會(huì)出現(xiàn)CPU core空閑的情況。如果task數(shù)量設(shè)置為CPU core總數(shù)的2~3倍衅斩,那么一個(gè)task執(zhí)行完畢后盆顾,CPU core會(huì)立刻執(zhí)行下一個(gè)task,降低了資源的浪費(fèi)畏梆,同時(shí)提升了Spark作業(yè)運(yùn)行的效率您宪。
Spark作業(yè)并行度的設(shè)置:
val conf = new SparkConf().set("spark.default.parallelism", "500")
4. 廣播大變量
默認(rèn)情況下,task中的算子中如果使用了外部的變量奠涌,每個(gè)task都會(huì)獲取一份變量的復(fù)本宪巨,這就造成了內(nèi)存的極大消耗。一方面溜畅,如果后續(xù)對(duì)RDD進(jìn)行持久化捏卓,可能就無法將RDD數(shù)據(jù)存入內(nèi)存,只能寫入磁盤慈格,磁盤IO將會(huì)嚴(yán)重消耗性能怠晴;另一方面遥金,task在創(chuàng)建對(duì)象的時(shí)候,也許會(huì)發(fā)現(xiàn)堆內(nèi)存無法存放新創(chuàng)建的對(duì)象蒜田,這就會(huì)導(dǎo)致頻繁的GC稿械,GC會(huì)導(dǎo)致工作線程停止,進(jìn)而導(dǎo)致Spark暫停工作一段時(shí)間冲粤,嚴(yán)重影響Spark性能美莫。
假設(shè)當(dāng)前任務(wù)配置了20個(gè)Executor,指定500個(gè)task梯捕,有一個(gè)20M的變量被所有task共用茂嗓,此時(shí)會(huì)在500個(gè)task中產(chǎn)生500個(gè)副本,耗費(fèi)集群10G的內(nèi)存科阎,如果使用了廣播變量, 那么每個(gè)Executor保存一個(gè)副本忿族,一共消耗400M內(nèi)存锣笨,內(nèi)存消耗減少了5倍。
廣播變量在每個(gè)Executor保存一個(gè)副本道批,此Executor的所有task共用此廣播變量错英,這讓變量產(chǎn)生的副本數(shù)量大大減少。
在初始階段隆豹,廣播變量只在Driver中有一份副本椭岩。task在運(yùn)行的時(shí)候,想要使用廣播變量中的數(shù)據(jù)璃赡,此時(shí)首先會(huì)在自己本地的Executor對(duì)應(yīng)的BlockManager中嘗試獲取變量判哥,如果本地沒有,BlockManager就會(huì)從Driver或者其他節(jié)點(diǎn)的BlockManager上遠(yuǎn)程拉取變量的復(fù)本碉考,并由本地的BlockManager進(jìn)行管理塌计;之后此Executor的所有task都會(huì)直接從本地的BlockManager中獲取變量。
5. Kryo序列化
默認(rèn)情況下侯谁,Spark使用Java的序列化機(jī)制锌仅。Java的序列化機(jī)制使用方便,不需要額外的配置墙贱,在算子中使用的變量實(shí)現(xiàn)Serializable接口即可热芹,但是,Java序列化機(jī)制的效率不高惨撇,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大伊脓。
Kryo序列化機(jī)制比Java序列化機(jī)制性能提高10倍左右,Spark之所以沒有默認(rèn)使用Kryo作為序列化類庫串纺,是因?yàn)樗恢С炙袑?duì)象的序列化丽旅,同時(shí)Kryo需要用戶在使用前注冊(cè)需要序列化的類型椰棘,不夠方便,但從Spark 2.0.0版本開始榄笙,簡(jiǎn)單類型邪狞、簡(jiǎn)單類型數(shù)組、字符串類型的Shuffling RDDs 已經(jīng)默認(rèn)使用Kryo序列化方式了茅撞。
配置Kryo序列化:
//創(chuàng)建SparkConf對(duì)象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫帆卓,如果要使用Java序列化庫,需要把該行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在Kryo序列化庫中注冊(cè)自定義的類集合米丘,如果要使用Java序列化庫剑令,需要把該行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
5. 調(diào)節(jié)本地化等待時(shí)長(zhǎng)
Spark作業(yè)運(yùn)行過程中,Driver會(huì)對(duì)每一個(gè)stage的task進(jìn)行分配拄查。根據(jù)Spark的task分配算法吁津,Spark希望task能夠運(yùn)行在它要計(jì)算的數(shù)據(jù)算在的節(jié)點(diǎn)(數(shù)據(jù)本地化思想),這樣就可以避免數(shù)據(jù)的網(wǎng)絡(luò)傳輸堕扶。通常來說碍脏,task可能不會(huì)被分配到它處理的數(shù)據(jù)所在的節(jié)點(diǎn),因?yàn)檫@些節(jié)點(diǎn)可用的資源可能已經(jīng)用盡稍算,此時(shí)典尾,Spark會(huì)等待一段時(shí)間,默認(rèn)3s糊探,如果等待指定時(shí)間后仍然無法在指定節(jié)點(diǎn)運(yùn)行钾埂,那么會(huì)自動(dòng)降級(jí),嘗試將task分配到比較差的本地化級(jí)別所對(duì)應(yīng)的節(jié)點(diǎn)上科平,比如將task分配到離它要計(jì)算的數(shù)據(jù)比較近的一個(gè)節(jié)點(diǎn)褥紫,然后進(jìn)行計(jì)算,如果當(dāng)前級(jí)別仍然不行瞪慧,那么繼續(xù)降級(jí)故源。
當(dāng)task要處理的數(shù)據(jù)不在task所在節(jié)點(diǎn)上時(shí),會(huì)發(fā)生數(shù)據(jù)的傳輸汞贸。task會(huì)通過所在節(jié)點(diǎn)的BlockManager獲取數(shù)據(jù)绳军,BlockManager發(fā)現(xiàn)數(shù)據(jù)不在本地時(shí),戶通過網(wǎng)絡(luò)傳輸組件從數(shù)據(jù)所在節(jié)點(diǎn)的BlockManager處獲取數(shù)據(jù)矢腻。
網(wǎng)絡(luò)傳輸數(shù)據(jù)的情況是我們不愿意看到的门驾,大量的網(wǎng)絡(luò)傳輸會(huì)嚴(yán)重影響性能,因此多柑,我們希望通過調(diào)節(jié)本地化等待時(shí)長(zhǎng)奶是,如果在等待時(shí)長(zhǎng)這段時(shí)間內(nèi),目標(biāo)節(jié)點(diǎn)處理完成了一部分task,那么當(dāng)前的task將有機(jī)會(huì)得到執(zhí)行聂沙,這樣就能夠改善Spark作業(yè)的整體性能秆麸。
Spark的本地化等級(jí)如下:
名稱 | 解析 |
---|---|
PROCESS_LOCAL | 進(jìn)程本地化,task和數(shù)據(jù)在同一個(gè)Executor中及汉,性能最好沮趣。 |
NODE_LOCAL | 節(jié)點(diǎn)本地化,task和數(shù)據(jù)在同一個(gè)節(jié)點(diǎn)中坷随,但是task和數(shù)據(jù)不在同一個(gè)Executor中房铭,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸。 |
RACK_LOCAL | 機(jī)架本地化温眉,task和數(shù)據(jù)在同一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上缸匪,數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點(diǎn)之間進(jìn)行傳輸。 |
NO_PREF | 對(duì)于task來說类溢,從哪里獲取都一樣凌蔬,沒有好壞之分。 |
ANY | task和數(shù)據(jù)可以在集群的任何地方闯冷,而且不在一個(gè)機(jī)架中龟梦,性能最差。 |
在Spark項(xiàng)目開發(fā)階段窃躲,可以使用client模式對(duì)程序進(jìn)行測(cè)試,此時(shí)钦睡,可以在本地看到比較全的日志信息蒂窒,日志信息中有明確的task數(shù)據(jù)本地化的級(jí)別,如果大部分都是PROCESS_LOCAL荞怒,那么就無需進(jìn)行調(diào)節(jié)洒琢,但是如果發(fā)現(xiàn)很多的級(jí)別都是NODE_LOCAL、ANY褐桌,那么需要對(duì)本地化的等待時(shí)長(zhǎng)進(jìn)行調(diào)節(jié)衰抑,通過延長(zhǎng)本地化等待時(shí)長(zhǎng),看看task的本地化級(jí)別有沒有提升荧嵌,并觀察Spark作業(yè)的運(yùn)行時(shí)間有沒有縮短呛踊。
注意,不要將本地化等待時(shí)長(zhǎng)延長(zhǎng)地過長(zhǎng)啦撮,導(dǎo)致因?yàn)榇罅康牡却龝r(shí)長(zhǎng)谭网,使得Spark作業(yè)的運(yùn)行時(shí)間反而增加了。
算子調(diào)優(yōu)
1. mapPartitions
普通的map算子對(duì)RDD中的每一個(gè)元素進(jìn)行操作赃春,而mapPartitions算子對(duì)RDD中每一個(gè)分區(qū)進(jìn)行操作愉择。如果是普通的map算子,假設(shè)一個(gè)partition有1萬條數(shù)據(jù),那么map算子中的function要執(zhí)行1萬次锥涕,也就是對(duì)每個(gè)元素進(jìn)行操作衷戈。
如果是mapPartition算子,由于一個(gè)task處理一個(gè)RDD的partition层坠,那么一個(gè)task只會(huì)執(zhí)行一次function殖妇,function一次接收所有的partition數(shù)據(jù),效率比較高窿春。
比如拉一,當(dāng)要把RDD中的所有數(shù)據(jù)通過JDBC寫入數(shù)據(jù),如果使用map算子旧乞,那么需要對(duì)RDD中的每一個(gè)元素都創(chuàng)建一個(gè)數(shù)據(jù)庫連接蔚润,這樣對(duì)資源的消耗很大,如果使用mapPartitions算子尺栖,那么針對(duì)一個(gè)分區(qū)的數(shù)據(jù)嫡纠,只需要建立一個(gè)數(shù)據(jù)庫連接。
mapPartitions算子也存在一些缺點(diǎn):對(duì)于普通的map操作延赌,一次處理一條數(shù)據(jù)除盏,如果在處理了2000條數(shù)據(jù)后內(nèi)存不足,那么可以將已經(jīng)處理完的2000條數(shù)據(jù)從內(nèi)存中垃圾回收掉挫以;但是如果使用mapPartitions算子者蠕,但數(shù)據(jù)量非常大時(shí),function一次處理一個(gè)分區(qū)的數(shù)據(jù)掐松,如果一旦內(nèi)存不足踱侣,此時(shí)無法回收內(nèi)存,就可能會(huì)OOM大磺,即內(nèi)存溢出抡句。
因此,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時(shí)候杠愧,此時(shí)使用mapPartitions算子對(duì)性能的提升效果還是不錯(cuò)的待榔。(當(dāng)數(shù)據(jù)量很大的時(shí)候,一旦使用mapPartitions算子流济,就會(huì)直接OOM)
在項(xiàng)目中锐锣,應(yīng)該首先估算一下RDD的數(shù)據(jù)量、每個(gè)partition的數(shù)據(jù)量绳瘟,以及分配給每個(gè)Executor的內(nèi)存資源刺下,如果資源允許,可以考慮使用mapPartitions算子代替map稽荧。
2. foreachPartition優(yōu)化數(shù)據(jù)庫操作
在生產(chǎn)環(huán)境中橘茉,通常使用foreachPartition算子來完成數(shù)據(jù)庫的寫入工腋,通過foreachPartition算子的特性,可以優(yōu)化寫數(shù)據(jù)庫的性能畅卓。
如果使用foreach算子完成數(shù)據(jù)庫的操作擅腰,由于foreach算子是遍歷RDD的每條數(shù)據(jù),因此翁潘,每條數(shù)據(jù)都會(huì)建立一個(gè)數(shù)據(jù)庫連接趁冈,這是對(duì)資源的極大浪費(fèi),因此拜马,對(duì)于寫數(shù)據(jù)庫操作渗勘,我們應(yīng)當(dāng)使用foreachPartition算子。
與mapPartitions算子非常相似俩莽,foreachPartition是將RDD的每個(gè)分區(qū)作為遍歷對(duì)象旺坠,一次處理一個(gè)分區(qū)的數(shù)據(jù),也就是說扮超,如果涉及數(shù)據(jù)庫的相關(guān)操作取刃,一個(gè)分區(qū)的數(shù)據(jù)只需要?jiǎng)?chuàng)建一次數(shù)據(jù)庫連接。
使用了foreachPartition算子后出刷,可以獲得以下的性能提升:
- 對(duì)于我們寫的function函數(shù)璧疗,一次處理一整個(gè)分區(qū)的數(shù)據(jù);
- 對(duì)于一個(gè)分區(qū)內(nèi)的數(shù)據(jù)馁龟,創(chuàng)建唯一的數(shù)據(jù)庫連接崩侠;
- 只需要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù);
在生產(chǎn)環(huán)境中坷檩,全部都會(huì)使用foreachPartition算子完成數(shù)據(jù)庫操作却音。foreachPartition算子存在一個(gè)問題,與mapPartitions算子類似淌喻,如果一個(gè)分區(qū)的數(shù)據(jù)量特別大,可能會(huì)造成OOM雀摘,即內(nèi)存溢出裸删。
3. filter與coalesce的配合使用
在Spark任務(wù)中我們經(jīng)常會(huì)使用filter算子完成RDD中數(shù)據(jù)的過濾,在任務(wù)初始階段阵赠,從各個(gè)分區(qū)中加載到的數(shù)據(jù)量是相近的涯塔,但是一旦進(jìn)過filter過濾后,每個(gè)分區(qū)的數(shù)據(jù)量有可能會(huì)存在較大差異清蚀。
如圖我們可以發(fā)現(xiàn)兩個(gè)問題:
- 每個(gè)partition的數(shù)據(jù)量變小了匕荸,如果還按照之前與partition相等的task個(gè)數(shù)去處理當(dāng)前數(shù)據(jù),有點(diǎn)浪費(fèi)task的計(jì)算資源枷邪;
- 每個(gè)partition的數(shù)據(jù)量不一樣榛搔,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition數(shù)據(jù)的時(shí)候,每個(gè)task要處理的數(shù)據(jù)量不同,這很有可能導(dǎo)致數(shù)據(jù)傾斜問題践惑。
如圖所示腹泌,第二個(gè)分區(qū)的數(shù)據(jù)過濾后只剩100條,而第三個(gè)分區(qū)的數(shù)據(jù)過濾后剩下800條尔觉,在相同的處理邏輯下凉袱,第二個(gè)分區(qū)對(duì)應(yīng)的task處理的數(shù)據(jù)量與第三個(gè)分區(qū)對(duì)應(yīng)的task處理的數(shù)據(jù)量差距達(dá)到了8倍,這也會(huì)導(dǎo)致運(yùn)行速度可能存在數(shù)倍的差距侦铜,這也就是數(shù)據(jù)傾斜問題专甩。
針對(duì)上述的兩個(gè)問題,我們分別進(jìn)行分析:
針對(duì)第一個(gè)問題钉稍,既然分區(qū)的數(shù)據(jù)量變小了涤躲,我們希望可以對(duì)分區(qū)數(shù)據(jù)進(jìn)行重新分配,比如將原來4個(gè)分區(qū)的數(shù)據(jù)轉(zhuǎn)化到2個(gè)分區(qū)中嫁盲,這樣只需要用后面的兩個(gè)task進(jìn)行處理即可篓叶,避免了資源的浪費(fèi)。
針對(duì)第二個(gè)問題羞秤,解決方法和第一個(gè)問題的解決方法非常相似缸托,對(duì)分區(qū)數(shù)據(jù)重新分配,讓每個(gè)partition中的數(shù)據(jù)量差不多瘾蛋,這就避免了數(shù)據(jù)傾斜問題俐镐。
那么具體應(yīng)該如何實(shí)現(xiàn)上面的解決思路?我們需要coalesce算子哺哼。
repartition與coalesce都可以用來進(jìn)行重分區(qū)佩抹,其中repartition只是coalesce接口中shuffle為true的簡(jiǎn)易實(shí)現(xiàn),coalesce默認(rèn)情況下不進(jìn)行shuffle取董,但是可以通過參數(shù)進(jìn)行設(shè)置棍苹。
假設(shè)我們希望將原本的分區(qū)個(gè)數(shù)A通過重新分區(qū)變?yōu)锽,那么有以下幾種情況:
A > B(多數(shù)分區(qū)合并為少數(shù)分區(qū))
① A與B相差值不大
此時(shí)使用coalesce即可茵汰,無需shuffle過程枢里。
② A與B相差值很大
此時(shí)可以使用coalesce并且不啟用shuffle過程,但是會(huì)導(dǎo)致合并過程性能低下蹂午,所以推薦設(shè)置coalesce的第二個(gè)參數(shù)為true栏豺,即啟動(dòng)shuffle過程。A < B(少數(shù)分區(qū)分解為多數(shù)分區(qū))
此時(shí)使用repartition即可豆胸,如果使用coalesce需要將shuffle設(shè)置為true奥洼,否則coalesce無效。
我們可以在filter操作之后晚胡,使用coalesce算子針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況灵奖,壓縮partition的數(shù)量嚼沿,而且讓每個(gè)partition的數(shù)據(jù)量盡量均勻緊湊,以便于后面的task進(jìn)行計(jì)算操作桑寨,在某種程度上能夠在一定程度上提升性能伏尼。
注意:local模式是進(jìn)程內(nèi)模擬集群運(yùn)行,已經(jīng)對(duì)并行度和分區(qū)數(shù)量有了一定的內(nèi)部?jī)?yōu)化尉尾,因此不用去設(shè)置并行度和分區(qū)數(shù)量爆阶。
4. repartition解決SparkSQL低并行度問題
常規(guī)性能調(diào)優(yōu)中我們講解了并行度的調(diào)節(jié)策略,但是沙咏,并行度的設(shè)置對(duì)于Spark SQL是不生效的辨图,用戶設(shè)置的并行度只對(duì)于Spark SQL以外的所有Spark的stage生效。
Spark SQL的并行度不允許用戶自己指定肢藐,Spark SQL自己會(huì)默認(rèn)根據(jù)hive表對(duì)應(yīng)的HDFS文件的split個(gè)數(shù)自動(dòng)設(shè)置Spark SQL所在的那個(gè)stage的并行度故河,用戶自己通spark.default.parallelism參數(shù)指定的并行度,只會(huì)在沒Spark SQL的stage中生效吆豹。
由于Spark SQL所在stage的并行度無法手動(dòng)設(shè)置鱼的,如果數(shù)據(jù)量較大,并且此stage中后續(xù)的transformation操作有著復(fù)雜的業(yè)務(wù)邏輯痘煤,而Spark SQL自動(dòng)設(shè)置的task數(shù)量很少凑阶,這就意味著每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行非常復(fù)雜的處理邏輯衷快,這就可能表現(xiàn)為第一個(gè)有Spark SQL的stage速度很慢宙橱,而后續(xù)的沒有Spark SQL的stage運(yùn)行速度非常快蘸拔。
為了解決Spark SQL無法設(shè)置并行度和task數(shù)量的問題师郑,我們可以使用repartition算子。
Spark SQL這一步的并行度和task數(shù)量肯定是沒有辦法去改變了调窍,但是宝冕,對(duì)于Spark SQL查詢出來的RDD,立即使用repartition算子邓萨,去重新進(jìn)行分區(qū)地梨,這樣可以重新分區(qū)為多個(gè)partition,從repartition之后的RDD操作先誉,由于不再設(shè)計(jì)Spark SQL湿刽,因此stage的并行度就會(huì)等于你手動(dòng)設(shè)置的值的烁,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數(shù)據(jù)并執(zhí)行復(fù)雜的算法邏輯褐耳。
5. reduceByKey本地聚合
reduceByKey相較于普通的shuffle操作一個(gè)顯著的特點(diǎn)就是會(huì)進(jìn)行map端的本地聚合,map端會(huì)先對(duì)本地的數(shù)據(jù)進(jìn)行combine操作渴庆,然后將數(shù)據(jù)寫入給下個(gè)stage的每個(gè)task創(chuàng)建的文件中铃芦,也就是在map端雅镊,對(duì)每一個(gè)key對(duì)應(yīng)的value,執(zhí)行reduceByKey算子函數(shù)刃滓。
reduceByKey算子的執(zhí)行過程如下:
使用reduceByKey對(duì)性能的提升如下:
1.本地聚合后仁烹,在map端的數(shù)據(jù)量變少,減少了磁盤IO咧虎,也減少了對(duì)磁盤空間的占用卓缰;
2.本地聚合后,下一個(gè)stage拉取的數(shù)據(jù)量變少砰诵,減少了網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)量征唬;
3.本地聚合后,在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用減少茁彭;
4.本地聚合后总寒,在reduce端進(jìn)行聚合的數(shù)據(jù)量減少。
基于reduceByKey的本地聚合特征理肺,我們應(yīng)該考慮使用reduceByKey代替其他的shuffle算子摄闸。比如groupByKey。
Shuffle調(diào)優(yōu)
1. 調(diào)節(jié)map端緩沖區(qū)大小
在Spark任務(wù)運(yùn)行過程中妹萨,如果shuffle的map端處理的數(shù)據(jù)量比較大年枕,但是map端緩沖的大小是固定的,可能會(huì)出現(xiàn)map端緩沖數(shù)據(jù)頻繁spill溢寫到磁盤文件中的情況眠副,使得性能非常低下画切,通過調(diào)節(jié)map端緩沖的大小,可以避免頻繁的磁盤IO操作囱怕,進(jìn)而提升Spark任務(wù)的整體性能霍弹。
map端緩沖的默認(rèn)配置是32KB,如果每個(gè)task處理640KB的數(shù)據(jù)娃弓,那么會(huì)發(fā)生640/32 = 20次溢寫典格,如果每個(gè)task處理64000KB的數(shù)據(jù),機(jī)會(huì)發(fā)生64000/32=2000此溢寫台丛,這對(duì)于性能的影響是非常嚴(yán)重的耍缴。
map端緩沖的配置方法:
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
2. 調(diào)節(jié)reduce端拉取數(shù)據(jù)緩沖區(qū)大小
Spark Shuffle過程中,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量挽霉,也就是每次能夠拉取的數(shù)據(jù)量防嗡,如果內(nèi)存資源較為充足,適當(dāng)增加拉取數(shù)據(jù)緩沖區(qū)的大小侠坎,可以減少拉取數(shù)據(jù)的次數(shù)蚁趁,也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù),進(jìn)而提升性能实胸。
reduce端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過spark.reducer.maxSizeInFlight參數(shù)進(jìn)行設(shè)置他嫡,默認(rèn)為48MB番官,該參數(shù)的設(shè)置方法:
val conf = new SparkConf()
.set("spark.reducer.maxSizeInFlight", "96")
3. 調(diào)節(jié)reduce端拉取數(shù)據(jù)重試次數(shù)
Spark Shuffle過程中,reduce task拉取屬于自己的數(shù)據(jù)時(shí)钢属,如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試徘熔。對(duì)于那些包含了特別耗時(shí)的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次)淆党,以避免由于JVM的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗酷师。在實(shí)踐中發(fā)現(xiàn),對(duì)于針對(duì)超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程染乌,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性窒升。
reduce端拉取數(shù)據(jù)重試次數(shù)可以通過spark.shuffle.io.maxRetries參數(shù)進(jìn)行設(shè)置,該參數(shù)就代表了可以重試的最大次數(shù)慕匠。如果在指定次數(shù)之內(nèi)拉取還是沒有成功饱须,就可能會(huì)導(dǎo)致作業(yè)執(zhí)行失敗,默認(rèn)為3.
該參數(shù)的設(shè)置方法:
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
4. 調(diào)節(jié)reduce端拉取數(shù)據(jù)等待間隔
Spark Shuffle過程中台谊,reduce task拉取屬于自己的數(shù)據(jù)時(shí)蓉媳,如果因?yàn)榫W(wǎng)絡(luò)異常等原因?qū)е率?huì)自動(dòng)進(jìn)行重試,在一次失敗后锅铅,會(huì)等待一定的時(shí)間間隔再進(jìn)行重試酪呻,可以通過加大間隔時(shí)長(zhǎng)(比如60s),以增加shuffle操作的穩(wěn)定性盐须。
reduce端拉取數(shù)據(jù)等待間隔可以通過spark.shuffle.io.retryWait參數(shù)進(jìn)行設(shè)置玩荠,默認(rèn)值為5s。
該參數(shù)的設(shè)置方法:
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "60s")
5. 調(diào)節(jié)SortShuffle排序操作閾值
對(duì)于SortShuffleManager贼邓,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過程中不會(huì)進(jìn)行排序操作阶冈,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù),但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤文件都合并成一個(gè)文件塑径,并會(huì)創(chuàng)建單獨(dú)的索引文件女坑。
當(dāng)你使用SortShuffleManager時(shí),如果的確不需要排序操作统舀,那么建議將這個(gè)參數(shù)調(diào)大一些匆骗,大于shuffle read task的數(shù)量,那么此時(shí)map-side就不會(huì)進(jìn)行排序了誉简,減少了排序的性能開銷碉就,但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤文件闷串,因此shuffle write性能有待提高瓮钥。
SortShuffleManager排序操作閾值的設(shè)置可以通過spark.shuffle.sort. bypassMergeThreshold這一參數(shù)進(jìn)行設(shè)置,默認(rèn)值為200。
該參數(shù)的設(shè)置方法:
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
JVM調(diào)優(yōu)
1. 降低cache操作的內(nèi)存占比
1). 靜態(tài)內(nèi)存管理機(jī)制
根據(jù)Spark靜態(tài)內(nèi)存管理機(jī)制骏庸,堆內(nèi)存被劃分為了兩塊,Storage和Execution年叮。Storage主要用于緩存RDD數(shù)據(jù)和broadcast數(shù)據(jù)具被,Execution主要用于緩存在shuffle過程中產(chǎn)生的中間數(shù)據(jù),Storage占系統(tǒng)內(nèi)存的60%只损,Execution占系統(tǒng)內(nèi)存的20%一姿,并且兩者完全獨(dú)立。
在一般情況下跃惫,Storage的內(nèi)存都提供給了cache操作叮叹,但是如果在某些情況下cache操作內(nèi)存不是很緊張,而task的算子中創(chuàng)建的對(duì)象很多爆存,Execution內(nèi)存又相對(duì)較小蛉顽,這回導(dǎo)致頻繁的minor gc,甚至于頻繁的full gc先较,進(jìn)而導(dǎo)致Spark頻繁的停止工作携冤,性能影響會(huì)很大。
在Spark UI中可以查看每個(gè)stage的運(yùn)行情況闲勺,包括每個(gè)task的運(yùn)行時(shí)間曾棕、gc時(shí)間等等,如果發(fā)現(xiàn)gc太頻繁菜循,時(shí)間太長(zhǎng)翘地,就可以考慮調(diào)節(jié)Storage的內(nèi)存占比,讓task執(zhí)行算子函數(shù)式癌幕,有更多的內(nèi)存可以使用衙耕。
Storage內(nèi)存區(qū)域可以通過spark.storage.memoryFraction參數(shù)進(jìn)行指定,默認(rèn)為0.6勺远,即60%臭杰,可以逐級(jí)向下遞減。
內(nèi)存占比設(shè)置:
val conf = new SparkConf()
.set("spark.storage.memoryFraction", "0.4")
2). 統(tǒng)一內(nèi)存管理機(jī)制
根據(jù)Spark統(tǒng)一內(nèi)存管理機(jī)制谚中,堆內(nèi)存被劃分為了兩塊渴杆,Storage和Execution。Storage主要用于緩存數(shù)據(jù)宪塔,Execution主要用于緩存在shuffle過程中產(chǎn)生的中間數(shù)據(jù)磁奖,兩者所組成的內(nèi)存部分稱為統(tǒng)一內(nèi)存,Storage和Execution各占統(tǒng)一內(nèi)存的50%某筐,由于動(dòng)態(tài)占用機(jī)制的實(shí)現(xiàn)比搭,shuffle過程需要的內(nèi)存過大時(shí),會(huì)自動(dòng)占用Storage的內(nèi)存區(qū)域南誊,因此無需手動(dòng)進(jìn)行調(diào)節(jié)身诺。
2. 調(diào)節(jié)Executor堆外內(nèi)存
Executor的堆外內(nèi)存主要用于程序的共享庫蜜托、Perm Space、 線程Stack和一些Memory mapping等, 或者類C方式allocate object霉赡。
有時(shí)橄务,如果你的Spark作業(yè)處理的數(shù)據(jù)量非常大,達(dá)到幾億的數(shù)據(jù)量穴亏,此時(shí)運(yùn)行Spark作業(yè)會(huì)時(shí)不時(shí)地報(bào)錯(cuò)蜂挪,例如shuffle output file cannot find,executor lost嗓化,task lost棠涮,out of memory等,這可能是Executor的堆外內(nèi)存不太夠用刺覆,導(dǎo)致Executor在運(yùn)行的過程中內(nèi)存溢出严肪。
stage的task在運(yùn)行的時(shí)候,可能要從一些Executor中去拉取shuffle map output文件谦屑,但是Executor可能已經(jīng)由于內(nèi)存溢出掛掉了诬垂,其關(guān)聯(lián)的BlockManager也沒有了,這就可能會(huì)報(bào)出shuffle output file cannot find伦仍,executor lost结窘,task lost,out of memory等錯(cuò)誤充蓝,此時(shí)隧枫,就可以考慮調(diào)節(jié)一下Executor的堆外內(nèi)存,也就可以避免報(bào)錯(cuò)谓苟,與此同時(shí)官脓,堆外內(nèi)存調(diào)節(jié)的比較大的時(shí)候,對(duì)于性能來講涝焙,也會(huì)帶來一定的提升卑笨。
默認(rèn)情況下,Executor堆外內(nèi)存上限大概為300多MB仑撞,在實(shí)際的生產(chǎn)環(huán)境下赤兴,對(duì)海量數(shù)據(jù)進(jìn)行處理的時(shí)候,這里都會(huì)出現(xiàn)問題隧哮,導(dǎo)致Spark作業(yè)反復(fù)崩潰桶良,無法運(yùn)行,此時(shí)就會(huì)去調(diào)節(jié)這個(gè)參數(shù)沮翔,到至少1G陨帆,甚至于2G、4G。
Executor堆外內(nèi)存的配置需要在spark-submit腳本里配置:
--conf spark.yarn.executor.memoryOverhead=2048
以上參數(shù)配置完成后疲牵,會(huì)避免掉某些JVM OOM的異常問題承二,同時(shí),可以提升整體Spark作業(yè)的性能纲爸。
3. 調(diào)節(jié)連接等待時(shí)長(zhǎng)
在Spark作業(yè)運(yùn)行過程中亥鸠,Executor優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù),如果本地BlockManager沒有的話缩焦,會(huì)通過TransferService遠(yuǎn)程連接其他節(jié)點(diǎn)上Executor的BlockManager來獲取數(shù)據(jù)。
如果task在運(yùn)行過程中創(chuàng)建大量對(duì)象或者創(chuàng)建的對(duì)象較大责静,會(huì)占用大量的內(nèi)存袁滥,這回導(dǎo)致頻繁的垃圾回收,但是垃圾回收會(huì)導(dǎo)致工作現(xiàn)場(chǎng)全部停止灾螃,也就是說题翻,垃圾回收一旦執(zhí)行,Spark的Executor進(jìn)程就會(huì)停止工作腰鬼,無法提供相應(yīng)嵌赠,此時(shí),由于沒有響應(yīng)熄赡,無法建立網(wǎng)絡(luò)連接姜挺,會(huì)導(dǎo)致網(wǎng)絡(luò)連接超時(shí)。
在生產(chǎn)環(huán)境下彼硫,有時(shí)會(huì)遇到file not found炊豪、file lost這類錯(cuò)誤,在這種情況下拧篮,很有可能是Executor的BlockManager在拉取數(shù)據(jù)的時(shí)候词渤,無法建立連接,然后超過默認(rèn)的連接等待時(shí)長(zhǎng)60s后串绩,宣告數(shù)據(jù)拉取失敗缺虐,如果反復(fù)嘗試都拉取不到數(shù)據(jù),可能會(huì)導(dǎo)致Spark作業(yè)的崩潰礁凡。這種情況也可能會(huì)導(dǎo)致DAGScheduler反復(fù)提交幾次stage高氮,TaskScheduler返回提交幾次task,大大延長(zhǎng)了我們的Spark作業(yè)的運(yùn)行時(shí)間顷牌。
連接等待時(shí)長(zhǎng)需要在spark-submit腳本中進(jìn)行設(shè)置:
--conf spark.core.connection.ack.wait.timeout=300
調(diào)節(jié)連接等待時(shí)長(zhǎng)后纫溃,通常可以避免部分的XX文件拉取失敗韧掩、XX文件lost等報(bào)錯(cuò)紊浩。