DataX 增量同步數(shù)據(jù)

全量數(shù)據(jù)導(dǎo)出請查看DataX mongodb導(dǎo)出數(shù)據(jù)到mysql

Datax UDF手冊

datax.py mongodb2mysql_inc.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 4
      }
    },
    "content": [{
      "reader": {
        "name": "mongodbreader",
        "parameter": {
          "address": ["*.*.*.*:27017"],
          "userName": "DataXTest",
          "userPassword": "123456",
          "dbName": "weixin",
          "collectionName": "fileids_wxpy",
          "column": [{
            "index":0,
            "name": "_id",
            "type": "string"
          }, {
            "index":1,
            "name": "crawler_time",
            "type": "string"
          }, {
            "index":2,
            "name": "file_url",
            "type": "string"
          }, {
            "index":3,
            "name": "flag",
            "type": "string"
          }, {
            "index":4,
            "name": "logo_url",
            "type": "string"
          }, {
            "index":5,
            "name": "source",
            "type": "string"
          }, {
            "index":6,
            "name": "update_date",
            "type": "string"
          }, {
            "index":7,
            "name": "update_time",
            "type": "long"
          }, {
            "index":8,
            "name": "wx_id",
            "type": "string"
          }, {
            "index":9,
            "name": "wx_name",
            "type": "string"
          }]
        }
      },
       "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",
                        "crawler_time",
                        "file_url",
                        "flag",
                        "logo_url",
                        "source",
                        "update_date",
                        "update_time",
                        "wx_id",
                        "wx_name"
            ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8", 
                                "table": ["fileids_wxpy"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_filter",
                        "parameter": 
                            {
                            "columnIndex":1,
                            "paras":["<","1560493441"]
                            }  
                    }
                ]
    }]
  }
}

運(yùn)行

# python 環(huán)境為2.7
python datax.py mongodb2mysql_inc.json

運(yùn)行結(jié)果

2019-06-14 15:22:58.886 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-06-14 15:22:58.886 [job-0] INFO  StandAloneJobContainerCommunicator - Total 53 records, 18669 bytes | Speed 93B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Transfermor Success 51848 records | Transformer Error 0 records | Transformer Filter 51795 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 15:22:58.887 [job-0] INFO  JobContainer - 
任務(wù)啟動(dòng)時(shí)刻                    : 2019-06-14 15:19:37
任務(wù)結(jié)束時(shí)刻                    : 2019-06-14 15:22:58
任務(wù)總計(jì)耗時(shí)                    :                201s
任務(wù)平均流量                    :               93B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數(shù)                    :                  53
讀寫失敗總數(shù)                    :                   0

2019-06-14 15:22:58.887 [job-0] INFO  JobContainer - 
Transformer成功記錄總數(shù)         :               51848
Transformer失敗記錄總數(shù)         :                   0
Transformer過濾記錄總數(shù)         :               51795

擴(kuò)展: 定時(shí)同步實(shí)現(xiàn)

  • mysql_max_timestamp2csv.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://*.*.*.*:x:3306/weixin?characterEncoding=utf8"], 
                                "querySql": [
                                    "SELECT max(crawler_time) FROM fileids_wxpy"
                                ]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "fileName": "mysql_max_timestamp_result",
                        "fileFormat": "csv",
                        "path": "/root/datax/bin",
                        "writeMode": "truncate"
                    }
                }
            }
        ],
        "setting": { 
            "speed": {
                    "channel": 2
                }
        }
    }
}
  • datax.py mongodb2mysql_inc.json(這里與上面的同名文件只有過濾條件時(shí)間戳不同蜓陌,此處固定為"timestamp",方便shell腳本替換更新)
{
  "job": {
    "setting": {
      "speed": {
        "channel": 4
      }
    },
    "content": [{
      "reader": {
        "name": "mongodbreader",
        "parameter": {
          "address": ["*.*.*.*:27017"],
          "userName": "DataXTest",
          "userPassword": "123456",
          "dbName": "weixin",
          "collectionName": "fileids_wxpy",
          "column": [{
            "index":0,
            "name": "_id",
            "type": "string"
          }, {
            "index":1,
            "name": "crawler_time",
            "type": "string"
          }, {
            "index":2,
            "name": "file_url",
            "type": "string"
          }, {
            "index":3,
            "name": "flag",
            "type": "string"
          }, {
            "index":4,
            "name": "logo_url",
            "type": "string"
          }, {
            "index":5,
            "name": "source",
            "type": "string"
          }, {
            "index":6,
            "name": "update_date",
            "type": "string"
          }, {
            "index":7,
            "name": "update_time",
            "type": "long"
          }, {
            "index":8,
            "name": "wx_id",
            "type": "string"
          }, {
            "index":9,
            "name": "wx_name",
            "type": "string"
          }]
        }
      },
       "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                        "id",
                        "crawler_time",
                        "file_url",
                        "flag",
                        "logo_url",
                        "source",
                        "update_date",
                        "update_time",
                        "wx_id",
                        "wx_name"
            ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://*.*.*.*:3306/weixin?characterEncoding=utf8", 
                                "table": ["fileids_wxpy"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root"
                    }
                },
                "transformer": [
                    {
                        "name": "dx_filter",
                        "parameter": 
                            {
                            "columnIndex":1,
                            "paras":["<","timestamp"]
                            }  
                    }
                ]
    }]
  }
}
  • cron_datax_mongodb2mysql.sh
python2 /root/datax/bin/datax.py /root/datax/bin/mysql_max_timestamp2csv.json
# $?是shell變量,表示"最后一次執(zhí)行命令"的退出狀態(tài).0為成功,非0為失敗, -ne 為不等于
if [ $? -ne 0 ]; then
  echo "minute_data_sync.sh error, can not get max_time from target db!"
  exit 1
fi
# 找到 DataX 寫入的文本文件捏顺,并將內(nèi)容讀取到一個(gè)變量中
RESULT_FILE=`ls /root/datax/bin/mysql_max_timestamp_result_*`
MAX_TIME=`cat $RESULT_FILE`
echo "$RESULT_FILE   $MAX_TIME"
# 如果最大時(shí)間戳不為 null 的話咆霜, 修改全部同步的配置郁惜,進(jìn)行增量更新胎挎;
if [ "$MAX_TIME" != "null" ]; then
  # 設(shè)置增量更新過濾條件
  WHERE="$MAX_TIME"
  # 將timestamp字符串替換為上次同步的最大時(shí)間戳
  sed "s/timestamp/$WHERE/g" mongodb2mysql_inc.json > mongodb2mysql_inc_tmp.json
  #echo "增量更新"
  python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql_inc_tmp.json
  # 刪除臨時(shí)文件
  rm ./mongodb2mysql_inc_tmp.json
else
  # echo "全部更新"
  python2 /root/datax/bin/datax.py /root/datax/bin/mongodb2mysql.json
fi
  • 通過linux 自帶的crontab進(jìn)行定時(shí)管理
30 22 * * * cd /root/datax/bin && sh cron_datax_mongodb2mysql.sh >>/root/datax/bin/cron_datax_mongodb2mysql.log
  • 定時(shí)運(yùn)行日志
vim /root/datax/bin/cron_datax_mongodb2mysql.log
······
2019-06-14 17:14:36.178 [job-0] INFO  JobContainer - PerfTrace not enable!
2019-06-14 17:14:36.178 [job-0] INFO  StandAloneJobContainerCommunicator - Total 65 records, 22919 bytes | Speed 114B/s, 0 records/s | Error 1 records, 350 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 171.039s | Transfermor Success 52013 records | Transformer Error 0 records | Transformer Filter 51948 records | Transformer usedTime 0.000s | Percentage 100.00%
2019-06-14 17:14:36.179 [job-0] INFO  JobContainer -
任務(wù)啟動(dòng)時(shí)刻                    : 2019-06-14 17:11:13
任務(wù)結(jié)束時(shí)刻                    : 2019-06-14 17:14:36
任務(wù)總計(jì)耗時(shí)                    :                202s
任務(wù)平均流量                    :              114B/s
記錄寫入速度                    :              0rec/s
讀出記錄總數(shù)                    :                  65
讀寫失敗總數(shù)                    :                   1

2019-06-14 17:14:36.179 [job-0] INFO  JobContainer -
Transformer成功記錄總數(shù)         :               52013
Transformer失敗記錄總數(shù)         :                   0
Transformer過濾記錄總數(shù)         :               51948

總結(jié)

  • 優(yōu)點(diǎn): 就不說了掘而,太多了
  • 缺點(diǎn):缺乏對增量更新的內(nèi)置支持配深,但因?yàn)镈ataX的靈活架構(gòu),可以通過shell腳本等方式方便實(shí)現(xiàn)增量同步

對于DataX中支持querySql語法的源數(shù)據(jù)庫推薦參考下文使用 DataX 增量同步數(shù)據(jù),從數(shù)據(jù)源頭過濾數(shù)據(jù)躁愿,可以很好的提高同步效率

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末叛本,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子彤钟,更是在濱河造成了極大的恐慌来候,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件样勃,死亡現(xiàn)場離奇詭異吠勘,居然都是意外死亡性芬,警方通過查閱死者的電腦和手機(jī)峡眶,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來植锉,“玉大人辫樱,你說我怎么就攤上這事】”樱” “怎么了狮暑?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長辉饱。 經(jīng)常有香客問我搬男,道長,這世上最難降的妖魔是什么彭沼? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任缔逛,我火速辦了婚禮,結(jié)果婚禮上姓惑,老公的妹妹穿的比我還像新娘褐奴。我一直安慰自己,他們只是感情好于毙,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布敦冬。 她就那樣靜靜地躺著,像睡著了一般唯沮。 火紅的嫁衣襯著肌膚如雪脖旱。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天介蛉,我揣著相機(jī)與錄音萌庆,去河邊找鬼。 笑死甘耿,一個(gè)胖子當(dāng)著我的面吹牛踊兜,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播佳恬,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼捏境,長吁一口氣:“原來是場噩夢啊……” “哼于游!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起垫言,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤贰剥,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后筷频,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體蚌成,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年凛捏,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了担忧。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,789評論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡坯癣,死狀恐怖瓶盛,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情示罗,我是刑警寧澤惩猫,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布,位于F島的核電站蚜点,受9級特大地震影響轧房,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜绍绘,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一奶镶、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧脯倒,春花似錦实辑、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至悠反,卻和暖如春残黑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背斋否。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工梨水, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人茵臭。 一個(gè)月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓疫诽,卻偏偏與公主長得像,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子奇徒,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容