[第十五章]Shuffle的讀寫源碼剖析_4

上兩節(jié)我們講了普通shuffle的操作原理,與優(yōu)化后的操作原理院尔。并對比了他們各自的特別蜻展。那么我就了解到spark shuffle其實是進行了兩步
第一步,ShuffleMapTask執(zhí)行后把計算出來的數(shù)據(jù)寫入ShuffleBlockFile里
第二步邀摆,ResultTask讀取這些數(shù)據(jù)文件進行計算纵顾。
節(jié)章節(jié)就是深入剖析這兩步的源碼。

我們在前面講過Executor在執(zhí)行Task時栋盹,調(diào)用runTask方法施逾,并返回MapStatus

 try {
      //獲取shuffleManager,在用shuffleManager獲取shuffleWriter對象
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      //最重要的代碼在這里
      /**
       * 首先調(diào)用了rdd.iterator方法,參數(shù)是傳入了當前 task要處理的partition
       * 所以核心的邏輯,就是rdd的iterator方法中汉额,在這里沪饺,就實現(xiàn)了針對RDD的某個partitione,執(zhí)行
       * 我們定義的算子或者函數(shù)
       * 當執(zhí)行完我們自定義的算子或者函數(shù),是不是相當于針對rdd的partiton執(zhí)行了處理闷愤,那么是不是有返回值
       * ok,返回的數(shù)據(jù)整葡,都是通過ShuffleWriter,經(jīng)過HashPartitioner進行分區(qū)后,寫入自己對應(yīng)的bucket
       */
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      /**
       * 最后讥脐,結(jié)果返回MapStatus
       * Mapstatus封裝了ShuffleMapTask計算 后的數(shù)據(jù)遭居,存儲在哪里,這其實就是BlockManager相關(guān)的信息
       * BlockManager是spark底層的內(nèi)存旬渠,數(shù)據(jù)俱萍,磁盤管理的組件
       * 講完shuffle,我們會講blockManager
       */
      return writer.stop(success = true).get
    } 

里面講了
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
其實這個writer就是HashSuffleWriter.里write方法里,我們首先要判斷是否需要在Map端本地聚會告丢。是什么情況下可以聚合呢枪蘑,這要看我們實際的業(yè)務(wù),比如前面說的reduceBykey岖免。

  //將每個ShuffleMapTask計算出來的rdd的partition數(shù)據(jù)岳颇,寫入本地磁盤
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    //首先要判斷,是否需要在map端本地聚合
    //這里包括reduceByKey,這樣的操作颅湘,dep.aggreatetor.isDefined是true
    //包括dep.mapSideCombine也是true
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        //這里是執(zhí)行本地聚會
        //比如本地:(hello,1)(hello,1) ==> (hello,2)
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
      records
    }

那么這里說到了本地聚合话侧,也可以叫組合(combine),有什么用呢闯参,大家想一相瞻鹏,是不是這里要走網(wǎng)絡(luò)傳遞了,要是本地聚合后鹿寨,大大減少了網(wǎng)絡(luò)流量 了
接著我們看源碼:

//如果本地聚會就本地聚會新博,
    //然后遍歷數(shù)據(jù)
    //對每個數(shù)據(jù),調(diào)用partitioner脚草,默認是HashPartitioner,生成bucketId
    //也就是決定 了赫悄,每一份數(shù)據(jù)寫入哪個bucket中
    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
     // 調(diào)用shuffleBlockManager.forMapTask來生成bucketId對應(yīng)的Writer,然后把數(shù)據(jù)寫入bucket
      shuffle.writers(bucketId).write(elem)
    }
  }

接著我們看forMapTask的方法。這個方法里就看到我們前面兩節(jié)講的普通shuffle與優(yōu)化后的shuffle寫入本地磁盤的區(qū)別玩讳。

ivate val shuffleState = shuffleStates(shuffleId)
      private var fileGroup: ShuffleFileGroup = null

      //這里很關(guān)鍵涩蜘,前面我們講過Shuffle有兩種,一種是普通的熏纯,一種是優(yōu)化后的
      //這里會判斷,如果開啟了consolidate,就是consoldateShuffleFile是true
      //這里不會為每個bucket都獲取一個獨立的文件
      //而是為這個bucket,獲取一個ShuffleGroup的Writer
      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          //首先用shuffleId, mapId, bucketId來生成一個唯一的blockId
          //然后用bucket獲取一個ShuffleGroup
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          //然后用BlockManager.getDiskWriter方法粤策,針對ShuffleGroup獲取一個Writer
          /**
           * 這樣我們就清楚了樟澜,如果開啟了consolidate機制
           * 實際上,對于每一個bucket,都會獲取一個針對ShuffleFileGroup的writer
           * 而不是一個獨立的ShuffleBlockFile的writer
           * 
           *  這樣就實現(xiàn)了多個ShuffleMapTask的輸出 數(shù)據(jù)的合并
           */
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
            writeMetrics)
        }} 

上面講了當開啟了consolidate機制后,對于每一個bucket都會獲取一個ShuffleFileGroup的writer
而普通的shuffle的源碼如下:

else {
        //如果沒有開啟consolicate機制秩贰,也就是普通的Shuffle
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          //同樣生成一個blockId
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          //然后調(diào)用  blockManager.diskBlockManager霹俺,獲取了一個代表要寫入磁盤文件的blockFile
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          
          // If so, remove it.
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          //然后調(diào)用 這個方法,針對那個blockFile生成writer
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
        
          /**
           *所以對于普通的Shuffle操作的話
           * 對于每個ShuffleMapTask輸出的bucket,都會在本地獲取一個單獨的ShuffleBlockFIle文件
           */
        }

我們看到了不管是哪種的shuffle毒费,最終調(diào)用blockMamager.getDiskWriter方法寫數(shù)據(jù)到本地磁盤丙唧。這就是Shuffle的第一步,寫文件數(shù)據(jù)操作觅玻。

接下來我們看第二步想际,
在前面分析Task是不是分析了task的計算 ,調(diào)用了compute方法,里面是不是通過getReader方法得到ShuffleMapReader溪厘,調(diào)用read方法

 override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    //這里和我們以前分析的圖串起來了吧DAGScheduler的MapoutputTrackerMaster中獲取自己想要的數(shù)據(jù)信息
    //然后底層用 blockMamger拉取自己的數(shù)據(jù)
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

用blockManager拉取自己的數(shù)據(jù)胡本,調(diào)用ftech方法:

 def fetch[T](
      shuffleId: Int,
      reduceId: Int,
      context: TaskContext,
      serializer: Serializer)
    : Iterator[T] =
  {
    logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis
    /**
     * 拿到mapOutputTracker的引用 ,然后調(diào)用getServerStatuses
     * suffleId表示這個stage的上一個stage的ID
     * reduceId是bucketId
     * 這兩個參數(shù)可以限制找到當前resultTask獲取所需要的那份數(shù)據(jù)
     * getServerStatuses這個方法一定會走網(wǎng)絡(luò)通信的畸悬,因為要聯(lián)系dirver的
     */
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)

    //ShuffleBlockFetcherIterator構(gòu)造 以后侧甫,就直接根據(jù)拉取地睛位置信息,通過BlockMamager
    //去遠程的ShuffleTask所以節(jié)點的blockManager去拉取數(shù)據(jù)
    val blockFetcherItr = new ShuffleBlockFetcherIterator(
      context,
      SparkEnv.get.blockManager.shuffleClient,
      blockManager,
      blocksByAddress,
      serializer,
      SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
    val itr = blockFetcherItr.flatMap(unpackBlock)

    //對拉取的數(shù)據(jù)進行封裝
    val completionIter = CompletionIterator[T, Iterator[T]](itr, {
      context.taskMetrics.updateShuffleReadMetrics()
    })

這就是ResultTask讀取數(shù)據(jù)的過程蹋宦。在spark中Shuffle是重點披粟。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市冷冗,隨后出現(xiàn)的幾起案子僻爽,更是在濱河造成了極大的恐慌,老刑警劉巖贾惦,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件胸梆,死亡現(xiàn)場離奇詭異,居然都是意外死亡须板,警方通過查閱死者的電腦和手機碰镜,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來习瑰,“玉大人绪颖,你說我怎么就攤上這事√鹧伲” “怎么了柠横?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長课兄。 經(jīng)常有香客問我牍氛,道長,這世上最難降的妖魔是什么烟阐? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任搬俊,我火速辦了婚禮紊扬,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘唉擂。我一直安慰自己餐屎,他們只是感情好,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布玩祟。 她就那樣靜靜地躺著腹缩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪空扎。 梳的紋絲不亂的頭發(fā)上藏鹊,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天,我揣著相機與錄音勺卢,去河邊找鬼伙判。 笑死,一個胖子當著我的面吹牛黑忱,可吹牛的內(nèi)容都是我干的宴抚。 我是一名探鬼主播,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼甫煞,長吁一口氣:“原來是場噩夢啊……” “哼菇曲!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起抚吠,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤常潮,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后楷力,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蛾方,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡投剥,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片懂鸵。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡企蹭,死狀恐怖婿斥,靈堂內(nèi)的尸體忽然破棺而出茴恰,到底是詐尸還是另有隱情,我是刑警寧澤何址,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布里逆,位于F島的核電站,受9級特大地震影響用爪,放射性物質(zhì)發(fā)生泄漏原押。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一项钮、第九天 我趴在偏房一處隱蔽的房頂上張望班眯。 院中可真熱鬧希停,春花似錦烁巫、人聲如沸署隘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽磁餐。三九已至,卻和暖如春阿弃,著一層夾襖步出監(jiān)牢的瞬間诊霹,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工渣淳, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留脾还,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓入愧,卻偏偏與公主長得像鄙漏,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子棺蛛,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容

  • 場景 數(shù)據(jù)傾斜解決方案與shuffle類性能調(diào)優(yōu) 分析 數(shù)據(jù)傾斜 有的時候怔蚌,我們可能會遇到大數(shù)據(jù)計算中一個最棘手的...
    過江小卒閱讀 3,443評論 0 9
  • 本文基于spark源碼2.11 1. 前言 shuffle是spark job中一個重要的階段,發(fā)生在map和re...
    aaron1993閱讀 11,715評論 1 12
  • 1 數(shù)據(jù)傾斜調(diào)優(yōu) 1.1 調(diào)優(yōu)概述 有的時候旁赊,我們可能會遇到大數(shù)據(jù)計算中一個最棘手的問題——數(shù)據(jù)傾斜桦踊,此時Spar...
    wisfern閱讀 2,935評論 0 23
  • Spark 數(shù)據(jù)傾斜的解決辦法 調(diào)優(yōu)概述 轉(zhuǎn)載:http://blog.csdn.net/lw_ghy/artic...
    raincoffee閱讀 1,131評論 0 6
  • 下雨天 女孩打著粉色小傘 朝著夢境走去··· 雨后的世界 那么干凈,那么清新 陽光透過指縫 徜徉于臉龐 投影了一地...
    曹小七閱讀 291評論 0 4