CDC Overview
CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of data or tables), record those changes completely in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.
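For example, an insert, an update, and a delete of the same row would be captured as three ordered change events, conceptually something like the following (field names and layout are purely illustrative, not any particular product's wire format):
{"op": "insert", "table": "user_info", "after": {"id": 1, "name": "zhangsan"}}
{"op": "update", "table": "user_info", "before": {"id": 1, "name": "zhangsan"}, "after": {"id": 1, "name": "lisi"}}
{"op": "delete", "table": "user_info", "before": {"id": 1, "name": "lisi"}}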
Types of CDC
1. Query-based CDC
Examples: Sqoop, Kafka JDBC Source, and similar products.
Characteristics: batch-based; cannot capture every change; high latency; has to query the database, which puts extra load on it.
2. Binlog-based CDC
Examples: Maxwell, Canal, Debezium
Characteristics: streaming-based; captures every change; low latency; puts no extra load on the database.
Flink-CDC
The Flink community has developed the flink-cdc-connectors component, a source connector that reads full snapshots and incremental changes directly from databases such as MySQL and PostgreSQL. It is open source.
Repository: https://github.com/ververica/flink-cdc-connectors
Code examples:
Note: with Flink 1.12 only the DataStream mode is supported; the SQL mode requires Flink 1.13.
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Flink DataStream mode
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Flink-CDC stores the binlog position it has read as Flink state in checkpoints.
        //   To resume where it left off, restart the job from a checkpoint or savepoint.
        //2.1 Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000L);
        //2.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 Restart strategy used when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop-101:8020/flinkCDC"));
        //2.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        //3. Create the Flink-MySQL-CDC source
        //initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continues to read the latest binlog.
        //latest-offset: Never performs a snapshot on the monitored database tables upon first startup, just reads from the end of the binlog, i.e. only the changes since the connector was started.
        //timestamp: Never performs a snapshot on the monitored database tables upon first startup, and reads the binlog from the specified timestamp. The consumer traverses the binlog from the beginning and ignores change events whose timestamp is smaller than the specified timestamp.
        //specific-offset: Never performs a snapshot on the monitored database tables upon first startup, and reads the binlog directly from the specified offset.
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("root")
                .databaseList("flink_test")
                .tableList("flink_test.user_info") // Optional; if omitted, all tables of the databases above are read. Note: the value must be in "db.table" form.
                .startupOptions(StartupOptions.initial())
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //4. Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //5. Print the data
        mysqlDS.print();

        //6. Execute the job
        env.execute();
    }
}
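With StringDebeziumDeserializationSchema each element is just the SourceRecord's toString(), so what gets printed looks roughly like the following single line (server names, offsets, timestamps and row values are purely illustrative, and the exact layout depends on the Debezium/Kafka Connect versions in use):
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000003, pos=154}} ConnectRecord{topic='mysql_binlog_source.flink_test.user_info', kafkaPartition=null, key=Struct{id=1}, value=Struct{after=Struct{id=1,name=zhangsan,phone_num=13800000000},source=Struct{...},op=r,ts_ms=1620000000000}, timestamp=..., headers=...}
Because this format is awkward to parse downstream, the custom deserializer shown later converts each record into a compact JSON string instead.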
Flink SQL mode
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQL_CDC {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register a table backed by the mysql-cdc connector
        tableEnv.executeSql("CREATE TABLE user_info (" +
                " id INT," +
                " name STRING," +
                " phone_num STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'hostname' = 'hadoop-101'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = 'root'," +
                " 'database-name' = 'flink_test'," +
                " 'table-name' = 'user_info'" +
                ")");

        // executeSql submits the query itself and print() blocks while printing the changelog,
        // so no separate env.execute() call is needed here.
        tableEnv.executeSql("select * from user_info").print();
    }
}
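When this job runs, print() renders the changelog of the query. The op column marks each row as an insert (+I), the old side of an update (-U), the new side of an update (+U) or a delete (-D); the rows below are illustrative only:
| op |  id |     name |   phone_num |
| +I |   1 | zhangsan | 13800000000 |
| -U |   1 | zhangsan | 13800000000 |
| +U |   1 | zhangsan | 13900000000 |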
Custom deserializer
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class Flink_CDCWithCustomerSchema {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2. Create the Flink-MySQL-CDC source
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.z_user_info") // Optional; if omitted, all tables of the databases above are read. Note: the value must be in "db.table" form.
                .startupOptions(StartupOptions.initial())
                .deserializer(new DebeziumDeserializationSchema<String>() { // Custom deserializer
                    @Override
                    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                        // Get the topic, which contains the database and table name,
                        // e.g. mysql_binlog_source.gmall-flink.z_user_info
                        String topic = sourceRecord.topic();
                        String[] arr = topic.split("\\.");
                        String db = arr[1];
                        String tableName = arr[2];
                        // Get the operation type: READ, DELETE, UPDATE or CREATE
                        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
                        // Get the value and cast it to Struct
                        Struct value = (Struct) sourceRecord.value();
                        // Get the row image after the change ("after" is null for deletes)
                        Struct after = value.getStruct("after");
                        // JSON object holding the row data
                        JSONObject data = new JSONObject();
                        if (after != null) {
                            for (Field field : after.schema().fields()) {
                                Object o = after.get(field);
                                data.put(field.name(), o);
                            }
                        }
                        // JSON object wrapping the final output
                        JSONObject result = new JSONObject();
                        result.put("operation", operation.toString().toLowerCase());
                        result.put("data", data);
                        result.put("database", db);
                        result.put("table", tableName);
                        // Emit the record downstream
                        collector.collect(result.toJSONString());
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return TypeInformation.of(String.class);
                    }
                })
                .build();

        //3. Read data from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);

        //4. Print the data
        mysqlDS.print();

        //5. Execute the job
        env.execute();
    }
}
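Assuming z_user_info has id, name and phone_num columns, an insert would make the deserializer above emit a JSON string along these lines (values illustrative):
{"database":"gmall-flink","table":"z_user_info","operation":"create","data":{"id":1,"name":"zhangsan","phone_num":"13800000000"}}
Rows read during the initial snapshot carry "operation":"read", and for deletes the data object is empty because the record's after field is null.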
Flink CDC 2.0
Problems with Flink-CDC 1.0
1. Consistency is guaranteed by locking
To guarantee data consistency, Debezium has to lock the database or the tables it reads.
2. No horizontal scaling
Flink-CDC 1.x only supports a parallelism of 1, so in the full-snapshot phase a large table can take hours to read.
3. No checkpoints during the full-snapshot phase
The solution
For tables with a primary key, the initial (snapshot) mode is split into 5 stages:
1. Chunk splitting;
2. Chunk assignment; (enables parallel reads & checkpoints)
3. Chunk reading; (enables lock-free reads)
4. Chunk reporting;
5. Chunk assignment (of the binlog task).
Chunk splitting
Following the lock-free algorithm from Netflix's DBLog paper, the target table is split into chunks by primary key; each chunk is a left-closed right-open (or left-open right-closed) interval, so that together the chunks cover the key range contiguously without overlap.
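A minimal, hand-written sketch of the idea (this is not the actual flink-cdc splitting code; it assumes a numeric primary key and a fixed chunk size, and the class and method names are made up):

import java.util.ArrayList;
import java.util.List;

public class ChunkSplitSketch {
    // Split the primary-key range [minId, maxId] into left-closed, right-open chunks,
    // so every row belongs to exactly one chunk and no two chunks overlap.
    public static List<long[]> split(long minId, long maxId, long chunkSize) {
        List<long[]> chunks = new ArrayList<>();
        long start = minId;
        while (start <= maxId) {
            long end = start + chunkSize;           // exclusive upper bound
            chunks.add(new long[]{start, end});     // chunk covers [start, end)
            start = end;                            // next chunk starts where this one ends
        }
        return chunks;
    }

    public static void main(String[] args) {
        // e.g. id range [1, 10] with chunk size 4 -> [1, 5), [5, 9), [9, 13)
        for (long[] c : split(1, 10, 4)) {
            System.out.println("SELECT * FROM user_info WHERE id >= " + c[0] + " AND id < " + c[1]);
        }
    }
}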
Chunk assignment
The chunks are distributed across multiple SourceReaders, and each SourceReader reads one part of the table; this is what makes parallel reading possible.
Each chunk can also be checkpointed individually while it is read: if one chunk fails, only that chunk has to be re-run, instead of restarting the whole read from the beginning as in 1.x.
If every SourceReader guarantees consistency for its own data, the whole table is consistent.
Chunk reading
Reading a chunk consists of 5 steps (see the sketch after this list):
1) Before reading the table data, the SourceReader records the current binlog position as the low watermark;
2) The SourceReader queries the rows in its key range and puts them into a buffer;
3) When the query finishes, it records the current binlog position as the high watermark;
4) It then consumes the incremental binlog from the low watermark to the high watermark;
5) Using the primary key, it patches the buffered rows with those binlog changes and emits the result.
These 5 steps guarantee that the final output of each chunk is the latest data of that chunk as of the high watermark; at this point, however, consistency is only guaranteed within a single chunk.
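The following toy, self-contained Java sketch simulates steps 2), 4) and 5) for a single chunk. It is not flink-cdc source code; all the data structures in it are invented for illustration:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ChunkReadSketch {
    public static void main(String[] args) {
        long chunkStart = 1, chunkEnd = 5;                 // this chunk covers ids [1, 5)

        // Step 2: snapshot query result for this chunk (id -> row value),
        // read between the low and high binlog watermarks.
        Map<Long, String> buffer = new TreeMap<>();
        buffer.put(1L, "alice");
        buffer.put(2L, "bob");
        buffer.put(3L, "carol");

        // Step 4: binlog events captured between the low and high watermarks,
        // encoded here as {id, type, newValue} with type "u"=update, "d"=delete, "i"=insert.
        List<Object[]> binlog = Arrays.asList(
                new Object[]{2L, "u", "bob-renamed"},      // update inside the chunk
                new Object[]{3L, "d", null},               // delete inside the chunk
                new Object[]{9L, "u", "ignored"}           // outside [1, 5): skipped
        );
        for (Object[] e : binlog) {
            long id = (Long) e[0];
            if (id < chunkStart || id >= chunkEnd) continue;   // not this chunk's key range
            if ("d".equals(e[1])) buffer.remove(id);           // delete removes the row
            else buffer.put(id, (String) e[2]);                // insert/update keeps the latest image
        }

        // Step 5: the buffer now reflects the chunk as of the high watermark.
        buffer.forEach((id, row) -> System.out.println(id + " -> " + row));
        // prints: 1 -> alice, 2 -> bob-renamed
    }
}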
Chunk reporting
After a snapshot chunk has been read, there is a reporting step: the SourceReader reports the completed snapshot chunk to the SourceEnumerator.
Chunk assignment
Flink CDC supports full + incremental synchronization. After the SourceEnumerator has received the completion reports of all snapshot chunks, there is still the task of consuming the incremental data (the binlog); this is done by handing a binlog chunk to one of the SourceReaders, which reads it with a single parallelism.