故事背景
數(shù)據(jù)處理邏輯:
- 將一個json的數(shù)組從map結構里面扣出來
- 然后將json數(shù)組里面的每一個元素和map結構里面的其他元素重新組成一個新的map笆环,存入一個新表
實現(xiàn)方式:采用SparkSQL實現(xiàn)(Spark 3.1.2)
問題:數(shù)據(jù)少了很多
原始代碼邏輯
insert overwrite table iceberg_table
select
id,
map_concat(map_filter(config, (k, v) -> k != 'items'), elem)
from (
select
id,
config,
from_json(config['items'], 'array<map<string,string>>') list
from table
) as a
LATERAL view explode(list) as elem
怎么發(fā)現(xiàn)問題的
- 總體條數(shù)的減少
- 條數(shù)不少,但是
config
這個map
中kv
個數(shù)的減少,采用sum(size(config))
問題
explode的問題
explode
的參數(shù)如果是null
或者數(shù)組為空的時候喊括,整行記錄都會被清除掉戳杀,而不是用一個null
來補充數(shù)據(jù)久妆,而且如果你缺省設置為 array()
也是不行的,因為這個數(shù)組為空铸磅,這行數(shù)據(jù)還是會被清除掉的
map_concat的問題
map_concat
如果傳入的后續(xù)參數(shù)里面有null
的話,整個函數(shù)的返回值也是為null
了
總結
基于以上2點的問題杭朱,導致了數(shù)據(jù)的大量減少
調整之后的邏輯
初版
insert overwrite table iceberg_table
select
id,
map_concat(map_filter(config, (k, v) -> k != 'items'), COALESCE(elem, map()))
from (
select
id,
config,
from_json(config['items'], 'array<map<string,string>>') list
from table
) as a
LATERAL view
explode(COALESCE(if(size(list)=0, null, list), array(null))) as elem
這版可以看到阅仔,我們需要在2個地方進行修改
終版
切入點:將2處修改合并到一處
insert overwrite table iceberg_table
select
id,
map_concat(map_filter(config, (k, v) -> k != 'items'), elem)
from (
select
id,
config,
from_json(config['items'], 'array<map<string,string>>') list
from table
) as a
LATERAL view
explode(COALESCE(if(size(list)=0, null, list), array(map())) as elem