摘要:本文整理自字節(jié)跳動基礎(chǔ)架構(gòu)工程師李國君,在 Streaming Lakehouse Meetup 的分享。幸福里業(yè)務(wù)是一種典型的交易、事務(wù)類型的業(yè)務(wù)場景,這種業(yè)務(wù)場景在實時數(shù)倉建模中遇到了諸多挑戰(zhàn)百揭。本次分享主要介紹幸福里業(yè)務(wù)基于 Flink & Paimon 構(gòu)建流式數(shù)倉的實踐經(jīng)驗,從業(yè)務(wù)背景蜓席、流批一體數(shù)倉架構(gòu)器一、實踐中遇到的問題和解決方案,借助 Paimon 最終能拿到的收益厨内,以及未來規(guī)劃方面進行介紹祈秕。
一、業(yè)務(wù)背景
幸福里業(yè)務(wù)是字節(jié)旗下關(guān)于房產(chǎn)的業(yè)務(wù)線雏胃,圍繞這個業(yè)務(wù)有很多針對 BP 支持的方向请毛,其中最重要的方向之一就是工單系統(tǒng)。工單系統(tǒng)面向的用戶是幸福里業(yè)務(wù)線一線的經(jīng)紀人和門店經(jīng)理等瞭亮。如下圖所示方仿,我們可以看下通過工單系統(tǒng),數(shù)據(jù)是如何產(chǎn)生和流轉(zhuǎn)的统翩。
首先由經(jīng)紀人將已完成的代看任務(wù)提交工單仙蚜,后續(xù)相應(yīng)的門店經(jīng)理會對該工單進行審核,在這個過程中就產(chǎn)生了兩條數(shù)據(jù)厂汗,并將其更新到業(yè)務(wù)庫的 Binlog 數(shù)據(jù)委粉,作為實時數(shù)倉的數(shù)據(jù)源進行計算后生成數(shù)據(jù)報表或直接用于一些考核系統(tǒng)。其中數(shù)據(jù)報表用于展示評估一線經(jīng)紀人的工作是否達標等娶桦;考核系統(tǒng)則用于門店經(jīng)理為一線經(jīng)紀人設(shè)定考核任務(wù)量的工作系統(tǒng)艳丛,通過任務(wù)量標準自動反饋獎勵等匣掸。因此在以上應(yīng)用的實時數(shù)倉建模上趟紊,我們發(fā)現(xiàn)房產(chǎn)類業(yè)務(wù)有兩個典型的特點:
準確性要求 100%氮双,不能有數(shù)據(jù)丟失和重復(fù)的情況發(fā)生。
需要全量計算霎匈,增量數(shù)據(jù)在 MQ 留存時間有限戴差,需要拿到全量數(shù)據(jù) View 進行計算。
實時數(shù)倉建模特點
在實際業(yè)務(wù)的實時數(shù)倉 Pipeline 中铛嘱,進入實時數(shù)倉前有多個數(shù)據(jù)源暖释,每個數(shù)據(jù)源的特點也都不同,所以實時增量部分會存在 MQ 中墨吓,全量數(shù)據(jù)則是存在 Hive 中球匕。
上圖實時數(shù)倉中的每一層都是由一個 Flink Streaming SQL 串聯(lián)起來的,DW 層的主要功能是把多個數(shù)據(jù)源進行 Join 打?qū)捥妫ㄟ^計算出來的寬表實現(xiàn)直接輸出進 MQ 中亮曹。由于 MQ 的留存時間有限會形成一個小時級或天級的周期性任務(wù),在一個周期結(jié)束后 MQ 中的數(shù)據(jù)最終會落到 Hive 里秘症。DWM 這一層主要的作用是聚合計算照卦,聚合計算的結(jié)果也會直接輸出到 MQ 中。每一層的計算模式都和上一層相同乡摹,實時數(shù)倉的計算結(jié)果會通過 Service 層服務(wù)于在線的數(shù)據(jù)應(yīng)用役耕,比如上面提到的數(shù)據(jù)報表和考核系統(tǒng)。每層輸出的 Hive 離線數(shù)據(jù)可以用于 BP 同學(xué)做數(shù)據(jù)排查/驗證等離線查詢工作聪廉。
回顧實時數(shù)倉的兩個特點瞬痘,一是準確性要求 100%,也就是說要求整個數(shù)倉的實時任務(wù)狀態(tài)算子都要維護全量數(shù)據(jù)板熊;二是需要全量計算框全,是指由于異構(gòu)存儲,實時數(shù)據(jù)存在 MQ邻邮,歷史數(shù)據(jù)存在 Hive竣况,那么就使得每層消費的 MQ 都需要實時消費增量數(shù)據(jù)和 Hive 全量數(shù)據(jù)。從開發(fā)工程師的視角這套實時數(shù)倉模型存在如下痛點:
在開發(fā)過程中需要時刻關(guān)注業(yè)務(wù)邏輯之外的邏輯筒严,比如在 SQL 中對數(shù)據(jù)的重復(fù)處理丹泉;在數(shù)據(jù)去重過程中,使用單一字段處理不夠精準鸭蛙,需要引入 Nanotime 做非確定性計算來解決問題等摹恨。之所以存在以上問題,主要是因為在整個鏈路中娶视,實時數(shù)據(jù)和離線數(shù)據(jù)是分開存儲的晒哄,這種存儲異構(gòu)使得兩部分的數(shù)據(jù)天然很難對齊睁宰。
這里的數(shù)據(jù)運維包含三個部分:數(shù)據(jù)排查各墨、數(shù)據(jù)驗證和數(shù)據(jù)訂正爹土。存在的問題是,在數(shù)據(jù)排查和數(shù)據(jù)驗證的過程中曼振,如果發(fā)現(xiàn)某條鏈路上的某個 SQL 作業(yè)需要訂正较木。訂正完成的 SQL 的結(jié)果輸出到 MQ 中红符,需要再將 MQ 中的數(shù)據(jù)落盤到存儲中的操作會產(chǎn)生 T+1 的代價。另外在訂正過程中的中間結(jié)果回退會直接暴露給用戶伐债。
第二個問題是如上圖紫色部分是一個簡化的鏈路预侯,而在實際生產(chǎn)過程中的復(fù)雜度很高,體現(xiàn)在主鏈路上的是一些表和任務(wù)會被其他很多任務(wù)或表依賴峰锁,使得訂正過程會影響到很多不可預(yù)知的表或任務(wù)萎馅。造成以上問題的原因,主要有兩點虹蒋,一個是數(shù)據(jù)訂正產(chǎn)生結(jié)果回退暴露給用戶糜芳,另外則是血緣關(guān)系復(fù)雜且需要人為維護。
在當前的這條鏈路上千诬,F(xiàn)link 實時任務(wù)的狀態(tài)維護是非常大的耍目,這就造成存儲和計算資源的消耗非常大,從這么大的狀態(tài)中恢復(fù)作業(yè)的過程也會很慢徐绑。產(chǎn)生狀態(tài)大問題的兩大原因主要是去重算子維護全量數(shù)據(jù)狀態(tài)和級聯(lián) Join 狀態(tài)重復(fù)邪驮。
為什么選擇 Paimon
基于以上存在的痛點,我們考慮希望通過 Flink 生態(tài)搭建 Steaming Lakehouse 的組合來解決原始鏈路上的問題傲茄,如上圖所示毅访,原始鏈路存在的問題有:
存儲異構(gòu),Base+Delta 數(shù)據(jù)難對齊盘榨;
去重引入非確定性計算和大狀態(tài)喻粹;
血緣關(guān)系復(fù)雜 & 數(shù)據(jù)訂正結(jié)果回退暴露給用戶。
對應(yīng)解決原始鏈路的問題草巡,我們選擇了 Paimon:
流批一體的存儲可以以統(tǒng)一 Table 對外輸出守呜,實時和離線數(shù)據(jù)可以存儲到一張 Paimon 表中,直接解決了對齊的問題山憨;
不需要去重查乒,Changelog Producer 代替狀態(tài)算子,同時支持在存儲上產(chǎn)生完整的 Log郁竟,并將其持久化代替原有鏈路上的狀態(tài)算子玛迄;
血緣管理 & 數(shù)據(jù)一致性管理,支持無感知數(shù)據(jù)訂正棚亩。
二蓖议、流式數(shù)倉實踐
首先介紹流式數(shù)倉實踐過程中的架構(gòu)設(shè)計虏杰,如下圖所示:
存儲層選用了 HDFS 或 S3 的對象存儲作為存儲底座,選用 Paimon 作為統(tǒng)一的 Table 抽象勒虾;
計算層選用 Flink 同一的技術(shù)棧纺阔,統(tǒng)一了流批計算;
數(shù)據(jù)管理層實現(xiàn)了 Table 的血緣管理和數(shù)據(jù)的血緣管理从撼,基于這樣的血緣管理可以做到數(shù)據(jù)一致性州弟,血緣管理可以用于數(shù)據(jù)溯源的需求,為數(shù)據(jù)質(zhì)量提供保障低零。
數(shù)據(jù)一致性管理,流批一體 ETL 數(shù)據(jù)管理拯杠。在多表一致性聯(lián)調(diào)的時候掏婶,可以自動對齊數(shù)據(jù),不需要開發(fā)人員手動對齊潭陪。
如上圖可見上層通過 Gateway 或 Service 層對外提供服務(wù)雄妥,最終用戶通過 SQL Client 或是 Rest API 訪問整個系統(tǒng)。
上圖是流式數(shù)倉 Pipeline依溯。數(shù)據(jù)源和前面提到的一樣老厌,離線數(shù)據(jù)存在 Hive 中,實時數(shù)據(jù)存在 MQ 中黎炉。不同的是在進入 Streaming Lakehouse 的時候枝秤,設(shè)置了一個 ODS 層,這層會通過 Flink Streaming SQL 把每一個數(shù)據(jù)源沉淀到 Paimon Table 里慷嗜。第二層是 DWD 層淀弹,通過對多個數(shù)據(jù)源進行 Join 打?qū)挼牟僮鳎瑢⑤敵龅慕Y(jié)果沉淀到 Paimon Table 里庆械。再通過最后一層 APP 層做指標聚合以及透出的工作薇溃。
由于中間數(shù)據(jù)都沉淀在 Paimon Table 中,所以開發(fā)人員在做數(shù)據(jù)排查和驗證的時候可以直接操作缭乘。通過上圖實時數(shù)倉的 Pipeline 可以看到存儲上是流批一體的沐序,在計算上也是用 Flink 的技術(shù)棧統(tǒng)一了流批計算,這樣可以減少開發(fā)和運維的成本堕绩。而且中間數(shù)據(jù)沉淀也是可直接查詢的策幼,不需要在運維的時候做更多繁瑣的操作。
在完成上述 Streaming Lakehouse 實踐落地后總結(jié)了如下收益:
- 簡化開發(fā)流程
流批一體存儲可以解決實時和離線存儲異構(gòu)的問題逛尚;
減少業(yè)務(wù)入侵垄惧,移除去重算子,解決非確定性計算绰寞。
- 提升運維體驗
中間數(shù)據(jù)可查到逊;數(shù)據(jù)可追溯铣口;
血緣關(guān)系 & 多表一致性,增強了多表關(guān)聯(lián)調(diào)試能力觉壶,并且可以做到數(shù)據(jù)訂正無感知脑题。
- 減少狀態(tài)量
Changelog 持久化,可以減少30%的狀態(tài)量铜靶。
在實踐過程中叔遂,除了獲得了不少收益,也同樣遇到了新的問題争剿,主要包括兩個:
數(shù)據(jù)新鮮度差:端到端的延遲變化為分鐘級已艰,數(shù)據(jù)新鮮度降低;
小文件問題:一些小文件可能會影響讀寫性能蚕苇。
三哩掺、流式數(shù)倉的調(diào)優(yōu)
端到端延遲調(diào)優(yōu)
首先要分析下整個鏈路數(shù)據(jù)的可見性與什么相關(guān)。如上圖所示涩笤,Source 在收到數(shù)據(jù)之后嚼吞,會把這些 Records 源源不斷的發(fā)給 Bucket,然后 Bucket Writer 在收到數(shù)據(jù)后蹬碧,先把這些數(shù)據(jù)緩存到一個基于內(nèi)存的 Buffer舱禽,存滿之后會觸發(fā)一個 Flash 將這個 Buffer 里的數(shù)據(jù)全部都 Flash 到磁盤上。這個時候就生成了對外不可見的數(shù)據(jù)文件恩沽,只有當上游觸發(fā)了一個 Checkpoint 的時候誊稚,整個鏈路中 Commit 算子生成一個 Snapshot 指向剛生成的數(shù)據(jù)文件才能對外可見。
分析整個流程飒筑,可以得出兩個結(jié)論:
數(shù)據(jù)可見性與 Checkpoint 綁定片吊。更嚴格的說是一個周期的數(shù)據(jù)可見性與 Checkpoint 周期嚴格綁定。
Checkpoint 周期 = Checkpoint interval + Checkpoint latency协屡。Checkpoint interval 是 Checkpoint 觸發(fā)的頻率俏脊;Checkpoint latency 是整個完成一個 Checkpoint 所需的耗時。
因此在我們在做端到端調(diào)優(yōu)的時候肤晓,是否只需要針對 Checkpoint 周期做相關(guān)調(diào)整就可以呢爷贫?最簡單的是不是將 Checkpoint interval 進行調(diào)小操作呢?
在得出結(jié)論前我們先來看下寫入流程补憾。在 Paimon Sink 算子中漫萄,Bucket Writer 會源源不斷的把數(shù)據(jù)開放到磁盤的數(shù)據(jù)文件里,另外 Paimon Sink 還包含另外一個組件 Compact Manager盈匾。這個組件主要是針對磁盤上的數(shù)據(jù)文件不斷的做 Compaction腾务。如上圖右側(cè)展示,Compaction 在邏輯上是個 Bucket削饵,在存儲上是一個目錄岩瘦,目錄下會存放很多數(shù)據(jù)文件未巫,這些文件是由 LSM 樹組織的,分為多個 Level启昧。實際上 Compact Manager 在做 Compaction 的時候就是針對這些不同層的數(shù)據(jù)做的過程叙凡。
所以我們推斷,整個 Compaction 過程是一個 I/O 比較多的操作過程密末,假設(shè)一味的調(diào)小 Checkpoint Interval握爷,會導(dǎo)致頻繁的 Checkpoint,比如原來 100 條數(shù)據(jù)本來是能分一個文件里的严里,但是現(xiàn)在 Checkpoint 頻率過高后新啼,這 100 條數(shù)據(jù)可能會被分到多個文件里,那么每個文件里面的數(shù)據(jù)都會很小田炭。其次师抄,小文件過多,會讓 Compaction 的整體代價變得更高教硫,也會影響寫入的性能。其實這就是一個追求數(shù)據(jù)新鮮度的過程辆布,主要需要在數(shù)據(jù)的寫入性能和數(shù)據(jù)新鮮度上做權(quán)衡瞬矩。在經(jīng)過多方實踐驗證后,推薦將 Checkpoint Interval 設(shè)置為 1-2 分鐘為優(yōu)锋玲。
Checkpoint Latency 優(yōu)化可以分為幾個方向進行:
- Log-Based 增量 Checkpoint
利用 Flink 高版本的一些特性景用,如 Log-based 增量 Checkpoint 的方式去優(yōu)化上傳階段的耗時。
- 減少狀態(tài)量
比如減少上傳輸數(shù)據(jù)量惭蹂,那么上傳耗時就會減少伞插。
- Checkpoint 持續(xù)上傳
持續(xù)上傳本地狀態(tài)文件。
- 搭建獨立 HDFS 集群
減少遇到慢節(jié)點的概率盾碗。
[圖片上傳失敗...(image-75a910-1695032595538)]
經(jīng)過以上四種方向的優(yōu)化媚污,我們在實踐中得到驗證的結(jié)果是可以將端到端的延遲做到分鐘級。
小文件優(yōu)化
字節(jié)內(nèi)部的實踐是基于 HDFS 為存儲底座的廷雅,我們將小文件定義為明顯小于 HDFS 上一個 Block 大小的文件耗美。小文件引出最直接的問題就是文件數(shù)量太多導(dǎo)致需要更多的 Block,比如 Create Block航缀,Delete Block等商架,直接的影響就是 I/O 操作頻繁,會導(dǎo)致 HDFS 上的 NamaNode 壓力過大對穩(wěn)定性產(chǎn)生影響芥玉;另外蛇摸,無論文件本身有多大,它的 Block 的元信息是固定的灿巧,而這些元信息都是存在 NameNode 內(nèi)存里的赶袄,過多的 Block 元信息會造成內(nèi)存 OOM 問題揽涮;當數(shù)據(jù)太分散/文件數(shù)量太多時,數(shù)據(jù)就有可能被分配到更多的 HDFS 的 DataNode 里弃鸦,就會造成 DataNode 的來回跳轉(zhuǎn)绞吁,增加頻繁的隨機讀寫,使效率和性能變低唬格;并且分配的 DataNode 變多遇到慢節(jié)點的概率也會變大家破。
在小文件相關(guān)的問題中,決定是否產(chǎn)生小文件的時機和因素有以下幾點:
文件生成购岗。數(shù)據(jù)文件在磁盤上生成是有兩個觸發(fā)時機的汰聋,一個是 Checkpoint 的時候,它會強制把當前的 WriteBuffer 里的數(shù)據(jù)刷到磁盤上喊积;第二個是 WriteBuffer烹困,當它滿了也會把內(nèi)存里面的數(shù)據(jù)刷到磁盤上。如果把 Checkpoint Interval 調(diào)的過小乾吻,或是把 WriteBuffer 容量設(shè)置的比較小髓梅,那么數(shù)據(jù)就會更頻繁的被刷到磁盤上,而產(chǎn)生過量的小文件绎签。
文件劃分枯饿。通過設(shè)置一些 Partition key 或 Bucket key,就決定了數(shù)據(jù)的走向诡必,它會落在哪些文件里奢方。比如,生產(chǎn)中實際數(shù)量非常小爸舒,同時又設(shè)置了過多的 Bucket蟋字,那么可以預(yù)見,一個 Bucket 可以分到的數(shù)據(jù)量一定會比較小扭勉。這個過程中也會遇到小文件問題鹊奖。另外,如果設(shè)置 Partition key 或 Bucket key 不合理剖效,可能會產(chǎn)生一些文件傾斜的情況嫉入,即熱 Key 問題。
文件清理璧尸。Paimon 具有文件清理機制咒林,在 Compaction 過程中會刪除一些無用的文件。另外爷光,數(shù)據(jù)由 Snapshot 管理垫竞,如果 Snapshot 過期,就會從磁盤上刪除對應(yīng)的數(shù)據(jù)文件。如果 Compaction 觸發(fā)條件和 Snapshot 過期條件沒有管理好欢瞪,也會造成冗余的文件留在磁盤上活烙。
基于以上的介紹,分享一下我們在實踐過程中積累的一些小文件調(diào)優(yōu)參數(shù)遣鼓,見下表所示啸盏。
Checkpoint interval::推薦在 1-2 min 比較合適;
WriteBuffer 大衅锼睢:推薦使用默認值回懦,除非遇到相關(guān)問題需要調(diào)整;
業(yè)務(wù)數(shù)據(jù)量:可以根據(jù)業(yè)務(wù)數(shù)據(jù)量調(diào)節(jié) Bucket 數(shù)次企,調(diào)整依據(jù)為單個 Bucket 大小在 1G 左右比較合適怯晕;
Key 的設(shè)置:可以根據(jù)實際的業(yè)務(wù) Key 特點設(shè)置合適的 Bucket-key、Partition缸棵,以防產(chǎn)生熱 Key 傾斜的問題舟茶;
Compaction 管理和 Snapshot 管理相關(guān)參數(shù):推薦使用默認值,除非遇到相關(guān)問題需要調(diào)整堵第。
經(jīng)歷了整個架構(gòu)改造之后吧凉,我們將原有實時數(shù)倉鏈路做了對比,如下圖可見踏志,在很多指標上都獲得了一定的收益客燕。
端到端延遲:在原始實時數(shù)倉開啟 Mini-batch 的情況下,端到端延遲沒有明顯退化狰贯,可以支持 1-2 min 的近實時可見;
數(shù)據(jù)排查時效性:可以從小時級提升到分鐘級赏廓;
狀態(tài)量節(jié)省了約 30%涵紊;
開發(fā)周期縮短約 50%。
四幔摸、未來規(guī)劃
當前主要規(guī)劃了以下四個方向:
首先摸柄,秒級端到端延遲的嘗試〖纫洌可能會分幾期來做驱负,計劃引入 Embeded Log System 來解決這個問題。長期來看患雇,會把數(shù)據(jù)可見性與 Checkpoint 解綁跃脊;
其次,數(shù)據(jù)一致性管理苛吱。血緣關(guān)系管理和數(shù)據(jù)一致性管理這兩個方面酪术,在實際數(shù)據(jù)運維中是很有價值的;
第三,狀態(tài)復(fù)用绘雁。狀態(tài)復(fù)用主要是解決 Join 狀態(tài)復(fù)用的問題橡疼。另外,我們希望可以做到中間狀態(tài)可查庐舟;
第四欣除,監(jiān)控運維。未來當規(guī)模上去挪略,希望可以建立監(jiān)控體系历帚,并做到指標可觀測。
Q&A
Q:請問在數(shù)據(jù)源異構(gòu)的情況下瘟檩,是否考慮過其他入湖的技術(shù)選型抹缕?為何最終選擇了 Paimon?
A:在最初技術(shù)選型的時候墨辛,主要考慮幾個點卓研,一個是跟 Flink 生態(tài)的結(jié)合,一個是 Streaming Warehouse 這種模型睹簇,當時與這兩點結(jié)合最好的是 Paimon奏赘,另外在我們 Steaming upsert 的主流場景下情況更優(yōu)。
另外太惠,對于中間存儲層磨淌,是 Plugin 的模式,不是說一定要和 Paimon 做很深的綁定凿渊。
Q:請問在做數(shù)據(jù)回溯梁只、快照和回滾的時候,有做過哪些考慮埃脏?能夠給一些可供參考的建議搪锣?
A:在這個方面我們主要是基于 Paimon 做了血緣管理的功能。血緣關(guān)系管理簡單來講分為兩個部分:第一部分是表的血緣關(guān)系管理彩掐;第二部分是數(shù)據(jù)的血緣關(guān)系管理构舟。
表的血緣關(guān)系管理,比如在提交作業(yè)的時候堵幽,通過任務(wù)可以提取出它上下游表的信息狗超,然后插入到引入的 System Database 里。數(shù)據(jù)血緣關(guān)系管理朴下,可以根據(jù) Checkpoint 去劃分數(shù)據(jù)版本努咐,一個 Checkpoint 完成之后就意味著一個版本數(shù)據(jù)的產(chǎn)生。然后再把具體消費了哪個版本的數(shù)據(jù)桐猬,記錄到 System Database 里麦撵。
基于這兩種血緣關(guān)系管理,既可以保持舊鏈路在線服務(wù)的狀態(tài),也能保障新鏈路回溯數(shù)據(jù)或訂正數(shù)據(jù)成為可能免胃。在生產(chǎn)環(huán)境中音五,由系統(tǒng)層面把表自動切換,就可以完成一次數(shù)據(jù)回溯羔沙。
Q:請問用流本身去處理數(shù)據(jù)躺涝,如果數(shù)據(jù)量過大,是否會造成獲取數(shù)據(jù)源開口的環(huán)節(jié)擁堵扼雏,以至于數(shù)據(jù)進不來坚嗜?
A:這是一個寫入性能優(yōu)化的問題,在 Paimon 官網(wǎng)上有專門針對這塊的詳細指導(dǎo)诗充,大家可以去了解下苍蔬。
Flink Forward Asia 2023 正式啟動