提高spark任務(wù)穩(wěn)定性1 - Blacklist 機(jī)制

場景

一個 spark 應(yīng)用的產(chǎn)生過程: 獲取需求 -> 編寫spark代碼 -> 測試通過 -> 扔上平臺調(diào)度。

往往應(yīng)用會正常運(yùn)行一段時間修噪,突然有一天運(yùn)行失敗,或是失敗了一次才運(yùn)行成功着降。

從開發(fā)者的角度看化漆,我的代碼沒問題,測試也通過了殊轴,之前一段都運(yùn)行好好的衰倦,怎么突然就失敗了呢?為什么我重新調(diào)度又能正常運(yùn)行了旁理,是不是你們平臺不穩(wěn)定樊零?

是什么導(dǎo)致了上述問題?

分布式集群中,特別是高負(fù)載的情況下韧拒,就會引發(fā)很多意想不到的問題淹接,例如:

  1. 壞盤/硬盤滿將會 導(dǎo)致 /path/to/usercache 目錄創(chuàng)建失敗,一個stage中任務(wù)失敗次數(shù)達(dá)到一定次數(shù)(spark.task.maxFailures)會導(dǎo)致整個job失敗叛溢。
  2. executor 注冊 external shuffle service 超時。
  3. executor 從 external shuffle service 獲取數(shù)據(jù)超時劲适,task 反復(fù)失敗后導(dǎo)致了整 個stage 的失敗楷掉。
  4. 環(huán)境依賴問題,例如 xxx 包不存在, xxx 包沒有安裝烹植。
  5. dns 沒有配置斑鸦,網(wǎng)絡(luò)不通。
  6. etc.

為什么 task 失敗后還會被 schedular 重新調(diào)度在原來的 node 或是 executor上草雕?

數(shù)據(jù)本地性(spark會優(yōu)先把task調(diào)度在有相應(yīng)數(shù)據(jù)的節(jié)點(diǎn)上)導(dǎo)致巷屿。

是否只能聽天由命,每次失敗后重新調(diào)度墩虹? 如果任務(wù)有SLA的限制怎么辦嘱巾?

介紹

spark 2.1 中增加了 blacklist 機(jī)制,當(dāng)前(2.3.0)還是試驗(yàn)性質(zhì)的功能诫钓,黑名單機(jī)制允許你設(shè)置 task 在 executor / node 上失敗次數(shù)的閾值旬昭, 從而避免了一路走到黑的情況出現(xiàn)。 :)

相關(guān)參數(shù)

配置 默認(rèn)值 描述
spark.blacklist.enabled false 是否開啟黑名單機(jī)制
spark.blacklist.timeout 1h 對于被加入 application 黑名單的 executor/節(jié)點(diǎn) 菌湃,多長時間后無條件的移出黑名單以運(yùn)行新任務(wù)
spark.blacklist.task.maxTaskAttemptsPerExecutor 1 對于同一個 task 在某個 executor 中的失敗重試閾值问拘。達(dá)到閾值后,在執(zhí)行這個 task 時惧所,該 executor 將被加入黑名單
spark.blacklist.task.maxTaskAttemptsPerNode 2 對于同一個 task 在某個節(jié)點(diǎn)上的失敗重試閾值骤坐。達(dá)到閾值后,在執(zhí)行這個 task 時下愈,該節(jié)點(diǎn)將被加入黑名單
spark.blacklist.stage.maxFailedTasksPerExecutor 2 一個 stage 中纽绍,不同的 task 在同一個 executor 的失敗閾值。達(dá)到閾值后驰唬,在執(zhí)行這個 stage 時該 executor 將會被加入黑名單
spark.blacklist.stage.maxFailedExecutorsPerNode 2 一個 stage 中顶岸,不同的 executor 加入黑名單的閾值。達(dá)到閾值后叫编,在執(zhí)行這個 stage 時該節(jié)點(diǎn)將會被加入黑名單
spark.blacklist.application.maxFailedTasksPerExecutor 2 在同一個 executor 中辖佣,不同的 task的失敗閾值 。達(dá)到閾值后搓逾,在整個 appliction 運(yùn)行期間卷谈,該 executor 都會被加入黑名單,加入時間超過 spark.blacklist.timeout 后霞篡,自動從黑名單中移除世蔗。值得注意的是,如果開啟了 dynamic allocation朗兵,這些 executor 可能會由于空閑時間過長被回收污淋。
spark.blacklist.application.maxFailedExecutorsPerNode 2 在一個節(jié)點(diǎn)中,不同 executor 加入 application 黑名單的閾值余掖。達(dá)到這個閾值后寸爆,該節(jié)點(diǎn)會進(jìn)入 application 黑名單,加入時間超過 spark.blacklist.timeout 后,自動從黑名單中移除赁豆。值得注意的是仅醇,如果開啟了 dynamic allocation,該節(jié)點(diǎn)上的 executor 可能會由于空閑時間過長被回收魔种。
spark.blacklist.killBlacklistedExecutors false 如果開啟該配置析二,spark 會自動關(guān)閉并重啟加入黑名單的 executor,如果整個節(jié)點(diǎn)都加入了黑名單节预,則該節(jié)點(diǎn)上的所有 executor 都會被關(guān)閉叶摄。
spark.blacklist.application.fetchFailure.enabled false 如果開啟該配置,當(dāng)發(fā)生 fetch failure時心铃,立即將該 executor 加入到黑名單准谚。要是開啟了 external shuffle service,整個節(jié)點(diǎn)都會被加入黑名單去扣。

實(shí)現(xiàn)細(xì)節(jié)

因?yàn)槭菍?shí)驗(yàn)性質(zhì)的功能柱衔,所以代碼可能會隨時變動。

只貼出部分核心代碼愉棱。

TaskSetBlacklist

黑名單賬本:

//k:executor v:該executor上每個 task 的失敗情況(task失敗的次數(shù)和最近一次失敗時間)
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()

//k:節(jié)點(diǎn)唆铐,v:該節(jié)點(diǎn)上有失敗任務(wù)的 executor
private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
//k:節(jié)點(diǎn), v:該節(jié)點(diǎn)上加入黑名單的 taskId
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
  
//加入黑名單的 executor 
private val blacklistedExecs = new HashSet[String]()
//加入黑名單的 node
private val blacklistedNodes = new HashSet[String]()
// 判斷 executor 是否加入了給定 task 的黑名單
def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
    execToFailures.get(executorId).exists { execFailures =>
      execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
    }
}

//判斷 node 是否加入了給定 task 的黑名單
def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
    nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
}

當(dāng)有task失敗時,TaskSetManager 會調(diào)用更新黑名單的操作:

  1. 根據(jù) taskid 更新 excutor 上該 task 的失敗次數(shù)和失敗時間
  2. 判斷 task 是否在該節(jié)點(diǎn)其他 executor 上有失敗記錄奔滑,如果有艾岂,將重試次數(shù)相加,如果 >= MAX_TASK_ATTEMPTS_PER_NODE 朋其,則將該 node 加入這個 taskId 的黑名單
  3. 判斷在這個stage中王浴,一個executor中失敗的任務(wù)次數(shù)是否 >= MAX_FAILURES_PER_EXEC_STAGE,如果是梅猿,則將該 executor 加入這個 stageId 的黑名單
  4. 判斷在這個stage中氓辣,同一個 node 的 executor 的失敗記錄是否 >= MAX_FAILED_EXEC_PER_NODE_STAGE,如果是袱蚓,則將該 node 加入這個 stageId 的黑名單

閾值參數(shù):

  • MAX_TASK_ATTEMPTS_PER_EXECUTOR:每個 executor 上最大的任務(wù)重試次數(shù)
  • MAX_TASK_ATTEMPTS_PER_NODE:每個 node 上最大的任務(wù)重試次數(shù)
  • MAX_FAILURES_PER_EXEC_STAGE:一個 stage 中钞啸,每個executor 上最多任務(wù)失敗次數(shù)
  • MAX_FAILED_EXEC_PER_NODE_STAGE:一個 stage 中,每個節(jié)點(diǎn)上 executor 的最多失敗次數(shù)
  private[scheduler] def updateBlacklistForFailedTask(
      host: String,
      exec: String,
      index: Int,
      failureReason: String): Unit = {
    latestFailureReason = failureReason
    val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
    execFailures.updateWithFailure(index, clock.getTimeMillis())

    val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
    execsWithFailuresOnNode += exec
    val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
      execToFailures.get(exec).map { failures =>
        failures.getNumTaskFailures(index)
      }
    }.sum
    if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
      nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
    }

    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
      if (blacklistedExecs.add(exec)) {
        logInfo(s"Blacklisting executor ${exec} for stage $stageId")
        val blacklistedExecutorsOnNode =
          execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
        if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
          if (blacklistedNodes.add(host)) {
            logInfo(s"Blacklisting ${host} for stage $stageId")
          }
        }
      }
    }
  }

BlacklistTracker

實(shí)現(xiàn)原理和TaskSetBlacklist喇潘,下文就不再貼出黑名單判斷体斩,黑名單對象等代碼。

TaskSetBlacklist 不同的是颖低,在一個 taskSet 完全成功之前絮吵,BlacklistTracker 無法獲取到任務(wù)失敗的情況。

核心代碼:

當(dāng)一個 taskSet 執(zhí)行成功時會調(diào)用以下代碼忱屑,流程如下:

  1. 將每個 executor 上的 task 失敗次數(shù)進(jìn)行累計源武,如果 executor 最后一次 task 失敗的時間超過 BLACKLIST_TIMEOUT_MILLIS扼褪,則移除該失敗任務(wù)想幻。
  2. 如果 executor 上失敗次數(shù)大于等于設(shè)定的閾值并且不在黑名單中
    • executor 及其對應(yīng)的到期時間加入到 application 的黑名單中粱栖,從executor失敗列表中移除該 executor,并更新 nextExpiryTime脏毯,用于下次啟動任務(wù)的時候判斷黑名單是否已到期
    • 根據(jù) spark.blacklist.killBlacklistedExecutors 判斷是否要?dú)⑺?executor
    • 更新 node 上的 executor 失敗次數(shù)
    • 如果一個節(jié)點(diǎn)上的 executor 的失敗次數(shù)大于等于閾值并且不在黑名單中
      • node 及其對應(yīng)的到期時間加入到 application 的黑名單中
      • 如果開啟了 spark.blacklist.killBlacklistedExecutors闹究,則將此 node 上的所有 executor 殺死
  • BLACKLIST_TIMEOUT_MILLIS:加入黑名單后的過期時間
  • MAX_FAILURES_PER_EXEC:每個executor上最多的task失敗次數(shù)
  • MAX_FAILED_EXEC_PER_NODE: 每個節(jié)點(diǎn)上加入黑名單的executor的最大數(shù)量
def updateBlacklistForSuccessfulTaskSet(
      stageId: Int,
      stageAttemptId: Int,
      failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
    val now = clock.getTimeMillis()
    failuresByExec.foreach { case (exec, failuresInTaskSet) =>
      val appFailuresOnExecutor =
        executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
      appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
      appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
      val newTotal = appFailuresOnExecutor.numUniqueTaskFailures

      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
      if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
        logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
          s" task failures in successful task sets")
        val node = failuresInTaskSet.node
        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
        executorIdToFailureList.remove(exec)
        updateNextExpiryTime()
        killBlacklistedExecutor(exec)

        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
        blacklistedExecsOnNode += exec
        if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
            !nodeIdToBlacklistExpiryTime.contains(node)) {
          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
            s"executors blacklisted: ${blacklistedExecsOnNode}")
          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
          listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
          killExecutorsOnBlacklistedNode(node)
        }
      }
    }
  }

什么時候進(jìn)行黑名單的判斷

一個 stage 提交的調(diào)用鏈:

TaskSchedulerImpl.submitTasks ->
CoarseGrainedSchedulerBackend.reviveOffers ->
CoarseGrainedSchedulerBackend.makeOffers ->
TaskSchedulerImpl.resourceOffers ->
TaskSchedulerImpl.resourceOfferSingleTaskSet ->
CoarseGrainedSchedulerBackend.launchTasks

appliaction 級別的黑名單在 TaskSchedulerImpl.resourceOffers 中完成判斷,stage/task 級別的黑名單在 TaskSchedulerImpl.resourceOfferSingleTaskSet 中完成判斷食店。

如果所有的節(jié)點(diǎn)都被加入了黑名單渣淤?

如果將task的重試次數(shù)設(shè)置的比較高,有可能會出現(xiàn)這個問題吉嫩,這個時候价认。將會中斷這個 stage 的執(zhí)行

TaskSchedulerImpl.resourceOffers

if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}

結(jié)語

簡單的來說,對于一個 application 自娩,提供了三種級別的黑名單可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist

通過這些黑名單的設(shè)置可以避免由于 task 反復(fù)調(diào)度在有問題的 executor/node (壞盤用踩,磁盤滿了,shuffle fetch 失敗忙迁,環(huán)境錯誤等)上脐彩,進(jìn)而導(dǎo)致整個 Application 運(yùn)行失敗的情況。

tip: BlacklistTracker.updateBlacklistForFetchFailure 在當(dāng)前版本(2.3.0)存在BUG SPARK-24021姊扔,將在 2.3.1 進(jìn)行修復(fù)惠奸。如果打開了 spark.blacklist.application.fetchFailure.enabled 配置將會受到影響。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末恰梢,一起剝皮案震驚了整個濱河市佛南,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌嵌言,老刑警劉巖嗅回,帶你破解...
    沈念sama閱讀 218,640評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異呀页,居然都是意外死亡妈拌,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,254評論 3 395
  • 文/潘曉璐 我一進(jìn)店門蓬蝶,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尘分,“玉大人,你說我怎么就攤上這事丸氛∨喑睿” “怎么了?”我有些...
    開封第一講書人閱讀 165,011評論 0 355
  • 文/不壞的土叔 我叫張陵缓窜,是天一觀的道長定续。 經(jīng)常有香客問我谍咆,道長,這世上最難降的妖魔是什么私股? 我笑而不...
    開封第一講書人閱讀 58,755評論 1 294
  • 正文 為了忘掉前任摹察,我火速辦了婚禮,結(jié)果婚禮上倡鲸,老公的妹妹穿的比我還像新娘供嚎。我一直安慰自己,他們只是感情好峭状,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,774評論 6 392
  • 文/花漫 我一把揭開白布克滴。 她就那樣靜靜地躺著,像睡著了一般优床。 火紅的嫁衣襯著肌膚如雪劝赔。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,610評論 1 305
  • 那天胆敞,我揣著相機(jī)與錄音着帽,去河邊找鬼。 笑死竿秆,一個胖子當(dāng)著我的面吹牛启摄,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播幽钢,決...
    沈念sama閱讀 40,352評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼歉备,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了匪燕?” 一聲冷哼從身側(cè)響起蕾羊,我...
    開封第一講書人閱讀 39,257評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎帽驯,沒想到半個月后龟再,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,717評論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡尼变,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,894評論 3 336
  • 正文 我和宋清朗相戀三年利凑,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片嫌术。...
    茶點(diǎn)故事閱讀 40,021評論 1 350
  • 序言:一個原本活蹦亂跳的男人離奇死亡哀澈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出度气,到底是詐尸還是另有隱情割按,我是刑警寧澤,帶...
    沈念sama閱讀 35,735評論 5 346
  • 正文 年R本政府宣布磷籍,位于F島的核電站适荣,受9級特大地震影響现柠,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜弛矛,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,354評論 3 330
  • 文/蒙蒙 一够吩、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧汪诉,春花似錦废恋、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,936評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽拟烫。三九已至该编,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間硕淑,已是汗流浹背课竣。 一陣腳步聲響...
    開封第一講書人閱讀 33,054評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留置媳,地道東北人于樟。 一個月前我還...
    沈念sama閱讀 48,224評論 3 371
  • 正文 我出身青樓,卻偏偏與公主長得像拇囊,于是被迫代替她去往敵國和親迂曲。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,974評論 2 355