本文介紹如何應(yīng)用Canal實現(xiàn)異步、解耦的架構(gòu)锰扶,后續(xù)有空再寫文章分析Canal原理和源代碼。
Canal簡介
Canal是用來獲取數(shù)據(jù)庫變更的中間件寝受。
偽裝自己為MySQL從庫坷牛,拉取主庫binlog并解析、處理很澄。處理結(jié)果可發(fā)送給MQ京闰,方便其他服務(wù)獲取數(shù)據(jù)庫變更消息,這一點非常有用甩苛。下面介紹一些典型用途蹂楣。
其中,Canal+MQ作為一個整體讯蒲,從外界看來就是一個數(shù)據(jù)管道服務(wù)服務(wù)痊土,如下圖。
Canal典型用途
異構(gòu)數(shù)據(jù)(如ES墨林、HBase赁酝、不同路由key的DB)
通過Canal自帶的adapter,同步異構(gòu)數(shù)據(jù)至ES旭等、HBase酌呆,而不用自行實現(xiàn)繁瑣的數(shù)據(jù)轉(zhuǎn)換、同步操作搔耕。這里的adapter就是典型的適配器模式隙袁,把數(shù)據(jù)轉(zhuǎn)成相應(yīng)格式,并寫入異構(gòu)的存儲系統(tǒng)弃榨。
當(dāng)然菩收,也可以同步數(shù)據(jù)至DB,甚至構(gòu)建一份按不同字段分片路由的數(shù)據(jù)庫鲸睛。
比如:下單時按用戶id分庫分表訂單記錄坛梁,然后借助Canal數(shù)據(jù)通道,構(gòu)建一份按商家id分庫分表的訂單記錄腊凶,用于B端業(yè)務(wù)(如商家查詢自己接到哪些訂單)划咐。
緩存刷新
緩存刷新的常規(guī)做法是,先更新DB褐缠,再刪除緩存政鼠,再延遲刪除(即cache-aside pattern+延遲雙刪),這種多步操作可能失敗队魏,而且實現(xiàn)相對復(fù)雜公般。借助Canal刷新緩存,使主服務(wù)胡桨、主流程無需關(guān)心緩存更新等一致性問題官帘,保證最終一致性。
價格變化等重要業(yè)務(wù)消息
下游服務(wù)可立即感知價格變化昧谊。
常規(guī)做法是刽虹,先修改價格,再發(fā)出消息呢诬,此處的難點是要保證消息一定發(fā)送成功涌哲,以及如果發(fā)送不成功時如何處理。借助Canal尚镰,不用在業(yè)務(wù)層面擔(dān)心消息丟失的問題阀圾。
數(shù)據(jù)庫遷移
- 多機房數(shù)據(jù)同步
- 拆庫
雖然可以自己在代碼中實現(xiàn)雙寫邏輯,然后對歷史數(shù)據(jù)做處理狗唉,但是歷史數(shù)據(jù)也可能被更新初烘,需要不斷迭代對比、更新分俯,總之很復(fù)雜肾筐。
實時對賬
常規(guī)做法是定時任務(wù)跑對賬邏輯,時效性低澳迫,不能及時發(fā)現(xiàn)不一致問題局齿。借助Canal剧劝,可實時觸發(fā)對賬邏輯橄登。
大致流程如下:
- 接收數(shù)據(jù)變更消息
- 寫入hbase作為流水記錄
- 一段窗口時間過后,觸發(fā)比較與對端數(shù)據(jù)做比較
Canal客戶端demo代碼分析
以下示例是客戶端連接Canal的例子讥此,修改自官方github示例拢锹,樓主做了一些優(yōu)化,并且在關(guān)鍵代碼行中加入了注釋萄喳。如果Canal把數(shù)據(jù)變更消息發(fā)送至MQ卒稳,寫法有所不同,不同之處只是一個是訂閱Canal他巨,一個是訂閱MQ充坑,但是解析和處理邏輯基本相同减江。
`
public void process() {
// 每批次處理的條數(shù)
int batchSize = 1024;
while (running) {
try {
// 連上Canal服務(wù)
connector.connect();
// 訂閱數(shù)據(jù)(比如某個表)
connector.subscribe("table_xxx");
while (running) {
// 批量獲取數(shù)據(jù)變更記錄
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 非預(yù)期情況,需做異常處理
} else {
// 打印數(shù)據(jù)變更明細
printEntry(message.getEntries());
}
if (batchId != -1) {
// 使用batchId做ack操作:表明該批次處理完成捻爷,更新Canal側(cè)消費進度
connector.ack(batchId);
}
}
} catch (Throwable e) {
logger.error("process error!", e);
try {
Thread.sleep(1000L);
} catch (InterruptedException e1) {
// ignore
}
// 處理失敗, 回滾進度
connector.rollback();
} finally {
// 斷開連接
connector.disconnect();
}
}
}
private void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = new Date().getTime() - executeTime;
Date date = new Date(entry.getHeader().getExecuteTime());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 只關(guān)心數(shù)據(jù)變更的類型
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = null;
try {
// 解析數(shù)據(jù)變更對象
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChange.getEventType();
logger.info(row_format,
new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType,
String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime) });
// 不關(guān)心查詢辈灼,和DDL變更
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
logger.info("ddl : " + rowChange.getIsDdl() + " , sql ----> " + rowChange.getSql() + SEP);
continue;
}
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
// 數(shù)據(jù)變更類型為 刪除 時,打印變化前的列值
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
// 數(shù)據(jù)變更類型為 插入 時也榄,打印變化后的列值
printColumn(rowData.getAfterColumnsList());
} else {
// 數(shù)據(jù)變更類型為 其他(即更新) 時巡莹,打印變化前后的列值
printColumn(rowData.getBeforeColumnsList());
printColumn(rowData.getAfterColumnsList());
}
}
}
}
}
// 打印列值
private void printColumn(List<Column> columns) {
for (Column column : columns) {
StringBuilder builder = new StringBuilder();
try {
if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
// get value bytes
builder.append(column.getName() + " : "
+ new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
} else {
builder.append(column.getName() + " : " + column.getValue());
}
} catch (UnsupportedEncodingException e) {
}
builder.append(" type=" + column.getMysqlType());
if (column.getUpdated()) {
builder.append(" update=" + column.getUpdated());
}
builder.append(SEP);
logger.info(builder.toString());
}
}
`