摘要:本文整理自阿里云高級研發(fā)工程師轰异、Apache Flink Contributor 周云峰老師在 Apache Asia CommunityOverCode 2024中的分享澳窑。內(nèi)容主要分為以下三個部分:
- 從流批一體到流批融合
- 流批融合的技術(shù)解決方案
- 社區(qū)進展及未來展望
一恋追、從流批一體到流批融合
1.流批一體
在流批融合之前,F(xiàn)link 已經(jīng)提出過流批一體的理念曙聂。流批一體主要體現(xiàn)在以下方面的統(tǒng)一:
(1)API 統(tǒng)一:Flink 通過提供統(tǒng)一的 DataStream 和 SQL API晦炊,使得用戶在進行離線和在線作業(yè)時不需要開發(fā)兩套代碼,從而提升了開發(fā)效率宁脊。
(2)算子統(tǒng)一:在算子層面實現(xiàn)了統(tǒng)一断国,使用同一套算子既可以處理流作業(yè),也可以處理批作業(yè)榆苞,確保流作業(yè)和批作業(yè)在數(shù)據(jù)處理邏輯稳衬、正確性和語義行為上保持一致。
(3)引擎統(tǒng)一:使用同一個引擎和同一套資源調(diào)度框架坐漏,避免了為流作業(yè)和批作業(yè)搭建不同工作流的需求宋彼,從而優(yōu)化了運維效率。
這些都是 Flink 在流批一體方面已經(jīng)取得的一些成果仙畦。然而,在現(xiàn)有流批一體的基礎(chǔ)上音婶,用戶仍然需要配置一個作業(yè)是流作業(yè)還是批作業(yè)慨畸,并根據(jù)作業(yè)所在的離線或在線場景,采用不同的優(yōu)化策略衣式。這些配置策略仍然會一定程度上增加用戶運維 Flink 作業(yè)的工作量寸士。而這正是流批融合希望解決的問題檐什。
2. 影響流批不同模式的前提條件
在流批融合的背景下,通過分析用戶配置流模式和批模式的思路弱卡,我們發(fā)現(xiàn)流批兩種模式乃正,實際上是根據(jù)不同的前提條件采用不同的優(yōu)化策略。這些前提條件主要包括以下兩個方面:
(1)用戶對性能的傾向性:在配置批作業(yè)時婶博,用戶通常傾向于追求高吞吐量或高資源利用率瓮具。而在流作業(yè)中,用戶則期望低延遲凡人、高數(shù)據(jù)新鮮度和實時性名党。
(2)對數(shù)據(jù)的先驗知識:在批模式下,所有數(shù)據(jù)在一開始都是已準(zhǔn)備好的挠轴,因此作業(yè)可以根據(jù)數(shù)據(jù)的一些統(tǒng)計信息進行相應(yīng)的優(yōu)化传睹。而在流模式下,由于作業(yè)通常不知道未來會有哪些數(shù)據(jù)岸晦,因此需要對隨機訪問等領(lǐng)域進行優(yōu)化欧啤,以提供更好的支持。
由于前提條件不同启上,F(xiàn)link 在流模式和批模式下采取了不同的優(yōu)化策略邢隧。這些策略主要體現(xiàn)在資源調(diào)度、狀態(tài)訪問和容錯機制等方面:
(1)資源調(diào)度(Scheduling):
批作業(yè):可以做到見縫插針式的資源使用方式碧绞。即使當(dāng)前物理資源不滿足所有算子同時執(zhí)行的需求府框,也可以先利用現(xiàn)有資源執(zhí)行一部分任務(wù)(task)。任務(wù)執(zhí)行完后空出的資源可以調(diào)度下一批任務(wù)讥邻,從而提高資源利用率迫靖。
流作業(yè):為了保證更好的實時性,流作業(yè)需要在一開始就申請好從源頭(source)到終點(sink)的所有算子及其并發(fā)資源兴使,以確保數(shù)據(jù)流的連續(xù)性和低延遲系宜。
(2)狀態(tài)訪問(State Access):
批作業(yè):可以只保存一個主鍵對應(yīng)的狀態(tài),并連續(xù)處理該主鍵的所有數(shù)據(jù)发魄。
流作業(yè):由于無法預(yù)知下一個相同主鍵的數(shù)據(jù)何時到來盹牧,需要保存所有主鍵的狀態(tài),并對隨機訪問進行優(yōu)化励幼,以支持實時處理汰寓。
(3)容錯機制(Fault Tolerance):
批作業(yè):在每個任務(wù)執(zhí)行完之后,F(xiàn)link作業(yè)可以暫時將中間結(jié)果緩存下來苹粟,然后下一個任務(wù)可以接著消費這個中間結(jié)果有滑。當(dāng)某個任務(wù)失敗時,只需重啟該任務(wù)嵌削,并從之前保存的中間結(jié)果重新消費即可毛好。
流作業(yè):Flink引入了檢查點(checkpoint)機制望艺,通過定時對整個數(shù)據(jù)處理鏈路進行快照,實現(xiàn)容錯肌访。當(dāng)某個任務(wù)失敗時找默,可以從最近的檢查點恢復(fù),從而保證數(shù)據(jù)處理的連續(xù)性和一致性吼驶。
由于這些不同的前提條件惩激,F(xiàn)link在流模式和批模式下本質(zhì)上采用了不同的優(yōu)化策略,以滿足各自的性能需求和操作特點旨剥。
3.前提條件的動態(tài)變化
在進一步的探索中咧欣,我們發(fā)現(xiàn)這些前提條件并不是在作業(yè)的整個生命周期中一成不變、而是可能會在運行時動態(tài)變化的轨帜。
在離線場景中魄咕,作業(yè)場景一般始終具有高吞吐量的傾向。
而在實時場景下蚌父,用戶通常更注重低延遲哮兰、高實時性和高數(shù)據(jù)新鮮度。然而苟弛,當(dāng)實時場景出現(xiàn)數(shù)據(jù)積壓時喝滞,由于客觀因素的限制,F(xiàn)link 作業(yè)此時已經(jīng)無法維持端到端的低延遲策略膏秫。這時右遭,用戶追求的是以最短時間消費完現(xiàn)有的數(shù)據(jù)積壓、盡快恢復(fù)到實時狀態(tài)缤削,即高吞吐量策略窘哈。
在全增量一體化的場景中,這兩種模式的區(qū)別進一步被細(xì)化為全量和增量的區(qū)別亭敢。在這兩種狀態(tài)下滚婉,除了對吞吐量和實時性的要求不同外,還有關(guān)于數(shù)據(jù)先驗知識的變化帅刀。在同步一個全量數(shù)據(jù)庫的場景下让腹,所有數(shù)據(jù)之間的主鍵不會重復(fù),是對整個數(shù)據(jù)庫進行一次全面掃描扣溺。而在增量場景下骇窍,可能會出現(xiàn)更新操作,對已有的重復(fù)主鍵進行數(shù)據(jù)更新锥余。
這些說明我們需要 Flink 能夠根據(jù)作業(yè)運行時需求的動態(tài)變化像鸡,產(chǎn)生不同的優(yōu)化策略。
4.流批融合的目標(biāo)
基于以上對前提條件和場景的分析,我們可以看到只估,流批融合想要實現(xiàn)的目標(biāo)是使用戶不再需要手動配置流模式或批模式,而是通過 Flink 框架自動檢測用戶在不同場景下(實時和離線)對吞吐量着绷、延遲以及數(shù)據(jù)特征的需求蛔钙,動態(tài)地進行相應(yīng)的優(yōu)化。這使得 Flink 能夠根據(jù)用戶對吞吐量和延遲的傾向性荠医,以及數(shù)據(jù)特征的變化吁脱,自動調(diào)整優(yōu)化策略。
二彬向、實現(xiàn)流批融合的技術(shù)方案
下面介紹Flink是怎樣實現(xiàn)這些目標(biāo)的兼贡。
1.數(shù)據(jù)流批傾向性的定量指標(biāo)
首先,我們將用戶的傾向性或數(shù)據(jù)特征量化為兩個指標(biāo)娃胆。
第一個指標(biāo)是 isProcessingBacklog遍希,可以理解為用于判斷當(dāng)前是否存在數(shù)據(jù)積壓。當(dāng)出現(xiàn)數(shù)據(jù)積壓時里烦,作業(yè)需要在最短時間內(nèi)處理這些積壓數(shù)據(jù)凿蒜。這時,作業(yè)可以通過犧牲延遲來優(yōu)化胁黑,從而提高吞吐量废封。在沒有數(shù)據(jù)積壓的情況下,作業(yè)應(yīng)該像現(xiàn)有的流模式那樣丧蘸,盡量保證低延遲和高數(shù)據(jù)新鮮度的目標(biāo)漂洋。
第二個量化指標(biāo)是 isInsertOnly,可以大致理解為全增量一體中全量場景和增量場景之間的區(qū)別力喷。在 isInsertOnly 情況下刽漂,所有數(shù)據(jù)都會是 Insert 類型,而不是更新(update)或刪除(delete)類型的數(shù)據(jù)冗懦。這些數(shù)據(jù)的主鍵也互不重復(fù)爽冕。
2.量化指標(biāo)的收集
怎樣去收集獲取剛才提到的兩個量化指標(biāo)呢?一個主要的思路是從數(shù)據(jù)源(source)獲取數(shù)據(jù)披蕉。
例如颈畸,對于有明確階段的數(shù)據(jù)源(如Hybrid Source),它可能會先讀取文件系統(tǒng)中的一個文件没讲,將全量文件讀取完畢后眯娱,再消費實時消息隊列中的數(shù)據(jù)。在這種情況下爬凑,初始階段消費文件時徙缴,作業(yè)總會存在數(shù)據(jù)積壓院崇,此時isProcessingBacklog = true;而在后續(xù)階段消費消息隊列時彼念,isProcessingBacklog 才會從 true 變?yōu)?false掩驱。
對于 CDC source 也是類似。在全量場景下穿剖,isProcessingBacklog 等于 true蚤蔓,而在增量場景下則等于 false。
對于沒有明確階段的源(如普通的消息隊列)糊余,作業(yè)可以根據(jù) Flink 現(xiàn)有的一些指標(biāo)(metrics)來判斷是否存在數(shù)據(jù)積壓秀又,例如 Watermark 延遲。Watermark 代表作業(yè)數(shù)據(jù)時間的當(dāng)前水位贬芥,其時間戳與系統(tǒng)時間之間的差異表現(xiàn)為 Watermark 延遲吐辙。當(dāng)延遲高于一定閾值時,說明當(dāng)前存在數(shù)據(jù)積壓蘸劈;反之昏苏,則沒有數(shù)據(jù)積壓。
前面介紹了判斷 isProcessingBacklog 的方法昵时。而對于判斷 isInsertOnly 的方式捷雕,目前主要支持 CDC source。在全量階段壹甥,isInsertOnly 等于 true救巷,而在增量階段,isInsertOnly 等于 false句柠。
3.基于量化指標(biāo)的優(yōu)化策略
在收集到這些指標(biāo)后浦译,接下來要做的是在各個算子中根據(jù)當(dāng)前這兩個指標(biāo)的狀態(tài),采用不同的優(yōu)化策略溯职。
(1)Processing Time Temporal Join
首先精盅,對于 isProcessingBacklog,優(yōu)化措施之一是更好地支持基于處理時間的臨時連接(Processing Time Temporal Join)谜酒。這種 Join 不依賴于 Probe Side 和 Build Side 數(shù)據(jù)本身的時間因素叹俏,而是依賴于系統(tǒng)時間。當(dāng)Probe Side數(shù)據(jù)到達Flink系統(tǒng)時僻族,根據(jù)當(dāng)前最新的 Build Side 數(shù)據(jù)做 Join 即可粘驰。
這種做法在語義上沒有問題,但在實際操作中可能會遇到以下情況:當(dāng) Build Side 有數(shù)據(jù)積壓時述么,有可能某條數(shù)據(jù)已經(jīng)被更新到 Build Side 上游的服務(wù)中了蝌数,但由于數(shù)據(jù)積壓,這條數(shù)據(jù)無法及時被Flink系統(tǒng)接收到度秘。此時當(dāng) Probe Side 的一條數(shù)據(jù)過來進行 Join 時顶伞,就可能無法與這條數(shù)據(jù)進行匹配,從而導(dǎo)致Join結(jié)果不包含預(yù)期中的所有數(shù)據(jù)。
為了解決這個問題唆貌,F(xiàn)link 采取的優(yōu)化措施是: 當(dāng) Build Side 存在數(shù)據(jù)積壓滑潘,即 isProcessingBacklog 等于 true 時,Join 算子先暫停消費 Probe Side 的數(shù)據(jù)锨咙。等作業(yè)追上 Build Side 的數(shù)據(jù)后众羡,Join 算子再繼續(xù)消費 Probe Side 的數(shù)據(jù),從而避免之前提到的 Join 丟失情況蓖租。
(2)調(diào)整checkpoint時間間隔
第二個優(yōu)化是調(diào)整 Flink 的 checkpoint 時間間隔。以 Paimon Sink 為例羊壹,F(xiàn)link 的一些 Connector 能夠保證數(shù)據(jù) exactly-once 的語義蓖宦,并且其 exactly-once 語義依賴于二階段提交能力。而 Paimon Sink 二階段提交的頻率和 Flink 的 checkpoint 時間間隔保持一致油猫。
因此稠茂,這里進行了一個優(yōu)化:當(dāng) isProcessingBacklog 為 true 或 false 時,用戶可以分別設(shè)置不同的 checkpoint 時間間隔情妖。當(dāng)數(shù)據(jù)積壓時睬关,用戶可以配置一個較長的時間間隔,以盡量減少 Paimon Sink 執(zhí)行二階段提交的次數(shù)及其開銷毡证。通過這種方式电爹,Paimon Sink 能夠增加全量數(shù)據(jù)同步階段的吞吐量。
(3)優(yōu)化數(shù)據(jù)的處理順序
另一個基于 isProcessingBacklog 的優(yōu)化是對輸入數(shù)據(jù)進行排序料睛。前面提到丐箩,批處理作業(yè)相比于流處理作業(yè)的一個優(yōu)勢在于可以連續(xù)消費同一個主鍵的數(shù)據(jù),在本地的 Flink 算子中只需要保存一個主鍵所對應(yīng)的狀態(tài)恤煞,不需要有隨機訪問 Key-Value Store的開銷屎勘。受此啟發(fā),我們將這種優(yōu)化應(yīng)用到流處理作業(yè)上居扒。
具體來說概漱,當(dāng) isProcessingBacklog 為 true 時,下游算子會先暫停消費數(shù)據(jù)喜喂,然后對上游積壓的數(shù)據(jù)進行排序瓤摧。排序完成后,算子再對相同主鍵的數(shù)據(jù)進行連續(xù)消費夜惭。這樣不僅不會明顯增加數(shù)據(jù)延遲姻灶,還通過減少隨機訪問狀態(tài)后端的開銷,優(yōu)化了作業(yè)的整體吞吐量诈茧。
(4)基于isInsertOnly優(yōu)化Sink行為
關(guān)于 isInsertOnly 的優(yōu)化产喉,目前看到的主要應(yīng)用在 Paimon sink 和 Hologres sink 上。首先,Paimon sink 除了更新數(shù)據(jù)本身以外曾沈,還支持生成變更日志(changelog)这嚣。在通常情況下,changelog 和數(shù)據(jù)文件之間的關(guān)系可以理解為:changelog 是原始的輸入數(shù)據(jù)塞俱,而數(shù)據(jù)文件是對原始輸入數(shù)據(jù)進行去重和更新后的結(jié)果姐帚。
在 isInsertOnly 為 true 的情況下,因為所有數(shù)據(jù)的主鍵都不同障涯,我們可以認(rèn)為 changelog 去重過程實際上并沒有進行去重操作罐旗,changelog 和數(shù)據(jù)文件的內(nèi)容是相同的∥ǖ基于這一信息九秀,Paimon sink 可以不需要獨立地進行序列化、格式轉(zhuǎn)換粘我、分別寫出 changelog 文件和數(shù)據(jù)文件鼓蜒;相反,Paimon sink 只需要寫出一份數(shù)據(jù)文件征字,然后再拷貝一份作為 changelog 文件即可都弹。這樣做減少了 CPU 的開銷。
類似的優(yōu)化也可以應(yīng)用在 Hologres sink上匙姜。在 isInsertOnly 為 true 的情況下畅厢,Hologres sink可以使用批量插入(batch insert)并避免預(yù)寫日志(write-ahead logging)這些步驟。當(dāng)寫入一條數(shù)據(jù)時搁料,作業(yè)不需要查詢數(shù)據(jù)庫中是否存在該主鍵的數(shù)據(jù)來決定是更新現(xiàn)有數(shù)據(jù)還是插入一條新數(shù)據(jù)或详,因為isInsertOnly 語義已經(jīng)保證不會出現(xiàn)主鍵重復(fù)的情況。因此作業(yè)可以跳過這些查詢和更新的開銷郭计,從而實現(xiàn)吞吐量的優(yōu)化霸琴。
三、社區(qū)進展以及未來展望
最后介紹上面提到的這些優(yōu)化昭伸,它們現(xiàn)在的一些進展以及未來展望梧乘。
1.isProcessingBacklog進展
首先,關(guān)于 isProcessingBacklog庐杨,目前社區(qū)已經(jīng)完成了根據(jù) source 的不同階段生成 isProcessingBacklog 信號选调,并將這個信號傳遞到下游、用它來調(diào)控 Flink 的 checkpoint 時間間隔的部分灵份。對于前面提到的對所有數(shù)據(jù)進行排序以減少狀態(tài)開銷仁堪、以及根據(jù) watermark 的延遲來判斷 isProcessingBacklog 的功能,它們已經(jīng)在社區(qū)中提出討論填渠,但還沒有最終完成弦聂。這里每個條目后都有對應(yīng)的 Flink 設(shè)計文檔編號鸟辅,感興趣的讀者可以參考相關(guān)的具體設(shè)計文檔。
2.isInsertOnly進展
關(guān)于 isInsertOnly莺葫,前面介紹到的內(nèi)容匪凉,包括從 CDC(Change Data Capture)收集 isInsertOnly 信號,并利用該信號優(yōu)化 Paimon Sink 和 Hologres Sink的部分捺檬,這些功能已經(jīng)在阿里云商業(yè)化版本的 Flink 中完成再层,并預(yù)計將在下個版本發(fā)布。
另外堡纬,isInsertOnly 信號的語義與 Flink 社區(qū)的部分現(xiàn)有框架還存在一些沖突聂受,相關(guān)沖突預(yù)計將在 Flink 2.0 支持 Generalized Watermark 機制后自然解決。因此烤镐,目前我們先在阿里云的商業(yè)化版本中實現(xiàn)了這些優(yōu)化饺饭,待 Flink 2.0 支持相應(yīng)的基礎(chǔ)設(shè)施后,我們會將這些優(yōu)化推向 Flink社區(qū)职车。
3.未來展望
未來,我們將進一步推進以下幾個方面的工作:
(1)在 Flink 2.0 中推向社區(qū)的優(yōu)化:我們計劃將關(guān)于 isInsertOnly 的優(yōu)化推向 Flink 社區(qū)鹊杖。這些優(yōu)化已經(jīng)在阿里云商業(yè)化版本中完成悴灵,并預(yù)計將在 Flink 2.0 支持 Generalized Watermark 機制后逐步推向社區(qū)。
(2)動態(tài)修改 Flink 的算子流程結(jié)構(gòu)(DAG 圖):我們將探索根據(jù)用戶在不同階段的需求(流模式或批模式)動態(tài)修改 Flink 的算子流程結(jié)構(gòu)(DAG 圖)的支持骂蓖,以更好地適應(yīng)不同的應(yīng)用場景积瞒。
(3)改進量化指標(biāo)的切換機制:目前對于 isProcessingBacklog/isInsertOnly 信號,我們只支持一次性的切換登下,即對于需要追溯歷史數(shù)據(jù)或同步全量數(shù)據(jù)的作業(yè)茫孔,目前支持在初始化階段 isProcessingBacklog/isInsertOnly 設(shè)置為 true,在追上實時或增量數(shù)據(jù)后一次性切換為 false被芳。在未來缰贝,我們希望即使在量化指標(biāo)切換為 false 后,F(xiàn)link作業(yè)仍然能夠根據(jù)實際過程中偶爾出現(xiàn)的數(shù)據(jù)積壓情況畔濒,再從false切換回true剩晴,重新應(yīng)用批處理的優(yōu)化。這將使系統(tǒng)更具靈活性和適應(yīng)性侵状。
以上就是我們未來的一些優(yōu)化思路赞弥。歡迎大家加入阿里云的開源大數(shù)據(jù)團隊,共同推動技術(shù)進步和創(chuàng)新趣兄。