主要接觸到的Redis分布式鎖有兩種框架RedisLockRegistry和Redisson伸刃,今天來看下兩種框架的實現(xiàn)原理屯吊;
RedisLockRegistry
Spring-inintegration-redis
中提供的一種實現(xiàn)方式,使用方式如下
@Bean("redisLockRegistry")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "spring-redis-lock");
}
在配置文件中直接注冊成一個Bean怔鳖,下面是一些構(gòu)造時的源碼茉唉;
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
// DEFAULT_EXPIRE_AFTER默認是6秒
this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
Assert.notNull(registryKey, "'registryKey' cannot be null");
this.redisTemplate = new StringRedisTemplate(connectionFactory);
this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
this.registryKey = registryKey;
this.expireAfter = expireAfter;
}
構(gòu)造時生成了一個RedisTemplate
用于執(zhí)行Redis操作,一個ObtainLockScript
用來保存獲取鎖的腳本结执;
private final class RedisLock implements Lock {
private final String lockKey;
private final ReentrantLock localLock = new ReentrantLock();
private volatile boolean unlinkAvailable = true;
private volatile long lockedAt;
// 構(gòu)造的時候只儲存一個key
private RedisLock(String path) {
this.lockKey = constructLockKey(path);
}
private String constructLockKey(String path) {
return RedisLockRegistry.this.registryKey + ":" + path;
}
...
}
內(nèi)部封裝了一個Redis鎖的實現(xiàn)度陆,主要包括一個鎖的標識lockKey
和一個重入鎖ReentrantLock
;
Lock lock = redisLockRegistry.obtain(key);
try {
if (lock.tryLock(5, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
} else {
throw new GetLockTimeoutException();
}
} catch (InterruptedException e) {
throw new GetLockTimeoutException();
}
上面是代碼中的基本用法献幔,可以選擇封裝成一個切面使用起來方便一些懂傀,首先來看下獲取鎖的操作obtain
;
private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
String path = (String) lockKey;
return this.locks.computeIfAbsent(path, RedisLock::new);
}
從一個ConcurrentHashMap
中根據(jù)鎖的標識獲取一個鎖,如果沒有則構(gòu)造一個新的蜡感;
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
long now = System.currentTimeMillis();
// 先是本地的重入鎖先嘗試加鎖
if (!this.localLock.tryLock(time, unit)) {
// 如果本地線程加鎖失敗蹬蚁,直接返回
return false;
}
try {
long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired;
// 本地加鎖成功后執(zhí)行redis加鎖
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) {
// 加鎖失敗且沒過超時時間,睡一會再試
Thread.sleep(100);
}
if (!acquired) {
// 如果沒有獲取到鎖郑兴,把本地鎖釋放直接返回
this.localLock.unlock();
}
return acquired;
}
catch (Exception e) {
this.localLock.unlock();
rethrowAsLockException(e);
}
return false;
}
看這段邏輯需要一點ReentrantLock
的知識犀斋,可以先了解下Java8 源碼閱讀 - AbstractQueuedSynchronizer
首先是本地的重入鎖先進行加鎖,加鎖步驟如下:
- 如果鎖資源沒有被其他線程占用情连,直接加鎖叽粹,亦或者是本地線程已經(jīng)占用鎖,再次重入也被視為加鎖成功;
- 如果鎖資源被其他線程占用球榆,排隊等待指定時間朽肥,指定時間內(nèi)獲取不到資源則加鎖失敗持钉;
成功在本地加鎖后會執(zhí)行Redis加鎖操作
private boolean obtainLock() {
Boolean success =
RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
String.valueOf(RedisLockRegistry.this.expireAfter));
boolean result = Boolean.TRUE.equals(success);
if (result) {
this.lockedAt = System.currentTimeMillis();
}
return result;
}
private static final String OBTAIN_LOCK_SCRIPT =
"local lockClientId = redis.call('GET', KEYS[1])\n" +
"if lockClientId == ARGV[1] then\n" +
" redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
" return true\n" +
"elseif not lockClientId then\n" +
" redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
" return true\n" +
"end\n" +
"return false";
利用Redis執(zhí)行Lua腳本是原子性的特性來實現(xiàn)的衡招,首先redis.call('GET', KEYS[1])
來判斷是否已經(jīng)獲取到鎖,如果發(fā)現(xiàn)鎖已經(jīng)被占用每强,則判斷是不是來自同一個客戶端的請求始腾,如果是就更新解鎖時間(默認6s),如果不是來自同一個客戶端的(即多個節(jié)點對同一個key操作)空执,只不會允許非第一個客戶端的操作浪箭;上鎖的操作是redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])
,key是外面指定的key辨绊,而value則是客戶端的uuid奶栖;
解鎖的邏輯也是比較簡單,直接調(diào)用del
或者unlink
刪除key即可门坷;
private void removeLockKey() {
if (this.unlinkAvailable) {
try {
RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
}
catch (Exception ex) {
LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
"falling back to the regular DELETE command", ex);
this.unlinkAvailable = false;
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
else {
RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
}
}
在單節(jié)點的Redis架構(gòu)下宣鄙,RedisLockRegistry是可以滿足一些不復雜的場景的,但是需要考慮鎖的續(xù)租默蚌、Redis集群架構(gòu)部署等問題的話冻晤,這個框架可能就會稍顯拉垮,所以大多數(shù)情況都是使用Redisson绸吸;
Redisson
使用方式大致如下
Config config = new Config();
config.useSentinelServers()
.addSentinelAddress(
"redis://127.0.0.1:6379",
"redis://127.0.0.2:6379",
"redis://127.0.0.3:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("key");
// 加鎖的兩種方式
lock.lock();
lock.lock(5, TimeUnit.SECONDS);
lock.unlock();
下面簡單分析下Redisson的加鎖邏輯
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
...
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 在獲取鎖后鼻弧,如果還沒有通過調(diào)用unlock釋放鎖,
// 則保持鎖的最大時間锦茁。如果leaseTime為-1攘轩,則保持鎖定直到顯式解鎖。
if (leaseTime != -1) {
// 指定了鎖的釋放時間
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 沒有指定鎖的釋放時間
// 嘗試加鎖
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
// 發(fā)現(xiàn)異常
return;
}
// 加鎖成功
if (ttlRemaining == null) {
// 定時續(xù)租
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 里面處理了包括選取節(jié)點蜻势、重試等操作
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
先看下加鎖的邏輯撑刺,這里就忽略了集群節(jié)點選取等其他代碼,直接看這段Lua腳本的命令握玛,執(zhí)行邏輯如下
- 檢查key是否存在
- 如果key不存在够傍,直接將field即客戶端id+key+線程id組成的uuid對應的value置為1,并更新過期時間挠铲,最后返回nil冕屯;
- 如果key存在,則判斷field(即上面的uuid)是否存在拂苹,如果不存在安聘,則說明該鎖的已被其他占用,直接返回key剩余的ttl;如果field存在浴韭,則說明自己是鎖的持有者丘喻,將filed對應的value自增1(重入);
如果沒有指定鎖的釋放時間念颈,將會觸發(fā)鎖續(xù)租的操作泉粉;
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 記錄續(xù)租的次數(shù)
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 執(zhí)行續(xù)租操作
renewExpiration();
}
}
// 續(xù)租
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
// 已經(jīng)被停止續(xù)租
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
// 停止續(xù)租就是將該key從renewal集合中移除
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
// 續(xù)租操作有異常,停止續(xù)租
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// 重新續(xù)租
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
來看下續(xù)租的操作榴芳,每個獲取鎖的線程都會執(zhí)行續(xù)租任務嗡靡,默認是30 / 3 = 10秒鐘重新執(zhí)行一次續(xù)租操作直到下面三個條件其中一個觸發(fā)
- redis執(zhí)行續(xù)租命令異常
- 手動解鎖操作
- 續(xù)租操作失敗(通常是redis中key不存在了)
來看下續(xù)租的Lua命令
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
如果對應key和field都存在窟感,直接通過pexpire命令更新過期時間讨彼;
下面是看看獲取鎖失敗的操作
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
// 獲取鎖成功
return;
}
// 獲取鎖失敗
// 利用redis的pub/sub機制來訂閱到鎖被釋放的消息
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 根據(jù)是否支持中斷來使用netty不同的同步操作
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
// 重復嘗試加鎖
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {
// 直到獲取鎖成功
break;
}
if (ttl >= 0) {
try {
// 利用Semaphore阻塞直到ttl過去
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
如果tryAcquire返回的值不為null,即表示鎖資源被其他線程占用柿祈,本地會啟動一個死循環(huán)重復執(zhí)行加鎖操作哈误,每一次上鎖失敗拿到ttl后,會利用Semaphore阻塞當前線程直到ttl過去谍夭,當然也可能被之前訂閱的解鎖消息主動喚醒黑滴;
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 當收到解鎖的通知時會主動把Semaphore阻塞的線程釋放
value.getLatch().release();
}
}
這是當Redis收到訂閱的解鎖事件執(zhí)行的方法,這里會主動把RedissonLockEntry
持有的所有被阻塞的線程都釋放紧索;
// 解鎖的實際邏輯
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 取消鎖續(xù)租操作
cancelExpirationRenewal(threadId);
...
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
解鎖的邏輯如下
- 判斷這個鎖是否被當前線程持有
- 如果當前線程不是鎖的持有者,直接返回nil菜谣;
- 如果當前線程是鎖的持有者珠漂,將redis保存的數(shù)值減1,如果減1后的結(jié)果是大于0的尾膊,說明該鎖是被重復加鎖的媳危,這時候需要對鎖進行續(xù)租,時間為
internalLockLeaseTime
冈敛,并返回為0待笑;- 如果減1后的結(jié)果為0,將key刪除后并發(fā)布解鎖的消息抓谴;
RedisLockRegistry和Redisson對比
不同點:
- RedisLockRegistry有一個本地加鎖的邏輯暮蹂,只有當本地加鎖成功才能繼續(xù)執(zhí)行redis加鎖邏輯,重入邏輯也是做在本地癌压,所以理論上RedisLockRegistry比Redisson會快那么一點點仰泻;
- Redisson有鎖續(xù)租功能,解決了加鎖成功后邏輯執(zhí)行未完成時鎖到期被釋放滩届,導致其他資源獲取鎖的混亂集侯;
RedLock
因為Redis集群主從同步時會有延遲,有可能因為master節(jié)點掛掉,master節(jié)點的鎖還未同步到slave時棠枉,slave被選舉成master而可能其他線程能在新master上重復獲得鎖浓体,而導致鎖資源加鎖混亂的問題;
所以就有了一個RedLock來解決這種問題辈讶,大致的思想是在集群上不同節(jié)點都嘗試加鎖汹碱,步驟大致如下
- 獲取當前的時間戳(毫秒)。
- 依次嘗試從 N 個實例荞估,使用相同的 key 和隨機值獲取鎖咳促。在步驟 2,當向 Redis 設置鎖時勘伺,客戶端應該設置一個網(wǎng)絡連接和響應超時時間跪腹,這個超時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為 10 秒飞醉,則超時時間應該在 5-50 毫秒之間冲茸。這樣可以避免服務器端 Redis 已經(jīng)掛掉的情況下,客戶端還在死死地等待響應結(jié)果缅帘。如果服務器端沒有在規(guī)定時間內(nèi)響應轴术,客戶端應該盡快嘗試另外一個 Redis 實例。
- 客戶端使用當前時間減去開始獲取鎖時間(步驟 1 記錄的時間)就得到獲取鎖使用的時間钦无。當且僅當從大多數(shù)(這里是 3 個節(jié)點)的 Redis 節(jié)點都取到鎖逗栽,并且使用的時間小于鎖失效時間時,鎖才算獲取成功失暂。
如果取到了鎖彼宠,key 的真正有效時間等于有效時間減去獲取鎖所使用的時間(步驟 3 計算的結(jié)果)。- 如果因為某些原因弟塞,獲取鎖失斊鞠俊(沒有在至少 N/2+1 個Redis實例取到鎖或者取鎖時間已經(jīng)超過了有效時間),客戶端應該在所有的 Redis 實例上進行解鎖(即便某些 Redis 實例根本就沒有加鎖成功)决记。
通過在集群大多數(shù)實例上獲取來保證鎖的可用性摧冀,比較一個集群內(nèi)節(jié)點都宕機的可能性還是很低的,Redisson內(nèi)置了該算法的實現(xiàn)系宫,實現(xiàn)類是RedissonRedLock索昂;
RedissonRedLock
RLock lock = redissonClient.getLock("key");
RLock lock2 = redissonClient.getLock("key");
RedissonRedLock redLock = new RedissonRedLock(lock, lock2);
redLock.lock();
redLock.unlock();
使用方式如上,下面看下源碼具體的實現(xiàn)邏輯
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
// 每個鎖至少等待1500ms笙瑟,總時間就是1500*數(shù)量
long baseWaitTime = locks.size() * 1500;
long waitTime = -1;
if (leaseTime == -1) {
waitTime = baseWaitTime;
} else {
// 如果指定了鎖的釋放時間
leaseTime = unit.toMillis(leaseTime);
waitTime = leaseTime;
if (waitTime <= 2000) {
waitTime = 2000;
} else if (waitTime <= baseWaitTime) {
// 如果鎖的釋放時間小于框架內(nèi)置的等待時間
// 重制等待時間
waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
} else {
// 如果鎖的釋放時間大于框架內(nèi)置的等待時間
// 重制等待時間
waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
}
}
while (true) {
// 重復上鎖直到成功或者異常
if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
return;
}
}
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 重制鎖的釋放時間 至少要求是集群獲取鎖的等待時間以上
long newLeaseTime = ...;
// 獲取當前集群的時間戳
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// 每個鎖保留的時間是waitTime
long lockWaitTime = calcLockWaitTime(remainTime);
// 加鎖失敗的限制數(shù)
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
// 遍歷鎖
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 進行加鎖
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// redis訪問超時 解鎖當前的節(jié)點
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
// 加鎖失敗
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
// 如果獲取到鎖的節(jié)點數(shù)大于預置的節(jié)點數(shù)楼镐,則判斷這次加鎖已經(jīng)成功
break;
}
if (failedLocksLimit == 0) {
// 加鎖失敗次數(shù)已經(jīng)到達了限制
// 將已加鎖成功的全部解鎖
unlockInner(acquiredLocks);
if (waitTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// 重制迭代器
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 記錄失敗的次數(shù)
failedLocksLimit--;
}
}
if (remainTime != -1) {
// 計算還剩余的時間
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
// 如果已經(jīng)沒有剩余時間了,將所有獲取到鎖的節(jié)點進行解鎖
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
// 更新每個節(jié)點鎖的超時時間
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
加鎖的邏輯基本上按照RedLock算法來實現(xiàn)的往枷,流程如下:
- 每個節(jié)點加鎖至少等待1500ms框产,等待總時間就是1500*節(jié)點數(shù)量凄杯;
- 按照RedLock算法,計算每個節(jié)點獲取鎖的等待時間秉宿;
- 從第一個節(jié)點開始進行加鎖操作戒突,加鎖的執(zhí)行邏輯和上面單節(jié)點的一樣,如果加鎖成功繼續(xù)下一個節(jié)點描睦,如果加鎖失敗膊存,首先判斷加鎖成功的節(jié)點數(shù)是否已經(jīng)滿足最低加鎖個數(shù)限制,比如5個節(jié)點中必須大于等于3個節(jié)點忱叭;
- 如果滿足則停止加鎖隔崎,本次加鎖操作執(zhí)行成功;
- 如果不滿足韵丑,判斷加鎖失敗次數(shù)是否達到上限爵卒,如果沒有達到上限,記錄失敗次數(shù)并對下一個節(jié)點加鎖撵彻,如果已經(jīng)達到加鎖失敗次數(shù)的上限钓株,判斷本次集群加鎖失敗,解鎖所有成功獲取到鎖的節(jié)點陌僵;
- 每個節(jié)點加鎖無論成功或失敗轴合,執(zhí)行完成后都需要判斷剩余時間,如果剩余時間已經(jīng)小于0且未完成所有節(jié)點的加鎖碗短,判斷為加鎖失敗并進行解鎖受葛;
- 如果加鎖成功,更新所有加鎖成功節(jié)點的過期時間豪椿;
后續(xù)
在Redisson3.15.0版本中RedissonRedLock已經(jīng)不被推薦使用了奔坟,理由是現(xiàn)在RLock對象加鎖時會自動傳播到所有的Slaves,具體文檔可以在這里查詢搭盾;https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock
雖然不再推薦用了,但是長點見識也是可以的婉支;