前言
Apache Flink(下簡稱Flink)項(xiàng)目是大數(shù)據(jù)處理領(lǐng)域最近冉冉升起的一顆新星,其不同于其他大數(shù)據(jù)項(xiàng)目的諸多特性吸引了越來越多人的關(guān)注。本文將深入分析Flink的一些關(guān)鍵技術(shù)與特性完沪,希望能夠幫助讀者對Flink有更加深入的了解,對其他大數(shù)據(jù)系統(tǒng)開發(fā)者也能有所裨益嵌戈。本文假設(shè)讀者已對MapReduce覆积、Spark及Storm等大數(shù)據(jù)處理框架有所了解听皿,同時(shí)熟悉流處理與批處理的基本概念。
文章轉(zhuǎn)載自:深入理解Flink核心技術(shù)
一.Flink簡介
Flink核心是一個流式的數(shù)據(jù)流執(zhí)行引擎宽档,其針對數(shù)據(jù)流的分布式計(jì)算提供了數(shù)據(jù)分布尉姨、數(shù)據(jù)通信以及容錯機(jī)制等功能〈萍基于流執(zhí)行引擎啊送,F(xiàn)link提供了諸多更高抽象層的API以便用戶編寫分布式任務(wù):
- DataSet API, 對靜態(tài)數(shù)據(jù)進(jìn)行批處理操作欣孤,將靜態(tài)數(shù)據(jù)抽象成分布式的數(shù)據(jù)集馋没,用戶可以方便地使用Flink提供的各種操作符對分布式數(shù)據(jù)集進(jìn)行處理,支持Java降传、Scala和Python篷朵。
- DataStream API,對數(shù)據(jù)流進(jìn)行流處理操作婆排,將流式的數(shù)據(jù)抽象成分布式的數(shù)據(jù)流声旺,用戶可以方便地對分布式數(shù)據(jù)流進(jìn)行各種操作,支持Java和Scala段只。
- Table API腮猖,對結(jié)構(gòu)化數(shù)據(jù)進(jìn)行查詢操作,將結(jié)構(gòu)化數(shù)據(jù)抽象成關(guān)系表赞枕,并通過類SQL的DSL對關(guān)系表進(jìn)行各種查詢操作澈缺,支持Java和Scala。
此外炕婶,F(xiàn)link還針對特定的應(yīng)用領(lǐng)域提供了領(lǐng)域庫姐赡,例如:
- Flink ML,F(xiàn)link的機(jī)器學(xué)習(xí)庫柠掂,提供了機(jī)器學(xué)習(xí)Pipelines API并實(shí)現(xiàn)了多種機(jī)器學(xué)習(xí)算法项滑。
- Gelly,F(xiàn)link的圖計(jì)算庫涯贞,提供了圖計(jì)算的相關(guān)API及多種圖計(jì)算算法實(shí)現(xiàn)枪狂。
此外,F(xiàn)link也可以方便地和Hadoop生態(tài)圈中其他項(xiàng)目集成肩狂,例如Flink可以讀取存儲在HDFS或HBase中的靜態(tài)數(shù)據(jù)摘完,以Kafka作為流式的數(shù)據(jù)源,直接重用MapReduce或Storm代碼傻谁,或是通過YARN申請集群資源等孝治。
二.統(tǒng)一的批處理與流處理系統(tǒng)
在大數(shù)據(jù)處理領(lǐng)域,批處理任務(wù)與流處理任務(wù)一般被認(rèn)為是兩種不同的任務(wù),一個大數(shù)據(jù)項(xiàng)目一般會被設(shè)計(jì)為只能處理其中一種任務(wù)谈飒,例如Apache Storm岂座、Apache Smaza只支持流處理任務(wù),而Aapche MapReduce杭措、Apache Tez费什、Apache Spark只支持批處理任務(wù)。Spark Streaming是Apache Spark之上支持流處理任務(wù)的子系統(tǒng)手素,看似一個特例鸳址,實(shí)則不然——Spark Streaming采用了一種micro-batch的架構(gòu),即把輸入的數(shù)據(jù)流切分成細(xì)粒度的batch泉懦,并為每一個batch數(shù)據(jù)提交一個批處理的Spark任務(wù)稿黍,所以Spark Streaming本質(zhì)上還是基于Spark批處理系統(tǒng)對流式數(shù)據(jù)進(jìn)行處理,和Apache Storm崩哩、Apache Smaza等完全流式的數(shù)據(jù)處理方式完全不同巡球。通過其靈活的執(zhí)行引擎,F(xiàn)link能夠同時(shí)支持批處理任務(wù)與流處理任務(wù)邓嘹。
在執(zhí)行引擎這一層酣栈,流處理系統(tǒng)與批處理系統(tǒng)最大不同在于節(jié)點(diǎn)間的數(shù)據(jù)傳輸方式。
對于一個流處理系統(tǒng)汹押,其節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)臉?biāo)準(zhǔn)模型是:
當(dāng)一條數(shù)據(jù)被處理完成后矿筝,序列化到緩存中,然后立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€節(jié)點(diǎn)棚贾,由下一個節(jié)點(diǎn)繼續(xù)處理跋涣。
而對于一個批處理系統(tǒng),其節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)臉?biāo)準(zhǔn)模型是:
當(dāng)一條數(shù)據(jù)被處理完成后鸟悴,序列化到緩存中,并不會立刻通過網(wǎng)絡(luò)傳輸?shù)较乱粋€節(jié)點(diǎn)奖年,當(dāng)緩存寫滿细诸,就持久化到本地硬盤上,當(dāng)所有數(shù)據(jù)都被處理完成后陋守,才開始將處理后的數(shù)據(jù)通過網(wǎng)絡(luò)傳輸?shù)较乱粋€節(jié)點(diǎn)震贵。
這兩種數(shù)據(jù)傳輸模式是兩個極端,對應(yīng)的是流處理系統(tǒng)對低延遲的要求和批處理系統(tǒng)對高吞吐量的要求水评。
Flink的執(zhí)行引擎采用了一種十分靈活的方式猩系,同時(shí)支持了這兩種數(shù)據(jù)傳輸模型。Flink以固定的緩存塊為單位進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)傳輸中燥,用戶可以通過緩存塊超時(shí)值指定緩存塊的傳輸時(shí)機(jī)寇甸。如果緩存塊的超時(shí)值為0,則Flink的數(shù)據(jù)傳輸方式類似上文所提到流處理系統(tǒng)的標(biāo)準(zhǔn)模型,此時(shí)系統(tǒng)可以獲得最低的處理延遲拿霉。如果緩存塊的超時(shí)值為無限大吟秩,則Flink的數(shù)據(jù)傳輸方式類似上文所提到批處理系統(tǒng)的標(biāo)準(zhǔn)模型,此時(shí)系統(tǒng)可以獲得最高的吞吐量绽淘。同時(shí)緩存塊的超時(shí)值也可以設(shè)置為0到無限大之間的任意值涵防。緩存塊的超時(shí)閾值越小,則Flink流處理執(zhí)行引擎的數(shù)據(jù)處理延遲越低沪铭,但吞吐量也會降低壮池,反之亦然。通過調(diào)整緩存塊的超時(shí)閾值杀怠,用戶可根據(jù)需求靈活地權(quán)衡系統(tǒng)延遲和吞吐量椰憋。
在統(tǒng)一的流式執(zhí)行引擎基礎(chǔ)上,F(xiàn)link同時(shí)支持了流計(jì)算和批處理驮肉,并對性能(延遲熏矿、吞吐量等)有所保障。相對于其他原生的流處理與批處理系統(tǒng)离钝,并沒有因?yàn)榻y(tǒng)一執(zhí)行引擎而受到影響從而大幅度減輕了用戶安裝票编、部署、監(jiān)控卵渴、維護(hù)等成本慧域。
三.Flink流處理的容錯機(jī)制
對于一個分布式系統(tǒng)來說,單個進(jìn)程或是節(jié)點(diǎn)崩潰導(dǎo)致整個Job失敗是經(jīng)常發(fā)生的事情浪读,在異常發(fā)生時(shí)不會丟失用戶數(shù)據(jù)并能自動恢復(fù)才是分布式系統(tǒng)必須支持的特性之一昔榴。本節(jié)主要介紹Flink流處理系統(tǒng)任務(wù)級別的容錯機(jī)制。
批處理系統(tǒng)比較容易實(shí)現(xiàn)容錯機(jī)制碘橘,由于文件可以重復(fù)訪問互订,當(dāng)某個任務(wù)失敗后,重啟該任務(wù)即可痘拆。但是到了流處理系統(tǒng)仰禽,由于數(shù)據(jù)源是無限的數(shù)據(jù)流,從而導(dǎo)致一個流處理任務(wù)執(zhí)行幾個月的情況纺蛆,將所有數(shù)據(jù)緩存或是持久化吐葵,留待以后重復(fù)訪問基本上是不可行的。Flink基于分布式快照與可部分重發(fā)的數(shù)據(jù)源實(shí)現(xiàn)了容錯桥氏。用戶可自定義對整個Job進(jìn)行快照的時(shí)間間隔温峭,當(dāng)任務(wù)失敗時(shí),F(xiàn)link會將整個Job恢復(fù)到最近一次快照字支,并從數(shù)據(jù)源重發(fā)快照之后的數(shù)據(jù)凤藏。Flink的分布式快照實(shí)現(xiàn)借鑒了Chandy和Lamport在1985年發(fā)表的一篇關(guān)于分布式快照的論文奸忽,其實(shí)現(xiàn)的主要思想如下:
按照用戶自定義的分布式快照間隔時(shí)間,F(xiàn)link會定時(shí)在所有數(shù)據(jù)源中插入一種特殊的快照標(biāo)記消息清笨,這些快照標(biāo)記消息和其他消息一樣在DAG中流動月杉,但是不會被用戶定義的業(yè)務(wù)邏輯所處理,每一個快照標(biāo)記消息都將其所在的數(shù)據(jù)流分成兩部分:本次快照數(shù)據(jù)和下次快照數(shù)據(jù)抠艾。
快照標(biāo)記消息沿著DAG流經(jīng)各個操作符酷勺,當(dāng)操作符處理到快照標(biāo)記消息時(shí)罢防,會對自己的狀態(tài)進(jìn)行快照,并存儲起來。當(dāng)一個操作符有多個輸入的時(shí)候买决,F(xiàn)link會將先抵達(dá)的快照標(biāo)記消息及其之后的消息緩存起來已亥,當(dāng)所有的輸入中對應(yīng)該次快照的快照標(biāo)記消息全部抵達(dá)后引镊,操作符對自己的狀態(tài)快照并存儲胸嘁,之后處理所有快照標(biāo)記消息之后的已緩存消息。操作符對自己的狀態(tài)快照并存儲可以是異步與增量的操作凹蜂,并不需要阻塞消息的處理馍驯。分布式快照的流程如圖所示:
當(dāng)所有的Data Sink(終點(diǎn)操作符)都收到快照標(biāo)記信息并對自己的狀態(tài)快照和存儲后,整個分布式快照就完成了玛痊,同時(shí)通知數(shù)據(jù)源釋放該快照標(biāo)記消息之前的所有消息汰瘫。若之后發(fā)生節(jié)點(diǎn)崩潰等異常情況時(shí),只需要恢復(fù)之前存儲的分布式快照狀態(tài)擂煞,并從數(shù)據(jù)源重發(fā)該快照以后的消息就可以了混弥。
Exactly-Once是流處理系統(tǒng)需要支持的一個非常重要的特性,它保證每一條消息只被流處理系統(tǒng)處理一次对省,許多流處理任務(wù)的業(yè)務(wù)邏輯都依賴于Exactly-Once特性蝗拿。相對于At-Least-Once或是At-Most-Once, Exactly-Once特性對流處理系統(tǒng)的要求更為嚴(yán)格,實(shí)現(xiàn)也更加困難蒿涎。Flink基于分布式快照實(shí)現(xiàn)了Exactly-Once特性哀托。
相對于其他流處理系統(tǒng)的容錯方案,Flink基于分布式快照的方案在功能和性能方面都具有很多優(yōu)點(diǎn)劳秋,包括:
- 低延遲萤捆。由于操作符狀態(tài)的存儲可以異步,所以進(jìn)行快照的過程基本上不會阻塞消息的處理俗批,因此不會對消息延遲產(chǎn)生負(fù)面影響。
- 高吞吐量市怎。當(dāng)操作符狀態(tài)較少時(shí)岁忘,對吞吐量基本沒有影響。當(dāng)操作符狀態(tài)較多時(shí)区匠,相對于其他的容錯機(jī)制干像,分布式快照的時(shí)間間隔是用戶自定義的帅腌,所以用戶可以權(quán)衡錯誤恢復(fù)時(shí)間和吞吐量要求來調(diào)整分布式快照的時(shí)間間隔。
- 與業(yè)務(wù)邏輯的隔離麻汰。Flink的分布式快照機(jī)制與用戶的業(yè)務(wù)邏輯是完全隔離的速客,用戶的業(yè)務(wù)邏輯不會依賴或是對分布式快照產(chǎn)生任何影響。
- 錯誤恢復(fù)代價(jià)五鲫。分布式快照的時(shí)間間隔越短溺职,錯誤恢復(fù)的時(shí)間越少,與吞吐量負(fù)相關(guān)位喂。
四.Flink流處理的時(shí)間窗口
對于流處理系統(tǒng)來說浪耘,流入的消息不存在上限,所以對于聚合或是連接等操作塑崖,流處理系統(tǒng)需要對流入的消息進(jìn)行分段七冲,然后基于每一段數(shù)據(jù)進(jìn)行聚合或是連接。消息的分段即稱為窗口规婆,流處理系統(tǒng)支持的窗口有很多類型澜躺,最常見的就是時(shí)間窗口,基于時(shí)間間隔對消息進(jìn)行分段處理抒蚜。本節(jié)主要介紹Flink流處理系統(tǒng)支持的各種時(shí)間窗口掘鄙。
對于目前大部分流處理系統(tǒng)來說,時(shí)間窗口一般是根據(jù)Task所在節(jié)點(diǎn)的本地時(shí)鐘進(jìn)行切分削锰,這種方式實(shí)現(xiàn)起來比較容易通铲,不會產(chǎn)生阻塞。但是可能無法滿足某些應(yīng)用需求器贩,比如:
消息本身帶有時(shí)間戳颅夺,用戶希望按照消息本身的時(shí)間特性進(jìn)行分段處理。
由于不同節(jié)點(diǎn)的時(shí)鐘可能不同蛹稍,以及消息在流經(jīng)各個節(jié)點(diǎn)的延遲不同吧黄,在某個節(jié)點(diǎn)屬于同一個時(shí)間窗口處理的消息,流到下一個節(jié)點(diǎn)時(shí)可能被切分到不同的時(shí)間窗口中唆姐,從而產(chǎn)生不符合預(yù)期的結(jié)果拗慨。
Flink支持3種類型的時(shí)間窗口,分別適用于用戶對于時(shí)間窗口不同類型的要求:
Operator Time奉芦。根據(jù)Task所在節(jié)點(diǎn)的本地時(shí)鐘來切分的時(shí)間窗口赵抢。
Event Time。消息自帶時(shí)間戳声功,根據(jù)消息的時(shí)間戳進(jìn)行處理烦却,確保時(shí)間戳在同一個時(shí)間窗口的所有消息一定會被正確處理。由于消息可能亂序流入Task先巴,所以Task需要緩存當(dāng)前時(shí)間窗口消息處理的狀態(tài)其爵,直到確認(rèn)屬于該時(shí)間窗口的所有消息都被處理冒冬,才可以釋放,如果亂序的消息延遲很高會影響分布式系統(tǒng)的吞吐量和延遲摩渺。
Ingress Time简烤。有時(shí)消息本身并不帶有時(shí)間戳信息,但用戶依然希望按照消息而不是節(jié)點(diǎn)時(shí)鐘劃分時(shí)間窗口摇幻,例如避免上面提到的第二個問題横侦,此時(shí)可以在消息源流入Flink流處理系統(tǒng)時(shí)自動生成增量的時(shí)間戳賦予消息,之后處理的流程與Event Time相同囚企。Ingress Time可以看成是Event Time的一個特例丈咐,由于其在消息源處時(shí)間戳一定是有序的,所以在流處理系統(tǒng)中龙宏,相對于Event Time棵逊,其亂序的消息延遲不會很高,因此對Flink分布式系統(tǒng)的吞吐量和延遲的影響也會更小银酗。
五.Event Time時(shí)間窗口的實(shí)現(xiàn)
Flink借鑒了Google的MillWheel項(xiàng)目辆影,通過WaterMark來支持基于Event Time的時(shí)間窗口。
當(dāng)操作符通過基于Event Time的時(shí)間窗口來處理數(shù)據(jù)時(shí)黍特,它必須在確定所有屬于該時(shí)間窗口的消息全部流入此操作符后才能開始數(shù)據(jù)處理蛙讥。但是由于消息可能是亂序的,所以操作符無法直接確認(rèn)何時(shí)所有屬于該時(shí)間窗口的消息全部流入此操作符灭衷。WaterMark包含一個時(shí)間戳次慢,F(xiàn)link使用WaterMark標(biāo)記所有小于該時(shí)間戳的消息都已流入,F(xiàn)link的數(shù)據(jù)源在確認(rèn)所有小于某個時(shí)間戳的消息都已輸出到Flink流處理系統(tǒng)后翔曲,會生成一個包含該時(shí)間戳的WaterMark迫像,插入到消息流中輸出到Flink流處理系統(tǒng)中,F(xiàn)link操作符按照時(shí)間窗口緩存所有流入的消息瞳遍,當(dāng)操作符處理到WaterMark時(shí)闻妓,它對所有小于該WaterMark時(shí)間戳的時(shí)間窗口數(shù)據(jù)進(jìn)行處理并發(fā)送到下一個操作符節(jié)點(diǎn),然后也將WaterMark發(fā)送到下一個操作符節(jié)點(diǎn)掠械。
為了保證能夠處理所有屬于某個時(shí)間窗口的消息由缆,操作符必須等到大于這個時(shí)間窗口的WaterMark之后才能開始對該時(shí)間窗口的消息進(jìn)行處理,相對于基于Operator Time的時(shí)間窗口猾蒂,F(xiàn)link需要占用更多內(nèi)存均唉,且會直接影響消息處理的延遲時(shí)間。對此肚菠,一個可能的優(yōu)化措施是舔箭,對于聚合類的操作符,可以提前對部分消息進(jìn)行聚合操作案糙,當(dāng)有屬于該時(shí)間窗口的新消息流入時(shí)限嫌,基于之前的部分聚合結(jié)果繼續(xù)計(jì)算,這樣的話时捌,只需緩存中間計(jì)算結(jié)果即可怒医,無需緩存該時(shí)間窗口的所有消息。
對于基于Event Time時(shí)間窗口的操作符來說奢讨,流入WaterMark的時(shí)間戳與當(dāng)前節(jié)點(diǎn)的時(shí)鐘一致是最簡單理想的狀況稚叹,但是在實(shí)際環(huán)境中是不可能的,由于消息的亂序以及前面節(jié)點(diǎn)處理效率的不同拿诸,總是會有某些消息流入時(shí)間大于其本身的時(shí)間戳扒袖,真實(shí)WaterMark時(shí)間戳與理想情況下WaterMark時(shí)間戳的差別稱為Time Skew,如下圖所示:
Time Skew決定了該WaterMark與上一個WaterMark之間的時(shí)間窗口所有數(shù)據(jù)需要緩存的時(shí)間亩码,Time Skew時(shí)間越長季率,該時(shí)間窗口數(shù)據(jù)的延遲越長,占用內(nèi)存的時(shí)間也越長描沟,同時(shí)會對流處理系統(tǒng)的吞吐量產(chǎn)生負(fù)面影響飒泻。
六.基于時(shí)間戳的排序
在流處理系統(tǒng)中,由于流入的消息是無限的吏廉,所以對消息進(jìn)行排序基本上被認(rèn)為是不可行的泞遗。但是在Flink流處理系統(tǒng)中,基于WaterMark席覆,F(xiàn)link實(shí)現(xiàn)了基于時(shí)間戳的全局排序史辙。排序的實(shí)現(xiàn)思路如下:排序操作符緩存所有流入的消息,當(dāng)其接收到WaterMark時(shí)佩伤,對時(shí)間戳小于該WaterMark的消息進(jìn)行排序聊倔,并發(fā)送到下一個節(jié)點(diǎn),在此排序操作符中釋放所有時(shí)間戳小于該WaterMark的消息畦戒,繼續(xù)緩存流入的消息方库,等待下一個WaterMark觸發(fā)下一次排序。
由于WaterMark保證了在其之后不會出現(xiàn)時(shí)間戳比它小的消息障斋,所以可以保證排序的正確性纵潦。需要注意的是,如果排序操作符有多個節(jié)點(diǎn)垃环,只能保證每個節(jié)點(diǎn)的流出消息是有序的邀层,節(jié)點(diǎn)之間的消息不能保證有序,要實(shí)現(xiàn)全局有序遂庄,則只能有一個排序操作符節(jié)點(diǎn)寥院。
通過支持基于Event Time的消息處理,F(xiàn)link擴(kuò)展了其流處理系統(tǒng)的應(yīng)用范圍涛目,使得更多的流處理任務(wù)可以通過Flink來執(zhí)行秸谢。
七.定制的內(nèi)存管理
Flink項(xiàng)目基于Java及Scala等JVM語言凛澎,JVM本身作為一個各種類型應(yīng)用的執(zhí)行平臺,其對Java對象的管理也是基于通用的處理策略估蹄,其垃圾回收器通過估算Java對象的生命周期對Java對象進(jìn)行有效率的管理塑煎。
針對不同類型的應(yīng)用,用戶可能需要針對該類型應(yīng)用的特點(diǎn)臭蚁,配置針對性的JVM參數(shù)更有效率的管理Java對象最铁,從而提高性能。這種JVM調(diào)優(yōu)的黑魔法需要用戶對應(yīng)用本身及JVM的各參數(shù)有深入了解垮兑,極大地提高了分布式計(jì)算平臺的調(diào)優(yōu)門檻冷尉。Flink框架本身了解計(jì)算邏輯每個步驟的數(shù)據(jù)傳輸,相比于JVM垃圾回收器系枪,其了解更多的Java對象生命周期雀哨,從而為更有效率地管理Java對象提供了可能。
JVM存在的問題
1.Java對象開銷
相對于c/c++等更加接近底層的語言嗤无,Java對象的存儲密度相對偏低震束,例如[1],“abcd”這樣簡單的字符串在UTF-8編碼中需要4個字節(jié)存儲当犯,但采用了UTF-16編碼存儲字符串的Java則需要8個字節(jié)垢村,同時(shí)Java對象還有header等其他額外信息,一個4字節(jié)字符串對象在Java中需要48字節(jié)的空間來存儲嚎卫。對于大部分的大數(shù)據(jù)應(yīng)用嘉栓,內(nèi)存都是稀缺資源,更有效率地內(nèi)存存儲拓诸,意味著CPU數(shù)據(jù)訪問吞吐量更高侵佃,以及更少磁盤落地的存在。
2.對象存儲結(jié)構(gòu)引發(fā)的cache miss
為了緩解CPU處理速度與內(nèi)存訪問速度的差距奠支,現(xiàn)代CPU數(shù)據(jù)訪問一般都會有多級緩存馋辈。當(dāng)從內(nèi)存加載數(shù)據(jù)到緩存時(shí),一般是以cache line為單位加載數(shù)據(jù)倍谜,所以當(dāng)CPU訪問的數(shù)據(jù)如果是在內(nèi)存中連續(xù)存儲的話迈螟,訪問的效率會非常高。如果CPU要訪問的數(shù)據(jù)不在當(dāng)前緩存所有的cache line中尔崔,則需要從內(nèi)存中加載對應(yīng)的數(shù)據(jù)答毫,這被稱為一次cache miss。當(dāng)cache miss非常高的時(shí)候季春,CPU大部分的時(shí)間都在等待數(shù)據(jù)加載洗搂,而不是真正的處理數(shù)據(jù)。Java對象并不是連續(xù)的存儲在內(nèi)存上,同時(shí)很多的Java數(shù)據(jù)結(jié)構(gòu)的數(shù)據(jù)聚集性也不好耘拇。
3.大數(shù)據(jù)的垃圾回收
Java的垃圾回收機(jī)制一直讓Java開發(fā)者又愛又恨撵颊,一方面它免去了開發(fā)者自己回收資源的步驟,提高了開發(fā)效率惫叛,減少了內(nèi)存泄漏的可能秦驯,另一方面垃圾回收也是Java應(yīng)用的不定時(shí)炸彈,有時(shí)秒級甚至是分鐘級的垃圾回收極大影響了Java應(yīng)用的性能和可用性挣棕。在時(shí)下數(shù)據(jù)中心,大容量內(nèi)存得到了廣泛的應(yīng)用亲桥,甚至出現(xiàn)了單臺機(jī)器配置TB內(nèi)存的情況洛心,同時(shí),大數(shù)據(jù)分析通常會遍歷整個源數(shù)據(jù)集题篷,對數(shù)據(jù)進(jìn)行轉(zhuǎn)換词身、清洗、處理等步驟番枚。在這個過程中法严,會產(chǎn)生海量的Java對象,JVM的垃圾回收執(zhí)行效率對性能有很大影響葫笼。通過JVM參數(shù)調(diào)優(yōu)提高垃圾回收效率需要用戶對應(yīng)用和分布式計(jì)算框架以及JVM的各參數(shù)有深入了解深啤,而且有時(shí)候這也遠(yuǎn)遠(yuǎn)不夠。
4.OOM問題
OutOfMemoryError是分布式計(jì)算框架經(jīng)常會遇到的問題路星,當(dāng)JVM中所有對象大小超過分配給JVM的內(nèi)存大小時(shí)溯街,就會出現(xiàn)OutOfMemoryError錯誤,JVM崩潰洋丐,分布式框架的健壯性和性能都會受到影響呈昔。通過JVM管理內(nèi)存,同時(shí)試圖解決OOM問題的應(yīng)用友绝,通常都需要檢查Java對象的大小堤尾,并在某些存儲Java對象特別多的數(shù)據(jù)結(jié)構(gòu)中設(shè)置閾值進(jìn)行控制。但是JVM并沒有提供官方檢查Java對象大小的工具迁客,第三方的工具類庫可能無法準(zhǔn)確通用地確定Java對象大小[6]郭宝。侵入式的閾值檢查也會為分布式計(jì)算框架的實(shí)現(xiàn)增加很多額外與業(yè)務(wù)邏輯無關(guān)的代碼。
Flink的處理策略
為了解決以上提到的問題哲泊,高性能分布式計(jì)算框架通常需要以下技術(shù):
定制的序列化工具剩蟀。顯式內(nèi)存管理的前提步驟就是序列化,將Java對象序列化成二進(jìn)制數(shù)據(jù)存儲在內(nèi)存上(on heap或是off-heap)切威。通用的序列化框架育特,如Java默認(rèn)使用java.io.Serializable將Java對象及其成員變量的所有元信息作為其序列化數(shù)據(jù)的一部分,序列化后的數(shù)據(jù)包含了所有反序列化所需的信息。這在某些場景中十分必要缰冤,但是對于Flink這樣的分布式計(jì)算框架來說犬缨,這些元數(shù)據(jù)信息可能是冗余數(shù)據(jù)。定制的序列化框架棉浸,如Hadoop的org.apache.hadoop.io.Writable需要用戶實(shí)現(xiàn)該接口怀薛,并自定義類的序列化和反序列化方法。這種方式效率最高迷郑,但需要用戶額外的工作枝恋,不夠友好。
顯式的內(nèi)存管理嗡害。一般通用的做法是批量申請和釋放內(nèi)存焚碌,每個JVM實(shí)例有一個統(tǒng)一的內(nèi)存管理器,所有內(nèi)存的申請和釋放都通過該內(nèi)存管理器進(jìn)行霸妹。這可以避免常見的內(nèi)存碎片問題十电,同時(shí)由于數(shù)據(jù)以二進(jìn)制的方式存儲,可以大大減輕垃圾回收壓力叹螟。
緩存友好的數(shù)據(jù)結(jié)構(gòu)和算法鹃骂。對于計(jì)算密集的數(shù)據(jù)結(jié)構(gòu)和算法,直接操作序列化后的二進(jìn)制數(shù)據(jù)罢绽,而不是將對象反序列化后再進(jìn)行操作畏线。同時(shí),只將操作相關(guān)的數(shù)據(jù)連續(xù)存儲良价,可以最大化的利用L1/L2/L3緩存象踊,減少Cache miss的概率,提升CPU計(jì)算的吞吐量棚壁。以排序?yàn)槔兀捎谂判虻闹饕僮魇菍ey進(jìn)行對比,如果將所有排序數(shù)據(jù)的Key與Value分開并對Key連續(xù)存儲袖外,那么訪問Key時(shí)的Cache命中率會大大提高史隆。
八.定制的序列化工具
分布式計(jì)算框架可以使用定制序列化工具的前提是要待處理數(shù)據(jù)流通常是同一類型,由于數(shù)據(jù)集對象的類型固定曼验,從而可以只保存一份對象Schema信息泌射,節(jié)省大量的存儲空間。同時(shí)鬓照,對于固定大小的類型熔酷,也可通過固定的偏移位置存取。在需要訪問某個對象成員變量時(shí)豺裆,通過定制的序列化工具拒秘,并不需要反序列化整個Java對象号显,而是直接通過偏移量,從而只需要反序列化特定的對象成員變量躺酒。如果對象的成員變量較多時(shí)押蚤,能夠大大減少Java對象的創(chuàng)建開銷,以及內(nèi)存數(shù)據(jù)的拷貝大小羹应。Flink數(shù)據(jù)集都支持任意Java或是Scala類型揽碘,通過自動生成定制序列化工具,既保證了API接口對用戶友好(不用像Hadoop那樣數(shù)據(jù)類型需要繼承實(shí)現(xiàn)org.apache.hadoop.io.Writable接口)园匹,也達(dá)到了和Hadoop類似的序列化效率雳刺。
Flink對數(shù)據(jù)集的類型信息進(jìn)行分析,然后自動生成定制的序列化工具類裸违。Flink支持任意的Java或是Scala類型煞烫,通過Java Reflection框架分析基于Java的Flink程序UDF(User Define Function)的返回類型的類型信息,通過Scala Compiler分析基于Scala的Flink程序UDF的返回類型的類型信息累颂。類型信息由TypeInformation類表示,這個類有諸多具體實(shí)現(xiàn)類凛俱,例如:
- BasicTypeInfo:任意Java基本類型(裝包或未裝包)和String類型紊馏。
- BasicArrayTypeInfo:任意Java基本類型數(shù)組(裝包或未裝包)和String數(shù)組。
- WritableTypeInfo:任意Hadoop的Writable接口的實(shí)現(xiàn)類蒲犬。
- TupleTypeInfo:任意的Flink tuple類型(支持Tuple1 to Tuple25)朱监。 Flink tuples是固定長度固定類型的Java Tuple實(shí)現(xiàn)。
- CaseClassTypeInfo:任意的 Scala CaseClass(包括 Scala tuples)原叮。
- PojoTypeInfo:任意的POJO (Java or Scala)赫编,例如Java對象的所有成員變量,要么是public修飾符定義奋隶,要么有g(shù)etter/setter方法擂送。
- GenericTypeInfo:任意無法匹配之前幾種類型的類。
前6種類型數(shù)據(jù)集幾乎覆蓋了絕大部分的Flink程序唯欣,針對前6種類型數(shù)據(jù)集嘹吨,F(xiàn)link皆可以自動生成對應(yīng)的TypeSerializer定制序列化工具,非常有效率地對數(shù)據(jù)集進(jìn)行序列化和反序列化境氢。對于第7種類型蟀拷,F(xiàn)link使用Kryo進(jìn)行序列化和反序列化。此外萍聊,對于可被用作Key的類型问芬,F(xiàn)link還同時(shí)自動生成TypeComparator,用來輔助直接對序列化后的二進(jìn)制數(shù)據(jù)直接進(jìn)行compare寿桨、hash等操作此衅。對于Tuple、CaseClass、Pojo等組合類型炕柔,F(xiàn)link自動生成的TypeSerializer酌泰、TypeComparator同樣是組合的,并把其成員的序列化/反序列化代理給其成員對應(yīng)的TypeSerializer匕累、TypeComparator陵刹,如圖所示:
此外如有需要,用戶可通過集成TypeInformation接口定制實(shí)現(xiàn)自己的序列化工具欢嘿。
九.顯式的內(nèi)存管理
垃圾回收是JVM內(nèi)存管理回避不了的問題衰琐,JDK8的G1算法改善了JVM垃圾回收的效率和可用范圍,但對于大數(shù)據(jù)處理實(shí)際環(huán)境還遠(yuǎn)遠(yuǎn)不夠炼蹦。這也和現(xiàn)在分布式框架的發(fā)展趨勢有所沖突羡宙,越來越多的分布式計(jì)算框架希望盡可能多地將待處理數(shù)據(jù)集放入內(nèi)存,而對于JVM垃圾回收來說掐隐,內(nèi)存中Java對象越少狗热、存活時(shí)間越短,其效率越高虑省。通過JVM進(jìn)行內(nèi)存管理的話匿刮,OutOfMemoryError也是一個很難解決的問題。同時(shí)探颈,在JVM內(nèi)存管理中熟丸,Java對象有潛在的碎片化存儲問題(Java對象所有信息可能在內(nèi)存中連續(xù)存儲),也有可能在所有Java對象大小沒有超過JVM分配內(nèi)存時(shí)伪节,出現(xiàn)OutOfMemoryError問題光羞。Flink將內(nèi)存分為3個部分,每個部分都有不同用途:
- Network buffers: 一些以32KB Byte數(shù)組為單位的buffer怀大,主要被網(wǎng)絡(luò)模塊用于數(shù)據(jù)的網(wǎng)絡(luò)傳輸纱兑。
- Memory Manager pool:大量以32KB Byte數(shù)組為單位的內(nèi)存池,所有的運(yùn)行時(shí)算法(例如Sort/Shuffle/Join)都從這個內(nèi)存池申請內(nèi)存化借,并將序列化后的數(shù)據(jù)存儲其中萍启,結(jié)束后釋放回內(nèi)存池。
- Remaining (Free) Heap:主要留給UDF中用戶自己創(chuàng)建的Java對象屏鳍,由JVM管理勘纯。
Network buffers在Flink中主要基于Netty的網(wǎng)絡(luò)傳輸,無需多講钓瞭。
Remaining Heap用于UDF中用戶自己創(chuàng)建的Java對象驳遵,在UDF中,用戶通常是流式的處理數(shù)據(jù)山涡,并不需要很多內(nèi)存堤结,同時(shí)Flink也不鼓勵用戶在UDF中緩存很多數(shù)據(jù)唆迁,因?yàn)檫@會引起前面提到的諸多問題。
Memory Manager pool(以后以內(nèi)存池代指)通常會配置為最大的一塊內(nèi)存竞穷,接下來會詳細(xì)介紹唐责。
在Flink中,內(nèi)存池由多個MemorySegment組成瘾带,每個MemorySegment代表一塊連續(xù)的內(nèi)存鼠哥,底層存儲是byte[],默認(rèn)32KB大小看政。MemorySegment提供了根據(jù)偏移量訪問數(shù)據(jù)的各種方法朴恳,如get/put int、long允蚣、float于颖、double等,MemorySegment之間數(shù)據(jù)拷貝等方法和java.nio.ByteBuffer類似嚷兔。對于Flink的數(shù)據(jù)結(jié)構(gòu)森渐,通常包括多個向內(nèi)存池申請的MemeorySegment,所有要存入的對象通過TypeSerializer序列化之后冒晰,將二進(jìn)制數(shù)據(jù)存儲在MemorySegment中同衣,在取出時(shí)通過TypeSerializer反序列化。數(shù)據(jù)結(jié)構(gòu)通過MemorySegment提供的set/get方法訪問具體的二進(jìn)制數(shù)據(jù)翩剪。Flink這種看起來比較復(fù)雜的內(nèi)存管理方式帶來的好處主要有:
- 二進(jìn)制的數(shù)據(jù)存儲大大提高了數(shù)據(jù)存儲密度,節(jié)省了存儲空間彩郊。
- 所有的運(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)和算法只能通過內(nèi)存池申請內(nèi)存前弯,保證了其使用的內(nèi)存大小是固定的,不會因?yàn)檫\(yùn)行時(shí)數(shù)據(jù)結(jié)構(gòu)和算法而發(fā)生OOM秫逝。對于大部分的分布式計(jì)算框架來說恕出,這部分由于要緩存大量數(shù)據(jù)最有可能導(dǎo)致OOM。
- 內(nèi)存池雖然占據(jù)了大部分內(nèi)存违帆,但其中的MemorySegment容量較大(默認(rèn)32KB)浙巫,所以內(nèi)存池中的Java對象其實(shí)很少,而且一直被內(nèi)存池引用刷后,所有在垃圾回收時(shí)很快進(jìn)入持久代的畴,大大減輕了JVM垃圾回收的壓力。
- Remaining Heap的內(nèi)存雖然由JVM管理尝胆,但是由于其主要用來存儲用戶處理的流式數(shù)據(jù)丧裁,生命周期非常短,速度很快的Minor GC就會全部回收掉含衔,一般不會觸發(fā)Full GC煎娇。
Flink當(dāng)前的內(nèi)存管理在最底層是基于byte[]二庵,所以數(shù)據(jù)最終還是on-heap,最近Flink增加了off-heap的內(nèi)存管理支持缓呛。Flink off-heap的內(nèi)存管理相對于on-heap的優(yōu)點(diǎn)主要在于:
- 啟動分配了大內(nèi)存(例如100G)的JVM很耗費(fèi)時(shí)間催享,垃圾回收也很慢。如果采用off-heap哟绊,剩下的Network buffer和Remaining heap都會很小因妙,垃圾回收也不用考慮MemorySegment中的Java對象了。
- 更有效率的IO操作匿情。在off-heap下兰迫,將MemorySegment寫到磁盤或是網(wǎng)絡(luò)可以支持zeor-copy技術(shù),而on-heap的話則至少需要一次內(nèi)存拷貝炬称。
- off-heap可用于錯誤恢復(fù)汁果,比如JVM崩潰,在on-heap時(shí)數(shù)據(jù)也隨之丟失玲躯,但在off-heap下据德,off-heap的數(shù)據(jù)可能還在。此外跷车,off-heap上的數(shù)據(jù)還可以和其他程序共享棘利。
十.緩存友好的計(jì)算
磁盤IO和網(wǎng)絡(luò)IO之前一直被認(rèn)為是Hadoop系統(tǒng)的瓶頸,但是隨著Spark朽缴、Flink等新一代分布式計(jì)算框架的發(fā)展善玫,越來越多的趨勢使得CPU/Memory逐漸成為瓶頸,這些趨勢包括:
- 更先進(jìn)的IO硬件逐漸普及密强。10GB網(wǎng)絡(luò)和SSD硬盤等已經(jīng)被越來越多的數(shù)據(jù)中心使用茅郎。
- 更高效的存儲格式。Parquet或渤,ORC等列式存儲被越來越多的Hadoop項(xiàng)目支持系冗,其非常高效的壓縮性能大大減少了落地存儲的數(shù)據(jù)量。
- 更高效的執(zhí)行計(jì)劃薪鹦。例如很多SQL系統(tǒng)執(zhí)行計(jì)劃優(yōu)化器的Fliter-Push-Down優(yōu)化會將過濾條件盡可能的提前掌敬,甚至提前到Parquet的數(shù)據(jù)訪問層,使得在很多實(shí)際的工作負(fù)載中并不需要很多的磁盤IO池磁。
由于CPU處理速度和內(nèi)存訪問速度的差距奔害,提升CPU的處理效率的關(guān)鍵在于最大化的利用L1/L2/L3/Memory,減少任何不必要的Cache miss地熄。定制的序列化工具給Flink提供了可能舀武,通過定制的序列化工具,F(xiàn)link訪問的二進(jìn)制數(shù)據(jù)本身离斩,因?yàn)檎加脙?nèi)存較小银舱,存儲密度比較大瘪匿,而且還可以在設(shè)計(jì)數(shù)據(jù)結(jié)構(gòu)和算法時(shí)盡量連續(xù)存儲,減少內(nèi)存碎片化對Cache命中率的影響寻馏,甚至更進(jìn)一步棋弥,F(xiàn)link可以只是將需要操作的部分?jǐn)?shù)據(jù)(如排序時(shí)的Key)連續(xù)存儲,而將其他部分的數(shù)據(jù)存儲在其他地方诚欠,從而最大可能地提升Cache命中的概率顽染。
十一.Flink排序算法
以Flink中的排序?yàn)槔?/strong>排序通常是分布式計(jì)算框架中一個非常重的操作,F(xiàn)link通過特殊設(shè)計(jì)的排序算法獲得了非常好的性能轰绵,其排序算法的實(shí)現(xiàn)如下:
- 將待排序的數(shù)據(jù)經(jīng)過序列化后存儲在兩個不同的MemorySegment集中粉寞。數(shù)據(jù)全部的序列化值存放于其中一個MemorySegment集中。數(shù)據(jù)序列化后的Key和指向第一個MemorySegment集中值的指針存放于第二個MemorySegment集中左腔。
- 對第二個MemorySegment集中的Key進(jìn)行排序唧垦,如需交換Key位置,只需交換對應(yīng)的Key+Pointer的位置液样,第一個MemorySegment集中的數(shù)據(jù)無需改變振亮。 當(dāng)比較兩個Key大小時(shí),TypeComparator提供了直接基于二進(jìn)制數(shù)據(jù)的對比方法鞭莽,無需反序列化任何數(shù)據(jù)坊秸。
- 排序完成后,訪問數(shù)據(jù)時(shí)澎怒,按照第二個MemorySegment集中Key的順序訪問褒搔,并通過Pointer值找到數(shù)據(jù)在第一個MemorySegment集中的位置,通過TypeSerializer反序列化成Java對象返回喷面。
這樣實(shí)現(xiàn)的好處有:
- 通過Key和Full data分離存儲的方式盡量將被操作的數(shù)據(jù)最小化星瘾,提高Cache命中的概率,從而提高CPU的吞吐量乖酬。
- 移動數(shù)據(jù)時(shí)死相,只需移動Key+Pointer融求,而無須移動數(shù)據(jù)本身咬像,大大減少了內(nèi)存拷貝的數(shù)據(jù)量。
- TypeComparator直接基于二進(jìn)制數(shù)據(jù)進(jìn)行操作生宛,節(jié)省了反序列化的時(shí)間县昂。
通過定制的內(nèi)存管理,F(xiàn)link通過充分利用內(nèi)存與CPU緩存陷舅,大大提高了CPU的執(zhí)行效率倒彰,同時(shí)由于大部分內(nèi)存都由框架自己控制,也很大程度提升了系統(tǒng)的健壯性莱睁,減少了OOM出現(xiàn)的可能待讳。
總結(jié)
本文主要介紹了Flink項(xiàng)目的一些關(guān)鍵特性芒澜,F(xiàn)link是一個擁有諸多特色的項(xiàng)目,包括其統(tǒng)一的批處理和流處理執(zhí)行引擎创淡,通用大數(shù)據(jù)計(jì)算框架與傳統(tǒng)數(shù)據(jù)庫系統(tǒng)的技術(shù)結(jié)合痴晦,以及流處理系統(tǒng)的諸多技術(shù)創(chuàng)新等,因?yàn)槠邢蘖詹剩現(xiàn)link還有一些其他很有意思的特性沒有詳細(xì)介紹誊酌,比如DataSet API級別的執(zhí)行計(jì)劃優(yōu)化器,原生的迭代操作符等露乏,感興趣的讀者可以通過Flink官網(wǎng)了解更多Flink的詳細(xì)內(nèi)容碧浊。希望通過本文的介紹能夠讓讀者對Flink有更多的了解,也讓更多的人使用甚至參與到Flink項(xiàng)目中去瘟仿。