1给梅、RDD 持久化
Spark 中一個很重要的能力是將數據持久化(或稱為緩存)假丧,在多個操作間都可以訪問這些持久化的數據。當持久化一個 RDD 時破喻,每個節(jié)點的其它分區(qū)都可以使用 RDD 在內存中進行計算虎谢,在該數據上的其他 action 操作將直接使用內存中的數據盟榴。這樣會讓以后的 action 操作計算速度加快(通常運行速度會加速 10 倍)曹质。緩存是迭代算法和快速的交互式使用的重要工具。
RDD 可以使用 persist() 方法或 cache() 方法進行持久化擎场。數據將會在第一次 action 操作時進行計算羽德,并緩存在節(jié)點的內存中。Spark 的緩存具有容錯機制迅办,如果一個緩存的 RDD 的某個分區(qū)丟失了宅静,Spark 將按照原來的計算過程,自動重新計算并進行緩存站欺。
在 shuffle 操作中(例如 reduceByKey)姨夹,即便是用戶沒有調用 persist 方法,Spark 也會自動緩存部分中間數據矾策。這么做的目的是磷账,在 shuffle 的過程中某個節(jié)點運行失敗時,不需要重新計算所有的輸入數據贾虽。如果用戶想多次使用某個 RDD逃糟,強烈推薦在該 RDD 上調用 persist 方法。
2、存儲級別
每個持久化的 RDD 可以使用不同的存儲級別進行緩存绰咽,例如菇肃,持久化到磁盤、已序列化的 Java 對象形式持久化到內存(可以節(jié)省空間)取募、跨節(jié)點間復制供填、以 off-heap 的方式存儲在 Tachyon。這些存儲級別通過傳遞一個 StorageLevel 對象給 persist() 方法進行設置戴尸。
詳細的存儲級別介紹如下:
- MEMORY_ONLY : 將 RDD 以反序列化 Java 對象的形式存儲在 JVM 中熬甚。如果內存空間不夠,部分數據分區(qū)將不再緩存聊品,在每次需要用到這些數據時重新進行計算飞蹂。這是默認的級別。
- MEMORY_AND_DISK : 將 RDD 以反序列化 Java 對象的形式存儲在 JVM 中翻屈。如果內存空間不夠陈哑,將未緩存的數據分區(qū)存儲到磁盤,在需要使用這些分區(qū)時從磁盤讀取伸眶。
- MEMORY_ONLY_SER : 將 RDD 以序列化的 Java 對象的形式進行存儲(每個分區(qū)為一個 byte 數組)惊窖。這種方式會比反序列化對象的方式節(jié)省很多空間,尤其是在使用 fast serializer時會節(jié)省更多的空間厘贼,但是在讀取時會增加 CPU 的計算負擔界酒。
- MEMORY_AND_DISK_SER : 類似于 MEMORY_ONLY_SER ,但是溢出的分區(qū)會存儲到磁盤嘴秸,而不是在用到它們時重新計算毁欣。
- DISK_ONLY : 只在磁盤上緩存 RDD。
- MEMORY_ONLY_2岳掐,MEMORY_AND_DISK_2凭疮,等等 : 與上面的級別功能相同,只不過每個分區(qū)在集群中兩個節(jié)點上建立副本串述。
- OFF_HEAP(實驗中): 類似于 MEMORY_ONLY_SER 执解,但是將數據存儲在 off-heap memory,這需要啟動 off-heap 內存纲酗。
注意衰腌,在 Python 中,緩存的對象總是使用 Pickle 進行序列化觅赊,所以在 Python 中不關心你選擇的是哪一種序列化級別右蕊。python 中的存儲級別包括 MEMORY_ONLY,MEMORY_ONLY_2茉兰,MEMORY_AND_DISK尤泽,MEMORY_AND_DISK_2,DISK_ONLY 和 DISK_ONLY_2 。
上面的幾個緩存級別是官網給出的坯约,但是通過源碼看熊咽,實際上一共有12種緩存級別
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
.....
}
關于構造函數的幾個參數
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable
- _useDisk:使用磁盤
- _useMemory:使用內存
- _useOffHeap:使用堆外存,這是Java虛擬機里面的概念闹丐,堆外內存意味著把內存對象分配在Java虛擬機的堆以外的內存横殴,這些內存直接受操作系統(tǒng)管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆卿拴,以減少垃圾收集對應用的影響衫仑。
- _deserialized:使用反序列化,其逆過程序列化(Serialization)是java提供的一種機制堕花,將對象表示成一連串的字節(jié)文狱;而反序列化就表示將字節(jié)恢復為對象的過程。序列化是對象永久化的一種機制缘挽,可以將對象及其屬性保存起來瞄崇,并能在反序列化后直接恢復這個對象
- _replication:副本數,默認是一個
3壕曼、如何選擇存儲級別
Spark 的存儲級別的選擇苏研,核心問題是在內存使用率和 CPU 效率之間進行權衡。建議按下面的過程進行存儲級別的選擇 :
- 如果使用默認的存儲級別(MEMORY_ONLY)腮郊,存儲在內存中的 RDD 沒有發(fā)生溢出摹蘑,那么就選擇默認的存儲級別。默認存儲級別可以最大程度的提高 CPU 的效率,可以使在 RDD 上的操作以最快的速度運行轧飞。
- 如果內存不能全部存儲 RDD衅鹿,那么使用 MEMORY_ONLY_SER,并挑選一個快速序列化庫將對象序列化踪少,以節(jié)省內存空間塘安。使用這種存儲級別糠涛,計算速度仍然很快援奢。
除了在計算該數據集的代價特別高,或者在需要過濾大量數據的情況下忍捡,盡量不要將溢出的數據存儲到磁盤集漾。因為,重新計算這個數據分區(qū)的耗時與從磁盤讀取這些數據的耗時差不多砸脊。 - 如果想快速還原故障具篇,建議使用多副本存儲級別(例如,使用 Spark 作為 web 應用的后臺服務凌埂,在服務出故障時需要快速恢復的場景下)驱显。所有的存儲級別都通過重新計算丟失的數據的方式,提供了完全容錯機制。但是多副本級別在發(fā)生數據丟失時埃疫,不需要重新計算對應的數據庫伏恐,可以讓任務繼續(xù)運行。
4栓霜、刪除數據
Spark 自動監(jiān)控各個節(jié)點上的緩存使用率翠桦,并以最近最少使用的方式(LRU)將舊數據塊移除內存。如果想手動移除一個 RDD胳蛮,而不是等待該 RDD 被 Spark 自動移除销凑,可以使用 RDD.unpersist() 方法
5、RDD的cache和persist的區(qū)別
cache()調用的persist()仅炊,是使用默認存儲級別的快捷設置方法
看一下源碼
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
通過源碼可以看出cache()是persist()的簡化方式斗幼,調用persist的無參版本,也就是調用persist(StorageLevel.MEMORY_ONLY)抚垄,cache只有一個默認的緩存級別MEMORY_ONLY孟岛,即將數據持久化到內存中,而persist可以通過傳遞一個 StorageLevel 對象來設置緩存的存儲級別督勺。
6渠羞、DataFrame的cache和persist的區(qū)別
官網和上的教程說的都是RDD,但是沒有講df的緩存,通過源碼發(fā)現df和rdd還是不太一樣的:
/**
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
*
* @group basic
* @since 1.6.0
*/
def cache(): this.type = persist()
/**
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
*
* @group basic
* @since 1.6.0
*/
def persist(): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this)
this
}
/**
* Persist this Dataset with the given storage level.
* @param newLevel One of: `MEMORY_ONLY`, `MEMORY_AND_DISK`, `MEMORY_ONLY_SER`,
* `MEMORY_AND_DISK_SER`, `DISK_ONLY`, `MEMORY_ONLY_2`,
* `MEMORY_AND_DISK_2`, etc.
*
* @group basic
* @since 1.6.0
*/
def persist(newLevel: StorageLevel): this.type = {
sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
this
}
def cacheQuery(
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock
可以到cache()依然調用的persist()智哀,但是persist調用cacheQuery次询,而cacheQuery的默認存儲級別為MEMORY_AND_DISK,這點和rdd是不一樣的瓷叫。
7屯吊、代碼測試
新建一個測試的txt,文件越大越好,如果文件比較小摹菠,可能cache的效果還不如不cache的好盒卸。
import org.apache.spark.sql.SparkSession
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext
val rdd = sc.textFile("files/test.txt")
var beiginTime = System.currentTimeMillis
println(rdd.count)
var endTime = System.currentTimeMillis
println("cost " + (endTime - beiginTime) + " milliseconds.")
beiginTime = System.currentTimeMillis
println(rdd.count)
endTime = System.currentTimeMillis
println("cost " + (endTime - beiginTime) + " milliseconds.")
spark.stop
}
}
先看一下沒有cache的時間
79391
cost 575 milliseconds.
79391
cost 124 milliseconds.
然后將代碼val rdd = sc.textFile(“files/test.txt”)替換為val rdd = sc.textFile(“files/test.txt”).cache,再進行測試
79391
cost 635 milliseconds.
79391
cost 44 milliseconds.
可以看到cache后第二次count的時間明顯比沒有cache第二次count的時間少很多次氨,第一次count時間增加是因為要進行持久化蔽介,如果看總時間的話,只有多次使用該rdd的時候煮寡,效果才明顯虹蓄。
8、注意
cache()和persist()的使用是有規(guī)則的:
必須在transformation或者textfile等創(chuàng)建一個rdd之后幸撕,直接連續(xù)調用cache()或者persist()才可以薇组,如果先創(chuàng)建一個rdd,再單獨另起一行執(zhí)行cache()或者persist(),是沒有用的坐儿,而且會報錯律胀,大量的文件會丟失宋光。