創(chuàng)建Spring項目使用DataX進行定時數(shù)據(jù)庫備份

? DataX 是一個異構(gòu)數(shù)據(jù)源離線同步工具疾嗅,致力于實現(xiàn)包括關(guān)系型數(shù)據(jù)庫(MySQL豪嗽、Oracle等)谴蔑、HDFS、Hive龟梦、ODPS隐锭、HBase、FTP等各種異構(gòu)數(shù)據(jù)源之間穩(wěn)定高效的數(shù)據(jù)同步功能计贰。

在DataX的官網(wǎng)介紹文檔中钦睡,其使用十分簡單。下載安裝包之后躁倒,使用python datax.py [demo.json]命令即可進行數(shù)據(jù)同步荞怒。雖然其啟動命令使用的是python腳本,但是看其安裝包之后發(fā)現(xiàn)秧秉,只有啟動的部分配置環(huán)境變量使用的是python褐桌,其余具體源碼都是使用的java。既然底層是用java寫的象迎,所以萌發(fā)了使用Spring來時備份數(shù)據(jù)的想法荧嵌。

一、DataX3.0基本結(jié)構(gòu)

在之前的一篇博文中指蚜,在Intell Idea中啟動了DataX鸽扁,證明了使用Java項目引用DataX是可行的苟呐。下面簡單分析一下DataX的源碼。

DataX本身作為離線數(shù)據(jù)同步框架赃春,采用Framework + plugin架構(gòu)構(gòu)建。將數(shù)據(jù)源讀取和寫入抽象成為Reader/Writer插件劫乱,納入到整個同步框架中聘鳞。

datax

總的來說DataX項目由FrameWork(core包薄辅、common包和transformer包)以及ReadPlugin和WritePlugin組成,對于DataX所支持的數(shù)據(jù)庫抠璃,都有一對XXXreader和XXXwriter包站楚。下面就是從DataX的github項目clone下來的源碼包的結(jié)構(gòu)目錄。

--DataX
----common
----core
----transformer
----XXXreader
----XXXwriter
----...
  • Reader:Reader為數(shù)據(jù)采集模塊搏嗡,負責(zé)采集數(shù)據(jù)源的數(shù)據(jù)窿春,將數(shù)據(jù)發(fā)送給Framework。
  • Writer: Writer為數(shù)據(jù)寫入模塊采盒,負責(zé)不斷向Framework取數(shù)據(jù)旧乞,并將數(shù)據(jù)寫入到目的端。
  • Framework:Framework(core磅氨、common和transfer模塊)用于連接reader和writer尺栖,作為兩者的數(shù)據(jù)傳輸通道,并處理緩沖烦租,流控延赌,并發(fā),數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問題叉橱。

二挫以、基本框架模塊代碼解析

入口方法是core包下面的 com.alibaba.datax.core.Engine.main(String[] args)方法,直接調(diào)用了Engine類的entry()方法

2.1 Engine啟動類代碼解析
  • entry()方法
public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");
        String jobPath = cl.getOptionValue("job");
        Configuration configuration = ConfigParser.parse(jobPath);
        ...
        configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
        ...
        ConfigurationValidate.doValidate(configuration);
        Engine engine = new Engine();
        engine.start(configuration);
}

entry()方法主要作用:
1窃祝、 獲取項目啟動參數(shù):job掐松、jobid和mode;
2粪小、 使用ConfigParser工具類從jobpath即傳輸任務(wù)的json文件獲取configuration大磺,并對jobId以及mode進行了驗證,隨后將configuration作為入?yún)⑻讲玻{(diào)用Engine類的start()方法量没。

  • start()方法
public void start(Configuration allConf) {
        ColumnCast.bind(allConf);
        LoadUtil.bind(allConf);

        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
         ...
        if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

        }
        ...
        container.start();
    }

start()方法主要作用:
1、初始化一些配置突想,檢查任務(wù)的model殴蹄;
2、根據(jù)Configuration的內(nèi)容創(chuàng)建JobContainer猾担,真正啟動任務(wù)是調(diào)用JobContainer的start()方法袭灯。

2.2 JobContainer類解析
  • start()方法
   public void start() {
            this.userConf = this.configuration.clone();
            this.preHandle();
            this.init();
            this.prepare();
            this.totalStage = this.split();
            this.schedule();
            this.post();
            this.postHandle();
            this.invokeHooks();
        }

start()方法可以看出datax進行數(shù)據(jù)備份的一系列流程,從預(yù)處理绑嘹,初始化稽荧,到實際調(diào)用對應(yīng)的reader和writer的插件,有興趣的讀者可以自行查看源代碼工腋。

三姨丈、打包DataX項目

解析了DataX的源代碼之后畅卓,我們已經(jīng)知道了從哪里可以調(diào)用DataX的備份功能。我們可以將從github上clone源碼到本地蟋恬,使用maven將項目打包放在本地倉庫翁潘,有條件的話可以上傳到私服。

在core包的pom.xml文件里面加上插件如下:

            <!-- 打包源碼 -->
            <plugin>
                <artifactId>maven-source-plugin</artifactId>
                <version>2.1</version>
                <configuration>
                    <attach>true</attach>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

使用mvn clean install命令將包安裝到本地倉庫歼争,同理可以打包自己需要的reader和writer包拜马,比如我想將Oracle的表導(dǎo)入到MySql數(shù)據(jù)庫,那么我就需要打包oraclereader和mysqlwriter這兩個包沐绒。

四俩莽、創(chuàng)建boot項目,使用maven包

依賴都已經(jīng)準(zhǔn)備好乔遮,下一步就可以來創(chuàng)建boot工程了扮超,除了Springboot項目所需要的相關(guān)依賴以外,pom文件里面還要加上DataX的相關(guān)依賴如下:

        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>datax-core</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>mysqlreader</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.datax</groupId>
            <artifactId>mysqlwriter</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

從第二節(jié)我們知道蹋肮,入口是Engine類的entry()方法調(diào)用自身的start()方法出刷,但是entry()方法的入?yún)⒈容^復(fù)雜,再加上在實際應(yīng)用中括尸,Datax的job.json任務(wù)文件里面配置的數(shù)據(jù)庫的密碼是加密的巷蚪,所以我自定義了一個DataxUtil類病毡,然后去調(diào)用Engine類的start()方法濒翻。

五、多線程時的錯誤

在使用定時任務(wù)的過程中啦膜,我發(fā)現(xiàn)當(dāng)兩個job在同一時刻開始有送,并且reader和writer不同的時候,會出現(xiàn)找不到對應(yīng)的reader或者writer的異常僧家。比如說 job1和job2都在凌晨01點整執(zhí)行雀摘,job1是從Oracle備份數(shù)據(jù)到MySQL,而job2是從MySQL備份數(shù)據(jù)到MySQL八拱,就會報找不到oraclereader plugin或者mysqlreader plugin的錯誤阵赠。如果兩個job的reader和writer分別相同,比如說都是從Oracle備份到MySQL肌稻,或者都是是從MySQL備份到Oracle則可以正常運行清蚀。
經(jīng)過一番查找,發(fā)現(xiàn)問題就出現(xiàn)在LoadUtil這個類爹谭。在2.2節(jié)介紹的JobContainer類的start()方法里面調(diào)用了preHandle()方法枷邪,preHandle()方法里面使用了LoadUtil來加載對應(yīng)的reader和writer,而LoadUtil的loadJobPlugin()方法線程不安全诺凡,從而導(dǎo)致了前一個job加載到一半的reader或者writer會被其他線程篡改东揣,導(dǎo)致前一個job的reader或者writer不可用践惑。

AbstractJobPlugin handler = LoadUtil.loadJobPlugin(handlerPluginType, handlerPluginName);

解決方法有兩個:
1、強行將定時任務(wù)改成串行的嘶卧,前一個job結(jié)束之后才調(diào)用下一個job尔觉;
2、修改LoadUtil類脸候,保證loadJobPlugin()方法的線程安全穷娱。

以下是LoadUtil類需要修改的部分:

  • 刪除以下兩行
private static Configuration pluginRegisterCenter;
private static Map<String, JarLoader> jarLoaderCenter = new HashMap<String, JarLoader>();

替換成:

/**
* 所有插件配置放置在pluginRegisterCenter中,為區(qū)別reader运沦、transformer和writer泵额,還能區(qū)別
* 具體pluginName,故使用pluginType.pluginName作為key放置在該map中
*/
private static ThreadLocal<Configuration> pluginRegisterCenter = new InheritableThreadLocal<Configuration>();

/** jarLoader的緩沖 */
private final static Map<String, JarLoader> jarLoaderCenter = new ConcurrentHashMap<String, JarLoader>();
  • 修改bind方法携添,改為:
    public static void bind(final Configuration pluginConfigs) {
        pluginRegisterCenter.set(pluginConfigs);
    }
  • 修改getPluginConf方法嫁盲,改為:
private static Configuration getPluginConf(PluginType pluginType,
                                               String pluginName) {
        Configuration pluginConf = pluginRegisterCenter.get()
                .getConfiguration(generatePluginKey(pluginType, pluginName));

        if (null == pluginConf) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.PLUGIN_INSTALL_ERROR,
                    String.format("DataX不能找到插件[%s]的配置.",
                            pluginName));
        }

        return pluginConf;
    }

至此就將LoadUtil改成了線程安全的類。

不過估計在DataX設(shè)計之初烈掠,就是面向運維人員羞秤,所以才會使用python命令以使其運行對應(yīng)job。故其可能在多線程支持方面可能會有欠缺左敌,所以在自行使用的時候瘾蛋,最好是不要使用多線程。一是由于源碼本身就不支持多線程矫限,即使對目前暴露出的問題修改了部分源碼哺哼,由于沒有深入閱讀源碼,可能會在將來遇到其他方面的問題叼风;二是復(fù)制效率問題取董,多個線程一起傳輸數(shù)據(jù)時,如果都是大量數(shù)據(jù)的傳輸无宿,可能會對內(nèi)存茵汰、IO、還有網(wǎng)絡(luò)造成爭用孽鸡,造成其他問題蹂午。

六、spring項目代碼

boot項目代碼待上傳彬碱。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末豆胸,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子堡妒,更是在濱河造成了極大的恐慌配乱,老刑警劉巖,帶你破解...
    沈念sama閱讀 206,968評論 6 482
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異搬泥,居然都是意外死亡桑寨,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,601評論 2 382
  • 文/潘曉璐 我一進店門忿檩,熙熙樓的掌柜王于貴愁眉苦臉地迎上來尉尾,“玉大人,你說我怎么就攤上這事燥透∩秤剑” “怎么了?”我有些...
    開封第一講書人閱讀 153,220評論 0 344
  • 文/不壞的土叔 我叫張陵班套,是天一觀的道長肢藐。 經(jīng)常有香客問我,道長吱韭,這世上最難降的妖魔是什么吆豹? 我笑而不...
    開封第一講書人閱讀 55,416評論 1 279
  • 正文 為了忘掉前任,我火速辦了婚禮理盆,結(jié)果婚禮上痘煤,老公的妹妹穿的比我還像新娘。我一直安慰自己猿规,他們只是感情好衷快,可當(dāng)我...
    茶點故事閱讀 64,425評論 5 374
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著姨俩,像睡著了一般蘸拔。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上哼勇,一...
    開封第一講書人閱讀 49,144評論 1 285
  • 那天都伪,我揣著相機與錄音呕乎,去河邊找鬼积担。 笑死,一個胖子當(dāng)著我的面吹牛猬仁,可吹牛的內(nèi)容都是我干的帝璧。 我是一名探鬼主播,決...
    沈念sama閱讀 38,432評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼湿刽,長吁一口氣:“原來是場噩夢啊……” “哼的烁!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起诈闺,我...
    開封第一講書人閱讀 37,088評論 0 261
  • 序言:老撾萬榮一對情侶失蹤渴庆,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體襟雷,經(jīng)...
    沈念sama閱讀 43,586評論 1 300
  • 正文 獨居荒郊野嶺守林人離奇死亡刃滓,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,028評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了耸弄。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片咧虎。...
    茶點故事閱讀 38,137評論 1 334
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖计呈,靈堂內(nèi)的尸體忽然破棺而出砰诵,到底是詐尸還是另有隱情,我是刑警寧澤捌显,帶...
    沈念sama閱讀 33,783評論 4 324
  • 正文 年R本政府宣布茁彭,位于F島的核電站,受9級特大地震影響扶歪,放射性物質(zhì)發(fā)生泄漏尉间。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,343評論 3 307
  • 文/蒙蒙 一击罪、第九天 我趴在偏房一處隱蔽的房頂上張望哲嘲。 院中可真熱鬧,春花似錦媳禁、人聲如沸眠副。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,333評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽囱怕。三九已至,卻和暖如春毫别,著一層夾襖步出監(jiān)牢的瞬間娃弓,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,559評論 1 262
  • 我被黑心中介騙來泰國打工岛宦, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留台丛,地道東北人。 一個月前我還...
    沈念sama閱讀 45,595評論 2 355
  • 正文 我出身青樓砾肺,卻偏偏與公主長得像挽霉,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子变汪,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 42,901評論 2 345

推薦閱讀更多精彩內(nèi)容