1.INCREMENTAL UPDATES
? ? ? ?Hadoop和Hive正在快速發(fā)展,以超越以前集成和數(shù)據(jù)訪問的限制携茂。在近期的開發(fā)路線圖上,我們希望Hive支持完整的CRUD操作诅岩。當(dāng)我們等待這些更改時讳苦,仍然需要使用當(dāng)前選項——OVERWRITE或APPEND來進(jìn)行Hive表集成。
? ? ? ?OVERWRITE選項要求將完整的記錄集從源端移動到hadoop吩谦。雖然這種方法可能適用于較小數(shù)據(jù)集鸳谜,但一般是禁止的。
? ? ? ?APPEND選項會對數(shù)據(jù)進(jìn)行限制式廷,只包含new或update的記錄咐扭。由于Hive中還沒有真正的插入和更新,我們需要考慮一個防止重復(fù)記錄的過程滑废,因為update被附加到累計的記錄集中蝗肪。
? ? ? ?本文中,我們將研究從RDBMS源向現(xiàn)有Hive表定義添加update和insert的4步策略蠕趁。
IMPORTANT:本文中薛闪,我們假設(shè)每個源表都有唯一的單鍵或多鍵,并且為每個記錄維護(hù)一個“modified_date”字段——要么定義為原始源表的一部分俺陋,要么在抽取時添加豁延。
2.HIVE TABLE DEFINITION OPTIONS:EXTERNAL, LOCAL AND VIEW
? ? ? ?外部表是Hive表定義和HDFS管理的文件夾和文件的組合。表定義獨立于數(shù)據(jù)存在腊状,因此诱咏,如果刪除表,HDFS文件夾和文件將保持原始狀態(tài)缴挖。
? ? ? ?本地表是直接綁定到源數(shù)據(jù)的Hive表袋狞。數(shù)據(jù)在物理上綁定到表定義,如果表被刪除醇疼,數(shù)據(jù)也將被刪除硕并。
? ? ? ?與傳統(tǒng)的RDBMS一樣,視圖是存儲的SQL查詢秧荆,支持與Hive表相同的READ交互倔毙,但是它們不存儲自己的任何數(shù)據(jù)。相反乙濒,數(shù)據(jù)從存儲的對Hive表的SQL查詢中獲取陕赃。
? ? ? ?以下流程概述了一個工作流卵蛉,該工作流利用了上述所有的四個步驟:
- Ingest(抽取)么库。從完整的表(base table)抽取更改記錄(incremental_table)傻丝。
- Reconcile(一致)。創(chuàng)建一個Base+Change records(reconcile_view)的單一視圖诉儒,反應(yīng)最新的記錄集葡缰。
- Compact(結(jié)合)。從reconcile視圖創(chuàng)建報表(reporting_table)忱反。
- Purge(凈化)泛释。使用reporting table替換base table,并在下一個數(shù)據(jù)抽取周期前刪除以前處理過的更改記錄温算。
? ? ? ?將作為增量更新工作流的表和視圖有:
- base_table:一個HIVE本地表怜校,最初保存來自源系統(tǒng)的所有記錄。在初始處理周期后注竿,它將維護(hù)來自源的最新同步記錄集的副本茄茁。在每個處理周期后,reporting_table將覆蓋它巩割。(Step 4:Purge)裙顽。
- incremental_table:一個Hive外部表,保存來自源系統(tǒng)的增量更改記錄(插入和更新)宣谈。在每個處理周期結(jié)束時锦庸,清除其中內(nèi)存(Step 4:Purge)
- reconcile_view:一個Hive視圖,組合并產(chǎn)生base_table和incremental_table內(nèi)容蒲祈,只顯示最新記錄。用于填充reporting_table(Step 3:Compact)
- reporting_table:一個Hive本地表萝嘁,其中包含目標(biāo)的最新記錄梆掸。用來對base_table進(jìn)行覆蓋。
STEP 1:INGEST
? ? ? ?根據(jù)RDBMS源系統(tǒng)是否提供直接訪問牙言,可以選擇文件處理方法(直接訪問不可用時)或RDBMS處理(直接訪問可用時)酸钦。
? ? ? ?無論選擇哪種,都需要以下工作流:
? ? ? ?1.一次性咱枉,初始加載所有數(shù)據(jù)到HIVE卑硫。
? ? ? ?2.不斷加載變化數(shù)據(jù)到HIVE。
? ? ? ?下面將會討論文件處理和數(shù)據(jù)庫直接處理(Sqoop)蚕断。
FILE PROCESSING
? ? ? ?本文中欢伏,我們假設(shè)文件夾中的文件具有分隔格式,且將由關(guān)系系統(tǒng)產(chǎn)生(即記錄具有唯一的鍵或標(biāo)識符)亿乳。
? ? ? ?文件通過以下兩種方式移動到HDFS:
- WebHDFS:主要用于與應(yīng)用集成硝拧,Web URL提供一個上載端點到指定HDFS文件夾中径筏。
- NFS:顯示為標(biāo)準(zhǔn)網(wǎng)絡(luò)驅(qū)動器,并允許終端用戶使用標(biāo)準(zhǔn)復(fù)制粘貼操作將文件從標(biāo)準(zhǔn)文件系統(tǒng)移動到HDFS障陶。
? ? ? ?一旦將初始記錄集移動到HDFS中滋恬,后續(xù)就可以只移動新插入的更新的記錄。
RDBMS PROCESSING
? ? ? ?SQOOP是基于JDBC的集成傳統(tǒng)數(shù)據(jù)庫的工具抱究。SQOOP允許將數(shù)據(jù)移動到HDFS恢氯,或直接移動到Hive表中。
? ? ? ?可以使用-table參數(shù)將整個源表移動到HDFS或Hive中鼓寺。
sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--table SOURCE_TBL
--target-dir /user/hive/incremental_table -m 1
? ? ? ?在初始導(dǎo)入之后勋拟,后續(xù)導(dǎo)入可以使用“check-column”、“incremental”和“l(fā)ast-value”參數(shù)利用SQOOP對增量導(dǎo)入的支持侄刽。
sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--table SOURCE_TBL
--target-dir /user/hive/incremental_table
-m 1 --check-column modified_date
--incremetal lastmodified --last-value {last_import_date}
? ? ? ?另外指黎,可以利用“query”參數(shù),讓SQL select語句將導(dǎo)入限制為只導(dǎo)入新記錄或更改記錄州丹。
sqoop import --connect jdbc:teradata://{host name or ip address}/Database=retail
--connection-manager org.apache.sqoop.teradata.TeradataConnManager
--username dbc
--password dbc
--target-dir /user/hive/incremental_table
-m 1 --query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS'
note:對于初始加載醋安,將“base_table”替換為“incremental_table”。對所有后續(xù)加載墓毒,使用“incremental_table”吓揪。
STEP 2:RECONCILE
? ? ? ?為了支持HIVE中的當(dāng)前記錄和新的更改記錄之間的持續(xù)協(xié)調(diào),應(yīng)該定義兩個表:base_table和incremental_table所计。
BASE_TABLE
? ? ? ?下面的示例顯示了Hive表“base_table”的DDL柠辞,它將包含位于“/user/hive/base_table”目錄下的HDFS中的任何帶分隔符的文件。這個表將包含來自源系統(tǒng)的初始主胧、完整的記錄加載叭首。在第一次處理程序運行后,它將存放來自源系統(tǒng)的最新記錄:
create table base_table(
id string,
field1 string,
field2 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/base_table';
INCREMENTAL_TABLE
create external table base_table(
id string,
field1 string,
field2 string,
modified_date string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/user/hive/incremental_table';
RECONCILE_VIEW
? ? ? ?這個視圖組合了來自基表(base_table)和Change表(incremental_table)的記錄集踪栋,并且僅生成每個惟一“id”的最近的記錄焙格。
? ? ? ?定義如下:
CREATE VIEW reconcile_view AS
SELECT t1.* FROM
(SELECT * FROM base_table UNION ALL SELECT * FROM incremental_table) t1
JOIN
(SELECT id, max(modified_date) max_modified FROM
SELECT * FROM base_table
UNION ALL
SELECT * FROM incremental_table) t2
GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified;
STEP 3:COMPACT
? ? ? ?reconcile_view現(xiàn)在包含最新的記錄集,并與來自RDBMS源系統(tǒng)的更改同步夷都。對于BI報告和分析工具眷唉,reporting_table可以從reconcile_view生成。在創(chuàng)建此表之前囤官,應(yīng)該像下面示例一樣刪除表的任何以前的實例冬阳。
REPORTING_TABLE
DROP TABLE reporting_table;
CREATE TABLE reporting_table AS SELECT * FROM reconcile_view;
? ? ? ?將已協(xié)調(diào)視圖(reconcile_view)移動到報表表(reporting_table)可以減少報表查詢所需的處理量。
? ? ? ?此外党饮,報表表中存儲的數(shù)據(jù)也將是靜態(tài)的肝陪,直到下一個處理周期才會更改。這提供了處理周期之間報告的一致性劫谅。相反见坑,協(xié)調(diào)視圖(reconcile_view)是動態(tài)的嚷掠,只要在更改表(incremental_table)文件夾/user/hive/incremental_table中添加或刪除新文件(保存更改記錄),它就會更改荞驴。
STEP 4:PURGE
? ? ? ?要準(zhǔn)備來自源的下一系列增量記錄不皆,只需用最新的記錄(reporting_table)替換基本表(base_table)。此外熊楼,通過刪除位于外部表位置(' /user/hive/incremental_table ')的文件霹娄,刪除以前導(dǎo)入的更改記錄內(nèi)容(incremental_table)。
DROP TABLE base_table;
CREATE TABLE base_table AS
SELECT * FROM reporting_table;
hadoop fs -rm -r /user/hive/incremental_table/*
FINAL THOUGHT
? ? ? ?雖然有幾種可能的方法來支持增量數(shù)據(jù)饋送到Hive鲫骗,這個例子有幾個關(guān)鍵的優(yōu)勢:
- 通過維護(hù)只用于更新的外部表犬耻,可以通過簡單地向該文件夾添加或刪除文件來刷新表內(nèi)容。
- 處理周期中的四個步驟(攝取执泰、協(xié)調(diào)枕磁、壓縮和清除)可以在一個OOZIE工作流中協(xié)調(diào)。OOZIE工作流可以是與數(shù)據(jù)更新SLA(即每日术吝、每周计济、每月等)相對應(yīng)的計劃事件。
- 除了支持插入和更新同步之外排苍,還可以通過向?qū)朐刺砑覦ELETE_FLAG或DELETE_DATE字段來同步刪除沦寂。然后,在Hive報表表中使用該字段作為過濾器來隱藏已刪除的記錄淘衙。例如,
CREATE VIEW reconcile_view AS
SELECT t1.* FROM
(SELECT * FROM base_table
UNION
SELECT * FROM incremental_table) t1
JOIN
(SELECT id, max(modified_date) max_modified FROM
(SELECT * FROM base_table
UNION
SELECT * FROM incremental_table)
GROUP BY id) s
ON t1.id = s.id AND t1.modified_date = s.max_modified
AND t1.delete_date IS NULL;