Canal介紹及其原理,多謝博主的共享苹祟,對(duì)Canal和MaxWell的區(qū)別瞬間懂了很多。
1.Canal和Maxwell作為kafka source的區(qū)別
阿里開源的Canal進(jìn)行Mysql binlog數(shù)據(jù)的抽取,另需開發(fā)一個(gè)數(shù)據(jù)轉(zhuǎn)換工具將從binlog中解析出的數(shù)據(jù)轉(zhuǎn)換成自帶schema的json數(shù)據(jù)并寫入kafka中窿克。而使用maxwell可直接完成對(duì)mysql binlog數(shù)據(jù)的抽取和轉(zhuǎn)換成自帶schema的json數(shù)據(jù)寫入到kafka中。
另外Maxwell作為kafka connector的話需要metric的東西也比較多毛甲,因此此處我的kafka Connect選擇了Canal.
2.Canal -> kafka的實(shí)現(xiàn)
像1中所說的實(shí)現(xiàn)Canal作為kafka的生產(chǎn)者年叮,kafka作為消費(fèi)者,還需要一個(gè)中間件玻募。github上有給出這個(gè)只损,地址:https://github.com/sasou/syncClient
以下為運(yùn)行步驟:
- 首先配置Canal,下載deploy的tar包七咧,可單機(jī)環(huán)境跃惫。下載解壓后進(jìn)行如下配置
$ vim [Canal path]/conf/example/instance.properties
修改如下一行
canal.instance.master.journal.name = mysql-bin.000001 #mysql主庫鏈接時(shí)起始的binlog文件
canal.instance.master.position = 4 #mysql主庫鏈接時(shí)起始的binlog偏移量,可不設(shè)置
canal.instance.defaultDatabaseName = test #mysql鏈接時(shí)默認(rèn)schema,選擇一個(gè)你的mysql中存在的數(shù)據(jù)庫
- 開啟mysql的binlog寫入功能艾栋,并且配置binlog模式為row
canal的原理是基于mysql binlog技術(shù)爆存,所以這里一定需要開啟mysql的binlog寫入功能,并且配置binlog模式為row.
$ vim /etc/mysql/my.cnf
[mysqld] log-bin=mysql-bin #添加這一行就ok binlog-format=ROW #選擇row模式 server_id=1 #配置mysql replaction需要定義裹粤,不能和canal的slaveId重復(fù)
注意完成后一定要重啟mysql服務(wù)
$ service mysql stop
$ service mysql start
- 在mysql中添加Canal用戶的權(quán)限
canal的原理是模擬自己為mysql slave终蒂,所以這里一定需要做為mysql slave的相關(guān)權(quán)限
在mysql>下輸入:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal[@localhost];
FLUSH PRIVILEGES;
針對(duì)已有的賬戶可通過grants查詢權(quán)限:
show grants **for** 'canal';
- 下載github上的syncClient
下載并解壓縮數(shù)據(jù)實(shí)時(shí)同步中間件syncClient。
根據(jù)自身情況修改/syncClient/bin/SysConfig.properties
比如我運(yùn)行的是canal的實(shí)例是example遥诉,且kafka在本地拇泣,所以進(jìn)行了如下修改:
debug=1
ip=127.0.0.1
port=11111
destination=example
username=
password=
filter=
#kafka
kafkaIp=127.0.0.1
kafkaPort=9092
注意這里的username,password和canal配置中的dbusername,dbpassword的區(qū)別矮锈。Canal配置中的dbusername,dbpassword是指canal讀取mysql的binlog時(shí)的用戶和密碼霉翔,默認(rèn)是canal,canal。但是這里的username和password是指canal自身的用戶和密碼(即連接到這個(gè)數(shù)據(jù)實(shí)時(shí)同步中間件時(shí)的用戶和密碼)苞笨,默認(rèn)是空债朵。
之后在bin目錄下運(yùn)行
sh start.sh
即可看到
empty 0
empty 1
……
這樣的輸出
- 傳入數(shù)據(jù)
開啟kafka,并啟動(dòng)另一個(gè)終端對(duì)mysql中對(duì)數(shù)據(jù)庫中的表做修改(至于是不是必須是我們之前設(shè)置的canal.instance.defaultDatabaseName = test這個(gè)test數(shù)據(jù)庫子眶,還待進(jìn)一步驗(yàn)證),修改后就可以在start.sh那個(gè)終端下看到變化了序芦。如果有錯(cuò)誤臭杰,請嘗試關(guān)閉防火墻$ sudo ufw stop
- 使用kafka查看變化
如果想使用kafka查看數(shù)據(jù)庫的變化,可以在啟動(dòng)消費(fèi)者端查看
$ cd [the path to kafka]
$ bin/kafka-console-consumer.sh -zookeeper localhost:2181--from-beginning --topic 數(shù)據(jù)庫名_表名 #比如你修改的是test數(shù)據(jù)下的user表谚中,則此處topic為test_user
當(dāng)然你也可以在kafka中查看是否有這個(gè)topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
以上就是今天的全部工作了~Fighting