使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù)實(shí)時(shí)入 Apache Doris

Apache Doris 代碼倉庫地址:apache/incubator-doris 歡迎大家關(guān)注加星


本文通過實(shí)例來演示怎么通過Flink CDC 結(jié)合Doris的Flink Connector實(shí)現(xiàn)從Mysql數(shù)據(jù)庫中監(jiān)聽數(shù)據(jù)并實(shí)時(shí)入庫到Doris數(shù)倉對應(yīng)的表中留荔。

1.什么是CDC

CDC 是變更數(shù)據(jù)捕獲(Change Data Capture)技術(shù)的縮寫命锄,它可以將源數(shù)據(jù)庫(Source)的增量變動記錄阅羹,同步到一個(gè)或多個(gè)數(shù)據(jù)目的(Sink)幽钢。在同步過程中,還可以對數(shù)據(jù)進(jìn)行一定的處理蕊蝗,例如分組(GROUP BY)如迟、多表的關(guān)聯(lián)(JOIN)等咨演。

例如對于電商平臺,用戶的訂單會實(shí)時(shí)寫入到某個(gè)源數(shù)據(jù)庫风纠;A 部門需要將每分鐘的實(shí)時(shí)數(shù)據(jù)簡單聚合處理后保存到 Redis 中以供查詢况鸣,B 部門需要將當(dāng)天的數(shù)據(jù)暫存到 Elasticsearch 一份來做報(bào)表展示,C 部門也需要一份數(shù)據(jù)到 ClickHouse 做實(shí)時(shí)數(shù)倉竹观。隨著時(shí)間的推移镐捧,后續(xù) D 部門、E 部門也會有數(shù)據(jù)分析的需求臭增,這種場景下懂酱,傳統(tǒng)的拷貝分發(fā)多個(gè)副本方法很不靈活,而 CDC 可以實(shí)現(xiàn)一份變動記錄誊抛,實(shí)時(shí)處理并投遞到多個(gè)目的地列牺。

1.1 CDC的應(yīng)用場景

  • 數(shù)據(jù)同步:用于備份,容災(zāi)芍锚;
  • 數(shù)據(jù)分發(fā):一個(gè)數(shù)據(jù)源分發(fā)給多個(gè)下游系統(tǒng)昔园;
  • 數(shù)據(jù)采集:面向數(shù)據(jù)倉庫 / 數(shù)據(jù)湖的 ETL 數(shù)據(jù)集成,是非常重要的數(shù)據(jù)源并炮。

CDC 的技術(shù)方案非常多默刚,目前業(yè)界主流的實(shí)現(xiàn)機(jī)制可以分為兩種:

  • 基于查詢的 CDC:
    • 離線調(diào)度查詢作業(yè),批處理逃魄。把一張表同步到其他系統(tǒng)荤西,每次通過查詢?nèi)カ@取表中最新的數(shù)據(jù);
    • 無法保障數(shù)據(jù)一致性伍俘,查的過程中有可能數(shù)據(jù)已經(jīng)發(fā)生了多次變更邪锌;
    • 不保障實(shí)時(shí)性,基于離線調(diào)度存在天然的延遲癌瘾。
  • 基于日志的 CDC:
    • 實(shí)時(shí)消費(fèi)日志觅丰,流處理,例如 MySQL 的 binlog 日志完整記錄了數(shù)據(jù)庫中的變更妨退,可以把 binlog 文件當(dāng)作流的數(shù)據(jù)源妇萄;
    • 保障數(shù)據(jù)一致性蜕企,因?yàn)?binlog 文件包含了所有歷史變更明細(xì);
    • 保障實(shí)時(shí)性冠句,因?yàn)轭愃?binlog 的日志文件是可以流式消費(fèi)的轻掩,提供的是實(shí)時(shí)數(shù)據(jù)。

2.Flink CDC

Flink在1.11版本中新增了CDC的特性懦底,簡稱 改變數(shù)據(jù)捕獲唇牧。名稱來看有點(diǎn)亂,我們先從之前的數(shù)據(jù)架構(gòu)來看CDC的內(nèi)容聚唐。

以上是之前的mysq binlog日志處理流程丐重,例如 canal 監(jiān)聽 binlog 把日志寫入到 kafka 中。而 Apache Flink 實(shí)時(shí)消費(fèi) Kakfa 的數(shù)據(jù)實(shí)現(xiàn) mysql 數(shù)據(jù)的同步或其他內(nèi)容等拱层。拆分來說整體上可以分為以下幾個(gè)階段弥臼。

  1. mysql開啟binlog
  2. canal同步binlog數(shù)據(jù)寫入到kafka
  3. flink讀取kakfa中的binlog數(shù)據(jù)進(jìn)行相關(guān)的業(yè)務(wù)處理。

整體的處理鏈路較長根灯,需要用到的組件也比較多径缅。Apache Flink CDC可以直接從數(shù)據(jù)庫獲取到binlog供下游進(jìn)行業(yè)務(wù)計(jì)算分析

2.1 Flink Connector Mysql CDC 2.0 特性

提供 MySQL CDC 2.0,核心 feature 包括

  • 并發(fā)讀取烙肺,全量數(shù)據(jù)的讀取性能可以水平擴(kuò)展纳猪;
  • 全程無鎖,不對線上業(yè)務(wù)產(chǎn)生鎖的風(fēng)險(xiǎn)桃笙;
  • 斷點(diǎn)續(xù)傳氏堤,支持全量階段的 checkpoint。

網(wǎng)上有測試文檔顯示用 TPC-DS 數(shù)據(jù)集中的 customer 表進(jìn)行了測試搏明,F(xiàn)link 版本是 1.13.1鼠锈,customer 表的數(shù)據(jù)量是 6500 萬條,Source 并發(fā)為 8星著,全量讀取階段:

  • MySQL CDC 2.0 用時(shí) 13 分鐘购笆;
  • MySQL CDC 1.4 用時(shí) 89 分鐘;
  • 讀取性能提升 6.8 倍虚循。

3.什么是Flink Doris Connector

image.png

Apache Doris是一個(gè)現(xiàn)代化的MPP分析型數(shù)據(jù)庫產(chǎn)品同欠。僅需亞秒級響應(yīng)時(shí)間即可獲得查詢結(jié)果,有效地支持實(shí)時(shí)數(shù)據(jù)分析横缔。Apache Doris的分布式架構(gòu)非常簡潔铺遂,易于運(yùn)維,并且可以支持10PB以上的超大數(shù)據(jù)集茎刚。

Apache Doris可以滿足多種數(shù)據(jù)分析需求襟锐,例如固定歷史報(bào)表,實(shí)時(shí)數(shù)據(jù)分析膛锭,交互式數(shù)據(jù)分析和探索式數(shù)據(jù)分析等粮坞。令您的數(shù)據(jù)分析工作更加簡單高效笛质!

Flink Doris Connector 是 doris 社區(qū)為了方便用戶使用 Flink 讀寫Doris數(shù)據(jù)表的一個(gè)擴(kuò)展,

目前 doris 支持 Flink 1.11.x 捞蚂,1.12.x,1.13.x跷究,Scala版本:2.12.x

目前Flink doris connector目前控制入庫通過兩個(gè)參數(shù):

  1. sink.batch.size :每多少條寫入一次姓迅,默認(rèn)100條
  2. sink.batch.interval :每個(gè)多少秒寫入一下,默認(rèn)1秒

這兩參數(shù)同時(shí)起作用俊马,那個(gè)條件先到就觸發(fā)寫doris表操作丁存,

注意:

這里注意的是要啟用 http v2 版本,具體在 fe.conf 中配置 enable_http_server_v2=true柴我,同時(shí)因?yàn)槭峭ㄟ^ fe http rest api 獲取 be 列表解寝,這倆需要配置的用戶有 admin 權(quán)限。

4. 用法示例

4.1 Flink Doris Connector 編譯

首先我們要編譯Doris的Flink connector艘儒,也可以通過下面的地址進(jìn)行下載:

https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar

注意:

這里因?yàn)镈oris 的Flink Connector 是基于Scala 2.12.x版本進(jìn)行開發(fā)的聋伦,所有你在使用Flink 的時(shí)候請選擇對應(yīng)scala 2.12的版本,

如果你使用上面地址下載了相應(yīng)的jar界睁,請忽略下面的編譯內(nèi)容部分

在 doris 的 docker 編譯環(huán)境 apache/incubator-doris:build-env-1.2 下進(jìn)行編譯觉增,因?yàn)?1.3 下面的JDK 版本是 11,會存在編譯問題翻斟。

在 extension/flink-doris-connector/ 源碼目錄下執(zhí)行:

sh build.sh

編譯成功后逾礁,會在 output/ 目錄下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。將此文件復(fù)制到 FlinkClassPath 中即可使用 Flink-Doris-Connector访惜。例如嘹履,Local 模式運(yùn)行的 Flink,將此文件放入 jars/ 文件夾下债热。Yarn集群模式運(yùn)行的Flink砾嫉,則將此文件放入預(yù)部署包中。

針對Flink 1.13.x版本適配問題

   <properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要將這里的 flink.version 改成和你 Flink 集群版本一致阳柔,重新編輯即可

4.2 配置Flink

這里我們是通過Flink Sql Client 方式來進(jìn)行操作焰枢。

這里我們演示使用的軟件版本:

  1. Mysql 8.x
  2. Apache Flink : 1.13.3
  3. Apache Doris :0.14.13.1

4.2.1 安裝Flink

首先下載和安裝 Flink :

https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz

這里演示使用的是本地單機(jī)模式,

# wget https://dlcdn.apache.org/flink/flink-1.12.5/flink-1.12.5-bin-scala_2.12.tgz
# tar zxvf flink-1.12.5-bin-scala_2.12.tgz 

下載Flink CDC相關(guān)Jar包:

https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar

這里注意Flink CDC 和Flink 的版本對應(yīng)關(guān)系

image.png
# wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz
# tar zxvf flink-1.13.3-bin-scala_2.12.tgz 
# cd flink-1.13.3
# wget https://repo1.maven.org/maven2/com/ververica/flink-connector-mysql-cdc/2.0.2/flink-connector-mysql-cdc-2.0.2.jar -P ./lib/
# wget https://github.com/hf200012/hf200012.github.io/raw/main/lib/doris-flink-1.0-SNAPSHOT.jar -P ./lib/
image.png

4.2.2 啟動Flink

這里我們使用的是本地單機(jī)模式

# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host doris01.
Starting taskexecutor daemon on host doris01.

我們通過web訪問(默認(rèn)端口是8081)啟動起來Flink 集群舌剂,可以看到集群正常啟動

image.png

4.3 安裝Apache Doris

具體安裝部署Doris的方法济锄,參照下面的連接:

https://hf200012.github.io/2021/09/Apache-Doris-環(huán)境安裝部署

4.3 安裝配置 Mysql

  1. 安裝Mysql

    快速使用Docker安裝配置Mysql,具體參照下面的連接

    https://segmentfault.com/a/1190000021523570

  2. 開啟Mysql binlog

進(jìn)入 Docker 容器修改/etc/my.cnf 文件霍转,在 [mysqld] 下面添加以下內(nèi)容荐绝,

log_bin=mysql_bin
binlog-format=Row
server-id=1

然后重啟Mysql

systemctl restart mysqld
  1. 創(chuàng)建Mysql數(shù)據(jù)庫表
 CREATE TABLE `test_cdc` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB 

4.4 創(chuàng)建doris表

CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 UNIQUE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

4.5 啟動 Flink Sql Client

./bin/sql-client.sh embedded
> set execution.result-mode=tableau;
image.png

4.5.1 創(chuàng)建 Flink CDC Mysql 映射表

CREATE TABLE test_flink_cdc ( 
  id INT, 
  name STRING,
  primary key(id)  NOT ENFORCED
) WITH ( 
  'connector' = 'mysql-cdc', 
  'hostname' = 'localhost', 
  'port' = '3306', 
  'username' = 'root', 
  'password' = 'password', 
  'database-name' = 'demo', 
  'table-name' = 'test_cdc' 
);

執(zhí)行查詢創(chuàng)建的Mysql映射表,顯示正常

select * from test_flink_cdc;
image.png

4.5.2 創(chuàng)建Flink Doris Table 映射表

使用Doris Flink Connector創(chuàng)建 Doris映射表

CREATE TABLE doris_test_sink (
   id INT,
   name STRING
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'localhost:8030',
  'table.identifier' = 'db_audit.doris_test',
  'sink.batch.size' = '2',
  'sink.batch.interval'='1',
  'username' = 'root',
  'password' = ''
)

在命令行下執(zhí)行上面的語句避消,可以看到創(chuàng)建表成功低滩,然后執(zhí)行查詢語句召夹,驗(yàn)證是否正常

select * from doris_test_sink;
image.png

執(zhí)行插入操作,將Mysql 里的數(shù)據(jù)通過 Flink CDC結(jié)合Doris Flink Connector方式插入到 Doris中

INSERT INTO doris_test_sink select id,name from test_flink_cdc
image.png

提交成功之后我們在Flink的Web界面可以看到相關(guān)的Job任務(wù)信息

image.png

4.5.3 向Mysql表中插入數(shù)據(jù)

INSERT INTO test_cdc VALUES (123, 'this is a update');
INSERT INTO test_cdc VALUES (1212, '測試flink CDC');
INSERT INTO test_cdc VALUES (1234, '這是測試');
INSERT INTO test_cdc VALUES (11233, 'zhangfeng_1');
INSERT INTO test_cdc VALUES (21233, 'zhangfeng_2');
INSERT INTO test_cdc VALUES (31233, 'zhangfeng_3');
INSERT INTO test_cdc VALUES (41233, 'zhangfeng_4');
INSERT INTO test_cdc VALUES (51233, 'zhangfeng_5');
INSERT INTO test_cdc VALUES (61233, 'zhangfeng_6');
INSERT INTO test_cdc VALUES (71233, 'zhangfeng_7');
INSERT INTO test_cdc VALUES (81233, 'zhangfeng_8');
INSERT INTO test_cdc VALUES (91233, 'zhangfeng_9');

4.5.4 觀察Doris表的數(shù)據(jù)

首先停掉Insert into這個(gè)任務(wù)恕沫,因?yàn)槲沂窃诒镜貑螜C(jī)模式监憎,只有一個(gè)task任務(wù),所以要停掉婶溯,然后在命令行執(zhí)行查詢語句才能看到數(shù)據(jù)

image.png

4.5.5 修改Mysql的數(shù)據(jù)

重新啟動Insert into任務(wù)

image.png

修改Mysql表里的數(shù)據(jù)

update test_cdc set name='這個(gè)是驗(yàn)證修改的操作' where id =123

再去觀察Doris表中的數(shù)據(jù)鲸阔,你會發(fā)現(xiàn)已經(jīng)修改

注意這里如果要想Mysql表里的數(shù)據(jù)修改,Doris里的數(shù)據(jù)也同樣修改迄委,Doris數(shù)據(jù)表的模型要是Unique key模型褐筛,其他數(shù)據(jù)模型(Aggregate Key 和 Duplicate Key)不能進(jìn)行數(shù)據(jù)的更新操作。

image.png

4.5.6 刪除數(shù)據(jù)操作

目前Doris Flink Connector 還不支持刪除操作叙身,后面計(jì)劃會加上這個(gè)操作

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末渔扎,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子信轿,更是在濱河造成了極大的恐慌晃痴,老刑警劉巖,帶你破解...
    沈念sama閱讀 207,248評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件财忽,死亡現(xiàn)場離奇詭異愧旦,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)定罢,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,681評論 2 381
  • 文/潘曉璐 我一進(jìn)店門笤虫,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人祖凫,你說我怎么就攤上這事琼蚯。” “怎么了惠况?”我有些...
    開封第一講書人閱讀 153,443評論 0 344
  • 文/不壞的土叔 我叫張陵遭庶,是天一觀的道長。 經(jīng)常有香客問我稠屠,道長峦睡,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 55,475評論 1 279
  • 正文 為了忘掉前任权埠,我火速辦了婚禮榨了,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘攘蔽。我一直安慰自己龙屉,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,458評論 5 374
  • 文/花漫 我一把揭開白布满俗。 她就那樣靜靜地躺著转捕,像睡著了一般作岖。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上五芝,一...
    開封第一講書人閱讀 49,185評論 1 284
  • 那天痘儡,我揣著相機(jī)與錄音,去河邊找鬼枢步。 笑死谤辜,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的价捧。 我是一名探鬼主播,決...
    沈念sama閱讀 38,451評論 3 401
  • 文/蒼蘭香墨 我猛地睜開眼涡戳,長吁一口氣:“原來是場噩夢啊……” “哼结蟋!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起渔彰,我...
    開封第一講書人閱讀 37,112評論 0 261
  • 序言:老撾萬榮一對情侶失蹤嵌屎,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后恍涂,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體宝惰,經(jīng)...
    沈念sama閱讀 43,609評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,083評論 2 325
  • 正文 我和宋清朗相戀三年再沧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了尼夺。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,163評論 1 334
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡炒瘸,死狀恐怖淤堵,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情顷扩,我是刑警寧澤拐邪,帶...
    沈念sama閱讀 33,803評論 4 323
  • 正文 年R本政府宣布,位于F島的核電站隘截,受9級特大地震影響扎阶,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜婶芭,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,357評論 3 307
  • 文/蒙蒙 一东臀、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧犀农,春花似錦啡邑、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,357評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽贵扰。三九已至,卻和暖如春流部,著一層夾襖步出監(jiān)牢的瞬間戚绕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,590評論 1 261
  • 我被黑心中介騙來泰國打工枝冀, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舞丛,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,636評論 2 355
  • 正文 我出身青樓果漾,卻偏偏與公主長得像球切,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子绒障,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,925評論 2 344