In the previous chapter we configured and started the canal server. In this chapter we write a Java client that consumes the binlog data the server has synchronized.
1. Add the dependency
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.2</version>
</dependency>
2. Core API overview
Before looking at the concrete API, it helps to understand the class design of the canal client; only then can canal be used correctly.
It breaks down roughly into these parts:
- ClientIdentity
The identity a canal client presents to the server. The clientId is currently hard-coded to 1001. (At the moment one instance on a canal server can only be consumed by one client; clientId was reserved for a future one-instance/multi-client mode and can be ignored for now.)
- CanalConnector
SimpleCanalConnector / ClusterCanalConnector: the two connector implementations. simple targets a direct single-IP connection; cluster targets multi-IP setups and can rely on a CanalNodeAccessStrategy for failover control.
- CanalNodeAccessStrategy
SimpleNodeAccessStrategy / ClusterNodeAccessStrategy: the two failover implementations. simple performs failover selection over a given initial IP list; cluster dynamically selects a running canal server from the cluster nodes in zookeeper.
- ClientRunningMonitor / ClientRunningListener / ClientRunningData
Controls for the client's running state, mainly providing the client's own failover mechanism. Multiple canal clients may be started at the same time; the running mechanism guarantees that only one client is working while the others act as cold standbys. When the working client dies, the running mechanism promotes a standby client to working mode, so the canal client is not a single point of failure and the whole system stays highly available.
Javadoc:
- CanalConnector: http://alibaba.github.io/canal/apidocs/1.0.13/com/alibaba/otter/canal/client/CanalConnector.html
2.1. server/client interaction protocol
Overview of the get/ack/rollback protocol:
- Message getWithoutAck(int batchSize): fetches up to batchSize entries at a time without auto-ack. Each call returns a Message containing:
a. batch id: a unique identifier for the batch
b. entries: the concrete data objects, described in the data format section below
- getWithoutAck(int batchSize, Long timeout, TimeUnit unit): like getWithoutAck(int batchSize), but allows a timeout on the fetch:
a. returns once batchSize records have been collected or the timeout has elapsed
b. with timeout=0, blocks until a full batchSize is available
- void rollback(long batchId): as the name implies, rolls back the last get so the data can be fetched again. Takes the batchId returned by get, to guard against accidental misuse.
- void ack(long batchId): as the name implies, confirms that consumption succeeded and tells the server the data can be deleted. Takes the batchId returned by get, to guard against accidental misuse.
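The "batchSize or timeout" rule in (a)/(b) can be sketched with a small stand-alone helper. This is an illustrative stand-in, not canal's implementation (the real fetch happens server-side); the `fetch` method and the queue source are invented for the example, and the timeout=0 block-until-full case is omitted for brevity:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TimedBatchFetch {

    // Drain items from the source until batchSize items are collected
    // or timeoutMillis has elapsed, whichever comes first.
    public static <T> List<T> fetch(Queue<T> source, int batchSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (batch.size() < batchSize && System.currentTimeMillis() < deadline) {
            T item = source.poll();
            if (item != null) {
                batch.add(item);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Queue<Integer> source = new ArrayDeque<>(List.of(1, 2, 3));
        // Enough items queued: returns as soon as batchSize is reached.
        System.out.println(fetch(source, 2, 1000));
        // Not enough items: returns what is available once the timeout expires.
        System.out.println(fetch(source, 10, 50));
    }
}
```

With three items queued, the first call returns two items immediately, and the second returns the remaining one only after the 50 ms deadline passes.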
canal's get/ack/rollback protocol differs from a conventional JMS protocol in that get and ack may be handled asynchronously: for example, get can be called several times in a row, with the acks/rollbacks submitted later, in order. The project calls this the streaming API.
Benefits of the streaming API design:
- get/ack are decoupled, which removes the network latency and bookkeeping cost a synchronous ack would add. (99% of the time everything is in the normal state; rollback is the rare exception, and there is no need to sacrifice overall performance for that rare case.)
- When consumption is the bottleneck, or multi-process/multi-thread consumption is needed, the client can keep polling get and keep dispatching work downstream, increasing parallelism. (A real case from the author: the consuming business logic had to cross the China-US network, so one operation took 200 ms or more; parallelization was required to reduce latency.)
Streaming API design:
- Every get produces a mark in the meta store; marks increase monotonically, guaranteeing their uniqueness during a run.
- Every get continues from the cursor recorded by the previous mark; if no mark exists, it continues from the last ack cursor.
- Acks must be submitted in mark order; skipping ahead is not allowed. An ack deletes the current mark and advances the last ack cursor to that mark's position.
- On any error the client can issue a rollback to reset: all marks are deleted and the get position is cleared, so the next get continues from the last ack cursor.
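The four rules above can be made concrete with a small in-memory model. This is an illustrative stand-in, not canal's meta implementation; the class and field names (StreamingMeta, Mark, lastAckCursor) are invented for the sketch:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class StreamingMeta {

    // One outstanding get: its mark id and the cursor position after that get.
    private static final class Mark {
        final long id;
        final long cursor;
        Mark(long id, long cursor) { this.id = id; this.cursor = cursor; }
    }

    private final Deque<Mark> marks = new ArrayDeque<>(); // FIFO, oldest first
    private long nextMarkId = 1;   // marks increase monotonically (rule 1)
    private long lastAckCursor = 0;

    // Rules 1-2: each get produces a new mark and continues from the cursor of
    // the previous mark, or from lastAckCursor if no mark is outstanding.
    public long get(int batchSize) {
        long start = marks.isEmpty() ? lastAckCursor : marks.peekLast().cursor;
        Mark mark = new Mark(nextMarkId++, start + batchSize);
        marks.addLast(mark);
        return mark.id;
    }

    // Rule 3: acks must follow mark order; an ack removes the oldest mark
    // and advances lastAckCursor to that mark's position.
    public void ack(long markId) {
        if (marks.isEmpty() || marks.peekFirst().id != markId) {
            throw new IllegalStateException("ack out of order: " + markId);
        }
        lastAckCursor = marks.removeFirst().cursor;
    }

    // Rule 4: rollback drops every outstanding mark, so the next get
    // resumes from lastAckCursor.
    public void rollback() {
        marks.clear();
    }

    public long lastAckCursor() { return lastAckCursor; }

    public long nextGetStart() {
        return marks.isEmpty() ? lastAckCursor : marks.peekLast().cursor;
    }
}
```

For example, after get(10) and get(5) the next get would start at position 15; acking the first batch moves the last ack cursor to 10, and a rollback at that point makes the next get resume from 10.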
The streaming API thus gives the client an asynchronous response model against the server.
2.2. Data object format
Entry
    Header
        logfileName [binlog file name]
        logfileOffset [binlog position]
        executeTime [timestamp of the change recorded in the binlog, second precision]
        schemaName
        tableName
        eventType [insert/update/delete]
    entryType [transaction begin BEGIN / transaction end END / row data ROWDATA]
    storeValue [byte data; can be deserialized into a RowChange]
RowChange
    isDdl [whether this is a DDL change, e.g. create table/drop table]
    sql [the concrete DDL statement]
    rowDatas [the changed rows of an insert/update/delete; may be multiple, since one binlog event can carry several row changes, e.g. a batch statement]
        beforeColumns [array of Column: the fields before the change]
        afterColumns [array of Column: the fields after the change]
Column
    index
    sqlType [jdbc type]
    name [column name]
    isKey [whether the column is a primary key]
    updated [whether the value changed]
    isNull [whether the value is null]
    value [the concrete content; note that it is string text]
Notes:
- Both the before-change and after-change field contents are provided, and information missing from the binlog itself, such as name and isKey, is filled in.
- For DDL changes the DDL statement is provided.
- An insert carries only after columns, a delete only before columns, while an update carries both before and after columns.
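As a small illustration of working with those column arrays, the sketch below extracts only the fields whose updated flag is set, a common way to build the changed-fields map of an update event. It uses a hypothetical SimpleColumn stand-in (name/value/updated only) instead of the real CanalEntry.Column so it runs without the canal protocol classes:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal stand-in for CanalEntry.Column, carrying only the fields
// needed here; the real class is a generated protobuf message.
class SimpleColumn {
    final String name;
    final String value;
    final boolean updated;

    SimpleColumn(String name, String value, boolean updated) {
        this.name = name;
        this.value = value;
        this.updated = updated;
    }
}

public class ColumnMapper {

    // Collect only the columns that actually changed (updated == true);
    // for an INSERT every after-column is typically marked updated.
    static Map<String, String> changedColumns(List<SimpleColumn> columns) {
        Map<String, String> result = new LinkedHashMap<>();
        for (SimpleColumn c : columns) {
            if (c.updated) {
                result.put(c.name, c.value);
            }
        }
        return result;
    }
}
```

With the real API, the same loop would iterate rowData.getAfterColumnsList() and call getName()/getValue()/getUpdated() on each Column.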
2.3. Client usage examples
2.3.1. Creating a Connector
a. Create a SimpleCanalConnector (direct IP connection; no server/client failover support)
CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress(AddressUtils.getHostIp(), 11111), destination, "", "");
b. Create a ClusterCanalConnector (obtains the canal server IPs from zookeeper; supports server/client failover)
CanalConnector connector = CanalConnectors.newClusterConnector(
    "10.20.144.51:2181", destination, "", "");
c. Create a ClusterCanalConnector (based on a fixed list of canal server addresses; supports failover among those fixed server IPs, but not client failover)
CanalConnector connector = CanalConnectors.newClusterConnector(
    Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(), 11111)), destination, "", "");
2.3.2. Using get/ack/rollback
2.3.3. Processing RowData
3. Hands-on
3.1. Creating the connection
// create the connection
CanalConnector connector = CanalConnectors.newSingleConnector(
    new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
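The subscribe filter ".*\\..*" is an ordinary regular expression matched against schema.table names, so this call subscribes to every table in every schema. A quick check with java.util.regex (illustrative only; canal applies the filter on the server side):

```java
import java.util.regex.Pattern;

public class FilterCheck {
    public static void main(String[] args) {
        // canal matches the subscribe filter against "schema.table" names;
        // ".*\..*" requires at least one dot, i.e. any schema-qualified table.
        Pattern filter = Pattern.compile(".*\\..*");
        System.out.println(filter.matcher("test.xdual").matches()); // true
        System.out.println(filter.matcher("nodotname").matches());  // false
    }
}
```

A narrower filter such as "test\\..*" would restrict the subscription to tables in the test schema.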
3.2. Fetching data
Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
long batchId = message.getId();
int size = message.getEntries().size();
3.3. Processing the data
private static void printEntry(List<Entry> entries) {
    for (Entry entry : entries) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }
        RowChange rowChange;
        try {
            rowChange = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
        }
        EventType eventType = rowChange.getEventType();
        System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
            entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
            entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
            eventType));
        for (RowData rowData : rowChange.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------> before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------> after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}
3.4. Complete code
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

    public static void main(String[] args) {
        // create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // confirm the batch was consumed
                // connector.rollback(batchId); // on failure, roll the batch back instead
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
4. Running the client
After starting the canal client, the console prints messages like:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
This indicates that the database currently has no change data.
Trigger a database change:
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
-> `ID` int(11) NOT NULL AUTO_INCREMENT,
-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-> PRIMARY KEY (`ID`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());
Query OK, 1 row affected (0.06 sec)
The console then shows:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true
End of this chapter.
Building on this simple client demo, Billow will publish a follow-up chapter on sending the data to a kafka queue and consuming it downstream to store into a data warehouse.