一.Spark 性能優(yōu)化概述
首先筆者能力優(yōu)先擎厢,使用Spark有一段時間士鸥,如下是筆者的工作經(jīng)驗的總結(jié)赁项。
Spark任務(wù)運行圖:
Spark的優(yōu)化思路:
一般是從3個層面進行Spark程序的優(yōu)化:
- 運行環(huán)境優(yōu)化
- RDD算子優(yōu)化
- 參數(shù)微調(diào)
二.運行環(huán)境優(yōu)化
2.1 數(shù)據(jù)本地性
我們知道HDFS的數(shù)據(jù)文件存儲在不同的datanode籽暇,一般數(shù)據(jù)副本數(shù)量是3,因為Spark計算的數(shù)據(jù)量比較大养铸,如果數(shù)據(jù)不在本節(jié)點雁芙,需要通過網(wǎng)絡(luò)去其它的datanode讀取數(shù)據(jù)轧膘。
所以此時我們可以通過提高數(shù)據(jù)本地性,減少網(wǎng)絡(luò)傳輸兔甘,來達到性能優(yōu)化的目的谎碍。
- 計算和存儲同節(jié)點(executor和HDFS的datanode、hbase的region server同節(jié)點)
- executor數(shù)目合適: 如果100個數(shù)據(jù)界定洞焙,3個計算節(jié)點蟆淀,就有97份網(wǎng)絡(luò)傳遞,所以此種情況可以適當(dāng)增加計算節(jié)點澡匪。
- 適當(dāng)增加數(shù)據(jù)副本數(shù)量
2.2 數(shù)據(jù)存儲格式
推薦使用列式存儲格式: parquet.
parquet存在如下優(yōu)先:
- 相同數(shù)據(jù)類型的數(shù)據(jù)有很高壓縮比
- Hive主要支持ORC熔任、也支持parquet
三.RDD算子優(yōu)化
3.1 盡可能復(fù)用同一個RDD
每創(chuàng)建一個RDD都會帶來性能的開銷,盡可能的對同一個RDD做算子操作仙蛉,而不要頻繁創(chuàng)建新的
RDD笋敞。
3.2 對多次使用的RDD進行持久化
如果RDD的算子特別多,需要頻繁多次操作同一個RDD荠瘪,最好的辦法是將該RDD進行持久化,
四.參數(shù)微調(diào)
num-executors
參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進程的內(nèi)存赛惩。Executor內(nèi)存的大小哀墓,很多時候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常喷兼,也有直接的關(guān)聯(lián)篮绰。executor-cores
參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進程的CPU core數(shù)量。driver-memory
參數(shù)說明:該參數(shù)用于設(shè)置Driver進程的內(nèi)存季惯。spark.default.parallelism
參數(shù)說明:該參數(shù)用于設(shè)置每個stage的默認task數(shù)量吠各。spark.storage.memoryFraction
參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認是0.6勉抓。spark.shuffle.memoryFraction
參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個task拉取到上個stage的task的輸出后贾漏,進行聚合操作時能夠使用的Executor內(nèi)存的比例,默認是0.2藕筋。
資源參數(shù)參考示例:
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
五.數(shù)據(jù)傾斜
絕大多數(shù)task執(zhí)行得都非匙萆ⅲ快,但個別task執(zhí)行極慢隐圾。比如伍掀,總共有1000個task,997個task都在1分鐘之內(nèi)執(zhí)行完了暇藏,但是剩余兩三個task卻要一兩個小時蜜笤。這種情況很常見。
數(shù)據(jù)傾斜圖例:
解決數(shù)據(jù)傾斜一般有如下幾種常用方法:
使用Hive ETL預(yù)處理數(shù)據(jù)
先使用Hive進行預(yù)處理數(shù)據(jù)盐碱,也就是使用Hive先計算一層中間數(shù)據(jù)把兔,Spark從中間層數(shù)據(jù)開始計算沪伙。過濾少數(shù)導(dǎo)致傾斜的key
如果發(fā)生導(dǎo)致傾斜的key非常少,可以將Spark任務(wù)拆分為包含 導(dǎo)致傾斜的key的任務(wù)和不包含key的任務(wù)垛贤。sample采樣傾斜key單獨進行join
通過采樣焰坪,提前預(yù)估會發(fā)生數(shù)據(jù)傾斜的key,然后將一個join拆分為兩個join聘惦,其中一個不包含該key某饰,一個只包含該key,最后將結(jié)果集進行union善绎。調(diào)整并行度
調(diào)整Shuffle并行度黔漂,數(shù)據(jù)打散廣播小數(shù)據(jù)集
適用于一個大表,一個小表
不用join連接操作禀酱,而改用Broadcast變量與map模擬join操作炬守,完全規(guī)避shuffle操作
spark.sql: spark.sql.autoBroadcastJoinThreshold=104857600增加隨機前綴
對發(fā)生傾斜的RDD增加隨機前綴
對另外一個RDD等量擴容
如果少量的key發(fā)生傾斜,可以先過濾出一個單獨的RDD剂跟,對另外一個RDD同理吹减途,join之后再合并
六. Spark常用的調(diào)優(yōu)參數(shù)
6.1 在內(nèi)存中緩存數(shù)據(jù)
Spark SQL可以通過調(diào)用Spark.catalog.cachetable ("tableName")或DataFrame.cache()來使用內(nèi)存中的columnar格式緩存表。然后Spark SQL將只掃描所需的列曹洽,并自動調(diào)優(yōu)壓縮以最小化內(nèi)存使用和GC壓力鳍置。你可以調(diào)用spark.catalog.uncacheTable("tableName")從內(nèi)存中刪除表。
內(nèi)存緩存的配置可以在SparkSession上使用setConf方法或者使用SQL運行SET key=value命令來完成送淆。
參數(shù)名 | 默認值 | 參數(shù)說明 | 啟始版本 |
---|---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 當(dāng)設(shè)置為true時税产,Spark SQL會根據(jù)數(shù)據(jù)統(tǒng)計自動為每列選擇壓縮編解碼器。 | 1.0.1 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制柱狀緩存的批大小偷崩。更大的批處理大小可以提高內(nèi)存利用率和壓縮辟拷,但在緩存數(shù)據(jù)時可能會帶來OOMs風(fēng)險。 | 1.1.1 |
6.2 其它配置項
還可以使用以下選項調(diào)優(yōu)查詢執(zhí)行的性能阐斜。隨著更多的優(yōu)化被自動執(zhí)行衫冻,這些選項可能會在未來的版本中被棄用。
參數(shù)名 | 默認值 | 參數(shù)說明 | 啟始版本 |
---|---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | 讀取文件時裝入單個分區(qū)的最大字節(jié)數(shù)智听。此配置僅在使用基于文件的源(如Parquet羽杰、JSON和ORC)時有效。 | 2.0.0 |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | 打開一個文件的估計成本到推,由可以在同一時間掃描的字節(jié)數(shù)來衡量考赛。當(dāng)將多個文件放入一個分區(qū)時使用。最好是高估莉测,那么帶有小文件的分區(qū)將比帶有大文件的分區(qū)更快(這是首先安排的)颜骤。此配置僅在使用基于文件的源(如Parquet、JSON和ORC)時有效捣卤。 | 2.0.0 |
spark.sql.files.minPartitionNum | Default Parallelism | 建議的(不是保證的)最小分割文件分區(qū)數(shù)忍抽。如果沒有設(shè)置八孝,默認值是' spark.default.parallelism '。此配置僅在使用基于文件的源(如Parquet鸠项、JSON和ORC)時有效干跛。 | 3.1.0 |
spark.sql.broadcastTimeout | 300 | broadcast join 等待時間的超時(秒) | 1.3.0 |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置在執(zhí)行聯(lián)接時將廣播到所有工作節(jié)點的表的最大字節(jié)大小。通過將此值設(shè)置為-1祟绊,可以禁用廣播楼入。注意:目前統(tǒng)計只支持運行ANALYZE TABLE COMPUTE statistics noscan命令的Hive Metastore表。 | 1.1.0 |
spark.sql.shuffle.partitions | 200 | 配置將數(shù)據(jù)變換為連接或聚合時要使用的分區(qū)數(shù)量牧抽。 | 1.1.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold | 32 | 配置閾值以啟用作業(yè)輸入路徑的并行列出嘉熊。如果輸入路徑數(shù)大于該閾值,Spark將通過Spark分布式作業(yè)列出文件扬舒。否則阐肤,它將退回到順序列表。此配置僅在使用基于文件的數(shù)據(jù)源(如Parquet讲坎、ORC和JSON)時有效孕惜。 | 1.5.0 |
spark.sql.sources.parallelPartitionDiscovery.parallelism | 10000 | 配置作業(yè)輸入路徑的最大列出并行度。如果輸入路徑的數(shù)量大于這個值晨炕,它將被降低到使用這個值诊赊。與上面一樣,此配置僅在使用基于文件的數(shù)據(jù)源(如Parquet府瞄、ORC和JSON)時有效。 | 2.1.1 |
6.3 SQL查詢連接的hint
join策略提示BROADCAST碘箍、MERGE遵馆、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL,在將指定的關(guān)系加入到另一個關(guān)系時丰榴,指示Spark對每個指定的關(guān)系使用暗示策略货邓。例如,在表' t1 '上使用BROADCAST提示時四濒,廣播加入(廣播散列連接或廣播嵌套循環(huán)聯(lián)接取決于是否有等值連接鍵)與t1的構(gòu)建方面將由火花即使大小的優(yōu)先表t1的建議的統(tǒng)計配置spark.sql.autoBroadcastJoinThreshold之上换况。
當(dāng)連接兩端指定了不同的連接策略提示時,Spark會優(yōu)先考慮BROADCAST提示而不是MERGE提示盗蟆,優(yōu)先考慮SHUFFLE_HASH提示而不是SHUFFLE_REPLICATE_NL提示戈二。當(dāng)雙方都指定了BROADCAST提示或SHUFFLE_HASH提示時,Spark將根據(jù)連接類型和關(guān)系的大小選擇構(gòu)建端喳资。
請注意觉吭,不能保證Spark會選擇提示中指定的連接策略,因為特定的策略可能不支持所有的連接類型仆邓。
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
Coalesce hint允許Spark SQL用戶控制輸出文件的數(shù)量鲜滩,就像Dataset API中的Coalesce伴鳖、repartition和repartitionByRange一樣,它們可以用于性能調(diào)優(yōu)和減少輸出文件的數(shù)量徙硅。COALESCE hint只有一個分區(qū)號作為參數(shù)榜聂。“REPARTITION”提示有一個分區(qū)號嗓蘑、列或它們都作為參數(shù)须肆。“REPARTITION_BY_RANGE”提示必須有列名脐往,分區(qū)號是可選的休吠。
SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
6.4 自適應(yīng)查詢執(zhí)行
Adaptive Query Execution (AQE)是Spark SQL中的一種優(yōu)化技術(shù),它利用運行時統(tǒng)計信息來選擇最高效的查詢執(zhí)行計劃业簿。默認情況下AQE是禁用的瘤礁。Spark SQL可以使用Spark.SQL.adaptive.enabled的傘配置來控制是否打開/關(guān)閉。從Spark 3.0開始梅尤,AQE中有三個主要特性柜思,包括合并shuffle后分區(qū)、將排序合并連接轉(zhuǎn)換為廣播連接以及傾斜連接優(yōu)化巷燥。
6.5 合并分區(qū)后重新組合
當(dāng)spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都為true時赡盘,該特性根據(jù)map輸出統(tǒng)計信息來合并post shuffle分區(qū)。這個特性簡化了運行查詢時shuffle分區(qū)號的調(diào)優(yōu)缰揪。您不需要設(shè)置合適的shuffle分區(qū)號來適合您的數(shù)據(jù)集陨享。一旦您通過Spark .sql. adaptive.coalescepartitions . initialpartitionnum配置設(shè)置了足夠大的初始shuffle分區(qū)數(shù),Spark就可以在運行時選擇適當(dāng)?shù)膕huffle分區(qū)號钝腺。
參數(shù)名 | 默認值 | 參數(shù)說明 | 啟始版本 |
---|---|---|---|
spark.sql.adaptive.coalescePartitions.enabled | true | 當(dāng)true和Spark .sql. adaptive_enabled為true時抛姑,Spark會根據(jù)目標(biāo)大小(由Spark .sql. adaptive_advisorypartitionsizeinbytes指定)合并連續(xù)的shuffle分區(qū),以避免過多的小任務(wù)艳狐。 | 3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionNum | Default Parallelism | 合并后的最小洗牌分區(qū)數(shù)定硝。如果不設(shè)置,則默認為Spark集群的默認并行度毫目。此配置僅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同時啟用時有效蔬啡。 | 3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum | 200 | 合并前的初始shuffle分區(qū)數(shù)。默認情況下它等于spark.sql.shuffle.partitions镀虐。此配置僅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同時啟用時有效箱蟆。 | 3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64 MB | 自適應(yīng)優(yōu)化期間shuffle分區(qū)的建議大小(當(dāng)spark.sql. adaptive_enabled為true時)。當(dāng)Spark對小shuffle分區(qū)或斜shuffle分區(qū)進行合并時生效粉私。 | 3.0.0 |
6.6 將排序合并聯(lián)接轉(zhuǎn)換為廣播聯(lián)接
當(dāng)任何連接側(cè)的運行時統(tǒng)計數(shù)據(jù)小于廣播散列連接閾值時顽腾,AQE將排序合并連接轉(zhuǎn)換為廣播散列連接。這不是一樣有效規(guī)劃一個廣播散列連接首先,但這總比繼續(xù)做分類合并加入,我們可以節(jié)省連接雙方的排序,并在本地讀取洗牌文件節(jié)省網(wǎng)絡(luò)流量(如果spark.sql.adaptive.localShuffleReader.enabled被設(shè)置為true)
6.7 優(yōu)化傾斜連接
數(shù)據(jù)傾斜會嚴重降低連接查詢的性能。該特性通過將傾斜任務(wù)拆分(如果需要的話還可以復(fù)制)為大小大致相同的任務(wù)抄肖,動態(tài)處理排序-合并連接中的傾斜任務(wù)久信。當(dāng)spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置同時啟用時生效。
參數(shù)名 | 默認值 | 參數(shù)說明 | 啟始版本 |
---|---|---|---|
spark.sql.adaptive.skewJoin.enabled | true | 當(dāng)true和Spark .sql.adaptive.enabled為true時漓摩,Spark通過拆分(并在需要時復(fù)制)傾斜分區(qū)來動態(tài)處理排序-合并連接中的傾斜裙士。 | 3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 10 | 如果一個分區(qū)的大小大于這個因子乘以中值分區(qū)大小,并且大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes管毙,則認為該分區(qū)是傾斜的腿椎。 | 3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256MB | 如果分區(qū)的字節(jié)大小大于這個閾值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分區(qū)中值大小夭咬,則認為該分區(qū)是傾斜的啃炸。理想情況下,該配置應(yīng)該設(shè)置為大于spark.sql.adaptive.advisoryPartitionSizeInBytes卓舵。 | 3.0.0 |