3.4 RDD的計算

3.4 RDD的計算

3.4.1 Ta s k簡介

原始的RDD經(jīng)過一系列轉換后蝗柔,會在最后一個RDD上觸發(fā)一個動作硝清,這個動作會生成一個Job。在Job被劃分為一批計算任務(Task)后赠摇,這批Task會被提交到集群上的計算節(jié)點去計算芍瑞。計算節(jié)點執(zhí)行計算邏輯的部分稱為Executor。Executor在準備好Task的運行時環(huán)境后皮胡,會通過調用org.apache.spark.scheduler.Task#run來執(zhí)行計算痴颊。Spark的Task分為兩種:

1)org.apache.spark.scheduler.ShuffleMapTask

2)org.apache.spark.scheduler.ResultTask

簡單來說,DAG的最后一個階段會為每個結果的Partition生成一個ResultTask屡贺,其余所有的階段都會生成ShuffleMapTask蠢棱。生成的Task會被發(fā)送到已經(jīng)啟動的Executor上,由Executor來完成計算任務的執(zhí)行甩栈,執(zhí)行過程的實現(xiàn)在org.apache. spark.executor.Executor.TaskRunner#run泻仙。第6章會介紹這一部分的實現(xiàn)原理和設計思想。

3.4.2 Task的執(zhí)行起點

org.apache.spark.scheduler.Task#run會 調 用ShuffleMapTask或 者ResultTask的runTask量没;runTask會調用RDD的org.apache.spark.rdd.RDD#iterator玉转。計算由此開始。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {

if(storageLevel != StorageLevel.NONE) {

//如果存儲級別不是NONE殴蹄,那么先檢查是否有緩存究抓;沒有緩存則要進行計算

SparkEnv.get.cacheManager.getOrCompute(this, split, context,storageLevel)

} else {

//如果有checkpoint猾担,那么直接讀取結果;否則直接進行計算

computeOrReadCheckpoint(split, context)

}

}

其中刺下,SparkEnv中包含了一個運行時節(jié)點所需要的所有的環(huán)境信息绑嘹。cache-Manager是org.apache.spark.CacheManager,它負責調用BlockManager來管理RDD的緩存橘茉,如果當前RDD原來計算過并且把結果緩存起來工腋,那么接下來的運算都可以通過BlockManager來直接讀取緩存后返回。SparkEnv除了cacheManager畅卓,還包括以下重要的成員變量:

1)akka.actor.ActorSystem:運行在該節(jié)點的Actor System擅腰,其中運行在Driver上的名字是sparkDriver;運行在Executor上的是sparkExecutor翁潘。

2)org.apache.spark.serializer.Serializer:序列化和發(fā)序列化的工具惕鼓。

3)org.apache.spark.MapOutputTracker;保存Shuffle Map Task輸出的位置信息唐础。其中在Driver上的Tracer是org.apache.spark.MapOutputTrackerMaster;而在Executor上的Tracker是org.apache.spark.MapOutputTrackerWorker矾飞,它會從org.apache.spark. MapOutputTrackerMaster獲取信息一膨。

4)org.apache.spark.shuffle.ShuffleManager:Shuffle的管理者,其中Driver端會注冊Shuffle的信息洒沦,而Executor端會上報和獲取Shuffle的信息”鳎現(xiàn)階段內置支持Hash Based Shuffle和Sort Based Shuffle,具體實現(xiàn)細節(jié)請參閱第7章申眼。

5)org.apache.spark.broadcast.BroadcastManager:廣播變量的管理者瞒津。

6)org.apache.spark.network.BlockTransferService:Executor讀取Shuffle數(shù)據(jù)的Client。當前支持netty和nio括尸,可以通過spark.shuffle.blockTransferService來設置巷蚪。具體詳情可以參閱第7章。

7)org.apache.spark.storage.BlockManager:提供了Storage模塊與其他模塊的交互接口濒翻,管理Storage模塊屁柏。

8)org.apache.spark.SecurityManager:Spark對于認證授權的實現(xiàn)。

9)org.apache.spark.HttpFileServer:可以提供HTTP服務的Server有送。當前主要用于Executor端下載依賴淌喻。

10)org.apache.spark.metrics.MetricsSystem:用于搜集統(tǒng)計信息。

11)org.apache.spark.shuffle.ShuffleMemoryManager:管理Shuffle過程中使用的內存雀摘。ExternalAppendOnlyMap 和ExternalSorter都會從ShuffleMemoryManager中申請內存裸删,在數(shù)據(jù)spill到Disk后會釋放內存。當然了阵赠,當Task退出時這個內存也會被回收涯塔。為了使得每個thread都會比較公平地獲取內存資源肌稻,避免一個thread申請了大量內存后造成其他的thread需要頻繁地進行spill操作,它采取的內存分配策略是:對于N個thread伤塌,每個thread可以至少申請1/(2*N)的內存灯萍,但是至多申請1/N。這個N是動態(tài)變化的每聪,感興趣的讀者可以查閱這個類的具體實現(xiàn)旦棉。

在用戶創(chuàng)建org.apache.spark.SparkContext時會創(chuàng)建org.apache.spark.SparkEnv。

3.4.3 緩存的處理

如果存儲級別不是NONE药薯,那么先檢查是否有緩存绑洛;沒有緩存則要進行計算。什么是存儲級別童本?從用戶的角度來看就是緩存保存到不同的存儲位置真屯,比如內存、硬盤穷娱、Tachyon绑蔫;還有緩存的數(shù)據(jù)是否需要序列化等。詳細的存儲級別的介紹可以參閱第8章泵额。

cacheManager對Storage模塊進行了封裝配深,使得RDD可以更加簡單地從Storage模塊讀取或者寫入數(shù)據(jù)。RDD的每個Partition對應Storage模塊的一個Block嫁盲,只不過Block是Partition經(jīng)過處理后的數(shù)據(jù)篓叶。在系統(tǒng)實現(xiàn)的層面上,可以認為Partition和Block是一一對應的羞秤。cacheManager會通過getOrCompute來判斷當前的RDD是否需要進行計算缸托。

首先,cacheManager會通過RDD的ID和當前計算的Partition的ID向Storage模塊的BlockManager發(fā)起查詢請求瘾蛋,如果能夠獲得Block的信息俐镐,會直接返回Block的信息。否則哺哼,代表該RDD是需要計算的京革。這個RDD以前可能計算過并且被存儲到了內存中,但是后來由于內存緊張幸斥,這部分內存被清理了匹摇。在計算結束后,計算結果會根據(jù)用戶定義的存儲級別甲葬,寫入BlockManager中廊勃。這樣,下次就可以不經(jīng)過計算而直接讀取該RDD的計算結果了。核心實現(xiàn)邏輯如下:

def getOrCompute[T](

rdd: RDD[T],

partition: Partition,

context: TaskContext,

storageLevel: StorageLevel): Iterator[T] = {

//獲取RDD的BlockId

val key = RDDBlockId(rdd.id, partition.index)

logDebug(s"Looking for partition $key")

blockManager.get(key) match { //向BlockManager查詢是否有緩存

case Some(blockResult) => //緩存命中

//更新統(tǒng)計信息坡垫,將緩存作為結果返回

context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])


case None => //沒有緩存命中梭灿,需要計算

// 判斷當前是否有線程在處理當前的Partition,如果有那么等待它結束后冰悠,直接從Block

// Manager中讀取處理結果如果沒有線程在計算堡妒,那么storedValues就是None,否則

// 就是計算的結果

val storedValues = acquireLockForPartition[T](key)

if (storedValues.isDefined) { // 已經(jīng)被其他線程處理了溉卓,直接返回結果

return new InterruptibleIterator[T](context, storedValues.get)

}


// 需要計算

try {

// 如果被checkpoint過皮迟,那么讀取checkpoint的數(shù)據(jù);否則調用rdd的compute()開始


? // 計算

? ? ? ? val computedValues = rdd.computeOrReadCheckpoint(partition,context)

// Task是在Driver端執(zhí)行的話就不需要緩存結果桑寨,這個主要是為了first() 或者take()

// 這種僅僅有一個執(zhí)行階段的任務的快速執(zhí)行伏尼。這類任務由于沒有Shuffle階段,直接運行

// 在Driver端可能會更省時間

if (context.isRunningLocally) {

return computedValues


}

// 將計算結果寫入到BlockManager

val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

val cachedValues =

putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

// 更新任務的統(tǒng)計信息

val metrics = context.taskMetrics

val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(

Seq[(BlockId, BlockStatus)]())

metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)

new InterruptibleIterator(context, cachedValues)

} finally {

loading.synchronized {

loading.remove(key)

// 如果有其他的線程在等待該Partition的處理結果尉尾,那么通知它們計算已經(jīng)完成爆阶,結果已

// 經(jīng)存到BlockManager中(注意前面那類不會寫入BlockManager的本地任務)

// loading.notifyAll()

}

}

}

}

3.4.4 checkpoint的處理

在緩存沒有命中的情況下,首先會判斷是否保存了RDD的checkpoint沙咏,如果有辨图,則讀取checkpoint。為了理解checkpoint的RDD是如何讀取計算結果的肢藐,需要先看一下checkpoint的數(shù)據(jù)是如何寫入的徒役。

首先在Job結束后,會判斷是否需要checkpoint窖壕。如果需要,就調用org.apache. spark.rdd.RDDCheckpointData#doCheckpoint杉女。doCheckpoint首先為數(shù)據(jù)創(chuàng)建一個目錄瞻讽;然后啟動一個新的Job來計算,并且將計算結果寫入新創(chuàng)建的目錄熏挎;接著創(chuàng)建一個org.apache.spark.rdd.CheckpointRDD速勇;最后,原始RDD的所有依賴被清除坎拐,這就意味著RDD的轉換的計算鏈(compute chain)等信息都被清除烦磁。這個處理邏輯中,數(shù)據(jù)寫入的實現(xiàn)在org.apache.spark.rdd.CheckpointRDD$#writeToFile哼勇。簡要的核心邏輯如下:

// 創(chuàng)建一個保存checkpoint數(shù)據(jù)的目錄

val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)

val fs = path.getFileSystem(rdd.context.hadoopConfiguration)

if (!fs.mkdirs(path)) {

throw new SparkException("Failed to create checkpoint path " + path)

}


// 創(chuàng)建廣播變量

val broadcastedConf = rdd.context.broadcast(

new SerializableWritable(rdd.context.hadoopConfiguration))

//開始一個新的Job進行計算都伪,計算結果存入路徑path中

rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)

//根據(jù)結果的路徑path來創(chuàng)建CheckpointRDD

val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

//保存結果,清除原始RDD的依賴积担、Partition信息等

RDDCheckpointData.synchronized {

cpFile = Some(path.toString)

cpRDD = Some(newRDD) // RDDCheckpointData對應的CheckpointRDD

rdd.markCheckpointed(newRDD)? ? ? // 清除原始RDD的依賴陨晶,Partition

cpState = Checkpointed? ? ? ? ? ? //標記checkpoint的狀態(tài)為完成

}

至此,RDD的checkpoint完成帝璧,其中checkpoint的數(shù)據(jù)可以通過checkpointRDD的readFromFile讀取先誉。但是湿刽,上述邏輯在清除了RDD的依賴后,并沒有和check-pointRDD建立聯(lián)系褐耳,那么Spark是如何確定一個RDD是否被checkpoint了诈闺,而且正確讀取checkpoint的數(shù)據(jù)呢?

答案就在org.apache.spark.rdd.RDD#dependencies的實現(xiàn)铃芦,它會首先判斷當前的RDD是否已經(jīng)Checkpoint過雅镊,如果有,那么RDD的依賴就變成了對應的CheckpointRDD:

privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)

final def dependencies: Seq[Dependency[_]] = {

checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {

if (dependencies_ == null) { //沒有checkpoint

dependencies_ = getDependencies

}

dependencies_

}

}

理解了Checkpoint的實現(xiàn)過程杨帽,接下來看一下computeOrReadCheckpoint的實現(xiàn)漓穿。前面提到了,它一共在兩個地方被調用注盈,org.apache.spark.rdd.RDD#iterator和org.apache. spark.CacheManager#getOrCompute晃危。它實現(xiàn)的邏輯比較簡單,首先檢查當前RDD是否被Checkpoint過老客,如果有僚饭,讀取Checkpoint的數(shù)據(jù);否則開始計算胧砰。實現(xiàn)如下:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)

: Iterator[T] =

{

if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)

}

firstParent[T].iterator(split鳍鸵,context)會調用對應CheckpointRDD的iterator,最終調用到它的compute:

override def compute(split: Partition, context: TaskContext): Iterator[T] = {

val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))

CheckpointRDD.readFromFile(file, broadcastedConf, context) //讀取Checkpoint的數(shù)據(jù)

}

3.4.5 RDD的計算邏輯

RDD的計算邏輯在org.apache.spark.rdd.RDD#compute中實現(xiàn)尉间。每個特定的RDD都會實現(xiàn)compute偿乖。比如前面提到的CheckpointRDD的compute就是直接讀取checkpoint數(shù)據(jù)。HadoopRDD就是讀取指定Partition的數(shù)據(jù)哲嘲。MapPartitionsRDD就是將用戶的轉換邏輯作用到指定的Partition上贪薪。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市眠副,隨后出現(xiàn)的幾起案子画切,更是在濱河造成了極大的恐慌,老刑警劉巖囱怕,帶你破解...
    沈念sama閱讀 217,277評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件霍弹,死亡現(xiàn)場離奇詭異,居然都是意外死亡娃弓,警方通過查閱死者的電腦和手機典格,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,689評論 3 393
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來台丛,“玉大人钝计,你說我怎么就攤上這事。” “怎么了私恬?”我有些...
    開封第一講書人閱讀 163,624評論 0 353
  • 文/不壞的土叔 我叫張陵债沮,是天一觀的道長。 經(jīng)常有香客問我本鸣,道長疫衩,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,356評論 1 293
  • 正文 為了忘掉前任荣德,我火速辦了婚禮闷煤,結果婚禮上,老公的妹妹穿的比我還像新娘涮瞻。我一直安慰自己鲤拿,他們只是感情好,可當我...
    茶點故事閱讀 67,402評論 6 392
  • 文/花漫 我一把揭開白布署咽。 她就那樣靜靜地躺著近顷,像睡著了一般。 火紅的嫁衣襯著肌膚如雪宁否。 梳的紋絲不亂的頭發(fā)上窒升,一...
    開封第一講書人閱讀 51,292評論 1 301
  • 那天,我揣著相機與錄音慕匠,去河邊找鬼饱须。 笑死,一個胖子當著我的面吹牛台谊,可吹牛的內容都是我干的蓉媳。 我是一名探鬼主播,決...
    沈念sama閱讀 40,135評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼锅铅,長吁一口氣:“原來是場噩夢啊……” “哼酪呻!你這毒婦竟也來了?” 一聲冷哼從身側響起狠角,我...
    開封第一講書人閱讀 38,992評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎蚪腋,沒想到半個月后丰歌,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,429評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡屉凯,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,636評論 3 334
  • 正文 我和宋清朗相戀三年立帖,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片悠砚。...
    茶點故事閱讀 39,785評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡晓勇,死狀恐怖,靈堂內的尸體忽然破棺而出,到底是詐尸還是另有隱情绑咱,我是刑警寧澤绰筛,帶...
    沈念sama閱讀 35,492評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站描融,受9級特大地震影響铝噩,放射性物質發(fā)生泄漏。R本人自食惡果不足惜窿克,卻給世界環(huán)境...
    茶點故事閱讀 41,092評論 3 328
  • 文/蒙蒙 一骏庸、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧年叮,春花似錦具被、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,723評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至改执,卻和暖如春啸蜜,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背辈挂。 一陣腳步聲響...
    開封第一講書人閱讀 32,858評論 1 269
  • 我被黑心中介騙來泰國打工衬横, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人终蒂。 一個月前我還...
    沈念sama閱讀 47,891評論 2 370
  • 正文 我出身青樓蜂林,卻偏偏與公主長得像,于是被迫代替她去往敵國和親拇泣。 傳聞我的和親對象是個殘疾皇子噪叙,可洞房花燭夜當晚...
    茶點故事閱讀 44,713評論 2 354

推薦閱讀更多精彩內容