MySQL數(shù)據(jù)同步Redis

一 妒牙、 業(yè)務(wù)場景

使用mysql 數(shù)據(jù)存儲孽亲,Redis 做緩存查詢。

二顷牌、 架構(gòu)如下

三剪芍、實施步驟

1.下載最新版本的https://github.com/alibaba/canal/releases/

2. 可以在linux 或者windows上執(zhí)行都可以。 ?修改canal/conf/example/instance.properties

3.啟動 canal

./canal/startup.sh

4.java 連接canal

4.1 ? 配置pom.xml

redis.clientsjedis2.4.2


<dependency>? ? ? ? <groupId>com.alibaba.otter</groupId>? ? ? ? <artifactId>canal.client</artifactId>? ? ? ? <version>1.0.24</version>?

?</dependency>?

注意此處spring boot里面配置的時候會報錯窟蓝。需要處理一下罪裹。

4.2 ?編寫canal客戶的程序


import java.net.InetSocketAddress; ?import java.util.List; ?

import com.alibaba.fastjson.JSONObject;

import com.alibaba.otter.canal.client.CanalConnector; ?

import com.alibaba.otter.canal.common.utils.AddressUtils; ?

import com.alibaba.otter.canal.protocol.Message; ?

import com.alibaba.otter.canal.protocol.CanalEntry.Column; ?

import com.alibaba.otter.canal.protocol.CanalEntry.Entry; ?

import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; ?

import com.alibaba.otter.canal.protocol.CanalEntry.EventType; ?

import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; ?

import com.alibaba.otter.canal.protocol.CanalEntry.RowData; ?

import com.alibaba.otter.canal.client.*; ?

public class CanalClient{ ?

? ?public static void main(String args[]) { ?

? ? ? ?CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), ?

? ? ? ? ? ? ? ?11111), "example", "", ""); ?

? ? ? ?int batchSize = 1000; ?

? ? ? ?try { ?

? ? ? ? ? ?connector.connect(); ?

? ? ? ? ? ?connector.subscribe(".*\\..*"); ?

? ? ? ? ? ?connector.rollback(); ? ?

? ? ? ? ? ?while (true) { ?

? ? ? ? ? ? ? ?Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù) ?

? ? ? ? ? ? ? ?long batchId = message.getId(); ?

? ? ? ? ? ? ? ?int size = message.getEntries().size(); ?

? ? ? ? ? ? ? ?if (batchId == -1 || size == 0) { ?

? ? ? ? ? ? ? ? ? ?try { ?

? ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(1000); ?

? ? ? ? ? ? ? ? ? ?} catch (InterruptedException e) { ?

? ? ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ?

? ? ? ? ? ? ? ? ? ?} ?

? ? ? ? ? ? ? ?} else { ?

? ? ? ? ? ? ? ? ? ?printEntry(message.getEntries()); ?

? ? ? ? ? ? ? ?} ?

? ? ? ? ? ? ? ?connector.ack(batchId); // 提交確認(rèn) ?

? ? ? ? ? ? ? ?// connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù) ?

? ? ? ? ? ?} ?

? ? ? ?} finally { ?

? ? ? ? ? ?connector.disconnect(); ?

? ? ? ?} ?

? ?} ?

? ?private static void printEntry( List<Entry> entrys) { ?

? ? ? ?for (Entry entry : entrys) { ?

? ? ? ? ? ?if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { ?

? ? ? ? ? ? ? ?continue; ?

? ? ? ? ? ?} ?

? ? ? ? ? ?RowChange rowChage = null; ?

? ? ? ? ? ?try { ?

? ? ? ? ? ? ? ?rowChage = RowChange.parseFrom(entry.getStoreValue()); ?

? ? ? ? ? ?} catch (Exception e) { ?

? ? ? ? ? ? ? ?throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), ?

? ? ? ? ? ? ? ? ? ? ? ?e); ?

? ? ? ? ? ?} ?

? ? ? ? ? ?EventType eventType = rowChage.getEventType(); ?

? ? ? ? ? ?System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", ?

? ? ? ? ? ? ? ? ? ?entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), ?

? ? ? ? ? ? ? ? ? ?entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), ?

? ? ? ? ? ? ? ? ? ?eventType)); ?

? ? ? ? ? ?for (RowData rowData : rowChage.getRowDatasList()) { ?

? ? ? ? ? ? ? ?if (eventType == EventType.DELETE) { ?

? ? ? ? ? ? ? ? ? ?redisDelete(rowData.getBeforeColumnsList()); ?

? ? ? ? ? ? ? ?} else if (eventType == EventType.INSERT) { ?

? ? ? ? ? ? ? ? ? ?redisInsert(rowData.getAfterColumnsList()); ?

? ? ? ? ? ? ? ?} else { ?

? ? ? ? ? ? ? ? ? ?System.out.println("-------> before"); ?

? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList()); ?

? ? ? ? ? ? ? ? ? ?System.out.println("-------> after"); ?

? ? ? ? ? ? ? ? ? ?redisUpdate(rowData.getAfterColumnsList()); ?

? ? ? ? ? ? ? ?} ?

? ? ? ? ? ?} ?

? ? ? ?} ?

? ?} ?

? ?private static void printColumn( List<Column> columns) { ?

? ? ? ?for (Column column : columns) { ?

? ? ? ? ? ?System.out.println(column.getName() + " : " + column.getValue() + " ? ?update=" + column.getUpdated()); ?

? ? ? ?} ?

? ?} ?

? private static void redisInsert( List<Column> columns){

? ? ? JSONObject json=new JSONObject();

? ? ? for (Column column : columns) { ?

? ? ? ? ? json.put(column.getName(), column.getValue()); ?

? ? ? ?} ?

? ? ? if(columns.size()>0){

? ? ? ? ? RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());

? ? ? }

? ?}

? private static ?void redisUpdate( List<Column> columns){

? ? ? JSONObject json=new JSONObject();

? ? ? for (Column column : columns) { ?

? ? ? ? ? json.put(column.getName(), column.getValue()); ?

? ? ? ?} ?

? ? ? if(columns.size()>0){

? ? ? ? ? RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());

? ? ? }

? }

? ?private static ?void redisDelete( List<Column> columns){

? ? ? ?JSONObject json=new JSONObject();

? ? ? ? ? for (Column column : columns) { ?

? ? ? ? ? ? ? json.put(column.getName(), column.getValue()); ?

? ? ? ? ? ?} ?

? ? ? ? ? if(columns.size()>0){

? ? ? ? ? ? ? RedisUtil.delKey("user:"+ columns.get(0).getValue());

? ? ? ? ? }

? ?} ??

} ?

//// redis 工具類

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

? ? // Redis服務(wù)器IP

? ? private static String ADDR = "0.0.0.0";

? ? // Redis的端口號

? ? private static int PORT = 6379;

? ? // 訪問密碼

? ? //private static String AUTH = "admin";

? ? // 可用連接實例的最大數(shù)目,默認(rèn)值為8运挫;

? ? // 如果賦值為-1状共,則表示不限制;如果pool已經(jīng)分配了maxActive個jedis實例谁帕,則此時pool的狀態(tài)為exhausted(耗盡)峡继。

? ? private static int MAX_ACTIVE = 1024;

? ? // 控制一個pool最多有多少個狀態(tài)為idle(空閑的)的jedis實例,默認(rèn)值也是8雇卷。

? ? private static int MAX_IDLE = 200;

? ? // 等待可用連接的最大時間鬓椭,單位毫秒,默認(rèn)值為-1关划,表示永不超時小染。如果超過等待時間,則直接拋出JedisConnectionException贮折;

? ? private static int MAX_WAIT = 10000;

? ? // 過期時間

? ? protected static int ?expireTime = 60 * 60 *24;

? ? // 連接池

? ? protected static JedisPool pool;

? ? /**

? ? ?* 靜態(tài)代碼裤翩,只在初次調(diào)用一次

? ? ?*/

? ? static {

? ? ? ? JedisPoolConfig config = new JedisPoolConfig();

? ? ? ? //最大連接數(shù)

? ? ? ? config.setMaxTotal(MAX_ACTIVE);

? ? ? ? //最多空閑實例

? ? ? ? config.setMaxIdle(MAX_IDLE);

? ? ? ? //超時時間

? ? ? ? config.setMaxWaitMillis(MAX_WAIT);

? ? ? ? //

? ? ? ? config.setTestOnBorrow(false);

? ? ? ? pool = new JedisPool(config, ADDR, PORT, 1000);

? ? }

? ? /**

? ? ?* 獲取jedis實例

? ? ?*/

? ? protected static synchronized Jedis getJedis() {

? ? ? ? Jedis jedis = null;

? ? ? ? try {

? ? ? ? ? ? jedis = pool.getResource();

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? if (jedis != null) {

? ? ? ? ? ? ? ? pool.returnBrokenResource(jedis);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? return jedis;

? ? }

? ? /**

? ? ?* 釋放jedis資源

? ? ?* @param jedis

? ? ?* @param isBroken

? ? ?*/

? ? protected static void closeResource(Jedis jedis, boolean isBroken) {

? ? ? ? try {

? ? ? ? ? ? if (isBroken) {

? ? ? ? ? ? ? ? pool.returnBrokenResource(jedis);

? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? pool.returnResource(jedis);

? ? ? ? ? ? }

? ? ? ? } catch (Exception e) {

? ? ? ? }

? ? }

? ? /**

? ? ?* 是否存在key

? ? ?* @param key

? ? ?*/

? ? public static boolean existKey(String key) {

? ? ? ? Jedis jedis = null;

? ? ? ? boolean isBroken = false;

? ? ? ? try {

? ? ? ? ? ? jedis = getJedis();

? ? ? ? ? ? jedis.select(0);

? ? ? ? ? ? return jedis.exists(key);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? isBroken = true;

? ? ? ? } finally {

? ? ? ? ? ? closeResource(jedis, isBroken);

? ? ? ? }

? ? ? ? return false;

? ? }

? ? /**

? ? ?* 刪除key

? ? ?* @param key

? ? ?*/

? ? public static void delKey(String key) {

? ? ? ? Jedis jedis = null;

? ? ? ? boolean isBroken = false;

? ? ? ? try {

? ? ? ? ? ? jedis = getJedis();

? ? ? ? ? ? jedis.select(0);

? ? ? ? ? ? jedis.del(key);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? isBroken = true;

? ? ? ? } finally {

? ? ? ? ? ? closeResource(jedis, isBroken);

? ? ? ? }

? ? }

? ? /**

? ? ?* 取得key的值

? ? ?* @param key

? ? ?*/

? ? public static String stringGet(String key) {

? ? ? ? Jedis jedis = null;

? ? ? ? boolean isBroken = false;

? ? ? ? String lastVal = null;

? ? ? ? try {

? ? ? ? ? ? jedis = getJedis();

? ? ? ? ? ? jedis.select(0);

? ? ? ? ? ? lastVal = jedis.get(key);

? ? ? ? ? ? jedis.expire(key, expireTime);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? isBroken = true;

? ? ? ? } finally {

? ? ? ? ? ? closeResource(jedis, isBroken);

? ? ? ? }

? ? ? ? return lastVal;

? ? }

? ? /**

? ? ?* 添加string數(shù)據(jù)

? ? ?* @param key

? ? ?* @param value

? ? ?*/

? ? public static String stringSet(String key, String value) {

? ? ? ? Jedis jedis = null;

? ? ? ? boolean isBroken = false;

? ? ? ? String lastVal = null;

? ? ? ? try {

? ? ? ? ? ? jedis = getJedis();

? ? ? ? ? ? jedis.select(0);

? ? ? ? ? ? lastVal = jedis.set(key, value);

? ? ? ? ? ? jedis.expire(key, expireTime);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? isBroken = true;

? ? ? ? } finally {

? ? ? ? ? ? closeResource(jedis, isBroken);

? ? ? ? }

? ? ? ? return lastVal;

? ? }

? ? /**

? ? ?* ?添加hash數(shù)據(jù)

? ? ?* @param key

? ? ?* @param field

? ? ?* @param value

? ? ?*/

? ? public static void hashSet(String key, String field, String value) {

? ? ? ? boolean isBroken = false;

? ? ? ? Jedis jedis = null;

? ? ? ? try {

? ? ? ? ? ? jedis = getJedis();

? ? ? ? ? ? if (jedis != null) {

? ? ? ? ? ? ? ? jedis.select(0);

? ? ? ? ? ? ? ? jedis.hset(key, field, value);

? ? ? ? ? ? ? ? jedis.expire(key, expireTime);

? ? ? ? ? ? }

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? isBroken = true;

? ? ? ? } finally {

? ? ? ? ? ? closeResource(jedis, isBroken);

? ? ? ? }

? ? }

}

4.3? canal客戶端啟動....

4.4 ?編寫數(shù)據(jù)庫存儲過程測試數(shù)據(jù) 。

4.4.1 ?寫存儲過程 ? test

????????DROP PROCEDURE test;????

????????DELIMITER $$

????????CREATE PROCEDURE test(IN number INT)

????????BEGIN?

? ? ? ????????DECLARE i INT(8) DEFAULT 0;

? ? ? ????????SET i=1;

? ? ? ????????WHILE i<=number DO

?INSERT INTO student(NAME,age) VALUES('ceshi',i);

?SET i=i+1;

? ? ? ????????END WHILE;

? ? ?END$$

4.4.2 執(zhí)行存儲過程 call test(10000);

備注:

1.如果是同步的兩個表情況下 调榄,相同的Id 將會覆蓋 ?所以建議同步一個表

2.經(jīng)過測試踊赠,3個列,存儲過程連續(xù)insert ?20000條記錄?,沒有明顯延遲每庆。

3.存儲過程insert 30000記錄 筐带,執(zhí)行時間11.826秒 ?,緩存數(shù)據(jù)庫有明顯延遲缤灵。

4.spring boot 做微服務(wù)時伦籍,加入canal 包會報錯蓝晒。 按下面的寫法排除:

?<dependency>

????????????<groupId>com.alibaba.otter</groupId>

????????????<artifactId>canal.client</artifactId>

????????????<version>1.0.24</version>

????????????<exclusions>

????????????????<exclusion>

????????????????????<groupId>ch.qos.logback</groupId>

????????????????????<artifactId>logback-core</artifactId>

????????? ? ? ?</exclusion>

????????? ? ? ?<exclusion>

????????????? ? ???<groupId>ch.qos.logback</groupId>

????????????? ? ??<artifactId>logback-classic</artifactId>

????? ? ? ? ? </exclusion>

? ? ? ? ? ? ?<exclusion>

? ? ? ? ? ? ? ?? <groupId>org.springframework</groupId>

? ? ? ? ? ? ? ? <artifactId>spring</artifactId>

? ? ? ? ? ? </exclusion>

? ? ? ?</exclusions>

</dependency>

5.測試中我使用了spring boot 啟動成功后直接執(zhí)行canal 客戶端程序。需要做如下配置

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationListener;

import org.springframework.context.event.ContextRefreshedEvent;

import org.springframework.stereotype.Component;

/**

?* Created by mark on 2017/9/21.

?*/

@Component

public class ServiceListener implements ApplicationListener<ContextRefreshedEvent> {

? ?@Autowired

? ? private ?CanalClientI canalClientI;

? ? @Override

? ? public void onApplicationEvent(ContextRefreshedEvent event) {

? ? ? ? if(event.getApplicationContext().getParent() == null)//root application context 沒有parent帖鸦,他就是老大.

? ? ? ? {

? ? ? ? ? ? ? ? //需要執(zhí)行的邏輯代碼芝薇,當(dāng)spring容器初始化完成后就會執(zhí)行該方法。

? ? ? ? ? ? ???System.out.println("\n\n\n\n\n______________\n\n\n加載了\n\n_________\n\n");

?canalClientI.run(); ?


? ? ? ? }

? ? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末作儿,一起剝皮案震驚了整個濱河市洛二,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌攻锰,老刑警劉巖晾嘶,帶你破解...
    沈念sama閱讀 218,204評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異口注,居然都是意外死亡变擒,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,091評論 3 395
  • 文/潘曉璐 我一進(jìn)店門寝志,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人策添,你說我怎么就攤上這事材部。” “怎么了唯竹?”我有些...
    開封第一講書人閱讀 164,548評論 0 354
  • 文/不壞的土叔 我叫張陵乐导,是天一觀的道長。 經(jīng)常有香客問我浸颓,道長物臂,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,657評論 1 293
  • 正文 為了忘掉前任产上,我火速辦了婚禮棵磷,結(jié)果婚禮上,老公的妹妹穿的比我還像新娘晋涣。我一直安慰自己仪媒,他們只是感情好,可當(dāng)我...
    茶點故事閱讀 67,689評論 6 392
  • 文/花漫 我一把揭開白布谢鹊。 她就那樣靜靜地躺著算吩,像睡著了一般。 火紅的嫁衣襯著肌膚如雪佃扼。 梳的紋絲不亂的頭發(fā)上偎巢,一...
    開封第一講書人閱讀 51,554評論 1 305
  • 那天,我揣著相機與錄音兼耀,去河邊找鬼压昼。 笑死挎扰,一個胖子當(dāng)著我的面吹牛,可吹牛的內(nèi)容都是我干的巢音。 我是一名探鬼主播遵倦,決...
    沈念sama閱讀 40,302評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼官撼!你這毒婦竟也來了梧躺?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,216評論 0 276
  • 序言:老撾萬榮一對情侶失蹤傲绣,失蹤者是張志新(化名)和其女友劉穎掠哥,沒想到半個月后,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體秃诵,經(jīng)...
    沈念sama閱讀 45,661評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡续搀,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,851評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了菠净。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片禁舷。...
    茶點故事閱讀 39,977評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖毅往,靈堂內(nèi)的尸體忽然破棺而出牵咙,到底是詐尸還是另有隱情,我是刑警寧澤攀唯,帶...
    沈念sama閱讀 35,697評論 5 347
  • 正文 年R本政府宣布洁桌,位于F島的核電站,受9級特大地震影響侯嘀,放射性物質(zhì)發(fā)生泄漏另凌。R本人自食惡果不足惜,卻給世界環(huán)境...
    茶點故事閱讀 41,306評論 3 330
  • 文/蒙蒙 一戒幔、第九天 我趴在偏房一處隱蔽的房頂上張望吠谢。 院中可真熱鬧,春花似錦溪食、人聲如沸囊卜。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,898評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽栅组。三九已至,卻和暖如春枢析,著一層夾襖步出監(jiān)牢的瞬間玉掸,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,019評論 1 270
  • 我被黑心中介騙來泰國打工醒叁, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留司浪,地道東北人泊业。 一個月前我還...
    沈念sama閱讀 48,138評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像啊易,于是被迫代替她去往敵國和親吁伺。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當(dāng)晚...
    茶點故事閱讀 44,927評論 2 355

推薦閱讀更多精彩內(nèi)容