For the concepts of executor and task, refer to the official Spark documentation.
The source code used in this article is from Spark 2.0.0.
Number of Tasks
From the submitMissingTasks method of the DAGScheduler class, we can see that a stage creates one task for each partition that needs to be computed; in other words, each task processes exactly one partition.
//From submitMissingTasks
......
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations(id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
      }

    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        val p: Int = stage.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = taskIdToLocations(id)
        new ResultTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
      }
  }
}
......
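To see the one-task-per-partition behavior in practice, here is a minimal sketch, assuming a spark-shell session where sc is the SparkContext; the RDD and its partition count are made up for the example:

// A minimal sketch, assuming `sc` is the SparkContext of a spark-shell session.
// The RDD is created with 4 partitions, so the stage that computes it has 4
// entries in partitionsToCompute and therefore launches 4 tasks.
val rdd = sc.parallelize(1 to 100, numSlices = 4)
println(rdd.getNumPartitions) // 4

// Running an action submits the stage; the Spark UI (or a driver log line
// such as "Submitting 4 missing tasks from ResultStage ...") shows one task
// per partition.
rdd.count()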
Maximum Task Concurrency
Once tasks are submitted to an executor, the number of tasks that can run concurrently in that executor is determined by the executor's available CPU cores. In the resourceOfferSingleTaskSet method of the TaskSchedulerImpl class, CPUS_PER_TASK is defined as val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1), meaning that by default one task occupies one CPU core. So if an executor has 8 available CPU cores, it can run at most 8 tasks concurrently; if spark.task.cpus is set to 2, only 4 tasks can run at the same time.
//From resourceOfferSingleTaskSet
......
if (availableCpus(i) >= CPUS_PER_TASK) {
  try {
    for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
      tasks(i) += task
      val tid = task.taskId
      taskIdToTaskSetManager(tid) = taskSet
      taskIdToExecutorId(tid) = execId
      executorIdToTaskCount(execId) += 1
      executorsByHost(host) += execId
      availableCpus(i) -= CPUS_PER_TASK
      assert(availableCpus(i) >= 0)
      launchedTask = true
    }
  } catch {
    case e: TaskNotSerializableException =>
      logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
      // Do not offer resources for this task, but don't throw an error to allow other
      // task sets to be submitted.
      return launchedTask
  }
}
......
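As a rough illustration of the arithmetic, here is a sketch; the application name and configuration values are made up for the example, mirroring the 8-core, spark.task.cpus=2 scenario above:

import org.apache.spark.{SparkConf, SparkContext}

// A minimal sketch with illustrative values: each executor gets 8 cores and
// each task claims 2 of them, so at most 8 / 2 = 4 tasks run concurrently
// in one executor.
val conf = new SparkConf()
  .setAppName("task-concurrency-demo") // hypothetical app name
  .set("spark.executor.cores", "8")    // cores available to each executor
  .set("spark.task.cpus", "2")         // cores claimed by each task

val sc = new SparkContext(conf)

val coresPerExecutor = sc.getConf.getInt("spark.executor.cores", 1)
val cpusPerTask = sc.getConf.getInt("spark.task.cpus", 1)
println(s"Max concurrent tasks per executor: ${coresPerExecutor / cpusPerTask}") // 4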
The Difference Between Yarn Tasks and Spark Tasks
Launching a map task or a reduce task on a Yarn NodeManager node physically starts a JVM process, whereas a Spark task is a thread inside the Executor process.
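To observe the process-versus-thread difference, here is a minimal sketch, again assuming sc is an active SparkContext; it prints the name of the thread each task runs on:

// A minimal sketch, assuming `sc` is an active SparkContext.
// Each task prints the thread it runs on; on an executor these show up as
// "Executor task launch worker" threads inside a single JVM, rather than as
// separate JVM processes the way MapReduce map/reduce tasks do.
sc.parallelize(1 to 4, 4).foreach { i =>
  // In local mode the output appears on the driver console; on a cluster it
  // goes to the executor's stdout log.
  println(s"element $i runs on thread ${Thread.currentThread().getName}")
}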