(1) persist算子
- 使用方法:
var rdd = sc.textFile("test")
rdd = rdd.persist(StorageLevel.MEMORY_ONLY)
val count = rdd.count() //或者其他操作
- StorageLevel說明:
StorageLevel的構造函數(shù):
class StorageLevel private(
private var _useDisk: Boolean, # 是否存入磁盤
private var _useMemory: Boolean, # 是否存入內存
private var _useOffHeap: Boolean, # 是否使用堆外內存
private var _deserialized: Boolean, # 是否不進行序列化
private var _replication: Int = 1 # 副本數(shù)(默認為1))
StorageLevel object中已經定義了幾種代表RDD持久化的級別:
image.png
使用不同參數(shù)的組合構造的實例被預先定義為一些值苫费,比如MEMORY_ONLY代表著不存入磁盤染厅,存入內存,不使用堆外內存,不進行序列化,副本數(shù)為1,使用persisit()方法時把這些持久化的級別作為參數(shù)傳入即可。
(2) cache算子
cache() = persist(StorageLevel.MEMORY_ONLY)
(3) checkpoint算子:可以把RDD持久化到HDFS
使用方法:
使用方法:
sc.setCheckpointDir("hdfs://...")
var rdd = sc.textFile("test")
rdd.checkpoint()
val count = rdd.count() //或者其他操作
checkpoint()執(zhí)行原理:
- 當RDD的job執(zhí)行完畢后,會從finalRDD從后往前回溯
- 當回溯到調用了checkpoint()方法的RDD后蹦疑,會給這個RDD做一個標記
- Spark框架自動啟動一個新的job,計算這個RDD的數(shù)據(jù)互墓,然后把數(shù)據(jù)持久化到HDFS上
- 優(yōu)化:對某個RDD執(zhí)行checkpoint()之前必尼,對該RDD執(zhí)行cache(),這樣的話篡撵,新啟動的job只需要把內存中的數(shù)據(jù)上傳到HDFS中即可判莉,不需要重新計算。
(4) 關于這3個算子的幾點說明
- 這3個算子都是Transformations類算子育谬,需要Actions類算子觸發(fā)才能執(zhí)行
- cache 和 persist 算子的返回執(zhí)行必須賦值給一個變量券盅,在接下來的job中直接使用這個變量,那么就是使用了持久化的數(shù)據(jù)了膛檀,如果application中只有一個job锰镀,沒有必要使用RDD持久化
- cache 和 persist 算子后不能立即緊跟action類算子,比如count算子咖刃,但是在下一行可以有action類算子
error :
cache().count()
right :
rdd = rdd.cache()
rdd.count()
- checkpoint()算子執(zhí)行后就切斷了RDD之間的依賴
當業(yè)務邏輯很復雜時泳炉,RDD之間頻繁轉換,RDD的血統(tǒng)很長嚎杨,如果中間某個RDD的數(shù)據(jù)丟失花鹅,還需要重新從頭計算,如果對中間某個RDD調用了checkpoint()方法枫浙,把這個RDD上傳到HDFS刨肃,同時讓后面的RDD不再依賴于這個RDD,而是依賴于HDFS上的數(shù)據(jù)箩帚,那么下次計算會方便很多真友。 - checkpoint持久化到磁盤和persist持久化到磁盤的區(qū)別
- persist()把RDD持久化到磁盤,這個RDD的持久化數(shù)據(jù)保存在Worker的工作目錄下紧帕,且當整個application執(zhí)行結束后盔然,就會自動刪除持久化的數(shù)據(jù)
- checkpoint()持久化到指定的目錄,可以是HDFS是嗜,而且永久保存