To read MySQL data with the flink-cdc connector and write it to other systems or databases, the MySQL binlog must be enabled first.
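For reference, a minimal sketch of the binlog settings in my.cnf (flink-cdc/Debezium requires row-based binlog; the server-id and log file prefix below are just example values):

    [mysqld]
    server-id      = 1
    log-bin        = mysql-bin
    binlog_format  = ROW

After restarting MySQL you can verify that the binlog is on with SHOW VARIABLES LIKE 'log_bin'; (it should return ON).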
1. Add the Maven dependencies
<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.6</version>
    </dependency>
    <!-- logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.6</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.6</version>
        <type>test-jar</type>
    </dependency>
</dependencies>
2. Create the Flink CDC test class
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@Slf4j
public class FlinkCDC {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("user")        // set captured database
                .tableList("user.log_info")  // set captured table
                .username("root")
                .password("password")
                // custom deserialization schema
                .deserializer(new CustomDeserialization())
                // StartupOptions.initial(): full snapshot first, then incremental reads
                // .startupOptions(StartupOptions.initial())
                // StartupOptions.latest(): read only new changes from the latest binlog position
                .startupOptions(StartupOptions.latest())
                .build();

        Configuration config = new Configuration();
        // config.setString("execution.savepoint.path", "file:///D:\\flink\\checkpoints\\cc52b93fd24977e5388f0a19a30d49d2\\chk-87");
        // or set it at startup time via the program arguments
        if (ArrayUtils.isNotEmpty(args)) {
            String lastCheckpointPath = args[0];
            // e.g. D:\flink\checkpoints\8bdf5d49bb1b4cda56aaa0a590fc2cef\chk-55
            // when restarting, pass the latest checkpoint path so the job resumes reading from that checkpoint
            config.setString("execution.savepoint.path", lastCheckpointPath);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);

        // enable checkpointing
        env.enableCheckpointing(3000);
        // env.getCheckpointConfig().setCheckpointStorage("file:///D:\\flink\\checkpoints");
        // set where checkpoints are stored, here the local file system
        env.setStateBackend(new FsStateBackend("file:///D:\\flink\\checkpoints"));

        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .addSink(new CustomSink()).setParallelism(1);
                // .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("flinkcdc");
    }
}
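To resume after a restart, pass the latest checkpoint directory as the first program argument. A hypothetical example of submitting the job (the jar name is an assumption, the checkpoint path is the sample one from the comments above):

    flink run -c FlinkCDC flink-cdc-demo.jar D:\flink\checkpoints\8bdf5d49bb1b4cda56aaa0a590fc2cef\chk-55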
3. Custom deserialization class
import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
/**
 * Custom deserialization schema: converts the Debezium SourceRecord into a JSON string.
 */
public class CustomDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
            throws Exception {
        JSONObject res = new JSONObject();

        // get the database and table name from the topic, which has the form "<server-name>.<database>.<table>"
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();

        // extract the "before" image
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        // extract the "after" image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        // get the operation type: READ, DELETE, UPDATE, CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        // assemble the output JSON object
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", beforeJson);
        res.put("after", afterJson);
        res.put("type", type);

        // emit the record
        collector.collect(res.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
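For an insert into user.log_info, the schema above emits a JSON string along these lines (the column names id and content are hypothetical; the real keys in before/after follow the table's schema):

    {
      "database": "user",
      "tableName": "log_info",
      "before": {},
      "after": {"id": 1, "content": "..."},
      "type": "insert"
    }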
4. Custom sink output
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CustomSink extends RichSinkFunction<String> {

    @Override
    public void invoke(String value, Context context) throws Exception {
        // parse the JSON string emitted by CustomDeserialization
        TableData<LogInfo> tableData = JSON.parseObject(value,
                new TypeReference<TableData<LogInfo>>() {});
        System.out.println(tableData);
        // TODO: save to other systems, middleware (MQ, etc.) or other databases as needed
        // e.g. send to RabbitMQ or Kafka for further processing
        // rabbitTemplate.send(tableData);
        // or save it to a database
        // testMapper.insert(tableData.getAfter());
    }
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableData<T> {
    private String database;
    private String tableName;
    // operation type ("insert", "update", "delete"), matching the "type" key written by CustomDeserialization
    private String type;
    private T before;
    private T after;
}
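LogInfo is the entity class that maps the captured user.log_info table; it is not shown above, so here is a minimal hypothetical sketch (the columns are assumptions and should be adjusted to the real table):

    import lombok.Data;

    @Data
    public class LogInfo {
        // hypothetical columns; replace with the actual columns of user.log_info
        private Long id;
        private String content;
        private String createTime;
    }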
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者