Spark Shuffle(ExternalSorter)

1、Shuffle流程

spark的shuffle過程如下圖所示氢哮,和mapreduce中的類似,但在spark2.0及之后的版本中只存在SortShuffleManager而將原來的HashShuffleManager廢棄掉(但是shuffleWriter的子類BypassMergeSortShuffleWriter和已經(jīng)被廢棄掉的HashShuffleWriter類似)袋毙。這樣,每個mapTask在shuffle的sort階段只會生成一個結(jié)果文件冗尤,單個文件按照partitionId分成多個region听盖。reducer階段根據(jù)partitionId來fetch對應(yīng)的region數(shù)據(jù)。
整個shuffle過程分為兩個階段裂七,write(核心)和read階段,其中write階段比較重要的實現(xiàn)類為ExternalSorter(后面會重點分析該類)皆看。

shuffle

2、Shuffle Write

  • BypassMergeSortShuffleWriter -
    這種方式是對partition(對應(yīng)的reduce)數(shù)量較少且不需要map-side aggregation的shuffle優(yōu)化背零,將每個partition的數(shù)據(jù)直接寫到對應(yīng)的文件腰吟,在所有數(shù)據(jù)都寫入完成后進行一次合并,下面是部分代碼:
[BypassMergeSortShuffleWriter]->write
public void write(Iterator<Product2<K, V>> records) throws IOException {

                                    ...

    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    /**
      為每個partition創(chuàng)建一個DiskWriter用于寫臨時文件
    **/
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
                        ...
    /**
      對每個record用對應(yīng)的writer進行文件寫入操作
    **/
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }
    //flush
    for (DiskBlockObjectWriter writer : partitionWriters) {
      writer.commitAndClose();
    }
    /**
        構(gòu)造最終的輸出文件實例,其中文件名為(reduceId為0):
        "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
         文件所在的local文件夾是根據(jù)該文件名的hash值確定徙瓶。
        1毛雇、如果運行在yarn上,yarn在啟動的時候會根據(jù)配置項'LOCAL_DIRS'在本地創(chuàng)建
        文件夾
    **/
    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    //在實際結(jié)果文件名后加上uuid用于標識文件正在寫入,結(jié)束后重命名
    File tmp = Utils.tempFileWith(output);
    try {
      //合并每個partition對應(yīng)的文件到一個文件中
      partitionLengths = writePartitionedFile(tmp);
      //將每個partition的offset寫入index文件方便reduce端fetch數(shù)據(jù)
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
  }
  • UnsafeShuffleWriter(詳見project tungsten)

該writer可將數(shù)據(jù)序列化后寫入到堆外內(nèi)存,只需要按照partitionid對地址進行排序,整個過程不涉及反序列化。
條件
1侦镇、使用的序列化類需要支持object relocation.目前只能使用kryoSerializer
2灵疮、不需要map side aggregate即不能定義aggregator
3、partition數(shù)量不能大于支持的上限(2^24)
內(nèi)存模型:
每條數(shù)據(jù)地址由一個64位的指針確定,其構(gòu)成為:[24 bit partition number][13 bit memory page number][27 bit offset in page]
在內(nèi)存為非8字節(jié)對齊的情況下,每個page的容量為227bits=128Mb,page總數(shù)為213,因此每個task可操作內(nèi)存總量為:227*213bits=1Tb,在內(nèi)存按字節(jié)對齊的情況下允許每個page的size有1g(即128*8,實際64位系統(tǒng)的內(nèi)存都是8字節(jié)對齊的)的容量,數(shù)據(jù)存放在off heap上虽缕。在地址中加入partitionID 是為了排序階段只需要對record的地址排序始藕。

數(shù)據(jù)存儲格式:

4、Shuffle過程中涉及到的幾個參數(shù)

  • spark.shuffle.sort.bypassMergeThreshold
    當(dāng)partition的數(shù)量小于該值并且不需要進行map-side aggregation時使用BypassMergeSortShuffleWriter來進行shuffle的write操作氮趋,默認值為200.
    [SortShuffleWriter]->shouldBypassMergeSort
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
}```
- *spark.shuffle.compress*伍派、*spark.shuffle.file.buffer*
**[DiskBlockObjectWriter]->open**

def open(): DiskBlockObjectWriter = {
...
/**
'spark.shuffle.compress'-該參數(shù)決定是否對寫入文件的序列化數(shù)據(jù)進行壓縮。
'spark.shuffle.file.buffer'-設(shè)置buffer stream的buffersize,每writey
一個byte時會檢查當(dāng)前buffer容量,容量滿的時候則會flush到磁盤剩胁。該參數(shù)值在代碼中
會乘以1024轉(zhuǎn)換為字節(jié)長度行剂。默認值為'32k'救欧,該值太大可能導(dǎo)致內(nèi)存溢出。
**/
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
...
}```

  • spark.file.transferTo
    決定在使用BypassMergeWriter過程中,最后對文件進行合并時是否使用NIO方式進行file stream的copy栓票。默認為true,在為false的情況下合并文件效率比較低(創(chuàng)建一個大小為8192的字節(jié)數(shù)組作為buffer,從in stream中讀滿后寫入out stream,單線程讀寫),版本號為2.6.32的linux內(nèi)核在使用NIO方式會產(chǎn)生bug,需要將該參數(shù)設(shè)置為false。

  • spark.shuffle.spill.numElementsForceSpillThreshold
    在使用UnsafeShuffleWriter時,如果內(nèi)存中的數(shù)據(jù)超過這個值則對當(dāng)前內(nèi)存數(shù)據(jù)進行排序并寫入磁盤臨時文件扩所。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末嫉入,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子觉至,更是在濱河造成了極大的恐慌剔应,老刑警劉巖,帶你破解...
    沈念sama閱讀 222,590評論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件语御,死亡現(xiàn)場離奇詭異峻贮,居然都是意外死亡,警方通過查閱死者的電腦和手機应闯,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,157評論 3 399
  • 文/潘曉璐 我一進店門纤控,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人碉纺,你說我怎么就攤上這事船万。” “怎么了骨田?”我有些...
    開封第一講書人閱讀 169,301評論 0 362
  • 文/不壞的土叔 我叫張陵唬涧,是天一觀的道長。 經(jīng)常有香客問我盛撑,道長碎节,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 60,078評論 1 300
  • 正文 為了忘掉前任抵卫,我火速辦了婚禮狮荔,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘介粘。我一直安慰自己殖氏,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 69,082評論 6 398
  • 文/花漫 我一把揭開白布姻采。 她就那樣靜靜地躺著雅采,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上婚瓜,一...
    開封第一講書人閱讀 52,682評論 1 312
  • 那天宝鼓,我揣著相機與錄音,去河邊找鬼巴刻。 笑死愚铡,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的胡陪。 我是一名探鬼主播沥寥,決...
    沈念sama閱讀 41,155評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼柠座!你這毒婦竟也來了邑雅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 40,098評論 0 277
  • 序言:老撾萬榮一對情侶失蹤妈经,失蹤者是張志新(化名)和其女友劉穎蒂阱,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體狂塘,經(jīng)...
    沈念sama閱讀 46,638評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡录煤,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,701評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了荞胡。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片妈踊。...
    茶點故事閱讀 40,852評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖泪漂,靈堂內(nèi)的尸體忽然破棺而出廊营,到底是詐尸還是另有隱情,我是刑警寧澤萝勤,帶...
    沈念sama閱讀 36,520評論 5 351
  • 正文 年R本政府宣布露筒,位于F島的核電站,受9級特大地震影響敌卓,放射性物質(zhì)發(fā)生泄漏慎式。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 42,181評論 3 335
  • 文/蒙蒙 一趟径、第九天 我趴在偏房一處隱蔽的房頂上張望瘪吏。 院中可真熱鬧,春花似錦蜗巧、人聲如沸掌眠。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,674評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蓝丙。三九已至级遭,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間渺尘,已是汗流浹背挫鸽。 一陣腳步聲響...
    開封第一講書人閱讀 33,788評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留沧烈,地道東北人掠兄。 一個月前我還...
    沈念sama閱讀 49,279評論 3 379
  • 正文 我出身青樓像云,卻偏偏與公主長得像锌雀,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子迅诬,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,851評論 2 361

推薦閱讀更多精彩內(nèi)容