目前實現(xiàn)分布式鎖的方式主要有數(shù)據(jù)庫虏等、Redis和Zookeeper三種捕捂,本文主要闡述利用Redis的相關(guān)命令來實現(xiàn)分布式鎖拷邢。
相關(guān)redis命令
SETNX
如果當前中沒有值,則將其設(shè)置為并返回1嘁字,否則返回0夫椭。
EXPIRE
將設(shè)置為秒后自動過期掸掸。
GETSET
將的值設(shè)置為,并返回其原來的舊值蹭秋。如果原來沒有舊值扰付,則返回nil。
EVAL與EVALSHA
Redis 2.6之后支持的功能仁讨,可以將一段lua腳本發(fā)送到Redis服務(wù)器運行羽莺。
起——分布式鎖初探
利用SETNX命令的原子性,我們可以簡單的實現(xiàn)一個初步的分布式鎖(這里原理就不詳述了洞豁,直接上偽代碼):
boolean tryLock(String key, int lockSeconds) {
if (SETNX key "1" == 1) {
EXPIRE key lockSeconds
return true
} else {
return false
}
}
boolean unlock(String key) {
DEL key
}
tryLock是一個非阻塞的分布式鎖方法盐固,在獲得鎖失敗后會立即返回。如果需要一個阻塞式的鎖方法丈挟,可以將tryLock方法包裝為輪詢(以一定的時間間隔來輪詢刁卜,這很重要,否則Redis會吃不消J镅省)蛔趴。
此種方法看似沒有什么問題,但其實則有一個漏洞:在加鎖的過程中例朱,客戶端順序的向Redis服務(wù)器發(fā)送了SETNX和EXPIRE命令夺脾,那么假設(shè)在SETNX命令執(zhí)行完成之后之拨,在EXPIRE命令發(fā)出去之前客戶端發(fā)生崩潰(或客戶端與Redis服務(wù)器的網(wǎng)絡(luò)連接突然斷掉)茉继,導(dǎo)致EXPIRE命令沒有得到執(zhí)行咧叭,其他客戶端將會發(fā)生永久死鎖!
承——分布式鎖的改進
更新:此方法解鎖存在漏洞烁竭,具體見最文后的追加內(nèi)容菲茬。
為解決上面提出的問題,可以在加鎖時在key中存儲這個鎖過期的時間(當前客戶端時間戳+鎖時間)派撕,然后在獲取鎖失敗時婉弹,取出value與當前客戶端時間進行比較搂抒,如果確定是已經(jīng)過期的鎖笼裳,則可以確認發(fā)生了上面描述的錯誤情況刁岸,此時可以使用DEL清掉這個key展鸡,然后再重新嘗試去獲得這個鎖吝秕∠芏茫可以嗎慎冤?當然不可以整葡!如果沒辦法保證DEL操作和下次SETNX操作之間的原子性姆打,則還是會產(chǎn)生一個競態(tài)條件良姆,比如這樣:
C1 DEL key
C1 SETNX key <expireTime>
C2 DEL key
C2 SETNX key <expireTime>
當Redis服務(wù)器收到這樣的指令序列時,C1和C2的SETNX都同時返回了1幔戏,此時C1和C2都認為自己拿到了鎖玛追,這種情況明顯是不符合預(yù)期的。
為解決這個問題闲延,Redis的GETSET命令就派上用場了痊剖。客戶端可以使用GETSET命令去設(shè)置自己的過期時間垒玲,然后得到的返回值與之前GET到的返回值進行比較陆馁,如果不同,則表示這個過期的鎖被其他客戶端搶占了(此時GETSET命令其實已經(jīng)生效侍匙,也就是說key中的過期時間已經(jīng)被修改氮惯,不過此誤差很小,可以忽略不計)想暗。
根據(jù)上面的分析思路妇汗,可以得出一個改進后的分布式鎖,這里直接給出Java的實現(xiàn)代碼:
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
private final StringRedisTemplate stringRedisTemplate;
private final byte[] lockKey;
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey.getBytes();
}
private boolean tryLock(RedisConnection conn, int lockSeconds) throws Exception {
long nowTime = System.currentTimeMillis();
long expireTime = nowTime + lockSeconds * 1000 + 1000; // 容忍不同服務(wù)器時間有1秒內(nèi)的誤差
if (conn.setNX(lockKey, longToBytes(expireTime))) {
conn.expire(lockKey, lockSeconds);
return true;
} else {
byte[] oldValue = conn.get(lockKey);
if (oldValue != null && bytesToLong(oldValue) < nowTime) {
// 這個鎖已經(jīng)過期了说莫,可以獲得它
// PS: 如果setNX和expire之間客戶端發(fā)生崩潰杨箭,可能會出現(xiàn)這樣的情況
byte[] oldValue2 = conn.getSet(lockKey, longToBytes(expireTime));
if (Arrays.equals(oldValue, oldValue2)) {
// 獲得了鎖
conn.expire(lockKey, lockSeconds);
return true;
} else {
// 被別人搶占了鎖(此時已經(jīng)修改了lockKey中的值,不過誤差很小可以忽略)
return false;
}
}
}
return false;
}
/**
* 嘗試獲得鎖储狭,成功返回true互婿,如果失敗或異常立即返回false
*
* @param lockSeconds 加鎖的時間(秒)捣郊,超過這個時間后鎖會自動釋放
*/
public boolean tryLock(final int lockSeconds) {
return stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection conn) throws DataAccessException {
try {
return tryLock(conn, lockSeconds);
} catch (Exception e) {
logger.error("tryLock Error", e);
return false;
}
}
});
}
/**
* 輪詢的方式去獲得鎖,成功返回true慈参,超過輪詢次數(shù)或異常返回false
*
* @param lockSeconds 加鎖的時間(秒)呛牲,超過這個時間后鎖會自動釋放
* @param tryIntervalMillis 輪詢的時間間隔(毫秒)
* @param maxTryCount 最大的輪詢次數(shù)
*/
public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) {
return stringRedisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection conn) throws DataAccessException {
int tryCount = 0;
while (true) {
if (++tryCount >= maxTryCount) {
// 獲取鎖超時
return false;
}
try {
if (tryLock(conn, lockSeconds)) {
return true;
}
} catch (Exception e) {
logger.error("tryLock Error", e);
return false;
}
try {
Thread.sleep(tryIntervalMillis);
} catch (InterruptedException e) {
logger.error("tryLock interrupted", e);
return false;
}
}
}
});
}
/**
* 如果加鎖后的操作比較耗時,調(diào)用方其實可以在unlock前根據(jù)時間判斷下鎖是否已經(jīng)過期
* 如果已經(jīng)過期可以不用調(diào)用驮配,減少一次請求
*/
public void unlock() {
stringRedisTemplate.delete(new String(lockKey));
}
public byte[] longToBytes(long value) {
ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
buffer.putLong(value);
return buffer.array();
}
public long bytesToLong(byte[] bytes) {
if (bytes.length != Long.SIZE / Byte.SIZE) {
throw new IllegalArgumentException("wrong length of bytes!");
}
return ByteBuffer.wrap(bytes).getLong();
}
}
轉(zhuǎn)——分布式鎖的優(yōu)化
更新:此方法解鎖存在漏洞娘扩,具體見本后最后的追加內(nèi)容。
以上的分布式鎖實現(xiàn)邏輯已經(jīng)較為復(fù)雜壮锻,涉及到了較多的Redis命令琐旁,并使得每一次嘗試加鎖的過程都會有至少2次的Redis命令執(zhí)行,這也就意味著至少兩次與Redis服務(wù)器的網(wǎng)絡(luò)通信猜绣。而添加后面復(fù)雜邏輯的原因只是因為SETNX與EXPIRE這兩條命令執(zhí)行的原子性無法得到保證灰殴。(有些同學會提到Redis的pipeline特性,此處明顯不適用掰邢,因為第二條指令的執(zhí)行以來與第一條執(zhí)行的結(jié)果牺陶,pipeline無法實現(xiàn))
另外,上面的分布式鎖還有一個問題尸变,那就是服務(wù)器之間時間同步的問題义图。在分布式場景中,多臺服務(wù)器之間的時間做到同步是非常困難的召烂,所以在代碼中我加了1秒的時間容錯碱工,但依賴服務(wù)器時間的同步還是可能會不靠譜的。
從Redis 2.6開始奏夫,客戶端可以直接向Redis服務(wù)器提交Lua腳本怕篷,也就是說可以直接在Redis服務(wù)器來執(zhí)行一些較復(fù)雜的邏輯,而此腳本的提交對于客戶端來說是相對原子性的酗昼。這恰好解決了我們的問題廊谓!
我們可以用一個這樣的lua腳本來描述加鎖的邏輯(關(guān)于腳本的提交命令和Redis的相關(guān)規(guī)則可以看https://redis.io/commands/eval):
if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then
redis.call('expire', KEYS[1], tonumber(ARGV[2]))
return true
else
return false
end
注意:此腳本中命令的執(zhí)行并不是嚴格意義上的原子性,如果其中第二條指令EXPIRE執(zhí)行失敗麻削,整個腳本執(zhí)行會返回錯誤蒸痹,但是第一條指令SETNX仍然是已經(jīng)生效的!不過此種情況基本可以認為是Redis服務(wù)器已經(jīng)崩潰(除非是開發(fā)階段就可以排除的參數(shù)錯誤之類的問題)呛哟,那么鎖的安全性就已經(jīng)不是這里可以關(guān)注的點了叠荠。這里認為對客戶端來說是相對原子性的就足夠了。
這個簡單的腳本在Redis服務(wù)器得到執(zhí)行扫责,并返回是否得到鎖榛鼎。因為腳本的提交執(zhí)行只有一條Redis命令,就避免了上面所說的客戶端異常問題。
使用腳本優(yōu)化了鎖的邏輯和性能者娱,這里給出最終的Java實現(xiàn)代碼:
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
private final StringRedisTemplate stringRedisTemplate;
private final String lockKey;
private final List<String> keys;
/**
* 使用腳本在redis服務(wù)器執(zhí)行這個邏輯可以在一定程度上保證此操作的原子性
* (即不會發(fā)生客戶端在執(zhí)行setNX和expire命令之間抡笼,發(fā)生崩潰或失去與服務(wù)器的連接導(dǎo)致expire沒有得到執(zhí)行,發(fā)生永久死鎖)
* <p>
* 除非腳本在redis服務(wù)器執(zhí)行時redis服務(wù)器發(fā)生崩潰黄鳍,不過此種情況鎖也會失效
*/
private static final RedisScript<Boolean> SETNX_AND_EXPIRE_SCRIPT;
static {
StringBuilder sb = new StringBuilder();
sb.append("if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n");
sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");
sb.append("\treturn true\n");
sb.append("else\n");
sb.append("\treturn false\n");
sb.append("end");
SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
}
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey;
this.keys = Collections.singletonList(lockKey);
}
private boolean doTryLock(int lockSeconds) throws Exception {
return stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, keys, "1", String.valueOf(lockSeconds));
}
/**
* 嘗試獲得鎖推姻,成功返回true,如果失敗立即返回false
*
* @param lockSeconds 加鎖的時間(秒)际起,超過這個時間后鎖會自動釋放
*/
public boolean tryLock(int lockSeconds) {
try {
return doTryLock(lockSeconds);
} catch (Exception e) {
logger.error("tryLock Error", e);
return false;
}
}
/**
* 輪詢的方式去獲得鎖拾碌,成功返回true,超過輪詢次數(shù)或異常返回false
*
* @param lockSeconds 加鎖的時間(秒)街望,超過這個時間后鎖會自動釋放
* @param tryIntervalMillis 輪詢的時間間隔(毫秒)
* @param maxTryCount 最大的輪詢次數(shù)
*/
public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) {
int tryCount = 0;
while (true) {
if (++tryCount >= maxTryCount) {
// 獲取鎖超時
return false;
}
try {
if (doTryLock(lockSeconds)) {
return true;
}
} catch (Exception e) {
logger.error("tryLock Error", e);
return false;
}
try {
Thread.sleep(tryIntervalMillis);
} catch (InterruptedException e) {
logger.error("tryLock interrupted", e);
return false;
}
}
}
/**
* 如果加鎖后的操作比較耗時,調(diào)用方其實可以在unlock前根據(jù)時間判斷下鎖是否已經(jīng)過期
* 如果已經(jīng)過期可以不用調(diào)用弟跑,減少一次請求
*/
public void unlock() {
stringRedisTemplate.delete(lockKey);
}
private static class RedisScriptImpl<T> implements RedisScript<T> {
private final String script;
private final String sha1;
private final Class<T> resultType;
public RedisScriptImpl(String script, Class<T> resultType) {
this.script = script;
this.sha1 = DigestUtils.sha1DigestAsHex(script);
this.resultType = resultType;
}
@Override
public String getSha1() {
return sha1;
}
@Override
public Class<T> getResultType() {
return resultType;
}
@Override
public String getScriptAsString() {
return script;
}
}
}
合——小節(jié)
最后灾前,此文內(nèi)容只是筆者自己學習折騰出來的結(jié)果,如果還有什么筆者沒有考慮到的bug存在孟辑,還請不吝指出哎甲,大家一起學習進步~
追——解鎖漏洞(更新)
經(jīng)過慎重考慮,發(fā)現(xiàn)以上實現(xiàn)的分布式鎖有一個較為嚴重的解鎖漏洞:因為解鎖操作只是做了簡單的DEL KEY饲嗽,如果某客戶端在獲得鎖后執(zhí)行業(yè)務(wù)的時間超過了鎖的過期時間炭玫,則最后的解鎖操作會誤解掉其他客戶端的操作。
為解決此問題貌虾,我們在創(chuàng)建RedisLock對象時用本機時間戳和UUID來創(chuàng)建一個絕對唯一的lockValue吞加,然后在加鎖時存入此值,并在解鎖前用GET取出值進行比較尽狠,如果匹配才做DEL衔憨。這里依然需要用LUA腳本保證整個解鎖過程的原子性。
這里給出修復(fù)此漏洞并做了一些小優(yōu)化之后的代碼:
import java.util.Collections;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DigestUtils;
import org.springframework.data.redis.core.script.RedisScript;
/**
* Created On 10/24 2017
* Redis實現(xiàn)的分布式鎖(不可重入)
* 此對象非線程安全袄膏,使用時務(wù)必注意
*/
public class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
private final StringRedisTemplate stringRedisTemplate;
private final String lockKey;
private final String lockValue;
private boolean locked = false;
/**
* 使用腳本在redis服務(wù)器執(zhí)行這個邏輯可以在一定程度上保證此操作的原子性
* (即不會發(fā)生客戶端在執(zhí)行setNX和expire命令之間践图,發(fā)生崩潰或失去與服務(wù)器的連接導(dǎo)致expire沒有得到執(zhí)行,發(fā)生永久死鎖)
* <p>
* 除非腳本在redis服務(wù)器執(zhí)行時redis服務(wù)器發(fā)生崩潰沉馆,不過此種情況鎖也會失效
*/
private static final RedisScript<Boolean> SETNX_AND_EXPIRE_SCRIPT;
static {
StringBuilder sb = new StringBuilder();
sb.append("if (redis.call('setnx', KEYS[1], ARGV[1]) == 1) then\n");
sb.append("\tredis.call('expire', KEYS[1], tonumber(ARGV[2]))\n");
sb.append("\treturn true\n");
sb.append("else\n");
sb.append("\treturn false\n");
sb.append("end");
SETNX_AND_EXPIRE_SCRIPT = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
}
private static final RedisScript<Boolean> DEL_IF_GET_EQUALS;
static {
StringBuilder sb = new StringBuilder();
sb.append("if (redis.call('get', KEYS[1]) == ARGV[1]) then\n");
sb.append("\tredis.call('del', KEYS[1])\n");
sb.append("\treturn true\n");
sb.append("else\n");
sb.append("\treturn false\n");
sb.append("end");
DEL_IF_GET_EQUALS = new RedisScriptImpl<Boolean>(sb.toString(), Boolean.class);
}
public RedisLock(StringRedisTemplate stringRedisTemplate, String lockKey) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString() + "." + System.currentTimeMillis();
}
private boolean doTryLock(int lockSeconds) throws Exception {
if (locked) {
throw new IllegalStateException("already locked!");
}
locked = stringRedisTemplate.execute(SETNX_AND_EXPIRE_SCRIPT, Collections.singletonList(lockKey), lockValue,
String.valueOf(lockSeconds));
return locked;
}
/**
* 嘗試獲得鎖码党,成功返回true,如果失敗立即返回false
*
* @param lockSeconds 加鎖的時間(秒)斥黑,超過這個時間后鎖會自動釋放
*/
public boolean tryLock(int lockSeconds) {
try {
return doTryLock(lockSeconds);
} catch (Exception e) {
logger.error("tryLock Error", e);
return false;
}
}
/**
* 輪詢的方式去獲得鎖揖盘,成功返回true,超過輪詢次數(shù)或異常返回false
*
* @param lockSeconds 加鎖的時間(秒)心赶,超過這個時間后鎖會自動釋放
* @param tryIntervalMillis 輪詢的時間間隔(毫秒)
* @param maxTryCount 最大的輪詢次數(shù)
*/
public boolean tryLock(final int lockSeconds, final long tryIntervalMillis, final int maxTryCount) {
int tryCount = 0;
while (true) {
if (++tryCount >= maxTryCount) {
// 獲取鎖超時
return false;
}
try {
if (doTryLock(lockSeconds)) {
return true;
}
} catch (Exception e) {
logger.error("tryLock Error", e);
return false;
}
try {
Thread.sleep(tryIntervalMillis);
} catch (InterruptedException e) {
logger.error("tryLock interrupted", e);
return false;
}
}
}
/**
* 解鎖操作
*/
public void unlock() {
if (!locked) {
throw new IllegalStateException("not locked yet!");
}
locked = false;
// 忽略結(jié)果
stringRedisTemplate.execute(DEL_IF_GET_EQUALS, Collections.singletonList(lockKey), lockValue);
}
private static class RedisScriptImpl<T> implements RedisScript<T> {
private final String script;
private final String sha1;
private final Class<T> resultType;
public RedisScriptImpl(String script, Class<T> resultType) {
this.script = script;
this.sha1 = DigestUtils.sha1DigestAsHex(script);
this.resultType = resultType;
}
@Override
public String getSha1() {
return sha1;
}
@Override
public Class<T> getResultType() {
return resultType;
}
@Override
public String getScriptAsString() {
return script;
}
}
}