Flink CDC實(shí)時(shí)同步Mysql多表(整庫)到Doris

寫在前面的話

本示例是借助Flink Doris Connector插件實(shí)現(xiàn)通過 Flink 同步mysql數(shù)據(jù)(讀取戚嗅、插入严沥、修改连霉、刪除)到Doris 中存儲(chǔ)。
最佳實(shí)踐:使用 Flink Doris Connector 最適合的場景就是實(shí)時(shí)/批次同步源數(shù)據(jù)(Mysql,Oracle残腌,PostgreSQL 等)到 Doris叠艳,使用 Flink 對 Doris 中的數(shù)據(jù)和其他數(shù)據(jù)源進(jìn)行聯(lián)合分析,這里介紹Mysql到doris的數(shù)據(jù)接入方式怯屉。
Doirs詳細(xì)參考鏈接
FlinkCDC詳細(xì)參考鏈接

1 軟件版本及環(huán)境準(zhǔn)備

1.1 軟件版本

Doris Version : doris-2.0.9
Flink flink-1.17.2
FlinkCDC 3.0.1

版本兼容

Connector Version Flink Version Doris Version Java Version Scala Version
1.0.3 1.11,1.12,1.13,1.14 0.15+ 8 2.11,2.12
1.1.1 1.14 1.0+ 8 2.11,2.12
1.2.1 1.15 1.0+ 8 -
1.3.0 1.16 1.0+ 8 -
1.4.0 1.15,1.16,1.17 1.0+ 8 -
1.5.2 1.15,1.16,1.17,1.18 1.0+ 8 -
1.6.1 1.15,1.16,1.17,1.18,1.19 1.0+ 8 -

1.2 環(huán)境準(zhǔn)備

準(zhǔn)備依賴文件到FLINK lib/文件下

flink-doris-connector-1.17-1.5.2.jar
mysql-connector-java-8.0.27.jar
flink-sql-connector-mysql-cdc-3.0.1.jar


jar 包依賴

2 編輯腳本

2.1 腳本模版及說明

腳本模版

<FLINK_HOME>bin/flink run \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <doris-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <doris-table-prefix>] \
    [--table-suffix <doris-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
    --sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]

參數(shù)說明:

Key Comment
--job-name Flink 任務(wù)名稱蔚舀,非必需
--database 同步到 Doris 的數(shù)據(jù)庫名
--table-prefix Doris 表前綴名,例如 --table-prefix ods_锨络。
--table-suffix 同上赌躺,Doris 表的后綴名。
--including-tables 需要同步的 MySQL 表羡儿,可以使用"|" 分隔多個(gè)表礼患,并支持正則表達(dá)式。比如--including-tables table1 例如:以 ts開頭的寫法 ts.*
--excluding-tables 不需要同步的表掠归,用法同上缅叠。
--mysql-conf MySQL CDCSource 配置,例如--mysql-conf hostname=127.0.0.1虏冻,您可以在這里查看所有配置 MySQL-CDC肤粱,其中 hostname/username/password/database-name 是必需的。同步的庫表中含有非主鍵表時(shí)厨相,必須設(shè)置 scan.incremental.snapshot.chunk.key-column领曼,且只能選擇非空類型的一個(gè)字段鸥鹉。例如:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...,不同的庫表列之間用,隔開悯森。
--oracle-conf Oracle CDCSource 配置宋舷,例如--oracle-conf hostname=127.0.0.1,您可以在這里查看所有配置 Oracle-CDC瓢姻,其中 hostname/username/password/database-name/schema-name 是必需的祝蝠。
--postgres-conf Postgres CDCSource 配置,例如--postgres-conf hostname=127.0.0.1幻碱,您可以在這里查看所有配置 Postgres-CDC绎狭,其中 hostname/username/password/database-name/schema-name/slot.name 是必需的。
--sqlserver-conf SQLServer CDCSource 配置褥傍,例如--sqlserver-conf hostname=127.0.0.1儡嘶,您可以在這里查看所有配置 SQLServer-CDC,其中 hostname/username/password/database-name/schema-name 是必需的恍风。
--sink-conf Doris Sink 的所有配置蹦狂,可以在這里查看完整的配置項(xiàng)。
--table-conf Doris 表的配置項(xiàng)朋贬,即 properties 中包含的內(nèi)容(其中 table-buckets 例外凯楔,非 properties 屬性)。例如 --table-conf replication_num=1锦募,而 --table-conf table-buckets="tbl1:10,tbl2:20,a.:30,b.:40,.*:50"表示按照正則表達(dá)式順序指定不同表的 buckets 數(shù)量摆屯,如果沒有匹配到則采用 BUCKETS AUTO 建表。
--ignore-default-value 關(guān)閉同步 mysql 表結(jié)構(gòu)的默認(rèn)值糠亩。適用于同步 mysql 數(shù)據(jù)到 doris 時(shí)虐骑,字段有默認(rèn)值,但實(shí)際插入數(shù)據(jù)為 null 情況赎线。參考#152
--use-new-schema-change 是否使用新的 schema change廷没,支持同步 mysql 多列變更、默認(rèn)值垂寥。1.6.0 默認(rèn)為true颠黎。參考#167
--single-sink 是否使用單個(gè) Sink 同步所有表,開啟后也可自動(dòng)識(shí)別上游新創(chuàng)建的表矫废,自動(dòng)創(chuàng)建表盏缤。
--multi-to-one-origin 將上游多張表寫入同一張表時(shí),源表的配置蓖扑,比如:--multi-to-one-origin="a_.|b_."唉铜,具體參考#208
--multi-to-one-target 與 multi-to-one-origin 搭配使用,目標(biāo)表的配置律杠,比如:--multi-to-one-target="a|b"
--create-table-only 是否只僅僅創(chuàng)建表的結(jié)構(gòu)

2.2 運(yùn)行實(shí)例

mysql創(chuàng)建colin庫潭流,建如下表單竞惋,手動(dòng)寫入測試數(shù)據(jù)N條

-- colin.books definition

CREATE TABLE `books` (
  `id` varchar(99) NOT NULL,
  `title` varchar(99) DEFAULT NULL,
  `author` varchar(99) DEFAULT NULL,
  `price` double DEFAULT NULL,
  `qty` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;


-- colin.books_batch_1 definition

CREATE TABLE `books_batch_1` (
  `id` varchar(99) NOT NULL,
  `title` varchar(99) DEFAULT NULL,
  `author` varchar(99) DEFAULT NULL,
  `price` double DEFAULT NULL,
  `qty` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;


-- colin.books_batch_2 definition

CREATE TABLE `books_batch_2` (
  `id` varchar(99) NOT NULL,
  `title` varchar(99) DEFAULT NULL,
  `author` varchar(99) DEFAULT NULL,
  `price` double DEFAULT NULL,
  `qty` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
 

doris 創(chuàng)建庫即可,批量同步的時(shí)候需要指定目標(biāo)源的庫名,需要提前創(chuàng)建灰嫉。目標(biāo)表結(jié)構(gòu)會(huì)自動(dòng)創(chuàng)建

create database colin_batch;

制作同步腳本

--  無痕版本拆宛,直接修改運(yùn)行即可,flink on yarn per模式
bin/flink run -m yarn-cluster \
    -Dexecution.checkpointing.interval=10s \  
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.5.2.jar \
    mysql-sync-database \
    --database colin_batch \
    --table-prefix ods_ \
    --mysql-conf hostname=192.168.XX.XX \
    --mysql-conf port=3306 \
    --mysql-conf username=root \
    --mysql-conf password=123456 \
    --mysql-conf database-name=colin \
    --including-tables "books.*" \
    --sink-conf fenodes=192.168.XX.XX:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://192.168.XX.XX:9030 \
    --sink-conf sink.label-prefix=label \
    --table-conf replication_num=3

-- 注釋說明
bin/flink run -m yarn-cluster \    -- 指定flink程序運(yùn)行模式為yarn per模式
    -Dexecution.checkpointing.interval=10s \   -- 數(shù)據(jù)刷新時(shí)間 10s
    -Dparallelism.default=1 \ -- 平行度 默認(rèn)1
    -c org.apache.doris.flink.tools.cdc.CdcTools \ --類名讼撒,程序入口
    lib/flink-doris-connector-1.17-1.5.2.jar \  -- jar名稱
    mysql-sync-database \ -- 程序入?yún)⒒牒瘢琺ysql同步功能
    --database colin_batch \ -- doris 庫名
    --table-prefix ods_ \ -- 統(tǒng)一添加前綴 ods_
    --mysql-conf hostname=192.168.XX.XX \ -- mysql ip
    --mysql-conf port=3306 \ --mysql 端口
    --mysql-conf username=root \ -- mysql 用戶名
    --mysql-conf password=123456 \ -- mysql 密碼
    --mysql-conf database-name=colin \ -- 鏈接庫名
    --including-tables "books.*" \ --需要同步的表名,正則表達(dá)式
    --sink-conf fenodes=192.168.XX.XX:8030 \ -- doris fe服務(wù)
    --sink-conf username=root \ -- doris 用戶名
    --sink-conf password=123456 \ --doris 密碼
    --sink-conf jdbc-url=jdbc:mysql://192.168.XX.XX:9030 \ -- doris jdbc 鏈接url
    --sink-conf sink.label-prefix=label \  -- Stream load 導(dǎo)入使用的 label 前綴
    --table-conf replication_num=3  --doris 副本數(shù)

提交flink程序到y(tǒng)arn根盒,提交成功后會(huì)顯示钳幅,Job has been submitted with JobID XXX


shell提交成功截圖

查看flink ui界面
點(diǎn)擊yarn 對應(yīng)任務(wù)的ApplicationMaster

ApplicationMaster

任務(wù)運(yùn)行FLink監(jiān)控界面


flinkui

2.3 查看并測試數(shù)據(jù)同步情況
Mysql源庫源表:


image.png

Doris目標(biāo)庫目標(biāo)表:


image.png

測試 新增、修改炎滞、刪除均能同步到更新到doris敢艰。

3 錯(cuò)誤錦集

報(bào)錯(cuò) 1

Caused by: org.apache.flink.util.FlinkRuntimeException: Chunk splitting has encountered exception
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.checkSplitterErrors(MySqlSnapshotSplitAssigner.java:583)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getNext(MySqlSnapshotSplitAssigner.java:341)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:123)
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:218)
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:109)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:564)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambdahandleEventFromOperator3(SourceCoordinator.java:291)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambdarunInEventLoop10(SourceCoordinator.java:465)
... 8 more
Caused by: java.lang.IllegalStateException: Error when splitting chunks for colin.books_batch_3
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitTable(MySqlSnapshotSplitAssigner.java:307)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitChunksForRemainingTables(MySqlSnapshotSplitAssigner.java:566)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.lang.RuntimeException: Fail to analyze table in chunk splitter.
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:158)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.splitChunks(MySqlChunkSplitter.java:119)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitTable(MySqlSnapshotSplitAssigner.java:305)
... 6 more
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
at com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getChunkKeyColumn(ChunkUtils.java:67)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:152)

報(bào)錯(cuò) 1解決

因?yàn)橥皆磶毂碇杏袀€(gè)別表沒有設(shè)置主鍵,flinkcdc 使用chunk 分塊是依據(jù)主鍵切分的册赛,所以會(huì)跑錯(cuò)钠导。增設(shè)主鍵,再次啟動(dòng)程序即可森瘪。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末牡属,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子柜砾,更是在濱河造成了極大的恐慌湃望,老刑警劉巖换衬,帶你破解...
    沈念sama閱讀 222,104評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件痰驱,死亡現(xiàn)場離奇詭異,居然都是意外死亡瞳浦,警方通過查閱死者的電腦和手機(jī)担映,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,816評論 3 399
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叫潦,“玉大人蝇完,你說我怎么就攤上這事〈H铮” “怎么了短蜕?”我有些...
    開封第一講書人閱讀 168,697評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長傻咖。 經(jīng)常有香客問我朋魔,道長,這世上最難降的妖魔是什么卿操? 我笑而不...
    開封第一講書人閱讀 59,836評論 1 298
  • 正文 為了忘掉前任警检,我火速辦了婚禮孙援,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘扇雕。我一直安慰自己拓售,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 68,851評論 6 397
  • 文/花漫 我一把揭開白布镶奉。 她就那樣靜靜地躺著础淤,像睡著了一般。 火紅的嫁衣襯著肌膚如雪哨苛。 梳的紋絲不亂的頭發(fā)上值骇,一...
    開封第一講書人閱讀 52,441評論 1 310
  • 那天,我揣著相機(jī)與錄音移国,去河邊找鬼吱瘩。 笑死,一個(gè)胖子當(dāng)著我的面吹牛迹缀,可吹牛的內(nèi)容都是我干的使碾。 我是一名探鬼主播,決...
    沈念sama閱讀 40,992評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼祝懂,長吁一口氣:“原來是場噩夢啊……” “哼票摇!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起砚蓬,我...
    開封第一講書人閱讀 39,899評論 0 276
  • 序言:老撾萬榮一對情侶失蹤矢门,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后灰蛙,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體祟剔,經(jīng)...
    沈念sama閱讀 46,457評論 1 318
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 38,529評論 3 341
  • 正文 我和宋清朗相戀三年摩梧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了物延。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 40,664評論 1 352
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡仅父,死狀恐怖叛薯,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情笙纤,我是刑警寧澤耗溜,帶...
    沈念sama閱讀 36,346評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站省容,受9級(jí)特大地震影響抖拴,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜蓉冈,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 42,025評論 3 334
  • 文/蒙蒙 一城舞、第九天 我趴在偏房一處隱蔽的房頂上張望轩触。 院中可真熱鬧,春花似錦家夺、人聲如沸脱柱。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,511評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽榨为。三九已至,卻和暖如春煌茴,著一層夾襖步出監(jiān)牢的瞬間随闺,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,611評論 1 272
  • 我被黑心中介騙來泰國打工蔓腐, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留矩乐,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 49,081評論 3 377
  • 正文 我出身青樓回论,卻偏偏與公主長得像散罕,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子傀蓉,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 45,675評論 2 359

推薦閱讀更多精彩內(nèi)容