mysql的準備
首先開啟binlog日志
- 進入mysql容器
docker exec -it {container id} /bin/bash
- cd /etc/mysql ,vi my.cnf 可能在容器中需要安裝 vim
apt-get update
apt-get install vim
- 修改my.cnf
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
- 重啟
- 驗證
show variables like '%log_bin%'
或者連接數(shù)據(jù)庫開啟binlog
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;
創(chuàng)建Maxwell用戶菱肖,并賦予 maxwell 庫的一些權(quán)限
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
# or for running maxwell locally:
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
啟動kafka
kafka在之前的文章中已經(jīng)弄好了扒最,沒安裝的可以先去看docker啟動kafka
docker啟動maxwell
拉取鏡像
docker pull zendesk/maxwell
啟動maxwell,并將解析出的binlog輸出到控制臺
docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='192.168.92.66' --producer=stdout
輸出到kafka
docker run -d -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \
--password='123456' --host='192.168.92.66' --producer=kafka \
--kafka.bootstrap.servers='192.168.92.66:9092' --kafka_topic=maxwell --log_level=debug
消費kafka中的log信息,這邊用的java
application.yml中的配置信息
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
url: jdbc:mysql://localhost:3307/ficus?characterEncoding=utf-8
kafka:
bootstrap-server: 192.168.92.66:9092
producer:
# 發(fā)生錯誤后错维,消息重發(fā)的次數(shù)奖地。
retries: 0
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: maxwell
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 但消費者監(jiān)聽的topic不存在時,保證能夠是項目啟動
missing-topics-fatal: false
測試工程整個截圖如下(寫得比較辣眼睛赋焕,大家有時間的話還是可以寫好點)
工程圖
用到的依賴
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- mybatis -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatisplus.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>${mybatisplus.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
測試
我們另起一個mysql参歹,新建了一張測試表collect
create table resdata
(
id int auto_increment
primary key,
name varchar(20) null,
age int null,
value varchar(20) null
);
然后添加數(shù)據(jù),可以在maxwell的控制臺看到輸出的信息maxwell
kafka
效果圖
image.png
自己的蠢操作
這里mysql的ip不能寫localhost隆判,否則容器內(nèi)沒辦法找到宿主機mysql
image.png
除了kafka還可以選擇redis等犬庇,可以點擊官網(wǎng)鏈接看看。
maxwell詳解參考MySQL Binlog 解析工具 Maxwell 詳解