Canal的數(shù)據(jù)傳輸有兩塊审姓,一塊是進(jìn)行binlog訂閱時(shí)园担,binlog轉(zhuǎn)換為我們所定義的Message近弟,第二塊是client與server進(jìn)行TCP交互時(shí)缅糟,傳輸?shù)腡CP協(xié)議。
一祷愉、EntryProtocal
這塊是binlog的一個(gè)存儲(chǔ)溺拱。主要的格式如下:
Entry
Header
version [協(xié)議的版本號(hào),default = 1]
logfileName [binlog文件名]
logfileOffset [binlog position]
serverId [服務(wù)端serverId]
serverenCode [變更數(shù)據(jù)的編碼]
executeTime [變更數(shù)據(jù)的執(zhí)行時(shí)間]
sourceType [變更數(shù)據(jù)的來源,default = MYSQL]
schemaName [變更數(shù)據(jù)的schemaname]
tableName [變更數(shù)據(jù)的tablename]
eventLength [每個(gè)event的長度]
eventType [insert/update/delete類型,default = UPDATE]
props [預(yù)留擴(kuò)展]
gtid [當(dāng)前事務(wù)的gitd]
entryType [事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA/HEARTBEAT/GTIDLOG]
storeValue [byte數(shù)據(jù),可展開,對應(yīng)的類型為RowChange]
RowChange
tableId [tableId,由數(shù)據(jù)庫產(chǎn)生]
eventType [數(shù)據(jù)變更類型,default = UPDATE]
isDdl [標(biāo)識(shí)是否是ddl語句谣辞,比如create table/drop table]
sql [ddl/query的sql語句]
rowDatas [具體insert/update/delete的變更數(shù)據(jù)迫摔,可為多條,1個(gè)binlog event事件可對應(yīng)多條變更泥从,比如批處理]
beforeColumns [字段信息句占,增量數(shù)據(jù)(修改前,刪除前),Column類型的數(shù)組]
afterColumns [字段信息,增量數(shù)據(jù)(修改后,新增后),Column類型的數(shù)組]
props [預(yù)留擴(kuò)展]
props [預(yù)留擴(kuò)展]
ddlSchemaName [ddl/query的schemaName躯嫉,會(huì)存在跨庫ddl纱烘,需要保留執(zhí)行ddl的當(dāng)前schemaName]
Column
index [字段下標(biāo)]
sqlType [jdbc type]
name [字段名稱(忽略大小寫),在mysql中是沒有的]
isKey [是否為主鍵]
updated [是否發(fā)生過變更]
isNull [值是否為null]
props [預(yù)留擴(kuò)展]
value [字段值,timestamp,Datetime是一個(gè)時(shí)間格式的文本]
length [對應(yīng)數(shù)據(jù)對象原始長度]
mysqlType [字段mysql類型]
二祈餐、CanalProtocal
這塊主要定義了client和server交互的協(xié)議擂啥。
Packet
magic_number [default = 17]
version [default = 1]
type [PacketType,類型]
compression [壓縮帆阳,default = NONE]
body [具體內(nèi)容]
主要的類型和對應(yīng)的body哺壶,都可以在CanalProtocal.proto里面查看到。
enum PacketType {
HANDSHAKE = 1;
CLIENTAUTHENTICATION = 2;
ACK = 3;
SUBSCRIPTION = 4;
UNSUBSCRIPTION = 5;
GET = 6;
MESSAGES = 7;
CLIENTACK = 8;
// management part
SHUTDOWN = 9;
// integration
DUMP = 10;
HEARTBEAT = 11;
CLIENTROLLBACK = 12;
}
//心跳
message HeartBeat {
optional int64 send_timestamp = 1;
optional int64 start_timestamp = 2;
}
//握手
message Handshake {
optional string communication_encoding = 1 [default = "utf8"];
optional bytes seeds = 2;
repeated Compression supported_compressions = 3;
}
// client authentication
message ClientAuth {
optional string username = 1;
optional bytes password = 2; // hashed password with seeds from Handshake message
optional int32 net_read_timeout = 3 [default = 0]; // in seconds
optional int32 net_write_timeout = 4 [default = 0]; // in seconds
optional string destination = 5;
optional string client_id = 6;
optional string filter = 7;
optional int64 start_timestamp = 8;
}
//服務(wù)端響應(yīng)
message Ack {
optional int32 error_code = 1 [default = 0];
optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it.
}
//客戶端提交
message ClientAck {
optional string destination = 1;
optional string client_id = 2;
optional int64 batch_id = 3;
}
// subscription
message Sub {
optional string destination = 1;
optional string client_id = 2;
optional string filter = 7;
}
// Unsubscription
message Unsub {
optional string destination = 1;
optional string client_id = 2;
optional string filter = 7;
}
// PullRequest
message Get {
optional string destination = 1;
optional string client_id = 2;
optional int32 fetch_size = 3;
optional int64 timeout = 4 [default = -1]; // 默認(rèn)-1時(shí)代表不控制
optional int32 unit = 5 [default = 2];// 數(shù)字類型蜒谤,0:納秒,1:毫秒,2:微秒,3:秒,4:分鐘,5:小時(shí),6:天
optional bool auto_ack = 6 [default = false]; // 是否自動(dòng)ack
}
//消息
message Messages {
optional int64 batch_id = 1;
repeated bytes messages = 2;
}
// TBD when new packets are required
message Dump{
optional string journal = 1;
optional int64 position = 2;
optional int64 timestamp = 3 [default = 0];
}
// 客戶端回滾
message ClientRollback{
optional string destination = 1;
optional string client_id = 2;
optional int64 batch_id = 3;
}