鳳玉寫了一個(gè)分布式事務(wù)方案望几,做一個(gè)記錄绩脆。
wiki:
0001.jpg
0002.jpg
0003.jpg
0004.jpg
ppt:
幻燈片02.jpg
幻燈片03.jpg
幻燈片04.jpg
幻燈片05.jpg
幻燈片06.jpg
幻燈片07.jpg
幻燈片08.jpg
幻燈片09.jpg
幻燈片10.jpg
幻燈片11.jpg
幻燈片12.jpg
幻燈片13.jpg
/**
* 提交分布式事務(wù)
*/
public void commit(CommitReq req) throws BusinessException, ConcurrentWriteException {
List<AssetTrans> transPOs = assetTransDao.getByTransId(req.getMemberId(), req.getTransId());
if (transPOs.size() == 0) {
return;
}
// 刪除事務(wù)記錄
// 并發(fā)刪除的時(shí)候,先執(zhí)行刪除操作的事務(wù)T1未結(jié)束的情況下,后執(zhí)行刪除操作的事務(wù)T2會(huì)阻塞,如果T1提交事務(wù)成功了,則T2 deleteByTransId會(huì)返回0
int result = assetTransDao.deleteByTransId(req.getMemberId(), req.getTransId());
if (result == 0) {
return;
}
...
//根據(jù)類型進(jìn)行操作
}
/**
* 回滾分布式事務(wù)
*/
public void rollback(RollbackReq req) throws BusinessException, ConcurrentWriteException {
List<AssetTrans> transPOs = assetTransDao.getByTransId(req.getMemberId(), req.getTransId());
if (transPOs.size() == 0) {
return;
}
// 刪除事務(wù)記錄
int result = assetTransDao.deleteByTransId(req.getMemberId(), req.getTransId());
if (result == 0) {
return;
}
for (AssetTrans transPO : transPOs) {
// 添加積分操作回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.ADD_MEMBER_POINT.getValue())) {
// do nothing
}
// 消費(fèi)積分操作回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.PAY_MEMBER_POINT.getValue())) {
PayMemberPointTransData transData = FastJsonUtils.parse(transPO.data, PayMemberPointTransData.class);
pointService.rollbackPayMemberPoint(transData);
}
// 取消添加積分操作回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_ADD_POINT.getValue())) {
CancelPointAddTransData transData = FastJsonUtils.parse(transPO.data, CancelPointAddTransData.class);
pointService.rollbackCancelAddPoint(transData);
}
//取消撤銷消費(fèi)積分操作
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_PAY_POINT.getValue())) {
// do nothing
}
//添加儲(chǔ)值回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.ADD_MEMBER_BALANCE.getValue())) {
//do nothing
}
//消費(fèi)儲(chǔ)值操作回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.PAY_MEMBER_BALANCE.getValue())) {
PayMemberBalanceTransData transData = FastJsonUtils.parse(transPO.getData(), PayMemberBalanceTransData.class);
balanceService.rollbackPayMemberBalance(transData);
}
//取消增加儲(chǔ)值操作回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_ADD_BALANCE.getValue())) {
CancelAddBalanceTransData transData = FastJsonUtils.parse(transPO.getData(), CancelAddBalanceTransData.class);
balanceService.rollbackCancelAddBalance(transData);
}
//取消消費(fèi)儲(chǔ)值操作回滾事務(wù)
if (transPO.getProcessType().equals(AssetProcessType.CANCEL_PAY_BALANCE.getValue())) {
//do nothing
}
}
}
}