基于Apache doris怎么構(gòu)建數(shù)據(jù)中臺(tái)(四)-數(shù)據(jù)接入系統(tǒng)
上一次我們講解了數(shù)據(jù)資產(chǎn),元數(shù)據(jù)管理臊泌,血緣關(guān)系等妻怎,這次我們開始將數(shù)據(jù)接入箫锤,怎么實(shí)現(xiàn)快速的數(shù)據(jù)接入
在開發(fā)數(shù)據(jù)模型時(shí)逛钻,我們必須有一個(gè)統(tǒng)一的平臺(tái)僚焦,能夠像流水線一樣,把數(shù)據(jù)一步步加工成數(shù)據(jù)模型绣的。這其中涉及到數(shù)據(jù)萃取、數(shù)據(jù)聚合欲账、作業(yè)調(diào)度等屡江。
主要是為了實(shí)現(xiàn)業(yè)務(wù)數(shù)據(jù)的快速接入,零代碼實(shí)現(xiàn)赛不,數(shù)據(jù)分析人員只需要通過UI進(jìn)行簡單的配置惩嘉、提交任務(wù)即可完成數(shù)據(jù)的接入,并能實(shí)現(xiàn)對數(shù)據(jù)接入任務(wù)的管理及監(jiān)控踢故。
Mysql數(shù)據(jù)源數(shù)據(jù)接入
主要是為了完成針對Mysql數(shù)據(jù)的業(yè)務(wù)系統(tǒng)數(shù)據(jù)接入零代碼實(shí)現(xiàn)文黎,不需要開發(fā)人員接入,提供給數(shù)據(jù)分析人員使用殿较,目的是為了業(yè)務(wù)數(shù)據(jù)快速接入耸峭,無需編碼
數(shù)據(jù)接入系統(tǒng)我們通過自研的規(guī)則引擎,和接入任務(wù)整合淋纲,同時(shí)自動(dòng)化完成了數(shù)據(jù)接入的ETL工作劳闹,規(guī)則可以通過頁面進(jìn)行可視化配置,這塊我會(huì)在后面的質(zhì)量模塊介紹
- 通過UI界面添加數(shù)據(jù)接入任務(wù)的方式
- Mysql數(shù)據(jù)的采集是通過Canal 采集binlog完成
- 在界面上第一步是配置canal實(shí)例(canal實(shí)例的管理是通過cannal admin)洽瞬,除了kafka topic名稱需要手工輸入本涕,其他信息盡可能不要讓使用人員手工數(shù)據(jù)
- 第二步配置Flink Job任務(wù)信息,需要的kafka topic名稱來源于上一步伙窃,業(yè)務(wù)表和數(shù)倉表的對應(yīng)關(guān)系通過元數(shù)據(jù)選擇方式完成菩颖,避免手工輸入
- 第三步:提交任務(wù),這時(shí)候完成canal實(shí)例創(chuàng)建为障,運(yùn)行晦闰,F(xiàn)link job任務(wù)提交運(yùn)行
- 并在列表上監(jiān)控canal實(shí)例及Flink job運(yùn)行狀態(tài)的監(jiān)控
DataX 數(shù)據(jù)接入
要實(shí)現(xiàn)的內(nèi)容基本和Mysql binlog的同步一樣
只不過是Datax是為了實(shí)現(xiàn)非mysql數(shù)據(jù)的數(shù)據(jù)接入零代碼完成
數(shù)據(jù)API方式數(shù)據(jù)接入
傳統(tǒng)數(shù)據(jù)API方式的數(shù)據(jù)接入都需要進(jìn)行代碼開發(fā)對接才能完成,初步設(shè)想這塊通過通用的代碼生成器的方式實(shí)現(xiàn)針對常用API方式(WebService,RestFul API)零代碼接入
圖形化數(shù)據(jù)接入
我們目前支持Kafka鳍怨,Mysql鹅髓,datax數(shù)據(jù)零代碼接入
Mysql :通過Canal采集業(yè)務(wù)數(shù)據(jù)庫的binlog日志,將數(shù)據(jù)推送到指定的Kafka隊(duì)列
其他DB:通過datax(全量和增量)定時(shí)的將數(shù)據(jù)京景,推送到指定的Kafka隊(duì)列窿冯,這里我們對Datax做了改造,讓Datax的數(shù)據(jù)格式和Canal格式一致确徙,
數(shù)據(jù)接口:后端數(shù)據(jù)接收服務(wù)隊(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換(可配置)以后醒串,形成和Canal一致的數(shù)據(jù)格式执桌,推送到指定的Kafka隊(duì)列
- 后端針對業(yè)務(wù)db,我們會(huì)通過元數(shù)據(jù)采集系統(tǒng)采集業(yè)務(wù)系統(tǒng)庫表及字段的元數(shù)據(jù)信息芜赌,進(jìn)入到元數(shù)據(jù)管理系
- 針對沒有數(shù)據(jù)庫表的仰挣,通過接口直接推送到Kafka的數(shù)據(jù),我們在元數(shù)據(jù)管理里統(tǒng)一提供虛擬庫表缠沈,通過這個(gè)完成數(shù)據(jù)的統(tǒng)一接入
- 在數(shù)據(jù)接入的時(shí)候膘壶,我們整合了我們自研的規(guī)則引擎,可以實(shí)現(xiàn)數(shù)據(jù)接入和ETL規(guī)則自動(dòng)綁定洲愤,通過阿波羅配置系統(tǒng)進(jìn)行統(tǒng)一下推到Flink Job里執(zhí)行颓芭,
- 對于異常數(shù)據(jù)(不符合規(guī)則的),自動(dòng)推送到指定的Kafka隊(duì)列柬赐,后端有對應(yīng)的服務(wù)進(jìn)行處理亡问,我們這里是通過Flink實(shí)現(xiàn)了一個(gè)輕量級(jí)的ETL,及數(shù)據(jù)入Doris的自動(dòng)化工具
效果如下:
第一步選擇要接入的數(shù)據(jù)表
第二步選擇數(shù)據(jù)倉相關(guān)的信息肛宋,這一步會(huì)進(jìn)行表字段映射檢查及配置州藕,目前要首先在數(shù)倉中建立相應(yīng)的表,后續(xù)會(huì)自動(dòng)化建表
第三步就是輸入Flink Job名稱進(jìn)行提交了酝陈,整個(gè)就完成了
數(shù)據(jù)開發(fā)控制臺(tái)
- 提供一個(gè)數(shù)據(jù)類似于HUE的SQL數(shù)據(jù)開發(fā)控制臺(tái)床玻,數(shù)據(jù)開發(fā)人員可以通過這個(gè)控制臺(tái)進(jìn)行sql的開發(fā)調(diào)試
- 生產(chǎn)環(huán)境這里delete,drop等操作要進(jìn)行審批確認(rèn)沉帮,才能進(jìn)行笨枯,避免誤操作,刪除數(shù)據(jù)
- 可以將調(diào)試好的sql遇西,添加到定時(shí)任務(wù)調(diào)度系統(tǒng)中馅精,這里我們將海豚調(diào)度集成到我們數(shù)據(jù)中臺(tái)中
零代碼入倉的問題解答
很多朋友問到,我們這種方式會(huì)不會(huì)數(shù)據(jù)丟失粱檀,會(huì)不會(huì)數(shù)據(jù)重復(fù)洲敢,結(jié)合我們自己的場景,給我我們的解決方案
數(shù)據(jù)入到Doris數(shù)據(jù)倉庫對應(yīng)的表中茄蚯,這里我們采用的是Flink實(shí)時(shí)消費(fèi)KafKa的數(shù)據(jù)压彭,然后通過Doris的 Stream Load完成
Flink消費(fèi)Kafka數(shù)據(jù)我們支持兩種方式:
指定Kafka Topic的Offset進(jìn)行消費(fèi):kafka.offset
指定時(shí)間戳的方式:kafka.timestamp
數(shù)據(jù)丟失的問題
針對Flink Job失敗,重啟也是通過這兩個(gè)參數(shù)渗常,
- 如果你記錄了失敗的時(shí)間點(diǎn)的Kafka Offset壮不,可以通過配置文件配置這個(gè)參數(shù)來重啟Flink Job就
行。這樣不會(huì)造成數(shù)據(jù)丟失
- 如果沒有記錄這個(gè)offset皱碘,可通過指定consumer.setStartFromTimestamp(timestamp);這個(gè)時(shí)間就是在配置文件中配置的時(shí)間戳 询一,這樣無論是通過offset還是從指定的時(shí)間開始消費(fèi)Kafka數(shù)據(jù),都不會(huì)造成數(shù)據(jù)丟失
數(shù)據(jù)重復(fù)問題
因?yàn)槲覀冞@個(gè)是在數(shù)據(jù)接入層使用的,數(shù)據(jù)是進(jìn)入到數(shù)據(jù)倉ODS層健蕊,在這一層我們采用的是Doris Unique Key模型菱阵,就算數(shù)據(jù)重復(fù)入庫,會(huì)覆蓋原先的數(shù)據(jù)缩功,不會(huì)存在數(shù)據(jù)重復(fù)問
下一講開始講解數(shù)據(jù)質(zhì)量管理