Spark菜鳥學(xué)習(xí)營Day2
分布式系統(tǒng)需求分析
本分析主要針對從原有代碼向Spark的遷移。要注意的是Spark和傳統(tǒng)開發(fā)有著截然不同的思考思路骗村,所以我們需要首先對原有代碼進(jìn)行需求分析无宿,形成改造思路后布讹,再著手開發(fā)。
對于輸入和輸出证舟,請注意颂暇,指的是以程序?yàn)檫吔绲妮斎牒洼敵銮闆r缺谴。
主要遷移點(diǎn):
A:批量數(shù)據(jù)清理
-
重點(diǎn):分析要清理的表在哪里
- A1.參數(shù)表:存放Oracle、Redis耳鸯。清理Oracle就可以湿蛔,Redis會(huì)同步清理
- 表一般是以par_開頭
- A2.輸入數(shù)據(jù)表(由數(shù)據(jù)接收或者其他渠道導(dǎo)入):存放Oracle、HBase县爬,兩邊都要清理阳啥。
- 表一般是以_temp結(jié)尾
- A3.中間數(shù)據(jù)表(僅拆分內(nèi)部使用):存放RDD,需要清理RDD财喳。
- 表僅被名稱中含有split字樣的程序調(diào)用
- A4.輸出數(shù)據(jù)表(非拆分模塊使用):存放Oracle察迟,可能存放HBase,兩邊都要清理耳高。
- 表被其他程序調(diào)用
- A1.參數(shù)表:存放Oracle、Redis耳鸯。清理Oracle就可以湿蛔,Redis會(huì)同步清理
-
實(shí)際情況下表的用途會(huì)有組合的情況
- A3+A4.中間數(shù)據(jù)表 and 輸出數(shù)據(jù)表
輸入:可能有扎瓶,需分析,比如刪除條件有表關(guān)聯(lián)情況
輸出:無
B:批量數(shù)據(jù)轉(zhuǎn)換
- 特征:insert ... select 語句
這是最接近標(biāo)準(zhǔn)化的分布式處理泌枪,可以使用Dataframe或RDD編程來開發(fā)概荷。 - 開發(fā)方法:
- B1.Dataframe
- 所有輸入表都是參數(shù)表、輸入數(shù)據(jù)表碌燕、中間表误证,并且SQL語句支持(不包含not exists等特殊語法)继薛,使用Dataframe編程
- B2. RDD
- 不滿足B1條件
- B1.Dataframe
- 輸入:肯定有
- 輸出:肯定有
C:單行循環(huán)轉(zhuǎn)換
- 特征:pl/sql的游標(biāo)操作,包括for語法的游標(biāo)操作雷厂。
C1. 游標(biāo)轉(zhuǎn)RDD
將游標(biāo)邏輯轉(zhuǎn)為RDD的操作。
- 輸入:肯定有
- 輸出:無
C2. 單行數(shù)據(jù)過濾
一般語句中有continue或者goto語句叠殷,指對單行數(shù)據(jù)進(jìn)行判斷改鲫,滿足條件就處理,否則不處理
這里可能會(huì)出現(xiàn)對Oracle數(shù)據(jù)的判斷林束,需提前把Oracle數(shù)據(jù)預(yù)先緩存出來像棘,邏輯中訪問緩存下來的數(shù)據(jù),避免對于數(shù)據(jù)庫的大量連接壶冒。
- 輸入:無
- 輸出:無
C3. 重復(fù)數(shù)據(jù)過濾
相比單行過濾更加復(fù)雜缕题,如果與已處理數(shù)據(jù)不重復(fù)才會(huì)處理
- 輸入:無
- 輸出:無
C4.單行數(shù)據(jù)清理
會(huì)根據(jù)單行數(shù)據(jù)的條件執(zhí)行數(shù)據(jù)清理操作
- 輸入:無
- 輸出:無
C5.單行數(shù)據(jù)輸出
一般使用map或mapPartitions算子。
這部分設(shè)計(jì)比較難胖腾,有幾個(gè)設(shè)計(jì)點(diǎn):
- 如果有多個(gè)輸出烟零,需要進(jìn)行多次對的map。
- 如果多次輸出有公共數(shù)據(jù)咸作,需要額外增加一次map來處理公共數(shù)據(jù)锨阿。
- 輸入:無
- 輸出:肯定有
D.優(yōu)化處理
不是直接從原有代碼轉(zhuǎn)化,主要從性能角度出發(fā)來添加记罚,包括:
- D1.緩存Oracle數(shù)據(jù)
- 輸入:肯定有
- 輸出:無
- D2.緩存Redis數(shù)據(jù)
- 輸入:肯定有
- 輸出:無
分析樣例1
- 步驟1:清理中間表+結(jié)果數(shù)據(jù)表(A3+A4)
- 輸入:無
- 輸出:無
DELETE out_trd_qtsl t WHERE t.rq = v_last_date;
- 步驟2:輸出數(shù)據(jù)表墅诡,清理Oracle(A4)
- 輸入:無
- 輸出:無
DELETE FROM out_trd_qtsl_sub t WHERE t.rq = p_i_date;
- 步驟3:輸出數(shù)據(jù)表,清理Oracle(A4)
- 輸入:無
- 輸出:無
DELETE FROM out_trd_qtsl_his t WHERE t.rq = p_i_date;
- 步驟4:使用Dataframe的select語句來進(jìn)行處理(B1)
- 輸入:qtsl_temp:Dataframe ; par_fund_partner:DataFrame
- 輸出:無
INSERT INTO out_trd_qtsl
(scdm,
hydm,
...
SELECT scdm,
hydm,
...
FROM qtsl_temp a
WHERE a.rq = v_last_date
AND a.zqzh IN (SELECT partner_code
FROM par_fund_partner
WHERE market_code = v_scdm -- 上海市場
AND sub_partner_code = '000000' --不含子股東代碼
AND v_last_date BETWEEN inure_begin_date AND
inure_end_date);
- 步驟5:游標(biāo)轉(zhuǎn)RDD(C1)
- 輸入:qtsl_temp:RDD ; par_sys_fill_partner:RDD
- 輸出:無
SELECT nvl(a.scdm, '') scdm, --市場代碼
nvl(a.hydm, '') hydm, --結(jié)算參與人的清算編號
nvl(a.sjlx, '') sjlx, --數(shù)據(jù)類型
...
FROM qtsl_temp a
WHERE a.rq = v_last_date
AND (a.zqzh IN (SELECT t.partner_code FROM par_sys_fill_partner t) OR
a.zqzh IS NULL OR a.zqzh = '0')
- 步驟6:緩存Oracle數(shù)據(jù)(D1)
- 輸入:out_trd_qtsl_his:Oracle
- 輸出:無
str := 'select count(1) from out_trd_qtsl_his t where ';
IF r_qtsl_sub.scdm IS NOT NULL THEN
str := str || 't.SCDM = ''' || r_qtsl_sub.scdm || ''' and '; --市場代碼
END IF;
...
EXECUTE IMMEDIATE str
INTO v_count;
- 步驟7:第一次map操作桐智,根據(jù)Oracle數(shù)據(jù)進(jìn)行過濾末早,并生成補(bǔ)錄編號(C2+C5)
- 輸入:無
- 輸出:無
OPEN c_qtsl_sub;
LOOP
<<error_row>>
FETCH c_qtsl_sub
INTO r_qtsl_sub;
EXIT WHEN c_qtsl_sub%NOTFOUND;
...
--進(jìn)行數(shù)據(jù)過濾
IF v_count = 0 THEN
--生成補(bǔ)錄編號
SELECT lpad(to_char(seq_filldata_no.NEXTVAL), 15, '0')
INTO v_seq
FROM dual;
...
END IF;
END LOOP;
CLOSE c_qtsl_sub;
- 步驟8:第二次map操作,輸出數(shù)據(jù)(C5)
- 輸入:無
- 輸出:out_trd_qtsl_his:RDD
INSERT INTO out_trd_qtsl_his
(scdm, --市場代碼
hydm, --結(jié)算參與人的清算編號
...
seq_no, --補(bǔ)錄編號
sub_no --內(nèi)部順序號
)
VALUES
(nvl(r_qtsl_sub.scdm, ''), --市場代碼
nvl(r_qtsl_sub.hydm, ''), --結(jié)算參與人的清算編號
...
v_seq,
'0');
- 步驟9:第三次map操作说庭,輸出數(shù)據(jù)(C5)
- 輸入:無
- 輸出:out_trd_qtsl_sub:RDD
INSERT INTO out_trd_qtsl_sub
(scdm, --市場代碼
hydm, --結(jié)算參與人的清算編號
...
seq_no, --補(bǔ)錄編號
sub_no, --內(nèi)部順序號
sub_no_pre --父序號
)
VALUES
(nvl(r_qtsl_sub.scdm, ''), --市場代碼
nvl(r_qtsl_sub.hydm, ''), --結(jié)算參與人的清算編號
...
v_seq,
'1',
'0');
分析樣例2
- 步驟1:清理中間+輸出表(A3+A4)
- 輸入:無
- 輸出:無
DELETE out_trd_bloomberg t0
WHERE t0.data_date BETWEEN v_last_date AND p_i_date;
- 步驟2: 數(shù)據(jù)轉(zhuǎn)換(B1)
- 輸入:bloomberg_temp:Dataframe
- 輸出:無
BEGIN
SELECT COUNT(1)
INTO v_count2
FROM bloomberg_temp t
WHERE t.data_date BETWEEN v_last_date AND p_i_date;
EXCEPTION
WHEN OTHERS THEN
v_count2 := 0;
END;
- 步驟3:游標(biāo)轉(zhuǎn)RDD(C1)
- 輸入:bloomberg_temp:RDD ; par_sys_stock_bmtx:RDD ; par_sys_coin:RDD ;par_exchange_coin_trans:RDD
- 輸出:無
SELECT t2.security_id security_id,
t1.price_date price_date,
...
decode(v1.to_coin, null, t1.coin, v1.to_coin) coin, --t1.coin,
round(decode(v1.rate, null, t1.zspj, t1.zspj * v1.rate), 6) zspj, --t1.zspj,
round(decode(v1.rate, null, t1.jkpj, t1.jkpj * v1.rate), 6) jkpj, --t1.jkpj,
...
FROM bloomberg_temp t1,
par_sys_stock_bmtx t2,
(select t4.coin_name from_coin,
t5.coin_name to_coin,
t3.rate,
t3.inure_begin_date,
t4.inure_end_date
from par_exchange_coin_trans t3,
par_sys_coin t4,
par_sys_coin t5
where t3.from_coin = t4.coin_code
and t3.to_coin = t5.coin_code
and p_i_date BETWEEN t3.inure_begin_date AND
t3.inure_end_date
and p_i_date BETWEEN t4.inure_begin_date AND
t4.inure_end_date
and p_i_date BETWEEN t5.inure_begin_date AND
t5.inure_end_date) v1
WHERE t1.data_date BETWEEN v_last_date AND p_i_date
AND t1.stock_code = t2.bm_code
AND t2.bm_type IN ('1','2','10','11') --ISIN code,RIC,CUSIP
AND p_i_date BETWEEN t2.inure_begin_date AND t2.inure_end_date
AND t1.coin = v1.from_coin(+)
AND substr(t2.security_id, 3, 3) <> '056'
- 步驟4:單行數(shù)據(jù)過濾(C2)
- 輸入:無
- 輸出:無
IF rec.stock_kind = '01' THEN
v_count := 0;
SELECT COUNT(1)
INTO v_count
FROM par_sys_stock t6,
par_sys_coin t7
WHERE rec.security_id = t6.security_id
AND p_i_date BETWEEN t6.inure_begin_date AND t6.inure_end_date
AND rec.coin = t7.coin_name
AND t7.coin_code = t6.coin_code;
IF v_count = 0 THEN
continue;
END IF;
...
ELSE
NULL;
END IF;
- 步驟5:重復(fù)數(shù)據(jù)過濾(C3)
- 輸入:無
- 輸出:無
v_count := 0;
SELECT COUNT(1)
INTO v_count
FROM out_trd_bloomberg t
WHERE t.coin = rec.coin
AND t.security_id = rec.security_id
AND t.price_date = rec.price_date
AND t.data_date = rec.data_date
AND t.country = rec.country
AND t.market = rec.market;
IF v_count > 0 THEN
...
continue;
END IF;
- 步驟6:單行數(shù)據(jù)輸出(C5)
- 輸入:無
- 輸出:out_trd_bloomberg:RDD
INSERT INTO out_trd_bloomberg
( SECURITY_ID,
PRICE_DATE,
COUNTRY,
...
) VALUES
(rec.security_id,
rec.price_date,
rec.country,
...
);