Spark source code reading: the shuffle module ②

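"Spark source code reading: the shuffle module ①" covered how shuffle evolved across Spark versions. It introduced the two main shuffle strategies, Hash Based Shuffle and Sort Based Shuffle, and analyzed how each works and how each performs shuffle write. The step in between, i.e. how the results of a ShuffleMapTask are handled, was already analyzed in "Spark source code reading: the executor module ③". This article continues with the downstream shuffle read path. The source code discussed here is from Spark 1.6.3.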

shuffle read

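Shuffle read starts when a downstream reducer reads the intermediate files that the map side wrote to disk. Some tasks read from external storage, and some read from an RDD that has been cached or checkpointed. Apart from those, a typical task begins its reduce work with the shuffle read performed by ShuffledRDD.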

Let's start with the compute() method of ShuffledRDD:

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}

compute() calls the ShuffleManager's getReader method to obtain a reader. As mentioned before, ShuffleManager has two implementations, HashShuffleManager and SortShuffleManager, which correspond to the two strategies. For shuffle read, however, both getReader methods create the same BlockStoreShuffleReader object, so the shuffle read path is identical for both. Next, let's step into BlockStoreShuffleReader's read() method:

// Core implementation of shuffle read: read the map output and aggregate it
  override def read(): Iterator[Product2[K, C]] = {
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      blockManager.shuffleClient,
      blockManager,
      mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
      // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024)

    // Wrap the streams for compression based on configuration
    val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
      blockManager.wrapForCompression(blockId, inputStream)   // wrap for decompression if shuffle compression is enabled
    }

    val ser = Serializer.getSerializer(dep.serializer)
    val serializerInstance = ser.newInstance()    // get a serializer instance

    // Create a key/value iterator for each stream
    val recordIter = wrappedStreams.flatMap { wrappedStream =>
      // Note: the asKeyValueIterator below wraps a key/value iterator inside of a
      // NextIterator. The NextIterator makes sure that close() is called on the
      // underlying InputStream when all records have been read.
      serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator    // deserialize the stream into a key/value iterator
    }

    // Update the context task metrics for each record read.
    // i.e. update the shuffle read metrics in the task context
    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
      recordIter.map(record => {
        readMetrics.incRecordsRead(1)
        record
      }),
      context.taskMetrics().updateShuffleReadMetrics())

    // An interruptible iterator must be used here in order to support task cancellation
    val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)  // iterator that supports cancellation

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { // aggregation required
      if (dep.mapSideCombine) { // values were already combined on the map side
        // We are reading values that are already combined
        val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
        dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
      } else {    // aggregate on the reduce side only
        // We don't know the value type, but also don't care -- the dependency *should*
        // have made sure its compatible w/ this aggregator, which will convert the value
        // type to the combined type C
        val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
        dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
      }
    } else {  // no aggregation
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {   // check whether sorting is required
      case Some(keyOrd: Ordering[K]) => // sorting required
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        // Sort with ExternalSorter; if spark.shuffle.spill is disabled, no data is written to disk
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.internalMetricsToAccumulators(
          InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
  }

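The code above is already commented. It breaks down into three functions:

  1. Use the serializer to read the fetched blocks into a key/value iterator, and update the task context's metrics
  2. Aggregate the data if the Dependency passed in defines an aggregator
  3. Sort the data if the Dependency defines a key ordering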

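Here, aggregator and keyOrdering correspond to the same parameters used during shuffle write. Their implementation is fairly simple, so it is not analyzed in detail here. The focus is on how the downstream side fetches data, which closes the loop with the previous article on the whole shuffle process.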
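Although aggregation and sorting are not the focus here, a minimal in-memory sketch can make the three stages of read() more concrete. SimpleAggregator and ReadPipelineSketch are hypothetical names. They only mirror the branch structure above: combineCombinersByKey when the map side already combined, combineValuesByKey otherwise, then an optional sort by key. Spark's real Aggregator and ExternalSorter can spill to disk, which this sketch does not model.

```scala
// Hypothetical, simplified model of the aggregate/sort stages of BlockStoreShuffleReader.read().
// Everything is kept in memory; Spark's real classes can spill to disk.
final case class SimpleAggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Reduce-side only: raw values arrive, so combiners are built from scratch
  def combineValuesByKey(it: Iterator[(K, V)]): Iterator[(K, C)] = {
    val map = scala.collection.mutable.LinkedHashMap.empty[K, C]
    it.foreach { case (k, v) =>
      map(k) = map.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    }
    map.iterator
  }

  // Map-side combine already happened: only partial combiners need merging
  def combineCombinersByKey(it: Iterator[(K, C)]): Iterator[(K, C)] = {
    val map = scala.collection.mutable.LinkedHashMap.empty[K, C]
    it.foreach { case (k, c) =>
      map(k) = map.get(k).map(mergeCombiners(_, c)).getOrElse(c)
    }
    map.iterator
  }
}

object ReadPipelineSketch {
  // Same decisions as read(): aggregate (one of two ways, or not at all), then optionally sort
  def read[K, V, C](
      records: Iterator[(K, Any)],
      aggregator: Option[SimpleAggregator[K, V, C]],
      mapSideCombine: Boolean,
      keyOrdering: Option[Ordering[K]]): Iterator[(K, Any)] = {
    val aggregated: Iterator[(K, Any)] = aggregator match {
      case Some(agg) if mapSideCombine =>
        agg.combineCombinersByKey(records.asInstanceOf[Iterator[(K, C)]])
      case Some(agg) =>
        agg.combineValuesByKey(records.asInstanceOf[Iterator[(K, V)]])
      case None =>
        require(!mapSideCombine, "Map-side combine without Aggregator specified!")
        records
    }
    keyOrdering match {
      case Some(ord) => aggregated.toVector.sortBy(_._1)(ord).iterator // ExternalSorter in Spark
      case None      => aggregated
    }
  }
}
```

In a word count, createCombiner is the identity and both merge functions add, so either branch produces the same totals. The branches differ only in whether the incoming values are raw values or partial combiners.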

block fetch

The first part of read() creates a ShuffleBlockFetcherIterator. This object produces an Iterator of (BlockId, InputStream) pairs that fetches multiple blocks of the intermediate files. When it is instantiated, it first calls initialize(). Here is the source:

private[this] def initialize(): Unit = {
  // Add a task completion callback (called in both success case and failure case) to cleanup.
  context.addTaskCompletionListener(_ => cleanup())
  // Split local and remote blocks.
  // blocks on other nodes have to be fetched over the network
  val remoteRequests: ArrayBuffer[FetchRequest] = splitLocalRemoteBlocks()
  // Add the remote requests into our queue in a random order
  fetchRequests ++= Utils.randomize(remoteRequests)
  // Send out initial requests for blocks, up to our maxBytesInFlight
  // keep sending until the bytes in flight reach maxBytesInFlight (default 48MB); each request targets about 1/5 of that
  fetchUpToMaxBytes()
  val numFetches = remoteRequests.size - fetchRequests.size
  logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
  // Get Local Blocks
  // local blocks are read directly
  fetchLocalBlocks()
  logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}

Blocks are fetched in two ways: remote blocks and local blocks. Data that is not on the local node must be fetched over the network, which consumes bandwidth. The system therefore limits remote fetching with two rules, implemented in splitLocalRemoteBlocks:

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
    // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
    // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
    // nodes, rather than blocking on reading output from one node.
    // roughly 5 requests fit within maxBytesInFlight, allowing parallel fetches from up to 5 nodes
    // each request is closed once it reaches maxBytesInFlight / 5 bytes
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

    // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
    // at most maxBytesInFlight in order to limit the amount of data in flight.
    val remoteRequests = new ArrayBuffer[FetchRequest]

    // Tracks total number of blocks (including zero sized blocks)
    var totalBlocks = 0
    for ((address, blockInfos) <- blocksByAddress) {
      totalBlocks += blockInfos.size
      if (address.executorId == blockManager.blockManagerId.executorId) {
        // Filter out zero-sized blocks
        // zero-sized local blocks are skipped
        localBlocks ++= blockInfos.filter(_._2 != 0).map(_._1)
        numBlocksToFetch += localBlocks.size
      } else {    // blocks that have to be fetched remotely
        val iterator = blockInfos.iterator
        var curRequestSize = 0L
        var curBlocks = new ArrayBuffer[(BlockId, Long)]
        while (iterator.hasNext) {
          val (blockId, size) = iterator.next()
          // Skip empty blocks
          if (size > 0) {
            curBlocks += ((blockId, size))
            remoteBlocks += blockId
            numBlocksToFetch += 1
            curRequestSize += size
          } else if (size < 0) {
            throw new BlockException(blockId, "Negative block size " + size)
          }
          if (curRequestSize >= targetRequestSize) {
            // Add this FetchRequest
            remoteRequests += new FetchRequest(address, curBlocks)
            curBlocks = new ArrayBuffer[(BlockId, Long)]
            logDebug(s"Creating fetch request of $curRequestSize at $address")
            curRequestSize = 0
          }
        }
        // Add in the final request
        if (curBlocks.nonEmpty) {
          remoteRequests += new FetchRequest(address, curBlocks)
        }
      }
    }
    logInfo(s"Getting $numBlocksToFetch non-empty blocks out of $totalBlocks blocks")
    remoteRequests
  }

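From this logic we can derive the strategy for fetching blocks over the network:

  1. Requests are sized so that roughly 5 of them fit within maxBytesInFlight. This allows parallel fetches from up to about 5 nodes instead of blocking on one node.
  2. A request is closed once its size reaches one fifth of spark.reducer.maxSizeInFlight (default 48MB). Each request is therefore roughly maxBytesInFlight / 5, and it can exceed that by at most one block.

This has two goals: to cap the amount of data in flight, and so the bandwidth used, and to shorten fetch time by requesting data in parallel.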

Once the requests are split, fetchUpToMaxBytes() is called to send them:

private def fetchUpToMaxBytes(): Unit = {
  // Send fetch requests up to maxBytesInFlight
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
}
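The loop condition above, together with what happens when results are consumed, can be sketched as a small throttle. InFlightThrottle and onResultConsumed are hypothetical names. Requests are reduced to their sizes, and "sending" only records the size. In ShuffleBlockFetcherIterator, next() subtracts a successful result's size from bytesInFlight and then calls fetchUpToMaxBytes() again; onResultConsumed models that step.

```scala
import scala.collection.mutable

// Hypothetical model of the maxBytesInFlight throttle; no network calls are made
final class InFlightThrottle(maxBytesInFlight: Long, pending: Seq[Long]) {
  private val fetchRequests = mutable.Queue.empty[Long] ++= pending
  private var bytesInFlight = 0L
  val sent = mutable.ArrayBuffer.empty[Long]

  // Same condition as fetchUpToMaxBytes(): always send one request when idle,
  // otherwise only while the total stays within maxBytesInFlight
  def fetchUpToMaxBytes(): Unit =
    while (fetchRequests.nonEmpty &&
      (bytesInFlight == 0 || bytesInFlight + fetchRequests.front <= maxBytesInFlight)) {
      val size = fetchRequests.dequeue()
      bytesInFlight += size
      sent += size
    }

  // A consumed result no longer counts as in flight, which may let more requests go out
  def onResultConsumed(size: Long): Unit = {
    bytesInFlight -= size
    fetchUpToMaxBytes()
  }

  def inFlight: Long = bytesInFlight
}
```

Note the `bytesInFlight == 0` clause. When nothing else is in flight, a single request larger than maxBytesInFlight is still sent, so it cannot wait forever.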

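The next request is dequeued and sent with sendRequest() as long as the bytes in flight plus that request's size stay within maxBytesInFlight, or nothing is in flight at all: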

private[this] def sendRequest(req: FetchRequest) {
  logDebug("Sending request for %d blocks (%s) from %s".format(
    req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
  bytesInFlight += req.size
  // so we can look up the size of each blockID
  val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
  val blockIds = req.blocks.map(_._1.toString)
  val address = req.address
  // remote fetch: shuffleClient is NettyBlockTransferService by default, or ExternalShuffleClient when the external shuffle service is enabled
  shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
    new BlockFetchingListener {
      override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
        // Only add the buffer to results queue if the iterator is not zombie,
        // i.e. cleanup() has not been called yet.
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf))
          shuffleMetrics.incRemoteBytesRead(buf.size)
          shuffleMetrics.incRemoteBlocksFetched(1)
        }
        logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
      }
      override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
        logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
        results.put(new FailureFetchResult(BlockId(blockId), address, e))
      }
    }
  )
}

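The blocks are fetched through a ShuffleClient instance. ShuffleClient has more than one implementation. By default the BlockManager uses its BlockTransferService, which is NettyBlockTransferService. When the external shuffle service is enabled (spark.shuffle.service.enabled), it uses ExternalShuffleClient instead. Local blocks do not go through ShuffleClient at all (see fetchLocalBlocks below). fetchBlocks uses the host, port and executorId passed in to fetch the data over Netty. The listener then puts each success or failure into the results queue.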
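The fetch is asynchronous. The listener's callbacks run on network threads, and the iterator's consumer thread takes results from the results queue. The sketch below shows that hand-off. It assumes simplified stand-ins: FetchResult, FetchSuccess, FetchFailure, FetchListener and the thread-per-block fetchBlocks are all hypothetical, not Spark or Netty APIs. It uses java.util.concurrent.LinkedBlockingQueue for the queue, as ShuffleBlockFetcherIterator does for its results.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-ins for SuccessFetchResult / FailureFetchResult and BlockFetchingListener
sealed trait FetchResult
final case class FetchSuccess(blockId: String, data: Array[Byte]) extends FetchResult
final case class FetchFailure(blockId: String, e: Throwable) extends FetchResult

trait FetchListener {
  def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit
  def onBlockFetchFailure(blockId: String, e: Throwable): Unit
}

object CallbackToQueueSketch {
  val results = new LinkedBlockingQueue[FetchResult]()

  // Callbacks only enqueue; the consumer thread decides what to do with each result
  val listener: FetchListener = new FetchListener {
    def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit =
      results.put(FetchSuccess(blockId, data))
    def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
      results.put(FetchFailure(blockId, e))
  }

  // Simulated asynchronous client: each block is "fetched" from a map on its own thread
  def fetchBlocks(ids: Seq[String], store: Map[String, Array[Byte]], l: FetchListener): Unit =
    ids.foreach { id =>
      new Thread(new Runnable {
        def run(): Unit = store.get(id) match {
          case Some(bytes) => l.onBlockFetchSuccess(id, bytes)
          case None        => l.onBlockFetchFailure(id, new NoSuchElementException(id))
        }
      }).start()
    }

  // Consumer side: take() blocks until the next result arrives, like the iterator's next()
  def takeAll(n: Int): Seq[FetchResult] = Seq.fill(n)(results.take())
}
```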

Next, let's look at how local blocks are fetched:

private[this] def fetchLocalBlocks() {
  val iter = localBlocks.iterator
  while (iter.hasNext) {
    val blockId = iter.next()
    try {
      val buf = blockManager.getBlockData(blockId)
      shuffleMetrics.incLocalBlocksFetched(1)
      shuffleMetrics.incLocalBytesRead(buf.size)
      buf.retain()
      results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, 0, buf))
    } catch {
      case e: Exception =>
        // If we see an exception, stop immediately.
        logError(s"Error occurred while fetching local blocks", e)
        results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
        return
    }
  }
}

Local blocks are read directly through blockManager's getBlockData method. For blocks produced by a shuffle, getBlockData has two implementations, one for Hash and one for Sort:
Hash: FileShuffleBlockResolver
Sort: IndexShuffleBlockResolver

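The difference is in how the data is located. With Sort, getBlockData first uses the index file to find the FileSegment that holds the data. With Hash, the file can be obtained directly from the blockId.
Here is IndexShuffleBlockResolver's getBlockData method: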

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
  // The block is actually going to be a range of a single map output file for this map, so
  // find out the consolidated file, then the offset within that from our index
  val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    ByteStreams.skipFully(in, blockId.reduceId * 8)   // skip to this reducer's entry (each offset is an 8-byte long)
    val offset = in.readLong()    // start offset in the data file
    val nextOffset = in.readLong()    // end offset in the data file
    new FileSegmentManagedBuffer(
      transportConf,
      getDataFile(blockId.shuffleId, blockId.mapId),
      offset,
      nextOffset - offset)
  } finally {
    in.close()
  }
}
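The lookup relies on the index file's layout. For a map output with N reduce partitions, the index holds N + 1 longs: the cumulative start offsets of each partition in the data file, beginning at 0. The in-memory sketch below writes such an index and performs the same skip-then-read-two-longs lookup. IndexFileSketch is a hypothetical name, and DataInputStream.skipBytes stands in for Guava's ByteStreams.skipFully.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical in-memory model of a sort-shuffle index file
object IndexFileSketch {
  // Write N + 1 cumulative offsets from each partition's length
  def writeIndex(partitionLengths: Seq[Long]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    var offset = 0L
    out.writeLong(offset)
    partitionLengths.foreach { len => offset += len; out.writeLong(offset) }
    out.close()
    bytes.toByteArray
  }

  // Same lookup as getBlockData: skip reduceId * 8 bytes, then read [offset, nextOffset)
  def segmentFor(index: Array[Byte], reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new ByteArrayInputStream(index))
    try {
      in.skipBytes(reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      (offset, nextOffset - offset) // (start position, length) in the data file
    } finally in.close()
  }
}
```

An empty partition, such as reduceId 1 in the test below, simply yields a zero-length segment.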
Performance tuning

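These two articles on shuffle's architecture and implementation show that shuffle is one of the more complex modules in Spark Core, and one that strongly affects performance. Here is a summary of the shuffle configuration options that affect performance: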

spark.shuffle.manager

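This parameter selects the shuffle mechanism, Hash or Sort. Since Spark 1.2 the default has been Sort instead of Hash, and Spark 2.0 removed Hash entirely. The choice between them depends on memory, sorting, file operations and other factors. If the job does not produce many intermediate files, Hash mode may be the better choice because it avoids unnecessary sorting.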

spark.shuffle.sort.bypassMergeThreshold

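The default value is 200. When the number of reduce partitions is at most this value and no map-side combine is required, Sort Based Shuffle does not merge-sort the data internally. Instead, it writes each partition directly to a separate file. This can be seen as a compromise Sort Based Shuffle makes toward Hash Based Shuffle when the shuffle is small. It also suffers from the too-many-intermediate-files problem, though, so if GC or memory is under pressure, consider lowering this value.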
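The condition can be summarized as a small predicate. This is a simplified sketch that assumes the rule described above: no map-side combine, and a partition count not above the threshold. The real check in Spark 1.6 takes a SparkConf and a ShuffleDependency rather than plain values. BypassMergeSketch is a hypothetical name.

```scala
// Hypothetical, simplified form of the bypass-merge-sort decision
object BypassMergeSketch {
  def shouldBypassMergeSort(
      numPartitions: Int,
      mapSideCombine: Boolean,
      bypassMergeThreshold: Int = 200): Boolean =
    // The bypass writer does no aggregation, so map-side combine rules it out
    !mapSideCombine && numPartitions <= bypassMergeThreshold
}
```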

spark.shuffle.compress和spark.shuffle.spill.compress

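Both parameters default to true. The first controls whether the final shuffle output files written to the file system are compressed. The second controls whether data spilled to external storage during the shuffle is compressed.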
spark.shuffle.compress
If network I/O for downstream tasks reading upstream output is the bottleneck, consider enabling compression to reduce it. If the computation is CPU-bound, setting this option to false may be more appropriate.
spark.shuffle.spill.compress
If spilling intermediate results to local disk makes disk I/O the bottleneck, enabling compression (true) may help. If the local disks are SSDs, false may be more appropriate.

In short, weigh the time spent compressing and decompressing against disk and network I/O, and decide case by case for each project.

spark.reducer.maxSizeInFlight

這個參數(shù)用于限制一個Reducer Task向其他的Executor請求shuffle數(shù)據(jù)是所占用的最大內(nèi)存數(shù)民鼓,默認(rèn)值為48MB,如果帶寬限制較大蓬抄,那么適當(dāng)調(diào)小這個值丰嘉,如果是萬兆網(wǎng)卡,可以考慮增大這個值嚷缭。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者