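In "Spark Source Code Reading: The Storage Module (Part 1)", I described the overall architecture of the Storage module as a standard master-slave design: the master manages the slaves' metadata, while the slaves store the actual data. That article analyzed how BlockManagerMasterEndpoint (the master side) and BlockManagerSlaveEndpoint (the slave side) exchange messages.
This article analyzes how data Blocks are actually stored and how that storage is implemented.
The source code discussed here is from Spark 1.6.3.
Storage levels
There are two methods for caching an RDD, cache() and persist(). cache() simply calls persist() underneath with the default level (MEMORY_ONLY), so it is a shortcut for persist().
persist accepts a storage level as its argument, as shown below: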
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
This StorageLevel parameter is Spark's storage level: it specifies in what form and on which medium the data is stored.
The StorageLevel constructor is as follows:
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
extends Externalizable {...}
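The five parameters mean:
_useDisk: whether to use disk
_useMemory: whether to use memory
_useOffHeap: whether to use off-heap storage
_deserialized: whether the data is kept in deserialized form (false means it is stored serialized)
_replication: the number of replicas
Apart from _useOffHeap, the parameters can be combined freely. The predefined combinations are: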
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)
/**
* :: DeveloperApi ::
* Return the StorageLevel object with the specified name.
*/
@DeveloperApi
def fromString(s: String): StorageLevel = s match {
case "NONE" => NONE // store nothing
case "DISK_ONLY" => DISK_ONLY // disk only
case "DISK_ONLY_2" => DISK_ONLY_2 // disk only, with one extra replica
case "MEMORY_ONLY" => MEMORY_ONLY // memory only
case "MEMORY_ONLY_2" => MEMORY_ONLY_2 // memory only, with one extra replica
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER // memory only, stored as serialized objects
case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2 // memory only, serialized, with one extra replica
case "MEMORY_AND_DISK" => MEMORY_AND_DISK // memory first, whatever does not fit goes to disk
case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2 // same as above, with one extra replica
case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER // memory first, overflow to disk, stored serialized
case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2 // same as above, with one extra replica
case "OFF_HEAP" => OFF_HEAP // off-heap storage
case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
}
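In other words, the storage level is chosen by passing the corresponding StorageLevel constant to persist; when only the name is available as a string, StorageLevel.fromString converts it to the constant.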
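To make the flag combinations concrete, here is a minimal sketch in plain Scala. `Level` and `Levels` are hypothetical stand-ins written for this article, not Spark's StorageLevel class; they only show how the five constructor arguments translate into a storage behaviour.

```scala
// Simplified stand-in for org.apache.spark.storage.StorageLevel (NOT Spark's class).
final case class Level(
    useDisk: Boolean,
    useMemory: Boolean,
    useOffHeap: Boolean,
    deserialized: Boolean,
    replication: Int = 1) {

  // Human-readable summary of what a flag combination means.
  def describe: String = {
    val media = Seq(
      if (useMemory) Some("memory") else None,
      if (useDisk) Some("disk") else None,
      if (useOffHeap) Some("off-heap") else None).flatten
    if (media.isEmpty) {
      "not stored"
    } else {
      val form = if (deserialized) "deserialized objects" else "serialized bytes"
      s"${media.mkString(" + ")}, $form, replication=$replication"
    }
  }
}

// A few of the predefined combinations, with the same argument order as in the listing.
object Levels {
  val NONE = Level(false, false, false, false)
  val DISK_ONLY = Level(true, false, false, false)
  val MEMORY_ONLY = Level(false, true, false, true)
  val MEMORY_AND_DISK_SER_2 = Level(true, true, false, false, 2)
}
```

For example, `Levels.MEMORY_AND_DISK_SER_2.describe` spells out that the data may live in memory and on disk, in serialized form, with a replication factor of 2.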
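Choosing a storage level:
In general:
- Prefer memory with MEMORY_ONLY. If memory is insufficient, add serialization with MEMORY_ONLY_SER, but weigh the extra CPU cost that serialization brings.
- Avoid disk where possible, because disk I/O is far slower than memory; recomputing a partition may even be faster than reading it back from disk. Consider MEMORY_AND_DISK only when the computation is expensive and the dataset does not fit in memory, or when the machines have SSDs.
- Replication serves efficiency more than fault tolerance. Lost data in Spark can be recomputed, and the data source usually has its own replication (HDFS, for example), so the main reason to add a replica is to avoid recomputation.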
Storage details
An RDD's persist only takes effect once an action (count, for example) is triggered. After a series of steps, the Task eventually calls the RDD's iterator() method to run the computation. Here is the code of iterator:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) { // the storage level is not NONE: go through the cacheManager
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else { // otherwise read the checkpoint, or failing that recompute
computeOrReadCheckpoint(split, context)
}
}
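If the storage level is not NONE, CacheManager.getOrCompute is called: it reads the cached data if there is any, otherwise it computes the partition and writes it to the cache according to the storage level. CacheManager is essentially a wrapper around BlockManager that manages cached content. Its getOrCompute method: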
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, partition.index) // build the block id
logDebug(s"Looking for partition $key")
blockManager.get(key) match { // ask the BlockManager whether the block is cached
case Some(blockResult) => // cache hit
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod) // update the read metrics
existingMetrics.incBytesRead(blockResult.bytes)
// return the cached data as the result
val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
case None => // cache miss: the partition has to be computed
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish and return its results
val storedValues = acquireLockForPartition[T](key) // acquire the lock for loading this partition
if (storedValues.isDefined) { // another thread already computed it: return that result
return new InterruptibleIterator[T](context, storedValues.get)
}
// Otherwise, we have to load the partition ourselves
// nobody has computed this partition yet, so compute it here
try {
logInfo(s"Partition $key not found, computing it")
// read from the checkpoint if the RDD was checkpointed, otherwise compute
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
// If the task is running locally, do not persist the result
// a task running locally on the driver returns the result directly
if (context.isRunningLocally) {
return computedValues
}
// Otherwise, cache the values and keep track of any updates in block statuses
// on an executor, the cache has to be updated
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
// write the computed result into the BlockManager
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
val metrics = context.taskMetrics // update the task metrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
// notify any threads waiting for this partition that the computation is done
// and that the result has been stored in the BlockManager
loading.remove(key)
loading.notifyAll()
}
}
}
}
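The lookup-then-compute pattern in getOrCompute, including the `loading` set that makes other threads wait, can be sketched in plain Scala. `TinyCache` is a hypothetical miniature written for illustration; it keeps values in a HashMap where Spark uses the BlockManager and omits metrics, checkpoints and storage levels.

```scala
import scala.collection.mutable

// Hypothetical miniature of the cache-or-compute pattern; NOT Spark's CacheManager.
class TinyCache[K, V] {
  private val store = mutable.HashMap.empty[K, V]  // stands in for the BlockManager
  private val loading = mutable.HashSet.empty[K]   // keys currently being computed

  def getOrCompute(key: K)(compute: => V): V = {
    val cached: Option[V] = loading.synchronized {
      // If another thread is computing this key, wait until it is done.
      while (loading.contains(key)) loading.wait()
      val hit = store.get(key)
      // On a miss, this thread becomes the loader for the key.
      if (hit.isEmpty) loading += key
      hit
    }
    cached match {
      case Some(value) => value // cache hit, possibly produced by the thread we waited for
      case None =>
        try {
          val value = compute   // the expensive work runs outside the lock
          loading.synchronized { store(key) = value }
          value
        } finally {
          loading.synchronized {
            loading -= key
            loading.notifyAll() // wake up threads waiting for this key
          }
        }
    }
  }
}
```

Calling `getOrCompute` twice with the same key runs the computation only once; the second call returns the stored value. The `finally` block mirrors the original: waiters are released even if the computation throws.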
In getOrCompute, the cache is checked repeatedly before anything is computed; only when no cached copy exists does it call the RDD's computeOrReadCheckpoint method to compute the partition:
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
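If the RDD has been checkpointed, computeOrReadCheckpoint reads the data through the iterator method of its parent RDD; otherwise it calls compute, which each RDD subclass implements and which in turn calls the parent RDDs' iterator methods recursively: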
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
Once the data has been computed, putInBlockManager writes the result to the BlockManager:
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
level: StorageLevel,
updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
val putLevel = effectiveStorageLevel.getOrElse(level) // the storage level actually used for this put
if (!putLevel.useMemory) { // the level does not use memory: hand the iterator straight to the BlockManager
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
updatedBlocks ++=
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else { // otherwise unroll the data in memory
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
updatedBlocks ++=
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
case Right(it) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}
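The Left/Right result of unrollSafely is the key to the memory branch. The sketch below illustrates the idea under a simplifying assumption: the budget is a maximum element count (`maxElements`, a made-up parameter), whereas the real method tracks estimated memory usage. `UnrollSketch` is a hypothetical name, not part of Spark.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration of "unroll cautiously, give the iterator back on failure".
object UnrollSketch {
  // Left(array): everything fit within the budget and can be cached in memory.
  // Right(iterator): the budget was exceeded; the iterator still yields ALL values,
  // so the caller can write them to disk or simply return them uncached.
  def unrollSafely(values: Iterator[Any], maxElements: Int): Either[Array[Any], Iterator[Any]] = {
    val buffer = new ArrayBuffer[Any]
    while (values.hasNext && buffer.size < maxElements) {
      buffer += values.next()
    }
    if (!values.hasNext) Left(buffer.toArray)
    else Right(buffer.iterator ++ values) // already-consumed elements first, then the rest
  }
}
```

Returning the partially consumed data as an iterator means no element is lost when unrolling is aborted, which is why the Right branch above can still pass `returnValues` on to a disk-only put.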
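In putInBlockManager, if the storage level does not use memory, BlockManager.putIterator is called and the iterator is handed over without being unrolled (for a disk level the result goes straight to disk). Otherwise the values are first unrolled in memory with unrollSafely and, if they fit, stored with BlockManager.putArray; if they do not fit and the level allows disk, the partition is written to disk instead.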
def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}
def putBytes(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
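Both methods end up calling doPut and differ only in how the data is wrapped: putIterator wraps it as IteratorValues and putArray as ArrayValues. In the flow above, the former is used for levels without memory and the latter for data unrolled in memory. A third method, BlockManager.putBytes, also calls doPut; it wraps data that is already serialized in a ByteBuffer as ByteBufferValues.
doPut is fairly long, so it is explained in three parts.
- The part that chooses a store and caches the data: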
try {
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
val (returnValues, blockStore: BlockStore) = {
if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
} else if (putLevel.useOffHeap) {
// Use external block store
(false, externalBlockStore)
} else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
(putLevel.replication > 1, diskStore)
} else {
assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
}
// Actually put the values
val result: PutResult = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
}
The stores used here are memoryStore, diskStore and externalBlockStore, which correspond to memory, disk and external storage in the storage level. Their caching logic is covered separately below; here doPut simply calls their putArray, putIterator or putBytes method.
Note that for MEMORY_AND_DISK the code stores the data in memoryStore first and only writes to diskStore once memory is full.
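The branch order matters: memory is checked before off-heap, and disk comes last. A minimal sketch of that decision follows, with hypothetical names (`PutLevel`, `StoreChoice`) and store names given as plain strings rather than real BlockStore objects.

```scala
// Hypothetical flags only; mirrors the branch order in doPut, NOT Spark's API.
final case class PutLevel(
    useMemory: Boolean,
    useOffHeap: Boolean,
    useDisk: Boolean,
    replication: Int = 1)

object StoreChoice {
  // Returns (returnValues, store name), in the same order of precedence as doPut.
  def choose(level: PutLevel): (Boolean, String) =
    if (level.useMemory) (true, "memoryStore")                   // memory first, even if disk is also set
    else if (level.useOffHeap) (false, "externalBlockStore")
    else if (level.useDisk) (level.replication > 1, "diskStore") // read back only when replicating
    else throw new IllegalArgumentException("no storage level specified")
}
```

A MEMORY_AND_DISK-like level therefore resolves to the memory store; disk only comes into play later, when the memory store cannot hold the block.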
- Second, the replication logic:
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool
replicate(blockId, bufferView, putLevel)
}(futureExecutionContext)
case _ => null
}
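Here a Future is started so that replication of ByteBufferValues data (data that is already serialized) begins before the block is stored locally. The core of it is the replicate method, which is worth reading in depth if you are interested.
How is replication handled for IteratorValues and ArrayValues data? See the following code: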
if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
For ByteBufferValues data, this corresponds to the code above: it waits for that Future to complete.
For the other two types, the data is serialized first and then replicate is called to copy it.
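The "start replication early, wait for it at the end" pattern for serialized data can be sketched as follows. `storeLocally` and `replicate` are hypothetical callbacks standing in for the local BlockStore put and BlockManager.replicate; the real doPut does considerably more bookkeeping.

```scala
import java.nio.ByteBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical sketch of overlapping replication with the local put.
object ReplicateSketch {
  def put(bytes: ByteBuffer, replication: Int)(
      storeLocally: ByteBuffer => Unit,
      replicate: ByteBuffer => Unit)(implicit ec: ExecutionContext): Unit = {
    val replicationFuture: Option[Future[Unit]] =
      if (replication > 1) {
        // duplicate() shares the bytes but has its own position and limit,
        // so the local put and the remote send do not disturb each other.
        val view = bytes.duplicate()
        Some(Future(replicate(view)))
      } else {
        None
      }
    storeLocally(bytes)                                              // runs while replication is in flight
    replicationFuture.foreach(f => Await.ready(f, Duration.Inf))    // block until the copy is sent
  }
}
```

Because the view is created before the local put starts, the replication side always sees the buffer from its original position, regardless of how far the local store has read.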
- Third, tellMaster reports the update to the master:
if (tellMaster) {
reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
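reportBlockStatus reports the update to the master: it eventually sends an UpdateBlockInfo message to BlockManagerMasterEndpoint, and the master updates the Block's metadata when it receives the message.
Classes that store Blocks
BlockStore
The abstract class for Block storage. It defines the basic operations that every store provides: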
/**
* Abstract class to store blocks.
*/
private[spark] abstract class BlockStore(val blockManager: BlockManager) extends Logging {
// write bytes, the content of the Block identified by blockId, according to the StorageLevel
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): PutResult
/**
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
* This is used to efficiently write the values to multiple locations (e.g. for replication).
*
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
// write values; if returnValues is true, the written content is returned in the PutResult
def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
// same as above, but for an Array instead of an Iterator
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
/**
* Return the size of a block in bytes.
*/
// get the size of the Block
def getSize(blockId: BlockId): Long
// get the Block's data as a ByteBuffer
def getBytes(blockId: BlockId): Option[ByteBuffer]
// get the Block's data as an Iterator[Any]
def getValues(blockId: BlockId): Option[Iterator[Any]]
/**
* Remove a block, if it exists.
* @param blockId the block to remove.
* @return True if the block was found and removed, False otherwise.
*/
// remove the Block; returns true on success, false otherwise
def remove(blockId: BlockId): Boolean
// check whether the store contains the Block
def contains(blockId: BlockId): Boolean
// clean up and release resources on shutdown
def clear() { }
}
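BlockStore has three implementations, MemoryStore, DiskStore and ExternalBlockStore, which correspond to the memory, disk and external storage levels.
MemoryStore maintains one data structure, a LinkedHashMap created in access order (the third constructor argument is true), so iteration starts from the least recently accessed entry: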
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
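All data to be cached in memory is added to this structure through the tryToPut method. If memory is insufficient, some older cached blocks are released. If the storage level also includes disk, the data is written to disk through DiskStore's putIterator; if it does not, that data is simply not cached and is recomputed the next time it is needed.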
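How an access-ordered LinkedHashMap supports "release older blocks first" can be shown with a small sketch. `TinyMemoryStore` is a hypothetical class with sizes in arbitrary units; it ignores details of the real MemoryStore such as unroll memory, and it only reports evicted blocks where Spark would drop them to disk or discard them.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import scala.collection.mutable.ListBuffer

// Hypothetical miniature of an access-ordered memory store; NOT Spark's MemoryStore.
class TinyMemoryStore(maxSize: Long) {
  // accessOrder = true: iteration runs from least to most recently accessed entry.
  private val entries = new JLinkedHashMap[String, java.lang.Long](32, 0.75f, true)
  private var used = 0L

  // Reading a block marks it as most recently used.
  def touch(id: String): Boolean = entries.synchronized { entries.get(id) != null }

  // Returns (stored?, ids evicted to make room).
  def tryToPut(id: String, size: Long): (Boolean, List[String]) = entries.synchronized {
    if (size > maxSize) {
      (false, Nil) // can never fit; the caller falls back to disk or recomputes later
    } else {
      val old = entries.remove(id) // replacing an existing block frees its space first
      if (old != null) used -= old.longValue()
      val evicted = ListBuffer.empty[String]
      val it = entries.entrySet().iterator()
      while (used + size > maxSize && it.hasNext) {
        val entry = it.next() // least recently used entry comes first
        used -= entry.getValue.longValue()
        evicted += entry.getKey
        it.remove()
      }
      entries.put(id, java.lang.Long.valueOf(size))
      used += size
      (true, evicted.toList)
    }
  }
}
```

With a capacity of 10, storing "a" (4) and "b" (4), reading "a", and then storing "c" (4) evicts "b" rather than "a", because "a" was accessed more recently.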
DiskStore persists data to disk. In what form is it stored? Here is DiskStore's putIterator method:
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
logDebug(s"Attempting to write values for block $blockId")
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId) // resolve the File for this block
val outputStream = new FileOutputStream(file) // open an output stream on it
try {
Utils.tryWithSafeFinally {
blockManager.dataSerializeStream(blockId, outputStream, values) // serialize the values into the stream
} {
// Close outputStream here because it should be closed before file is deleted.
outputStream.close()
}
} catch {
case e: Throwable =>
if (file.exists()) {
if (!file.delete()) {
logWarning(s"Error deleting ${file}")
}
}
throw e
}
val length = file.length
val timeTaken = System.currentTimeMillis - startTime
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(length), timeTaken))
if (returnValues) {
// Return a byte buffer for the contents of the file
val buffer = getBytes(blockId).get
PutResult(length, Right(buffer))
} else {
PutResult(length, null)
}
}
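The code above opens a file output stream and writes the serialized data to a physical file on the local disk. The File is obtained with the getFile method, implemented as follows: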
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename) // the hash of the file name decides where in the directory hierarchy the file goes
val dirId = hash % localDirs.length
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
val subDir = subDirs(dirId).synchronized {
val old = subDirs(dirId)(subDirId)
if (old != null) {
old
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists() && !newDir.mkdir()) {
throw new IOException(s"Failed to create local dir in $newDir.")
}
subDirs(dirId)(subDirId) = newDir
newDir
}
}
new File(subDir, filename) // create the File object
}
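The storage path is derived from the hash of the BlockId's name, and a File is then created so the data can be written to the physical file. The root directories can be configured with spark.local.dir; in yarn-cluster mode they are replaced by yarn.nodemanager.local-dirs.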
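The directory arithmetic in getFile is easy to check by hand. The sketch below reproduces only the hashing and index computation as a pure function; `DiskLayoutSketch` and `locate` are hypothetical names, and no directories are created.

```scala
// Hypothetical helper reproducing only the index arithmetic of getFile.
object DiskLayoutSketch {
  // Same idea as Utils.nonNegativeHash: never return a negative number.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  // Returns (index into the local dirs, two-digit hex name of the sub-directory).
  def locate(filename: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, String) = {
    val hash = nonNegativeHash(filename)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, "%02x".format(subDirId))
  }
}
```

For the file name "a" (hash code 97), three local directories and 64 sub-directories per directory, the result is directory index 1 and sub-directory "20": 97 % 3 = 1 and (97 / 3) % 64 = 32, which is 20 in hexadecimal. Spreading files by hash this way keeps any single directory from accumulating too many entries and distributes I/O across the configured disks.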
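Tuning the Storage module
- First, the storage level guidelines given at the beginning of this article: prefer memory, use disk sparingly, and choose serialization and replication according to the situation.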
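- spark.local.dir:
The location of the physical files for disk storage levels. Configure several paths where possible (separated by commas), ideally on SSDs.
- spark.memory.storageFraction:
The share of the available memory that the Storage module uses for caching data. The default is 0.5, an even split with the Shuffle module. However, in versions after 1.5 Spark has a dynamic memory allocation model. Put simply, Shuffle is the favored side when memory is used: it can take over memory assigned to Storage and refuse to give it back, while Storage cannot do the same.
The tuning strategy follows from the workload: if the application caches large RDD datasets and produces little shuffle data, this parameter can be raised accordingly.
- Off-heap memory
Off-heap memory is also a kind of external storage: Spark requests memory directly from the operating system through Java's Unsafe API. The advantage is that it bypasses JVM management and so avoids GC impact; the drawback is that the allocation and release logic has to be written by hand.
spark.memory.offHeap.enabled
Defaults to false; set it to true to enable off-heap memory.
spark.memory.offHeap.size
Defaults to 0. The size can only be set once off-heap memory is enabled, and it should be configured with care to avoid running out of memory.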
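To see what spark.memory.storageFraction means in bytes, the following sketch does the arithmetic. It assumes the Spark 1.6 defaults as I understand them, which are not stated in this article and should be verified against the configuration documentation: 300 MB of reserved memory and spark.memory.fraction = 0.75. `UnifiedMemorySketch` is a hypothetical name.

```scala
// Back-of-the-envelope arithmetic only; the constants are ASSUMED Spark 1.6 defaults.
object UnifiedMemorySketch {
  val ReservedBytes: Long = 300L * 1024 * 1024 // assumed reserved system memory

  // Returns (unified region shared by storage and execution,
  //          part of it that storageFraction assigns to storage), in bytes.
  def regions(
      heapBytes: Long,
      memoryFraction: Double = 0.75,        // assumed default of spark.memory.fraction
      storageFraction: Double = 0.5): (Long, Long) = {
    val unified = ((heapBytes - ReservedBytes) * memoryFraction).toLong
    val storage = (unified * storageFraction).toLong
    (unified, storage)
  }
}
```

Under these assumptions a 4 GB heap gives (4096 MB - 300 MB) x 0.75 = 2847 MB for the shared region, of which half, about 1423 MB, is assigned to storage. Raising storageFraction to 0.7 would move that figure to roughly 1993 MB.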
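Summary
That concludes this reading of the Storage module's source code. Reading it shows what the system does underneath an RDD: the RDD implements the logic, while Storage manages the data. Reading the source also provides a basis for diagnosing problems and tuning performance later on.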