[Spark源碼剖析]Pool-Standalone模式下的隊(duì)列

Pool-Spark Standalone模式下的隊(duì)列

org.apache.spark.scheduler.Pool是 Spark Standalone 模式下的隊(duì)列撰洗。從其重要成員及成員函數(shù)來剖析這個(gè)在 TaskScheduler 調(diào)度中起關(guān)鍵作用的類。

成員

下圖展示了 Pool 的所有成員及一些簡(jiǎn)要說明

其中差导,taskSetSchedulingAlgorithm的類型由schedulingMode決定,下文會(huì)對(duì)FairSchedulingAlgorithmFIFOSchedulingAlgorithm做詳細(xì)分析

  var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

成員函數(shù)

先來看看如何向一個(gè) Pool 中添加 TaskSetManager 或 Pool猪勇,說明都寫在注釋中。

  override def addSchedulable(schedulable: Schedulable) {
    //<f 判斷 schedulable 不為 null
    require(schedulable != null)
    //< 往隊(duì)列中添加schedulable 對(duì)象埠对,可以是taskSet,也可以是子隊(duì)列
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    //< 將該 schedulable 對(duì)象的父親設(shè)置為自己
    schedulable.parent = this
  }

以下為如何 remove 一個(gè) TaskSetManager 或 Pool项玛,需要注意的是schedulableQueue為ConcurrentLinkedQueue類型,其 remove 方法可以刪除與參數(shù)值相等的元素

  override def removeSchedulable(schedulable: Schedulable) {
    schedulableQueue.remove(schedulable)
    schedulableNameToSchedulable.remove(schedulable.name)
  }

當(dāng)有 executor 丟失時(shí)襟沮,會(huì)調(diào)用 executorLost 方法

  override def executorLost(executorId: String, host: String) {
    schedulableQueue.foreach(_.executorLost(executorId, host))
  }

若該隊(duì)列中某個(gè)元素為 TaskSetManager 類型昌腰,會(huì)調(diào)用 TaskSetManager.executorLost 方法,該方法將查找是否有自己管理的 task 在 lost 的 executor 上運(yùn)行膀跌,若有,則重新將該 lost 的 task 插入隊(duì)列捅伤,等待執(zhí)行;若某元素為 Pool 類型丛忆,即子隊(duì)列祠汇,那么 Pool.executorLost 方法會(huì)對(duì)其schedulableQueue的所有元素調(diào)用 executorLost 方法熄诡,這樣一來,若根 Pool 調(diào)用 executorLost 方法凰浮,則該隊(duì)列下的所有 TaskSetManager 對(duì)象都能調(diào)用 executorLost 方法,那么因某個(gè) executor lost 而 lost 的 task 都將被重新插入隊(duì)列執(zhí)行

getSortedTaskSetQueue方法是 Pool 最重要的方法袜茧,它將以該 Pool 為根隊(duì)列的所有 TaskSetManager 排序后存在一個(gè)數(shù)組中菜拓,下標(biāo)越小的數(shù)組越早被執(zhí)行笛厦。代碼如下:

  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

這個(gè)函數(shù)的實(shí)現(xiàn)邏輯主要分為兩步,假設(shè)現(xiàn)在調(diào)用 tmpPool.getSortedTaskSetQueue递递,tmpPool 為 Pool 類型:

  1. 對(duì) tmpPool 的直接子 Pool 和 TaskSetManager 進(jìn)行排序啥么,排序的算法根據(jù)Pool 的 schedulingMode 而定,F(xiàn)AIR 和 FIFO 不相同悬荣。排序后得到sortedSchedulableQueue
  2. 遍歷sortedSchedulableQueue所有元素。若元素為 TaskSetManager 類型氯迂,則將該元素添加到sortedTaskSetQueue: ArrayBuffer[TaskSetManager]尾部,若為 Pool 類型嚼蚀,則執(zhí)行第一步
  3. 返回包含對(duì) tmpPool 下所有 TaskSetManager 排序過后的數(shù)組

經(jīng)過這幾部禁灼,就能將一個(gè) Pool 下的所有 TaskSetManager 排序轿曙,也就能確定哪個(gè) TaskSetManager 的 tasks 要優(yōu)先被 TaskScheduler 調(diào)度僻孝。

如上所述,排序的關(guān)鍵是taskSetSchedulingAlgorithm.comparator穿铆,上文中已經(jīng)提到taskSetSchedulingAlgorithm根據(jù)schedulingMode值的不同,可以有FairSchedulingAlgorithmFIFOSchedulingAlgorithm兩種類型斋荞。先來看
FIFOSchedulingAlgorithm的排序

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

FIFOSchedulingAlgorithm比較邏輯很簡(jiǎn)單,可概括為下面兩句話:

  1. 首先比較優(yōu)先級(jí)值平酿,優(yōu)先級(jí)值越小的更優(yōu)先(好拗口)
  2. 若優(yōu)先級(jí)值相等,則比較 stageId 值染服,stageId 值越小的越優(yōu)先

FairSchedulingAlgorithm的比較邏輯會(huì)復(fù)雜一些,代碼如下:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      //< s1中正在執(zhí)行的 tasks 個(gè)數(shù)小于 s1的最小 cpu 核數(shù)柳刮;且s2中正在執(zhí)行的 tasks 個(gè)數(shù)等于 s2的最小 cpu 核數(shù)。則 s1優(yōu)先
      return true
    } else if (!s1Needy && s2Needy) {
      //< s2中正在執(zhí)行的 tasks 個(gè)數(shù)小于 s2的最小 cpu 核數(shù)秉颗;且s1中正在執(zhí)行的 tasks 個(gè)數(shù)等于 s1的最小 cpu 核數(shù)痢毒。則 s2優(yōu)先
      return false
    } else if (s1Needy && s2Needy) {
      //< s1,s2中正在執(zhí)行的 tasks 個(gè)數(shù)小于其最小 cpu 核數(shù)蚕甥。則比較各自 runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble 的比值,小的優(yōu)先
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      //< s1,s2中正在執(zhí)行的 tasks 個(gè)數(shù)等于其最小 cpu 核數(shù)菇怀。則比較runningTasks1.toDouble / s1.weight.toDouble,小的優(yōu)先
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      //< 若以上比較都相等爱沟,則比較 s1和 s2的名字
      s1.name < s2.name
    }
  }
}

FairSchedulingAlgorithm的比較規(guī)則以在上面代碼的注釋中說明

PS

Pool 的成員stageId 初始值為-1帅霜,但搜遍整個(gè) Spark 源碼也沒有找到哪里有對(duì)該值的重新賦值呼伸。這個(gè) stageId 的具體含義及如何發(fā)揮作用還沒有完全搞明白,若哪位朋友知道括享,麻煩告知,多謝

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末铃辖,一起剝皮案震驚了整個(gè)濱河市剩愧,隨后出現(xiàn)的幾起案子澳叉,更是在濱河造成了極大的恐慌沐悦,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,839評(píng)論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件藏否,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡副签,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評(píng)論 2 382
  • 文/潘曉璐 我一進(jìn)店門基矮,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人家浇,你說我怎么就攤上這事「直” “怎么了点额?”我有些...
    開封第一講書人閱讀 153,116評(píng)論 0 344
  • 文/不壞的土叔 我叫張陵莺琳,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我惭等,道長(zhǎng),這世上最難降的妖魔是什么辞做? 我笑而不...
    開封第一講書人閱讀 55,371評(píng)論 1 279
  • 正文 為了忘掉前任琳要,我火速辦了婚禮秤茅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘嫂伞。我一直安慰自己拯钻,他們只是感情好帖努,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,384評(píng)論 5 374
  • 文/花漫 我一把揭開白布拼余。 她就那樣靜靜地躺著,像睡著了一般亩歹。 火紅的嫁衣襯著肌膚如雪凡橱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評(píng)論 1 285
  • 那天稼钩,我揣著相機(jī)與錄音,去河邊找鬼达罗。 笑死,一個(gè)胖子當(dāng)著我的面吹牛粮揉,可吹牛的內(nèi)容都是我干的巡李。 我是一名探鬼主播扶认,決...
    沈念sama閱讀 38,416評(píng)論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼辐宾!你這毒婦竟也來了狱从?” 一聲冷哼從身側(cè)響起螃概,我...
    開封第一講書人閱讀 37,053評(píng)論 0 259
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎吊洼,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體冒窍,經(jīng)...
    沈念sama閱讀 43,558評(píng)論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡递沪,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,007評(píng)論 2 325
  • 正文 我和宋清朗相戀三年款慨,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片谬莹。...
    茶點(diǎn)故事閱讀 38,117評(píng)論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖附帽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情蕉扮,我是刑警寧澤整胃,帶...
    沈念sama閱讀 33,756評(píng)論 4 324
  • 正文 年R本政府宣布喳钟,位于F島的核電站在岂,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏蛮寂。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,324評(píng)論 3 307
  • 文/蒙蒙 一共郭、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧除嘹,春花似錦写半、人聲如沸尉咕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評(píng)論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽悔捶。三九已至单芜,卻和暖如春蜕该,著一層夾襖步出監(jiān)牢的瞬間洲鸠,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評(píng)論 1 262
  • 我被黑心中介騙來泰國打工扒腕, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人瘾腰。 一個(gè)月前我還...
    沈念sama閱讀 45,578評(píng)論 2 355
  • 正文 我出身青樓皆的,卻偏偏與公主長(zhǎng)得像蹋盆,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子栖雾,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,877評(píng)論 2 345

推薦閱讀更多精彩內(nèi)容