[SPARK][CORE] 面試問題之 Shuffle reader 的細(xì)枝末節(jié) (上)

歡迎關(guān)注微信公眾號(hào)“Tim在路上”
之前我們已經(jīng)了解了shuffle writer的詳細(xì)過程,那么生成文件后會(huì)發(fā)生什么呢?以及它們是如何被讀取呢?讀取是內(nèi)存的操作嗎寓搬?這些問題也隨之產(chǎn)生假丧,那么今天我們將先來(lái)了解了shuffle reader的細(xì)枝末節(jié)双揪。

在文章Spark Shuffle概述中我們已經(jīng)知道,在ShuffleManager中不僅定義了getWriter來(lái)獲取map writer的實(shí)現(xiàn)方式包帚, 同時(shí)還定義了getReader來(lái)獲取讀取shuffle文件的實(shí)現(xiàn)方式渔期。 在Spark中調(diào)用有兩個(gè)調(diào)用getReader的抽象類的重要實(shí)現(xiàn),分別是ShuffledRDD和ShuffleRowRDD渴邦。前者是與RDD API交互疯趟,后面一個(gè)是DataSet Api的交互實(shí)現(xiàn)。在Spark 3.0后其核心已經(jīng)變成了Spark SQL谋梭,所以我們重點(diǎn)從ShuffleRowRDD調(diào)用getReader開始講起信峻。

從ShuffleRowRDD開始

ShuffleRowRDD主要是被ShuffleExchangeExec調(diào)用。這里簡(jiǎn)單介紹下ShuffleExchangeExec操作算子瓮床。它主要負(fù)責(zé)兩件事:首先盹舞,準(zhǔn)備ShuffleDependency,它根據(jù)父節(jié)點(diǎn)所需的分區(qū)方案對(duì)子節(jié)點(diǎn)的輸出行進(jìn)行分區(qū)隘庄。其次踢步,添加一個(gè)ShuffleRowRDD并指定準(zhǔn)備好的ShuffleDependency作為此RDD的依賴項(xiàng)。


2927.png
class ShuffledRowRDD(
    var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
    metrics: Map[String, SQLMetric],
    partitionSpecs: Array[ShufflePartitionSpec])
  extends RDD[InternalRow](dependency.rdd.context,Nil)

ShuffleRowRDD繼承自RDD[InternalRow], 同時(shí)內(nèi)部維護(hù)著三個(gè)參數(shù)丑掺,分別是dependency获印,metrics和partitionSpecs。dependency封裝著shuffleId 街州,shuffleHandle 兼丰,numPartitions 可以基于其判斷出shuffleWriter采用了哪種方式。partitionSpecs定義了分區(qū)規(guī)范的類型唆缴。

目前在spark 3.2版本中partitionSpecs的實(shí)現(xiàn)類主要有以下四個(gè):

  • CoalescedPartitionSpec用于coalesce shuffle partitions 邏輯規(guī)則
  • PartialReducerPartitionSpec參與了 skew join 優(yōu)化
  • PartialMapperPartitionSpec用于本地隨機(jī)讀取器
  • CoalescedMapperPartitionSpec用于優(yōu)化本地隨機(jī)讀取器

不同類型的分區(qū)規(guī)范其實(shí)質(zhì)是代表不同的隨機(jī)讀取的參數(shù)地粪。我們都知道在Spark Shuffle中g(shù)etReader僅有且唯一的一個(gè)實(shí)現(xiàn)方式, 即BlockStoreShuffleReader 的實(shí)現(xiàn)。但是不同的分區(qū)規(guī)范意味將給共享的reader器傳遞不同的參數(shù)琐谤, 下面是ShuffleRowRDD中的簡(jiǎn)化代碼:

// ShuffleRowRDD
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
  // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
  // as well as the `tempMetrics` for basic shuffle metrics.
  val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
  val reader = split.asInstanceOf[ShuffledRowRDDPartition].spec match {
    // CoalescedPartitionSpec會(huì)讀取map task為所有reducer所產(chǎn)生的shuffle file
    case CoalescedPartitionSpec(startReducerIndex, endReducerIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle,
        startReducerIndex,
        endReducerIndex,
        context,
        sqlMetricsReporter)
   // PartialReducerPartitionSpec 讀取map task為一個(gè)reducer產(chǎn)生的部分?jǐn)?shù)據(jù)
    case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex, _) =>
      SparkEnv.get.shuffleManager.getReader(
        dependency.shuffleHandle,
        startMapIndex,
        endMapIndex,
        reducerIndex,
        reducerIndex + 1,
        context,
        sqlMetricsReporter)
   // PartialMapperPartitionSpec讀取shuffle map文件的部分
   case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
        SparkEnv.get.shuffleManager.getReader(
          dependency.shuffleHandle,
          mapIndex,
          mapIndex + 1,
          startReducerIndex,
          endReducerIndex,
          context,
          sqlMetricsReporter)
...
    reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
  }

其實(shí)從上面?zhèn)鞯膮?shù)中就可以看出點(diǎn)端倪CoalescedPartitionSpec(startReducerIndex,endReducer-Index) 讀取map task為所有reducer所產(chǎn)生的shuffle file;PartialReducerPartitionSpec(startMap-Index, endMapIndex,reducerIndex,reducerIndex + 1) 可以看出每次讀取只會(huì)為一個(gè)reducer讀取部分?jǐn)?shù)據(jù)玩敏。

從上面代碼可以看出ShuffleRowRDD 使用 read() 方法遍歷 shuffle 數(shù)據(jù)并將其返回給客戶端斗忌,那么接下來(lái)我們就詳細(xì)的看下getReader是如何實(shí)現(xiàn)的?

ShuffleReader調(diào)用前的準(zhǔn)備

SortShuffleManager是ShuffleManager的唯一實(shí)現(xiàn)旺聚,里面也實(shí)現(xiàn)getReader方法织阳,那么就讓我們從getReader開始。

override def getReader[K, C](
    handle: ShuffleHandle,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, _, C]]
  val (blocksByAddress, canEnableBatchFetch) =
    // 是否開啟了push-based shuffle, 后續(xù)再分享砰粹,這里先跳過
    if (baseShuffleHandle.dependency.shuffleMergeEnabled) {
      val res = SparkEnv.get.mapOutputTracker.getPushBasedShuffleMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (res.iter, res.enableBatchFetch)
    } else {
      // [1] 使用mapOutputTracker獲取shuffle塊的位置
      val address = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
        handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition)
      (address, true)
    }
  // [2] 創(chuàng)建一個(gè)BlockStoreShuffleReader實(shí)例唧躲,該實(shí)例將負(fù)責(zé)將shuffle文件從mapper傳遞到 reducer 任務(wù)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch =
      canEnableBatchFetch &&canUseBatchFetch(startPartition, endPartition, context))
}

可以看到getReader主要做了兩件事:

  • [1] 使用mapOutputTracker獲取shuffle塊的位置
  • [2] 創(chuàng)建一個(gè)BlockStoreShuffleReader實(shí)例,該實(shí)例將負(fù)責(zé)將shuffle文件從mapper傳遞到reducer 任務(wù)

那么Spark中如何保存和獲取shuffle塊的位置呢?

在spark中有兩種mapOutputTracker弄痹,兩種mapOutputTracker 都是在創(chuàng)建SparkEnv時(shí)創(chuàng)建饭入。

其中第一個(gè)是MapOutputTrackerMaster,它駐留在驅(qū)動(dòng)程序中并跟蹤每個(gè)階段的map output輸出, 并與DAGScheduler進(jìn)行通信肛真。

另一個(gè)是MapOutputTrackerWorker谐丢,位于執(zhí)行器上,它負(fù)責(zé)從MapOutputTrackerMaster獲取shuffle 元數(shù)據(jù)信息蚓让。

MapOutputTrackerMaster:

  1. DAGScheduler在創(chuàng)建 shuffle map 階段時(shí)會(huì)調(diào)用registerShuffle方法乾忱,從下面的代碼可以看出在創(chuàng)建ShuffleMapStage會(huì)調(diào)用registerShuffle,其實(shí)質(zhì)是在向 shuffleStatuses 映射器中放入shuffleid, 并為其值創(chuàng)建一個(gè)新的new ShuffleStatus(numMaps)历极。
def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  ...
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)

  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // 在創(chuàng)建ShuffleMapStage會(huì)調(diào)用registerShuffle
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,
      shuffleDep.partitioner.numPartitions)
  }
  stage
}

def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit = {
    if (pushBasedShuffleEnabled) {
      if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps, numReduces)).isDefined) {
        throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
      }
    } else {
      // 可以看到其實(shí)質(zhì)是在向 shuffleStatuses 放入shuffleid, 創(chuàng)建ShuffleStatus
      if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps)).isDefined) {
        throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
      }
    }
  }

  1. 到目前位置master tracker存放了一個(gè)shuffleid, 表明DAG中存在一個(gè)shuffle, 但還是不知道m(xù)ap output file的具體位置窄瘟。
// DAGScheduler中
private[scheduler] def handleTaskCompletion(event: CompletionEvent): Unit = {

  case smt: ShuffleMapTask =>
     val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
     ...
     mapOutputTracker.registerMapOutput(
        shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
  }

def registerMapOutput(shuffleId: Int, mapIndex: Int, status: MapStatus): Unit = {
    shuffleStatuses(shuffleId).addMapOutput(mapIndex, status)
}

從上面代碼可以看出,在每次 shuffle map 階段的任務(wù)終止時(shí)趟卸,DAGScheduler都會(huì)向MapOutputTrackerMaster發(fā)送狀態(tài)更新蹄葱。跟蹤器將有關(guān)特定 shuffle 文件的位置和大小的信息添加到在注冊(cè)步驟中初始化 的shuffleStatuses map中。


3tled.png

MapOutputTrackerWorker:

當(dāng)worker tracker 沒有緩存shuffle信息, 這時(shí)就必須發(fā)送GetMapOutputStatuses消息來(lái)從master tracker獲取它衰腌。

再回過頭來(lái)看看新蟆,在getReader中通過mapOutputTracker獲取shuffle塊的位置的方法。

// mapOutTracker
private def getMapSizesByExecutorIdImpl(
    shuffleId: Int,
    startMapIndex: Int,
    endMapIndex: Int,
    startPartition: Int,
    endPartition: Int,
    useMergeResult: Boolean): MapSizesByExecutorId = {
  logDebug(s"Fetching outputs for shuffle$shuffleId")
  // [1] 獲取mapOutputStatuses
  val (mapOutputStatuses, mergedOutputStatuses) = getStatuses(shuffleId, conf,
    // EnableBatchFetch can be set to false during stage retry when the
    // shuffleDependency.shuffleMergeEnabled is set to false, and Driver
    // has already collected the mergedStatus for its shuffle dependency.
    // In this case, boolean check helps to insure that the unnecessary
    // mergeStatus won't be fetched, thus mergedOutputStatuses won't be
    // passed to convertMapStatuses. See details in [SPARK-37023].
    if (useMergeResult)fetchMergeResultelse false)
  ...
}

從上面可以看出獲取具體的map output 位置的實(shí)現(xiàn)在getStatuses方法中右蕊。下面我們來(lái)具體分析下:

private def getStatuses(
    shuffleId: Int,
    conf: SparkConf,
    canFetchMergeResult: Boolean): (Array[MapStatus], Array[MergeStatus]) = {
  // push-based shuffle 開啟琼稻,獲取MergeStatus, 現(xiàn)暫不考慮
  if (canFetchMergeResult) {
    ...
  } else {
    val statuses = mapStatuses.get(shuffleId).orNull
    // [1] 如果mapStatuses不包含statuses饶囚, 就向master tracker發(fā)送GetMapOutputStatuses消息
    if (statuses == null) {
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      val startTimeNs = System.nanoTime()
fetchingLock.withLock(shuffleId) {
        var fetchedStatuses =mapStatuses.get(shuffleId).orNull
        if (fetchedStatuses == null) {
          logInfo("Doing the fetch; tracker endpoint = " +trackerEndpoint)
          val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
          try {
            fetchedStatuses =
              MapOutputTracker.deserializeOutputStatuses[MapStatus](fetchedBytes, conf)
          } catch {
            ...
          }
          logInfo("Got the map output locations")
          mapStatuses.put(shuffleId, fetchedStatuses)
        }
        (fetchedStatuses, null)
      }
    // [2] 如果mapStatuses包含statuses, 直接返回
    } else {
      (statuses, null)
    }
  }
}

從getStatuses可以看出:

  • [1] 如果mapStatuses不包含statuses, 就向master tracker發(fā)送GetMapOutputStatuses消息
  • [2] 如果mapStatuses包含statuses, 直接返回
private[spark] sealed trait MapStatus extends ShuffleOutputStatus {
  def location: BlockManagerId

  def updateLocation(newLoc: BlockManagerId): Unit

  def getSizeForBlock(reduceId: Int): Long

  def mapId: Long
}

可見MapStatus中包含了location帕翻, mapId等信息。

最后萝风,回到getReader方法中嘀掸,通過SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId獲取shuffle塊信息后,再將其作為 shuffle 塊的及其物理位置傳遞給BlockStoreShuffleReader规惰。

那么接下來(lái)就我們?cè)賮?lái)分析下BlockStoreShuffleReader的實(shí)現(xiàn)睬塌。

為避免冗長(zhǎng)將BlockStoreShuffleReader放到下一講進(jìn)行分析。
歡迎關(guān)注微信公眾號(hào)“Tim在路上”

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末歇万,一起剝皮案震驚了整個(gè)濱河市揩晴,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌贪磺,老刑警劉巖硫兰,帶你破解...
    沈念sama閱讀 206,126評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異寒锚,居然都是意外死亡劫映,警方通過查閱死者的電腦和手機(jī)违孝,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,254評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)泳赋,“玉大人雌桑,你說(shuō)我怎么就攤上這事∧∧ⅲ” “怎么了筹燕?”我有些...
    開封第一講書人閱讀 152,445評(píng)論 0 341
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)衅鹿。 經(jīng)常有香客問我撒踪,道長(zhǎng),這世上最難降的妖魔是什么大渤? 我笑而不...
    開封第一講書人閱讀 55,185評(píng)論 1 278
  • 正文 為了忘掉前任制妄,我火速辦了婚禮,結(jié)果婚禮上泵三,老公的妹妹穿的比我還像新娘耕捞。我一直安慰自己,他們只是感情好烫幕,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,178評(píng)論 5 371
  • 文/花漫 我一把揭開白布俺抽。 她就那樣靜靜地躺著,像睡著了一般较曼。 火紅的嫁衣襯著肌膚如雪磷斧。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 48,970評(píng)論 1 284
  • 那天捷犹,我揣著相機(jī)與錄音弛饭,去河邊找鬼。 笑死萍歉,一個(gè)胖子當(dāng)著我的面吹牛侣颂,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播枪孩,決...
    沈念sama閱讀 38,276評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼憔晒,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了蔑舞?” 一聲冷哼從身側(cè)響起拒担,我...
    開封第一講書人閱讀 36,927評(píng)論 0 259
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎斗幼,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體抚垄,經(jīng)...
    沈念sama閱讀 43,400評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡蜕窿,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,883評(píng)論 2 323
  • 正文 我和宋清朗相戀三年谋逻,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片桐经。...
    茶點(diǎn)故事閱讀 37,997評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡毁兆,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出阴挣,到底是詐尸還是另有隱情气堕,我是刑警寧澤,帶...
    沈念sama閱讀 33,646評(píng)論 4 322
  • 正文 年R本政府宣布畔咧,位于F島的核電站茎芭,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏誓沸。R本人自食惡果不足惜梅桩,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,213評(píng)論 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望拜隧。 院中可真熱鬧宿百,春花似錦、人聲如沸洪添。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,204評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)干奢。三九已至痊焊,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間律胀,已是汗流浹背宋光。 一陣腳步聲響...
    開封第一講書人閱讀 31,423評(píng)論 1 260
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留炭菌,地道東北人罪佳。 一個(gè)月前我還...
    沈念sama閱讀 45,423評(píng)論 2 352
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像黑低,于是被迫代替她去往敵國(guó)和親赘艳。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,722評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容