寫在前面的話
本示例是借助Flink Doris Connector插件實(shí)現(xiàn)通過 Flink 同步mysql數(shù)據(jù)(讀取戚嗅、插入严沥、修改连霉、刪除)到Doris 中存儲(chǔ)。
最佳實(shí)踐:使用 Flink Doris Connector 最適合的場景就是實(shí)時(shí)/批次同步源數(shù)據(jù)(Mysql,Oracle残腌,PostgreSQL 等)到 Doris叠艳,使用 Flink 對 Doris 中的數(shù)據(jù)和其他數(shù)據(jù)源進(jìn)行聯(lián)合分析,這里介紹Mysql到doris的數(shù)據(jù)接入方式怯屉。
Doirs詳細(xì)參考鏈接
FlinkCDC詳細(xì)參考鏈接
1 軟件版本及環(huán)境準(zhǔn)備
1.1 軟件版本
Doris Version : doris-2.0.9
Flink flink-1.17.2
FlinkCDC 3.0.1
版本兼容
Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
---|---|---|---|---|
1.0.3 | 1.11,1.12,1.13,1.14 | 0.15+ | 8 | 2.11,2.12 |
1.1.1 | 1.14 | 1.0+ | 8 | 2.11,2.12 |
1.2.1 | 1.15 | 1.0+ | 8 | - |
1.3.0 | 1.16 | 1.0+ | 8 | - |
1.4.0 | 1.15,1.16,1.17 | 1.0+ | 8 | - |
1.5.2 | 1.15,1.16,1.17,1.18 | 1.0+ | 8 | - |
1.6.1 | 1.15,1.16,1.17,1.18,1.19 | 1.0+ | 8 | - |
1.2 環(huán)境準(zhǔn)備
準(zhǔn)備依賴文件到FLINK lib/文件下
flink-doris-connector-1.17-1.5.2.jar
mysql-connector-java-8.0.27.jar
flink-sql-connector-mysql-cdc-3.0.1.jar
jar 包依賴
2 編輯腳本
2.1 腳本模版及說明
腳本模版
<FLINK_HOME>bin/flink run \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
--database <doris-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <doris-table-prefix>] \
[--table-suffix <doris-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
--sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
參數(shù)說明:
Key | Comment |
---|---|
--job-name | Flink 任務(wù)名稱蔚舀,非必需 |
--database | 同步到 Doris 的數(shù)據(jù)庫名 |
--table-prefix | Doris 表前綴名,例如 --table-prefix ods_锨络。 |
--table-suffix | 同上赌躺,Doris 表的后綴名。 |
--including-tables | 需要同步的 MySQL 表羡儿,可以使用"|" 分隔多個(gè)表礼患,并支持正則表達(dá)式。比如--including-tables table1 例如:以 ts開頭的寫法 ts.* |
--excluding-tables | 不需要同步的表掠归,用法同上缅叠。 |
--mysql-conf | MySQL CDCSource 配置,例如--mysql-conf hostname=127.0.0.1虏冻,您可以在這里查看所有配置 MySQL-CDC肤粱,其中 hostname/username/password/database-name 是必需的。同步的庫表中含有非主鍵表時(shí)厨相,必須設(shè)置 scan.incremental.snapshot.chunk.key-column领曼,且只能選擇非空類型的一個(gè)字段鸥鹉。例如:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...,不同的庫表列之間用,隔開悯森。 |
--oracle-conf | Oracle CDCSource 配置宋舷,例如--oracle-conf hostname=127.0.0.1,您可以在這里查看所有配置 Oracle-CDC瓢姻,其中 hostname/username/password/database-name/schema-name 是必需的祝蝠。 |
--postgres-conf | Postgres CDCSource 配置,例如--postgres-conf hostname=127.0.0.1幻碱,您可以在這里查看所有配置 Postgres-CDC绎狭,其中 hostname/username/password/database-name/schema-name/slot.name 是必需的。 |
--sqlserver-conf | SQLServer CDCSource 配置褥傍,例如--sqlserver-conf hostname=127.0.0.1儡嘶,您可以在這里查看所有配置 SQLServer-CDC,其中 hostname/username/password/database-name/schema-name 是必需的恍风。 |
--sink-conf | Doris Sink 的所有配置蹦狂,可以在這里查看完整的配置項(xiàng)。 |
--table-conf | Doris 表的配置項(xiàng)朋贬,即 properties 中包含的內(nèi)容(其中 table-buckets 例外凯楔,非 properties 屬性)。例如 --table-conf replication_num=1锦募,而 --table-conf table-buckets="tbl1:10,tbl2:20,a.:30,b.:40,.*:50"表示按照正則表達(dá)式順序指定不同表的 buckets 數(shù)量摆屯,如果沒有匹配到則采用 BUCKETS AUTO 建表。 |
--ignore-default-value | 關(guān)閉同步 mysql 表結(jié)構(gòu)的默認(rèn)值糠亩。適用于同步 mysql 數(shù)據(jù)到 doris 時(shí)虐骑,字段有默認(rèn)值,但實(shí)際插入數(shù)據(jù)為 null 情況赎线。參考#152 |
--use-new-schema-change | 是否使用新的 schema change廷没,支持同步 mysql 多列變更、默認(rèn)值垂寥。1.6.0 默認(rèn)為true颠黎。參考#167 |
--single-sink | 是否使用單個(gè) Sink 同步所有表,開啟后也可自動(dòng)識(shí)別上游新創(chuàng)建的表矫废,自動(dòng)創(chuàng)建表盏缤。 |
--multi-to-one-origin | 將上游多張表寫入同一張表時(shí),源表的配置蓖扑,比如:--multi-to-one-origin="a_.|b_."唉铜,具體參考#208 |
--multi-to-one-target | 與 multi-to-one-origin 搭配使用,目標(biāo)表的配置律杠,比如:--multi-to-one-target="a|b" |
--create-table-only | 是否只僅僅創(chuàng)建表的結(jié)構(gòu) |
2.2 運(yùn)行實(shí)例
mysql創(chuàng)建colin庫潭流,建如下表單竞惋,手動(dòng)寫入測試數(shù)據(jù)N條
-- colin.books definition
CREATE TABLE `books` (
`id` varchar(99) NOT NULL,
`title` varchar(99) DEFAULT NULL,
`author` varchar(99) DEFAULT NULL,
`price` double DEFAULT NULL,
`qty` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
-- colin.books_batch_1 definition
CREATE TABLE `books_batch_1` (
`id` varchar(99) NOT NULL,
`title` varchar(99) DEFAULT NULL,
`author` varchar(99) DEFAULT NULL,
`price` double DEFAULT NULL,
`qty` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
-- colin.books_batch_2 definition
CREATE TABLE `books_batch_2` (
`id` varchar(99) NOT NULL,
`title` varchar(99) DEFAULT NULL,
`author` varchar(99) DEFAULT NULL,
`price` double DEFAULT NULL,
`qty` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
doris 創(chuàng)建庫即可,批量同步的時(shí)候需要指定目標(biāo)源的庫名,需要提前創(chuàng)建灰嫉。目標(biāo)表結(jié)構(gòu)會(huì)自動(dòng)創(chuàng)建
create database colin_batch;
制作同步腳本
-- 無痕版本拆宛,直接修改運(yùn)行即可,flink on yarn per模式
bin/flink run -m yarn-cluster \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.17-1.5.2.jar \
mysql-sync-database \
--database colin_batch \
--table-prefix ods_ \
--mysql-conf hostname=192.168.XX.XX \
--mysql-conf port=3306 \
--mysql-conf username=root \
--mysql-conf password=123456 \
--mysql-conf database-name=colin \
--including-tables "books.*" \
--sink-conf fenodes=192.168.XX.XX:8030 \
--sink-conf username=root \
--sink-conf password=123456 \
--sink-conf jdbc-url=jdbc:mysql://192.168.XX.XX:9030 \
--sink-conf sink.label-prefix=label \
--table-conf replication_num=3
-- 注釋說明
bin/flink run -m yarn-cluster \ -- 指定flink程序運(yùn)行模式為yarn per模式
-Dexecution.checkpointing.interval=10s \ -- 數(shù)據(jù)刷新時(shí)間 10s
-Dparallelism.default=1 \ -- 平行度 默認(rèn)1
-c org.apache.doris.flink.tools.cdc.CdcTools \ --類名讼撒,程序入口
lib/flink-doris-connector-1.17-1.5.2.jar \ -- jar名稱
mysql-sync-database \ -- 程序入?yún)⒒牒瘢琺ysql同步功能
--database colin_batch \ -- doris 庫名
--table-prefix ods_ \ -- 統(tǒng)一添加前綴 ods_
--mysql-conf hostname=192.168.XX.XX \ -- mysql ip
--mysql-conf port=3306 \ --mysql 端口
--mysql-conf username=root \ -- mysql 用戶名
--mysql-conf password=123456 \ -- mysql 密碼
--mysql-conf database-name=colin \ -- 鏈接庫名
--including-tables "books.*" \ --需要同步的表名,正則表達(dá)式
--sink-conf fenodes=192.168.XX.XX:8030 \ -- doris fe服務(wù)
--sink-conf username=root \ -- doris 用戶名
--sink-conf password=123456 \ --doris 密碼
--sink-conf jdbc-url=jdbc:mysql://192.168.XX.XX:9030 \ -- doris jdbc 鏈接url
--sink-conf sink.label-prefix=label \ -- Stream load 導(dǎo)入使用的 label 前綴
--table-conf replication_num=3 --doris 副本數(shù)
提交flink程序到y(tǒng)arn根盒,提交成功后會(huì)顯示钳幅,Job has been submitted with JobID XXX
查看flink ui界面
點(diǎn)擊yarn 對應(yīng)任務(wù)的ApplicationMaster
任務(wù)運(yùn)行FLink監(jiān)控界面
2.3 查看并測試數(shù)據(jù)同步情況
Mysql源庫源表:
Doris目標(biāo)庫目標(biāo)表:
測試 新增、修改炎滞、刪除均能同步到更新到doris敢艰。
3 錯(cuò)誤錦集
報(bào)錯(cuò) 1
Caused by: org.apache.flink.util.FlinkRuntimeException: Chunk splitting has encountered exception
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.checkSplitterErrors(MySqlSnapshotSplitAssigner.java:583)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.getNext(MySqlSnapshotSplitAssigner.java:341)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner.getNext(MySqlHybridSplitAssigner.java:123)
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.assignSplits(MySqlSourceEnumerator.java:218)
at com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator.handleSplitRequest(MySqlSourceEnumerator.java:109)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.handleRequestSplitEvent(SourceCoordinator.java:564)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda3(SourceCoordinator.java:291)
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda10(SourceCoordinator.java:465)
... 8 more
Caused by: java.lang.IllegalStateException: Error when splitting chunks for colin.books_batch_3
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitTable(MySqlSnapshotSplitAssigner.java:307)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitChunksForRemainingTables(MySqlSnapshotSplitAssigner.java:566)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: java.lang.RuntimeException: Fail to analyze table in chunk splitter.
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:158)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.splitChunks(MySqlChunkSplitter.java:119)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.splitTable(MySqlSnapshotSplitAssigner.java:305)
... 6 more
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
at com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getChunkKeyColumn(ChunkUtils.java:67)
at com.ververica.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter.analyzeTable(MySqlChunkSplitter.java:152)
報(bào)錯(cuò) 1解決
因?yàn)橥皆磶毂碇杏袀€(gè)別表沒有設(shè)置主鍵,flinkcdc 使用chunk 分塊是依據(jù)主鍵切分的册赛,所以會(huì)跑錯(cuò)钠导。增設(shè)主鍵,再次啟動(dòng)程序即可森瘪。