《Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇》
《Spark性能優(yōu)化:資源調(diào)優(yōu)篇》
《Spark性能優(yōu)化:數(shù)據(jù)傾斜調(diào)優(yōu)》
《Spark性能優(yōu)化:shuffle調(diào)優(yōu)》
在大數(shù)據(jù)計(jì)算領(lǐng)域,Spark已經(jīng)成為了越來越流行二蓝、越來越受歡迎的計(jì)算平臺(tái)之一胸嘁。Spark的功能涵蓋了大數(shù)據(jù)領(lǐng)域的離線批處理、SQL類處理凹蜂、流式/實(shí)時(shí)計(jì)算馍驯、機(jī)器學(xué)習(xí)、圖計(jì)算等各種不同類型的計(jì)算操作玛痊,應(yīng)用范圍與前景非常廣泛汰瘫。
然而,通過Spark開發(fā)出高性能的大數(shù)據(jù)計(jì)算作業(yè)擂煞,并不是那么簡(jiǎn)單的混弥。如果沒有對(duì)Spark作業(yè)進(jìn)行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會(huì)很慢对省,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計(jì)算引擎的優(yōu)勢(shì)來蝗拿。因此,想要用好Spark蒿涎,就必須對(duì)其進(jìn)行合理的性能優(yōu)化蛹磺。
Spark的性能調(diào)優(yōu)實(shí)際上是由很多部分組成的,不是調(diào)節(jié)幾個(gè)參數(shù)就可以立竿見影提升作業(yè)性能的同仆。我們需要根據(jù)不同的業(yè)務(wù)場(chǎng)景以及數(shù)據(jù)情況萤捆,對(duì)Spark作業(yè)進(jìn)行綜合性的分析,然后進(jìn)行多個(gè)方面的調(diào)節(jié)和優(yōu)化,才能獲得最佳性能俗或。
筆者根據(jù)之前的Spark作業(yè)開發(fā)經(jīng)驗(yàn)以及實(shí)踐積累市怎,總結(jié)出了一套Spark作業(yè)的性能優(yōu)化方案。整套方案主要分為開發(fā)調(diào)優(yōu)辛慰、資源調(diào)優(yōu)区匠、數(shù)據(jù)傾斜調(diào)優(yōu)、shuffle調(diào)優(yōu)幾個(gè)部分帅腌。開發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)是所有Spark作業(yè)都需要注意和遵循的一些基本原則驰弄,是高性能Spark作業(yè)的基礎(chǔ);數(shù)據(jù)傾斜調(diào)優(yōu)速客,主要講解了一套完整的用來解決Spark作業(yè)數(shù)據(jù)傾斜的解決方案戚篙;shuffle調(diào)優(yōu),面向的是對(duì)Spark的原理有較深層次掌握和研究的同學(xué)溺职,主要講解了如何對(duì)Spark作業(yè)的shuffle運(yùn)行過程以及細(xì)節(jié)進(jìn)行調(diào)優(yōu)岔擂。
本文作為Spark性能優(yōu)化指南的基礎(chǔ)篇,主要講解開發(fā)調(diào)優(yōu)以及資源調(diào)優(yōu)浪耘。
開發(fā)調(diào)優(yōu)
Spark性能優(yōu)化的第一步乱灵,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu)七冲,就是要讓大家了解以下一些Spark基本開發(fā)原則痛倚,包括:RDD lineage設(shè)計(jì)、算子的合理使用澜躺、特殊操作的優(yōu)化等蝉稳。在開發(fā)過程中,時(shí)時(shí)刻刻都應(yīng)該注意以上原則苗踪,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場(chǎng)景,靈活地運(yùn)用到自己的Spark作業(yè)中削锰。
原則一:避免創(chuàng)建重復(fù)的RDD
通常來說通铲,我們?cè)陂_發(fā)一個(gè)Spark作業(yè)時(shí),首先是基于某個(gè)數(shù)據(jù)源(比如Hive表或HDFS文件)創(chuàng)建一個(gè)初始的RDD器贩;接著對(duì)這個(gè)RDD執(zhí)行某個(gè)算子操作颅夺,然后得到下一個(gè)RDD;以此類推蛹稍,循環(huán)往復(fù)吧黄,直到計(jì)算出最終我們需要的結(jié)果。在這個(gè)過程中唆姐,多個(gè)RDD會(huì)通過不同的算子操作(比如map拗慨、reduce等)串起來,這個(gè)“RDD串”,就是RDD lineage赵抢,也就是“RDD的血緣關(guān)系鏈”剧蹂。我們?cè)陂_發(fā)過程中要注意:對(duì)于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個(gè)RDD烦却,不能創(chuàng)建多個(gè)RDD來代表同一份數(shù)據(jù)宠叼。一些Spark初學(xué)者在剛開始開發(fā)Spark作業(yè)時(shí),或者是有經(jīng)驗(yàn)的工程師在開發(fā)RDD lineage極其冗長(zhǎng)的Spark作業(yè)時(shí)其爵,可能會(huì)忘了自己之前對(duì)于某一份數(shù)據(jù)已經(jīng)創(chuàng)建過一個(gè)RDD了冒冬,從而導(dǎo)致對(duì)于同一份數(shù)據(jù),創(chuàng)建了多個(gè)RDD摩渺。這就意味著简烤,我們的Spark作業(yè)會(huì)進(jìn)行多次重復(fù)計(jì)算來創(chuàng)建多個(gè)代表相同數(shù)據(jù)的RDD,進(jìn)而增加了作業(yè)的性能開銷证逻。
一個(gè)簡(jiǎn)單的例子
//需要對(duì)名為“hello.txt”的HDFS文件進(jìn)行一次map操作乐埠,再進(jìn)行一次reduce操作
//也就是說,需要對(duì)一份數(shù)據(jù)執(zhí)行兩次算子操作囚企。
//錯(cuò)誤的做法:對(duì)于同一份數(shù)據(jù)執(zhí)行多次算子操作時(shí)丈咐,創(chuàng)建多個(gè)RDD。
//這里執(zhí)行了兩次textFile方法龙宏,針對(duì)同一個(gè)HDFS文件棵逊,創(chuàng)建了兩個(gè)RDD出來,
//然后分別對(duì)每個(gè)RDD都執(zhí)行了一個(gè)算子操作银酗。
//這種情況下辆影,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個(gè)單獨(dú)的RDD黍特;
//第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷蛙讥,很明顯是白白浪費(fèi)掉的。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
rdd1.map(...)
val rdd2 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
rdd2.reduce(...)
//正確的用法:對(duì)于一份數(shù)據(jù)執(zhí)行多次算子操作時(shí)灭衷,只使用一個(gè)RDD次慢。
//這種寫法很明顯比上一種寫法要好多了,因?yàn)槲覀儗?duì)于同一份數(shù)據(jù)只創(chuàng)建了一個(gè)RDD翔曲,
//然后對(duì)這一個(gè)RDD執(zhí)行了多次算子操作迫像。
//但是要注意到這里為止優(yōu)化還沒有結(jié)束,由于rdd1被執(zhí)行了兩次算子操作瞳遍,第二次執(zhí)行reduce操作的時(shí)候闻妓,
//還會(huì)再次從源頭處重新計(jì)算一次rdd1的數(shù)據(jù),因此還是會(huì)有重復(fù)計(jì)算的性能開銷掠械。
//要徹底解決這個(gè)問題由缆,必須結(jié)合“原則三:對(duì)多次使用的RDD進(jìn)行持久化”注祖,
//才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
rdd1.map(...)
rdd1.reduce(...)
原則二:盡可能復(fù)用同一個(gè)RDD
除了要避免在開發(fā)過程中對(duì)一份完全相同的數(shù)據(jù)創(chuàng)建多個(gè)RDD之外犁功,在對(duì)不同的數(shù)據(jù)執(zhí)行算子操作時(shí)還要盡可能地復(fù)用一個(gè)RDD氓轰。比如說,有一個(gè)RDD的數(shù)據(jù)格式是key-value類型的浸卦,另一個(gè)是單value類型的署鸡,這兩個(gè)RDD的value數(shù)據(jù)是完全一樣的。那么此時(shí)我們可以只使用key-value類型的那個(gè)RDD限嫌,因?yàn)槠渲幸呀?jīng)包含了另一個(gè)的數(shù)據(jù)靴庆。對(duì)于類似這種多個(gè)RDD的數(shù)據(jù)有重疊或者包含的情況,我們應(yīng)該盡量復(fù)用一個(gè)RDD怒医,這樣可以盡可能地減少RDD的數(shù)量炉抒,從而盡可能減少算子執(zhí)行的次數(shù)。
一個(gè)簡(jiǎn)單的例子
// 錯(cuò)誤的做法稚叹。
// 有一個(gè)<long , String>格式的RDD焰薄,即rdd1。
// 接著由于業(yè)務(wù)需要扒袖,對(duì)rdd1執(zhí)行了一個(gè)map操作塞茅,創(chuàng)建了一個(gè)rdd2,
//而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已季率,也就是說野瘦,rdd2是rdd1的子集。
JavaPairRDD</long><long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)
// 分別對(duì)rdd1和rdd2執(zhí)行了不同的算子操作飒泻。
rdd1.reduceByKey(...)
rdd2.map(...)
// 正確的做法鞭光。
// 上面這個(gè)case中,其實(shí)rdd1和rdd2的區(qū)別無(wú)非就是數(shù)據(jù)格式不同而已泞遗,
//rdd2的數(shù)據(jù)完全就是rdd1的子集而已惰许,卻創(chuàng)建了兩個(gè)rdd,并對(duì)兩個(gè)rdd都執(zhí)行了一次算子操作史辙。
// 此時(shí)會(huì)因?yàn)閷?duì)rdd1執(zhí)行map算子來創(chuàng)建rdd2汹买,而多執(zhí)行一次算子操作,進(jìn)而增加性能開銷髓霞。
// 其實(shí)在這種情況下完全可以復(fù)用同一個(gè)RDD卦睹。
// 我們可以使用rdd1畦戒,既做reduceByKey操作方库,也做map操作。
// 在進(jìn)行第二個(gè)map操作時(shí)障斋,只使用每個(gè)數(shù)據(jù)的tuple._2纵潦,也就是rdd1中的value值徐鹤,即可。
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
// 第二種方式相較于第一種方式而言邀层,很明顯減少了一次rdd2的計(jì)算開銷返敬。
// 但是到這里為止,優(yōu)化還沒有結(jié)束寥院,對(duì)rdd1我們還是執(zhí)行了兩次算子操作劲赠,rdd1實(shí)際上還是會(huì)被計(jì)算兩次。
// 因此還需要配合“原則三:對(duì)多次使用的RDD進(jìn)行持久化”進(jìn)行使用秸谢,
//才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次凛澎。
原則三:對(duì)多次使用的RDD進(jìn)行持久化
當(dāng)你在Spark代碼中多次對(duì)一個(gè)RDD做了算子操作后,恭喜估蹄,你已經(jīng)實(shí)現(xiàn)Spark作業(yè)第一步的優(yōu)化了塑煎,也就是盡可能復(fù)用RDD。此時(shí)就該在這個(gè)基礎(chǔ)之上臭蚁,進(jìn)行第二步優(yōu)化了最铁,也就是要保證對(duì)一個(gè)RDD執(zhí)行多次算子操作時(shí),這個(gè)RDD本身僅僅被計(jì)算一次垮兑。
Spark中對(duì)于一個(gè)RDD執(zhí)行多次算子的默認(rèn)原理是這樣的:每次你對(duì)一個(gè)RDD執(zhí)行一個(gè)算子操作時(shí)冷尉,都會(huì)重新從源頭處計(jì)算一遍,計(jì)算出那個(gè)RDD來甥角,然后再對(duì)這個(gè)RDD執(zhí)行你的算子操作网严。這種方式的性能是很差的。
因此對(duì)于這種情況嗤无,我們的建議是:對(duì)多次使用的RDD進(jìn)行持久化震束。此時(shí)Spark就會(huì)根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中当犯。以后每次對(duì)這個(gè)RDD進(jìn)行算子操作時(shí)垢村,都會(huì)直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子嚎卫,而不會(huì)從源頭處重新計(jì)算一遍這個(gè)RDD嘉栓,再執(zhí)行算子操作。
對(duì)多次使用的RDD進(jìn)行持久化的代碼示例
// 如果要對(duì)一個(gè)RDD進(jìn)行持久化拓诸,只要對(duì)這個(gè)RDD調(diào)用cache()和persist()即可侵佃。
// 正確的做法。
// cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中奠支。
// 此時(shí)再對(duì)rdd1執(zhí)行兩次算子操作時(shí)馋辈,只有在第一次執(zhí)行map算子時(shí),才會(huì)將這個(gè)rdd1從源頭處計(jì)算一次倍谜。
// 第二次執(zhí)行reduce算子時(shí)迈螟,就會(huì)直接從內(nèi)存中提取數(shù)據(jù)進(jìn)行計(jì)算叉抡,不會(huì)重復(fù)計(jì)算一個(gè)rdd。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手動(dòng)選擇持久化級(jí)別答毫,并使用指定的方式進(jìn)行持久化褥民。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER表示洗搂,內(nèi)存充足時(shí)優(yōu)先持久化到內(nèi)存中消返,
//內(nèi)存不充足時(shí)持久化到磁盤文件中。
// 而且其中的_SER后綴表示耘拇,使用序列化的方式來保存RDD數(shù)據(jù)侦副,此時(shí)RDD中的每個(gè)partition
//都會(huì)序列化成一個(gè)大的字節(jié)數(shù)組,然后再持久化到內(nèi)存或磁盤中驼鞭。
// 序列化的方式可以減少持久化的數(shù)據(jù)對(duì)內(nèi)存/磁盤的占用量秦驯,進(jìn)而避免內(nèi)存被持久化數(shù)據(jù)占用過多,
//從而發(fā)生頻繁GC挣棕。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
對(duì)于persist()方法而言译隘,我們可以根據(jù)不同的業(yè)務(wù)場(chǎng)景選擇不同的持久化級(jí)別。
Spark的持久化級(jí)別
持久化級(jí)別 | 含義解釋 |
---|---|
MEMORY_ONLY | 使用未序列化的Java對(duì)象格式洛心,將數(shù)據(jù)保存在內(nèi)存中固耘。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化词身。那么下次對(duì)這個(gè)RDD執(zhí)行算子操作時(shí)厅目,那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計(jì)算一遍法严。這是默認(rèn)的持久化策略损敷,使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略深啤。 |
MEMORY_AND_DISK | 使用未序列化的Java對(duì)象格式拗馒,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù)溯街,會(huì)將數(shù)據(jù)寫入磁盤文件中诱桂,下次對(duì)這個(gè)RDD執(zhí)行算子時(shí),持久化在磁盤文件中的數(shù)據(jù)會(huì)被讀取出來使用呈昔。 |
MEMORY_ONLY_SER | 基本含義同MEMORY_ONLY挥等。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化堤尾,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組肝劲。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC哀峻。 |
MEMORY_AND_DISK_SER | 基本含義同MEMORY_AND_DISK涡相。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化剩蟀,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組催蝗。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC育特。 |
DISK_ONLY | 使用未序列化的Java對(duì)象格式丙号,將數(shù)據(jù)全部寫入磁盤文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 對(duì)于上述任意一種持久化策略缰冤,如果加上后綴_2犬缨,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本棉浸,并將副本保存到其他節(jié)點(diǎn)上怀薛。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉迷郑,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了枝恋,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話嗡害,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了焚碌。 |
如何選擇一種最合適的持久化策略
1、默認(rèn)情況下霸妹,性能最高的當(dāng)然是MEMORY_ONLY十电,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)叹螟。因?yàn)椴贿M(jìn)行序列化與反序列化操作鹃骂,就避免了這部分的性能開銷;對(duì)這個(gè)RDD的后續(xù)算子操作罢绽,都是基于純內(nèi)存中的數(shù)據(jù)的操作偎漫,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高有缆;而且不需要復(fù)制一份數(shù)據(jù)副本象踊,并遠(yuǎn)程傳送到其他節(jié)點(diǎn)上。但是這里必須要注意的是棚壁,在實(shí)際的生產(chǎn)環(huán)境中杯矩,恐怕能夠直接用這種策略的場(chǎng)景還是有限的,如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億)袖外,直接用這種持久化級(jí)別史隆,會(huì)導(dǎo)致JVM的OOM內(nèi)存溢出異常。
2曼验、如果使用MEMORY_ONLY級(jí)別時(shí)發(fā)生了內(nèi)存溢出泌射,那么建議嘗試使用MEMORY_ONLY_SER級(jí)別粘姜。該級(jí)別會(huì)將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè)partition僅僅是一個(gè)字節(jié)數(shù)組而已熔酷,大大減少了對(duì)象數(shù)量孤紧,并降低了內(nèi)存占用。這種級(jí)別比MEMORY_ONLY多出來的性能開銷拒秘,主要就是序列化與反序列化的開銷号显。但是后續(xù)算子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的躺酒。此外押蚤,可能發(fā)生的問題同上,如果RDD中的數(shù)據(jù)量過多的話羹应,還是可能會(huì)導(dǎo)致OOM內(nèi)存溢出的異常揽碘。
3、如果純內(nèi)存的級(jí)別都無(wú)法使用园匹,那么建議使用MEMORY_AND_DISK_SER策略钾菊,而不是MEMORY_AND_DISK策略。因?yàn)榧热坏搅诉@一步偎肃,就說明RDD的數(shù)據(jù)量很大煞烫,內(nèi)存無(wú)法完全放下。序列化后的數(shù)據(jù)比較少累颂,可以節(jié)省內(nèi)存和磁盤的空間開銷滞详。同時(shí)該策略會(huì)優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會(huì)寫入磁盤紊馏。
4料饥、通常不建議使用DISK_ONLY和后綴為_2的級(jí)別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫,會(huì)導(dǎo)致性能急劇降低朱监,有時(shí)還不如重新計(jì)算一次所有RDD岸啡。后綴為_2的級(jí)別,必須將所有數(shù)據(jù)都復(fù)制一份副本赫编,并發(fā)送到其他節(jié)點(diǎn)上巡蘸,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會(huì)導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性擂送,否則不建議使用悦荒。
原則四:盡量避免使用shuffle類算子
如果有可能的話,要盡量避免使用shuffle類算子嘹吨。因?yàn)镾park作業(yè)運(yùn)行過程中搬味,最消耗性能的地方就是shuffle過程。shuffle過程,簡(jiǎn)單來說碰纬,就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key萍聊,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合或join等操作悦析。比如reduceByKey寿桨、join等算子,都會(huì)觸發(fā)shuffle操作她按。
shuffle過程中,各個(gè)節(jié)點(diǎn)上的相同key都會(huì)先寫入本地磁盤文件中炕柔,然后其他節(jié)點(diǎn)需要通過網(wǎng)絡(luò)傳輸拉取各個(gè)節(jié)點(diǎn)上的磁盤文件中的相同key酌泰。而且相同key都拉取到同一個(gè)節(jié)點(diǎn)進(jìn)行聚合操作時(shí),還有可能會(huì)因?yàn)橐粋€(gè)節(jié)點(diǎn)上處理的key過多匕累,導(dǎo)致內(nèi)存不夠存放陵刹,進(jìn)而溢寫到磁盤文件中。因此在shuffle過程中欢嘿,可能會(huì)發(fā)生大量的磁盤文件讀寫的IO操作衰琐,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因炼蹦。
因此在我們的開發(fā)過程中羡宙,能避免則盡可能避免使用reduceByKey、join掐隐、distinct狗热、repartition等會(huì)進(jìn)行shuffle的算子,盡量使用map類的非shuffle算子虑省。這樣的話匿刮,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷探颈。
Broadcast與map進(jìn)行join代碼示例
// 傳統(tǒng)的join操作會(huì)導(dǎo)致shuffle操作熟丸。
// 因?yàn)閮蓚€(gè)RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個(gè)節(jié)點(diǎn)上伪节,由一個(gè)task進(jìn)行join操作光羞。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不會(huì)導(dǎo)致shuffle操作怀大。
// 使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量狞山。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以從rdd2DataBroadcast中叉寂,獲取rdd2的所有數(shù)據(jù)萍启。
// 然后進(jìn)行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,
//那么就判定可以進(jìn)行join勘纯。
// 此時(shí)就可以根據(jù)自己需要的方式局服,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),
//拼接在一起(String或Tuple)驳遵。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意淫奔,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M(fèi)堤结,或者一兩G)的情況下使用唆迁。
// 因?yàn)槊總€(gè)Executor的內(nèi)存中,都會(huì)駐留一份rdd2的全量數(shù)據(jù)竞穷。
原則五:使用map-side預(yù)聚合的shuffle操作
如果因?yàn)闃I(yè)務(wù)需要唐责,一定要使用shuffle操作,無(wú)法用map類的算子來替代瘾带,那么盡量使用可以map-side預(yù)聚合的算子鼠哥。
所謂的map-side預(yù)聚合,說的是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作看政,類似于MapReduce中的本地combiner朴恳。map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會(huì)有一條相同的key允蚣,因?yàn)槎鄺l相同的key都被聚合起來了于颖。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少需要拉取的數(shù)據(jù)數(shù)量嚷兔,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷恍飘。通常來說,在可能的情況下谴垫,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子章母。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的翩剪,全量的數(shù)據(jù)會(huì)在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸乳怎,性能相對(duì)來說比較差。
比如下圖前弯,就是典型的例子蚪缀,分別基于reduceByKey和groupByKey進(jìn)行單詞計(jì)數(shù)。其中第一張圖是groupByKey的原理圖恕出,可以看到询枚,沒有進(jìn)行任何本地聚合時(shí),所有數(shù)據(jù)都會(huì)在集群節(jié)點(diǎn)之間傳輸浙巫;第二張圖是reduceByKey的原理圖金蜀,可以看到刷后,每個(gè)節(jié)點(diǎn)本地的相同key數(shù)據(jù),都進(jìn)行了預(yù)聚合渊抄,然后才傳輸?shù)狡渌?jié)點(diǎn)上進(jìn)行全局聚合尝胆。
原則六:使用高性能的算子
除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則护桦。
使用reduceByKey/aggregateByKey替代groupByKey
詳情見“原則五:使用map-side預(yù)聚合的shuffle操作”含衔。
使用mapPartitions替代普通map
mapPartitions類的算子,一次函數(shù)調(diào)用會(huì)處理一個(gè)partition所有的數(shù)據(jù)二庵,而不是一次函數(shù)調(diào)用處理一條贪染,性能相對(duì)來說會(huì)高一些。但是有的時(shí)候催享,使用mapPartitions會(huì)出現(xiàn)OOM(內(nèi)存溢出)的問題杭隙。因?yàn)閱未魏瘮?shù)調(diào)用就要處理掉一個(gè)partition所有的數(shù)據(jù),如果內(nèi)存不夠睡陪,垃圾回收時(shí)是無(wú)法回收掉太多對(duì)象的寺渗,很可能出現(xiàn)OOM異常匿情。所以使用這類操作時(shí)要慎重兰迫!
使用foreachPartitions替代foreach
原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個(gè)partition的所有數(shù)據(jù)炬称,而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)汁果。在實(shí)踐中發(fā)現(xiàn),foreachPartitions類的算子玲躯,對(duì)性能的提升還是很有幫助的据德。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL跷车,那么如果是普通的foreach算子棘利,就會(huì)一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會(huì)創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接朽缴,此時(shí)就勢(shì)必會(huì)頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫(kù)連接善玫,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個(gè)partition的數(shù)據(jù)密强,那么對(duì)于每個(gè)partition茅郎,只要?jiǎng)?chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接即可,然后執(zhí)行批量插入操作或渤,此時(shí)性能是比較高的系冗。實(shí)踐中發(fā)現(xiàn),對(duì)于1萬(wàn)條左右的數(shù)據(jù)量寫MySQL薪鹦,性能可以提升30%以上掌敬。
使用filter之后進(jìn)行coalesce操作
通常對(duì)一個(gè)RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù))惯豆,建議使用coalesce算子征堪,手動(dòng)減少RDD的partition數(shù)量攻柠,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。因?yàn)閒ilter之后惠遏,RDD的每個(gè)partition中都會(huì)有很多數(shù)據(jù)被過濾掉舀武,此時(shí)如果照常進(jìn)行后續(xù)的計(jì)算拄养,其實(shí)每個(gè)task處理的partition中的數(shù)據(jù)量并不是很多,有一點(diǎn)資源浪費(fèi)银舱,而且此時(shí)處理的task越多瘪匿,可能速度反而越慢。因此用coalesce減少partition數(shù)量寻馏,將RDD中的數(shù)據(jù)壓縮到更少的partition之后棋弥,只要使用更少的task即可處理完所有的partition。在某些場(chǎng)景下诚欠,對(duì)于性能的提升會(huì)有一定的幫助顽染。
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個(gè)算子,官方建議轰绵,如果需要在repartition重分區(qū)之后粉寞,還要進(jìn)行排序,建議直接使用repartitionAndSortWithinPartitions算子左腔。因?yàn)樵撍阕涌梢砸贿呥M(jìn)行重分區(qū)的shuffle操作唧垦,一邊進(jìn)行排序。shuffle與sort兩個(gè)操作同時(shí)進(jìn)行液样,比先shuffle再sort來說振亮,性能可能是要高的。
原則七:廣播大變量
有時(shí)在開發(fā)過程中鞭莽,會(huì)遇到需要在算子函數(shù)中使用外部變量的場(chǎng)景(尤其是大變量坊秸,比如100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能澎怒。
在算子函數(shù)中使用到外部變量時(shí)褒搔,默認(rèn)情況下,Spark會(huì)將該變量復(fù)制多個(gè)副本丹拯,通過網(wǎng)絡(luò)傳輸?shù)絫ask中站超,此時(shí)每個(gè)task都有一個(gè)變量副本。如果變量本身比較大的話(比如100M乖酬,甚至1G)死相,那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個(gè)節(jié)點(diǎn)的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC咬像,都會(huì)極大地影響性能算撮。
因此對(duì)于上述情況生宛,如果使用的外部變量比較大,建議使用Spark的廣播功能肮柜,對(duì)該變量進(jìn)行廣播陷舅。廣播后的變量,會(huì)保證每個(gè)Executor的內(nèi)存中审洞,只駐留一份變量副本莱睁,而Executor中的task執(zhí)行時(shí)共享該Executor中的那份變量副本。這樣的話芒澜,可以大大減少變量副本的數(shù)量仰剿,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對(duì)Executor內(nèi)存的占用開銷痴晦,降低GC的頻率南吮。
廣播大變量的代碼示例
// 以下代碼在算子函數(shù)中,使用了外部的變量誊酌。
// 此時(shí)沒有做任何特殊操作部凑,每個(gè)task都會(huì)有一份list1的副本。
val list1 = ...
rdd1.map(list1...)
// 以下代碼將list1封裝成了Broadcast類型的廣播變量碧浊。
// 在算子函數(shù)中涂邀,使用廣播變量時(shí),首先會(huì)判斷當(dāng)前task所在Executor內(nèi)存中辉词,是否有變量副本必孤。
// 如果有則直接使用猾骡;如果沒有則從Driver或者其他Executor節(jié)點(diǎn)上遠(yuǎn)程拉取一份放到本地Executor內(nèi)存中瑞躺。
// 每個(gè)Executor內(nèi)存中,就只會(huì)駐留一份廣播變量副本兴想。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
原則八:使用Kryo優(yōu)化序列化性能
在Spark中幢哨,主要有三個(gè)地方涉及到了序列化:
1、在算子函數(shù)中使用到外部變量時(shí)嫂便,該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見“原則七:廣播大變量”中的講解)捞镰。
2、將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD毙替,Student是自定義類型)岸售,所有自定義類型對(duì)象,都會(huì)進(jìn)行序列化厂画。因此這種情況下凸丸,也要求自定義的類必須實(shí)現(xiàn)Serializable接口。
3袱院、使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER)屎慢,Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組瞭稼。
對(duì)于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫(kù)腻惠,來優(yōu)化序列化和反序列化的性能环肘。Spark默認(rèn)使用的是Java的序列化機(jī)制,也就是ObjectOutputStream/ObjectInputStream API來進(jìn)行序列化和反序列化集灌。但是Spark同時(shí)支持使用Kryo序列化庫(kù)悔雹,Kryo序列化類庫(kù)的性能比Java序列化類庫(kù)的性能要高很多。官方介紹欣喧,Kryo序列化機(jī)制比Java序列化機(jī)制荠商,性能高10倍左右。Spark之所以默認(rèn)沒有使用Kryo作為序列化類庫(kù)续誉,是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類型莱没,因此對(duì)于開發(fā)者來說,這種方式比較麻煩酷鸦。
以下是使用Kryo的代碼示例饰躲,我們只要設(shè)置序列化類,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型臼隔、作為RDD泛型類型的自定義類型等):
// 創(chuàng)建SparkConf對(duì)象嘹裂。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)要序列化的自定義類型摔握。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
原則九:優(yōu)化數(shù)據(jù)結(jié)構(gòu)
Java中寄狼,有三種類型比較耗費(fèi)內(nèi)存:
1、對(duì)象氨淌,每個(gè)Java對(duì)象都有對(duì)象頭泊愧、引用等額外的信息,因此比較占用內(nèi)存空間盛正。
2删咱、字符串,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長(zhǎng)度等額外信息豪筝。
3痰滋、集合類型,比如HashMap续崖、LinkedList等敲街,因?yàn)榧项愋蛢?nèi)部通常會(huì)使用一些內(nèi)部類來封裝集合元素,比如Map.Entry严望。
因此Spark官方建議多艇,在Spark編碼實(shí)現(xiàn)中,特別是對(duì)于算子函數(shù)中的代碼著蟹,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu)墩蔓,盡量使用字符串替代對(duì)象梢莽,使用原始類型(比如Int、Long)替代字符串奸披,使用數(shù)組替代集合類型昏名,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率阵面,提升性能轻局。
但是在筆者的編碼實(shí)踐中發(fā)現(xiàn),要做到該原則其實(shí)并不容易样刷。因?yàn)槲覀兺瑫r(shí)要考慮到代碼的可維護(hù)性仑扑,如果一個(gè)代碼中,完全沒有任何對(duì)象抽象置鼻,全部是字符串拼接的方式镇饮,那么對(duì)于后續(xù)的代碼維護(hù)和修改,無(wú)疑是一場(chǎng)巨大的災(zāi)難箕母。同理储藐,如果所有操作都基于數(shù)組實(shí)現(xiàn),而不使用HashMap嘶是、LinkedList等集合類型钙勃,那么對(duì)于我們的編碼難度以及代碼可維護(hù)性,也是一個(gè)極大的挑戰(zhàn)聂喇。因此筆者建議辖源,在可能以及合適的情況下,使用占用內(nèi)存較少的數(shù)據(jù)結(jié)構(gòu)希太,但是前提是要保證代碼的可維護(hù)性克饶。
轉(zhuǎn)載自過往記憶(https://www.iteblog.com/)