歡迎關(guān)注我的微信公眾號:FunnyBigData
在《Spark 內(nèi)存管理的前世今生(上)》中肺孤,我們介紹了 UnifiedMemoryManager 是如何管理內(nèi)存的衰絮。然而,UnifiedMemoryManager 是 MemoryManager 而不是 MemoryAllocator 或 MemoryConsumer咆畏,不進行實質(zhì)上的內(nèi)存分配和使用燃异,只是負責(zé)可以分配多少 storage 或 execution 內(nèi)存給誰,記錄各種元數(shù)據(jù)信息夯膀。
這篇文章會關(guān)注 storage 的堆內(nèi)堆外內(nèi)存到底是在什么樣的情況下诗充,以什么樣的形式分配以及是怎么使用的。
緩存 RDD 是 storage 內(nèi)存最核心的用途诱建,那我們就來看看緩存 RDD 的 partition 是怎樣分配蝴蜓、使用 storage 內(nèi)存的。
可以以非序列化或序列化的形式緩存 RDD俺猿,兩種情況有所不同茎匠,我們先來看看非序列化形式的。
1: 緩存非序列化 RDD(只支持 ON_HEAP)
緩存非序列化 RDD 通過調(diào)用
MemoryStore#putIteratorAsValues[T](blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
來緩存一個個 partition 押袍。該函數(shù)緩存一個 partition(一個 partition 對應(yīng)一個 block) 數(shù)據(jù)至 storage 內(nèi)存诵冒。其中:
- blockId:緩存到內(nèi)存后的 block 的 blockId
- values:對象類型的迭代器,對應(yīng)一個 partition 的數(shù)據(jù)
整個流程還可以細化為以下兩個子流程:
- unroll block:展開迭代器
- store unrolled to storage memory:將展開后的數(shù)據(jù)存入 storage 內(nèi)存
1-1: unroll block
一圖勝千言谊惭,我們先來看看 unroll 的流程
我們先對上圖中的流程做進一步的說明汽馋,然后再簡潔的描述下整個過程以加深印象
1-1-1: 為什么申請初始 unroll 內(nèi)存不成功還繼續(xù)往下走?
初始的用于 unroll 的內(nèi)存大小由 spark.storage.unrollMemoryThreshold
控制圈盔,默認為 1M惭蟋。繼續(xù)往下走主要由兩個原因:
- 由于初始 unroll 大小是可以設(shè)置的,如果不小心設(shè)置了過大药磺,比如 1G告组,這時申請這么大的 storage 內(nèi)存很可能失敗,但 block 的真正大小可能遠遠小于該值癌佩;即使該值設(shè)置的比較合理木缝,block 也很可能比初始 unroll 大小要小
- 對于
MemoryStore#putIteratorAsValues
,即使 block 大小比初始 unroll 大小要大围辙,甚至最終都沒能完整的把 values unroll 也是有用的我碟,這個將在后文展開,這里先請關(guān)注返回值new PartiallyUnrolledIterator(...)
1-1-2: 關(guān)于 vector: SizeTrackingVector
如流程圖中所示姚建,在 partition 對應(yīng)的 iterator 不斷被展開的過程中矫俺,每展開獲取一個記錄,就加到 vector 中,該 vector 為 SizeTrackingVector 類型厘托,是一個只能追加的 buffer(內(nèi)部通過數(shù)組實現(xiàn))友雳,并持續(xù)記錄自身的估算大小。從這里也可以看出铅匹,unroll 過程使用的內(nèi)存都是 ON_HEAP 的押赊。
整個展開過程,說白了就是盡量把更多的 records 塞到這個 vector 中包斑。因為所有展開的 records 都存入了 vector 中流礁,所以從圖中可以看出,每當在計算 vector 的估算 size 后罗丰,就會與累計已申請的 unroll 內(nèi)存大小進行比較神帅,如果 vector 的估算 size 更大,說明申請的 unroll 內(nèi)存不夠萌抵,就會觸發(fā)申請更多的 unroll 內(nèi)存(具體是申請 vector 估算大小的 1.5 倍減去已申請的 unroll 總內(nèi)存)找御,這:
- 一是為了接下去的展開操作申請 unroll 內(nèi)存
- 二也是為了盡量保障向 MemoryManager 申請的 unroll 內(nèi)存能稍大于真實消耗的 unroll 內(nèi)存,以避免 OOM(若向 MemoryManager 申請的 unroll 內(nèi)存小于真實使用的谜嫉,那么就會導(dǎo)致 MemoryManager 認為有比真實情況下更多的空閑內(nèi)存萎坷,如果使用了這部分不存在的空閑內(nèi)存就會 OOM)
如圖所示,要符合一定的條件才 check unroll memory 是否夠用沐兰,也就是 vector 計算其估算大小并判斷是否大于已申請的 unroll memory size哆档。這里是每展開 16 條記錄進行一次檢查,設(shè)置這樣的間隔是因為每次估算都需要耗費數(shù)毫秒住闯。
1-1-3: 繼續(xù)還是停止 unroll 瓜浸?
每展開一條記錄后,都會判斷是否還需要比原、還能夠繼續(xù)展開插佛,當 values 還有未展開的 record 且還有 unroll 內(nèi)存來展開時就會繼續(xù)展開,將 record 追加到 vector 中量窘。
需要注意的是雇寇,只有當 keepUnrolling 為 true 時(不管 values.hasNext 是否為 true)才會進入 store unrolled to storage memory
流程。這樣的邏輯其實有些問題蚌铜,我們先來看看其實現(xiàn)代碼:
while (values.hasNext && keepUnrolling) {
vector += values.next()
if (elementsUnrolled % 16 == 0) {
// currentSize 為 vector 的估算大小
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
// 申請 size 為 amountToRequest 的估算大小锨侯,memoryGrowthFactor 為 1.5
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
}
memoryThreshold += amountToRequest
}
}
elementsUnrolled += 1
}
if (keepUnrolling) {
// store unrolled to storage memory
}
此時,假設(shè) keepUnrolling 為 true冬殃, values.hasNext
為 true囚痴,也就是還有一些記錄沒有展開(在假設(shè)剩余未展開的 records 總大小為 1M),進入循環(huán)后审葬,展開一條記錄追加到 vector 中后深滚,恰好 elementsUnrolled % 16 == 0
且 currentSize >= memoryThreshold
奕谭。根據(jù) val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
計算出要為了展開剩余 records 本次要申請的 unroll 內(nèi)存大小為 amountToRequest,大小為 5M痴荐,這時候?qū)嶋H上最大能申請的 unroll 內(nèi)存大小為 3M血柳,那么申請就失敗了,keepUnrolling 為 false蹬昌,此時進入下一次循環(huán)判斷就失敗了混驰,整個展開過程也就失敗了攀隔,但事實上剩余能申請的 unroll 內(nèi)存大小是足以滿足剩余的 records 的皂贩。
一個簡單的治標不治本的改進方案是將 memoryGrowthFactor 的值設(shè)置的更小(當前為 1.5)昆汹,該值越小發(fā)生上述情況的概率越小明刷,并且,這里的申請內(nèi)存其實只是做一些數(shù)值上的狀態(tài)更新满粗,并不會發(fā)生耗資源或耗時的操作辈末,所以多申請幾次并不會帶來什么性能下降。
回到當前的實現(xiàn)中來映皆,當循環(huán)結(jié)束挤聘,若 keepUnrolling 為 true ,values 一定被全部展開捅彻;若 keepUnrolling 為 false(存在展開最后一條 record 后 check 出 vector 估算 size 大于已申請 unroll 總內(nèi)存并申請失敗的情況)组去,則無論 values 有沒有被全部展開,都說明能申請到的總 unroll 內(nèi)存是不足以展開整個 values 的步淹,這就意味著緩存該 partition 至內(nèi)存失敗从隆。
需要注意的是,緩存到內(nèi)存失敗并不代表整個緩存動作是失敗的缭裆,根據(jù) StorageLevel 還可能會緩存到磁盤键闺。
1-2: store unrolled to storage memory
1-2-1: 真正的 block 即 DeserializedMemoryEntry
這一流程說白了就是將 unroll 的總內(nèi)存占用轉(zhuǎn)化為 storage 的內(nèi)存占用,事實上真正保存 records 的 vector 中的數(shù)組也被移到了 entry 中(引用傳遞)澈驼。entry 是這樣被構(gòu)造的:
private case class DeserializedMemoryEntry[T](
value: Array[T],
size: Long,
classTag: ClassTag[T]) extends MemoryEntry[T] {
val memoryMode: MemoryMode = MemoryMode.ON_HEAP
}
val arrayValues = vector.toArray
vector = null
val entry = new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
entry 的成員 value 為 vector 中保存 records 的數(shù)組辛燥,entry 的 size 成員為該數(shù)組的估算大小。DeserializedMemoryEntry 繼承于 MemoryEntry缝其,MemoryEntry 的另一個子類是 SerializedMemoryEntry挎塌,對應(yīng)的是一個序列化的 block。在 MemoryStore 中氏淑,以 entries: LinkedHashMap[BlockId, MemoryEntry[_]]
的形式維護 blockId 及序列化或非序列化的 block 的映射勃蜘。
從這里,你也可以看出假残,當前緩存非序列化的 RDD 只能使用 ON_HEAP 內(nèi)存缭贡。
1-2-2: unroll 內(nèi)存的多退少補
這之后炉擅,再次使用 array[record] 的估算大小與 unroll 總內(nèi)存進行比較:
- 若前者較大,則計算要再申請多少 unroll 內(nèi)存(兩者之差)并申請之阳惹,申請的結(jié)果為 acquireExtra
- 若后者較大谍失,則說明申請了在 unroll 過程中申請了過多的內(nèi)存,則釋放多出來的部分(兩者之差)莹汤。會出現(xiàn)多出來的情況有兩點原因:
- 這次 array[record] 的估算結(jié)果更為準確
- 在 unroll 過程中由于每次申請的內(nèi)存是
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
快鱼,這樣的算法是容易導(dǎo)致申請多余實際需要的
1-2-3: transform unroll to storage
將 unroll 內(nèi)存占用轉(zhuǎn)為 storage 內(nèi)存占用實現(xiàn)如下:
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
}
可以看到,這是一個 memoryManager 級別的同步操作纲岭,不用擔(dān)心剛被 release 的 unroll 內(nèi)存在占用等量的 storage 內(nèi)存之前會在其他地方被占用抹竹。
在 UnifiedMemoryManager
的內(nèi)存劃分中,unroll 內(nèi)存其實就是 storage 內(nèi)存止潮,所以上面代碼所做的事看起來沒什么意義窃判,先讓 storage used memory 減去某個值,再加上該值喇闸,結(jié)果是沒變袄琳。那為什么還要這么做呢?我想是為了 MemoryStore 和 MemoryManager 的解耦燃乍,對于 MemoryStore 來說其并不知道在 MemoryManager 中 unroll 內(nèi)存就是 storage 內(nèi)存唆樊,如果之后 MemoryManager 不是這樣實現(xiàn)了,對 MemoryStore 也不會有影響刻蟹。
1-2-4: enoughStorageMemory 及結(jié)果
在這一流程的最后逗旁,會根據(jù) enoughStorageMemory 為 true 后 false,返回不同的結(jié)果座咆。只有當以上流程中痢艺,partition 被完全展開并成功存放到 storage 內(nèi)存中 enoughStorageMemory 才為 true;即使partition 全部展開介陶,并生成了 entry堤舒,如果最終能申請的最多的 storage 內(nèi)存還是小于 array[record] 的估算 size,整個 cache block to memory 的操作也是失敗的哺呜,此時的 enoughStorageMemory 為 false舌缤。
如果最終結(jié)果是成功的,返回值為 array[record] 的估算 size某残。如果是失敗的国撵,包括 unroll 失敗,將返回 PartiallyUnrolledIterator 對象實例:
class PartiallyUnrolledIterator[T](
memoryStore: MemoryStore,
memoryMode: MemoryMode,
unrollMemory: Long,
private[this] var unrolled: Iterator[T],
rest: Iterator[T])
extends Iterator[T]
該實例(也是個迭代器)由部分已經(jīng)展開的迭代器(unrolled)以及剩余未展開的迭代器(rest)組合而成玻墅,調(diào)用者可根據(jù) StorageLevel 是否還包含 Disk 級別來決定是 close 還是使用該返回值將 block 持久化到磁盤(可以避免部分的 unroll 操作)介牙。
2: 緩存序列化 RDD(支持 ON_HEAP 和 OFF_HEAP)
有了上面分析緩存非序列化 RDD 至內(nèi)存的經(jīng)驗,再來看下面的緩存序列化 RDD 至內(nèi)存的圖會發(fā)現(xiàn)有一些相似澳厢,也有一些不同环础。在下面的流程圖中囚似,包含了 unroll 過程和 store block to storage memory 過程。為了方便分析线得,我將整個流程分為三大塊:
- 紅框部分:初始化 allocator饶唤、bbos、serializationStream
- 灰框部分:展開 values 并 append 到 serializationStream 中
- 籃框部分:store block to storage memory
2-1: 初始化 allocator贯钩、bbos募狂、serializationStream
allocator: Int => ByteBuffer
是一個函數(shù)變量,用來分配內(nèi)存,它是這樣被構(gòu)造的:
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
- 當 MemoryMode 為 ON_HAP 時,allocator 分配的是 HeapByteBuffer 形式的堆上內(nèi)存
- 當 MemoryMode 為 OFF_HEAP 時,allocator 分配的是 DirectByteBuffer 形式的堆外內(nèi)存。需要特別注意的是唇兑,DirectByteBuffer 本身是堆內(nèi)的對象,這里的堆外是指其指向的內(nèi)存是堆外的
HeapByteBuffer 通過調(diào)用 new 分配內(nèi)存狐粱,而 DirectByteBuffer 最終調(diào)用 C++ 的 malloc 方法分配拒啰,在分配和銷毀上 HeapByteBuffer 要比 DirectByteBuffer 稍快。但在網(wǎng)絡(luò)讀寫和文件讀寫方面胯舷,DirectByteBuffer 比 HeapByteBuffer 更快(具體原因請自行調(diào)研刻蚯,不是本文重點),這對經(jīng)常會被網(wǎng)絡(luò)讀寫的 block 來說很有意義桑嘶。
另外炊汹,HeapByteBuffer 指向的內(nèi)存受 GC 管理;而 DirectByteBuffer 指向的內(nèi)存不受 GC 管理逃顶,可減小 GC 壓力讨便。DirectByteBuffer 指向的內(nèi)存會在兩種情況下會釋放:
- remove 某個 block 時,會通過 DirectByteBuffer 的 cleaner 來釋放其指向的內(nèi)存
- 當 BlockManager stop 時以政,會 clear 整個 MemoryStore 中的所有 blocks霸褒,這時會釋放所有的 DirectByteBuffers 及其指向的內(nèi)存
接下來是:
val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
ChunkedByteBufferOutputStream 包含一個 chunks: ArrayBuffer[ByteBuffer]
,該數(shù)組中的 ByteBuffer 通過 allocator 創(chuàng)建盈蛮,用于真正存儲 unrolled 數(shù)據(jù)废菱。再次說明,如果是 ON_HEAP抖誉,這里的 ByteBuffer 是 HeapByteBuffer殊轴;而如果是 OFF_HEAP,這里的 ByteBuffer 則是 DirectByteBuffer袒炉。
bbos 之后將用于建構(gòu)構(gòu)造 serializeStream: SerializationStream
旁理,records 將一條條寫入 serializeStream,serializeStream 最終會將 records 寫入 bbos 的 chunks: ArrayBuffer[ByteBuffer]
中我磁,一條 record 對應(yīng) ByteBuffer 元素孽文。
2-2: 展開 values 并 append 到 serializationStream 中
具體展開的流程與 “緩存非序列化 RDD” 類似(serializationStream.writeObject(values.next())(classTag)
也在上一小節(jié)進行了說明)淹接,最大的區(qū)別是在沒展開一條 record 都會調(diào)用 reserveAdditionalMemoryIfNecessary()
,實現(xiàn)如下
def reserveAdditionalMemoryIfNecessary(): Unit = {
if (bbos.size > unrollMemoryUsedByThisBlock) {
val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
}
由于是序列化的數(shù)據(jù)叛溢,這里的 bbos.size 是準確值而不是估算值塑悼。reserveAdditionalMemoryIfNecessary 說白了就是計算真實已經(jīng)占用的 unroll 內(nèi)存(bbos.size)比已經(jīng)申請的 unrolll 總內(nèi)存 size 大多少,并申請相應(yīng) MemoryMode 的 unroll 內(nèi)存來使得申請的 unroll 總大小和實際使用的保持一致楷掉。如果申請失敗厢蒜,則 keepUnrolling 為 false,那么緩存該非序列化 block 至內(nèi)存就失敗了烹植,將返回 PartiallySerializedBlock 類型對象斑鸦。
在完整展開后,會再調(diào)用一次 reserveAdditionalMemoryIfNecessary草雕,以最終確保實際申請的 unroll 內(nèi)存和實際占用的大小相同巷屿。
2-3: store block to storage memory
這里將 bbos 中的 ArrayBuffer[ByteBuffer]
轉(zhuǎn)化為 ChunkedByteBuffer 對象,ChunkedByteBuffer 是只讀的物理上是以多塊內(nèi)存組成(即 Array[ByteBuffer])墩虹。
再以該 ChunkedByteBuffer 對象構(gòu)造真正的序列化的 block嘱巾,即 entry: SerializedMemoryEntry
,構(gòu)造時同樣會傳入 MemoryMode诫钓。
最后將 entry 加到 entries: LinkedHashMap[BlockId, MemoryEntry[_]]
中旬昭。
與 “緩存非序列化 RDD” 相同,如果緩存序列化 block 至內(nèi)存失敗了菌湃,根據(jù) StorageLevel 還有機會緩存到磁盤上问拘。
總結(jié)
上篇文章主要講解 MemoryManager 是怎樣管理內(nèi)存的,即如何劃分內(nèi)存區(qū)域惧所、分配踢除策略骤坐、借用策略等,并不涉及真正的內(nèi)存分配下愈,只做數(shù)值上的管理纽绍,是處于中心的storage 內(nèi)存調(diào)度 “調(diào)度”。而本文則分析了在最重要的緩存非序列化/序列化 RDD 至內(nèi)存的場景下驰唬,storage 內(nèi)存真正是如何分配使用的顶岸,即以什么樣的 MemoryMode、什么樣的分配邏輯及方式叫编,還介紹了 block 在 memory 中的表現(xiàn)形式等辖佣。