Spark RDDs support two kinds of operations:
- transformations: create a new dataset from an existing one
- actions: run a computation on the dataset and return a value to the driver program (a quick sketch of the difference follows this list)
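A minimal local-mode sketch of the difference (the object name and app name are mine):
import org.apache.spark.{SparkConf, SparkContext}

object ForeachDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("foreach-demo").setMaster("local[*]"))
    val doubled = sc.parallelize(1 to 10).map(_ * 2) // transformation: only builds lineage, nothing runs yet
    doubled.foreach(println)                         // action: this is where sc.runJob gets called
    sc.stop()
  }
}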
Before diving into the code, it helps to first review the related Spark concepts, such as how stages are split.
Today let's walk through the Spark source starting from the foreach operator, defined in org.apache.spark.rdd.RDD (RDD.scala):
// Actions (launch a job to return a value to the user program)
/**
* Applies a function f to all elements of this RDD.
*/
def foreach(f: T => Unit): Unit = withScope {
//首先執(zhí)行清除閉包锡移,使得能夠被序列化發(fā)送到tasks上砸逊,如果設(shè)置了checkSerializable 侣滩,會(huì) 檢查是否可以序列化
val cleanF = sc.clean(f)
//對(duì)一個(gè)rdd的所有partitons執(zhí)行cleanF并將結(jié)果作為一個(gè)數(shù)組返回
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
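A quick aside on why the closure is cleaned: the function passed to foreach is serialized and shipped to executors, so everything it captures must be serializable. An illustrative class of my own (not from Spark):
class Printer {                      // note: not Serializable
  val prefix = ">> "

  def printAll(rdd: org.apache.spark.rdd.RDD[Int]): Unit = {
    // rdd.foreach(x => println(prefix + x)) would capture `this` (the whole Printer)
    // and fail with "Task not serializable" once the cleaned closure is checked.
    val p = prefix                   // copy the field into a local val
    rdd.foreach(x => println(p + x)) // now only the String `p` is captured
  }
}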
進(jìn)入SparkContext
再看下runJob
/**
* Run a job on all partitions in an RDD and return the results in an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length) // args: the RDD, the user function, and the full range of partition ids
}
這個(gè)函數(shù)執(zhí)行的runJob 入?yún)⒍嗔艘粋€(gè)TaskContext
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
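As the doc comment hints, this is how actions such as first() and take() avoid touching the whole RDD. A hedged usage sketch, assuming an existing SparkContext sc:
val nums = sc.parallelize(1 to 100, 4)
// compute only partition 0; the result array has one entry per requested partition
val onlyFirstPartition: Array[Array[Int]] =
  sc.runJob(nums, (it: Iterator[Int]) => it.toArray, Seq(0))
Next is the variant whose function also receives the TaskContext: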
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
* The function that is run against each partition additionally takes `TaskContext` argument.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @return in-memory collection with a result of the job (each collection element will contain
* a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
//添加了一個(gè)結(jié)果數(shù)組
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
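For comparison, RDD.collect() goes through exactly this path: each partition's iterator is materialized into an array and the per-partition arrays are concatenated on the driver. Quoted from memory, so treat it as a sketch:
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}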
下面這個(gè)開(kāi)始進(jìn)入比較核心的了org.apache.spark.scheduler.DAGScheduler
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like `first()`
* @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
// stopped is an AtomicBoolean; refuse to run if the SparkContext has already been shut down
throw new IllegalStateException("SparkContext has been shutdown")
}
// capture the user call site (the code location of the action), printed like: reduce at SparkAccumulatorTest.scala:43
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
//dagScheduluer 開(kāi)始執(zhí)行
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
//#進(jìn)度條相關(guān)瑞眼,標(biāo)記所有結(jié)束的stage
progressBar.foreach(_.finishAll())
//doCheckPoint 會(huì)被父RDD遞歸調(diào)用龙宏,通過(guò)保存的形式執(zhí)行checkpoint
rdd.doCheckpoint()
}
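Note that doCheckpoint() only does real work if the RDD was marked for checkpointing before the action. A minimal sketch (the directory path is made up):
sc.setCheckpointDir("/tmp/spark-checkpoints")     // hypothetical directory
val marked = sc.parallelize(1 to 100).map(_ + 1)
marked.checkpoint()   // only marks the RDD; nothing is written yet
marked.count()        // the job runs first, then runJob's rdd.doCheckpoint() writes the data
Now into DAGScheduler.runJob itself: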
/**
* Run an action job on the given RDD and pass all the results to the resultHandler function as
* they arrive.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @note Throws `Exception` when the job fails
*/
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) // submit the job and get back a JobWaiter
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
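The JobWaiter used above is essentially a Promise that the scheduler completes when the last task of the job finishes. A minimal sketch of the pattern (my own class, not Spark's):
import scala.concurrent.{Future, Promise}

class MiniJobWaiter(totalTasks: Int) {
  private val promise = Promise[Unit]()
  private var finishedTasks = 0

  def taskSucceeded(): Unit = synchronized {   // the scheduler calls this once per finished task
    finishedTasks += 1
    if (finishedTasks == totalTasks) promise.trySuccess(())
  }

  def jobFailed(exception: Exception): Unit = promise.tryFailure(exception)

  def completionFuture: Future[Unit] = promise.future  // what runJob blocks on via awaitReady
}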
繼續(xù)看submitJob
/**
* Submit an action job to the scheduler.
*
* @param rdd target RDD to run tasks on
* @param func a function to run on each partition of the RDD
* @param partitions set of partitions to run on; some jobs may not want to compute on all
* partitions of the target RDD, e.g. for operations like first()
* @param callSite where in the user program this job was called
* @param resultHandler callback to pass each result to
* @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*
* @return a JobWaiter object that can be used to block until the job finishes executing
* or can be used to cancel the job.
*
* @throws IllegalArgumentException when partitions ids are illegal
*/
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
//校驗(yàn)分區(qū)數(shù)是否合法
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
// atomically allocate the next job id (nextJobId is an AtomicInteger)
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
// neat trick: cast away the iterator's element type so the scheduler can treat the function generically
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
//創(chuàng)建一個(gè)jobwaiter徒像,等待DAGScheduler的調(diào)用結(jié)束花吟,當(dāng)一個(gè)job結(jié)束的時(shí)候回調(diào)用resultHandler進(jìn)行處理
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//然后創(chuàng)建一個(gè)JobSummmited的對(duì)象,并提交到eventProcessLoop
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
First, a look at eventProcessLoop, the event-dispatch mechanism. Four methods matter (a simplified sketch of the loop follows this list):
- onReceive: the entry point; runs doOnReceive and then stops the timerContext
- doOnReceive: the actual dispatch logic for each event type
- onError
- onStop
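Conceptually the loop is tiny: post() enqueues an event, and a single daemon thread drains the queue and dispatches to onReceive, so all scheduler state is mutated from one thread. A simplified sketch, loosely modeled on org.apache.spark.util.EventLoop with details trimmed:
import java.util.concurrent.LinkedBlockingQueue

abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = while (!stopped) {
      try onReceive(queue.take())                 // blocks until an event is posted
      catch {
        case _: InterruptedException =>           // stop() interrupts the blocked take()
        case t: Throwable => onError(t)
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt(); eventThread.join() }
  def post(event: E): Unit = queue.put(event)     // what DAGScheduler.submitJob calls

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
The real DAGSchedulerEventProcessLoop follows the same shape: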
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// a job was submitted
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
// a map stage was submitted on its own (via submitMapStage)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
// a stage was cancelled
case StageCancelled(stageId, reason) =>
dagScheduler.handleStageCancellation(stageId, reason)
// a job was cancelled
case JobCancelled(jobId, reason) =>
dagScheduler.handleJobCancellation(jobId, reason)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId, reason) =>
val filesLost = reason match {
case SlaveLost(_, true) => true
case _ => false
}
dagScheduler.handleExecutorLost(execId, filesLost)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
override def onError(e: Throwable): Unit = {
logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
try {
dagScheduler.doCancelAllJobs()
} catch {
case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
}
dagScheduler.sc.stopInNewThread()
}
override def onStop(): Unit = {
// Cancel any active jobs in postStop hook
dagScheduler.cleanUpAfterSchedulerStop()
}
}
Now the important part: DAGScheduler.handleJobSubmitted, which starts by calling createResultStage.
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
//根據(jù)當(dāng)前的Rdd和分區(qū) 創(chuàng)建Stage 這兒與spark 1.6.3不一樣 newResultStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
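To make the stage split concrete, here is what handleJobSubmitted ends up building for a classic word count (the input path is made up): reduceByKey contributes the only ShuffleDependency, so the job has one ShuffleMapStage feeding one ResultStage.
val counts = sc.textFile("hdfs:///tmp/words.txt")  // hypothetical input
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
counts.collect()
// finalStage  = ResultStage over the post-shuffle RDD
// its parent  = ShuffleMapStage covering textFile -> flatMap -> map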
org.apache.spark.scheduler.DAGScheduler#createResultStage
創(chuàng)建ResultStage,也就是job的最后一個(gè)Stage
/**
* Create a ResultStage associated with the provided jobId.
*/
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
//關(guān)鍵的地方:這里需要獲取或者創(chuàng)建父Stage,這里只是最后一個(gè)stage的父stage谬墙,其實(shí)就是為了構(gòu)建下面stage的parents的入?yún)⒔癫肌tage的入?yún)⒂腥缦聨讉€(gè),id拭抬,rdd部默,function,partition造虎,parrents傅蹂,jobid
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages
/**
* Get or create the list of parent stages for a given RDD. The new Stages will be created with
* the provided firstJobId.
*/
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
org.apache.spark.scheduler.DAGScheduler#getShuffleDependencies
返回第一個(gè)shuffer依賴的RDD的Dependency,就是找到最后一個(gè)stage的依賴
/**
* Returns shuffle dependencies that are immediate parents of the given RDD.
*
* This function will not return more distant ancestors. For example, if C has a shuffle
* dependency on B which has a shuffle dependency on A:
*
* A <-- B <-- C
*
* calling this function with rdd C will only return the B <-- C dependency.
*
* This function is scheduler-visible for the purpose of unit testing.
*/
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
//創(chuàng)建一個(gè)stack,如果是非shuffer依賴則push,shuffer依賴的話則pop份蝴,最后返回父shuffer依賴
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
// a shuffle dependency: record it in the parents set
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
// a narrow dependency: push its RDD and keep walking
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
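You can see the two branches of that match by inspecting dependencies yourself; a small sketch, again assuming an existing sc:
val pairs  = sc.parallelize(1 to 10).map(i => (i % 2, i))
val summed = pairs.reduceByKey(_ + _)

summed.dependencies.head  // ShuffleDependency  -> added to `parents`; the walk stops here
pairs.dependencies.head   // OneToOneDependency -> its rdd is pushed and the walk continues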
org.apache.spark.scheduler.DAGScheduler#getOrCreateShuffleMapStage
/**
* Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
* shuffle map stage doesn't already exist, this method will create the shuffle map stage in
* addition to any missing ancestor shuffle map stages.
*/
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
The doc comment sums it up: if a shuffle map stage for this shuffle already exists in shuffleIdToMapStage, return it; otherwise create one, together with any missing ancestor shuffle map stages. So what exactly is a ShuffleMapStage?
/**
* ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
* They occur right before each shuffle operation, and might contain multiple pipelined operations
* before that (e.g. map and filter). When executed, they save map output files that can later be
* fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
* and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
*
* ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
* For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
* there can be multiple ActiveJobs trying to compute the same shuffle map stage.
*/
In short, a ShuffleMapStage is an intermediate stage in the execution DAG that produces data for a shuffle. It sits right before a shuffle operation and may contain several pipelined operations (e.g. map and filter). When it runs, it saves map output files that are later fetched by reduce tasks.
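Because those map output files stick around, a second job over the same shuffled RDD does not recompute the map side: getOrCreateShuffleMapStage finds the stage in shuffleIdToMapStage and, with all outputs available, the stage shows up as "skipped" in the UI. A small sketch:
val counts = sc.parallelize(1 to 1000).map(i => (i % 10, 1)).reduceByKey(_ + _)
counts.count()    // job 1: the ShuffleMapStage runs and registers its map outputs
counts.collect()  // job 2: the existing ShuffleMapStage is reused and skipped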
org.apache.spark.scheduler.DAGScheduler#getMissingAncestorShuffleDependencies
This pushes every ancestor shuffle dependency that is not yet registered onto ancestors. The code is nearly identical to getShuffleDependencies above, except that it keeps walking backwards all the way to the first RDD; this is how the stage DAG gets built.
Take a lineage like the following (assume rdd0 is a pair RDD; reduceByKey introduces a shuffle, map does not):
val rdd1 = rdd0.reduceByKey(_ + _)
val rdd2 = rdd1.map(identity)
val rdd3 = rdd2.reduceByKey(_ + _)
rdd3.foreach(println)
The final stage sits on rdd3, and its only immediate shuffle dependency is the one whose map side is rdd2. getMissingAncestorShuffleDependencies(rdd2) then runs: rdd2 is pushed and popped; getShuffleDependencies(rdd2) walks through the narrow map dependency to rdd1 and returns the shuffle dependency on rdd0; that shuffle is not registered yet, so it is pushed onto ancestors and rdd0 is pushed for visiting; rdd0 has no further shuffle dependencies, so the loop ends with ancestors holding the rdd0 shuffle dependency.
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val ancestors = new Stack[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
getShuffleDependencies(toVisit).foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
ancestors.push(shuffleDep)
waitingForVisit.push(shuffleDep.rdd)
} // Otherwise, the dependency and its ancestors have already been registered.
}
}
}
ancestors
}
org.apache.spark.scheduler.DAGScheduler#createShuffleMapStage
For each ShuffleDependency popped off that stack, a stage is created. createShuffleMapStage builds a ShuffleMapStage that produces the given shuffle dependency's partitions; if a previously run stage already generated the same shuffle data, the still-available output locations are copied over so the data is not regenerated.
/**
* Creates a ShuffleMapStage that generates the given shuffle dependency's partitions. If a
* previously run stage generated the same shuffle data, this function will copy the output
* locations that are still available from the previous shuffle to avoid unnecessarily
* regenerating data.
*/
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
// the number of map tasks equals the number of partitions of the map-side RDD
val numTasks = rdd.partitions.length
//創(chuàng)建父stage
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
// A previously run stage generated partitions for this shuffle, so for each output
// that's still available, copy information about that output location to the new stage
// (so we don't unnecessarily re-compute that data).
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
(0 until locs.length).foreach { i =>
if (locs(i) ne null) {
// locs(i) will be null if missing
stage.addOutputLoc(i, locs(i))
}
}
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
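One detail worth remembering from the code above: numTasks is the map-side RDD's partition count, while the reduce side is sized by the shuffle's partitioner. A quick sketch:
val pairs  = sc.parallelize(1 to 100, 4).map(i => (i % 10, i))
val result = pairs.reduceByKey(_ + _, 8)
// ShuffleMapStage for this shuffle: numTasks = pairs.partitions.length = 4
// ResultStage on `result`:          8 partitions, i.e. 8 reduce-side tasks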