你是否有過躬贡,流批技術(shù)棧不統(tǒng)一的抓狂谆奥?
你是否有過,流批數(shù)據(jù)對不上的煩惱拂玻?
你是否有過酸些,海量數(shù)據(jù)update時效性跟不上的無奈? ...
有最新的數(shù)據(jù)湖Iceberg技術(shù)檐蚜,一切都迎刃而解魄懂!
接下來,以騰訊新聞為例闯第,分享下如何基于iceberg進行流批一體業(yè)務落地
一市栗、背景
眾所周知,在數(shù)據(jù)領(lǐng)域咳短,根據(jù)業(yè)務場景對數(shù)據(jù)時效性的要求不同填帽,數(shù)據(jù)可以劃分為:離線數(shù)據(jù)和實時數(shù)據(jù)兩大類型。針對不同的數(shù)據(jù)類型咙好,數(shù)據(jù)接入篡腌、數(shù)據(jù)處理、數(shù)據(jù)存儲以及數(shù)據(jù)輸出等各個環(huán)節(jié)的技術(shù)棧各不相同敷扫。從而導致目前基于不同技術(shù)棧構(gòu)建的數(shù)倉架構(gòu)也大相徑庭哀蘑。
目前市面上最流行的兩大數(shù)倉架構(gòu):Lambda與Kappa架構(gòu)
Lambda架構(gòu):流批分離模式
Kappa架構(gòu):批轉(zhuǎn)流模式
兩種架構(gòu)比較
總結(jié):
對比發(fā)現(xiàn),現(xiàn)行的lambda和kappa兩種數(shù)倉架構(gòu)在多個維度各有利弊葵第。
但是绘迁,都有一個致命的缺點就是 由于流批數(shù)據(jù)的存儲的不統(tǒng)一,導致后續(xù)的數(shù)據(jù)鏈路割裂卒密,進而導致無論是數(shù)據(jù)接入缀台、數(shù)據(jù)處理以及數(shù)據(jù)輸出等各個環(huán)節(jié)的成本驟然升高。
同時哮奇,對于海量數(shù)據(jù)的upsert時效性都難以保證膛腐。
二睛约、騰訊新聞業(yè)務
當前業(yè)務場景
在騰訊新聞業(yè)務中,文章是最核心的資源哲身,包括圖文辩涝、視頻等。那么勘天,精準的管理騰訊新聞中每一篇文章的完整生命周期中的每一個環(huán)節(jié)怔揩,無論是對文章作者通知反饋、還是對于平臺運營團隊的全局分析脯丝、乃至后臺算法的持續(xù)優(yōu)化迭代等等都是至關(guān)重要商膊。
那么,在騰訊新聞的文章管理中宠进,我們遇到了哪些問題呢晕拆?
遇到的問題
數(shù)據(jù)量級
數(shù)據(jù)量級龐大,字段豐富
騰訊新聞的文章數(shù)據(jù)基數(shù)龐大材蹬,千億級
騰訊新聞的 文章 各環(huán)節(jié)維度眾多实幕,多達幾百個以上
多維度導致的各個生命周期環(huán)節(jié)的 數(shù)據(jù)量 線性膨脹,單環(huán)多達 日均30-50億
數(shù)據(jù)源類型
包括 復雜多變的多種數(shù)據(jù)輸入類型
全量數(shù)據(jù)(靜態(tài)分區(qū)表赚导、離線文件)
準實時增量
消息流
數(shù)據(jù)使用方式
下游使用場景多樣化茬缩,對數(shù)據(jù)處理和輸出方式以及時效性也要求多樣化
流式消費
批加載
on-hoc條件檢索
準實時update
md多維分析
總結(jié):海量數(shù)據(jù),數(shù)據(jù)源多元化吼旧、數(shù)據(jù)使用多元化凰锡、時效性要求較高、流批兼顧
三圈暗、最終解決方案
技術(shù)選型分析
針對當前業(yè)務場景的 特性要求
1掂为、海量數(shù)據(jù),持續(xù)增加
2员串、數(shù)據(jù)類型:流勇哗、批兼顧
3、操作類型:append、upsert、del狱庇、load、on-hoc query扰法、md analysis等
4、數(shù)據(jù)時效性(實時毅厚、準實時塞颁、離線)
可以看出,基于傳統(tǒng)存儲方案的lamda和kappa架構(gòu) 都無法獨立優(yōu)雅、低成本的滿足此類【流批兼具】的場景需求祠锣。
那么我們該怎么辦呢酷窥?真實的業(yè)務場景需求永遠是推動技術(shù)革新的原動力。
為了解決傳統(tǒng)數(shù)據(jù)存儲中存在的這些問題伴网,大數(shù)據(jù)開源領(lǐng)域在存儲層表格式方面萌發(fā)了三種流批一體解決方案蓬推, Apache Iceberg, Delta Lake, Apache Hudi。這三個項目有著較多的共同點澡腾,首先拳氢, 它們都提供了ACID事務語義,其次蛋铆,它們都支持版本控制和時間旅行,再者放接,這三個項目都支持Schema Evolution刺啦,最后, 通過這些表格式,計算引擎可以方便的實現(xiàn)表的更新纠脾,刪除等操作玛瘸。
我們?yōu)槭裁催x擇了Iceberg
Apache Iceberg是由NetFlix最先主導開發(fā)的一個表格式,和Data Bricks的Delta Lake相比苟蹈,Iceberg有更好的開放性糊渊。
Iceberg 支持更多的底層文件格式,包括了Parquet, Orc, Avro, 而Data Bricks 的Delta Lake支持的文件格式只有Parquet 一種
Iceberg支持更多的計算引擎慧脱,目前Iceberg文件格式支持的計算引擎有Spark, Presto, Pig, 目前社區(qū)正在做Flink的集成渺绒,而Delta Lake由于DB公司的強控制,目前只支持Spark 一種引擎菱鸥,未來支持更多計算引擎的可能性也比較小宗兼。
而對比Apache Hudi, Iceberg是基于底層對象存儲的假設做的設計氮采,Hudi是基于底層HDFS存儲做的設計殷绍,這兩者各有優(yōu)劣
Iceberg不僅支持Java API 還支持Python API。
So鹊漠,考慮到后續(xù)業(yè)務變動帶來的表更新頻率主到,業(yè)務存儲媒介的多元化,項目本身的開放程度以及架構(gòu)特點躯概,
我們最終決定采用基于數(shù)據(jù)湖Iceberg技術(shù)的流批一體 數(shù)倉建設方案來承載騰訊新聞 文章業(yè)務分析
業(yè)務目標
旨在 完成 騰訊新聞文章 全生命周期的管理登钥,包括:圖文和視頻 文章
核心生命周期包含以下環(huán)節(jié):
1、文章素材收集
2楞陷、文章編輯與創(chuàng)建
3怔鳖、文章審核與標簽化
4、文章索引化(索引創(chuàng)建、索引上報结执、索引load)
5度陆、文章分發(fā)與推薦
6、...
下面 以 文章索引化 環(huán)節(jié)為例献幔,來重點介紹iceberg落地方案
整體設計
詳細設計
準實時流水明細
當前上游文章索引明細數(shù)據(jù)是基于H離線文件和準實時增量兩種輸出方式懂傀,后期迭代可能會演進成直接以streaming方式。
So蜡感,為了兼容后期的技術(shù)演進蹬蚁,減少改造成本,
我們采用spark filestream方式郑兴,【準實時】監(jiān)控hdfs 索引文件犀斋,按照既定append方式,【mico批】 落地iceberg 流水分區(qū)表
實時流式消費
在索引創(chuàng)建和加載完成后情连,為了能盡可能的提高 流水數(shù)據(jù)對下游的可見性叽粹,
采用flink stream方式,【實時流式消費】iceberg流水表却舀,進行多流JOIN后虫几,按照既定append方式,【批】 落地iceberg 流水分區(qū)表
實時流式mergeinto
基于部分下游對數(shù)據(jù)的時效性要求較高挽拔,例如辆脸,索引監(jiān)控等,我們采用了流式消費+mergeinto方式 來加快數(shù)據(jù)在iceberg流水表和實時snap表中的流轉(zhuǎn)螃诅。
采用spark stream方式啡氢,【實時流式消費】iceberg流水表,與iceberg實時snap表 JOIN后州刽,按照不同operation(append空执,update、delete)方式穗椅,【批】 落地iceberg 實時snap分區(qū)表
on-hoc數(shù)據(jù)檢索與多維分析
無論是千億iceberg流水表還是億級snap表辨绊,都可以結(jié)合presto進行on-hoc查詢和md數(shù)據(jù)分析
當前采用presto 分布式引擎,【on-hoc】查詢iceberg流水和snap表
離線統(tǒng)計
為了兼容已有的傳統(tǒng)hive離線數(shù)倉匹表,滿足部分數(shù)據(jù)使用需求门坷,
采用spark sql方式,【離線批】load Iceberg 實時snap表袍镀,進行相關(guān)aggr后默蚌,寫入hive 分區(qū)表
四、落地成果
根據(jù)線上的實際運行結(jié)果來看苇羡,基于iceberg的數(shù)倉建設已經(jīng)取得了里程碑的成功绸吸。
數(shù)據(jù)輸出結(jié)果
目前已經(jīng)接入全量文章的【索引】數(shù)據(jù),并穩(wěn)定輸出
1、完整歷史流水數(shù)據(jù)锦茁,包含:過百基礎(chǔ)屬性攘轩、指標字段
2、最新索引snap數(shù)據(jù)码俩,即:準實時upsert得到的snap全量數(shù)據(jù)
3度帮、各細分粒度的上卷統(tǒng)計分析數(shù)據(jù)
...
數(shù)據(jù)量級
目前線上 全量文章的【索引】數(shù)據(jù),大致如下
1稿存、圖文索引
日均:5-15E/35G
最大batch:2000萬
當前單表:500億(持續(xù)增長)
2笨篷、視頻索引
日均:10-30E/50G
最大batch:900萬
當前單表:千億(持續(xù)增長)
...
性能與時延
對于iceberg來說,無論數(shù)據(jù)流/批寫入瓣履,還是數(shù)據(jù)的流批消費率翅,最底層都是依賴于datafile。
So袖迎,datafile的【合理性】安聘、以及【hdfs讀寫性能】與【并發(fā)度】就是影響其性能的關(guān)鍵指標。
最終決定Iceberg性能的就是當前操作的表中 【目標datafile數(shù)量是否過多】瓢棒、【filesize是否合理】、【計算資源是否充足】丘喻、【并發(fā)度是否合理】脯宿、【hdfs是否穩(wěn)定】等指標。
五泉粉、問題治理與源碼優(yōu)化
雖然結(jié)果是令人值得高興的连霉,但是在實際的iceberg數(shù)倉構(gòu)建過程中,我們?nèi)耘f需要注意一些核心的問題嗡靡!
(一)小文件過度膨脹
Iceberg表跺撼,每次commit提交時,最終數(shù)據(jù)以datafile文件形式落地成snapshot讨彼,單個snapshot的文件數(shù)由 作業(yè)output并發(fā)度與表分區(qū) 綜合決定歉井。
在流式或者微批場景下,commit較頻繁哈误,會導致當前表的總datafile數(shù)量急速膨脹哩至,從而產(chǎn)生雪崩效應,間接影響后續(xù)數(shù)據(jù)鏈路的穩(wěn)定性與查詢性能蜜自,最終導致可用性大大降低菩貌。
So,我們需要進行同步小文件的優(yōu)化重荠,主要分為以下方法:
1箭阶、降低commit頻次,滿足業(yè)務時效性要求即可
2、降低output并發(fā)度與表分區(qū)仇参,保持單個分區(qū)數(shù)據(jù)量級在 XXX 范圍
3嘹叫、開啟rewrite操作
. 根據(jù)當前表數(shù)據(jù)分布,配置合理的filter
. 根據(jù)當前hdfs blockSize & iceberg表splitsize 合理配置targetSizeInBytes
. 根據(jù)適當控制rewrite頻次冈敛,防止過多消耗資源
代碼樣例:
System.out.println("START TO REWRITE FROM {" + start.getTime() + "} TO {" + end.getTime() + "}");
Actions.forTable(table)
.rewriteDataFiles()
.filter(Expressions.greaterThan("ftime", start.getTimeInMillis() * 1000))
.filter(Expressions.lessThan("ftime", end.getTimeInMillis() * 1000))
.targetSizeInBytes(targetSizeInBytes)
.execute();
table.refresh();
(二)歷史snap過多待笑,導致數(shù)據(jù)膨脹
頻繁的rewrite產(chǎn)生的snap、會導致iceberg表實際磁盤存儲占用急速膨脹抓谴,需要根據(jù)實際業(yè)務場景進行歷史snap的expire工作暮蹂。
當前社區(qū)spark api的retainlat和oldThan兩種策略結(jié)果與預期在一定的差異,且是單并發(fā)癌压,性能不是很好仰泻,多并發(fā)版本,見:news-spark-sdk-1.0-SNAPSHOT.jar
代碼樣例:
int maxSize = getMaxParallelSize();
ActionsWithParallel.forTable(table)
.expireSnapshotsWithParallel()
.retainLast(config.getIntKValue(RETAIN_SNAP_NUMS))
.expireOlderThan(end)
.execute(maxSize, 10000);
(三)orphan files過多
在日常iceberg表的使用過程中滩届,會由于各種原因集侯,導致很多orphan 垃圾文件,導致無效的磁盤占用帜消,如:snap expire未完成棠枉、多端寫入時由于鎖競爭導致、rewrite commit失敗導致等等泡挺,所以我們需要定時進行表清理辈讶。
當前社區(qū)的文件收集和刪除都是單并發(fā),多并發(fā)版本見:news-spark-sdk-1.0-SNAPSHOT.jar
代碼樣例:
int maxSize = getMaxParallelSize();
System.out.println("start to removeOrphanFile before " + today + ", " + end);
ActionsWithParallel.forTable(table)
.removeOrphanFilesWithParallel()
.olderThan(end) // 謹慎操作娄猫,避免誤刪當前寫入還未成功commit的datafile
.execute(maxSize, 10000);
(四)歷史數(shù)據(jù)過多贱除,如何TTL?
行級 刪除
第一步
delete from * where filter...
第二步
snap expire媳溺,參看上文
文件級 刪除
to be checked...
總結(jié):
Iceberg表的核心就是datafile月幌,datafile的合理性決定了所有基于iceberg表的數(shù)據(jù)環(huán)節(jié)的穩(wěn)定性與性能。
So悬蔽,我們需要重點針對iceberg表的datafile等文件扯躺,啟動對應的"補丁"作業(yè),來保證datafile在可預期范圍蝎困。
? **補丁分為:同步方式缅帘、異步方式**
- 同步方式:即在iceberg的數(shù)據(jù)寫入的同時運行,優(yōu)點是可以保證即時的補丁效果难衰,缺點是對于正常的讀寫主線邏輯有一定的干擾钦无,資源不可控,當前不是很穩(wěn)定盖袭。
- 異步方式:即單獨啟動異步的spark作業(yè)對iceberg表進行操作失暂,與主線讀寫完全分離彼宠,資源可控,頻率可控弟塞,穩(wěn)定性較好凭峡。
六、日常維護
分區(qū)優(yōu)化
iceberg作為大數(shù)據(jù)的存儲媒介决记,在生產(chǎn)環(huán)節(jié)中摧冀,分區(qū)設計必不可少。根據(jù)iceberg的核心原理系宫,分區(qū)是影響小文件數(shù)量的關(guān)鍵因素索昂,參看【問題1】。繼而間接的影響整個iceberg表的穩(wěn)定性以及讀寫性能扩借。甚至于對我們的日常數(shù)據(jù)維護也起到息息相關(guān)的作用椒惨。
假如線上業(yè)務或者數(shù)據(jù)變化,需要進行表分區(qū)調(diào)整的潮罪,可以通過spark api進行針對性的優(yōu)化
代碼樣例:
Configuration configuration = new Configuration();
configuration.set("hive.metastore.uris", "XXXX");
HiveCatalog catalog = new HiveCatalog(configuration);
TableIdentifier identifier = TableIdentifier.of(srcDB, srcTable);
Table table = catalog.loadTable(identifier);
BaseTable baseTable = (BaseTable) table;
TableMetadata current = baseTable.operations().current();
PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())
.bucket(config.getKValue(BUCKETS), Integer.parseInt(config.getKValue(BUCKETS_NUM)))
.day(config.getKValue(DAYS))
.withSpecId(current.spec().specId() + 1)
.build();
baseTable.operations().commit(current, current.updatePartitionSpec(newSpec));
table.refresh();
基礎(chǔ)操作
見官網(wǎng)文檔
七康谆、大規(guī)模數(shù)倉建設反思
基于近三個月余的實際落地情況,不由得引人深思嫉到,假如我們要基于iceberg來替換傳統(tǒng)的離線和實時數(shù)倉技術(shù)棧沃暗,我們還欠缺哪些事情,又會有哪些風險呢何恶?
主要從以下幾個維度來與傳統(tǒng)的數(shù)倉技術(shù)進行簡單比對
(一)描睦、可用性
通過當前千億級數(shù)據(jù)落地實踐,不管是數(shù)據(jù)的準實時寫入更新导而,還是亞秒級查詢以及批量load這些基礎(chǔ)功能,還是其事務隔崎、隱藏分區(qū)今艺、時光機等高級特性,都完全可以滿足需求 流批場景的大部分需求爵卒。
不過虚缎,由于其基于文件的核心設計理念,導致其在時效性上暫時還無法做到完全實時钓株。實際應用過程中实牡,時效性要求越高,成本就會相應的越高轴合。
(二)创坞、易用性
在實際的落地過程中,無論表維護(DDL\DML等)受葛、讀寫api题涨,都比較通俗易上手偎谁,就是數(shù)據(jù)治理過程可能有點復雜,在后續(xù)迭代過程中纲堵,可以進一步的封裝巡雨,實現(xiàn)自動化的例行數(shù)據(jù)治理。
(三)席函、穩(wěn)定性
由于iceberg只是一種基于已有metastore服務的數(shù)據(jù)結(jié)構(gòu)铐望,所以其核心的穩(wěn)定性都依賴于metastore以及底層存儲媒介的穩(wěn)定,所以目前上線3月余未出現(xiàn)其本身的大數(shù)據(jù)故障事故
(四)茂附、落地成本
由于處于摸索階段正蛙,暫時性學習成本會稍高一點
相對比,目前市場比較成熟的消息隊列和離線數(shù)倉技術(shù)何之,如果對時效性要求比較高的場景下跟畅,iceberg單表的各項成本還是略高,需要借助外部的計算平臺資源進行各種補丁輔助工作
八溶推、FAQ
(一)iceberg區(qū)
- 寫入時徊件,明明存在 A 字段,為啥報 A 字段不存在呢蒜危?
iceberg表 默認schema大小寫敏感虱痕,操作時注意
配置 spark.sql.caseSensitive=false
- iceberg表dml語句,支持新增字段指定位置或者移動嘛辐赞?
默認不支持after等插入列部翘,也不支持change移動列
- mergeinto 性能較差,莫名其妙多了一個aggr操作响委,是什么原因新思?
默認開啟write.merge.cardinality.check.enabled,嚴重消耗性能
非必要情況下赘风,可以關(guān)閉
設置表屬性
alter table newsapp_data_lake.t_newsapp_dmd_img_txt_parsed_streaming_snapshot set tblproperties ("write.merge.cardinality-check.enabled"="false");
- 監(jiān)控告警 iceberg單表存儲超標夹囚,幾十P數(shù)據(jù)?
別慌邀窃,看看是不是忘記開啟 snap expire和remove orphan "補丁"荸哟,各個目錄實際文件數(shù),orphan文件數(shù)一目了然瞬捕,世界都清凈了~
(二)spark區(qū)
- spark df中的schema 范圍不能比iceberg表的列范圍大
- spark 批寫入時(非fanout)鞍历,報closed partition錯誤?
需要根據(jù)當前表分區(qū)肪虎,自定義分區(qū)udf劣砍,開啟 sort
- spark iceberg save時,單次commit datafile數(shù)量過多扇救?
Iceberg數(shù)據(jù)寫入分為fanout和non fanout writer(default)秆剪,需要確認當前何種方式赊淑,再對應的調(diào)整output并發(fā)度與分區(qū)數(shù),詳見:【問題治理1】
- 通過spark sql建的表仅讽,然后寫入一個timestamp陶缺,為啥讀取出來后少了8小時?
timestamp日期類型字段默認帶時區(qū)洁灵,timestamptz
iceberg默認是 utc時區(qū)饱岸,可以用spark api進行ddl
- 通過spark sql刪除表數(shù)據(jù)時,為什么報錯徽千?
必須帶著where條件
delete from 111.111 where true;
(三)presto區(qū)
- 明明才幾Billion數(shù)據(jù)苫费,為啥查詢十幾分鐘甚至超時失敗呢?
莫慌双抽,先查看iceberg表的files metadata表百框,確認當前表是不是小文件超標,
如果不是牍汹,再去查看presto的執(zhí)行計劃铐维,看看是否是部分dn傾斜,壓力不穩(wěn)定
總結(jié):
1慎菲、目前此文檔旨在 探索iceberg的實際應用場景嫁蛇,并不代表iceberg的全部特性
2、目前千億級別數(shù)據(jù)量級露该,iceberg在離線和準實時領(lǐng)域皆可滿足需求睬棚。準實時場景中,端端數(shù)據(jù)可見性最高可達亞秒至分鐘級別
3解幼、目前iceberg的小文件問題始終是重中之重抑党,也是痛中之痛,要特別注意撵摆!建議分配常駐計算資源來運行底靠,以免由于資源競爭導致線上雪崩事故
4、目前在iceberg流式消費特性上台汇,只是簡單當做消息中間件來使用,對于upsert操作的數(shù)據(jù)尚未涉及
5篱瞎、在實戰(zhàn)業(yè)務中苟呐,使用的說presto作為iceberg的查詢引擎,由于各層組件的不穩(wěn)定性俐筋,導致最終的效果不是很穩(wěn)定牵素,如果想以iceberg+presto做olap分析,還是建議多注意底層組件的穩(wěn)定性澄者,例如:hdfs集群的壓力笆呆,presto的集群規(guī)模等
6请琳、目前已iceberg來替換hive或者消息隊列組件,進行大規(guī)模的數(shù)倉建設赠幕,成本和穩(wěn)定性還有待進一步驗證
參考:
http://iceberg.apache.org
https://prestodb.io/
https://blog.csdn.net/post_yuan/article/details/52241252
https://blog.csdn.net/wypblog/article/details/104744513
To be continued
后續(xù)俄精,給大家?guī)硪恍﹊ceberg的高級特性與底層原理
Iceberg 小文件是如何膨脹的?
Iceberg 底層數(shù)據(jù)存取與傳統(tǒng)數(shù)倉工具 有何差異榕堰?
Iceberg 不同版本特性的對外組件兼容性等
...