mysql binlog的獲取和解析

mysql.jpeg

一 背景
1 binlog定義
binlog基本定義:二進制日志修然,也成為二進制日志救巷,記錄對數(shù)據(jù)發(fā)生或潛在發(fā)生更改的SQL語句烁峭,并以二進制的形式保存在磁盤中悲伶。
作用:MySQL的作用類似于Oracle的歸檔日志,可以用來查看數(shù)據(jù)庫的變更歷史(具體的時間點所有的SQL操作)翰意、數(shù)據(jù)庫增量備份和恢復(增量備份和基于時間點的恢復)木人、Mysql的復制(主主數(shù)據(jù)庫的復制信柿、主從數(shù)據(jù)庫的復制)。

2 開啟binlog
找到mysql的配置文件醒第,linux下一般為my.cnf在/etc 下渔嚷,window下一般為my.ini
在[mysqld]下添加

log-bin=mysql-bin
binlog_format="ROW"

添加完成后重啟mysql

mysql> show binary logs;

會顯示如下:

+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000001 |       732 |
+------------------+-----------+

3 binlog格式
mysql的binlog有多種格式
a Statement:每一條會修改數(shù)據(jù)的sql都會記錄在binlog中
b Row:不記錄sql語句上下文相關信息,僅保存哪條記錄被修改
c Mixed:是以上兩種level的混合使用稠曼,一般的語句修改使用statment格式保存binlog形病,如一些函數(shù),statement無法完成主從復制的操作蒲列,則采用row格式保存binlog
注:我們的binlog-access只支持row格式的解析

二 binlog-accessor
由于我們的項目中需要實時獲取mysql中某些字段的修改窒朋,考慮到添加觸發(fā)器或者在代碼層面監(jiān)聽修改過大搀罢,因此最終決定通過監(jiān)聽myslq的binlog來完成蝗岖。
調(diào)研了一些現(xiàn)有的方案后,最終基于open-replicator實現(xiàn)了一套binlog的監(jiān)聽及解析程序榔至。

1 open-replicator
open-replicator是一個開源的binlog解析框架抵赢。

https://github.com/whitesock/open-replicator

它的主要原理是將自己偽裝成一臺mysql的備庫從而從主庫獲取binlog數(shù)據(jù)。
比如刪除mysql中的一條數(shù)據(jù)唧取,open-replicator會返回:

DeleteRowsEventV2[header=BinlogEventV4HeaderImpl[timestamp=1488177443000,eventType=32,serverId=1,eventLength=72,nextPosition=1653,flags=0,timestampOfReceipt=1488177443997],tableId=116,reserved=1,extraInfoLength=2,extraInfo=<null>,columnCount=5,usedColumns=11111,rows=[Row[columns=[13, 0, 0, 0, 100]]]]

這個返回結(jié)果基本和binlog的格式完全一樣铅鲤,但對于我們實際的使用中,有許多不方便的地方枫弟。
比如:tableId是mysql內(nèi)部使用的邢享,如果對外使用,我們需要將tableId翻譯為tableName淡诗。還有row的值骇塘,只描述了原始值,并沒有描述列的字段名韩容。鑒于此款违,我們需要對open-replicator做諸多的加工。

2 加工數(shù)據(jù)
我們只關注binlog中的4種event類型
a tableMapEvent群凶,該event主要描述tableId和tableName的對應
b insertEvent插爹,該event描述insert事件
c updateEvent,該event描述update事件
d deleteEvent请梢,該event描述delete事件
加工分為兩個截斷
a 通過tableId獲取tableName(解析tableMapEvent)
b 獲取每個字段的列名赠尾,主要功過調(diào)用 desc tableName 得到
加工后的輸出結(jié)果為一個bean:

@Data
public class RowDiffModel {
    long timestamp;
    String tableName;
    List<String> pkColumnName = new ArrayList<>();  //主鍵列
    List<Object> pk = new ArrayList<>();
    int type;  //1 新建 //2 更新 //3 刪除
    List<String> diffColumns = new ArrayList<>();
    Map<String, Object> preValue = new HashMap<>();
    Map<String, Object> newValue = new HashMap<>();
}

比如上條的刪除事件,加工后返回的結(jié)果為:

RowDiffModel(timestamp=1488177443000, tableName=lx_charge.user_fund, pkColumnName=[], pk=[], type=3, diffColumns=[user_id, invest, extend, rebate, balance], preValue={extend=0, balance=100, user_id=13, rebate=0, invest=0}, newValue={})

3 訂閱數(shù)據(jù)
我們將加工后的binlog發(fā)送到rabbitmq的一個topic中毅弧,所有的需求放訂閱需要的數(shù)據(jù)即可气嫁。這里貼一個訂閱的示例:

@Service
public class RowDiffRawMessageConsumerPool {
        private static final String EXCHANGE = "db-diff";
        private static final String ROUTING = "row-diff";
        private static final String QUEUE = "row-diff-raw";

        @Autowired
        ConnectionFactory connectionFactory;

        private ThreadPoolConsumer<RowDiffModel> threadPoolConsumer;

        @PostConstruct
        public void init() {
            MQAccessBuilder mqAccessBuilder = new MQAccessBuilder(connectionFactory);
            MessageProcess<RowDiffModel> messageProcess = message -> {
                System.out.println("received: " + message);

                return new DetailRes(true, "");
            };

            threadPoolConsumer = new ThreadPoolConsumer.ThreadPoolConsumerBuilder<RowDiffModel>()
                    .setThreadCount(Constants.CONSUMER_THREAD_COUNT).setIntervalMils(Constants.INTERVAL_MILS)
                    .setExchange(EXCHANGE).setRoutingKey(ROUTING).setQueue(QUEUE).setType("topic")
                    .setMQAccessBuilder(mqAccessBuilder).setMessageProcess(messageProcess)
                    .build();
        }

        public void start() throws IOException {
            threadPoolConsumer.start();
        }

        public void stop() {
            threadPoolConsumer.stop();
        }
}

在本例中,將所有的binlog直接打印形真。
關于rabbitmq的使用請參考

http://www.reibang.com/p/4112d78a8753

4 高可用性
任何一個項目都需要考慮高可用性杉编,尤其是一些偏底層的模塊超全。在binlog-access中,我們從兩方面考慮高可用性
a mysql的可用性邓馒。我們需要考慮mysql掛掉嘶朱,網(wǎng)絡異常的情況。我們對原始的open-replicator做了一個加強光酣,重寫了它的start方法疏遏,保證在各種情況下的自動重試

    @Override
    public void start() {
        new Thread(() -> {
            while (!stop) {
                try {
                    if (!isRunning()) {
                        if (this.transport != null
                                || this.binlogParser != null) {
                            this.stopQuietly(0, TimeUnit.SECONDS);
                            this.transport = null;
                            this.binlogParser = null;
                        }

                        BinlogMeta binlogMeta = binlogMetaBuilder.getBinlogMeta();
                        setBinlogFileName(binlogMeta.getBinlogName());
                        setBinlogPosition(binlogMeta.getPos());

                        log.info(binlogMeta.toString());

                        super.start();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

b 多機器部署。為了避免單點效應救军,我們需要將binlog-access支持多機部署财异。這里引入redis來保證不會發(fā)送重復數(shù)據(jù)到topic中,主要通過日志偏移去重:

    @Log
    public DetailRes send(long pos, List<RowDiffModel> rowDiffModels) {
        if (redisCache.cacheIfAbsent("binlog:" + pos, Constants.TIMESTAMP_VALID_TIME)) {
            DetailRes detailRes = new DetailRes(true, "");

            for (RowDiffModel rowDiffModel : rowDiffModels) {
                if (detailRes.isSuccess()) {
                    String dbName = rowDiffModel.getTableName().split("\\.")[0].toLowerCase();

                    if (dbSet.isEmpty()) {
                        detailRes = messageSender.send(rowDiffModel);
                    } else {
                        if (dbSet.contains(dbName)) {
                            detailRes = messageSender.send(rowDiffModel);
                        }
                    }
                } else {
                    break;
                }
            }

            return detailRes;
        } else {
            return new DetailRes(true, "");
        }
    }

關于redis的使用唱遭,請參考

http://www.reibang.com/p/fa036f364ae2

5 項目依賴
a open-replicator

<dependency>
    <groupId>com.flipkart</groupId>
    <artifactId>open-replicator</artifactId>
    <version>1.0.8</version>
</dependency>

b rabbitmq-access

<dependency>
    <groupId>com.littlersmall.rabbitmq-access</groupId>
    <artifactId>rabbitmq-access</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

:該模塊需要自己打包成jar包導入項目或者deploy在自己的代碼庫中
c redis-access

<dependency>
    <groupId>com.littlersmall.redis-access</groupId>
    <artifactId>redis-access</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>

:同上

三 binlog-access的使用
1 準備好所依賴的jar包(或deploy在自己的代碼庫中戳寸,rabbitmq-access & redis-access)
2 安裝好rabbitmq和redis
3 確定所監(jiān)聽的mysql開啟了binlog,且binlog的格式為ROW
4 配置文件(resources/application.properties)拷泽,如下

#db
db.host=127.0.0.1
db.port=3306
db.username=root
db.password=root
db.url=jdbc:mysql://${db.host}:${db.port}/?useUnicode=true&characterEncoding=utf8

#rabbitmq
rabbit.ip=127.0.0.1
rabbit.port=5672
rabbit.user_name=guest
rabbit.password=guest

#redis
redis.ip=127.0.0.1
redis.port=6379

#監(jiān)聽的庫','分割疫鹊,例如: diff.db=user,info,不配置則表示監(jiān)聽全部庫
diff.db=

5 權(quán)限配置司致。需要確保mysql賬戶擁有備庫的全部權(quán)限+所有表的權(quán)限
6 項目啟動:java -jar binlog-access.jar

項目代碼見

https://github.com/littlersmall/binlog-access

路過的麻煩點個星星拆吆,謝謝(__)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個濱河市脂矫,隨后出現(xiàn)的幾起案子枣耀,更是在濱河造成了極大的恐慌,老刑警劉巖庭再,帶你破解...
    沈念sama閱讀 221,548評論 6 515
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件捞奕,死亡現(xiàn)場離奇詭異,居然都是意外死亡佩微,警方通過查閱死者的電腦和手機缝彬,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,497評論 3 399
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來哺眯,“玉大人谷浅,你說我怎么就攤上這事∧套浚” “怎么了一疯?”我有些...
    開封第一講書人閱讀 167,990評論 0 360
  • 文/不壞的土叔 我叫張陵,是天一觀的道長夺姑。 經(jīng)常有香客問我墩邀,道長,這世上最難降的妖魔是什么盏浙? 我笑而不...
    開封第一講書人閱讀 59,618評論 1 296
  • 正文 為了忘掉前任眉睹,我火速辦了婚禮荔茬,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘竹海。我一直安慰自己慕蔚,他們只是感情好,可當我...
    茶點故事閱讀 68,618評論 6 397
  • 文/花漫 我一把揭開白布斋配。 她就那樣靜靜地躺著孔飒,像睡著了一般。 火紅的嫁衣襯著肌膚如雪艰争。 梳的紋絲不亂的頭發(fā)上坏瞄,一...
    開封第一講書人閱讀 52,246評論 1 308
  • 那天,我揣著相機與錄音甩卓,去河邊找鬼鸠匀。 笑死,一個胖子當著我的面吹牛猛频,可吹牛的內(nèi)容都是我干的狮崩。 我是一名探鬼主播,決...
    沈念sama閱讀 40,819評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼鹿寻,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了诽凌?” 一聲冷哼從身側(cè)響起毡熏,我...
    開封第一講書人閱讀 39,725評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎侣诵,沒想到半個月后痢法,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 46,268評論 1 320
  • 正文 獨居荒郊野嶺守林人離奇死亡杜顺,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,356評論 3 340
  • 正文 我和宋清朗相戀三年财搁,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片躬络。...
    茶點故事閱讀 40,488評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡尖奔,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出穷当,到底是詐尸還是另有隱情提茁,我是刑警寧澤,帶...
    沈念sama閱讀 36,181評論 5 350
  • 正文 年R本政府宣布馁菜,位于F島的核電站茴扁,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏汪疮。R本人自食惡果不足惜峭火,卻給世界環(huán)境...
    茶點故事閱讀 41,862評論 3 333
  • 文/蒙蒙 一毁习、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧卖丸,春花似錦蜓洪、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,331評論 0 24
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至粹湃,卻和暖如春恐仑,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背为鳄。 一陣腳步聲響...
    開封第一講書人閱讀 33,445評論 1 272
  • 我被黑心中介騙來泰國打工裳仆, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留,地道東北人孤钦。 一個月前我還...
    沈念sama閱讀 48,897評論 3 376
  • 正文 我出身青樓歧斟,卻偏偏與公主長得像,于是被迫代替她去往敵國和親偏形。 傳聞我的和親對象是個殘疾皇子静袖,可洞房花燭夜當晚...
    茶點故事閱讀 45,500評論 2 359

推薦閱讀更多精彩內(nèi)容