項目上最近有在不同地區(qū)的站點間同步應用數(shù)據(jù)的需求宙彪,為此開始了同步系統(tǒng)的設計和開發(fā)矩动。
相關代碼:wangshuai@github
系統(tǒng)的大體架構如下:
開發(fā)過程中的問題:
- 采集數(shù)據(jù)的形式
- dubbo服務的提供和區(qū)分
- 被同步數(shù)據(jù)落地的實現(xiàn)
- 兩個站點間數(shù)據(jù)的關聯(lián)
采集數(shù)據(jù)的形式
在所有的DAO方法(在這里是Mybatis的mapper)上增加一個切面,攔截增释漆、刪悲没、改數(shù)據(jù)的ID,并將數(shù)據(jù)ID + 表名 + 應用名信息通過dubbo調用記錄至同步系統(tǒng)的數(shù)據(jù)庫中男图,這樣當同步任務調用時示姿,同步系統(tǒng)就能根據(jù)記錄的數(shù)據(jù)去相應的應用(業(yè)務系統(tǒng))通過dubbo服務取出具體的業(yè)務數(shù)據(jù)甜橱,并整合、通過HTTP將數(shù)據(jù)發(fā)送出去栈戳。
同時在所有的manager上也增加一個切面岂傲,配合緩存實現(xiàn)相關的事務控制。
dubbo服務的提供和區(qū)分
由于業(yè)務系統(tǒng)的具體數(shù)據(jù)只能由業(yè)務系統(tǒng)提供和操作子檀,同步系統(tǒng)無法直接訪問業(yè)務系統(tǒng)的數(shù)據(jù)庫表镊掖,因此同步系統(tǒng)和業(yè)務系統(tǒng)間的數(shù)據(jù)交互只能通過dubbo服務實現(xiàn)。
每個業(yè)務系統(tǒng)都會提供自己的數(shù)據(jù)服務褂痰,并且功能都相同亩进。因此這里我采用的是同步系統(tǒng)提供一個client包的方式,業(yè)務系統(tǒng)依賴這個client包就會在應用啟動時自動提供相關服務缩歪。
數(shù)據(jù)查詢服務及實現(xiàn):
/**
* 同步系統(tǒng)查詢各業(yè)務系統(tǒng)數(shù)據(jù)服務
*
* @author wangshuai
* @version V1.0
* @since 2017-09-14 15:32
*/
public interface SyncDataQueryService {
/**
* 查詢數(shù)據(jù)接口
*
* @param tabName
* @param id
* @return
*/
ResponseDTO<String> querySyncableData(Long id, String tabName);
}
/**
* TODO
*
* @author wangshuai
* @version V1.0
* @since 2017-09-14 13:53
*/
@Component
public class SyncDataQueryServiceImpl implements SyncDataQueryService {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataQueryServiceImpl.class);
@Value("${dubbo.application.name}")
private static String application;
@Reference
private MapperConfigService mapperConfigService;
public ResponseDTO<String> querySyncableData(Long id, String tabName) {
ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
try {
MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
......
String mapperClass = mapperConfigDTO.getMapperClass();
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......
Map resultMap = mapper.findSyncableDataById(id);
......
responseDTO.setData(JSON.toJSONString(resultMap));
} catch (ClassNotFoundException e) {
......
}
return responseDTO;
}
}
數(shù)據(jù)落地服務接口及實現(xiàn):
/**
* @Type SyncOperateService
* @Desc 同步系統(tǒng)操作各業(yè)務系統(tǒng)數(shù)據(jù)服務
* @author liuhj
* @created 2017年9月20日 下午1:45:55
* @version 1.0.0
*/
public interface SyncDataOperateService {
/**
* 根據(jù)操作類型進行同步方法操作接口
*
* @param id
* @param relationId
* @param tabName
* @param params
* @param operateType
* @return
*/
ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params, String operateType);
}
/**
* @Type SyncOperateServiceImpl
* @Desc 同步系統(tǒng)操作各業(yè)務系統(tǒng)數(shù)據(jù)服務
* @author liuhj
* @created 2017年9月20日 下午1:53:26
* @version 1.0.0
*/
@Component
public class SyncDataOperateServiceImpl implements SyncDataOperateService {
private static final Logger LOGGER = LoggerFactory.getLogger(SyncDataOperateServiceImpl.class);
@Value("${dubbo.application.name}")
private static String application;
/**
* 配置系統(tǒng)服務
*/
@Reference
private MapperConfigService mapperConfigService;
@Override
public ResponseDTO<String> operateSyncableData(Long id, Long relationId, String tabName, Map<String, Object> params,
String operateType) {
ResponseDTO<String> responseDTO = new ResponseDTO<>(ExceptionEnum.SUCCESS);
try {
MapperConfigDTO mapperConfigDTO = mapperConfigService.findByAppAndTabname(application, tabName).getData();
......
String mapperClass = mapperConfigDTO.getMapperClass();
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......
int count = 0;
Long callBackId = null;
if ("I".equalsIgnoreCase(operateType)) {
count = mapper.insertSyncableData(params);
callBackId = (long) (int) params.get("id");
}
if ("U".equalsIgnoreCase(operateType)) {
params.put("id", relationId);
count = mapper.updateSyncableData(params);
callBackId = null;
}
if ("D".equalsIgnoreCase(operateType)) {
count = mapper.deleteSyncableDataById(relationId);
callBackId = null;
}
......
responseDTO.setData(String.valueOf(callBackId));
} catch (Exception e) {
LOGGER.error("SyncDataOperateServiceImpl.operateSyncableData ->加載mapper類失敗", e);
responseDTO.setDataMessage(ExceptionEnum.ERROR.getCode(), "加載mapper類失敗");
return responseDTO;
}
return responseDTO;
}
}
通過java API在應用啟動時動態(tài)注冊dubbo服務(目的是通過version將不同的服務區(qū)分開來):
/**
* 各業(yè)務系統(tǒng)注冊不同的服務實現(xiàn)類
* @author wangshuai
* @version V1.0
* @since 2017-09-20 11:13
*/
@Component
public class DubboServiceRegister {
@Value("${dubbo.application.name}")
private String application;
@Value("${dubbo.registry.address}")
private String zkAddress;
@Value("${dubbo.protocol.port}")
private int dubboPort;
/**
* 服務實現(xiàn)
*/
@Resource
private SyncDataQueryService syncDataQueryService;
@Resource
private SyncDataOperateService syncDataOperateService;
@PostConstruct
public void regist() {
//當前應用配置
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName(application);
//連接注冊中心配置
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(zkAddress);
//服務提供者協(xié)議配置
ProtocolConfig protocolConfig = new ProtocolConfig();
protocolConfig.setName("dubbo");
protocolConfig.setPort(dubboPort);
//服務提供者暴露服務配置
ServiceConfig<SyncDataQueryService> dataQueryServiceConfig = new ServiceConfig<>();
dataQueryServiceConfig.setApplication(applicationConfig);
dataQueryServiceConfig.setRegistry(registryConfig);
dataQueryServiceConfig.setProtocol(protocolConfig);
dataQueryServiceConfig.setInterface(SyncDataQueryService.class);
dataQueryServiceConfig.setRef(syncDataQueryService);
dataQueryServiceConfig.setVersion(application);
ServiceConfig<SyncDataOperateService> dataOperateServiceConfig = new ServiceConfig<>();
dataOperateServiceConfig.setApplication(applicationConfig);
dataOperateServiceConfig.setRegistry(registryConfig);
dataOperateServiceConfig.setProtocol(protocolConfig);
dataOperateServiceConfig.setInterface(SyncDataOperateService.class);
dataOperateServiceConfig.setRef(syncDataOperateService);
dataOperateServiceConfig.setVersion(application);
//暴露及注冊服務
dataQueryServiceConfig.export();
dataOperateServiceConfig.export();
}
}
如此归薛,在同步系統(tǒng)調用服務時,就能通過version區(qū)分到對應的服務驶冒。
被同步數(shù)據(jù)落地的實現(xiàn)
當要同步的數(shù)據(jù)以JSON的形式到達分站點(被同步站點)時苟翻,被API系統(tǒng)接入并通過dubbo將數(shù)據(jù)傳給同步系統(tǒng)韵卤,同步系統(tǒng)再調用各站點的數(shù)據(jù)落地服務實現(xiàn)類骗污,這里就有一個問題:數(shù)據(jù)如何落地到數(shù)據(jù)庫中?如果直接動態(tài)拼接sql直接執(zhí)行是一種比較簡單的實現(xiàn)方式沈条,但是我們這里使用的是mybatis需忿,那么如何找到對應的mapper并動態(tài)執(zhí)行插入、更新蜡歹、刪除的方法屋厘?
在這里我的設計是定義一個父接口:
public interface SyncableMapper {
Map findSyncableDataById(Long id);
int insertSyncableData(Map paramMap);
int updateSyncableData(Map paramMap);
int deleteSyncableDataById(Long id);
}
這個接口定義了對業(yè)務系統(tǒng)數(shù)據(jù)表進行同步的公共方法,然后我讓需要同步的mapper接口繼承這個公共接口月而,并在mapper-xml里定義sql汗洒,這樣同步時直接操作父接口的引用(實現(xiàn)類從spring上下文中根據(jù)條件動態(tài)獲取):
@Repository("dubboApplicationDAO")
public interface DubboApplicationMapper extends SyncableMapper {
......
}
獲取對應mapper并操作的代碼(SyncDataOperateServiceImpl 類中):
//從配置中獲取mapper的類的全限定名
String mapperClass = mapperConfigDTO.getMapperClass();
//動態(tài)獲取操作對應表的mapper
SyncableMapper mapper = (SyncableMapper) SpringUtil.getBean(Class.forName(mapperClass));
......
if ("I".equalsIgnoreCase(operateType)) {
mapper.insertSyncableData(params);
}
if ("U".equalsIgnoreCase(operateType)) {
params.put("id", relationId);
count = mapper.updateSyncableData(params);
}
if ("D".equalsIgnoreCase(operateType)) {
count = mapper.deleteSyncableDataById(relationId);
}
那么自然的父款,有了上述規(guī)則后溢谤,采集同步數(shù)據(jù)時我們只記錄繼承自我們定義的父接口的mapper的操作數(shù)據(jù)(在執(zhí)行采集切面時加入判斷):
public void doCollect(JoinPoint point) {
Class mapperType = point.getSignature().getDeclaringType();
if(mapperType.equals(SyncableMapper.class)) {
//如果執(zhí)行的是SyncableMapper接口定義的方法 則不執(zhí)行切面邏輯(只對實際的業(yè)務操作進行攔截)
System.out.println("執(zhí)行的是SyncableMapper接口定義的方法 不執(zhí)行切面邏輯");
return;
}
Class[] classes = point.getSignature().getDeclaringType().getInterfaces();
boolean flag = Arrays.stream(classes).anyMatch((c) -> c.equals(SyncableMapper.class));
if(!flag) {
//如果被攔截的mapper沒有實現(xiàn)SyncableMapper接口,則不對其進行同步操作
System.out.println("被攔截的mapper沒有實現(xiàn)SyncableMapper接口憨攒,不對其進行同步操作");
return;
}
String fullMethodSignature = point.getSignature().getDeclaringTypeName() + "." + point.getSignature().getName();
String sqlType = mybatisSqlUtils.getSqlType(fullMethodSignature);
if("SELECT".equalsIgnoreCase(sqlType)) {
return;
}
......//
}
兩個站點間數(shù)據(jù)的關聯(lián)
假設主站點中的某張表的某一條數(shù)據(jù)世杀, id為A, 將其同步至分站點對應表中后的id是B肝集,那么我們應該在數(shù)據(jù)在分站點中插入時瞻坝,記錄A、B兩個id之間的關聯(lián)杏瞻,方便在做同步更新或者刪除操作時使用:
CREATE TABLE `cloud_sync_relation` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '序號',
`gmt_created` datetime NOT NULL COMMENT '創(chuàng)建時間',
`gmt_modified` datetime NOT NULL COMMENT '修改時間',
`remark` varchar(255) NOT NULL COMMENT '備注',
`is_deleted` tinyint(2) NOT NULL COMMENT '是否刪除 0否1是',
`from_id` bigint(20) NOT NULL COMMENT '關聯(lián)方id',
`to_id` bigint(20) NOT NULL COMMENT '被關聯(lián)方id',
`relate_site_code` varchar(50) NOT NULL COMMENT '關聯(lián)站點code',
`relate_resource` varchar(50) NOT NULL COMMENT '關聯(lián)表名',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='同步系統(tǒng)關聯(lián)關系'
附:
動態(tài)獲取dubbo服務對象幫助類:
/**
* 動態(tài)獲取dubbo服務對象幫助類
* @author wangshuai
* @version V1.0
* @since 2017-09-15 16:34
*/
@Component
public class DubboUtil {
public static final String VERSION = "VERSION";
public static final String GROUP = "GROUP";
public static final String ZOOKEEPERURL = "ZOOKEEPERURL";
public static final String APPLICATION = "APPLICATION";
@Value("${dubbo.application.name}")
private String appName;
@Value("${dubbo.registry.address}")
private String zkUrl;
public <T> T getDubboService(Class<T> clazz, Map<String, String> paramMap){
ApplicationConfig applicationConfig = new ApplicationConfig();
String app = paramMap.get(APPLICATION);
if(app != null && !"".equals(app)) {
applicationConfig.setName(app);
} else {
applicationConfig.setName(appName);
}
RegistryConfig registryConfig = new RegistryConfig();
String zookeeperurl = paramMap.get(ZOOKEEPERURL);
if(zookeeperurl != null && !"".equals(zookeeperurl)) {
registryConfig.setAddress(zookeeperurl);
} else {
registryConfig.setAddress(zkUrl);
}
applicationConfig.setRegistry(registryConfig);
ReferenceConfig<T> rc = new ReferenceConfig<T>();
rc.setApplication(applicationConfig);
String version = paramMap.get(VERSION);
if(version != null && !"".equals(version)) {
rc.setVersion(version);
}
String group = paramMap.get(GROUP);
if(group != null && !"".equals(group)) {
rc.setGroup(group);
}
rc.setInterface(clazz);
//緩存ReferenceConfig對象,防止內存和連接泄露
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
return cache.get(rc);
}
}