阿里開源Canal--③Client入門

在上一章節(jié)中室埋,我們配置啟動了canal server悔耘,本節(jié)我們使用Java語言編寫啟動client端消費server端同步過來的binlog數(shù)據(jù)诫惭。

1.添加依賴

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>

2.核心Api介紹

在了解具體API之前驮宴,需要提前了解下canal client的類設(shè)計沛简,這樣才可以正確的使用好canal.


大致分為幾部分:

  • ClientIdentity
    canal client和server交互之間的身份標(biāo)識杆勇,目前clientId寫死為1001. (目前canal server上的一個instance只能有一個client消費贪壳,clientId的設(shè)計是為1個instance多client消費模式而預(yù)留的,暫時不需要理會)
  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實現(xiàn)蚜退,simple針對的是簡單的ip直連模式闰靴,cluster針對多ip的模式,可依賴CanalNodeAccessStrategy進行failover控制
  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實現(xiàn)钻注,simple針對給定的初始ip列表進行failover選擇蚂且,cluster基于zookeeper上的cluster節(jié)點動態(tài)選擇正在運行的canal server.
  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相關(guān)控制,主要為解決client自身的failover機制幅恋。canal client允許同時啟動多個canal client杏死,通過running機制,可保證只有一個client在工作佳遣,其他client做為冷備. 當(dāng)運行中的client掛了识埋,running會控制讓冷備中的client轉(zhuǎn)為工作模式,這樣就可以確保canal client也不會是單點. 保證整個系統(tǒng)的高可用性.

javadoc查看:

2.1.server/client交互協(xié)議


get/ack/rollback協(xié)議介紹:

  • Message getWithoutAck(int batchSize)零渐,允許指定batchSize窒舟,一次可以獲取多條,每次返回的對象為Message诵盼,包含的內(nèi)容為:
    a. batch id 唯一標(biāo)識
    b. entries 具體的數(shù)據(jù)對象惠豺,可參見下面的數(shù)據(jù)介紹
  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize)风宁,允許設(shè)定獲取數(shù)據(jù)的timeout超時時間
    a. 拿夠batchSize條記錄或者超過timeout時間
    b. timeout=0洁墙,阻塞等到足夠的batchSize
  • void rollback(long batchId),顧命思議戒财,回滾上次的get請求热监,重新獲取數(shù)據(jù)∫基于get獲取的batchId進行提交孝扛,避免誤操作
  • void ack(long batchId),顧命思議幽崩,確認(rèn)已經(jīng)消費成功苦始,通知server刪除數(shù)據(jù)』派辏基于get獲取的batchId進行提交陌选,避免誤操作

canal的get/ack/rollback協(xié)議和常規(guī)的jms協(xié)議有所不同,允許get/ack異步處理,比如可以連續(xù)調(diào)用get多次咨油,后續(xù)異步按順序提交ack/rollback您炉,項目中稱之為流式api.

流式api設(shè)計的好處:

  • get/ack異步化,減少因ack帶來的網(wǎng)絡(luò)延遲和操作成本 (99%的狀態(tài)都是處于正常狀態(tài)臼勉,異常的rollback屬于個別情況邻吭,沒必要為個別的case犧牲整個性能)
  • get獲取數(shù)據(jù)后,業(yè)務(wù)消費存在瓶頸或者需要多進程/多線程消費時宴霸,可以不停的輪詢get數(shù)據(jù),不停的往后發(fā)送任務(wù)膏蚓,提高并行化. (作者在實際業(yè)務(wù)中的一個case:業(yè)務(wù)數(shù)據(jù)消費需要跨中美網(wǎng)絡(luò)瓢谢,所以一次操作基本在200ms以上,為了減少延遲驮瞧,所以需要實施并行化)

流式api設(shè)計:

  • 每次get操作都會在meta中產(chǎn)生一個mark氓扛,mark標(biāo)記會遞增,保證運行過程中mark的唯一性
  • 每次的get操作论笔,都會在上一次的mark操作記錄的cursor繼續(xù)往后取采郎,如果mark不存在,則在last ack cursor繼續(xù)往后取
  • 進行ack時狂魔,需要按照mark的順序進行數(shù)序ack蒜埋,不能跳躍ack. ack會刪除當(dāng)前的mark標(biāo)記,并將對應(yīng)的mark位置更新為last ack cursor
  • 一旦出現(xiàn)異常情況最楷,客戶端可發(fā)起rollback情況整份,重新置位:刪除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續(xù)往后取

流式api帶來的異步響應(yīng)模型:


image.png

2.2.數(shù)據(jù)對象格式簡單介紹

Entry  
    Header  
        logfileName [binlog文件名]  
        logfileOffset [binlog position]  
        executeTime [binlog里記錄變更發(fā)生的時間戳,精確到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete類型]  
    entryType   [事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA]  
    storeValue  [byte數(shù)據(jù),可展開籽孙,對應(yīng)的類型為RowChange]  

RowChange

isDdl       [是否是ddl變更操作烈评,比如create table/drop table]

sql         [具體的ddl sql]

rowDatas    [具體insert/update/delete的變更數(shù)據(jù),可為多條犯建,1個binlog event事件可對應(yīng)多條變更讲冠,比如批處理]

beforeColumns [Column類型的數(shù)組,變更前的數(shù)據(jù)字段]

afterColumns [Column類型的數(shù)組适瓦,變更后的數(shù)據(jù)字段]

Column

index

sqlType     [jdbc type]

name        [column name]

isKey       [是否為主鍵]

updated     [是否發(fā)生過變更]

isNull      [值是否為null]

value       [具體的內(nèi)容竿开,注意為string文本]  

說明:

  • 可以提供數(shù)據(jù)庫變更前和變更后的字段內(nèi)容,針對binlog中沒有的name,isKey等信息進行補全
  • 可以提供ddl的變更語句
  • insert只有after columns, delete只有before columns犹菇,而update則會有before / after columns數(shù)據(jù).

2.3.Client使用例子

2.3.1.創(chuàng)建Connector

a. 創(chuàng)建SimpleCanalConnector (直連ip德迹,不支持server/client的failover機制)

CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");

b. 創(chuàng)建ClusterCanalConnector (基于zookeeper獲取canal server ip,支持server/client的failover機制)

CanalConnector connector = CanalConnectors.newClusterConnector(
"10.20.144.51:2181", destination, "", "");

c. 創(chuàng)建ClusterCanalConnector (基于固定canal server的地址揭芍,支持固定的server ip的failover機制胳搞,不支持client的failover機制

CanalConnector connector = CanalConnectors.newClusterConnector(
Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");
2.3.2.get/ack/rollback使用
2.3.3.RowData數(shù)據(jù)處理

3.實戰(zhàn)

3.1.創(chuàng)建鏈接

// 創(chuàng)建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("172.0.0.1",11111), "example", "", "");

connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();

3.2.獲取數(shù)據(jù)

Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
long batchId = message.getId();
int size = message.getEntries().size();

3.3.數(shù)據(jù)處理

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

3.4.完整代碼

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 創(chuàng)建鏈接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交確認(rèn)
            // connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

4.運行Client

啟動Canal Client后,可以從控制臺從看到類似消息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

此時代表當(dāng)前數(shù)據(jù)庫無變更數(shù)據(jù)

觸發(fā)數(shù)據(jù)庫變更

mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

可以從控制臺中看到:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4    update=true
X : 2013-02-05 23:29:46    update=true

本節(jié)完。

基于這個簡單的client demo肌毅,Billow后續(xù)會出一章節(jié)筷转,講解如何將數(shù)據(jù)發(fā)送到kafka隊列中,再從下游去消費處理數(shù)據(jù)保存到數(shù)據(jù)倉庫中悬而。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末呜舒,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子笨奠,更是在濱河造成了極大的恐慌袭蝗,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,525評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件般婆,死亡現(xiàn)場離奇詭異到腥,居然都是意外死亡,警方通過查閱死者的電腦和手機蔚袍,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,203評論 3 395
  • 文/潘曉璐 我一進店門乡范,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人啤咽,你說我怎么就攤上這事晋辆。” “怎么了宇整?”我有些...
    開封第一講書人閱讀 164,862評論 0 354
  • 文/不壞的土叔 我叫張陵瓶佳,是天一觀的道長。 經(jīng)常有香客問我没陡,道長涩哟,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,728評論 1 294
  • 正文 為了忘掉前任盼玄,我火速辦了婚禮贴彼,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘埃儿。我一直安慰自己器仗,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,743評論 6 392
  • 文/花漫 我一把揭開白布童番。 她就那樣靜靜地躺著精钮,像睡著了一般。 火紅的嫁衣襯著肌膚如雪剃斧。 梳的紋絲不亂的頭發(fā)上轨香,一...
    開封第一講書人閱讀 51,590評論 1 305
  • 那天,我揣著相機與錄音幼东,去河邊找鬼臂容。 笑死科雳,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的脓杉。 我是一名探鬼主播糟秘,決...
    沈念sama閱讀 40,330評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼球散!你這毒婦竟也來了尿赚?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,244評論 0 276
  • 序言:老撾萬榮一對情侶失蹤蕉堰,失蹤者是張志新(化名)和其女友劉穎凌净,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體嘁灯,經(jīng)...
    沈念sama閱讀 45,693評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡泻蚊,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,885評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了丑婿。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點故事閱讀 40,001評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡没卸,死狀恐怖羹奉,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情约计,我是刑警寧澤诀拭,帶...
    沈念sama閱讀 35,723評論 5 346
  • 正文 年R本政府宣布,位于F島的核電站煤蚌,受9級特大地震影響耕挨,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜尉桩,卻給世界環(huán)境...
    茶點故事閱讀 41,343評論 3 330
  • 文/蒙蒙 一筒占、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧蜘犁,春花似錦翰苫、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,919評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至屈扎,卻和暖如春埃唯,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背鹰晨。 一陣腳步聲響...
    開封第一講書人閱讀 33,042評論 1 270
  • 我被黑心中介騙來泰國打工墨叛, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留止毕,地道東北人。 一個月前我還...
    沈念sama閱讀 48,191評論 3 370
  • 正文 我出身青樓巍实,卻偏偏與公主長得像滓技,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子棚潦,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,955評論 2 355

推薦閱讀更多精彩內(nèi)容