基于 Canal 和 Kafka 實(shí)現(xiàn) MySQL 的 Binlog 近實(shí)時(shí)同步

作者:Throwable
掘金:https://juejin.im/post/5e6a6746f265da575c16d678

前提

近段時(shí)間麸俘,業(yè)務(wù)系統(tǒng)架構(gòu)基本完備悔雹,數(shù)據(jù)層面的建設(shè)比較薄弱庭猩,因?yàn)楣P者目前工作重心在于搭建一個(gè)小型的數(shù)據(jù)平臺(tái)叁征。優(yōu)先級(jí)比較高的一個(gè)任務(wù)就是需要近實(shí)時(shí)同步業(yè)務(wù)系統(tǒng)的數(shù)據(jù)(包括保存码泞、更新或者軟刪除)到一個(gè)另一個(gè)數(shù)據(jù)源,持久化之前需要清洗數(shù)據(jù)并且構(gòu)建一個(gè)相對(duì)合理的便于后續(xù)業(yè)務(wù)數(shù)據(jù)統(tǒng)計(jì)幅聘、標(biāo)簽系統(tǒng)構(gòu)建等擴(kuò)展功能的數(shù)據(jù)模型凡纳。基于當(dāng)前團(tuán)隊(duì)的資源和能力帝蒿,優(yōu)先調(diào)研了Alibaba開(kāi)源中間件Canal的使用荐糜。

image

這篇文章簡(jiǎn)單介紹一下如何快速地搭建一套Canal相關(guān)的組件。

關(guān)于Canal

簡(jiǎn)介

下面的簡(jiǎn)介和下一節(jié)的原理均來(lái)自于Canal項(xiàng)目的README:

image

Canal[k?'n?l]葛超,譯意為水道/管道/溝渠狞尔,主要用途是基于MySQL數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)巩掺。早期阿里巴巴因?yàn)楹贾莺兔绹?guó)雙機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求页畦,實(shí)現(xiàn)方式主要是基于業(yè)務(wù)trigger獲取增量變更胖替。從 2010 年開(kāi)始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫(kù)日志解析獲取增量變更進(jìn)行同步豫缨,由此衍生出了大量的數(shù)據(jù)庫(kù)增量訂閱和消費(fèi)業(yè)務(wù)独令。

基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括:

  • 數(shù)據(jù)庫(kù)鏡像

  • 數(shù)據(jù)庫(kù)實(shí)時(shí)備份

  • 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)

  • 業(yè)務(wù)Cache刷新

  • 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理

Canal 的工作原理

MySQL主備復(fù)制原理:

image
  • MySQL的Master實(shí)例將數(shù)據(jù)變更寫(xiě)入二進(jìn)制日志(binary log好芭,其中記錄叫做二進(jìn)制日志事件binary log events燃箭,可以通過(guò)show binlog events進(jìn)行查看)

  • MySQL的Slave實(shí)例將master的binary log events拷貝到它的中繼日志(relay log)

  • MySQL的Slave實(shí)例重放relay log中的事件,將數(shù)據(jù)變更反映它到自身的數(shù)據(jù)

Canal的工作原理如下:

  • Canal模擬MySQL Slave的交互協(xié)議舍败,偽裝自己為MySQL Slave招狸,向MySQL Master發(fā)送dump協(xié)議

  • MySQL Master收到dump請(qǐng)求,開(kāi)始推送binary log給Slave(即Canal)

  • Canal解析binary log對(duì)象(原始為byte流)邻薯,并且可以通過(guò)連接器發(fā)送到對(duì)應(yīng)的消息隊(duì)列等中間件中

關(guān)于Canal的版本和部件

截止筆者開(kāi)始編寫(xiě)本文的時(shí)候(2020-03-05)裙戏,Canal的最新發(fā)布版本是v1.1.5-alpha-1(2019-10-09發(fā)布的),最新的正式版是v1.1.4(2019-09-02發(fā)布的)厕诡。其中累榜,v1.1.4主要添加了鑒權(quán)、監(jiān)控的功能灵嫌,并且做了一些列的性能優(yōu)化壹罚,此版本集成的連接器是Tcp、Kafka和RockerMQ寿羞。而v1.1.5-alpha-1版本已經(jīng)新增了RabbitMQ連接器猖凛,但是此版本的RabbitMQ連接器暫時(shí)不能定義連接RabbitMQ的端口號(hào),不過(guò)此問(wèn)題已經(jīng)在master分支中修復(fù)(具體可以參看源碼中的CanalRabbitMQProducer類(lèi)的提交記錄)稠曼。換言之形病,v1.1.4版本中目前能使用的內(nèi)置連接器只有Tcp客年、Kafka和RockerMQ三種,如果想嘗鮮使用RabbitMQ連接器漠吻,可以選用下面的兩種方式之一:

  • 選用v1.1.5-alpha-1版本量瓜,但是無(wú)法修改RabbitMQ的port屬性,默認(rèn)為5672途乃。

  • 基于master分支自行構(gòu)建Canal绍傲。

目前,Canal項(xiàng)目的活躍度比較高耍共,但是考慮到功能的穩(wěn)定性問(wèn)題烫饼,筆者建議選用穩(wěn)定版本在生產(chǎn)環(huán)境中實(shí)施,當(dāng)前可以選用v1.1.4版本试读,本文的例子用選用的就是v1.1.4版本杠纵,配合Kafka連接器使用。Canal主要包括三個(gè)核心部件:

  • canal-admin:后臺(tái)管理模塊钩骇,提供面向WebUI的Canal管理能力比藻。

  • canal-adapter:適配器,增加客戶(hù)端數(shù)據(jù)落地的適配及啟動(dòng)功能倘屹,包括REST银亲、日志適配器、關(guān)系型數(shù)據(jù)庫(kù)的數(shù)據(jù)同步(表對(duì)表同步)纽匙、HBase數(shù)據(jù)同步务蝠、ES數(shù)據(jù)同步等等。

  • canal-deployer:發(fā)布器烛缔,核心功能所在馏段,包括binlog解析、轉(zhuǎn)換和發(fā)送報(bào)文到連接器中等等功能都由此模塊提供力穗。

一般情況下毅弧,canal-deployer部件是必須的,其他兩個(gè)部件按需選用即可当窗。

部署所需的中間件

搭建一套可以用的組件需要部署MySQL够坐、Zookeeper、Kafka和Canal四個(gè)中間件的實(shí)例崖面,下面簡(jiǎn)單分析一下部署過(guò)程元咙。選用的虛擬機(jī)系統(tǒng)是CentOS7。

安裝MySQL

為了簡(jiǎn)單起見(jiàn)巫员,選用yum源安裝(官方鏈接是https://dev.mysql.com/downloads/repo/yum):

image

mysql80-community-release-el7-3雖然包名帶了mysql80關(guān)鍵字庶香,其實(shí)已經(jīng)集成了MySQL主流版本5.6、5.7和8.x等等的最新安裝包倉(cāng)庫(kù)

選用的是最新版的MySQL8.x社區(qū)版简识,下載CentOS7適用的rpm包:

cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下載完畢之后
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm

此時(shí)列舉一下yum倉(cāng)庫(kù)里面的MySQL相關(guān)的包:

[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community   disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community   disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community   disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64  MySQL Connectors Community    enabled:    141
mysql-connectors-community-source  MySQL Connectors Community -  disabled
mysql-tools-community/x86_64       MySQL Tools Community         enabled:    105
mysql-tools-community-source       MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64         MySQL Tools Preview           disabled
mysql-tools-preview-source         MySQL Tools Preview - Source  disabled
mysql55-community/x86_64           MySQL 5.5 Community Server    disabled
mysql55-community-source           MySQL 5.5 Community Server -  disabled
mysql56-community/x86_64           MySQL 5.6 Community Server    disabled
mysql56-community-source           MySQL 5.6 Community Server -  disabled
mysql57-community/x86_64           MySQL 5.7 Community Server    disabled
mysql57-community-source           MySQL 5.7 Community Server -  disabled
mysql80-community/x86_64           MySQL 8.0 Community Server    enabled:    161
mysql80-community-source           MySQL 8.0 Community Server -  disabled

編輯/etc/yum.repos.d/mysql-community.repo文件([mysql80-community]塊中enabled設(shè)置為1赶掖,其實(shí)默認(rèn)就是這樣子感猛,不用改,如果要選用5.x版本則需要修改對(duì)應(yīng)的塊):

[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql

然后安裝MySQL服務(wù):

sudo yum install mysql-community-server

這個(gè)過(guò)程比較漫長(zhǎng)奢赂,因?yàn)樾枰螺d和安裝5個(gè)rpm安裝包(或者是所有安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar)陪白。如果網(wǎng)絡(luò)比較差,也可以直接從官網(wǎng)手動(dòng)下載后安裝:

image
// 下載下面5個(gè)rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server

// 強(qiáng)制安裝
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps

安裝完畢之后膳灶,啟動(dòng)MySQL服務(wù)咱士,然后搜索MySQL服務(wù)的root賬號(hào)的臨時(shí)密碼用于首次登陸(mysql -u root -p):

// 啟動(dòng)服務(wù),關(guān)閉服務(wù)就是service mysqld stop
service mysqld start
// 查看臨時(shí)密碼 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log 
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登錄臨時(shí)root用戶(hù)轧钓,使用臨時(shí)密碼
[root@localhost log]# mysql -u root -p

接下來(lái)做下面的操作:

  • 修改root用戶(hù)的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';(注意密碼規(guī)則必須包含大小寫(xiě)字母序厉、數(shù)字和特殊字符)

  • 更新root的host,切換數(shù)據(jù)庫(kù)use mysql;毕箍,指定host為%以便可以讓其他服務(wù)器遠(yuǎn)程訪(fǎng)問(wèn)UPDATE USER SET HOST = '%' WHERE USER = 'root';

  • 賦予'root'@'%'用戶(hù)弛房,所有權(quán)限,執(zhí)行GRANT ALL PRIVILEGES ON . TO 'root'@'%';

  • 改變r(jià)oot'@'%用戶(hù)的密碼校驗(yàn)規(guī)則以便可以使用Navicat等工具訪(fǎng)問(wèn):ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

image

操作完成之后而柑,就可以使用root用戶(hù)遠(yuǎn)程訪(fǎng)問(wèn)此虛擬機(jī)上的MySQL服務(wù)庭再。最后確認(rèn)是否開(kāi)啟了binlog(注意一點(diǎn)是MySQL8.x默認(rèn)開(kāi)啟binlog)SHOW VARIABLES LIKE '%bin%';:

image

最后在MySQL的Shell執(zhí)行下面的命令,新建一個(gè)用戶(hù)名canal密碼為QWqw12!@的新用戶(hù)牺堰,賦予REPLICATION SLAVE和 REPLICATION CLIENT權(quán)限:

CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

切換回去root用戶(hù),創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)test:

CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;

安裝Zookeeper

Canal和Kafka集群都依賴(lài)于Zookeeper做服務(wù)協(xié)調(diào)颅围,為了方便管理伟葫,一般會(huì)獨(dú)立部署Zookeeper服務(wù)或者Zookeeper集群。筆者這里選用2020-03-04發(fā)布的3.6.0版本:

midkr /data/zk
# 創(chuàng)建數(shù)據(jù)目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg

把zoo.cfg文件中的dataDir設(shè)置為/data/zk/data院促,然后啟動(dòng)Zookeeper:

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

這里注意一點(diǎn)筏养,要啟動(dòng)此版本的Zookeeper服務(wù)必須本地安裝好JDK8+,這一點(diǎn)需要自行處理常拓。啟動(dòng)的默認(rèn)端口是2181渐溶,啟動(dòng)成功后的日志如下:

image

安裝Kafka

Kafka是一個(gè)高性能分布式消息隊(duì)列中間件,它的部署依賴(lài)于Zookeeper弄抬。筆者在此選用2.4.0并且Scala版本為2.13的安裝包:

mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz

由于解壓后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對(duì)應(yīng)的zookeeper.connect=localhost:2181已經(jīng)符合需要茎辐,不必修改,需要修改日志文件的目錄log.dirs為/data/kafka/data掂恕。然后啟動(dòng)Kafka服務(wù):

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
image

這樣啟動(dòng)一旦退出控制臺(tái)就會(huì)結(jié)束Kafka進(jìn)程拖陆,可以添加-daemon參數(shù)用于控制Kafka進(jìn)程后臺(tái)不掛斷運(yùn)行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties

安裝和使用Canal

終于到了主角登場(chǎng)懊亡,這里選用Canal的v1.1.4穩(wěn)定發(fā)布版依啰,只需要下載deployer模塊:

mkdir /data/canal
cd /data/canal
# 這里注意一點(diǎn),Github在國(guó)內(nèi)被墻店枣,下載速度極慢速警,可以先用其他下載工具下載完再上傳到服務(wù)器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz

解壓后的目錄如下:

- bin   # 運(yùn)維腳本
- conf  # 配置文件
  canal_local.properties  # canal本地配置叹誉,一般不需要?jiǎng)?  canal.properties        # canal服務(wù)配置
  logback.xml             # logback日志配置
  metrics                 # 度量統(tǒng)計(jì)配置
  spring                  # spring-實(shí)例配置,主要和binlog位置計(jì)算闷旧、一些策略配置相關(guān)长豁,可以在canal.properties選用其中的任意一個(gè)配置文件
  example                 # 實(shí)例配置文件夾,一般認(rèn)為單個(gè)數(shù)據(jù)庫(kù)對(duì)應(yīng)一個(gè)獨(dú)立的實(shí)例配置文件夾
    instance.properties   # 實(shí)例配置鸠匀,一般指單個(gè)數(shù)據(jù)庫(kù)的配置
- lib   # 服務(wù)依賴(lài)包
- logs  # 日志文件輸出目錄

在開(kāi)發(fā)和測(cè)試環(huán)境建議把logback.xml的日志級(jí)別修改為DEBUG方便定位問(wèn)題蕉斜。這里需要關(guān)注canal.properties和instance.properties兩個(gè)配置文件。canal.properties文件中缀棍,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16這個(gè)配置項(xiàng)的注釋?zhuān)簿褪菃⒂么伺渲庙?xiàng)宅此,和實(shí)例解析器的線(xiàn)程數(shù)相關(guān),不配置會(huì)表現(xiàn)為阻塞或者不進(jìn)行解析爬范。

  • canal.serverMode配置項(xiàng)指定為kafka父腕,可選值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本青瀑,可以選用rabbitmq)璧亮,默認(rèn)是kafka。

  • canal.mq.servers配置需要指定為Kafka服務(wù)或者集群Broker的地址斥难,這里配置為127.0.0.1:9092枝嘶。

canal.mq.servers在不同的canal.serverMode有不同的意義。
kafka模式下哑诊,指Kafka服務(wù)或者集群Broker的地址群扶,也就是bootstrap.servers
rocketmq模式下,指NameServer列表
rabbitmq模式下镀裤,指RabbitMQ服務(wù)的Host和Port

其他配置項(xiàng)可以參考下面兩個(gè)官方Wiki的鏈接:

  • Canal-Kafka-RocketMQ-QuickStart

  • AdminGuide

instance.properties一般指一個(gè)數(shù)據(jù)庫(kù)實(shí)例的配置竞阐,Canal架構(gòu)支持一個(gè)Canal服務(wù)實(shí)例,處理多個(gè)數(shù)據(jù)庫(kù)實(shí)例的binlog異步解析暑劝。instance.properties需要修改的配置項(xiàng)主要包括:

  • canal.instance.mysql.slaveId需要配置一個(gè)和Master節(jié)點(diǎn)的服務(wù)ID完全不同的值骆莹,這里筆者配置為654321。

  • 配置數(shù)據(jù)源實(shí)例担猛,包括地址幕垦、用戶(hù)、密碼和目標(biāo)數(shù)據(jù)庫(kù):

  • canal.instance.master.address傅联,這里指定為127.0.0.1:3306智嚷。

  • canal.instance.dbUsername,這里指定為canal纺且。

  • canal.instance.dbPassword盏道,這里指定為QWqw12!@。

  • 新增canal.instance.defaultDatabaseName载碌,這里指定為test(需要在MySQL中建立一個(gè)test數(shù)據(jù)庫(kù)猜嘱,見(jiàn)前面的流程)衅枫。

  • Kafka相關(guān)配置,這里暫時(shí)使用靜態(tài)topic和單個(gè)partition:

  • canal.mq.topic朗伶,這里指定為test弦撩,也就是解析完的binlog結(jié)構(gòu)化數(shù)據(jù)會(huì)發(fā)送到Kafka的命名為test的topic中。

  • canal.mq.partition论皆,這里指定為0益楼。

配置工作做好之后,可以啟動(dòng)Canal服務(wù):

sh /data/canal/bin/startup.sh 
# 查看服務(wù)日志
tail -100f /data/canal/logs/canal/canal
# 查看實(shí)例日志  -- 一般情況下点晴,關(guān)注實(shí)例日志即可
tail -100f /data/canal/logs/example/example.log

啟動(dòng)正常后感凤,見(jiàn)實(shí)例日志如下:

image

在test數(shù)據(jù)庫(kù)創(chuàng)建一個(gè)訂單表,并且執(zhí)行幾個(gè)簡(jiǎn)單的DML:

use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';

INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE  FROM `order` WHERE order_id = '10086';

這個(gè)時(shí)候粒督,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test這個(gè)topic的數(shù)據(jù):

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
image

具體的數(shù)據(jù)如下:

// test數(shù)據(jù)庫(kù)建庫(kù)腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}

// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\n    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',\n    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\n    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創(chuàng)建時(shí)間',\n    UNIQUE uniq_order_id (`order_id`)\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}

// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}

// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}

// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}

可見(jiàn)Kafka的名為test的topic已經(jīng)寫(xiě)入了對(duì)應(yīng)的結(jié)構(gòu)化binlog事件數(shù)據(jù)陪竿,可以編寫(xiě)消費(fèi)者監(jiān)聽(tīng)Kafka對(duì)應(yīng)的topic然后對(duì)獲取到的數(shù)據(jù)進(jìn)行后續(xù)處理。

小結(jié)

這篇文章大部分篇幅用于介紹其他中間件是怎么部署的屠橄,這個(gè)問(wèn)題側(cè)面說(shuō)明了Canal本身部署并不復(fù)雜族跛,它的配置文件屬性項(xiàng)比較多,但是實(shí)際上需要自定義和改動(dòng)的配置項(xiàng)是比較少的锐墙,也就是說(shuō)明了它的運(yùn)維成本和學(xué)習(xí)成本并不高礁哄。后面會(huì)分析基于結(jié)構(gòu)化binlog事件做ELT和持久化相關(guān)工作以及Canal的生產(chǎn)環(huán)境可用級(jí)別HA集群的搭建。

參考資料:

  • A Quick Guide to Using the MySQL Yum Repository

  • Canal

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末溪北,一起剝皮案震驚了整個(gè)濱河市姐仅,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌刻盐,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,718評(píng)論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件劳翰,死亡現(xiàn)場(chǎng)離奇詭異敦锌,居然都是意外死亡,警方通過(guò)查閱死者的電腦和手機(jī)佳簸,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,683評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門(mén)乙墙,熙熙樓的掌柜王于貴愁眉苦臉地迎上來(lái),“玉大人生均,你說(shuō)我怎么就攤上這事听想。” “怎么了马胧?”我有些...
    開(kāi)封第一講書(shū)人閱讀 158,207評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵汉买,是天一觀的道長(zhǎng)。 經(jīng)常有香客問(wèn)我佩脊,道長(zhǎng)蛙粘,這世上最難降的妖魔是什么垫卤? 我笑而不...
    開(kāi)封第一講書(shū)人閱讀 56,755評(píng)論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮出牧,結(jié)果婚禮上穴肘,老公的妹妹穿的比我還像新娘。我一直安慰自己舔痕,他們只是感情好评抚,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,862評(píng)論 6 386
  • 文/花漫 我一把揭開(kāi)白布。 她就那樣靜靜地躺著伯复,像睡著了一般慨代。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上边翼,一...
    開(kāi)封第一講書(shū)人閱讀 50,050評(píng)論 1 291
  • 那天鱼响,我揣著相機(jī)與錄音,去河邊找鬼组底。 笑死丈积,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的债鸡。 我是一名探鬼主播江滨,決...
    沈念sama閱讀 39,136評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開(kāi)眼,長(zhǎng)吁一口氣:“原來(lái)是場(chǎng)噩夢(mèng)啊……” “哼厌均!你這毒婦竟也來(lái)了唬滑?” 一聲冷哼從身側(cè)響起,我...
    開(kāi)封第一講書(shū)人閱讀 37,882評(píng)論 0 268
  • 序言:老撾萬(wàn)榮一對(duì)情侶失蹤棺弊,失蹤者是張志新(化名)和其女友劉穎晶密,沒(méi)想到半個(gè)月后,有當(dāng)?shù)厝嗽跇?shù)林里發(fā)現(xiàn)了一具尸體模她,經(jīng)...
    沈念sama閱讀 44,330評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡稻艰,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,651評(píng)論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了侈净。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片尊勿。...
    茶點(diǎn)故事閱讀 38,789評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖畜侦,靈堂內(nèi)的尸體忽然破棺而出元扔,到底是詐尸還是另有隱情,我是刑警寧澤旋膳,帶...
    沈念sama閱讀 34,477評(píng)論 4 333
  • 正文 年R本政府宣布澎语,位于F島的核電站,受9級(jí)特大地震影響,放射性物質(zhì)發(fā)生泄漏咏连。R本人自食惡果不足惜盯孙,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,135評(píng)論 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望祟滴。 院中可真熱鬧振惰,春花似錦、人聲如沸垄懂。這莊子的主人今日做“春日...
    開(kāi)封第一講書(shū)人閱讀 30,864評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)草慧。三九已至桶蛔,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間漫谷,已是汗流浹背仔雷。 一陣腳步聲響...
    開(kāi)封第一講書(shū)人閱讀 32,099評(píng)論 1 267
  • 我被黑心中介騙來(lái)泰國(guó)打工, 沒(méi)想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留舔示,地道東北人碟婆。 一個(gè)月前我還...
    沈念sama閱讀 46,598評(píng)論 2 362
  • 正文 我出身青樓,卻偏偏與公主長(zhǎng)得像惕稻,于是被迫代替她去往敵國(guó)和親竖共。 傳聞我的和親對(duì)象是個(gè)殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,697評(píng)論 2 351