問(wèn)題
系統(tǒng)有一個(gè)支付回調(diào)接口,偶爾出現(xiàn)了秒級(jí)內(nèi)回調(diào)2次的情況。我們?cè)诨卣{(diào)接口中寫(xiě)了推送任務(wù)。結(jié)果就是2次重復(fù)推送寓娩。偽代碼如下:
1. 獲得訂單
2. 更改信息和狀態(tài)(已支付)
3. 通知,推送呼渣,定時(shí)任務(wù)
在接口加入狀態(tài)判斷仍然有2次推送的情況出現(xiàn)棘伴。也就是說(shuō)2個(gè)線程出現(xiàn)了并發(fā)的情況。這個(gè)時(shí)候就要使用到分布式鎖來(lái)限制程序的并發(fā)執(zhí)行屁置。
1. 獲得訂單
x. 如果訂單狀態(tài)為已支付焊夸,則拋異常或直接return
2. 更改信息和狀態(tài)(已支付)
3. 通知蓝角,推送阱穗,定時(shí)任務(wù)
分布式鎖
分布式鎖是控制分布式系統(tǒng)之間同步訪問(wèn)共享資源的一種方式饭冬。在分布式系統(tǒng)中,常常需要協(xié)調(diào)他們的動(dòng)作揪阶。如果不同的系統(tǒng)或是同一個(gè)系統(tǒng)的不同主機(jī)之間共享了一個(gè)或一組資源昌抠,那么訪問(wèn)這些資源的時(shí)候,往往需要互斥來(lái)防止彼此干擾來(lái)保證一致性遣钳,在這種情況下扰魂,便需要使用到分布式鎖。
在分布式系統(tǒng)中蕴茴,常常需要協(xié)調(diào)他們的動(dòng)作劝评。如果不同的系統(tǒng)或是同一個(gè)系統(tǒng)的不同主機(jī)之間共享了一個(gè)或一組資源,那么訪問(wèn)這些資源的時(shí)候倦淀,往往需要互斥來(lái)防止彼此干擾來(lái)保證一致性蒋畜,這個(gè)時(shí)候,便需要使用到分布式鎖撞叽。
首先姻成,為了確保分布式鎖可用,我們至少要確保鎖的實(shí)現(xiàn)同時(shí)滿足以下四個(gè)條件:
- 互斥性愿棋。在任意時(shí)刻科展,只有一個(gè)客戶端能持有鎖。
- 不會(huì)發(fā)生死鎖糠雨。即使有一個(gè)客戶端在持有鎖的期間崩潰而沒(méi)有主動(dòng)解鎖才睹,也能保證后續(xù)其他客戶端能加鎖。
- 具有容錯(cuò)性甘邀。只要大部分的Redis節(jié)點(diǎn)正常運(yùn)行琅攘,客戶端就可以加鎖和解鎖。
- 解鈴還須系鈴人松邪。加鎖和解鎖必須是同一個(gè)客戶端坞琴,客戶端自己不能把別人加的鎖給解了。
Redis實(shí)現(xiàn)分布式鎖
@Slf4j
public class RedisLock {
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
private static final Long RELEASE_SUCCESS = 1L;
/**
* 嘗試獲取分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請(qǐng)求標(biāo)識(shí)
* @param expireTime 超期時(shí)間
* @return 是否獲取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
// log.info("lock -> {} lockKey -> {} request -> {}" ,result, lockKey, requestId);
return LOCK_SUCCESS.equals(result);
}
/**
* 釋放分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請(qǐng)求標(biāo)識(shí)
* @return 是否釋放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
return RELEASE_SUCCESS.equals(result);
}
}
簡(jiǎn)單的測(cè)試逗抑,controller省略
public void updateStatus(String id) throws Exception {
Jedis jedis = jedisPool.getResource();
String requestId = SerialGenerator.randomUUID();
boolean isGetLock = RedisLock.tryGetDistributedLock(jedis, "onPaySuccess" + id, requestId , 5);
if (isGetLock) {
Optional<MerchantOrder> byId = merchantOrderRepository.findById(id);
MerchantOrder order = byId.get();
if (order.getStatus() == OrderStatus.PAID) {
throw new BaseException("重復(fù)消費(fèi)");
}
order.setStatus(OrderStatus.PAID);
merchantOrderRepository.save(order);
}
RedisLock.releaseDistributedLock(jedis, "onPaySuccess" + id,requestId);
}
JMeter測(cè)試
分別 啟用10個(gè)和100個(gè)線程對(duì)接口進(jìn)行測(cè)試剧辐。從log可以看出只有一個(gè)線程的log打印出了sql語(yǔ)句,后面的都顯示重復(fù)消費(fèi)锋八。
額外的單機(jī)非集群思路
- 只要在update t set state =1 where state=0時(shí)接受下row
- 如果row=0浙于,再查一次冪等返回
- 畢竟前置已經(jīng)查過(guò)了,更新時(shí)碰撞概率太低
碰到問(wèn)題挟纱,解決問(wèn)題,僅做記錄腐宋。如有問(wèn)題紊服,歡迎指教檀轨。