【DataX】Java中集成DataX開發(fā)

步驟

先說總體步驟:

  1. 下載源碼属提,并編譯到本地maven倉庫[上傳私服(可選)]权逗;
  2. pom文件依賴datax-core和需要的readerwriter
  3. 環(huán)境變量設(shè)置datax.home(或者利用System#setProperty(String))和一些需要替換腳本中的變量:腳本中${}占位符的變量將被系統(tǒng)變量替換美尸。
  4. 將datax.tar.gz中解壓出來的conf冤议、plugin等文件放到datax.home目錄中。
  5. 構(gòu)造參數(shù)數(shù)組:{"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
  6. 調(diào)用Engin#main(String[])或者Engine#entry(String[])

引言

目前官方的使用指南里都是利用python來調(diào)用dataX執(zhí)行任務(wù)师坎。而且現(xiàn)有的博客基本上也是利用java來調(diào)用python命令Runtime.getRuntime().exec()來執(zhí)行恕酸。
個(gè)人感覺,dataX未提供java集成開發(fā)的方法胯陋,應(yīng)該是定位生產(chǎn)系統(tǒng)蕊温,運(yùn)維需要吧袱箱?!
我們的業(yè)務(wù)場(chǎng)景:執(zhí)行完dataX的job之后义矛,還有一定的業(yè)務(wù)邏輯发笔,所以希望在java應(yīng)用里調(diào)用dataX執(zhí)行完job之后,再執(zhí)行后續(xù)邏輯凉翻。

DataX分析

筆者簡(jiǎn)單的看了一下午的DataX的邏輯了讨,完全以使用者的視角分析DataX,必然不能完全了解DataX的整個(gè)執(zhí)行過程制轰。
本文僅分析如果能夠在java代碼里集成DataX進(jìn)行開發(fā)前计。

集成準(zhǔn)備

DataX沒有將代碼上傳到maven服務(wù)器上,所以需要自己先pull代碼到本地垃杖,編譯男杈,才能在集成開發(fā)的使用通過pom引用。有條件的可以上傳到自己的私服上调俘。
代碼地址

代碼依賴

通過pom文件加入datax-core

<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>datax-core</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

如果需要對(duì)應(yīng)的readerwriter的話伶棒,加入到pom文件中,比如需要streamreader和streamwriter:

<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>streamreader</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>streamwriter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

要依賴datax一定要保證有對(duì)應(yīng)的源碼或者編譯到本機(jī)的maven repository或者在對(duì)應(yīng)的私服上有上傳相應(yīng)的編譯版本彩库,不然pom文件是找不到依賴的苞冯。

為了集成開發(fā),可能需要一口氣引用所有的reader和writer侧巨,目前所知舅锄,就得一個(gè)一個(gè)寫,如果大家有好辦法司忱,麻煩告知皇忿!

準(zhǔn)備相應(yīng)的文件

com.alibaba.datax.core.util.container.CoreConstant中可以看到,datax.home很重要坦仍,很多文件的讀取都是在datax.home里面獲取的鳍烁。就如我們?cè)诎惭b版的datax中可以看到里面一些目錄一樣

$ ll
total 4
drwxr-xr-x 2 mcbadm mcb   56 Sep 20 18:28 bin
drwxr-xr-x 2 mcbadm mcb   65 Sep 20 18:28 conf
drwxr-xr-x 2 mcbadm mcb   21 Sep 20 18:28 job
drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib
drwxr-xr-x 4 mcbadm mcb   32 Sep 20 18:28 plugin
drwxr-xr-x 2 mcbadm mcb   22 Sep 20 18:28 script
drwxr-xr-x 2 mcbadm mcb   23 Sep 20 18:28 tmp

目前所知的,Engine#entry在解析配置的時(shí)候會(huì)讀取conf目錄下的文件繁扎,還有對(duì)應(yīng)plugin/reader/xxxreader幔荒、plugin/writer/xxxwriter的plugin.json文件:

{
    "name": "streamreader",
    "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
    "description": {
        "useScene": "only for developer test.",
        "mechanism": "use datax framework to transport data from stream.",
        "warn": "Never use it in your real job."
    },
    "developer": "alibaba"
}

編寫代碼

編寫job代碼:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 1,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好梳玫,世界-DataX爹梁,現(xiàn)在是${now}"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
       }
    }
  }
}

寫個(gè)測(cè)試類吧:

import java.time.LocalTime;

import com.alibaba.datax.core.Engine;

public class EngineTest {
    
    public static void main(String[] args) {
        System.setProperty("datax.home", getCurrentClasspath());
        System.setProperty("now", LocalTime.now().toString());// 替換job中的占位符
        String[] datxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(datxArgs);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
    
    public static String getCurrentClasspath() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String currentClasspath = classLoader.getResource("").getPath();
        // 當(dāng)前操作系統(tǒng)
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Windows")) {
            // 刪除path中最前面的/
            currentClasspath = currentClasspath.substring(1);
        }
        return currentClasspath;
    }
}

datax在解析完配置后,會(huì)將core.json提澎,job.json姚垃,plugin.json合并在一起:

{
    "common": {
        "column": {
            "dateFormat": "yyyy-MM-dd", 
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss", 
            "encoding": "utf-8", 
            "extraFormats": [
                "yyyyMMdd"
            ], 
            "timeFormat": "HH:mm:ss", 
            "timeZone": "GMT+8"
        }
    }, 
    "core": {
        "container": {
            "job": {
                "id": -1, 
                "reportInterval": 10000
            }, 
            "taskGroup": {
                "channel": 5
            }, 
            "trace": {
                "enable": "false"
            }
        }, 
        "dataXServer": {
            "address": "http://localhost:7001/api", 
            "reportDataxLog": false, 
            "reportPerfLog": false, 
            "timeout": 10000
        }, 
        "statistics": {
            "collector": {
                "plugin": {
                    "maxDirtyNumber": 10, 
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
                }
            }
        }, 
        "transport": {
            "channel": {
                "byteCapacity": 67108864, 
                "capacity": 512, 
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel", 
                "flowControlInterval": 20, 
                "speed": {
                    "byte": -1, 
                    "record": -1
                }
            }, 
            "exchanger": {
                "bufferSize": 32, 
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
            }
        }
    }, 
    "entry": {
        "jvm": "-Xms1G -Xmx1G"
    }, 
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type": "long", 
                                "value": "10"
                            }, 
                            {
                                "type": "string", 
                                "value": "hello,你好盼忌,世界-DataX"
                            }
                        ], 
                        "sliceRecordCount": 1
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "UTF-8", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }, 
    "plugin": {
        "reader": {
            "streamreader": {
                "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader", 
                "description": {
                    "mechanism": "use datax framework to transport data from stream.", 
                    "useScene": "only for developer test.", 
                    "warn": "Never use it in your real job."
                }, 
                "developer": "alibaba", 
                "name": "streamreader", 
                "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\reader\\streamreader"
            }
        }, 
        "writer": {
            "streamwriter": {
                "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter", 
                "description": {
                    "mechanism": "use datax framework to transport data to stream.", 
                    "useScene": "only for developer test.", 
                    "warn": "Never use it in your real job."
                }, 
                "developer": "alibaba", 
                "name": "streamwriter", 
                "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\writer\\streamwriter"
            }
        }
    }
}

說說插件原理

每個(gè)reader和writer都有自己的plugin.json文件积糯,里面最重要的就是class配置了掂墓,這個(gè)類的全路徑配置用于classloader將其加載進(jìn)來并通過反射將其實(shí)例化。加載代碼可看com.alibaba.datax.core.util.container.LoadUtil
所以我們?cè)诩傻臅r(shí)候看成,plugin目錄下面不需要有jar包了君编,只需要放json文件就行,因?yàn)槲覀兺ㄟ^pom文件依賴了對(duì)應(yīng)的reader和writer川慌,說白了啦粹,就是classpath下面有對(duì)應(yīng)的reader和writer即可。

結(jié)束語

文章有點(diǎn)長(zhǎng)窘游,記錄了一個(gè)下午的研究結(jié)果唠椭,應(yīng)該有很多不完善的地方,希望可以和大家多交流忍饰。如果覺得有幫助贪嫂,可以點(diǎn)個(gè)贊。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末艾蓝,一起剝皮案震驚了整個(gè)濱河市力崇,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌赢织,老刑警劉巖亮靴,帶你破解...
    沈念sama閱讀 216,470評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異于置,居然都是意外死亡茧吊,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,393評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門八毯,熙熙樓的掌柜王于貴愁眉苦臉地迎上來搓侄,“玉大人,你說我怎么就攤上這事话速⊙茸伲” “怎么了?”我有些...
    開封第一講書人閱讀 162,577評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵泊交,是天一觀的道長(zhǎng)乳讥。 經(jīng)常有香客問我,道長(zhǎng)廓俭,這世上最難降的妖魔是什么云石? 我笑而不...
    開封第一講書人閱讀 58,176評(píng)論 1 292
  • 正文 為了忘掉前任,我火速辦了婚禮白指,結(jié)果婚禮上留晚,老公的妹妹穿的比我還像新娘。我一直安慰自己告嘲,他們只是感情好错维,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,189評(píng)論 6 388
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著橄唬,像睡著了一般赋焕。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上仰楚,一...
    開封第一講書人閱讀 51,155評(píng)論 1 299
  • 那天隆判,我揣著相機(jī)與錄音,去河邊找鬼僧界。 笑死侨嘀,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的捂襟。 我是一名探鬼主播咬腕,決...
    沈念sama閱讀 40,041評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼葬荷!你這毒婦竟也來了涨共?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 38,903評(píng)論 0 274
  • 序言:老撾萬榮一對(duì)情侶失蹤宠漩,失蹤者是張志新(化名)和其女友劉穎举反,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體扒吁,經(jīng)...
    沈念sama閱讀 45,319評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡火鼻,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,539評(píng)論 2 332
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了雕崩。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片凝危。...
    茶點(diǎn)故事閱讀 39,703評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖晨逝,靈堂內(nèi)的尸體忽然破棺而出蛾默,到底是詐尸還是另有隱情,我是刑警寧澤捉貌,帶...
    沈念sama閱讀 35,417評(píng)論 5 343
  • 正文 年R本政府宣布支鸡,位于F島的核電站,受9級(jí)特大地震影響趁窃,放射性物質(zhì)發(fā)生泄漏牧挣。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,013評(píng)論 3 325
  • 文/蒙蒙 一醒陆、第九天 我趴在偏房一處隱蔽的房頂上張望瀑构。 院中可真熱鬧,春花似錦刨摩、人聲如沸寺晌。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,664評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽呻征。三九已至耘婚,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間陆赋,已是汗流浹背沐祷。 一陣腳步聲響...
    開封第一講書人閱讀 32,818評(píng)論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留攒岛,地道東北人赖临。 一個(gè)月前我還...
    沈念sama閱讀 47,711評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像灾锯,于是被迫代替她去往敵國和親兢榨。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,601評(píng)論 2 353

推薦閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理挠进,服務(wù)發(fā)現(xiàn)色乾,斷路器,智...
    卡卡羅2017閱讀 134,651評(píng)論 18 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,803評(píng)論 6 342
  • 1领突、通過CocoaPods安裝項(xiàng)目名稱項(xiàng)目信息 AFNetworking網(wǎng)絡(luò)請(qǐng)求組件 FMDB本地?cái)?shù)據(jù)庫組件 SD...
    陽明先生_X自主閱讀 15,979評(píng)論 3 119
  • 總結(jié):本文用比較淺顯的語言來解釋字符流暖璧、字節(jié)流、以及編碼和解碼君旦。 ??計(jì)算機(jī)中澎办,無論是文本、圖片還是音視頻金砍,所有的...
    secondtown閱讀 1,322評(píng)論 0 1
  • 洗臉怎么樣才算洗干凈?還只是用洗面奶隨便在臉上抹兩下鹅巍?那你就錯(cuò)了千扶!潔面的重要性是不可置疑的,下面一起來看看有哪些潔...
    2f80cfe3d9dc閱讀 215評(píng)論 0 0