Spark SQL Adaptive Execution
Adaptive execution in Spark-9850
Spark SQL是Apache Spark最廣泛使用的一個(gè)組件,它提供了非常友好的接口來(lái)分布式處理結(jié)構(gòu)化數(shù)據(jù),在很多應(yīng)用領(lǐng)域都有成功的生產(chǎn)實(shí)踐谒撼,但是在超大規(guī)模集群和數(shù)據(jù)集上,Spark SQL仍然遇到不少易用性和可擴(kuò)展性的挑戰(zhàn)辟汰。為了應(yīng)對(duì)這些挑戰(zhàn)列敲,英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)和百度大數(shù)據(jù)基礎(chǔ)架構(gòu)部工程師在Spark 社區(qū)版本的基礎(chǔ)上,改進(jìn)并實(shí)現(xiàn)了自適應(yīng)執(zhí)行引擎帖汞。本文首先討論Spark SQL在大規(guī)模數(shù)據(jù)集上遇到的挑戰(zhàn)戴而,然后介紹自適應(yīng)執(zhí)行的背景和基本架構(gòu),以及自適應(yīng)執(zhí)行如何應(yīng)對(duì)Spark SQL這些問(wèn)題翩蘸,最后我們將比較自適應(yīng)執(zhí)行和現(xiàn)有的社區(qū)版本Spark SQL在100 TB 規(guī)模TPC-DS基準(zhǔn)測(cè)試碰到的挑戰(zhàn)和性能差異所意,以及自適應(yīng)執(zhí)行在Baidu Big SQL平臺(tái)的使用情況。
挑戰(zhàn)1:關(guān)于shuffle partition數(shù)
在Spark SQL中鹿鳖, shufflepartition數(shù)可以通過(guò)參數(shù)spark.sql.shuffle.partition來(lái)設(shè)置扁眯,默認(rèn)值是200。這個(gè)參數(shù)決定了SQL作業(yè)每個(gè)reduce階段任務(wù)數(shù)量翅帜,對(duì)整個(gè)查詢性能有很大影響姻檀。假設(shè)一個(gè)查詢運(yùn)行前申請(qǐng)了E個(gè)Executor,每個(gè)Executor包含C個(gè)core(并發(fā)執(zhí)行線程數(shù))涝滴,那么該作業(yè)在運(yùn)行時(shí)可以并行執(zhí)行的任務(wù)數(shù)就等于E x C個(gè)绣版,或者說(shuō)該作業(yè)的并發(fā)數(shù)是E x C。假設(shè)shuffle partition個(gè)數(shù)為P歼疮,除了map stage的任務(wù)數(shù)和原始數(shù)據(jù)的文件數(shù)量以及大小相關(guān)杂抽,后續(xù)的每個(gè)reduce stage的任務(wù)數(shù)都是P。由于Spark作業(yè)調(diào)度是搶占式的韩脏,E x C個(gè)并發(fā)任務(wù)執(zhí)行單元會(huì)搶占執(zhí)行P個(gè)任務(wù)缩麸,“能者多勞”,直至所有任務(wù)完成赡矢,則進(jìn)入到下一個(gè)Stage杭朱。但這個(gè)過(guò)程中,如果有任務(wù)因?yàn)樘幚頂?shù)據(jù)量過(guò)大(例如:數(shù)據(jù)傾斜導(dǎo)致大量數(shù)據(jù)被劃分到同一個(gè)reducer partition)或者其它原因造成該任務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng)吹散,一方面會(huì)導(dǎo)致整個(gè)stage執(zhí)行時(shí)間變長(zhǎng)弧械,另一方面E x C個(gè)并發(fā)執(zhí)行單元大部分可能都處于空閑等待狀態(tài),集群資源整體利用率急劇下降空民。
那么spark.sql.shuffle.partition參數(shù)究竟是多少比較合適刃唐?如果設(shè)置過(guò)小,分配給每一個(gè)reduce任務(wù)處理的數(shù)據(jù)量就越多界轩,在內(nèi)存大小有限的情況下画饥,不得不溢寫(spill)到計(jì)算節(jié)點(diǎn)本地磁盤上。Spill會(huì)導(dǎo)致額外的磁盤讀寫浊猾,影響整個(gè)SQL查詢的性能荒澡,更差的情況還可能導(dǎo)致嚴(yán)重的GC問(wèn)題甚至是OOM。相反与殃,如果shuffle partition設(shè)置過(guò)大单山。第一,每一個(gè)reduce任務(wù)處理的數(shù)據(jù)量很小并且很快結(jié)束幅疼,進(jìn)而導(dǎo)致Spark任務(wù)調(diào)度負(fù)擔(dān)變大米奸。第二丈咐,每一個(gè)mapper任務(wù)必須把自己的shuffle輸出數(shù)據(jù)分成P個(gè)hash bucket赶诊,即確定數(shù)據(jù)屬于哪一個(gè)reduce partition截酷,當(dāng)shuffle partition數(shù)量太多時(shí)表箭,hash bucket里數(shù)據(jù)量會(huì)很小掸宛,在作業(yè)并發(fā)數(shù)很大時(shí)么抗,reduce任務(wù)shuffle拉取數(shù)據(jù)會(huì)造成一定程度的隨機(jī)小數(shù)據(jù)讀操作瓷胧,當(dāng)使用機(jī)械硬盤作為shuffle數(shù)據(jù)臨時(shí)存取的時(shí)候性能下降會(huì)更加明顯衡蚂。最后泪喊,當(dāng)最后一個(gè)stage保存數(shù)據(jù)時(shí)會(huì)寫出P個(gè)文件棕硫,也可能會(huì)造成HDFS文件系統(tǒng)中大量的小文件。
從上袒啼,shuffle partition的設(shè)置既不能太小也不能太大哈扮。為了達(dá)到最佳的性能,往往需要經(jīng)多次試驗(yàn)才能確定某個(gè)SQL查詢最佳的shuffle partition值蚓再。然而在生產(chǎn)環(huán)境中滑肉,往往SQL以定時(shí)作業(yè)的方式處理不同時(shí)間段的數(shù)據(jù),數(shù)據(jù)量大小可能變化很大摘仅,我們也無(wú)法為每一個(gè)SQL查詢?nèi)プ龊臅r(shí)的人工調(diào)優(yōu)靶庙,這也意味這些SQL作業(yè)很難以最佳的性能方式運(yùn)行。
Shuffle partition的另外一個(gè)問(wèn)題是娃属,同一個(gè)shuffle partition數(shù)設(shè)置將應(yīng)用到所有的stage六荒。Spark在執(zhí)行一個(gè)SQL作業(yè)時(shí),會(huì)劃分成多個(gè)stage膳犹。通常情況下恬吕,每個(gè)stage的數(shù)據(jù)分布和大小可能都不太一樣,全局的shuffle partition設(shè)置最多只能對(duì)某個(gè)或者某些stage最優(yōu)须床,沒(méi)有辦法做到全局所有的stage設(shè)置最優(yōu)铐料。
這一系列關(guān)于shufflepartition的性能和易用性挑戰(zhàn),促使我們思考新的方法:我們能否根據(jù)運(yùn)行時(shí)獲取的shuffle數(shù)據(jù)量信息豺旬,例如數(shù)據(jù)塊大小钠惩,記錄行數(shù)等等,自動(dòng)為每一個(gè)stage設(shè)置合適的shuffle partition值族阅?
挑戰(zhàn)2:Spark SQL最佳執(zhí)行計(jì)劃
Spark SQL在執(zhí)行SQL之前篓跛,會(huì)將SQL或者Dataset程序解析成邏輯計(jì)劃,然后經(jīng)歷一系列的優(yōu)化坦刀,最后確定一個(gè)可執(zhí)行的物理計(jì)劃愧沟。最終選擇的物理計(jì)劃的不同對(duì)性能有很大的影響蔬咬。如何選擇最佳的執(zhí)行計(jì)劃,這便是Spark SQL的Catalyst優(yōu)化器的核心工作沐寺。Catalyst早期主要是基于規(guī)則的優(yōu)化器(RBO)林艘,在Spark 2.2中又加入了基于代價(jià)的優(yōu)化(CBO)。目前執(zhí)行計(jì)劃的確定是在計(jì)劃階段混坞,一旦確認(rèn)以后便不再改變狐援。然而在運(yùn)行期間,當(dāng)我們獲取到更多運(yùn)行時(shí)信息時(shí)究孕,我們將有可能得到一個(gè)更佳的執(zhí)行計(jì)劃啥酱。
以join操作為例,在Spark中最常見的策略是BroadcastHashJoin和SortMergeJoin厨诸。BroadcastHashJoin屬于map side join镶殷,其原理是當(dāng)其中一張表存儲(chǔ)空間大小小于broadcast閾值時(shí),Spark選擇將這張小表廣播到每一個(gè)Executor上泳猬,然后在map階段批钠,每一個(gè)mapper讀取大表的一個(gè)分片,并且和整張小表進(jìn)行join得封,整個(gè)過(guò)程中避免了把大表的數(shù)據(jù)在集群中進(jìn)行shuffle埋心。而SortMergeJoin在map階段2張數(shù)據(jù)表都按相同的分區(qū)方式進(jìn)行shuffle寫,reduce階段每個(gè)reducer將兩張表屬于對(duì)應(yīng)partition的數(shù)據(jù)拉取到同一個(gè)任務(wù)中做join忙上。RBO根據(jù)數(shù)據(jù)的大小拷呆,盡可能把join操作優(yōu)化成BroadcastHashJoin。Spark中使用參數(shù)spark.sql.autoBroadcastJoinThreshold來(lái)控制選擇BroadcastHashJoin的閾值疫粥,默認(rèn)是10MB茬斧。然而對(duì)于復(fù)雜的SQL查詢,它可能使用中間結(jié)果來(lái)作為join的輸入梗逮,在計(jì)劃階段项秉,Spark并不能精確地知道join中兩表的大小或者會(huì)錯(cuò)誤地估計(jì)它們的大小,以致于錯(cuò)失了使用BroadcastHashJoin策略來(lái)優(yōu)化join執(zhí)行的機(jī)會(huì)慷彤。但是在運(yùn)行時(shí)娄蔼,通過(guò)從shuffle寫得到的信息,我們可以動(dòng)態(tài)地選用BroadcastHashJoin底哗。以下是一個(gè)例子岁诉,join一邊的輸入大小只有600K,但Spark仍然規(guī)劃成SortMergeJoin跋选。
這促使我們思考第二個(gè)問(wèn)題:我們能否通過(guò)運(yùn)行時(shí)收集到的信息涕癣,來(lái)動(dòng)態(tài)地調(diào)整執(zhí)行計(jì)劃?
挑戰(zhàn)3:數(shù)據(jù)傾斜
數(shù)據(jù)傾斜是常見的導(dǎo)致Spark SQL性能變差的問(wèn)題前标。數(shù)據(jù)傾斜是指某一個(gè)partition的數(shù)據(jù)量遠(yuǎn)遠(yuǎn)大于其它partition的數(shù)據(jù)坠韩,導(dǎo)致個(gè)別任務(wù)的運(yùn)行時(shí)間遠(yuǎn)遠(yuǎn)大于其它任務(wù)距潘,因此拖累了整個(gè)SQL的運(yùn)行時(shí)間。在實(shí)際SQL作業(yè)中同眯,數(shù)據(jù)傾斜很常見绽昼,join key對(duì)應(yīng)的hash bucket總是會(huì)出現(xiàn)記錄數(shù)不太平均的情況,在極端情況下须蜗,相同join key對(duì)應(yīng)的記錄數(shù)特別多,大量的數(shù)據(jù)必然被分到同一個(gè)partition因而造成數(shù)據(jù)嚴(yán)重傾斜目溉。如圖2明肮,可以看到大部分任務(wù)3秒左右就完成了,而最慢的任務(wù)卻花了4分鐘缭付,它處理的數(shù)據(jù)量卻是其它任務(wù)的若干倍柿估。
目前,處理join時(shí)數(shù)據(jù)傾斜的一些常見手段有: (1)增加shuffle partition數(shù)量陷猫,期望原本分在同一個(gè)partition中的數(shù)據(jù)可以被分散到多個(gè)partition中秫舌,但是對(duì)于同key的數(shù)據(jù)沒(méi)有作用。(2)調(diào)大BroadcastHashJoin的閾值绣檬,在某些場(chǎng)景下可以把SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin而避免shuffle產(chǎn)生的數(shù)據(jù)傾斜足陨。(3)手動(dòng)過(guò)濾傾斜的key,并且對(duì)這些數(shù)據(jù)加入隨機(jī)的前綴娇未,在另一張表中這些key對(duì)應(yīng)的數(shù)據(jù)也相應(yīng)的膨脹處理墨缘,然后再做join。綜上零抬,這些手段都有各自的局限性并且涉及很多的人為處理镊讼。基于此平夜,我們思考了第三個(gè)問(wèn)題:Spark能否在運(yùn)行時(shí)自動(dòng)地處理join中的數(shù)據(jù)傾斜蝶棋?
自適應(yīng)執(zhí)行背景和簡(jiǎn)介
早在2015年,Spark社區(qū)就提出了自適應(yīng)執(zhí)行的基本想法忽妒,在Spark的DAGScheduler中增加了提交單個(gè)map stage的接口玩裙,并且在實(shí)現(xiàn)運(yùn)行時(shí)調(diào)整shuffle partition數(shù)量上做了嘗試。但目前該實(shí)現(xiàn)有一定的局限性锰扶,在某些場(chǎng)景下會(huì)引入更多的shuffle献酗,即更多的stage,對(duì)于三表在同一個(gè)stage中做join等情況也無(wú)法很好的處理坷牛。所以該功能一直處于實(shí)驗(yàn)階段罕偎,配置參數(shù)也沒(méi)有在官方文檔中提及。
基于這些社區(qū)的工作京闰,英特爾大數(shù)據(jù)技術(shù)團(tuán)隊(duì)對(duì)自適應(yīng)執(zhí)行做了重新的設(shè)計(jì)颜及,實(shí)現(xiàn)了一個(gè)更為靈活的自適性執(zhí)行框架甩苛。在這個(gè)框架下面,我們可以添加額外的規(guī)則俏站,來(lái)實(shí)現(xiàn)更多的功能讯蒲。目前,已實(shí)現(xiàn)的特性包括:自動(dòng)設(shè)置shuffle partition數(shù)肄扎,動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃墨林,動(dòng)態(tài)處理數(shù)據(jù)傾斜等等。
自適應(yīng)執(zhí)行架構(gòu)
在Spark SQL中犯祠,當(dāng)Spark確定最后的物理執(zhí)行計(jì)劃后旭等,根據(jù)每一個(gè)operator對(duì)RDD的轉(zhuǎn)換定義,它會(huì)生成一個(gè)RDD的DAG圖衡载。之后Spark基于DAG圖靜態(tài)劃分stage并且提交執(zhí)行搔耕,所以一旦執(zhí)行計(jì)劃確定后,在運(yùn)行階段無(wú)法再更新痰娱。自適應(yīng)執(zhí)行的基本思路是在執(zhí)行計(jì)劃中事先劃分好stage弃榨,然后按stage提交執(zhí)行,在運(yùn)行時(shí)收集當(dāng)前stage的shuffle統(tǒng)計(jì)信息梨睁,以此來(lái)優(yōu)化下一個(gè)stage的執(zhí)行計(jì)劃鲸睛,然后再提交執(zhí)行后續(xù)的stage。
從圖3中我們可以看出自適應(yīng)執(zhí)行的工作方法而姐,首先以Exchange節(jié)點(diǎn)作為分界將執(zhí)行計(jì)劃這棵樹劃分成多個(gè)QueryStage(Exchange節(jié)點(diǎn)在Spark SQL中代表shuffle)腊凶。每一個(gè)QueryStage都是一棵獨(dú)立的子樹,也是一個(gè)獨(dú)立的執(zhí)行單元拴念。在加入QueryStage的同時(shí)钧萍,我們也加入一個(gè)QueryStageInput的葉子節(jié)點(diǎn),作為父親QueryStage的輸入政鼠。例如對(duì)于圖中兩表join的執(zhí)行計(jì)劃來(lái)說(shuō)我們會(huì)創(chuàng)建3個(gè)QueryStage风瘦。最后一個(gè)QueryStage中的執(zhí)行計(jì)劃是join本身,它有2個(gè)QueryStageInput代表它的輸入公般,分別指向2個(gè)孩子的QueryStage万搔。在執(zhí)行QueryStage時(shí),我們首先提交它的孩子stage官帘,并且收集這些stage運(yùn)行時(shí)的信息瞬雹。當(dāng)這些孩子stage運(yùn)行完畢后,我們可以知道它們的大小等信息刽虹,以此來(lái)判斷QueryStage中的計(jì)劃是否可以優(yōu)化更新酗捌。例如當(dāng)我們獲知某一張表的大小是5M,它小于broadcast的閾值時(shí),我們可以將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin來(lái)優(yōu)化當(dāng)前的執(zhí)行計(jì)劃胖缤。我們也可以根據(jù)孩子stage產(chǎn)生的shuffle數(shù)據(jù)量尚镰,來(lái)動(dòng)態(tài)地調(diào)整該stage的reducer個(gè)數(shù)。在完成一系列的優(yōu)化處理后哪廓,最終我們?yōu)樵換ueryStage生成RDD的DAG圖狗唉,并且提交給DAG Scheduler來(lái)執(zhí)行。
自動(dòng)設(shè)置reducer個(gè)數(shù)
假設(shè)我們?cè)O(shè)置的shufflepartition個(gè)數(shù)為5涡真,在map stage結(jié)束之后分俯,我們知道每一個(gè)partition的大小分別是70MB,30MB哆料,20MB澳迫,10MB和50MB。假設(shè)我們?cè)O(shè)置每一個(gè)reducer處理的目標(biāo)數(shù)據(jù)量是64MB剧劝,那么在運(yùn)行時(shí),我們可以實(shí)際使用3個(gè)reducer抓歼。第一個(gè)reducer處理partition 0 (70MB)讥此,第二個(gè)reducer處理連續(xù)的partition 1 到3,共60MB谣妻,第三個(gè)reducer處理partition 4 (50MB)萄喳,如圖4所示。
在自適應(yīng)執(zhí)行的框架中蹋半,因?yàn)槊總€(gè)QueryStage都知道自己所有的孩子stage他巨,因此在調(diào)整reducer個(gè)數(shù)時(shí),可以考慮到所有的stage輸入减江。另外染突,我們也可以將記錄條數(shù)作為一個(gè)reducer處理的目標(biāo)值。因?yàn)閟huffle的數(shù)據(jù)往往都是經(jīng)過(guò)壓縮的辈灼,有時(shí)partition的數(shù)據(jù)量并不大份企,但解壓后記錄條數(shù)確遠(yuǎn)遠(yuǎn)大于其它partition,造成數(shù)據(jù)不均巡莹。所以同時(shí)考慮數(shù)據(jù)大小和記錄條數(shù)可以更好地決定reducer的個(gè)數(shù)司志。
動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃
目前我們支持在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整join的策略,在滿足條件的情況下降宅,即一張表小于Broadcast閾值骂远,可以將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin。由于SortMergeJoin和BroadcastHashJoin輸出的partition情況并不相同腰根,隨意轉(zhuǎn)換可能在下一個(gè)stage引入額外的shuffle操作激才。因此我們?cè)趧?dòng)態(tài)調(diào)整join策略時(shí),遵循一個(gè)規(guī)則,即在不引入額外shuffle的前提下才進(jìn)行轉(zhuǎn)換贸营。
將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin有哪些好處呢吨述?因?yàn)閿?shù)據(jù)已經(jīng)shuffle寫到磁盤上,我們?nèi)匀恍枰猻huffle讀取這些數(shù)據(jù)钞脂。我們可以看看圖5的例子揣云,假設(shè)A表和B表join,map階段2張表各有2個(gè)map任務(wù)冰啃,并且shuffle partition個(gè)數(shù)為5邓夕。如果做SortMergeJoin,在reduce階段需要啟動(dòng)5個(gè)reducer阎毅,每個(gè)reducer通過(guò)網(wǎng)絡(luò)shuffle讀取屬于自己的數(shù)據(jù)焚刚。然而,當(dāng)我們?cè)谶\(yùn)行時(shí)發(fā)現(xiàn)B表可以broadcast扇调,并且將其轉(zhuǎn)換成BroadcastHashJoin之后矿咕,我們只需要啟動(dòng)2個(gè)reducer,每一個(gè)reducer讀取一個(gè)mapper的整個(gè)shuffle output文件狼钮。當(dāng)我們調(diào)度這2個(gè)reducer任務(wù)時(shí)碳柱,可以優(yōu)先將其調(diào)度在運(yùn)行mapper的Executor上,因此整個(gè)shuffle讀變成了本地讀取熬芜,沒(méi)有數(shù)據(jù)通過(guò)網(wǎng)絡(luò)傳輸莲镣。并且讀取一個(gè)文件這樣的順序讀,相比原先shuffle時(shí)隨機(jī)的小文件讀涎拉,效率也更勝一籌瑞侮。另外,SortMergeJoin過(guò)程中往往會(huì)出現(xiàn)不同程度的數(shù)據(jù)傾斜問(wèn)題鼓拧,拖慢整體的運(yùn)行時(shí)間半火。而轉(zhuǎn)換成BroadcastHashJoin后,數(shù)據(jù)量一般比較均勻毁枯,也就避免了傾斜慈缔,我們可以在下文實(shí)驗(yàn)結(jié)果中看到更具體的信息。
動(dòng)態(tài)處理數(shù)據(jù)傾斜
在自適應(yīng)執(zhí)行的框架下种玛,我們可以在運(yùn)行時(shí)很容易地檢測(cè)出有數(shù)據(jù)傾斜的partition藐鹤。當(dāng)執(zhí)行某個(gè)stage時(shí),我們收集該stage每個(gè)mapper 的shuffle數(shù)據(jù)大小和記錄條數(shù)赂韵。如果某一個(gè)partition的數(shù)據(jù)量或者記錄條數(shù)超過(guò)中位數(shù)的N倍娱节,并且大于某個(gè)預(yù)先配置的閾值,我們就認(rèn)為這是一個(gè)數(shù)據(jù)傾斜的partition祭示,需要進(jìn)行特殊的處理肄满。
假設(shè)我們A表和B表做inner join,并且A表中第0個(gè)partition是一個(gè)傾斜的partition。一般情況下稠歉,A表和B表中partition 0的數(shù)據(jù)都會(huì)shuffle到同一個(gè)reducer中進(jìn)行處理掰担,由于這個(gè)reducer需要通過(guò)網(wǎng)絡(luò)拉取大量的數(shù)據(jù)并且進(jìn)行處理,它會(huì)成為一個(gè)最慢的任務(wù)拖慢整體的性能怒炸。在自適應(yīng)執(zhí)行框架下带饱,一旦我們發(fā)現(xiàn)A表的partition 0發(fā)生傾斜,我們隨后使用N個(gè)任務(wù)去處理該partition阅羹。每個(gè)任務(wù)只讀取若干個(gè)mapper的shuffle 輸出文件勺疼,然后讀取B表partition 0的數(shù)據(jù)做join。最后捏鱼,我們將N個(gè)任務(wù)join的結(jié)果通過(guò)Union操作合并起來(lái)执庐。為了實(shí)現(xiàn)這樣的處理,我們對(duì)shuffle read的接口也做了改變导梆,允許它只讀取部分mapper中某一個(gè)partition的數(shù)據(jù)轨淌。在這樣的處理中,B表的partition 0會(huì)被讀取N次看尼,雖然這增加了一定的額外代價(jià)猿诸,但是通過(guò)N個(gè)任務(wù)處理傾斜數(shù)據(jù)帶來(lái)的收益仍然大于這樣的代價(jià)。如果B表中partition 0也發(fā)生傾斜狡忙,對(duì)于inner join來(lái)說(shuō)我們也可以將B表的partition 0分成若干塊,分別與A表的partition 0進(jìn)行join址芯,最終union起來(lái)灾茁。但對(duì)于其它的join類型例如Left Semi Join我們暫時(shí)不支持將B表的partition 0拆分。
自適應(yīng)執(zhí)行和Spark SQL在100TB上的性能比較
我們使用99臺(tái)機(jī)器搭建了一個(gè)集群谷炸,使用Spark2.2在TPC-DS 100TB的數(shù)據(jù)集進(jìn)行了實(shí)驗(yàn)北专,比較原版Spark和自適應(yīng)執(zhí)行的性能。以下是集群的詳細(xì)信息:
實(shí)驗(yàn)結(jié)果顯示旬陡,在自適應(yīng)執(zhí)行模式下拓颓,103條SQL中有92條都得到了明顯的性能提升,其中47條SQL的性能提升超過(guò)10%描孟,最大的性能提升達(dá)到了3.8倍驶睦,并且沒(méi)有出現(xiàn)性能下降的情況。另外在原版Spark中匿醒,有5條SQL因?yàn)镺OM等原因無(wú)法順利運(yùn)行场航,在自適應(yīng)模式下我們也對(duì)這些問(wèn)題做了優(yōu)化,使得103條SQL在TPC-DS 100TB數(shù)據(jù)集上全部成功運(yùn)行廉羔。以下是具體的性能提升比例和性能提升最明顯的幾條SQL溉痢。
通過(guò)仔細(xì)分析了這些性能提升的SQL,我們可以看到自適應(yīng)執(zhí)行帶來(lái)的好處。首先是自動(dòng)設(shè)置reducer個(gè)數(shù)孩饼,原版Spark使用10976作為shuffle partition數(shù)髓削,在自適應(yīng)執(zhí)行時(shí),以下SQL的reducer個(gè)數(shù)自動(dòng)調(diào)整為1064和1079镀娶,可以明顯看到執(zhí)行時(shí)間上也提升了很多立膛。這正是因?yàn)闇p少了調(diào)度的負(fù)擔(dān)和任務(wù)啟動(dòng)的時(shí)間,以及減少了磁盤IO請(qǐng)求汽畴。
原版Spark:
自適應(yīng)執(zhí)行:
在運(yùn)行時(shí)動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃旧巾,將SortMergeJoin轉(zhuǎn)化成BroadcastHashJoin在某些SQL中也帶來(lái)了很大的提升。例如在以下的例子中忍些,原本使用SortMergeJoin因?yàn)閿?shù)據(jù)傾斜等問(wèn)題花費(fèi)了2.5分鐘鲁猩。在自適應(yīng)執(zhí)行時(shí),因?yàn)槠渲幸粡埍淼拇笮≈挥?.5k所以在運(yùn)行時(shí)轉(zhuǎn)化成了BroadcastHashJoin罢坝,執(zhí)行時(shí)間縮短為10秒廓握。
原版Spark:
自適應(yīng)執(zhí)行:
100 TB的挑戰(zhàn)及優(yōu)化
成功運(yùn)行TPC-DS 100 TB數(shù)據(jù)集中的所有SQL,對(duì)于Apache Spark來(lái)說(shuō)也是一大挑戰(zhàn)嘁酿。雖然SparkSQL官方表示支持TPC-DS所有的SQL隙券,但這是基于小數(shù)據(jù)集。在100TB這個(gè)量級(jí)上闹司,Spark暴露出了一些問(wèn)題導(dǎo)致有些SQL執(zhí)行效率不高娱仔,甚至無(wú)法順利執(zhí)行。在做實(shí)驗(yàn)的過(guò)程中游桩,我們?cè)谧赃m應(yīng)執(zhí)行框架的基礎(chǔ)上牲迫,對(duì)Spark也做了其它的優(yōu)化改進(jìn),來(lái)確保所有SQL在100TB數(shù)據(jù)集上可以成功運(yùn)行借卧。以下是一些典型的問(wèn)題盹憎。
統(tǒng)計(jì)map端輸出數(shù)據(jù)時(shí)driver單點(diǎn)瓶頸的優(yōu)化(SPARK-22537)
在每個(gè)map任務(wù)結(jié)束后,會(huì)有一個(gè)表示每個(gè)partition大小的數(shù)據(jù)結(jié)構(gòu)(即下面提到的CompressedMapStatus或HighlyCompressedMapStatus)返回給driver铐刘。而在自適應(yīng)執(zhí)行中陪每,當(dāng)一次shuffle的map stage結(jié)束后,driver會(huì)聚合每個(gè)mapper給出的partition大小信息镰吵,得到在各個(gè)partition上所有mapper輸出的數(shù)據(jù)總大小檩禾。該統(tǒng)計(jì)由單線程完成,如果mapper的數(shù)量是M疤祭,shuffle partition的數(shù)量為S锌订,那么統(tǒng)計(jì)的時(shí)間復(fù)雜度在O(M x S) ~ O (M x S x log(M x S)) 之間,當(dāng)CompressedMapStatus被使用時(shí)画株,復(fù)雜度為這個(gè)區(qū)間的下限辆飘,當(dāng)HighlyCompressedMapStatus被使用時(shí)啦辐,空間有所節(jié)省,時(shí)間會(huì)更長(zhǎng)蜈项,在幾乎所有的partition數(shù)據(jù)都為空時(shí)芹关,復(fù)雜度會(huì)接近該區(qū)間的上限。
在M x S增大時(shí)紧卒,我們會(huì)遇到driver上的單點(diǎn)瓶頸侥衬,一個(gè)明顯的表現(xiàn)是UI上map stage和reduce stage之間的停頓。為了解決這個(gè)單點(diǎn)瓶頸跑芳,我們將任務(wù)盡量均勻地劃分給多個(gè)線程轴总,線程之間不相交地為scala Array中的不同元素賦聚合值。
在這項(xiàng)優(yōu)化中博个,新的spark.shuffle.mapOutput.parallelAggregationThreshold(簡(jiǎn)稱threshold)被引入怀樟,用于配置使用多線程聚合的閾值,聚合的并行度由JVM中可用core數(shù)和M * S / threshold + 1中的小值決定盆佣。
Shuffle讀取連續(xù)partition時(shí)的優(yōu)化 (SPARK-9853)
在自適應(yīng)執(zhí)行的模式下往堡,一個(gè)reducer可能會(huì)從一個(gè)mapoutput文件中讀取諾干個(gè)連續(xù)的數(shù)據(jù)塊。目前的實(shí)現(xiàn)中共耍,它需要拆分成許多獨(dú)立的getBlockData調(diào)用虑灰,每次調(diào)用分別從硬盤讀取一小塊數(shù)據(jù),這樣就需要很多的磁盤IO痹兜。我們對(duì)這樣的場(chǎng)景做了優(yōu)化穆咐,使得Spark可以一次性地把這些連續(xù)數(shù)據(jù)塊都讀上來(lái),這樣就大大減少了磁盤的IO字旭。在小的基準(zhǔn)測(cè)試程序中庸娱,我們發(fā)現(xiàn)shuffle read的性能可以提升3倍。
BroadcastHashJoin中避免不必要的partition讀的優(yōu)化
自適應(yīng)執(zhí)行可以為現(xiàn)有的operator提供更多優(yōu)化的可能谐算。在SortMergeJoin中有一個(gè)基本的設(shè)計(jì):每個(gè)reducetask會(huì)先讀取左表中的記錄,如果左表的 partition為空归露,則右表中的數(shù)據(jù)我們無(wú)需關(guān)注(對(duì)于非anti join的情況)洲脂,這樣的設(shè)計(jì)在左表有一些partition為空時(shí)可以節(jié)省不必要的右表讀取,在SortMergeJoin中這樣的實(shí)現(xiàn)很自然剧包。
BroadcastHashJoin中不存在按照join key分區(qū)的過(guò)程恐锦,所以缺失了這項(xiàng)優(yōu)化。然而在自適應(yīng)執(zhí)行的一些情況中疆液,利用stage間的精確統(tǒng)計(jì)信息一铅,我們可以找回這項(xiàng)優(yōu)化:如果SortMergeJoin在運(yùn)行時(shí)被轉(zhuǎn)換成了BroadcastHashJoin,且我們能得到各個(gè)partition key對(duì)應(yīng)partition的精確大小堕油,則新轉(zhuǎn)換成的BroadcastHashJoin將被告知:無(wú)需去讀那些小表中為空的partition潘飘,因?yàn)椴粫?huì)join出任何結(jié)果肮之。
Baidu真實(shí)產(chǎn)品線試用情況
我們將自適應(yīng)執(zhí)行優(yōu)化應(yīng)用在Baidu內(nèi)部基于Spark SQL的即席查詢服務(wù)BaiduBig SQL之上,做了進(jìn)一步的落地驗(yàn)證卜录,通過(guò)選取單日全天真實(shí)用戶查詢戈擒,按照原有執(zhí)行順序回放重跑和分析,得到如下幾點(diǎn)結(jié)論:
- 對(duì)于秒級(jí)的簡(jiǎn)單查詢艰毒,自適應(yīng)版本的性能提升并不明顯筐高,這主要是因?yàn)樗鼈兊钠款i和主要耗時(shí)集中在了IO上面,而這不是自適應(yīng)執(zhí)行的優(yōu)化點(diǎn)丑瞧。
- 按照查詢復(fù)雜度維度考量測(cè)試結(jié)果發(fā)現(xiàn):查詢中迭代次數(shù)越多柑土,多表join場(chǎng)景越復(fù)雜的情況下自適應(yīng)執(zhí)行效果越好。我們簡(jiǎn)單按照group by, sort, join, 子查詢等操作個(gè)數(shù)來(lái)將查詢分類绊汹,如上關(guān)鍵詞大于3的查詢有明顯的性能提升稽屏,優(yōu)化比從50%~200%不等,主要優(yōu)化點(diǎn)來(lái)源于shuffle的動(dòng)態(tài)并發(fā)數(shù)調(diào)整及join優(yōu)化灸促。
- 從業(yè)務(wù)使用角度來(lái)分析诫欠,前文所述SortMergeJoin轉(zhuǎn)BroadcastHashJoin的優(yōu)化在Big SQL場(chǎng)景中命中了多種典型的業(yè)務(wù)SQL模板,試考慮如下計(jì)算需求:用戶期望從兩張不同維度的計(jì)費(fèi)信息中撈取感興趣的user列表在兩個(gè)維度的整體計(jì)費(fèi)浴栽。收入信息原表大小在百T級(jí)別荒叼,用戶列表只包含對(duì)應(yīng)用戶的元信息,大小在10M以內(nèi)典鸡。兩張計(jì)費(fèi)信息表字段基本一致被廓,所以我們將兩張表與用戶列表做inner join后union做進(jìn)一步分析,SQL表達(dá)如下:
select t.c1, t.id, t.c2, t.c3, t.c4, sum(t.num1), sum(t.num2), sum(t.num3) from
(
select c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3) as num3 from basedata.shitu_a t1 INNER JOIN basedata.user_82_1512023432000 t2 ON (t1.id = t2.id) where (event_day=20171107) and flag != 'true' group by c1, t1.id, c2, c3, c4
union all
select c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3) as num3 from basedata.shitu_b t1 INNER JOIN basedata.user_82_1512023432000 t2 ON (t1.id = t2.id) where (event_day=20171107) and flag != 'true' group by c1, t1.id, c2, c3, c4
) t group by t.c1, t.id, t.c2, t.c3, c4
對(duì)應(yīng)的原版Spark執(zhí)行計(jì)劃如下:
針對(duì)于此類用戶場(chǎng)景萝玷,可以全部命中自適應(yīng)執(zhí)行的join優(yōu)化邏輯嫁乘,執(zhí)行過(guò)程中多次SortMergeJoin轉(zhuǎn)為BroadcastHashJoin,減少了中間內(nèi)存消耗及多輪sort球碉,得到了近200%的性能提升蜓斧。
結(jié)合上述3點(diǎn),下一步自適應(yīng)執(zhí)行在Baidu內(nèi)部的優(yōu)化落地工作將進(jìn)一步集中在大數(shù)據(jù)量睁冬、復(fù)雜查詢的例行批量作業(yè)之上挎春,并考慮與用戶查詢復(fù)雜度關(guān)聯(lián)進(jìn)行動(dòng)態(tài)的開關(guān)控制。對(duì)于數(shù)千臺(tái)的大規(guī)模集群上運(yùn)行的復(fù)雜查詢豆拨,自適應(yīng)執(zhí)行可以動(dòng)態(tài)調(diào)整計(jì)算過(guò)程中的并行度直奋,可以幫助大幅提升集群的資源利用率。另外施禾,自適應(yīng)執(zhí)行可以獲取到多輪stage之間更完整的統(tǒng)計(jì)信息脚线,下一步我們也考慮將對(duì)應(yīng)數(shù)據(jù)及Strategy接口開放給Baidu Spark平臺(tái)上層用戶,針對(duì)特殊作業(yè)進(jìn)行進(jìn)一步的定制化Strategy策略編寫弥搞。
總結(jié)
隨著Spark SQL廣泛的使用以及業(yè)務(wù)規(guī)模的不斷增長(zhǎng)邮绿,在大規(guī)模數(shù)據(jù)集上遇到的易用性和性能方面的挑戰(zhàn)將日益明顯渠旁。本文討論了三個(gè)典型的問(wèn)題,包括調(diào)整shuffle partition數(shù)量斯碌,選擇最佳執(zhí)行計(jì)劃和數(shù)據(jù)傾斜一死。這些問(wèn)題在現(xiàn)有的框架下并不容易解決,而自適應(yīng)執(zhí)行可以很好地應(yīng)對(duì)這些問(wèn)題傻唾。我們介紹了自適應(yīng)執(zhí)行的基本架構(gòu)以及解決這些問(wèn)題的具體方法投慈。最后我們?cè)赥PC-DS 100TB數(shù)據(jù)集上驗(yàn)證了自適應(yīng)執(zhí)行的優(yōu)勢(shì),相比較原版Spark SQL冠骄,103個(gè)SQL查詢中伪煤,90%的查詢都得到了明顯的性能提升,最大的提升達(dá)到3.8倍凛辣,并且原先失敗的5個(gè)查詢?cè)谧赃m應(yīng)執(zhí)行下也順利完成抱既。我們?cè)诎俣鹊腂ig SQL平臺(tái)也做了進(jìn)一步的驗(yàn)證,對(duì)于復(fù)雜的真實(shí)查詢可以達(dá)到2倍的性能提升扁誓》辣茫總之,自適應(yīng)執(zhí)行解決了Spark SQL在大數(shù)據(jù)規(guī)模上遇到的很多挑戰(zhàn)蝗敢,并且很大程度上改善了Spark SQL的易用性和性能捷泞,提高了超大集群中多租戶多并發(fā)作業(yè)情況下集群的資源利用率。將來(lái)寿谴,我們考慮在自適應(yīng)執(zhí)行的框架之下锁右,提供更多運(yùn)行時(shí)可以優(yōu)化的策略,并且將我們的工作貢獻(xiàn)回饋給社區(qū)讶泰,也希望有更多的朋友可以參與進(jìn)來(lái)咏瑟,將其進(jìn)一步完善。
轉(zhuǎn)載:https://blog.csdn.net/fl63zv9zou86950w/article/details/79049280?utm_source=copy