Flink POJO類型ValueState演化原理剖析

一帅韧、起因

最近使用Flink(1.8.1)遇到一個問題务唐,ValueState中數(shù)據(jù)結(jié)構(gòu)的一個字段類型在開發(fā)的時候定義的是int類型服球,但是實際上應(yīng)該是long類型汛兜。flink雖然從1.8.0開始支持POJO類型的演化劝篷,但只支持字段的增減哨鸭,不支持字段類型的改變,具體規(guī)則如下:


POJO types

Flink supports evolving schema of POJO types, based on the following set of rules:

  1. Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints.
  2. New fields can be added. The new field will be initialized to the default value for its type, as defined by Java.
  3. Declared fields types cannot change.
  4. Class name of the POJO type cannot change, including the namespace of the class.

Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions newer than 1.8.0. When restoring with Flink versions older than 1.8.0, the schema cannot be changed.


為了解決當(dāng)前遇到的困境和內(nèi)心的疑問娇妓,下文將解答以下幾個問題:

  • 如何知道用的ValueState是不是POJO類型的像鸡?
  • 為什么POJO 支持字段增減,不支持改類型哈恰?
  • 遇到字段類型需要改變的場景如何搶救只估?

二、如何知道用的ValueState是不是POJO類型的

有幾個方法着绷,第一種方法是參考Flink給出的POJO類型規(guī)范:


Rules for POJO types

Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:

  • The class is public and standalone (no non-static inner class)
  • The class has a public no-argument constructor
  • All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.

Note that when a user-defined data type can’t be recognized as a POJO type, it must be processed as GenericType and serialized with Kryo.


如果沒有被識別為POJO類型的蛔钙,將按照Kryo方式進(jìn)行操作。注意Kryo目前是不支持演化的荠医。
當(dāng)?shù)谝环N方法不能讓你覺得放心吁脱,還有一種更方便的方法桑涎。

/**
     * Create a new {@code StateDescriptor} with the given name and the given type information.
     *
     * <p>If this constructor fails (because it is not possible to describe the type via a class),
     * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
     *
     * @param name The name of the {@code StateDescriptor}.
     * @param type The class of the type of values in the state.
     * @param defaultValue The default value that will be set when requesting state without setting
     *                     a value before.
     */
    protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
        this.name = checkNotNull(name, "name must not be null");
        checkNotNull(type, "type class must not be null");

        try {
            this.typeInfo = TypeExtractor.createTypeInfo(type);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Could not create the type information for '" + type.getName() + "'. " +
                    "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
                    "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
                    "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
                    "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
        }

        this.defaultValue = defaultValue;
    }

上面這個方法是在創(chuàng)建ValueState的時候需要傳入的描述類的構(gòu)造方法。其中

 this.typeInfo = TypeExtractor.createTypeInfo(type);

就是為了識別出是什么類型兼贡∈矗可以自己寫個main方法單獨調(diào)用這個方法,type傳入自己存入的類紧显。如果執(zhí)行返回的是PojoTypeInfo,則認(rèn)為是POJO缕棵。如果是GenericTypeInfo則會使用Kryo孵班。

三、為什么POJO 支持字段增減招驴,不支持改類型

Flink通過org.apache.flink.api.java.typeutils.runtime.PojoSerializer實現(xiàn)對ValueState中存儲POJO對象的序列化和反序列化操作篙程。
我們假定ValueState存儲POJO類:

//這里使用了lombok
@Data 
public class Dog {
    private int type;
    private String name;
    private int age;
    private int sex;
}

下面是截取的序列化方法serialize的代碼,fields是用戶類的字段的字段反射數(shù)組别厘,長度為4虱饿,分別對應(yīng)4個字段,類為java.lang.reflect.Field触趴。fieldSerializers是字段對應(yīng)的序列化器氮发,同樣長度也是4,如果字段類型是int冗懦,序列化器是IntSerializer爽冕,以此類推。通過反射獲取對象字段內(nèi)容披蕉,然后調(diào)用各自的序列化器寫入到字節(jié)流颈畸。

for (int i = 0; i < numFields; i++) {
        Object o = (fields[i] != null) ? fields[i].get(value) : null;
    if (o == null) {
                //如果對象該字段為null,則寫入true
        target.writeBoolean(true); // null field handling
    } else {
                //如果對象該字段不為null没讲,則寫入false眯娱,然后寫入序列化之后的值內(nèi)容
        target.writeBoolean(false);
        fieldSerializers[i].serialize(o, target);
    }
}

Flink在做checkpoint時會將ValueState的對象「字段內(nèi)容」和「字段相應(yīng)的序列化器」同時進(jìn)行快照。序列化器的快照操作調(diào)用了PojoSerializer的以下方法:

@Override
    public PojoSerializerSnapshot<T> snapshotConfiguration() {
        return buildSnapshot(
                clazz,
                registeredClasses,
                registeredSerializers,
                fields,
                fieldSerializers,
                subclassSerializerCache);
    }

反序列化方法deserialize的代碼判斷截取如下爬凑。

for (int i = 0; i < numFields; i++) {
        //讀取對象下一個字段是否為null
    boolean isNull = source.readBoolean();
        
        //下一個字段的反射Field對象是否為null徙缴。如果這個字段在新的POJO類中被刪除則為null。
    if (fields[i] != null) {
        if (isNull) {
                        fields[i].set(target, null);
        } else {
            Object field = fieldSerializers[i].deserialize(source);
            fields[i].set(target, field);
        }
    } else if (!isNull) {
        // 這條分支表示:該字段在新的POJO結(jié)構(gòu)中刪除了贰谣,但是在之前序列化時值非null
                // 雖然這個值不需要賦值到新的POJO對象中娜搂,但是需要保證流的結(jié)構(gòu)正確,還是需要跳過這部分
        fieldSerializers[i].deserialize(source);
    }
}

字段減少

字段沒有變化情況從略吱抚,這里對于字段增減的場景進(jìn)行說明百宇。假定ValueState做了一次checkpoint后去掉了name字段,類結(jié)構(gòu)變?yōu)椋?/p>

//這里使用了lombok
@Data 
public class Dog {
    private int type;
    private int age;
    private int sex;
}

程序在反序列化的時候fields長度和fieldSerializers長度依然都是4秘豹。不同的是fields數(shù)組中字段name對應(yīng)數(shù)據(jù)元素值為null携御。name字段的反序列化以下分支:

fieldSerializers[i].deserialize(source);

字段增加

假定ValueState做了一次checkpoint后增了address字段,類結(jié)構(gòu)變?yōu)椋?/p>

//這里使用了lombok
@Data 
public class Dog {
    private int type;
    private String name;
    private int age;
    private String address;
    private int sex;
}

程序在反序列化的時候fields長度和fieldSerializers長度仍然都是4。走正常反序列化流程啄刹,不同的是對象新增加的字段address值為null涮坐。

字段類型變化

字段類型變化有可能從long到int,或者String到int誓军,F(xiàn)link無法決定如何進(jìn)行轉(zhuǎn)換袱讹,所以不支持字段類型轉(zhuǎn)換情有可原。對字段變化的兼容性檢查昵时,在使用PojoSerializer進(jìn)行反序列化之前捷雕。在org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot的getCompatibilityOfPreExistingFields方法中判斷兼容性,具體在此不展開說明壹甥。

四救巷、遇到字段類型需要改變的場景如何搶救

如果該字段暫時沒有下游使用,可以刪除掉這個字段句柠,等到checkpoint做完后浦译,再加回來,利用POJO字段可以增刪的特性溯职。
如果字段已經(jīng)有下游使用精盅,需要溝通協(xié)調(diào),比如另外增加字段存儲這個值谜酒。

六渤弛、參考內(nèi)容

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#pojo-types

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市甚带,隨后出現(xiàn)的幾起案子她肯,更是在濱河造成了極大的恐慌,老刑警劉巖鹰贵,帶你破解...
    沈念sama閱讀 221,548評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件晴氨,死亡現(xiàn)場離奇詭異,居然都是意外死亡碉输,警方通過查閱死者的電腦和手機(jī)籽前,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來敷钾,“玉大人枝哄,你說我怎么就攤上這事∽杌模” “怎么了挠锥?”我有些...
    開封第一講書人閱讀 167,990評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長侨赡。 經(jīng)常有香客問我蓖租,道長粱侣,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 59,618評論 1 296
  • 正文 為了忘掉前任蓖宦,我火速辦了婚禮齐婴,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘稠茂。我一直安慰自己柠偶,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 68,618評論 6 397
  • 文/花漫 我一把揭開白布睬关。 她就那樣靜靜地躺著嚣州,像睡著了一般。 火紅的嫁衣襯著肌膚如雪共螺。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,246評論 1 308
  • 那天情竹,我揣著相機(jī)與錄音藐不,去河邊找鬼。 笑死秦效,一個胖子當(dāng)著我的面吹牛雏蛮,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播阱州,決...
    沈念sama閱讀 40,819評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼挑秉,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了苔货?” 一聲冷哼從身側(cè)響起犀概,我...
    開封第一講書人閱讀 39,725評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎夜惭,沒想到半個月后姻灶,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,268評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡诈茧,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,356評論 3 340
  • 正文 我和宋清朗相戀三年产喉,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片敢会。...
    茶點故事閱讀 40,488評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡曾沈,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出鸥昏,到底是詐尸還是另有隱情塞俱,我是刑警寧澤,帶...
    沈念sama閱讀 36,181評論 5 350
  • 正文 年R本政府宣布吏垮,位于F島的核電站敛腌,受9級特大地震影響卧土,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜像樊,卻給世界環(huán)境...
    茶點故事閱讀 41,862評論 3 333
  • 文/蒙蒙 一尤莺、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧生棍,春花似錦颤霎、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至柔纵,卻和暖如春缔杉,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背搁料。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評論 1 272
  • 我被黑心中介騙來泰國打工或详, 沒想到剛下飛機(jī)就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人郭计。 一個月前我還...
    沈念sama閱讀 48,897評論 3 376
  • 正文 我出身青樓霸琴,卻偏偏與公主長得像,于是被迫代替她去往敵國和親昭伸。 傳聞我的和親對象是個殘疾皇子梧乘,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,500評論 2 359