場景
一個 spark 應(yīng)用的產(chǎn)生過程: 獲取需求 -> 編寫spark代碼 -> 測試通過 -> 扔上平臺調(diào)度。
往往應(yīng)用會正常運(yùn)行一段時間修噪,突然有一天運(yùn)行失敗,或是失敗了一次才運(yùn)行成功着降。
從開發(fā)者的角度看化漆,我的代碼沒問題,測試也通過了殊轴,之前一段都運(yùn)行好好的衰倦,怎么突然就失敗了呢?為什么我重新調(diào)度又能正常運(yùn)行了旁理,是不是你們平臺不穩(wěn)定樊零?
是什么導(dǎo)致了上述問題?
分布式集群中,特別是高負(fù)載的情況下韧拒,就會引發(fā)很多意想不到的問題淹接,例如:
- 壞盤/硬盤滿將會 導(dǎo)致
/path/to/usercache
目錄創(chuàng)建失敗,一個stage中任務(wù)失敗次數(shù)達(dá)到一定次數(shù)(spark.task.maxFailures
)會導(dǎo)致整個job失敗叛溢。 - executor 注冊 external shuffle service 超時。
- executor 從 external shuffle service 獲取數(shù)據(jù)超時劲适,task 反復(fù)失敗后導(dǎo)致了整 個stage 的失敗楷掉。
- 環(huán)境依賴問題,例如 xxx 包不存在, xxx 包沒有安裝烹植。
- dns 沒有配置斑鸦,網(wǎng)絡(luò)不通。
- etc.
為什么 task
失敗后還會被 schedular
重新調(diào)度在原來的 node
或是 executor
上草雕?
數(shù)據(jù)本地性(spark會優(yōu)先把task調(diào)度在有相應(yīng)數(shù)據(jù)的節(jié)點(diǎn)上)導(dǎo)致巷屿。
是否只能聽天由命,每次失敗后重新調(diào)度墩虹? 如果任務(wù)有SLA的限制怎么辦嘱巾?
介紹
spark 2.1 中增加了 blacklist
機(jī)制,當(dāng)前(2.3.0)還是試驗(yàn)性質(zhì)的功能诫钓,黑名單機(jī)制允許你設(shè)置 task 在 executor / node 上失敗次數(shù)的閾值旬昭, 從而避免了一路走到黑的情況出現(xiàn)。 :)
相關(guān)參數(shù)
配置 | 默認(rèn)值 | 描述 |
---|---|---|
spark.blacklist.enabled | false | 是否開啟黑名單機(jī)制 |
spark.blacklist.timeout | 1h | 對于被加入 application 黑名單的 executor/節(jié)點(diǎn) 菌湃,多長時間后無條件的移出黑名單以運(yùn)行新任務(wù) |
spark.blacklist.task.maxTaskAttemptsPerExecutor | 1 | 對于同一個 task 在某個 executor 中的失敗重試閾值问拘。達(dá)到閾值后,在執(zhí)行這個 task 時惧所,該 executor 將被加入黑名單 |
spark.blacklist.task.maxTaskAttemptsPerNode | 2 | 對于同一個 task 在某個節(jié)點(diǎn)上的失敗重試閾值骤坐。達(dá)到閾值后,在執(zhí)行這個 task 時下愈,該節(jié)點(diǎn)將被加入黑名單 |
spark.blacklist.stage.maxFailedTasksPerExecutor | 2 | 一個 stage 中纽绍,不同的 task 在同一個 executor 的失敗閾值。達(dá)到閾值后驰唬,在執(zhí)行這個 stage 時該 executor 將會被加入黑名單 |
spark.blacklist.stage.maxFailedExecutorsPerNode | 2 | 一個 stage 中顶岸,不同的 executor 加入黑名單的閾值。達(dá)到閾值后叫编,在執(zhí)行這個 stage 時該節(jié)點(diǎn)將會被加入黑名單 |
spark.blacklist.application.maxFailedTasksPerExecutor | 2 | 在同一個 executor 中辖佣,不同的 task的失敗閾值 。達(dá)到閾值后搓逾,在整個 appliction 運(yùn)行期間卷谈,該 executor 都會被加入黑名單,加入時間超過 spark.blacklist.timeout 后霞篡,自動從黑名單中移除世蔗。值得注意的是,如果開啟了 dynamic allocation 朗兵,這些 executor 可能會由于空閑時間過長被回收污淋。 |
spark.blacklist.application.maxFailedExecutorsPerNode | 2 | 在一個節(jié)點(diǎn)中,不同 executor 加入 application 黑名單的閾值余掖。達(dá)到這個閾值后寸爆,該節(jié)點(diǎn)會進(jìn)入 application 黑名單,加入時間超過 spark.blacklist.timeout 后,自動從黑名單中移除赁豆。值得注意的是仅醇,如果開啟了 dynamic allocation ,該節(jié)點(diǎn)上的 executor 可能會由于空閑時間過長被回收魔种。 |
spark.blacklist.killBlacklistedExecutors | false | 如果開啟該配置析二,spark 會自動關(guān)閉并重啟加入黑名單的 executor,如果整個節(jié)點(diǎn)都加入了黑名單节预,則該節(jié)點(diǎn)上的所有 executor 都會被關(guān)閉叶摄。 |
spark.blacklist.application.fetchFailure.enabled | false | 如果開啟該配置,當(dāng)發(fā)生 fetch failure 時心铃,立即將該 executor 加入到黑名單准谚。要是開啟了 external shuffle service ,整個節(jié)點(diǎn)都會被加入黑名單去扣。 |
實(shí)現(xiàn)細(xì)節(jié)
因?yàn)槭菍?shí)驗(yàn)性質(zhì)的功能柱衔,所以代碼可能會隨時變動。
只貼出部分核心代碼愉棱。
TaskSetBlacklist
黑名單賬本:
//k:executor v:該executor上每個 task 的失敗情況(task失敗的次數(shù)和最近一次失敗時間)
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
//k:節(jié)點(diǎn)唆铐,v:該節(jié)點(diǎn)上有失敗任務(wù)的 executor
private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
//k:節(jié)點(diǎn), v:該節(jié)點(diǎn)上加入黑名單的 taskId
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
//加入黑名單的 executor
private val blacklistedExecs = new HashSet[String]()
//加入黑名單的 node
private val blacklistedNodes = new HashSet[String]()
// 判斷 executor 是否加入了給定 task 的黑名單
def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
execToFailures.get(executorId).exists { execFailures =>
execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
}
}
//判斷 node 是否加入了給定 task 的黑名單
def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
}
當(dāng)有task失敗時,TaskSetManager 會調(diào)用更新黑名單的操作:
- 根據(jù)
taskid
更新excutor
上該task
的失敗次數(shù)和失敗時間 - 判斷
task
是否在該節(jié)點(diǎn)其他executor
上有失敗記錄奔滑,如果有艾岂,將重試次數(shù)相加,如果 >=MAX_TASK_ATTEMPTS_PER_NODE
朋其,則將該node
加入這個taskId
的黑名單 - 判斷在這個stage中王浴,一個executor中失敗的任務(wù)次數(shù)是否 >=
MAX_FAILURES_PER_EXEC_STAGE
,如果是梅猿,則將該executor
加入這個stageId
的黑名單 - 判斷在這個stage中氓辣,同一個 node 的 executor 的失敗記錄是否 >=
MAX_FAILED_EXEC_PER_NODE_STAGE
,如果是袱蚓,則將該node
加入這個stageId
的黑名單
閾值參數(shù):
- MAX_TASK_ATTEMPTS_PER_EXECUTOR:每個 executor 上最大的任務(wù)重試次數(shù)
- MAX_TASK_ATTEMPTS_PER_NODE:每個 node 上最大的任務(wù)重試次數(shù)
- MAX_FAILURES_PER_EXEC_STAGE:一個 stage 中钞啸,每個executor 上最多任務(wù)失敗次數(shù)
- MAX_FAILED_EXEC_PER_NODE_STAGE:一個 stage 中,每個節(jié)點(diǎn)上 executor 的最多失敗次數(shù)
private[scheduler] def updateBlacklistForFailedTask(
host: String,
exec: String,
index: Int,
failureReason: String): Unit = {
latestFailureReason = failureReason
val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
execFailures.updateWithFailure(index, clock.getTimeMillis())
val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
execsWithFailuresOnNode += exec
val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
execToFailures.get(exec).map { failures =>
failures.getNumTaskFailures(index)
}
}.sum
if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
}
if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
if (blacklistedExecs.add(exec)) {
logInfo(s"Blacklisting executor ${exec} for stage $stageId")
val blacklistedExecutorsOnNode =
execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
if (blacklistedNodes.add(host)) {
logInfo(s"Blacklisting ${host} for stage $stageId")
}
}
}
}
}
BlacklistTracker
實(shí)現(xiàn)原理和TaskSetBlacklist
喇潘,下文就不再貼出黑名單判斷体斩,黑名單對象等代碼。
與 TaskSetBlacklist
不同的是颖低,在一個 taskSet 完全成功之前絮吵,BlacklistTracker
無法獲取到任務(wù)失敗的情況。
核心代碼:
當(dāng)一個 taskSet 執(zhí)行成功時會調(diào)用以下代碼忱屑,流程如下:
- 將每個 executor 上的 task 失敗次數(shù)進(jìn)行累計源武,如果 executor 最后一次 task 失敗的時間超過
BLACKLIST_TIMEOUT_MILLIS
扼褪,則移除該失敗任務(wù)想幻。 - 如果 executor 上失敗次數(shù)大于等于設(shè)定的閾值并且不在黑名單中
- 將
executor
及其對應(yīng)的到期時間加入到application
的黑名單中粱栖,從executor失敗列表中移除該 executor,并更新nextExpiryTime
脏毯,用于下次啟動任務(wù)的時候判斷黑名單是否已到期 - 根據(jù)
spark.blacklist.killBlacklistedExecutors
判斷是否要?dú)⑺?executor
- 更新
node
上的 executor 失敗次數(shù) - 如果一個節(jié)點(diǎn)上的
executor
的失敗次數(shù)大于等于閾值并且不在黑名單中- 將
node
及其對應(yīng)的到期時間加入到application
的黑名單中 - 如果開啟了
spark.blacklist.killBlacklistedExecutors
闹究,則將此node
上的所有 executor 殺死
- 將
- 將
- BLACKLIST_TIMEOUT_MILLIS:加入黑名單后的過期時間
- MAX_FAILURES_PER_EXEC:每個executor上最多的task失敗次數(shù)
- MAX_FAILED_EXEC_PER_NODE: 每個節(jié)點(diǎn)上加入黑名單的executor的最大數(shù)量
def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
stageAttemptId: Int,
failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
val now = clock.getTimeMillis()
failuresByExec.foreach { case (exec, failuresInTaskSet) =>
val appFailuresOnExecutor =
executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
val newTotal = appFailuresOnExecutor.numUniqueTaskFailures
val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
s" task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
killBlacklistedExecutor(exec)
val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
blacklistedExecsOnNode += exec
if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
!nodeIdToBlacklistExpiryTime.contains(node)) {
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(node)
}
}
}
}
什么時候進(jìn)行黑名單的判斷
一個 stage 提交的調(diào)用鏈:
TaskSchedulerImpl.submitTasks
->
CoarseGrainedSchedulerBackend.reviveOffers
->
CoarseGrainedSchedulerBackend.makeOffers
->
TaskSchedulerImpl.resourceOffers
->
TaskSchedulerImpl.resourceOfferSingleTaskSet
->
CoarseGrainedSchedulerBackend.launchTasks
appliaction 級別的黑名單在 TaskSchedulerImpl.resourceOffers
中完成判斷,stage/task 級別的黑名單在 TaskSchedulerImpl.resourceOfferSingleTaskSet
中完成判斷食店。
如果所有的節(jié)點(diǎn)都被加入了黑名單渣淤?
如果將task的重試次數(shù)設(shè)置的比較高,有可能會出現(xiàn)這個問題吉嫩,這個時候价认。將會中斷這個 stage 的執(zhí)行
TaskSchedulerImpl.resourceOffers
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
結(jié)語
簡單的來說,對于一個 application 自娩,提供了三種級別的黑名單可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist
通過這些黑名單的設(shè)置可以避免由于 task 反復(fù)調(diào)度在有問題的 executor/node
(壞盤用踩,磁盤滿了,shuffle fetch 失敗忙迁,環(huán)境錯誤等)上脐彩,進(jìn)而導(dǎo)致整個 Application
運(yùn)行失敗的情況。
tip: BlacklistTracker.updateBlacklistForFetchFailure
在當(dāng)前版本(2.3.0)存在BUG SPARK-24021姊扔,將在 2.3.1 進(jìn)行修復(fù)惠奸。如果打開了 spark.blacklist.application.fetchFailure.enabled
配置將會受到影響。