實現(xiàn)同步mysql數(shù)據(jù)異步同步

1.首先mysql要開啟binlog白胀,修改mysql的配置文件:

server_id=2023  #主節(jié)點的編號熟菲,保證唯一即可
log_bin=mysql-bin
binlog_format=row

2.使用mysql-binlog-connector-java來監(jiān)聽MySQL的二進制日志(binlog)事件毡琉,引入依賴谜嫉。

<dependency>
    <groupId>com.github.shyiko</groupId>
    <artifactId>mysql-binlog-connector-java</artifactId>
    <version>0.29.2</version>
</dependency>

定義配置:

mysql:
  binlog:
    host: localhost
    port: 3306
    schema: heg_hotel
    username: root
    password: 123456
    tables: t_user,t_order

3.代碼中配置:

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.stephen.listener.CustomerBinlogListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * @Description 開啟binlog監(jiān)聽工具
 * @Author jack
 * @Date 2024/8/22 14:35
 */
@Configuration
public class BinlogConfig {
    @Value("${mysql.binlog.host}")
    private String host;

    @Value("${mysql.binlog.port}")
    private Integer port;

    @Value("${mysql.binlog.username}")
    private String userName;

    @Value("${mysql.binlog.password}")
    private String password;

    @Value("${mysql.binlog.schema:#{null}}")
    private String schema;

    @Value("${mysql.binlog.tables}")
    private String tables;

    @Bean
    public CustomerBinlogListener customerBinlogListener() {
        //考慮到多個數(shù)據(jù)庫的情況
        if(StringUtils.isNotBlank(schema)){
            List<String> schemaTables = Lists.newArrayList();
            //只監(jiān)聽指定庫 例如:demo.user
            Arrays.stream(tables.split(",")).forEach(table-> schemaTables.add(Joiner.on('.').join(schema,table)));
            return new CustomerBinlogListener(schemaTables);
        }
        else{
            //監(jiān)聽多個庫
            return new CustomerBinlogListener(Arrays.asList(tables.split(",")));
        }
    }

    @Bean
    public BinaryLogClient binaryLogClient() {
        //不指定schema監(jiān)聽所有的數(shù)據(jù)庫
        BinaryLogClient client = new BinaryLogClient(host,port,userName,password);
        //BinaryLogClient client = new BinaryLogClient(host,port,schema,userName,password);
        client.registerEventListener(customerBinlogListener());
        client.setServerId(1);
        client.setKeepAlive(true); // 保持連接
        client.setKeepAliveInterval(10 * 1000); // 心跳包發(fā)送頻率
        client.setKeepAliveConnectTimeout(5 * 1000); // 心跳發(fā)送超時設(shè)置
        try {
            client.connect();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return client;
    }
}

自己實現(xiàn)監(jiān)聽事件:

import com.alibaba.fastjson.JSON;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import com.google.common.collect.Maps;
import com.stephen.model.BinlogDto;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * @Description 自定義監(jiān)聽數(shù)據(jù)
 * @Author jack
 * @Date 2024/8/22 15:05
 */
@Slf4j
public class CustomerBinlogListener implements BinaryLogClient.EventListener {
    private HashMap<Long, String> tableMap = Maps.newHashMap();
    private List<String> databaseTables;

    public CustomerBinlogListener(List<String> databaseTables){
        this.databaseTables = databaseTables;
    }

    @Override
    public void onEvent(Event event) {
        // binlog事件
        EventData data = event.getData();
        if (data != null) {
            if (data instanceof TableMapEventData) {
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getDatabase() + "." + tableMapEventData.getTable());
            }
            // update數(shù)據(jù)
            if (data instanceof UpdateRowsEventData) {
                UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
                String tableName = tableMap.get(updateRowsEventData.getTableId());
                if (tableName != null && databaseTables.contains(tableName)) {
                    String eventKey = tableName + ".update";
                    System.out.println("212121212121");
                    for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
                        String msg = JSON.toJSONString(new BinlogDto(eventKey, row.getValue()));
                        log.info("binlog修改日志:{}",msg);
                    }
                }
            }
            // insert數(shù)據(jù)
            else if (data instanceof WriteRowsEventData) {
                WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
                String tableName = tableMap.get(writeRowsEventData.getTableId());
                if (tableName != null && databaseTables.contains(tableName)) {
                    String eventKey = tableName + ".insert";
                    for (Serializable[] row : writeRowsEventData.getRows()) {
                        String msg = JSON.toJSONString(new BinlogDto(eventKey, row));
                        log.info("binlog插入日志:{}",msg);
                    }
                }
            }
            // delete數(shù)據(jù)
            else if (data instanceof DeleteRowsEventData) {
                DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;
                String tableName = tableMap.get(deleteRowsEventData.getTableId());
                if (tableName != null && databaseTables.contains(tableName)) {
                    String eventKey = tableName + ".delete";
                    for (Serializable[] row : deleteRowsEventData.getRows()) {
                        String msg = JSON.toJSONString(new BinlogDto(eventKey, row));
                        log.info("binlog刪除日志:{}",msg);
                    }
                }
            }
        }
    }
}

可以結(jié)合kafka等消息隊列骄酗,推送數(shù)據(jù)到es等數(shù)據(jù)倉庫

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末嘱兼,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子及舍,更是在濱河造成了極大的恐慌未辆,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,383評論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件锯玛,死亡現(xiàn)場離奇詭異咐柜,居然都是意外死亡,警方通過查閱死者的電腦和手機更振,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,522評論 3 385
  • 文/潘曉璐 我一進店門炕桨,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人肯腕,你說我怎么就攤上這事献宫。” “怎么了实撒?”我有些...
    開封第一講書人閱讀 157,852評論 0 348
  • 文/不壞的土叔 我叫張陵姊途,是天一觀的道長。 經(jīng)常有香客問我知态,道長捷兰,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,621評論 1 284
  • 正文 為了忘掉前任负敏,我火速辦了婚禮贡茅,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘其做。我一直安慰自己顶考,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 65,741評論 6 386
  • 文/花漫 我一把揭開白布妖泄。 她就那樣靜靜地躺著驹沿,像睡著了一般。 火紅的嫁衣襯著肌膚如雪蹈胡。 梳的紋絲不亂的頭發(fā)上渊季,一...
    開封第一講書人閱讀 49,929評論 1 290
  • 那天,我揣著相機與錄音罚渐,去河邊找鬼却汉。 笑死,一個胖子當(dāng)著我的面吹牛荷并,可吹牛的內(nèi)容都是我干的合砂。 我是一名探鬼主播,決...
    沈念sama閱讀 39,076評論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼璧坟,長吁一口氣:“原來是場噩夢啊……” “哼既穆!你這毒婦竟也來了赎懦?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,803評論 0 268
  • 序言:老撾萬榮一對情侶失蹤幻工,失蹤者是張志新(化名)和其女友劉穎励两,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體囊颅,經(jīng)...
    沈念sama閱讀 44,265評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡当悔,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,582評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了踢代。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片盲憎。...
    茶點故事閱讀 38,716評論 1 341
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖胳挎,靈堂內(nèi)的尸體忽然破棺而出饼疙,到底是詐尸還是另有隱情,我是刑警寧澤慕爬,帶...
    沈念sama閱讀 34,395評論 4 333
  • 正文 年R本政府宣布窑眯,位于F島的核電站,受9級特大地震影響医窿,放射性物質(zhì)發(fā)生泄漏磅甩。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 40,039評論 3 316
  • 文/蒙蒙 一姥卢、第九天 我趴在偏房一處隱蔽的房頂上張望卷要。 院中可真熱鬧,春花似錦独榴、人聲如沸僧叉。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,798評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽彪标。三九已至倍权,卻和暖如春掷豺,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背薄声。 一陣腳步聲響...
    開封第一講書人閱讀 32,027評論 1 266
  • 我被黑心中介騙來泰國打工当船, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人默辨。 一個月前我還...
    沈念sama閱讀 46,488評論 2 361
  • 正文 我出身青樓德频,卻偏偏與公主長得像,于是被迫代替她去往敵國和親缩幸。 傳聞我的和親對象是個殘疾皇子壹置,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 43,612評論 2 350

推薦閱讀更多精彩內(nèi)容