1. Background
當(dāng)我們需要多次使用同一個 RDD 時泥技,如果簡單的調(diào)用 Action 操作钧汹,Spark 每次都會重算 RDD 以及它所有的依賴聊疲,此時需要用到持久化技術(shù)济榨。
2. Basic
2.1 持久化級別
Storage Level | Meaning |
---|---|
MEMORY_ONLY | 未序列化只保存在內(nèi)存克婶,如果內(nèi)存裝不下,那么一部分分區(qū)數(shù)據(jù)將在他們被需要的時候重新計算术吗。該級別是默認(rèn)持久化級別 |
MEMORY_AND_DISK | 未序列化先嘗試保存在內(nèi)存宠进,如果內(nèi)存裝不下,那么一部分分區(qū)數(shù)據(jù)保存在磁盤 |
MEMORY_ONLY_SER | MEMORY_ONLY 的序列化版 |
MEMORY_AND_DISK_SER | MEMORY_AND_DISK 的序列化版 |
DISK_ONLY | 只保存在磁盤 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 帶副本的版本 |
2.2 cache() & persist()
def cache() = persist()
def persist() = persist(StorageLevel.MEMORY_ONLY)
根據(jù)二者的函數(shù)實現(xiàn)可以看出藐翎,cache 底層就是調(diào)用的 persist 的無參重載函數(shù),而無參重載函數(shù)的持久化級別是MEMORY_ONLY实幕。日常使用中直接調(diào)用 cache 即可
2.3 cache() 的使用
scala> val rdd = sc.makeRDD(Array("rdd"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[47] at makeRDD at <console>:24
scala> val noCache=rdd.map(_+System.currentTimeMillis())
noCache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[48] at map at <console>:26
scala> noCache.collect
res33: Array[String] = Array(rdd1600019753783)
scala> noCache.collect
res34: Array[String] = Array(rdd1600019756380)
scala> noCache.collect
res35: Array[String] = Array(rdd1600019759584)
scala> val useCache=noCache.cache
useCache: noCache.type = MapPartitionsRDD[48] at map at <console>:26
scala> useCache.collect
res36: Array[String] = Array(rdd1600019789962)
scala> useCache.collect
res37: Array[String] = Array(rdd1600019789962)
scala> useCache.collect
res38: Array[String] = Array(rdd1600019789962)
從上面的代碼中可以看到吝镣,在未使用 cache 前,每次的時間戳都是重新計算的昆庇。對 RDD 使用了 cache 后末贾,則時間戳被保存了下來
3. Deep
3.1 任何時候都應(yīng)該使用 cache 嗎?
既然 cache 的默認(rèn)持久化級別是 MEMORY_ONLY整吆,內(nèi)存裝不下時拱撵,RDD 的部分分區(qū)不進(jìn)行緩存。那是否意味著任何時候都應(yīng)該對 RDD 進(jìn)行 cache 操作呢表蝙?答案顯然是不應(yīng)該的拴测。因為內(nèi)存資源是有限的,歸根結(jié)底府蛇,cache 操作是在重新計算 RDD 的計算代價與緩存 RDD 的內(nèi)存資源之間進(jìn)行權(quán)衡集索。如果內(nèi)存資源充足,但是 cpu 計算資源緊張汇跨,那么可以多進(jìn)行 cache 操作务荆。反之則要謹(jǐn)慎 cache
3.2 DISK_ONLY 持久化級別有什么意義?
除了絕大多數(shù)時候?qū)?RDD 使用的 cache 操作外穷遂,我們可以通過 persist 自己手動指定持久化級別函匕。持久化到內(nèi)存中是很容易理解的,和緩存概念類似蚪黑。但是 DISK_ONLY 很反直覺的將 RDD 持久化到磁盤盅惜。這樣做是否有意義呢中剩?如果理解了前面所說的 cache 操作是在重新計算 RDD 的計算代價與緩存 RDD 的內(nèi)存資源之間進(jìn)行權(quán)衡的話,這里也就很好理解了酷窥。雖然磁盤資源可以認(rèn)為是永遠(yuǎn)充足的咽安,但是從磁盤讀取是有 I/O 代價的,所以 DISK_ONLY 持久化級別就是在重新計算 RDD 的計算代價與從磁盤讀取 RDD 的 I/O 代價之間進(jìn)行權(quán)衡蓬推。換句話說如果 cpu 和內(nèi)存資源不充足妆棒,并且計算該 RDD 耗時很久,可以考慮將持久化級別設(shè)置為 DISK_ONLY沸伏。
3.3 cache & checkpoint
3.3.1 checkpoint 注釋:
Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
directory set withSparkContext#setCheckpointDir
and all references to its parent
RDDs will be removed. This function must be called before any job has been
executed on this RDD. It is strongly recommended that this RDD is persisted in
memory, otherwise saving it on a file will require recomputation.
從注釋中可以總結(jié)出如下幾個要點:
- RDD 將被保存在事先通過 setCheckpointDir 設(shè)置的路徑下
- RDD 的血緣關(guān)系將被切斷
- 必須在所有 action 前調(diào)用
- 最好將使用 checkpoint 的 RDD 持久化在內(nèi)存中糕珊,否則 checkpoint 時將觸發(fā)重復(fù)計算
3.3.2 代碼演示:
scala> val rdd = sc.makeRDD(Array("rdd"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24
scala> val ckRdd = rdd.map(_+System.currentTimeMillis())
ckRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:26
scala> sc.setCheckpointDir("./ck")
scala> ckRdd.checkpoint
scala> ckRdd.toDebugString
res3: String =
(4) MapPartitionsRDD[1] at map at <console>:26 []
| ParallelCollectionRDD[0] at makeRDD at <console>:24 []
scala> ckRdd.collect
res4: Array[String] = Array(rdd1600026196973)
此時查看 WebUI:
可以看到對 checkpoint 的 RDD 首次 collect 時,觸發(fā)了兩個 job毅糟。第一個 job 用來返回結(jié)果红选,第二個 job 重新計算并持久化到磁盤。
scala> ckRdd.collect
res5: Array[String] = Array(rdd1600026197161)
scala> ckRdd.collect
res6: Array[String] = Array(rdd1600026197161)
scala> ckRdd.collect
res6: Array[String] = Array(rdd1600026197161)
scala> ckRdd.toDebugString
res7: String =
(4) MapPartitionsRDD[1] at map at <console>:26 []
| ReliableCheckpointRDD[2] at collect at <console>:29 []
scala> val cacheRdd = rdd.map(_+System.currentTimeMillis())
cacheRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:26
scala> cacheRdd.cache
res8: cacheRdd.type = MapPartitionsRDD[3] at map at <console>:26
scala> cacheRdd.collect
res10: Array[String] = Array(rdd1600026882262)
scala> cacheRdd.toDebugString
res11: String =
(4) MapPartitionsRDD[3] at map at <console>:26 [Memory Deserialized 1x Replicated]
| CachedPartitions: 4; MemorySize: 144.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at makeRDD at <console>:24 [Memory Deserialized 1x Replicated]
通過演示可以看到姆另,checkpoint 后的 RDD 第二次 collect 的結(jié)果和第一次不同喇肋,時間戳要大一些,驗證了第一次調(diào)用 collect 時觸發(fā)了兩個 job 迹辐,且使用的是觸發(fā)的第二個 job持久化到磁盤的結(jié)果蝶防。通過調(diào)用 toDebugString 可以看到,checkpointRDD 切斷了血緣關(guān)系只保留了一個ReliableCheckpointRDD
明吩,作為對比 cache RDD 則在中間加了一層 CachedPartitions
间学,保留了完整的血緣關(guān)系。
接下來看看在使用 checkpoint 前調(diào)用 action 算子是什么效果:
scala> val rdd = sc.makeRDD(Array("rdd"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at makeRDD at <console>:24
scala> val ckRdd = rdd.map(_+System.currentTimeMillis())
ckRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:26
scala> sc.setCheckpointDir("./ck")
scala> ckRdd.collect
res13: Array[String] = Array(rdd1600027380204)
scala> ckRdd.checkpoint
scala> ckRdd.collect
res16: Array[String] = Array(rdd1600027397304)
scala> ckRdd.collect
res17: Array[String] = Array(rdd1600027399110)
scala> ckRdd.collect
res18: Array[String] = Array(rdd1600027399948)
可以看到印荔,如果在調(diào)用 checkpoint 前調(diào)用了 action 算子低葫,將會導(dǎo)致 checkpoint 失效。
3.3.3 cache & checkpoint 對比總結(jié):
- cache():MEMORY_ONLY for RDD,MEMORY_AND_DISK for Dataset仍律;checkpoint需要手動指定一個外部文件系統(tǒng)作為檢查點嘿悬,保存在磁盤
- cache 在執(zhí)行job過程中直接保存中間結(jié)果;checkpoint 會另啟一個 job 重新計算
- cache 可以在 action 后調(diào)用水泉;checkpoint 必須在 action 前調(diào)用鹊漠,否則失效
- cache 不會切斷血緣關(guān)系(在中間添加一層 cacheRDD );checkpoint 會切斷血緣關(guān)系
- checkpoint 一般可以保存系統(tǒng)狀態(tài)茶行,用于故障恢復(fù)躯概;cache 不具備這個特性
3.3.4 最佳實踐
對 RDD 先調(diào)用 cache 保存在內(nèi)存,再調(diào)用 checkpoint 持久化到磁盤畔师,checkpoint 雖然會觸發(fā)第二個 job 來持久化到磁盤娶靡,但是因為 cache 在了內(nèi)存中,所以直接從內(nèi)存中取看锉,避免了重復(fù)計算姿锭。