References:
how-to-tune-your-apache-spark-jobs-part-1
how-to-tune-your-apache-spark-jobs-part-2
tuning_spark_streaming
Spark Streaming Performance Tuning in Detail
Spark Performance Tuning: Shuffle Tuning
Spark Performance Tuning: Data Skew Tuning
Spark Performance Tuning: Development Tuning
top-5-mistakes-when-writing-spark-applications (highly recommended)
1 Basics
- job --> stage --> task
  A job is divided into stages and a stage into tasks; each task runs on a single core.
- executor --> core
The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage.
2 Tuning Resource Allocation
GC Tuning for Spark Applications --> focuses on tuning the G1 garbage collector
Spark Performance Tuning: Resource Tuning
Every Spark executor in an application has the same fixed number of cores and same fixed heap size.
--executor-cores / spark.executor.cores sets the number of cores per executor at submission time, which determines how many tasks an executor can run in parallel.
--executor-memory / spark.executor.memory sets the executor JVM heap size.
--num-executors / spark.executor.instances sets the number of executors.
spark.dynamicAllocation.enabled enables dynamic resource allocation when set to true; do not set --num-executors in that case.
spark.yarn.executor.memoryOverhead sets the amount of off-heap memory per executor.
spark.dynamicAllocation.enabled
An executor that stays idle beyond the timeout is removed.
For Spark Streaming, data arrives in time-based batches; to keep executors from being added and removed over and over, this feature should be disabled. A sample configuration is sketched below.
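As a concrete illustration, these settings can also be applied programmatically through SparkConf (the conf-key equivalents of --num-executors, --executor-cores, and --executor-memory listed above). This is only a sketch; the application name and the sizing numbers are hypothetical and assume a YARN cluster with room for six 5-core executors.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sizing: 6 executors, each with 5 cores, an 18g heap,
// and 2g of off-heap overhead (memoryOverhead is given in MB).
val conf = new SparkConf()
  .setAppName("resource-allocation-example")
  .set("spark.executor.instances", "6")
  .set("spark.executor.cores", "5")
  .set("spark.executor.memory", "18g")
  .set("spark.yarn.executor.memoryOverhead", "2048")
val sc = new SparkContext(conf)
```

With spark.dynamicAllocation.enabled set to true, spark.executor.instances would be left out and YARN would grow and shrink the executor pool instead.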
Memory layout
Notes:
The application master, which is a non-executor container with the special capability of requesting containers from YARN, takes up resources of its own that must be budgeted in. In yarn-client mode, it defaults to a 1024MB and one vcore. In yarn-cluster mode, the application master runs the driver, so it’s often useful to bolster its resources with the --driver-memory and --driver-cores properties.
Running executors with too much memory often results in excessive garbage collection delays. 64GB is a rough guess at a good upper limit for a single executor. In short, cap the executor heap to keep GC pressure under control.
I’ve noticed that the HDFS client has trouble with tons of concurrent threads. A rough guess is that at most five tasks per executor can achieve full write throughput, so it’s good to keep the number of cores per executor below that number. In short, about five concurrent tasks per executor are enough to saturate HDFS write bandwidth.
Running tiny executors (with a single core and just enough memory needed to run a single task, for example) throws away the benefits that come from running multiple tasks in a single JVM. For example, broadcast variables need to be replicated once on each executor, so many small executors will result in many more copies of the data.
Caveats:
Reserve memory and cores for Hadoop, YARN, and other system daemons.
Slimming Down Your Data Structures
Use a customized serialization approach to reduce the serialized footprint, e.g.:
spark.serializer=org.apache.spark.serializer.KryoSerializer
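A minimal sketch of enabling Kryo programmatically and registering application classes (the record types here are hypothetical, and registerKryoClasses assumes Spark 1.2 or later). Registering classes lets Kryo write a compact ID instead of the full class name with every record:

```scala
import org.apache.spark.SparkConf

// Hypothetical record types used by the application.
case class Click(userId: Long, url: String)
case class Session(userId: Long, clicks: Seq[Click])

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registration avoids storing full class names alongside each serialized object.
  .registerKryoClasses(Array(classOf[Click], classOf[Session]))
```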
3 Tuning Parallelism
With too few partitions the number of tasks is limited and the cluster's resources cannot be fully utilized.
Remedies (a sketch of the first one follows the list):
- Use the repartition transformation, which will trigger a shuffle.
- Configure your InputFormat to create more splits.
- Write the input data out to HDFS with a smaller block size.
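A minimal sketch of the repartition remedy, assuming an existing SparkContext sc and a hypothetical input path:

```scala
val lines = sc.textFile("hdfs:///data/input")   // hypothetical path with few input splits

// repartition() triggers a full shuffle and can raise the partition count,
// giving later stages more tasks to run in parallel.
val wider = lines.repartition(200)

// coalesce() is the cheaper counterpart for reducing partitions without a full shuffle.
val narrower = wider.coalesce(50)
```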
3.1 The spark.default.parallelism parameter
What it does: sets the default number of tasks per stage. This parameter matters a great deal; leaving it unset can directly hurt the performance of your Spark job.
Tuning advice: a default of roughly 500 to 1000 tasks is reasonable for a typical Spark job. A common mistake is not setting this parameter at all, in which case Spark falls back to the number of underlying HDFS blocks, one task per block. That default is usually far too low (often only a few dozen tasks), and with too few tasks all of the executor resources configured above go to waste. One way to set it is sketched below.
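A sketch of setting it on SparkConf; the value 600 is only an illustration (a common rule of thumb is two to three tasks per executor core in the cluster):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("parallelism-example")
  // Default partition count used by shuffles and by operations such as
  // reduceByKey or join when no explicit partition count is passed.
  .set("spark.default.parallelism", "600")
val sc = new SparkContext(conf)
```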
Reducing shuffles and the amount of data shuffled
The repartition, join, cogroup, and any of the *By or *ByKey transformations can result in a shuffle.
- Avoid groupByKey when performing an associative reductive operation. For example, rdd.groupByKey().mapValues(_.sum) will produce the same results as rdd.reduceByKey(_ + _). However, the former will transfer the entire dataset across the network, while the latter will compute local sums for each key in each partition and combine those local sums into larger sums after shuffling.
The following functions should be preferred over groupByKey (a short sketch follows this list):
- combineByKey can be used when combining values whose return type differs from the input value type.
- foldByKey merges the values for each key using an associative function and a neutral "zero value".
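A minimal sketch of both on a hypothetical pair RDD (sc is an existing SparkContext); combineByKey builds a List[Int] from Int inputs, while foldByKey sums with 0 as the zero value:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// combineByKey: input values are Int, combined values are List[Int].
val collected = pairs.combineByKey(
  (v: Int) => List(v),                      // createCombiner
  (acc: List[Int], v: Int) => v :: acc,     // mergeValue, within a partition
  (a: List[Int], b: List[Int]) => a ::: b)  // mergeCombiners, across partitions

// foldByKey: like reduceByKey, but with an explicit zero value.
val sums = pairs.foldByKey(0)(_ + _)
```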
- Avoid reduceByKey when the input and output value types are different. For example, consider building the set of unique strings for each key:
rdd.map(kv => (kv._1, Set(kv._2)))
  .reduceByKey(_ ++ _)
This code results in tons of unnecessary object creation because a new set must be allocated for each record. It’s better to use aggregateByKey, which performs the map-side aggregation more efficiently:
val zero = collection.mutable.HashSet.empty[String]
rdd.aggregateByKey(zero)(
  (set, v) => set += v,
  (set1, set2) => set1 ++= set2)
- Avoid the flatMap-join-groupBy pattern. When two datasets are already grouped by key and you want to join them and keep them grouped, you can just use cogroup. That avoids all the overhead associated with unpacking and repacking the groups. When joining grouped data sources, use cogroup directly, as sketched below.
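A minimal cogroup sketch on two hypothetical pair RDDs keyed by user id:

```scala
val clicksByUser = sc.parallelize(Seq((1L, "home"), (1L, "cart"), (2L, "home")))
val ordersByUser = sc.parallelize(Seq((1L, 42.0), (2L, 7.5)))

// For each key, cogroup yields the values from both RDDs side by side:
// RDD[(Long, (Iterable[String], Iterable[Double]))], with no ungrouping and regrouping.
val joinedGroups = clicksByUser.cogroup(ordersByUser)
```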
4 When a Shuffle Does Not Happen
- When joining two data sources that have already been grouped by key, if both groupings used the same partitioner, the join does not require a shuffle.
- When one of the datasets is small, use a broadcast variable and no shuffle is needed (a sketch follows this list).
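A minimal broadcast-join sketch, assuming a small lookup table that fits comfortably in memory (all names and values are hypothetical):

```scala
// The small table is shipped to every executor once as a broadcast variable.
val countryByCode = Map("CN" -> "China", "US" -> "United States")
val bcCountries = sc.broadcast(countryByCode)

val events = sc.parallelize(Seq(("u1", "CN"), ("u2", "US")))

// Map-side join: each task reads the broadcast map locally, so no shuffle happens.
val enriched = events.map { case (user, code) =>
  (user, bcCountries.value.getOrElse(code, "unknown"))
}
```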
When More Shuffles are Better
When the data is large but spread across only a few partitions, adding a shuffle to raise the partition count increases parallelism and can therefore improve overall efficiency. For example:
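One way to get such a shuffle is to pass an explicit, larger partition count to a *ByKey aggregation; the path, the key extraction, and the value 200 are only illustrative:

```scala
val counts = sc.textFile("hdfs:///data/logs")      // hypothetical input
  .map(line => (line.split(" ")(0), 1))
  // The second argument makes the shuffle produce 200 partitions,
  // so the stage after the shuffle runs with higher parallelism.
  .reduceByKey(_ + _, 200)
```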
5 RDD
Spark Performance Tuning: Development Tuning
- Principle 1: avoid creating duplicate RDDs
- Principle 2: reuse the same RDD wherever possible
- Principle 3: persist RDDs that are used multiple times (cache / persist)
- Principle 4: avoid shuffle operators where possible; broadcast large variables instead
- Principle 5: prefer shuffle operations that do map-side pre-aggregation
- Principle 6: use high-performance operators (a sketch follows this list)
  - use reduceByKey/aggregateByKey instead of groupByKey
  - use mapPartitions instead of plain map (mapPartitions-style operators handle all records of a partition in one function call rather than one call per record, which is generally faster)
  - use foreachPartition instead of foreach (one function call processes an entire partition instead of a single record)
  - follow filter with coalesce (after a filter that drops a large share of an RDD, say 30% or more, it is advisable to call coalesce to shrink the partition count and compact the remaining data into fewer partitions)
- Principle 7: broadcast large variables
- Principle 8: use Kryo to speed up serialization
- Principle 9: optimize data structures
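A minimal sketch of a few operators from Principle 6 (the path, the parsing, and the output are hypothetical stand-ins):

```scala
val records = sc.textFile("hdfs:///data/records")   // hypothetical input

// mapPartitions: one call handles a whole partition, so any per-partition
// setup cost is paid once instead of once per record.
val parsed = records.mapPartitions(iter => iter.map(_.trim))

// filter + coalesce: after discarding most of the data, compact what is left
// into fewer partitions so later tasks are not nearly empty.
val kept = parsed.filter(_.nonEmpty).coalesce(20)

// foreachPartition: handle each partition's records in one call,
// e.g. to reuse a single external connection per partition.
kept.foreachPartition(iter => iter.foreach(println))
```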
5.1 Do not ship all elements of a large RDD to the driver
Use functions such as collect, countByKey, countByValue, and collectAsMap with care; prefer take or takeSample to put an upper bound on the amount of data returned, for example:
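A minimal sketch of bounding what reaches the driver (the sizes are arbitrary):

```scala
val big = sc.parallelize(1 to 10000000)

// big.collect() would pull the whole RDD to the driver and can cause driver OOM.

// Bounded alternatives:
val firstFew = big.take(100)                                 // at most 100 elements
val sampled  = big.takeSample(withReplacement = false, 100)  // random sample of 100 elements
```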
6 Miscellaneous
6.1 Spark optimization: stop applications from uploading their dependency jars to HDFS
Edit the spark-defaults.conf file and add the following line:
spark.yarn.jar=hdfs://my/home/iteblog/spark_lib/spark-assembly-1.1.0-hadoop2.2.0.jar
That is, point spark.yarn.jar at the Spark assembly jar already stored in our HDFS lib directory, so each application no longer has to upload it.