shuffle調(diào)優(yōu)
上一篇介紹了HashShuffleManager,這次介紹SortShuffleManager
SortShuffleManager運(yùn)行原理
SortShuffleManager的運(yùn)行機(jī)制主要分成兩種,一種是普通運(yùn)行機(jī)制诸老,另一種是bypass運(yùn)行機(jī)制汹押。當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(shí)(默認(rèn)為200)千所,就會(huì)啟用bypass機(jī)制奏寨。
普通運(yùn)行機(jī)制
下圖說明了普通的SortShuffleManager的原理榛搔。在該模式下,數(shù)據(jù)會(huì)先寫入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中说贝,此時(shí)根據(jù)不同的shuffle算子议惰,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。
如果是reduceByKey這種聚合類的shuffle算子乡恕,那么會(huì)選用Map數(shù)據(jù)結(jié)構(gòu)言询,一邊通過Map進(jìn)行聚合,一邊寫入內(nèi)存几颜;如果是join這種普通的shuffle算子簇爆,那么會(huì)選用Array數(shù)據(jù)結(jié)構(gòu)粱胜,直接寫入內(nèi)存蛔垢。
接著泵额,每寫一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會(huì)判斷一下谆趾,是否達(dá)到了某個(gè)臨界閾值躁愿。如果達(dá)到臨界閾值的話,那么就會(huì)嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤沪蓬,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)彤钟。
在溢寫到磁盤文件之前,會(huì)先根據(jù)key對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序跷叉。排序過后逸雹,會(huì)分批將數(shù)據(jù)寫入磁盤文件。默認(rèn)的batch數(shù)量是10000條云挟,也就是說梆砸,排序好的數(shù)據(jù),會(huì)以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件园欣。寫入磁盤文件是通過Java的BufferedOutputStream實(shí)現(xiàn)的帖世。BufferedOutputStream是Java的緩沖輸出流,首先會(huì)將數(shù)據(jù)緩沖在內(nèi)存中沸枯,當(dāng)內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中日矫,這樣可以減少磁盤IO次數(shù),提升性能绑榴。
一個(gè)task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中哪轿,會(huì)發(fā)生多次磁盤溢寫操作,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件翔怎。最后會(huì)將之前所有的臨時(shí)磁盤文件都進(jìn)行合并窃诉,這就是merge過程,此時(shí)會(huì)將之前所有臨時(shí)磁盤文件中的數(shù)據(jù)讀取出來,然后依次寫入最終的磁盤文件之中褐奴。此外,由于一個(gè)task就只對(duì)應(yīng)一個(gè)磁盤文件于毙,也就意味著該task為下游stage的task準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中敦冬,因此還會(huì)單獨(dú)寫一份索引文件,其中標(biāo)識(shí)了下游各個(gè)task的數(shù)據(jù)在文件中的start offset與end offset唯沮。
SortShuffleManager由于有一個(gè)磁盤文件merge的過程脖旱,因此大大減少了文件數(shù)量。比如第一個(gè)stage有50個(gè)task介蛉,總共有10個(gè)Executor萌庆,每個(gè)Executor執(zhí)行5個(gè)task,而第二個(gè)stage有100個(gè)task币旧。由于每個(gè)task最終只有一個(gè)磁盤文件践险,因此此時(shí)每個(gè)Executor上只有5個(gè)磁盤文件,所有Executor只有50個(gè)磁盤文件吹菱。
bypass運(yùn)行機(jī)制
下圖說明了bypass SortShuffleManager的原理巍虫。bypass運(yùn)行機(jī)制的觸發(fā)條件如下:
- shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值。
- 不是聚合類的shuffle算子(比如reduceByKey)鳍刷。
此時(shí)task會(huì)為每個(gè)下游task都創(chuàng)建一個(gè)臨時(shí)磁盤文件占遥,并將數(shù)據(jù)按key進(jìn)行hash然后根據(jù)key的hash值,將key寫入對(duì)應(yīng)的磁盤文件之中输瓜。當(dāng)然瓦胎,寫入磁盤文件時(shí)也是先寫入內(nèi)存緩沖,緩沖寫滿之后再溢寫到磁盤文件的尤揣。最后搔啊,同樣會(huì)將所有臨時(shí)磁盤文件都合并成一個(gè)磁盤文件,并創(chuàng)建一個(gè)單獨(dú)的索引文件芹缔。
該過程的磁盤寫機(jī)制其實(shí)跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的坯癣,因?yàn)槎家獎(jiǎng)?chuàng)建數(shù)量驚人的磁盤文件,只是在最后會(huì)做一個(gè)磁盤文件的合并而已最欠。因此少量的最終磁盤文件示罗,也讓該機(jī)制相對(duì)未經(jīng)優(yōu)化的HashShuffleManager來說,shuffle read的性能會(huì)更好芝硬。
而該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于:
第一蚜点,磁盤寫機(jī)制不同;
第二拌阴,不會(huì)進(jìn)行排序绍绘。
也就是說,啟用該機(jī)制的最大好處在于,shuffle write過程中陪拘,不需要進(jìn)行數(shù)據(jù)的排序操作厂镇,也就節(jié)省掉了這部分的性能開銷。