1膝舅、 性能調(diào)優(yōu)
1.1议惰、 分配更多資源
1.1.1厢漩、分配哪些資源褐澎?
Executor的數(shù)量
每個Executor所能分配的CPU數(shù)量
每個Executor所能分配的內(nèi)存量
Driver端分配的內(nèi)存數(shù)量
1.1.2籽孙、在哪里分配這些資源烈评?
在生產(chǎn)環(huán)境中,提交spark作業(yè)時犯建,用的spark-submit shell腳本讲冠,里面調(diào)整對應(yīng)的參數(shù):
/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \ 配置executor的數(shù)量
--driver-memory 100m \ 配置driver的內(nèi)存(影響不大)
--executor-memory 100m \ 配置每個executor的內(nèi)存大小
--total-executor-cores 3 \ 配置所有executor的cpu core數(shù)量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
1.1.3、調(diào)節(jié)到多大适瓦,算是最大呢竿开?
常用的資源調(diào)度模式有Spark Standalone和Spark On Yarn谱仪。比如說你的每臺機器能夠給你使用60G內(nèi)存,10個cpu core否彩,20臺機器疯攒。那么executor的數(shù)量是20。平均每個executor所能分配60G內(nèi)存和10個cpu core列荔。
1.1.4敬尺、為什么多分配了這些資源以后,性能會得到提升贴浙?
加executor
:
如果executor數(shù)量比較少砂吞,那么,能夠并行執(zhí)行的task數(shù)量就比較少崎溃,就意味著蜻直,我們的Application的并行執(zhí)行的能力就很弱。
比如有3個executor袁串,每個executor有2個cpu core概而,那么同時能夠并行執(zhí)行的task,就是6個般婆。6個執(zhí)行完以后到腥,再換下一批6個task。
增加了executor數(shù)量以后蔚袍,那么乡范,就意味著,能夠并行執(zhí)行的task數(shù)量啤咽,也就變多了晋辆。比如原先是6個,現(xiàn)在可能可以并行執(zhí)行10個宇整,甚至20個瓶佳,100個。那么并行能力就比之前提升了數(shù)倍鳞青,數(shù)十倍霸饲。相應(yīng)的,性能(執(zhí)行的速度)臂拓,也能提升數(shù)倍~數(shù)十倍厚脉。
增加每個executor的cpu core,也是增加了執(zhí)行的并行能力
胶惰。原本20個executor傻工,每個才2個cpu core。能夠并行執(zhí)行的task數(shù)量,就是40個task中捆。
現(xiàn)在每個executor的cpu core鸯匹,增加到了4個。能夠并行執(zhí)行的task數(shù)量泄伪,就是80個task殴蓬。就物理性能來看,執(zhí)行的速度臂容,提升了2倍科雳。
增加每個executor的內(nèi)存量
。增加了內(nèi)存量以后脓杉,對性能的提升糟秘,有三點:
- 如果需要對RDD進行cache,那么更多的內(nèi)存球散,就可以緩存更多的數(shù)據(jù)尿赚,將更少的數(shù)據(jù)寫入磁盤,甚至不寫入磁盤蕉堰。減少了磁盤IO凌净。
- 對于shuffle操作,reduce端屋讶,會需要內(nèi)存來存放拉取的數(shù)據(jù)并進行聚合冰寻。如果內(nèi)存不夠,也會寫入磁盤皿渗。如果給executor分配更多內(nèi)存以后斩芭,就有更少的數(shù)據(jù),需要寫入磁盤乐疆,甚至不需要寫入磁盤划乖。減少了磁盤IO,提升了性能挤土。
- 對于task的執(zhí)行琴庵,可能會創(chuàng)建很多對象。如果內(nèi)存比較小仰美,可能會頻繁導(dǎo)致JVM堆內(nèi)存滿了迷殿,然后頻繁GC,垃圾回收咖杂,minor GC和full GC庆寺。(速度很慢)。內(nèi)存加大以后翰苫,帶來更少的GC,垃圾回收,避免了速度變慢奏窑,速度變快了导披。
1.2、調(diào)節(jié)并行度
1.2.1埃唯、并行度的概念
就是指的是Spark作業(yè)中撩匕,各個stage的task數(shù)量,代表了Spark作業(yè)的在各個階段(stage)的并行度墨叛。
1.2.2止毕、如果不調(diào)節(jié)并行度,導(dǎo)致并行度過低漠趁,會怎么樣扁凛?
比如現(xiàn)在spark-submit腳本里面,給我們的spark作業(yè)分配了足夠多的資源闯传,比如50個executor谨朝,每個executor有10G內(nèi)存,每個executor有3個cpu core甥绿∽直遥基本已經(jīng)達到了集群或者yarn隊列的資源上限。task沒有設(shè)置共缕,或者設(shè)置的很少洗出,比如就設(shè)置了100個task,你的Application任何一個stage運行的時候图谷,都有總數(shù)在150個cpu core翩活,可以并行運行。但是你現(xiàn)在蜓萄,只有100個task隅茎,平均分配一下,每個executor分配到2個task嫉沽,ok辟犀,那么同時在運行的task,只有100個绸硕,每個executor只會并行運行2個task堂竟。每個executor剩下的一個cpu core, 就浪費掉了玻佩。
你的資源雖然分配足夠了出嘹,但是問題是,并行度沒有與資源相匹配咬崔,導(dǎo)致你分配下去的資源都浪費掉了税稼。
合理的并行度的設(shè)置烦秩,應(yīng)該是要設(shè)置的足夠大,大到可以完全合理的利用你的集群資源郎仆。比如上面的例子只祠,總共集群有150個cpu core,可以并行運行150個task扰肌。那么就應(yīng)該將你的Application的并行度抛寝,至少設(shè)置成150,才能完全有效的利用你的集群資源曙旭,讓150個task盗舰,并行執(zhí)行。而且task增加到150個以后桂躏,即可以同時并行運行钻趋,還可以讓每個task要處理的數(shù)據(jù)量變少。比如總共150G的數(shù)據(jù)要處理沼头,如果是100個task爷绘,每個task計算1.5G的數(shù)據(jù),現(xiàn)在增加到150個task进倍,可以并行運行土至,而且每個task主要處理1G的數(shù)據(jù)就可以。
很簡單的道理猾昆,只要合理設(shè)置并行度陶因,就可以完全充分利用你的集群計算資源,并且減少每個task要處理的數(shù)據(jù)量垂蜗,最終楷扬,就是提升你的整個Spark作業(yè)的性能和運行速度。
1.2.3贴见、設(shè)置并行度
1)烘苹、task數(shù)量,至少設(shè)置成與Spark application的總cpu core數(shù)量相同(最理想情況片部,比如總共150個cpu core镣衡,分配了150個task,一起運行档悠,差不多同一時間運行完畢)廊鸥。
2)、官方是推薦辖所,task數(shù)量惰说,設(shè)置成spark application總cpu core數(shù)量的2~3倍,比如150個cpu core缘回,基本要設(shè)置task數(shù)量為300~500吆视。
實際情況典挑,與理想情況不同的,有些task會運行的快一點啦吧,比如50s就完了搔弄,有些task,可能會慢一點丰滑,要1分半才運行完,所以如果你的task數(shù)量倒庵,剛好設(shè)置的跟cpu core數(shù)量相同褒墨,可能還是會導(dǎo)致資源的浪費。比如150個task擎宝,10個先運行完了郁妈,剩余140個還在運行,但是這個時候绍申,有10個cpu core就空閑出來了噩咪,就導(dǎo)致了浪費。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍极阅,那么一個task運行完了以后胃碾,另一個task馬上可以補上來,就盡量讓cpu core不要空閑筋搏,同時也是盡量提升spark作業(yè)運行的效率和速度仆百,提升性能。
3)奔脐、如何設(shè)置一個Spark Application的并行度俄周?
spark.default.parallelism
SparkConf conf = new SparkConf()
.set("spark.default.parallelism", "500")
1.3、 重構(gòu)RDD架構(gòu)以及RDD持久化
1.3.1髓迎、RDD架構(gòu)重構(gòu)與優(yōu)化
盡量去復(fù)用RDD峦朗,差不多的RDD,可以抽取成為一個共同的RDD排龄,供后面的RDD計算時波势,反復(fù)使用。
1.3.2涣雕、公共RDD一定要實現(xiàn)持久化
對于要多次計算和使用的公共RDD艰亮,一定要進行持久化。
持久化挣郭,就是將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤中(BlockManager)以后無論對這個RDD做多少次計算迄埃,那么都是直接取這個RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤中兑障,直接提取一份數(shù)據(jù)侄非。
1.3.3蕉汪、持久化,是可以進行序列化的
如果正常將數(shù)據(jù)持久化在內(nèi)存中逞怨,那么可能會導(dǎo)致內(nèi)存的占用過大者疤,這樣的話,也許叠赦,會導(dǎo)致OOM內(nèi)存溢出驹马。
當純內(nèi)存無法支撐公共RDD數(shù)據(jù)完全存放的時候,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲除秀。將RDD的每個partition的數(shù)據(jù)糯累,序列化成一個大的字節(jié)數(shù)組,就一個對象册踩。序列化后泳姐,大大減少內(nèi)存的空間占用。
序列化的方式暂吉,唯一的缺點就是胖秒,在獲取數(shù)據(jù)的時候,需要反序列化慕的。
如果序列化純內(nèi)存方式阎肝,還是導(dǎo)致OOM內(nèi)存溢出,就只能考慮磁盤的方式肮街、內(nèi)存+磁盤的普通方式(無序列化)盗痒、內(nèi)存+磁盤(序列化)。
1.3.4低散、為了數(shù)據(jù)的高可靠性俯邓,而且內(nèi)存充足,可以使用雙副本機制熔号,進行持久化稽鞭。
持久化的雙副本機制,持久化后的一個副本引镊,因為機器宕機了朦蕴,副本丟了,就還是得重新計算一次弟头。持久化的每個數(shù)據(jù)單元吩抓,存儲一份副本,放在其他節(jié)點上面赴恨。從而進行容錯疹娶。一個副本丟了,不用重新計算伦连,還可以使用另外一份副本雨饺。這種方式钳垮,僅僅針對你的內(nèi)存資源極度充足的情況。
1.4额港、 廣播變量
1.3.1饺窿、概念及需求
Spark Application(我們自己寫的Spark作業(yè))最開始在Driver端,在我們提交任務(wù)的時候移斩,需要傳遞到各個Executor的Task上運行肚医。對于一些只讀、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個Task上向瓷,這樣效率低下忍宋。廣播變量允許將變量只廣播(提前廣播)給各個Executor。該Executor上的各個Task再從所在節(jié)點的BlockManager獲取變量风罩,如果本地沒有,那么就從Driver遠程拉取變量副本舵稠,并保存在本地的BlockManager中超升。此后這個executor上的task,都會直接使用本地的BlockManager中的副本哺徊。而不是從Driver獲取變量室琢,從而提升了效率。
一個Executor只需要在第一個Task啟動時落追,獲得一份Broadcast數(shù)據(jù)盈滴,之后的Task都從本節(jié)點的BlockManager中獲取相關(guān)數(shù)據(jù)。
1.3.2轿钠、使用方法
1)調(diào)用SparkContext.broadcast方法創(chuàng)建一個Broadcast[T]對象巢钓。任何序列化的類型都可以這么實現(xiàn)。
2)通過value方法訪問該對象的值疗垛。
3)變量只會被發(fā)送到各個節(jié)點一次症汹,應(yīng)作為只讀值處理(修改這個值不會影響到別的節(jié)點)
1.5、使用Kryo序列化
1.5.1贷腕、概念及需求
默認情況下背镇,Spark內(nèi)部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream泽裳,對象輸入輸出流機制瞒斩,來進行序列化。
這種默認序列化機制的好處在于涮总,處理起來比較方便胸囱,也不需要我們手動去做什么事情,只是瀑梗,你在算子里面使用的變量旺矾,必須是實現(xiàn)Serializable接口的蔑鹦,可序列化即可。
但是缺點在于箕宙,默認的序列化機制的效率不高嚎朽,序列化的速度比較慢,序列化以后的數(shù)據(jù)柬帕,占用的內(nèi)存空間相對還是比較大哟忍。
Spark支持使用Kryo序列化機制。這種序列化機制陷寝,比默認的Java序列化機制速度要快锅很,序列化后的數(shù)據(jù)更小,大概是Java序列化機制的1/10凤跑。
所以Kryo序列化優(yōu)化以后爆安,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費的內(nèi)存資源大大減少仔引。
1.5.2扔仓、Kryo序列化機制啟用以后生效的幾個地方
1)、算子函數(shù)中使用到的外部變量咖耘,使用Kryo以后:優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅芮檀兀梢詢?yōu)化集群中內(nèi)存的占用和消耗。
2)儿倒、持久化RDD版保,優(yōu)化內(nèi)存的占用和消耗。持久化RDD占用的內(nèi)存越少夫否,task執(zhí)行的時候彻犁,創(chuàng)建的對象,就不至于頻繁的占滿內(nèi)存凰慈,頻繁發(fā)生GC袖裕。
3)、shuffle:可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅堋?/p>
1.5.3溉瓶、使用方法
第一步急鳄,在SparkConf中設(shè)置一個屬性,spark.serializer堰酿,org.apache.spark.serializer.KryoSerializer類疾宏。
第二步,注冊你使用的需要通過Kryo序列化的一些自定義類触创,SparkConf.registerKryoClasses()坎藐。
項目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})
1.6、 使用fastutil優(yōu)化數(shù)據(jù)格式
1.6.1、fastutil介紹
fastutil是擴展了Java標準集合框架(Map岩馍、List碉咆、Set。HashMap蛀恩、ArrayList疫铜、HashSet)的類庫,提供了特殊類型的map双谆、set壳咕、list和queue。
fastutil能夠提供更小的內(nèi)存占用顽馋,更快的存取速度谓厘。我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map寸谜、List竟稳、Set,好處在于fastutil集合類可以減小內(nèi)存的占用熊痴,并且在進行集合的遍歷他爸、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時候,提供更快的存取速度愁拭。
fastutil也提供了64位的array、set和list亏吝,以及高性能快速的岭埠,以及實用的IO類,來處理二進制和文本類型的文件蔚鸥。
fastutil最新版本要求Java 7以及以上版本惜论。
fastutil的每一種集合類型,都實現(xiàn)了對應(yīng)的Java中的標準接口(比如fastutil的map止喷,實現(xiàn)了Java的Map接口)馆类,因此可以直接放入已有系統(tǒng)的任何代碼中。
fastutil還提供了一些JDK標準類庫中沒有的額外功能(比如雙向迭代器)弹谁。
fastutil除了對象和原始類型為元素的集合乾巧,fastutil也提供引用類型的支持,但是對引用類型是使用等于號(=)進行比較的预愤,而不是equals()方法沟于。
fastutil盡量提供了在任何場景下都是速度最快的集合類庫。
1.6.2植康、Spark中應(yīng)用fastutil的場景
1)旷太、如果算子函數(shù)使用了外部變量。第一,你可以使用Broadcast廣播變量優(yōu)化供璧。第二存崖,可以使用Kryo序列化類庫,提升序列化性能和效率睡毒。第三来惧,如果外部變量是某種比較大的集合,那么可以考慮使用fastutil改寫外部變量吕嘀,首先從源頭上就減少內(nèi)存的占用违寞,通過廣播變量進一步減少內(nèi)存占用,再通過Kryo序列化類庫進一步減少內(nèi)存占用偶房。
2)趁曼、在你的算子函數(shù)里,也就是task要執(zhí)行的計算邏輯里面棕洋,如果有邏輯中挡闰,出現(xiàn),要創(chuàng)建比較大的Map掰盘、List等集合摄悯,可能會占用較大的內(nèi)存空間,而且可能涉及到消耗性能的遍歷愧捕、存取等集合操作奢驯,此時,可以考慮將這些集合類型使用fastutil類庫重寫次绘,使用了fastutil集合類以后瘪阁,就可以在一定程度上,減少task創(chuàng)建出來的集合類型的內(nèi)存占用邮偎。避免executor內(nèi)存頻繁占滿管跺,頻繁喚起GC,導(dǎo)致性能下降禾进。
1.6.3豁跑、關(guān)于fastutil調(diào)優(yōu)的說明
fastutil其實沒有你想象中的那么強大,也不會跟官網(wǎng)上說的效果那么一鳴驚人泻云。廣播變量艇拍、Kryo序列化類庫、fastutil宠纯,都是之前所說的淑倾,對于性能來說,類似于一種調(diào)味品征椒,烤雞娇哆,本來就很好吃了,然后加了一點特質(zhì)的孜然麻辣粉調(diào)料,就更加好吃了一點碍讨。分配資源治力、并行度、RDD架構(gòu)與持久化勃黍,這三個就是烤雞宵统。broadcast、kryo覆获、fastutil马澈,類似于調(diào)料。
比如說弄息,你的spark作業(yè)痊班,經(jīng)過之前一些調(diào)優(yōu)以后,大概30分鐘運行完摹量,現(xiàn)在加上broadcast涤伐、kryo、fastutil缨称,也許就是優(yōu)化到29分鐘運行完凝果、或者更好一點,也許就是28分鐘睦尽、25分鐘器净。
shuffle調(diào)優(yōu),15分鐘当凡。groupByKey用reduceByKey改寫山害,執(zhí)行本地聚合,也許10分鐘宁玫。跟公司申請更多的資源粗恢,比如資源更大的YARN隊列柑晒,1分鐘欧瘪。
1.6.4、fastutil的使用
在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
速度比較慢匙赞,可能是從國外的網(wǎng)去拉取jar包佛掖,可能要等待5分鐘,甚至幾十分鐘涌庭,不等
List<Integer> 相當于 IntList
基本都是類似于IntList的格式芥被,前綴就是集合的元素類型。特殊的就是Map坐榆,Int2IntMap拴魄,代表了key-value映射的元素類型。除此之外,還支持object匹中、reference夏漱。
1.7、 調(diào)節(jié)數(shù)據(jù)本地化等待時長
1.7.1顶捷、task的locality有五種
1)挂绰、PROCESS_LOCAL:進程本地化,代碼和數(shù)據(jù)在同一個進程中服赎,也就是在同一個executor中葵蒂。計算數(shù)據(jù)的task由executor執(zhí)行,數(shù)據(jù)在executor的BlockManager中重虑,性能最好践付。
2)、NODE_LOCAL:節(jié)點本地化嚎尤,代碼和數(shù)據(jù)在同一個節(jié)點中荔仁。比如說,數(shù)據(jù)作為一個HDFS block塊芽死,就在節(jié)點上乏梁,而task在節(jié)點上某個executor中運行,或者是关贵,數(shù)據(jù)和task在一個節(jié)點上的不同executor中森瘪,數(shù)據(jù)需要在進程間進行傳輸岩调。
3)、NO_PREF:對于task來說,數(shù)據(jù)從哪里獲取都一樣炬灭,沒有好壞之分。
4)每币、RACK_LOCAL:機架本地化先壕,數(shù)據(jù)和task在一個機架的兩個節(jié)點上,數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點之間進行傳輸奴拦。
5)媒鼓、ANY:數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個機架中错妖,性能最差绿鸣。
1.7.2、Spark的任務(wù)調(diào)度
Spark在Driver上暂氯,對Application的每一個stage的task進行分配之前都會計算出每個task要計算的是哪個分片數(shù)據(jù)潮模。Spark的task分配算法優(yōu)先會希望每個task正好分配到它要計算的數(shù)據(jù)所在的節(jié)點,這樣的話痴施,就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù)擎厢。
但是究流,有時可能task沒有機會分配到它的數(shù)據(jù)所在的節(jié)點。為什么呢动遭,可能那個節(jié)點的計算資源和計算能力都滿了梯嗽。所以這種時候, Spark會等待一段時間沽损,默認情況下是3s(不是絕對的灯节,還有很多種情況,對不同的本地化級別绵估,都會去等待)炎疆,到最后,實在是等待不了了国裳,就會選擇一個比較差的本地化級別形入。比如說,將task分配到靠它要計算的數(shù)據(jù)所在節(jié)點比較近的一個節(jié)點缝左,然后進行計算亿遂。
但是對于第二種情況,通常來說渺杉,肯定是要發(fā)生數(shù)據(jù)傳輸蛇数,task會通過其所在節(jié)點的BlockManager來獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)自己本地沒有數(shù)據(jù)是越,會通過一個getRemote()方法耳舅,通過TransferService(網(wǎng)絡(luò)數(shù)據(jù)傳輸組件)從數(shù)據(jù)所在節(jié)點的BlockManager中,獲取數(shù)據(jù)倚评,通過網(wǎng)絡(luò)傳輸回task所在節(jié)點浦徊。
對于我們來說,當然不希望是類似于第二種情況的了天梧。最好的盔性,當然是task和數(shù)據(jù)在一個節(jié)點上,直接從本地executor的BlockManager中獲取數(shù)據(jù)呢岗,純內(nèi)存冕香,或者帶一點磁盤IO。如果要通過網(wǎng)絡(luò)傳輸數(shù)據(jù)的話敷燎,性能肯定會下降的暂筝。大量網(wǎng)絡(luò)傳輸箩言,以及磁盤IO硬贯,都是性能的殺手。
1.7.3陨收、我們什么時候要調(diào)節(jié)這個參數(shù)
觀察spark作業(yè)的運行日志饭豹。推薦大家在測試的時候鸵赖,先用client模式在本地就直接可以看到比較全的日志。日志里面會顯示:starting task…拄衰,PROCESS LOCAL它褪、NODE LOCAL
觀察大部分task的數(shù)據(jù)本地化級別,如果大多都是PROCESS_LOCAL翘悉,那就不用調(diào)節(jié)了茫打。
如果是發(fā)現(xiàn),好多的級別都是NODE_LOCAL妖混、ANY老赤,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時長。要反復(fù)調(diào)節(jié)制市,每次調(diào)節(jié)完以后再運行并觀察日志抬旺,看看大部分的task的本地化級別有沒有提升,看看整個spark作業(yè)的運行時間有沒有縮短祥楣。注意开财,不要本末倒置,不要本地化級別是提升了误褪,但是因為大量的等待時長责鳍,spark作業(yè)的運行時間反而增加了,那還是不要調(diào)節(jié)了兽间。
1.7.4薇搁、怎么調(diào)節(jié)
spark.locality.wait,默認是3s渡八。6s啃洋,10s
默認情況下,下面3個的等待時長屎鳍,都是跟上面那個是一樣的宏娄,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
new SparkConf().set("spark.locality.wait", "10")
2、JVM調(diào)優(yōu)
堆內(nèi)存存放我們創(chuàng)建的一些對象逮壁,有老年代和年輕代孵坚。理想情況下,老年代都是放一些生命周期很長的對象窥淆,數(shù)量應(yīng)該是很少的卖宠,比如數(shù)據(jù)庫連接池。我們在spark task執(zhí)行算子函數(shù)(我們自己寫的)忧饭,可能會創(chuàng)建很多對象扛伍,這些對象都是要放入JVM年輕代中的。
每一次放對象的時候词裤,都是放入eden區(qū)域刺洒,和其中一個survivor區(qū)域鳖宾。另外一個survivor區(qū)域是空閑的。
當eden區(qū)域和一個survivor區(qū)域放滿了以后(spark運行過程中逆航,產(chǎn)生的對象實在太多了)鼎文,就會觸發(fā)minor gc,小型垃圾回收因俐。把不再使用的對象拇惋,從內(nèi)存中清空,給后面新創(chuàng)建的對象騰出來點兒地方抹剩。
清理掉了不再使用的對象之后蚤假,那么也會將存活下來的對象(還要繼續(xù)使用的),放入之前空閑的那一個survivor區(qū)域中吧兔。這里可能會出現(xiàn)一個問題磷仰。默認eden、survior1和survivor2的內(nèi)存占比是8:1:1境蔼。問題是灶平,如果存活下來的對象是1.5,一個survivor區(qū)域放不下箍土。此時就可能通過JVM的擔保機制(不同JVM版本可能對應(yīng)的行為)逢享,將多余的對象,直接放入老年代了吴藻。
如果你的JVM內(nèi)存不夠大的話瞒爬,可能導(dǎo)致頻繁的年輕代內(nèi)存滿溢,頻繁的進行minor gc沟堡。頻繁的minor gc會導(dǎo)致短時間內(nèi)侧但,有些存活的對象,多次垃圾回收都沒有回收掉航罗。會導(dǎo)致這種短生命周期(其實不一定是要長期使用的)對象禀横,年齡過大,垃圾回收次數(shù)太多還沒有回收到粥血,跑到老年代柏锄。
老年代中,可能會因為內(nèi)存不足复亏,囤積一大堆趾娃,短生命周期的,本來應(yīng)該在年輕代中的缔御,可能馬上就要被回收掉的對象抬闷。此時,可能導(dǎo)致老年代頻繁滿溢刹淌。頻繁進行full gc(全局/全面垃圾回收)饶氏。full gc就會去回收老年代中的對象。full gc由于這個算法的設(shè)計有勾,是針對的是疹启,老年代中的對象數(shù)量很少,滿溢進行full gc的頻率應(yīng)該很少蔼卡,因此采取了不太復(fù)雜喊崖,但是耗費性能和時間的垃圾回收算法。full gc很慢雇逞。
full gc / minor gc荤懂,無論是快,還是慢塘砸,都會導(dǎo)致jvm的工作線程停止工作节仿,stop the world。簡而言之掉蔬,就是說廊宪,gc的時候,spark停止工作了女轿。等著垃圾回收結(jié)束箭启。
內(nèi)存不充足的時候,出現(xiàn)的問題:
- 頻繁minor gc蛉迹,也會導(dǎo)致頻繁spark停止工作
- 老年代囤積大量活躍對象(短生命周期的對象)傅寡,導(dǎo)致頻繁full gc,full gc時間很長北救,短則數(shù)十秒荐操,長則數(shù)分鐘,甚至數(shù)小時珍策〉砹悖可能導(dǎo)致spark長時間停止工作。
- 嚴重影響咱們的spark的性能和運行的速度膛壹。
2.1驾中、降低cache操作的內(nèi)存占比
spark中,堆內(nèi)存又被劃分成了兩塊模聋,一塊是專門用來給RDD的cache肩民、persist操作進行RDD數(shù)據(jù)緩存用的。另外一塊用來給spark算子函數(shù)的運行使用的链方,存放函數(shù)中自己創(chuàng)建的對象持痰。
默認情況下,給RDD cache操作的內(nèi)存占比祟蚀,是0.6工窍,60%的內(nèi)存都給了cache操作了割卖。但是問題是,如果某些情況下cache不是那么的緊張患雏,問題在于task算子函數(shù)中創(chuàng)建的對象過多鹏溯,然后內(nèi)存又不太大,導(dǎo)致了頻繁的minor gc淹仑,甚至頻繁full gc丙挽,導(dǎo)致spark頻繁的停止工作。性能影響會很大匀借。
針對上述這種情況颜阐,可以在任務(wù)運行界面,去查看你的spark作業(yè)的運行統(tǒng)計吓肋,可以看到每個stage的運行情況凳怨,包括每個task的運行時間、gc時間等等是鬼。如果發(fā)現(xiàn)gc太頻繁猿棉,時間太長。此時就可以適當調(diào)價這個比例屑咳。
降低cache操作的內(nèi)存占比萨赁,大不了用persist操作,選擇將一部分緩存的RDD數(shù)據(jù)寫入磁盤兆龙,或者序列化方式杖爽,配合Kryo序列化類,減少RDD緩存的內(nèi)存占用紫皇。降低cache操作內(nèi)存占比慰安,對應(yīng)的,算子函數(shù)的內(nèi)存占比就提升了聪铺。這個時候化焕,可能就可以減少minor gc的頻率,同時減少full gc的頻率铃剔。對性能的提升是有一定的幫助的撒桨。
一句話,讓task執(zhí)行算子函數(shù)時键兜,有更多的內(nèi)存可以使用凤类。
spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
2.2普气、調(diào)節(jié)executor堆外內(nèi)存與連接等待時長
調(diào)節(jié)executor堆外內(nèi)存
有時候谜疤,如果你的spark作業(yè)處理的數(shù)據(jù)量特別大,幾億數(shù)據(jù)量。然后spark作業(yè)一運行夷磕,時不時的報錯履肃,shuffle file cannot find,executor坐桩、task lost尺棋,out of memory(內(nèi)存溢出)。
可能是executor的堆外內(nèi)存不太夠用撕攒,導(dǎo)致executor在運行的過程中陡鹃,可能會內(nèi)存溢出烘浦,可能導(dǎo)致后續(xù)的stage的task在運行的時候抖坪,要從一些executor中去拉取shuffle map output文件,但是executor可能已經(jīng)掛掉了闷叉,關(guān)聯(lián)的block manager也沒有了擦俐。所以會報shuffle output file not found,resubmitting task握侧,executor lost蚯瞧。spark作業(yè)徹底崩潰。
上述情況下品擎,就可以去考慮調(diào)節(jié)一下executor的堆外內(nèi)存埋合。也許就可以避免報錯。此外萄传,有時堆外內(nèi)存調(diào)節(jié)的比較大的時候甚颂,對于性能來說,也會帶來一定的提升秀菱。
可以調(diào)節(jié)堆外內(nèi)存的上限:
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit腳本里面振诬,去用--conf的方式,去添加配置衍菱。用new SparkConf().set()這種方式去設(shè)置是沒有用的赶么!一定要在spark-submit腳本中去設(shè)置。
spark.yarn.executor.memoryOverhead(看名字脊串,顧名思義辫呻,針對的是基于yarn的提交模式)
默認情況下,這個堆外內(nèi)存上限大概是300M琼锋。通常在項目中印屁,真正處理大數(shù)據(jù)的時候,這里都會出現(xiàn)問題斩例,導(dǎo)致spark作業(yè)反復(fù)崩潰雄人,無法運行。此時就會去調(diào)節(jié)這個參數(shù),到至少1G(1024M)础钠,甚至說2G恰力、4G。
通常這個參數(shù)調(diào)節(jié)上去以后旗吁,就會避免掉某些JVM OOM的異常問題踩萎,同時呢,會讓整體spark作業(yè)的性能很钓,得到較大的提升香府。
調(diào)節(jié)連接等待時長
我們知道,executor會優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù)码倦。如果本地block manager沒有的話企孩,那么會通過TransferService,去遠程連接其他節(jié)點上executor的block manager去獲取袁稽。
而此時上面executor去遠程連接的那個executor勿璃,因為task創(chuàng)建的對象特別大,特別多推汽,
頻繁的讓JVM堆內(nèi)存滿溢补疑,正在進行垃圾回收。而處于垃圾回收過程中歹撒,所有的工作線程全部停止莲组,相當于只要一旦進行垃圾回收,spark / executor停止工作暖夭,無法提供響應(yīng)锹杈。
此時呢,就會沒有響應(yīng)鳞尔,無法建立網(wǎng)絡(luò)連接嬉橙,會卡住。spark默認的網(wǎng)絡(luò)連接的超時時長寥假,是60s市框,如果卡住60s都無法建立連接的話,那么就宣告失敗了糕韧。
報錯幾次枫振,幾次都拉取不到數(shù)據(jù)的話,可能會導(dǎo)致spark作業(yè)的崩潰萤彩。也可能會導(dǎo)致DAGScheduler粪滤,反復(fù)提交幾次stage。TaskScheduler反復(fù)提交幾次task雀扶。大大延長我們的spark作業(yè)的運行時間杖小。
可以考慮調(diào)節(jié)連接的超時時長:
--conf spark.core.connection.ack.wait.timeout=300
spark-submit腳本肆汹,切記,不是在new SparkConf().set()這種方式來設(shè)置的予权。
spark.core.connection.ack.wait.timeout(spark core昂勉,connection,連接扫腺,ack岗照,wait timeout,建立不上連接的時候笆环,超時等待時長)
調(diào)節(jié)這個值比較大以后攒至,通常來說,可以避免部分的偶爾出現(xiàn)的某某文件拉取失敗躁劣,某某文件lost掉了迫吐。
3、Shuffle調(diào)優(yōu)
原理概述:
什么樣的情況下习绢,會發(fā)生shuffle渠抹?
在spark中蝙昙,主要是以下幾個算子:groupByKey闪萄、reduceByKey、countByKey奇颠、join(分情況败去,先groupByKey后再join是不會發(fā)生shuffle的),等等烈拒。
什么是shuffle圆裕?
groupByKey,要把分布在集群各個節(jié)點上的數(shù)據(jù)中的同一個key荆几,對應(yīng)的values吓妆,都要集中到一塊兒,集中到集群中同一個節(jié)點上吨铸,更嚴密一點說行拢,就是集中到一個節(jié)點的一個executor的一個task中。
然后呢诞吱,集中一個key對應(yīng)的values之后舟奠,才能交給我們來進行處理,<key, Iterable<value>>房维。reduceByKey沼瘫,算子函數(shù)去對values集合進行reduce操作,最后變成一個value咙俩。countByKey需要在一個task中耿戚,獲取到一個key對應(yīng)的所有的value,然后進行計數(shù),統(tǒng)計一共有多少個value膜蛔。join晓锻,RDD<key, value>,RDD<key, value>飞几,只要是兩個RDD中砚哆,key相同對應(yīng)的2個value,都能到一個節(jié)點的executor的task中屑墨,給我們進行處理躁锁。
shuffle,一定是分為兩個stage來完成的卵史。因為這其實是個逆向的過程战转,不是stage決定shuffle,是shuffle決定stage以躯。
reduceByKey(+)槐秧,在某個action觸發(fā)job的時候,DAGScheduler忧设,會負責(zé)劃分job為多個stage刁标。劃分的依據(jù),就是址晕,如果發(fā)現(xiàn)有會觸發(fā)shuffle操作的算子膀懈,比如reduceByKey,就將這個操作的前半部分谨垃,以及之前所有的RDD和transformation操作启搂,劃分為一個stage。shuffle操作的后半部分刘陶,以及后面的胳赌,直到action為止的RDD和transformation操作,劃分為另外一個stage匙隔。
3.1疑苫、合并map端輸出文件
3.1.1、如果不合并map端輸出文件的話牡直,會怎么樣缀匕?
舉例實際生產(chǎn)環(huán)境的條件:
100個節(jié)點(每個節(jié)點一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
每個節(jié)點,10個task碰逸,每個節(jié)點會輸出多少份map端文件乡小?10 * 1000=1萬個文件
總共有多少份map端輸出文件?100 * 10000 = 100萬饵史。
第一個stage满钟,每個task胜榔,都會給第二個stage的每個task創(chuàng)建一份map端的輸出文件
第二個stage,每個task湃番,會到各個節(jié)點上面去夭织,拉取第一個stage每個task輸出的,屬于自己的那一份文件吠撮。
shuffle中的寫磁盤的操作尊惰,基本上就是shuffle中性能消耗最為嚴重的部分。
通過上面的分析泥兰,一個普通的生產(chǎn)環(huán)境的spark job的一個shuffle環(huán)節(jié)弄屡,會寫入磁盤100萬個文件。
磁盤IO對性能和spark作業(yè)執(zhí)行速度的影響鞋诗,是極其驚人和嚇人的膀捷。
基本上,spark作業(yè)的性能削彬,都消耗在shuffle中了全庸,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點融痛。
3.1.2壶笼、開啟shuffle map端輸出文件合并的機制
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
默認情況下,是不開啟的酌心,就是會發(fā)生如上所述的大量map端輸出文件的操作拌消,嚴重影響性能挑豌。
3.1.3安券、合并map端輸出文件,對咱們的spark的性能有哪些方面的影響呢氓英?
1侯勉、map task寫入磁盤文件的IO,減少:100萬文件 -> 20萬文件
2铝阐、第二個stage址貌,原本要拉取第一個stage的task數(shù)量份文件,1000個task徘键,第二個stage的每個task练对,都要拉取1000份文件,走網(wǎng)絡(luò)傳輸吹害。合并以后螟凭,100個節(jié)點,每個節(jié)點2個cpu core它呀,第二個stage的每個task螺男,主要拉取100 * 2 = 200個文件即可棒厘。此時網(wǎng)絡(luò)傳輸?shù)男阅芟囊泊蟠鬁p少。
分享一下下隧,實際在生產(chǎn)環(huán)境中奢人,使用了spark.shuffle.consolidateFiles機制以后,實際的性能調(diào)優(yōu)的效果:對于上述的這種生產(chǎn)環(huán)境的配置淆院,性能的提升何乎,還是相當?shù)目捎^的。spark作業(yè)土辩,5個小時 -> 2~3個小時宪赶。
大家不要小看這個map端輸出文件合并機制。實際上脯燃,在數(shù)據(jù)量比較大搂妻,你自己本身做了前面的性能調(diào)優(yōu),executor上去->cpu core上去->并行度(task數(shù)量)上去辕棚,shuffle沒調(diào)優(yōu)欲主,shuffle就很糟糕了。大量的map端輸出文件的產(chǎn)生逝嚎,對性能有比較惡劣的影響扁瓢。
這個時候,去開啟這個機制补君,可以很有效的提升性能引几。
3.2、調(diào)節(jié)map端內(nèi)存緩沖與reduce端內(nèi)存占比
3.2.1挽铁、默認情況下可能出現(xiàn)的問題
默認情況下伟桅,shuffle的map task,輸出到磁盤文件的時候叽掘,統(tǒng)一都會先寫入每個task自己關(guān)聯(lián)的一個內(nèi)存緩沖區(qū)楣铁。
這個緩沖區(qū)大小,默認是32kb更扁。
每一次盖腕,當內(nèi)存緩沖區(qū)滿溢之后,才會進行spill溢寫操作浓镜,溢寫到磁盤文件中去溃列。
reduce端task,在拉取到數(shù)據(jù)之后膛薛,會用hashmap的數(shù)據(jù)格式听隐,來對各個key對應(yīng)的values進行匯聚。
針對每個key對應(yīng)的values相叁,執(zhí)行我們自定義的聚合函數(shù)的代碼遵绰,比如_ + _(把所有values累加起來)辽幌。
reduce task,在進行匯聚椿访、聚合等操作的時候乌企,實際上,使用的就是自己對應(yīng)的executor的內(nèi)存成玫,executor(jvm進程加酵,堆),默認executor內(nèi)存中劃分給reduce task進行聚合的比例是0.2哭当。
問題來了猪腕,因為比例是0.2,所以钦勘,理論上陋葡,很有可能會出現(xiàn),拉取過來的數(shù)據(jù)很多彻采,那么在內(nèi)存中腐缤,放不下。這個時候肛响,默認的行為就是將在內(nèi)存放不下的數(shù)據(jù)都spill(溢寫)到磁盤文件中去岭粤。
在數(shù)據(jù)量比較大的情況下,可能頻繁地發(fā)生reduce端的磁盤文件的讀寫特笋。
3.2.2剃浇、調(diào)優(yōu)方式
調(diào)節(jié)map task內(nèi)存緩沖:spark.shuffle.file.buffer,默認32k(spark 1.3.x不是這個參數(shù)猎物,后面還有一個后綴虎囚,kb。spark 1.5.x以后霸奕,變了溜宽,就是現(xiàn)在這個參數(shù))
調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2
3.2.3质帅、在實際生產(chǎn)環(huán)境中,我們在什么時候來調(diào)節(jié)兩個參數(shù)留攒?
看Spark UI煤惩,如果你的公司是決定采用standalone模式,那么狠簡單炼邀,你的spark跑起來魄揉,會顯示一個Spark UI的地址,4040的端口拭宁。進去觀察每個stage的詳情洛退,有哪些executor瓣俯,有哪些task,每個task的shuffle write和shuffle read的量兵怯,shuffle的磁盤和內(nèi)存讀寫的數(shù)據(jù)量彩匕。如果是用的yarn模式來提交群发,從yarn的界面進去痴脾,點擊對應(yīng)的application,進入Spark UI珊楼,查看詳情袜漩。
如果發(fā)現(xiàn)shuffle 磁盤的write和read绪爸,很大。這個時候宙攻,就意味著最好調(diào)節(jié)一些shuffle的參數(shù)奠货。首先當然是考慮開啟map端輸出文件合并機制。其次調(diào)節(jié)上面說的那兩個參數(shù)座掘。調(diào)節(jié)的時候的原則:spark.shuffle.file.buffer每次擴大一倍仇味,然后看看效果,64雹顺,128丹墨。spark.shuffle.memoryFraction,每次提高0.1嬉愧,看看效果贩挣。
不能調(diào)節(jié)的太大,太大了以后過猶不及没酣,因為內(nèi)存資源是有限的王财,你這里調(diào)節(jié)的太大了,其他環(huán)節(jié)的內(nèi)存使用就會有問題了裕便。
3.2.4绒净、調(diào)節(jié)以后的效果
map task內(nèi)存緩沖變大了,減少spill到磁盤文件的次數(shù)偿衰。reduce端聚合內(nèi)存變大了挂疆,減少spill到磁盤的次數(shù),而且減少了后面聚合讀取磁盤文件的數(shù)量下翎。
3.3缤言、HashShuffleManager與SortShuffleManager
3.3.1、shuffle調(diào)優(yōu)概述
大多數(shù)Spark作業(yè)的性能主要就是消耗在了shuffle環(huán) 節(jié)视事,因為該環(huán)節(jié)包含了大量的磁盤IO胆萧、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮骼R虼说耄绻屪鳂I(yè)的性能更上一層樓订晌,就有必要對shuffle過程進行調(diào)優(yōu)。但是也 必須提醒大家的是蚌吸,影響一個Spark作業(yè)性能的因素锈拨,主要還是代碼開發(fā)、資源參數(shù)以及數(shù)據(jù)傾斜套利,shuffle調(diào)優(yōu)只能在整個Spark的性能調(diào)優(yōu)中占 到一小部分而已推励。因此大家務(wù)必把握住調(diào)優(yōu)的基本原則,千萬不要舍本逐末肉迫。下面我們就給大家詳細講解shuffle的原理验辞,以及相關(guān)參數(shù)的說明,同時給出各個參數(shù)的調(diào)優(yōu)建議喊衫。
3.3.2跌造、ShuffleManager發(fā)展概述
在Spark的源碼中,負責(zé)shuffle過程的執(zhí)行族购、計算和處理的組件主要就是ShuffleManager壳贪,也即shuffle管理器。
在Spark 1.2以前寝杖,默認的shuffle計算引擎是HashShuffleManager违施。該ShuffleManager而HashShuffleManager有著一個非常嚴重的弊端,就是會產(chǎn)生大量的中間磁盤文件瑟幕,進而由大量的磁盤IO操作影響了性能磕蒲。
因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager只盹。SortShuffleManager相較于 HashShuffleManager來說辣往,有了一定的改進。主要就在于殖卑,每個Task在進行shuffle操作時站削,雖然也會產(chǎn)生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件孵稽,因此每個Task就只有一個磁盤文件许起。在下一個stage的shuffle read task拉取自己的數(shù)據(jù)時,只要根據(jù)索引讀取每個磁盤文件中的部分數(shù)據(jù)即可肛冶。
在spark 1.5.x以后街氢,對于shuffle manager又出來了一種新的manager,tungsten-sort(鎢絲)睦袖,鎢絲sort shuffle manager。官網(wǎng)上一般說荣刑,鎢絲sort shuffle manager馅笙,效果跟sort shuffle manager是差不多的伦乔。
但是,唯一的不同之處在于董习,鎢絲manager烈和,是使用了自己實現(xiàn)的一套內(nèi)存管理機制,性能上有很大的提升皿淋, 而且可以避免shuffle過程中產(chǎn)生的大量的OOM招刹,GC,等等內(nèi)存相關(guān)的異常窝趣。
3.3.3疯暑、hash、sort哑舒、tungsten-sort妇拯。如何來選擇?
1洗鸵、需不需要數(shù)據(jù)默認就讓spark給你進行排序越锈?就好像mapreduce,默認就是有按照key的排序膘滨。如果不需要的話甘凭,其實還是建議搭建就使用最基本的HashShuffleManager,因為最開始就是考慮的是不排序火邓,換取高性能丹弱。
2、什么時候需要用sort shuffle manager贡翘?如果你需要你的那些數(shù)據(jù)按key排序了蹈矮,那么就選擇這種吧,而且要注意鸣驱,reduce task的數(shù)量應(yīng)該是超過200的泛鸟,這樣sort、merge(多個文件合并成一個)的機制踊东,才能生效把北滥。但是這里要注意,你一定要自己考量一下闸翅,有沒有必要在shuffle的過程中再芋,就做這個事情,畢竟對性能是有影響的坚冀。
3济赎、如果你不需要排序,而且你希望你的每個task輸出的文件最終是會合并成一份的,你自己認為可以減少性能開銷司训」辜瘢可以去調(diào)節(jié)bypassMergeThreshold這個閾值,比如你的reduce task數(shù)量是500壳猜,默認閾值是200勾徽,所以默認還是會進行sort和直接merge的⊥嘲猓可以將閾值調(diào)節(jié)成550喘帚,不會進行sort,按照hash的做法咒钟,每個reduce task創(chuàng)建一份輸出文件吹由,最后合并成一份文件。(一定要提醒大家盯腌,這個參數(shù)溉知,其實我們通常不會在生產(chǎn)環(huán)境里去使用,也沒有經(jīng)過驗證說腕够,這樣的方式级乍,到底有多少性能的提升)
4、如果你想選用sort based shuffle manager帚湘,而且你們公司的spark版本比較高玫荣,是1.5.x版本的,那么可以考慮去嘗試使用tungsten-sort shuffle manager大诸⊥背В看看性能的提升與穩(wěn)定性怎么樣。
總結(jié):
1资柔、在生產(chǎn)環(huán)境中焙贷,不建議大家貿(mào)然使用第三點和第四點: 2、如果你不想要你的數(shù)據(jù)在shuffle時排序贿堰,那么就自己設(shè)置一下辙芍,用hash shuffle manager。 3羹与、如果你的確是需要你的數(shù)據(jù)在shuffle時進行排序的故硅,那么就默認不用動,默認就是sort shuffle manager纵搁〕孕疲或者是什么?如果你壓根兒不care是否排序這個事兒腾誉,那么就默認讓他就是sort的徘层。調(diào)節(jié)一些其他的參數(shù)(consolidation機制)峻呕。(80%,都是用這種) spark.shuffle.manager:hash惑灵、sort山上、tungsten-sort spark.shuffle.sort.bypassMergeThreshold:200眼耀。自己可以設(shè)定一個閾值英支,默認是200,當reduce task數(shù)量少于等于200哮伟,map task創(chuàng)建的輸出文件小于等于200的干花,最后會將所有的輸出文件合并為一份文件。這樣做的好處楞黄,就是避免了sort排序池凄,節(jié)省了性能開銷,而且還能將多個reduce task的文件合并成一份文件鬼廓,節(jié)省了reduce task拉取數(shù)據(jù)的時候的磁盤IO的開銷肿仑。