4菱阵、算子調(diào)優(yōu)
4.1、MapPartitions提升Map類操作性能
spark中案糙,最基本的原則限嫌,就是每個(gè)task處理一個(gè)RDD的partition。
4.1.1时捌、MapPartitions的優(yōu)缺點(diǎn)
MapPartitions操作的優(yōu)點(diǎn):
如果是普通的map怒医,比如一個(gè)partition中有1萬條數(shù)據(jù)。ok奢讨,那么你的function要執(zhí)行和計(jì)算1萬次稚叹。
但是,使用MapPartitions操作之后拿诸,一個(gè)task僅僅會(huì)執(zhí)行一次function扒袖,function一次接收所有的partition數(shù)據(jù)。只要執(zhí)行一次就可以了亩码,性能比較高季率。
MapPartitions的缺點(diǎn):
如果是普通的map操作,一次function的執(zhí)行就處理一條數(shù)據(jù)描沟。那么如果內(nèi)存不夠用的情況下飒泻,比如處理了1千條數(shù)據(jù)了,那么這個(gè)時(shí)候內(nèi)存不夠了吏廉,那么就可以將已經(jīng)處理完的1千條數(shù)據(jù)從內(nèi)存里面垃圾回收掉泞遗,或者用其他方法,騰出空間來吧席覆。
所以說普通的map操作通常不會(huì)導(dǎo)致內(nèi)存的OOM異常史辙。
但是MapPartitions操作,對(duì)于大量數(shù)據(jù)來說佩伤,比如甚至一個(gè)partition聊倔,100萬數(shù)據(jù),一次傳入一個(gè)function以后生巡,那么可能一下子內(nèi)存不夠方库,但是又沒有辦法去騰出內(nèi)存空間來,可能就OOM障斋,內(nèi)存溢出纵潦。
4.1.2徐鹤、MapPartitions使用場(chǎng)景
當(dāng)分析的數(shù)據(jù)量不是特別大的時(shí)候,都可以用這種MapPartitions系列操作邀层,性能還是非常不錯(cuò)的返敬,是有提升的。比如原來是15分鐘寥院,(曾經(jīng)有一次性能調(diào)優(yōu))劲赠,12分鐘。10分鐘->9分鐘秸谢。
但是也有過出問題的經(jīng)驗(yàn)凛澎,MapPartitions只要一用,直接OOM估蹄,內(nèi)存溢出塑煎,崩潰。
在項(xiàng)目中臭蚁,自己先去估算一下RDD的數(shù)據(jù)量最铁,以及每個(gè)partition的量,還有自己分配給每個(gè)executor的內(nèi)存資源垮兑±湮荆看看一下子內(nèi)存容納所有的partition數(shù)據(jù)行不行。如果行系枪,可以試一下雀哨,能跑通就好。性能肯定是有提升的私爷。但是試了以后震束,發(fā)現(xiàn)OOM了,那就放棄吧当犯。
4.2、filter過后使用coalesce減少分區(qū)數(shù)量
4.2.1割疾、出現(xiàn)問題
默認(rèn)情況下嚎卫,經(jīng)過了filter之后,RDD中的每個(gè)partition的數(shù)據(jù)量宏榕,可能都不太一樣了拓诸。(原本每個(gè)partition的數(shù)據(jù)量可能是差不多的)
可能出現(xiàn)的問題:
- 每個(gè)partition數(shù)據(jù)量變少了,但是在后面進(jìn)行處理的時(shí)候麻昼,還是要跟partition數(shù)量一樣數(shù)量的task奠支,來進(jìn)行處理,有點(diǎn)浪費(fèi)task計(jì)算資源抚芦。
- 每個(gè)partition的數(shù)據(jù)量不一樣倍谜,會(huì)導(dǎo)致后面的每個(gè)task處理每個(gè)partition的時(shí)候迈螟,每個(gè)task要處理的數(shù)據(jù)量就不同,這樣就會(huì)導(dǎo)致有些task運(yùn)行的速度很快尔崔,有些task運(yùn)行的速度很慢答毫。這就是數(shù)據(jù)傾斜。
針對(duì)上述的兩個(gè)問題季春,我們希望應(yīng)該能夠怎么樣洗搂? - 針對(duì)第一個(gè)問題,我們希望可以進(jìn)行partition的壓縮吧载弄,因?yàn)閿?shù)據(jù)量變少了耘拇,那么partition其實(shí)也完全可以對(duì)應(yīng)的變少。比如原來是4個(gè)partition宇攻,現(xiàn)在完全可以變成2個(gè)partition惫叛。那么就只要用后面的2個(gè)task來處理即可。就不會(huì)造成task計(jì)算資源的浪費(fèi)尺碰。(不必要挣棕,針對(duì)只有一點(diǎn)點(diǎn)數(shù)據(jù)的partition,還去啟動(dòng)一個(gè)task來計(jì)算)
- 針對(duì)第二個(gè)問題亲桥,其實(shí)解決方案跟第一個(gè)問題是一樣的洛心,也是去壓縮partition,盡量讓每個(gè)partition的數(shù)據(jù)量差不多题篷。那么后面的task分配到的partition的數(shù)據(jù)量也就差不多词身。不會(huì)造成有的task運(yùn)行速度特別慢,有的task運(yùn)行速度特別快番枚。避免了數(shù)據(jù)傾斜的問題法严。
4.2.2、解決問題方法
調(diào)用coalesce算子
主要就是用于在filter操作之后葫笼,針對(duì)每個(gè)partition的數(shù)據(jù)量各不相同的情況深啤,來壓縮partition的數(shù)量,而且讓每個(gè)partition的數(shù)據(jù)量都盡量均勻緊湊路星。從而便于后面的task進(jìn)行計(jì)算操作溯街,在某種程度上,能夠一定程度的提升性能洋丐。
4.3呈昔、使用foreachPartition優(yōu)化寫數(shù)據(jù)庫性能
4.3.1、默認(rèn)的foreach的性能缺陷在哪里友绝?
首先堤尾,對(duì)于每條數(shù)據(jù),都要單獨(dú)去調(diào)用一次function迁客,task為每個(gè)數(shù)據(jù)郭宝,都要去執(zhí)行一次function函數(shù)辞槐。
如果100萬條數(shù)據(jù),(一個(gè)partition)剩蟀,調(diào)用100萬次催蝗。性能比較差。
另外一個(gè)非常非常重要的一點(diǎn)
如果每個(gè)數(shù)據(jù)育特,你都去創(chuàng)建一個(gè)數(shù)據(jù)庫連接的話丙号,那么你就得創(chuàng)建100萬次數(shù)據(jù)庫連接。
但是要注意的是缰冤,數(shù)據(jù)庫連接的創(chuàng)建和銷毀犬缨,都是非常非常消耗性能的。雖然我們之前已經(jīng)用了數(shù)據(jù)庫連接池棉浸,只是創(chuàng)建了固定數(shù)量的數(shù)據(jù)庫連接怀薛。
你還是得多次通過數(shù)據(jù)庫連接,往數(shù)據(jù)庫(MySQL)發(fā)送一條SQL語句迷郑,然后MySQL需要去執(zhí)行這條SQL語句枝恋。如果有100萬條數(shù)據(jù),那么就是100萬次發(fā)送SQL語句嗡害。
以上兩點(diǎn)(數(shù)據(jù)庫連接焚碌,多次發(fā)送SQL語句),都是非常消耗性能的霸妹。
4.3.2蛾默、用了foreachPartition算子之后芹关,好處在哪里?
- 對(duì)于我們寫的function函數(shù)宅荤,就調(diào)用一次裆针,一次傳入一個(gè)partition所有的數(shù)據(jù)掌逛。
- 主要?jiǎng)?chuàng)建或者獲取一個(gè)數(shù)據(jù)庫連接就可以痴施。
- 只要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù)即可味榛。
注意,與mapPartitions操作一樣良价,如果一個(gè)partition的數(shù)量真的特別特別大寝殴,比如是100萬,那基本上就不太靠譜了棚壁。很有可能會(huì)發(fā)生OOM,內(nèi)存溢出的問題栈虚。
4.4泌射、使用repartition解決Spark SQL低并行度的性能問題
4.4.1、設(shè)置并行度
并行度:之前說過,并行度是設(shè)置的:
- spark.default.parallelism
- textFile()次屠,傳入第二個(gè)參數(shù)浑此,指定partition數(shù)量(比較少用)
在生產(chǎn)環(huán)境中蒲犬,是最好設(shè)置一下并行度擂送。官網(wǎng)有推薦的設(shè)置方式蟀拷,根據(jù)你的application的總cpu core數(shù)量(在spark-submit中可以指定)此衅,自己手動(dòng)設(shè)置spark.default.parallelism參數(shù)酌泰,指定為cpu core總數(shù)的2~3倍。
4.4.2、你設(shè)置的這個(gè)并行度虑省,在哪些情況下會(huì)生效伪节?什么情況下不會(huì)生效?
如果你壓根兒沒有使用Spark SQL(DataFrame)钓瞭,那么你整個(gè)spark application默認(rèn)所有stage的并行度都是你設(shè)置的那個(gè)參數(shù)驳遵。(除非你使用coalesce算子縮減過partition數(shù)量)。
問題來了山涡,用Spark SQL的情況下堤结,stage的并行度沒法自己指定。Spark SQL自己會(huì)默認(rèn)根據(jù)hive表對(duì)應(yīng)的hdfs文件的block鸭丛,自動(dòng)設(shè)置Spark SQL查詢所在的那個(gè)stage的并行度竞穷。你自己通過spark.default.parallelism參數(shù)指定的并行度,只會(huì)在沒有Spark SQL的stage中生效鳞溉。
比如你第一個(gè)stage瘾带,用了Spark SQL從hive表中查詢出了一些數(shù)據(jù),然后做了一些transformation操作熟菲,接著做了一個(gè)shuffle操作(groupByKey)看政。下一個(gè)stage,在shuffle操作之后抄罕,做了一些transformation操作允蚣。hive表,對(duì)應(yīng)了一個(gè)hdfs文件呆贿,有20個(gè)block嚷兔。你自己設(shè)置了spark.default.parallelism參數(shù)為100。
你的第一個(gè)stage的并行度做入,是不受你的控制的冒晰,就只有20個(gè)task。第二個(gè)stage母蛛,才會(huì)變成你自己設(shè)置的那個(gè)并行度翩剪,100。
4.4.3彩郊、可能出現(xiàn)的問題前弯?
Spark SQL默認(rèn)情況下,它的那個(gè)并行度秫逝,咱們沒法設(shè)置恕出。可能導(dǎo)致的問題违帆,也許沒什么問題浙巫,也許很有問題。Spark SQL所在的那個(gè)stage中,后面的那些transformation操作的畴,可能會(huì)有非常復(fù)雜的業(yè)務(wù)邏輯渊抄,甚至說復(fù)雜的算法。如果你的Spark SQL默認(rèn)把task數(shù)量設(shè)置的很少丧裁,20個(gè)护桦,然后每個(gè)task要處理為數(shù)不少的數(shù)據(jù)量,然后還要執(zhí)行特別復(fù)雜的算法煎娇。
這個(gè)時(shí)候二庵,就會(huì)導(dǎo)致第一個(gè)stage的速度,特別慢缓呛。第二個(gè)stage1000個(gè)task非炒呦恚快。
4.4.4哟绊、解決Spark SQL無法設(shè)置并行度和task數(shù)量的辦法
repartition算子因妙,你用Spark SQL這一步的并行度和task數(shù)量,肯定是沒有辦法去改變了票髓。但是呢兰迫,可以將你用Spark SQL查詢出來的RDD,使用repartition算子去重新進(jìn)行分區(qū)炬称,此時(shí)可以分成多個(gè)partition汁果。然后呢,從repartition以后的RDD玲躯,再往后据德,并行度和task數(shù)量,就會(huì)按照你預(yù)期的來了跷车。就可以避免跟Spark SQL綁定在一個(gè)stage中的算子棘利,只能使用少量的task去處理大量數(shù)據(jù)以及復(fù)雜的算法邏輯。
4.5朽缴、reduceByKey本地聚合介紹
reduceByKey善玫,相較于普通的shuffle操作(比如groupByKey),它的一個(gè)特點(diǎn)密强,就是說茅郎,會(huì)進(jìn)行map端的本地聚合。對(duì)map端給下個(gè)stage每個(gè)task創(chuàng)建的輸出文件中或渤,寫數(shù)據(jù)之前系冗,就會(huì)進(jìn)行本地的combiner操作,也就是說對(duì)每一個(gè)key薪鹦,對(duì)應(yīng)的values掌敬,都會(huì)執(zhí)行你的算子函數(shù)(_ + _)
4.5.1惯豆、用reduceByKey對(duì)性能的提升
1、在本地進(jìn)行聚合以后奔害,在map端的數(shù)據(jù)量就變少了楷兽,減少磁盤IO。而且可以減少磁盤空間的占用华临。
2拄养、下一個(gè)stage,拉取數(shù)據(jù)的量银舱,也就變少了。減少網(wǎng)絡(luò)的數(shù)據(jù)傳輸?shù)男阅芟摹?br>
3跛梗、在reduce端進(jìn)行數(shù)據(jù)緩存的內(nèi)存占用變少了寻馏。
4、reduce端核偿,要進(jìn)行聚合的數(shù)據(jù)量也變少了诚欠。
4.5.2、reduceByKey在什么情況下使用呢漾岳?
1轰绵、非常普通的,比如說尼荆,就是要實(shí)現(xiàn)類似于wordcount程序一樣的左腔,對(duì)每個(gè)key對(duì)應(yīng)的值,進(jìn)行某種數(shù)據(jù)公式或者算法的計(jì)算(累加捅儒、類乘)液样。
2、對(duì)于一些類似于要對(duì)每個(gè)key進(jìn)行一些字符串拼接的這種較為復(fù)雜的操作巧还,可以自己衡量一下鞭莽,其實(shí)有時(shí),也是可以使用reduceByKey來實(shí)現(xiàn)的麸祷。但是不太好實(shí)現(xiàn)澎怒。如果真能夠?qū)崿F(xiàn)出來,對(duì)性能絕對(duì)是有幫助的阶牍。(shuffle基本上就占了整個(gè)spark作業(yè)的90%以上的性能消耗喷面,主要能對(duì)shuffle進(jìn)行一定的調(diào)優(yōu),都是有價(jià)值的)
5走孽、troubleshooting
5.1乖酬、控制shuffle reduce端緩沖大小以避免OOM
map端的task是不斷的輸出數(shù)據(jù)的,數(shù)據(jù)量可能是很大的融求。
但是咬像,其實(shí)reduce端的task,并不是等到map端task將屬于自己的那份數(shù)據(jù)全部寫入磁盤文件之后,再去拉取的县昂。map端寫一點(diǎn)數(shù)據(jù)肮柜,reduce端task就會(huì)拉取一小部分?jǐn)?shù)據(jù),立即進(jìn)行后面的聚合倒彰、算子函數(shù)的應(yīng)用审洞。
每次reduece能夠拉取多少數(shù)據(jù),就由buffer來決定待讳。因?yàn)槔∵^來的數(shù)據(jù)芒澜,都是先放在buffer中的。然后才用后面的executor分配的堆內(nèi)存占比(0.2)创淡,hashmap痴晦,去進(jìn)行后續(xù)的聚合、函數(shù)的執(zhí)行琳彩。
5.1.1誊酌、reduce端緩沖大小的另外一面,關(guān)于性能調(diào)優(yōu)的一面
假如Map端輸出的數(shù)據(jù)量也不是特別大露乏,然后你的整個(gè)application的資源也特別充足碧浊。200個(gè)executor、5個(gè)cpu core瘟仿、10G內(nèi)存箱锐。
其實(shí)可以嘗試去增加這個(gè)reduce端緩沖大小的,比如從48M劳较,變成96M瑞躺。那么這樣的話,每次reduce task能夠拉取的數(shù)據(jù)量就很大兴想。需要拉取的次數(shù)也就變少了幢哨。比如原先需要拉取100次,現(xiàn)在只要拉取50次就可以執(zhí)行完了嫂便。
對(duì)網(wǎng)絡(luò)傳輸性能開銷的減少捞镰,以及reduce端聚合操作執(zhí)行的次數(shù)的減少,都是有幫助的毙替。
最終達(dá)到的效果岸售,就應(yīng)該是性能上的一定程度上的提升。
注意厂画,一定要在資源充足的前提下做此操作凸丸。
5.1.2reduce端緩沖(buffer),可能會(huì)出現(xiàn)的問題及解決方式
可能會(huì)出現(xiàn)袱院,默認(rèn)是48MB屎慢,也許大多數(shù)時(shí)候瞭稼,reduce端task一邊拉取一邊計(jì)算,不一定一直都會(huì)拉滿48M的數(shù)據(jù)腻惠。大多數(shù)時(shí)候环肘,拉取個(gè)10M數(shù)據(jù),就計(jì)算掉了集灌。
大多數(shù)時(shí)候悔雹,也許不會(huì)出現(xiàn)什么問題。但是有的時(shí)候欣喧,map端的數(shù)據(jù)量特別大腌零,然后寫出的速度特別快。reduce端所有task唆阿,拉取的時(shí)候益涧,全部達(dá)到自己的緩沖的最大極限值,緩沖區(qū)48M酷鸦,全部填滿。
這個(gè)時(shí)候牙咏,再加上你的reduce端執(zhí)行的聚合函數(shù)的代碼臼隔,可能會(huì)創(chuàng)建大量的對(duì)象。也許妄壶,一下子內(nèi)存就撐不住了摔握,就會(huì)OOM。reduce端的內(nèi)存中丁寄,就會(huì)發(fā)生內(nèi)存溢出的問題氨淌。
針對(duì)上述的可能出現(xiàn)的問題,我們?cè)撛趺磥斫鉀Q呢伊磺?
這個(gè)時(shí)候盛正,就應(yīng)該減少reduce端task緩沖的大小。我寧愿多拉取幾次屑埋,但是每次同時(shí)能夠拉取到reduce端每個(gè)task的數(shù)量比較少豪筝,就不容易發(fā)生OOM內(nèi)存溢出的問題。(比如摘能,可以調(diào)節(jié)成12M)
在實(shí)際生產(chǎn)環(huán)境中续崖,我們都是碰到過這種問題的。這是典型的以性能換執(zhí)行的原理团搞。reduce端緩沖小了严望,不容易OOM了,但是逻恐,性能一定是有所下降的像吻,你要拉取的次數(shù)就多了峻黍。就走更多的網(wǎng)絡(luò)傳輸開銷。
這種時(shí)候萧豆,只能采取犧牲性能的方式了奸披,spark作業(yè),首先涮雷,第一要義阵面,就是一定要讓它可以跑起來。
5.1.3洪鸭、操作方法
new SparkConf().set(spark.reducer.maxSizeInFlight样刷,”48”)
5.2、解決JVM GC導(dǎo)致的shuffle文件拉取失敗
5.2.1览爵、問題描述
有時(shí)會(huì)出現(xiàn)一種情況置鼻,在spark的作業(yè)中,log日志會(huì)提示shuffle file not found蜓竹。(spark作業(yè)中箕母,非常常見的)而且有的時(shí)候,它是偶爾才會(huì)出現(xiàn)的一種情況俱济。有的時(shí)候嘶是,出現(xiàn)這種情況以后,重新去提交task蛛碌。重新執(zhí)行一遍聂喇,發(fā)現(xiàn)就好了。沒有這種錯(cuò)誤了蔚携。
log怎么看希太?用client模式去提交你的spark作業(yè)。比如standalone client或yarn client酝蜒。一提交作業(yè)誊辉,直接可以在本地看到更新的log。
問題原因:比如亡脑,executor的JVM進(jìn)程可能內(nèi)存不夠用了芥映。那么此時(shí)就會(huì)執(zhí)行GC。minor GC or full GC远豺。此時(shí)就會(huì)導(dǎo)致executor內(nèi)奈偏,所有的工作線程全部停止。比如BlockManager躯护,基于netty的網(wǎng)絡(luò)通信惊来。
下一個(gè)stage的executor,可能還沒有停止掉的task想要去上一個(gè)stage的task所在的exeuctor去拉取屬于自己的數(shù)據(jù)棺滞,結(jié)果由于對(duì)方正在gc裁蚁,就導(dǎo)致拉取了半天沒有拉取到矢渊。
就很可能會(huì)報(bào)出shuffle file not found。但是枉证,可能下一個(gè)stage又重新提交了task以后矮男,再執(zhí)行就沒有問題了,因?yàn)榭赡艿诙尉蜎]有碰到JVM在gc了室谚。
5.2.2毡鉴、解決方案
spark.shuffle.io.maxRetries 3
第一個(gè)參數(shù),意思就是說秒赤,shuffle文件拉取的時(shí)候猪瞬,如果沒有拉取到(拉取失敗)入篮,最多或重試幾次(會(huì)重新拉取幾次文件)陈瘦,默認(rèn)是3次。
spark.shuffle.io.retryWait 5s
第二個(gè)參數(shù)潮售,意思就是說痊项,每一次重試?yán)∥募臅r(shí)間間隔,默認(rèn)是5s鐘酥诽。
默認(rèn)情況下鞍泉,假如說第一個(gè)stage的executor正在進(jìn)行漫長的full gc。第二個(gè)stage的executor嘗試去拉取文件盆均,結(jié)果沒有拉取到塞弊,默認(rèn)情況下漱逸,會(huì)反復(fù)重試?yán)?次泪姨,每次間隔是五秒鐘。最多只會(huì)等待3 * 5s = 15s饰抒。如果15s內(nèi)肮砾,沒有拉取到shuffle file。就會(huì)報(bào)出shuffle file not found袋坑。
針對(duì)這種情況仗处,我們完全可以進(jìn)行預(yù)備性的參數(shù)調(diào)節(jié)。增大上述兩個(gè)參數(shù)的值枣宫,達(dá)到比較大的一個(gè)值婆誓,盡量保證第二個(gè)stage的task,一定能夠拉取到上一個(gè)stage的輸出文件也颤。避免報(bào)shuffle file not found洋幻。然后可能會(huì)重新提交stage和task去執(zhí)行。那樣反而對(duì)性能也不好翅娶。
spark.shuffle.io.maxRetries 60
spark.shuffle.io.retryWait 60s
最多可以忍受1個(gè)小時(shí)沒有拉取到shuffle file文留。只是去設(shè)置一個(gè)最大的可能的值好唯。full gc不可能1個(gè)小時(shí)都沒結(jié)束吧。
這樣呢燥翅,就可以盡量避免因?yàn)間c導(dǎo)致的shuffle file not found骑篙,無法拉取到的問題。
5.3森书、YARN隊(duì)列資源不足導(dǎo)致的application直接失敗
5.3.1靶端、問題描述
如果說,你是基于yarn來提交spark拄氯。比如yarn-cluster或者yarn-client躲查。你可以指定提交到某個(gè)hadoop隊(duì)列上的。每個(gè)隊(duì)列都是可以有自己的資源的译柏。
跟大家說一個(gè)生產(chǎn)環(huán)境中的镣煮,給spark用的yarn資源隊(duì)列的情況:500G內(nèi)存,200個(gè)cpu core鄙麦。
比如說典唇,某個(gè)spark application,在spark-submit里面你自己配了胯府,executor介衔,80個(gè)。每個(gè)executor骂因,4G內(nèi)存炎咖。每個(gè)executor,2個(gè)cpu core寒波。你的spark作業(yè)每次運(yùn)行乘盼,大概要消耗掉320G內(nèi)存,以及160個(gè)cpu core俄烁。
乍看起來绸栅,咱們的隊(duì)列資源,是足夠的页屠,500G內(nèi)存粹胯,280個(gè)cpu core。
首先辰企,第一點(diǎn)风纠,你的spark作業(yè)實(shí)際運(yùn)行起來以后,耗費(fèi)掉的資源量牢贸,可能是比你在spark-submit里面配置的竹观,以及你預(yù)期的,是要大一些的十减。400G內(nèi)存栈幸,190個(gè)cpu core愤估。
那么這個(gè)時(shí)候,的確速址,咱們的隊(duì)列資源還是有一些剩余的玩焰。但問題是如果你同時(shí)又提交了一個(gè)spark作業(yè)上去,一模一樣的芍锚。那就可能會(huì)出問題昔园。
第二個(gè)spark作業(yè),又要申請(qǐng)320G內(nèi)存+160個(gè)cpu core并炮。結(jié)果默刚,發(fā)現(xiàn)隊(duì)列資源不足。
此時(shí)逃魄,可能會(huì)出現(xiàn)兩種情況:(備注荤西,具體出現(xiàn)哪種情況,跟你的YARN伍俘、Hadoop的版本邪锌,你們公司的一些運(yùn)維參數(shù),以及配置癌瘾、硬件觅丰、資源肯能都有關(guān)系)
1、YARN妨退,發(fā)現(xiàn)資源不足時(shí)妇萄,你的spark作業(yè),并沒有hang在那里咬荷,等待資源的分配冠句,而是直接打印一行fail的log入热,直接就fail掉了策严。
2、YARN,發(fā)現(xiàn)資源不足逝变,你的spark作業(yè),就hang在那里奋构。一直等待之前的spark作業(yè)執(zhí)行完壳影,等待有資源分配給自己來執(zhí)行。
5.3.2弥臼、解決方案
1宴咧、在你的J2EE(我們這個(gè)項(xiàng)目里面,spark作業(yè)的運(yùn)行径缅, J2EE平臺(tái)觸發(fā)的掺栅,執(zhí)行spark-submit腳本的平臺(tái))進(jìn)行限制烙肺,同時(shí)只能提交一個(gè)spark作業(yè)到y(tǒng)arn上去執(zhí)行,確保一個(gè)spark作業(yè)的資源肯定是有的氧卧。
2桃笙、你應(yīng)該采用一些簡單的調(diào)度區(qū)分的方式,比如說沙绝,有的spark作業(yè)可能是要長時(shí)間運(yùn)行的搏明,比如運(yùn)行30分鐘。有的spark作業(yè)闪檬,可能是短時(shí)間運(yùn)行的星著,可能就運(yùn)行2分鐘。此時(shí)粗悯,都提交到一個(gè)隊(duì)列上去虚循,肯定不合適。很可能出現(xiàn)30分鐘的作業(yè)卡住后面一大堆2分鐘的作業(yè)样傍。分隊(duì)列邮丰,可以申請(qǐng)(跟你們的YARN、Hadoop運(yùn)維的同事申請(qǐng))铭乾。你自己給自己搞兩個(gè)調(diào)度隊(duì)列剪廉。每個(gè)隊(duì)列的根據(jù)你要執(zhí)行的作業(yè)的情況來設(shè)置。在你的J2EE程序里面炕檩,要判斷斗蒋,如果是長時(shí)間運(yùn)行的作業(yè),就干脆都提交到某一個(gè)固定的隊(duì)列里面去把笛质。如果是短時(shí)間運(yùn)行的作業(yè)泉沾,就統(tǒng)一提交到另外一個(gè)隊(duì)列里面去。這樣妇押,避免了長時(shí)間運(yùn)行的作業(yè)跷究,阻塞了短時(shí)間運(yùn)行的作業(yè)。
3敲霍、你的隊(duì)列里面俊马,無論何時(shí),只會(huì)有一個(gè)作業(yè)在里面運(yùn)行肩杈。那么此時(shí)柴我,就應(yīng)該用我們之前講過的性能調(diào)優(yōu)的手段,去將每個(gè)隊(duì)列能承載的最大的資源扩然,分配給你的每一個(gè)spark作業(yè)艘儒,比如80個(gè)executor,6G的內(nèi)存,3個(gè)cpu core界睁。盡量讓你的spark作業(yè)每一次運(yùn)行觉增,都達(dá)到最滿的資源使用率,最快的速度翻斟,最好的性能抑片。并行度,240個(gè)cpu core杨赤,720個(gè)task敞斋。
4、在J2EE中疾牲,通過線程池的方式(一個(gè)線程池對(duì)應(yīng)一個(gè)資源隊(duì)列)植捎,來實(shí)現(xiàn)上述我們說的方案。
5.4阳柔、解決各種序列化導(dǎo)致的報(bào)錯(cuò)
5.4.1焰枢、問題描述
用client模式去提交spark作業(yè),觀察本地打印出來的log舌剂。如果出現(xiàn)了類似于Serializable济锄、Serialize等等字眼報(bào)錯(cuò)的log,那么恭喜大家霍转,就碰到了序列化問題導(dǎo)致的報(bào)錯(cuò)荐绝。
5.4.2、序列化報(bào)錯(cuò)及解決方法
1避消、你的算子函數(shù)里面低滩,如果使用到了外部的自定義類型的變量,那么此時(shí)岩喷,就要求你的自定義類型恕沫,必須是可序列化的。
final Teacher teacher = new Teacher("leo");
studentsRDD.foreach(new VoidFunction() {
public void call(Row row) throws Exception {
String teacherName = teacher.getName();
....
}
});
public class Teacher implements Serializable {
}
2纱意、如果要將自定義的類型婶溯,作為RDD的元素類型,那么自定義的類型也必須是可以序列化的偷霉。
JavaPairRDD<Integer, Teacher> teacherRDD
JavaPairRDD<Integer, Student> studentRDD
studentRDD.join(teacherRDD)
public class Teacher implements Serializable {
}
public class Student implements Serializable {
}
3迄委、不能在上述兩種情況下,去使用一些第三方的腾它,不支持序列化的類型跑筝。
Connection conn =
studentsRDD.foreach(new VoidFunction() {
public void call(Row row) throws Exception {
conn.....
}
});
Connection是不支持序列化的
5.5死讹、解決算子函數(shù)返回NULL導(dǎo)致的問題
5.5.1瞒滴、問題描述
在有些算子函數(shù)里面,是需要我們有一個(gè)返回值的。但是妓忍,有時(shí)候不需要返回值虏两。我們?nèi)绻苯臃祷豊ULL的話,是會(huì)報(bào)錯(cuò)的世剖。
Scala.Math(NULL)定罢,異常
5.5.2、解決方案
如果碰到你的確是對(duì)于某些值不想要有返回值的話旁瘫,有一個(gè)解決的辦法:
1祖凫、在返回的時(shí)候,返回一些特殊的值酬凳,不要返回null惠况,比如“-999”
2、在通過算子獲取到了一個(gè)RDD之后宁仔,可以對(duì)這個(gè)RDD執(zhí)行filter操作稠屠,進(jìn)行數(shù)據(jù)過濾。filter內(nèi)翎苫,可以對(duì)數(shù)據(jù)進(jìn)行判定权埠,如果是-999,那么就返回false煎谍,給過濾掉就可以了攘蔽。
3、大家不要忘了呐粘,之前咱們講過的那個(gè)算子調(diào)優(yōu)里面的coalesce算子秩彤,在filter之后,可以使用coalesce算子壓縮一下RDD的partition的數(shù)量事哭,讓各個(gè)partition的數(shù)據(jù)比較緊湊一些漫雷。也能提升一些性能。
5.6鳍咱、解決yarn-client模式導(dǎo)致的網(wǎng)卡流量激增問題
5.6.1降盹、Spark-On-Yarn任務(wù)執(zhí)行流程
Driver到底是什么?
我們寫的spark程序谤辜,打成jar包蓄坏,用spark-submit來提交。jar包中的一個(gè)main類丑念,通過jvm的命令啟動(dòng)起來涡戳。
JVM進(jìn)程,其實(shí)就是Driver進(jìn)程脯倚。
Driver進(jìn)程啟動(dòng)起來以后渔彰,執(zhí)行我們自己寫的main函數(shù)嵌屎,從new SparkContext()開始。
driver接收到屬于自己的executor進(jìn)程的注冊(cè)之后恍涂,就可以去進(jìn)行我們寫的spark作業(yè)代碼的執(zhí)行了宝惰。此時(shí)會(huì)一行一行的去執(zhí)行咱們寫的那些spark代碼。執(zhí)行到某個(gè)action操作的時(shí)候再沧,就會(huì)觸發(fā)一個(gè)job尼夺,然后DAGScheduler會(huì)將job劃分為一個(gè)一個(gè)的stage,為每個(gè)stage都創(chuàng)建指定數(shù)量的task炒瘸。TaskScheduler將每個(gè)stage的task分配到各個(gè)executor上面去執(zhí)行淤堵。
task就會(huì)執(zhí)行咱們寫的算子函數(shù)。
spark在yarn-client模式下顷扩,application的注冊(cè)(executor的申請(qǐng))和計(jì)算task的調(diào)度粘勒,是分離開來的。
standalone模式下屎即,這兩個(gè)操作都是driver負(fù)責(zé)的庙睡。
ApplicationMaster(ExecutorLauncher)負(fù)責(zé)executor的申請(qǐng),driver負(fù)責(zé)job和stage的劃分技俐,以及task的創(chuàng)建乘陪、分配和調(diào)度。
每種計(jì)算框架(mr雕擂、spark)啡邑,如果想要在yarn上執(zhí)行自己的計(jì)算應(yīng)用,那么就必須自己實(shí)現(xiàn)和提供一個(gè)ApplicationMaster井赌。相當(dāng)于是實(shí)現(xiàn)了yarn提供的接口谤逼,spark自己開發(fā)的一個(gè)類。
5.6.2仇穗、yarn-client模式下流部,會(huì)產(chǎn)生什么樣的問題呢?
由于driver是啟動(dòng)在本地機(jī)器的纹坐,而且driver是全權(quán)負(fù)責(zé)所有的任務(wù)的調(diào)度的枝冀,也就是說要跟yarn集群上運(yùn)行的多個(gè)executor進(jìn)行頻繁的通信(中間有task的啟動(dòng)消息、task的執(zhí)行統(tǒng)計(jì)消息耘子、task的運(yùn)行狀態(tài)果漾、shuffle的輸出結(jié)果)。
想象一下谷誓,比如你的executor有100個(gè)绒障,stage有10個(gè),task有1000個(gè)捍歪。每個(gè)stage運(yùn)行的時(shí)候户辱,都有1000個(gè)task提交到executor上面去運(yùn)行鸵钝,平均每個(gè)executor有10個(gè)task。接下來問題來了焕妙,driver要頻繁地跟executor上運(yùn)行的1000個(gè)task進(jìn)行通信蒋伦。通信消息特別多弓摘,通信的頻率特別高焚鹊。運(yùn)行完一個(gè)stage,接著運(yùn)行下一個(gè)stage韧献,又是頻繁的通信末患。
在整個(gè)spark運(yùn)行的生命周期內(nèi),都會(huì)頻繁的去進(jìn)行通信和調(diào)度锤窑。所有這一切通信和調(diào)度都是從你的本地機(jī)器上發(fā)出去的璧针,和接收到的。這是最要命的地方渊啰。你的本地機(jī)器探橱,很可能在30分鐘內(nèi)(spark作業(yè)運(yùn)行的周期內(nèi)),進(jìn)行頻繁大量的網(wǎng)絡(luò)通信绘证。那么此時(shí)隧膏,你的本地機(jī)器的網(wǎng)絡(luò)通信負(fù)載是非常非常高的。會(huì)導(dǎo)致你的本地機(jī)器的網(wǎng)卡流量會(huì)激增嚷那!
你的本地機(jī)器的網(wǎng)卡流量激增胞枕,當(dāng)然不是一件好事了。因?yàn)樵谝恍┐蟮墓纠锩嫖嚎恚瑢?duì)每臺(tái)機(jī)器的使用情況腐泻,都是有監(jiān)控的。不會(huì)允許單個(gè)機(jī)器出現(xiàn)耗費(fèi)大量網(wǎng)絡(luò)帶寬等等這種資源的情況队询。
5.6.3派桩、解決方案
實(shí)際上解決的方法很簡單,就是心里要清楚蚌斩,yarn-client模式是什么情況下窄坦,可以使用的?yarn-client模式凳寺,通常咱們就只會(huì)使用在測(cè)試環(huán)境中鸭津,你寫好了某個(gè)spark作業(yè),打了一個(gè)jar包肠缨,在某臺(tái)測(cè)試機(jī)器上逆趋,用yarn-client模式去提交一下。因?yàn)闇y(cè)試的行為是偶爾為之的晒奕,不會(huì)長時(shí)間連續(xù)提交大量的spark作業(yè)去測(cè)試闻书。還有一點(diǎn)好處名斟,yarn-client模式提交,可以在本地機(jī)器觀察到詳細(xì)全面的log魄眉。通過查看log砰盐,可以去解決線上報(bào)錯(cuò)的故障(troubleshooting)、對(duì)性能進(jìn)行觀察并進(jìn)行性能調(diào)優(yōu)坑律。
實(shí)際上線了以后岩梳,在生產(chǎn)環(huán)境中,都得用yarn-cluster模式晃择,去提交你的spark作業(yè)冀值。
yarn-cluster模式,就跟你的本地機(jī)器引起的網(wǎng)卡流量激增的問題宫屠,就沒有關(guān)系了列疗。也就是說,就算有問題浪蹂,也應(yīng)該是yarn運(yùn)維團(tuán)隊(duì)和基礎(chǔ)運(yùn)維團(tuán)隊(duì)之間的事情了抵栈。使用了yarn-cluster模式以后,就不是你的本地機(jī)器運(yùn)行Driver坤次,進(jìn)行task調(diào)度了古劲。是yarn集群中,某個(gè)節(jié)點(diǎn)會(huì)運(yùn)行driver進(jìn)程浙踢,負(fù)責(zé)task調(diào)度绢慢。
5.7、解決yarn-cluster模式的JVM棧內(nèi)存溢出問題
5.7.1洛波、問題描述
有的時(shí)候胰舆,運(yùn)行一些包含了spark sql的spark作業(yè),可能會(huì)碰到y(tǒng)arn-client模式下蹬挤,可以正常提交運(yùn)行缚窿。yarn-cluster模式下,可能無法提交運(yùn)行的焰扳,會(huì)報(bào)出JVM的PermGen(永久代)的內(nèi)存溢出倦零,OOM。
yarn-client模式下吨悍,driver是運(yùn)行在本地機(jī)器上的扫茅,spark使用的JVM的PermGen的配置,是本地的spark-class文件(spark客戶端是默認(rèn)有配置的)育瓜,JVM的永久代的大小是128M恋脚,這個(gè)是沒有問題的。但是在yarn-cluster模式下糟描,driver是運(yùn)行在yarn集群的某個(gè)節(jié)點(diǎn)上的怀喉,使用的是沒有經(jīng)過配置的默認(rèn)設(shè)置(PermGen永久代大杏蚩丁)李皇,82M哪亿。
spark-sql戳气,它的內(nèi)部是要進(jìn)行很復(fù)雜的SQL的語義解析夜赵、語法樹的轉(zhuǎn)換等等,特別復(fù)雜堕虹。在這種復(fù)雜的情況下掰派,如果說你的sql本身特別復(fù)雜的話从诲,很可能會(huì)比較導(dǎo)致性能的消耗,內(nèi)存的消耗靡羡∠德澹可能對(duì)PermGen永久代的占用會(huì)比較大。
所以略步,此時(shí)描扯,如果對(duì)永久代的占用需求,超過了82M的話趟薄,但是呢又在128M以內(nèi)绽诚,就會(huì)出現(xiàn)如上所述的問題,yarn-client模式下竟趾,默認(rèn)是128M憔购,這個(gè)還能運(yùn)行宫峦,如果在yarn-cluster模式下岔帽,默認(rèn)是82M,就有問題了导绷。會(huì)報(bào)出PermGen Out of Memory error log犀勒。
5.7.2、解決方案
既然是JVM的PermGen永久代內(nèi)存溢出妥曲,那么就是內(nèi)存不夠用贾费。我們就給yarn-cluster模式下的driver的PermGen多設(shè)置一些。
spark-submit腳本中檐盟,加入以下配置即可:
--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
這個(gè)就設(shè)置了driver永久代的大小褂萧,默認(rèn)是128M,最大是256M葵萎。這樣的話导犹,就可以基本保證你的spark作業(yè)不會(huì)出現(xiàn)上述的yarn-cluster模式導(dǎo)致的永久代內(nèi)存溢出的問題。
spark sql中羡忘,寫sql谎痢,要注意一個(gè)問題:
如果sql有大量的or語句。比如where keywords='' or keywords='' or keywords=''
當(dāng)達(dá)到or語句卷雕,有成百上千的時(shí)候节猿,此時(shí)可能就會(huì)出現(xiàn)一個(gè)driver端的jvm stack overflow,JVM棧內(nèi)存溢出的問題漫雕。
JVM棧內(nèi)存溢出滨嘱,基本上就是由于調(diào)用的方法層級(jí)過多峰鄙,因?yàn)楫a(chǎn)生了大量的,非常深的太雨,超出了JVM棧深度限制的遞歸方法先馆。我們的猜測(cè),spark sql有大量or語句的時(shí)候躺彬,spark sql內(nèi)部源碼中煤墙,在解析sql,比如轉(zhuǎn)換成語法樹宪拥,或者進(jìn)行執(zhí)行計(jì)劃的生成的時(shí)候仿野,對(duì)or的處理是遞歸。or特別多的話她君,就會(huì)發(fā)生大量的遞歸脚作。
JVM Stack Memory Overflow,棧內(nèi)存溢出缔刹。
這種時(shí)候球涛,建議不要搞那么復(fù)雜的spark sql語句。采用替代方案:將一條sql語句校镐,拆解成多條sql語句來執(zhí)行亿扁。每條sql語句,就只有100個(gè)or子句以內(nèi)鸟廓。一條一條SQL語句來執(zhí)行从祝。根據(jù)生產(chǎn)環(huán)境經(jīng)驗(yàn)的測(cè)試,一條sql語句引谜,100個(gè)or子句以內(nèi)牍陌,是還可以的。通常情況下员咽,不會(huì)報(bào)那個(gè)棧內(nèi)存溢出毒涧。
5.7、錯(cuò)誤的持久化方式以及checkpoint的使用
5.7.1贝室、使用持久化方式
錯(cuò)誤的持久化使用方式:
usersRDD契讲,想要對(duì)這個(gè)RDD做一個(gè)cache,希望能夠在后面多次使用這個(gè)RDD的時(shí)候档玻,不用反復(fù)重新計(jì)算RDD怀泊。可以直接使用通過各個(gè)節(jié)點(diǎn)上的executor的BlockManager管理的內(nèi)存 / 磁盤上的數(shù)據(jù)误趴,避免重新反復(fù)計(jì)算RDD霹琼。
usersRDD.cache()
usersRDD.count()
usersRDD.take()
上面這種方式,不要說會(huì)不會(huì)生效了,實(shí)際上是會(huì)報(bào)錯(cuò)的枣申。會(huì)報(bào)什么錯(cuò)誤呢售葡?會(huì)報(bào)一大堆file not found的錯(cuò)誤。
正確的持久化使用方式:
usersRDD
usersRDD = usersRDD.cache() // Java
val cachedUsersRDD = usersRDD.cache() // Scala
之后再去使用usersRDD忠藤,或者cachedUsersRDD就可以了挟伙。
5.7.2、checkpoint的使用
對(duì)于持久化模孩,大多數(shù)時(shí)候都是會(huì)正常工作的尖阔。但有些時(shí)候會(huì)出現(xiàn)意外。
比如說榨咐,緩存在內(nèi)存中的數(shù)據(jù)介却,可能莫名其妙就丟失掉了。
或者說块茁,存儲(chǔ)在磁盤文件中的數(shù)據(jù)齿坷,莫名其妙就沒了,文件被誤刪了数焊。
出現(xiàn)上述情況的時(shí)候永淌,如果要對(duì)這個(gè)RDD執(zhí)行某些操作,可能會(huì)發(fā)現(xiàn)RDD的某個(gè)partition找不到了佩耳。
下來task就會(huì)對(duì)消失的partition重新計(jì)算遂蛀,計(jì)算完以后再緩存和使用。
有些時(shí)候蚕愤,計(jì)算某個(gè)RDD答恶,可能是極其耗時(shí)的∑加眨可能RDD之前有大量的父RDD。那么如果你要重新計(jì)算一個(gè)partition污呼,可能要重新計(jì)算之前所有的父RDD對(duì)應(yīng)的partition裕坊。
這種情況下,就可以選擇對(duì)這個(gè)RDD進(jìn)行checkpoint燕酷,以防萬一籍凝。進(jìn)行checkpoint,就是說苗缩,會(huì)將RDD的數(shù)據(jù)饵蒂,持久化一份到容錯(cuò)的文件系統(tǒng)上(比如hdfs)。
在對(duì)這個(gè)RDD進(jìn)行計(jì)算的時(shí)候酱讶,如果發(fā)現(xiàn)它的緩存數(shù)據(jù)不見了退盯。優(yōu)先就是先找一下有沒有checkpoint數(shù)據(jù)(到hdfs上面去找)。如果有的話,就使用checkpoint數(shù)據(jù)了渊迁。不至于去重新計(jì)算慰照。
checkpoint,其實(shí)就是可以作為是cache的一個(gè)備胎琉朽。如果cache失效了毒租,checkpoint就可以上來使用了。
checkpoint有利有弊箱叁,利在于墅垮,提高了spark作業(yè)的可靠性,一旦發(fā)生問題耕漱,還是很可靠的噩斟,不用重新計(jì)算大量的rdd。但是弊在于孤个,進(jìn)行checkpoint操作的時(shí)候剃允,也就是將rdd數(shù)據(jù)寫入hdfs中的時(shí)候,還是會(huì)消耗性能的齐鲤。
checkpoint斥废,用性能換可靠性。
checkpoint原理:
1给郊、在代碼中牡肉,用SparkContext,設(shè)置一個(gè)checkpoint目錄淆九,可以是一個(gè)容錯(cuò)文件系統(tǒng)的目錄统锤,比如hdfs。
2炭庙、在代碼中饲窿,對(duì)需要進(jìn)行checkpoint的rdd,執(zhí)行RDD.checkpoint()焕蹄。
3逾雄、RDDCheckpointData(spark內(nèi)部的API),接管你的RDD腻脏,會(huì)標(biāo)記為marked for checkpoint鸦泳,準(zhǔn)備進(jìn)行checkpoint。
4永品、你的job運(yùn)行完之后做鹰,會(huì)調(diào)用一個(gè)finalRDD.doCheckpoint()方法,會(huì)順著rdd lineage鼎姐,回溯掃描钾麸,發(fā)現(xiàn)有標(biāo)記為待checkpoint的rdd更振,就會(huì)進(jìn)行二次標(biāo)記,inProgressCheckpoint喂走,正在接受checkpoint操作殃饿。
5、job執(zhí)行完之后芋肠,就會(huì)啟動(dòng)一個(gè)內(nèi)部的新job乎芳,去將標(biāo)記為inProgressCheckpoint的rdd的數(shù)據(jù),都寫入hdfs文件中帖池。(備注奈惑,如果rdd之前cache過,會(huì)直接從緩存中獲取數(shù)據(jù)睡汹,寫入hdfs中姜骡。如果沒有cache過馁龟,那么就會(huì)重新計(jì)算一遍這個(gè)rdd承桥,再checkpoint)柒爵。
6、將checkpoint過的rdd之前的依賴rdd彤叉,改成一個(gè)CheckpointRDD*庶柿,強(qiáng)制改變你的rdd的lineage。后面如果rdd的cache數(shù)據(jù)獲取失敗秽浇,直接會(huì)通過它的上游CheckpointRDD浮庐,去容錯(cuò)的文件系統(tǒng),比如hdfs柬焕,中审残,獲取checkpoint的數(shù)據(jù)。
checkpoint的使用:
1斑举、sc.checkpointFile("hdfs://")搅轿,設(shè)置checkpoint目錄
2、對(duì)RDD執(zhí)行checkpoint操作
6懂昂、數(shù)據(jù)傾斜解決方案
數(shù)據(jù)傾斜的解決介时,跟之前講解的性能調(diào)優(yōu),有一點(diǎn)異曲同工之妙凌彬。
性能調(diào)優(yōu)中最有效最直接最簡單的方式就是加資源加并行度,并注意RDD架構(gòu)(復(fù)用同一個(gè)RDD循衰,加上cache緩存)铲敛。相對(duì)于前面,shuffle会钝、jvm等是次要的伐蒋。
6.1工三、原理以及現(xiàn)象分析
6.1.1、數(shù)據(jù)傾斜怎么出現(xiàn)的
在執(zhí)行shuffle操作的時(shí)候先鱼,是按照key俭正,來進(jìn)行values的數(shù)據(jù)的輸出、拉取和聚合的焙畔。
同一個(gè)key的values掸读,一定是分配到一個(gè)reduce task進(jìn)行處理的。
多個(gè)key對(duì)應(yīng)的values宏多,比如一共是90萬儿惫。可能某個(gè)key對(duì)應(yīng)了88萬數(shù)據(jù)伸但,被分配到一個(gè)task上去面去執(zhí)行肾请。
另外兩個(gè)task,可能各分配到了1萬數(shù)據(jù)更胖,可能是數(shù)百個(gè)key铛铁,對(duì)應(yīng)的1萬條數(shù)據(jù)。
這樣就會(huì)出現(xiàn)數(shù)據(jù)傾斜問題却妨。
想象一下饵逐,出現(xiàn)數(shù)據(jù)傾斜以后的運(yùn)行的情況。很糟糕管呵!
其中兩個(gè)task梳毙,各分配到了1萬數(shù)據(jù),可能同時(shí)在10分鐘內(nèi)都運(yùn)行完了捐下。另外一個(gè)task有88萬條账锹,88 * 10 = 880分鐘 = 14.5個(gè)小時(shí)。
大家看坷襟,本來另外兩個(gè)task很快就運(yùn)行完畢了(10分鐘)奸柬,但是由于一個(gè)拖后腿的家伙,第三個(gè)task婴程,要14.5個(gè)小時(shí)才能運(yùn)行完廓奕,就導(dǎo)致整個(gè)spark作業(yè),也得14.5個(gè)小時(shí)才能運(yùn)行完档叔。
數(shù)據(jù)傾斜桌粉,一旦出現(xiàn),是不是性能殺手衙四?铃肯!
6.1.2、發(fā)生數(shù)據(jù)傾斜以后的現(xiàn)象
Spark數(shù)據(jù)傾斜传蹈,有兩種表現(xiàn):
1押逼、你的大部分的task步藕,都執(zhí)行的特別特別快,(你要用client模式挑格,standalone client咙冗,yarn client,本地機(jī)器一執(zhí)行spark-submit腳本漂彤,就會(huì)開始打印log)雾消,task175 finished,剩下幾個(gè)task显歧,執(zhí)行的特別特別慢仪或,前面的task,一般1s可以執(zhí)行完5個(gè)士骤,最后發(fā)現(xiàn)1000個(gè)task范删,998,999 task拷肌,要執(zhí)行1個(gè)小時(shí)到旦,2個(gè)小時(shí)才能執(zhí)行完一個(gè)task。
出現(xiàn)以上loginfo巨缘,就表明出現(xiàn)數(shù)據(jù)傾斜了添忘。
這樣還算好的,因?yàn)殡m然老牛拉破車一樣非常慢若锁,但是至少還能跑搁骑。
2、另一種情況是又固,運(yùn)行的時(shí)候仲器,其他task都執(zhí)行完了,也沒什么特別的問題仰冠,但是有的task乏冀,就是會(huì)突然間報(bào)了一個(gè)OOM,JVM Out Of Memory洋只,內(nèi)存溢出了辆沦,task failed,task lost识虚,resubmitting task肢扯。反復(fù)執(zhí)行幾次都到了某個(gè)task就是跑不通,最后就掛掉担锤。
某個(gè)task就直接OOM鹃彻,那么基本上也是因?yàn)閿?shù)據(jù)傾斜了,task分配的數(shù)量實(shí)在是太大了妻献!所以內(nèi)存放不下蛛株,然后你的task每處理一條數(shù)據(jù),還要?jiǎng)?chuàng)建大量的對(duì)象育拨,內(nèi)存爆掉了谨履。
這樣也表明出現(xiàn)數(shù)據(jù)傾斜了。
這種就不太好了熬丧,因?yàn)槟愕某绦蛉绻蝗ソ鉀Q數(shù)據(jù)傾斜的問題笋粟,壓根兒就跑不出來。
作業(yè)都跑不完析蝴,還談什么性能調(diào)優(yōu)這些東西害捕?!
6.1.3闷畸、定位數(shù)據(jù)傾斜出現(xiàn)的原因與出現(xiàn)問題的位置
根據(jù)log去定位
出現(xiàn)數(shù)據(jù)傾斜的原因尝盼,基本只可能是因?yàn)榘l(fā)生了shuffle操作,在shuffle的過程中佑菩,出現(xiàn)了數(shù)據(jù)傾斜的問題盾沫。因?yàn)槟硞€(gè)或者某些key對(duì)應(yīng)的數(shù)據(jù),遠(yuǎn)遠(yuǎn)的高于其他的key殿漠。
1赴精、你在自己的程序里面找找,哪些地方用了會(huì)產(chǎn)生shuffle的算子绞幌,groupByKey蕾哟、countByKey、reduceByKey莲蜘、join
2谭确、看log
log一般會(huì)報(bào)是在你的哪一行代碼,導(dǎo)致了OOM異常菇夸∏砀唬或者看log,看看是執(zhí)行到了第幾個(gè)stage庄新。spark代碼鞠眉,是怎么劃分成一個(gè)一個(gè)的stage的。哪一個(gè)stage生成的task特別慢择诈,就能夠自己用肉眼去對(duì)你的spark代碼進(jìn)行stage的劃分械蹋,就能夠通過stage定位到你的代碼,到底哪里發(fā)生了數(shù)據(jù)傾斜羞芍。
6.2哗戈、聚合源數(shù)據(jù)以及過濾導(dǎo)致傾斜的key
數(shù)據(jù)傾斜解決方案,第一個(gè)方案和第二個(gè)方案荷科,一起來講唯咬。這兩個(gè)方案是最直接纱注、最有效、最簡單的解決數(shù)據(jù)傾斜問題的方案胆胰。
第一個(gè)方案:聚合源數(shù)據(jù)狞贱。
第二個(gè)方案:過濾導(dǎo)致傾斜的key。
后面的五個(gè)方案蜀涨,尤其是最后4個(gè)方案瞎嬉,都是那種特別狂拽炫酷吊炸天的方案。但沒有第一二個(gè)方案簡單直接厚柳。如果碰到了數(shù)據(jù)傾斜的問題氧枣。上來就先考慮第一個(gè)和第二個(gè)方案看能不能做,如果能做的話别垮,后面的5個(gè)方案便监,都不用去搞了。
有效宰闰、簡單茬贵、直接才是最好的,徹底根除了數(shù)據(jù)傾斜的問題移袍。
6.2.1解藻、方案一:聚合源數(shù)據(jù)
一些聚合的操作,比如groupByKey葡盗、reduceByKey螟左,groupByKey說白了就是拿到每個(gè)key對(duì)應(yīng)的values。reduceByKey說白了就是對(duì)每個(gè)key對(duì)應(yīng)的values執(zhí)行一定的計(jì)算觅够。
這些操作胶背,比如groupByKey和reduceByKey,包括之前說的join喘先。都是在spark作業(yè)中執(zhí)行的钳吟。
spark作業(yè)的數(shù)據(jù)來源,通常是哪里呢窘拯?90%的情況下红且,數(shù)據(jù)來源都是hive表(hdfs,大數(shù)據(jù)分布式存儲(chǔ)系統(tǒng))涤姊。hdfs上存儲(chǔ)的大數(shù)據(jù)暇番。hive表中的數(shù)據(jù)通常是怎么出來的呢?有了spark以后思喊,hive比較適合做什么事情壁酬?hive就是適合做離線的,晚上凌晨跑的,ETL(extract transform load舆乔,數(shù)據(jù)的采集岳服、清洗、導(dǎo)入)蜕煌,hive sql派阱,去做這些事情,從而去形成一個(gè)完整的hive中的數(shù)據(jù)倉庫斜纪。說白了,數(shù)據(jù)倉庫文兑,就是一堆表盒刚。
spark作業(yè)的源表,hive表绿贞,通常情況下來說因块,也是通過某些hive etl生成的。hive etl可能是晚上凌晨在那兒跑籍铁。今天跑昨天的數(shù)據(jù)涡上。
數(shù)據(jù)傾斜,某個(gè)key對(duì)應(yīng)的80萬數(shù)據(jù)拒名,某些key對(duì)應(yīng)幾百條吩愧,某些key對(duì)應(yīng)幾十條。現(xiàn)在咱們直接在生成hive表的hive etl中對(duì)數(shù)據(jù)進(jìn)行聚合增显。比如按key來分組雁佳,將key對(duì)應(yīng)的所有的values全部用一種特殊的格式拼接到一個(gè)字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”同云。
對(duì)key進(jìn)行g(shù)roup糖权,在spark中,拿到key=sessionid炸站,values<Iterable>星澳。hive etl中,直接對(duì)key進(jìn)行了聚合旱易。那么也就意味著禁偎,每個(gè)key就只對(duì)應(yīng)一條數(shù)據(jù)。在spark中咒唆,就不需要再去執(zhí)行g(shù)roupByKey+map這種操作了届垫。直接對(duì)每個(gè)key對(duì)應(yīng)的values字符串進(jìn)行map操作,進(jìn)行你需要的操作即可全释。
spark中装处,可能對(duì)這個(gè)操作,就不需要執(zhí)行shffule操作了,也就根本不可能導(dǎo)致數(shù)據(jù)傾斜妄迁。
或者是對(duì)每個(gè)key在hive etl中進(jìn)行聚合寝蹈,對(duì)所有values聚合一下,不一定是拼接起來登淘,可能是直接進(jìn)行計(jì)算箫老。reduceByKey計(jì)算函數(shù)應(yīng)用在hive etl中,從而得到每個(gè)key的values黔州。
聚合源數(shù)據(jù)方案第二種做法是耍鬓,你可能沒有辦法對(duì)每個(gè)key聚合出來一條數(shù)據(jù)。那么也可以做一個(gè)妥協(xié)流妻,對(duì)每個(gè)key對(duì)應(yīng)的數(shù)據(jù)牲蜀,10萬條。有好幾個(gè)粒度绅这,比如10萬條里面包含了幾個(gè)城市涣达、幾天、幾個(gè)地區(qū)的數(shù)據(jù)证薇,現(xiàn)在放粗粒度度苔。直接就按照城市粒度,做一下聚合浑度,幾個(gè)城市寇窑,幾天、幾個(gè)地區(qū)粒度的數(shù)據(jù)俺泣,都給聚合起來疗认。比如說
city_id date area_id
select ... from ... group by city_id
盡量去聚合,減少每個(gè)key對(duì)應(yīng)的數(shù)量伏钠,也許聚合到比較粗的粒度之后横漏,原先有10萬數(shù)據(jù)量的key,現(xiàn)在只有1萬數(shù)據(jù)量熟掂。減輕數(shù)據(jù)傾斜的現(xiàn)象和問題缎浇。
6.2.2、方案二:過濾導(dǎo)致傾斜的key
如果你能夠接受某些數(shù)據(jù)在spark作業(yè)中直接就摒棄掉不使用赴肚。比如說素跺,總共有100萬個(gè)key。只有2個(gè)key是數(shù)據(jù)量達(dá)到10萬的誉券。其他所有的key指厌,對(duì)應(yīng)的數(shù)量都是幾十萬。
這個(gè)時(shí)候踊跟,你自己可以去取舍踩验,如果業(yè)務(wù)和需求可以理解和接受的話,在你從hive表查詢?cè)磾?shù)據(jù)的時(shí)候,直接在sql中用where條件箕憾,過濾掉某幾個(gè)key牡借。
那么這幾個(gè)原先有大量數(shù)據(jù),會(huì)導(dǎo)致數(shù)據(jù)傾斜的key袭异,被過濾掉之后钠龙,那么在你的spark作業(yè)中,自然就不會(huì)發(fā)生數(shù)據(jù)傾斜了御铃。
6.3碴里、提高shuffle操作reduce并行度
6.3.1、問題描述
第一個(gè)和第二個(gè)方案畅买,都不適合做并闲,然后再考慮這個(gè)方案。
將reduce task的數(shù)量變多谷羞,就可以讓每個(gè)reduce task分配到更少的數(shù)據(jù)量。這樣的話也許就可以緩解甚至是基本解決掉數(shù)據(jù)傾斜的問題溜徙。
6.3.2湃缎、提升shuffle reduce端并行度的操作方法
很簡單,主要給我們所有的shuffle算子蠢壹,比如groupByKey嗓违、countByKey、reduceByKey图贸。在調(diào)用的時(shí)候蹂季,傳入進(jìn)去一個(gè)參數(shù)。那個(gè)數(shù)字疏日,就代表了那個(gè)shuffle操作的reduce端的并行度偿洁。那么在進(jìn)行shuffle操作的時(shí)候,就會(huì)對(duì)應(yīng)著創(chuàng)建指定數(shù)量的reduce task沟优。
這樣的話涕滋,就可以讓每個(gè)reduce task分配到更少的數(shù)據(jù)∧痈螅基本可以緩解數(shù)據(jù)傾斜的問題宾肺。
比如說,原本某個(gè)task分配數(shù)據(jù)特別多侵俗,直接OOM锨用,內(nèi)存溢出了,程序沒法運(yùn)行隘谣,直接掛掉增拥。按照log,找到發(fā)生數(shù)據(jù)傾斜的shuffle操作,給它傳入一個(gè)并行度數(shù)字跪者,這樣的話棵帽,原先那個(gè)task分配到的數(shù)據(jù),肯定會(huì)變少渣玲。就至少可以避免OOM的情況逗概,程序至少是可以跑的。
6.3.2忘衍、提升shuffle reduce并行度的缺陷
治標(biāo)不治本的意思逾苫,因?yàn)樗鼪]有從根本上改變數(shù)據(jù)傾斜的本質(zhì)和問題。不像第一個(gè)和第二個(gè)方案(直接避免了數(shù)據(jù)傾斜的發(fā)生)枚钓。原理沒有改變铅搓,只是說,盡可能地去緩解和減輕shuffle reduce task的數(shù)據(jù)壓力搀捷,以及數(shù)據(jù)傾斜的問題星掰。
實(shí)際生產(chǎn)環(huán)境中的經(jīng)驗(yàn):
1、如果最理想的情況下嫩舟,提升并行度以后氢烘,減輕了數(shù)據(jù)傾斜的問題,或者甚至可以讓數(shù)據(jù)傾斜的現(xiàn)象忽略不計(jì)家厌,那么就最好播玖。就不用做其他的數(shù)據(jù)傾斜解決方案了。
2饭于、不太理想的情況下蜀踏,比如之前某個(gè)task運(yùn)行特別慢,要5個(gè)小時(shí)掰吕,現(xiàn)在稍微快了一點(diǎn)果覆,變成了4個(gè)小時(shí)〕肫埽或者是原先運(yùn)行到某個(gè)task随静,直接OOM,現(xiàn)在至少不會(huì)OOM了吗讶,但是那個(gè)task運(yùn)行特別慢燎猛,要5個(gè)小時(shí)才能跑完。
那么照皆,如果出現(xiàn)第二種情況的話重绷,各位,就立即放棄第三種方案膜毁,開始去嘗試和選擇后面的四種方案昭卓。
6.4愤钾、使用隨機(jī)key實(shí)現(xiàn)雙重聚合
6.4.1、使用場(chǎng)景
groupByKey候醒、reduceByKey比較適合使用這種方式能颁。join咱們通常不會(huì)這樣來做,后面會(huì)講三種針對(duì)不同的join造成的數(shù)據(jù)傾斜的問題的解決方案倒淫。
6.4.2伙菊、解決方案
第一輪聚合的時(shí)候,對(duì)key進(jìn)行打散敌土,將原先一樣的key镜硕,變成不一樣的key,相當(dāng)于是將每個(gè)key分為多組返干。
先針對(duì)多個(gè)組兴枯,進(jìn)行key的局部聚合。接著矩欠,再去除掉每個(gè)key的前綴财剖,然后對(duì)所有的key進(jìn)行全局的聚合。
對(duì)groupByKey癌淮、reduceByKey造成的數(shù)據(jù)傾斜峰伙,有比較好的效果。
如果說该默,之前的第一、第二策彤、第三種方案栓袖,都沒法解決數(shù)據(jù)傾斜的問題,那么就只能依靠這一種方式了店诗。
6.5裹刮、將reduce join轉(zhuǎn)換為map join
6.5.1、使用方式
普通的join庞瘸,那么肯定是要走shuffle捧弃。既然是走shuffle,那么普通的join就肯定是走的是reduce join擦囊。那怎么將reduce join 轉(zhuǎn)換為mapjoin呢违霞?先將所有相同的key,對(duì)應(yīng)的value匯聚到一個(gè)task中瞬场,然后再進(jìn)行join买鸽。
6.5.2、使用場(chǎng)景
這種方式適合在什么樣的情況下來使用贯被?
如果兩個(gè)RDD要進(jìn)行join眼五,其中一個(gè)RDD是比較小的妆艘。比如一個(gè)RDD是100萬數(shù)據(jù),一個(gè)RDD是1萬數(shù)據(jù)看幼。(一個(gè)RDD是1億數(shù)據(jù)批旺,一個(gè)RDD是100萬數(shù)據(jù))。
其中一個(gè)RDD必須是比較小的诵姜,broadcast出去那個(gè)小RDD的數(shù)據(jù)以后汽煮,就會(huì)在每個(gè)executor的block manager中都保存一份。要確保你的內(nèi)存足夠存放那個(gè)小RDD中的數(shù)據(jù)茅诱。
這種方式下逗物,根本不會(huì)發(fā)生shuffle操作,肯定也不會(huì)發(fā)生數(shù)據(jù)傾斜瑟俭。從根本上杜絕了join操作可能導(dǎo)致的數(shù)據(jù)傾斜的問題翎卓。
對(duì)于join中有數(shù)據(jù)傾斜的情況,大家盡量第一時(shí)間先考慮這種方式摆寄,效果非常好失暴。
不適合的情況
兩個(gè)RDD都比較大,那么這個(gè)時(shí)候微饥,你去將其中一個(gè)RDD做成broadcast逗扒,就很笨拙了。很可能導(dǎo)致內(nèi)存不足欠橘。最終導(dǎo)致內(nèi)存溢出矩肩,程序掛掉。
而且其中某些key(或者是某個(gè)key)肃续,還發(fā)生了數(shù)據(jù)傾斜黍檩。此時(shí)可以采用最后兩種方式。
對(duì)于join這種操作始锚,不光是考慮數(shù)據(jù)傾斜的問題刽酱。即使是沒有數(shù)據(jù)傾斜問題,也完全可以優(yōu)先考慮瞧捌,用我們講的這種高級(jí)的reduce join轉(zhuǎn)map join的技術(shù)棵里,不要用普通的join,去通過shuffle姐呐,進(jìn)行數(shù)據(jù)的join殿怜。完全可以通過簡單的map,使用map join的方式皮钠,犧牲一點(diǎn)內(nèi)存資源稳捆。在可行的情況下,優(yōu)先這么使用麦轰。
不走shuffle乔夯,直接走map砖织,是不是性能也會(huì)高很多?這是肯定的末荐。
6.6侧纯、sample采樣傾斜key單獨(dú)進(jìn)行join
6.6.1、方案實(shí)現(xiàn)思路
將發(fā)生數(shù)據(jù)傾斜的key甲脏,單獨(dú)拉出來眶熬,放到一個(gè)RDD中去。就用這個(gè)原本會(huì)傾斜的key RDD跟其他RDD單獨(dú)去join一下块请,這個(gè)時(shí)候key對(duì)應(yīng)的數(shù)據(jù)可能就會(huì)分散到多個(gè)task中去進(jìn)行join操作娜氏。
就不至于說是,這個(gè)key跟之前其他的key混合在一個(gè)RDD中時(shí)墩新,肯定是會(huì)導(dǎo)致一個(gè)key對(duì)應(yīng)的所有數(shù)據(jù)都到一個(gè)task中去贸弥,就會(huì)導(dǎo)致數(shù)據(jù)傾斜。
6.6.2海渊、使用場(chǎng)景
這種方案什么時(shí)候適合使用绵疲?
優(yōu)先對(duì)于join,肯定是希望能夠采用上一個(gè)方案臣疑,即reduce join轉(zhuǎn)換map join盔憨。兩個(gè)RDD數(shù)據(jù)都比較大,那么就不要那么搞了讯沈。
針對(duì)你的RDD的數(shù)據(jù)郁岩,你可以自己把它轉(zhuǎn)換成一個(gè)中間表,或者是直接用countByKey()的方式缺狠,你可以看一下這個(gè)RDD各個(gè)key對(duì)應(yīng)的數(shù)據(jù)量驯用。此時(shí)如果你發(fā)現(xiàn)整個(gè)RDD就一個(gè),或者少數(shù)幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別多儒老。盡量建議,比如就是一個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別多记餐。
此時(shí)可以采用這種方案驮樊,單拉出來那個(gè)最多的key,單獨(dú)進(jìn)行join片酝,盡可能地將key分散到各個(gè)task上去進(jìn)行join操作囚衔。
什么時(shí)候不適用呢?
如果一個(gè)RDD中雕沿,導(dǎo)致數(shù)據(jù)傾斜的key特別多练湿。那么此時(shí),最好還是不要這樣了审轮。還是使用我們最后一個(gè)方案肥哎,終極的join數(shù)據(jù)傾斜的解決方案辽俗。
就是說,咱們單拉出來了一個(gè)或者少數(shù)幾個(gè)可能會(huì)產(chǎn)生數(shù)據(jù)傾斜的key篡诽,然后還可以進(jìn)行更加優(yōu)化的一個(gè)操作崖飘。
對(duì)于那個(gè)key,從另外一個(gè)要join的表中杈女,也過濾出來一份數(shù)據(jù)朱浴,比如可能就只有一條數(shù)據(jù)。userid2infoRDD达椰,一個(gè)userid key翰蠢,就對(duì)應(yīng)一條數(shù)據(jù)和悦。
然后呢适贸,采取對(duì)那個(gè)只有一條數(shù)據(jù)的RDD穴肘,進(jìn)行flatMap操作击敌,打上100個(gè)隨機(jī)數(shù)坝冕,作為前綴场躯,返回100條數(shù)據(jù)剩蟀。
單獨(dú)拉出來的可能產(chǎn)生數(shù)據(jù)傾斜的RDD嚎尤,給每一條數(shù)據(jù)猖辫,都打上一個(gè)100以內(nèi)的隨機(jī)數(shù)酥泞,作為前綴。
再去進(jìn)行join啃憎,是不是性能就更好了芝囤。肯定可以將數(shù)據(jù)進(jìn)行打散辛萍,去進(jìn)行join悯姊。join完以后,可以執(zhí)行map操作贩毕,去將之前打上的隨機(jī)數(shù)給去掉悯许,然后再和另外一個(gè)普通RDD join以后的結(jié)果進(jìn)行union操作。
6.7辉阶、使用隨機(jī)數(shù)以及擴(kuò)容表進(jìn)行join
6.7.1先壕、使用場(chǎng)景及步驟
當(dāng)采用隨機(jī)數(shù)和擴(kuò)容表進(jìn)行join解決數(shù)據(jù)傾斜的時(shí)候,就代表著谆甜,你的之前的數(shù)據(jù)傾斜的解決方案垃僚,都沒法使用。
這個(gè)方案是沒辦法徹底解決數(shù)據(jù)傾斜的规辱,更多的谆棺,是一種對(duì)數(shù)據(jù)傾斜的緩解。
步驟:
1罕袋、選擇一個(gè)RDD改淑,要用flatMap碍岔,進(jìn)行擴(kuò)容,將每條數(shù)據(jù)溅固,映射為多條數(shù)據(jù)付秕,每個(gè)映射出來的數(shù)據(jù),都帶了一個(gè)n以內(nèi)的隨機(jī)數(shù)侍郭,通常來說會(huì)選擇10询吴。
2、將另外一個(gè)RDD亮元,做普通的map映射操作猛计,每條數(shù)據(jù)都打上一個(gè)10以內(nèi)的隨機(jī)數(shù)。
3爆捞、最后將兩個(gè)處理后的RDD進(jìn)行join操作奉瘤。
6.7.2、局限性
1煮甥、因?yàn)槟愕膬蓚€(gè)RDD都很大盗温,所以你沒有辦法去將某一個(gè)RDD擴(kuò)的特別大,一般咱們就是10倍成肘。
2卖局、如果就是10倍的話,那么數(shù)據(jù)傾斜問題的確是只能說是緩解和減輕双霍,不能說徹底解決砚偶。
sample采樣傾斜key并單獨(dú)進(jìn)行join
將key,從另外一個(gè)RDD中過濾出的數(shù)據(jù)洒闸,可能只有一條或者幾條染坯,此時(shí),咱們可以任意進(jìn)行擴(kuò)容丘逸,擴(kuò)成1000倍单鹿。
將從第一個(gè)RDD中拆分出來的那個(gè)傾斜key RDD,打上1000以內(nèi)的一個(gè)隨機(jī)數(shù)深纲。
這種情況下羞反,還可以配合上,提升shuffle reduce并行度囤萤,join(rdd, 1000)。通常情況下是趴,效果還是非常不錯(cuò)的涛舍。
打散成100份,甚至1000份唆途,2000份富雅,去進(jìn)行join掸驱,那么就肯定沒有數(shù)據(jù)傾斜的問題了吧。
附:實(shí)時(shí)計(jì)算程序性能調(diào)優(yōu)
1没佑、并行化數(shù)據(jù)接收:處理多個(gè)topic的數(shù)據(jù)時(shí)比較有效
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
2毕贼、spark.streaming.blockInterval:增加block數(shù)量,增加每個(gè)batch rdd的partition數(shù)量蛤奢,增加處理并行度
receiver從數(shù)據(jù)源源源不斷地獲取到數(shù)據(jù)鬼癣;首先是會(huì)按照block interval,將指定時(shí)間間隔的數(shù)據(jù)啤贩,收集為一個(gè)block待秃;默認(rèn)時(shí)間是200ms,官方推薦不要小于50ms痹屹;接著呢章郁,會(huì)將指定batch interval時(shí)間間隔內(nèi)的block,合并為一個(gè)batch志衍;創(chuàng)建為一個(gè)rdd暖庄,然后啟動(dòng)一個(gè)job,去處理這個(gè)batch rdd中的數(shù)據(jù)
batch rdd楼肪,它的partition數(shù)量是多少呢培廓?一個(gè)batch有多少個(gè)block,就有多少個(gè)partition淹辞;就意味著并行度是多少医舆;就意味著每個(gè)batch rdd有多少個(gè)task會(huì)并行計(jì)算和處理。
當(dāng)然是希望可以比默認(rèn)的task數(shù)量和并行度再多一些了象缀;可以手動(dòng)調(diào)節(jié)block interval蔬将;減少block interval;每個(gè)batch可以包含更多的block央星;有更多的partition霞怀;也就有更多的task并行處理每個(gè)batch rdd。
定死了莉给,初始的rdd過來毙石,直接就是固定的partition數(shù)量了
3、inputStream.repartition(<number of partitions>):重分區(qū)颓遏,增加每個(gè)batch rdd的partition數(shù)量
有些時(shí)候徐矩,希望對(duì)某些dstream中的rdd進(jìn)行定制化的分區(qū)
對(duì)dstream中的rdd進(jìn)行重分區(qū),去重分區(qū)成指定數(shù)量的分區(qū)叁幢,這樣也可以提高指定dstream的rdd的計(jì)算并行度
4滤灯、調(diào)節(jié)并行度
spark.default.parallelism
reduceByKey(numPartitions)
5、使用Kryo序列化機(jī)制:
spark streaming,也是有不少序列化的場(chǎng)景的
提高序列化task發(fā)送到executor上執(zhí)行的性能鳞骤,如果task很多的時(shí)候窒百,task序列化和反序列化的性能開銷也比較可觀
默認(rèn)輸入數(shù)據(jù)的存儲(chǔ)級(jí)別是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到數(shù)據(jù)豫尽,默認(rèn)就會(huì)進(jìn)行持久化操作篙梢;首先序列化數(shù)據(jù),存儲(chǔ)到內(nèi)存中美旧;如果內(nèi)存資源不夠大渤滞,那么就寫入磁盤;而且陈症,還會(huì)寫一份冗余副本到其他executor的block manager中蔼水,進(jìn)行數(shù)據(jù)冗余。
6录肯、batch interval:每個(gè)的處理時(shí)間必須小于batch interval
實(shí)際上你的spark streaming跑起來以后趴腋,其實(shí)都是可以在spark ui上觀察它的運(yùn)行情況的;可以看到batch的處理時(shí)間论咏;
如果發(fā)現(xiàn)batch的處理時(shí)間大于batch interval优炬,就必須調(diào)節(jié)batch interval
盡量不要讓batch處理時(shí)間大于batch interval
比如你的batch每隔5秒生成一次;你的batch處理時(shí)間要達(dá)到6秒厅贪;就會(huì)出現(xiàn)蠢护,batch在你的內(nèi)存中日積月累,一直囤積著养涮,沒法及時(shí)計(jì)算掉葵硕,釋放內(nèi)存空間;而且對(duì)內(nèi)存空間的占用越來越大贯吓,那么此時(shí)會(huì)導(dǎo)致內(nèi)存空間快速消耗
如果發(fā)現(xiàn)batch處理時(shí)間比batch interval要大懈凹,就盡量將batch interval調(diào)節(jié)大一些