-
Task Locality
Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together then computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data because code size is much smaller than data. Spark builds its scheduling around this general principle of data locality.
Data locality is how close data is to the code processing it. There are several levels of locality based on the data's current location. In order from closest to farthest:
PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible
NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
NO_PREF - data is accessed equally quickly from anywhere and has no locality preference
RACK_LOCAL - data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
ANY - data is elsewhere on the network and not in the same rack
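The locality level a task actually ran at is exposed through the task info in Spark's listener API. As a minimal sketch (the listener class name is ours), the following logs each finished task's locality level and host, which is one way to spot poor locality without the web UI:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class LocalityLoggingListener extends SparkListener {
  // called once per completed task; taskInfo carries the locality level it ran at
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    println(s"stage=${taskEnd.stageId} task=${info.taskId} " +
      s"locality=${info.taskLocality} host=${info.host}")
  }
}

// registration on an existing SparkContext:
// sc.addSparkListener(new LocalityLoggingListener())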
Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options:
a) wait until a busy CPU frees up to start a task on data on the same server, or
b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback between each level can be configured individually or all together in one parameter; see the spark.locality parameters on the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
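A minimal sketch of tuning those waits on the driver side; the values and the local master are illustrative only, not recommendations:

import org.apache.spark.{SparkConf, SparkContext}

object LocalityWaitDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("locality-wait-demo")
      .setMaster("local[*]")                     // placeholder master, just to make the sketch runnable
      .set("spark.locality.wait", "3s")          // global wait before falling back a locality level
      .set("spark.locality.wait.process", "3s")  // wait before giving up on PROCESS_LOCAL
      .set("spark.locality.wait.node", "6s")     // wait before giving up on NODE_LOCAL
      .set("spark.locality.wait.rack", "6s")     // wait before giving up on RACK_LOCAL
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).map(_ * 2).count()
    sc.stop()
  }
}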
-
1. Obtaining partition location information
[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
...
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id =>
/**
getPreferredLocs - obtains the location information of the partition's
data; how this information is resolved in different situations is
analyzed below.
**/
(id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
val job = s.activeJob.get
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
}
...
[DAGScheduler]->submitMissingTasks->getPreferredLocsInternal
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
...
/**
cacheLocs maintains the location information of each RDD's partitions as
TaskLocation instances.
If cacheLocs has no location for the current partition, the following
logic runs: if the RDD's storageLevel is NONE, Nil is returned and recorded
in cacheLocs; otherwise the blockManagerMaster is asked for the
blockManagers holding this partition, and ExecutorCacheTaskLocation
instances are created and put into cacheLocs.
See getCacheLocs for details. Blocks and caching are analyzed in the
Storage chapter.
**/
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
/**
RDD has a method preferredLocations, which first tries to read the
partition's checkpoint information; if the RDD has never been checkpointed
it calls getPreferredLocations(split), which each RDD implements
differently. For example, HadoopRDD derives the current partition's
location from its Hadoop InputSplit.
If the current RDD is neither cached nor an input RDD, fall through to the
next step.
**/
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
/**
When the RDD is neither cached nor an input RDD, i.e. no partition location
can be obtained via preferredLocations, recurse into the parent RDDs'
corresponding partitions to look for location information. This only works
across narrow dependencies.
**/
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
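The rdd.preferredLocations branch above is the hook an input RDD uses to expose where its data lives. As a minimal sketch (HostAwareRDD and HostPartition are our own illustrative names, and the host strings are placeholders), a custom RDD can advertise one preferred host per partition, which is exactly what getPreferredLocsInternal picks up when the partition is neither cached nor checkpointed:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// illustrative partition type carrying the host that holds its data
case class HostPartition(index: Int, host: String) extends Partition

class HostAwareRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => HostPartition(i, h) }.toArray[Partition]

  // the hook rdd.preferredLocations falls back to when the partition
  // has neither cache locations nor checkpoint data
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator(s"partition ${split.index} prefers ${split.asInstanceOf[HostPartition].host}")
}

// usage: new HostAwareRDD(sc, Seq("host-a", "host-b")).collect()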
-
2. Task construction
[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
...
/**
A different task type is constructed depending on the stage type.
Each partition maps to one task, and every task carries the location
information of its target partition; all tasks are finally submitted as a
TaskSet.
Note: the task's actual execution logic has already been serialized into
taskBinary and broadcast to every executor. The tasks constructed here only
add the location information so that the driver-side TaskScheduler can
schedule them; the TaskSet itself is neither serialized nor broadcast.
**/
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
...
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
...
new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary,
part, locs, id, properties, stage.latestInfo.taskMetrics)
}
}
}
...
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
/**
A TaskSet is constructed and submitted. In most cases the scheduler is
TaskSchedulerImpl: the DAGScheduler submits tasks through its instance, and
TaskSchedulerImpl manages the TaskSet through a TaskSetManager instance.
(While being instantiated, the TaskSetManager assigns tasks to executors;
there are exactly two placements, determined by the preferredLocation types
described above:
ExecutorCacheTaskLocation - the RDD partition is cached on an executor;
HDFSCacheTaskLocation - HDFS input data or checkpointed data.
See [TaskSetManager]->addPendingTask.)
Details are covered in the 'Spark scheduling' chapter.
**/
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
...
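To connect this with the two TaskLocation flavors named above, here is a minimal driver-side sketch contrasting preferences that come from HDFS block locations with preferences that come from cached blocks; the HDFS path is a placeholder and the master URL is assumed to come from spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

object CacheLocalityDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-locality-demo"))

    // preferred locations come from the HDFS block locations of each split
    val input = sc.textFile("hdfs:///path/to/input")

    val cached = input.map(_.toUpperCase).cache()
    cached.count()  // first action: tasks run near the HDFS blocks, output blocks are cached on executors
    cached.count()  // later actions: tasks prefer the executors holding the cached blocks
                    // (ExecutorCacheTaskLocation, typically PROCESS_LOCAL)

    sc.stop()
  }
}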
-
3. Task submission
[TaskSchedulerImpl]->def submitTasks(taskSet: TaskSet)
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " +
tasks.length + " tasks")
this.synchronized {
/**Schedules the tasks within a single TaskSet in the
TaskSchedulerImpl. This class keeps track of each task, retries
tasks if they fail (up to a limited number of times), and handles
locality-aware scheduling for this TaskSet via delay scheduling.
The main interfaces to it are resourceOffer, which asks the
TaskSet whether it wants to run a task on one node, and
statusUpdate, which tells it that one of its tasks changed state
(e.g. finished).
**/
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
...
/**
There are two implementations, corresponding to different task scheduling
algorithms (just like scheduling in an OS):
1. FIFOSchedulableBuilder
2. FairSchedulableBuilder
The schedulableBuilder holds a Pool that manages the TaskSetManagers and
returns them in a different order depending on the scheduling algorithm.
The pool's checkSpeculatableTasks method is also used to re-launch tasks of
jobs that have speculation enabled.
The actual operation here is to put the TaskSetManager into the pool for
asynchronous scheduling.
**/
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
/**
When running on YARN, the backend here is an instance of
CoarseGrainedSchedulerBackend. That backend, which runs on the driver side,
holds the information about all executors in the current job's execution
state and can manage them. This call eventually reaches backend.makeOffers,
which requests resources and triggers task scheduling.
**/
backend.reviveOffers()
}
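Since the schedulableBuilder comment above distinguishes FIFO from FAIR, here is a minimal sketch of switching to the fair scheduler; the pool name and the allocation file path are illustrative, and the master URL is assumed to come from spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

object FairSchedulingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fair-scheduling-demo")
      .set("spark.scheduler.mode", "FAIR")
      // pool weights and minShare can be defined in an XML file:
      // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    val sc = new SparkContext(conf)

    // jobs submitted from this thread are placed into the named pool
    sc.setLocalProperty("spark.scheduler.pool", "low_latency")
    sc.parallelize(1 to 100).count()

    sc.stop()
  }
}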
-
4. Task resource requests and scheduling
[CoarseGrainedSchedulerBackend]->private def makeOffers()
/**
This method is called every time the system's resources change, for example
when an executor registers with the backend or when a task finishes: the
executor sends a message to the backend, and the backend triggers one round
of task scheduling as the last step of its handling logic. All asynchronous
scheduling in Spark follows a similar pattern, e.g. executor scheduling in
standalone mode and the scheduling of waiting stages.
**/
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
/**
Collects each executor's currently available CPU resources, returned as the
case class WorkerOffer. The locality logic used in the subsequent task
assignment is based on this information.
**/
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
/**
launchTasks serializes each TaskDescription and sends it to the
corresponding executor for execution.
scheduler.resourceOffers(workOffers) performs the actual task assignment
logic.
**/
launchTasks(scheduler.resourceOffers(workOffers))
}
[TaskSchedulerImpl]->def resourceOffers(offers: Seq[WorkerOffer])
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]]
= synchronized {
var newExecAvail = false
/**
If any of the offering executors is newly added to the system,
executorAdded(o.executorId, o.host) eventually calls submitWaitingStages()
in the DAGScheduler to attempt another stage submission. Just like the task
scheduling described above, a scheduling round is triggered whenever system
resources change.
**/
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)
/**
Build a list of tasks to assign to each worker.
Tasks are assigned according to the number of cores available on each
executor; each core runs one task.
**/
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
/**
As described above, the scheduling algorithm (FIFO or FAIR) determines the
order of the task sets here, i.e. their final execution order.
**/
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
/**
When a new executor has been started, task locality is recomputed here;
see TaskSetManager.recomputeLocality.
**/
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
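The nested loop above is easier to see in isolation. Below is a self-contained toy model of the same control flow (none of these types are Spark's): for each task set in scheduling order, keep offering it at the current locality level until nothing more launches, then relax the level.

object DelaySchedulingModel {

  sealed trait Level
  case object ProcessLocal extends Level
  case object NodeLocal extends Level
  case object RackLocal extends Level
  case object AnyLevel extends Level

  // a stand-in for a TaskSetManager: it only remembers how many pending
  // tasks it could launch at each locality level
  final case class ToyTaskSet(name: String, var pending: Map[Level, Int]) {
    // returns true if a pending task tagged with this level could be launched
    def tryLaunch(maxLocality: Level): Boolean = {
      val n = pending.getOrElse(maxLocality, 0)
      if (n > 0) {
        pending = pending.updated(maxLocality, n - 1)
        println(s"$name: launched one task at $maxLocality")
        true
      } else {
        false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val levels = Seq(ProcessLocal, NodeLocal, RackLocal, AnyLevel)
    val sortedTaskSets = Seq(
      ToyTaskSet("stage-0", Map(ProcessLocal -> 2, AnyLevel -> 1)),
      ToyTaskSet("stage-1", Map(NodeLocal -> 1)))

    // mirrors: for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels)
    //            do { launchedTask = resourceOfferSingleTaskSet(...) } while (launchedTask)
    for (taskSet <- sortedTaskSets; maxLocality <- levels) {
      var launchedTask = true
      while (launchedTask) {
        launchedTask = taskSet.tryLaunch(maxLocality)
      }
    }
  }
}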