-
環(huán)境準(zhǔn)備
-
開啟MariaDB的Binlog日志
修改/etc/my.conf文件,在配置文件[mysqld]下增加如下配置
server-id=999 log-bin=mysql-bin binlog_format=ROW
注意:
MySQL Binlog支持多種數(shù)據(jù)更新格式包括Row脏答、Statement和mix(Row和Statement的混合)卸奉,這里建議使用Row模式的Binlog格式桑逝,可以更加方便實(shí)時的反應(yīng)行級別的數(shù)據(jù)變化赴背。binlog 配置參考:
https://dev.mysql.com/doc/refman/5.7/en/binary-log-setting.html[root@node01 mariadb]# systemctl restart mysqld [root@node01 mariadb]# systemctl status mysqld ● mysqld.service - LSB: start and stop MariaDB Loaded: loaded (/etc/rc.d/init.d/mysqld; bad; vendor preset: disabled) Active: active (running) since Tue 2020-04-28 09:59:03 CST; 1min 11s ago Docs: man:systemd-sysv-generator(8) Process: 12771 ExecStop=/etc/rc.d/init.d/mysqld stop (code=exited, status=0/SUCCESS) Process: 12925 ExecStart=/etc/rc.d/init.d/mysqld start (code=exited, status=0/SUCCESS) CGroup: /system.slice/mysqld.service ├─12970 /bin/sh /usr/local/mariadb/bin/mysqld_safe --datadir=/usr/local/mariadb/data --pid-file=/usr/local/mariadb/data/node01.pid └─13079 /usr/local/mariadb/bin/mysqld --basedir=/usr/local/mariadb --datadir=/usr/local/mariadb/data --plugin-dir=/usr/local/mariadb/lib/plugin --user=mysql --log-error=/usr/l... Apr 28 09:59:02 node01 systemd[1]: Starting LSB: start and stop MariaDB... Apr 28 09:59:02 node01 mysqld[12925]: Starting MariaDB.200428 09:59:02 mysqld_safe Logging to '/usr/local/mariadb/data/node01.err'. Apr 28 09:59:02 node01 mysqld[12925]: 200428 09:59:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mariadb/data Apr 28 09:59:03 node01 mysqld[12925]: [ OK ] Apr 28 09:59:03 node01 systemd[1]: Started LSB: start and stop MariaDB.
-
創(chuàng)建 MariaDB 同步賬號
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456'; GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%'; FLUSH PRIVILEGES;
-
StreamSets 安裝 MySQL 驅(qū)動
參考:
https://blog.csdn.net/weixin_43215250/article/details/87981707 -
創(chuàng)建測試表
- 在MariaDB數(shù)據(jù)庫中創(chuàng)建測試表
CREATE DATABASE IF NOT EXISTS test; CREATE TABLE test.`binlog_test` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 在 HUE 上創(chuàng)建 KUDU 表
CREATE DATABASE IF NOT EXISTS test; CREATE TABLE IF NOT EXISTS test.binlog_test ( id int, name String, PRIMARY key(id) ) PARTITION BY HASH PARTITIONS 16 STORED AS KUDU;
-
-
創(chuàng)建 StreamSets 的 Pipline
-
創(chuàng)建一個新的 Pipline
image -
選擇 Origins 類別毛雇,搜索 MySQL Binary Log,并拖動到畫布
image
配置 MySQL Binary Log 基本信息
image
**配置 MySQL 連接信息**
**注意:** 此處配置的 Server ID 應(yīng)與 MySQL 的 my.cnf 文件中的 server-id 保持一致玄妈。
data:image/s3,"s3://crabby-images/b50e6/b50e6537b415024bc70e022520433f0c424dc4b8" alt="image"
**配置 MySQL 賬號信息**
data:image/s3,"s3://crabby-images/820b4/820b49a81461bc0a39abebb39aa5e8d2f5352c67" alt="image"
**高級配置乾吻,根據(jù)自己的需要進(jìn)行配置,這里采用默認(rèn)**
data:image/s3,"s3://crabby-images/b104a/b104a32b6d4804b358c86fe2887f2220109cb7e3" alt="image"
**參考:**
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Origins/MySQLBinaryLog.html?hl=mysql%2Cbinary%2Clog
-
添加表過濾的Stream Selector 1
image
配置 Stream Selector 基本信息
image
配置分流條件
${record:value("/Table") == "binlog_test"}
image -
添加表過濾的 Stream Selector 1
配置 Stream Selector 基本信息
image配置分流條件
${record:value("/Table") == "DELETE"}
image
-
添加處理日志 JavaScript Evaluator
添加解析 DELETE 類型的Binary Log 日志的 JavaScript Evaluator
image配置JavaScript腳本
for(var i = 0; i < records.length; i++) { try { var newRecord = sdcFunctions.createRecord(true); newRecord.value = records[i].value['Data']; newRecord.value.Type = records[i].value['Type']; newRecord.value.Database = records[i].value['Database']; newRecord.value.Table = records[i].value['Table']; log.info(records[i].value['Type']) output.write(newRecord); } catch (e) { // Send record to error error.write(records[i], e); } }
image
**添加解析 INSRET 和 UPDATE 類型日志的 JavaScript Evaluator**
data:image/s3,"s3://crabby-images/967ec/967ec295dc906aa8f7e9d195e8ef159701cf8007" alt="image"
**配置JavaScript腳本**
```
for(var i = 0; i < records.length; i++) {
try {
var newRecord = sdcFunctions.createRecord(true);
newRecord.value = records[i].value['OldData'];
newRecord.value.Type = records[i].value['Type'];
newRecord.value.Database = records[i].value['Database'];
newRecord.value.Table = records[i].value['Table'];
log.info(records[i].value['Type'])
output.write(newRecord);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
```
data:image/s3,"s3://crabby-images/58b49/58b497e4a27a1247873e2c219073bc9c51016b20" alt="image"
**參考:**
https://streamsets.com/documentation/datacollector/latest/help/datacollector/UserGuide/Processors/JavaScript.html?hl=javascript%2Cevaluator
-
添加 KUDU
配置 Kudu Delete
配置Kudu基本屬性
image
配置Kudu環(huán)境信息
image
Kudu的高級配置拟蜻,這里使用默認(rèn)配置
image配置 Kudu Upsert
配置Kudu基本屬性
image配置Kudu環(huán)境信息
image
Kudu的高級配置绎签,這里使用默認(rèn)配置
image -
校驗(yàn) Pipelines 配置
image -
啟動 Pipelines
image -
Pipeline 流程測試
-
新增數(shù)據(jù)
向 MariaDB 中的 binlog_test 表中插入數(shù)據(jù)
insert into test.binlog_test values(1, '張三');
在 StreamSets 中查看的 Pipeline 狀態(tài)
image
image使用Hue查看 Kudu 表數(shù)據(jù),驗(yàn)證數(shù)據(jù)是否成功插入
image -
更新數(shù)據(jù)
更新 MariaDB 中的 binlog_test 表中數(shù)據(jù)
update test.binlog_test set name='李四' where id = 1;
在 StreamSets 中查看的 Pipeline 狀態(tài)
image
image
使用Hue查看 Kudu 表數(shù)據(jù)酝锅,驗(yàn)證數(shù)據(jù)是否成功更新
image -
刪除數(shù)據(jù)
刪除 MariaDB 中的 binlog_test 表中數(shù)據(jù)
delete from test.binlog_test where id = 1;
在 StreamSets 中查看的 Pipeline 狀態(tài)
image
image
使用Hue查看 Kudu 表數(shù)據(jù)诡必,驗(yàn)證數(shù)據(jù)是否成功刪除
image
-