1 canal application architecture design
Components:
- Linux kernel version (CentOS Linux 7), from the command uname -a:
Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
- MySQL version (SQL command: select version(); or status):
Server version: 5.6.43-log MySQL Community Server (GPL)
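- canal version: canal-1.1.3
- JDK version: 1.8
How canal works:
- canal emulates the MySQL slave interaction protocol: it poses as a MySQL slave and sends a dump request to the MySQL master.
- The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to canal).
- canal parses the binary log objects (originally a raw byte stream).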
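The three steps above can be pictured with a toy model. This is only a sketch of the flow, not of the real wire protocol: ToyMaster, dump and replicate are hypothetical names that exist in neither canal nor MySQL, and the "binlog" here is just a list of UTF-8 strings.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: none of these types exist in canal or MySQL.
class DumpProtocolSketch {

    /** Stand-in for a MySQL master that keeps an ordered binlog of raw byte events. */
    static class ToyMaster {
        private final List<byte[]> binlog = new ArrayList<>();

        void write(String event) {
            binlog.add(event.getBytes(StandardCharsets.UTF_8));
        }

        /** Step 2: on a dump request, push every event from the given position onward. */
        void dump(int fromPosition, Consumer<byte[]> slave) {
            for (int i = fromPosition; i < binlog.size(); i++) {
                slave.accept(binlog.get(i));
            }
        }
    }

    /** Steps 1 and 3: pose as a slave, request a dump, and parse the byte stream. */
    static List<String> replicate(ToyMaster master, int fromPosition) {
        List<String> parsed = new ArrayList<>();
        master.dump(fromPosition, raw -> parsed.add(new String(raw, StandardCharsets.UTF_8)));
        return parsed;
    }

    public static void main(String[] args) {
        ToyMaster master = new ToyMaster();
        master.write("INSERT canal.t_canal id=10");
        master.write("UPDATE canal.t_canal id=10");
        // Resume from position 1, the way a slave resumes from a saved binlog offset.
        System.out.println(replicate(master, 1));
    }
}
```

The point of the model is the direction of data flow: the slave only asks once, with a starting position, and the master pushes everything after it.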
For more details, see the article: [Understanding canal: this one article is enough]
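2 Implementing the architecture
2.1 MySQL installation and configuration
1. Download and install
Install MySQL on both 192.168.175.21 and 192.168.175.22. For the installation procedure, see the article: Linux: Installing MySQL.
2. Create the canal account
After creating the root account and enabling remote access for it, create the canal account and grant it remote access and privileges: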
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT ALL ON canal.* TO 'canal'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
3. Verify login
# Remote login
mysql -h 192.168.175.22 -P 3306 -u canal -pcanal
# Local login
mysql -ucanal -pcanal
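4. Modify the my.cnf configuration
Edit my.cnf on both servers, 175.21 and 175.22. To find where my.cnf lives, use the command: whereis my.
Add the following to my.cnf on 192.168.175.21:
log_bin=mysql-bin #name of the bin-log; preferably something that identifies the business purpose
binlog_format=row #use row mode; this is mandatory!
server_id=1 #MySQL server ID; must be unique per server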
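Before starting canal it can help to check that the three settings above are really present. The following is a minimal sketch, assuming the my.cnf text has already been read into a string; BinlogConfigCheck and its methods are hypothetical helper names, not part of MySQL or canal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BinlogConfigCheck {

    /** Parses key=value lines, dropping '#' comments and [section] headers. */
    static Map<String, String> parse(String myCnf) {
        Map<String, String> settings = new HashMap<>();
        for (String line : myCnf.split("\\R")) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            int eq = line.indexOf('=');
            if (line.isEmpty() || line.startsWith("[") || eq < 0) {
                continue;
            }
            // MySQL accepts both log-bin and log_bin, so normalise dashes.
            String key = line.substring(0, eq).trim().replace('-', '_');
            settings.put(key, line.substring(eq + 1).trim());
        }
        return settings;
    }

    /** Returns a human-readable list of missing or wrong settings; empty means OK. */
    static List<String> problems(String myCnf) {
        Map<String, String> s = parse(myCnf);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("log_bin")) {
            problems.add("log_bin is not set");
        }
        if (!"row".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog_format must be row");
        }
        if (!s.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog_bin=mysql-bin\nbinlog_format=row\nserver_id=1\n";
        System.out.println(problems(cnf));
    }
}
```

This only inspects the file. The values MySQL is actually running with can differ until the server has been restarted.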
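2.2 canal server configuration and startup
1. Download canal
Download URL: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
2. Upload and extract
Log in to server 192.168.175.20, upload the archive with the rz command, and extract it into /usr/local/hadoop/app/canal with the following command (run it from /usr/local/hadoop/app; the canal directory must already exist):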
tar xzvf canal.deployer-1.1.3.tar.gz -C canal
3. Modify the configuration
The newly extracted folder /usr/local/hadoop/app/canal/conf/ contains an example folder. Each such folder represents one instance, and each instance is one message queue, so here you can rename the folder to example1 and make a copy named example2. (An instance can also be named after the database it listens to.)
Edit the configuration file /usr/local/hadoop/app/canal/conf/example1/instance.properties:
canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1
Edit the configuration file /usr/local/hadoop/app/canal/conf/example2/instance.properties:
canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2
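Since the two instance files are near-copies, it is easy to leave both pointing at the same master. A small sketch of a sanity check follows, using only java.util.Properties; InstanceConfigSketch and independent are hypothetical names, and the file contents are inlined as strings for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class InstanceConfigSketch {

    /** Loads instance.properties content; spaces around '=' are ignored by Properties. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** True when the two instances read from different masters and use different topics. */
    static boolean independent(Properties a, Properties b) {
        String addr = "canal.instance.master.address";
        String topic = "canal.mq.topic";
        return !a.getProperty(addr, "").equals(b.getProperty(addr, ""))
                && !a.getProperty(topic, "").equals(b.getProperty(topic, ""));
    }

    public static void main(String[] args) {
        Properties example1 = load("canal.instance.master.address=192.168.175.21:3306\n"
                + "canal.instance.connectionCharset = UTF-8\n"
                + "canal.mq.topic=example1\n");
        Properties example2 = load("canal.instance.master.address=192.168.175.22:3306\n"
                + "canal.instance.connectionCharset = UTF-8\n"
                + "canal.mq.topic=example2\n");
        System.out.println(example1.getProperty("canal.instance.connectionCharset"));
        System.out.println(independent(example1, example2));
    }
}
```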
For a description of the configuration parameters, see: https://github.com/alibaba/canal/wiki/AdminGuide
4. Start the canal server
Go to the folder /usr/local/hadoop/app/canal/bin and run the following command:
./startup.sh
Check the log /usr/local/hadoop/app/canal/logs/canal/canal.log. If it contains the following, the server started successfully:
2019-06-07 21:15:03.372 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
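The check above can be automated by scanning the log for the final "running now" line. A minimal sketch, assuming the message text stays as shown in the log above; StartupLogCheck is a hypothetical helper, not something shipped with canal.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

class StartupLogCheck {

    // Marker taken from the last line of the sample log above.
    static final String RUNNING_MARKER = "the canal server is running now";

    /** True if any log line reports that the canal server is running. */
    static boolean started(List<String> logLines) {
        return logLines.stream().anyMatch(line -> line.contains(RUNNING_MARKER));
    }

    public static void main(String[] args) throws IOException {
        // Pass the path to canal.log as the first argument, or fall back to a sample.
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : Arrays.asList("[main] INFO CanalStater - ## the canal server is running now ......");
        System.out.println(started(lines) ? "canal server started" : "not started yet");
    }
}
```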
5. Start the canal client
Note: the canal server must be started before you run the canal client code!
(1) Add the pom dependency
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
(2) canal client code:
package com.xgh.canal;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalClientTest {

    public static void main(String[] args) {
        // Create the connection; the destination can be example1 or example2
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.175.20", 11111), "example1", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to all tables in all databases
            // connector.subscribe("canal.t_canal"); // subscribe only to table t_canal in database canal
            connector.rollback();
            int totalEmptyCount = 1200;
            // In production, loop forever (while (true)) instead of exiting after idle polls
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    // No changed data in the database at the moment
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll the batch back
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChange ======>" + rowChange.toString());
            EventType eventType = rowChange.getEventType(); // event type, e.g. insert, update, delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),   // binlog file name (the log-bin name from my.cnf)
                    entry.getHeader().getLogfileOffset(), // offset
                    entry.getHeader().getSchemaName(),    // database name
                    entry.getHeader().getTableName(),     // table name
                    eventType));                          // event type
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
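The client above always acks and leaves the rollback call commented out. One way to wire the two together is sketched below. To stay runnable without a canal server it uses a hypothetical BatchSource interface as a stand-in for the getWithoutAck, ack and rollback calls of CanalConnector; RecordingSource is an in-memory fake for demonstration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class AckRollbackSketch {

    /** Hypothetical stand-in for the CanalConnector calls used by the client. */
    interface BatchSource {
        long fetch();                 // like getWithoutAck: a batch id, or -1 when empty
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Handles one batch: ack on success, rollback on failure. Returns true if acked. */
    static boolean processOnce(BatchSource source, LongConsumer handler) {
        long batchId = source.fetch();
        if (batchId == -1) {
            return false; // nothing to process; this sketch simply skips empty polls
        }
        try {
            handler.accept(batchId);
            source.ack(batchId);
            return true;
        } catch (RuntimeException e) {
            // Processing failed, so ask for the same batch to be delivered again.
            source.rollback(batchId);
            return false;
        }
    }

    /** In-memory fake that records which calls were made. */
    static class RecordingSource implements BatchSource {
        final List<String> calls = new ArrayList<>();
        private final long nextBatchId;

        RecordingSource(long nextBatchId) {
            this.nextBatchId = nextBatchId;
        }

        public long fetch() {
            return nextBatchId;
        }

        public void ack(long batchId) {
            calls.add("ack " + batchId);
        }

        public void rollback(long batchId) {
            calls.add("rollback " + batchId);
        }
    }

    public static void main(String[] args) {
        RecordingSource ok = new RecordingSource(7);
        processOnce(ok, id -> System.out.println("handled batch " + id));
        RecordingSource failing = new RecordingSource(8);
        processOnce(failing, id -> { throw new IllegalStateException("sink unavailable"); });
        System.out.println(ok.calls + " " + failing.calls);
    }
}
```

With a real connector the same shape applies: ack only after the entries have been handled, so a crash or exception in between leaves the batch unacknowledged.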
Sample canal client output:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
6. Trigger a database change
Create the database: create database canal;
Create the table: create table t_canal (id int,name varchar(20),status int);
Insert data: insert into t_canal values(10,'hello',1);
canal client log output:
================> binlog[mysql-bin.000001:6764] , name[canal,t_canal] , eventType : INSERT
id : 10 update=true
name : hello update=true
status : 1 update=true
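3 Q&A: Why does the table filter I configured seem to have no effect?
A: First read the AdminGuide to learn the format of canal.instance.filter.regex. It names the tables whose MySQL data canal should parse, written as Perl regular expressions. Multiple expressions are separated by commas (,), and the escape character must be written as a double backslash (\\).
Common examples:
- All tables: .* or .*\\..*
- All tables in the canal schema: canal\\..*
- Tables in the canal schema whose names start with canal: canal\\.canal.*
- A single table in the canal schema: canal.test1
- Several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
Check the binlog format: the filter only works for row-mode data. (P.S. For mixed/statement, canal does not parse the SQL, so it cannot reliably extract the tableName to filter on.)
Check whether CanalConnector calls subscribe(filter). If it does, the filter must match canal.instance.filter.regex in instance.properties; otherwise the subscribe filter overrides the instance configuration. If the subscribe filter is .*\\..*, you are effectively consuming all change data. [Take special note]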
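To see what a filter expression accepts, it can be tried out locally. The sketch below approximates the matching with java.util.regex and a full match against schema.table; canal's own matcher may differ in details such as case handling, and FilterRegexSketch is a hypothetical helper. It also shows why the backslash is doubled in instance.properties: java.util.Properties collapses \\ to a single \ when loading.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

class FilterRegexSketch {

    /** True if schema.table fully matches any of the comma-separated expressions. */
    static boolean accepts(String filter, String schemaDotTable) {
        for (String expr : filter.split(",")) {
            if (Pattern.matches(expr.trim(), schemaDotTable)) {
                return true;
            }
        }
        return false;
    }

    /** Reads canal.instance.filter.regex the way a properties file would be read. */
    static String filterFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("canal.instance.filter.regex");
    }

    public static void main(String[] args) {
        // The file content is canal\\..*,mysql.test1 (the Java literal doubles each backslash again).
        String filter = filterFrom("canal.instance.filter.regex=canal\\\\..*,mysql.test1\n");
        System.out.println("effective regex: " + filter);
        for (String table : new String[] {"canal.t_canal", "mysql.test1", "mysql.user"}) {
            System.out.println(table + " -> " + accepts(filter, table));
        }
    }
}
```

Passing the same string to subscribe(filter) in the client keeps the two filters consistent, which avoids the override described above.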