算子調(diào)優(yōu)
4.1 MapPartitions提升Map類操作性能
spark中,最基本的原則扫腺,就是每個(gè)task處理一個(gè)RDD的partition岗照。
- MapPartitions的優(yōu)缺點(diǎn)
MapPartitions操作的優(yōu)點(diǎn):
如果是普通的map,比如一個(gè)partition中有1萬(wàn)條數(shù)據(jù)笆环。ok攒至,那么你的function要執(zhí)行和計(jì)算1萬(wàn)次。
但是躁劣,使用MapPartitions操作之后迫吐,一個(gè)task僅僅會(huì)執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù)账忘。只要執(zhí)行一次就可以了志膀,性能比較高。MapPartitions的缺點(diǎn):
如果是普通的map操作鳖擒,一次function的執(zhí)行就處理一條數(shù)據(jù)溉浙。那么如果內(nèi)存不夠用的情況下,比如處理了1千條數(shù)據(jù)了蒋荚,那么這個(gè)時(shí)候內(nèi)存不夠了戳稽,那么就可以將已經(jīng)處理完的1千條數(shù)據(jù)從內(nèi)存里面垃圾回收掉,或者用其他方法期升,騰出空間來(lái)吧惊奇。
所以說(shuō)普通的map操作通常不會(huì)導(dǎo)致內(nèi)存的OOM異常。
但是MapPartitions操作播赁,對(duì)于大量數(shù)據(jù)來(lái)說(shuō)颂郎,比如甚至一個(gè)partition,100萬(wàn)數(shù)據(jù)容为,一次傳入一個(gè)function以后祖秒,那么可能一下子內(nèi)存不夠诞吱,但是又沒有辦法去騰出內(nèi)存空間來(lái),可能就OOM竭缝,內(nèi)存溢出房维。
- MapPartitions使用場(chǎng)景
當(dāng)分析的數(shù)據(jù)量不是特別大的時(shí)候,都可以用這種MapPartitions系列操作抬纸,性能還是非常不錯(cuò)的咙俩,是有提升的。比如原來(lái)是15分鐘湿故,(曾經(jīng)有一次性能調(diào)優(yōu))阿趁,12分鐘。10分鐘->9分鐘坛猪。
但是也有過出問題的經(jīng)驗(yàn)脖阵,MapPartitions只要一用,直接OOM墅茉,內(nèi)存溢出命黔,崩潰。
在項(xiàng)目中就斤,自己先去估算一下RDD的數(shù)據(jù)量悍募,以及每個(gè)partition的量,還有自己分配給每個(gè)executor的內(nèi)存資源洋机∽寡纾看看一下子內(nèi)存容納所有的partition數(shù)據(jù)行不行。如果行绷旗,可以試一下喜鼓,能跑通就好。性能肯定是有提升的衔肢。但是試了以后庄岖,發(fā)現(xiàn)OOM了,那就放棄吧膀懈。
4.2 filter過后使用coalesce減少分區(qū)數(shù)量
默認(rèn)情況下,經(jīng)過了filter之后谨垃,RDD中的每個(gè)partition的數(shù)據(jù)量启搂,可能都不太一樣了。(原本每個(gè)partition的數(shù)據(jù)量可能是差不多的)
可能出現(xiàn)的問題:
1刘陶、每個(gè)partition數(shù)據(jù)量變少了胳赌,但是在后面進(jìn)行處理的時(shí)候,還是要跟partition數(shù)量一樣數(shù)量的task匙隔,來(lái)進(jìn)行處理疑苫,有點(diǎn)浪費(fèi)task計(jì)算資源。
2、每個(gè)partition的數(shù)據(jù)量不一樣捍掺,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition的時(shí)候撼短,每個(gè)task要處理的數(shù)據(jù)量就不同,這樣就會(huì)導(dǎo)致有些task運(yùn)行的速度很快挺勿,有些task運(yùn)行的速度很慢曲横。這就是數(shù)據(jù)傾斜。針對(duì)上述的兩個(gè)問題不瓶,我們希望應(yīng)該能夠怎么樣禾嫉?
1、針對(duì)第一個(gè)問題蚊丐,我們希望可以進(jìn)行partition的壓縮吧熙参,因?yàn)閿?shù)據(jù)量變少了,那么partition其實(shí)也完全可以對(duì)應(yīng)的變少麦备。比如原來(lái)是4個(gè)partition孽椰,現(xiàn)在完全可以變成2個(gè)partition。那么就只要用后面的2個(gè)task來(lái)處理即可泥兰。就不會(huì)造成task計(jì)算資源的浪費(fèi)弄屡。(不必要,針對(duì)只有一點(diǎn)點(diǎn)數(shù)據(jù)的partition鞋诗,還去啟動(dòng)一個(gè)task來(lái)計(jì)算)
2膀捷、針對(duì)第二個(gè)問題,其實(shí)解決方案跟第一個(gè)問題是一樣的削彬,也是去壓縮partition全庸,盡量讓每個(gè)partition的數(shù)據(jù)量差不多。那么后面的task分配到的partition的數(shù)據(jù)量也就差不多融痛。不會(huì)造成有的task運(yùn)行速度特別慢壶笼,有的task運(yùn)行速度特別快。避免了數(shù)據(jù)傾斜的問題雁刷。解決問題方法
調(diào)用coalesce算子
主要就是用于在filter操作之后覆劈,針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況,來(lái)壓縮partition的數(shù)量沛励,而且讓每個(gè)partition的數(shù)據(jù)量都盡量均勻緊湊责语。從而便于后面的task進(jìn)行計(jì)算操作,在某種程度上目派,能夠一定程度的提升性能坤候。
4.3、使用foreachPartition優(yōu)化寫數(shù)據(jù)庫(kù)性能
- 默認(rèn)的foreach的性能缺陷在哪里企蹭?
首先白筹,對(duì)于每條數(shù)據(jù)智末,都要單獨(dú)去調(diào)用一次function,task為每個(gè)數(shù)據(jù)徒河,都要去執(zhí)行一次function函數(shù)系馆。
如果100萬(wàn)條數(shù)據(jù),(一個(gè)partition)虚青,調(diào)用100萬(wàn)次它呀。性能比較差衷佃。
另外一個(gè)非常非常重要的一點(diǎn), 如果每個(gè)數(shù)據(jù)审姓,你都去創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接的話欺缘,那么你就得創(chuàng)建100萬(wàn)次數(shù)據(jù)庫(kù)連接识虚。
但是要注意的是容劳,數(shù)據(jù)庫(kù)連接的創(chuàng)建和銷毀蓬戚,都是非常非常消耗性能的蚯斯。雖然我們之前已經(jīng)用了數(shù)據(jù)庫(kù)連接池此再,只是創(chuàng)建了固定數(shù)量的數(shù)據(jù)庫(kù)連接何乎。
你還是得多次通過數(shù)據(jù)庫(kù)連接句惯,往數(shù)據(jù)庫(kù)(MySQL)發(fā)送一條SQL語(yǔ)句,然后MySQL需要去執(zhí)行這條SQL語(yǔ)句支救。如果有100萬(wàn)條數(shù)據(jù)抢野,那么就是100萬(wàn)次發(fā)送SQL語(yǔ)句。
以上兩點(diǎn)(數(shù)據(jù)庫(kù)連接各墨,多次發(fā)送SQL語(yǔ)句)指孤,都是非常消耗性能的。
- 用了foreachPartition算子之后贬堵,好處在哪里恃轩?
1、對(duì)于我們寫的function函數(shù)黎做,就調(diào)用一次叉跛,一次傳入一個(gè)partition所有的數(shù)據(jù)。
2蒸殿、主要?jiǎng)?chuàng)建或者獲取一個(gè)數(shù)據(jù)庫(kù)連接就可以筷厘。
3、只要向數(shù)據(jù)庫(kù)發(fā)送一次SQL語(yǔ)句和多組參數(shù)即可宏所。
注意酥艳,與mapPartitions操作一樣,如果一個(gè)partition的數(shù)量真的特別特別大楣铁,比如是100萬(wàn)玖雁,那基本上就不太靠譜了更扁。很有可能會(huì)發(fā)生OOM盖腕,內(nèi)存溢出的問題赫冬。
4.4、使用repartition解決Spark SQL低并行度的性能問題
- 設(shè)置并行度
并行度:之前說(shuō)過溃列,并行度是設(shè)置的:
1劲厌、spark.default.parallelism
2、textFile()听隐,傳入第二個(gè)參數(shù)补鼻,指定partition數(shù)量(比較少用)
- 每次shuffle過程, 都可以設(shè)置分區(qū)數(shù)量的
在生產(chǎn)環(huán)境中,是最好設(shè)置一下并行度雅任。官網(wǎng)有推薦的設(shè)置方式风范,根據(jù)你的application的總cpu core數(shù)量(在spark-submit中可以指定),自己手動(dòng)設(shè)置spark.default.parallelism參數(shù)沪么,指定為cpu core總數(shù)的2~3倍硼婿。
你設(shè)置的這個(gè)并行度,在哪些情況下會(huì)生效禽车?什么情況下不會(huì)生效寇漫?
如果你壓根兒沒有使用Spark SQL(DataFrame),那么你整個(gè)spark application默認(rèn)所有stage的并行度都是你設(shè)置的那個(gè)參數(shù)殉摔。(除非你使用coalesce算子縮減過partition數(shù)量)州胳。
問題來(lái)了,用Spark SQL的情況下逸月,stage的并行度沒法自己指定栓撞。Spark SQL自己會(huì)默認(rèn)根據(jù)hive表對(duì)應(yīng)的hdfs文件的block,自動(dòng)設(shè)置Spark SQL查詢所在的那個(gè)stage的并行度彻采。你自己通過spark.default.parallelism參數(shù)指定的并行度腐缤,只會(huì)在沒有Spark SQL的stage中生效。
比如你第一個(gè)stage肛响,用了Spark SQL從hive表中查詢出了一些數(shù)據(jù)岭粤,然后做了一些transformation操作,接著做了一個(gè)shuffle操作(groupByKey)特笋。下一個(gè)stage剃浇,在shuffle操作之后,做了一些transformation操作猎物。hive表虎囚,對(duì)應(yīng)了一個(gè)hdfs文件,有20個(gè)block蔫磨。你自己設(shè)置了spark.default.parallelism參數(shù)為100淘讥。
你的第一個(gè)stage的并行度,是不受你的控制的堤如,就只有20個(gè)task蒲列。第二個(gè)stage窒朋,才會(huì)變成你自己設(shè)置的那個(gè)并行度,100蝗岖。可能出現(xiàn)的問題侥猩?
Spark SQL默認(rèn)情況下,它的那個(gè)并行度抵赢,咱們沒法設(shè)置欺劳。可能導(dǎo)致的問題铅鲤,也許沒什么問題划提,也許很有問題。Spark SQL所在的那個(gè)stage中邢享,后面的那些transformation操作腔剂,可能會(huì)有非常復(fù)雜的業(yè)務(wù)邏輯,甚至說(shuō)復(fù)雜的算法驼仪。如果你的Spark SQL默認(rèn)把task數(shù)量設(shè)置的很少掸犬,20個(gè),然后每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量绪爸,然后還要執(zhí)行特別復(fù)雜的算法湾碎。
這個(gè)時(shí)候,就會(huì)導(dǎo)致第一個(gè)stage的速度奠货,特別慢介褥。第二個(gè)stage1000個(gè)task非常快递惋。解決Spark SQL無(wú)法設(shè)置并行度和task數(shù)量的辦法
repartition算子柔滔,你用Spark SQL這一步的并行度和task數(shù)量,肯定是沒有辦法去改變了萍虽。但是呢睛廊,可以將你用Spark SQL查詢出來(lái)的RDD,使用repartition算子去重新進(jìn)行分區(qū)杉编,此時(shí)可以分成多個(gè)partition超全。然后呢,從repartition以后的RDD邓馒,再往后嘶朱,并行度和task數(shù)量,就會(huì)按照你預(yù)期的來(lái)了光酣。就可以避免跟Spark SQL綁定在一個(gè)stage中的算子疏遏,只能使用少量的task去處理大量數(shù)據(jù)以及復(fù)雜的算法邏輯。
4.5、reduceByKey本地聚合
reduceByKey财异,相較于普通的shuffle操作(比如groupByKey)下翎,它的一個(gè)特點(diǎn),就是說(shuō)宝当,會(huì)進(jìn)行map端的本地聚合。對(duì)map端給下個(gè)stage每個(gè)task創(chuàng)建的輸出文件中胆萧,寫數(shù)據(jù)之前庆揩,就會(huì)進(jìn)行本地的combiner操作,也就是說(shuō)對(duì)每一個(gè)key跌穗,對(duì)應(yīng)的values订晌,都會(huì)執(zhí)行你的算子函數(shù)(_ + _)
用reduceByKey對(duì)性能的提升
1、在本地進(jìn)行聚合以后蚌吸,在map端的數(shù)據(jù)量就變少了锈拨,減少磁盤IO。而且可以減少磁盤空間的占用羹唠。
2奕枢、下一個(gè)stage,拉取數(shù)據(jù)的量佩微,也就變少了缝彬。減少網(wǎng)絡(luò)的數(shù)據(jù)傳輸?shù)男阅芟摹?br> 3、在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用變少了哺眯。
4谷浅、reduce端,要進(jìn)行聚合的數(shù)據(jù)量也變少了奶卓。reduceByKey在什么情況下使用呢一疯?
1、非常普通的夺姑,比如說(shuō)墩邀,就是要實(shí)現(xiàn)類似于wordcount程序一樣的,對(duì)每個(gè)key對(duì)應(yīng)的值盏浙,進(jìn)行某種數(shù)據(jù)公式或者算法的計(jì)算(累加磕蒲、類乘)。
2只盹、對(duì)于一些類似于要對(duì)每個(gè)key進(jìn)行一些字符串拼接的這種較為復(fù)雜的操作辣往,可以自己衡量一下,其實(shí)有時(shí)殖卑,也是可以使用reduceByKey來(lái)實(shí)現(xiàn)的站削。但是不太好實(shí)現(xiàn)。如果真能夠?qū)崿F(xiàn)出來(lái)孵稽,對(duì)性能絕對(duì)是有幫助的许起。(shuffle基本上就占了整個(gè)spark作業(yè)的90%以上的性能消耗十偶,主要能對(duì)shuffle進(jìn)行一定的調(diào)優(yōu),都是有價(jià)值的)