Pool-Spark Standalone模式下的隊(duì)列
org.apache.spark.scheduler.Pool
是 Spark Standalone 模式下的隊(duì)列撰洗。從其重要成員及成員函數(shù)來剖析這個(gè)在 TaskScheduler 調(diào)度中起關(guān)鍵作用的類。
成員
下圖展示了 Pool 的所有成員及一些簡(jiǎn)要說明
其中差导,taskSetSchedulingAlgorithm
的類型由schedulingMode
決定,下文會(huì)對(duì)FairSchedulingAlgorithm
和FIFOSchedulingAlgorithm
做詳細(xì)分析
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
}
}
成員函數(shù)
先來看看如何向一個(gè) Pool 中添加 TaskSetManager 或 Pool猪勇,說明都寫在注釋中。
override def addSchedulable(schedulable: Schedulable) {
//<f 判斷 schedulable 不為 null
require(schedulable != null)
//< 往隊(duì)列中添加schedulable 對(duì)象埠对,可以是taskSet,也可以是子隊(duì)列
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
//< 將該 schedulable 對(duì)象的父親設(shè)置為自己
schedulable.parent = this
}
以下為如何 remove 一個(gè) TaskSetManager 或 Pool项玛,需要注意的是schedulableQueue為ConcurrentLinkedQueue
類型,其 remove 方法可以刪除與參數(shù)值相等的元素
override def removeSchedulable(schedulable: Schedulable) {
schedulableQueue.remove(schedulable)
schedulableNameToSchedulable.remove(schedulable.name)
}
當(dāng)有 executor 丟失時(shí)襟沮,會(huì)調(diào)用 executorLost 方法
override def executorLost(executorId: String, host: String) {
schedulableQueue.foreach(_.executorLost(executorId, host))
}
若該隊(duì)列中某個(gè)元素為 TaskSetManager 類型昌腰,會(huì)調(diào)用 TaskSetManager.executorLost
方法,該方法將查找是否有自己管理的 task 在 lost 的 executor 上運(yùn)行膀跌,若有,則重新將該 lost 的 task 插入隊(duì)列捅伤,等待執(zhí)行;若某元素為 Pool 類型丛忆,即子隊(duì)列祠汇,那么 Pool.executorLost
方法會(huì)對(duì)其schedulableQueue的所有元素調(diào)用 executorLost 方法熄诡,這樣一來,若根 Pool 調(diào)用 executorLost 方法凰浮,則該隊(duì)列下的所有 TaskSetManager 對(duì)象都能調(diào)用 executorLost 方法,那么因某個(gè) executor lost 而 lost 的 task 都將被重新插入隊(duì)列執(zhí)行
getSortedTaskSetQueue
方法是 Pool 最重要的方法袜茧,它將以該 Pool 為根隊(duì)列的所有 TaskSetManager 排序后存在一個(gè)數(shù)組中菜拓,下標(biāo)越小的數(shù)組越早被執(zhí)行笛厦。代碼如下:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
sortedTaskSetQueue
}
這個(gè)函數(shù)的實(shí)現(xiàn)邏輯主要分為兩步,假設(shè)現(xiàn)在調(diào)用 tmpPool.getSortedTaskSetQueue
递递,tmpPool 為 Pool 類型:
- 對(duì) tmpPool 的直接子 Pool 和 TaskSetManager 進(jìn)行排序啥么,排序的算法根據(jù)Pool 的 schedulingMode 而定,F(xiàn)AIR 和 FIFO 不相同悬荣。排序后得到sortedSchedulableQueue
- 遍歷sortedSchedulableQueue所有元素。若元素為 TaskSetManager 類型氯迂,則將該元素添加到
sortedTaskSetQueue: ArrayBuffer[TaskSetManager]
尾部,若為 Pool 類型嚼蚀,則執(zhí)行第一步 - 返回包含對(duì) tmpPool 下所有 TaskSetManager 排序過后的數(shù)組
經(jīng)過這幾部禁灼,就能將一個(gè) Pool 下的所有 TaskSetManager 排序轿曙,也就能確定哪個(gè) TaskSetManager 的 tasks 要優(yōu)先被 TaskScheduler 調(diào)度僻孝。
如上所述,排序的關(guān)鍵是taskSetSchedulingAlgorithm.comparator
穿铆,上文中已經(jīng)提到taskSetSchedulingAlgorithm
根據(jù)schedulingMode值的不同,可以有FairSchedulingAlgorithm
和FIFOSchedulingAlgorithm
兩種類型斋荞。先來看
FIFOSchedulingAlgorithm的排序
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
true
} else {
false
}
}
}
FIFOSchedulingAlgorithm比較邏輯很簡(jiǎn)單,可概括為下面兩句話:
- 首先比較優(yōu)先級(jí)值平酿,優(yōu)先級(jí)值越小的更優(yōu)先(好拗口)
- 若優(yōu)先級(jí)值相等,則比較 stageId 值染服,stageId 值越小的越優(yōu)先
FairSchedulingAlgorithm的比較邏輯會(huì)復(fù)雜一些,代碼如下:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare: Int = 0
if (s1Needy && !s2Needy) {
//< s1中正在執(zhí)行的 tasks 個(gè)數(shù)小于 s1的最小 cpu 核數(shù)柳刮;且s2中正在執(zhí)行的 tasks 個(gè)數(shù)等于 s2的最小 cpu 核數(shù)。則 s1優(yōu)先
return true
} else if (!s1Needy && s2Needy) {
//< s2中正在執(zhí)行的 tasks 個(gè)數(shù)小于 s2的最小 cpu 核數(shù)秉颗;且s1中正在執(zhí)行的 tasks 個(gè)數(shù)等于 s1的最小 cpu 核數(shù)痢毒。則 s2優(yōu)先
return false
} else if (s1Needy && s2Needy) {
//< s1,s2中正在執(zhí)行的 tasks 個(gè)數(shù)小于其最小 cpu 核數(shù)蚕甥。則比較各自 runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble 的比值,小的優(yōu)先
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
//< s1,s2中正在執(zhí)行的 tasks 個(gè)數(shù)等于其最小 cpu 核數(shù)菇怀。則比較runningTasks1.toDouble / s1.weight.toDouble,小的優(yōu)先
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
//< 若以上比較都相等爱沟,則比較 s1和 s2的名字
s1.name < s2.name
}
}
}
FairSchedulingAlgorithm的比較規(guī)則以在上面代碼的注釋中說明
PS
Pool 的成員stageId 初始值為-1帅霜,但搜遍整個(gè) Spark 源碼也沒有找到哪里有對(duì)該值的重新賦值呼伸。這個(gè) stageId 的具體含義及如何發(fā)揮作用還沒有完全搞明白,若哪位朋友知道括享,麻煩告知,多謝