一、go-mysql-transfer
go-mysql-transfer是一款MySQL實時、增量數(shù)據(jù)同步工具帅掘。能夠?qū)崟r解析MySQL二進制日志binlog汰蓉,并生成指定格式的消息绷蹲,同步到接收端。
go-mysql-transfer具有如下特點:
1顾孽、不依賴其它組件祝钢,一鍵部署
2、集成多種接收端若厚,如:Redis拦英、MongoDB、Elasticsearch测秸、RabbitMQ疤估、Kafka灾常、RocketMQ,不需要再編寫客戶端铃拇,開箱即用
3钞瀑、內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則慷荔;支持Lua腳本雕什,以處理更復(fù)雜的數(shù)據(jù)邏輯
4、支持監(jiān)控告警拧廊,集成Prometheus客戶端
5监徘、高可用集群部署
6、數(shù)據(jù)同步失敗重試
7吧碾、全量數(shù)據(jù)初始化
詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現(xiàn)詳解
項目開源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer
如果此工具對你有幫助凰盔,請Star支持下
二、Lua腳本引擎
go-mysql-transfer中使用gopher-lua作為Lua虛擬機倦春,支持Lua5.1規(guī)范户敬。Lua作為專業(yè)的內(nèi)置腳本語言,其設(shè)計目的是為了嵌入應(yīng)用程序中睁本,從而為應(yīng)用程序提供靈活的擴展和定制功能尿庐。開發(fā)者只需要花費少量時間就能大致掌握其用法。
基于Lua的高擴展性呢堰,可以實現(xiàn)更為復(fù)雜的數(shù)據(jù)解析抄瑟、消息生成、數(shù)據(jù)處理邏輯枉疼。
三皮假、json模塊
提供json數(shù)據(jù)格式的序列化和反序列化功能,提供encode和decode兩個方法骂维。
使用示例如下:
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型惹资,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫事件,包括:insert、updare航闺、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個table,作為結(jié)果
result["id"] = id
result["action"] = action
if action == "delete" -- 刪除事件
then
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("transfer_test_topic",val) -- 發(fā)送消息褪测,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
else
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來源
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("transfer_test_topic",val) -- 發(fā)送消息潦刃,第一個參數(shù)為topic(string類型)侮措,第二個參數(shù)為消息內(nèi)容
-- local obj = json.decode(val ) -- json反序列化
-- print(obj ["createTime"])
四、db(數(shù)據(jù)庫操作)模塊
比如我們有角色表(t_role):
ID | CODE | NAME | REMARK |
---|---|---|---|
1 | r1 | 管理員 | 具有所有操作權(quán)限 |
2 | r2 | 測試員 | 具有測試功能的操作權(quán)限 |
用戶表(t_user):
ID | USER_NAME | PASSWORD | ROLE_CODE | CREATE_TIME |
---|---|---|---|---|
1 | admin | 123456 | r1 | 2020-10-20 22:00:10 |
我們需要監(jiān)聽t_user表福铅,并向接收端發(fā)送如下格式的消息:
{
"id": "1",
"userName": "admin"
"password": "123456",
"createTime": 100001,
"roleName": "系統(tǒng)管理員",
"roleRemark": "管理后臺相關(guān)信息",
"source": "binlog",
}
基于Binlog的數(shù)據(jù)同步工具萝毛,只能監(jiān)聽到一行數(shù)據(jù)的變更,進行響應(yīng)滑黔。無法像基于SQL的ETL工具那樣具有多表連接的能力笆包。如果要得到向上面那樣的聚合數(shù)據(jù)环揽,需要使用dbOps模塊,用法如下:
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local db = require("dbOps") --加載數(shù)據(jù)庫(db)操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型庵佣,key為列名稱
-- print(json.encode(row))
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫事件,包括:insert歉胶、updare、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local roleCode = row["ROLE_CODE"] --角色編碼
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個table,作為結(jié)果
result["id"] = id
result["action"] = action
if action == "delete" -- 刪除事件
then
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("user_topic",val) -- 發(fā)送消息巴粪,第一個參數(shù)為topic(string類型)通今,第二個參數(shù)為消息內(nèi)容
else
local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL語句,不能直接使用表名肛根,要使用(數(shù)據(jù)庫名稱.表名稱)辫塌,如:ESEAP.T_ROLE
local roleRS = db.selectOne(sql) -- 執(zhí)行SQL查詢,返回一條查詢結(jié)果派哲,table類型臼氨,結(jié)構(gòu)如:{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺相關(guān)信息"}
-- print(json.encode(roleRS))
local roleName = roleRS["NAME"] --角色名稱
local roleRemark = roleRS["REMARK"] --角色描述
-- local roleListRS = db.select(sql) -- 執(zhí)行SQL查詢,返回多條條查詢結(jié)果芭届,數(shù)組類型,元素為table储矩,結(jié)構(gòu)如:[{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺相關(guān)信息"}]
-- print(json.encode(roleListRS))
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來源
result["roleName"] = roleName
result["roleRemark"] = roleRemark
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("user_topic",val) -- 發(fā)送消息,第一個參數(shù)為topic(string類型)褂乍,第二個參數(shù)為消息內(nèi)容
end
dbOps模塊的方法說明:
1持隧、selectOne(sql) 查詢一條數(shù)據(jù),返回table類型的結(jié)果逃片;如果查詢不到數(shù)據(jù)屡拨,返回空table;如果查詢到多個結(jié)果褥实,會出錯
2洁仗、select(sql) 查詢多條數(shù)據(jù),返回數(shù)組類型的結(jié)果,數(shù)組元素為tablem(格式如:[table1,table2])性锭;查詢不到結(jié)果,返回空table叫胖;
四草冈、http客戶端模塊
讓go-mysql-transfer具體發(fā)送任意http請求的能力,httpOps提供的方法說明:
1瓮增、get(url,headers) 發(fā)送get請求怎棱;url為請求地址;headers為請求頭參數(shù)绷跑,table類型
2拳恋、delete(url,headers) 發(fā)送delete請求;url為請求地址砸捏;headers為請求頭參數(shù)谬运,table類型
3隙赁、post(url,headers,formItems) 發(fā)送post請求;url為請求地址梆暖;headers為請求頭參數(shù)伞访,table類型;formItems為表單數(shù)據(jù),table類型
4轰驳、put(url,headers,formItems) 發(fā)送put請求厚掷;url為請求地址;headers為請求頭參數(shù)级解,table類型;formItems為表單數(shù)據(jù)冒黑,table類型
上面4個方法的返回值為一個table類型的結(jié)果,元素"status_code"為http響應(yīng)狀態(tài),Number類型(如:200勤哗、401抡爹、403、500等)俺陋;元素body為http響應(yīng)內(nèi)容豁延,string類型
httpOps模塊具體用法如下:
local json = require("json") -- 加載json模塊
local ops = require("redisOps") --加載redis操作模塊
local httpcli = require("httpOps") --加載http操作模塊
local row = ops.rawRow() --數(shù)據(jù)庫當(dāng)前變更的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫事件,包括:insert腊状、updare诱咏、delete
local _id = row["ID"] --獲取ID列的值
local _userName = row["USER_NAME"] --獲取USER_NAME列的值
local _password = row["PASSWORD"] --獲取USER_NAME列的值
local _createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local key = "user_".._id -- 定義key
if action == "insert" -- 插入事件
then
-- get
local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName)
local res = httpcli.get(url,{
Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
}) -- http get請求,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)缴挖,類型為table
local status = res.status_code
--print(res.status_code) -- http響應(yīng)代碼袋狞,如:200、401映屋、403苟鸯、500等
--print(res.body)-- http響應(yīng)內(nèi)容,string類型
--local resObj = json.decode(res.body) -- json反序列化響應(yīng)內(nèi)容
--print(resObj["msg"])
-- delete
--local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName)
--local res = httpcli.delete(url,{
-- Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
--}) -- http delete請求棚点,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)早处,類型為table
-- post
--local url = "http://localhost:9999/http_tests"
--local res = httpcli.post(url,{
-- Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
--},{
-- id=_id,
-- userName=_userName,
-- password=_password,
-- createTime=_createTime
--}) -- http post請求,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)瘫析,類型為table;第三個參數(shù)為post內(nèi)容砌梆,類型為table
--put
--local url = "http://localhost:9999/http_tests"
--local res = httpcli.put(url,{
-- Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
--},{
-- id=_id,
-- userName=_userName,
-- password=_password,
-- createTime=_createTime
--}) -- http put請求,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)贬循,類型為table;第三個參數(shù)為post內(nèi)容咸包,類型為table
if status == 200
then
ops.SADD("user_set",userName.."|succeed") -- 對應(yīng)Redis的SADD命令,第一個參數(shù)為key(支持string類型)杖虾,第二個參數(shù)為value
else
ops.SADD("user_set",userName.."|failed") -- 對應(yīng)Redis的SADD命令烂瘫,第一個參數(shù)為key(支持string類型),第二個參數(shù)為value
end
end