一思瘟、前言
關(guān)于redis分布式鎖, 查了很多資料, 發(fā)現(xiàn)很多只是實(shí)現(xiàn)了最基礎(chǔ)的功能, 但是, 并沒(méi)有解決當(dāng)鎖已超時(shí)而業(yè)務(wù)邏輯還未執(zhí)行完的問(wèn)題, 這樣會(huì)導(dǎo)致: A線程超時(shí)時(shí)間設(shè)為10s(為了解決死鎖問(wèn)題), 但代碼執(zhí)行時(shí)間可能需要30s, 然后redis服務(wù)端10s后將鎖刪除, 此時(shí), B線程恰好申請(qǐng)鎖, redis服務(wù)端不存在該鎖, 可以申請(qǐng), 也執(zhí)行了代碼, 那么問(wèn)題來(lái)了, A须揣、B線程都同時(shí)獲取到鎖并執(zhí)行業(yè)務(wù)邏輯, 這與分布式鎖最基本的性質(zhì)相違背: 在任意一個(gè)時(shí)刻, 只有一個(gè)客戶端持有鎖, 即獨(dú)享
為了解決這個(gè)問(wèn)題, 本文將用完整的代碼和測(cè)試用例進(jìn)行驗(yàn)證, 希望能給小伙伴帶來(lái)一點(diǎn)幫助
二稿黄、準(zhǔn)備工作
也可以直接官網(wǎng)下載, 我這邊都整理到網(wǎng)盤了
需要postman是因?yàn)槲疫€沒(méi)找到j(luò)meter多開窗口的辦法, 哈哈
三、說(shuō)明
springmvc項(xiàng)目
maven依賴
<!--redis-->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>1.6.5.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.7.3</version>
</dependency>
- 核心類
分布式鎖工具類: DistributedLock
測(cè)試接口類: PcInformationServiceImpl
鎖延時(shí)守護(hù)線程類: PostponeTask
四唇撬、實(shí)現(xiàn)思路
先測(cè)試在不開啟鎖延時(shí)線程的情況下, A線程超時(shí)時(shí)間設(shè)為10s, 執(zhí)行業(yè)務(wù)邏輯時(shí)間設(shè)為30s, 10s后, 調(diào)用接口, 查看是否能夠獲取到鎖, 如果獲取到, 說(shuō)明存在線程安全性問(wèn)題
同上, 在加鎖的同時(shí), 開啟鎖延時(shí)線程, 調(diào)用接口, 查看是否能夠獲取到鎖, 如果獲取不到, 說(shuō)明延時(shí)成功, 安全性問(wèn)題解決
五拴鸵、實(shí)現(xiàn)
- 版本01代碼
1)、DistributedLock
package com.cn.pinliang.common.util;
import com.cn.pinliang.common.thread.PostponeTask;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.io.Serializable;
import java.util.Collections;
@Component
public class DistributedLock {
@Autowired
private RedisTemplate<Serializable, Object> redisTemplate;
private static final Long RELEASE_SUCCESS = 1L;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "EX";
// 解鎖腳本(lua)
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
/**
* 分布式鎖
* @param key
* @param value
* @param expireTime 單位: 秒
* @return
*/
public boolean lock(String key, String value, long expireTime) {
return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
String result = jedis.set(key, value, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
/**
* 解鎖
* @param key
* @param value
* @return
*/
public Boolean unLock(String key, String value) {
return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(value));
if (RELEASE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
}
說(shuō)明: 就2個(gè)方法, 加鎖解鎖, 加鎖使用jedis setnx方法, 解鎖執(zhí)行l(wèi)ua腳本, 都是原子性操作
2)撇叁、PcInformationServiceImpl
public JsonResult add() throws Exception {
String key = "add_information_lock";
String value = RandomUtil.produceStringAndNumber(10);
long expireTime = 10L;
boolean lock = distributedLock.lock(key, value, expireTime);
String threadName = Thread.currentThread().getName();
if (lock) {
System.out.println(threadName + " 獲得鎖...............................");
Thread.sleep(30000);
distributedLock.unLock(key, value);
System.out.println(threadName + " 解鎖了...............................");
} else {
System.out.println(threadName + " 未獲取到鎖...............................");
return JsonResult.fail("未獲取到鎖");
}
return JsonResult.succeed();
}
說(shuō)明: 測(cè)試類很簡(jiǎn)單, value隨機(jī)生成, 保證唯一, 不會(huì)在超時(shí)情況下解鎖其他客戶端持有的鎖
3)供鸠、打開redis-desktop-manager客戶端, 刷新緩存, 可以看到, 此時(shí)是沒(méi)有add_information_lock
的key的
4)、啟動(dòng)jmeter, 調(diào)用接口測(cè)試
設(shè)置5個(gè)線程同時(shí)訪問(wèn), 在10s的超時(shí)時(shí)間內(nèi)查看redis, add_information_lock
存在, 多次調(diào)接口, 只有一個(gè)線程能夠獲取到鎖
redis
1-4個(gè)請(qǐng)求, 都未獲取到鎖
第5個(gè)請(qǐng)求, 獲取到鎖
OK, 目前為止, 一切正常, 接下來(lái)測(cè)試10s之后, A仍在執(zhí)行業(yè)務(wù)邏輯, 看別的線程是否能獲取到鎖
可以看到, 操作成功, 說(shuō)明A和B同時(shí)執(zhí)行了這段本應(yīng)該獨(dú)享的代碼, 需要優(yōu)化
- 版本02代碼
1)陨闹、DistributedLock
package com.cn.pinliang.common.util;
import com.cn.pinliang.common.thread.PostponeTask;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.io.Serializable;
import java.util.Collections;
@Component
public class DistributedLock {
@Autowired
private RedisTemplate<Serializable, Object> redisTemplate;
private static final Long RELEASE_SUCCESS = 1L;
private static final Long POSTPONE_SUCCESS = 1L;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "EX";
// 解鎖腳本(lua)
private static final String RELEASE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 延時(shí)腳本
private static final String POSTPONE_LOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return '0' end";
/**
* 分布式鎖
* @param key
* @param value
* @param expireTime 單位: 秒
* @return
*/
public boolean lock(String key, String value, long expireTime) {
// 加鎖
Boolean locked = redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
String result = jedis.set(key, value, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
if (locked) {
// 加鎖成功, 啟動(dòng)一個(gè)延時(shí)線程, 防止業(yè)務(wù)邏輯未執(zhí)行完畢就因鎖超時(shí)而使鎖釋放
PostponeTask postponeTask = new PostponeTask(key, value, expireTime, this);
Thread thread = new Thread(postponeTask);
thread.setDaemon(Boolean.TRUE);
thread.start();
}
return locked;
}
/**
* 解鎖
* @param key
* @param value
* @return
*/
public Boolean unLock(String key, String value) {
return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Object result = jedis.eval(RELEASE_LOCK_SCRIPT, Collections.singletonList(key), Collections.singletonList(value));
if (RELEASE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
/**
* 鎖延時(shí)
* @param key
* @param value
* @param expireTime
* @return
*/
public Boolean postpone(String key, String value, long expireTime) {
return redisTemplate.execute((RedisCallback<Boolean>) redisConnection -> {
Jedis jedis = (Jedis) redisConnection.getNativeConnection();
Object result = jedis.eval(POSTPONE_LOCK_SCRIPT, Lists.newArrayList(key), Lists.newArrayList(value, String.valueOf(expireTime)));
if (POSTPONE_SUCCESS.equals(result)) {
return Boolean.TRUE;
}
return Boolean.FALSE;
});
}
}
說(shuō)明: 新增了鎖延時(shí)方法, lua腳本, 自行腦補(bǔ)相關(guān)語(yǔ)法
2)楞捂、PcInformationServiceImpl不需要改動(dòng)
3)、PostponeTask
package com.cn.pinliang.common.thread;
import com.cn.pinliang.common.util.DistributedLock;
public class PostponeTask implements Runnable {
private String key;
private String value;
private long expireTime;
private boolean isRunning;
private DistributedLock distributedLock;
public PostponeTask() {
}
public PostponeTask(String key, String value, long expireTime, DistributedLock distributedLock) {
this.key = key;
this.value = value;
this.expireTime = expireTime;
this.isRunning = Boolean.TRUE;
this.distributedLock = distributedLock;
}
@Override
public void run() {
long waitTime = expireTime * 1000 * 2 / 3;// 線程等待多長(zhǎng)時(shí)間后執(zhí)行
while (isRunning) {
try {
Thread.sleep(waitTime);
if (distributedLock.postpone(key, value, expireTime)) {
System.out.println("延時(shí)成功...........................................................");
} else {
this.stop();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void stop() {
this.isRunning = Boolean.FALSE;
}
}
說(shuō)明: 調(diào)用lock同時(shí), 立即開啟PostponeTask線程, 線程等待超時(shí)時(shí)間的2/3時(shí)間后, 開始執(zhí)行鎖延時(shí)代碼, 如果延時(shí)成功, add_information_lock
這個(gè)key會(huì)一直存在于redis服務(wù)端, 直到業(yè)務(wù)邏輯執(zhí)行完畢, 因此在此過(guò)程中, 其他線程無(wú)法獲取到鎖, 也即保證了線程安全性
下面是測(cè)試結(jié)果
10s后, 查看redis服務(wù)端, add_information_lock
仍存在, 說(shuō)明延時(shí)成功
此時(shí)用postman再次請(qǐng)求, 發(fā)現(xiàn)獲取不到鎖
看一下控制臺(tái)打印
A線程在19:09:11獲取到鎖, 在10 * 2 / 3 = 6s后進(jìn)行延時(shí), 成功, 保證了業(yè)務(wù)邏輯未執(zhí)行完畢的情況下不會(huì)釋放鎖
A線程執(zhí)行完畢, 鎖釋放, 其他線程又可以競(jìng)爭(zhēng)鎖
OK, 目前為止, 解決了鎖超時(shí)而業(yè)務(wù)邏輯仍在執(zhí)行的鎖沖突問(wèn)題, 還很簡(jiǎn)陋, 而最嚴(yán)謹(jǐn)?shù)姆绞竭€是使用官方的 Redlock 算法實(shí)現(xiàn), 其中 Java 包推薦使用 redisson, 思路差不多其實(shí), 都是在快要超時(shí)時(shí)續(xù)期, 以保證業(yè)務(wù)邏輯未執(zhí)行完畢不會(huì)有其他客戶端持有鎖
后面學(xué)習(xí)redisson, 看一下大神是怎么實(shí)現(xiàn)的
如果有什么不對(duì)的或者可以優(yōu)化的希望小伙伴多多指教, 留言評(píng)論什么的, 謝謝
參考文章: https://blog.csdn.net/weixin_33943347/article/details/88009397