更多大數(shù)據(jù)技術(shù)干貨,歡迎關(guān)注“大數(shù)據(jù)技術(shù)進(jìn)階”微信公眾號(hào)谬盐。
1 代碼優(yōu)化
a. 對(duì)于多次使用的RDD甸私,進(jìn)行數(shù)據(jù)持久化操作(eg: cache、persist)
b. 如果對(duì)同一個(gè)份數(shù)據(jù)進(jìn)行操作飞傀,那么盡量公用一個(gè)RDD
c. 優(yōu)先使用reduceByKey和aggregateByKey取代groupByKey
原因:前兩個(gè)API存在combiner皇型,可以降低數(shù)據(jù)量诬烹;groupByKey可能存在OOM異常
d. 對(duì)于Executor使用到Driver中的變量的情況,使用廣播變量進(jìn)行數(shù)據(jù)傳遞, 可以減少網(wǎng)絡(luò)傳輸量弃鸦,原理是:使用廣播變量后绞吁,原來(lái)Driver傳遞給Task的數(shù)據(jù),變成只需要傳遞給Executor即可唬格。
e. 當(dāng)大表join小表家破,而且存在shuffle的時(shí)候,可以考慮使用map join來(lái)進(jìn)行替換<使用廣播變量將小表的數(shù)據(jù)廣播出去购岗,前提:Driver和單個(gè)的Executor的內(nèi)存可以存儲(chǔ)下小表的數(shù)據(jù)>汰聋;
h. 啟動(dòng)kyro序列化機(jī)制
2資源優(yōu)化
a. spark-submit腳本相關(guān)參數(shù)
driver的內(nèi)存:--driver-memory
driver的cpu:
standalone(cluster):--driver-cores
yarn(cluster): --driver-cores
executor的數(shù)量:
yarn: --num-executors
總的executor的CPU數(shù)量:
standalone/mesos:--total-executor-cores
單個(gè)executor的內(nèi)存:--executor-memory
單個(gè)executor的cpu:
standalone/yarn:--executor-cores
b. 資源相關(guān)參數(shù)
spark.driver.cores:1
spark.driver.memory:1g
spark.executor.cores:1(yarn)/all(standalone)
spark.executor.memory:1g
spark.memory.fraction:0.75
spark.memory.storageFraction:0.5
Spark中執(zhí)行和緩存的內(nèi)存是公用的,執(zhí)行可以爭(zhēng)奪緩存的內(nèi)存喊积,就是可以將部分緩存自動(dòng)清楚烹困,用于執(zhí)行過(guò)程中使用內(nèi)存;這兩個(gè)參數(shù)的含義分別是:spark.memory.fraction指定總內(nèi)存占比((1g-300M) * 0.75)注服,spark.memory.storageFraction指定緩存部分最少占比內(nèi)存((1g-300M) * 0.75 * 0.5)韭邓;當(dāng)沒(méi)有執(zhí)行的情況下,緩存可以使用全部的公用內(nèi)存溶弟,即緩存最多使用((1g-300M) * 0.75)女淑,最少可占用((1g-300M) * 0.75 * 0.5)
1.5版本以前的采用固定內(nèi)存設(shè)置:spark.storage.memoryFraction(0.6)以及spark.shuffle.memoryFraction(0.2)
spark.default.parallelism: 默認(rèn)的分區(qū)數(shù)量,默認(rèn)兩個(gè)辜御,一般比較醒寄恪;在實(shí)際環(huán)境中一般需要改大擒权。
spark.scheduler.mode:FIFO(默認(rèn),先進(jìn)先出)/FAIR(公平調(diào)度)
spark.task.cpus:每個(gè)Task執(zhí)行需要的CPU數(shù)量(默認(rèn)值1)
spark.task.maxFailures:每個(gè)Task允許的最大失敗次數(shù)(默認(rèn)值4)
spark.dynamicAllocation.enabled: false; 是否啟動(dòng)動(dòng)態(tài)分配資源袱巨,默認(rèn)為不啟動(dòng)
spark.shuffle.service.enabled:false,當(dāng)啟動(dòng)動(dòng)態(tài)資源分配的時(shí)候碳抄,該參數(shù)必須設(shè)置為true愉老,表示允許額外的shuffle服務(wù)管理
spark.dynamicAllocation.initialExecutors:動(dòng)態(tài)資源初始executor數(shù)量
spark.dynamicAllocation.maxExecutors:動(dòng)態(tài)資源設(shè)置最大允許分配資源
spark.dynamicAllocation.minExecutors:動(dòng)態(tài)資源設(shè)置最小允許分配資源,默認(rèn)(0)
Spark on Yarn:
spark.yarn.am.memory:512m; 運(yùn)行在Yarn上的時(shí)候ApplicationMaster運(yùn)行的內(nèi)存大小(client模式下)
spark.yarn.am.cores:1; ApplicationMaster運(yùn)行的CPU核數(shù)(client模式下)
spark.executor.instances: Executor的數(shù)量剖效,默認(rèn)2個(gè)嫉入;該參數(shù)和動(dòng)態(tài)參數(shù)參數(shù)互斥,當(dāng)兩者都存在的時(shí)候璧尸,動(dòng)態(tài)參數(shù)設(shè)置無(wú)效咒林。
3 數(shù)據(jù)傾斜優(yōu)化
a. 兩階段聚合
b. 使用MAP JOIN替代REDUCE JOIN
c. 數(shù)據(jù)重分區(qū)(更改分區(qū)數(shù)量)
e. 擴(kuò)容RDD及隨機(jī)前綴JOIN方式
4 shuffle過(guò)程優(yōu)化(兩種ShuffleManager,四種模式一定要懂)
a. spark.shuffle.file.buffer:32k; 數(shù)據(jù)溢出磁盤(pán)的緩沖區(qū)內(nèi)存大小
b. spark.shuffle.manager: sort; 給定數(shù)據(jù)
shuffle的管理器爷光,sort(基于排序規(guī)則)或者h(yuǎn)ash(基于Hash值)
c. spark.shuffle.sort.bypassMergeThreshold: 200; 當(dāng)分區(qū)數(shù)量小于該值的時(shí)候垫竞,啟動(dòng)SortShuffleManager中的bypass模式
d. spark.shuffle.consolidateFiles: false; 當(dāng)該參數(shù)為true的時(shí)候,使用hash shuffle的時(shí)候蛀序,可以提高shuffle速度欢瞪,原理是:合并shuffle過(guò)程中的數(shù)據(jù)輸出文件