OFF_HEAP
Spark中RDD提供了幾種存儲(chǔ)級(jí)別屿储,不同的存儲(chǔ)級(jí)別可以帶來(lái)不同的容錯(cuò)性能拣技,例如 MEMORY_ONLY
,MEMORY_ONLY_SER_2
...其中偏形,有一種特別的是OFF_HEAP
off_heap
的優(yōu)勢(shì)在于敏储,在內(nèi)存有限的條件下谎柄,減少不必要的內(nèi)存消耗丁侄,以及頻繁的GC問(wèn)題,提升程序性能朝巫。
Spark2.0以前绒障,默認(rèn)的off_heap是Tachyon,當(dāng)然捍歪,你可以通過(guò)繼承ExternalBlockManager
來(lái)實(shí)現(xiàn)你自己想要的任何off_heap户辱。
這里說(shuō)Tachyon,是因?yàn)镾park默認(rèn)的TachyonBlockManager開(kāi)發(fā)完成之后糙臼,就再也沒(méi)有更新過(guò)庐镐,以至于Tachyon升級(jí)為Alluxio之后移除不使用的API,導(dǎo)致Spark默認(rèn)off_heap不可用变逃,這個(gè)問(wèn)題Spark社區(qū)和Alluxio社區(qū)都有反饋:ALLUXIO-1881
Spark2.0的off_heap
從spark2.0開(kāi)始必逆,社區(qū)已經(jīng)移除默認(rèn)的TachyonBlockManager以及ExternalBlockManager相關(guān)的API:SPARK-12667。
那么,問(wèn)題來(lái)了名眉,在Spark2.0中粟矿,OFF_HEAP是怎么處理的呢?數(shù)據(jù)存在哪里损拢?
上代碼:
首先陌粹,在StorageLevel里面,不同的存儲(chǔ)級(jí)別解析成不同的構(gòu)造函數(shù)福压,從OFF_HEAP的構(gòu)造函數(shù)可以看出來(lái)掏秩,OFF_HEAP依舊存在。
Object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
...... ........
}
在org.apache.spark.memory中荆姆,有一個(gè)MemoryMode蒙幻,MemoryMode標(biāo)記了使用ON_HEAP還是OFF_HEAP,在org.apache.spark.storage.memory.MemoryStore中,根據(jù)MemoryMode類(lèi)型來(lái)調(diào)用不同的存儲(chǔ)
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
.............
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
.............
}
再看MemoryStore中存數(shù)據(jù)的方法:putIteratorAsBytes
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
終于找到Spark2.0中off_heap的底層存儲(chǔ)了:Platform是利用java unsafe API實(shí)現(xiàn)的一個(gè)訪問(wèn)off_heap的類(lèi)胆筒。
總結(jié)
spark2.0 off_heap就是利用java unsafe API實(shí)現(xiàn)的內(nèi)存管理邮破。
優(yōu)點(diǎn):依然可以減少內(nèi)存的使用,減少頻繁的GC仆救,提高程序性能抒和。
缺點(diǎn):從代碼中看到,使用OFF_HEAP并沒(méi)有備份數(shù)據(jù)派桩,也不能像alluxio那樣保證數(shù)據(jù)高可用,丟失數(shù)據(jù)則需要重新計(jì)算蚌斩。