@TOC
DataX概述
DataX 是阿里巴巴集團(tuán)內(nèi)被廣泛使用的離線數(shù)據(jù)同步工具/平臺,實(shí)現(xiàn)包括 MySQL饰剥、Oracle、SqlServer畸悬、Postgre、HDFS珊佣、Hive蹋宦、ADS闺骚、HBase、TableStore(OTS)妆档、MaxCompute(ODPS)、DRDS 等各種異構(gòu)數(shù)據(jù)源之間高效的數(shù)據(jù)同步功能虫碉。
Github地址:https://github.com/alibaba/DataX
- Features
將不同數(shù)據(jù)源的同步抽象為從源頭數(shù)據(jù)源讀取數(shù)據(jù)的Reader插件贾惦,以及向目標(biāo)端寫入數(shù)據(jù)的Writer插件,理論上DataX框架可以支持任意數(shù)據(jù)源類型的數(shù)據(jù)同步工作敦捧。同時DataX插件體系作為一套生態(tài)系統(tǒng), 每接入一套新數(shù)據(jù)源該新加入的數(shù)據(jù)源即可實(shí)現(xiàn)和現(xiàn)有的數(shù)據(jù)源互通须板。
- 設(shè)計(jì)理念
DataX將復(fù)雜的網(wǎng)狀的同步鏈路變成了星型數(shù)據(jù)鏈路,DataX作為中間傳輸載體負(fù)責(zé)連接各種數(shù)據(jù)源兢卵。當(dāng)需要接入一個新的數(shù)據(jù)源的時候习瑰,只需要將此數(shù)據(jù)源對接到DataX,便能跟已有的數(shù)據(jù)源做到無縫數(shù)據(jù)同步秽荤。
DataX3.0設(shè)計(jì)框架
DataX本身作為離線數(shù)據(jù)同步框架甜奄,采用Framework + plugin架構(gòu)構(gòu)建。將數(shù)據(jù)源讀取和寫入抽象成為Reader/Writer插件窃款,納入到整個同步框架中课兄。
- Reader:Reader為數(shù)據(jù)采集模塊,負(fù)責(zé)采集數(shù)據(jù)源的數(shù)據(jù)晨继,將數(shù)據(jù)發(fā)送給Framework
- Writer: Writer為數(shù)據(jù)寫入模塊烟阐,負(fù)責(zé)不斷從Framework取數(shù)據(jù),并將數(shù)據(jù)寫入到目的端紊扬。
- Framework:Framework用于連接reader和writer蜒茄,作為兩者的數(shù)據(jù)傳輸通道,并處理緩沖餐屎,流控,并發(fā)啤挎,數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問題。
支持插件體系
經(jīng)過幾年積累庆聘,DataX目前已經(jīng)有了比較全面的插件體系,主流的RDBMS數(shù)據(jù)庫伙判、NOSQL象对、大數(shù)據(jù)計(jì)算系統(tǒng)都已經(jīng)接入。DataX目前支持?jǐn)?shù)據(jù)如下:
詳情參考:https://github.com/alibaba/DataX
DataX3.0核心架構(gòu)
DataX3.0開源版本支持單機(jī)多線程模式完成同步作業(yè)運(yùn)行勒魔,本小節(jié)按一個DataX作業(yè)聲明周期的時序圖甫煞,從整體架構(gòu)設(shè)計(jì)非常簡要說明DataX各個模塊相互關(guān)系冠绢。
- 核心模塊介紹:
- DataX完成單個數(shù)據(jù)同步的作業(yè)抚吠,我們稱之為Job,DataX接受到一個Job之后弟胀,將啟動一個進(jìn)程來完成整個作業(yè)同步過程楷力。DataX Job模塊是單個作業(yè)的中樞管理節(jié)點(diǎn)孵户,承擔(dān)了數(shù)據(jù)清理、子任務(wù)切分(將單一作業(yè)計(jì)算轉(zhuǎn)化為多個子Task)夏哭、TaskGroup管理等功能。
- DataXJob啟動后何址,會根據(jù)不同的源端切分策略械念,將Job切分成多個小的Task(子任務(wù))头朱,以便于并發(fā)執(zhí)行龄减。Task便是DataX作業(yè)的最小單元,每一個Task都會負(fù)責(zé)一部分?jǐn)?shù)據(jù)的同步工作希停。
- 切分多個Task之后,DataX Job會調(diào)用Scheduler模塊亚隙,根據(jù)配置的并發(fā)數(shù)據(jù)量,將拆分成的Task重新組合阿弃,組裝成TaskGroup(任務(wù)組)羞延。每一個TaskGroup負(fù)責(zé)以一定的并發(fā)運(yùn)行完畢分配好的所有Task渣淳,默認(rèn)單個任務(wù)組的并發(fā)數(shù)量為5伴箩。
- 每一個Task都由TaskGroup負(fù)責(zé)啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務(wù)同步工作怔蚌。
- DataX作業(yè)運(yùn)行起來之后旁赊,Job監(jiān)控并等待多個TaskGroup模塊任務(wù)完成,等待所有TaskGroup任務(wù)完成后Job成功退出终畅。否則,異常退出声离,進(jìn)程退出值非0
- DataX調(diào)度流程:
舉例來說瘫怜,用戶提交了一個DataX作業(yè),并且配置了20個并發(fā)鲸湃,目的是將一個100張分表的mysql數(shù)據(jù)同步到odps里面。 DataX的調(diào)度決策思路是:
- DataXJob根據(jù)分庫分表切分成了100個Task暗挑。
- 根據(jù)20個并發(fā),DataX計(jì)算共需要分配4個TaskGroup垃它。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負(fù)責(zé)以5個并發(fā)共計(jì)運(yùn)行25個Task国拇。
部署DataX
直接下載DataX工具包:DataX下載地址(http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz)
下載后解壓至本地某個目錄惯殊,進(jìn)入bin目錄酱吝,即可運(yùn)行同步作業(yè):
$ cd {YOUR_DATAX_HOME}/bin
$ python datax.py {YOUR_JOB.json}
在oushudb中準(zhǔn)備源表和目標(biāo)表
create table source_table(id int); //準(zhǔn)備源表
INSERT INTO source_table select generate_series(1,1000); 往源表中寫入測試數(shù)據(jù)
create table target_table(id int); //準(zhǔn)備目標(biāo)表
根據(jù)postgresql模板配置json
# 對應(yīng)參數(shù)說明文檔
https://github.com/alibaba/DataX/blob/master/postgresqlreader/doc/postgresqlreader.md
https://github.com/HashDataInc/DataX/blob/master/gpdbwriter/doc/gpdbwriter.md
python datax.py -r postgresqlreader -w gpdbwriter >oushudb.json.tamplate
編寫json:
{
"job": {
"content": [
{
"reader": {
"name": "postgresqlreader",
"parameter": {
"column": [ "id"],
"connection": [
{
"jdbcUrl": [ "jdbc:postgresql://127.0.0.1:5432/postgres" ],
"table": [ "public.source_table" ]
}
],
"password": "gpadmin",
"username": "gpadmin"
}
},
"writer": {
"name": "gpdbwriter",
"parameter": {
"column": [ "id" ],
"connection": [
{
"jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/postgres",
"table": [ "public.target_table" ]
}
],
"preSql": [ "select 'beginTime: '||now();truncate table target_table" ],
"postSql": [ "select 'endTime: '||now() " ],
"segment_reject_limit": 0,
"username": "gpadmin",
"password": "gpadmin"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
執(zhí)行job
python datax.py oushu.json