前言:緩慢變化知識(shí)點(diǎn)回顧
緩慢變化分三種類型:
type1:當(dāng)數(shù)據(jù)發(fā)生變化時(shí),直接覆蓋舊值
type2:當(dāng)數(shù)據(jù)發(fā)生變化時(shí)泣侮,新增一行
這里的DWID 也就是我們?nèi)粘m?xiàng)目使用的代理鍵CustomerKey
type3:當(dāng)數(shù)據(jù)發(fā)生變化時(shí),新增一列
傳統(tǒng)的拉鏈表分類(按照實(shí)現(xiàn)時(shí)的方式):
增量拉鏈表:
? ? ? ? 1.適用范圍:? ? ? ? ?1).有增量字段可以識(shí)別數(shù)據(jù)更新,且業(yè)務(wù)數(shù)據(jù)變動(dòng)時(shí),增量時(shí)間會(huì)更新窖认;2).數(shù)據(jù)不存在物理刪除
? ? ? ? 2.實(shí)現(xiàn)方式:? ? ? ? ?2).基于mysql表的增量數(shù)據(jù)實(shí)現(xiàn),此處我們假設(shè)增量數(shù)據(jù)存放于stg_a001_table1告希,全量表存放于ods_a001_table1扑浸,uuid為構(gòu)造字段,具體邏輯為燕偶,md5(concat(nvl(a),nvl(b)...))喝噪,也就是拉鏈比對字段空處理后拼接,再M(fèi)D5的值
則具體實(shí)現(xiàn)可體現(xiàn)為:
select
? ? oat.ods_start_dt,
? ? if(sat.id is not null and oat.uuid<>sat.uuid,'${ymd}',oat.ods_end_dt) as ods_end_dt -- 對更新數(shù)據(jù)進(jìn)行關(guān)鏈
? ? oat.uuid,
? ? oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好設(shè)置為分區(qū)表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
? ? '${ymd}' as ods_start_dt,
? ? '99991231' as ods_end_dt, -- 新增和更新數(shù)據(jù)進(jìn)行開鏈
? ? sat.uuid,
? ? sat.其它字段
from stg_a001_table1 sat
where sat.ymd='${ymd}'
全量拉鏈表:
?1.適用范圍:? ? ? ? ?1).無增量字段可以識(shí)別數(shù)據(jù)更新指么;2).數(shù)據(jù)存在物理刪除
?2.實(shí)現(xiàn)方式:? ? ? ? ?2).基于mysql表的增量數(shù)據(jù)實(shí)現(xiàn)酝惧,此處我們假設(shè)同步全量數(shù)據(jù)存放于stg_a001_table1,全量表存放于ods_a001_table1
select
? ? oat.ods_start_dt,
? ? case when sat.id is null or (sat.id is not null and oat.uuid<>sat.uuid),'${ymd}',oat.ods_end_dt) as ods_end_dt, -- 對更新和刪除進(jìn)行關(guān)鏈
? ? oat.uuid,
? ? oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好設(shè)置為分區(qū)表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
? ? '${ymd}' as oat.ods_start_dt,
? ? '99991231' as ods_end_dt, -- 對插入和更新的開鏈
? ? sat.uuid,
? ? sat.其它字段
from stg_a001_table1 sat
left join ods_a001_table1 oat on oat.id=sat.id and oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'-- stg_a001_table1最好設(shè)置為分區(qū)表
where sat.ymd='${ymd}'
and (oat.id is null or (oat.id is not null and oat.uuid<>sat.uuid))
基于binlog的實(shí)現(xiàn)
binlog的數(shù)據(jù)我們通過canel已經(jīng)寫入kafka,所以此處只需要通過flume去實(shí)時(shí)消費(fèi)對應(yīng)的topic并寫入hdfs即可伯诬。
1.對每天路徑下的數(shù)據(jù)進(jìn)行去重
1).創(chuàng)建外表tmp_stg_a001_table1映射到hdfs上的增量數(shù)據(jù)存儲(chǔ)路徑
2).通過row_number函數(shù)晚唇,按照主鍵,對數(shù)據(jù)進(jìn)行去重盗似,只保留最后一條數(shù)據(jù),將數(shù)據(jù)寫入表stg_a001_table1
2.基于去重?cái)?shù)據(jù)對數(shù)據(jù)進(jìn)行拉鏈處理( binlog_type為每條數(shù)據(jù)的操作類型 D標(biāo)識(shí)刪除 I為插入哩陕、U為更新)
select
? ? oat.ods_start_dt,
? ? if((sat.id is not null and oat.uuid<>sat.uuid) or sat.binlog_type='D','${ymd}',oat.ods_end_dt) as ods_end_dt -- 對更新和刪除數(shù)據(jù)進(jìn)行關(guān)鏈
? ? oat.uuid,
? ? oat.其它字段
from ods_a001_table1 oat
left join stg_a001_table1 sat on oat.id=sat.id and sat.ymd='${ymd}' -- stg_a001_table1最好設(shè)置為分區(qū)表
where oat.ods_start_dt<='${ymd}' and oat.ods_end_dt>'${ymd}'
union all
select
? ? '${ymd}' as ods_start_dt,
? ? '99991231' as ods_end_dt, -- 新增和更新數(shù)據(jù)進(jìn)行開鏈
? ? sat.uuid,
? ? sat.其它字段
from stg_a001_table1 sat
where sat.ymd='${ymd}' and sat.binlog_type<>'D' -- 當(dāng)日新增且當(dāng)日刪除的數(shù)據(jù)無需處理