MySQL數(shù)據(jù)實(shí)時(shí)增量同步到RabbitMQ

一欲险、go-mysql-transfer

go-mysql-transfer是一款MySQL實(shí)時(shí)、增量數(shù)據(jù)同步工具萍鲸。能夠?qū)崟r(shí)解析MySQL二進(jìn)制日志binlog肮柜,并生成指定格式的消息,同步到接收端望薄。

go-mysql-transfer具有如下特點(diǎn):

1疟游、不依賴其它組件,一鍵部署

2式矫、集成多種接收端乡摹,如:Redis、MongoDB采转、Elasticsearch、RabbitMQ、Kafka故慈、RocketMQ板熊,不需要再編寫客戶端,開箱即用

3察绷、內(nèi)置豐富的數(shù)據(jù)解析干签、消息生成規(guī)則;支持Lua腳本拆撼,以處理更復(fù)雜的數(shù)據(jù)邏輯

4容劳、支持監(jiān)控告警,集成Prometheus客戶端

5闸度、高可用集群部署

6竭贩、數(shù)據(jù)同步失敗重試

7、全量數(shù)據(jù)初始化

詳情及安裝說明 請(qǐng)參見: MySQL Binlog 增量同步工具go-mysql-transfer實(shí)現(xiàn)詳解

項(xiàng)目開源地址:

gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具對(duì)你有幫助莺禁,請(qǐng)Star支持下

二留量、配置

# app.yml

#目標(biāo)類型
target: rabbitmq # 目標(biāo)類型
#rabbitmq連接配置
rabbitmq_addr: amqp://guest:guest@127.0.0.1:5672/ #連接字符串,如: amqp://guest:guest@localhost:5672/

三、數(shù)據(jù)轉(zhuǎn)換規(guī)則

相關(guān)配置如下:

rule:
  -
    schema: eseap #數(shù)據(jù)庫(kù)名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段哟冬,存量數(shù)據(jù)同步時(shí)不能為空
    #column_lower_case:false #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
    #column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
    column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
    # 包含的列楼熄,多值逗號(hào)分隔,如:id,name,age,area_id  為空時(shí)表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列浩峡,多值逗號(hào)分隔可岂,如:id,name,age,area_id  默認(rèn)為空
    #column_mappings: USER_NAME=account #列名稱映射,多個(gè)映射關(guān)系用逗號(hào)分隔翰灾,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    #default_column_values: source=binlog,area_name=合肥  #默認(rèn)的列-值青柄,多個(gè)用逗號(hào)分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化预侯, 不填寫默認(rèn)yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime致开、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值編碼萎馅,支持json双戳、kv-commas、v-commas糜芳;默認(rèn)為json
    #value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式飒货,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值
    
    #rabbitmq相關(guān)
    rabbitmq_queue: user_topic #queue名稱,可以為空峭竣,默認(rèn)使用表(Table)名稱

示例一

t_user表塘辅,數(shù)據(jù)如下:

同步到RabbitMQ的數(shù)據(jù)如下:

insert事件消息
update事件消息
delete事件消息

示例二

t_user表 同實(shí)例一

使用如下配置:

rule:
  -
    schema: eseap #數(shù)據(jù)庫(kù)名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段,存量數(shù)據(jù)同步時(shí)不能為空
    column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
    #column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
    #column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
    # 包含的列皆撩,多值逗號(hào)分隔扣墩,如:id,name,age,area_id  為空時(shí)表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列哲银,多值逗號(hào)分隔,如:id,name,age,area_id  默認(rèn)為空
    column_mappings: USER_NAME=account #列名稱映射呻惕,多個(gè)映射關(guān)系用逗號(hào)分隔荆责,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    default_column_values: area_name=合肥  #默認(rèn)的列-值,多個(gè)用逗號(hào)分隔亚脆,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化做院, 不填寫默認(rèn)yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime、timestamp類型格式化濒持,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
    value_encoder: json  #值編碼键耕,支持json、kv-commas柑营、v-commas屈雄;默認(rèn)為json
    #value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式,如:${ID}|${USER_NAME},${ID}表示字段id的值由境、${USER_NAME}表示字段name的值

    #rabbitmq相關(guān)
    rabbitmq_queue: user_topic #queue名稱,可以為空棚亩,默認(rèn)使用表(Table)名稱

column_mappings配置項(xiàng)表示對(duì)列名稱進(jìn)行重新映射

同步到Kafka的數(shù)據(jù)如下:

insert事件消息

示例三

t_user表、topic 同實(shí)例一

使用如下配置:

rule:
  -
    schema: eseap #數(shù)據(jù)庫(kù)名稱
    table: t_user #表名稱
    #order_by_column: id #排序字段虏杰,存量數(shù)據(jù)同步時(shí)不能為空
    column_lower_case: true #列名稱轉(zhuǎn)為小寫,默認(rèn)為false
    #column_upper_case:false#列名稱轉(zhuǎn)為大寫,默認(rèn)為false
    #column_underscore_to_camel: true #列名稱下劃線轉(zhuǎn)駝峰,默認(rèn)為false
    # 包含的列讥蟆,多值逗號(hào)分隔,如:id,name,age,area_id  為空時(shí)表示包含全部列
    #include_columns: ID,USER_NAME,PASSWORD
    #exclude_columns: BIRTHDAY,MOBIE # 排除掉的列纺阔,多值逗號(hào)分隔瘸彤,如:id,name,age,area_id  默認(rèn)為空
    column_mappings: USER_NAME=account #列名稱映射,多個(gè)映射關(guān)系用逗號(hào)分隔笛钝,如:USER_NAME=account 表示將字段名USER_NAME映射為account
    default_column_values: area_name=合肥  #默認(rèn)的列-值质况,多個(gè)用逗號(hào)分隔,如:source=binlog,area_name=合肥
    #date_formatter: yyyy-MM-dd #date類型格式化玻靡, 不填寫默認(rèn)yyyy-MM-dd
    #datetime_formatter: yyyy-MM-dd HH:mm:ss #datetime结榄、timestamp類型格式化,不填寫默認(rèn)yyyy-MM-dd HH:mm:ss
    value_encoder: v-commas  #值編碼囤捻,支持json臼朗、kv-commas、v-commas蝎土;默認(rèn)為json
    #value_formatter: ${ID}|${USER_NAME} #值格式化表達(dá)式视哑,如:${ID}|${USER_NAME},${ID}表示字段id的值、${USER_NAME}表示字段name的值

    #rabbitmq相關(guān)
    rabbitmq_queue: user_topic #queue名稱,可以為空誊涯,默認(rèn)使用表(Table)名稱

column_mappings配置項(xiàng)表示對(duì)列名稱進(jìn)行重新映射

value_encoder配置項(xiàng)表示消息編碼方式

同步到RabbitMQ的數(shù)據(jù)如下:

insert事件消息

四挡毅、Lua腳本

使用Lua腳本可以實(shí)現(xiàn)更復(fù)雜的數(shù)據(jù)處理邏輯,go-mysql-transfer支持Lua5.1語(yǔ)法暴构。

示例一

t_user表跪呈,數(shù)據(jù)如下:

引入Lua腳本:

rule:
  -
    schema: eseap #數(shù)據(jù)庫(kù)名稱
    table: t_user #表名稱
    lua_file_path: lua/t_user_rabbit.lua   #lua腳本文件

Lua腳本:

local json = require("json")   -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型段磨,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare庆械、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值

local result = {}  -- 定義一個(gè)table,作為結(jié)果
result["id"] = id
result["action"] = action

if action == "delete" -- 刪除事件
then
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("user_topic",val) -- 發(fā)送消息薇溃,第一個(gè)參數(shù)為topic(string類型)菌赖,第二個(gè)參數(shù)為消息內(nèi)容
else 
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 數(shù)據(jù)來源
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("user_topic",val) -- 發(fā)送消息缭乘,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
end 

同步到RabbitMQ的數(shù)據(jù)如下:

insert事件消息
update事件消息
delete事件消息

示例二

t_user表 同實(shí)例一

使用如下腳本:

local ops = require("mqOps") --加載mq操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型琉用,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert堕绩、updare、delete

local userName = row["USER_NAME"] --獲取USER_NAME列的值

if action == "insert" then -- 只監(jiān)聽添加事件
    local str = string.format("恭喜您:%s  注冊(cè)成功",userName)  
    ops.SEND("user_topic",str) -- 發(fā)送消息邑时,第一個(gè)參數(shù)為topic(string類型)奴紧,第二個(gè)參數(shù)為消息內(nèi)容
end 

同步到RabbitMQ的數(shù)據(jù)如下:

insert事件消息

mqOps模塊提供的方法如下:

  1. SEND: 發(fā)送操作,如:ops.SEND(topic,result)。參數(shù)topic為字符串類型晶丘;參數(shù)result為要發(fā)送的消息
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末黍氮,一起剝皮案震驚了整個(gè)濱河市,隨后出現(xiàn)的幾起案子浅浮,更是在濱河造成了極大的恐慌沫浆,老刑警劉巖,帶你破解...
    沈念sama閱讀 218,682評(píng)論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件滚秩,死亡現(xiàn)場(chǎng)離奇詭異专执,居然都是意外死亡,警方通過查閱死者的電腦和手機(jī)郁油,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,277評(píng)論 3 395
  • 文/潘曉璐 我一進(jìn)店門本股,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人桐腌,你說我怎么就攤上這事拄显。” “怎么了案站?”我有些...
    開封第一講書人閱讀 165,083評(píng)論 0 355
  • 文/不壞的土叔 我叫張陵躬审,是天一觀的道長(zhǎng)。 經(jīng)常有香客問我嚼吞,道長(zhǎng)盒件,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,763評(píng)論 1 295
  • 正文 為了忘掉前任舱禽,我火速辦了婚禮炒刁,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘誊稚。我一直安慰自己翔始,他們只是感情好罗心,可當(dāng)我...
    茶點(diǎn)故事閱讀 67,785評(píng)論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著城瞎,像睡著了一般渤闷。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上脖镀,一...
    開封第一講書人閱讀 51,624評(píng)論 1 305
  • 那天飒箭,我揣著相機(jī)與錄音,去河邊找鬼蜒灰。 笑死弦蹂,一個(gè)胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的强窖。 我是一名探鬼主播凸椿,決...
    沈念sama閱讀 40,358評(píng)論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長(zhǎng)吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼翅溺!你這毒婦竟也來了脑漫?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,261評(píng)論 0 276
  • 序言:老撾萬榮一對(duì)情侶失蹤咙崎,失蹤者是張志新(化名)和其女友劉穎优幸,沒想到半個(gè)月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體叙凡,經(jīng)...
    沈念sama閱讀 45,722評(píng)論 1 315
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡劈伴,尸身上長(zhǎng)有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 37,900評(píng)論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了握爷。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片跛璧。...
    茶點(diǎn)故事閱讀 40,030評(píng)論 1 350
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡,死狀恐怖新啼,靈堂內(nèi)的尸體忽然破棺而出追城,到底是詐尸還是另有隱情,我是刑警寧澤燥撞,帶...
    沈念sama閱讀 35,737評(píng)論 5 346
  • 正文 年R本政府宣布座柱,位于F島的核電站,受9級(jí)特大地震影響物舒,放射性物質(zhì)發(fā)生泄漏色洞。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 41,360評(píng)論 3 330
  • 文/蒙蒙 一冠胯、第九天 我趴在偏房一處隱蔽的房頂上張望火诸。 院中可真熱鬧,春花似錦荠察、人聲如沸置蜀。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,941評(píng)論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽(yáng)盯荤。三九已至馋吗,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間秋秤,已是汗流浹背宏粤。 一陣腳步聲響...
    開封第一講書人閱讀 33,057評(píng)論 1 270
  • 我被黑心中介騙來泰國(guó)打工, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留航缀,地道東北人商架。 一個(gè)月前我還...
    沈念sama閱讀 48,237評(píng)論 3 371
  • 正文 我出身青樓堰怨,卻偏偏與公主長(zhǎng)得像芥玉,于是被迫代替她去往敵國(guó)和親。 傳聞我的和親對(duì)象是個(gè)殘疾皇子备图,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 44,976評(píng)論 2 355