1.canal簡介
canal可以用來監(jiān)控數(shù)據(jù)庫數(shù)據(jù)的變化颂暇,從而獲得新增占哟、修改的數(shù)據(jù)捶箱。
原理相對比較簡單:
(1)canal模擬mysql slave的交互協(xié)議坑鱼,偽裝自己為mysql slave膘流,向mysql master發(fā)送dump協(xié)議
(2) mysql master收到dump請求,開始推送binary log給slave(也就是canal)
(3) canal解析binary log對象(原始為byte流)
2.環(huán)境部署
(1)mysql開啟binlog模式
查看當(dāng)前mysql是否開啟binlog模式鲁沥,如果log_bin的值為OFF是未開啟睡扬,為ON是已開啟。
SHOW VARIABLES LIKE '%log_bin%'
修改/etc/my.cnf 需要開啟binlog模式黍析。(修改完成之后,重啟mysqld的服務(wù)屎开。)
log-bin=mysql-bin
binlog-format=ROW
server_id=1
進(jìn)入mysql
mysql -h localhost -u root -p
創(chuàng)建賬號 用于測試使用(使用root賬號創(chuàng)建用戶并授予權(quán)限)
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
3.canal服務(wù)端安裝配置
(1)下載地址canal
https://github.com/alibaba/canal/releases/tag/canal-1.0.24
(2)下載之后 上傳到linux系統(tǒng)中阐枣,解壓縮到指定的目錄/usr/local/canal
解壓縮之后的目錄結(jié)構(gòu)如下:
(3)修改 exmaple下的實例配置(修改如圖所示的幾個參數(shù))
vi conf/example/instance.properties
(4)指定讀取位置
進(jìn)入mysql中執(zhí)行下面語句查看binlog所在位置
如果fifile中binlog文件不為 mysql-bin.000001 可以重置mysql
mysql> reset master;
查看canal配置文件
vim /usr/local/canal/conf/example/meta.dat
找到對應(yīng)的binlog信息更改一致即可
"journalName":"mysql-bin.000001","position":120,"
注意:如果不一致,可能導(dǎo)致以下錯誤
c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111],exception=java.io.IOException: Connection reset by peer
(5)啟動服務(wù):
[root@localhost canal]# ./bin/startup.sh
(6)查看日志:
cat /usr/local/canal/logs/canal/canal.log
4.數(shù)據(jù)監(jiān)控微服務(wù)
當(dāng)用戶執(zhí)行數(shù)據(jù)庫的操作的時候蔼两,binlog日志會被canal捕獲到,并解析出數(shù)據(jù)逞度。我們就可以將解析出來的數(shù)據(jù)進(jìn)行相應(yīng)的邏輯處理额划。
https://github.com/chenqian56131/spring-boot-starter-canal
以上開源項目,實現(xiàn)了springboot與canal的集成档泽。比原生的canal更加優(yōu)雅俊戳。
使用前需要將starter-canal安裝到本地倉庫。我們可以參照它提供的canal-test馆匿,進(jìn)行代碼實現(xiàn)抑胎。
(1)創(chuàng)建工程模塊changgou_canal,pom引入依賴
<dependency>
????<groupId>com.xpand</groupId>
????<artifactId>starter-canal</artifactId>
????<version>0.0.1-SNAPSHOT</version>
</dependency>
(2)創(chuàng)建包com.changgou.canal 渐北,包下創(chuàng)建啟動類
@SpringBootApplication
@EnableCanalClient //聲明當(dāng)前的服務(wù)是canal的客戶端
public class CanalApplication {
????public static void main(String[] args) {
????????SpringApplication.run(CanalApplication.class,args);
????}
}
(3)添加配置文件application.properties
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128
(4)創(chuàng)建com.changgou.canal.listener包阿逃,包下創(chuàng)建類
@CanalEventListener //聲明當(dāng)前的類是canal的監(jiān)聽類
public class BusinessListener {
????@Autowired
????private RabbitTemplate rabbitTemplate;
????/**
????*
????* @param eventType 當(dāng)前操作數(shù)據(jù)庫的類型
????* @param rowData 當(dāng)前操作數(shù)據(jù)庫的數(shù)據(jù)
????*/
????@ListenPoint(schema = "business",table = "tb_ad")
????public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){
????????System.out.println("廣告表數(shù)據(jù)發(fā)生改變");
????????//獲取改變之前的數(shù)據(jù)
????????rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改變前的數(shù)據(jù):"+c.getName()+"::"+c.getValue()));
????????//獲取改變之后的數(shù)據(jù)
????????rowData.getAfterColumnsList().forEach((c)-> System.out.println("改變之后的
????????數(shù)據(jù):"+c.getName()+"::"+c.getValue()));
????}
}
測試:啟動數(shù)據(jù)監(jiān)控微服務(wù),修改business的tb_ad表,觀察控制臺輸出恃锉。