廣播變量
在Spark Application中,經(jīng)常會(huì)使用到一個(gè)共享變量,眾所周知的芯砸,Spark是一個(gè)并行計(jì)算框架,對(duì)于這個(gè)變量给梅,每一個(gè)executor的task在訪(fǎng)問(wèn)它的時(shí)候假丧,都會(huì)去拷貝一份副本去使用。如下圖所示:
對(duì)于這種默認(rèn)方式动羽,它會(huì)極大的系統(tǒng)的內(nèi)存包帚,我們可以假設(shè)一個(gè)集群中有1024個(gè)task,這個(gè)共享變量大小假設(shè)為1M运吓,那么就會(huì)去復(fù)制1024份到集群上去渴邦,這樣就會(huì)有1個(gè)G的數(shù)據(jù)在網(wǎng)絡(luò)中傳輸,并且系統(tǒng)需要耗費(fèi)1G內(nèi)存去為這些副本分配空間拘哨,這樣對(duì)于系統(tǒng)有什么影響呢谋梭?
如果系統(tǒng)內(nèi)存不足,RDD持久化的時(shí)候無(wú)法在內(nèi)存中持久化倦青,需要持久化到磁盤(pán)中瓮床,那么后續(xù)的操作會(huì)因?yàn)轭l繁的磁盤(pán)IO使得速度變慢,性能下降。
當(dāng)task中創(chuàng)建對(duì)象時(shí)隘庄,發(fā)現(xiàn)堆中內(nèi)存不足踢步,那么就需要進(jìn)行GC操作,進(jìn)行GC的時(shí)候峭沦,會(huì)導(dǎo)致工作線(xiàn)程暫停贾虽,如果內(nèi)存嚴(yán)重不足,頻繁的GC對(duì)于Spark作業(yè)的速度的影響是可想而知的吼鱼。
對(duì)于以上問(wèn)題蓬豁,我們?cè)撛趺催M(jìn)行性能優(yōu)化呢?
這時(shí)菇肃,我們可以使用Broadcast地粪,將這種每個(gè)task需要用到的共享變量廣播出去。
從上面的圖中可以看到琐谤,當(dāng)每一個(gè)task需要使用這個(gè)變量的時(shí)候都會(huì)拷貝一份蟆技。如果使用廣播變量,首先該廣播變量會(huì)拷貝一份副本到Driver中斗忌,當(dāng)每一個(gè)executor的task使用到該變量時(shí)质礼,首先會(huì)去每個(gè)executor的BlockManager中去檢查是否有該變量的副本,如果沒(méi)有织阳,接著會(huì)去Driver中去拷貝一份副本到BlockManager中眶蕉,然后供該executor中的每一個(gè)task使用,到下一個(gè)executor的task需要使用這個(gè)變量時(shí)唧躲,它的BlockManager可以去Driver中拷貝副本造挽,也可以去距離比較近的executor的BlockManager中去拷貝。(每一個(gè)executor中的BlockManager的作用是負(fù)責(zé)管理每一個(gè)executor對(duì)應(yīng)的內(nèi)存和磁盤(pán)的數(shù)據(jù)弄痹。)其原理圖如上所示:
在默認(rèn)情況下饭入,如果是1024個(gè)task需要消耗1G內(nèi)存,但是如果我們有50個(gè)executor來(lái)平分這些task肛真,那么只需要50個(gè)副本即可谐丢,總共消耗了50M內(nèi)存,那么在內(nèi)存的消耗了節(jié)省了大約20倍蚓让。而且副本的復(fù)制有時(shí)不需要從Driver拷貝乾忱,而是從其他executor中拷貝,那么凭疮,網(wǎng)絡(luò) 傳輸帶來(lái)的性能消耗也會(huì)小很多,可想而知串述,使用廣播變量可以節(jié)省很多內(nèi)存执解,從而使得性能顯著提升。
如何使用廣播變量呢?
比如我們的共享變量是一個(gè)map類(lèi)型的變量衰腌,我們可以使用Spark上下文來(lái)創(chuàng)建廣播變量:
Broadcast<Map<String>> broadcast=sc.broadcast(map);
在task中使用的時(shí)候可以使用value方法或者getValue方法來(lái)獲取它的值:
Map<String> map=broadcast.value;
Kryo序列化
在上面通過(guò)廣播變量降低網(wǎng)絡(luò)傳輸壓力以及節(jié)省了不少內(nèi)存之后新蟆,我們可以再進(jìn)一步的優(yōu)化,序列化是一種不錯(cuò)的選擇右蕊,可以減少數(shù)據(jù)占用的內(nèi)存大小琼稻。
默認(rèn)的Spark使用的是java序列化機(jī)制,即通過(guò)ObjectOutputStream / ObjectInputStream饶囚,對(duì)象輸入輸出流機(jī)制帕翻,來(lái)進(jìn)行序列化。
使用默認(rèn)的這種序列化機(jī)制萝风,好處在于簡(jiǎn)單方便嘀掸,不需要你自己進(jìn)行任何配置,只需要在需要序列化的類(lèi)上實(shí)現(xiàn)Serializable接口规惰;缺點(diǎn)在于序列化的效率不高睬塌,序列化的速度比較低,序列化之后的數(shù)據(jù)占用空間依舊很大歇万。因此揩晴,我們?cè)诒匾獣r(shí)可以手動(dòng)指定序列化方式進(jìn)行優(yōu)化。
Spark支持Kryo序列化機(jī)制贪磺,Kryo序列化機(jī)制硫兰,比默認(rèn)的Java序列化機(jī)制,速度要快缘挽,序列化后的數(shù)據(jù)要更小瞄崇,大概是Java序列化機(jī)制的1/10。
因此壕曼,使用Kryo序列化機(jī)制可以讓 網(wǎng)絡(luò)中傳輸?shù)臄?shù)據(jù)更小苏研,而且在集群中耗費(fèi)的內(nèi)存也大大減少。
Kryo序列化作用的地方
Kryo序列化一旦啟用腮郊,在以下幾個(gè)地方將會(huì)生效:
算子函數(shù)中使用的外部變量摹蘑。算子函數(shù)中使用到的外部變量,使用Kryo以后轧飞,優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅苄坡梗梢詢(xún)?yōu)化集群中內(nèi)存的占用和消耗
持久化RDD時(shí)進(jìn)行序列化,比如StorageLevel.MEMORY_ONLY_SER过咬。持久化RDD大渤,優(yōu)化內(nèi)存的占用和消耗;持久化RDD占用的內(nèi)存越少掸绞,task執(zhí)行的時(shí)候泵三,創(chuàng)建的對(duì)象,就不至于頻繁的占滿(mǎn)內(nèi)存,頻繁發(fā)生GC烫幕。
shuffle時(shí)進(jìn)行序列化俺抽,可以?xún)?yōu)化網(wǎng)絡(luò)傳輸?shù)男阅堋?br>
如何使用Kryo序列化
第一步:在SparkConf中設(shè)置序列化屬性spark.serializer,值為org.apache.spark.serializer.KryoSerializer较曼。
第二步:注冊(cè)你需要使用Kryo序列化的一些自定義類(lèi)磷斧,使用SparkConf.registerKryoClasses()
方法進(jìn)行注冊(cè)。
例如:
new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
使用廣播變量和序列化很簡(jiǎn)單捷犹,但是我們不能忽視它對(duì)于系統(tǒng)性能的影響弛饭,也許一個(gè)小小的修改就能對(duì)系統(tǒng)的性能提升很多倍,這也是我們性能優(yōu)化的重點(diǎn)伏恐,不能忽略每一個(gè)細(xì)節(jié)孩哑,一昧的去追去高大上的優(yōu)化技巧。