1. 概述
相當(dāng)于一個多叉樹的非葉子節(jié)點(diǎn), 在內(nèi)部用ConcurrentLinkedQueue維護(hù)多個child node.
進(jìn)一步實(shí)現(xiàn)一種自下而上的對整個樹的調(diào)度和管理.
前面介紹過, Pool
是對Trait Scheduler
的另外一種實(shí)現(xiàn).
Pool
會根據(jù)FIFO原則或者FAIR原則來管理它維護(hù)的所有child的先后執(zhí)行順序, 相對于TaskSetManager
關(guān)注一個TaskSet
內(nèi)部的Task的運(yùn)行狀態(tài)和順序, Pool
關(guān)注的是TaskSet之間的向后運(yùn)行順序.
2. 重要內(nèi)部結(jié)構(gòu)
// 這個兩個結(jié)構(gòu)聯(lián)合起來管理這個Pool里所有的child node
// 可以是pool或者tasksetmanager
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
//
var weight = initWeight
var minShare = initMinShare
/* 前面我們看到過, 當(dāng)這個Pool下面的葉子節(jié)點(diǎn)里有task在運(yùn)行
這里就會+1, 它反映的是這個Pool管理的所有的TaskSet共有多少個task在運(yùn)行*/
var runningTasks = 0
// 優(yōu)先級用于FAIR調(diào)度
var priority = 0
// A pool's stage id is used to break the tie in scheduling.
var stageId = -1
var name = poolName
3. 重要的內(nèi)部方法
幾個簡單的override就不介紹了, 就是對維護(hù)的各種tasksetmnager進(jìn)行狀態(tài)標(biāo)注和計數(shù)
有一個非常重要的是
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
sortedTaskSetQueue
}
這個方法實(shí)現(xiàn)了對TaskSet的排序, 決定了哪個TaskSet先運(yùn)行, 哪個后運(yùn)行. 可以看到具體的實(shí)現(xiàn)依賴taskSetSchedulingAlgorithm
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
}
}
3.1 FIFO的實(shí)現(xiàn)
非常簡單, 先進(jìn)先出嘛, 默認(rèn)的Queue就有這個功能.
具體實(shí)現(xiàn)的時候, 就是先看誰的priority高, 高的先運(yùn)行.
如果priority一樣, 就看似會的stageId大, 大的后運(yùn)行, 小的先運(yùn)行.
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
if (res < 0) {
true
} else {
false
}
}
}
3.2 FAIR的實(shí)現(xiàn)
FAIR
這個就復(fù)雜了, FAIR是根據(jù)
minShare
runningTasks
weight
來決定哪個TaskSet先運(yùn)行.