一 妒牙、 業(yè)務(wù)場景
使用mysql 數(shù)據(jù)存儲孽亲,Redis 做緩存查詢。
二顷牌、 架構(gòu)如下
三剪芍、實施步驟
1.下載最新版本的https://github.com/alibaba/canal/releases/
2. 可以在linux 或者windows上執(zhí)行都可以。 ?修改canal/conf/example/instance.properties
3.啟動 canal
./canal/startup.sh
4.java 連接canal
4.1 ? 配置pom.xml
redis.clientsjedis2.4.2
<dependency>? ? ? ? <groupId>com.alibaba.otter</groupId>? ? ? ? <artifactId>canal.client</artifactId>? ? ? ? <version>1.0.24</version>?
?</dependency>?
注意此處spring boot里面配置的時候會報錯窟蓝。需要處理一下罪裹。
4.2 ?編寫canal客戶的程序
import java.net.InetSocketAddress; ?import java.util.List; ?
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector; ?
import com.alibaba.otter.canal.common.utils.AddressUtils; ?
import com.alibaba.otter.canal.protocol.Message; ?
import com.alibaba.otter.canal.protocol.CanalEntry.Column; ?
import com.alibaba.otter.canal.protocol.CanalEntry.Entry; ?
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; ?
import com.alibaba.otter.canal.protocol.CanalEntry.EventType; ?
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; ?
import com.alibaba.otter.canal.protocol.CanalEntry.RowData; ?
import com.alibaba.otter.canal.client.*; ?
public class CanalClient{ ?
? ?public static void main(String args[]) { ?
? ? ? ?CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), ?
? ? ? ? ? ? ? ?11111), "example", "", ""); ?
? ? ? ?int batchSize = 1000; ?
? ? ? ?try { ?
? ? ? ? ? ?connector.connect(); ?
? ? ? ? ? ?connector.subscribe(".*\\..*"); ?
? ? ? ? ? ?connector.rollback(); ? ?
? ? ? ? ? ?while (true) { ?
? ? ? ? ? ? ? ?Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù) ?
? ? ? ? ? ? ? ?long batchId = message.getId(); ?
? ? ? ? ? ? ? ?int size = message.getEntries().size(); ?
? ? ? ? ? ? ? ?if (batchId == -1 || size == 0) { ?
? ? ? ? ? ? ? ? ? ?try { ?
? ? ? ? ? ? ? ? ? ? ? ?Thread.sleep(1000); ?
? ? ? ? ? ? ? ? ? ?} catch (InterruptedException e) { ?
? ? ? ? ? ? ? ? ? ? ? ?e.printStackTrace(); ?
? ? ? ? ? ? ? ? ? ?} ?
? ? ? ? ? ? ? ?} else { ?
? ? ? ? ? ? ? ? ? ?printEntry(message.getEntries()); ?
? ? ? ? ? ? ? ?} ?
? ? ? ? ? ? ? ?connector.ack(batchId); // 提交確認(rèn) ?
? ? ? ? ? ? ? ?// connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù) ?
? ? ? ? ? ?} ?
? ? ? ?} finally { ?
? ? ? ? ? ?connector.disconnect(); ?
? ? ? ?} ?
? ?} ?
? ?private static void printEntry( List<Entry> entrys) { ?
? ? ? ?for (Entry entry : entrys) { ?
? ? ? ? ? ?if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { ?
? ? ? ? ? ? ? ?continue; ?
? ? ? ? ? ?} ?
? ? ? ? ? ?RowChange rowChage = null; ?
? ? ? ? ? ?try { ?
? ? ? ? ? ? ? ?rowChage = RowChange.parseFrom(entry.getStoreValue()); ?
? ? ? ? ? ?} catch (Exception e) { ?
? ? ? ? ? ? ? ?throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), ?
? ? ? ? ? ? ? ? ? ? ? ?e); ?
? ? ? ? ? ?} ?
? ? ? ? ? ?EventType eventType = rowChage.getEventType(); ?
? ? ? ? ? ?System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", ?
? ? ? ? ? ? ? ? ? ?entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), ?
? ? ? ? ? ? ? ? ? ?entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), ?
? ? ? ? ? ? ? ? ? ?eventType)); ?
? ? ? ? ? ?for (RowData rowData : rowChage.getRowDatasList()) { ?
? ? ? ? ? ? ? ?if (eventType == EventType.DELETE) { ?
? ? ? ? ? ? ? ? ? ?redisDelete(rowData.getBeforeColumnsList()); ?
? ? ? ? ? ? ? ?} else if (eventType == EventType.INSERT) { ?
? ? ? ? ? ? ? ? ? ?redisInsert(rowData.getAfterColumnsList()); ?
? ? ? ? ? ? ? ?} else { ?
? ? ? ? ? ? ? ? ? ?System.out.println("-------> before"); ?
? ? ? ? ? ? ? ? ? ?printColumn(rowData.getBeforeColumnsList()); ?
? ? ? ? ? ? ? ? ? ?System.out.println("-------> after"); ?
? ? ? ? ? ? ? ? ? ?redisUpdate(rowData.getAfterColumnsList()); ?
? ? ? ? ? ? ? ?} ?
? ? ? ? ? ?} ?
? ? ? ?} ?
? ?} ?
? ?private static void printColumn( List<Column> columns) { ?
? ? ? ?for (Column column : columns) { ?
? ? ? ? ? ?System.out.println(column.getName() + " : " + column.getValue() + " ? ?update=" + column.getUpdated()); ?
? ? ? ?} ?
? ?} ?
? private static void redisInsert( List<Column> columns){
? ? ? JSONObject json=new JSONObject();
? ? ? for (Column column : columns) { ?
? ? ? ? ? json.put(column.getName(), column.getValue()); ?
? ? ? ?} ?
? ? ? if(columns.size()>0){
? ? ? ? ? RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
? ? ? }
? ?}
? private static ?void redisUpdate( List<Column> columns){
? ? ? JSONObject json=new JSONObject();
? ? ? for (Column column : columns) { ?
? ? ? ? ? json.put(column.getName(), column.getValue()); ?
? ? ? ?} ?
? ? ? if(columns.size()>0){
? ? ? ? ? RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
? ? ? }
? }
? ?private static ?void redisDelete( List<Column> columns){
? ? ? ?JSONObject json=new JSONObject();
? ? ? ? ? for (Column column : columns) { ?
? ? ? ? ? ? ? json.put(column.getName(), column.getValue()); ?
? ? ? ? ? ?} ?
? ? ? ? ? if(columns.size()>0){
? ? ? ? ? ? ? RedisUtil.delKey("user:"+ columns.get(0).getValue());
? ? ? ? ? }
? ?} ??
} ?
//// redis 工具類
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
? ? // Redis服務(wù)器IP
? ? private static String ADDR = "0.0.0.0";
? ? // Redis的端口號
? ? private static int PORT = 6379;
? ? // 訪問密碼
? ? //private static String AUTH = "admin";
? ? // 可用連接實例的最大數(shù)目,默認(rèn)值為8运挫;
? ? // 如果賦值為-1状共,則表示不限制;如果pool已經(jīng)分配了maxActive個jedis實例谁帕,則此時pool的狀態(tài)為exhausted(耗盡)峡继。
? ? private static int MAX_ACTIVE = 1024;
? ? // 控制一個pool最多有多少個狀態(tài)為idle(空閑的)的jedis實例,默認(rèn)值也是8雇卷。
? ? private static int MAX_IDLE = 200;
? ? // 等待可用連接的最大時間鬓椭,單位毫秒,默認(rèn)值為-1关划,表示永不超時小染。如果超過等待時間,則直接拋出JedisConnectionException贮折;
? ? private static int MAX_WAIT = 10000;
? ? // 過期時間
? ? protected static int ?expireTime = 60 * 60 *24;
? ? // 連接池
? ? protected static JedisPool pool;
? ? /**
? ? ?* 靜態(tài)代碼裤翩,只在初次調(diào)用一次
? ? ?*/
? ? static {
? ? ? ? JedisPoolConfig config = new JedisPoolConfig();
? ? ? ? //最大連接數(shù)
? ? ? ? config.setMaxTotal(MAX_ACTIVE);
? ? ? ? //最多空閑實例
? ? ? ? config.setMaxIdle(MAX_IDLE);
? ? ? ? //超時時間
? ? ? ? config.setMaxWaitMillis(MAX_WAIT);
? ? ? ? //
? ? ? ? config.setTestOnBorrow(false);
? ? ? ? pool = new JedisPool(config, ADDR, PORT, 1000);
? ? }
? ? /**
? ? ?* 獲取jedis實例
? ? ?*/
? ? protected static synchronized Jedis getJedis() {
? ? ? ? Jedis jedis = null;
? ? ? ? try {
? ? ? ? ? ? jedis = pool.getResource();
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? if (jedis != null) {
? ? ? ? ? ? ? ? pool.returnBrokenResource(jedis);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return jedis;
? ? }
? ? /**
? ? ?* 釋放jedis資源
? ? ?* @param jedis
? ? ?* @param isBroken
? ? ?*/
? ? protected static void closeResource(Jedis jedis, boolean isBroken) {
? ? ? ? try {
? ? ? ? ? ? if (isBroken) {
? ? ? ? ? ? ? ? pool.returnBrokenResource(jedis);
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? pool.returnResource(jedis);
? ? ? ? ? ? }
? ? ? ? } catch (Exception e) {
? ? ? ? }
? ? }
? ? /**
? ? ?* 是否存在key
? ? ?* @param key
? ? ?*/
? ? public static boolean existKey(String key) {
? ? ? ? Jedis jedis = null;
? ? ? ? boolean isBroken = false;
? ? ? ? try {
? ? ? ? ? ? jedis = getJedis();
? ? ? ? ? ? jedis.select(0);
? ? ? ? ? ? return jedis.exists(key);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? isBroken = true;
? ? ? ? } finally {
? ? ? ? ? ? closeResource(jedis, isBroken);
? ? ? ? }
? ? ? ? return false;
? ? }
? ? /**
? ? ?* 刪除key
? ? ?* @param key
? ? ?*/
? ? public static void delKey(String key) {
? ? ? ? Jedis jedis = null;
? ? ? ? boolean isBroken = false;
? ? ? ? try {
? ? ? ? ? ? jedis = getJedis();
? ? ? ? ? ? jedis.select(0);
? ? ? ? ? ? jedis.del(key);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? isBroken = true;
? ? ? ? } finally {
? ? ? ? ? ? closeResource(jedis, isBroken);
? ? ? ? }
? ? }
? ? /**
? ? ?* 取得key的值
? ? ?* @param key
? ? ?*/
? ? public static String stringGet(String key) {
? ? ? ? Jedis jedis = null;
? ? ? ? boolean isBroken = false;
? ? ? ? String lastVal = null;
? ? ? ? try {
? ? ? ? ? ? jedis = getJedis();
? ? ? ? ? ? jedis.select(0);
? ? ? ? ? ? lastVal = jedis.get(key);
? ? ? ? ? ? jedis.expire(key, expireTime);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? isBroken = true;
? ? ? ? } finally {
? ? ? ? ? ? closeResource(jedis, isBroken);
? ? ? ? }
? ? ? ? return lastVal;
? ? }
? ? /**
? ? ?* 添加string數(shù)據(jù)
? ? ?* @param key
? ? ?* @param value
? ? ?*/
? ? public static String stringSet(String key, String value) {
? ? ? ? Jedis jedis = null;
? ? ? ? boolean isBroken = false;
? ? ? ? String lastVal = null;
? ? ? ? try {
? ? ? ? ? ? jedis = getJedis();
? ? ? ? ? ? jedis.select(0);
? ? ? ? ? ? lastVal = jedis.set(key, value);
? ? ? ? ? ? jedis.expire(key, expireTime);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? isBroken = true;
? ? ? ? } finally {
? ? ? ? ? ? closeResource(jedis, isBroken);
? ? ? ? }
? ? ? ? return lastVal;
? ? }
? ? /**
? ? ?* ?添加hash數(shù)據(jù)
? ? ?* @param key
? ? ?* @param field
? ? ?* @param value
? ? ?*/
? ? public static void hashSet(String key, String field, String value) {
? ? ? ? boolean isBroken = false;
? ? ? ? Jedis jedis = null;
? ? ? ? try {
? ? ? ? ? ? jedis = getJedis();
? ? ? ? ? ? if (jedis != null) {
? ? ? ? ? ? ? ? jedis.select(0);
? ? ? ? ? ? ? ? jedis.hset(key, field, value);
? ? ? ? ? ? ? ? jedis.expire(key, expireTime);
? ? ? ? ? ? }
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? isBroken = true;
? ? ? ? } finally {
? ? ? ? ? ? closeResource(jedis, isBroken);
? ? ? ? }
? ? }
}
4.3? canal客戶端啟動....
4.4 ?編寫數(shù)據(jù)庫存儲過程測試數(shù)據(jù) 。
4.4.1 ?寫存儲過程 ? test
????????DROP PROCEDURE test;????
????????DELIMITER $$
????????CREATE PROCEDURE test(IN number INT)
????????BEGIN?
? ? ? ????????DECLARE i INT(8) DEFAULT 0;
? ? ? ????????SET i=1;
? ? ? ????????WHILE i<=number DO
?INSERT INTO student(NAME,age) VALUES('ceshi',i);
?SET i=i+1;
? ? ? ????????END WHILE;
? ? ?END$$
4.4.2 執(zhí)行存儲過程 call test(10000);
備注:
1.如果是同步的兩個表情況下 调榄,相同的Id 將會覆蓋 ?所以建議同步一個表
2.經(jīng)過測試踊赠,3個列,存儲過程連續(xù)insert ?20000條記錄?,沒有明顯延遲每庆。
3.存儲過程insert 30000記錄 筐带,執(zhí)行時間11.826秒 ?,緩存數(shù)據(jù)庫有明顯延遲缤灵。
4.spring boot 做微服務(wù)時伦籍,加入canal 包會報錯蓝晒。 按下面的寫法排除:
?<dependency>
????????????<groupId>com.alibaba.otter</groupId>
????????????<artifactId>canal.client</artifactId>
????????????<version>1.0.24</version>
????????????<exclusions>
????????????????<exclusion>
????????????????????<groupId>ch.qos.logback</groupId>
????????????????????<artifactId>logback-core</artifactId>
????????? ? ? ?</exclusion>
????????? ? ? ?<exclusion>
????????????? ? ???<groupId>ch.qos.logback</groupId>
????????????? ? ??<artifactId>logback-classic</artifactId>
????? ? ? ? ? </exclusion>
? ? ? ? ? ? ?<exclusion>
? ? ? ? ? ? ? ?? <groupId>org.springframework</groupId>
? ? ? ? ? ? ? ? <artifactId>spring</artifactId>
? ? ? ? ? ? </exclusion>
? ? ? ?</exclusions>
</dependency>
5.測試中我使用了spring boot 啟動成功后直接執(zhí)行canal 客戶端程序。需要做如下配置
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
?* Created by mark on 2017/9/21.
?*/
@Component
public class ServiceListener implements ApplicationListener<ContextRefreshedEvent> {
? ?@Autowired
? ? private ?CanalClientI canalClientI;
? ? @Override
? ? public void onApplicationEvent(ContextRefreshedEvent event) {
? ? ? ? if(event.getApplicationContext().getParent() == null)//root application context 沒有parent帖鸦,他就是老大.
? ? ? ? {
? ? ? ? ? ? ? ? //需要執(zhí)行的邏輯代碼芝薇,當(dāng)spring容器初始化完成后就會執(zhí)行該方法。
? ? ? ? ? ? ???System.out.println("\n\n\n\n\n______________\n\n\n加載了\n\n_________\n\n");
?canalClientI.run(); ?
? ? ? ? }
? ? }
}