CDC?YAML?在阿里云的最佳實(shí)踐

摘要:本文投稿自阿里云開源大數(shù)據(jù)平臺(tái)數(shù)據(jù)通道團(tuán)隊(duì)奴艾,主要介紹了 Flink CDC YAML 在實(shí)時(shí)計(jì)算Flink版的最佳實(shí)踐务漩。內(nèi)容分為以下五個(gè)部分:

  1. CDC YAML 簡(jiǎn)介
  1. CDC YAML 核心能力
  1. CDC YAML 應(yīng)用場(chǎng)景
  1. 阿里云 Flink CDC 企業(yè)級(jí)功能
  1. 十分鐘在阿里云免費(fèi)實(shí)現(xiàn)一個(gè) CDC YAML 作業(yè)

CDC YAML 簡(jiǎn)介

CDC YAML 是 Flink CDC [1] 提供的簡(jiǎn)單易用的數(shù)據(jù)集成 API况木,用于幫助用戶快速構(gòu)建功能強(qiáng)大的數(shù)據(jù)同步鏈路咐蚯,實(shí)時(shí)地同步業(yè)務(wù)數(shù)據(jù)庫中的數(shù)據(jù)變更和表結(jié)構(gòu)變更實(shí)時(shí)同步到數(shù)據(jù)倉庫,數(shù)據(jù)湖以及其他下游系統(tǒng)苛让。CDC YAML 上手門檻較低沟蔑,即使沒有研發(fā)背景和Flink基礎(chǔ)湿诊,用戶也可以較快地完成數(shù)據(jù)的同步和ETL加工,快速完成數(shù)據(jù)的實(shí)時(shí)入湖入倉瘦材,加速數(shù)據(jù)分析效率厅须。

阿里云實(shí)時(shí)計(jì)算 Flink 版基于 Flink CDC 提供了數(shù)據(jù)攝入 CDC YAML 開發(fā)[2],通過開發(fā) YAML 作業(yè)的方式有效地實(shí)現(xiàn)將數(shù)據(jù)從源端同步到目標(biāo)端的數(shù)據(jù)攝入工作食棕,幫助用戶在云上高效完成數(shù)據(jù)入湖入倉朗和。

CDC YAML 核心能力

CDC YAML 主要用于同步數(shù)據(jù)庫變更到其他系統(tǒng),同步前支持對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理和清洗宣蠕,幫助用戶完成秒級(jí)的數(shù)據(jù)同步工作例隆,構(gòu)建入湖入倉作業(yè)甥捺,主要的核心能力有以下幾個(gè)方面:

  1. 端到端 Data Pipeline:支持秒級(jí)同步數(shù)據(jù)變更和結(jié)構(gòu)變更到其他系統(tǒng)抢蚀,用戶可以快速構(gòu)建自己的數(shù)據(jù)湖和數(shù)據(jù)倉庫,為數(shù)據(jù)分析提供基礎(chǔ)镰禾。

  2. 細(xì)粒度 Schema Evolution:出于數(shù)據(jù)安全的考慮皿曲,部分高危操作(如刪除表,清空數(shù)據(jù)等)用戶不希望同步到目標(biāo)端吴侦。CDC YAML 提供細(xì)粒度 Schema 變更的能力屋休,幫助用戶限制可同步的數(shù)據(jù)變更類型。

  3. 全增量一體化 CDC 讀缺溉汀:CDC YAML提供了全量和增量的一體化讀取能力劫樟,全量數(shù)據(jù)讀取完成后自動(dòng)切換增量讀取,無需用戶操作织堂。

  4. 豐富的 Transform 支持 :支持對(duì)數(shù)據(jù)字段進(jìn)行處理后同步到下游叠艳,如添加額外計(jì)算列,添加元數(shù)據(jù)易阳、只同步某些列附较、重新指定主鍵或分區(qū)鍵等場(chǎng)景。CDC YAML 內(nèi)置了豐富的函數(shù)潦俺,用戶也可以自行開發(fā)UDF拒课,兼容 Flink 的 UDF。支持對(duì)數(shù)據(jù)源數(shù)據(jù)進(jìn)行過濾事示,跳過不需要的數(shù)據(jù)早像,完成對(duì)無用數(shù)據(jù)和臟數(shù)據(jù)的清洗工作。

  5. 靈活的 Route 策略控制:支持自定義數(shù)據(jù)源的表同步到目標(biāo)端表的映射關(guān)系肖爵,支持一對(duì)一扎酷、一對(duì)多和多對(duì)一的多種映射關(guān)系,幫助用戶靈活指定目標(biāo)端表名遏匆,支持分庫分表合并的場(chǎng)景法挨。

  6. 完善的 作業(yè) Metric 支持:為了便于判斷作業(yè)運(yùn)行的階段和狀態(tài)谁榜,CDC YAML 提供了豐富的指標(biāo)。如全量階段未處理/已處理的表數(shù)量凡纳,全量階段未處理/已處理的分片數(shù)量窃植、最新一條數(shù)據(jù)的時(shí)間戳等。

阿里云實(shí)時(shí)計(jì)算 Flink 結(jié)合用戶需求荐糜,YAML 作業(yè)支持了更多的上下游巷怜,支持同步到常見的數(shù)據(jù)湖和數(shù)據(jù)倉庫,已經(jīng)支持的上下游如下暴氏。

連接器 支持類型
Source Sink
MySQL ×
消息隊(duì)列Kafka
實(shí)時(shí)數(shù)倉Hologres ×
Upsert Kafka ×
Print ×
StarRocks ×
流式數(shù)據(jù)湖倉Paimon ×

YAML 作業(yè) 與 SQL / DataStream 作業(yè)對(duì)比

Flink 提供了兩種級(jí)別的作業(yè)開發(fā)方式:SQL 和 DataStream延塑,下面會(huì)比較一下相比于 SQL 和 DataStream來說 CDC YAML 開發(fā)有什么優(yōu)勢(shì)。

CDC YAML 作業(yè)相比 SQL 主要有以下一些優(yōu)勢(shì):

數(shù)據(jù)攝入YAML SQL
自動(dòng)識(shí)別 Schema答渔,支持整庫同步 需要人工寫 Create Table 和 Insert 語句
支持細(xì)粒度 Schema 變更 不支持 Schema 變更
支持原始 Changelog 同步 破壞原始 Changelog 結(jié)構(gòu)
支持讀寫多個(gè)表 讀寫單個(gè)表

CDC YAML 作業(yè)相比 DataStream 作業(yè)的優(yōu)勢(shì)如下:

數(shù)據(jù)攝入YAML DataStream
為各級(jí)別用戶設(shè)計(jì)关带,不只是專家 需要熟悉Java和分布式系統(tǒng)
隱藏底層細(xì)節(jié),便于開發(fā) 需要熟悉Flink框架
YAML格式容易理解和學(xué)習(xí) 需要了解Maven等工具管理相關(guān)依賴
已有作業(yè)方便復(fù)用 難以復(fù)用已有代碼

CDC YAML 應(yīng)用場(chǎng)景

CDC YAML 能夠支持用戶數(shù)據(jù)同步多種應(yīng)用場(chǎng)景沼撕,下面簡(jiǎn)單介紹一些常見的使用場(chǎng)景宋雏,以及應(yīng)該如何使用 CDC YAML 解決這些問題。

整庫同步务豺,構(gòu)建數(shù)據(jù)湖倉

整庫同步是數(shù)據(jù)同步最常見的一種使用場(chǎng)景磨总,將存在數(shù)據(jù)庫的數(shù)據(jù)同步到數(shù)據(jù)湖或數(shù)據(jù)倉庫中,為后續(xù)的數(shù)據(jù)分析提供基礎(chǔ)笼沥。

如下的數(shù)據(jù)攝入YAML作業(yè)可以完成同步整個(gè)app_db數(shù)據(jù)庫到Paimon的工作蚪燕,快速完成數(shù)據(jù)入湖的同步工作。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n124" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-hostname} port: 3306 username: flink password:{secret_values.mysql-password}
tables: app_db..*
server-id: 18601-18604

sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: {secret_values.test_ak} catalog.properties.fs.oss.accessKeySecret:{secret_values.test_sk}</pre>

在整庫同步中奔浅,有時(shí)需要重新定義同步到目標(biāo)端的表名馆纳,防止發(fā)生沖突〕送梗或者需要做一些簡(jiǎn)單的數(shù)據(jù)處理和數(shù)據(jù)過濾工作厕诡,這時(shí)需要結(jié)合 transform 模塊和 route 模塊完成,transform 模塊負(fù)責(zé)數(shù)據(jù)處理工作营勤,route 模塊負(fù)責(zé)數(shù)據(jù)分發(fā)工作灵嫌。

例如在如下的整庫同步作業(yè)中,為 app_db.customers 表添加額外的計(jì)算列 upper 和數(shù)據(jù)庫名這個(gè)元數(shù)據(jù)列db葛作,同時(shí)在目標(biāo)端為三張表名添加版本后綴寿羞。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n127" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-hostname} port: 3306 username: flink password:{secret_values.mysql-password}
tables: app_db..*
server-id: 18601-18604

transform:

  • source-table: app_db.customers
    projection: *, UPPER(name) AS upper, schema_name AS db

route:

  • source-table: app_db.customers
    sink-table: app_db.customers_v1
  • source-table: app_db.products
    sink-table: app_db.products_v0
  • source-table: app_db.orders
    sink-table: app_db.orders_v0

sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: {secret_values.test_ak} catalog.properties.fs.oss.accessKeySecret:{secret_values.test_sk}</pre>

CDC YAML提供了豐富的內(nèi)置函數(shù),還支持用戶自定義函數(shù)來完成復(fù)雜的數(shù)據(jù)處理赂蠢。CDC YAML 對(duì)Flink SQL 的自定義函數(shù)進(jìn)行了兼容绪穆,大部分的 Flink 自定義函數(shù)可以在數(shù)據(jù)攝入YAML中直接使用。

分庫分表合并

在高并發(fā)和大數(shù)據(jù)量場(chǎng)景下,用戶可能選擇將一個(gè)表拆分為多個(gè)庫的多張表存儲(chǔ)數(shù)據(jù)玖院,對(duì)于分庫分表在分析數(shù)據(jù)前菠红,希望將數(shù)據(jù)合并為數(shù)據(jù)湖倉中的一張表。

假設(shè)app_db數(shù)據(jù)庫中只有customers_v0难菌,customers_v1和customers_v2三張表试溯,如下 CDC YAML 作業(yè)可以將這三張分表合并為一張表customers,完成分庫分表的數(shù)據(jù)同步郊酒。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n132" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-hostname} port: 3306 username: flink password:{secret_values.mysql-password}
tables: app_db.customers.*
server-id: 18601-18604

route:

  • source-table: app_db.customers.*
    sink-table: app_db.customers

sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://test-bucket/warehouse
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: {secret_values.test_ak} catalog.properties.fs.oss.accessKeySecret:{secret_values.test_sk}</pre>

原始 Binlog 數(shù)據(jù)同步 Kafka遇绞,對(duì)接已有系統(tǒng)

用戶除了整庫整表同步,有些場(chǎng)景下需要獲取到詳細(xì)的 Changelog 變更歷史燎窘,而不是變更后的數(shù)據(jù)摹闽。將 Binlog 同步到 Kafka,結(jié)合分布式消息隊(duì)列 Kafka 可以提高數(shù)據(jù)消費(fèi)速度褐健,解決消費(fèi)同一個(gè) Binlog 導(dǎo)致的數(shù)據(jù)瓶頸付鹿。后續(xù)能夠使用 Kafka 里的 Binlog 歷史, 進(jìn)行數(shù)據(jù)回放和數(shù)據(jù)審計(jì)工作铝量,或者消費(fèi) Kafka 實(shí)時(shí)監(jiān)控?cái)?shù)據(jù)變更歷史倘屹,從而觸發(fā)通知和報(bào)警银亲。

由于 Flink SQL 作業(yè)里傳遞的數(shù)據(jù)結(jié)構(gòu)是 RowData慢叨,會(huì)將一條 Update 操作拆分為 Update before 和 Update after 兩條消息發(fā)送,破壞了 Changelog 原有的結(jié)構(gòu)务蝠,無法完成原始 Binlog 數(shù)據(jù)同步拍谐。CDC YAML 使用了 SchemaChangeEvent 和 DataChangeEvent 傳遞數(shù)據(jù),可以完整保留 Changelog馏段,幫助用戶完成同步原始 Binlog 數(shù)據(jù)到 Kafka 的工作轩拨。

如下作業(yè)可以將數(shù)據(jù)庫 app_db 的變更歷史同步到 Kafka,app_db 數(shù)據(jù)庫下的表 customers院喜、products 和 shipments 的變更會(huì)各自寫入對(duì)應(yīng)的 topic 中亡蓉。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n137" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: {secret_values.mysql-hostname} port: 3306 username: flink password:{secret_values.mysql-password}
tables: app_db..*
server-id: 18601-18604
metadata-column.include-list: op_ts

sink:
type: Kafka
name: Kafka Sink
properties.bootstrap.servers: ${secret_values.bootstraps-server}
properties.enable.idempotence: false</pre>

存儲(chǔ)時(shí)支持使用 debezium-json(默認(rèn)) 或 canal-json 格式,使用 debezium 或 canal 的歷史同步作業(yè)可以平滑切換為使用 Flink CDC YAML 進(jìn)行同步喷舀,下游的消費(fèi)者無需修改邏輯砍濒。以一條Update消息為例,debezium-json 和 canal-json 數(shù)據(jù)格式分別如下硫麻。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="json" cid="n139" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">{
"before": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
},
"after": {
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
},
"op": "u",
"source": {
"db": null,
"table": "customers",
"ts_ms": 1728528674000
}
}</pre>

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="json" cid="n140" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">{
"old": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "2222",
"age": 12
}
],
"data": [
{
"id": 4,
"name": "John",
"address": "New York",
"phone_number": "1234",
"age": 12
}
],
"type": "UPDATE",
"database": null,
"table": "customers",
"pkNames": [
"id"
],
"ts": 1728528674000
}</pre>

細(xì)粒度 Schema 變更

CDC YAML 作業(yè)支持同步數(shù)據(jù)源的 Schema 變更到目標(biāo)端爸邢,例如創(chuàng)建表、添加列拿愧、重命名列杠河、更改列類型、刪除列和刪除表等。但是下游目標(biāo)端可能無法支持全部類型的變更券敌,或者出于數(shù)據(jù)安全和權(quán)限的考慮唾戚,不希望將全部的變更同步到下游。比如希望保留全部歷史數(shù)據(jù)待诅,不希望執(zhí)行刪除和清空等操作颈走。

為了滿足更多的用戶場(chǎng)景,數(shù)據(jù)攝入YAML提供了多種變更策略:

  • LENIENT(默認(rèn)):按照固定的模式咱士,將部分類型的變更轉(zhuǎn)換后同步或跳過同步立由,確保 Schema Evolution 的向后兼容性。

  • EXCEPTION:不允許變更行為序厉,發(fā)生變更時(shí)作業(yè)拋出異常锐膜。

  • IGNORE:跳過全部變更。

  • EVOLVE:同步所有變更弛房,同步失敗作業(yè)拋出異常道盏。

  • TRY_EVOLVE:嘗試同步變更,目標(biāo)端不支持變更時(shí)不報(bào)錯(cuò)文捶。

如果不同的變更策略無法滿足需求荷逞,數(shù)據(jù)攝入YAML還提供了更細(xì)粒度的調(diào)控配置,在 Sink 模塊中設(shè)置 include.schema.changes 和 exclude.schema.changes 選項(xiàng)可以控制需要同步的和需要過濾的變更類型粹排。

如下作業(yè)使用 EVOLVE 模式种远,可以正常同步全部變更,但是會(huì)跳過同步刪除表顽耳,刪除列和清空表的操作坠敷。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n157" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test..*
server-id: 8601-8604

sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: {secret_values.holo-username} password:{secret_values.holo-password}
sink.type-normalize-strategy: BROADEN
exclude.schema.changes: [drop, truncate.table]

pipeline:
name: MySQL to Hologres yaml job
schema.change.behavior: EVOLVE</pre>

寬容模式同步

Schema 變更同步相比于數(shù)據(jù)同步可能耗時(shí)更多,因?yàn)樵诙嗖l(fā)情況下需要等全部數(shù)據(jù)都寫出后射富,才可以安全地進(jìn)行 Schema 變更膝迎,而且下游的目標(biāo)端可能無法支持重現(xiàn)全部的 Schema 變更。

為了作業(yè)能夠更寬容地處理這些變更胰耗,在 Hologres 目標(biāo)端支持了寬容模式同步限次。Hologres 不支持變更列的類型,在寬容模式下柴灯,CDC YAML 將多個(gè) MySQL 數(shù)據(jù)類型映射到更寬的 Hologres 類型卖漫,跳過不必要的類型變更事件,從而讓作業(yè)正常運(yùn)行弛槐,可以通過配置項(xiàng)sink.type-normalize-strategy進(jìn)行更改懊亡。

例如,如下作業(yè)使用 ONLY_BIGINT_OR_TEXT 讓 MySQL 類型只對(duì)應(yīng)到 Hologres 的 int8 和 text 兩種類型乎串。如果 MySQL 某個(gè)列類型從 INT 改為 BIGINT 店枣,Hologres 將這兩種 MySQL 類型都對(duì)應(yīng)到 int8 類型速警,作業(yè)不會(huì)因?yàn)闊o法處理類型轉(zhuǎn)換而報(bào)錯(cuò)。

<pre class="md-fences md-end-block ty-contain-cm modeLoaded" spellcheck="false" lang="yaml" cid="n162" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit; caret-color: rgb(51, 51, 51); color: rgb(51, 51, 51); font-style: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: auto; text-indent: 0px; text-transform: none; widows: auto; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration: none;">source:
type: mysql
name: MySQL Source
hostname: localhost
port: 3306
username: username
password: password
tables: holo_test..*
server-id: 8601-8604

sink:
type: hologres
name: Hologres Sink
endpoint: ****.hologres.aliyuncs.com:80
dbname: cdcyaml_test
username: {secret_values.holo-username} password:{secret_values.holo-password}
sink.type-normalize-strategy: ONLY_BIGINT_OR_TEXT</pre>

新增同步表

在一些業(yè)務(wù)場(chǎng)景企業(yè)開發(fā)了新業(yè)務(wù)模塊(如會(huì)員系統(tǒng)鸯两、積分系統(tǒng))闷旧,需要新增數(shù)據(jù)庫表,并將其數(shù)據(jù)同步到現(xiàn)有的數(shù)據(jù)倉庫钧唐、數(shù)據(jù)湖或?qū)崟r(shí)計(jì)算平臺(tái)中忙灼。或者由于業(yè)務(wù)調(diào)整變化钝侠,作業(yè)啟動(dòng)時(shí)未同步的表需要同步到數(shù)據(jù)湖倉该园。

對(duì)于在運(yùn)行中的作業(yè),這些新表就是新增表帅韧。新增表存在兩種不同的場(chǎng)景里初,CDC YAML 對(duì)不同新增表場(chǎng)景需要使用不同的處理方式,不需要新增作業(yè)忽舟。

  • 如果新增加的表是空表双妨,由于數(shù)據(jù)都是新插入的數(shù)據(jù),因此不需要同步歷史數(shù)據(jù)叮阅,可以在 mysql source 模塊上設(shè)置 scan.binlog.newly-added-table.enabled=true刁品。在這種場(chǎng)景下,被 CDC YAML 作業(yè)匹配的新創(chuàng)建的表會(huì)自動(dòng)同步到目標(biāo)端浩姥。

  • 如果新增加的表是作業(yè)啟動(dòng)前存在的表挑随,客戶希望表里的歷史數(shù)據(jù)需要同步,可以在 mysql source 模塊上設(shè)置 scan.newly-added-table.enabled=true及刻,然后從 savepoint 重啟作業(yè)镀裤。

指定同步位點(diǎn)重跑數(shù)據(jù)

運(yùn)行中的 CDC YAML 作業(yè)可能因?yàn)橐恍╊A(yù)期外的錯(cuò)誤而退出竞阐,比如 Binlog 過期清理缴饭,無法解析的 Binlog 內(nèi)容,解析代碼的 Bug 等骆莹,這些錯(cuò)誤會(huì)導(dǎo)致作業(yè)無法從原有位置恢復(fù)颗搂。CDC YAML 支持使用指定位點(diǎn)啟動(dòng)作業(yè),通過修正部分?jǐn)?shù)據(jù) + 指定位點(diǎn)的方式幕垦,可以幫助作業(yè)繼續(xù)運(yùn)行丢氢。

阿里云 Flink CDC 企業(yè)級(jí)功能

阿里云實(shí)時(shí)計(jì)算 Flink 版在支持開源Flink CDC的所有功能外,還結(jié)合企業(yè)級(jí)客戶的需求和場(chǎng)景先改,提供了以下企業(yè)級(jí)特性疚察,幫助云上企業(yè)更好地完成數(shù)據(jù)實(shí)時(shí)化改造。

MySQL CDC 企業(yè)級(jí)性能優(yōu)化

MySQL CDC 消費(fèi) Binlog 是單并行度運(yùn)行的仇奶,消費(fèi)性能存在瓶頸貌嫡,阿里云實(shí)時(shí)計(jì)算 Flink 版數(shù)據(jù)攝入 YAML 對(duì)MySQL CDC 消費(fèi)性能進(jìn)行了大幅優(yōu)化:

  1. Debezium Bump參數(shù)優(yōu)化:Debezium 讀取數(shù)據(jù)時(shí),一些參數(shù)可以適當(dāng)調(diào)整以獲取更好的性能。該方式對(duì)比開源 Flink CDC 可以提高11%的性能岛抄。

  2. 過濾無關(guān)表數(shù)據(jù):MySQL CDC 消費(fèi)整個(gè)實(shí)例的 Binlog别惦,跳過不匹配的表的數(shù)據(jù)可以加速解析。該方式提升的性能取決于無關(guān)表數(shù)據(jù)的占比夫椭。

  3. 并行解析 Binlog:Binlog 解析字節(jié)流時(shí)掸掸,可以從單并發(fā)優(yōu)化為多并發(fā)加速解析速度。該方式對(duì)比開源 Flink CDC 可以提高14%的性能蹭秋。

  4. 并行序列化:通過火焰圖發(fā)現(xiàn) CPU 在完成從 Event 到 SourceRecord 和從 SourceRecord 到 JSON 的序列化過程中耗時(shí)較多扰付,優(yōu)化為并行序列化并保序可以提高性能。該方式對(duì)比開源 Flink CDC 可以提高42%的性能仁讨。

結(jié)合以上 4 種優(yōu)化方式悯周,實(shí)時(shí)計(jì)算 Flink 版數(shù)據(jù)攝入 YAML 相比于社區(qū) Flink CDC 來說,如果 Binlog 只有單個(gè)表的數(shù)據(jù)陪竿,普適的性能會(huì)提升 80%左右禽翼;如果 Binlog 包含多個(gè)表的數(shù)據(jù)且 YAML 作業(yè)只需要同步部分表,則可以獲得 10 倍左右的性能提升族跛。

OSS 持久化 binlog 消費(fèi)支持

MySQL 的數(shù)據(jù)庫實(shí)例只有一份 Binlog闰挡,如果數(shù)據(jù)更新太快很可能導(dǎo)致消費(fèi)速度趕不上生產(chǎn)速度,從而 Binlog 日志被清理礁哄,無法從消費(fèi)失敗的位置指定位點(diǎn)重啟作業(yè)长酗。

阿里云 RDS MySQL 實(shí)例支持將 Binlog 同步到 OSS,MySQL CDC 可以使用這部分離線的日志啟動(dòng)作業(yè)桐绒。用戶使用 RDS MySQL 作為上游時(shí)夺脾,可以指定對(duì)應(yīng) OSS 配置,當(dāng)指定的時(shí)間戳或者 Binlog 位點(diǎn)對(duì)應(yīng)的文件保存在 OSS 時(shí)茉继,會(huì)自動(dòng)拉取 OSS 日志文件到 Flink 集群本地進(jìn)行讀取咧叭,當(dāng)指定的時(shí)間戳或者 Binlog 位點(diǎn)對(duì)應(yīng)的文件保存在數(shù)據(jù)庫本地時(shí),會(huì)自動(dòng)切換使用數(shù)據(jù)庫連接讀取烁竭,徹底解決 Binlog 日志過期導(dǎo)致的作業(yè)重跑或數(shù)據(jù)不一致問題菲茬。

更豐富的監(jiān)控指標(biāo)

為了便于判斷作業(yè)運(yùn)行的階段和狀態(tài),商業(yè)版提供更豐富的監(jiān)控指標(biāo)派撕。

  • 當(dāng)前作業(yè)處于全量或增量階段

  • 全量階段未處理/已處理的表數(shù)量

  • 全量階段未處理/已處理的分片數(shù)量

  • 最新一條數(shù)據(jù)的時(shí)間戳

  • 讀取數(shù)據(jù)的延遲

  • 全量階段的消息條數(shù)/全量階段每個(gè)表的消息條數(shù)

  • 消息總條數(shù)/每個(gè)表的消息條數(shù)

十分鐘在阿里云免費(fèi)實(shí)現(xiàn)一個(gè) CDC YAML 作業(yè)

接下來我們使用阿里云免費(fèi)試用 [3] 來快速測(cè)試一下 CDC YAML 作業(yè)的功能婉弹,完成一個(gè)簡(jiǎn)單的 MySQL 到 Paimon 的整庫同步作業(yè)。

資源準(zhǔn)備

開始測(cè)試前需要準(zhǔn)備好一個(gè) RDS MySQL 實(shí)例终吼,一個(gè)實(shí)時(shí)計(jì)算 Flink 版環(huán)境镀赌,一個(gè) OSS 對(duì)象存儲(chǔ)。

OSS 對(duì)象存儲(chǔ)

OSS 對(duì)象存儲(chǔ)用作數(shù)據(jù)湖存儲(chǔ)际跪,并且用在實(shí)時(shí)計(jì)算 Flink 版的 checkpoint 存儲(chǔ)商佛。

在免費(fèi)試用搜索oss蛙粘,點(diǎn)擊立即試用對(duì)象存儲(chǔ) OSS。

試用成功后威彰,在 OSS 控制臺(tái)創(chuàng)建一個(gè)杭州地區(qū)的Bucket出牧。創(chuàng)建成功后,在新建的 Bucket 的文件列表中新建目錄 warehouse歇盼,用于存儲(chǔ)數(shù)據(jù)湖數(shù)據(jù)舔痕。

RDS MySQL 實(shí)例

RDS MySQL 作為測(cè)試的 MySQL 數(shù)據(jù)源。

在免費(fèi)試用搜索RDS MySQL豹缀,點(diǎn)擊立即試用云數(shù)據(jù)庫 RDS MySQL伯复。

試用成功后,在 RDS 控制臺(tái)創(chuàng)建一個(gè)杭州地區(qū)的RDS MySQL集群邢笙。

在實(shí)例列表點(diǎn)擊進(jìn)入剛創(chuàng)建的實(shí)例啸如,在賬號(hào)管理創(chuàng)建一個(gè)高權(quán)限賬號(hào)。

賬號(hào)創(chuàng)建完成后氮惯,在數(shù)據(jù)庫管理創(chuàng)建一個(gè)數(shù)據(jù)庫用于測(cè)試叮雳。

點(diǎn)擊登錄數(shù)據(jù)庫,使用用戶名和密碼登錄妇汗,然后在數(shù)據(jù)庫下創(chuàng)建一些表并插入測(cè)試數(shù)據(jù)帘不。此處創(chuàng)建了products 和 users 兩張表,每個(gè)表各生成 5 條測(cè)試數(shù)據(jù)杨箭。

在白名單與安全組點(diǎn)擊全部開放枝哄,打通網(wǎng)絡(luò)連接误阻。此處因?yàn)闇y(cè)試目的使用了全部開放纺荧,在生產(chǎn)環(huán)境請(qǐng)合理配置白名單叭披。

實(shí)時(shí)計(jì)算 Flink 版

在免費(fèi)試用搜索 flink,點(diǎn)擊立即試用實(shí)時(shí)計(jì)算 Flink 版慈参。

完成RAM授權(quán)并領(lǐng)取資源抵扣包后呛牲,在杭州地區(qū)與RDS相同的可用區(qū)創(chuàng)建實(shí)時(shí)計(jì)算 Flink 版實(shí)例。

創(chuàng)建 AccessKey

Paimon 訪問 OSS 時(shí)需要使用 AccessKey懂牧,參照創(chuàng)建AccessKey文檔[4]創(chuàng)建 AccessKey侈净。

CDC YAML 整庫同步 Paimon

當(dāng)所需資源和測(cè)試數(shù)據(jù)都準(zhǔn)備好后,接下來讓我們?cè)趯?shí)時(shí)計(jì)算 Flink 版本上快速開發(fā)整庫同步作業(yè)僧凤。

為了數(shù)據(jù)安全,可以將需要使用的信息在變量管理用密文保存元扔。

保存好變量后躯保,在數(shù)據(jù)攝入中創(chuàng)建一個(gè)整庫同步作業(yè)并部署上線。

部署成功后澎语,在作業(yè)運(yùn)維點(diǎn)擊啟動(dòng)按鈕啟動(dòng)作業(yè)途事。

作業(yè)啟動(dòng)后验懊,可以在監(jiān)控告警的數(shù)據(jù)攝入看到 CDC YAML 同步狀態(tài)。如下監(jiān)控可以看出測(cè)試作業(yè)已經(jīng)進(jìn)入了增量階段尸变,一共同步了2張表义图,2個(gè)分片,每個(gè)表全量分別同步了5條數(shù)據(jù)召烂。

為了查看測(cè)試數(shù)據(jù)碱工,可以使用數(shù)據(jù)開發(fā) ETL 的調(diào)試功能。

首先在元數(shù)據(jù)管理創(chuàng)建對(duì)應(yīng)的Paimon Catalog奏夫。

創(chuàng)建一個(gè) Session 集群用于運(yùn)行查看數(shù)據(jù)結(jié)果的 SQL怕篷,注意選擇數(shù)據(jù)攝入支持的引擎版本。

在數(shù)據(jù)開發(fā) ETL 中酗昼,創(chuàng)建一個(gè) select SQL 作業(yè)查看數(shù)據(jù)廊谓,點(diǎn)擊調(diào)試在 Session 集群運(yùn)行,可以在控制臺(tái)查看數(shù)據(jù)結(jié)果麻削。

至此一個(gè)完整的業(yè)務(wù)案例就已經(jīng)實(shí)現(xiàn)了蒸痹,接下來可以自由地在 RDS MySQL 數(shù)據(jù)庫側(cè)操作相應(yīng)的數(shù)據(jù)修改,重新執(zhí)行 select 命令查看數(shù)據(jù)時(shí)呛哟,變能夠觀察到通過YAML作業(yè)實(shí)時(shí)同步到Paimon中的數(shù)據(jù)了电抚。

相關(guān)鏈接

[1] https://nightlies.apache.org/flink/flink-cdc-docs-stable/

[2] https://help.aliyun.com/zh/flink/user-guide/develop-a-yaml-draft?spm=a2c4g.11186623.help-menu-45029.d_2_2_3.a09a2a71KBQ8n5&scm=20140722.H_2846225._.OR_help-V_1

[3] https://free.aliyun.com/

[4] https://help.aliyun.com/zh/ram/user-guide/create-an-accesskey-pair?spm=a2c4g.11186623.0.0.4ed27ad1pdxcBb

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市竖共,隨后出現(xiàn)的幾起案子蝙叛,更是在濱河造成了極大的恐慌,老刑警劉巖公给,帶你破解...
    沈念sama閱讀 218,284評(píng)論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件借帘,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡淌铐,警方通過查閱死者的電腦和手機(jī)肺然,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,115評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來腿准,“玉大人际起,你說我怎么就攤上這事⊥麓校” “怎么了街望?”我有些...
    開封第一講書人閱讀 164,614評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)弟跑。 經(jīng)常有香客問我灾前,道長(zhǎng),這世上最難降的妖魔是什么孟辑? 我笑而不...
    開封第一講書人閱讀 58,671評(píng)論 1 293
  • 正文 為了忘掉前任哎甲,我火速辦了婚禮蔫敲,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘炭玫。我一直安慰自己奈嘿,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,699評(píng)論 6 392
  • 文/花漫 我一把揭開白布吞加。 她就那樣靜靜地躺著裙犹,像睡著了一般。 火紅的嫁衣襯著肌膚如雪榴鼎。 梳的紋絲不亂的頭發(fā)上伯诬,一...
    開封第一講書人閱讀 51,562評(píng)論 1 305
  • 那天,我揣著相機(jī)與錄音巫财,去河邊找鬼盗似。 笑死,一個(gè)胖子當(dāng)著我的面吹牛平项,可吹牛的內(nèi)容都是我干的赫舒。 我是一名探鬼主播,決...
    沈念sama閱讀 40,309評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼闽瓢,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼接癌!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起扣讼,我...
    開封第一講書人閱讀 39,223評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤缺猛,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后椭符,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體荔燎,經(jīng)...
    沈念sama閱讀 45,668評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,859評(píng)論 3 336
  • 正文 我和宋清朗相戀三年销钝,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了有咨。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,981評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡蒸健,死狀恐怖座享,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情似忧,我是刑警寧澤渣叛,帶...
    沈念sama閱讀 35,705評(píng)論 5 347
  • 正文 年R本政府宣布,位于F島的核電站橡娄,受9級(jí)特大地震影響诗箍,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜挽唉,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,310評(píng)論 3 330
  • 文/蒙蒙 一滤祖、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧瓶籽,春花似錦匠童、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,904評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至严拒,卻和暖如春扬绪,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背裤唠。 一陣腳步聲響...
    開封第一講書人閱讀 33,023評(píng)論 1 270
  • 我被黑心中介騙來泰國打工挤牛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人种蘸。 一個(gè)月前我還...
    沈念sama閱讀 48,146評(píng)論 3 370
  • 正文 我出身青樓墓赴,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國和親航瞭。 傳聞我的和親對(duì)象是個(gè)殘疾皇子诫硕,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,933評(píng)論 2 355

推薦閱讀更多精彩內(nèi)容