Redis分布式鎖的實現(xiàn)原理 - Redisson和RedisLockRegistry

主要接觸到的Redis分布式鎖有兩種框架RedisLockRegistryRedisson伸刃,今天來看下兩種框架的實現(xiàn)原理屯吊;

RedisLockRegistry

Spring-inintegration-redis中提供的一種實現(xiàn)方式,使用方式如下

@Bean("redisLockRegistry")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
    return new RedisLockRegistry(redisConnectionFactory, "spring-redis-lock");
}

在配置文件中直接注冊成一個Bean怔鳖,下面是一些構(gòu)造時的源碼茉唉;

public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey) {
    // DEFAULT_EXPIRE_AFTER默認是6秒
    this(connectionFactory, registryKey, DEFAULT_EXPIRE_AFTER);
}

public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
    Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
    Assert.notNull(registryKey, "'registryKey' cannot be null");
    this.redisTemplate = new StringRedisTemplate(connectionFactory);
    this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
    this.registryKey = registryKey;
    this.expireAfter = expireAfter;
}

構(gòu)造時生成了一個RedisTemplate用于執(zhí)行Redis操作,一個ObtainLockScript用來保存獲取鎖的腳本结执;

private final class RedisLock implements Lock {
        
    private final String lockKey;

    private final ReentrantLock localLock = new ReentrantLock();

    private volatile boolean unlinkAvailable = true;

    private volatile long lockedAt;
    // 構(gòu)造的時候只儲存一個key
    private RedisLock(String path) {
        this.lockKey = constructLockKey(path);
    }
        
    private String constructLockKey(String path) {
        return RedisLockRegistry.this.registryKey + ":" + path;
    }
    ...
}

內(nèi)部封裝了一個Redis鎖的實現(xiàn)度陆,主要包括一個鎖的標識lockKey和一個重入鎖ReentrantLock

Lock lock = redisLockRegistry.obtain(key);
try {
    if (lock.tryLock(5, TimeUnit.SECONDS)) {
        try {
            ...
        } finally {
            lock.unlock();
        }
    } else {
        throw new GetLockTimeoutException();
    }
} catch (InterruptedException e) {
    throw new GetLockTimeoutException();
}

上面是代碼中的基本用法献幔,可以選擇封裝成一個切面使用起來方便一些懂傀,首先來看下獲取鎖的操作obtain;

private final Map<String, RedisLock> locks = new ConcurrentHashMap<>();
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);
    String path = (String) lockKey;
    return this.locks.computeIfAbsent(path, RedisLock::new);
}

從一個ConcurrentHashMap中根據(jù)鎖的標識獲取一個鎖,如果沒有則構(gòu)造一個新的蜡感;

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    long now = System.currentTimeMillis();
    // 先是本地的重入鎖先嘗試加鎖
    if (!this.localLock.tryLock(time, unit)) {
        // 如果本地線程加鎖失敗蹬蚁,直接返回
        return false;
    }
    try {
        long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
        boolean acquired;
        // 本地加鎖成功后執(zhí)行redis加鎖
        while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { 
            // 加鎖失敗且沒過超時時間,睡一會再試
            Thread.sleep(100); 
        }
        if (!acquired) {
            // 如果沒有獲取到鎖郑兴,把本地鎖釋放直接返回
            this.localLock.unlock();
        }
        return acquired;
    }
    catch (Exception e) {
        this.localLock.unlock();
        rethrowAsLockException(e);
    }
    return false;
}

看這段邏輯需要一點ReentrantLock的知識犀斋,可以先了解下Java8 源碼閱讀 - AbstractQueuedSynchronizer

首先是本地的重入鎖先進行加鎖,加鎖步驟如下:

  • 如果鎖資源沒有被其他線程占用情连,直接加鎖叽粹,亦或者是本地線程已經(jīng)占用鎖,再次重入也被視為加鎖成功;
  • 如果鎖資源被其他線程占用球榆,排隊等待指定時間朽肥,指定時間內(nèi)獲取不到資源則加鎖失敗持钉;

成功在本地加鎖后會執(zhí)行Redis加鎖操作

private boolean obtainLock() {
    Boolean success =
            RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
                    Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
                    String.valueOf(RedisLockRegistry.this.expireAfter));

    boolean result = Boolean.TRUE.equals(success);

    if (result) {
        this.lockedAt = System.currentTimeMillis();
    }
    return result;
}

private static final String OBTAIN_LOCK_SCRIPT =
        "local lockClientId = redis.call('GET', KEYS[1])\n" +
                "if lockClientId == ARGV[1] then\n" +
                "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
                "  return true\n" +
                "elseif not lockClientId then\n" +
                "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
                "  return true\n" +
                "end\n" +
                "return false";

利用Redis執(zhí)行Lua腳本是原子性的特性來實現(xiàn)的衡招,首先redis.call('GET', KEYS[1])來判斷是否已經(jīng)獲取到鎖,如果發(fā)現(xiàn)鎖已經(jīng)被占用每强,則判斷是不是來自同一個客戶端的請求始腾,如果是就更新解鎖時間(默認6s),如果不是來自同一個客戶端的(即多個節(jié)點對同一個key操作)空执,只不會允許非第一個客戶端的操作浪箭;上鎖的操作是redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2]),key是外面指定的key辨绊,而value則是客戶端的uuid奶栖;

解鎖的邏輯也是比較簡單,直接調(diào)用del或者unlink刪除key即可门坷;

private void removeLockKey() {
    if (this.unlinkAvailable) {
        try {
            RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
        }
        catch (Exception ex) {
            LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                    "falling back to the regular DELETE command", ex);
            this.unlinkAvailable = false;
            RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
        }
    }
    else {
        RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
    }
}

在單節(jié)點的Redis架構(gòu)下宣鄙,RedisLockRegistry是可以滿足一些不復雜的場景的,但是需要考慮鎖的續(xù)租默蚌、Redis集群架構(gòu)部署等問題的話冻晤,這個框架可能就會稍顯拉垮,所以大多數(shù)情況都是使用Redisson绸吸;

Redisson

使用方式大致如下

Config config = new Config();
config.useSentinelServers()
        .addSentinelAddress(
                "redis://127.0.0.1:6379",
                "redis://127.0.0.2:6379",
                "redis://127.0.0.3:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock lock = redissonClient.getLock("key");
// 加鎖的兩種方式
lock.lock();
lock.lock(5, TimeUnit.SECONDS);
lock.unlock();

下面簡單分析下Redisson的加鎖邏輯

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    ...
}

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 在獲取鎖后鼻弧,如果還沒有通過調(diào)用unlock釋放鎖,
    // 則保持鎖的最大時間锦茁。如果leaseTime為-1攘轩,則保持鎖定直到顯式解鎖。
    if (leaseTime != -1) {
        // 指定了鎖的釋放時間
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 沒有指定鎖的釋放時間
    // 嘗試加鎖
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                                            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            // 發(fā)現(xiàn)異常
            return;
        }

        // 加鎖成功
        if (ttlRemaining == null) {
            // 定時續(xù)租
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 里面處理了包括選取節(jié)點蜻势、重試等操作
    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

先看下加鎖的邏輯撑刺,這里就忽略了集群節(jié)點選取等其他代碼,直接看這段Lua腳本的命令握玛,執(zhí)行邏輯如下

  • 檢查key是否存在
    • 如果key不存在够傍,直接將field即客戶端id+key+線程id組成的uuid對應的value置為1,并更新過期時間挠铲,最后返回nil冕屯;
    • 如果key存在,則判斷field(即上面的uuid)是否存在拂苹,如果不存在安聘,則說明該鎖的已被其他占用,直接返回key剩余的ttl;如果field存在浴韭,則說明自己是鎖的持有者丘喻,將filed對應的value自增1(重入);

如果沒有指定鎖的釋放時間念颈,將會觸發(fā)鎖續(xù)租的操作泉粉;

protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 記錄續(xù)租的次數(shù)
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // 執(zhí)行續(xù)租操作
        renewExpiration();
    }
}
// 續(xù)租
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        // 已經(jīng)被停止續(xù)租
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                // 停止續(xù)租就是將該key從renewal集合中移除
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    // 續(xù)租操作有異常,停止續(xù)租
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                
                if (res) {
                    // 重新續(xù)租
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}

來看下續(xù)租的操作榴芳,每個獲取鎖的線程都會執(zhí)行續(xù)租任務嗡靡,默認是30 / 3 = 10秒鐘重新執(zhí)行一次續(xù)租操作直到下面三個條件其中一個觸發(fā)

  • redis執(zhí)行續(xù)租命令異常
  • 手動解鎖操作
  • 續(xù)租操作失敗(通常是redis中key不存在了)

來看下續(xù)租的Lua命令

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1; 
end;
    return 0;

如果對應key和field都存在窟感,直接通過pexpire命令更新過期時間讨彼;

下面是看看獲取鎖失敗的操作

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        if (ttl == null) {
            // 獲取鎖成功
            return;
        }
        // 獲取鎖失敗
        // 利用redis的pub/sub機制來訂閱到鎖被釋放的消息
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        // 根據(jù)是否支持中斷來使用netty不同的同步操作
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                // 重復嘗試加鎖
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                if (ttl == null) {
                    // 直到獲取鎖成功
                    break;
                }

                if (ttl >= 0) {
                    try {
                        // 利用Semaphore阻塞直到ttl過去
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

如果tryAcquire返回的值不為null,即表示鎖資源被其他線程占用柿祈,本地會啟動一個死循環(huán)重復執(zhí)行加鎖操作哈误,每一次上鎖失敗拿到ttl后,會利用Semaphore阻塞當前線程直到ttl過去谍夭,當然也可能被之前訂閱的解鎖消息主動喚醒黑滴;

protected void onMessage(RedissonLockEntry value, Long message) {
    if (message.equals(UNLOCK_MESSAGE)) {
        Runnable runnableToExecute = value.getListeners().poll();
        if (runnableToExecute != null) {
            runnableToExecute.run();
        }
        // 當收到解鎖的通知時會主動把Semaphore阻塞的線程釋放
        value.getLatch().release();
    }
}

這是當Redis收到訂閱的解鎖事件執(zhí)行的方法,這里會主動把RedissonLockEntry持有的所有被阻塞的線程都釋放紧索;

// 解鎖的實際邏輯
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<>();
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // 取消鎖續(xù)租操作
        cancelExpirationRenewal(threadId);
        ...
    });

    return result;
}

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

解鎖的邏輯如下

  • 判斷這個鎖是否被當前線程持有
    • 如果當前線程不是鎖的持有者,直接返回nil菜谣;
    • 如果當前線程是鎖的持有者珠漂,將redis保存的數(shù)值減1,如果減1后的結(jié)果是大于0的尾膊,說明該鎖是被重復加鎖的媳危,這時候需要對鎖進行續(xù)租,時間為internalLockLeaseTime冈敛,并返回為0待笑;
      • 如果減1后的結(jié)果為0,將key刪除后并發(fā)布解鎖的消息抓谴;
RedisLockRegistry和Redisson對比

不同點:

  • RedisLockRegistry有一個本地加鎖的邏輯暮蹂,只有當本地加鎖成功才能繼續(xù)執(zhí)行redis加鎖邏輯,重入邏輯也是做在本地癌压,所以理論上RedisLockRegistryRedisson會快那么一點點仰泻;
  • Redisson有鎖續(xù)租功能,解決了加鎖成功后邏輯執(zhí)行未完成時鎖到期被釋放滩届,導致其他資源獲取鎖的混亂集侯;
RedLock

因為Redis集群主從同步時會有延遲,有可能因為master節(jié)點掛掉,master節(jié)點的鎖還未同步到slave時棠枉,slave被選舉成master而可能其他線程能在新master上重復獲得鎖浓体,而導致鎖資源加鎖混亂的問題;

所以就有了一個RedLock來解決這種問題辈讶,大致的思想是在集群上不同節(jié)點都嘗試加鎖汹碱,步驟大致如下

  1. 獲取當前的時間戳(毫秒)。
  2. 依次嘗試從 N 個實例荞估,使用相同的 key 和隨機值獲取鎖咳促。在步驟 2,當向 Redis 設置鎖時勘伺,客戶端應該設置一個網(wǎng)絡連接和響應超時時間跪腹,這個超時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為 10 秒飞醉,則超時時間應該在 5-50 毫秒之間冲茸。這樣可以避免服務器端 Redis 已經(jīng)掛掉的情況下,客戶端還在死死地等待響應結(jié)果缅帘。如果服務器端沒有在規(guī)定時間內(nèi)響應轴术,客戶端應該盡快嘗試另外一個 Redis 實例。
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟 1 記錄的時間)就得到獲取鎖使用的時間钦无。當且僅當從大多數(shù)(這里是 3 個節(jié)點)的 Redis 節(jié)點都取到鎖逗栽,并且使用的時間小于鎖失效時間時,鎖才算獲取成功失暂。
    如果取到了鎖彼宠,key 的真正有效時間等于有效時間減去獲取鎖所使用的時間(步驟 3 計算的結(jié)果)。
  4. 如果因為某些原因弟塞,獲取鎖失斊鞠俊(沒有在至少 N/2+1 個Redis實例取到鎖或者取鎖時間已經(jīng)超過了有效時間),客戶端應該在所有的 Redis 實例上進行解鎖(即便某些 Redis 實例根本就沒有加鎖成功)决记。

通過在集群大多數(shù)實例上獲取來保證鎖的可用性摧冀,比較一個集群內(nèi)節(jié)點都宕機的可能性還是很低的,Redisson內(nèi)置了該算法的實現(xiàn)系宫,實現(xiàn)類是RedissonRedLock索昂;

RedissonRedLock
RLock lock = redissonClient.getLock("key");
RLock lock2 = redissonClient.getLock("key");
RedissonRedLock redLock = new RedissonRedLock(lock, lock2);
redLock.lock();
redLock.unlock();

使用方式如上,下面看下源碼具體的實現(xiàn)邏輯

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    // 每個鎖至少等待1500ms笙瑟,總時間就是1500*數(shù)量
    long baseWaitTime = locks.size() * 1500;
    long waitTime = -1;
    if (leaseTime == -1) {
        waitTime = baseWaitTime;
    } else {
        // 如果指定了鎖的釋放時間
        leaseTime = unit.toMillis(leaseTime);
        waitTime = leaseTime;
        if (waitTime <= 2000) {
            waitTime = 2000;
        } else if (waitTime <= baseWaitTime) {
            // 如果鎖的釋放時間小于框架內(nèi)置的等待時間
            // 重制等待時間
            waitTime = ThreadLocalRandom.current().nextLong(waitTime/2, waitTime);
        } else {
            // 如果鎖的釋放時間大于框架內(nèi)置的等待時間
            // 重制等待時間
            waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
        }
    }
    
    while (true) {
        // 重復上鎖直到成功或者異常
        if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
            return;
        }
    }
}

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    // 重制鎖的釋放時間 至少要求是集群獲取鎖的等待時間以上
    long newLeaseTime = ...;
    // 獲取當前集群的時間戳
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    // 每個鎖保留的時間是waitTime
    long lockWaitTime = calcLockWaitTime(remainTime);
    // 加鎖失敗的限制數(shù)
    int failedLocksLimit = failedLocksLimit();
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    // 遍歷鎖
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        try {
            // 進行加鎖
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // redis訪問超時 解鎖當前的節(jié)點
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            acquiredLocks.add(lock);
        } else {
            // 加鎖失敗
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                // 如果獲取到鎖的節(jié)點數(shù)大于預置的節(jié)點數(shù)楼镐,則判斷這次加鎖已經(jīng)成功
                break;
            }
            
            if (failedLocksLimit == 0) {
                // 加鎖失敗次數(shù)已經(jīng)到達了限制
                // 將已加鎖成功的全部解鎖
                unlockInner(acquiredLocks);
                if (waitTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // 重制迭代器
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                // 記錄失敗的次數(shù)
                failedLocksLimit--;
            }
        }
        
        if (remainTime != -1) {
            // 計算還剩余的時間
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            // 如果已經(jīng)沒有剩余時間了,將所有獲取到鎖的節(jié)點進行解鎖
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }

    if (leaseTime != -1) {
        // 更新每個節(jié)點鎖的超時時間
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
    return true;
}

加鎖的邏輯基本上按照RedLock算法來實現(xiàn)的往枷,流程如下:

  1. 每個節(jié)點加鎖至少等待1500ms框产,等待總時間就是1500*節(jié)點數(shù)量凄杯;
  2. 按照RedLock算法,計算每個節(jié)點獲取鎖的等待時間秉宿;
  3. 從第一個節(jié)點開始進行加鎖操作戒突,加鎖的執(zhí)行邏輯和上面單節(jié)點的一樣,如果加鎖成功繼續(xù)下一個節(jié)點描睦,如果加鎖失敗膊存,首先判斷加鎖成功的節(jié)點數(shù)是否已經(jīng)滿足最低加鎖個數(shù)限制,比如5個節(jié)點中必須大于等于3個節(jié)點忱叭;
  • 如果滿足則停止加鎖隔崎,本次加鎖操作執(zhí)行成功;
  • 如果不滿足韵丑,判斷加鎖失敗次數(shù)是否達到上限爵卒,如果沒有達到上限,記錄失敗次數(shù)并對下一個節(jié)點加鎖撵彻,如果已經(jīng)達到加鎖失敗次數(shù)的上限钓株,判斷本次集群加鎖失敗,解鎖所有成功獲取到鎖的節(jié)點陌僵;
  1. 每個節(jié)點加鎖無論成功或失敗轴合,執(zhí)行完成后都需要判斷剩余時間,如果剩余時間已經(jīng)小于0且未完成所有節(jié)點的加鎖碗短,判斷為加鎖失敗并進行解鎖受葛;
  2. 如果加鎖成功,更新所有加鎖成功節(jié)點的過期時間豪椿;
后續(xù)

在Redisson3.15.0版本中RedissonRedLock已經(jīng)不被推薦使用了奔坟,理由是現(xiàn)在RLock對象加鎖時會自動傳播到所有的Slaves,具體文檔可以在這里查詢搭盾;https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock

雖然不再推薦用了,但是長點見識也是可以的婉支;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末鸯隅,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子向挖,更是在濱河造成了極大的恐慌蝌以,老刑警劉巖,帶你破解...
    沈念sama閱讀 221,198評論 6 514
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件何之,死亡現(xiàn)場離奇詭異跟畅,居然都是意外死亡,警方通過查閱死者的電腦和手機溶推,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,334評論 3 398
  • 文/潘曉璐 我一進店門徊件,熙熙樓的掌柜王于貴愁眉苦臉地迎上來奸攻,“玉大人,你說我怎么就攤上這事虱痕《媚停” “怎么了?”我有些...
    開封第一講書人閱讀 167,643評論 0 360
  • 文/不壞的土叔 我叫張陵部翘,是天一觀的道長硝训。 經(jīng)常有香客問我,道長新思,這世上最難降的妖魔是什么窖梁? 我笑而不...
    開封第一講書人閱讀 59,495評論 1 296
  • 正文 為了忘掉前任,我火速辦了婚禮夹囚,結(jié)果婚禮上纵刘,老公的妹妹穿的比我還像新娘。我一直安慰自己崔兴,他們只是感情好彰导,可當我...
    茶點故事閱讀 68,502評論 6 397
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著敲茄,像睡著了一般位谋。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上堰燎,一...
    開封第一講書人閱讀 52,156評論 1 308
  • 那天掏父,我揣著相機與錄音,去河邊找鬼秆剪。 笑死赊淑,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的仅讽。 我是一名探鬼主播陶缺,決...
    沈念sama閱讀 40,743評論 3 421
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼洁灵!你這毒婦竟也來了饱岸?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,659評論 0 276
  • 序言:老撾萬榮一對情侶失蹤徽千,失蹤者是張志新(化名)和其女友劉穎苫费,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體双抽,經(jīng)...
    沈念sama閱讀 46,200評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡百框,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,282評論 3 340
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了牍汹。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片铐维。...
    茶點故事閱讀 40,424評論 1 352
  • 序言:一個原本活蹦亂跳的男人離奇死亡柬泽,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出方椎,到底是詐尸還是另有隱情聂抢,我是刑警寧澤,帶...
    沈念sama閱讀 36,107評論 5 349
  • 正文 年R本政府宣布棠众,位于F島的核電站琳疏,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏闸拿。R本人自食惡果不足惜空盼,卻給世界環(huán)境...
    茶點故事閱讀 41,789評論 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望新荤。 院中可真熱鬧揽趾,春花似錦、人聲如沸苛骨。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,264評論 0 23
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽痒芝。三九已至俐筋,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間严衬,已是汗流浹背澄者。 一陣腳步聲響...
    開封第一講書人閱讀 33,390評論 1 271
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留请琳,地道東北人粱挡。 一個月前我還...
    沈念sama閱讀 48,798評論 3 376
  • 正文 我出身青樓,卻偏偏與公主長得像俄精,于是被迫代替她去往敵國和親询筏。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,435評論 2 359

推薦閱讀更多精彩內(nèi)容