一般來說,我們對于數(shù)據(jù)庫最主要的要求就是:數(shù)據(jù)不丟桂塞。不管是主從復制瓶摆,還是使用類似otter+canal這樣的數(shù)據(jù)庫同步方案凉逛,我們最基本的需求是,在數(shù)據(jù)不丟失的前提下群井,盡可能的保證系統(tǒng)的高可用状飞,也就是在某個節(jié)點掛掉,或者數(shù)據(jù)庫發(fā)生主從切換等情況下书斜,我們的數(shù)據(jù)同步系統(tǒng)依然能夠發(fā)揮它的作用--數(shù)據(jù)同步诬辈。本文討論的場景是數(shù)據(jù)庫發(fā)生主從切換,本文將從源碼的角度荐吉,來看看otter和canal是如何保證高可用和高可靠的焙糟。
一、EventParser
通過閱讀文檔和源碼样屠,我們可以知道穿撮,對于一個canal server,基礎的框架包括以下幾個部分:MetaManager痪欲、EventParser悦穿、EventSink和EventStore。其中EventParser的作用就是發(fā)送dump命令业踢,從mysql數(shù)據(jù)庫獲取binlog文件栗柒。發(fā)送dump命令,可以指定時間戳或者position知举,從指定的時間或者位置開始dump傍衡。我們來看看過程:
首先是CanalServer啟動深员。otter默認使用的是內(nèi)置版的canal server负蠕,所以我們主要看CanalServerWithEmbedded這個類蛙埂。來看下他的啟動過程:
public void start(final String destination) {
final CanalInstance canalInstance = canalInstances.get(destination);
if (!canalInstance.isStart()) {
try {
MDC.put("destination", destination);
canalInstance.start();//啟動實例
logger.info("start CanalInstances[{}] successfully", destination);
} finally {
MDC.remove("destination");
}
}
}
我們看下實例啟動那一行,跟到AbstractCanalInstance類中
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();//源數(shù)據(jù)管理啟動
}
if (!alarmHandler.isStart()) {
alarmHandler.start();//報警處理器啟動
}
if (!eventStore.isStart()) {
eventStore.start();//數(shù)據(jù)存儲器啟動
}
if (!eventSink.isStart()) {
eventSink.start();//數(shù)據(jù)過濾器啟動
}
if (!eventParser.isStart()) {//數(shù)據(jù)解析器啟動
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
logger.info("start successful....");
}
我們主要看下eventParser.start()方法里面的內(nèi)容遮糖。我們主要關注的是EventParser使如何在主從切換的條件下绣的,進行dump節(jié)點的確定的。我們跟蹤到AbstractEventParser類中的start()方法欲账,重點看下
// 4. 獲取最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);
這塊有兩個實現(xiàn)屡江,但是canal目前使用的是MysqlEventParser,也就是基于Mysql的Binlog文件來進行數(shù)據(jù)同步赛不。我們看下代碼:
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
EntryPosition startPosition = findStartPositionInternal(connection);
if (needTransactionPosition.get()) {
logger.warn("prepare to find last position : {}", startPosition.toString());
Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
logger.warn("find new start Transaction Position , old : {} , new : {}",
startPosition.getPosition(),
preTransactionStartPosition);
startPosition.setPosition(preTransactionStartPosition);
}
needTransactionPosition.compareAndSet(true, false);
}
return startPosition;
}
對于第一行findStartPositionInternal(connection)惩嘉,我們重點關注的情況是數(shù)據(jù)庫連接地址發(fā)生變化,也就是進行了主從切換的情況踢故。
boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
&& logPosition.getPostion().getServerId() != null
&& !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
if (case2) {
long timestamp = logPosition.getPostion().getTimestamp();
long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by last position {}:{}:{}", new Object[]{"", "",
logPosition.getPostion().getTimestamp()});
EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
// 重新置為一下
dumpErrorCount = 0;
return findPosition;
}
我們分析下case2這個條件文黎,其實就是表示的就是配置了主從切換,而且發(fā)生了serverId變化的情況殿较,在這種情況下耸峭,首先需要獲取到事件發(fā)生的時間戳,然后將這個事件發(fā)生的時間減去60s淋纲,也就是向前推一分鐘之后劳闹,在新的binlog文件中根據(jù)新的時間戳來找到當時對應的事件。
這塊根據(jù)時間戳來尋找事件的過程比較簡單洽瞬,首先根據(jù)binglog-index文件找到所有的binlog文件名本涕,然后遍歷binlog文件的頭,找到binlog文件的寫入時間伙窃,與新的時間戳進行對比菩颖,定位到binlog文件。定位到文件后对供,直接根據(jù)時間戳來進行遍歷位他,找到新的時間戳之前發(fā)生的那個事務起始位置。
/**
* 根據(jù)給定的時間戳产场,在指定的binlog中找到最接近于該時間戳(必須是小于時間戳)的一個事務起始位置鹅髓。
* 針對最后一個binlog會給定endPosition,避免無盡的查詢
*/
private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
final Long startTimestamp,
final EntryPosition endPosition,
final String searchBinlogFile) {
final LogPosition logPosition = new LogPosition();
try {
mysqlConnection.reconnect();
// 開始遍歷文件
mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {
private LogPosition lastPosition;
public boolean sink(LogEvent event) {
EntryPosition entryPosition = null;
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
if (entry == null) {
return true;
}
String logfilename = entry.getHeader().getLogfileName();
Long logfileoffset = entry.getHeader().getLogfileOffset();
Long logposTimestamp = entry.getHeader().getExecuteTime();
if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
|| CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[]{
logfilename, logfileoffset, logposTimestamp, startTimestamp});
// 事務頭和尾尋找第一條記錄時間戳京景,如果最小的一條記錄都不滿足條件窿冯,可直接退出
if (logposTimestamp >= startTimestamp) {
return false;
}
}
if (StringUtils.equals(endPosition.getJournalName(), logfilename)
&& endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
return false;
}
// 記錄一下上一個事務結(jié)束的位置,即下一個事務的position
// position = current +
// data.length确徙,代表該事務的下一條offest醒串,避免多余的事務重復
if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
entryPosition = new EntryPosition(logfilename,
logfileoffset + event.getEventLen(),
logposTimestamp);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
} else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
// 當前事務開始位點
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
}
lastPosition = buildLastPosition(entry);
} catch (Throwable e) {
processSinkError(e, lastPosition, searchBinlogFile, 4L);
}
return running;
}
});
} catch (IOException e) {
logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
}
if (logPosition.getPostion() != null) {
return logPosition.getPostion();
} else {
return null;
}
}
這塊的邏輯如下:
- 發(fā)送dump命令执桌,起始位置為4L,也就是跳過了binlog的第一個標志事件芜赌。
- canal收到binlog仰挣,開始進行對binlog文件進行解析。
- 主要我們看的是事務開始和事務提交的事件缠沈,判斷事務開始或結(jié)束的時間膘壶,是否小于我們要找的時間戳,如果大于等于洲愤,直接遍歷下一個事件颓芭。
- 傳入了一個endPosition,防止無限掃描柬赐。
- 雖說是從頭開始掃描的亡问,但是要想跳出遍歷,需要滿足一定的條件肛宋。在跳出遍歷之前州藕,最后一次設置的logPosition才是我們要招的logPosition。
- 如果是一個事務提交的事件悼吱,我們要找的position就是這個事件的position+event.length慎框。如果是事務開始,position就是當前事件的position后添。其他的事件都忽略笨枯。
至此,我們已經(jīng)找到了我們想要的binlog文件名和對應的事務開始position遇西,我們繼續(xù)下面的步驟即可馅精。
二、EventStore
這塊內(nèi)容的主要思想如下:
- 維護一個類似于Disruptor的RingBuffer粱檀,同時維護三個序列洲敢,put/get/ack。
- EventSink之后的數(shù)據(jù)茄蚯,調(diào)用put接口压彭,將數(shù)據(jù)放入環(huán)形隊列中。
- Canal client獲取數(shù)據(jù)渗常,調(diào)用get方法壮不。
- 異步調(diào)用ack方法,清除ack之前的數(shù)據(jù)皱碘。
- 值得注意的是询一,這塊get和ack采用了流式API的模式,get和ack異步進行,可以先get健蕊,然后異步調(diào)用ack菱阵。
- ack是有序的,不允許跳躍式的提交缩功。
三晴及、Binlog的Row模式
至此,我們基本上知道了canal是如何在發(fā)生數(shù)據(jù)庫主從切換時保證高可用和高可靠的掂之,我們可能還有疑惑:為什么要回退60s抗俄,來解析binlog,這樣不會導致數(shù)據(jù)重復嗎世舰?還有一些自增的update語句(不具備冪等性),不會產(chǎn)生數(shù)據(jù)錯誤嗎槽卫?要想回答這些問題跟压,就需要我們了解Binlog的Row模式了。
Mysql Binlog的Row模式記錄的歼培,是數(shù)據(jù)庫中每一行的數(shù)據(jù)變化震蒋,而不僅僅是sql語句。比如我們對數(shù)據(jù)庫中的多行躲庄,使用一條sql語句進行了修改查剖。在這種情況下,如果Binlog模式為Statement噪窘,只會記錄一條sql語句笋庄。而Row模式下,會對每一行的數(shù)據(jù)變化進行記錄倔监,以及變化前后每個字段的值直砂。這也就是為什么Row模式的binlog文件如此之大的原因。
對于一些不具備冪等性的sql語句浩习,采用Row語句進行Binlog解析時静暂,也是可以通過重復執(zhí)行,來保證我們數(shù)據(jù)的最終一致性的谱秽。這也就解釋了洽蛀,為什么要回退60s來進行Binlog位點定位、解析的問題疟赊〗脊考慮到Mysql主從的數(shù)據(jù)復制的延遲性(60s,一般來說的延遲沒有這么久)听绳,我們可以在主節(jié)點掛掉的情況下颂碘,回退60s到從節(jié)點上繼續(xù)進行binlog的解析。
當然,也需要考慮一些極端的情況头岔,也就是主從復制確實超過了60s的延遲塔拳,在這種情況下,就需要otter登場了峡竣】恳郑基本思路是:反查數(shù)據(jù)庫同步 (以數(shù)據(jù)庫最新版本同步,解決交替性适掰,比如設置一致性反查數(shù)據(jù)庫延遲閥值為60秒颂碧,即當同步過程中發(fā)現(xiàn)數(shù)據(jù)延遲超過了60秒,就會基于PK反查一次數(shù)據(jù)庫类浪,拿到當前最新值進行同步载城,減少交替性的問題)。