Apache Doris 代碼倉庫地址:apache/incubator-doris 歡迎大家關(guān)注加星
本文通過實(shí)例來演示怎么通過Flink CDC 結(jié)合Doris的Flink Connector實(shí)現(xiàn)從Mysql數(shù)據(jù)庫中監(jiān)聽數(shù)據(jù)并實(shí)時(shí)入庫到Doris數(shù)倉對應(yīng)的表中留荔。
1.什么是CDC
CDC 是變更數(shù)據(jù)捕獲(Change Data Capture)技術(shù)的縮寫命锄,它可以將源數(shù)據(jù)庫(Source)的增量變動記錄阅羹,同步到一個(gè)或多個(gè)數(shù)據(jù)目的(Sink)幽钢。在同步過程中,還可以對數(shù)據(jù)進(jìn)行一定的處理蕊蝗,例如分組(GROUP BY)如迟、多表的關(guān)聯(lián)(JOIN)等咨演。
例如對于電商平臺,用戶的訂單會實(shí)時(shí)寫入到某個(gè)源數(shù)據(jù)庫风纠;A 部門需要將每分鐘的實(shí)時(shí)數(shù)據(jù)簡單聚合處理后保存到 Redis 中以供查詢况鸣,B 部門需要將當(dāng)天的數(shù)據(jù)暫存到 Elasticsearch 一份來做報(bào)表展示,C 部門也需要一份數(shù)據(jù)到 ClickHouse 做實(shí)時(shí)數(shù)倉竹观。隨著時(shí)間的推移镐捧,后續(xù) D 部門、E 部門也會有數(shù)據(jù)分析的需求臭增,這種場景下懂酱,傳統(tǒng)的拷貝分發(fā)多個(gè)副本方法很不靈活,而 CDC 可以實(shí)現(xiàn)一份變動記錄誊抛,實(shí)時(shí)處理并投遞到多個(gè)目的地列牺。
1.1 CDC的應(yīng)用場景
- 數(shù)據(jù)同步:用于備份,容災(zāi)芍锚;
- 數(shù)據(jù)分發(fā):一個(gè)數(shù)據(jù)源分發(fā)給多個(gè)下游系統(tǒng)昔园;
- 數(shù)據(jù)采集:面向數(shù)據(jù)倉庫 / 數(shù)據(jù)湖的 ETL 數(shù)據(jù)集成,是非常重要的數(shù)據(jù)源并炮。
CDC 的技術(shù)方案非常多默刚,目前業(yè)界主流的實(shí)現(xiàn)機(jī)制可以分為兩種:
- 基于查詢的 CDC:
- 離線調(diào)度查詢作業(yè),批處理逃魄。把一張表同步到其他系統(tǒng)荤西,每次通過查詢?nèi)カ@取表中最新的數(shù)據(jù);
- 無法保障數(shù)據(jù)一致性伍俘,查的過程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更邪锌;
- 不保障實(shí)時(shí)性,基于離線調(diào)度存在天然的延遲癌瘾。
- 基于日志的 CDC:
- 實(shí)時(shí)消費(fèi)日志觅丰,流處理,例如 MySQL 的 binlog 日志完整記錄了數(shù)據(jù)庫中的變更妨退,可以把 binlog 文件當(dāng)作流的數(shù)據(jù)源妇萄;
- 保障數(shù)據(jù)一致性蜕企,因?yàn)?binlog 文件包含了所有歷史變更明細(xì);
- 保障實(shí)時(shí)性冠句,因?yàn)轭愃?binlog 的日志文件是可以流式消費(fèi)的轻掩,提供的是實(shí)時(shí)數(shù)據(jù)。
2.Flink CDC
Flink在1.11版本中新增了CDC的特性懦底,簡稱 改變數(shù)據(jù)捕獲唇牧。名稱來看有點(diǎn)亂,我們先從之前的數(shù)據(jù)架構(gòu)來看CDC的內(nèi)容聚唐。
以上是之前的mysq binlog
日志處理流程丐重,例如 canal 監(jiān)聽 binlog 把日志寫入到 kafka 中。而 Apache Flink 實(shí)時(shí)消費(fèi) Kakfa 的數(shù)據(jù)實(shí)現(xiàn) mysql 數(shù)據(jù)的同步或其他內(nèi)容等拱层。拆分來說整體上可以分為以下幾個(gè)階段弥臼。
- mysql開啟binlog
- canal同步binlog數(shù)據(jù)寫入到kafka
- flink讀取kakfa中的binlog數(shù)據(jù)進(jìn)行相關(guān)的業(yè)務(wù)處理。
整體的處理鏈路較長根灯,需要用到的組件也比較多径缅。Apache Flink CDC可以直接從數(shù)據(jù)庫獲取到binlog供下游進(jìn)行業(yè)務(wù)計(jì)算分析
2.1 Flink Connector Mysql CDC 2.0 特性
提供 MySQL CDC 2.0,核心 feature 包括
- 并發(fā)讀取烙肺,全量數(shù)據(jù)的讀取性能可以水平擴(kuò)展纳猪;
- 全程無鎖,不對線上業(yè)務(wù)產(chǎn)生鎖的風(fēng)險(xiǎn)桃笙;
- 斷點(diǎn)續(xù)傳氏堤,支持全量階段的 checkpoint。
網(wǎng)上有測試文檔顯示用 TPC-DS 數(shù)據(jù)集中的 customer 表進(jìn)行了測試搏明,F(xiàn)link 版本是 1.13.1鼠锈,customer 表的數(shù)據(jù)量是 6500 萬條,Source 并發(fā)為 8星著,全量讀取階段:
- MySQL CDC 2.0 用時(shí) 13 分鐘购笆;
- MySQL CDC 1.4 用時(shí) 89 分鐘;
- 讀取性能提升 6.8 倍虚循。
3.什么是Flink Doris Connector
Apache Doris是一個(gè)現(xiàn)代化的MPP分析型數(shù)據(jù)庫產(chǎn)品同欠。僅需亞秒級響應(yīng)時(shí)間即可獲得查詢結(jié)果,有效地支持實(shí)時(shí)數(shù)據(jù)分析横缔。Apache Doris的分布式架構(gòu)非常簡潔铺遂,易于運(yùn)維,并且可以支持10PB以上的超大數(shù)據(jù)集茎刚。
Apache Doris可以滿足多種數(shù)據(jù)分析需求襟锐,例如固定歷史報(bào)表,實(shí)時(shí)數(shù)據(jù)分析膛锭,交互式數(shù)據(jù)分析和探索式數(shù)據(jù)分析等粮坞。令您的數(shù)據(jù)分析工作更加簡單高效笛质!
Flink Doris Connector 是 doris 社區(qū)為了方便用戶使用 Flink 讀寫Doris數(shù)據(jù)表的一個(gè)擴(kuò)展,
目前 doris 支持 Flink 1.11.x 捞蚂,1.12.x,1.13.x跷究,Scala版本:2.12.x
目前Flink doris connector目前控制入庫通過兩個(gè)參數(shù):
- sink.batch.size :每多少條寫入一次姓迅,默認(rèn)100條
- sink.batch.interval :每個(gè)多少秒寫入一下,默認(rèn)1秒
這兩參數(shù)同時(shí)起作用俊马,那個(gè)條件先到就觸發(fā)寫doris表操作丁存,
注意:
這里注意的是要啟用 http v2 版本,具體在 fe.conf 中配置 enable_http_server_v2=true
柴我,同時(shí)因?yàn)槭峭ㄟ^ fe http rest api 獲取 be 列表解寝,這倆需要配置的用戶有 admin 權(quán)限。
4. 用法示例
4.1 Flink Doris Connector 編譯
首先我們要編譯Doris的Flink connector艘儒,也可以通過下面的地址進(jìn)行下載:
https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar
注意:
這里因?yàn)镈oris 的Flink Connector 是基于Scala 2.12.x版本進(jìn)行開發(fā)的聋伦,所有你在使用Flink 的時(shí)候請選擇對應(yīng)scala 2.12的版本,
如果你使用上面地址下載了相應(yīng)的jar界睁,請忽略下面的編譯內(nèi)容部分
在 doris 的 docker 編譯環(huán)境 apache/incubator-doris:build-env-1.2
下進(jìn)行編譯觉增,因?yàn)?1.3 下面的JDK 版本是 11,會存在編譯問題翻斟。
在 extension/flink-doris-connector/ 源碼目錄下執(zhí)行:
sh build.sh
編譯成功后逾礁,會在 output/
目錄下生成文件 doris-flink-1.0.0-SNAPSHOT.jar
。將此文件復(fù)制到 Flink
的 ClassPath
中即可使用 Flink-Doris-Connector
访惜。例如嘹履,Local
模式運(yùn)行的 Flink
,將此文件放入 jars/
文件夾下债热。Yarn
集群模式運(yùn)行的Flink
砾嫉,則將此文件放入預(yù)部署包中。
針對Flink 1.13.x版本適配問題
<properties>
<scala.version>2.12</scala.version>
<flink.version>1.11.2</flink.version>
<libthrift.version>0.9.3</libthrift.version>
<arrow.version>0.15.1</arrow.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<doris.home>${basedir}/../../</doris.home>
<doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
</properties>
只需要將這里的 flink.version
改成和你 Flink 集群版本一致阳柔,重新編輯即可
4.2 配置Flink
這里我們是通過Flink Sql Client 方式來進(jìn)行操作焰枢。
這里我們演示使用的軟件版本:
- Mysql 8.x
- Apache Flink : 1.13.3
- Apache Doris :0.14.13.1
4.2.1 安裝Flink
首先下載和安裝 Flink :
https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
這里演示使用的是本地單機(jī)模式,
# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
# tar zxvf flink-1.12.5-bin-scala_2.12.tgz
下載Flink CDC相關(guān)Jar包:
這里注意Flink CDC 和Flink 的版本對應(yīng)關(guān)系
# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
# tar zxvf flink-1.13.3-bin-scala_2.12.tgz
# cd flink-1.13.3
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/
# wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/
4.2.2 啟動Flink
這里我們使用的是本地單機(jī)模式
# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.
我們通過web訪問(默認(rèn)端口是8081)啟動起來Flink 集群舌剂,可以看到集群正常啟動
4.3 安裝Apache Doris
具體安裝部署Doris的方法济锄,參照下面的連接:
https://hf200012.github.io/2021/09/Apache-Doris-環(huán)境安裝部署
4.3 安裝配置 Mysql
-
安裝Mysql
快速使用Docker安裝配置Mysql,具體參照下面的連接
開啟Mysql binlog
進(jìn)入 Docker 容器修改/etc/my.cnf 文件霍转,在 [mysqld] 下面添加以下內(nèi)容荐绝,
log_bin=mysql_bin
binlog-format=Row
server-id=1
然后重啟Mysql
systemctl restart mysqld
- 創(chuàng)建Mysql數(shù)據(jù)庫表
CREATE TABLE `test_cdc` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB
4.4 創(chuàng)建doris表
CREATE TABLE `doris_test` (
`id` int NULL COMMENT "",
`name` varchar(100) NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "V2"
);
4.5 啟動 Flink Sql Client
./bin/sql-client.sh embedded
> set execution.result-mode=tableau;
4.5.1 創(chuàng)建 Flink CDC Mysql 映射表
CREATE TABLE test_flink_cdc (
id INT,
name STRING,
primary key(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'demo',
'table-name' = 'test_cdc'
);
執(zhí)行查詢創(chuàng)建的Mysql映射表,顯示正常
select * from test_flink_cdc;
4.5.2 創(chuàng)建Flink Doris Table 映射表
使用Doris Flink Connector創(chuàng)建 Doris映射表
CREATE TABLE doris_test_sink (
id INT,
name STRING
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'db_audit.doris_test',
'sink.batch.size' = '2',
'sink.batch.interval'='1',
'username' = 'root',
'password' = ''
)
在命令行下執(zhí)行上面的語句避消,可以看到創(chuàng)建表成功低滩,然后執(zhí)行查詢語句召夹,驗(yàn)證是否正常
select * from doris_test_sink;
執(zhí)行插入操作,將Mysql 里的數(shù)據(jù)通過 Flink CDC結(jié)合Doris Flink Connector方式插入到 Doris中
INSERT INTO doris_test_sink select id,name from test_flink_cdc
提交成功之后我們在Flink的Web界面可以看到相關(guān)的Job任務(wù)信息
4.5.3 向Mysql表中插入數(shù)據(jù)
INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '測試flink CDC');
INSERT INTO test_cdc VALUES (1234, '這是測試');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');
4.5.4 觀察Doris表的數(shù)據(jù)
首先停掉Insert into這個(gè)任務(wù)恕沫,因?yàn)槲沂窃诒镜貑螜C(jī)模式监憎,只有一個(gè)task任務(wù),所以要停掉婶溯,然后在命令行執(zhí)行查詢語句才能看到數(shù)據(jù)
4.5.5 修改Mysql的數(shù)據(jù)
重新啟動Insert into任務(wù)
修改Mysql表里的數(shù)據(jù)
update test_cdc set name='這個(gè)是驗(yàn)證修改的操作' where id =123
再去觀察Doris表中的數(shù)據(jù)鲸阔,你會發(fā)現(xiàn)已經(jīng)修改
注意這里如果要想Mysql表里的數(shù)據(jù)修改,Doris里的數(shù)據(jù)也同樣修改迄委,Doris數(shù)據(jù)表的模型要是Unique key模型褐筛,其他數(shù)據(jù)模型(Aggregate Key 和 Duplicate Key)不能進(jìn)行數(shù)據(jù)的更新操作。
4.5.6 刪除數(shù)據(jù)操作
目前Doris Flink Connector 還不支持刪除操作叙身,后面計(jì)劃會加上這個(gè)操作