Business background
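- Business expansion and change are inevitable, especially in the internet industry, where changing requirements are the norm and the only constant is change itself. The most common kind of expansion is adding columns to an existing table.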
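- Take sales orders as an example. Suppose that, for business reasons, four shipping-address fields are added to the customer table in the operational source system, and a sales quantity field is added to the sales order table. Because the source tables gained fields, the corresponding data warehouse tables must change too. This section explains how to add columns to the customer dimension table and the sales order fact table, how to apply SCD2 to the new columns, and what changes the scheduled load scripts need.
- The database model after adding the columns is shown below (the added columns are shown in blue):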
Data preparation: modify the consumer and sale_order tables in MySQL
- MySQL ALTER TABLE statements
-- Modify the consumer table in MySQL
alter table
consumer
add
shipping_address varchar(50) after consumer_province,
add
shipping_zip_code int after shipping_address,
add
shipping_city varchar(30) after shipping_zip_code,
add
shipping_province varchar(50) after shipping_city;
-- Modify the sale_order table in MySQL
alter table
sale_order
add
order_quantity int after order_mount;
- Modify the Hive tables
- The Hive tables are stored as ORC files. Columns could be added with ALTER TABLE, but on older Hive versions this is very error-prone, while Hive 2.x handles it better. The strategy used here is therefore to rename the existing table, create a new table, and then migrate the data.
- Note that the sales order fact table could also be stored as Parquet or TextFile. None of its fields are ever updated and rows are only appended, so ORC is not strictly necessary.
- Four Hive tables must be modified: ods.ods_consumer, source.source_consumer_dim, ods.ods_sale_order and dw.sale_order_fact. This is because data is extracted from MySQL into the Hive ODS layer and then loaded into the source layer.
- The Hive DDL and data-migration statements are as follows:
-- ****************************************************
-- @Author: LiYahui
-- @Date: Created in 2019/04/13 10:32
-- @Description: TODO Modify the Hive tables: add the new columns and migrate the data
-- @Version: V1.0
-- ****************************************************
-- Hive: add columns to ods.ods_consumer (named customer_* like the existing ODS columns)
alter table
ods.ods_consumer
add columns
(customer_shipping_address varchar(50) comment 'customer shipping address',
customer_shipping_zip_code int comment 'shipping_zip_code',
customer_shipping_city varchar(30) comment 'shipping_city',
customer_shipping_province varchar(50) comment 'shipping_province'
);
-- Hive: add a column to ods.ods_sale_order
alter table
ods.ods_sale_order
add columns (order_quantity int comment 'order_quantity');
-- Rename the existing Hive table
alter table source.source_consumer_dim rename to source.source_consumer_dim_old;
-- Create the new table
-- Create the customer dimension table
create table if not exists source.source_consumer_dim(
consumer_key int comment "surrogate key",
consumer_number varchar(50) comment "customer number",
consumer_name varchar(50) comment "customer name",
consumer_street_address varchar(50) comment "customer street address",
consumer_zip_code varchar(50) comment "zip code",
consumer_city varchar(50) comment "city",
consumer_province varchar(50) comment "province",
consumer_shipping_address varchar(50) comment 'customer shipping address',
consumer_shipping_zip_code int comment 'shipping_zip_code',
consumer_shipping_city varchar(30) comment 'shipping_city',
consumer_shipping_province varchar(50) comment 'shipping_province',
consumer_valid_from date comment "effective start date",
consumer_valid_to date comment "effective end date",
consumer_indicator varchar(50) comment "status indicator",
consumer_version int comment "customer version number"
)comment "customer dimension table"
clustered by (consumer_key) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- Copy the old table's data into the new table; the four new shipping columns start as NULL
insert into
source.source_consumer_dim
select
consumer_key,
consumer_number,
consumer_name,
consumer_street_address,
consumer_zip_code,
consumer_city,
consumer_province,
null,null,null,null,
consumer_valid_from,
consumer_valid_to,
consumer_indicator,
consumer_version
from
source.source_consumer_dim_old;
-- Drop the old table
drop table source.source_consumer_dim_old;
-- Modify the sales order fact table: rename the existing table first
alter table dw.sale_order_fact rename to dw.sale_order_fact_old;
-- Create the new table
-- Create the sales order fact table
create table if not exists dw.sale_order_fact(
order_sk int comment 'order surrogate key',
customer_sk int comment 'customer surrogate key',
product_sk int comment 'product surrogate key',
order_date_sk string comment 'date surrogate key',
order_amount decimal (10 , 2 ) comment'order amount',
order_quantity int comment 'order_quantity'
)comment "sales order fact table"
clustered by
(order_sk) into
8 buckets
stored as
orc tblproperties ('transactional'='true')
;
-- Copy the old table's rows into the new table; order_quantity starts as NULL
insert into dw.sale_order_fact select *,null from dw.sale_order_fact_old;
-- Drop the old table
drop table dw.sale_order_fact_old;
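Modify the Sqoop job
- Because a column was added, the incremental extraction job for the sales order table must also pull the new sales quantity column, so the job has to be rebuilt.
- Modify the scheduled load script 1-sqoop_extract_mysql2hive_daily.sh.
- Two points need attention. First, read the last extracted id. Second, pull the newly added column along with the others.
- Note: the Sqoop metastore must be enabled in the configuration file beforehand so that incremental imports can resume from the saved position. The metastore data can also be kept in MySQL. Without the metastore, the saved job state cannot be read with the sqoop job command.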
#!/bin/bash
# @Author: LiYahui
# @Date: Created in 2019/04/12 11:20
# @Description: TODO Rebuild the Sqoop incremental extraction job
# @Version: V1.0
# Read the saved job state from the Sqoop metastore (the metastore must already be enabled in sqoop-site.xml)
last_value=`sqoop job --show myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop | grep incremental.last.value | awk '{print $3}'`
# Delete the sqoop job from the metastore
sqoop job --delete myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop
# Recreate the sqoop job
sqoop job --meta-connect jdbc:hsqldb:hsql://liyahui-02:16000/sqoop \
--create myjob_incremental_import \
-- import \
--connect "jdbc:mysql://liyahui-02:3306/dw_source_data?useSSL=false&user=root&password=liyahui" \
--table sale_order \
--target-dir /user/root/dw/ods/ods_sale_order \
-m 4 \
--fields-terminated-by "\t" \
--incremental append \
--check-column order_number \
--last-value ${last_value}
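The grep/awk pipeline in the script depends on the layout of the `sqoop job --show` output. The sketch below runs the same pipeline against a hypothetical, abbreviated sample of that output. The sample text and the `extract_last_value` function are illustrative assumptions, not part of Sqoop. The sketch also adds a guard so the job is not recreated with an empty `--last-value`. Finally, it shows that a hyphenated `${last-value}` is a bash default-value expansion of a variable named `last`, not a reference to `last_value`.

```shell
#!/bin/bash
# Hypothetical, abbreviated excerpt of `sqoop job --show` output;
# a real job prints many more properties.
SAMPLE_SHOW_OUTPUT='Job: myjob_incremental_import
Tool: import
Options:
----------------------------
incremental.last.value = 1024
incremental.col = order_number'

# Same pipeline as the script: field 1 is the property name,
# field 2 is "=", field 3 is the saved value.
extract_last_value() {
  grep 'incremental.last.value' | awk '{print $3}'
}

last_value=$(printf '%s\n' "$SAMPLE_SHOW_OUTPUT" | extract_last_value)

# Stop before deleting/recreating the job if nothing could be read:
# an empty --last-value would lose the incremental position.
if [ -z "$last_value" ]; then
  echo "incremental.last.value not found" >&2
  exit 1
fi
echo "last_value=${last_value}"

# Pitfall: ${last-value} means "value of $last, or the word 'value' if
# last is unset", so it never refers to $last_value.
unset last
echo "hyphen typo expands to: ${last-value}"
```

In the real script, the guard would sit between reading `last_value` and the `sqoop job --delete` call.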
Modify the scheduled-load ETL HiveQL
- After changing the database schema, the scheduled-load HiveQL scripts already in use must also be updated to handle the new columns. The scripts to change are "2-update_source_consumer_dim_scd2_scd1.hql" and "5-update_dw_sale_order_fact.hql".
part-01 Set SCD2 expiry dates in source_consumer_dim
-- **********************************************
-- Load the customer dimension: SCD2 handling of consumer_street_address / deleted records / consumer_shipping_address
-- 2-update_source_consumer_dim_scd2_scd1.hql, part-01
-- *********************************************
-- Set the SCD effective and expiry dates
-- Current date
set hivevar:cur_date = current_date();
-- Previous day: expiry date of expired versions and start date of new versions
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
-- Maximum expiry date
set hivevar:max_date = cast('9999-12-31' as date);
set hivevar:expired_indicator=cast('Expired' as string);
-- For deleted records and rows whose consumer_street_address or consumer_shipping_address changed, set the SCD2 expiry date and set consumer_indicator to Expired
update
source.source_consumer_dim
set consumer_valid_to=${hivevar:pre_date} ,
consumer_indicator=${hivevar:expired_indicator}
where
source_consumer_dim.consumer_key
in
(
select a.consumer_key
from
-- Current rows (valid_to = '9999-12-31') form table a
(select source_consumer_dim.consumer_key,consumer_number,consumer_street_address,consumer_shipping_address
from
source.source_consumer_dim
where
consumer_valid_to=${hivevar:max_date}
) a
-- Left join with the ODS snapshot to find changed rows
left join ods.ods_consumer b
on
a.consumer_number=b.customer_number
-- Keep deleted rows and rows whose address changed
where
b.customer_number is null
or
-- <=> handles NULL values
(!(a.consumer_street_address <=> b.customer_street_address)
or
!(a.consumer_shipping_address<=>b.customer_shipping_address))
);
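To make the part-01 rule concrete, here is a minimal sketch that applies the same expiry logic to pipe-delimited text files with awk. The file layouts, the sample rows and the literal string NULL standing in for SQL NULL are all hypothetical; the real job is the HiveQL UPDATE above. Because NULL is encoded here as an ordinary string, plain string comparison happens to behave like Hive's `<=>`.

```shell
#!/bin/bash
pre_date=2019-04-12
max_date=9999-12-31
work=$(mktemp -d)
trap 'rm -rf "$work"' EXIT

# dim (hypothetical): key|number|street_address|shipping_address|valid_to|indicator
cat > "$work/dim" <<'EOF'
1|C001|Street 1|NULL|9999-12-31|Current
2|C002|Street 2|Dock 2|9999-12-31|Current
3|C003|Street 3|NULL|9999-12-31|Current
4|C004|Street 4|Dock 4|2019-01-01|Expired
EOF
# ODS snapshot (hypothetical): number|street_address|shipping_address; C003 was deleted
cat > "$work/ods" <<'EOF'
C001|Street 1|Dock 1
C002|Street 2|Dock 2
C004|Street 9|Dock 4
EOF

# Only current rows are candidates. A row is expired when its customer is
# missing from ODS (deleted) or either address differs; NULL vs a value
# counts as a change, NULL vs NULL does not.
awk -F'|' -v pre="$pre_date" -v max="$max_date" 'BEGIN {OFS = "|"}
  NR == FNR {street[$1] = $2; ship[$1] = $3; next}
  $5 == max && (!($2 in street) || street[$2] != $3 || ship[$2] != $4) {
    $5 = pre; $6 = "Expired"
  }
  {print}' "$work/ods" "$work/dim" > "$work/dim.new"
cat "$work/dim.new"
```

Row 4 is left alone even though its street differs, because it is not a current row.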
- Notes on the statement:
- Like the customer street address, the new shipping address column is versioned with SCD2. Compare this with the corresponding part of the scheduled-load script from the earlier post "DW--Hadoop data warehouse practice Case-04: scheduled data loading". This version uses a new relational operator, "<=>", because the original script missed one case. In the source database, both the customer address and the shipping address columns are nullable, a design chosen for flexibility and fault tolerance. The discussion below uses the shipping address as the example.
- Suppose the condition "t1.shipping_address <> t2.shipping_address" is used to decide whether the shipping address changed. Depending on which sides are NULL, three cases arise:
(1) Neither t1.shipping_address nor t2.shipping_address is NULL. If the two are equal, the expression returns false, meaning the address did not change. Otherwise it returns true, meaning the address changed. The logic is correct.
(2) Both t1.shipping_address and t2.shipping_address are NULL. The comparison becomes NULL <> NULL, which returns NULL according to Hive's definition of "<>". A query only returns rows whose condition is true, so no row is returned. This matches our logic: the address did not change.
(3) Exactly one of t1.shipping_address and t2.shipping_address is NULL, that is, the address changed from NULL to non-NULL or from non-NULL to NULL. This case clearly should produce a new version. However, by the definition of "<>", the result is NULL, so the query returns no row, which is not what we want.
- Now use "!(a.shipping_address <=> b.shipping_address)" as the condition instead. Hive defines the "<=>" operator as follows: "A <=> B: Returns same result with EQUAL(=) operator for non-null operands, but returns TRUE if both are NULL, FALSE if one of the them is NULL." So "<=>" returns TRUE when A and B are both NULL, FALSE when exactly one is NULL, and otherwise the same result as "=". Revisiting the three cases:
(1) Neither t1.shipping_address nor t2.shipping_address is NULL. If the two are equal, the expression returns !(true), i.e. false, meaning the address did not change. Otherwise it returns !(false), i.e. true, meaning the address changed. This matches our logic.
(2) Both t1.shipping_address and t2.shipping_address are NULL. The comparison becomes !(NULL <=> NULL), which returns !(true), i.e. false, by the definition of "<=>". Because a query only returns rows whose condition is true, no row is returned. This matches our logic: the address did not change.
(3) Exactly one of t1.shipping_address and t2.shipping_address is NULL. By the definition of "<=>", the expression returns !(false), i.e. true, so the query returns the row, as required.
- Logical tests on NULL have special rules. To avoid unnecessary trouble, make columns NOT NULL wherever possible when designing a database, and substitute default values for NULL where needed. Treat this as a basic design principle.
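The three cases can be checked mechanically. This sketch emulates Hive's `<>` and `<=>` in bash, using the literal string NULL as a stand-in for SQL NULL (an assumption of the sketch). For each case it reports whether a WHERE clause would keep the row, since WHERE keeps a row only when its condition is TRUE.

```shell
#!/bin/bash
# a <> b: NULL if either side is NULL, otherwise TRUE or FALSE.
neq() {
  if [ "$1" = NULL ] || [ "$2" = NULL ]; then echo NULL
  elif [ "$1" != "$2" ]; then echo TRUE
  else echo FALSE
  fi
}

# a <=> b: TRUE if both are NULL, FALSE if exactly one is NULL, else like =.
null_safe_eq() {
  if [ "$1" = NULL ] && [ "$2" = NULL ]; then echo TRUE
  elif [ "$1" = NULL ] || [ "$2" = NULL ]; then echo FALSE
  elif [ "$1" = "$2" ]; then echo TRUE
  else echo FALSE
  fi
}

# Three-valued NOT: NOT NULL is still NULL.
not3() {
  case "$1" in
    TRUE) echo FALSE ;;
    FALSE) echo TRUE ;;
    *) echo NULL ;;
  esac
}

# WHERE keeps a row only when the condition is TRUE.
row_kept() {
  if [ "$1" = TRUE ]; then echo kept; else echo dropped; fi
}

for pair in "addr1 addr2" "addr1 addr1" "NULL NULL" "NULL addr2" "addr1 NULL"; do
  set -- $pair
  printf '%-6s %-6s  <> : %-7s  !(<=>) : %s\n' "$1" "$2" \
    "$(row_kept "$(neq "$1" "$2")")" \
    "$(row_kept "$(not3 "$(null_safe_eq "$1" "$2")")")"
done
```

The two conditions differ only on the rows where exactly one side is NULL: `<>` drops them, while `!(<=>)` keeps them.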
part-02 Insert new SCD2 rows
- The script is as follows:
-- **********************************************
-- Load the customer dimension: insert new SCD2 rows for changes in consumer_street_address / consumer_shipping_address
-- 2-update_source_consumer_dim_scd2_scd1.hql, part-02
-- *********************************************
-- Set the SCD effective and expiry dates
-- Current date
set hivevar:cur_date = current_date();
-- Previous day: expiry date of expired versions and start date of new versions
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
-- Maximum expiry date
set hivevar:max_date = cast('9999-12-31' as date);
set hivevar:expired_indicator=cast('Expired' as string);
-- Insert the new SCD2 versions into source_consumer_dim
insert into
source.source_consumer_dim
select
-- Generate new surrogate keys
row_number() over(order by t1.consumer_number)+t2.sk_max,
t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
t1.consumer_city,t1.consumer_province,
t1.consumer_shipping_address,t1.consumer_shipping_zip_code,t1.consumer_shipping_city,
t1.consumer_shipping_province,
t1.consumer_valid_from,t1.consumer_valid_to,
"Current" as consumer_indicator,t1.consumer_version
from
(
select
b.customer_number as consumer_number,
b.customer_name as consumer_name,
b.customer_street_address as consumer_street_address,
b.customer_zip_code as consumer_zip_code,
b.customer_city as consumer_city,
b.customer_state as consumer_province,
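b.customer_shipping_address as consumer_shipping_address,
b.customer_shipping_zip_code as consumer_shipping_zip_code,
b.customer_shipping_city as consumer_shipping_city,
b.customer_shipping_province as consumer_shipping_province,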
a.consumer_version+1 as consumer_version,
${hivevar:pre_date} as consumer_valid_from,
${hivevar:max_date} as consumer_valid_to
from
source.source_consumer_dim a
inner join
ods.ods_consumer b
on
a.consumer_number=b.customer_number and a.consumer_valid_to=${hivevar:pre_date}
left join
source.source_consumer_dim c
on
a.consumer_number=c.consumer_number and c.consumer_valid_to=${hivevar:max_date}
where
(!(a.consumer_street_address <=> b.customer_street_address)
or
!(a.consumer_shipping_address<=>b.customer_shipping_address))
and
c.consumer_key is null
) t1
cross join
(select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2;
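The part-02 query creates a new version only for a customer whose row was expired on `pre_date` and who has no current row yet. The `left join ... c.consumer_key is null` guard is what stops a re-run from inserting the same version twice. The sketch below reproduces that selection, together with the `max(consumer_key) + row_number()` key assignment, using awk on a hypothetical file layout. For brevity it assumes every row expired on `pre_date` was expired because of an address change and still exists in the ODS snapshot. The address comparison and the inner join with ODS are therefore omitted.

```shell
#!/bin/bash
pre_date=2019-04-12
max_date=9999-12-31
work=$(mktemp -d)
trap 'rm -rf "$work"' EXIT

# dim (hypothetical): consumer_key|consumer_number|valid_to|version
printf '%s\n' \
  '1|C001|2019-04-12|1' \
  '2|C002|2019-04-12|1' \
  '6|C002|9999-12-31|2' \
  '3|C003|9999-12-31|1' > "$work/dim"

# Print new version rows as key|number|valid_from|valid_to|version.
new_versions() {
  local dim=$1 sk_max
  # coalesce(max(consumer_key), 0)
  sk_max=$(awk -F'|' 'BEGIN {m = 0} $1 + 0 > m {m = $1 + 0} END {print m}' "$dim")
  # Pass 1 collects customers that already have a current row; pass 2 keeps
  # rows expired on pre_date without one (the left join / is null guard).
  awk -F'|' -v pre="$pre_date" -v max="$max_date" '
    NR == FNR { if ($3 == max) has_current[$2]; next }
    $3 == pre && !($2 in has_current)' "$dim" "$dim" |
    sort -t'|' -k2,2 |
    awk -F'|' -v base="$sk_max" -v from="$pre_date" -v max="$max_date" \
      'BEGIN {OFS = "|"} {print NR + base, $2, from, max, $4 + 1}'
}

first=$(new_versions "$work/dim")
printf '%s\n' "$first"

# Simulate the INSERT, then run the selection again: nothing is selected twice.
printf '%s\n' "$first" | awk -F'|' 'BEGIN {OFS = "|"} {print $1, $2, $4, $5}' >> "$work/dim"
second=$(new_versions "$work/dim")
echo "second run selects: '${second}'"
```

C002 is skipped on the first run because it already has a current row (version 2).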
part-03 Handle SCD1
- The Hive script is as follows:
-- **********************************************
-- Load the customer dimension: SCD1 handling of consumer_name
-- 2-update_source_consumer_dim_scd2_scd1.hql, part-03
-- *********************************************
-- SCD1 on consumer_name: overwrite consumer_name in every version of the customer
drop table if exists source.source_consumer_tmp;
-- Create a temporary staging table
create table source.source_consumer_tmp
as
select
a.consumer_key,a.consumer_number,
b.customer_name,a.consumer_street_address,a.consumer_zip_code,
a.consumer_city,a.consumer_province,
a.consumer_shipping_address,
a.consumer_shipping_zip_code,
a.consumer_shipping_city,
a.consumer_shipping_province,
a.consumer_valid_from,
a.consumer_valid_to,
a.consumer_indicator,
a.consumer_version
from
source.source_consumer_dim a,ods.ods_consumer b
where
a.consumer_number=b.customer_number and !(a.consumer_name <=> b.customer_name);
-- Delete the rows whose consumer_name changed from source.source_consumer_dim
delete
from
source.source_consumer_dim
where
consumer_key
in (select consumer_key from source.source_consumer_tmp);
-- Insert the rows from source.source_consumer_tmp back into source.source_consumer_dim
insert into
source.source_consumer_dim
select * from source.source_consumer_tmp;
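- Notes on the statement: the SCD1 handling of consumer_name only adds the four shipping-address columns to the select list. It also uses the "<=>" operator, for the same reason as above.
part-04 Handle new rows in ods.ods_consumer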
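Here is a minimal sketch of the SCD1 overwrite, again using awk on hypothetical pipe-delimited files. Every version of a customer whose name changed in the ODS snapshot receives the new name, while keys, versions and addresses stay as they were. The sketch mirrors the net effect of the staging-table, delete and re-insert sequence above; it is not how Hive executes that sequence.

```shell
#!/bin/bash
work=$(mktemp -d)
trap 'rm -rf "$work"' EXIT

# dim (hypothetical): key|number|name|street_address|version
cat > "$work/dim" <<'EOF'
1|C001|Alice|Street 1|1
5|C001|Alice|Street 9|2
2|C002|Bob|Street 2|1
EOF
# ODS snapshot (hypothetical): number|name
cat > "$work/ods" <<'EOF'
C001|Alicia
C002|Bob
EOF

# Overwrite the name in all versions whose name differs from ODS;
# every other column, including the SCD2 street address, is kept.
awk -F'|' 'BEGIN {OFS = "|"}
  NR == FNR {name[$1] = $2; next}
  ($2 in name) && name[$2] != $3 {$3 = name[$2]}
  {print}' "$work/ods" "$work/dim" > "$work/dim.new"
cat "$work/dim.new"
```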
-- **********************************************
-- Load the customer dimension: insert customers that are new in ods.ods_consumer
-- 2-update_source_consumer_dim_scd2_scd1.hql, part-04
-- *********************************************
-- Set the SCD effective and expiry dates
-- Current date
set hivevar:cur_date = current_date();
-- Previous day: effective start date of the new rows
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
-- Maximum expiry date
set hivevar:max_date = cast('9999-12-31' as date);
set hivevar:expired_indicator=cast('Expired' as string);
-- Insert the new customers
insert into
source.source_consumer_dim
select
row_number() over(order by t1.consumer_number)+t2.sk_max,
t1.consumer_number,t1.consumer_name,t1.consumer_street_address,t1.consumer_zip_code,
t1.consumer_city,t1.consumer_province,
t1.consumer_shipping_address,t1.consumer_shipping_zip_code,
t1.consumer_shipping_city,t1.consumer_shipping_province,
${hivevar:pre_date},
${hivevar:max_date},
'Current',
1
from
(
select
a.customer_number as consumer_number,
a.customer_name as consumer_name,
a.customer_street_address as consumer_street_address,
a.customer_zip_code as consumer_zip_code,
a.customer_city as consumer_city,
a.customer_state as consumer_province,
a.customer_shipping_address as consumer_shipping_address,
a.customer_shipping_zip_code as consumer_shipping_zip_code,
a.customer_shipping_city as consumer_shipping_city,
a.customer_shipping_province as consumer_shipping_province
from
ods.ods_consumer a
left join
source.source_consumer_dim b
on
a.customer_number=b.consumer_number
where
b.consumer_key is null
) t1
cross join
(select coalesce(max(consumer_key),0) as sk_max from source.source_consumer_dim) t2
;
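Part-04 is an anti-join. ODS customers that have no row at all in the dimension become version 1, keyed from `max(consumer_key)` plus a row number ordered by `consumer_number`. The sketch below expresses the same idea with awk; the file layouts, sample rows and `pre_date` value are hypothetical.

```shell
#!/bin/bash
pre_date=2019-04-12
work=$(mktemp -d)
trap 'rm -rf "$work"' EXIT

# dim (hypothetical): key|number ; ODS (hypothetical): number|name
printf '%s\n' '1|C001' '2|C002' > "$work/dim"
printf '%s\n' 'C002|Bob' 'C010|Zoe' 'C005|Eve' > "$work/ods"

# coalesce(max(consumer_key), 0)
sk_max=$(awk -F'|' 'BEGIN {m = 0} $1 + 0 > m {m = $1 + 0} END {print m}' "$work/dim")

# left join ... where b.consumer_key is null: keep ODS rows unknown to dim,
# order by consumer_number, then key = row number + sk_max, version = 1.
awk -F'|' 'NR == FNR {known[$2]; next} !($1 in known)' "$work/dim" "$work/ods" |
  sort -t'|' -k1,1 |
  awk -F'|' -v base="$sk_max" -v from="$pre_date" 'BEGIN {OFS = "|"}
    {print NR + base, $1, $2, from, "9999-12-31", "Current", 1}' > "$work/new_rows"
cat "$work/new_rows"
```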
Merge
- Concatenate part-01 through part-04 in order.
Modify dw.sale_order_fact
- Modify 5-update_dw_sale_order_fact.hql to add the order_quantity field. This change is simple.
- The modified script is as follows:
-- ****************************************************
-- @Author: LiYahui
-- @Date: Created in 2019/04/13 13:30
-- @Description: TODO Scheduled load of dw.sale_order_fact
-- @Version: V1.0
-- ****************************************************
-- Load the sales order fact table
insert into
dw.sale_order_fact
select
order_key,
consumer_key,
product_key,
day_key,
order_amount,
order_quantity
from
ods.ods_sale_order a,
source.source_order_dim b,
source.source_consumer_dim c,
source.source_product_dim d,
source.source_date_dim e,
ods.ods_cdc_time f
where
a.order_number=b.order_key
and
a.customer_number=c.consumer_number
and
a.order_date >=c.consumer_valid_from
and
a.order_date < c.consumer_valid_to
and
a.product_code=d.product_code
and
a.order_date >= d.product_valid_from
and
a.order_date < d.product_valid_to
and
to_date(a.order_date) = e.day_key
and
a.entry_date >= f.last_load
and
a.entry_date < f.current_load
;
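Each order should join the single customer version whose validity range contains the order date. Treating `[valid_from, valid_to)` as half-open means an order placed on the day a new version starts maps to exactly one row. The awk sketch below shows that lookup on hypothetical files; it relies on ISO-8601 dates comparing correctly as plain strings.

```shell
#!/bin/bash
work=$(mktemp -d)
trap 'rm -rf "$work"' EXIT

# dim (hypothetical): consumer_key|consumer_number|valid_from|valid_to
printf '%s\n' \
  '1|C001|2019-01-01|2019-04-12' \
  '7|C001|2019-04-12|9999-12-31' > "$work/dim"
# orders (hypothetical): order_number|customer_number|order_date|amount|quantity
printf '%s\n' \
  '100|C001|2019-04-11|50.00|2' \
  '101|C001|2019-04-12|30.00|1' > "$work/orders"

# For each order, emit order_number|consumer_key|amount|quantity for the
# version with valid_from <= order_date < valid_to.
awk -F'|' 'BEGIN {OFS = "|"}
  NR == FNR {n++; key[n] = $1; num[n] = $2; from[n] = $3; to[n] = $4; next}
  {
    for (i = 1; i <= n; i++)
      if (num[i] == $2 && $3 >= from[i] && $3 < to[i])
        print $1, key[i], $4, $5
  }' "$work/dim" "$work/orders" > "$work/fact"
cat "$work/fact"
```

Order 101 falls on the boundary date 2019-04-12 and matches only the new version (key 7).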
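Summary
- The Sqoop metastore must be configured in advance. Only then can the maximum id of the incremental import be queried through the sqoop job command.
- Finally, integrate the modified scripts back into the scheduled load workflow.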