支付操作出現(xiàn)的問題與解決方法
- 場(chǎng)景介紹
- 存在的問題
- 解決方法
- 代碼級(jí)別加鎖
- 數(shù)據(jù)庫級(jí)加鎖
- 悲觀鎖
- 樂觀鎖
- 最終結(jié)果
場(chǎng)景介紹
用戶完成支付后嫩舟, 微信支付系統(tǒng)執(zhí)行回調(diào)地址醒串,通知支付服務(wù)系統(tǒng)更新用戶金額與支付訂單記錄的狀態(tài)。
/**
* 更新付款流水,增加用戶金額
*
* 1.微信執(zhí)行回調(diào)地址后,更新付款流水,增加用戶金額
* (isSuccess = false 微信支付失敗的回調(diào),記錄訂單信息,不更新用戶金額)
*
* 2.當(dāng)訂單的狀態(tài)已更新后宠叼,微信再一次執(zhí)行回調(diào)函數(shù)時(shí)先巴,不執(zhí)行操作,返回
*
* @param serialNum 訂單號(hào)
* @param isSuccess 支付是否成功
*/
@Transactional
public void updateDepositState(String serialNum, boolean isSuccess) {
String state = isSuccess ? Constants.DOPOSIT_RECORD_STATE_PAYED : Constants.DOPOSIT_RECORD_STATE_PAYFL;
PacpDepositRecord record = pacpDepositRecordDao.findBySerialNum(serialNum);
if (!record.getState().equals(Constants.DOPOSIT_RECORD_STATE_PAYING)){
//當(dāng)訂單的狀態(tài)已更新后冒冬,微信再一次執(zhí)行回調(diào)函數(shù)時(shí)伸蚯,不執(zhí)行操作
WXPayUtil.getLogger().info("wxnotify:微信支付回調(diào):訂單號(hào)===>"+serialNum+"狀態(tài)非充值中");
return;
}
String openId = record.getUserCode();
PacpUser pacpUser = pacpUserDao.findByCode(openId);
Long accountBalance = pacpUser.getAccountBalance();
record.setState(state);
record.setUpdateTime(DateTimeUtils.getCurrentTime());
record.setRestMoney(accountBalance + record.getMoney());
pacpDepositRecordDao.save(record);
if (!isSuccess){
return; //如果支付失敗,則不更新用戶的金額
}
pacpBillInfoService.createBillInfo(record);
Long expectAccount = accountBalance + record.getMoney();
pacpUserDao.updateAccountBalance(pacpUser.getId(), expectAccount, accountBalance);
}
存在的問題
第二類丟失更新
時(shí)間 | 充值事務(wù)A | 消費(fèi)事務(wù)B |
---|---|---|
T 1 | 開啟事務(wù) | |
T 2 | 開啟事務(wù) | |
T 3 | 查詢賬戶余額為1000 | |
T 4 | 查詢賬戶余額為1000 | |
T 5 | 充值金額100 | |
T 6 | 提交事務(wù) | |
T 7 | 消費(fèi)金額500 | |
T 8 | 提交事務(wù) | |
T 9 | 余額為500 |
用戶有1000简烤,先充值100后剂邮,在消費(fèi)500元。實(shí)際余額應(yīng)為600元乐埠。但在此種情況內(nèi)查詢余額卻為500(丟失了更新)抗斤。
上面這個(gè)案例只是展示了對(duì)金額操作會(huì)產(chǎn)生丟失更新的問題囚企,在支付服務(wù)系統(tǒng)內(nèi)更改支付訂單狀態(tài)時(shí)也會(huì)出現(xiàn)該問題丈咐。
解決方法
代碼級(jí)別加鎖
- 使用synchronized鎖方法
synchronized(this) { // 讀 - 寫 }
總結(jié): 不可行, 1. 多個(gè)充值任務(wù)都會(huì)執(zhí)行該方法龙宏。會(huì)造成嚴(yán)重的堵塞 2. 在分布式環(huán)境下無法保證數(shù)據(jù)的一致性還是會(huì)出現(xiàn)丟失更新問題棵逊。
- 使用分布式鎖Redisson
try{
// 使用訂單號(hào) + 方法名加鎖
DistributedLocker.lock(serialNum+"-deposit-save-key");
// 讀 - 寫
}finally {
DistributedLocker.unlock(serialNum+"-deposit-save-key");
}
總結(jié): 可行, 使用訂單號(hào) + 方法名加鎖银酗×居埃可以保證每個(gè)支付訂單的更新的一致。而且只會(huì)對(duì)當(dāng)前支付訂單號(hào)的更新訂單方法加鎖
不會(huì)造成嚴(yán)重的堵塞黍特。不過要在每一個(gè)對(duì)用戶金額或支付訂單狀態(tài)的更改都要加鎖蛙讥。
數(shù)據(jù)庫級(jí)別加鎖
要求我們使用的MySQL引擎為InnoDB,其為我們提供了兩種類型的行鎖:
共享鎖(S):允許一個(gè)事務(wù)去讀一行灭衷,阻止其他事務(wù)獲得相同數(shù)據(jù)集的排他鎖次慢。
排他鎖(X):允許獲得排他鎖的事務(wù)更新數(shù)據(jù),阻止其他事務(wù)取得相同數(shù)據(jù)集的共享讀鎖和排他寫鎖翔曲。
事務(wù)可以通過以下語句顯式給記錄集加共享鎖或排他鎖:
共享鎖(S):SELECT * FROM table_name WHERE ... LOCK IN SHARE MODE迫像。 其他 session 仍然可以查詢記錄,并也可以對(duì)該記錄加 share mode 的共享鎖瞳遍。但是如果當(dāng)前事務(wù)需要對(duì)該記錄進(jìn)行更新操作闻妓,則很有可能造成死鎖。
排他鎖(X):SELECT * FROM table_name WHERE ... FOR UPDATE掠械。其他 session 可以查詢?cè)撚涗浻衫拢遣荒軐?duì)該記錄加共享鎖或排他鎖注祖,而是等待獲得鎖。
我們一般給數(shù)據(jù)庫加鎖比較多的說法是悲觀鎖和樂觀鎖均唉,其實(shí)無論是悲觀鎖還是樂觀鎖氓轰,都是人們定義出來的概念,可以認(rèn)為是一種思想浸卦。而某個(gè)數(shù)據(jù)庫的某個(gè)引擎只是通過自身機(jī)制對(duì)其進(jìn)行了實(shí)現(xiàn)而已署鸡。
- 悲觀鎖
當(dāng)我們要對(duì)一個(gè)數(shù)據(jù)庫中的一條數(shù)據(jù)進(jìn)行修改的時(shí)候,為了避免同時(shí)被其他人修改限嫌,最好的辦法就是直接對(duì)該數(shù)據(jù)進(jìn)行加鎖以防止并發(fā)靴庆。
這種借助數(shù)據(jù)庫鎖機(jī)制在修改數(shù)據(jù)之前先鎖定,再修改的方式被稱之為悲觀并發(fā)控制怒医。之所以叫做悲觀鎖炉抒,是因?yàn)檫@是一種對(duì)數(shù)據(jù)的修改抱有悲觀態(tài)度的并發(fā)控制方式。我們一般認(rèn)為數(shù)據(jù)被并發(fā)修改的概率比較大稚叹,所以需要在修改之前先加鎖焰薄。
悲觀并發(fā)控制實(shí)際上是“先取鎖再訪問”的保守策略,為數(shù)據(jù)處理的安全提供了保證扒袖。但是在效率方面塞茅,處理加鎖的機(jī)制會(huì)讓數(shù)據(jù)庫產(chǎn)生額外的開銷,還有增加產(chǎn)生死鎖的機(jī)會(huì)季率;另外野瘦,還會(huì)降低并行性,一個(gè)事務(wù)如果鎖定了某行數(shù)據(jù)飒泻,其他事務(wù)就必須等待該事務(wù)處理完才可以處理那行數(shù)據(jù)鞭光。
而悲觀鎖的實(shí)現(xiàn)就是上面說的共享鎖和排他鎖。其中共享鎖是讀鎖泞遗,多個(gè)事務(wù)都可以獲取惰许,容易造成死鎖;我們通常用的比較多的是排他鎖史辙,也就是FOR UPDATE語句加鎖汹买,配合開啟事務(wù)實(shí)現(xiàn)。
注:MySQL InnoDB默認(rèn)行級(jí)鎖都是基于索引的髓霞,如果一條SQL語句用不到索引是不會(huì)使用行級(jí)鎖的卦睹,會(huì)使用表級(jí)鎖把整張表鎖住,這點(diǎn)需要注意方库。
- 樂觀鎖
樂觀鎖( Optimistic Locking ) 是相對(duì)悲觀鎖而言的结序,樂觀鎖假設(shè)數(shù)據(jù)一般情況下不會(huì)造成沖突,所以在數(shù)據(jù)進(jìn)行提交更新的時(shí)候纵潦,才會(huì)正式對(duì)數(shù)據(jù)的沖突與否進(jìn)行檢測(cè)徐鹤,如果發(fā)現(xiàn)沖突了垃环,則讓返回用戶錯(cuò)誤的信息,讓用戶決定如何去做返敬。
相對(duì)于悲觀鎖遂庄,在對(duì)數(shù)據(jù)庫進(jìn)行處理的時(shí)候,樂觀鎖并不會(huì)使用數(shù)據(jù)庫提供的鎖機(jī)制劲赠。一般的實(shí)現(xiàn)樂觀鎖的方式就是記錄數(shù)據(jù)版本(version)涛目。
樂觀并發(fā)控制相信事務(wù)之間的數(shù)據(jù)競爭(data race)的概率是比較小的,因此盡可能直接做下去凛澎,直到提交的時(shí)候才去鎖定霹肝,所以不會(huì)產(chǎn)生任何鎖和死鎖。
樂觀鎖的概念中其實(shí)已經(jīng)闡述了他的具體實(shí)現(xiàn)細(xì)節(jié)塑煎,主要就是兩個(gè)步驟:沖突檢測(cè)和數(shù)據(jù)更新沫换。
其實(shí)現(xiàn)方式有一種比較典型的就是Compare and Swap(CAS)。CAS是項(xiàng)樂觀鎖技術(shù)最铁,當(dāng)多個(gè)線程嘗試使用CAS同時(shí)更新同一個(gè)變量時(shí)讯赏,只有其中一個(gè)線程能更新變量的值,而其它線程都失敗冷尉,失敗的線程并不會(huì)被掛起漱挎,而是被告知這次競爭中失敗,并可以再次嘗試网严。
在使用Data JPA時(shí)识樱,可以通過對(duì)對(duì)象增加一個(gè)version字段和增加一個(gè)@version注解實(shí)現(xiàn)記錄數(shù)據(jù)版本。具體可看https://www.cnblogs.com/wangzhongqiu/p/7550985.html
在樂觀鎖與悲觀鎖的選擇上面震束,主要看下兩者的區(qū)別以及適用場(chǎng)景就可以了:
樂觀鎖并未真正加鎖,效率高当犯。一旦鎖的粒度掌握不好垢村,更新失敗的概率就會(huì)比較高,容易發(fā)生業(yè)務(wù)失敗嚎卫。
悲觀鎖依賴數(shù)據(jù)庫鎖嘉栓,效率低。更新失敗的概率比較低拓诸。
解決方案
根據(jù)更新操作的場(chǎng)景侵佃。我們需要有兩步的操作。
更新訂單支付記錄狀態(tài)State
更新用戶的金額AccountBalance
我們可以發(fā)現(xiàn)對(duì)更新訂單支付記錄狀態(tài)操作的場(chǎng)景并不常見奠支。對(duì)其更新時(shí)可以采用悲觀鎖馋辈。
在更新用戶的金額AccountBalance是我們可以發(fā)現(xiàn)其實(shí)很多場(chǎng)景都需要進(jìn)行更改用戶金額的操作)如消費(fèi),充值倍谜,提現(xiàn)等)迈螟,所以可以采用樂觀鎖叉抡。
- 更新訂單支付記錄狀態(tài)State添加悲觀鎖
注: 需要給流水號(hào)serialNum 添加索引,否則該操作將會(huì)進(jìn)行表鎖
public interface PacpDepositRecordDao extends BaseJPADao<PacpDepositRecord, String> {
@Query(value = "SELECT * FROM pacp_deposit_record dc WHERE serial_num = :serialNum FOR UPDATE" ,nativeQuery = true)
PacpDepositRecord findBySerialNumForUpdate(@Param(value = "serialNum") String serialNum);
}
在查詢?cè)摋l記錄時(shí)對(duì)其進(jìn)行添加排他鎖(鎖定該行記錄)答毫。防止其他事務(wù)對(duì)其進(jìn)行讀热烀瘛(堵塞)
PacpDepositRecord record = pacpDepositRecordDao.findBySerialNumForUpdate(serialNum);
在對(duì)記錄更改狀態(tài)時(shí),可以保證record不會(huì)丟失更新洗搂。
直到事務(wù)提交后消返, 釋放鎖
- 更新用戶的金額AccountBalance添加樂觀鎖,進(jìn)行CAS更新
由于是要保證金額更新的準(zhǔn)確無誤耘拇,所以只需對(duì)金額的字段進(jìn)行加鎖侦副。
public interface PacpUserDao extends BaseJPADao<PacpUser, String> {
/**
* CAS 更新用戶金額
* @param id userid
* @param expectAccount 所期待的金額
* @param originalAccount 原始的金額
* @return 更新記錄條數(shù)
*/
@Modifying
@Query(value = "UPDATE PacpUser u SET u.accountBalance = :expectAccount WHERE u.id = :id AND u.accountBalance = :originalAccount")
int updateAccountBalanceById(@Param(value = "id") String id, @Param("expectAccount") Long expectAccount,@Param("originalAccount") Long originalAccount);
}
添加AOP注解
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface IsTryAgain {
//重試次數(shù)
int retryTimes() default 10;
}
添加切面,捕獲異常驼鞭,自旋CAS
/**
* 定義重試切面方法秦驯,是為了發(fā)生樂觀鎖異常時(shí)在一個(gè)全新的事務(wù)里提交上一次的操作,
* 直到達(dá)到重試上限挣棕;因此切面實(shí)現(xiàn) org.springframework.core.Ordered 接口译隘,
* 們就可把切面的優(yōu)先級(jí)設(shè)定為高于事務(wù)通知 。
*/
@Aspect
@Component
public class ConcurrentOperationExecutor implements Ordered {
private final Logger log = LogManager.getLogger(getClass());
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Pointcut("@annotation(com.syni.pacp.pay.aop.annotation.IsTryAgain)")
public void operationService() {
}
@Around("operationService()")
@Transactional(rollbackOn = Exception.class)
public Object doConcurrentOperation(ProceedingJoinPoint pjp) throws Throwable {
Signature signature = pjp.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method targetMethod = methodSignature.getMethod();
IsTryAgain annotation = targetMethod.getAnnotation(IsTryAgain.class);
int retryTimes = annotation.retryTimes();
while (true) {
try {
return pjp.proceed();
}catch (OptFailureException e) {
if (retryTimes > 0) {
retryTimes--;
log.info("CAS更新失敗洛心, 重新執(zhí)行更新固耘, 還剩余更新次數(shù):"+ retryTimes);
} else {
throw e;
}
}
}
}
}
自旋執(zhí)行更新操作ing
int flag = pacpUserDao.updateAccountBalanceById(pacpUser.getId(), expectAccount, accountBalance);
if (flag != 1 ){
//拋出自定義異常
throw new OptFailureException("更新用戶金額失敗!");
}
當(dāng)更新用戶的金額AccountBalance 失敗時(shí),拋出異常new OptFailureException()词身,ConcurrentOperationExecutor 捕獲異常厅目。
查看重試更新操作是否達(dá)到指定的最大值,如果沒有這繼續(xù)嘗試更新法严。直到更新成功或者超出最大值為止损敷。
到此最終的解決方案如下:
@IsTryAgain //添加AOP注解
@Transactional(rollbackOn = Exception.class)
public void updateDepositState(String serialNum, boolean isSuccess){
String state = isSuccess ? Constants.DOPOSIT_RECORD_STATE_PAYED : Constants.DOPOSIT_RECORD_STATE_PAYFL;
PacpDepositRecord record = pacpDepositRecordDao.findBySerialNumForUpdate(serialNum);
if (!record.getState().equals(Constants.DOPOSIT_RECORD_STATE_PAYING)){
//當(dāng)訂單的狀態(tài)已更新后,微信再一次執(zhí)行回調(diào)函數(shù)時(shí)深啤,不執(zhí)行操作
WXPayUtil.getLogger().info("wxnotify:微信支付回調(diào):訂單號(hào)===>"+serialNum+"狀態(tài)非充值中");
return;
}
String openId = record.getUserCode();
PacpUser pacpUser = pacpUserDao.findByCode(openId);
Long accountBalance = pacpUser.getAccountBalance();
record.setState(state);
record.setUpdateTime(DateTimeUtils.getCurrentTime());
record.setRestMoney(accountBalance + record.getMoney());
pacpDepositRecordDao.save(record);
if (!isSuccess){
return; //如果支付失敗拗馒,則不更新用戶的金額
}
pacpBillInfoService.createBillInfo(record);
Long expectAccount = accountBalance + record.getMoney();
int flag = pacpUserDao.updateAccountBalanceById(pacpUser.getId(), expectAccount, accountBalance);
if (flag != 1 ){
//拋出自定義異常
throw new OptFailureException("更新用戶金額失敗!");
}
}