? DataX 是一個異構(gòu)數(shù)據(jù)源離線同步工具疾嗅,致力于實現(xiàn)包括關(guān)系型數(shù)據(jù)庫(MySQL豪嗽、Oracle等)谴蔑、HDFS、Hive龟梦、ODPS隐锭、HBase、FTP等各種異構(gòu)數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能计贰。
在DataX的官網(wǎng)介紹文檔中钦睡,其使用十分簡單。下載安裝包之后躁倒,使用python datax.py [demo.json]
命令即可進行數(shù)據(jù)同步荞怒。雖然其啟動命令使用的是python腳本,但是看其安裝包之后發(fā)現(xiàn)秧秉,只有啟動的部分配置環(huán)境變量使用的是python褐桌,其余具體源碼都是使用的java。既然底層是用java寫的象迎,所以萌發(fā)了使用Spring來時備份數(shù)據(jù)的想法荧嵌。
一、DataX3.0基本結(jié)構(gòu)
在之前的一篇博文中指蚜,在Intell Idea中啟動了DataX鸽扁,證明了使用Java項目引用DataX是可行的苟呐。下面簡單分析一下DataX的源碼。
DataX本身作為離線數(shù)據(jù)同步框架赃春,采用Framework + plugin架構(gòu)構(gòu)建。將數(shù)據(jù)源讀取和寫入抽象成為Reader/Writer插件劫乱,納入到整個同步框架中聘鳞。
總的來說DataX項目由FrameWork(core包薄辅、common包和transformer包)以及ReadPlugin和WritePlugin組成,對于DataX所支持的數(shù)據(jù)庫抠璃,都有一對XXXreader和XXXwriter包站楚。下面就是從DataX的github項目clone下來的源碼包的結(jié)構(gòu)目錄。
--DataX
----common
----core
----transformer
----XXXreader
----XXXwriter
----...
- Reader:Reader為數(shù)據(jù)采集模塊搏嗡,負責(zé)采集數(shù)據(jù)源的數(shù)據(jù)窿春,將數(shù)據(jù)發(fā)送給Framework。
- Writer: Writer為數(shù)據(jù)寫入模塊采盒,負責(zé)不斷向Framework取數(shù)據(jù)旧乞,并將數(shù)據(jù)寫入到目的端。
- Framework:Framework(core磅氨、common和transfer模塊)用于連接reader和writer尺栖,作為兩者的數(shù)據(jù)傳輸通道,并處理緩沖烦租,流控延赌,并發(fā),數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問題叉橱。
二挫以、基本框架模塊代碼解析
入口方法是core包下面的 com.alibaba.datax.core.Engine.main(String[] args)
方法,直接調(diào)用了Engine類的entry()
方法
2.1 Engine
啟動類代碼解析
-
entry()
方法
public static void entry(final String[] args) throws Throwable {
Options options = new Options();
options.addOption("job", true, "Job config.");
options.addOption("jobid", true, "Job unique id.");
options.addOption("mode", true, "Job runtime mode.");
String jobPath = cl.getOptionValue("job");
Configuration configuration = ConfigParser.parse(jobPath);
...
configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
...
ConfigurationValidate.doValidate(configuration);
Engine engine = new Engine();
engine.start(configuration);
}
entry()
方法主要作用:
1窃祝、 獲取項目啟動參數(shù):job掐松、jobid和mode;
2粪小、 使用ConfigParser工具類從jobpath即傳輸任務(wù)的json文件獲取configuration大磺,并對jobId以及mode進行了驗證,隨后將configuration作為入?yún)⑻讲玻{(diào)用Engine類的start()
方法量没。
-
start()
方法
public void start(Configuration allConf) {
ColumnCast.bind(allConf);
LoadUtil.bind(allConf);
boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
.getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
...
if (isJob) {
allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
container = new JobContainer(allConf);
instanceId = allConf.getLong(
CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);
}
...
container.start();
}
start()
方法主要作用:
1、初始化一些配置突想,檢查任務(wù)的model殴蹄;
2、根據(jù)Configuration的內(nèi)容創(chuàng)建JobContainer猾担,真正啟動任務(wù)是調(diào)用JobContainer的start()
方法袭灯。
2.2 JobContainer
類解析
-
start()
方法
public void start() {
this.userConf = this.configuration.clone();
this.preHandle();
this.init();
this.prepare();
this.totalStage = this.split();
this.schedule();
this.post();
this.postHandle();
this.invokeHooks();
}
從start()
方法可以看出datax進行數(shù)據(jù)備份的一系列流程,從預(yù)處理绑嘹,初始化稽荧,到實際調(diào)用對應(yīng)的reader和writer的插件,有興趣的讀者可以自行查看源代碼工腋。
三姨丈、打包DataX項目
解析了DataX的源代碼之后畅卓,我們已經(jīng)知道了從哪里可以調(diào)用DataX的備份功能。我們可以將從github上clone源碼到本地蟋恬,使用maven將項目打包放在本地倉庫翁潘,有條件的話可以上傳到私服。
在core包的pom.xml文件里面加上插件如下:
<!-- 打包源碼 -->
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<configuration>
<attach>true</attach>
</configuration>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
使用mvn clean install
命令將包安裝到本地倉庫歼争,同理可以打包自己需要的reader和writer包拜马,比如我想將Oracle的表導(dǎo)入到MySql數(shù)據(jù)庫,那么我就需要打包oraclereader和mysqlwriter這兩個包沐绒。
四俩莽、創(chuàng)建boot項目,使用maven包
依賴都已經(jīng)準(zhǔn)備好乔遮,下一步就可以來創(chuàng)建boot工程了扮超,除了Springboot項目所需要的相關(guān)依賴以外,pom文件里面還要加上DataX的相關(guān)依賴如下:
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>mysqlwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
從第二節(jié)我們知道蹋肮,入口是Engine
類的entry()
方法調(diào)用自身的start()
方法出刷,但是entry()
方法的入?yún)⒈容^復(fù)雜,再加上在實際應(yīng)用中括尸,Datax的job.json任務(wù)文件里面配置的數(shù)據(jù)庫的密碼是加密的巷蚪,所以我自定義了一個DataxUtil類病毡,然后去調(diào)用Engine
類的start()
方法濒翻。
五、多線程時的錯誤
在使用定時任務(wù)的過程中啦膜,我發(fā)現(xiàn)當(dāng)兩個job在同一時刻開始有送,并且reader和writer不同的時候,會出現(xiàn)找不到對應(yīng)的reader或者writer的異常僧家。比如說 job1和job2都在凌晨01點整執(zhí)行雀摘,job1是從Oracle備份數(shù)據(jù)到MySQL,而job2是從MySQL備份數(shù)據(jù)到MySQL八拱,就會報找不到oraclereader plugin或者mysqlreader plugin的錯誤阵赠。如果兩個job的reader和writer分別相同,比如說都是從Oracle備份到MySQL肌稻,或者都是是從MySQL備份到Oracle則可以正常運行清蚀。
經(jīng)過一番查找,發(fā)現(xiàn)問題就出現(xiàn)在LoadUtil
這個類爹谭。在2.2節(jié)介紹的JobContainer
類的start()
方法里面調(diào)用了preHandle()
方法枷邪,preHandle()
方法里面使用了LoadUtil
來加載對應(yīng)的reader和writer,而LoadUtil的loadJobPlugin()
方法線程不安全诺凡,從而導(dǎo)致了前一個job加載到一半的reader或者writer會被其他線程篡改东揣,導(dǎo)致前一個job的reader或者writer不可用践惑。
AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);
解決方法有兩個:
1、強行將定時任務(wù)改成串行的嘶卧,前一個job結(jié)束之后才調(diào)用下一個job尔觉;
2、修改LoadUtil類脸候,保證loadJobPlugin()
方法的線程安全穷娱。
以下是LoadUtil類需要修改的部分:
- 刪除以下兩行
private static Configuration pluginRegisterCenter;
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();
替換成:
/**
* 所有插件配置放置在pluginRegisterCenter中,為區(qū)別reader运沦、transformer和writer泵额,還能區(qū)別
* 具體pluginName,故使用pluginType.pluginName作為key放置在該map中
*/
private static ThreadLocal<Configuration> pluginRegisterCenter = new InheritableThreadLocal<Configuration>();
/** jarLoader的緩沖 */
private final static Map<String, JarLoader> jarLoaderCenter = new ConcurrentHashMap<String, JarLoader>();
- 修改bind方法携添,改為:
public static void bind(final Configuration pluginConfigs) {
pluginRegisterCenter.set(pluginConfigs);
}
- 修改getPluginConf方法嫁盲,改為:
private static Configuration getPluginConf(PluginType pluginType,
String pluginName) {
Configuration pluginConf = pluginRegisterCenter.get()
.getConfiguration(generatePluginKey(pluginType, pluginName));
if (null == pluginConf) {
throw DataXException.asDataXException(
FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
String.format("DataX不能找到插件[%s]的配置.",
pluginName));
}
return pluginConf;
}
至此就將LoadUtil改成了線程安全的類。
不過估計在DataX設(shè)計之初烈掠,就是面向運維人員羞秤,所以才會使用python命令以使其運行對應(yīng)job。故其可能在多線程支持方面可能會有欠缺左敌,所以在自行使用的時候瘾蛋,最好是不要使用多線程。一是由于源碼本身就不支持多線程矫限,即使對目前暴露出的問題修改了部分源碼哺哼,由于沒有深入閱讀源碼,可能會在將來遇到其他方面的問題叼风;二是復(fù)制效率問題取董,多個線程一起傳輸數(shù)據(jù)時,如果都是大量數(shù)據(jù)的傳輸无宿,可能會對內(nèi)存茵汰、IO、還有網(wǎng)絡(luò)造成爭用孽鸡,造成其他問題蹂午。
六、spring項目代碼
boot項目代碼待上傳彬碱。