前言
參考 : <Flink內(nèi)核原理與實(shí)踐>
1. 一個(gè)java對(duì)象內(nèi)存大小
java所有數(shù)據(jù)類型對(duì)應(yīng)的字節(jié)大小
類型 | 大小(byte) | 說明 |
---|---|---|
Long | 8 | |
Double | 8 | |
Int | 4 | |
Float | 4 | |
Char | 2 | |
Short | 2 | |
Byte | 1 | |
Boolean | 1 | |
Class Point | 8 | 保存對(duì)象的引用指針,指針壓縮后為4個(gè)字節(jié) |
Mark Word | 8 | 保存運(yùn)行時(shí)的數(shù)據(jù) : hash,鎖狀態(tài)等. |
Array length | 4 | 如果對(duì)象是數(shù)組需要額外的4個(gè)字節(jié)記錄數(shù)組長(zhǎng)度 |
上面是按照64位的,如果是32位有所不同
java對(duì)象的組成 : 對(duì)象頭,實(shí)例數(shù)據(jù),對(duì)齊部分
類型 | 說明 |
---|---|
對(duì)象頭 | 由Markword和類指針組成,運(yùn)行時(shí)存儲(chǔ)對(duì)象運(yùn)行時(shí)數(shù)據(jù){hash code,gc年齡,鎖標(biāo)志等..},32系統(tǒng)頭大小8byte,64系統(tǒng)為16byte開啟指針壓縮為12byte |
實(shí)例數(shù)據(jù) | 當(dāng)前對(duì)象中的實(shí)例字段,有上面的數(shù)據(jù)類型組成 |
對(duì)齊 | JVM要求對(duì)象大小比須是8的倍數(shù),為了使對(duì)象達(dá)到8的倍數(shù)而補(bǔ)充的數(shù)據(jù) |
JVM關(guān)于壓縮指針的參數(shù)
-XX:-+UseCompressedOops : 普通對(duì)象指針壓縮(OOP即ordinary object pointer)
-XX:-+UseCompressedClassPointers : 類型指針壓縮,即針對(duì)klass pointer的指針壓縮
class A {
int i;
String s;
} // 是否開啟指針壓縮
// 那么一個(gè)new A()所占用的大小 = 16/12(對(duì)象頭) + 4(int) + 8/4(ref) + 4/4(對(duì)齊) = 32/24byte;
測(cè)試對(duì)象大小的代碼,具體展示結(jié)果可自行測(cè)試,下面是具體測(cè)試代碼
/**
* -XX:-+UseCompressedClassPointers 壓縮class指針的. 對(duì)象頭中使用 開啟這個(gè)參數(shù)需要開啟下個(gè)參數(shù)才可生效
* -XX:-+UseCompressedOops 壓縮對(duì)象值指針的,
* maven opjdk提供的
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>
* @author xuzhiwen
*/
public class Test {
public static void main(String[] args) throws NoSuchFieldException {
extracted(new Object(), "Object ");
extracted(new int[1], " int[1]");
extracted(new int[10], " int[10]");
extracted(new int[0], "int[0]");
extracted(new Integer[1], "Integer[1] ");
extracted(new Integer[0], "Integer[0]");
extracted(new T(new T()), "T");
extracted(new T2(), "T2");
}
private static void extracted(Object o, String s) {
System.out.println(s + " 占用大小");
System.out.println(ClassLayout.parseInstance(o).toPrintable());
System.out.println();
}
static class T2 {}
static class T {
public static T t2 = new T();
int i;
T t;
public T(T t) {this.t = t;}
public T() {}
}
}
jvm 序列化缺點(diǎn)
- 無法跨語言 : java序列化的對(duì)象,其他語言無法反序列化,除非實(shí)現(xiàn)java序列化協(xié)議
- 易被攻擊 : java無法保證序列化和反序列化的安全性,需要手動(dòng)添加代碼黑白名單,判斷等方式防止被攻擊
- 流太大,性能差 : 序列化后的字節(jié)比目前主流的protobuf,kryo等序列化框架要大很多,并且耗時(shí)要久一點(diǎn)
可以看到j(luò)ava的序列化有很多不足的地方,所以在很多框架中都是選擇自己實(shí)現(xiàn)序列化或者使用一些主流的開源框架
2. heap & off-heap
-
heap (堆內(nèi)存)
- jvm的內(nèi)存區(qū)域中,占用內(nèi)存空間最大的一部分叫做堆'heap',也就是我們所說的堆內(nèi)存. jvm中的heap主要是存放所有對(duì)象的實(shí)例,這一塊區(qū)域在jvm啟動(dòng)的時(shí)候被創(chuàng)建,并被所有的線程所共享,同時(shí)也是垃圾收集器的主要工作區(qū)域
-
off-heap (非堆內(nèi)存/堆外內(nèi)存)
- 堆外內(nèi)存意味著把一些對(duì)象的實(shí)例分配在Java虛擬機(jī)堆內(nèi)存以外的內(nèi)存區(qū)域坡倔,這些內(nèi)存直接受操作系統(tǒng)而不是jvm管理,這樣做的結(jié)果就是能保持一個(gè)較小的堆,以減少垃圾收集對(duì)程序的影響
- 為了解決heap過大導(dǎo)致gc停頓過長(zhǎng)的問題,java可以通過off-heap來緩解這個(gè)問題, off-heap可以通過零拷貝, 以減少數(shù)據(jù)從jvm內(nèi)存到系統(tǒng)之間的拷貝次數(shù)
3. Flink為什么自己管理內(nèi)存
- java序列化問題
- 序列化效率低,序列化時(shí)間過久
- 序列化密度低,序列化后的字節(jié)數(shù)組過大
- GC & oom問題
- 由于大數(shù)據(jù)情況下,jvm啟動(dòng)會(huì)開辟較大空間,導(dǎo)致full gc的時(shí)候時(shí)間較久,極大的影響任務(wù)性能
- 數(shù)據(jù)量大的情況下,當(dāng)jvm對(duì)象分配的大小超過jvm內(nèi)存(heap)時(shí),會(huì)拋出oom,導(dǎo)致jvm停止,影響框架健壯性和整體性能
4. Flink自主管理內(nèi)存的好處
- 將內(nèi)存抽象成MemorySegment,每個(gè)MemorySegment(底層就是一個(gè)字節(jié)數(shù)組)默認(rèn)大小32K
- segment會(huì)存儲(chǔ)在老年代,segment會(huì)被持續(xù)引用,不會(huì)被ygc的時(shí)候回收掉,那么老年代的內(nèi)存有很大一部分是不變的,如果full gc 那么一定是新生代對(duì)象移動(dòng)到老年代中造成,這樣對(duì)于管理老年代和控制full gc就會(huì)變得輕松一些.在新生代中創(chuàng)建的對(duì)象很大一部分會(huì)被flink通過序列化存入segment中,只要控制好非序列化的對(duì)象即可,這樣內(nèi)存管理會(huì)變得簡(jiǎn)單很多. memorySegment在后面會(huì)介紹
- 自定義序列化.彌補(bǔ)java序列化的不足,在flink中處理的對(duì)象都是經(jīng)過序列化的,然后存入segment中,所以u(píng)ser創(chuàng)建對(duì)象大部分都被在老年代中,由于flink的序列化性能較高,會(huì)使內(nèi)存占用降低,這樣老年代的對(duì)象也很穩(wěn)定,發(fā)生full gc的次數(shù)就會(huì)被減少,對(duì)象存儲(chǔ)在segment的字節(jié)數(shù)組中,當(dāng)對(duì)象被釋放的時(shí)候我們只需要通過調(diào)整數(shù)組的指針即可
- 直接操作二進(jìn)制數(shù)據(jù).flink可以直接對(duì)序列化后的字節(jié)進(jìn)行計(jì)算,所以內(nèi)存使用和計(jì)算效率非常高.二進(jìn)制數(shù)據(jù)的操作結(jié)果依然是字節(jié),同樣保存在segment中,只有在需要的時(shí)候才會(huì)反序列化成對(duì)象.序列化是消耗cpu的主要操作,因此flink的計(jì)算避免了大量的序列化操作
一.Flink內(nèi)存模型
上面圖為TaskManager內(nèi)存模型,左邊為細(xì)分的內(nèi)存模型,右邊為整體內(nèi)存模型,該圖摘自Flink官網(wǎng)
內(nèi)存可以分為三部分
heap內(nèi)存
- TaskManager使用的heap內(nèi)存,作為flink(Runtime)框架使用內(nèi)存的一部分
- Task使用的heap內(nèi)存,用于用戶代碼使用
non-heap內(nèi)存
- 托管內(nèi)存 : 實(shí)時(shí)用于rocksDB狀態(tài)后端,離線則用于排序,hash,中間結(jié)果緩存,不預(yù)分配參數(shù) managed memory
taskmanager.memory.preallocate: false- 直接內(nèi)存 :
- TaskManager使用的直接內(nèi)存,作為flink(Runtime)框架使用內(nèi)存的一部分
- Task使用的直接內(nèi)存,用于用戶代碼使用
- 用于網(wǎng)絡(luò)傳輸使用的直接內(nèi)存
jvm使用內(nèi)存
- JVM 元空間,即jvm的方法區(qū),可以通過jvm參數(shù)進(jìn)行配置
- JVM開銷保留的內(nèi)存,如線程堆棧,jit緩存,gc
關(guān)于內(nèi)存的判斷
- 若是 Flink 有硬限制的分區(qū),Flink 會(huì)報(bào)該分區(qū)內(nèi)存不足微渠。否則進(jìn)入下一步症歇。
- 若該分區(qū)屬于 JVM 管理的分區(qū),在其實(shí)際值增長(zhǎng)導(dǎo)致 JVM 分區(qū)也內(nèi)存耗盡時(shí),JVM 會(huì)報(bào)其所屬的 JVM 分區(qū)的 OOM (比如 java.lang.OutOfMemoryError: Jave heap space)却特。否則進(jìn)入下一步绵估。
- 該分區(qū)內(nèi)存持續(xù)溢出,最終導(dǎo)致進(jìn)程總體內(nèi)存超出容器內(nèi)存限制。在開啟嚴(yán)格資源控制的環(huán)境下,資源管理器(YARN/k8s 等)會(huì) kill 掉該進(jìn)程,在該情況下通常由開啟了rocksDB導(dǎo)致的
heap內(nèi)存在jvm啟動(dòng)的時(shí)候申請(qǐng)的一塊不變的內(nèi)存區(qū)域,該內(nèi)存實(shí)際上是Flink和task公用的一塊區(qū)域,在flink層面通過控制來區(qū)分框架使用和task內(nèi)存,heap內(nèi)存管理起來是比較容易的,實(shí)際上non-heap的內(nèi)存是難管理的一塊,如果管理不當(dāng)或者使用不當(dāng)可能造成內(nèi)存泄漏或者內(nèi)存無限增長(zhǎng)等問題
內(nèi)存參數(shù)配置
# ------------------- 堆內(nèi) ------------------------------
#flink框架使用內(nèi)存,默認(rèn)128,不計(jì)入slot
taskmanager.memory.framework.heap.size = xxxMB
#task使用的內(nèi)存,即用戶代碼
taskmanager.memory.task.heap.size = xxxMB
# ------------------- 堆外 -------------------------------
#flink框架使用 默認(rèn)128
taskmanager.memory.framework.off-heap.size = xxxMB
#task使用 , 默認(rèn)0
taskmanager.memory.task.heap.size = xxxMB
# network使用的內(nèi)存大小 默認(rèn) min=64mb max=1gb fraction=0.1
# 通過fraction計(jì)算出來值若比最小值小或比最大值大,就會(huì)限制到最小值或者最大值,比例是按照總內(nèi)存計(jì)算
taskmanager.memory.network.off-heap.[min/max/fraction] = xxxMB/xxxMB/0.x
# 托管內(nèi)存 ManagedMemory 計(jì)算同上,默認(rèn)0.4
taskmanager.memory.managed.off-heap.[size/fraction]= 0.x
#------------------- jvm使用 -----------------------------
# jvm元空間使用
taskmanager.memory.jvm-metaspace = xxxMB
# jvm執(zhí)行的開銷,堆棧,io編譯換成等使用的內(nèi)存
taskmanager.memory.jvm-overhead = xxxMB
# ------------------ 總內(nèi)存 ------------------------------
# 綜上框架使用的內(nèi)存堆和堆外內(nèi)存,通過該參數(shù)控制,
taskmanager.memory.flink.size = xxxMB
# flink任務(wù)進(jìn)程的使用的內(nèi)存
taskmanager.memory.porcess.size = xxxMB
# ------------------ jvm線束 ------------------------------
# 對(duì)應(yīng)于上面的jvm相關(guān)參數(shù),實(shí)際上都是根據(jù)上面內(nèi)存配置計(jì)算出來的jvm參數(shù),在啟動(dòng)tm的時(shí)候會(huì)指定對(duì)應(yīng)的jvm參數(shù)
-Xmx / -Xms 配置jvm堆內(nèi)存大小
-XX:MaxDirectMemorySize jvm直接內(nèi)存
-XX:MeataspaceSize 元空間使用內(nèi)存
二.Flink內(nèi)存管理
1.存在的問題
1). jvm問題
- 數(shù)據(jù)密度低
- 上面我們已經(jīng)介紹了java對(duì)象在內(nèi)存的分配,由于有大量的額外對(duì)齊數(shù)據(jù),會(huì)導(dǎo)致jvm中有效數(shù)據(jù)信息密度低,如果在大量對(duì)象的情況下會(huì)占用過多額外的內(nèi)存空間
- gc影響
- jvm的gc雖然內(nèi)存泄漏的可能已經(jīng)開發(fā)人員的工作量,但是gc回收是不可控的,在tb,pb級(jí)的數(shù)據(jù)計(jì)算需要大量?jī)?nèi)存,在內(nèi)存中生成大量的對(duì)象導(dǎo)致在full gc的時(shí)候會(huì)用時(shí)較久直接影響性能,甚至還會(huì)導(dǎo)致flink心跳超時(shí)問題
- oom影響
- 如果出現(xiàn)oom,則jvm直接崩潰,影響flink健壯性和性能問題
- 緩存未命中
- 由于java對(duì)象在堆上存儲(chǔ)不是連續(xù)的,所以從內(nèi)存讀取java對(duì)象時(shí),鄰近的數(shù)據(jù)通常不是cpu下一步需要計(jì)算的數(shù)據(jù),這就是緩存未命中,此時(shí)cpu等待從內(nèi)存重新讀取數(shù)據(jù),此時(shí)cpu空轉(zhuǎn),cpu的速度和內(nèi)存速度差距比較大,那么執(zhí)行效率也會(huì)隨之下降
2). 自主管理
- 自定義序列化工具 : 將數(shù)據(jù)序列化成二進(jìn)制數(shù)據(jù)存入在
MemorySegment
中,并且提供一些高效的讀寫方法,一些計(jì)算可以直接操作二進(jìn)制,減少序列化,如果需要序列化直接序列化需要計(jì)算的數(shù)據(jù)即可,無需全部序列化 - 合理使用堆外內(nèi)存 : 堆外在寫磁盤和網(wǎng)絡(luò)io中是通過零拷貝的,而堆內(nèi)需要在用戶態(tài)復(fù)制一次,提高io效率
- 緩存友好的數(shù)據(jù)結(jié)構(gòu)和算法 : 通過友好的緩存,以較少緩存未命中,提高cpu的執(zhí)行效率
3). 堆外不足
堆外出現(xiàn)問題是難以排查,操作不當(dāng)容易內(nèi)存泄漏
-
對(duì)于聲明周期短的MemorySegment,如果分配在堆外,開銷跟更高,分配時(shí)間要比堆內(nèi)分配響應(yīng)慢很多
通過下面代碼測(cè)試堆內(nèi)和堆外內(nèi)存的分配效率
public void test(){ for (int i = 1; i < 60; i++) { System.out.println("第" + i + "循環(huán)"); alloc(1024 * 1024, 1024); } private static void alloc(int size, int s) { printTakesTime( s, "分配堆內(nèi)存用時(shí) = ",val1 -> { byte[] bytes = new byte[size];}); printingTakesTime( s, "分配直接內(nèi)存用時(shí) = ", val1 -> { ByteBuffer buf = ByteBuffer.allocateDirect(size);}); System.gc(); } private static void printTakesTime( int s, String str, Consumer<Integer> consumer) { long start = System.currentTimeMillis(); for (int i = 0; i < s; i++) { consumer.accept(null); } System.out.println(str + (System.currentTimeMillis() - start) + "ms "); } }
2. 內(nèi)存數(shù)據(jù)結(jié)構(gòu)
1).MemorySegment (內(nèi)存段)
在flink中對(duì)內(nèi)存進(jìn)行了抽象成了MemorySegment,?默認(rèn)情況下,一個(gè) MemorySegment 對(duì)應(yīng)著一個(gè) 32KB 大小的內(nèi)存塊,這塊內(nèi)存既可以是堆上內(nèi)存( byte數(shù)組) ,也可以是堆外內(nèi)存(nio的ByteBufferr ) .
同時(shí)MemorySegment也提供了對(duì)二進(jìn)制數(shù)據(jù)的操作方法,以及讀取字節(jié)數(shù)組序列化以及序列化字節(jié)數(shù)組的方法等
下面是類繼承圖,該類有兩MemorySegment實(shí)現(xiàn)類有兩個(gè)分別為使用heap的以及混合的即有heap和non-heap,對(duì)于內(nèi)存的訪問有子類具體的實(shí)現(xiàn)
在MemorySegment類的注釋中有兩段話,我們解釋一下
Note on efficiency: For best efficiency, the code that uses this class should make sure that only one subclass is loaded, or that the methods that are abstract in this class are used only from one of the subclasses (either the HeapMemorySegment, or the HybridMemorySegment).
That way, all the abstract methods in the MemorySegment base class have only one loaded actual implementation. This is easy for the JIT to recognize through class hierarchy analysis, or by identifying that the invocations are monomorphic (all go to the same concrete method implementation). Under these conditions, the JIT can perfectly inline methods.
解釋一下上面的兩句話
為了提升效率應(yīng)該只加載一個(gè)MemorySegment的子類或者,調(diào)用抽象方法的時(shí)候只調(diào)用其中一個(gè)子類的,避免交叉使用
這樣MemorySegment的抽象方法只有一個(gè)已加載的實(shí)際發(fā)現(xiàn),通過類的層次分析或確定是單態(tài)(所有調(diào)用都指向一個(gè)具
體實(shí)現(xiàn))的,這樣JIT很容易識(shí)別并(去虛化)進(jìn)行方法內(nèi)聯(lián)
JIT(Just In Time)優(yōu)化
jvm是編譯和解釋
- 熱點(diǎn)代碼優(yōu)化 : 對(duì)執(zhí)行的熱點(diǎn)代碼進(jìn)行編譯成本地代碼,執(zhí)行編譯后的代碼效率會(huì)更快,編譯后的代碼會(huì)比膨脹,所以只有熱點(diǎn)代碼才會(huì)被編譯
- 完全去虛化 : 通過類型推導(dǎo)或者類層次分析,識(shí)別虛方法調(diào)用的唯一目標(biāo)方法,將其轉(zhuǎn)換為直接調(diào)用
static class A{ void a(){System.out.println("a");} } static class A1 extends A{ @Override void a() { System.out.println("a1");} } static class A2 extends A{ @Override void a() { System.out.println("a2");} } public static void main(String[] args) throws Exception { // 子類實(shí)現(xiàn)了父類的方法,構(gòu)建對(duì)象的時(shí)候通過子類引用指向父類 // 在調(diào)用方法的時(shí)候只能調(diào)用父類擁有的方法,但是子類有重寫或者實(shí)現(xiàn)了對(duì)應(yīng)的方法 // 在編譯時(shí)調(diào)用父類方法,運(yùn)行時(shí)調(diào)用具體實(shí)現(xiàn)類方法 // 那么這個(gè)方法就是虛方法 // 如果只有A1.class被加載,通過層次分析,很容易識(shí)別到目標(biāo)方法, // 如果A2.class也被加載,那么就會(huì)通過具體的類-------------- final A a = new A1(); a.a(); } }
- 方法內(nèi)聯(lián) : 將目標(biāo)方法代碼轉(zhuǎn)移至當(dāng)前代碼中,避免入棧和出棧不必要的開銷
public void t1(){ int r = sum(1,2)+ sum(2,3); // 優(yōu)化后 , 代碼直接嵌入 // int r = 1+2+2+3; } public int sum(int a,int b){ return a + b; } }
- ......等
public abstract class MemorySegment {
/** unsafe對(duì)象,用于操作heap an non-heap內(nèi)存 */
@SuppressWarnings("restriction")
protected static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
/* 數(shù)組對(duì)象的偏移量,基本為16,因?yàn)槭窍鄬?duì)于數(shù)組對(duì)象而言,前面是對(duì)象頭,后面是數(shù)據(jù),當(dāng)然不完全是,通常情況下而已 */
@SuppressWarnings("restriction")
protected static final long BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
/* 判斷是大端法還是小端法,不同的cpu會(huì)使用不同的字節(jié)順序 大端 : 低地址存放最高有效字節(jié), 小端與大端相反 */
private static final boolean LITTLE_ENDIAN =
(ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN);
// ------------------------------------------------------------------------
/* 如果使用heap的情況下為字節(jié)數(shù)組對(duì)象,non-heap情況下為null */
protected final byte[] heapMemory;
/* 內(nèi)存地址,heapMemory =null 則為絕對(duì)地址,否則為heapMemory的相對(duì)地址 */
protected long address;
/* 標(biāo)識(shí)地址結(jié)束位置, address+size */
protected final long addressLimit;
/** memorySegment的大小 heap情況為字節(jié)數(shù)組長(zhǎng)度,non-heap為byteBuffer的capacity */
protected final int size;
/** memory segment owner */
private final Object owner;
// ----------------------------------------- 構(gòu)造方法 ---------------------------------------------
// 是用heap情況下的構(gòu)造方法
MemorySegment(byte[] buffer, Object owner) {
this.heapMemory = buffer;
this.address = BYTE_ARRAY_BASE_OFFSET; // address為字節(jié)數(shù)組的相對(duì)偏移量
this.size = buffer.length; // 字節(jié)長(zhǎng)度
this.addressLimit = this.address + this.size; // address結(jié)束地址
this.owner = owner;
}
// non-heap的構(gòu)造方法
MemorySegment(long offHeapAddress, int size, Object owner) {
this.heapMemory = null; // heapMemory為null
this.address = offHeapAddress; // 堆外對(duì)絕對(duì)地址,
this.addressLimit = this.address + size;
this.size = size; // 堆外申請(qǐng)的內(nèi)存大小
this.owner = owner;
}
// ------------------------------------------------------------------------------------------------
}
2).DataInputView/DataOutputView
MemorySemgent是flink內(nèi)存分配的最小單元了,對(duì)于數(shù)據(jù)夸MemorySemgent保存,那么對(duì)于上層的使用者來說,需要考慮考慮所有的細(xì)節(jié),由于過于繁瑣,所以在MemorySemgent上又抽象了一層內(nèi)存也,內(nèi)存也是在MemorySemgent數(shù)據(jù)訪問上的視圖,對(duì)數(shù)據(jù)輸入和輸出分別抽象為DataInputView/DataOutputView,有了這一層,上層使用者無需關(guān)心跨MemorySemgent的細(xì)節(jié)問題,內(nèi)存也對(duì)自動(dòng)處理跨MemorySemgent的內(nèi)存操作
DataInputView
DataInputView繼承DataInput,DataInputView是對(duì)MemorySemgent讀取的抽象視圖,提供一系列讀取二進(jìn)制數(shù)據(jù)不同類型的方法,AbstractPageInputView是DataInputView的一個(gè)抽象實(shí)現(xiàn)類,并且基本所有InputView都實(shí)現(xiàn)了該類,即所有實(shí)現(xiàn)該類的InputView都支持Page
InputView持有了多個(gè)MemorySemgent的引用(可以基于數(shù)組,list,deque等),這些MemorySemgent被視為一個(gè)內(nèi)存頁(yè),可以順序,隨機(jī)等方式讀取數(shù)據(jù),要基于不同的實(shí)現(xiàn)類,實(shí)現(xiàn)類不同讀取方式不同
方法圖
類繼承圖
DataOutputView
與DataInputView相對(duì)應(yīng),繼承Output,并有一個(gè)擁有Page功能的抽象類(AbstractPagedOutputView),其大部outputView的實(shí)現(xiàn)都是繼承自該抽象類,對(duì)一組MemorySemgent提供一個(gè)基于頁(yè)的寫入功能
方法圖
類繼承圖
3).Buffer
用于網(wǎng)絡(luò)io數(shù)據(jù)的包裝,每個(gè)buffer持有一個(gè)MemorySegment的引用,resultPartition寫數(shù)據(jù)的時(shí)候,會(huì)向LocalBufferPool申請(qǐng)Buffer,會(huì)返回BufferBuilder,通過BufferBuilder想Buffer<實(shí)際寫入的是MemorySegment>
寫數(shù)據(jù)
BufferBuilder是在上游Task中,負(fù)責(zé)想Buffer寫入數(shù)據(jù),BufferConsumer位于下游,與BufferBuilder相對(duì)應(yīng),用于消費(fèi)Buffer的數(shù)據(jù),每個(gè)bufferBuilder對(duì)應(yīng)一個(gè)bufferConsumer
常用參數(shù)介紹
public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer {
/* buffer中持有的內(nèi)存段 */
private final MemorySegment memorySegment;
/* 用于回收MemorySegment的回收器 */
private final BufferRecycler recycler;
/* buffer分配器,netty會(huì)使用到*/
private ByteBufAllocator allocator;
/* 當(dāng)前buffer的容量 */
private int currentSize;
// 釋放buffer,引用計(jì)數(shù)-1, 引用計(jì)數(shù)=0則調(diào)用deallocate方法回收buffer
@Override
public void recycleBuffer() {
release();
}
// 保留buffer,原理就是引用計(jì)數(shù)+1
@Override
public NetworkBuffer retainBuffer() {
return (NetworkBuffer) super.retain();
}
// 回收MemorySegment,放入pool
@Override
protected void deallocate() {
recycler.recycle(memorySegment);
}
// ....... 忽略
}
buffer申請(qǐng)
// ------------------ resultPartition ------------------------------------------
// BufferWritingResultPartition.java
// 請(qǐng)求Buffer
private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition)
throws IOException {
// bufferPool = LocalBufferPool,請(qǐng)求本地buffer,如果LocalBufferPool沒有memorySegment則會(huì)向全局的資源池申請(qǐng)memorySegment
// 實(shí)際上請(qǐng)求buffer就是請(qǐng)求memorySegment的過程,memorySegment被bufferBuilder包裝了一下,方便后面使用
BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
if (bufferBuilder != null) {
return bufferBuilder;
}
final long start = System.currentTimeMillis();
try {
// 當(dāng)前面請(qǐng)求不到的時(shí)候,則會(huì)通過block的形式請(qǐng)求,阻塞直到有可用buffer返回
bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
return bufferBuilder;
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for buffer");
}
}
// -------------------
buffer回收
當(dāng)buffer用完之后需要進(jìn)行回收比如在netty的clientHandler收到響應(yīng)之后進(jìn)行處理就會(huì)把buffer回收掉,buffer回收之后并不會(huì)釋放memorySegment,而是放回池中,變?yōu)榭捎脙?nèi)存,反復(fù)使用
// NetworkBuffer.java
@Override
public void recycleBuffer() {
// 由于繼承了netty的AbstractReferenceCountedByteBuf類,所以也具有引用計(jì)數(shù)功能
// 調(diào)用了release之后會(huì)使引用計(jì)數(shù)-1,當(dāng)count=0的時(shí)候就會(huì)回收buffer了,buffer回收了并不會(huì)釋放memorySegment
release();
}
// -----------------------------------------------------------------------------
// LocalBufferPool.java
// 廢品回收站
private void recycle(MemorySegment segment, int channel) {
BufferListener listener;
CompletableFuture<?> toNotify = null;
NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
while (!notificationResult.isBufferUsed()) {
synchronized (availableMemorySegments) {
if (channel != UNKNOWN_CHANNEL) {
if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {
unavailableSubpartitionsCount--;
}
}
if (isDestroyed || hasExcessBuffers()) {
// 返回給全局bufferPool
returnMemorySegment(segment);
return;
} else {
listener = registeredListeners.poll();
if (listener == null) {
// 返回給localBufferPool
availableMemorySegments.add(segment);
break;
}
}
checkConsistentAvailability();
}
notificationResult = fireBufferAvailableNotification(listener, segment);
}
mayNotifyAvailable(toNotify);
}
4).BufferBuilder & BufferConsumer
省略 ...................
就是寫入和消費(fèi)MemorySegment的
3.MemroyManager
flink托管的內(nèi)存,托管內(nèi)存使用堆外內(nèi)存,用于批處理緩存排序等以及提供rocksDB內(nèi)存
內(nèi)存申請(qǐng)
class MemoryManager {
public List<MemorySegment> allocatePages(Object owner, int numPages)
throws MemoryAllocationException {
List<MemorySegment> segments = new ArrayList<>(numPages);
allocatePages(owner, segments, numPages);
return segments;
}
// 申請(qǐng)內(nèi)存
public void allocatePages(Object owner, Collection<MemorySegment> target, int numberOfPages)
throws MemoryAllocationException {
long memoryToReserve = numberOfPages * pageSize;
try {
memoryBudget.reserveMemory(memoryToReserve);
} catch (MemoryReservationException e) {
throw new MemoryAllocationException( String.format("Could not allocate %d pages", numberOfPages), e);
}
// 注冊(cè)一個(gè)釋放內(nèi)存的清理函數(shù)
Runnable gcCleanup = memoryBudget.getReleaseMemoryAction(getPageSize());
// 申請(qǐng)內(nèi)存直接內(nèi)存,并將釋放內(nèi)存的runnable傳入,用于回收內(nèi)存
allocatedSegments.compute(
owner,
(o, currentSegmentsForOwner) -> {
Set<MemorySegment> segmentsForOwner =
currentSegmentsForOwner == null
? new HashSet<>(numberOfPages)
: currentSegmentsForOwner;
for (long i = numberOfPages; i > 0; i--) {
MemorySegment segment =
// 申請(qǐng)一個(gè)HybridMemorySegment內(nèi)存
allocateOffHeapUnsafeMemory(getPageSize(), owner, gcCleanup);
target.add(segment);
segmentsForOwner.add(segment);
}
return segmentsForOwner;
});
}
}
// MemorySegmentFactory
public static MemorySegment allocateOffHeapUnsafeMemory(
int size, Object owner, Runnable gcCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
// 通過unsafe分配一個(gè)directBytebuffer
ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
// 用于回收內(nèi)存的清理器
Runnable cleaner =
MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);
return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
}
// MemoryUtils
static ByteBuffer wrapUnsafeMemoryWithByteBuffer(long address, int size) {
try {
ByteBuffer buffer = (ByteBuffer) UNSAFE.allocateInstance(DIRECT_BYTE_BUFFER_CLASS);
// 為byteBuffer的addree 和 capacity賦值
UNSAFE.putLong(buffer, BUFFER_ADDRESS_FIELD_OFFSET, address);
UNSAFE.putInt(buffer, BUFFER_CAPACITY_FIELD_OFFSET, size);
buffer.clear();
return buffer;
} catch (Throwable t) {
throw new Error("Failed to wrap unsafe off-heap memory with ByteBuffer", t);
}
}
內(nèi)存釋放
public final class HybridMemorySegment extends MemorySegment {
@Override
public void free() {
// 調(diào)用父類的free方法
super.free();
// 執(zhí)行清理器,cleaner會(huì)執(zhí)行到下面的兩個(gè)方法
if (cleaner != null) {
cleaner.run();
}
// 切斷引用,gc直接回收
offHeapBuffer = null;
}
}
// UnsafeMemoryBudget.java --- MemoryManager的一個(gè)參數(shù),上面申請(qǐng)內(nèi)存的時(shí)候用到了
// 通過cas的方式循環(huán)釋放,實(shí)際的上釋放并不是真正意義上的釋放,只是修改了指針的位置
// 這里釋放的是manager的內(nèi)存,將要釋放的內(nèi)存大小加至到可用內(nèi)存大小中,完整釋放
// 注意實(shí)際上Memory是不存儲(chǔ)內(nèi)存的,通過參數(shù)來控制內(nèi)存的申請(qǐng),最底層的內(nèi)存申請(qǐng)還是通過unsafe來做的
void releaseMemory(@Nonnegative long size) {
if (size == 0) {return; }
boolean released = false;
long currentAvailableMemorySize = 0L;
// cas方式釋放
while (!released && totalMemorySize >= (currentAvailableMemorySize = availableMemorySize.get()) + size){
released = availableMemorySize
.compareAndSet(currentAvailableMemorySize, currentAvailableMemorySize + size);
}
if (!released) {
throw new IllegalStateException(String.format("Trying to release more managed memory (%d bytes) than has been allocated (%d bytes), the total size is %d bytes",size, currentAvailableMemorySize, totalMemorySize));
}
// MemoryUtils.java
// 這里是實(shí)際的釋放內(nèi)存,我們?cè)贛emoryManager申請(qǐng)內(nèi)存的時(shí)候,首先通過unsafe來申請(qǐng)一塊內(nèi)存即size,long address = unsafe.allocateMemory(size)返回的是堆外的實(shí)際地址,通過申請(qǐng)的內(nèi)存來實(shí)例化一個(gè)ByteBuffer對(duì)象
// 最終釋放的時(shí)候仍需要使用unsafe來做,直接通過unsafe.freeMemory(address)即可
private static void releaseUnsafe(long address) {
UNSAFE.freeMemory(address);
}
rocksDB申請(qǐng)資源
MemoryManager更多的是用戶管理,來控制rocksDB的內(nèi)存使用,通過rocksDB block cache和writerBufferManager參數(shù)來限制,具體值通過TM內(nèi)存配置計(jì)算得來,最終還是有rocksDB自己來負(fù)責(zé)運(yùn)行過程中內(nèi)存的申請(qǐng)和釋放,所以對(duì)于rocks真實(shí)的內(nèi)存使用,flink并不能完全的掌握,也就導(dǎo)致了flink 任務(wù)被yarn/k8s給kill掉,這里rocksDB的內(nèi)存申請(qǐng)是通過jni來申請(qǐng),對(duì)于其申請(qǐng)的原理目前我也不是特別清楚,后面我查閱相關(guān)資料.研究一下
下面的代碼看看就行,后面是調(diào)入了rocksDB的代碼,因?yàn)槲覀€(gè)人也不是很了解rocksDB,這里就不多做介紹,這里我們只需要了解MemoryManager是用于提供rocksDB內(nèi)存以及批處理相關(guān)的工作即可,這里不必太深究
<我也不太懂,就不多說了>
public <T extends AutoCloseable> OpaqueMemoryResource<T> getExternalSharedMemoryResource(
String type, LongFunctionWithException<T, Exception> initializer, long numBytes)
throws Exception {
final Object leaseHolder = new Object();
final SharedResources.ResourceAndSize<T> resource =
sharedResources.getOrAllocateSharedResource(
type, leaseHolder, initializer, numBytes);
final ThrowingRunnable<Exception> disposer =
() -> sharedResources.release(type, leaseHolder);
return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);
}
public <T extends AutoCloseable> OpaqueMemoryResource<T> getSharedMemoryResourceForManagedMemory(
String type,
LongFunctionWithException<T, Exception> initializer,
double fractionToInitializeWith)
throws Exception {
final long numBytes = computeMemorySize(fractionToInitializeWith);
final LongFunctionWithException<T, Exception> reserveAndInitialize =
(size) -> {
try {
reserveMemory(type, size);
} catch (MemoryReservationException e) {
throw new MemoryAllocationException("Could not created the shared memory resource of size "+ size + ". Not enough memory left to reserve from the slot's managed memory.",e);
}
try {
return initializer.apply(size);
} catch (Throwable t) {
releaseMemory(type, size);
throw t;
}
};
final Consumer<Long> releaser = (size) -> releaseMemory(type, size);
final Object leaseHolder = new Object();
final SharedResources.ResourceAndSize<T> resource =
sharedResources.getOrAllocateSharedResource(type, leaseHolder, reserveAndInitialize, numBytes);
final long size = resource.size();
final ThrowingRunnable<Exception> disposer =() -> sharedResources.release(type, leaseHolder, releaser);
return new OpaqueMemoryResource<>(resource.resourceHandle(), size, disposer);
}
三.源碼分析
NetworkBufferPool
NetworkBufferPool是一個(gè)固定大小的MemorySegment實(shí)例吃,用于網(wǎng)絡(luò)棧中,NettyBufferPool會(huì)為每個(gè)ResultPartition創(chuàng)建屬于自己的LocalBufferPool,NettyBufferPool會(huì)作為全局的pool來提供內(nèi)存,LocalBufferPool會(huì)通過限制來控制自己內(nèi)存的申請(qǐng),防止過多申請(qǐng)
// 代碼沒啥 就是提前申請(qǐng)了用于network的代碼,這塊代碼在啟動(dòng)的時(shí)候進(jìn)行申請(qǐng)
LocalBufferPool
LocalBufferPool繼承關(guān)系,實(shí)現(xiàn)了bufferRecycler的接口,用于回收自己持有的buffer
每個(gè)Task擁有一個(gè)自己的LocalBufferPool,在數(shù)據(jù)接收和數(shù)據(jù)發(fā)送的過程中,會(huì)向LocalBufferPool請(qǐng)求buffer,將數(shù)據(jù)存儲(chǔ)在buffer中的,如果LocalBufferPool持有的buffer用盡,則會(huì)想全局的nettyBufferPool請(qǐng)求buffer,為了防止單個(gè)Task導(dǎo)致整個(gè)TM的反壓,會(huì)限制每個(gè)LocalBufferPool請(qǐng)求全局BufferPool的數(shù)量
在數(shù)據(jù)接收的時(shí)候會(huì)將數(shù)據(jù)封裝成NettyBuffer,在數(shù)據(jù)發(fā)送的時(shí)候會(huì)通過BufferBilder向MemorySegment寫入數(shù)據(jù),然后通過BufferConsumer讀取MemorySegment的數(shù)據(jù)
class LocalBufferPool implements BufferPool {
// 全局bufferPool,當(dāng)localBufferPool的buffer用完之后會(huì)向全局bufferPool申請(qǐng)buffer
private final NetworkBufferPool networkBufferPool;
// 可用的memorySegment,memorySegment不會(huì)單獨(dú)提供使用,會(huì)被NetworkBuffer進(jìn)行封裝后使用
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
// 監(jiān)聽器,實(shí)際上就是bufferManager,在bufferManager內(nèi)存不夠用的時(shí)候會(huì)注冊(cè)監(jiān)聽器,然后當(dāng)內(nèi)存可用的時(shí)候監(jiān)聽器會(huì)通知buffer可用
// 然后將memorySegment封裝成nettyBuffer添加到bufferManager的浮動(dòng)隊(duì)列中
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();
// 所需要的最小memorySegemt的數(shù)量
private final int numberOfRequiredMemorySegments;
// 最大分配數(shù)量
private final int maxNumberOfMemorySegments;
// 當(dāng)前pool的大小
private int currentPoolSize;
// 從全局bufferPool請(qǐng)求的數(shù)量
private int numberOfRequestedMemorySegments;
// 每個(gè)channel最大使用的buffer數(shù)量
private final int maxBuffersPerChannel;
// 每個(gè)結(jié)果子分區(qū)使用的buffer數(shù)量
@GuardedBy("availableMemorySegments")
private final int[] subpartitionBuffersCount;
// 每個(gè)結(jié)果子分區(qū)的buffer回收器
private final BufferRecycler[] subpartitionBufferRecyclers;
LocalBufferPool(
NetworkBufferPool networkBufferPool,
int numberOfRequiredMemorySegments,
int maxNumberOfMemorySegments,
int numberOfSubpartitions,
int maxBuffersPerChannel) {
this.networkBufferPool = networkBufferPool;
this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
this.currentPoolSize = numberOfRequiredMemorySegments;
this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
this.subpartitionBuffersCount = new int[numberOfSubpartitions];
subpartitionBufferRecyclers = new BufferRecycler[numberOfSubpartitions];
for (int i = 0; i < subpartitionBufferRecyclers.length; i++) {
subpartitionBufferRecyclers[i] = new SubpartitionBufferRecycler(i, this);
}
this.maxBuffersPerChannel = maxBuffersPerChannel;
synchronized (this.availableMemorySegments) {
// 檢查buffer的可用性,如果可用的buffer,如果buffer沒有超過當(dāng)前pool的大小,則會(huì)想全局bufferPool申請(qǐng)一個(gè)buffer
if (checkAvailability()) {
// 可用性設(shè)置成可用,廢話文學(xué)
availabilityHelper.resetAvailable();
}
// 檢查可用性是否保持一致
checkConsistentAvailability();
}
}
// 請(qǐng)求buffer,在bufferManager中沒有可用buffer的時(shí)候會(huì)向LocalBufferPool請(qǐng)求浮動(dòng)buffer
public Buffer requestBuffer() {
// 請(qǐng)求一個(gè)memorySegment并將其封裝到NettyBuffer
return toBuffer(requestMemorySegment());
}
// 請(qǐng)求bufferBuilder,用于task處理完數(shù)據(jù)發(fā)送下游時(shí)候使用,主要用于將處理完的數(shù)據(jù)放入buffer中發(fā)送到下游
// BufferWritingResultPartition中會(huì)請(qǐng)求bufferBuilder存放record
// bufferBuilder前面介紹過,主要是用于將數(shù)據(jù)寫入到memorySegment中
@Override
public BufferBuilder requestBufferBuilder(int targetChannel) {
return toBufferBuilder(requestMemorySegment(targetChannel), targetChannel);
}
// 阻塞的形式請(qǐng)求bufferBuilder,在上面請(qǐng)求bufferBuilder的時(shí)候沒有獲取到,則會(huì)通過這個(gè)方法進(jìn)行阻塞的請(qǐng)求
public BufferBuilder requestBufferBuilderBlocking(int targetChannel)
throws InterruptedException {
return toBufferBuilder(requestMemorySegmentBlocking(targetChannel), targetChannel);
}
// 將memory封裝成buffer
private Buffer toBuffer(MemorySegment memorySegment) {
return new NetworkBuffer(memorySegment, this);
}
// 將memory封裝成bufferBuilder
private BufferBuilder toBufferBuilder(MemorySegment memorySegment, int targetChannel) {
if (targetChannel == UNKNOWN_CHANNEL) {
return new BufferBuilder(memorySegment, this);
} else {
return new BufferBuilder(memorySegment, subpartitionBufferRecyclers[targetChannel]);
}
}
// 阻塞的請(qǐng)求memorySegment
private MemorySegment requestMemorySegmentBlocking(int targetChannel)
throws InterruptedException {
MemorySegment segment;
while ((segment = requestMemorySegment(targetChannel)) == null) {
try {
getAvailableFuture().get();
} catch (ExecutionException e) {
ExceptionUtils.rethrow(e);
}
}
return segment;
}
// 請(qǐng)求memorySegment
private MemorySegment requestMemorySegment(int targetChannel) {
MemorySegment segment;
synchronized (availableMemorySegments) {
if (targetChannel != UNKNOWN_CHANNEL
&& subpartitionBuffersCount[targetChannel] >= maxBuffersPerChannel) {
return null;
}
segment = availableMemorySegments.poll();
if (segment == null) {
return null;
}
if (targetChannel != UNKNOWN_CHANNEL) {
if (++subpartitionBuffersCount[targetChannel] == maxBuffersPerChannel) {
unavailableSubpartitionsCount++;
}
}
if (!checkAvailability()) {
availabilityHelper.resetUnavailable();
}
checkConsistentAvailability();
}
return segment;
}
// 從全局bufferPool請(qǐng)求memorySegment
private boolean requestMemorySegmentFromGlobal() {
if (isRequestedSizeReached()) {
return false;
}
// 如果請(qǐng)求成功則加入到可用的memorySegment隊(duì)列
MemorySegment segment = networkBufferPool.requestMemorySegment();
if (segment != null) {
availableMemorySegments.add(segment);
// 將請(qǐng)求memorySegment數(shù)+!
numberOfRequestedMemorySegments++;
return true;
}
return false;
}
// 檢查可用,順便請(qǐng)求一個(gè)memorySegment
private boolean checkAvailability() {
if (!availableMemorySegments.isEmpty()) {
return unavailableSubpartitionsCount == 0;
}
if (!isRequestedSizeReached()) {
// 全局bufferPool請(qǐng)求一個(gè)內(nèi)存段
if (requestMemorySegmentFromGlobal()) {
return unavailableSubpartitionsCount == 0;
} else {
requestMemorySegmentFromGlobalWhenAvailable();
return shouldBeAvailable();
}
}
return false;
}
// 釋放內(nèi)存段
// 這里可以對(duì)應(yīng)上BufferManager
@Override
public void recycle(MemorySegment segment) {
recycle(segment, UNKNOWN_CHANNEL);
}
// listener實(shí)際上就是memoryManager
private void recycle(MemorySegment segment, int channel) {
BufferListener listener;
CompletableFuture<?> toNotify = null;
NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
while (!notificationResult.isBufferUsed()) {
synchronized (availableMemorySegments) {
if (channel != UNKNOWN_CHANNEL) {
if (subpartitionBuffersCount[channel]-- == maxBuffersPerChannel) {
unavailableSubpartitionsCount--;
}
}
// 如果請(qǐng)求的buffer已經(jīng)大于了當(dāng)前pool的大小,則將這個(gè)memorySegment返回給全局bufferPool
if (isDestroyed || hasExcessBuffers()) {
returnMemorySegment(segment);
return;
} else {
// 從lintener隊(duì)列中取一個(gè)listener,如果listener==null說明bufferManager不需要浮動(dòng)內(nèi)存了
// 會(huì)直接將memorySegment加入當(dāng)前pool的可用隊(duì)列
listener = registeredListeners.poll();
if (listener == null) {
availableMemorySegments.add(segment);
// only need to check unavailableSubpartitionsCount here because
// availableMemorySegments is not empty
if (!availabilityHelper.isApproximatelyAvailable()
&& unavailableSubpartitionsCount == 0) {
toNotify = availabilityHelper.getUnavailableToResetAvailable();
}
break;
}
}
checkConsistentAvailability();
}
// 如果listener不等于null,通知通知給listener buffer可用,
notificationResult = fireBufferAvailableNotification(listener, segment);
}
mayNotifyAvailable(toNotify);
}
private NotificationResult fireBufferAvailableNotification(
BufferListener listener, MemorySegment segment) {
// 調(diào)用到bufferManager的notifyBufferAvailable()方法,將buffer加入到bufferManager的浮動(dòng)buffer中
NotificationResult notificationResult =
listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
// 如果bufferManager還需要更多的buffer,則會(huì)將listener再次加入到listener隊(duì)列中,等待下次buffer被回收
if (notificationResult.needsMoreBuffers()) {
synchronized (availableMemorySegments) {
if (isDestroyed) {
listener.notifyBufferDestroyed();
} else {
registeredListeners.add(listener);
}
}
}
return notificationResult;
}
// bufferManager會(huì)調(diào)用,當(dāng)bufferManager的buffer不夠用的時(shí)候會(huì)通過監(jiān)聽器等待localBufferPool回收buufer
public boolean addBufferListener(BufferListener listener) {
synchronized (availableMemorySegments) {
if (!availableMemorySegments.isEmpty() || isDestroyed) {
return false;
}
registeredListeners.add(listener);
return true;
}
}
// 將內(nèi)存段返回給全局bufferPool
private void returnMemorySegment(MemorySegment segment) {
assert Thread.holdsLock(availableMemorySegments);
// 返回給pool后會(huì)將請(qǐng)求的內(nèi)存段數(shù)量-1
numberOfRequestedMemorySegments--;
networkBufferPool.recycle(segment);
}
// localBufferPool為每個(gè)結(jié)果子分區(qū)分配的內(nèi)存回收器,回收器會(huì)持有當(dāng)前LocalBufferPool的引用,調(diào)用到當(dāng)前pool的內(nèi)存回收方法
// 用于結(jié)果子分區(qū)回收buffer,最終還是釋放給localBufferPool
private static class SubpartitionBufferRecycler implements BufferRecycler {
private int channel;
private LocalBufferPool bufferPool;
SubpartitionBufferRecycler(int channel, LocalBufferPool bufferPool) {
this.channel = channel;
this.bufferPool = bufferPool;
}
@Override
public void recycle(MemorySegment memorySegment) {
bufferPool.recycle(memorySegment, channel);
}
}
}
BufferManager
BufferManager主要用于為RemoteInputChannel提供buffer的,bufferManager在啟動(dòng)的時(shí)候會(huì)向全局bufferPool請(qǐng)求自己的獨(dú)有buffer,當(dāng)bufferManager的buffer不夠的時(shí)候,則會(huì)向localBufferPool請(qǐng)求buffer,此時(shí)請(qǐng)求的buffer為浮動(dòng)buffer
實(shí)際上提供的buffer是給到netty的handler了,在netty接收到server響應(yīng)的消息后,會(huì)請(qǐng)求buffer解析message,將消息封裝到buffer中,在請(qǐng)求buffer的過程中,實(shí)際上就是向bufferManager請(qǐng)求buffer的過程
在后面我們會(huì)介紹到LocalBufferPool
public class BufferManager implements BufferListener, BufferRecycler {
// 持有浮動(dòng)buffer和獨(dú)占buffer的一個(gè)隊(duì)列
private final AvailableBufferQueue bufferQueue = new AvailableBufferQueue();
// 用于向全局bufferPool申請(qǐng)buffer,以及釋放buffer
private final MemorySegmentProvider globalPool;
//標(biāo)記,是在等待浮動(dòng)buffer
@GuardedBy("bufferQueue") // 注解表示該變量被bufferQueue保護(hù)
private boolean isWaitingForFloatingBuffers;
//input channel所需的buffer總數(shù)
@GuardedBy("bufferQueue")
private int numRequiredBuffers;
// 請(qǐng)求buffer
// 該方法會(huì)在clinet端收到server端的數(shù)據(jù)后,會(huì)請(qǐng)求buffer封裝server端的response message
// 將解析好的msg發(fā)送到下游,然后釋放掉netty的byteBuf
Buffer requestBuffer() {
synchronized (bufferQueue) {
return bufferQueue.takeBuffer();
}
}
// 這里是屬于bufferManager獨(dú)有buffer,單獨(dú)進(jìn)行管理,是有全局bufferPool申請(qǐng)的,當(dāng)remoteInputChannel啟動(dòng)的時(shí)候在setup()方法中,一次性請(qǐng)求bufferManager所獨(dú)有的buffer
void requestExclusiveBuffers(int numExclusiveBuffers) throws IOException {
// 從全局bufferPool中請(qǐng)求buffer
Collection<MemorySegment> segments = globalPool.requestMemorySegments(numExclusiveBuffers);
synchronized (bufferQueue) {
// 將請(qǐng)求到的buffer加入到bufferManager的獨(dú)有buffer隊(duì)列匯總
for (MemorySegment segment : segments) {
bufferQueue.addExclusiveBuffer(
new NetworkBuffer(segment, this), numRequiredBuffers);
}
}
}
// 這個(gè)方法有onSenderBacklog()方法進(jìn)行調(diào)用,當(dāng)收到server的消息后,會(huì)根據(jù)server的背壓數(shù)量來請(qǐng)求buffer
// 如果請(qǐng)求到buffer,則將請(qǐng)求到的buffer數(shù)量通過credit的形式發(fā)送給server,這里涉及到了flink的背壓通信機(jī)制
// 請(qǐng)求浮動(dòng)buffer,buffer是從localBufferPool請(qǐng)求的,該buffer請(qǐng)求在被回收的時(shí)候會(huì)選擇性的返回,返回給 bufferManager/localBufferPool/全局bufferPool
int requestFloatingBuffers(int numRequired) {
int numRequestedBuffers = 0;
synchronized (bufferQueue) {
if (inputChannel.isReleased()) {
return numRequestedBuffers;
}
numRequiredBuffers = numRequired;
while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers
&& !isWaitingForFloatingBuffers) {
// 從localBufferPool請(qǐng)求buffer
BufferPool bufferPool = inputChannel.inputGate.getBufferPool();
Buffer buffer = bufferPool.requestBuffer();
if (buffer != null) {
bufferQueue.addFloatingBuffer(buffer);
numRequestedBuffers++;
}
// 如果localBufferPool沒有buffer則會(huì)將自己加入的localBufferPool的listener隊(duì)列中
// 這里思考一下為什么要這么做,因?yàn)闆]有buffer了,所以通過一個(gè)監(jiān)聽器來監(jiān)聽buffer什么時(shí)候被釋放
// localBufferPool的buffer釋放之后,會(huì)判斷是否有l(wèi)istener,如果有l(wèi)istener則說明bufferManager需要buffer
// 就會(huì)將回收之后的buffer加入到bufferManager的浮動(dòng)buffer隊(duì)列
// ↓↓↓ --- 下面的notifyBufferAvailable()中有介紹
else if (bufferPool.addBufferListener(this)) {
isWaitingForFloatingBuffers = true;
break;
}
}
}
// 返回請(qǐng)求到的buffer數(shù)量
return numRequestedBuffers;
}
// BufferManager的recycle方法,與LocalBufferPool實(shí)現(xiàn)不同
// 該方法被調(diào)用都是bufferManager獨(dú)有的buffer使用完成了,所以會(huì)將獨(dú)有的buffer返回到自己管理的buffer隊(duì)列中,
// 不會(huì)像LocalBufferPool通過一系列條件判斷來決定buffer返回給誰
public void recycle(MemorySegment segment) {
@Nullable Buffer releasedFloatingBuffer = null;
synchronized (bufferQueue) {
try {
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after channel released all buffers via releaseAllResources().
if (inputChannel.isReleased()) {
globalPool.recycleMemorySegments(Collections.singletonList(segment));
return;
} else {
// 釋放多余的浮動(dòng)buffer
releasedFloatingBuffer = bufferQueue.addExclusiveBuffer(
new NetworkBuffer(segment, this), numRequiredBuffers);
}
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
} finally {
bufferQueue.notifyAll();
}
}
if (releasedFloatingBuffer != null) {
// 如果有多余的浮動(dòng)buffer,則釋放掉
releasedFloatingBuffer.recycleBuffer();
} else {
try {
// 有buffer釋放,就可以通知channel有buffer可用,就會(huì)向server反饋信用
inputChannel.notifyBufferAvailable(1);
} catch (Throwable t) {
ExceptionUtils.rethrow(t);
}
}
}
// ------------------------------------------------------------------------------------
// 這個(gè)方法比較有意思,也比較復(fù)雜,讓我們看一下方法名,通過翻譯方法得到 中文方法名 : 通知 buffer 可用
// 提前說明 : bufferPool和bufferManager都實(shí)現(xiàn)了BufferRecycler接口,實(shí)現(xiàn)了recycle方法,用于回收buffer,重新使用的
/*
實(shí)際上這個(gè)方法是有LocalBufferPool調(diào)用,在調(diào)用了recycle方法后會(huì)對(duì)memorySegment釋放,因?yàn)楦?dòng)buffer就是從localBuffer請(qǐng)求的,在創(chuàng)建(NettyBuffer=buffer)的時(shí)候需要一個(gè)BufferRecycler,在bufferManager創(chuàng)建buffer的時(shí)候傳入的是自己,而LocalBufferPool創(chuàng)建的時(shí)候傳入的是自己,所以,這里是由localBufferPool通過recycle方法調(diào)用,并且bufferManager的recycle的方法與localBufferPool實(shí)現(xiàn)不同,所以這里就會(huì)放入的浮動(dòng)buffer的隊(duì)列中
在這個(gè)方法被調(diào)用的時(shí)候會(huì)有判斷,判斷是否需要更多的buffer,如果需要更多的buffer,如果需要更多的buffer會(huì)將listener(實(shí)際上就是BufferManager,因?yàn)锽ufferManager實(shí)現(xiàn)了BufferListener)加入的listener隊(duì)列中(lintener會(huì)被循環(huán)利用),然后當(dāng)buffer使用完成了,對(duì)buffer進(jìn)行回收的時(shí)候,會(huì)選擇是根據(jù)一些條件來判斷是否返回到全局的bufferPool中或者返回到localBufferPool中
返回全局bufferPool的條件是判斷LocalBufferPool請(qǐng)求的buffer數(shù)量已經(jīng)超過了LocalBufferPool的核心buffer數(shù)量,如果超過了則返回給全局bfferPool中
返回LocalBufferPool的條件是,當(dāng)不返回全局bufferPool則會(huì)判斷是否存在listener,如果沒有l(wèi)istener則將buffer返回給localBufferPool,這樣的原因是因?yàn)?在請(qǐng)求buffer的時(shí)候如果沒有可用buffer,就會(huì)添加listener到listener隊(duì)列中,當(dāng)buffer用完之后就會(huì)根據(jù)listener是否存在決定是否還需要更多的buffer
localBufferPool 后面在介紹
*/
public BufferListener.NotificationResult notifyBufferAvailable(Buffer buffer) {
BufferListener.NotificationResult notificationResult = BufferListener.NotificationResult.BUFFER_NOT_USED;
if (inputChannel.isReleased()) {
return notificationResult;
}
try {
synchronized (bufferQueue) {
if (inputChannel.isReleased() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
return notificationResult;
}
// 將buffer方法放入浮動(dòng)buffer隊(duì)列
bufferQueue.addFloatingBuffer(buffer);
// 喚醒在隊(duì)列等待的線程
bufferQueue.notifyAll();
if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
isWaitingForFloatingBuffers = false;
notificationResult = BufferListener.NotificationResult.BUFFER_USED_NO_NEED_MORE;
} else {
notificationResult = BufferListener.NotificationResult.BUFFER_USED_NEED_MORE;
}
}
// 實(shí)際上這個(gè)判斷永遠(yuǎn)都會(huì)進(jìn)入,因?yàn)橥酱a塊的內(nèi)容已經(jīng)保證了result一定不等于BUFFER_NOT_USED
// 要注意調(diào)用這個(gè)方法說明有buffer調(diào)用了recycle,說明buffer釋放了,那么就可以被重新使用了
if (notificationResult != NotificationResult.BUFFER_NOT_USED) {
// 通過netty向server發(fā)送addCreditMessage,通知自己的信用
// 在netty server端收到credit(信用)后會(huì)記錄對(duì)應(yīng)channel的信用
// 當(dāng)server向下游發(fā)送數(shù)據(jù)的時(shí)候,會(huì)根據(jù)下游的信用值來確定發(fā)送多少數(shù)據(jù)甚至不發(fā)送
// 這樣就不會(huì)因?yàn)槟骋粋€(gè)task的反壓導(dǎo)致整個(gè)taskManger的反壓
inputChannel.notifyBufferAvailable(1);
}
} catch (Throwable t) {
inputChannel.setError(t);
}
return notificationResult;
}
// 靜態(tài)內(nèi)部類,用于維護(hù)可用buffer
static final class AvailableBufferQueue {
// 從localBufferPool申請(qǐng)的buffer,優(yōu)先使用
final ArrayDeque<Buffer> floatingBuffers;
// 從全局bufferPool申請(qǐng)的buffer,為channel獨(dú)有
final ArrayDeque<Buffer> exclusiveBuffers;
AvailableBufferQueue() {
this.exclusiveBuffers = new ArrayDeque<>();
this.floatingBuffers = new ArrayDeque<>();
}
// 從全局buffer pool申請(qǐng)的該channel獨(dú)占的buffer
Buffer addExclusiveBuffer(Buffer buffer, int numRequiredBuffers) {
exclusiveBuffers.add(buffer);
// 如果可用的buffer大于bufferManager的必須的buffer數(shù)量,則會(huì)釋放掉多余的浮動(dòng)buffer
if (getAvailableBufferSize() > numRequiredBuffers) {
return floatingBuffers.poll();
}
return null;
}
// 申請(qǐng)的浮動(dòng)buffer,可以認(rèn)為是獨(dú)占的buffer用完了,開始申請(qǐng)臨時(shí)buffer
void addFloatingBuffer(Buffer buffer) {
floatingBuffers.add(buffer);
}
// 在返回的時(shí)候優(yōu)先返回浮動(dòng)buffer
// 為什么先請(qǐng)求浮動(dòng)buffer呢,因?yàn)橹挥歇?dú)有buffer用完之后才會(huì)請(qǐng)求浮動(dòng)buffer,如果浮動(dòng)buffer
// 有buffer,則一定說明了獨(dú)有buffer用完了,你們覺得呢
Buffer takeBuffer() {
if (floatingBuffers.size() > 0) {
return floatingBuffers.poll();
} else {
return exclusiveBuffers.poll();
}
}
}
}
四. tm和jm內(nèi)存分配代碼 或者 內(nèi)存分配源碼
了解即可,如果感興趣可以自行看代碼,等我想起來在補(bǔ)齊
public abstract class AbstractContainerizedClusterClientFactory<ClusterID>
implements ClusterClientFactory<ClusterID> {
@Override
public ClusterSpecification getClusterSpecification(Configuration configuration) {
checkNotNull(configuration);
final int jobManagerMemoryMB =
JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
configuration, JobManagerOptions.TOTAL_PROCESS_MEMORY)
.getTotalProcessMemorySize()
.getMebiBytes();
final int taskManagerMemoryMB =
TaskExecutorProcessUtils.processSpecFromConfig(
TaskExecutorProcessUtils
.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
configuration,
TaskManagerOptions.TOTAL_PROCESS_MEMORY))
.getTotalProcessMemorySize()
.getMebiBytes();
int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
return new ClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
.setTaskManagerMemoryMB(taskManagerMemoryMB)
.setSlotsPerTaskManager(slotsPerTaskManager)
.createClusterSpecification();
}
}