實(shí)時(shí)數(shù)據(jù)倉庫相比較離線數(shù)倉褥伴,實(shí)時(shí)性更高谅将,這就要求數(shù)據(jù)流盡量短,層次相對(duì)簡化重慢,相比較離線饥臂,這里的ods和明細(xì)表就可以合并等于從業(yè)務(wù)庫實(shí)時(shí)同步數(shù)據(jù)寫入到kafka中的一個(gè)topic。關(guān)于數(shù)據(jù)采集這里不做討論似踱,部門是部署腳本監(jiān)控業(yè)務(wù)庫的DML語句隅熙,將create稽煤、update、delete的數(shù)據(jù)實(shí)時(shí)同步寫入到kafka囚戚。那么我們要實(shí)現(xiàn)多個(gè)數(shù)據(jù)流實(shí)時(shí)關(guān)聯(lián)合并組成寬記錄酵熙,
這里有幾點(diǎn)要求
實(shí)時(shí)性
延遲要低,分鐘級(jí)驰坊、秒級(jí)
一致性
保證數(shù)據(jù)的準(zhǔn)確一致性匾二,不能丟數(shù)據(jù),寧可多算不可漏掉數(shù)據(jù)
易用性
實(shí)時(shí)數(shù)據(jù)有三種狀態(tài)增拳芙、刪察藐、改,對(duì)于下游使用這個(gè)寬表要做到易用态鳖,能抽象通用的匯總模型
思路
1.如何解決誰先到誰后到誰遲遲不到的問題转培?storm的例子中有兩個(gè)數(shù)據(jù)流的join的案例,是使用內(nèi)存來存儲(chǔ)先到的數(shù)據(jù)浆竭,設(shè)置過期時(shí)間浸须,這樣就有兩個(gè)問題,過期就表明會(huì)丟數(shù)據(jù)邦泄,由于實(shí)時(shí)數(shù)據(jù)特點(diǎn)有增有改有刪删窒,即使關(guān)聯(lián)上已經(jīng)往下發(fā)送,但是考慮到update顺囊、delete肌索,那內(nèi)存中數(shù)據(jù)不能立即clear,內(nèi)存消耗也是個(gè)問題特碳,很容易吃滿內(nèi)存诚亚,溢出。如果放到內(nèi)存不合適午乓,那就借助存儲(chǔ)數(shù)據(jù)庫站宗,比如hbase、redis益愈,mysql等梢灭,這里第一版是使用的hbase,如果后期性能跟不上可升級(jí)為redis蒸其。2.如何解決更新敏释、刪除數(shù)據(jù)對(duì)之前合并結(jié)果的影響這里定義了一種操作叫做逆操作。比如金額最早為5元摸袁,后續(xù)變成3元钥顽,那這條記錄我會(huì)把前后都傳遞給下游,3|5,下游如果有匯總靠汁,那就 - 5 + 3耳鸯,如果只是合并不做計(jì)算湿蛔,那就只取 3就可以了膀曾。
整體的結(jié)構(gòu)圖
如圖上所示,除了消費(fèi)各自topic的數(shù)據(jù)寫法不一致外继薛,其他節(jié)點(diǎn)都是將左邊和右邊合并修壕,然后繼續(xù)往下發(fā)送,所以這些點(diǎn)都是可以抽象成一個(gè)通用的bolt來處理遏考。
GeneralMerge
Map<String, String> left = (Map<String, String>)tuple.getValueByField("left");Map<String, String> right = (Map<String, String>)tuple.getValueByField("right");if(left != null && right!=null) { left.putAll(right);}獲取左邊右邊慈鸠,然后merge起來 圖里邊最小的單元如下圖,都是2個(gè)流關(guān)聯(lián)組成寬記錄往下emit灌具,然后再跟右邊的結(jié)果合并青团。一直到最后的節(jié)點(diǎn)
仔細(xì)觀察,整個(gè)拓?fù)浣Y(jié)構(gòu)都是由一個(gè)個(gè)這樣的最小單元組成的咖楣,那么最小單元是否可以繼續(xù)抽象成通用的模型呢督笆。拆分下這個(gè)單元,就是由2個(gè)數(shù)據(jù)流截歉、關(guān)聯(lián)關(guān)系胖腾、流到哪、數(shù)據(jù)流數(shù)據(jù)要存儲(chǔ)到什么表中抽象出來就是左邊的表瘪松,右表的表咸作,表和表之間的關(guān)系【leftjoin、innerjoin】這些信息抽象成對(duì)象
Param leftParam = new Params(hb_table1,hb_table1_index,"L|R");Param rightParam = new Params(hb_table2,hb_table2_index,"L&R");
hb_table{1,2}代表數(shù)據(jù)存儲(chǔ)的表名hb_table{1,2}_index代表數(shù)據(jù)自己的主鍵和另一方關(guān)聯(lián)鍵的索引表宵睦,比如不都是主鍵之間進(jìn)行joinL|R代表 左關(guān)聯(lián)记罚,代表左邊的數(shù)據(jù)到了,會(huì)先去找右邊的數(shù)據(jù)壳嚎,即使右邊的數(shù)據(jù)還未到桐智,發(fā)現(xiàn)關(guān)聯(lián)關(guān)系為L|R,左關(guān)聯(lián)末早,左邊也往下發(fā)送。最小單元的處理過程就是 先定義數(shù)據(jù)流的信息说庭,讀取topic然磷,保存數(shù)據(jù)、索引表刊驴,找對(duì)方然后發(fā)送,如下代碼
//獲取數(shù)據(jù)流信息Map<String, String> leftMap = preParseProcess.process(tuple, leftParam); leftMap = save(tuple, leftMap); //保存emit(tuple,collector, leftMap); //找對(duì)方然后發(fā)送
數(shù)據(jù)傳遞
新增類型姿搜,封裝到map中往下傳遞
更新類型,在更新的字段上標(biāo)記新值捆憎、舊值舅柜,比如 new_value + split + old_value,下游使用時(shí)躲惰,已經(jīng)告訴了你更新后的值致份、更新后的值,自己根據(jù)業(yè)務(wù)處理即可
刪除類型础拨,在每個(gè)字段后標(biāo)記刪除標(biāo)簽氮块,如 old_value + split + DEL
優(yōu)缺點(diǎn)
中間結(jié)果多、表多太伊、操作hbase頻繁對(duì)hbase有一定的壓力雇锡、拓?fù)浣Y(jié)構(gòu)比較多,運(yùn)維比較麻煩
將多個(gè)數(shù)據(jù)流匯總成一個(gè)通用寬表僚焦,下游再有這主題的任何實(shí)時(shí)需求都可以很方便的使用和編寫锰提。提高工作效率,減少驗(yàn)證數(shù)據(jù)成本芳悲。
結(jié)論
方案可行
延遲在分鐘內(nèi)
除了運(yùn)單等變化的字段其他字段可以與離線數(shù)據(jù)吻合
提高實(shí)時(shí)開發(fā)的效率
待續(xù)....困的受不鳥了立肘。。名扛。谅年。