[spark] 數(shù)據(jù)本地化及延遲調(diào)度

前言

Spark數(shù)據(jù)本地化即移動計算而不是移動數(shù)據(jù)名惩,而現(xiàn)實又是殘酷的再扭,不是想要在數(shù)據(jù)塊的地方計算就有足夠的資源提供,為了讓task能盡可能的以最優(yōu)本地化級別(Locality Levels)來啟動,Spark的延遲調(diào)度應運而生不脯,資源不夠可在該Locality Levels對應的限制時間內(nèi)重試综苔,超過限制時間后還無法啟動則降低Locality Levels再嘗試啟動……

本地化級別(Locality Levels)

  • PROCESS_LOCAL:進程本地化惩系,代碼和數(shù)據(jù)在同一個進程中,也就是在同一個executor中如筛;計算數(shù)據(jù)的task由executor執(zhí)行堡牡,數(shù)據(jù)在executor的BlockManager中,性能最好
  • NODE_LOCAL:節(jié)點本地化杨刨,代碼和數(shù)據(jù)在同一個節(jié)點中晤柄;比如說,數(shù)據(jù)作為一個HDFS block塊在節(jié)點上妖胀,而task在節(jié)點上某個executor中運行芥颈;或者是數(shù)據(jù)和task在一個節(jié)點上的不同executor中,數(shù)據(jù)需要在進程間進行傳輸
  • NO_PREF:對于task來說做粤,數(shù)據(jù)從哪里獲取都一樣浇借,沒有好壞之分,比如說SparkSQL讀取MySql中的數(shù)據(jù)
  • RACK_LOCAL:機架本地化怕品,數(shù)據(jù)和task在一個機架的兩個節(jié)點上妇垢,數(shù)據(jù)需要通過網(wǎng)絡(luò)在節(jié)點之間進行傳輸
  • ANY:數(shù)據(jù)和task可能在集群中的任何地方,而且不在一個機架中,性能最差

這些Task的本地化級別其實描述的就是計算與數(shù)據(jù)的位置關(guān)系闯估,這個最終的關(guān)系是如何產(chǎn)生的呢灼舍?接下來對其來龍去脈進行詳細的講解。

DAGScheduler提交tasks

DAGScheduler對job進行stage劃分完后涨薪,會通過submitMissingTasks方法將Stage以TaskSet的形式提交給TaskScheduler骑素,看看該方法關(guān)于位置優(yōu)先的一些代碼:

...
// 獲取還未執(zhí)行或未成功執(zhí)行分區(qū)的id
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
// 通過getPreferredLocs方法獲取rdd該分區(qū)的優(yōu)先位置
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch { 
    }
...
//通過最優(yōu)位置等信息構(gòu)建Task
val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
          }

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
          }
      }
    } catch { 
    }
...
//將所有task以TaskSet的形式提交給TaskScheduler
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

注意這里提交的TaskSet里面的Task已經(jīng)包含了該Task的優(yōu)先位置,而該優(yōu)先位置是通過getPreferredLocs方法獲取刚夺,可以簡單看看其實現(xiàn):

private def getPreferredLocsInternal(
      rdd: RDD[_],
      partition: Int,
      visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
    ...
    // 從緩存中獲取
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // 直接通過rdd的preferredLocations方法獲取
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
    // 遞歸從parent Rdd獲认壮蟆(窄依賴)
    rdd.dependencies.foreach {
      case n: NarrowDependency[_] =>
        for (inPart <- n.getParents(partition)) {
          val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
          if (locs != Nil) {
            return locs
          }
        }
      case _ =>
    }
    Nil
  }

無論是通過哪種方式獲取RDD分區(qū)的優(yōu)先位置,第一次計算的數(shù)據(jù)來源肯定都是通過RDD的preferredLocations方法獲取的侠姑,不同的RDD有不同的preferredLocations實現(xiàn)创橄,但是數(shù)據(jù)無非就是在三個地方存在,被cache到內(nèi)存莽红、HDFS妥畏、磁盤,而這三種方式的TaskLocation都有具體的實現(xiàn):

//數(shù)據(jù)在內(nèi)存中
private [spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}
//數(shù)據(jù)在磁盤上(非HDFS上)
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}
//數(shù)據(jù)在HDFS上
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}

所以安吁,在實例化Task的時候傳的優(yōu)先位置就是這三種的其中一種醉蚁。

Locality levels生成

DAGScheduler將TaskSet提交給TaskScheduler后,TaskScheduler會為每個TaskSet創(chuàng)建一個TaskSetMagager來對其Task進行管理鬼店,在初始化TaskSetMagager的時候就會通過computeValidLocalityLevels計算該TaskSet包含的locality levels:

private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
    import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
    val levels = new ArrayBuffer[TaskLocality.TaskLocality]
    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }
    if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
        pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
      levels += NODE_LOCAL
    }
    if (!pendingTasksWithNoPrefs.isEmpty) {
      levels += NO_PREF
    }
    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
      levels += RACK_LOCAL
    }
    levels += ANY
    logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
    levels.toArray
  }

程序會依次判斷該TaskSetMagager是否包含各個級別网棍,邏輯都類似,我們就細看第一個薪韩,pendingTasksForExecutor的定義與添加:

// key為executorId确沸,value為在該executor上有緩存的數(shù)據(jù)塊對應的taskid數(shù)組
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
...
//遍歷所有該TaskSet的所有task進行添加
for (i <- (0 until numTasks).reverse) {
    addPendingTask(i)
  }
...
private def addPendingTask(index: Int) {
    for (loc <- tasks(index).preferredLocations) {
      loc match {
        case e: ExecutorCacheTaskLocation =>
          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
        case e: HDFSCacheTaskLocation =>
          val exe = sched.getExecutorsAliveOnHost(loc.host)
          exe match {
            case Some(set) =>
              for (e <- set) {
                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
              }
              logInfo(s"Pending task $index has a cached location at ${e.host} " +
                ", where there are executors " + set.mkString(","))
            case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                ", but there are no executors alive there.")
          }
        case _ =>
      }
      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
      for (rack <- sched.getRackForHost(loc.host)) {
        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
      }
    }

    if (tasks(index).preferredLocations == Nil) {
      pendingTasksWithNoPrefs += index
    }

    allPendingTasks += index  // No point scanning this whole list to find the old task there
  }

注意這里的addPendingTask方法,會遍歷該TaskSetMagager管理的所有Task的優(yōu)先位置(上文已解析)俘陷,若是ExecutorCacheTaskLocation (緩存在內(nèi)存中)則添加對應的executorId和taskId到pendingTasksForExecutor罗捎,同時還會添加到低級別需要的pendingTasksForHost、pendingTasksForRack中拉盾,說明假設(shè)一個 task 的最優(yōu)本地性為 X桨菜,那么該 task 同時也具有其他所有本地性比X差的本地性。
回到上面的本地性級別判斷:

if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
        pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
      levels += PROCESS_LOCAL
    }

只要是看第三個判斷 pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive()))捉偏,其中倒得,pendingTasksForExecutor.keySet就是上面說明的存在有與task對應的數(shù)據(jù)塊被緩存在executor中的executorId,sched.isExecutorAlive()就是判斷參數(shù)中的 executor id 當前是否 active夭禽。所以整行代碼意思是存在有與task對應的數(shù)據(jù)塊被緩存在executor中的executors是否有active的霞掺,若有則添加PROCESS_LOCAL級別到該TaskSet的LocalityLevels中。

后面的其他本地性級別是同樣的邏輯就不細講了讹躯,區(qū)別是如判斷存在有與task對應的數(shù)據(jù)塊在某些節(jié)點中的hosts是否有Alive的等……

至此菩彬,TaskSet包含的LocalityLevels就已經(jīng)計算完缠劝。

延遲調(diào)度策略

若spark跑在yarn上,也有兩層延遲調(diào)度骗灶,第一層就是yarn盡量將spark的executor分配到有數(shù)據(jù)的nodemanager上惨恭,這一層沒有做到data locality,到spark階段耙旦,data locality更不可能了脱羡。

延遲調(diào)度的目的是為了較小網(wǎng)絡(luò)及IO開銷,在數(shù)據(jù)量大而計算邏輯簡單(task執(zhí)行時間小于數(shù)據(jù)傳輸時間)的情況下表現(xiàn)明顯免都。

Spark調(diào)度總是會盡量讓每個task以最高的本地性級別來啟動锉罐,當一個task以X本地性級別啟動,但是該本地性級別對應的所有節(jié)點都沒有空閑資源而啟動失敗琴昆,此時并不會馬上降低本地性級別啟動而是在某個時間長度內(nèi)再次以X本地性級別來啟動該task氓鄙,若超過限時時間則降級啟動。

TaskSetMagager會以某一種TaskSet包含的本地性級別遍歷每個可用executor資源嘗試在該executor上啟動當前管理的tasks业舍,那么是如何決定某個task能否在該executor上啟動呢?首先都會通過getAllowedLocalityLevel(curTime)方法計算當前TaskSetMagager中未執(zhí)行的tasks的最高本地級別:

private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
    // Remove the scheduled or finished tasks lazily
    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
      var indexOffset = pendingTaskIds.size
      while (indexOffset > 0) {
        indexOffset -= 1
        val index = pendingTaskIds(indexOffset)
        if (copiesRunning(index) == 0 && !successful(index)) {
          return true
        } else {
          pendingTaskIds.remove(indexOffset)
        }
      }
      false
    }
    // Walk through the list of tasks that can be scheduled at each location and returns true
    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
    // already been scheduled.
    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
      val emptyKeys = new ArrayBuffer[String]
      val hasTasks = pendingTasks.exists {
        case (id: String, tasks: ArrayBuffer[Int]) =>
          if (tasksNeedToBeScheduledFrom(tasks)) {
            true
          } else {
            emptyKeys += id
            false
          }
      }
      // The key could be executorId, host or rackId
      emptyKeys.foreach(id => pendingTasks.remove(id))
      hasTasks
    }

    while (currentLocalityIndex < myLocalityLevels.length - 1) {
      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
      }
      if (!moreTasks) {
        // This is a performance optimization: if there are no more tasks that can
        // be scheduled at a particular locality level, there is no point in waiting
        // for the locality wait timeout (SPARK-4939).
        lastLaunchTime = curTime
        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
        currentLocalityIndex += 1
      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
        // wait timer doesn't immediately expire
        lastLaunchTime += localityWaits(currentLocalityIndex)
        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
          s"${localityWaits(currentLocalityIndex)}ms")
        currentLocalityIndex += 1
      } else {
        return myLocalityLevels(currentLocalityIndex)
      }
    }
    myLocalityLevels(currentLocalityIndex)
  }

循環(huán)條件里的currentLocalityIndex是getAllowedLocalityLevel 前一次被調(diào)用返回的LocalityIndex在 myLocalityLevels 中的索引升酣,初始值為0舷暮,myLocalityLevels則是TaskSetMagager所有tasks包含的本地性級別。

  • 若myLocalityLevels(currentLocalityIndex)對應的level是否還有未執(zhí)行的task噩茄,通過moreTasksToRunIn方法獲认旅妗(邏輯很簡單:執(zhí)行完及正在執(zhí)行的task都從對應列表中移除,有未執(zhí)行過的task直接返回true)
  • 若沒有绩聘,則currentLocalityIndex 加一繼續(xù)循環(huán)(降級)
  • 若有沥割,則先判斷當前時間與上次以該級別啟動時間之差是否超過了該級別能容忍的時間限制,若未超過凿菩,則直接返回對應的LocalityLevel机杜,若超過,則currentLocalityIndex 加一繼續(xù)循環(huán)(降級)

至此衅谷,就取出了該TaskSetMagager中未執(zhí)行的tasks的最高本地性級別(取和maxLocality中級別高的作為最終的allowedLocality)椒拗。

最終決定是否在某個executor上啟動某個task的是方法dequeueTask(execId, host, allowedLocality)

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
    : Option[(Int, TaskLocality.Value, Boolean)] =
  {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
      for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
        return Some((index, TaskLocality.NODE_LOCAL, false))
      }
    }

    if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
      // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
      for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
        return Some((index, TaskLocality.PROCESS_LOCAL, false))
      }
    }
    ...
  }

通過TaskLocality.isAllowed方法來保證只以比allowedLocality級別高(可相等)的locality來啟動task,因為一個 task 擁有比最優(yōu)本地性 差的其他所有本地性获黔。這樣就保證了能盡可能的以高本地性級別來啟動一個task蚀苛。

優(yōu)化建議

可用過Spark UI來查看某個job的task的locality level,根據(jù)實際情況調(diào)整數(shù)據(jù)本地化的等待時長:

  • spark.locality.wait 全局的玷氏,適用于每個locality level堵未,默認為3s
  • spark.locality.wait.process
  • spark.locality.wait.node
  • spark.locality.wait.rack
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市盏触,隨后出現(xiàn)的幾起案子渗蟹,更是在濱河造成了極大的恐慌块饺,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,013評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件拙徽,死亡現(xiàn)場離奇詭異刨沦,居然都是意外死亡,警方通過查閱死者的電腦和手機膘怕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,205評論 2 382
  • 文/潘曉璐 我一進店門想诅,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人岛心,你說我怎么就攤上這事来破。” “怎么了忘古?”我有些...
    開封第一講書人閱讀 152,370評論 0 342
  • 文/不壞的土叔 我叫張陵徘禁,是天一觀的道長。 經(jīng)常有香客問我髓堪,道長送朱,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,168評論 1 278
  • 正文 為了忘掉前任干旁,我火速辦了婚禮驶沼,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘争群。我一直安慰自己回怜,他們只是感情好,可當我...
    茶點故事閱讀 64,153評論 5 371
  • 文/花漫 我一把揭開白布换薄。 她就那樣靜靜地躺著玉雾,像睡著了一般。 火紅的嫁衣襯著肌膚如雪轻要。 梳的紋絲不亂的頭發(fā)上复旬,一...
    開封第一講書人閱讀 48,954評論 1 283
  • 那天,我揣著相機與錄音伦腐,去河邊找鬼赢底。 笑死,一個胖子當著我的面吹牛柏蘑,可吹牛的內(nèi)容都是我干的幸冻。 我是一名探鬼主播,決...
    沈念sama閱讀 38,271評論 3 399
  • 文/蒼蘭香墨 我猛地睜開眼咳焚,長吁一口氣:“原來是場噩夢啊……” “哼洽损!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起革半,我...
    開封第一講書人閱讀 36,916評論 0 259
  • 序言:老撾萬榮一對情侶失蹤碑定,失蹤者是張志新(化名)和其女友劉穎流码,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體延刘,經(jīng)...
    沈念sama閱讀 43,382評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡漫试,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 35,877評論 2 323
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了碘赖。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片驾荣。...
    茶點故事閱讀 37,989評論 1 333
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖普泡,靈堂內(nèi)的尸體忽然破棺而出播掷,到底是詐尸還是另有隱情,我是刑警寧澤撼班,帶...
    沈念sama閱讀 33,624評論 4 322
  • 正文 年R本政府宣布歧匈,位于F島的核電站,受9級特大地震影響砰嘁,放射性物質(zhì)發(fā)生泄漏件炉。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,209評論 3 307
  • 文/蒙蒙 一矮湘、第九天 我趴在偏房一處隱蔽的房頂上張望妻率。 院中可真熱鬧,春花似錦板祝、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,199評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至伏伯,卻和暖如春橘洞,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背说搅。 一陣腳步聲響...
    開封第一講書人閱讀 31,418評論 1 260
  • 我被黑心中介騙來泰國打工炸枣, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人弄唧。 一個月前我還...
    沈念sama閱讀 45,401評論 2 352
  • 正文 我出身青樓适肠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親候引。 傳聞我的和親對象是個殘疾皇子侯养,可洞房花燭夜當晚...
    茶點故事閱讀 42,700評論 2 345

推薦閱讀更多精彩內(nèi)容