Spark內(nèi)部有兩大類操作,Transformation和Action叽粹;
Transformation又分窄依賴操作和寬依賴操作冀值,區(qū)分這兩種操作的很簡(jiǎn)單,RDD之間轉(zhuǎn)化過(guò)程中沒(méi)有shuffle的就是窄依賴杯道,有shuffle的就是寬依賴:
- RDD中數(shù)據(jù)是分partition的,每個(gè)partition分布在特定節(jié)點(diǎn)责蝠,shuffle就是指在RDD在轉(zhuǎn)化過(guò)程中党巾,一個(gè)partition中的數(shù)據(jù)需要被split成多個(gè)分片,傳入到下游RDD中的多個(gè)partition中去霜医,比如reduceByKey這類的操作齿拂,實(shí)際生產(chǎn)中,下游partition中的數(shù)據(jù)往往依賴于上游多個(gè)partition的數(shù)據(jù)肴敛,這樣就是會(huì)產(chǎn)生一個(gè)問(wèn)題署海,如果下游某個(gè)partition中的數(shù)據(jù)缺失吗购,需要重新計(jì)算上游多個(gè)partition的數(shù)據(jù),而重新計(jì)算的這些上游partition中又會(huì)同時(shí)含有下游缺失的數(shù)據(jù)partition和正常的partition砸狞,這就會(huì)造成計(jì)算的冗余捻勉;
- 與之相對(duì)的是窄依賴計(jì)算,上游一個(gè)或多個(gè)partition只對(duì)應(yīng)下游一個(gè)partition刀森,所以下游某個(gè)節(jié)點(diǎn)故障后踱启,某個(gè)partition缺失數(shù)據(jù),上游需要計(jì)算的所有partition不含有冗余計(jì)算研底,比如map埠偿,filter,union等等榜晦;
Spark數(shù)據(jù)集是RDD冠蒋,如果數(shù)據(jù)類型是Tuple2,還提供PariRDDFunctions的一些方法(是通過(guò)object RDD addToPariFuncitons隱式包含進(jìn)來(lái)的):
- 如果是窄依賴乾胶,比如map操作抖剿,生成
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF));
- 如果是寬依賴胚吁,比如reduceByKey操作牙躺,生成
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine);
其中aggregator定義了數(shù)據(jù)merge的規(guī)則,這個(gè)merge包括在map端(類似hadoop中的combiner腕扶,也就是partition內(nèi)部數(shù)據(jù)merge的規(guī)則)和reduce端(partition之間數(shù)據(jù)merge的規(guī)則)孽拷,這個(gè)merge可以是多個(gè)value組成更大的集合,例如groupByKey半抱,也可以是多個(gè)value合并計(jì)算出新的value脓恕,比如word count作業(yè)中的reduceByKey,根據(jù)業(yè)務(wù)邏輯而定窿侈; mapSideCombine是一個(gè)boolean炼幔,表示是否需要在map端進(jìn)行merge操作,比如reduceByKey是true史简,groupByKey是false乃秀;
首先來(lái)看一下MapPartitionRDD:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
override def clearDependencies() {
super.clearDependencies()
prev = null
}
}
其中partitioner(默認(rèn)HashPartitioner)定義如果存在shuffle,不同的key被shuffle到下游的那一個(gè)分片(partition)中去圆兵,對(duì)于MapPartitionsRDD跺讯,不存在這樣的情況;每個(gè)RDD都會(huì)維護(hù)自己的dependencies殉农,是一個(gè)Seq刀脏,這里的dependency可能是OneToOneDependency(一對(duì)一,例如map)超凳,可能是RangeDependency(例如union愈污,兩個(gè)RDD合并成一個(gè)耀态,但是partition不會(huì)發(fā)生merge,上游RDD多個(gè)partition會(huì)變成下游RDD的partition range)暂雹,也可能是ShuffleDependency首装;
firstParent這里就是dependencies中的第一個(gè),map操作擎析,上游只有一個(gè)RDD簿盅,也就是只有一個(gè)partition,compute操作很簡(jiǎn)單揍魂,通過(guò)split指定上游partition,對(duì)其執(zhí)行f函數(shù)棚瘟,返回的也是一個(gè)Iterator现斋,其中firstParent[T].iterator,如果沒(méi)有cache或checkpoint則也是一個(gè)compute實(shí)現(xiàn)偎蘸;
上面的結(jié)果就是RDD內(nèi)部層層套R(shí)DD庄蹋,最后計(jì)算(compute)的時(shí)候,由里到外迷雪,不斷的遍歷iterator限书,完成計(jì)算,這里直觀感覺(jué)上是遍歷多次章咧,但基于scala內(nèi)部的實(shí)現(xiàn)倦西,減少不必要的遍歷;
下面再看ShuffledRDD:
compute內(nèi)部通過(guò)shuffleManager獲取上游shuffle-write產(chǎn)生的數(shù)據(jù)赁严,根據(jù)split扰柠,返回iterator,并不包涵其他的函數(shù)計(jì)算:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context).read().asInstanceOf[Iterator[(K, C)]]
}
下面來(lái)看一下調(diào)用rdd.collect(Spark的Action操作)之后stage的劃分疼约,最后直接的處理函數(shù)是handleJobSubmitted卤档,
finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite);
Stage這里面涉及到兩種ResultStage和ShuffleMapStage程剥,每一個(gè)Stage都包含若干parent stages劝枣,對(duì)于這種一對(duì)一的RDD DAG 作業(yè),parent stages這個(gè)集合中织鲸,保存的都只是上游1個(gè)stage舔腾,是一個(gè)單鏈條;劃分Stage的方法很簡(jiǎn)單昙沦,就是通過(guò)ShuffleDependency來(lái)判斷琢唾;
stage都確定好之后,將stage轉(zhuǎn)化為task進(jìn)行計(jì)算盾饮,計(jì)算的條件就是一個(gè)stage的所有parent stges都已經(jīng)計(jì)算完成采桃,stage到task的邏輯是:submitMissingTasks;
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
首先得到需要計(jì)算哪些partition懒熙,然后根據(jù)ShuffleMapStage和ResultStage分別生成ShuffleMapTask和ResultTask,然后提交task:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, [stage.id](http://stage.id), stage.latestInfo.attemptId, jobId, properties))
CoarseGrainedExecutorBackend收到LaunchTask之后:
case LaunchTask(data) => if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
}
通過(guò)executor執(zhí)行task普办。
補(bǔ)充說(shuō)明一點(diǎn)工扎,整個(gè)RDD的DAG圖根據(jù)ShuffleDependency劃分出若干stage之后,或者是一個(gè)ShuffleMapStage衔蹲,或者是一個(gè)ResultStage肢娘,對(duì)于ResultStage很簡(jiǎn)單,從上游讀取數(shù)據(jù)舆驶,聚合之后就可以返回了橱健,對(duì)于ShuffleMapStage,他的目標(biāo)就是完成Shuffle-Write操作沙廉,這個(gè)實(shí)在ShuffleMapTask中的runTask完成的拘荡,而需要讀取的數(shù)據(jù)由上游的RDD的iterator提供,上游如果是普通的RDD撬陵,比如MapPartitionsRDD珊皿,直接讀取,內(nèi)部的compute函數(shù)完成兩個(gè)事情巨税,第一提供iterator蟋定,第二完成上游RDD的計(jì)算邏輯,即用函數(shù)對(duì)iterator操作草添,而這個(gè)提供數(shù)據(jù)iterator又回繼續(xù)遞歸調(diào)用更上游的RDD的compute驶兜,但是如果是ShuffledRDD,則不會(huì)遞歸到更上游果元,而是去reader促王,讀取上游數(shù)據(jù),返回iterator而晒,僅此而已蝇狼,reader的實(shí)現(xiàn)中是到driver拿到mapstatuses,里面包括block的executor位置信息倡怎,然后連接executor進(jìn)行讀取迅耘。