一. canal概述
canal是Alibaba旗下的一款開(kāi)源項(xiàng)目,純Java開(kāi)發(fā).它是基于數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持mysql。
應(yīng)用場(chǎng)景:
- 1.數(shù)據(jù)同步府树,比如:做在線澜倦、離線數(shù)據(jù)庫(kù)之間的數(shù)據(jù)同步操作;
- 2.數(shù)據(jù)消費(fèi)篡殷,比如:需要根據(jù)關(guān)注的數(shù)據(jù)庫(kù)表的變化钝吮,做搜索增量;
- 3.數(shù)據(jù)脫敏贴唇,比如:需要將線上動(dòng)態(tài)數(shù)據(jù)導(dǎo)入到其他地方搀绣,做數(shù)據(jù)脫敏。
二. canal工作原理
1. mysql主備復(fù)制實(shí)現(xiàn)
(1) master將改變記錄到二進(jìn)制日志(binary log)中(這些記錄叫做二進(jìn)制日志事件戳气,binary log events链患,可以通過(guò)show binlog events進(jìn)行查看);
(2) slave將master的binary log events拷貝到它的中繼日志(relay log)瓶您;
(3) slave重做中繼日志中的事件麻捻,將改變反映它自己的數(shù)據(jù)。
2. canal的工作原理
(1) canal模擬mysql salve的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議;
(2) mysql master收到dump請(qǐng)求,開(kāi)始推送binary log給slave(也就是canal);
(3) canal解析binary log對(duì)象(原始byte流).
三. canal架構(gòu)設(shè)計(jì)
說(shuō)明:
- server代表一個(gè)canal運(yùn)行實(shí)例,對(duì)應(yīng)與一個(gè)jvm;
- instance對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列(1個(gè)server對(duì)應(yīng)1..n個(gè)instance).
instance下的子模塊:
- eventParser: 數(shù)據(jù)源接入,模擬slave協(xié)議和master進(jìn)行交互,協(xié)議解析;
- eventSink: parser和store鏈接器,進(jìn)行數(shù)據(jù)的過(guò)濾,加工和分發(fā)工作;
- eventStore: 數(shù)據(jù)存儲(chǔ);
- metaManager: 增量訂閱&消費(fèi)信息管理器.
1. EventParser設(shè)計(jì)
整個(gè)parser過(guò)程大致可分為幾部:
- 1.Connection獲取上一次解析成功的位置(如果第一次啟動(dòng)呀袱,則獲取初始制定的位置或者是當(dāng)前數(shù)據(jù)庫(kù)的binlog位點(diǎn));
- 2.Connection建立連接贸毕,發(fā)生BINLOG_DUMP命令
- 3.Mysql開(kāi)始推送Binary Log;
- 4.接收到的Binary Log通過(guò)Binlog parser進(jìn)行協(xié)議解析,補(bǔ)充一些特定信息;
- 5.傳遞給EventSink模塊進(jìn)行數(shù)據(jù)存儲(chǔ)夜赵,是一個(gè)阻塞操作明棍,直到存儲(chǔ)成功
存儲(chǔ)成功后,定時(shí)記錄Binary Log位置.
2. EventSink設(shè)計(jì)
- 數(shù)據(jù)過(guò)濾:支持通配符的過(guò)濾模式寇僧,表名摊腋,字段內(nèi)容等;
- 數(shù)據(jù)路由/分發(fā):解決1:n (1個(gè)parser對(duì)應(yīng)多個(gè)store的模式);
- 數(shù)據(jù)歸并:解決n:1 (多個(gè)parser對(duì)應(yīng)1個(gè)store);
- 數(shù)據(jù)加工:在進(jìn)入store之前進(jìn)行額外的處理,比如join.
3. EventStore設(shè)計(jì)
目前canal實(shí)現(xiàn)了memory內(nèi)存嘁傀、本地file存儲(chǔ)以及持久化到zookeeper以保障數(shù)據(jù)集群共享兴蒸。memory內(nèi)存的RingBuffer設(shè)計(jì)如下圖:
定義了3個(gè)cursor:
- Put : Sink模塊進(jìn)行數(shù)據(jù)存儲(chǔ)的最后一次寫入位置
- Get : 數(shù)據(jù)訂閱獲取的最后一次提取位置
- Ack : 數(shù)據(jù)消費(fèi)成功的最后一次消費(fèi)位置
借鑒Disruptor的RingBuffer的實(shí)現(xiàn),將RingBuffer拉直來(lái)看:
4. 增量訂閱细办、消費(fèi)設(shè)計(jì)
get/ack/rollback協(xié)議介紹:
- Message getWithoutAck(int batchSize)橙凳,允許指定batchSize,一次可以獲取多條,每次返回的對(duì)象為Message,包含的內(nèi)容為:batch id(唯一標(biāo)識(shí))和entries(具體的數(shù)據(jù)對(duì)象),具體格式后面會(huì)進(jìn)行介紹离赫。
- void rollback(long batchId),顧命思議议谷,回滾上次的get請(qǐng)求,重新獲取數(shù)據(jù)堕虹。基于get獲取的batchId進(jìn)行提交芬首,避免誤操作赴捞;
- void ack(long batchId),顧命思議郁稍,確認(rèn)已經(jīng)消費(fèi)成功赦政,通知server刪除數(shù)據(jù)∫基于get獲取的batchId進(jìn)行提交恢着,避免誤操作
- canal的get/ack/rollback協(xié)議和常規(guī)的jms協(xié)議有所不同,允許get/ack異步處理财破,比如可以連續(xù)調(diào)用get多次掰派,后續(xù)異步按順序提交ack/rollback,項(xiàng)目中稱之為流式api.
流式api設(shè)計(jì)的好處:
- get/ack異步化左痢,減少因ack帶來(lái)的網(wǎng)絡(luò)延遲和操作成本 (99%的狀態(tài)都是處于正常狀態(tài)靡羡,異常的rollback屬于個(gè)別情況,沒(méi)必要為個(gè)別的case犧牲整個(gè)性能)俊性;
- get獲取數(shù)據(jù)后略步,業(yè)務(wù)消費(fèi)存在瓶頸或者需要多進(jìn)程/多線程消費(fèi)時(shí),可以不停的輪詢get數(shù)據(jù)定页,不停的往后發(fā)送任務(wù)趟薄,提高并行化.
數(shù)據(jù)格式:
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [發(fā)生的變更]
schemaName
tableName
eventType [insert/update/delete類型]
entryType [事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA]
storeValue [byte數(shù)據(jù),可展開(kāi),對(duì)應(yīng)的類型為RowChange]
RowChange
isDdl [是否是ddl變更操作典徊,比如create table/drop table]
sql [具體的ddl sql]
rowDatas [具體insert/update/delete的變更數(shù)據(jù)杭煎,可為多條,1個(gè)binlog event事件可對(duì)應(yīng)多條變更宫峦,比如批處理]
beforeColumns [Column類型的數(shù)組]
afterColumns [Column類型的數(shù)組]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否為主鍵]
updated [是否發(fā)生過(guò)變更]
isNull [值是否為null]
value [具體的內(nèi)容岔帽,注意為文本]
四. canal的HA機(jī)制設(shè)計(jì)
canal的HA機(jī)制主要是依賴zookeeper的特性,共分為canal server和canal client兩部分:
- canal server:為了減少對(duì)mysql dump的請(qǐng)求,不同server上的instance要求同一時(shí)間只能有一個(gè)處于running,其他的處于standby狀態(tài).
- canal client:為了保證有序性,一份instance同一時(shí)間只能由一個(gè)canal client進(jìn)行g(shù)et/ack/rollback操作,否則客戶端接收無(wú)法保證有序.
大致步驟:
- canal server要啟動(dòng)某個(gè)canal instance時(shí)都先向zookeeper進(jìn)行一次嘗試啟動(dòng)判斷 (實(shí)現(xiàn):創(chuàng)建EPHEMERAL節(jié)點(diǎn),誰(shuí)創(chuàng)建成功就允許誰(shuí)啟動(dòng))
- 創(chuàng)建zookeeper節(jié)點(diǎn)成功后导绷,對(duì)應(yīng)的canal server就啟動(dòng)對(duì)應(yīng)的canal instance犀勒,沒(méi)有創(chuàng)建成功的canal instance就會(huì)處于standby狀態(tài)
- 一旦zookeeper發(fā)現(xiàn)canal server A創(chuàng)建的節(jié)點(diǎn)消失后,立即通知其他的canal server再次進(jìn)行步驟1的操作,重新選出一個(gè)canal server啟動(dòng)instance.
- canal client每次進(jìn)行connect時(shí)贾费,會(huì)首先向zookeeper詢問(wèn)當(dāng)前是誰(shuí)啟動(dòng)了canal instance钦购,然后和其建立鏈接,一旦鏈接不可用褂萧,會(huì)重新嘗試connect.
- Canal Client的方式和canal server方式類似押桃,也是利用zokeeper的搶占EPHEMERAL節(jié)點(diǎn)的方式進(jìn)行控制.
HA配置架構(gòu)圖:
canal其他鏈接方式:
1. 單連
2. 兩個(gè)client+兩個(gè)instance+1個(gè)mysql(其實(shí)就是兩個(gè)單連)
3. 一個(gè)server+兩個(gè)instance+兩個(gè)mysql+兩個(gè)client
4. instance的standby配置
五. canal總結(jié)
- canal原理:模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave导犹,向mysql master發(fā)送dump協(xié)議唱凯;mysql master收到dump請(qǐng)求,開(kāi)始推送binary log給slave(也就是canal)谎痢;解析binary log對(duì)象(原始為byte流)
- 存在重復(fù)消費(fèi)問(wèn)題:需要在消費(fèi)端解決磕昼。
- canal需要維護(hù)EventStore,可以存取在Memory, File, zk.
- canal需要維護(hù)客戶端的狀態(tài)节猿,同一時(shí)刻一個(gè)instance只能有一個(gè)消費(fèi)端消費(fèi).
- 支持binlog format 類型:statement, row, mixed. 多次附加功能只能在row下使用票从,比如otter.
- 有ACK機(jī)制.