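Production scenario
Many fact tables need to be synchronized incrementally every day. When Hive's transactional mode is not enabled, the whole table has to be rewritten to HDFS, which costs a large amount of I/O time. Because each day's incremental data is only a small fraction of the total, this approach is very inefficient.
The idea here is to use the INSERT OVERWRITE TABLE statement on a partitioned table to overwrite only the affected partitions, which gives the effect of an incremental update.
1, First, create the test datasets
- a. Full dataset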
CREATE TABLE `ods.employees_all`( `employee_id` double,
`first_name` string,
`last_name` string,
`email` string,
`phone_number` string,
`hire_date` timestamp,
`job_id` string,
`salary` double,
`commission_pct` double,
`manager_id` double,
`department_id` int)
partitioned by (ds string)
stored as orcfile;
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(1, 'Eleni', 'Zlotkey', 'EZLOTKEY', '011.44.1344.429018', '2001-01-29 00:00:00', 'SA_MAN', 10500.0, 0.2, 100.0, 80.0, '2001');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(2, 'Mattea', 'Marvins', 'MMARVINS', '011.44.1346.329268', '2001-01-24 00:00:00', 'SA_REP', 7200.0, 0.1, 147.0, 80.0, '2001');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(3, 'David', 'Lee', 'DLEE', '011.44.1346.529268', '2002-02-23 00:00:00', 'SA_REP', 6800.0, 0.1, 147.0, 80.0, '2002');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(4, 'Sundar', 'Ande', 'SANDE', '011.44.1346.629268', '2002-03-24 00:00:00', 'SA_REP', 6400.0, 0.1, 147.0, 80.0, '2002');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(5, 'Amit', 'Banda', 'ABANDA', '011.44.1346.729268', '2003-04-21 00:00:00', 'SA_REP', 6200.0, 0.1, 147.0, 80.0, '2003');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(6, 'shabi', 'shabi', 'shabi', '590.423.4569', '2003-06-25 00:00:00.0', 'IT_PROG', 4800.0, 0.0, 103.0, 60.0, '2003');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(7, 'chunhuo', 'chunhuo', 'chunhuo', '590.423.4560', '2004-02-05 00:00:00.0', 'IT_PROG', 4800.0, 0.0, 103.0, 60.0, '2004');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(8, 'Payam', 'Kaufling', 'PKAUFLIN', '650.123.3234', '2004-05-01 00:00:00.0', 'ST_MAN', 7900.0, 0.0, 100.0, 50.0, '2004');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(9, 'David', 'Bernstein', 'DBERNSTE', '011.44.1344.345268', '2005-03-24 00:00:00.0', 'SA_REP', 9500.0, 0.25, 145.0, 80.0, '2005');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(10, 'Ellen', 'Abel', 'EABEL', '011.44.1644.429267', '2005-05-11 00:00:00.0', 'SA_REP', 11000.0, 0.3, 149.0, 80.0, '2005');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(11, 'Britney', 'Everett', 'BEVERETT', '650.501.2876', '2006-03-03 00:00:00.0', 'SH_CLERK', 3900.0, 0.0, 123.0, 50.0, '2006');
INSERT INTO ods.employees_all (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(12, 'Samuel', 'McCain', 'SMCCAIN', '650.501.3876', '2006-07-01 00:00:00.0', 'SH_CLERK', 3200.0, 0.0, 123.0, 50.0, '2006');
- b. Incremental dataset
CREATE TABLE `ods.employees_tmp`( `employee_id` double,
`first_name` string,
`last_name` string,
`email` string,
`phone_number` string,
`hire_date` timestamp,
`job_id` string,
`salary` double,
`commission_pct` double,
`manager_id` double,
`department_id` int)
partitioned by (ds string)
stored as orcfile;
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(4, 'Sundar_update', 'Sundar_update', 'Sundar_update', '011.44.1346.629268', '2002-03-24 00:00:00', 'SA_REP', 6400.0, 0.1, 147.0, 80.0, '2002');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(6, 'shabi_update', 'shabi_update', 'shabi_update', '590.423.4569', '2003-06-25 00:00:00.0', 'IT_PROG', 4800.0, 0.0, 103.0, 60.0, '2003');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(8, 'Payam_update', 'Payam_update', 'Payam_update', '650.123.3234', '2004-05-01 00:00:00.0', 'ST_MAN', 7900.0, 0.0, 100.0, 50.0, '2004');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(13, 'David_incremental', 'David_incremental', 'David_incremental', '011.44.1344.345268', '2007-03-24 00:00:00.0', 'SA_REP', 9500.0, 0.25, 145.0, 80.0, '2007');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(14, 'Ellen_incremental', 'Ellen_incremental', 'Ellen_incremental', '011.44.1644.429267', '2007-05-11 00:00:00.0', 'SA_REP', 11000.0, 0.3, 149.0, 80.0, '2007');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(15, 'Britney_incremental', 'Britney_incremental', 'Britney_incremental', '650.501.2876', '2008-03-03 00:00:00.0', 'SH_CLERK', 3900.0, 0.0, 123.0, 50.0, '2008');
INSERT INTO ods.employees_tmp (employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id, ds) VALUES(16, 'Samuel_incremental', 'Samuel_incremental', 'Samuel_incremental', '650.501.3876', '2008-07-01 00:00:00.0', 'SH_CLERK', 3200.0, 0.0, 123.0, 50.0, '2008');
2, Preview the test data
- Full dataset
- Incremental dataset
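A quick way to preview both tables is a few plain queries. This is a minimal sketch; the column subset and the ordering are chosen only for readability and are not part of the original post.

```sql
-- Preview the full table, ordered so it is easy to compare with the incremental table.
SELECT employee_id, first_name, last_name, hire_date, ds
FROM ods.employees_all
ORDER BY employee_id;

-- Preview the incremental table.
SELECT employee_id, first_name, last_name, hire_date, ds
FROM ods.employees_tmp
ORDER BY employee_id;

-- Row count and partition count of each table.
SELECT 'employees_all' AS tbl, COUNT(*) AS row_cnt, COUNT(DISTINCT ds) AS partition_cnt
FROM ods.employees_all
UNION ALL
SELECT 'employees_tmp' AS tbl, COUNT(*) AS row_cnt, COUNT(DISTINCT ds) AS partition_cnt
FROM ods.employees_tmp;
```

With the sample data above, the last query should report 12 rows in 6 partitions for the full table and 7 rows in 5 partitions for the incremental table.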
3, Check the HDFS partition directories of the full table
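One possible way to inspect the partitions from a Hive session is sketched below. The HDFS path is an assumption based on the default warehouse layout; the real location is the one reported by DESCRIBE FORMATTED.

```sql
-- List the partitions registered in the metastore for the full table.
SHOW PARTITIONS ods.employees_all;

-- Show table metadata, including the Location of the table directory on HDFS.
DESCRIBE FORMATTED ods.employees_all;

-- List the partition directories from within the Hive CLI or Beeline.
-- ASSUMPTION: default warehouse path; replace it with the Location shown above.
dfs -ls /user/hive/warehouse/ods.db/employees_all;
```

Before the update, one directory per partition (ds=2001 through ds=2006) is expected.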
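4, Use an INSERT OVERWRITE TABLE statement to update only the target partitions
From previewing the tables, I know that the full table currently has 12 rows in 6 partitions, and the incremental table has 7 rows in 5 partitions. Of these, 2007 and 2008 are new partitions, while 2002, 2003, and 2004 are partitions to be updated. The update statement is as follows: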
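-- Overwrite only the partitions that appear in t1; ds is taken dynamically from the last selected column.
INSERT OVERWRITE TABLE ods.employees_all PARTITION(ds)
SELECT t1.* FROM (
-- Existing rows that are not in the incremental table but share a partition with it
SELECT a.*
FROM ods.employees_all a
LEFT JOIN ods.employees_tmp b ON a.employee_id = b.employee_id
WHERE b.employee_id IS NULL
AND EXISTS (SELECT 1 FROM ods.employees_tmp c WHERE a.ds = c.ds)
UNION ALL
-- All new and updated rows from the incremental table
SELECT * FROM ods.employees_tmp
) t1;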
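Because the statement names the partition column without a static value, it is a dynamic-partition insert. On many Hive setups this needs the session settings below before the statement is run. Whether they are already enabled depends on the cluster configuration, so treat this as a sketch rather than a required step.

```sql
-- Allow dynamic partition inserts in this session.
SET hive.exec.dynamic.partition=true;
-- Allow all partition columns to be dynamic (strict mode requires at least one static partition).
SET hive.exec.dynamic.partition.mode=nonstrict;
```

Only partitions that receive rows from the SELECT are overwritten; partitions that do not appear in the result set (2001, 2005, and 2006 in this example) are left untouched.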
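The result set of t1 is:
The "passively updated" rows need some explanation. These rows are not in the incremental dataset themselves, but they belong to the same partition as some row in the incremental dataset. They are therefore selected as well, so that the target partition in the full dataset can be overwritten without losing them.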
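To make the three kinds of rows visible, the t1 subquery can be run on its own with a label per row. This is an illustrative variant of the query above; the row_type column and the extra join against the full table are additions for inspection only and must not be used in the actual INSERT.

```sql
SELECT t1.employee_id, t1.first_name, t1.ds, t1.row_type
FROM (
  -- Rows kept from the full table because their partition is about to be overwritten.
  SELECT a.employee_id, a.first_name, a.ds, 'passive' AS row_type
  FROM ods.employees_all a
  LEFT JOIN ods.employees_tmp b ON a.employee_id = b.employee_id
  WHERE b.employee_id IS NULL
    AND EXISTS (SELECT 1 FROM ods.employees_tmp c WHERE a.ds = c.ds)
  UNION ALL
  -- Incremental rows: 'updated' if the id already exists in the full table, otherwise 'new'.
  SELECT s.employee_id, s.first_name, s.ds,
         CASE WHEN o.employee_id IS NULL THEN 'new' ELSE 'updated' END AS row_type
  FROM ods.employees_tmp s
  LEFT JOIN ods.employees_all o ON s.employee_id = o.employee_id
) t1
ORDER BY t1.ds, t1.employee_id;
```

With the sample data, and run before the overwrite, employee_id 3, 5, and 7 should come back as passive, 4, 6, and 8 as updated, and 13 through 16 as new.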
The t1 dataset contains 10 rows in total: 4 new, 3 updated, and 3 passively updated. So if the INSERT OVERWRITE TABLE statement succeeds, ods.employees_all should contain 16 rows (the original 12 plus the 4 new ones).
The full dataset after the update is:
The data matches expectations exactly. This shows that INSERT OVERWRITE TABLE is a very good fit for incremental updates on partitioned tables: the logic is clear and simple, and the implementation is elegant.
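One way to check the result, sketched here as a plain aggregate over the full table:

```sql
-- Rows per partition after the overwrite.
SELECT ds, COUNT(*) AS row_cnt
FROM ods.employees_all
GROUP BY ds
ORDER BY ds;

-- Spot-check that the updated rows carry the new values.
SELECT employee_id, first_name, ds
FROM ods.employees_all
WHERE employee_id IN (4, 6, 8)
ORDER BY employee_id;
```

With the sample data, the first query should list 8 partitions (2001 through 2008) with 2 rows each, for 16 rows in total, and the second should show the `_update` names.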
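5, HDFS partition directories of the full table after the update
As shown, after the statement succeeded, the partitions for the 4 newly added records (2007 and 2008) were created automatically in the HDFS directory.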
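6, Summary
For a long time I struggled with how to do incremental updates in Hive without enabling transactions.
When I first started with Hive, my thinking was still stuck in traditional relational databases. I was very uncomfortable with a data warehouse that does not allow DELETE and UPDATE, and found the inability to update individual rows really frustrating (a result of my limited understanding at the time). After this exploration I found that Hive is far more capable than I had assumed. For common problems like this one, others have already worked out solutions, and my confusion came purely from my own inexperience.
In the example above, if the source database has an update log table, or the source table has a usable update_time timestamp, the t1 dataset can actually be produced in the query option of sqoop, imported into HDFS as a temporary table, and then written straight into the target full table with insert overwrite table from that temporary table.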
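A rough sketch of the Hive side of that last step follows. It assumes that Sqoop has already imported the t1 result set into a staging table with the same columns as the full table. The table name ods.employees_t1_stage is hypothetical and is not part of the original post.

```sql
-- Enable dynamic partitioning for this session (may already be set on the cluster).
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- ASSUMPTION: ods.employees_t1_stage is a hypothetical staging table that holds the imported t1 rows.
-- The partition column ds must be the last column in the SELECT list.
INSERT OVERWRITE TABLE ods.employees_all PARTITION (ds)
SELECT employee_id, first_name, last_name, email, phone_number, hire_date,
       job_id, salary, commission_pct, manager_id, department_id, ds
FROM ods.employees_t1_stage;
```

For this to be safe, the staging table has to contain every row of each partition it touches, including the passively updated ones, because the whole partition is replaced.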
The end.