Spark Shuffle原理、Shuffle操作問題解決和參數(shù)調(diào)優(yōu)

目錄:

1.shuffle原理
  1.1 mapreduce的shuffle原理
    1.1.1 map task端操作
    1.1.2 reduce task端操作
   1.2 spark現(xiàn)在的SortShuffleManager

2. Shuffle操作問題解決
  2.1 數(shù)據(jù)傾斜原理
  2.2 數(shù)據(jù)傾斜問題發(fā)現(xiàn)與解決
  2.3 數(shù)據(jù)傾斜解決方案

3. spark RDD中的shuffle算子
   3.1 去重
   3.2 聚合
   3.3 排序
   3.4 重分區(qū)
   3.5 集合操作和表操作

4. spark shuffle參數(shù)調(diào)優(yōu)

正文

1.shuffle原理

概述:Shuffle描述著數(shù)據(jù)從map task輸出到reduce task輸入的這段過程又跛。在分布式情況下,reduce task需要跨節(jié)點(diǎn)去拉取其它節(jié)點(diǎn)上的map task結(jié)果若治。這一過程將會(huì)產(chǎn)生網(wǎng)絡(luò)資源消耗和內(nèi)存慨蓝,磁盤IO的消耗。

1.1 mapreduce的shuffle原理

1.1.1 map task端操作

每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū)(默認(rèn)是100MB)端幼,存儲(chǔ)著map的輸出結(jié)果礼烈,當(dāng)緩沖區(qū)快滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤,當(dāng)整個(gè)map task結(jié)束后再對磁盤中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并婆跑,生成最終的正式輸出文件此熬,然后等待reduce task來拉數(shù)據(jù)。

Spill過程:這個(gè)從內(nèi)存往磁盤寫數(shù)據(jù)的過程被稱為Spill,中文可譯為溢寫犀忱。整個(gè)緩沖區(qū)有個(gè)溢寫的比例spill.percent(默認(rèn)是0.8)募谎,當(dāng)達(dá)到閥值時(shí)map task 可以繼續(xù)往剩余的memory寫,同時(shí)溢寫線程鎖定已用memory阴汇,先對key(序列化的字節(jié))做排序,如果client程序設(shè)置了Combiner数冬,那么在溢寫的過程中就會(huì)進(jìn)行局部聚合。

Merge過程:每次溢寫都會(huì)生成一個(gè)臨時(shí)文件搀庶,在map task真正完成時(shí)會(huì)將這些文件歸并成一個(gè)文件拐纱,這個(gè)過程叫做Merge。

1.1.2 reduce task端操作

當(dāng)某臺(tái)TaskTracker上的所有map task執(zhí)行完成哥倔,對應(yīng)節(jié)點(diǎn)的reduce task開始啟動(dòng)秸架,簡單地說,此階段就是不斷地拉取(Fetcher)每個(gè)map task所在節(jié)點(diǎn)的最終結(jié)果咆蒿,然后不斷地做merge形成reduce task的輸入文件东抹。

Copy過程:Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetcher)通過HTTP協(xié)議拉取TaskTracker的map階段輸出文件

Merge過程:Copy過來的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)(基于JVM的heap size設(shè)置),如果內(nèi)存緩沖區(qū)不足也會(huì)發(fā)生map task的spill(sort 默認(rèn),combine 可選)沃测,多個(gè)溢寫文件時(shí)會(huì)發(fā)生map task的merge

下面總結(jié)下mapreduce的關(guān)鍵詞:

  • 存儲(chǔ)相關(guān)的有:內(nèi)存緩沖區(qū)缭黔,默認(rèn)大小,溢寫閥值
  • 主要過程:溢寫(spill)芽突,排序试浙,合并(combine),歸并(Merge),Copy或Fetch
  • 相關(guān)參數(shù):內(nèi)存緩沖區(qū)默認(rèn)大小寞蚌,JVM heap size田巴,spill.percent

關(guān)于排序方法:

在Map階段,k-v溢寫時(shí)挟秤,采用的正是快排壹哺;而溢出文件的合并使用的則是歸并;在Reduce階段艘刚,通過shuffle從Map獲取的文件進(jìn)行合并的時(shí)候采用的也是歸并管宵;最后階段則使用了堆排作最后的合并過程。

1.2 spark現(xiàn)在的SortShuffleManager

SortShuffleManager運(yùn)行原理

SortShuffleManager的運(yùn)行機(jī)制主要分成兩種:

  • 一種是普通運(yùn)行機(jī)制
  • 另一種是bypass運(yùn)行機(jī)制

當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(shí)(默認(rèn)為200)攀甚,就會(huì)啟用bypass機(jī)制箩朴。

普通運(yùn)行機(jī)制

下圖說明了普通的SortShuffleManager的原理。在該模式下秋度,數(shù)據(jù)會(huì)先寫入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中炸庞,此時(shí)根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)荚斯。如果是reduceByKey這種聚合類的shuffle算子埠居,那么會(huì)選用Map數(shù)據(jù)結(jié)構(gòu)查牌, 直接寫入內(nèi)存。接著滥壕,每寫一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后纸颜,就會(huì)判斷一下,是否達(dá)到了某個(gè)臨界閾值绎橘。如果達(dá)到臨界閾值的話胁孙,那么就會(huì)嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫到磁盤,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)金踪。

在溢寫到磁盤文件之前浊洞,會(huì)先根據(jù)key對內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序牵敷。排序過后胡岔,會(huì)分批將數(shù)據(jù)寫入磁盤文件。默認(rèn)的batch數(shù)量是10000條枷餐,也就是說靶瘸,排序好的數(shù)據(jù),會(huì)以每批1萬條數(shù)據(jù)的形式分批寫入磁盤文件毛肋。寫入磁盤文件是通過Java的BufferedOutputStream實(shí)現(xiàn)的怨咪。BufferedOutputStream是Java的緩沖輸出流,首先會(huì)將數(shù)據(jù)緩沖在內(nèi)存中润匙,當(dāng)內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中诗眨,這樣可以減少磁盤IO次數(shù),提升性能孕讳。

一個(gè)task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中匠楚,會(huì)發(fā)生多次磁盤溢寫操作,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件厂财。最后會(huì)將之前所有的臨時(shí)磁盤文件都進(jìn)行合并芋簿,這就是merge過程,此時(shí)會(huì)將之前所有臨時(shí)磁盤文件中的數(shù)據(jù)讀取出來璃饱,然后依次寫入最終的磁盤文件之中与斤。此外,由于一個(gè)task就只對應(yīng)一個(gè)磁盤文件荚恶,也就意味著該task為下游stage的task準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中撩穿,因此還會(huì)單獨(dú)寫一份索引文件,其中標(biāo)識了下游各個(gè)task的數(shù)據(jù)在文件中的start offset與end offset谒撼。

SortShuffleManager由于有一個(gè)磁盤文件merge的過程食寡,因此大大減少了文件數(shù)量。比如第一個(gè)stage有50個(gè)task嗤栓,總共有10個(gè)Executor冻河,每個(gè)Executor執(zhí)行5個(gè)task箍邮,而第二個(gè)stage有100個(gè)task。由于每個(gè)task最終只有一個(gè)磁盤文件叨叙,因此此時(shí)每個(gè)Executor上只有5個(gè)磁盤文件锭弊,所有Executor只有50個(gè)磁盤文件。


bypass運(yùn)行機(jī)制

下圖說明了bypass SortShuffleManager的原理擂错。bypass運(yùn)行機(jī)制的觸發(fā)條件如下:

  • shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值(默認(rèn)為200)味滞。
  • 不是排序類的shuffle算子(比如reduceByKey)。

此時(shí)task會(huì)為每個(gè)下游task都創(chuàng)建一個(gè)臨時(shí)磁盤文件钮呀,并將數(shù)據(jù)按key進(jìn)行hash然后根據(jù)key的hash值剑鞍,將key寫入對應(yīng)的磁盤文件之中。當(dāng)然爽醋,寫入磁盤文件時(shí)也是先寫入內(nèi)存緩沖蚁署,緩沖寫滿之后再溢寫到磁盤文件的。最后蚂四,同樣會(huì)將所有臨時(shí)磁盤文件都合并成一個(gè)磁盤文件光戈,并創(chuàng)建一個(gè)單獨(dú)的索引文件。

該過程的磁盤寫機(jī)制其實(shí)跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的遂赠,因?yàn)槎家獎(jiǎng)?chuàng)建數(shù)量驚人的磁盤文件久妆,只是在最后會(huì)做一個(gè)磁盤文件的合并而已。因此少量的最終磁盤文件跷睦,也讓該機(jī)制相對未經(jīng)優(yōu)化的HashShuffleManager來說筷弦,shuffle read的性能會(huì)更好。

而該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制的不同在于:

  • 第一抑诸,磁盤寫機(jī)制不同烂琴;
  • 第二,不會(huì)進(jìn)行排序哼鬓。

也就是說监右,啟用該機(jī)制的最大好處在于: shuffle write過程中,不需要進(jìn)行數(shù)據(jù)的排序操作异希,也就節(jié)省掉了這部分的性能開銷健盒。

2. Shuffle操作問題解決

2.1 數(shù)據(jù)傾斜原理

在進(jìn)行shuffle的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的key拉取到某個(gè)節(jié)點(diǎn)上的一個(gè)task來進(jìn)行處理称簿,此時(shí)如果某個(gè)key對應(yīng)的數(shù)據(jù)量特別大的話扣癣,就會(huì)發(fā)生數(shù)據(jù)傾斜

2.2 數(shù)據(jù)傾斜問題發(fā)現(xiàn)與定位

通過Spark Web UI來查看當(dāng)前運(yùn)行的stage各個(gè)task分配的數(shù)據(jù)量,從而進(jìn)一步確定是不是task分配的數(shù)據(jù)不均勻?qū)е铝藬?shù)據(jù)傾斜憨降。
知道數(shù)據(jù)傾斜發(fā)生在哪一個(gè)stage之后父虑,接著我們就需要根據(jù)stage劃分原理,推算出來發(fā)生傾斜的那個(gè)stage對應(yīng)代碼中的哪一部分授药,這部分代碼中肯定會(huì)有一個(gè)shuffle類算子士嚎。通過countByKey查看各個(gè)key的分布呜魄。

2.3 數(shù)據(jù)傾斜解決方案

2.3.1 過濾少數(shù)導(dǎo)致傾斜的key
2.3.2 提高shuffle操作的并行度
2.3.3 局部聚合和全局聚合

方案實(shí)現(xiàn)思路:這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合莱衩,先給每個(gè)key都打上一個(gè)隨機(jī)數(shù)爵嗅,比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了笨蚁,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1)睹晒,就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機(jī)數(shù)后的數(shù)據(jù)括细,執(zhí)行reduceByKey等聚合操作伪很,進(jìn)行局部聚合,那么局部聚合結(jié)果奋单,就會(huì)變成了(1_hello, 2) (2_hello, 2)锉试。然后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2)辱匿,再次進(jìn)行全局聚合操作键痛,就可以得到最終結(jié)果了炫彩,比如(hello, 4)匾七。
代碼:

2.3.4 將reduce join轉(zhuǎn)為map join((小表幾百M(fèi)或者一兩G))

方案實(shí)現(xiàn)思路:不使用join算子進(jìn)行連接操作,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作江兢,進(jìn)而完全規(guī)避掉shuffle類的操作昨忆,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到Driver端的內(nèi)存中來杉允,然后對其創(chuàng)建一個(gè)Broadcast變量邑贴;接著對另外一個(gè)RDD執(zhí)行map類算子,在算子函數(shù)內(nèi)叔磷,從Broadcast變量中獲取較小RDD的全量數(shù)據(jù)拢驾,與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行比對,如果連接key相同的話改基,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來繁疤。

代碼:

2.3.5 采樣傾斜key并分拆join操作(join的兩表都很大,但僅一個(gè)RDD的幾個(gè)key的數(shù)據(jù)量過大)

方案實(shí)現(xiàn)思路:

  • 對包含少數(shù)幾個(gè)數(shù)據(jù)量過大的key的那個(gè)RDD秕狰,通過sample算子采樣出一份樣本來稠腊,然后統(tǒng)計(jì)一下每個(gè)key的數(shù)量,計(jì)算出來數(shù)據(jù)量最大的是哪幾個(gè)key鸣哀。
  • 然后將這幾個(gè)key對應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來架忌,形成一個(gè)單獨(dú)的RDD,并給每個(gè)key都打上n以內(nèi)的隨機(jī)數(shù)作為前綴我衬,而不會(huì)導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD叹放。
  • 接著將需要join的另一個(gè)RDD饰恕,也過濾出來那幾個(gè)傾斜key對應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD,將每條數(shù)據(jù)膨脹成n條數(shù)據(jù)井仰,這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴懂盐,不會(huì)導(dǎo)致傾斜的大部分key也形成另外一個(gè)RDD。
  • 再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join糕档,此時(shí)就可以將原先相同的key打散成n份莉恼,分散到多個(gè)task中去進(jìn)行join了。
  • 而另外兩個(gè)普通的RDD就照常join即可速那。
  • 最后將兩次join的結(jié)果使用union算子合并起來即可俐银,就是最終的join結(jié)果。
2.3.6 使用隨機(jī)前綴和擴(kuò)容RDD進(jìn)行join(RDD中有大量的key導(dǎo)致數(shù)據(jù)傾斜)

方案實(shí)現(xiàn)思路:
將含有較多傾斜key的RDD擴(kuò)大多倍端仰,與相對分布均勻的RDD配一個(gè)隨機(jī)數(shù)捶惜。

3. spark RDD中的shuffle算子

3.1 去重:

  • def distinct()
  • def distinct(numPartitions: Int)

3.2 聚合

  • def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  • def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
  • def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
  • def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
  • def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
  • def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
  • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

3.3 排序

  • def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
  • def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

3.4 重分區(qū)

  • def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
  • def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)

3.5集合或者表操作

  • def intersection(other: RDD[T]): RDD[T]
  • def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
  • def intersection(other: RDD[T], numPartitions: Int): RDD[T]
  • def subtract(other: RDD[T], numPartitions: Int): RDD[T]
  • def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
  • def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
  • def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
  • def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
  • def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
  • def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  • def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
  • def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]



4. spark shuffle參數(shù)調(diào)優(yōu)

spark.shuffle.file.buffer

  • 默認(rèn)值:32k
  • 參數(shù)說明:該參數(shù)用于設(shè)置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數(shù)據(jù)寫到磁盤文件之前荔烧,會(huì)先寫入buffer緩沖中吱七,待緩沖寫滿之后,才會(huì)溢寫到磁盤鹤竭。
  • 調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話踊餐,可以適當(dāng)增加這個(gè)參數(shù)的大小(比如64k)臀稚,從而減少shuffle write過程中溢寫磁盤文件的次數(shù)吝岭,也就可以減少磁盤IO次數(shù),進(jìn)而提升性能吧寺。在實(shí)踐中發(fā)現(xiàn)窜管,合理調(diào)節(jié)該參數(shù),性能會(huì)有1%~5%的提升稚机。

spark.reducer.maxSizeInFlight

  • 默認(rèn)值:48m
  • 參數(shù)說明:該參數(shù)用于設(shè)置shuffle read task的buffer緩沖大小幕帆,而這個(gè)buffer緩沖決定了每次能夠拉取多少數(shù)據(jù)。
  • 調(diào)優(yōu)建議:如果作業(yè)可用的內(nèi)存資源較為充足的話赖条,可以適當(dāng)增加這個(gè)參數(shù)的大惺(比如96m),從而減少拉取數(shù)據(jù)的次數(shù)谋币,也就可以減少網(wǎng)絡(luò)傳輸?shù)拇螖?shù)仗扬,進(jìn)而提升性能。在實(shí)踐中發(fā)現(xiàn)蕾额,合理調(diào)節(jié)該參數(shù)早芭,性能會(huì)有1%~5%的提升。

spark.shuffle.io.maxRetries

  • 默認(rèn)值:3
  • 參數(shù)說明:shuffle read task從shuffle write task所在節(jié)點(diǎn)拉取屬于自己的數(shù)據(jù)時(shí)诅蝶,如果因?yàn)榫W(wǎng)絡(luò)異常導(dǎo)致拉取失敗退个,是會(huì)自動(dòng)進(jìn)行重試的募壕。該參數(shù)就代表了可以重試的最大次數(shù)。如果在指定次數(shù)之內(nèi)拉取還是沒有成功语盈,就可能會(huì)導(dǎo)致作業(yè)執(zhí)行失敗舱馅。
  • 調(diào)優(yōu)建議:對于那些包含了特別耗時(shí)的shuffle操作的作業(yè),建議增加重試最大次數(shù)(比如60次)刀荒,以避免由于JVM的full gc或者網(wǎng)絡(luò)不穩(wěn)定等因素導(dǎo)致的數(shù)據(jù)拉取失敗代嗤。在實(shí)踐中發(fā)現(xiàn),對于針對超大數(shù)據(jù)量(數(shù)十億~上百億)的shuffle過程缠借,調(diào)節(jié)該參數(shù)可以大幅度提升穩(wěn)定性干毅。

spark.shuffle.io.retryWait

  • 默認(rèn)值:5s
  • 參數(shù)說明:具體解釋同上,該參數(shù)代表了每次重試?yán)?shù)據(jù)的等待間隔泼返,默認(rèn)是5s硝逢。
  • 調(diào)優(yōu)建議:建議加大間隔時(shí)長(比如60s),以增加shuffle操作的穩(wěn)定性绅喉。

spark.shuffle.memoryFraction

  • 默認(rèn)值:0.2
  • 參數(shù)說明:該參數(shù)代表了Executor內(nèi)存中渠鸽,分配給shuffle read task進(jìn)行聚合操作的內(nèi)存比例,默認(rèn)是20%柴罐。
  • 調(diào)優(yōu)建議:在資源參數(shù)調(diào)優(yōu)中講解過這個(gè)參數(shù)徽缚。如果內(nèi)存充足,而且很少使用持久化操作丽蝎,建議調(diào)高這個(gè)比例猎拨,給shuffle read的聚合操作更多內(nèi)存,以避免由于內(nèi)存不足導(dǎo)致聚合過程中頻繁讀寫磁盤屠阻。在實(shí)踐中發(fā)現(xiàn),合理調(diào)節(jié)該參數(shù)可以將性能提升10%左右额各。

spark.shuffle.manager

  • 默認(rèn)值:sort
  • 參數(shù)說明:該參數(shù)用于設(shè)置ShuffleManager的類型国觉。Spark 1.5以后,有三個(gè)可選項(xiàng):hash虾啦、sort和tungsten-sort麻诀。HashShuffleManager是Spark 1.2以前的默認(rèn)選項(xiàng),但是Spark 1.2以及之后的版本默認(rèn)都是SortShuffleManager了傲醉。tungsten-sort與sort類似蝇闭,但是使用了tungsten計(jì)劃中的堆外內(nèi)存管理機(jī)制,內(nèi)存使用效率更高硬毕。
  • 調(diào)優(yōu)建議:由于SortShuffleManager默認(rèn)會(huì)對數(shù)據(jù)進(jìn)行排序呻引,因此如果你的業(yè)務(wù)邏輯中需要該排序機(jī)制的話,則使用默認(rèn)的SortShuffleManager就可以吐咳;而如果你的業(yè)務(wù)邏輯不需要對數(shù)據(jù)進(jìn)行排序逻悠,那么建議參考后面的幾個(gè)參數(shù)調(diào)優(yōu)元践,通過bypass機(jī)制或優(yōu)化的HashShuffleManager來避免排序操作,同時(shí)提供較好的磁盤讀寫性能童谒。這里要注意的是单旁,tungsten-sort要慎用,因?yàn)橹鞍l(fā)現(xiàn)了一些相應(yīng)的bug饥伊。

spark.shuffle.sort.bypassMergeThreshold

  • 默認(rèn)值:200
  • 參數(shù)說明當(dāng)ShuffleManager為SortShuffleManager時(shí)象浑,如果shuffle read task的數(shù)量小于這個(gè)閾值(默認(rèn)是200),則shuffle write過程中不會(huì)進(jìn)行排序操作琅豆,而是直接按照未經(jīng)優(yōu)化的HashShuffleManager的方式去寫數(shù)據(jù)融柬,但是最后會(huì)將每個(gè)task產(chǎn)生的所有臨時(shí)磁盤文件都合并成一個(gè)文件,并會(huì)創(chuàng)建單獨(dú)的索引文件趋距。
  • 調(diào)優(yōu)建議:當(dāng)你使用SortShuffleManager時(shí)粒氧,如果的確不需要排序操作,那么建議將這個(gè)參數(shù)調(diào)大一些节腐,大于shuffle read task的數(shù)量外盯。那么此時(shí)就會(huì)自動(dòng)啟用bypass機(jī)制,map-side就不會(huì)進(jìn)行排序了翼雀,減少了排序的性能開銷饱苟。但是這種方式下,依然會(huì)產(chǎn)生大量的磁盤文件狼渊,因此shuffle write性能有待提高箱熬。

spark.shuffle.consolidateFiles

  • 默認(rèn)值:false
  • 參數(shù)說明:如果使用HashShuffleManager,該參數(shù)有效狈邑。如果設(shè)置為true城须,那么就會(huì)開啟consolidate機(jī)制,會(huì)大幅度合并shuffle write的輸出文件米苹,對于shuffle read task數(shù)量特別多的情況下糕伐,這種方法可以極大地減少磁盤IO開銷,提升性能蘸嘶。
  • 調(diào)優(yōu)建議:如果的確不需要SortShuffleManager的排序機(jī)制良瞧,那么除了使用bypass機(jī)制,還可以嘗試將spark.shffle.manager參數(shù)手動(dòng)指定為hash训唱,使用HashShuffleManager褥蚯,同時(shí)開啟consolidate機(jī)制。在實(shí)踐中嘗試過况增,發(fā)現(xiàn)其性能比開啟了bypass機(jī)制的SortShuffleManager要高出10%~30%赞庶。

zhuanhttps://www.cnblogs.com/arachis/p/Spark_Shuffle.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子尘执,更是在濱河造成了極大的恐慌舍哄,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,324評論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件誊锭,死亡現(xiàn)場離奇詭異表悬,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)丧靡,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,356評論 3 392
  • 文/潘曉璐 我一進(jìn)店門蟆沫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人温治,你說我怎么就攤上這事饭庞。” “怎么了熬荆?”我有些...
    開封第一講書人閱讀 162,328評論 0 353
  • 文/不壞的土叔 我叫張陵舟山,是天一觀的道長。 經(jīng)常有香客問我卤恳,道長累盗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,147評論 1 292
  • 正文 為了忘掉前任突琳,我火速辦了婚禮若债,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘拆融。我一直安慰自己蠢琳,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,160評論 6 388
  • 文/花漫 我一把揭開白布镜豹。 她就那樣靜靜地躺著傲须,像睡著了一般。 火紅的嫁衣襯著肌膚如雪逛艰。 梳的紋絲不亂的頭發(fā)上躏碳,一...
    開封第一講書人閱讀 51,115評論 1 296
  • 那天,我揣著相機(jī)與錄音散怖,去河邊找鬼。 笑死肄渗,一個(gè)胖子當(dāng)著我的面吹牛镇眷,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播翎嫡,決...
    沈念sama閱讀 40,025評論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼欠动,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起具伍,我...
    開封第一講書人閱讀 38,867評論 0 274
  • 序言:老撾萬榮一對情侶失蹤翅雏,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后人芽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體望几,經(jīng)...
    沈念sama閱讀 45,307評論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,528評論 2 332
  • 正文 我和宋清朗相戀三年萤厅,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了橄抹。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,688評論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡惕味,死狀恐怖楼誓,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情名挥,我是刑警寧澤疟羹,帶...
    沈念sama閱讀 35,409評論 5 343
  • 正文 年R本政府宣布,位于F島的核電站禀倔,受9級特大地震影響榄融,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蹋艺,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,001評論 3 325
  • 文/蒙蒙 一剃袍、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧捎谨,春花似錦民效、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,657評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至检吆,卻和暖如春舒萎,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蹭沛。 一陣腳步聲響...
    開封第一講書人閱讀 32,811評論 1 268
  • 我被黑心中介騙來泰國打工臂寝, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人摊灭。 一個(gè)月前我還...
    沈念sama閱讀 47,685評論 2 368
  • 正文 我出身青樓咆贬,卻偏偏與公主長得像,于是被迫代替她去往敵國和親帚呼。 傳聞我的和親對象是個(gè)殘疾皇子掏缎,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,573評論 2 353