https://blog.csdn.net/u013332124/article/details/90677676
一言缤、Spark 目前現(xiàn)有的一些問題
問題一:Shuffle partition數(shù)量沒有達(dá)到最優(yōu)
在Spark SQL中,我們可以通過spark.sql.shuffle.partition來設(shè)置shuffle后的partition數(shù)量,默認(rèn)值是200债蜜。shuffle partition的數(shù)量等同于下一Stage的Reduce Task的數(shù)量警儒。因?yàn)閟huffle的原因,這些Task處理的數(shù)據(jù)量殘差不齊抱慌,大的可能很大歹鱼,小的可能很小泣栈。而Stage的完成又取決于最慢的那個(gè)Task,其他的Task可能早早完成醉冤,在那等待秩霍。如果沒有開啟動(dòng)態(tài)資源,這勢必會造成集群資源上的浪費(fèi)蚁阳。即使開啟了動(dòng)態(tài)資源,頻繁的kill Executor和申請新的Executor一樣可能會帶來性能損耗鸽照。
雖然說我們可以認(rèn)為設(shè)置shuffle partition數(shù)量螺捐,但是我們還是無法給出一個(gè)對所有任務(wù)來說都是最優(yōu)的值,因?yàn)槊總€(gè)任務(wù)的數(shù)據(jù)和shuffle情況都不一樣。
- 如果這個(gè)值調(diào)整的太大可能會導(dǎo)致大量的Task定血,大量的Task就意味著Task調(diào)度開銷以及資源調(diào)度開銷(如果開啟了動(dòng)態(tài)資源)赔癌。另外,如果這個(gè)Stage最后要輸出澜沟,也會造成大量的小文件存在hdfs上灾票。大量的小文件就意味著集群的namenode需要承受更大的壓力
- 如果這個(gè)值調(diào)整的太小,就會導(dǎo)致每個(gè)Task處理的數(shù)據(jù)量變大茫虽,可能會導(dǎo)致OOM的問題刊苍。就算不發(fā)生OOM,Task的處理性能我們也不能接受
因此濒析,現(xiàn)階段Shuffle partition數(shù)量只能針對不同的任務(wù)不斷的去優(yōu)化調(diào)整正什,才能得到一個(gè)針對這個(gè)任務(wù)的最優(yōu)值。但這個(gè)在實(shí)際的開發(fā)中是很難做到的(除非性能太差号杏,否則大多數(shù)的spark job開發(fā)人員并不會主動(dòng)去做這種優(yōu)化)婴氮。
所有,有沒有一種辦法盾致,可以讓我們在執(zhí)行過程中動(dòng)態(tài)的設(shè)置shuffle partition數(shù)量主经,讓其達(dá)到一個(gè)近似最優(yōu)值呢?
問題二:現(xiàn)有執(zhí)行計(jì)劃的一些不足
我們都知道庭惜,shuffle是一個(gè)很耗性能的操作罩驻。通過避免不必要的shuffle也能帶上一定的性能提升。最常見的做法就是在大小表做Join時(shí)蜈块,將小表提前加載進(jìn)內(nèi)存鉴腻,之后直接使用內(nèi)存的數(shù)據(jù)進(jìn)行join,這樣就少了shuffle帶來的性能損耗了百揭。這種做法就是MapJoin爽哎,在Spark中,也叫做BroadcastHashJoin器一。原理是將小表數(shù)據(jù)以broadcast變量加載到內(nèi)存课锌,然后廣播到各個(gè)Executor上,直接在map中做join祈秕。在Spark中渺贤,可以通過spark.sql.autoBroadcastJoinThreshold來設(shè)置啟動(dòng)BroadcastHashJoin的閥值,默認(rèn)是10MB请毛。
SparkSQL在執(zhí)行過中志鞍,在經(jīng)過邏輯優(yōu)化時(shí)艺蝴,會估算是否要開啟BroadcastHashJoin订晌。但是這種優(yōu)化對于復(fù)雜的SQL效果并不明顯董饰,因?yàn)閺?fù)雜SQL會產(chǎn)生大量的Stage,spark優(yōu)化程序很難準(zhǔn)確的估算各個(gè)Stage的數(shù)據(jù)量來判斷是否要開啟BroadcastHashJoin懈费。下面是網(wǎng)上的一張圖:
圖中左邊的Stage的數(shù)據(jù)量只有46.9KB渐溶,完全可以優(yōu)化成BroadcastHashJoin畅厢。然而Spark使用的還是常規(guī)的SortMergeJoin(也就是Shuffle)业崖。
這個(gè)問題主要還是在邏輯優(yōu)化時(shí)無法準(zhǔn)確的估算數(shù)據(jù)量導(dǎo)致的,那么我們是否可以在執(zhí)行過程中根據(jù)數(shù)據(jù)量動(dòng)態(tài)的去調(diào)整執(zhí)行計(jì)劃來解決這個(gè)問題呢呜师?
問題三:數(shù)據(jù)傾斜的問題
不管是mapreduce還是spark娶桦,都可能存在數(shù)據(jù)傾斜問題。數(shù)據(jù)傾斜是某一些partition的數(shù)據(jù)量遠(yuǎn)大于其他的partition汁汗,數(shù)據(jù)量大的那個(gè)partition處理速度就會拖慢整個(gè)任務(wù)的處理速度(很可能所有的task都處理完了衷畦,只剩下一個(gè)task還在處理)。對于數(shù)據(jù)傾斜問題碰酝,我們也有多種解決辦法霎匈。比如:
- 如果partition數(shù)據(jù)從外界獲取,就保證外界輸入的數(shù)據(jù)是可以Split的送爸,并保證各個(gè)Split后的塊是均衡的铛嘱。比如保證Kafka的各個(gè)partition數(shù)據(jù)均衡,讀取一個(gè)目錄時(shí)袭厂,保證下面的文件大小是均衡的等等
- 如果是shuffle partition墨吓,可以通過調(diào)整shuffle partition數(shù)量來避免某個(gè)shuffle partition數(shù)據(jù)量特別大
- 如果存在一個(gè)Key的數(shù)據(jù)量非常大,調(diào)整shuffle partition數(shù)量也沒辦法很好的規(guī)避數(shù)據(jù)傾斜問題纹磺。就可以對Key加一些前綴或者后綴來分散數(shù)據(jù)
- 從shuffle的角度出發(fā)帖烘,如果兩個(gè)join的表中有一個(gè)表是小表,可以優(yōu)化成BroadcastHashJoin來消除shuffle從而消除shuffle引起的數(shù)據(jù)傾斜問題
但是上面這些解決方案都是針對單一任務(wù)進(jìn)行調(diào)優(yōu)橄杨,沒有一個(gè)解決方案可以有效的解決所有的數(shù)據(jù)傾斜問題秘症。
對于這種問題,我們是不是可以在執(zhí)行過程中式矫,通過判斷shuffle write后各個(gè)partition的數(shù)據(jù)量乡摹,動(dòng)態(tài)的調(diào)整后面的執(zhí)行計(jì)劃。比如對于存在數(shù)據(jù)傾斜的分區(qū)采转,我們是否可以開啟多個(gè)task處理聪廉,之后再將處理的結(jié)果做union?
二故慈、Spark Adaptive Execution提出的相關(guān)解決方案
1板熊、自動(dòng)設(shè)置Shuffle Partition數(shù)量
Shuffle的過程是先通過Shuffle Write將各個(gè)分區(qū)的數(shù)據(jù)寫到磁盤,之后另外一個(gè)Stage通過Shuffle Read來讀取這些數(shù)據(jù)察绷。那么我們其實(shí)可以在開啟下一個(gè)Stage前先計(jì)算好Shuffle Write產(chǎn)生的各個(gè)分區(qū)的數(shù)據(jù)量是多少干签,之后對于那些比較小的分區(qū),將它們當(dāng)成一個(gè)分區(qū)來處理拆撼。
一般情況下筒严,一個(gè)分區(qū)是由一個(gè)task來處理的丹泉。經(jīng)過優(yōu)化情萤,我們可以安排一個(gè)task處理多個(gè)分區(qū)鸭蛙,這樣,我們就可以保證各個(gè)分區(qū)相對均衡筋岛,不會存在大量數(shù)據(jù)量很小的partitin了娶视。
比如Shuffle Write外我們檢測到有5個(gè)partition,數(shù)據(jù)量大小分別是64M睁宰、1M肪获、2M、20M柒傻、4M孝赫。如果沒有進(jìn)行優(yōu)化,會開啟5個(gè)task來處理红符,要等64M的那個(gè)partiiton處理完后整個(gè)Stage才算完成青柄。經(jīng)過優(yōu)化后,我們可以1M预侯、2M致开、20M、4M這些分區(qū)都交給一個(gè)task來處理萎馅。這樣双戳,總共就只有兩個(gè)task,但是整個(gè)stage的處理速度并不會比之前的慢糜芳,還少了3個(gè)task所需要的資源損耗飒货。
一些關(guān)鍵點(diǎn):
- 目前只會合并連在一起的那些partition,主要是為了保證順序讀峭竣,提高磁盤IO性能
- 可以通過配置spark.sql.adaptive.shuffle.targetPostShuffleInputSize來設(shè)置合并的閥值塘辅,默認(rèn)為64M
- 只會合并小的分區(qū),太大的分區(qū)并不會進(jìn)行拆分
開啟方式:
spark.sql.adaptive.enabled=true:啟動(dòng)Adaptive Execution邪驮。
通過spark.sql.adaptive.shuffle.targetPostShuffleInputSize可以設(shè)置shuffle后每個(gè)partition的目標(biāo)數(shù)據(jù)量莫辨。一個(gè)Task加起來處理的所有分區(qū)的數(shù)據(jù)量不會超過個(gè)閥值。
2毅访、執(zhí)行過程中動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃
還是在Shuffle Write之后沮榜,我們可以觀察兩個(gè)Stage輸出的數(shù)據(jù)量。如果有一個(gè)Stage數(shù)據(jù)量明顯比較小喻粹,可以轉(zhuǎn)換成BroadcastHashJoin蟆融,那么我們就可以動(dòng)態(tài)的去調(diào)整執(zhí)行計(jì)劃。
雖然shuffle write的數(shù)據(jù)已經(jīng)輸出到磁盤上守呜,這時(shí)候我們?nèi)绻_啟了動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃型酥,shuffle read改成BroadcastHashJoin。假設(shè)表A(1M)和表B(4G)做join時(shí)郁竟,并已經(jīng)進(jìn)行了Shuffle Write棚亩,轉(zhuǎn)換成BroadcastHashJoin的過程如下:
- 將表A的數(shù)據(jù)加載成broadcast
- 假設(shè)上游表B有5個(gè)partition虏杰,那么此時(shí)下游Stage也創(chuàng)建對應(yīng)5個(gè)reduce task纺阔,每個(gè)reduce task都讀取對應(yīng)上游partition的shuffle write生成的文件,然后在讀取過程中從內(nèi)存讀取表A的數(shù)據(jù)進(jìn)行join
因?yàn)橄掠蔚腞educe Task可以直接發(fā)到表B Shuffle Write文件所在的Executor上质况,此時(shí)讀取數(shù)據(jù)是直接讀取磁盤文件了婆翔,避開了網(wǎng)絡(luò)IO的開銷啃奴,性能會比原先的shuffle read快很多。
開啟方式:
- spark.sql.adaptive.enabled和
spark.sql.adaptive.join.enabled
都設(shè)置為true依溯。
-
spark.sql.adaptiveBroadcastJoinThreshold
設(shè)置了 SortMergeJoin 轉(zhuǎn) BroadcastJoin 的閾值黎炉。如果不設(shè)置該參數(shù)醋拧,該閾值與spark.sql.autoBroadcastJoinThreshold
的值相等
3、自動(dòng)處理數(shù)據(jù)傾斜
還是在Shuffle Write之后解決問題丹壕。一樣是獲取到shuffle Write后各個(gè)partition的數(shù)據(jù)量,根據(jù)一定算法算出哪些partition數(shù)據(jù)超標(biāo)缭乘,出現(xiàn)傾斜琉用。
對于那些存在大量小數(shù)據(jù)的partiiton策幼,我們可以通過合并來解決問題(一個(gè)task處理多個(gè)partition的數(shù)據(jù))奴紧。那對于這種數(shù)據(jù)量特別大的partition绰寞,我們完全可以反其道而行,用多個(gè)task來處理這個(gè)partition。
開啟自動(dòng)處理數(shù)據(jù)傾斜后件缸,在執(zhí)行過程中叔遂,spark會自動(dòng)找出那些出現(xiàn)傾斜的partiiton已艰,然后用多個(gè)task來處理這個(gè)partition哩掺,之后再將這些task的處理結(jié)果進(jìn)行union。
比如表A和表B做join盒件,表A在shuffle write完舱禽,partition 0有4G的數(shù)據(jù)誊稚,其他partition都只有1翔始,200M。這時(shí)候我們可以開啟多個(gè)task里伯,每個(gè)task讀取幾個(gè)上游mapper生成的partition 0的數(shù)據(jù)城瞎,然后和表B的partition 0做join俏脊,最后這個(gè)幾個(gè)task再進(jìn)行union全谤。這樣雖然表B的partition 0要被多次讀取,但是并行處理帶來的收益還是要高過這些消耗的认然。
開啟方式:
-
spark.sql.adaptive.skewedJoin.enabled
設(shè)置為 true -
spark.sql.adaptive.skewedPartitionMaxSplits
控制處理一個(gè)傾斜 Partition 的 Task 個(gè)數(shù)上限补憾,默認(rèn)值為 5 - spark.sql.adaptive.skewedPartitionRowCountThreshold ,partition的條數(shù)如果少于這個(gè)值卷员,數(shù)據(jù)量再大也不會被當(dāng)成是傾斜的partition。默認(rèn)是1000W
- spark.sql.adaptive.skewedPartitionSizeThreshold毕骡,被認(rèn)定為是傾斜partiiton的大小下限削饵。默認(rèn)是64M
- spark.sql.adaptive.skewedPartitionFactor,傾斜因子未巫。如果一個(gè) Partition 的大小大于
spark.sql.adaptive.skewedPartitionSizeThreshold
的同時(shí)大于各 Partition 大小中位數(shù)與該因子的乘積窿撬,或者行數(shù)大于spark.sql.adaptive.skewedPartitionRowCountThreshold
的同時(shí)大于各 Partition 行數(shù)中位數(shù)與該因子的乘積叙凡,則它會被視為傾斜的 Partition
三劈伴、總結(jié)
Adaptive Execution是英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)和百度大數(shù)據(jù)基礎(chǔ)架構(gòu)部工程師在Spark 社區(qū)版本的基礎(chǔ)上,改進(jìn)并實(shí)現(xiàn)的自適應(yīng)執(zhí)行引擎。目前代碼在https://github.com/Intel-bigdata/spark-adaptive里新啼,并沒有全部merge到spark中。相關(guān)的issue也還是In proccess狀態(tài):https://issues.apache.org/jira/browse/SPARK-23128叨吮。
目前就看到有博客說2.3.1版本中已經(jīng)有了"自動(dòng)設(shè)置Shuffle Partition數(shù)量"的特性辆布,我在spark-2.2之后的代碼中也可以搜到spark.sql.adaptive.enabled和spark.sql.adaptiveBroadcastJoinThreshold配置。截止到2.4.3版本涵叮,另外兩個(gè)特性應(yīng)該還沒真正發(fā)布惭蹂,可以期待一下。
參考資料
http://www.jasongj.com/spark/adaptive_execution/