本文旨在說明 Spark 的延遲調(diào)度及其是如何工作的
什么是延遲調(diào)度
在 Spark 中,若 task 與其輸入數(shù)據(jù)在同一個(gè) jvm 中,我們稱 task 的本地性為 PROCESS_LOCAL
,這種本地性(locality level)是最優(yōu)的,避免了網(wǎng)絡(luò)傳輸及文件 IO鸯檬,是最快的;其次是 task 與輸入數(shù)據(jù)在同一節(jié)點(diǎn)上的 NODE_LOCAL
螺垢,數(shù)據(jù)在哪都一樣的 NO_PREF
喧务,數(shù)據(jù)與 task 在同一機(jī)架不同節(jié)點(diǎn)的 RACK_LOCAL
及最糟糕的不在同一機(jī)架的 ANY
赖歌。
本地性越好,對于 task 來說蹂楣,花在網(wǎng)絡(luò)傳輸及文件 IO 的時(shí)間越少俏站,整個(gè) task 執(zhí)行耗時(shí)也就更少。而對于很多 task 來說痊土,執(zhí)行 task 的時(shí)間往往會(huì)比網(wǎng)絡(luò)傳輸/文件 IO 的耗時(shí)要短的多肄扎。所以 Spark 希望盡量以更優(yōu)的本地性啟動(dòng) task。延遲調(diào)度就是為此而存在的赁酝。
在Spark的位置優(yōu)先(1): TaskSetManager 的有效 Locality Levels這篇文章中犯祠,我們可以知道,假設(shè)一個(gè) task 的最優(yōu)本地性為 N酌呆,那么該 task 同時(shí)也具有其他所有本地性比 N 差的本地性衡载。
假設(shè)調(diào)度器上一次以 locality level(本地性) M 為某個(gè) taskSetManager 啟動(dòng) task 失敗,則說明該 taskSetManager 中包含本地性 M 的 tasks 的本地性 M 對應(yīng)的所有節(jié)點(diǎn)均沒有空閑資源隙袁。此時(shí)痰娱,只要當(dāng)期時(shí)間與上一次以 M 為 taskSetManager 啟動(dòng) task 時(shí)間差小于配置的值,調(diào)度器仍然會(huì)以 locality level M 來為 taskSetManager 啟動(dòng) task
延時(shí)調(diào)度如何工作
函數(shù)TaskSetManager#getAllowedLocalityLevel
是實(shí)現(xiàn)延時(shí)調(diào)度最關(guān)鍵的地方菩收,用來返回當(dāng)前該 taskSetManager 中未執(zhí)行的 tasks 的最高可能 locality level梨睁。以下為其實(shí)現(xiàn)
/**
* Get the level we can launch tasks according to delay scheduling, based on current wait time.
*/
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
// Remove the scheduled or finished tasks lazily
def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
var indexOffset = pendingTaskIds.size
while (indexOffset > 0) {
indexOffset -= 1
val index = pendingTaskIds(indexOffset)
if (copiesRunning(index) == 0 && !successful(index)) {
return true
} else {
pendingTaskIds.remove(indexOffset)
}
}
false
}
// Walk through the list of tasks that can be scheduled at each location and returns true
// if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
// already been scheduled.
def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
val emptyKeys = new ArrayBuffer[String]
val hasTasks = pendingTasks.exists {
case (id: String, tasks: ArrayBuffer[Int]) =>
if (tasksNeedToBeScheduledFrom(tasks)) {
true
} else {
emptyKeys += id
false
}
}
// The key could be executorId, host or rackId
emptyKeys.foreach(id => pendingTasks.remove(id))
hasTasks
}
while (currentLocalityIndex < myLocalityLevels.length - 1) {
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
if (!moreTasks) {
// This is a performance optimization: if there are no more tasks that can
// be scheduled at a particular locality level, there is no point in waiting
// for the locality wait timeout (SPARK-4939).
lastLaunchTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLaunchTime so that the next locality
// wait timer doesn't immediately expire
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
} else {
return myLocalityLevels(currentLocalityIndex)
}
}
myLocalityLevels(currentLocalityIndex)
}
代碼有點(diǎn)小長,好在并不復(fù)雜娜饵,一些關(guān)鍵注釋在以上源碼中都有注明坡贺。
循環(huán)條件為while (currentLocalityIndex < myLocalityLevels.length - 1)
,
其中myLocalityLevels: Array[TaskLocality.TaskLocality]
是當(dāng)前 TaskSetManager 的所有 tasks 所包含的本地性(locality)集合箱舞,本地性越高的 locality level 在 myLocalityLevels 中的下標(biāo)越斜榉亍(具體請參見http://www.reibang.com/p/05034a9c8cae)
currentLocalityIndex 是 getAllowedLocalityLevel 前一次返回的 locality level 在 myLocalityLevels 中的索引(下標(biāo)),若 getAllowedLocalityLevel 是第一次被調(diào)用晴股,則 currentLocalityIndex 為0
整個(gè)循環(huán)體都在做這幾個(gè)事情:
判斷
myLocalityLevels(currentLocalityIndex)
這個(gè)級(jí)別的本地性對應(yīng)的待執(zhí)行 tasks 集合中是否還有待執(zhí)行的 task若無愿伴;則將
currentLocalityIndex += 1
進(jìn)行下一次循環(huán),即將 locality level 降低一級(jí)回到第1步若有队魏,且當(dāng)前時(shí)間與上次getAllowedLocalityLevel返回
myLocalityLevels(currentLocalityIndex)
時(shí)間間隔小于myLocalityLevels(currentLocalityIndex)
對應(yīng)的延遲時(shí)間(通過spark.locality.wait.process或spark.locality.wait.node或spark.locality.wait.rack
配置)公般,則 currentLocalityIndex 不變,返回myLocalityLevels(currentLocalityIndex)胡桨。這里是延遲調(diào)度的關(guān)鍵官帘,只要當(dāng)前時(shí)間與上一次以某個(gè) locality level 啟動(dòng) task 的時(shí)間只差小于配置的值,不管上次是否成功啟動(dòng)了 task昧谊,這一次仍然以上次的 locality level 來啟動(dòng) task刽虹。說的更明白一些:比如上次以 localtyX 為 taskSetManager 啟動(dòng) task 失敗,說明taskSetManager 中 tasks 對應(yīng) localityX 的節(jié)點(diǎn)均沒有空閑資源來啟動(dòng) task呢诬,但 Spark 此時(shí)仍然會(huì)以 localityX 來為 taskSetManager 啟動(dòng) task涌哲。為什么要這樣做胖缤?一般來說,task 執(zhí)行耗時(shí)相對于網(wǎng)絡(luò)傳輸/文件IO 要小得多阀圾,調(diào)度器多等待1 2秒可能就可以以更好的本地性執(zhí)行 task哪廓,避免了更耗時(shí)的網(wǎng)絡(luò)傳輸或文件IO,task 整體執(zhí)行時(shí)間會(huì)降低-
若有初烘,且當(dāng)前時(shí)間與上次getAllowedLocalityLevel返回
myLocalityLevels(currentLocalityIndex)
時(shí)間間隔大于myLocalityLevels(currentLocalityIndex)
對應(yīng)的延遲時(shí)間涡真,則將currentLocalityIndex += 1
進(jìn)行下一次循環(huán),即將 locality level 降低一級(jí)回到第1步
下面為幫助理解代碼的部分說明
判斷是否還有當(dāng)前 locality level 的 task 需要執(zhí)行
val moreTasks = myLocalityLevels(currentLocalityIndex) match {
case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
}
moreTasksToRunIn就不進(jìn)行過多解釋了肾筐,主要作用有兩點(diǎn):
- 對于不同等級(jí)的 locality level 的 tasks 列表哆料,將已經(jīng)成功執(zhí)行的或正在執(zhí)行的該 locality level 的 task 從對應(yīng)的列表中移除
- 判斷對應(yīng)的 locality level 的 task 是否還要等待執(zhí)行的,若有則返回 true吗铐,否則返回 false
以 myLocalityLevels(currentLocalityIndex)
等于 PROCESS_LOCAL
為例东亦,這一段代碼用來判斷該 taskSetManager 中的 tasks 是否還有 task 的 locality levels 包含 PROCESS_LOCAL
if (!moreTasks)
若!moreTasks,則對currentLocalityIndex加1唬渗,即 locality level 變低一級(jí)典阵,再次循環(huán)。
根據(jù) http://www.reibang.com/p/05034a9c8cae 的分析我們知道镊逝,若一個(gè) task 存在于某個(gè) locality level 為 level1 待執(zhí)行 tasks 集合中萄喳,那么該 task 也一定存在于所有 locality level 低于 level1 的待執(zhí)行 tasks 集合。
從另一個(gè)角度看蹋半,對于每個(gè) task,總是嘗試以最高的 locality level 去啟動(dòng)充坑,若啟動(dòng)失敗且下次以該 locality 啟動(dòng)時(shí)間與上次以該 locality level 啟動(dòng)時(shí)間超過配置的值减江,則將 locality level 降低一級(jí)來嘗試啟動(dòng) task