Java-Spark系列10-Spark性能調(diào)優(yōu)概述

一.Spark 性能優(yōu)化概述

首先筆者能力優(yōu)先擎厢,使用Spark有一段時間士鸥,如下是筆者的工作經(jīng)驗的總結(jié)赁项。

Spark任務(wù)運行圖:


image.png

Spark的優(yōu)化思路:
一般是從3個層面進行Spark程序的優(yōu)化:

  1. 運行環(huán)境優(yōu)化
  2. RDD算子優(yōu)化
  3. 參數(shù)微調(diào)

二.運行環(huán)境優(yōu)化

2.1 數(shù)據(jù)本地性

我們知道HDFS的數(shù)據(jù)文件存儲在不同的datanode籽暇,一般數(shù)據(jù)副本數(shù)量是3,因為Spark計算的數(shù)據(jù)量比較大养铸,如果數(shù)據(jù)不在本節(jié)點雁芙,需要通過網(wǎng)絡(luò)去其它的datanode讀取數(shù)據(jù)轧膘。

所以此時我們可以通過提高數(shù)據(jù)本地性,減少網(wǎng)絡(luò)傳輸兔甘,來達到性能優(yōu)化的目的谎碍。

  1. 計算和存儲同節(jié)點(executor和HDFS的datanode、hbase的region server同節(jié)點)
  2. executor數(shù)目合適: 如果100個數(shù)據(jù)界定洞焙,3個計算節(jié)點蟆淀,就有97份網(wǎng)絡(luò)傳遞,所以此種情況可以適當(dāng)增加計算節(jié)點澡匪。
  3. 適當(dāng)增加數(shù)據(jù)副本數(shù)量

2.2 數(shù)據(jù)存儲格式

推薦使用列式存儲格式: parquet.
parquet存在如下優(yōu)先:

  1. 相同數(shù)據(jù)類型的數(shù)據(jù)有很高壓縮比
  2. Hive主要支持ORC熔任、也支持parquet

三.RDD算子優(yōu)化

3.1 盡可能復(fù)用同一個RDD

每創(chuàng)建一個RDD都會帶來性能的開銷,盡可能的對同一個RDD做算子操作仙蛉,而不要頻繁創(chuàng)建新的
RDD笋敞。

3.2 對多次使用的RDD進行持久化

如果RDD的算子特別多,需要頻繁多次操作同一個RDD荠瘪,最好的辦法是將該RDD進行持久化,

四.參數(shù)微調(diào)

  1. num-executors
    參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進程的內(nèi)存赛惩。Executor內(nèi)存的大小哀墓,很多時候直接決定了Spark作業(yè)的性能,而且跟常見的JVM OOM異常喷兼,也有直接的關(guān)聯(lián)篮绰。

  2. executor-cores
    參數(shù)說明:該參數(shù)用于設(shè)置每個Executor進程的CPU core數(shù)量。

  3. driver-memory
    參數(shù)說明:該參數(shù)用于設(shè)置Driver進程的內(nèi)存季惯。

  4. spark.default.parallelism
    參數(shù)說明:該參數(shù)用于設(shè)置每個stage的默認task數(shù)量吠各。

  5. spark.storage.memoryFraction
    參數(shù)說明:該參數(shù)用于設(shè)置RDD持久化數(shù)據(jù)在Executor內(nèi)存中能占的比例,默認是0.6勉抓。

  6. spark.shuffle.memoryFraction
    參數(shù)說明:該參數(shù)用于設(shè)置shuffle過程中一個task拉取到上個stage的task的輸出后贾漏,進行聚合操作時能夠使用的Executor內(nèi)存的比例,默認是0.2藕筋。

資源參數(shù)參考示例:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

五.數(shù)據(jù)傾斜

絕大多數(shù)task執(zhí)行得都非匙萆ⅲ快,但個別task執(zhí)行極慢隐圾。比如伍掀,總共有1000個task,997個task都在1分鐘之內(nèi)執(zhí)行完了暇藏,但是剩余兩三個task卻要一兩個小時蜜笤。這種情況很常見。

數(shù)據(jù)傾斜圖例:


image.png

解決數(shù)據(jù)傾斜一般有如下幾種常用方法:

  1. 使用Hive ETL預(yù)處理數(shù)據(jù)
    先使用Hive進行預(yù)處理數(shù)據(jù)盐碱,也就是使用Hive先計算一層中間數(shù)據(jù)把兔,Spark從中間層數(shù)據(jù)開始計算沪伙。

  2. 過濾少數(shù)導(dǎo)致傾斜的key
    如果發(fā)生導(dǎo)致傾斜的key非常少,可以將Spark任務(wù)拆分為包含 導(dǎo)致傾斜的key的任務(wù)和不包含key的任務(wù)垛贤。

  3. sample采樣傾斜key單獨進行join
    通過采樣焰坪,提前預(yù)估會發(fā)生數(shù)據(jù)傾斜的key,然后將一個join拆分為兩個join聘惦,其中一個不包含該key某饰,一個只包含該key,最后將結(jié)果集進行union善绎。

  4. 調(diào)整并行度
    調(diào)整Shuffle并行度黔漂,數(shù)據(jù)打散

  5. 廣播小數(shù)據(jù)集
    適用于一個大表,一個小表
    不用join連接操作禀酱,而改用Broadcast變量與map模擬join操作炬守,完全規(guī)避shuffle操作
    spark.sql: spark.sql.autoBroadcastJoinThreshold=104857600

  6. 增加隨機前綴
    對發(fā)生傾斜的RDD增加隨機前綴
    對另外一個RDD等量擴容
    如果少量的key發(fā)生傾斜,可以先過濾出一個單獨的RDD剂跟,對另外一個RDD同理吹减途,join之后再合并

六. Spark常用的調(diào)優(yōu)參數(shù)

6.1 在內(nèi)存中緩存數(shù)據(jù)

Spark SQL可以通過調(diào)用Spark.catalog.cachetable ("tableName")或DataFrame.cache()來使用內(nèi)存中的columnar格式緩存表。然后Spark SQL將只掃描所需的列曹洽,并自動調(diào)優(yōu)壓縮以最小化內(nèi)存使用和GC壓力鳍置。你可以調(diào)用spark.catalog.uncacheTable("tableName")從內(nèi)存中刪除表。

內(nèi)存緩存的配置可以在SparkSession上使用setConf方法或者使用SQL運行SET key=value命令來完成送淆。

參數(shù)名 默認值 參數(shù)說明 啟始版本
spark.sql.inMemoryColumnarStorage.compressed true 當(dāng)設(shè)置為true時税产,Spark SQL會根據(jù)數(shù)據(jù)統(tǒng)計自動為每列選擇壓縮編解碼器。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制柱狀緩存的批大小偷崩。更大的批處理大小可以提高內(nèi)存利用率和壓縮辟拷,但在緩存數(shù)據(jù)時可能會帶來OOMs風(fēng)險。 1.1.1

6.2 其它配置項

還可以使用以下選項調(diào)優(yōu)查詢執(zhí)行的性能阐斜。隨著更多的優(yōu)化被自動執(zhí)行衫冻,這些選項可能會在未來的版本中被棄用。

參數(shù)名 默認值 參數(shù)說明 啟始版本
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 讀取文件時裝入單個分區(qū)的最大字節(jié)數(shù)智听。此配置僅在使用基于文件的源(如Parquet羽杰、JSON和ORC)時有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 打開一個文件的估計成本到推,由可以在同一時間掃描的字節(jié)數(shù)來衡量考赛。當(dāng)將多個文件放入一個分區(qū)時使用。最好是高估莉测,那么帶有小文件的分區(qū)將比帶有大文件的分區(qū)更快(這是首先安排的)颜骤。此配置僅在使用基于文件的源(如Parquet、JSON和ORC)時有效捣卤。 2.0.0
spark.sql.files.minPartitionNum Default Parallelism 建議的(不是保證的)最小分割文件分區(qū)數(shù)忍抽。如果沒有設(shè)置八孝,默認值是' spark.default.parallelism '。此配置僅在使用基于文件的源(如Parquet鸠项、JSON和ORC)時有效干跛。 3.1.0
spark.sql.broadcastTimeout 300 broadcast join 等待時間的超時(秒) 1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置在執(zhí)行聯(lián)接時將廣播到所有工作節(jié)點的表的最大字節(jié)大小。通過將此值設(shè)置為-1祟绊,可以禁用廣播楼入。注意:目前統(tǒng)計只支持運行ANALYZE TABLE COMPUTE statistics noscan命令的Hive Metastore表。 1.1.0
spark.sql.shuffle.partitions 200 配置將數(shù)據(jù)變換為連接或聚合時要使用的分區(qū)數(shù)量牧抽。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置閾值以啟用作業(yè)輸入路徑的并行列出嘉熊。如果輸入路徑數(shù)大于該閾值,Spark將通過Spark分布式作業(yè)列出文件扬舒。否則阐肤,它將退回到順序列表。此配置僅在使用基于文件的數(shù)據(jù)源(如Parquet讲坎、ORC和JSON)時有效孕惜。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作業(yè)輸入路徑的最大列出并行度。如果輸入路徑的數(shù)量大于這個值晨炕,它將被降低到使用這個值诊赊。與上面一樣,此配置僅在使用基于文件的數(shù)據(jù)源(如Parquet府瞄、ORC和JSON)時有效。 2.1.1

6.3 SQL查詢連接的hint

join策略提示BROADCAST碘箍、MERGE遵馆、SHUFFLE_HASH和SHUFFLE_REPLICATE_NL,在將指定的關(guān)系加入到另一個關(guān)系時丰榴,指示Spark對每個指定的關(guān)系使用暗示策略货邓。例如,在表' t1 '上使用BROADCAST提示時四濒,廣播加入(廣播散列連接或廣播嵌套循環(huán)聯(lián)接取決于是否有等值連接鍵)與t1的構(gòu)建方面將由火花即使大小的優(yōu)先表t1的建議的統(tǒng)計配置spark.sql.autoBroadcastJoinThreshold之上换况。

當(dāng)連接兩端指定了不同的連接策略提示時,Spark會優(yōu)先考慮BROADCAST提示而不是MERGE提示盗蟆,優(yōu)先考慮SHUFFLE_HASH提示而不是SHUFFLE_REPLICATE_NL提示戈二。當(dāng)雙方都指定了BROADCAST提示或SHUFFLE_HASH提示時,Spark將根據(jù)連接類型和關(guān)系的大小選擇構(gòu)建端喳资。

請注意觉吭,不能保證Spark會選擇提示中指定的連接策略,因為特定的策略可能不支持所有的連接類型仆邓。

-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

Coalesce hint允許Spark SQL用戶控制輸出文件的數(shù)量鲜滩,就像Dataset API中的Coalesce伴鳖、repartition和repartitionByRange一樣,它們可以用于性能調(diào)優(yōu)和減少輸出文件的數(shù)量徙硅。COALESCE hint只有一個分區(qū)號作為參數(shù)榜聂。“REPARTITION”提示有一個分區(qū)號嗓蘑、列或它們都作為參數(shù)须肆。“REPARTITION_BY_RANGE”提示必須有列名脐往,分區(qū)號是可選的休吠。

SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

6.4 自適應(yīng)查詢執(zhí)行

Adaptive Query Execution (AQE)是Spark SQL中的一種優(yōu)化技術(shù),它利用運行時統(tǒng)計信息來選擇最高效的查詢執(zhí)行計劃业簿。默認情況下AQE是禁用的瘤礁。Spark SQL可以使用Spark.SQL.adaptive.enabled的傘配置來控制是否打開/關(guān)閉。從Spark 3.0開始梅尤,AQE中有三個主要特性柜思,包括合并shuffle后分區(qū)、將排序合并連接轉(zhuǎn)換為廣播連接以及傾斜連接優(yōu)化巷燥。

6.5 合并分區(qū)后重新組合

當(dāng)spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都為true時赡盘,該特性根據(jù)map輸出統(tǒng)計信息來合并post shuffle分區(qū)。這個特性簡化了運行查詢時shuffle分區(qū)號的調(diào)優(yōu)缰揪。您不需要設(shè)置合適的shuffle分區(qū)號來適合您的數(shù)據(jù)集陨享。一旦您通過Spark .sql. adaptive.coalescepartitions . initialpartitionnum配置設(shè)置了足夠大的初始shuffle分區(qū)數(shù),Spark就可以在運行時選擇適當(dāng)?shù)膕huffle分區(qū)號钝腺。

參數(shù)名 默認值 參數(shù)說明 啟始版本
spark.sql.adaptive.coalescePartitions.enabled true 當(dāng)true和Spark .sql. adaptive_enabled為true時抛姑,Spark會根據(jù)目標(biāo)大小(由Spark .sql. adaptive_advisorypartitionsizeinbytes指定)合并連續(xù)的shuffle分區(qū),以避免過多的小任務(wù)艳狐。 3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionNum Default Parallelism 合并后的最小洗牌分區(qū)數(shù)定硝。如果不設(shè)置,則默認為Spark集群的默認并行度毫目。此配置僅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同時啟用時有效蔬啡。 3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum 200 合并前的初始shuffle分區(qū)數(shù)。默認情況下它等于spark.sql.shuffle.partitions镀虐。此配置僅在spark.sql. adaptive.net enabled和spark.sql. adaptive.net coalescepartitions .enabled同時啟用時有效箱蟆。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自適應(yīng)優(yōu)化期間shuffle分區(qū)的建議大小(當(dāng)spark.sql. adaptive_enabled為true時)。當(dāng)Spark對小shuffle分區(qū)或斜shuffle分區(qū)進行合并時生效粉私。 3.0.0

6.6 將排序合并聯(lián)接轉(zhuǎn)換為廣播聯(lián)接

當(dāng)任何連接側(cè)的運行時統(tǒng)計數(shù)據(jù)小于廣播散列連接閾值時顽腾,AQE將排序合并連接轉(zhuǎn)換為廣播散列連接。這不是一樣有效規(guī)劃一個廣播散列連接首先,但這總比繼續(xù)做分類合并加入,我們可以節(jié)省連接雙方的排序,并在本地讀取洗牌文件節(jié)省網(wǎng)絡(luò)流量(如果spark.sql.adaptive.localShuffleReader.enabled被設(shè)置為true)

6.7 優(yōu)化傾斜連接

數(shù)據(jù)傾斜會嚴重降低連接查詢的性能。該特性通過將傾斜任務(wù)拆分(如果需要的話還可以復(fù)制)為大小大致相同的任務(wù)抄肖,動態(tài)處理排序-合并連接中的傾斜任務(wù)久信。當(dāng)spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置同時啟用時生效。

參數(shù)名 默認值 參數(shù)說明 啟始版本
spark.sql.adaptive.skewJoin.enabled true 當(dāng)true和Spark .sql.adaptive.enabled為true時漓摩,Spark通過拆分(并在需要時復(fù)制)傾斜分區(qū)來動態(tài)處理排序-合并連接中的傾斜裙士。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 10 如果一個分區(qū)的大小大于這個因子乘以中值分區(qū)大小,并且大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes管毙,則認為該分區(qū)是傾斜的腿椎。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 如果分區(qū)的字節(jié)大小大于這個閾值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor乘以分區(qū)中值大小夭咬,則認為該分區(qū)是傾斜的啃炸。理想情況下,該配置應(yīng)該設(shè)置為大于spark.sql.adaptive.advisoryPartitionSizeInBytes卓舵。 3.0.0

參考:

  1. http://spark.apache.org/docs/latest/rdd-programming-guide.html
  2. https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
  3. https://blog.csdn.net/meihao5/article/details/81084876
  4. http://spark.apache.org/docs/latest/sql-performance-tuning.html
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末南用,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子掏湾,更是在濱河造成了極大的恐慌裹虫,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,820評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件融击,死亡現(xiàn)場離奇詭異筑公,居然都是意外死亡,警方通過查閱死者的電腦和手機尊浪,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,648評論 3 399
  • 文/潘曉璐 我一進店門匣屡,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人拇涤,你說我怎么就攤上這事耸采。” “怎么了工育?”我有些...
    開封第一講書人閱讀 168,324評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長搓彻。 經(jīng)常有香客問我如绸,道長,這世上最難降的妖魔是什么旭贬? 我笑而不...
    開封第一講書人閱讀 59,714評論 1 297
  • 正文 為了忘掉前任怔接,我火速辦了婚禮,結(jié)果婚禮上稀轨,老公的妹妹穿的比我還像新娘扼脐。我一直安慰自己,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,724評論 6 397
  • 文/花漫 我一把揭開白布瓦侮。 她就那樣靜靜地躺著艰赞,像睡著了一般。 火紅的嫁衣襯著肌膚如雪肚吏。 梳的紋絲不亂的頭發(fā)上方妖,一...
    開封第一講書人閱讀 52,328評論 1 310
  • 那天,我揣著相機與錄音罚攀,去河邊找鬼党觅。 笑死,一個胖子當(dāng)著我的面吹牛斋泄,可吹牛的內(nèi)容都是我干的杯瞻。 我是一名探鬼主播,決...
    沈念sama閱讀 40,897評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼炫掐,長吁一口氣:“原來是場噩夢啊……” “哼魁莉!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起卒废,我...
    開封第一講書人閱讀 39,804評論 0 276
  • 序言:老撾萬榮一對情侶失蹤沛厨,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后摔认,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體回论,經(jīng)...
    沈念sama閱讀 46,345評論 1 318
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,431評論 3 340
  • 正文 我和宋清朗相戀三年帮碰,在試婚紗的時候發(fā)現(xiàn)自己被綠了浑彰。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,561評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡抹蚀,死狀恐怖剿牺,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情环壤,我是刑警寧澤晒来,帶...
    沈念sama閱讀 36,238評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站郑现,受9級特大地震影響湃崩,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜接箫,卻給世界環(huán)境...
    茶點故事閱讀 41,928評論 3 334
  • 文/蒙蒙 一攒读、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧辛友,春花似錦薄扁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,417評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽脱盲。三九已至,卻和暖如春震放,著一層夾襖步出監(jiān)牢的瞬間宾毒,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,528評論 1 272
  • 我被黑心中介騙來泰國打工殿遂, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留诈铛,地道東北人。 一個月前我還...
    沈念sama閱讀 48,983評論 3 376
  • 正文 我出身青樓墨礁,卻偏偏與公主長得像幢竹,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子恩静,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,573評論 2 359

推薦閱讀更多精彩內(nèi)容