3.4 RDD的計算
3.4.1 Ta s k簡介
原始的RDD經(jīng)過一系列轉換后蝗柔,會在最后一個RDD上觸發(fā)一個動作硝清,這個動作會生成一個Job。在Job被劃分為一批計算任務(Task)后赠摇,這批Task會被提交到集群上的計算節(jié)點去計算芍瑞。計算節(jié)點執(zhí)行計算邏輯的部分稱為Executor。Executor在準備好Task的運行時環(huán)境后皮胡,會通過調用org.apache.spark.scheduler.Task#run來執(zhí)行計算痴颊。Spark的Task分為兩種:
1)org.apache.spark.scheduler.ShuffleMapTask
2)org.apache.spark.scheduler.ResultTask
簡單來說,DAG的最后一個階段會為每個結果的Partition生成一個ResultTask屡贺,其余所有的階段都會生成ShuffleMapTask蠢棱。生成的Task會被發(fā)送到已經(jīng)啟動的Executor上,由Executor來完成計算任務的執(zhí)行甩栈,執(zhí)行過程的實現(xiàn)在org.apache. spark.executor.Executor.TaskRunner#run泻仙。第6章會介紹這一部分的實現(xiàn)原理和設計思想。
3.4.2 Task的執(zhí)行起點
org.apache.spark.scheduler.Task#run會 調 用ShuffleMapTask或 者ResultTask的runTask量没;runTask會調用RDD的org.apache.spark.rdd.RDD#iterator玉转。計算由此開始。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if(storageLevel != StorageLevel.NONE) {
//如果存儲級別不是NONE殴蹄,那么先檢查是否有緩存究抓;沒有緩存則要進行計算
SparkEnv.get.cacheManager.getOrCompute(this, split, context,storageLevel)
} else {
//如果有checkpoint猾担,那么直接讀取結果;否則直接進行計算
computeOrReadCheckpoint(split, context)
}
}
其中刺下,SparkEnv中包含了一個運行時節(jié)點所需要的所有的環(huán)境信息绑嘹。cache-Manager是org.apache.spark.CacheManager,它負責調用BlockManager來管理RDD的緩存橘茉,如果當前RDD原來計算過并且把結果緩存起來工腋,那么接下來的運算都可以通過BlockManager來直接讀取緩存后返回。SparkEnv除了cacheManager畅卓,還包括以下重要的成員變量:
1)akka.actor.ActorSystem:運行在該節(jié)點的Actor System擅腰,其中運行在Driver上的名字是sparkDriver;運行在Executor上的是sparkExecutor翁潘。
2)org.apache.spark.serializer.Serializer:序列化和發(fā)序列化的工具惕鼓。
3)org.apache.spark.MapOutputTracker;保存Shuffle Map Task輸出的位置信息唐础。其中在Driver上的Tracer是org.apache.spark.MapOutputTrackerMaster;而在Executor上的Tracker是org.apache.spark.MapOutputTrackerWorker矾飞,它會從org.apache.spark. MapOutputTrackerMaster獲取信息一膨。
4)org.apache.spark.shuffle.ShuffleManager:Shuffle的管理者,其中Driver端會注冊Shuffle的信息洒沦,而Executor端會上報和獲取Shuffle的信息”鳎現(xiàn)階段內置支持Hash Based Shuffle和Sort Based Shuffle,具體實現(xiàn)細節(jié)請參閱第7章申眼。
5)org.apache.spark.broadcast.BroadcastManager:廣播變量的管理者瞒津。
6)org.apache.spark.network.BlockTransferService:Executor讀取Shuffle數(shù)據(jù)的Client。當前支持netty和nio括尸,可以通過spark.shuffle.blockTransferService來設置巷蚪。具體詳情可以參閱第7章。
7)org.apache.spark.storage.BlockManager:提供了Storage模塊與其他模塊的交互接口濒翻,管理Storage模塊屁柏。
8)org.apache.spark.SecurityManager:Spark對于認證授權的實現(xiàn)。
9)org.apache.spark.HttpFileServer:可以提供HTTP服務的Server有送。當前主要用于Executor端下載依賴淌喻。
10)org.apache.spark.metrics.MetricsSystem:用于搜集統(tǒng)計信息。
11)org.apache.spark.shuffle.ShuffleMemoryManager:管理Shuffle過程中使用的內存雀摘。ExternalAppendOnlyMap 和ExternalSorter都會從ShuffleMemoryManager中申請內存裸删,在數(shù)據(jù)spill到Disk后會釋放內存。當然了阵赠,當Task退出時這個內存也會被回收涯塔。為了使得每個thread都會比較公平地獲取內存資源肌稻,避免一個thread申請了大量內存后造成其他的thread需要頻繁地進行spill操作,它采取的內存分配策略是:對于N個thread伤塌,每個thread可以至少申請1/(2*N)的內存灯萍,但是至多申請1/N。這個N是動態(tài)變化的每聪,感興趣的讀者可以查閱這個類的具體實現(xiàn)旦棉。
在用戶創(chuàng)建org.apache.spark.SparkContext時會創(chuàng)建org.apache.spark.SparkEnv。
3.4.3 緩存的處理
如果存儲級別不是NONE药薯,那么先檢查是否有緩存绑洛;沒有緩存則要進行計算。什么是存儲級別童本?從用戶的角度來看就是緩存保存到不同的存儲位置真屯,比如內存、硬盤穷娱、Tachyon绑蔫;還有緩存的數(shù)據(jù)是否需要序列化等。詳細的存儲級別的介紹可以參閱第8章泵额。
cacheManager對Storage模塊進行了封裝配深,使得RDD可以更加簡單地從Storage模塊讀取或者寫入數(shù)據(jù)。RDD的每個Partition對應Storage模塊的一個Block嫁盲,只不過Block是Partition經(jīng)過處理后的數(shù)據(jù)篓叶。在系統(tǒng)實現(xiàn)的層面上,可以認為Partition和Block是一一對應的羞秤。cacheManager會通過getOrCompute來判斷當前的RDD是否需要進行計算缸托。
首先,cacheManager會通過RDD的ID和當前計算的Partition的ID向Storage模塊的BlockManager發(fā)起查詢請求瘾蛋,如果能夠獲得Block的信息俐镐,會直接返回Block的信息。否則哺哼,代表該RDD是需要計算的京革。這個RDD以前可能計算過并且被存儲到了內存中,但是后來由于內存緊張幸斥,這部分內存被清理了匹摇。在計算結束后,計算結果會根據(jù)用戶定義的存儲級別甲葬,寫入BlockManager中廊勃。這樣,下次就可以不經(jīng)過計算而直接讀取該RDD的計算結果了。核心實現(xiàn)邏輯如下:
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
//獲取RDD的BlockId
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match { //向BlockManager查詢是否有緩存
case Some(blockResult) => //緩存命中
//更新統(tǒng)計信息坡垫,將緩存作為結果返回
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
case None => //沒有緩存命中梭灿,需要計算
// 判斷當前是否有線程在處理當前的Partition,如果有那么等待它結束后冰悠,直接從Block
// Manager中讀取處理結果如果沒有線程在計算堡妒,那么storedValues就是None,否則
// 就是計算的結果
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) { // 已經(jīng)被其他線程處理了溉卓,直接返回結果
return new InterruptibleIterator[T](context, storedValues.get)
}
// 需要計算
try {
// 如果被checkpoint過皮迟,那么讀取checkpoint的數(shù)據(jù);否則調用rdd的compute()開始
? // 計算
? ? ? ? val computedValues = rdd.computeOrReadCheckpoint(partition,context)
// Task是在Driver端執(zhí)行的話就不需要緩存結果桑寨,這個主要是為了first() 或者take()
// 這種僅僅有一個執(zhí)行階段的任務的快速執(zhí)行伏尼。這類任務由于沒有Shuffle階段,直接運行
// 在Driver端可能會更省時間
if (context.isRunningLocally) {
return computedValues
}
// 將計算結果寫入到BlockManager
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues =
putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
// 更新任務的統(tǒng)計信息
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(
Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
// 如果有其他的線程在等待該Partition的處理結果尉尾,那么通知它們計算已經(jīng)完成爆阶,結果已
// 經(jīng)存到BlockManager中(注意前面那類不會寫入BlockManager的本地任務)
// loading.notifyAll()
}
}
}
}
3.4.4 checkpoint的處理
在緩存沒有命中的情況下,首先會判斷是否保存了RDD的checkpoint沙咏,如果有辨图,則讀取checkpoint。為了理解checkpoint的RDD是如何讀取計算結果的肢藐,需要先看一下checkpoint的數(shù)據(jù)是如何寫入的徒役。
首先在Job結束后,會判斷是否需要checkpoint窖壕。如果需要,就調用org.apache. spark.rdd.RDDCheckpointData#doCheckpoint杉女。doCheckpoint首先為數(shù)據(jù)創(chuàng)建一個目錄瞻讽;然后啟動一個新的Job來計算,并且將計算結果寫入新創(chuàng)建的目錄熏挎;接著創(chuàng)建一個org.apache.spark.rdd.CheckpointRDD速勇;最后,原始RDD的所有依賴被清除坎拐,這就意味著RDD的轉換的計算鏈(compute chain)等信息都被清除烦磁。這個處理邏輯中,數(shù)據(jù)寫入的實現(xiàn)在org.apache.spark.rdd.CheckpointRDD$#writeToFile哼勇。簡要的核心邏輯如下:
// 創(chuàng)建一個保存checkpoint數(shù)據(jù)的目錄
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
throw new SparkException("Failed to create checkpoint path " + path)
}
// 創(chuàng)建廣播變量
val broadcastedConf = rdd.context.broadcast(
new SerializableWritable(rdd.context.hadoopConfiguration))
//開始一個新的Job進行計算都伪,計算結果存入路徑path中
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
//根據(jù)結果的路徑path來創(chuàng)建CheckpointRDD
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
//保存結果,清除原始RDD的依賴积担、Partition信息等
RDDCheckpointData.synchronized {
cpFile = Some(path.toString)
cpRDD = Some(newRDD) // RDDCheckpointData對應的CheckpointRDD
rdd.markCheckpointed(newRDD)? ? ? // 清除原始RDD的依賴陨晶,Partition
cpState = Checkpointed? ? ? ? ? ? //標記checkpoint的狀態(tài)為完成
}
至此,RDD的checkpoint完成帝璧,其中checkpoint的數(shù)據(jù)可以通過checkpointRDD的readFromFile讀取先誉。但是湿刽,上述邏輯在清除了RDD的依賴后,并沒有和check-pointRDD建立聯(lián)系褐耳,那么Spark是如何確定一個RDD是否被checkpoint了诈闺,而且正確讀取checkpoint的數(shù)據(jù)呢?
答案就在org.apache.spark.rdd.RDD#dependencies的實現(xiàn)铃芦,它會首先判斷當前的RDD是否已經(jīng)Checkpoint過雅镊,如果有,那么RDD的依賴就變成了對應的CheckpointRDD:
privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)
final def dependencies: Seq[Dependency[_]] = {
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) { //沒有checkpoint
dependencies_ = getDependencies
}
dependencies_
}
}
理解了Checkpoint的實現(xiàn)過程杨帽,接下來看一下computeOrReadCheckpoint的實現(xiàn)漓穿。前面提到了,它一共在兩個地方被調用注盈,org.apache.spark.rdd.RDD#iterator和org.apache. spark.CacheManager#getOrCompute晃危。它實現(xiàn)的邏輯比較簡單,首先檢查當前RDD是否被Checkpoint過老客,如果有僚饭,讀取Checkpoint的數(shù)據(jù);否則開始計算胧砰。實現(xiàn)如下:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)
: Iterator[T] =
{
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
firstParent[T].iterator(split鳍鸵,context)會調用對應CheckpointRDD的iterator,最終調用到它的compute:
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, broadcastedConf, context) //讀取Checkpoint的數(shù)據(jù)
}
3.4.5 RDD的計算邏輯
RDD的計算邏輯在org.apache.spark.rdd.RDD#compute中實現(xiàn)尉间。每個特定的RDD都會實現(xiàn)compute偿乖。比如前面提到的CheckpointRDD的compute就是直接讀取checkpoint數(shù)據(jù)。HadoopRDD就是讀取指定Partition的數(shù)據(jù)哲嘲。MapPartitionsRDD就是將用戶的轉換邏輯作用到指定的Partition上贪薪。