datax是什么
- 阿里開源的ETL工具(github地址:https://github.com/alibaba/DataX
)瞻赶,ETL是描述從數(shù)據(jù)源讀取數(shù)據(jù)视乐,經(jīng)過轉換劫瞳,再加載到目的數(shù)據(jù)源的過程和媳,而datax是對這一過程的實現(xiàn)巴刻,采用framework+plugin框架模式。
image.png
比對 | ETL | datax | 功能 |
---|---|---|---|
數(shù)據(jù)抽取 | Extract | Reader-plugin | 從數(shù)據(jù)源讀取數(shù)據(jù)傍妒,傳輸?shù)絝ramework |
轉換 | transport | Framework | 對數(shù)據(jù)進行轉換幔摸、清洗、并發(fā)颤练、流量控制 |
數(shù)據(jù)寫入 | load | Writer-Plugin | 從framework讀取數(shù)據(jù)既忆,寫入目標數(shù)據(jù)源 |
為什么選擇datax
- 可靠穩(wěn)定,性能強,市場廣泛使用尿贫,經(jīng)得起時間和市場的考驗
- 活躍的社區(qū),完善的使用文檔
- 上手容易踏揣,配置簡單庆亡,學習成本低
- 插件支持的數(shù)據(jù)源覆蓋范圍廣
- 提供自定義插件擴展功能,可根據(jù)需求自主開發(fā)插件
- 完善的運行日志打印與監(jiān)控捞稿,能夠迅速通過日志分析又谋、定位問題,例如總體的運行情況日志如下
任務啟動時刻 : 2019-09-17 10:44:56
任務結束時刻 : 2019-09-17 10:45:18
任務總計耗時 : 22s
任務平均流量 : 492.72KB/s
記錄寫入速度 : 8594rec/s
讀出記錄總數(shù) : 171895
讀寫失敗總數(shù) : 0
datax的運行機制
image.png
- job :數(shù)據(jù)同步的作業(yè)娱局,是datax運行最小業(yè)務單元
- task:任務彰亥,job拆分出來的最小執(zhí)行單元
- taskGroup:任務組,管理一組task的集合
- jobContainer:任務容器衰齐,用于任務拆分任斋、調度,日志打印等工作
- taskGroupContainer:任務執(zhí)行的容器
如何使用datax
$ python datax.py {YOUR_JOB.json}
- datax.py是datax工具提供的python腳本耻涛,目錄{datax目錄/bin}
- {YOUR_JOB.json} 是datax作業(yè)(job)的配置文件废酷,示例如下
{
#全局配置
"core":{
"transport":{
"channel":{
"speed":{
"channel": 2, #job任務通道數(shù),控制并發(fā)的線程數(shù)
"record":-1, #限制數(shù)據(jù)傳輸?shù)挠涗洈?shù)
"byte":-1, #限制數(shù)據(jù)傳輸?shù)牧髁看笮? "batchSize":2048 #限制批量讀取的size
}
}
}
},
#任務配置
"job": {
"content": [
{
"reader": {
"name": "",#插件名稱
"parameter": {
"connection": [#連接信息
{
"jdbcUrl": [""],
"querySql": [
""
],
"table": [""]
}
],
"column": [],
"splitPk":"",#分片鍵,
"where":"",#查詢限制條件
"password": "",
"username": "",
}
},
"writer": {
"name": "",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": "",
"table": [""]
}
],
"password": "",
"username": ""
}
}
}
],
"setting": {
"speed": {
"channel":5,
"record":1000,
},
"errorLimit": {#臟數(shù)據(jù)閾值配置
"record":2,
"percentage": 0.02
}
}
}
}
datax的性能調優(yōu)
datax性能影響因素
服務器性能:內存抹缕、存儲澈蟆,IO
網(wǎng)絡環(huán)境:寬帶大小、網(wǎng)絡穩(wěn)定性
-
配置文件參數(shù)的優(yōu)化
- datax腳本運行時的內存大小配置
python datax.py --jvm '-Xms1G -Xmx1G' {YOUR_JOB.json}
- 調整job任務的限速卓研、限流及并發(fā)線程數(shù)
"speed":{ "channel": 2, #job任務通道數(shù)趴俘,控制并發(fā)的線程數(shù) "record":-1, #限制數(shù)據(jù)傳輸?shù)挠涗洈?shù) "byte":-1, #限制數(shù)據(jù)傳輸?shù)牧髁看笮? "batchSize":2048 #限制批量讀取的size } 注: channel:并發(fā)數(shù),默認為5奏赘,即5個并發(fā)寥闪,每次可執(zhí)行task數(shù)為5 例:channel配置為20個并發(fā),就需要4個taskGroup磨淌,如果作業(yè)有100個 task橙垢,那么每個group管理25個task。 byte:限流伦糯,在帶寬允許條件下合理配置柜某,-1為不限制,往往會出現(xiàn)帶寬占用 過高的問題敛纲。
- datax腳本運行時的內存大小配置
案例分析
問題:數(shù)據(jù)庫A的t_a表數(shù)據(jù)(275w數(shù)據(jù)量)同步到數(shù)據(jù)庫B的t_b表喂击,遷移邏輯:
image.png
表結構如下:
CREATE TABLE `t_a` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`phone` varchar(11) NOT NULL,
`nick_name` varchar(45) DEFAULT NULL,
`user_name` varchar(45) DEFAULT NULL,
`sex` tinyint(2) DEFAULT NULL,
`age` int(4) DEFAULT NULL,
`created_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`created_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`modified_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`modified_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `t_b` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`mobile` varchar(11) NOT NULL,
`nick_name` varchar(45) DEFAULT NULL,
`user_name` varchar(45) DEFAULT NULL,
`sex` tinyint(2) DEFAULT NULL,
`age` int(4) DEFAULT NULL,
`created_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`created_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`modified_user` varchar(45) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`modified_date` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
實現(xiàn)方案
- 基礎datax腳本:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["A"],
"querySql": [
"SELECT id,phone,nick_name,user_name,sex,age,created_user,created_date,modified_user,modified_date from t_a"
]
}
],
"password": "",
"username": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["id","mobile","nick_name","user_name","sex","age","created_user","created_date","modified_user","modified_date"],
"connection": [
{
"jdbcUrl": "B",
"table": ["t_b"]
}
],
"password": "",
"username": ""
}
}
}
]
}
}
- 執(zhí)行結果總體情況:
任務啟動時刻 : 2019-09-18 14:38:12
任務結束時刻 : 2019-09-18 14:41:53
任務總計耗時 : 221s
任務平均流量 : 251.45KB/s
記錄寫入速度 : 12501rec/s
讀出記錄總數(shù) : 2750323
讀寫失敗總數(shù) : 0
- 對datax進行優(yōu)化,開啟多線程模式,channel配置必須與splitPk結合使用才能生效
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","mobile","nickname","username","gender","20 as age"],
"connection": [
{
"jdbcUrl": [""],
"table": ["tmp_member_all"]
}
],
"splitPk":"id",
"password": "",
"username": ""
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": ["id","phone","nick_name","user_name","sex","age"],
"connection": [
{
"jdbcUrl": "A",
"table": ["t_b"]
}
],
"password": "",
"username": ""
}
}
}
],
"setting": {
"speed": {
"channel":5
}
}
}
}
任務執(zhí)行的總體情況
任務啟動時刻 : 2019-09-18 15:02:58
任務結束時刻 : 2019-09-18 15:03:59
任務總計耗時 : 61s
任務平均流量 : 921.97KB/s
記錄寫入速度 : 45838rec/s
讀出記錄總數(shù) : 2750323
讀寫失敗總數(shù) : 0