CoarseGrainedSchedulerBackend calls makeOffers() every spark.scheduler.revive.interval (default 1s) to schedule tasks on the executors it has been allocated; inside makeOffers(), scheduler.resourceOffers(workerOffers) produces the set of launchable tasks, including the task-to-executor mapping.
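Roughly, the revive timer looks like the following (a simplified sketch of the driver endpoint inside CoarseGrainedSchedulerBackend, not verbatim source; the exact shape varies across Spark versions):

    override def onStart(): Unit = {
      // Periodically send ReviveOffers to ourselves, which triggers makeOffers();
      // this is what gives delay scheduling its regular "heartbeat".
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Option(self).foreach(_.send(ReviveOffers))
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }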
The tasks of each stage are managed by one TaskSetManager, and the assignment is generated by the following loop:
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }
Here myLocalityLevels is the set of locality levels present in that taskSet, drawn from PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY;
shuffledOffers is a randomized permutation of the executor offers. The whole taskSet assignment is two nested loops:
the first loop is the maxLocality <- taskSet.myLocalityLevels shown above;
the second loop is inside resourceOfferSingleTaskSet, which checks, for each executor in the offers, whether some task satisfies the locality constraint and can be bound to that executor, forming one launch decision (see the sketch below);
in fact there is also a third loop: dequeueTask.
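A condensed sketch of that second loop (based on TaskSchedulerImpl.resourceOfferSingleTaskSet, with error handling and bookkeeping omitted, so treat it as illustrative rather than verbatim):

    // For each (randomized) executor offer, ask the TaskSetManager for a task
    // that fits this executor under the current locality ceiling.
    var launchedTask = false
    for (i <- 0 until shuffledOffers.size) {
      val execId = shuffledOffers(i).executorId
      val host = shuffledOffers(i).host
      if (availableCpus(i) >= CPUS_PER_TASK) {
        // resourceOffer hides the third loop: dequeueTask picks a matching pending task, if any.
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          availableCpus(i) -= CPUS_PER_TASK
          launchedTask = true
        }
      }
    }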
Let's look closely at TaskSetManager::resourceOffer:
A key question is how the delay mechanism preserves data locality; the implementation lives in getAllowedLocalityLevel. Spark has three parameters governing delay scheduling:
spark.locality.wait.process, spark.locality.wait.node, spark.locality.wait.rack, all defaulting to 3s (via spark.locality.wait);
TaskSetManager records lastLaunchTime; when the current time minus lastLaunchTime exceeds the wait for the current level, getAllowedLocalityLevel relaxes to the next locality level.
During the traversal, the first loop's locality and getAllowedLocalityLevel's return value are combined by taking the stricter (minimum) of the two, and then dequeueTask runs; if dequeueTask returns Some(_), lastLaunchTime and currentLocalityIndex are updated.
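In code, the clamping and the reset look roughly like this (condensed from TaskSetManager.resourceOffer; the exact shape varies by Spark version):

    // The level actually searched is the tighter of the caller's maxLocality
    // and what the delay timer currently allows.
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // Not allowed to search for farther-away tasks than the caller permits.
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) =>
        // A successful dequeue resets the delay-scheduling clock, which is
        // the root of the starvation scenario described below.
        currentLocalityIndex = getLocalityIndex(taskLocality)
        lastLaunchTime = curTime
        // ... build and return the TaskDescription ...
      case _ =>
    }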
This creates a problem; consider an extreme example:
100 executors and 100 tasks, with two executors per node and four executors per rack, and all 100 tasks preferring PROCESS_LOCAL on executor1. The scheduling then proceeds as follows (a toy simulation of this timeline follows the list):
- First, task1 is dequeued to executor1; the subsequent traversal of executor2~executor100 fails entirely because of locality, and the successful dequeueTask has reset currentLocalityIndex=0;
- After 3s, currentLocalityIndex advances by 1 and getAllowedLocalityLevel returns NODE_LOCAL, so task2 is scheduled onto executor2 (which shares executor1's node), but executor3~executor100 all still fail;
- 2s later, suppose executor1 finishes task1; executor1 rejoins the offers and task3 is successfully scheduled onto executor1;
- that dequeueTask returns Some(_), so currentLocalityIndex=0 and lastLaunchTime=curTime, and every other executor again fails to schedule;
- 1s later, the situation of step 2 repeats;
- if the above four steps keep cycling, executors sit idle for long stretches; with dynamic allocation, an executor idle for 60s (the default) is released by the system,
- and the stage is dragged out indefinitely.
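The toy model below (not Spark code; the level names and a single 3s wait are assumptions for illustration) reproduces this timeline: each successful launch resets the index, so the NODE_LOCAL wait keeps restarting and the other 99 executors keep idling.

    object DelaySchedulingToy {
      val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
      val waitMs = 3000L
      var currentLocalityIndex = 0
      var lastLaunchTime = 0L

      // Mirrors the while loop in getAllowedLocalityLevel: advance one level
      // per elapsed wait interval, then report the level we stopped at.
      def allowedLevel(curTime: Long): String = {
        while (currentLocalityIndex < levels.length - 1 &&
               curTime - lastLaunchTime >= waitMs) {
          lastLaunchTime += waitMs
          currentLocalityIndex += 1
        }
        levels(currentLocalityIndex)
      }

      // Mirrors the reset in resourceOffer after a successful dequeue.
      def launched(curTime: Long, levelIndex: Int): Unit = {
        currentLocalityIndex = levelIndex
        lastLaunchTime = curTime
      }

      def main(args: Array[String]): Unit = {
        println(allowedLevel(0))     // PROCESS_LOCAL: only executor1 can launch task1
        launched(0, 0)               // task1 on executor1; index resets to 0
        println(allowedLevel(2000))  // still PROCESS_LOCAL: everyone else stays idle
        println(allowedLevel(3000))  // NODE_LOCAL: executor2 finally gets task2
        launched(3000, 1)            // task2 launched at NODE_LOCAL
        launched(5000, 0)            // 2s later executor1 frees up; task3 is PROCESS_LOCAL
        println(allowedLevel(6000))  // PROCESS_LOCAL again: the NODE_LOCAL wait restarted
      }
    }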
Countermeasures:
- Adjust the update policy for currentLocalityIndex and lastLaunchTime, which can improve task-scheduling throughput;
- Reduce spark.locality.wait.
Both trade away data locality.
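The second countermeasure is pure configuration; for example (the 500ms value is just an illustrative choice):

    import org.apache.spark.SparkConf

    // Shrink the delay-scheduling wait from its 3s default. spark.locality.wait
    // is the fallback for the per-level spark.locality.wait.{process,node,rack} keys.
    val conf = new SparkConf().set("spark.locality.wait", "500ms")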
A few more words on Spark's locality-preference machinery. Spark describes a job's computation with the RDD dependency graph, which is cut into stages at shuffle dependencies; a stage runs from a shuffle-read to a shuffle-write. The closer a task's executor is, physically, to its shuffle-read (or DFS / cache) input, the stronger the locality; for HDFS, for instance, the RDD's HadoopPartition carries the split's location information. That information is published in DAGScheduler.submitMissingTasks through the listenerBus as a SparkListenerStageSubmitted event to ExecutorAllocationManager; ExecutorAllocationManager uses it to tell the ExecutorAllocationClient (ultimately YarnAllocator) how to request executors. The executors obtained are handed to the TaskScheduler as offers, and offers and tasks are finally matched inside TaskSetManager.
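At the API level you can attach the same kind of preferredLocations by hand that HadoopRDD derives from its splits, via SparkContext.makeRDD's location-preference overload (the host names here are made up):

    // Each element is paired with its preferred hosts; the scheduler will try
    // to honor these when placing the corresponding partitions.
    val rdd = sc.makeRDD(Seq(
      (1, Seq("host1")),
      (2, Seq("host1", "host2"))
    ))
    rdd.partitions.foreach(p => println(rdd.preferredLocations(p)))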
TaskSetManager keeps several pending-task structures:
pendingTasksForExecutor,
pendingTasksForHost,
pendingTasksForRack,
pendingTasksWithNoPrefs,
allPendingTasks;
ordered from narrowest to broadest. If a task appears in pendingTasksForExecutor, it also appears in pendingTasksForHost, pendingTasksForRack and allPendingTasks, keyed by the executor's host, rack, and so on (pendingTasksWithNoPrefs only holds tasks with no preference at all). These collections are populated once, when the tasks are initialized, from each task's location preferences, as sketched below; TaskSetManager then only needs to match a task against the executor in each offer according to the delay policy, and of course it may fail to match any task at all because of locality.
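A condensed sketch of how those maps are filled (based on TaskSetManager.addPendingTask; simplified, and details differ across Spark versions):

    // For every preferred location of the task, register its index under the
    // matching executor / host / rack key; tasks without preferences go to
    // pendingTasksWithNoPrefs, and every task lands in allPendingTasks.
    private def addPendingTask(index: Int): Unit = {
      for (loc <- tasks(index).preferredLocations) {
        loc match {
          case e: ExecutorCacheTaskLocation =>
            pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer[Int]) += index
          case _ =>
        }
        pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer[Int]) += index
        for (rack <- sched.getRackForHost(loc.host)) {
          pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer[Int]) += index
        }
      }
      if (tasks(index).preferredLocations == Nil) {
        pendingTasksWithNoPrefs += index
      }
      allPendingTasks += index
    }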
Now for the main point, the delay-matching function:
    private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
      ......
      while (currentLocalityIndex < myLocalityLevels.length - 1) {
        val moreTasks = myLocalityLevels(currentLocalityIndex) match {
          case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
          case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
          case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
          case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
        }
        if (!moreTasks) {
          // This is a performance optimization: if there are no more tasks that can
          // be scheduled at a particular locality level, there is no point in waiting
          // for the locality wait timeout (SPARK-4939).
          lastLaunchTime = curTime
          logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
            s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
          currentLocalityIndex += 1
        } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
          // Jump to the next locality level, and reset lastLaunchTime so that the next locality
          // wait timer doesn't immediately expire
          lastLaunchTime += localityWaits(currentLocalityIndex)
          logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
            s"${localityWaits(currentLocalityIndex)}ms")
          currentLocalityIndex += 1
        } else {
          return myLocalityLevels(currentLocalityIndex)
        }
      }
      myLocalityLevels(currentLocalityIndex)
    }
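The moreTasksToRunIn helper used above is a local function inside getAllowedLocalityLevel; conceptually it does roughly this (a simplified sketch; the real version also prunes keys whose tasks have all started or finished):

    // Does this level's pending map still contain any task that is neither
    // currently running nor already successful?
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean =
      pendingTasks.values.exists { taskIndexes =>
        taskIndexes.exists(i => copiesRunning(i) == 0 && !successful(i))
      }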
The crux of the delay policy is how currentLocalityIndex changes, and the key is the else-if above. One subtle point: the order of the levels is process, node, nopref, rack, any, so one might worry that nopref sitting before rack would make rack-level tasks wait behind yet another timeout. In fact that cannot happen, because nopref's wait time defaults to 0s: inside the while loop, after passing node the iteration falls straight through nopref and moves on to rack on the next pass. So if our locality preferences are concentrated at the rack level, we can set all the wait values to 0 and give only rack a wait of 1~3s; this alleviates the scheduling inefficiency caused by locality. The extreme case in the example above still cannot be avoided, but in production this should work much better.
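That rack-focused tuning expressed as configuration (the values are illustrative):

    import org.apache.spark.SparkConf

    // Give up waiting for process/node locality entirely and only wait
    // (2s here) for rack locality.
    val conf = new SparkConf()
      .set("spark.locality.wait.process", "0")
      .set("spark.locality.wait.node", "0")
      .set("spark.locality.wait.rack", "2s")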