本文翻譯自Mastering Query Plans in Spark 3.0,能夠很好的幫助學(xué)習(xí)spark sql理解spark UI的計劃邢锯,決定翻譯記錄一下。
在Spark SQL中查詢計劃是理解查詢執(zhí)行的入口點,它攜帶了大量的信息慕购,并且能夠洞察查詢是怎么執(zhí)行的。在大的負載下或者執(zhí)行的任務(wù)很長的時候康辑,這些信息很重要的咸灿。從查詢計劃的信息我們可以發(fā)現(xiàn)哪些是低效的并且能夠重寫查詢?nèi)ヌ峁└玫男阅堋?br>
對于不熟悉查詢計劃的人來說,乍一看冕茅,這些信息有點難懂。它是樹形結(jié)構(gòu)蛹找,并且每個節(jié)點代表了一種操作姨伤,每種操作上提供了執(zhí)行的基本信息。spark官方文檔上涉及到查詢計劃的信息是比較少的庸疾。這邊文章的動機就是讓我們熟悉物理計劃乍楚,我們接下來將會來看一下常用到的操作以及它們提供的信息以及他們是怎么執(zhí)行的。
這邊文章所涉及到的理論大部分是基于對源碼的研究和運行優(yōu)化spark查詢計劃的實踐届慈。
基本例子
我們考慮一個簡單的例子徒溪,一個查詢中涉及到filter以及aggregation,join操作的語句:
# in PySpark API:
query = (
questionsDF
.filter(col('year') == 2019)
.groupBy('user_id')
.agg(
count('*').alias('cnt')
)
.join(usersDF, 'user_id')
)
我們把例子中的usersDF是一組問問題的用戶金顿,這些問題用questionsDF來表示臊泌。這些問題用year的這一列來進行分區(qū),代表著哪一年問的問題揍拆。在這個查詢里渠概,我們對2019年問問題的用戶感興趣,并且想知道每個人問了多少問題,而且我們想知道在輸出中我們想知道一些額外信息播揪,這就是為什么我們在聚合之后進行了usersDF的join操作贮喧。
這里有兩種基本的方式去查看物理計劃。第一種是在DataFrame上調(diào)用explain函數(shù),該函數(shù)展現(xiàn)這個計劃的文本化的展示:
這在spark 3.0有了一些優(yōu)化猪狈,explain函數(shù)帶有了一個新參數(shù) mode箱沦,這個參數(shù)的值可以是:formatted,cost雇庙,codegen谓形。使用formatted模式將會把查詢計劃轉(zhuǎn)化為更加有組織的輸出(這里之展現(xiàn)了一部分):
在formatted計劃中,我們能看到裸數(shù)疆前,改裸數(shù)只是展現(xiàn)了操作的名字并帶有一個括號的數(shù)字套耕。在數(shù)的下面,這里有一些數(shù)字對應(yīng)的細節(jié)描述峡继。cost模式將會展示除了物理計劃之外的優(yōu)化的邏輯計劃冯袍,這些邏輯計劃帶有每個操作的統(tǒng)計信息,所以我們能看到在不同執(zhí)行階段的數(shù)據(jù)大小碾牌。最終codegen模式展現(xiàn)了將會執(zhí)行的生成的java代碼康愤。
第二種方式是查看spark ui中的sql tab,這里有正在跑的和已經(jīng)完成了的查詢舶吗。通過點擊你要查看的查詢征冷,我們可以看到物理計劃的文本表示锭弊。在下面這個圖片中鹃唯,我們結(jié)合圖形表示昭殉,文本表示以及它們之間的對應(yīng)關(guān)系:
不同點是圖形表示的葉子節(jié)點在上面芝此,根節(jié)點在下面,而文本表示的是反過來的磷箕。
CollapseCodegenStages
在物理計劃的圖形表示中,你能看到一些操作被組織成了一大塊藍色的矩形尝江。這些大矩形對應(yīng)著codegen階段吠昭。這是發(fā)生在物理計劃的優(yōu)化階段傲隶。這個是叫做CollapseCodegenStages來負責(zé)優(yōu)化的饺律,原理是把支持代碼生成的操作聚合到一起,通過消除虛擬函數(shù)的調(diào)用來加速跺株。但是并不是所有的操作支持代碼生成复濒。所以一些操作(如exchange操作)并不是大矩形的一部分。在我們的例子中乒省,這里有三個codegen stages巧颈,對應(yīng)著三個大矩形,你能在操作的括號中看到codegen stage的id袖扛。從這個樹我們也可以分辨出一個操作是夠支持代碼生成砸泛,因為加入支持代碼生成的話,這里將會在對應(yīng)的操作的括號里有個星號。
我們簡單的分析一下在我們查詢中的每一個操作晾嘶。
Scan parquet
scan parquet操作代表著從parquet文件中讀取數(shù)據(jù)。從明細信息中娶吞,我們能直接看到從這個數(shù)據(jù)源中我們選擇了哪些列垒迂。雖然我們沒指定具體的字段,但是這里也會應(yīng)用ColumnPruning規(guī)則妒蛇,這個規(guī)則會確保只有真正字段才會從這個數(shù)據(jù)源中提取出來机断。我們也能看到有兩種filters:PartitionFilters和PushFilters。PartitionFilters應(yīng)用在數(shù)據(jù)源分區(qū)的字段上绣夺。這是非常重要的因為我們能跳過我們不需要的數(shù)據(jù)吏奸。檢查對應(yīng)的filters是否傳播到正確的位置總是沒錯的。這是因為我們盡可能讀取少量的數(shù)據(jù)陶耍,因為IO是比較費時的奋蔚。在spark 2.4,這里還有一個代表實際讀取到的分區(qū)的partitionCount字段烈钞,這個字段在spark 3.0已經(jīng)去掉了泊碑。
PushFilters把字段直接下推到parquet文件中去,假如parquet文件過濾的列是按照過濾字段排序的話毯欣,這個規(guī)則就很有用了馒过,因為這種情況下,我們能利用parquet內(nèi)部結(jié)構(gòu)去過濾數(shù)據(jù)酗钞。parquet文件是按照行組和每個行組的元數(shù)據(jù)文件組成的腹忽。這個元數(shù)據(jù)包含了每個行組的最大最小值,基于這個信息砚作,我們就能判斷是否讀取這個行組窘奏。
Filter
Filter操作佷容易理解。它僅僅是代表過濾條件葫录。但是這個操作怎么創(chuàng)建的并不是很明顯蔼夜,因為在查詢中它并不是直接對應(yīng)著過濾條件。因為所有的filters首先被Catalyst optimzer處理压昼,改規(guī)則可能修改或者重新移動她們求冷。這里有好幾個規(guī)則在她們轉(zhuǎn)換為物理計劃前的邏輯計劃。我們列舉了一下:
- PushDownPredicates-這個規(guī)則通過其他的操作把filter下推到離數(shù)據(jù)源更近的地方窍霞,但不是所有的操作都支持匠题。比如,如果表達式不是確定性的但金,這就不行韭山,假如我們使用類似first,last,collect_set,collect_list,rand等钱磅,filters操作就不能通過這些操作而進行下推梦裂,因為這些函數(shù)是不確定性的。
- CombineFilters-結(jié)合兩個臨近的操作合成一個(收集兩個filters條件合成一個更為復(fù)雜的的條件)
- InferFiltersFromConstraints-這個規(guī)則實際上會創(chuàng)建新的filter操作盖淡,如從join操作(從inner join中創(chuàng)建一個joining key is not null)
- PruneFilters-移除多余的filters(比如一個filters總是true)
Exchange
Exchange操作代表著shuffle操作年柠,意味著物理數(shù)據(jù)的集群范圍內(nèi)的移動。這個操作是很費時的褪迟,因為它會通過網(wǎng)絡(luò)移動數(shù)據(jù)冗恨。查詢計劃的信息也包含了一些數(shù)據(jù)重新分區(qū)的細節(jié)。在我們的例子中味赃,是hashPartitioning(user_id,200):
這意味著數(shù)據(jù)將會根據(jù)user_id列重新分區(qū)為200個分區(qū)掀抹,有著同樣user_id的行將會屬于同一個分區(qū),將會分配到同一個executor上心俗。為了確保只有200分區(qū)傲武,spark將會計算user_id的hashcode并且對200取模。這個結(jié)果就是不同的user_ids就會分到同一個分區(qū)城榛。同時有些分區(qū)可能是空的谱轨。這里也有其他類型的分區(qū)值的去留意一下:
- RoundRobinPartitioning-數(shù)據(jù)將會隨機分配到n個分區(qū)中,n在函數(shù)repartition(n)中指定
- SinglePartition-所有數(shù)據(jù)將會分配到一個分區(qū)中吠谢,進而到一個executor中土童。
- RangePartitioning-這個用在對數(shù)據(jù)排序中,用在orderBy或者sort操作中
HashAggregate
這個代表著數(shù)據(jù)聚合工坊,這個經(jīng)常是兩個操作献汗,要么被Exchange分開或者不分開:
為什么這里有兩個HashAggregate操作的原因是第一個是部分聚合,它在每個executor上每個分區(qū)分別進行聚合王污。在我們的例子中罢吃,你能看到partial_count(1)的function字段,最終的部分聚合結(jié)果就是第二個聚合昭齐。這個操作也展示了數(shù)據(jù)按照哪個分組的Keys字段尿招。results字段展示了在聚合以后的可用的列。
BroadcastHashJoin & BroadcastExchange
BroadcastHashJoin(BHJ)代表著join算法的操作阱驾,除了這個就谜,還有SortMergeJoin和ShuffleHashJoin。BHJ總是伴隨著BroadcastExchange里覆,這個代表著廣播shuffle-數(shù)據(jù)將會收集到driver端并且會被傳播到需要的executor上丧荐。
ColumnarToRow
這是在spark 3.0引入的新操作,用于列行之間的轉(zhuǎn)換
總結(jié)
在spark sql中的物理計劃由攜帶了有用信息的操作組成喧枷,正確理解每個操作能夠更好的洞察執(zhí)行虹统,并且通過分析計劃弓坞,我們可以分析是夠是最優(yōu)的,必要的時候可以進行優(yōu)化车荔。
在這篇文章里渡冻,我們描述了在物理計劃中經(jīng)常用到的一組操作,雖然不是全部但是我們盡量去覆蓋經(jīng)常使用到的操作忧便。