MySQL Binlog 同步工具go-mysql-transfer Lua模塊使用說明

一、go-mysql-transfer

go-mysql-transfer是一款MySQL實時、增量數(shù)據(jù)同步工具帅掘。能夠?qū)崟r解析MySQL二進制日志binlog汰蓉,并生成指定格式的消息绷蹲,同步到接收端。

go-mysql-transfer具有如下特點:

1顾孽、不依賴其它組件祝钢,一鍵部署

2、集成多種接收端若厚,如:Redis拦英、MongoDB、Elasticsearch测秸、RabbitMQ疤估、Kafka灾常、RocketMQ,不需要再編寫客戶端铃拇,開箱即用

3钞瀑、內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則慷荔;支持Lua腳本雕什,以處理更復(fù)雜的數(shù)據(jù)邏輯

4、支持監(jiān)控告警拧廊,集成Prometheus客戶端

5监徘、高可用集群部署

6、數(shù)據(jù)同步失敗重試

7吧碾、全量數(shù)據(jù)初始化

詳情及安裝說明 請參見: MySQL Binlog 增量同步工具go-mysql-transfer實現(xiàn)詳解

項目開源地址:

gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具對你有幫助凰盔,請Star支持下

二、Lua腳本引擎

go-mysql-transfer中使用gopher-lua作為Lua虛擬機倦春,支持Lua5.1規(guī)范户敬。Lua作為專業(yè)的內(nèi)置腳本語言,其設(shè)計目的是為了嵌入應(yīng)用程序中睁本,從而為應(yīng)用程序提供靈活的擴展和定制功能尿庐。開發(fā)者只需要花費少量時間就能大致掌握其用法。

基于Lua的高擴展性呢堰,可以實現(xiàn)更為復(fù)雜的數(shù)據(jù)解析抄瑟、消息生成、數(shù)據(jù)處理邏輯枉疼。

三皮假、json模塊

提供json數(shù)據(jù)格式的序列化和反序列化功能,提供encode和decode兩個方法骂维。
使用示例如下:

local json = require("json")   -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型惹资,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫事件,包括:insert、updare航闺、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值

local result = {}  -- 定義一個table,作為結(jié)果
result["id"] = id
result["action"] = action

if action == "delete" -- 刪除事件
then
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("transfer_test_topic",val) -- 發(fā)送消息褪测,第一個參數(shù)為topic(string類型),第二個參數(shù)為消息內(nèi)容
else 
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 數(shù)據(jù)來源
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("transfer_test_topic",val) -- 發(fā)送消息潦刃,第一個參數(shù)為topic(string類型)侮措,第二個參數(shù)為消息內(nèi)容
    -- local obj = json.decode(val ) -- json反序列化
    -- print(obj ["createTime"])

四、db(數(shù)據(jù)庫操作)模塊

比如我們有角色表(t_role):

ID CODE NAME REMARK
1 r1 管理員 具有所有操作權(quán)限
2 r2 測試員 具有測試功能的操作權(quán)限

用戶表(t_user):

ID USER_NAME PASSWORD ROLE_CODE CREATE_TIME
1 admin 123456 r1 2020-10-20 22:00:10

我們需要監(jiān)聽t_user表福铅,并向接收端發(fā)送如下格式的消息:

{
       
    "id": "1",
    "userName": "admin"
    "password": "123456",
    "createTime": 100001,
    "roleName": "系統(tǒng)管理員",
    "roleRemark": "管理后臺相關(guān)信息",
    "source": "binlog",
    
}

基于Binlog的數(shù)據(jù)同步工具萝毛,只能監(jiān)聽到一行數(shù)據(jù)的變更,進行響應(yīng)滑黔。無法像基于SQL的ETL工具那樣具有多表連接的能力笆包。如果要得到向上面那樣的聚合數(shù)據(jù)环揽,需要使用dbOps模塊,用法如下:

local json = require("json")   -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local db = require("dbOps") --加載數(shù)據(jù)庫(db)操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫的一行數(shù)據(jù),table類型庵佣,key為列名稱
-- print(json.encode(row))
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫事件,包括:insert歉胶、updare、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local roleCode = row["ROLE_CODE"] --角色編碼
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值

local result = {}  -- 定義一個table,作為結(jié)果
result["id"] = id
result["action"] = action

if action == "delete" -- 刪除事件
then
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("user_topic",val) -- 發(fā)送消息巴粪,第一個參數(shù)為topic(string類型)通今,第二個參數(shù)為消息內(nèi)容
else 
    
    local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL語句,不能直接使用表名肛根,要使用(數(shù)據(jù)庫名稱.表名稱)辫塌,如:ESEAP.T_ROLE
    local roleRS = db.selectOne(sql) -- 執(zhí)行SQL查詢,返回一條查詢結(jié)果派哲,table類型臼氨,結(jié)構(gòu)如:{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺相關(guān)信息"}
    -- print(json.encode(roleRS))
    local roleName = roleRS["NAME"] --角色名稱
    local roleRemark = roleRS["REMARK"] --角色描述
    
    -- local roleListRS = db.select(sql) -- 執(zhí)行SQL查詢,返回多條條查詢結(jié)果芭届,數(shù)組類型,元素為table储矩,結(jié)構(gòu)如:[{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺相關(guān)信息"}]
    -- print(json.encode(roleListRS))
    
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 數(shù)據(jù)來源
    result["roleName"] = roleName
    result["roleRemark"] = roleRemark
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("user_topic",val) -- 發(fā)送消息,第一個參數(shù)為topic(string類型)褂乍,第二個參數(shù)為消息內(nèi)容
end 

dbOps模塊的方法說明:
1持隧、selectOne(sql) 查詢一條數(shù)據(jù),返回table類型的結(jié)果逃片;如果查詢不到數(shù)據(jù)屡拨,返回空table;如果查詢到多個結(jié)果褥实,會出錯
2洁仗、select(sql) 查詢多條數(shù)據(jù),返回數(shù)組類型的結(jié)果,數(shù)組元素為tablem(格式如:[table1,table2])性锭;查詢不到結(jié)果,返回空table叫胖;

四草冈、http客戶端模塊

讓go-mysql-transfer具體發(fā)送任意http請求的能力,httpOps提供的方法說明:

1瓮增、get(url,headers) 發(fā)送get請求怎棱;url為請求地址;headers為請求頭參數(shù)绷跑,table類型
2拳恋、delete(url,headers) 發(fā)送delete請求;url為請求地址砸捏;headers為請求頭參數(shù)谬运,table類型
3隙赁、post(url,headers,formItems) 發(fā)送post請求;url為請求地址梆暖;headers為請求頭參數(shù)伞访,table類型;formItems為表單數(shù)據(jù),table類型
4轰驳、put(url,headers,formItems) 發(fā)送put請求厚掷;url為請求地址;headers為請求頭參數(shù)级解,table類型;formItems為表單數(shù)據(jù)冒黑,table類型

上面4個方法的返回值為一個table類型的結(jié)果,元素"status_code"為http響應(yīng)狀態(tài),Number類型(如:200勤哗、401抡爹、403、500等)俺陋;元素body為http響應(yīng)內(nèi)容豁延,string類型

httpOps模塊具體用法如下:

local json = require("json")   -- 加載json模塊
local ops = require("redisOps") --加載redis操作模塊
local httpcli = require("httpOps") --加載http操作模塊

local row = ops.rawRow()  --數(shù)據(jù)庫當(dāng)前變更的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫事件,包括:insert腊状、updare诱咏、delete

local _id = row["ID"] --獲取ID列的值
local _userName = row["USER_NAME"] --獲取USER_NAME列的值
local _password = row["PASSWORD"] --獲取USER_NAME列的值
local _createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local key = "user_".._id -- 定義key

if action == "insert" -- 插入事件
then
    -- get
     local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) 
     local res = httpcli.get(url,{
        Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
     }) -- http get請求,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)缴挖,類型為table
     local status = res.status_code
    --print(res.status_code)  -- http響應(yīng)代碼袋狞,如:200、401映屋、403苟鸯、500等
    --print(res.body)-- http響應(yīng)內(nèi)容,string類型
    --local resObj = json.decode(res.body) -- json反序列化響應(yīng)內(nèi)容
    --print(resObj["msg"])
    
    
    -- delete
    --local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) 
    --local res = httpcli.delete(url,{
    --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
    --}) -- http delete請求棚点,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)早处,類型為table
    
    -- post
    --local url = "http://localhost:9999/http_tests"
    --local res = httpcli.post(url,{
    --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
    --},{
    --  id=_id,
    --  userName=_userName,
    --  password=_password,
    --  createTime=_createTime
    --}) -- http post請求,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)瘫析,類型為table;第三個參數(shù)為post內(nèi)容砌梆,類型為table
    
    --put
    --local url = "http://localhost:9999/http_tests"
    --local res = httpcli.put(url,{
    --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
    --},{
    --  id=_id,
    --  userName=_userName,
    --  password=_password,
    --  createTime=_createTime
    --}) -- http put請求,第一個參數(shù)為URL,類型為string;第二個參數(shù)為header參數(shù)贬循,類型為table;第三個參數(shù)為post內(nèi)容咸包,類型為table
    
    if status == 200
    then 
        ops.SADD("user_set",userName.."|succeed") -- 對應(yīng)Redis的SADD命令,第一個參數(shù)為key(支持string類型)杖虾,第二個參數(shù)為value
    else
        ops.SADD("user_set",userName.."|failed") -- 對應(yīng)Redis的SADD命令烂瘫,第一個參數(shù)為key(支持string類型),第二個參數(shù)為value
    end
    
    
end 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末奇适,一起剝皮案震驚了整個濱河市坟比,隨后出現(xiàn)的幾起案子芦鳍,更是在濱河造成了極大的恐慌,老刑警劉巖温算,帶你破解...
    沈念sama閱讀 216,843評論 6 502
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件怜校,死亡現(xiàn)場離奇詭異,居然都是意外死亡注竿,警方通過查閱死者的電腦和手機茄茁,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,538評論 3 392
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來巩割,“玉大人裙顽,你說我怎么就攤上這事⌒福” “怎么了愈犹?”我有些...
    開封第一講書人閱讀 163,187評論 0 353
  • 文/不壞的土叔 我叫張陵,是天一觀的道長闻丑。 經(jīng)常有香客問我漩怎,道長,這世上最難降的妖魔是什么嗦嗡? 我笑而不...
    開封第一講書人閱讀 58,264評論 1 292
  • 正文 為了忘掉前任勋锤,我火速辦了婚禮,結(jié)果婚禮上侥祭,老公的妹妹穿的比我還像新娘叁执。我一直安慰自己,他們只是感情好矮冬,可當(dāng)我...
    茶點故事閱讀 67,289評論 6 390
  • 文/花漫 我一把揭開白布谈宛。 她就那樣靜靜地躺著,像睡著了一般胎署。 火紅的嫁衣襯著肌膚如雪吆录。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,231評論 1 299
  • 那天琼牧,我揣著相機與錄音径筏,去河邊找鬼。 笑死障陶,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的聊训。 我是一名探鬼主播抱究,決...
    沈念sama閱讀 40,116評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼带斑!你這毒婦竟也來了鼓寺?” 一聲冷哼從身側(cè)響起勋拟,我...
    開封第一講書人閱讀 38,945評論 0 275
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎妈候,沒想到半個月后敢靡,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,367評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡苦银,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,581評論 2 333
  • 正文 我和宋清朗相戀三年啸胧,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片幔虏。...
    茶點故事閱讀 39,754評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡纺念,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出想括,到底是詐尸還是另有隱情陷谱,我是刑警寧澤,帶...
    沈念sama閱讀 35,458評論 5 344
  • 正文 年R本政府宣布瑟蜈,位于F島的核電站烟逊,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏铺根。R本人自食惡果不足惜宪躯,卻給世界環(huán)境...
    茶點故事閱讀 41,068評論 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望夷都。 院中可真熱鬧眷唉,春花似錦、人聲如沸囤官。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,692評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽党饮。三九已至肝陪,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間刑顺,已是汗流浹背氯窍。 一陣腳步聲響...
    開封第一講書人閱讀 32,842評論 1 269
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留蹲堂,地道東北人狼讨。 一個月前我還...
    沈念sama閱讀 47,797評論 2 369
  • 正文 我出身青樓,卻偏偏與公主長得像柒竞,于是被迫代替她去往敵國和親政供。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,654評論 2 354