Spark Stage如何劃分

本文基于2.3.0
眾所周知良蒸,RDD的依賴(lài)關(guān)系形成后控嗜,我們就可以根據(jù)寬依賴(lài)劃分Stage了铐懊。
目前Spark 的 stage分為兩種: org.apache.spark.scheduler.ResultStageorg.apache.spark.scheduler.ShuffleMapStage, 他們都是由org.apache.spark.scheduler.DAGScheduler對(duì)RDD進(jìn)行劃分得來(lái)媒抠。

關(guān)于DAGScheduler的幾個(gè)問(wèn)題:

DAGScheduler什么時(shí)候生成

SparkContext初始化時(shí)

_dagScheduler = new DAGScheduler(this)
怎么觸發(fā)DAGScheduler進(jìn)行工作

總的來(lái)說(shuō)蝇完,DAGScheduler每一個(gè)工作的開(kāi)始爸邢,依靠事件驅(qū)動(dòng)巫员,其中當(dāng)然也包括Stage劃分。
下面來(lái)詳細(xì)分析一番:
與DAGScheduler工作相關(guān)的驅(qū)動(dòng)事件都定義在DAGSchedulerEvent.scala里甲棍,其中和啟動(dòng)Stage劃分有關(guān)的就是org.apache.spark.scheduler.JobSubmitted

各種事件

下面簡(jiǎn)單梳理一下整個(gè)事件觸發(fā)的流程:
首先org.apache.spark.SparkContext#submitJob提交job

def submitJob[T, U, R](
      rdd: RDD[T],
      processPartition: Iterator[T] => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit,
      resultFunc: => R): SimpleFutureAction[R] =
  {
    assertNotStopped()
    val cleanF = clean(processPartition)
    val callSite = getCallSite
    val waiter = dagScheduler.submitJob( / 重點(diǎn)
      rdd,
      (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
      partitions,
      callSite,
      resultHandler,
      localProperties.get)
    new SimpleFutureAction(waiter, resultFunc)
  }

可以看到简识,最終還是調(diào)用dagScheduler.submitJob()
進(jìn)入org.apache.spark.scheduler.DAGScheduler#submitJob方法

 def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    ......
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted( / 把事件post出去
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

很明顯赶掖,eventProcessLoopJobSubmitted事件做了一個(gè)post操作。這個(gè)post操作很簡(jiǎn)單七扰,就是將事件放入了org.apache.spark.util.EventLoop#eventQueue這個(gè)阻塞隊(duì)列而已奢赂。
代碼中的eventProcessLoop是什么呢? 其實(shí)就是EventLoop的子類(lèi)org.apache.spark.scheduler.DAGSchedulerEventProcessLoop颈走。很顯然膳灶,上面提到的隊(duì)列也就是DAGSchedulerEventProcessLoop對(duì)象的。
org.apache.spark.util.EventLoop這個(gè)類(lèi)里有個(gè)重要的線程需要提一下:

private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            ......
      } catch {
          ......
      }
    }
  }

這個(gè)線程一旦開(kāi)啟立由,會(huì)不停在eventQueue取事件轧钓,然后調(diào)用onReceive(event)

到這锐膜,其實(shí)大概就已經(jīng)能猜到事件是如何觸發(fā)DAGScheduler工作了毕箍。
DAGScheduler內(nèi)部有成員變量DAGSchedulerEventProcessLoop, 并且會(huì)在自己初始化時(shí),調(diào)用其start方法

private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
eventProcessLoop.start()

這個(gè)start()方法做了什么呢道盏?

def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

沒(méi)錯(cuò)而柑,啟動(dòng)了eventThread。到此荷逞,可以看到DAGScheduler持有eventProcessLoop, 自己post媒咳,自己消費(fèi)。
還記得eventThread#run方法里調(diào)用了onReceive(event)嗎种远?我們看下DAGSchedulerEventProcessLoop是如何實(shí)現(xiàn)父類(lèi)的onReceive(event)的涩澡。

override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event) /重點(diǎn)
    } finally {
      timerContext.stop()
    }
  }

很簡(jiǎn)單,繼續(xù)跟進(jìn)doOnReceive(event) , org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
......
}

很簡(jiǎn)單坠敷,繼續(xù)跟進(jìn)dagScheduler.handleJobSubmitted, org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      / 分Stage入口
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) 
    }
    ......
}

看到createResultStage妙同,這就是開(kāi)始劃分Stage了。
到這里常拓,總結(jié)而言渐溶,提交job后,dagScheduler會(huì)根據(jù)JobSubmitted事件弄抬,觸發(fā)stage劃分工作茎辐。

具體DAGScheduler是如何劃分的

上面部分已經(jīng)提到createResultStage方法,跟進(jìn)看一下:

private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    /1. 建立ResultStage 的 父Stage
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    /2. ResultStage直接new就可以了
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

ResultStage沒(méi)有什么太特別掂恕,直接new出來(lái)了拖陆,主要還是它的父Stage們。我們?cè)訇P(guān)注getOrCreateParentStages(rdd, jobId)

 private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

上面代碼意思很明顯懊亡,要獲得rdd的ShuffleDependencies, 也就是寬依賴(lài)們依啰。然后逐個(gè)生成ShuffleMapStage. 那我們繼續(xù)跟蹤getOrCreateShuffleMapStage(shuffleDep, firstJobId)的調(diào)用,最后會(huì)來(lái)到DAGScheduler#createShuffleMapStage

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd / 這里拿出了父rdd
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId) / 在此繼續(xù)往前追溯ShuffleMapStage
    val id = nextStageId.getAndIncrement()
    val stage = new ShuffleMapStage( /這個(gè)ShuffleMapStage的生成就到此為止了
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)

    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

到這里,ShuffleMapStage生成了店枣。
總結(jié)速警,整體過(guò)程還是比較清晰叹誉,從最末尾的rdd開(kāi)始,往前追溯闷旧,按寬依賴(lài)劃分生成ShuffleMapStage, 最后一個(gè)stage直接為ResultStage长豁。整個(gè)過(guò)程中也同時(shí)在構(gòu)建stage間的父子依賴(lài)關(guān)系。

收工Cψ啤=辰蟆!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末该园,一起剝皮案震驚了整個(gè)濱河市酸舍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌里初,老刑警劉巖啃勉,帶你破解...
    沈念sama閱讀 219,188評(píng)論 6 508
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異青瀑,居然都是意外死亡璧亮,警方通過(guò)查閱死者的電腦和手機(jī)萧诫,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,464評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門(mén)斥难,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人帘饶,你說(shuō)我怎么就攤上這事哑诊。” “怎么了及刻?”我有些...
    開(kāi)封第一講書(shū)人閱讀 165,562評(píng)論 0 356
  • 文/不壞的土叔 我叫張陵镀裤,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我缴饭,道長(zhǎng)暑劝,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,893評(píng)論 1 295
  • 正文 為了忘掉前任颗搂,我火速辦了婚禮担猛,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘丢氢。我一直安慰自己傅联,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,917評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布疚察。 她就那樣靜靜地躺著蒸走,像睡著了一般。 火紅的嫁衣襯著肌膚如雪貌嫡。 梳的紋絲不亂的頭發(fā)上比驻,一...
    開(kāi)封第一講書(shū)人閱讀 51,708評(píng)論 1 305
  • 那天该溯,我揣著相機(jī)與錄音,去河邊找鬼别惦。 笑死朗伶,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的步咪。 我是一名探鬼主播论皆,決...
    沈念sama閱讀 40,430評(píng)論 3 420
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼猾漫!你這毒婦竟也來(lái)了点晴?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 39,342評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤悯周,失蹤者是張志新(化名)和其女友劉穎粒督,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體禽翼,經(jīng)...
    沈念sama閱讀 45,801評(píng)論 1 317
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡屠橄,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,976評(píng)論 3 337
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了闰挡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片锐墙。...
    茶點(diǎn)故事閱讀 40,115評(píng)論 1 351
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖长酗,靈堂內(nèi)的尸體忽然破棺而出溪北,到底是詐尸還是另有隱情,我是刑警寧澤夺脾,帶...
    沈念sama閱讀 35,804評(píng)論 5 346
  • 正文 年R本政府宣布之拨,位于F島的核電站,受9級(jí)特大地震影響咧叭,放射性物質(zhì)發(fā)生泄漏蚀乔。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,458評(píng)論 3 331
  • 文/蒙蒙 一菲茬、第九天 我趴在偏房一處隱蔽的房頂上張望吉挣。 院中可真熱鬧,春花似錦生均、人聲如沸听想。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 32,008評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)汉买。三九已至,卻和暖如春佩脊,著一層夾襖步出監(jiān)牢的瞬間蛙粘,已是汗流浹背垫卤。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 33,135評(píng)論 1 272
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留出牧,地道東北人穴肘。 一個(gè)月前我還...
    沈念sama閱讀 48,365評(píng)論 3 373
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像舔痕,于是被迫代替她去往敵國(guó)和親评抚。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,055評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容