canal 基于Mysql數(shù)據(jù)庫(kù)增量日志解析

canal 基于Mysql數(shù)據(jù)庫(kù)增量日志解析

?1.前言

?最近太多事情 工作的事情竿刁,以及終身大事等等 耽誤更新勤讽,由于最近做項(xiàng)目需要同步監(jiān)聽(tīng) 未來(lái)電視 mysql的變更了解到公司會(huì)用canal做增量監(jiān)聽(tīng)琼牧,就嘗試使用了一下 這里做個(gè)demo 簡(jiǎn)單的記錄一下吭服。

?2.canal簡(jiǎn)介

?canal:主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析独撇,提供增量數(shù)據(jù)訂閱和消費(fèi)的中間件
?當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Xnip20200117_145806.png

?3.MySQL 注備復(fù)制原理

Xnip20200117_150056.png

??3.1 mysql主備復(fù)制工作原理

??1.MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events攀痊,可以通過(guò) show binlog events 進(jìn)行查看)
??2.MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
??3.MySQL slave 重放 relay log 中事件丽涩,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)

??3.2 canal 工作原理

??1.canal 模擬 MySQL slave 的交互協(xié)議棺滞,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
??2.MySQL master 收到 dump 請(qǐng)求矢渊,開(kāi)始推送 binary log 給 slave (即 canal )
??3.canal 解析 binary log 對(duì)象(原始為 byte 流)

?4.準(zhǔn)備

?對(duì)于自建MySQL ,需要先開(kāi)啟 Binlog寫入功能继准,并且配置binlog-format 為Row模式 在my.cnf中配置

Xnip20200117_151803.png

?授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

?5.canal 下載安裝配置

??5.1 canal下載

??canal 下載地址 (下載速度可能很慢)

??下載 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz

??解壓后 可以看到如下結(jié)構(gòu)


Xnip20200117_150905.png

??5.2 canal 初始配置

??配置修改:

vim conf/example/instance.properties

??如下:

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020

# position info 修改自己的數(shù)據(jù)庫(kù)(canal要監(jiān)聽(tīng)的數(shù)據(jù)庫(kù) 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password 修改成自己 數(shù)據(jù)庫(kù)信息的賬號(hào) (單獨(dú)開(kāi)一個(gè) 準(zhǔn)備階段創(chuàng)建的賬號(hào))
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex  表的監(jiān)聽(tīng)規(guī)則 
# canal.instance.filter.regex = blogs\.blog_info  
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex = 

??啟動(dòng)canal

sh bin/startup.sh

??查看server日志
??看到 the canal server is running now 表示啟動(dòng)成功

vi logs/canal/canal.log


2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##    start the canal server.
2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

??查看instance的日志

vi logs/example/example.log

2020-01-08 15:25:33.864 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 
2020-01-08 15:25:33.998 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

??5.3 擴(kuò)展 destination 配置

vi conf/canal.properties

??在canal.destinations 處可以配置當(dāng)前server上部署的instance 列表 默認(rèn)為 example ,我這里改成了 blogs最好對(duì)應(yīng)數(shù)據(jù)庫(kù)名稱矮男。一個(gè)instance 對(duì)應(yīng)一個(gè) 數(shù)據(jù)庫(kù)

Xnip20200117_153243.png
Xnip20200117_153644.png

?6.創(chuàng)建Java 客戶端 監(jiān)聽(tīng)canal 消費(fèi)數(shù)據(jù)

??6.1 創(chuàng)建maven項(xiàng)目

??6.2 添加canal client POM 依賴

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>

??6.3 創(chuàng)建 canal 的客戶端監(jiān)聽(tīng)

??CanalMessageListener.java

??該類實(shí)現(xiàn)InitializingBean 主要是在初始化的時(shí)候 執(zhí)行 init 方法移必,在init()方法中 創(chuàng)建 CanalConnector對(duì)象,連接需要監(jiān)聽(tīng)的canal毡鉴,主要提供 canal的 host 崔泵,port ,destination 猪瞬,以及username 和 password

??parse 方法 主要用于將監(jiān)聽(tīng)的對(duì)象 通過(guò)反射等轉(zhuǎn)換成對(duì)應(yīng)的實(shí)體類

/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {


private CanalConnector connector;

@Autowired
private CanalConfig canalConfig;

@Autowired
private IParseDispatcher configParseDispatcher;

private void init() {
    //創(chuàng)建canal 監(jiān)聽(tīng) 傳入host port destination等參數(shù)
    connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
            canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    connector.connect();
    //  .*\..*
    connector.subscribe(".*\\..*");
    connector.rollback();

    new Thread(() -> {
    
        while (true) {
            Message message = connector.getWithoutAck(canalConfig.getBatchSize());
            long batchId = message.getId();
            long size = message.getEntries().size();
        //batchId == -1 表示沒(méi)有數(shù)據(jù)變更
            if (batchId == -1 || size == 0) {
                System.out.println("empty data ");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
            //解析數(shù)據(jù)變更
                resoleveEntry(message.getEntries());
            }
        }

    }).start();

}
//解析數(shù)據(jù)變更
private void resoleveEntry(List<CanalEntry.Entry> entries) {
    CanalEntry.RowChange rowChange = null;
    for (CanalEntry.Entry row : entries) {
     //判斷是否是 事物開(kāi)始 和 事物結(jié)束 
        if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }
        try {
            rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
        String tableName = row.getHeader().getTableName();
        CanalEntry.EventType eventType = row.getHeader().getEventType();

        for (CanalEntry.RowData rowData : rowDataList) {
            if (eventType == CanalEntry.EventType.UPDATE) {
                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                Object object = parse(columns, tableName);
                log.info("收到的 object:{}", JsonUtils.marshalToString(object));
                //根據(jù)收到的對(duì)象 處理后續(xù)業(yè)務(wù)邏輯
            }
        }

    }
}

@Override
public void afterPropertiesSet() throws Exception {
    init();
}

//解析 List<CanalEntry.Column>對(duì)象到對(duì)應(yīng)的 實(shí)體類
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根據(jù)配置好的map 從中根據(jù)key 表名 獲取對(duì)應(yīng)的映射后的 實(shí)體類class
    String className = configParseDispatcher.dispatch(tableName);
    Object entity = null;
    Class c = null;
    try {
        c = Class.forName(className);
        entity = c.newInstance();
    } catch (ClassNotFoundException e) {
        log.error("【未找到對(duì)應(yīng) {} 的 實(shí)體類 】", className);
    } catch (Exception e) {
    }

    for (CanalEntry.Column canalDataColumn : canalDatas) {
        String columnName = canalDataColumn.getName();
        Field[] fields = c.getDeclaredFields();

        for (Field field : fields) {
            Object fieldValue = null;
            field.setAccessible(true);
            String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
            log.info("【filedName: {}】", fiedName);
            if (fiedName.equals(columnName)) {
                try {
                    if (Long.class.equals(field.getType())) {
                        fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
                    }else if(Integer.class.equals(field.getType())){
                        fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
                    }else if(Double.class.equals(field.getType())){
                        fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
                    }else if(Date.class.equals(field.getType())){
                        try {
                            fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    }else{
                        fieldValue = canalDataColumn.getValue();
                    }
                    field.set(entity, fieldValue);
                    break;
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    return entity;
}
}

??application.yml
??配置canal 地址憎瘸,以及表名和實(shí)體的映射規(guī)則

server:
port: 8881



application:
  canal:
    accessor: canal
    host: 127.0.0.1
    port: 11111
    username:
    password:
    destination: blogs
    batchSize: 30

    parse:   規(guī)則,根據(jù)表名獲取對(duì)應(yīng)要映射的 實(shí)體class
      rule:
        mapping:
          blog_info: com.johnny.canal.canal_test.entity.BlogInfo

??IParseDispatcher.java
??接口:用來(lái)根據(jù)表名key獲取對(duì)應(yīng)的 要映射的實(shí)體陈瘦,這里寫成接口是因?yàn)榭梢蕴峁┒喾N獲取方式幌甘,比如我這里通過(guò)yml 配置去獲取

/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {

 String dispatch(String key);

}

??ConfigParseDispatcher.java
??實(shí)現(xiàn)上面的接口,提供一種從 application.yml 獲取初始源配置 根據(jù) application.canal.parse.rule進(jìn)行配置

/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {

private Map<String,String> mapping=new HashMap<>();

@Override
public String dispatch(String key) {
    return mapping.get(key);
}

}

??7.演示

??啟動(dòng)項(xiàng)目 此時(shí)控制臺(tái)打印 empty data 痊项,無(wú)數(shù)據(jù)變更

Xnip20200117_160125.png

??通過(guò)執(zhí)行 在 canal監(jiān)聽(tīng)的mysql 上執(zhí)行 更新語(yǔ)句

update blog_info set blog_title = 'SpringBoot配置相關(guān)for canal test '  where id = 40

??debug 程序锅风,當(dāng)執(zhí)行上面的update語(yǔ)句后 可以看到立即收到


Xnip20200117_160552.png

??通過(guò)parse方法解析為對(duì)應(yīng)的 實(shí)體對(duì)象,后續(xù)做自己的業(yè)務(wù)邏輯 即可

Xnip20200117_160718.png

?8.總結(jié)

?本篇主要介紹了canal是什么鞍泉,如何下載安裝和配置 皱埠,以及提供了自己寫的一個(gè)簡(jiǎn)單demo 。后續(xù)有機(jī)會(huì)深入了解一下canal的其他功能咖驮,比如 如何同步到Kafka/RocketMQ等等漱逸。。

個(gè)人博客網(wǎng)站 https://www.askajohnny.com 歡迎來(lái)訪問(wèn)游沿!
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末肮砾,一起剝皮案震驚了整個(gè)濱河市诀黍,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌仗处,老刑警劉巖眯勾,帶你破解...
    沈念sama閱讀 217,657評(píng)論 6 505
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件枣宫,死亡現(xiàn)場(chǎng)離奇詭異,居然都是意外死亡吃环,警方通過(guò)查閱死者的電腦和手機(jī)也颤,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,889評(píng)論 3 394
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)郁轻,“玉大人翅娶,你說(shuō)我怎么就攤上這事『梦ǎ” “怎么了竭沫?”我有些...
    開(kāi)封第一講書(shū)人閱讀 164,057評(píng)論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長(zhǎng)骑篙。 經(jīng)常有香客問(wèn)我蜕提,道長(zhǎng),這世上最難降的妖魔是什么靶端? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 58,509評(píng)論 1 293
  • 正文 為了忘掉前任谎势,我火速辦了婚禮,結(jié)果婚禮上杨名,老公的妹妹穿的比我還像新娘脏榆。我一直安慰自己,他們只是感情好镣煮,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,562評(píng)論 6 392
  • 文/花漫 我一把揭開(kāi)白布姐霍。 她就那樣靜靜地躺著,像睡著了一般典唇。 火紅的嫁衣襯著肌膚如雪镊折。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書(shū)人閱讀 51,443評(píng)論 1 302
  • 那天介衔,我揣著相機(jī)與錄音恨胚,去河邊找鬼。 笑死炎咖,一個(gè)胖子當(dāng)著我的面吹牛赃泡,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播乘盼,決...
    沈念sama閱讀 40,251評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼升熊,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了绸栅?” 一聲冷哼從身側(cè)響起级野,我...
    開(kāi)封第一講書(shū)人閱讀 39,129評(píng)論 0 276
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎粹胯,沒(méi)想到半個(gè)月后蓖柔,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體辰企,經(jīng)...
    沈念sama閱讀 45,561評(píng)論 1 314
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,779評(píng)論 3 335
  • 正文 我和宋清朗相戀三年况鸣,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了牢贸。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 39,902評(píng)論 1 348
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡镐捧,死狀恐怖潜索,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情愤估,我是刑警寧澤帮辟,帶...
    沈念sama閱讀 35,621評(píng)論 5 345
  • 正文 年R本政府宣布,位于F島的核電站玩焰,受9級(jí)特大地震影響由驹,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜昔园,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,220評(píng)論 3 328
  • 文/蒙蒙 一蔓榄、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧默刚,春花似錦甥郑、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 31,838評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至邪锌,卻和暖如春勉躺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背觅丰。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,971評(píng)論 1 269
  • 我被黑心中介騙來(lái)泰國(guó)打工饵溅, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人妇萄。 一個(gè)月前我還...
    沈念sama閱讀 48,025評(píng)論 2 370
  • 正文 我出身青樓蜕企,卻偏偏與公主長(zhǎng)得像,于是被迫代替她去往敵國(guó)和親冠句。 傳聞我的和親對(duì)象是個(gè)殘疾皇子轻掩,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,843評(píng)論 2 354

推薦閱讀更多精彩內(nèi)容