[SPARK][CORE] 面試問(wèn)題之 BypassMergeSortShuffleWriter的細(xì)節(jié)

歡迎關(guān)注公眾號(hào) “Tim在路上”
BypassMergeSortShuffleWriter 就如其名,旁支的sort-baesd Shuffle, 他是采用Hash-style實(shí)現(xiàn)的Sort based Shuffle库糠。在map階段records會(huì)按分區(qū)寫入不同的文件舷丹, 一個(gè)分區(qū)一個(gè)文件。然后鏈接這些分區(qū)文件形成一個(gè)output文件翘魄,并生成其index鼎天。reducer通過(guò)IndexShuffleBlockResolver 查找消費(fèi)輸出文件的不同分區(qū)。

BypassMergeSortShuffleWriter 中records是不會(huì)緩存在內(nèi)存中暑竟,所有的records最終都會(huì)被flush到磁盤斋射。

在寫入時(shí),BypassMergeSortShuffleWriter 會(huì)同時(shí)為所有的分區(qū)打開(kāi)單獨(dú)的序列化器和文件流但荤,所以當(dāng)reduce分區(qū)數(shù)量特別大的時(shí)候性能會(huì)非常低下罗岖。

ShuffleWriter 的調(diào)用是在ShuffleMapTask的runTask中進(jìn)行調(diào)用,每個(gè)mapTask 都會(huì)調(diào)用一次runTask腹躁。

BypassMergeSortShuffleWriter 源碼解析

首先桑包,我們來(lái)回顧下ShuffleWriter的過(guò)程。Shuffle發(fā)生與寬依賴的stage間纺非,由于stage內(nèi)的計(jì)算采用pipeline的方式哑了。shuffle發(fā)生的上一個(gè)stage為map節(jié)點(diǎn)赘方,下游的stage為reduce階段。而shuffle寫的過(guò)程就發(fā)生在map階段弱左,shuffleWriter的調(diào)用主要是在ShuffleMapStage中窄陡,每個(gè)ShuffleMapStage包含多個(gè)ShuffleMapTask, mapTask個(gè)數(shù)和分區(qū)數(shù)相關(guān)。

這樣每個(gè)ShuffleMapTask都會(huì)在其runTask調(diào)用下Writer接口拆火,其并非直接調(diào)用到具體的執(zhí)行類跳夭。而是在劃分寬依賴時(shí)想ShuffleManage注冊(cè)shuffle時(shí),返回的ShuffleHandler決定的榜掌。

在ShuffleMapTask調(diào)用Writer時(shí)优妙,是先調(diào)用了ShuffleWriteProcessor ,主要控制了ShuffleWriter的生命周期憎账。下面我們看下ShuffleWriteProcessor 中的Write的實(shí)現(xiàn):

// ShuffleWriteProcessor
def write(
    rdd: RDD[_],
    dep: ShuffleDependency[_, _, _],
    mapId: Long,
    context: TaskContext,
    partition: Partition): MapStatus = {
  var writer: ShuffleWriter[Any, Any] = null
  try {
    // [1] 通過(guò)SparkEnv獲取ShuffleManager, 并通過(guò)dep的shuffleHandle套硼, 獲取對(duì)應(yīng)的shuffleWriter的具體實(shí)現(xiàn)。
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
      dep.shuffleHandle,
      mapId,
      context,
      createMetricsReporter(context))
    // [2] 調(diào)用shuffleWriter的write方法, 并將當(dāng)前rdd的迭代器傳入
    writer.write(
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // [3] shuffleWriter結(jié)束后,返回mapStatus,或清空數(shù)據(jù)
    val mapStatus = writer.stop(success = true)
    // [4] 如果shuffleWriter執(zhí)行成功,初始化push-based shuffle, 后面再細(xì)講
    if (mapStatus.isDefined) {
      // Initiate shuffle push process if push based shuffle is enabled
      // The map task only takes care of converting the shuffle data file into multiple
      // block push requests. It delegates pushing the blocks to a different thread-pool -
      // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
      if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
        manager.shuffleBlockResolver match {
          case resolver: IndexShuffleBlockResolver =>
            val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
            new ShuffleBlockPusher(SparkEnv.get.conf)
              .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
          case _ =>
        }
      }
    }
    mapStatus.get
  }
...
}

ShuffleWriteProcessor 中主要做了三件事:

  • [1] 通過(guò)SparkEnv獲取ShuffleManager, 并通過(guò)dep的shuffleHandle隙赁, 獲取對(duì)應(yīng)的shuffleWriter的具體實(shí)現(xiàn)卓起。
  • [2] 調(diào)用shuffleWriter的write方法, 并將當(dāng)前rdd的迭代器傳入
  • [3] shuffleWriter結(jié)束后馒胆,返回mapStatus,或清空數(shù)據(jù)

可見(jiàn)每一個(gè)ShuffleMapTask執(zhí)行結(jié)束后,就會(huì)返回一個(gè)mapStatus。Task 結(jié)果被封裝成 CompletionEvent發(fā)送到Driver DAG Scheduler 。判斷Task的類型是ShuffleMapTask會(huì)DagScheduler 會(huì)向 MapOutputTracker 注冊(cè) MapOutput status 信息策菜。

那么map中的數(shù)據(jù)是如何通過(guò)BypassMergeSortShuffleWriter寫入的?

// BypassMergeSortShuffleWriter
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // [1] 創(chuàng)建處理mapTask所有分區(qū)數(shù)據(jù)commit提交writer
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // 如果沒(méi)有數(shù)據(jù)酒贬,直接提交所有分區(qū)的commit, 并返回分區(qū)長(zhǎng)度又憨,獲取mapStatus
    if (!records.hasNext()) {
      partitionLengths = mapOutputWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // [2] 為每個(gè)分區(qū)創(chuàng)建一個(gè)DiskBlockObjectWriter寫入流和FileSegment文件段
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // [2.1] 每個(gè)分區(qū)創(chuàng)建個(gè)臨時(shí)file和blockid, 并生成維護(hù)一個(gè)寫入流
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
      }
      partitionWriters[i] = writer;
    } 
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
    // [3] 依次將records寫入到對(duì)應(yīng)分區(qū)的寫入流中, 并提交
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    // [3.1]依次對(duì)每個(gè)分區(qū)提交和flush寫入流
    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }
    // [4] 遍歷所有分區(qū)的FileSegement, 并將其鏈接為一個(gè)文件,同時(shí)會(huì)調(diào)用writeMetadataFileAndCommit锭吨,為其生成索引文件
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}

綜上蠢莺,Bypass的writer步驟有四步:

  • [1] 創(chuàng)建處理mapTask所有分區(qū)數(shù)據(jù)commit提交writer

  • [2] 為每個(gè)分區(qū)創(chuàng)建一個(gè)DiskBlockObjectWriter寫入流和FileSegment文件段

    • [2.1] 每個(gè)分區(qū)創(chuàng)建個(gè)臨時(shí)file和blockid, 并生成維護(hù)一個(gè)DiskBlockObjectWriter寫入流
  • [3] 依次將records寫入到對(duì)應(yīng)分區(qū)的寫入流中, 并提交

    • [3.1]依次對(duì)每個(gè)分區(qū)提交和flush寫入流
  • [4] 遍歷所有分區(qū)的FileSegement, 并將其鏈接為一個(gè)文件,同時(shí)會(huì)調(diào)用writeMetadataFileAndCommit零如,為其生成索引文件

所以說(shuō)躏将, Bypass在進(jìn)行寫入時(shí)會(huì)為每個(gè)MapTask都會(huì)生成reduce分區(qū)個(gè)FileSegement, 寫入時(shí)會(huì)并發(fā)的為所有的分區(qū)都創(chuàng)建臨時(shí)文件和維護(hù)一個(gè)io的寫入流, 最終在鏈接為一個(gè)文件考蕾。所以如果分區(qū)數(shù)特別多的情況下祸憋,是會(huì)維護(hù)很多io流,所以Bypass限制了分區(qū)的閾值肖卧。另外通過(guò)源碼發(fā)現(xiàn)Bypass在實(shí)現(xiàn)過(guò)程中并沒(méi)有使用buffer, 而是直接將數(shù)據(jù)寫入到流中夺衍,這也就是為什么Bypass不能處理mapSide的預(yù)聚合的算子。

那么BypassMergeSortShuffleWriter 屬于sort-based Shuffle 到底有沒(méi)有排序呢喜命?

接下來(lái)沟沙,我們?cè)倏聪翨ypass是如何將分區(qū)的FileSegement, 并將其鏈接為一個(gè)文件, 我們就需要詳細(xì)看下writePartitionedData是如何實(shí)現(xiàn)的壁榕。

private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // [1] 獲取每個(gè)分區(qū)的 fileSegement 臨時(shí)文件矛紫,和writer寫出流
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // [2] 將fileSegement合并為一個(gè)文件
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  // [3] 提交所有的分區(qū),傳入每個(gè)分區(qū)數(shù)據(jù)的長(zhǎng)度牌里, 調(diào)用 writeMetadataFileAndCommit生成索引文件颊咬,記錄每個(gè)分區(qū)的偏移量
  return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
    .getPartitionLengths();
}

writePartitionedData是如何實(shí)現(xiàn),有三個(gè)步驟:

  • [1] 獲取每個(gè)分區(qū)的 fileSegement 臨時(shí)文件牡辽,和writer寫出流
  • [2] 將fileSegement合并為一個(gè)文件
  • [3] 提交所有的分區(qū)喳篇,傳入每個(gè)分區(qū)數(shù)據(jù)的長(zhǎng)度, 調(diào)用 writeMetadataFileAndCommit生成索引文件态辛,記錄每個(gè)分區(qū)的偏移量
bypass.png

總結(jié)麸澜, BypassMergeSortShuffleWriter 的實(shí)現(xiàn)是hash-style的方式,其中沒(méi)有sort, 沒(méi)有buffer奏黑,每一個(gè)mapTask都會(huì)生成分區(qū)數(shù)量個(gè)FileSegment, 最后再合并為一個(gè)File, 最終根據(jù)分區(qū)的長(zhǎng)度為其生成索引文件炊邦。所以BypassMergeSortShuffleWriter在分區(qū)數(shù)量比較小的情況下,性能是比較佳的熟史。其最終每個(gè)task會(huì)生成2個(gè)文件馁害, 所以最終的生成文件數(shù)也是2 * M個(gè)文件。

今天就先到這里蹂匹,通過(guò)上面的介紹碘菜,我們也留下些面試題:

  1. BypassMergeSortShuffleWriter和HashShuffle有什么區(qū)別?
  2. 為什么不保留HashShuffleManage, 而是將其作為SortShuffleManager中的一個(gè)Writer實(shí)現(xiàn)?

歡迎關(guān)注公眾號(hào) “Tim在路上”

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末限寞,一起剝皮案震驚了整個(gè)濱河市忍啸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌昆烁,老刑警劉巖吊骤,帶你破解...
    沈念sama閱讀 206,723評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異静尼,居然都是意外死亡白粉,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,485評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門鼠渺,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)鸭巴,“玉大人,你說(shuō)我怎么就攤上這事拦盹【樽妫” “怎么了?”我有些...
    開(kāi)封第一講書(shū)人閱讀 152,998評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵普舆,是天一觀的道長(zhǎng)恬口。 經(jīng)常有香客問(wèn)我校读,道長(zhǎng),這世上最難降的妖魔是什么祖能? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 55,323評(píng)論 1 279
  • 正文 為了忘掉前任歉秫,我火速辦了婚禮,結(jié)果婚禮上养铸,老公的妹妹穿的比我還像新娘雁芙。我一直安慰自己,他們只是感情好钞螟,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,355評(píng)論 5 374
  • 文/花漫 我一把揭開(kāi)白布兔甘。 她就那樣靜靜地躺著,像睡著了一般鳞滨。 火紅的嫁衣襯著肌膚如雪洞焙。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 49,079評(píng)論 1 285
  • 那天太援,我揣著相機(jī)與錄音闽晦,去河邊找鬼。 笑死提岔,一個(gè)胖子當(dāng)著我的面吹牛仙蛉,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播碱蒙,決...
    沈念sama閱讀 38,389評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼荠瘪,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了赛惩?” 一聲冷哼從身側(cè)響起哀墓,我...
    開(kāi)封第一講書(shū)人閱讀 37,019評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎喷兼,沒(méi)想到半個(gè)月后篮绰,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,519評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡季惯,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,971評(píng)論 2 325
  • 正文 我和宋清朗相戀三年吠各,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勉抓。...
    茶點(diǎn)故事閱讀 38,100評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡贾漏,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出藕筋,到底是詐尸還是另有隱情纵散,我是刑警寧澤,帶...
    沈念sama閱讀 33,738評(píng)論 4 324
  • 正文 年R本政府宣布,位于F島的核電站伍掀,受9級(jí)特大地震影響掰茶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜硕盹,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,293評(píng)論 3 307
  • 文/蒙蒙 一符匾、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瘩例,春花似錦、人聲如沸甸各。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,289評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)趣倾。三九已至聘惦,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間儒恋,已是汗流浹背善绎。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 31,517評(píng)論 1 262
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留诫尽,地道東北人禀酱。 一個(gè)月前我還...
    沈念sama閱讀 45,547評(píng)論 2 354
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像牧嫉,于是被迫代替她去往敵國(guó)和親剂跟。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,834評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容