RedissonLock不同的加鎖方法浓恳,流程會有所差別:
tryLock()不帶參數(shù)最終調(diào)用的是
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
"只有l(wèi)easeTime為-1時才會,有延期鎖的作用(也就是看門狗),鎖的過期時間需要看配置,默認(rèn)是30S"
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
"獲取鎖成功后,調(diào)用定時任務(wù)延遲鎖時間"
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
傳過來的參數(shù)leaseTime為-1皮获,unint是null,這個方法獲取不到直接就結(jié)束了纹冤,有點(diǎn)下failfast的模式洒宝,線程不會做自旋重試。
具體加鎖實(shí)現(xiàn):
tryLockInnerAsync:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
通過這種eval表達(dá)式萌京,lua腳本保證原子性
如果不存在鎖:
等價于:
命令 | 備注 |
---|---|
EXISTS lockkey | 判斷鎖是否存在 |
HSET lockkey uuid:threadId 1 | 設(shè)置hash field和value值 |
PEXPIRE lockkey internalLockLeaseTime | 設(shè)置lockkey的過期時間 |
如果存在鎖:
等價于:
命令 | 備注 |
---|---|
HEXISTS lockkey uuid:threadId | 判斷當(dāng)前線程是否已經(jīng)獲取到鎖 |
HSET lockkey uuid:threadId 1 | 設(shè)置hash field和value值 |
HINCRBY lockkey uuid:threadId 1 | 給對應(yīng)的field的值加1(相當(dāng)于可重入) |
PEXPIRE lockkey internalLockLeaseTime | 重置過期時間 |
普通的加鎖邏輯就結(jié)束了雁歌。
tryLock帶參數(shù)就相對復(fù)雜一些,加入了線程自旋相關(guān)的邏輯處理:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
"走tryAcquireAsync的邏輯"
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
"如果已超時知残,則直接失敗"
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
如果客戶端宕機(jī)靠瞎,就不會在續(xù)期,那鎖到了30s之后自然就失效了
鎖續(xù)期邏輯:
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
// reschedule itself
renewExpiration();
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
特別注意:以上過程存在一個細(xì)節(jié)求妹,這里有必要說明一下乏盐,也是分布式鎖的一個關(guān)鍵點(diǎn):當(dāng)鎖正在被占用時,丑勤,有效的解決了無效的鎖申請浪費(fèi)資源的問題。