摘要:本文由阿里云 Flink 團隊郭偉杰老師撰寫,旨在向 Flink Batch 社區(qū)用戶介紹 Flink DataStream API 批處理能力的演進之路症脂。內(nèi)容主要分為以下三個部分:
- 批處理語義和性能優(yōu)化
- Batch API 功能增強
- 總結(jié)
最近在和一個朋友閑聊時谚赎,他問了一個很有意思的問題:Flink 是如何在流處理引擎上支持批處理能力的淫僻?
鑒于 Flink 已經(jīng)成為了流處理領(lǐng)域的事實標準,可能很多人都不知道壶唤,F(xiàn)link 在誕生的第一天起就是支持批處理的雳灵。DataSet API 也是從那時起就被引入的,它被用來支持對有界數(shù)據(jù)的批處理操作闸盔。隨著 Flink 社區(qū)逐步意識到基于 Pipeline 的架構(gòu)非常適合流處理悯辙,因此發(fā)展出了 DataStream API[1],它是為無界的流式應(yīng)用開發(fā)的蕾殴,引入了狀態(tài)笑撞,事件時間和窗口等特殊概念岛啸。
但隨著對兩套 API 本質(zhì)的深入思考钓觉,F(xiàn)link 社區(qū)逐漸發(fā)現(xiàn):DataStream API 其實完全可以成為 DataSet API 的超集。
- 概念上:有界數(shù)據(jù)集只是無限數(shù)據(jù)流的一種特例坚踩。
- 語義上:DataStream API 覆蓋了 DataSet API 的大部分荡灾,同時還有針對實時流計算的擴展,只有少數(shù)和分區(qū)有關(guān)的語義暫時沒有支持瞬铸。具體的差異見下圖:
同時維護兩套 API 也對社區(qū)造成了很大的困擾批幌,并且用戶開發(fā)作業(yè)前必須提前在兩種 API 中作出選擇。對于用戶來說嗓节,離線和實時作業(yè)具有相同的處理邏輯是很常見的荧缘。如果只編寫一次代碼,就能達到分別開發(fā)流和批兩個作業(yè)的相同效果拦宣,將會帶來極大的便利截粗。鑒于以上諸多原因,F(xiàn)link 最終走上了基于 DataStream 的流批一體的發(fā)展道路鸵隧。也正是如此绸罗,F(xiàn)link 社區(qū)早在 1.12 版本就開始逐步棄用 DataSet API,將會在 Flink 2.0 中完全移除 DataSet 相關(guān)代碼豆瘫。同時珊蟀,不斷提升 DataStream 上的批處理能力,以 DataStream 為核心打造流批一體的 API外驱。
流批一體是一個相對寬泛的概念育灸,它包含 API,調(diào)度,Shuffle赐劣,容錯等多個維度筋帖,本文主要關(guān)注于 API 及其底層算子執(zhí)行上 DataStream 對批處理所做的工作,其他細節(jié)可以參考文章《Flink 執(zhí)行引擎:流批一體的融合之路》[2]绽诚。下面我們將沿著批處理語義和性能優(yōu)化以及 Batch API 功能增強兩個大的方向回顧 Flink DataStream API 批處理能力演進之路。
1. 批處理語義和性能優(yōu)化
DataStream API 雖然理論上可以覆蓋絕大多數(shù) DataSet API 上的語義和操作,但在一些細微之處還是存在一些差異恩够。下面我們從幾個方面詳細介紹一些 Flink 社區(qū)在這方面所做的努力卒落。
1.1 輸出語義
為了最大化數(shù)據(jù)的實時性,DataStream 上算子的輸出是增量式的蜂桶。例如:KeyedStream.reduce
儡毕,它會在每次到來一條新的數(shù)據(jù)時更新內(nèi)部維護的狀態(tài),并向下游發(fā)送當前最新的聚合值扑媚。用數(shù)據(jù)庫的術(shù)語來說腰湾,它產(chǎn)生了一個 Upsert 流作為輸出: 如果一個鍵有 10 個輸入元素,那么也會得到 10 條輸出記錄疆股。
而對于實時性往往沒有這么強要求的批作業(yè)來說费坊,這些中間的增量輸出會極大地增加下游算子的計算負擔。由于批作業(yè)的算子不需要感知數(shù)據(jù)的 Changelog, 其更期望的是一種 All-or-Nothing 式的輸出語義旬痹,即僅僅在每個 Key 最后一條數(shù)據(jù)到來后附井,才向下游發(fā)送數(shù)據(jù)。因此两残,我們需要在批模式下對一些 API(例如:KeyedStream#reduce, sum,min,max,minBy,maxBy) 的行為做出改變永毅,使其僅在輸入結(jié)束時輸出最終結(jié)果。
下表描述了 Sum 操作在流和批兩種模式下的輸入輸出情況:
(假設(shè)它們具有相同的 Key人弓,4 為該 Key 的最后一條數(shù)據(jù))
輸入 | 流模式輸出 | 批模式輸出 |
---|---|---|
1 | 1 | 無 |
2 | 3 | 無 |
3 | 6 | 無 |
4 | 10 | 10 |
1.2 狀態(tài)訪問和更新算法
對于有狀態(tài)算子沼死,DataSet 算子在迭代數(shù)據(jù)時直接在內(nèi)存中維護最新的狀態(tài)值。在 DataStream API 中崔赌,狀態(tài)的訪問和更新則是通過與 StateBackend 交互所進行的意蛀。實現(xiàn)流批一體的統(tǒng)一架構(gòu),就意味著 DataStream API 在流和批模式下要盡可能復(fù)用相同的算子實現(xiàn)峰鄙。但是與 RocksDB 等 StateBackend 交互會帶來不小的 IO 開銷浸间,站在 Flink 開發(fā)者的視角上,該如何解決這個問題呢吟榴?讓我們更深入地思考一下他們之間的本質(zhì)差異魁蒜。
流模式下 DataStream API 上的聚合算法其實可以類比為基于 Hash 的聚合,StateBackend 在這里扮演著哈希表的角色吩翻。下圖展示了在流模式下一個 KeyedOperator 的輸入數(shù)據(jù)和狀態(tài)存儲的關(guān)系:
(綠色部分表示新數(shù)據(jù)到來后狀態(tài)存儲的更新)
我們可以看到:在流模式下兜看,狀態(tài)存儲必須維持一個哈希表,為每個 Key 存儲一條 Item狭瞎。值得注意的是细移,該狀態(tài)并不是完全存儲在內(nèi)存中的,達到一定閾值后需要溢寫到磁盤熊锭。由于批作業(yè)是沒有 Checkpoint 的弧轧,并且其 Shuffle 的中間數(shù)據(jù)直接寫入到了磁盤中雪侥,發(fā)生 Failover 后直接從上一個 Stage 的數(shù)據(jù)重新計算狀態(tài)即可,因此并不需要對狀態(tài)進行持久化存儲精绎,理論上狀態(tài)完全可以放在內(nèi)存中速缨。
接下來要考慮的是內(nèi)存是否有 OOM 風(fēng)險:對于單個 Key 來說,其狀態(tài)不會非常大代乃。由于批作業(yè)的數(shù)據(jù)是有界的旬牲,如果我們能對 key 進行分組,就可以在同一時間只追蹤單一 Key 的狀態(tài)搁吓。沿著這個思路原茅,我們可以把基于 Hash 的狀態(tài)訪問算法變?yōu)榛谂判虻摹R虼硕樽校現(xiàn)link 在批執(zhí)行模式下會對 KeyOperator 的所有輸入數(shù)據(jù)按 Key 進行排序擂橘,并且在該模式下使用一種特殊的 StateBackend,它在內(nèi)存中追蹤當前 Key 所對應(yīng)的狀態(tài)贮预,當 Key 發(fā)生切換時清除上一個 Key 的狀態(tài)值贝室。
批執(zhí)行模式下契讲,一個 KeyedOperator 的輸入數(shù)據(jù)和狀態(tài)存儲的關(guān)系如下圖所示:
需要注意的是仿吞,這種方式引入了額外的數(shù)據(jù)排序開銷,當狀態(tài)訪問的頻率比較低捡偏,狀態(tài)的數(shù)據(jù)量比較小時唤冈,對性能會有負面影響。但是考慮到絕大多數(shù)的批處理作業(yè)規(guī)模都比較大银伟,其中的有狀態(tài)算子往往需要 per-record 的訪問和更新狀態(tài)你虹。比如對常見的 Join、Group Agg 等彤避,往往存在很多重復(fù) Key 的數(shù)據(jù)傅物,該優(yōu)化帶來的收益通常比排序帶來的開銷要大的多。
1.3 EventTime 和 Watermark
實時數(shù)據(jù)流中事件可能是亂序的琉预,即時間戳為 T 的事件可能出現(xiàn)在時間戳為 T+1 的事件之后董饰。此外,系統(tǒng)無法確定是否將來還有時間戳為 t < T 的元素到來圆米。因此卒暂,F(xiàn)link 的流處理模式是建立在事件的順序無法得到保證的前提下的。為了消除這種無序性帶來的影響娄帖,F(xiàn)link 引入了一種名為 Watermark 的標記也祠。一個時間戳為 T 的 Watermark 到來,表示不會再收到或者可以直接忽略任何 t < T 的數(shù)據(jù)近速。
但在批執(zhí)行模式下诈嘿,數(shù)據(jù)是有界的堪旧,我們明確知道每一條數(shù)據(jù)的時間,因此可以認為不存在無法預(yù)知的遲到數(shù)據(jù)奖亚。發(fā)送中間的 Watermark 是沒有意義的, 反而只會增加網(wǎng)絡(luò)傳輸?shù)膲毫拖掠翁幚磉@些 Watermark 的復(fù)雜度崎场。由于定時器和窗口的閉合都需要 Watermark 來觸發(fā),因此我們可以只在輸入結(jié)束時發(fā)送 MAX_WATERMARK遂蛀,或者在每個 Key 結(jié)束時發(fā)送 MAX_WATERMARK谭跨。這樣既不會引入太多開銷,又可以統(tǒng)一流批算子對于 EventTime 的處理李滴。
2. Batch API 功能增強
需要注意的是螃宙,DataStream API 和 DataSet API 所支持的操作并非完全一一對應(yīng)。Flink 社區(qū)有一個官方遷移文檔來專門講解如何從 DataSet 作業(yè)遷移到 DataStream 作業(yè)[3] (下文簡稱文檔)所坯。在該文檔中谆扎,根據(jù)遷移所帶來的代碼改動和執(zhí)行效率的差異,把 DataSet API 分成了四大類:
- 在 DataStream 有等價的 API芹助,只需要很少的方法名改動就可以完成遷移堂湖。
- 通過 DataStream 上不等價的其他 API 可以實現(xiàn)同樣的行為,遷移雖然需要進行代碼改動状土,但是執(zhí)行效率和 DataSet 相同无蜂。
- 通過 DataStream 上不等價的其他 API 可以實現(xiàn)同樣的行為,遷移不僅需要進行代碼改動蒙谓,而且執(zhí)行效率可能會存在一些差異斥季。
- 目前 DataStream 沒有支持,且沒有簡單的 Workaround 的 API累驮。
按照目前 DataStream 上對這些操作的支持情況酣倾,我們又可以把它們進一步分為下面兩大類:
2.1 完美支持或者可以通過 Workaround 支持
上述四類中,第1和第2類都屬于可以無痛遷移的谤专,第3類可以通過 Workaround 來實現(xiàn)躁锡,但是在執(zhí)行效率上有比較大的差異。因此置侍,我們主要關(guān)注于第三和第四類映之。
第三類主要有兩種操作:全量 Partition 處理以及笛卡爾積。DataStream 上可以通過 Window 機制來支持這類需求墅垮,但是其中主要存在以下兩個問題:
(1)需要明確知道輸入在什么時候結(jié)束惕医,在拿到全量數(shù)據(jù)后才能進行處理。
Flink 目前內(nèi)置的窗口一般都是隨著時間推進到某個具體的點算色,或者輸入數(shù)據(jù)的量達到某個具體的值來觸發(fā)的抬伺。并沒有一種能夠感知輸入是否結(jié)束的窗口實現(xiàn)。文檔通過自定義的 WindowAssigner 和 Window Trigger 實現(xiàn)了一種僅在輸入結(jié)束時才觸發(fā)計算的窗口灾梦。
隨著用戶作業(yè)的遷移峡钓,我們看到這種需求其實廣泛存在妓笙,因此 Flink 社區(qū)在 FLIP-331[4] 中提出了EndOfStreamWindow 的概念,并會在 Flink 1.20 版本中進行支持能岩,你可以通過如下方式來使用:
input.window(GlobalWindows.createWithEndOfStreamTrigger())
.apply(
new WindowFunction<T, R, KEY, GlobalWindow>() {
@Override
public void apply(
KEY key,
GlobalWindow window,
Iterable<T> input,
Collector<R> out)
throws Exception {
// do something with the iterable input, It has all the input data.
}
},
resultType);
(2)Non-Keyed Stream 上不支持窗口操作
Flink 中的窗口是基于 State 來實現(xiàn)的寞宫,而不同 Key 的 State 是不屬于同一個命名空間的,因此窗口只有在能明確定義 Key 的流上才有意義拉鹃。文檔中引入如下函數(shù)來給數(shù)據(jù)附加上當前分區(qū)(并行度)的信息辈赋,然后以該字段作為數(shù)據(jù)的 Key。
public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T, Tuple2<String, T>> {
@Override
public Tuple2<String, T> map(T value) {
return Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
}
}
這種方式雖然可以產(chǎn)生正確結(jié)果膏燕,但也引入了per-record的額外開銷钥屈。為了優(yōu)化這個問題,F(xiàn)link 社區(qū)在 FLIP-380[5] 中引入了對 Non-Keyed 全量分區(qū)處理的原生支持坝辫。下面一一介紹這幾個 API 的使用方式和注意事項:
2.1.1Map Partition
該 API 用來對一個分區(qū)的數(shù)據(jù)做全量處理篷就,并在獲取所有數(shù)據(jù)后進行輸出。
假如我們需要計算每個分區(qū)內(nèi)數(shù)據(jù)的條數(shù)近忙,并輸出給下游算子竭业。可以使用如下方式來實現(xiàn):
inputStream.fullWindowPartition()
.mapPartition(
new MapPartitionFunction<Record, Long>() {
@Override
public void mapPartition(
Iterable<Record> values, Collector<Long> out)
throws Exception {
long counter = 0;
for (T value : values) {
counter++;
}
out.collect(counter));
}
})
它與 map 的主要區(qū)別如下:
MapPartition | Map | |
---|---|---|
計算觸發(fā)時機 | 所有輸入結(jié)束后觸發(fā)一次 | 每條輸入數(shù)據(jù)都會觸發(fā)一次 |
輸入數(shù)據(jù)類型 | 包含所有數(shù)據(jù)的 Iterable 對象 | 每條數(shù)據(jù)自身 |
值得注意的是:MapPartition 雖然給調(diào)用者提供了一個基于全量數(shù)據(jù)的 Iterable 對象及舍,但它并不會把全量數(shù)據(jù)都加載到內(nèi)存未辆。該 API 的底層實現(xiàn)充分利用了 Flink 執(zhí)行引擎的反壓機制,在對 Iterable 對象進行迭代時只會按需把數(shù)據(jù)加載到內(nèi)存击纬。
2.1.2Reduce/Aggregate Partition
該 API 主要用于對分區(qū)內(nèi)的數(shù)據(jù)做全量聚合鼎姐,分別需要傳入 ReduceFunction 和 AggregateFunction。ReduceFunction 描述了兩條輸入數(shù)據(jù)如何合并產(chǎn)生同樣類型的輸出數(shù)據(jù)更振,而 AggregateFunction 是更通用的 ReduceFunction, 它通過引入一個中間的 Accumulator, 支持產(chǎn)生不同類型的輸出。
下面的例子展示了如何在一個雙字段的 Tuple 數(shù)據(jù)流上對其第二個字段做全量聚合
inputStream.fullWindowPartition()
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(
Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
2.1.3 Sort Partition
另一種比較重要的操作就是排序饭尝,對分區(qū)內(nèi)數(shù)據(jù)進行排序的需求在批處理中是廣泛存在的肯腕。理論上,我們可以通過 MapPartition 來輕松實現(xiàn)全內(nèi)存的排序钥平,但是在大規(guī)模 Batch 作業(yè)中实撒,把數(shù)據(jù)全部加載到內(nèi)存中往往是不現(xiàn)實的。sortPartition API 支持外部排序涉瘾,在數(shù)據(jù)量到達一定閾值后會溢寫磁盤知态,因此無需擔心內(nèi)存的 OOM 問題。
下面是一個對分區(qū)內(nèi)數(shù)據(jù)做全量升序排列的示例代碼:
DataStreamSource<Tuple2<String, Integer>> source = xxxxx
// 按照 tuple 的第一個字段進行排序
SingleOutputStreamOperator<Tuple2<String, Integer>> sortedPartition =
source.fullWindowPartition().sortPartition(1, Order.ASCENDING);
注意:排序算子會使用 Flink Managed Memory立叛。內(nèi)存的大小會影響排序的效率负敏,過小的內(nèi)存會導(dǎo)致數(shù)據(jù)頻繁地寫入和讀出磁盤。如果你的一些排序操作相對較重(數(shù)據(jù) Record 比較大秘蛇,數(shù)據(jù)量比較多)其做,建議調(diào)大“execution.sort-partition.memory”值來提升性能顶考。
2.2 目前還不支持
上述第四類代表目前 Flink DataStream API 還沒有支持的操作。主要有兩種: RangePartition 和 GroupCombine.
其中 GroupCombine 會把數(shù)據(jù)分成多個批次妖泄,對每個批次的數(shù)據(jù)進行合并驹沿。它并不是用戶的業(yè)務(wù)需求,是引擎為了提高執(zhí)行效率而對用戶的需求蹈胡,因此Flink 社區(qū)暫時沒有計劃支持該操作渊季。而 RangePartition 基于現(xiàn)有的 DataStream API 可以實現(xiàn),但是相對復(fù)雜(需要用戶實現(xiàn)復(fù)雜的采樣算法)罚渐,筆者所在的團隊已經(jīng)對此在做 PoC 實現(xiàn)了梭域,未來會在合適的時機貢獻回社區(qū)。
3. 總結(jié)
本文回顧了 Flink 在批處理能力上從 DataSet API 到流批一體的 DataStream API 的演進搅轿,并從批處理語義&性能優(yōu)化以及 Batch API 功能增強兩大方面分別展示了 Flink 社區(qū)是如何思考和提升 DataStream 批處理能力的病涨,相信隨著社區(qū)的不斷努力,F(xiàn)link Batch 會越來越好璧坟。Flink DataStream API 的流批一體能力也將在數(shù)據(jù)處理領(lǐng)域扮演越來越重要的角色既穆。
[2] https://developer.aliyun.com/article/783112
[3] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/dataset_migration/
歡迎大家加入 Flink Batch 交流釘釘群。本群旨在為 Flink Batch 愛好者提供一個交流技術(shù)和傳遞資訊的平臺雀鹃,在這里:
- 你可以掌握Flink Batch前沿的資訊幻工,可以與 Flink 開發(fā)者及 Committer 面對面交流
- Flink Batch 的問題集中解決,各位開發(fā)者及 Committer 及時解決你的 Blocker
“Flink Batch 交流群”群的釘釘群號: 34817520