【源碼】canal和otter的高可靠性分析

一般來說,我們對于數(shù)據(jù)庫最主要的要求就是:數(shù)據(jù)不丟桂塞。不管是主從復制瓶摆,還是使用類似otter+canal這樣的數(shù)據(jù)庫同步方案凉逛,我們最基本的需求是,在數(shù)據(jù)不丟失的前提下群井,盡可能的保證系統(tǒng)的高可用状飞,也就是在某個節(jié)點掛掉,或者數(shù)據(jù)庫發(fā)生主從切換等情況下书斜,我們的數(shù)據(jù)同步系統(tǒng)依然能夠發(fā)揮它的作用--數(shù)據(jù)同步诬辈。本文討論的場景是數(shù)據(jù)庫發(fā)生主從切換,本文將從源碼的角度荐吉,來看看otter和canal是如何保證高可用和高可靠的焙糟。

一、EventParser

通過閱讀文檔和源碼样屠,我們可以知道穿撮,對于一個canal server,基礎的框架包括以下幾個部分:MetaManager痪欲、EventParser悦穿、EventSink和EventStore。其中EventParser的作用就是發(fā)送dump命令业踢,從mysql數(shù)據(jù)庫獲取binlog文件栗柒。發(fā)送dump命令,可以指定時間戳或者position知举,從指定的時間或者位置開始dump傍衡。我們來看看過程:

首先是CanalServer啟動深员。otter默認使用的是內(nèi)置版的canal server负蠕,所以我們主要看CanalServerWithEmbedded這個類蛙埂。來看下他的啟動過程:

    public void start(final String destination) {
        final CanalInstance canalInstance = canalInstances.get(destination);
        if (!canalInstance.isStart()) {
            try {
                MDC.put("destination", destination);
                canalInstance.start();//啟動實例
                logger.info("start CanalInstances[{}] successfully", destination);
            } finally {
                MDC.remove("destination");
            }
        }
    }

我們看下實例啟動那一行,跟到AbstractCanalInstance類中

    public void start() {
        super.start();
        if (!metaManager.isStart()) {
            metaManager.start();//源數(shù)據(jù)管理啟動
        }

        if (!alarmHandler.isStart()) {
            alarmHandler.start();//報警處理器啟動
        }

        if (!eventStore.isStart()) {
            eventStore.start();//數(shù)據(jù)存儲器啟動
        }

        if (!eventSink.isStart()) {
            eventSink.start();//數(shù)據(jù)過濾器啟動
        }

        if (!eventParser.isStart()) {//數(shù)據(jù)解析器啟動
            beforeStartEventParser(eventParser);
            eventParser.start();
            afterStartEventParser(eventParser);
        }
        logger.info("start successful....");
    }

我們主要看下eventParser.start()方法里面的內(nèi)容遮糖。我們主要關注的是EventParser使如何在主從切換的條件下绣的,進行dump節(jié)點的確定的。我們跟蹤到AbstractEventParser類中的start()方法欲账,重點看下

// 4. 獲取最后的位置信息
EntryPosition position = findStartPosition(erosaConnection);

這塊有兩個實現(xiàn)屡江,但是canal目前使用的是MysqlEventParser,也就是基于Mysql的Binlog文件來進行數(shù)據(jù)同步赛不。我們看下代碼:

protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
    EntryPosition startPosition = findStartPositionInternal(connection);
    if (needTransactionPosition.get()) {
        logger.warn("prepare to find last position : {}", startPosition.toString());
        Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
        if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
            logger.warn("find new start Transaction Position , old : {} , new : {}",
                    startPosition.getPosition(),
                    preTransactionStartPosition);
            startPosition.setPosition(preTransactionStartPosition);
        }
        needTransactionPosition.compareAndSet(true, false);
    }
    return startPosition;
}

對于第一行findStartPositionInternal(connection)惩嘉,我們重點關注的情況是數(shù)據(jù)庫連接地址發(fā)生變化,也就是進行了主從切換的情況踢故。

boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                && logPosition.getPostion().getServerId() != null
                && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
if (case2) {
    long timestamp = logPosition.getPostion().getTimestamp();
    long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
    logger.warn("prepare to find start position by last position {}:{}:{}", new Object[]{"", "",
                logPosition.getPostion().getTimestamp()});
    EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
    // 重新置為一下
    dumpErrorCount = 0;
    return findPosition;
}

我們分析下case2這個條件文黎,其實就是表示的就是配置了主從切換,而且發(fā)生了serverId變化的情況殿较,在這種情況下耸峭,首先需要獲取到事件發(fā)生的時間戳,然后將這個事件發(fā)生的時間減去60s淋纲,也就是向前推一分鐘之后劳闹,在新的binlog文件中根據(jù)新的時間戳來找到當時對應的事件。

這塊根據(jù)時間戳來尋找事件的過程比較簡單洽瞬,首先根據(jù)binglog-index文件找到所有的binlog文件名本涕,然后遍歷binlog文件的頭,找到binlog文件的寫入時間伙窃,與新的時間戳進行對比菩颖,定位到binlog文件。定位到文件后对供,直接根據(jù)時間戳來進行遍歷位他,找到新的時間戳之前發(fā)生的那個事務起始位置。

/**
 * 根據(jù)給定的時間戳产场,在指定的binlog中找到最接近于該時間戳(必須是小于時間戳)的一個事務起始位置鹅髓。
 * 針對最后一個binlog會給定endPosition,避免無盡的查詢
 */
private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
                                                              final Long startTimestamp,
                                                              final EntryPosition endPosition,
                                                              final String searchBinlogFile) {

    final LogPosition logPosition = new LogPosition();
    try {
        mysqlConnection.reconnect();
        // 開始遍歷文件
        mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {

            private LogPosition lastPosition;

            public boolean sink(LogEvent event) {
                EntryPosition entryPosition = null;
                try {
                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
                    if (entry == null) {
                        return true;
                    }

                    String logfilename = entry.getHeader().getLogfileName();
                    Long logfileoffset = entry.getHeader().getLogfileOffset();
                    Long logposTimestamp = entry.getHeader().getExecuteTime();

                    if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
                            || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                        logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[]{
                                logfilename, logfileoffset, logposTimestamp, startTimestamp});
                        // 事務頭和尾尋找第一條記錄時間戳京景,如果最小的一條記錄都不滿足條件窿冯,可直接退出
                        if (logposTimestamp >= startTimestamp) {
                            return false;
                        }
                    }

                    if (StringUtils.equals(endPosition.getJournalName(), logfilename)
                            && endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
                        return false;
                    }

                    // 記錄一下上一個事務結(jié)束的位置,即下一個事務的position
                    // position = current +
                    // data.length确徙,代表該事務的下一條offest醒串,避免多余的事務重復
                    if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
                        entryPosition = new EntryPosition(logfilename,
                                logfileoffset + event.getEventLen(),
                                logposTimestamp);
                        logger.debug("set {} to be pending start position before finding another proper one...",
                                entryPosition);
                        logPosition.setPostion(entryPosition);
                    } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
                        // 當前事務開始位點
                        entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
                        logger.debug("set {} to be pending start position before finding another proper one...",
                                entryPosition);
                        logPosition.setPostion(entryPosition);
                    }

                    lastPosition = buildLastPosition(entry);
                } catch (Throwable e) {
                    processSinkError(e, lastPosition, searchBinlogFile, 4L);
                }

                return running;
            }
        });

    } catch (IOException e) {
        logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
    }

    if (logPosition.getPostion() != null) {
        return logPosition.getPostion();
    } else {
        return null;
    }
}

這塊的邏輯如下:

  • 發(fā)送dump命令执桌,起始位置為4L,也就是跳過了binlog的第一個標志事件芜赌。
  • canal收到binlog仰挣,開始進行對binlog文件進行解析。
  • 主要我們看的是事務開始和事務提交的事件缠沈,判斷事務開始或結(jié)束的時間膘壶,是否小于我們要找的時間戳,如果大于等于洲愤,直接遍歷下一個事件颓芭。
  • 傳入了一個endPosition,防止無限掃描柬赐。
  • 雖說是從頭開始掃描的亡问,但是要想跳出遍歷,需要滿足一定的條件肛宋。在跳出遍歷之前州藕,最后一次設置的logPosition才是我們要招的logPosition。
  • 如果是一個事務提交的事件悼吱,我們要找的position就是這個事件的position+event.length慎框。如果是事務開始,position就是當前事件的position后添。其他的事件都忽略笨枯。

至此,我們已經(jīng)找到了我們想要的binlog文件名和對應的事務開始position遇西,我們繼續(xù)下面的步驟即可馅精。

二、EventStore

這塊內(nèi)容的主要思想如下:

  • 維護一個類似于Disruptor的RingBuffer粱檀,同時維護三個序列洲敢,put/get/ack。
  • EventSink之后的數(shù)據(jù)茄蚯,調(diào)用put接口压彭,將數(shù)據(jù)放入環(huán)形隊列中。
  • Canal client獲取數(shù)據(jù)渗常,調(diào)用get方法壮不。
  • 異步調(diào)用ack方法,清除ack之前的數(shù)據(jù)皱碘。
  • 值得注意的是询一,這塊get和ack采用了流式API的模式,get和ack異步進行,可以先get健蕊,然后異步調(diào)用ack菱阵。
  • ack是有序的,不允許跳躍式的提交缩功。

三晴及、Binlog的Row模式

至此,我們基本上知道了canal是如何在發(fā)生數(shù)據(jù)庫主從切換時保證高可用和高可靠的掂之,我們可能還有疑惑:為什么要回退60s抗俄,來解析binlog,這樣不會導致數(shù)據(jù)重復嗎世舰?還有一些自增的update語句(不具備冪等性),不會產(chǎn)生數(shù)據(jù)錯誤嗎槽卫?要想回答這些問題跟压,就需要我們了解Binlog的Row模式了。

Mysql Binlog的Row模式記錄的歼培,是數(shù)據(jù)庫中每一行的數(shù)據(jù)變化震蒋,而不僅僅是sql語句。比如我們對數(shù)據(jù)庫中的多行躲庄,使用一條sql語句進行了修改查剖。在這種情況下,如果Binlog模式為Statement噪窘,只會記錄一條sql語句笋庄。而Row模式下,會對每一行的數(shù)據(jù)變化進行記錄倔监,以及變化前后每個字段的值直砂。這也就是為什么Row模式的binlog文件如此之大的原因。

對于一些不具備冪等性的sql語句浩习,采用Row語句進行Binlog解析時静暂,也是可以通過重復執(zhí)行,來保證我們數(shù)據(jù)的最終一致性的谱秽。這也就解釋了洽蛀,為什么要回退60s來進行Binlog位點定位、解析的問題疟赊〗脊考慮到Mysql主從的數(shù)據(jù)復制的延遲性(60s,一般來說的延遲沒有這么久)听绳,我們可以在主節(jié)點掛掉的情況下颂碘,回退60s到從節(jié)點上繼續(xù)進行binlog的解析。

當然,也需要考慮一些極端的情況头岔,也就是主從復制確實超過了60s的延遲塔拳,在這種情況下,就需要otter登場了峡竣】恳郑基本思路是:反查數(shù)據(jù)庫同步 (以數(shù)據(jù)庫最新版本同步,解決交替性适掰,比如設置一致性反查數(shù)據(jù)庫延遲閥值為60秒颂碧,即當同步過程中發(fā)現(xiàn)數(shù)據(jù)延遲超過了60秒,就會基于PK反查一次數(shù)據(jù)庫类浪,拿到當前最新值進行同步载城,減少交替性的問題)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末费就,一起剝皮案震驚了整個濱河市诉瓦,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌力细,老刑警劉巖睬澡,帶你破解...
    沈念sama閱讀 212,185評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異眠蚂,居然都是意外死亡煞聪,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,445評論 3 385
  • 文/潘曉璐 我一進店門逝慧,熙熙樓的掌柜王于貴愁眉苦臉地迎上來昔脯,“玉大人,你說我怎么就攤上這事馋艺≌じ桑” “怎么了?”我有些...
    開封第一講書人閱讀 157,684評論 0 348
  • 文/不壞的土叔 我叫張陵捐祠,是天一觀的道長碱鳞。 經(jīng)常有香客問我,道長踱蛀,這世上最難降的妖魔是什么窿给? 我笑而不...
    開封第一講書人閱讀 56,564評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮率拒,結(jié)果婚禮上崩泡,老公的妹妹穿的比我還像新娘。我一直安慰自己猬膨,他們只是感情好角撞,可當我...
    茶點故事閱讀 65,681評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著,像睡著了一般谒所。 火紅的嫁衣襯著肌膚如雪热康。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,874評論 1 290
  • 那天劣领,我揣著相機與錄音姐军,去河邊找鬼。 笑死尖淘,一個胖子當著我的面吹牛奕锌,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播村生,決...
    沈念sama閱讀 39,025評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼惊暴,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了梆造?” 一聲冷哼從身側(cè)響起缴守,我...
    開封第一講書人閱讀 37,761評論 0 268
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎镇辉,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體贴捡,經(jīng)...
    沈念sama閱讀 44,217評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡忽肛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,545評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了烂斋。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片屹逛。...
    茶點故事閱讀 38,694評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖汛骂,靈堂內(nèi)的尸體忽然破棺而出罕模,到底是詐尸還是另有隱情,我是刑警寧澤帘瞭,帶...
    沈念sama閱讀 34,351評論 4 332
  • 正文 年R本政府宣布淑掌,位于F島的核電站,受9級特大地震影響蝶念,放射性物質(zhì)發(fā)生泄漏抛腕。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 39,988評論 3 315
  • 文/蒙蒙 一媒殉、第九天 我趴在偏房一處隱蔽的房頂上張望担敌。 院中可真熱鬧,春花似錦廷蓉、人聲如沸全封。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,778評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽刹悴。三九已至行楞,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間颂跨,已是汗流浹背敢伸。 一陣腳步聲響...
    開封第一講書人閱讀 32,007評論 1 266
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留恒削,地道東北人池颈。 一個月前我還...
    沈念sama閱讀 46,427評論 2 360
  • 正文 我出身青樓,卻偏偏與公主長得像钓丰,于是被迫代替她去往敵國和親躯砰。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 43,580評論 2 349

推薦閱讀更多精彩內(nèi)容