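Data migration mostly comes down to vertical splitting and database/table sharding. During a vertical split or sharding, the main database operations are dual writes and queries, and we use switches to control the transitions between states. The ORM in our company's business code is mainly MyBatis.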
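The main goal of this article is to reduce code intrusiveness and to make the database read/write code reusable during migration. In a real project, a single table can be involved in dozens of queries spread across dozens of files. To minimize changes to business code during migration, I give a sample below (I can't include company business code, after all). In my current work at Kujiale, I have run into the following two cases whose code used to be fairly tedious to handle: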
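1. Migrating a large number of business tables off the old data (this is where the write-order consistency problem around auto-increment primary keys is easy to hit);
2. Scaling out some large tables (effectively a vertical split followed by sharding across databases). In essence, both are the same thing.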
進(jìn)行遷移的話會(huì)需要采用dts或者數(shù)據(jù)庫(kù)日志binlog同步存量數(shù)據(jù)的過(guò)程斤程,這里根據(jù)自己公司的技術(shù)棧來(lái)選擇 存量數(shù)據(jù)寫入數(shù)據(jù)庫(kù)的時(shí)候帶上主鍵
INSERT INTO tablename(field1,field2, field3, ...)
VALUES(value1, value2, value3, ...)
ON DUPLICATE KEY UPDATE
field1=value1,field2=value2, field3=value3, ...;
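The template above is MySQL's `INSERT ... ON DUPLICATE KEY UPDATE`. Because every synced row carries its primary key, replaying the same binlog/DTS event twice is harmless: the second write just overwrites the row instead of creating a new one. Below is a minimal sketch of a helper that builds such a parameterized statement. It assumes MySQL, and `UpsertSql` is a hypothetical class, not part of the original project:

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a parameterized MySQL upsert for stock-data sync. */
final class UpsertSql {
    private UpsertSql() {}

    /**
     * The column list must include the primary key, so that a replayed row hits
     * ON DUPLICATE KEY UPDATE instead of getting a fresh auto-increment id.
     */
    static String build(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("columns must not be empty");
        }
        String cols = String.join(", ", columns);
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        // VALUES(col) refers to the value this INSERT tried to write into col.
        String updates = columns.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }
}
```

From MySQL 8.0.20 onward, `VALUES()` in this position is deprecated in favor of a row alias (`... AS new ON DUPLICATE KEY UPDATE col = new.col`), although it still works.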
Writing incremental data
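1. The usual approach is dual writing: whenever a row is written to the old table, it is also written to the new table. Since the write spans two databases, you can add manual transaction management if needed. In practice, though, write failures are rare, so decide based on your actual situation.
2. The trickier part of the dual-write logic is keeping auto-increment primary keys consistent when the insert order is switched.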
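On point 1: two local transactions on different databases cannot be made atomic without a distributed transaction protocol. One lightweight substitute is compensation, named here as a swapped-in technique and not necessarily what the author's team does. You write the old table, then the new one, and undo the old write if the new one fails. Below is a minimal in-memory sketch; `CompensatingDualWrite` and its callbacks are hypothetical:

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: best-effort dual write by compensation, not a distributed transaction. */
final class CompensatingDualWrite {
    private CompensatingDualWrite() {}

    /**
     * Writes the old table, then the new table with the old write's result (e.g. the
     * generated id). If the new write fails, the old write is undone and the original
     * exception is rethrown. A crash between the two steps still leaves a gap that a
     * reconciliation job has to repair.
     */
    static <T> T write(Supplier<T> writeOld, Consumer<T> writeNew, Consumer<T> compensateOld) {
        T result = writeOld.get();
        try {
            writeNew.accept(result);
        } catch (RuntimeException e) {
            try {
                compensateOld.accept(result);
            } catch (RuntimeException compensationFailure) {
                // Keep the original cause primary; attach the rollback failure to it.
                e.addSuppressed(compensationFailure);
            }
            throw e;
        }
        return result;
    }
}
```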
The auto-increment primary key problem
(1) The old table is a single table with auto-increment keys, and so is the new table
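Switch the read/write order during a low-traffic window. If the business requires strong consistency, add a distributed lock, plus an extra switch that decides whether the locked path is taken. Once the order has been switched, turn that lock switch off. (If conditions allow, the servers could instead implement a distributed consensus algorithm such as Raft among themselves, persisting to local disk via RocksDB; each service's own cluster would then effectively be a small Raft cluster.) This case can also be handled with approach (2); it just needs a transitional table to manage the auto-increment key.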
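The locked switch-over in case (1) can be sketched in memory. Everything here is hypothetical:

- `AutoIncTable` imitates an InnoDB table whose AUTO_INCREMENT counter moves past explicitly inserted ids.
- A `ReentrantLock` stands in for the distributed lock.
- The two `BooleanSupplier`s stand in for the lock switch and the insert-order switch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.function.BooleanSupplier;

/** Hypothetical in-memory stand-in for a table with an AUTO_INCREMENT primary key. */
final class AutoIncTable {
    private final AtomicLong counter = new AtomicLong();
    final Map<Long, String> rows = new ConcurrentHashMap<>();

    /** INSERT without an id: the table generates the next one. */
    long insertAuto(String value) {
        long id = counter.incrementAndGet();
        rows.put(id, value);
        return id;
    }

    /** INSERT with an explicit id: like InnoDB, the counter moves past it. */
    void insertWithId(long id, String value) {
        rows.put(id, value);
        counter.accumulateAndGet(id, Math::max);
    }
}

/** Hypothetical dual insert whose order is switchable and optionally serialized by a lock. */
final class LockedDualInsert {
    final AutoIncTable oldTable = new AutoIncTable();
    final AutoIncTable newTable = new AutoIncTable();
    private final Lock lock;                      // stand-in for a distributed lock
    private final BooleanSupplier lockSwitch;     // "take the locked path?"
    private final BooleanSupplier insertNewFirst; // insert-order switch

    LockedDualInsert(Lock lock, BooleanSupplier lockSwitch, BooleanSupplier insertNewFirst) {
        this.lock = lock;
        this.lockSwitch = lockSwitch;
        this.insertNewFirst = insertNewFirst;
    }

    long insert(String value) {
        if (!lockSwitch.getAsBoolean()) {
            return doInsert(value);
        }
        lock.lock();
        try {
            return doInsert(value);
        } finally {
            lock.unlock();
        }
    }

    private long doInsert(String value) {
        if (insertNewFirst.getAsBoolean()) {
            long id = newTable.insertAuto(value);  // new table generates the id
            oldTable.insertWithId(id, value);
            return id;
        }
        long id = oldTable.insertAuto(value);      // old table generates the id
        newTable.insertWithId(id, value);
        return id;
    }
}
```

Whichever table is written first generates the id, and the other table receives it explicitly. Explicit inserts also advance the counter, so both counters stay equal while inserts are serialized. With the lock switch off, an old-first insert and a new-first insert running at the same moment could each generate the same id for different rows. That is exactly the window the lock covers.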
(2) The old table is a single table with auto-increment keys, and the new table is sharded
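Bump the old table's auto-increment id by a large interval. For example, if the current id is 10^7, raise it straight to 2 × 10^7, and set the new table's auto-increment id to 10^7 + δ (0 < δ < 10^4; choose this range yourself). After the insert order is switched, primary keys will not collide, and dependent business teams are not blocked while they change their direct SQL access into RPC calls. Once the migration is complete, move the new table's primary key start to 3 × 10^7. These are only rough magnitudes; the actual interval sizes depend on the business.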
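The interval arithmetic in case (2) can be checked mechanically. This is a sketch with hypothetical names (`IdRangePlan`, `isSafe`). It assumes that ids generated by either table during the transition end up in both tables (through dual writes and sync), so their ranges must not overlap:

```java
/**
 * Hypothetical id-interval plan for moving from one auto-increment table to a
 * sharded table. All three values are AUTO_INCREMENT / sequence start values.
 */
final class IdRangePlan {
    private final long oldRestart;      // old table's AUTO_INCREMENT is raised to this, e.g. 2 * 10^7
    private final long newStart;        // first id the new table generates, e.g. 10^7 + delta
    private final long newAfterCutover; // new table's start after migration, e.g. 3 * 10^7

    IdRangePlan(long oldRestart, long newStart, long newAfterCutover) {
        this.oldRestart = oldRestart;
        this.newStart = newStart;
        this.newAfterCutover = newAfterCutover;
    }

    /**
     * During the transition the new table issues [newStart, newStart + newInserts) and the
     * old table issues [oldRestart, oldRestart + oldInserts). Neither range may overlap the
     * other or the existing ids, and the post-migration start must lie above every id issued.
     */
    boolean isSafe(long maxExistingId, long newInserts, long oldInserts) {
        boolean newAboveExisting = newStart > maxExistingId;
        boolean rangesDisjoint = newStart + newInserts <= oldRestart;
        boolean cutoverAboveAll = newAfterCutover >= oldRestart + oldInserts;
        return newAboveExisting && rangesDisjoint && cutoverAboveAll;
    }
}
```

Take the article's magnitudes with an assumed δ = 5000. Then `new IdRangePlan(20_000_000L, 10_005_000L, 30_000_000L).isSafe(10_000_000L, 9_000_000L, 5_000_000L)` is true. By contrast, 10,000,000 new-table inserts during the transition would already overrun the old table's restart point.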
Next comes the problem of redundant code logic.
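This touches a little on MyBatis's architecture. In business code, a MyBatis mapper object is essentially a JDK dynamic proxy whose invocation handler is the MapperProxy class. For the mappers of tables that need a vertical split and data migration, I implemented my own proxy that bypasses MapperProxy and executes statements directly through SqlSession. I still create a proxy object implementing the mapper interface, though, and use it to replace the mapper objects used in business code, so the approach is almost non-intrusive. After the migration is finished, the code still needs small changes such as package names.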
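The mechanism described above can be shown without MyBatis at all: a JDK dynamic proxy implements the mapper interface and decides, per call, where to send it. Below is a minimal sketch. `DemoUserMapper` and `RoutingHandler` are hypothetical, and the real DbHandler routes to SqlSessions rather than to plain objects:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.BooleanSupplier;

/** Hypothetical mapper interface, used only for this sketch. */
interface DemoUserMapper {
    String selectName(long userId);
}

/** Hypothetical handler: sends each interface call to the old or new implementation. */
final class RoutingHandler implements InvocationHandler {
    private final Object oldTarget;
    private final Object newTarget;
    private final BooleanSupplier readNew; // hypothetical read switch

    private RoutingHandler(Object oldTarget, Object newTarget, BooleanSupplier readNew) {
        this.oldTarget = oldTarget;
        this.newTarget = newTarget;
        this.readNew = readNew;
    }

    /** Creates a proxy that callers use exactly like a normal implementation of type. */
    static <T> T create(Class<T> type, T oldTarget, T newTarget, BooleanSupplier readNew) {
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type},
                new RoutingHandler(oldTarget, newTarget, readNew));
        return type.cast(proxy);
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (Object.class.equals(method.getDeclaringClass())) {
            // toString/hashCode/equals are answered by the handler itself.
            return method.invoke(this, args);
        }
        Object target = readNew.getAsBoolean() ? newTarget : oldTarget;
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // Rethrow the target's own exception instead of the reflection wrapper.
            throw e.getCause();
        }
    }
}
```

Callers only ever see `DemoUserMapper`. Flipping the switch changes where calls go without touching any call site, and that is what keeps the migration non-intrusive.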
The change looks like this, using a UserMapper as the example:
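@Primary
@Bean
public UserMapper delegateUserMapper(DbHandler dbHandler) {
    // @Primary makes Spring inject this proxy wherever a UserMapper is autowired,
    // so business code keeps using UserMapper unchanged.
    return (UserMapper) Proxy.newProxyInstance(UserMapper.class.getClassLoader(),
            new Class<?>[]{ UserMapper.class }, dbHandler);
}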
MapperHandler (its dispatch logic follows MyBatis's internal MapperMethod class):
import java.lang.reflect.Array;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;

import org.apache.ibatis.binding.BindingException;
import org.apache.ibatis.binding.MapperMethod;
import org.apache.ibatis.cursor.Cursor;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;

/**
 * Runs one mapper method through whichever SqlSession the caller passes in
 * (old or new data source).
 */
public class MapperHandler {
    private final MapperMethod.SqlCommand command;
    private final MapperMethod.MethodSignature methodSignature;

    public MapperHandler(Class<?> mapperInterface, Method method, Configuration config) {
        // Resolves the statement id and the command type (INSERT/UPDATE/SELECT/...).
        this.command = new MapperMethod.SqlCommand(config, mapperInterface, method);
        // Captures return type, @Param names, RowBounds/ResultHandler positions, etc.
        this.methodSignature = new MapperMethod.MethodSignature(config, mapperInterface, method);
    }

    public MapperMethod.MethodSignature getMethodSignature() {
        return methodSignature;
    }

    public Object execute(SqlSession sqlSession, Object[] args) {
        return execute(sqlSession, args, methodSignature.convertArgsToSqlCommandParam(args));
    }

    /** Overload that takes an already converted (and possibly modified) parameter object. */
    public Object execute(SqlSession sqlSession, Object[] args, Object param) {
        Object result;
        switch (command.getType()) {
            case INSERT: {
                result = rowCountResult(sqlSession.insert(command.getName(), param));
                break;
            }
            case UPDATE: {
                result = rowCountResult(sqlSession.update(command.getName(), param));
                break;
            }
            case DELETE: {
                result = rowCountResult(sqlSession.delete(command.getName(), param));
                break;
            }
            case SELECT:
                if (methodSignature.returnsVoid() && methodSignature.hasResultHandler()) {
                    executeWithResultHandler(sqlSession, args, param);
                    result = null;
                } else if (methodSignature.returnsMany()) {
                    result = executeForMany(sqlSession, args, param);
                } else if (methodSignature.returnsMap()) {
                    result = executeForMap(sqlSession, args, param);
                } else if (methodSignature.returnsCursor()) {
                    result = executeForCursor(sqlSession, args, param);
                } else {
                    result = sqlSession.selectOne(command.getName(), param);
                }
                break;
            case FLUSH:
                result = sqlSession.flushStatements();
                break;
            default:
                throw new BindingException("Unknown execution method for: " + command.getName());
        }
        if (result == null && methodSignature.getReturnType().isPrimitive()
                && !methodSignature.returnsVoid()) {
            throw new BindingException("Mapper method '" + command.getName()
                    + "' attempted to return null from a method with a primitive return type ("
                    + methodSignature.getReturnType() + ").");
        }
        return result;
    }

    /** Converts an affected-row count into the method's declared return type. */
    private Object rowCountResult(int rowCount) {
        final Object result;
        if (methodSignature.returnsVoid()) {
            result = null;
        } else if (Integer.class.equals(methodSignature.getReturnType())
                || Integer.TYPE.equals(methodSignature.getReturnType())) {
            result = rowCount;
        } else if (Long.class.equals(methodSignature.getReturnType())
                || Long.TYPE.equals(methodSignature.getReturnType())) {
            result = (long) rowCount;
        } else if (Boolean.class.equals(methodSignature.getReturnType())
                || Boolean.TYPE.equals(methodSignature.getReturnType())) {
            result = rowCount > 0;
        } else {
            throw new BindingException("Mapper method '" + command.getName()
                    + "' has an unsupported return type: " + methodSignature.getReturnType());
        }
        return result;
    }

    private void executeWithResultHandler(SqlSession sqlSession, Object[] args, Object param) {
        MappedStatement ms = sqlSession.getConfiguration().getMappedStatement(command.getName());
        if (void.class.equals(ms.getResultMaps().get(0).getType())) {
            throw new BindingException("method " + command.getName()
                    + " needs either a @ResultMap annotation, a @ResultType annotation,"
                    + " or a resultType attribute in XML so a ResultHandler can be used as a parameter.");
        }
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            sqlSession.select(command.getName(), param, rowBounds,
                    methodSignature.extractResultHandler(args));
        } else {
            sqlSession.select(command.getName(), param, methodSignature.extractResultHandler(args));
        }
    }

    private <E> Object executeForMany(SqlSession sqlSession, Object[] args, Object param) {
        List<E> result;
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            result = sqlSession.<E>selectList(command.getName(), param, rowBounds);
        } else {
            result = sqlSession.<E>selectList(command.getName(), param);
        }
        // issue #510 Collections & arrays support
        if (!methodSignature.getReturnType().isAssignableFrom(result.getClass())) {
            if (methodSignature.getReturnType().isArray()) {
                return convertToArray(result);
            } else {
                return convertToDeclaredCollection(sqlSession.getConfiguration(), result);
            }
        }
        return result;
    }

    private <T> Cursor<T> executeForCursor(SqlSession sqlSession, Object[] args, Object param) {
        Cursor<T> result;
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            result = sqlSession.<T>selectCursor(command.getName(), param, rowBounds);
        } else {
            result = sqlSession.<T>selectCursor(command.getName(), param);
        }
        return result;
    }

    private <E> Object convertToDeclaredCollection(Configuration config, List<E> list) {
        Object collection = config.getObjectFactory().create(methodSignature.getReturnType());
        MetaObject metaObject = config.newMetaObject(collection);
        metaObject.addAll(list);
        return collection;
    }

    @SuppressWarnings("unchecked")
    private <E> Object convertToArray(List<E> list) {
        Class<?> arrayComponentType = methodSignature.getReturnType().getComponentType();
        Object array = Array.newInstance(arrayComponentType, list.size());
        if (arrayComponentType.isPrimitive()) {
            for (int i = 0; i < list.size(); i++) {
                Array.set(array, i, list.get(i));
            }
            return array;
        } else {
            return list.toArray((E[]) array);
        }
    }

    private <K, V> Map<K, V> executeForMap(SqlSession sqlSession, Object[] args, Object param) {
        Map<K, V> result;
        if (methodSignature.hasRowBounds()) {
            RowBounds rowBounds = methodSignature.extractRowBounds(args);
            result = sqlSession.<K, V>selectMap(command.getName(), param,
                    methodSignature.getMapKey(), rowBounds);
        } else {
            result = sqlSession.<K, V>selectMap(command.getName(), param,
                    methodSignature.getMapKey());
        }
        return result;
    }
}
The overall structure of DbHandler is as follows:
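import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.ibatis.binding.BindingException;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Service;

@Service
public class DbHandler implements InvocationHandler, BeanPostProcessor {
    private SqlSession mSrcSqlSession;
    private SqlSession mDestSqlSession;
    // One cache per mapper class: otherwise the old and the new handler for the
    // same Method would overwrite each other.
    private final ConcurrentHashMap<Class<?>, ConcurrentHashMap<Method, MapperHandler>> mMethodCache =
            new ConcurrentHashMap<>();

    @Autowired
    public void setProperties(
            @Qualifier("srcSqlSessionFactory") SqlSessionFactory srcSqlSessionFactory,
            @Qualifier("destSqlSessionFactory") SqlSessionFactory destSqlSessionFactory) {
        // Assumption: wrap each factory in mybatis-spring's SqlSessionTemplate, which is
        // thread-safe and joins Spring-managed transactions.
        mSrcSqlSession = new SqlSessionTemplate(srcSqlSessionFactory);
        mDestSqlSession = new SqlSessionTemplate(destSqlSessionFactory);
    }

    @Override
    public Object invoke(final Object proxy, final Method method, final Object[] args)
            throws Throwable {
        /*
         * Dispatch the SQL here (a fuller invoke is given below).
         * Normally there are no deletes; detect them here and handle them as the business requires.
         */
        return null;
    }

    /**
     * For a single-table migration, consider wrapping the whole insert in a distributed lock,
     * or hand the auto-increment job to an intermediate table; once that handover is done,
     * the intermediate table hands it over to the new table.
     */
    private Object doInsert(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
        // The implementation of doInsert is given below.
        return null;
    }

    /**
     * Assumed implementation (the article omits it): the same switches as doInsert,
     * without the id handling.
     */
    private Object doUpdate(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
        boolean enableOld = writeOld();
        boolean enableNew = writeNew();
        if (enableOld && enableNew) {
            Object oldRet = oldHandler.execute(mSrcSqlSession, args);
            Object newRet = newHandler.execute(mDestSqlSession, args);
            return readNew() ? newRet : oldRet;
        } else if (enableNew) {
            return newHandler.execute(mDestSqlSession, args);
        }
        return oldHandler.execute(mSrcSqlSession, args);
    }

    private Object doSelect(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
        if (readNew()) {
            return newHandler.execute(mDestSqlSession, args);
        }
        return oldHandler.execute(mSrcSqlSession, args);
    }

    /** Switch: read from the new table. */
    private boolean readNew() {
        // TODO: read the switch
        return false;
    }

    /** Switch: write to the old table. */
    private boolean writeOld() {
        // TODO: read the switch
        return true;
    }

    /** Switch: write to the new table. */
    private boolean writeNew() {
        // TODO: read the switch
        return true;
    }

    /** Switch: insert into the new table first. */
    private boolean insertNewFirst() {
        // TODO: read the switch
        return false;
    }

    /** Switch: the new table is sharded, so ids come from a sequence. */
    private boolean partitionDb() {
        return false;
    }

    /** Placeholder: fetch the next id from your sequence. */
    private int getSequenceId() {
        return 10000;
    }

    private String getStatementId(Method method) {
        return method.getDeclaringClass().getName() + "." + method.getName();
    }

    private MappedStatement getMappedStatement(Method method, String statementId,
                                               Configuration configuration) {
        // Assumption: the statement is registered under the id built by getStatementId();
        // adjust the lookup if the old and new mappers use different namespaces.
        if (!configuration.hasStatement(statementId)) {
            throw new BindingException("Invalid bound statement (not found): " + statementId);
        }
        return configuration.getMappedStatement(statementId);
    }

    private MapperHandler cachedMapperMethod(Method method, Class<?> clazz,
                                             Configuration configuration) {
        return mMethodCache
                .computeIfAbsent(clazz, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(method, m -> new MapperHandler(clazz, m, configuration));
    }
}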
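I have left out many details here for now; you can work out how to write them yourself, since this part is fairly business-specific. The idea is simply to obtain the SqlSession directly and then let MyBatis's inner class MethodSignature (MapperMethod.MethodSignature) do the rest of the work.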
Below is a brief implementation of invoke and doInsert; see the comments for details. I am using Java 8.
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
        throws Throwable {
    /*
     * Methods declared on Object ({@link Object#hashCode()}, {@link #equals(Object)}, ...)
     * are not routed; the handler answers them itself.
     */
    if (Object.class.equals(method.getDeclaringClass())) {
        return method.invoke(this, args);
    }
    /*
     * Interface default methods are not routed either. They cannot be invoked on this
     * handler, because it does not implement the mapper interface. On Java 16+,
     * InvocationHandler.invokeDefault(proxy, method, args) can call them.
     */
    if (method.isDefault()) {
        throw new UnsupportedOperationException("Default mapper method not supported: " + method);
    }
    Configuration configuration = mSrcSqlSession.getConfiguration();
    String statementId = getStatementId(method);
    MappedStatement mappedStatement = getMappedStatement(method, statementId, configuration);
    SqlCommandType sqlCommandType = mappedStatement.getSqlCommandType();
    MapperHandler oldMapperHandler = cachedMapperMethod(method, OldUserMapper.class,
            mSrcSqlSession.getConfiguration());
    MapperHandler newMapperHandler = cachedMapperMethod(method, NewUserMapper.class,
            mDestSqlSession.getConfiguration());
    if (SqlCommandType.INSERT.equals(sqlCommandType)) {
        return doInsert(args, oldMapperHandler, newMapperHandler);
    } else if (SqlCommandType.UPDATE.equals(sqlCommandType)) {
        return doUpdate(args, oldMapperHandler, newMapperHandler);
    } else if (SqlCommandType.SELECT.equals(sqlCommandType)) {
        return doSelect(args, oldMapperHandler, newMapperHandler);
    }
    /*
     * Normally there are no deletes; detect them here and handle them as the business requires.
     */
    return null;
}
/**
 * For a single-table migration, consider wrapping the whole insert in a distributed lock,
 * or hand the auto-increment job to an intermediate table; once that handover is done,
 * the intermediate table hands it over to the new table.
 * @param args the mapper method's arguments
 * @param oldHandler handler bound to the old data source
 * @param newHandler handler bound to the new data source
 * @return the result of the table that was written first
 */
private Object doInsert(Object[] args, MapperHandler oldHandler, MapperHandler newHandler) {
    Object newRet, oldRet;
    boolean enableOld = writeOld();
    boolean enableNew = writeNew();
    if (enableOld && enableNew) {
        if (insertNewFirst()) {
            /*
             * If the new table is sharded, take the id from the sequence.
             */
            if (partitionDb()) {
                if (args[0] instanceof User) {
                    int id = getSequenceId();
                    ((User) args[0]).setUserId(id);
                }
            }
            newRet = newHandler.execute(mDestSqlSession, args);
            oldRet = oldHandler.execute(mSrcSqlSession, args);
            /*
             * The two results can be compared here.
             */
            return newRet;
        } else {
            oldRet = oldHandler.execute(mSrcSqlSession, args);
            newRet = newHandler.execute(mDestSqlSession, args);
            /*
             * The two results can be compared here.
             */
            return oldRet;
        }
    } else if (enableNew) {
        return newHandler.execute(mDestSqlSession, args);
    }
    return oldHandler.execute(mSrcSqlSession, args);
}
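doInsert has two comparison points, the comments right after both writes. They could be filled with a shadow comparison that reports, rather than fails on, differences between the old and new results. Below is a minimal sketch. `DualWriteComparator` and the reporter callback are hypothetical, and `Objects.deepEquals` only makes sense for results such as row counts or objects with a meaningful `equals`:

```java
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical helper: compares old/new results of a dual write without failing the call. */
final class DualWriteComparator {
    private DualWriteComparator() {}

    /** Returns true if both results are deeply equal; otherwise reports the mismatch. */
    static boolean compare(String statementId, Object oldRet, Object newRet,
                           Consumer<String> reporter) {
        if (Objects.deepEquals(oldRet, newRet)) {
            return true;
        }
        reporter.accept("dual-write mismatch on " + statementId
                + ": old=" + oldRet + ", new=" + newRet);
        return false;
    }
}
```

Reporting instead of throwing keeps the business call's behavior unchanged while the migration is being verified. The same helper also fits a shadow read in doSelect.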
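In the MyBatis XML insert, check whether userId is null: if it is, let the database generate the auto-increment id; otherwise insert the given id directly.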
<!-- keyProperty writes the id generated by the first insert back into user.userId,
     so the second insert of a dual write reuses the same id -->
<insert id="addUser" parameterType="com.qunhe.instdeco.partition.data.User"
        useGeneratedKeys="true" keyProperty="user.userId">
    INSERT INTO user
    <trim prefix="(" suffix=")" suffixOverrides=",">
        <if test="user.userId != null">
            user_id,
        </if>
        <if test="user.name != null">
            username,
        </if>
        <if test="user.age != null">
            age,
        </if>
    </trim>
    VALUES
    <trim prefix="(" suffix=")" suffixOverrides=",">
        <if test="user.userId != null">
            #{user.userId},
        </if>
        <if test="user.name != null">
            #{user.name},
        </if>
        <if test="user.age != null">
            #{user.age},
        </if>
    </trim>
</insert>
這樣一來(lái)的話 原有的業(yè)務(wù)代碼里面基本不需要我們自行修改代碼,我這里其實(shí)還省略了很多的細(xì)節(jié)窄赋,酷家樂(lè)業(yè)務(wù)里面查詢的時(shí)候如果是分庫(kù)分表的話哟冬,對(duì)于分表鍵批量查詢其實(shí)多的時(shí)候可以采用 ElasticSearch來(lái)查,這里就需要判斷 分表鍵的參數(shù)的數(shù)量 需要 methodSignature去把 Object[] args轉(zhuǎn)換為 ParamMap 其實(shí)就是一個(gè)hashMap大家自己去看下mybatis這部分源碼就知道了.