實時數(shù)倉
背景
在早期數(shù)倉建設(shè)中何荚,大多是以批處理的方式為基線進(jìn)行開發(fā)须教,隨著業(yè)務(wù)發(fā)展嗜闻,需求對于時效性和準(zhǔn)確性的要求越來越高,為能滿足日益強烈的訴求典勇,并且兼容批處理本身已有的基礎(chǔ)建設(shè)劫哼,本文結(jié)合當(dāng)前infra team所能支撐的前提下,對實時數(shù)倉構(gòu)建的一些想法
現(xiàn)有方式的一些不足
lambda
概述
作為最為經(jīng)典和廣泛應(yīng)用的設(shè)計割笙,抽象如下:
- 所有的數(shù)據(jù)需要分別寫入批處理層和流處理層
- 批處理持久化數(shù)據(jù)权烧,并做預(yù)計算
- 流處理作為速度層,對實時數(shù)據(jù)計算近似的 real-time view伤溉,作為高延遲 batch view 的快速補償視圖
- 所有查詢需要合并batch view 和 real-time view
優(yōu)勢
- 在實時性和準(zhǔn)確性上達(dá)到均衡般码,從而滿足業(yè)務(wù)訴求
- 批處理持久化存儲,數(shù)據(jù)可重放乱顾,可擴(kuò)展性和容錯性較好
- 流處理產(chǎn)出近似結(jié)果板祝,時效性上得到滿足
不足
- 數(shù)據(jù)從源頭上區(qū)分傳輸和存儲,存在數(shù)據(jù)一致性問題
- 兩套計算邏輯走净,開發(fā)維護(hù)成本較大
- 因為框架的天然劣勢券时,實時只能是近似計算,需要T+1的批處理進(jìn)行修正
kappa
概述
核心點在于去掉批處理的數(shù)據(jù)流向温技,全部用流處理代替革为,近些年也有依賴同DB的方式,即兼容OLAP和OLTP舵鳞,統(tǒng)一數(shù)據(jù)流向
優(yōu)勢
- 數(shù)據(jù)源頭不一致問題得到改善震檩,或者全部持久化到消息中間件(kafka為代表),或者consume消息中間件蜓堕,持久化到兼容DB(Tidb為代表)
- 服務(wù)層不一致問題得到改善抛虏,通過重放持久化的消息中間件或兼容DB,實現(xiàn)容錯或可擴(kuò)展目的
不足
- 無法解決存儲成本套才,以kafka為例迂猴,kafka的壓縮能力有限,存儲成本較大
- 計算資源開銷隨著窗口步長的增加而飆升背伴。例如計算月活或年報統(tǒng)計時沸毁,涉及到大狀態(tài)的緩存峰髓,無論是持久化到kafka或兼容DB,可重放性極差息尺,存在消費雪崩甚至無法消費的情況
kappa+
概述
核心點在于統(tǒng)一計算引擎携兵,同套計算邏輯消費不同流向的數(shù)據(jù),以flink為代表搂誉,無論是datastream/dataset徐紧,還是力推的flink sql,皆以此為目的
優(yōu)勢
- 計算過程不一致問題得到改善炭懊,以flink sql為例并级,流或批處理時,無需維護(hù)兩套代碼侮腹,由框架本身承接場景切換時的兼容邏輯
不足
- 不能完全兼容hive sql/spark sql嘲碧,已經(jīng)成熟的數(shù)倉體系下,切換有較大成本
- 流批sql同個function實際表達(dá)的功能可能不同
- 在流join或大數(shù)據(jù)量去重上父阻,開銷或?qū)崿F(xiàn)成本仍較大或較為困難
總結(jié)
可以看到呀潭,痛點主要有以下三點:
- 數(shù)據(jù)源的流向和存儲不同,從源頭上不能一致
- 計算邏輯或框架不同至非,計算過程不能一致
- 計算結(jié)果需要合并,服務(wù)層不能一致
因此糠聪,迭代方案主要圍繞以上三點進(jìn)行優(yōu)化
設(shè)計思路
我們假設(shè)數(shù)據(jù)源只包含業(yè)務(wù)庫(以MySQL為代表)和用戶埋點(Nginx Log)
如圖所示荒椭,實時數(shù)倉的建設(shè)思路中,大量參考和綁定離線數(shù)倉的構(gòu)建舰蟆,包括數(shù)據(jù)鏈路趣惠,加工流程,交互的Services身害。重點強調(diào)三個”Same“
Same Source
即流或批處理的源數(shù)據(jù)流向統(tǒng)一由consume kafka獲得味悄,通過at least once的機制,確保數(shù)據(jù)在消費階段不會丟失
- At least once導(dǎo)致的消費重復(fù)塌鸯,流侍瑟、批作業(yè)需根據(jù)場景選擇性的處理,比如ETL process的DB支持冪等丙猬,Application不需要任何處理涨颜。下游DB是append-only的,則在層級流轉(zhuǎn)時茧球,做額外的去重庭瑰、排序處理
- Producer的ack也是需要根據(jù)場景靈活調(diào)整。例如cdc類絕不允許丟失的情況(kafka極端丟失的case除外)抢埋,可以給-1弹灭。針對埋點類數(shù)據(jù)量龐大的督暂,為了保證吞吐,容忍一部分潛在的丟失風(fēng)險穷吮,可以給1
- Same source只確保流逻翁、批作業(yè)的original datasource一致,避免傳統(tǒng)lambda的方式中酒来,兩套作業(yè)是完全獨立的路徑卢未,為之后的數(shù)據(jù)一致性、補償機制上帶來大量的工作量
- 針對cdc類數(shù)據(jù)堰汉,批處理則需要額外的row number來維護(hù)T+1的狀態(tài)
Same SQL
即無論是流辽社、批處理,所處理的(submit application)的引擎不同翘鸭,語義可能也不同滴铅,但需要確保業(yè)務(wù)(加工)邏輯一致
- 在主流引擎中,流就乓、批處理統(tǒng)一用一套框架不是很現(xiàn)實汉匙。flink對批處理的兼容程度(例如大量的Hive/Spark built-in func不支持,對復(fù)雜數(shù)據(jù)類型支持較弱生蚁,讀Hive表數(shù)據(jù)略復(fù)雜)不夠噩翠。Spark對流處理,在state和event time上沒有Flink優(yōu)秀邦投,因此還是lambda的方式伤锚,兩套引擎對應(yīng)兩種場景
- 盡量避免兩套代碼的開發(fā),最大程度上直接復(fù)用/移植批處理的代碼志衣,即Spark sql = Flink sql[+udf]
- 對于窗口聚合等語義不一致的點屯援,需要確保業(yè)務(wù)邏輯的代碼保持一致
Same DB[option]
即OLAP,OLTP兩種模式的兼容
- 終極目標(biāo)是一套DB可以兼容念脯。既能支持事務(wù)狞洋,有類cdc的通知機制,亦能支持多范圍绿店、多層次吉懊、大時間粒度的統(tǒng)計。例如OceanBase假勿、TiDB
- 在實際場景中惕它,大多是數(shù)倉ODS或DWD層的DB統(tǒng)一。特點是開發(fā)或遷徙成本低废登,ETL process不復(fù)雜淹魄,強貼合Hive或Hadoop套件。例如Hudi堡距,或流式寫Hive(+presto/impala)
- 實時數(shù)倉的DWS因為業(yè)務(wù)場景甲锡、業(yè)務(wù)需求的不同兆蕉,幾乎都是case by case的方案,很難做到一套框架+DB的統(tǒng)一
框架選擇
根據(jù)上文提到的三個“Same”缤沦,框架選擇則如下圖所示
- 業(yè)務(wù)庫和埋點數(shù)據(jù)ingest到kafka
- 計算統(tǒng)一使用Flink虎韵,且以SQL為主
數(shù)倉分層
關(guān)鍵計算邏輯描述
計算UV
流內(nèi)構(gòu)建
bitmap
即在流內(nèi)通過狀態(tài)維護(hù)bitmap,相對于HyperLogLog可以做到精準(zhǔn)去重缸废,且資源開銷在可接受范圍內(nèi)包蓝。如以下的示例:
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.days(2))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor(
"Roaring64Bitmap",
TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
}));
bitmapDescriptor.enableTimeToLive(ttlConfig);
// ...
if (!bitmap.contains(uid)) {
bitmap.addLong(uid);
bitmapState.update(bitmap);
}
缺陷在于
- 純flink sql的count distinct,state的實現(xiàn)與之不同企量,需要額外的udf
- 對于多個key而言测萎,累計開銷巨大。例如通常我們計算uv的維度組合不止一個
- 基于第2點届巩,通常業(yè)務(wù)需求的時間范圍或跨度都較大硅瞧,例如當(dāng)日實時uv,累計uv恕汇,新用戶數(shù)等腕唧。state在不設(shè)置ttl的情況下,會導(dǎo)致cp時間過長瘾英,背壓枣接,甚至流量雪崩的情況
- 強依賴于state
將count distinct 轉(zhuǎn)化為 count(1)
借助外部存儲
通俗而言,就是不借助flink本身的state缺谴,而是靠外存來維護(hù)bitmap或其他結(jié)構(gòu)月腋,在query層完成最終的統(tǒng)計(或同時在外存中維護(hù)統(tǒng)計值)
好處在于flink除IO外存時,沒有額外的開銷瓣赂,state也不大,并且因為外存的原因片拍,保證at least once即可
缺陷在于
- 外存序列化/反序列化的IO開銷
- 同樣受多個key的限制
- 寫壓力 -> 讀壓力的轉(zhuǎn)移煌集,query的性能和并發(fā)受較大影響
query下沉
流任務(wù)只承擔(dān)ETL的功能,保證數(shù)據(jù)不丟失捌省。明細(xì)數(shù)據(jù)灌入例如doris/hudi等組件苫纤,在query層完成最終的聚合統(tǒng)計。優(yōu)勢在于解放流任務(wù)纲缓,將壓力最終轉(zhuǎn)移到query卷拘。缺陷依然是在大數(shù)據(jù)量、高并發(fā)的情況下祝高,開銷和查詢性能的影響
寬表構(gòu)建
流JOIN
flink joining栗弟,即在流中完成join操作。Flink具有多樣的join語義工闺,且易用性很高乍赫。主要的問題點在于瓣蛀,流內(nèi)join時(無論是事實join事實,還是事實join維表雷厂,都在變化之中)惋增,無法判斷數(shù)據(jù)是否全部到達(dá)。因為生產(chǎn)環(huán)境數(shù)據(jù)量的原因改鲫,不可能做全關(guān)聯(lián)诈皿。窗口join則需要考慮數(shù)據(jù)延遲、是否回補像棘,retrigger的機制等稽亏,開發(fā)和維護(hù)成本急劇上升。因此大多用于對時效性要求較高讲弄,一致性要求不強措左,業(yè)務(wù)邏輯不復(fù)雜的場景
- 全關(guān)聯(lián)需要考慮大狀態(tài)的資源開銷、故障恢復(fù)的成本
- 窗口join需要考慮數(shù)據(jù)延遲帶來的額外成本避除,比如窗口等待的時長和時效性的取舍呕乎,遲到數(shù)據(jù)的回補等
- 側(cè)流數(shù)據(jù)沒有解決遲到數(shù)據(jù)的回補,從另一種意義上而言篮条,還會導(dǎo)致額外的數(shù)據(jù)重復(fù)(sink不冪等的話)
Source Union
多流union馍驯,按主鍵更新不同列,變相實現(xiàn)join的功能群井。優(yōu)勢在于状飞,流處理僅僅是ETL,邏輯復(fù)雜度低书斜∥鼙玻靠sink DB支持upsert和按列更新(或更新非空列)完成功能。但缺陷在于荐吉,依然沒有解決遲到數(shù)據(jù)的retrigger(即什么時候算數(shù)據(jù)全部到達(dá)焙糟,補充完成,下游可以重新計算)样屠,并且一定程度上犧牲了時效性保障一致性
Sink Union
分兩種情況
1.事實流union穿撮,在DB不支持按列更新的前提下,在聚合前union case when 的方式聚合計算
2.事實流關(guān)聯(lián)維表痪欲,在聚合完成后悦穿,query查詢前,點查維表去format維度屬性
與離線數(shù)倉聯(lián)動和差異
數(shù)據(jù)補償
在業(yè)務(wù)需求中业踢,并不全是只依賴或計算增量數(shù)據(jù)栗柒,例如最近7天的訂單數(shù),支付金額等知举,又或者在流任務(wù)故障傍衡,需要從離線數(shù)倉回補時深员,都需要與存量數(shù)據(jù)交互。存量數(shù)據(jù)初始化或數(shù)據(jù)補償?shù)牧鞒倘鐖D所示
利用Hudi提供的bulk_insert的write.operation(類似hbase的bulkload)將Hive表中的數(shù)據(jù)load到Hudi
任務(wù)結(jié)束后蛙埂,增量任務(wù)調(diào)起倦畅。兩者之間的records有一定堆疊,避免數(shù)據(jù)丟失
共享ODS绣的、DIM[option]
將原本小時級/天級的ODS ETL任務(wù)改由流式sink叠赐,將速度提升至分鐘/小時級
- 加速原本T+1才能產(chǎn)出的業(yè)務(wù)需求
- 確保流和批的數(shù)據(jù)源一致,除卻加工邏輯和語義的不同外屡江,不會有源數(shù)據(jù)所帶來的數(shù)據(jù)不一致
數(shù)據(jù)延遲芭概、亂序
數(shù)據(jù)丟失
維表JOIN
對于遲到數(shù)據(jù)的trigger
具體實施
主旨思想
- 對本身有窗口要求的業(yè)務(wù)需求,對實時敏感程度一般惩嘉,要求一致性的業(yè)務(wù)需求罢洲,從流 -> 批轉(zhuǎn)化
例如每15分鐘/每小時的當(dāng)天uv,pv文黎,gmv - 對原本的T+1輸出的業(yè)務(wù)需求惹苗,從天 -> 小時的轉(zhuǎn)化,即流式寫ODS耸峭,給下游層級做加速
- 對于無法規(guī)避的業(yè)務(wù)需求桩蓉,例如對實時敏感程度高,即時trigger類需求劳闹,抽象流kafka(dwd邏輯表)
- 數(shù)據(jù)流向不宜過長院究,通常為ODS -> [DWD] -> DWS
- dws因業(yè)務(wù)需求各異,沒有統(tǒng)一的方案輸出
流DW
退維
ETL
公共邏輯下沉
Hudi的一些建議
index.type的選擇
table.type的選擇
index.bootstrap.enabled的潛在問題
- 參數(shù)為是否將已存在表中的記錄的index load到flink state本涕,默認(rèn)為false
- 索引導(dǎo)入是一個阻塞過程业汰,在這期間無法完成cp
- index bootstrap由輸入數(shù)據(jù)觸發(fā),需要確保每個分區(qū)中至少有一條記錄
- index bootstrap是并發(fā)執(zhí)行的菩颖,可以在日志文件中通過finish loading the index under partition以及Load record form file觀察index bootstrap的進(jìn)度
- 第一個成功的checkpoint表明index bootstrap已完成样漆。 從checkpoint恢復(fù)時,不需要再次加載索引
- sink 多個hudi表時位他,需要將write.index_bootstrap.tasks set 為1,否則會出現(xiàn)部分index丟失的情況产场。sink 單hudi表時沒有這個限制
- exist hudi 表數(shù)據(jù)量極大時鹅髓,index導(dǎo)入會失敗
其他
- cow表強依賴于flink cp,因為寫放大的問題京景,通常建議調(diào)大cp的間隔和timeout窿冯,避免因cp失敗導(dǎo)致的數(shù)據(jù)無法更新
- flink on hudi不支持schema evolution,字段變更會頻繁的變動任務(wù)