安裝準(zhǔn)備工作
Maxwell=MySQL+Kafka.
1.安裝MySQL
請(qǐng)參照之前的博客元践。
在安裝完MySQL之后菲茬,需要修改my.cnf文件
vi /etc/my.cnf
[mysqld]
server-id = 1
binlog_format = ROW
重啟MySQL,然后登陸到MySQL之后章姓,查看是否已經(jīng)修改過來:
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
在MySQL中添加Maxwell用戶,以及分配權(quán)限:
mysql> create database maxwell;
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> flush privileges;
2.安裝Kafka
2.1下載安裝包
https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
2.2解壓Kafaka安裝包
[hadoop@hadoop001 app]$ tar -zxvf kafka_2.11-0.10.2.1.tgz
2.3啟動(dòng)Zookeeper
[hadoop@hadoop001 app]$ cd zookeeper-3.4.6/bin
[hadoop@hadoop001 bin]$ ./zkServer.sh status
JMX enabled by default
Using config: /home/hadoop/app/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
2.4啟動(dòng)Kafka
[hadoop@hadoop001 app]$ cd kafka_2.11-0.10.2.1/
[hadoop@hadoop001 kafka_2.11-0.10.2.1]$ bin/kafka-server-start.sh config/server.properties
3.安裝Maxwell
3.1下載安裝包
下載地址:
https://github.com/zendesk/maxwell/releases/download/v1.20.0/maxwell-1.20.0.tar.gz
3.2解壓安裝
[hadoop@hadoop001 app]$ tar -zxvf maxwell-1.20.0.tar.gz
4.使用
4.1 STDOUT配置
4.1.1在MySQL中創(chuàng)建數(shù)據(jù)表:
create table xiaoyao(id int not null primary key,name varchar(20),age int,address varchar(20));
4.1.2開啟Maxwell:
[hadoop@hadoop001 maxwell-1.20.0]$ bin/maxwell --user=maxwell --password=123456 --host='127.0.0.1' --producer=stdout
4.1.3對(duì)數(shù)據(jù)操作:
mysql> insert into xiaoyao values(1,'xiaoyao',0,'beijing');
mysql> insert into xiaoyao values(2,'xiaoyao1',20,'beijing');
mysql> update xiaoyao set age=15 where id=1;
4.1.4可以在Maxwell中可以看到控制臺(tái)輸出:
插入操作:
{"database":"test","table":"xiaoyao","type":"insert","ts":1553397965,"xid":494,"commit":true,"data":{"id":2,"name":"xiaoyao1","age":20,"address":"beijing"}}
更新操作(binlog會(huì)記錄所有的字段,以及原先的值):
{"database":"test","table":"xiaoyao","type":"update","ts":1553398124,"xid":550,"commit":true,"data":{"id":1,"name":"xiaoyao","age":15,"address":"beijing"},"old":{"age":0}}
4.2Maxwell與Kafka結(jié)合
開啟Maxwell:
bin/maxwell --user='maxwell' --password='123456' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --kafka_version=0.10.2.1
開啟Kakfa:
bin/maxwell --user='maxwell' --password='123456' --host='127.0.0.1' \
--producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --kafka_version=0.10.2.1
進(jìn)入MySQL吼蚁,修改數(shù)據(jù):
mysql> use test;
mysql> update xiaoyao set age=30 where id=2;
開啟Kafka消費(fèi):
[hadoop@hadoop001 kafka_2.11-0.10.2.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic maxwell --from-beginning
{"database":"test","table":"xiaoyao","type":"update","ts":1553401735,"xid":1675,"commit":true,"data":{"id":2,"name":"xiaoyao1","age":30,"address":"beijing"},"old":{"age":20}}
5.Filters
對(duì)產(chǎn)生的數(shù)據(jù)進(jìn)行過濾:
bin/maxwell --user='maxwell' --password='mysqlmaxwellpwd' --host='localhost' \
--producer=kafka --kafka.bootstrap.servers=localhost:9092 \
--kafka_topic=maxwells --filter 'exclude: ambari.*, include: test_binlog.*'
也可以自定義過濾規(guī)則腮猖,請(qǐng)參考:
http://maxwells-daemon.io/filtering/
http://maxwells-daemon.io/config/
6.關(guān)于如何查看binlog
請(qǐng)參考文檔:
https://www.cnblogs.com/martinzhang/p/3454358.html
https://blog.csdn.net/a1010256340/article/details/80306952
Maxwell vs Canal
Canal(服務(wù)端) | Maxwell(客戶端+服務(wù)端) | |
---|---|---|
語言 | Java | Java |
活躍度 | 活躍 | 活躍 |
HA | 支持 | 定制 但是支持?jǐn)帱c(diǎn)還原功能 |
數(shù)據(jù)落地 | 定制 | 落地到Kafka |
分區(qū) | 支持 | 支持 |
bootstrap | 不支持 | 支持 |
數(shù)據(jù)格式 | 格式自由 | json(格式固定) |
文檔 | 較詳細(xì) | 較詳細(xì) |
隨機(jī)讀 | 支持 | 支持 |