數(shù)據(jù)同步的方式
數(shù)據(jù)同步的2大方式
-
基于SQL查詢的 CDC(Change Data Capture):
- 離線調(diào)度查詢作業(yè),批處理。把一張表同步到其他系統(tǒng),每次通過(guò)查詢?nèi)カ@取表中最新的數(shù)據(jù)适掰。也就是我們說(shuō)的基于SQL查詢抽取荠列;
- 無(wú)法保障數(shù)據(jù)一致性类浪,查的過(guò)程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更;
- 不保障實(shí)時(shí)性肌似,基于離線調(diào)度存在天然的延遲;
- 工具軟件以Kettle(Apache Hop最新版)费就、DataX為代表,需要結(jié)合任務(wù)調(diào)度系統(tǒng)使用。
-
基于日志的 CDC:
- 實(shí)時(shí)消費(fèi)日志川队,流處理力细,例如 MySQL 的 binlog 日志完整記錄了數(shù)據(jù)庫(kù)中的變更,可以把 binlog 文件當(dāng)作流的數(shù)據(jù)源固额;
- 保障數(shù)據(jù)一致性眠蚂,因?yàn)?binlog 文件包含了所有歷史變更明細(xì);
- 保障實(shí)時(shí)性斗躏,因?yàn)轭愃?binlog 的日志文件是可以流式消費(fèi)的逝慧,提供的是實(shí)時(shí)數(shù)據(jù);
- 工具軟件以Flink CDC、阿里巴巴Canal、Debezium為代表笛臣。
基于SQL查詢?cè)隽繑?shù)據(jù)同步原理
我們考慮用SQL如何查詢?cè)隽繑?shù)據(jù)云稚? 數(shù)據(jù)有增加、修改沈堡、刪除
刪除數(shù)據(jù)采用邏輯刪除的方式静陈,比如定義一個(gè)is_deleted字段標(biāo)識(shí)邏輯刪除
如果數(shù)據(jù)是 UPDATE的,也就是會(huì)被修改的诞丽,那么 where update_datetime >= last_datetime(調(diào)度滾動(dòng)時(shí)間)就是增量數(shù)據(jù)
如果數(shù)據(jù)是 APPEND ONLY 的除了用更新時(shí)間還可以用where id >= 調(diào)度上次last_id
結(jié)合任務(wù)調(diào)度系統(tǒng)
調(diào)度時(shí)間是每日調(diào)度執(zhí)行一次鲸拥,那么 last_datetime = 當(dāng)前調(diào)度開(kāi)始執(zhí)行時(shí)間 - 24小時(shí),延遲就是1天
調(diào)度時(shí)間是15分鐘一次僧免,那么 last_datetime = 當(dāng)前調(diào)度開(kāi)始執(zhí)行時(shí)間 - 15分鐘刑赶,延遲就是15分鐘
這樣就實(shí)現(xiàn)了捕獲增量數(shù)據(jù),從而實(shí)現(xiàn)增量同步
DolphinScheduler + Datax 構(gòu)建離線增量數(shù)據(jù)同步平臺(tái)
本實(shí)踐使用
單機(jī)8c16g
DataX 2022-03-01 官網(wǎng)下載
DolphinScheduler 2.0.3(DolphinScheduler的安裝過(guò)程略猬膨,請(qǐng)參考官網(wǎng))
DolphinScheduler 中設(shè)置好DataX環(huán)境變量
DolphinScheduler 提供了可視化的作業(yè)流程定義角撞,用來(lái)離線定時(shí)調(diào)度DataX Job作業(yè),使用起來(lái)很是順滑
基于SQL查詢離線數(shù)據(jù)同步的用武之地
為什么不用基于日志實(shí)時(shí)的方式勃痴?不是不用谒所,而是根據(jù)場(chǎng)合用∨嫔辏考慮到業(yè)務(wù)實(shí)際需求情況劣领,基于SQL查詢這種離線的方式也并非完全淘汰了
特別是業(yè)務(wù)上實(shí)時(shí)性要求不高,每次調(diào)度增量數(shù)據(jù)沒(méi)那么大的情況下铁材,不需要分布式架構(gòu)來(lái)負(fù)載尖淘,這種情況下是比較合適的選擇
場(chǎng)景舉例:
網(wǎng)站、APP的百萬(wàn)級(jí)著觉、千萬(wàn)級(jí)的內(nèi)容搜索村生,每天幾百篇內(nèi)容新增+修改,搜索上會(huì)用到ES(ElasticSearch)饼丘,那么就需要把 MySQL內(nèi)容數(shù)據(jù)增量同步到ES
DataX就能滿足需求趁桃!
DolphinScheduler中配置DataX MySQL To ElasticSearch工作流
工作流定義
工作流定義 > 創(chuàng)建工作流 > 拖入1個(gè)SHELL組件 > 拖入1個(gè)DATAX組件
SHELL組件(文章)
腳本
echo '文章同步 MySQL To ElasticSearch'
DATAX組件(t_article)
用到2個(gè)插件mysqlreader、elasticsearchwriter^[1]
選 自定義模板:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://${biz_mysql_host}:${biz_mysql_port}/你的數(shù)據(jù)庫(kù)?useUnicode=true&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF8&autoReconnect=true&useSSL=false&&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"
],
"querySql": [
"select a.id as pk,a.id,a.title,a.content,a.is_delete,a.delete_date,a.create_date,a.update_date from t_article a.update_date >= '${biz_update_dt}';"
]
}
],
"password": "${biz_mysql_password}",
"username": "${biz_mysql_username}"
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "${biz_es_host}",
"accessId": "${biz_es_username}",
"accessKey": "${biz_es_password}",
"index": "t_article",
"type": "_doc",
"batchSize": 1000,
"cleanup": false,
"discovery": false,
"dynamic": true,
"settings": {
"index": {
"number_of_replicas": 0,
"number_of_shards": 1
}
},
"splitter": ",",
"column": [
{
"name": "pk",
"type": "id"
},
{
"name": "id",
"type": "long"
},
{
"name": "title",
"type": "text"
},
{
"name": "content",
"type": "text"
}
{
"name": "is_delete",
"type": "text"
},
{
"name": "delete_date",
"type": "date"
},
{
"name": "create_date",
"type": "date"
},
{
"name": "update_date",
"type": "date"
}
]
}
}
}
],
"setting": {
"errorLimit": {
"percentage": 0,
"record": 0
},
"speed": {
"channel": 1,
"record": 1000
}
}
}
}
reader和writer的字段配置需保持一致
自定義參數(shù):
biz_update_dt: ${global_bizdate}
biz_mysql_host: 你的mysql ip
biz_mysql_port: 3306
biz_mysql_username: 你的mysql賬號(hào)
biz_mysql_password: 你的mysql密碼
biz_es_host: 你的es地址帶協(xié)議和端口 http://127.0.0.1:9200
biz_es_username: 你的es賬號(hào)
biz_es_password: 你的es密碼
配置的自定義參數(shù)將會(huì)自動(dòng)替換json模板中的同名變量
reader mysqlreader插件中關(guān)鍵配置: a.update_date >= '${biz_update_dt}'
就是實(shí)現(xiàn)增量同步的關(guān)鍵配置
writer elasticsearchwriter插件中關(guān)鍵配置: ``
"column": [
{
"name": "pk",
"type": "id"
},
......
]
type = id 這樣配置肄鸽,就把文章主鍵映射到es主鍵 _id
從而實(shí)現(xiàn)相同主鍵id重復(fù)寫(xiě)入數(shù)據(jù)卫病,就會(huì)更新數(shù)據(jù)。如果不這樣配置數(shù)據(jù)將會(huì)重復(fù)導(dǎo)入es中
保存工作流
全局變量設(shè)置
global_bizdate: $[yyyy-MM-dd 00:00:00-1]
global_bizdate 引用的變量為 DolphinScheduler 內(nèi)置變量典徘,具體參考官網(wǎng)文檔^[2]
結(jié)合調(diào)度時(shí)間設(shè)計(jì)好時(shí)間滾動(dòng)的窗口時(shí)長(zhǎng)蟀苛,比如按1天增量,那么這里時(shí)間就是減1天
最終的工作流DAG圖為:
by 流水理魚(yú)|wwek
參考
1. DataX ElasticSearchWriter 插件文檔
2. Apache DolphinScheduler 內(nèi)置參數(shù)
本文首發(fā)于流水理魚(yú)博客逮诲,如要轉(zhuǎn)載請(qǐng)注明出處帜平。
歡迎關(guān)注我的公眾號(hào):流水理魚(yú)(liushuiliyu)幽告,全棧、云原生罕模、Homelab交流评腺。
如果您對(duì)相關(guān)文章感興趣帘瞭,也可以關(guān)注我的博客:www.iamle.com 上面有更多內(nèi)容