分布式鎖介紹
分布式并發(fā)環(huán)境下,為了保證事務(wù)操作的原子性呀舔,需要引入分布式鎖來保證一連串行為是原子性操作
例如經(jīng)典的自增1操作
value = get(a);
set(a, value+1);
假設(shè)當(dāng)前有兩個(gè)線程要執(zhí)行上面的操作特碳,要保證線程a在保存新值之前蹋岩,線程b不會讀取到舊的值進(jìn)行加一操作,一種實(shí)現(xiàn)方案就是引入分布式鎖杆逗,保證該資源全局僅被一個(gè)線程占有
分布式鎖設(shè)計(jì)
分布式鎖最重要的操作就是加鎖與解鎖双吆,除此以外還需要設(shè)置鎖的過期時(shí)間艳馒,防止死鎖問題的發(fā)生
加鎖與解鎖需要明確鎖的是什么,對于同一類業(yè)務(wù)操作同一個(gè)資源對象,有且只有一個(gè)標(biāo)識符對應(yīng)到該資源對象昵慌,反之亦然磨隘。即鎖定該資源對象的標(biāo)識符需要是全局唯一的蒸其,在redis中就是鎖對應(yīng)的key的設(shè)計(jì)需要全局唯一
分布式鎖的實(shí)現(xiàn)比較
Github: https://github.com/ChaselX/devbox-spring-boot-starter
Gitee: https://gitee.com/chasel96/devbox-spring-boot-starter
實(shí)現(xiàn)方案一 基于redis超時(shí)機(jī)制的分布式鎖實(shí)現(xiàn)
最簡單的加鎖實(shí)現(xiàn)就是往redis中放入一個(gè)key侵歇,通過redis的setIfAbsent()
方法腺毫,若插入失敗(該key對應(yīng)的記錄已經(jīng)存在)則代表當(dāng)前對應(yīng)的資源已經(jīng)被其他線程鎖住了挣柬,休眠一段時(shí)間后再次嘗試獲取鎖
而解鎖就是從redis刪除該key對應(yīng)的記錄
為了防止線程長時(shí)間占有鎖不釋放導(dǎo)致死鎖潮酒,在加鎖的時(shí)候設(shè)置該緩存的過期時(shí)間為n秒,當(dāng)n秒過去邪蛔,鎖自動釋放
實(shí)現(xiàn)代碼:
@Component
public class DistributedLockCacheImpl implements DistributedLockCache {
private static Logger logger = LoggerFactory.getLogger(DistributedLockCacheImpl.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final static int EXPIRE_MS = 15 * 1000;
private final static int TIME_OUT_MS = 10 * 1000;
private final static String PREFIX_LOCK = "demo:distributed-lock:";
public void lock(String id) {
String key = PREFIX_LOCK + id;
int delay = 100;
int timeout = TIME_OUT_MS;
try {
while (timeout >= 0) {
Boolean result = redisTemplateString.opsForValue().setIfAbsent(key, "", EXPIRE_MS, TimeUnit.MILLISECONDS);
if (result) {
return;
}
timeout -= delay;
Thread.sleep(delay);
}
throw new DistributedLockException();
}catch (Exception ex){
throw new DistributedLockException();
}
}
public void release(String id) {
String key = PREFIX_LOCK + id;
stringRedisTemplate.delete(key);
}
}
這種實(shí)現(xiàn)有個(gè)問題:
- 若線程阻塞時(shí)間大于鎖的超時(shí)時(shí)間急黎,直接刪除key會出現(xiàn)問題,有可能鎖已經(jīng)自動釋放了店溢,而此時(shí)刪除的可能是其他線程的鎖
優(yōu)化實(shí)現(xiàn)方案一
引入隨機(jī)值的概念叁熔,存入redis的value改為隨機(jī)值委乌,在釋放鎖的時(shí)候判斷當(dāng)前鎖的value是否和隨機(jī)值一致床牧,若一致才進(jìn)行刪除操作,這需要引入redis事務(wù)遭贸,事務(wù)的使用參見深入理解redis事務(wù)
public boolean acquire(String id, String randomStr) {
int delay = 100;
int timeout = TIME_OUT_MS;
randomStr = UUID.randomUUID().toString();
try {
while (timeout >= 0) {
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, randomStr, expireMsecs, TimeUnit.MILLISECONDS);
if (result != null && result) {
return true;
}
timeout -= delay;
Thread.sleep(delay);
}
throw new DistributedLockException();
} catch (Exception e) {
throw new DistributedLockException();
}
}
public void release(String id, String randomStr) {
String key = PREFIX_LOCK + id;
stringRedisTemplate.watch(key);
String currentValue = stringRedisTemplate.opsForValue().get(key);
if (null==currentValue || !currentValue.equals(randomStr)) {
return;
}
stringRedisTemplate.multi();
stringRedisTemplate.delete(key);
stringRedisTemplate.exec();
}
釋放鎖這部分事務(wù)也可以使用redis腳本代替戈咳,Redis腳本也是事務(wù)型的。因此壕吹,可以通過Redis事務(wù)實(shí)現(xiàn)的功能著蛙,同樣也可以通過Redis腳本來實(shí)現(xiàn),而且通常腳本更簡單耳贬、更快速踏堡。
實(shí)現(xiàn)方案二 鎖超時(shí)時(shí)間約等于過期時(shí)間的分布式鎖解決方案
這種實(shí)現(xiàn)不依賴于redis超時(shí)機(jī)制,也不用擔(dān)心因?yàn)殒I未刪除而帶來的死鎖問題咒劲。但由于其實(shí)現(xiàn)依賴于系統(tǒng)當(dāng)前時(shí)間顷蟆,需要保證服務(wù)器之間的系統(tǒng)時(shí)鐘同步問題(linux可參考ntp時(shí)鐘同步)。一般生產(chǎn)環(huán)境機(jī)器是默認(rèn)需要做時(shí)鐘同步的腐魂。
之所以說鎖超時(shí)時(shí)間約等于expireMsecs是因?yàn)樵摲桨笩o法嚴(yán)格保證鎖的過期時(shí)間為expireMsecs
秒帐偎,因?yàn)樵讷@取鎖的時(shí)候,過期時(shí)間可能會在多線程并發(fā)getAndSet()
的時(shí)候被修改蛔屹,導(dǎo)致過期時(shí)間和當(dāng)前線程加鎖時(shí)候放入的值不等同削樊,但個(gè)人認(rèn)為影響不大,實(shí)現(xiàn)代碼如下:
package com.chasel.demo.cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* 鎖超時(shí)時(shí)間約等于expireMsecs的分布式鎖解決方案
*
* @author XieLongzhen
* @date 2019/3/22 10:31
*/
@Slf4j
public class RedisDistributedLock implements IDistributedLock {
private StringRedisTemplate redisTemplate;
/**
* 鎖的鍵值
*/
private String lockKey;
/**
* 鎖超時(shí), 防止線程得到鎖之后, 不去釋放鎖
*/
private int expireMsecs;
/**
* 鎖等待, 防止線程饑餓
*/
private int timeoutMsecs;
/**
* 是否已經(jīng)獲取鎖
*/
private boolean locked = false;
public RedisDistributedLock(String lockKey, int timeoutMsecs, int expireMsecs, StringRedisTemplate redisTemplate) {
this.lockKey = lockKey;
this.timeoutMsecs = timeoutMsecs;
this.expireMsecs = expireMsecs;
this.redisTemplate = redisTemplate;
}
public String getLockKey() {
return this.lockKey;
}
/**
* 方法去掉了synchronized關(guān)鍵字
*/
@Override
public boolean acquire() {
int timeout = timeoutMsecs;
try {
while (timeout >= 0) {
long expires = System.currentTimeMillis() + expireMsecs + 1;
String expiresStr = String.valueOf(expires); // 鎖到期時(shí)間
if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
locked = true;
log.debug("[1] 成功獲取分布式鎖!");
return true;
}
String currentValueStr = redisTemplate.opsForValue().get(lockKey); // redis里的時(shí)間
// 判斷是否為空, redis舊鎖是否已經(jīng)過期, 如果被其他線程設(shè)置了值, 則第二個(gè)條件判斷是過不去的
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
// 獲取上一個(gè)鎖到期時(shí)間, 并設(shè)置現(xiàn)在的鎖到期時(shí)間
// 如果這個(gè)時(shí)候, 多個(gè)線程恰好都到了這里
// 只有一個(gè)線程拿到的過期時(shí)間是小于當(dāng)前時(shí)間的兔毒,后續(xù)的線程set進(jìn)去過期時(shí)間但拿到的過期時(shí)間會大于當(dāng)前時(shí)間
// 只有一個(gè)線程的設(shè)置值和當(dāng)前值相同, 那么它才有權(quán)利獲取鎖漫贞,其余線程繼續(xù)等待
if (oldValueStr == null || oldValueStr.equals(currentValueStr)) {
locked = true;
log.debug("[2] 成功獲取分布式鎖!");
return true;
}
}
timeout -= 100;
Thread.sleep(100);
}
} catch (Exception e) {
log.error("獲取鎖出現(xiàn)異常, 必須釋放: {}", e.getMessage());
}
return false;
}
/**
* 方法去掉了synchronized關(guān)鍵字
*/
@Override
public void release() {
try {
if (locked) {
String currentValueStr = redisTemplate.opsForValue().get(lockKey); // redis里的時(shí)間
// 校驗(yàn)是否超過有效期, 如果不在有效期內(nèi), 那說明當(dāng)前鎖已經(jīng)失效, 不能進(jìn)行刪除鎖操作
if (currentValueStr != null && Long.parseLong(currentValueStr) > System.currentTimeMillis()) {
redisTemplate.delete(lockKey);
locked = false;
log.debug("[3] 成功釋放分布式鎖!");
}
}
} catch (Exception e) {
log.error("釋放鎖出現(xiàn)異常, 必須釋放: {}", e.getMessage());
}
}
}
使用通過DistributedLockComponent
來使用,在業(yè)務(wù)代碼中通過DistributedLockComponent
獲取鎖以后只需要簡單的acquire()
然后release()
即可
package com.chasel.demo.cache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* 分布式鎖工具類
*
* @author XieLongzhen
* @date 2019/3/21 10:36
*/
@Component
public class DistributedLockComponent {
@Autowired
private StringRedisTemplate redisTemplate;
private final String PREFIX_KEY = “demo:distributed-lock:”;
/**
* 鎖等待, 防止線程饑餓
*/
private final int DEFAULT_TIMEOUT_MSECS = 15 * 1000;
/**
* 鎖超時(shí), 防止線程得到鎖之后, 不去釋放鎖
*/
private final int DEFAULT_EXPIRE_MSECS = 15 * 1000;
/**
* 獲取分布式鎖
* 默認(rèn)獲取鎖15s超時(shí), 鎖過期時(shí)間15s
*/
public IDistributedLock getRedisLock(String key) {
return getRedisLock(key, DEFAULT_TIMEOUT_MSECS, DEFAULT_EXPIRE_MSECS);
}
/**
* 獲取分布式鎖
*/
public IDistributedLock getRedisLock(String key, int timeoutMsecs) {
return getRedisLock(key, timeoutMsecs, DEFAULT_EXPIRE_MSECS);
}
/**
* 獲取分布式鎖
*/
public IDistributedLock getRedisLock(String key, int timeoutMsecs, int expireMsecs) {
return new RedisDistributedLock(assembleKey(key), timeoutMsecs, expireMsecs, redisTemplate);
}
/**
* 對 lockKey 進(jìn)行拼接裝配
*
* @param key 系統(tǒng)內(nèi)保證該lockKey唯一即可
*/
private String assembleKey(String key) {
return String.format("%s%s", PREFIX_KEY, key);
}
}