在上一篇文章Spark內(nèi)存模型初探(1)-Storage/Execution Memory的使用中酬蹋,我們初步解析了一下Storage/Execution Memory的使用盼理。最后我們也留下了幾個問題嚷节,等待我們解答毅厚。這些問題更多的集中在User Memory上独郎。
而這段時間果漾,經(jīng)過思索,探索虹曙,以及重新閱讀Spark Memory Management迫横,終于解決了大部分。
但是酝碳,本文其實更多的是作者的猜測矾踱,以及配合Spark Memory Management進行驗證。并沒有一個非常嚴謹疏哗,科學的方法來驗證本文的全部猜測呛讲。所以希望讀者在讀本文的時候,保持懷疑的目光。
UserMemory是怎樣被使用的?當我們在RDD.map(function)中的function中初始化新的對象時圣蝎,是在哪部分內(nèi)存被初始化的刃宵?是不是就是UserMemory?
這個問題的答案,其實在原文里就已經(jīng)有了徘公。但是由于當時讀那篇文章的時候,對Spark內(nèi)存并不了解哮针,所以這里并沒有讀懂关面。
原文中有這么一段:
This is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. You can store your own data structures there that would be used in RDD transformations. For example, you can rewrite Spark aggregation by using mapPartitions transformation maintaining hash table for this aggregation to run, which would consume so called User Memory
要說驗證么,我實在是沒想出來辦法驗證十厢。因為從Spark代碼中等太,看不到任何跟User Memory相關的內(nèi)容。而我們知道蛮放,Spark內(nèi)存分為三部分:Reserved Memory, User Memory, Spark Memory(Storage/Execution Memory)缩抡。我們在上篇文章也測試了,function
中初始化新的對象時包颁,是不會在Spark Memory中分配的瞻想,更不會在Reserved Memory,所以可能的地方就只有在User Memory了娩嚼。
如果我在function
里就是要初始化很多個對象蘑险,超過了這個User Memory的大小的話,Spark會怎樣做?
先看原文:
its completely up to you what would be stored in this RAM and how, Spark makes completely no accounting on what you do there and whether you respect this boundary or not. Not respecting this boundary in your code might cause OOM error.
我簡單寫了一點測試代碼岳悟,來測試這個問題:
package com.hypers.spark
import org.apache.spark.{SparkConf, SparkContext}
/**
*
* Reading:
* Task max available memory is executor heap.
*
* And this program will cause OOM because:
* User Memory just 200M, but we allocate an Array[Byte] whose size is 500MB
*
**/
object TestTaskAvailableMemory {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf()
.setMaster("spark://localhost:7077")
.setAppName("TestSparkCanRun")
val sparkContext = new SparkContext(sparkConf)
val rdd = sparkContext.parallelize(Seq(1))
rdd.map {
item => {
val line = Array.fill[Byte](489354548)(0)
(item, line)
}
}.repartition(1).foreach(println(_))
}
}
這是會導致OOM的佃迄。原因在代碼的注釋里已經(jīng)寫了。
另外贵少,超過User Memory并不一定會導致OOM『乔危現(xiàn)在User Memory的大小大概是200MB左右,我分配一個300MB的Array[Byte]滔灶,是沒問題的普碎。
我使用的JVM參數(shù)是-Xmx1g -XX:+UseSerialGC -XX:-UseAdaptiveSizePolicy -XX:PretenureSizeThreshold=10000000
】砥可以看到随常,超過10M的對象,就要直接在老年代分配了萄涯⌒鞣眨可我測試時,老年代的大小是600MB左右涝影。已用100MB枣察。所以這個老年代沒有足夠的空間來進行分配。所以會出現(xiàn)OOM。
為什么不直接在新生代測試呢序目?因為新生代的大小共有300MB左右臂痕,Eden:Survivor1:Survivor2=240MB:30MB:30MB。而我們的User Memory就已經(jīng)200MB+了猿涨,達不到測試的目的握童。
從這兒我們也能看到,其實Spark中叛赚,Storage/Execution Memory的大小澡绩,不會被User Memory擠壓。從Spark的源代碼中俺附,我們就能看到肥卡,它只是一個數(shù)字,是MemoryManager
(具體類是UnifiedUserMemory
或者StaticUserMemory
)初始化時就提供的事镣,它并不會在運行時動態(tài)獲取Java Heap的可用內(nèi)存大小步鉴,進而自動伸縮。
另外璃哟,Storage/Exeuction Memory也不是說氛琢,你Java Heap在我Spark啟動的時候,就給我把Storage/Execution Memory這些內(nèi)存分配好了沮稚,只有我能用艺沼,其它人不能用。它只是申明蕴掏,我需要多少Storage/Execution Memory障般,你User Memory要是用多了,我Storage/Execution Memory到時候用的時候盛杰,你要是給我騰不出來挽荡,那咱倆同歸于盡。
Shuffle時即供,Reducer端接收到的數(shù)據(jù)定拟,是在哪個部分分配的?是不是就是UserMemory?
我認為是在User Memory分配的逗嫡。但是由于Shuffle是ExternalShuffle
青自,所以并不會占用過多的內(nèi)存,導致User Memory過大出現(xiàn)OOM驱证。
Spark的內(nèi)存模型延窜,跟Java內(nèi)存模型之間,是什么關系?
假設我們的User Memory是300MB抹锄,而新生代是200MB逆瑞,并且沒有啟用超過閾值就自動在老年代進行分配的機制荠藤。
那我們?nèi)绻赨ser Memory中分配一個250MB的對象,在這種情況下获高,新生代根本就放不下這個對象哈肖,所以即使我們看到User Memory有300MB可用,實際上也不能分配超過200MB的大對象念秧。
其它的Storage/Execution Memory淤井,跟這個道理類似。
疑問
我們可以看到出爹,由于Java內(nèi)存模型的緣故庄吼,我們可能無形之間碰到一些莫名其妙的坑。
而Spark又支持off-heap Memory严就,以及Tungsten,那使用這些又如何呢器罐?有什么優(yōu)缺點呢梢为?