1. The ShuffleManager implementation class: SortShuffleManager
Spark 0.8 and earlier: Hash Based Shuffle
During Shuffle Write, partition data is regrouped by hash, without sorting. Each map-side task generates one file for every reduce-side task, which usually produces a very large number of files (M*R intermediate files, where M is the number of map tasks and R the number of reduce tasks), accompanied by a lot of random disk I/O and substantial memory overhead.
During Shuffle Read, if a combiner is defined, the fetched data is stored and merged in a hash table implemented by Spark (AppendOnlyMap). In terms of code structure:
- org.apache.spark.storage.ShuffleBlockManager handles Shuffle Write
- org.apache.spark.BlockStoreShuffleFetcher handles Shuffle Read
- org.apache.spark.Aggregator handles combine and relies on AppendOnlyMap
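The combine step can be pictured as folding each fetched (key, value) pair into a per-key combiner, using functions in the style of Aggregator's createCombiner and mergeValue. The sketch below is a minimal illustration under stated assumptions: a plain java.util.HashMap stands in for Spark's own AppendOnlyMap, the class CombineSketch is hypothetical, and Aggregator's third function (mergeCombiners) is omitted.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of read-side combine: fold (key, value) pairs into per-key combiners.
// java.util.HashMap stands in for Spark's AppendOnlyMap; names are illustrative only.
class CombineSketch {
    static <K, V, C> Map<K, C> combineValuesByKey(
            List<Map.Entry<K, V>> records,
            Function<V, C> createCombiner,     // builds a combiner from the first value of a key
            BiFunction<C, V, C> mergeValue) {  // folds a further value into an existing combiner
        Map<K, C> combiners = new HashMap<>();
        for (Map.Entry<K, V> record : records) {
            K key = record.getKey();
            if (combiners.containsKey(key)) {
                combiners.put(key, mergeValue.apply(combiners.get(key), record.getValue()));
            } else {
                combiners.put(key, createCombiner.apply(record.getValue()));
            }
        }
        return combiners;
    }
}
```

For a sum per key, createCombiner is `v -> v` and mergeValue is `(c, v) -> c + v`.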
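Spark 0.8.1: File Consolidation introduced for Hash Based Shuffle
With file consolidation, intermediate files are generated per execution unit instead of per map task: each execution unit writes one file for each reduce task (the number of execution units in an Executor equals its number of cores divided by the number of cores each task needs). This reduces the file count from M*R to E*C/T*R, where E is the number of Executors, C the number of cores available in each Executor, and T the number of cores allocated to each task. Consolidation is enabled through the spark.shuffle.consolidateFiles parameter.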
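The two file-count formulas above can be compared with a little arithmetic. The helper below is a hypothetical illustration (the class name and the sample numbers are assumptions, not from the original); for reference it also includes the Sort Based Shuffle case, where each map task writes one data file plus one index file.

```java
// Hypothetical helper comparing intermediate-file counts for the shuffle variants.
class ShuffleFileCount {
    // Hash Based Shuffle: one file per (map task, reduce task) pair -> M * R
    static long hashShuffle(long m, long r) { return m * r; }

    // With consolidation: one file per (execution unit, reduce task) pair.
    // Execution units per Executor = C / T, so the total is E * (C / T) * R.
    static long consolidated(long e, long c, long t, long r) { return e * (c / t) * r; }

    // Sort Based Shuffle: one data file plus one index file per map task -> 2 * M
    static long sortShuffle(long m) { return 2 * m; }
}
```

With M = 1000 and R = 1000, plain hash shuffle creates 1,000,000 files; with 10 Executors of 8 cores and 1 core per task, consolidation brings that down to 80,000; a sort-based shuffle with the same 1000 map tasks writes 2,000.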
Spark 0.9: ExternalAppendOnlyMap introduced
During combine, data can be spilled to disk and the spilled runs are later merged using a heap.
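A heap-based merge of sorted spill runs works like a k-way merge: keep the current head of every run in a min-heap, repeatedly take the smallest, and refill from the run it came from. The sketch below assumes in-memory lists of integers in place of spill files; the class SpillMerge is hypothetical and is not ExternalAppendOnlyMap's actual code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge of sorted runs with a min-heap; in-memory lists stand in for spill files.
class SpillMerge {
    static List<Integer> mergeSortedRuns(List<List<Integer>> runs) {
        // Heap entries are {current head value, run index}, ordered by head value.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        List<Iterator<Integer>> iters = new ArrayList<>();
        for (int i = 0; i < runs.size(); i++) {
            Iterator<Integer> it = runs.get(i).iterator();
            iters.add(it);
            if (it.hasNext()) heap.add(new int[] {it.next(), i});
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();                 // smallest head across all runs
            merged.add(top[0]);
            Iterator<Integer> it = iters.get(top[1]);
            if (it.hasNext()) heap.add(new int[] {it.next(), top[1]}); // refill from the same run
        }
        return merged;
    }
}
```

Only one element per run is held in the heap at a time, so memory use depends on the number of runs, not on their size.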
Spark 1.1: Sort Based Shuffle introduced, but Hash Based Shuffle remains the default
In the Shuffle Write phase of Sort Based Shuffle, a map task sorts its records by partition ID and key, and writes all of its output into a single data file together with an index file; reduce tasks use the index file to locate the data that belongs to them.
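The ordering described above can be expressed as a comparator on partition ID first and key second, which leaves each partition as one contiguous run in the single output. This is a minimal sketch; the class PartitionKeySort and its Rec record type are hypothetical, and String keys are assumed for simplicity.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: order map output by partition ID, then by key within a partition.
class PartitionKeySort {
    static final class Rec {
        final int partitionId;
        final String key;
        final String value;
        Rec(int partitionId, String key, String value) {
            this.partitionId = partitionId; this.key = key; this.value = value;
        }
    }

    static List<Rec> sortForOutput(List<Rec> records) {
        List<Rec> sorted = new ArrayList<>(records);
        // Partition ID is the primary order, so each partition ends up as one contiguous range.
        sorted.sort(Comparator.<Rec>comparingInt(r -> r.partitionId).thenComparing(r -> r.key));
        return sorted;
    }
}
```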
In terms of code structure:
ShuffleManager was split out of the former ShuffleBlockManager to manage the Shuffle Writer and Shuffle Reader. The two shuffle approaches correspond to
org.apache.spark.shuffle.hash.HashShuffleManager and
org.apache.spark.shuffle.sort.SortShuffleManager,
selectable through the spark.shuffle.manager parameter. Each approach has its own ShuffleWriter, org.apache.spark.shuffle.hash.HashShuffleWriter and org.apache.spark.shuffle.sort.SortShuffleWriter, but both share one ShuffleReader: org.apache.spark.shuffle.hash.HashShuffleReader.
org.apache.spark.util.collection.ExternalSorter implements the sorting. The spark.shuffle.spill parameter controls whether temporary data may be spilled to disk while sorting.
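Spark 1.2: the default shuffle changes to Sort Based Shuffle
Spark 1.4: Tungsten-Sort Based Shuffle introduced
Records are stored in serialized binary form, sorting is turned into sorting an array of pointers, and off-heap memory with a new memory management model is introduced. These techniques mean Tungsten-Sort can only be used under strict conditions, for example the shuffle dependency must not include aggregation and the output must not require sorting. Because off-heap memory management is built on the JDK's sun.misc.Unsafe API, Tungsten-Sort Based Shuffle is also called Unsafe Shuffle.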
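Sorting "an array of pointers" can be illustrated by packing a record's partition ID and its address into one long, so that sorting plain longs orders records by partition without moving the serialized bytes. The bit layout below is a simplified assumption (23-bit partition ID above a 40-bit address, sign bit left at 0 so signed sorting stays correct); Spark's own PackedRecordPointer uses a different layout, and the class PackedPointerSort is hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch: pack (partitionId, address) into a long and sort the longs.
// Simplified layout (assumption): bits 40..62 = partition ID, bits 0..39 = record address.
class PackedPointerSort {
    static final int ADDRESS_BITS = 40;
    static final long ADDRESS_MASK = (1L << ADDRESS_BITS) - 1;
    static final int MAX_PARTITION_ID = (1 << 23) - 1;   // keeps bit 63 clear

    static long pack(int partitionId, long address) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) throw new IllegalArgumentException("partition id out of range");
        if (address < 0 || address > ADDRESS_MASK) throw new IllegalArgumentException("address out of range");
        return ((long) partitionId << ADDRESS_BITS) | address;
    }

    static int partitionOf(long packed) { return (int) (packed >>> ADDRESS_BITS); }

    static long addressOf(long packed) { return packed & ADDRESS_MASK; }

    // The partition ID sits in the high bits, so ascending order of the longs is partition order.
    static long[] sortPointers(long[] pointers) {
        long[] copy = pointers.clone();
        Arrays.sort(copy);
        return copy;
    }
}
```

Only 8 bytes per record move during the sort, which is the point of sorting pointers instead of records.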
At the code level:
- New class org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
- New class org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter (implemented in Java)
- The ShuffleReader reuses HashShuffleReader
Spark 1.6: Tungsten-sort merged into Sort Based Shuffle
SortShuffleManager automatically chooses the best shuffle path: if the Tungsten-sort conditions are met, it uses Tungsten-sort Based Shuffle; otherwise it uses Sort Based Shuffle.
In the code:
- UnsafeShuffleManager is merged into SortShuffleManager
- HashShuffleReader is renamed BlockStoreShuffleReader; Sort Based Shuffle and Hash Based Shuffle still share this ShuffleReader.
Spark 2.0: Hash Based Shuffle is removed. From then on Spark has only Sort Based Shuffle, and SortShuffleManager is the only ShuffleManager implementation.
2. SortShuffleManager.registerShuffle
3. SortShuffleManager.getReader
4. SortShuffleManager.getWriter
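```scala
handle match {
  // Serialized (Tungsten-sort) path
  case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
    new UnsafeShuffleWriter(.......)
  // Bypass path: one file per partition, concatenated afterwards
  case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
    new BypassMergeSortShuffleWriter(......)
  // General case: sort-based writer backed by ExternalSorter
  case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
    new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
```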
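Which of the three handles getWriter receives is decided when the shuffle is registered. The sketch below is a simplified, hypothetical paraphrase of that decision in Spark 2.x as I understand it: bypass when there is no map-side combine and the partition count is at most spark.shuffle.sort.bypassMergeThreshold (default 200); otherwise the serialized path when the serializer supports relocation of serialized objects, there is no aggregation, and the partition count fits the packed pointer (assumed here to be 2^24); otherwise the general sort writer. The class HandleChooser and its parameters are illustrative, not Spark's API, and the exact conditions vary between Spark versions.

```java
// Hypothetical, simplified paraphrase of how a shuffle handle (and hence a writer) is chosen.
class HandleChooser {
    enum Writer { BYPASS_MERGE_SORT, UNSAFE, SORT }

    static Writer choose(boolean mapSideCombine, boolean hasAggregator, int numPartitions,
                         boolean serializerSupportsRelocation, int bypassMergeThreshold) {
        // 1) No map-side combine and few partitions: one file per partition, then concatenate.
        if (!mapSideCombine && numPartitions <= bypassMergeThreshold) return Writer.BYPASS_MERGE_SORT;
        // 2) Serialized path: relocatable serializer, no aggregation, partition ID fits the pointer.
        if (serializerSupportsRelocation && !hasAggregator && numPartitions <= (1 << 24)) return Writer.UNSAFE;
        // 3) Fallback: general sort-based writer.
        return Writer.SORT;
    }
}
```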
5. BypassMergeSortShuffleWriter: works like hash shuffle, but merges the output files into a single file
1) BypassMergeSortShuffleWriter.write
Parameter: the partition's iterator
[If there are no records]
```java
// No records: commit an index with all partition lengths 0 and report an empty MapStatus
if (!records.hasNext()) {
  partitionLengths = new long[numPartitions];
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  return;
}
```
[Create a disk writer for each partition]
```java
// One DiskBlockObjectWriter per reduce partition, each backed by its own temp shuffle file
partitionWriters = new DiskBlockObjectWriter[numPartitions];
for (int i = 0; i < numPartitions; i++) {
  final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
  final File file = tempShuffleBlockIdPlusFile._2();
  final BlockId blockId = tempShuffleBlockIdPlusFile._1();
  partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
```
[Write the records]
```java
// Route each record to the writer of the partition its key maps to
while (records.hasNext()) {
  final Product2<K, V> record = records.next();
  final K key = record._1();
  partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
```
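The routing loop above depends only on the partitioner. As a minimal sketch, assuming a hash partitioner that takes a non-negative modulo of the key's hashCode (null keys go to partition 0), and with in-memory lists standing in for the per-partition disk writers, the loop looks like this; the class BypassRouting is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the bypass write loop: records go to per-partition buckets
// (in-memory lists stand in for the per-partition DiskBlockObjectWriters).
class BypassRouting {
    // Hash-partitioner style: non-negative modulo of the key's hash code.
    static int getPartition(Object key, int numPartitions) {
        if (key == null) return 0;
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;   // Java's % can be negative
    }

    static <K, V> List<List<Map.Entry<K, V>>> route(Iterable<Map.Entry<K, V>> records, int numPartitions) {
        List<List<Map.Entry<K, V>>> buckets = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) buckets.add(new ArrayList<>());
        for (Map.Entry<K, V> record : records) {
            buckets.get(getPartition(record.getKey(), numPartitions)).add(record);
        }
        return buckets;
    }
}
```

Because every partition gets its own open file at once, this path is only sensible for a small number of partitions.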
[Collect each ShuffleBlock, referred to here as a FileSegment]
```java
// Commit each writer and keep the FileSegment (file, offset, length) it produced
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
  final DiskBlockObjectWriter writer = partitionWriters[i];
  partitionWriterSegments[i] = writer.commitAndGet();
  writer.close();
}
```
[Merge the files and write the index file]
```java
// Concatenate the per-partition files into a temp data file, then write the index and commit
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
try {
  partitionLengths = writePartitionedFile(tmp);
  shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
  }
}
```
2) BypassMergeSortShuffleWriter.writePartitionedFile
Parameter: the merged output file, File outputFile
```java
final long[] lengths = new long[numPartitions];
// Append mode: the per-partition files are concatenated in partition order
final FileOutputStream out = new FileOutputStream(outputFile, true);
boolean threwException = true;
try {
  for (int i = 0; i < numPartitions; i++) {
    final File file = partitionWriterSegments[i].file();
    if (file.exists()) {
      final FileInputStream in = new FileInputStream(file);
      boolean copyThrewException = true;
      try {
        // lengths[i] = number of bytes partition i contributes to the merged file
        lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
        copyThrewException = false;
      } finally {
        Closeables.close(in, copyThrewException);
      }
      if (!file.delete()) {
        logger.error("Unable to delete file for partition {}", i);
      }
    }
  }
  threwException = false;
} finally {
  Closeables.close(out, threwException);
}
return lengths;
```
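It returns the length in bytes of each partition's segment within the merged file; writeIndexFileAndCommit turns these lengths into the offsets stored in the index file.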
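How per-partition lengths become an index can be sketched as prefix sums: the index holds numPartitions + 1 offsets starting at 0, and partition r occupies bytes [offsets[r], offsets[r + 1]) of the data file. The sketch below assumes that layout, written as big-endian longs via DataOutputStream, which is my understanding of IndexShuffleBlockResolver's format; the class IndexOffsets and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch: lengths -> cumulative index offsets -> reduce-side slice of one partition.
class IndexOffsets {
    // offsets[0] = 0 and offsets[i + 1] = offsets[i] + lengths[i]
    static long[] offsetsFromLengths(long[] lengths) {
        long[] offsets = new long[lengths.length + 1];
        for (int i = 0; i < lengths.length; i++) offsets[i + 1] = offsets[i] + lengths[i];
        return offsets;
    }

    // Serialize the offsets as 8-byte big-endian longs, as DataOutputStream.writeLong does.
    static byte[] indexBytes(long[] offsets) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (long offset : offsets) out.writeLong(offset);
        }
        return bytes.toByteArray();
    }

    // Reduce side: partition r lives in [offsets[r], offsets[r + 1]) of the data file.
    static byte[] readPartition(byte[] data, long[] offsets, int r) {
        return Arrays.copyOfRange(data, (int) offsets[r], (int) offsets[r + 1]);
    }
}
```

An empty partition simply has two equal consecutive offsets, so it costs 8 bytes in the index and nothing in the data file.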
6. SortShuffleWriter
1) SortShuffleWriter.write
[Sort the records]
```scala
// With map-side combine, the sorter gets the aggregator and the key ordering. Otherwise we don't
// care whether keys are sorted within each partition, so it gets neither an aggregator nor an ordering.
sorter = if (dep.mapSideCombine) {
  require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
  // Combining: pass the aggregator, the partitioner and the key ordering
  new ExternalSorter[K, V, C](
    context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
  new ExternalSorter[K, V, V](
    context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
// Key step: records go into an in-memory map (when combining) or buffer and are sorted as
// required (by partition ID, and by key within a partition if an ordering is given);
// when memory runs short, data is spilled to spill files.
sorter.insertAll(records)
```
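The spilling behavior of insertAll can be sketched as: buffer records, and whenever the buffer hits a limit, sort it by (partition ID, key) and set it aside as a sorted run. This is a minimal sketch under stated assumptions: a record count stands in for the memory limit, runs stay in memory rather than going to spill files, and the class SpillingSorterSketch is hypothetical; ExternalSorter's real logic (memory estimation, combining, final merge) is far more involved.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: buffer records, spill a sorted run whenever the buffer is full.
class SpillingSorterSketch {
    static final class Rec {
        final int partitionId;
        final int key;
        Rec(int partitionId, int key) { this.partitionId = partitionId; this.key = key; }
    }

    // Partition ID first, then key, matching the output order of the data file.
    static final Comparator<Rec> ORDER =
        Comparator.<Rec>comparingInt(r -> r.partitionId).thenComparingInt(r -> r.key);

    final int maxBuffered;                            // stand-in for a memory limit
    final List<Rec> buffer = new ArrayList<>();
    final List<List<Rec>> spilledRuns = new ArrayList<>();

    SpillingSorterSketch(int maxBuffered) { this.maxBuffered = maxBuffered; }

    void insertAll(Iterable<Rec> records) {
        for (Rec r : records) {
            buffer.add(r);
            if (buffer.size() >= maxBuffered) spill();
        }
    }

    // Sort the buffered records and keep them as one sorted run (a real sorter writes a file).
    private void spill() {
        List<Rec> run = new ArrayList<>(buffer);
        run.sort(ORDER);
        spilledRuns.add(run);
        buffer.clear();
    }
}
```

The sorted runs, plus whatever remains in the buffer, would then be merged partition by partition when the output file is written.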
[Write the file]
```scala
// Equivalent to: blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
// Create a temp file next to the final data file via a utility method
val tmp = Utils.tempFileWith(output)
try {
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  // Write the buffered/map data to the temp file, one partition after another
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  // Write the index file and commit the temp data file
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  }
}
```
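Both writers follow the same "write to tmp, then commit" pattern: data goes to a temp file beside the final output and is moved into place only once complete, with the finally block cleaning up the temp file. The sketch below is a simplified, hypothetical illustration using java.nio.file (the class TempFileCommit is an assumption); it ignores the extra checks Spark performs when another attempt of the same task may already have committed output.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of write-to-temp-then-rename: readers never see a half-written output file.
class TempFileCommit {
    static void writeAndCommit(Path output, byte[] data) throws IOException {
        // Temp file in the same directory, so the move is a rename within one file system.
        Path tmp = Files.createTempFile(output.getParent(), output.getFileName().toString(), ".tmp");
        try {
            Files.write(tmp, data);
            Files.move(tmp, output, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move; cleans up after a failure
        }
    }
}
```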