本文整理自阿里云高級開發(fā)工程師佳遂,F(xiàn)link Committer 羅宇俠老師在 Flink Forward Asia 2024上海站分論壇流批一體(二)中的分享台妆,內(nèi)容主要分為以下四個部分:
一昆雀、湖流割裂的現(xiàn)狀和挑戰(zhàn)
二词身、Fluss 湖流一體架構(gòu)
三冶匹、湖流一體架構(gòu)的收益
四椰弊、未來規(guī)劃
一、湖流割裂的現(xiàn)狀和挑戰(zhàn)
從 Lambda 架構(gòu)到數(shù)據(jù)湖統(tǒng)一存儲架構(gòu)
在大數(shù)據(jù)處理領(lǐng)域温鸽,Lambda 架構(gòu)是使用非常廣泛的一種架構(gòu)保屯。Lambda 架構(gòu)將數(shù)據(jù)處理分成單獨的兩條鏈路,一條是離線計算鏈路涤垫,通常由 Hive 作為離線計算鏈路的存儲姑尺,另外一條是實時鏈路,通常由流存儲蝠猬,如 Kafka 作為實時鏈路的存儲切蟋。
隨著技術(shù)的演進(jìn), Apache Paimon榆芦,Apache Iceberg 柄粹,Apache Hudi 等湖存儲在支持大數(shù)據(jù)量的批式計算的基礎(chǔ)上喘鸟,還可以提供分鐘級別的數(shù)據(jù)新鮮度,Lamda 架構(gòu)中的兩套不同的存儲逐漸被統(tǒng)一的數(shù)據(jù)湖存儲替代了驻右。數(shù)據(jù)湖統(tǒng)一存儲極大地簡化整個架構(gòu)什黑,并且可以同時滿足離線和近實時(分鐘級別數(shù)據(jù)新鮮度)的需求,逐漸開始變得流行堪夭。
盡管數(shù)據(jù)湖統(tǒng)一存儲架構(gòu)非常簡潔和高效兑凿,但是最多只能提供分鐘級別的數(shù)據(jù)新鮮度。雖然部分場景對數(shù)據(jù)新鮮度要求不高茵瘾,但是數(shù)據(jù)新鮮度的重要性依然不容忽視礼华。在對數(shù)據(jù)新鮮度具有強(qiáng)訴求(秒級延遲)的場景下,如實時用戶圈選拗秘,異常檢測圣絮,廣告歸因等等,不可避免地將在整個架構(gòu)中又引入可以提供秒級數(shù)據(jù)新鮮度的流存儲雕旨,如 Kafka扮匠。
湖流割裂面臨的挑戰(zhàn)
當(dāng)引入 Kafka 后,整個架構(gòu)又回到了 Lambda 架構(gòu)了凡涩,依然是兩套存儲棒搜,只不過傳統(tǒng)的 Lambda 架構(gòu)的 Hive 換成了湖存儲而已。
湖存儲和流存儲割裂地混在 Lambda 架構(gòu)當(dāng)中活箕,湖流割裂在架構(gòu)層面和數(shù)據(jù)層面都面臨著不小的挑戰(zhàn):
-
架構(gòu)
架構(gòu)復(fù)雜:兩套存儲力麸,兩套代碼,兩條鏈路育韩,數(shù)據(jù)開發(fā)周期長
運(yùn)維監(jiān)控繁瑣:每套存儲都需要單獨的故障排除克蚂,監(jiān)控,升級等
資源浪費(fèi):同樣的邏輯計算兩遍筋讨,需要更多的資源
-
數(shù)據(jù)
數(shù)據(jù)一致性:流存儲鏈路和湖存儲鏈路很難保證結(jié)果一致
數(shù)據(jù)治理:元數(shù)據(jù)管理埃叭,數(shù)據(jù)血緣,數(shù)據(jù)質(zhì)量等在兩套存儲中治理起來難度很大
數(shù)據(jù)冗余:相同的數(shù)據(jù)重復(fù)存儲在兩套存儲中
湖流一體的業(yè)界趨勢
引入流存儲本身并不是一個問題悉罕,天下沒有免費(fèi)的午餐赤屋,要秒級的數(shù)據(jù)新鮮度勢必要引入流存儲。核心問題在于不能讓流存儲和湖存儲互相割裂壁袄,理想的架構(gòu)應(yīng)該是流和湖互為一體类早,互為補(bǔ)充,湖提供高效的歷史數(shù)據(jù)處理能力然想,而流存儲提供秒級數(shù)據(jù)新鮮度和Serving 能力莺奔。
業(yè)界知名的流存儲廠商也在湖流一體方向上做了不少工作欣范,Kafka 的商業(yè)化公司 Confluent 提出 TableFlow变泄,Kafka 中的數(shù)據(jù)將被 TableFlow 轉(zhuǎn)成 Iceberg 格式令哟,分析引擎可以直接在 Iceberg 格式的數(shù)據(jù)上進(jìn)行高效查詢;RedPanda 公司也提出了 Iceberg Topic妨蛹,如果一個 Topic 是 Iceberg Topic屏富,數(shù)據(jù)也將同時轉(zhuǎn)成 Iceberg 格式,分析引擎也可以查 Iceberg 格式的表蛙卤。
可以看到狠半,流和湖之間正在逐步靠攏,各補(bǔ)所長颤难,可以預(yù)見神年,未來流和湖的結(jié)合只會越來越緊密。不過他們提出的湖與流的結(jié)合還有很多改進(jìn)空間行嗤,比如數(shù)據(jù)依然存在冗余存儲的問題已日,讀依然是兩套 API 等。并且它們其實是自己在已有的流存儲的基礎(chǔ)上與湖進(jìn)行結(jié)合栅屏,很難做到完美融合飘千,比如流存儲在數(shù)據(jù)分布上其實就很難和湖存儲對齊,湖存儲有分區(qū)表的概念栈雳,但是Kafka 就沒有护奈。當(dāng)然,還會有各種各樣的其他的問題哥纫,簡而言之霉旗,它們并不是湖原生的。
二蛀骇、Fluss 湖流一體架構(gòu)
湖流一體是未來數(shù)據(jù)湖和流存儲發(fā)展的一個大趨勢奖慌,F(xiàn)luss 作為一款面向?qū)崟r分析設(shè)計的流存儲,從一開始就采用了湖流一體的架構(gòu)設(shè)計松靡,可以更好的融入用戶已有的 Lakehouse 架構(gòu)简僧。關(guān)于 Fluss 更多細(xì)節(jié)介紹,可以查看 FFA2024 主論壇上的 Fluss 分享《Fluss:面向面向?qū)崟r分析設(shè)計的流存儲》雕欺。Fluss 的核心特性包括實時的流讀流寫岛马、列式裁剪、流式的更新屠列、CDC訂閱啦逆、實時點查、還有湖流一體笛洛。本文將著重介紹湖流一體的設(shè)計和收益夏志。Fluss 湖流一體的架構(gòu)如下圖所示:
Fluss 的 server 集群提供了數(shù)據(jù)的實時寫入和讀取,提供了毫秒級的端到端延遲苛让。同時沟蔑,F(xiàn)luss 的 Compaction Service 會將 Fluss 中的數(shù)據(jù) compact 成標(biāo)準(zhǔn)的湖格式湿诊,如 Paimon/Iceberg 等,外部的查詢引擎就可以直接在湖格式上的數(shù)據(jù)進(jìn)行分析瘦材。
另外厅须,最新的數(shù)據(jù)在 Fluss 中,歷史數(shù)據(jù)在 Paimon 中食棕,F(xiàn)link 可以支持 Union Read朗和,將Fluss 和 Paimon 中的數(shù)據(jù) Union 起來實現(xiàn)秒級新鮮度的分析。
統(tǒng)一元數(shù)據(jù)
之前流存儲 Kafka 和湖存儲 Paimon 割裂的存在簿晓,其都有一套元數(shù)據(jù)眶拉,對計算引擎(如 Flink)來說是兩套單獨的 Catalog,兩張單獨的表憔儿,用戶需要創(chuàng)建兩個Catalog镀层,訪問數(shù)據(jù)的時候需要手動切換 Catalog 來確定是訪問流存儲還是湖存儲的數(shù)據(jù),十分繁瑣皿曲。
在 Fluss 構(gòu)建湖流一體架構(gòu)下唱逢,雖然 Fluss 和 Paimon 也都有單獨的元數(shù)據(jù),但是對計算引擎(如 Flink)暴露的是一個 Catalog屋休,一張統(tǒng)一的表坞古。用戶不需要切換 Catalog 也可以直接訪問湖存儲的數(shù)據(jù),以及直接訪問流存儲 Fluss中的數(shù)據(jù)劫樟,以及Union 訪問 Fluss 和湖存儲中的數(shù)據(jù)痪枫。
數(shù)據(jù)分布的對齊
Fluss 和湖存儲 Paimon 中的數(shù)據(jù)分布是嚴(yán)格對齊的,F(xiàn)luss 也有分區(qū)表叠艳,也有 bucket奶陈,并且 Fluss 的 bucketing 算法與 Paimon 的 bucketing 算法是一致的,確保了一條數(shù)據(jù)被一致地分配到同樣的 bucket附较,即Fluss 的 bucket 和 Paimon 的 bucket是一一對應(yīng)的吃粒。
這種數(shù)據(jù)分布的強(qiáng)一致性有兩個重要的好處:
- Compact 的時候避免 Shuffle 開銷
在將 Fluss 中的數(shù)據(jù) Compact 成 Paimon 格式的時候,可以直接將Fluss 的某個 bucket拒课,如 bucket1 的數(shù)據(jù)文件直接 compact 到 Paimon 的 bucket1徐勃,而不需要將 Fluss 的 bucket1 的數(shù)據(jù)讀出來,判斷每條數(shù)據(jù)屬于 Paimon 中的哪個bucket早像,然后寫到對應(yīng) bucket 中僻肖,避免了 Shuffle 的開銷。
- 避免數(shù)據(jù)的不一致
bucketing 算法是指對一條數(shù)據(jù)計算其所屬的 bucket卢鹦,F(xiàn)luss 對齊了 Paimon 使用了一致的 bucketing 算法臀脏,即 bucket_id = hash(row) % bucket_num
,并采用一樣的 hash 算法。如果 Fluss 和 Paimon 采用不一樣的 bucketing 算法的話揉稚,就會出現(xiàn)數(shù)據(jù)不一致現(xiàn)象秒啦。比如對于主鍵表來說,對于一條數(shù)據(jù) a窃植,可能 Fluss 將其分配到 bucket1帝蒿,而 Paimon 將其分配到 bucket2荐糜,如果 Compaction Service 將這條數(shù)據(jù)同步到 Paimon 中的 bucket1 的話巷怜,用戶在 Paimon 中就查不到這條數(shù)據(jù)了。而通過保證數(shù)據(jù)分布的強(qiáng)一致性暴氏,則不存在這個問題延塑。
流讀:更高效的數(shù)據(jù)回追
歷史數(shù)據(jù)在湖存儲中,實時數(shù)據(jù)在 Fluss 中答渔,在流讀場景下关带,F(xiàn)luss 先讀湖格式的歷史數(shù)據(jù)進(jìn)行數(shù)據(jù)回追,然后再讀 Fluss 的實時數(shù)據(jù)沼撕。借助湖存儲高效的過濾條件下推宋雏、列裁剪、高壓縮率等優(yōu)勢务豺,可以實現(xiàn)高效的數(shù)據(jù)回追磨总。
批讀:秒級數(shù)據(jù)新鮮度
歷史數(shù)據(jù)在湖存儲中,實時數(shù)據(jù)在 Fluss 中笼沥,在批讀場景下蚪燕,計算引擎(如 Flink)可以將 Fluss 中的數(shù)據(jù)和湖存儲中的進(jìn)行union讀,以達(dá)到極致的秒級數(shù)據(jù)新鮮度的分析奔浅。
Flink + Fluss
Fluss 對 Flink用戶暴露了統(tǒng)一的 API支持用戶選擇 union 讀還是只讀數(shù)據(jù)湖上的數(shù)據(jù)馆纳,通過如下的 SQL:
SELECT * FROM orders
代表讀取 orders 表的完整數(shù)據(jù),則 Flink 將 union 讀 Fluss和數(shù)據(jù)湖上的數(shù)據(jù)汹桦。
如果用戶只需要讀取數(shù)據(jù)湖上的數(shù)據(jù)鲁驶,可以在要讀的表后面加上 $lake
后綴,SQL 如下所示
-- analytical queries
SELECT COUNT(*), MAX(t), SUM(amount)
FROM orders$lake
-- query system tables
SELECT * FROM orders$lake$snapshots
對于只讀數(shù)據(jù)湖上的數(shù)據(jù)的場景舞骆,F(xiàn)luss 繼承了湖格式作為一個 Flink source 的全部優(yōu)化和能力灵嫌,如 runtime filter,系統(tǒng)表查詢葛作,time travel 等寿羞。
三、湖流一體架構(gòu)的收益
接下來赂蠢,以湖存儲 Paimon 為例绪穆,闡述一下在 Paimon 的基礎(chǔ)上,引入 Fluss 來構(gòu)建湖流一體架構(gòu)的收益。
數(shù)據(jù)湖的時效提升至秒級
對于 Paimon 來說玖院,數(shù)據(jù)的可見性取決于 Flink checkpoint 的時間間隔菠红,通常是分鐘級別的,但是通過 Fluss + Paimon 構(gòu)建湖流一體架構(gòu)后难菌,數(shù)據(jù)的可見性不再取決于 Flink checkpoint 的時間間隔试溯,數(shù)據(jù)進(jìn)入 Fluss 后就可見,數(shù)據(jù)的時效性提升至秒級郊酒。
數(shù)倉分層每層表數(shù)據(jù)新鮮度一致遇绞,不受層級影響
在數(shù)倉的建設(shè)過程中,為了更好地管理數(shù)據(jù)燎窘,通常會進(jìn)行分層摹闽,如 ODS層,DWD層褐健,ADS層等付鹿,原始數(shù)據(jù)會在數(shù)倉多層進(jìn)行流轉(zhuǎn)。如果只通過 Paimon 來作為每一層的存儲蚜迅,由于 Paimon 只有在Flink checkpoint 后數(shù)據(jù)才可見舵匾,其對應(yīng)的 changelog 才會流轉(zhuǎn)到下一層,那么每一層的數(shù)據(jù)新鮮度都會增加一個 checkpoint 的時間谁不。如果 Flink checkpoint 的時間設(shè)置為 3 分鐘的話坐梯,那么 ODS層,DWD層拍谐,ADS層的數(shù)據(jù)延遲將分別為3分鐘烛缔,6分鐘,9分鐘轩拨。
而如果基于 Fluss + Paimon 作為每一層的存儲践瓷,則數(shù)據(jù)新鮮度不受層級的影響,每一層的數(shù)據(jù)新鮮度都是秒級亡蓉。一條數(shù)據(jù)到達(dá) ODS 層之后晕翠,其 changelog 會立刻流轉(zhuǎn)到下一層,而不用等 Flink checkpoint 的完成砍濒,以此類推淋肾,每一層的數(shù)據(jù)新鮮度都可以保證一致。如果 Fluss 的 Compaction Service 設(shè)置3分鐘的 compact 周期爸邢,那么對于Paimon中的數(shù)據(jù)樊卓,每一層的數(shù)據(jù)延遲都是3分鐘。
更高效更高吞吐的 changelog 生成
目前Paimon 通用的 changelog 生成方式主要有兩種(Input producer 對數(shù)據(jù)源的要求較高杠河,暫不考慮)碌尔, Lookup change producer 和 Full compaction producer浇辜。
Lookup change producer 的生成方式時效性好,但是需要更多的資源唾戚。Full Compaction Producer不需要多余的資源消耗柳洋,會在 Full Compaction 的時候生成 changelog,但是時效性差叹坦,因為需要等 Full Compaction 的觸發(fā)熊镣,通常是若干個 checkpoint。
而在 Fluss + Paimon 的架構(gòu)下募书,changelog 的生成則可以兼顧時效性和性能绪囱。對于 Fluss 來說,changelog 的生成是秒級的锐膜,同時 Fluss compaction service 則可以將 Fluss 的 changelog 直接寫成 Paimon 的 changelog 格式毕箍,轉(zhuǎn)換成 Paimon changelog 這個過程是很高效的弛房,因為并不涉及到 lookup 等開銷道盏,只是一次直讀直寫。
解決 Paimon 部分更新不支持多 writer 的問題
Paimon 的部分更新是使用非常多的一個功能文捶,特別適用于大寬表的場景荷逞。但是在 Paimon 中,如果要對一個寬表進(jìn)行部分更新粹排,則需要將所有對這個表的部分更新都放到一個 SQL作業(yè)里面种远,然后又要通過一個 UNION 語句將所有對這個表部分更新的 SQL 語句 union 到一起,保證只有一個 writer 來寫這個寬表顽耳。這也導(dǎo)致了作業(yè)不好管理和單獨調(diào)優(yōu)。
而在 Fluss + Paimon 的架構(gòu)下,則沒有這個問題了圣勒。因為所有的更新會先走 Fluss盆色,由Fluss再將更新同步到 Paimon,而 Fluss 的部分更新可以支持多作業(yè)同時并發(fā)更新胰耗。所以在新架構(gòu)下限次,你可以有任意多個 SQL 作業(yè)來對這個寬表進(jìn)行任意多列的部分更新,可以進(jìn)行 per-job 級別的調(diào)優(yōu)的管理柴灯。
總結(jié)一下卖漫,通過 Fluss 來構(gòu)建湖流一體架構(gòu)將帶來如下收益:
湖存儲強(qiáng)實時化,邁向秒級數(shù)據(jù)新鮮度
統(tǒng)一湖流赠群,write once羊始,run batch & stream
降低維護(hù),降低重復(fù)數(shù)據(jù)存儲的成本查描,降低重復(fù)加工鏈路的成本
四突委、未來規(guī)劃
目前速警,F(xiàn)luss 社區(qū)在湖流一體方向上的規(guī)劃主要有以下三點:
Union Read 生態(tài):目前 Union Read 的能力只對接了 Flink, 未來將對接更多的查詢引擎鸯两,如 Spark/StarRocks等闷旧。
湖生態(tài):目前 Fluss 只支持 Paimon 作為湖存儲,未來將支持更多的湖格式钧唐,如 Iceberg/Hudi等忙灼。
Arrow -> Paruqet 的高效轉(zhuǎn)換:Fluss 使用 Arrow 作為存儲格式,湖格式使用 Parquet 作為存儲格式钝侠,而 Arrow 到 Parquet 的轉(zhuǎn)換在 Arrow 社區(qū)有非常成熟和高效的解決方案该园,未來將支持 Arrow 到 Parquet 的高效轉(zhuǎn)換,降低 compaction service 成本帅韧。