1.1虹脯、 分配更多資源
1.1.1、分配哪些資源削饵?
Executor的數(shù)量
每個(gè)Executor所能分配的CPU數(shù)量
每個(gè)Executor所能分配的內(nèi)存量
Driver端分配的內(nèi)存數(shù)量
1.1.2岩瘦、在哪里分配這些資源?
在生產(chǎn)環(huán)境中窿撬,提交spark作業(yè)時(shí)启昧,用的spark-submit shell腳本,里面調(diào)整對(duì)應(yīng)的參數(shù):
/usr/local/spark/bin/spark-submit\
--classcn.spark.sparktest.core.WordCountCluster \
--num-executors3 \ 配置executor的數(shù)量
--driver-memory100m \ 配置driver的內(nèi)存(影響不大)
--executor-memory100m \ 配置每個(gè)executor的內(nèi)存大小
--total-executor-cores3 \ 配置所有executor的cpu core數(shù)量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar\
1.1.3劈伴、調(diào)節(jié)到多大密末,算是最大呢?
常用的資源調(diào)度模式有Spark Standalone和Spark On Yarn。比如說(shuō)你的每臺(tái)機(jī)器能夠給你使用60G內(nèi)存严里,10個(gè)cpu core新啼,20臺(tái)機(jī)器。那么executor的數(shù)量是20刹碾。平均每個(gè)executor所能分配60G內(nèi)存和10個(gè)cpu core燥撞。
1.1.4、為什么多分配了這些資源以后迷帜,性能會(huì)得到提升叨吮?
[if !supportLists]? [endif]增加executor:
如果executor數(shù)量比較少,那么瞬矩,能夠并行執(zhí)行的task數(shù)量就比較少茶鉴,就意味著,我們的Application的并行執(zhí)行的能力就很弱景用。
比如有3個(gè)executor涵叮,每個(gè)executor有2個(gè)cpu core,那么同時(shí)能夠并行執(zhí)行的task伞插,就是6個(gè)割粮。6個(gè)執(zhí)行完以后,再換下一批6個(gè)task媚污。
增加了executor數(shù)量以后舀瓢,那么,就意味著耗美,能夠并行執(zhí)行的task數(shù)量京髓,也就變多了。比如原先是6個(gè)商架,現(xiàn)在可能可以并行執(zhí)行10個(gè)堰怨,甚至20個(gè),100個(gè)蛇摸。那么并行能力就比之前提升了數(shù)倍备图,數(shù)十倍。相應(yīng)的赶袄,性能(執(zhí)行的速度)揽涮,也能提升數(shù)倍~數(shù)十倍。
增加每個(gè)executor的cpu core饿肺,也是增加了執(zhí)行的并行能力蒋困。原本20個(gè)executor,每個(gè)才2個(gè)cpu core唬格。能夠并行執(zhí)行的task數(shù)量家破,就是40個(gè)task颜说。
現(xiàn)在每個(gè)executor的cpu core购岗,增加到了4個(gè)汰聋。能夠并行執(zhí)行的task數(shù)量,就是100個(gè)task喊积。就物理性能來(lái)看烹困,執(zhí)行的速度躲胳,提升了2倍蔫巩。
增加每個(gè)executor的內(nèi)存量。增加了內(nèi)存量以后壳繁,對(duì)性能的提升绎签,有三點(diǎn):
1枯饿、如果需要對(duì)RDD進(jìn)行cache,那么更多的內(nèi)存诡必,就可以緩存更多的數(shù)據(jù)奢方,將更少的數(shù)據(jù)寫(xiě)入磁盤(pán),甚至不寫(xiě)入磁盤(pán)爸舒。減少了磁盤(pán)IO蟋字。
2、對(duì)于shuffle操作扭勉,reduce端鹊奖,會(huì)需要內(nèi)存來(lái)存放拉取的數(shù)據(jù)并進(jìn)行聚合。如果內(nèi)存不夠涂炎,也會(huì)寫(xiě)入磁盤(pán)忠聚。如果給executor分配更多內(nèi)存以后,就有更少的數(shù)據(jù)唱捣,需要寫(xiě)入磁盤(pán)咒林,甚至不需要寫(xiě)入磁盤(pán)。減少了磁盤(pán)IO爷光,提升了性能垫竞。
3、對(duì)于task的執(zhí)行蛀序,可能會(huì)創(chuàng)建很多對(duì)象欢瞪。如果內(nèi)存比較小,可能會(huì)頻繁導(dǎo)致JVM堆內(nèi)存滿了徐裸,然后頻繁GC遣鼓,垃圾回收,minor GC和full GC重贺。(速度很慢)骑祟。內(nèi)存加大以后回懦,帶來(lái)更少的GC,垃圾回收次企,避免了速度變慢怯晕,速度變快了。
1.2缸棵、調(diào)節(jié)并行度
1.2.1舟茶、并行度的概念
就是指的是Spark作業(yè)中,各個(gè)stage的task數(shù)量堵第,代表了Spark作業(yè)的在各個(gè)階段(stage)的并行度吧凉。
1.2.2、如果不調(diào)節(jié)并行度踏志,導(dǎo)致并行度過(guò)低阀捅,會(huì)怎么樣?
比如現(xiàn)在spark-submit腳本里面针余,給我們的spark作業(yè)分配了足夠多的資源饲鄙,比如50個(gè)executor,每個(gè)executor有10G內(nèi)存涵紊,每個(gè)executor有3個(gè)cpu core傍妒。基本已經(jīng)達(dá)到了集群或者yarn隊(duì)列的資源上限摸柄。task沒(méi)有設(shè)置颤练,或者設(shè)置的很少,比如就設(shè)置了100個(gè)task驱负,50個(gè)executor嗦玖,每個(gè)executor有3個(gè)cpu core,也就是說(shuō)跃脊,你的Application任何一個(gè)stage運(yùn)行的時(shí)候宇挫,都有總數(shù)在150個(gè)cpu core,可以并行運(yùn)行酪术。但是你現(xiàn)在器瘪,只有100個(gè)task,平均分配一下绘雁,每個(gè)executor分配到2個(gè)task橡疼,ok,那么同時(shí)在運(yùn)行的task庐舟,只有100個(gè)欣除,每個(gè)executor只會(huì)并行運(yùn)行2個(gè)task。每個(gè)executor剩下的一個(gè)cpu core挪略,就浪費(fèi)掉了历帚。
你的資源雖然分配足夠了滔岳,但是問(wèn)題是,并行度沒(méi)有與資源相匹配挽牢,導(dǎo)致你分配下去的資源都浪費(fèi)掉了谱煤。
合理的并行度的設(shè)置,應(yīng)該是要設(shè)置的足夠大卓研,大到可以完全合理的利用你的集群資源趴俘。比如上面的例子睹簇,總共集群有150個(gè)cpu core奏赘,可以并行運(yùn)行150個(gè)task。那么就應(yīng)該將你的Application的并行度太惠,至少設(shè)置成150磨淌,才能完全有效的利用你的集群資源,讓150個(gè)task凿渊,并行執(zhí)行梁只。而且task增加到150個(gè)以后,即可以同時(shí)并行運(yùn)行埃脏,還可以讓每個(gè)task要處理的數(shù)據(jù)量變少搪锣。比如總共150G的數(shù)據(jù)要處理,如果是100個(gè)task彩掐,每個(gè)task計(jì)算1.5G的數(shù)據(jù)构舟,現(xiàn)在增加到150個(gè)task,可以并行運(yùn)行堵幽,而且每個(gè)task主要處理1G的數(shù)據(jù)就可以狗超。
很簡(jiǎn)單的道理,只要合理設(shè)置并行度朴下,就可以完全充分利用你的集群計(jì)算資源努咐,并且減少每個(gè)task要處理的數(shù)據(jù)量,最終殴胧,就是提升你的整個(gè)Spark作業(yè)的性能和運(yùn)行速度渗稍。
1.2.3、設(shè)置并行度
1)团滥、task數(shù)量竿屹,至少設(shè)置成與Spark
application的總cpu core數(shù)量相同(最理想情況,比如總共150個(gè)cpu core惫撰,分配了150個(gè)task羔沙,一起運(yùn)行,差不多同一時(shí)間運(yùn)行完畢)厨钻。
2)扼雏、官方是推薦坚嗜,task數(shù)量,設(shè)置成spark
application總cpu core數(shù)量的2~3倍诗充,比如150個(gè)cpu core苍蔬,基本要設(shè)置task數(shù)量為300~500。
實(shí)際情況蝴蜓,與理想情況不同的碟绑,有些task會(huì)運(yùn)行的快一點(diǎn),比如50s就完了茎匠,有些task格仲,可能會(huì)慢一點(diǎn),要1分半才運(yùn)行完诵冒,所以如果你的task數(shù)量凯肋,剛好設(shè)置的跟cpu core數(shù)量相同,可能還是會(huì)導(dǎo)致資源的浪費(fèi)汽馋。比如150個(gè)task侮东,10個(gè)先運(yùn)行完了,剩余140個(gè)還在運(yùn)行豹芯,但是這個(gè)時(shí)候悄雅,有10個(gè)cpu core就空閑出來(lái)了,就導(dǎo)致了浪費(fèi)铁蹈。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍宽闲,那么一個(gè)task運(yùn)行完了以后,另一個(gè)task馬上可以補(bǔ)上來(lái)木缝,就盡量讓cpu core不要空閑便锨,同時(shí)也是盡量提升spark作業(yè)運(yùn)行的效率和速度,提升性能我碟。
3)放案、如何設(shè)置一個(gè)Spark
Application的并行度?
spark.default.parallelism
SparkConf conf = newSparkConf()
.set("spark.default.parallelism","500")
1.3矫俺、 重構(gòu)RDD架構(gòu)以及RDD持久化
1.3.1吱殉、RDD架構(gòu)重構(gòu)與優(yōu)化
盡量去復(fù)用RDD,差不多的RDD厘托,可以抽取成為一個(gè)共同的RDD友雳,供后面的RDD計(jì)算時(shí),反復(fù)使用铅匹。
1.3.2押赊、公共RDD一定要實(shí)現(xiàn)持久化
對(duì)于要多次計(jì)算和使用的公共RDD,一定要進(jìn)行持久化包斑。
持久化流礁,就是將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤(pán)中(BlockManager)以后無(wú)論對(duì)這個(gè)RDD做多少次計(jì)算涕俗,那么都是直接取這個(gè)RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤(pán)中神帅,直接提取一份數(shù)據(jù)再姑。
1.3.3、持久化找御,是可以進(jìn)行序列化的
如果正常將數(shù)據(jù)持久化在內(nèi)存中元镀,那么可能會(huì)導(dǎo)致內(nèi)存的占用過(guò)大,這樣的話霎桅,也許栖疑,會(huì)導(dǎo)致OOM內(nèi)存溢出。
當(dāng)純內(nèi)存無(wú)法支撐公共RDD數(shù)據(jù)完全存放的時(shí)候哆档,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲(chǔ)蔽挠。將RDD的每個(gè)partition的數(shù)據(jù)住闯,序列化成一個(gè)大的字節(jié)數(shù)組瓜浸,就一個(gè)對(duì)象。序列化后比原,大大減少內(nèi)存的空間占用插佛。
序列化的方式,唯一的缺點(diǎn)就是量窘,在獲取數(shù)據(jù)的時(shí)候雇寇,需要反序列化。
如果序列化純內(nèi)存方式蚌铜,還是導(dǎo)致OOM內(nèi)存溢出锨侯,就只能考慮磁盤(pán)的方式、內(nèi)存+磁盤(pán)的普通方式(無(wú)序列化)冬殃、內(nèi)存+磁盤(pán)(序列化)囚痴。
1.3.4、為了數(shù)據(jù)的高可靠性审葬,而且內(nèi)存充足深滚,可以使用雙副本機(jī)制,進(jìn)行持久化涣觉。
持久化的雙副本機(jī)制痴荐,持久化后的一個(gè)副本,因?yàn)闄C(jī)器宕機(jī)了官册,副本丟了生兆,就還是得重新計(jì)算一次。持久化的每個(gè)數(shù)據(jù)單元膝宁,存儲(chǔ)一份副本鸦难,放在其他節(jié)點(diǎn)上面栖榨。從而進(jìn)行容錯(cuò)。一個(gè)副本丟了明刷,不用重新計(jì)算婴栽,還可以使用另外一份副本。這種方式辈末,僅僅針對(duì)你的內(nèi)存資源極度充足的情況愚争。
1.4、 廣播變量
1.4.1挤聘、概念及需求
Spark Application(我們自己寫(xiě)的Spark作業(yè))最開(kāi)始在Driver端轰枝,在我們提交任務(wù)的時(shí)候,需要傳遞到各個(gè)Executor的Task上運(yùn)行组去。對(duì)于一些只讀鞍陨、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個(gè)Task上,這樣效率低下从隆。廣播變量允許將變量只廣播(提前廣播)給各個(gè)Executor诚撵。該Executor上的各個(gè)Task再?gòu)乃诠?jié)點(diǎn)的BlockManager獲取變量,如果本地沒(méi)有键闺,那么就從Driver遠(yuǎn)程拉取變量副本寿烟,并保存在本地的BlockManager中。此后這個(gè)executor上的task辛燥,都會(huì)直接使用本地的BlockManager中的副本筛武。而不是從Driver獲取變量,從而提升了效率挎塌。
一個(gè)Executor只需要在第一個(gè)Task啟動(dòng)時(shí)徘六,獲得一份Broadcast數(shù)據(jù),之后的Task都從本節(jié)點(diǎn)的BlockManager中獲取相關(guān)數(shù)據(jù)榴都。/
1.4.2待锈、使用方法
1)調(diào)用SparkContext.broadcast方法創(chuàng)建一個(gè)Broadcast[T]對(duì)象。任何序列化的類型都可以這么實(shí)現(xiàn)缭贡。
2)通過(guò)value屬性訪問(wèn)改對(duì)象的值(Java之中為value()方法)
3)變量只會(huì)被發(fā)送到各個(gè)節(jié)點(diǎn)一次炉擅,應(yīng)作為只讀值處理(修改這個(gè)值不會(huì)影響到別的節(jié)點(diǎn))
1.5、使用Kryo序列化
1.5.1阳惹、概念及需求
默認(rèn)情況下谍失,Spark內(nèi)部是使用Java的序列化機(jī)制,ObjectOutputStream / ObjectInputStream莹汤,對(duì)象輸入輸出流機(jī)制快鱼,來(lái)進(jìn)行序列化。
這種默認(rèn)序列化機(jī)制的好處在于,處理起來(lái)比較方便抹竹,也不需要我們手動(dòng)去做什么事情线罕,只是,你在算子里面使用的變量窃判,必須是實(shí)現(xiàn)Serializable接口的钞楼,可序列化即可。
但是缺點(diǎn)在于袄琳,默認(rèn)的序列化機(jī)制的效率不高询件,序列化的速度比較慢,序列化以后的數(shù)據(jù)唆樊,占用的內(nèi)存空間相對(duì)還是比較大宛琅。
Spark支持使用Kryo序列化機(jī)制。這種序列化機(jī)制逗旁,比默認(rèn)的Java序列化機(jī)制速度要快嘿辟,序列化后的數(shù)據(jù)更小,大概是Java序列化機(jī)制的1/10片效。
所以Kryo序列化優(yōu)化以后红伦,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費(fèi)的內(nèi)存資源大大減少堤舒。
1.5.2色建、Kryo序列化機(jī)制啟用以后生效的幾個(gè)地方
1)、算子函數(shù)中使用到的外部變量舌缤,使用Kryo以后:優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅埽梢詢?yōu)化集群中內(nèi)存的占用和消耗
2)某残、持久化RDD学搜,優(yōu)化內(nèi)存的占用和消耗瞄沙。持久化RDD占用的內(nèi)存越少,task執(zhí)行的時(shí)候,創(chuàng)建的對(duì)象恭应,就不至于頻繁的占滿內(nèi)存,頻繁發(fā)生GC鸦采。
3)荸百、shuffle:可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?/p>
1.5.3、使用方法
第一步剩拢,在SparkConf中設(shè)置一個(gè)屬性线得,spark.serializer,org.apache.spark.serializer.KryoSerializer類徐伐。
第二步贯钩,注冊(cè)你使用的需要通過(guò)Kryo序列化的一些自定義類,SparkConf.registerKryoClasses()。
項(xiàng)目中的使用:
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(newClass[]{CategorySortKey.class})
[if !supportLists]1.6角雷、 [endif]使用fastutil優(yōu)化數(shù)據(jù)格式
1.6.1祸穷、fastutil介紹
fastutil是擴(kuò)展了Java標(biāo)準(zhǔn)集合框架(Map、List勺三、Set雷滚。HashMap、ArrayList吗坚、HashSet)的類庫(kù)揭措,提供了特殊類型的map、set刻蚯、list和queue绊含。
fastutil能夠提供更小的內(nèi)存占用,更快的存取速度炊汹。我們使用fastutil提供的集合類躬充,來(lái)替代自己平時(shí)使用的JDK的原生的Map、List讨便、Set充甚,好處在于fastutil集合類可以減小內(nèi)存的占用,并且在進(jìn)行集合的遍歷霸褒、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時(shí)候伴找,提供更快的存取速度。
fastutil也提供了64位的array废菱、set和list技矮,以及高性能快速的,以及實(shí)用的IO類殊轴,來(lái)處理二進(jìn)制和文本類型的文件衰倦。
fastutil最新版本要求Java 7以及以上版本。
fastutil的每一種集合類型旁理,都實(shí)現(xiàn)了對(duì)應(yīng)的Java中的標(biāo)準(zhǔn)接口(比如fastutil的map樊零,實(shí)現(xiàn)了Java的Map接口),因此可以直接放入已有系統(tǒng)的任何代碼中孽文。
fastutil還提供了一些JDK標(biāo)準(zhǔn)類庫(kù)中沒(méi)有的額外功能(比如雙向迭代器)驻襟。
fastutil除了對(duì)象和原始類型為元素的集合,fastutil也提供引用類型的支持芋哭,但是對(duì)引用類型是使用等于號(hào)(=)進(jìn)行比較的沉衣,而不是equals()方法。
fastutil盡量提供了在任何場(chǎng)景下都是速度最快的集合類庫(kù)楷掉。
1.6.2厢蒜、Spark中應(yīng)用fastutil的場(chǎng)景
1)霞势、如果算子函數(shù)使用了外部變量。第一斑鸦,你可以使用Broadcast廣播變量?jī)?yōu)化愕贡。第二,可以使用Kryo序列化類庫(kù)巷屿,提升序列化性能和效率固以。第三,如果外部變量是某種比較大的集合嘱巾,那么可以考慮使用fastutil改寫(xiě)外部變量憨琳,首先從源頭上就減少內(nèi)存的占用,通過(guò)廣播變量進(jìn)一步減少內(nèi)存占用旬昭,再通過(guò)Kryo序列化類庫(kù)進(jìn)一步減少內(nèi)存占用篙螟。
2)、在你的算子函數(shù)里问拘,也就是task要執(zhí)行的計(jì)算邏輯里面遍略,如果有邏輯中,出現(xiàn)骤坐,要?jiǎng)?chuàng)建比較大的Map绪杏、List等集合,可能會(huì)占用較大的內(nèi)存空間纽绍,而且可能涉及到消耗性能的遍歷蕾久、存取等集合操作,此時(shí)拌夏,可以考慮將這些集合類型使用fastutil類庫(kù)重寫(xiě)僧著,使用了fastutil集合類以后,就可以在一定程度上辖佣,減少task創(chuàng)建出來(lái)的集合類型的內(nèi)存占用霹抛。避免executor內(nèi)存頻繁占滿,頻繁喚起GC卷谈,導(dǎo)致性能下降。
1.6.3霞篡、關(guān)于fastutil調(diào)優(yōu)的說(shuō)明
fastutil其實(shí)沒(méi)有你想象中的那么強(qiáng)大世蔗,也不會(huì)跟官網(wǎng)上說(shuō)的效果那么一鳴驚人。廣播變量朗兵、Kryo序列化類庫(kù)污淋、fastutil,都是之前所說(shuō)的余掖,對(duì)于性能來(lái)說(shuō)寸爆,類似于一種調(diào)味品,烤雞,本來(lái)就很好吃了赁豆,然后加了一點(diǎn)特質(zhì)的孜然麻辣粉調(diào)料仅醇,就更加好吃了一點(diǎn)。分配資源魔种、并行度析二、RDD架構(gòu)與持久化,這三個(gè)就是烤雞节预。broadcast叶摄、kryo、fastutil安拟,類似于調(diào)料蛤吓。
比如說(shuō),你的spark作業(yè)糠赦,經(jīng)過(guò)之前一些調(diào)優(yōu)以后会傲,大概30分鐘運(yùn)行完,現(xiàn)在加上broadcast愉棱、kryo唆铐、fastutil,也許就是優(yōu)化到29分鐘運(yùn)行完奔滑、或者更好一點(diǎn)艾岂,也許就是28分鐘、25分鐘朋其。
shuffle調(diào)優(yōu)王浴,15分鐘。groupByKey用reduceByKey改寫(xiě)梅猿,執(zhí)行本地聚合氓辣,也許10分鐘。跟公司申請(qǐng)更多的資源袱蚓,比如資源更大的YARN隊(duì)列钞啸,1分鐘。
1.6.4喇潘、fastutil的使用
在pom.xml中引用fastutil的包
fastutil
fastutil
5.0.9
速度比較慢体斩,可能是從國(guó)外的網(wǎng)去拉取jar包,可能要等待5分鐘颖低,甚至幾十分鐘絮吵,不等
List 相當(dāng)于IntList
基本都是類似于IntList的格式,前綴就是集合的元素類型忱屑。特殊的就是Map蹬敲,Int2IntMap暇昂,代表了key-value映射的元素類型。除此之外伴嗡,還支持object急波、reference。
[if !supportLists]1.7闹究、 [endif]調(diào)節(jié)數(shù)據(jù)本地化等待時(shí)長(zhǎng)
1.7.1幔崖、task的locality有五種
1)、PROCESS_LOCAL:進(jìn)程本地化渣淤,代碼和數(shù)據(jù)在同一個(gè)進(jìn)程中赏寇,也就是在同一個(gè)executor中。計(jì)算數(shù)據(jù)的task由executor執(zhí)行价认,數(shù)據(jù)在executor的BlockManager中嗅定,性能最好。
2)用踩、NODE_LOCAL:節(jié)點(diǎn)本地化渠退,代碼和數(shù)據(jù)在同一個(gè)節(jié)點(diǎn)中。比如說(shuō)脐彩,數(shù)據(jù)作為一個(gè)HDFS block塊碎乃,就在節(jié)點(diǎn)上,而task在節(jié)點(diǎn)上某個(gè)executor中運(yùn)行惠奸,或者是梅誓,數(shù)據(jù)和task在一個(gè)節(jié)點(diǎn)上的不同executor中,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸佛南。
3)梗掰、NO_PREF:對(duì)于task來(lái)說(shuō),數(shù)據(jù)從哪里獲取都一樣嗅回,沒(méi)有好壞之分及穗。
4)、RACK_LOCAL:機(jī)架本地化绵载,數(shù)據(jù)和task在一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上埂陆,數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)在節(jié)點(diǎn)之間進(jìn)行傳輸。
5)娃豹、ANY:數(shù)據(jù)和task可能在集群中的任何地方猜惋,而且不在一個(gè)機(jī)架中,性能最差培愁。
1.7.2、Spark的任務(wù)調(diào)度
Spark在Driver上缓窜,對(duì)Application的每一個(gè)stage的task進(jìn)行分配之前都會(huì)計(jì)算出每個(gè)task要計(jì)算的是哪個(gè)分片數(shù)據(jù)定续。Spark的task分配算法優(yōu)先會(huì)希望每個(gè)task正好分配到它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn)谍咆,這樣的話,就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù)私股。
但是摹察,有時(shí)可能task沒(méi)有機(jī)會(huì)分配到它的數(shù)據(jù)所在的節(jié)點(diǎn)。為什么呢倡鲸,可能那個(gè)節(jié)點(diǎn)的計(jì)算資源和計(jì)算能力都滿了供嚎。所以這種時(shí)候, Spark會(huì)等待一段時(shí)間峭状,默認(rèn)情況下是3s(不是絕對(duì)的克滴,還有很多種情況,對(duì)不同的本地化級(jí)別优床,都會(huì)去等待)劝赔,到最后,實(shí)在是等待不了了胆敞,就會(huì)選擇一個(gè)比較差的本地化級(jí)別着帽。比如說(shuō),將task分配到靠它要計(jì)算的數(shù)據(jù)所在節(jié)點(diǎn)比較近的一個(gè)節(jié)點(diǎn)移层,然后進(jìn)行計(jì)算仍翰。
但是對(duì)于第二種情況,通常來(lái)說(shuō)观话,肯定是要發(fā)生數(shù)據(jù)傳輸予借,task會(huì)通過(guò)其所在節(jié)點(diǎn)的BlockManager來(lái)獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)自己本地沒(méi)有數(shù)據(jù)匪燕,會(huì)通過(guò)一個(gè)getRemote()方法蕾羊,通過(guò)TransferService(網(wǎng)絡(luò)數(shù)據(jù)傳輸組件)從數(shù)據(jù)所在節(jié)點(diǎn)的BlockManager中,獲取數(shù)據(jù)帽驯,通過(guò)網(wǎng)絡(luò)傳輸回task所在節(jié)點(diǎn)龟再。
對(duì)于我們來(lái)說(shuō),當(dāng)然不希望是類似于第二種情況的了尼变。最好的利凑,當(dāng)然是task和數(shù)據(jù)在一個(gè)節(jié)點(diǎn)上,直接從本地executor的BlockManager中獲取數(shù)據(jù)嫌术,純內(nèi)存哀澈,或者帶一點(diǎn)磁盤(pán)IO。如果要通過(guò)網(wǎng)絡(luò)傳輸數(shù)據(jù)的話度气,性能肯定會(huì)下降的割按。大量網(wǎng)絡(luò)傳輸,以及磁盤(pán)IO磷籍,都是性能的殺手适荣。
1.7.3现柠、我們什么時(shí)候要調(diào)節(jié)這個(gè)參數(shù)
觀察spark作業(yè)的運(yùn)行日志。推薦大家在測(cè)試的時(shí)候弛矛,先用client模式在本地就直接可以看到比較全的日志够吩。日志里面會(huì)顯示:starting task…,PROCESS LOCAL丈氓、NODE LOCAL
觀察大部分task的數(shù)據(jù)本地化級(jí)別周循,如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了万俗。
如果是發(fā)現(xiàn)湾笛,好多的級(jí)別都是NODE_LOCAL、ANY该编,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時(shí)長(zhǎng)迄本。要反復(fù)調(diào)節(jié),每次調(diào)節(jié)完以后再運(yùn)行并觀察日志课竣,看看大部分的task的本地化級(jí)別有沒(méi)有提升嘉赎,看看整個(gè)spark作業(yè)的運(yùn)行時(shí)間有沒(méi)有縮短。注意于樟,不要本末倒置公条,不要本地化級(jí)別是提升了,但是因?yàn)榇罅康牡却龝r(shí)長(zhǎng)迂曲,spark作業(yè)的運(yùn)行時(shí)間反而增加了靶橱,那還是不要調(diào)節(jié)了。
1.7.4路捧、怎么調(diào)節(jié)
spark.locality.wait关霸,默認(rèn)是3s。6s杰扫,10s
默認(rèn)情況下队寇,下面3個(gè)的等待時(shí)長(zhǎng),都是跟上面那個(gè)是一樣的章姓,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
newSparkConf().set("spark.locality.wait", "10")
2佳遣、JVM調(diào)優(yōu)
堆內(nèi)存存放我們創(chuàng)建的一些對(duì)象,有老年代和年輕代凡伊。理想情況下零渐,老年代都是放一些生命周期很長(zhǎng)的對(duì)象,數(shù)量應(yīng)該是很少的系忙,比如數(shù)據(jù)庫(kù)連接池诵盼。我們?cè)趕park task執(zhí)行算子函數(shù)(我們自己寫(xiě)的),可能會(huì)創(chuàng)建很多對(duì)象,這些對(duì)象都是要放入JVM年輕代中的拦耐。
每一次放對(duì)象的時(shí)候耕腾,都是放入eden區(qū)域,和其中一個(gè)survivor區(qū)域杀糯。另外一個(gè)survivor區(qū)域是空閑的。
當(dāng)eden區(qū)域和一個(gè)survivor區(qū)域放滿了以后(spark運(yùn)行過(guò)程中苍苞,產(chǎn)生的對(duì)象實(shí)在太多了)固翰,就會(huì)觸發(fā)minor gc,小型垃圾回收羹呵。把不再使用的對(duì)象骂际,從內(nèi)存中清空,給后面新創(chuàng)建的對(duì)象騰出來(lái)點(diǎn)兒地方冈欢。
清理掉了不再使用的對(duì)象之后歉铝,那么也會(huì)將存活下來(lái)的對(duì)象(還要繼續(xù)使用的),放入之前空閑的那一個(gè)survivor區(qū)域中凑耻。這里可能會(huì)出現(xiàn)一個(gè)問(wèn)題。默認(rèn)eden、survior1和survivor2的內(nèi)存占比是8:1:1克胳。問(wèn)題是不撑,如果存活下來(lái)的對(duì)象是1.5,一個(gè)survivor區(qū)域放不下邻吭。此時(shí)就可能通過(guò)JVM的擔(dān)保機(jī)制(不同JVM版本可能對(duì)應(yīng)的行為)餐弱,將多余的對(duì)象,直接放入老年代了囱晴。
如果你的JVM內(nèi)存不夠大的話膏蚓,可能導(dǎo)致頻繁的年輕代內(nèi)存滿溢,頻繁的進(jìn)行minor gc畸写。頻繁的minor gc會(huì)導(dǎo)致短時(shí)間內(nèi)驮瞧,有些存活的對(duì)象,多次垃圾回收都沒(méi)有回收掉艺糜。會(huì)導(dǎo)致這種短生命周期(其實(shí)不一定是要長(zhǎng)期使用的)對(duì)象剧董,年齡過(guò)大,垃圾回收次數(shù)太多還沒(méi)有回收到破停,跑到老年代翅楼。
老年代中,可能會(huì)因?yàn)閮?nèi)存不足真慢,囤積一大堆毅臊,短生命周期的,本來(lái)應(yīng)該在年輕代中的黑界,可能馬上就要被回收掉的對(duì)象管嬉。此時(shí)皂林,可能導(dǎo)致老年代頻繁滿溢。頻繁進(jìn)行full gc(全局/全面垃圾回收)蚯撩。full gc就會(huì)去回收老年代中的對(duì)象础倍。full gc由于這個(gè)算法的設(shè)計(jì),是針對(duì)的是胎挎,老年代中的對(duì)象數(shù)量很少沟启,滿溢進(jìn)行full gc的頻率應(yīng)該很少,因此采取了不太復(fù)雜犹菇,但是耗費(fèi)性能和時(shí)間的垃圾回收算法德迹。full gc很慢。
full gc / minor gc揭芍,無(wú)論是快胳搞,還是慢,都會(huì)導(dǎo)致jvm的工作線程停止工作称杨,stop the world肌毅。簡(jiǎn)而言之,就是說(shuō)列另,gc的時(shí)候芽腾,spark停止工作了。等著垃圾回收結(jié)束页衙。
內(nèi)存不充足的時(shí)候摊滔,出現(xiàn)的問(wèn)題:
1、頻繁minor gc店乐,也會(huì)導(dǎo)致頻繁spark停止工作
2艰躺、老年代囤積大量活躍對(duì)象(短生命周期的對(duì)象),導(dǎo)致頻繁full gc眨八,full gc時(shí)間很長(zhǎng)腺兴,短則數(shù)十秒,長(zhǎng)則數(shù)分鐘廉侧,甚至數(shù)小時(shí)页响。可能導(dǎo)致spark長(zhǎng)時(shí)間停止工作段誊。
3闰蚕、嚴(yán)重影響咱們的spark的性能和運(yùn)行的速度。
2.1连舍、降低cache操作的內(nèi)存占比
spark中没陡,堆內(nèi)存又被劃分成了兩塊,一塊是專門用來(lái)給RDD的cache、persist操作進(jìn)行RDD數(shù)據(jù)緩存用的盼玄。另外一塊用來(lái)給spark算子函數(shù)的運(yùn)行使用的贴彼,存放函數(shù)中自己創(chuàng)建的對(duì)象。
默認(rèn)情況下埃儿,給RDD cache操作的內(nèi)存占比器仗,是0.6,60%的內(nèi)存都給了cache操作了蝌箍。但是問(wèn)題是青灼,如果某些情況下cache不是那么的緊張,問(wèn)題在于task算子函數(shù)中創(chuàng)建的對(duì)象過(guò)多妓盲,然后內(nèi)存又不太大,導(dǎo)致了頻繁的minor gc专普,甚至頻繁full gc悯衬,導(dǎo)致spark頻繁的停止工作。性能影響會(huì)很大檀夹。
針對(duì)上述這種情況筋粗,可以在任務(wù)運(yùn)行界面,去查看你的spark作業(yè)的運(yùn)行統(tǒng)計(jì)炸渡,可以看到每個(gè)stage的運(yùn)行情況娜亿,包括每個(gè)task的運(yùn)行時(shí)間、gc時(shí)間等等蚌堵。如果發(fā)現(xiàn)gc太頻繁买决,時(shí)間太長(zhǎng)。此時(shí)就可以適當(dāng)調(diào)價(jià)這個(gè)比例吼畏。
降低cache操作的內(nèi)存占比督赤,大不了用persist操作,選擇將一部分緩存的RDD數(shù)據(jù)寫(xiě)入磁盤(pán)泻蚊,或者序列化方式躲舌,配合Kryo序列化類,減少RDD緩存的內(nèi)存占用性雄。降低cache操作內(nèi)存占比没卸,對(duì)應(yīng)的,算子函數(shù)的內(nèi)存占比就提升了秒旋。這個(gè)時(shí)候约计,可能就可以減少minor gc的頻率,同時(shí)減少full gc的頻率滩褥。對(duì)性能的提升是有一定的幫助的病蛉。
一句話,讓task執(zhí)行算子函數(shù)時(shí),有更多的內(nèi)存可以使用铺然。
spark.storage.memoryFraction俗孝,0.6 ->0.5 -> 0.4 -> 0.2
2.2、調(diào)節(jié)executor堆外內(nèi)存與連接等待時(shí)長(zhǎng)
調(diào)節(jié)executor堆外內(nèi)存
有時(shí)候魄健,如果你的spark作業(yè)處理的數(shù)據(jù)量特別大赋铝,幾億數(shù)據(jù)量。然后spark作業(yè)一運(yùn)行沽瘦,時(shí)不時(shí)的報(bào)錯(cuò)革骨,shuffle file cannot find,executor析恋、task lost良哲,out of
memory(內(nèi)存溢出)。
可能是executor的堆外內(nèi)存不太夠用助隧,導(dǎo)致executor在運(yùn)行的過(guò)程中筑凫,可能會(huì)內(nèi)存溢出,可能導(dǎo)致后續(xù)的stage的task在運(yùn)行的時(shí)候并村,要從一些executor中去拉取shuffle map output文件巍实,但是executor可能已經(jīng)掛掉了,關(guān)聯(lián)的block
manager也沒(méi)有了哩牍。所以會(huì)報(bào)shuffle output file not found棚潦,resubmitting
task,executor lost膝昆。spark作業(yè)徹底崩潰丸边。
上述情況下,就可以去考慮調(diào)節(jié)一下executor的堆外內(nèi)存外潜。也許就可以避免報(bào)錯(cuò)原环。此外,有時(shí)堆外內(nèi)存調(diào)節(jié)的比較大的時(shí)候处窥,對(duì)于性能來(lái)說(shuō)嘱吗,也會(huì)帶來(lái)一定的提升。
可以調(diào)節(jié)堆外內(nèi)存的上限:
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit腳本里面滔驾,去用--conf的方式谒麦,去添加配置。用new
SparkConf().set()這種方式去設(shè)置是沒(méi)有用的哆致!一定要在spark-submit腳本中去設(shè)置绕德。
spark.yarn.executor.memoryOverhead(看名字,顧名思義摊阀,針對(duì)的是基于yarn的提交模式)
默認(rèn)情況下耻蛇,這個(gè)堆外內(nèi)存上限大概是300M踪蹬。通常在項(xiàng)目中,真正處理大數(shù)據(jù)的時(shí)候臣咖,這里都會(huì)出現(xiàn)問(wèn)題跃捣,導(dǎo)致spark作業(yè)反復(fù)崩潰,無(wú)法運(yùn)行夺蛇。此時(shí)就會(huì)去調(diào)節(jié)這個(gè)參數(shù)疚漆,到至少1G(1024M),甚至說(shuō)2G刁赦、4G娶聘。
通常這個(gè)參數(shù)調(diào)節(jié)上去以后,就會(huì)避免掉某些JVM OOM的異常問(wèn)題甚脉,同時(shí)呢丸升,會(huì)讓整體spark作業(yè)的性能,得到較大的提升牺氨。
調(diào)節(jié)連接等待時(shí)長(zhǎng)
我們知道发钝,executor會(huì)優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù)。如果本地block manager沒(méi)有的話波闹,那么會(huì)通過(guò)TransferService,去遠(yuǎn)程連接其他節(jié)點(diǎn)上executor的block manager去獲取涛碑。
而此時(shí)上面executor去遠(yuǎn)程連接的那個(gè)executor精堕,因?yàn)閠ask創(chuàng)建的對(duì)象特別大,特別多蒲障,
頻繁的讓JVM堆內(nèi)存滿溢歹篓,正在進(jìn)行垃圾回收。而處于垃圾回收過(guò)程中揉阎,所有的工作線程全部停止庄撮,相當(dāng)于只要一旦進(jìn)行垃圾回收,spark / executor停止工作毙籽,無(wú)法提供響應(yīng)洞斯。
此時(shí)呢,就會(huì)沒(méi)有響應(yīng)坑赡,無(wú)法建立網(wǎng)絡(luò)連接烙如,會(huì)卡住。spark默認(rèn)的網(wǎng)絡(luò)連接的超時(shí)時(shí)長(zhǎng)毅否,是60s亚铁,如果卡住60s都無(wú)法建立連接的話,那么就宣告失敗了螟加。
報(bào)錯(cuò)幾次徘溢,幾次都拉取不到數(shù)據(jù)的話吞琐,可能會(huì)導(dǎo)致spark作業(yè)的崩潰。也可能會(huì)導(dǎo)致DAGScheduler然爆,反復(fù)提交幾次stage站粟。TaskScheduler反復(fù)提交幾次task。大大延長(zhǎng)我們的spark作業(yè)的運(yùn)行時(shí)間施蜜。
可以考慮調(diào)節(jié)連接的超時(shí)時(shí)長(zhǎng):
--conf spark.core.connection.ack.wait.timeout=300
spark-submit腳本卒蘸,切記,不是在new
SparkConf().set()這種方式來(lái)設(shè)置的翻默。
spark.core.connection.ack.wait.timeout(spark core缸沃,connection,連接修械,ack趾牧,wait timeout,建立不上連接的時(shí)候肯污,超時(shí)等待時(shí)長(zhǎng))
調(diào)節(jié)這個(gè)值比較大以后翘单,通常來(lái)說(shuō),可以避免部分的偶爾出現(xiàn)的某某文件拉取失敗蹦渣,某某文件lost掉了哄芜。
3、Shuffle調(diào)優(yōu)
原理概述:
什么樣的情況下柬唯,會(huì)發(fā)生shuffle认臊?
在spark中,主要是以下幾個(gè)算子:groupByKey锄奢、reduceByKey失晴、countByKey、join拘央,等等涂屁。
什么是shuffle?
groupByKey灰伟,要把分布在集群各個(gè)節(jié)點(diǎn)上的數(shù)據(jù)中的同一個(gè)key拆又,對(duì)應(yīng)的values,都要集中到一塊兒袱箱,集中到集群中同一個(gè)節(jié)點(diǎn)上遏乔,更嚴(yán)密一點(diǎn)說(shuō),就是集中到一個(gè)節(jié)點(diǎn)的一個(gè)executor的一個(gè)task中发笔。
然后呢盟萨,集中一個(gè)key對(duì)應(yīng)的values之后,才能交給我們來(lái)進(jìn)行處理了讨,>捻激。reduceByKey制轰,算子函數(shù)去對(duì)values集合進(jìn)行reduce操作,最后變成一個(gè)value胞谭。countByKey需要在一個(gè)task中垃杖,獲取到一個(gè)key對(duì)應(yīng)的所有的value,然后進(jìn)行計(jì)數(shù)丈屹,統(tǒng)計(jì)一共有多少個(gè)value调俘。join,RDD旺垒,RDD
value>彩库,只要是兩個(gè)RDD中,key相同對(duì)應(yīng)的2個(gè)value先蒋,都能到一個(gè)節(jié)點(diǎn)的executor的task中骇钦,給我們進(jìn)行處理。
shuffle竞漾,一定是分為兩個(gè)stage來(lái)完成的眯搭。因?yàn)檫@其實(shí)是個(gè)逆向的過(guò)程,不是stage決定shuffle业岁,是shuffle決定stage鳞仙。
reduceByKey(+),在某個(gè)action觸發(fā)job的時(shí)候笔时,DAGScheduler繁扎,會(huì)負(fù)責(zé)劃分job為多個(gè)stage。劃分的依據(jù)糊闽,就是,如果發(fā)現(xiàn)有會(huì)觸發(fā)shuffle操作的算子爹梁,比如reduceByKey右犹,就將這個(gè)操作的前半部分,以及之前所有的RDD和transformation操作姚垃,劃分為一個(gè)stage念链。shuffle操作的后半部分,以及后面的积糯,直到action為止的RDD和transformation操作掂墓,劃分為另外一個(gè)stage。
3.1看成、合并map端輸出文件
3.1.1君编、如果不合并map端輸出文件的話,會(huì)怎么樣川慌?
舉例實(shí)際生產(chǎn)環(huán)境的條件:
100個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)一個(gè)executor):100個(gè)executor
每個(gè)executor:2個(gè)cpu core
總共1000個(gè)task:每個(gè)executor平均10個(gè)task
每個(gè)節(jié)點(diǎn)吃嘿,10個(gè)task祠乃,每個(gè)節(jié)點(diǎn)會(huì)輸出多少份map端文件?10 * 1000=1萬(wàn)個(gè)文件
總共有多少份map端輸出文件兑燥?100 * 10000 = 100萬(wàn)亮瓷。
第一個(gè)stage,每個(gè)task降瞳,都會(huì)給第二個(gè)stage的每個(gè)task創(chuàng)建一份map端的輸出文件
第二個(gè)stage嘱支,每個(gè)task,會(huì)到各個(gè)節(jié)點(diǎn)上面去挣饥,拉取第一個(gè)stage每個(gè)task輸出的除师,屬于自己的那一份文件。
shuffle中的寫(xiě)磁盤(pán)的操作亮靴,基本上就是shuffle中性能消耗最為嚴(yán)重的部分馍盟。
通過(guò)上面的分析,一個(gè)普通的生產(chǎn)環(huán)境的spark job的一個(gè)shuffle環(huán)節(jié)茧吊,會(huì)寫(xiě)入磁盤(pán)100萬(wàn)個(gè)文件贞岭。
磁盤(pán)IO對(duì)性能和spark作業(yè)執(zhí)行速度的影響,是極其驚人和嚇人的搓侄。
基本上瞄桨,spark作業(yè)的性能嘉涌,都消耗在shuffle中了坷牛,雖然不只是shuffle的map端輸出文件這一個(gè)部分乳讥,但是這里也是非常大的一個(gè)性能消耗點(diǎn)。
3.1.2唉工、開(kāi)啟shuffle map端輸出文件合并的機(jī)制
new SparkConf().set("spark.shuffle.consolidateFiles","true")
默認(rèn)情況下汹忠,是不開(kāi)啟的宽菜,就是會(huì)發(fā)生如上所述的大量map端輸出文件的操作继谚,嚴(yán)重影響性能僧界。
3.1.3、合并map端輸出文件,對(duì)咱們的spark的性能有哪些方面的影響呢?
1懊直、map task寫(xiě)入磁盤(pán)文件的IO,減少:100萬(wàn)文件-> 20萬(wàn)文件
2、第二個(gè)stage饶火,原本要拉取第一個(gè)stage的task數(shù)量份文件抖僵,1000個(gè)task,第二個(gè)stage的每個(gè)task,都要拉取1000份文件,走網(wǎng)絡(luò)傳輸嚷闭。合并以后,100個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)2個(gè)cpu core,第二個(gè)stage的每個(gè)task,主要拉取100 * 2 = 200個(gè)文件即可励稳。此時(shí)網(wǎng)絡(luò)傳輸?shù)男阅芟囊泊蟠鬁p少琅绅。
分享一下,實(shí)際在生產(chǎn)環(huán)境中,使用了spark.shuffle.consolidateFiles機(jī)制以后顺呕,實(shí)際的性能調(diào)優(yōu)的效果:對(duì)于上述的這種生產(chǎn)環(huán)境的配置图焰,性能的提升卧抗,還是相當(dāng)?shù)目捎^的。spark作業(yè),5個(gè)小時(shí)-> 2~3個(gè)小時(shí)。
大家不要小看這個(gè)map端輸出文件合并機(jī)制。實(shí)際上,在數(shù)據(jù)量比較大,你自己本身做了前面的性能調(diào)優(yōu),executor上去->cpu core上去->并行度(task數(shù)量)上去,shuffle沒(méi)調(diào)優(yōu),shuffle就很糟糕了断盛。大量的map端輸出文件的產(chǎn)生屑宠,對(duì)性能有比較惡劣的影響。
這個(gè)時(shí)候,去開(kāi)啟這個(gè)機(jī)制,可以很有效的提升性能。
3.2、調(diào)節(jié)map端內(nèi)存緩沖與reduce端內(nèi)存占比
3.2.1、默認(rèn)情況下可能出現(xiàn)的問(wèn)題
默認(rèn)情況下,shuffle的map task,輸出到磁盤(pán)文件的時(shí)候,統(tǒng)一都會(huì)先寫(xiě)入每個(gè)task自己關(guān)聯(lián)的一個(gè)內(nèi)存緩沖區(qū)。
這個(gè)緩沖區(qū)大小,默認(rèn)是32kb。
每一次,當(dāng)內(nèi)存緩沖區(qū)滿溢之后苦囱,才會(huì)進(jìn)行spill溢寫(xiě)操作猛拴,溢寫(xiě)到磁盤(pán)文件中去。
reduce端task,在拉取到數(shù)據(jù)之后,會(huì)用hashmap的數(shù)據(jù)格式,來(lái)對(duì)各個(gè)key對(duì)應(yīng)的values進(jìn)行匯聚。
針對(duì)每個(gè)key對(duì)應(yīng)的values,執(zhí)行我們自定義的聚合函數(shù)的代碼,比如_ + _(把所有values累加起來(lái))。
reduce task,在進(jìn)行匯聚隶糕、聚合等操作的時(shí)候蜒什,實(shí)際上铃拇,使用的就是自己對(duì)應(yīng)的executor的內(nèi)存显晶,executor(jvm進(jìn)程,堆),默認(rèn)executor內(nèi)存中劃分給reduce task進(jìn)行聚合的比例是0.2。
問(wèn)題來(lái)了,因?yàn)楸壤?.2,所以航闺,理論上懈叹,很有可能會(huì)出現(xiàn)畏吓,拉取過(guò)來(lái)的數(shù)據(jù)很多宏悦,那么在內(nèi)存中,放不下。這個(gè)時(shí)候,默認(rèn)的行為就是將在內(nèi)存放不下的數(shù)據(jù)都spill(溢寫(xiě))到磁盤(pán)文件中去逃片。
在數(shù)據(jù)量比較大的情況下,可能頻繁地發(fā)生reduce端的磁盤(pán)文件的讀寫(xiě)。
3.2.2、調(diào)優(yōu)方式
調(diào)節(jié)map task內(nèi)存緩沖:spark.shuffle.file.buffer,默認(rèn)32k(spark 1.3.x不是這個(gè)參數(shù),后面還有一個(gè)后綴,kb厚掷。spark 1.5.x以后勤哗,變了冬竟,就是現(xiàn)在這個(gè)參數(shù))
調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2
3.2.3民逼、在實(shí)際生產(chǎn)環(huán)境中泵殴,我們?cè)谑裁磿r(shí)候來(lái)調(diào)節(jié)兩個(gè)參數(shù)拼苍?
看Spark UI笑诅,如果你的公司是決定采用standalone模式,那么狠簡(jiǎn)單疮鲫,你的spark跑起來(lái)吆你,會(huì)顯示一個(gè)Spark UI的地址,4040的端口俊犯。進(jìn)去觀察每個(gè)stage的詳情妇多,有哪些executor,有哪些task燕侠,每個(gè)task的shuffle write和shuffle read的量砌梆,shuffle的磁盤(pán)和內(nèi)存讀寫(xiě)的數(shù)據(jù)量默责。如果是用的yarn模式來(lái)提交,從yarn的界面進(jìn)去咸包,點(diǎn)擊對(duì)應(yīng)的application桃序,進(jìn)入Spark UI,查看詳情烂瘫。
如果發(fā)現(xiàn)shuffle 磁盤(pán)的write和read媒熊,很大。這個(gè)時(shí)候坟比,就意味著最好調(diào)節(jié)一些shuffle的參數(shù)芦鳍。首先當(dāng)然是考慮開(kāi)啟map端輸出文件合并機(jī)制。其次調(diào)節(jié)上面說(shuō)的那兩個(gè)參數(shù)葛账。調(diào)節(jié)的時(shí)候的原則:spark.shuffle.file.buffer每次擴(kuò)大一倍柠衅,然后看看效果,64籍琳,128菲宴。spark.shuffle.memoryFraction,每次提高0.1趋急,看看效果喝峦。
不能調(diào)節(jié)的太大,太大了以后過(guò)猶不及呜达,因?yàn)閮?nèi)存資源是有限的谣蠢,你這里調(diào)節(jié)的太大了,其他環(huán)節(jié)的內(nèi)存使用就會(huì)有問(wèn)題了查近。
3.2.4眉踱、調(diào)節(jié)以后的效果
map task內(nèi)存緩沖變大了,減少spill到磁盤(pán)文件的次數(shù)霜威。reduce端聚合內(nèi)存變大了勋锤,減少spill到磁盤(pán)的次數(shù),而且減少了后面聚合讀取磁盤(pán)文件的數(shù)量侥祭。
3.3叁执、HashShuffleManager與SortShuffleManager
3.3.1、shuffle調(diào)優(yōu)概述
大多數(shù)Spark作業(yè)的性能主要就是消耗在了shuffle環(huán)節(jié)矮冬,因?yàn)樵摥h(huán)節(jié)包含了大量的磁盤(pán)IO谈宛、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮魈ナ稹R虼诉郝迹绻屪鳂I(yè)的性能更上一層樓,就有必要對(duì)shuffle過(guò)程進(jìn)行調(diào)優(yōu)琼牧。但是也必須提醒大家的是恢筝,影響一個(gè)Spark作業(yè)性能的因素哀卫,主要還是代碼開(kāi)發(fā)、資源參數(shù)以及數(shù)據(jù)傾斜撬槽,shuffle調(diào)優(yōu)只能在整個(gè)Spark的性能調(diào)優(yōu)中占到一小部分而已此改。因此大家務(wù)必把握住調(diào)優(yōu)的基本原則,千萬(wàn)不要舍本逐末侄柔。下面我們就給大家詳細(xì)講解shuffle的原理共啃,以及相關(guān)參數(shù)的說(shuō)明,同時(shí)給出各個(gè)參數(shù)的調(diào)優(yōu)建議暂题。
3.3.2移剪、ShuffleManager發(fā)展概述
在Spark的源碼中,負(fù)責(zé)shuffle過(guò)程的執(zhí)行薪者、計(jì)算和處理的組件主要就是ShuffleManager纵苛,也即shuffle管理器。
在Spark 1.2以前言津,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager攻人。該ShuffleManager而HashShuffleManager有著一個(gè)非常嚴(yán)重的弊端,就是會(huì)產(chǎn)生大量的中間磁盤(pán)文件纺念,進(jìn)而由大量的磁盤(pán)IO操作影響了性能。
因此在Spark 1.2以后的版本中想括,默認(rèn)的ShuffleManager改成了SortShuffleManager陷谱。SortShuffleManager相較于HashShuffleManager來(lái)說(shuō),有了一定的改進(jìn)瑟蜈。主要就在于烟逊,每個(gè)Task在進(jìn)行shuffle操作時(shí),雖然也會(huì)產(chǎn)生較多的臨時(shí)磁盤(pán)文件铺根,但是最后會(huì)將所有的臨時(shí)文件合并(merge)成一個(gè)磁盤(pán)文件宪躯,因此每個(gè)Task就只有一個(gè)磁盤(pán)文件。在下一個(gè)stage的shuffle read task拉取自己的數(shù)據(jù)時(shí)位迂,只要根據(jù)索引讀取每個(gè)磁盤(pán)文件中的部分?jǐn)?shù)據(jù)即可访雪。
在spark 1.5.x以后,對(duì)于shuffle manager又出來(lái)了一種新的manager掂林,tungsten-sort(鎢絲)臣缀,鎢絲sort shuffle manager。官網(wǎng)上一般說(shuō)泻帮,鎢絲sort
shuffle manager精置,效果跟sort shuffle manager是差不多的。
但是锣杂,唯一的不同之處在于脂倦,鎢絲manager番宁,是使用了自己實(shí)現(xiàn)的一套內(nèi)存管理機(jī)制,性能上有很大的提升赖阻,而且可以避免shuffle過(guò)程中產(chǎn)生的大量的OOM蝶押,GC,等等內(nèi)存相關(guān)的異常政供。
3.3.3播聪、hash、sort布隔、tungsten-sort离陶。如何來(lái)選擇?
1衅檀、需不需要數(shù)據(jù)默認(rèn)就讓spark給你進(jìn)行排序招刨?就好像mapreduce,默認(rèn)就是有按照key的排序哀军。如果不需要的話沉眶,其實(shí)還是建議搭建就使用最基本的HashShuffleManager,因?yàn)樽铋_(kāi)始就是考慮的是不排序杉适,換取高性能谎倔。
2、什么時(shí)候需要用sort shuffle
manager猿推?如果你需要你的那些數(shù)據(jù)按key排序了片习,那么就選擇這種吧,而且要注意蹬叭,reduce task的數(shù)量應(yīng)該是超過(guò)200的藕咏,這樣sort、merge(多個(gè)文件合并成一個(gè))的機(jī)制秽五,才能生效把孽查。但是這里要注意,你一定要自己考量一下坦喘,有沒(méi)有必要在shuffle的過(guò)程中盲再,就做這個(gè)事情,畢竟對(duì)性能是有影響的瓣铣。
3洲胖、如果你不需要排序,而且你希望你的每個(gè)task輸出的文件最終是會(huì)合并成一份的坯沪,你自己認(rèn)為可以減少性能開(kāi)銷绿映。可以去調(diào)節(jié)bypassMergeThreshold這個(gè)閾值,比如你的reduce task數(shù)量是500叉弦,默認(rèn)閾值是200丐一,所以默認(rèn)還是會(huì)進(jìn)行sort和直接merge的⊙捅可以將閾值調(diào)節(jié)成550库车,不會(huì)進(jìn)行sort,按照hash的做法樱拴,每個(gè)reduce
task創(chuàng)建一份輸出文件柠衍,最后合并成一份文件。(一定要提醒大家晶乔,這個(gè)參數(shù)珍坊,其實(shí)我們通常不會(huì)在生產(chǎn)環(huán)境里去使用,也沒(méi)有經(jīng)過(guò)驗(yàn)證說(shuō)正罢,這樣的方式阵漏,到底有多少性能的提升)
4、如果你想選用sort based
shuffle manager翻具,而且你們公司的spark版本比較高履怯,是1.5.x版本的,那么可以考慮去嘗試使用tungsten-sort shuffle manager裆泳√局蓿看看性能的提升與穩(wěn)定性怎么樣。
總結(jié):
1工禾、在生產(chǎn)環(huán)境中运提,不建議大家貿(mào)然使用第三點(diǎn)和第四點(diǎn):
2、如果你不想要你的數(shù)據(jù)在shuffle時(shí)排序帜篇,那么就自己設(shè)置一下糙捺,用hash shuffle manager诫咱。
3笙隙、如果你的確是需要你的數(shù)據(jù)在shuffle時(shí)進(jìn)行排序的,那么就默認(rèn)不用動(dòng)坎缭,默認(rèn)就是sort shuffle manager竟痰。或者是什么掏呼?如果你壓根兒不care是否排序這個(gè)事兒坏快,那么就默認(rèn)讓他就是sort的。調(diào)節(jié)一些其他的參數(shù)(consolidation機(jī)制)憎夷。(80%莽鸿,都是用這種)
spark.shuffle.manager:hash、sort、tungsten-sort
spark.shuffle.sort.bypassMergeThreshold:200祥得。自己可以設(shè)定一個(gè)閾值兔沃,默認(rèn)是200,當(dāng)reduce task數(shù)量少于等于200级及,map task創(chuàng)建的輸出文件小于等于200的乒疏,最后會(huì)將所有的輸出文件合并為一份文件。這樣做的好處饮焦,就是避免了sort排序怕吴,節(jié)省了性能開(kāi)銷,而且還能將多個(gè)reduce task的文件合并成一份文件县踢,節(jié)省了reduce task拉取數(shù)據(jù)的時(shí)候的磁盤(pán)IO的開(kāi)銷转绷。
4、算子調(diào)優(yōu)
4.1殿雪、MapPartitions提升Map類操作性能
spark中暇咆,最基本的原則,就是每個(gè)task處理一個(gè)RDD的partition丙曙。
4.1.1爸业、MapPartitions的優(yōu)缺點(diǎn)
MapPartitions操作的優(yōu)點(diǎn):
如果是普通的map,比如一個(gè)partition中有1萬(wàn)條數(shù)據(jù)亏镰。ok扯旷,那么你的function要執(zhí)行和計(jì)算1萬(wàn)次。
但是索抓,使用MapPartitions操作之后钧忽,一個(gè)task僅僅會(huì)執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù)逼肯。只要執(zhí)行一次就可以了耸黑,性能比較高。
MapPartitions的缺點(diǎn):
如果是普通的map操作篮幢,一次function的執(zhí)行就處理一條數(shù)據(jù)大刊。那么如果內(nèi)存不夠用的情況下,比如處理了1千條數(shù)據(jù)了三椿,那么這個(gè)時(shí)候內(nèi)存不夠了缺菌,那么就可以將已經(jīng)處理完的1千條數(shù)據(jù)從內(nèi)存里面垃圾回收掉,或者用其他方法搜锰,騰出空間來(lái)吧伴郁。
所以說(shuō)普通的map操作通常不會(huì)導(dǎo)致內(nèi)存的OOM異常。
但是MapPartitions操作蛋叼,對(duì)于大量數(shù)據(jù)來(lái)說(shuō)焊傅,比如甚至一個(gè)partition剂陡,100萬(wàn)數(shù)據(jù),一次傳入一個(gè)function以后狐胎,那么可能一下子內(nèi)存不夠鹏倘,但是又沒(méi)有辦法去騰出內(nèi)存空間來(lái),可能就OOM顽爹,內(nèi)存溢出纤泵。
4.1.2栋烤、MapPartitions使用場(chǎng)景
當(dāng)分析的數(shù)據(jù)量不是特別大的時(shí)候砰盐,都可以用這種MapPartitions系列操作块饺,性能還是非常不錯(cuò)的置吓,是有提升的移宅。比如原來(lái)是15分鐘阐污,(曾經(jīng)有一次性能調(diào)優(yōu))叔遂,12分鐘微渠。10分鐘->9分鐘同规。
但是也有過(guò)出問(wèn)題的經(jīng)驗(yàn)循狰,MapPartitions只要一用,直接OOM券勺,內(nèi)存溢出绪钥,崩潰。
在項(xiàng)目中关炼,自己先去估算一下RDD的數(shù)據(jù)量程腹,以及每個(gè)partition的量,還有自己分配給每個(gè)executor的內(nèi)存資源儒拂〈缌剩看看一下子內(nèi)存容納所有的partition數(shù)據(jù)行不行。如果行社痛,可以試一下见转,能跑通就好。性能肯定是有提升的蒜哀。但是試了以后斩箫,發(fā)現(xiàn)OOM了,那就放棄吧凡怎。
4.2校焦、filter過(guò)后使用coalesce減少分區(qū)數(shù)量
4.2.1赊抖、出現(xiàn)問(wèn)題
默認(rèn)情況下统倒,經(jīng)過(guò)了filter之后,RDD中的每個(gè)partition的數(shù)據(jù)量氛雪,可能都不太一樣了房匆。(原本每個(gè)partition的數(shù)據(jù)量可能是差不多的)
可能出現(xiàn)的問(wèn)題:
1、每個(gè)partition數(shù)據(jù)量變少了,但是在后面進(jìn)行處理的時(shí)候浴鸿,還是要跟partition數(shù)量一樣數(shù)量的task井氢,來(lái)進(jìn)行處理,有點(diǎn)浪費(fèi)task計(jì)算資源岳链。
2花竞、每個(gè)partition的數(shù)據(jù)量不一樣,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition的時(shí)候掸哑,每個(gè)task要處理的數(shù)據(jù)量就不同约急,這樣就會(huì)導(dǎo)致有些task運(yùn)行的速度很快,有些task運(yùn)行的速度很慢苗分。這就是數(shù)據(jù)傾斜厌蔽。
針對(duì)上述的兩個(gè)問(wèn)題,我們希望應(yīng)該能夠怎么樣摔癣?
1奴饮、針對(duì)第一個(gè)問(wèn)題,我們希望可以進(jìn)行partition的壓縮吧择浊,因?yàn)閿?shù)據(jù)量變少了戴卜,那么partition其實(shí)也完全可以對(duì)應(yīng)的變少。比如原來(lái)是4個(gè)partition琢岩,現(xiàn)在完全可以變成2個(gè)partition叉瘩。那么就只要用后面的2個(gè)task來(lái)處理即可。就不會(huì)造成task計(jì)算資源的浪費(fèi)粘捎。(不必要薇缅,針對(duì)只有一點(diǎn)點(diǎn)數(shù)據(jù)的partition,還去啟動(dòng)一個(gè)task來(lái)計(jì)算)
2攒磨、針對(duì)第二個(gè)問(wèn)題泳桦,其實(shí)解決方案跟第一個(gè)問(wèn)題是一樣的,也是去壓縮partition娩缰,盡量讓每個(gè)partition的數(shù)據(jù)量差不多灸撰。那么后面的task分配到的partition的數(shù)據(jù)量也就差不多。不會(huì)造成有的task運(yùn)行速度特別慢拼坎,有的task運(yùn)行速度特別快浮毯。避免了數(shù)據(jù)傾斜的問(wèn)題。
4.2.2泰鸡、解決問(wèn)題方法
調(diào)用coalesce算子
主要就是用于在filter操作之后债蓝,針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況,來(lái)壓縮partition的數(shù)量盛龄,而且讓每個(gè)partition的數(shù)據(jù)量都盡量均勻緊湊饰迹。從而便于后面的task進(jìn)行計(jì)算操作芳誓,在某種程度上,能夠一定程度的提升性能啊鸭。
4.3锹淌、使用foreachPartition優(yōu)化寫(xiě)數(shù)據(jù)庫(kù)性能
4.3.1、默認(rèn)的foreach的性能缺陷在哪里赠制?
首先赂摆,對(duì)于每條數(shù)據(jù),都要單獨(dú)去調(diào)用一次function钟些,task為每個(gè)數(shù)據(jù)库正,都要去執(zhí)行一次function函數(shù)。
如果100萬(wàn)條數(shù)據(jù)厘唾,(一個(gè)partition)褥符,調(diào)用100萬(wàn)次。性能比較差抚垃。
另外一個(gè)非常非常重要的一點(diǎn)
如果每個(gè)數(shù)據(jù)喷楣,你都去創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接的話,那么你就得創(chuàng)建100萬(wàn)次數(shù)據(jù)庫(kù)連接鹤树。
但是要注意的是铣焊,數(shù)據(jù)庫(kù)連接的創(chuàng)建和銷毀,都是非常非常消耗性能的罕伯。雖然我們之前已經(jīng)用了數(shù)據(jù)庫(kù)連接池曲伊,只是創(chuàng)建了固定數(shù)量的數(shù)據(jù)庫(kù)連接。
你還是得多次通過(guò)數(shù)據(jù)庫(kù)連接追他,往數(shù)據(jù)庫(kù)(MySQL)發(fā)送一條SQL語(yǔ)句坟募,然后MySQL需要去執(zhí)行這條SQL語(yǔ)句。如果有100萬(wàn)條數(shù)據(jù)邑狸,那么就是100萬(wàn)次發(fā)送SQL語(yǔ)句懈糯。
以上兩點(diǎn)(數(shù)據(jù)庫(kù)連接,多次發(fā)送SQL語(yǔ)句)单雾,都是非常消耗性能的赚哗。
4.3.2、用了foreachPartition算子之后硅堆,好處在哪里屿储?
1、對(duì)于我們寫(xiě)的function函數(shù)渐逃,就調(diào)用一次够掠,一次傳入一個(gè)partition所有的數(shù)據(jù)。
2朴乖、主要?jiǎng)?chuàng)建或者獲取一個(gè)數(shù)據(jù)庫(kù)連接就可以祖屏。
3、只要向數(shù)據(jù)庫(kù)發(fā)送一次SQL語(yǔ)句和多組參數(shù)即可买羞。
注意袁勺,與mapPartitions操作一樣,如果一個(gè)partition的數(shù)量真的特別特別大畜普,比如是100萬(wàn)期丰,那基本上就不太靠譜了。很有可能會(huì)發(fā)生OOM吃挑,內(nèi)存溢出的問(wèn)題钝荡。
4.4、使用repartition解決Spark SQL低并行度的性能問(wèn)題
4.4.1舶衬、設(shè)置并行度
并行度:之前說(shuō)過(guò)埠通,并行度是設(shè)置的:
1、spark.default.parallelism
2逛犹、textFile()端辱,傳入第二個(gè)參數(shù),指定partition數(shù)量(比較少用)
在生產(chǎn)環(huán)境中虽画,是最好設(shè)置一下并行度舞蔽。官網(wǎng)有推薦的設(shè)置方式,根據(jù)你的application的總cpu core數(shù)量(在spark-submit中可以指定)码撰,自己手動(dòng)設(shè)置spark.default.parallelism參數(shù)渗柿,指定為cpu
core總數(shù)的2~3倍。
4.4.2脖岛、你設(shè)置的這個(gè)并行度朵栖,在哪些情況下會(huì)生效?什么情況下不會(huì)生效柴梆?
如果你壓根兒沒(méi)有使用Spark SQL(DataFrame)混槐,那么你整個(gè)spark application默認(rèn)所有stage的并行度都是你設(shè)置的那個(gè)參數(shù)。(除非你使用coalesce算子縮減過(guò)partition數(shù)量)轩性。
問(wèn)題來(lái)了声登,用Spark SQL的情況下,stage的并行度沒(méi)法自己指定揣苏。Spark SQL自己會(huì)默認(rèn)根據(jù)hive表對(duì)應(yīng)的hdfs文件的block悯嗓,自動(dòng)設(shè)置Spark SQL查詢所在的那個(gè)stage的并行度。你自己通過(guò)spark.default.parallelism參數(shù)指定的并行度卸察,只會(huì)在沒(méi)有Spark SQL的stage中生效脯厨。
比如你第一個(gè)stage,用了Spark SQL從hive表中查詢出了一些數(shù)據(jù)坑质,然后做了一些transformation操作合武,接著做了一個(gè)shuffle操作(groupByKey)临梗。下一個(gè)stage,在shuffle操作之后稼跳,做了一些transformation操作盟庞。hive表,對(duì)應(yīng)了一個(gè)hdfs文件汤善,有20個(gè)block什猖。你自己設(shè)置了spark.default.parallelism參數(shù)為100。
你的第一個(gè)stage的并行度红淡,是不受你的控制的不狮,就只有20個(gè)task。第二個(gè)stage在旱,才會(huì)變成你自己設(shè)置的那個(gè)并行度摇零,100。
4.4.3桶蝎、可能出現(xiàn)的問(wèn)題遂黍?
Spark SQL默認(rèn)情況下,它的那個(gè)并行度俊嗽,咱們沒(méi)法設(shè)置雾家。可能導(dǎo)致的問(wèn)題绍豁,也許沒(méi)什么問(wèn)題芯咧,也許很有問(wèn)題。Spark SQL所在的那個(gè)stage中竹揍,后面的那些transformation操作敬飒,可能會(huì)有非常復(fù)雜的業(yè)務(wù)邏輯,甚至說(shuō)復(fù)雜的算法芬位。如果你的Spark SQL默認(rèn)把task數(shù)量設(shè)置的很少无拗,20個(gè),然后每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量昧碉,然后還要執(zhí)行特別復(fù)雜的算法英染。
這個(gè)時(shí)候,就會(huì)導(dǎo)致第一個(gè)stage的速度被饿,特別慢四康。第二個(gè)stage1000個(gè)task非常快狭握。
4.4.4闪金、解決Spark SQL無(wú)法設(shè)置并行度和task數(shù)量的辦法
repartition算子,你用Spark
SQL這一步的并行度和task數(shù)量,肯定是沒(méi)有辦法去改變了哎垦。但是呢囱嫩,可以將你用Spark SQL查詢出來(lái)的RDD,使用repartition算子去重新進(jìn)行分區(qū)漏设,此時(shí)可以分成多個(gè)partition墨闲。然后呢,從repartition以后的RDD愿题,再往后损俭,并行度和task數(shù)量蛙奖,就會(huì)按照你預(yù)期的來(lái)了潘酗。就可以避免跟Spark SQL綁定在一個(gè)stage中的算子,只能使用少量的task去處理大量數(shù)據(jù)以及復(fù)雜的算法邏輯雁仲。
4.5仔夺、reduceByKey本地聚合介紹
reduceByKey,相較于普通的shuffle操作(比如groupByKey)攒砖,它的一個(gè)特點(diǎn)缸兔,就是說(shuō),會(huì)進(jìn)行map端的本地聚合吹艇。對(duì)map端給下個(gè)stage每個(gè)task創(chuàng)建的輸出文件中惰蜜,寫(xiě)數(shù)據(jù)之前,就會(huì)進(jìn)行本地的combiner操作受神,也就是說(shuō)對(duì)每一個(gè)key抛猖,對(duì)應(yīng)的values,都會(huì)執(zhí)行你的算子函數(shù)(_ + _)
4.5.1鼻听、用reduceByKey對(duì)性能的提升
1财著、在本地進(jìn)行聚合以后,在map端的數(shù)據(jù)量就變少了撑碴,減少磁盤(pán)IO撑教。而且可以減少磁盤(pán)空間的占用。
2醉拓、下一個(gè)stage伟姐,拉取數(shù)據(jù)的量,也就變少了亿卤。減少網(wǎng)絡(luò)的數(shù)據(jù)傳輸?shù)男阅芟摹?/p>
3玫镐、在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用變少了。
4怠噪、reduce端恐似,要進(jìn)行聚合的數(shù)據(jù)量也變少了。
4.5.2傍念、reduceByKey在什么情況下使用呢矫夷?
1葛闷、非常普通的,比如說(shuō)双藕,就是要實(shí)現(xiàn)類似于wordcount程序一樣的淑趾,對(duì)每個(gè)key對(duì)應(yīng)的值,進(jìn)行某種數(shù)據(jù)公式或者算法的計(jì)算(累加忧陪、類乘)扣泊。
2、對(duì)于一些類似于要對(duì)每個(gè)key進(jìn)行一些字符串拼接的這種較為復(fù)雜的操作嘶摊,可以自己衡量一下延蟹,其實(shí)有時(shí),也是可以使用reduceByKey來(lái)實(shí)現(xiàn)的叶堆。但是不太好實(shí)現(xiàn)阱飘。如果真能夠?qū)崿F(xiàn)出來(lái),對(duì)性能絕對(duì)是有幫助的虱颗。(shuffle基本上就占了整個(gè)spark作業(yè)的90%以上的性能消耗沥匈,主要能對(duì)shuffle進(jìn)行一定的調(diào)優(yōu),都是有價(jià)值的)
5忘渔、troubleshooting
5.1高帖、控制shuffle reduce端緩沖大小以避免OOM
map端的task是不斷的輸出數(shù)據(jù)的,數(shù)據(jù)量可能是很大的畦粮。
但是散址,其實(shí)reduce端的task,并不是等到map端task將屬于自己的那份數(shù)據(jù)全部寫(xiě)入磁盤(pán)文件之后锈玉,再去拉取的爪飘。map端寫(xiě)一點(diǎn)數(shù)據(jù),reduce端task就會(huì)拉取一小部分?jǐn)?shù)據(jù)拉背,立即進(jìn)行后面的聚合师崎、算子函數(shù)的應(yīng)用。
每次reduece能夠拉取多少數(shù)據(jù)椅棺,就由buffer來(lái)決定犁罩。因?yàn)槔∵^(guò)來(lái)的數(shù)據(jù),都是先放在buffer中的两疚。然后才用后面的executor分配的堆內(nèi)存占比(0.2)床估,hashmap,去進(jìn)行后續(xù)的聚合诱渤、函數(shù)的執(zhí)行丐巫。
5.1.1、reduce端緩沖大小的另外一面,關(guān)于性能調(diào)優(yōu)的一面
假如Map端輸出的數(shù)據(jù)量也不是特別大递胧,然后你的整個(gè)application的資源也特別充足碑韵。200個(gè)executor、5個(gè)cpu core缎脾、10G內(nèi)存祝闻。
其實(shí)可以嘗試去增加這個(gè)reduce端緩沖大小的,比如從48M遗菠,變成96M联喘。那么這樣的話,每次reduce task能夠拉取的數(shù)據(jù)量就很大辙纬。需要拉取的次數(shù)也就變少了豁遭。比如原先需要拉取100次,現(xiàn)在只要拉取50次就可以執(zhí)行完了牲平。
對(duì)網(wǎng)絡(luò)傳輸性能開(kāi)銷的減少堤框,以及reduce端聚合操作執(zhí)行的次數(shù)的減少域滥,都是有幫助的纵柿。
最終達(dá)到的效果,就應(yīng)該是性能上的一定程度上的提升启绰。
注意昂儒,一定要在資源充足的前提下做此操作。
5.1.2reduce端緩沖(buffer)委可,可能會(huì)出現(xiàn)的問(wèn)題及解決方式
可能會(huì)出現(xiàn)渊跋,默認(rèn)是48MB,也許大多數(shù)時(shí)候着倾,reduce端task一邊拉取一邊計(jì)算拾酝,不一定一直都會(huì)拉滿48M的數(shù)據(jù)。大多數(shù)時(shí)候卡者,拉取個(gè)10M數(shù)據(jù)蒿囤,就計(jì)算掉了。
大多數(shù)時(shí)候崇决,也許不會(huì)出現(xiàn)什么問(wèn)題材诽。但是有的時(shí)候,map端的數(shù)據(jù)量特別大恒傻,然后寫(xiě)出的速度特別快脸侥。reduce端所有task,拉取的時(shí)候,全部達(dá)到自己的緩沖的最大極限值茂洒,緩沖區(qū)48M忌栅,全部填滿锄贼。
這個(gè)時(shí)候外遇,再加上你的reduce端執(zhí)行的聚合函數(shù)的代碼拒逮,可能會(huì)創(chuàng)建大量的對(duì)象。也許臀规,一下子內(nèi)存就撐不住了滩援,就會(huì)OOM。reduce端的內(nèi)存中塔嬉,就會(huì)發(fā)生內(nèi)存溢出的問(wèn)題玩徊。
針對(duì)上述的可能出現(xiàn)的問(wèn)題,我們?cè)撛趺磥?lái)解決呢谨究?
這個(gè)時(shí)候恩袱,就應(yīng)該減少reduce端task緩沖的大小。我寧愿多拉取幾次胶哲,但是每次同時(shí)能夠拉取到reduce端每個(gè)task的數(shù)量比較少畔塔,就不容易發(fā)生OOM內(nèi)存溢出的問(wèn)題。(比如鸯屿,可以調(diào)節(jié)成12M)
在實(shí)際生產(chǎn)環(huán)境中澈吨,我們都是碰到過(guò)這種問(wèn)題的。這是典型的以性能換執(zhí)行的原理寄摆。reduce端緩沖小了谅辣,不容易OOM了,但是婶恼,性能一定是有所下降的桑阶,你要拉取的次數(shù)就多了。就走更多的網(wǎng)絡(luò)傳輸開(kāi)銷勾邦。
這種時(shí)候蚣录,只能采取犧牲性能的方式了,spark作業(yè)眷篇,首先萎河,第一要義,就是一定要讓它可以跑起來(lái)铅歼。
5.1.3公壤、操作方法
new SparkConf().set(spark.reducer.maxSizeInFlight,”48”)
5.2椎椰、解決JVM GC導(dǎo)致的shuffle文件拉取失敗
5.2.1厦幅、問(wèn)題描述
有時(shí)會(huì)出現(xiàn)一種情況,在spark的作業(yè)中慨飘,log日志會(huì)提示shuffle file not found确憨。(spark作業(yè)中译荞,非常常見(jiàn)的)而且有的時(shí)候,它是偶爾才會(huì)出現(xiàn)的一種情況休弃。有的時(shí)候吞歼,出現(xiàn)這種情況以后,重新去提交task塔猾。重新執(zhí)行一遍篙骡,發(fā)現(xiàn)就好了。沒(méi)有這種錯(cuò)誤了丈甸。
log怎么看糯俗?用client模式去提交你的spark作業(yè)。比如standalone
client或yarn client睦擂。一提交作業(yè)得湘,直接可以在本地看到更新的log。
問(wèn)題原因:比如顿仇,executor的JVM進(jìn)程可能內(nèi)存不夠用了淘正。那么此時(shí)就會(huì)執(zhí)行GC。minor GC or full GC臼闻。此時(shí)就會(huì)導(dǎo)致executor內(nèi)鸿吆,所有的工作線程全部停止。比如BlockManager些阅,基于netty的網(wǎng)絡(luò)通信伞剑。
下一個(gè)stage的executor斑唬,可能還沒(méi)有停止掉的task想要去上一個(gè)stage的task所在的exeuctor去拉取屬于自己的數(shù)據(jù)市埋,結(jié)果由于對(duì)方正在gc,就導(dǎo)致拉取了半天沒(méi)有拉取到恕刘。
就很可能會(huì)報(bào)出shuffle file not found缤谎。但是,可能下一個(gè)stage又重新提交了task以后褐着,再執(zhí)行就沒(méi)有問(wèn)題了坷澡,因?yàn)榭赡艿诙尉蜎](méi)有碰到JVM在gc了。
5.2.2含蓉、解決方案
spark.shuffle.io.maxRetries 3
第一個(gè)參數(shù)频敛,意思就是說(shuō),shuffle文件拉取的時(shí)候馅扣,如果沒(méi)有拉取到(拉取失斦遄),最多或重試幾次(會(huì)重新拉取幾次文件)差油,默認(rèn)是3次拗军。
spark.shuffle.io.retryWait 5s
第二個(gè)參數(shù)任洞,意思就是說(shuō),每一次重試?yán)∥募臅r(shí)間間隔发侵,默認(rèn)是5s鐘交掏。
默認(rèn)情況下,假如說(shuō)第一個(gè)stage的executor正在進(jìn)行漫長(zhǎng)的full gc刃鳄。第二個(gè)stage的executor嘗試去拉取文件盅弛,結(jié)果沒(méi)有拉取到,默認(rèn)情況下叔锐,會(huì)反復(fù)重試?yán)?次熊尉,每次間隔是五秒鐘。最多只會(huì)等待3 * 5s = 15s掌腰。如果15s內(nèi)狰住,沒(méi)有拉取到shuffle file。就會(huì)報(bào)出shuffle file not found齿梁。
針對(duì)這種情況催植,我們完全可以進(jìn)行預(yù)備性的參數(shù)調(diào)節(jié)。增大上述兩個(gè)參數(shù)的值勺择,達(dá)到比較大的一個(gè)值创南,盡量保證第二個(gè)stage的task,一定能夠拉取到上一個(gè)stage的輸出文件省核。避免報(bào)shuffle file not found稿辙。然后可能會(huì)重新提交stage和task去執(zhí)行。那樣反而對(duì)性能也不好气忠。
spark.shuffle.io.maxRetries 60
spark.shuffle.io.retryWait 60s
最多可以忍受1個(gè)小時(shí)沒(méi)有拉取到shuffle file邻储。只是去設(shè)置一個(gè)最大的可能的值。full gc不可能1個(gè)小時(shí)都沒(méi)結(jié)束吧旧噪。
這樣呢吨娜,就可以盡量避免因?yàn)間c導(dǎo)致的shuffle file not found,無(wú)法拉取到的問(wèn)題淘钟。
5.3宦赠、YARN隊(duì)列資源不足導(dǎo)致的application直接失敗
5.3.1、問(wèn)題描述
如果說(shuō)米母,你是基于yarn來(lái)提交spark勾扭。比如yarn-cluster或者yarn-client。你可以指定提交到某個(gè)hadoop隊(duì)列上的铁瞒。每個(gè)隊(duì)列都是可以有自己的資源的妙色。
跟大家說(shuō)一個(gè)生產(chǎn)環(huán)境中的,給spark用的yarn資源隊(duì)列的情況:500G內(nèi)存精拟,200個(gè)cpu core燎斩。
比如說(shuō)虱歪,某個(gè)spark application,在spark-submit里面你自己配了栅表,executor笋鄙,80個(gè)。每個(gè)executor怪瓶,4G內(nèi)存萧落。每個(gè)executor,2個(gè)cpu core洗贰。你的spark作業(yè)每次運(yùn)行找岖,大概要消耗掉320G內(nèi)存,以及160個(gè)cpu core敛滋。
乍看起來(lái)许布,咱們的隊(duì)列資源,是足夠的绎晃,500G內(nèi)存蜜唾,280個(gè)cpu core。
首先庶艾,第一點(diǎn)袁余,你的spark作業(yè)實(shí)際運(yùn)行起來(lái)以后,耗費(fèi)掉的資源量咱揍,可能是比你在spark-submit里面配置的颖榜,以及你預(yù)期的,是要大一些的煤裙。400G內(nèi)存掩完,190個(gè)cpu core。
那么這個(gè)時(shí)候积暖,的確藤为,咱們的隊(duì)列資源還是有一些剩余的。但問(wèn)題是如果你同時(shí)又提交了一個(gè)spark作業(yè)上去夺刑,一模一樣的。那就可能會(huì)出問(wèn)題分别。
第二個(gè)spark作業(yè)遍愿,又要申請(qǐng)320G內(nèi)存+160個(gè)cpu core。結(jié)果耘斩,發(fā)現(xiàn)隊(duì)列資源不足沼填。
此時(shí),可能會(huì)出現(xiàn)兩種情況:(備注括授,具體出現(xiàn)哪種情況坞笙,跟你的YARN岩饼、Hadoop的版本,你們公司的一些運(yùn)維參數(shù)薛夜,以及配置籍茧、硬件、資源肯能都有關(guān)系)
1梯澜、YARN寞冯,發(fā)現(xiàn)資源不足時(shí),你的spark作業(yè)晚伙,并沒(méi)有hang在那里吮龄,等待資源的分配,而是直接打印一行fail的log咆疗,直接就fail掉了漓帚。
2、YARN午磁,發(fā)現(xiàn)資源不足胰默,你的spark作業(yè),就hang在那里漓踢。一直等待之前的spark作業(yè)執(zhí)行完牵署,等待有資源分配給自己來(lái)執(zhí)行。
5.3.2喧半、解決方案
1奴迅、在你的J2EE(我們這個(gè)項(xiàng)目里面,spark作業(yè)的運(yùn)行挺据, J2EE平臺(tái)觸發(fā)的取具,執(zhí)行spark-submit腳本的平臺(tái))進(jìn)行限制,同時(shí)只能提交一個(gè)spark作業(yè)到y(tǒng)arn上去執(zhí)行扁耐,確保一個(gè)spark作業(yè)的資源肯定是有的暇检。
2、你應(yīng)該采用一些簡(jiǎn)單的調(diào)度區(qū)分的方式婉称,比如說(shuō)块仆,有的spark作業(yè)可能是要長(zhǎng)時(shí)間運(yùn)行的,比如運(yùn)行30分鐘王暗。有的spark作業(yè)悔据,可能是短時(shí)間運(yùn)行的,可能就運(yùn)行2分鐘俗壹。此時(shí)科汗,都提交到一個(gè)隊(duì)列上去,肯定不合適绷雏。很可能出現(xiàn)30分鐘的作業(yè)卡住后面一大堆2分鐘的作業(yè)头滔。分隊(duì)列怖亭,可以申請(qǐng)(跟你們的YARN、Hadoop運(yùn)維的同事申請(qǐng))坤检。你自己給自己搞兩個(gè)調(diào)度隊(duì)列兴猩。每個(gè)隊(duì)列的根據(jù)你要執(zhí)行的作業(yè)的情況來(lái)設(shè)置。在你的J2EE程序里面缀蹄,要判斷峭跳,如果是長(zhǎng)時(shí)間運(yùn)行的作業(yè),就干脆都提交到某一個(gè)固定的隊(duì)列里面去把缺前。如果是短時(shí)間運(yùn)行的作業(yè)蛀醉,就統(tǒng)一提交到另外一個(gè)隊(duì)列里面去。這樣衅码,避免了長(zhǎng)時(shí)間運(yùn)行的作業(yè)拯刁,阻塞了短時(shí)間運(yùn)行的作業(yè)。
3逝段、你的隊(duì)列里面垛玻,無(wú)論何時(shí),只會(huì)有一個(gè)作業(yè)在里面運(yùn)行奶躯。那么此時(shí)帚桩,就應(yīng)該用我們之前講過(guò)的性能調(diào)優(yōu)的手段,去將每個(gè)隊(duì)列能承載的最大的資源嘹黔,分配給你的每一個(gè)spark作業(yè)账嚎,比如80個(gè)executor,6G的內(nèi)存儡蔓,3個(gè)cpu core郭蕉。盡量讓你的spark作業(yè)每一次運(yùn)行,都達(dá)到最滿的資源使用率喂江,最快的速度召锈,最好的性能。并行度获询,240個(gè)cpu core涨岁,720個(gè)task。
4筐付、在J2EE中卵惦,通過(guò)線程池的方式(一個(gè)線程池對(duì)應(yīng)一個(gè)資源隊(duì)列),來(lái)實(shí)現(xiàn)上述我們說(shuō)的方案瓦戚。
5.4、解決各種序列化導(dǎo)致的報(bào)錯(cuò)
5.4.1丛塌、問(wèn)題描述
用client模式去提交spark作業(yè)较解,觀察本地打印出來(lái)的log畜疾。如果出現(xiàn)了類似于Serializable、Serialize等等字眼報(bào)錯(cuò)的log印衔,那么恭喜大家啡捶,就碰到了序列化問(wèn)題導(dǎo)致的報(bào)錯(cuò)。
5.4.2奸焙、序列化報(bào)錯(cuò)及解決方法
1瞎暑、你的算子函數(shù)里面,如果使用到了外部的自定義類型的變量与帆,那么此時(shí)了赌,就要求你的自定義類型,必須是可序列化的玄糟。
final Teacher teacher = newTeacher("leo");
studentsRDD.foreach(new VoidFunction() {
public void call(Row row) throws Exception {
String teacherName = teacher.getName();
....
}
});
public class Teacher implements Serializable {
}
2勿她、如果要將自定義的類型,作為RDD的元素類型阵翎,那么自定義的類型也必須是可以序列化的逢并。
JavaPairRDD teacherRDD
JavaPairRDD studentRDD
studentRDD.join(teacherRDD)
public class Teacher implements Serializable {
}
public class Student implements Serializable {
}
3、不能在上述兩種情況下郭卫,去使用一些第三方的砍聊,不支持序列化的類型。
Connection conn =
studentsRDD.foreach(new VoidFunction() {
public void call(Row row)throws Exception {
conn.....
}
});
Connection是不支持序列化的
5.5贰军、解決算子函數(shù)返回NULL導(dǎo)致的問(wèn)題
5.5.1玻蝌、問(wèn)題描述
在有些算子函數(shù)里面,是需要我們有一個(gè)返回值的谓形。但是灶伊,有時(shí)候不需要返回值。我們?nèi)绻苯臃祷豊ULL的話寒跳,是會(huì)報(bào)錯(cuò)的聘萨。
Scala.Math(NULL),異常
5.5.2童太、解決方案
如果碰到你的確是對(duì)于某些值不想要有返回值的話米辐,有一個(gè)解決的辦法:
1、在返回的時(shí)候书释,返回一些特殊的值翘贮,不要返回null,比如“-999”
2爆惧、在通過(guò)算子獲取到了一個(gè)RDD之后狸页,可以對(duì)這個(gè)RDD執(zhí)行filter操作,進(jìn)行數(shù)據(jù)過(guò)濾。filter內(nèi)芍耘,可以對(duì)數(shù)據(jù)進(jìn)行判定址遇,如果是-999,那么就返回false斋竞,給過(guò)濾掉就可以了倔约。
3、大家不要忘了坝初,之前咱們講過(guò)的那個(gè)算子調(diào)優(yōu)里面的coalesce算子浸剩,在filter之后,可以使用coalesce算子壓縮一下RDD的partition的數(shù)量鳄袍,讓各個(gè)partition的數(shù)據(jù)比較緊湊一些绢要。也能提升一些性能。
5.6畦木、解決yarn-client模式導(dǎo)致的網(wǎng)卡流量激增問(wèn)題
5.6.1袖扛、Spark-On-Yarn任務(wù)執(zhí)行流程
Driver到底是什么?
我們寫(xiě)的spark程序十籍,打成jar包蛆封,用spark-submit來(lái)提交。jar包中的一個(gè)main類勾栗,通過(guò)jvm的命令啟動(dòng)起來(lái)惨篱。
JVM進(jìn)程,其實(shí)就是Driver進(jìn)程围俘。
Driver進(jìn)程啟動(dòng)起來(lái)以后砸讳,執(zhí)行我們自己寫(xiě)的main函數(shù),從new
SparkContext()開(kāi)始界牡。
driver接收到屬于自己的executor進(jìn)程的注冊(cè)之后簿寂,就可以去進(jìn)行我們寫(xiě)的spark作業(yè)代碼的執(zhí)行了。此時(shí)會(huì)一行一行的去執(zhí)行咱們寫(xiě)的那些spark代碼宿亡。執(zhí)行到某個(gè)action操作的時(shí)候常遂,就會(huì)觸發(fā)一個(gè)job,然后DAGScheduler會(huì)將job劃分為一個(gè)一個(gè)的stage挽荠,為每個(gè)stage都創(chuàng)建指定數(shù)量的task克胳。TaskScheduler將每個(gè)stage的task分配到各個(gè)executor上面去執(zhí)行。
task就會(huì)執(zhí)行咱們寫(xiě)的算子函數(shù)圈匆。
spark在yarn-client模式下漠另,application的注冊(cè)(executor的申請(qǐng))和計(jì)算task的調(diào)度,是分離開(kāi)來(lái)的跃赚。
standalone模式下笆搓,這兩個(gè)操作都是driver負(fù)責(zé)的。
ApplicationMaster(ExecutorLauncher)負(fù)責(zé)executor的申請(qǐng),driver負(fù)責(zé)job和stage的劃分砚作,以及task的創(chuàng)建窘奏、分配和調(diào)度嘹锁。
每種計(jì)算框架(mr葫录、spark),如果想要在yarn上執(zhí)行自己的計(jì)算應(yīng)用领猾,那么就必須自己實(shí)現(xiàn)和提供一個(gè)ApplicationMaster锉试。相當(dāng)于是實(shí)現(xiàn)了yarn提供的接口杂瘸,spark自己開(kāi)發(fā)的一個(gè)類。
5.6.2、yarn-client模式下祈纯,會(huì)產(chǎn)生什么樣的問(wèn)題呢?
由于driver是啟動(dòng)在本地機(jī)器的黑界,而且driver是全權(quán)負(fù)責(zé)所有的任務(wù)的調(diào)度的劳澄,也就是說(shuō)要跟yarn集群上運(yùn)行的多個(gè)executor進(jìn)行頻繁的通信(中間有task的啟動(dòng)消息、task的執(zhí)行統(tǒng)計(jì)消息袁翁、task的運(yùn)行狀態(tài)柴底、shuffle的輸出結(jié)果)。
想象一下粱胜,比如你的executor有100個(gè)柄驻,stage有10個(gè),task有1000個(gè)焙压。每個(gè)stage運(yùn)行的時(shí)候鸿脓,都有1000個(gè)task提交到executor上面去運(yùn)行,平均每個(gè)executor有10個(gè)task涯曲。接下來(lái)問(wèn)題來(lái)了野哭,driver要頻繁地跟executor上運(yùn)行的1000個(gè)task進(jìn)行通信。通信消息特別多幻件,通信的頻率特別高拨黔。運(yùn)行完一個(gè)stage,接著運(yùn)行下一個(gè)stage傲武,又是頻繁的通信蓉驹。
在整個(gè)spark運(yùn)行的生命周期內(nèi),都會(huì)頻繁的去進(jìn)行通信和調(diào)度揪利。所有這一切通信和調(diào)度都是從你的本地機(jī)器上發(fā)出去的态兴,和接收到的。這是最要命的地方疟位。你的本地機(jī)器瞻润,很可能在30分鐘內(nèi)(spark作業(yè)運(yùn)行的周期內(nèi)),進(jìn)行頻繁大量的網(wǎng)絡(luò)通信。那么此時(shí)绍撞,你的本地機(jī)器的網(wǎng)絡(luò)通信負(fù)載是非常非常高的正勒。會(huì)導(dǎo)致你的本地機(jī)器的網(wǎng)卡流量會(huì)激增!
你的本地機(jī)器的網(wǎng)卡流量激增傻铣,當(dāng)然不是一件好事了章贞。因?yàn)樵谝恍┐蟮墓纠锩妫瑢?duì)每臺(tái)機(jī)器的使用情況非洲,都是有監(jiān)控的鸭限。不會(huì)允許單個(gè)機(jī)器出現(xiàn)耗費(fèi)大量網(wǎng)絡(luò)帶寬等等這種資源的情況。
5.6.3两踏、解決方案
實(shí)際上解決的方法很簡(jiǎn)單败京,就是心里要清楚,yarn-client模式是什么情況下梦染,可以使用的赡麦?yarn-client模式,通常咱們就只會(huì)使用在測(cè)試環(huán)境中帕识,你寫(xiě)好了某個(gè)spark作業(yè)泛粹,打了一個(gè)jar包,在某臺(tái)測(cè)試機(jī)器上渡冻,用yarn-client模式去提交一下戚扳。因?yàn)闇y(cè)試的行為是偶爾為之的,不會(huì)長(zhǎng)時(shí)間連續(xù)提交大量的spark作業(yè)去測(cè)試族吻。還有一點(diǎn)好處帽借,yarn-client模式提交,可以在本地機(jī)器觀察到詳細(xì)全面的log超歌。通過(guò)查看log砍艾,可以去解決線上報(bào)錯(cuò)的故障(troubleshooting)、對(duì)性能進(jìn)行觀察并進(jìn)行性能調(diào)優(yōu)巍举。
實(shí)際上線了以后脆荷,在生產(chǎn)環(huán)境中,都得用yarn-cluster模式懊悯,去提交你的spark作業(yè)蜓谋。
yarn-cluster模式,就跟你的本地機(jī)器引起的網(wǎng)卡流量激增的問(wèn)題炭分,就沒(méi)有關(guān)系了桃焕。也就是說(shuō),就算有問(wèn)題捧毛,也應(yīng)該是yarn運(yùn)維團(tuán)隊(duì)和基礎(chǔ)運(yùn)維團(tuán)隊(duì)之間的事情了观堂。使用了yarn-cluster模式以后让网,就不是你的本地機(jī)器運(yùn)行Driver,進(jìn)行task調(diào)度了师痕。是yarn集群中溃睹,某個(gè)節(jié)點(diǎn)會(huì)運(yùn)行driver進(jìn)程,負(fù)責(zé)task調(diào)度胰坟。
5.7因篇、解決yarn-cluster模式的JVM棧內(nèi)存溢出問(wèn)題
5.7.1、問(wèn)題描述
有的時(shí)候腕铸,運(yùn)行一些包含了spark sql的spark作業(yè)惜犀,可能會(huì)碰到y(tǒng)arn-client模式下,可以正常提交運(yùn)行狠裹。yarn-cluster模式下,可能無(wú)法提交運(yùn)行的汽烦,會(huì)報(bào)出JVM的PermGen(永久代)的內(nèi)存溢出涛菠,OOM。
yarn-client模式下撇吞,driver是運(yùn)行在本地機(jī)器上的俗冻,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客戶端是默認(rèn)有配置的)牍颈,JVM的永久代的大小是128M迄薄,這個(gè)是沒(méi)有問(wèn)題的。但是在yarn-cluster模式下煮岁,driver是運(yùn)行在yarn集群的某個(gè)節(jié)點(diǎn)上的讥蔽,使用的是沒(méi)有經(jīng)過(guò)配置的默認(rèn)設(shè)置(PermGen永久代大小)画机,82M冶伞。
spark-sql,它的內(nèi)部是要進(jìn)行很復(fù)雜的SQL的語(yǔ)義解析步氏、語(yǔ)法樹(shù)的轉(zhuǎn)換等等响禽,特別復(fù)雜。在這種復(fù)雜的情況下荚醒,如果說(shuō)你的sql本身特別復(fù)雜的話芋类,很可能會(huì)比較導(dǎo)致性能的消耗,內(nèi)存的消耗界阁『罘保可能對(duì)PermGen永久代的占用會(huì)比較大。
所以铺董,此時(shí)巫击,如果對(duì)永久代的占用需求禀晓,超過(guò)了82M的話,但是呢又在128M以內(nèi)坝锰,就會(huì)出現(xiàn)如上所述的問(wèn)題粹懒,yarn-client模式下,默認(rèn)是128M顷级,這個(gè)還能運(yùn)行凫乖,如果在yarn-cluster模式下,默認(rèn)是82M弓颈,就有問(wèn)題了帽芽。會(huì)報(bào)出PermGen Out of Memory error log。
5.7.2翔冀、解決方案
既然是JVM的PermGen永久代內(nèi)存溢出导街,那么就是內(nèi)存不夠用。我們就給yarn-cluster模式下的driver的PermGen多設(shè)置一些纤子。
spark-submit腳本中搬瑰,加入以下配置即可:
--confspark.driver.extraJavaOptions="-XX:PermSize=128M-XX:MaxPermSize=256M"
這個(gè)就設(shè)置了driver永久代的大小,默認(rèn)是128M控硼,最大是256M泽论。這樣的話,就可以基本保證你的spark作業(yè)不會(huì)出現(xiàn)上述的yarn-cluster模式導(dǎo)致的永久代內(nèi)存溢出的問(wèn)題卡乾。
spark sql中翼悴,寫(xiě)sql,要注意一個(gè)問(wèn)題:
如果sql有大量的or語(yǔ)句幔妨。比如where keywords='' or keywords='' or keywords=''
當(dāng)達(dá)到or語(yǔ)句鹦赎,有成百上千的時(shí)候,此時(shí)可能就會(huì)出現(xiàn)一個(gè)driver端的jvm stack overflow陶冷,JVM棧內(nèi)存溢出的問(wèn)題钙姊。
JVM棧內(nèi)存溢出,基本上就是由于調(diào)用的方法層級(jí)過(guò)多埂伦,因?yàn)楫a(chǎn)生了大量的煞额,非常深的,超出了JVM棧深度限制的遞歸方法沾谜。我們的猜測(cè)膊毁,spark sql有大量or語(yǔ)句的時(shí)候,spark sql內(nèi)部源碼中基跑,在解析sql婚温,比如轉(zhuǎn)換成語(yǔ)法樹(shù),或者進(jìn)行執(zhí)行計(jì)劃的生成的時(shí)候媳否,對(duì)or的處理是遞歸栅螟。or特別多的話荆秦,就會(huì)發(fā)生大量的遞歸。
JVM Stack Memory Overflow力图,棧內(nèi)存溢出步绸。
這種時(shí)候,建議不要搞那么復(fù)雜的spark sql語(yǔ)句吃媒。采用替代方案:將一條sql語(yǔ)句瓤介,拆解成多條sql語(yǔ)句來(lái)執(zhí)行。每條sql語(yǔ)句赘那,就只有100個(gè)or子句以內(nèi)刑桑。一條一條SQL語(yǔ)句來(lái)執(zhí)行。根據(jù)生產(chǎn)環(huán)境經(jīng)驗(yàn)的測(cè)試募舟,一條sql語(yǔ)句祠斧,100個(gè)or子句以內(nèi),是還可以的胃珍。通常情況下梁肿,不會(huì)報(bào)那個(gè)棧內(nèi)存溢出。
5.7觅彰、錯(cuò)誤的持久化方式以及checkpoint的使用
5.7.1、使用持久化方式
錯(cuò)誤的持久化使用方式:
usersRDD钮热,想要對(duì)這個(gè)RDD做一個(gè)cache填抬,希望能夠在后面多次使用這個(gè)RDD的時(shí)候,不用反復(fù)重新計(jì)算RDD隧期§穑可以直接使用通過(guò)各個(gè)節(jié)點(diǎn)上的executor的BlockManager管理的內(nèi)存/ 磁盤(pán)上的數(shù)據(jù),避免重新反復(fù)計(jì)算RDD仆潮。
usersRDD.cache()
usersRDD.count()
usersRDD.take()
上面這種方式宏蛉,不要說(shuō)會(huì)不會(huì)生效了,實(shí)際上是會(huì)報(bào)錯(cuò)的性置。會(huì)報(bào)什么錯(cuò)誤呢拾并?會(huì)報(bào)一大堆file not found的錯(cuò)誤。
正確的持久化使用方式:
usersRDD
usersRDD = usersRDD.cache() // Java
val cachedUsersRDD = usersRDD.cache() // Scala
之后再去使用usersRDD鹏浅,或者cachedUsersRDD就可以了嗅义。
5.7.2、checkpoint的使用
對(duì)于持久化隐砸,大多數(shù)時(shí)候都是會(huì)正常工作的之碗。但有些時(shí)候會(huì)出現(xiàn)意外。
比如說(shuō)季希,緩存在內(nèi)存中的數(shù)據(jù)褪那,可能莫名其妙就丟失掉了幽纷。
或者說(shuō),存儲(chǔ)在磁盤(pán)文件中的數(shù)據(jù)博敬,莫名其妙就沒(méi)了友浸,文件被誤刪了。
出現(xiàn)上述情況的時(shí)候冶忱,如果要對(duì)這個(gè)RDD執(zhí)行某些操作尾菇,可能會(huì)發(fā)現(xiàn)RDD的某個(gè)partition找不到了。
下來(lái)task就會(huì)對(duì)消失的partition重新計(jì)算囚枪,計(jì)算完以后再緩存和使用派诬。
有些時(shí)候,計(jì)算某個(gè)RDD链沼,可能是極其耗時(shí)的默赂。可能RDD之前有大量的父RDD括勺。那么如果你要重新計(jì)算一個(gè)partition缆八,可能要重新計(jì)算之前所有的父RDD對(duì)應(yīng)的partition。
這種情況下疾捍,就可以選擇對(duì)這個(gè)RDD進(jìn)行checkpoint,以防萬(wàn)一乱豆。進(jìn)行checkpoint奖恰,就是說(shuō),會(huì)將RDD的數(shù)據(jù)宛裕,持久化一份到容錯(cuò)的文件系統(tǒng)上(比如hdfs)瑟啃。
在對(duì)這個(gè)RDD進(jìn)行計(jì)算的時(shí)候,如果發(fā)現(xiàn)它的緩存數(shù)據(jù)不見(jiàn)了揩尸。優(yōu)先就是先找一下有沒(méi)有checkpoint數(shù)據(jù)(到hdfs上面去找)蛹屿。如果有的話,就使用checkpoint數(shù)據(jù)了岩榆。不至于去重新計(jì)算错负。
checkpoint,其實(shí)就是可以作為是cache的一個(gè)備胎朗恳。如果cache失效了湿颅,checkpoint就可以上來(lái)使用了。
checkpoint有利有弊粥诫,利在于油航,提高了spark作業(yè)的可靠性,一旦發(fā)生問(wèn)題怀浆,還是很可靠的谊囚,不用重新計(jì)算大量的rdd怕享。但是弊在于,進(jìn)行checkpoint操作的時(shí)候镰踏,也就是將rdd數(shù)據(jù)寫(xiě)入hdfs中的時(shí)候函筋,還是會(huì)消耗性能的。
checkpoint奠伪,用性能換可靠性跌帐。
checkpoint原理:
1、在代碼中绊率,用SparkContext谨敛,設(shè)置一個(gè)checkpoint目錄,可以是一個(gè)容錯(cuò)文件系統(tǒng)的目錄滤否,比如hdfs脸狸。
2、在代碼中藐俺,對(duì)需要進(jìn)行checkpoint的rdd炊甲,執(zhí)行RDD.checkpoint()。
3欲芹、RDDCheckpointData(spark內(nèi)部的API)卿啡,接管你的RDD,會(huì)標(biāo)記為marked
for checkpoint菱父,準(zhǔn)備進(jìn)行checkpoint牵囤。
4、你的job運(yùn)行完之后滞伟,會(huì)調(diào)用一個(gè)finalRDD.doCheckpoint()方法,會(huì)順著rdd
lineage炕贵,回溯掃描梆奈,發(fā)現(xiàn)有標(biāo)記為待checkpoint的rdd,就會(huì)進(jìn)行二次標(biāo)記称开,inProgressCheckpoint亩钟,正在接受checkpoint操作。
5鳖轰、job執(zhí)行完之后清酥,就會(huì)啟動(dòng)一個(gè)內(nèi)部的新job,去將標(biāo)記為inProgressCheckpoint的rdd的數(shù)據(jù)蕴侣,都寫(xiě)入hdfs文件中焰轻。(備注,如果rdd之前cache過(guò)昆雀,會(huì)直接從緩存中獲取數(shù)據(jù)辱志,寫(xiě)入hdfs中蝠筑。如果沒(méi)有cache過(guò),那么就會(huì)重新計(jì)算一遍這個(gè)rdd揩懒,再checkpoint)什乙。
6、將checkpoint過(guò)的rdd之前的依賴rdd已球,改成一個(gè)CheckpointRDD*臣镣,強(qiáng)制改變你的rdd的lineage。后面如果rdd的cache數(shù)據(jù)獲取失敗智亮,直接會(huì)通過(guò)它的上游CheckpointRDD忆某,去容錯(cuò)的文件系統(tǒng),比如hdfs鸽素,中褒繁,獲取checkpoint的數(shù)據(jù)。
checkpoint的使用:
1馍忽、sc.checkpointFile("hdfs://")棒坏,設(shè)置checkpoint目錄
2、對(duì)RDD執(zhí)行checkpoint操作
6遭笋、數(shù)據(jù)傾斜解決方案
數(shù)據(jù)傾斜的解決坝冕,跟之前講解的性能調(diào)優(yōu),有一點(diǎn)異曲同工之妙瓦呼。
性能調(diào)優(yōu)中最有效最直接最簡(jiǎn)單的方式就是加資源加并行度喂窟,并注意RDD架構(gòu)(復(fù)用同一個(gè)RDD,加上cache緩存)央串。相對(duì)于前面磨澡,shuffle、jvm等是次要的质和。
6.1稳摄、原理以及現(xiàn)象分析
6.1.1、數(shù)據(jù)傾斜怎么出現(xiàn)的
在執(zhí)行shuffle操作的時(shí)候饲宿,是按照key厦酬,來(lái)進(jìn)行values的數(shù)據(jù)的輸出、拉取和聚合的瘫想。
同一個(gè)key的values仗阅,一定是分配到一個(gè)reduce task進(jìn)行處理的。
多個(gè)key對(duì)應(yīng)的values国夜,比如一共是90萬(wàn)减噪。可能某個(gè)key對(duì)應(yīng)了88萬(wàn)數(shù)據(jù),被分配到一個(gè)task上去面去執(zhí)行旋廷。
另外兩個(gè)task鸠按,可能各分配到了1萬(wàn)數(shù)據(jù),可能是數(shù)百個(gè)key饶碘,對(duì)應(yīng)的1萬(wàn)條數(shù)據(jù)目尖。
這樣就會(huì)出現(xiàn)數(shù)據(jù)傾斜問(wèn)題。
想象一下扎运,出現(xiàn)數(shù)據(jù)傾斜以后的運(yùn)行的情況瑟曲。很糟糕!
其中兩個(gè)task豪治,各分配到了1萬(wàn)數(shù)據(jù)洞拨,可能同時(shí)在10分鐘內(nèi)都運(yùn)行完了。另外一個(gè)task有88萬(wàn)條负拟,88 * 10 = 880分鐘= 14.5個(gè)小時(shí)烦衣。
大家看,本來(lái)另外兩個(gè)task很快就運(yùn)行完畢了(10分鐘)掩浙,但是由于一個(gè)拖后腿的家伙花吟,第三個(gè)task,要14.5個(gè)小時(shí)才能運(yùn)行完厨姚,就導(dǎo)致整個(gè)spark作業(yè)衅澈,也得14.5個(gè)小時(shí)才能運(yùn)行完。
數(shù)據(jù)傾斜谬墙,一旦出現(xiàn)今布,是不是性能殺手?拭抬!
6.1.2部默、發(fā)生數(shù)據(jù)傾斜以后的現(xiàn)象
Spark數(shù)據(jù)傾斜,有兩種表現(xiàn):
1造虎、你的大部分的task甩牺,都執(zhí)行的特別特別快,(你要用client模式累奈,standalone client,yarn client急但,本地機(jī)器一執(zhí)行spark-submit腳本澎媒,就會(huì)開(kāi)始打印log),task175
finished波桩,剩下幾個(gè)task戒努,執(zhí)行的特別特別慢,前面的task儒喊,一般1s可以執(zhí)行完5個(gè)惑申,最后發(fā)現(xiàn)1000個(gè)task,998葫督,999 task撒穷,要執(zhí)行1個(gè)小時(shí)匣椰,2個(gè)小時(shí)才能執(zhí)行完一個(gè)task。
出現(xiàn)以上loginfo端礼,就表明出現(xiàn)數(shù)據(jù)傾斜了禽笑。
這樣還算好的,因?yàn)殡m然老牛拉破車一樣非常慢蛤奥,但是至少還能跑佳镜。
2、另一種情況是凡桥,運(yùn)行的時(shí)候蟀伸,其他task都執(zhí)行完了,也沒(méi)什么特別的問(wèn)題缅刽,但是有的task啊掏,就是會(huì)突然間報(bào)了一個(gè)OOM,JVM Out Of Memory拷恨,內(nèi)存溢出了脖律,task failed,task lost腕侄,resubmitting
task小泉。反復(fù)執(zhí)行幾次都到了某個(gè)task就是跑不通,最后就掛掉冕杠。
某個(gè)task就直接OOM微姊,那么基本上也是因?yàn)閿?shù)據(jù)傾斜了,task分配的數(shù)量實(shí)在是太大了分预!所以內(nèi)存放不下兢交,然后你的task每處理一條數(shù)據(jù),還要?jiǎng)?chuàng)建大量的對(duì)象笼痹,內(nèi)存爆掉了配喳。
這樣也表明出現(xiàn)數(shù)據(jù)傾斜了。
這種就不太好了凳干,因?yàn)槟愕某绦蛉绻蝗ソ鉀Q數(shù)據(jù)傾斜的問(wèn)題晴裹,壓根兒就跑不出來(lái)。
作業(yè)都跑不完救赐,還談什么性能調(diào)優(yōu)這些東西涧团?!
6.1.3、定位數(shù)據(jù)傾斜出現(xiàn)的原因與出現(xiàn)問(wèn)題的位置
根據(jù)log去定位
出現(xiàn)數(shù)據(jù)傾斜的原因泌绣,基本只可能是因?yàn)榘l(fā)生了shuffle操作钮追,在shuffle的過(guò)程中,出現(xiàn)了數(shù)據(jù)傾斜的問(wèn)題阿迈。因?yàn)槟硞€(gè)或者某些key對(duì)應(yīng)的數(shù)據(jù)元媚,遠(yuǎn)遠(yuǎn)的高于其他的key。
1仿滔、你在自己的程序里面找找惠毁,哪些地方用了會(huì)產(chǎn)生shuffle的算子,groupByKey崎页、countByKey鞠绰、reduceByKey、join
2飒焦、看log
log一般會(huì)報(bào)是在你的哪一行代碼蜈膨,導(dǎo)致了OOM異常∥或者看log翁巍,看看是執(zhí)行到了第幾個(gè)stage。spark代碼休雌,是怎么劃分成一個(gè)一個(gè)的stage的灶壶。哪一個(gè)stage生成的task特別慢,就能夠自己用肉眼去對(duì)你的spark代碼進(jìn)行stage的劃分杈曲,就能夠通過(guò)stage定位到你的代碼驰凛,到底哪里發(fā)生了數(shù)據(jù)傾斜。
6.2担扑、聚合源數(shù)據(jù)以及過(guò)濾導(dǎo)致傾斜的key
數(shù)據(jù)傾斜解決方案恰响,第一個(gè)方案和第二個(gè)方案,一起來(lái)講涌献。這兩個(gè)方案是最直接胚宦、最有效、最簡(jiǎn)單的解決數(shù)據(jù)傾斜問(wèn)題的方案燕垃。
第一個(gè)方案:聚合源數(shù)據(jù)枢劝。
第二個(gè)方案:過(guò)濾導(dǎo)致傾斜的key。
后面的五個(gè)方案卜壕,尤其是最后4個(gè)方案呈野,都是那種特別狂拽炫酷吊炸天的方案。但沒(méi)有第一二個(gè)方案簡(jiǎn)單直接印叁。如果碰到了數(shù)據(jù)傾斜的問(wèn)題。上來(lái)就先考慮第一個(gè)和第二個(gè)方案看能不能做,如果能做的話轮蜕,后面的5個(gè)方案昨悼,都不用去搞了。
有效跃洛、簡(jiǎn)單率触、直接才是最好的,徹底根除了數(shù)據(jù)傾斜的問(wèn)題汇竭。
6.2.1葱蝗、方案一:聚合源數(shù)據(jù)
一些聚合的操作,比如groupByKey细燎、reduceByKey两曼,groupByKey說(shuō)白了就是拿到每個(gè)key對(duì)應(yīng)的values。reduceByKey說(shuō)白了就是對(duì)每個(gè)key對(duì)應(yīng)的values執(zhí)行一定的計(jì)算玻驻。
這些操作悼凑,比如groupByKey和reduceByKey,包括之前說(shuō)的join璧瞬。都是在spark作業(yè)中執(zhí)行的户辫。
spark作業(yè)的數(shù)據(jù)來(lái)源,通常是哪里呢嗤锉?90%的情況下渔欢,數(shù)據(jù)來(lái)源都是hive表(hdfs,大數(shù)據(jù)分布式存儲(chǔ)系統(tǒng))瘟忱。hdfs上存儲(chǔ)的大數(shù)據(jù)奥额。hive表中的數(shù)據(jù)通常是怎么出來(lái)的呢?有了spark以后酷誓,hive比較適合做什么事情披坏?hive就是適合做離線的,晚上凌晨跑的盐数,ETL(extract transform load棒拂,數(shù)據(jù)的采集、清洗玫氢、導(dǎo)入)帚屉,hive
sql,去做這些事情漾峡,從而去形成一個(gè)完整的hive中的數(shù)據(jù)倉(cāng)庫(kù)攻旦。說(shuō)白了,數(shù)據(jù)倉(cāng)庫(kù)生逸,就是一堆表牢屋。
spark作業(yè)的源表且预,hive表,通常情況下來(lái)說(shuō)烙无,也是通過(guò)某些hive etl生成的锋谐。hive etl可能是晚上凌晨在那兒跑。今天跑昨天的數(shù)據(jù)截酷。
數(shù)據(jù)傾斜涮拗,某個(gè)key對(duì)應(yīng)的80萬(wàn)數(shù)據(jù),某些key對(duì)應(yīng)幾百條迂苛,某些key對(duì)應(yīng)幾十條∪龋現(xiàn)在咱們直接在生成hive表的hive etl中對(duì)數(shù)據(jù)進(jìn)行聚合。比如按key來(lái)分組三幻,將key對(duì)應(yīng)的所有的values全部用一種特殊的格式拼接到一個(gè)字符串里面去就漾,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。
對(duì)key進(jìn)行g(shù)roup赌髓,在spark中从藤,拿到key=sessionid,values锁蠕。hive etl中夷野,直接對(duì)key進(jìn)行了聚合。那么也就意味著荣倾,每個(gè)key就只對(duì)應(yīng)一條數(shù)據(jù)悯搔。在spark中,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了舌仍。直接對(duì)每個(gè)key對(duì)應(yīng)的values字符串進(jìn)行map操作妒貌,進(jìn)行你需要的操作即可。
spark中铸豁,可能對(duì)這個(gè)操作灌曙,就不需要執(zhí)行shffule操作了,也就根本不可能導(dǎo)致數(shù)據(jù)傾斜节芥。
或者是對(duì)每個(gè)key在hive etl中進(jìn)行聚合在刺,對(duì)所有values聚合一下,不一定是拼接起來(lái)头镊,可能是直接進(jìn)行計(jì)算蚣驼。reduceByKey計(jì)算函數(shù)應(yīng)用在hive etl中,從而得到每個(gè)key的values相艇。
聚合源數(shù)據(jù)方案第二種做法是颖杏,你可能沒(méi)有辦法對(duì)每個(gè)key聚合出來(lái)一條數(shù)據(jù)。那么也可以做一個(gè)妥協(xié)坛芽,對(duì)每個(gè)key對(duì)應(yīng)的數(shù)據(jù)留储,10萬(wàn)條翼抠。有好幾個(gè)粒度,比如10萬(wàn)條里面包含了幾個(gè)城市获讳、幾天机久、幾個(gè)地區(qū)的數(shù)據(jù),現(xiàn)在放粗粒度赔嚎。直接就按照城市粒度,做一下聚合胧弛,幾個(gè)城市尤误,幾天、幾個(gè)地區(qū)粒度的數(shù)據(jù)结缚,都給聚合起來(lái)损晤。比如說(shuō)
city_id date area_id
select ... from ... group by city_id
盡量去聚合,減少每個(gè)key對(duì)應(yīng)的數(shù)量红竭,也許聚合到比較粗的粒度之后尤勋,原先有10萬(wàn)數(shù)據(jù)量的key,現(xiàn)在只有1萬(wàn)數(shù)據(jù)量茵宪。減輕數(shù)據(jù)傾斜的現(xiàn)象和問(wèn)題最冰。
6.2.2、方案二:過(guò)濾導(dǎo)致傾斜的key
如果你能夠接受某些數(shù)據(jù)在spark作業(yè)中直接就摒棄掉不使用稀火。比如說(shuō)暖哨,總共有100萬(wàn)個(gè)key。只有2個(gè)key是數(shù)據(jù)量達(dá)到10萬(wàn)的凰狞。其他所有的key篇裁,對(duì)應(yīng)的數(shù)量都是幾十萬(wàn)。
這個(gè)時(shí)候赡若,你自己可以去取舍达布,如果業(yè)務(wù)和需求可以理解和接受的話,在你從hive表查詢?cè)磾?shù)據(jù)的時(shí)候逾冬,直接在sql中用where條件黍聂,過(guò)濾掉某幾個(gè)key。
那么這幾個(gè)原先有大量數(shù)據(jù)粉渠,會(huì)導(dǎo)致數(shù)據(jù)傾斜的key分冈,被過(guò)濾掉之后,那么在你的spark作業(yè)中霸株,自然就不會(huì)發(fā)生數(shù)據(jù)傾斜了雕沉。
6.3、提高shuffle操作reduce并行度
6.3.1去件、問(wèn)題描述
第一個(gè)和第二個(gè)方案坡椒,都不適合做扰路,然后再考慮這個(gè)方案。
將reduce task的數(shù)量變多倔叼,就可以讓每個(gè)reduce task分配到更少的數(shù)據(jù)量汗唱。這樣的話也許就可以緩解甚至是基本解決掉數(shù)據(jù)傾斜的問(wèn)題。
6.3.2丈攒、提升shuffle reduce端并行度的操作方法
很簡(jiǎn)單哩罪,主要給我們所有的shuffle算子,比如groupByKey巡验、countByKey际插、reduceByKey。在調(diào)用的時(shí)候显设,傳入進(jìn)去一個(gè)參數(shù)框弛。那個(gè)數(shù)字,就代表了那個(gè)shuffle操作的reduce端的并行度捕捂。那么在進(jìn)行shuffle操作的時(shí)候瑟枫,就會(huì)對(duì)應(yīng)著創(chuàng)建指定數(shù)量的reduce task。
這樣的話指攒,就可以讓每個(gè)reduce task分配到更少的數(shù)據(jù)慷妙。基本可以緩解數(shù)據(jù)傾斜的問(wèn)題幽七。
比如說(shuō)景殷,原本某個(gè)task分配數(shù)據(jù)特別多,直接OOM澡屡,內(nèi)存溢出了猿挚,程序沒(méi)法運(yùn)行,直接掛掉驶鹉。按照l(shuí)og绩蜻,找到發(fā)生數(shù)據(jù)傾斜的shuffle操作,給它傳入一個(gè)并行度數(shù)字室埋,這樣的話办绝,原先那個(gè)task分配到的數(shù)據(jù),肯定會(huì)變少姚淆。就至少可以避免OOM的情況孕蝉,程序至少是可以跑的。
6.3.2腌逢、提升shuffle reduce并行度的缺陷
治標(biāo)不治本的意思降淮,因?yàn)樗鼪](méi)有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問(wèn)題。不像第一個(gè)和第二個(gè)方案(直接避免了數(shù)據(jù)傾斜的發(fā)生)搏讶。原理沒(méi)有改變佳鳖,只是說(shuō)霍殴,盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力,以及數(shù)據(jù)傾斜的問(wèn)題系吩。
實(shí)際生產(chǎn)環(huán)境中的經(jīng)驗(yàn):
1来庭、如果最理想的情況下,提升并行度以后穿挨,減輕了數(shù)據(jù)傾斜的問(wèn)題月弛,或者甚至可以讓數(shù)據(jù)傾斜的現(xiàn)象忽略不計(jì),那么就最好科盛。就不用做其他的數(shù)據(jù)傾斜解決方案了尊搬。
2、不太理想的情況下土涝,比如之前某個(gè)task運(yùn)行特別慢,要5個(gè)小時(shí)幌墓,現(xiàn)在稍微快了一點(diǎn)但壮,變成了4個(gè)小時(shí)〕B拢或者是原先運(yùn)行到某個(gè)task蜡饵,直接OOM,現(xiàn)在至少不會(huì)OOM了胳施,但是那個(gè)task運(yùn)行特別慢溯祸,要5個(gè)小時(shí)才能跑完。
那么舞肆,如果出現(xiàn)第二種情況的話焦辅,各位,就立即放棄第三種方案椿胯,開(kāi)始去嘗試和選擇后面的四種方案筷登。
6.4、使用隨機(jī)key實(shí)現(xiàn)雙重聚合
6.4.1哩盲、使用場(chǎng)景
groupByKey前方、reduceByKey比較適合使用這種方式。join咱們通常不會(huì)這樣來(lái)做廉油,后面會(huì)講三種針對(duì)不同的join造成的數(shù)據(jù)傾斜的問(wèn)題的解決方案惠险。
6.4.2、解決方案
第一輪聚合的時(shí)候抒线,對(duì)key進(jìn)行打散班巩,將原先一樣的key,變成不一樣的key十兢,相當(dāng)于是將每個(gè)key分為多組趣竣。
先針對(duì)多個(gè)組摇庙,進(jìn)行key的局部聚合。接著遥缕,再去除掉每個(gè)key的前綴卫袒,然后對(duì)所有的key進(jìn)行全局的聚合。
對(duì)groupByKey单匣、reduceByKey造成的數(shù)據(jù)傾斜夕凝,有比較好的效果。
如果說(shuō)户秤,之前的第一码秉、第二、第三種方案鸡号,都沒(méi)法解決數(shù)據(jù)傾斜的問(wèn)題转砖,那么就只能依靠這一種方式了。
6.5鲸伴、將reduce join轉(zhuǎn)換為map join
6.5.1府蔗、使用方式
普通的join,那么肯定是要走shuffle汞窗。既然是走shuffle姓赤,那么普通的join就肯定是走的是reduce join。那怎么將reduce join 轉(zhuǎn)換為mapjoin呢仲吏?先將所有相同的key不铆,對(duì)應(yīng)的value匯聚到一個(gè)task中,然后再進(jìn)行join裹唆。
6.5.2誓斥、使用場(chǎng)景
這種方式適合在什么樣的情況下來(lái)使用?
如果兩個(gè)RDD要進(jìn)行join许帐,其中一個(gè)RDD是比較小的岖食。比如一個(gè)RDD是100萬(wàn)數(shù)據(jù),一個(gè)RDD是1萬(wàn)數(shù)據(jù)舞吭。(一個(gè)RDD是1億數(shù)據(jù)泡垃,一個(gè)RDD是100萬(wàn)數(shù)據(jù))。
其中一個(gè)RDD必須是比較小的羡鸥,broadcast出去那個(gè)小RDD的數(shù)據(jù)以后蔑穴,就會(huì)在每個(gè)executor的block manager中都保存一份。要確保你的內(nèi)存足夠存放那個(gè)小RDD中的數(shù)據(jù)惧浴。
這種方式下存和,根本不會(huì)發(fā)生shuffle操作,肯定也不會(huì)發(fā)生數(shù)據(jù)傾斜。從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜的問(wèn)題捐腿。
對(duì)于join中有數(shù)據(jù)傾斜的情況纵朋,大家盡量第一時(shí)間先考慮這種方式,效果非常好茄袖。
不適合的情況
兩個(gè)RDD都比較大操软,那么這個(gè)時(shí)候,你去將其中一個(gè)RDD做成broadcast聂薪,就很笨拙了蝗羊。很可能導(dǎo)致內(nèi)存不足。最終導(dǎo)致內(nèi)存溢出耀找,程序掛掉翔悠。
而且其中某些key(或者是某個(gè)key),還發(fā)生了數(shù)據(jù)傾斜野芒。此時(shí)可以采用最后兩種方式凉驻。
對(duì)于join這種操作,不光是考慮數(shù)據(jù)傾斜的問(wèn)題复罐。即使是沒(méi)有數(shù)據(jù)傾斜問(wèn)題,也完全可以優(yōu)先考慮雄家,用我們講的這種高級(jí)的reduce join轉(zhuǎn)map join的技術(shù)效诅,不要用普通的join,去通過(guò)shuffle趟济,進(jìn)行數(shù)據(jù)的join乱投。完全可以通過(guò)簡(jiǎn)單的map,使用map join的方式顷编,犧牲一點(diǎn)內(nèi)存資源戚炫。在可行的情況下,優(yōu)先這么使用媳纬。
不走shuffle钮惠,直接走map蔑赘,是不是性能也會(huì)高很多?這是肯定的。
6.6卤材、sample采樣傾斜key單獨(dú)進(jìn)行join
6.6.1、方案實(shí)現(xiàn)思路
將發(fā)生數(shù)據(jù)傾斜的key,單獨(dú)拉出來(lái)卓练,放到一個(gè)RDD中去狮含。就用這個(gè)原本會(huì)傾斜的key RDD跟其他RDD單獨(dú)去join一下,這個(gè)時(shí)候key對(duì)應(yīng)的數(shù)據(jù)可能就會(huì)分散到多個(gè)task中去進(jìn)行join操作木羹。
就不至于說(shuō)是弛姜,這個(gè)key跟之前其他的key混合在一個(gè)RDD中時(shí)盅惜,肯定是會(huì)導(dǎo)致一個(gè)key對(duì)應(yīng)的所有數(shù)據(jù)都到一個(gè)task中去,就會(huì)導(dǎo)致數(shù)據(jù)傾斜屈芜。
6.6.2眠寿、使用場(chǎng)景
這種方案什么時(shí)候適合使用盒发?
優(yōu)先對(duì)于join奢浑,肯定是希望能夠采用上一個(gè)方案壤蚜,即reduce join轉(zhuǎn)換map join实柠。兩個(gè)RDD數(shù)據(jù)都比較大钢拧,那么就不要那么搞了葡粒。
針對(duì)你的RDD的數(shù)據(jù),你可以自己把它轉(zhuǎn)換成一個(gè)中間表,或者是直接用countByKey()的方式,你可以看一下這個(gè)RDD各個(gè)key對(duì)應(yīng)的數(shù)據(jù)量姨蝴。此時(shí)如果你發(fā)現(xiàn)整個(gè)RDD就一個(gè)搓谆,或者少數(shù)幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別多滤钱。盡量建議,比如就是一個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別多憋飞。
此時(shí)可以采用這種方案内狸,單拉出來(lái)那個(gè)最多的key,單獨(dú)進(jìn)行join舞萄,盡可能地將key分散到各個(gè)task上去進(jìn)行join操作。
什么時(shí)候不適用呢?
如果一個(gè)RDD中,導(dǎo)致數(shù)據(jù)傾斜的key特別多。那么此時(shí),最好還是不要這樣了。還是使用我們最后一個(gè)方案唇敞,終極的join數(shù)據(jù)傾斜的解決方案镶柱。
就是說(shuō)鞋屈,咱們單拉出來(lái)了一個(gè)或者少數(shù)幾個(gè)可能會(huì)產(chǎn)生數(shù)據(jù)傾斜的key逻卖,然后還可以進(jìn)行更加優(yōu)化的一個(gè)操作。
對(duì)于那個(gè)key,從另外一個(gè)要join的表中,也過(guò)濾出來(lái)一份數(shù)據(jù)邮弹,比如可能就只有一條數(shù)據(jù)黔衡。userid2infoRDD,一個(gè)userid key腌乡,就對(duì)應(yīng)一條數(shù)據(jù)盟劫。
然后呢,采取對(duì)那個(gè)只有一條數(shù)據(jù)的RDD影所,進(jìn)行flatMap操作,打上100個(gè)隨機(jī)數(shù)嫉称,作為前綴渣触,返回100條數(shù)據(jù)。
單獨(dú)拉出來(lái)的可能產(chǎn)生數(shù)據(jù)傾斜的RDD碉纺,給每一條數(shù)據(jù),都打上一個(gè)100以內(nèi)的隨機(jī)數(shù),作為前綴。
再去進(jìn)行join邑雅,是不是性能就更好了色冀。肯定可以將數(shù)據(jù)進(jìn)行打散挫鸽,去進(jìn)行join搏屑。join完以后单默,可以執(zhí)行map操作境蜕,去將之前打上的隨機(jī)數(shù)給去掉币励,然后再和另外一個(gè)普通RDD join以后的結(jié)果進(jìn)行union操作挠将。
6.7、使用隨機(jī)數(shù)以及擴(kuò)容表進(jìn)行join
6.7.1编整、使用場(chǎng)景及步驟
當(dāng)采用隨機(jī)數(shù)和擴(kuò)容表進(jìn)行join解決數(shù)據(jù)傾斜的時(shí)候捐名,就代表著,你的之前的數(shù)據(jù)傾斜的解決方案闹击,都沒(méi)法使用镶蹋。
這個(gè)方案是沒(méi)辦法徹底解決數(shù)據(jù)傾斜的,更多的,是一種對(duì)數(shù)據(jù)傾斜的緩解贺归。
步驟:
1淆两、選擇一個(gè)RDD,要用flatMap拂酣,進(jìn)行擴(kuò)容秋冰,將每條數(shù)據(jù),映射為多條數(shù)據(jù)婶熬,每個(gè)映射出來(lái)的數(shù)據(jù)剑勾,都帶了一個(gè)n以內(nèi)的隨機(jī)數(shù),通常來(lái)說(shuō)會(huì)選擇10赵颅。
2虽另、將另外一個(gè)RDD,做普通的map映射操作饺谬,每條數(shù)據(jù)都打上一個(gè)10以內(nèi)的隨機(jī)數(shù)捂刺。
3、最后將兩個(gè)處理后的RDD進(jìn)行join操作募寨。
6.7.2族展、局限性
1、因?yàn)槟愕膬蓚€(gè)RDD都很大拔鹰,所以你沒(méi)有辦法去將某一個(gè)RDD擴(kuò)的特別大仪缸,一般咱們就是10倍。
2列肢、如果就是10倍的話腹殿,那么數(shù)據(jù)傾斜問(wèn)題的確是只能說(shuō)是緩解和減輕,不能說(shuō)徹底解決例书。
sample采樣傾斜key并單獨(dú)進(jìn)行join
將key锣尉,從另外一個(gè)RDD中過(guò)濾出的數(shù)據(jù),可能只有一條或者幾條决采,此時(shí)自沧,咱們可以任意進(jìn)行擴(kuò)容,擴(kuò)成1000倍树瞭。
將從第一個(gè)RDD中拆分出來(lái)的那個(gè)傾斜key RDD拇厢,打上1000以內(nèi)的一個(gè)隨機(jī)數(shù)。
這種情況下晒喷,還可以配合上孝偎,提升shuffle reduce并行度,join(rdd, 1000)凉敲。通常情況下衣盾,效果還是非常不錯(cuò)的寺旺。
打散成100份,甚至1000份势决,2000份阻塑,去進(jìn)行join,那么就肯定沒(méi)有數(shù)據(jù)傾斜的問(wèn)題了吧果复。
附:實(shí)時(shí)計(jì)算程序性能調(diào)優(yōu)
1陈莽、并行化數(shù)據(jù)接收:處理多個(gè)topic的數(shù)據(jù)時(shí)比較有效
int numStreams = 5;
List>kafkaStreams = new ArrayList>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream unifiedStream= streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,kafkaStreams.size()));
unifiedStream.print();
2、spark.streaming.blockInterval:增加block數(shù)量虽抄,增加每個(gè)batch
rdd的partition數(shù)量走搁,增加處理并行度
receiver從數(shù)據(jù)源源源不斷地獲取到數(shù)據(jù);首先是會(huì)按照block interval迈窟,將指定時(shí)間間隔的數(shù)據(jù)私植,收集為一個(gè)block;默認(rèn)時(shí)間是200ms菠隆,官方推薦不要小于50ms兵琳;接著呢狂秘,會(huì)將指定batch interval時(shí)間間隔內(nèi)的block骇径,合并為一個(gè)batch;創(chuàng)建為一個(gè)rdd者春,然后啟動(dòng)一個(gè)job破衔,去處理這個(gè)batch rdd中的數(shù)據(jù)
batch rdd,它的partition數(shù)量是多少呢钱烟?一個(gè)batch有多少個(gè)block晰筛,就有多少個(gè)partition;就意味著并行度是多少拴袭;就意味著每個(gè)batch rdd有多少個(gè)task會(huì)并行計(jì)算和處理读第。
當(dāng)然是希望可以比默認(rèn)的task數(shù)量和并行度再多一些了;可以手動(dòng)調(diào)節(jié)block interval拥刻;減少block interval怜瞒;每個(gè)batch可以包含更多的block;有更多的partition般哼;也就有更多的task并行處理每個(gè)batch rdd吴汪。
定死了,初始的rdd過(guò)來(lái)蒸眠,直接就是固定的partition數(shù)量了
3漾橙、inputStream.repartition(
of partitions>):重分區(qū),增加每個(gè)batch rdd的partition數(shù)量
有些時(shí)候楞卡,希望對(duì)某些dstream中的rdd進(jìn)行定制化的分區(qū)
對(duì)dstream中的rdd進(jìn)行重分區(qū)霜运,去重分區(qū)成指定數(shù)量的分區(qū)脾歇,這樣也可以提高指定dstream的rdd的計(jì)算并行度
4、調(diào)節(jié)并行度
spark.default.parallelism
reduceByKey(numPartitions)
5觉渴、使用Kryo序列化機(jī)制:
spark streaming介劫,也是有不少序列化的場(chǎng)景的
提高序列化task發(fā)送到executor上執(zhí)行的性能,如果task很多的時(shí)候案淋,task序列化和反序列化的性能開(kāi)銷也比較可觀
默認(rèn)輸入數(shù)據(jù)的存儲(chǔ)級(jí)別是StorageLevel.MEMORY_AND_DISK_SER_2座韵,receiver接收到數(shù)據(jù),默認(rèn)就會(huì)進(jìn)行持久化操作踢京;首先序列化數(shù)據(jù)誉碴,存儲(chǔ)到內(nèi)存中;如果內(nèi)存資源不夠大瓣距,那么就寫(xiě)入磁盤(pán)黔帕;而且,還會(huì)寫(xiě)一份冗余副本到其他executor的block manager中蹈丸,進(jìn)行數(shù)據(jù)冗余成黄。
6、batch interval:每個(gè)的處理時(shí)間必須小于batchinterval
實(shí)際上你的spark streaming跑起來(lái)以后逻杖,其實(shí)都是可以在spark ui上觀察它的運(yùn)行情況的奋岁;可以看到batch的處理時(shí)間;
如果發(fā)現(xiàn)batch的處理時(shí)間大于batch interval荸百,就必須調(diào)節(jié)batch interval
盡量不要讓batch處理時(shí)間大于batch interval
比如你的batch每隔5秒生成一次闻伶;你的batch處理時(shí)間要達(dá)到6秒;就會(huì)出現(xiàn)够话,batch在你的內(nèi)存中日積月累蓝翰,一直囤積著,沒(méi)法及時(shí)計(jì)算掉女嘲,釋放內(nèi)存空間畜份;而且對(duì)內(nèi)存空間的占用越來(lái)越大,那么此時(shí)會(huì)導(dǎo)致內(nèi)存空間快速消耗
如果發(fā)現(xiàn)batch處理時(shí)間比batch interval要大欣尼,就盡量將batch interval調(diào)節(jié)大一些
作者:Frank_8942
鏈接:http://www.reibang.com/p/6dee4cffcb56
來(lái)源:簡(jiǎn)書(shū)
簡(jiǎn)書(shū)著作權(quán)歸作者所有爆雹,任何形式的轉(zhuǎn)載都請(qǐng)聯(lián)系作者獲得授權(quán)并注明出處。