摘要:本文投稿自阿里云開源大數(shù)據(jù)平臺(tái)數(shù)據(jù)通道團(tuán)隊(duì)奴艾,主要介紹了 Flink CDC YAML 在實(shí)時(shí)計(jì)算Flink版的最佳實(shí)踐务漩。內(nèi)容分為以下五個(gè)部分:
- CDC YAML 簡(jiǎn)介
- CDC YAML 核心能力
- CDC YAML 應(yīng)用場(chǎng)景
- 阿里云 Flink CDC 企業(yè)級(jí)功能
- 十分鐘在阿里云免費(fèi)實(shí)現(xiàn)一個(gè) CDC YAML 作業(yè)
CDC YAML 簡(jiǎn)介
CDC YAML 是 Flink CDC [1] 提供的簡(jiǎn)單易用的數(shù)據(jù)集成 API况木,用于幫助用戶快速構(gòu)建功能強(qiáng)大的數(shù)據(jù)同步鏈路咐蚯,實(shí)時(shí)地同步業(yè)務(wù)數(shù)據(jù)庫中的數(shù)據(jù)變更和表結(jié)構(gòu)變更實(shí)時(shí)同步到數(shù)據(jù)倉庫,數(shù)據(jù)湖以及其他下游系統(tǒng)苛让。CDC YAML 上手門檻較低沟蔑,即使沒有研發(fā)背景和Flink基礎(chǔ)湿诊,用戶也可以較快地完成數(shù)據(jù)的同步和ETL加工,快速完成數(shù)據(jù)的實(shí)時(shí)入湖入倉瘦材,加速數(shù)據(jù)分析效率厅须。
阿里云實(shí)時(shí)計(jì)算 Flink 版基于 Flink CDC 提供了數(shù)據(jù)攝入 CDC YAML 開發(fā)[2],通過開發(fā) YAML 作業(yè)的方式有效地實(shí)現(xiàn)將數(shù)據(jù)從源端同步到目標(biāo)端的數(shù)據(jù)攝入工作食棕,幫助用戶在云上高效完成數(shù)據(jù)入湖入倉朗和。
CDC YAML 核心能力
CDC YAML 主要用于同步數(shù)據(jù)庫變更到其他系統(tǒng),同步前支持對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理和清洗宣蠕,幫助用戶完成秒級(jí)的數(shù)據(jù)同步工作例隆,構(gòu)建入湖入倉作業(yè)甥捺,主要的核心能力有以下幾個(gè)方面:
端到端 Data Pipeline:支持秒級(jí)同步數(shù)據(jù)變更和結(jié)構(gòu)變更到其他系統(tǒng)抢蚀,用戶可以快速構(gòu)建自己的數(shù)據(jù)湖和數(shù)據(jù)倉庫,為數(shù)據(jù)分析提供基礎(chǔ)镰禾。
細(xì)粒度 Schema Evolution:出于數(shù)據(jù)安全的考慮皿曲,部分高危操作(如刪除表,清空數(shù)據(jù)等)用戶不希望同步到目標(biāo)端吴侦。CDC YAML 提供細(xì)粒度 Schema 變更的能力屋休,幫助用戶限制可同步的數(shù)據(jù)變更類型。
全增量一體化 CDC 讀缺溉汀:CDC YAML提供了全量和增量的一體化讀取能力劫樟,全量數(shù)據(jù)讀取完成后自動(dòng)切換增量讀取,無需用戶操作织堂。
豐富的 Transform 支持 :支持對(duì)數(shù)據(jù)字段進(jìn)行處理后同步到下游叠艳,如添加額外計(jì)算列,添加元數(shù)據(jù)易阳、只同步某些列附较、重新指定主鍵或分區(qū)鍵等場(chǎng)景。CDC YAML 內(nèi)置了豐富的函數(shù)潦俺,用戶也可以自行開發(fā)UDF拒课,兼容 Flink 的 UDF。支持對(duì)數(shù)據(jù)源數(shù)據(jù)進(jìn)行過濾事示,跳過不需要的數(shù)據(jù)早像,完成對(duì)無用數(shù)據(jù)和臟數(shù)據(jù)的清洗工作。
靈活的 Route 策略控制:支持自定義數(shù)據(jù)源的表同步到目標(biāo)端表的映射關(guān)系肖爵,支持一對(duì)一扎酷、一對(duì)多和多對(duì)一的多種映射關(guān)系,幫助用戶靈活指定目標(biāo)端表名遏匆,支持分庫分表合并的場(chǎng)景法挨。
完善的 作業(yè) Metric 支持:為了便于判斷作業(yè)運(yùn)行的階段和狀態(tài)谁榜,CDC YAML 提供了豐富的指標(biāo)。如全量階段未處理/已處理的表數(shù)量凡纳,全量階段未處理/已處理的分片數(shù)量窃植、最新一條數(shù)據(jù)的時(shí)間戳等。
阿里云實(shí)時(shí)計(jì)算 Flink 結(jié)合用戶需求荐糜,YAML 作業(yè)支持了更多的上下游巷怜,支持同步到常見的數(shù)據(jù)湖和數(shù)據(jù)倉庫,已經(jīng)支持的上下游如下暴氏。
連接器 | 支持類型 | |
---|---|---|
Source | Sink | |
MySQL | √ | × |
消息隊(duì)列Kafka | √ | √ |
實(shí)時(shí)數(shù)倉Hologres | × | √ |
Upsert Kafka | × | √ |
× | √ | |
StarRocks | × | √ |
流式數(shù)據(jù)湖倉Paimon | × | √ |
YAML 作業(yè) 與 SQL / DataStream 作業(yè)對(duì)比
Flink 提供了兩種級(jí)別的作業(yè)開發(fā)方式:SQL 和 DataStream延塑,下面會(huì)比較一下相比于 SQL 和 DataStream來說 CDC YAML 開發(fā)有什么優(yōu)勢(shì)。
CDC YAML 作業(yè)相比 SQL 主要有以下一些優(yōu)勢(shì):
數(shù)據(jù)攝入YAML | SQL |
---|---|
自動(dòng)識(shí)別 Schema答渔,支持整庫同步 | 需要人工寫 Create Table 和 Insert 語句 |
支持細(xì)粒度 Schema 變更 | 不支持 Schema 變更 |
支持原始 Changelog 同步 | 破壞原始 Changelog 結(jié)構(gòu) |
支持讀寫多個(gè)表 | 讀寫單個(gè)表 |
CDC YAML 作業(yè)相比 DataStream 作業(yè)的優(yōu)勢(shì)如下:
數(shù)據(jù)攝入YAML | DataStream |
---|---|
為各級(jí)別用戶設(shè)計(jì)关带,不只是專家 | 需要熟悉Java和分布式系統(tǒng) |
隱藏底層細(xì)節(jié),便于開發(fā) | 需要熟悉Flink框架 |
YAML格式容易理解和學(xué)習(xí) | 需要了解Maven等工具管理相關(guān)依賴 |
已有作業(yè)方便復(fù)用 | 難以復(fù)用已有代碼 |
CDC YAML 應(yīng)用場(chǎng)景
CDC YAML 能夠支持用戶數(shù)據(jù)同步多種應(yīng)用場(chǎng)景沼撕,下面簡(jiǎn)單介紹一些常見的使用場(chǎng)景宋雏,以及應(yīng)該如何使用 CDC YAML 解決這些問題。
整庫同步务豺,構(gòu)建數(shù)據(jù)湖倉
整庫同步是數(shù)據(jù)同步最常見的一種使用場(chǎng)景磨总,將存在數(shù)據(jù)庫的數(shù)據(jù)同步到數(shù)據(jù)湖或數(shù)據(jù)倉庫中,為后續(xù)的數(shù)據(jù)分析提供基礎(chǔ)笼沥。
如下的數(shù)據(jù)攝入YAML作業(yè)可以完成同步整個(gè)app_db數(shù)據(jù)庫到Paimon的工作蚪燕,快速完成數(shù)據(jù)入湖的同步工作。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n124" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-password}
tables: app_db..*
server-id: 18601-18604
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: {secret_values.test_sk}</pre>
在整庫同步中奔浅,有時(shí)需要重新定義同步到目標(biāo)端的表名馆纳,防止發(fā)生沖突〕送梗或者需要做一些簡(jiǎn)單的數(shù)據(jù)處理和數(shù)據(jù)過濾工作厕诡,這時(shí)需要結(jié)合 transform 模塊和 route 模塊完成,transform 模塊負(fù)責(zé)數(shù)據(jù)處理工作营勤,route 模塊負(fù)責(zé)數(shù)據(jù)分發(fā)工作灵嫌。
例如在如下的整庫同步作業(yè)中,為 app_db.customers 表添加額外的計(jì)算列 upper 和數(shù)據(jù)庫名這個(gè)元數(shù)據(jù)列db葛作,同時(shí)在目標(biāo)端為三張表名添加版本后綴寿羞。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n127" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-password}
tables: app_db..*
server-id: 18601-18604
transform:
- source-table: app_db.customers
projection: *, UPPER(name
) AS upper, schema_name AS db
route:
- source-table: app_db.customers
sink-table: app_db.customers_v1 - source-table: app_db.products
sink-table: app_db.products_v0 - source-table: app_db.orders
sink-table: app_db.orders_v0
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: {secret_values.test_sk}</pre>
CDC YAML提供了豐富的內(nèi)置函數(shù),還支持用戶自定義函數(shù)來完成復(fù)雜的數(shù)據(jù)處理赂蠢。CDC YAML 對(duì)Flink SQL 的自定義函數(shù)進(jìn)行了兼容绪穆,大部分的 Flink 自定義函數(shù)可以在數(shù)據(jù)攝入YAML中直接使用。
分庫分表合并
在高并發(fā)和大數(shù)據(jù)量場(chǎng)景下,用戶可能選擇將一個(gè)表拆分為多個(gè)庫的多張表存儲(chǔ)數(shù)據(jù)玖院,對(duì)于分庫分表在分析數(shù)據(jù)前菠红,希望將數(shù)據(jù)合并為數(shù)據(jù)湖倉中的一張表。
假設(shè)app_db數(shù)據(jù)庫中只有customers_v0难菌,customers_v1和customers_v2三張表试溯,如下 CDC YAML 作業(yè)可以將這三張分表合并為一張表customers,完成分庫分表的數(shù)據(jù)同步郊酒。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n132" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-password}
tables: app_db.customers.*
server-id: 18601-18604
route:
- source-table: app_db.customers.*
sink-table: app_db.customers
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: {secret_values.test_sk}</pre>
原始 Binlog 數(shù)據(jù)同步 Kafka遇绞,對(duì)接已有系統(tǒng)
用戶除了整庫整表同步,有些場(chǎng)景下需要獲取到詳細(xì)的 Changelog 變更歷史燎窘,而不是變更后的數(shù)據(jù)摹闽。將 Binlog 同步到 Kafka,結(jié)合分布式消息隊(duì)列 Kafka 可以提高數(shù)據(jù)消費(fèi)速度褐健,解決消費(fèi)同一個(gè) Binlog 導(dǎo)致的數(shù)據(jù)瓶頸付鹿。后續(xù)能夠使用 Kafka 里的 Binlog 歷史, 進(jìn)行數(shù)據(jù)回放和數(shù)據(jù)審計(jì)工作铝量,或者消費(fèi) Kafka 實(shí)時(shí)監(jiān)控?cái)?shù)據(jù)變更歷史倘屹,從而觸發(fā)通知和報(bào)警银亲。
由于 Flink SQL 作業(yè)里傳遞的數(shù)據(jù)結(jié)構(gòu)是 RowData慢叨,會(huì)將一條 Update 操作拆分為 Update before 和 Update after 兩條消息發(fā)送,破壞了 Changelog 原有的結(jié)構(gòu)务蝠,無法完成原始 Binlog 數(shù)據(jù)同步拍谐。CDC YAML 使用了 SchemaChangeEvent 和 DataChangeEvent 傳遞數(shù)據(jù),可以完整保留 Changelog馏段,幫助用戶完成同步原始 Binlog 數(shù)據(jù)到 Kafka 的工作轩拨。
如下作業(yè)可以將數(shù)據(jù)庫 app_db 的變更歷史同步到 Kafka,app_db 數(shù)據(jù)庫下的表 customers院喜、products 和 shipments 的變更會(huì)各自寫入對(duì)應(yīng)的 topic 中亡蓉。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n137" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-password}
tables: app_db..*
server-id: 18601-18604
metadata-column.include-list: op_ts
sink:
type: Kafka
name: Kafka Sink
properties.bootstrap.servers: ${secret_values.bootstraps-server}
properties.enable.idempotence: false</pre>
存儲(chǔ)時(shí)支持使用 debezium-json(默認(rèn)) 或 canal-json 格式,使用 debezium 或 canal 的歷史同步作業(yè)可以平滑切換為使用 Flink CDC YAML 進(jìn)行同步喷舀,下游的消費(fèi)者無需修改邏輯砍濒。以一條Update消息為例,debezium-json 和 canal-json 數(shù)據(jù)格式分別如下硫麻。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="json" cid="n139" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": null,
"table": "customers",
"ts_ms": 1728528674000
}
}</pre>
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="json" cid="n140" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": null,
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000
}</pre>
細(xì)粒度 Schema 變更
CDC YAML 作業(yè)支持同步數(shù)據(jù)源的 Schema 變更到目標(biāo)端爸邢,例如創(chuàng)建表、添加列拿愧、重命名列杠河、更改列類型、刪除列和刪除表等。但是下游目標(biāo)端可能無法支持全部類型的變更券敌,或者出于數(shù)據(jù)安全和權(quán)限的考慮唾戚,不希望將全部的變更同步到下游。比如希望保留全部歷史數(shù)據(jù)待诅,不希望執(zhí)行刪除和清空等操作颈走。
為了滿足更多的用戶場(chǎng)景,數(shù)據(jù)攝入YAML提供了多種變更策略:
LENIENT(默認(rèn)):按照固定的模式咱士,將部分類型的變更轉(zhuǎn)換后同步或跳過同步立由,確保 Schema Evolution 的向后兼容性。
EXCEPTION:不允許變更行為序厉,發(fā)生變更時(shí)作業(yè)拋出異常锐膜。
IGNORE:跳過全部變更。
EVOLVE:同步所有變更弛房,同步失敗作業(yè)拋出異常道盏。
TRY_EVOLVE:嘗試同步變更,目標(biāo)端不支持變更時(shí)不報(bào)錯(cuò)文捶。
如果不同的變更策略無法滿足需求荷逞,數(shù)據(jù)攝入YAML還提供了更細(xì)粒度的調(diào)控配置,在 Sink 模塊中設(shè)置 include.schema.changes 和 exclude.schema.changes 選項(xiàng)可以控制需要同步的和需要過濾的變更類型粹排。
如下作業(yè)使用 EVOLVE 模式种远,可以正常同步全部變更,但是會(huì)跳過同步刪除表顽耳,刪除列和清空表的操作坠敷。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n157" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test..*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: {secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
exclude.schema.changes: [drop, truncate.table]
pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE</pre>
寬容模式同步
Schema 變更同步相比于數(shù)據(jù)同步可能耗時(shí)更多,因?yàn)樵诙嗖l(fā)情況下需要等全部數(shù)據(jù)都寫出后射富,才可以安全地進(jìn)行 Schema 變更膝迎,而且下游的目標(biāo)端可能無法支持重現(xiàn)全部的 Schema 變更。
為了作業(yè)能夠更寬容地處理這些變更胰耗,在 Hologres 目標(biāo)端支持了寬容模式同步限次。Hologres 不支持變更列的類型,在寬容模式下柴灯,CDC YAML 將多個(gè) MySQL 數(shù)據(jù)類型映射到更寬的 Hologres 類型卖漫,跳過不必要的類型變更事件,從而讓作業(yè)正常運(yùn)行弛槐,可以通過配置項(xiàng)sink.type-normalize-strategy
進(jìn)行更改懊亡。
例如,如下作業(yè)使用 ONLY_BIGINT_OR_TEXT 讓 MySQL 類型只對(duì)應(yīng)到 Hologres 的 int8 和 text 兩種類型乎串。如果 MySQL 某個(gè)列類型從 INT 改為 BIGINT 店枣,Hologres 將這兩種 MySQL 類型都對(duì)應(yīng)到 int8 類型速警,作業(yè)不會(huì)因?yàn)闊o法處理類型轉(zhuǎn)換而報(bào)錯(cuò)。
<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n162" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test..*
server-id: 8601-8604
sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: {secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT</pre>
新增同步表
在一些業(yè)務(wù)場(chǎng)景企業(yè)開發(fā)了新業(yè)務(wù)模塊(如會(huì)員系統(tǒng)鸯两、積分系統(tǒng))闷旧,需要新增數(shù)據(jù)庫表,并將其數(shù)據(jù)同步到現(xiàn)有的數(shù)據(jù)倉庫钧唐、數(shù)據(jù)湖或?qū)崟r(shí)計(jì)算平臺(tái)中忙灼。或者由于業(yè)務(wù)調(diào)整變化钝侠,作業(yè)啟動(dòng)時(shí)未同步的表需要同步到數(shù)據(jù)湖倉该园。
對(duì)于在運(yùn)行中的作業(yè),這些新表就是新增表帅韧。新增表存在兩種不同的場(chǎng)景里初,CDC YAML 對(duì)不同新增表場(chǎng)景需要使用不同的處理方式,不需要新增作業(yè)忽舟。
如果新增加的表是空表双妨,由于數(shù)據(jù)都是新插入的數(shù)據(jù),因此不需要同步歷史數(shù)據(jù)叮阅,可以在 mysql source 模塊上設(shè)置 scan.binlog.newly-added-table.enabled=true刁品。在這種場(chǎng)景下,被 CDC YAML 作業(yè)匹配的新創(chuàng)建的表會(huì)自動(dòng)同步到目標(biāo)端浩姥。
如果新增加的表是作業(yè)啟動(dòng)前存在的表挑随,客戶希望表里的歷史數(shù)據(jù)需要同步,可以在 mysql source 模塊上設(shè)置 scan.newly-added-table.enabled=true及刻,然后從 savepoint 重啟作業(yè)镀裤。
指定同步位點(diǎn)重跑數(shù)據(jù)
運(yùn)行中的 CDC YAML 作業(yè)可能因?yàn)橐恍╊A(yù)期外的錯(cuò)誤而退出竞阐,比如 Binlog 過期清理缴饭,無法解析的 Binlog 內(nèi)容,解析代碼的 Bug 等骆莹,這些錯(cuò)誤會(huì)導(dǎo)致作業(yè)無法從原有位置恢復(fù)颗搂。CDC YAML 支持使用指定位點(diǎn)啟動(dòng)作業(yè),通過修正部分?jǐn)?shù)據(jù) + 指定位點(diǎn)的方式幕垦,可以幫助作業(yè)繼續(xù)運(yùn)行丢氢。
阿里云 Flink CDC 企業(yè)級(jí)功能
阿里云實(shí)時(shí)計(jì)算 Flink 版在支持開源Flink CDC的所有功能外,還結(jié)合企業(yè)級(jí)客戶的需求和場(chǎng)景先改,提供了以下企業(yè)級(jí)特性疚察,幫助云上企業(yè)更好地完成數(shù)據(jù)實(shí)時(shí)化改造。
MySQL CDC 企業(yè)級(jí)性能優(yōu)化
MySQL CDC 消費(fèi) Binlog 是單并行度運(yùn)行的仇奶,消費(fèi)性能存在瓶頸貌嫡,阿里云實(shí)時(shí)計(jì)算 Flink 版數(shù)據(jù)攝入 YAML 對(duì)MySQL CDC 消費(fèi)性能進(jìn)行了大幅優(yōu)化:
Debezium Bump參數(shù)優(yōu)化:Debezium 讀取數(shù)據(jù)時(shí),一些參數(shù)可以適當(dāng)調(diào)整以獲取更好的性能。該方式對(duì)比開源 Flink CDC 可以提高11%的性能岛抄。
過濾無關(guān)表數(shù)據(jù):MySQL CDC 消費(fèi)整個(gè)實(shí)例的 Binlog别惦,跳過不匹配的表的數(shù)據(jù)可以加速解析。該方式提升的性能取決于無關(guān)表數(shù)據(jù)的占比夫椭。
并行解析 Binlog:Binlog 解析字節(jié)流時(shí)掸掸,可以從單并發(fā)優(yōu)化為多并發(fā)加速解析速度。該方式對(duì)比開源 Flink CDC 可以提高14%的性能蹭秋。
并行序列化:通過火焰圖發(fā)現(xiàn) CPU 在完成從 Event 到 SourceRecord 和從 SourceRecord 到 JSON 的序列化過程中耗時(shí)較多扰付,優(yōu)化為并行序列化并保序可以提高性能。該方式對(duì)比開源 Flink CDC 可以提高42%的性能仁讨。
結(jié)合以上 4 種優(yōu)化方式悯周,實(shí)時(shí)計(jì)算 Flink 版數(shù)據(jù)攝入 YAML 相比于社區(qū) Flink CDC 來說,如果 Binlog 只有單個(gè)表的數(shù)據(jù)陪竿,普適的性能會(huì)提升 80%左右禽翼;如果 Binlog 包含多個(gè)表的數(shù)據(jù)且 YAML 作業(yè)只需要同步部分表,則可以獲得 10 倍左右的性能提升族跛。
OSS 持久化 binlog 消費(fèi)支持
MySQL 的數(shù)據(jù)庫實(shí)例只有一份 Binlog闰挡,如果數(shù)據(jù)更新太快很可能導(dǎo)致消費(fèi)速度趕不上生產(chǎn)速度,從而 Binlog 日志被清理礁哄,無法從消費(fèi)失敗的位置指定位點(diǎn)重啟作業(yè)长酗。
阿里云 RDS MySQL 實(shí)例支持將 Binlog 同步到 OSS,MySQL CDC 可以使用這部分離線的日志啟動(dòng)作業(yè)桐绒。用戶使用 RDS MySQL 作為上游時(shí)夺脾,可以指定對(duì)應(yīng) OSS 配置,當(dāng)指定的時(shí)間戳或者 Binlog 位點(diǎn)對(duì)應(yīng)的文件保存在 OSS 時(shí)茉继,會(huì)自動(dòng)拉取 OSS 日志文件到 Flink 集群本地進(jìn)行讀取咧叭,當(dāng)指定的時(shí)間戳或者 Binlog 位點(diǎn)對(duì)應(yīng)的文件保存在數(shù)據(jù)庫本地時(shí),會(huì)自動(dòng)切換使用數(shù)據(jù)庫連接讀取烁竭,徹底解決 Binlog 日志過期導(dǎo)致的作業(yè)重跑或數(shù)據(jù)不一致問題菲茬。
更豐富的監(jiān)控指標(biāo)
為了便于判斷作業(yè)運(yùn)行的階段和狀態(tài),商業(yè)版提供更豐富的監(jiān)控指標(biāo)派撕。
當(dāng)前作業(yè)處于全量或增量階段
全量階段未處理/已處理的表數(shù)量
全量階段未處理/已處理的分片數(shù)量
最新一條數(shù)據(jù)的時(shí)間戳
讀取數(shù)據(jù)的延遲
全量階段的消息條數(shù)/全量階段每個(gè)表的消息條數(shù)
消息總條數(shù)/每個(gè)表的消息條數(shù)
十分鐘在阿里云免費(fèi)實(shí)現(xiàn)一個(gè) CDC YAML 作業(yè)
接下來我們使用阿里云免費(fèi)試用 [3] 來快速測(cè)試一下 CDC YAML 作業(yè)的功能婉弹,完成一個(gè)簡(jiǎn)單的 MySQL 到 Paimon 的整庫同步作業(yè)。
資源準(zhǔn)備
開始測(cè)試前需要準(zhǔn)備好一個(gè) RDS MySQL 實(shí)例终吼,一個(gè)實(shí)時(shí)計(jì)算 Flink 版環(huán)境镀赌,一個(gè) OSS 對(duì)象存儲(chǔ)。
OSS 對(duì)象存儲(chǔ)
OSS 對(duì)象存儲(chǔ)用作數(shù)據(jù)湖存儲(chǔ)际跪,并且用在實(shí)時(shí)計(jì)算 Flink 版的 checkpoint 存儲(chǔ)商佛。
在免費(fèi)試用搜索oss蛙粘,點(diǎn)擊立即試用對(duì)象存儲(chǔ) OSS。
試用成功后威彰,在 OSS 控制臺(tái)創(chuàng)建一個(gè)杭州地區(qū)的Bucket出牧。創(chuàng)建成功后,在新建的 Bucket 的文件列表中新建目錄 warehouse歇盼,用于存儲(chǔ)數(shù)據(jù)湖數(shù)據(jù)舔痕。
RDS MySQL 實(shí)例
RDS MySQL 作為測(cè)試的 MySQL 數(shù)據(jù)源。
在免費(fèi)試用搜索RDS MySQL豹缀,點(diǎn)擊立即試用云數(shù)據(jù)庫 RDS MySQL伯复。
試用成功后,在 RDS 控制臺(tái)創(chuàng)建一個(gè)杭州地區(qū)的RDS MySQL集群邢笙。
在實(shí)例列表點(diǎn)擊進(jìn)入剛創(chuàng)建的實(shí)例啸如,在賬號(hào)管理創(chuàng)建一個(gè)高權(quán)限賬號(hào)。
賬號(hào)創(chuàng)建完成后氮惯,在數(shù)據(jù)庫管理創(chuàng)建一個(gè)數(shù)據(jù)庫用于測(cè)試叮雳。
點(diǎn)擊登錄數(shù)據(jù)庫,使用用戶名和密碼登錄妇汗,然后在數(shù)據(jù)庫下創(chuàng)建一些表并插入測(cè)試數(shù)據(jù)帘不。此處創(chuàng)建了products 和 users 兩張表,每個(gè)表各生成 5 條測(cè)試數(shù)據(jù)杨箭。
在白名單與安全組點(diǎn)擊全部開放枝哄,打通網(wǎng)絡(luò)連接误阻。此處因?yàn)闇y(cè)試目的使用了全部開放纺荧,在生產(chǎn)環(huán)境請(qǐng)合理配置白名單叭披。
實(shí)時(shí)計(jì)算 Flink 版
在免費(fèi)試用搜索 flink,點(diǎn)擊立即試用實(shí)時(shí)計(jì)算 Flink 版慈参。
完成RAM授權(quán)并領(lǐng)取資源抵扣包后呛牲,在杭州地區(qū)與RDS相同的可用區(qū)創(chuàng)建實(shí)時(shí)計(jì)算 Flink 版實(shí)例。
創(chuàng)建 AccessKey
Paimon 訪問 OSS 時(shí)需要使用 AccessKey懂牧,參照創(chuàng)建AccessKey文檔[4]創(chuàng)建 AccessKey侈净。
CDC YAML 整庫同步 Paimon
當(dāng)所需資源和測(cè)試數(shù)據(jù)都準(zhǔn)備好后,接下來讓我們?cè)趯?shí)時(shí)計(jì)算 Flink 版本上快速開發(fā)整庫同步作業(yè)僧凤。
為了數(shù)據(jù)安全,可以將需要使用的信息在變量管理用密文保存元扔。
保存好變量后躯保,在數(shù)據(jù)攝入中創(chuàng)建一個(gè)整庫同步作業(yè)并部署上線。
部署成功后澎语,在作業(yè)運(yùn)維點(diǎn)擊啟動(dòng)按鈕啟動(dòng)作業(yè)途事。
作業(yè)啟動(dòng)后验懊,可以在監(jiān)控告警的數(shù)據(jù)攝入看到 CDC YAML 同步狀態(tài)。如下監(jiān)控可以看出測(cè)試作業(yè)已經(jīng)進(jìn)入了增量階段尸变,一共同步了2張表义图,2個(gè)分片,每個(gè)表全量分別同步了5條數(shù)據(jù)召烂。
為了查看測(cè)試數(shù)據(jù)碱工,可以使用數(shù)據(jù)開發(fā) ETL 的調(diào)試功能。
首先在元數(shù)據(jù)管理創(chuàng)建對(duì)應(yīng)的Paimon Catalog奏夫。
創(chuàng)建一個(gè) Session 集群用于運(yùn)行查看數(shù)據(jù)結(jié)果的 SQL怕篷,注意選擇數(shù)據(jù)攝入支持的引擎版本。
在數(shù)據(jù)開發(fā) ETL 中酗昼,創(chuàng)建一個(gè) select SQL 作業(yè)查看數(shù)據(jù)廊谓,點(diǎn)擊調(diào)試在 Session 集群運(yùn)行,可以在控制臺(tái)查看數(shù)據(jù)結(jié)果麻削。
至此一個(gè)完整的業(yè)務(wù)案例就已經(jīng)實(shí)現(xiàn)了蒸痹,接下來可以自由地在 RDS MySQL 數(shù)據(jù)庫側(cè)操作相應(yīng)的數(shù)據(jù)修改,重新執(zhí)行 select 命令查看數(shù)據(jù)時(shí)呛哟,變能夠觀察到通過YAML作業(yè)實(shí)時(shí)同步到Paimon中的數(shù)據(jù)了电抚。
相關(guān)鏈接
[1] https://nightlies.apache.org/flink/flink-cdc-docs-stable/