全量數(shù)據(jù)導(dǎo)出請查看DataX mongodb導(dǎo)出數(shù)據(jù)到mysql
Datax UDF手冊
datax.py mongodb2mysql_inc.json
{
"job": {
"setting": {
"speed": {
"channel": 4
}
},
"content": [{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["*.*.*.*:27017"],
"userName": "DataXTest",
"userPassword": "123456",
"dbName": "weixin",
"collectionName": "fileids_wxpy",
"column": [{
"index":0,
"name": "_id",
"type": "string"
}, {
"index":1,
"name": "crawler_time",
"type": "string"
}, {
"index":2,
"name": "file_url",
"type": "string"
}, {
"index":3,
"name": "flag",
"type": "string"
}, {
"index":4,
"name": "logo_url",
"type": "string"
}, {
"index":5,
"name": "source",
"type": "string"
}, {
"index":6,
"name": "update_date",
"type": "string"
}, {
"index":7,
"name": "update_time",
"type": "long"
}, {
"index":8,
"name": "wx_id",
"type": "string"
}, {
"index":9,
"name": "wx_name",
"type": "string"
}]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"crawler_time",
"file_url",
"flag",
"logo_url",
"source",
"update_date",
"update_time",
"wx_id",
"wx_name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8",
"table": ["fileids_wxpy"]
}
],
"password": "123456",
"username": "root"
}
},
"transformer": [
{
"name": "dx_filter",
"parameter":
{
"columnIndex":1,
"paras":["<","1560493441"]
}
}
]
}]
}
}
運(yùn)行
# python 環(huán)境為2.7
python datax.py mongodb2mysql_inc.json
運(yùn)行結(jié)果
2019-06-14 15:22:58.886 [job-0] INFO JobContainer - PerfTrace not enable!
2019-06-14 15:22:58.886 [job-0] INFO StandAloneJobContainerCommunicator - Total 53 records, 18669 bytes | Speed 93B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Transfermor Success 51848 records | Transformer Error 0 records | Transformer Filter 51795 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 15:22:58.887 [job-0] INFO JobContainer -
任務(wù)啟動(dòng)時(shí)刻 : 2019-06-14 15:19:37
任務(wù)結(jié)束時(shí)刻 : 2019-06-14 15:22:58
任務(wù)總計(jì)耗時(shí) : 201s
任務(wù)平均流量 : 93B/s
記錄寫入速度 : 0rec/s
讀出記錄總數(shù) : 53
讀寫失敗總數(shù) : 0
2019-06-14 15:22:58.887 [job-0] INFO JobContainer -
Transformer成功記錄總數(shù) : 51848
Transformer失敗記錄總數(shù) : 0
Transformer過濾記錄總數(shù) : 51795
擴(kuò)展: 定時(shí)同步實(shí)現(xiàn)
- mysql_max_timestamp2csv.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://*.*.*.*:x:3306/weixin?characterEncoding=utf8"],
"querySql": [
"SELECT max(crawler_time) FROM fileids_wxpy"
]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"fileName": "mysql_max_timestamp_result",
"fileFormat": "csv",
"path": "/root/datax/bin",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
- datax.py mongodb2mysql_inc.json(這里與上面的同名文件只有過濾條件時(shí)間戳不同蜓陌,此處固定為"timestamp",方便shell腳本替換更新)
{
"job": {
"setting": {
"speed": {
"channel": 4
}
},
"content": [{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": ["*.*.*.*:27017"],
"userName": "DataXTest",
"userPassword": "123456",
"dbName": "weixin",
"collectionName": "fileids_wxpy",
"column": [{
"index":0,
"name": "_id",
"type": "string"
}, {
"index":1,
"name": "crawler_time",
"type": "string"
}, {
"index":2,
"name": "file_url",
"type": "string"
}, {
"index":3,
"name": "flag",
"type": "string"
}, {
"index":4,
"name": "logo_url",
"type": "string"
}, {
"index":5,
"name": "source",
"type": "string"
}, {
"index":6,
"name": "update_date",
"type": "string"
}, {
"index":7,
"name": "update_time",
"type": "long"
}, {
"index":8,
"name": "wx_id",
"type": "string"
}, {
"index":9,
"name": "wx_name",
"type": "string"
}]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"crawler_time",
"file_url",
"flag",
"logo_url",
"source",
"update_date",
"update_time",
"wx_id",
"wx_name"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8",
"table": ["fileids_wxpy"]
}
],
"password": "123456",
"username": "root"
}
},
"transformer": [
{
"name": "dx_filter",
"parameter":
{
"columnIndex":1,
"paras":["<","timestamp"]
}
}
]
}]
}
}
- cron_datax_mongodb2mysql.sh
python2 /root/datax/bin/datax.py /root/datax/bin/mysql_max_timestamp2csv.json
# $?是shell變量,表示"最后一次執(zhí)行命令"的退出狀態(tài).0為成功,非0為失敗, -ne 為不等于
if [ $? -ne 0 ]; then
echo "minute_data_sync.sh error, can not get max_time from target db!"
exit 1
fi
# 找到 DataX 寫入的文本文件捏顺,并將內(nèi)容讀取到一個(gè)變量中
RESULT_FILE=`ls /root/datax/bin/mysql_max_timestamp_result_*`
MAX_TIME=`cat $RESULT_FILE`
echo "$RESULT_FILE $MAX_TIME"
# 如果最大時(shí)間戳不為 null 的話咆霜, 修改全部同步的配置郁惜,進(jìn)行增量更新胎挎;
if [ "$MAX_TIME" != "null" ]; then
# 設(shè)置增量更新過濾條件
WHERE="$MAX_TIME"
# 將timestamp字符串替換為上次同步的最大時(shí)間戳
sed "s/timestamp/$WHERE/g" mongodb2mysql_inc.json > mongodb2mysql_inc_tmp.json
#echo "增量更新"
python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql_inc_tmp.json
# 刪除臨時(shí)文件
rm ./mongodb2mysql_inc_tmp.json
else
# echo "全部更新"
python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql.json
fi
- 通過linux 自帶的crontab進(jìn)行定時(shí)管理
30 22 * * * cd /root/datax/bin && sh cron_datax_mongodb2mysql.sh >>/root/datax/bin/cron_datax_mongodb2mysql.log
- 定時(shí)運(yùn)行日志
vim /root/datax/bin/cron_datax_mongodb2mysql.log
······
2019-06-14 17:14:36.178 [job-0] INFO JobContainer - PerfTrace not enable!
2019-06-14 17:14:36.178 [job-0] INFO StandAloneJobContainerCommunicator - Total 65 records, 22919 bytes | Speed 114B/s, 0 records/s | Error 1 records, 350 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 171.039s | Transfermor Success 52013 records | Transformer Error 0 records | Transformer Filter 51948 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 17:14:36.179 [job-0] INFO JobContainer -
任務(wù)啟動(dòng)時(shí)刻 : 2019-06-14 17:11:13
任務(wù)結(jié)束時(shí)刻 : 2019-06-14 17:14:36
任務(wù)總計(jì)耗時(shí) : 202s
任務(wù)平均流量 : 114B/s
記錄寫入速度 : 0rec/s
讀出記錄總數(shù) : 65
讀寫失敗總數(shù) : 1
2019-06-14 17:14:36.179 [job-0] INFO JobContainer -
Transformer成功記錄總數(shù) : 52013
Transformer失敗記錄總數(shù) : 0
Transformer過濾記錄總數(shù) : 51948
總結(jié)
- 優(yōu)點(diǎn): 就不說了掘而,太多了
- 缺點(diǎn):缺乏對增量更新的內(nèi)置支持配深,但因?yàn)镈ataX的靈活架構(gòu),可以通過shell腳本等方式方便實(shí)現(xiàn)增量同步
對于DataX中支持querySql語法的源數(shù)據(jù)庫推薦參考下文使用 DataX 增量同步數(shù)據(jù),從數(shù)據(jù)源頭過濾數(shù)據(jù)躁愿,可以很好的提高同步效率