本篇文章主要介紹下FLink的內(nèi)存模型硕旗,在介紹Flink內(nèi)存模型之前阴绢,我們首先學(xué)習(xí)下JVM內(nèi)存結(jié)構(gòu)
1. JVM內(nèi)存結(jié)構(gòu)
Java7 升級(jí)為 Java8的時(shí)候,JVM內(nèi)存結(jié)構(gòu)發(fā)生了改變,咱們看下區(qū)別是什么窄绒。這部分內(nèi)容原文 Java8 JVM內(nèi)存結(jié)構(gòu)
1.1 Java7 對(duì)應(yīng)的 JVM 內(nèi)存結(jié)構(gòu)
很多人愿意將方法區(qū)稱作永久代。
本質(zhì)上來(lái)講兩者并不等價(jià)积仗,僅因?yàn)镠otspot將GC分代擴(kuò)展至方法區(qū)擅编,或者說(shuō)使用永久代來(lái)實(shí)現(xiàn)方法區(qū)。在其他虛擬機(jī)上是沒(méi)有永久代的概念的羊苟。也就是說(shuō)方法區(qū)是規(guī)范塑陵,永久代是Hotspot針對(duì)該規(guī)范進(jìn)行的實(shí)現(xiàn)。
1.2 Java8 對(duì)應(yīng)的 JVM 內(nèi)存結(jié)構(gòu)
元空間MetaSpace存在于本地內(nèi)存蜡励,意味著只要本地內(nèi)存足夠令花,它不會(huì)出現(xiàn)像Java7永久代中“java.lang.OutOfMemoryError: PermGen space”這種錯(cuò)誤阻桅。
那為什么用MetaSpace代替了方法區(qū)呢?是因?yàn)橥ǔJ褂肞ermSize和MaxPermSize設(shè)置永久代的大小就決定了永久代的上限兼都,但是不知道應(yīng)該設(shè)置多大合適, 如果使用默認(rèn)值很容易遇到OOM錯(cuò)誤嫂沉。
當(dāng)使用元空間時(shí),可以加載多少類的元數(shù)據(jù)就不再由MaxPermSize控制, 而由系統(tǒng)的實(shí)際可用空間來(lái)控制扮碧。
1.3 JVM 堆內(nèi)堆外內(nèi)存什么含義趟章?
這部分內(nèi)容原文 堆內(nèi)堆外內(nèi)存
堆內(nèi)內(nèi)存
- 在JVM的這些分區(qū)中,占用內(nèi)存空間最大的一部分叫做“堆(heap)”慎王,也就是我們所說(shuō)的堆內(nèi)內(nèi)存(on-heap memory)蚓土。JVM中的“堆”主要是存放所有對(duì)象的實(shí)例。這一塊區(qū)域在java虛擬機(jī)啟動(dòng)的時(shí)候被創(chuàng)建赖淤,被所有的線程所共享蜀漆,同時(shí)也是垃圾收集器的主要工作區(qū)域,因此這一部分區(qū)域除了被叫做“堆內(nèi)內(nèi)存”以外漫蛔,也被叫做“GC堆”(Garbage Collected Heap)嗜愈。
堆外內(nèi)存
- 為了解決堆內(nèi)內(nèi)存過(guò)大帶來(lái)的長(zhǎng)時(shí)間GC停頓的問(wèn)題,以及操作系統(tǒng)對(duì)堆內(nèi)內(nèi)存不可知的問(wèn)題莽龟,java虛擬機(jī)開辟出了堆外內(nèi)存(off-heap memory)蠕嫁。堆外內(nèi)存意味著把一些對(duì)象的實(shí)例分配在Java虛擬機(jī)堆內(nèi)內(nèi)存以外的內(nèi)存區(qū)域,這些內(nèi)存直接受操作系統(tǒng)(而不是虛擬機(jī))管理毯盈。這樣做的結(jié)果就是能保持一個(gè)較小的堆剃毒,以減少垃圾收集對(duì)應(yīng)用的影響。同時(shí)因?yàn)檫@部分區(qū)域直接受操作系統(tǒng)的管理搂赋,別的進(jìn)程和設(shè)備(例如GPU)可以直接通過(guò)操作系統(tǒng)對(duì)其進(jìn)行訪問(wèn)赘阀,減少了從虛擬機(jī)中復(fù)制內(nèi)存數(shù)據(jù)的過(guò)程。
- java 在NIO 包中提供了ByteBuffer類脑奠,對(duì)堆外內(nèi)存進(jìn)行訪問(wèn)基公。
- 雖然堆外內(nèi)存本身不受垃圾回收算法的管轄,但是因?yàn)樗怯葿yteBuffer所創(chuàng)造出來(lái)的宋欺,因此這個(gè)buffer自身作為一個(gè)實(shí)例化的對(duì)象轰豆,其自身的信息(例如堆外內(nèi)存在主存中的起始地址等信息)必須存儲(chǔ)在堆內(nèi)內(nèi)存中。
2. Flink 內(nèi)存模型
Flink1.10 對(duì)Flink的內(nèi)存模型進(jìn)行了改造齿诞,咱們分開來(lái)介紹Flink1.10之前版本酸休,已經(jīng)Flink1.10之后版本
2.1 Flink1.10前的Flink內(nèi)存模型
首先看下內(nèi)存模型圖
看下flink源碼,來(lái)分析下內(nèi)存各個(gè)分區(qū)大小是怎么設(shè)置的祷杈,入口 ContaineredTaskManagerParameters#create 方法
/**
* Computes the parameters to be used to start a TaskManager Java process.
*
* @param config The Flink configuration.
* @param containerMemoryMB The size of the complete container, in megabytes.
* @return The parameters to start the TaskManager processes with.
*/
public static ContaineredTaskManagerParameters create(
Configuration config,
long containerMemoryMB,
int numSlots) {
// (1) try to compute how much memory used by container
final long cutoffMB = calculateCutoffMB(config, containerMemoryMB);
// (2) split the remaining Java memory between heap and off-heap
final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(containerMemoryMB - cutoffMB, config);
// use the cut-off memory for off-heap (that was its intention)
final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
// (3) obtain the additional environment variables from the configuration
final HashMap<String, String> envVars = new HashMap<>();
final String prefix = ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
for (String key : config.keySet()) {
if (key.startsWith(prefix) && key.length() > prefix.length()) {
// remove prefix
String envVarKey = key.substring(prefix.length());
envVars.put(envVarKey, config.getString(key, null));
}
}
// done
return new ContaineredTaskManagerParameters(
containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
}
}
container cut-off 區(qū)域
?? - check cutoff ratio斑司,memoryCutoffRatio 默認(rèn)是0.25
?? - check min cutoff value, 默認(rèn)最小cutoff區(qū)域是600MB
?? - cutoff區(qū)域大械:containerMemoryMB * memoryCutoffRatio
?? - 這部分區(qū)域是預(yù)留內(nèi)存宿刮,RocksDB使用的native內(nèi)存互站,或者 JVM overhead都是使用這部分區(qū)域。Network buffers 區(qū)域(也就是 Off-heap 區(qū)域)
?? - 用于網(wǎng)絡(luò)傳輸(比如 shuffle糙置、broadcast)的內(nèi)存 Buffer 池云茸,屬于 Direct Memory 并由 Flink 管理。
?? - taskmanager.memory.segment-size 默認(rèn)是 32kb
?? - taskmanager.network.memory.fraction 默認(rèn)是 0.1
?? - Network buffers 區(qū)域大邪埂:( containerMemoryMB - cutoffMB ) * taskmanager.network.memory.fraction标捺,并且介于 64MB ~ 1GB之間Heap 區(qū)域
?? - Heap 區(qū)域大小: containerMemoryMB - cutoffMB - networkReservedMemory
?? - 可以使用JVM參數(shù) -Xms 和 -Xmx 來(lái)設(shè)置上下限
2.2 Flink1.10及之后版本的Flink內(nèi)存模型
首先看下內(nèi)存模型圖
看下flink源碼揉抵,來(lái)分析下內(nèi)存各個(gè)分區(qū)大小是怎么設(shè)置的亡容,入口 AbstractContainerizedClusterClientFactory#getClusterSpecification 方法,這個(gè)方法是在集群提交作業(yè)的時(shí)候被調(diào)度冤今。
@Override
public ClusterSpecification getClusterSpecification(Configuration configuration) {
checkNotNull(configuration);
// JM 內(nèi)存模型
final int jobManagerMemoryMB =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
.getTotalProcessMemorySize()
.getMebiBytes();
// TM 內(nèi)存模型
final int taskManagerMemoryMB =
TaskExecutorProcessUtils.processSpecFromConfig(
TaskExecutorProcessUtils
.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
configuration,
TaskManagerOptions.TOTAL_PROCESS_MEMORY))
.getTotalProcessMemorySize()
.getMebiBytes();
int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
.setTaskManagerMemoryMB(taskManagerMemoryMB)
.setSlotsPerTaskManager(slotsPerTaskManager)
.createClusterSpecification();
}
咱們主要是分析TM的內(nèi)存模型闺兢,也就是直接看下TaskExecutorProcessUtils.processSpecFromConfig方法,該方法主要做了下面幾件事情
2.2.1. 首先初始化TM進(jìn)程內(nèi)存選項(xiàng)
對(duì)應(yīng)TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS對(duì)象戏罢,根據(jù)configure文件讀取屋谭,涉及到下面幾個(gè)參數(shù):
1.1 task 和 managed 內(nèi)存相關(guān)
?? - taskmanager.memory.task.heap.size: task heap大小,沒(méi)有默認(rèn)值龟糕。
?? - taskmanager.memory.managed.size:flink框架manage 內(nèi)存大小桐磁,沒(méi)有默認(rèn)值
1.2 Total Flink Memory 內(nèi)存
?? - taskmanager.memory.flink.size: Total Flink Memory大小,沒(méi)有默認(rèn)值讲岁。
1.3 Total Process Memory 內(nèi)存
?? - taskmanager.memory.process.size: Total Process Memory大小我擂,沒(méi)有默認(rèn)值。
1.4 Jvm Metaspace和 Overhead 內(nèi)存
?? - taskmanager.memory.jvm-metaspace.size: jvm-metaspace大小缓艳,主要存儲(chǔ)類的元數(shù)據(jù)信息校摩,默認(rèn)值 256MB
?? - taskmanager.memory.jvm-overhead.min:jvm-overhead區(qū)域的最小值,默認(rèn)值是 192 MB
?? - taskmanager.memory.jvm-overhead.max:jvm-overhead區(qū)域的最大值阶淘,默認(rèn)值是 1GB
?? - taskmanager.memory.jvm-overhead.fraction:jvm-overhead占內(nèi)存比例衙吩,默認(rèn)值是0.1
static final ProcessMemoryOptions TM_PROCESS_MEMORY_OPTIONS =
new ProcessMemoryOptions(
Arrays.asList(
TaskManagerOptions.TASK_HEAP_MEMORY,
TaskManagerOptions.MANAGED_MEMORY_SIZE),
TaskManagerOptions.TOTAL_FLINK_MEMORY,
TaskManagerOptions.TOTAL_PROCESS_MEMORY,
new JvmMetaspaceAndOverheadOptions(
TaskManagerOptions.JVM_METASPACE,
TaskManagerOptions.JVM_OVERHEAD_MIN,
TaskManagerOptions.JVM_OVERHEAD_MAX,
TaskManagerOptions.JVM_OVERHEAD_FRACTION));
2.2.2. 接下來(lái)看下FLink內(nèi)存模型,內(nèi)存大小怎么計(jì)算的
如果指定了Total Process Memory大小溪窒,可以接下來(lái)看 ProcessMemoryUtils#deriveProcessSpecWithTotalProcessMemory方法
根據(jù)設(shè)置的Total Process Memory大小坤塞,來(lái)計(jì)算其他區(qū)域內(nèi)存大小
(1) 內(nèi)存第一部分: Jvm Metaspace大小
?? - 如果在conf文件中設(shè)置了taskmanager.memory.jvm-metaspace.size,就按照設(shè)置的來(lái)霉猛,否則走默認(rèn)值 256MB
(2) 內(nèi)存第二部分: Jvm Overhead 大小
?? - 根據(jù) taskmanager.memory.jvm-overhead.fraction比例來(lái)計(jì)算,OverheadMemorySize = totalProcessMemorySize * taskmanager.memory.jvm-overhead.fraction
如果珠闰,OverheadMemorySize正好介于taskmanager.memory.jvm-overhead.min(192MB)和 taskmanager.memory.jvm-overhead.max(1GB)之間惜浅,那取值就是OverheadMemorySize
否則,if OverheadMemorySize > taskmanager.memory.jvm-overhead.max 伏嗜,那取值就是taskmanager.memory.jvm-overhead.max坛悉。
if OverheadMemorySize < taskmanager.memory.jvm-overhead.min 伐厌,那取值就是taskmanager.memory.jvm-overhead.min
(3) 內(nèi)存第三部分: Total Flink Memory 大小
?? - totalFlinkMemorySize = totalProcessMemorySize - jvmMetaspaceSize - jvmOverheadSize
2.2.3. Total Flink Memory 內(nèi),各個(gè)分區(qū)大小計(jì)算
Total Flink Memory中又涉及到好幾塊區(qū)域裸影,分別來(lái)看下計(jì)算規(guī)則挣轨,都是基于totalFlinkMemorySize來(lái)計(jì)算的。對(duì)應(yīng)TaskExecutorFlinkMemoryUtils#deriveFromTotalFlinkMemory方法轩猩,該方法主要做了下面幾件事情:
- 獲取frameworkHeap大小卷扮,可以通過(guò)taskmanager.memory.framework.heap.size參數(shù)修改,默認(rèn)值是128MB
- 獲取frameworkOffHeap大小均践,可以通過(guò)taskmanager.memory.framework.off-heap.size參數(shù)修改晤锹,默認(rèn)值是128MB
- 獲取taskOffHeap大小,可以通過(guò)taskmanager.memory.task.off-heap.size參數(shù)修改彤委,默認(rèn)值是0
接下來(lái)還有taskHeapMemory鞭铆,networkMemory,managedMemory需要設(shè)置焦影,這里會(huì)有if-else邏輯车遂,第一種情況:如果明確指定了taskHeapMemory大小, else 是第二種情況斯辰,接下來(lái)咱們按照else這個(gè)分支分析下 taskHeapMemory舶担,networkMemory,managedMemory這三塊區(qū)域的內(nèi)存大小椒涯。
- managedMemorySize 大小
- 如果指定了taskmanager.memory.managed.size柄沮,按照設(shè)置的來(lái)
- 否則按照taskmanager.memory.managed.fraction,默認(rèn)值是0.4废岂,那取值就是totalFlinkMemorySize * taskmanager.memory.managed.fraction
- Managed Memory托管內(nèi)存由Flink管理祖搓,以native內(nèi)存的方式進(jìn)行分配,使用的是off-heap堆外內(nèi)存湖苞。以下場(chǎng)景會(huì)用到Managed Memory
?? - Streaming Job 的 RocksDB 使用這部分內(nèi)存
?? - Batch Job 使用 Managed Memory 進(jìn)行 sort/hash table
?? - Streaming Job 或者 Batch Job 執(zhí)行Python的自定義UDF使用該內(nèi)存
- networkMemorySize 大小
- 如果設(shè)置了taskmanager.memory.network.min(64MB)拯欧,taskmanager.memory.network.max(1GB),taskmanager.memory.network.fraction(0.1)其中的任意一個(gè)财骨,那么networkMemorySize = taskmanager.network.numberOfBuffers(2048) * taskmanager.memory.segment-size(32KB)
- 否則 networkMemorySize = totalFlinkMemorySize * taskmanager.memory.network.fraction(0.1)镐作,不過(guò)networkMemorySize要介于taskmanager.memory.network.min(64MB)與 taskmanager.memory.network.max(1GB)之間
- taskHeapMemorySize 大小
- taskHeapMemorySize = totalFlinkMemorySize - frameworkHeapMemorySize - frameworkOffHeapMemorySize - taskOffHeapMemorySize - managedMemorySize - networkMemorySize
至此,F(xiàn)link內(nèi)存模型已經(jīng)介紹完成隆箩。
3. 指定TM內(nèi)存模型的方式
整個(gè)TM內(nèi)存模型可以通過(guò)三種方式來(lái)指定
- 通過(guò)指定 taskmanager.memory.task.heap.size 和 taskmanager.memory.managed.size來(lái)確定
- 通過(guò)指定 taskmanager.memory.flink.size 也就是 Total Flink Memory大小
- 通過(guò)指定 * taskmanager.memory.process.size* 也就是 Total Process Memory大小
對(duì)應(yīng)源碼ProcessMemoryUtils#memoryProcessSpecFromConfig方法
public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
// all internal memory options are configured, use these to derive total Flink and
// process memory
return deriveProcessSpecWithExplicitInternalMemory(config);
} else if (config.contains(options.getTotalFlinkMemoryOption())) {
// internal memory options are not configured, total Flink memory is configured,
// derive from total flink memory
return deriveProcessSpecWithTotalFlinkMemory(config);
} else if (config.contains(options.getTotalProcessMemoryOption())) {
// total Flink memory is not configured, total process memory is configured,
// derive from total process memory
return deriveProcessSpecWithTotalProcessMemory(config);
}
return failBecauseRequiredOptionsNotConfigured();
}
4. 總結(jié)
本文介紹了Flink1.9 和 Flink1.12的內(nèi)存模型以及各個(gè)區(qū)域的計(jì)算方法该贾。
簡(jiǎn)單總結(jié)下Flink1.10之后的內(nèi)存模型:
本質(zhì)上 Java 應(yīng)用使用的內(nèi)存(不包括 JVM 自身的開銷)可以分為三類:
?? - JVM 堆內(nèi)存:Heap
?? - 不在 JVM 堆上但受到 JVM 管理的內(nèi)存:Direct
?? - 完全不受 JVM 管理的內(nèi)存:NativeDirect 內(nèi)存是直接映射到 JVM 虛擬機(jī)外部的內(nèi)存空間,但是其用量又受到 JVM 的管理和限制捌臊,從這個(gè)角度來(lái)講杨蛋,認(rèn)為它是 JVM 內(nèi)存或者非 JVM 內(nèi)存都是講得通的。
Flink UI中 metric 加在一起為什么不是 TM 的總內(nèi)存?
?? 一方面是因?yàn)?Native 內(nèi)存沒(méi)有被算進(jìn)去(也就是 Cut-off 的主要部分)逞力,因?yàn)?Native 是不受 JVM 管理的曙寡,MXBean 完全不知道它的使用情況。另一方面寇荧,JVM 自身的開銷也并不是都被覆蓋到了举庶,比如對(duì)于棧空間揩抡,JVM 只能限制每個(gè)線程的椈Ы模空間有多大,但是不能限制線程的數(shù)量捅膘,因此總的椞砘觯空間大小也是不受控制的,也沒(méi)有通過(guò) Metric 來(lái)體現(xiàn)寻仗。Window相關(guān)的算子會(huì)將窗口內(nèi)的數(shù)據(jù)作為狀態(tài)保存在內(nèi)存里刃泌,等待窗口觸發(fā)再進(jìn)行計(jì)算。想問(wèn)一下這里的狀態(tài)是存在哪種類型的內(nèi)存里面署尤?
??這個(gè)應(yīng)該是存在 state 里的耙替,具體用哪種類型的內(nèi)存取決于你的 State Backend 類型。MemoryStateBackend/FsStateBackend 用的是 Heap 內(nèi)存曹体,RocksDBStateBackend 用的是 Native 內(nèi)存俗扇,也就是 1.10 中的 Manage Memory。flink 1.10里將RocksDBStateBackend改為使用managed memory箕别,統(tǒng)一使用 offheap 內(nèi)存铜幽,您的解答里說(shuō)的是native內(nèi)存,不知道是不是您說(shuō)的“完全不受 JVM 管理的內(nèi)存:Native”這個(gè)串稀?
?? 是的不是太清楚offheap和direct以及native的關(guān)系是怎樣的除抛?
?? Flink 配置項(xiàng)中的 task/framework offheap,是包括了 direct 和 native 內(nèi)存算在一起的母截,也就是說(shuō)用戶不需要關(guān)心具體使用的是 direct 還是 native到忽。Overhead 主要涉及到哪部分信息存儲(chǔ)?
?? 使用的native內(nèi)存清寇,主要存儲(chǔ)線程棧喘漏,code cache, garbage collection space等华烟。為什么本地起的 Standalone Flink翩迈,為啥 UI 上展示的 Heap 會(huì)超過(guò)設(shè)置的 taskmanager.memory.process.size 的值?
?? 這主要是因?yàn)榭梗覀冎会槍?duì) Metaspace 設(shè)置了 JVM 的參數(shù)负饲,對(duì)于其他 Overhead 并沒(méi)有設(shè)置 JVM 的參數(shù)搅方,也并不是所有的Overhead 都有參數(shù)可以控制(比如棧空間)绽族。
Non-Heap Max 是 JVM 自己決定的,所以通常會(huì)比 Flink 配置的 Metaspace + Overhead 要大衩藤。
可以這樣理解吧慢,F(xiàn)link 將整個(gè) TM 的內(nèi)存預(yù)算劃分給了不同的用途,但是并不能嚴(yán)格保證各部分的內(nèi)存都不超用赏表,只能是 Best Effort检诗。
其中,Managed瓢剿、Network逢慌、Metaspace 是嚴(yán)格限制的,Off-Heap间狂、Overhead 是不能完全嚴(yán)格限制的攻泼,Heap 整體是嚴(yán)格限制的但是 Task/Framework 之間是非嚴(yán)格的。