背景
我們?cè)谑褂肍link+Kafka做實(shí)時(shí)數(shù)倉以及數(shù)據(jù)傳輸過程中季惯,遇到了一些問題吠各,Iceberg 0.11的新特性解決了這些業(yè)務(wù)場(chǎng)景,基于Iceberg我們做了一些實(shí)踐勉抓,對(duì)比Kafka來說贾漏,Iceberg在某些特定場(chǎng)景有自己的優(yōu)勢(shì),在這里做一些小的實(shí)踐分享
分享主題
- 痛點(diǎn)
1.kafka數(shù)據(jù)具有時(shí)效性藕筋,在消費(fèi)積壓的時(shí)候容易造成數(shù)據(jù)過期丟失
2.Flink結(jié)合Hive做近實(shí)時(shí)系統(tǒng)越來越慢 - 選擇iceberg的原因
1.解決了kafka因數(shù)據(jù)過期而導(dǎo)致的數(shù)據(jù)不完整問題纵散,0.11版本支持了實(shí)時(shí)讀取的功能
2.解決了Hive做為近實(shí)時(shí)數(shù)倉的性能問題,比如hive元數(shù)據(jù)文件過多念逞,執(zhí)行計(jì)劃慢 - 使用總結(jié)
痛點(diǎn)1:KAFKA數(shù)據(jù)丟失
存儲(chǔ)上,通常我們會(huì)選擇kafka做實(shí)時(shí)數(shù)倉边翁,以及日志傳輸翎承。kafka本身存儲(chǔ)成本其實(shí)蠻高的,數(shù)據(jù)保留時(shí)間有時(shí)效性符匾,一旦消費(fèi)積壓叨咖,數(shù)據(jù)達(dá)到過期時(shí)間后,會(huì)導(dǎo)致數(shù)據(jù)丟失
什么數(shù)據(jù)適合入湖
對(duì)實(shí)時(shí)有適當(dāng)放寬的啊胶,能接受1-10分鐘的延遲甸各,比如業(yè)務(wù)方可以接受近實(shí)時(shí)數(shù)據(jù),比如日志類數(shù)據(jù)這樣時(shí)效性不是特別敏感的
為什么Iceberg只能做近實(shí)時(shí)入湖焰坪?
物理數(shù)據(jù)寫入Iceberg后趣倾,直到觸發(fā)了checkpoint,這個(gè)時(shí)候才會(huì)寫元數(shù)據(jù)某饰,當(dāng)元數(shù)據(jù)寫入完畢后儒恋,數(shù)據(jù)由不可見變?yōu)榭梢姡@也是實(shí)時(shí)性為什么不能像kafka一樣黔漂,只能做近實(shí)時(shí)
Flink SQL入湖流程
Flink Iceberg實(shí)時(shí)讀寫
入湖流程分析
術(shù)語解析
數(shù)據(jù)文件(data files)
Iceberg 表真實(shí)存儲(chǔ)數(shù)據(jù)的文件诫尽,一般存儲(chǔ)在data目錄下清單文件(Manifest file)
每行都是每個(gè)數(shù)據(jù)文件的詳細(xì)描述,包括數(shù)據(jù)文件的狀態(tài)炬守、文件路徑牧嫉、分區(qū)信息、列級(jí)別的統(tǒng)計(jì)信息(比如每列的最大最小值减途、空值數(shù)等)酣藻、通過該文件、可過濾掉無關(guān)數(shù)據(jù)鳍置、提高檢索速度快照(Snapshot)
快照代表一張表在某個(gè)時(shí)刻的狀態(tài)臊恋。每個(gè)快照里面會(huì)列出表在某個(gè)時(shí)刻的所有數(shù)據(jù)文件列表。Data files 是存儲(chǔ)在不同的 manifest files 里面墓捻, manifest files 是存儲(chǔ)在一個(gè) Manifest list 文件里面抖仅,而一個(gè) Manifest list 文件代表一個(gè)快照坊夫。
Flink入湖流程
組件介紹
IcebergStreamWriter
主要用來寫入記錄到對(duì)應(yīng)的 avro、parquet撤卢、orc 文件环凿,生成一個(gè)對(duì)應(yīng)的 Iceberg DataFile,并發(fā)送給下游算子放吩;另外一個(gè)叫做 IcebergFilesCommitter智听,主要用來在 checkpoint 到來時(shí)把所有的 DataFile 文件收集起來,并提交 Transaction 到 Apache iceberg渡紫,完成本次 checkpoint 的數(shù)據(jù)寫入到推、生成DataFileIcebergFilesCommitter
為每個(gè)checkpointId 維護(hù)了一個(gè) DataFile 文件列表,即 map<Long, List<DataFile>>惕澎,這樣即使中間有某個(gè) checkpoint的transaction 提交失敗了莉测,它的 DataFile 文件仍然維護(hù)在 State 中,依然可以通過后續(xù)的 checkpoint 來提交數(shù)據(jù)到 Iceberg 表中唧喉。
踩坑記錄
我之前在SQL Clinet寫數(shù)據(jù)到Iceberg捣卤、data目錄數(shù)據(jù)一直在更新,但是metadata沒有數(shù)據(jù)八孝,導(dǎo)致查詢的時(shí)候沒有數(shù)董朝,因?yàn)镮ceberg的查詢計(jì)劃是需要元數(shù)據(jù)來索引真實(shí)數(shù)據(jù)的,本質(zhì)上時(shí)候因?yàn)镮cebergFilesCommitter這個(gè)組件需要狀態(tài)來存儲(chǔ)某個(gè)checkpoint對(duì)應(yīng)的數(shù)據(jù)文件干跛,而SQL Clinet是不支持狀態(tài)開啟的子姜。所以會(huì)導(dǎo)致data寫入數(shù)據(jù)而metadata目錄不寫入元數(shù)據(jù)
PS:寫數(shù)據(jù)必須開啟checkpoint
實(shí)時(shí)讀取Demo
前期工作
- 開啟實(shí)時(shí)讀寫功能
set execution.type = streaming - 開啟table sql hint功能來使用OPTIONS屬性
set table.dynamic-table-options.enabled=true
//注冊(cè)Iceberg catalog用于操作Iceberg表
CREATE CATALOG iceberg_catalog WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive'," +
" 'uri'='thrift://localhost:9083'" +
");
//實(shí)時(shí)數(shù)據(jù)入湖
insert into iceberg_catalog.iceberg_db.tbl1 \n
select * from kafka_tbl;
//實(shí)時(shí)查詢?nèi)牒?shù)據(jù)、也可以sink到
insert into iceberg_catalog.iceberg_db.tbl2
select * from iceberg_catalog.iceberg_db.tbl
/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', snapshot-id'='3821550127947089987')*/ ;
參數(shù)解釋
- monitor-interval:連續(xù)監(jiān)視新提交的數(shù)據(jù)文件的時(shí)間間隔(默認(rèn)值:1s)
- start-snapshot-id:從指定的快照ID開始讀取數(shù)據(jù)楼入、每個(gè)快照ID關(guān)聯(lián)的是一組mainfest元數(shù)據(jù)文件闲询,每個(gè)元數(shù)據(jù)文件映射著自己的真實(shí)數(shù)據(jù)文件,通過快照ID浅辙,可以讀取到某個(gè)版本的數(shù)據(jù)
-
一秒前的數(shù)據(jù)
image.png -
一秒后刷新的數(shù)據(jù)
image.png
小文件處理
Iceberg 0.11新特性扭弧,對(duì)小文件合并支持了處理
- 通過分區(qū)/存儲(chǔ)桶鍵使用哈希混洗方式寫記錄记舆、這樣的好處在于鸽捻,一個(gè)task會(huì)處理某個(gè)分區(qū)的數(shù)據(jù),提交自己的Datafile文件泽腮,比如一個(gè)task只處理對(duì)應(yīng)分區(qū)的數(shù)據(jù)御蒲,這樣避免了多個(gè)task處理提交很多小文件的問題
write.distribution-mode: 該參數(shù)與其它引擎是通用的、比如spark等
CREATE TABLE city_table (
province BIGINT,
city STRING
) PARTITIONED BY (province, city) WITH (
'write.distribution-mode'='hash'
);
Iceberg動(dòng)態(tài)更新Schema
如果數(shù)據(jù)入hive table诊赊,上游數(shù)據(jù)字段變更厚满,需要重建表、以及重啟作業(yè)碧磅、這是一個(gè)相當(dāng)麻煩的工作碘箍,Iceberg通過捕捉上游Schema變更遵馆,將元數(shù)據(jù)信息寫入最新的快照版本,通過版本可以動(dòng)態(tài)的讀取到最新的Schema
PS:社區(qū)暫時(shí)不支持的該功能丰榴,目前只在商業(yè)版做了
痛點(diǎn)2:Flink結(jié)合Hive的近實(shí)時(shí)越來越慢
隨著表和分區(qū)增多货邓,將會(huì)面臨以下問題
元數(shù)據(jù)過多
hive將分區(qū)改為小時(shí)/分鐘級(jí)、雖然提高了數(shù)據(jù)的準(zhǔn)實(shí)時(shí)性四濒,但是metestore的壓力也是顯而易見的换况,進(jìn)而導(dǎo)致查詢計(jì)劃變慢數(shù)據(jù)庫壓力變大
隨著元數(shù)據(jù)增加,存儲(chǔ)hive元數(shù)據(jù)的數(shù)據(jù)庫壓力也會(huì)增加盗蟆,一段時(shí)間后戈二,還需要對(duì)該庫進(jìn)行升級(jí)。比如存儲(chǔ)空間
Iceberg 查詢計(jì)劃
- 查詢計(jì)劃是在表中查找查詢所需文件的過程喳资。
- 元數(shù)據(jù)過濾
清單文件包括分區(qū)數(shù)據(jù)元組和每個(gè)數(shù)據(jù)文件的列級(jí)統(tǒng)計(jì)信息觉吭。
在計(jì)劃期間,查詢謂詞會(huì)自動(dòng)轉(zhuǎn)換為分區(qū)數(shù)據(jù)上的謂詞骨饿,并首先應(yīng)用于過濾數(shù)據(jù)文件亏栈。接下來台腥,使用列級(jí)值計(jì)數(shù)宏赘,空計(jì)數(shù),下限和上限來消除與查詢謂詞不匹配的文件黎侈。 - 查詢檢索的時(shí)候察署,會(huì)根據(jù)當(dāng)前snapshot ID查詢關(guān)聯(lián)到的maintalifilses,這是個(gè)文件清單列表峻汉,每個(gè)
maintalifilse又記錄了當(dāng)前data數(shù)據(jù)塊的元數(shù)據(jù)信息贴汪,其中就包含了文件列的最大值和最小值,然后根據(jù)這個(gè)元數(shù)據(jù)信息休吠,索引到具體的文件塊扳埂,從而更快的查詢到數(shù)據(jù)
Iceberg 0.11 排序
-
排序介紹
在Iceberg 0.11之前,F(xiàn)link是不支持iceberg排序功能的瘤礁,所以之前只能結(jié)合spark以批模式來支持排序功能阳懂,0.11新增了排序特性的支持,也意味著柜思,我們?cè)趯?shí)時(shí)也可以體會(huì)到這個(gè)好處
image.png
排序demo
insert into iceberg_table select days from kafka_tbl order by days, province_id
Iceberg manifest詳解
參數(shù)解釋
- file_path: 物理文件位置
- partition: 文件所對(duì)應(yīng)的分區(qū)
- lower_bounds: 該文件中岩调,多個(gè)排序字段的最小值,下圖是我的days和province_id最小值
-
upper_bounds: 該文件中赡盘,多個(gè)排序字段的最大值号枕,下圖是我的days和province_id最大值
通過分區(qū)、列的上下限信息來確定是否讀取file_path的文件陨享,數(shù)據(jù)排序后葱淳,文件列的信息也會(huì)記錄在元數(shù)據(jù)中钝腺,查詢計(jì)劃從manifest去定位文件,不需要把信息記錄在hive metadata蛙紫,從而減輕hive metadata壓力拍屑,提升查詢效率
image.png
總結(jié)
Apache Iceberg0.11有很多實(shí)用的新特性、比如實(shí)時(shí)讀取數(shù)據(jù)坑傅,商業(yè)版的捕獲Schema動(dòng)態(tài)變更發(fā)送下游僵驰,通過hash的方法讓task處理一個(gè)區(qū)域的數(shù)據(jù),避免了小文件問題唁毒,以及排序功能提升了查詢速度蒜茴。