Redisson 提供的分布式鎖
使用實例
private void redissonDoc() throws InterruptedException {
//1. 普通的可重入鎖
RLock lock = redissonClient.getLock("generalLock");
// 拿鎖失敗時會不停的重試
// 具有Watch Dog 自動延期機制 默認(rèn)續(xù)30s 每隔30/3=10 秒續(xù)到30s
lock.lock();
// 嘗試拿鎖10s后停止重試,返回false
// 具有Watch Dog 自動延期機制 默認(rèn)續(xù)30s
boolean res1 = lock.tryLock(10, TimeUnit.SECONDS);
// 拿鎖失敗時會不停的重試
// 沒有Watch Dog 废酷,10s后自動釋放
lock.lock(10, TimeUnit.SECONDS);
// 嘗試拿鎖100s后停止重試,返回false
// 沒有Watch Dog 遂黍,10s后自動釋放
boolean res2 = lock.tryLock(100, 10, TimeUnit.SECONDS);
//2. 公平鎖 保證 Redisson 客戶端線程將以其請求的順序獲得鎖
RLock fairLock = redissonClient.getFairLock("fairLock");
//3. 讀寫鎖 沒錯與JDK中ReentrantLock的讀寫鎖效果一樣
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
readWriteLock.readLock().lock();
readWriteLock.writeLock().lock();
}
如果拿到分布式鎖的節(jié)點宕機,且這個鎖正好處于鎖住的狀態(tài)時安接,會出現(xiàn)鎖死的狀態(tài)翔忽,為了避免這種情況的發(fā)生,鎖都會設(shè)置一個過期時間盏檐。這樣也存在一個問題歇式,加入一個線程拿到了鎖設(shè)置了30s超時,在30s后這個線程還沒有執(zhí)行完畢糯笙,鎖超時釋放了贬丛,就會導(dǎo)致問題,Redisson給出了自己的答案给涕,就是 watch dog 自動延期機制。
Redisson提供了一個監(jiān)控鎖的看門狗额获,它的作用是在Redisson實例被關(guān)閉前够庙,不斷的延長鎖的有效期,也就是說抄邀,如果一個拿到鎖的線程一直沒有完成邏輯耘眨,那么看門狗會幫助線程不斷的延長鎖超時時間,鎖不會因為超時而被釋放境肾。
默認(rèn)情況下剔难,看門狗的續(xù)期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定奥喻。
另外Redisson 還提供了可以指定leaseTime參數(shù)的加鎖方法來指定加鎖的時間偶宫。超過這個時間后鎖便自動解開了,不會延長鎖的有效期
watch dog 核心源碼解讀
// 直接使用lock無參數(shù)方法
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
// 進入該方法 其中l(wèi)easeTime = -1
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//...
}
// 進入 tryAcquire(-1, leaseTime, unit, threadId)
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
// 進入 tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//當(dāng)leaseTime = -1 時 啟動 watch dog機制
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//執(zhí)行完lua腳本后的回調(diào)
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
if (ttlRemaining == null) {
// watch dog
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
scheduleExpirationRenewal 方法開啟監(jiān)控
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//將線程放入緩存中
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
//第二次獲得鎖后 不會進行延期操作
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 第一次獲得鎖 延期操作
renewExpiration();
}
}
// 進入 renewExpiration()
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
//如果緩存不存在环鲤,那不再鎖續(xù)期
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//執(zhí)行l(wèi)ua 進行續(xù)期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
//延期成功纯趋,繼續(xù)循環(huán)操作
renewExpiration();
}
});
}
//每隔internalLockLeaseTime/3=10秒檢查一次
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
//lua腳本 執(zhí)行包裝好的lua腳本進行key續(xù)期
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
關(guān)鍵結(jié)論
上述源碼讀過來我們可以記住幾個關(guān)鍵情報:
- watch dog 在當(dāng)前節(jié)點存活時每10s給分布式鎖的key續(xù)期 30s;
- watch dog 機制啟動,且代碼中沒有釋放鎖操作時吵冒,watch dog 會不斷的給鎖續(xù)期纯命;
- 從可2得出,如果程序釋放鎖操作時因為異常沒有被執(zhí)行痹栖,那么鎖無法被釋放亿汞,所以釋放鎖操作一定要放到 finally {} 中;
看到3的時候揪阿,可能會有人有疑問留夜,如果釋放鎖操作本身異常了,watch dog 還會不停的續(xù)期嗎图甜?下面看一下釋放鎖的源碼碍粥,找找答案
// 鎖釋放
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
// 進入 unlockAsync(Thread.currentThread().getId()) 方法 入?yún)⑹钱?dāng)前線程的id
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
//執(zhí)行l(wèi)ua腳本 刪除key
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 無論執(zhí)行l(wèi)ua腳本是否成功 執(zhí)行cancelExpirationRenewal(threadId) 方法來刪除EXPIRATION_RENEWAL_MAP中的緩存
cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
// 此方法會停止 watch dog 機制
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
Timeout timeout = task.getTimeout();
if (timeout != null) {
timeout.cancel();
}
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
釋放鎖的操作中 有一步操作是從 EXPIRATION_RENEWAL_MAP 中獲取 ExpirationEntry 對象,然后將其remove黑毅,結(jié)合watch dog中的續(xù)期前的判斷:
EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
如果釋放鎖操作本身異常了嚼摩,watch dog 還會不停的續(xù)期嗎?不會矿瘦,因為無論釋放鎖操作是否成功枕面,EXPIRATION_RENEWAL_MAP中的目標(biāo) ExpirationEntry 對象已經(jīng)被移除了,watch dog 通過判斷后就不會繼續(xù)給鎖續(xù)期了缚去。