[toc]
canal是什么
canal是一個偽裝成slave訂閱mysql的binlog，實現數據同步的中間件。
中文文檔 https://www.wenjiangs.com/doc/canal-introduction
官網 https://github.com/alibaba/canal
工作原理
1.canal模擬mysql slave的交互協議，偽裝自己為mysql slave，向mysql master發送dump協議
2.mysql master收到dump請求，開始推送binary log給slave（也就是canal）
3.canal解析binary log對象（原始為byte流）
架構
說明：
- server代表一個canal運行實例，對應于一個jvm
- instance對應于一個數據隊列（1個server對應1..n個instance）
instance模塊:
- eventParser（數據源接入，模擬slave協議和master進行交互，協議解析）
- eventSink（Parser和Store的連接器，進行數據過濾、加工、分發的工作）
- eventStore（數據存儲）
- metaManager（增量訂閱&消費信息管理器）
安裝
1.下載安裝包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -xvf canal.deployer-1.1.5.tar.gz
2.mysql開啟binlog
修改my.cnf
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= NULL
log-bin=mysql-bin
binlog-format=ROW
server_id=1
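binlog-format必須是ROW模式
重啟后，執行sql指令 show variables like '%log_bin%'，確認log_bin為ON
3.創建mysql的canal用戶（canal只需要SELECT、REPLICATION SLAVE、REPLICATION CLIENT權限；下面為演示方便授予了ALL PRIVILEGES，生產環境請按最小權限授權）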
mysql> CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'localhost' WITH GRANT OPTION;
Query OK, 0 rows affected (0.01 sec)
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
4.修改canal配置文件
文件夾/root/canal/conf/下有一個example文件夾，一個example就代表一個instance實例
vi /root/canal/conf/example/instance.properties
#################################################
# 定義mysql slave的id
canal.instance.mysql.slaveId=1234
# 填寫數據庫ip:端口
canal.instance.master.address=192.168.10.27:3306
# 填寫數據庫username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#################################################
5.啟動、關閉、重啟canal
cd /root/canal/bin
sh startup.sh
sh stop.sh
sh restart.sh
6.相關日志
/root/canal/logs/canal/canal.log
/root/canal/logs/example/example.log
java代碼讀取binlog同步到redis
1.添加依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
2.RedisUtil
package com.wangyue.study.canal;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * Static helpers around a shared {@link JedisPool} used by the canal sync examples.
 *
 * <p>Every data helper selects database 0 before running its command, and stringGet,
 * stringSet and hashSet refresh the key's TTL to {@link #expireTime} seconds. Failures are
 * printed to stderr and swallowed; callers see a {@code null}/{@code false} result (or
 * nothing, for void helpers) instead of an exception.
 */
public class RedisUtil {
    // Redis server IP
    private static final String ADDR = "192.168.10.27";
    // Redis port
    private static final int PORT = 6379;
    // Redis password.
    // NOTE(review): hard-coded credential in source; consider loading it from configuration.
    private static final String AUTH = "hxcx123!@#";
    // Maximum number of connections the pool may hand out at once (JedisPoolConfig#setMaxTotal).
    private static final int MAX_ACTIVE = 1024;
    // Maximum number of idle connections the pool keeps (JedisPoolConfig#setMaxIdle).
    private static final int MAX_IDLE = 200;
    // Maximum time in milliseconds to wait for a free connection (JedisPoolConfig#setMaxWaitMillis).
    private static final int MAX_WAIT = 10000;
    // TTL in seconds applied by stringGet/stringSet/hashSet.
    // NOTE(review): 660 * 660 * 24 = 10,454,400 s (about 121 days); 60 * 60 * 24 (one day)
    // may have been intended — confirm before changing.
    protected static int expireTime = 660 * 660 * 24;
    // Shared connection pool, created once by the static initializer below.
    protected static JedisPool pool;

    /**
     * Builds the shared pool once, when the class is initialized.
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(MAX_ACTIVE);
        config.setMaxIdle(MAX_IDLE);
        config.setMaxWaitMillis(MAX_WAIT);
        config.setTestOnBorrow(false);
        // 1000 ms timeout, database index 3.
        // NOTE(review): every helper below switches to database 0 via select(0), so the
        // pool's database 3 is never used for data — confirm which database is intended.
        pool = new JedisPool(config, ADDR, PORT, 1000, AUTH, 3);
    }

    /**
     * Borrows a connection from the shared pool.
     *
     * <p>Not synchronized: JedisPool hands out connections safely from multiple threads,
     * so a class-wide lock only serializes callers.
     *
     * @return a pooled {@link Jedis}, or {@code null} if borrowing failed (the error is printed)
     */
    protected static Jedis getJedis() {
        try {
            return pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Returns a connection obtained from {@link #getJedis()} to the pool.
     *
     * <p>Must be called for every borrowed connection; otherwise the pool is eventually
     * exhausted and further borrows time out.
     *
     * @param jedis    the connection to release; {@code null} is ignored
     * @param isBroken kept for source compatibility; a pooled Jedis returns itself to its
     *                 pool (as healthy or broken) when closed
     */
    protected static void closeResource(Jedis jedis, boolean isBroken) {
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * Checks whether a key exists in database 0.
     *
     * @param key the key to look up
     * @return {@code true} if the key exists; {@code false} if it does not or on any error
     */
    public static boolean existKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            if (jedis == null) {
                return false;
            }
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return false;
    }

    /**
     * Deletes a key from database 0. Errors are printed and otherwise ignored.
     *
     * @param key the key to delete
     */
    public static void delKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            if (jedis == null) {
                return;
            }
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Reads a string value from database 0 and refreshes the key's TTL to {@link #expireTime}.
     *
     * @param key the key to read
     * @return the stored value, or {@code null} if absent or on any error
     */
    public static String stringGet(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            if (jedis == null) {
                return null;
            }
            jedis.select(0);
            lastVal = jedis.get(key);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Stores a string value in database 0 and sets the key's TTL to {@link #expireTime}.
     *
     * @param key   the key to write
     * @param value the value to store
     * @return the SET reply from Redis, or {@code null} on any error
     */
    public static String stringSet(String key, String value) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            if (jedis == null) {
                return null;
            }
            jedis.select(0);
            lastVal = jedis.set(key, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Sets one field of a hash in database 0 and refreshes the key's TTL to {@link #expireTime}.
     *
     * @param key   the hash key
     * @param field the field inside the hash
     * @param value the value to store in the field
     */
    public static void hashSet(String key, String field, String value) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            if (jedis == null) {
                return;
            }
            jedis.select(0);
            jedis.hset(key, field, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }
}
3.CanalTest
package com.wangyue.study.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.util.List;
/**
 * Demo client that pulls row changes from a single canal server and mirrors them into Redis.
 *
 * <p>Inserted/updated rows are stored as a JSON string under {@code "user:" + <key column value>};
 * deleted rows remove that key. The loop exits after 120 consecutive empty polls
 * (roughly two minutes with the 1 s sleep between empty polls).
 */
public class CanalTest {
    public static void main(String[] args) {
        // Connect to one canal server, destination "example".
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new java.net.InetSocketAddress("192.168.10.27", 11111),
                "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Subscribe to every table of every schema.
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for the caller and stop polling.
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted, exit");
                        return;
                    }
                    connector.ack(batchId);
                } else {
                    emptyCount = 0;
                    try {
                        printEntry(message.getEntries());
                    } catch (RuntimeException e) {
                        // Processing failed: roll this batch back instead of acknowledging it.
                        connector.rollback(batchId);
                        throw e;
                    }
                    connector.ack(batchId);
                }
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints each row-change entry and applies INSERT/UPDATE/DELETE rows to Redis.
     * Entries that are not ROWDATA (transaction begin/end etc.) and other event
     * types (e.g. DDL) are skipped.
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.UPDATE) {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
                }
            }
        }
    }

    /** Prints each column as "name : value update=flag". */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    /**
     * Builds the Redis key for a row: {@code "user:"} plus the value of the first column flagged
     * as a key column, or of the first column when none is flagged.
     * NOTE(review): the "user:" prefix is used for every subscribed table, so rows from
     * different tables can collide — consider including the table name.
     *
     * @return the key, or {@code null} when the row has no columns
     */
    private static String rowKey(List<Column> columns) {
        if (columns.isEmpty()) {
            return null;
        }
        Column keyColumn = columns.get(0);
        for (Column column : columns) {
            if (column.getIsKey()) {
                keyColumn = column;
                break;
            }
        }
        return "user:" + keyColumn.getValue();
    }

    /** Serializes the columns as a JSON object of column name to value. */
    private static String toJson(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        return json.toJSONString();
    }

    /** Stores the row image as JSON under its key. */
    private static void redisInsert(List<Column> columns) {
        String key = rowKey(columns);
        if (key != null) {
            RedisUtil.stringSet(key, toJson(columns));
        }
    }

    /**
     * Stores the after-image under its key; if the key value changed, the entry stored
     * under the before-image's key is removed so it does not go stale.
     */
    private static void redisUpdate(List<Column> beforeColumns, List<Column> afterColumns) {
        String oldKey = rowKey(beforeColumns);
        String newKey = rowKey(afterColumns);
        if (oldKey != null && newKey != null && !oldKey.equals(newKey)) {
            RedisUtil.delKey(oldKey);
        }
        redisInsert(afterColumns);
    }

    /** Removes the row's key from Redis. */
    private static void redisDelete(List<Column> columns) {
        String key = rowKey(columns);
        if (key != null) {
            RedisUtil.delKey(key);
        }
    }
}
運行后，在mysql數據庫里修改數據並保存
控制臺結果：
canal集群搭建
安裝zookeeper
!!這里注意下，不要使用zookeeper的高版本，可能會出現啟動失敗的情況：Starting zookeeper ... FAILED TO START（3.5.5及以上版本需下載文件名帶-bin的安裝包，不帶-bin的是源碼包，直接啟動會失敗）
1.下載解壓
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar -xvf zookeeper-3.4.9.tar.gz
2.修改配置
cd zookeeper-3.4.9/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
############################
#設置數據存儲位置
dataDir=/root/zookeeper/data
############################
- 啟動/重啟/關閉
./zkServer.sh start
./zkServer.sh restart
./zkServer.sh stop
- 查看狀態
./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone
- 客戶端連接
# 2181 是zk默認端口
./zkCli.sh -server localhost:2181
集群部署
目前canal的集群部署僅支持HA形式，使用zookeeper來實現搶占式HA：一個active，多個standby。
修改canal配置文件
vi canal/conf/canal.properties
# register ip to zookeeper
canal.register.ip = 192.168.10.27
# zk地址，如果有多個zk則用逗號隔開且不留空格，例如10.105.10.123:2181,10.105.10.124:2181,10.105.10.125:2181
canal.zkServers = 192.168.10.27:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
部署從節點canal
拷貝主節點的canal到另一臺機器，修改instance配置
vi /root/canal/conf/example/instance.properties
# 設置slaveId，與主節點canal的slaveId（以及mysql的server_id）不同即可
canal.instance.mysql.slaveId=1235
修改canal配置
vi /root/canal/conf/canal.properties
# register ip to zookeeper
canal.register.ip = 192.168.10.26
其他配置項都跟主節點一致，然后啟動兩個節點的canal
查看canal在zk中的狀態
./zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.10.26:11111"}
cZxid = 0x86
ctime = Fri Mar 04 01:14:55 EST 2022
mZxid = 0x86
mtime = Fri Mar 04 01:14:55 EST 2022
pZxid = 0x86
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17f53863740000a
dataLength = 47
numChildren = 0
部署情況:
- 部署了192.168.10.27:11111和192.168.10.26:11111
- 當前192.168.10.26:11111節點是active，192.168.10.27:11111是standby
- 如果去192.168.10.26關閉canal，可以看到192.168.10.27成為active
[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.10.27:11111"}
cZxid = 0x9f
ctime = Fri Mar 04 03:06:18 EST 2022
mZxid = 0x9f
mtime = Fri Mar 04 03:06:18 EST 2022
pZxid = 0x9f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17f53863740000b
dataLength = 47
numChildren = 0
java客戶端代碼
修改CanalConnector即可,連接地址改成zookeeper的ip:端口
CanalConnector connector = CanalConnectors.newClusterConnector("192.168.10.27:2181","example", "canal", "canal");
Canal Admin
canal-admin設計上是為canal提供整體配置管理、節點運維等面向運維的功能，提供相對友好的WebUI操作界面，方便更多用戶快速和安全地操作
部署
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
mkdir /tmp/canal-admin
tar zxvf canal.admin-1.1.4.tar.gz -C /tmp/canal-admin
修改配置文件 conf/application.yml
設置mysql地址
在mysql執行conf/canal_manager.sql
初始化sql
啟動
sh bin/startup.sh
啟動成功后，可以通過 http://127.0.0.1:8089/ 訪問，默認賬號/密碼：admin/123456
canal+Kafka進行數據庫同步
為了高可用和更高的性能，我們會創建多個canal-client構成一個集群，來進行解析並同步到新的數據庫。這里就出現了一個比較重要的問題：如何保證canal-client集群解析消費binlog的順序性呢？
我們使用的binlog是row模式，每一個寫操作都會產生一條binlog日志。舉個簡單的例子：插入了一條a記錄，并且立馬修改a記錄，這樣會有兩個消息發送給canal-client。如果由于網絡等原因，更新的消息早于插入的消息被處理了，此時還沒有插入記錄，更新操作的最后效果是失敗的。
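canal可以和消息隊列組合，支持kafka、rabbitmq、rocketmq多種選擇，在消息隊列這層來實現消息的順序性。注意Kafka只保證單個分區內的消息有序，因此需要讓topic只有一個分區，或者在instance.properties中通過canal.mq.partitionsNum和canal.mq.partitionHash按主鍵散列，使同一行記錄的變更總是落在同一個分區。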
安裝kafka
部署
wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
修改配置文件
vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties 修改參數
zookeeper.connect=192.168.10.27:2181
listeners=PLAINTEXT://:9092
# 對外（給客戶端）發布的broker地址；zookeeper地址見上面的zookeeper.connect
advertised.listeners=PLAINTEXT://192.168.10.27:9092
啟動server
start腳本
# bin/kafka-server-start.sh -daemon config/server.properties &
查看所有topic
# bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
查看指定topic下面的數據
# bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.27:9092 --from-beginning --topic example
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
修改canal配置
修改配置文件canal.properties
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# kafka地址
kafka.bootstrap.servers = 192.168.10.27:9092
然后重啟
[root@bogon kafka_2.11-1.1.1]# sh bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
example
java代碼
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
package com.wangyue.study.canal;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Consumes canal flat-message JSON from the Kafka topic {@code example} and mirrors
 * INSERT/UPDATE/DELETE rows into Redis under {@code "<table>:<primary key value>"}.
 *
 * <p>The primary-key column comes from the message's {@code pkNames} (first entry),
 * falling back to {@code id} when the message carries none.
 */
public class CanalKafkaConsumer {
    public static void main(String[] args) {
        // A consumer needs at least the broker list and the key/value deserializers.
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.27:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Consumers that share a group id split the topic's partitions between them,
        // so each record is delivered to only one member of the group.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group1");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        try {
            // A consumer may subscribe to several topics; here only "example".
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic:%s,分區:%d,偏移量:%d,key:%s,value:%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    handleMessage(record.value());
                }
            }
            // To break out of the loop, call consumer.wakeup() from another thread.
        } finally {
            consumer.close();
        }
    }

    /** Parses one canal flat message and applies its rows to Redis according to "type". */
    private static void handleMessage(String value) {
        if (StringUtils.isBlank(value)) {
            return;
        }
        JSONObject valueJson = JSONObject.parseObject(value);
        if (valueJson == null) {
            return;
        }
        JSONArray data = valueJson.getJSONArray("data");
        // Nothing to mirror when the message carries no row data (e.g. a DDL message — verify).
        if (data == null || data.isEmpty()) {
            return;
        }
        String type = valueJson.getString("type");
        String table = valueJson.getString("table");
        String pkName = primaryKeyName(valueJson);
        if (StringUtils.equalsIgnoreCase(type, "delete")) {
            redisDelete(data, table, pkName);
        } else if (StringUtils.equalsIgnoreCase(type, "insert")) {
            redisInsert(data, table, pkName);
        } else if (StringUtils.equalsIgnoreCase(type, "update")) {
            redisUpdate(data, valueJson.getJSONArray("old"), table, pkName);
        }
    }

    /** Returns the first entry of the message's "pkNames", or "id" when it is missing or empty. */
    private static String primaryKeyName(JSONObject message) {
        JSONArray pkNames = message.getJSONArray("pkNames");
        if (pkNames == null || pkNames.isEmpty()) {
            return "id";
        }
        return pkNames.getString(0);
    }

    /** Builds the Redis key "table:pkValue" for one row. */
    private static String rowKey(String tableName, JSONObject rowData, String pkName) {
        return tableName + ":" + rowData.getString(pkName);
    }

    /** Stores each inserted row as JSON under its key. */
    private static void redisInsert(JSONArray data, String tableName, String pkName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            RedisUtil.stringSet(rowKey(tableName, rowData, pkName), rowData.toJSONString());
        }
    }

    /**
     * Stores each updated row under its key. "old" holds, per row, the previous values of the
     * changed columns; if it contains the primary key, the entry under the old key is removed.
     */
    private static void redisUpdate(JSONArray data, JSONArray old, String tableName, String pkName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            JSONObject oldData = (old != null && i < old.size()) ? old.getJSONObject(i) : null;
            if (oldData != null && oldData.containsKey(pkName)) {
                RedisUtil.delKey(rowKey(tableName, oldData, pkName));
            }
            RedisUtil.stringSet(rowKey(tableName, rowData, pkName), rowData.toJSONString());
        }
    }

    /** Removes each deleted row's key. */
    private static void redisDelete(JSONArray data, String tableName, String pkName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            RedisUtil.delKey(rowKey(tableName, rowData, pkName));
        }
    }
}
canal存到kafka里的數據內容范例：
{
"data": [{
"id": "17",
"doctorId": "15",
"name": "來二樓3",
"birthday": "2013-02-28",
"sex": "0",
"telephone": "15632554566",
"province": "重慶市",
"city": "重慶城區",
"area": "萬州區",
"address": "AP庫珀熱熱蓉蓉",
"createTime": "2022-02-28 17:29:43",
"updateTime": "2022-03-03 14:48:31",
"createBy": null,
"updateBy": "18960862122",
"isRemove": "1"
}],
"database": "tcm",
"es": 1646631848000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "int",
"doctorId": "int",
"name": "varchar(200)",
"birthday": "varchar(100)",
"sex": "int",
"telephone": "varchar(20)",
"province": "varchar(50)",
"city": "varchar(50)",
"area": "varchar(50)",
"address": "varchar(255)",
"createTime": "varchar(50)",
"updateTime": "varchar(50)",
"createBy": "varchar(50)",
"updateBy": "varchar(50)",
"isRemove": "bit(1)"
},
"old": [{
"name": "來二樓"
}],
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"doctorId": 4,
"name": 12,
"birthday": 12,
"sex": 4,
"telephone": 12,
"province": 12,
"city": 12,
"area": 12,
"address": 12,
"createTime": 12,
"updateTime": 12,
"createBy": 12,
"updateBy": 12,
"isRemove": -7
},
"table": "t_patient",
"ts": 1646631848288,
"type": "UPDATE"
}
redis 存取結果：