canal模擬mysql slave的交互協議,偽裝自己為mysql slave芭梯,向mysql master發(fā)送dump協議险耀;mysql master收到dump請求,開始推送binary log給slave(也就是canal)玖喘;canal解析binary log對象(原始為byte流)甩牺。
mysql主備復制實現
從上層來看,復制分成三步:
- master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件累奈,binary log events柴灯,可以通過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log)费尽;
- slave重做中繼日志中的事件,將改變反映它自己的數據羊始。
Binlog獲取詳解
Binlog發(fā)送接收流程旱幼,流程如下圖所示:
首先,我們需要偽造一個slave突委,向master注冊柏卤,這樣master才會發(fā)送binlog event。注冊很簡單匀油,就是向master發(fā)送COM_REGISTER_SLAVE命令缘缚,帶上slave相關信息。這里需要注意敌蚜,因為在MySQL的replication topology中桥滨,都需要使用一個唯一的server id來區(qū)別標示不同的server實例,所以這里我們偽造的slave也需要一個唯一的server id。
接著實現binlog的dump齐媒。MySQL只支持一種binlog dump方式蒲每,也就是指定binlog filename + position,向master發(fā)送COM_BINLOG_DUMP命令喻括。在發(fā)送dump命令的時候邀杏,我們可以指定flag為BINLOG_DUMP_NON_BLOCK,這樣master在沒有可發(fā)送的binlog event之后唬血,就會返回一個EOF package望蜡。不過通常對于slave來說,一直把連接掛著可能更好拷恨,這樣能更及時收到新產生的binlog event脖律。
Dump命令包圖如下所示:
如上圖所示,在報文中塞入binlogPosition和binlogFileName即可讓master從相應的位置發(fā)送binlog event。
關于binlog event的細節(jié)挑随,請參照我另一篇文章状您。 binlog詳解
canal結構
說明:
- server代表一個canal運行實例,對應于一個jvm兜挨,也可以理解為一個進程
- instance對應于一個數據隊列 (1個server對應1..n個instance)膏孟,每一個數據隊列可以理解為一個數據庫實例。
Server設計
server代表了一個canal的運行實例拌汇,為了方便組件化使用柒桑,特意抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現
- Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)
- Netty : 基于netty封裝了一層網絡協議噪舀,由canal server保證其可用性魁淳,采用的pull模型,當然latency會稍微打點折扣与倡,不過這個也視情況而定界逛。(阿里系的notify和metaq,典型的push/pull模型纺座,目前也逐步的在向pull模型靠攏息拜,push在數據量大的時候會有一些問題)
Instance設計
instance代表了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件净响。
抽象了CanalInstanceGenerator少欺,主要是考慮配置的管理方式:
manager方式: 和你自己的內部web console/manager系統(tǒng)進行對接。(目前主要是公司內部使用馋贤,Otter采用這種方式)
spring方式:基于spring xml + properties進行定義赞别,構建spring配置.
下面是canalServer和instance如何運行
canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {
public CanalInstance generate(String destination) {
Canal canal = canalConfigClient.findCanal(destination);
// 此處省略部分代碼 大致邏輯是設置canal一些屬性
CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) {
protected CanalHAController initHaController() {
HAMode haMode = parameters.getHaMode();
if (haMode.isMedia()) {
return new MediaHAController(parameters.getMediaGroup(),
parameters.getDbUsername(),
parameters.getDbPassword(),
parameters.getDefaultDatabaseName());
} else {
return super.initHaController();
}
}
protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) {
//大致邏輯是 設置支持的類型
//初始化設置MysqlEventParser的主庫信息,這處抽象不好配乓,目前只支持mysql
}
};
return instance;
}
});
canalServer.start(); //啟動canalServer
canalServer.start(destination);//啟動對應instance
this.clientIdentity = new ClientIdentity(destination, pipeline.getParameters().getMainstemClientId(), filter);
canalServer.subscribe(clientIdentity);// 發(fā)起一次訂閱仿滔,當監(jiān)聽到instance配置時惠毁,調用generate方法注入新的instance
instance模塊:
- eventParser (數據源接入,模擬slave協議和master進行交互堤撵,協議解析)
- eventSink (Parser和Store鏈接器仁讨,進行數據過濾,加工实昨,分發(fā)的工作)
- eventStore (數據存儲)
- metaManager (增量訂閱&消費信息管理器)
EventParser設計
大致過程:
整個parser過程大致可分為幾步:
Connection獲取上一次解析成功的位置 (如果第一次啟動洞豁,則獲取初始指定的位置或者是當前數據庫的binlog位點)
Connection建立鏈接,發(fā)送BINLOG_DUMP指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
Mysql開始推送Binaly Log
接收到的Binaly Log的通過Binlog parser進行協議解析荒给,補充一些特定信息
// 補充字段名字丈挟,字段類型,主鍵信息志电,unsigned類型處理
傳遞給EventSink模塊進行數據存儲曙咽,是一個阻塞操作,直到存儲成功
存儲成功后挑辆,由CanalLogPositionManager定時記錄Binaly Log位置
EventSink設計
說明:
- 數據過濾:支持通配符的過濾模式例朱,表名,字段內容等
- 數據路由/分發(fā):解決1:n (1個parser對應多個store的模式)
- 數據歸并:解決n:1 (多個parser對應1個store)
- 數據加工:在進入store之前進行額外的處理鱼蝉,比如join
** 數據1:n業(yè)務 **
為了合理的利用數據庫資源洒嗤, 一般常見的業(yè)務都是按照schema進行隔離,然后在mysql上層或者dao這一層面上魁亦,進行一個數據源路由渔隶,屏蔽數據庫物理位置對開發(fā)的影響,阿里系主要是通過cobar/tddl來解決數據源路由問題洁奈。
所以间唉,一般一個數據庫實例上,會部署多個schema利术,每個schema會有由1個或者多個業(yè)務方關注
** 數據n:1業(yè)務 **
同樣呈野,當一個業(yè)務的數據規(guī)模達到一定的量級后,必然會涉及到水平拆分和垂直拆分的問題印叁,針對這些拆分的數據需要處理時际跪,就需要鏈接多個store進行處理,消費的位點就會變成多份喉钢,而且數據消費的進度無法得到盡可能有序的保證。
所以良姆,在一定業(yè)務場景下肠虽,需要將拆分后的增量數據進行歸并處理,比如按照時間戳/全局id進行排序歸并.
EventStore設計
- 目前僅實現了Memory內存模式玛追,后續(xù)計劃增加本地file存儲税课,mixed混合模式
- 借鑒了Disruptor的RingBuffer的實現思路
RingBuffer設計:
定義了3個cursor
Put : Sink模塊進行數據存儲的最后一次寫入位置
Get : 數據訂閱獲取的最后一次提取位置
Ack : 數據消費成功的最后一次消費位置
借鑒Disruptor的RingBuffer的實現闲延,將RingBuffer拉直來看:
實現說明:
Put/Get/Ack cursor用于遞增,采用long型存儲
buffer的get操作韩玩,通過取余或者與操作垒玲。(與操作: cusor & (size - 1) , size需要為2的指數,效率比較高)
HA機制設計
canal的ha分為兩部分找颓,canal server和canal client分別有對應的ha實現
- canal server: 為了減少對mysql dump的請求合愈,不同server上的instance要求同一時間只能有一個處于running,其他的處于standby狀態(tài).
- canal client: 為了保證有序性击狮,一份instance同一時間只能由一個canal client進行get/ack/rollback操作佛析,否則客戶端接收無法保證有序。
整個HA機制的控制主要是依賴了zookeeper的幾個特性彪蓬,watcher和EPHEMERAL節(jié)點(和session生命周期綁定)寸莫,可以看下我之前zookeeper的相關文章。
Canal Server:
大致步驟:
- canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實現:創(chuàng)建EPHEMERAL節(jié)點档冬,誰創(chuàng)建成功就允許誰啟動)
- 創(chuàng)建zookeeper節(jié)點成功后膘茎,對應的canal server就啟動對應的canal instance,沒有創(chuàng)建成功的canal instance就會處于standby狀態(tài)
- 一旦zookeeper發(fā)現canal server A創(chuàng)建的節(jié)點消失后酷誓,立即通知其他的canal server再次進行步驟1的操作披坏,重新選出一個canal server啟動instance.
- canal client每次進行connect時,會首先向zookeeper詢問當前是誰啟動了canal instance呛牲,然后和其建立鏈接刮萌,一旦鏈接不可用,會重新嘗試connect.
Canal Client的方式和canal server方式類似娘扩,也是利用zookeeper的搶占EPHEMERAL節(jié)點的方式進行控制.