Apache Flink 社區(qū)很高興地宣布發(fā)布 Flink CDC 3.1.0烙懦!這是社區(qū)在接受 Flink CDC 作為 Apache Flink 的子項(xiàng)目后的首個(gè)版本,帶來(lái)了令人興奮的新功能堕绩,如 transformation 的支持和分庫(kù)分表合并凤巨。Flink CDC 的生態(tài)系統(tǒng)也在不斷擴(kuò)展巡通,包括新增的 Pipeline 連接器 Kafka 和 Paimon 以及對(duì)已有的 Source 連接器增強(qiáng)。
歡迎瀏覽 Flink CDC 文檔和嘗試快速入門(mén)教程來(lái)探索 Flink CDC 的世界!同時(shí)歡迎下載該版本并通過(guò) Flink 的郵件列表或JIRA在 Flink 社區(qū)進(jìn)行討論和分享,期待大家的反饋悉罕!
Flink CDC 3.1 快速預(yù)覽
作為 Flink CDC 成為 Apache Flink 子項(xiàng)目之后的首個(gè)版本,3.1 帶來(lái)了許多新功能與穩(wěn)定性提升立镶。主要亮點(diǎn)包括:
- Transformation 支持:通過(guò) YAML 管道定義中的 transform 部分壁袄,用戶可以對(duì)數(shù)據(jù)變化事件進(jìn)行投影、計(jì)算和添加常量列等轉(zhuǎn)化谜慌,使用類似 SQL 的語(yǔ)法然想,提升數(shù)據(jù)集成管道的靈活性。
- 分庫(kù)分表合并支持:可以通過(guò)在 YAML 管道定義中配置路由將多個(gè)表合并到一個(gè)目標(biāo)表欣范,自動(dòng)處理業(yè)務(wù)數(shù)據(jù)在不同表或數(shù)據(jù)庫(kù)的分區(qū)及源表的 schema 變化。
- 新連接器:引入了新的 Apache Kafka 和 Apache Paimon 的 Pipeline Sink令哟,增強(qiáng)了生態(tài)系統(tǒng)的擴(kuò)展性恼琼,其中Kafka Sink 使得用戶可以發(fā)送原始Debeizum/ Canal Json 格式的CDC數(shù)據(jù)到消息隊(duì)列,Paimon Sink 則是讓用戶可以簡(jiǎn)單高效地完成MySQL實(shí)時(shí)入湖屏富。
- 連接器改進(jìn):如 MySQL 增加了 tables.exclude 選項(xiàng)和 MysqlDebeziumTimeConverter晴竞,OceanBase 支持 DebeziumDeserializationSchema,Db2 遷移到統(tǒng)一增量快照框架等狠半。
Flink CDC 3.1 核心特性解讀
Transformation 支持
Flink CDC 3.1.0 引入了在 CDC pipeline 中進(jìn)行數(shù)據(jù)變換(transformation)的功能噩死。通過(guò)在 YAML pipeline 定義中加入 transform
部分,用戶現(xiàn)在可以輕松地對(duì)來(lái)源的數(shù)據(jù)變更事件應(yīng)用各種變換神年,包括投影已维、計(jì)算和添加常量列,從而提高數(shù)據(jù)集成管道的效率已日。新特性利用類似 SQL 的語(yǔ)法定義這些轉(zhuǎn)換垛耳,確保用戶可以快速適應(yīng)并使用它。例如飘千,只需編寫(xiě)如下 YAML 語(yǔ)句塊:
transform:
- source-table: db.tbl1
projection: id, age, weight, height, weight / (height * height) as bmi
filter: age > 18 AND name IS NOT NULL
即可對(duì)傳遞的數(shù)據(jù)流應(yīng)用投影操作(僅保留原表中的部分列)堂鲜、計(jì)算操作(根據(jù)原列數(shù)據(jù)計(jì)算新列并追加到數(shù)據(jù)記錄中)和過(guò)濾操作(從結(jié)果中清除符合條件的數(shù)據(jù)行)』つ危可以書(shū)寫(xiě)多條 Transform 規(guī)則缔莲,它們會(huì)同時(shí)生效。[1]
分庫(kù)分表合并支持
Flink CDC 3.1.0 現(xiàn)在通過(guò)在 YAML pipeline 定義中配置 route
霉旗,在分庫(kù)分表場(chǎng)景下將多表合并為一個(gè)痴奏。由于業(yè)務(wù)數(shù)據(jù)量龐大磺箕,業(yè)務(wù)數(shù)據(jù)經(jīng)常會(huì)被分別存放在多個(gè)表甚至數(shù)據(jù)庫(kù)中。通過(guò)配置route
抛虫,用戶可以將多張?jiān)幢碛成渲镣粋€(gè)目標(biāo)表松靡,在同步時(shí),數(shù)據(jù)變更事件(DataChangeEvent)和 Schema 變更事件都將被合并到指定的目標(biāo)表中建椰。例如雕欺,只需編寫(xiě)如下 YAML 語(yǔ)句塊:
route:
- source-table: db.tbl\.*
sink-table: db.unified
- source-table: db.tbl_log\.*
sink-table: db.log
即可將源庫(kù)中所有匹配 tbl.*
和 tbl_log.*
正則表達(dá)式的分片表合并,并分別同步到下游的 db.unified
和 db.log
匯表中棉姐。(.
用于分隔數(shù)據(jù)庫(kù)名稱和表名稱屠列,因此作為正則表達(dá)式關(guān)鍵字時(shí)需要使用 \
進(jìn)行轉(zhuǎn)義。)可以書(shū)寫(xiě)多條 Route 規(guī)則伞矩,它們會(huì)同時(shí)生效笛洛。[2]
Flink CDC 3.1 新功能最佳實(shí)踐
使用 Kafka Pipeline Sink 高效寫(xiě)入 Canal/Debezium 格式數(shù)據(jù)
Flink CDC 3.1.0 引入了新的 Kafka Pipeline Sink(基于 Kafka 3.2.3 版本)。現(xiàn)在乃坤,您可以編寫(xiě)如下所示的 YAML 語(yǔ)句塊來(lái)定義一個(gè)從 MySQL 捕獲變化數(shù)據(jù)并寫(xiě)入下游 Kafka Sink 的 Pipeline 作業(yè)[3]:
source:
type: mysql
# ...
sink:
type: kafka
properties.bootstrap.servers: PLAINTEXT://localhost:62510
value.format: canal-json
該作業(yè)將來(lái)自 MySQL 上游的變化數(shù)據(jù)編碼為 Canal JSON 格式苛让,并寫(xiě)入到指定的 Kafka 服務(wù)器中;相比于 Flink SQL Changelog 格式湿诊,F(xiàn)link CDC 不會(huì)將數(shù)據(jù)更新事件拆分為 BEFORE 和 AFTER 兩條記錄狱杰,能夠更高效地處理分區(qū)表場(chǎng)景,并支持將事件序列化為 Debezium 和 Canal JSON 格式厅须。
Flink 支持將上述格式解析為標(biāo)準(zhǔn)變更消息處理[4]仿畸,因此您可以簡(jiǎn)單地使用以下 Flink SQL 將其攝入流式處理框架,整個(gè)過(guò)程無(wú)需額外部署 Canal 或 Debezium 集群朗和,直接復(fù)用已有 Flink 集群即可:
CREATE TABLE topic_products (
-- 上游的 Schema 結(jié)構(gòu)
) WITH (
'connector' = 'kafka',
-- ...
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'canal-json' -- 從 Kafka 攝取 Canal JSON 格式數(shù)據(jù)
)
完整的數(shù)據(jù)流示意圖如下所示:
更高效地實(shí)時(shí)入湖 Paimon
Flink CDC 3.1.0 引入了新的 Apache Paimon Pipeline Sink(基于 Paimon 0.7.0 版本)〈砉粒現(xiàn)在,您可以編寫(xiě)如下所示的 YAML 語(yǔ)句塊來(lái)定義一個(gè)從 MySQL 捕獲變化數(shù)據(jù)并寫(xiě)入下游 Paimon Sink 的 Pipeline 作業(yè)[5]:
source:
type: mysql
# ...
sink:
type: paimon
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
可選擇的下游元數(shù)據(jù)存儲(chǔ)支持 FileSystem 和 Hive眶拉。在啟用 Schema Evolution 選項(xiàng)時(shí)千埃,F(xiàn)link CDC 會(huì)同時(shí)捕獲數(shù)據(jù)變更和表結(jié)構(gòu)變更、在應(yīng)用 Transform 和 Route 規(guī)則后將數(shù)據(jù)發(fā)送到下游镀层,并將結(jié)構(gòu)變更應(yīng)用到 Catalog 中镰禾。完整的數(shù)據(jù)流示意圖如下所示:
相比于使用 Flink SQL 和 Paimon CDC Action 的同步方案,F(xiàn)link CDC Pipeline 作業(yè)支持將上游表結(jié)構(gòu)變更動(dòng)態(tài)應(yīng)用至下游唱逢,且進(jìn)一步支持了對(duì)上游表進(jìn)行列投影和行過(guò)濾吴侦,提供細(xì)粒度的數(shù)據(jù)路由規(guī)則,追加計(jì)算列的邏輯配置更加簡(jiǎn)單坞古。例如备韧,以下 Paimon Action 變換語(yǔ)句[6]:
flink run paimon-action.jar
--metadata_column "table_name"
--computed_column "name=UPPER(name)"
--computed_column "nameage=CONCAT(name, age)"
可以使用 Flink CDC YAML 等效地表述為:
projection: \*, __table_name__, UPPER(name) as newage, CONCAT(name, age) as nameage
其他改進(jìn)
MySQL Pipeline 連接器
在此版本中,MySQL pipeline source 引入了一個(gè)新的選項(xiàng) tables.exclude
痪枫,用戶可以更簡(jiǎn)單地使用正則表達(dá)式排除不必要的表织堂。
MySQL Source 連接器
MySQL CDC source 同時(shí)新增了一個(gè)自定義轉(zhuǎn)換器 MysqlDebeziumTimeConverter
叠艳,用于將時(shí)間類型列轉(zhuǎn)換為更易于讀取和序列化的字符串。
OceanBase Source 連接器
OceanBase CDC source 現(xiàn)在支持指定通用的 DebeziumDeserializationSchema
易阳,以重用現(xiàn)有的 Debezium 反序列化器附较。
Db2 Source 連接器
Db2 CDC source 已經(jīng)遷移至統(tǒng)一的增量快照框架。
SinkFunction 支持
盡管 SinkFunction
在 Flink 中已被標(biāo)記為棄用潦俺,但考慮到一些 Flink connector 仍在使用該 API拒课,我們也為 CDC pipeline sink 支持 SinkFunction
API 以幫助擴(kuò)展 Flink CDC 的生態(tài)系統(tǒng)。
CLI 支持從 savepoint 恢復(fù)
Flink CDC pipeline 提交 CLI 現(xiàn)在支持通過(guò)命令行參數(shù) --from-savepoint
從特定的 savepoint 文件恢復(fù) Flink 作業(yè)事示。
Flink CDC 3.1 版本兼容性
捐贈(zèng)給 Apache 基金會(huì)使得 Flink CDC 項(xiàng)目更中立的同時(shí)也帶來(lái)了短期的不便早像,因?yàn)?Apache 基金會(huì)對(duì)所屬項(xiàng)目的包名、license 有著嚴(yán)格的要求肖爵,因此在 Flink CDC 3.1 版本之前版本存在不兼容的情況卢鹦,我們?cè)诖嗽敿?xì)說(shuō)明。當(dāng)然劝堪,后續(xù)的 3.2冀自、3.3 版本會(huì)與 3.1 版本保持兼容。
Group ID 和 Package 路徑變更
如果您正通過(guò) Maven 或 Gradle 聲明 Flink CDC 依賴幅聘,則需要在升級(jí)到 3.1 版本的同時(shí)將 Group ID 從 com.ververica.cdc
改為 org.apache.flink.cdc
凡纳,同時(shí)更改源代碼中 import Package 路徑。
用于 Flink SQL 作業(yè)的 Flink Source 連接器的重要更改
由于許可證與 Apache 2.0 License 不兼容帝蒿,我們無(wú)法將以下連接器的 JDBC driver 包含在我們的二進(jìn)制發(fā)布包中:
- Db2
- MySQL
- Oracle
- OceanBase
請(qǐng)手動(dòng)將相應(yīng)的 JDBC 驅(qū)動(dòng)程序下載到 Flink 集群的 $FLINK_HOME/lib
目錄中,或在使用 --jar
提交 YAML pipeline 時(shí)指定驅(qū)動(dòng)程序的路徑巷怜。如果您在使用 Flink SQL葛超,請(qǐng)確保它們?cè)?classpath 下。
作業(yè) State 兼容性
由于以上不兼容的變更延塑,使用 Flink CDC 3.1 以前版本保存的作業(yè) State 無(wú)法在較新版本上恢復(fù)绣张。因此,您需要在升級(jí) Flink CDC 版本后進(jìn)行一次無(wú)狀態(tài)重啟关带。
致謝
衷心感謝以下開(kāi)發(fā)者為 Flink CDC 3.1.0 版本作出的貢獻(xiàn)侥涵!
Check Null, FocusComputing, GOODBOY008, Hang Ruan, He Wang, Hongshun Wang, Jiabao Sun, Kunni, L, Laffery, Leonard Xu, Muhammet Orazov, Paul Lin, PengFei Li, Qingsheng Ren, Qishang Zhong, Shawn Huang, Thorne, TorinJie, Xianxun Ye, Xin Gong, Yaroslav Tkachenko, e-mhui, gongzhongqiang, joyCurry30, kunni, lzshlzsh, qwding, shikai93, sky, skylines, wenmo, wudi, xleoken, xuzifu666, yanghuaiGit, yuxiqian, 張?zhí)?/p>
參考鏈接:
[1] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/core-concept/transform/
[2] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/core-concept/route/
[3] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/kafka/
[4] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/canal/
[5] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/paimon/
[6] https://paimon.apache.org/docs/master/flink/cdc-ingestion/mysql-cdc/