Spark Task

  • Task Locality

Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
Data locality is how close data is to the code processing it. There are several levels of locality based on the data's current location. In order from closest to farthest:
PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible
NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
NO_PREF - data is accessed equally quickly from anywhere and has no locality preference
RACK_LOCAL - data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
ANY - data is elsewhere on the network and not in the same rack
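This ranking is a strict order, and the scheduler compares levels against it when deciding whether an offer is good enough. A minimal self-contained sketch of the idea (the names mirror the levels above, but this is an illustration, not Spark's internal TaskLocality implementation):

// The five levels, ordered best (closest) to worst (farthest).
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  // An offer at `actual` locality satisfies a constraint `allowed`
  // only if it is at least as close as the constraint.
  def isAllowed(allowed: Value, actual: Value): Boolean = actual <= allowed
}

// Example: a RACK_LOCAL offer is acceptable once ANY is allowed,
// but not while the scheduler still insists on NODE_LOCAL:
//   Locality.isAllowed(Locality.ANY, Locality.RACK_LOCAL)        // true
//   Locality.isAllowed(Locality.NODE_LOCAL, Locality.RACK_LOCAL) // false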
Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options:
a) wait until a busy CPU frees up to start a task on data on the same server, or
b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
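As a concrete example of tuning those waits, here is a minimal SparkConf sketch; the values are illustrative, not recommendations:

import org.apache.spark.SparkConf

// Raise the shared fallback timeout (spark.locality.wait, default 3s),
// then give the NODE_LOCAL level a longer window than the others before
// the scheduler falls back to RACK_LOCAL.
val conf = new SparkConf()
  .set("spark.locality.wait", "5s")
  .set("spark.locality.wait.node", "10s")
  .set("spark.locality.wait.rack", "5s")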


  • 1. Obtaining partition location information

[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
                                  ...
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            /**
             getPreferredLocs - obtains the location information for a
             partition's data; the different ways this information is
             resolved are analyzed below.
            **/
            (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    }
                                   ...
[DAGScheduler]->submitMissingTasks->getPreferredLocsInternal
    private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
                                  ...
     /**
      cacheLocs maintains the locations of an RDD's partitions as
      TaskLocation instances.
      If cacheLocs has no location information for the current partition,
      the following logic runs: if the RDD's storageLevel is NONE, return
      Nil and record it in cacheLocs; otherwise ask blockManagerMaster
      for the blockManagers holding the partition and put
      ExecutorCacheTaskLocation instances into cacheLocs.
      See getCacheLocs for details. Blocks and caching are analyzed in
      the Storage chapter.
      **/
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }

    /**
     RDD has a preferredLocations method, which first checks for the
     partition's checkpoint location; if the RDD has not been
     checkpointed it calls getPreferredLocations(split), which each RDD
     implements differently. For example, HadoopRDD derives the current
     partition's location from its Hadoop InputSplit.
     If the current RDD is neither cached nor an input RDD, fall through
     to the next step.
    **/
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    /**
      When the RDD is neither cached nor an input RDD, so that
      preferredLocations yields no partition location, recursively look
      up the location of the corresponding parent-RDD partition; this
      only works across narrow dependencies.
    **/
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }
    Nil
  }
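To see the rdd.preferredLocations branch from outside the scheduler: any RDD can declare preferred hosts for a partition by overriding getPreferredLocations, which is exactly the hook getPreferredLocsInternal consults for input RDDs such as HadoopRDD. A minimal sketch of a custom RDD doing so; the class name and the idea of pinning partitions to hosts are made up for illustration:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// A toy RDD whose partitions each claim one preferred host.
class PinnedRDD(sc: SparkContext, hosts: Seq[String])
  extends RDD[Int](sc, Nil) {

  override def getPartitions: Array[Partition] =
    hosts.indices.map(i => new Partition { override def index: Int = i }).toArray

  // The hook consulted by DAGScheduler.getPreferredLocsInternal.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)
}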
  • 2. Task construction

[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
                                    ...
 /**
  Construct a different task type for each stage type.
  Each partition maps to one task, and each task carries the location
  information of its target partition; all tasks are finally submitted
  as one TaskSet.
  Note: the task's actual execution logic has already been serialized
  into taskBinary and broadcast to every executor; the tasks constructed
  here merely add the location information so that the driver-side
  TaskScheduler can schedule them. The TaskSet itself is not serialized
  and broadcast.
 **/
 val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
                                   ...
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
          }
        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
                                   ...
            new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary,
              part, locs, id, properties, stage.latestInfo.taskMetrics)
          }
      }
    }
                                     ...
if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      /**
          Construct a TaskSet and submit it. In most cases the scheduler
          is TaskSchedulerImpl; the DAGScheduler submits tasks through
          its instance, and TaskSchedulerImpl manages the TaskSet through
          a TaskSetManager instance. (While being instantiated,
          TaskSetManager assigns tasks to executors, and there are
          exactly two assignment categories, determined by the
          preferredLocation types above:
          ExecutorCacheTaskLocation - an RDD cached on an executor;
          HDFSCacheTaskLocation - HDFS input data or checkpoint data.
          See [TaskSetManager]->addPendingTask; a toy sketch of this
          distinction follows the code block.)
          Details are covered in the 'Spark scheduling' chapter.
       **/
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    }
                                      ...
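The ExecutorCacheTaskLocation / HDFSCacheTaskLocation split mentioned in the comment above determines the best locality level a pending task can hope for. A toy model of the distinction (simplified stand-ins, not Spark's private TaskLocation classes):

// Simplified stand-ins for the two preferred-location flavors.
sealed trait Loc { def host: String }
case class ExecutorCacheLoc(host: String, executorId: String) extends Loc // RDD cached in an executor
case class HdfsCacheLoc(host: String) extends Loc                         // HDFS input / checkpoint data

// Best locality level a task with this hint can achieve.
def bestLevel(loc: Loc): String = loc match {
  case ExecutorCacheLoc(_, execId) => s"PROCESS_LOCAL on executor $execId"
  case HdfsCacheLoc(host)          => s"NODE_LOCAL on host $host"
}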
  • 3. Task submission

[TaskSchedulerImpl]->def submitTasks(taskSet: TaskSet)

 override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " +
           tasks.length + " tasks")
    this.synchronized {
      /**Schedules the tasks within a single TaskSet in the  
         TaskSchedulerImpl. This class keeps track of each task, retries 
        tasks if they fail (up to a limited number of times), and handles 
        locality-aware scheduling for this TaskSet via delay scheduling.
        The main interfaces to it are resourceOffer, which asks the
        TaskSet whether it wants to run a task on one node, and
        statusUpdate, which tells it that one of its tasks changed state
        (e.g. finished).
      **/
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
                                    ...
      /**
          There are two implementations, corresponding to different task
          scheduling algorithms (just like scheduling in an OS):
          1. FIFOSchedulableBuilder
          2. FairSchedulableBuilder
          schedulableBuilder holds a Pool that manages the
          TaskSetManagers and returns them in different orders depending
          on the scheduling algorithm.
          The pool's checkSpeculatableTasks method re-launches tasks for
          jobs that have speculation enabled.
          The actual operation here is putting the TaskSetManager into
          the pool for asynchronous scheduling.
       **/
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +"and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    /**
      When running on YARN the backend here is an instance of
      CoarseGrainedSchedulerBackend, which runs on the driver, holds the
      information of every executor in the current job, and manages
      them. This call eventually reaches backend.makeOffers, which
      requests resources and triggers task scheduling.
     **/
    backend.reviveOffers()
  }
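Which builder is used is controlled by spark.scheduler.mode. A minimal configuration sketch; the file path and pool name are illustrative:

import org.apache.spark.SparkConf

// Switch from the default FIFO to FAIR scheduling. FairSchedulableBuilder
// reads pool definitions from the allocation file, if one is given.
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

// At runtime a job can then be routed to a pool per thread:
//   sc.setLocalProperty("spark.scheduler.pool", "myPool")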
  • 4. Task resource offers and scheduling

[CoarseGrainedSchedulerBackend]->private def makeOffers()
    /**
      This method is called whenever system resources change, for
      example when an executor registers with the backend or when a task
      finishes: the executor sends a message to the backend, and the
      last step of the backend's handling triggers one round of task
      scheduling. All asynchronous scheduling in Spark follows the same
      pattern, e.g. executor scheduling in standalone mode and the
      scheduling of waiting stages.
     **/
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      /**
        Collect each executor's currently available CPU resources,
        returned as WorkerOffer case-class instances. The locality logic
        below assigns tasks on this basis.
        **/
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      /**
        launchTasks serializes each TaskDescription and sends it to the
        corresponding executor for execution.
        scheduler.resourceOffers(workOffers) performs the actual task
        assignment.
      **/
      launchTasks(scheduler.resourceOffers(workOffers))
    }
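The shape of the offer-building step, as a self-contained sketch: one WorkerOffer per live executor, advertising its host and free cores (ExecutorData here is a simplified stand-in for the backend's bookkeeping):

case class WorkerOffer(executorId: String, host: String, cores: Int)
case class ExecutorData(host: String, freeCores: Int, alive: Boolean)

// One offer per live executor, mirroring makeOffers above.
def buildOffers(executors: Map[String, ExecutorData]): Seq[WorkerOffer] =
  executors.collect {
    case (id, e) if e.alive => WorkerOffer(id, e.host, e.freeCores)
  }.toSeq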
[TaskSchedulerImpl]->def resourceOffers(offers: Seq[WorkerOffer])
 def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    var newExecAvail = false
    /**
     If new executors have joined the system,
     executorAdded(o.executorId, o.host) ultimately calls
     submitWaitingStages() in DAGScheduler to attempt another stage
     submission; just as with the task scheduling above, any change in
     system resources triggers one round of scheduling.
     **/
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    /**
        Build a list of tasks to assign to each worker.
        Tasks are assigned according to the number of cores available on
        each executor; one core corresponds to one task.
    **/
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    /**
      As described above, the configured scheduling algorithm (FIFO or
      FAIR) determines the order of the task sets here, and hence the
      final execution order.
     **/
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
       /**
          When a new executor has started, the tasks' locality levels
          are recomputed here via TaskSetManager.recomputeLocality.
       **/
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }
    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
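The nested loop above is the essence of delay scheduling: walk the TaskSets in pool order and, for each, sweep the locality levels from best to worst, repeatedly offering every executor until nothing more can be placed at that level. A compressed, self-contained sketch of that control flow; all types are toy stand-ins and only two levels are modeled:

import scala.collection.mutable

case class Offer(executorId: String, host: String, var freeCores: Int)
case class PendingTask(id: Int, preferredHosts: Set[String])

// Try to place one pending task on `offer` at the given level:
// NODE_LOCAL requires a preferred-host match, ANY accepts everything.
def tryLaunch(pending: mutable.Buffer[PendingTask], offer: Offer,
              level: String): Boolean = {
  if (offer.freeCores == 0) return false
  val idx = pending.indexWhere { t =>
    level == "ANY" || t.preferredHosts.contains(offer.host)
  }
  if (idx < 0) return false
  offer.freeCores -= 1
  println(s"launch task ${pending(idx).id} on ${offer.host} at $level")
  pending.remove(idx)
  true
}

// Sweep levels best-to-worst; at each level keep offering all executors
// until a full pass launches nothing, then relax the constraint.
def schedule(pending: mutable.Buffer[PendingTask], offers: Seq[Offer]): Unit =
  for (level <- Seq("NODE_LOCAL", "ANY")) {
    var launched = true
    while (launched) {
      launched = offers.exists(o => tryLaunch(pending, o, level))
    }
  }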