Flink Iceberg 0.11

背景

我們?cè)谑褂肍link+Kafka做實(shí)時(shí)數(shù)倉以及數(shù)據(jù)傳輸過程中季惯,遇到了一些問題吠各,Iceberg 0.11的新特性解決了這些業(yè)務(wù)場(chǎng)景,基于Iceberg我們做了一些實(shí)踐勉抓,對(duì)比Kafka來說贾漏,Iceberg在某些特定場(chǎng)景有自己的優(yōu)勢(shì),在這里做一些小的實(shí)踐分享

分享主題

  • 痛點(diǎn)
    1.kafka數(shù)據(jù)具有時(shí)效性藕筋,在消費(fèi)積壓的時(shí)候容易造成數(shù)據(jù)過期丟失
    2.Flink結(jié)合Hive做近實(shí)時(shí)系統(tǒng)越來越慢
  • 選擇iceberg的原因
    1.解決了kafka因數(shù)據(jù)過期而導(dǎo)致的數(shù)據(jù)不完整問題纵散,0.11版本支持了實(shí)時(shí)讀取的功能
    2.解決了Hive做為近實(shí)時(shí)數(shù)倉的性能問題,比如hive元數(shù)據(jù)文件過多念逞,執(zhí)行計(jì)劃慢
  • 使用總結(jié)
痛點(diǎn)1:KAFKA數(shù)據(jù)丟失

存儲(chǔ)上,通常我們會(huì)選擇kafka做實(shí)時(shí)數(shù)倉边翁,以及日志傳輸翎承。kafka本身存儲(chǔ)成本其實(shí)蠻高的,數(shù)據(jù)保留時(shí)間有時(shí)效性符匾,一旦消費(fèi)積壓叨咖,數(shù)據(jù)達(dá)到過期時(shí)間后,會(huì)導(dǎo)致數(shù)據(jù)丟失

什么數(shù)據(jù)適合入湖

對(duì)實(shí)時(shí)有適當(dāng)放寬的啊胶,能接受1-10分鐘的延遲甸各,比如業(yè)務(wù)方可以接受近實(shí)時(shí)數(shù)據(jù),比如日志類數(shù)據(jù)這樣時(shí)效性不是特別敏感的

為什么Iceberg只能做近實(shí)時(shí)入湖焰坪?

物理數(shù)據(jù)寫入Iceberg后趣倾,直到觸發(fā)了checkpoint,這個(gè)時(shí)候才會(huì)寫元數(shù)據(jù)某饰,當(dāng)元數(shù)據(jù)寫入完畢后儒恋,數(shù)據(jù)由不可見變?yōu)榭梢姡@也是實(shí)時(shí)性為什么不能像kafka一樣黔漂,只能做近實(shí)時(shí)


image.png
Flink SQL入湖流程

Flink Iceberg實(shí)時(shí)讀寫


實(shí)時(shí)入湖流程

入湖流程分析

術(shù)語解析

  • 數(shù)據(jù)文件(data files)
    Iceberg 表真實(shí)存儲(chǔ)數(shù)據(jù)的文件诫尽,一般存儲(chǔ)在data目錄下

  • 清單文件(Manifest file)
    每行都是每個(gè)數(shù)據(jù)文件的詳細(xì)描述,包括數(shù)據(jù)文件的狀態(tài)炬守、文件路徑牧嫉、分區(qū)信息、列級(jí)別的統(tǒng)計(jì)信息(比如每列的最大最小值减途、空值數(shù)等)酣藻、通過該文件、可過濾掉無關(guān)數(shù)據(jù)鳍置、提高檢索速度

  • 快照(Snapshot)
    快照代表一張表在某個(gè)時(shí)刻的狀態(tài)臊恋。每個(gè)快照里面會(huì)列出表在某個(gè)時(shí)刻的所有數(shù)據(jù)文件列表。Data files 是存儲(chǔ)在不同的 manifest files 里面墓捻, manifest files 是存儲(chǔ)在一個(gè) Manifest list 文件里面抖仅,而一個(gè) Manifest list 文件代表一個(gè)快照坊夫。

image.png
Flink入湖流程
image.png

組件介紹

  • IcebergStreamWriter
    主要用來寫入記錄到對(duì)應(yīng)的 avro、parquet撤卢、orc 文件环凿,生成一個(gè)對(duì)應(yīng)的 Iceberg DataFile,并發(fā)送給下游算子放吩;另外一個(gè)叫做 IcebergFilesCommitter智听,主要用來在 checkpoint 到來時(shí)把所有的 DataFile 文件收集起來,并提交 Transaction 到 Apache iceberg渡紫,完成本次 checkpoint 的數(shù)據(jù)寫入到推、生成DataFile

  • IcebergFilesCommitter
    為每個(gè)checkpointId 維護(hù)了一個(gè) DataFile 文件列表,即 map<Long, List<DataFile>>惕澎,這樣即使中間有某個(gè) checkpoint的transaction 提交失敗了莉测,它的 DataFile 文件仍然維護(hù)在 State 中,依然可以通過后續(xù)的 checkpoint 來提交數(shù)據(jù)到 Iceberg 表中唧喉。

踩坑記錄

我之前在SQL Clinet寫數(shù)據(jù)到Iceberg捣卤、data目錄數(shù)據(jù)一直在更新,但是metadata沒有數(shù)據(jù)八孝,導(dǎo)致查詢的時(shí)候沒有數(shù)董朝,因?yàn)镮ceberg的查詢計(jì)劃是需要元數(shù)據(jù)來索引真實(shí)數(shù)據(jù)的,本質(zhì)上時(shí)候因?yàn)镮cebergFilesCommitter這個(gè)組件需要狀態(tài)來存儲(chǔ)某個(gè)checkpoint對(duì)應(yīng)的數(shù)據(jù)文件干跛,而SQL Clinet是不支持狀態(tài)開啟的子姜。所以會(huì)導(dǎo)致data寫入數(shù)據(jù)而metadata目錄不寫入元數(shù)據(jù)
PS:寫數(shù)據(jù)必須開啟checkpoint

實(shí)時(shí)讀取Demo

前期工作

  • 開啟實(shí)時(shí)讀寫功能
    set execution.type = streaming
  • 開啟table sql hint功能來使用OPTIONS屬性
    set table.dynamic-table-options.enabled=true
    //注冊(cè)Iceberg catalog用于操作Iceberg表
    CREATE CATALOG iceberg_catalog WITH (\n" +
            "  'type'='iceberg',\n" +
            "  'catalog-type'='hive'," +
            "  'uri'='thrift://localhost:9083'" +
            ");

   //實(shí)時(shí)數(shù)據(jù)入湖
   insert into iceberg_catalog.iceberg_db.tbl1 \n 
            select  *  from  kafka_tbl;

  //實(shí)時(shí)查詢?nèi)牒?shù)據(jù)、也可以sink到
  insert into iceberg_catalog.iceberg_db.tbl2  
    select * from iceberg_catalog.iceberg_db.tbl 
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', snapshot-id'='3821550127947089987')*/ ;
參數(shù)解釋
  • monitor-interval:連續(xù)監(jiān)視新提交的數(shù)據(jù)文件的時(shí)間間隔(默認(rèn)值:1s)
  • start-snapshot-id:從指定的快照ID開始讀取數(shù)據(jù)楼入、每個(gè)快照ID關(guān)聯(lián)的是一組mainfest元數(shù)據(jù)文件闲询,每個(gè)元數(shù)據(jù)文件映射著自己的真實(shí)數(shù)據(jù)文件,通過快照ID浅辙,可以讀取到某個(gè)版本的數(shù)據(jù)
  • 一秒前的數(shù)據(jù)


    image.png
  • 一秒后刷新的數(shù)據(jù)


    image.png
小文件處理

Iceberg 0.11新特性扭弧,對(duì)小文件合并支持了處理

  • 通過分區(qū)/存儲(chǔ)桶鍵使用哈希混洗方式寫記錄记舆、這樣的好處在于鸽捻,一個(gè)task會(huì)處理某個(gè)分區(qū)的數(shù)據(jù),提交自己的Datafile文件泽腮,比如一個(gè)task只處理對(duì)應(yīng)分區(qū)的數(shù)據(jù)御蒲,這樣避免了多個(gè)task處理提交很多小文件的問題
    write.distribution-mode: 該參數(shù)與其它引擎是通用的、比如spark等
CREATE TABLE city_table ( 
     province BIGINT,
     city STRING
  ) PARTITIONED BY (province, city) WITH (
    'write.distribution-mode'='hash' 
  );
Iceberg動(dòng)態(tài)更新Schema

如果數(shù)據(jù)入hive table诊赊,上游數(shù)據(jù)字段變更厚满,需要重建表、以及重啟作業(yè)碧磅、這是一個(gè)相當(dāng)麻煩的工作碘箍,Iceberg通過捕捉上游Schema變更遵馆,將元數(shù)據(jù)信息寫入最新的快照版本,通過版本可以動(dòng)態(tài)的讀取到最新的Schema
PS:社區(qū)暫時(shí)不支持的該功能丰榴,目前只在商業(yè)版做了

元數(shù)據(jù)變更處理流程

痛點(diǎn)2:Flink結(jié)合Hive的近實(shí)時(shí)越來越慢

image.png
隨著表和分區(qū)增多货邓,將會(huì)面臨以下問題
  • 元數(shù)據(jù)過多
    hive將分區(qū)改為小時(shí)/分鐘級(jí)、雖然提高了數(shù)據(jù)的準(zhǔn)實(shí)時(shí)性四濒,但是metestore的壓力也是顯而易見的换况,進(jìn)而導(dǎo)致查詢計(jì)劃變慢

  • 數(shù)據(jù)庫壓力變大
    隨著元數(shù)據(jù)增加,存儲(chǔ)hive元數(shù)據(jù)的數(shù)據(jù)庫壓力也會(huì)增加盗蟆,一段時(shí)間后戈二,還需要對(duì)該庫進(jìn)行升級(jí)。比如存儲(chǔ)空間

image.png

Iceberg 查詢計(jì)劃

  • 查詢計(jì)劃是在表中查找查詢所需文件的過程喳资。
  • 元數(shù)據(jù)過濾
    清單文件包括分區(qū)數(shù)據(jù)元組和每個(gè)數(shù)據(jù)文件的列級(jí)統(tǒng)計(jì)信息觉吭。
    在計(jì)劃期間,查詢謂詞會(huì)自動(dòng)轉(zhuǎn)換為分區(qū)數(shù)據(jù)上的謂詞骨饿,并首先應(yīng)用于過濾數(shù)據(jù)文件亏栈。接下來台腥,使用列級(jí)值計(jì)數(shù)宏赘,空計(jì)數(shù),下限和上限來消除與查詢謂詞不匹配的文件黎侈。
  • 查詢檢索的時(shí)候察署,會(huì)根據(jù)當(dāng)前snapshot ID查詢關(guān)聯(lián)到的maintalifilses,這是個(gè)文件清單列表峻汉,每個(gè)
    maintalifilse又記錄了當(dāng)前data數(shù)據(jù)塊的元數(shù)據(jù)信息贴汪,其中就包含了文件列的最大值和最小值,然后根據(jù)這個(gè)元數(shù)據(jù)信息休吠,索引到具體的文件塊扳埂,從而更快的查詢到數(shù)據(jù)
Iceberg 0.11 排序
  • 排序介紹
    在Iceberg 0.11之前,F(xiàn)link是不支持iceberg排序功能的瘤礁,所以之前只能結(jié)合spark以批模式來支持排序功能阳懂,0.11新增了排序特性的支持,也意味著柜思,我們?cè)趯?shí)時(shí)也可以體會(huì)到這個(gè)好處


    image.png
排序demo
insert into iceberg_table select days from kafka_tbl order by days, province_id
Iceberg manifest詳解

參數(shù)解釋

  • file_path: 物理文件位置
  • partition: 文件所對(duì)應(yīng)的分區(qū)
  • lower_bounds: 該文件中岩调,多個(gè)排序字段的最小值,下圖是我的days和province_id最小值
  • upper_bounds: 該文件中赡盘,多個(gè)排序字段的最大值号枕,下圖是我的days和province_id最大值
    通過分區(qū)、列的上下限信息來確定是否讀取file_path的文件陨享,數(shù)據(jù)排序后葱淳,文件列的信息也會(huì)記錄在元數(shù)據(jù)中钝腺,查詢計(jì)劃從manifest去定位文件,不需要把信息記錄在hive metadata蛙紫,從而減輕hive metadata壓力拍屑,提升查詢效率
    image.png

總結(jié)

Apache Iceberg0.11有很多實(shí)用的新特性、比如實(shí)時(shí)讀取數(shù)據(jù)坑傅,商業(yè)版的捕獲Schema動(dòng)態(tài)變更發(fā)送下游僵驰,通過hash的方法讓task處理一個(gè)區(qū)域的數(shù)據(jù),避免了小文件問題唁毒,以及排序功能提升了查詢速度蒜茴。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市浆西,隨后出現(xiàn)的幾起案子粉私,更是在濱河造成了極大的恐慌,老刑警劉巖近零,帶你破解...
    沈念sama閱讀 218,546評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件诺核,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡久信,警方通過查閱死者的電腦和手機(jī)窖杀,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來裙士,“玉大人入客,你說我怎么就攤上這事⊥茸担” “怎么了桌硫?”我有些...
    開封第一講書人閱讀 164,911評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)啃炸。 經(jīng)常有香客問我铆隘,道長(zhǎng),這世上最難降的妖魔是什么南用? 我笑而不...
    開封第一講書人閱讀 58,737評(píng)論 1 294
  • 正文 為了忘掉前任膀钠,我火速辦了婚禮,結(jié)果婚禮上训枢,老公的妹妹穿的比我還像新娘托修。我一直安慰自己,他們只是感情好恒界,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,753評(píng)論 6 392
  • 文/花漫 我一把揭開白布睦刃。 她就那樣靜靜地躺著,像睡著了一般十酣。 火紅的嫁衣襯著肌膚如雪涩拙。 梳的紋絲不亂的頭發(fā)上际长,一...
    開封第一講書人閱讀 51,598評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音兴泥,去河邊找鬼工育。 笑死,一個(gè)胖子當(dāng)著我的面吹牛搓彻,可吹牛的內(nèi)容都是我干的如绸。 我是一名探鬼主播,決...
    沈念sama閱讀 40,338評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼旭贬,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼怔接!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起稀轨,我...
    開封第一講書人閱讀 39,249評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤扼脐,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后奋刽,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體瓦侮,經(jīng)...
    沈念sama閱讀 45,696評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,888評(píng)論 3 336
  • 正文 我和宋清朗相戀三年佣谐,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了肚吏。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,013評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡台谍,死狀恐怖须喂,靈堂內(nèi)的尸體忽然破棺而出吁断,到底是詐尸還是另有隱情趁蕊,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評(píng)論 5 346
  • 正文 年R本政府宣布仔役,位于F島的核電站掷伙,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏又兵。R本人自食惡果不足惜任柜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,348評(píng)論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望沛厨。 院中可真熱鬧宙地,春花似錦、人聲如沸逆皮。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽电谣。三九已至秽梅,卻和暖如春抹蚀,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背企垦。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工环壤, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人钞诡。 一個(gè)月前我還...
    沈念sama閱讀 48,203評(píng)論 3 370
  • 正文 我出身青樓郑现,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親荧降。 傳聞我的和親對(duì)象是個(gè)殘疾皇子懂酱,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,960評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容

  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友誊抛。感恩相遇列牺!感恩不離不棄。 中午開了第一次的黨會(huì)拗窃,身份的轉(zhuǎn)變要...
    迷月閃星情閱讀 10,566評(píng)論 0 11
  • 彩排完瞎领,天已黑
    劉凱書法閱讀 4,218評(píng)論 1 3
  • 表情是什么,我認(rèn)為表情就是表現(xiàn)出來的情緒随夸。表情可以傳達(dá)很多信息九默。高興了當(dāng)然就笑了,難過就哭了宾毒。兩者是相互影響密不可...
    Persistenc_6aea閱讀 125,065評(píng)論 2 7