[Spark源碼剖析]Spark 延遲調(diào)度策略

本文旨在說明 Spark 的延遲調(diào)度及其是如何工作的

什么是延遲調(diào)度

在 Spark 中,若 task 與其輸入數(shù)據(jù)在同一個(gè) jvm 中,我們稱 task 的本地性為 PROCESS_LOCAL,這種本地性(locality level)是最優(yōu)的,避免了網(wǎng)絡(luò)傳輸及文件 IO鸯檬,是最快的;其次是 task 與輸入數(shù)據(jù)在同一節(jié)點(diǎn)上的 NODE_LOCAL螺垢,數(shù)據(jù)在哪都一樣的 NO_PREF喧务,數(shù)據(jù)與 task 在同一機(jī)架不同節(jié)點(diǎn)的 RACK_LOCAL 及最糟糕的不在同一機(jī)架的 ANY赖歌。

本地性越好,對于 task 來說蹂楣,花在網(wǎng)絡(luò)傳輸及文件 IO 的時(shí)間越少俏站,整個(gè) task 執(zhí)行耗時(shí)也就更少。而對于很多 task 來說痊土,執(zhí)行 task 的時(shí)間往往會(huì)比網(wǎng)絡(luò)傳輸/文件 IO 的耗時(shí)要短的多肄扎。所以 Spark 希望盡量以更優(yōu)的本地性啟動(dòng) task。延遲調(diào)度就是為此而存在的赁酝。

Spark的位置優(yōu)先(1): TaskSetManager 的有效 Locality Levels這篇文章中犯祠,我們可以知道,假設(shè)一個(gè) task 的最優(yōu)本地性為 N酌呆,那么該 task 同時(shí)也具有其他所有本地性比 N 差的本地性衡载。

假設(shè)調(diào)度器上一次以 locality level(本地性) M 為某個(gè) taskSetManager 啟動(dòng) task 失敗,則說明該 taskSetManager 中包含本地性 M 的 tasks 的本地性 M 對應(yīng)的所有節(jié)點(diǎn)均沒有空閑資源隙袁。此時(shí)痰娱,只要當(dāng)期時(shí)間與上一次以 M 為 taskSetManager 啟動(dòng) task 時(shí)間差小于配置的值,調(diào)度器仍然會(huì)以 locality level M 來為 taskSetManager 啟動(dòng) task

延時(shí)調(diào)度如何工作

函數(shù)TaskSetManager#getAllowedLocalityLevel是實(shí)現(xiàn)延時(shí)調(diào)度最關(guān)鍵的地方菩收,用來返回當(dāng)前該 taskSetManager 中未執(zhí)行的 tasks 的最高可能 locality level梨睁。以下為其實(shí)現(xiàn)

/**
   * Get the level we can launch tasks according to delay scheduling, based on current wait time.
   */
  private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        currentLocalityIndex += 1
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

代碼有點(diǎn)小長,好在并不復(fù)雜娜饵,一些關(guān)鍵注釋在以上源碼中都有注明坡贺。

循環(huán)條件為while (currentLocalityIndex < myLocalityLevels.length - 1)
其中myLocalityLevels: Array[TaskLocality.TaskLocality]是當(dāng)前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合箱舞,本地性越高的 locality level 在 myLocalityLevels 中的下標(biāo)越斜榉亍(具體請參見http://www.reibang.com/p/05034a9c8cae

currentLocalityIndex 是 getAllowedLocalityLevel 前一次返回的 locality level 在 myLocalityLevels 中的索引(下標(biāo)),若 getAllowedLocalityLevel 是第一次被調(diào)用晴股,則 currentLocalityIndex 為0

整個(gè)循環(huán)體都在做這幾個(gè)事情:

  1. 判斷 myLocalityLevels(currentLocalityIndex) 這個(gè)級(jí)別的本地性對應(yīng)的待執(zhí)行 tasks 集合中是否還有待執(zhí)行的 task

  2. 若無愿伴;則將 currentLocalityIndex += 1 進(jìn)行下一次循環(huán),即將 locality level 降低一級(jí)回到第1步

  3. 若有队魏,且當(dāng)前時(shí)間與上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) 時(shí)間間隔小于 myLocalityLevels(currentLocalityIndex) 對應(yīng)的延遲時(shí)間(通過spark.locality.wait.process或spark.locality.wait.node或spark.locality.wait.rack配置)公般,則 currentLocalityIndex 不變,返回myLocalityLevels(currentLocalityIndex)胡桨。這里是延遲調(diào)度的關(guān)鍵官帘,只要當(dāng)前時(shí)間與上一次以某個(gè) locality level 啟動(dòng) task 的時(shí)間只差小于配置的值,不管上次是否成功啟動(dòng)了 task昧谊,這一次仍然以上次的 locality level 來啟動(dòng) task刽虹。說的更明白一些:比如上次以 localtyX 為 taskSetManager 啟動(dòng) task 失敗,說明taskSetManager 中 tasks 對應(yīng) localityX 的節(jié)點(diǎn)均沒有空閑資源來啟動(dòng) task呢诬,但 Spark 此時(shí)仍然會(huì)以 localityX 來為 taskSetManager 啟動(dòng) task涌哲。為什么要這樣做胖缤?一般來說,task 執(zhí)行耗時(shí)相對于網(wǎng)絡(luò)傳輸/文件IO 要小得多阀圾,調(diào)度器多等待1 2秒可能就可以以更好的本地性執(zhí)行 task哪廓,避免了更耗時(shí)的網(wǎng)絡(luò)傳輸或文件IO,task 整體執(zhí)行時(shí)間會(huì)降低

  4. 若有初烘,且當(dāng)前時(shí)間與上次getAllowedLocalityLevel返回 myLocalityLevels(currentLocalityIndex) 時(shí)間間隔大于 myLocalityLevels(currentLocalityIndex) 對應(yīng)的延遲時(shí)間涡真,則將 currentLocalityIndex += 1 進(jìn)行下一次循環(huán),即將 locality level 降低一級(jí)回到第1步


下面為幫助理解代碼的部分說明

判斷是否還有當(dāng)前 locality level 的 task 需要執(zhí)行

val moreTasks = myLocalityLevels(currentLocalityIndex) match {
    case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
    case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
    case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
    case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
  }

moreTasksToRunIn就不進(jìn)行過多解釋了肾筐,主要作用有兩點(diǎn):

  1. 對于不同等級(jí)的 locality level 的 tasks 列表哆料,將已經(jīng)成功執(zhí)行的或正在執(zhí)行的該 locality level 的 task 從對應(yīng)的列表中移除
  2. 判斷對應(yīng)的 locality level 的 task 是否還要等待執(zhí)行的,若有則返回 true吗铐,否則返回 false

myLocalityLevels(currentLocalityIndex) 等于 PROCESS_LOCAL 為例东亦,這一段代碼用來判斷該 taskSetManager 中的 tasks 是否還有 task 的 locality levels 包含 PROCESS_LOCAL

if (!moreTasks)

若!moreTasks,則對currentLocalityIndex加1唬渗,即 locality level 變低一級(jí)典阵,再次循環(huán)。

根據(jù) http://www.reibang.com/p/05034a9c8cae 的分析我們知道镊逝,若一個(gè) task 存在于某個(gè) locality level 為 level1 待執(zhí)行 tasks 集合中萄喳,那么該 task 也一定存在于所有 locality level 低于 level1 的待執(zhí)行 tasks 集合。

從另一個(gè)角度看蹋半,對于每個(gè) task,總是嘗試以最高的 locality level 去啟動(dòng)充坑,若啟動(dòng)失敗且下次以該 locality 啟動(dòng)時(shí)間與上次以該 locality level 啟動(dòng)時(shí)間超過配置的值减江,則將 locality level 降低一級(jí)來嘗試啟動(dòng) task

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市捻爷,隨后出現(xiàn)的幾起案子辈灼,更是在濱河造成了極大的恐慌,老刑警劉巖也榄,帶你破解...
    沈念sama閱讀 206,013評(píng)論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件巡莹,死亡現(xiàn)場離奇詭異,居然都是意外死亡甜紫,警方通過查閱死者的電腦和手機(jī)降宅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來囚霸,“玉大人腰根,你說我怎么就攤上這事⊥匦停” “怎么了额嘿?”我有些...
    開封第一講書人閱讀 152,370評(píng)論 0 342
  • 文/不壞的土叔 我叫張陵瘸恼,是天一觀的道長。 經(jīng)常有香客問我册养,道長东帅,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,168評(píng)論 1 278
  • 正文 為了忘掉前任球拦,我火速辦了婚禮靠闭,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘刘莹。我一直安慰自己阎毅,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,153評(píng)論 5 371
  • 文/花漫 我一把揭開白布点弯。 她就那樣靜靜地躺著扇调,像睡著了一般。 火紅的嫁衣襯著肌膚如雪抢肛。 梳的紋絲不亂的頭發(fā)上狼钮,一...
    開封第一講書人閱讀 48,954評(píng)論 1 283
  • 那天,我揣著相機(jī)與錄音捡絮,去河邊找鬼熬芜。 笑死,一個(gè)胖子當(dāng)著我的面吹牛福稳,可吹牛的內(nèi)容都是我干的涎拉。 我是一名探鬼主播,決...
    沈念sama閱讀 38,271評(píng)論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼的圆,長吁一口氣:“原來是場噩夢啊……” “哼鼓拧!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起越妈,我...
    開封第一講書人閱讀 36,916評(píng)論 0 259
  • 序言:老撾萬榮一對情侶失蹤季俩,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后梅掠,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體酌住,經(jīng)...
    沈念sama閱讀 43,382評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,877評(píng)論 2 323
  • 正文 我和宋清朗相戀三年阎抒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了酪我。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 37,989評(píng)論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡且叁,死狀恐怖祭示,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情,我是刑警寧澤质涛,帶...
    沈念sama閱讀 33,624評(píng)論 4 322
  • 正文 年R本政府宣布稠歉,位于F島的核電站,受9級(jí)特大地震影響汇陆,放射性物質(zhì)發(fā)生泄漏怒炸。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,209評(píng)論 3 307
  • 文/蒙蒙 一毡代、第九天 我趴在偏房一處隱蔽的房頂上張望阅羹。 院中可真熱鬧,春花似錦教寂、人聲如沸捏鱼。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,199評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽导梆。三九已至,卻和暖如春迂烁,著一層夾襖步出監(jiān)牢的瞬間看尼,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,418評(píng)論 1 260
  • 我被黑心中介騙來泰國打工盟步, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留藏斩,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,401評(píng)論 2 352
  • 正文 我出身青樓却盘,卻偏偏與公主長得像狰域,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子黄橘,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,700評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容