數(shù)據(jù)收集之binlog同步---Canal

Canal是阿里開源的binlog同步工具狮杨。可以解析binlog爽柒,并將解析后的數(shù)據(jù)同步到任何目標(biāo)存儲中吴菠。

1

Canal工作原理


? ? 1、mysql master節(jié)點將改變記錄保存到二進制binlog文件中浩村。

? ? 2做葵、canal 把自己偽裝成mysql slave節(jié)點,向master節(jié)點發(fā)送dump binlog請求。master節(jié)點收到請求并找到對應(yīng)binlog文件及binlog位置pos心墅。

? ? 3酿矢、master根據(jù)pos讀取binlog event,不斷發(fā)往slave節(jié)點(也就是canal)怎燥。

? ? 4瘫筐、slave節(jié)點收到binlog events并拷貝到slave的中繼日志。

? ? 5铐姚、slave結(jié)點回放中繼日志中的event并同步策肝。

? ? 6、新的binlog被master不斷廣播到slave節(jié)點隐绵,slave節(jié)點源源不斷解析同步之众。

1

2

3

4

5

6

Canal目錄結(jié)構(gòu)

canal-1.0.24/

├── bin

│?? ├── canal.pid

│?? ├── startup.bat 啟動canal server腳本

│?? ├── startup.sh? 啟動canal server腳本

│?? └── stop.sh? ? 停止canal server腳本

├── conf

│?? ├── canal.properties common屬性,是全局instance配置文件,被多個instance實例共享

│?? ├── canal_test? instance實例配置目錄

│?? │?? ├── instance.properties instance實例配置文件

│?? │?? ├── meta.dat 記錄Instance實例消費binlog position位置等信息

│?? ├── example 默認(rèn)instance實例

│?? │?? └── instance.properties

│?? ├── logback.xml 日志分割

│?? └── spring instance實例可選處理模式

│??? ? ├── default-instance.xml

│??? ? ├── file-instance.xml 基于文件

│??? ? ├── group-instance.xml

│??? ? ├── local-instance.xml

│??? ? └── memory-instance.xml 基于內(nèi)存

├── lib

└── logs

? ? ├── canal canal server運行日志

? ? │?? └── canal.log

? ? ├── canal_test instance實例日志

? ? │?? ├── canal_test.log

? ? │?? └── meta.log

? ? └── example 默認(rèn)instance實例日志

? ? ? ? └── example.log

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

Canal 2種方式部署

配置Mysql

MySQL 開啟Binlog

修改/etc/my.cnf 配置文件,增加如下配置

[root@node2 ~]# vim /etc/my.cnf

#開啟binlog

[mysqld]

#binlog文件保存目錄及binlog文件名前綴

#binlog文件保存目錄: /var/lib/mysql/

#binlog文件名前綴: mysql-binlog

#mysql向文件名前綴添加數(shù)字后綴來按順序創(chuàng)建二進制日志文件 如mysql-binlog.000006 mysql-binlog.000007

log-bin=/var/lib/mysql/mysql-binlog

#選擇基于行的日志記錄方式

binlog-format=ROW

#服務(wù)器 id

#binlog數(shù)據(jù)中包含server_id,標(biāo)識該數(shù)據(jù)是由那個server同步過來的

server_id=1

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

MySQL 配置Canal Server權(quán)限

CREATE USER 'canal_sync'@'%' IDENTIFIED BY 'canal_sync123';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal_sync'@'%';

FLUSH PRIVILEGES;

1

2

3

MySQL 建庫建表

create database canal_test;

use canal_test;

create table if not exists `user_info`(

? `userid` int,

? `name` varchar(100),

? `age` int

)engine=innodb default charset=utf8;

1

2

3

4

5

6

7

Canal Server單節(jié)點模式

1、Canal Server單節(jié)點模式,Canal Client可采用多Client多活或單Client進程的方式氢橙。但要注意存在的問題酝枢。

? ? 存在問題:

? ? ? ? (1)Server端會有單點問題。

? ? ? ? (2)多Client多活悍手,同時工作帘睦,并互為主備。但無法保證binlog的順序坦康。實際情況下一個Client進程加監(jiān)控就足以滿足需要竣付。

2、單節(jié)點模式binlog postion偏移量記錄的位置:每個instance配置目錄下的meta.dat文件中滞欠。如canal-1.0.24/conf/canal_test/meta.dat

1

2

3

4

5

Server端配置

下載解壓

[root@node3 software]# pwd

/data/software

[root@node3 software]# wget https://github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz

[root@node3 software]# mkdir canal-1.0.24

[root@node3 software]# tar -zxvf canal.deployer-1.0.24.tar.gz -C canal-1.0.24

1

2

3

4

5

6

7

8

9

配置Instance

[root@node3 conf]# pwd

/data/software/canal-1.0.24/conf

#約定以要同步的庫名作為配置文件目錄名

[root@node3 conf]# cp -r example/ canal_test

[root@node3 conf]# vim canal_test/instance.properties

#slaveId

#每個instance都會偽裝成一個mysql slave節(jié)點

#同一個mysql實例(待同步的mysql節(jié)點),此slaveId應(yīng)該唯一

#若是集群模式,則同一集群中,相同的instance,此slaveId應(yīng)相同

canal.instance.mysql.slaveId = 111101

#master 庫

#canal運行時首要連接庫 可以為mysql 主庫

#如果只能基于從庫的binlog,這里用mysql從庫也可

canal.instance.master.address = node2:3306

#起始binlog文件

canal.instance.master.journal.name =

#起始binlog偏移量,同步該binlog位點后的數(shù)據(jù)

canal.instance.master.position =

#起始binlog時間戳,找到該時間戳對應(yīng)的binlog位點后開始同步

canal.instance.master.timestamp =

#standby 庫

#canal.instance.standby.address =

#canal.instance.standby.journal.name =

#canal.instance.standby.position =

#canal.instance.standby.timestamp =

#mysql 數(shù)據(jù)庫賬號

canal.instance.dbUsername = canal_sync

canal.instance.dbPassword = canal_sync123

#默認(rèn)數(shù)據(jù)庫

canal.instance.defaultDatabaseName =

canal.instance.connectionCharset = UTF-8

#注意:

#(1)這里黑白名單可以覆蓋defaultDatabaseName古胆。

#(2)如果Client端 配置了connector.subscribe則會覆蓋黑白名單配置

#表過濾--白名單 只監(jiān)聽庫表

#如testDB\..*只監(jiān)聽testDB數(shù)據(jù)庫,testDB\.test_1 只監(jiān)聽testDB庫中test_1表。多個用逗號分開

canal.instance.filter.regex = .*\\..*

#表過濾--黑名單 排除庫表

canal.instance.filter.black.regex =

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

啟動

[root@node3 canal-1.0.24]# bin/startup.sh

查看啟動日志,根據(jù)日志中的報錯排查問題

[root@node3 canal-1.0.24]# tail -f logs/canal_test/canal_test.log

1

2

3

4

Client端消費

pom依賴

<dependency>

? ? <groupId>com.alibaba.otter</groupId>

? ? <artifactId>canal.client</artifactId>

? ? <version>1.0.24</version>

</dependency>

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;

public class ClientSample {

? ? public static void main(String args[]) {

? ? ? ? // 單連接

? ? ? ? CanalConnector connector = CanalConnectors.newSingleConnector(

? ? ? ? ? ? ? ? new InetSocketAddress("node3", 11111), "canal_test", "", "");

? ? ? ? // 集群連接

? ? ? ? //CanalConnector connector = CanalConnectors.newClusterConnector("192.168.113.101:2181,192.168.113.102:2181,192.168.113.103:2181/canal/cluster1", "canal_test", "", "");

? ? ? ? //計數(shù)器

? ? ? ? int emptyCount = 0;

? ? ? ? //一次最多拉多少條Message

? ? ? ? //注意:

? ? ? ? int batchSize = 1000;

? ? ? ? try {

? ? ? ? ? ? //和server建立連接

? ? ? ? ? ? connector.connect();

? ? ? ? ? ? //訂閱表

? ? ? ? ? ? connector.subscribe("canal_test.user_info");

? ? ? ? ? ? //回滾到上次ack的位置

? ? ? ? ? ? connector.rollback();

? ? ? ? ? ? //最大空閑次數(shù)

? ? ? ? ? ? int maxEmptyCount = 1000;

? ? ? ? ? ? //死循環(huán)去拉取數(shù)據(jù)

? ? ? ? ? ? while (emptyCount < maxEmptyCount) {

? ? ? ? ? ? ? ? //嘗試最多拿batchSize條記錄

? ? ? ? ? ? ? ? //注意:getWithoutAck一次筛璧,對應(yīng)一個Message逸绎。

? ? ? ? ? ? ? ? //一個Message有一個MessageID,還有一個List<CanalEntry.Entry>

? ? ? ? ? ? ? ? Message message = connector.getWithoutAck(batchSize);

? ? ? ? ? ? ? ? long batchId = message.getId();

? ? ? ? ? ? ? ? int size = message.getEntries().size();

? ? ? ? ? ? ? ? //沒有數(shù)據(jù)---等待

? ? ? ? ? ? ? ? if (batchId == -1 || size == 0) {

? ? ? ? ? ? ? ? ? ? emptyCount++;

//? ? ? ? ? ? ? ? ? ? System.out.println("empty count : " + emptyCount);

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? Thread.sleep(1000);

? ? ? ? ? ? ? ? ? ? } catch (InterruptedException e) {

? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? //拿到數(shù)據(jù)---開始解析--發(fā)送到目的地如kafka/elasticsearch/redis/hbase...等

? ? ? ? ? ? ? ? ? ? emptyCount = 0;

? ? ? ? ? ? ? ? ? ? //這里打印內(nèi)容

? ? ? ? ? ? ? ? ? ? printEntry(message.getEntries());

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? connector.ack(batchId); // 提交確認(rèn)

? ? ? ? ? ? ? ? //connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)

? ? ? ? ? ? }

? ? ? ? ? ? System.out.println("empty too many times, exit");

? ? ? ? } finally {

? ? ? ? ? ? connector.disconnect();

? ? ? ? }

? ? }

? ? private static void printEntry(List<CanalEntry.Entry> entrys) {

? ? ? ? for (CanalEntry.Entry entry : entrys) {

? ? ? ? ? ? //該條數(shù)據(jù)的數(shù)據(jù)類型是事務(wù)開始或事務(wù)結(jié)束,不是binlog 二進制數(shù)據(jù)本身夭谤,就跳過繼續(xù)處理

? ? ? ? ? ? //注意:這里是有bug的棺牧,當(dāng)mysql開啟binlog,且為Row行模式朗儒,且開啟了在binlog中顯示原始SQL颊乘。

? ? ? ? ? ? //這時就會有一種新增的類型:CanalEntry.EventType.QUERY

? ? ? ? ? ? // 原始

? ? ? ? ? ? //if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

? ? ? ? ? ? //? ? continue;

? ? ? ? ? ? //}

? ? ? ? ? ? // 修改為 只保留binlog部分對應(yīng)的Entry并解析

? ? ? ? ? ? if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN //跳過事務(wù)開始的Entry

? ? ? ? ? ? ? ? ? ? || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND //跳過事務(wù)結(jié)束的Entry

? ? ? ? ? ? ? ? ? ? || entry.getHeader().getEventType() == CanalEntry.EventType.QUERY //跳過事務(wù)為原始SQL的Entry

? ? ? ? ? ? ? ? ? ? ) {

? ? ? ? ? ? ? ? continue;

? ? ? ? ? ? }

? ? ? ? ? ? //得到當(dāng)前行變化的數(shù)據(jù)Before参淹、After

? ? ? ? ? ? CanalEntry.RowChange rowChage = null;

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

? ? ? ? ? ? ? ? ? ? ? ? e);

? ? ? ? ? ? }

? ? ? ? ? ? //事件類型

? ? ? ? ? ? CanalEntry.EventType eventType = rowChage.getEventType();

? ? ? ? ? ? System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

? ? ? ? ? ? ? ? ? ? entry.getHeader().getLogfileName(), //binlog文件名

? ? ? ? ? ? ? ? ? ? entry.getHeader().getLogfileOffset(),//binlog offset

? ? ? ? ? ? ? ? ? ? entry.getHeader().getSchemaName(),//庫名

? ? ? ? ? ? ? ? ? ? entry.getHeader().getTableName(),//表名

? ? ? ? ? ? ? ? ? ? eventType //事件類型

? ? ? ? ? ? ));

? ? ? ? ? ? for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

? ? ? ? ? ? ? ? //INSERT

? ? ? ? ? ? ? ? if (eventType == CanalEntry.EventType.INSERT) {

? ? ? ? ? ? ? ? ? ? printColumn(rowData.getAfterColumnsList());

? ? ? ? ? ? ? ? //DELETE

? ? ? ? ? ? ? ? } else if (eventType == CanalEntry.EventType.DELETE) {

? ? ? ? ? ? ? ? ? ? printColumn(rowData.getBeforeColumnsList());

? ? ? ? ? ? ? ? //UPDATE

? ? ? ? ? ? ? ? } else if (eventType == CanalEntry.EventType.UPDATE) {

? ? ? ? ? ? ? ? ? ? System.out.println("-------> before");

? ? ? ? ? ? ? ? ? ? printColumn(rowData.getBeforeColumnsList());

? ? ? ? ? ? ? ? ? ? System.out.println("-------> after");

? ? ? ? ? ? ? ? ? ? printColumn(rowData.getAfterColumnsList());

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? private static void printColumn(List<CanalEntry.Column> columns) {

? ? ? ? for (CanalEntry.Column column : columns) {

? ? ? ? ? ? //列名:列值 該列是否被更新

? ? ? ? ? ? System.out.println(column.getName() + " : " + column.getValue() + "? ? update=" + column.getUpdated());

? ? ? ? }

? ? }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

Insert

#mysql 插入

mysql> insert into user_info(userid,name,age) values(3,'name3',3);

#canal 解析binlog結(jié)果

================> binlog[mysql-binlog.000004:1694] , name[canal_test,user_info] , eventType : INSERT

userid : 3? ? update=true

name : name3? ? update=true

age : 3? ? update=true

1

2

3

4

5

6

7

8

Delete

#mysql 刪除

mysql> delete from user_info where userid=3;

#canal 解析binlog結(jié)果

================> binlog[mysql-binlog.000004:1898] , name[canal_test,user_info] , eventType : DELETE

userid : 3? ? update=false

name : name3? ? update=false

age : 3? ? update=false

1

2

3

4

5

6

7

8

Update

#mysql 修改

mysql> update user_info set name='name13',age=13 where userid=1;

#canal 解析binlog結(jié)果

================> binlog[mysql-binlog.000004:2528] , name[canal_test,user_info] , eventType : UPDATE

-------> before

userid : 1? ? update=false

name : name1? ? update=false

age : 20? ? update=false

-------> after

userid : 1? ? update=false

name : name13? ? update=true

age : 13? ? update=true

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Canal Server集群模式(HA模式)

Canal Server集群模式Server端和Client端都采用單主多活的方式。這種協(xié)調(diào)由Zookeeper承擔(dān)乏悄。

除此之外浙值,Zookeeper還保存了一些元數(shù)據(jù)信息,如備選Canal Server檩小、當(dāng)前正在運行的Canal Server开呐、Binlog Position信息等。

1

2

Server端配置

Server1

[root@node3 canal-1.0.24]# vim conf/canal.properties

#同一個canal server集群,此id唯一

canal.id= 1

canal.ip=

#canal client 端訪問的端口

canal.port= 11111

#zk的地址 如果多個Canal 集群共享一個ZK识啦,那么每個Canal集群應(yīng)使用同一且唯一的rootpath

canal.zkServers= node1:2181,node2:2181,node3:2181/canal/cluster1

#canal 持久化數(shù)據(jù)到zk上的更新頻率,單位毫秒

canal.zookeeper.flush.period = 1000

#canal持久化數(shù)據(jù)到file上的目錄,默認(rèn)和instance.properties為同一目錄

canal.file.data.dir = ${canal.conf.dir}

#canal持久化數(shù)據(jù)到file上的更新頻率负蚊,單位毫秒

canal.file.flush.period = 1000

#canal內(nèi)存中可緩存buffer記錄數(shù),為2的指數(shù)

canal.instance.memory.buffer.size = 16384

#內(nèi)存記錄的單位大小神妹,默認(rèn)1KB颓哮,和buffer.size組合決定最終的內(nèi)存使用大小

canal.instance.memory.buffer.memunit = 1024

#canal內(nèi)存中數(shù)據(jù)緩存模式

#ITEMSIZE : 根據(jù)buffer.size進行限制,只限制記錄的數(shù)量

#MEMSIZE : 根據(jù)buffer.size * buffer.memunit的大小鸵荠,限制緩存記錄的大小

canal.instance.memory.batch.mode = MEMSIZE

#是否開啟心跳檢查

canal.instance.detecting.enable = false

#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()

#心跳檢查sql

canal.instance.detecting.sql = select 1

#心跳檢查頻率冕茅,單位秒

canal.instance.detecting.interval.time = 3

#心跳檢查失敗重試次數(shù)

canal.instance.detecting.retry.threshold = 3

#心跳檢查失敗后,是否開啟自動mysql自動切換

#心跳檢查失敗超過閥值后蛹找,如果該配置為true姨伤,canal就會自動鏈到mysql備庫獲取binlog數(shù)據(jù)

canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery

canal.instance.transaction.size =? 1024

#canal發(fā)生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間庸疾,單位秒

#mysql主備庫可能存在解析延遲或者時鐘不統(tǒng)一乍楚,需要回退一段時間,保證數(shù)據(jù)不丟

canal.instance.fallbackIntervalInSeconds = 60

#網(wǎng)絡(luò)鏈接參數(shù)

canal.instance.network.receiveBufferSize = 16384

canal.instance.network.sendBufferSize = 16384

canal.instance.network.soTimeout = 30

#是否忽略DCL的query語句届慈,比如grant/create user等

canal.instance.filter.query.dcl = false

#是否忽略DML的query語句徒溪,比如insert/update/delete

canal.instance.filter.query.dml = false

#是否忽略DDL的query語句,比如create table/alater table/drop table/rename table/create index/drop index.

canal.instance.filter.query.ddl = false

#

canal.instance.filter.table.error = false

canal.instance.filter.rows = false

# binlog format/image check

canal.instance.binlog.format = ROW,STATEMENT,MIXED

canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

#ddl語句是否隔離發(fā)送金顿,開啟隔離可保證每次只返回發(fā)送一條ddl數(shù)據(jù)臊泌,不和其他dml語句混合返回

canal.instance.get.ddl.isolation = false

#################################################

#########? ? ? destinations? ? ? ? #############

#################################################

#當(dāng)前節(jié)點對應(yīng)的destinations

#這里定義了canal.destinations后,需要在canal.conf.dir對應(yīng)的目錄下建立同名目錄

canal.destinations= example

#配置文件根目錄

canal.conf.dir = ../conf

#開啟instance自動掃描

#如果配置為true揍拆,canal.conf.dir目錄下的instance配置變化會自動觸發(fā):

#a. instance目錄新增: 觸發(fā)instance配置載入渠概,lazy為true時則自動啟動

#b. instance目錄刪除:卸載對應(yīng)instance配置,如已啟動則進行關(guān)閉

#c. instance.properties文件變化:reload instance配置嫂拴,如已啟動自動進行重啟操作

canal.auto.scan = true

#instance自動掃描的間隔時間播揪,單位秒

canal.auto.scan.interval = 5

#全局配置加載方式

canal.instance.global.mode = spring

#全局lazy模式

canal.instance.global.lazy = false

#全局的manager配置方式的鏈接信息

#canal.instance.global.manager.address = 127.0.0.1:1099

#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml

#全局的spring配置方式的組件文件

canal.instance.global.spring.xml = classpath:spring/file-instance.xml

#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

Server2

[root@node3 software]# pwd

/data/software

[root@node3 software]# scp -r canal-1.0.24/ root@node1:/data/software

#修改canal.properties

[root@node1 canal-1.0.24]# vim conf/canal.properties

canal.id= 2

1

2

3

4

5

6

7

創(chuàng)建Zookeeper Znode

連上任意一臺zookeeper

[root@node1 zookeeper]# bin/zkCli.sh

創(chuàng)建znode

create /canal ""

create /canal/cluster1 ""

1

2

3

4

5

6

啟動所有Server

[root@node1 canal-1.0.24]# bin/startup.sh

[root@node3 canal-1.0.24]# bin/startup.sh

1

2

查看Zookeeper中Canal數(shù)據(jù)

Instance 候選canal server的列表

#可看到兩個候選canal server

[zk: localhost:2181(CONNECTED) 17] ls /canal/cluster1/otter/canal/destinations/canal_test/cluster

[192.168.113.103:11111, 192.168.113.101:11111]

1

2

3

Instance 當(dāng)前服務(wù)的canal server

[zk: localhost:2181(CONNECTED) 18] get /canal/cluster1/otter/canal/destinations/canal_test/running

{"active":true,"address":"192.168.113.101:11111","cid":1}

cZxid = 0x2000000028

ctime = Wed Aug 08 03:59:52 CST 2018

mZxid = 0x2000000028

mtime = Wed Aug 08 03:59:52 CST 2018

pZxid = 0x2000000028

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x16515ef4dae0000

dataLength = 57

numChildren = 0

1

2

3

4

5

6

7

8

9

10

11

12

13

Instance對應(yīng)的消費者

注意:需要在消費者啟動后才能看到。

[zk: localhost:2181(CONNECTED) 23] get /canal/cluster1/otter/canal/destinations/canal_test/1001/running

{"active":true,"address":"192.168.113.1:54048","clientId":1001}

cZxid = 0x2000000036

ctime = Wed Aug 08 04:09:25 CST 2018

mZxid = 0x2000000037

mtime = Wed Aug 08 04:09:26 CST 2018

pZxid = 0x2000000036

cversion = 0

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x36515ef48010001

dataLength = 63

numChildren = 0

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Client端消費

同server單節(jié)點模式筒狠,只需要修改一行即可猪狈。

使用集群連接:

? ? CanalConnector connector = CanalConnectors.newClusterConnector("192.168.113.101:2181,192.168.113.102:2181,192.168.113.103:2181/canal/cluster1", "canal_test", "", "");

1

2

3

總結(jié)和注意

1、生產(chǎn)環(huán)境下盡量采用HA的方式窟蓝。

2罪裹、關(guān)于Canal消費binlog的順序饱普,為保證binlog嚴(yán)格有序,盡量不要用多線程状共。

3套耕、如果Canal消費binlog后的數(shù)據(jù)要發(fā)往kafka,又要保證有序峡继,kafka topic 的partition可以設(shè)置成1個分區(qū)冯袍。

————————————————

版權(quán)聲明:本文為CSDN博主「wangpei1949」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議碾牌,轉(zhuǎn)載請附上原文出處鏈接及本聲明康愤。

原文鏈接:https://blog.csdn.net/wangpei1949/article/details/81501272

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市舶吗,隨后出現(xiàn)的幾起案子征冷,更是在濱河造成了極大的恐慌,老刑警劉巖誓琼,帶你破解...
    沈念sama閱讀 212,718評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件检激,死亡現(xiàn)場離奇詭異,居然都是意外死亡腹侣,警方通過查閱死者的電腦和手機叔收,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來傲隶,“玉大人饺律,你說我怎么就攤上這事《逯辏” “怎么了复濒?”我有些...
    開封第一講書人閱讀 158,207評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長帖鸦。 經(jīng)常有香客問我芝薇,道長,這世上最難降的妖魔是什么作儿? 我笑而不...
    開封第一講書人閱讀 56,755評論 1 284
  • 正文 為了忘掉前任洛二,我火速辦了婚禮,結(jié)果婚禮上攻锰,老公的妹妹穿的比我還像新娘晾嘶。我一直安慰自己,他們只是感情好娶吞,可當(dāng)我...
    茶點故事閱讀 65,862評論 6 386
  • 文/花漫 我一把揭開白布垒迂。 她就那樣靜靜地躺著,像睡著了一般妒蛇。 火紅的嫁衣襯著肌膚如雪机断。 梳的紋絲不亂的頭發(fā)上楷拳,一...
    開封第一講書人閱讀 50,050評論 1 291
  • 那天,我揣著相機與錄音吏奸,去河邊找鬼欢揖。 笑死,一個胖子當(dāng)著我的面吹牛奋蔚,可吹牛的內(nèi)容都是我干的她混。 我是一名探鬼主播,決...
    沈念sama閱讀 39,136評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼泊碑,長吁一口氣:“原來是場噩夢啊……” “哼坤按!你這毒婦竟也來了?” 一聲冷哼從身側(cè)響起馒过,我...
    開封第一講書人閱讀 37,882評論 0 268
  • 序言:老撾萬榮一對情侶失蹤臭脓,失蹤者是張志新(化名)和其女友劉穎,沒想到半個月后沉桌,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體谢鹊,經(jīng)...
    沈念sama閱讀 44,330評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,651評論 2 327
  • 正文 我和宋清朗相戀三年留凭,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片偎巢。...
    茶點故事閱讀 38,789評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡蔼夜,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出压昼,到底是詐尸還是另有隱情求冷,我是刑警寧澤,帶...
    沈念sama閱讀 34,477評論 4 333
  • 正文 年R本政府宣布窍霞,位于F島的核電站匠题,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏但金。R本人自食惡果不足惜韭山,卻給世界環(huán)境...
    茶點故事閱讀 40,135評論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望冷溃。 院中可真熱鬧钱磅,春花似錦、人聲如沸似枕。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,864評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽凿歼。三九已至褪迟,卻和暖如春冗恨,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背味赃。 一陣腳步聲響...
    開封第一講書人閱讀 32,099評論 1 267
  • 我被黑心中介騙來泰國打工派近, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人洁桌。 一個月前我還...
    沈念sama閱讀 46,598評論 2 362
  • 正文 我出身青樓渴丸,卻偏偏與公主長得像,于是被迫代替她去往敵國和親另凌。 傳聞我的和親對象是個殘疾皇子谱轨,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,697評論 2 351

推薦閱讀更多精彩內(nèi)容