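Introduction
In day-to-day business development, MySQL is probably the most widely used database. Each business unit tends to run its own MySQL cluster, so to break down data silos we have to synchronize and consolidate that data, and to keep the synchronization timely we generally use CDC (change data capture) to sync in real time. This post walks through how I used Flink CDC to synchronize MySQL into Hudi and build a real-time data lake.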
MySQL configuration
- Enable the binlog
Run vim /usr/local/etc/my.cnf and add the following settings:
# Default Homebrew MySQL server config
[mysqld]
# Only allow connections from localhost
bind-address = 127.0.0.1
log-bin = mysql-bin
binlog-format = ROW
server_id = 1
Restart MySQL: service mysql.server restart
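Before restarting, it can help to double-check the file programmatically. The sketch below is an illustrative helper (check_binlog_config is a hypothetical name, not part of MySQL or Flink CDC) that uses Python's configparser to read the [mysqld] section and report whether log-bin, binlog-format=ROW and a non-zero server_id are present. It assumes a simple my.cnf without !include directives.

```python
import configparser


def check_binlog_config(cnf_text: str) -> list:
    """Return a list of problems in the [mysqld] section (empty list = looks OK)."""
    # allow_no_value: my.cnf may contain bare flags such as "skip-name-resolve";
    # interpolation=None: treat '%' in values literally
    parser = configparser.ConfigParser(allow_no_value=True, strict=False, interpolation=None)
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent; normalise to '_'
    opts = {k.replace("-", "_"): (v or "").strip() for k, v in parser.items("mysqld")}
    problems = []
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if opts.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog-format must be ROW")
    if opts.get("server_id") in (None, "", "0"):
        problems.append("server_id must be set to a non-zero value")
    return problems
```

For example, check_binlog_config(open("/usr/local/etc/my.cnf").read()) should return an empty list for the configuration above. The running server remains the authority, though: after the restart, show variables like 'log_bin'; confirms whether the setting actually took effect.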
- Create the account and password (you can first relax the password complexity policy)
set global validate_password_policy=0;
set global validate_password_length=6;
CREATE USER 'flinkuser'@'localhost' IDENTIFIED BY 'flinkpw';
- Grant privileges
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser'@'localhost';
- Check the grants
show grants for 'flinkuser'@'localhost';
- Reload the grant tables (optional here, since privileges assigned with GRANT take effect immediately)
FLUSH PRIVILEGES;
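The output of show grants can also be checked automatically. This is a minimal sketch (missing_privileges and REQUIRED are hypothetical names) that parses GRANT lines and reports which of the privileges granted above are absent at the global *.* scope. It deliberately ignores database-level grants, which is a simplification: RELOAD, SHOW DATABASES and the two REPLICATION privileges can only be granted globally anyway, but a SELECT granted on a single database would be reported as missing.

```python
import re

# The privileges granted in the step above (assumed to be the set we want)
REQUIRED = {"SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"}

# Matches "GRANT <privs> ON <scope> TO ..." as printed by SHOW GRANTS
GRANT_RE = re.compile(r"^GRANT\s+(?P<privs>.+?)\s+ON\s+(?P<scope>\S+)\s+TO\s", re.IGNORECASE)


def missing_privileges(grant_lines):
    """Return the REQUIRED privileges not granted ON *.*, sorted alphabetically."""
    granted = set()
    for line in grant_lines:
        m = GRANT_RE.match(line.strip())
        # Only global (*.*) grants are counted; database-level grants are skipped
        if not m or m.group("scope").replace("`", "") != "*.*":
            continue
        privs = {p.strip().upper() for p in m.group("privs").split(",")}
        if "ALL PRIVILEGES" in privs or "ALL" in privs:
            return []
        granted |= privs
    return sorted(REQUIRED - granted)
```

Each row returned by show grants for 'flinkuser'@'localhost'; is passed in as one string; an empty result means every privilege from the GRANT statement is present.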
Hadoop configuration
- Set the environment variable
export HADOOP_CLASSPATH=`hadoop classpath`
- Start the cluster
sbin/start-all.sh
- Process list
flink-1.13.5 % jps
57988 SecondaryNameNode
64612 YarnSessionClusterEntrypoint
57749 NameNode
58183 ResourceManager
58279 NodeManager
60969 SqlClient
64733 Jps
57855 DataNode
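To script this process check, a small sketch like the following can parse jps output and report which Hadoop daemons are missing. The daemon list is an assumption for a single-node setup started with sbin/start-all.sh; the YarnSessionClusterEntrypoint and SqlClient entries only appear after the Flink steps later in this article, so they are not required here. parse_jps and missing_daemons are hypothetical helper names.

```python
# Daemons a single-node HDFS + YARN setup is expected to show (an assumption;
# adjust for HA or multi-node clusters)
HADOOP_DAEMONS = {"NameNode", "SecondaryNameNode", "DataNode", "ResourceManager", "NodeManager"}


def parse_jps(output: str) -> dict:
    """Map main-class name -> PID from jps output ("<pid> <name>" per line)."""
    procs = {}
    for line in output.splitlines():
        parts = line.split()
        # Skip blank lines and shell prompts; jps lines start with a numeric PID
        if len(parts) >= 2 and parts[0].isdigit():
            procs[parts[1]] = int(parts[0])
    return procs


def missing_daemons(output: str) -> list:
    """Return the expected Hadoop daemons that are not running, sorted."""
    return sorted(HADOOP_DAEMONS - set(parse_jps(output)))
```

On the machine itself the input would come from something like subprocess.run(["jps"], capture_output=True, text=True).stdout.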
Flink configuration
- Add the dependencies (copy these jars into Flink's lib directory)
flink-sql-connector-mysql-cdc-2.2.1.jar
hudi-flink-bundle_2.11-0.10.0.jar
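Note: to ensure version compatibility, it is best to build these jars yourself.
- Start a yarn-session cluster (note: the job depends on Hadoop storage, so the cluster can only be started in YARN mode)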
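One common compatibility pitfall is mixing Scala builds: the Hudi bundle above is built for Scala 2.11, so the Flink distribution should be the Scala 2.11 build as well. The sketch below (check_lib and scala_version are hypothetical helpers) inspects file names from Flink's lib directory. It assumes the distribution jar is named like flink-dist_2.11-1.13.5.jar, as in Flink 1.13 binary releases, and it only compares file names, not the actual bytecode.

```python
import re

# Scala binary version suffix, e.g. "_2.11" in hudi-flink-bundle_2.11-0.10.0.jar
SCALA_SUFFIX = re.compile(r"_(2\.\d+)-")


def scala_version(jar_name: str):
    """Return the Scala suffix of a jar file name, or None if it has none."""
    m = SCALA_SUFFIX.search(jar_name)
    return m.group(1) if m else None


def check_lib(jar_names):
    """Return problems found in a list of file names from Flink's lib directory."""
    problems = []
    for prefix in ("flink-sql-connector-mysql-cdc-", "hudi-flink-bundle_"):
        if not any(n.startswith(prefix) for n in jar_names):
            problems.append("missing " + prefix + "*.jar")
    dist = next((n for n in jar_names if n.startswith("flink-dist_")), None)
    hudi = next((n for n in jar_names if n.startswith("hudi-flink-bundle_")), None)
    # Flink and the Hudi bundle must be built against the same Scala version
    if dist and hudi and scala_version(dist) != scala_version(hudi):
        problems.append(f"Scala mismatch: {dist} vs {hudi}")
    return problems
```

A typical call would be check_lib(os.listdir(os.path.expandvars("$FLINK_HOME/lib"))), assuming FLINK_HOME points at the Flink installation.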
bin/yarn-session.sh -nm flink-session-cluster -d
- Start the Flink SQL client
bin/sql-client.sh embedded -s yarn-session
- Create the MySQL CDC source table
CREATE TABLE mysql_user (
id INT,
name STRING,
age INT,
dt STRING,
score DOUBLE,
create_at STRING,
update_at STRING,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'wlapp',
'table-name' = 'user'
);
- Create the Hudi target table
CREATE TABLE hudi_user(
id INT,
name STRING,
age INT,
dt STRING,
score DOUBLE,
create_at STRING,
update_at STRING,
PRIMARY KEY(id) NOT ENFORCED
)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://localhost:9000/user/warehouse/wlapp.db/user',
'table.type' = 'COPY_ON_WRITE',
'write.insert.drop.duplicates' = 'true'
);
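Because both tables declare exactly the same columns, the DDL can be generated from a single column list so that the source and the sink never drift apart when the schema changes. This is a sketch only: create_table and COLUMNS are hypothetical names, the output is plain Flink SQL text to paste into the SQL client, and the option values are copied from the two statements above.

```python
# Shared schema of wlapp.user, as declared in both tables above
COLUMNS = [
    ("id", "INT"), ("name", "STRING"), ("age", "INT"), ("dt", "STRING"),
    ("score", "DOUBLE"), ("create_at", "STRING"), ("update_at", "STRING"),
]


def create_table(name, columns, pk, options):
    """Render a Flink SQL CREATE TABLE statement with a NOT ENFORCED primary key."""
    cols = [f"  {col} {typ}," for col, typ in columns]
    cols.append(f"  PRIMARY KEY({pk}) NOT ENFORCED")
    opts = ",\n".join(f"  '{k}' = '{v}'" for k, v in options.items())
    return f"CREATE TABLE {name} (\n" + "\n".join(cols) + f"\n) WITH (\n{opts}\n);"


# Source: reads change events for wlapp.user from the MySQL binlog
source_ddl = create_table("mysql_user", COLUMNS, "id", {
    "connector": "mysql-cdc", "hostname": "localhost", "port": "3306",
    "username": "flinkuser", "password": "flinkpw",
    "database-name": "wlapp", "table-name": "user",
})

# Sink: Copy-on-Write Hudi table on HDFS
sink_ddl = create_table("hudi_user", COLUMNS, "id", {
    "connector": "hudi",
    "path": "hdfs://localhost:9000/user/warehouse/wlapp.db/user",
    "table.type": "COPY_ON_WRITE",
    "write.insert.drop.duplicates": "true",
})
```

If PyFlink is installed, the same strings could be submitted with TableEnvironment.execute_sql instead of the SQL client, but that path was not part of the setup described here.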
- Run the ETL job
INSERT INTO hudi_user SELECT * FROM mysql_user;
- After the job is running, check the binlog status from the MySQL client
show master status;
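To confirm that the binlog is enabled and actually growing, one approach is to record the File and Position columns before and after changing rows in wlapp.user. The sketch below (parse_master_status and binlog_advanced are hypothetical names) parses the tab-separated output of mysql -B -e "SHOW MASTER STATUS"; an empty result set means binary logging is off. Comparing (file, position) tuples assumes the zero-padded binlog file names sort in creation order.

```python
def parse_master_status(batch_output: str):
    """Parse mysql -B -e "SHOW MASTER STATUS" output into (file, position)."""
    lines = [l for l in batch_output.splitlines() if l.strip()]
    if len(lines) < 2:
        # Empty result set: binary logging is not enabled on this server
        return None
    header = lines[0].split("\t")
    row = lines[1].split("\t")
    status = dict(zip(header, row))
    return status["File"], int(status["Position"])


def binlog_advanced(before, after):
    """True if the server wrote more binlog data between two snapshots."""
    # Tuples compare by file name first (mysql-bin.000001 < mysql-bin.000002),
    # then by byte position within the same file
    return after > before
```

If the position does not move after an insert or update on the source table, the job will not see that change either, which usually points back to the my.cnf settings.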
Results
[Figure: YARN resource manager UI]
[Figure: Flink job management UI]
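Summary
The key points in this example are enabling MySQL's binlog, granting the user the right privileges, and running Flink in yarn-session mode. Standalone mode does not work here, because the job depends on Hadoop storage and needs to read configuration from the Hadoop cluster.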