背景
MySQL庫A 到 MySQL庫B的增量數(shù)據同步需求
DolphinScheduler中配置DataX MySQL To MySQL工作流
工作流定義
工作流定義 > 創(chuàng)建工作流 > 拖入1個SHELL組件 > 拖入1個DATAX組件
SHELL組件(文章)
腳本
echo '文章同步 MySQL To MySQL'
DATAX組件(t_article)
用到2個插件mysqlreader[1]瘦穆、mysqlwriter[2]
選 自定義模板:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的數(shù)據庫A?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
],
"querySql": [
"select a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
]
}
],
"password": "${biz_mysql_password}",
"username": "${biz_mysql_username}"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"`id`",
"`title`",
"`content`",
"`is_delete`",
"`delete_date`",
"`create_date`",
"`update_date`"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的數(shù)據庫B?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false",
"table": [
"t_article"
]
}
],
"writeMode": "replace",
"password": "${biz_mysql_password}",
"username": "${biz_mysql_username}"
}
}
}
],
"setting": {
"errorLimit": {
"percentage": 0,
"record": 0
},
"speed": {
"channel": 1,
"record": 1000
}
}
}
}
reader和writer的字段配置需保持一致
自定義參數(shù):
biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql賬號
biz_mysql_password: 你的mysql密碼
# 本文實驗環(huán)境A庫和B庫用的同一個實例脑溢,如果MySQL是多個實例停撞,可以再新增加參數(shù)定義例如 biz_mysql_host_b,在模板中對應引用即可
配置的自定義參數(shù)將會自動替換json模板中的同名變量
reader mysqlreader插件中關鍵配置: a.update_date >= '${biz_update_dt}'
就是實現(xiàn)增量同步的關鍵配置
writer mysqlwriter插件中關鍵配置: ``
"parameter": {
"writeMode": "replace",
......
}
writeMode為replace,相同主鍵id重復寫入數(shù)據即寡,就會更新數(shù)據。sql本質上執(zhí)行的是 replace into
保存工作流
全局變量設置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]
global_bizdate 引用的變量為 DolphinScheduler 內置變量袜刷,具體參考官網文檔^[3]
結合調度時間設計好時間滾動的窗口時長聪富,比如按1天增量,那么這里時間就是減1天
最終的工作流DAG圖為:
爬坑記錄
- 官網下載的DataX不包含ElasticSearchWriter寫插件
默認不帶該插件著蟹,需要自己編譯ElasticSearchWriter插件墩蔓。
git clone https://github.com/alibaba/DataX.git
為了加快編譯速度,可以只編譯<module>elasticsearchwriter</module>
項目根目錄的pom.xml
全注釋掉萧豆,
下只保留<module>elasticsearchwriter</module>
其他注釋掉奸披,另外``也需要保留
如果自己不想編譯或者編譯失敗請搜索?? "流水理魚"微信公眾號,或者加我私人微信我給你已經編譯好的插件包
by 流水理魚|wwek
參考
1. DataX MysqlReader 插件文檔 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
2. DataX MysqlWriter 插件文檔 https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
3. Apache DolphinScheduler 內置參數(shù) https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/built-in.html
本文首發(fā)于流水理魚博客炕横,如要轉載請注明出處源内。
歡迎關注我的公眾號:流水理魚(liushuiliyu),全棧份殿、云原生膜钓、Homelab交流。
如果您對相關文章感興趣卿嘲,也可以關注我的博客:www.iamle.com 上面有更多內容