Spark性能優(yōu)化分為四個(gè)方面:
1氮凝、開(kāi)發(fā)調(diào)優(yōu)
2、資源調(diào)優(yōu)
3望忆、數(shù)據(jù)傾斜調(diào)優(yōu)
4罩阵、shuffle調(diào)優(yōu)
1. 開(kāi)發(fā)調(diào)優(yōu)
1.1 避免創(chuàng)建重復(fù)的RDD
對(duì)于同一份數(shù)據(jù),只應(yīng)該創(chuàng)建一個(gè)RDD炭臭,不能創(chuàng)建多個(gè)RDD來(lái)代表同一份數(shù)據(jù)永脓。
錯(cuò)誤示例:
val rdd1 = sc.textFile("../hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("../hello.txt")
rdd2.reduce(...)
正確示例:
val rdd1 = sc.textFile("../hello.txt")
rdd1.map(...)
rdd1.reduce(...)
1.2 盡可能復(fù)用同一個(gè)RDD
在對(duì)不同的數(shù)據(jù)執(zhí)行算子操作時(shí)還要盡可能地復(fù)用一個(gè)RDD。
錯(cuò)誤示例:
JavaPairRDD</long><long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)
// 分別對(duì)rdd1和rdd2執(zhí)行了不同的算子操作鞋仍。
rdd1.reduceByKey(...)
rdd2.map(...)
正確示例:
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
1.3 對(duì)多次使用的RDD進(jìn)行持久化
對(duì)多次使用的RDD進(jìn)行持久化常摧,以后每次對(duì)這個(gè)RDD進(jìn)行算子操作時(shí),都會(huì)直接從內(nèi)存或磁盤中提取持久化的RDD數(shù)據(jù)威创。
正確示例:
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手動(dòng)選擇持久化級(jí)別落午,并使用指定的方式進(jìn)行持久化。
// 比如說(shuō)肚豺,StorageLevel.MEMORY_AND_DISK_SER表示溃斋,內(nèi)存充足時(shí)優(yōu)先持久化到內(nèi)存中,
//內(nèi)存不充足時(shí)持久化到磁盤文件中吸申。
// 而且其中的_SER后綴表示梗劫,使用序列化的方式來(lái)保存RDD數(shù)據(jù),此時(shí)RDD中的每個(gè)partition
//都會(huì)序列化成一個(gè)大的字節(jié)數(shù)組截碴,然后再持久化到內(nèi)存或磁盤中梳侨。
// 序列化的方式可以減少持久化的數(shù)據(jù)對(duì)內(nèi)存/磁盤的占用量,進(jìn)而避免內(nèi)存被持久化數(shù)據(jù)占用過(guò)多日丹,
//從而發(fā)生頻繁GC走哺。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
對(duì)于persist(),可根據(jù)業(yè)務(wù)場(chǎng)景選擇持久化級(jí)別哲虾。
1.4 盡可能避免使用shuffle類算子
shuffle過(guò)程丙躏,簡(jiǎn)單來(lái)說(shuō)择示,就是將分布在集群中多個(gè)節(jié)點(diǎn)上的同一個(gè)key,拉取到同一個(gè)節(jié)點(diǎn)上晒旅,進(jìn)行聚合或join等操作栅盲。所以要盡可能避免使用reduceByKey、join敢朱、distinct剪菱、repartition等會(huì)進(jìn)行shuffle的算子,盡量使用map類的非shuffle算子拴签。
Broadcast與map進(jìn)行join代碼示例:
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作孝常,不會(huì)導(dǎo)致shuffle操作。
// 使用Broadcast將一個(gè)數(shù)據(jù)量較小的RDD作為廣播變量蚓哩。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中构灸,可以從rdd2DataBroadcast中,獲取rdd2的所有數(shù)據(jù)岸梨。
// 然后進(jìn)行遍歷喜颁,如果發(fā)現(xiàn)rdd2中某條數(shù)據(jù)的key與rdd1的當(dāng)前數(shù)據(jù)的key是相同的,
//那么就判定可以進(jìn)行join曹阔。
// 此時(shí)就可以根據(jù)自己需要的方式半开,將rdd1當(dāng)前數(shù)據(jù)與rdd2中可以連接的數(shù)據(jù),
//拼接在一起(String或Tuple)赃份。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
1.5 使用map-side預(yù)聚合的shuffle操作
如果因?yàn)闃I(yè)務(wù)需要寂拆,一定要使用shuffle操作,無(wú)法用map類的算子來(lái)替代抓韩,那么盡量使用可以map-side預(yù)聚合的算子纠永。
所謂的map-side預(yù)聚合,說(shuō)的是在每個(gè)節(jié)點(diǎn)本地對(duì)相同的key進(jìn)行一次聚合操作谒拴,類似于MapReduce中的本地combiner尝江。
在可能的情況下,建議使用reduceByKey或者aggregateByKey算子來(lái)替代掉groupByKey算子英上。因?yàn)閞educeByKey和aggregateByKey算子都會(huì)使用用戶自定義的函數(shù)對(duì)每個(gè)節(jié)點(diǎn)本地的相同key進(jìn)行預(yù)聚合炭序。而groupByKey算子是不會(huì)進(jìn)行預(yù)聚合的,全量的數(shù)據(jù)會(huì)在集群的各個(gè)節(jié)點(diǎn)之間分發(fā)和傳輸苍日,性能相對(duì)來(lái)說(shuō)比較差惭聂。
1.6 使用高性能的算子
1)使用reduceByKey/aggregateByKey替代groupByKey
2)使用mapPartitions替代普通map
3)使用foreachPartitions替代foreach
4)使用filter之后進(jìn)行coalesce操作
5)使用repartitionAndSortWithinPartitions替代repartition與sort類操作
1.7 廣播大變量
有時(shí)在開(kāi)發(fā)過(guò)程中,會(huì)遇到需要在算子函數(shù)中使用外部變量的場(chǎng)景(尤其是大變量易遣,比如100M以上的大集合),那么此時(shí)就應(yīng)該使用Spark的廣播(Broadcast)功能來(lái)提升性能嫌佑。
錯(cuò)誤示例:
// 以下代碼在算子函數(shù)中豆茫,使用了外部的變量侨歉。
// 此時(shí)沒(méi)有做任何特殊操作,每個(gè)task都會(huì)有一份list1的副本揩魂。
val list1 = ...
rdd1.map(list1...)
正確示例:
// 以下代碼將list1封裝成了Broadcast類型的廣播變量幽邓。
// 在算子函數(shù)中,使用廣播變量時(shí)火脉,首先會(huì)判斷當(dāng)前task所在Executor內(nèi)存中牵舵,是否有變量副本。
// 如果有則直接使用倦挂;如果沒(méi)有則從Driver或者其他Executor節(jié)點(diǎn)上遠(yuǎn)程拉取一份放到本地Executor內(nèi)存中畸颅。
// 每個(gè)Executor內(nèi)存中,就只會(huì)駐留一份廣播變量副本方援。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
1.8 使用Kryo優(yōu)化序列化性能
在Spark中没炒,主要有三個(gè)地方涉及到了序列化:
1、在算子函數(shù)中使用到外部變量時(shí)犯戏,該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見(jiàn)“原則七:廣播大變量”中的講解)送火。
2、將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD先匪,Student是自定義類型)种吸,所有自定義類型對(duì)象,都會(huì)進(jìn)行序列化呀非。因此這種情況下坚俗,也要求自定義的類必須實(shí)現(xiàn)Serializable接口。
3姜钳、使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER)坦冠,Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組。
對(duì)于這三種出現(xiàn)序列化的地方哥桥,我們都可以通過(guò)使用Kryo序列化類庫(kù)辙浑,來(lái)優(yōu)化序列化和反序列化的性能。
// 創(chuàng)建SparkConf對(duì)象拟糕。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 設(shè)置序列化器為KryoSerializer判呕。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注冊(cè)要序列化的自定義類型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
1.9 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
Java中送滞,有三種類型比較耗費(fèi)內(nèi)存:
1侠草、對(duì)象,每個(gè)Java對(duì)象都有對(duì)象頭犁嗅、引用等額外的信息边涕,因此比較占用內(nèi)存空間。
2、字符串功蜓,每個(gè)字符串內(nèi)部都有一個(gè)字符數(shù)組以及長(zhǎng)度等額外信息园爷。
3、集合類型式撼,比如HashMap童社、LinkedList等,因?yàn)榧项愋蛢?nèi)部通常會(huì)使用一些內(nèi)部類來(lái)封裝集合元素著隆,比如Map.Entry扰楼。
因此Spark官方建議,在Spark編碼實(shí)現(xiàn)中美浦,特別是對(duì)于算子函數(shù)中的代碼弦赖,盡量不要使用上述三種數(shù)據(jù)結(jié)構(gòu),盡量使用字符串替代對(duì)象抵代,使用原始類型(比如Int腾节、Long)替代字符串,使用數(shù)組替代集合類型荤牍,這樣盡可能地減少內(nèi)存占用案腺,從而降低GC頻率,提升性能康吵。