背景
本篇為Hudi概念和特性相關(guān)介紹淹朋。依據(jù)于官網(wǎng)和相關(guān)博客資料模庐,融入了個人理解掂碱。內(nèi)容可能會有疏漏疼燥,歡迎大家指正和補充醉者。
Hudi概念
Apache Hudi是一個Data Lakes的開源方案撬即,Hudi是Hadoop Updates and Incrementals的簡寫,它是由Uber開發(fā)并開源的Data Lakes解決方案粒竖。Hudi具有如下基本特性/能力:
- Hudi能夠攝入(Ingest)和管理(Manage)基于HDFS之上的大型分析數(shù)據(jù)集蕊苗,主要目的是高效的減少入庫延時沿后。
- Hudi基于Spark/Flink/Hive來對HDFS上的數(shù)據(jù)進行更新、插入尖滚、刪除等熔掺。
- Hudi在HDFS數(shù)據(jù)集上提供如下流原語:插入更新(如何改變數(shù)據(jù)集);增量拉缺刚馈(如何獲取變更的數(shù)據(jù))。
- Hudi可以對HDFS上的parquet格式數(shù)據(jù)進行插入/更新操作宇驾。
- Hudi通過自定義InputFormat與Hadoop生態(tài)系統(tǒng)(Spark课舍、Hive捡需、Parquet)集成筹淫。
- Hudi通過Savepoint來實現(xiàn)數(shù)據(jù)恢復饰剥。
來源:Hudi數(shù)據(jù)湖簡介_阿福Chris的博客-CSDN博客_hudi 數(shù)據(jù)湖
時間線
https://hudi.apache.org/docs/next/timeline/
Hudi內(nèi)部維護了時間線,支持按照數(shù)據(jù)的到達時間順序來獲取數(shù)據(jù)瘸右。Hudi確保時間線上的動作是原子性的。
Hudi 表類型
https://hudi.apache.org/docs/next/table_types/#table-and-query-types
Copy on Write
數(shù)據(jù)使用列存儲形式存放太颤,每次提交都會產(chǎn)生新版本的列存儲文件盹沈。將原有數(shù)據(jù)文件copy一份,合并入變更的數(shù)據(jù)后保存一份新的數(shù)據(jù)锚贱。因此寫入放大很高晋修,讀取放大基本為0。適用于讀取多寫入少的場景落剪。寫入存在延遲忠怖,F(xiàn)link每次checkpoint或者Spark每次微批處理才會commit新數(shù)據(jù)。讀取延遲很低。可以通過hoodie.cleaner.commits.retained
配置項確定需要保存的最近commit個數(shù)哮笆,防止存儲空間占用無限放大福铅÷缘矗可以提供較好的并發(fā)控制,因為讀取數(shù)據(jù)的時候,無法讀取到正在寫入但尚未commit或者是寫入失敗的數(shù)據(jù)。
Merge on Read
數(shù)據(jù)使用行存儲(avro)和列存儲(parquet)共同存放哟玷。其中新變更的數(shù)據(jù)使用行存儲抑月,歷史數(shù)據(jù)采用列存儲。每當滿足一定條件的時候(經(jīng)過n個commit或者經(jīng)過特定時間)Hudi開始compact操作洁仗,將行存儲的數(shù)據(jù)和列存儲的合并瓮增,生成新的列存儲文件你踩。適用于寫多讀少的場景吩谦。寫入延遲很小滑废,讀取的時候需要合并新文件avro和parquet袜爪,存在一定延遲。需要使用定期的online compaction或者是手工執(zhí)行的offline compaction將avro格式和parquet格式文件合并腊状。
通過配置項table.type
指定表類型。
兩種表類型的特性對比:
Trade-off | CopyOnWrite | MergeOnRead |
---|---|---|
Data Latency | Higher | Lower |
Query Latency | Lower | Higher |
Update cost (I/O) | Higher (rewrite entire parquet) | Lower (append to delta log) |
Parquet File Size | Smaller (high update(I/0) cost) | Larger (low update cost) |
Write Amplification | Higher | Lower (depending on compaction strategy) |
個人理解:MOR表在compaction之后也會生成包含最新版本數(shù)據(jù)的parquet文件同蜻。也就是說它的最終數(shù)據(jù)保存形式和COW表完全相同秧荆。所以MOR表可以理解成是一種每提交N次才Copy on write的COW表。MOR表的中間結(jié)果使用log日志文件(行存儲)保存埃仪。log中只保存變更的數(shù)據(jù)乙濒。新增的數(shù)據(jù)使用parquet保存。由于Hudi支持像文件優(yōu)化策略,如果原來的parquet被判定為小文件颁股,新增的數(shù)據(jù)和原有數(shù)據(jù)合并后生成新的parquet文件“替換”原有舊parquet文件么库。如果原來parquet不是小文件,或者說合并之后的數(shù)據(jù)量超過小文件判定標準甘有,超出小文件判定標準的部分會分裂成新的parquet文件保存诉儒。
查詢類型
https://hudi.apache.org/docs/next/table_types/#table-and-query-types
Snapshot Queries : Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
Incremental Queries : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
Read Optimized Queries : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table.
總結(jié):
- Snapshot Queries:MOR表合并avro和parquet文件之后輸出結(jié)果,存在較大的查詢延遲亏掀,但是數(shù)據(jù)沒有延遲忱反。COW表直接讀取現(xiàn)有的新版本parquet文件,查詢延遲低滤愕,但是有數(shù)據(jù)延遲(最新的數(shù)據(jù)可能沒有寫入温算,還沒有commit)。
- Incremental Queries:只查詢某次commit或compaction之后新插入或者修改的數(shù)據(jù)间影。
- Read Optimized Queries:僅適用于MOR表注竿,為了優(yōu)化MOR表的讀取性能,此方式讀取的時候不會強制合并avro和parquet文件魂贬。而是只讀取parquet文件巩割。這樣查詢延遲很低,但是數(shù)據(jù)存在延遲(在avro但是還沒有compact的數(shù)據(jù)無法被查詢到)付燥。
Table Type | Supported Query types |
---|---|
Copy On Write | Snapshot Queries + Incremental Queries |
Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
通過hoodie.datasource.query.type
參數(shù)控制查詢類型宣谈。配置項對應(yīng)為:
- snapshot(默認值)
- read_optimized
- incremental
Trade-off | Snapshot | Read Optimized |
---|---|---|
Data Latency | Lower | Higher |
Query Latency | Higher (merge base / columnar file + row based delta / log files) | Lower (raw base / columnar file performance) |
索引
Hudi通過HoodieKey(recordKey和poartition path)和file id的對應(yīng)關(guān)系來加速upsert操作。這正是Hudi的索引機制键科。這種對應(yīng)關(guān)系在初始記錄寫入之后不會在改變闻丑。
對于COW表插入數(shù)據(jù)的場景,索引可以快速的過濾掉不涉及數(shù)據(jù)修改的file萝嘁。對于MOR表插入數(shù)據(jù)的場景梆掸,索引能夠很快定位到需要合并的文件扬卷。不需要像Hive ACID一樣牙言,合并所有的base file。
Hudi支持下面4種Index選項:
- Bloom Index(默認值):基于recordkey構(gòu)建布隆過濾器怪得,可以快速定位record key位于哪個file中咱枉。
- Simple:將更新/刪除的數(shù)據(jù)同表中存儲的數(shù)據(jù)的key做簡單的join操作。
- HBase index:將索引映射存儲到HBase中徒恋。
- 自己實現(xiàn)索引蚕断。
所有具有GLOBAL和非GLOBAL兩種(HBase本來就是global的)。其中:
- global index確保一個表的所有分區(qū)都不會有重復的數(shù)據(jù)入挣。但是更新的刪除的代價會隨著表中數(shù)據(jù)量的增長而加大亿乳。對于較小的表可以接受。
- 非global index只能確保一個表的同一分區(qū)之內(nèi)不會存在重復數(shù)據(jù)。需要writer去確保某條記錄的更新/刪除操作對應(yīng)的partition path始終一致葛假。在查找index的時候性能更好障陶。
使用場景:
- 事實表數(shù)據(jù)遲到場景:Bloom
- event表去重(通常只append數(shù)據(jù)):Bloom
- 維度表(通常不會分區(qū))變更(變更量較少):SIMPLE或HBase
hoodie.index.type
用來修改Index選項。
注意:使用GLOBAL_BLOOM的時候需要留意hoodie.bloom.index.update.partition.path
配置聊训。源碼給出的解釋如下:
/**
* Only applies if index type is GLOBAL_BLOOM.
* <p>
* When set to true, an update to a record with a different partition from its existing one
* will insert the record to the new partition and delete it from the old partition.
* <p>
* When set to false, a record will be updated to the old partition.
*/
設(shè)置為true的話抱究,如果record更新操作修改了partition,則會在新partition插入這條數(shù)據(jù)带斑,然后在舊partition刪除這條數(shù)據(jù)鼓寺。
如果設(shè)置為false,會在就parititon更新這條數(shù)據(jù)勋磕。
hoodie.simple.index.update.partition.path
對于GLOBAL_SIMPLE也同理妈候。
文件結(jié)構(gòu)
https://hudi.apache.org/docs/next/file_layouts/
- Hudi在分布式文件系統(tǒng)base path下的目錄結(jié)構(gòu)來保存表數(shù)據(jù)。
- 表被分解為一個個分區(qū)朋凉。
- 在每個分區(qū)中州丹,文件以file group形式組織,每個都由獨特的file id標識杂彭。
- 每個file group包含多個file slice墓毒。
- 每個slice包含一個base file(commit或者compact的時候創(chuàng)建)還有一系列的log file(MOR表的插入或修改)。
元數(shù)據(jù)表(Metadata Table)
https://hudi.apache.org/docs/next/metadata/
用于提高讀寫性能亲怠,避免使用list file操作所计。
0.10.1以后metadata table默認啟用(hoodie.metadata.enable
)。
為了確保metadata table保持最新团秽,針對同一張Hudi表的寫操作需要根據(jù)不同的場景增加相應(yīng)配置主胧。
單個writer同步表服務(wù)(清理,聚簇习勤,壓縮)踪栋,只需配置hoodie.metadata.enable=true
,重啟writer图毕。
單個writer異步表服務(wù)(同一進程內(nèi))夷都,需要配置樂觀并發(fā)訪問控制:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
多個writer(不同進程)異步表服務(wù),需要配置樂觀并發(fā)訪問控制:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<distributed-lock-provider-classname>
外部分布式鎖提供方有:ZookeeperBasedLockProvider
, HiveMetastoreBasedLockProvider
和DynamoDBBasedLockProvider
予颤。
Hudi 寫類型
https://hudi.apache.org/docs/next/write_operations/
- UPSERT : This is the default operation where the input records are first tagged as inserts or updates by looking up the index. The records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. This operation is recommended for use-cases like database change capture where the input almost certainly contains updates. The target table will never show duplicates.
- INSERT : This operation is very similar to upsert in terms of heuristics/file sizing but completely skips the index lookup step. Thus, it can be a lot faster than upserts for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the table can tolerate duplicates, but just need the transactional writes/incremental pull/storage management capabilities of Hudi.
- BULK_INSERT : Both upsert and insert operations keep input records in memory to speed up storage heuristics computations faster (among other things) and thus can be cumbersome for initial loading/bootstrapping a Hudi table at first. Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, this just does a best-effort job at sizing files vs guaranteeing file sizes like inserts/upserts do.
總結(jié):
- UPSERT: 通過索引來區(qū)別數(shù)據(jù)是insert還是update囤官。不會存在重復數(shù)據(jù)。
- INSERT: 跳過index查找過程蛤虐。比upsert執(zhí)行要快党饮,但無法保證數(shù)據(jù)不重復。
- BULK_INSERT: upsert和insert使用內(nèi)存緩存驳庭。upsert和insert操作對加載原始(存量)數(shù)據(jù)到Hudi中不是很合適刑顺。加載初始數(shù)據(jù)適合使用bulk insert方式。此方式使用基于排序的寫入算法,適合TB級別數(shù)據(jù)的導入操作蹲堂。減少序列化和數(shù)據(jù)合并操作荞驴。不能夠提供數(shù)據(jù)去重功能。
- DELETE: 刪除操作贯城。Hudi支持軟刪除和硬刪除熊楼。軟刪除指的是保留Hoodie key的同時將所有其他字段設(shè)置為null。需要表的schema允許其他所有字段為null能犯,然后將其他所有字段upsert為null鲫骗。硬刪除指的是物理刪除。
通過write.operation
配置項指定踩晶。
寫入步驟
The following is an inside look on the Hudi write path and the sequence of events that occur during a write.
- Deduping(去重)
同一批中的數(shù)據(jù)先去重执泰,需要combine或者reduce by key。 - Index Lookup(查詢索引)
查找插入的數(shù)據(jù)位于哪些file slice中渡蜻。 - File Sizing(文件大小控制)
Hudi基于之前提交數(shù)據(jù)的平均大小术吝,制定計劃將小文件中插入足夠的數(shù)據(jù),使它的大小接近配置的容量上限限制茸苇。 - Partitioning(分區(qū))
確定update和insert的數(shù)據(jù)屬于哪個file group排苍,可能伴隨新file group的創(chuàng)建。 - Write I/O(寫入數(shù)據(jù))
寫入數(shù)據(jù)過程学密。根據(jù)表類型寫入base file或者是log file淘衙。 - Update Index(更新索引)
Now that the write is performed, we will go back and update the index.
寫入操作已完成,更新索引腻暮。 - Commit(提交)
原子提交所有的更改彤守。 - Clean (if needed)(清理,如果需要的話)
如果需要哭靖,運行清理步驟具垫。 - Compaction(壓縮)
如果使用MOR表,壓縮會同步運行试幽,或者是異步調(diào)度筝蚕。 - Archive(歸檔)
最后歸檔步驟將老舊的timeline項目移動到歸檔目錄。
Schema變更
https://hudi.apache.org/docs/next/schema_evolution
試驗功能抡草,Spark 3.1.x和3.2.x支持Schema變更饰及。
Key Generation
https://hudi.apache.org/docs/next/key_generation
Primary key由RecordKey和Partition path組成蔗坯。
RecordKey由hoodie.datasource.write.recordkey.field
決定康震。Partition path由hoodie.datasource.write.partitionpath.field
決定。
并發(fā)控制
https://hudi.apache.org/docs/next/concurrency_control
Hudi支持MVCC和樂觀并發(fā)訪問兩種方式宾濒。MVCC方式所有的table service都使用同一個writer來保證沒有沖突腿短,避免竟態(tài)條件。新版本的Hudi增加了樂觀并發(fā)訪問控制(OCC)。支持文件級別的樂觀鎖橘忱。需要依賴外部組件實現(xiàn)樂觀鎖赴魁,例如Zookeeper,Hive metastore等钝诚。
啟用并發(fā)控制:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>
使用Zookeeper分布式鎖:
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path
使用HiveMetastore分布式鎖:
hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table
禁用并發(fā)寫入:
hoodie.write.concurrency.mode=single_writer
hoodie.cleaner.policy.failed.writes=EAGER