5W字總結Spark(三)(建議收藏)

八帝火、Spark 數(shù)據(jù)傾斜

詳見: 八種解決 Spark 數(shù)據(jù)傾斜的方法
http://www.reibang.com/p/a917c9969cff

九祠够、Spark性能優(yōu)化

Spark調優(yōu)之RDD算子調優(yōu)

1. RDD復用

在對RDD進行算子時诈铛,要避免相同的算子和計算邏輯之下對RDD進行重復的計算问窃,如下圖所示:

RDD的重復計算

對上圖中的RDD計算架構進行修改,得到如下圖所示的優(yōu)化結果:


RDD架構優(yōu)化

2. 盡早filter

獲取到初始RDD后锐想,應該考慮盡早地過濾掉不需要的數(shù)據(jù)抽碌,進而減少對內存的占用,從而提升Spark作業(yè)的運行效率援制。

本文首發(fā)于公眾號:五分鐘學大數(shù)據(jù)戏挡,歡迎圍觀!回復【書籍】即可獲得上百本大數(shù)據(jù)書籍

3. 讀取大量小文件-用wholeTextFiles

當我們將一個文本文件讀取為 RDD 時晨仑,輸入的每一行都會成為RDD的一個元素褐墅。

也可以將多個完整的文本文件一次性讀取為一個pairRDD,其中鍵是文件名洪己,值是文件內容妥凳。

val input:RDD[String] = sc.textFile("dir/*.log") 

如果傳遞目錄,則將目錄下的所有文件讀取作為RDD答捕。文件路徑支持通配符逝钥。

但是這樣對于大量的小文件讀取效率并不高,應該使用 wholeTextFiles返回值為RDD[(String, String)]拱镐,其中Key是文件的名稱艘款,Value是文件的內容持际。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles讀取小文件:

val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(println)

4. mapPartition和foreachPartition

  • mapPartitions

map(_....) 表示每一個元素

mapPartitions(_....) 表示每個分區(qū)的數(shù)據(jù)組成的迭代器

普通的map算子對RDD中的每一個元素進行操作,而mapPartitions算子對RDD中每一個分區(qū)進行操作哗咆。

如果是普通的map算子蜘欲,假設一個partition有1萬條數(shù)據(jù),那么map算子中的function要執(zhí)行1萬次晌柬,也就是對每個元素進行操作姥份。

map 算子

如果是mapPartition算子,由于一個task處理一個RDD的partition年碘,那么一個task只會執(zhí)行一次function澈歉,function一次接收所有的partition數(shù)據(jù),效率比較高盛泡。

mapPartition 算子

比如闷祥,當要把RDD中的所有數(shù)據(jù)通過JDBC寫入數(shù)據(jù),如果使用map算子傲诵,那么需要對RDD中的每一個元素都創(chuàng)建一個數(shù)據(jù)庫連接凯砍,這樣對資源的消耗很大,如果使用mapPartitions算子拴竹,那么針對一個分區(qū)的數(shù)據(jù)悟衩,只需要建立一個數(shù)據(jù)庫連接

mapPartitions算子也存在一些缺點:對于普通的map操作栓拜,一次處理一條數(shù)據(jù)座泳,如果在處理了2000條數(shù)據(jù)后內存不足,那么可以將已經處理完的2000條數(shù)據(jù)從內存中垃圾回收掉幕与;但是如果使用mapPartitions算子挑势,但數(shù)據(jù)量非常大時,function一次處理一個分區(qū)的數(shù)據(jù)啦鸣,如果一旦內存不足潮饱,此時無法回收內存,就可能會OOM诫给,即內存溢出香拉。

因此,mapPartitions算子適用于數(shù)據(jù)量不是特別大的時候中狂,此時使用mapPartitions算子對性能的提升效果還是不錯的凫碌。(當數(shù)據(jù)量很大的時候,一旦使用mapPartitions算子胃榕,就會直接OOM)

在項目中盛险,應該首先估算一下RDD的數(shù)據(jù)量、每個partition的數(shù)據(jù)量,以及分配給每個Executor的內存資源枉层,如果資源允許泉褐,可以考慮使用mapPartitions算子代替map。

  • foreachPartition

rrd.foreache(_....) 表示每一個元素

rrd.forPartitions(_....) 表示每個分區(qū)的數(shù)據(jù)組成的迭代器

在生產環(huán)境中鸟蜡,通常使用foreachPartition算子來完成數(shù)據(jù)庫的寫入,通過foreachPartition算子的特性挺邀,可以優(yōu)化寫數(shù)據(jù)庫的性能揉忘。

如果使用foreach算子完成數(shù)據(jù)庫的操作,由于foreach算子是遍歷RDD的每條數(shù)據(jù)端铛,因此泣矛,每條數(shù)據(jù)都會建立一個數(shù)據(jù)庫連接,這是對資源的極大浪費禾蚕,因此您朽,對于寫數(shù)據(jù)庫操作,我們應當使用foreachPartition算子换淆。

與mapPartitions算子非常相似哗总,foreachPartition是將RDD的每個分區(qū)作為遍歷對象,一次處理一個分區(qū)的數(shù)據(jù)倍试,也就是說讯屈,如果涉及數(shù)據(jù)庫的相關操作,一個分區(qū)的數(shù)據(jù)只需要創(chuàng)建一次數(shù)據(jù)庫連接县习,如下圖所示:

foreachPartition 算子

使用了foreachPartition 算子后涮母,可以獲得以下的性能提升:

  1. 對于我們寫的function函數(shù),一次處理一整個分區(qū)的數(shù)據(jù)躁愿;
  2. 對于一個分區(qū)內的數(shù)據(jù)叛本,創(chuàng)建唯一的數(shù)據(jù)庫連接;
  3. 只需要向數(shù)據(jù)庫發(fā)送一次SQL語句和多組參數(shù)彤钟;

在生產環(huán)境中来候,全部都會使用foreachPartition算子完成數(shù)據(jù)庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似傲武,如果一個分區(qū)的數(shù)據(jù)量特別大若治,可能會造成OOM,即內存溢出剧防。

5. filter+coalesce/repartition(減少分區(qū))

在Spark任務中我們經常會使用filter算子完成RDD中數(shù)據(jù)的過濾,在任務初始階段辫樱,從各個分區(qū)中加載到的數(shù)據(jù)量是相近的峭拘,但是一旦進過filter過濾后,每個分區(qū)的數(shù)據(jù)量有可能會存在較大差異,如下圖所示:

分區(qū)數(shù)據(jù)過濾結果

根據(jù)上圖我們可以發(fā)現(xiàn)兩個問題:

  1. 每個partition的數(shù)據(jù)量變小了鸡挠,如果還按照之前與partition相等的task個數(shù)去處理當前數(shù)據(jù)辉饱,有點浪費task的計算資源;

  2. 每個partition的數(shù)據(jù)量不一樣拣展,會導致后面的每個task處理每個partition數(shù)據(jù)的時候彭沼,每個task要處理的數(shù)據(jù)量不同,這很有可能導致數(shù)據(jù)傾斜問題备埃。

如上圖所示姓惑,第二個分區(qū)的數(shù)據(jù)過濾后只剩100條,而第三個分區(qū)的數(shù)據(jù)過濾后剩下800條按脚,在相同的處理邏輯下于毙,第二個分區(qū)對應的task處理的數(shù)據(jù)量與第三個分區(qū)對應的task處理的數(shù)據(jù)量差距達到了8倍,這也會導致運行速度可能存在數(shù)倍的差距辅搬,這也就是數(shù)據(jù)傾斜問題唯沮。

針對上述的兩個問題,我們分別進行分析:

  1. 針對第一個問題堪遂,既然分區(qū)的數(shù)據(jù)量變小了介蛉,我們希望可以對分區(qū)數(shù)據(jù)進行重新分配,比如將原來4個分區(qū)的數(shù)據(jù)轉化到2個分區(qū)中蚤氏,這樣只需要用后面的兩個task進行處理即可甘耿,避免了資源的浪費。

  2. 針對第二個問題竿滨,解決方法和第一個問題的解決方法非常相似佳恬,對分區(qū)數(shù)據(jù)重新分配,讓每個partition中的數(shù)據(jù)量差不多于游,這就避免了數(shù)據(jù)傾斜問題毁葱。

那么具體應該如何實現(xiàn)上面的解決思路?我們需要coalesce算子贰剥。

repartition與coalesce都可以用來進行重分區(qū)倾剿,其中repartition只是coalesce接口中shuffle為true的簡易實現(xiàn),coalesce默認情況下不進行shuffle蚌成,但是可以通過參數(shù)進行設置前痘。

假設我們希望將原本的分區(qū)個數(shù)A通過重新分區(qū)變?yōu)锽,那么有以下幾種情況:

  1. A > B(多數(shù)分區(qū)合并為少數(shù)分區(qū))
  • A與B相差值不大

    此時使用coalesce即可担忧,無需shuffle過程芹缔。

  • A與B相差值很大

    此時可以使用coalesce并且不啟用shuffle過程,但是會導致合并過程性能低下瓶盛,所以推薦設置coalesce的第二個參數(shù)為true最欠,即啟動shuffle過程示罗。

  1. A < B(少數(shù)分區(qū)分解為多數(shù)分區(qū))

此時使用repartition即可,如果使用coalesce需要將shuffle設置為true芝硬,否則coalesce無效蚜点。

我們可以在filter操作之后,使用coalesce算子針對每個partition的數(shù)據(jù)量各不相同的情況拌阴,壓縮partition的數(shù)量绍绘,而且讓每個partition的數(shù)據(jù)量盡量均勻緊湊,以便于后面的task進行計算操作迟赃,在某種程度上能夠在一定程度上提升性能脯倒。

注意:local模式是進程內模擬集群運行,已經對并行度和分區(qū)數(shù)量有了一定的內部優(yōu)化捺氢,因此不用去設置并行度和分區(qū)數(shù)量。

6. 并行度設置

Spark作業(yè)中的并行度指各個stage的task的數(shù)量剪撬。

如果并行度設置不合理而導致并行度過低摄乒,會導致資源的極大浪費,例如残黑,20個Executor馍佑,每個Executor分配3個CPU core,而Spark作業(yè)有40個task梨水,這樣每個Executor分配到的task個數(shù)是2個拭荤,這就使得每個Executor有一個CPU core空閑,導致資源的浪費疫诽。

理想的并行度設置舅世,應該是讓并行度與資源相匹配,簡單來說就是在資源允許的前提下奇徒,并行度要設置的盡可能大雏亚,達到可以充分利用集群資源。合理的設置并行度摩钙,可以提升整個Spark作業(yè)的性能和運行速度罢低。

Spark官方推薦,task數(shù)量應該設置為Spark作業(yè)總CPU core數(shù)量的2~3倍胖笛。之所以沒有推薦task數(shù)量與CPU core總數(shù)相等网持,是因為task的執(zhí)行時間不同,有的task執(zhí)行速度快而有的task執(zhí)行速度慢长踊,如果task數(shù)量與CPU core總數(shù)相等功舀,那么執(zhí)行快的task執(zhí)行完成后,會出現(xiàn)CPU core空閑的情況之斯。如果task數(shù)量設置為CPU core總數(shù)的2~3倍日杈,那么一個task執(zhí)行完畢后遣铝,CPU core會立刻執(zhí)行下一個task,降低了資源的浪費莉擒,同時提升了Spark作業(yè)運行的效率酿炸。

Spark作業(yè)并行度的設置如下:

val conf = new SparkConf().set("spark.default.parallelism", "500")

原則:讓 cpu 的 Core(cpu 核心數(shù)) 充分利用起來, 如有100個 Core,那么并行度可以設置為200~300涨冀。

7. repartition/coalesce調節(jié)并行度

我們知道 Spark 中有并行度的調節(jié)策略填硕,但是,并行度的設置對于Spark SQL是不生效的鹿鳖,用戶設置的并行度只對于Spark SQL以外的所有Spark的stage生效扁眯。

Spark SQL的并行度不允許用戶自己指定,Spark SQL自己會默認根據(jù)hive表對應的HDFS文件的split個數(shù)自動設置Spark SQL所在的那個stage的并行度翅帜,用戶自己通 spark.default.parallelism 參數(shù)指定的并行度姻檀,只會在沒Spark SQL的stage中生效。

由于Spark SQL所在stage的并行度無法手動設置涝滴,如果數(shù)據(jù)量較大绣版,并且此stage中后續(xù)的transformation操作有著復雜的業(yè)務邏輯,而Spark SQL自動設置的task數(shù)量很少歼疮,這就意味著每個task要處理為數(shù)不少的數(shù)據(jù)量杂抽,然后還要執(zhí)行非常復雜的處理邏輯,這就可能表現(xiàn)為第一個有Spark SQL的stage速度很慢韩脏,而后續(xù)的沒有Spark SQL的stage運行速度非乘豸铮快。

為了解決Spark SQL無法設置并行度和task數(shù)量的問題赡矢,我們可以使用repartition算子杭朱。

repartition 算子使用前后對比圖如下:

repartition 算子使用前后對比圖

Spark SQL這一步的并行度和task數(shù)量肯定是沒有辦法去改變了,但是济竹,對于Spark SQL查詢出來的RDD痕檬,立即使用repartition算子,去重新進行分區(qū)送浊,這樣可以重新分區(qū)為多個partition梦谜,從repartition之后的RDD操作,由于不再涉及Spark SQL袭景,因此stage的并行度就會等于你手動設置的值唁桩,這樣就避免了Spark SQL所在的stage只能用少量的task去處理大量數(shù)據(jù)并執(zhí)行復雜的算法邏輯。使用repartition算子的前后對比如上圖所示耸棒。

8. reduceByKey本地預聚合

reduceByKey相較于普通的shuffle操作一個顯著的特點就是會進行map端的本地聚合荒澡,map端會先對本地的數(shù)據(jù)進行combine操作,然后將數(shù)據(jù)寫入給下個stage的每個task創(chuàng)建的文件中与殃,也就是在map端单山,對每一個key對應的value碍现,執(zhí)行reduceByKey算子函數(shù)。

reduceByKey算子的執(zhí)行過程如下圖所示:

reduceByKey 算子執(zhí)行過程

使用reduceByKey對性能的提升如下:

  1. 本地聚合后米奸,在map端的數(shù)據(jù)量變少昼接,減少了磁盤IO,也減少了對磁盤空間的占用悴晰;
  2. 本地聚合后慢睡,下一個stage拉取的數(shù)據(jù)量變少,減少了網絡傳輸?shù)臄?shù)據(jù)量铡溪;
  3. 本地聚合后漂辐,在reduce端進行數(shù)據(jù)緩存的內存占用減少;
  4. 本地聚合后棕硫,在reduce端進行聚合的數(shù)據(jù)量減少髓涯。

基于reduceByKey的本地聚合特征,我們應該考慮使用reduceByKey代替其他的shuffle算子哈扮,例如groupByKey复凳。

groupByKey與reduceByKey的運行原理如下圖1和圖2所示:

圖1:groupByKey原理
圖2:reduceByKey原理

根據(jù)上圖可知,groupByKey不會進行map端的聚合灶泵,而是將所有map端的數(shù)據(jù)shuffle到reduce端,然后在reduce端進行數(shù)據(jù)的聚合操作对途。由于reduceByKey有map端聚合的特性赦邻,使得網絡傳輸?shù)臄?shù)據(jù)量減小,因此效率要明顯高于groupByKey实檀。

9. 使用持久化+checkpoint

Spark持久化在大部分情況下是沒有問題的惶洲,但是有時數(shù)據(jù)可能會丟失,如果數(shù)據(jù)一旦丟失膳犹,就需要對丟失的數(shù)據(jù)重新進行計算恬吕,計算完后再緩存和使用,為了避免數(shù)據(jù)的丟失须床,可以選擇對這個RDD進行checkpoint铐料,也就是將數(shù)據(jù)持久化一份到容錯的文件系統(tǒng)上(比如HDFS)

一個RDD緩存并checkpoint后豺旬,如果一旦發(fā)現(xiàn)緩存丟失钠惩,就會優(yōu)先查看checkpoint數(shù)據(jù)存不存在,如果有族阅,就會使用checkpoint數(shù)據(jù)篓跛,而不用重新計算。也即是說坦刀,checkpoint可以視為cache的保障機制愧沟,如果cache失敗蔬咬,就使用checkpoint的數(shù)據(jù)。

使用checkpoint的優(yōu)點在于提高了Spark作業(yè)的可靠性沐寺,一旦緩存出現(xiàn)問題林艘,不需要重新計算數(shù)據(jù),缺點在于芽丹,checkpoint時需要將數(shù)據(jù)寫入HDFS等文件系統(tǒng)北启,對性能的消耗較大

持久化設置如下:

sc.setCheckpointDir(‘HDFS’)
rdd.cache/persist(memory_and_disk)
rdd.checkpoint

10. 使用廣播變量

默認情況下拔第,task中的算子中如果使用了外部的變量咕村,每個task都會獲取一份變量的復本,這就造成了內存的極大消耗蚊俺。一方面懈涛,如果后續(xù)對RDD進行持久化,可能就無法將RDD數(shù)據(jù)存入內存泳猬,只能寫入磁盤批钠,磁盤IO將會嚴重消耗性能;另一方面得封,task在創(chuàng)建對象的時候埋心,也許會發(fā)現(xiàn)堆內存無法存放新創(chuàng)建的對象,這就會導致頻繁的GC忙上,GC會導致工作線程停止拷呆,進而導致Spark暫停工作一段時間,嚴重影響Spark性能疫粥。

假設當前任務配置了20個Executor茬斧,指定500個task,有一個20M的變量被所有task共用梗逮,此時會在500個task中產生500個副本项秉,耗費集群10G的內存,如果使用了廣播變量慷彤, 那么每個Executor保存一個副本娄蔼,一共消耗400M內存,內存消耗減少了5倍底哗。

廣播變量在每個Executor保存一個副本贷屎,此Executor的所有task共用此廣播變量,這讓變量產生的副本數(shù)量大大減少艘虎。

在初始階段唉侄,廣播變量只在Driver中有一份副本。task在運行的時候野建,想要使用廣播變量中的數(shù)據(jù)属划,此時首先會在自己本地的Executor對應的BlockManager中嘗試獲取變量恬叹,如果本地沒有,BlockManager就會從Driver或者其他節(jié)點的BlockManager上遠程拉取變量的復本同眯,并由本地的BlockManager進行管理绽昼;之后此Executor的所有task都會直接從本地的BlockManager中獲取變量。

對于多個Task可能會共用的數(shù)據(jù)可以廣播到每個Executor上:

val 廣播變量名= sc.broadcast(會被各個Task用到的變量,即需要廣播的變量)

廣播變量名.value//獲取廣播變量

11. 使用Kryo序列化

默認情況下须蜗,Spark使用Java的序列化機制硅确。Java的序列化機制使用方便,不需要額外的配置明肮,在算子中使用的變量實現(xiàn)Serializable接口即可菱农,但是,Java序列化機制的效率不高柿估,序列化速度慢并且序列化后的數(shù)據(jù)所占用的空間依然較大循未。

Spark官方宣稱Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫秫舌,是因為它不支持所有對象的序列化的妖,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便足陨,但從Spark 2.0.0版本開始嫂粟,簡單類型、簡單類型數(shù)組墨缘、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了赋元。

Kryo序列化注冊方式的代碼如下:

public class MyKryoRegistrator implements KryoRegistrator{
  @Override
  public void registerClasses(Kryo kryo){
    kryo.register(StartupReportLogs.class);
  }
}

配置Kryo序列化方式的代碼如下:

//創(chuàng)建SparkConf對象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化庫
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化庫中注冊自定義的類集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 

本文檔首發(fā)于公眾號:五分鐘學大數(shù)據(jù),回復【666】即可獲得全套大數(shù)據(jù)筆面試教程

Spark調優(yōu)之Shuffle調優(yōu)

1. map和reduce端緩沖區(qū)大小

在Spark任務運行過程中飒房,如果shuffle的map端處理的數(shù)據(jù)量比較大,但是map端緩沖的大小是固定的媚值,可能會出現(xiàn)map端緩沖數(shù)據(jù)頻繁spill溢寫到磁盤文件中的情況狠毯,使得性能非常低下,通過調節(jié)map端緩沖的大小褥芒,可以避免頻繁的磁盤IO操作嚼松,進而提升Spark任務的整體性能。

map端緩沖的默認配置是32KB锰扶,如果每個task處理640KB的數(shù)據(jù)献酗,那么會發(fā)生640/32 = 20次溢寫,如果每個task處理64000KB的數(shù)據(jù)坷牛,即會發(fā)生64000/32=2000次溢寫罕偎,這對于性能的影響是非常嚴重的。

map端緩沖的配置方法:

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64")

Spark Shuffle過程中京闰,shuffle reduce task的buffer緩沖區(qū)大小決定了reduce task每次能夠緩沖的數(shù)據(jù)量颜及,也就是每次能夠拉取的數(shù)據(jù)量甩苛,如果內存資源較為充足,適當增加拉取數(shù)據(jù)緩沖區(qū)的大小俏站,可以減少拉取數(shù)據(jù)的次數(shù)讯蒲,也就可以減少網絡傳輸?shù)拇螖?shù),進而提升性能肄扎。

reduce端數(shù)據(jù)拉取緩沖區(qū)的大小可以通過spark.reducer.maxSizeInFlight參數(shù)進行設置墨林,默認為48MB。該參數(shù)的設置方法如下:

reduce端數(shù)據(jù)拉取緩沖區(qū)配置:

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96")

2. reduce端重試次數(shù)和等待時間間隔

Spark Shuffle過程中犯祠,reduce task拉取屬于自己的數(shù)據(jù)時旭等,如果因為網絡異常等原因導致失敗會自動進行重試。對于那些包含了特別耗時的shuffle操作的作業(yè)雷则,建議增加重試最大次數(shù)(比如60次)辆雾,以避免由于JVM的full gc或者網絡不穩(wěn)定等因素導致的數(shù)據(jù)拉取失敗。在實踐中發(fā)現(xiàn)月劈,對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程度迂,調節(jié)該參數(shù)可以大幅度提升穩(wěn)定性。

reduce端拉取數(shù)據(jù)重試次數(shù)可以通過spark.shuffle.io.maxRetries參數(shù)進行設置猜揪,該參數(shù)就代表了可以重試的最大次數(shù)惭墓。如果在指定次數(shù)之內拉取還是沒有成功,就可能會導致作業(yè)執(zhí)行失敗而姐,默認為3腊凶,該參數(shù)的設置方法如下:

reduce端拉取數(shù)據(jù)重試次數(shù)配置:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")

Spark Shuffle過程中,reduce task拉取屬于自己的數(shù)據(jù)時拴念,如果因為網絡異常等原因導致失敗會自動進行重試钧萍,在一次失敗后,會等待一定的時間間隔再進行重試政鼠,可以通過加大間隔時長(比如60s)风瘦,以增加shuffle操作的穩(wěn)定性

reduce端拉取數(shù)據(jù)等待間隔可以通過spark.shuffle.io.retryWait參數(shù)進行設置公般,默認值為5s万搔,該參數(shù)的設置方法如下:

reduce端拉取數(shù)據(jù)等待間隔配置:

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait", "60s")

3. bypass機制開啟閾值

對于SortShuffleManager,如果shuffle reduce task的數(shù)量小于某一閾值則shuffle write過程中不會進行排序操作官帘,而是直接按照未經優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù)瞬雹,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創(chuàng)建單獨的索引文件刽虹。

當你使用SortShuffleManager時酗捌,如果的確不需要排序操作,那么建議將這個參數(shù)調大一些,大于shuffle read task的數(shù)量意敛,那么此時map-side就不會進行排序了馅巷,減少了排序的性能開銷,但是這種方式下草姻,依然會產生大量的磁盤文件钓猬,因此shuffle write性能有待提高。

SortShuffleManager排序操作閾值的設置可以通過spark.shuffle.sort.bypassMergeThreshold這一參數(shù)進行設置撩独,默認值為200敞曹,該參數(shù)的設置方法如下:

reduce端拉取數(shù)據(jù)等待間隔配置:

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")

十一、Spark大廠面試真題

詳見:# Spark面試題匯總及答案(推薦收藏)

http://www.reibang.com/p/8cb8ef2beee0

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末综膀,一起剝皮案震驚了整個濱河市澳迫,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌剧劝,老刑警劉巖橄登,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異讥此,居然都是意外死亡拢锹,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進店門萄喳,熙熙樓的掌柜王于貴愁眉苦臉地迎上來卒稳,“玉大人,你說我怎么就攤上這事他巨〕淇樱” “怎么了?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵染突,是天一觀的道長捻爷。 經常有香客問我,道長份企,這世上最難降的妖魔是什么也榄? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮薪棒,結果婚禮上,老公的妹妹穿的比我還像新娘榕莺。我一直安慰自己俐芯,他們只是感情好,可當我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布钉鸯。 她就那樣靜靜地躺著吧史,像睡著了一般。 火紅的嫁衣襯著肌膚如雪唠雕。 梳的紋絲不亂的頭發(fā)上贸营,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天吨述,我揣著相機與錄音,去河邊找鬼钞脂。 笑死揣云,一個胖子當著我的面吹牛,可吹牛的內容都是我干的冰啃。 我是一名探鬼主播邓夕,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼阎毅!你這毒婦竟也來了焚刚?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤扇调,失蹤者是張志新(化名)和其女友劉穎矿咕,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狼钮,經...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡碳柱,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了燃领。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片士聪。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖猛蔽,靈堂內的尸體忽然破棺而出剥悟,到底是詐尸還是另有隱情,我是刑警寧澤曼库,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布区岗,位于F島的核電站,受9級特大地震影響毁枯,放射性物質發(fā)生泄漏慈缔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一种玛、第九天 我趴在偏房一處隱蔽的房頂上張望藐鹤。 院中可真熱鬧,春花似錦赂韵、人聲如沸娱节。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽肄满。三九已至,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間稠歉,已是汗流浹背掰担。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留怒炸,地道東北人带饱。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像横媚,于是被迫代替她去往敵國和親纠炮。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內容