1、Shuffle流程
spark的shuffle過程如下圖所示氢哮,和mapreduce中的類似,但在spark2.0及之后的版本中只存在SortShuffleManager而將原來的HashShuffleManager廢棄掉(但是shuffleWriter的子類BypassMergeSortShuffleWriter和已經(jīng)被廢棄掉的HashShuffleWriter類似)袋毙。這樣,每個mapTask在shuffle的sort階段只會生成一個結(jié)果文件冗尤,單個文件按照partitionId分成多個region听盖。reducer階段根據(jù)partitionId來fetch對應(yīng)的region數(shù)據(jù)。
整個shuffle過程分為兩個階段裂七,write(核心)和read階段,其中write階段比較重要的實現(xiàn)類為ExternalSorter(后面會重點分析該類)皆看。
2、Shuffle Write
- BypassMergeSortShuffleWriter -
這種方式是對partition(對應(yīng)的reduce)數(shù)量較少且不需要map-side aggregation的shuffle優(yōu)化背零,將每個partition的數(shù)據(jù)直接寫到對應(yīng)的文件腰吟,在所有數(shù)據(jù)都寫入完成后進行一次合并,下面是部分代碼:
[BypassMergeSortShuffleWriter]->write
public void write(Iterator<Product2<K, V>> records) throws IOException {
...
partitionWriters = new DiskBlockObjectWriter[numPartitions];
/**
為每個partition創(chuàng)建一個DiskWriter用于寫臨時文件
**/
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
...
/**
對每個record用對應(yīng)的writer進行文件寫入操作
**/
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
//flush
for (DiskBlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
/**
構(gòu)造最終的輸出文件實例,其中文件名為(reduceId為0):
"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
文件所在的local文件夾是根據(jù)該文件名的hash值確定徙瓶。
1毛雇、如果運行在yarn上,yarn在啟動的時候會根據(jù)配置項'LOCAL_DIRS'在本地創(chuàng)建
文件夾
**/
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
//在實際結(jié)果文件名后加上uuid用于標識文件正在寫入,結(jié)束后重命名
File tmp = Utils.tempFileWith(output);
try {
//合并每個partition對應(yīng)的文件到一個文件中
partitionLengths = writePartitionedFile(tmp);
//將每個partition的offset寫入index文件方便reduce端fetch數(shù)據(jù)
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(),
partitionLengths);
}
-
UnsafeShuffleWriter(詳見project tungsten)
該writer可將數(shù)據(jù)序列化后寫入到堆外內(nèi)存,只需要按照partitionid對地址進行排序,整個過程不涉及反序列化。
條件:
1侦镇、使用的序列化類需要支持object relocation.目前只能使用kryoSerializer
2灵疮、不需要map side aggregate即不能定義aggregator
3、partition數(shù)量不能大于支持的上限(2^24)
內(nèi)存模型:
每條數(shù)據(jù)地址由一個64位的指針確定,其構(gòu)成為:[24 bit partition number][13 bit memory page number][27 bit offset in page]
在內(nèi)存為非8字節(jié)對齊的情況下,每個page的容量為227bits=128Mb,page總數(shù)為213,因此每個task可操作內(nèi)存總量為:227*213bits=1Tb,在內(nèi)存按字節(jié)對齊的情況下允許每個page的size有1g(即128*8,實際64位系統(tǒng)的內(nèi)存都是8字節(jié)對齊的)的容量,數(shù)據(jù)存放在off heap上虽缕。在地址中加入partitionID 是為了排序階段只需要對record的地址排序始藕。
數(shù)據(jù)存儲格式:
4、Shuffle過程中涉及到的幾個參數(shù)
-
spark.shuffle.sort.bypassMergeThreshold
當(dāng)partition的數(shù)量小于該值并且不需要進行map-side aggregation時使用BypassMergeSortShuffleWriter來進行shuffle的write操作氮趋,默認值為200.
[SortShuffleWriter]->shouldBypassMergeSort
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
false
} else {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}```
- *spark.shuffle.compress*伍派、*spark.shuffle.file.buffer*
**[DiskBlockObjectWriter]->open**
def open(): DiskBlockObjectWriter = {
...
/**
'spark.shuffle.compress'-該參數(shù)決定是否對寫入文件的序列化數(shù)據(jù)進行壓縮。
'spark.shuffle.file.buffer'-設(shè)置buffer stream的buffersize,每writey
一個byte時會檢查當(dāng)前buffer容量,容量滿的時候則會flush到磁盤剩胁。該參數(shù)值在代碼中
會乘以1024轉(zhuǎn)換為字節(jié)長度行剂。默認值為'32k'救欧,該值太大可能導(dǎo)致內(nèi)存溢出。
**/
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
...
}```
spark.file.transferTo
決定在使用BypassMergeWriter過程中,最后對文件進行合并時是否使用NIO方式進行file stream的copy栓票。默認為true,在為false的情況下合并文件效率比較低(創(chuàng)建一個大小為8192的字節(jié)數(shù)組作為buffer,從in stream中讀滿后寫入out stream,單線程讀寫),版本號為2.6.32的linux內(nèi)核在使用NIO方式會產(chǎn)生bug,需要將該參數(shù)設(shè)置為false。spark.shuffle.spill.numElementsForceSpillThreshold
在使用UnsafeShuffleWriter時,如果內(nèi)存中的數(shù)據(jù)超過這個值則對當(dāng)前內(nèi)存數(shù)據(jù)進行排序并寫入磁盤臨時文件扩所。