Spark RDD 之持久化

1. Background

當(dāng)我們需要多次使用同一個 RDD 時泥技,如果簡單的調(diào)用 Action 操作钧汹,Spark 每次都會重算 RDD 以及它所有的依賴聊疲,此時需要用到持久化技術(shù)济榨。

2. Basic

2.1 持久化級別

Storage Level Meaning
MEMORY_ONLY 未序列化只保存在內(nèi)存克婶,如果內(nèi)存裝不下,那么一部分分區(qū)數(shù)據(jù)將在他們被需要的時候重新計算术吗。該級別是默認(rèn)持久化級別
MEMORY_AND_DISK 未序列化先嘗試保存在內(nèi)存宠进,如果內(nèi)存裝不下,那么一部分分區(qū)數(shù)據(jù)保存在磁盤
MEMORY_ONLY_SER MEMORY_ONLY 的序列化版
MEMORY_AND_DISK_SER MEMORY_AND_DISK 的序列化版
DISK_ONLY 只保存在磁盤
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 帶副本的版本

2.2 cache() & persist()

def cache() = persist()

def persist() = persist(StorageLevel.MEMORY_ONLY)

根據(jù)二者的函數(shù)實現(xiàn)可以看出藐翎,cache 底層就是調(diào)用的 persist 的無參重載函數(shù),而無參重載函數(shù)的持久化級別是MEMORY_ONLY实幕。日常使用中直接調(diào)用 cache 即可

2.3 cache() 的使用

scala> val rdd = sc.makeRDD(Array("rdd"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[47] at makeRDD at <console>:24

scala> val noCache=rdd.map(_+System.currentTimeMillis())
noCache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[48] at map at <console>:26

scala> noCache.collect
res33: Array[String] = Array(rdd1600019753783)

scala> noCache.collect
res34: Array[String] = Array(rdd1600019756380)

scala> noCache.collect
res35: Array[String] = Array(rdd1600019759584)

scala> val useCache=noCache.cache
useCache: noCache.type = MapPartitionsRDD[48] at map at <console>:26

scala> useCache.collect
res36: Array[String] = Array(rdd1600019789962)

scala> useCache.collect
res37: Array[String] = Array(rdd1600019789962)

scala> useCache.collect
res38: Array[String] = Array(rdd1600019789962)

從上面的代碼中可以看到吝镣,在未使用 cache 前,每次的時間戳都是重新計算的昆庇。對 RDD 使用了 cache 后末贾,則時間戳被保存了下來

3. Deep

3.1 任何時候都應(yīng)該使用 cache 嗎?

既然 cache 的默認(rèn)持久化級別是 MEMORY_ONLY整吆,內(nèi)存裝不下時拱撵,RDD 的部分分區(qū)不進(jìn)行緩存。那是否意味著任何時候都應(yīng)該對 RDD 進(jìn)行 cache 操作呢表蝙?答案顯然是不應(yīng)該的拴测。因為內(nèi)存資源是有限的,歸根結(jié)底府蛇,cache 操作是在重新計算 RDD 的計算代價與緩存 RDD 的內(nèi)存資源之間進(jìn)行權(quán)衡集索。如果內(nèi)存資源充足,但是 cpu 計算資源緊張汇跨,那么可以多進(jìn)行 cache 操作务荆。反之則要謹(jǐn)慎 cache

3.2 DISK_ONLY 持久化級別有什么意義?

除了絕大多數(shù)時候?qū)?RDD 使用的 cache 操作外穷遂,我們可以通過 persist 自己手動指定持久化級別函匕。持久化到內(nèi)存中是很容易理解的,和緩存概念類似蚪黑。但是 DISK_ONLY 很反直覺的將 RDD 持久化到磁盤盅惜。這樣做是否有意義呢中剩?如果理解了前面所說的 cache 操作是在重新計算 RDD 的計算代價與緩存 RDD 的內(nèi)存資源之間進(jìn)行權(quán)衡的話,這里也就很好理解了酷窥。雖然磁盤資源可以認(rèn)為是永遠(yuǎn)充足的咽安,但是從磁盤讀取是有 I/O 代價的,所以 DISK_ONLY 持久化級別就是在重新計算 RDD 的計算代價與從磁盤讀取 RDD 的 I/O 代價之間進(jìn)行權(quán)衡蓬推。換句話說如果 cpu 和內(nèi)存資源不充足妆棒,并且計算該 RDD 耗時很久,可以考慮將持久化級別設(shè)置為 DISK_ONLY沸伏。

3.3 cache & checkpoint

3.3.1 checkpoint 注釋:

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
directory set with SparkContext#setCheckpointDir and all references to its parent
RDDs will be removed. This function must be called before any job has been
executed on this RDD. It is strongly recommended that this RDD is persisted in
memory, otherwise saving it on a file will require recomputation.

從注釋中可以總結(jié)出如下幾個要點:

  1. RDD 將被保存在事先通過 setCheckpointDir 設(shè)置的路徑下
  2. RDD 的血緣關(guān)系將被切斷
  3. 必須在所有 action 前調(diào)用
  4. 最好將使用 checkpoint 的 RDD 持久化在內(nèi)存中糕珊,否則 checkpoint 時將觸發(fā)重復(fù)計算

3.3.2 代碼演示:

scala> val rdd = sc.makeRDD(Array("rdd"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> val ckRdd = rdd.map(_+System.currentTimeMillis())
ckRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at map at <console>:26

scala> sc.setCheckpointDir("./ck")

scala> ckRdd.checkpoint

scala> ckRdd.toDebugString
res3: String =
(4) MapPartitionsRDD[1] at map at <console>:26 []
 |  ParallelCollectionRDD[0] at makeRDD at <console>:24 []

scala> ckRdd.collect
res4: Array[String] = Array(rdd1600026196973)

此時查看 WebUI:

ckRdd.collect 觸發(fā)的 job

可以看到對 checkpoint 的 RDD 首次 collect 時,觸發(fā)了兩個 job毅糟。第一個 job 用來返回結(jié)果红选,第二個 job 重新計算并持久化到磁盤。

scala> ckRdd.collect
res5: Array[String] = Array(rdd1600026197161)

scala> ckRdd.collect
res6: Array[String] = Array(rdd1600026197161)

scala> ckRdd.collect
res6: Array[String] = Array(rdd1600026197161)

scala> ckRdd.toDebugString
res7: String =
(4) MapPartitionsRDD[1] at map at <console>:26 []
 |  ReliableCheckpointRDD[2] at collect at <console>:29 []

scala> val cacheRdd = rdd.map(_+System.currentTimeMillis())
cacheRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:26

scala> cacheRdd.cache
res8: cacheRdd.type = MapPartitionsRDD[3] at map at <console>:26

scala> cacheRdd.collect
res10: Array[String] = Array(rdd1600026882262)

scala> cacheRdd.toDebugString
res11: String =
(4) MapPartitionsRDD[3] at map at <console>:26 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 4; MemorySize: 144.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[0] at makeRDD at <console>:24 [Memory Deserialized 1x Replicated]

通過演示可以看到姆另,checkpoint 后的 RDD 第二次 collect 的結(jié)果和第一次不同喇肋,時間戳要大一些,驗證了第一次調(diào)用 collect 時觸發(fā)了兩個 job 迹辐,且使用的是觸發(fā)的第二個 job持久化到磁盤的結(jié)果蝶防。通過調(diào)用 toDebugString 可以看到,checkpointRDD 切斷了血緣關(guān)系只保留了一個ReliableCheckpointRDD明吩,作為對比 cache RDD 則在中間加了一層 CachedPartitions间学,保留了完整的血緣關(guān)系。

接下來看看在使用 checkpoint 前調(diào)用 action 算子是什么效果:

scala> val rdd = sc.makeRDD(Array("rdd"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at makeRDD at <console>:24

scala> val ckRdd = rdd.map(_+System.currentTimeMillis())
ckRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:26

scala> sc.setCheckpointDir("./ck")

scala> ckRdd.collect
res13: Array[String] = Array(rdd1600027380204)

scala> ckRdd.checkpoint

scala> ckRdd.collect
res16: Array[String] = Array(rdd1600027397304)

scala> ckRdd.collect
res17: Array[String] = Array(rdd1600027399110)

scala> ckRdd.collect
res18: Array[String] = Array(rdd1600027399948)

可以看到印荔,如果在調(diào)用 checkpoint 前調(diào)用了 action 算子低葫,將會導(dǎo)致 checkpoint 失效。

3.3.3 cache & checkpoint 對比總結(jié):

  1. cache():MEMORY_ONLY for RDD,MEMORY_AND_DISK for Dataset仍律;checkpoint需要手動指定一個外部文件系統(tǒng)作為檢查點嘿悬,保存在磁盤
  2. cache 在執(zhí)行job過程中直接保存中間結(jié)果;checkpoint 會另啟一個 job 重新計算
  3. cache 可以在 action 后調(diào)用水泉;checkpoint 必須在 action 前調(diào)用鹊漠,否則失效
  4. cache 不會切斷血緣關(guān)系(在中間添加一層 cacheRDD );checkpoint 會切斷血緣關(guān)系
  5. checkpoint 一般可以保存系統(tǒng)狀態(tài)茶行,用于故障恢復(fù)躯概;cache 不具備這個特性

3.3.4 最佳實踐

對 RDD 先調(diào)用 cache 保存在內(nèi)存,再調(diào)用 checkpoint 持久化到磁盤畔师,checkpoint 雖然會觸發(fā)第二個 job 來持久化到磁盤娶靡,但是因為 cache 在了內(nèi)存中,所以直接從內(nèi)存中取看锉,避免了重復(fù)計算姿锭。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末塔鳍,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子呻此,更是在濱河造成了極大的恐慌轮纫,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,692評論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件焚鲜,死亡現(xiàn)場離奇詭異掌唾,居然都是意外死亡,警方通過查閱死者的電腦和手機忿磅,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,482評論 3 392
  • 文/潘曉璐 我一進(jìn)店門糯彬,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人葱她,你說我怎么就攤上這事撩扒。” “怎么了吨些?”我有些...
    開封第一講書人閱讀 162,995評論 0 353
  • 文/不壞的土叔 我叫張陵搓谆,是天一觀的道長。 經(jīng)常有香客問我豪墅,道長挽拔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,223評論 1 292
  • 正文 為了忘掉前任但校,我火速辦了婚禮,結(jié)果婚禮上啡氢,老公的妹妹穿的比我還像新娘状囱。我一直安慰自己,他們只是感情好倘是,可當(dāng)我...
    茶點故事閱讀 67,245評論 6 388
  • 文/花漫 我一把揭開白布亭枷。 她就那樣靜靜地躺著,像睡著了一般搀崭。 火紅的嫁衣襯著肌膚如雪叨粘。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,208評論 1 299
  • 那天瘤睹,我揣著相機與錄音升敲,去河邊找鬼。 笑死轰传,一個胖子當(dāng)著我的面吹牛驴党,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播获茬,決...
    沈念sama閱讀 40,091評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼港庄,長吁一口氣:“原來是場噩夢啊……” “哼倔既!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起鹏氧,我...
    開封第一講書人閱讀 38,929評論 0 274
  • 序言:老撾萬榮一對情侶失蹤渤涌,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后把还,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體实蓬,經(jīng)...
    沈念sama閱讀 45,346評論 1 311
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,570評論 2 333
  • 正文 我和宋清朗相戀三年笨篷,在試婚紗的時候發(fā)現(xiàn)自己被綠了瞳秽。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,739評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡率翅,死狀恐怖练俐,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情冕臭,我是刑警寧澤腺晾,帶...
    沈念sama閱讀 35,437評論 5 344
  • 正文 年R本政府宣布,位于F島的核電站辜贵,受9級特大地震影響悯蝉,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜托慨,卻給世界環(huán)境...
    茶點故事閱讀 41,037評論 3 326
  • 文/蒙蒙 一鼻由、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧厚棵,春花似錦蕉世、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,677評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至彬犯,卻和暖如春向楼,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背谐区。 一陣腳步聲響...
    開封第一講書人閱讀 32,833評論 1 269
  • 我被黑心中介騙來泰國打工湖蜕, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宋列。 一個月前我還...
    沈念sama閱讀 47,760評論 2 369
  • 正文 我出身青樓重荠,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子戈鲁,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,647評論 2 354