BypassMergeSortShuffleWriter, as its name suggests, is the bypass branch of sort-based shuffle: a sort-based shuffle writer implemented in a hash style. In the map stage, records are written to separate files by partition, one file per partition. These per-partition files are then concatenated into a single output file, and an index is generated for it. Reducers use IndexShuffleBlockResolver to locate and consume the different partitions of that output file.
In BypassMergeSortShuffleWriter, records are not buffered in memory; every record is ultimately flushed to disk.
When writing, BypassMergeSortShuffleWriter opens a separate serializer and file stream for every partition at the same time, so performance degrades badly when the number of reduce partitions is very large.
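This is exactly what the bypass threshold guards against. As a minimal, hedged sketch (the property spark.shuffle.sort.bypassMergeThreshold does exist in Spark, with a default of 200; the value below is purely illustrative), the threshold is set through the normal Spark configuration:

import org.apache.spark.SparkConf

// Minimal sketch: the bypass path is only considered when the number of reduce
// partitions does not exceed this threshold (default 200; the value shown is illustrative).
val conf = new SparkConf()
  .setAppName("bypass-threshold-demo")
  .set("spark.shuffle.sort.bypassMergeThreshold", "200")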
ShuffleWriter is invoked from ShuffleMapTask.runTask, and every map task calls runTask once.
BypassMergeSortShuffleWriter Source Code Analysis
首先桑包,我們來(lái)回顧下ShuffleWriter的過(guò)程。Shuffle發(fā)生與寬依賴的stage間纺非,由于stage內(nèi)的計(jì)算采用pipeline的方式哑了。shuffle發(fā)生的上一個(gè)stage為map節(jié)點(diǎn)赘方,下游的stage為reduce階段。而shuffle寫的過(guò)程就發(fā)生在map階段弱左,shuffleWriter的調(diào)用主要是在ShuffleMapStage中窄陡,每個(gè)ShuffleMapStage包含多個(gè)ShuffleMapTask, mapTask個(gè)數(shù)和分區(qū)數(shù)相關(guān)。
這樣每個(gè)ShuffleMapTask都會(huì)在其runTask調(diào)用下Writer接口拆火,其并非直接調(diào)用到具體的執(zhí)行類跳夭。而是在劃分寬依賴時(shí)想ShuffleManage注冊(cè)shuffle時(shí),返回的ShuffleHandler決定的榜掌。
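As a rough sketch of that decision (a paraphrase of the idea behind SortShuffleWriter.shouldBypassMergeSort as I understand it, not the exact source), the bypass handle is chosen only when the dependency needs no map-side combine and the reduce partition count stays within the threshold:

// Simplified sketch of the selection idea (not the exact Spark source):
// bypass is only eligible without map-side combine and with a small partition count.
def shouldUseBypass(
    mapSideCombine: Boolean,
    numReducePartitions: Int,
    bypassMergeThreshold: Int): Boolean = {
  !mapSideCombine && numReducePartitions <= bypassMergeThreshold
}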
When ShuffleMapTask invokes the writer, it first goes through ShuffleWriteProcessor, which mainly controls the ShuffleWriter's lifecycle. Let's look at the write implementation in ShuffleWriteProcessor:
// ShuffleWriteProcessor
def write(
    rdd: RDD[_],
    dep: ShuffleDependency[_, _, _],
    mapId: Long,
    context: TaskContext,
    partition: Partition): MapStatus = {
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // [1] Get the ShuffleManager from SparkEnv and, via dep.shuffleHandle, obtain the
    // concrete ShuffleWriter implementation.
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
      dep.shuffleHandle,
      mapId,
      context,
      createMetricsReporter(context))
    // [2] Call the shuffleWriter's write method, passing in the iterator of the current RDD partition
    writer.write(
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // [3] After the shuffleWriter finishes, stop it and return the MapStatus (or clean up the data on failure)
    val mapStatus = writer.stop(success = true)
    // [4] If the shuffleWriter succeeded, initiate push-based shuffle (covered in more detail later)
    if (mapStatus.isDefined) {
      // Initiate shuffle push process if push based shuffle is enabled
      // The map task only takes care of converting the shuffle data file into multiple
      // block push requests. It delegates pushing the blocks to a different thread-pool -
      // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
      if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
        manager.shuffleBlockResolver match {
          case resolver: IndexShuffleBlockResolver =>
            val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
            new ShuffleBlockPusher(SparkEnv.get.conf)
              .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
          case _ =>
        }
      }
    }
    mapStatus.get
  }
  ...
}
ShuffleWriteProcessor mainly does three things:
- [1] Get the ShuffleManager from SparkEnv and, via dep.shuffleHandle, obtain the concrete ShuffleWriter implementation.
- [2] Call the shuffleWriter's write method, passing in the iterator of the current RDD partition.
- [3] After the shuffleWriter finishes, return the MapStatus, or clean up the data on failure.
As you can see, every ShuffleMapTask returns a MapStatus when it finishes executing. The task result is wrapped in a CompletionEvent and sent to the DAGScheduler on the driver; if the task is a ShuffleMapTask, the DAGScheduler registers the map output status with the MapOutputTracker.
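Conceptually, what gets registered for each map task is where its output file lives plus how many bytes each reduce partition occupies, which is what reducers later use to decide what to fetch and from where. A toy model of that information (purely illustrative; Spark's real MapStatus class is more involved) could look like this:

// Toy model of what a map task reports back (illustrative only, not Spark's MapStatus):
// the location of its single output file and the byte length of every reduce partition.
case class MapOutputInfo(
    executorHost: String,           // hypothetical stand-in for a BlockManagerId
    partitionLengths: Array[Long])  // bytes written for each reduce partition

// A reducer handling partition `reduceId` only needs that slice from every map output.
def bytesForReducer(outputs: Seq[MapOutputInfo], reduceId: Int): Long =
  outputs.map(_.partitionLengths(reduceId)).sum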
So how does the map-side data actually get written by BypassMergeSortShuffleWriter?
// BypassMergeSortShuffleWriter
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // [1] Create the ShuffleMapOutputWriter that commits the output of all partitions of this map task
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // If there are no records, commit all (empty) partitions right away, take the
    // partition lengths and build the MapStatus
    if (!records.hasNext()) {
      partitionLengths = mapOutputWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // [2] Create a DiskBlockObjectWriter stream and a FileSegment slot for each partition
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // [2.1] For each partition, create a temp file and block id, and open a disk writer stream for it
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
      }
      partitionWriters[i] = writer;
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
    // [3] Write each record to the stream of the partition it hashes to
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }
    // [3.1] Commit and flush the stream of each partition, obtaining its FileSegment
    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }
    // [4] Concatenate the FileSegments of all partitions into one data file; this calls
    // writeMetadataFileAndCommit, which generates the index file for it
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}
綜上蠢莺,Bypass的writer步驟有四步:
- [1] Create the ShuffleMapOutputWriter that commits the output of all partitions of this map task
- [2] Create a DiskBlockObjectWriter stream and a FileSegment for each partition
- [2.1] For each partition, create a temp file and block id, and open and maintain a DiskBlockObjectWriter stream for it
- [3] Write each record, in turn, to the stream of the partition it belongs to
- [3.1] Commit and flush the stream of each partition
- [4] Concatenate the FileSegments of all partitions into a single file; writeMetadataFileAndCommit is then called to generate the index file for it
So when writing, Bypass generates one FileSegment per reduce partition for every map task: it creates temp files for all partitions and keeps an open IO write stream for each of them concurrently, and only at the end links them into a single file. If the partition count is very large this means maintaining a very large number of IO streams, which is why Bypass imposes a threshold on the number of partitions. The source also shows that Bypass uses no in-memory buffer but writes records straight to the streams, which is why Bypass cannot handle operators that require map-side pre-aggregation.
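As a hedged usage example (assuming the default threshold of 200; the numbers are illustrative), a shuffle with no map-side combine and a modest partition count is eligible for the bypass path, while an aggregation that enables map-side combine is not:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: which shuffles can take the bypass path under the default threshold.
val sc = new SparkContext(new SparkConf().setAppName("bypass-demo").setMaster("local[*]"))
val pairs = sc.parallelize(1 to 100000).map(i => (i % 100, i))

// groupByKey performs no map-side combine; with 100 reduce partitions (<= 200)
// this shuffle can be written by BypassMergeSortShuffleWriter.
val grouped = pairs.groupByKey(100)

// reduceByKey enables map-side combine, so this shuffle goes through a different writer.
val reduced = pairs.reduceByKey(_ + _, 100)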
So, given that BypassMergeSortShuffleWriter belongs to sort-based shuffle, does it actually sort anything?
Next, let's look at how Bypass links the per-partition FileSegments into a single file, which means looking in detail at how writePartitionedData is implemented.
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // [1] Get each partition's FileSegment temp file and a ShufflePartitionWriter for it
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // [2] Copy the FileSegment into the single combined output file
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  // [3] Commit all partitions, passing each partition's data length; this calls
  // writeMetadataFileAndCommit to generate the index file recording each partition's offset
  return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
      .getPartitionLengths();
}
writePartitionedData is implemented in three steps:
- [1] Get each partition's FileSegment temp file and a writer output stream for it
- [2] Merge the FileSegments into a single file
- [3] Commit all partitions, passing in each partition's data length; writeMetadataFileAndCommit is called to generate the index file, which records each partition's offset
To summarize: BypassMergeSortShuffleWriter is implemented in a hash style, with no sort and no buffer. Every map task generates as many FileSegments as there are reduce partitions, finally merges them into one file, and generates an index file for it based on the partition lengths. BypassMergeSortShuffleWriter therefore performs well when the number of partitions is fairly small. Each task ends up producing 2 files, so with M map tasks the total number of files produced is 2 * M.
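To make the index file's role concrete, here is a minimal sketch (my own illustration, not the IndexShuffleBlockResolver source) of how per-partition offsets follow from the partition lengths: the index stores cumulative offsets, and the reducer for partition i reads the byte range [offsets(i), offsets(i + 1)) of the data file.

// Sketch: deriving index-file offsets from partition lengths (illustrative, not Spark source).
// For lengths [L0, L1, ..., Ln-1] the index holds n + 1 offsets: 0, L0, L0 + L1, ...
def offsetsFromLengths(partitionLengths: Array[Long]): Array[Long] =
  partitionLengths.scanLeft(0L)(_ + _)

val lengths = Array(120L, 0L, 300L, 80L)
val offsets = offsetsFromLengths(lengths)  // Array(0, 120, 120, 420, 500)
// The reducer for partition 2 reads bytes [offsets(2), offsets(3)) = [120, 420) of the data file.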
That's it for today. Based on the discussion above, here are a couple of interview questions to think about:
- What is the difference between BypassMergeSortShuffleWriter and hash shuffle?
- Why was HashShuffleManager not kept, and why was this approach instead reimplemented as one of the writers inside SortShuffleManager?
Welcome to follow the WeChat official account "Tim在路上".