架構(gòu)師必備:巧用Canal實現(xiàn)異步、解耦的架構(gòu)

本文介紹如何應(yīng)用Canal實現(xiàn)異步、解耦的架構(gòu)锰扶,后續(xù)有空再寫文章分析Canal原理和源代碼。

Canal簡介

Canal是用來獲取數(shù)據(jù)庫變更的中間件寝受。
偽裝自己為MySQL從庫坷牛,拉取主庫binlog并解析、處理很澄。處理結(jié)果可發(fā)送給MQ京闰,方便其他服務(wù)獲取數(shù)據(jù)庫變更消息,這一點非常有用甩苛。下面介紹一些典型用途蹂楣。

image.png

其中,Canal+MQ作為一個整體讯蒲,從外界看來就是一個數(shù)據(jù)管道服務(wù)服務(wù)痊土,如下圖。


image.png

Canal典型用途

異構(gòu)數(shù)據(jù)(如ES墨林、HBase赁酝、不同路由key的DB)

通過Canal自帶的adapter,同步異構(gòu)數(shù)據(jù)至ES旭等、HBase酌呆,而不用自行實現(xiàn)繁瑣的數(shù)據(jù)轉(zhuǎn)換、同步操作搔耕。這里的adapter就是典型的適配器模式隙袁,把數(shù)據(jù)轉(zhuǎn)成相應(yīng)格式,并寫入異構(gòu)的存儲系統(tǒng)弃榨。

image.png

當(dāng)然菩收,也可以同步數(shù)據(jù)至DB,甚至構(gòu)建一份按不同字段分片路由的數(shù)據(jù)庫鲸睛。
比如:下單時按用戶id分庫分表訂單記錄坛梁,然后借助Canal數(shù)據(jù)通道,構(gòu)建一份按商家id分庫分表的訂單記錄腊凶,用于B端業(yè)務(wù)(如商家查詢自己接到哪些訂單)划咐。

image.png

緩存刷新

緩存刷新的常規(guī)做法是,先更新DB褐缠,再刪除緩存政鼠,再延遲刪除(即cache-aside pattern+延遲雙刪),這種多步操作可能失敗队魏,而且實現(xiàn)相對復(fù)雜公般。借助Canal刷新緩存,使主服務(wù)胡桨、主流程無需關(guān)心緩存更新等一致性問題官帘,保證最終一致性。


image.png

價格變化等重要業(yè)務(wù)消息

下游服務(wù)可立即感知價格變化昧谊。
常規(guī)做法是刽虹,先修改價格,再發(fā)出消息呢诬,此處的難點是要保證消息一定發(fā)送成功涌哲,以及如果發(fā)送不成功時如何處理。借助Canal尚镰,不用在業(yè)務(wù)層面擔(dān)心消息丟失的問題阀圾。

數(shù)據(jù)庫遷移

  • 多機房數(shù)據(jù)同步
  • 拆庫
    雖然可以自己在代碼中實現(xiàn)雙寫邏輯,然后對歷史數(shù)據(jù)做處理狗唉,但是歷史數(shù)據(jù)也可能被更新初烘,需要不斷迭代對比、更新分俯,總之很復(fù)雜肾筐。

實時對賬

常規(guī)做法是定時任務(wù)跑對賬邏輯,時效性低澳迫,不能及時發(fā)現(xiàn)不一致問題局齿。借助Canal剧劝,可實時觸發(fā)對賬邏輯橄登。
大致流程如下:

  • 接收數(shù)據(jù)變更消息
  • 寫入hbase作為流水記錄
  • 一段窗口時間過后,觸發(fā)比較與對端數(shù)據(jù)做比較

Canal客戶端demo代碼分析

以下示例是客戶端連接Canal的例子讥此,修改自官方github示例拢锹,樓主做了一些優(yōu)化,并且在關(guān)鍵代碼行中加入了注釋萄喳。如果Canal把數(shù)據(jù)變更消息發(fā)送至MQ卒稳,寫法有所不同,不同之處只是一個是訂閱Canal他巨,一個是訂閱MQ充坑,但是解析和處理邏輯基本相同减江。

`

public void process() {
    // 每批次處理的條數(shù)
    int batchSize = 1024;
    while (running) {
        try {
            // 連上Canal服務(wù)
            connector.connect();
            // 訂閱數(shù)據(jù)(比如某個表)
            connector.subscribe("table_xxx");
            while (running) {
                // 批量獲取數(shù)據(jù)變更記錄
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // 非預(yù)期情況,需做異常處理
                } else {
                    // 打印數(shù)據(jù)變更明細
                    printEntry(message.getEntries());
                }

                if (batchId != -1) {
                    // 使用batchId做ack操作:表明該批次處理完成捻爷,更新Canal側(cè)消費進度
                    connector.ack(batchId);
                }
            }
        } catch (Throwable e) {
            logger.error("process error!", e);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e1) {
                // ignore
            }

            // 處理失敗, 回滾進度
            connector.rollback();
        } finally {
            // 斷開連接
            connector.disconnect();
        }
    }
}

private void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        long executeTime = entry.getHeader().getExecuteTime();
        long delayTime = new Date().getTime() - executeTime;
        Date date = new Date(entry.getHeader().getExecuteTime());
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // 只關(guān)心數(shù)據(jù)變更的類型
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChange = null;
            try {
                // 解析數(shù)據(jù)變更對象
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
            }

            EventType eventType = rowChange.getEventType();

            logger.info(row_format,
                new Object[] { entry.getHeader().getLogfileName(),
                        String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                        entry.getHeader().getTableName(), eventType,
                        String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                        entry.getHeader().getGtid(), String.valueOf(delayTime) });

            // 不關(guān)心查詢辈灼,和DDL變更
            if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                logger.info("ddl : " + rowChange.getIsDdl() + " ,  sql ----> " + rowChange.getSql() + SEP);
                continue;
            }

            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // 數(shù)據(jù)變更類型為 刪除 時,打印變化前的列值
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // 數(shù)據(jù)變更類型為 插入 時也榄,打印變化后的列值
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // 數(shù)據(jù)變更類型為 其他(即更新) 時巡莹,打印變化前后的列值
                    printColumn(rowData.getBeforeColumnsList());
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }
}

// 打印列值
private void printColumn(List<Column> columns) {
    for (Column column : columns) {
        StringBuilder builder = new StringBuilder();
        try {
            if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
                || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
                // get value bytes
                builder.append(column.getName() + " : "
                               + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
            } else {
                builder.append(column.getName() + " : " + column.getValue());
            }
        } catch (UnsupportedEncodingException e) {
        }
        builder.append("    type=" + column.getMysqlType());
        if (column.getUpdated()) {
            builder.append("    update=" + column.getUpdated());
        }
        builder.append(SEP);
        logger.info(builder.toString());
    }
}

`

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市甜紫,隨后出現(xiàn)的幾起案子降宅,更是在濱河造成了極大的恐慌,老刑警劉巖囚霸,帶你破解...
    沈念sama閱讀 221,576評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件腰根,死亡現(xiàn)場離奇詭異,居然都是意外死亡邮辽,警方通過查閱死者的電腦和手機唠雕,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,515評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來吨述,“玉大人岩睁,你說我怎么就攤上這事〈г疲” “怎么了捕儒?”我有些...
    開封第一講書人閱讀 168,017評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長邓夕。 經(jīng)常有香客問我刘莹,道長,這世上最難降的妖魔是什么焚刚? 我笑而不...
    開封第一講書人閱讀 59,626評論 1 296
  • 正文 為了忘掉前任点弯,我火速辦了婚禮,結(jié)果婚禮上矿咕,老公的妹妹穿的比我還像新娘抢肛。我一直安慰自己,他們只是感情好碳柱,可當(dāng)我...
    茶點故事閱讀 68,625評論 6 397
  • 文/花漫 我一把揭開白布捡絮。 她就那樣靜靜地躺著,像睡著了一般莲镣。 火紅的嫁衣襯著肌膚如雪福稳。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 52,255評論 1 308
  • 那天瑞侮,我揣著相機與錄音的圆,去河邊找鬼鼓拧。 笑死,一個胖子當(dāng)著我的面吹牛越妈,可吹牛的內(nèi)容都是我干的毁枯。 我是一名探鬼主播,決...
    沈念sama閱讀 40,825評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼叮称,長吁一口氣:“原來是場噩夢啊……” “哼种玛!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起瓤檐,我...
    開封第一講書人閱讀 39,729評論 0 276
  • 序言:老撾萬榮一對情侶失蹤赂韵,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后挠蛉,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體祭示,經(jīng)...
    沈念sama閱讀 46,271評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,363評論 3 340
  • 正文 我和宋清朗相戀三年谴古,在試婚紗的時候發(fā)現(xiàn)自己被綠了质涛。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,498評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡掰担,死狀恐怖汇陆,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情带饱,我是刑警寧澤毡代,帶...
    沈念sama閱讀 36,183評論 5 350
  • 正文 年R本政府宣布,位于F島的核電站勺疼,受9級特大地震影響教寂,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜执庐,卻給世界環(huán)境...
    茶點故事閱讀 41,867評論 3 333
  • 文/蒙蒙 一酪耕、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧轨淌,春花似錦迂烁、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,338評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽狡忙。三九已至梳虽,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間灾茁,已是汗流浹背窜觉。 一陣腳步聲響...
    開封第一講書人閱讀 33,458評論 1 272
  • 我被黑心中介騙來泰國打工谷炸, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人禀挫。 一個月前我還...
    沈念sama閱讀 48,906評論 3 376
  • 正文 我出身青樓旬陡,卻偏偏與公主長得像,于是被迫代替她去往敵國和親语婴。 傳聞我的和親對象是個殘疾皇子描孟,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 45,507評論 2 359

推薦閱讀更多精彩內(nèi)容