Spark-Core源碼精讀(14)丰捷、Shuffle--Write部分

前面我們分析了Spark中具體的Task的提交和運(yùn)行過程坯墨,從本文開始我們開始進(jìn)入Shuffle的世界,Shuffle對(duì)于分布式計(jì)算來說是至關(guān)重要的部分病往,它直接影響了分布式系統(tǒng)的性能捣染,所以我將盡可能進(jìn)行詳細(xì)的分析。

我們首先來看Shuffle中的Write部分:

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}

首先根據(jù)SparkEnv獲得ShuffleManager停巷,ShuffleManager是為Spark shuffle系統(tǒng)而抽象的可插拔的接口耍攘,它被創(chuàng)建在Driver和Executor上,具體是在SparkEnv實(shí)例化的時(shí)候進(jìn)行配置的叠穆,源碼如下:

val shortShuffleMgrNames = Map(
  "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

可以看到是由"spark.shuffle.manager"配置項(xiàng)來決定具體使用哪種實(shí)現(xiàn)方式少漆,默認(rèn)情況下使用的是sort的方式(本文參考的是Spark 1.6.3版本的源碼)臼膏。Driver通過ShuffleManager來注冊(cè)shuffles硼被,Executors可以通過它來讀寫數(shù)據(jù)。

獲得到ShuffleManager后渗磅,就根據(jù)它來獲得ShuffleWriter(根據(jù)具體ShuffleManager的getWriter方法獲得)嚷硫,顧名思義就是用來寫數(shù)據(jù),而接下來的工作就是調(diào)用具體的ShuffleWriter的write方法來進(jìn)行寫數(shù)據(jù)的工作始鱼。

先來看一下getWriter方法仔掸,這里的第一個(gè)參數(shù)dep.shuffleHandle是ShuffleDependency的一個(gè)成員變量:

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
  shuffleId, _rdd.partitions.size, this)

這里的registerShuffle方法用來向ShuffleManager注冊(cè)一個(gè)shuffle并且獲得一個(gè)用來傳遞任務(wù)的句柄,會(huì)根據(jù)不同ShuffleManager有不同的實(shí)現(xiàn)医清,HashShuffleManager返回的是BaseShuffleHandle起暮,而SortShuffleManager又會(huì)根據(jù)不同的情況返回BypassMergeSortShuffleHandle、SerializedShuffleHandle或者BaseShuffleHandle会烙。

HashShuffleManager:

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  new BaseShuffleHandle(shuffleId, numMaps, dependency)
}

SortShuffleManager:

override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
    // 這里注釋說的很清楚负懦,根據(jù)spark.shuffle.sort.bypassMergeThreshold的值(默認(rèn)是200)判斷是否需要進(jìn)行Map端的聚合操作
    // 如果partitions的個(gè)數(shù)小于200就不進(jìn)行該操作
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // 這里是判斷是否使用tungsten的方式
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // 如果不是上述兩種方式,就使用默認(rèn)的方式
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}

使用一張圖來總結(jié)一下上面的過程:

然后我們來看ShuffleManager的getWriter方法:

HashShuffleManager:

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
    : ShuffleWriter[K, V] = {
  new HashShuffleWriter(
    shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}

SortShuffleManager:

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}

從上述源碼可以看出柏腻,getWriter方法內(nèi)部實(shí)際上是根據(jù)傳進(jìn)來的ShuffleHandle的具體類型來判斷使用哪種ShuffleWriter的纸厉,然后最終執(zhí)行ShuffleWriter的write方法,下面我們就分為HashShuffleManager和SortShuffleManager兩種類型來進(jìn)行分析五嫂。

1颗品、HashShuffleManager

從上面的源碼中可以看到HashShuffleManager最終實(shí)例化的是HashShuffleWriter,實(shí)例化的時(shí)候有一行比較重要的代碼:

private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
  writeMetrics)

我們來看forMapTask這個(gè)方法:

def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
    writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
  // 為每個(gè)ShuffleMapTask實(shí)例化了一個(gè)ShuffleWriterGroup
  new ShuffleWriterGroup {
    // 實(shí)例化ShuffleState并保存shuffleId和ShuffleState的對(duì)應(yīng)關(guān)系
    shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
    // 根據(jù)shuffleId獲得對(duì)應(yīng)的ShuffleState
    private val shuffleState = shuffleStates(shuffleId)
    val openStartTime = System.nanoTime
    val serializerInstance = serializer.newInstance()
    // 獲得該ShuffleWriterGroup的writers
    val writers: Array[DiskBlockObjectWriter] = {
      Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
        // 生成ShuffleBlockId沃缘,是一個(gè)case class嫌蚤,我們可以通過name方法看到其具體的組成:
        // override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
        val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
        // 通過DiskBlockManager的getFile方法獲得File
        val blockFile = blockManager.diskBlockManager.getFile(blockId)
        // 臨時(shí)目錄
        val tmp = Utils.tempFileWith(blockFile)
        // 使用BlockManager的getDiskWriter方法獲得DiskBlockObjectWriter
        // 注意這里的bufferSize默認(rèn)情況下是32kb谒亦,可以通過spark.shuffle.file.buffer進(jìn)行配置
        // private val bufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
        blockManager.getDiskWriter(blockId, tmp, serializerInstance, bufferSize, writeMetrics)
      }
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, so should be included in the shuffle write time.
    writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
    override def releaseWriters(success: Boolean) {
      shuffleState.completedMapTasks.add(mapId)
    }
  }
}

為每個(gè)ShuffleMapTask實(shí)例化一個(gè)ShuffleWriterGroup,其中包含了一組writers,每個(gè)writer對(duì)應(yīng)一個(gè)reducer牙瓢。

然后我們進(jìn)入到HashShuffleWriter的write方法:

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  val iter = if (dep.aggregator.isDefined) {
    // 判斷是否進(jìn)行map端的combine操作
    if (dep.mapSideCombine) {
      dep.aggregator.get.combineValuesByKey(records, context)
    } else {
      records
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    records
  }
  for (elem <- iter) {
    val bucketId = dep.partitioner.getPartition(elem._1)
    shuffle.writers(bucketId).write(elem._1, elem._2)
  }
}

如果沒有進(jìn)行Map端的combine操作,根據(jù)key獲得bucketId事示,實(shí)際上是進(jìn)行取模運(yùn)算:

def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

下面就是根據(jù)bucketId獲得ShuffleWriterGroup中對(duì)應(yīng)的writer,然后執(zhí)行其write方法寨昙,將key和value寫入到對(duì)應(yīng)的block file中:

/**
 * Writes a key-value pair.
 */
def write(key: Any, value: Any) {
  if (!initialized) {
    open()
  }
  objOut.writeKey(key)
  objOut.writeValue(value)
  recordWritten()
}

寫入完成后,我們回到ShuffleMapTask的runTask方法中掀亩,接下來執(zhí)行的是:

writer.stop(success = true).get

即HashShuffleWriter的stop方法舔哪,該方法返回的是Option[MapStatus],最主要的一句代碼為:

Some(commitWritesAndBuildStatus())

進(jìn)入到commitWritesAndBuildStatus方法:

private def commitWritesAndBuildStatus(): MapStatus = {
  // Commit the writes. Get the size of each bucket block (total block size).
  val sizes: Array[Long] = shuffle.writers.map { writer: DiskBlockObjectWriter =>
    // 調(diào)用DiskBlockObjectWriter的commitAndClose方法
    writer.commitAndClose()
    // 獲得每個(gè)bucket block的大小
    writer.fileSegment().length
  }
  // 重命名所有的shuffle文件槽棍,每個(gè)executor只有一個(gè)ShuffleBlockResolver捉蚤,所以使用了synchronized關(guān)鍵字
  // rename all shuffle files to final paths
  // Note: there is only one ShuffleBlockResolver in executor
  shuffleBlockResolver.synchronized {
    shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
      val output = blockManager.diskBlockManager.getFile(writer.blockId)
      if (sizes(i) > 0) {
        if (output.exists()) {
          // Use length of existing file and delete our own temporary one
          sizes(i) = output.length()
          writer.file.delete()
        } else {
          // Commit by renaming our temporary file to something the fetcher expects
          if (!writer.file.renameTo(output)) {
            throw new IOException(s"fail to rename ${writer.file} to $output")
          }
        }
      } else {
        if (output.exists()) {
          output.delete()
        }
      }
    }
  }
  MapStatus(blockManager.shuffleServerId, sizes)
}

這里我們需要注意最終返回的是封裝的MapStatus,它記錄了產(chǎn)生的磁盤文件的位置炼七,然后Executor中的MapOutputTrackerWorker將MapStatus信息發(fā)送給Driver中的MapOutputTrackerMaster缆巧,后面Shuffle Read的之后就會(huì)從Driver的MapOutputTrackerMaster獲取MapStatus的信息,也就是獲取對(duì)應(yīng)的上一個(gè)ShuffleMapTask的計(jì)算結(jié)果的輸出的文件位置信息豌拙。

Map端combine的情況

再來補(bǔ)充一下Map端combine的情況:

if (dep.mapSideCombine) {
  dep.aggregator.get.combineValuesByKey(records, context)
} else {
  records
}

進(jìn)入到Aggregator的combineValuesByKey方法:

def combineValuesByKey(
    iter: Iterator[_ <: Product2[K, V]],
    context: TaskContext): Iterator[(K, C)] = {
  // 首先實(shí)例化ExternalAppendOnlyMap
  val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
  // 執(zhí)行ExternalAppendOnlyMap的insertAll方法
  combiners.insertAll(iter)
  updateMetrics(context, combiners)
  combiners.iterator
}

首先實(shí)例化了ExternalAppendOnlyMap陕悬,然后執(zhí)行ExternalAppendOnlyMap的insertAll方法:

def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
  if (currentMap == null) {
    throw new IllegalStateException(
      "Cannot insert new elements into a map after calling iterator")
  }
  // An update function for the map that we reuse across entries to avoid allocating
  // a new closure each time
  var curEntry: Product2[K, V] = null
  val update: (Boolean, C) => C = (hadVal, oldVal) => {
    if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
  }
  while (entries.hasNext) {
    curEntry = entries.next()
    val estimatedSize = currentMap.estimateSize()
    if (estimatedSize > _peakMemoryUsedBytes) {
      _peakMemoryUsedBytes = estimatedSize
    }
    if (maybeSpill(currentMap, estimatedSize)) {
      currentMap = new SizeTrackingAppendOnlyMap[K, C]
    }
    currentMap.changeValue(curEntry._1, update)
    addElementsRead()
  }
}

具體的實(shí)現(xiàn)方式就不再解釋了,簡(jiǎn)單的說就是將key相同的value進(jìn)行合并按傅,如果某個(gè)key有對(duì)應(yīng)的值就執(zhí)行merge(也可以理解為更新)操作捉超,如果沒有對(duì)應(yīng)的值就新建一個(gè)combiner,需要注意的是如果內(nèi)存不夠的話就會(huì)將數(shù)據(jù)spill到磁盤唯绍。

HashShuffle方式的Shuffle Write部分至此結(jié)束拼岳,使用一張圖概括一下:

接下來看一下SortShuffle方式的具體流程。

2况芒、SortShuffleManager

為了解決Hash Shuffle產(chǎn)生小文件過多的問題惜纸,產(chǎn)生了Sort Shuffle,解下來我們就一起看一下Sort Shuffle的Write部分绝骚。

上文中我們已經(jīng)提到耐版,SortShuffleManager中的getWriter會(huì)根據(jù)不同的ShuffleHandle產(chǎn)生相應(yīng)的ShuffleWriter:

  • SerializedShuffleHandle 對(duì)應(yīng) UnsafeShuffleWriter
  • BypassMergeSortShuffleHandle 對(duì)應(yīng) BypassMergeSortShuffleWriter
  • BaseShuffleHandle 對(duì)應(yīng) SortShuffleWriter

下面我們分別進(jìn)行分析:

2.1、BaseShuffleHandle & SortShuffleWriter

首先來看一下SortShuffleWriter皮壁,直接來看它的write方法:

/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)
  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}

我們看到椭更,內(nèi)部有一個(gè)非常重要的部分,即ExternalSorter蛾魄,而關(guān)于ExternalSorter的使用虑瀑,源碼中的注釋說的很清楚了,這里就不做翻譯了:

/**
* Users interact with this class in the following way:
*
* 1. Instantiate an ExternalSorter.
*
* 2. Call insertAll() with a set of records.
*
* 3. Request an iterator() back to traverse sorted/aggregated records.
*     - or -
*    Invoke writePartitionedFile() to create a file containing sorted/aggregated outputs
*    that can be used in Spark's sort shuffle.
*/

我們就根據(jù)這三個(gè)步驟進(jìn)行說明:

2.1.1 第一步

首先就是實(shí)例化ExternalSorter滴须,這里有一個(gè)判斷舌狗,如果要進(jìn)行map端的combine操作的話就需要指定Aggregator和Ordering,否則這兩個(gè)參數(shù)為None扔水。我們熟悉的reduceByKey就進(jìn)行了Map端的combine操作痛侍,如下圖所示:

2.1.2 第二步(這一步非常重要)

通過判斷是否進(jìn)行Map端combine操作而實(shí)例化出不同的ExternalSorter后,就會(huì)調(diào)用insertAll方法,將輸入的記錄寫入到內(nèi)存中主届,如果內(nèi)存不足就spill到磁盤中赵哲,具體的實(shí)現(xiàn)我們來看insertAll方法:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  // 首先判斷是否需要進(jìn)行Map端的combine操作
  val shouldCombine = aggregator.isDefined
  if (shouldCombine) {
    // 如果需要進(jìn)行map端的combine操作,使用PartitionedAppendOnlyMap作為緩存
    // 將record根據(jù)key對(duì)value按照獲得的聚合函數(shù)進(jìn)行聚合操作(combine)
    // Combine values in-memory first using our AppendOnlyMap
    // 獲得聚合函數(shù)君丁,例如我們使用reduceByKey時(shí)編寫的函數(shù)
    val mergeValue = aggregator.get.mergeValue
    // 獲取createCombiner函數(shù)
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // 定義update函數(shù)枫夺,主要的邏輯是:如果某個(gè)key已經(jīng)存在記錄(record)就使用上面獲取
    // 的聚合函數(shù)進(jìn)行聚合操作,如果還不存在記錄就使用createCombiner方法進(jìn)行初始化操作
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // 循環(huán)遍歷所有的records(記錄)
    while (records.hasNext) {
      // 記錄spill的頻率绘闷,每當(dāng)read一條record的時(shí)候都會(huì)記錄一次
      addElementsRead()
      // 使用kv儲(chǔ)存當(dāng)前讀的record
      kv = records.next()
      // 這里的map和下面else中的buffer都是用來緩存的數(shù)據(jù)結(jié)構(gòu)
      // 如果進(jìn)行Map端的聚合操作橡庞,使用的就是PartitionedAppendOnlyMap[K, C]
      // 如果不進(jìn)行Map端的聚合操作,使用的是PartitionedPairBuffer[K, C]
      // 調(diào)用上面定義的update函數(shù)將記錄插入到map中
      map.changeValue((getPartition(kv._1), kv._1), update)
      // 判斷是否要進(jìn)行spill操作
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // 如果不需要進(jìn)行Map端的聚合操作印蔗,就直接將記錄放到buffer(PartitionedPairBuffer)中
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}

具體的流程用注釋的方式寫在了上面的源碼中扒最,這里我們先來看一下PartitionedAppendOnlyMap和PartitionedPairBuffer分別是如何工作的:

PartitionedAppendOnlyMap:

首先來看PartitionedAppendOnlyMap的changeValue實(shí)現(xiàn),實(shí)際上华嘹,PartitionedAppendOnlyMap是繼承自SizeTrackingAppendOnlyMap吧趣,而SizeTrackingAppendOnlyMap又繼承自AppendOnlyMap,這里調(diào)用的changeValue方法實(shí)際上是SizeTrackingAppendOnlyMap的changeValue方法:

override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  // 首先調(diào)用父類的changeValue方法
  val newValue = super.changeValue(key, updateFunc)
  // 然后調(diào)用SizeTracker接口的afterUpdate方法
  super.afterUpdate()
  // 返回newValue
  newValue
}

父類(AppendOnlyMap)的changeValue方法:

def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  // key為空時(shí)候的處理除呵,增加長(zhǎng)度
  if (k.eq(null)) {
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = updateFunc(haveNullValue, nullValue)
    haveNullValue = true
    return nullValue
  }
  var pos = rehash(k.hashCode) & mask
  var i = 1
  while (true) {
    // 這里的data是一個(gè)數(shù)組再菊,用來同時(shí)存儲(chǔ)key和value:key0, value0, key1, value1, key2, value2, etc.
    // 即2 * pos上存儲(chǔ)的是key的值爪喘,2 * pos + 1上存儲(chǔ)的是value的值
    val curKey = data(2 * pos)
    // 如果key已經(jīng)存在颜曾,就調(diào)用updateFunc方法更新value
    if (k.eq(curKey) || k.equals(curKey)) {
      val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      return newValue
    } else if (curKey.eq(null)) {
      // 如果key不存在就將該key和對(duì)應(yīng)的value添加到data這個(gè)數(shù)組中
      val newValue = updateFunc(false, null.asInstanceOf[V])
      data(2 * pos) = k
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      incrementSize()
      return newValue
    } else {
      // 否則繼續(xù)計(jì)算位置(pos)
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
  null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}

然后是SizeTracker接口的afterUpdate方法

protected def afterUpdate(): Unit = {
  numUpdates += 1
  if (nextSampleNum == numUpdates) {
    takeSample()
  }
}

更新數(shù)據(jù)的更新次數(shù),如果更新的次數(shù)達(dá)到nextSampleNum秉剑,就執(zhí)行采樣操作泛豪,主要用來評(píng)估內(nèi)存的使用情況。

PartitionedPairBuffer:

再來看PartitionedPairBuffer的insert方法侦鹏,也就是不進(jìn)行Map端combine操作的情況:

def insert(partition: Int, key: K, value: V): Unit = {
  // 如果當(dāng)前的大小達(dá)到了capacity的值就需要擴(kuò)大該數(shù)組
  if (curSize == capacity) {
    growArray()
  }
  // 存儲(chǔ)key诡曙,這里存儲(chǔ)的是(partition Id, key)的格式
  data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
  // 存儲(chǔ)value
  data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
  curSize += 1
  // 參考上面PartitionedAppendOnlyMap的部分
  afterUpdate()
}

直接將數(shù)據(jù)存儲(chǔ)到buffer中。

執(zhí)行完上面的更新數(shù)據(jù)操作后略水,就要判斷是否要將數(shù)據(jù)spill到磁盤价卤,即maybeSpillCollection方法:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  // 這里需要判斷使用的是map(PartitionedAppendOnlyMap)還是buffer(PartitionedPairBuffer)
  // 如果true就是map,false就是buffer
  if (usingMap) {
    // 估計(jì)當(dāng)前map的內(nèi)存占用大小
    estimatedSize = map.estimateSize()
    // 如果超過內(nèi)存的限制渊涝,就將緩存中的數(shù)據(jù)spill到磁盤
    if (maybeSpill(map, estimatedSize)) {
      // spill到磁盤后慎璧,重置緩存
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    // 不進(jìn)行Map端聚合操作的情況
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

上面代碼的主要作用就是估計(jì)當(dāng)前緩存(map或者buffer)使用內(nèi)存的大小,如果超過了內(nèi)存使用的限制跨释,就要將緩存中的數(shù)據(jù)spill到磁盤中胸私,同時(shí)重置當(dāng)前的緩存。

下面就來看一下maybeSpill方法:

// 如果成功spill到磁盤就返回true鳖谈,否則返回false
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // 在進(jìn)行真正的spill操作之前向TaskMemoryManager申請(qǐng)?jiān)俣喾峙湟恍﹥?nèi)存
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted =
      taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    // 如果內(nèi)存仍然不夠用岁疼,就認(rèn)定為需要spill到磁盤
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // 如果內(nèi)存中元素的個(gè)數(shù)超過了強(qiáng)制spill的上限也會(huì)認(rèn)定為需要進(jìn)行spill操作
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  // 接下來就真正將數(shù)據(jù)spill到磁盤
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    // spill操作
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}

在進(jìn)行真正的spill操作之前會(huì)向TaskMemoryManager申請(qǐng)?jiān)俣喾峙湟恍﹥?nèi)存,如果還不能夠滿足缆娃,或者不能分配更多的內(nèi)存捷绒,或者內(nèi)存中元素的個(gè)數(shù)超過了強(qiáng)制spill的上限瑰排,最終就會(huì)執(zhí)行spill操作,接下來進(jìn)入spill方法:

// 這里的collection就是指的map或者buffer
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // 獲取臨時(shí)的BlockId(TempShuffleBlockId)及對(duì)應(yīng)的File
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()
  // These variables are reset after each flush
  var objectsWritten: Long = 0
  var spillMetrics: ShuffleWriteMetrics = null
  var writer: DiskBlockObjectWriter = null
  def openWriter(): Unit = {
    assert (writer == null && spillMetrics == null)
    spillMetrics = new ShuffleWriteMetrics
    writer = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
  }
  // 獲得DiskWriter(DiskBlockObjectWriter)
  openWriter()
  // List of batch sizes (bytes) in the order they are written to disk
  // 用來儲(chǔ)存每個(gè)batch對(duì)應(yīng)的size
  val batchSizes = new ArrayBuffer[Long]
  // How many elements we have in each partition
  // 用來儲(chǔ)存每個(gè)partition有多少元素
  val elementsPerPartition = new Array[Long](numPartitions)
  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is closed at the end of this process, and cannot be reused.
  def flush(): Unit = {
    val w = writer
    writer = null
    w.commitAndClose()
    _diskBytesSpilled += spillMetrics.shuffleBytesWritten
    batchSizes.append(spillMetrics.shuffleBytesWritten)
    spillMetrics = null
    objectsWritten = 0
  }
  var success = false
  try {
    // 排序部分的操作暖侨,返回迭代器
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    // 循環(huán)的到的迭代器凶伙,執(zhí)行write操作
    while (it.hasNext) {
      val partitionId = it.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      it.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1
      // 如果寫的對(duì)象達(dá)到serializerBatchSize的個(gè)數(shù)時(shí)就進(jìn)行flush操作
      if (objectsWritten == serializerBatchSize) {
        flush()
        openWriter()
      }
    }
    if (objectsWritten > 0) {
      flush()
    } else if (writer != null) {
      val w = writer
      writer = null
      w.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (!success) {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      if (writer != null) {
        writer.revertPartialWritesAndClose()
      }
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }
  // 實(shí)例化SpilledFile,并保存在數(shù)據(jù)結(jié)構(gòu)ArrayBuffer[SpilledFile]中
  spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))
}

先來簡(jiǎn)單的看一下排序部分的邏輯:

def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
  : WritablePartitionedIterator = {
  // 這里的partitionedDestructiveSortedIterator會(huì)根據(jù)是map或者buffer有不同的實(shí)現(xiàn)
  val it = partitionedDestructiveSortedIterator(keyComparator)
  // 最后返回的是WritablePartitionedIterator它碎,上面進(jìn)行寫操作的時(shí)候就是調(diào)用該迭代器中的writeNext方法
  new WritablePartitionedIterator {
    private[this] var cur = if (it.hasNext) it.next() else null
    def writeNext(writer: DiskBlockObjectWriter): Unit = {
      writer.write(cur._1._2, cur._2)
      cur = if (it.hasNext) it.next() else null
    }
    def hasNext(): Boolean = cur != null
    def nextPartition(): Int = cur._1._1
  }
}

如果collection是map函荣,具體的實(shí)現(xiàn)為(PartitionedAppendOnlyMap):

def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  destructiveSortedIterator(comparator)
}

如果collection是buffer,具體的實(shí)現(xiàn)為(PartitionedPairBuffer):

override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
  iterator
}

這里需要注意扳肛,首先都是獲取比較器傻挂,比較的數(shù)據(jù)格式為((partition Id, key), value),而兩者的區(qū)別在于前者才是真正的Destructive級(jí)別的挖息,具體的實(shí)現(xiàn)在destructiveSortedIterator方法中金拒,而不管采用哪種方式,其底層都是通過timSort算法實(shí)現(xiàn)的套腹,具體的排序邏輯就不詳細(xì)說明了绪抛,有興趣的朋友可以深入研究下去。

接下來就進(jìn)入到第三步电禀。

2.1.3 第三步

先貼出該步驟的代碼:

...

// 這里注釋說的很清楚了幢码,只打開了一個(gè)文件
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
  // 構(gòu)造blockId
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  // 寫數(shù)據(jù)
  val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
  // 寫index文件
  shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
  // 進(jìn)行Shuffle Read的時(shí)候需要參考該信息
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
  if (tmp.exists() && !tmp.delete()) {
    logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
  }
}

writePartitionedFile:

首先來看writePartitionedFile方法:

def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {
  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)
  // 首先判斷spills中是否有數(shù)據(jù),即判斷是否有數(shù)據(jù)被spill到了磁盤中
  if (spills.isEmpty) {
    // 數(shù)據(jù)只在內(nèi)存中的情況
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    // 獲得迭代器
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    // 進(jìn)行迭代并將數(shù)據(jù)寫到磁盤
    while (it.hasNext) {
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics.shuffleWriteMetrics.get)
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      writer.commitAndClose()
      val segment = writer.fileSegment()
      // 最后返回的是每個(gè)partition寫入的數(shù)據(jù)的長(zhǎng)度
      lengths(partitionId) = segment.length
    }
  } else {
    // 如果有數(shù)據(jù)被spill到了磁盤中尖飞,我們就需要進(jìn)行merge-sort操作
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
          context.taskMetrics.shuffleWriteMetrics.get)
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
        writer.commitAndClose()
        val segment = writer.fileSegment()
        lengths(id) = segment.length
      }
    }
  }
  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.internalMetricsToAccumulators(
    InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes)
  lengths
}

下面我們就看一下this.partitionedIterator即內(nèi)存和磁盤中的數(shù)據(jù)是如何合到一起的:

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (!ordering.isDefined) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(collection.partitionedDestructiveSortedIterator(None))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
  }
}

我們只考慮spills不為空的情況症副,即執(zhí)行merge方法:

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // 根據(jù)每個(gè)SpilledFile實(shí)例化一個(gè)SpillReader,這些SpillReader組成一個(gè)Seq
  val readers = spills.map(new SpillReader(_))
  // 獲得內(nèi)存BufferedIterator
  val inMemBuffered = inMemory.buffered
  // 根據(jù)partition的個(gè)數(shù)進(jìn)行迭代
  (0 until numPartitions).iterator.map { p =>
    // 實(shí)例化IteratorForPartition政基,即當(dāng)前partition下的Iterator
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // 這里就是合并操作
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      // 如果需要map端的combine操作則需要根據(jù)key進(jìn)行聚合操作
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      // 排序合并贞铣,例如sortByKey
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}

具體的mergeWithAggregation和mergeSort就不一一說明了,下面再來看一下writeIndexFileAndCommit

writeIndexFileAndCommit:

再來看writeIndexFileAndCommit方法:

def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Int,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
    Utils.tryWithSafeFinally {
      // We take in lengths of each block, need to convert it to offsets.
      var offset = 0L
      out.writeLong(offset)
      for (length <- lengths) {
        offset += length
        out.writeLong(offset)
      }
    } {
      out.close()
    }
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    // the following check and rename are atomic.
    synchronized {
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
        indexTmp.delete()
      } else {
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        if (indexFile.exists()) {
          indexFile.delete()
        }
        if (dataFile.exists()) {
          dataFile.delete()
        }
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}

具體的實(shí)現(xiàn)就不詳細(xì)說明了沮明,主要就是根據(jù)上一步的到的partition長(zhǎng)度的數(shù)組將偏移量寫入到index文件中辕坝。

最后就是實(shí)例化MapStatus,shuffle read的時(shí)候根據(jù)MapStatus獲取數(shù)據(jù)荐健。至此BaseShuffleHandle & SortShuffleWriter的部分就結(jié)束了酱畅。

BypassMergeSortShuffleHandle & BypassMergeSortShuffleWriter

再來看一下BypassMergeSortShuffleWriter的write方法:

public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  partitionWriters = new DiskBlockObjectWriter[numPartitions];
  // 針對(duì)每一個(gè)reducer建立一個(gè)臨時(shí)文件
  for (int i = 0; i < numPartitions; i++) {
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
  // 根據(jù)partition將記錄寫入到不同的臨時(shí)文件中
  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }
  for (DiskBlockObjectWriter writer : partitionWriters) {
    writer.commitAndClose();
  }
  // 將所有的臨時(shí)文件內(nèi)容按照partition Id合并到一個(gè)文件
  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  File tmp = Utils.tempFileWith(output);
  try {
    // 將記錄和partition長(zhǎng)度信息分別寫入到data文件和index文件中
    partitionLengths = writePartitionedFile(tmp);
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

該writer在進(jìn)行寫記錄之前會(huì)根據(jù)reducer的個(gè)數(shù)(例如R個(gè))生成R個(gè)臨時(shí)文件,然后將記錄寫入對(duì)應(yīng)的臨時(shí)文件中摧扇,最后將這些文件進(jìn)行合并操作并寫入到一個(gè)文件中圣贸,由于直接將記錄寫入了臨時(shí)文件,并沒有緩存在內(nèi)存中扛稽,所以如果reducer的個(gè)數(shù)過多的話吁峻,就會(huì)為每個(gè)reducer打開一個(gè)臨時(shí)文件,如果reducer的數(shù)量過多的話就會(huì)影響性能,所以使用該種方式需要滿足一下條件(下面是源碼中的注釋):

    1. no Ordering is specified.
    1. no Aggregator is specified.
    1. the number of partitions is less than spark.shuffle.sort.bypassMergeThreshold.

其中spark.shuffle.sort.bypassMergeThreshold的個(gè)數(shù)默認(rèn)為200個(gè)用含。

SerializedShuffleHandle & UnsafeShuffleWriter

最后再來看一下UnsafeShuffleWriter矮慕,也就是通常所說的Tungsten。

UnsafeShuffleWriter的write方法:

public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      // 循環(huán)便利所有記錄啄骇,對(duì)其作用insertRecordIntoSorter方法
      insertRecordIntoSorter(records.next());
    }
    // 將數(shù)據(jù)輸出到磁盤上
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
                       "cleanup.", e);
        }
      }
    }
  }
}

首先來看insertRecordIntoSorter:

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

可以看出實(shí)際上調(diào)用的是ShuffleExternalSorter的insertRecord方法痴鳄,限于篇幅,具體的底層實(shí)現(xiàn)暫不說明缸夹,以后有時(shí)間會(huì)單獨(dú)分析一下Tungsten的部分痪寻,接下來看一下closeAndWriteOutput方法:

void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // 獲得spilled的文件
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  // 最終的輸出文件
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  // 臨時(shí)文件
  final File tmp = Utils.tempFileWith(output);
  try {
    try {
      // 將spilled的文件合并并寫入到臨時(shí)文件
      partitionLengths = mergeSpills(spills, tmp);
    } finally {
      for (SpillInfo spill : spills) {
        if (spill.file.exists() && ! spill.file.delete()) {
          logger.error("Error while deleting spill file {}", spill.file.getPath());
        }
      }
    }
    // 將partition的長(zhǎng)度信息寫入index文件
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  // mapStatus
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

用圖來總結(jié)一下上面描述的三種方式如下所示:

BaseShuffleHandle & SortShuffleWriter
SerializedShuffleHandle & UnsafeShuffleWriter
BypassMergeSortShuffleHandle & BypassMergeSortShuffleWriter

最后需要說明的是如果采用的是SortShuffleManager,最后每個(gè)task產(chǎn)生的文件的個(gè)數(shù)為2 * M(M代表Mapper端ShuffleMapTask的個(gè)數(shù))虽惭,相對(duì)于Hash的方式來說文件的個(gè)數(shù)明顯減少橡类。

至此Shuffle Write的部分就分析完了,下一遍文章會(huì)繼續(xù)分析Shuffle Read的部分芽唇。

本文參照的是Spark 1.6.3版本的源碼顾画,同時(shí)給出Spark 2.1.0版本的連接:

Spark 1.6.3 源碼

Spark 2.1.0 源碼

本文為原創(chuàng),歡迎轉(zhuǎn)載匆笤,轉(zhuǎn)載請(qǐng)注明出處研侣、作者,謝謝炮捧!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末庶诡,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子寓盗,更是在濱河造成了極大的恐慌灌砖,老刑警劉巖璧函,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件傀蚌,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡蘸吓,警方通過查閱死者的電腦和手機(jī)善炫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來库继,“玉大人箩艺,你說我怎么就攤上這事∠芴眩” “怎么了艺谆?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)拜英。 經(jīng)常有香客問我静汤,道長(zhǎng),這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任虫给,我火速辦了婚禮藤抡,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘抹估。我一直安慰自己缠黍,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布药蜻。 她就那樣靜靜地躺著瓷式,像睡著了一般。 火紅的嫁衣襯著肌膚如雪语泽。 梳的紋絲不亂的頭發(fā)上蒿往,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音湿弦,去河邊找鬼瓤漏。 笑死,一個(gè)胖子當(dāng)著我的面吹牛颊埃,可吹牛的內(nèi)容都是我干的蔬充。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼班利,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼饥漫!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起罗标,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤庸队,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后闯割,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體彻消,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年宙拉,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了宾尚。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡谢澈,死狀恐怖煌贴,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情锥忿,我是刑警寧澤牛郑,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站敬鬓,受9級(jí)特大地震影響淹朋,放射性物質(zhì)發(fā)生泄漏灶似。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一瑞你、第九天 我趴在偏房一處隱蔽的房頂上張望酪惭。 院中可真熱鬧,春花似錦者甲、人聲如沸春感。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽鲫懒。三九已至,卻和暖如春刽辙,著一層夾襖步出監(jiān)牢的瞬間窥岩,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國(guó)打工宰缤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留颂翼,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓慨灭,卻偏偏與公主長(zhǎng)得像朦乏,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子氧骤,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容