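Steps
The overall steps first:
- Download the source code and build it into your local Maven repository (optionally upload it to a private repository).
- In the pom file, depend on datax-core and on the readers and writers you need.
- Set datax.home in the environment (or via System#setProperty(String, String)), along with any variables that must be substituted in the job script: ${} placeholders in the script are replaced with system properties.
- Copy the conf, plugin, and other directories extracted from datax.tar.gz into the datax.home directory.
- Build the argument array: {"-job", "xxx.json", "-mode", "standalone", "-jobid", "-1"}
- Call Engine#main(String[]) or Engine#entry(String[]).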
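To make the ${} substitution in the third step concrete, here is a minimal sketch of the idea. It is not DataX's own code: the class name PlaceholderSketch and the choice to leave unknown placeholders untouched are my assumptions, for illustration only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of DataX: illustrates ${name} -> system property substitution.
final class PlaceholderSketch {
    // Matches ${name}; the name may contain anything except '}'.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${name} with System.getProperty(name); unknown names are left as-is (an assumption). */
    static String resolve(String text) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = System.getProperty(m.group(1));
            String replacement = (value != null) ? value : m.group(0);
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        System.setProperty("now", "12:00");
        System.out.println(resolve("DataX, it is now ${now}"));
    }
}
```

The practical point is that every variable referenced in the job file has to be set as a system property before the engine parses the job.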
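Introduction
The official user guide currently only shows how to run DataX jobs through Python. Most existing blog posts do the same thing from Java, calling the python command with Runtime.getRuntime().exec().
My guess is that DataX does not offer a Java integration path because it is positioned as a production tool driven by operations teams.
Our business scenario is different: after a DataX job finishes there is more business logic to run, so we want to call DataX from inside a Java application and continue with the follow-up logic once the job is done.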
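For comparison, the Python-based approach mentioned above looks roughly like this. This is a sketch under assumptions: I use ProcessBuilder rather than Runtime.getRuntime().exec(), the class name PythonLauncherSketch is made up, and the paths you pass in are placeholders for your own installation. It assumes the launcher script is bin/datax.py, as in the official distribution.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of DataX: launches the Python entry script as a child process.
final class PythonLauncherSketch {
    /** Builds the command line: python <dataxHome>/bin/datax.py <jobPath>. */
    static List<String> buildCommand(String dataxHome, String jobPath) {
        String script = new File(new File(dataxHome, "bin"), "datax.py").getPath();
        return Arrays.asList("python", script, jobPath);
    }

    /** Starts the command, forwards its output to this process, and returns the exit code. */
    static int run(List<String> command) throws Exception {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }
}
```

This works, but the Java side only gets an exit code and log output back, which is part of why calling the engine in-process is attractive for our scenario.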
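DataX analysis
I spent only one afternoon reading through DataX's logic, and purely from a user's point of view, so I certainly do not understand its whole execution flow.
This post only covers how to integrate DataX into Java code.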
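Integration preparation
DataX does not publish its artifacts to a public Maven server, so you have to pull the code, build it locally, and only then reference it from your pom. If you can, upload the build to your own private repository.
Code location
Code dependencies
Add datax-core to the pom file: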
<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>datax-core</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
If you also need specific readers and writers, add them to the pom file as well. For example, for streamreader and streamwriter:
<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>streamreader</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>com.alibaba.datax</groupId>
    <artifactId>streamwriter</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
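To depend on DataX, the artifacts must actually be resolvable: either you have the source and have built it into your local Maven repository, or the built version has been uploaded to your private repository. Otherwise the pom cannot find the dependency.
For integrated development you may want to pull in every reader and writer at once. As far as I know, they have to be listed one by one. If you know a better way, please tell me!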
Preparing the files
com.alibaba.datax.core.util.container.CoreConstant shows that datax.home matters a great deal: many files are read from paths under datax.home. These are the directories you see in an installed copy of DataX:
$ ll
total 4
drwxr-xr-x 2 mcbadm mcb 56 Sep 20 18:28 bin
drwxr-xr-x 2 mcbadm mcb 65 Sep 20 18:28 conf
drwxr-xr-x 2 mcbadm mcb 21 Sep 20 18:28 job
drwxr-xr-x 2 mcbadm mcb 4096 Sep 20 18:28 lib
drwxr-xr-x 4 mcbadm mcb 32 Sep 20 18:28 plugin
drwxr-xr-x 2 mcbadm mcb 22 Sep 20 18:28 script
drwxr-xr-x 2 mcbadm mcb 23 Sep 20 18:28 tmp
As far as I can tell, when Engine#entry parses the configuration it reads the files under the conf directory, plus the plugin.json file in each plugin/reader/xxxreader and plugin/writer/xxxwriter directory:
{
    "name": "streamreader",
    "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
    "description": {
        "useScene": "only for developer test.",
        "mechanism": "use datax framework to transport data from stream.",
        "warn": "Never use it in your real job."
    },
    "developer": "alibaba"
}
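Since a missing file under datax.home only shows up once the engine starts parsing, it can help to check the layout first. The sketch below is my own helper, not something DataX provides. The class name DataxHomeCheck is made up, and the conf/core.json location is an assumption based on the conf directory and the core.json file this post mentions.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of DataX: verifies the files an embedded run is expected to need.
final class DataxHomeCheck {
    /** Returns the expected files missing under dataxHome, as relative paths (empty list = all present). */
    static List<String> missingFiles(String dataxHome, String readerName, String writerName) {
        Path home = Paths.get(dataxHome);
        List<Path> expected = new ArrayList<>();
        // Assumed location of the core configuration.
        expected.add(home.resolve(Paths.get("conf", "core.json")));
        expected.add(home.resolve(Paths.get("plugin", "reader", readerName, "plugin.json")));
        expected.add(home.resolve(Paths.get("plugin", "writer", writerName, "plugin.json")));

        List<String> missing = new ArrayList<>();
        for (Path p : expected) {
            if (!Files.isRegularFile(p)) {
                // Normalise separators so the report reads the same on Windows and Linux.
                missing.add(home.relativize(p).toString().replace('\\', '/'));
            }
        }
        return missing;
    }
}
```

Calling missingFiles(System.getProperty("datax.home"), "streamreader", "streamwriter") before starting the engine gives a readable list of what still has to be copied over.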
Writing the code
First the job definition:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "sliceRecordCount": 1,
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "hello,你好,世界-DataX,現在是${now}"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}
Now a test class:
import java.time.LocalTime;

import com.alibaba.datax.core.Engine;

public class EngineTest {

    public static void main(String[] args) {
        System.setProperty("datax.home", getCurrentClasspath());
        // Replaces the ${now} placeholder in the job file
        System.setProperty("now", LocalTime.now().toString());
        String[] dataxArgs = {"-job", getCurrentClasspath() + "/job/stream2stream.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            Engine.entry(dataxArgs);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    public static String getCurrentClasspath() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        String currentClasspath = classLoader.getResource("").getPath();
        // Current operating system
        String osName = System.getProperty("os.name");
        if (osName.startsWith("Windows")) {
            // Strip the leading '/' from the path (for example /D:/... becomes D:/...)
            currentClasspath = currentClasspath.substring(1);
        }
        return currentClasspath;
    }
}
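Our scenario needs follow-up logic once the job is done, so here is one way the call could be wrapped. This is only a sketch: the JobThenFollowUp class and its Job interface are names I made up, and it assumes that a failed job surfaces as a Throwable from Engine.entry, which I have not verified for every failure mode.

```java
// Hypothetical wrapper, not part of DataX: runs a job, then the follow-up only on success.
final class JobThenFollowUp {
    /** A job that may throw, for example () -> Engine.entry(args). */
    interface Job {
        void run() throws Throwable;
    }

    /** Runs the job; runs followUp only if the job did not throw. Returns true on success. */
    static boolean runThen(Job job, Runnable followUp) {
        try {
            job.run();
        } catch (Throwable t) {
            System.err.println("Job failed, skipping follow-up: " + t);
            return false;
        }
        followUp.run();
        return true;
    }
}
```

In the test class this would read JobThenFollowUp.runThen(() -> Engine.entry(args), () -> doBusinessLogic()), where doBusinessLogic is a placeholder for whatever should happen after the transfer.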
After parsing the configuration, DataX merges core.json, job.json, and plugin.json into a single document:
{
    "common": {
        "column": {
            "dateFormat": "yyyy-MM-dd",
            "datetimeFormat": "yyyy-MM-dd HH:mm:ss",
            "encoding": "utf-8",
            "extraFormats": [
                "yyyyMMdd"
            ],
            "timeFormat": "HH:mm:ss",
            "timeZone": "GMT+8"
        }
    },
    "core": {
        "container": {
            "job": {
                "id": -1,
                "reportInterval": 10000
            },
            "taskGroup": {
                "channel": 5
            },
            "trace": {
                "enable": "false"
            }
        },
        "dataXServer": {
            "address": "http://localhost:7001/api",
            "reportDataxLog": false,
            "reportPerfLog": false,
            "timeout": 10000
        },
        "statistics": {
            "collector": {
                "plugin": {
                    "maxDirtyNumber": 10,
                    "taskClass": "com.alibaba.datax.core.statistics.plugin.task.StdoutPluginCollector"
                }
            }
        },
        "transport": {
            "channel": {
                "byteCapacity": 67108864,
                "capacity": 512,
                "class": "com.alibaba.datax.core.transport.channel.memory.MemoryChannel",
                "flowControlInterval": 20,
                "speed": {
                    "byte": -1,
                    "record": -1
                }
            },
            "exchanger": {
                "bufferSize": 32,
                "class": "com.alibaba.datax.core.plugin.BufferedRecordExchanger"
            }
        }
    },
    "entry": {
        "jvm": "-Xms1G -Xmx1G"
    },
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "hello,你好,世界-DataX"
                            }
                        ],
                        "sliceRecordCount": 1
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    },
    "plugin": {
        "reader": {
            "streamreader": {
                "class": "com.alibaba.datax.plugin.reader.streamreader.StreamReader",
                "description": {
                    "mechanism": "use datax framework to transport data from stream.",
                    "useScene": "only for developer test.",
                    "warn": "Never use it in your real job."
                },
                "developer": "alibaba",
                "name": "streamreader",
                "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\reader\\streamreader"
            }
        },
        "writer": {
            "streamwriter": {
                "class": "com.alibaba.datax.plugin.writer.streamwriter.StreamWriter",
                "description": {
                    "mechanism": "use datax framework to transport data to stream.",
                    "useScene": "only for developer test.",
                    "warn": "Never use it in your real job."
                },
                "developer": "alibaba",
                "name": "streamwriter",
                "path": "D:/workspace/datax-test/target/test-classes/\\plugin\\writer\\streamwriter"
            }
        }
    }
}
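The merged document above is what you would expect from a recursive merge of nested maps. I have not traced how DataX does this internally, so the following is only an illustration of the general technique. ConfigMergeSketch is a made-up name, and the rule that the later document wins on a conflicting key is my assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration, not DataX's implementation: deep-merges two parsed JSON objects.
final class ConfigMergeSketch {
    /** Returns a new map: nested maps are merged recursively, for other values `extra` wins (an assumption). */
    @SuppressWarnings("unchecked")
    static Map<String, Object> merge(Map<String, Object> base, Map<String, Object> extra) {
        Map<String, Object> result = new LinkedHashMap<>(base);
        for (Map.Entry<String, Object> e : extra.entrySet()) {
            Object existing = result.get(e.getKey());
            if (existing instanceof Map && e.getValue() instanceof Map) {
                // Both sides hold an object under this key: merge them instead of replacing.
                result.put(e.getKey(),
                        merge((Map<String, Object>) existing, (Map<String, Object>) e.getValue()));
            } else {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Applied in sequence to the parsed core.json, job.json, and each plugin.json, a merge like this yields one tree with the top-level common, core, entry, job, and plugin keys side by side.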
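How plugins work
Every reader and writer has its own plugin.json file. The most important entry in it is class: the fully qualified class name tells the classloader which class to load, and the class is then instantiated through reflection. The loading code is in com.alibaba.datax.core.util.container.LoadUtil.
This is why, when integrating, the plugin directory does not need to contain any jars. The json files are enough, because the pom already depends on the corresponding reader and writer. Put simply, all that matters is that the reader and writer classes are on the classpath.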
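The load-by-name idea can be sketched with plain JDK reflection. This is not the code in LoadUtil, which I have only skimmed. PluginLoadSketch is a made-up name, and java.util.ArrayList stands in for the plugin class so the example runs without DataX on the classpath.

```java
// Hypothetical illustration, not DataX's LoadUtil: instantiates a class from its configured name.
final class PluginLoadSketch {
    /** Loads className through the given class loader and calls its no-argument constructor. */
    static Object instantiate(String className, ClassLoader loader) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className, true, loader);
        return clazz.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the "class" value read from plugin.json.
        String configuredClass = "java.util.ArrayList";
        Object plugin = instantiate(configuredClass, Thread.currentThread().getContextClassLoader());
        System.out.println(plugin.getClass().getName());
    }
}
```

Because the lookup goes through a class loader, it does not matter whether the class came from a jar in the plugin directory or from a Maven dependency, as long as that loader can see it.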
Closing notes
This post ran a bit long. It records what I found in one afternoon of digging, so there are surely gaps, and I would be glad to compare notes. If it helped, feel free to give it a like.