canal 基于Mysql數(shù)據(jù)庫(kù)增量日志解析
?1.前言
?最近太多事情 工作的事情竿刁,以及終身大事等等 耽誤更新勤讽,由于最近做項(xiàng)目需要同步監(jiān)聽(tīng) 未來(lái)電視 mysql的變更了解到公司會(huì)用canal做增量監(jiān)聽(tīng)琼牧,就嘗試使用了一下 這里做個(gè)demo 簡(jiǎn)單的記錄一下吭服。
?2.canal簡(jiǎn)介
?canal:主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析独撇,提供增量數(shù)據(jù)訂閱和消費(fèi)的中間件
?當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
?3.MySQL 注備復(fù)制原理
??3.1 mysql主備復(fù)制工作原理
??1.MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events攀痊,可以通過(guò) show binlog events 進(jìn)行查看)
??2.MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
??3.MySQL slave 重放 relay log 中事件丽涩,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)
??3.2 canal 工作原理
??1.canal 模擬 MySQL slave 的交互協(xié)議棺滞,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
??2.MySQL master 收到 dump 請(qǐng)求矢渊,開(kāi)始推送 binary log 給 slave (即 canal )
??3.canal 解析 binary log 對(duì)象(原始為 byte 流)
?4.準(zhǔn)備
?對(duì)于自建MySQL ,需要先開(kāi)啟 Binlog寫入功能继准,并且配置binlog-format 為Row模式 在my.cnf中配置
?授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
?5.canal 下載安裝配置
??5.1 canal下載
??canal 下載地址 (下載速度可能很慢)
??下載 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz
??解壓后 可以看到如下結(jié)構(gòu)
??5.2 canal 初始配置
??配置修改:
vim conf/example/instance.properties
??如下:
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020
# position info 修改自己的數(shù)據(jù)庫(kù)(canal要監(jiān)聽(tīng)的數(shù)據(jù)庫(kù) 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password 修改成自己 數(shù)據(jù)庫(kù)信息的賬號(hào) (單獨(dú)開(kāi)一個(gè) 準(zhǔn)備階段創(chuàng)建的賬號(hào))
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex 表的監(jiān)聽(tīng)規(guī)則
# canal.instance.filter.regex = blogs\.blog_info
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex =
??啟動(dòng)canal
sh bin/startup.sh
??查看server日志
??看到 the canal server is running now 表示啟動(dòng)成功
vi logs/canal/canal.log
2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
??查看instance的日志
vi logs/example/example.log
2020-01-08 15:25:33.864 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs
2020-01-08 15:25:33.998 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status
??5.3 擴(kuò)展 destination 配置
vi conf/canal.properties
??在canal.destinations 處可以配置當(dāng)前server上部署的instance 列表 默認(rèn)為 example ,我這里改成了 blogs最好對(duì)應(yīng)數(shù)據(jù)庫(kù)名稱矮男。一個(gè)instance 對(duì)應(yīng)一個(gè) 數(shù)據(jù)庫(kù)
?6.創(chuàng)建Java 客戶端 監(jiān)聽(tīng)canal 消費(fèi)數(shù)據(jù)
??6.1 創(chuàng)建maven項(xiàng)目
??6.2 添加canal client POM 依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
??6.3 創(chuàng)建 canal 的客戶端監(jiān)聽(tīng)
??CanalMessageListener.java
??該類實(shí)現(xiàn)InitializingBean 主要是在初始化的時(shí)候 執(zhí)行 init 方法移必,在init()方法中 創(chuàng)建 CanalConnector對(duì)象,連接需要監(jiān)聽(tīng)的canal毡鉴,主要提供 canal的 host 崔泵,port ,destination 猪瞬,以及username 和 password
??parse 方法 主要用于將監(jiān)聽(tīng)的對(duì)象 通過(guò)反射等轉(zhuǎn)換成對(duì)應(yīng)的實(shí)體類
/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {
private CanalConnector connector;
@Autowired
private CanalConfig canalConfig;
@Autowired
private IParseDispatcher configParseDispatcher;
private void init() {
//創(chuàng)建canal 監(jiān)聽(tīng) 傳入host port destination等參數(shù)
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
connector.connect();
// .*\..*
connector.subscribe(".*\\..*");
connector.rollback();
new Thread(() -> {
while (true) {
Message message = connector.getWithoutAck(canalConfig.getBatchSize());
long batchId = message.getId();
long size = message.getEntries().size();
//batchId == -1 表示沒(méi)有數(shù)據(jù)變更
if (batchId == -1 || size == 0) {
System.out.println("empty data ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
//解析數(shù)據(jù)變更
resoleveEntry(message.getEntries());
}
}
}).start();
}
//解析數(shù)據(jù)變更
private void resoleveEntry(List<CanalEntry.Entry> entries) {
CanalEntry.RowChange rowChange = null;
for (CanalEntry.Entry row : entries) {
//判斷是否是 事物開(kāi)始 和 事物結(jié)束
if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
try {
rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
String tableName = row.getHeader().getTableName();
CanalEntry.EventType eventType = row.getHeader().getEventType();
for (CanalEntry.RowData rowData : rowDataList) {
if (eventType == CanalEntry.EventType.UPDATE) {
List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
Object object = parse(columns, tableName);
log.info("收到的 object:{}", JsonUtils.marshalToString(object));
//根據(jù)收到的對(duì)象 處理后續(xù)業(yè)務(wù)邏輯
}
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
//解析 List<CanalEntry.Column>對(duì)象到對(duì)應(yīng)的 實(shí)體類
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根據(jù)配置好的map 從中根據(jù)key 表名 獲取對(duì)應(yīng)的映射后的 實(shí)體類class
String className = configParseDispatcher.dispatch(tableName);
Object entity = null;
Class c = null;
try {
c = Class.forName(className);
entity = c.newInstance();
} catch (ClassNotFoundException e) {
log.error("【未找到對(duì)應(yīng) {} 的 實(shí)體類 】", className);
} catch (Exception e) {
}
for (CanalEntry.Column canalDataColumn : canalDatas) {
String columnName = canalDataColumn.getName();
Field[] fields = c.getDeclaredFields();
for (Field field : fields) {
Object fieldValue = null;
field.setAccessible(true);
String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
log.info("【filedName: {}】", fiedName);
if (fiedName.equals(columnName)) {
try {
if (Long.class.equals(field.getType())) {
fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
}else if(Integer.class.equals(field.getType())){
fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
}else if(Double.class.equals(field.getType())){
fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
}else if(Date.class.equals(field.getType())){
try {
fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
} catch (ParseException e) {
e.printStackTrace();
}
}else{
fieldValue = canalDataColumn.getValue();
}
field.set(entity, fieldValue);
break;
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
}
return entity;
}
}
??application.yml
??配置canal 地址憎瘸,以及表名和實(shí)體的映射規(guī)則
server:
port: 8881
application:
canal:
accessor: canal
host: 127.0.0.1
port: 11111
username:
password:
destination: blogs
batchSize: 30
parse: 規(guī)則,根據(jù)表名獲取對(duì)應(yīng)要映射的 實(shí)體class
rule:
mapping:
blog_info: com.johnny.canal.canal_test.entity.BlogInfo
??IParseDispatcher.java
??接口:用來(lái)根據(jù)表名key獲取對(duì)應(yīng)的 要映射的實(shí)體陈瘦,這里寫成接口是因?yàn)榭梢蕴峁┒喾N獲取方式幌甘,比如我這里通過(guò)yml 配置去獲取
/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {
String dispatch(String key);
}
??ConfigParseDispatcher.java
??實(shí)現(xiàn)上面的接口,提供一種從 application.yml 獲取初始源配置 根據(jù) application.canal.parse.rule進(jìn)行配置
/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {
private Map<String,String> mapping=new HashMap<>();
@Override
public String dispatch(String key) {
return mapping.get(key);
}
}
??7.演示
??啟動(dòng)項(xiàng)目 此時(shí)控制臺(tái)打印 empty data 痊项,無(wú)數(shù)據(jù)變更
??通過(guò)執(zhí)行 在 canal監(jiān)聽(tīng)的mysql 上執(zhí)行 更新語(yǔ)句
update blog_info set blog_title = 'SpringBoot配置相關(guān)for canal test ' where id = 40
??debug 程序锅风,當(dāng)執(zhí)行上面的update語(yǔ)句后 可以看到立即收到
??通過(guò)parse方法解析為對(duì)應(yīng)的 實(shí)體對(duì)象,后續(xù)做自己的業(yè)務(wù)邏輯 即可
?8.總結(jié)
?本篇主要介紹了canal是什么鞍泉,如何下載安裝和配置 皱埠,以及提供了自己寫的一個(gè)簡(jiǎn)單demo 。后續(xù)有機(jī)會(huì)深入了解一下canal的其他功能咖驮,比如 如何同步到Kafka/RocketMQ等等漱逸。。
個(gè)人博客網(wǎng)站 https://www.askajohnny.com 歡迎來(lái)訪問(wèn)游沿!
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!