詳解Flink-CDC

CDC介紹

CDC 是 Change Data Capture(變更數(shù)據(jù)獲取)的簡稱别凤。核心思想是,監(jiān)測并捕獲數(shù)據(jù)庫的變動(dòng)(包括數(shù)據(jù)或數(shù)據(jù)表的插入领虹、更新以及刪除等)规哪,將這些變更按發(fā)生的順序完整記錄下來,寫入到消息中間件中以供其他服務(wù)進(jìn)行訂閱及消費(fèi)塌衰。

CDC種類

1.基于查詢的CDC

例如:Sqoop诉稍、Kafka JDBC source等產(chǎn)品。
特點(diǎn):基于批處理最疆,不能捕獲到所有數(shù)據(jù)的變化杯巨、高延遲、需要查詢數(shù)據(jù)庫努酸,會(huì)增加數(shù)據(jù)庫壓力

2.基于binlog的CDC

例如:Maxwell服爷、Canal、Debezium
特點(diǎn):基于streaming模式获诈、能捕捉所有數(shù)據(jù)的變化仍源、低延遲、不會(huì)增加數(shù)據(jù)庫壓力舔涎。

Flink-CDC

Flink 社區(qū)開發(fā)了 flink-cdc-connectors 組件笼踩,這是一個(gè)可以直接從 MySQL、PostgreSQL
等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件亡嫌。目前也已開源嚎于。
開源地址:https://github.com/ververica/flink-cdc-connectors

代碼實(shí)現(xiàn):

注:flink1.12不支持sql模式,支持stream模式挟冠,flink1.13支持

<dependencies>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-java</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-streaming-java_2.12</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-clients_2.12</artifactId>
 <version>1.12.0</version>
 </dependency>
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>3.1.3</version>
 </dependency>
 <dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>5.1.49</version>
</dependency>
<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-table-planner-blink_2.12</artifactId>
 <version>1.12.0</version>
</dependency>
<dependency>
 <groupId>com.ververica</groupId>
 <artifactId>flink-connector-mysql-cdc</artifactId>
 <version>2.0.0</version>
</dependency>
 <dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.75</version>
 </dependency>
</dependencies>
<build>
 <plugins>
 <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-assembly-plugin</artifactId>
 <version>3.0.0</version>
 <configuration>
 <descriptorRefs>
 <descriptorRef>jar-with-dependencies</descriptorRef>
 </descriptorRefs>
 </configuration>
 <executions>
 <execution>
 <id>make-assembly</id>
 <phase>package</phase>
 <goals>
 <goal>single</goal>
 </goals>
 </execution>
 </executions>
 </plugin>
 </plugins>
</build>

Flink stream模式

public class FlinkCDC {
 public static void main(String[] args) throws Exception {
 //1.創(chuàng)建執(zhí)行環(huán)境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.Flink-CDC 將讀取 binlog 的位置信息以狀態(tài)的方式保存在 CK,如果想要做到斷點(diǎn)
續(xù)傳,需要從 Checkpoint 或者 Savepoint 啟動(dòng)程序
 //2.1 開啟 Checkpoint,每隔 5 秒鐘做一次 CK
 env.enableCheckpointing(5000L);
 //2.2 指定 CK 的一致性語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 //2.3 設(shè)置任務(wù)關(guān)閉的時(shí)候保留最后一次 CK 數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);
 //2.4 指定從 CK 自動(dòng)重啟策略
 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
 //2.5 設(shè)置狀態(tài)后端
 env.setStateBackend(new FsStateBackend("hdfs://hadoop-101:8020/flinkCDC"));
 //2.6 設(shè)置訪問 HDFS 的用戶名
 System.setProperty("HADOOP_USER_NAME", "hadoop");

 //3.創(chuàng)建 Flink-MySQL-CDC 的 Source
 //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.
 //latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.
 //timestamp: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified timestamp. The consumer will traverse the binlog from the beginning and ignore change events whose timestamp is smaller than the specified timestamp.
 //specific-offset: Never to perform snapshot on the monitored database tables upon first startup, and directly read binlog from the specified offset.
 
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("hadoop102")
 .port(3306)
 .username("root")
 .password("root")
 .databaseList("flink_test")
 .tableList("flink.user_info") //可選配置項(xiàng),如果不指定該參數(shù),則會(huì)讀取上一個(gè)配置下的所有表的數(shù)據(jù)于购,注意:指定的時(shí)候需要使用"db.table"的方式
 .startupOptions(StartupOptions.initial())
 .deserializer(new StringDebeziumDeserializationSchema())
 .build();
 //4.使用 CDC Source 從 MySQL 讀取數(shù)據(jù)
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //5.打印數(shù)據(jù)
 mysqlDS.print();
 //6.執(zhí)行任務(wù)
 env.execute();
 } }

FlinkSQL 方式

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {
 public static void main(String[] args) throws Exception {
 StreamExecutionEnvironment env = 
 StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 tableEnv.executeSql("CREATE TABLE user_info (" +
 " id INT," +
 " name STRING," +
 " phone_num STRING" +
 ") WITH (" +
 " 'connector' = 'mysql-cdc'," +
 " 'hostname' = 'hadoop-101'," +
 " 'port' = '3306'," +
 " 'username' = 'root'," +
 " 'password' = 'root'," +
 " 'database-name' = 'flink_test'," +
 " 'table-name' = 'user_info'" +
 ")");
 tableEnv.executeSql("select * from user_info").print();
 env.execute();
 } }

自定義反序列化器

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.Properties;
public class Flink_CDCWithCustomerSchema {
 public static void main(String[] args) throws Exception {
 //1.創(chuàng)建執(zhí)行環(huán)境
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 env.setParallelism(1);
 //2.創(chuàng)建 Flink-MySQL-CDC 的 Source
 DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
 .hostname("hadoop102")
 .port(3306)
 .username("root")
 .password("000000")
 .databaseList("gmall-flink")
 .tableList("gmall-flink.z_user_info") //可選配置項(xiàng),如果不指定該參數(shù),則會(huì)讀取上一個(gè)配置下的所有表的數(shù)據(jù),注意:指定的時(shí)候需要使用"db.table"的方式
.startupOptions(StartupOptions.initial())
 .deserializer(new DebeziumDeserializationSchema<String>() { //自定義數(shù)
據(jù)解析器
 @Override
 public void deserialize(SourceRecord sourceRecord, Collector<String> 
collector) throws Exception {
 //獲取主題信息,包含著數(shù)據(jù)庫和表名 
mysql_binlog_source.gmall-flink.z_user_info
 String topic = sourceRecord.topic();
 String[] arr = topic.split("\\.");
 String db = arr[1];
 String tableName = arr[2];
 //獲取操作類型 READ DELETE UPDATE CREATE
 Envelope.Operation operation = 
Envelope.operationFor(sourceRecord);
 //獲取值信息并轉(zhuǎn)換為 Struct 類型
 Struct value = (Struct) sourceRecord.value();
 //獲取變化后的數(shù)據(jù)
 Struct after = value.getStruct("after");
 //創(chuàng)建 JSON 對象用于存儲(chǔ)數(shù)據(jù)信息
 JSONObject data = new JSONObject();
 for (Field field : after.schema().fields()) {
 Object o = after.get(field);
 data.put(field.name(), o);
 }
 //創(chuàng)建 JSON 對象用于封裝最終返回值數(shù)據(jù)信息
 JSONObject result = new JSONObject();
 result.put("operation", operation.toString().toLowerCase());
 result.put("data", data);
 result.put("database", db);
 result.put("table", tableName);
 //發(fā)送數(shù)據(jù)至下游
 collector.collect(result.toJSONString());
 }
 @Override
 public TypeInformation<String> getProducedType() {
 return TypeInformation.of(String.class);
 }
 })
 .build();
 //3.使用 CDC Source 從 MySQL 讀取數(shù)據(jù)
 DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);
 //4.打印數(shù)據(jù)
 mysqlDS.print();
 //5.執(zhí)行任務(wù)
 env.execute();
 } }

Flink CDC2.0

Flink-CDC1.0存在的問題

1.一致性通過加鎖保證

Debesium在保證數(shù)據(jù)一致性時(shí),需要對讀取的庫或者表加鎖知染。

2.不支持水平拓展

Flink-CDC只支持單并發(fā)价涝,全量讀取數(shù)據(jù)階段,如果表數(shù)據(jù)據(jù)量級大持舆,讀取效率在小時(shí)級別色瘩。

3.全量讀取階段不支持ckeckpoint

解決辦法

在對于有主鍵的表做初始化模式伪窖,整體的流程主要分為 5 個(gè)階段:
1.Chunk 切分;
2.Chunk 分配居兆;(實(shí)現(xiàn)并行讀取數(shù)據(jù)&CheckPoint)
3.Chunk 讀雀采健;(實(shí)現(xiàn)無鎖讀饶嗥堋)
4.Chunk 匯報(bào)簇宽;
5.Chunk 分配。

Chunk 切分

根據(jù) Netflix DBlog 的論文中的無鎖算法原理吧享,對于目標(biāo)表按照主鍵進(jìn)行數(shù)據(jù)分片魏割,設(shè)置每個(gè)切片的區(qū)間為左閉右開或者左開右閉來保證數(shù)據(jù)的連續(xù)性。

Chunk 分配

將劃分好的 Chunk 分發(fā)給多個(gè) SourceReader钢颂,每個(gè) SourceReader 讀取表中的一部分?jǐn)?shù)據(jù)钞它,實(shí)現(xiàn)了并行讀取的目標(biāo)。
同時(shí)在每個(gè) Chunk 讀取的時(shí)候可以單獨(dú)做 CheckPoint殊鞭,某個(gè) Chunk 讀取失敗只需要單獨(dú)執(zhí)行該 Chunk 的任務(wù)遭垛,而不需要像 1.x 中失敗了只能從頭讀取。
若每個(gè) SourceReader 保證了數(shù)據(jù)一致性操灿,則全表就保證了數(shù)據(jù)一致性锯仪。

Chunk 讀取

讀取可以分為 5 個(gè)階段
1)SourceReader 讀取表數(shù)據(jù)之前先記錄當(dāng)前的 Binlog 位置信息記為低位點(diǎn);
2)SourceReader 將自身區(qū)間內(nèi)的數(shù)據(jù)查詢出來并放置在 buffer 中趾盐;
3)查詢完成之后記錄當(dāng)前的 Binlog 位置信息記為高位點(diǎn)庶喜;
4)在增量部分消費(fèi)從低位點(diǎn)到高位點(diǎn)的 Binlog; 5)根據(jù)主鍵救鲤,對 buffer 中的數(shù)據(jù)進(jìn)行修正并輸出溃卡。
通過以上5個(gè)階段可以保證每個(gè)Chunk最終的輸出就是在高位點(diǎn)時(shí)該Chunk中最新的數(shù)據(jù),但是目前只是做到了保證單個(gè) Chunk 中的數(shù)據(jù)一致性蜒简。

Chunk 匯報(bào)

在 Snapshot Chunk 讀取完成之后瘸羡,有一個(gè)匯報(bào)的流程,如上圖所示搓茬,即 SourceReader 需要將 Snapshot Chunk 完成信息匯報(bào)給 SourceEnumerator犹赖。

Chunk 分配

FlinkCDC 是支持全量+增量數(shù)據(jù)同步的,在 SourceEnumerator 接收到所有的 Snapshot卷仑。Chunk 完成信息之后峻村,還有一個(gè)消費(fèi)增量數(shù)據(jù)(Binlog)的任務(wù),此時(shí)是通過下發(fā) Binlog Chunk給任意一個(gè) SourceReader 進(jìn)行單并發(fā)讀取來實(shí)現(xiàn)的锡凝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末粘昨,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌张肾,老刑警劉巖芭析,帶你破解...
    沈念sama閱讀 206,602評論 6 481
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異吞瞪,居然都是意外死亡馁启,警方通過查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 88,442評論 2 382
  • 文/潘曉璐 我一進(jìn)店門芍秆,熙熙樓的掌柜王于貴愁眉苦臉地迎上來惯疙,“玉大人,你說我怎么就攤上這事妖啥∶沟撸” “怎么了?”我有些...
    開封第一講書人閱讀 152,878評論 0 344
  • 文/不壞的土叔 我叫張陵荆虱,是天一觀的道長蒿偎。 經(jīng)常有香客問我,道長克伊,這世上最難降的妖魔是什么酥郭? 我笑而不...
    開封第一講書人閱讀 55,306評論 1 279
  • 正文 為了忘掉前任华坦,我火速辦了婚禮愿吹,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘惜姐。我一直安慰自己犁跪,他們只是感情好,可當(dāng)我...
    茶點(diǎn)故事閱讀 64,330評論 5 373
  • 文/花漫 我一把揭開白布歹袁。 她就那樣靜靜地躺著坷衍,像睡著了一般。 火紅的嫁衣襯著肌膚如雪条舔。 梳的紋絲不亂的頭發(fā)上枫耳,一...
    開封第一講書人閱讀 49,071評論 1 285
  • 那天,我揣著相機(jī)與錄音孟抗,去河邊找鬼迁杨。 笑死,一個(gè)胖子當(dāng)著我的面吹牛凄硼,可吹牛的內(nèi)容都是我干的铅协。 我是一名探鬼主播,決...
    沈念sama閱讀 38,382評論 3 400
  • 文/蒼蘭香墨 我猛地睜開眼摊沉,長吁一口氣:“原來是場噩夢啊……” “哼狐史!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,006評論 0 259
  • 序言:老撾萬榮一對情侶失蹤骏全,失蹤者是張志新(化名)和其女友劉穎苍柏,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體吟温,經(jīng)...
    沈念sama閱讀 43,512評論 1 300
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡序仙,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 35,965評論 2 325
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了鲁豪。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片潘悼。...
    茶點(diǎn)故事閱讀 38,094評論 1 333
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖爬橡,靈堂內(nèi)的尸體忽然破棺而出治唤,到底是詐尸還是另有隱情,我是刑警寧澤糙申,帶...
    沈念sama閱讀 33,732評論 4 323
  • 正文 年R本政府宣布宾添,位于F島的核電站,受9級特大地震影響柜裸,放射性物質(zhì)發(fā)生泄漏缕陕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 39,283評論 3 307
  • 文/蒙蒙 一疙挺、第九天 我趴在偏房一處隱蔽的房頂上張望扛邑。 院中可真熱鬧,春花似錦铐然、人聲如沸蔬崩。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,286評論 0 19
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽沥阳。三九已至,卻和暖如春自点,著一層夾襖步出監(jiān)牢的瞬間桐罕,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,512評論 1 262
  • 我被黑心中介騙來泰國打工桂敛, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留功炮,地道東北人。 一個(gè)月前我還...
    沈念sama閱讀 45,536評論 2 354
  • 正文 我出身青樓埠啃,卻偏偏與公主長得像死宣,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個(gè)殘疾皇子碴开,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 42,828評論 2 345

推薦閱讀更多精彩內(nèi)容