spark調(diào)優(yōu)

1、 性能調(diào)優(yōu)

1.1荸实、 分配更多資源

1.1.1旱函、分配哪些資源?

Executor的數(shù)量

每個(gè)Executor所能分配的CPU數(shù)量

每個(gè)Executor所能分配的內(nèi)存量

Driver端分配的內(nèi)存數(shù)量

1.1.2、在哪里分配這些資源?

在生產(chǎn)環(huán)境中,提交spark作業(yè)時(shí)暇唾,用的spark-submit shell腳本尘奏,里面調(diào)整對(duì)應(yīng)的參數(shù):

/usr/local/spark/bin/spark-submit\

--classcn.spark.sparktest.core.WordCountCluster \

--num-executors3 \? 配置executor的數(shù)量

--driver-memory100m \? 配置driver的內(nèi)存(影響不大)

--executor-memory100m \? 配置每個(gè)executor的內(nèi)存大小

--total-executor-cores3 \? 配置所有executor的cpu core數(shù)量

/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar\

1.1.3苛蒲、調(diào)節(jié)到多大,算是最大呢柜与?

常用的資源調(diào)度模式有Spark Standalone和Spark On Yarn。比如說(shuō)你的每臺(tái)機(jī)器能夠給你使用60G內(nèi)存发钝,10個(gè)cpu core夺英,20臺(tái)機(jī)器晌涕。那么executor的數(shù)量是20。平均每個(gè)executor所能分配60G內(nèi)存和10個(gè)cpu core痛悯。


1.1.4余黎、為什么多分配了這些資源以后,性能會(huì)得到提升载萌?

[if !supportLists]?? [endif]增加executor:

如果executor數(shù)量比較少惧财,那么,能夠并行執(zhí)行的task數(shù)量就比較少扭仁,就意味著垮衷,我們的Application的并行執(zhí)行的能力就很弱。

比如有3個(gè)executor乖坠,每個(gè)executor有2個(gè)cpu core搀突,那么同時(shí)能夠并行執(zhí)行的task,就是6個(gè)熊泵。6個(gè)執(zhí)行完以后仰迁,再換下一批6個(gè)task。

增加了executor數(shù)量以后顽分,那么徐许,就意味著,能夠并行執(zhí)行的task數(shù)量怯邪,也就變多了绊寻。比如原先是6個(gè),現(xiàn)在可能可以并行執(zhí)行10個(gè)悬秉,甚至20個(gè),100個(gè)冰蘑。那么并行能力就比之前提升了數(shù)倍和泌,數(shù)十倍。相應(yīng)的祠肥,性能(執(zhí)行的速度)武氓,也能提升數(shù)倍~數(shù)十倍。


增加每個(gè)executor的cpu core仇箱,也是增加了執(zhí)行的并行能力县恕。原本20個(gè)executor,每個(gè)才2個(gè)cpu core剂桥。能夠并行執(zhí)行的task數(shù)量忠烛,就是40個(gè)task。

現(xiàn)在每個(gè)executor的cpu core权逗,增加到了4個(gè)美尸。能夠并行執(zhí)行的task數(shù)量冤议,就是100個(gè)task。就物理性能來(lái)看师坎,執(zhí)行的速度恕酸,提升了2倍。

增加每個(gè)executor的內(nèi)存量胯陋。增加了內(nèi)存量以后蕊温,對(duì)性能的提升,有三點(diǎn):

1遏乔、如果需要對(duì)RDD進(jìn)行cache义矛,那么更多的內(nèi)存,就可以緩存更多的數(shù)據(jù)按灶,將更少的數(shù)據(jù)寫入磁盤症革,甚至不寫入磁盤。減少了磁盤IO鸯旁。

2噪矛、對(duì)于shuffle操作,reduce端铺罢,會(huì)需要內(nèi)存來(lái)存放拉取的數(shù)據(jù)并進(jìn)行聚合艇挨。如果內(nèi)存不夠,也會(huì)寫入磁盤韭赘。如果給executor分配更多內(nèi)存以后缩滨,就有更少的數(shù)據(jù),需要寫入磁盤泉瞻,甚至不需要寫入磁盤脉漏。減少了磁盤IO,提升了性能袖牙。

3侧巨、對(duì)于task的執(zhí)行,可能會(huì)創(chuàng)建很多對(duì)象鞭达。如果內(nèi)存比較小司忱,可能會(huì)頻繁導(dǎo)致JVM堆內(nèi)存滿了,然后頻繁GC畴蹭,垃圾回收坦仍,minor GC和full GC。(速度很慢)叨襟。內(nèi)存加大以后繁扎,帶來(lái)更少的GC,垃圾回收芹啥,避免了速度變慢锻离,速度變快了铺峭。

1.2、調(diào)節(jié)并行度

1.2.1汽纠、并行度的概念

就是指的是Spark作業(yè)中卫键,各個(gè)stage的task數(shù)量,代表了Spark作業(yè)的在各個(gè)階段(stage)的并行度虱朵。

1.2.2穷躁、如果不調(diào)節(jié)并行度僻焚,導(dǎo)致并行度過(guò)低,會(huì)怎么樣?

比如現(xiàn)在spark-submit腳本里面媳拴,給我們的spark作業(yè)分配了足夠多的資源科贬,比如50個(gè)executor府适,每個(gè)executor有10G內(nèi)存唯绍,每個(gè)executor有3個(gè)cpu core〕ズ桑基本已經(jīng)達(dá)到了集群或者yarn隊(duì)列的資源上限窘游。task沒(méi)有設(shè)置,或者設(shè)置的很少跳纳,比如就設(shè)置了100個(gè)task忍饰,50個(gè)executor,每個(gè)executor有3個(gè)cpu core寺庄,也就是說(shuō)艾蓝,你的Application任何一個(gè)stage運(yùn)行的時(shí)候,都有總數(shù)在150個(gè)cpu core斗塘,可以并行運(yùn)行赢织。但是你現(xiàn)在,只有100個(gè)task馍盟,平均分配一下敌厘,每個(gè)executor分配到2個(gè)task,ok朽合,那么同時(shí)在運(yùn)行的task,只有100個(gè)饱狂,每個(gè)executor只會(huì)并行運(yùn)行2個(gè)task曹步。每個(gè)executor剩下的一個(gè)cpu core,就浪費(fèi)掉了休讳。

你的資源雖然分配足夠了讲婚,但是問(wèn)題是,并行度沒(méi)有與資源相匹配俊柔,導(dǎo)致你分配下去的資源都浪費(fèi)掉了筹麸。

合理的并行度的設(shè)置活合,應(yīng)該是要設(shè)置的足夠大,大到可以完全合理的利用你的集群資源物赶。比如上面的例子白指,總共集群有150個(gè)cpu core,可以并行運(yùn)行150個(gè)task酵紫。那么就應(yīng)該將你的Application的并行度告嘲,至少設(shè)置成150,才能完全有效的利用你的集群資源奖地,讓150個(gè)task橄唬,并行執(zhí)行。而且task增加到150個(gè)以后参歹,即可以同時(shí)并行運(yùn)行仰楚,還可以讓每個(gè)task要處理的數(shù)據(jù)量變少。比如總共150G的數(shù)據(jù)要處理犬庇,如果是100個(gè)task僧界,每個(gè)task計(jì)算1.5G的數(shù)據(jù),現(xiàn)在增加到150個(gè)task械筛,可以并行運(yùn)行捎泻,而且每個(gè)task主要處理1G的數(shù)據(jù)就可以。

很簡(jiǎn)單的道理埋哟,只要合理設(shè)置并行度笆豁,就可以完全充分利用你的集群計(jì)算資源,并且減少每個(gè)task要處理的數(shù)據(jù)量赤赊,最終闯狱,就是提升你的整個(gè)Spark作業(yè)的性能和運(yùn)行速度。

1.2.3抛计、設(shè)置并行度

1)哄孤、task數(shù)量,至少設(shè)置成與Spark

application的總cpu core數(shù)量相同(最理想情況吹截,比如總共150個(gè)cpu core瘦陈,分配了150個(gè)task,一起運(yùn)行波俄,差不多同一時(shí)間運(yùn)行完畢)晨逝。

2)、官方是推薦懦铺,task數(shù)量捉貌,設(shè)置成spark

application總cpu core數(shù)量的2~3倍,比如150個(gè)cpu core,基本要設(shè)置task數(shù)量為300~500趁窃。

實(shí)際情況牧挣,與理想情況不同的,有些task會(huì)運(yùn)行的快一點(diǎn)醒陆,比如50s就完了瀑构,有些task,可能會(huì)慢一點(diǎn)统求,要1分半才運(yùn)行完检碗,所以如果你的task數(shù)量,剛好設(shè)置的跟cpu core數(shù)量相同码邻,可能還是會(huì)導(dǎo)致資源的浪費(fèi)折剃。比如150個(gè)task,10個(gè)先運(yùn)行完了像屋,剩余140個(gè)還在運(yùn)行怕犁,但是這個(gè)時(shí)候,有10個(gè)cpu core就空閑出來(lái)了己莺,就導(dǎo)致了浪費(fèi)奏甫。那如果task數(shù)量設(shè)置成cpu core總數(shù)的2~3倍,那么一個(gè)task運(yùn)行完了以后凌受,另一個(gè)task馬上可以補(bǔ)上來(lái)阵子,就盡量讓cpu core不要空閑,同時(shí)也是盡量提升spark作業(yè)運(yùn)行的效率和速度胜蛉,提升性能挠进。

3)、如何設(shè)置一個(gè)Spark

Application的并行度誊册?

spark.default.parallelism

SparkConf conf = newSparkConf()

? ???????????? .set("spark.default.parallelism","500")


1.3领突、 重構(gòu)RDD架構(gòu)以及RDD持久化

1.3.1、RDD架構(gòu)重構(gòu)與優(yōu)化

盡量去復(fù)用RDD案怯,差不多的RDD君旦,可以抽取成為一個(gè)共同的RDD,供后面的RDD計(jì)算時(shí)嘲碱,反復(fù)使用金砍。

1.3.2、公共RDD一定要實(shí)現(xiàn)持久化

對(duì)于要多次計(jì)算和使用的公共RDD麦锯,一定要進(jìn)行持久化捞魁。

持久化,就是將RDD的數(shù)據(jù)緩存到內(nèi)存中/磁盤中(BlockManager)以后無(wú)論對(duì)這個(gè)RDD做多少次計(jì)算离咐,那么都是直接取這個(gè)RDD的持久化的數(shù)據(jù),比如從內(nèi)存中或者磁盤中,直接提取一份數(shù)據(jù)宵蛀。

1.3.3昆著、持久化,是可以進(jìn)行序列化的

如果正常將數(shù)據(jù)持久化在內(nèi)存中术陶,那么可能會(huì)導(dǎo)致內(nèi)存的占用過(guò)大凑懂,這樣的話,也許梧宫,會(huì)導(dǎo)致OOM內(nèi)存溢出接谨。

當(dāng)純內(nèi)存無(wú)法支撐公共RDD數(shù)據(jù)完全存放的時(shí)候,就優(yōu)先考慮使用序列化的方式在純內(nèi)存中存儲(chǔ)塘匣。將RDD的每個(gè)partition的數(shù)據(jù)脓豪,序列化成一個(gè)大的字節(jié)數(shù)組,就一個(gè)對(duì)象忌卤。序列化后扫夜,大大減少內(nèi)存的空間占用。

序列化的方式驰徊,唯一的缺點(diǎn)就是笤闯,在獲取數(shù)據(jù)的時(shí)候,需要反序列化棍厂。

如果序列化純內(nèi)存方式颗味,還是導(dǎo)致OOM內(nèi)存溢出,就只能考慮磁盤的方式牺弹、內(nèi)存+磁盤的普通方式(無(wú)序列化)浦马、內(nèi)存+磁盤(序列化)。

1.3.4例驹、為了數(shù)據(jù)的高可靠性捐韩,而且內(nèi)存充足,可以使用雙副本機(jī)制鹃锈,進(jìn)行持久化荤胁。

持久化的雙副本機(jī)制,持久化后的一個(gè)副本屎债,因?yàn)闄C(jī)器宕機(jī)了仅政,副本丟了,就還是得重新計(jì)算一次盆驹。持久化的每個(gè)數(shù)據(jù)單元圆丹,存儲(chǔ)一份副本,放在其他節(jié)點(diǎn)上面躯喇。從而進(jìn)行容錯(cuò)辫封。一個(gè)副本丟了硝枉,不用重新計(jì)算,還可以使用另外一份副本倦微。這種方式妻味,僅僅針對(duì)你的內(nèi)存資源極度充足的情況。

1.4欣福、 廣播變量

1.4.1责球、概念及需求

Spark Application(我們自己寫的Spark作業(yè))最開(kāi)始在Driver端,在我們提交任務(wù)的時(shí)候拓劝,需要傳遞到各個(gè)Executor的Task上運(yùn)行雏逾。對(duì)于一些只讀、固定的數(shù)據(jù)(比如從DB中讀出的數(shù)據(jù)),每次都需要Driver廣播到各個(gè)Task上郑临,這樣效率低下栖博。廣播變量允許將變量只廣播(提前廣播)給各個(gè)Executor。該Executor上的各個(gè)Task再?gòu)乃诠?jié)點(diǎn)的BlockManager獲取變量牧抵,如果本地沒(méi)有笛匙,那么就從Driver遠(yuǎn)程拉取變量副本,并保存在本地的BlockManager中犀变。此后這個(gè)executor上的task妹孙,都會(huì)直接使用本地的BlockManager中的副本。而不是從Driver獲取變量获枝,從而提升了效率蠢正。

一個(gè)Executor只需要在第一個(gè)Task啟動(dòng)時(shí),獲得一份Broadcast數(shù)據(jù)省店,之后的Task都從本節(jié)點(diǎn)的BlockManager中獲取相關(guān)數(shù)據(jù)嚣崭。/

1.4.2、使用方法

1)調(diào)用SparkContext.broadcast方法創(chuàng)建一個(gè)Broadcast[T]對(duì)象懦傍。任何序列化的類型都可以這么實(shí)現(xiàn)雹舀。

2)通過(guò)value屬性訪問(wèn)改對(duì)象的值(Java之中為value()方法)

3)變量只會(huì)被發(fā)送到各個(gè)節(jié)點(diǎn)一次,應(yīng)作為只讀值處理(修改這個(gè)值不會(huì)影響到別的節(jié)點(diǎn))

1.5粗俱、使用Kryo序列化

1.5.1说榆、概念及需求

默認(rèn)情況下,Spark內(nèi)部是使用Java的序列化機(jī)制寸认,ObjectOutputStream / ObjectInputStream签财,對(duì)象輸入輸出流機(jī)制,來(lái)進(jìn)行序列化偏塞。

這種默認(rèn)序列化機(jī)制的好處在于唱蒸,處理起來(lái)比較方便,也不需要我們手動(dòng)去做什么事情灸叼,只是神汹,你在算子里面使用的變量庆捺,必須是實(shí)現(xiàn)Serializable接口的,可序列化即可慎冤。

但是缺點(diǎn)在于疼燥,默認(rèn)的序列化機(jī)制的效率不高,序列化的速度比較慢蚁堤,序列化以后的數(shù)據(jù),占用的內(nèi)存空間相對(duì)還是比較大但狭。

Spark支持使用Kryo序列化機(jī)制披诗。這種序列化機(jī)制,比默認(rèn)的Java序列化機(jī)制速度要快立磁,序列化后的數(shù)據(jù)更小呈队,大概是Java序列化機(jī)制的1/10。

所以Kryo序列化優(yōu)化以后唱歧,可以讓網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)變少,在集群中耗費(fèi)的內(nèi)存資源大大減少颅崩。

1.5.2沿彭、Kryo序列化機(jī)制啟用以后生效的幾個(gè)地方

1)、算子函數(shù)中使用到的外部變量恨锚,使用Kryo以后:優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅苡罴荩梢詢?yōu)化集群中內(nèi)存的占用和消耗

2)、持久化RDD猴伶,優(yōu)化內(nèi)存的占用和消耗课舍。持久化RDD占用的內(nèi)存越少塌西,task執(zhí)行的時(shí)候,創(chuàng)建的對(duì)象筝尾,就不至于頻繁的占滿內(nèi)存捡需,頻繁發(fā)生GC。

3)筹淫、shuffle:可以優(yōu)化網(wǎng)絡(luò)傳輸?shù)男阅?/p>

1.5.3站辉、使用方法

第一步,在SparkConf中設(shè)置一個(gè)屬性损姜,spark.serializer饰剥,org.apache.spark.serializer.KryoSerializer類。

第二步摧阅,注冊(cè)你使用的需要通過(guò)Kryo序列化的一些自定義類汰蓉,SparkConf.registerKryoClasses()。

項(xiàng)目中的使用:

.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

.registerKryoClasses(newClass[]{CategorySortKey.class})

[if !supportLists]1.6棒卷、 [endif]使用fastutil優(yōu)化數(shù)據(jù)格式

1.6.1顾孽、fastutil介紹

fastutil是擴(kuò)展了Java標(biāo)準(zhǔn)集合框架(Map、List比规、Set若厚。HashMap、ArrayList苞俘、HashSet)的類庫(kù)盹沈,提供了特殊類型的map、set吃谣、list和queue乞封。

fastutil能夠提供更小的內(nèi)存占用,更快的存取速度岗憋。我們使用fastutil提供的集合類肃晚,來(lái)替代自己平時(shí)使用的JDK的原生的Map、List仔戈、Set关串,好處在于fastutil集合類可以減小內(nèi)存的占用,并且在進(jìn)行集合的遍歷监徘、根據(jù)索引(或者key)獲取元素的值和設(shè)置元素的值的時(shí)候晋修,提供更快的存取速度。

fastutil也提供了64位的array凰盔、set和list墓卦,以及高性能快速的,以及實(shí)用的IO類户敬,來(lái)處理二進(jìn)制和文本類型的文件落剪。

fastutil最新版本要求Java 7以及以上版本睁本。

fastutil的每一種集合類型,都實(shí)現(xiàn)了對(duì)應(yīng)的Java中的標(biāo)準(zhǔn)接口(比如fastutil的map忠怖,實(shí)現(xiàn)了Java的Map接口)呢堰,因此可以直接放入已有系統(tǒng)的任何代碼中。

fastutil還提供了一些JDK標(biāo)準(zhǔn)類庫(kù)中沒(méi)有的額外功能(比如雙向迭代器)凡泣。

fastutil除了對(duì)象和原始類型為元素的集合双炕,fastutil也提供引用類型的支持现喳,但是對(duì)引用類型是使用等于號(hào)(=)進(jìn)行比較的裙犹,而不是equals()方法阵谚。

fastutil盡量提供了在任何場(chǎng)景下都是速度最快的集合類庫(kù)。

1.6.2严卖、Spark中應(yīng)用fastutil的場(chǎng)景

1)、如果算子函數(shù)使用了外部變量布轿。第一哮笆,你可以使用Broadcast廣播變量?jī)?yōu)化。第二汰扭,可以使用Kryo序列化類庫(kù)稠肘,提升序列化性能和效率。第三萝毛,如果外部變量是某種比較大的集合项阴,那么可以考慮使用fastutil改寫外部變量,首先從源頭上就減少內(nèi)存的占用笆包,通過(guò)廣播變量進(jìn)一步減少內(nèi)存占用环揽,再通過(guò)Kryo序列化類庫(kù)進(jìn)一步減少內(nèi)存占用。

2)庵佣、在你的算子函數(shù)里歉胶,也就是task要執(zhí)行的計(jì)算邏輯里面,如果有邏輯中巴粪,出現(xiàn)通今,要?jiǎng)?chuàng)建比較大的Map、List等集合肛根,可能會(huì)占用較大的內(nèi)存空間辫塌,而且可能涉及到消耗性能的遍歷、存取等集合操作派哲,此時(shí)臼氨,可以考慮將這些集合類型使用fastutil類庫(kù)重寫,使用了fastutil集合類以后狮辽,就可以在一定程度上一也,減少task創(chuàng)建出來(lái)的集合類型的內(nèi)存占用巢寡。避免executor內(nèi)存頻繁占滿,頻繁喚起GC椰苟,導(dǎo)致性能下降抑月。

1.6.3、關(guān)于fastutil調(diào)優(yōu)的說(shuō)明

fastutil其實(shí)沒(méi)有你想象中的那么強(qiáng)大舆蝴,也不會(huì)跟官網(wǎng)上說(shuō)的效果那么一鳴驚人谦絮。廣播變量、Kryo序列化類庫(kù)洁仗、fastutil层皱,都是之前所說(shuō)的,對(duì)于性能來(lái)說(shuō)赠潦,類似于一種調(diào)味品叫胖,烤雞,本來(lái)就很好吃了她奥,然后加了一點(diǎn)特質(zhì)的孜然麻辣粉調(diào)料瓮增,就更加好吃了一點(diǎn)。分配資源哩俭、并行度绷跑、RDD架構(gòu)與持久化,這三個(gè)就是烤雞凡资。broadcast砸捏、kryo、fastutil隙赁,類似于調(diào)料垦藏。

比如說(shuō),你的spark作業(yè)鸳谜,經(jīng)過(guò)之前一些調(diào)優(yōu)以后膝藕,大概30分鐘運(yùn)行完,現(xiàn)在加上broadcast咐扭、kryo芭挽、fastutil,也許就是優(yōu)化到29分鐘運(yùn)行完蝗肪、或者更好一點(diǎn)袜爪,也許就是28分鐘、25分鐘薛闪。

shuffle調(diào)優(yōu)辛馆,15分鐘。groupByKey用reduceByKey改寫,執(zhí)行本地聚合昙篙,也許10分鐘腊状。跟公司申請(qǐng)更多的資源,比如資源更大的YARN隊(duì)列苔可,1分鐘缴挖。

1.6.4、fastutil的使用

在pom.xml中引用fastutil的包

???fastutil

???fastutil

???5.0.9

速度比較慢焚辅,可能是從國(guó)外的網(wǎng)去拉取jar包映屋,可能要等待5分鐘,甚至幾十分鐘同蜻,不等

List 相當(dāng)于IntList

基本都是類似于IntList的格式棚点,前綴就是集合的元素類型。特殊的就是Map湾蔓,Int2IntMap瘫析,代表了key-value映射的元素類型。除此之外默责,還支持object颁股、reference。

[if !supportLists]1.7傻丝、 [endif]調(diào)節(jié)數(shù)據(jù)本地化等待時(shí)長(zhǎng)

1.7.1、task的locality有五種

1)诉儒、PROCESS_LOCAL:進(jìn)程本地化葡缰,代碼和數(shù)據(jù)在同一個(gè)進(jìn)程中,也就是在同一個(gè)executor中忱反。計(jì)算數(shù)據(jù)的task由executor執(zhí)行泛释,數(shù)據(jù)在executor的BlockManager中,性能最好温算。

2)怜校、NODE_LOCAL:節(jié)點(diǎn)本地化,代碼和數(shù)據(jù)在同一個(gè)節(jié)點(diǎn)中注竿。比如說(shuō)茄茁,數(shù)據(jù)作為一個(gè)HDFS block塊,就在節(jié)點(diǎn)上巩割,而task在節(jié)點(diǎn)上某個(gè)executor中運(yùn)行裙顽,或者是,數(shù)據(jù)和task在一個(gè)節(jié)點(diǎn)上的不同executor中宣谈,數(shù)據(jù)需要在進(jìn)程間進(jìn)行傳輸愈犹。

3)、NO_PREF:對(duì)于task來(lái)說(shuō)闻丑,數(shù)據(jù)從哪里獲取都一樣漩怎,沒(méi)有好壞之分勋颖。

4)、RACK_LOCAL:機(jī)架本地化勋锤,數(shù)據(jù)和task在一個(gè)機(jī)架的兩個(gè)節(jié)點(diǎn)上饭玲,數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)在節(jié)點(diǎn)之間進(jìn)行傳輸。

5)怪得、ANY:數(shù)據(jù)和task可能在集群中的任何地方咱枉,而且不在一個(gè)機(jī)架中,性能最差徒恋。

1.7.2蚕断、Spark的任務(wù)調(diào)度

Spark在Driver上,對(duì)Application的每一個(gè)stage的task進(jìn)行分配之前都會(huì)計(jì)算出每個(gè)task要計(jì)算的是哪個(gè)分片數(shù)據(jù)入挣。Spark的task分配算法優(yōu)先會(huì)希望每個(gè)task正好分配到它要計(jì)算的數(shù)據(jù)所在的節(jié)點(diǎn)亿乳,這樣的話,就不用在網(wǎng)絡(luò)間傳輸數(shù)據(jù)径筏。

但是葛假,有時(shí)可能task沒(méi)有機(jī)會(huì)分配到它的數(shù)據(jù)所在的節(jié)點(diǎn)。為什么呢滋恬,可能那個(gè)節(jié)點(diǎn)的計(jì)算資源和計(jì)算能力都滿了聊训。所以這種時(shí)候, Spark會(huì)等待一段時(shí)間恢氯,默認(rèn)情況下是3s(不是絕對(duì)的带斑,還有很多種情況,對(duì)不同的本地化級(jí)別勋拟,都會(huì)去等待)勋磕,到最后,實(shí)在是等待不了了敢靡,就會(huì)選擇一個(gè)比較差的本地化級(jí)別挂滓。比如說(shuō),將task分配到靠它要計(jì)算的數(shù)據(jù)所在節(jié)點(diǎn)比較近的一個(gè)節(jié)點(diǎn)啸胧,然后進(jìn)行計(jì)算赶站。

但是對(duì)于第二種情況,通常來(lái)說(shuō)纺念,肯定是要發(fā)生數(shù)據(jù)傳輸亲怠,task會(huì)通過(guò)其所在節(jié)點(diǎn)的BlockManager來(lái)獲取數(shù)據(jù),BlockManager發(fā)現(xiàn)自己本地沒(méi)有數(shù)據(jù)柠辞,會(huì)通過(guò)一個(gè)getRemote()方法团秽,通過(guò)TransferService(網(wǎng)絡(luò)數(shù)據(jù)傳輸組件)從數(shù)據(jù)所在節(jié)點(diǎn)的BlockManager中,獲取數(shù)據(jù),通過(guò)網(wǎng)絡(luò)傳輸回task所在節(jié)點(diǎn)习勤。

對(duì)于我們來(lái)說(shuō)踪栋,當(dāng)然不希望是類似于第二種情況的了。最好的图毕,當(dāng)然是task和數(shù)據(jù)在一個(gè)節(jié)點(diǎn)上夷都,直接從本地executor的BlockManager中獲取數(shù)據(jù),純內(nèi)存予颤,或者帶一點(diǎn)磁盤IO囤官。如果要通過(guò)網(wǎng)絡(luò)傳輸數(shù)據(jù)的話,性能肯定會(huì)下降的蛤虐。大量網(wǎng)絡(luò)傳輸党饮,以及磁盤IO,都是性能的殺手驳庭。

1.7.3刑顺、我們什么時(shí)候要調(diào)節(jié)這個(gè)參數(shù)

觀察spark作業(yè)的運(yùn)行日志。推薦大家在測(cè)試的時(shí)候饲常,先用client模式在本地就直接可以看到比較全的日志蹲堂。日志里面會(huì)顯示:starting task…,PROCESS LOCAL贝淤、NODE LOCAL

觀察大部分task的數(shù)據(jù)本地化級(jí)別柒竞,如果大多都是PROCESS_LOCAL,那就不用調(diào)節(jié)了播聪。

如果是發(fā)現(xiàn)能犯,好多的級(jí)別都是NODE_LOCAL、ANY犬耻,那么最好就去調(diào)節(jié)一下數(shù)據(jù)本地化的等待時(shí)長(zhǎng)。要反復(fù)調(diào)節(jié)执泰,每次調(diào)節(jié)完以后再運(yùn)行并觀察日志枕磁,看看大部分的task的本地化級(jí)別有沒(méi)有提升,看看整個(gè)spark作業(yè)的運(yùn)行時(shí)間有沒(méi)有縮短术吝。注意计济,不要本末倒置,不要本地化級(jí)別是提升了排苍,但是因?yàn)榇罅康牡却龝r(shí)長(zhǎng)沦寂,spark作業(yè)的運(yùn)行時(shí)間反而增加了,那還是不要調(diào)節(jié)了淘衙。

1.7.4传藏、怎么調(diào)節(jié)

spark.locality.wait,默認(rèn)是3s。6s毯侦,10s

默認(rèn)情況下哭靖,下面3個(gè)的等待時(shí)長(zhǎng),都是跟上面那個(gè)是一樣的侈离,都是3s

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

newSparkConf().set("spark.locality.wait", "10")

2试幽、JVM調(diào)優(yōu)

堆內(nèi)存存放我們創(chuàng)建的一些對(duì)象,有老年代和年輕代卦碾。理想情況下铺坞,老年代都是放一些生命周期很長(zhǎng)的對(duì)象,數(shù)量應(yīng)該是很少的洲胖,比如數(shù)據(jù)庫(kù)連接池济榨。我們?cè)趕park task執(zhí)行算子函數(shù)(我們自己寫的),可能會(huì)創(chuàng)建很多對(duì)象宾濒,這些對(duì)象都是要放入JVM年輕代中的腿短。

每一次放對(duì)象的時(shí)候,都是放入eden區(qū)域绘梦,和其中一個(gè)survivor區(qū)域橘忱。另外一個(gè)survivor區(qū)域是空閑的。

當(dāng)eden區(qū)域和一個(gè)survivor區(qū)域放滿了以后(spark運(yùn)行過(guò)程中卸奉,產(chǎn)生的對(duì)象實(shí)在太多了)钝诚,就會(huì)觸發(fā)minor gc,小型垃圾回收榄棵。把不再使用的對(duì)象凝颇,從內(nèi)存中清空,給后面新創(chuàng)建的對(duì)象騰出來(lái)點(diǎn)兒地方疹鳄。

清理掉了不再使用的對(duì)象之后拧略,那么也會(huì)將存活下來(lái)的對(duì)象(還要繼續(xù)使用的),放入之前空閑的那一個(gè)survivor區(qū)域中瘪弓。這里可能會(huì)出現(xiàn)一個(gè)問(wèn)題垫蛆。默認(rèn)eden、survior1和survivor2的內(nèi)存占比是8:1:1腺怯。問(wèn)題是袱饭,如果存活下來(lái)的對(duì)象是1.5,一個(gè)survivor區(qū)域放不下呛占。此時(shí)就可能通過(guò)JVM的擔(dān)保機(jī)制(不同JVM版本可能對(duì)應(yīng)的行為)虑乖,將多余的對(duì)象,直接放入老年代了晾虑。

如果你的JVM內(nèi)存不夠大的話疹味,可能導(dǎo)致頻繁的年輕代內(nèi)存滿溢仅叫,頻繁的進(jìn)行minor gc。頻繁的minor gc會(huì)導(dǎo)致短時(shí)間內(nèi)佛猛,有些存活的對(duì)象惑芭,多次垃圾回收都沒(méi)有回收掉。會(huì)導(dǎo)致這種短生命周期(其實(shí)不一定是要長(zhǎng)期使用的)對(duì)象继找,年齡過(guò)大遂跟,垃圾回收次數(shù)太多還沒(méi)有回收到,跑到老年代婴渡。

老年代中幻锁,可能會(huì)因?yàn)閮?nèi)存不足,囤積一大堆边臼,短生命周期的哄尔,本來(lái)應(yīng)該在年輕代中的,可能馬上就要被回收掉的對(duì)象柠并。此時(shí)岭接,可能導(dǎo)致老年代頻繁滿溢。頻繁進(jìn)行full gc(全局/全面垃圾回收)臼予。full gc就會(huì)去回收老年代中的對(duì)象鸣戴。full gc由于這個(gè)算法的設(shè)計(jì),是針對(duì)的是粘拾,老年代中的對(duì)象數(shù)量很少窄锅,滿溢進(jìn)行full gc的頻率應(yīng)該很少,因此采取了不太復(fù)雜缰雇,但是耗費(fèi)性能和時(shí)間的垃圾回收算法入偷。full gc很慢。

full gc / minor gc械哟,無(wú)論是快疏之,還是慢,都會(huì)導(dǎo)致jvm的工作線程停止工作暇咆,stop the world锋爪。簡(jiǎn)而言之,就是說(shuō)糯崎,gc的時(shí)候,spark停止工作了河泳。等著垃圾回收結(jié)束沃呢。

內(nèi)存不充足的時(shí)候,出現(xiàn)的問(wèn)題:

1拆挥、頻繁minor gc薄霜,也會(huì)導(dǎo)致頻繁spark停止工作

2某抓、老年代囤積大量活躍對(duì)象(短生命周期的對(duì)象),導(dǎo)致頻繁full gc惰瓜,full gc時(shí)間很長(zhǎng)否副,短則數(shù)十秒坷虑,長(zhǎng)則數(shù)分鐘索绪,甚至數(shù)小時(shí)复颈≡赶眨可能導(dǎo)致spark長(zhǎng)時(shí)間停止工作碧聪。

3厦章、嚴(yán)重影響咱們的spark的性能和運(yùn)行的速度雀费。

2.1比肄、降低cache操作的內(nèi)存占比

spark中男翰,堆內(nèi)存又被劃分成了兩塊另患,一塊是專門用來(lái)給RDD的cache、persist操作進(jìn)行RDD數(shù)據(jù)緩存用的蛾绎。另外一塊用來(lái)給spark算子函數(shù)的運(yùn)行使用的昆箕,存放函數(shù)中自己創(chuàng)建的對(duì)象。

默認(rèn)情況下租冠,給RDD cache操作的內(nèi)存占比鹏倘,是0.6,60%的內(nèi)存都給了cache操作了肺稀。但是問(wèn)題是第股,如果某些情況下cache不是那么的緊張,問(wèn)題在于task算子函數(shù)中創(chuàng)建的對(duì)象過(guò)多话原,然后內(nèi)存又不太大夕吻,導(dǎo)致了頻繁的minor gc,甚至頻繁full gc繁仁,導(dǎo)致spark頻繁的停止工作涉馅。性能影響會(huì)很大。

針對(duì)上述這種情況黄虱,可以在任務(wù)運(yùn)行界面稚矿,去查看你的spark作業(yè)的運(yùn)行統(tǒng)計(jì),可以看到每個(gè)stage的運(yùn)行情況捻浦,包括每個(gè)task的運(yùn)行時(shí)間晤揣、gc時(shí)間等等。如果發(fā)現(xiàn)gc太頻繁朱灿,時(shí)間太長(zhǎng)昧识。此時(shí)就可以適當(dāng)調(diào)價(jià)這個(gè)比例。

降低cache操作的內(nèi)存占比盗扒,大不了用persist操作跪楞,選擇將一部分緩存的RDD數(shù)據(jù)寫入磁盤缀去,或者序列化方式,配合Kryo序列化類甸祭,減少RDD緩存的內(nèi)存占用缕碎。降低cache操作內(nèi)存占比,對(duì)應(yīng)的池户,算子函數(shù)的內(nèi)存占比就提升了咏雌。這個(gè)時(shí)候,可能就可以減少minor gc的頻率煞檩,同時(shí)減少full gc的頻率处嫌。對(duì)性能的提升是有一定的幫助的。

一句話斟湃,讓task執(zhí)行算子函數(shù)時(shí)熏迹,有更多的內(nèi)存可以使用。

spark.storage.memoryFraction凝赛,0.6 ->0.5 -> 0.4 -> 0.2


2.2注暗、調(diào)節(jié)executor堆外內(nèi)存與連接等待時(shí)長(zhǎng)

調(diào)節(jié)executor堆外內(nèi)存

有時(shí)候,如果你的spark作業(yè)處理的數(shù)據(jù)量特別大墓猎,幾億數(shù)據(jù)量捆昏。然后spark作業(yè)一運(yùn)行,時(shí)不時(shí)的報(bào)錯(cuò)毙沾,shuffle file cannot find骗卜,executor、task lost左胞,out of

memory(內(nèi)存溢出)寇仓。

可能是executor的堆外內(nèi)存不太夠用,導(dǎo)致executor在運(yùn)行的過(guò)程中烤宙,可能會(huì)內(nèi)存溢出遍烦,可能導(dǎo)致后續(xù)的stage的task在運(yùn)行的時(shí)候,要從一些executor中去拉取shuffle map output文件躺枕,但是executor可能已經(jīng)掛掉了服猪,關(guān)聯(lián)的block

manager也沒(méi)有了。所以會(huì)報(bào)shuffle output file not found拐云,resubmitting

task罢猪,executor lost。spark作業(yè)徹底崩潰叉瘩。

上述情況下膳帕,就可以去考慮調(diào)節(jié)一下executor的堆外內(nèi)存。也許就可以避免報(bào)錯(cuò)房揭。此外备闲,有時(shí)堆外內(nèi)存調(diào)節(jié)的比較大的時(shí)候,對(duì)于性能來(lái)說(shuō)捅暴,也會(huì)帶來(lái)一定的提升恬砂。

可以調(diào)節(jié)堆外內(nèi)存的上限:

--conf spark.yarn.executor.memoryOverhead=2048

spark-submit腳本里面,去用--conf的方式蓬痒,去添加配置泻骤。用new

SparkConf().set()這種方式去設(shè)置是沒(méi)有用的!一定要在spark-submit腳本中去設(shè)置梧奢。

spark.yarn.executor.memoryOverhead(看名字狱掂,顧名思義,針對(duì)的是基于yarn的提交模式)

默認(rèn)情況下亲轨,這個(gè)堆外內(nèi)存上限大概是300M趋惨。通常在項(xiàng)目中,真正處理大數(shù)據(jù)的時(shí)候惦蚊,這里都會(huì)出現(xiàn)問(wèn)題器虾,導(dǎo)致spark作業(yè)反復(fù)崩潰,無(wú)法運(yùn)行蹦锋。此時(shí)就會(huì)去調(diào)節(jié)這個(gè)參數(shù)兆沙,到至少1G(1024M),甚至說(shuō)2G莉掂、4G葛圃。

通常這個(gè)參數(shù)調(diào)節(jié)上去以后,就會(huì)避免掉某些JVM OOM的異常問(wèn)題憎妙,同時(shí)呢库正,會(huì)讓整體spark作業(yè)的性能,得到較大的提升尚氛。


調(diào)節(jié)連接等待時(shí)長(zhǎng)

我們知道诀诊,executor會(huì)優(yōu)先從自己本地關(guān)聯(lián)的BlockManager中獲取某份數(shù)據(jù)。如果本地block manager沒(méi)有的話阅嘶,那么會(huì)通過(guò)TransferService属瓣,去遠(yuǎn)程連接其他節(jié)點(diǎn)上executor的block manager去獲取。

而此時(shí)上面executor去遠(yuǎn)程連接的那個(gè)executor讯柔,因?yàn)閠ask創(chuàng)建的對(duì)象特別大抡蛙,特別多,

頻繁的讓JVM堆內(nèi)存滿溢魂迄,正在進(jìn)行垃圾回收粗截。而處于垃圾回收過(guò)程中,所有的工作線程全部停止捣炬,相當(dāng)于只要一旦進(jìn)行垃圾回收熊昌,spark / executor停止工作绽榛,無(wú)法提供響應(yīng)。

此時(shí)呢婿屹,就會(huì)沒(méi)有響應(yīng)灭美,無(wú)法建立網(wǎng)絡(luò)連接,會(huì)卡住昂利。spark默認(rèn)的網(wǎng)絡(luò)連接的超時(shí)時(shí)長(zhǎng)届腐,是60s,如果卡住60s都無(wú)法建立連接的話蜂奸,那么就宣告失敗了犁苏。

報(bào)錯(cuò)幾次,幾次都拉取不到數(shù)據(jù)的話扩所,可能會(huì)導(dǎo)致spark作業(yè)的崩潰围详。也可能會(huì)導(dǎo)致DAGScheduler,反復(fù)提交幾次stage祖屏。TaskScheduler反復(fù)提交幾次task短曾。大大延長(zhǎng)我們的spark作業(yè)的運(yùn)行時(shí)間。

可以考慮調(diào)節(jié)連接的超時(shí)時(shí)長(zhǎng):

--conf spark.core.connection.ack.wait.timeout=300

spark-submit腳本赐劣,切記嫉拐,不是在new

SparkConf().set()這種方式來(lái)設(shè)置的。

spark.core.connection.ack.wait.timeout(spark core魁兼,connection婉徘,連接,ack咐汞,wait timeout盖呼,建立不上連接的時(shí)候,超時(shí)等待時(shí)長(zhǎng))

調(diào)節(jié)這個(gè)值比較大以后化撕,通常來(lái)說(shuō)几晤,可以避免部分的偶爾出現(xiàn)的某某文件拉取失敗,某某文件lost掉了植阴。

3蟹瘾、Shuffle調(diào)優(yōu)

原理概述:

什么樣的情況下,會(huì)發(fā)生shuffle掠手?

在spark中憾朴,主要是以下幾個(gè)算子:groupByKey、reduceByKey喷鸽、countByKey众雷、join,等等。

什么是shuffle砾省?

groupByKey鸡岗,要把分布在集群各個(gè)節(jié)點(diǎn)上的數(shù)據(jù)中的同一個(gè)key,對(duì)應(yīng)的values编兄,都要集中到一塊兒纤房,集中到集群中同一個(gè)節(jié)點(diǎn)上,更嚴(yán)密一點(diǎn)說(shuō)翻诉,就是集中到一個(gè)節(jié)點(diǎn)的一個(gè)executor的一個(gè)task中。

然后呢捌刮,集中一個(gè)key對(duì)應(yīng)的values之后碰煌,才能交給我們來(lái)進(jìn)行處理,>绅作。reduceByKey芦圾,算子函數(shù)去對(duì)values集合進(jìn)行reduce操作,最后變成一個(gè)value俄认。countByKey需要在一個(gè)task中个少,獲取到一個(gè)key對(duì)應(yīng)的所有的value,然后進(jìn)行計(jì)數(shù)眯杏,統(tǒng)計(jì)一共有多少個(gè)value夜焦。join,RDD岂贩,RDD

value>茫经,只要是兩個(gè)RDD中,key相同對(duì)應(yīng)的2個(gè)value萎津,都能到一個(gè)節(jié)點(diǎn)的executor的task中卸伞,給我們進(jìn)行處理。

shuffle锉屈,一定是分為兩個(gè)stage來(lái)完成的荤傲。因?yàn)檫@其實(shí)是個(gè)逆向的過(guò)程,不是stage決定shuffle颈渊,是shuffle決定stage遂黍。

reduceByKey(_+_),在某個(gè)action觸發(fā)job的時(shí)候俊嗽,DAGScheduler妓湘,會(huì)負(fù)責(zé)劃分job為多個(gè)stage。劃分的依據(jù)乌询,就是榜贴,如果發(fā)現(xiàn)有會(huì)觸發(fā)shuffle操作的算子,比如reduceByKey,就將這個(gè)操作的前半部分唬党,以及之前所有的RDD和transformation操作鹃共,劃分為一個(gè)stage。shuffle操作的后半部分驶拱,以及后面的霜浴,直到action為止的RDD和transformation操作,劃分為另外一個(gè)stage蓝纲。


3.1阴孟、合并map端輸出文件

3.1.1、如果不合并map端輸出文件的話税迷,會(huì)怎么樣永丝?

舉例實(shí)際生產(chǎn)環(huán)境的條件:

100個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)一個(gè)executor):100個(gè)executor

每個(gè)executor:2個(gè)cpu core

總共1000個(gè)task:每個(gè)executor平均10個(gè)task

每個(gè)節(jié)點(diǎn),10個(gè)task箭养,每個(gè)節(jié)點(diǎn)會(huì)輸出多少份map端文件慕嚷?10 * 1000=1萬(wàn)個(gè)文件

總共有多少份map端輸出文件?100 * 10000 = 100萬(wàn)毕泌。

第一個(gè)stage喝检,每個(gè)task,都會(huì)給第二個(gè)stage的每個(gè)task創(chuàng)建一份map端的輸出文件

第二個(gè)stage撼泛,每個(gè)task挠说,會(huì)到各個(gè)節(jié)點(diǎn)上面去,拉取第一個(gè)stage每個(gè)task輸出的愿题,屬于自己的那一份文件纺涤。

shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴(yán)重的部分抠忘。

通過(guò)上面的分析撩炊,一個(gè)普通的生產(chǎn)環(huán)境的spark job的一個(gè)shuffle環(huán)節(jié),會(huì)寫入磁盤100萬(wàn)個(gè)文件崎脉。

磁盤IO對(duì)性能和spark作業(yè)執(zhí)行速度的影響拧咳,是極其驚人和嚇人的。

基本上囚灼,spark作業(yè)的性能骆膝,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個(gè)部分灶体,但是這里也是非常大的一個(gè)性能消耗點(diǎn)阅签。

3.1.2、開(kāi)啟shuffle map端輸出文件合并的機(jī)制

new SparkConf().set("spark.shuffle.consolidateFiles","true")

默認(rèn)情況下蝎抽,是不開(kāi)啟的政钟,就是會(huì)發(fā)生如上所述的大量map端輸出文件的操作,嚴(yán)重影響性能。

3.1.3养交、合并map端輸出文件精算,對(duì)咱們的spark的性能有哪些方面的影響呢?

1碎连、map task寫入磁盤文件的IO灰羽,減少:100萬(wàn)文件-> 20萬(wàn)文件

2、第二個(gè)stage鱼辙,原本要拉取第一個(gè)stage的task數(shù)量份文件廉嚼,1000個(gè)task,第二個(gè)stage的每個(gè)task倒戏,都要拉取1000份文件怠噪,走網(wǎng)絡(luò)傳輸。合并以后峭梳,100個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)2個(gè)cpu core蹂喻,第二個(gè)stage的每個(gè)task葱椭,主要拉取100 * 2 = 200個(gè)文件即可。此時(shí)網(wǎng)絡(luò)傳輸?shù)男阅芟囊泊蟠鬁p少口四。

分享一下孵运,實(shí)際在生產(chǎn)環(huán)境中,使用了spark.shuffle.consolidateFiles機(jī)制以后蔓彩,實(shí)際的性能調(diào)優(yōu)的效果:對(duì)于上述的這種生產(chǎn)環(huán)境的配置治笨,性能的提升,還是相當(dāng)?shù)目捎^的赤嚼。spark作業(yè)旷赖,5個(gè)小時(shí)-> 2~3個(gè)小時(shí)。

大家不要小看這個(gè)map端輸出文件合并機(jī)制更卒。實(shí)際上等孵,在數(shù)據(jù)量比較大,你自己本身做了前面的性能調(diào)優(yōu)蹂空,executor上去->cpu core上去->并行度(task數(shù)量)上去俯萌,shuffle沒(méi)調(diào)優(yōu),shuffle就很糟糕了上枕。大量的map端輸出文件的產(chǎn)生咐熙,對(duì)性能有比較惡劣的影響。

這個(gè)時(shí)候辨萍,去開(kāi)啟這個(gè)機(jī)制棋恼,可以很有效的提升性能。

3.2、調(diào)節(jié)map端內(nèi)存緩沖與reduce端內(nèi)存占比

3.2.1蘸泻、默認(rèn)情況下可能出現(xiàn)的問(wèn)題

默認(rèn)情況下琉苇,shuffle的map task,輸出到磁盤文件的時(shí)候悦施,統(tǒng)一都會(huì)先寫入每個(gè)task自己關(guān)聯(lián)的一個(gè)內(nèi)存緩沖區(qū)并扇。

這個(gè)緩沖區(qū)大小,默認(rèn)是32kb抡诞。

每一次穷蛹,當(dāng)內(nèi)存緩沖區(qū)滿溢之后议谷,才會(huì)進(jìn)行spill溢寫操作俘陷,溢寫到磁盤文件中去。

reduce端task硝皂,在拉取到數(shù)據(jù)之后顷窒,會(huì)用hashmap的數(shù)據(jù)格式蛙吏,來(lái)對(duì)各個(gè)key對(duì)應(yīng)的values進(jìn)行匯聚。

針對(duì)每個(gè)key對(duì)應(yīng)的values鞋吉,執(zhí)行我們自定義的聚合函數(shù)的代碼鸦做,比如_ + _(把所有values累加起來(lái))。

reduce task谓着,在進(jìn)行匯聚泼诱、聚合等操作的時(shí)候,實(shí)際上赊锚,使用的就是自己對(duì)應(yīng)的executor的內(nèi)存治筒,executor(jvm進(jìn)程,堆)舷蒲,默認(rèn)executor內(nèi)存中劃分給reduce task進(jìn)行聚合的比例是0.2耸袜。

問(wèn)題來(lái)了,因?yàn)楸壤?.2牲平,所以句灌,理論上,很有可能會(huì)出現(xiàn)欠拾,拉取過(guò)來(lái)的數(shù)據(jù)很多胰锌,那么在內(nèi)存中,放不下藐窄。這個(gè)時(shí)候资昧,默認(rèn)的行為就是將在內(nèi)存放不下的數(shù)據(jù)都spill(溢寫)到磁盤文件中去。

在數(shù)據(jù)量比較大的情況下荆忍,可能頻繁地發(fā)生reduce端的磁盤文件的讀寫格带。

3.2.2撤缴、調(diào)優(yōu)方式

調(diào)節(jié)map task內(nèi)存緩沖:spark.shuffle.file.buffer,默認(rèn)32k(spark 1.3.x不是這個(gè)參數(shù)叽唱,后面還有一個(gè)后綴屈呕,kb。spark 1.5.x以后棺亭,變了虎眨,就是現(xiàn)在這個(gè)參數(shù))

調(diào)節(jié)reduce端聚合內(nèi)存占比:spark.shuffle.memoryFraction,0.2

3.2.3镶摘、在實(shí)際生產(chǎn)環(huán)境中嗽桩,我們?cè)谑裁磿r(shí)候來(lái)調(diào)節(jié)兩個(gè)參數(shù)?

看Spark UI凄敢,如果你的公司是決定采用standalone模式碌冶,那么狠簡(jiǎn)單,你的spark跑起來(lái)涝缝,會(huì)顯示一個(gè)Spark UI的地址扑庞,4040的端口。進(jìn)去觀察每個(gè)stage的詳情拒逮,有哪些executor罐氨,有哪些task,每個(gè)task的shuffle write和shuffle read的量消恍,shuffle的磁盤和內(nèi)存讀寫的數(shù)據(jù)量岂昭。如果是用的yarn模式來(lái)提交以现,從yarn的界面進(jìn)去狠怨,點(diǎn)擊對(duì)應(yīng)的application,進(jìn)入Spark UI邑遏,查看詳情佣赖。

如果發(fā)現(xiàn)shuffle 磁盤的write和read,很大记盒。這個(gè)時(shí)候憎蛤,就意味著最好調(diào)節(jié)一些shuffle的參數(shù)。首先當(dāng)然是考慮開(kāi)啟map端輸出文件合并機(jī)制纪吮。其次調(diào)節(jié)上面說(shuō)的那兩個(gè)參數(shù)俩檬。調(diào)節(jié)的時(shí)候的原則:spark.shuffle.file.buffer每次擴(kuò)大一倍,然后看看效果碾盟,64棚辽,128。spark.shuffle.memoryFraction冰肴,每次提高0.1屈藐,看看效果榔组。

不能調(diào)節(jié)的太大,太大了以后過(guò)猶不及联逻,因?yàn)閮?nèi)存資源是有限的搓扯,你這里調(diào)節(jié)的太大了,其他環(huán)節(jié)的內(nèi)存使用就會(huì)有問(wèn)題了包归。

3.2.4锨推、調(diào)節(jié)以后的效果

map task內(nèi)存緩沖變大了,減少spill到磁盤文件的次數(shù)箫踩。reduce端聚合內(nèi)存變大了爱态,減少spill到磁盤的次數(shù),而且減少了后面聚合讀取磁盤文件的數(shù)量境钟。


3.3锦担、HashShuffleManager與SortShuffleManager

3.3.1、shuffle調(diào)優(yōu)概述

大多數(shù)Spark作業(yè)的性能主要就是消耗在了shuffle環(huán)節(jié)慨削,因?yàn)樵摥h(huán)節(jié)包含了大量的磁盤IO洞渔、序列化、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)炔僮鞲刻R虼舜沤罚绻屪鳂I(yè)的性能更上一層樓,就有必要對(duì)shuffle過(guò)程進(jìn)行調(diào)優(yōu)玫芦。但是也必須提醒大家的是浆熔,影響一個(gè)Spark作業(yè)性能的因素,主要還是代碼開(kāi)發(fā)桥帆、資源參數(shù)以及數(shù)據(jù)傾斜医增,shuffle調(diào)優(yōu)只能在整個(gè)Spark的性能調(diào)優(yōu)中占到一小部分而已。因此大家務(wù)必把握住調(diào)優(yōu)的基本原則老虫,千萬(wàn)不要舍本逐末叶骨。下面我們就給大家詳細(xì)講解shuffle的原理,以及相關(guān)參數(shù)的說(shuō)明祈匙,同時(shí)給出各個(gè)參數(shù)的調(diào)優(yōu)建議忽刽。

3.3.2、ShuffleManager發(fā)展概述

在Spark的源碼中夺欲,負(fù)責(zé)shuffle過(guò)程的執(zhí)行跪帝、計(jì)算和處理的組件主要就是ShuffleManager,也即shuffle管理器些阅。

在Spark 1.2以前伞剑,默認(rèn)的shuffle計(jì)算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個(gè)非常嚴(yán)重的弊端扑眉,就是會(huì)產(chǎn)生大量的中間磁盤文件纸泄,進(jìn)而由大量的磁盤IO操作影響了性能赖钞。

因此在Spark 1.2以后的版本中,默認(rèn)的ShuffleManager改成了SortShuffleManager聘裁。SortShuffleManager相較于HashShuffleManager來(lái)說(shuō)雪营,有了一定的改進(jìn)。主要就在于衡便,每個(gè)Task在進(jìn)行shuffle操作時(shí)献起,雖然也會(huì)產(chǎn)生較多的臨時(shí)磁盤文件,但是最后會(huì)將所有的臨時(shí)文件合并(merge)成一個(gè)磁盤文件镣陕,因此每個(gè)Task就只有一個(gè)磁盤文件谴餐。在下一個(gè)stage的shuffle read task拉取自己的數(shù)據(jù)時(shí),只要根據(jù)索引讀取每個(gè)磁盤文件中的部分?jǐn)?shù)據(jù)即可呆抑。

在spark 1.5.x以后岂嗓,對(duì)于shuffle manager又出來(lái)了一種新的manager,tungsten-sort(鎢絲)鹊碍,鎢絲sort shuffle manager厌殉。官網(wǎng)上一般說(shuō),鎢絲sort

shuffle manager侈咕,效果跟sort shuffle manager是差不多的公罕。

但是,唯一的不同之處在于耀销,鎢絲manager楼眷,是使用了自己實(shí)現(xiàn)的一套內(nèi)存管理機(jī)制,性能上有很大的提升熊尉,而且可以避免shuffle過(guò)程中產(chǎn)生的大量的OOM罐柳,GC,等等內(nèi)存相關(guān)的異常帽揪。

3.3.3硝清、hash辅斟、sort转晰、tungsten-sort。如何來(lái)選擇士飒?

1查邢、需不需要數(shù)據(jù)默認(rèn)就讓spark給你進(jìn)行排序?就好像mapreduce酵幕,默認(rèn)就是有按照key的排序扰藕。如果不需要的話,其實(shí)還是建議搭建就使用最基本的HashShuffleManager芳撒,因?yàn)樽铋_(kāi)始就是考慮的是不排序邓深,換取高性能未桥。

2、什么時(shí)候需要用sort shuffle

manager芥备?如果你需要你的那些數(shù)據(jù)按key排序了冬耿,那么就選擇這種吧,而且要注意萌壳,reduce task的數(shù)量應(yīng)該是超過(guò)200的亦镶,這樣sort、merge(多個(gè)文件合并成一個(gè))的機(jī)制袱瓮,才能生效把缤骨。但是這里要注意,你一定要自己考量一下尺借,有沒(méi)有必要在shuffle的過(guò)程中绊起,就做這個(gè)事情,畢竟對(duì)性能是有影響的燎斩。

3勒庄、如果你不需要排序,而且你希望你的每個(gè)task輸出的文件最終是會(huì)合并成一份的瘫里,你自己認(rèn)為可以減少性能開(kāi)銷实蔽。可以去調(diào)節(jié)bypassMergeThreshold這個(gè)閾值谨读,比如你的reduce task數(shù)量是500局装,默認(rèn)閾值是200,所以默認(rèn)還是會(huì)進(jìn)行sort和直接merge的劳殖☆砩校可以將閾值調(diào)節(jié)成550,不會(huì)進(jìn)行sort哆姻,按照hash的做法宣增,每個(gè)reduce

task創(chuàng)建一份輸出文件,最后合并成一份文件矛缨。(一定要提醒大家爹脾,這個(gè)參數(shù),其實(shí)我們通常不會(huì)在生產(chǎn)環(huán)境里去使用箕昭,也沒(méi)有經(jīng)過(guò)驗(yàn)證說(shuō)灵妨,這樣的方式,到底有多少性能的提升)

4落竹、如果你想選用sort based

shuffle manager泌霍,而且你們公司的spark版本比較高,是1.5.x版本的述召,那么可以考慮去嘗試使用tungsten-sort shuffle manager朱转⌒返兀看看性能的提升與穩(wěn)定性怎么樣。

總結(jié):

1藤为、在生產(chǎn)環(huán)境中锈津,不建議大家貿(mào)然使用第三點(diǎn)和第四點(diǎn):

2、如果你不想要你的數(shù)據(jù)在shuffle時(shí)排序凉蜂,那么就自己設(shè)置一下琼梆,用hash shuffle manager。

3窿吩、如果你的確是需要你的數(shù)據(jù)在shuffle時(shí)進(jìn)行排序的茎杂,那么就默認(rèn)不用動(dòng),默認(rèn)就是sort shuffle manager纫雁』屯或者是什么?如果你壓根兒不care是否排序這個(gè)事兒轧邪,那么就默認(rèn)讓他就是sort的刽脖。調(diào)節(jié)一些其他的參數(shù)(consolidation機(jī)制)。(80%忌愚,都是用這種)

spark.shuffle.manager:hash曲管、sort、tungsten-sort

spark.shuffle.sort.bypassMergeThreshold:200硕糊。自己可以設(shè)定一個(gè)閾值院水,默認(rèn)是200,當(dāng)reduce task數(shù)量少于等于200简十,map task創(chuàng)建的輸出文件小于等于200的檬某,最后會(huì)將所有的輸出文件合并為一份文件。這樣做的好處螟蝙,就是避免了sort排序恢恼,節(jié)省了性能開(kāi)銷,而且還能將多個(gè)reduce task的文件合并成一份文件胰默,節(jié)省了reduce task拉取數(shù)據(jù)的時(shí)候的磁盤IO的開(kāi)銷场斑。

4、算子調(diào)優(yōu)

4.1初坠、MapPartitions提升Map類操作性能

spark中和簸,最基本的原則彭雾,就是每個(gè)task處理一個(gè)RDD的partition碟刺。

4.1.1、MapPartitions的優(yōu)缺點(diǎn)

MapPartitions操作的優(yōu)點(diǎn):

如果是普通的map薯酝,比如一個(gè)partition中有1萬(wàn)條數(shù)據(jù)半沽。ok爽柒,那么你的function要執(zhí)行和計(jì)算1萬(wàn)次。

但是者填,使用MapPartitions操作之后浩村,一個(gè)task僅僅會(huì)執(zhí)行一次function,function一次接收所有的partition數(shù)據(jù)占哟。只要執(zhí)行一次就可以了心墅,性能比較高。

MapPartitions的缺點(diǎn):

如果是普通的map操作榨乎,一次function的執(zhí)行就處理一條數(shù)據(jù)怎燥。那么如果內(nèi)存不夠用的情況下,比如處理了1千條數(shù)據(jù)了蜜暑,那么這個(gè)時(shí)候內(nèi)存不夠了铐姚,那么就可以將已經(jīng)處理完的1千條數(shù)據(jù)從內(nèi)存里面垃圾回收掉,或者用其他方法肛捍,騰出空間來(lái)吧隐绵。

所以說(shuō)普通的map操作通常不會(huì)導(dǎo)致內(nèi)存的OOM異常。

但是MapPartitions操作拙毫,對(duì)于大量數(shù)據(jù)來(lái)說(shuō)依许,比如甚至一個(gè)partition,100萬(wàn)數(shù)據(jù)缀蹄,一次傳入一個(gè)function以后悍手,那么可能一下子內(nèi)存不夠,但是又沒(méi)有辦法去騰出內(nèi)存空間來(lái)袍患,可能就OOM坦康,內(nèi)存溢出。

4.1.2诡延、MapPartitions使用場(chǎng)景

當(dāng)分析的數(shù)據(jù)量不是特別大的時(shí)候滞欠,都可以用這種MapPartitions系列操作,性能還是非常不錯(cuò)的肆良,是有提升的筛璧。比如原來(lái)是15分鐘,(曾經(jīng)有一次性能調(diào)優(yōu))惹恃,12分鐘夭谤。10分鐘->9分鐘。

但是也有過(guò)出問(wèn)題的經(jīng)驗(yàn)巫糙,MapPartitions只要一用朗儒,直接OOM,內(nèi)存溢出,崩潰醉锄。

在項(xiàng)目中乏悄,自己先去估算一下RDD的數(shù)據(jù)量,以及每個(gè)partition的量恳不,還有自己分配給每個(gè)executor的內(nèi)存資源檩小。看看一下子內(nèi)存容納所有的partition數(shù)據(jù)行不行烟勋。如果行规求,可以試一下,能跑通就好卵惦。性能肯定是有提升的颓哮。但是試了以后,發(fā)現(xiàn)OOM了鸵荠,那就放棄吧冕茅。

4.2、filter過(guò)后使用coalesce減少分區(qū)數(shù)量

4.2.1蛹找、出現(xiàn)問(wèn)題

默認(rèn)情況下姨伤,經(jīng)過(guò)了filter之后,RDD中的每個(gè)partition的數(shù)據(jù)量庸疾,可能都不太一樣了乍楚。(原本每個(gè)partition的數(shù)據(jù)量可能是差不多的)

可能出現(xiàn)的問(wèn)題:

1、每個(gè)partition數(shù)據(jù)量變少了届慈,但是在后面進(jìn)行處理的時(shí)候徒溪,還是要跟partition數(shù)量一樣數(shù)量的task,來(lái)進(jìn)行處理金顿,有點(diǎn)浪費(fèi)task計(jì)算資源臊泌。

2、每個(gè)partition的數(shù)據(jù)量不一樣揍拆,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition的時(shí)候矗钟,每個(gè)task要處理的數(shù)據(jù)量就不同锰提,這樣就會(huì)導(dǎo)致有些task運(yùn)行的速度很快豆赏,有些task運(yùn)行的速度很慢杠园。這就是數(shù)據(jù)傾斜。

針對(duì)上述的兩個(gè)問(wèn)題筒狠,我們希望應(yīng)該能夠怎么樣猪狈?

1、針對(duì)第一個(gè)問(wèn)題辩恼,我們希望可以進(jìn)行partition的壓縮吧雇庙,因?yàn)閿?shù)據(jù)量變少了谓形,那么partition其實(shí)也完全可以對(duì)應(yīng)的變少。比如原來(lái)是4個(gè)partition状共,現(xiàn)在完全可以變成2個(gè)partition套耕。那么就只要用后面的2個(gè)task來(lái)處理即可谁帕。就不會(huì)造成task計(jì)算資源的浪費(fèi)峡继。(不必要,針對(duì)只有一點(diǎn)點(diǎn)數(shù)據(jù)的partition匈挖,還去啟動(dòng)一個(gè)task來(lái)計(jì)算)

2碾牌、針對(duì)第二個(gè)問(wèn)題,其實(shí)解決方案跟第一個(gè)問(wèn)題是一樣的儡循,也是去壓縮partition舶吗,盡量讓每個(gè)partition的數(shù)據(jù)量差不多。那么后面的task分配到的partition的數(shù)據(jù)量也就差不多择膝。不會(huì)造成有的task運(yùn)行速度特別慢誓琼,有的task運(yùn)行速度特別快。避免了數(shù)據(jù)傾斜的問(wèn)題肴捉。

4.2.2腹侣、解決問(wèn)題方法

調(diào)用coalesce算子

主要就是用于在filter操作之后,針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況齿穗,來(lái)壓縮partition的數(shù)量傲隶,而且讓每個(gè)partition的數(shù)據(jù)量都盡量均勻緊湊。從而便于后面的task進(jìn)行計(jì)算操作窃页,在某種程度上跺株,能夠一定程度的提升性能。

4.3脖卖、使用foreachPartition優(yōu)化寫數(shù)據(jù)庫(kù)性能

4.3.1乒省、默認(rèn)的foreach的性能缺陷在哪里?

首先畦木,對(duì)于每條數(shù)據(jù)作儿,都要單獨(dú)去調(diào)用一次function,task為每個(gè)數(shù)據(jù)馋劈,都要去執(zhí)行一次function函數(shù)攻锰。

如果100萬(wàn)條數(shù)據(jù),(一個(gè)partition)妓雾,調(diào)用100萬(wàn)次娶吞。性能比較差。

另外一個(gè)非常非常重要的一點(diǎn)

如果每個(gè)數(shù)據(jù)械姻,你都去創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接的話妒蛇,那么你就得創(chuàng)建100萬(wàn)次數(shù)據(jù)庫(kù)連接机断。

但是要注意的是,數(shù)據(jù)庫(kù)連接的創(chuàng)建和銷毀绣夺,都是非常非常消耗性能的吏奸。雖然我們之前已經(jīng)用了數(shù)據(jù)庫(kù)連接池,只是創(chuàng)建了固定數(shù)量的數(shù)據(jù)庫(kù)連接陶耍。

你還是得多次通過(guò)數(shù)據(jù)庫(kù)連接奋蔚,往數(shù)據(jù)庫(kù)(MySQL)發(fā)送一條SQL語(yǔ)句,然后MySQL需要去執(zhí)行這條SQL語(yǔ)句烈钞。如果有100萬(wàn)條數(shù)據(jù)泊碑,那么就是100萬(wàn)次發(fā)送SQL語(yǔ)句。

以上兩點(diǎn)(數(shù)據(jù)庫(kù)連接毯欣,多次發(fā)送SQL語(yǔ)句)馒过,都是非常消耗性能的。

4.3.2酗钞、用了foreachPartition算子之后腹忽,好處在哪里?

1砚作、對(duì)于我們寫的function函數(shù)窘奏,就調(diào)用一次,一次傳入一個(gè)partition所有的數(shù)據(jù)偎巢。

2蔼夜、主要?jiǎng)?chuàng)建或者獲取一個(gè)數(shù)據(jù)庫(kù)連接就可以。

3压昼、只要向數(shù)據(jù)庫(kù)發(fā)送一次SQL語(yǔ)句和多組參數(shù)即可求冷。

注意,與mapPartitions操作一樣窍霞,如果一個(gè)partition的數(shù)量真的特別特別大匠题,比如是100萬(wàn),那基本上就不太靠譜了但金。很有可能會(huì)發(fā)生OOM韭山,內(nèi)存溢出的問(wèn)題。


4.4冷溃、使用repartition解決Spark SQL低并行度的性能問(wèn)題

4.4.1钱磅、設(shè)置并行度

并行度:之前說(shuō)過(guò),并行度是設(shè)置的:

1似枕、spark.default.parallelism

2盖淡、textFile(),傳入第二個(gè)參數(shù)凿歼,指定partition數(shù)量(比較少用)


在生產(chǎn)環(huán)境中褪迟,是最好設(shè)置一下并行度冗恨。官網(wǎng)有推薦的設(shè)置方式,根據(jù)你的application的總cpu core數(shù)量(在spark-submit中可以指定)味赃,自己手動(dòng)設(shè)置spark.default.parallelism參數(shù)掀抹,指定為cpu

core總數(shù)的2~3倍。


4.4.2心俗、你設(shè)置的這個(gè)并行度傲武,在哪些情況下會(huì)生效?什么情況下不會(huì)生效另凌?

如果你壓根兒沒(méi)有使用Spark SQL(DataFrame)谱轨,那么你整個(gè)spark application默認(rèn)所有stage的并行度都是你設(shè)置的那個(gè)參數(shù)戒幔。(除非你使用coalesce算子縮減過(guò)partition數(shù)量)吠谢。

問(wèn)題來(lái)了,用Spark SQL的情況下诗茎,stage的并行度沒(méi)法自己指定工坊。Spark SQL自己會(huì)默認(rèn)根據(jù)hive表對(duì)應(yīng)的hdfs文件的block,自動(dòng)設(shè)置Spark SQL查詢所在的那個(gè)stage的并行度敢订。你自己通過(guò)spark.default.parallelism參數(shù)指定的并行度王污,只會(huì)在沒(méi)有Spark SQL的stage中生效。

比如你第一個(gè)stage楚午,用了Spark SQL從hive表中查詢出了一些數(shù)據(jù)昭齐,然后做了一些transformation操作,接著做了一個(gè)shuffle操作(groupByKey)矾柜。下一個(gè)stage阱驾,在shuffle操作之后,做了一些transformation操作怪蔑。hive表里覆,對(duì)應(yīng)了一個(gè)hdfs文件,有20個(gè)block缆瓣。你自己設(shè)置了spark.default.parallelism參數(shù)為100喧枷。

你的第一個(gè)stage的并行度,是不受你的控制的弓坞,就只有20個(gè)task隧甚。第二個(gè)stage,才會(huì)變成你自己設(shè)置的那個(gè)并行度渡冻,100戚扳。

4.4.3、可能出現(xiàn)的問(wèn)題菩帝?

Spark SQL默認(rèn)情況下咖城,它的那個(gè)并行度茬腿,咱們沒(méi)法設(shè)置∫巳福可能導(dǎo)致的問(wèn)題切平,也許沒(méi)什么問(wèn)題,也許很有問(wèn)題辐董。Spark SQL所在的那個(gè)stage中悴品,后面的那些transformation操作,可能會(huì)有非常復(fù)雜的業(yè)務(wù)邏輯简烘,甚至說(shuō)復(fù)雜的算法苔严。如果你的Spark SQL默認(rèn)把task數(shù)量設(shè)置的很少,20個(gè)孤澎,然后每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量届氢,然后還要執(zhí)行特別復(fù)雜的算法。

這個(gè)時(shí)候覆旭,就會(huì)導(dǎo)致第一個(gè)stage的速度退子,特別慢。第二個(gè)stage1000個(gè)task非承徒快寂祥。

4.4.4、解決Spark SQL無(wú)法設(shè)置并行度和task數(shù)量的辦法

repartition算子七兜,你用Spark

SQL這一步的并行度和task數(shù)量丸凭,肯定是沒(méi)有辦法去改變了。但是呢腕铸,可以將你用Spark SQL查詢出來(lái)的RDD惜犀,使用repartition算子去重新進(jìn)行分區(qū),此時(shí)可以分成多個(gè)partition恬惯。然后呢向拆,從repartition以后的RDD,再往后酪耳,并行度和task數(shù)量浓恳,就會(huì)按照你預(yù)期的來(lái)了。就可以避免跟Spark SQL綁定在一個(gè)stage中的算子碗暗,只能使用少量的task去處理大量數(shù)據(jù)以及復(fù)雜的算法邏輯颈将。


4.5、reduceByKey本地聚合介紹

reduceByKey言疗,相較于普通的shuffle操作(比如groupByKey)晴圾,它的一個(gè)特點(diǎn),就是說(shuō)噪奄,會(huì)進(jìn)行map端的本地聚合死姚。對(duì)map端給下個(gè)stage每個(gè)task創(chuàng)建的輸出文件中人乓,寫數(shù)據(jù)之前,就會(huì)進(jìn)行本地的combiner操作都毒,也就是說(shuō)對(duì)每一個(gè)key色罚,對(duì)應(yīng)的values,都會(huì)執(zhí)行你的算子函數(shù)(_ + _)

4.5.1账劲、用reduceByKey對(duì)性能的提升

1戳护、在本地進(jìn)行聚合以后,在map端的數(shù)據(jù)量就變少了瀑焦,減少磁盤IO腌且。而且可以減少磁盤空間的占用。

2榛瓮、下一個(gè)stage铺董,拉取數(shù)據(jù)的量,也就變少了榆芦。減少網(wǎng)絡(luò)的數(shù)據(jù)傳輸?shù)男阅芟摹?/p>

3柄粹、在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用變少了喘鸟。

4匆绣、reduce端,要進(jìn)行聚合的數(shù)據(jù)量也變少了什黑。

4.5.2崎淳、reduceByKey在什么情況下使用呢?

1愕把、非常普通的拣凹,比如說(shuō),就是要實(shí)現(xiàn)類似于wordcount程序一樣的恨豁,對(duì)每個(gè)key對(duì)應(yīng)的值嚣镜,進(jìn)行某種數(shù)據(jù)公式或者算法的計(jì)算(累加、類乘)橘蜜。

2菊匿、對(duì)于一些類似于要對(duì)每個(gè)key進(jìn)行一些字符串拼接的這種較為復(fù)雜的操作,可以自己衡量一下计福,其實(shí)有時(shí)跌捆,也是可以使用reduceByKey來(lái)實(shí)現(xiàn)的。但是不太好實(shí)現(xiàn)象颖。如果真能夠?qū)崿F(xiàn)出來(lái)佩厚,對(duì)性能絕對(duì)是有幫助的。(shuffle基本上就占了整個(gè)spark作業(yè)的90%以上的性能消耗说订,主要能對(duì)shuffle進(jìn)行一定的調(diào)優(yōu)抄瓦,都是有價(jià)值的)


5潮瓶、troubleshooting

5.1、控制shuffle reduce端緩沖大小以避免OOM

map端的task是不斷的輸出數(shù)據(jù)的钙姊,數(shù)據(jù)量可能是很大的筋讨。

但是,其實(shí)reduce端的task摸恍,并不是等到map端task將屬于自己的那份數(shù)據(jù)全部寫入磁盤文件之后悉罕,再去拉取的。map端寫一點(diǎn)數(shù)據(jù)立镶,reduce端task就會(huì)拉取一小部分?jǐn)?shù)據(jù)壁袄,立即進(jìn)行后面的聚合、算子函數(shù)的應(yīng)用媚媒。

每次reduece能夠拉取多少數(shù)據(jù)嗜逻,就由buffer來(lái)決定。因?yàn)槔∵^(guò)來(lái)的數(shù)據(jù)缭召,都是先放在buffer中的栈顷。然后才用后面的executor分配的堆內(nèi)存占比(0.2),hashmap嵌巷,去進(jìn)行后續(xù)的聚合萄凤、函數(shù)的執(zhí)行。

5.1.1搪哪、reduce端緩沖大小的另外一面靡努,關(guān)于性能調(diào)優(yōu)的一面

假如Map端輸出的數(shù)據(jù)量也不是特別大,然后你的整個(gè)application的資源也特別充足晓折。200個(gè)executor惑朦、5個(gè)cpu core、10G內(nèi)存漓概。

其實(shí)可以嘗試去增加這個(gè)reduce端緩沖大小的漾月,比如從48M,變成96M胃珍。那么這樣的話梁肿,每次reduce task能夠拉取的數(shù)據(jù)量就很大。需要拉取的次數(shù)也就變少了堂鲜。比如原先需要拉取100次栈雳,現(xiàn)在只要拉取50次就可以執(zhí)行完了。

對(duì)網(wǎng)絡(luò)傳輸性能開(kāi)銷的減少缔莲,以及reduce端聚合操作執(zhí)行的次數(shù)的減少哥纫,都是有幫助的。

最終達(dá)到的效果,就應(yīng)該是性能上的一定程度上的提升蛀骇。

注意厌秒,一定要在資源充足的前提下做此操作。

5.1.2reduce端緩沖(buffer)擅憔,可能會(huì)出現(xiàn)的問(wèn)題及解決方式

可能會(huì)出現(xiàn)鸵闪,默認(rèn)是48MB,也許大多數(shù)時(shí)候暑诸,reduce端task一邊拉取一邊計(jì)算蚌讼,不一定一直都會(huì)拉滿48M的數(shù)據(jù)。大多數(shù)時(shí)候个榕,拉取個(gè)10M數(shù)據(jù)篡石,就計(jì)算掉了。

大多數(shù)時(shí)候西采,也許不會(huì)出現(xiàn)什么問(wèn)題凰萨。但是有的時(shí)候,map端的數(shù)據(jù)量特別大械馆,然后寫出的速度特別快胖眷。reduce端所有task,拉取的時(shí)候霹崎,全部達(dá)到自己的緩沖的最大極限值珊搀,緩沖區(qū)48M,全部填滿仿畸。

這個(gè)時(shí)候食棕,再加上你的reduce端執(zhí)行的聚合函數(shù)的代碼,可能會(huì)創(chuàng)建大量的對(duì)象错沽。也許,一下子內(nèi)存就撐不住了眶拉,就會(huì)OOM千埃。reduce端的內(nèi)存中,就會(huì)發(fā)生內(nèi)存溢出的問(wèn)題忆植。


針對(duì)上述的可能出現(xiàn)的問(wèn)題放可,我們?cè)撛趺磥?lái)解決呢?

這個(gè)時(shí)候朝刊,就應(yīng)該減少reduce端task緩沖的大小耀里。我寧愿多拉取幾次,但是每次同時(shí)能夠拉取到reduce端每個(gè)task的數(shù)量比較少拾氓,就不容易發(fā)生OOM內(nèi)存溢出的問(wèn)題冯挎。(比如,可以調(diào)節(jié)成12M)

在實(shí)際生產(chǎn)環(huán)境中咙鞍,我們都是碰到過(guò)這種問(wèn)題的房官。這是典型的以性能換執(zhí)行的原理趾徽。reduce端緩沖小了,不容易OOM了翰守,但是孵奶,性能一定是有所下降的,你要拉取的次數(shù)就多了蜡峰。就走更多的網(wǎng)絡(luò)傳輸開(kāi)銷了袁。

這種時(shí)候,只能采取犧牲性能的方式了湿颅,spark作業(yè)早像,首先,第一要義肖爵,就是一定要讓它可以跑起來(lái)卢鹦。

5.1.3、操作方法

new SparkConf().set(spark.reducer.maxSizeInFlight劝堪,”48”)

5.2冀自、解決JVM GC導(dǎo)致的shuffle文件拉取失敗

5.2.1、問(wèn)題描述

有時(shí)會(huì)出現(xiàn)一種情況秒啦,在spark的作業(yè)中熬粗,log日志會(huì)提示shuffle file not found。(spark作業(yè)中余境,非常常見(jiàn)的)而且有的時(shí)候捶索,它是偶爾才會(huì)出現(xiàn)的一種情況弟孟。有的時(shí)候,出現(xiàn)這種情況以后,重新去提交task夏块。重新執(zhí)行一遍,發(fā)現(xiàn)就好了啦粹。沒(méi)有這種錯(cuò)誤了肺樟。

log怎么看?用client模式去提交你的spark作業(yè)顽聂。比如standalone

client或yarn client肥惭。一提交作業(yè),直接可以在本地看到更新的log紊搪。

問(wèn)題原因:比如蜜葱,executor的JVM進(jìn)程可能內(nèi)存不夠用了。那么此時(shí)就會(huì)執(zhí)行GC耀石。minor GC or full GC牵囤。此時(shí)就會(huì)導(dǎo)致executor內(nèi),所有的工作線程全部停止。比如BlockManager奔浅,基于netty的網(wǎng)絡(luò)通信馆纳。

下一個(gè)stage的executor,可能還沒(méi)有停止掉的task想要去上一個(gè)stage的task所在的exeuctor去拉取屬于自己的數(shù)據(jù)汹桦,結(jié)果由于對(duì)方正在gc鲁驶,就導(dǎo)致拉取了半天沒(méi)有拉取到。

就很可能會(huì)報(bào)出shuffle file not found舞骆。但是钥弯,可能下一個(gè)stage又重新提交了task以后,再執(zhí)行就沒(méi)有問(wèn)題了督禽,因?yàn)榭赡艿诙尉蜎](méi)有碰到JVM在gc了脆霎。

5.2.2、解決方案

spark.shuffle.io.maxRetries 3

第一個(gè)參數(shù)狈惫,意思就是說(shuō)睛蛛,shuffle文件拉取的時(shí)候,如果沒(méi)有拉取到(拉取失旊侍浮)忆肾,最多或重試幾次(會(huì)重新拉取幾次文件),默認(rèn)是3次菱肖。

spark.shuffle.io.retryWait 5s

第二個(gè)參數(shù)客冈,意思就是說(shuō),每一次重試?yán)∥募臅r(shí)間間隔稳强,默認(rèn)是5s鐘场仲。

默認(rèn)情況下,假如說(shuō)第一個(gè)stage的executor正在進(jìn)行漫長(zhǎng)的full gc退疫。第二個(gè)stage的executor嘗試去拉取文件渠缕,結(jié)果沒(méi)有拉取到,默認(rèn)情況下蹄咖,會(huì)反復(fù)重試?yán)?次褐健,每次間隔是五秒鐘。最多只會(huì)等待3 * 5s = 15s澜汤。如果15s內(nèi),沒(méi)有拉取到shuffle file舵匾。就會(huì)報(bào)出shuffle file not found俊抵。

針對(duì)這種情況,我們完全可以進(jìn)行預(yù)備性的參數(shù)調(diào)節(jié)坐梯。增大上述兩個(gè)參數(shù)的值徽诲,達(dá)到比較大的一個(gè)值,盡量保證第二個(gè)stage的task,一定能夠拉取到上一個(gè)stage的輸出文件谎替。避免報(bào)shuffle file not found偷溺。然后可能會(huì)重新提交stage和task去執(zhí)行。那樣反而對(duì)性能也不好钱贯。

spark.shuffle.io.maxRetries 60

spark.shuffle.io.retryWait 60s

最多可以忍受1個(gè)小時(shí)沒(méi)有拉取到shuffle file挫掏。只是去設(shè)置一個(gè)最大的可能的值。full gc不可能1個(gè)小時(shí)都沒(méi)結(jié)束吧秩命。

這樣呢尉共,就可以盡量避免因?yàn)間c導(dǎo)致的shuffle file not found,無(wú)法拉取到的問(wèn)題弃锐。


5.3袄友、YARN隊(duì)列資源不足導(dǎo)致的application直接失敗

5.3.1、問(wèn)題描述

如果說(shuō)霹菊,你是基于yarn來(lái)提交spark剧蚣。比如yarn-cluster或者yarn-client。你可以指定提交到某個(gè)hadoop隊(duì)列上的旋廷。每個(gè)隊(duì)列都是可以有自己的資源的鸠按。

跟大家說(shuō)一個(gè)生產(chǎn)環(huán)境中的,給spark用的yarn資源隊(duì)列的情況:500G內(nèi)存柳洋,200個(gè)cpu core待诅。

比如說(shuō),某個(gè)spark application熊镣,在spark-submit里面你自己配了卑雁,executor,80個(gè)绪囱。每個(gè)executor测蹲,4G內(nèi)存。每個(gè)executor鬼吵,2個(gè)cpu core扣甲。你的spark作業(yè)每次運(yùn)行,大概要消耗掉320G內(nèi)存齿椅,以及160個(gè)cpu core琉挖。

乍看起來(lái),咱們的隊(duì)列資源涣脚,是足夠的示辈,500G內(nèi)存,280個(gè)cpu core遣蚀。

首先矾麻,第一點(diǎn)纱耻,你的spark作業(yè)實(shí)際運(yùn)行起來(lái)以后,耗費(fèi)掉的資源量险耀,可能是比你在spark-submit里面配置的弄喘,以及你預(yù)期的,是要大一些的甩牺。400G內(nèi)存蘑志,190個(gè)cpu core。

那么這個(gè)時(shí)候柴灯,的確卖漫,咱們的隊(duì)列資源還是有一些剩余的。但問(wèn)題是如果你同時(shí)又提交了一個(gè)spark作業(yè)上去赠群,一模一樣的羊始。那就可能會(huì)出問(wèn)題。

第二個(gè)spark作業(yè)查描,又要申請(qǐng)320G內(nèi)存+160個(gè)cpu core突委。結(jié)果,發(fā)現(xiàn)隊(duì)列資源不足冬三。

此時(shí)匀油,可能會(huì)出現(xiàn)兩種情況:(備注,具體出現(xiàn)哪種情況勾笆,跟你的YARN敌蚜、Hadoop的版本,你們公司的一些運(yùn)維參數(shù)窝爪,以及配置弛车、硬件、資源肯能都有關(guān)系)

1蒲每、YARN纷跛,發(fā)現(xiàn)資源不足時(shí),你的spark作業(yè)邀杏,并沒(méi)有hang在那里贫奠,等待資源的分配,而是直接打印一行fail的log望蜡,直接就fail掉了唤崭。

2、YARN脖律,發(fā)現(xiàn)資源不足浩姥,你的spark作業(yè),就hang在那里状您。一直等待之前的spark作業(yè)執(zhí)行完勒叠,等待有資源分配給自己來(lái)執(zhí)行。

5.3.2膏孟、解決方案

1眯分、在你的J2EE(我們這個(gè)項(xiàng)目里面,spark作業(yè)的運(yùn)行柒桑, J2EE平臺(tái)觸發(fā)的弊决,執(zhí)行spark-submit腳本的平臺(tái))進(jìn)行限制,同時(shí)只能提交一個(gè)spark作業(yè)到y(tǒng)arn上去執(zhí)行魁淳,確保一個(gè)spark作業(yè)的資源肯定是有的飘诗。

2、你應(yīng)該采用一些簡(jiǎn)單的調(diào)度區(qū)分的方式界逛,比如說(shuō)昆稿,有的spark作業(yè)可能是要長(zhǎng)時(shí)間運(yùn)行的,比如運(yùn)行30分鐘息拜。有的spark作業(yè)溉潭,可能是短時(shí)間運(yùn)行的,可能就運(yùn)行2分鐘少欺。此時(shí)喳瓣,都提交到一個(gè)隊(duì)列上去,肯定不合適赞别。很可能出現(xiàn)30分鐘的作業(yè)卡住后面一大堆2分鐘的作業(yè)畏陕。分隊(duì)列,可以申請(qǐng)(跟你們的YARN仿滔、Hadoop運(yùn)維的同事申請(qǐng))惠毁。你自己給自己搞兩個(gè)調(diào)度隊(duì)列。每個(gè)隊(duì)列的根據(jù)你要執(zhí)行的作業(yè)的情況來(lái)設(shè)置堤撵。在你的J2EE程序里面仁讨,要判斷,如果是長(zhǎng)時(shí)間運(yùn)行的作業(yè)实昨,就干脆都提交到某一個(gè)固定的隊(duì)列里面去把洞豁。如果是短時(shí)間運(yùn)行的作業(yè),就統(tǒng)一提交到另外一個(gè)隊(duì)列里面去荒给。這樣丈挟,避免了長(zhǎng)時(shí)間運(yùn)行的作業(yè),阻塞了短時(shí)間運(yùn)行的作業(yè)志电。

3曙咽、你的隊(duì)列里面,無(wú)論何時(shí)挑辆,只會(huì)有一個(gè)作業(yè)在里面運(yùn)行例朱。那么此時(shí)孝情,就應(yīng)該用我們之前講過(guò)的性能調(diào)優(yōu)的手段,去將每個(gè)隊(duì)列能承載的最大的資源洒嗤,分配給你的每一個(gè)spark作業(yè)箫荡,比如80個(gè)executor,6G的內(nèi)存渔隶,3個(gè)cpu core羔挡。盡量讓你的spark作業(yè)每一次運(yùn)行,都達(dá)到最滿的資源使用率间唉,最快的速度绞灼,最好的性能。并行度呈野,240個(gè)cpu core低矮,720個(gè)task。

4际跪、在J2EE中商佛,通過(guò)線程池的方式(一個(gè)線程池對(duì)應(yīng)一個(gè)資源隊(duì)列),來(lái)實(shí)現(xiàn)上述我們說(shuō)的方案姆打。

5.4良姆、解決各種序列化導(dǎo)致的報(bào)錯(cuò)

5.4.1、問(wèn)題描述

用client模式去提交spark作業(yè)幔戏,觀察本地打印出來(lái)的log玛追。如果出現(xiàn)了類似于Serializable、Serialize等等字眼報(bào)錯(cuò)的log闲延,那么恭喜大家痊剖,就碰到了序列化問(wèn)題導(dǎo)致的報(bào)錯(cuò)。

5.4.2垒玲、序列化報(bào)錯(cuò)及解決方法

1陆馁、你的算子函數(shù)里面,如果使用到了外部的自定義類型的變量合愈,那么此時(shí)叮贩,就要求你的自定義類型,必須是可序列化的佛析。

final Teacher teacher = newTeacher("leo");

studentsRDD.foreach(new VoidFunction() {

public void call(Row row) throws Exception {

? ?String teacherName = teacher.getName();

? ?....?

}

});

public class Teacher implements Serializable {

}

2益老、如果要將自定義的類型,作為RDD的元素類型寸莫,那么自定義的類型也必須是可以序列化的捺萌。

JavaPairRDD teacherRDD

JavaPairRDD studentRDD

studentRDD.join(teacherRDD)

public class Teacher implements Serializable {

}

public class Student implements Serializable {

}

3、不能在上述兩種情況下膘茎,去使用一些第三方的桃纯,不支持序列化的類型酷誓。

Connection conn =

studentsRDD.foreach(new VoidFunction() {

public void call(Row row)throws Exception {

? ???????????? conn.....

}

});

Connection是不支持序列化的

5.5、解決算子函數(shù)返回NULL導(dǎo)致的問(wèn)題

5.5.1慈参、問(wèn)題描述

在有些算子函數(shù)里面呛牲,是需要我們有一個(gè)返回值的。但是驮配,有時(shí)候不需要返回值。我們?nèi)绻苯臃祷豊ULL的話着茸,是會(huì)報(bào)錯(cuò)的壮锻。

Scala.Math(NULL),異常

5.5.2涮阔、解決方案

如果碰到你的確是對(duì)于某些值不想要有返回值的話猜绣,有一個(gè)解決的辦法:

1、在返回的時(shí)候敬特,返回一些特殊的值掰邢,不要返回null,比如“-999”

2伟阔、在通過(guò)算子獲取到了一個(gè)RDD之后辣之,可以對(duì)這個(gè)RDD執(zhí)行filter操作,進(jìn)行數(shù)據(jù)過(guò)濾皱炉。filter內(nèi)怀估,可以對(duì)數(shù)據(jù)進(jìn)行判定,如果是-999合搅,那么就返回false多搀,給過(guò)濾掉就可以了。

3灾部、大家不要忘了康铭,之前咱們講過(guò)的那個(gè)算子調(diào)優(yōu)里面的coalesce算子,在filter之后赌髓,可以使用coalesce算子壓縮一下RDD的partition的數(shù)量从藤,讓各個(gè)partition的數(shù)據(jù)比較緊湊一些。也能提升一些性能春弥。


5.6呛哟、解決yarn-client模式導(dǎo)致的網(wǎng)卡流量激增問(wèn)題

5.6.1、Spark-On-Yarn任務(wù)執(zhí)行流程

Driver到底是什么匿沛?

我們寫的spark程序扫责,打成jar包,用spark-submit來(lái)提交逃呼。jar包中的一個(gè)main類鳖孤,通過(guò)jvm的命令啟動(dòng)起來(lái)者娱。

JVM進(jìn)程,其實(shí)就是Driver進(jìn)程苏揣。

Driver進(jìn)程啟動(dòng)起來(lái)以后黄鳍,執(zhí)行我們自己寫的main函數(shù),從new

SparkContext()開(kāi)始平匈。

driver接收到屬于自己的executor進(jìn)程的注冊(cè)之后框沟,就可以去進(jìn)行我們寫的spark作業(yè)代碼的執(zhí)行了。此時(shí)會(huì)一行一行的去執(zhí)行咱們寫的那些spark代碼增炭。執(zhí)行到某個(gè)action操作的時(shí)候忍燥,就會(huì)觸發(fā)一個(gè)job,然后DAGScheduler會(huì)將job劃分為一個(gè)一個(gè)的stage隙姿,為每個(gè)stage都創(chuàng)建指定數(shù)量的task梅垄。TaskScheduler將每個(gè)stage的task分配到各個(gè)executor上面去執(zhí)行。

task就會(huì)執(zhí)行咱們寫的算子函數(shù)输玷。

spark在yarn-client模式下队丝,application的注冊(cè)(executor的申請(qǐng))和計(jì)算task的調(diào)度,是分離開(kāi)來(lái)的欲鹏。

standalone模式下机久,這兩個(gè)操作都是driver負(fù)責(zé)的。

ApplicationMaster(ExecutorLauncher)負(fù)責(zé)executor的申請(qǐng)貌虾,driver負(fù)責(zé)job和stage的劃分吞加,以及task的創(chuàng)建、分配和調(diào)度尽狠。

每種計(jì)算框架(mr衔憨、spark),如果想要在yarn上執(zhí)行自己的計(jì)算應(yīng)用袄膏,那么就必須自己實(shí)現(xiàn)和提供一個(gè)ApplicationMaster践图。相當(dāng)于是實(shí)現(xiàn)了yarn提供的接口,spark自己開(kāi)發(fā)的一個(gè)類沉馆。

5.6.2码党、yarn-client模式下,會(huì)產(chǎn)生什么樣的問(wèn)題呢斥黑?

由于driver是啟動(dòng)在本地機(jī)器的揖盘,而且driver是全權(quán)負(fù)責(zé)所有的任務(wù)的調(diào)度的,也就是說(shuō)要跟yarn集群上運(yùn)行的多個(gè)executor進(jìn)行頻繁的通信(中間有task的啟動(dòng)消息锌奴、task的執(zhí)行統(tǒng)計(jì)消息兽狭、task的運(yùn)行狀態(tài)、shuffle的輸出結(jié)果)。

想象一下箕慧,比如你的executor有100個(gè)服球,stage有10個(gè),task有1000個(gè)颠焦。每個(gè)stage運(yùn)行的時(shí)候斩熊,都有1000個(gè)task提交到executor上面去運(yùn)行,平均每個(gè)executor有10個(gè)task伐庭。接下來(lái)問(wèn)題來(lái)了粉渠,driver要頻繁地跟executor上運(yùn)行的1000個(gè)task進(jìn)行通信。通信消息特別多似忧,通信的頻率特別高渣叛。運(yùn)行完一個(gè)stage,接著運(yùn)行下一個(gè)stage盯捌,又是頻繁的通信。

在整個(gè)spark運(yùn)行的生命周期內(nèi)蘑秽,都會(huì)頻繁的去進(jìn)行通信和調(diào)度饺著。所有這一切通信和調(diào)度都是從你的本地機(jī)器上發(fā)出去的,和接收到的肠牲。這是最要命的地方幼衰。你的本地機(jī)器,很可能在30分鐘內(nèi)(spark作業(yè)運(yùn)行的周期內(nèi))缀雳,進(jìn)行頻繁大量的網(wǎng)絡(luò)通信渡嚣。那么此時(shí),你的本地機(jī)器的網(wǎng)絡(luò)通信負(fù)載是非常非常高的肥印。會(huì)導(dǎo)致你的本地機(jī)器的網(wǎng)卡流量會(huì)激增识椰!

你的本地機(jī)器的網(wǎng)卡流量激增,當(dāng)然不是一件好事了深碱。因?yàn)樵谝恍┐蟮墓纠锩娓桂模瑢?duì)每臺(tái)機(jī)器的使用情況,都是有監(jiān)控的敷硅。不會(huì)允許單個(gè)機(jī)器出現(xiàn)耗費(fèi)大量網(wǎng)絡(luò)帶寬等等這種資源的情況功咒。

5.6.3、解決方案

實(shí)際上解決的方法很簡(jiǎn)單绞蹦,就是心里要清楚力奋,yarn-client模式是什么情況下,可以使用的?yarn-client模式篡九,通常咱們就只會(huì)使用在測(cè)試環(huán)境中宾娜,你寫好了某個(gè)spark作業(yè)瘫怜,打了一個(gè)jar包滨彻,在某臺(tái)測(cè)試機(jī)器上藕届,用yarn-client模式去提交一下。因?yàn)闇y(cè)試的行為是偶爾為之的亭饵,不會(huì)長(zhǎng)時(shí)間連續(xù)提交大量的spark作業(yè)去測(cè)試休偶。還有一點(diǎn)好處,yarn-client模式提交辜羊,可以在本地機(jī)器觀察到詳細(xì)全面的log踏兜。通過(guò)查看log,可以去解決線上報(bào)錯(cuò)的故障(troubleshooting)八秃、對(duì)性能進(jìn)行觀察并進(jìn)行性能調(diào)優(yōu)碱妆。

實(shí)際上線了以后,在生產(chǎn)環(huán)境中昔驱,都得用yarn-cluster模式疹尾,去提交你的spark作業(yè)。

yarn-cluster模式骤肛,就跟你的本地機(jī)器引起的網(wǎng)卡流量激增的問(wèn)題纳本,就沒(méi)有關(guān)系了。也就是說(shuō)腋颠,就算有問(wèn)題繁成,也應(yīng)該是yarn運(yùn)維團(tuán)隊(duì)和基礎(chǔ)運(yùn)維團(tuán)隊(duì)之間的事情了。使用了yarn-cluster模式以后淑玫,就不是你的本地機(jī)器運(yùn)行Driver巾腕,進(jìn)行task調(diào)度了。是yarn集群中絮蒿,某個(gè)節(jié)點(diǎn)會(huì)運(yùn)行driver進(jìn)程尊搬,負(fù)責(zé)task調(diào)度。


5.7歌径、解決yarn-cluster模式的JVM棧內(nèi)存溢出問(wèn)題

5.7.1毁嗦、問(wèn)題描述

有的時(shí)候,運(yùn)行一些包含了spark sql的spark作業(yè)回铛,可能會(huì)碰到y(tǒng)arn-client模式下狗准,可以正常提交運(yùn)行。yarn-cluster模式下茵肃,可能無(wú)法提交運(yùn)行的腔长,會(huì)報(bào)出JVM的PermGen(永久代)的內(nèi)存溢出,OOM验残。

yarn-client模式下捞附,driver是運(yùn)行在本地機(jī)器上的,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客戶端是默認(rèn)有配置的)鸟召,JVM的永久代的大小是128M胆绊,這個(gè)是沒(méi)有問(wèn)題的。但是在yarn-cluster模式下欧募,driver是運(yùn)行在yarn集群的某個(gè)節(jié)點(diǎn)上的压状,使用的是沒(méi)有經(jīng)過(guò)配置的默認(rèn)設(shè)置(PermGen永久代大小)跟继,82M种冬。

spark-sql,它的內(nèi)部是要進(jìn)行很復(fù)雜的SQL的語(yǔ)義解析舔糖、語(yǔ)法樹(shù)的轉(zhuǎn)換等等娱两,特別復(fù)雜。在這種復(fù)雜的情況下金吗,如果說(shuō)你的sql本身特別復(fù)雜的話十兢,很可能會(huì)比較導(dǎo)致性能的消耗,內(nèi)存的消耗摇庙〖涂妫可能對(duì)PermGen永久代的占用會(huì)比較大。

所以跟匆,此時(shí),如果對(duì)永久代的占用需求通砍,超過(guò)了82M的話玛臂,但是呢又在128M以內(nèi),就會(huì)出現(xiàn)如上所述的問(wèn)題封孙,yarn-client模式下迹冤,默認(rèn)是128M,這個(gè)還能運(yùn)行虎忌,如果在yarn-cluster模式下泡徙,默認(rèn)是82M,就有問(wèn)題了膜蠢。會(huì)報(bào)出PermGen Out of Memory error log堪藐。

5.7.2、解決方案

既然是JVM的PermGen永久代內(nèi)存溢出挑围,那么就是內(nèi)存不夠用礁竞。我們就給yarn-cluster模式下的driver的PermGen多設(shè)置一些。

spark-submit腳本中杉辙,加入以下配置即可:

--confspark.driver.extraJavaOptions="-XX:PermSize=128M-XX:MaxPermSize=256M"

這個(gè)就設(shè)置了driver永久代的大小模捂,默認(rèn)是128M,最大是256M。這樣的話狂男,就可以基本保證你的spark作業(yè)不會(huì)出現(xiàn)上述的yarn-cluster模式導(dǎo)致的永久代內(nèi)存溢出的問(wèn)題综看。

spark sql中,寫sql岖食,要注意一個(gè)問(wèn)題:

如果sql有大量的or語(yǔ)句红碑。比如where keywords='' or keywords='' or keywords=''

當(dāng)達(dá)到or語(yǔ)句,有成百上千的時(shí)候县耽,此時(shí)可能就會(huì)出現(xiàn)一個(gè)driver端的jvm stack overflow句喷,JVM棧內(nèi)存溢出的問(wèn)題。

JVM棧內(nèi)存溢出兔毙,基本上就是由于調(diào)用的方法層級(jí)過(guò)多唾琼,因?yàn)楫a(chǎn)生了大量的,非常深的澎剥,超出了JVM棧深度限制的遞歸方法锡溯。我們的猜測(cè),spark sql有大量or語(yǔ)句的時(shí)候哑姚,spark sql內(nèi)部源碼中祭饭,在解析sql,比如轉(zhuǎn)換成語(yǔ)法樹(shù)叙量,或者進(jìn)行執(zhí)行計(jì)劃的生成的時(shí)候倡蝙,對(duì)or的處理是遞歸。or特別多的話绞佩,就會(huì)發(fā)生大量的遞歸寺鸥。

JVM Stack Memory Overflow,棧內(nèi)存溢出品山。

這種時(shí)候胆建,建議不要搞那么復(fù)雜的spark sql語(yǔ)句。采用替代方案:將一條sql語(yǔ)句肘交,拆解成多條sql語(yǔ)句來(lái)執(zhí)行笆载。每條sql語(yǔ)句,就只有100個(gè)or子句以內(nèi)涯呻。一條一條SQL語(yǔ)句來(lái)執(zhí)行凉驻。根據(jù)生產(chǎn)環(huán)境經(jīng)驗(yàn)的測(cè)試,一條sql語(yǔ)句魄懂,100個(gè)or子句以內(nèi)沿侈,是還可以的。通常情況下市栗,不會(huì)報(bào)那個(gè)棧內(nèi)存溢出缀拭。


5.7咳短、錯(cuò)誤的持久化方式以及checkpoint的使用

5.7.1、使用持久化方式

錯(cuò)誤的持久化使用方式:

usersRDD蛛淋,想要對(duì)這個(gè)RDD做一個(gè)cache咙好,希望能夠在后面多次使用這個(gè)RDD的時(shí)候,不用反復(fù)重新計(jì)算RDD褐荷」葱В可以直接使用通過(guò)各個(gè)節(jié)點(diǎn)上的executor的BlockManager管理的內(nèi)存/ 磁盤上的數(shù)據(jù),避免重新反復(fù)計(jì)算RDD叛甫。

usersRDD.cache()

usersRDD.count()

usersRDD.take()

上面這種方式层宫,不要說(shuō)會(huì)不會(huì)生效了,實(shí)際上是會(huì)報(bào)錯(cuò)的其监。會(huì)報(bào)什么錯(cuò)誤呢萌腿?會(huì)報(bào)一大堆file not found的錯(cuò)誤。


正確的持久化使用方式:

usersRDD

usersRDD = usersRDD.cache() // Java

val cachedUsersRDD = usersRDD.cache() // Scala

之后再去使用usersRDD抖苦,或者cachedUsersRDD就可以了毁菱。

5.7.2、checkpoint的使用

對(duì)于持久化锌历,大多數(shù)時(shí)候都是會(huì)正常工作的贮庞。但有些時(shí)候會(huì)出現(xiàn)意外。

比如說(shuō)究西,緩存在內(nèi)存中的數(shù)據(jù)窗慎,可能莫名其妙就丟失掉了。

或者說(shuō)卤材,存儲(chǔ)在磁盤文件中的數(shù)據(jù)捉邢,莫名其妙就沒(méi)了,文件被誤刪了商膊。

出現(xiàn)上述情況的時(shí)候,如果要對(duì)這個(gè)RDD執(zhí)行某些操作宠进,可能會(huì)發(fā)現(xiàn)RDD的某個(gè)partition找不到了晕拆。

下來(lái)task就會(huì)對(duì)消失的partition重新計(jì)算,計(jì)算完以后再緩存和使用材蹬。

有些時(shí)候实幕,計(jì)算某個(gè)RDD,可能是極其耗時(shí)的堤器±ケ樱可能RDD之前有大量的父RDD。那么如果你要重新計(jì)算一個(gè)partition闸溃,可能要重新計(jì)算之前所有的父RDD對(duì)應(yīng)的partition整吆。

這種情況下拱撵,就可以選擇對(duì)這個(gè)RDD進(jìn)行checkpoint,以防萬(wàn)一表蝙。進(jìn)行checkpoint拴测,就是說(shuō),會(huì)將RDD的數(shù)據(jù)府蛇,持久化一份到容錯(cuò)的文件系統(tǒng)上(比如hdfs)集索。

在對(duì)這個(gè)RDD進(jìn)行計(jì)算的時(shí)候,如果發(fā)現(xiàn)它的緩存數(shù)據(jù)不見(jiàn)了汇跨。優(yōu)先就是先找一下有沒(méi)有checkpoint數(shù)據(jù)(到hdfs上面去找)务荆。如果有的話,就使用checkpoint數(shù)據(jù)了穷遂。不至于去重新計(jì)算函匕。

checkpoint,其實(shí)就是可以作為是cache的一個(gè)備胎塞颁。如果cache失效了浦箱,checkpoint就可以上來(lái)使用了。

checkpoint有利有弊祠锣,利在于酷窥,提高了spark作業(yè)的可靠性,一旦發(fā)生問(wèn)題伴网,還是很可靠的蓬推,不用重新計(jì)算大量的rdd。但是弊在于澡腾,進(jìn)行checkpoint操作的時(shí)候沸伏,也就是將rdd數(shù)據(jù)寫入hdfs中的時(shí)候,還是會(huì)消耗性能的动分。

checkpoint毅糟,用性能換可靠性。

checkpoint原理:

1澜公、在代碼中姆另,用SparkContext,設(shè)置一個(gè)checkpoint目錄坟乾,可以是一個(gè)容錯(cuò)文件系統(tǒng)的目錄迹辐,比如hdfs。

2甚侣、在代碼中明吩,對(duì)需要進(jìn)行checkpoint的rdd,執(zhí)行RDD.checkpoint()殷费。

3印荔、RDDCheckpointData(spark內(nèi)部的API)低葫,接管你的RDD,會(huì)標(biāo)記為marked

for checkpoint躏鱼,準(zhǔn)備進(jìn)行checkpoint氮采。

4、你的job運(yùn)行完之后染苛,會(huì)調(diào)用一個(gè)finalRDD.doCheckpoint()方法鹊漠,會(huì)順著rdd

lineage,回溯掃描茶行,發(fā)現(xiàn)有標(biāo)記為待checkpoint的rdd躯概,就會(huì)進(jìn)行二次標(biāo)記,inProgressCheckpoint畔师,正在接受checkpoint操作娶靡。

5、job執(zhí)行完之后看锉,就會(huì)啟動(dòng)一個(gè)內(nèi)部的新job姿锭,去將標(biāo)記為inProgressCheckpoint的rdd的數(shù)據(jù),都寫入hdfs文件中伯铣。(備注呻此,如果rdd之前cache過(guò),會(huì)直接從緩存中獲取數(shù)據(jù)腔寡,寫入hdfs中焚鲜。如果沒(méi)有cache過(guò),那么就會(huì)重新計(jì)算一遍這個(gè)rdd放前,再checkpoint)忿磅。

6、將checkpoint過(guò)的rdd之前的依賴rdd凭语,改成一個(gè)CheckpointRDD*葱她,強(qiáng)制改變你的rdd的lineage。后面如果rdd的cache數(shù)據(jù)獲取失敗似扔,直接會(huì)通過(guò)它的上游CheckpointRDD览效,去容錯(cuò)的文件系統(tǒng),比如hdfs虫几,中,獲取checkpoint的數(shù)據(jù)挽拔。

checkpoint的使用:

1辆脸、sc.checkpointFile("hdfs://"),設(shè)置checkpoint目錄

2螃诅、對(duì)RDD執(zhí)行checkpoint操作

6啡氢、數(shù)據(jù)傾斜解決方案

數(shù)據(jù)傾斜的解決状囱,跟之前講解的性能調(diào)優(yōu),有一點(diǎn)異曲同工之妙倘是。

性能調(diào)優(yōu)中最有效最直接最簡(jiǎn)單的方式就是加資源加并行度亭枷,并注意RDD架構(gòu)(復(fù)用同一個(gè)RDD,加上cache緩存)搀崭。相對(duì)于前面叨粘,shuffle、jvm等是次要的瘤睹。

6.1升敲、原理以及現(xiàn)象分析

6.1.1、數(shù)據(jù)傾斜怎么出現(xiàn)的

在執(zhí)行shuffle操作的時(shí)候轰传,是按照key驴党,來(lái)進(jìn)行values的數(shù)據(jù)的輸出、拉取和聚合的获茬。

同一個(gè)key的values港庄,一定是分配到一個(gè)reduce task進(jìn)行處理的。

多個(gè)key對(duì)應(yīng)的values恕曲,比如一共是90萬(wàn)鹏氧。可能某個(gè)key對(duì)應(yīng)了88萬(wàn)數(shù)據(jù)码俩,被分配到一個(gè)task上去面去執(zhí)行度帮。

另外兩個(gè)task,可能各分配到了1萬(wàn)數(shù)據(jù)稿存,可能是數(shù)百個(gè)key笨篷,對(duì)應(yīng)的1萬(wàn)條數(shù)據(jù)。

這樣就會(huì)出現(xiàn)數(shù)據(jù)傾斜問(wèn)題瓣履。

想象一下率翅,出現(xiàn)數(shù)據(jù)傾斜以后的運(yùn)行的情況。很糟糕袖迎!

其中兩個(gè)task冕臭,各分配到了1萬(wàn)數(shù)據(jù),可能同時(shí)在10分鐘內(nèi)都運(yùn)行完了燕锥。另外一個(gè)task有88萬(wàn)條辜贵,88 * 10 =? 880分鐘= 14.5個(gè)小時(shí)。

大家看归形,本來(lái)另外兩個(gè)task很快就運(yùn)行完畢了(10分鐘)托慨,但是由于一個(gè)拖后腿的家伙,第三個(gè)task暇榴,要14.5個(gè)小時(shí)才能運(yùn)行完厚棵,就導(dǎo)致整個(gè)spark作業(yè)蕉世,也得14.5個(gè)小時(shí)才能運(yùn)行完。

數(shù)據(jù)傾斜婆硬,一旦出現(xiàn)狠轻,是不是性能殺手?彬犯!

6.1.2向楼、發(fā)生數(shù)據(jù)傾斜以后的現(xiàn)象

Spark數(shù)據(jù)傾斜,有兩種表現(xiàn):

1躏嚎、你的大部分的task蜜自,都執(zhí)行的特別特別快,(你要用client模式卢佣,standalone client重荠,yarn client,本地機(jī)器一執(zhí)行spark-submit腳本虚茶,就會(huì)開(kāi)始打印log)戈鲁,task175

finished,剩下幾個(gè)task嘹叫,執(zhí)行的特別特別慢婆殿,前面的task,一般1s可以執(zhí)行完5個(gè)罩扇,最后發(fā)現(xiàn)1000個(gè)task婆芦,998,999 task喂饥,要執(zhí)行1個(gè)小時(shí)消约,2個(gè)小時(shí)才能執(zhí)行完一個(gè)task员帮。

出現(xiàn)以上loginfo或粮,就表明出現(xiàn)數(shù)據(jù)傾斜了。

這樣還算好的捞高,因?yàn)殡m然老牛拉破車一樣非常慢氯材,但是至少還能跑。

2硝岗、另一種情況是氢哮,運(yùn)行的時(shí)候,其他task都執(zhí)行完了型檀,也沒(méi)什么特別的問(wèn)題冗尤,但是有的task,就是會(huì)突然間報(bào)了一個(gè)OOM,JVM Out Of Memory生闲,內(nèi)存溢出了,task failed月幌,task lost碍讯,resubmitting

task。反復(fù)執(zhí)行幾次都到了某個(gè)task就是跑不通扯躺,最后就掛掉捉兴。

某個(gè)task就直接OOM,那么基本上也是因?yàn)閿?shù)據(jù)傾斜了录语,task分配的數(shù)量實(shí)在是太大了倍啥!所以內(nèi)存放不下,然后你的task每處理一條數(shù)據(jù)澎埠,還要?jiǎng)?chuàng)建大量的對(duì)象虽缕,內(nèi)存爆掉了。

這樣也表明出現(xiàn)數(shù)據(jù)傾斜了蒲稳。

這種就不太好了氮趋,因?yàn)槟愕某绦蛉绻蝗ソ鉀Q數(shù)據(jù)傾斜的問(wèn)題,壓根兒就跑不出來(lái)江耀。

作業(yè)都跑不完剩胁,還談什么性能調(diào)優(yōu)這些東西?祥国!

6.1.3昵观、定位數(shù)據(jù)傾斜出現(xiàn)的原因與出現(xiàn)問(wèn)題的位置

根據(jù)log去定位

出現(xiàn)數(shù)據(jù)傾斜的原因,基本只可能是因?yàn)榘l(fā)生了shuffle操作舌稀,在shuffle的過(guò)程中啊犬,出現(xiàn)了數(shù)據(jù)傾斜的問(wèn)題。因?yàn)槟硞€(gè)或者某些key對(duì)應(yīng)的數(shù)據(jù)扩借,遠(yuǎn)遠(yuǎn)的高于其他的key椒惨。

1、你在自己的程序里面找找潮罪,哪些地方用了會(huì)產(chǎn)生shuffle的算子康谆,groupByKey、countByKey嫉到、reduceByKey沃暗、join

2谓松、看log

log一般會(huì)報(bào)是在你的哪一行代碼内列,導(dǎo)致了OOM異常∫校或者看log,看看是執(zhí)行到了第幾個(gè)stage惜辑。spark代碼唬涧,是怎么劃分成一個(gè)一個(gè)的stage的。哪一個(gè)stage生成的task特別慢盛撑,就能夠自己用肉眼去對(duì)你的spark代碼進(jìn)行stage的劃分碎节,就能夠通過(guò)stage定位到你的代碼,到底哪里發(fā)生了數(shù)據(jù)傾斜抵卫。

6.2狮荔、聚合源數(shù)據(jù)以及過(guò)濾導(dǎo)致傾斜的key

數(shù)據(jù)傾斜解決方案,第一個(gè)方案和第二個(gè)方案介粘,一起來(lái)講殖氏。這兩個(gè)方案是最直接、最有效姻采、最簡(jiǎn)單的解決數(shù)據(jù)傾斜問(wèn)題的方案雅采。

第一個(gè)方案:聚合源數(shù)據(jù)。

第二個(gè)方案:過(guò)濾導(dǎo)致傾斜的key偎谁。

后面的五個(gè)方案总滩,尤其是最后4個(gè)方案,都是那種特別狂拽炫酷吊炸天的方案巡雨。但沒(méi)有第一二個(gè)方案簡(jiǎn)單直接闰渔。如果碰到了數(shù)據(jù)傾斜的問(wèn)題。上來(lái)就先考慮第一個(gè)和第二個(gè)方案看能不能做铐望,如果能做的話冈涧,后面的5個(gè)方案,都不用去搞了正蛙。

有效督弓、簡(jiǎn)單、直接才是最好的乒验,徹底根除了數(shù)據(jù)傾斜的問(wèn)題愚隧。

6.2.1、方案一:聚合源數(shù)據(jù)

一些聚合的操作锻全,比如groupByKey狂塘、reduceByKey,groupByKey說(shuō)白了就是拿到每個(gè)key對(duì)應(yīng)的values鳄厌。reduceByKey說(shuō)白了就是對(duì)每個(gè)key對(duì)應(yīng)的values執(zhí)行一定的計(jì)算荞胡。

這些操作,比如groupByKey和reduceByKey了嚎,包括之前說(shuō)的join泪漂。都是在spark作業(yè)中執(zhí)行的廊营。

spark作業(yè)的數(shù)據(jù)來(lái)源,通常是哪里呢萝勤?90%的情況下露筒,數(shù)據(jù)來(lái)源都是hive表(hdfs,大數(shù)據(jù)分布式存儲(chǔ)系統(tǒng))敌卓。hdfs上存儲(chǔ)的大數(shù)據(jù)邀窃。hive表中的數(shù)據(jù)通常是怎么出來(lái)的呢?有了spark以后假哎,hive比較適合做什么事情?hive就是適合做離線的鞍历,晚上凌晨跑的舵抹,ETL(extract transform load,數(shù)據(jù)的采集劣砍、清洗惧蛹、導(dǎo)入),hive

sql刑枝,去做這些事情香嗓,從而去形成一個(gè)完整的hive中的數(shù)據(jù)倉(cāng)庫(kù)。說(shuō)白了装畅,數(shù)據(jù)倉(cāng)庫(kù)靠娱,就是一堆表。

spark作業(yè)的源表掠兄,hive表像云,通常情況下來(lái)說(shuō),也是通過(guò)某些hive etl生成的蚂夕。hive etl可能是晚上凌晨在那兒跑迅诬。今天跑昨天的數(shù)據(jù)。

數(shù)據(jù)傾斜婿牍,某個(gè)key對(duì)應(yīng)的80萬(wàn)數(shù)據(jù)侈贷,某些key對(duì)應(yīng)幾百條,某些key對(duì)應(yīng)幾十條〉戎現(xiàn)在咱們直接在生成hive表的hive etl中對(duì)數(shù)據(jù)進(jìn)行聚合俏蛮。比如按key來(lái)分組,將key對(duì)應(yīng)的所有的values全部用一種特殊的格式拼接到一個(gè)字符串里面去慎菲,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”嫁蛇。

對(duì)key進(jìn)行g(shù)roup,在spark中露该,拿到key=sessionid睬棚,values。hive etl中,直接對(duì)key進(jìn)行了聚合抑党。那么也就意味著包警,每個(gè)key就只對(duì)應(yīng)一條數(shù)據(jù)。在spark中底靠,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了害晦。直接對(duì)每個(gè)key對(duì)應(yīng)的values字符串進(jìn)行map操作,進(jìn)行你需要的操作即可暑中。

spark中壹瘟,可能對(duì)這個(gè)操作,就不需要執(zhí)行shffule操作了鳄逾,也就根本不可能導(dǎo)致數(shù)據(jù)傾斜稻轨。

或者是對(duì)每個(gè)key在hive etl中進(jìn)行聚合,對(duì)所有values聚合一下雕凹,不一定是拼接起來(lái)殴俱,可能是直接進(jìn)行計(jì)算。reduceByKey計(jì)算函數(shù)應(yīng)用在hive etl中枚抵,從而得到每個(gè)key的values线欲。


聚合源數(shù)據(jù)方案第二種做法是,你可能沒(méi)有辦法對(duì)每個(gè)key聚合出來(lái)一條數(shù)據(jù)汽摹。那么也可以做一個(gè)妥協(xié)李丰,對(duì)每個(gè)key對(duì)應(yīng)的數(shù)據(jù),10萬(wàn)條逼泣。有好幾個(gè)粒度嫌套,比如10萬(wàn)條里面包含了幾個(gè)城市、幾天圾旨、幾個(gè)地區(qū)的數(shù)據(jù)踱讨,現(xiàn)在放粗粒度。直接就按照城市粒度砍的,做一下聚合痹筛,幾個(gè)城市,幾天廓鞠、幾個(gè)地區(qū)粒度的數(shù)據(jù)帚稠,都給聚合起來(lái)。比如說(shuō)

city_id date area_id

select ... from ... group by city_id

盡量去聚合床佳,減少每個(gè)key對(duì)應(yīng)的數(shù)量滋早,也許聚合到比較粗的粒度之后,原先有10萬(wàn)數(shù)據(jù)量的key砌们,現(xiàn)在只有1萬(wàn)數(shù)據(jù)量杆麸。減輕數(shù)據(jù)傾斜的現(xiàn)象和問(wèn)題搁进。

6.2.2、方案二:過(guò)濾導(dǎo)致傾斜的key

如果你能夠接受某些數(shù)據(jù)在spark作業(yè)中直接就摒棄掉不使用昔头。比如說(shuō)饼问,總共有100萬(wàn)個(gè)key。只有2個(gè)key是數(shù)據(jù)量達(dá)到10萬(wàn)的揭斧。其他所有的key莱革,對(duì)應(yīng)的數(shù)量都是幾十萬(wàn)。

這個(gè)時(shí)候讹开,你自己可以去取舍盅视,如果業(yè)務(wù)和需求可以理解和接受的話,在你從hive表查詢?cè)磾?shù)據(jù)的時(shí)候旦万,直接在sql中用where條件左冬,過(guò)濾掉某幾個(gè)key。

那么這幾個(gè)原先有大量數(shù)據(jù)纸型,會(huì)導(dǎo)致數(shù)據(jù)傾斜的key,被過(guò)濾掉之后梅忌,那么在你的spark作業(yè)中狰腌,自然就不會(huì)發(fā)生數(shù)據(jù)傾斜了。

6.3牧氮、提高shuffle操作reduce并行度

6.3.1琼腔、問(wèn)題描述

第一個(gè)和第二個(gè)方案,都不適合做踱葛,然后再考慮這個(gè)方案丹莲。

將reduce task的數(shù)量變多,就可以讓每個(gè)reduce task分配到更少的數(shù)據(jù)量尸诽。這樣的話也許就可以緩解甚至是基本解決掉數(shù)據(jù)傾斜的問(wèn)題甥材。

6.3.2、提升shuffle reduce端并行度的操作方法

很簡(jiǎn)單性含,主要給我們所有的shuffle算子洲赵,比如groupByKey、countByKey商蕴、reduceByKey叠萍。在調(diào)用的時(shí)候,傳入進(jìn)去一個(gè)參數(shù)绪商。那個(gè)數(shù)字苛谷,就代表了那個(gè)shuffle操作的reduce端的并行度。那么在進(jìn)行shuffle操作的時(shí)候格郁,就會(huì)對(duì)應(yīng)著創(chuàng)建指定數(shù)量的reduce task腹殿。

這樣的話独悴,就可以讓每個(gè)reduce task分配到更少的數(shù)據(jù)『丈撸基本可以緩解數(shù)據(jù)傾斜的問(wèn)題绵患。

比如說(shuō),原本某個(gè)task分配數(shù)據(jù)特別多悟耘,直接OOM落蝙,內(nèi)存溢出了,程序沒(méi)法運(yùn)行暂幼,直接掛掉筏勒。按照l(shuí)og,找到發(fā)生數(shù)據(jù)傾斜的shuffle操作旺嬉,給它傳入一個(gè)并行度數(shù)字管行,這樣的話,原先那個(gè)task分配到的數(shù)據(jù)邪媳,肯定會(huì)變少捐顷。就至少可以避免OOM的情況,程序至少是可以跑的雨效。

6.3.2迅涮、提升shuffle reduce并行度的缺陷

治標(biāo)不治本的意思,因?yàn)樗鼪](méi)有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問(wèn)題徽龟。不像第一個(gè)和第二個(gè)方案(直接避免了數(shù)據(jù)傾斜的發(fā)生)叮姑。原理沒(méi)有改變,只是說(shuō)据悔,盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力传透,以及數(shù)據(jù)傾斜的問(wèn)題。

實(shí)際生產(chǎn)環(huán)境中的經(jīng)驗(yàn):

1极颓、如果最理想的情況下朱盐,提升并行度以后,減輕了數(shù)據(jù)傾斜的問(wèn)題菠隆,或者甚至可以讓數(shù)據(jù)傾斜的現(xiàn)象忽略不計(jì)托享,那么就最好。就不用做其他的數(shù)據(jù)傾斜解決方案了浸赫。

2闰围、不太理想的情況下,比如之前某個(gè)task運(yùn)行特別慢既峡,要5個(gè)小時(shí)羡榴,現(xiàn)在稍微快了一點(diǎn),變成了4個(gè)小時(shí)运敢⌒B兀或者是原先運(yùn)行到某個(gè)task忠售,直接OOM,現(xiàn)在至少不會(huì)OOM了迄沫,但是那個(gè)task運(yùn)行特別慢稻扬,要5個(gè)小時(shí)才能跑完。

那么羊瘩,如果出現(xiàn)第二種情況的話泰佳,各位,就立即放棄第三種方案尘吗,開(kāi)始去嘗試和選擇后面的四種方案逝她。

6.4、使用隨機(jī)key實(shí)現(xiàn)雙重聚合

6.4.1睬捶、使用場(chǎng)景

groupByKey黔宛、reduceByKey比較適合使用這種方式。join咱們通常不會(huì)這樣來(lái)做擒贸,后面會(huì)講三種針對(duì)不同的join造成的數(shù)據(jù)傾斜的問(wèn)題的解決方案臀晃。

6.4.2、解決方案

第一輪聚合的時(shí)候介劫,對(duì)key進(jìn)行打散徽惋,將原先一樣的key,變成不一樣的key蜕猫,相當(dāng)于是將每個(gè)key分為多組。

先針對(duì)多個(gè)組哎迄,進(jìn)行key的局部聚合回右。接著,再去除掉每個(gè)key的前綴漱挚,然后對(duì)所有的key進(jìn)行全局的聚合翔烁。

對(duì)groupByKey、reduceByKey造成的數(shù)據(jù)傾斜旨涝,有比較好的效果蹬屹。

如果說(shuō),之前的第一白华、第二慨默、第三種方案,都沒(méi)法解決數(shù)據(jù)傾斜的問(wèn)題弧腥,那么就只能依靠這一種方式了厦取。

6.5、將reduce join轉(zhuǎn)換為map join

6.5.1管搪、使用方式

普通的join虾攻,那么肯定是要走shuffle铡买。既然是走shuffle,那么普通的join就肯定是走的是reduce join霎箍。那怎么將reduce join 轉(zhuǎn)換為mapjoin呢奇钞?先將所有相同的key,對(duì)應(yīng)的value匯聚到一個(gè)task中漂坏,然后再進(jìn)行join景埃。

6.5.2、使用場(chǎng)景

這種方式適合在什么樣的情況下來(lái)使用樊拓?

如果兩個(gè)RDD要進(jìn)行join纠亚,其中一個(gè)RDD是比較小的。比如一個(gè)RDD是100萬(wàn)數(shù)據(jù)筋夏,一個(gè)RDD是1萬(wàn)數(shù)據(jù)蒂胞。(一個(gè)RDD是1億數(shù)據(jù),一個(gè)RDD是100萬(wàn)數(shù)據(jù))条篷。

其中一個(gè)RDD必須是比較小的骗随,broadcast出去那個(gè)小RDD的數(shù)據(jù)以后,就會(huì)在每個(gè)executor的block manager中都保存一份赴叹。要確保你的內(nèi)存足夠存放那個(gè)小RDD中的數(shù)據(jù)鸿染。

這種方式下,根本不會(huì)發(fā)生shuffle操作乞巧,肯定也不會(huì)發(fā)生數(shù)據(jù)傾斜涨椒。從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜的問(wèn)題。

對(duì)于join中有數(shù)據(jù)傾斜的情況绽媒,大家盡量第一時(shí)間先考慮這種方式蚕冬,效果非常好。

不適合的情況

兩個(gè)RDD都比較大是辕,那么這個(gè)時(shí)候囤热,你去將其中一個(gè)RDD做成broadcast,就很笨拙了获三。很可能導(dǎo)致內(nèi)存不足旁蔼。最終導(dǎo)致內(nèi)存溢出,程序掛掉疙教。

而且其中某些key(或者是某個(gè)key)棺聊,還發(fā)生了數(shù)據(jù)傾斜。此時(shí)可以采用最后兩種方式贞谓。

對(duì)于join這種操作躺屁,不光是考慮數(shù)據(jù)傾斜的問(wèn)題。即使是沒(méi)有數(shù)據(jù)傾斜問(wèn)題经宏,也完全可以優(yōu)先考慮犀暑,用我們講的這種高級(jí)的reduce join轉(zhuǎn)map join的技術(shù)驯击,不要用普通的join,去通過(guò)shuffle耐亏,進(jìn)行數(shù)據(jù)的join徊都。完全可以通過(guò)簡(jiǎn)單的map,使用map join的方式广辰,犧牲一點(diǎn)內(nèi)存資源暇矫。在可行的情況下,優(yōu)先這么使用择吊。

不走shuffle李根,直接走map,是不是性能也會(huì)高很多几睛?這是肯定的房轿。


6.6、sample采樣傾斜key單獨(dú)進(jìn)行join

6.6.1所森、方案實(shí)現(xiàn)思路

將發(fā)生數(shù)據(jù)傾斜的key囱持,單獨(dú)拉出來(lái),放到一個(gè)RDD中去焕济。就用這個(gè)原本會(huì)傾斜的key RDD跟其他RDD單獨(dú)去join一下纷妆,這個(gè)時(shí)候key對(duì)應(yīng)的數(shù)據(jù)可能就會(huì)分散到多個(gè)task中去進(jìn)行join操作。

就不至于說(shuō)是晴弃,這個(gè)key跟之前其他的key混合在一個(gè)RDD中時(shí)掩幢,肯定是會(huì)導(dǎo)致一個(gè)key對(duì)應(yīng)的所有數(shù)據(jù)都到一個(gè)task中去,就會(huì)導(dǎo)致數(shù)據(jù)傾斜上鞠。

6.6.2际邻、使用場(chǎng)景

這種方案什么時(shí)候適合使用?

優(yōu)先對(duì)于join旗国,肯定是希望能夠采用上一個(gè)方案枯怖,即reduce join轉(zhuǎn)換map join注整。兩個(gè)RDD數(shù)據(jù)都比較大能曾,那么就不要那么搞了。

針對(duì)你的RDD的數(shù)據(jù)肿轨,你可以自己把它轉(zhuǎn)換成一個(gè)中間表寿冕,或者是直接用countByKey()的方式,你可以看一下這個(gè)RDD各個(gè)key對(duì)應(yīng)的數(shù)據(jù)量椒袍。此時(shí)如果你發(fā)現(xiàn)整個(gè)RDD就一個(gè)驼唱,或者少數(shù)幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別多。盡量建議驹暑,比如就是一個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別多玫恳。

此時(shí)可以采用這種方案辨赐,單拉出來(lái)那個(gè)最多的key,單獨(dú)進(jìn)行join京办,盡可能地將key分散到各個(gè)task上去進(jìn)行join操作掀序。

什么時(shí)候不適用呢?

如果一個(gè)RDD中惭婿,導(dǎo)致數(shù)據(jù)傾斜的key特別多不恭。那么此時(shí),最好還是不要這樣了财饥。還是使用我們最后一個(gè)方案换吧,終極的join數(shù)據(jù)傾斜的解決方案。

就是說(shuō)钥星,咱們單拉出來(lái)了一個(gè)或者少數(shù)幾個(gè)可能會(huì)產(chǎn)生數(shù)據(jù)傾斜的key沾瓦,然后還可以進(jìn)行更加優(yōu)化的一個(gè)操作。

對(duì)于那個(gè)key打颤,從另外一個(gè)要join的表中暴拄,也過(guò)濾出來(lái)一份數(shù)據(jù),比如可能就只有一條數(shù)據(jù)编饺。userid2infoRDD乖篷,一個(gè)userid key,就對(duì)應(yīng)一條數(shù)據(jù)透且。

然后呢撕蔼,采取對(duì)那個(gè)只有一條數(shù)據(jù)的RDD,進(jìn)行flatMap操作秽誊,打上100個(gè)隨機(jī)數(shù)鲸沮,作為前綴,返回100條數(shù)據(jù)锅论。

單獨(dú)拉出來(lái)的可能產(chǎn)生數(shù)據(jù)傾斜的RDD讼溺,給每一條數(shù)據(jù),都打上一個(gè)100以內(nèi)的隨機(jī)數(shù)最易,作為前綴怒坯。

再去進(jìn)行join,是不是性能就更好了藻懒√拊常肯定可以將數(shù)據(jù)進(jìn)行打散,去進(jìn)行join嬉荆。join完以后归敬,可以執(zhí)行map操作,去將之前打上的隨機(jī)數(shù)給去掉,然后再和另外一個(gè)普通RDD join以后的結(jié)果進(jìn)行union操作汪茧。


6.7椅亚、使用隨機(jī)數(shù)以及擴(kuò)容表進(jìn)行join

6.7.1、使用場(chǎng)景及步驟

當(dāng)采用隨機(jī)數(shù)和擴(kuò)容表進(jìn)行join解決數(shù)據(jù)傾斜的時(shí)候舱污,就代表著什往,你的之前的數(shù)據(jù)傾斜的解決方案,都沒(méi)法使用慌闭。

這個(gè)方案是沒(méi)辦法徹底解決數(shù)據(jù)傾斜的别威,更多的,是一種對(duì)數(shù)據(jù)傾斜的緩解驴剔。

步驟:

1省古、選擇一個(gè)RDD,要用flatMap丧失,進(jìn)行擴(kuò)容豺妓,將每條數(shù)據(jù),映射為多條數(shù)據(jù)布讹,每個(gè)映射出來(lái)的數(shù)據(jù)京革,都帶了一個(gè)n以內(nèi)的隨機(jī)數(shù)劝术,通常來(lái)說(shuō)會(huì)選擇10。

2、將另外一個(gè)RDD梭灿,做普通的map映射操作耳鸯,每條數(shù)據(jù)都打上一個(gè)10以內(nèi)的隨機(jī)數(shù)榨乎。

3丁屎、最后將兩個(gè)處理后的RDD進(jìn)行join操作。


6.7.2呼股、局限性

1耕魄、因?yàn)槟愕膬蓚€(gè)RDD都很大,所以你沒(méi)有辦法去將某一個(gè)RDD擴(kuò)的特別大彭谁,一般咱們就是10倍吸奴。

2、如果就是10倍的話缠局,那么數(shù)據(jù)傾斜問(wèn)題的確是只能說(shuō)是緩解和減輕则奥,不能說(shuō)徹底解決。

sample采樣傾斜key并單獨(dú)進(jìn)行join

將key甩鳄,從另外一個(gè)RDD中過(guò)濾出的數(shù)據(jù)逞度,可能只有一條或者幾條额划,此時(shí)妙啃,咱們可以任意進(jìn)行擴(kuò)容,擴(kuò)成1000倍。

將從第一個(gè)RDD中拆分出來(lái)的那個(gè)傾斜key RDD揖赴,打上1000以內(nèi)的一個(gè)隨機(jī)數(shù)馆匿。

這種情況下,還可以配合上燥滑,提升shuffle reduce并行度渐北,join(rdd, 1000)。通常情況下铭拧,效果還是非常不錯(cuò)的赃蛛。

打散成100份,甚至1000份搀菩,2000份呕臂,去進(jìn)行join,那么就肯定沒(méi)有數(shù)據(jù)傾斜的問(wèn)題了吧肪跋。

附:實(shí)時(shí)計(jì)算程序性能調(diào)優(yōu)

1歧蒋、并行化數(shù)據(jù)接收:處理多個(gè)topic的數(shù)據(jù)時(shí)比較有效

int numStreams = 5;

List>kafkaStreams = new ArrayList>(numStreams);

for (int i = 0; i < numStreams; i++) {

?kafkaStreams.add(KafkaUtils.createStream(...));

}

JavaPairDStream unifiedStream= streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1,kafkaStreams.size()));

unifiedStream.print();

2、spark.streaming.blockInterval:增加block數(shù)量州既,增加每個(gè)batch

rdd的partition數(shù)量谜洽,增加處理并行度

receiver從數(shù)據(jù)源源源不斷地獲取到數(shù)據(jù);首先是會(huì)按照block interval吴叶,將指定時(shí)間間隔的數(shù)據(jù)阐虚,收集為一個(gè)block;默認(rèn)時(shí)間是200ms蚌卤,官方推薦不要小于50ms敌呈;接著呢,會(huì)將指定batch interval時(shí)間間隔內(nèi)的block造寝,合并為一個(gè)batch磕洪;創(chuàng)建為一個(gè)rdd,然后啟動(dòng)一個(gè)job诫龙,去處理這個(gè)batch rdd中的數(shù)據(jù)

batch rdd析显,它的partition數(shù)量是多少呢?一個(gè)batch有多少個(gè)block签赃,就有多少個(gè)partition谷异;就意味著并行度是多少;就意味著每個(gè)batch rdd有多少個(gè)task會(huì)并行計(jì)算和處理锦聊。

當(dāng)然是希望可以比默認(rèn)的task數(shù)量和并行度再多一些了歹嘹;可以手動(dòng)調(diào)節(jié)block interval;減少block interval孔庭;每個(gè)batch可以包含更多的block尺上;有更多的partition材蛛;也就有更多的task并行處理每個(gè)batch rdd。

定死了怎抛,初始的rdd過(guò)來(lái)卑吭,直接就是固定的partition數(shù)量了

3、inputStream.repartition(

of partitions>):重分區(qū)马绝,增加每個(gè)batch rdd的partition數(shù)量

有些時(shí)候豆赏,希望對(duì)某些dstream中的rdd進(jìn)行定制化的分區(qū)

對(duì)dstream中的rdd進(jìn)行重分區(qū),去重分區(qū)成指定數(shù)量的分區(qū)富稻,這樣也可以提高指定dstream的rdd的計(jì)算并行度

4掷邦、調(diào)節(jié)并行度

spark.default.parallelism

reduceByKey(numPartitions)

5、使用Kryo序列化機(jī)制:

spark streaming椭赋,也是有不少序列化的場(chǎng)景的

提高序列化task發(fā)送到executor上執(zhí)行的性能耙饰,如果task很多的時(shí)候,task序列化和反序列化的性能開(kāi)銷也比較可觀

默認(rèn)輸入數(shù)據(jù)的存儲(chǔ)級(jí)別是StorageLevel.MEMORY_AND_DISK_SER_2纹份,receiver接收到數(shù)據(jù)苟跪,默認(rèn)就會(huì)進(jìn)行持久化操作;首先序列化數(shù)據(jù)蔓涧,存儲(chǔ)到內(nèi)存中件已;如果內(nèi)存資源不夠大,那么就寫入磁盤元暴;而且篷扩,還會(huì)寫一份冗余副本到其他executor的block manager中,進(jìn)行數(shù)據(jù)冗余茉盏。

6鉴未、batch interval:每個(gè)的處理時(shí)間必須小于batchinterval

實(shí)際上你的spark streaming跑起來(lái)以后,其實(shí)都是可以在spark ui上觀察它的運(yùn)行情況的鸠姨;可以看到batch的處理時(shí)間铜秆;

如果發(fā)現(xiàn)batch的處理時(shí)間大于batch interval,就必須調(diào)節(jié)batch interval

盡量不要讓batch處理時(shí)間大于batch interval

比如你的batch每隔5秒生成一次讶迁;你的batch處理時(shí)間要達(dá)到6秒连茧;就會(huì)出現(xiàn),batch在你的內(nèi)存中日積月累巍糯,一直囤積著啸驯,沒(méi)法及時(shí)計(jì)算掉,釋放內(nèi)存空間祟峦;而且對(duì)內(nèi)存空間的占用越來(lái)越大罚斗,那么此時(shí)會(huì)導(dǎo)致內(nèi)存空間快速消耗

如果發(fā)現(xiàn)batch處理時(shí)間比batch interval要大,就盡量將batch interval調(diào)節(jié)大一些

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末宅楞,一起剝皮案震驚了整個(gè)濱河市针姿,隨后出現(xiàn)的幾起案子袱吆,更是在濱河造成了極大的恐慌,老刑警劉巖搓幌,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異迅箩,居然都是意外死亡溉愁,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門饲趋,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)拐揭,“玉大人,你說(shuō)我怎么就攤上這事奕塑√梦郏” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵龄砰,是天一觀的道長(zhǎng)盟猖。 經(jīng)常有香客問(wèn)我,道長(zhǎng)换棚,這世上最難降的妖魔是什么式镐? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任,我火速辦了婚禮固蚤,結(jié)果婚禮上娘汞,老公的妹妹穿的比我還像新娘。我一直安慰自己夕玩,他們只是感情好你弦,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著燎孟,像睡著了一般禽作。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上揩页,一...
    開(kāi)封第一講書(shū)人閱讀 48,970評(píng)論 1 284
  • 那天领迈,我揣著相機(jī)與錄音,去河邊找鬼碍沐。 笑死狸捅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的累提。 我是一名探鬼主播尘喝,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼斋陪!你這毒婦竟也來(lái)了朽褪?” 一聲冷哼從身側(cè)響起置吓,我...
    開(kāi)封第一講書(shū)人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎缔赠,沒(méi)想到半個(gè)月后衍锚,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡嗤堰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年戴质,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片踢匣。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡告匠,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出离唬,到底是詐尸還是另有隱情后专,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布输莺,位于F島的核電站戚哎,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏嫂用。R本人自食惡果不足惜建瘫,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望尸折。 院中可真熱鬧啰脚,春花似錦、人聲如沸实夹。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)亮航。三九已至荸实,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間缴淋,已是汗流浹背准给。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留重抖,地道東北人露氮。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像钟沛,于是被迫代替她去往敵國(guó)和親畔规。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容