摘要:本文由阿里云 Flink 團(tuán)隊(duì)蘇軒楠老師撰寫窝剖,旨在向 Flink 用戶整體介紹 Flink 流批一體的技術(shù)和挑戰(zhàn)。內(nèi)容主要分為以下三個(gè)部分:
流批一體技術(shù)簡介
面臨的挑戰(zhàn)
總結(jié)
Flink 的流批一體概念相信大家并不陌生梭域,盡管流批一體被廣泛討論斑举,但在很多使用者心中,F(xiàn)link 更多的是作為流計(jì)算引擎的事實(shí)標(biāo)準(zhǔn)病涨,常有朋友提到 Flink懂昂,也是更熟悉其在流計(jì)算場景的使用。今天没宾,本文旨在全面向讀者介紹 Flink 的流批一體技術(shù)及其所面臨的技術(shù)挑戰(zhàn)。在未來的分享中沸柔,我們將進(jìn)一步探討流批一體在不同場景下的應(yīng)用循衰,以及通過公開渠道收集到的一些企業(yè)使用 Flink 流批一體功能的落地情況。
一褐澎、流批一體技術(shù)簡介
在這一章節(jié)我們會簡要地介紹一下流批一體相關(guān)的技術(shù)会钝,讓大家了解流批一體的整體架構(gòu)以及架構(gòu)中各個(gè)組件的技術(shù)選擇。
1. 流批一體的架構(gòu)演進(jìn)
首先我們來看看工三,流批一體的架構(gòu)是怎樣演進(jìn)而來的迁酸。
(1)Lambda 架構(gòu)
Lambda 架構(gòu)通常是在原有的離線計(jì)算中發(fā)展而來的。在離線計(jì)算的鏈路(批處理層)上俭正,為了滿足業(yè)務(wù)數(shù)據(jù)實(shí)時(shí)性的要求奸鬓,會在離線鏈路的基礎(chǔ)上再增加一條實(shí)時(shí)計(jì)算鏈路(速度處理層)。最后掸读,在對外提供服務(wù)的時(shí)候串远,會合并批處理層和速度層的視圖(服務(wù)層)。雖然 Lambda 架構(gòu)可以在不改變原有離線計(jì)算架構(gòu)的基礎(chǔ)上儿惫,同時(shí)享受到離線和實(shí)時(shí)計(jì)算帶來的好處澡罚,但是它在使用的過程中存在著以下的問題:
- 維護(hù)離線和實(shí)時(shí)兩套系統(tǒng)的運(yùn)維成本成本高
- 需要為離線和實(shí)時(shí)開發(fā)兩套代碼,學(xué)習(xí)成本和開發(fā)成本高
- 離線和實(shí)時(shí)的計(jì)算引擎不同肾请,數(shù)據(jù)一致性難以保證
- 使用存儲格式不同留搔,數(shù)據(jù)管理更加復(fù)雜
(2)Kappa 架構(gòu)
Kappa 架構(gòu)相當(dāng)于在 Lambda 架構(gòu)上去掉了批處理層(Batch Layer),只留下單獨(dú)的流處理層(Speed Layer)铛铁。通過消息隊(duì)列的數(shù)據(jù)保留功能隔显,來實(shí)現(xiàn)上游重放(回溯)能力。Kappa 架構(gòu)解決了上面提到的 Lambda 架構(gòu)所面臨的問題避归,但是在這個(gè)架構(gòu)下荣月,就無法享受到離線計(jì)算帶來的好處了。比如通過流計(jì)算來回溯的性能會比使用批計(jì)算的性能要差梳毙。而且對于沒有實(shí)時(shí)需求的作業(yè)彭雾,使用 Kappa 架構(gòu)的流計(jì)算會造成不必要的資源浪費(fèi)验烧。
(3)流批一體架構(gòu)
通過使用流批一體的計(jì)算引擎和流批一體的存儲格式傲绣,我們可以很好地解決 Lambda 和 Kappa 架構(gòu)中存在的問題祈秕。
- 在流批一體的架構(gòu)中,我們使用流批一體的計(jì)算引擎可以避免維護(hù)兩套系統(tǒng)的運(yùn)維成本钦幔。
- 使用相同的流批一體的存儲格式,可以避免分別為離線鏈路和實(shí)時(shí)鏈路使用兩套不同的存儲,減少了存儲鏈路的冗余和成本婴程。
- 用戶只需要寫一套代碼就能同時(shí)用于實(shí)時(shí)計(jì)算和離線計(jì)算,大大降低了用戶的學(xué)習(xí)成本和開發(fā)成本抱婉。同時(shí)档叔,使用統(tǒng)一的計(jì)算引擎,統(tǒng)一的代碼可以更好地保證數(shù)據(jù)的一致性蒸绩。
介紹完流批一體的架構(gòu)之后衙四,讓我們來看一下流批一體架構(gòu)中最重要的兩個(gè)組件:計(jì)算引擎和存儲。
2. 流批一體的計(jì)算引擎
Apache Flink 從設(shè)計(jì)之初就提出了“批處理是流處理的特殊情況”患亿。用戶可以使用 DataStream API 和 Flink 的 SQL API 來同時(shí)定義流作業(yè)和批作業(yè)传蹈。Flink 在流批一體這個(gè)方向上已經(jīng)做了非常多的工作來提高用戶的體驗(yàn),作業(yè)的穩(wěn)定性和性能步藕,目前 Flink Batch 已經(jīng)在很多公司的生產(chǎn)環(huán)境上落地惦界。Flink 社區(qū)未來也會持續(xù)投入 Flink 流批一體的發(fā)展。
Apache Spark 也是最早提出流批一體理念的計(jì)算引擎之一咙冗,可以用作流批一體計(jì)算引擎沾歪。與 Flink 不同的是,它的流計(jì)算是基于微批(mini-batch)來實(shí)現(xiàn)的乞娄,在流計(jì)算語義的支持和端到端延遲上會差一些瞬逊,面對復(fù)雜、大規(guī)模實(shí)時(shí)計(jì)算場景的極致需求可能會力不從心仪或。Apache Spark 雖然也在探索使用 Continuous Processing 來支持流計(jì)算确镊,降低延遲,但目前還是屬于實(shí)驗(yàn)階段 [3]范删,并且經(jīng)過筆者的觀察 Continuous Processing 的投入一直不大蕾域,2021 年之后就幾乎停滯了。
3. 流批一體的湖表格式
在流批一體這個(gè)大場景下到旦,計(jì)算引擎只是其中的一環(huán)旨巷,流批一體的存儲格式更是不可或缺的一部分。Flink 在流批一體的存儲格式上做了許多探索添忘,對接了多個(gè)不同的存儲格式采呐,目前在開源社區(qū)主流的支持 Flink 流批一體的存儲格式有下面這些:
Apache Paimon 是流批一體的湖存儲格式「槠铮可以使用 Flink CDC 來一鍵入湖到 Paimon 中斧吐,也可以通過 Flink SQL 或 Spark SQL 來批寫又固、流寫到 Paimon 當(dāng)中。Paimon 也可以被 Flink 或 Spark 流讀煤率,這也是它作為流式數(shù)據(jù)湖的特有能力之一仰冠。它有著強(qiáng)大的流讀流寫支持,給流式湖存儲帶來僅 1-5 分鐘的延遲 [4]蝶糯。
Apache Hudi 原生支持多引擎洋只,因此既可以對批流進(jìn)行讀寫消費(fèi),也可以使用Presto進(jìn)行交互式分析 [5]昼捍。Flink 接入之后识虚,把 Hudi 的時(shí)延可以達(dá)到十分鐘級[4]。
Apache Iceberg 早在 2020 年妒茬,阿里云就試圖把 Flink 融入 Iceberg 中舷礼,在 Iceberg 中做了很多 Flink 的集成。在把 Flink 融入 Iceberg 后郊闯,Iceberg 就有了 Flink 流讀流寫的力。目前 Flink 寫入 Iceberg蛛株,并不能太實(shí)時(shí)团赁,因此更推薦在 1 小時(shí)左右的更新 SLA 保障[4]。
4. 流批一體的數(shù)倉
除了流批一體的湖表格式谨履,還有流批一體的數(shù)倉也可以作為流批一體的存儲欢摄,例如開源的 Starrocks, Clickhouse, Apache Druid 等。還有類似阿里云的商業(yè)化產(chǎn)品 Hologres 也可以作為流批一體的數(shù)倉笋粟。但是數(shù)倉的存儲成本比湖存儲更高怀挠,我們又看到一些的做法是把數(shù)據(jù)通過湖表格式寫入到數(shù)據(jù)湖中,然后通過數(shù)倉來分析數(shù)據(jù)湖中的數(shù)據(jù)做 OLAP 分析害捕,例如 Startrocks + Paimon[6], Hologres + Paimon [7] 等湖倉一體方案绿淋。
二、面臨的挑戰(zhàn)
在大家使用 Flink Batch 流批一體實(shí)踐的過程中尝盼,難免會遇到各種各樣的問題和挑戰(zhàn)吞滞。Flink 社區(qū)也在積極地解決大家在使用過程中遇到的問題,對 Flink 的批作業(yè)能力進(jìn)行打磨盾沫,使Flink 的流批引擎的能力逐步地完善裁赠。下面介紹了近些年來,大家在 Flink 流批一體實(shí)踐中遇到的挑戰(zhàn)赴精,以及社區(qū)的解決方案佩捞。
1. 流批 Shuffle 差異
Flink 流作業(yè)的 shuffle 與 批作業(yè)的 shuffle 通常是不一樣的,在流作業(yè)的情況下使用的是 Pipeline Shuffle蕾哟,Pipeline Shuffle 的數(shù)據(jù)是不用落盤的一忱,但是這要求作業(yè)啟動的時(shí)候所有的算子都要啟動起來莲蜘,這與常見的 Batch 作業(yè)調(diào)度需求不匹配。因此掀潮,在批模式下菇夸,通常都是使用 Blocking Shuffle,這樣上游 task 會把 Shuffle 數(shù)據(jù)寫到離線文件中仪吧,等下游 task 啟動以后庄新,再來消費(fèi) Shuffle 的數(shù)據(jù)。
Flink 內(nèi)部默認(rèn)的實(shí)現(xiàn)是使用 Internal Shuffle薯鼠, 是把上游計(jì)算節(jié)點(diǎn)數(shù)據(jù)寫到 TaskManager 本地盤择诈,下游節(jié)點(diǎn)連接到上游 TaskManager 上讀取 Shuffle 文件。這會導(dǎo)致 TaskManager 計(jì)算工作完成以后出皇,不能立刻退出羞芍,要等下游消費(fèi)完 Shuffle 文件后才能釋放掉。這樣不僅造成了資源浪費(fèi)郊艘,而且容錯(cuò)代價(jià)大荷科。
因此,F(xiàn)link 社區(qū)在已開始設(shè)計(jì) Shuffle Service 的時(shí)候就把他作為一個(gè) pluggable [8]纱注,以便于用戶能夠方便的拓展來實(shí)現(xiàn) Remote Shuffle Service畏浆。Remote Shuffle Service 通過單獨(dú)的集群提供數(shù)據(jù)的 Shuffle 服務(wù),可以避免 TaskManager Shuffle 的資源利用率低和容錯(cuò)開銷大的問題狞贱。目前 Apache Celeborn[9] 支持作為 Flink 的 remote shuffle.
同時(shí)社區(qū)也在推動 Shuffle 3.0 中提出了 Hybrid Shuffle刻获,Hybrid Shuffle 將流式 Pipeline Shuffle 跟批式 Blocking Shuffle 的特點(diǎn)結(jié)合在一起,讓用戶在寫數(shù)據(jù)時(shí)瞎嬉,既可以寫入內(nèi)存通過內(nèi)存直接進(jìn)行消費(fèi)蝎毡,也可以在內(nèi)存中存放不下這么多數(shù)據(jù)、下游消費(fèi)不夠及時(shí)的時(shí)候氧枣,將數(shù)據(jù)寫入到磁盤當(dāng)中進(jìn)行后期消費(fèi)沐兵。通過自適應(yīng)切換,在上游產(chǎn)出數(shù)據(jù)的過程中和完成后便监,下游可以隨時(shí)消費(fèi)痒筒,從而徹底消除資源碎片的情況 [10]。
2. Batch 性能
Flink batch 作業(yè)的性能也是用戶在使用 Flink batch 中最關(guān)心的問題之一茬贵,F(xiàn)link batch 在作業(yè)性能優(yōu)化上面做了非常多的改進(jìn)簿透,使得 Flink batch 在 TPC-DS benchmark 上的表現(xiàn)在每個(gè)版本都有很大的提升。例如使用 Operator Fusion Codegen [11] 來優(yōu)化 SQL planner 生成的代碼解藻,通過 adaptive local hash aggregate[12] 來動態(tài)決定是否使用 local aggregation老充,通過 runtime filter 和 dynamic data prune 來優(yōu)化數(shù)據(jù)處理的效率,實(shí)現(xiàn) Adaptive Execution Plan(AQE) 做了自動并發(fā)推斷螟左,動態(tài)負(fù)載均衡等[13]啡浊。
3. 慢節(jié)點(diǎn)問題
在一個(gè)分布式系統(tǒng)里觅够,因?yàn)閭€(gè)別的機(jī)器故障、資源緊張或者是網(wǎng)絡(luò)問題巷嚣,可能導(dǎo)致單個(gè)并發(fā)的性能下降喘先,這些慢的節(jié)點(diǎn)可能成為整個(gè)作業(yè)的瓶頸。和傳統(tǒng) MapReduce廷粒、Spark 的思路類似窘拯,F(xiàn)link 1.17 版本引入了推測執(zhí)行來解決慢節(jié)點(diǎn)的問題 [14]。當(dāng)檢測到長尾任務(wù)后坝茎,在非熱的機(jī)器上部署長尾任務(wù)的鏡像實(shí)例涤姊。哪個(gè)先執(zhí)行完就用哪個(gè)結(jié)果,并把其他的鏡像任務(wù)取消掉嗤放。
4. 并發(fā)配置易用性
在 Flink Batch 作業(yè)中思喊,為作業(yè)節(jié)點(diǎn)設(shè)置適當(dāng)?shù)牟⑿行圆⒉皇且患菀椎氖虑椤T谂鳂I(yè)中次酌,并行度設(shè)置得太小可能導(dǎo)致太長的執(zhí)行時(shí)間和 Failover 發(fā)生時(shí)大量的回退恨课。相反,并行度設(shè)置得過大岳服,則可能導(dǎo)致資源浪費(fèi)和任務(wù)部署和網(wǎng)絡(luò) Shuffle 更多的成本開銷庄呈。為了解決這個(gè)問題,F(xiàn)link 在 1.15 中引入了 Adaptive Batch Scheduler [15]派阱,這能讓 Flink 根據(jù)消費(fèi)的數(shù)據(jù)量大小來自動決定作業(yè)節(jié)點(diǎn)的并發(fā)度,免去了用戶需要手動調(diào)整作業(yè)并發(fā)度的煩惱斜纪。
5. Hive SQL 兼容性
由于 Flink SQL 使用的是標(biāo)準(zhǔn)的 ANSI SQL贫母,并且 Hive SQL 與 ANSI SQL 語法差異較多。不少用戶在遷移 Hive SQL 到 Flink SQL 上的時(shí)候會遇到不少的阻礙盒刚。雖然 Flink SQL 本身提供了 Hive Dialect [15]腺劣,但是在 Flink 1.15 版本,距離完全兼容 Hive SQL 仍然有不小的差距因块。比如橘原,快手在選定了一批準(zhǔn)備遷移的作業(yè)后,通過解析驗(yàn)證涡上,就發(fā)現(xiàn)了諸多不支持的語法趾断。在快手給出 input 后,社區(qū)第一優(yōu)先級做出了支持吩愧。如上圖所示芋酌,我們列出了比較重要且很常用的一些語法,比如 CTAS雁佳、ADD JAR脐帝、USING JAR同云、宏命令、Transform 等堵腹。
Flink 社區(qū)在 1.16 版本做了大量工作, 包括 CTAS [16]炸站、ADD JAR、USING JAR [17] 等等疚顷,來補(bǔ)全 Hive 語法旱易。經(jīng)過 qtest 測試,整體兼容度能達(dá)到 95%荡含,基本能保證用戶現(xiàn)有的 Query 都能遷到 Flink 上來咒唆。
三、總結(jié)
流批一體是 Flink 非常重要的一個(gè)發(fā)展方向之一释液,眾多用戶都在使用的過程中給予了 Flink 社區(qū)寶貴的反饋全释。隨著更多的開發(fā)者加入了開源社區(qū)的開發(fā)工作,這讓 Flink Batch 的能力不斷得到打磨和發(fā)展误债。在社區(qū)成員的努力下浸船,很多用戶已經(jīng)可以十分順利地把 Flink 流批一體在他們的生產(chǎn)環(huán)境中落地。我們會在后期分享中寝蹈,給大家介紹一些流批一體的主要落地場景李命,以及我們在公開渠道收集到的各個(gè)公司使用流批一體的落地情況。
雖然箫老,F(xiàn)link 流批一體已經(jīng)達(dá)到生產(chǎn)可用的狀態(tài)封字,但是社區(qū)也看到仍然有不少需要繼續(xù)投入的地方,例如繼續(xù)完善 DataStream API batch 的能力耍鬓,使其能夠完全與 DataSet API 能力對齊阔籽;更加深入地與 Apache Celeborn 結(jié)合,結(jié)合 Flink 特點(diǎn)實(shí)現(xiàn)動態(tài)切換 Shuffle 的機(jī)制牲蜀,多級存儲引入內(nèi)存笆制、支持 Flink Hybird Shuffle 等;更加深入地與 Apache Paimon 對接涣达,整合流批一體引擎和存儲的能力在辆,讓用戶能夠更加簡單地使用 Apache Flink + Apache Paimon 搭建流批一體的數(shù)據(jù)湖倉。
[1] https://www.oreilly.com/radar/questioning-the-lambda-architecture/
[2] https://www.bilibili.com/video/BV1164y1o7yc/
[4] https://flink-learning.org.cn/article/detail/d17cc1d2a06946b40c51d4301df6e540
[6] https://flink-learning.org.cn/article/detail/02a574303b7e65fd53e13a82b40a8d8f
[7] https://flink-learning.org.cn/article/detail/84f501725034542a7f41e0670645c714
[8] https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Service
[9] https://celeborn.apache.org/
[10] https://flink-learning.org.cn/article/detail/f6449048654123b163e29917e8ad5a79
[12] https://issues.apache.org/jira/browse/FLINK-30542
[13] https://developer.aliyun.com/ebook/8229/115382
[14] https://cwiki.apache.org/confluence/display/FLINK/FLIP-168%3A+Speculative+Execution+for+Batch+Job
[15] https://cwiki.apache.org/confluence/display/FLINK/FLIP-152
[16] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199541185
[17] https://cwiki.apache.org/confluence/display/FLINK/FLIP-214
歡迎大家加入 Flink Batch 交流釘釘群度苔。本群旨在為 Flink Batch 愛好者提供一個(gè)交流技術(shù)和傳遞資訊的平臺匆篓,在這里:
- 你可以掌握Flink Batch前沿的資訊,可以與 Flink 開發(fā)者及 Committer 面對面交流
- Flink Batch 的問題集中解決寇窑,各位開發(fā)者及 Committer 及時(shí)解決你的 Blocker
“Flink Batch 交流群”群的釘釘群號: 34817520