DolphinScheduler 調(diào)度 DataX 實(shí)現(xiàn) MySQL To ElasticSearch 增量數(shù)據(jù)同步實(shí)踐

數(shù)據(jù)同步的方式

數(shù)據(jù)同步的2大方式

  • 基于SQL查詢的 CDC(Change Data Capture):
    • 離線調(diào)度查詢作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過(guò)查詢?nèi)カ@取表中最新的數(shù)據(jù)适掰。也就是我們說(shuō)的基于SQL查詢抽取荠列;
    • 無(wú)法保障數(shù)據(jù)一致性类浪,查的過(guò)程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更;
    • 不保障實(shí)時(shí)性肌似,基于離線調(diào)度存在天然的延遲;
    • 工具軟件以Kettle(Apache Hop最新版)费就、DataX為代表,需要結(jié)合任務(wù)調(diào)度系統(tǒng)使用。
  • 基于日志的 CDC:
    • 實(shí)時(shí)消費(fèi)日志川队,流處理力细,例如 MySQL 的 binlog 日志完整記錄了數(shù)據(jù)庫(kù)中的變更,可以把 binlog 文件當(dāng)作流的數(shù)據(jù)源固额;
    • 保障數(shù)據(jù)一致性眠蚂,因?yàn)?binlog 文件包含了所有歷史變更明細(xì);
    • 保障實(shí)時(shí)性斗躏,因?yàn)轭愃?binlog 的日志文件是可以流式消費(fèi)的逝慧,提供的是實(shí)時(shí)數(shù)據(jù);
    • 工具軟件以Flink CDC、阿里巴巴Canal、Debezium為代表笛臣。

基于SQL查詢?cè)隽繑?shù)據(jù)同步原理

我們考慮用SQL如何查詢?cè)隽繑?shù)據(jù)云稚? 數(shù)據(jù)有增加、修改沈堡、刪除
刪除數(shù)據(jù)采用邏輯刪除的方式静陈,比如定義一個(gè)is_deleted字段標(biāo)識(shí)邏輯刪除
如果數(shù)據(jù)是 UPDATE的,也就是會(huì)被修改的诞丽,那么 where update_datetime >= last_datetime(調(diào)度滾動(dòng)時(shí)間)就是增量數(shù)據(jù)
如果數(shù)據(jù)是 APPEND ONLY 的除了用更新時(shí)間還可以用where id >= 調(diào)度上次last_id

結(jié)合任務(wù)調(diào)度系統(tǒng)
調(diào)度時(shí)間是每日調(diào)度執(zhí)行一次鲸拥,那么 last_datetime = 當(dāng)前調(diào)度開(kāi)始執(zhí)行時(shí)間 - 24小時(shí),延遲就是1天
調(diào)度時(shí)間是15分鐘一次僧免,那么 last_datetime = 當(dāng)前調(diào)度開(kāi)始執(zhí)行時(shí)間 - 15分鐘刑赶,延遲就是15分鐘

這樣就實(shí)現(xiàn)了捕獲增量數(shù)據(jù),從而實(shí)現(xiàn)增量同步

DolphinScheduler + Datax 構(gòu)建離線增量數(shù)據(jù)同步平臺(tái)

本實(shí)踐使用
單機(jī)8c16g
DataX 2022-03-01 官網(wǎng)下載
DolphinScheduler 2.0.3(DolphinScheduler的安裝過(guò)程略猬膨,請(qǐng)參考官網(wǎng))

DolphinScheduler 中設(shè)置好DataX環(huán)境變量
DolphinScheduler 提供了可視化的作業(yè)流程定義角撞,用來(lái)離線定時(shí)調(diào)度DataX Job作業(yè),使用起來(lái)很是順滑

基于SQL查詢離線數(shù)據(jù)同步的用武之地
為什么不用基于日志實(shí)時(shí)的方式勃痴?不是不用谒所,而是根據(jù)場(chǎng)合用∨嫔辏考慮到業(yè)務(wù)實(shí)際需求情況劣领,基于SQL查詢這種離線的方式也并非完全淘汰了
特別是業(yè)務(wù)上實(shí)時(shí)性要求不高,每次調(diào)度增量數(shù)據(jù)沒(méi)那么大的情況下铁材,不需要分布式架構(gòu)來(lái)負(fù)載尖淘,這種情況下是比較合適的選擇
場(chǎng)景舉例:
網(wǎng)站、APP的百萬(wàn)級(jí)著觉、千萬(wàn)級(jí)的內(nèi)容搜索村生,每天幾百篇內(nèi)容新增+修改,搜索上會(huì)用到ES(ElasticSearch)饼丘,那么就需要把 MySQL內(nèi)容數(shù)據(jù)增量同步到ES
DataX就能滿足需求趁桃!

DolphinScheduler中配置DataX MySQL To ElasticSearch工作流

工作流定義

工作流定義 > 創(chuàng)建工作流 > 拖入1個(gè)SHELL組件 > 拖入1個(gè)DATAX組件
SHELL組件(文章)
腳本

echo '文章同步 MySQL To ElasticSearch'

DATAX組件(t_article)
用到2個(gè)插件mysqlreader、elasticsearchwriter^[1]
選 自定義模板:

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的數(shù)據(jù)庫(kù)?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
                                ],
                                "querySql": [
                                    "select a.id as pk,a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
                                ]
                            }
                        ],
                        "password": "${biz_mysql_password}",
                        "username": "${biz_mysql_username}"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "${biz_es_host}",
                        "accessId": "${biz_es_username}",
                        "accessKey": "${biz_es_password}",
                        "index": "t_article",
                        "type": "_doc",
                        "batchSize": 1000,
                        "cleanup": false,
                        "discovery": false,
                        "dynamic": true,
                        "settings": {
                            "index": {
                                "number_of_replicas": 0,
                                "number_of_shards": 1
                            }
                        },
                        "splitter": ",",
                        "column": [
                            {
                                "name": "pk",
                                "type": "id"
                            },
                            {
                                "name": "id",
                                "type": "long"
                            },
                            {
                                "name": "title",
                                "type": "text"
                            },
                            {
                                "name": "content",
                                "type": "text"
                            }
                            {
                                "name": "is_delete",
                                "type": "text"
                            },
                            {
                                "name": "delete_date",
                                "type": "date"
                            },
                            {
                                "name": "create_date",
                                "type": "date"
                            },
                            {
                                "name": "update_date",
                                "type": "date"
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "errorLimit": {
                "percentage": 0,
                "record": 0
            },
            "speed": {
                "channel": 1,
                "record": 1000
            }
        }
    }
}

reader和writer的字段配置需保持一致

自定義參數(shù):

biz_update_dt: ${global_bizdate} 
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql賬號(hào)
biz_mysql_password: 你的mysql密碼
biz_es_host: 你的es地址帶協(xié)議和端口 http://127.0.0.1:9200
biz_es_username: 你的es賬號(hào)
biz_es_password: 你的es密碼

配置的自定義參數(shù)將會(huì)自動(dòng)替換json模板中的同名變量


reader mysqlreader插件中關(guān)鍵配置: a.update_date >= '${biz_update_dt}' 就是實(shí)現(xiàn)增量同步的關(guān)鍵配置
writer elasticsearchwriter插件中關(guān)鍵配置: ``

"column": [
    {
        "name": "pk",
        "type": "id"
    },
    ......
]

type = id 這樣配置肄鸽,就把文章主鍵映射到es主鍵 _id 從而實(shí)現(xiàn)相同主鍵id重復(fù)寫(xiě)入數(shù)據(jù)卫病,就會(huì)更新數(shù)據(jù)。如果不這樣配置數(shù)據(jù)將會(huì)重復(fù)導(dǎo)入es中

保存工作流

全局變量設(shè)置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]

global_bizdate 引用的變量為 DolphinScheduler 內(nèi)置變量典徘,具體參考官網(wǎng)文檔^[2]
結(jié)合調(diào)度時(shí)間設(shè)計(jì)好時(shí)間滾動(dòng)的窗口時(shí)長(zhǎng)蟀苛,比如按1天增量,那么這里時(shí)間就是減1天

最終的工作流DAG圖為:

by 流水理魚(yú)|wwek

參考

1. DataX ElasticSearchWriter 插件文檔
2. Apache DolphinScheduler 內(nèi)置參數(shù)
本文首發(fā)于流水理魚(yú)博客逮诲,如要轉(zhuǎn)載請(qǐng)注明出處帜平。
歡迎關(guān)注我的公眾號(hào):流水理魚(yú)(liushuiliyu)幽告,全棧、云原生罕模、Homelab交流评腺。
如果您對(duì)相關(guān)文章感興趣帘瞭,也可以關(guān)注我的博客:www.iamle.com 上面有更多內(nèi)容

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末淑掌,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子蝶念,更是在濱河造成了極大的恐慌抛腕,老刑警劉巖,帶你破解...
    沈念sama閱讀 216,591評(píng)論 6 501
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件媒殉,死亡現(xiàn)場(chǎng)離奇詭異担敌,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)廷蓉,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,448評(píng)論 3 392
  • 文/潘曉璐 我一進(jìn)店門(mén)全封,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人桃犬,你說(shuō)我怎么就攤上這事刹悴。” “怎么了攒暇?”我有些...
    開(kāi)封第一講書(shū)人閱讀 162,823評(píng)論 0 353
  • 文/不壞的土叔 我叫張陵土匀,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我形用,道長(zhǎng)就轧,這世上最難降的妖魔是什么? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,204評(píng)論 1 292
  • 正文 為了忘掉前任田度,我火速辦了婚禮妒御,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘镇饺。我一直安慰自己乎莉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,228評(píng)論 6 388
  • 文/花漫 我一把揭開(kāi)白布兰怠。 她就那樣靜靜地躺著梦鉴,像睡著了一般。 火紅的嫁衣襯著肌膚如雪揭保。 梳的紋絲不亂的頭發(fā)上肥橙,一...
    開(kāi)封第一講書(shū)人閱讀 51,190評(píng)論 1 299
  • 那天,我揣著相機(jī)與錄音秸侣,去河邊找鬼存筏。 笑死宠互,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的椭坚。 我是一名探鬼主播予跌,決...
    沈念sama閱讀 40,078評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼善茎!你這毒婦竟也來(lái)了券册?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 38,923評(píng)論 0 274
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤垂涯,失蹤者是張志新(化名)和其女友劉穎烁焙,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體耕赘,經(jīng)...
    沈念sama閱讀 45,334評(píng)論 1 310
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡骄蝇,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,550評(píng)論 2 333
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了操骡。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片九火。...
    茶點(diǎn)故事閱讀 39,727評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖册招,靈堂內(nèi)的尸體忽然破棺而出岔激,到底是詐尸還是另有隱情,我是刑警寧澤跨细,帶...
    沈念sama閱讀 35,428評(píng)論 5 343
  • 正文 年R本政府宣布鹦倚,位于F島的核電站,受9級(jí)特大地震影響冀惭,放射性物質(zhì)發(fā)生泄漏震叙。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,022評(píng)論 3 326
  • 文/蒙蒙 一散休、第九天 我趴在偏房一處隱蔽的房頂上張望媒楼。 院中可真熱鬧,春花似錦戚丸、人聲如沸划址。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,672評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)夺颤。三九已至,卻和暖如春胁勺,著一層夾襖步出監(jiān)牢的瞬間世澜,已是汗流浹背。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,826評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工署穗, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留寥裂,地道東北人嵌洼。 一個(gè)月前我還...
    沈念sama閱讀 47,734評(píng)論 2 368
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像封恰,于是被迫代替她去往敵國(guó)和親麻养。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,619評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容