Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇(轉(zhuǎn))

《Spark性能優(yōu)化:開發(fā)調(diào)優(yōu)篇》
《Spark性能優(yōu)化:資源調(diào)優(yōu)篇》
《Spark性能優(yōu)化:數(shù)據(jù)傾斜調(diào)優(yōu)》
《Spark性能優(yōu)化:shuffle調(diào)優(yōu)》
 
在大數(shù)據(jù)計(jì)算領(lǐng)域,Spark已經(jīng)成為了越來越流行二蓝、越來越受歡迎的計(jì)算平臺(tái)之一胸嘁。Spark的功能涵蓋了大數(shù)據(jù)領(lǐng)域的離線批處理、SQL類處理凹蜂、流式/實(shí)時(shí)計(jì)算馍驯、機(jī)器學(xué)習(xí)、圖計(jì)算等各種不同類型的計(jì)算操作玛痊,應(yīng)用范圍與前景非常廣泛汰瘫。

然而,通過Spark開發(fā)出高性能的大數(shù)據(jù)計(jì)算作業(yè)擂煞,并不是那么簡(jiǎn)單的混弥。如果沒有對(duì)Spark作業(yè)進(jìn)行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會(huì)很慢对省,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計(jì)算引擎的優(yōu)勢(shì)來蝗拿。因此,想要用好Spark蒿涎,就必須對(duì)其進(jìn)行合理的性能優(yōu)化蛹磺。

Spark的性能調(diào)優(yōu)實(shí)際上是由很多部分組成的,不是調(diào)節(jié)幾個(gè)參數(shù)就可以立竿見影提升作業(yè)性能的同仆。我們需要根據(jù)不同的業(yè)務(wù)場(chǎng)景以及數(shù)據(jù)情況萤捆,對(duì)Spark作業(yè)進(jìn)行綜合性的分析,然后進(jìn)行多個(gè)方面的調(diào)節(jié)和優(yōu)化,才能獲得最佳性能俗或。

筆者根據(jù)之前的Spark作業(yè)開發(fā)經(jīng)驗(yàn)以及實(shí)踐積累市怎,總結(jié)出了一套Spark作業(yè)的性能優(yōu)化方案。整套方案主要分為開發(fā)調(diào)優(yōu)辛慰、資源調(diào)優(yōu)区匠、數(shù)據(jù)傾斜調(diào)優(yōu)、shuffle調(diào)優(yōu)幾個(gè)部分帅腌。開發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)是所有Spark作業(yè)都需要注意和遵循的一些基本原則驰弄,是高性能Spark作業(yè)的基礎(chǔ);數(shù)據(jù)傾斜調(diào)優(yōu)速客,主要講解了一套完整的用來解決Spark作業(yè)數(shù)據(jù)傾斜的解決方案戚篙;shuffle調(diào)優(yōu),面向的是對(duì)Spark的原理有較深層次掌握和研究的同學(xué)溺职,主要講解了如何對(duì)Spark作業(yè)的shuffle運(yùn)行過程以及細(xì)節(jié)進(jìn)行調(diào)優(yōu)岔擂。

本文作為Spark性能優(yōu)化指南的基礎(chǔ)篇,主要講解開發(fā)調(diào)優(yōu)以及資源調(diào)優(yōu)浪耘。

開發(fā)調(diào)優(yōu)

Spark性能優(yōu)化的第一步乱灵,就是要在開發(fā)Spark作業(yè)的過程中注意和應(yīng)用一些性能優(yōu)化的基本原則。開發(fā)調(diào)優(yōu)七冲,就是要讓大家了解以下一些Spark基本開發(fā)原則痛倚,包括:RDD lineage設(shè)計(jì)、算子的合理使用澜躺、特殊操作的優(yōu)化等蝉稳。在開發(fā)過程中,時(shí)時(shí)刻刻都應(yīng)該注意以上原則苗踪,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場(chǎng)景,靈活地運(yùn)用到自己的Spark作業(yè)中削锰。

原則一:避免創(chuàng)建重復(fù)的RDD

通常來說通铲,我們?cè)陂_發(fā)一個(gè)Spark作業(yè)時(shí),首先是基于某個(gè)數(shù)據(jù)源(比如Hive表或HDFS文件)創(chuàng)建一個(gè)初始的RDD器贩;接著對(duì)這個(gè)RDD執(zhí)行某個(gè)算子操作颅夺,然后得到下一個(gè)RDD;以此類推蛹稍,循環(huán)往復(fù)吧黄,直到計(jì)算出最終我們需要的結(jié)果。在這個(gè)過程中唆姐,多個(gè)RDD會(huì)通過不同的算子操作(比如map拗慨、reduce等)串起來,這個(gè)“RDD串”,就是RDD lineage赵抢,也就是“RDD的血緣關(guān)系鏈”剧蹂。我們?cè)陂_發(fā)過程中要注意:對(duì)于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個(gè)RDD烦却,不能創(chuàng)建多個(gè)RDD來代表同一份數(shù)據(jù)宠叼。一些Spark初學(xué)者在剛開始開發(fā)Spark作業(yè)時(shí),或者是有經(jīng)驗(yàn)的工程師在開發(fā)RDD lineage極其冗長(zhǎng)的Spark作業(yè)時(shí)其爵,可能會(huì)忘了自己之前對(duì)于某一份數(shù)據(jù)已經(jīng)創(chuàng)建過一個(gè)RDD了冒冬,從而導(dǎo)致對(duì)于同一份數(shù)據(jù),創(chuàng)建了多個(gè)RDD摩渺。這就意味著简烤,我們的Spark作業(yè)會(huì)進(jìn)行多次重復(fù)計(jì)算來創(chuàng)建多個(gè)代表相同數(shù)據(jù)的RDD,進(jìn)而增加了作業(yè)的性能開銷证逻。

一個(gè)簡(jiǎn)單的例子

//需要對(duì)名為“hello.txt”的HDFS文件進(jìn)行一次map操作乐埠,再進(jìn)行一次reduce操作
//也就是說,需要對(duì)一份數(shù)據(jù)執(zhí)行兩次算子操作囚企。
//錯(cuò)誤的做法:對(duì)于同一份數(shù)據(jù)執(zhí)行多次算子操作時(shí)丈咐,創(chuàng)建多個(gè)RDD。
//這里執(zhí)行了兩次textFile方法龙宏,針對(duì)同一個(gè)HDFS文件棵逊,創(chuàng)建了兩個(gè)RDD出來,
//然后分別對(duì)每個(gè)RDD都執(zhí)行了一個(gè)算子操作银酗。
//這種情況下辆影,Spark需要從HDFS上兩次加載hello.txt文件的內(nèi)容,并創(chuàng)建兩個(gè)單獨(dú)的RDD黍特;
//第二次加載HDFS文件以及創(chuàng)建RDD的性能開銷蛙讥,很明顯是白白浪費(fèi)掉的。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
rdd1.map(...)
val rdd2 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
rdd2.reduce(...)
//正確的用法:對(duì)于一份數(shù)據(jù)執(zhí)行多次算子操作時(shí)灭衷,只使用一個(gè)RDD次慢。
//這種寫法很明顯比上一種寫法要好多了,因?yàn)槲覀儗?duì)于同一份數(shù)據(jù)只創(chuàng)建了一個(gè)RDD翔曲,
//然后對(duì)這一個(gè)RDD執(zhí)行了多次算子操作迫像。
//但是要注意到這里為止優(yōu)化還沒有結(jié)束,由于rdd1被執(zhí)行了兩次算子操作瞳遍,第二次執(zhí)行reduce操作的時(shí)候闻妓,
//還會(huì)再次從源頭處重新計(jì)算一次rdd1的數(shù)據(jù),因此還是會(huì)有重復(fù)計(jì)算的性能開銷掠械。
//要徹底解決這個(gè)問題由缆,必須結(jié)合“原則三:對(duì)多次使用的RDD進(jìn)行持久化”注祖,
//才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
rdd1.map(...)
rdd1.reduce(...)

原則二:盡可能復(fù)用同一個(gè)RDD

除了要避免在開發(fā)過程中對(duì)一份完全相同的數(shù)據(jù)創(chuàng)建多個(gè)RDD之外犁功,在對(duì)不同的數(shù)據(jù)執(zhí)行算子操作時(shí)還要盡可能地復(fù)用一個(gè)RDD氓轰。比如說,有一個(gè)RDD的數(shù)據(jù)格式是key-value類型的浸卦,另一個(gè)是單value類型的署鸡,這兩個(gè)RDD的value數(shù)據(jù)是完全一樣的。那么此時(shí)我們可以只使用key-value類型的那個(gè)RDD限嫌,因?yàn)槠渲幸呀?jīng)包含了另一個(gè)的數(shù)據(jù)靴庆。對(duì)于類似這種多個(gè)RDD的數(shù)據(jù)有重疊或者包含的情況,我們應(yīng)該盡量復(fù)用一個(gè)RDD怒医,這樣可以盡可能地減少RDD的數(shù)量炉抒,從而盡可能減少算子執(zhí)行的次數(shù)。

一個(gè)簡(jiǎn)單的例子


// 錯(cuò)誤的做法稚叹。
// 有一個(gè)<long , String>格式的RDD焰薄,即rdd1。
// 接著由于業(yè)務(wù)需要扒袖,對(duì)rdd1執(zhí)行了一個(gè)map操作塞茅,創(chuàng)建了一個(gè)rdd2,
//而rdd2中的數(shù)據(jù)僅僅是rdd1中的value值而已季率,也就是說野瘦,rdd2是rdd1的子集。
JavaPairRDD</long><long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)
// 分別對(duì)rdd1和rdd2執(zhí)行了不同的算子操作飒泻。
rdd1.reduceByKey(...)
rdd2.map(...)
// 正確的做法鞭光。
// 上面這個(gè)case中,其實(shí)rdd1和rdd2的區(qū)別無(wú)非就是數(shù)據(jù)格式不同而已泞遗,
//rdd2的數(shù)據(jù)完全就是rdd1的子集而已惰许,卻創(chuàng)建了兩個(gè)rdd,并對(duì)兩個(gè)rdd都執(zhí)行了一次算子操作史辙。
// 此時(shí)會(huì)因?yàn)閷?duì)rdd1執(zhí)行map算子來創(chuàng)建rdd2汹买,而多執(zhí)行一次算子操作,進(jìn)而增加性能開銷髓霞。
// 其實(shí)在這種情況下完全可以復(fù)用同一個(gè)RDD卦睹。
// 我們可以使用rdd1畦戒,既做reduceByKey操作方库,也做map操作。
// 在進(jìn)行第二個(gè)map操作時(shí)障斋,只使用每個(gè)數(shù)據(jù)的tuple._2纵潦,也就是rdd1中的value值徐鹤,即可。
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
// 第二種方式相較于第一種方式而言邀层,很明顯減少了一次rdd2的計(jì)算開銷返敬。
// 但是到這里為止,優(yōu)化還沒有結(jié)束寥院,對(duì)rdd1我們還是執(zhí)行了兩次算子操作劲赠,rdd1實(shí)際上還是會(huì)被計(jì)算兩次。
// 因此還需要配合“原則三:對(duì)多次使用的RDD進(jìn)行持久化”進(jìn)行使用秸谢,
//才能保證一個(gè)RDD被多次使用時(shí)只被計(jì)算一次凛澎。

原則三:對(duì)多次使用的RDD進(jìn)行持久化

當(dāng)你在Spark代碼中多次對(duì)一個(gè)RDD做了算子操作后,恭喜估蹄,你已經(jīng)實(shí)現(xiàn)Spark作業(yè)第一步的優(yōu)化了塑煎,也就是盡可能復(fù)用RDD。此時(shí)就該在這個(gè)基礎(chǔ)之上臭蚁,進(jìn)行第二步優(yōu)化了最铁,也就是要保證對(duì)一個(gè)RDD執(zhí)行多次算子操作時(shí),這個(gè)RDD本身僅僅被計(jì)算一次垮兑。

Spark中對(duì)于一個(gè)RDD執(zhí)行多次算子的默認(rèn)原理是這樣的:每次你對(duì)一個(gè)RDD執(zhí)行一個(gè)算子操作時(shí)冷尉,都會(huì)重新從源頭處計(jì)算一遍,計(jì)算出那個(gè)RDD來甥角,然后再對(duì)這個(gè)RDD執(zhí)行你的算子操作网严。這種方式的性能是很差的。

因此對(duì)于這種情況嗤无,我們的建議是:對(duì)多次使用的RDD進(jìn)行持久化震束。此時(shí)Spark就會(huì)根據(jù)你的持久化策略,將RDD中的數(shù)據(jù)保存到內(nèi)存或者磁盤中当犯。以后每次對(duì)這個(gè)RDD進(jìn)行算子操作時(shí)垢村,都會(huì)直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù),然后執(zhí)行算子嚎卫,而不會(huì)從源頭處重新計(jì)算一遍這個(gè)RDD嘉栓,再執(zhí)行算子操作。

對(duì)多次使用的RDD進(jìn)行持久化的代碼示例

// 如果要對(duì)一個(gè)RDD進(jìn)行持久化拓诸,只要對(duì)這個(gè)RDD調(diào)用cache()和persist()即可侵佃。
// 正確的做法。
// cache()方法表示:使用非序列化的方式將RDD中的數(shù)據(jù)全部嘗試持久化到內(nèi)存中奠支。
// 此時(shí)再對(duì)rdd1執(zhí)行兩次算子操作時(shí)馋辈,只有在第一次執(zhí)行map算子時(shí),才會(huì)將這個(gè)rdd1從源頭處計(jì)算一次倍谜。
// 第二次執(zhí)行reduce算子時(shí)迈螟,就會(huì)直接從內(nèi)存中提取數(shù)據(jù)進(jìn)行計(jì)算叉抡,不會(huì)重復(fù)計(jì)算一個(gè)rdd。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手動(dòng)選擇持久化級(jí)別答毫,并使用指定的方式進(jìn)行持久化褥民。
// 比如說,StorageLevel.MEMORY_AND_DISK_SER表示洗搂,內(nèi)存充足時(shí)優(yōu)先持久化到內(nèi)存中消返,
//內(nèi)存不充足時(shí)持久化到磁盤文件中。
// 而且其中的_SER后綴表示耘拇,使用序列化的方式來保存RDD數(shù)據(jù)侦副,此時(shí)RDD中的每個(gè)partition
//都會(huì)序列化成一個(gè)大的字節(jié)數(shù)組,然后再持久化到內(nèi)存或磁盤中驼鞭。
// 序列化的方式可以減少持久化的數(shù)據(jù)對(duì)內(nèi)存/磁盤的占用量秦驯,進(jìn)而避免內(nèi)存被持久化數(shù)據(jù)占用過多,
//從而發(fā)生頻繁GC挣棕。
val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

對(duì)于persist()方法而言译隘,我們可以根據(jù)不同的業(yè)務(wù)場(chǎng)景選擇不同的持久化級(jí)別。

Spark的持久化級(jí)別

持久化級(jí)別 含義解釋
MEMORY_ONLY 使用未序列化的Java對(duì)象格式洛心,將數(shù)據(jù)保存在內(nèi)存中固耘。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化词身。那么下次對(duì)這個(gè)RDD執(zhí)行算子操作時(shí)厅目,那些沒有被持久化的數(shù)據(jù),需要從源頭處重新計(jì)算一遍法严。這是默認(rèn)的持久化策略损敷,使用cache()方法時(shí),實(shí)際就是使用的這種持久化策略深啤。
MEMORY_AND_DISK 使用未序列化的Java對(duì)象格式拗馒,優(yōu)先嘗試將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù)溯街,會(huì)將數(shù)據(jù)寫入磁盤文件中诱桂,下次對(duì)這個(gè)RDD執(zhí)行算子時(shí),持久化在磁盤文件中的數(shù)據(jù)會(huì)被讀取出來使用呈昔。
MEMORY_ONLY_SER 基本含義同MEMORY_ONLY挥等。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化堤尾,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組肝劲。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC哀峻。
MEMORY_AND_DISK_SER 基本含義同MEMORY_AND_DISK涡相。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化剩蟀,RDD的每個(gè)partition會(huì)被序列化成一個(gè)字節(jié)數(shù)組催蝗。這種方式更加節(jié)省內(nèi)存,從而可以避免持久化的數(shù)據(jù)占用過多內(nèi)存導(dǎo)致頻繁GC育特。
DISK_ONLY 使用未序列化的Java對(duì)象格式丙号,將數(shù)據(jù)全部寫入磁盤文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 對(duì)于上述任意一種持久化策略缰冤,如果加上后綴_2犬缨,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本棉浸,并將副本保存到其他節(jié)點(diǎn)上怀薛。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉迷郑,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了枝恋,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話嗡害,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了焚碌。

如何選擇一種最合適的持久化策略

1、默認(rèn)情況下霸妹,性能最高的當(dāng)然是MEMORY_ONLY十电,但前提是你的內(nèi)存必須足夠足夠大,可以綽綽有余地存放下整個(gè)RDD的所有數(shù)據(jù)叹螟。因?yàn)椴贿M(jìn)行序列化與反序列化操作鹃骂,就避免了這部分的性能開銷;對(duì)這個(gè)RDD的后續(xù)算子操作罢绽,都是基于純內(nèi)存中的數(shù)據(jù)的操作偎漫,不需要從磁盤文件中讀取數(shù)據(jù),性能也很高有缆;而且不需要復(fù)制一份數(shù)據(jù)副本象踊,并遠(yuǎn)程傳送到其他節(jié)點(diǎn)上。但是這里必須要注意的是棚壁,在實(shí)際的生產(chǎn)環(huán)境中杯矩,恐怕能夠直接用這種策略的場(chǎng)景還是有限的,如果RDD中數(shù)據(jù)比較多時(shí)(比如幾十億)袖外,直接用這種持久化級(jí)別史隆,會(huì)導(dǎo)致JVM的OOM內(nèi)存溢出異常。

2曼验、如果使用MEMORY_ONLY級(jí)別時(shí)發(fā)生了內(nèi)存溢出泌射,那么建議嘗試使用MEMORY_ONLY_SER級(jí)別粘姜。該級(jí)別會(huì)將RDD數(shù)據(jù)序列化后再保存在內(nèi)存中,此時(shí)每個(gè)partition僅僅是一個(gè)字節(jié)數(shù)組而已熔酷,大大減少了對(duì)象數(shù)量孤紧,并降低了內(nèi)存占用。這種級(jí)別比MEMORY_ONLY多出來的性能開銷拒秘,主要就是序列化與反序列化的開銷号显。但是后續(xù)算子可以基于純內(nèi)存進(jìn)行操作,因此性能總體還是比較高的躺酒。此外押蚤,可能發(fā)生的問題同上,如果RDD中的數(shù)據(jù)量過多的話羹应,還是可能會(huì)導(dǎo)致OOM內(nèi)存溢出的異常揽碘。

3、如果純內(nèi)存的級(jí)別都無(wú)法使用园匹,那么建議使用MEMORY_AND_DISK_SER策略钾菊,而不是MEMORY_AND_DISK策略。因?yàn)榧热坏搅诉@一步偎肃,就說明RDD的數(shù)據(jù)量很大煞烫,內(nèi)存無(wú)法完全放下。序列化后的數(shù)據(jù)比較少累颂,可以節(jié)省內(nèi)存和磁盤的空間開銷滞详。同時(shí)該策略會(huì)優(yōu)先盡量嘗試將數(shù)據(jù)緩存在內(nèi)存中,內(nèi)存緩存不下才會(huì)寫入磁盤紊馏。

4料饥、通常不建議使用DISK_ONLY和后綴為_2的級(jí)別:因?yàn)橥耆诖疟P文件進(jìn)行數(shù)據(jù)的讀寫,會(huì)導(dǎo)致性能急劇降低朱监,有時(shí)還不如重新計(jì)算一次所有RDD岸啡。后綴為_2的級(jí)別,必須將所有數(shù)據(jù)都復(fù)制一份副本赫编,并發(fā)送到其他節(jié)點(diǎn)上巡蘸,數(shù)據(jù)復(fù)制以及網(wǎng)絡(luò)傳輸會(huì)導(dǎo)致較大的性能開銷,除非是要求作業(yè)的高可用性擂送,否則不建議使用悦荒。

原則四:盡量避免使用shuffle類算子

如果有可能的話,要盡量避免使用shuffle類算子嘹吨。因?yàn)镾park作業(yè)運(yùn)行過程中搬味,最消耗性能的地方就是shuffle過程。shuffle過程,簡(jiǎn)單來說碰纬,就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key萍聊,拉取到同一個(gè)節(jié)點(diǎn)上,進(jìn)行聚合或join等操作悦析。比如reduceByKey寿桨、join等算子,都會(huì)觸發(fā)shuffle操作她按。

shuffle過程中,各個(gè)節(jié)點(diǎn)上的相同key都會(huì)先寫入本地磁盤文件中炕柔,然后其他節(jié)點(diǎn)需要通過網(wǎng)絡(luò)傳輸拉取各個(gè)節(jié)點(diǎn)上的磁盤文件中的相同key酌泰。而且相同key都拉取到同一個(gè)節(jié)點(diǎn)進(jìn)行聚合操作時(shí),還有可能會(huì)因?yàn)橐粋€(gè)節(jié)點(diǎn)上處理的key過多匕累,導(dǎo)致內(nèi)存不夠存放陵刹,進(jìn)而溢寫到磁盤文件中。因此在shuffle過程中欢嘿,可能會(huì)發(fā)生大量的磁盤文件讀寫的IO操作衰琐,以及數(shù)據(jù)的網(wǎng)絡(luò)傳輸操作。磁盤IO和網(wǎng)絡(luò)數(shù)據(jù)傳輸也是shuffle性能較差的主要原因炼蹦。

因此在我們的開發(fā)過程中羡宙,能避免則盡可能避免使用reduceByKey、join掐隐、distinct狗热、repartition等會(huì)進(jìn)行shuffle的算子,盡量使用map類的非shuffle算子虑省。這樣的話匿刮,沒有shuffle操作或者僅有較少shuffle操作的Spark作業(yè),可以大大減少性能開銷探颈。

Broadcast與map進(jìn)行join代碼示例

// 傳統(tǒng)的join操作會(huì)導(dǎo)致shuffle操作熟丸。
// 因?yàn)閮蓚€(gè)RDD中,相同的key都需要通過網(wǎng)絡(luò)拉取到一個(gè)節(jié)點(diǎn)上伪节,由一個(gè)task進(jìn)行join操作光羞。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不會(huì)導(dǎo)致shuffle操作怀大。
// 使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量狞山。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以從rdd2DataBroadcast中叉寂,獲取rdd2的所有數(shù)據(jù)萍启。
// 然后進(jìn)行遍歷,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,
//那么就判定可以進(jìn)行join勘纯。
// 此時(shí)就可以根據(jù)自己需要的方式局服,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),
//拼接在一起(String或Tuple)驳遵。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意淫奔,以上操作,建議僅僅在rdd2的數(shù)據(jù)量比較少(比如幾百M(fèi)堤结,或者一兩G)的情況下使用唆迁。
// 因?yàn)槊總€(gè)Executor的內(nèi)存中,都會(huì)駐留一份rdd2的全量數(shù)據(jù)竞穷。

原則五:使用map-side預(yù)聚合的shuffle操作

如果因?yàn)闃I(yè)務(wù)需要唐责,一定要使用shuffle操作,無(wú)法用map類的算子來替代瘾带,那么盡量使用可以map-side預(yù)聚合的算子鼠哥。

所謂的map-side預(yù)聚合,說的是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作看政,類似于MapReduce中的本地combiner朴恳。map-side預(yù)聚合之后,每個(gè)節(jié)點(diǎn)本地就只會(huì)有一條相同的key允蚣,因?yàn)槎鄺l相同的key都被聚合起來了于颖。其他節(jié)點(diǎn)在拉取所有節(jié)點(diǎn)上的相同key時(shí),就會(huì)大大減少需要拉取的數(shù)據(jù)數(shù)量嚷兔,從而也就減少了磁盤IO以及網(wǎng)絡(luò)傳輸開銷恍飘。通常來說,在可能的情況下谴垫,建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子章母。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合。而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的翩剪,全量的數(shù)據(jù)會(huì)在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸乳怎,性能相對(duì)來說比較差。

比如下圖前弯,就是典型的例子蚪缀,分別基于reduceByKey和groupByKey進(jìn)行單詞計(jì)數(shù)。其中第一張圖是groupByKey的原理圖恕出,可以看到询枚,沒有進(jìn)行任何本地聚合時(shí),所有數(shù)據(jù)都會(huì)在集群節(jié)點(diǎn)之間傳輸浙巫;第二張圖是reduceByKey的原理圖金蜀,可以看到刷后,每個(gè)節(jié)點(diǎn)本地的相同key數(shù)據(jù),都進(jìn)行了預(yù)聚合渊抄,然后才傳輸?shù)狡渌?jié)點(diǎn)上進(jìn)行全局聚合尝胆。

原則六:使用高性能的算子

除了shuffle相關(guān)的算子有優(yōu)化原則之外,其他的算子也都有著相應(yīng)的優(yōu)化原則护桦。

使用reduceByKey/aggregateByKey替代groupByKey

詳情見“原則五:使用map-side預(yù)聚合的shuffle操作”含衔。

使用mapPartitions替代普通map

mapPartitions類的算子,一次函數(shù)調(diào)用會(huì)處理一個(gè)partition所有的數(shù)據(jù)二庵,而不是一次函數(shù)調(diào)用處理一條贪染,性能相對(duì)來說會(huì)高一些。但是有的時(shí)候催享,使用mapPartitions會(huì)出現(xiàn)OOM(內(nèi)存溢出)的問題杭隙。因?yàn)閱未魏瘮?shù)調(diào)用就要處理掉一個(gè)partition所有的數(shù)據(jù),如果內(nèi)存不夠睡陪,垃圾回收時(shí)是無(wú)法回收掉太多對(duì)象的寺渗,很可能出現(xiàn)OOM異常匿情。所以使用這類操作時(shí)要慎重兰迫!

使用foreachPartitions替代foreach

原理類似于“使用mapPartitions替代map”,也是一次函數(shù)調(diào)用處理一個(gè)partition的所有數(shù)據(jù)炬称,而不是一次函數(shù)調(diào)用處理一條數(shù)據(jù)汁果。在實(shí)踐中發(fā)現(xiàn),foreachPartitions類的算子玲躯,對(duì)性能的提升還是很有幫助的据德。比如在foreach函數(shù)中,將RDD中所有數(shù)據(jù)寫MySQL跷车,那么如果是普通的foreach算子棘利,就會(huì)一條數(shù)據(jù)一條數(shù)據(jù)地寫,每次函數(shù)調(diào)用可能就會(huì)創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接朽缴,此時(shí)就勢(shì)必會(huì)頻繁地創(chuàng)建和銷毀數(shù)據(jù)庫(kù)連接善玫,性能是非常低下;但是如果用foreachPartitions算子一次性處理一個(gè)partition的數(shù)據(jù)密强,那么對(duì)于每個(gè)partition茅郎,只要?jiǎng)?chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接即可,然后執(zhí)行批量插入操作或渤,此時(shí)性能是比較高的系冗。實(shí)踐中發(fā)現(xiàn),對(duì)于1萬(wàn)條左右的數(shù)據(jù)量寫MySQL薪鹦,性能可以提升30%以上掌敬。

使用filter之后進(jìn)行coalesce操作

通常對(duì)一個(gè)RDD執(zhí)行filter算子過濾掉RDD中較多數(shù)據(jù)后(比如30%以上的數(shù)據(jù))惯豆,建議使用coalesce算子征堪,手動(dòng)減少RDD的partition數(shù)量攻柠,將RDD中的數(shù)據(jù)壓縮到更少的partition中去。因?yàn)閒ilter之后惠遏,RDD的每個(gè)partition中都會(huì)有很多數(shù)據(jù)被過濾掉舀武,此時(shí)如果照常進(jìn)行后續(xù)的計(jì)算拄养,其實(shí)每個(gè)task處理的partition中的數(shù)據(jù)量并不是很多,有一點(diǎn)資源浪費(fèi)银舱,而且此時(shí)處理的task越多瘪匿,可能速度反而越慢。因此用coalesce減少partition數(shù)量寻馏,將RDD中的數(shù)據(jù)壓縮到更少的partition之后棋弥,只要使用更少的task即可處理完所有的partition。在某些場(chǎng)景下诚欠,對(duì)于性能的提升會(huì)有一定的幫助顽染。

使用repartitionAndSortWithinPartitions替代repartition與sort類操作

repartitionAndSortWithinPartitions是Spark官網(wǎng)推薦的一個(gè)算子,官方建議轰绵,如果需要在repartition重分區(qū)之后粉寞,還要進(jìn)行排序,建議直接使用repartitionAndSortWithinPartitions算子左腔。因?yàn)樵撍阕涌梢砸贿呥M(jìn)行重分區(qū)的shuffle操作唧垦,一邊進(jìn)行排序。shuffle與sort兩個(gè)操作同時(shí)進(jìn)行液样,比先shuffle再sort來說振亮,性能可能是要高的。

原則七:廣播大變量

有時(shí)在開發(fā)過程中鞭莽,會(huì)遇到需要在算子函數(shù)中使用外部變量的場(chǎng)景(尤其是大變量坊秸,比如100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來提升性能澎怒。

在算子函數(shù)中使用到外部變量時(shí)褒搔,默認(rèn)情況下,Spark會(huì)將該變量復(fù)制多個(gè)副本丹拯,通過網(wǎng)絡(luò)傳輸?shù)絫ask中站超,此時(shí)每個(gè)task都有一個(gè)變量副本。如果變量本身比較大的話(比如100M乖酬,甚至1G)死相,那么大量的變量副本在網(wǎng)絡(luò)中傳輸?shù)男阅荛_銷,以及在各個(gè)節(jié)點(diǎn)的Executor中占用過多內(nèi)存導(dǎo)致的頻繁GC咬像,都會(huì)極大地影響性能算撮。

因此對(duì)于上述情況生宛,如果使用的外部變量比較大,建議使用Spark的廣播功能肮柜,對(duì)該變量進(jìn)行廣播陷舅。廣播后的變量,會(huì)保證每個(gè)Executor的內(nèi)存中审洞,只駐留一份變量副本莱睁,而Executor中的task執(zhí)行時(shí)共享該Executor中的那份變量副本。這樣的話芒澜,可以大大減少變量副本的數(shù)量仰剿,從而減少網(wǎng)絡(luò)傳輸?shù)男阅荛_銷,并減少對(duì)Executor內(nèi)存的占用開銷痴晦,降低GC的頻率南吮。

廣播大變量的代碼示例

// 以下代碼在算子函數(shù)中,使用了外部的變量誊酌。
// 此時(shí)沒有做任何特殊操作部凑,每個(gè)task都會(huì)有一份list1的副本。
val list1 = ...
rdd1.map(list1...)
// 以下代碼將list1封裝成了Broadcast類型的廣播變量碧浊。
// 在算子函數(shù)中涂邀,使用廣播變量時(shí),首先會(huì)判斷當(dāng)前task所在Executor內(nèi)存中辉词,是否有變量副本必孤。
// 如果有則直接使用猾骡;如果沒有則從Driver或者其他Executor節(jié)點(diǎn)上遠(yuǎn)程拉取一份放到本地Executor內(nèi)存中瑞躺。
// 每個(gè)Executor內(nèi)存中,就只會(huì)駐留一份廣播變量副本兴想。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)

原則八:使用Kryo優(yōu)化序列化性能

在Spark中幢哨,主要有三個(gè)地方涉及到了序列化:

1、在算子函數(shù)中使用到外部變量時(shí)嫂便,該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見“原則七:廣播大變量”中的講解)捞镰。
2、將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD毙替,Student是自定義類型)岸售,所有自定義類型對(duì)象,都會(huì)進(jìn)行序列化厂画。因此這種情況下凸丸,也要求自定義的類必須實(shí)現(xiàn)Serializable接口。
3袱院、使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER)屎慢,Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組瞭稼。

對(duì)于這三種出現(xiàn)序列化的地方,我們都可以通過使用Kryo序列化類庫(kù)腻惠,來優(yōu)化序列化和反序列化的性能环肘。Spark默認(rèn)使用的是Java的序列化機(jī)制,也就是ObjectOutputStream/ObjectInputStream API來進(jìn)行序列化和反序列化集灌。但是Spark同時(shí)支持使用Kryo序列化庫(kù)悔雹,Kryo序列化類庫(kù)的性能比Java序列化類庫(kù)的性能要高很多。官方介紹欣喧,Kryo序列化機(jī)制比Java序列化機(jī)制荠商,性能高10倍左右。Spark之所以默認(rèn)沒有使用Kryo作為序列化類庫(kù)续誉,是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類型莱没,因此對(duì)于開發(fā)者來說,這種方式比較麻煩酷鸦。

以下是使用Kryo的代碼示例饰躲,我們只要設(shè)置序列化類,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型臼隔、作為RDD泛型類型的自定義類型等):

// 創(chuàng)建SparkConf對(duì)象嘹裂。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)要序列化的自定義類型摔握。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原則九:優(yōu)化數(shù)據(jù)結(jié)構(gòu)

Java中寄狼,有三種類型比較耗費(fèi)內(nèi)存:

1、對(duì)象氨淌,每個(gè)Java對(duì)象都有對(duì)象頭泊愧、引用等額外的信息,因此比較占用內(nèi)存空間盛正。
2删咱、字符串,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長(zhǎng)度等額外信息豪筝。
3痰滋、集合類型,比如HashMap续崖、LinkedList等敲街,因?yàn)榧项愋蛢?nèi)部通常會(huì)使用一些內(nèi)部類來封裝集合元素,比如Map.Entry严望。

因此Spark官方建議多艇,在Spark編碼實(shí)現(xiàn)中,特別是對(duì)于算子函數(shù)中的代碼著蟹,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu)墩蔓,盡量使用字符串替代對(duì)象梢莽,使用原始類型(比如Int、Long)替代字符串奸披,使用數(shù)組替代集合類型昏名,這樣盡可能地減少內(nèi)存占用,從而降低GC頻率阵面,提升性能轻局。

但是在筆者的編碼實(shí)踐中發(fā)現(xiàn),要做到該原則其實(shí)并不容易样刷。因?yàn)槲覀兺瑫r(shí)要考慮到代碼的可維護(hù)性仑扑,如果一個(gè)代碼中,完全沒有任何對(duì)象抽象置鼻,全部是字符串拼接的方式镇饮,那么對(duì)于后續(xù)的代碼維護(hù)和修改,無(wú)疑是一場(chǎng)巨大的災(zāi)難箕母。同理储藐,如果所有操作都基于數(shù)組實(shí)現(xiàn),而不使用HashMap嘶是、LinkedList等集合類型钙勃,那么對(duì)于我們的編碼難度以及代碼可維護(hù)性,也是一個(gè)極大的挑戰(zhàn)聂喇。因此筆者建議辖源,在可能以及合適的情況下,使用占用內(nèi)存較少的數(shù)據(jù)結(jié)構(gòu)希太,但是前提是要保證代碼的可維護(hù)性克饶。

轉(zhuǎn)載自過往記憶(https://www.iteblog.com/)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市跛十,隨后出現(xiàn)的幾起案子彤路,更是在濱河造成了極大的恐慌秕硝,老刑警劉巖芥映,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異远豺,居然都是意外死亡奈偏,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門躯护,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惊来,“玉大人,你說我怎么就攤上這事棺滞〔靡希” “怎么了矢渊?”我有些...
    開封第一講書人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)枉证。 經(jīng)常有香客問我矮男,道長(zhǎng),這世上最難降的妖魔是什么室谚? 我笑而不...
    開封第一講書人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任毡鉴,我火速辦了婚禮,結(jié)果婚禮上秒赤,老公的妹妹穿的比我還像新娘猪瞬。我一直安慰自己,他們只是感情好入篮,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開白布陈瘦。 她就那樣靜靜地躺著,像睡著了一般潮售。 火紅的嫁衣襯著肌膚如雪甘晤。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,190評(píng)論 1 299
  • 那天饲做,我揣著相機(jī)與錄音线婚,去河邊找鬼。 笑死盆均,一個(gè)胖子當(dāng)著我的面吹牛塞弊,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播泪姨,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼游沿,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了肮砾?” 一聲冷哼從身側(cè)響起诀黍,我...
    開封第一講書人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎仗处,沒想到半個(gè)月后眯勾,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡婆誓,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年吃环,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片洋幻。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡郁轻,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情好唯,我是刑警寧澤竭沫,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站骑篙,受9級(jí)特大地震影響输吏,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜替蛉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一贯溅、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧躲查,春花似錦它浅、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至典唇,卻和暖如春镊折,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背介衔。 一陣腳步聲響...
    開封第一講書人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來泰國(guó)打工恨胚, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人炎咖。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓赃泡,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親乘盼。 傳聞我的和親對(duì)象是個(gè)殘疾皇子升熊,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容