canal應(yīng)用-1個(gè)server+2個(gè)instance+2個(gè)client+2個(gè)mysql

一 canal應(yīng)用架構(gòu)設(shè)計(jì)


組件說(shuō)明:

    1. linux內(nèi)核版本(CentOS Linux 7):(命令:uname -a)
      Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
    1. mysql版本:(SQL命令:select version(); 或 status)
      Server version: 5.6.43-log MySQL Community Server (GPL)
    1. canal版本:canal-1.1.3
    1. JDK版本: 1.8

canal工作原理:

    1. 模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議蹲诀;
    1. mysql master收到dump請(qǐng)求,開(kāi)始推送binary log給slave(也就是canal)平酿;
    1. 解析binary log對(duì)象(原始為byte流)

了解更多詳細(xì)更新可以查看文章:【了解canal尔艇,看這個(gè)就夠了】

二 架構(gòu)落地實(shí)現(xiàn)流程

2.1 mysql配置與安裝

1. 下載安裝

在192.168.175.21和192.168.175.22上分別安裝mysql,具體安裝流程可參考文章:Linux-安裝MySQL.

2. 創(chuàng)建canal賬戶

在創(chuàng)建root賬號(hào)并設(shè)置遠(yuǎn)程訪問(wèn)之后桨仿,接著創(chuàng)建canal賬號(hào)并設(shè)置遠(yuǎn)程訪問(wèn)和權(quán)限:

mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT ALL ON canal.* TO 'canal'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%';
mysql>FLUSH PRIVILEGES;
3. 驗(yàn)證登錄
#遠(yuǎn)程登錄
mysql -h 192.168.175.22 -P 3306 -u canal -pcanal

#本地登錄
mysql -ucanal -pcanal
4. 修改my.cnf配置

分別在175.21和175.22兩臺(tái)服務(wù)器修改my.conf配置,查找my.cnf配置位置命令:whereis my.

192.168.175.21中的my.cnf配置新增如下內(nèi)容:

log_bin=mysql-bin  #指定bin-log的名稱,盡量可以標(biāo)識(shí)業(yè)務(wù)含義
binlog_format=row  #選擇row模式,必須!!!
server_id=1  #mysql服務(wù)器id

2.2 canal server配置與啟動(dòng)

1. 下載canal

下載地址: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

2.上傳并解壓

進(jìn)入192.168.175.20服務(wù)器,使用rz命令上傳,使用如下命令進(jìn)行解壓至/usr/local/hadoop/app/canal:

tar xzvf canal.deployer-1.1.3.tar.gz -C canal
3. 修改配置

新解壓的文件夾/usr/local/hadoop/app/canal/conf/有一個(gè)example文件夾,一個(gè)example就代表一個(gè)instance實(shí)例.而一個(gè)instance實(shí)例就是一個(gè)消息隊(duì)列,所以這里可以將文件名改為example1,同時(shí)再?gòu)?fù)制出來(lái)一個(gè)叫example2.(命名可以使用監(jiān)聽(tīng)的數(shù)據(jù)庫(kù)名)

修改/usr/local/hadoop/app/canal/conf/example1/instance.properties配置文件:

canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1

修改/usr/local/hadoop/app/canal/conf/example2/instance.properties配置文件:

canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2

配置文件參數(shù)說(shuō)明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide

4. 啟動(dòng)canal server

進(jìn)入文件夾/usr/local/hadoop/app/canal/bin執(zhí)行如下命令:

./startup.sh

查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出現(xiàn)如下內(nèi)容,即表示啟動(dòng)成功:

2019-06-07 21:15:03.372 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
5. 啟動(dòng)canal client

注意運(yùn)行canal客戶端代碼時(shí),一定要先啟動(dòng)canal server!!!

(1) 添加pom依賴

    <!--canal-->
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.3</version>
    </dependency>

(2) canal client代碼:

package com.xgh.canal;


import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClientTest {

    public static void main(String args[]) {
        // 創(chuàng)建鏈接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.20", 11111),
                "example1", "", "");//或者example2
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");//訂閱所有庫(kù)下面的所有表
            //connector.subscribe("canal.t_canal");//訂閱庫(kù)canal庫(kù)下的表t_canal
            connector.rollback();
            int totalEmtryCount = 1200;
            while (emptyCount < totalEmtryCount) {//實(shí)際生產(chǎn)中需要設(shè)置為true,死循環(huán)
                Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);//此時(shí)代表當(dāng)前數(shù)據(jù)庫(kù)無(wú)遍更數(shù)據(jù)
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交確認(rèn)
                // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>"+rowChage.toString());

            EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱
                    entry.getHeader().getLogfileOffset(), //偏移量
                    entry.getHeader().getSchemaName(),//庫(kù)名
                    entry.getHeader().getTableName(), //表名
                    eventType));//事件名

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

canal client運(yùn)行實(shí)例:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
6. 觸發(fā)數(shù)據(jù)庫(kù)變更

創(chuàng)建庫(kù):create database canal;
創(chuàng)建表:create table t_canal (id int,name varchar(20),status int);
插入數(shù)據(jù):insert into t_canal values(10,'hello',1);

canal client輸出日志:

================> binlog[mysql-bin.000001:6764] , name[canal,t_canal] , eventType : INSERT
id : 10    update=true
name : hello    update=true
status : 1    update=true

三. 自問(wèn)自答-為何設(shè)置了數(shù)據(jù)表的過(guò)濾條件锡足,但貌似沒(méi)有生效波丰?

:首先看文檔AdminGuide,了解canal.instance.filter.regex的書寫格式舶得。mysql 數(shù)據(jù)解析關(guān)注的表,Perl正則表達(dá)式.多個(gè)正則之間以逗號(hào)(,)分隔爽蝴,轉(zhuǎn)義符需要雙斜杠(\)
常見(jiàn)例子:

  1. 所有表:.* or .\..
  2. canal schema下所有表: canal\..*
  3. canal下的以canal打頭的表:canal\.canal.*
  4. canal schema下的一張表:canal.test1
  5. 多個(gè)規(guī)則組合使用:canal\..*,mysql.test1,mysql.test2 (逗號(hào)分隔)

檢查binlog格式沐批,過(guò)濾條件只針對(duì)row模式的數(shù)據(jù)有效(ps. mixed/statement因?yàn)椴唤馕鰏ql,所以無(wú)法準(zhǔn)確提取tableName進(jìn)行過(guò)濾)蝎亚。

檢查下CanalConnector是否調(diào)用subscribe(filter)方法九孩;有的話,filter需要和instance.properties的canal.instance.filter.regex一致发框,否則subscribe的filter會(huì)覆蓋instance的配置躺彬,如果subscribe的filter是...,那么相當(dāng)于你消費(fèi)了所有的更新數(shù)據(jù) 【特別注意

參考文章:
  1. https://www.cnblogs.com/jayinnn/p/9606466.html
  2. https://github.com/alibaba/canal
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末梅惯,一起剝皮案震驚了整個(gè)濱河市宪拥,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌铣减,老刑警劉巖她君,帶你破解...
    沈念sama閱讀 212,816評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場(chǎng)離奇詭異葫哗,居然都是意外死亡缔刹,警方通過(guò)查閱死者的電腦和手機(jī),發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,729評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門劣针,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái)校镐,“玉大人,你說(shuō)我怎么就攤上這事捺典∧窭” “怎么了?”我有些...
    開(kāi)封第一講書人閱讀 158,300評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵辣苏,是天一觀的道長(zhǎng)肝箱。 經(jīng)常有香客問(wèn)我,道長(zhǎng)稀蟋,這世上最難降的妖魔是什么煌张? 我笑而不...
    開(kāi)封第一講書人閱讀 56,780評(píng)論 1 285
  • 正文 為了忘掉前任,我火速辦了婚禮退客,結(jié)果婚禮上骏融,老公的妹妹穿的比我還像新娘链嘀。我一直安慰自己,他們只是感情好档玻,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,890評(píng)論 6 385
  • 文/花漫 我一把揭開(kāi)白布怀泊。 她就那樣靜靜地躺著,像睡著了一般误趴。 火紅的嫁衣襯著肌膚如雪霹琼。 梳的紋絲不亂的頭發(fā)上,一...
    開(kāi)封第一講書人閱讀 50,084評(píng)論 1 291
  • 那天凉当,我揣著相機(jī)與錄音枣申,去河邊找鬼。 笑死看杭,一個(gè)胖子當(dāng)著我的面吹牛忠藤,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播楼雹,決...
    沈念sama閱讀 39,151評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼模孩,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來(lái)了贮缅?” 一聲冷哼從身側(cè)響起榨咐,我...
    開(kāi)封第一講書人閱讀 37,912評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎携悯,沒(méi)想到半個(gè)月后祭芦,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 44,355評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡憔鬼,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,666評(píng)論 2 327
  • 正文 我和宋清朗相戀三年龟劲,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片轴或。...
    茶點(diǎn)故事閱讀 38,809評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡昌跌,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出照雁,到底是詐尸還是另有隱情蚕愤,我是刑警寧澤,帶...
    沈念sama閱讀 34,504評(píng)論 4 334
  • 正文 年R本政府宣布饺蚊,位于F島的核電站萍诱,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏污呼。R本人自食惡果不足惜裕坊,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,150評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望燕酷。 院中可真熱鬧籍凝,春花似錦周瞎、人聲如沸。這莊子的主人今日做“春日...
    開(kāi)封第一講書人閱讀 30,882評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)。三九已至退盯,卻和暖如春彼乌,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背渊迁。 一陣腳步聲響...
    開(kāi)封第一講書人閱讀 32,121評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工囤攀, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人宫纬。 一個(gè)月前我還...
    沈念sama閱讀 46,628評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像膏萧,于是被迫代替她去往敵國(guó)和親漓骚。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,724評(píng)論 2 351

推薦閱讀更多精彩內(nèi)容