前提
近段時(shí)間,業(yè)務(wù)系統(tǒng)架構(gòu)基本完備,數(shù)據(jù)層面的建設(shè)比較薄弱闷哆,因?yàn)楣P者目前工作重心在于搭建一個(gè)小型的數(shù)據(jù)平臺(tái)腰奋。優(yōu)先級(jí)比較高的一個(gè)任務(wù)就是需要近實(shí)時(shí)同步業(yè)務(wù)系統(tǒng)的數(shù)據(jù)(包括保存、更新或者軟刪除)到一個(gè)另一個(gè)數(shù)據(jù)源抱怔,持久化之前需要清洗數(shù)據(jù)并且構(gòu)建一個(gè)相對(duì)合理的便于后續(xù)業(yè)務(wù)數(shù)據(jù)統(tǒng)計(jì)劣坊、標(biāo)簽系統(tǒng)構(gòu)建等擴(kuò)展功能的數(shù)據(jù)模型∏簦基于當(dāng)前團(tuán)隊(duì)的資源和能力局冰,優(yōu)先調(diào)研了Alibaba
開(kāi)源中間件Canal
的使用。
這篇文章簡(jiǎn)單介紹一下如何快速地搭建一套Canal
相關(guān)的組件灌危。
關(guān)于Canal
簡(jiǎn)介
下面的簡(jiǎn)介和下一節(jié)的原理均來(lái)自于Canal項(xiàng)目的README
:
Canal[k?'n?l]
康二,譯意為水道/管道/溝渠,主要用途是基于MySQL
數(shù)據(jù)庫(kù)增量日志解析勇蝙,提供增量數(shù)據(jù)訂閱和消費(fèi)沫勿。早期阿里巴巴因?yàn)楹贾莺兔绹?guó)雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求味混,實(shí)現(xiàn)方式主要是基于業(yè)務(wù)trigger
獲取增量變更产雹。從 2010 年開(kāi)始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫(kù)日志解析獲取增量變更進(jìn)行同步惜傲,由此衍生出了大量的數(shù)據(jù)庫(kù)增量訂閱和消費(fèi)業(yè)務(wù)洽故。
基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括:
- 數(shù)據(jù)庫(kù)鏡像
- 數(shù)據(jù)庫(kù)實(shí)時(shí)備份
- 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引贝攒、倒排索引等)
- 業(yè)務(wù)
Cache
刷新 - 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理
Canal的工作原理
MySQL
主備復(fù)制原理:
-
MySQL
的Master
實(shí)例將數(shù)據(jù)變更寫入二進(jìn)制日志(binary log
盗誊,其中記錄叫做二進(jìn)制日志事件binary log events
,可以通過(guò)show binlog events
進(jìn)行查看) -
MySQL
的Slave
實(shí)例將master
的binary log events
拷貝到它的中繼日志(relay log
) -
MySQL
的Slave
實(shí)例重放relay log
中的事件隘弊,將數(shù)據(jù)變更反映它到自身的數(shù)據(jù)
Canal
的工作原理如下:
-
Canal
模擬MySQL Slave
的交互協(xié)議哈踱,偽裝自己為MySQL Slave
,向MySQL Master
發(fā)送dump
協(xié)議 -
MySQL Master
收到dump
請(qǐng)求梨熙,開(kāi)始推送binary log
給Slave
(即Canal
) -
Canal
解析binary log
對(duì)象(原始為byte
流)开镣,并且可以通過(guò)連接器發(fā)送到對(duì)應(yīng)的消息隊(duì)列等中間件中
關(guān)于Canal的版本和部件
截止筆者開(kāi)始編寫本文的時(shí)候(2020-03-05
),Canal
的最新發(fā)布版本是v1.1.5-alpha-1
(2019-10-09
發(fā)布的)咽扇,最新的正式版是v1.1.4
(2019-09-02
發(fā)布的)邪财。其中,v1.1.4
主要添加了鑒權(quán)质欲、監(jiān)控的功能树埠,并且做了一些列的性能優(yōu)化,此版本集成的連接器是Tcp
嘶伟、Kafka
和RockerMQ
怎憋。而v1.1.5-alpha-1
版本已經(jīng)新增了RabbitMQ
連接器,但是此版本的RabbitMQ
連接器暫時(shí)不能定義連接RabbitMQ
的端口號(hào),不過(guò)此問(wèn)題已經(jīng)在master
分支中修復(fù)(具體可以參看源碼中的CanalRabbitMQProducer
類的提交記錄)绊袋。換言之毕匀,v1.1.4
版本中目前能使用的內(nèi)置連接器只有Tcp
、Kafka
和RockerMQ
三種癌别,如果想嘗鮮使用RabbitMQ
連接器皂岔,可以選用下面的兩種方式之一:
- 選用
v1.1.5-alpha-1
版本,但是無(wú)法修改RabbitMQ
的port
屬性展姐,默認(rèn)為5672
凤薛。 - 基于
master
分支自行構(gòu)建Canal
。
目前诞仓,Canal
項(xiàng)目的活躍度比較高缤苫,但是考慮到功能的穩(wěn)定性問(wèn)題,筆者建議選用穩(wěn)定版本在生產(chǎn)環(huán)境中實(shí)施墅拭,當(dāng)前可以選用v1.1.4
版本活玲,本文的例子用選用的就是v1.1.4
版本,配合Kafka
連接器使用谍婉。Canal
主要包括三個(gè)核心部件:
-
canal-admin
:后臺(tái)管理模塊舒憾,提供面向WebUI
的Canal
管理能力。 -
canal-adapter
:適配器穗熬,增加客戶端數(shù)據(jù)落地的適配及啟動(dòng)功能镀迂,包括REST
、日志適配器唤蔗、關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)同步(表對(duì)表同步)探遵、HBase
數(shù)據(jù)同步、ES
數(shù)據(jù)同步等等妓柜。 -
canal-deployer
:發(fā)布器箱季,核心功能所在,包括binlog
解析棍掐、轉(zhuǎn)換和發(fā)送報(bào)文到連接器中等等功能都由此模塊提供藏雏。
一般情況下,canal-deployer
部件是必須的作煌,其他兩個(gè)部件按需選用即可掘殴。
部署所需的中間件
搭建一套可以用的組件需要部署MySQL
、Zookeeper
粟誓、Kafka
和Canal
四個(gè)中間件的實(shí)例奏寨,下面簡(jiǎn)單分析一下部署過(guò)程。選用的虛擬機(jī)系統(tǒng)是CentOS7
努酸。
安裝MySQL
為了簡(jiǎn)單起見(jiàn)服爷,選用yum
源安裝(官方鏈接是https://dev.mysql.com/downloads/repo/yum
):
mysql80-community-release-el7-3雖然包名帶了mysql80關(guān)鍵字,其實(shí)已經(jīng)集成了MySQL主流版本5.6、5.7和8.x等等的最新安裝包倉(cāng)庫(kù)
選用的是最新版的MySQL8.x
社區(qū)版仍源,下載CentOS7
適用的rpm包
:
cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下載完畢之后
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm
此時(shí)列舉一下yum
倉(cāng)庫(kù)里面的MySQL
相關(guān)的包:
[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64 MySQL Connectors Community enabled: 141
mysql-connectors-community-source MySQL Connectors Community - disabled
mysql-tools-community/x86_64 MySQL Tools Community enabled: 105
mysql-tools-community-source MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64 MySQL Tools Preview disabled
mysql-tools-preview-source MySQL Tools Preview - Source disabled
mysql55-community/x86_64 MySQL 5.5 Community Server disabled
mysql55-community-source MySQL 5.5 Community Server - disabled
mysql56-community/x86_64 MySQL 5.6 Community Server disabled
mysql56-community-source MySQL 5.6 Community Server - disabled
mysql57-community/x86_64 MySQL 5.7 Community Server disabled
mysql57-community-source MySQL 5.7 Community Server - disabled
mysql80-community/x86_64 MySQL 8.0 Community Server enabled: 161
mysql80-community-source MySQL 8.0 Community Server - disabled
編輯/etc/yum.repos.d/mysql-community.repo
文件([mysql80-community]
塊中enabled設(shè)置為1
心褐,其實(shí)默認(rèn)就是這樣子,不用改笼踩,如果要選用5.x
版本則需要修改對(duì)應(yīng)的塊):
[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
然后安裝MySQL
服務(wù):
sudo yum install mysql-community-server
這個(gè)過(guò)程比較漫長(zhǎng)逗爹,因?yàn)樾枰螺d和安裝5個(gè)rpm
安裝包(或者是所有安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar
)。如果網(wǎng)絡(luò)比較差嚎于,也可以直接從官網(wǎng)手動(dòng)下載后安裝:
// 下載下面5個(gè)rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server
// 強(qiáng)制安裝
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps
安裝完畢之后掘而,啟動(dòng)MySQL
服務(wù),然后搜索MySQL
服務(wù)的root
賬號(hào)的臨時(shí)密碼用于首次登陸(mysql -u root -p
):
// 啟動(dòng)服務(wù)于购,關(guān)閉服務(wù)就是service mysqld stop
service mysqld start
// 查看臨時(shí)密碼 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登錄臨時(shí)root用戶袍睡,使用臨時(shí)密碼
[root@localhost log]# mysql -u root -p
接下來(lái)做下面的操作:
- 修改
root
用戶的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';
(注意密碼規(guī)則必須包含大小寫字母、數(shù)字和特殊字符) - 更新
root
的host
肋僧,切換數(shù)據(jù)庫(kù)use mysql;
斑胜,指定host
為%
以便可以讓其他服務(wù)器遠(yuǎn)程訪問(wèn)UPDATE USER SET HOST = '%' WHERE USER = 'root';
- 賦予
'root'@'%'
用戶,所有權(quán)限嫌吠,執(zhí)行GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
- 改變
root'@'%
用戶的密碼校驗(yàn)規(guī)則以便可以使用Navicat
等工具訪問(wèn):ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
操作完成之后止潘,就可以使用root
用戶遠(yuǎn)程訪問(wèn)此虛擬機(jī)上的MySQL
服務(wù)。最后確認(rèn)是否開(kāi)啟了binlog
(注意一點(diǎn)是MySQL8.x
默認(rèn)開(kāi)啟binlog
)SHOW VARIABLES LIKE '%bin%';
:
最后在MySQL
的Shell
執(zhí)行下面的命令辫诅,新建一個(gè)用戶名canal
密碼為QWqw12!@
的新用戶凭戴,賦予REPLICATION SLAVE
和 REPLICATION CLIENT
權(quán)限:
CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
切換回去root
用戶,創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)test
:
CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
安裝Zookeeper
Canal
和Kafka
集群都依賴于Zookeeper
做服務(wù)協(xié)調(diào)炕矮,為了方便管理么夫,一般會(huì)獨(dú)立部署Zookeeper
服務(wù)或者Zookeeper
集群。筆者這里選用2020-03-04
發(fā)布的3.6.0
版本:
midkr /data/zk
# 創(chuàng)建數(shù)據(jù)目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
把zoo.cfg
文件中的dataDir
設(shè)置為/data/zk/data
吧享,然后啟動(dòng)Zookeeper
:
[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
這里注意一點(diǎn)魏割,要啟動(dòng)此版本的Zookeeper
服務(wù)必須本地安裝好JDK8+
譬嚣,這一點(diǎn)需要自行處理钢颂。啟動(dòng)的默認(rèn)端口是2181
,啟動(dòng)成功后的日志如下:
安裝Kafka
Kafka
是一個(gè)高性能分布式消息隊(duì)列中間件拜银,它的部署依賴于Zookeeper
殊鞭。筆者在此選用2.4.0
并且Scala
版本為2.13
的安裝包:
mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz
由于解壓后/data/kafka/kafka_2.13-2.4.0/config/server.properties
配置中對(duì)應(yīng)的zookeeper.connect=localhost:2181
已經(jīng)符合需要,不必修改尼桶,需要修改日志文件的目錄log.dirs
為/data/kafka/data
操灿。然后啟動(dòng)Kafka
服務(wù):
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
這樣啟動(dòng)一旦退出控制臺(tái)就會(huì)結(jié)束Kafka
進(jìn)程,可以添加-daemon
參數(shù)用于控制Kafka
進(jìn)程后臺(tái)不掛斷運(yùn)行泵督。
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
安裝和使用Canal
終于到了主角登場(chǎng)趾盐,這里選用Canal
的v1.1.4
穩(wěn)定發(fā)布版,只需要下載deployer
模塊:
mkdir /data/canal
cd /data/canal
# 這里注意一點(diǎn),Github在國(guó)內(nèi)被墻救鲤,下載速度極慢久窟,可以先用其他下載工具下載完再上傳到服務(wù)器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz
解壓后的目錄如下:
- bin # 運(yùn)維腳本
- conf # 配置文件
canal_local.properties # canal本地配置,一般不需要?jiǎng)? canal.properties # canal服務(wù)配置
logback.xml # logback日志配置
metrics # 度量統(tǒng)計(jì)配置
spring # spring-實(shí)例配置本缠,主要和binlog位置計(jì)算斥扛、一些策略配置相關(guān),可以在canal.properties選用其中的任意一個(gè)配置文件
example # 實(shí)例配置文件夾丹锹,一般認(rèn)為單個(gè)數(shù)據(jù)庫(kù)對(duì)應(yīng)一個(gè)獨(dú)立的實(shí)例配置文件夾
instance.properties # 實(shí)例配置稀颁,一般指單個(gè)數(shù)據(jù)庫(kù)的配置
- lib # 服務(wù)依賴包
- logs # 日志文件輸出目錄
在開(kāi)發(fā)和測(cè)試環(huán)境建議把logback.xml
的日志級(jí)別修改為DEBUG
方便定位問(wèn)題。這里需要關(guān)注canal.properties
和instance.properties
兩個(gè)配置文件楣黍。canal.properties
文件中匾灶,需要修改:
- 去掉
canal.instance.parser.parallelThreadSize = 16
這個(gè)配置項(xiàng)的注釋,也就是啟用此配置項(xiàng)租漂,和實(shí)例解析器的線程數(shù)相關(guān)粘昨,不配置會(huì)表現(xiàn)為阻塞或者不進(jìn)行解析。 -
canal.serverMode
配置項(xiàng)指定為kafka
窜锯,可選值有tcp
张肾、kafka
和rocketmq
(master
分支或者最新的的v1.1.5-alpha-1
版本,可以選用rabbitmq
)锚扎,默認(rèn)是kafka
吞瞪。 -
canal.mq.servers
配置需要指定為Kafka
服務(wù)或者集群Broker
的地址,這里配置為127.0.0.1:9092
驾孔。
canal.mq.servers在不同的canal.serverMode有不同的意義芍秆。
kafka模式下,指Kafka服務(wù)或者集群Broker的地址翠勉,也就是bootstrap.servers
rocketmq模式下妖啥,指NameServer列表
rabbitmq模式下骡楼,指RabbitMQ服務(wù)的Host和Port
其他配置項(xiàng)可以參考下面兩個(gè)官方Wiki
的鏈接:
instance.properties
一般指一個(gè)數(shù)據(jù)庫(kù)實(shí)例的配置点楼,Canal
架構(gòu)支持一個(gè)Canal
服務(wù)實(shí)例晃财,處理多個(gè)數(shù)據(jù)庫(kù)實(shí)例的binlog
異步解析陵且。instance.properties
需要修改的配置項(xiàng)主要包括:
canal.instance.mysql.slaveId
需要配置一個(gè)和Master
節(jié)點(diǎn)的服務(wù)ID
完全不同的值赴恨,這里筆者配置為654321
留拾。-
配置數(shù)據(jù)源實(shí)例褐耳,包括地址胁孙、用戶骑脱、密碼和目標(biāo)數(shù)據(jù)庫(kù):
-
canal.instance.master.address
菜枷,這里指定為127.0.0.1:3306
。 -
canal.instance.dbUsername
叁丧,這里指定為canal
啤誊。 -
canal.instance.dbPassword
岳瞭,這里指定為QWqw12!@
。 - 新增
canal.instance.defaultDatabaseName
蚊锹,這里指定為test
(需要在MySQL
中建立一個(gè)test
數(shù)據(jù)庫(kù)寝优,見(jiàn)前面的流程)。
-
Kafka
相關(guān)配置枫耳,這里暫時(shí)使用靜態(tài)topic
和單個(gè)partition
:-
canal.mq.topic
乏矾,這里指定為test
,也就是解析完的binlog
結(jié)構(gòu)化數(shù)據(jù)會(huì)發(fā)送到Kafka
的命名為test
的topic
中迁杨。-
canal.mq.partition
钻心,這里指定為0
。
-
配置工作做好之后铅协,可以啟動(dòng)Canal
服務(wù):
sh /data/canal/bin/startup.sh
# 查看服務(wù)日志
tail -100f /data/canal/logs/canal/canal
# 查看實(shí)例日志 -- 一般情況下捷沸,關(guān)注實(shí)例日志即可
tail -100f /data/canal/logs/example/example.log
啟動(dòng)正常后,見(jiàn)實(shí)例日志如下:
在test
數(shù)據(jù)庫(kù)創(chuàng)建一個(gè)訂單表狐史,并且執(zhí)行幾個(gè)簡(jiǎn)單的DML
:
use `test`;
CREATE TABLE `order`
(
id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
order_id VARCHAR(64) NOT NULL COMMENT '訂單ID',
amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';
INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE FROM `order` WHERE order_id = '10086';
這個(gè)時(shí)候痒给,可以利用Kafka
的kafka-console-consumer
或者Kafka Tools
查看test
這個(gè)topic
的數(shù)據(jù):
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
具體的數(shù)據(jù)如下:
// test數(shù)據(jù)庫(kù)建庫(kù)腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}
// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\n order_id VARCHAR(64) NOT NULL COMMENT '訂單ID',\n amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\n create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',\n UNIQUE uniq_order_id (`order_id`)\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}
// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}
// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}
// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}
可見(jiàn)Kafka
的名為test
的topic
已經(jīng)寫入了對(duì)應(yīng)的結(jié)構(gòu)化binlog
事件數(shù)據(jù),可以編寫消費(fèi)者監(jiān)聽(tīng)Kafka
對(duì)應(yīng)的topic
然后對(duì)獲取到的數(shù)據(jù)進(jìn)行后續(xù)處理骏全。
小結(jié)
這篇文章大部分篇幅用于介紹其他中間件是怎么部署的苍柏,這個(gè)問(wèn)題側(cè)面說(shuō)明了Canal
本身部署并不復(fù)雜,它的配置文件屬性項(xiàng)比較多姜贡,但是實(shí)際上需要自定義和改動(dòng)的配置項(xiàng)是比較少的试吁,也就是說(shuō)明了它的運(yùn)維成本和學(xué)習(xí)成本并不高。后面會(huì)分析基于結(jié)構(gòu)化binlog
事件做ELT
和持久化相關(guān)工作以及Canal
的生產(chǎn)環(huán)境可用級(jí)別HA
集群的搭建楼咳。
參考地址
如果大家喜歡我的文章熄捍,可以關(guān)注個(gè)人訂閱號(hào)。歡迎隨時(shí)留言母怜、交流余耽。如果想加入微信群的話一起討論的話,請(qǐng)加管理員簡(jiǎn)棧文化-小助手(lastpass4u)苹熏,他會(huì)拉你們進(jìn)群碟贾。