Feed實時數(shù)倉

實時數(shù)倉

背景

在早期數(shù)倉建設(shè)中何荚,大多是以批處理的方式為基線進(jìn)行開發(fā)须教,隨著業(yè)務(wù)發(fā)展嗜闻,需求對于時效性和準(zhǔn)確性的要求越來越高,為能滿足日益強烈的訴求典勇,并且兼容批處理本身已有的基礎(chǔ)建設(shè)劫哼,本文結(jié)合當(dāng)前infra team所能支撐的前提下,對實時數(shù)倉構(gòu)建的一些想法

現(xiàn)有方式的一些不足

lambda

概述

lambda

作為最為經(jīng)典和廣泛應(yīng)用的設(shè)計割笙,抽象如下:

  • 所有的數(shù)據(jù)需要分別寫入批處理層和流處理層
  • 批處理持久化數(shù)據(jù)权烧,并做預(yù)計算
  • 流處理作為速度層,對實時數(shù)據(jù)計算近似的 real-time view伤溉,作為高延遲 batch view 的快速補償視圖
  • 所有查詢需要合并batch view 和 real-time view

優(yōu)勢

  • 在實時性和準(zhǔn)確性上達(dá)到均衡般码,從而滿足業(yè)務(wù)訴求
  • 批處理持久化存儲,數(shù)據(jù)可重放乱顾,可擴(kuò)展性和容錯性較好
  • 流處理產(chǎn)出近似結(jié)果板祝,時效性上得到滿足

不足

  • 數(shù)據(jù)從源頭上區(qū)分傳輸和存儲,存在數(shù)據(jù)一致性問題
  • 兩套計算邏輯走净,開發(fā)維護(hù)成本較大
  • 因為框架的天然劣勢券时,實時只能是近似計算,需要T+1的批處理進(jìn)行修正

kappa

概述
核心點在于去掉批處理的數(shù)據(jù)流向温技,全部用流處理代替革为,近些年也有依賴同DB的方式,即兼容OLAP和OLTP舵鳞,統(tǒng)一數(shù)據(jù)流向

kappa

優(yōu)勢

  • 數(shù)據(jù)源頭不一致問題得到改善震檩,或者全部持久化到消息中間件(kafka為代表),或者consume消息中間件蜓堕,持久化到兼容DB(Tidb為代表)
  • 服務(wù)層不一致問題得到改善抛虏,通過重放持久化的消息中間件或兼容DB,實現(xiàn)容錯或可擴(kuò)展目的

不足

  • 無法解決存儲成本套才,以kafka為例迂猴,kafka的壓縮能力有限,存儲成本較大
  • 計算資源開銷隨著窗口步長的增加而飆升背伴。例如計算月活或年報統(tǒng)計時沸毁,涉及到大狀態(tài)的緩存峰髓,無論是持久化到kafka或兼容DB,可重放性極差息尺,存在消費雪崩甚至無法消費的情況

kappa+

概述

核心點在于統(tǒng)一計算引擎携兵,同套計算邏輯消費不同流向的數(shù)據(jù),以flink為代表搂誉,無論是datastream/dataset徐紧,還是力推的flink sql,皆以此為目的

kappa+

優(yōu)勢

  • 計算過程不一致問題得到改善炭懊,以flink sql為例并级,流或批處理時,無需維護(hù)兩套代碼侮腹,由框架本身承接場景切換時的兼容邏輯

不足

  • 不能完全兼容hive sql/spark sql嘲碧,已經(jīng)成熟的數(shù)倉體系下,切換有較大成本
  • 流批sql同個function實際表達(dá)的功能可能不同
  • 在流join或大數(shù)據(jù)量去重上父阻,開銷或?qū)崿F(xiàn)成本仍較大或較為困難

總結(jié)

可以看到呀潭,痛點主要有以下三點:

  • 數(shù)據(jù)源的流向和存儲不同,從源頭上不能一致
  • 計算邏輯或框架不同至非,計算過程不能一致
  • 計算結(jié)果需要合并,服務(wù)層不能一致

因此糠聪,迭代方案主要圍繞以上三點進(jìn)行優(yōu)化

設(shè)計思路

我們假設(shè)數(shù)據(jù)源只包含業(yè)務(wù)庫(以MySQL為代表)和用戶埋點(Nginx Log)

如圖所示荒椭,實時數(shù)倉的建設(shè)思路中,大量參考和綁定離線數(shù)倉的構(gòu)建舰蟆,包括數(shù)據(jù)鏈路趣惠,加工流程,交互的Services身害。重點強調(diào)三個”Same“

lambda+

Same Source

即流或批處理的源數(shù)據(jù)流向統(tǒng)一由consume kafka獲得味悄,通過at least once的機制,確保數(shù)據(jù)在消費階段不會丟失

same source
  • At least once導(dǎo)致的消費重復(fù)塌鸯,流侍瑟、批作業(yè)需根據(jù)場景選擇性的處理,比如ETL process的DB支持冪等丙猬,Application不需要任何處理涨颜。下游DB是append-only的,則在層級流轉(zhuǎn)時茧球,做額外的去重庭瑰、排序處理
  • Producer的ack也是需要根據(jù)場景靈活調(diào)整。例如cdc類絕不允許丟失的情況(kafka極端丟失的case除外)抢埋,可以給-1弹灭。針對埋點類數(shù)據(jù)量龐大的督暂,為了保證吞吐,容忍一部分潛在的丟失風(fēng)險穷吮,可以給1
  • Same source只確保流逻翁、批作業(yè)的original datasource一致,避免傳統(tǒng)lambda的方式中酒来,兩套作業(yè)是完全獨立的路徑卢未,為之后的數(shù)據(jù)一致性、補償機制上帶來大量的工作量
  • 針對cdc類數(shù)據(jù)堰汉,批處理則需要額外的row number來維護(hù)T+1的狀態(tài)

Same SQL

即無論是流辽社、批處理,所處理的(submit application)的引擎不同翘鸭,語義可能也不同滴铅,但需要確保業(yè)務(wù)(加工)邏輯一致

same sql
  • 在主流引擎中,流就乓、批處理統(tǒng)一用一套框架不是很現(xiàn)實汉匙。flink對批處理的兼容程度(例如大量的Hive/Spark built-in func不支持,對復(fù)雜數(shù)據(jù)類型支持較弱生蚁,讀Hive表數(shù)據(jù)略復(fù)雜)不夠噩翠。Spark對流處理,在state和event time上沒有Flink優(yōu)秀邦投,因此還是lambda的方式伤锚,兩套引擎對應(yīng)兩種場景
  • 盡量避免兩套代碼的開發(fā),最大程度上直接復(fù)用/移植批處理的代碼志衣,即Spark sql = Flink sql[+udf]
  • 對于窗口聚合等語義不一致的點屯援,需要確保業(yè)務(wù)邏輯的代碼保持一致

Same DB[option]

即OLAP,OLTP兩種模式的兼容

  • 終極目標(biāo)是一套DB可以兼容念脯。既能支持事務(wù)狞洋,有類cdc的通知機制,亦能支持多范圍绿店、多層次吉懊、大時間粒度的統(tǒng)計。例如OceanBase假勿、TiDB
  • 在實際場景中惕它,大多是數(shù)倉ODS或DWD層的DB統(tǒng)一。特點是開發(fā)或遷徙成本低废登,ETL process不復(fù)雜淹魄,強貼合Hive或Hadoop套件。例如Hudi堡距,或流式寫Hive(+presto/impala)
  • 實時數(shù)倉的DWS因為業(yè)務(wù)場景甲锡、業(yè)務(wù)需求的不同兆蕉,幾乎都是case by case的方案,很難做到一套框架+DB的統(tǒng)一

框架選擇

根據(jù)上文提到的三個“Same”缤沦,框架選擇則如下圖所示

  • 業(yè)務(wù)庫和埋點數(shù)據(jù)ingest到kafka
  • 計算統(tǒng)一使用Flink虎韵,且以SQL為主
框架選擇

數(shù)倉分層

數(shù)倉分層

關(guān)鍵計算邏輯描述

計算UV

流內(nèi)構(gòu)建

bitmap

即在流內(nèi)通過狀態(tài)維護(hù)bitmap,相對于HyperLogLog可以做到精準(zhǔn)去重缸废,且資源開銷在可接受范圍內(nèi)包蓝。如以下的示例:

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(2))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
 
 
ValueStateDescriptor<Roaring64Bitmap> bitmapDescriptor = new ValueStateDescriptor(
        "Roaring64Bitmap",
        TypeInformation.of(new TypeHint<Roaring64Bitmap>() {
        }));
 
 
 
bitmapDescriptor.enableTimeToLive(ttlConfig);
 
 
 
// ...
if (!bitmap.contains(uid)) {
    bitmap.addLong(uid);
    bitmapState.update(bitmap);
}

缺陷在于

  1. 純flink sql的count distinct,state的實現(xiàn)與之不同企量,需要額外的udf
  2. 對于多個key而言测萎,累計開銷巨大。例如通常我們計算uv的維度組合不止一個
  3. 基于第2點届巩,通常業(yè)務(wù)需求的時間范圍或跨度都較大硅瞧,例如當(dāng)日實時uv,累計uv恕汇,新用戶數(shù)等腕唧。state在不設(shè)置ttl的情況下,會導(dǎo)致cp時間過長瘾英,背壓枣接,甚至流量雪崩的情況
  4. 強依賴于state

將count distinct 轉(zhuǎn)化為 count(1)

借助外部存儲

通俗而言,就是不借助flink本身的state缺谴,而是靠外存來維護(hù)bitmap或其他結(jié)構(gòu)月腋,在query層完成最終的統(tǒng)計(或同時在外存中維護(hù)統(tǒng)計值)


bitmap sink hbase
api merge

好處在于flink除IO外存時,沒有額外的開銷瓣赂,state也不大,并且因為外存的原因片拍,保證at least once即可

缺陷在于

  1. 外存序列化/反序列化的IO開銷
  2. 同樣受多個key的限制
  3. 寫壓力 -> 讀壓力的轉(zhuǎn)移煌集,query的性能和并發(fā)受較大影響

query下沉

流任務(wù)只承擔(dān)ETL的功能,保證數(shù)據(jù)不丟失捌省。明細(xì)數(shù)據(jù)灌入例如doris/hudi等組件苫纤,在query層完成最終的聚合統(tǒng)計。優(yōu)勢在于解放流任務(wù)纲缓,將壓力最終轉(zhuǎn)移到query卷拘。缺陷依然是在大數(shù)據(jù)量、高并發(fā)的情況下祝高,開銷和查詢性能的影響

寬表構(gòu)建

流JOIN

flink joining栗弟,即在流中完成join操作。Flink具有多樣的join語義工闺,且易用性很高乍赫。主要的問題點在于瓣蛀,流內(nèi)join時(無論是事實join事實,還是事實join維表雷厂,都在變化之中)惋增,無法判斷數(shù)據(jù)是否全部到達(dá)。因為生產(chǎn)環(huán)境數(shù)據(jù)量的原因改鲫,不可能做全關(guān)聯(lián)诈皿。窗口join則需要考慮數(shù)據(jù)延遲、是否回補像棘,retrigger的機制等稽亏,開發(fā)和維護(hù)成本急劇上升。因此大多用于對時效性要求較高讲弄,一致性要求不強措左,業(yè)務(wù)邏輯不復(fù)雜的場景

流join
  • 全關(guān)聯(lián)需要考慮大狀態(tài)的資源開銷、故障恢復(fù)的成本
  • 窗口join需要考慮數(shù)據(jù)延遲帶來的額外成本避除,比如窗口等待的時長和時效性的取舍呕乎,遲到數(shù)據(jù)的回補等
  • 側(cè)流數(shù)據(jù)沒有解決遲到數(shù)據(jù)的回補,從另一種意義上而言篮条,還會導(dǎo)致額外的數(shù)據(jù)重復(fù)(sink不冪等的話)

Source Union

多流union馍驯,按主鍵更新不同列,變相實現(xiàn)join的功能群井。優(yōu)勢在于状飞,流處理僅僅是ETL,邏輯復(fù)雜度低书斜∥鼙玻靠sink DB支持upsert和按列更新(或更新非空列)完成功能。但缺陷在于荐吉,依然沒有解決遲到數(shù)據(jù)的retrigger(即什么時候算數(shù)據(jù)全部到達(dá)焙糟,補充完成,下游可以重新計算)样屠,并且一定程度上犧牲了時效性保障一致性


source union

Sink Union

分兩種情況

1.事實流union穿撮,在DB不支持按列更新的前提下,在聚合前union case when 的方式聚合計算

sink union 1

2.事實流關(guān)聯(lián)維表痪欲,在聚合完成后悦穿,query查詢前,點查維表去format維度屬性

sink union 2

與離線數(shù)倉聯(lián)動和差異

數(shù)據(jù)補償

在業(yè)務(wù)需求中业踢,并不全是只依賴或計算增量數(shù)據(jù)栗柒,例如最近7天的訂單數(shù),支付金額等知举,又或者在流任務(wù)故障傍衡,需要從離線數(shù)倉回補時深员,都需要與存量數(shù)據(jù)交互。存量數(shù)據(jù)初始化或數(shù)據(jù)補償?shù)牧鞒倘鐖D所示

利用Hudi提供的bulk_insert的write.operation(類似hbase的bulkload)將Hive表中的數(shù)據(jù)load到Hudi
任務(wù)結(jié)束后蛙埂,增量任務(wù)調(diào)起倦畅。兩者之間的records有一定堆疊,避免數(shù)據(jù)丟失


數(shù)據(jù)補償

共享ODS绣的、DIM[option]

將原本小時級/天級的ODS ETL任務(wù)改由流式sink叠赐,將速度提升至分鐘/小時級

  • 加速原本T+1才能產(chǎn)出的業(yè)務(wù)需求
  • 確保流和批的數(shù)據(jù)源一致,除卻加工邏輯和語義的不同外屡江,不會有源數(shù)據(jù)所帶來的數(shù)據(jù)不一致

數(shù)據(jù)延遲芭概、亂序

數(shù)據(jù)丟失

維表JOIN

對于遲到數(shù)據(jù)的trigger

具體實施

主旨思想

  1. 對本身有窗口要求的業(yè)務(wù)需求,對實時敏感程度一般惩嘉,要求一致性的業(yè)務(wù)需求罢洲,從流 -> 批轉(zhuǎn)化
    例如每15分鐘/每小時的當(dāng)天uv,pv文黎,gmv
  2. 對原本的T+1輸出的業(yè)務(wù)需求惹苗,從天 -> 小時的轉(zhuǎn)化,即流式寫ODS耸峭,給下游層級做加速
  3. 對于無法規(guī)避的業(yè)務(wù)需求桩蓉,例如對實時敏感程度高,即時trigger類需求劳闹,抽象流kafka(dwd邏輯表)
  4. 數(shù)據(jù)流向不宜過長院究,通常為ODS -> [DWD] -> DWS
  5. dws因業(yè)務(wù)需求各異,沒有統(tǒng)一的方案輸出

流DW

退維

ETL

公共邏輯下沉

Hudi的一些建議

index.type的選擇

index type

table.type的選擇

image.png

index.bootstrap.enabled的潛在問題

  • 參數(shù)為是否將已存在表中的記錄的index load到flink state本涕,默認(rèn)為false
  • 索引導(dǎo)入是一個阻塞過程业汰,在這期間無法完成cp
  • index bootstrap由輸入數(shù)據(jù)觸發(fā),需要確保每個分區(qū)中至少有一條記錄
  • index bootstrap是并發(fā)執(zhí)行的菩颖,可以在日志文件中通過finish loading the index under partition以及Load record form file觀察index bootstrap的進(jìn)度
  • 第一個成功的checkpoint表明index bootstrap已完成样漆。 從checkpoint恢復(fù)時,不需要再次加載索引
  • sink 多個hudi表時位他,需要將write.index_bootstrap.tasks set 為1,否則會出現(xiàn)部分index丟失的情況产场。sink 單hudi表時沒有這個限制
  • exist hudi 表數(shù)據(jù)量極大時鹅髓,index導(dǎo)入會失敗

其他

  • cow表強依賴于flink cp,因為寫放大的問題京景,通常建議調(diào)大cp的間隔和timeout窿冯,避免因cp失敗導(dǎo)致的數(shù)據(jù)無法更新
  • flink on hudi不支持schema evolution,字段變更會頻繁的變動任務(wù)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末确徙,一起剝皮案震驚了整個濱河市醒串,隨后出現(xiàn)的幾起案子执桌,更是在濱河造成了極大的恐慌,老刑警劉巖芜赌,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件仰挣,死亡現(xiàn)場離奇詭異,居然都是意外死亡缠沈,警方通過查閱死者的電腦和手機膘壶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來洲愤,“玉大人颓芭,你說我怎么就攤上這事〖泶停” “怎么了亡问?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵,是天一觀的道長肛宋。 經(jīng)常有香客問我州藕,道長,這世上最難降的妖魔是什么悼吱? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任慎框,我火速辦了婚禮,結(jié)果婚禮上后添,老公的妹妹穿的比我還像新娘笨枯。我一直安慰自己,他們只是感情好遇西,可當(dāng)我...
    茶點故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布馅精。 她就那樣靜靜地躺著,像睡著了一般粱檀。 火紅的嫁衣襯著肌膚如雪洲敢。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天茄蚯,我揣著相機與錄音压彭,去河邊找鬼。 笑死渗常,一個胖子當(dāng)著我的面吹牛壮不,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播皱碘,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼询一,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起健蕊,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤菱阵,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后缩功,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體晴及,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年掂之,在試婚紗的時候發(fā)現(xiàn)自己被綠了抗俄。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 38,163評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡世舰,死狀恐怖动雹,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情跟压,我是刑警寧澤胰蝠,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站震蒋,受9級特大地震影響茸塞,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜查剖,卻給世界環(huán)境...
    茶點故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一钾虐、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧笋庄,春花似錦效扫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至静暂,卻和暖如春济丘,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背洽蛀。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工摹迷, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人郊供。 一個月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓峡碉,卻偏偏與公主長得像,于是被迫代替她去往敵國和親颂碘。 傳聞我的和親對象是個殘疾皇子异赫,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,925評論 2 344

推薦閱讀更多精彩內(nèi)容