小米基于 Flink 的實(shí)時(shí)數(shù)倉(cāng)建設(shè)實(shí)踐

摘要:本文整理自小米軟件開(kāi)發(fā)工程師周超饱岸,在 Flink Forward Asia 2022 平臺(tái)建設(shè)專場(chǎng)的分享肖粮。本篇內(nèi)容主要分為四個(gè)部分:

  1. 小米數(shù)倉(cāng)架構(gòu)演變

  2. Flink+Iceberg 架構(gòu)升級(jí)實(shí)踐

  3. 流批一體實(shí)時(shí)數(shù)倉(cāng)探索

  4. 未來(lái)展望

點(diǎn)擊查看原文視頻 & 演講PPT

開(kāi)發(fā)者社區(qū).jpg

一潮太、小米數(shù)倉(cāng)架構(gòu)演變

1.1 數(shù)倉(cāng)架構(gòu)現(xiàn)狀

1.jpg

在介紹演變前医增,我們先來(lái)了解下小米當(dāng)前的技術(shù)現(xiàn)狀蜀漆。

上圖展示的是小米目前的技術(shù)架構(gòu)谅河,在存儲(chǔ)側(cè)我們主要應(yīng)用數(shù)據(jù)湖 Iceberg 和自研消息隊(duì)列 Talos,計(jì)算層主要應(yīng)用 Flink 和 Spark确丢,他們統(tǒng)一運(yùn)行在 Yarn 上绷耍,統(tǒng)一通過(guò) Metacat 獲取元數(shù)據(jù)信息,并通過(guò) Ranger 來(lái)進(jìn)行統(tǒng)一的鑒權(quán)服務(wù)鲜侥。我們內(nèi)部使用 Spark 和 Presto 來(lái)支撐 OLAP 查詢場(chǎng)景褂始,并通過(guò) Kyuubi 來(lái)實(shí)現(xiàn)路由。

在實(shí)時(shí)數(shù)倉(cāng)場(chǎng)景中描函,我們選擇 Flink 作為計(jì)算底座崎苗,Hive、Talos舀寓、Iceberg 作為存儲(chǔ)底座胆数,其中,消息隊(duì)列 Talos 作為傳統(tǒng) Lambda 架構(gòu)的通用選擇互墓,在我們內(nèi)部占比較大且很穩(wěn)定必尼,Iceberg 作為一款優(yōu)秀的湖存儲(chǔ),兼具時(shí)效性和低成本篡撵,其使用占比也在逐步提升判莉,使用到 Iceberg 的 Flink 作業(yè)在總占比中已經(jīng)達(dá)到近 50%。

2.jpg

我們對(duì)內(nèi)部實(shí)時(shí)鏈路進(jìn)行了統(tǒng)計(jì)酸休,Iceberg 在大多數(shù)場(chǎng)景下已經(jīng)對(duì) Hive 進(jìn)行了替換骂租,對(duì)分鐘級(jí)的實(shí)時(shí)鏈路進(jìn)行了較好的支撐;因?yàn)槭褂?Iceberg 搭建的實(shí)時(shí)鏈路目前僅能達(dá)到分鐘級(jí)的時(shí)效斑司,消息隊(duì)列仍有著較高占比。

1.2 數(shù)倉(cāng)架構(gòu)演變

3.jpg

接下來(lái)看下小米內(nèi)部數(shù)倉(cāng)架構(gòu)的演變歷程但汞。

在引入數(shù)據(jù)湖前宿刮,針對(duì)日志埋點(diǎn)這樣的聚合計(jì)算場(chǎng)景,業(yè)務(wù)會(huì)使用離線計(jì)算來(lái)搭建鏈路私蕾,采集模塊會(huì)將日志或埋點(diǎn)數(shù)據(jù)統(tǒng)一收集到消息隊(duì)列中僵缺,F(xiàn)link 消費(fèi)消息隊(duì)列中的數(shù)據(jù)實(shí)時(shí)寫入 ODS 層 Hive 表,下游的計(jì)算則采用 Spark 或者 Hive 按小時(shí)或天進(jìn)行清洗踩叭、聚合磕潮。顯然翠胰,這樣的鏈路處理延遲和成本都較高,這些離線作業(yè)往往都在凌晨進(jìn)行調(diào)度自脯,給整個(gè)集群帶來(lái)較大壓力之景。

4.jpg

針對(duì) CDC 數(shù)據(jù)源,實(shí)時(shí)數(shù)據(jù)通常會(huì)通過(guò)消息隊(duì)列進(jìn)行流轉(zhuǎn)膏潮,保證處理的實(shí)時(shí)性锻狗,數(shù)據(jù)在消息隊(duì)列中以 Changelog-Json 的格式進(jìn)行存儲(chǔ)。但為了保證計(jì)算的準(zhǔn)確性焕参,業(yè)務(wù)鏈路通常會(huì)使用 Lambda 架構(gòu)來(lái)搭建轻纪,會(huì)額外引入一條離線鏈路。離線鏈路基于 Hive 或 Kudu 構(gòu)建叠纷,ODS 層使用 Spark Streaming 近實(shí)時(shí)導(dǎo)入刻帚,部分場(chǎng)景也會(huì)定期全量導(dǎo)入,下游計(jì)算依賴 Spark 做定時(shí)調(diào)度涩嚣。顯然我擂,這樣的架構(gòu)開(kāi)發(fā)和維護(hù)的成本都會(huì)很高。

5.jpg

帶著上面的問(wèn)題缓艳,我們想要對(duì)批和流鏈路進(jìn)行統(tǒng)一校摩,并能夠滿足低成本和低延遲,為此我們引入了 Iceberg阶淘,在引入 Iceberg 初期衙吩,小米內(nèi)部的使用以 v1 表為主(v1 表是數(shù)據(jù)分析表,支持 Append 類型數(shù)據(jù)的增量讀寫)溪窒。因?yàn)?Flink 舊架構(gòu)(1.12 版本)讀取 Iceberg 的數(shù)據(jù)時(shí)效性不高坤塞,所以在日志埋點(diǎn)場(chǎng)景的應(yīng)用主要是替換了 Hive,使用 Iceberg 來(lái)存儲(chǔ) ODS澈蚌、DWD 層數(shù)據(jù)摹芙,可以降低存儲(chǔ)成本,同時(shí)配合 Spark宛瞄、Presto 可以獲得更快的查詢速度浮禾。

6.jpg

針對(duì) CDC 數(shù)據(jù)源的場(chǎng)景,在初期也同樣以替換 Hive 為主以獲取更低的成本份汗。

7.jpg

在中期盈电,我們對(duì) Iceberg v2 表不斷完善,v2 表在 v1 表的基礎(chǔ)上支持了行級(jí)別的更新和刪除杯活,同時(shí)也支持了 Merge on read 模式匆帚,并且有著不錯(cuò)的性能。業(yè)務(wù)的實(shí)時(shí)鏈路也可以完全依賴 Flink 和 Iceberg 來(lái)進(jìn)行搭建旁钧。之前的日志埋點(diǎn)鏈路通過(guò) Iceberg v2 表的升級(jí)后吸重,使用 Flink+Iceberg v2 替換了原先的 Spark + Iceberg v1互拾,將鏈路時(shí)效性由小時(shí)級(jí)提升至分鐘級(jí)。

8.jpg

由于 v2 表能夠支持行級(jí)別的更新嚎幸,而且數(shù)據(jù)實(shí)時(shí)可查颜矿,原本針對(duì) CDC 數(shù)據(jù)源的 Lambda 架構(gòu)鏈路可以升級(jí)到 Kappa 架構(gòu),由 Flink 和 Iceberg v2 表來(lái)構(gòu)建鞭铆,兼顧時(shí)效性和成本或衡,依賴 Parquet+ZSTD 壓縮,存儲(chǔ)成本相比于原先 Parquet+snappy 能夠節(jié)省 30%车遂。

1.3 當(dāng)前架構(gòu)遇到的問(wèn)題

9.jpg

經(jīng)過(guò)我們一段時(shí)間的使用封断,我們發(fā)現(xiàn)目前 Iceberg 能夠很好地兼顧成本、查詢效率舶担,社區(qū)的很多優(yōu)化也以離線為主坡疼,但在實(shí)時(shí)中存在著時(shí)效性和穩(wěn)定性方面的不足,距離消息隊(duì)列仍有差距衣陶,同時(shí)柄瑰,Iceberg 作為統(tǒng)一的存儲(chǔ) Format,在實(shí)際消費(fèi)時(shí)需要讀取底層文件剪况,而 v2 表有著多種文件類型教沾,讀取時(shí)需要組織 DataFile 和兩類 DleteFile(Equlity delete 和 Position delete)的關(guān)系,邏輯較為復(fù)雜译断。

我們?cè)诨?Flink+Iceberg 的實(shí)時(shí)鏈路構(gòu)建中授翻,經(jīng)常會(huì)遇到以下兩類問(wèn)題:

  • 鏈路實(shí)時(shí)性略差,相比于消息隊(duì)列仍有差距孙咪,目前僅能夠穩(wěn)定在 15 分鐘左右堪唐。

  • 鏈路穩(wěn)定性略差,經(jīng)常因?yàn)殒溌分幸粡埍淼南M(fèi)積壓導(dǎo)致作業(yè)失敗翎蹈,從而使得整體鏈路不可用淮菠,嚴(yán)重依賴人工干預(yù)。

二荤堪、Flink+Iceberg 架構(gòu)升級(jí)實(shí)踐

2.1 基于 Flink1.12 的舊架構(gòu)實(shí)現(xiàn)

10.jpg

針對(duì)上述的兩個(gè)問(wèn)題合陵,我們對(duì) Flink+Iceberg 的架構(gòu)進(jìn)行了升級(jí)。

上圖中的實(shí)時(shí)數(shù)倉(cāng)鏈路由多張 Iceberg 表和多個(gè) Flink 作業(yè)組成逞力,其中 Iceberg 負(fù)責(zé)數(shù)據(jù)的存儲(chǔ)曙寡,F(xiàn)link 負(fù)責(zé)數(shù)據(jù)的清洗、流轉(zhuǎn)寇荧,顯然對(duì)一條鏈路的實(shí)時(shí)性和穩(wěn)定性支撐,F(xiàn)link 起了關(guān)鍵作用烈评。在一個(gè) Flink 流式作業(yè)中琳轿,數(shù)據(jù)會(huì)經(jīng)過(guò)讀取、計(jì)算硕糊、寫入峦嗤,在實(shí)際場(chǎng)景中蕊唐,我們發(fā)現(xiàn)數(shù)據(jù)的讀取效率低,嚴(yán)重影響了作業(yè)吞吐烁设,后續(xù)的相關(guān)優(yōu)化也主要圍繞讀取部分展開(kāi)替梨。

11.jpg

在介紹優(yōu)化之前,先來(lái)了解下讀取架構(gòu)的現(xiàn)狀装黑。

在優(yōu)化前副瀑,我們的 Flink+Iceberg 實(shí)時(shí)鏈路主要依托于 Flink 1.12 版本構(gòu)建,在 1.12 版本中恋谭,讀取邏輯被拆分為 Monitor 和 Reader 兩個(gè)算子糠睡,在進(jìn)行增量消費(fèi)時(shí),Monitor 算子掃描 Snapshot 中的文件疚颊,并組織成 Split 發(fā)往下游給 Reader 算子消費(fèi)狈孔。這樣的架構(gòu)做到了很好的掃描和讀取邏輯分離,但是仍有幾點(diǎn)重要缺陷材义,例如:Split 信息在 Jobmanager 和 TaskManager 之間單向同步均抽、單一的時(shí)間驅(qū)動(dòng)掃描、以及我們?yōu)榱吮WC順序性而限制了讀取并發(fā)為 1其掂,這些點(diǎn)一起影響著消費(fèi)速度油挥。

2.2 舊架構(gòu)遇到的主要問(wèn)題

12.jpg

這樣的缺陷在實(shí)際作業(yè)中會(huì)有實(shí)時(shí)性和穩(wěn)定性兩大問(wèn)題表現(xiàn)。在實(shí)時(shí)性方面清寇,存在著消費(fèi)速度慢喘漏、消費(fèi)存在波動(dòng);在穩(wěn)定性方面华烟,存在著 Task OOM翩迈,Checkpoint 容易超時(shí)。

13.jpg

為了保證拓?fù)涞挠邢驘o(wú)環(huán)盔夜,數(shù)據(jù)在上下游算子之間只能單向流動(dòng)负饲,這導(dǎo)致掃描進(jìn)度和讀取進(jìn)度只能夠單向同步,Monitor 算子感知不到 Reader 算子的消費(fèi)情況喂链;在這種單向同步的機(jī)制下返十,Monitor 會(huì)在一定周期內(nèi)下發(fā)固定的 Split 數(shù),如果在一個(gè)周期內(nèi)發(fā)送 Split 的數(shù)量較少椭微,那么 Reader 算子會(huì)有部分時(shí)間處于空閑狀態(tài)洞坑,導(dǎo)致消費(fèi)存在波動(dòng),存在資源浪費(fèi)蝇率。 而如果一個(gè)周期內(nèi)下發(fā)的 Split 超過(guò)了 Reader 的消費(fèi)能力迟杂,那么 Split 就會(huì)在 Reader 側(cè)堆積刽沾,占用額外的堆內(nèi)存。

同時(shí)固定的掃描間隔也會(huì)導(dǎo)致消費(fèi)的延遲排拷,新數(shù)據(jù)需要等待一定掃描間隔后才可能被消費(fèi)到侧漓。而單并發(fā)消費(fèi)又限制了作業(yè)的吞吐上限,以上這三點(diǎn)一起影響著實(shí)時(shí)性监氢。

14.jpg

這樣的機(jī)制不僅影響著實(shí)時(shí)性布蔗,對(duì)穩(wěn)定性也有不小的影響。Monitor 和 Reader 的單向同步機(jī)制浪腐,使得消費(fèi)需要指定間隔和間隔內(nèi)下發(fā)的 Split 數(shù)纵揍,未消費(fèi)完的 Split 會(huì)存儲(chǔ)在堆內(nèi)存中,積壓較多會(huì)導(dǎo)致 OOM牛欢、Full gc 頻繁骡男,Task 吞吐降低。

同時(shí)傍睹,舊架構(gòu)的 SourceFunction 在實(shí)現(xiàn)數(shù)據(jù)下發(fā)時(shí)需要持有 Checkpoint 鎖從而保證數(shù)據(jù)下發(fā)和狀態(tài)更新的一致隔盛,而 Reader 算子 Checkpoint 粒度僅細(xì)化到 Split 級(jí)別,所以 Reader 算子需要長(zhǎng)時(shí)間去持有 Checkpoint 鎖拾稳,只有消費(fèi)完一個(gè) Split 后才會(huì)釋放吮炕,這在下游處理慢,反壓情況下是致命的缺陷访得,很容易導(dǎo)致 Checkpoint 超時(shí)龙亲。這些點(diǎn)一同促使著作業(yè)穩(wěn)定性的降低。

2.3 基于 Flink1.14 的新架構(gòu)實(shí)現(xiàn)

15.jpg

為了解決上述實(shí)時(shí)性和穩(wěn)定性問(wèn)題悍抑,我們?cè)谏鐓^(qū)基于 FLIP-27 的改動(dòng)上改進(jìn)了讀取邏輯鳄炉,主要涵蓋了上圖右側(cè)的七點(diǎn),其中雙向通信搜骡,Monitor 邏輯移至 JobManager 是 FLIP-27 的關(guān)鍵優(yōu)化點(diǎn)拂盯。我們內(nèi)部主要對(duì)后面的五點(diǎn)進(jìn)行了優(yōu)化,分別是 Snapshot 的依次掃描记靡、自適應(yīng)的掃描模式谈竿、分區(qū)多并發(fā)消費(fèi)等。

16.jpg

增量消費(fèi) Iceberg 存在著兩種方式摸吠,分別為依次掃描 Snapshot 和合并多個(gè) Snapshot 掃描空凸。在合并多個(gè) Snapshot 的掃描模式中,需要依賴 Merge on read 模式寸痢,用后續(xù) Snapshot 中的 Delete 文件對(duì)當(dāng)前 Snapshot 中的 Data 文件數(shù)據(jù)進(jìn)行過(guò)濾呀洲。如果合并多個(gè) Snapshot 進(jìn)行消費(fèi),那么一個(gè) DataFile 可能會(huì)關(guān)聯(lián)到很多后續(xù) Snapshot 的 DeleteFile,使得 Split 的組織變得復(fù)雜两嘴,同時(shí) Reader 算子在使用 DeleteFile 過(guò)濾 DataFile 時(shí)丛楚,需要將 Equlity delete file 全部讀取到內(nèi)存中族壳,這也很容易導(dǎo)致 Task 產(chǎn)生內(nèi)存問(wèn)題憔辫。

我們默認(rèn)將掃描模式設(shè)置為了依次掃描,該模式可以更好地追蹤數(shù)據(jù)變化仿荆,并且降低文件組織復(fù)雜度贰您,避免了在合并多個(gè) Snapshot 模式中因?yàn)?Delete 文件較大而產(chǎn)生的內(nèi)存問(wèn)題,對(duì)穩(wěn)定性更加友好拢操。

17.jpg

舊架構(gòu)中锦亦,掃描邏輯主要由時(shí)間驅(qū)動(dòng),定時(shí)觸發(fā)令境,在新架構(gòu)中杠园,我們引入了自適應(yīng)的掃描模式,增加了事件驅(qū)動(dòng)舔庶,解決了消費(fèi)波動(dòng)和 Task 潛在的內(nèi)存問(wèn)題抛蚁。在實(shí)際掃描過(guò)程中,動(dòng)態(tài) Enumerator 會(huì)根據(jù)內(nèi)存中 Buffer 的反饋進(jìn)行決策惕橙,小于閾值就立刻執(zhí)行掃描操作瞧甩,保證 Reader 能夠連續(xù)消費(fèi),大于閾值就阻塞掃描弥鹦,避免將更多的 Split 緩存在內(nèi)存中肚逸。

18.jpg

在新架構(gòu)中,我們針對(duì) v2 表實(shí)現(xiàn)了并發(fā)消費(fèi)彬坏,將原本的單一隊(duì)列 Buffer 按照下游 Task 拆分成多個(gè)隊(duì)列 Buffer朦促,Iceberg 表中不同分區(qū)的數(shù)據(jù)文件會(huì)按照寫入排序,并被 Hash 到不同的隊(duì)列栓始,實(shí)現(xiàn)消費(fèi)的分區(qū)有序务冕。

同時(shí)為了保證各個(gè) Task 消費(fèi)數(shù)據(jù)的對(duì)齊,我們使用 Snapshot 的提交時(shí)間來(lái)生成 Watermark混滔,引入 AlignedAssigner 來(lái)實(shí)現(xiàn)統(tǒng)一的 Split 分配洒疚,在分配端實(shí)現(xiàn)對(duì)齊,保證下游各個(gè) Task 消費(fèi)數(shù)據(jù)的對(duì)齊坯屿。

19.jpg

上面我們講到的自適應(yīng)掃描只能解決單個(gè) Source 實(shí)例的問(wèn)題油湖,在實(shí)際應(yīng)用中,部分場(chǎng)景仍有潛在穩(wěn)定性問(wèn)題存在领跛,例如集成場(chǎng)景中的指標(biāo)拆分乏德,將一張表的數(shù)據(jù)拆分至多張表;數(shù)倉(cāng)場(chǎng)景中,對(duì)同一張表進(jìn)行多次引用喊括,篩選不同部分的數(shù)據(jù)進(jìn)行 Join胧瓜。在這兩個(gè)使用場(chǎng)景中,因?yàn)椴粷M足 Source 復(fù)用規(guī)則郑什,會(huì)有多個(gè)讀取同一張 Source 表的實(shí)例存在府喳。

在 Flink 中,Source 的復(fù)用受 Partition蘑拯、Limit钝满、Project、Filter 影響申窘,以 Project 和 Filter 為例弯蚜。上圖左邊的 SQL 描述了 Project 下推導(dǎo)致的復(fù)用失效,因一個(gè)字段的區(qū)別剃法,同一份數(shù)據(jù)就會(huì)被讀取三次碎捺;上圖右邊的 SQL 描述了 Filter 下推導(dǎo)致復(fù)用失效的場(chǎng)景,即使選取的范圍有很大重復(fù)贷洲,但 Source 仍不會(huì)得到復(fù)用收厨。由于復(fù)用的失效,同一個(gè)表的相同 Split 會(huì)在內(nèi)存中存在多份恩脂,依然有出現(xiàn)內(nèi)存問(wèn)題的可能帽氓。

20.jpg

為了優(yōu)化這種情況,我們引入了兩種方式俩块。

  • 通過(guò)規(guī)則來(lái)自動(dòng)對(duì)當(dāng)前不符合復(fù)用的 Source 算子執(zhí)行復(fù)用黎休。

  • 支持在 SQL 中手動(dòng)指定表實(shí)行強(qiáng)制復(fù)用。這樣的優(yōu)化在減少內(nèi)存占用玉凯、減少數(shù)據(jù)重復(fù)讀取方面有著不錯(cuò)的效果势腮,節(jié)省作業(yè)資源的同時(shí)也增加了穩(wěn)定性。

21.jpg

通過(guò)切換至新架構(gòu)漫仆,消費(fèi) Iceberg 表的平均掃描間隔降至小于 1 秒捎拯,單個(gè) Task 吞吐提升至 70 萬(wàn)條每秒,實(shí)時(shí)數(shù)倉(cāng)鏈路新鮮度提升至 5 分鐘內(nèi)盲厌。

三署照、流批一體實(shí)時(shí)數(shù)倉(cāng)探索

上一章介紹了 Flink 讀取 Iceberg 架構(gòu)的優(yōu)化,這一章將主要介紹小米在 Flink 流批一體實(shí)時(shí)數(shù)倉(cāng)上遇到的問(wèn)題以及相關(guān)探索吗浩。

22.jpg

遇到的問(wèn)題可以歸結(jié)為三類建芙。

第一類是數(shù)據(jù)波動(dòng),實(shí)時(shí)數(shù)倉(cāng)中數(shù)據(jù)是不斷變化的懂扼,由于 Flink 回撤機(jī)制的存在禁荸,-U 和+U 會(huì)拆分為兩條數(shù)據(jù)寫入右蒲,在-U 寫入,+U 未寫入時(shí)執(zhí)行查詢赶熟,會(huì)查詢到異常數(shù)據(jù)瑰妄,而在+U 寫入后又能查詢到正常結(jié)果。

第二類是計(jì)算不確定性映砖,F(xiàn)link 的狀態(tài)過(guò)期會(huì)導(dǎo)致計(jì)算結(jié)果的不確间坐。同時(shí)針對(duì)這部分異常數(shù)據(jù),往往沒(méi)有簡(jiǎn)單的對(duì)比啊央、修復(fù)手段眶诈,這導(dǎo)致實(shí)時(shí)數(shù)據(jù)產(chǎn)出的數(shù)據(jù)修復(fù)難。

23.jpg

針對(duì)數(shù)據(jù)波動(dòng)問(wèn)題瓜饥,考慮到下游絕大多數(shù)系統(tǒng)都能夠支持 Upsert 寫入,我們引入了寫入前數(shù)據(jù)丟棄能力浴骂,用于丟棄無(wú)關(guān)緊要的數(shù)據(jù)乓土,將其稱為 Drop Operator。該算子作用在 Sink 節(jié)點(diǎn)前溯警,能夠根據(jù)配置丟棄指定類型的數(shù)據(jù)趣苏。

針對(duì) Flink 聚合增量數(shù)據(jù)寫入 ADS 層 MySQL 的場(chǎng)景,可以配置丟棄-U梯轻,避免 ADS 層查詢波動(dòng)食磕。同樣,該配置可以很方便的將 Changelog 流丟棄-U 和-D 轉(zhuǎn)為 Append 流喳挑,滿足一些特殊的業(yè)務(wù)場(chǎng)景彬伦。

24.jpg

在解決計(jì)算的不確定性前,我們需要先了解其產(chǎn)生的原因伊诵。在 Flink SQL 中单绑,狀態(tài)起著重要作用,正確的中間狀態(tài)是計(jì)算結(jié)果正確的必要條件曹宴。但顯然搂橙,目前狀態(tài)的保持是昂貴的,我們需要一個(gè)狀態(tài)過(guò)期策略來(lái)進(jìn)行平衡笛坦。

在 Flink 內(nèi)部区转,有著 Watermark 清理和 TTL 清理兩類算子。Watermark 可以根據(jù)業(yè)務(wù)的需要去生成版扩,清理的策略根據(jù)實(shí)際使用場(chǎng)景制定废离,所以對(duì)計(jì)算結(jié)果影響可控。而依賴 TTL 清理的這類算子资厉,在 Flink SQL 中狀態(tài)過(guò)期的策略無(wú)法得到準(zhǔn)確控制厅缺,只能設(shè)置一個(gè)統(tǒng)一的狀態(tài)過(guò)期時(shí)間,往往因?yàn)檫^(guò)期時(shí)間設(shè)置不合理或者滿足不了業(yè)務(wù)需求,從而產(chǎn)生預(yù)料之外的計(jì)算結(jié)果湘捎。

25.jpg

例如物流诀豁、服務(wù)單場(chǎng)景,訂單從創(chuàng)建到關(guān)閉的時(shí)間跨度往往很長(zhǎng)窥妇,很容易出現(xiàn)在訂單還沒(méi)有結(jié)束前舷胜,狀態(tài)就過(guò)期了。為了解決訂單跨度時(shí)間長(zhǎng)導(dǎo)致?tīng)顟B(tài)丟失的問(wèn)題活翩,業(yè)務(wù)會(huì)設(shè)置一個(gè)離線的 Topic烹骨,通過(guò)離線鏈路定期往離線 Topic 里補(bǔ)數(shù)據(jù),補(bǔ)充的數(shù)據(jù)重新流入實(shí)時(shí)鏈路中材泄,將過(guò)期狀態(tài)重新補(bǔ)回沮焕。

26.jpg

針對(duì)由狀態(tài)過(guò)期而導(dǎo)致的計(jì)算不確定問(wèn)題,我們有兩種解決思路拉宗。

  • 針對(duì)定時(shí)批量補(bǔ)狀態(tài)峦树,我們嘗試從源頭解決問(wèn)題,讓狀態(tài)按需過(guò)期旦事,減少額外鏈路維護(hù)魁巩。

  • 針對(duì)定時(shí)批量修復(fù)異常數(shù)據(jù),我們想要提供更加簡(jiǎn)單方便的修復(fù)途徑姐浮。

27.jpg

為了能夠讓狀態(tài)按需過(guò)期谷遂,我們引入了算子級(jí)的狀態(tài)清理功能,將清理規(guī)則應(yīng)用范圍從作業(yè)細(xì)化到各個(gè)算子卖鲤,將清理規(guī)則從時(shí)間規(guī)則拓展到業(yè)務(wù)規(guī)則肾扰,并通過(guò) Query Hint 對(duì)算子提供靈活、方便的定義扫尖。

28.jpg

目前該功能支持兩類算子白对,分別是 Group 聚合算子和 Regular Join 算子,上圖表格為支持的參數(shù)换怖,通過(guò) TTL 的參數(shù)可以設(shè)置該算子狀態(tài)的過(guò)期時(shí)間甩恼,condition 參數(shù)可以填寫清理規(guī)則,為了方便判斷沉颂,清理規(guī)則需要是布爾表達(dá)式条摸。

29.jpg

上圖的 SQL 展示了求某類商品總銷量的聚合計(jì)算邏輯,該聚合算子狀態(tài)保存時(shí)間為 30 天铸屉,覆蓋了作業(yè)級(jí)的 1 天保存時(shí)間钉蒲,且當(dāng)商品狀態(tài)為售罄或下架,那么就清除該商品的狀態(tài)彻坛,這意味著有關(guān)該商品的銷售記錄后續(xù)不會(huì)再出現(xiàn)顷啼。

在聚合算子里踏枣,我們加入了一個(gè)狀態(tài)清理的檢查器,將用戶設(shè)置的清理規(guī)則經(jīng)過(guò) codegen 轉(zhuǎn)換為 Java 代碼钙蒙,在聚合計(jì)算后進(jìn)行規(guī)則檢查茵瀑,匹配成功后執(zhí)行清理。

30.jpg

同樣針對(duì) Join 算子躬厌,狀態(tài)清理檢查器的實(shí)現(xiàn)類似马昨,只是在 Join 算子會(huì)對(duì)左右表的狀態(tài)分別進(jìn)行清理,清理完后會(huì)去對(duì)方狀態(tài)中將引用計(jì)數(shù)-1扛施。

上圖的 SQL 示例描述了一個(gè)物流表的 Join 場(chǎng)景鸿捧,左表為物流訂單表,保存著訂單狀態(tài)以及更新時(shí)間疙渣,右表為維度表匙奴,保存著該訂單的一些基礎(chǔ)信息,包括創(chuàng)建時(shí)間昌阿。在圖中的例子中饥脑,Join 算子的狀態(tài)清理不再依賴 Proctime,只依賴于運(yùn)單狀態(tài)和運(yùn)單的持續(xù)時(shí)間懦冰。

31.jpg

雖然算子級(jí)狀態(tài)清理能夠解決一部分需求,但它的使用門檻較高谣沸,且并非所有業(yè)務(wù)都有明確的清理規(guī)則刷钢,一個(gè)簡(jiǎn)單方便的修復(fù)手段才適用于所有場(chǎng)景。如果想要用 Flink Batch 對(duì)數(shù)據(jù)進(jìn)行修復(fù)乳附,目前有 INSERT 和 OVERWRITE 兩種方式内地。使用 INSERT 實(shí)現(xiàn) SQL 邏輯較為復(fù)雜,且只能對(duì)數(shù)據(jù)進(jìn)行覆蓋赋除,不能刪除阱缓;OVERWRITE 的修復(fù)方式粒度較粗,而且會(huì)使下游實(shí)時(shí)作業(yè)產(chǎn)生較大波動(dòng)举农。

在這樣的場(chǎng)景下荆针,我們使用 Flink 實(shí)現(xiàn)了 Merge 語(yǔ)法。Merge 語(yǔ)法會(huì)對(duì)兩個(gè)數(shù)據(jù)源做 Join颁糟,并可以針對(duì)不同的 Join 情況執(zhí)行增航背、刪、改操作棱貌,對(duì)下游影響小玖媚。

32.jpg

在具體的實(shí)現(xiàn)上,我們?cè)谠镜?Calcite 語(yǔ)法上完善了 Merge 語(yǔ)法的解析邏輯婚脱,支持為每個(gè) Action 設(shè)置獨(dú)立的判斷條件今魔,在 Schema 匹配情況下支持 Insert * 和 Update * 語(yǔ)句勺像,簡(jiǎn)化邏輯。

在 SQL 校驗(yàn)階段错森,Merge 邏輯會(huì)被轉(zhuǎn)為 Outer Join 和多個(gè) Merge Action 的結(jié)合吟宦。在優(yōu)化階段,目前我們會(huì)根據(jù)實(shí)際的 Merge Action 情況來(lái)優(yōu)化 Join 方式问词,將默認(rèn)的 Outer Join 改寫為 Anti Join 或者 Inner Join督函,減少處理的數(shù)據(jù)量。

最終激挪,Merge 邏輯會(huì)生成 Join 和 MergeAction 兩個(gè)算子辰狡,Merge Action 算子根據(jù)上游 Join 情況來(lái)生成增、刪垄分、改數(shù)據(jù)并發(fā)往下游宛篇。由于 Flink SQL 目前提供了優(yōu)秀的流批一體架構(gòu),可以復(fù)用當(dāng)前的邏輯薄湿,將增刪改數(shù)據(jù)寫入下游數(shù)據(jù)系統(tǒng)叫倍。

33.jpg

在我們內(nèi)部,Spark 和 Flink 目前都支持 Merge 語(yǔ)法豺瘤,但 Spark 在框架層只提供了語(yǔ)法側(cè)的支持吆倦,Runtime 層的支持在 Iceberg 側(cè)由插件實(shí)現(xiàn)。Flink 則在框架中實(shí)現(xiàn)了語(yǔ)法和 Runtime 層的支持,使得 Merge 的功能更加通用,也能夠支持更多存儲(chǔ)系統(tǒng)。目前动知,在我們內(nèi)部奠滑,F(xiàn)link merge into 入湖和入庫(kù)場(chǎng)景使用較多摊崭。

34.jpg

因?yàn)閷?shí)現(xiàn)原因矮台,Spark 在我們內(nèi)部目前僅能支持 Merge into 入湖蛤迎,所以我們?cè)?Merge into 入湖場(chǎng)景下對(duì) Spark 和 Flink 的處理速度做了測(cè)試,目前中小批量數(shù)據(jù)的 Merge 操作 Flink 執(zhí)行速度會(huì)略快宜咒,大數(shù)據(jù)場(chǎng)景下 Flink 因?yàn)槿牒俣容^ Spark 慢纸镊,所以耗時(shí)稍多岔冀,但整體來(lái)看,F(xiàn)link 已經(jīng)能夠滿足日常修復(fù)需求。

四夯尽、未來(lái)展望

35.jpg

我們的未來(lái)展望主要包括以下三點(diǎn):

  • 基于 Iceberg 的秒級(jí)湖倉(cāng)建設(shè)圈纺,目前 Flink+Iceberg 在我們內(nèi)部實(shí)時(shí)鏈路中能夠很好的支持分鐘級(jí)的場(chǎng)景法褥,我們希望未來(lái)在實(shí)時(shí)性上有所突破杀饵,將鏈路新鮮度提升至秒級(jí)。
  • 基于 Iceberg 的完整 CDC 的支持葡幸。目前蔑水,如果一張 Iceberg 被多個(gè)上游并行寫入领曼,或者單個(gè)作業(yè)回溯寫入府适,我們需要使用 Upsert 模式疟暖,寫入+I 或+U 前默認(rèn)寫入一條-D,但因?yàn)槿鄙傩畔⒆嚎模瑢懭氲?Delete 可能是多余的且無(wú)法獲取到正確的非主鍵列值缘圈,我們希望在后期能夠?qū)ζ渫晟疲沟孟掠文軌蜃x取到正確完整的 CDC 數(shù)據(jù)袜蚕。
  • 跟進(jìn)基于 Flink SQL Gateway糟把,完善動(dòng)態(tài)查詢的支持。

點(diǎn)擊查看原文視頻 & 演講PPT

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末牲剃,一起剝皮案震驚了整個(gè)濱河市糊饱,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌颠黎,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,607評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件滞项,死亡現(xiàn)場(chǎng)離奇詭異狭归,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)文判,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,239評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門过椎,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人戏仓,你說(shuō)我怎么就攤上這事疚宇⊥鍪螅” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 164,960評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵敷待,是天一觀的道長(zhǎng)间涵。 經(jīng)常有香客問(wèn)我,道長(zhǎng)榜揖,這世上最難降的妖魔是什么勾哩? 我笑而不...
    開(kāi)封第一講書人閱讀 58,750評(píng)論 1 294
  • 正文 為了忘掉前任,我火速辦了婚禮举哟,結(jié)果婚禮上思劳,老公的妹妹穿的比我還像新娘。我一直安慰自己妨猩,他們只是感情好潜叛,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,764評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著壶硅,像睡著了一般威兜。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上森瘪,一...
    開(kāi)封第一講書人閱讀 51,604評(píng)論 1 305
  • 那天牡属,我揣著相機(jī)與錄音,去河邊找鬼扼睬。 笑死逮栅,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的窗宇。 我是一名探鬼主播措伐,決...
    沈念sama閱讀 40,347評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼军俊!你這毒婦竟也來(lái)了侥加?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書人閱讀 39,253評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤粪躬,失蹤者是張志新(化名)和其女友劉穎担败,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體镰官,經(jīng)...
    沈念sama閱讀 45,702評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡提前,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,893評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了泳唠。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片狈网。...
    茶點(diǎn)故事閱讀 40,015評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出拓哺,到底是詐尸還是另有隱情勇垛,我是刑警寧澤,帶...
    沈念sama閱讀 35,734評(píng)論 5 346
  • 正文 年R本政府宣布士鸥,位于F島的核電站闲孤,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏础淤。R本人自食惡果不足惜崭放,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,352評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望鸽凶。 院中可真熱鬧币砂,春花似錦、人聲如沸玻侥。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 31,934評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)凑兰。三九已至掌桩,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間姑食,已是汗流浹背波岛。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 33,052評(píng)論 1 270
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留音半,地道東北人则拷。 一個(gè)月前我還...
    沈念sama閱讀 48,216評(píng)論 3 371
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像曹鸠,于是被迫代替她去往敵國(guó)和親煌茬。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,969評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容