根據(jù)Flink文檔Flink 的 TypeInformation 類介紹:
POJOs 是特別有趣的,因為他們支持復(fù)雜類型的創(chuàng)建以及在鍵的定義中直接使用字段名: dataSet.join(another).where("name").equalTo("personName") 它們對運行時也是透明的检号,并且可以由 Flink 非常高效地處理。
在Flink中使用POJO有利于提高處理效率凑队,并且能夠提高代碼可讀性谤辜。
什么樣的類才能當(dāng)做POJO厉亏?定義了一個類,它是否被Flink當(dāng)做POJO了呢?
- 什么樣的類才能當(dāng)做POJO周拐?
這部分在官方文檔中有介紹:
①該類是公有的 (public) 和獨立的(沒有非靜態(tài)內(nèi)部類)
②該類擁有公有的無參構(gòu)造器
③類(以及所有超類)中的所有非靜態(tài)铡俐、非 transient 字段都是公有的(非 final 的), 或者具有遵循 Java bean 對于 getter 和 setter 命名規(guī)則的公有 getter 和 setter 方法妥粟。
上面的三條看不懂审丘?往下看
- 創(chuàng)建了一個類,它是否被Flink當(dāng)做POJO勾给?
最簡單的就是實例化一個對象滩报,放到Flink中運行一下,看Flink怎么說播急。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(1000);
final MyPOJOClass whiteListAlarm = new MyPOJOClass();
final DataStreamSource<MyPOJOClass> source = env.fromElements(MyPOJOClass);
final DataStream<String> map = source.map((MapFunction<MyPOJOClass, String>) value -> value.toString());
map.print();
env.execute();
把日志設(shè)置為INFO等級脓钾,如果這個類不能被當(dāng)做POJO,日志中會有對應(yīng)的描述桩警。在日志中搜索'pojo'就能看到可训,如果沒有搜索到,那么恭喜你的類是一個POJO捶枢。
- Flink是如何判斷一個類是否為POJO的握截?
請看源碼org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo
方法,主要做了如下驗證:
- 類是否為public
- 類是否有field(個人理解:這個類是不是用來存儲數(shù)據(jù)的)
- 每個field是否都是符合POJO要求的(參看文檔或者源碼方法
isValidPojoField
) - 是否有自定義的序列化烂叔、反序列化方法(是否重寫了
readObject
和writeObject
) - 是否有public的空構(gòu)造方法(如果沒有定義構(gòu)造方法編譯時會自動添加public的空構(gòu)造方法川蒙,如果定義了非空參數(shù)的溝構(gòu)造方法,要顯式的定義一個空參數(shù)的public構(gòu)造方法)
- 答疑
問:我的Class繼承了自另一個抽象類长已,是否為POJO?
答:如果抽象類和繼承后的類都滿足那就是POJO
問:我在Pojo中定義了一些方法昼牛,它還是POJO嗎术瓮?
答:是
問:我的Pojo實現(xiàn)了一個接口,它還是POJO嗎贰健?
答:是的
方法什么的在序列化的時候是不會被序列化的胞四,它們被存儲在一個公共的內(nèi)存區(qū),序列化的是field伶椿,因此每個field是否符合POJO要求(是否有public的getter辜伟、setter)很重要,反序列化時需要先構(gòu)造一個對象所以需要一個默認(rèn)的public構(gòu)造方法(空參數(shù))