一、go-mysql-transfer
go-mysql-transfer是使用Go語言實現的MySQL數據庫實時增量同步工具。能夠實時監(jiān)聽MySQL二進制日志(binlog)的變動,將變更內容形成指定格式的消息祥国,發(fā)送到接收端鸵隧。在數據庫和接收端之間形成一個高性能、低延遲的增量數據(Binlog)同步管道, 具有如下特點:
1草添、不依賴其它組件,一鍵部署
2扼仲、集成多種接收端远寸,如:Redis抄淑、MongoDB、Elasticsearch驰后、RabbitMQ肆资、Kafka、RocketMQ灶芝,不需要再編寫客戶端郑原,開箱即用
3、內置豐富的數據解析夜涕、消息生成規(guī)則犯犁;支持Lua腳本,以處理更復雜的數據邏輯
4钠乏、支持監(jiān)控告警栖秕,集成Prometheus客戶端
5、高可用集群部署
6晓避、數據同步失敗重試
7簇捍、全量數據初始化
詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現詳解
項目開源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer
如果此工具對你有幫助,請Star支持下
二俏拱、配置
Redis部署模式可以為單機暑塑、主從(哨兵)、集群(cluster)锅必。相關配置如下:
# app.yml
redis_addrs: 127.0.0.1:6379 #地址事格,多個用逗號分隔
#redis_group_type: cluster # 集群類型 sentinel或者cluster
#redis_master_name: mymaster # Master節(jié)點名稱,如果group_type為sentinel則此項不能為空,為cluster此項無效
#redis_pass: 123456 #redis密碼
#redis_database: 0 #redis數據庫 0-16,默認0搞隐。如果group_type為cluster此項無效
三驹愚、數據轉換規(guī)則
相關配置如下:
rule:
-
schema: eseap #數據庫名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數據同步時不能為空
#column_lower_case:false #列名稱轉為小寫,默認為false
#column_upper_case:false#列名稱轉為大寫,默認為false
column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false
# 包含的列劣纲,多值逗號分隔逢捺,如:id,name,age,area_id 為空時表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號分隔癞季,如:id,name,age,area_id 默認為空
#column_mappings: CARD_NO=sfz #列名稱映射劫瞳,多個映射關系用逗號分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
#default_column_values: source=binlog,area_name=合肥 #默認的列-值绷柒,多個用逗號分隔志于,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime废睦、timestamp類型格式化伺绽,不填寫默認yyyy-MM-dd HH:mm:ss
#lua_file_path: lua/t_user.lua #lua腳本文件,詳見使用手冊,當此值不為空時后面的配置除redis_structure其余均無效
#lua_script: #lua 腳本憔恳,詳見使用手冊瓤荔,當此值不為空時后面的配置均無效
value_encoder: json #值編碼
#value_formatter: ${ID}|${USER_NAME} #值格式化表達式净蚤,如:${ID}|${USER_NAME},${ID}表示字段id的值钥组、${USER_NAME}表示字段name的值
#redis相關
redis_structure: string # 數據類型。 支持string今瀑、hash程梦、list、set橘荠、sortedset類型(與redis的數據類型一致)
redis_key_prefix: USER_ #key的前綴
redis_key_column: USER_NAME #使用哪個列的值作為key屿附,不填寫默認使用主鍵
#redis_key_formatter: ${id}-${name} # KEY格式化表達式,如:${ID}-${USER_NAME},${ID}表示字段id的值哥童、${USER_NAME}表示字段name的值
#redis_key_value: user #KEY的值(固定值)挺份;當redis_structure為hash、list贮懈、set匀泊、sortedset此值不能為空
#redis_hash_field_prefix: _CARD_ #hash的field前綴,僅redis_structure為hash時起作用
#redis_hash_field_column: Cert_No #使用哪個列的值作為hash的field朵你,僅redis_structure為hash時起作用各聘,不填寫默認使用主鍵
value_encoder表示值編碼方式,不填寫默認為json抡医,支持如下編碼方式:
格式 | 說明 | 舉例 |
---|---|---|
json | json | {"id": "1001","userName": "admin","password": "123456", "createTime": "2020-07-20 14:29:19"} |
kv-commas | key-value逗號分隔 | id=1001,userName=admin,password=123456,createTime=2020-07-20 14:29:19 |
v-commas | value逗號分隔 | 1001,admin,123456,2020-07-20 14:29:19 |
示例
t_user表躲因,數據如下:
同步為string類型
配置如下:
schema: eseap #數據庫名稱
table: t_user #表名稱
column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false
value_encoder: json #值編碼
redis_structure: string # 數據類型。 支持string忌傻、hash大脉、list、set水孩、sortedset類型(與redis的數據類型一致)
redis_key_prefix: USER_ #key的前綴
redis_key_column: USER_NAME #使用哪個列的值作為key镰矿,不填寫默認使用主鍵
同步到Redis的數據如下:
同步為hash類型
配置如下:
column_underscore_to_camel: true #列名稱下劃線轉駝峰,默認為false
value_encoder: json #值編碼,支持json荷愕、kv-commas衡怀、v-commas
redis_structure: hash
redis_key_value: user_cache #key的值(固定值);當redis_structure為hash安疗、list抛杨、set、sortedset此值不能為空
redis_hash_field_prefix: user_name_ #hash的field前綴荐类,僅redis_structure為hash時起作用
redis_hash_field_column: user_name #使用哪個列的值作為hash的field怖现,僅redis_structure為hash時起作用,不填寫默認使用主鍵
同步到Redis的數據如下:
使用規(guī)則能將一個table映射成為一個HASH,如果需要將talbe中的每一行映射成一個HASH屈嗤,可以使用Lua腳本實現潘拨,詳請參見下面示例。
同步為list類型
配置如下:
value_formatter: ${ID}|${USER_NAME} # 值格式化表達式饶号,如:${ID}|${USER_NAME},${ID}表示字段id的值铁追、${USER_NAME}表示字段name的值
redis_structure: list
redis_key_value: user_list #key的值(固定值);當redis_structure為hash茫船、list琅束、set、sortedset此值不能為空
value_formatter為值格式化表達式算谈,value_formatter不為空時value_encoder無效涩禀。
同步到Redis的數據如下:
同步為set類型
配置如下:
value_formatter: ${ID}|${USER_NAME} #值格式化表達式,如:${ID}|${USER_NAME},${ID}表示字段id的值然眼、${USER_NAME}表示字段name的值
redis_structure: set
redis_key_value: user_set #key的值(固定值)艾船;當redis_structure為hash、list高每、set屿岂、sortedset此值不能為空
同步到Redis的數據如下:
同步為Sorted Set類型
t_user表,數據如下:
配置如下:
value_formatter: ${ID}|${USER_NAME} #值格式化表達式觉义,如:${ID}|${USER_NAME},${ID}表示字段id的值雁社、${USER_NAME}表示字段name的值
redis_structure: sortedset
redis_key_value: users #key的值(固定值);當redis_structure為hash晒骇、list霉撵、set、sortedset此值不能為空
redis_sorted_set_score_column: CREATE_TIME #sortedset的score洪囤,當數據類型為sortedset時徒坡,此項不能為空,此項的值應為數字類型
同步到Redis的數據如下:
四瘤缩、Lua腳本
使用Lua腳本可以實現更復雜的數據處理邏輯喇完,go-mysql-transfer支持Lua5.1語法。
示例
t_user表剥啤,數據如下:
引入Lua腳本:
rule:
-
schema: eseap
table: t_user
lua_file_path: lua/t_user_redis.lua #lua腳本文件
Lua腳本:
local json = require("json") -- 加載json模塊
local ops = require("redisOps") --加載redis操作模塊
local row = ops.rawRow() --數據庫當前變更的一行數據,table類型锦溪,key為列名稱
local action = ops.rawAction() --當前數據庫事件,包括:insert、updare府怯、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local key = "user_"..id -- 定義key
if action == "delete" -- 刪除事件
then
ops.DEL(key)
ops.SREM("user_set",userName)
else
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個table
result["id"] = id
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數據來源
local val = json.encode(result) -- 將newTable轉為json
ops.SET(key,val) -- 對應Redis的SET命令刻诊,第一個參數為key(支持string類型),第二個參數為value
if action == "update" -- 修改事件
then
local oldRow = ops.rawOldRow() --數據庫變更之前的數據(修改之前的數據)
local oldUserName = oldRow["USER_NAME"] --獲取USER_NAME列的值
ops.SREM("user_set",oldUserName) -- 刪除舊值
end
ops.SADD("user_set",userName) -- 對應Redis的SADD命令牺丙,第一個參數為key(支持string類型)则涯,第二個參數為value
end
同步到Redis的數據如下:
將talbe中的一行映射成一個HASH复局,腳本如下:
local ops = require("redisOps") --加載redis操作模塊
local row = ops.rawRow() --當前數據庫的一行數據,table類型,key為列名稱
local action = ops.rawAction() --當前數據庫事件,包括:insert粟判、updare亿昏、delete
if action == "insert" -- 只監(jiān)聽insert事件
then
local key = row["USER_NAME"] --獲取USER_NAME列的值
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取PASSWORD列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
ops.HSET(key,"id",id) -- 對應Redis的HSET命令
ops.HSET(key,"userName",userName) -- 對應Redis的HSET命令
ops.HSET(key,"password",password) -- 對應Redis的HSET命令
ops.HSET(key,"createTime",createTime) -- 對應Redis的HSET命令
end
同步到Redis的數據如下:
redisOps模塊提供的方法如下:
- SET: Redis字符串命令,設置指定key的值。如:ops.SET(key,val)
- DEL: Redis字符串命令,刪除指定key的值档礁。如:ops.DEL(key)
- HSET: Redishash命令,設置哈希表key中的字段field的值角钩。如:ops.HSET(key,field,val)
- HDEL: Redishash命令,設置哈希表key中的字段。如:ops.HDEL(key,field)
- RPUSH: Redis列表命令,將值插入到列表key的頭部事秀。如:ops.RPUSH(key,val)
- LREM: Redis列表命令,移除列表key的值彤断。如:ops.LREM(key,val)
- SADD: Redis集合命令,向集合key添加值野舶。如:ops.SADD(key,val)
- SREM: Redis集合命令,移除集合key的值易迹。如:ops.SREM(key,val)
- ZADD: Redis有序集合命令,向有序集合key添加值。如:ops.ZADD(key,score,val)
- ZREM: Redis有序集合命令,移除有序集合key的值平道。如:ops.ZREM(key,val)