Flink序列化框架分析

1.Flink的TypeInformation類

TypeInformation是flink中所有類型的基類癣诱,其作為生產(chǎn)序列化器和比較的一個工具。它包括了類型的一些基本屬性剪个,并可以通過它來生產(chǎn)序列化器(serializer)食店,特殊情況下還可以生成類型比較器材义。(Flink中的比較器不僅僅是定義大小順序青责,更是處理keys的基本輔助工具)

  • 基本類型:所有Java基本數(shù)據(jù)類型和對應(yīng)的裝箱類型挺据,加上void取具,String,Date扁耐,BigDecimal和BigInteger
  • 基本數(shù)組和對象數(shù)組
  • 復(fù)合類型:
    • Flink Java Tuples (Flink Java API的一部分): 最多25個成員暇检,不支持null成員
    • Scala case 類 (包括 Scala tuples): 最多25個成員, 不支持null成員
    • Row: 包含任意多個字段的元組并且支持null成員
    • POJOs: 遵循類bean模式的類
  • 輔助類型 (Option, Either, Lists, Maps, …)
  • 泛型: Flink自身不會序列化泛型,而是借助Kryo進(jìn)行序列化.

POJO類非常有意思婉称,因為POJO類可以支持復(fù)雜類型的創(chuàng)建块仆,并且在定義keys時可以使用成員的名字:dataSet.join(another).where("name").equalTo("personName")。同時王暗,POJO類對于運行時(runtime)是透明的悔据,這使得Flink可以非常高效地處理它們。

1.1 POJO類型的規(guī)則

在滿足如下條件時瘫筐,F(xiàn)link會將這種數(shù)據(jù)類型識別成POJO類型(并允許以成員名引用字段):

  • 該類是public的并且是獨立的(即沒有非靜態(tài)的內(nèi)部類)
  • 該類有一個public的無參構(gòu)造方法
  • 該類(及該類的父類)的所有成員要么是public的蜜暑,要么是擁有按照標(biāo)準(zhǔn)java bean命名規(guī)則命名的public getter和 public setter方法铐姚。

1.2 創(chuàng)建一個TypeInformation對象或序列化器###

創(chuàng)建一個TypeInformation對象時如下:

在Scala中策肝,F(xiàn)link使用在編譯時運行的宏,在宏可供調(diào)用時去捕獲所有泛型信息隐绵。

// 重要: 為了能夠訪問'createTypeInformation' 的宏方法之众,這個import是必須的
import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

你也可以在Java使用相同的方法作為備選。

為了創(chuàng)建一個序列化器(TypeSerializer)依许,只需要在TypeInformation 對象上調(diào)用typeInfo.createSerializer(config)方法棺禾。

config參數(shù)的類型是ExecutionConfig,它保留了程序的注冊的自定義序列化器的相關(guān)信息峭跳。在可能用到TypeSerializer的地方膘婶,盡量傳入程序的ExecutionConfig,你可以調(diào)用DataStream 或 DataSet的 getExecutionConfig()方法獲取ExecutionConfig蛀醉。一些內(nèi)部方法(如:MapFunction)中悬襟,你可以通過將該方法變成一個Rich Function,然后調(diào)用getRuntimeContext().getExecutionConfig()獲取ExecutionConfig.

2 基本類型實現(xiàn)示例

以String為例:

//BasicTypeInfo.java
public static final BasicTypeInfo<String> STRING_TYPE_INFO = new BasicTypeInfo<>(String.class, new Class<?>[]{}, StringSerializer.INSTANCE, StringComparator.class);

StringSerializer如下

//StringSerializer.java
public final class StringSerializer extends TypeSerializerSingleton<String> {

    private static final long serialVersionUID = 1L;
    
    public static final StringSerializer INSTANCE = new StringSerializer();
    
    private static final String EMPTY = "";

    @Override
    public boolean isImmutableType() {
        return true;
    }

    @Override
    public String createInstance() {
        return EMPTY;
    }

    @Override
    public String copy(String from) {
        return from;
    }
    
    @Override
    public String copy(String from, String reuse) {
        return from;
    }

    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(String record, DataOutputView target) throws IOException {
        StringValue.writeString(record, target);
    }

    @Override
    public String deserialize(DataInputView source) throws IOException {
        return StringValue.readString(source);
    }
    
    @Override
    public String deserialize(String record, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        StringValue.copyString(source, target);
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof StringSerializer;
    }

    @Override
    protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
        return super.isCompatibleSerializationFormatIdentifier(identifier)
                || identifier.equals(StringValue.class.getCanonicalName());
    }
}

上面代碼中出現(xiàn)的StringValue是真正進(jìn)行input以及output序列化過程操作拯刁,基本類型都有相應(yīng)的方法脊岳,后面會單獨說明下多字段Record序列化形式。
StringComparator如下

public final class StringComparator extends BasicTypeComparator<String> {

    private static final long serialVersionUID = 1L;
    
    private static final int HIGH_BIT = 0x1 << 7;
    
    private static final int HIGH_BIT2 = 0x1 << 13;
    
    private static final int HIGH_BIT2_MASK = 0x3 << 6;
    
    
    public StringComparator(boolean ascending) {
        super(ascending);
    }

    @Override
    public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
        String s1 = StringValue.readString(firstSource);
        String s2 = StringValue.readString(secondSource);
        int comp = s1.compareTo(s2); 
        return ascendingComparison ? comp : -comp;
    }


    @Override
    public boolean supportsNormalizedKey() {
        return true;
    }


    @Override
    public boolean supportsSerializationWithKeyNormalization() {
        return false;
    }

    @Override
    public int getNormalizeKeyLen() {
        return Integer.MAX_VALUE;
    }

    @Override
    public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
        return true;
    }


    @Override
    public void putNormalizedKey(String record, MemorySegment target, int offset, int len) {;
        final int limit = offset + len;
        final int end = record.length();
        int pos = 0;
        
        while (pos < end && offset < limit) {
            char c = record.charAt(pos++);
            if (c < HIGH_BIT) {
                target.put(offset++, (byte) c);
            }
            else if (c < HIGH_BIT2) {
                target.put(offset++, (byte) ((c >>> 7) | HIGH_BIT));
                if (offset < limit) {
                    target.put(offset++, (byte) c);
                }
            }
            else {
                target.put(offset++, (byte) ((c >>> 10) | HIGH_BIT2_MASK));
                if (offset < limit) {
                    target.put(offset++, (byte) (c >>> 2));
                }
                if (offset < limit) {
                    target.put(offset++, (byte) c);
                }
            }
        }
        while (offset < limit) {
            target.put(offset++, (byte) 0);
        }
    }


    @Override
    public StringComparator duplicate() {
        return new StringComparator(ascendingComparison);
    }
}

3 多字段Record示例

在開始這部分原理分析之前可以先看個示例代碼

//RecordTest.java
public void testAddField() {
  try {
    // Add a value to an empty record
    Record record = new Record();
    assertTrue(record.getNumFields() == 0);
    record.addField(this.origVal1);
    assertTrue(record.getNumFields() == 1);
    assertTrue(origVal1.getValue().equals(record.getField(0, StringValue.class).getValue()));
    
    // Add 100 random integers to the record
    record = new Record();
    for (int i = 0; i < 100; i++) {
      IntValue orig = new IntValue(this.rand.nextInt());
      record.addField(orig);
      IntValue rec = record.getField(i, IntValue.class);
      
      assertTrue(record.getNumFields() == i + 1);
      assertTrue(orig.getValue() == rec.getValue());
    }
    
    // Add 3 values of different type to the record
    record = new Record(this.origVal1, this.origVal2);
    record.addField(this.origVal3);
    
    assertTrue(record.getNumFields() == 3);
    
    StringValue recVal1 = record.getField(0, StringValue.class);
    DoubleValue recVal2 = record.getField(1, DoubleValue.class);
    IntValue recVal3 = record.getField(2, IntValue.class);
    
    assertTrue("The value of the first field has changed", recVal1.equals(this.origVal1));
    assertTrue("The value of the second field changed", recVal2.equals(this.origVal2));
    assertTrue("The value of the third field has changed", recVal3.equals(this.origVal3));
  } catch (Throwable t) {
    Assert.fail("Test failed due to an exception: " + t.getMessage());
  }
}

Record代表多個數(shù)值的記錄垛玻,其可以包含多個字段(可空并不體現(xiàn)在該記錄中)割捅,內(nèi)部有一個bitmap標(biāo)記字段是否被賦值。為了數(shù)據(jù)交換方便帚桩,Record中的數(shù)據(jù)都以bytes方式存儲亿驾,字段在訪問時才被進(jìn)行反序列化。當(dāng)字段被修改時首先是放在cache中账嚎,并在下次序列化時合入或者顯式調(diào)用updateBinaryRepresenation()方法莫瞬。
Notes:

  • 該record必須是一個可變的對象参淹,這樣才可以被多個自定義方法使用來提升性能(后面單獨分析)。該record是一個比較中的對象乏悄,為了減少對每個字段的序列化浙值、反序列化操作,其保存了比較大的狀態(tài)檩小,需要有多個指針以及數(shù)組开呐,從而要占用相對比較大的內(nèi)存空間,在64位的JVM中要占用超過200bytes规求。
  • 該類是非線程安全的

4 存放Record的數(shù)據(jù)結(jié)構(gòu)

針對上面提出的存放數(shù)據(jù)結(jié)構(gòu)的疑問筐付,這里繼續(xù)深入分析下。

  • 將record放在一個迭代器中阻肿,當(dāng)前存在一個叫BlockResettableMutableObjectIterator瓦戚,其包含如下一些方法,讀寫都是在這個迭代器中進(jìn)行丛塌。
    Record迭代器.png

其中以無參數(shù)next()方法為示例走讀存儲或者讀取流程较解,代碼如下:

public T next() throws IOException {
        // check for the left over element
        if (this.readPhase) {
            return getNextRecord();
        } else {
            // writing phase. check for leftover first
            T result = null;
            if (this.leftOverReturned) {
                // get next record
                if ((result = this.input.next()) != null) {
                    if (writeNextRecord(result)) {
                        return result;
                    } else {
                        // did not fit into memory, keep as leftover
                        this.leftOverRecord = this.serializer.copy(result);
                        this.leftOverReturned = false;
                        this.fullWriteBuffer = true;
                        return null;
                    }
                } else {
                    this.noMoreBlocks = true;
                    return null;
                }
            } else if (this.fullWriteBuffer) {
                return null;
            } else {
                this.leftOverReturned = true;
                return this.leftOverRecord;
            }
        }
    }

通過源碼可以看出,在方法執(zhí)行時根據(jù)標(biāo)記判斷是讀取還是寫入流程赴邻,同時方法對應(yīng)getNextRecord和writeNextRecord兩個方法印衔,都在抽象類AbstractBlockResettableIterator中,兩個方法源碼如下:

protected T getNextRecord() throws IOException {
        if (this.numRecordsReturned < this.numRecordsInBuffer) {
            this.numRecordsReturned++;
            return this.serializer.deserialize(this.readView);
        } else {
            return null;
        }
    }
protected boolean writeNextRecord(T record) throws IOException {
        try {
            this.serializer.serialize(record, this.collectingView);
            this.numRecordsInBuffer++;
            return true;
        } catch (EOFException eofex) {
            return false;
        }
    }

其中存放數(shù)據(jù)是基于Flink內(nèi)存管理部分進(jìn)行申請以及維護(hù)大小等姥敛,相關(guān)初始化源碼如下:

 memoryManager.allocatePages(ownerTask, emptySegments, numPages);
        
 this.collectingView = new SimpleCollectingOutputView(this.fullSegments, 
                        new ListMemorySegmentSource(this.emptySegments), memoryManager.getPageSize());
 this.readView = new RandomAccessInputView(this.fullSegments, memoryManager.getPageSize());

5 Flink 如何直接操作二進(jìn)制數(shù)據(jù)

Flink 提供了如 group奸焙、sort、join 等操作彤敛,這些操作都需要訪問海量數(shù)據(jù)与帆。這里,我們以sort為例墨榄,這是一個在 Flink 中使用非常頻繁的操作玄糟。
首先,F(xiàn)link 會從 MemoryManager 中申請一批 MemorySegment渠概,我們把這批 MemorySegment 稱作 sort buffer茶凳,用來存放排序的數(shù)據(jù)。

sort示例.png

我們會把 sort buffer 分成兩塊區(qū)域播揪。一個區(qū)域是用來存放所有對象完整的二進(jìn)制數(shù)據(jù)贮喧。另一個區(qū)域用來存放指向完整二進(jìn)制數(shù)據(jù)的指針以及定長的序列化后的key(key+pointer)。如果需要序列化的key是個變長類型猪狈,如String箱沦,則會取其前綴序列化。如上圖所示雇庙,當(dāng)一個對象要加到 sort buffer 中時谓形,它的二進(jìn)制數(shù)據(jù)會被加到第一個區(qū)域灶伊,指針(可能還有key)會被加到第二個區(qū)域。

將實際的數(shù)據(jù)和指針加定長key分開存放有兩個目的寒跳。第一聘萨,交換定長塊(key+pointer)更高效,不用交換真實的數(shù)據(jù)也不用移動其他key和pointer童太。第二米辐,這樣做是緩存友好的,因為key都是連續(xù)存儲在內(nèi)存中的书释,可以大大減少 cache miss(后面會詳細(xì)解釋)翘贮。

排序的關(guān)鍵是比大小和交換。Flink 中爆惧,會先用 key 比大小狸页,這樣就可以直接用二進(jìn)制的key比較而不需要反序列化出整個對象。因為key是定長的扯再,所以如果key相同(或者沒有提供二進(jìn)制key)芍耘,那就必須將真實的二進(jìn)制數(shù)據(jù)反序列化出來,然后再做比較叔收。之后齿穗,只需要交換key+pointer就可以達(dá)到排序的效果傲隶,真實的數(shù)據(jù)不用移動饺律。

sort指針.png

最后,訪問排序后的數(shù)據(jù)跺株,可以沿著排好序的key+pointer區(qū)域順序訪問复濒,通過pointer找到對應(yīng)的真實數(shù)據(jù),并寫到內(nèi)存或外部(更多細(xì)節(jié)可以看這篇文章 Joins in Flink)乒省。

5.1 緩存友好的數(shù)據(jù)結(jié)構(gòu)和算法

隨著磁盤IO和網(wǎng)絡(luò)IO越來越快巧颈,CPU逐漸成為了大數(shù)據(jù)領(lǐng)域的瓶頸。從 L1/L2/L3 緩存讀取數(shù)據(jù)的速度比從主內(nèi)存讀取數(shù)據(jù)的速度快好幾個量級袖扛。通過性能分析可以發(fā)現(xiàn)砸泛,CPU時間中的很大一部分都是浪費在等待數(shù)據(jù)從主內(nèi)存過來上。如果這些數(shù)據(jù)可以從 L1/L2/L3 緩存過來蛆封,那么這些等待時間可以極大地降低唇礁,并且所有的算法會因此而受益。

在上面討論中我們談到的惨篱,F(xiàn)link 通過定制的序列化框架將算法中需要操作的數(shù)據(jù)(如sort中的key)連續(xù)存儲盏筐,而完整數(shù)據(jù)存儲在其他地方。因為對于完整的數(shù)據(jù)來說砸讳,key+pointer更容易裝進(jìn)緩存琢融,這大大提高了緩存命中率界牡,從而提高了基礎(chǔ)算法的效率。這對于上層應(yīng)用是完全透明的漾抬,可以充分享受緩存友好帶來的性能提升宿亡。

References

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市纳令,隨后出現(xiàn)的幾起案子她混,更是在濱河造成了極大的恐慌,老刑警劉巖泊碑,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件坤按,死亡現(xiàn)場離奇詭異,居然都是意外死亡馒过,警方通過查閱死者的電腦和手機(jī)臭脓,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腹忽,“玉大人来累,你說我怎么就攤上這事【阶啵” “怎么了嘹锁?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長着裹。 經(jīng)常有香客問我领猾,道長,這世上最難降的妖魔是什么骇扇? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任摔竿,我火速辦了婚禮,結(jié)果婚禮上少孝,老公的妹妹穿的比我還像新娘继低。我一直安慰自己,他們只是感情好稍走,可當(dāng)我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布袁翁。 她就那樣靜靜地躺著,像睡著了一般婿脸。 火紅的嫁衣襯著肌膚如雪粱胜。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天盖淡,我揣著相機(jī)與錄音年柠,去河邊找鬼。 笑死,一個胖子當(dāng)著我的面吹牛冗恨,可吹牛的內(nèi)容都是我干的答憔。 我是一名探鬼主播,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼掀抹,長吁一口氣:“原來是場噩夢啊……” “哼虐拓!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起傲武,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤蓉驹,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后揪利,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體态兴,經(jīng)...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年疟位,在試婚紗的時候發(fā)現(xiàn)自己被綠了瞻润。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡甜刻,死狀恐怖绍撞,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情得院,我是刑警寧澤傻铣,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站祥绞,受9級特大地震影響非洲,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜就谜,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一怪蔑、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧丧荐,春花似錦、人聲如沸喧枷。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽隧甚。三九已至车荔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間戚扳,已是汗流浹背忧便。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留帽借,地道東北人珠增。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓超歌,卻偏偏與公主長得像,于是被迫代替她去往敵國和親蒂教。 傳聞我的和親對象是個殘疾皇子巍举,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內(nèi)容