分布式鎖的解決方式
- 基于數(shù)據(jù)庫表做樂觀鎖谋减,用于分布式鎖厨剪。(適用于小并發(fā))
- 使用memcached的add()方法,用于分布式鎖麦向。
- 使用memcached的cas()方法,用于分布式鎖客叉。(不常用)
- 使用redis的setnx()诵竭、expire()方法,用于分布式鎖兼搏。
- 使用redis的setnx()卵慰、get()、getset()方法向族,用于分布式鎖呵燕。
- 使用redis的watch、multi件相、exec命令再扭,用于分布式鎖氧苍。(不常用)
- 使用zookeeper,用于分布式鎖泛范。(不常用)
這里主要介紹第四種和第五種:
前文提供的兩種方式其實都有些問題让虐,要么是死鎖,要么是依賴服務(wù)器時間同步罢荡。從Redis 2.6.12 版本開始赡突, SET 命令可以通過參數(shù)來實現(xiàn)和 SETNX 、 SETEX 和 PSETEX 三個命令的效果区赵。這樣我們的可以將加鎖操作用一個set命令來實現(xiàn)惭缰,直接是原子性操作,既沒有死鎖的風(fēng)險笼才,也不依賴服務(wù)器時間同步漱受,可以完美解決這兩個問題。
在redis文檔上有詳細說明:
http://doc.redisfans.com/string/set.html
使用redis的SET resource-name anystring NX EX max-lock-time 方式骡送,用于分布式鎖
原理
命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現(xiàn)鎖的簡單方法昂羡。
客戶端執(zhí)行以上的命令:
- 如果服務(wù)器返回 OK ,那么這個客戶端獲得鎖摔踱。
- 如果服務(wù)器返回 NIL 虐先,那么客戶端獲取鎖失敗,可以在稍后再重試派敷。
- 設(shè)置的過期時間到達之后蛹批,鎖將自動釋放。
可以通過以下修改膀息,讓這個鎖實現(xiàn)更健壯:
- 不使用固定的字符串作為鍵的值般眉,而是設(shè)置一個不可猜測(non-guessable)的長隨機字符串,作為口令串(token)潜支。
- 不使用 DEL 命令來釋放鎖,而是發(fā)送一個 Lua 腳本柿汛,這個腳本只在客戶端傳入的值和鍵的口令串相匹配時冗酿,才對鍵進行刪除。
這兩個改動可以防止持有過期鎖的客戶端誤刪現(xiàn)有鎖的情況出現(xiàn)络断。
以下是一個簡單的解鎖腳本示例:
if redis.call("get",KEYS[1]) == ARGV[1]
then
return redis.call("del",KEYS[1])
else
return 0
end
可能存在的問題
要保證redis支持eval命令
具體實現(xiàn)
鎖具體實現(xiàn)RedisLock:
import io.lettuce.core.RedisFuture;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.async.RedisScriptingAsyncCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisCommands;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Redis分布式鎖
* 使用 SET resource-name anystring NX EX max-lock-time 實現(xiàn)
* <p>
* 該方案在 Redis 官方 SET 命令頁有詳細介紹裁替。
* http://doc.redisfans.com/string/set.html
* <p>
* 在介紹該分布式鎖設(shè)計之前,我們先來看一下在從 Redis 2.6.12 開始 SET 提供的新特性貌笨,
* 命令 SET key value [EX seconds] [PX milliseconds] [NX|XX]弱判,其中:
* <p>
* EX seconds — 以秒為單位設(shè)置 key 的過期時間;
* PX milliseconds — 以毫秒為單位設(shè)置 key 的過期時間锥惋;
* NX — 將key 的值設(shè)為value 昌腰,當(dāng)且僅當(dāng)key 不存在开伏,等效于 SETNX。
* XX — 將key 的值設(shè)為value 遭商,當(dāng)且僅當(dāng)key 存在固灵,等效于 SETEX。
* <p>
* 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現(xiàn)鎖的簡單方法劫流。
* <p>
* 客戶端執(zhí)行以上的命令:
* <p>
* 如果服務(wù)器返回 OK 巫玻,那么這個客戶端獲得鎖。
* 如果服務(wù)器返回 NIL 祠汇,那么客戶端獲取鎖失敗仍秤,可以在稍后再重試。
*
* @author yuhao.wangwang
*/
public class RedisLock {
private static Logger logger = LoggerFactory.getLogger(RedisLock.class);
private RedisTemplate redisTemplate;
/**
* 將key 的值設(shè)為value 可很,當(dāng)且僅當(dāng)key 不存在徒扶,等效于 SETNX。
*/
public static final String NX = "NX";
/**
* seconds — 以秒為單位設(shè)置 key 的過期時間根穷,等效于EXPIRE key seconds
*/
public static final String EX = "EX";
/**
* 調(diào)用set后的返回值
*/
public static final String OK = "OK";
/**
* 默認請求鎖的超時時間(ms 毫秒)
*/
private static final long TIME_OUT = 100;
/**
* 默認鎖的有效時間(s)
*/
public static final int EXPIRE = 60;
/**
* 解鎖的lua腳本
*/
public static final String UNLOCK_LUA;
static {
StringBuilder sb = new StringBuilder();
sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
sb.append("then ");
sb.append(" return redis.call(\"del\",KEYS[1]) ");
sb.append("else ");
sb.append(" return 0 ");
sb.append("end ");
UNLOCK_LUA = sb.toString();
}
/**
* 鎖標(biāo)志對應(yīng)的key
*/
private String lockKey;
/**
* 記錄到日志的鎖標(biāo)志對應(yīng)的key
*/
private String lockKeyLog = "";
/**
* 鎖對應(yīng)的值
*/
private String lockValue;
/**
* 鎖的有效時間(s)
*/
private int expireTime = EXPIRE;
/**
* 請求鎖的超時時間(ms)
*/
private long timeOut = TIME_OUT;
/**
* 鎖標(biāo)記
*/
private volatile boolean locked = false;
final Random random = new Random();
/**
* 使用默認的鎖過期時間和請求鎖的超時時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
*/
public RedisLock(RedisTemplate redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey + "_lock";
}
/**
* 使用默認的請求鎖的超時時間姜骡,指定鎖的過期時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param expireTime 鎖的過期時間(單位:秒)
*/
public RedisLock(RedisTemplate redisTemplate, String lockKey, int expireTime) {
this(redisTemplate, lockKey);
this.expireTime = expireTime;
}
/**
* 使用默認的鎖的過期時間,指定請求鎖的超時時間
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param timeOut 請求鎖的超時時間(單位:毫秒)
*/
public RedisLock(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
this(redisTemplate, lockKey);
this.timeOut = timeOut;
}
/**
* 鎖的過期時間和請求鎖的超時時間都是用指定的值
*
* @param redisTemplate
* @param lockKey 鎖的key(Redis的Key)
* @param expireTime 鎖的過期時間(單位:秒)
* @param timeOut 請求鎖的超時時間(單位:毫秒)
*/
public RedisLock(RedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
this(redisTemplate, lockKey, expireTime);
this.timeOut = timeOut;
}
/**
* 嘗試獲取鎖 超時返回
*
* @return
*/
public boolean tryLock() {
// 生成隨機key
lockValue = UUID.randomUUID().toString();
// 請求鎖超時時間屿良,納秒
long timeout = timeOut * 1000000;
// 系統(tǒng)當(dāng)前時間圈澈,納秒
long nowTime = System.nanoTime();
while ((System.nanoTime() - nowTime) < timeout) {
if (OK.equalsIgnoreCase(this.set(lockKey, lockValue, expireTime))) {
locked = true;
// 上鎖成功結(jié)束請求
return true;
}
// 每次請求等待一段時間
seleep(10, 50000);
}
return locked;
}
/**
* 嘗試獲取鎖 立即返回
*
* @return 是否成功獲得鎖
*/
public boolean lock() {
lockValue = UUID.randomUUID().toString();
//不存在則添加 且設(shè)置過期時間(單位ms)
String result = set(lockKey, lockValue, expireTime);
locked = OK.equalsIgnoreCase(result);
return locked;
}
/**
* 以阻塞方式的獲取鎖
*
* @return 是否成功獲得鎖
*/
public boolean lockBlock() {
lockValue = UUID.randomUUID().toString();
while (true) {
//不存在則添加 且設(shè)置過期時間(單位ms)
String result = set(lockKey, lockValue, expireTime);
if (OK.equalsIgnoreCase(result)) {
locked = true;
return locked;
}
// 每次請求等待一段時間
seleep(10, 50000);
}
}
/**
* 解鎖
* <p>
* 可以通過以下修改,讓這個鎖實現(xiàn)更健壯:
* <p>
* 不使用固定的字符串作為鍵的值尘惧,而是設(shè)置一個不可猜測(non-guessable)的長隨機字符串康栈,作為口令串(token)。
* 不使用 DEL 命令來釋放鎖喷橙,而是發(fā)送一個 Lua 腳本啥么,這個腳本只在客戶端傳入的值和鍵的口令串相匹配時,才對鍵進行刪除贰逾。
* 這兩個改動可以防止持有過期鎖的客戶端誤刪現(xiàn)有鎖的情況出現(xiàn)悬荣。
*/
public Object unlock() {
// 只有加鎖成功并且鎖還有效才去釋放鎖
// 只有加鎖成功并且鎖還有效才去釋放鎖
if (locked) {
try {
return redisTemplate.execute((RedisConnection connection) -> {
Object nativeConnection = connection.getNativeConnection();
Long result = 0L;
List<String> keys = new ArrayList<>();
keys.add(lockKey);
List<String> values = new ArrayList<>();
values.add(lockValue);
// jedis集群模式
if (nativeConnection instanceof JedisCluster) {
result = (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, values);
} else if (nativeConnection instanceof Jedis) {
// jedis單機模式
result = (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, values);
} else if (nativeConnection instanceof RedisScriptingAsyncCommands) {
// lettuce
try {
List<byte[]> bkeys = keys.stream().map(key -> redisTemplate.getKeySerializer().serialize(lockKey)).collect(Collectors.toList());
List<byte[]> bargs = values.stream().map(arg -> redisTemplate.getValueSerializer().serialize(lockValue)).collect(Collectors.toList());
RedisFuture<Long> future = ((RedisScriptingAsyncCommands) nativeConnection).eval(UNLOCK_LUA, ScriptOutputType.INTEGER, bkeys.toArray(new byte[0][0]), bargs.toArray(new byte[0][0]));
result = future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
return redisTemplate.delete(lockKey);
}
if (result == 0 && !StringUtils.isEmpty(lockKeyLog)) {
logger.debug("Redis分布式鎖,解鎖{}失敻斫!氯迂!解鎖時間:{}", lockKeyLog, System.currentTimeMillis());
}
locked = result == 0;
return result == 1;
});
} catch (Throwable e) {
logger.warn("Redis不支持EVAL命令,使用降級方式解鎖:{}", e.getMessage());
String value = this.get(lockKey, String.class);
if (lockValue.equals(value)) {
redisTemplate.delete(lockKey);
return true;
}
return false;
}
}
return true;
}
/**
* 獲取鎖狀態(tài)
*
* @return
* @Title: isLock
* @author yuhao.wang
*/
public boolean isLock() {
return locked;
}
/**
* 重寫redisTemplate的set方法
* <p>
* 命令 SET resource-name anystring NX EX max-lock-time 是一種在 Redis 中實現(xiàn)鎖的簡單方法言缤。
* <p>
* 客戶端執(zhí)行以上的命令:
* <p>
* 如果服務(wù)器返回 OK 嚼蚀,那么這個客戶端獲得鎖。
* 如果服務(wù)器返回 NIL 管挟,那么客戶端獲取鎖失敗轿曙,可以在稍后再重試。
*
* @param key 鎖的Key
* @param value 鎖里面的值
* @param seconds 過去時間(秒)
* @return
*/
private String set(final String key, final String value, final long seconds) {
Assert.isTrue(!StringUtils.isEmpty(key), "key不能為空");
return (String) redisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
String result = null;
// jedis
if (nativeConnection instanceof JedisCommands) {
result = ((JedisCommands) nativeConnection).set(key, value, NX, EX, seconds);
} else if (nativeConnection instanceof RedisStringAsyncCommands) {
// lettuce
try {
byte[] serializeKey = redisTemplate.getKeySerializer().serialize(key);
byte[] serializeValue = redisTemplate.getValueSerializer().serialize(value);
RedisFuture<String> future = ((RedisStringAsyncCommands) nativeConnection).set(serializeKey, serializeValue, SetArgs.Builder.nx().ex(seconds));
result = future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 默認方式
boolean flag = redisTemplate.opsForValue().setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
if (flag) {
redisTemplate.expire(key, seconds, TimeUnit.SECONDS);
result = OK;
}
}
if (!StringUtils.isEmpty(lockKeyLog) && !StringUtils.isEmpty(result)) {
logger.info("獲取鎖{}的時間:{}", lockKeyLog, System.currentTimeMillis());
}
return result;
}
});
}
/**
* 獲取redis里面的值
*
* @param key key
* @param aClass class
* @return T
*/
private <T> T get(final String key, Class<T> aClass) {
Assert.isTrue(!StringUtils.isEmpty(key), "key不能為空");
return (T) redisTemplate.execute((RedisConnection connection) -> {
Object nativeConnection = connection.getNativeConnection();
Object result = null;
if (nativeConnection instanceof JedisCommands) {
result = ((JedisCommands) nativeConnection).get(key);
}
return (T) result;
});
}
/**
* @param millis 毫秒
* @param nanos 納秒
* @Title: seleep
* @Description: 線程等待時間
* @author yuhao.wang
*/
private void seleep(long millis, int nanos) {
try {
Thread.sleep(millis, random.nextInt(nanos));
} catch (InterruptedException e) {
logger.info("獲取分布式鎖休眠被中斷:", e);
}
}
public String getLockKeyLog() {
return lockKeyLog;
}
public void setLockKeyLog(String lockKeyLog) {
this.lockKeyLog = lockKeyLog;
}
public int getExpireTime() {
return expireTime;
}
public void setExpireTime(int expireTime) {
this.expireTime = expireTime;
}
public long getTimeOut() {
return timeOut;
}
public void setTimeOut(long timeOut) {
this.timeOut = timeOut;
}
}
調(diào)用方式:
public void redisLock3(int i) {
RedisLock redisLock3 = new RedisLock(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
try {
long now = System.currentTimeMillis();
if (redisLock3.tryLock()) {
logger.info("=" + (System.currentTimeMillis() - now));
// TODO 獲取到鎖要執(zhí)行的代碼塊
logger.info("j:" + j++);
} else {
logger.info("k:" + k++);
}
} catch (Exception e) {
logger.info(e.getMessage(), e);
} finally {
redisLock2.unlock();
}
}
對于這種種redis實現(xiàn)分布式鎖的方案還是有一個問題:就是你獲取鎖后執(zhí)行業(yè)務(wù)邏輯的代碼只能在redis鎖的有效時間之內(nèi),因為导帝,redis的key到期后會自動清除守谓,這個鎖就算釋放了。所以這個鎖的有效時間一定要結(jié)合業(yè)務(wù)做好評估舟扎。
源碼: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases
spring-boot-student-data-redis-distributed-lock 工程
參考: