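1. Background
I recently ran into a problem with Flink (1.8.1): one field of the data structure stored in a ValueState was declared as int during development, but it should have been long. Flink has supported schema evolution for POJO types since 1.8.0, but only adding and removing fields, not changing a field's type. The exact rules are: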
POJO types
Flink supports evolving schema of POJO types, based on the following set of rules:
- Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints.
- New fields can be added. The new field will be initialized to the default value for its type, as defined by Java.
- Declared fields types cannot change.
- Class name of the POJO type cannot change, including the namespace of the class.
Note that the schema of POJO type state can only be evolved when restoring from a previous savepoint with Flink versions newer than 1.8.0. When restoring with Flink versions older than 1.8.0, the schema cannot be changed.
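To get out of this situation and settle my own doubts, the rest of this post answers the following questions:
- How do you know whether the type stored in a ValueState is a POJO type?
- Why do POJOs support adding and removing fields but not changing a field's type?
- What can you do when a field's type really has to change?
2. How to tell whether a ValueState holds a POJO type
There are a couple of ways. The first is to check the class against Flink's rules for POJO types: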
Rules for POJO types
Flink recognizes a data type as a POJO type (and allows “by-name” field referencing) if the following conditions are fulfilled:
- The class is public and standalone (no non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.
Note that when a user-defined data type can’t be recognized as a POJO type, it must be processed as GenericType and serialized with Kryo.
A type that is not recognized as a POJO is handled with Kryo. Note that Kryo-serialized state currently does not support schema evolution.
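The three conditions can be approximated with plain reflection. The sketch below is a rough stand-in for the rules quoted above, not Flink's own TypeExtractor logic, so treat its answer as a first hint only. PojoRuleCheck, looksLikePojo, and the Dog and Cat classes are hypothetical names introduced for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Rough, standalone approximation of the three quoted rules.
// It does NOT reproduce Flink's TypeExtractor; all names are hypothetical.
class PojoRuleCheck {

    // Satisfies all three rules: public static nested class, implicit public
    // no-argument constructor, private fields with bean-style getters and setters.
    public static class Dog {
        private int type;
        private String name;
        public int getType() { return type; }
        public void setType(int type) { this.type = type; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Violates the third rule: "name" is private and has no setter.
    public static class Cat {
        private String name;
        public String getName() { return name; }
    }

    static boolean looksLikePojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        // Rule 1: public, and not a non-static inner class.
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // Rule 2: public no-argument constructor.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every non-static, non-transient field, superclasses included.
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                int fm = f.getModifiers();
                if (Modifier.isStatic(fm) || Modifier.isTransient(fm) || f.isSynthetic()) continue;
                if (Modifier.isPublic(fm) && !Modifier.isFinal(fm)) continue;
                if (!hasGetterAndSetter(clazz, f)) return false;
            }
        }
        return true;
    }

    // Looks for public getX/isX and setX methods matching the field name.
    static boolean hasGetterAndSetter(Class<?> clazz, Field f) {
        String suffix = Character.toUpperCase(f.getName().charAt(0)) + f.getName().substring(1);
        boolean getter = false;
        boolean setter = false;
        for (Method m : clazz.getMethods()) {
            if (m.getParameterCount() == 0
                    && (m.getName().equals("get" + suffix) || m.getName().equals("is" + suffix))) {
                getter = true;
            }
            if (m.getParameterCount() == 1 && m.getName().equals("set" + suffix)) {
                setter = true;
            }
        }
        return getter && setter;
    }

    public static void main(String[] args) {
        System.out.println("Dog: " + looksLikePojo(Dog.class)); // Dog: true
        System.out.println("Cat: " + looksLikePojo(Cat.class)); // Cat: false
    }
}
```

A class that passes this check may still be treated differently by Flink, so the result should be confirmed against Flink itself.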
If the first approach does not leave you fully reassured, there is a more direct one.
/**
 * Create a new {@code StateDescriptor} with the given name and the given type information.
 *
 * <p>If this constructor fails (because it is not possible to describe the type via a class),
 * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
 *
 * @param name The name of the {@code StateDescriptor}.
 * @param type The class of the type of values in the state.
 * @param defaultValue The default value that will be set when requesting state without setting
 *                     a value before.
 */
protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
    this.name = checkNotNull(name, "name must not be null");
    checkNotNull(type, "type class must not be null");
    try {
        this.typeInfo = TypeExtractor.createTypeInfo(type);
    } catch (Exception e) {
        throw new RuntimeException(
            "Could not create the type information for '" + type.getName() + "'. " +
            "The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
            "In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
            "For example, to describe 'Tuple2<String, String>' as a generic type, use " +
            "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
    }
    this.defaultValue = defaultValue;
}
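This is a constructor of the descriptor class that has to be passed in when creating a ValueState. In it, the line
this.typeInfo = TypeExtractor.createTypeInfo(type);
determines what kind of type the class is. You can write a small main method that calls this method on its own, passing the class you store in state as type. If the call returns a PojoTypeInfo, the class is treated as a POJO. If it returns a GenericTypeInfo, Kryo will be used.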
3. Why POJOs support adding and removing fields but not changing types
Flink serializes and deserializes POJO objects stored in a ValueState with org.apache.flink.api.java.typeutils.runtime.PojoSerializer.
Assume the ValueState stores the following POJO class:
// Lombok is used here
@Data
public class Dog {
    private int type;
    private String name;
    private int age;
    private int sex;
}
Below is an excerpt of the serialize method. fields is the array of reflection objects (java.lang.reflect.Field) for the fields of the user class; it has length 4, one entry per field. fieldSerializers holds the serializer for each field and also has length 4: an int field gets an IntSerializer, and so on. Each field value is read from the object through reflection and then written to the byte stream by its own serializer.
for (int i = 0; i < numFields; i++) {
    Object o = (fields[i] != null) ? fields[i].get(value) : null;
    if (o == null) {
        // The field is null: write true and nothing else
        target.writeBoolean(true); // null field handling
    } else {
        // The field is not null: write false, followed by the serialized value
        target.writeBoolean(false);
        fieldSerializers[i].serialize(o, target);
    }
}
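The resulting byte layout (a one-byte null flag, then the field's own encoding) can be reproduced with the JDK alone. The following is a simplified model of the loop above, not Flink code: PojoWriteSketch and FieldWriter are hypothetical names, fields are public to keep the reflection short, and DataOutputStream.writeUTF is only a stand-in for Flink's string serializer, which encodes strings differently.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;

// Simplified model of the serialize loop; FieldWriter and writeUTF are
// stand-ins for Flink's field serializers, not Flink classes.
class PojoWriteSketch {

    public static class Dog {
        public int type;
        public String name;
        public int age;
        public int sex;
    }

    interface FieldWriter {
        void write(Object value, DataOutputStream out) throws IOException;
    }

    static final FieldWriter INT = (v, out) -> out.writeInt((Integer) v);
    static final FieldWriter STRING = (v, out) -> out.writeUTF((String) v);

    static byte[] serializeDog(Dog dog) {
        try {
            Field[] fields = {
                Dog.class.getField("type"), Dog.class.getField("name"),
                Dog.class.getField("age"), Dog.class.getField("sex")
            };
            FieldWriter[] writers = {INT, STRING, INT, INT};
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int i = 0; i < fields.length; i++) {
                Object o = fields[i].get(dog);
                if (o == null) {
                    out.writeBoolean(true);   // null flag only, no payload
                } else {
                    out.writeBoolean(false);  // flag, then the field's own encoding
                    writers[i].write(o, out);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Dog dog = new Dog();
        dog.type = 1;
        dog.name = "Rex";
        dog.age = 3;
        dog.sex = 0;
        // Three ints at 1 + 4 bytes each, plus the string at 1 + 2 + 3 bytes: 21
        System.out.println(serializeDog(dog).length);
        dog.name = null;
        // The string shrinks to its one-byte null flag: 16
        System.out.println(serializeDog(dog).length);
    }
}
```

The stream carries no field names or type tags, only flags and values in a fixed order, which is why the reader must know exactly which serializer wrote each position.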
When Flink takes a checkpoint, it snapshots both the field contents of the object in the ValueState and the serializers of those fields. The serializer snapshot is produced by the following method of PojoSerializer:
@Override
public PojoSerializerSnapshot<T> snapshotConfiguration() {
    return buildSnapshot(
        clazz,
        registeredClasses,
        registeredSerializers,
        fields,
        fieldSerializers,
        subclassSerializerCache);
}
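An excerpt of the checks in the deserialize method follows.
for (int i = 0; i < numFields; i++) {
    // Read whether the next field of the object is null
    boolean isNull = source.readBoolean();
    // Check whether the reflection Field for the next field is null.
    // It is null if the field has been removed from the new POJO class.
    if (fields[i] != null) {
        if (isNull) {
            fields[i].set(target, null);
        } else {
            Object field = fieldSerializers[i].deserialize(source);
            fields[i].set(target, field);
        }
    } else if (!isNull) {
        // This branch means the field was removed from the new POJO class,
        // but its value was non-null when it was serialized.
        // The value is not assigned to the new POJO object, but it still has to be
        // read and skipped so that the stream stays correctly positioned.
        fieldSerializers[i].deserialize(source);
    }
}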
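Removing a field
The case where no field changes is skipped here; the following covers adding and removing fields. Suppose that after a checkpoint the name field is removed from the class stored in the ValueState, so that the class becomes:
// Lombok is used here
@Data
public class Dog {
    private int type;
    private int age;
    private int sex;
}
During deserialization, fields and fieldSerializers both still have length 4. The difference is that the element of fields that corresponds to name is null. When the stored name was non-null, deserialization of that field takes the following branch: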
fieldSerializers[i].deserialize(source);
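To see that branch at work, here is a small JDK-only model of restoring old bytes into the class without name. It is a sketch under simplifying assumptions, not Flink code: RemovedFieldSketch and FieldReader are hypothetical names, fields are public, and readUTF stands in for Flink's string serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;

// Simplified model of restoring state after "name" was removed.
// FieldReader and readUTF are stand-ins, not Flink classes.
class RemovedFieldSketch {

    // New version of the class: "name" no longer exists.
    public static class Dog {
        public int type;
        public int age;
        public int sex;
    }

    interface FieldReader {
        Object read(DataInputStream src) throws IOException;
    }

    // Bytes as the OLD four-field layout would have written them.
    static byte[] oldBytes() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeBoolean(false); out.writeInt(1);      // type
            out.writeBoolean(false); out.writeUTF("Rex");  // name
            out.writeBoolean(false); out.writeInt(3);      // age
            out.writeBoolean(false); out.writeInt(2);      // sex
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Dog restore(byte[] data) {
        try {
            // Still four slots; the slot of the removed field holds null.
            Field[] fields = {
                Dog.class.getField("type"), null,
                Dog.class.getField("age"), Dog.class.getField("sex")
            };
            FieldReader[] readers = {
                src -> src.readInt(), src -> src.readUTF(),
                src -> src.readInt(), src -> src.readInt()
            };
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Dog target = new Dog();
            for (int i = 0; i < fields.length; i++) {
                boolean isNull = in.readBoolean();
                if (fields[i] != null) {
                    // A null value leaves the field at its default in this sketch.
                    if (!isNull) fields[i].set(target, readers[i].read(in));
                } else if (!isNull) {
                    readers[i].read(in); // consume and discard the old "name" bytes
                }
            }
            return target;
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Dog dog = restore(oldBytes());
        System.out.println(dog.type + " " + dog.age + " " + dog.sex); // 1 3 2
    }
}
```

Without the discard step, the bytes of name would be read as the flag and value of age, and every later field would be wrong.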
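Adding a field
Suppose that after a checkpoint an address field is added to the class stored in the ValueState, so that the class becomes:
// Lombok is used here
@Data
public class Dog {
    private int type;
    private String name;
    private int age;
    private String address;
    private int sex;
}
During deserialization, fields and fieldSerializers still both have length 4, and the normal deserialization path is taken. The only difference is that the newly added field address ends up null in the restored object.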
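Changing a field's type
A field's type might change from long to int, or from String to int. Flink has no way of deciding how such values should be converted, so it is understandable that type changes are not supported. The compatibility check for field changes runs before PojoSerializer is used for deserialization, in the getCompatibilityOfPreExistingFields method of org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot; the details are not covered here.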
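One way to picture why a changed type cannot simply be read with the new field's serializer: the stored bytes carry no type information, so a reader of a different width drifts out of position. The sketch below is an illustration under that assumption only. Flink itself rejects the change in the compatibility check before any such read happens; TypeChangeSketch is a hypothetical name and writeUTF is a stand-in for Flink's string encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustration only: what a naive read of old int bytes as long would produce.
class TypeChangeSketch {

    // Old layout: type (int), name (String), age (int), sex (int),
    // each preceded by a one-byte null flag.
    static byte[] oldBytes() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeBoolean(false); out.writeInt(1);      // type
            out.writeBoolean(false); out.writeUTF("Rex");  // name
            out.writeBoolean(false); out.writeInt(3);      // age
            out.writeBoolean(false); out.writeInt(2);      // sex
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads "type" the way it was written: flag, then 4 bytes.
    static long readTypeAsInt(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.readBoolean();
            return in.readInt();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    // Reads the same bytes as if "type" had been declared long: flag, then 8 bytes.
    // The extra 4 bytes are name's null flag, its 2-byte length, and the letter 'R'.
    static long readTypeAsLong(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.readBoolean();
            return in.readLong();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readTypeAsInt(oldBytes()));  // 1
        System.out.println(readTypeAsLong(oldBytes())); // 4294968146
    }
}
```

The long read returns 0x0000000100000352 instead of 1, and every field after it would be read from the wrong offset.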
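4. How to recover when a field's type has to change
If nothing downstream uses the field yet, you can delete it, wait until a checkpoint has completed, and then add it back with the new type, taking advantage of the fact that POJO fields can be removed and added. The values previously stored in that field are dropped, as the removal rule quoted in section 1 states.
If the field is already used downstream, you will need to coordinate with its consumers, for example by adding another field to hold the value.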
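The second option can be sketched as follows. This is only one possible shape, assuming the old int field stays in place and two fields are added next to it; ageLong, ageMoved, readAge, and writeAge are hypothetical names, and the fields are public here where the original class uses Lombok accessors.

```java
// Sketch of widening a field by adding a sibling field instead of changing its type.
// All names besides "age" are hypothetical.
class WidenFieldSketch {

    public static class Dog {
        public int age;          // original int field, left untouched so old state still restores
        public long ageLong;     // added field; restores as 0 from state written before it existed
        public boolean ageMoved; // added flag; restores as false from state written before it existed
    }

    // Readers go through this helper so that old and new state look the same.
    static long readAge(Dog dog) {
        return dog.ageMoved ? dog.ageLong : dog.age;
    }

    // Writers only touch the new field and mark the value as moved.
    static void writeAge(Dog dog, long value) {
        dog.ageLong = value;
        dog.ageMoved = true;
    }

    public static void main(String[] args) {
        Dog restored = new Dog();
        restored.age = 7;                       // as restored from old state
        System.out.println(readAge(restored));  // 7
        writeAge(restored, 5_000_000_000L);     // does not fit into an int
        System.out.println(readAge(restored));  // 5000000000
    }
}
```

The flag is what separates "never written" from a genuine value of 0 in the new field. Once all state has been rewritten through writeAge, the old int field could be removed using the normal field-removal rule.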
5. References
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html#rules-for-pojo-types
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html#pojo-types