【Spark Core】TaskScheduler源碼與任務(wù)提交原理淺析2

引言

上一節(jié)《TaskScheduler源碼與任務(wù)提交原理淺析1》介紹了TaskScheduler的創(chuàng)建過(guò)程弃甥,在這一節(jié)中短纵,我將承接《Stage生成和Stage源碼淺析》中的submitMissingTasks函數(shù)繼續(xù)介紹task的創(chuàng)建和分發(fā)工作泼掠。

DAGScheduler中的submitMissingTasks函數(shù)

如果一個(gè)Stage的所有的parent stage都已經(jīng)計(jì)算完成或者存在于cache中褥紫,那么他會(huì)調(diào)用submitMissingTasks來(lái)提交該Stage所包含的Tasks剩愧。
submitMissingTasks負(fù)責(zé)創(chuàng)建新的Task窖梁。
Spark將由Executor執(zhí)行的Task分為ShuffleMapTask和ResultTask兩種赘风。
每個(gè)Stage生成Task的時(shí)候根據(jù)Stage中的isShuffleMap標(biāo)記確定是否為ShuffleMapStage,如果標(biāo)記為真纵刘,則這個(gè)Stage輸出的結(jié)果會(huì)經(jīng)過(guò)Shuffle階段作為下一個(gè)Stage的輸入邀窃,創(chuàng)建ShuffleMapTask;否則是ResultStage假哎,這樣會(huì)創(chuàng)建ResultTask瞬捕,Stage的結(jié)果會(huì)輸出到Spark空間;最后舵抹,Task是通過(guò)taskScheduler.submitTasks來(lái)提交的肪虎。

計(jì)算流程

submitMissingTasks的計(jì)算流程如下:

  1. 首先得到RDD中需要計(jì)算的partition,對(duì)于Shuffle類型的stage惧蛹,需要判斷stage中是否緩存了該結(jié)果扇救;對(duì)于Result類型的Final Stage刑枝,則判斷計(jì)算Job中該partition是否已經(jīng)計(jì)算完成。
  1. 序列化task的binary迅腔。Executor可以通過(guò)廣播變量得到它装畅。每個(gè)task運(yùn)行的時(shí)候首先會(huì)反序列化。這樣在不同的executor上運(yùn)行的task是隔離的沧烈,不會(huì)相互影響掠兄。
  2. 為每個(gè)需要計(jì)算的partition生成一個(gè)task:對(duì)于Shuffle類型依賴的Stage,生成ShuffleMapTask類型的task锌雀;對(duì)于Result類型的Stage徽千,生成一個(gè)ResultTask類型的task。
  3. 確保Task是可以被序列化的汤锨。因?yàn)椴煌腸luster有不同的taskScheduler双抽,在這里判斷可以簡(jiǎn)化邏輯;保證TaskSet的task都是可以序列化的闲礼。
  4. 通過(guò)TaskScheduler提交TaskSet牍汹。

部分代碼

下面是submitMissingTasks判斷是否為ShuffleMapStage的部分代碼,其中部分參數(shù)說(shuō)明在注釋中:

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        //stage.id:Stage的序號(hào)
        //taskBinary:這個(gè)在下面具體介紹
        //part:RDD對(duì)應(yīng)的partition
        //locs:最適合的執(zhí)行位置
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        //p:partition索引柬泽,表示從哪個(gè)partition讀取數(shù)據(jù)
        //id:輸出的分區(qū)索引慎菲,表示reduceID
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

關(guān)于taskBinary參數(shù):這是RDD和ShuffleDependency的廣播變量(broadcase version),作為序列化之后的結(jié)果锨并。
這里將RDD和其依賴關(guān)系進(jìn)行序列化露该,在executor運(yùn)行task之前再進(jìn)行反序列化。這種方式對(duì)不同的task之間提供了較好的隔離第煮。

下面是submitMissingTasks進(jìn)行任務(wù)提交的部分代碼:

    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingTasks ++= tasks
      logDebug("New pending tasks: " + stage.pendingTasks)
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)
      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    }

TaskSchedulerImpl中的submitTasks

submitTasks的流程如下:

  1. 任務(wù)(tasks)會(huì)被包裝成TaskSetManager(由于TaskSetManager不是線程安全的解幼,所以源碼中需要進(jìn)行同步)
  1. TaskSetManager實(shí)例通過(guò)schedulableBuilder(分為FIFOSchedulableBuilder和FairSchedulableBuilder兩種)投入調(diào)度池中等待調(diào)度
  2. 任務(wù)提交同時(shí)啟動(dòng)定時(shí)器,如果任務(wù)還未被執(zhí)行包警,定時(shí)器會(huì)持續(xù)發(fā)出警告直到任務(wù)被執(zhí)行
  3. 調(diào)用backend的reviveOffers函數(shù)撵摆,向backend的driverActor實(shí)例發(fā)送ReviveOffers消息,driveerActor收到ReviveOffers消息后害晦,調(diào)用makeOffers處理函數(shù)
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }

TaskSetManager調(diào)度

每個(gè)Stage一經(jīng)確認(rèn)特铝,生成相應(yīng)的TaskSet(即為一組tasks),其對(duì)應(yīng)一個(gè)TaskSetManager通過(guò)Stage回溯到最源頭缺失的Stage提交到調(diào)度池pool中壹瘟,在調(diào)度池中帮哈,這些TaskSetMananger又會(huì)根據(jù)Job ID排序浙于,先提交的Job的TaskSetManager優(yōu)先調(diào)度愿卒,然后一個(gè)Job內(nèi)的TaskSetManager ID小的先調(diào)度蜈敢,并且如果有未執(zhí)行完的父母Stage的TaskSetManager,則不會(huì)提交到調(diào)度池中澄者。

reviveOffers函數(shù)代碼

下面是CoarseGrainedSchedulerBackend的reviveOffers函數(shù):

  override def reviveOffers() {
    driverActor ! ReviveOffers
  }

driveerActor收到ReviveOffers消息后笆呆,調(diào)用makeOffers處理函數(shù)请琳。

DriverActor的makeOffers函數(shù)

makeOffers函數(shù)的處理邏輯是:

  1. 找到空閑的Executor,分發(fā)的策略是隨機(jī)分發(fā)的赠幕,即盡可能將任務(wù)平攤到各個(gè)Executor
  1. 如果有空閑的Executor俄精,就將任務(wù)列表中的部分任務(wù)利用launchTasks發(fā)送給指定的Executor

SchedulerBackend(這里實(shí)際是CoarseGrainedSchedulerBackend)負(fù)責(zé)將新創(chuàng)建的Task分發(fā)給Executor,從launchTasks代碼中可以看出榕堰,在發(fā)送LauchTasks指令之前需要將TaskDescription序列化竖慧。

    // Make fake resource offers on all executors
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

TaskSchedulerImpl中的resourceOffers函數(shù)

任務(wù)是隨機(jī)分發(fā)給各個(gè)Executor的,資源分配的工作由resourceOffers函數(shù)處理逆屡。
正如上面submitTasks函數(shù)提到的圾旨,在TaskSchedulerImpl中,這一組Task被交給一個(gè)新的TaskSetManager實(shí)例進(jìn)行管理魏蔗,所有的TaskSetManager經(jīng)由SchedulableBuilder根據(jù)特定的調(diào)度策略進(jìn)行排序砍的,在TaskSchedulerImpl的resourceOffers函數(shù)中,當(dāng)前被選擇的TaskSetManager的ResourceOffer函數(shù)被調(diào)用并返回包含了序列化任務(wù)數(shù)據(jù)的TaskDescription莺治,最后這些TaskDescription再由SchedulerBackend派發(fā)到ExecutorBackend去執(zhí)行廓鞠。

resourceOffers主要做了3件事:

  1. 從Workers里面隨機(jī)抽出一些來(lái)執(zhí)行任務(wù)。
  1. 通過(guò)TaskSetManager找出和Worker在一起的Task谣旁,最后編譯打包成TaskDescription返回床佳。
  2. 將Worker-->Array[TaskDescription]的映射關(guān)系返回。
  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    // 遍歷worker提供的資源榄审,更新executor相關(guān)的映射
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }
    // 從worker當(dāng)中隨機(jī)選出一些來(lái)砌们,防止任務(wù)都堆在一個(gè)機(jī)器上
    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // getSortedTask函數(shù)對(duì)taskset進(jìn)行排序
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // 隨機(jī)遍歷抽出來(lái)的worker,通過(guò)TaskSetManager的resourceOffer搁进,把本地性最高的Task分給Worker
    // 本地性是根據(jù)當(dāng)前的等待時(shí)間來(lái)確定的任務(wù)本地性的級(jí)別浪感。
    // 它的本地性主要是包括四類:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。

    //1. 首先依次遍歷 sortedTaskSets, 并對(duì)于每個(gè) Taskset, 遍歷 TaskLocality
    //2. 越 local 越優(yōu)先, 找不到(launchedTask 為 false)才會(huì)到下個(gè) locality 級(jí)別
    //3. (封裝在resourceOfferSingleTaskSet函數(shù))在多次遍歷offer list,
    //因?yàn)橐淮蝨askSet.resourceOffer只會(huì)占用一個(gè)core, 
    //而不是一次用光所有的 core, 這樣有助于一個(gè) taskset 中的 task 比較均勻的分布在workers上
    //4. 只有在該taskset, 該locality下, 對(duì)所有worker offer都找不到合適的task時(shí), 
    //才跳到下個(gè) locality 級(jí)別
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }

TaskDescription代碼:

private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}

DriverActor的launchTasks函數(shù)

launchTasks函數(shù)流程:

  1. launchTasks函數(shù)將resourceOffers函數(shù)返回的TaskDescription信息進(jìn)行序列化
  1. 向executorActor發(fā)送封裝了serializedTask的LaunchTask消息

由于受到Akka Frame Size尺寸的限制拷获,如果發(fā)送數(shù)據(jù)過(guò)大篮撑,會(huì)被截?cái)唷?/p>

    // Launch tasks returned by a set of resource offers
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

參考資料

Spark大數(shù)據(jù)處理,高彥杰著匆瓜,機(jī)械工業(yè)出版社
Spark技術(shù)內(nèi)幕: Task向Executor提交的源碼解析
Spark源碼系列(三)作業(yè)運(yùn)行過(guò)程

轉(zhuǎn)載請(qǐng)注明作者Jason Ding及其出處
GitCafe博客主頁(yè)(http://jasonding1354.gitcafe.io/)
Github博客主頁(yè)(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
簡(jiǎn)書主頁(yè)(http://www.reibang.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354進(jìn)入我的博客主頁(yè)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市未蝌,隨后出現(xiàn)的幾起案子驮吱,更是在濱河造成了極大的恐慌,老刑警劉巖萧吠,帶你破解...
    沈念sama閱讀 218,755評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件左冬,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡纸型,警方通過(guò)查閱死者的電腦和手機(jī)拇砰,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,305評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門梅忌,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人除破,你說(shuō)我怎么就攤上這事牧氮。” “怎么了瑰枫?”我有些...
    開封第一講書人閱讀 165,138評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵踱葛,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我光坝,道長(zhǎng)尸诽,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,791評(píng)論 1 295
  • 正文 為了忘掉前任盯另,我火速辦了婚禮性含,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘鸳惯。我一直安慰自己胶滋,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,794評(píng)論 6 392
  • 文/花漫 我一把揭開白布悲敷。 她就那樣靜靜地躺著究恤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪后德。 梳的紋絲不亂的頭發(fā)上部宿,一...
    開封第一講書人閱讀 51,631評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音瓢湃,去河邊找鬼理张。 笑死,一個(gè)胖子當(dāng)著我的面吹牛绵患,可吹牛的內(nèi)容都是我干的雾叭。 我是一名探鬼主播,決...
    沈念sama閱讀 40,362評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼落蝙,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼织狐!你這毒婦竟也來(lái)了?” 一聲冷哼從身側(cè)響起筏勒,我...
    開封第一講書人閱讀 39,264評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤移迫,失蹤者是張志新(化名)和其女友劉穎,沒(méi)想到半個(gè)月后管行,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體厨埋,經(jīng)...
    沈念sama閱讀 45,724評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年捐顷,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了荡陷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片雨效。...
    茶點(diǎn)故事閱讀 40,040評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖废赞,靈堂內(nèi)的尸體忽然破棺而出徽龟,到底是詐尸還是另有隱情,我是刑警寧澤蛹头,帶...
    沈念sama閱讀 35,742評(píng)論 5 346
  • 正文 年R本政府宣布顿肺,位于F島的核電站,受9級(jí)特大地震影響渣蜗,放射性物質(zhì)發(fā)生泄漏屠尊。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,364評(píng)論 3 330
  • 文/蒙蒙 一耕拷、第九天 我趴在偏房一處隱蔽的房頂上張望讼昆。 院中可真熱鬧,春花似錦骚烧、人聲如沸浸赫。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,944評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)既峡。三九已至,卻和暖如春碧查,著一層夾襖步出監(jiān)牢的瞬間运敢,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,060評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工忠售, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留传惠,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 48,247評(píng)論 3 371
  • 正文 我出身青樓稻扬,卻偏偏與公主長(zhǎng)得像卦方,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子泰佳,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,979評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容