1. 應(yīng)用開發(fā)的原則
-
原則一:坐享其成
我們應(yīng)該盡可能地充分利用 Spark 為我們提供的“性能紅利”,如鎢絲計劃摸屠、AQE诸典、SQL functions 等等。
AQE 可以讓 Spark 在運(yùn)行時的不同階段今膊,結(jié)合實(shí)時的運(yùn)行時狀態(tài)些阅,周期性地動態(tài)調(diào)整前面的邏輯計劃,然后根據(jù)再優(yōu)化的邏輯計劃斑唬,重新選定最優(yōu)的物理計劃市埋,從而調(diào)整運(yùn)行時后續(xù)階段的執(zhí)行方式。AQE 功能默認(rèn)是關(guān)閉的恕刘,如果我們想要充分利用自動分區(qū)合并缤谎、自動數(shù)據(jù)傾斜處理和 Join 策略調(diào)整,需要把相關(guān)的配置項(xiàng)打開
AQE 原則二:能省則省褐着、能拖則拖
盡量把能節(jié)省數(shù)據(jù)掃描量和數(shù)據(jù)處理量的操作往前推坷澡;盡力消滅掉 Shuffle,省去數(shù)據(jù)落盤與分發(fā)的開銷含蓉;如果不能干掉 Shuffle频敛,盡可能地把涉及 Shuffle 的操作拖到最后去執(zhí)行项郊。原則三:跳出單機(jī)思維模式
比如忽視實(shí)例化 Util 操作的行為還有很多,比如在循環(huán)語句中反復(fù)訪問 RDD斟赚,用臨時變量緩存數(shù)據(jù)轉(zhuǎn)換的中間結(jié)果等等着降,單機(jī)思維模式會讓開發(fā)者在分布式環(huán)境中,無意識地引入巨大的計算開銷拗军。
2. 配置項(xiàng)速查手冊
計算負(fù)載主要由 Executors 承擔(dān)鹊碍,Driver 主要負(fù)責(zé)分布式調(diào)度,調(diào)優(yōu)空間有限食绿,因此對 Driver 端的配置項(xiàng)我們不作考慮
配置項(xiàng)的分類
- 首先侈咕,硬件資源類包含的是與 CPU、內(nèi)存器紧、磁盤有關(guān)的配置項(xiàng)
- 其次耀销,Shuffle 類是專門針對 Shuffle 操作的。
- 最后铲汪,Spark SQL 早已演化為新一代的底層優(yōu)化引擎熊尉。
哪些配置項(xiàng)與CPU設(shè)置有關(guān)?
通過如下參數(shù)就可以明確有多少 CPU 資源被劃撥給 Spark 用于分布式計算掌腰。
spark.cores.max 集群
spark.executor.cores Executor
spark.task.cpus 計算任務(wù)
并行度
spark.default.parallelism 并行度
spark.sql.shuffle.partitions 用于明確指定數(shù)據(jù)關(guān)聯(lián)或聚合操作中 Reduce 端的分區(qū)數(shù)量狰住。
哪些配置項(xiàng)與內(nèi)存設(shè)置有關(guān)?
在平衡 Execution memory 與 Storage memory 的時候齿梁,如果 RDD 緩存是剛需催植,我們就把 spark.memory.storageFraction 調(diào)大,并且在應(yīng)用中優(yōu)先把緩存灌滿勺择,再把計算邏輯應(yīng)用在緩存數(shù)據(jù)之上创南。除此之外,我們還可以同時調(diào)整 spark.rdd.compress 和 spark.memory.storageFraction 來緩和 Full GC 的沖擊
哪些配置項(xiàng)與磁盤設(shè)置有關(guān)?
spark.local.dir 這個配置項(xiàng),這個參數(shù)允許開發(fā)者設(shè)置磁盤目錄授嘀,該目錄用于存儲 RDD cache 落盤數(shù)據(jù)塊和 Shuffle 中間文件。如果你的經(jīng)費(fèi)比較充裕邻储,有條件在計算節(jié)點(diǎn)中配備足量的 SSD 存儲,甚至是更多的內(nèi)存資源旧噪,完全可以把 SSD 上的文件系統(tǒng)目錄吨娜,或是內(nèi)存文件系統(tǒng)添加到 spark.local.dir 配置項(xiàng)中去,從而提供更好的 I/O 性能舌菜。
Shuffle 類配置項(xiàng)
首先萌壳,在 Map 階段亦镶,計算結(jié)果會以中間文件的形式被寫入到磁盤文件系統(tǒng)日月。同時袱瓮,為了避免頻繁的 I/O 操作,Spark 會把中間文件存儲到寫緩沖區(qū)(Write Buffer)爱咬。這個時候尺借,我們可以通過設(shè)置 spark.shuffle.file.buffer 來擴(kuò)大寫緩沖區(qū)的大小,緩沖區(qū)越大精拟,能夠緩存的落盤數(shù)據(jù)越多燎斩,Spark 需要刷盤的次數(shù)就越少,I/O 效率也就能得到整體的提升蜂绎。
其次栅表,在 Reduce 階段,因?yàn)?Spark 會通過網(wǎng)絡(luò)從不同節(jié)點(diǎn)的磁盤中拉取中間文件师枣,它們又會以數(shù)據(jù)塊的形式暫存到計算節(jié)點(diǎn)的讀緩沖區(qū)(Read Buffer)怪瓶。緩沖區(qū)越大,可以暫存的數(shù)據(jù)塊越多践美,在數(shù)據(jù)總量不變的情況下洗贰,拉取數(shù)據(jù)所需的網(wǎng)絡(luò)請求次數(shù)越少,單次請求的網(wǎng)絡(luò)吞吐越高陨倡,網(wǎng)絡(luò) I/O 的效率也就越高敛滋。這個時候,我們就可以通過 spark.reducer.maxSizeInFlight 配置項(xiàng)控制 Reduce 端緩沖區(qū)大小兴革,來調(diào)節(jié) Shuffle 過程中的網(wǎng)絡(luò)負(fù)載绎晃。
自 1.6 版本之后,Spark 統(tǒng)一采用 Sort shuffle manager 來管理 Shuffle 操作杂曲,在 Sort shuffle manager 的管理機(jī)制下箕昭,無論計算結(jié)果本身是否需要排序,Shuffle 計算過程在 Map 階段和 Reduce 階段都會引入排序操作解阅。
在不需要聚合落竹,也不需要排序的計算場景中,我們就可以通過設(shè)置 spark.shuffle.sort.bypassMergeThreshold 的參數(shù)货抄,來改變 Reduce 端的并行度(默認(rèn)值是 200)述召。當(dāng) Reduce 端的分區(qū)數(shù)小于這個設(shè)置值的時候,我們就能避免 Shuffle 在計算過程引入排序蟹地。
Spark SQL 大類配置項(xiàng)
Spark SQL 下面的配置項(xiàng)還是蠻多的积暖,其中對執(zhí)行性能貢獻(xiàn)最大的,當(dāng)屬 AQE(Adaptive query execution怪与,自適應(yīng)查詢引擎)引入的那 3 個特性了夺刑,也就是自動分區(qū)合并、自動數(shù)據(jù)傾斜處理和 Join 策略調(diào)整。
哪些配置項(xiàng)與自動分區(qū)合并有關(guān)遍愿?
AQE 事先并不判斷哪些分區(qū)足夠小存淫,而是按照分區(qū)編號進(jìn)行掃描,當(dāng)掃描量超過“目標(biāo)尺寸”時沼填,就合并一次,那么桅咆,“目標(biāo)尺寸”由什么來決定的呢?Spark 提供了兩個配置項(xiàng)來共同決定分區(qū)合并的“目標(biāo)尺寸”,分區(qū)合并的目標(biāo)尺寸取 advisoryPartitionSizeInBytes 與 partitionSize (每個分區(qū)的平均大小)之間的最小值坞笙。
我們來舉個例子岩饼。假設(shè),Shuffle 過后數(shù)據(jù)大小為 20GB薛夜,minPartitionNum 設(shè)置為 200籍茧,反推過來,每個分區(qū)的尺寸就是 20GB / 200 = 100MB梯澜。再假設(shè)硕糊,advisoryPartitionSizeInBytes 設(shè)置為 200MB,最終的目標(biāo)分區(qū)尺寸就是壤搬恪(100MB简十,200MB)之間的最小值,也就是 100MB撬腾。因此你看螟蝙,并不是你指定了 advisoryPartitionSizeInBytes 是多少
哪些配置項(xiàng)與自動數(shù)據(jù)傾斜處理有關(guān)?
首先民傻,分區(qū)尺寸必須要大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 參數(shù)的設(shè)定值胰默,才有可能被判定為傾斜分區(qū)。然后漓踢,AQE 統(tǒng)計所有數(shù)據(jù)分區(qū)大小并排序牵署,取中位數(shù)作為放大基數(shù),尺寸大于中位數(shù)一定倍數(shù)的分區(qū)會被判定為傾斜分區(qū)喧半,中位數(shù)的放大倍數(shù)也是由參數(shù) spark.sql.adaptive.skewJoin.skewedPartitionFactor(默認(rèn)值是 5 倍) 控制奴迅。
哪些配置項(xiàng)與 Join 策略調(diào)整有關(guān)?
實(shí)際上指的是挺据,把會引入 Shuffle 的 Join 方式取具,如 Hash Join、Sort Merge Join扁耐,“降級”(Demote)為 Broadcast Join暇检。
在 Spark 發(fā)布 AQE 之前,開發(fā)者可以利用 spark.sql.autoBroadcastJoinThreshold 配置項(xiàng)對數(shù)據(jù)關(guān)聯(lián)操作進(jìn)行主動降級婉称。這個參數(shù)的默認(rèn)值是 10MB块仆,參與 Join 的兩張表中只要有一張數(shù)據(jù)表的尺寸小于 10MB
不過构蹬,autoBroadcastJoinThreshold 這個參數(shù)雖然好用,但是有兩個讓人頭疼的短板悔据。一是可靠性較差庄敛。盡管開發(fā)者明確設(shè)置了廣播閾值,而且小表數(shù)據(jù)量在閾值以內(nèi)蜜暑,但 Spark 對小表尺寸的誤判時有發(fā)生,導(dǎo)致 Broadcast Join 降級失敗策肝。二來肛捍,預(yù)先設(shè)置廣播閾值是一種靜態(tài)的優(yōu)化機(jī)制,它沒有辦法在運(yùn)行時動態(tài)對數(shù)據(jù)關(guān)聯(lián)進(jìn)行降級調(diào)整之众。
AQE 很好地解決了這兩個頭疼的問題拙毫。首先,AQE 的 Join 策略調(diào)整是一種動態(tài)優(yōu)化機(jī)制棺禾,對于剛才的兩張大表缀蹄,AQE 會在數(shù)據(jù)表完成過濾操作之后動態(tài)計算剩余數(shù)據(jù)量,當(dāng)數(shù)據(jù)量滿足廣播條件時膘婶,AQE 會重新調(diào)整邏輯執(zhí)行計劃缺前,在新的邏輯計劃中把 Shuffle Joins 降級為 Broadcast Join。再者悬襟,運(yùn)行時的數(shù)據(jù)量估算要比編譯時準(zhǔn)確得多衅码,因此 AQE 的動態(tài) Join 策略調(diào)整相比靜態(tài)優(yōu)化會更可靠、更穩(wěn)定脊岳。
3. 性能殺手Shuffle
Map 階段是如何輸出中間文件的逝段?
- Map 階段的輸出到底是什么?
Map 階段最終生產(chǎn)的數(shù)據(jù)會以中間文件的形式物化到磁盤中割捅,這些中間文件就存儲在 spark.local.dir 設(shè)置的文件目錄里奶躯。中間文件包含兩種類型:一類是后綴為 data 的數(shù)據(jù)文件,存儲的內(nèi)容是 Map 階段生產(chǎn)的待分發(fā)數(shù)據(jù)亿驾;另一類是后綴為 index 的索引文件嘹黔,它記錄的是數(shù)據(jù)文件中不同分區(qū)的偏移地址。這里的分區(qū)是指 Reduce 階段的分區(qū)莫瞬,因此参淹,分區(qū)數(shù)量與 Reduce 階段的并行度保持一致。 - groupByKey版Map 階段的計算步驟乏悄?
對于分片中的數(shù)據(jù)記錄浙值,逐一計算其目標(biāo)分區(qū),并將其填充到 PartitionedPairBuffer(第一個元素是(目標(biāo)分區(qū)檩小,Key)开呐,第二個元素是 Value。);
PartitionedPairBuffer 填滿后筐付,如果分片中還有未處理的數(shù)據(jù)記錄卵惦,就對 Buffer 中的數(shù)據(jù)記錄按(目標(biāo)分區(qū) ID,Key)進(jìn)行排序瓦戚,將所有數(shù)據(jù)溢出到臨時文件沮尿,同時清空緩存;
重復(fù)步驟 1较解、2畜疾,直到分片中所有的數(shù)據(jù)記錄都被處理;
對所有臨時文件和 PartitionedPairBuffer 歸并排序(文件內(nèi)的數(shù)據(jù)是有序的)印衔,最終生成數(shù)據(jù)文件和索引文件啡捶。 - reduceByKey版的Map 階段的計算步驟?
在計算的過程中奸焙,reduceByKey 采用一種叫做 PartitionedAppendOnlyMap 的數(shù)據(jù)結(jié)構(gòu)來填充數(shù)據(jù)記錄瞎暑。這個數(shù)據(jù)結(jié)構(gòu)是一種 Map,而 Map 的 Value 值是可累加与帆、可更新的了赌。因此,PartitionedAppendOnlyMap 非常適合聚合類的計算場景玄糟,如計數(shù)揍拆、求和、均值計算茶凳、極值計算等等嫂拴。
以此類推,最終合并的數(shù)據(jù)文件也會小很多贮喧。依靠高效的內(nèi)存數(shù)據(jù)結(jié)構(gòu)筒狠、更少的磁盤文件、更小的文件尺寸箱沦,我們就能大幅降低了 Shuffle 過程中的磁盤和網(wǎng)絡(luò)開銷辩恼。
Reduce 階段是如何進(jìn)行數(shù)據(jù)分發(fā)的?
每個 Map Task 生成的數(shù)據(jù)文件谓形,都包含所有 Reduce Task 所需的部分?jǐn)?shù)據(jù)灶伊。因此,任何一個 Reduce Task 要想完成計算寒跳,必須先從所有 Map Task 的中間文件里去拉取屬于自己的那部分?jǐn)?shù)據(jù)聘萨。索引文件正是用于幫助判定哪部分?jǐn)?shù)據(jù)屬于哪個 Reduce Task。Reduce Task 通過網(wǎng)絡(luò)拉取中間文件的過程童太,實(shí)際上就是不同 Stages 之間數(shù)據(jù)分發(fā)的過程米辐。
顯然胸完,Shuffle 中數(shù)據(jù)分發(fā)的網(wǎng)絡(luò)開銷,會隨著 Map Task 與 Reduce Task 的線性增長翘贮,呈指數(shù)級爆炸赊窥。
性能殺手
- 首先,對于 Shuffle 來說狸页,它需要消耗所有的硬件資源:無論是 PartitionedPairBuffer锨能、PartitionedAppendOnlyMap 這些內(nèi)存數(shù)據(jù)結(jié)構(gòu),還是讀寫緩沖區(qū)芍耘,都會消耗寶貴的內(nèi)存資源址遇;由于內(nèi)存空間有限,因此溢出的臨時文件會引入大量磁盤 I/O齿穗,而且傲隶,Map 階段輸出的中間文件也會消耗磁盤饺律;呈指數(shù)級增長的跨節(jié)點(diǎn)數(shù)據(jù)分發(fā)窃页,帶來的網(wǎng)絡(luò)開銷更是不容小覷。
-
其次复濒,Shuffle 消耗的不同硬件資源之間很難達(dá)到平衡
延遲
4. 廣播變量
兩種創(chuàng)建廣播變量的方式
- 從普通變量創(chuàng)建廣播變量脖卖。在廣播變量的運(yùn)行機(jī)制下,普通變量存儲的數(shù)據(jù)封裝成廣播變量巧颈,由 Driver 端以 Executors 為粒度進(jìn)行分發(fā)畦木,每一個 Executors 接收到廣播變量之后,將其交由 BlockManager 管理砸泛。
- 從分布式數(shù)據(jù)集創(chuàng)建廣播變量十籍,這就要比第一種方式復(fù)雜一些了。第一步唇礁,Driver 需要從所有的 Executors 拉取數(shù)據(jù)分片勾栗,然后在本地構(gòu)建全量數(shù)據(jù);第二步盏筐,Driver 把匯總好的全量數(shù)據(jù)分發(fā)給各個 Executors围俘,Executors 再將接收到的全量數(shù)據(jù)緩存到存儲系統(tǒng)的 BlockManager 中。
Shuffle Joins
第一步就是對參與關(guān)聯(lián)的左右表分別進(jìn)行 Shuffle琢融,Shuffle 的分區(qū)規(guī)則是先對 Join keys 計算哈希值界牡,再把哈希值對分區(qū)數(shù)取模。Shuffle 完成之后漾抬,第二步就是在同一個 Executors 內(nèi)宿亡,Reduce task 就可以對 userID 一致的記錄進(jìn)行關(guān)聯(lián)操作。
Broadcast Join
利用配置項(xiàng)強(qiáng)制廣播
使用廣播閾值配置項(xiàng)讓 Spark 優(yōu)先選擇 Broadcast Joins 的關(guān)鍵纳令,就是要確保至少有一張表的存儲尺寸小于廣播閾值(數(shù)據(jù)表在磁盤上的存儲大小她混,同一份數(shù)據(jù)在內(nèi)存中的存儲大小往往會比磁盤中的存儲大小膨脹數(shù)倍)
-
利用配置項(xiàng)強(qiáng)制廣播
利用配置項(xiàng)強(qiáng)制廣播烈钞,它的設(shè)置值是存儲大小,默認(rèn)是 10MB - 用 Join Hints 強(qiáng)制廣播
設(shè)置 Join Hints 的方法就是在 SQL 結(jié)構(gòu)化查詢語句里面加上一句“/*+ broadcast(某表) */” - 用 broadcast 函數(shù)強(qiáng)制廣播
廣播變量不是銀彈
- 首先坤按,從性能上來講毯欣,Driver 在創(chuàng)建廣播變量的過程中,需要拉取分布式數(shù)據(jù)集所有的數(shù)據(jù)分片臭脓。
- 其次酗钞,從功能上來講,并不是所有的 Joins 類型都可以轉(zhuǎn)換為 Broadcast Joins来累。一來砚作,Broadcast Joins 不支持全連接(Full Outer Joins);二來嘹锁,在所有的數(shù)據(jù)關(guān)聯(lián)中葫录,我們不能廣播基表×旎或者說米同,即便開發(fā)者強(qiáng)制廣播基表,也無濟(jì)于事摔竿。比如說面粮,在左連接(Left Outer Join)中,我們只能廣播右表继低;在右連接(Right Outer Join)中熬苍,我們只能廣播左表。
5. CPU視角
CPU 與內(nèi)存的平衡本質(zhì)上是什么袁翁?
Spark 將內(nèi)存分成了 Execution Memory 和 Storage Memory 兩類柴底,分別用于分布式任務(wù)執(zhí)行和 RDD 緩存。其中粱胜,RDD 緩存雖然最終占用的是 Storage Memory柄驻,但在 RDD 展開(Unroll)之前,計算任務(wù)消耗的還是 Execution Memory年柠。因此凿歼,Spark 中 CPU 與內(nèi)存的平衡,其實(shí)就是 CPU 與執(zhí)行內(nèi)存之間的協(xié)同與配比冗恨。
三足鼎立:并行度答憔、并發(fā)度與執(zhí)行內(nèi)存
并行度指的是為了實(shí)現(xiàn)分布式計算,分布式數(shù)據(jù)集被劃分出來的份數(shù)掀抹。并行度明確了數(shù)據(jù)劃分的粒度:并行度越高虐拓,數(shù)據(jù)的粒度越細(xì),數(shù)據(jù)分片越多傲武,數(shù)據(jù)越分散蓉驹。并行度可以通過兩個參數(shù)來設(shè)置城榛,分別是 spark.default.parallelism 和 spark.sql.shuffle.partitions。前者用于設(shè)置 RDD 的默認(rèn)并行度态兴,后者在 Spark SQL 開發(fā)框架下狠持,指定了 Shuffle Reduce 階段默認(rèn)的并行度。并發(fā)度呢瞻润?Executor 的線程池大小由參數(shù) spark.executor.cores 決定喘垂,每個任務(wù)在執(zhí)行期間需要消耗的線程數(shù)由 spark.task.cpus 配置項(xiàng)給定。兩者相除得到的商就是并發(fā)度绍撞,也就是同一時間內(nèi)正勒,一個 Executor 內(nèi)部可以同時運(yùn)行的最大任務(wù)數(shù)量。又因?yàn)樯迪常瑂park.task.cpus 默認(rèn)數(shù)值為 1章贞,并且通常不需要調(diào)整,所以非洲,并發(fā)度基本由 spark.executor.cores 參數(shù)敲定鸭限。就 Executor 的線程池來說,盡管線程本身可以復(fù)用怪蔑,但每個線程在同一時間只能計算一個任務(wù)里覆,每個任務(wù)負(fù)責(zé)處理一個數(shù)據(jù)分片丧荐。因此缆瓣,在運(yùn)行時,線程虹统、任務(wù)與分區(qū)是一一對應(yīng)的關(guān)系弓坞。
- CPU 低效原因之一:線程掛起
-
CPU 低效原因之二:調(diào)度開銷
三足鼎立
6. 內(nèi)存視角
User Memory 性能隱患
對于 User Memory 內(nèi)存區(qū)域來說,使用 空間去重復(fù)存儲同樣的數(shù)據(jù)车荔,本身就是降低了內(nèi)存的利用率
預(yù)估內(nèi)存占用
- 第一步渡冻,計算 User Memory 的內(nèi)存消耗。我們先匯總應(yīng)用中包含的自定義數(shù)據(jù)結(jié)構(gòu)忧便,并估算這些對象的總大小 #size族吻,然后用 #size 乘以 Executor 的線程池大小,即可得到 User Memory 區(qū)域的內(nèi)存消耗 #User珠增。
- 第二步超歌,計算 Storage Memory 的內(nèi)存消耗。我們先匯總應(yīng)用中涉及的廣播變量和分布式數(shù)據(jù)集緩存蒂教,分別估算這兩類對象的總大小巍举,分別記為 #bc、#cache凝垛。另外懊悯,我們把集群中的 Executors 總數(shù)記作 #E蜓谋。這樣,每個 Executor 中 Storage Memory 區(qū)域的內(nèi)存消耗的公式就是:#Storage = #bc + #cache / #E炭分。
- 第三步桃焕,計算執(zhí)行內(nèi)存的消耗。我們知道執(zhí)行內(nèi)存的消耗與多個因素有關(guān)捧毛。第一個因素是 Executor 線程池大小 #threads覆旭,第二個因素是數(shù)據(jù)分片大小,而數(shù)據(jù)分片大小取決于數(shù)據(jù)集尺寸 #dataset 和并行度 #N岖妄。因此型将,每個 Executor 中執(zhí)行內(nèi)存的消耗的計算公式為:#Execution = #threads * #dataset / #N。
調(diào)整內(nèi)存配置項(xiàng)
- 首先荐虐,根據(jù)定義七兜,spark.memory.fraction 可以由公式(#Storage + #Execution)/(#User + #Storage + #Execution)計算得到。
- 同理福扬,spark.memory.storageFraction 的數(shù)值應(yīng)該參考(#Storage)/(#Storage + #Execution)腕铸。
- 最后,對于 Executor 堆內(nèi)內(nèi)存總大小 spark.executor.memory 的設(shè)置铛碑,我們自然要參考 4 個內(nèi)存區(qū)域的總消耗狠裹,也就是 300MB + #User + #Storage + #Execution。不過汽烦,我們要注意涛菠,利用這個公式計算的前提是,不同內(nèi)存區(qū)域的占比與不同類型的數(shù)據(jù)消耗一致撇吞。
Cache
對于存儲級別來說俗冻,實(shí)際開發(fā)中最常用到的有兩個,MEMORY_ONLY 和 MEMORY_AND_DISK牍颈,它們分別是 RDD 緩存和 DataFrame 緩存的默認(rèn)存儲級別迄薄。對于緩存計算來說,它分為 3 個步驟煮岁,第一步是 Unroll讥蔽,把 RDD 數(shù)據(jù)分片的 Iterator 物化為對象值,第二步是 Transfer画机,把對象值封裝為 MemoryEntry冶伞,第三步是把 BlockId、MemoryEntry 價值對注冊到 LinkedHashMap 數(shù)據(jù)結(jié)構(gòu)色罚。另外碰缔,當(dāng)數(shù)據(jù)緩存需求遠(yuǎn)大于 Storage Memory 區(qū)域的空間供給時,Spark 利用 LinkedHashMap 數(shù)據(jù)結(jié)構(gòu)提供的特性戳护,會遵循 LRU 和兔子不吃窩邊草這兩個基本原則來清除內(nèi)存空間:LRU:按照元素的訪問順序金抡,優(yōu)先清除那些“最近最少訪問”的 BlockId瀑焦、MemoryEntry 鍵值對兔子不吃窩邊草:在清除的過程中,同屬一個 RDD 的 MemoryEntry 擁有“赦免權(quán)”
OOM
- Driver 端的 OOM
創(chuàng)建的數(shù)據(jù)集超過內(nèi)存上限
收集的結(jié)果集超過內(nèi)存上限
調(diào)節(jié) Driver 端側(cè)內(nèi)存大小我們要用到 spark.driver.memory 配置項(xiàng)梗肝,預(yù)估數(shù)據(jù)集尺寸可以用“先 Cache榛瓮,再查看執(zhí)行計劃”的方式 - Storage Memory 的 OOM
不會發(fā)生,數(shù)據(jù)集不能完全緩存到 MemoryStore巫击,Spark 也不會拋 OOM 異常禀晓,額外的數(shù)據(jù)要么落盤(MEMORY_AND_DISK)、要么直接放棄(MEMORY_ONLY) - User Memory 的 OOM
User Memory 用于存儲用戶自定義的數(shù)據(jù)結(jié)構(gòu)坝锰,如數(shù)組粹懒、列表、字典等顷级。因此凫乖,如果這些數(shù)據(jù)結(jié)構(gòu)的總大小超出了 User Memory 內(nèi)存區(qū)域的上限就會OOM,解決 User Memory 端 OOM 的思路和 Driver 端的并無二致弓颈,也是先對數(shù)據(jù)結(jié)構(gòu)的消耗進(jìn)行預(yù)估帽芽,然后相應(yīng)地擴(kuò)大 User Memory 的內(nèi)存配置,總大小由 spark.executor.memory * ( 1 - spark.memory.fraction)計算得到 - Execution Memory 的 OOM
數(shù)據(jù)量并不是決定 OOM 與否的關(guān)鍵因素翔冀,數(shù)據(jù)分布與 Execution Memory 的運(yùn)行時規(guī)劃是否匹配才是导街,一旦分布式任務(wù)的內(nèi)存請求超出 1/N 這個上限,Execution Memory 就會出現(xiàn) OOM 問題纤子。而且搬瑰,相比其他場景下的 OOM 問題,Execution Memory 的 OOM 要復(fù)雜得多计福,它不僅僅與內(nèi)存空間大小跌捆、數(shù)據(jù)分布有關(guān)徽职,還與 Executor 線程池和運(yùn)行時任務(wù)調(diào)度有關(guān)象颖。
數(shù)據(jù)傾斜:Spark 在 Reduce 階段支持 Spill 和外排,PairBuffer 和 AppendOnlyMap 等數(shù)據(jù)結(jié)構(gòu)的內(nèi)存消耗姆钉,以及數(shù)據(jù)排序的臨時內(nèi)存消耗
消除數(shù)據(jù)傾斜说订,調(diào)整數(shù)據(jù)分片尺寸
調(diào)整 Executor 線程池、內(nèi)存潮瓶、并行度等相關(guān)配置陶冷,提高 1/N 上限
數(shù)據(jù)膨脹:磁盤中的數(shù)據(jù)進(jìn)了 JVM 之后會膨脹
把數(shù)據(jù)打散,提高數(shù)據(jù)分片數(shù)量毯辅、降低數(shù)據(jù)粒度
加大內(nèi)存配置埂伦,結(jié)合 Executor 線程池調(diào)整,提高 1/N 上限
7. 磁盤視角
磁盤在功能上的作用
- 溢出臨時文件
- 存儲 Shuffle 中間文件
- 緩存分布式數(shù)據(jù)集思恐。也就是說沾谜,凡是帶DISK字樣的存儲模式膊毁,都會把內(nèi)存中放不下的數(shù)據(jù)緩存到磁盤
磁盤復(fù)用
- 磁盤復(fù)用的收益之一就是縮短失敗重試的路徑,在保障作業(yè)穩(wěn)定性的同時提升執(zhí)行性能基跑。
- ReuseExchange 機(jī)制下的磁盤復(fù)用:相同或是相似的物理計劃可以共享 Shuffle 計算的中間結(jié)果
8. 網(wǎng)絡(luò)視角
數(shù)據(jù)讀寫
PROCESS_LOCAL:任務(wù)與數(shù)據(jù)同在一個 JVM 進(jìn)程中
NODE_LOCAL:任務(wù)與數(shù)據(jù)同在一個計算節(jié)點(diǎn)婚温,數(shù)據(jù)可能在磁盤上或是另一個 JVM 進(jìn)程中
RACK_LOCAL:任務(wù)與數(shù)據(jù)不在同一節(jié)點(diǎn),但在同一個物理機(jī)架上
ANY:任務(wù)與數(shù)據(jù)是跨機(jī)架媳否、甚至是跨 DC(Data Center栅螟,數(shù)據(jù)中心)的關(guān)系訪問數(shù)據(jù)源是否會引入網(wǎng)絡(luò)開銷,取決于任務(wù)與數(shù)據(jù)的本地性關(guān)系篱竭,也就是任務(wù)的本地性級別
數(shù)據(jù)處理
Shuffle 作為大多數(shù)計算場景的“性能瓶頸擔(dān)當(dāng)”,確實(shí)是網(wǎng)絡(luò)開銷的罪魁禍?zhǔn)撞舯啤8鶕?jù)“能省則省”的開發(fā)原則搪哪,我們自然要想盡辦法去避免 Shuffle。
數(shù)據(jù)傳輸
在數(shù)據(jù)通過網(wǎng)絡(luò)分發(fā)之前坪圾,我們可以利用 Kryo Serializer 序列化器晓折,提升序列化字節(jié)的存儲效率,從而有效降低在網(wǎng)絡(luò)中分發(fā)的數(shù)據(jù)量兽泄,整體上減少網(wǎng)絡(luò)開銷漓概。
9. 補(bǔ)充
- join分析:shuffle hash join、broadcast hash join join分析:shuffle hash join病梢、broadcast hash join,hash映射成功之后再檢查 join 條件的
- 使用 Parquet胃珍、ORC 等文件格式,去坐享謂詞下推帶來的數(shù)據(jù)讀取效率",應(yīng)該如何理解蜓陌?謂詞下推本身觅彰,不依賴于任何文件存儲格式,它本身就是Spark SQL的優(yōu)化策略钮热,DataFrame里面如果包含filter一類的操作填抬,他們就會盡可能地被推到執(zhí)行計劃的最下面。
但是隧期,謂詞下推的效果飒责,和文件存儲格式有關(guān)。假設(shè)是CSV這種行存格式仆潮,那么謂詞下推頂多是在整個執(zhí)行計劃的shuffle之前宏蛉,降低數(shù)據(jù)量大小。但如果是orc性置、Parquet這種列存文件拾并,謂詞下推能直接推到文件掃描上去,直接在磁盤掃描階段,就降低文件掃描量嗅义,降低i/o開銷个榕,從而提升執(zhí)行性能。 - 如何利用鎢絲計劃的優(yōu)勢芥喇?
- 如何利用 AQE 的優(yōu)勢西采?
- Task,Partition的概念:Spark中Task继控,Partition
- 并行度(Parallelism)械馆,并行計算任務(wù)(Paralleled Tasks),這兩個概念:并行度的出發(fā)點(diǎn)是數(shù)據(jù)武通,它明確了數(shù)據(jù)劃分的粒度霹崎。像分區(qū)數(shù)量、分片數(shù)量冶忱、Partitions 這些概念都是并行度的同義詞尾菇。并行計算任務(wù)則不同,它指的是在任一時刻整個集群能夠同時計算的任務(wù)數(shù)量囚枪。換句話說派诬,它的出發(fā)點(diǎn)是計算任務(wù)、是 CPU链沼,由與 CPU 有關(guān)的三個參數(shù)共同決定默赂,并行度決定了數(shù)據(jù)粒度,數(shù)據(jù)粒度決定了分區(qū)大小括勺,分區(qū)大小則決定著每個計算任務(wù)的內(nèi)存消耗缆八。在同一個 Executor 中,多個同時運(yùn)行的計算任務(wù)“基本上”是平均瓜分可用內(nèi)存的疾捍,每個計算任務(wù)能獲取到的內(nèi)存空間是有上限的奈辰,因此并行計算任務(wù)數(shù)會反過來制約并行度的設(shè)置。
- 內(nèi)存空間是有限的乱豆,該把多少內(nèi)存劃分給堆內(nèi)业扒,又該把多少內(nèi)存留給堆外呢足画?對于需要處理的數(shù)據(jù)集喜每,如果數(shù)據(jù)模式比較扁平二汛,而且字段多是定長數(shù)據(jù)類型涛酗,就更多地使用堆外內(nèi)存啥供。相反地舅锄,如果數(shù)據(jù)模式很復(fù)雜谜悟,嵌套結(jié)構(gòu)或變長字段很多孵奶,就更多采用 JVM 堆內(nèi)內(nèi)存會更加穩(wěn)妥
- 在堆內(nèi)內(nèi)存里疲酌,該怎么平衡 User Memory 和 Spark 用于計算的內(nèi)存空間?spark.memory.fraction 的默認(rèn)值是 0.6,也就是 JVM 堆內(nèi)空間的 60% 會劃撥給 Spark 支配朗恳,剩下的 40% 劃撥給 User Memory湿颅。當(dāng)在 JVM 內(nèi)平衡 Spark 可用內(nèi)存和 User Memory 時,你需要考慮你的應(yīng)用中類似的自定義數(shù)據(jù)結(jié)構(gòu)多不多粥诫、占比大不大油航?然后再相應(yīng)地調(diào)整兩塊內(nèi)存區(qū)域的相對占比。
- 在統(tǒng)一內(nèi)存管理模式下怀浆,該如何平衡 Execution Memory 和 Storage Memory谊囚?如果你的應(yīng)用類型是“緩存密集型”,如機(jī)器學(xué)習(xí)訓(xùn)練任務(wù)执赡,就很有必要通過調(diào)節(jié)這個參數(shù)來保障數(shù)據(jù)的全量緩存镰踏。在這個過程中,你要特別注意 RDD 緩存與執(zhí)行效率之間的平衡沙合,首先奠伪,RDD 緩存占用的內(nèi)存空間多了,Spark 用于執(zhí)行分布式計算任務(wù)的內(nèi)存空間自然就變少了首懈,而且數(shù)據(jù)分析場景中常見的關(guān)聯(lián)绊率、排序和聚合等操作都會消耗執(zhí)行內(nèi)存,這部分內(nèi)存空間變少究履,自然會影響到這類計算的執(zhí)行效率即舌。其次,大量緩存引入的 GC(Garbage Collection挎袜,垃圾回收)負(fù)擔(dān)對執(zhí)行效率來說是個巨大的隱患顽聂。我們可以調(diào)節(jié) spark.rdd.compress 這個參數(shù)。RDD 緩存默認(rèn)是不壓縮的盯仪,啟用壓縮之后紊搪,緩存的存儲效率會大幅提升,有效節(jié)省緩存內(nèi)存的占用全景,從而把更多的內(nèi)存空間留給分布式任務(wù)執(zhí)行耀石。但都是以引入額外的計算開銷、犧牲 CPU 為代價的爸黄。所以說滞伟,性能調(diào)優(yōu)的過程本質(zhì)上就是不斷地平衡不同硬件資源消耗的過程。
- 為什么堆外內(nèi)存如此優(yōu)秀炕贵?:Spark 開辟的堆外內(nèi)存基于緊湊的二進(jìn)制格式梆奈,相比 JVM 堆內(nèi)內(nèi)存,Spark 通過 Java Unsafe API 在堆外內(nèi)存中的管理称开,才會有那么多的優(yōu)勢亩钟。
- spark.sql.shuffle.partitions:只有你的計算中涉及Joins或是聚合乓梨,spark.sql.shuffle.partitions,這個參數(shù)的設(shè)置清酥,才會影響Shuffle Reduce階段的并行度扶镀。如果你的作業(yè)沒有Joins或是聚合計算,這個參數(shù)設(shè)了也是擺設(shè)焰轻。
- Class Student是存在User Memory? new Student("小明")是存在Executor Memory臭觉?1. 如果你用RDD封裝這些自定義類型,比如RDD[Student]辱志,那么蝠筑,數(shù)據(jù)集消耗的是Execution memory。2. 相反荸频,如果你是在處理分布式數(shù)據(jù)集的函數(shù)中菱肖,new Student來輔助計算過程,那么這個對象旭从,是放在User memory里面的稳强。
- AQE 如何判定數(shù)據(jù)分區(qū)是否傾斜呢?它又是怎么把大分區(qū)拆分成多個小分區(qū)的和悦?我們還是通過一個例子來理解退疫。假設(shè)數(shù)據(jù)表 A 有 3 個分區(qū),分區(qū)大小分別是 80MB鸽素、100MB 和 512MB褒繁。顯然,這些分區(qū)按大小個排序后的中位數(shù)是 100MB馍忽,因?yàn)?skewedPartitionFactor 的默認(rèn)值是 5 倍棒坏,所以大于 100MB * 5 = 500MB 的分區(qū)才有可能被判定為傾斜分區(qū)。在我們的例子中遭笋,只有最后一個尺寸是 512MB 的分區(qū)符合這個條件坝冕。這個時候,Spark 還不能完全判定它就是傾斜分區(qū)瓦呼,還要看 skewedPartitionThresholdInBytes 配置項(xiàng)喂窟,這個參數(shù)的默認(rèn)值是 256MB。對于那些滿足中位數(shù)條件的分區(qū)央串,必須要大于 256MB磨澡,Spark 才會把這個分區(qū)最終判定為傾斜分區(qū)。假設(shè) skewedPartitionThresholdInBytes 設(shè)定為 1GB质和,那在我們的例子中稳摄,512MB 那個大分區(qū),Spark 也不會把它看成是傾斜分區(qū)侦另,自然也就不能享受到 AQE 對于數(shù)據(jù)傾斜的優(yōu)化處理秩命。檢測到傾斜分區(qū)之后尉共,接下來就是對它拆分褒傅,拆分的時候還會用到 advisoryPartitionSizeInBytes 參數(shù)弃锐。假設(shè)我們將這個參數(shù)的值設(shè)置為 256MB,那么殿托,剛剛那個 512MB 的傾斜分區(qū)會以 256MB 為粒度拆分成多份霹菊,因此,這個大分區(qū)會被拆成 2 個小分區(qū)( 512MB / 256MB =2)支竹。拆分之后旋廷,原來的數(shù)據(jù)表就由 3 個分區(qū)變成了 4 個分區(qū),每個分區(qū)的尺寸都不大于 256MB礼搁。
- AQE 中數(shù)據(jù)傾斜的處理機(jī)制?啟用動態(tài) Join 策略調(diào)整還有個前提饶碘,也就是要滿足 nonEmptyPartitionRatioForBroadcastJoin 參數(shù)的限制。這個參數(shù)的默認(rèn)值是 0.2馒吴,大表過濾之后扎运,非空的數(shù)據(jù)分區(qū)占比要小于 0.2,才能成功觸發(fā) Broadcast Join 降級饮戳。我們來舉個例子豪治。假設(shè),大表過濾之前有 100 個分區(qū)扯罐,F(xiàn)ilter 操作之后负拟,有 85 個分區(qū)內(nèi)的數(shù)據(jù)因?yàn)椴粷M足過濾條件,在過濾之后都變成了沒有任何數(shù)據(jù)的空分區(qū)歹河,另外的 15 個分區(qū)還保留著滿足過濾條件的數(shù)據(jù)掩浙。這樣一來,這張大表過濾之后的非空分區(qū)占比是 15 / 100 = 15%秸歧,因?yàn)?15% 小于 0.2厨姚,所以這個例子中的大表會成功觸發(fā) Broadcast Join 降級。相反寥茫,如果大表過濾之后遣蚀,非空分區(qū)占比大于 0.2,那么剩余數(shù)據(jù)量再小纱耻,AQE 也不會把 Shuffle Joins 降級為 Broadcast Join芭梯。因此,如果你想要充分利用 Broadcast Join 的優(yōu)勢弄喘,可以考慮把這個參數(shù)適當(dāng)調(diào)高玖喘。
- map端的合并導(dǎo)致同一個key的數(shù)據(jù)沒有被拉取到同一個執(zhí)行reduce task的executor中?假設(shè)是單表shuffle,比如reduceByKey這種蘑志,那么合并之后不影響累奈,即便一個分片有多個key贬派,也不要緊,只要保證同一個key的payload都在一個分區(qū)就行澎媒。兩表join其實(shí)外表搞乏,也就是驅(qū)動表,會驅(qū)動內(nèi)表的“全量掃描”戒努,帶引號是因?yàn)樾什灰粯忧攵兀瑂mj、hj不用全量储玫,但意思是一樣的侍筛,就是不管內(nèi)表有多少分區(qū),都會被外表驅(qū)動著去被遍歷撒穷。因此匣椰,不輪內(nèi)表數(shù)據(jù)分配到了哪個分區(qū),其實(shí)都還是在一個executor進(jìn)程內(nèi)端礼,所以禽笑,不影響join邏輯,也不影響效率齐媒。
- spark.shuffle.sort.bypassMergeThreshold 這個閾值為什么是跟Reduce 端的分區(qū)數(shù)有關(guān)蒲每?如果你的reduce階段并行度非常的高,那么map task的計算開銷會非常大喻括,要同時打開非常多的臨時文件邀杏、建立非常多的寫buffer
- 比較大的shuffle 可以對下面的參數(shù)進(jìn)行調(diào)節(jié),提高整個shuffle 的健壯性唬血?
spark.shuffle.compress 是否對shuffle 的中間結(jié)果進(jìn)行壓縮望蜡,如果壓縮的話使用spark.io.compression.codec 的配置進(jìn)行壓縮
spark.shuffle.io.maxRetries io 失敗的重試次數(shù),在大型shuffle遇到網(wǎng)絡(luò)或者GC 問題的時候很有用拷恨。
spark.shuffle.io.retryWait io 失敗的時候的等待時間 - bypass的實(shí)現(xiàn)機(jī)制脖律? Spark源碼精讀分析計劃 (shuffle write)
a)為每一個reduce task生成一個臨時文件
b)為每個臨時文件創(chuàng)建寫buffer、以及一個serializer對象腕侄,用于序列化數(shù)據(jù)
c)保持所有的臨時文件打開小泉,將map階段的數(shù)據(jù),按照reduce端partitionId的不同冕杠,依次寫入到這些臨時文件
d)最后微姊,map task計算完畢,把所有這些臨時文件合并到一起分预,生成data文件兢交,并同時生成index文件 - 在shuffle中對所有臨時文件和內(nèi)存數(shù)據(jù)結(jié)構(gòu)中剩余的數(shù)據(jù)記錄做歸并排序,是結(jié)合堆排序的嗎笼痹,臨時文件太多的時候配喳,會不會不能同時打開這么多文件酪穿,還是用的類似優(yōu)化版的兩兩歸并呢?通常來說晴裹,每個Task處理的數(shù)據(jù)分片大小在200MB最佳被济,這個是結(jié)合經(jīng)驗(yàn)得出的結(jié)論。那么我們就可以利用后面說的“三足鼎立”來保證每個Task的分片大小就在200MB左右息拜。在這樣的情況下溉潭,現(xiàn)代計算機(jī)的硬件資源基本上都比較充足净响,比如說少欺,對于一個有著5g Execution Memory,32 cores的Executors來說馋贤,每個Task能分到的內(nèi)存時160MB左右赞别,對于200MB的數(shù)據(jù)分區(qū)來說,其實(shí)spills的數(shù)量是有限的配乓,因此這種情況下仿滔,同時打開多個臨時文件的壓力其實(shí)還好。
- 理解 by pass 機(jī)制極其性能瓶頸:注意是為每一個reduce task生成一個臨時文件并且在合并的時候要打開所有的臨時文件進(jìn)行合并犹芹,所以說不宜reduce的分區(qū)過多(分區(qū)數(shù)是怎么決定的)
- Spark 廣播機(jī)制現(xiàn)有的實(shí)現(xiàn)方式是存在隱患的崎页,在數(shù)據(jù)量較大的情況下,Driver 可能會成為瓶頸腰埂,你能想到更好的方式來重新實(shí)現(xiàn) Spark 的廣播機(jī)制嗎飒焦?https://issues.apache.org/jira/browse/SPARK-17556 改成由driver獲取到數(shù)據(jù)分布,然后通知各個executor之間進(jìn)行拉取屿笼,這樣可以利用多個executor網(wǎng)絡(luò)牺荠,避免只有driver組裝以后再一個一個發(fā)送效率過低
- 有什么辦法能準(zhǔn)確地預(yù)估一張表在內(nèi)存中的存儲大小呢?第一步驴一,把要預(yù)估大小的數(shù)據(jù)表緩存到內(nèi)存休雌,比如直接在 DataFrame 或是 Dataset 上調(diào)用 cache 方法;第二步肝断,讀取 Spark SQL 執(zhí)行計劃的統(tǒng)計數(shù)據(jù)杈曲。
- 字符串“abcd”只需要消耗 4 個字節(jié),為什么JVM 在堆內(nèi)存儲這 4 個字符串總共需要消耗 48 個字節(jié)胸懈?Project Tungsten: Improving the Efficiency of Spark Applications
- 執(zhí)行內(nèi)存計算公式担扑?堆內(nèi)執(zhí)行內(nèi)存的初始值由很多參數(shù)共同決定,具體的計算公式是:spark.executor.memory * spark.memory.fraction * (1 - spark.memory.storageFraction)箫荡。堆外執(zhí)行內(nèi)存的計算:spark.memory.offHeap.size * (1 - spark.memory.storageFraction)
- 有效提升 CPU 利用率的方法魁亦?首先,在一個 Executor 中羔挡,每個 CPU 線程能夠申請到的內(nèi)存比例是有上下限的洁奈,最高不超過 1/N间唉,最低不少于 1/N/2,其中 N 代表線程池大小利术。其次呈野,在給定線程池大小和執(zhí)行內(nèi)存的時候,并行度較低印叁、數(shù)據(jù)分片較大容易導(dǎo)致 CPU 線程掛起被冒,線程頻繁掛起不利于提升 CPU 利用率,而并行度過高轮蜕、數(shù)據(jù)過于分散會讓調(diào)度開銷更顯著昨悼,也不利于提升 CPU 利用率。最后跃洛,在給定執(zhí)行內(nèi)存 M率触、線程池大小 N 和數(shù)據(jù)總量 D 的時候,想要有效地提升 CPU 利用率汇竭,我們就要計算出最佳并行度 P葱蝗,計算方法是讓數(shù)據(jù)分片的平均大小 D/P 坐落在(M/N/2, M/N)區(qū)間。這樣细燎,在運(yùn)行時两曼,我們的 CPU 利用率往往不會太差。(注意理解并行度的意義玻驻,這里的公式表明提供了一個范圍區(qū)間悼凑,每個并行的task處理的數(shù)據(jù)量等同與總的內(nèi)存數(shù)/線程數(shù))
- 從 Executor 并發(fā)度、執(zhí)行內(nèi)存大小和分布式任務(wù)并行度出發(fā)击狮,你認(rèn)為在什么情況下會出現(xiàn) OOM 的問題佛析?在每個線程都分配到了最大內(nèi)存,即 M/N 的內(nèi)存時彪蓬,如果 task 還需要更多的內(nèi)存寸莫,那么就會發(fā)生 OOM。 在每個線程都分配到了最少內(nèi)存档冬,即 M/2N的內(nèi)存時膘茎,如果 task 還需要更多的內(nèi)存,此時又沒有其他線程釋放內(nèi)存供其使用酷誓,那么也會導(dǎo)致OOM披坏。
- 你覺得,為什么 Eviction 規(guī)則要遵循“兔子不吃窩邊草”呢盐数?如果允許同一個 RDD 的 MemoryEntry 被驅(qū)逐棒拂,有什么危害嗎?
- 對于 DataFrame 的緩存復(fù)用,Cache Manager 為什么沒有采用根據(jù) Optimized Logical Plan 的方式帚屉,你覺得難點(diǎn)在哪里谜诫?如果讓你實(shí)現(xiàn) Cache Manager 的話,你會怎么做攻旦?
- Spark調(diào)度系統(tǒng):Spark調(diào)度系統(tǒng)
- ReuseExchange 機(jī)制觸發(fā)條件:多個查詢所依賴的分區(qū)規(guī)則要與 Shuffle 中間數(shù)據(jù)的分區(qū)規(guī)則保持一致多個查詢所涉及的字段(Attributes)要保持一致