為什么需要調(diào)優(yōu)
在大數(shù)據(jù)計(jì)算領(lǐng)域,Spark已經(jīng)成為了越來(lái)越流行剃幌、越來(lái)越受歡迎的計(jì)算平臺(tái)之一聋涨。然而,通過(guò)Spark開(kāi)發(fā)出高性能的大數(shù)據(jù)計(jì)算作業(yè)负乡,并不是那么簡(jiǎn)單的牍白。如果沒(méi)有對(duì)Spark作業(yè)進(jìn)行合理的調(diào)優(yōu),Spark作業(yè)的執(zhí)行速度可能會(huì)很慢抖棘,這樣就完全體現(xiàn)不出Spark作為一種快速大數(shù)據(jù)計(jì)算引擎的優(yōu)勢(shì)來(lái)茂腥。因此,想要用好Spark切省,就必須對(duì)其進(jìn)行合理的性能優(yōu)化最岗。
Spark的性能調(diào)優(yōu)由開(kāi)發(fā)調(diào)優(yōu)、資源調(diào)優(yōu)朝捆、數(shù)據(jù)傾斜調(diào)優(yōu)般渡、shuffle調(diào)優(yōu)幾個(gè)部分組成。開(kāi)發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)是所有Spark作業(yè)都需要注意和遵循的一些基本原則芙盘,是高性能Spark作業(yè)的基礎(chǔ)驯用;數(shù)據(jù)傾斜調(diào)優(yōu),主要用一套完整的用來(lái)解決Spark作業(yè)數(shù)據(jù)傾斜的解決方案儒老;shuffle調(diào)優(yōu)蝴乔,面向的是對(duì)Spark的原理有較深層次掌握的開(kāi)發(fā)者。
性能優(yōu)化學(xué)習(xí)
學(xué)習(xí)Spark開(kāi)發(fā)調(diào)優(yōu)和資源調(diào)優(yōu)比較好的方式是參考美團(tuán)點(diǎn)評(píng)技術(shù)團(tuán)隊(duì)的技術(shù)博客Spark性能調(diào)優(yōu)-基礎(chǔ)篇驮樊,這里已經(jīng)寫得非常全面了薇正,學(xué)習(xí)完就可以掌握Spark性能調(diào)優(yōu)的基礎(chǔ)部分了片酝。總體可以分為兩個(gè)方面:
- 開(kāi)發(fā)調(diào)優(yōu)
Spark性能優(yōu)化的第一步挖腰,就是要在開(kāi)發(fā)Spark作業(yè)的過(guò)程中注意和應(yīng)用一些性能優(yōu)化的基本原則雕沿。開(kāi)發(fā)調(diào)優(yōu),包括:RDD lineage設(shè)計(jì)曙聂、算子的合理使用晦炊、特殊操作的優(yōu)化等。在開(kāi)發(fā)過(guò)程中宁脊,時(shí)時(shí)刻刻都應(yīng)該注意以上原則断国,并將這些原則根據(jù)具體的業(yè)務(wù)以及實(shí)際的應(yīng)用場(chǎng)景,靈活地運(yùn)用到自己的Spark作業(yè)中榆苞。- 避免創(chuàng)建重復(fù)的RDD
- 盡可能復(fù)用同一個(gè)RDD
- 對(duì)多次使用的RDD進(jìn)行持久化
- 盡量避免使用shuffle類算子
- 使用map-side預(yù)聚合的shuffle操作
- 使用高性能的算子
- 廣播大變量
- 使用Kryo優(yōu)化序列化性能
- 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
- 資源調(diào)優(yōu)
在開(kāi)發(fā)完Spark作業(yè)之后稳衬,就該為作業(yè)配置合適的資源了。Spark的資源參數(shù)坐漏,基本都可以在spark-submit命令中作為參數(shù)設(shè)置薄疚。很多Spark初學(xué)者,通常不知道該設(shè)置哪些必要的參數(shù)赊琳,以及如何設(shè)置這些參數(shù)街夭,最后就只能胡亂設(shè)置,甚至壓根兒不設(shè)置躏筏。資源參數(shù)設(shè)置的不合理板丽,可能會(huì)導(dǎo)致沒(méi)有充分利用集群資源,作業(yè)運(yùn)行會(huì)極其緩慢趁尼;或者設(shè)置的資源過(guò)大埃碱,隊(duì)列沒(méi)有足夠的資源來(lái)提供,進(jìn)而導(dǎo)致各種異常酥泞。調(diào)整的主要是一系列的資源相關(guān)參數(shù)砚殿。
所謂的Spark資源參數(shù)調(diào)優(yōu),其實(shí)主要就是對(duì)Spark運(yùn)行過(guò)程中各個(gè)使用資源的地方芝囤,通過(guò)調(diào)節(jié)各種參數(shù)似炎,來(lái)優(yōu)化資源使用的效率,從而提升Spark作業(yè)的執(zhí)行性能凡人。- num-executors
- executor-memory
- executor-cores
- driver-memory
- spark.default.parallelism
- spark.storage.memoryFraction
- spark.shuffle.memoryFraction
性能優(yōu)化實(shí)踐
以MovieLens數(shù)據(jù)集為基礎(chǔ)名党,完成Spark的Map-Side Join和Reduce Side Join例子(過(guò)濾出評(píng)分高于4.0分的電影,要求顯示電影ID 電影名稱 電影分?jǐn)?shù)),并比較性能優(yōu)劣挠轴。應(yīng)該如何調(diào)整不同的spark-submit參數(shù)獲得最佳效果(運(yùn)行時(shí)間),并給出基于目前的運(yùn)行環(huán)境最優(yōu)參數(shù)設(shè)置方案耳幢。
查看數(shù)據(jù)
簡(jiǎn)單查看一下所有表的結(jié)構(gòu)才能完成目標(biāo)任務(wù)岸晦。
所有評(píng)級(jí)都包含在“ratings.dat”文件中欧啤,并且位于格式如下:
用戶名 | MovieID | 評(píng)級(jí) | 時(shí)間戳 |
---|
- UserID的范圍在1到6040之間
- MovieID的范圍在1到3952之間
- 評(píng)級(jí)為5星級(jí)(僅限全星評(píng)級(jí))
- 時(shí)間戳以秒為單位表示
- 每個(gè)用戶至少有20個(gè)評(píng)級(jí)
用戶信息位于“users.dat”文件中,如下所示
用戶名 | 性別 | 年齡 | 職業(yè) | 郵政編碼 |
---|
性別用男性表示“M”启上,女性表示“F”表示
-
年齡選自以下范圍:
- 1:“18歲以下”
- 18:“18-24”
- 25:“25-34”
- 35:“35-44”
- 45:“45-49”
- 50:“50-55”
- 56:“56+”
職業(yè)選自0-20的數(shù)字邢隧,分別代表不同意義(具體意義可查看官網(wǎng))
電影信息位于文件“movies.dat”中,如下所示
MovieID | 標(biāo)題 | 流派 |
---|
- 流派是安裝分隔符(|)分開(kāi)的冈在,關(guān)鍵字符的拼接倒慧,如(Animation|Children's|Comedy)
- 由于意外重復(fù),某些MovieID與電影不對(duì)應(yīng)
條目和/或測(cè)試條目 - 電影大多是手動(dòng)輸入的包券,因此可能存在錯(cuò)誤和不一致
所以任務(wù)就是把3個(gè)表連接起來(lái)并按條件過(guò)濾纫谅,但是不同的連接方式在性能上會(huì)出現(xiàn)極大的差距。
Reduce Side Join
- 當(dāng)兩個(gè)文件/目錄中的數(shù)據(jù)非常大溅固,難以將某一個(gè)存放到內(nèi)存中時(shí)付秕,Reduce-side Join是一種解決思路。該算法需要通過(guò)Map和Reduce兩個(gè)階段完成侍郭,在Map階段询吴,將key相同的記錄劃分給同一個(gè)Reduce Task(需標(biāo)記每條記錄的來(lái)源,便于在Reduce階段合并)亮元,在Reduce階段猛计,對(duì)key相同的進(jìn)行合并。
- reduce-side-join 的缺陷在于會(huì)將key相同的數(shù)據(jù)發(fā)送到同一個(gè)partition中進(jìn)行運(yùn)算爆捞,大數(shù)據(jù)集的傳輸需要長(zhǎng)時(shí)間的IO奉瘤,同時(shí)任務(wù)并發(fā)度收到限制,還可能造成數(shù)據(jù)傾斜嵌削。
- Spark提供了Join算子毛好,可以直接通過(guò)該算子實(shí)現(xiàn)reduce-side join,但要求RDD中的記錄必須是pair苛秕,即RDD[KEY, VALUE]
簡(jiǎn)單的講就是先把集群上key相同的數(shù)據(jù)拉取到一個(gè)節(jié)點(diǎn)(shuffle操作)肌访,為每個(gè)key的數(shù)據(jù)創(chuàng)建一個(gè)task進(jìn)行join連接操作,然后再把每個(gè)key連接的結(jié)果進(jìn)行匯總艇劫。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object ReduceJoin {
def main(args: Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ratingRDD = sc.textFile(args(0)) //rating.dat表
val moviesRDD = sc.textFile(args(1)) //movies.dat表
val startTime = System.currentTimeMillis()//開(kāi)始時(shí)間
val ratingPair = ratingRDD.map { x => //將數(shù)據(jù)轉(zhuǎn)化為(K,V)吼驶,K為movieID,V為平均分店煞,直接join
val temp = x.split("::") //按照原始數(shù)據(jù)格式拆分為RDD格式
(temp(1),(temp(2).toFloat,1))
}.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
x._1/x._2 //通過(guò)每部電影總分和評(píng)論的人數(shù)計(jì)算出平均分
).filter(x => x._2.toFloat >= 4.0)//過(guò)濾出分?jǐn)?shù)高于4分的電影
val moviePair = moviesRDD.map { x => //將它的類型轉(zhuǎn)化為(K,V)蟹演,K為movieID,方便join操作
val temp = x.split("::")
(temp(0),temp(1))
}
//println(ratingPair.count()) //查看rating表是否成功過(guò)濾掉4.0以下的電影
//根據(jù)key(movieID)進(jìn)行連接,并將數(shù)據(jù)從KV形式格式化為原始格式
val result = moviePair.join(ratingPair).map(x => (x._1,x._2._1,x._2._2))
result.saveAsTextFile(args(2))
val endTime = System.currentTimeMillis()//結(jié)束時(shí)間
println("運(yùn)行時(shí)間(秒)"+(endTime-startTime)*0.001)
}
}
然后編寫相應(yīng)的運(yùn)行腳本顷蟀,這里submit的時(shí)間隨便使用最簡(jiǎn)單的幾個(gè)參數(shù)酒请,因?yàn)槟康氖菍?duì)比map-side join和reduce-side join性能上的差異。
#!/bin/bash
hdfs dfs -rm -r /tmp/result
spark-submit --class ReduceJoin --master yarn-cluster /usr/tmp/untitled.jar /tmp/input/ratings.dat /tmp/input/movies.dat /tmp/result
提交任務(wù)后就可以去master的8088端口查看spark任務(wù)的執(zhí)行情況了鸣个,18088端口查看執(zhí)行記錄和詳細(xì)過(guò)程羞反,看到Reduce-side join任務(wù)執(zhí)行情況如下:
[圖片上傳中...(image.png-94aeb0-1535712775641-0)]
這里首先需要明白job布朦,stage,task的概念昼窗。簡(jiǎn)單的講是趴,我們提交一個(gè)作業(yè)到spark,spark首先根據(jù)提交作業(yè)中的action算子將作業(yè)分為若干個(gè)job澄惊。
之后對(duì)于每個(gè)job而言唆途,Spark是根據(jù)shuffle類算子來(lái)進(jìn)行stage的劃分。如果我們的代碼中執(zhí)行了某個(gè)shuffle類算子(比如reduceByKey掸驱、join等)肛搬,那么就會(huì)在該算子處,劃分出一個(gè)stage界限來(lái)亭敢」鐾瘢可以大致理解為,shuffle算子執(zhí)行之前的代碼會(huì)被劃分為一個(gè)stage帅刀,shuffle算子執(zhí)行以及之后的代碼會(huì)被劃分為下一個(gè)stage让腹。
每個(gè)stage執(zhí)行一部分代碼片段,并為每個(gè)stage創(chuàng)建一批task扣溺,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行骇窍。task是最小的計(jì)算單元,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段)锥余,只是每個(gè)task處理的數(shù)據(jù)不同而已腹纳。
這里只有一個(gè)job是因?yàn)橹挥幸粋€(gè)action算子(savaAsTextFile),3個(gè)stage是因?yàn)閞educeByKey屬于shuffle算子驱犹,還有未經(jīng)協(xié)同劃分的join也屬于shuffle算子嘲恍,一起將job分成了3個(gè)stage,每個(gè)stage的2個(gè)task是因?yàn)镽DD數(shù)據(jù)被存在了兩臺(tái)機(jī)器上雄驹。通過(guò)時(shí)間統(tǒng)計(jì)可以看到stage1是最消耗時(shí)間的佃牛,因?yàn)樗獔?zhí)行reduceByKey的shuffle操作,會(huì)把key相同的數(shù)據(jù)集中到一個(gè)節(jié)點(diǎn)医舆,在這個(gè)時(shí)候數(shù)據(jù)是整個(gè)評(píng)論數(shù)據(jù)集俘侠。而后面求平均后過(guò)濾再join的時(shí)候數(shù)據(jù)已經(jīng)變得不是那么多了,所以這里的shuffle相對(duì)消耗時(shí)間較少(網(wǎng)絡(luò)蔬将,IO少)爷速。
Map-Side Join
- Map-side Join使用場(chǎng)景是一個(gè)大表和一個(gè)小表的連接操作,其中霞怀,“小表”是指文件足夠小惫东,可以加載到內(nèi)存中。該算法可以將join算子執(zhí)行在Map端毙石,無(wú)需經(jīng)歷shuffle和reduce等階段凿蒜,因此效率非常高禁谦。
- 在Hadoop MapReduce中胁黑, map-side join是借助DistributedCache實(shí)現(xiàn)的废封。DistributedCache可以幫我們將小文件分發(fā)到各個(gè)節(jié)點(diǎn)的Task工作目錄下,這樣丧蘸,我們只需在程序中將文件加載到內(nèi)存中(比如保存到Map數(shù)據(jù)結(jié)構(gòu)中)漂洋,然后借助Mapper的迭代機(jī)制,遍歷另一個(gè)大表中的每一條記錄力喷,并查找是否在小表中刽漂,如果在則輸出,否則跳過(guò)弟孟。
- 在Apache Spark中贝咙,同樣存在類似于DistributedCache的功能,稱為“廣播變量”(Broadcast variable)拂募。其實(shí)現(xiàn)原理與DistributedCache非常類似庭猩,但提供了更多的數(shù)據(jù)/文件廣播算法,包括高效的P2P算法陈症,該算法在節(jié)點(diǎn)數(shù)目非常多的場(chǎng)景下蔼水,效率遠(yuǎn)遠(yuǎn)好于DistributedCache這種基于HDFS共享存儲(chǔ)的方式。使用MapReduce DistributedCache時(shí)录肯,用戶需要顯示地使用File API編寫程序從本地讀取小表數(shù)據(jù)趴腋,而Spark則不用,它借助Scala語(yǔ)言強(qiáng)大的函數(shù)閉包特性论咏,可以隱藏?cái)?shù)據(jù)/文件廣播過(guò)程优炬,讓用戶編寫程序更加簡(jiǎn)單。
簡(jiǎn)單的講就是把需要join的數(shù)據(jù)集中較小的那個(gè)數(shù)據(jù)集進(jìn)行廣播(因?yàn)樵诜植际较到y(tǒng)應(yīng)用中厅贪,存儲(chǔ)數(shù)據(jù)都是用RDD對(duì)象蠢护,每個(gè)RDD對(duì)象中的數(shù)據(jù)都被劃分為多個(gè)分區(qū),每個(gè)節(jié)點(diǎn)都只持有部分分區(qū)卦溢,也就是數(shù)據(jù)集的一部分糊余,而廣播就是讓每個(gè)節(jié)點(diǎn)都持有被廣播數(shù)據(jù)的完整信息),然后在每個(gè)節(jié)點(diǎn)上(map端操作)將自己節(jié)點(diǎn)上持有的部分?jǐn)?shù)據(jù)和被廣播的表進(jìn)行連接即可单寂。但是需要注意贬芥,因?yàn)槟莻€(gè)小的數(shù)據(jù)集要被廣播,所以要求每個(gè)節(jié)點(diǎn)的內(nèi)存必須足夠存儲(chǔ)被廣播的那個(gè)數(shù)據(jù)集宣决,不然就不能進(jìn)行map-side-join蘸劈。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object MapJoin {
def main(args: Array[String]){
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ratingRDD = sc.textFile(args(0)) //rating.dat表
val moviesRDD = sc.textFile(args(1)) //movies.dat表
val startTime = System.currentTimeMillis()//開(kāi)始時(shí)間
val ratingPair = ratingRDD.map { x => //將數(shù)據(jù)轉(zhuǎn)化為(K,V),K為movieID尊沸,V為平均分
val temp = x.split("::") //按照原始數(shù)據(jù)格式拆分為RDD格式
(temp(1),(temp(2).toFloat,1))
}.reduceByKey((x,y) => (x._1.toFloat+y._1.toFloat,x._2+y._2)).mapValues(x =>
x._1/x._2 //通過(guò)每部電影總分和評(píng)論的人數(shù)計(jì)算出平均分
).filter(x => x._2 >= 4.0)//過(guò)濾出分?jǐn)?shù)高于4分的電影
val moviePair = moviesRDD.map { x => //將數(shù)據(jù)轉(zhuǎn)化為(K,V)威沫,K為movieID贤惯,V為電影name
val temp = x.split("::")
(temp(0),temp(1))
}.collectAsMap//保存為map 進(jìn)行廣播
var moviesBroadcast = sc.broadcast(moviePair) //將電影數(shù)據(jù)集廣播,使每個(gè)節(jié)點(diǎn)都有一份完整的棒掠,就不需要shuffle
var result = ratingPair.map({ x =>
var movies = moviesBroadcast.value //取出廣播變量?jī)?nèi)容值
var name = movies.getOrElse(x._1,"No") //取出當(dāng)前movieID的電影名字
(x._1,(name,x._2)) //
})
result.map(x => (x._1,x._2._1,x._2._2)).saveAsTextFile(args(2))//重新定義輸出格式并輸出
val endTime = System.currentTimeMillis()//結(jié)束時(shí)間
println("運(yùn)行時(shí)間(秒)"+(endTime-startTime)*0.001)
}
}
這里出現(xiàn)了兩個(gè)Job的原因是有兩個(gè)action算子(saveAsTextFile孵构,collectAsMap)。在Job0中烟很,只進(jìn)行了一個(gè)工作collectAsMap颈墅,是為了后面廣播方便。在Job1中雾袱,因?yàn)槲覀儽苊饬撕臅r(shí)的join的Shuffle操作恤筛,自然就只有兩個(gè)stage了。
-
這里還有一個(gè)地方可以改進(jìn),在廣播的時(shí)候我有兩個(gè)選擇,廣播內(nèi)容為(電影ID修肠,電影名字)的RDD和(電影ID,電影平均分)煎殷,雖然兩個(gè)數(shù)據(jù)集的行數(shù)一樣,但是述么,電影名字的字節(jié)數(shù)遠(yuǎn)遠(yuǎn)大于平均分的字節(jié)數(shù)蝌数,所以廣播(電影ID,電影平均分)度秘,最后再把分?jǐn)?shù)低于4.0的過(guò)濾掉顶伞,可以傳輸更小的字節(jié)數(shù),節(jié)約IO和網(wǎng)絡(luò)傳輸時(shí)耗剑梳,時(shí)間縮短了3s唆貌。
運(yùn)行結(jié)果分析
- 從整體運(yùn)行時(shí)間來(lái)看,Reduce-side Join和Map-side Join分別為27s和24s垢乙,其最大的原因就是Reduce-side Join有更多的Shuffle操作锨咙,增加網(wǎng)絡(luò)和IO時(shí)耗。
- Map-side Join的時(shí)耗主要是collectAsMap操作耗時(shí)追逮,而Reduce-side Join的時(shí)耗主要是兩個(gè)shuffle操作酪刀。在這里主要是為了計(jì)算電影評(píng)分的平均分,使得Map-side Join不得不用了一次shuffle操作钮孵,如果只是單純的連接表骂倘,不需要求平均值的話,那么Map-side Join就不需要shuffle操作巴席,會(huì)變得非忱裕快了,這樣Map-side Join和Reduce-side Join的差異會(huì)更明顯了。
- 所以在開(kāi)發(fā)過(guò)程中盡肯能的要去避免shuffle操作荧库,用高性能的算子去代替堰塌。
Spark參數(shù)優(yōu)化
前面說(shuō)了,除了可以在開(kāi)發(fā)過(guò)程中進(jìn)行開(kāi)發(fā)調(diào)優(yōu)分衫,還可以靈活的分配資源场刑,使在現(xiàn)有資源上運(yùn)行達(dá)到最優(yōu)。以Map-side Join為例丐箩,使用3臺(tái)centos6.5虛擬機(jī)摇邦,內(nèi)存分別為6 2 2,cup為4核心i5屎勘。
詳細(xì)原理見(jiàn)上圖瓤摧。我們使用spark-submit提交一個(gè)Spark作業(yè)之后,這個(gè)作業(yè)就會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Driver進(jìn)程玉吁。根據(jù)你使用的部署模式(deploy-mode)不同照弥,Driver進(jìn)程可能在本地啟動(dòng),也可能在集群中某個(gè)工作節(jié)點(diǎn)上啟動(dòng)进副。Driver進(jìn)程本身會(huì)根據(jù)我們?cè)O(shè)置的參數(shù)这揣,占有一定數(shù)量的內(nèi)存和CPU core。而Driver進(jìn)程要做的第一件事情影斑,就是向集群管理器(可以是Spark Standalone集群给赞,也可以是其他的資源管理集群,美團(tuán)?大眾點(diǎn)評(píng)使用的是YARN作為資源管理集群)申請(qǐng)運(yùn)行Spark作業(yè)需要使用的資源矫户,這里的資源指的就是Executor進(jìn)程片迅。YARN集群管理器會(huì)根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù),在各個(gè)工作節(jié)點(diǎn)上皆辽,啟動(dòng)一定數(shù)量的Executor進(jìn)程柑蛇,每個(gè)Executor進(jìn)程都占有一定數(shù)量的內(nèi)存和CPU core。
在申請(qǐng)到了作業(yè)執(zhí)行所需的資源之后驱闷,Driver進(jìn)程就會(huì)開(kāi)始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了耻台。Driver進(jìn)程會(huì)將我們編寫的Spark作業(yè)代碼分拆為多個(gè)stage,每個(gè)stage執(zhí)行一部分代碼片段遗嗽,并為每個(gè)stage創(chuàng)建一批task粘我,然后將這些task分配到各個(gè)Executor進(jìn)程中執(zhí)行。task是最小的計(jì)算單元,負(fù)責(zé)執(zhí)行一模一樣的計(jì)算邏輯(也就是我們自己編寫的某個(gè)代碼片段)征字,只是每個(gè)task處理的數(shù)據(jù)不同而已都弹。一個(gè)stage的所有task都執(zhí)行完畢之后,會(huì)在各個(gè)節(jié)點(diǎn)本地的磁盤文件中寫入計(jì)算中間結(jié)果匙姜,然后Driver就會(huì)調(diào)度運(yùn)行下一個(gè)stage畅厢。下一個(gè)stage的task的輸入數(shù)據(jù)就是上一個(gè)stage輸出的中間結(jié)果。如此循環(huán)往復(fù)氮昧,直到將我們自己編寫的代碼邏輯全部執(zhí)行完框杜,并且計(jì)算完所有的數(shù)據(jù),得到我們想要的結(jié)果為止袖肥。
Spark是根據(jù)shuffle類算子來(lái)進(jìn)行stage的劃分咪辱。如果我們的代碼中執(zhí)行了某個(gè)shuffle類算子(比如reduceByKey、join等)椎组,那么就會(huì)在該算子處油狂,劃分出一個(gè)stage界限來(lái)〈绨可以大致理解為专筷,shuffle算子執(zhí)行之前的代碼會(huì)被劃分為一個(gè)stage,shuffle算子執(zhí)行以及之后的代碼會(huì)被劃分為下一個(gè)stage蒸苇。因此一個(gè)stage剛開(kāi)始執(zhí)行的時(shí)候磷蛹,它的每個(gè)task可能都會(huì)從上一個(gè)stage的task所在的節(jié)點(diǎn),去通過(guò)網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key溪烤,然后對(duì)拉取到的所有相同的key使用我們自己編寫的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))味咳。這個(gè)過(guò)程就是shuffle。
當(dāng)我們?cè)诖a中執(zhí)行了cache/persist等持久化操作時(shí)氛什,根據(jù)我們選擇的持久化級(jí)別的不同莺葫,每個(gè)task計(jì)算出來(lái)的數(shù)據(jù)也會(huì)保存到Executor進(jìn)程的內(nèi)存或者所在節(jié)點(diǎn)的磁盤文件中。
因此Executor的內(nèi)存主要分為三塊:第一塊是讓task執(zhí)行我們自己編寫的代碼時(shí)使用枪眉,默認(rèn)是占Executor總內(nèi)存的20%捺檬;第二塊是讓task通過(guò)shuffle過(guò)程拉取了上一個(gè)stage的task的輸出后,進(jìn)行聚合等操作時(shí)使用贸铜,默認(rèn)也是占Executor總內(nèi)存的20%堡纬;第三塊是讓RDD持久化時(shí)使用,默認(rèn)占Executor總內(nèi)存的60%蒿秦。
task的執(zhí)行速度是跟每個(gè)Executor進(jìn)程的CPU core數(shù)量有直接關(guān)系的烤镐。一個(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)線程。而每個(gè)Executor進(jìn)程上分配到的多個(gè)task棍鳖,都是以每個(gè)task一條線程的方式炮叶,多線程并發(fā)運(yùn)行的碗旅。如果CPU core數(shù)量比較充足,而且分配到的task數(shù)量比較合理镜悉,那么通常來(lái)說(shuō)祟辟,可以比較快速和高效地執(zhí)行完這些task線程。
以上就是Spark作業(yè)的基本運(yùn)行原理的說(shuō)明侣肄,大家可以結(jié)合上圖來(lái)理解旧困。理解作業(yè)基本原理,是我們進(jìn)行資源參數(shù)調(diào)優(yōu)的基本前提稼锅。
num-executors
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置Spark作業(yè)總共要用多少個(gè)Executor進(jìn)程來(lái)執(zhí)行吼具。Driver在向YARN集群管理器申請(qǐng)資源時(shí),YARN集群管理器會(huì)盡可能按照你的設(shè)置來(lái)在集群的各個(gè)工作節(jié)點(diǎn)上矩距,啟動(dòng)相應(yīng)數(shù)量的Executor進(jìn)程拗盒。這個(gè)參數(shù)非常之重要,如果不設(shè)置的話剩晴,默認(rèn)只會(huì)給你啟動(dòng)少量的Executor進(jìn)程锣咒,此時(shí)你的Spark作業(yè)的運(yùn)行速度是非常慢的。
- 參數(shù)調(diào)優(yōu)建議:每個(gè)Spark作業(yè)的運(yùn)行一般設(shè)置50~100個(gè)左右的Executor進(jìn)程比較合適赞弥,設(shè)置太少或太多的Executor進(jìn)程都不好。設(shè)置的太少趣兄,無(wú)法充分利用集群資源绽左;設(shè)置的太多的話,大部分隊(duì)列可能無(wú)法給予充分的資源艇潭。
之前沒(méi)有手動(dòng)設(shè)置這個(gè)參數(shù)拼窥,可以看到spark啟動(dòng)了2個(gè)executor進(jìn)程。我這里只有3臺(tái)虛擬機(jī)蹋凝,嘗試就設(shè)置為3了鲁纠。發(fā)現(xiàn)性能降低了很多,估計(jì)是因?yàn)槲业臄?shù)據(jù)太小了鳍寂,節(jié)點(diǎn)多了就加大了shuffle的消耗改含,所以設(shè)置為1,發(fā)現(xiàn)更快了迄汛,所以這真的是因?yàn)閿?shù)據(jù)太小捍壤,數(shù)據(jù)傳輸?shù)臅r(shí)間代價(jià)大于數(shù)據(jù)處理的時(shí)間代價(jià)。平常大數(shù)據(jù)情況下這個(gè)參數(shù)應(yīng)該根據(jù)經(jīng)驗(yàn)設(shè)置為50-100鞍爱。
executor-memory
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的內(nèi)存鹃觉。Executor內(nèi)存的大小,很多時(shí)候直接決定了Spark作業(yè)的性能睹逃,而且跟常見(jiàn)的JVM OOM異常盗扇,也有直接的關(guān)聯(lián)。
- 參數(shù)調(diào)優(yōu)建議:每個(gè)Executor進(jìn)程的內(nèi)存設(shè)置4G~8G較為合適。但是這只是一個(gè)參考值疗隶,具體的設(shè)置還是得根據(jù)不同部門的資源隊(duì)列來(lái)定佑笋。可以看看自己團(tuán)隊(duì)的資源隊(duì)列的最大內(nèi)存限制是多少抽减,num-executors乘以executor-memory允青,是不能超過(guò)隊(duì)列的最大內(nèi)存量的。此外卵沉,如果你是跟團(tuán)隊(duì)里其他人共享這個(gè)資源隊(duì)列颠锉,那么申請(qǐng)的內(nèi)存量最好不要超過(guò)資源隊(duì)列最大總內(nèi)存的1/3 ~1/2,避免你自己的Spark作業(yè)占用了隊(duì)列所有的資源史汗,導(dǎo)致別的同學(xué)的作業(yè)無(wú)法運(yùn)行琼掠。
executor-cores
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)Executor進(jìn)程的CPU core數(shù)量冠桃。這個(gè)參數(shù)決定了每個(gè)Executor進(jìn)程并行執(zhí)行task線程的能力。因?yàn)槊總€(gè)CPU core同一時(shí)間只能執(zhí)行一個(gè)task線程道宅,因此每個(gè)Executor進(jìn)程的CPU core數(shù)量越多食听,越能夠快速地執(zhí)行完分配給自己的所有task線程。
- 參數(shù)調(diào)優(yōu)建議:Executor的CPU core數(shù)量設(shè)置為2~4個(gè)較為合適污茵。同樣得根據(jù)不同部門的資源隊(duì)列來(lái)定樱报,可以看看自己的資源隊(duì)列的最大CPU core限制是多少,再依據(jù)設(shè)置的Executor數(shù)量泞当,來(lái)決定每個(gè)Executor進(jìn)程可以分配到幾個(gè)CPU core迹蛤。同樣建議,如果是跟他人共享這個(gè)隊(duì)列零蓉,那么num-executors * executor-cores不要超過(guò)隊(duì)列總CPU core的1/3~1/2左右比較合適笤受,也是避免影響其他同學(xué)的作業(yè)運(yùn)行。
由于我的集群是運(yùn)行在虛擬機(jī)上的敌蜂,所以所有節(jié)點(diǎn)共享windows的cpu箩兽,即每個(gè)節(jié)點(diǎn)相當(dāng)于有4個(gè)cpu,所以設(shè)置為4章喉,發(fā)現(xiàn)報(bào)錯(cuò)了汗贫,應(yīng)該是資源不足身坐,設(shè)置為2也報(bào)錯(cuò),應(yīng)該是虛擬機(jī)的cpu限制機(jī)制吧落包,所以只能設(shè)置為1或者默認(rèn)了部蛇。
driver-memory
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置Driver進(jìn)程的內(nèi)存。
- 參數(shù)調(diào)優(yōu)建議:Driver的內(nèi)存通常來(lái)說(shuō)不設(shè)置咐蝇,或者設(shè)置1G左右應(yīng)該就夠了涯鲁。唯一需要注意的一點(diǎn)是,如果需要使用collect算子將RDD的數(shù)據(jù)全部拉取到Driver上進(jìn)行處理有序,那么必須確保Driver的內(nèi)存足夠大抹腿,否則會(huì)出現(xiàn)OOM內(nèi)存溢出的問(wèn)題。
這個(gè)參數(shù)只要保證進(jìn)行collect算子的時(shí)候旭寿,所有數(shù)據(jù)全部集中到Driver進(jìn)程不會(huì)oom就行了警绩,我這里數(shù)據(jù)相當(dāng)小就不用設(shè)置了。
spark.default.parallelism
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置每個(gè)stage的默認(rèn)task數(shù)量盅称。這個(gè)參數(shù)極為重要肩祥,如果不設(shè)置可能會(huì)直接影響你的Spark作業(yè)性能。
- 參數(shù)調(diào)優(yōu)建議:Spark作業(yè)的默認(rèn)task數(shù)量為500~1000個(gè)較為合適缩膝。很多同學(xué)常犯的一個(gè)錯(cuò)誤就是不去設(shè)置這個(gè)參數(shù)混狠,那么此時(shí)就會(huì)導(dǎo)致Spark自己根據(jù)底層HDFS的block數(shù)量來(lái)設(shè)置task的數(shù)量,默認(rèn)是一個(gè)HDFS block對(duì)應(yīng)一個(gè)task疾层。通常來(lái)說(shuō)檀蹋,Spark默認(rèn)設(shè)置的數(shù)量是偏少的(比如就幾十個(gè)task),如果task數(shù)量偏少的話云芦,就會(huì)導(dǎo)致你前面設(shè)置好的Executor的參數(shù)都前功盡棄。試想一下贸桶,無(wú)論你的Executor進(jìn)程有多少個(gè)舅逸,內(nèi)存和CPU有多大,但是task只有1個(gè)或者10個(gè)皇筛,那么90%的Executor進(jìn)程可能根本就沒(méi)有task執(zhí)行琉历,也就是白白浪費(fèi)了資源!因此Spark官網(wǎng)建議的設(shè)置原則是水醋,設(shè)置該參數(shù)為num-executors * executor-cores的2~3倍較為合適旗笔,比如Executor的總CPU core數(shù)量為300個(gè),那么設(shè)置1000個(gè)task是可以的拄踪,此時(shí)可以充分地利用Spark集群的資源蝇恶。
首先隨便查看一個(gè)stage的信息,發(fā)現(xiàn)每個(gè)executor的task為1(因?yàn)槲募苄≈挥幸粋€(gè)hadoop block)惶桐,只有一個(gè)線程完全沒(méi)有并發(fā)撮弧,效率很低潘懊。根據(jù)num-executors * executor-cores的2~3倍,我這里就設(shè)置為2贿衍,和默認(rèn)相比進(jìn)步了1s授舟。
spark.storage.memoryFraction
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認(rèn)是0.6贸辈。也就是說(shuō)释树,默認(rèn)Executor 60%的內(nèi)存,可以用來(lái)保存持久化的RDD數(shù)據(jù)擎淤。根據(jù)你選擇的不同的持久化策略奢啥,如果內(nèi)存不夠時(shí),可能數(shù)據(jù)就不會(huì)持久化揉燃,或者數(shù)據(jù)會(huì)寫入磁盤扫尺。
- 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中,有較多的RDD持久化操作炊汤,該參數(shù)的值可以適當(dāng)提高一些正驻,保證持久化的數(shù)據(jù)能夠容納在內(nèi)存中。避免內(nèi)存不夠緩存所有的數(shù)據(jù)抢腐,導(dǎo)致數(shù)據(jù)只能寫入磁盤中姑曙,降低了性能。但是如果Spark作業(yè)中的shuffle類操作比較多迈倍,而持久化操作比較少伤靠,那么這個(gè)參數(shù)的值適當(dāng)降低一些比較合適。此外啼染,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢(通過(guò)spark web ui可以觀察到作業(yè)的gc耗時(shí))宴合,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用,那么同樣建議調(diào)低這個(gè)參數(shù)的值迹鹅。
spark.shuffle.memoryFraction
- 參數(shù)說(shuō)明:該參數(shù)用于設(shè)置shuffle過(guò)程中一個(gè)task拉取到上個(gè)stage的task的輸出后,進(jìn)行聚合操作時(shí)能夠使用的Executor內(nèi)存的比例断医,默認(rèn)是0.2滞乙。也就是說(shuō)奏纪,Executor默認(rèn)只有20%的內(nèi)存用來(lái)進(jìn)行該操作。shuffle操作在進(jìn)行聚合時(shí)斩启,如果發(fā)現(xiàn)使用的內(nèi)存超出了這個(gè)20%的限制序调,那么多余的數(shù)據(jù)就會(huì)溢寫到磁盤文件中去,此時(shí)就會(huì)極大地降低性能兔簇。
- 參數(shù)調(diào)優(yōu)建議:如果Spark作業(yè)中的RDD持久化操作較少发绢,shuffle操作較多時(shí),建議降低持久化操作的內(nèi)存占比垄琐,提高shuffle操作的內(nèi)存占比比例边酒,避免shuffle過(guò)程中數(shù)據(jù)過(guò)多時(shí)內(nèi)存不夠用,必須溢寫到磁盤上狸窘,降低了性能墩朦。此外,如果發(fā)現(xiàn)作業(yè)由于頻繁的gc導(dǎo)致運(yùn)行緩慢翻擒,意味著task執(zhí)行用戶代碼的內(nèi)存不夠用氓涣,那么同樣建議調(diào)低這個(gè)參數(shù)的值。
序列化算法
在Spark的架構(gòu)中,在網(wǎng)絡(luò)中傳遞的或者緩存在內(nèi)存别凹、硬盤中的對(duì)象需要進(jìn)行序列化操作便瑟,序列化的作用主要是利用時(shí)間換空間:
- 分發(fā)給Executor上的Task
- 需要緩存的RDD(前提是使用序列化方式緩存)
- 廣播變量
- Shuffle過(guò)程中的數(shù)據(jù)緩存
- 使用receiver方式接收的流數(shù)據(jù)緩存
- 算子函數(shù)中使用的外部變量
上面的六種數(shù)據(jù),通過(guò)Java序列化(默認(rèn)的序列化方式)形成一個(gè)二進(jìn)制字節(jié)數(shù)組番川,大大減少了數(shù)據(jù)在內(nèi)存、硬盤中占用的空間脊框,減少了網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)拈_(kāi)銷颁督,并且可以精確的推測(cè)內(nèi)存使用情況,降低GC頻率浇雹。
但是在序列化和反序列化的過(guò)程中沉御,會(huì)消耗大量的時(shí)間,所以選擇一個(gè)好的序列化算法很重要昭灵。目前Spark使用Kryo比Java默認(rèn)的序列化快10倍吠裆。具體原理可見(jiàn)Kryo參考伐谈,這里只需要添加配置使用Kryo即可。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化庫(kù)
運(yùn)行時(shí)間變成了17s试疙,再去查看序列化和反序列化的時(shí)耗:
總結(jié)
- 首先要從根源進(jìn)行優(yōu)化祝旷,也就是編寫程序的時(shí)候履澳,比如注意避免創(chuàng)建重復(fù)RDD、持久化常使用的RDD等編碼方式怀跛。
- 編碼過(guò)程中盡量少的出現(xiàn)shuffle操作距贷,用其它操作代替。
- 序列化和反序列化使用得非常多吻谋,所以使用Kryo比默認(rèn)快10倍是非常重要的忠蝗。
- 對(duì)于資源而言,沒(méi)有絕對(duì)的配置方法漓拾,首先要理解每個(gè)資源參數(shù)的意義和使用經(jīng)驗(yàn)阁最,再根據(jù)自己的集群狀態(tài)來(lái)做調(diào)整。