站點間數(shù)據(jù)同步系統(tǒng)設計與實現(xiàn) 201709@wedoctor

項目上最近有在不同地區(qū)的站點間同步應用數(shù)據(jù)的需求宙彪,為此開始了同步系統(tǒng)的設計和開發(fā)矩动。
相關代碼:wangshuai@github


系統(tǒng)的大體架構如下


開發(fā)過程中的問題:

  • 采集數(shù)據(jù)的形式
  • dubbo服務的提供和區(qū)分
  • 被同步數(shù)據(jù)落地的實現(xiàn)
  • 兩個站點間數(shù)據(jù)的關聯(lián)

采集數(shù)據(jù)的形式

在所有的DAO方法(在這里是Mybatis的mapper)上增加一個切面,攔截增释漆、刪悲没、改數(shù)據(jù)的ID,并將數(shù)據(jù)ID + 表名 + 應用名信息通過dubbo調用記錄至同步系統(tǒng)的數(shù)據(jù)庫中男图,這樣當同步任務調用時示姿,同步系統(tǒng)就能根據(jù)記錄的數(shù)據(jù)去相應的應用(業(yè)務系統(tǒng))通過dubbo服務取出具體的業(yè)務數(shù)據(jù)甜橱,并整合、通過HTTP將數(shù)據(jù)發(fā)送出去栈戳。

同時在所有的manager上也增加一個切面岂傲,配合緩存實現(xiàn)相關的事務控制。


dubbo服務的提供和區(qū)分

由于業(yè)務系統(tǒng)的具體數(shù)據(jù)只能由業(yè)務系統(tǒng)提供和操作子檀,同步系統(tǒng)無法直接訪問業(yè)務系統(tǒng)的數(shù)據(jù)庫表镊掖,因此同步系統(tǒng)和業(yè)務系統(tǒng)間的數(shù)據(jù)交互只能通過dubbo服務實現(xiàn)。

每個業(yè)務系統(tǒng)都會提供自己的數(shù)據(jù)服務褂痰,并且功能都相同亩进。因此這里我采用的是同步系統(tǒng)提供一個client包的方式,業(yè)務系統(tǒng)依賴這個client包就會在應用啟動時自動提供相關服務缩歪。

數(shù)據(jù)查詢服務及實現(xiàn):

/**
 * 同步系統(tǒng)查詢各業(yè)務系統(tǒng)數(shù)據(jù)服務
 *
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-14 15:32
 */
public interface SyncDataQueryService {

    /**
     * 查詢數(shù)據(jù)接口
     *
     * @param tabName
     * @param id
     * @return
     */
    ResponseDTO<String> querySyncableData(Long id, String tabName);

}

/**
 * TODO
 *
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-14 13:53
 */
@Component
public class SyncDataQueryServiceImpl implements SyncDataQueryService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataQueryServiceImpl.class);
    
    @Value("${dubbo.application.name}")
    private static String application;

    @Reference
    private MapperConfigService mapperConfigService;

    public ResponseDTO<String> querySyncableData(Long id, String tabName) {
        ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
        try {
            MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
            ......
            
            String mapperClass = mapperConfigDTO.getMapperClass();
            SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
            ......

            Map resultMap = mapper.findSyncableDataById(id);
            ......
            responseDTO.setData(JSON.toJSONString(resultMap));
        } catch (ClassNotFoundException e) {
           ......
        }
        return responseDTO;
    }
}

數(shù)據(jù)落地服務接口及實現(xiàn):

/**
 * @Type SyncOperateService
 * @Desc 同步系統(tǒng)操作各業(yè)務系統(tǒng)數(shù)據(jù)服務
 * @author liuhj 
 * @created 2017年9月20日 下午1:45:55
 * @version 1.0.0
 */
public interface SyncDataOperateService {
    
    /**
     * 根據(jù)操作類型進行同步方法操作接口
     *
     * @param id
     * @param relationId 
     * @param tabName
     * @param params
     * @param operateType
     * @return
     */
    ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params, String operateType);

}

/**
 * @Type SyncOperateServiceImpl
 * @Desc 同步系統(tǒng)操作各業(yè)務系統(tǒng)數(shù)據(jù)服務
 * @author liuhj
 * @created 2017年9月20日 下午1:53:26
 * @version 1.0.0
 */
@Component
public class SyncDataOperateServiceImpl implements SyncDataOperateService {

    private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataOperateServiceImpl.class);

    @Value("${dubbo.application.name}")
    private static String application;

    /**
     * 配置系統(tǒng)服務
     */
    @Reference
    private MapperConfigService mapperConfigService;

    @Override
    public ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params,
            String operateType) {
        ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
        try {
            MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
            ......
            String mapperClass = mapperConfigDTO.getMapperClass();
            SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
            ......
            int count = 0;
            Long callBackId = null;
            if ("I".equalsIgnoreCase(operateType)) {
                count = mapper.insertSyncableData(params);
                callBackId = (long) (int) params.get("id");
            }
            if ("U".equalsIgnoreCase(operateType)) {
                params.put("id", relationId);
                count = mapper.updateSyncableData(params);
                callBackId = null;
            }
            if ("D".equalsIgnoreCase(operateType)) {
                count = mapper.deleteSyncableDataById(relationId);
                callBackId = null;
            }
            ......
            responseDTO.setData(String.valueOf(callBackId));
        } catch (Exception e) {
            LOGGER.error("SyncDataOperateServiceImpl.operateSyncableData ->加載mapper類失敗", e);
            responseDTO.setDataMessage(ExceptionEnum.ERROR.getCode(), "加載mapper類失敗");
            return responseDTO;
        }
        return responseDTO;
    }
}

通過java API在應用啟動時動態(tài)注冊dubbo服務(目的是通過version將不同的服務區(qū)分開來):

/**
 * 各業(yè)務系統(tǒng)注冊不同的服務實現(xiàn)類
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-20 11:13
 */
@Component
public class DubboServiceRegister {

    @Value("${dubbo.application.name}")
    private String application;

    @Value("${dubbo.registry.address}")
    private String zkAddress;

    @Value("${dubbo.protocol.port}")
    private int dubboPort;

    /**
     * 服務實現(xiàn)
     */
    @Resource
    private SyncDataQueryService syncDataQueryService;

    @Resource
    private SyncDataOperateService syncDataOperateService;

    @PostConstruct
    public void regist() {

        //當前應用配置
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName(application);

        //連接注冊中心配置
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress(zkAddress);

        //服務提供者協(xié)議配置
        ProtocolConfig protocolConfig = new ProtocolConfig();
        protocolConfig.setName("dubbo");
        protocolConfig.setPort(dubboPort);

        //服務提供者暴露服務配置
        ServiceConfig<SyncDataQueryService> dataQueryServiceConfig = new ServiceConfig<>();
        dataQueryServiceConfig.setApplication(applicationConfig);
        dataQueryServiceConfig.setRegistry(registryConfig);
        dataQueryServiceConfig.setProtocol(protocolConfig);
        dataQueryServiceConfig.setInterface(SyncDataQueryService.class);
        dataQueryServiceConfig.setRef(syncDataQueryService);
        dataQueryServiceConfig.setVersion(application);

        ServiceConfig<SyncDataOperateService> dataOperateServiceConfig = new ServiceConfig<>();
        dataOperateServiceConfig.setApplication(applicationConfig);
        dataOperateServiceConfig.setRegistry(registryConfig);
        dataOperateServiceConfig.setProtocol(protocolConfig);
        dataOperateServiceConfig.setInterface(SyncDataOperateService.class);
        dataOperateServiceConfig.setRef(syncDataOperateService);
        dataOperateServiceConfig.setVersion(application);

        //暴露及注冊服務
        dataQueryServiceConfig.export();
        dataOperateServiceConfig.export();
    }
}

如此归薛,在同步系統(tǒng)調用服務時,就能通過version區(qū)分到對應的服務驶冒。


被同步數(shù)據(jù)落地的實現(xiàn)

當要同步的數(shù)據(jù)以JSON的形式到達分站點(被同步站點)時苟翻,被API系統(tǒng)接入并通過dubbo將數(shù)據(jù)傳給同步系統(tǒng)韵卤,同步系統(tǒng)再調用各站點的數(shù)據(jù)落地服務實現(xiàn)類骗污,這里就有一個問題:數(shù)據(jù)如何落地到數(shù)據(jù)庫中?如果直接動態(tài)拼接sql直接執(zhí)行是一種比較簡單的實現(xiàn)方式沈条,但是我們這里使用的是mybatis需忿,那么如何找到對應的mapper并動態(tài)執(zhí)行插入、更新蜡歹、刪除的方法屋厘?

在這里我的設計是定義一個父接口:

public interface SyncableMapper {

    Map findSyncableDataById(Long id);

    int insertSyncableData(Map paramMap);

    int updateSyncableData(Map paramMap);

    int deleteSyncableDataById(Long id);

}

這個接口定義了對業(yè)務系統(tǒng)數(shù)據(jù)表進行同步的公共方法,然后我讓需要同步的mapper接口繼承這個公共接口月而,并在mapper-xml里定義sql汗洒,這樣同步時直接操作父接口的引用(實現(xiàn)類從spring上下文中根據(jù)條件動態(tài)獲取):

@Repository("dubboApplicationDAO")
public interface DubboApplicationMapper extends SyncableMapper {

    ......
}

獲取對應mapper并操作的代碼(SyncDataOperateServiceImpl 類中):

//從配置中獲取mapper的類的全限定名
String mapperClass = mapperConfigDTO.getMapperClass();

//動態(tài)獲取操作對應表的mapper
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......

if ("I".equalsIgnoreCase(operateType)) {
   mapper.insertSyncableData(params);
}
if ("U".equalsIgnoreCase(operateType)) {
    params.put("id", relationId);
    count = mapper.updateSyncableData(params);
}
if ("D".equalsIgnoreCase(operateType)) {
    count = mapper.deleteSyncableDataById(relationId);
}

那么自然的父款,有了上述規(guī)則后溢谤,采集同步數(shù)據(jù)時我們只記錄繼承自我們定義的父接口的mapper的操作數(shù)據(jù)(在執(zhí)行采集切面時加入判斷):

public void doCollect(JoinPoint point) {
        Class mapperType = point.getSignature().getDeclaringType();

        if(mapperType.equals(SyncableMapper.class)) {
            //如果執(zhí)行的是SyncableMapper接口定義的方法 則不執(zhí)行切面邏輯(只對實際的業(yè)務操作進行攔截)
            System.out.println("執(zhí)行的是SyncableMapper接口定義的方法 不執(zhí)行切面邏輯");
            return;
        }

        Class[] classes = point.getSignature().getDeclaringType().getInterfaces();
        boolean flag = Arrays.stream(classes).anyMatch((c) -> c.equals(SyncableMapper.class));
        if(!flag) {
            //如果被攔截的mapper沒有實現(xiàn)SyncableMapper接口,則不對其進行同步操作
            System.out.println("被攔截的mapper沒有實現(xiàn)SyncableMapper接口憨攒,不對其進行同步操作");
            return;
        }

        String fullMethodSignature = point.getSignature().getDeclaringTypeName() + "." + point.getSignature().getName();
        String sqlType = mybatisSqlUtils.getSqlType(fullMethodSignature);

        if("SELECT".equalsIgnoreCase(sqlType)) {
            return;
        }
       ......//
}

兩個站點間數(shù)據(jù)的關聯(lián)

假設主站點中的某張表的某一條數(shù)據(jù)世杀, id為A, 將其同步至分站點對應表中后的id是B肝集,那么我們應該在數(shù)據(jù)在分站點中插入時瞻坝,記錄A、B兩個id之間的關聯(lián)杏瞻,方便在做同步更新或者刪除操作時使用:

CREATE TABLE `cloud_sync_relation` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '序號',
  `gmt_created` datetime NOT NULL COMMENT '創(chuàng)建時間',
  `gmt_modified` datetime NOT NULL COMMENT '修改時間',
  `remark` varchar(255) NOT NULL COMMENT '備注',
  `is_deleted` tinyint(2) NOT NULL COMMENT '是否刪除 0否1是',
  `from_id` bigint(20) NOT NULL COMMENT '關聯(lián)方id',
  `to_id` bigint(20) NOT NULL COMMENT '被關聯(lián)方id',
  `relate_site_code` varchar(50) NOT NULL COMMENT '關聯(lián)站點code',
  `relate_resource` varchar(50) NOT NULL COMMENT '關聯(lián)表名',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='同步系統(tǒng)關聯(lián)關系'

附:

動態(tài)獲取dubbo服務對象幫助類:

/**
 * 動態(tài)獲取dubbo服務對象幫助類
 * @author wangshuai
 * @version V1.0
 * @since 2017-09-15 16:34
 */
@Component
public class DubboUtil {

    public static final String VERSION = "VERSION";

    public static final String GROUP = "GROUP";

    public static final String ZOOKEEPERURL = "ZOOKEEPERURL";

    public static final String APPLICATION = "APPLICATION";

    @Value("${dubbo.application.name}")
    private String appName;

    @Value("${dubbo.registry.address}")
    private String zkUrl;

    public <T> T getDubboService(Class<T> clazz, Map<String, String> paramMap){
        ApplicationConfig applicationConfig = new ApplicationConfig();

        String app = paramMap.get(APPLICATION);
        if(app != null && !"".equals(app)) {
            applicationConfig.setName(app);
        } else {
            applicationConfig.setName(appName);
        }


        RegistryConfig registryConfig = new RegistryConfig();

        String zookeeperurl = paramMap.get(ZOOKEEPERURL);
        if(zookeeperurl != null && !"".equals(zookeeperurl)) {
            registryConfig.setAddress(zookeeperurl);
        } else {
            registryConfig.setAddress(zkUrl);
        }

        applicationConfig.setRegistry(registryConfig);

        ReferenceConfig<T> rc = new ReferenceConfig<T>();
        rc.setApplication(applicationConfig);

        String version = paramMap.get(VERSION);
        if(version != null && !"".equals(version)) {
            rc.setVersion(version);
        }

        String group = paramMap.get(GROUP);
        if(group != null && !"".equals(group)) {
            rc.setGroup(group);
        }
        rc.setInterface(clazz);

        //緩存ReferenceConfig對象,防止內存和連接泄露
        ReferenceConfigCache cache = ReferenceConfigCache.getCache();
        return cache.get(rc);
    }

}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末所刀,一起剝皮案震驚了整個濱河市衙荐,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌勉痴,老刑警劉巖赫模,帶你破解...
    沈念sama閱讀 218,036評論 6 506
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異蒸矛,居然都是意外死亡瀑罗,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,046評論 3 395
  • 文/潘曉璐 我一進店門雏掠,熙熙樓的掌柜王于貴愁眉苦臉地迎上來斩祭,“玉大人,你說我怎么就攤上這事乡话〈菝担” “怎么了?”我有些...
    開封第一講書人閱讀 164,411評論 0 354
  • 文/不壞的土叔 我叫張陵绑青,是天一觀的道長诬像。 經(jīng)常有香客問我,道長闸婴,這世上最難降的妖魔是什么坏挠? 我笑而不...
    開封第一講書人閱讀 58,622評論 1 293
  • 正文 為了忘掉前任,我火速辦了婚禮邪乍,結果婚禮上降狠,老公的妹妹穿的比我還像新娘。我一直安慰自己庇楞,他們只是感情好榜配,可當我...
    茶點故事閱讀 67,661評論 6 392
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著吕晌,像睡著了一般蛋褥。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上睛驳,一...
    開封第一講書人閱讀 51,521評論 1 304
  • 那天烙心,我揣著相機與錄音,去河邊找鬼柏靶。 笑死弃理,一個胖子當著我的面吹牛,可吹牛的內容都是我干的屎蜓。 我是一名探鬼主播痘昌,決...
    沈念sama閱讀 40,288評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了辆苔?” 一聲冷哼從身側響起算灸,我...
    開封第一講書人閱讀 39,200評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎驻啤,沒想到半個月后菲驴,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體,經(jīng)...
    沈念sama閱讀 45,644評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡骑冗,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,837評論 3 336
  • 正文 我和宋清朗相戀三年赊瞬,在試婚紗的時候發(fā)現(xiàn)自己被綠了。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片贼涩。...
    茶點故事閱讀 39,953評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡巧涧,死狀恐怖,靈堂內的尸體忽然破棺而出遥倦,到底是詐尸還是另有隱情谤绳,我是刑警寧澤,帶...
    沈念sama閱讀 35,673評論 5 346
  • 正文 年R本政府宣布袒哥,位于F島的核電站缩筛,受9級特大地震影響,放射性物質發(fā)生泄漏堡称。R本人自食惡果不足惜瞎抛,卻給世界環(huán)境...
    茶點故事閱讀 41,281評論 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望粮呢。 院中可真熱鬧婿失,春花似錦钞艇、人聲如沸啄寡。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,889評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽挺物。三九已至,卻和暖如春飘弧,著一層夾襖步出監(jiān)牢的瞬間识藤,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 33,011評論 1 269
  • 我被黑心中介騙來泰國打工次伶, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留痴昧,地道東北人。 一個月前我還...
    沈念sama閱讀 48,119評論 3 370
  • 正文 我出身青樓冠王,卻偏偏與公主長得像赶撰,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,901評論 2 355

推薦閱讀更多精彩內容