wordCount執(zhí)行過程中的源碼解析

  1. wordCount程序流程圖示


    rdd.png
  2. wordcount程序

sc.textFile("").
   flatMap(_.split("\t")).
    map((_,1)).
    reduceByKey(_+_).
    collect()

2.1 collect函數(shù)

  //將所有的結(jié)果集以數(shù)組的形式返回回來
  //this代表當(dāng)前rdd绎橘,有了rdd其血緣關(guān)系就有了
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    //此時(shí)就決定rdd的task的數(shù)量
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
    val results = new Array[U](partitions.size)
    //func代表使用的是哪個(gè)算子
    //partitions代表多少分區(qū)
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
    results
  } 
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    //開始作業(yè)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    //此時(shí)dagScheduler和taskScheduler都準(zhǔn)備好
    //此時(shí)已進(jìn)入上圖的第二階段
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

此時(shí)進(jìn)入DAGScheduler類

  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    //該方法將rdd、算子、分區(qū)等提交上去
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }
//Submit an action job to the scheduler
//提交action作業(yè)到scheduler上
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    //分區(qū)數(shù)量
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }
    //jobId是什么
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      //分區(qū)數(shù)量為0直接返回
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
private[spark] class JobWaiter[T](
    dagScheduler: DAGScheduler,
    val jobId: Int,
    totalTasks: Int,
    resultHandler: (Int, T) => Unit)
  extends JobListener with Logging{
  //讓dagScheduler取消job
  def cancel() {
    dagScheduler.cancelJob(jobId, None)
  }

  //失敗job給予提示
  override def jobFailed(exception: Exception): Unit = {
    if (!jobPromise.tryFailure(exception)) {
      logWarning("Ignore failure", exception)
    }
  }
}

根據(jù)debug程序可以知道jobSubmitted之后運(yùn)行handleJobSubmitted

private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    //根據(jù)dag圖對(duì)其拆分成多個(gè)stage橘洞,因?yàn)榇藭r(shí)得到的是經(jīng)過處理的RDD, 
    //所以先得到finalStage路媚,需要從后向前推
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      //根據(jù)結(jié)果創(chuàng)建stage也就是finalstage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: BarrierJobSlotsNumberCheckFailed =>
        logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
          "than the total number of slots in the cluster currently.")
        // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
        val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
          new BiFunction[Int, Int, Int] {
            override def apply(key: Int, value: Int): Int = value + 1
          })
        if (numCheckFailures <= maxFailureNumTasksCheck) {
          messageScheduler.schedule(
            new Runnable {
              override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
                partitions, callSite, listener, properties))
            },
            timeIntervalNumTasksCheck,
            TimeUnit.SECONDS
          )
          return
        } else {
          // Job failed, clear internal data.
          barrierJobIdToNumTasksCheckFailures.remove(jobId)
          listener.jobFailed(e)
          return
        }
// Job submitted, clear internal data.
    barrierJobIdToNumTasksCheckFailures.remove(jobId)

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
// 該行輸出為Got job 0 (collect atSparkContextNewApp.scala:10) with 2 output partitions
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
//該行輸出為DAGScheduler: Final stage: ResultStage 1 (collect at SparkContextNewApp.scala:10)
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
// 該行輸出為Parents of final stage: List(ShuffleMapStage 0)
//得到其父stage
    logInfo("Parents of final stage: " + finalStage.parents)
// 該行輸出為Missing parents: List(ShuffleMapStage 0)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    //提交最后一個(gè)stage
    submitStage(finalStage)
  }

    case e: Exception =>
    logWarning("Creating new stage failed due to exception - job: " + jobId, e)
    listener.jobFailed(e)
    return
}
private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        //查看其有無丟失的父節(jié)點(diǎn)
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          //提交丟失的task
          submitMissingTasks(stage, jobId.get)
        } else {
          //不為空的話澄者,進(jìn)行迭代
         //對(duì)其父節(jié)點(diǎn)依次執(zhí)行之前的流程
        //一個(gè)反向迭代過程
       //就可以將所有的stage都提交了
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }
 private def submitMissingTasks(stage: Stage, jobId: Int){
  if (tasks.size > 0) {
      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
    //此時(shí)stage已經(jīng)被封裝成一個(gè)一個(gè)的taskset
    //通過taskScheduler將taskset提交上去
    //對(duì)應(yīng)上圖的第二階段提交到第三階段  
    taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      stage match {
        case stage: ShuffleMapStage =>
          logDebug(s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})")
          markMapStageJobsAsFinished(stage)
        case stage : ResultStage =>
          logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
      }
      submitWaitingChildStages(stage)
    }
}
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    // 此時(shí)輸出為Adding task set 0.0 with 2 tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末锥涕,一起剝皮案震驚了整個(gè)濱河市病游,隨后出現(xiàn)的幾起案子唇跨,更是在濱河造成了極大的恐慌,老刑警劉巖衬衬,帶你破解...
    沈念sama閱讀 222,627評(píng)論 6 517
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件买猖,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡滋尉,警方通過查閱死者的電腦和手機(jī)玉控,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 95,180評(píng)論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來狮惜,“玉大人奸远,你說我怎么就攤上這事既棺》硇” “怎么了懒叛?”我有些...
    開封第一講書人閱讀 169,346評(píng)論 0 362
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)耽梅。 經(jīng)常有香客問我薛窥,道長(zhǎng),這世上最難降的妖魔是什么眼姐? 我笑而不...
    開封第一講書人閱讀 60,097評(píng)論 1 300
  • 正文 為了忘掉前任诅迷,我火速辦了婚禮,結(jié)果婚禮上众旗,老公的妹妹穿的比我還像新娘罢杉。我一直安慰自己,他們只是感情好贡歧,可當(dāng)我...
    茶點(diǎn)故事閱讀 69,100評(píng)論 6 398
  • 文/花漫 我一把揭開白布滩租。 她就那樣靜靜地躺著,像睡著了一般利朵。 火紅的嫁衣襯著肌膚如雪律想。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,696評(píng)論 1 312
  • 那天绍弟,我揣著相機(jī)與錄音技即,去河邊找鬼。 笑死樟遣,一個(gè)胖子當(dāng)著我的面吹牛而叼,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播豹悬,決...
    沈念sama閱讀 41,165評(píng)論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼葵陵,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了屿衅?” 一聲冷哼從身側(cè)響起埃难,我...
    開封第一講書人閱讀 40,108評(píng)論 0 277
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎涤久,沒想到半個(gè)月后涡尘,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,646評(píng)論 1 319
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡响迂,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,709評(píng)論 3 342
  • 正文 我和宋清朗相戀三年考抄,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片蔗彤。...
    茶點(diǎn)故事閱讀 40,861評(píng)論 1 353
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡污淋,死狀恐怖蝇更,靈堂內(nèi)的尸體忽然破棺而出馋缅,到底是詐尸還是另有隱情,我是刑警寧澤吧彪,帶...
    沈念sama閱讀 36,527評(píng)論 5 351
  • 正文 年R本政府宣布,位于F島的核電站丢早,受9級(jí)特大地震影響姨裸,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜怨酝,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,196評(píng)論 3 336
  • 文/蒙蒙 一傀缩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧农猬,春花似錦赡艰、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,698評(píng)論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至苦掘,卻和暖如春换帜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背鹤啡。 一陣腳步聲響...
    開封第一講書人閱讀 33,804評(píng)論 1 274
  • 我被黑心中介騙來泰國(guó)打工惯驼, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人递瑰。 一個(gè)月前我還...
    沈念sama閱讀 49,287評(píng)論 3 379
  • 正文 我出身青樓祟牲,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親抖部。 傳聞我的和親對(duì)象是個(gè)殘疾皇子说贝,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,860評(píng)論 2 361

推薦閱讀更多精彩內(nèi)容