Spark詳解06容錯機制Cache 和 Checkpoint

Cache 和 Checkpoint

作為區(qū)別于 Hadoop 的一個重要 feature,cache 機制保證了需要訪問重復數(shù)據(jù)的應(yīng)用(如迭代型算法和交互式應(yīng)用)可以運行的更快喉誊。與 Hadoop MapReduce job 不同的是 Spark 的邏輯/物理執(zhí)行圖可能很龐大轨奄,task 中 computing
chain 可能會很長,計算某些 RDD 也可能會很耗時初澎。這時,如果 task 中途運行出錯,那么 task 的整個 computing chain 需要重算侦啸,代價太高。因此丧枪,有必要將計算代價較大的 RDD checkpoint 一下光涂,這樣,當下游 RDD 計算出錯時拧烦,可以直接從 checkpoint 過的 RDD 那里讀取數(shù)據(jù)繼續(xù)算忘闻。

Cache 機制

回到 Overview 提到的 GroupByTest 的例子,里面對 FlatMappedRDD 進行了 cache恋博,這樣 Job 1 在執(zhí)行時就直接從 FlatMappedRDD 開始算了齐佳。可見 cache 能夠讓重復數(shù)據(jù)在同一個 application 中的 jobs 間共享债沮。

邏輯執(zhí)行圖:

JobRDD

物理執(zhí)行圖:

PhysicalView.png

問題:哪些 RDD 需要 cache炼吴?

會被重復使用的(但不能太大)。

問題:用戶怎么設(shè)定哪些 RDD 要 cache疫衩?

因為用戶只與 driver program 打交道硅蹦,因此只能用 rdd.cache() 去 cache 用戶能看到的 RDD。所謂能看到指的是調(diào)用 transformation() 后生成的 RDD,而某些在 transformation() 中 Spark 自己生成的 RDD 是不能被用戶直接 cache 的童芹,比如 reduceByKey() 中會生成的 ShuffledRDD涮瞻、MapPartitionsRDD 是不能被用戶直接 cache 的。

問題:driver program 設(shè)定 rdd.cache() 后假褪,系統(tǒng)怎么對 RDD 進行 cache署咽?

先不看實現(xiàn),自己來想象一下如何完成 cache:當 task 計算得到 RDD 的某個 partition 的第一個 record 后嗜价,就去判斷該 RDD 是否要被 cache艇抠,如果要被 cache 的話,將這個 record 及后續(xù)計算的到的 records 直接丟給本地 blockManager 的 memoryStore久锥,如果 memoryStore 存不下就交給 diskStore 存放到磁盤家淤。

實際實現(xiàn)與設(shè)想的基本類似,區(qū)別在于:將要計算 RDD partition 的時候(而不是已經(jīng)計算得到第一個 record 的時候)就去判斷 partition 要不要被 cache瑟由。如果要被 cache 的話絮重,先將 partition 計算出來,然后 cache 到內(nèi)存歹苦。cache 只使用 memory青伤,寫磁盤的話那就叫 checkpoint 了。

調(diào)用 rdd.cache() 后殴瘦, rdd 就變成 persistRDD 了狠角,其 StorageLevel 為 MEMORY_ONLY。persistRDD 會告知 driver 說自己是需要被 persist 的蚪腋。

cache.png

如果用代碼表示:

rdd.iterator()
=> SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
=> key = RDDBlockId(rdd.id, split.index)
=> blockManager.get(key)
=> computedValues = rdd.computeOrReadCheckpoint(split, context)
      if (isCheckpointed) firstParent[T].iterator(split, context) 
      else compute(split, context)
=> elements = new ArrayBuffer[Any]
=> elements ++= computedValues
=> updatedBlocks = blockManager.put(key, elements, tellMaster = true)    

當 rdd.iterator() 被調(diào)用的時候丰歌,也就是要計算該 rdd 中某個 partition 的時候,會先去 cacheManager 那里領(lǐng)取一個 blockId屉凯,表明是要存哪個 RDD 的哪個 partition立帖,這個 blockId 類型是 RDDBlockId(memoryStore 里面可能還存放有 task 的 result 等數(shù)據(jù),因此 blockId 的類型是用來區(qū)分不同的數(shù)據(jù))悠砚。然后去 blockManager 里面查看該 partition 是不是已經(jīng)被 checkpoint 了晓勇,如果是,表明以前運行過該 task灌旧,那就不用計算該 partition 了绑咱,直接從 checkpoint 中讀取該 partition 的所有 records 放到叫做 elements 的 ArrayBuffer 里面。如果沒有被 checkpoint 過枢泰,先將 partition 計算出來羡玛,然后將其所有 records 放到 elements 里面。最后將 elements 交給 blockManager 進行 cache宗苍。

blockManager 將 elements(也就是 partition) 存放到 memoryStore 管理的 LinkedHashMap[BlockId, Entry] 里面。如果 partition 大于 memoryStore 的存儲極限(默認是 60% 的 heap),那么直接返回說存不下讳窟。如果剩余空間也許能放下让歼,會先 drop 掉一些早先被 cached 的 RDD 的 partition,為新來的 partition 騰地方丽啡,如果騰出的地方夠谋右,就把新來的 partition 放到 LinkedHashMap 里面,騰不出就返回說存不下补箍。注意 drop 的時候不會去 drop 與新來的 partition 同屬于一個 RDD 的 partition改执。drop 的時候先 drop 最早被 cache 的 partition。(說好的 LRU 替換算法呢坑雅?)

問題:cached RDD 怎么被讀缺补摇?

下次計算(一般是同一 application 的下一個 job 計算)時如果用到 cached RDD裹粤,task 會直接去 blockManager 的 memoryStore 中讀取终蒂。具體地講,當要計算某個 rdd 中的 partition 時候(通過調(diào)用 rdd.iterator())會先去 blockManager 里面查找是否已經(jīng)被 cache 了遥诉,如果 partition 被 cache 在本地拇泣,就直接使用 blockManager.getLocal() 去本地 memoryStore 里讀取。如果該 partition 被其他節(jié)點上 blockManager cache 了矮锈,會通過 blockManager.getRemote() 去其他節(jié)點上讀取霉翔,讀取過程如下圖。

cacheRead.png

獲取 cached partitions 的存儲位置:partition 被 cache 后所在節(jié)點上的 blockManager 會通知 driver 上的 blockMangerMasterActor 說某 rdd 的 partition 已經(jīng)被我 cache 了苞笨,這個信息會存儲在 blockMangerMasterActor 的 blockLocations: HashMap中债朵。等到 task 執(zhí)行需要 cached rdd 的時候,會調(diào)用 blockManagerMaster 的 getLocations(blockId) 去詢問某 partition 的存儲位置猫缭,這個詢問信息會發(fā)到 driver 那里葱弟,driver 查詢 blockLocations 獲得位置信息并將信息送回。

讀取其他節(jié)點上的 cached partition:task 得到 cached partition 的位置信息后猜丹,將 GetBlock(blockId) 的請求通過 connectionManager 發(fā)送到目標節(jié)點芝加。目標節(jié)點收到請求后從本地 blockManager 那里的 memoryStore 讀取 cached partition,最后發(fā)送回來射窒。

Checkpoint

問題:哪些 RDD 需要 checkpoint藏杖?

運算時間很長或運算量太大才能得到的 RDD,computing chain 過長或依賴其他 RDD 很多的 RDD脉顿。
實際上蝌麸,將 ShuffleMapTask 的輸出結(jié)果存放到本地磁盤也算是 checkpoint,只不過這個 checkpoint 的主要目的是去 partition 輸出數(shù)據(jù)艾疟。

問題:什么時候 checkpoint来吩?

cache 機制是每計算出一個要 cache 的 partition 就直接將其 cache 到內(nèi)存了敢辩。但 checkpoint 沒有使用這種第一次計算得到就存儲的方法,而是等到 job 結(jié)束后另外啟動專門的 job 去完成 checkpoint 弟疆。也就是說需要 checkpoint 的 RDD 會被計算兩次戚长。因此,在使用 rdd.checkpoint() 的時候怠苔,建議加上 rdd.cache()同廉,這樣第二次運行的 job 就不用再去計算該 rdd 了,直接讀取 cache 寫磁盤柑司。其實 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法迫肖,相當于 cache 到磁盤上,這樣可以做到 rdd 第一次被計算得到時就存儲到磁盤上攒驰,但這個 persist 和 checkpoint 有很多不同蟆湖,之后會討論。

問題:checkpoint 怎么實現(xiàn)讼育?

RDD 需要經(jīng)過 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 這幾個階段才能被 checkpoint帐姻。

Initialized: 首先 driver program 需要使用 rdd.checkpoint() 去設(shè)定哪些 rdd 需要 checkpoint,設(shè)定后奶段,該 rdd 就接受 RDDCheckpointData 管理饥瓷。用戶還要設(shè)定 checkpoint 的存儲路徑,一般在 HDFS 上痹籍。

marked for checkpointing:初始化后呢铆,RDDCheckpointData 會將 rdd 標記為 MarkedForCheckpoint。

checkpointing in progress:每個 job 運行結(jié)束后會調(diào)用 finalRdd.doCheckpoint()蹲缠,finalRdd 會順著 computing chain 回溯掃描棺克,碰到要 checkpoint 的 RDD 就將其標記為 CheckpointingInProgress,然后將寫磁盤(比如寫 HDFS)需要的配置文件(如 core-site.xml 等)broadcast 到其他 worker 節(jié)點上的 blockManager线定。完成以后娜谊,啟動一個 job 來完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)))。

checkpointed:job 完成 checkpoint 后斤讥,將該 rdd 的 dependency 全部清掉纱皆,并設(shè)定該 rdd 狀態(tài)為 checkpointed。然后芭商,為該 rdd 強加一個依賴派草,設(shè)置該 rdd 的 parent rdd 為 CheckpointRDD,該 CheckpointRDD 負責以后讀取在文件系統(tǒng)上的 checkpoint 文件铛楣,生成該 rdd 的 partition近迁。

有意思的是我在 driver program 里 checkpoint 了兩個 rdd,結(jié)果只有一個(下面的 result)被 checkpoint 成功簸州,pairs2 沒有被 checkpoint鉴竭,也不知道是 bug 還是故意只 checkpoint 下游的 RDD:

val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), 
    (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
    
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint

val result = pairs1.join(pairs2)
result.checkpoint

問題:怎么讀取 checkpoint 過的 RDD歧譬?

在 runJob() 的時候會先調(diào)用 finalRDD 的 partitions() 來確定最后會有多個 task。rdd.partitions() 會去檢查(通過 RDDCheckpointData 去檢查拓瞪,因為它負責管理被 checkpoint 過的 rdd)該 rdd 是會否被 checkpoint 過了缴罗,如果該 rdd 已經(jīng)被 checkpoint 過了,直接返回該 rdd 的 partitions 也就是 Array[Partition]祭埂。

當調(diào)用 rdd.iterator() 去計算該 rdd 的 partition 的時候,會調(diào)用 computeOrReadCheckpoint(split: Partition) 去查看該 rdd 是否被 checkpoint 過了兵钮,如果是蛆橡,就調(diào)用該 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),CheckpointRDD 負責讀取文件系統(tǒng)上的文件掘譬,生成該 rdd 的 partition泰演。這就解釋了為什么那么 trickly 地為 checkpointed rdd 添加一個 parent CheckpointRDD。

問題:cache 與 checkpoint 的區(qū)別葱轩?

關(guān)于這個問題睦焕,Tathagata Das 有一段回答: There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk(其實只有 memory). But the lineage(也就是 computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).

深入一點討論,rdd.persist(StorageLevel.DISK_ONLY) 與 checkpoint 也有區(qū)別靴拱。前者雖然可以將 RDD 的 partition 持久化到磁盤垃喊,但該 partition 由 blockManager 管理。一旦 driver program 執(zhí)行結(jié)束袜炕,也就是 executor 所在進程 CoarseGrainedExecutorBackend stop本谜,blockManager 也會 stop,被 cache 到磁盤上的 RDD 也會被清空(整個 blockManager 使用的 local 文件夾被刪除)偎窘。而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾乌助,如果不被手動 remove 掉(話說怎么 remove checkpoint 過的 RDD?)陌知,是一直存在的他托,也就是說可以被下一個 driver program 使用,而 cached RDD 不能被其他 dirver program 使用仆葡。

Discussion

Hadoop MapReduce 在執(zhí)行 job 的時候赏参,不停地做持久化,每個 task 運行結(jié)束做一次浙芙,每個 job 運行結(jié)束做一次(寫到 HDFS)登刺。在 task 運行過程中也不停地在內(nèi)存和磁盤間 swap 來 swap 去。 可是諷刺的是嗡呼,Hadoop 中的 task 太傻纸俭,中途出錯需要完全重新運行,比如 shuffle 了一半的數(shù)據(jù)存放到了磁盤南窗,下次重新運行時仍然要重新 shuffle揍很。Spark 好的一點在于盡量不去持久化郎楼,所以使用 pipeline,cache 等機制窒悔。用戶如果感覺 job 可能會出錯可以手動去 checkpoint 一些 critical 的 RDD呜袁,job 如果出錯,下次運行時直接從 checkpoint 中讀取數(shù)據(jù)简珠。唯一不足的是阶界,checkpoint 需要兩次運行 job。

Example

貌似還沒有發(fā)現(xiàn)官方給出的 checkpoint 的例子聋庵,這里我寫了一個:

package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object groupByKeyTest {

   def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf) 
    sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")
     
    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
                                     (3, 'c'), (4, 'd'),
                                     (5, 'e'), (3, 'f'),
                                     (2, 'g'), (1, 'h')
                                    )                               
    val pairs = sc.parallelize(data, 3)
    
    pairs.checkpoint
    pairs.count
    
    val result = pairs.groupByKey(2)

    result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))
    
    println(result.toDebugString)
   }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末膘融,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子祭玉,更是在濱河造成了極大的恐慌氧映,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,122評論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件脱货,死亡現(xiàn)場離奇詭異岛都,居然都是意外死亡,警方通過查閱死者的電腦和手機振峻,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,070評論 3 395
  • 文/潘曉璐 我一進店門臼疫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人铺韧,你說我怎么就攤上這事多矮。” “怎么了哈打?”我有些...
    開封第一講書人閱讀 164,491評論 0 354
  • 文/不壞的土叔 我叫張陵塔逃,是天一觀的道長。 經(jīng)常有香客問我料仗,道長湾盗,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,636評論 1 293
  • 正文 為了忘掉前任立轧,我火速辦了婚禮格粪,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘氛改。我一直安慰自己帐萎,他們只是感情好,可當我...
    茶點故事閱讀 67,676評論 6 392
  • 文/花漫 我一把揭開白布胜卤。 她就那樣靜靜地躺著疆导,像睡著了一般。 火紅的嫁衣襯著肌膚如雪葛躏。 梳的紋絲不亂的頭發(fā)上澈段,一...
    開封第一講書人閱讀 51,541評論 1 305
  • 那天悠菜,我揣著相機與錄音,去河邊找鬼败富。 笑死悔醋,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的兽叮。 我是一名探鬼主播芬骄,決...
    沈念sama閱讀 40,292評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼充择!你這毒婦竟也來了德玫?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,211評論 0 276
  • 序言:老撾萬榮一對情侶失蹤椎麦,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后材彪,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體观挎,經(jīng)...
    沈念sama閱讀 45,655評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,846評論 3 336
  • 正文 我和宋清朗相戀三年段化,在試婚紗的時候發(fā)現(xiàn)自己被綠了嘁捷。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,965評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡显熏,死狀恐怖雄嚣,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情喘蟆,我是刑警寧澤缓升,帶...
    沈念sama閱讀 35,684評論 5 347
  • 正文 年R本政府宣布,位于F島的核電站蕴轨,受9級特大地震影響港谊,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜橙弱,卻給世界環(huán)境...
    茶點故事閱讀 41,295評論 3 329
  • 文/蒙蒙 一歧寺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧棘脐,春花似錦斜筐、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,894評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至内斯,卻和暖如春蕴潦,著一層夾襖步出監(jiān)牢的瞬間像啼,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,012評論 1 269
  • 我被黑心中介騙來泰國打工潭苞, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留忽冻,地道東北人。 一個月前我還...
    沈念sama閱讀 48,126評論 3 370
  • 正文 我出身青樓此疹,卻偏偏與公主長得像僧诚,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蝗碎,可洞房花燭夜當晚...
    茶點故事閱讀 44,914評論 2 355

推薦閱讀更多精彩內(nèi)容