spark源碼閱讀之shuffleManager

1雳旅、shufflemanager的實(shí)現(xiàn)類:sortshufflemanager

Spark 0.8及以前 Hash Based Shuffle

在Shuffle Write過程按照Hash的方式重組Partition的數(shù)據(jù)音五,不進(jìn)行排序。每個(gè)map端的任務(wù)為每個(gè)reduce端的Task生成一個(gè)文件,通常會(huì)產(chǎn)生大量的文件(即對(duì)應(yīng)為M*R個(gè)中間文件戴差,其中M表示map端的Task個(gè)數(shù),R表示reduce端的Task個(gè)數(shù)),伴隨大量的隨機(jī)磁盤IO操作與大量的內(nèi)存開銷蛾方。

Shuffle Read過程如果有combiner操作,那么它會(huì)把拉到的數(shù)據(jù)保存在一個(gè)Spark封裝的哈希表(AppendOnlyMap)中進(jìn)行合并上陕。在代碼結(jié)構(gòu)上:

  • org.apache.spark.storage.ShuffleBlockManager負(fù)責(zé)Shuffle Write
  • org.apache.spark.BlockStoreShuffleFetcher負(fù)責(zé)Shuffle Read
  • org.apache.spark.Aggregator負(fù)責(zé)combine桩砰,依賴于AppendOnlyMap

Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機(jī)制

通過文件合并,中間文件的生成方式修改為每個(gè)執(zhí)行單位(一個(gè)Executor中的執(zhí)行單位等于Core的個(gè)數(shù)除以每個(gè)Task所需的Core數(shù))為每個(gè)reduce端的任務(wù)生成一個(gè)文件释簿。最終可以將文件個(gè)數(shù)從MR修改為EC/T*R亚隅,其中,E表示Executor的個(gè)數(shù)庶溶,C表示每個(gè)Executor中可用Core的個(gè)數(shù)煮纵,T表示Task所分配的Core的個(gè)數(shù)。是否采用Consolidate機(jī)制偏螺,需要配置spark.shuffle.consolidateFiles參數(shù)

Spark 0.9 引入ExternalAppendOnlyMap

在combine的時(shí)候行疏,可以將數(shù)據(jù)spill到磁盤,然后通過堆排序merge

Spark 1.1 引入Sort Based Shuffle套像,但默認(rèn)仍為Hash Based Shuffle

在Sort Based Shuffle的Shuffle Write階段酿联,map端的任務(wù)會(huì)按照Partition id以及key對(duì)記錄進(jìn)行排序。同時(shí)將全部結(jié)果寫到一個(gè)數(shù)據(jù)文件中凉夯,同時(shí)生成一個(gè)索引文件货葬,reduce端的Task可以通過該索引文件獲取相關(guān)的數(shù)據(jù)。

在代碼結(jié)構(gòu)上:

從以前的ShuffleBlockManager中分離出ShuffleManager來專門管理Shuffle Writer和Shuffle Reader劲够。兩種Shuffle方式分別對(duì)應(yīng)

org.apache.spark.shuffle.hash.HashShuffleManager和

org.apache.spark.shuffle.sort.SortShuffleManager震桶,

可通過spark.shuffle.manager參數(shù)配置。兩種Shuffle方式有各自的ShuffleWriter:org.apache.spark.shuffle.hash.HashShuffle和org.apache.spark.shuffle.sort.SortShuffleWriter征绎;但共用一個(gè)ShuffleReader蹲姐,即org.apache.spark.shuffle.hash.HashShuffleReader。

org.apache.spark.util.collection.ExternalSorter實(shí)現(xiàn)排序功能人柿〔穸眨可通過對(duì)spark.shuffle.spill參數(shù)配置,決定是否可以在排序時(shí)將臨時(shí)數(shù)據(jù)Spill到磁盤凫岖。

Spark 1.2 默認(rèn)的Shuffle方式改為Sort Based Shuffle

Spark 1.4 引入Tungsten-Sort Based Shuffle

將數(shù)據(jù)記錄用序列化的二進(jìn)制方式存儲(chǔ)江咳,把排序轉(zhuǎn)化成指針數(shù)組的排序,引入堆外內(nèi)存空間和新的內(nèi)存管理模型哥放,這些技術(shù)決定了使用Tungsten-Sort要符合一些嚴(yán)格的限制歼指,比如Shuffle dependency不能帶有aggregation爹土、輸出不能排序等。由于堆外內(nèi)存的管理基于JDK Sun Unsafe API踩身,故Tungsten-Sort Based Shuffle也被稱為Unsafe Shuffle胀茵。

在代碼層面:

  • 新增org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
  • 新增org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter(用java實(shí)現(xiàn))
  • ShuffleReader復(fù)用HashShuffleReader

Spark 1.6 Tungsten-sort并入Sort Based Shuffle

由SortShuffleManager自動(dòng)判斷選擇最佳Shuffle方式,如果檢測到滿足Tungsten-sort條件會(huì)自動(dòng)采用Tungsten-sort Based Shuffle挟阻,否則采用Sort Based Shuffle琼娘。

在代碼方面:

  • UnsafeShuffleManager合并到SortShuffleManager
  • HashShuffleReader 重命名為BlockStoreShuffleReader,Sort Based Shuffle和Hash Based Shuffle仍共用ShuffleReader附鸽。

Spark 2.0 Hash Based Shuffle退出歷史舞臺(tái)脱拼,從此Spark只有Sort Based Shuffle,ShuffleManager的實(shí)現(xiàn)類就只有SortShufflemanager

2拒炎、sortshufflemanager.registerShuffle

3挪拟、sortshufflemanager.getReader

4、sortshufflemanager.getWriter

[html] view plaincopy

<embed id="ZeroClipboardMovie_1" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_1" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=1&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. case unsafeShuffleHandle: SerializedShuffleHandle] =>
  2. new UnsafeShuffleWriter(.......)
  3. case bypassMergeSortHandle: BypassMergeSortShuffleHandle=>
  4. new BypassMergeSortShuffleWriter(......)
  5. case other: BaseShuffleHandle =>
  6. new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)

5击你、BypassMergeSortShuffleWriter類似于hash shuffle玉组,但是將output file合并成一個(gè)文件

1)、BypassMergeSortShuffleWriter.write

傳參:partition的itearator

【如果record為空】

[html] view plaincopy

<embed id="ZeroClipboardMovie_2" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_2" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=2&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. if (!records.hasNext()) {
  2. partitionLengths = new long[numPartitions];
  3. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
  4. mapStatus = MapStatus.MODULE.apply(blockManager.shuffleServerId(), partitionLengths);
  5. return;
  6. }

【獲取partition寫入磁盤文件的writer】

[html] view plaincopy

<embed id="ZeroClipboardMovie_3" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_3" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=3&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. partitionWriters = new DiskBlockObjectWriter[numPartitions];
  2. for (int i = 0; i < numPartitions; i++) {
  3. final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
  4. blockManager.diskBlockManager().createTempShuffleBlock();
  5. final File file = tempShuffleBlockIdPlusFile._2();
  6. final BlockId blockId = tempShuffleBlockIdPlusFile._1();
  7. partitionWriters[i] =
  8. blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  9. }

【寫文件】

[html] view plaincopy

<embed id="ZeroClipboardMovie_4" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_4" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=4&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. while (records.hasNext()) {
  2. final Product2<K, V> record = records.next();
  3. final K key = record._1();
  4. partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  5. }

【獲取每個(gè)ShuffleBlock丁侄,ShuffleBlock被稱為FileSegment】

[html] view plaincopy

<embed id="ZeroClipboardMovie_5" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_5" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=5&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. partitionWriterSegments = new FileSegment[numPartitions];
  2. for (int i = 0; i < numPartitions; i++) {
  3. final DiskBlockObjectWriter writer = partitionWriters[i];
  4. partitionWriterSegments[i] = writer.commitAndGet();
  5. writer.close();
  6. }

【合并文件以及寫index文件】

[html] view plaincopy

<embed id="ZeroClipboardMovie_6" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_6" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=6&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  2. File tmp = Utils.tempFileWith(output);
  3. try {
  4. partitionLengths = writePartitionedFile(tmp);
  5. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  6. } finally {
  7. if (tmp.exists() && !tmp.delete()) {
  8. logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
  9. }
  10. }

2)惯雳、BypassMergeSortShuffleWriter.writePartitionedFile

傳參:合并文件 File outputFile

[html] view plaincopy

<embed id="ZeroClipboardMovie_7" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_7" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=7&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. final FileOutputStream out = new FileOutputStream(outputFile, true);
  2. for (int i = 0; i < numPartitions; i++) {
  3. final File file = partitionWriterSegments[i].file();
  4. if (file.exists()) {
  5. final FileInputStream in = new FileInputStream(file);
  6. boolean copyThrewException = true;
  7. try {
  8. lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
  9. copyThrewException = false;
  10. } finally {
  11. Closeables.close(in, copyThrewException);
  12. }
  13. if (!file.delete()) {
  14. logger.error("Unable to delete file for partition {}", i);
  15. }
  16. }
  17. }
  18. threwException = false;

返回文件的偏移量

6、SortShuffleWriter

1)鸿摇、SortShuffleWriter.writer

【對(duì)rdd進(jìn)行排序】

[html] view plaincopy

<embed id="ZeroClipboardMovie_8" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_8" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=8&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. 如果map端進(jìn)行combine則石景,反之則不關(guān)心key在每個(gè)partition中是否被排序,既不傳遞aggregator也不傳遞ordering
  2. sorter = if (dep.mapSideCombine) {
  3. require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  4. //需要combine時(shí)拙吉,傳遞partitioner以及ordering
  5. new ExternalSorter[K, V, C](
  6. context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  7. } else {
  8. new ExternalSorter[K, V, V](
  9. context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  10. }
  11. //將數(shù)據(jù)存儲(chǔ)在buffer或者map中潮孽,這是最關(guān)鍵的地方,根據(jù)需求(包括partition內(nèi)的key排序筷黔,partitionID排序等等)排序往史,內(nèi)存不夠時(shí)數(shù)據(jù)會(huì)spill后寫入spillfile
  12. sorter.insertAll(records)

【寫文件】

[html] view plaincopy

<embed id="ZeroClipboardMovie_9" src="https://csdnimg.cn/public/highlighter/ZeroClipboard.swf" loop="false" menu="false" quality="best" bgcolor="#ffffff" width="16" height="16" name="ZeroClipboardMovie_9" align="middle" allowscriptaccess="always" allowfullscreen="false" type="application/x-shockwave-flash" pluginspage="http://www.macromedia.com/go/getflashplayer" flashvars="id=9&width=16&height=16" wmode="transparent" style="box-sizing: border-box; outline: 0px; white-space: normal; word-break: break-all;">

  1. //blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
  2. val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  3. //通過工具類創(chuàng)建臨時(shí)文件
  4. val tmp = Utils.tempFileWith(output)
  5. try {
  6. val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  7. //將buffer或者map中的數(shù)據(jù)寫入文件,各個(gè)partition
  8. val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  9. //寫index文件
  10. shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  11. mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  12. } finally {
  13. if (tmp.exists() && !tmp.delete()) {
  14. logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  15. }
  16. }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末佛舱,一起剝皮案震驚了整個(gè)濱河市椎例,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌请祖,老刑警劉巖订歪,帶你破解...
    沈念sama閱讀 222,378評(píng)論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異肆捕,居然都是意外死亡刷晋,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,970評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來眼虱,“玉大人或舞,你說我怎么就攤上這事∶苫茫” “怎么了?”我有些...
    開封第一講書人閱讀 168,983評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵胆筒,是天一觀的道長邮破。 經(jīng)常有香客問我,道長仆救,這世上最難降的妖魔是什么抒和? 我笑而不...
    開封第一講書人閱讀 59,938評(píng)論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮彤蔽,結(jié)果婚禮上摧莽,老公的妹妹穿的比我還像新娘。我一直安慰自己顿痪,他們只是感情好镊辕,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,955評(píng)論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著蚁袭,像睡著了一般征懈。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上揩悄,一...
    開封第一講書人閱讀 52,549評(píng)論 1 312
  • 那天卖哎,我揣著相機(jī)與錄音,去河邊找鬼删性。 笑死亏娜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的蹬挺。 我是一名探鬼主播维贺,決...
    沈念sama閱讀 41,063評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼汗侵!你這毒婦竟也來了幸缕?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,991評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤晰韵,失蹤者是張志新(化名)和其女友劉穎发乔,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體雪猪,經(jīng)...
    沈念sama閱讀 46,522評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡栏尚,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,604評(píng)論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了只恨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片译仗。...
    茶點(diǎn)故事閱讀 40,742評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡抬虽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出纵菌,到底是詐尸還是另有隱情阐污,我是刑警寧澤,帶...
    沈念sama閱讀 36,413評(píng)論 5 351
  • 正文 年R本政府宣布咱圆,位于F島的核電站笛辟,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏序苏。R本人自食惡果不足惜手幢,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,094評(píng)論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望忱详。 院中可真熱鬧围来,春花似錦、人聲如沸匈睁。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,572評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽航唆。三九已至才漆,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間佛点,已是汗流浹背醇滥。 一陣腳步聲響...
    開封第一講書人閱讀 33,671評(píng)論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留超营,地道東北人鸳玩。 一個(gè)月前我還...
    沈念sama閱讀 49,159評(píng)論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像演闭,于是被迫代替她去往敵國和親不跟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,747評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容