一、go-mysql-transfer
go-mysql-transfer是使用Go語(yǔ)言實(shí)現(xiàn)的MySQL數(shù)據(jù)庫(kù)實(shí)時(shí)增量同步工具疗韵。能夠?qū)崟r(shí)監(jiān)聽(tīng)MySQL二進(jìn)制日志(binlog)的變動(dòng)调俘,將變更內(nèi)容形成指定格式的消息旺垒,發(fā)送到接收端先蒋。在數(shù)據(jù)庫(kù)和接收端之間形成一個(gè)高性能宛渐、低延遲的增量數(shù)據(jù)(Binlog)同步管道, 具有如下特點(diǎn):
go-mysql-transfer具有如下特點(diǎn):
1、不依賴其它組件窥翩,一鍵部署
2、集成多種接收端笔时,如:Redis仗岸、MongoDB、Elasticsearch较锡、RabbitMQ盗痒、Kafka、RocketMQ骡楼,不需要再編寫客戶端看成,開(kāi)箱即用
3、內(nèi)置豐富的數(shù)據(jù)解析吃嘿、消息生成規(guī)則梦重;支持Lua腳本兑燥,以處理更復(fù)雜的數(shù)據(jù)邏輯
4、支持監(jiān)控告警琴拧,集成Prometheus客戶端
5降瞳、高可用集群部署
6、數(shù)據(jù)同步失敗重試
7、全量數(shù)據(jù)初始化
詳情及安裝說(shuō)明 請(qǐng)參見(jiàn): MySQL Binlog 增量同步工具go-mysql-transfer實(shí)現(xiàn)詳解
項(xiàng)目開(kāi)源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer
如果此工具對(duì)你有幫助挣饥,請(qǐng)Star支持下
二除师、配置
# app.yml
target: mongodb #目標(biāo)類型
mongodb_addrs: 127.0.0.1:27017 #mongodb連接地址,多個(gè)用逗號(hào)分隔
#mongodb_username: #mongodb用戶名扔枫,默認(rèn)為空
#mongodb_password: #mongodb密碼汛聚,默認(rèn)為空
三短荐、數(shù)據(jù)轉(zhuǎn)換規(guī)則
相關(guān)配置如下:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
#order_by_column: id #排序字段倚舀,存量數(shù)據(jù)同步時(shí)不能為空
#column_lower_case:false #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列,多值逗號(hào)分隔忍宋,如:id,name,age,area_id 為空時(shí)表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列痕貌,多值逗號(hào)分隔,如:id,name,age,area_id 默認(rèn)為空
#column_mappings: CARD_NO=sfz #列名稱映射糠排,多個(gè)映射關(guān)系用逗號(hào)分隔舵稠,如:USER_NAME=account 表示將字段名USER_NAME映射為account
#default_column_values: source=binlog,area_name=合肥 #默認(rèn)的列-值,多個(gè)用逗號(hào)分隔乳讥,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化柱查, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化云石,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
#mongodb相關(guān)
mongodb_database: transfer #mongodb database不能為空
#mongodb_collection: transfer_user #mongodb collection,可以為空研乒,默認(rèn)使用表名稱
示例一
t_user表汹忠,數(shù)據(jù)如下:
同步到MongoDB的數(shù)據(jù)如下:
示例二
使用如下配置:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
#order_by_column: id #排序字段,存量數(shù)據(jù)同步時(shí)不能為空
column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
#column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
#column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
# 包含的列雹熬,多值逗號(hào)分隔宽菜,如:id,name,age,area_id 為空時(shí)表示包含全部列
#include_columns: ID,USER_NAME,PASSWORD
#exclude_columns: BIRTHDAY,MOBIE # 排除掉的列,多值逗號(hào)分隔竿报,如:id,name,age,area_id 默認(rèn)為空
column_mappings: USER_NAME=account #列名稱映射铅乡,多個(gè)映射關(guān)系用逗號(hào)分隔,如:USER_NAME=account 表示將字段名USER_NAME映射為account
default_column_values: area_name=合肥 #默認(rèn)的列-值烈菌,多個(gè)用逗號(hào)分隔阵幸,如:source=binlog,area_name=合肥
#date_formatter: yyyy-MM-dd #date類型格式化, 不填寫默認(rèn)yyyy-MM-dd
#datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime芽世、timestamp類型格式化挚赊,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
#mongodb相關(guān)
mongodb_database: transfer #mongodb database不能為空
mongodb_collection: account #mongodb collection,可以為空济瓢,默認(rèn)使用表名稱
column_mappings配置項(xiàng)表示對(duì)列名稱進(jìn)行重新映射
同步到MongoDB的數(shù)據(jù)如下:
四荠割、Lua腳本
使用Lua腳本可以實(shí)現(xiàn)更復(fù)雜的數(shù)據(jù)處理邏輯,go-mysql-transfer支持Lua5.1語(yǔ)法旺矾。
示例一
t_user表蔑鹦,數(shù)據(jù)如下:
引入Lua腳本:
rule:
-
schema: eseap #數(shù)據(jù)庫(kù)名稱
table: t_user #表名稱
lua_file_path: lua/t_user_mongo.lua #lua腳本文件
mongodb_database: transfer #mongodb database不能為空
Lua腳本:
local ops = require("mongodbOps") --加載mongodb操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert夺克、update、delete
local result = {} -- 定義一個(gè)table
for k, v in pairs(row) do
if k ~="ID" then -- 列名不為ID
result[k] = v
end
end
local id = row["ID"] --獲取ID列的值
result["_id"] = id -- _id為MongoDB的主鍵標(biāo)識(shí)
if action == "insert" then -- 新增事件
ops.INSERT("t_user",result) -- 新增嚎朽,第一個(gè)參數(shù)為collection名稱懊直,string類型;第二個(gè)參數(shù)為要修改的數(shù)據(jù)火鼻,talbe類型
elseif action == "delete" then -- 刪除事件 -- 修改事件
ops.DELETE("t_user",id) -- 刪除室囊,第一個(gè)參數(shù)為collection名稱(string類型),第二個(gè)參數(shù)為ID
else -- 修改事件
ops.UPDATE("t_user",id,result) -- 修改魁索,第一個(gè)參數(shù)為collection名稱融撞,string類型;第二個(gè)參數(shù)為ID粗蔚;第三個(gè)參數(shù)為要修改的數(shù)據(jù)尝偎,talbe類型
end
同步到MongoDB的數(shù)據(jù)如下:
示例二
t_user表,同實(shí)例一
使用如下腳本:
local ops = require("mongodbOps") --加載mongodb操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert鹏控、update致扯、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個(gè)table
result["_id"] = id -- _id為MongoDB的主鍵標(biāo)識(shí)
result["account"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來(lái)源
if action == "insert" then -- 只監(jiān)聽(tīng)insert事件
ops.INSERT("t_user",result) -- 新增,第一個(gè)參數(shù)為collection名稱当辐,string類型抖僵;第二個(gè)參數(shù)為要修改的數(shù)據(jù),talbe類型
end
同步到MongoDB的數(shù)據(jù)如下:
mongodbOps模塊提供的方法如下:
- INSERT: 插入操作,如:ops.INSERT(collection,result)缘揪。參數(shù)collection為集合名稱耍群,字符串類型;參數(shù)result為要插入的數(shù)據(jù)找筝,table類型
- UPDATE: 修改操作,如:ops.UPDATE(collection,id,result)蹈垢。參數(shù)collection為集合名稱,字符串類型袖裕;參數(shù)id為要修改的數(shù)據(jù)主鍵曹抬,類型不限;參數(shù)result為要修改的數(shù)據(jù)急鳄,table類型
- DELETE: 刪除操作,如:ops.DELETE(collection,id)谤民。參數(shù)collection為集合名稱,字符串類型攒岛;參數(shù)id為要修改的數(shù)據(jù)主鍵赖临,類型不限;