前言
Canal用途很廣鱼冀,并且上手非常簡單报破,小伙伴們在平時完成公司的需求時,很有可能會用到千绪。
舉個例子:
公司目前有多個開發(fā)人員正在開發(fā)一套服務(wù)充易,為了縮短調(diào)用延時,對部分接口數(shù)據(jù)加入了緩存荸型。一旦這些數(shù)據(jù)在數(shù)據(jù)庫中進(jìn)行了更新操作盹靴,緩存就成了舊數(shù)據(jù),必須及時刪除瑞妇。
刪除緩存的代碼理所當(dāng)然可以寫在更新數(shù)據(jù)的業(yè)務(wù)代碼里稿静,但有時候者寫操作是在別的項(xiàng)目代碼里,你可能無權(quán)修改辕狰,亦或者別人不愿你在他代碼里寫這種業(yè)務(wù)之外的代碼改备。(畢竟多人協(xié)作中間會產(chǎn)生各種配合問題)。又或者就是單純的刪除緩存的操作失敗了蔓倍,緩存依然是舊數(shù)據(jù)悬钳。
正如上篇文章緩存與數(shù)據(jù)庫雙寫一致性實(shí)戰(zhàn)里面所說,我們可以將緩存更新操作完全獨(dú)立出來偶翅,形成一套單獨(dú)的系統(tǒng)默勾。Canal正是這么一個很好的幫手。 能幫我們實(shí)現(xiàn)像下圖這樣的系統(tǒng):
本篇文章的要點(diǎn)如下:
- Canal是什么
- Canal工作原理
- 數(shù)據(jù)庫的讀寫分離
- 數(shù)據(jù)庫主從同步
- 數(shù)據(jù)庫主從同步一致性問題
- 異步復(fù)制
- 全同步復(fù)制
- 半同步復(fù)制
- Canal實(shí)戰(zhàn)
- 開啟MySQL Binlog
- 配置Canal服務(wù)
- 運(yùn)行Canal服務(wù)
- Java客戶端Demo
阿里開源MySQL中間件Canal快速入門
Canal是什么
眾所周知聚谁,阿里是國內(nèi)比較早地大量使用MySQL的互聯(lián)網(wǎng)企業(yè)(去IOE化:去掉IBM的小型機(jī)母剥、Oracle數(shù)據(jù)庫、EMC存儲設(shè)備,代之以自己在開源軟件基礎(chǔ)上開發(fā)的系統(tǒng))媳搪,并且基于阿里巴巴/淘寶的業(yè)務(wù)铭段,從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步秦爆,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)序愚。
Canal應(yīng)運(yùn)而生,它通過偽裝成數(shù)據(jù)庫的從庫等限,讀取主庫發(fā)來的binlog爸吮,用來實(shí)現(xiàn)數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)需求。
Canal用途:
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實(shí)時備份
- 索引構(gòu)建和實(shí)時維護(hù)(拆分異構(gòu)索引望门、倒排索引等)
- 業(yè)務(wù) cache 緩存刷新
- 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
開源項(xiàng)目地址:
https://github.com/alibaba/canal
在這里就不再摘抄項(xiàng)目簡介了形娇,提煉幾個值得注意的點(diǎn):
- canal 使用 client-server 模式,數(shù)據(jù)傳輸協(xié)議使用 protobuf 3.0(很多RPC框架也在使用例如gRPC)
- 當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
- canal 作為 MySQL binlog 增量獲取和解析工具筹误,可將變更記錄投遞到 MQ 系統(tǒng)中桐早,比如 Kafka/RocketMQ。
Canal工作原理
Canal實(shí)際是將自己偽裝成數(shù)據(jù)庫的從庫厨剪,來讀取Binlog哄酝。我們先補(bǔ)習(xí)下關(guān)于MySQL數(shù)據(jù)庫主從數(shù)據(jù)庫的基礎(chǔ)知識,這樣就能更快的理解Canal祷膳。
數(shù)據(jù)庫的讀寫分離
為了應(yīng)對高并發(fā)場景陶衅,MySQL支持把一臺數(shù)據(jù)庫主機(jī)分為單獨(dú)的一臺寫主庫(主要負(fù)責(zé)寫操作)壹瘟,而把讀的數(shù)據(jù)庫壓力分配給讀的從庫冲九,而且讀從庫可以變?yōu)槎嗯_,這就是讀寫分離的典型場景缩歪。
數(shù)據(jù)庫主從同步
實(shí)現(xiàn)數(shù)據(jù)庫的讀寫分離勇皇,是通過數(shù)據(jù)庫主從同步罩句,讓從數(shù)據(jù)庫監(jiān)聽主數(shù)據(jù)庫Binlog實(shí)現(xiàn)的。大體流程如下圖:
MySQL master 將數(shù)據(jù)變更寫入二進(jìn)制日志( binary log, 其中記錄叫做二進(jìn)制日志事件binary log events敛摘,可以通過 show binlog events 進(jìn)行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
MySQL slave 重放 relay log 中事件的止,將數(shù)據(jù)變更反映它自己的數(shù)據(jù)
詳細(xì)主從同步原理在這里就不展開細(xì)說了。
可以看到着撩,這種架構(gòu)下會有一個問題,數(shù)據(jù)庫主從同步會存在延遲匾委,那么就會有短暫的時間拖叙,主從數(shù)據(jù)庫的數(shù)據(jù)是不一致的。
這種不一致大多數(shù)情況下非常短暫赂乐,很多時候我們可以忽略他薯鳍。
但一旦要求數(shù)據(jù)一致,就會引申出如何解決這個問題的思考。
數(shù)據(jù)庫主從同步一致性問題
我們通常使用MySQL主從復(fù)制來解決MySQL的單點(diǎn)故障問題挖滤,其通過邏輯復(fù)制的方式把主庫的變更同步到從庫崩溪,主備之間無法保證嚴(yán)格一致的模式,
于是斩松,MySQL的主從復(fù)制帶來了主從“數(shù)據(jù)一致性”的問題伶唯。MySQL的復(fù)制分為:異步復(fù)制、半同步復(fù)制惧盹、全同步復(fù)制乳幸。
異步復(fù)制
MySQL默認(rèn)的復(fù)制即是異步復(fù)制,主庫在執(zhí)行完客戶端提交的事務(wù)后會立即將結(jié)果返給給客戶端钧椰,并不關(guān)心從庫是否已經(jīng)接收并處理粹断,這樣就會有一個問題,主如果crash掉了嫡霞,此時主上已經(jīng)提交的事務(wù)可能并沒有傳到從庫上瓶埋,如果此時,強(qiáng)行將從提升為主诊沪,可能導(dǎo)致新主上的數(shù)據(jù)不完整养筒。
主庫將事務(wù) Binlog 事件寫入到 Binlog 文件中,此時主庫只會通知一下 Dump 線程發(fā)送這些新的 Binlog娄徊,然后主庫就會繼續(xù)處理提交操作闽颇,而此時不會保證這些 Binlog 傳到任何一個從庫節(jié)點(diǎn)上。
全同步復(fù)制
指當(dāng)主庫執(zhí)行完一個事務(wù)寄锐,所有的從庫都執(zhí)行了該事務(wù)才返回給客戶端兵多。因?yàn)樾枰却袕膸靾?zhí)行完該事務(wù)才能返回,所以全同步復(fù)制的性能必然會收到嚴(yán)重的影響橄仆。
當(dāng)主庫提交事務(wù)之后剩膘,所有的從庫節(jié)點(diǎn)必須收到、APPLY并且提交這些事務(wù)盆顾,然后主庫線程才能繼續(xù)做后續(xù)操作怠褐。但缺點(diǎn)是,主庫完成一個事務(wù)的時間會被拉長您宪,性能降低奈懒。
半同步復(fù)制
是介于全同步復(fù)制與全異步復(fù)制之間的一種,主庫只需要等待至少一個從庫節(jié)點(diǎn)收到并且 Flush Binlog 到 Relay Log 文件即可宪巨,主庫不需要等待所有從庫給主庫反饋磷杏。同時,這里只是一個收到的反饋捏卓,而不是已經(jīng)完全完成并且提交的反饋极祸,如此,節(jié)省了很多時間。
介于異步復(fù)制和全同步復(fù)制之間遥金,主庫在執(zhí)行完客戶端提交的事務(wù)后不是立刻返回給客戶端浴捆,而是等待至少一個從庫接收到并寫到relay log中才返回給客戶端。相對于異步復(fù)制稿械,半同步復(fù)制提高了數(shù)據(jù)的安全性选泻,同時它也造成了一定程度的延遲,這個延遲最少是一個TCP/IP往返的時間溜哮。所以滔金,半同步復(fù)制最好在低延時的網(wǎng)絡(luò)中使用。
事實(shí)上茂嗓,半同步復(fù)制并不是嚴(yán)格意義上的半同步復(fù)制餐茵,MySQL半同步復(fù)制架構(gòu)中,主庫在等待備庫ack時候述吸,如果超時會退化為異步后忿族,也可能導(dǎo)致“數(shù)據(jù)不一致”。
當(dāng)半同步復(fù)制發(fā)生超時時(由rpl_semi_sync_master_timeout參數(shù)控制蝌矛,單位是毫秒道批,默認(rèn)為10000,即10s)入撒,會暫時關(guān)閉半同步復(fù)制隆豹,轉(zhuǎn)而使用異步復(fù)制。當(dāng)master dump線程發(fā)送完一個事務(wù)的所有事件之后茅逮,如果在rpl_semi_sync_master_timeout內(nèi)璃赡,收到了從庫的響應(yīng),則主從又重新恢復(fù)為半同步復(fù)制献雅。
關(guān)于半同步復(fù)制的詳細(xì)原理分析可以看這篇引申文章碉考,在此不展開:
https://www.cnblogs.com/ivictor/p/5735580.html
回到Canal的工作原理
回顧了數(shù)據(jù)庫從庫的數(shù)據(jù)同步原理,理解Canal十分簡單挺身,直接引用官網(wǎng)原文:
- canal 模擬 MySQL slave 的交互協(xié)議侯谁,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
- MySQL master 收到 dump 請求章钾,開始推送 binary log 給 slave (即 canal )
- canal 解析 binary log 對象(原始為 byte 流)
Canal實(shí)戰(zhàn)
開啟MySQL Binlog
這個步驟我在之前的文章教你使用Binlog日志恢復(fù)誤刪的MySQL數(shù)據(jù)已經(jīng)提到過墙贱,這里完善了一下,再貼一下贱傀,方便大家惨撇。
首先進(jìn)入數(shù)據(jù)庫控制臺,運(yùn)行指令:
mysql> show variables like'log_bin%';
+---------------------------------+-------+
| Variable_name | Value |
+---------------------------------+-------+
| log_bin | OFF |
| log_bin_basename | |
| log_bin_index | |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
+---------------------------------+-------+
5 rows in set (0.00 sec)
可以看到我們的binlog是關(guān)閉的窍箍,都是OFF。接下來我們需要修改Mysql配置文件,執(zhí)行命令:
sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf
在文件末尾添加:
log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
保存文件椰棘,重啟mysql服務(wù):
sudo service mysql restart
重啟完成后纺棺,查看下mysql的狀態(tài):
systemctl status mysql.service
這時,如果你的mysql版本在5.7或更高版本邪狞,就會報(bào)錯:
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: table_open_cache: 431 (requested 2000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.361395Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log) starting as process 5930 ...
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363747Z 0 [ERROR] Aborting
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.364108Z 0 [Note] /usr/sbin/mysqld: Shutdown complete
Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE
You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
之前我們的配置祷蝌,對于5.7以下版本應(yīng)該是可以的。但對于高版本帆卓,我們需要指定server-id巨朦。
我們給這個MySQL指定為2(只要不與其他庫id重復(fù)):
server-id=2
創(chuàng)建數(shù)據(jù)庫Canal使用賬號
mysql> select user, host from user;
+------------------+-----------+
| user | host |
+------------------+-----------+
| root | % |
| debian-sys-maint | localhost |
| mysql.session | localhost |
| mysql.sys | localhost |
| root | localhost |
+------------------+-----------+
5 rows in set
CREATE USER canal IDENTIFIED BY 'xxxx'; (填寫密碼)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
show grants for 'canal'
配置Canal服務(wù)
去Github下載最近的Canal穩(wěn)定版本包:
解壓縮:
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
配置文件設(shè)置:
主要有兩個文件配置,一個是conf/canal.properties
一個是conf/example/instance.properties
剑令。
為了快速運(yùn)行Demo糊啡,只修改conf/example/instance.properties
里的數(shù)據(jù)庫連接賬號密碼即可
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxxxxx
canal.instance.connectionCharset = UTF-8
運(yùn)行Canal服務(wù)
請先確保機(jī)器上有JDK,接著運(yùn)行Canal啟動腳本:
sh bin/startup.sh
下圖即成功運(yùn)行:
Java客戶端代碼
我在秒殺系統(tǒng)系列文章的代碼倉庫里(miaosha-job)編寫了如下客戶端代碼
倉庫源碼地址:https://github.com/qqxx6661/miaosha
package job;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class CanalClient {
private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);
public static void main(String[] args) {
// 第一步:與canal進(jìn)行連接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
connector.connect();
// 第二步:開啟訂閱
connector.subscribe();
// 第三步:循環(huán)訂閱
while (true) {
try {
// 每次讀取 1000 條
Message message = connector.getWithoutAck(1000);
long batchID = message.getId();
int size = message.getEntries().size();
if (batchID == -1 || size == 0) {
LOGGER.info("當(dāng)前暫時沒有數(shù)據(jù)吁津,休眠1秒");
Thread.sleep(1000);
} else {
LOGGER.info("-------------------------- 有數(shù)據(jù)啦 -----------------------");
printEntry(message.getEntries());
}
connector.ack(batchID);
} catch (Exception e) {
LOGGER.error("處理出錯");
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 獲取每條打印的記錄
*/
public static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
// 第一步:拆解entry 實(shí)體
Header header = entry.getHeader();
EntryType entryType = entry.getEntryType();
// 第二步: 如果當(dāng)前是RowData棚蓄,那就是我需要的數(shù)據(jù)
if (entryType == EntryType.ROWDATA) {
String tableName = header.getTableName();
String schemaName = header.getSchemaName();
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
EventType eventType = rowChange.getEventType();
LOGGER.info(String.format("當(dāng)前正在操作表 %s.%s, 執(zhí)行操作= %s", schemaName, tableName, eventType));
// 如果是‘查詢’ 或者 是 ‘DDL’ 操作碍脏,那么sql直接打出來
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
LOGGER.info("執(zhí)行了查詢語句:[{}]", rowChange.getSql());
return;
}
// 第三步:追蹤到 columns 級別
rowChange.getRowDatasList().forEach((rowData) -> {
// 獲取更新之前的column情況
List<Column> beforeColumns = rowData.getBeforeColumnsList();
// 獲取更新之后的 column 情況
List<Column> afterColumns = rowData.getAfterColumnsList();
// 當(dāng)前執(zhí)行的是 刪除操作
if (eventType == EventType.DELETE) {
printColumn(beforeColumns);
}
// 當(dāng)前執(zhí)行的是 插入操作
if (eventType == EventType.INSERT) {
printColumn(afterColumns);
}
// 當(dāng)前執(zhí)行的是 更新操作
if (eventType == EventType.UPDATE) {
printColumn(afterColumns);
// 進(jìn)行刪除緩存操作
deleteCache(afterColumns, tableName, schemaName);
}
});
}
}
}
/**
* 每個row上面的每一個column 的更改情況
* @param columns
*/
public static void printColumn(List<Column> columns) {
columns.forEach((column) -> {
String columnName = column.getName();
String columnValue = column.getValue();
String columnType = column.getMysqlType();
// 判斷 該字段是否更新
boolean isUpdated = column.getUpdated();
LOGGER.info(String.format("數(shù)據(jù)列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));
});
}
/**
* 秒殺下單接口刪除庫存緩存
*/
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
AtomicInteger id = new AtomicInteger();
columns.forEach((column) -> {
String columnName = column.getName();
String columnValue = column.getValue();
if ("id".equals(columnName)) {
id.set(Integer.parseInt(columnValue));
}
});
// TODO: 刪除緩存
LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);
}
}
}
代碼中有詳細(xì)的注釋梭依,就不做解釋了。
我們跑起代碼典尾,緊接著我們在數(shù)據(jù)庫中進(jìn)行更改UPDATE操作役拴,把法外狂徒張三改成張三1,然后再改回張三钾埂,見下圖河闰。
Canal成功收到了兩條更新操作:
緊接著我們模擬一個刪除Cache緩存的業(yè)務(wù),在代碼中有:
/**
* 秒殺下單接口刪除庫存緩存
*/
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
AtomicInteger id = new AtomicInteger();
columns.forEach((column) -> {
String columnName = column.getName();
String columnValue = column.getValue();
if ("id".equals(columnName)) {
id.set(Integer.parseInt(columnValue));
}
});
// TODO: 刪除緩存
LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);
}
}
在上面的代碼中勃教,在收到m4a_miaosha.stock表的更新操作后淤击,我們刷新庫存緩存。效果如下:
簡單的Canal使用就介紹到這里故源,剩下的發(fā)揮空間留給各位讀者大大們污抬。
總結(jié)
本文總結(jié)了Canal的基本原理和簡單的使用。
總結(jié)如下幾點(diǎn):
- Canal實(shí)際是將自己偽裝成數(shù)據(jù)庫的從庫绳军,來讀取主數(shù)據(jù)庫發(fā)來的Binlog印机。
- Canal用途很廣,比如數(shù)據(jù)庫實(shí)時備份门驾、索引構(gòu)建和實(shí)時維護(hù)(拆分異構(gòu)索引射赛、倒排索引等)、業(yè)務(wù) cache 緩存刷新奶是。
- Canal可以推送至非常多數(shù)據(jù)源楣责,并支持推送到消息隊(duì)列竣灌,方便多語言使用。
參考
- https://blog.csdn.net/l1028386804/article/details/81208362
- https://github.com/alibaba/canal/wiki/QuickStart
- https://youzhixueyuan.com/database-master-slave-synchronization.html
- http://www.reibang.com/p/790a158d9eb3
- https://blog.csdn.net/xihuanyuye/article/details/81220524
- https://www.cnblogs.com/ivictor/p/5735580.html