Apache Flink 進(jìn)階(五):數(shù)據(jù)類型和序列化

本文根據(jù) Apache Flink 系列直播整理而成量愧,由 Apache Flink Contributor、360 數(shù)據(jù)開發(fā)高級工程師馬慶祥老師分享羊始。文章主要從如何為Flink量身定制的序列化框架帮匾、Flink序列化的最佳實踐奉件、Flink通信層的序列化三部分分享。

第一部分:

大家都知道現(xiàn)在大數(shù)據(jù)生態(tài)非忱ブ火县貌,大多數(shù)技術(shù)組件都是運行在 JVM 上的,F(xiàn)link 也是運行在 JVM 上凑懂,基于 JVM 的數(shù)據(jù)分析引擎都需要將大量的數(shù)據(jù)存儲在內(nèi)存中煤痕,這就不得不面臨 JVM 的一些問題,比如 Java 對象存儲密度較低等接谨。針對這些問題摆碉,最常用的方法就是實現(xiàn)一個顯式的內(nèi)存管理,也就是說用自定義的內(nèi)存池來進(jìn)行內(nèi)存的分配回收脓豪,接著將序列化后的對象存儲到內(nèi)存塊中巷帝。

現(xiàn)在 Java 生態(tài)圈中已經(jīng)有許多序列化框架,比如說 Java serialization, Kryo, Apache Avro 等等扫夜。但是 Flink 依然是選擇了自己定制的序列化框架楞泼,那么到底有什么意義呢?若 Flink 選擇自己定制的序列化框架笤闯,對類型信息了解越多堕阔,可以在早期完成類型檢查,更好的選取序列化方式望侈,進(jìn)行數(shù)據(jù)布局印蔬,節(jié)省數(shù)據(jù)的存儲空間,直接操作二進(jìn)制數(shù)據(jù)脱衙。

Flink 的數(shù)據(jù)類型

image.png

Flink 在其內(nèi)部構(gòu)建了一套自己的類型系統(tǒng)侥猬,F(xiàn)link 現(xiàn)階段支持的類型分類如圖所示,從圖中可以看到 Flink 類型可以分為基礎(chǔ)類型(Basic)捐韩、數(shù)組(Arrays)退唠、復(fù)合類型(Composite)、輔助類型(Auxiliary)荤胁、泛型和其它類型(Generic)瞧预。Flink 支持任意的 Java 或是 Scala 類型。不需要像 Hadoop 一樣去實現(xiàn)一個特定的接口(org.apache.hadoop.io.Writable)仅政,F(xiàn)link 能夠自動識別數(shù)據(jù)類型垢油。


image.png

那這么多的數(shù)據(jù)類型,在 Flink 內(nèi)部又是如何表示的呢圆丹?圖示中的 Person 類滩愁,復(fù)合類型的一個 Pojo 在 Flink 中是用 PojoTypeInfo 來表示,它繼承至 TypeInformation辫封,也即在 Flink 中用 TypeInformation 作為類型描述符來表示每一種要表示的數(shù)據(jù)類型硝枉。

TypeInformation

image.png

TypeInformation 的思維導(dǎo)圖如圖所示廉丽,從圖中可以看出,在 Flink 中每一個具體的類型都對應(yīng)了一個具體的 TypeInformation 實現(xiàn)類妻味,例如 BasicTypeInformation 中的 IntegerTypeInformation 和 FractionalTypeInformation 都具體的對應(yīng)了一個 TypeInformation正压。然后還有 BasicArrayTypeInformation、CompositeType 以及一些其它類型责球,也都具體對應(yīng)了一個 TypeInformation焦履。

TypeInformation 是 Flink 類型系統(tǒng)的核心類。對于用戶自定義的 Function 來說雏逾,F(xiàn)link 需要一個類型信息來作為該函數(shù)的輸入輸出類型裁良,即 TypeInfomation。該類型信息類作為一個工具來生成對應(yīng)類型的序列化器 TypeSerializer校套,并用于執(zhí)行語義檢查,比如當(dāng)一些字段在作為 joing 或 grouping 的鍵時牧抵,檢查這些字段是否在該類型中存在笛匙。

如何使用 TypeInformation?下面的實踐中會為大家介紹犀变。

Flink 的序列化過程

image.png

在 Flink 序列化過程中妹孙,進(jìn)行序列化操作必須要有序列化器,那么序列化器從何而來获枝?每一個具體的數(shù)據(jù)類型都對應(yīng)一個 TypeInformation 的具體實現(xiàn)蠢正,每一個 TypeInformation 都會為對應(yīng)的具體數(shù)據(jù)類型提供一個專屬的序列化器。通過 Flink 的序列化過程圖可以看到 TypeInformation 會提供一個 createSerialize() 方法省店,通過這個方法就可以得到該類型進(jìn)行數(shù)據(jù)序列化操作與反序化操作的對象 TypeSerializer嚣崭。

對于大多數(shù)數(shù)據(jù)類型 Flink 可以自動生成對應(yīng)的序列化器,能非常高效地對數(shù)據(jù)集進(jìn)行序列化和反序列化懦傍,比如雹舀,BasicTypeInfo、WritableTypeIno 等粗俱,但針對 GenericTypeInfo 類型说榆,F(xiàn)link 會使用 Kyro 進(jìn)行序列化和反序列化。其中寸认,Tuple签财、Pojo 和 CaseClass 類型是復(fù)合類型,它們可能嵌套一個或者多個數(shù)據(jù)類型偏塞。在這種情況下唱蒸,它們的序列化器同樣是復(fù)合的。它們會將內(nèi)嵌類型的序列化委托給對應(yīng)類型的序列化器烛愧。

簡單的介紹下 Pojo 的類型規(guī)則油宜,即在滿足一些條件的情況下掂碱,才會選用 Pojo 的序列化進(jìn)行相應(yīng)的序列化與反序列化的一個操作。即類必須是 Public 的慎冤,且類有一個 public 的無參數(shù)構(gòu)造函數(shù)疼燥,該類(以及所有超類)中的所有非靜態(tài) no-static、非瞬態(tài) no-transient 字段都是 public 的(和非最終的 final)或者具有公共 getter 和 setter 方法蚁堤,該方法遵循 getter 和 setter 的 Java bean 命名約定醉者。當(dāng)用戶定義的數(shù)據(jù)類型無法識別為 POJO 類型時,必須將其作為 GenericType 處理并使用 Kryo 進(jìn)行序列化披诗。

Flink 自帶了很多 TypeSerializer 子類撬即,大多數(shù)情況下各種自定義類型都是常用類型的排列組合,因而可以直接復(fù)用呈队,如果內(nèi)建的數(shù)據(jù)類型和序列化方式不能滿足你的需求剥槐,F(xiàn)link 的類型信息系統(tǒng)也支持用戶拓展。若用戶有一些特殊的需求宪摧,只需要實現(xiàn) TypeInformation粒竖、TypeSerializer 和 TypeComparator 即可定制自己類型的序列化和比較大小方式,來提升數(shù)據(jù)類型在序列化和比較時的性能几于。

image.png

序列化就是將數(shù)據(jù)結(jié)構(gòu)或者對象轉(zhuǎn)換成一個二進(jìn)制串的過程蕊苗,在 Java 里面可以簡單地理解成一個 byte 數(shù)組。而反序列化恰恰相反沿彭,就是將序列化過程中所生成的二進(jìn)制串轉(zhuǎn)換成數(shù)據(jù)結(jié)構(gòu)或者對象的過程朽砰。下面就以內(nèi)嵌型的 Tuple 3 這個對象為例,簡述一下它的序列化過程喉刘。Tuple 3 包含三個層面瞧柔,一是 int 類型,一是 double 類型睦裳,還有一個是 Person非剃。Person 包含兩個字段,一是 int 型的 ID推沸,另一個是 String 類型的 name备绽,它在序列化操作時,會委托相應(yīng)具體序列化的序列化器進(jìn)行相應(yīng)的序列化操作鬓催。從圖中可以看到 Tuple 3 會把 int 類型通過 IntSerializer 進(jìn)行序列化操作肺素,此時 int 只需要占用四個字節(jié)就可以了。根據(jù) int 占用四個字節(jié)宇驾,這個能夠體現(xiàn)出 Flink 可序列化過程中的一個優(yōu)勢倍靡,即在知道數(shù)據(jù)類型的前提下,可以更好的進(jìn)行相應(yīng)的序列化與反序列化操作课舍。相反塌西,如果采用 Java 的序列化他挎,雖然能夠存儲更多的屬性信息,但一次占據(jù)的存儲空間會受到一定的損耗捡需。

Person 類會被當(dāng)成一個 Pojo 對象來進(jìn)行處理办桨,PojoSerializer 序列化器會把一些屬性信息使用一個字節(jié)存儲起來。同樣站辉,其字段則采取相對應(yīng)的序列化器進(jìn)行相應(yīng)序列化呢撞,在序列化完的結(jié)果中,可以看到所有的數(shù)據(jù)都是由 MemorySegment 去支持饰剥。MemorySegment 具有什么作用呢殊霞?

MemorySegment 在 Flink 中會將對象序列化到預(yù)分配的內(nèi)存塊上,它代表 1 個固定長度的內(nèi)存汰蓉,默認(rèn)大小為 32 kb绷蹲。MemorySegment 代表 Flink 中的一個最小的內(nèi)存分配單元,相當(dāng)于是 Java 的一個 byte 數(shù)組顾孽。 每條記錄都會以序列化的形式存儲在一個或多個 MemorySegment 中瘸右。

第二部分:

Flink 序列化的最佳實踐

最常見的場景

Flink 常見的應(yīng)用場景有四種,即注冊子類型岩齿、注冊自定義序列化器、添加類型提示苞俘、手動創(chuàng)建 TypeInformation盹沈,具體介紹如下:

  • 注冊子類型:如果函數(shù)簽名只描述了超類型,但是它們實際上在執(zhí)行期間使用了超類型的子類型吃谣,那么讓 Flink 了解這些子類型會大大提高性能乞封。可以在 StreamExecutionEnvironment 或 ExecutionEnvironment 中調(diào)用 .registertype (clazz) 注冊子類型信息岗憋。
  • 注冊自定義序列化:對于不適用于自己的序列化框架的數(shù)據(jù)類型肃晚,F(xiàn)link 會使用 Kryo 來進(jìn)行序列化,并不是所有的類型都與 Kryo 無縫連接仔戈,具體注冊方法在下文介紹关串。
  • 添加類型提示:有時,當(dāng) Flink 用盡各種手段都無法推測出泛型信息時监徘,用戶需要傳入一個類型提示 TypeHint晋修,這個通常只在 Java API 中需要。
  • 手動創(chuàng)建一個 TypeInformation:在某些 API 調(diào)用中凰盔,這可能是必需的墓卦,因為 Java 的泛型類型擦除導(dǎo)致 Flink 無法推斷數(shù)據(jù)類型。

其實在大多數(shù)情況下户敬,用戶不必?fù)?dān)心序列化框架和注冊類型落剪,因為 Flink 已經(jīng)提供了大量的序列化操作睁本,不需要去定義自己的一些序列化器,但是在一些特殊場景下忠怖,需要去做一些相應(yīng)的處理呢堰。

實踐–類型聲明

類型聲明去創(chuàng)建一個類型信息的對象是通過哪種方式?通常是用 TypeInformation.of() 方法來創(chuàng)建一個類型信息的對象脑又,具體說明如下:

  • 對于非泛型類暮胧,直接傳入 class 對象即可。
    PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);
    
  • 對于泛型類问麸,需要通過 TypeHint 來保存泛型類型信息往衷。
    final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});
    
  • 預(yù)定義常量。

如 BasicTypeInfo严卖,這個類定義了一系列常用類型的快捷方式席舍,對于 String、Boolean哮笆、Byte来颤、Short、Integer稠肘、Long福铅、Float、Double项阴、Char 等基本類型的類型聲明滑黔,可以直接使用。而且 Flink 還提供了完全等價的 Types 類(org.apache.flink.api.common.typeinfo.Types)环揽。特別需要注意的是略荡,flink-table 模塊也有一個 Types 類(org.apache.flink.table.api.Types),用于 table 模塊內(nèi)部的類型定義信息歉胶,用法稍有不同汛兜。使用 IDE 的自動 import 時一定要小心。

  • 自定義 TypeInfo 和 TypeInfoFactory通今。
6.jpg

通過自定義 TypeInfo 為任意類提供 Flink 原生內(nèi)存管理(而非 Kryo)粥谬,可令存儲更緊湊,運行時也更高效辫塌。需要注意在自定義類上使用 @TypeInfo 注解帝嗡,隨后創(chuàng)建相應(yīng)的 TypeInfoFactory 并覆蓋 createTypeInfo() 方法。

實踐–注冊子類型

Flink 認(rèn)識父類璃氢,但不一定認(rèn)識子類的一些獨特特性哟玷,因此需要單獨注冊子類型。

StreamExecutionEnvironment 和 ExecutionEnvironment 提供 registerType() 方法用來向 Flink 注冊子類信息。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Env. registerType(typeClass);
7.png

在 registerType() 方法內(nèi)部巢寡,會使用 TypeExtractor 來提取類型信息喉脖,如上圖所示,獲取到的類型信息屬于 PojoTypeInfo 及其子類抑月,那么需要將其注冊到一起树叽,否則統(tǒng)一交給 Kryo 去處理,F(xiàn)link 并不過問(這種情況下性能會變差)谦絮。

實踐–Kryo 序列化

對于 Flink 無法序列化的類型(例如用戶自定義類型题诵,沒有 registerType,也沒有自定義 TypeInfo 和 TypeInfoFactory)层皱,默認(rèn)會交給 Kryo 處理性锭,如果 Kryo 仍然無法處理(例如 Guava、Thrift叫胖、Protobuf 等第三方庫的一些類)草冈,有兩種解決方案:

強制使用 Avro 來代替 Kryo。
env.getConfig().enableForceAvro();
為 Kryo 增加自定義的 Serializer 以增強 Kryo 的功能瓮增。
env.getConfig().addDefaultKryoSerializer(clazz, serializer);

注:如果希望完全禁用 Kryo(100% 使用 Flink 的序列化機制)怎棱,可以通過 Kryo-env.getConfig().disableGenericTypes() 的方式完成,但注意一切無法處理的類都將導(dǎo)致異常绷跑,這種對于調(diào)試非常有效拳恋。

第三部分:

Flink 通信層的序列化:

Flink 的 Task 之間如果需要跨網(wǎng)絡(luò)傳輸數(shù)據(jù)記錄, 那么就需要將數(shù)據(jù)序列化之后寫入 NetworkBufferPool砸捏,然后下層的 Task 讀出之后再進(jìn)行反序列化操作谬运,最后進(jìn)行邏輯處理。

為了使得記錄以及事件能夠被寫入 Buffer带膜,隨后在消費時再從 Buffer 中讀出,F(xiàn)link 提供了數(shù)據(jù)記錄序列化器(RecordSerializer)與反序列化器(RecordDeserializer)以及事件序列化器(EventSerializer)鸳谜。

Function 發(fā)送的數(shù)據(jù)被封裝成 SerializationDelegate膝藕,它將任意元素公開為 IOReadableWritable 以進(jìn)行序列化,通過 setInstance() 來傳入要序列化的數(shù)據(jù)咐扭。

在 Flink 通信層的序列化中芭挽,有幾個問題值得關(guān)注,具體如下:


image.png

在構(gòu)建 StreamTransformation 的時候通過 TypeExtractor 工具確定 Function 的輸入輸出類型蝗肪。TypeExtractor 類可以根據(jù)方法簽名袜爪、子類信息等蛛絲馬跡自動提取或恢復(fù)類型信息。

何時確定 Function 的序列化/反序列化器薛闪?
構(gòu)造 StreamGraph 時辛馆,通過 TypeInfomation 的 createSerializer() 方法獲取對應(yīng)類型的序列化器 TypeSerializer,并在 addOperator() 的過程中執(zhí)行 setSerializers() 操作,設(shè)置 StreamConfig 的 TYPE_SERIALIZER_IN_1 昙篙、 TYPE_SERIALIZER_IN_2腊状、 TYPE_SERIALIZER_OUT_1 屬性。
何時進(jìn)行真正的序列化/反序列化操作苔可?這個過程與 TypeSerializer 又是怎么聯(lián)系在一起的呢缴挖?

image.png

大家都應(yīng)該清楚 Tsk 和 StreamTask 兩個概念,Task 是直接受 TaskManager 管理和調(diào)度的焚辅,而 Task 又會調(diào)用 StreamTask映屋,而 StreamTask 中真正封裝了算子的處理邏輯。在 run() 方法中同蜻,首先將反序列化后的數(shù)據(jù)封裝成 StreamRecord 交給算子處理棚点;然后將處理結(jié)果通過 Collector 發(fā)動給下游(在構(gòu)建 Collector 時已經(jīng)確定了 SerializtionDelegate),并通過 RecordWriter 寫入器將序列化后的結(jié)果寫入 DataOutput埃仪;最后序列化的操作交給 SerializerDelegate 處理乙濒,實際還是通過 TypeSerializer 的 serialize() 方法完成。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末卵蛉,一起剝皮案震驚了整個濱河市颁股,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌傻丝,老刑警劉巖甘有,帶你破解...
    沈念sama閱讀 206,839評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異葡缰,居然都是意外死亡亏掀,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,543評論 2 382
  • 文/潘曉璐 我一進(jìn)店門泛释,熙熙樓的掌柜王于貴愁眉苦臉地迎上來滤愕,“玉大人,你說我怎么就攤上這事怜校〖溆埃” “怎么了?”我有些...
    開封第一講書人閱讀 153,116評論 0 344
  • 文/不壞的土叔 我叫張陵茄茁,是天一觀的道長魂贬。 經(jīng)常有香客問我,道長裙顽,這世上最難降的妖魔是什么付燥? 我笑而不...
    開封第一講書人閱讀 55,371評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮愈犹,結(jié)果婚禮上键科,老公的妹妹穿的比我還像新娘。我一直安慰自己,他們只是感情好萝嘁,可當(dāng)我...
    茶點故事閱讀 64,384評論 5 374
  • 文/花漫 我一把揭開白布梆掸。 她就那樣靜靜地躺著,像睡著了一般牙言。 火紅的嫁衣襯著肌膚如雪酸钦。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,111評論 1 285
  • 那天咱枉,我揣著相機與錄音卑硫,去河邊找鬼。 笑死蚕断,一個胖子當(dāng)著我的面吹牛欢伏,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播亿乳,決...
    沈念sama閱讀 38,416評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼硝拧,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了葛假?” 一聲冷哼從身側(cè)響起障陶,我...
    開封第一講書人閱讀 37,053評論 0 259
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎聊训,沒想到半個月后抱究,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 43,558評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡带斑,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,007評論 2 325
  • 正文 我和宋清朗相戀三年鼓寺,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片勋磕。...
    茶點故事閱讀 38,117評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡妈候,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出挂滓,到底是詐尸還是另有隱情苦银,我是刑警寧澤,帶...
    沈念sama閱讀 33,756評論 4 324
  • 正文 年R本政府宣布杂彭,位于F島的核電站墓毒,受9級特大地震影響吓揪,放射性物質(zhì)發(fā)生泄漏亲怠。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,324評論 3 307
  • 文/蒙蒙 一柠辞、第九天 我趴在偏房一處隱蔽的房頂上張望团秽。 院中可真熱鬧,春花似錦、人聲如沸习勤。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,315評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽图毕。三九已至夷都,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間予颤,已是汗流浹背囤官。 一陣腳步聲響...
    開封第一講書人閱讀 31,539評論 1 262
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蛤虐,地道東北人党饮。 一個月前我還...
    沈念sama閱讀 45,578評論 2 355
  • 正文 我出身青樓,卻偏偏與公主長得像驳庭,于是被迫代替她去往敵國和親刑顺。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,877評論 2 345