代碼優(yōu)化部分
- 多個Action計算最好基于同一個RDD進(jìn)行計算操作, 并且對相同的RDD進(jìn)行Cache操作药版,避免重復(fù)計算,增加任務(wù)的執(zhí)行時間翼虫;并且持久化級別最好使用MEMORY_ONLY_SER來減少內(nèi)存使用;
- 在使用join的地方看是否可以使用map算子和廣播變量的方式替代;
- 使用高效的算子拳话, 例如:
- 使用reduceByKey/aggregateByKey來代替groupByKey, 因為前者可以進(jìn)行combiner操作,減少網(wǎng)絡(luò)IO;
- 使用MapPartition來代替Map操作种吸, 尤其是在需要網(wǎng)絡(luò)連接的地方弃衍;
- 使用foreachPartition代替foreach操作,可以對數(shù)據(jù)進(jìn)行批量處理坚俗;
- 在filter操作后镜盯,可以使用colease操作岸裙,可以減少任務(wù)數(shù);
- 序列化盡量使用Kyro方式, 其性能更好;
- 減少對復(fù)雜數(shù)據(jù)結(jié)構(gòu)的使用,可以有效減少序列化時間;
- 對應(yīng)簡單的函數(shù)速缆,最好使用閉合結(jié)構(gòu)降允,可以有效減少網(wǎng)絡(luò)IO;
- 使用Repartition操作可以有效增加任務(wù)的處理并行度;
參數(shù)調(diào)整優(yōu)化部分
經(jīng)過實踐驗證艺糜,調(diào)整后有效的參數(shù)如下:
- 根據(jù)資源情況剧董,可以添加Executor的個數(shù)來有效,參數(shù)為** spark.executor.instances **
- 調(diào)整每個Executor的使用內(nèi)核數(shù), 參數(shù)為** spark.executor.cores **
- 調(diào)整每個Executor的內(nèi)存破停, 參數(shù)為** spark.executor.memory **
- shuffle write task的buffer大小送滞, 參數(shù)為** spark.shuffle.file.buffer **
- shuffle read task的buffer大小, 參數(shù)為** spark.reducer.maxSizeInFlight **
- 每一個stage的task的默認(rèn)并行度辱挥, 默認(rèn)為200犁嗅, 建議修改為1000左右, 參數(shù) ** spark.default.parallelism **
- 用于RDD的持久化使用的內(nèi)存比例晤碘,默認(rèn)0.6, 參數(shù) ** spark.storage.memoryFraction **
- 用戶shuffle使用的內(nèi)存比例褂微, 默認(rèn)為0.2, 參數(shù) ** spark.shuffle.memoryFraction **
其它優(yōu)化
- 增加數(shù)據(jù)讀取的并行度园爷,比如讀取Kafka的數(shù)據(jù)宠蚂, 可以增加topic的partition數(shù)量和executor的個數(shù);
- 限制讀取Kafka數(shù)據(jù)的速率童社,參數(shù) ** spark.streaming.kafka.maxRatePerPartition **
- 對于存在數(shù)據(jù)傾斜問題求厕,有兩類情況:
- 進(jìn)行join操作,產(chǎn)生skew問題扰楼, 可以使用map+廣播變量類進(jìn)行處理呀癣;
- 對redece/aggregate等聚合操作,參數(shù)skew問題弦赖, 可以進(jìn)行兩次聚合的思想來解決项栏, 核心是先進(jìn)行key進(jìn)行隨機(jī)數(shù)操作,是數(shù)據(jù)分布均勻蹬竖,并進(jìn)行聚合沼沈,最后是剔除隨機(jī)數(shù)據(jù),用實際數(shù)據(jù)來進(jìn)行聚合操作币厕。