Canal是阿里開源的binlog同步工具狮杨。可以解析binlog爽柒,并將解析后的數(shù)據(jù)同步到任何目標(biāo)存儲中吴菠。
1
Canal工作原理
? ? 1、mysql master節(jié)點將改變記錄保存到二進制binlog文件中浩村。
? ? 2做葵、canal 把自己偽裝成mysql slave節(jié)點,向master節(jié)點發(fā)送dump binlog請求。master節(jié)點收到請求并找到對應(yīng)binlog文件及binlog位置pos心墅。
? ? 3酿矢、master根據(jù)pos讀取binlog event,不斷發(fā)往slave節(jié)點(也就是canal)怎燥。
? ? 4瘫筐、slave節(jié)點收到binlog events并拷貝到slave的中繼日志。
? ? 5铐姚、slave結(jié)點回放中繼日志中的event并同步策肝。
? ? 6、新的binlog被master不斷廣播到slave節(jié)點隐绵,slave節(jié)點源源不斷解析同步之众。
1
2
3
4
5
6
Canal目錄結(jié)構(gòu)
canal-1.0.24/
├── bin
│?? ├── canal.pid
│?? ├── startup.bat 啟動canal server腳本
│?? ├── startup.sh? 啟動canal server腳本
│?? └── stop.sh? ? 停止canal server腳本
├── conf
│?? ├── canal.properties common屬性,是全局instance配置文件,被多個instance實例共享
│?? ├── canal_test? instance實例配置目錄
│?? │?? ├── instance.properties instance實例配置文件
│?? │?? ├── meta.dat 記錄Instance實例消費binlog position位置等信息
│?? ├── example 默認(rèn)instance實例
│?? │?? └── instance.properties
│?? ├── logback.xml 日志分割
│?? └── spring instance實例可選處理模式
│??? ? ├── default-instance.xml
│??? ? ├── file-instance.xml 基于文件
│??? ? ├── group-instance.xml
│??? ? ├── local-instance.xml
│??? ? └── memory-instance.xml 基于內(nèi)存
├── lib
└── logs
? ? ├── canal canal server運行日志
? ? │?? └── canal.log
? ? ├── canal_test instance實例日志
? ? │?? ├── canal_test.log
? ? │?? └── meta.log
? ? └── example 默認(rèn)instance實例日志
? ? ? ? └── example.log
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Canal 2種方式部署
配置Mysql
MySQL 開啟Binlog
修改/etc/my.cnf 配置文件,增加如下配置
[root@node2 ~]# vim /etc/my.cnf
#開啟binlog
[mysqld]
#binlog文件保存目錄及binlog文件名前綴
#binlog文件保存目錄: /var/lib/mysql/
#binlog文件名前綴: mysql-binlog
#mysql向文件名前綴添加數(shù)字后綴來按順序創(chuàng)建二進制日志文件 如mysql-binlog.000006 mysql-binlog.000007
log-bin=/var/lib/mysql/mysql-binlog
#選擇基于行的日志記錄方式
binlog-format=ROW
#服務(wù)器 id
#binlog數(shù)據(jù)中包含server_id,標(biāo)識該數(shù)據(jù)是由那個server同步過來的
server_id=1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
MySQL 配置Canal Server權(quán)限
CREATE USER 'canal_sync'@'%' IDENTIFIED BY 'canal_sync123';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal_sync'@'%';
FLUSH PRIVILEGES;
1
2
3
MySQL 建庫建表
create database canal_test;
use canal_test;
create table if not exists `user_info`(
? `userid` int,
? `name` varchar(100),
? `age` int
)engine=innodb default charset=utf8;
1
2
3
4
5
6
7
Canal Server單節(jié)點模式
1、Canal Server單節(jié)點模式,Canal Client可采用多Client多活或單Client進程的方式氢橙。但要注意存在的問題酝枢。
? ? 存在問題:
? ? ? ? (1)Server端會有單點問題。
? ? ? ? (2)多Client多活悍手,同時工作帘睦,并互為主備。但無法保證binlog的順序坦康。實際情況下一個Client進程加監(jiān)控就足以滿足需要竣付。
2、單節(jié)點模式binlog postion偏移量記錄的位置:每個instance配置目錄下的meta.dat文件中滞欠。如canal-1.0.24/conf/canal_test/meta.dat
1
2
3
4
5
Server端配置
下載解壓
[root@node3 software]# pwd
/data/software
[root@node3 software]# wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz
[root@node3 software]# mkdir canal-1.0.24
[root@node3 software]# tar -zxvf canal.deployer-1.0.24.tar.gz -C canal-1.0.24
1
2
3
4
5
6
7
8
9
配置Instance
[root@node3 conf]# pwd
/data/software/canal-1.0.24/conf
#約定以要同步的庫名作為配置文件目錄名
[root@node3 conf]# cp -r example/ canal_test
[root@node3 conf]# vim canal_test/instance.properties
#slaveId
#每個instance都會偽裝成一個mysql slave節(jié)點
#同一個mysql實例(待同步的mysql節(jié)點),此slaveId應(yīng)該唯一
#若是集群模式,則同一集群中,相同的instance,此slaveId應(yīng)相同
canal.instance.mysql.slaveId = 111101
#master 庫
#canal運行時首要連接庫 可以為mysql 主庫
#如果只能基于從庫的binlog,這里用mysql從庫也可
canal.instance.master.address = node2:3306
#起始binlog文件
canal.instance.master.journal.name =
#起始binlog偏移量,同步該binlog位點后的數(shù)據(jù)
canal.instance.master.position =
#起始binlog時間戳,找到該時間戳對應(yīng)的binlog位點后開始同步
canal.instance.master.timestamp =
#standby 庫
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#mysql 數(shù)據(jù)庫賬號
canal.instance.dbUsername = canal_sync
canal.instance.dbPassword = canal_sync123
#默認(rèn)數(shù)據(jù)庫
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#注意:
#(1)這里黑白名單可以覆蓋defaultDatabaseName古胆。
#(2)如果Client端 配置了connector.subscribe則會覆蓋黑白名單配置
#表過濾--白名單 只監(jiān)聽庫表
#如testDB\..*只監(jiān)聽testDB數(shù)據(jù)庫,testDB\.test_1 只監(jiān)聽testDB庫中test_1表。多個用逗號分開
canal.instance.filter.regex = .*\\..*
#表過濾--黑名單 排除庫表
canal.instance.filter.black.regex =
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
啟動
[root@node3 canal-1.0.24]# bin/startup.sh
查看啟動日志,根據(jù)日志中的報錯排查問題
[root@node3 canal-1.0.24]# tail -f logs/canal_test/canal_test.log
1
2
3
4
Client端消費
pom依賴
<dependency>
? ? <groupId>com.alibaba.otter</groupId>
? ? <artifactId>canal.client</artifactId>
? ? <version>1.0.24</version>
</dependency>
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class ClientSample {
? ? public static void main(String args[]) {
? ? ? ? // 單連接
? ? ? ? CanalConnector connector = CanalConnectors.newSingleConnector(
? ? ? ? ? ? ? ? new InetSocketAddress("node3", 11111), "canal_test", "", "");
? ? ? ? // 集群連接
? ? ? ? //CanalConnector connector = CanalConnectors.newClusterConnector("192.168.113.101:2181,192.168.113.102:2181,192.168.113.103:2181/canal/cluster1", "canal_test", "", "");
? ? ? ? //計數(shù)器
? ? ? ? int emptyCount = 0;
? ? ? ? //一次最多拉多少條Message
? ? ? ? //注意:
? ? ? ? int batchSize = 1000;
? ? ? ? try {
? ? ? ? ? ? //和server建立連接
? ? ? ? ? ? connector.connect();
? ? ? ? ? ? //訂閱表
? ? ? ? ? ? connector.subscribe("canal_test.user_info");
? ? ? ? ? ? //回滾到上次ack的位置
? ? ? ? ? ? connector.rollback();
? ? ? ? ? ? //最大空閑次數(shù)
? ? ? ? ? ? int maxEmptyCount = 1000;
? ? ? ? ? ? //死循環(huán)去拉取數(shù)據(jù)
? ? ? ? ? ? while (emptyCount < maxEmptyCount) {
? ? ? ? ? ? ? ? //嘗試最多拿batchSize條記錄
? ? ? ? ? ? ? ? //注意:getWithoutAck一次筛璧,對應(yīng)一個Message逸绎。
? ? ? ? ? ? ? ? //一個Message有一個MessageID,還有一個List<CanalEntry.Entry>
? ? ? ? ? ? ? ? Message message = connector.getWithoutAck(batchSize);
? ? ? ? ? ? ? ? long batchId = message.getId();
? ? ? ? ? ? ? ? int size = message.getEntries().size();
? ? ? ? ? ? ? ? //沒有數(shù)據(jù)---等待
? ? ? ? ? ? ? ? if (batchId == -1 || size == 0) {
? ? ? ? ? ? ? ? ? ? emptyCount++;
//? ? ? ? ? ? ? ? ? ? System.out.println("empty count : " + emptyCount);
? ? ? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);
? ? ? ? ? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? //拿到數(shù)據(jù)---開始解析--發(fā)送到目的地如kafka/elasticsearch/redis/hbase...等
? ? ? ? ? ? ? ? ? ? emptyCount = 0;
? ? ? ? ? ? ? ? ? ? //這里打印內(nèi)容
? ? ? ? ? ? ? ? ? ? printEntry(message.getEntries());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? connector.ack(batchId); // 提交確認(rèn)
? ? ? ? ? ? ? ? //connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
? ? ? ? ? ? }
? ? ? ? ? ? System.out.println("empty too many times, exit");
? ? ? ? } finally {
? ? ? ? ? ? connector.disconnect();
? ? ? ? }
? ? }
? ? private static void printEntry(List<CanalEntry.Entry> entrys) {
? ? ? ? for (CanalEntry.Entry entry : entrys) {
? ? ? ? ? ? //該條數(shù)據(jù)的數(shù)據(jù)類型是事務(wù)開始或事務(wù)結(jié)束,不是binlog 二進制數(shù)據(jù)本身夭谤,就跳過繼續(xù)處理
? ? ? ? ? ? //注意:這里是有bug的棺牧,當(dāng)mysql開啟binlog,且為Row行模式朗儒,且開啟了在binlog中顯示原始SQL颊乘。
? ? ? ? ? ? //這時就會有一種新增的類型:CanalEntry.EventType.QUERY
? ? ? ? ? ? // 原始
? ? ? ? ? ? //if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
? ? ? ? ? ? //? ? continue;
? ? ? ? ? ? //}
? ? ? ? ? ? // 修改為 只保留binlog部分對應(yīng)的Entry并解析
? ? ? ? ? ? if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN //跳過事務(wù)開始的Entry
? ? ? ? ? ? ? ? ? ? || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND //跳過事務(wù)結(jié)束的Entry
? ? ? ? ? ? ? ? ? ? || entry.getHeader().getEventType() == CanalEntry.EventType.QUERY //跳過事務(wù)為原始SQL的Entry
? ? ? ? ? ? ? ? ? ? ) {
? ? ? ? ? ? ? ? continue;
? ? ? ? ? ? }
? ? ? ? ? ? //得到當(dāng)前行變化的數(shù)據(jù)Before参淹、After
? ? ? ? ? ? CanalEntry.RowChange rowChage = null;
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
? ? ? ? ? ? ? ? ? ? ? ? e);
? ? ? ? ? ? }
? ? ? ? ? ? //事件類型
? ? ? ? ? ? CanalEntry.EventType eventType = rowChage.getEventType();
? ? ? ? ? ? System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
? ? ? ? ? ? ? ? ? ? entry.getHeader().getLogfileName(), //binlog文件名
? ? ? ? ? ? ? ? ? ? entry.getHeader().getLogfileOffset(),//binlog offset
? ? ? ? ? ? ? ? ? ? entry.getHeader().getSchemaName(),//庫名
? ? ? ? ? ? ? ? ? ? entry.getHeader().getTableName(),//表名
? ? ? ? ? ? ? ? ? ? eventType //事件類型
? ? ? ? ? ? ));
? ? ? ? ? ? for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
? ? ? ? ? ? ? ? //INSERT
? ? ? ? ? ? ? ? if (eventType == CanalEntry.EventType.INSERT) {
? ? ? ? ? ? ? ? ? ? printColumn(rowData.getAfterColumnsList());
? ? ? ? ? ? ? ? //DELETE
? ? ? ? ? ? ? ? } else if (eventType == CanalEntry.EventType.DELETE) {
? ? ? ? ? ? ? ? ? ? printColumn(rowData.getBeforeColumnsList());
? ? ? ? ? ? ? ? //UPDATE
? ? ? ? ? ? ? ? } else if (eventType == CanalEntry.EventType.UPDATE) {
? ? ? ? ? ? ? ? ? ? System.out.println("-------> before");
? ? ? ? ? ? ? ? ? ? printColumn(rowData.getBeforeColumnsList());
? ? ? ? ? ? ? ? ? ? System.out.println("-------> after");
? ? ? ? ? ? ? ? ? ? printColumn(rowData.getAfterColumnsList());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? private static void printColumn(List<CanalEntry.Column> columns) {
? ? ? ? for (CanalEntry.Column column : columns) {
? ? ? ? ? ? //列名:列值 該列是否被更新
? ? ? ? ? ? System.out.println(column.getName() + " : " + column.getValue() + "? ? update=" + column.getUpdated());
? ? ? ? }
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
Insert
#mysql 插入
mysql> insert into user_info(userid,name,age) values(3,'name3',3);
#canal 解析binlog結(jié)果
================> binlog[mysql-binlog.000004:1694] , name[canal_test,user_info] , eventType : INSERT
userid : 3? ? update=true
name : name3? ? update=true
age : 3? ? update=true
1
2
3
4
5
6
7
8
Delete
#mysql 刪除
mysql> delete from user_info where userid=3;
#canal 解析binlog結(jié)果
================> binlog[mysql-binlog.000004:1898] , name[canal_test,user_info] , eventType : DELETE
userid : 3? ? update=false
name : name3? ? update=false
age : 3? ? update=false
1
2
3
4
5
6
7
8
Update
#mysql 修改
mysql> update user_info set name='name13',age=13 where userid=1;
#canal 解析binlog結(jié)果
================> binlog[mysql-binlog.000004:2528] , name[canal_test,user_info] , eventType : UPDATE
-------> before
userid : 1? ? update=false
name : name1? ? update=false
age : 20? ? update=false
-------> after
userid : 1? ? update=false
name : name13? ? update=true
age : 13? ? update=true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Canal Server集群模式(HA模式)
Canal Server集群模式Server端和Client端都采用單主多活的方式。這種協(xié)調(diào)由Zookeeper承擔(dān)乏悄。
除此之外浙值,Zookeeper還保存了一些元數(shù)據(jù)信息,如備選Canal Server檩小、當(dāng)前正在運行的Canal Server开呐、Binlog Position信息等。
1
2
Server端配置
Server1
[root@node3 canal-1.0.24]# vim conf/canal.properties
#同一個canal server集群,此id唯一
canal.id= 1
canal.ip=
#canal client 端訪問的端口
canal.port= 11111
#zk的地址 如果多個Canal 集群共享一個ZK识啦,那么每個Canal集群應(yīng)使用同一且唯一的rootpath
canal.zkServers= node1:2181,node2:2181,node3:2181/canal/cluster1
#canal 持久化數(shù)據(jù)到zk上的更新頻率,單位毫秒
canal.zookeeper.flush.period = 1000
#canal持久化數(shù)據(jù)到file上的目錄,默認(rèn)和instance.properties為同一目錄
canal.file.data.dir = ${canal.conf.dir}
#canal持久化數(shù)據(jù)到file上的更新頻率负蚊,單位毫秒
canal.file.flush.period = 1000
#canal內(nèi)存中可緩存buffer記錄數(shù),為2的指數(shù)
canal.instance.memory.buffer.size = 16384
#內(nèi)存記錄的單位大小神妹,默認(rèn)1KB颓哮,和buffer.size組合決定最終的內(nèi)存使用大小
canal.instance.memory.buffer.memunit = 1024
#canal內(nèi)存中數(shù)據(jù)緩存模式
#ITEMSIZE : 根據(jù)buffer.size進行限制,只限制記錄的數(shù)量
#MEMSIZE : 根據(jù)buffer.size * buffer.memunit的大小鸵荠,限制緩存記錄的大小
canal.instance.memory.batch.mode = MEMSIZE
#是否開啟心跳檢查
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
#心跳檢查sql
canal.instance.detecting.sql = select 1
#心跳檢查頻率冕茅,單位秒
canal.instance.detecting.interval.time = 3
#心跳檢查失敗重試次數(shù)
canal.instance.detecting.retry.threshold = 3
#心跳檢查失敗后,是否開啟自動mysql自動切換
#心跳檢查失敗超過閥值后蛹找,如果該配置為true姨伤,canal就會自動鏈到mysql備庫獲取binlog數(shù)據(jù)
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =? 1024
#canal發(fā)生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間庸疾,單位秒
#mysql主備庫可能存在解析延遲或者時鐘不統(tǒng)一乍楚,需要回退一段時間,保證數(shù)據(jù)不丟
canal.instance.fallbackIntervalInSeconds = 60
#網(wǎng)絡(luò)鏈接參數(shù)
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
#是否忽略DCL的query語句届慈,比如grant/create user等
canal.instance.filter.query.dcl = false
#是否忽略DML的query語句徒溪,比如insert/update/delete
canal.instance.filter.query.dml = false
#是否忽略DDL的query語句,比如create table/alater table/drop table/rename table/create index/drop index.
canal.instance.filter.query.ddl = false
#
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
#ddl語句是否隔離發(fā)送金顿,開啟隔離可保證每次只返回發(fā)送一條ddl數(shù)據(jù)臊泌,不和其他dml語句混合返回
canal.instance.get.ddl.isolation = false
#################################################
#########? ? ? destinations? ? ? ? #############
#################################################
#當(dāng)前節(jié)點對應(yīng)的destinations
#這里定義了canal.destinations后,需要在canal.conf.dir對應(yīng)的目錄下建立同名目錄
canal.destinations= example
#配置文件根目錄
canal.conf.dir = ../conf
#開啟instance自動掃描
#如果配置為true揍拆,canal.conf.dir目錄下的instance配置變化會自動觸發(fā):
#a. instance目錄新增: 觸發(fā)instance配置載入渠概,lazy為true時則自動啟動
#b. instance目錄刪除:卸載對應(yīng)instance配置,如已啟動則進行關(guān)閉
#c. instance.properties文件變化:reload instance配置嫂拴,如已啟動自動進行重啟操作
canal.auto.scan = true
#instance自動掃描的間隔時間播揪,單位秒
canal.auto.scan.interval = 5
#全局配置加載方式
canal.instance.global.mode = spring
#全局lazy模式
canal.instance.global.lazy = false
#全局的manager配置方式的鏈接信息
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#全局的spring配置方式的組件文件
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
Server2
[root@node3 software]# pwd
/data/software
[root@node3 software]# scp -r canal-1.0.24/ root@node1:/data/software
#修改canal.properties
[root@node1 canal-1.0.24]# vim conf/canal.properties
canal.id= 2
1
2
3
4
5
6
7
創(chuàng)建Zookeeper Znode
連上任意一臺zookeeper
[root@node1 zookeeper]# bin/zkCli.sh
創(chuàng)建znode
create /canal ""
create /canal/cluster1 ""
1
2
3
4
5
6
啟動所有Server
[root@node1 canal-1.0.24]# bin/startup.sh
[root@node3 canal-1.0.24]# bin/startup.sh
1
2
查看Zookeeper中Canal數(shù)據(jù)
Instance 候選canal server的列表
#可看到兩個候選canal server
[zk: localhost:2181(CONNECTED) 17] ls /canal/cluster1/otter/canal/destinations/canal_test/cluster
[192.168.113.103:11111, 192.168.113.101:11111]
1
2
3
Instance 當(dāng)前服務(wù)的canal server
[zk: localhost:2181(CONNECTED) 18] get /canal/cluster1/otter/canal/destinations/canal_test/running
{"active":true,"address":"192.168.113.101:11111","cid":1}
cZxid = 0x2000000028
ctime = Wed Aug 08 03:59:52 CST 2018
mZxid = 0x2000000028
mtime = Wed Aug 08 03:59:52 CST 2018
pZxid = 0x2000000028
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x16515ef4dae0000
dataLength = 57
numChildren = 0
1
2
3
4
5
6
7
8
9
10
11
12
13
Instance對應(yīng)的消費者
注意:需要在消費者啟動后才能看到。
[zk: localhost:2181(CONNECTED) 23] get /canal/cluster1/otter/canal/destinations/canal_test/1001/running
{"active":true,"address":"192.168.113.1:54048","clientId":1001}
cZxid = 0x2000000036
ctime = Wed Aug 08 04:09:25 CST 2018
mZxid = 0x2000000037
mtime = Wed Aug 08 04:09:26 CST 2018
pZxid = 0x2000000036
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x36515ef48010001
dataLength = 63
numChildren = 0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Client端消費
同server單節(jié)點模式筒狠,只需要修改一行即可猪狈。
使用集群連接:
? ? CanalConnector connector = CanalConnectors.newClusterConnector("192.168.113.101:2181,192.168.113.102:2181,192.168.113.103:2181/canal/cluster1", "canal_test", "", "");
1
2
3
總結(jié)和注意
1、生產(chǎn)環(huán)境下盡量采用HA的方式窟蓝。
2罪裹、關(guān)于Canal消費binlog的順序饱普,為保證binlog嚴(yán)格有序,盡量不要用多線程状共。
3套耕、如果Canal消費binlog后的數(shù)據(jù)要發(fā)往kafka,又要保證有序峡继,kafka topic 的partition可以設(shè)置成1個分區(qū)冯袍。
————————————————
版權(quán)聲明:本文為CSDN博主「wangpei1949」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議碾牌,轉(zhuǎn)載請附上原文出處鏈接及本聲明康愤。
原文鏈接:https://blog.csdn.net/wangpei1949/article/details/81501272