隨著Apache Parquet和Apache ORC等存儲(chǔ)格式以及Presto和Apache Impala等查詢引擎的發(fā)展冕象,Hadoop生態(tài)系統(tǒng)有潛力作為面向分鐘級(jí)延時(shí)場(chǎng)景的通用統(tǒng)一服務(wù)層。然而瘩燥,為了實(shí)現(xiàn)這一點(diǎn)撑教,這需要在HDFS中實(shí)現(xiàn)高效且低延遲的數(shù)據(jù)攝取及數(shù)據(jù)準(zhǔn)備。
為了解決這個(gè)問(wèn)題,優(yōu)步開發(fā)了Hudi項(xiàng)目驶忌,這是一個(gè)增量處理框架,高效和低延遲地為所有業(yè)務(wù)關(guān)鍵數(shù)據(jù)鏈路提供有力支持笑跛。事實(shí)上付魔,Uber已經(jīng)將Hudi開源 - https://github.com/uber/hudi。在深入的了解Hudi之前飞蹂,我們首先討論一下為什么將Hadoop作為統(tǒng)一的服務(wù)層是一個(gè)不錯(cuò)的想法几苍。
動(dòng)機(jī)
Lambda架構(gòu)是一種常見(jiàn)的數(shù)據(jù)處理體系結(jié)構(gòu),它的數(shù)據(jù)的處理依賴流式計(jì)算層(Streaming Layer)和批處理計(jì)算層(Batch Layer)的雙重計(jì)算陈哑。每隔幾個(gè)小時(shí)妻坝,批處理過(guò)程被啟動(dòng)以計(jì)算精確的業(yè)務(wù)狀態(tài),并將批量更新加載到服務(wù)層(Serving Layer)惊窖。同時(shí)刽宪,為了消除上述幾個(gè)小時(shí)的等待時(shí)間我們會(huì)在流式計(jì)算層對(duì)這個(gè)業(yè)務(wù)數(shù)據(jù)進(jìn)行實(shí)時(shí)的狀態(tài)更新。然而界酒,這個(gè)流計(jì)算的狀態(tài)只是一個(gè)最終結(jié)果的近似值圣拄,最終需要被批處理的計(jì)算結(jié)果所覆蓋。由于兩種模式提供的狀態(tài)差異盾计,我們需要為批處理和流處理提供不同的服務(wù)層售担,并在這個(gè)上面再做合并抽象,或者設(shè)計(jì)應(yīng)用一個(gè)相當(dāng)復(fù)雜的服務(wù)系統(tǒng)(如Druid)署辉,用于同時(shí)在行級(jí)更新和批量加載中提供優(yōu)異表現(xiàn)族铆。
對(duì)于是否需要一個(gè)額外單獨(dú)的批處理層,Kappa架構(gòu)認(rèn)為一個(gè)單獨(dú)的流式計(jì)算層足以成為數(shù)據(jù)處理的通用解決方案哭尝。廣義上哥攘,所有數(shù)據(jù)計(jì)算都可以描述為生產(chǎn)者生產(chǎn)一個(gè)數(shù)據(jù)流,而消費(fèi)者不斷的逐條迭代消費(fèi)這個(gè)流中的記錄材鹦,如火山模型(Volcano Iterator model)逝淹。這就意味著流式計(jì)算層可以依靠堆資源以增加并行能力的方式來(lái)對(duì)業(yè)務(wù)狀態(tài)進(jìn)行重算更新。這類系統(tǒng)可以依靠有效的檢查點(diǎn)(checkpoint)和大量的狀態(tài)管理來(lái)讓流式處理的結(jié)果不再只是一個(gè)近似值桶唐。這個(gè)模型被應(yīng)用于很多的數(shù)據(jù)攝取任務(wù)栅葡。盡管如此,雖然批處理層在這個(gè)模型中被去掉了尤泽,但是在服務(wù)層仍然存在兩個(gè)問(wèn)題欣簇。
如今很多流式處理引擎都支持行級(jí)的數(shù)據(jù)處理规脸,這就要求我們的服務(wù)層也需要能夠支持行級(jí)更新的能力。通常熊咽,這類系統(tǒng)并不能對(duì)分析類的查詢掃描優(yōu)化到這個(gè)地步莫鸭,除非我們?cè)趦?nèi)存中緩存大量記錄(如Memsql)或者有強(qiáng)大的索引支持(如ElasticSearch)。這些系統(tǒng)為了獲得數(shù)據(jù)攝取和掃描的性能往往需要增加成本和犧牲服務(wù)的可擴(kuò)展性横殴。出于這個(gè)原因被因,這類服務(wù)系統(tǒng)的數(shù)據(jù)駐留的能力往往是有限的,從時(shí)間上可能30~90天衫仑,從總量上來(lái)說(shuō)幾個(gè)TB的數(shù)據(jù)就是他們的極限了梨与。對(duì)于歷史數(shù)據(jù)的分析又會(huì)被重新定向到時(shí)延要求不那么高的HDFS上。
對(duì)于數(shù)據(jù)攝取延時(shí)蛋欣、掃描性能和計(jì)算資源和操作復(fù)雜性的權(quán)衡是無(wú)法避免的。但是如果我們的業(yè)務(wù)場(chǎng)景對(duì)時(shí)延的要求并不是那么的高如贷,比如能接受10分鐘左右的延遲陷虎,在我們?nèi)绻新纷涌梢栽贖DFS上快速的進(jìn)行數(shù)據(jù)攝取和數(shù)據(jù)準(zhǔn)備的基礎(chǔ)上,服務(wù)層中的Speed Serving就不必要了杠袱。這么做可以統(tǒng)一服務(wù)層尚猿,大大降低系統(tǒng)整體的復(fù)雜度和資源消耗。
要將HDFS用作統(tǒng)一的服務(wù)層楣富,我們不但需要使它支持存儲(chǔ)變更日志(或者叫日志記錄系統(tǒng))凿掂,而且需要支持根據(jù)實(shí)際業(yè)務(wù)維度來(lái)分區(qū)、壓縮纹蝴、去重的業(yè)務(wù)狀態(tài)管理庄萎。這類統(tǒng)一服務(wù)層需具備如下幾個(gè)特性:
- 大型HDFS數(shù)據(jù)集的快速變更能力
- 數(shù)據(jù)存儲(chǔ)需要針對(duì)分析類掃描進(jìn)行優(yōu)化(列存)
- 有效的連接和將更新傳播到上層建模數(shù)據(jù)集的能力
被壓縮的業(yè)務(wù)狀態(tài)變更是無(wú)法避免的,即使我們以事件時(shí)間(Event time)作為業(yè)務(wù)分區(qū)字段塘安。由于遲到數(shù)據(jù)和事件時(shí)間和處理時(shí)間(Processing time)的不一致糠涛,在數(shù)據(jù)攝取場(chǎng)景中我們依然需要對(duì)老的分區(qū)進(jìn)行必要的更新操作。最后就算我們把處理時(shí)間作為分區(qū)字段兼犯,依然存在一些需要進(jìn)行更新的場(chǎng)景忍捡,比如由于安全、審計(jì)方面的原因?qū)υ瓟?shù)據(jù)進(jìn)行校正的需求切黔。
Hudi簡(jiǎn)介:Hi, Hudi
作為一個(gè)增量處理框架砸脊,我們的Hudi支持前面章節(jié)中所述的所有需求。一言以蔽之纬霞,Hudi是一種針對(duì)分析型業(yè)務(wù)的凌埂、掃描優(yōu)化的數(shù)據(jù)存儲(chǔ)抽象,它能夠使HDFS數(shù)據(jù)集在分鐘級(jí)的時(shí)延內(nèi)支持變更诗芜,也支持下游系統(tǒng)對(duì)這個(gè)數(shù)據(jù)集的增量處理瞳抓。
Hudi數(shù)據(jù)集通過(guò)自定義的InputFormat
兼容當(dāng)前Hadoop生態(tài)系統(tǒng)秒紧,包括Apache Hive,Apache Parquet挨下,Presto和Apache Spark,使得終端用戶可以無(wú)縫的對(duì)接脐湾。
該數(shù)據(jù)流模型通過(guò)時(shí)延和數(shù)據(jù)完整性保證兩個(gè)維度去權(quán)衡以構(gòu)建數(shù)據(jù)管道。下圖所示的是Uber Engineering如何根據(jù)這兩個(gè)維度進(jìn)行處理方式的劃分秤掌。
對(duì)于很少一些需要真正做到約1分鐘的延時(shí)的用例及簡(jiǎn)單業(yè)務(wù)指標(biāo)的展示應(yīng)用愁铺,我們基于行級(jí)的流式處理。對(duì)于傳統(tǒng)的機(jī)器學(xué)習(xí)和實(shí)驗(yàn)有效性分析用例闻鉴,我們選擇更加擅長(zhǎng)較重計(jì)算的批處理茵乱。對(duì)于包含復(fù)雜連接或者重要數(shù)據(jù)處理的近實(shí)時(shí)場(chǎng)景,我們基于Hudi以及它的增量處理原語(yǔ)來(lái)獲得兩全其美的結(jié)果孟岛。想要了解Uber使用Hudi的更多用例和場(chǎng)景瓶竭,可以去他們的Githup文檔(https://uber.github.io/hudi/use_cases.html)里面看一下。
Hudi數(shù)據(jù)集的存儲(chǔ)
Hudi數(shù)據(jù)集的組織目錄結(jié)構(gòu)與Hive表示非常相似渠羞,一份數(shù)據(jù)集對(duì)應(yīng)這一個(gè)根目錄斤贰。數(shù)據(jù)集被打散為多個(gè)分區(qū),分區(qū)字段以文件夾形式存在次询,該文件夾包含該分區(qū)的所有文件荧恍。在根目錄下,每個(gè)分區(qū)都有唯一的分區(qū)路徑屯吊。每個(gè)分區(qū)記錄分布于多個(gè)文件中送巡。每個(gè)文件都有惟一的fileId
和生成文件的commit
所標(biāo)識(shí)。如果發(fā)生更新操作時(shí)盒卸,多個(gè)文件共享相同的fileId骗爆,但會(huì)有不同的commit
。
每條記錄由記錄的key值進(jìn)行標(biāo)識(shí)并映射到一個(gè)fileId蔽介。一條記錄的key與fileId之間的映射一旦在第一個(gè)版本寫入該文件時(shí)就是永久確定的淮腾。換言之,一個(gè)fileId標(biāo)識(shí)的是一組文件屉佳,每個(gè)文件包含一組特定的記錄谷朝,不同文件之間的相同記錄通過(guò)版本號(hào)區(qū)分。
Hudi Storage由三個(gè)不同部分組成:
-
Metadata - 以時(shí)間軸(timeline)的形式將數(shù)據(jù)集上的各項(xiàng)操作元數(shù)據(jù)維護(hù)起來(lái)武花,以支持?jǐn)?shù)據(jù)集的瞬態(tài)視圖圆凰,這部分元數(shù)據(jù)存儲(chǔ)于根目錄下的元數(shù)據(jù)目錄。一共有三種類型的元數(shù)據(jù):
- Commits - 一個(gè)單獨(dú)的commit包含對(duì)數(shù)據(jù)集之上一批數(shù)據(jù)的一次原子寫入操作的相關(guān)信息体箕。我們用單調(diào)遞增的時(shí)間戳來(lái)標(biāo)識(shí)commits专钉,標(biāo)定的是一次寫入操作的開始挑童。
- Cleans - 用于清除數(shù)據(jù)集中不再被查詢所用到的舊版本文件的后臺(tái)活動(dòng)。
- Compactions - 用于協(xié)調(diào)Hudi內(nèi)部的數(shù)據(jù)結(jié)構(gòu)差異的后臺(tái)活動(dòng)跃须。例如站叼,將更新操作由基于行存的日志文件歸集到列存數(shù)據(jù)上。
-
Index - Hudi維護(hù)著一個(gè)索引菇民,以支持在記錄key存在情況下尽楔,將新記錄的key快速映射到對(duì)應(yīng)的fileId。索引的實(shí)現(xiàn)是插件式的第练,
- Bloom filter - 存儲(chǔ)于數(shù)據(jù)文件頁(yè)腳阔馋。默認(rèn)選項(xiàng),不依賴外部系統(tǒng)實(shí)現(xiàn)娇掏。數(shù)據(jù)和索引始終保持一致呕寝。
- Apache HBase - 可高效查找一小批key。在索引標(biāo)記期間婴梧,此選項(xiàng)可能快幾秒鐘下梢。
-
Data - Hudi以兩種不同的存儲(chǔ)格式存儲(chǔ)所有攝取的數(shù)據(jù)。這塊的設(shè)計(jì)也是插件式的塞蹭,用戶可選擇滿足下列條件的任意數(shù)據(jù)格式:
- 讀優(yōu)化的列存格式(ROFormat)怔球。缺省值為Apache Parquet
- 寫優(yōu)化的行存格式(WOFormat)。缺省值為Apache Avro
寫Hudi文件
Compaction
Hudi對(duì)HDFS的使用模式進(jìn)行了優(yōu)化竟坛。Compaction是將數(shù)據(jù)從寫優(yōu)化格式轉(zhuǎn)換為讀優(yōu)化格式的關(guān)鍵操作。Compaction操作的基本并行單位是對(duì)一個(gè)fileID的重寫钧舌,Hudi保證所有的數(shù)據(jù)文件的大小和HDFS的塊大小對(duì)齊担汤,這樣可以使Compaction操作的并行度、查詢的并行度和HDFS文件總數(shù)間取得平衡洼冻。Compaction操作也是插件式的崭歧,可以擴(kuò)展為合并不頻繁更新的老的數(shù)據(jù)文件已進(jìn)一步減少文件總數(shù)。
寫入方式
Hudi是一個(gè)Spark的第三方庫(kù)撞牢,以Spark Streaming的方式運(yùn)行數(shù)據(jù)攝取作業(yè)率碾,這些作業(yè)一般建議以1~2分鐘左右的微批(micro-batch)進(jìn)行處理。當(dāng)然屋彪,在權(quán)衡自己業(yè)務(wù)在時(shí)延要求和資源層面的前提下所宰,我們也可以用Apache Oozie或者Apache Airflow來(lái)進(jìn)行離線作業(yè)周期性調(diào)度。
在默認(rèn)配置下畜挥,Hudi使用一下寫入路徑:
Hudi從相關(guān)的分區(qū)下的parquet文件中加載BloomFilter索引仔粥,并通過(guò)傳入key值映射到對(duì)應(yīng)的文件來(lái)標(biāo)記是更新還是插入。此處的連接操作可能由于輸入數(shù)據(jù)的大小,分區(qū)的分布或者單個(gè)分區(qū)下的文件數(shù)問(wèn)題導(dǎo)致數(shù)據(jù)傾斜躯泰。通過(guò)對(duì)連接字段進(jìn)行范圍分區(qū)以及新建子分區(qū)的方式處理谭羔,以避免Spark某些低版本中處理Shuffle文件時(shí)的2GB限制的問(wèn)題 - https://issues.apache.org/jira/browse/SPARK-6190。
-
Hudi按分區(qū)對(duì)
insert
進(jìn)行分組麦向,分配一個(gè)fileId瘟裸,然后對(duì)相應(yīng)的日志文件進(jìn)行append操作,知道文件大小達(dá)到HDSF塊大小诵竭。然后话告,新的fileId生成,重復(fù)上述過(guò)程秀撇,直到所有的數(shù)據(jù)都被插入。- 一個(gè)有時(shí)間限制compaction操作會(huì)被后臺(tái)以幾分鐘為周期調(diào)度起來(lái)向族,生成一個(gè)compactions的優(yōu)先級(jí)列表呵燕,并壓縮一個(gè)fileId包含的所有avro文件以生成進(jìn)行當(dāng)前parquet文件的下一個(gè)版本。
- Compaction操作是異步的件相,鎖定幾個(gè)特定的日志版本進(jìn)行壓縮再扭,并以新的日志記錄更新到對(duì)應(yīng)fileId中。鎖維護(hù)在Zookeeper中夜矗。
- Compaction操作的優(yōu)先級(jí)順序由被壓縮的日志數(shù)據(jù)大小決定泛范,并基于一個(gè)Compaction策略可配置。每一輪壓縮迭代過(guò)程中紊撕,大文件優(yōu)先被壓縮罢荡,因?yàn)橹貙憄arquet文件的開銷并不會(huì)根據(jù)文件的更新次數(shù)進(jìn)行分?jǐn)偂?/li>
Hudi在針對(duì)一個(gè)fileId進(jìn)行更新操作時(shí),如果對(duì)應(yīng)的日志文件存在則append对扶,反之区赵,會(huì)新建日志文件。
如果數(shù)據(jù)攝取作業(yè)成功浪南,一個(gè)
commit
記錄會(huì)在Hudi的元數(shù)據(jù)時(shí)間軸中記錄笼才,即將inflight文件重命名為commit文件,并將分區(qū)和所創(chuàng)建fileId版本的詳細(xì)信息記錄下來(lái)络凿。
HDFS塊對(duì)齊
如上所述骡送,Hudi會(huì)努力將文件大小和HDFS底層塊大小對(duì)齊。取決于一個(gè)分區(qū)下數(shù)據(jù)的總量和列存的壓縮效果絮记,compaction操作依然能夠創(chuàng)建parquet小文件摔踱。因?yàn)閷?duì)分區(qū)的插入操作會(huì)是以對(duì)現(xiàn)有小文件的更新來(lái)進(jìn)行的,所有這些小文件的問(wèn)題最終會(huì)被一次次的迭代不斷修正怨愤。最終昌渤,文件大小會(huì)不斷增長(zhǎng)直到與HDFS塊大小一致。
故障恢復(fù)
首先憔四,Spark的本身的重試機(jī)制會(huì)cover一些間歇性的異常膀息,當(dāng)然如果超過(guò)了重試次數(shù)的閾值般眉,我們的整個(gè)作業(yè)都會(huì)失敗。下一次的迭代作業(yè)會(huì)在同一批次數(shù)據(jù)上進(jìn)行重試潜支。以下列出兩個(gè)重要的區(qū)別:
- 攝取失敗可能在日志文件中生成包含部分?jǐn)?shù)據(jù)的avro塊 - 這個(gè)問(wèn)題通過(guò)在
commit
元數(shù)據(jù)中存儲(chǔ)對(duì)應(yīng)數(shù)據(jù)塊的起始偏移量和日志文件版本來(lái)解決甸赃。當(dāng)讀取日志文件時(shí),偶爾發(fā)生的部分寫入的數(shù)據(jù)塊會(huì)被跳過(guò)冗酿,且會(huì)從正確的位置開始讀取avro文件埠对。 - Compaction過(guò)程失敗會(huì)生產(chǎn)包含部分?jǐn)?shù)據(jù)的parquet文件 - 這個(gè)問(wèn)題在查詢階段被解決,通過(guò)
commit
元數(shù)據(jù)進(jìn)行文件版本的過(guò)濾裁替。查詢階段只會(huì)讀取最新的完成的compaction后的文件项玛。這些失敗的compaction文件會(huì)在下一個(gè)compaction周期被回滾。
讀取Hudi文件
commit
時(shí)間軸元數(shù)據(jù)可以讓我們?cè)谕环軭DFS數(shù)據(jù)上同時(shí)享有讀取優(yōu)化的視圖和實(shí)時(shí)視圖弱判〗缶冢客戶端可以基于延遲要求和查詢性能決定使用哪種視圖。Hudi以自定義的InputFormat
和一個(gè)Hive注冊(cè)模塊來(lái)提供這兩種視圖昌腰,后者可以將這兩種視圖注冊(cè)為Hive Metastore表开伏。這兩種輸入格式都可以識(shí)別fileId和commit
時(shí)間,可以篩選并讀取最新提交的文件遭商。然后固灵,Hudi會(huì)基于這些數(shù)據(jù)文件生成輸入分片供查詢使用。
InputFormat
的具體信息如下:
- HoodieReadOptimizedInputFormat - 提供掃描優(yōu)化的視圖劫流,篩選所有的日志文件并獲取最新版本的parquet壓縮文件
- HoodieRealtimeInputFormat - 提供一個(gè)實(shí)時(shí)的視圖巫玻,除了會(huì)獲取最新的parquet壓縮文件之外,還提供一個(gè)
RecordReader
以合并與parquet文件相關(guān)的日志文件祠汇。
這兩類InputFormat
都擴(kuò)展了MapredParquetInputFormat
和VectorizedParquetRecordReader
大审,因此所有針對(duì)parquet文件的優(yōu)化依然被保留。依賴于hoodie-hadoop-mr
類庫(kù)座哩,Presto和Spark SQL可以對(duì)Hudi格式的Hive Metastore表做到開箱即用徒扶。
增量處理
前面提到過(guò)根穷,數(shù)據(jù)模型表需要在HDFS中處理和提供姜骡,才能使的HDFS算的上是一個(gè)統(tǒng)一的服務(wù)層。構(gòu)建低延時(shí)的數(shù)據(jù)模型表需要能夠鏈接HDFS數(shù)據(jù)集記性增量處理屿良。由于Hudi在元數(shù)據(jù)中維護(hù)了每次提交的提交時(shí)間以及對(duì)應(yīng)的文件版本圈澈,使得我們可以基于起始時(shí)間戳和結(jié)束時(shí)間戳從特定的Hudi數(shù)據(jù)集中提取增量的變更數(shù)據(jù)集。
這個(gè)過(guò)程基本上與普通的查詢大致相同尘惧,只是選取特定時(shí)間范圍內(nèi)的文件版本進(jìn)行讀取而不是選最新的康栈,提交時(shí)間會(huì)最為過(guò)濾條件被謂詞下推到文件掃描階段。這個(gè)增量結(jié)果集也收到文件自動(dòng)清理的影響,如果某些時(shí)間范圍內(nèi)的文件被自動(dòng)清理掉了啥么,那自然也是不能被訪問(wèn)到了登舞。
這樣我們就可以基于watermark做雙流join和流與靜態(tài)數(shù)據(jù)的join以對(duì)存儲(chǔ)在HDFS中的數(shù)據(jù)模型表計(jì)算和upsert
。
參考文獻(xiàn)
[1] https://eng.uber.com/hoodie/
[2] https://whatis.techtarget.com/definition/data-ingestion
[3] https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
[4] https://github.com/uber/hudi