1.前言
datax是阿里出品辱士,最初是為了解決淘寶數(shù)據(jù)交換的問題舒萎,據(jù)說淘寶有30%的數(shù)據(jù)交換是通過datax完成的致燥。
2.介紹
DataX 是一個(gè)開源異構(gòu)數(shù)據(jù)源離線同步工具,致力于實(shí)現(xiàn)包括關(guān)系型數(shù)據(jù)庫(MySQL赂韵、Oracle等)娱节、HDFS、Hive祭示、ODPS肄满、HBase、FTP等各種異構(gòu)數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能质涛。采用Framework + plugin架構(gòu)構(gòu)建稠歉。將數(shù)據(jù)源讀取和寫入抽象成為Reader/Writer插件,納入到整個(gè)同步框架中汇陆。
Data目前已經(jīng)支持常用的插件體系怒炸,主流的RDBMS,NOSQL毡代,大數(shù)據(jù)計(jì)算系統(tǒng)都已接入阅羹。
3.源碼解析
從github上clone源碼到本地,源碼地址:https://github.com/alibaba/DataX教寂。
DataX源碼由Framework(core包捏鱼,common包和transformer包)及?plugin(ReadPlugin和WritePlugin)組成。
Framework:Framework用于連接reader和writer酪耕,作為兩者的數(shù)據(jù)傳輸通道导梆,并處理緩沖,流控,并發(fā)看尼,數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問題递鹉。
Reader:Reader為數(shù)據(jù)采集模塊,負(fù)責(zé)采集數(shù)據(jù)源的數(shù)據(jù)藏斩,將數(shù)據(jù)發(fā)送給Framework梳虽。Writer: Writer為數(shù)據(jù)寫入模塊,負(fù)責(zé)不斷向Framework取數(shù)據(jù)灾茁,并將數(shù)據(jù)寫入到目的端窜觉。
3.1入口類Engine
? entry()方法:
? 主要用于獲取項(xiàng)目啟動(dòng)參數(shù):job,jobid北专,mode禀挫;
? 注意:mode分為單機(jī)模式和分布式模式,這里指定為standalone 單機(jī)模式拓颓。
? ? ? ? ? ? jobid默認(rèn)值為-1语婴,只有在standalone模式下使用,非 standalone 模式必須提供有效的jobid值驶睦。
public static void entry(String jobPath)throws Throwable {
? ? ? ? String jobIdString ="-1";
// 指定單機(jī)還是分布式模式運(yùn)行
? ? ? ? RUNTIME_MODE ="standalone";
Configuration configuration = ConfigParser.parse(jobPath);
......
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
......
ConfigurationValidate.doValidate(configuration);
Engine engine =new Engine();
engine.start(configuration);
}
start()方法:
主要用于初始化配置砰左,檢查job的model信息。
public void start(Configuration allConf) {
// 綁定column轉(zhuǎn)換信息
? ? ColumnCast.bind(allConf);
/**
* 初始化PluginLoader场航,可以獲取各種插件配置
*/
? ? LoadUtil.bind(allConf);
......
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
? ? PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
perfTrace.setJobInfo(jobInfoConfig, perfReportEnable, channelNumber);
container.start();
}
3.2 jobContainer容器
job實(shí)例運(yùn)行在jobContainer容器中缠导,它是所有任務(wù)的master,負(fù)責(zé)初始化溉痢、拆分僻造、調(diào)度、運(yùn)行孩饼、回收髓削、監(jiān)控和匯報(bào)。
start()方法:
jobContainer主要負(fù)責(zé)的工作全部在start()里面镀娶,包括init立膛、prepare、split梯码、scheduler宝泵、post以及destroy和statistics。
public void start() {
LOG.info("DataX jobContainer starts job.");
this.preHandle();
this.init();
this.prepare();
this.totalStage =this.split();
this.schedule();
this.post();
this.postHandle();
this.invokeHooks();
}
init()方法:reader和writer的初始化
private void init() {
......
JobPluginCollector jobPluginCollector =new DefaultJobPluginCollector(
this.getContainerCommunicator());
//必須先Reader 忍些,后Writer
this.jobReader =this.initJobReader(jobPluginCollector);
this.jobWriter =this.initJobWriter(jobPluginCollector);
}
schedule()方法:
任務(wù)調(diào)度器schedule首先完成的工作是把上一步reader和writer split的結(jié)果整合到具體taskGroupContainer中鲁猩。
private void schedule() {
/**
* 通過獲取配置信息得到每個(gè)taskGroup需要運(yùn)行哪些tasks任務(wù)
*/
? ? List taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
this.needChannelNumber, channelsPerTaskGroup);
......
AbstractScheduler scheduler;
scheduler = initStandaloneScheduler(this.configuration);
scheduler.schedule(taskGroupConfigs);
......
/** * 檢查任務(wù)執(zhí)行情況 */this.checkLimit();
this.checkLimit();
}
post()方法: 啟動(dòng)各類數(shù)據(jù)庫插件的讀寫任務(wù)坎怪。
private void post() {
this.postJobWriter();
this.postJobReader();
}
4. Spring Boot集成DataX
在springboot項(xiàng)目上罢坝,通過POM文件引入datax相關(guān)jar包
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
同時(shí)需要引入數(shù)據(jù)源讀取和寫入相關(guān)的Reader/Writer插件
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
<scope>system</scope>
<systemPath>${basedir}/src/main/lib/ojdbc6-11.2.0.3.jar</systemPath>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>sqljdbc4</artifactId>
<version>4.0</version>
<scope>system</scope>
<systemPath>${basedir}/src/main/lib/sqljdbc4-4.0.jar</systemPath>
</dependency>
這里引入mysql 及oracle數(shù)據(jù)源對(duì)應(yīng)的插件