雖然spark已經(jīng)提供了大量簡單易用的API左胞,但要想編寫出高性能的spark應(yīng)用担钮,必須要對整體框架有一定的了解,對于Spark初學(xué)者來說是比較困難的厚棵。
針對這個這個問題蕉世,其實在spark1.6中,已經(jīng)加入了dataset婆硬,官方已經(jīng)對其進行了一系列的優(yōu)化狠轻,用戶可以將rdd轉(zhuǎn)化為dataset操作,減少學(xué)習(xí)成本彬犯。不過目前(1.6版本)依舊存在一些bug向楼。
下文講解了使用RDD編程時,常用的幾種代碼優(yōu)化方法谐区。
1. repartition和coalesce
這兩個方法都可以用在對數(shù)據(jù)的重新分區(qū)中蜜自,其中repartition
是一個代價很大的操作,它會將所有的數(shù)據(jù)進行一次shuffle卢佣,然后重新分區(qū)重荠。
如果你僅僅只是想減少分區(qū)數(shù),從而達到減少碎片任務(wù)或者碎片數(shù)據(jù)的目的虚茶。使用coalesce
就可以實現(xiàn)戈鲁,該操作默認(rèn)不會進行shuffle。其實repartition
只是coalesce
的shuffle版本嘹叫。
一般我們會在filter
算子過濾了大量數(shù)據(jù)后使用它婆殿。比如將 partition 數(shù)從1000減少到100。這可以減少碎片任務(wù)罩扇,降低啟動task的開銷婆芦。
note1: 如果想查看當(dāng)前rdd的分區(qū)數(shù),在java/scala中可以使用rdd.partitions.size()
喂饥,在python中使用rdd.getNumPartitions()
消约。
note2: 如果要增加分區(qū)數(shù),只能使用repartition,或者把partition縮減為一個非常小的值员帮,比如說“1”或粮,也建議使用repartition。
2. mapPartitions和foreachPartitions
適當(dāng)使用mapPartitions
和foreachPartitions
代替map
和foreach
可以提高程序運行速度捞高。這類操作一次會處理一個partition中的所有數(shù)據(jù)氯材,而不是一條數(shù)據(jù)渣锦。
mapPartition - 因為每次操作是針對partition的,那么操作中的很多對象和變量都將可以復(fù)用氢哮,比如說在方法中使用廣播變量等袋毙。
foreachPartition - 在和外部數(shù)據(jù)庫交互操作時使用,比如 redis , mysql 等冗尤。通過該方法可以避免頻繁的創(chuàng)建和銷毀鏈接听盖,每個partition使用一個數(shù)據(jù)庫鏈接,對效率的提升還是非常明顯的生闲。
note: 此類方法也存在缺陷媳溺,因為一次處理一個partition中的所有數(shù)據(jù)月幌,在內(nèi)存不足的時候碍讯,將會遇到OOM的問題。
3.reduceByKey和aggregateByKey
使用reduceByKey
/aggregateByKey
代替groupByKey
扯躺。
reduceByKey
/aggregateByKey
會先在map端對本地數(shù)據(jù)按照用戶定義的規(guī)則進行一次聚合捉兴,之后再將計算的結(jié)果進行shuffle,而groupByKey
則會將所以的計算放在reduce階段進行(全量數(shù)據(jù)在各個節(jié)點中進行了分發(fā)和傳輸)录语。所以前者的操作大量的減少shuffle的數(shù)據(jù)倍啥,減少了網(wǎng)絡(luò)IO,提高運行效率澎埠。
4. mapValues
針對k,v結(jié)構(gòu)的rdd虽缕,mapValues
直接對value進行操作,不對Key造成影響蒲稳,可以減少不必要的分區(qū)操作氮趋。
5. broadcast
Spark中廣播變量有幾個常見的用法。
-
實現(xiàn)map-side join
在需要join操作時江耀,將較小的那份數(shù)據(jù)轉(zhuǎn)化為普通的集合(數(shù)組)進行廣播剩胁,然后在大數(shù)據(jù)集中使用小數(shù)據(jù)進行相應(yīng)的查詢操作,就可以實現(xiàn)map-side join的功能祥国,避免了join操作的shuffle過程昵观。在我之前的文章中對此用法有詳細說明和過程圖解。
-
使用較大的外部變量
如果存在較大的外部變量(外部變量可以理解為在driver中定義的變量)舌稀,比如說字典數(shù)據(jù)等啊犬。在運算過程中,會將這個變量復(fù)制出多個副本壁查,傳輸?shù)矫總€task之中進行執(zhí)行椒惨。如果這個變量的大小有100M或者更大,將會浪費大量的網(wǎng)絡(luò)IO潮罪,同時康谆,executor也會因此被占用大量的內(nèi)存领斥,造成頻繁GC,甚至引發(fā)OOM沃暗。
因此在這種情況下月洛,我最好提前對該變量進行廣播,它會被事先將副本分發(fā)到每個executor中孽锥,同一executor中的task則在執(zhí)行時共享該變量嚼黔。很大程度的減少了網(wǎng)絡(luò)IO開銷以及executor的內(nèi)存使用。
6. 復(fù)用RDD
避免創(chuàng)建一些用處不大的中間RDD(比如從父RDD抽取了某幾個字段形成新的RDD)惜辑,這樣可以減少一些算子操作唬涧。
對多次使用的RDD進行緩存操作,減少重復(fù)計算盛撑,在下文有說明碎节。
7. cache和persist
cache
方法等價于persist(StorageLevel.MEMORY_ONLY)
不要濫用緩存操作。緩存操作非常消耗內(nèi)存抵卫,緩存前考慮好是否還可以對一些無關(guān)數(shù)據(jù)進行過濾狮荔。如果你的數(shù)據(jù)在接下來的操作中只使用一次,則不要進行緩存介粘。
如果需要復(fù)用RDD殖氏,則可以考慮使用緩存操作,將大幅度提高運行效率姻采。緩存也分幾個級別雅采。
-
MEMORY_ONLY
如果緩存的數(shù)據(jù)量不大或是內(nèi)存足夠,可以使用這種方式慨亲,該策略效率是最高的婚瓜。但是如果內(nèi)存不夠,之前緩存的數(shù)據(jù)則會被清出內(nèi)存巡雨。在spark1.6中闰渔,則會直接提示OOM。
-
MEMORY_AND_DISK
優(yōu)先將數(shù)據(jù)寫入內(nèi)存铐望,如果內(nèi)存不夠則寫入硬盤冈涧。較為穩(wěn)妥的策略,但是如果不是很復(fù)雜的計算正蛙,可能重算的速度比從磁盤中讀取還要快督弓。
-
MEMORY_ONLY_SER
會將RDD中的數(shù)據(jù)序列化后存入內(nèi)存,占用更小的內(nèi)存空間乒验,減少GC頻率愚隧,當(dāng)然,取出數(shù)據(jù)時需要反序列化锻全,同樣會消耗資源狂塘。
-
MEMORY_AND_DISK_SER
不再贅述录煤。
-
DISK_ONLY
該策略類似于checkPoint方法,把所有的數(shù)據(jù)存入了硬盤荞胡,再使用的時候從中讀出妈踊。適用于數(shù)據(jù)量很大,重算代價也非常高的操作泪漂。
-
各種
_2
結(jié)尾的存儲策略實際上是對緩存的數(shù)據(jù)做了一個備份廊营,代價非常高,一般不建議使用萝勤。
結(jié)語
spark的優(yōu)化方法還有很多露筒,這篇文章主要從使用的角度講解了常用的優(yōu)化方法,具體的使用方法可以參考博主的其他優(yōu)化文章敌卓。