一欲险、go-mysql-transfer
go-mysql-transfer是一款MySQL實(shí)時(shí)、增量數(shù)據(jù)同步工具萍鲸。能夠?qū)崟r(shí)解析MySQL二進(jìn)制日志binlog肮柜,并生成指定格式的消息,同步到接收端望薄。
go-mysql-transfer具有如下特點(diǎn):
1疟游、不依賴其它組件,一鍵部署
2式矫、集成多種接收端乡摹,如:Redis、MongoDB采转、Elasticsearch、RabbitMQ、Kafka故慈、RocketMQ板熊,不需要再編寫客戶端,開箱即用
3察绷、內(nèi)置豐富的數(shù)據(jù)解析干签、消息生成規(guī)則;支持Lua腳本拆撼,以處理更復(fù)雜的數(shù)據(jù)邏輯
4容劳、支持監(jiān)控告警,集成Prometheus客戶端
5闸度、高可用集群部署
6竭贩、數(shù)據(jù)同步失敗重試
7、全量數(shù)據(jù)初始化
詳情及安裝說明 請(qǐng)參見: MySQL Binlog 增量同步工具go-mysql-transfer實(shí)現(xiàn)詳解
項(xiàng)目開源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer
如果此工具對(duì)你有幫助莺禁,請(qǐng)Star支持下
二留量、配置
# app.yml
#目標(biāo)類型
target: rabbitmq # 目標(biāo)類型
#rabbitmq連接配置
rabbitmq_addr: amqp://guest:guest@127.0.0.1:5672/ #連接字符串,如: amqp://guest:guest@localhost:5672/
三、數(shù)據(jù)轉(zhuǎn)換規(guī)則
相關(guān)配置如下:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
#order_by_column: id #排序字段哟冬,存量數(shù)據(jù)同步時(shí)不能為空
#column_lower_case:false #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列楼熄,多值逗號(hào)分隔,如:id,name,age,area_id 為空時(shí)表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列浩峡,多值逗號(hào)分隔可岂,如:id,name,age,area_id 默認(rèn)為空
#column_mappings: USER_NAME=account #列名稱映射,多個(gè)映射關(guān)系用逗號(hào)分隔翰灾,如:USER_NAME=account 表示將字段名USER_NAME映射為account
#default_column_values: source=binlog,area_name=合肥 #默認(rèn)的列-值青柄,多個(gè)用逗號(hào)分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化预侯, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime致开、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
value_encoder: json #值編碼萎馅,支持json双戳、kv-commas、v-commas糜芳;默認(rèn)為json
#value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式飒货,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
#rabbitmq相關(guān)
rabbitmq_queue: user_topic #queue名稱,可以為空峭竣,默認(rèn)使用表(Table)名稱
示例一
t_user表塘辅,數(shù)據(jù)如下:
同步到RabbitMQ的數(shù)據(jù)如下:
示例二
t_user表 同實(shí)例一
使用如下配置:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數(shù)據(jù)同步時(shí)不能為空
column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
#column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列皆撩,多值逗號(hào)分隔扣墩,如:id,name,age,area_id 為空時(shí)表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列哲银,多值逗號(hào)分隔,如:id,name,age,area_id 默認(rèn)為空
column_mappings: USER_NAME=account #列名稱映射呻惕,多個(gè)映射關(guān)系用逗號(hào)分隔荆责,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認(rèn)的列-值,多個(gè)用逗號(hào)分隔亚脆,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化做院, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化濒持,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
value_encoder: json #值編碼键耕,支持json、kv-commas柑营、v-commas屈雄;默認(rèn)為json
#value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值由境、${USER_NAME}表示字段name的值
#rabbitmq相關(guān)
rabbitmq_queue: user_topic #queue名稱,可以為空棚亩,默認(rèn)使用表(Table)名稱
column_mappings配置項(xiàng)表示對(duì)列名稱進(jìn)行重新映射
同步到Kafka的數(shù)據(jù)如下:
示例三
t_user表、topic 同實(shí)例一
使用如下配置:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
#order_by_column: id #排序字段虏杰,存量數(shù)據(jù)同步時(shí)不能為空
column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
#column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列讥蟆,多值逗號(hào)分隔,如:id,name,age,area_id 為空時(shí)表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列纺阔,多值逗號(hào)分隔瘸彤,如:id,name,age,area_id 默認(rèn)為空
column_mappings: USER_NAME=account #列名稱映射,多個(gè)映射關(guān)系用逗號(hào)分隔笛钝,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認(rèn)的列-值质况,多個(gè)用逗號(hào)分隔,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化玻靡, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime结榄、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
value_encoder: v-commas #值編碼囤捻,支持json臼朗、kv-commas、v-commas蝎土;默認(rèn)為json
#value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式视哑,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
#rabbitmq相關(guān)
rabbitmq_queue: user_topic #queue名稱,可以為空誊涯,默認(rèn)使用表(Table)名稱
column_mappings配置項(xiàng)表示對(duì)列名稱進(jìn)行重新映射
value_encoder配置項(xiàng)表示消息編碼方式
同步到RabbitMQ的數(shù)據(jù)如下:
四挡毅、Lua腳本
使用Lua腳本可以實(shí)現(xiàn)更復(fù)雜的數(shù)據(jù)處理邏輯,go-mysql-transfer支持Lua5.1語(yǔ)法暴构。
示例一
t_user表跪呈,數(shù)據(jù)如下:
引入Lua腳本:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
lua_file_path: lua/t_user_rabbit.lua #lua腳本文件
Lua腳本:
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型段磨,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare庆械、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個(gè)table,作為結(jié)果
result["id"] = id
result["action"] = action
if action == "delete" -- 刪除事件
then
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("user_topic",val) -- 發(fā)送消息薇溃,第一個(gè)參數(shù)為topic(string類型)菌赖,第二個(gè)參數(shù)為消息內(nèi)容
else
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來源
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("user_topic",val) -- 發(fā)送消息缭乘,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
end
同步到RabbitMQ的數(shù)據(jù)如下:
示例二
t_user表 同實(shí)例一
使用如下腳本:
local ops = require("mqOps") --加載mq操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型琉用,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert堕绩、updare、delete
local userName = row["USER_NAME"] --獲取USER_NAME列的值
if action == "insert" then -- 只監(jiān)聽添加事件
local str = string.format("恭喜您:%s 注冊(cè)成功",userName)
ops.SEND("user_topic",str) -- 發(fā)送消息邑时,第一個(gè)參數(shù)為topic(string類型)奴紧,第二個(gè)參數(shù)為消息內(nèi)容
end
同步到RabbitMQ的數(shù)據(jù)如下:
mqOps模塊提供的方法如下:
- SEND: 發(fā)送操作,如:ops.SEND(topic,result)。參數(shù)topic為字符串類型晶丘;參數(shù)result為要發(fā)送的消息