Flink內(nèi)存管理

Flink內(nèi)存管理

1.簡(jiǎn)介

自從2003-2006年粤蝎,Google發(fā)表了三篇著名的大數(shù)據(jù)相關(guān)論文(Google FS,MapReduce,Big Table)后童谒,內(nèi)存問題一直困擾大數(shù)據(jù)工程師們塞帐。

這一問題從MR1.0一直延續(xù)到Spark時(shí)代拦赠,從Spark晚期版本試圖由應(yīng)用程序自行管理內(nèi)存后,人們才初步解決了內(nèi)存問題葵姥。

使用原生的JVM內(nèi)存管理會(huì)帶來如下的致命問題:

  • JVM對(duì)象存儲(chǔ)密度低荷鼠,在32位系統(tǒng)或開啟指針壓縮的64位系統(tǒng)中,普通對(duì)象(非數(shù)組)對(duì)象頭占用64bit榔幸,尾部還需要8字節(jié)對(duì)齊允乐。
  • JVM GC導(dǎo)致的毛刺和性能問題,由于計(jì)算引擎會(huì)頻繁創(chuàng)建對(duì)象削咆,小對(duì)象會(huì)被創(chuàng)建在新生代導(dǎo)致頻繁的minor GC和STW牍疏,大對(duì)象會(huì)被直接創(chuàng)建在老年代導(dǎo)致大量的并發(fā)式GC(CMS)或混合式GC(G1),并且GC的觸發(fā)和執(zhí)行完全由JVM控制拨齐,計(jì)算引擎無法干預(yù)鳞陨。
  • 潛在的OOM風(fēng)險(xiǎn),OOM發(fā)生的時(shí)機(jī)不可控瞻惋。

在Apache Flink中厦滤,taskManager自行管理的內(nèi)存,避免了JVM原生內(nèi)存管理的缺陷熟史,本文將詳細(xì)介紹相關(guān)邏輯馁害。

2.內(nèi)存模型

Task manager管理的JVM內(nèi)存主要分為Network BuffersMemoryManagerFree 三個(gè)區(qū)域蹂匹。

  • Network Buffers碘菜,shuffle / broadcost網(wǎng)絡(luò)活動(dòng)相關(guān)的內(nèi)存
  • MemoryManager,cache / sorting / hashing 計(jì)算相關(guān)的內(nèi)存
  • Free限寞,存放用戶代碼產(chǎn)生的對(duì)象

3.代碼分析

3.1 TaskManagerOptions

內(nèi)存管理的相關(guān)配置

  • MEMORY_SEGMENT_SIZE——內(nèi)存段大小忍啸,默認(rèn)32kb。內(nèi)存段(segment)是Flink內(nèi)存管理的基本模型履植。
  • MANAGED_MEMORY_SIZE——task manager管理的內(nèi)存大小计雌,如果不設(shè)置則使用MANAGED_MEMORY_FRACTION
  • MANAGED_MEMORY_FRACTION——task manager管理的內(nèi)存占比,默認(rèn)0.7f
  • MEMORY_OFF_HEAP——是否使用堆外內(nèi)存玫霎,默認(rèn)false即使用堆內(nèi)內(nèi)存
  • MANAGED_MEMORY_PRE_ALLOCATE——task manager啟動(dòng)時(shí)是否預(yù)分配凿滤,默認(rèn)false
  • NETWORK_NUM_BUFFERS——網(wǎng)絡(luò)緩沖區(qū)的segment數(shù)量妈橄,默認(rèn)2048
  • NETWORK_BUFFERS_MEMORY_FRACTION——網(wǎng)絡(luò)緩沖區(qū)的內(nèi)存占比,默認(rèn)0.1
  • NETWORK_BUFFERS_MEMORY_MIN——網(wǎng)絡(luò)緩沖區(qū)的最小size翁脆,默認(rèn)64MB
  • NETWORK_BUFFERS_MEMORY_MAX——網(wǎng)絡(luò)緩沖區(qū)的最大size眷蚓,默認(rèn)1GB

3.2 MemoryPool

靜態(tài)抽象類MemoryPool定義了內(nèi)存池的方法,它有兩個(gè)實(shí)現(xiàn)類HybridHeapMemoryPool和HybridOffHeapMemoryPool反番,堆內(nèi)內(nèi)存池和堆外內(nèi)存池沙热。

abstract static class MemoryPool {

    abstract int getNumberOfAvailableMemorySegments();

    abstract MemorySegment allocateNewSegment(Object owner);

    abstract MemorySegment requestSegmentFromPool(Object owner);

    abstract void returnSegmentToPool(MemorySegment segment);

    abstract void clear();
  }

3.3 MemoryManager

MemoryManager 類負(fù)責(zé)管理sorting,、hashing罢缸、caching使用的內(nèi)存篙贸,主要方法有allocatePages(申請(qǐng)內(nèi)存段)和release(釋放內(nèi)存段)

public void allocatePages(Object owner, List<MemorySegment> target, int numPages)
      throws MemoryAllocationException {
    ... 入?yún)⑿r?yàn)

    // -------------------- BEGIN CRITICAL SECTION -------------------
    synchronized (lock) {
      if (isShutDown) {
        throw new IllegalStateException("Memory manager has been shut down.");
      }

      // 可用內(nèi)存校驗(yàn)
      if (numPages > (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)) {
        throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " +
            (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)
            + " pages are remaining.");
      }

      // allocatedSegments是個(gè)HashMap<Object, Set<MemorySegment>>,key是owner枫疆,值是此owner占用的segment列表
      Set<MemorySegment> segmentsForOwner = allocatedSegments.get(owner);
      if (segmentsForOwner == null) {
        segmentsForOwner = new HashSet<MemorySegment>(numPages);
        allocatedSegments.put(owner, segmentsForOwner);
      }

      if (isPreAllocated) {
        for (int i = numPages; i > 0; i--) {
          MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
          target.add(segment);
          segmentsForOwner.add(segment);
        }
      }
      else {
        for (int i = numPages; i > 0; i--) {
          MemorySegment segment = memoryPool.allocateNewSegment(owner);
          target.add(segment);
          segmentsForOwner.add(segment);
        }
        numNonAllocatedPages -= numPages;
      }
    }
    // -------------------- END CRITICAL SECTION -------------------
  }

3.4 MemorySegment

MemorySegment類管理Flink中的一個(gè)內(nèi)存頁爵川,MemorySegment是抽象類有兩個(gè)實(shí)現(xiàn)類HeapMemorySegment和HybridMemorySegment。

MemorySegment定義了Segment的基本操作:

// 返回segment字節(jié)數(shù)
public int size();
// segment是否已釋放
public boolean isFreed();
// 釋放segment
public void free();
// 是否使用堆外內(nèi)存
public boolean isOffHeap();
// 返回堆內(nèi)內(nèi)存的數(shù)組
public byte[] getArray();
// 返回堆外內(nèi)存的地址
public long getAddress();
// 返回指定區(qū)域的數(shù)據(jù)息楔,并封裝成ByteBuffer
public abstract ByteBuffer wrap(int offset, int length);
// 返回segment owner
public Object getOwner();

// 隨機(jī)讀寫API
public abstract byte get(int index);
public abstract void put(int index, byte b);
public abstract void get(int index, byte[] dst);
public abstract void put(int index, byte[] src);
public abstract void get(int index, byte[] dst, int offset, int length);
public abstract void put(int index, byte[] src, int offset, int length);
public abstract boolean getBoolean(int index);
public abstract void putBoolean(int index, boolean value);
public final char getChar(int index);
public final char getCharLittleEndian(int index);
public final char getCharBigEndian(int index);
public final void putChar(int index, char value);
public final void putCharLittleEndian(int index, char value);
public final void putCharBigEndian(int index, char value);
public final short getShort(int index);
public final short getShortLittleEndian(int index);
public final short getShortBigEndian(int index);
public final void putShort(int index, short value);
public final void putShortLittleEndian(int index, short value);
public final void putShortBigEndian(int index, short value);
public final int getInt(int index);
public final int getIntLittleEndian(int index);
public final int getIntBigEndian(int index);
public final void putInt(int index, int value);
public final void putIntLittleEndian(int index, int value);
public final void putIntBigEndian(int index, int value);
public final long getLong(int index);
public final long getLongLittleEndian(int index);
public final long getLongBigEndian(int index);
public final void putLong(int index, long value);
public final void putLongLittleEndian(int index, long value);
public final void putLongBigEndian(int index, long value);
public final float getFloat(int index);
public final float getFloatLittleEndian(int index);
public final float getFloatBigEndian(int index);
public final void putFloat(int index, float value);
public final void putFloatLittleEndian(int index, float value);
public final void putFloatBigEndian(int index, float value);
public final double getDouble(int index);
public final double getDoubleLittleEndian(int index);
public final double getDoubleBigEndian(int index);
public final void putDouble(int index, double value);
public final void putDoubleLittleEndian(int index, double value);
public final void putDoubleBigEndian(int index, double value);
public abstract void get(DataOutput out, int offset, int length) throws IOException;
public abstract void put(DataInput in, int offset, int length) throws IOException;
public abstract void get(int offset, ByteBuffer target, int numBytes);
public abstract void put(int offset, ByteBuffer source, int numBytes);
// 拷貝數(shù)據(jù)到目標(biāo)segment
public final void copyTo(int offset, MemorySegment target, int targetOffset, int numBytes);
// 比較兩個(gè)segment的數(shù)據(jù)
public final int compare(MemorySegment seg2, int offset1, int offset2, int len);
// 交互兩個(gè)segment的數(shù)據(jù)
public final void swapBytes(byte[] tempBuffer, MemorySegment seg2, int offset1, int offset2, int len);

4. 總結(jié)

通過MemoryManager雁芙、MemoryPool、MemorySegment等類钞螟,F(xiàn)link實(shí)現(xiàn)了應(yīng)用層級(jí)對(duì)于內(nèi)存的管理兔甘,規(guī)避了JVM原生內(nèi)存管理帶來的諸多問題,有效的提升了Flink的內(nèi)存效率和性能鳞滨。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末洞焙,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子拯啦,更是在濱河造成了極大的恐慌澡匪,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,372評(píng)論 6 498
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件褒链,死亡現(xiàn)場(chǎng)離奇詭異唁情,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)甫匹,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,368評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門甸鸟,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人兵迅,你說我怎么就攤上這事抢韭。” “怎么了恍箭?”我有些...
    開封第一講書人閱讀 162,415評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵刻恭,是天一觀的道長。 經(jīng)常有香客問我扯夭,道長鳍贾,這世上最難降的妖魔是什么鞍匾? 我笑而不...
    開封第一講書人閱讀 58,157評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮骑科,結(jié)果婚禮上候学,老公的妹妹穿的比我還像新娘。我一直安慰自己纵散,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,171評(píng)論 6 388
  • 文/花漫 我一把揭開白布隐圾。 她就那樣靜靜地躺著伍掀,像睡著了一般。 火紅的嫁衣襯著肌膚如雪暇藏。 梳的紋絲不亂的頭發(fā)上蜜笤,一...
    開封第一講書人閱讀 51,125評(píng)論 1 297
  • 那天,我揣著相機(jī)與錄音盐碱,去河邊找鬼把兔。 笑死,一個(gè)胖子當(dāng)著我的面吹牛瓮顽,可吹牛的內(nèi)容都是我干的县好。 我是一名探鬼主播,決...
    沈念sama閱讀 40,028評(píng)論 3 417
  • 文/蒼蘭香墨 我猛地睜開眼暖混,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼缕贡!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起拣播,我...
    開封第一講書人閱讀 38,887評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤晾咪,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后贮配,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谍倦,經(jīng)...
    沈念sama閱讀 45,310評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,533評(píng)論 2 332
  • 正文 我和宋清朗相戀三年泪勒,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了昼蛀。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,690評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡圆存,死狀恐怖曹洽,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情辽剧,我是刑警寧澤送淆,帶...
    沈念sama閱讀 35,411評(píng)論 5 343
  • 正文 年R本政府宣布,位于F島的核電站怕轿,受9級(jí)特大地震影響偷崩,放射性物質(zhì)發(fā)生泄漏辟拷。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,004評(píng)論 3 325
  • 文/蒙蒙 一阐斜、第九天 我趴在偏房一處隱蔽的房頂上張望衫冻。 院中可真熱鬧,春花似錦谒出、人聲如沸隅俘。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,659評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽为居。三九已至,卻和暖如春杀狡,著一層夾襖步出監(jiān)牢的瞬間蒙畴,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,812評(píng)論 1 268
  • 我被黑心中介騙來泰國打工呜象, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留膳凝,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 47,693評(píng)論 2 368
  • 正文 我出身青樓恭陡,卻偏偏與公主長得像蹬音,于是被迫代替她去往敵國和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子休玩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,577評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容