After enabling Spark speculative execution, you may occasionally see a warning like the following while a job runs:
```
WARN TaskSetManager: Lost task 55.0 in stage 15.0 (TID 20815, spark047216)
org.apache.spark.executor.CommitDeniedException: attempt_201604191557_0015_m_000055_0: Not committed because the driver did not authorize commit
```
啟動(dòng) Speculative 后桶错,運(yùn)行較慢的task會(huì)在其他executor上同時(shí)再啟動(dòng)一個(gè)相同的task,如果其中一個(gè)task執(zhí)行完畢,相同的另一個(gè)task就會(huì)被禁止提交胀蛮。因此產(chǎn)生了這個(gè)WARN院刁。
So the WARN simply means the task's commit was rejected by the driver. Such a denial is deliberately not counted in the stage's failure statistics: these denials are expected under speculation, and counting them would produce misleading failure reports.
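For reference, speculation is controlled by the following settings (names are the standard Spark configuration keys; the non-boolean values shown are the usual defaults, which may vary slightly between Spark versions):

```properties
# spark-defaults.conf — speculation-related settings
spark.speculation            true    # off by default; enables re-launching slow tasks
spark.speculation.interval   100ms   # how often Spark checks for tasks to speculate
spark.speculation.multiplier 1.5     # a task is eligible when slower than 1.5x the median
spark.speculation.quantile   0.75    # fraction of tasks that must finish before speculation kicks in
```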
Related source code
In org.apache.spark.executor.Executor (the task runner's exception handler):
```scala
case cDE: CommitDeniedException =>
  val reason = cDE.toTaskEndReason
  execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
```
In org.apache.spark.executor.CommitDeniedException:
```scala
private[spark] class CommitDeniedException(
    msg: String,
    jobID: Int,
    splitID: Int,
    attemptNumber: Int)
  extends Exception(msg) {

  def toTaskEndReason: TaskEndReason = TaskCommitDenied(jobID, splitID, attemptNumber)
}
```
In org.apache.spark.TaskCommitDenied:
```scala
case class TaskCommitDenied(
    jobID: Int,
    partitionID: Int,
    attemptNumber: Int) extends TaskFailedReason {

  override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
    s" for job: $jobID, partition: $partitionID, attemptNumber: $attemptNumber"

  // A denied commit is expected under speculation, so it never counts as a task failure.
  override def countTowardsTaskFailures: Boolean = false
}
```
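The effect of `countTowardsTaskFailures = false` can be illustrated with a tiny standalone sketch (plain Scala, not Spark's actual scheduler code; the trait and class names merely mirror the snippets above): a reason whose flag is false never advances the failure counter that the scheduler compares against spark.task.maxFailures.

```scala
// Minimal stand-in types mirroring the Spark snippets above (hypothetical, for illustration).
sealed trait TaskFailedReason {
  def countTowardsTaskFailures: Boolean
}

case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptNumber: Int)
    extends TaskFailedReason {
  override def countTowardsTaskFailures: Boolean = false
}

case object ExceptionFailure extends TaskFailedReason {
  override def countTowardsTaskFailures: Boolean = true
}

object FailureCountDemo {
  def main(args: Array[String]): Unit = {
    var numFailures = 0
    val reasons: Seq[TaskFailedReason] =
      Seq(TaskCommitDenied(15, 55, 0), ExceptionFailure, TaskCommitDenied(15, 55, 1))

    for (reason <- reasons) {
      // Same guard idea the scheduler applies: only "real" failures are counted.
      if (reason.countTowardsTaskFailures) numFailures += 1
    }

    // Only the ExceptionFailure counts; both commit denials are ignored.
    println(s"counted failures: $numFailures")
  }
}
```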