本篇結(jié)構(gòu):
- Spark Shuffle 的發(fā)展
- Spark Shuffle 中數(shù)據(jù)結(jié)構(gòu)
- Spark Shuffle 原理
- 來源文章
Spark Shuffle 是 spark job 中某些算子觸發(fā)的操作。當(dāng) rdd 依賴中出現(xiàn)寬依賴的時(shí)候要出,就會(huì)觸發(fā) Shuffle 操作樱哼,Shuffle 操作通常會(huì)伴隨著不同 executor/host 之間數(shù)據(jù)的傳輸敌买。
Shuffle 操作可能涉及的過程包括數(shù)據(jù)的排序盯滚,聚合茁计,溢寫码荔,合并砸脊,傳輸,磁盤IO拳氢,網(wǎng)絡(luò)的 IO 等等募逞。Shuffle 是連接 MapTask 和 ReduceTask 之間的橋梁,Map 的輸出到 Reduce 中須經(jīng)過 Shuffle 環(huán)節(jié)馋评,Shuffle 的性能高低直接影響了整個(gè)程序的性能和吞吐量放接。
通常 Shuffle 分為兩部分:Map 階段的數(shù)據(jù)準(zhǔn)備( ShuffleMapTask )和Reduce(ShuffleReduceTask) 階段的數(shù)據(jù)拷貝處理。一般將在 Map 端的 Shuffle 稱之為 Shuffle Write留特,在 Reduce 端的 Shuffle 稱之為 Shuffle Read纠脾。
一、Spark Shuffle 的發(fā)展
- Spark 0.8及以前 Hash Based Shuffle
- Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機(jī)制
- Spark 0.9 引入ExternalAppendOnlyMap
- Spark 1.1 引入Sort Based Shuffle磕秤,但默認(rèn)仍為Hash Based Shuffle
- Spark 1.2 默認(rèn)的Shuffle方式改為Sort Based Shuffle
- Spark 1.4 引入Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-sort并入Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle退出歷史舞臺(tái)
Spark Shuffle 機(jī)制總共有三種:
1.1腹尖、未優(yōu)化的 HashShuffle
每一個(gè) ShuffleMapTask 都會(huì)為每一個(gè) ReducerTask 創(chuàng)建一個(gè)單獨(dú)的文件缭裆,總的文件數(shù)是 M * R互亮,其中 M 是 ShuffleMapTask 的數(shù)量愁铺,R 是 ShuffleReduceTask 的數(shù)量。
見下圖(來源網(wǎng)絡(luò)):
在處理大數(shù)據(jù)時(shí)蒙兰,ShuffleMapTask 和 ShuffleReduceTask 的數(shù)量很多磷瘤,創(chuàng)建的磁盤文件數(shù)量 M*R 也越多芒篷,大量的文件要寫磁盤,再從磁盤讀出來采缚,不僅會(huì)占用大量的時(shí)間针炉,而且每個(gè)磁盤文件記錄的句柄都會(huì)保存在內(nèi)存中(每個(gè)人大約 100k),因此也會(huì)占用很大的內(nèi)存空間扳抽,頻繁的打開和關(guān)閉文件篡帕,會(huì)導(dǎo)致頻繁的GC操作,很容易出現(xiàn) OOM 的情況贸呢。
也正是上述原因镰烧,該 HashShuffle 如今已退出歷史舞臺(tái)。
1.2楞陷、優(yōu)化后 HashShuffle
在 Spark 0.8.1 版本中怔鳖,引入了 Consolidation 機(jī)制,該機(jī)制是對(duì) HashShuffle 的一種優(yōu)化固蛾。
如下圖(來源網(wǎng)絡(luò)):
可以明顯看出结执,在一個(gè) core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個(gè)輸出文件 ShuffleFile。
先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i艾凯,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面献幔,形成 ShuffleBlock i',每個(gè) ShuffleBlock 被稱為 FileSegment览芳。下一個(gè) stage 的 reducer 只需要 fetch 整個(gè) ShuffleFile 就行了斜姥。
這樣,每個(gè) worker 持有的文件數(shù)降為 cores * R沧竟。cores 代表核數(shù),R 是 ShuffleReduceTask 數(shù)缚忧。
1.3悟泵、Sort-Based Shuffle
由于 HashShuffle 會(huì)產(chǎn)生很多的磁盤文件,引入 Consolidation 機(jī)制雖然在一定程度上減少了磁盤文件數(shù)量闪水,但是不足以有效提高 Shuffle 的性能糕非,適合中小型數(shù)據(jù)規(guī)模的大數(shù)據(jù)處理。
為了讓 Spark 在更大規(guī)模的集群上更高性能處理更大規(guī)模的數(shù)據(jù)球榆,因此在 Spark 1.1 版本中朽肥,引入了 SortShuffle。
如下圖(來源網(wǎng)絡(luò)):
該機(jī)制每一個(gè) ShuffleMapTask 都只創(chuàng)建一個(gè)文件持钉,將所有的 ShuffleReduceTask 的輸入都寫入同一個(gè)文件衡招,并且對(duì)應(yīng)生成一個(gè)索引文件。
以前的數(shù)據(jù)是放在內(nèi)存緩存中每强,等到數(shù)據(jù)完了再刷到磁盤始腾,現(xiàn)在為了減少內(nèi)存的使用州刽,在內(nèi)存不夠用的時(shí)候,可以將輸出溢寫到磁盤浪箭,結(jié)束的時(shí)候穗椅,再將這些不同的文件聯(lián)合內(nèi)存的數(shù)據(jù)一起進(jìn)行歸并,從而減少內(nèi)存的使用量奶栖。一方面文件數(shù)量顯著減少匹表,另一方面減少Writer 緩存所占用的內(nèi)存大小,而且同時(shí)避免 GC 的風(fēng)險(xiǎn)和頻率宣鄙。
但對(duì)于 Rueducer 數(shù)比較少的情況桑孩,Hash Shuffle 要比 Sort Shuffle 快,因此 Sort Shuffle 有個(gè) “fallback” 計(jì)劃框冀,對(duì)于 Reducers 數(shù)少于 “spark.shuffle.sort.bypassMergeThreshold” (200 by default)流椒,將使用 fallback 計(jì)劃,hashing 相關(guān)數(shù)據(jù)到分開的文件明也,然后合并這些文件為一個(gè)宣虾。
二、Sort Based Shuffle
因?yàn)?hash based shuffle 已經(jīng)退出歷史舞臺(tái)温数,所以以 spark 2.3 的 sort based shuffle 為例绣硝,看 Spark Shuffle 的原理。
Shuffle 的整個(gè)生命周期由 ShuffleManager 來管理撑刺,Spark 2.3中鹉胖,唯一的支持方式為 SortShuffleManager,SortShuffleManager 中定義了 writer 和 reader 對(duì)應(yīng)shuffle 的 map 和 reduce 階段够傍。reader 只有一種實(shí)現(xiàn) BlockStoreShuffleReader
甫菠,writer 有三種運(yùn)行實(shí)現(xiàn):
- BypassMergeSortShuffleWriter:當(dāng)前 shuffle 沒有聚合, 并且分區(qū)數(shù)小于
spark.shuffle.sort.bypassMergeThreshold
(默認(rèn)200) - UnsafeShuffleWriter:當(dāng)條件不滿足 BypassMergeSortShuffleWriter 時(shí)冕屯, 并且當(dāng)前 rdd 的數(shù)據(jù)支持序列化(即 UnsafeRowSerializer)寂诱,也不需要聚合, 分區(qū)數(shù)小于 2^24
- SortShuffleWriter:其余
2.1安聘、BypassMergeSortShuffleWriter
首先痰洒,BypassMergeSortShuffleWriter 的運(yùn)行機(jī)制的觸發(fā)條件如下:
- shuffle reduce task(即partition)數(shù)量小于
spark.shuffle.sort.bypassMergeThreshold
參數(shù)的值。 - 沒有map side aggregations浴韭。
note: map side aggregations是指在 map 端的聚合操作丘喻,通常來說一些聚合類的算子都會(huì)都 map 端的 aggregation。不過對(duì)于 groupByKey 和combineByKey念颈, 如果設(shè)定 mapSideCombine 為false泉粉,就不會(huì)有 map side aggregations。
BypassMergeSortShuffleHandle 算法適用于沒有聚合舍肠,數(shù)據(jù)量不大的場(chǎng)景搀继。 給每個(gè)分區(qū)分配一個(gè)臨時(shí)文件窘面,對(duì)每個(gè) record 的 key 使用分區(qū)器(模式是hash,如果用戶自定義就使用自定義的分區(qū)器)找到對(duì)應(yīng)分區(qū)的輸出文件句柄叽躯,寫入文件對(duì)應(yīng)的文件财边。
因?yàn)閷懭氪疟P文件是通過 Java的 BufferedOutputStream 實(shí)現(xiàn)的,BufferedOutputStream 是 Java 的緩沖輸出流点骑,首先會(huì)將數(shù)據(jù)緩沖在內(nèi)存中酣难,當(dāng)內(nèi)存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數(shù)黑滴,提升性能憨募。所以圖中會(huì)有內(nèi)存緩沖的概念。
圖片來源網(wǎng)絡(luò)(正常應(yīng)該有 3 個(gè) reduce task):
最后袁辈,會(huì)將所有臨時(shí)文件合并成一個(gè)磁盤文件菜谣,并創(chuàng)建一個(gè)索引文件標(biāo)識(shí)下游各個(gè) reduce task 的數(shù)據(jù)在文件中的 start offset與 end offset。
該過程的磁盤寫機(jī)制其實(shí)跟未經(jīng)優(yōu)化的 HashShuffleManager 是一樣的晚缩,也會(huì)創(chuàng)建很多的臨時(shí)文件(所以觸發(fā)條件中會(huì)有 reduce task 數(shù)量限制)尾膊,只是在最后會(huì)做一個(gè)磁盤文件的合并,對(duì)于 shuffle reader 會(huì)更友好一些荞彼。
BypassMergeSortShuffleWriter 所有的中間數(shù)據(jù)都是在磁盤里冈敛,并沒有利用內(nèi)存。而且它只保證分區(qū)索引的排序鸣皂,而并不保證數(shù)據(jù)的排序抓谴。
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
// DiskBlockObjectWriter 數(shù)組,索引是 reduce 端的分區(qū)索引
partitionWriters = new DiskBlockObjectWriter[numPartitions];
// FileSegment數(shù)組寞缝,索引是 reduce 端的分區(qū)索引
partitionWriterSegments = new FileSegment[numPartitions];
// 為每個(gè) reduce 端的分區(qū)癌压,創(chuàng)建臨時(shí) Block 和文件
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
// 遍歷數(shù)據(jù),根據(jù)key找到分區(qū)索引第租,存到對(duì)應(yīng)的文件中
while (records.hasNext()) {
final Product2<K, V> record = records.next();
// 獲取數(shù)據(jù)的key
final K key = record._1();
// 根據(jù)reduce端的分區(qū)器措拇,判斷該條數(shù)據(jù)應(yīng)該存在reduce端的哪個(gè)分區(qū)
// 并且通過DiskBlockObjectWriter,存到對(duì)應(yīng)的文件中
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
for (int i = 0; i < numPartitions; i++) {
final DiskBlockObjectWriter writer = partitionWriters[i];
// 調(diào)用DiskBlockObjectWriter的commitAndGet方法慎宾,獲取FileSegment,包含寫入的數(shù)據(jù)信息
partitionWriterSegments[i] = writer.commitAndGet();
writer.close();
}
// 獲取最終結(jié)果的文件名
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
// 根據(jù)output文件名浅悉,生成臨時(shí)文件趟据。臨時(shí)文件的名稱只是在output文件名后面添加了一個(gè)uuid
File tmp = Utils.tempFileWith(output);
try {
// 將所有的文件都合并到tmp文件中,返回每個(gè)數(shù)據(jù)段的長度
partitionLengths = writePartitionedFile(tmp);
// 這里writeIndexFileAndCommit會(huì)將tmp文件重命名术健,并且會(huì)創(chuàng)建索引文件汹碱。
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
2.2、SortShuffleWriter
可以先考慮一個(gè)問題荞估,假如有 100 億條數(shù)據(jù)咳促,內(nèi)存只有 1M稚新,但是磁盤很大,現(xiàn)在要對(duì)這 100 億條數(shù)據(jù)進(jìn)行排序跪腹,是沒法把所有的數(shù)據(jù)一次性的 load 進(jìn)行內(nèi)存進(jìn)行排序的褂删,這就涉及到一個(gè)外部排序的問題。
假設(shè) 1M 內(nèi)存能裝進(jìn) 1 億條數(shù)據(jù)冲茸,每次能對(duì)這 1 億條數(shù)據(jù)進(jìn)行排序屯阀,排好序后輸出到磁盤,總共輸出 100 個(gè)文件轴术,最后把這 100 個(gè)文件進(jìn)行 merge 成一個(gè)全局有序的大文件难衰,這是歸并的思路:
可以每個(gè)文件(有序的)都取一部分頭部數(shù)據(jù)最為一個(gè) buffer, 并且把這 100個(gè) buffer 放在一個(gè)堆里面逗栽,進(jìn)行堆排序盖袭,比較方式就是對(duì)所有堆元素(buffer)的head 元素進(jìn)行比較大小, 然后不斷的把每個(gè)堆頂?shù)?buffer 的head 元素 pop 出來輸出到最終文件中彼宠, 然后繼續(xù)堆排序鳄虱,繼續(xù)輸出。如果哪個(gè) buffer 空了兵志,就去對(duì)應(yīng)的文件中繼續(xù)補(bǔ)充一部分?jǐn)?shù)據(jù)醇蝴。最終就得到一個(gè)全局有序的大文件。
SortShuffleWirter 的實(shí)現(xiàn)大概就是這樣想罕,和 Hadoop MR 的實(shí)現(xiàn)相似悠栓。
圖片來源網(wǎng)絡(luò):
該模式下,數(shù)據(jù)首先寫入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中按价,此時(shí)根據(jù)不同的 shuffle 算子惭适,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。有些 shuffle 操作涉及到聚合楼镐,對(duì)于這種需要聚合的操作癞志,使用 PartitionedAppendOnlyMap 來排序。對(duì)于不需要聚合的框产,則使用 PartitionedPairBuffer 排序凄杯。
在進(jìn)行 shuffle 之前,map 端會(huì)先將數(shù)據(jù)進(jìn)行排序秉宿。排序的規(guī)則戒突,根據(jù)不同的場(chǎng)景,會(huì)分為兩種描睦。首先會(huì)根據(jù) Key 將元素分成不同的 partition膊存。第一種只需要保證元素的 partitionId 排序,但不會(huì)保證同一個(gè) partitionId 的內(nèi)部排序。第二種是既保證元素的 partitionId 排序隔崎,也會(huì)保證同一個(gè) partitionId 的內(nèi)部排序今艺。
接著,往內(nèi)存寫入數(shù)據(jù)爵卒,每隔一段時(shí)間虚缎,當(dāng)向 MemoryManager 申請(qǐng)不到足夠的內(nèi)存時(shí),或者數(shù)據(jù)量超過 spark.shuffle.spill.numElementsForceSpillThreshold
這個(gè)閾值時(shí) (默認(rèn)是 Long 的最大值技潘,不起作用)遥巴,就會(huì)進(jìn)行 Spill 內(nèi)存數(shù)據(jù)到文件,然后清空內(nèi)存數(shù)據(jù)結(jié)構(gòu)享幽。假設(shè)可以源源不斷的申請(qǐng)到內(nèi)存铲掐,那么 Write 階段的所有數(shù)據(jù)將一直保存在內(nèi)存中,由此可見值桩,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比較吃內(nèi)存的摆霉。
在溢寫到磁盤文件之前,會(huì)先根據(jù) key 對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序奔坟。排序過后携栋,會(huì)分批將數(shù)據(jù)寫入磁盤文件。默認(rèn)的 batch 數(shù)量是 10000 條咳秉,也就是說婉支,排序好的數(shù)據(jù),會(huì)以每批 1 萬條數(shù)據(jù)的形式分批寫入磁盤文件澜建。寫入磁盤文件也是通過 Java 的 BufferedOutputStream 實(shí)現(xiàn)的向挖。
一個(gè) task 將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會(huì)發(fā)生多次磁盤溢寫操作炕舵,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件何之。在將最終排序結(jié)果寫入到數(shù)據(jù)文件之前,需要將內(nèi)存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已經(jīng) spill 到磁盤的 SpillFiles 進(jìn)行合并咽筋。
此外溶推,由于一個(gè) task 就只對(duì)應(yīng)一個(gè)磁盤文件,也就意味著該 task 為下游 stage 的 task 準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中奸攻,因此還會(huì)單獨(dú)寫一份索引文件蒜危,其中標(biāo)識(shí)了下游各個(gè) task 的數(shù)據(jù)在文件中的 start offset 與 end offset。
BypassMergeSortShuffleWriter 與該機(jī)制相比:
第一睹耐,磁盤寫機(jī)制不同舰褪;第二,不會(huì)進(jìn)行排序疏橄。也就是說,啟用 BypassMerge 機(jī)制的最大好處在于,shuffle write 過程中捎迫,不需要進(jìn)行數(shù)據(jù)的排序操作晃酒,也就節(jié)省掉了這部分的性能開銷,當(dāng)然需要滿足那兩個(gè)觸發(fā)條件窄绒。
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 根據(jù)是否在map端進(jìn)行數(shù)據(jù)合并初始化 ExternalSorter
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
// 不進(jìn)行聚合贝次,也不進(jìn)行排序,reduce端再進(jìn)行排序彰导,只會(huì)根據(jù) key 值獲取對(duì)應(yīng)的分區(qū) id蛔翅,來劃分?jǐn)?shù)據(jù),不會(huì)在分區(qū)內(nèi)排序位谋,如果結(jié)果需要排序山析,例如sortByKey,會(huì)在 reduce 端獲取 shuffle 數(shù)據(jù)后進(jìn)行
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
// shuffle輸出文件
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
// sorter 中的數(shù)據(jù)寫出到該文件中
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
// 寫出對(duì)應(yīng)的index文件掏父,紀(jì)錄每個(gè)Partition對(duì)應(yīng)的偏移量
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
// shuffleWriter的返回結(jié)果
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
2.3笋轨、UnsafeShuffleWriter
觸發(fā)條件有三個(gè):
- Serializer 支持 relocation。Serializer 支持 relocation 是指赊淑,Serializer 可以對(duì)已經(jīng)序列化的對(duì)象進(jìn)行排序爵政,這種排序起到的效果和先對(duì)數(shù)據(jù)排序再序列化一致。支持 relocation 的 Serializer 是 KryoSerializer陶缺,Spark 默認(rèn)使用 JavaSerializer钾挟,通過參數(shù) spark.serializer 設(shè)置;
- 沒有指定 aggregation 或者 key 排序饱岸, 因?yàn)?key 沒有編碼到排序指針中掺出,所以只有 partition 級(jí)別的排序。
- partition 數(shù)量不能大于指定的閾值(2^24)伶贰,因?yàn)?partition number 使用24bit 表示的蛛砰。
UnsafeShuffleWriter 將 record 序列化后插入sorter,然后對(duì)已經(jīng)序列化的 record 進(jìn)行排序黍衙,并在排序完成后寫入磁盤文件作為 spill file泥畅,再將多個(gè) spill file 合并成一個(gè)輸出文件。在合并時(shí)會(huì)基于 spill file 的數(shù)量和 IO compression codec 選擇最合適的合并策略琅翻。
下面內(nèi)容來自 Spark ShuffleWriter 原理位仁。
UnsafeShuffleWriter 首先將數(shù)據(jù)序列化,保存在 MemoryBlock 中方椎。然后將該數(shù)據(jù)的地址和對(duì)應(yīng)的分區(qū)索引聂抢,保存在 ShuffleInMemorySorter 內(nèi)存中,利用ShuffleInMemorySorter 根據(jù)分區(qū)排序棠众。當(dāng)內(nèi)存不足時(shí)琳疏,會(huì)觸發(fā) spill 操作有决,生成spill 文件。最后會(huì)將所有的 spill文 件合并在同一個(gè)文件里空盼。
整個(gè)過程可以想象成歸并排序书幕。ShuffleExternalSorter 負(fù)責(zé)分片的讀取數(shù)據(jù)到內(nèi)存,然后利用 ShuffleInMemorySorter 進(jìn)行排序揽趾。排序之后會(huì)將結(jié)果存儲(chǔ)到磁盤文件中台汇。這樣就會(huì)有很多個(gè)已排序的文件, UnsafeShuffleWriter 會(huì)將所有的文件合并篱瞎。
下圖來自 Spark ShuffleWriter 原理苟呐,表示了map端一個(gè)分區(qū)的shuffle過程:
UnsafeShuffleWriter 是對(duì) SortShuffleWriter 的優(yōu)化,大體上也和 SortShuffleWriter 差不多俐筋。從內(nèi)存使用角度看牵素,主要差異在以下兩點(diǎn):
一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中校哎,存儲(chǔ)的是鍵值或者值的具體類型两波,也就是 Java 對(duì)象,是反序列化過后的數(shù)據(jù)闷哆。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中數(shù)據(jù)是序列化以后存儲(chǔ)到實(shí)際的 Page 中腰奋,而且在寫入數(shù)據(jù)過程中會(huì)額外寫入長度信息”д總體而言劣坊,序列化以后數(shù)據(jù)大小是遠(yuǎn)遠(yuǎn)小于序列化之前的數(shù)據(jù)。
另一方面屈留,UnsafeShuffleWriter 中需要額外的存儲(chǔ)記錄(LongArray)局冰,它保存著分區(qū)信息和實(shí)際指向序列化后數(shù)據(jù)的指針(經(jīng)過編碼的Page num 以及 Offset)。相對(duì)于 SortShuffleWriter灌危, UnsafeShuffleWriter 中這部分存儲(chǔ)的開銷是額外的康二。
三、Spark Shuffle 中數(shù)據(jù)結(jié)構(gòu)
SortShuffleWriter 中使用 ExternalSorter 來對(duì)內(nèi)存中的數(shù)據(jù)進(jìn)行排序勇蝙,ExternalSorter 中緩存記錄數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)有兩種:一種是Buffer沫勿,對(duì)應(yīng)的實(shí)現(xiàn)類PartitionedPairBuffer,設(shè)置mapSideCombine=false 時(shí)會(huì)使用該結(jié)構(gòu)味混;另一種是Map产雹,對(duì)應(yīng)的實(shí)現(xiàn)類是PartitionedAppendOnlyMap,設(shè)置mapSideCombine=true時(shí)會(huì)使用該結(jié)構(gòu)翁锡。
兩者都是使用了 hash table 數(shù)據(jù)結(jié)構(gòu)蔓挖, 如果需要進(jìn)行 aggregation, 就使用 PartitionedAppendOnlyMap(支持 lookup 某個(gè)Key馆衔,如果之前存儲(chǔ)過相同 key 的 K-V 元素瘟判,就需要進(jìn)行 aggregation怨绣,然后再存入aggregation 后的 K-V), 否則使用 PartitionedPairBuffer(只進(jìn)行添K-V 元素) 荒适。
3.1梨熙、PartitionedPairBuffer
設(shè)置 mapSideCombine=false 時(shí) ,這種情況在 Map 階段不進(jìn)行 Combine 操作刀诬,在內(nèi)存中緩存記錄數(shù)據(jù)會(huì)使用 PartitionedPairBuffer 這種數(shù)據(jù)結(jié)構(gòu)來緩存、排序記錄數(shù)據(jù)邪财,它是一個(gè) Append-only Buffer陕壹,僅支持向 Buffer 中追加數(shù)據(jù)鍵值對(duì)記錄,PartitionedPairBuffer 的結(jié)構(gòu)如下圖所示:
默認(rèn)情況下树埠,PartitionedPairBuffer 初始分配的存儲(chǔ)容量為 capacity = initialCapacity = 64糠馆,實(shí)際上這個(gè)容量是針對(duì)key的容量,因?yàn)橐鎯?chǔ)的是鍵值對(duì)記錄數(shù)據(jù)怎憋,所以實(shí)際存儲(chǔ)鍵值對(duì)的容量為 2 * initialCapacity = 128又碌。PartitionedPairBuffer 是一個(gè)能夠動(dòng)態(tài)擴(kuò)充容量的 Buffer,內(nèi)部使用一個(gè)一維數(shù)組來存儲(chǔ)鍵值對(duì)绊袋,每次擴(kuò)容結(jié)果為當(dāng)前Buffer容量的 2 倍毕匀,即 2*capacity,最大支持存儲(chǔ) 2^31-1個(gè)鍵值對(duì)記錄(1073741823個(gè)) 癌别。
PartitionedPairBuffer 存儲(chǔ)的鍵值對(duì)記錄數(shù)據(jù)皂岔,鍵是(partition, key)這樣一個(gè)Tuple,值是對(duì)應(yīng)的數(shù)據(jù) value展姐,而且 curSize 是用來跟蹤寫入 Buffer 中的記錄的躁垛,key 在 Buffer 中的索引位置為 2 * curSize,value 的索引位置為 2 * curSize+1圾笨,可見一個(gè)鍵值對(duì)的 key 和 value 的存儲(chǔ)在 PartitionedPairBuffer 內(nèi)部的數(shù)組中是相鄰的教馆。
使用PartitionedPairBuffer緩存鍵值對(duì)記錄數(shù)據(jù),通過跟蹤實(shí)際寫入到Buffer內(nèi)的記錄數(shù)據(jù)的字節(jié)數(shù)來判斷擂达,是否需要將Buffer中的數(shù)據(jù)Spill到磁盤文件土铺,如下代碼所示:
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemory()
}
shouldSpill
}
elementsRead 表示存儲(chǔ)到 PartitionedPairBuffer 中的記錄數(shù),currentMemory 是對(duì) Buffer 中的總記錄數(shù)據(jù)大械瘛(字節(jié)數(shù))的估算舒憾,myMemoryThreshold 通過配置項(xiàng)spark.shuffle.spill.initialMemoryThreshold
來進(jìn)行設(shè)置,默認(rèn)值為5 * 1024 * 1024 = 5M穗熬。
當(dāng)滿足條件elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold
時(shí)镀迂,會(huì)先嘗試向 MemoryManager 申請(qǐng) 2 * currentMemory – myMemoryThreshold 大小的內(nèi)存,如果能夠申請(qǐng)到唤蔗,則不進(jìn)行 Spill 操作探遵,而是繼續(xù)向 Buffer 中存儲(chǔ)數(shù)據(jù)窟赏,否則就會(huì)調(diào)用 spill() 方法將 Buffer 中數(shù)據(jù)輸出到磁盤文件。
向 PartitionedPairBuffer 中寫入記錄數(shù)據(jù)箱季,以及滿足條件 Spill 記錄數(shù)據(jù)到磁盤文件涯穷,具體處理流程,如下圖所示:
在對(duì) PartitionedPairBuffer 中的記錄數(shù)據(jù) Spill 到磁盤之前藏雏,要使用默認(rèn)的排序規(guī)則進(jìn)行排序拷况,排序的規(guī)則是只對(duì) PartitionedPairBuffer 中的記錄按 Partition ID 進(jìn)行升序排序 。
當(dāng)滿足 Spill 條件時(shí)掘殴,先對(duì) PartitionedPairBuffer 中記錄進(jìn)行排序赚瘦,最后 Spill 到磁盤文件,這個(gè)過程中 PartitionedPairBuffer 中的記錄數(shù)據(jù)的變化情況奏寨,如下圖所示:
對(duì)內(nèi)存中 PartitionedPairBuffer 中的記錄按照 Partition ID 進(jìn)行排序起意,并且屬于同一個(gè) Partition 的數(shù)據(jù)記錄在 PartitionedPairBuffer 內(nèi)部的 data 數(shù)組中是連續(xù)的。排序結(jié)束后病瞳,在 Spill 到磁盤文件時(shí)揽咕,將對(duì)應(yīng)的 Partition ID 去掉了,只在文件 temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d 中連續(xù)存儲(chǔ)鍵值對(duì)數(shù)據(jù)套菜,但同時(shí)在另一個(gè)內(nèi)存數(shù)組結(jié)構(gòu)中會(huì)保存文件中每個(gè) Partition 擁有的記錄數(shù)亲善,這樣就能根據(jù) Partition 的記錄數(shù)來順序讀取文件 temp_shuffle_4c4b258d-52e4-47a0-a9b6-692f1af7ec9d 中屬于同一個(gè) Partition 的全部記錄數(shù)據(jù)。
ExternalSorter 類內(nèi)部維護(hù)了一個(gè) SpillFile 的 ArrayBuffer 數(shù)組笼踩,最終可能會(huì)生成多個(gè) SpillFile逗爹,SpillFile 的定義如下所示:
private[this] case class SpilledFile(
file: File,
blockId: BlockId,
serializerBatchSizes: Array[Long],
elementsPerPartition: Array[Long])
每個(gè) SpillFile 包含一個(gè) blockId,標(biāo)識(shí) Map 輸出的該臨時(shí)文件嚎于;serializerBatchSizes 表示每次批量寫入到文件的 Object 的數(shù)量掘而,默認(rèn)為 10000,由配置項(xiàng)spark.shuffle.spill.batchSize
來控制于购;elementsPerPartition 表示每個(gè) Partition 中的 Object 的數(shù)量袍睡。調(diào)用 ExternalSorter的insertAll() 方法,最終可能有如下3種情況:
- Map 階段輸出記錄數(shù)較少肋僧,沒有生成 SpillFile斑胜,那么所有數(shù)據(jù)都在 Buffer 中,直接對(duì) Buffer 中記錄排序并輸出到文件嫌吠;
- Map 階段輸出記錄數(shù)較多止潘,生成多個(gè) SpillFile,同時(shí) Buffer 中也有部分記錄數(shù)據(jù)辫诅;
- Map 階段輸出記錄數(shù)較多凭戴,只生成多個(gè) SpillFile;
3.2炕矮、PartitionedAppendOnlyMap
設(shè)置 mapSideCombine=true 時(shí) 么夫, 這種情況在Map階段會(huì)執(zhí)行 Combine 操作者冤,在 Map 階段進(jìn)行 Combine 操作能夠降低 Map 階段數(shù)據(jù)記錄的總數(shù),從而降低 Shuffle 過程中數(shù)據(jù)的跨網(wǎng)絡(luò)拷貝傳輸档痪。這時(shí)涉枫,RDD 對(duì)應(yīng)的 ShuffleDependency 需要設(shè)置一個(gè) Aggregator 用來執(zhí)行 Combine 操作。
Aggregator類聲明如下:
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
...
}
由于在 Map 階段用到了構(gòu)造 Aggregator 的幾個(gè)函數(shù)參數(shù) createCombiner腐螟、mergeValue愿汰、mergeCombiners,對(duì)這幾個(gè)函數(shù)詳細(xì)說明如下:
- createCombiner:進(jìn)行 Aggregation 開始時(shí)遭垛,需要設(shè)置初始值尼桶。因?yàn)樵贏ggregation 過程中使用了類似 Map 的內(nèi)存數(shù)據(jù)結(jié)構(gòu)來管理鍵值對(duì),每次加入前會(huì)先查看 Map 內(nèi)存結(jié)構(gòu)中是否存在 Key 對(duì)應(yīng)的 Value锯仪,第一次肯定不存在,所以首次將某個(gè) Key 的 Value 加入到 Map 內(nèi)存結(jié)構(gòu)中時(shí)趾盐,Key 在 Map 內(nèi)存結(jié)構(gòu)中第一次有了 Value庶喜。
- mergeValue:某個(gè) Key 已經(jīng)在 Map 結(jié)構(gòu)中存在 Value,后續(xù)某次又遇到相同的 Key 和一個(gè)新的 Value救鲤,這時(shí)需要通過該函數(shù)久窟,將舊 Value 和新Value 進(jìn)行合并,根據(jù) Key 檢索能夠得到合并后的新 Value本缠。
- mergeCombiners:一個(gè) Map 內(nèi)存結(jié)構(gòu)中 Key 和 Value 是由mergeValue生成的斥扛,那么在向 Map 中插入數(shù)據(jù),肯定會(huì)遇到 Map 使用容量達(dá)到上限丹锹,這時(shí)需要將記錄數(shù)據(jù) Spill 到磁盤文件稀颁,那么多個(gè) Spill 輸出的磁盤文件中可能存在同一個(gè) Key,這時(shí)需要對(duì)多個(gè) Spill 輸出的磁盤文件中的 Key 的多個(gè) Value 進(jìn)行合并楣黍,這時(shí)需要使用 mergeCombiners 函數(shù)進(jìn)行處理匾灶。
Map Side Combine 時(shí)的處理流程,如下所示:
當(dāng)需要進(jìn)行Map Side Combine時(shí)租漂,對(duì)應(yīng)的ExternalSorter類insertAll()方法中的處理邏輯阶女,代碼如下所示:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
}
}
map 是內(nèi)存數(shù)據(jù)結(jié)構(gòu),最重要的是 update 函數(shù)和 map 的 changeValue方法(這里的 map 對(duì)應(yīng)的實(shí)現(xiàn)類是 PartitionedAppendOnlyMap)哩治。
- update 函數(shù)所做的工作秃踩,其實(shí)就是對(duì) createCombiner 和 mergeValue 這兩個(gè)函數(shù)的使用,第一次遇到一個(gè)Key調(diào)用createCombiner函數(shù)處理业筏,非首次遇到同一個(gè)Key 對(duì)應(yīng)新的 Value 調(diào)用 mergeValue 函數(shù)進(jìn)行合并處理憔杨。
- map 的 changeValue 方法主要是將 Key 和 Value 在 map 中存儲(chǔ)或者進(jìn)行修改(對(duì)出現(xiàn)的同一個(gè) Key 的多個(gè) Value 進(jìn)行合并,并將合并后的新 Value 替換舊 Value)驾孔。
PartitionedAppendOnlyMap 是一個(gè)經(jīng)過優(yōu)化的哈希表芍秆,它支持向 map 中追加數(shù)據(jù)惯疙,以及修改 Key 對(duì)應(yīng)的 Value,但是不支持刪除某個(gè) Key 及其對(duì)應(yīng)的 Value妖啥。它能夠支持的存儲(chǔ)容量是 0.7 * 2 ^ 29 = 375809638霉颠。當(dāng)達(dá)到指定存儲(chǔ)容量或者指定限制,就會(huì)將 map 中記錄數(shù)據(jù) Spill 到磁盤文件荆虱,這個(gè)過程和前面的類似蒿偎,不再累述。
PartitionedAppendOnlyMap 中的 K 是(PatitionId, K)的元組怀读, 這樣就是先按照partitionId 進(jìn)行排序诉位,如果 partitionId 相同,再按照 hash(key)再進(jìn)行排序菜枷。
3.3苍糠、AppendOnlyMap
AppendOnlyMap 單從命名上來看,是一個(gè)只能追加元素的 Map 結(jié)構(gòu)。的確啤誊,它是只支持追加的 map岳瞭,可以修改某個(gè) key 對(duì)應(yīng)的 value,但是不能刪除已經(jīng)存在的 key蚊锹。
底層是由數(shù)組結(jié)構(gòu)實(shí)現(xiàn)的瞳筏,當(dāng)需要對(duì) Key-Value 進(jìn)行聚合時(shí),會(huì)使用AppendOnlyMap 作為 buffer牡昆。在插入或者修改元素的時(shí)候姚炕,會(huì)判斷是否擴(kuò)容,如果達(dá)到擴(kuò)容標(biāo)準(zhǔn)丢烘,將會(huì)對(duì)數(shù)組 2 倍容量進(jìn)行擴(kuò)容柱宦,擴(kuò)容過程中原有元素并不是直接拷貝,而是進(jìn)行原有元素的重新定位存儲(chǔ)铅协,如果集合中存在的數(shù)據(jù)量大捷沸,那么這里的操作將會(huì)耗時(shí)又耗資源。
存儲(chǔ)級(jí)別是 Memory-Only 狐史,在 shuffle reduce 數(shù)據(jù)不會(huì)溢寫痒给,在數(shù)據(jù)量不大的情況下可以,但是數(shù)據(jù)量大時(shí)骏全,會(huì)極易出現(xiàn)OOM苍柏。
3.3、ExternalAppendOnlyMap
繼承于AppendOnlyMap 姜贡,但是存儲(chǔ)級(jí)別是 Memory and Disk试吁,即在數(shù)據(jù)量達(dá)到一個(gè)閾值的時(shí)候,會(huì)把數(shù)據(jù)溢寫到磁盤,達(dá)到釋放內(nèi)存空間熄捍,降低 OOM 的風(fēng)險(xiǎn)的作用烛恤。
四、來源文章
詳見:
1.https://zhangchenchen.github.io/2018/09/26/deep-in-spark-shuffle/
2.https://zhmin.github.io/2019/01/26/spark-shuffle-writer/
3.https://toutiao.io/posts/eicdjo/preview
4.http://shiyanjun.cn/archives/1655.html