Spark Adaptive Execution調(diào)研

https://blog.csdn.net/u013332124/article/details/90677676

一言缤、Spark 目前現(xiàn)有的一些問題

問題一:Shuffle partition數(shù)量沒有達(dá)到最優(yōu)

在Spark SQL中,我們可以通過spark.sql.shuffle.partition來設(shè)置shuffle后的partition數(shù)量,默認(rèn)值是200债蜜。shuffle partition的數(shù)量等同于下一Stage的Reduce Task的數(shù)量警儒。因?yàn)閟huffle的原因,這些Task處理的數(shù)據(jù)量殘差不齊抱慌,大的可能很大歹鱼,小的可能很小泣栈。而Stage的完成又取決于最慢的那個(gè)Task,其他的Task可能早早完成醉冤,在那等待秩霍。如果沒有開啟動(dòng)態(tài)資源,這勢必會造成集群資源上的浪費(fèi)蚁阳。即使開啟了動(dòng)態(tài)資源,頻繁的kill Executor和申請新的Executor一樣可能會帶來性能損耗鸽照。

雖然說我們可以認(rèn)為設(shè)置shuffle partition數(shù)量螺捐,但是我們還是無法給出一個(gè)對所有任務(wù)來說都是最優(yōu)的值,因?yàn)槊總€(gè)任務(wù)的數(shù)據(jù)和shuffle情況都不一樣。

  • 如果這個(gè)值調(diào)整的太大可能會導(dǎo)致大量的Task定血,大量的Task就意味著Task調(diào)度開銷以及資源調(diào)度開銷(如果開啟了動(dòng)態(tài)資源)赔癌。另外,如果這個(gè)Stage最后要輸出澜沟,也會造成大量的小文件存在hdfs上灾票。大量的小文件就意味著集群的namenode需要承受更大的壓力
  • 如果這個(gè)值調(diào)整的太小,就會導(dǎo)致每個(gè)Task處理的數(shù)據(jù)量變大茫虽,可能會導(dǎo)致OOM的問題刊苍。就算不發(fā)生OOM,Task的處理性能我們也不能接受

因此濒析,現(xiàn)階段Shuffle partition數(shù)量只能針對不同的任務(wù)不斷的去優(yōu)化調(diào)整正什,才能得到一個(gè)針對這個(gè)任務(wù)的最優(yōu)值。但這個(gè)在實(shí)際的開發(fā)中是很難做到的(除非性能太差号杏,否則大多數(shù)的spark job開發(fā)人員并不會主動(dòng)去做這種優(yōu)化)婴氮。

所有,有沒有一種辦法盾致,可以讓我們在執(zhí)行過程中動(dòng)態(tài)的設(shè)置shuffle partition數(shù)量主经,讓其達(dá)到一個(gè)近似最優(yōu)值呢?

問題二:現(xiàn)有執(zhí)行計(jì)劃的一些不足

我們都知道庭惜,shuffle是一個(gè)很耗性能的操作罩驻。通過避免不必要的shuffle也能帶上一定的性能提升。最常見的做法就是在大小表做Join時(shí)蜈块,將小表提前加載進(jìn)內(nèi)存鉴腻,之后直接使用內(nèi)存的數(shù)據(jù)進(jìn)行join,這樣就少了shuffle帶來的性能損耗了百揭。這種做法就是MapJoin爽哎,在Spark中,也叫做BroadcastHashJoin器一。原理是將小表數(shù)據(jù)以broadcast變量加載到內(nèi)存课锌,然后廣播到各個(gè)Executor上,直接在map中做join祈秕。在Spark中渺贤,可以通過spark.sql.autoBroadcastJoinThreshold來設(shè)置啟動(dòng)BroadcastHashJoin的閥值,默認(rèn)是10MB请毛。

SparkSQL在執(zhí)行過中志鞍,在經(jīng)過邏輯優(yōu)化時(shí)艺蝴,會估算是否要開啟BroadcastHashJoin订晌。但是這種優(yōu)化對于復(fù)雜的SQL效果并不明顯董饰,因?yàn)閺?fù)雜SQL會產(chǎn)生大量的Stage,spark優(yōu)化程序很難準(zhǔn)確的估算各個(gè)Stage的數(shù)據(jù)量來判斷是否要開啟BroadcastHashJoin懈费。下面是網(wǎng)上的一張圖:

在這里插入圖片描述

圖中左邊的Stage的數(shù)據(jù)量只有46.9KB渐溶,完全可以優(yōu)化成BroadcastHashJoin畅厢。然而Spark使用的還是常規(guī)的SortMergeJoin(也就是Shuffle)业崖。

這個(gè)問題主要還是在邏輯優(yōu)化時(shí)無法準(zhǔn)確的估算數(shù)據(jù)量導(dǎo)致的,那么我們是否可以在執(zhí)行過程中根據(jù)數(shù)據(jù)量動(dòng)態(tài)的去調(diào)整執(zhí)行計(jì)劃來解決這個(gè)問題呢呜师?

問題三:數(shù)據(jù)傾斜的問題

不管是mapreduce還是spark娶桦,都可能存在數(shù)據(jù)傾斜問題。數(shù)據(jù)傾斜是某一些partition的數(shù)據(jù)量遠(yuǎn)大于其他的partition汁汗,數(shù)據(jù)量大的那個(gè)partition處理速度就會拖慢整個(gè)任務(wù)的處理速度(很可能所有的task都處理完了衷畦,只剩下一個(gè)task還在處理)。對于數(shù)據(jù)傾斜問題碰酝,我們也有多種解決辦法霎匈。比如:

  1. 如果partition數(shù)據(jù)從外界獲取,就保證外界輸入的數(shù)據(jù)是可以Split的送爸,并保證各個(gè)Split后的塊是均衡的铛嘱。比如保證Kafka的各個(gè)partition數(shù)據(jù)均衡,讀取一個(gè)目錄時(shí)袭厂,保證下面的文件大小是均衡的等等
  2. 如果是shuffle partition墨吓,可以通過調(diào)整shuffle partition數(shù)量來避免某個(gè)shuffle partition數(shù)據(jù)量特別大
  3. 如果存在一個(gè)Key的數(shù)據(jù)量非常大,調(diào)整shuffle partition數(shù)量也沒辦法很好的規(guī)避數(shù)據(jù)傾斜問題纹磺。就可以對Key加一些前綴或者后綴來分散數(shù)據(jù)
  4. 從shuffle的角度出發(fā)帖烘,如果兩個(gè)join的表中有一個(gè)表是小表,可以優(yōu)化成BroadcastHashJoin來消除shuffle從而消除shuffle引起的數(shù)據(jù)傾斜問題

但是上面這些解決方案都是針對單一任務(wù)進(jìn)行調(diào)優(yōu)橄杨,沒有一個(gè)解決方案可以有效的解決所有的數(shù)據(jù)傾斜問題秘症。

對于這種問題,我們是不是可以在執(zhí)行過程中式矫,通過判斷shuffle write后各個(gè)partition的數(shù)據(jù)量乡摹,動(dòng)態(tài)的調(diào)整后面的執(zhí)行計(jì)劃。比如對于存在數(shù)據(jù)傾斜的分區(qū)采转,我們是否可以開啟多個(gè)task處理聪廉,之后再將處理的結(jié)果做union?

二故慈、Spark Adaptive Execution提出的相關(guān)解決方案

1板熊、自動(dòng)設(shè)置Shuffle Partition數(shù)量

Shuffle的過程是先通過Shuffle Write將各個(gè)分區(qū)的數(shù)據(jù)寫到磁盤,之后另外一個(gè)Stage通過Shuffle Read來讀取這些數(shù)據(jù)察绷。那么我們其實(shí)可以在開啟下一個(gè)Stage前先計(jì)算好Shuffle Write產(chǎn)生的各個(gè)分區(qū)的數(shù)據(jù)量是多少干签,之后對于那些比較小的分區(qū),將它們當(dāng)成一個(gè)分區(qū)來處理拆撼。

一般情況下筒严,一個(gè)分區(qū)是由一個(gè)task來處理的丹泉。經(jīng)過優(yōu)化情萤,我們可以安排一個(gè)task處理多個(gè)分區(qū)鸭蛙,這樣,我們就可以保證各個(gè)分區(qū)相對均衡筋岛,不會存在大量數(shù)據(jù)量很小的partitin了娶视。

比如Shuffle Write外我們檢測到有5個(gè)partition,數(shù)據(jù)量大小分別是64M睁宰、1M肪获、2M、20M柒傻、4M孝赫。如果沒有進(jìn)行優(yōu)化,會開啟5個(gè)task來處理红符,要等64M的那個(gè)partiiton處理完后整個(gè)Stage才算完成青柄。經(jīng)過優(yōu)化后,我們可以1M预侯、2M致开、20M、4M這些分區(qū)都交給一個(gè)task來處理萎馅。這樣双戳,總共就只有兩個(gè)task,但是整個(gè)stage的處理速度并不會比之前的慢糜芳,還少了3個(gè)task所需要的資源損耗飒货。

一些關(guān)鍵點(diǎn):

  • 目前只會合并連在一起的那些partition,主要是為了保證順序讀峭竣,提高磁盤IO性能
  • 可以通過配置spark.sql.adaptive.shuffle.targetPostShuffleInputSize來設(shè)置合并的閥值塘辅,默認(rèn)為64M
  • 只會合并小的分區(qū),太大的分區(qū)并不會進(jìn)行拆分

開啟方式:

spark.sql.adaptive.enabled=true:啟動(dòng)Adaptive Execution邪驮。

通過spark.sql.adaptive.shuffle.targetPostShuffleInputSize可以設(shè)置shuffle后每個(gè)partition的目標(biāo)數(shù)據(jù)量莫辨。一個(gè)Task加起來處理的所有分區(qū)的數(shù)據(jù)量不會超過個(gè)閥值。

2毅访、執(zhí)行過程中動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃

還是在Shuffle Write之后沮榜,我們可以觀察兩個(gè)Stage輸出的數(shù)據(jù)量。如果有一個(gè)Stage數(shù)據(jù)量明顯比較小喻粹,可以轉(zhuǎn)換成BroadcastHashJoin蟆融,那么我們就可以動(dòng)態(tài)的去調(diào)整執(zhí)行計(jì)劃。

雖然shuffle write的數(shù)據(jù)已經(jīng)輸出到磁盤上守呜,這時(shí)候我們?nèi)绻_啟了動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃型酥,shuffle read改成BroadcastHashJoin。假設(shè)表A(1M)和表B(4G)做join時(shí)郁竟,并已經(jīng)進(jìn)行了Shuffle Write棚亩,轉(zhuǎn)換成BroadcastHashJoin的過程如下:

  • 將表A的數(shù)據(jù)加載成broadcast
  • 假設(shè)上游表B有5個(gè)partition虏杰,那么此時(shí)下游Stage也創(chuàng)建對應(yīng)5個(gè)reduce task纺阔,每個(gè)reduce task都讀取對應(yīng)上游partition的shuffle write生成的文件,然后在讀取過程中從內(nèi)存讀取表A的數(shù)據(jù)進(jìn)行join

因?yàn)橄掠蔚腞educe Task可以直接發(fā)到表B Shuffle Write文件所在的Executor上质况,此時(shí)讀取數(shù)據(jù)是直接讀取磁盤文件了婆翔,避開了網(wǎng)絡(luò)IO的開銷啃奴,性能會比原先的shuffle read快很多。

開啟方式:

  • spark.sql.adaptive.enabled和spark.sql.adaptive.join.enabled 都設(shè)置為 true依溯。
  • spark.sql.adaptiveBroadcastJoinThreshold 設(shè)置了 SortMergeJoin 轉(zhuǎn) BroadcastJoin 的閾值黎炉。如果不設(shè)置該參數(shù)醋拧,該閾值與 spark.sql.autoBroadcastJoinThreshold 的值相等

3、自動(dòng)處理數(shù)據(jù)傾斜

還是在Shuffle Write之后解決問題丹壕。一樣是獲取到shuffle Write后各個(gè)partition的數(shù)據(jù)量,根據(jù)一定算法算出哪些partition數(shù)據(jù)超標(biāo)缭乘,出現(xiàn)傾斜琉用。

對于那些存在大量小數(shù)據(jù)的partiiton策幼,我們可以通過合并來解決問題(一個(gè)task處理多個(gè)partition的數(shù)據(jù))奴紧。那對于這種數(shù)據(jù)量特別大的partition绰寞,我們完全可以反其道而行,用多個(gè)task來處理這個(gè)partition。

開啟自動(dòng)處理數(shù)據(jù)傾斜后件缸,在執(zhí)行過程中叔遂,spark會自動(dòng)找出那些出現(xiàn)傾斜的partiiton已艰,然后用多個(gè)task來處理這個(gè)partition哩掺,之后再將這些task的處理結(jié)果進(jìn)行union。

比如表A和表B做join盒件,表A在shuffle write完舱禽,partition 0有4G的數(shù)據(jù)誊稚,其他partition都只有1翔始,200M。這時(shí)候我們可以開啟多個(gè)task里伯,每個(gè)task讀取幾個(gè)上游mapper生成的partition 0的數(shù)據(jù)城瞎,然后和表B的partition 0做join俏脊,最后這個(gè)幾個(gè)task再進(jìn)行union全谤。這樣雖然表B的partition 0要被多次讀取,但是并行處理帶來的收益還是要高過這些消耗的认然。

開啟方式:

  • spark.sql.adaptive.skewedJoin.enabled 設(shè)置為 true
  • spark.sql.adaptive.skewedPartitionMaxSplits 控制處理一個(gè)傾斜 Partition 的 Task 個(gè)數(shù)上限补憾,默認(rèn)值為 5
  • spark.sql.adaptive.skewedPartitionRowCountThreshold ,partition的條數(shù)如果少于這個(gè)值卷员,數(shù)據(jù)量再大也不會被當(dāng)成是傾斜的partition。默認(rèn)是1000W
  • spark.sql.adaptive.skewedPartitionSizeThreshold毕骡,被認(rèn)定為是傾斜partiiton的大小下限削饵。默認(rèn)是64M
  • spark.sql.adaptive.skewedPartitionFactor,傾斜因子未巫。如果一個(gè) Partition 的大小大于 spark.sql.adaptive.skewedPartitionSizeThreshold 的同時(shí)大于各 Partition 大小中位數(shù)與該因子的乘積窿撬,或者行數(shù)大于 spark.sql.adaptive.skewedPartitionRowCountThreshold 的同時(shí)大于各 Partition 行數(shù)中位數(shù)與該因子的乘積叙凡,則它會被視為傾斜的 Partition

三劈伴、總結(jié)

Adaptive Execution是英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)和百度大數(shù)據(jù)基礎(chǔ)架構(gòu)部工程師在Spark 社區(qū)版本的基礎(chǔ)上,改進(jìn)并實(shí)現(xiàn)的自適應(yīng)執(zhí)行引擎。目前代碼在https://github.com/Intel-bigdata/spark-adaptive里新啼,并沒有全部merge到spark中。相關(guān)的issue也還是In proccess狀態(tài):https://issues.apache.org/jira/browse/SPARK-23128叨吮。

目前就看到有博客說2.3.1版本中已經(jīng)有了"自動(dòng)設(shè)置Shuffle Partition數(shù)量"的特性辆布,我在spark-2.2之后的代碼中也可以搜到spark.sql.adaptive.enabled和spark.sql.adaptiveBroadcastJoinThreshold配置。截止到2.4.3版本涵叮,另外兩個(gè)特性應(yīng)該還沒真正發(fā)布惭蹂,可以期待一下。

參考資料

http://www.jasongj.com/spark/adaptive_execution/

https://github.com/Intel-bigdata/spark-adaptive

Spark SQL在100TB上的自適應(yīng)執(zhí)行實(shí)踐

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末割粮,一起剝皮案震驚了整個(gè)濱河市盾碗,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌舀瓢,老刑警劉巖廷雅,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異,居然都是意外死亡航缀,警方通過查閱死者的電腦和手機(jī)商架,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來芥玉,“玉大人蛇摸,你說我怎么就攤上這事〔忧桑” “怎么了赶袄?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長抠藕。 經(jīng)常有香客問我饿肺,道長,這世上最難降的妖魔是什么幢痘? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任唬格,我火速辦了婚禮,結(jié)果婚禮上颜说,老公的妹妹穿的比我還像新娘。我一直安慰自己汰聋,他們只是感情好门粪,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著烹困,像睡著了一般玄妈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上髓梅,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天拟蜻,我揣著相機(jī)與錄音,去河邊找鬼枯饿。 笑死酝锅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的奢方。 我是一名探鬼主播搔扁,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼蟋字!你這毒婦竟也來了稿蹲?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤鹊奖,失蹤者是張志新(化名)和其女友劉穎苛聘,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡设哗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年唱捣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片熬拒。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡爷光,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出澎粟,到底是詐尸還是另有隱情蛀序,我是刑警寧澤,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布活烙,位于F島的核電站徐裸,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏啸盏。R本人自食惡果不足惜重贺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望回懦。 院中可真熱鬧气笙,春花似錦、人聲如沸怯晕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽舟茶。三九已至谭期,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間吧凉,已是汗流浹背隧出。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留阀捅,地道東北人胀瞪。 一個(gè)月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓,卻偏偏與公主長得像也搓,于是被迫代替她去往敵國和親赏廓。 傳聞我的和親對象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353

推薦閱讀更多精彩內(nèi)容

  • Apache Spark 是專為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎傍妒。Spark是UC Berkeley AM...
    大佛愛讀書閱讀 2,827評論 0 20
  • 1 數(shù)據(jù)傾斜調(diào)優(yōu) 1.1 調(diào)優(yōu)概述 有的時(shí)候幔摸,我們可能會遇到大數(shù)據(jù)計(jì)算中一個(gè)最棘手的問題——數(shù)據(jù)傾斜,此時(shí)Spar...
    wisfern閱讀 2,935評論 0 23
  • 原文:https://tech.meituan.com/spark-tuning-pro.html Spark性能...
    code_solve閱讀 1,188評論 0 34
  • spark-submit的時(shí)候如何引入外部jar包 在通過spark-submit提交任務(wù)時(shí)颤练,可以通過添加配置參數(shù)...
    博弈史密斯閱讀 2,740評論 1 14
  • 前言 繼基礎(chǔ)篇講解了每個(gè)Spark開發(fā)人員都必須熟知的開發(fā)調(diào)優(yōu)與資源調(diào)優(yōu)之后既忆,本文作為《Spark性能優(yōu)化指南》的...
    Alukar閱讀 871評論 0 2