Preface
Spark data locality means moving the computation instead of moving the data. Reality, however, is harsh: the place that holds a data block does not always have enough free resources for the computation. To let tasks start at the best possible locality level, Spark uses delay scheduling: when resources are short, a task is retried at the same locality level within the time limit configured for that level; if it still cannot start after the limit expires, the locality level is lowered and the attempt is repeated, and so on.
Locality Levels
- PROCESS_LOCAL: process-local. The code and the data are in the same process, i.e. the same executor; the task that computes on the data runs in that executor and the data sits in that executor's BlockManager. Best performance.
- NODE_LOCAL: node-local. The code and the data are on the same node; for example, the data is an HDFS block on the node and the task runs in some executor on that node, or the data and the task are in different executors on the same node, so the data has to be transferred between processes.
- NO_PREF: for the task it makes no difference where the data comes from; there is no better or worse location. An example is Spark SQL reading data from MySQL.
- RACK_LOCAL: rack-local. The data and the task are on two different nodes of the same rack, so the data has to be transferred between nodes over the network.
- ANY: the data and the task may be anywhere in the cluster, not even in the same rack. Worst performance.
These locality levels describe the relationship between the computation and the location of its data. How is that relationship finally established? The rest of this article walks through the whole story.
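For reference, Spark models these levels as an ordered enumeration. The sketch below is a simplified rendering of org.apache.spark.scheduler.TaskLocality (annotations and doc comments trimmed); its ordinal ordering, from best to worst, is what the level comparisons later in this article rely on.

object TaskLocality extends Enumeration {
  // Ordered from best to worst locality; comparisons use this ordinal order.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  // A locality "condition" is allowed under a "constraint" if it is at least as good.
  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}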
DAGScheduler submits tasks
After the DAGScheduler has divided a job into stages, submitMissingTasks submits each stage to the TaskScheduler in the form of a TaskSet. Here are the parts of that method that deal with preferred locations:
...
// get the ids of the partitions that have not been run, or not run successfully, yet
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
...
// get the preferred locations of each partition of the rdd via getPreferredLocs
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
} catch {
  ...
}
...
// build the Tasks with the preferred locations and other information
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
      }
  }
} catch {
  ...
}
...
// submit all the tasks to the TaskScheduler as one TaskSet
taskScheduler.submitTasks(new TaskSet(
  tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
Note that every Task inside the submitted TaskSet already carries its preferred locations, which were obtained through getPreferredLocs. A quick look at its core implementation:
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  ...
  // look the partition up in the cache
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  // ask the rdd itself through its preferredLocations method
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  // recurse into the parent RDDs (narrow dependencies only)
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  Nil
}
Whichever branch produces the preferred locations of an RDD partition, the very first computation ultimately gets them from the RDD's preferredLocations method. Different RDDs implement preferredLocations differently, but the data can only live in three places: cached in memory, on HDFS, or on disk, and each of the three has a concrete TaskLocation implementation:
// the data is in memory (cached in an executor)
private [spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation {
  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}

// the data is on disk (not on HDFS)
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

// the data is on HDFS
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}
So the preferred locations passed in when a Task is instantiated are always one of these three kinds.
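To make the link between an RDD and these TaskLocation types concrete, here is a rough sketch of a custom RDD that reports where each partition's data lives by overriding getPreferredLocations. HostAwareRDD, HostAwarePartition and the host layout are made up for illustration; returning plain host names corresponds to HostTaskLocation, while strings carrying the executor or HDFS-cache tags shown above correspond to the other two types.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type that remembers which host stores its data.
case class HostAwarePartition(index: Int, host: String) extends Partition

// Minimal custom RDD: one partition per host, preferring the host that holds the data.
class HostAwareRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](hosts.length)(i => HostAwarePartition(i, hosts(i)))

  // Returning a plain host name ends up as a HostTaskLocation for that partition.
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostAwarePartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[HostAwarePartition]
    Iterator(s"partition ${p.index} prefers ${p.host}")
  }
}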
Generating the locality levels
After the DAGScheduler hands the TaskSet over, the TaskScheduler creates one TaskSetManager per TaskSet to manage its tasks. When the TaskSetManager is initialized, computeValidLocalityLevels computes the locality levels contained in that TaskSet:
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}
The code checks level by level whether this TaskSetManager contains it. The checks all follow the same pattern, so let's look at the first one in detail, starting with how pendingTasksForExecutor is defined and populated:
// key is an executorId, value is the array of indices of tasks whose
// data blocks are cached on that executor
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
...
// iterate over all tasks of this TaskSet and add each one
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
...
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }
  allPendingTasks += index // No point scanning this whole list to find the old task there
}
Note what addPendingTask does: it walks through the preferred locations (analyzed above) of every Task managed by this TaskSetManager. If a location is an ExecutorCacheTaskLocation (the data block is cached in an executor), the executorId and the task index are added to pendingTasksForExecutor, and the same task index is also added to the lower-level maps pendingTasksForHost and pendingTasksForRack. In other words, if a task's best locality level is X, the task also carries every locality level worse than X, as the sketch below illustrates.
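A tiny, self-contained illustration of that fan-out, using plain Scala collections and made-up executor, host and rack names rather than the real TaskSetManager state:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingTaskFanOutDemo {
  def main(args: Array[String]): Unit = {
    val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
    val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]
    val pendingTasksForRack     = new HashMap[String, ArrayBuffer[Int]]

    // Hypothetical layout: task 0's data is cached on executor "exec-1" on "host1" in "rack-A".
    val taskIndex = 0
    val (executorId, host, rack) = ("exec-1", "host1", "rack-A")

    // ExecutorCacheTaskLocation => a PROCESS_LOCAL candidate on that executor ...
    pendingTasksForExecutor.getOrElseUpdate(executorId, new ArrayBuffer) += taskIndex
    // ... and, like every preferred location, also a NODE_LOCAL and a RACK_LOCAL candidate.
    pendingTasksForHost.getOrElseUpdate(host, new ArrayBuffer) += taskIndex
    pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += taskIndex

    println(pendingTasksForExecutor) // exec-1 -> ArrayBuffer(0)
    println(pendingTasksForHost)     // host1  -> ArrayBuffer(0)
    println(pendingTasksForRack)     // rack-A -> ArrayBuffer(0)
  }
}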
Back to the locality-level check above:
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
    pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
  levels += PROCESS_LOCAL
}
The key is the third condition, pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_)). As explained above, pendingTasksForExecutor.keySet is the set of ids of executors that cache data blocks needed by some task, and sched.isExecutorAlive(_) checks whether the given executor id is currently active. So the whole line asks: among the executors that cache data needed by some task, is at least one still alive? If so, the PROCESS_LOCAL level is added to this TaskSet's locality levels.
The other levels follow the same logic and are not detailed here; the difference is only in what is checked, e.g. whether any of the hosts that hold data needed by some task is still alive, and so on.
At this point the locality levels contained in the TaskSet have been computed.
Delay scheduling strategy
When Spark runs on YARN there are effectively two layers of this kind of scheduling: the first layer is YARN trying to allocate Spark's executors onto the NodeManagers that hold the data. If data locality is not achieved at that layer, it can hardly be achieved at the Spark layer either.
The purpose of delay scheduling is to reduce network and IO overhead; the benefit is most visible when the data is large but the computation is simple, i.e. a task's execution time is shorter than the time it would take to transfer its data.
Spark always tries to launch each task at the highest locality level it can. When a task could be launched at level X but none of the nodes at that level has free resources, the scheduler does not immediately fall back to a lower level; within a time window it keeps trying to launch the task at level X, and only downgrades once the window has expired.
The TaskSetManager iterates over the available executors and tries to launch its pending tasks on them at one of the TaskSet's locality levels. So how does it decide whether a given task may start on a given executor? First, getAllowedLocalityLevel(curTime) computes the highest locality level currently allowed for the TaskSetManager's unscheduled tasks:
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Remove the scheduled or finished tasks lazily
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }
  // Walk through the list of tasks that can be scheduled at each location and returns true
  // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
  // already been scheduled.
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // The key could be executorId, host or rackId
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // This is a performance optimization: if there are no more tasks that can
      // be scheduled at a particular locality level, there is no point in waiting
      // for the locality wait timeout (SPARK-4939).
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality
      // wait timer doesn't immediately expire
      lastLaunchTime += localityWaits(currentLocalityIndex)
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
      currentLocalityIndex += 1
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}
In the loop condition, currentLocalityIndex is the index in myLocalityLevels of the level returned by the previous call to getAllowedLocalityLevel (initially 0), and myLocalityLevels is the array of locality levels contained in this TaskSetManager's tasks.
- Check whether the level myLocalityLevels(currentLocalityIndex) still has unscheduled tasks, via moreTasksToRunIn (the logic is simple: finished and currently running tasks are lazily removed from the corresponding list; if any never-run task remains, it returns true).
- If there are none, increment currentLocalityIndex and continue the loop (downgrade).
- If there are some, check whether the time since the last launch at this level exceeds the wait allowed for this level: if not, return the current level; if it does, increment currentLocalityIndex and continue the loop (downgrade).
This yields the highest locality level among the TaskSetManager's unscheduled tasks (the higher of this level and maxLocality is then taken as the final allowedLocality). The sketch below walks through the same downgrade logic with concrete numbers.
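A minimal, self-contained sketch of that downgrade walk; the level names and wait times are made up, and the lazy cleanup of finished tasks is omitted:

object DelayScheduleDemo {
  def main(args: Array[String]): Unit = {
    val levels = Array("PROCESS_LOCAL", "NODE_LOCAL", "ANY") // like myLocalityLevels
    val waits  = Array(3000L, 3000L, 0L)                     // like localityWaits, in ms
    var currentLocalityIndex = 0
    var lastLaunchTime = 0L

    def allowedLevel(curTime: Long, hasPendingTasksAt: Int => Boolean): String = {
      while (currentLocalityIndex < levels.length - 1) {
        if (!hasPendingTasksAt(currentLocalityIndex)) {
          // Nothing left at this level: skip it without waiting.
          lastLaunchTime = curTime
          currentLocalityIndex += 1
        } else if (curTime - lastLaunchTime >= waits(currentLocalityIndex)) {
          // Waited long enough at this level: downgrade.
          lastLaunchTime += waits(currentLocalityIndex)
          currentLocalityIndex += 1
        } else {
          return levels(currentLocalityIndex)
        }
      }
      levels(currentLocalityIndex)
    }

    println(allowedLevel(1000L, _ => true)) // PROCESS_LOCAL: still within the 3s wait
    println(allowedLevel(4000L, _ => true)) // NODE_LOCAL: the 3s PROCESS_LOCAL wait has expired
  }
}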
The final decision of whether a given task is launched on a given executor is made by dequeueTask(execId, host, allowedLocality):
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }
  ...
}
TaskLocality.isAllowed guarantees that a task is only launched at a locality level at least as good as (or equal to) allowedLocality; this works precisely because a task also carries every locality level worse than its best one. Together, this ensures each task is launched at the highest locality level possible.
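A quick check of what "allowed" means here, using the enumeration sketched at the beginning of this article:

// A better (or equal) level is always allowed under the constraint, a worse one is not.
TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.PROCESS_LOCAL) // true
TaskLocality.isAllowed(TaskLocality.NODE_LOCAL, TaskLocality.RACK_LOCAL)    // false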
Tuning advice
You can use the Spark UI to check the locality level of each task of a job, and adjust the data-locality wait times accordingly (a configuration example follows the list below):
- spark.locality.wait: global, applies to every locality level, defaults to 3s
- spark.locality.wait.process
- spark.locality.wait.node
- spark.locality.wait.rack
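As a rough illustration (the values are arbitrary, not recommendations), these waits can be set per level on a SparkConf, or equivalently with --conf on spark-submit:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("locality-tuning-demo")
  .set("spark.locality.wait", "6s")          // global default for every level
  .set("spark.locality.wait.process", "6s")  // wait before giving up PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait before giving up NODE_LOCAL
  .set("spark.locality.wait.rack", "1s")     // wait before giving up RACK_LOCAL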