前言
工作當中我們往往會遇到這樣的需求場景:
某張表中不同的統(tǒng)計指標,需要從多個不同的 kafka 數據源當中統(tǒng)計扛稽。這種做法,我們很容易想到把多個 kafka source 統(tǒng)計到的指標 union all 后,再分別統(tǒng)計瘩将,但是這樣會存在一個問題怔接,如果指標非常多的情況下搪泳,會有大量的 0 值填充,不僅會重復寫更多的代碼扼脐,還會讓代碼的可讀性降低岸军。
煩人的 0 填充代碼
試想我們有兩個業(yè)務:寄件下單和快遞攬收,分別對應了兩個 kafka 數據源瓦侮,這時我們需要統(tǒng)計當天的下單量和快遞攬收量艰赞,并且入庫到同一張表當中,這個需求非常簡單肚吏,分別統(tǒng)計兩個數據源對應的指標方妖,然后 union all 起來再 sum,代碼如下:
SET 'parallelism.default' = '1';
CREATE TABLE order_datagen (
order_count INT,
proctime as proctime()
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.order_count.kind'='random',
'fields.order_count.min'='1',
'fields.order_count.max'='5'
);
CREATE TABLE collect_datagen (
collect_count INT,
proctime as proctime()
) WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.collect_count.kind'='random',
'fields.collect_count.min'='1',
'fields.collect_count.max'='5'
);
CREATE TABLE print (
sta_date STRING,
order_count INT,
collect_count INT
) WITH (
'connector' = 'print'
);
INSERT INTO print
SELECT
sta_date,
sum(order_count),
sum(collect_count)
FROM (
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
order_count,
0 as collect_count
FROM order_datagen
UNION ALL
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
0 as order_count,
collect_count
FROM collect_datagen
)
GROUP BY sta_date;
這段 sql 代碼很容易想到罚攀,沒什么難度党觅,運行也沒什么問題雌澄。但是它有個非常讓人討厭的“毛病”—— 0填充。也就是其它數據源不能統(tǒng)計的指標仔役,需要用0去填充它掷伙,否則 union all 的時候字段對應不上;這個在指標多的情況下又兵,非常讓人頭疼任柜。那有沒有什么辦法解決呢?答案當然有沛厨,我們接著往下看宙地。
使用 MAP 類型消除 0 填充
在前面的代碼中,我們統(tǒng)計 order_count
的時候逆皮,需要把 collect_count
默認設置成 0宅粥;現在我們只統(tǒng)計 order_count
,再把它放到一個 MAP 字段里电谣,同理秽梅,collect_count
也做相同的處理,union all 的時候剿牺,我們只對齊 map 字段就可以了企垦,優(yōu)化后代碼如下:
...
INSERT INTO print
SELECT
sta_date,
sum(init_map['order']),
sum(init_map['collect'])
FROM (
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
MAP['order', order_count] AS init_map
FROM order_datagen
UNION ALL
SELECT
FROM_UNIXTIME(CAST(proctime AS BIGINT), 'yyyyMMdd') as sta_date,
MAP['collect', collect_count] AS init_map
FROM collect_datagen
)
GROUP BY sta_date;
上面代碼是不是簡潔多了,在指標多的時候晒来,非常方便钞诡,代碼也容易閱讀。
今天的文章很短湃崩,但非常實用荧降,希望還在被 union all 折磨的小伙伴們,早日脫離苦海~~
PS: 首先想到這個方法也是一個偶然的機會攒读,是我的一個其它組的同事朵诫,看了我們的 flink sql 代碼后,問我為什么 union all 中有那么多無用的 0 填充薄扁,我就問他有沒有什么好的辦法拗窃,他說 flink sql 支持 map 的話,可以用 map 試試泌辫;果然試了 map 后,優(yōu)化效果非常明顯九默,感謝這位小伙伴震放。_
THE END.