redisson 分布式鎖之可重入鎖原理解析

基礎(chǔ)理論

  • redisson是使用java實現(xiàn)的操作redis的一個工具姐军,redisson可以作為spring-data的底層實現(xiàn),通過redisTemplate封裝的api來使用饼丘,redisson利用redis中的hash數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)獲取鎖屡穗、鎖重入和釋放鎖等。
  • redisson包含了各種分布式鎖的實現(xiàn)其中包括了汛骂,可重入鎖、公平鎖、讀寫鎖担敌、聯(lián)鎖桃犬、紅鎖等敢伸。java中常用的鎖redisson都有分布式的實現(xiàn)方案尾序。
  • redisson實現(xiàn)的可重入鎖原理跟java中的ReentrantLock類似,通過redis的hash數(shù)據(jù)來獲取鎖和鎖的可重入,redis的發(fā)布訂閱消息實現(xiàn)了線程阻塞和重試獲取鎖。

源碼和流程

獲取鎖原理解析

  1. 如下圖使用redisson加鎖后會在redis中創(chuàng)建一個hash類型的數(shù)據(jù)秸侣,其中redisKey是order,key是線程的線程id+線程獲取鎖的次數(shù),value是線程獲取鎖的次數(shù)味榛。
鎖數(shù)據(jù)結(jié)構(gòu).png
  1. redisson獲取鎖的實現(xiàn)是通過lua腳本來實現(xiàn)的具體實現(xiàn)如下
//判斷rediskey是否存在巾表,如果不存在則表示鎖沒有被其它線程獲取
"if (redis.call('exists', KEYS[1]) == 0) then " +
//創(chuàng)建命名為order的hash數(shù)據(jù)鞠苟,并且把線程id作為key考榨,1作為value存入hash中
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重置redis過期時間
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
//返回nil在java中就是null
"return nil; " +
"end; " +
//到這一步了則表示鎖已經(jīng)被獲取了接下來判斷獲取鎖的線程是否是當前線程
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//如果獲取鎖成功散休,代表獲取鎖次數(shù)的value+1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
//重置redisKey的有效期
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//到了這一步則表示獲取鎖已經(jīng)失敗了扔嵌,最后返回redisKey有效期的剩余時間
"return redis.call('pttl', KEYS[1]);"

3.源碼解析

    @Test
    void method1() throws InterruptedException {
        // 嘗試獲取鎖
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("獲取鎖失敗 .... 1");
            return;
        }
        try {
            log.info("獲取鎖成功 .... 1");
            method2();
            log.info("開始執(zhí)行業(yè)務 ... 1");
        } finally {
            log.warn("準備釋放鎖 .... 1");
            lock.unlock();
        }
    }
    
    @Override
    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return tryLock(waitTime, -1, unit);
    }
    
    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        
        //1.執(zhí)行l(wèi)ua腳本并且根據(jù)返回的結(jié)果ttl判斷獲取鎖是否成功
        //2.如果獲取鎖成功并且leaseTime(鎖釋放時間)為-1則開啟看門狗
        //刷新鎖的過期時間防止鎖過期失效
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        //計算剩余的鎖等待時間如果過期了直接返回false
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //訂閱鎖釋放消息宜狐,訂閱成功后線程會被阻塞在這里等待其它線程釋放鎖
        //并且發(fā)布消息俭驮,等待是有時間限制的
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            //超過鎖等待時間訂閱的消息還未發(fā)布直接取消訂閱并且返回false
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            //計算剩余的鎖等待時間并且判斷等待時間是否<=0如果等待時間
            //用完了直接返回false
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                //重新嘗試獲取鎖根據(jù)返回的ttl判斷鎖是否獲取成功
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                //判斷剩余的鎖等待超時時間是否清零了如果清零了返回false
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 比較ttl和time誰時間少崭倘,時間少的作為第二次訂閱消息的
                //等待超時時間
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                //判斷剩余時間是否超時
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
    }
    
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //阻塞線程等到lua腳本返回的ttl的值
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //leaseTime鎖過期時間 如果!= -1 不需要開啟看門狗執(zhí)行后直接返回即可
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //執(zhí)行釋放鎖和發(fā)布鎖釋放消息的lua腳本
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        //等待lua腳本執(zhí)行結(jié)束
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            //判斷如果獲取鎖成功坞淮,設(shè)置看門狗
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
    
    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        //ExpirationEntry map存放這所有的看門狗定時任務主要作用是
        //1.循環(huán)執(zhí)行定時任務時判斷鎖是否被釋放,鎖釋放時會把map中的key刪除凌盯。
        //2.釋放鎖時刪除map中的看門狗定時任務
        //getEntryName() -> e7b433e7-8f44-46c8-b98a-c59f487b6136:order
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            renewExpiration();
        }
    }
    
    private void renewExpiration() {
        //獲取看門狗定時任務判斷鎖是否被釋放
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        //執(zhí)行定時任務
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                //重置redisKey的時間
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    //遞歸調(diào)用定時任務,保障鎖未釋放期間redisKey不會過期
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        //internalLockLeaseTime / 3 = 10s 默認10s刷一次
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

釋放鎖原理

  1. redisson釋放鎖的實現(xiàn)也是通過lua腳本來實現(xiàn)的具體實現(xiàn)如下
//檢查鎖是否是自己的
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
//如果鎖不是自己的直接返回nil
"return nil;" +
"end; " +
//釋放一次鎖之后返回剩余的數(shù)量
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果counter>0表示鎖還未被完全釋放
"if (counter > 0) then " +
//重置鎖的有效期
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//counter == 0 表示鎖被完全釋放刪除redisKey
"redis.call('del', KEYS[1]); " +
//發(fā)布消息給還在等待獲取鎖的線程
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
  1. 源碼解析
    @Test
    void method1() throws InterruptedException {
        // 嘗試獲取鎖
        lock.lock();
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("獲取鎖失敗 .... 1");
            return;
        }
        try {
            log.info("獲取鎖成功 .... 1");
            method2();
            log.info("開始執(zhí)行業(yè)務 ... 1");
        } finally {
            log.warn("準備釋放鎖 .... 1");
            lock.unlock();
        }
    }
    
    @Override
    public void unlock() {
        try {
            //阻塞等待釋放鎖的流程執(zhí)行
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
    }
    
    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        //執(zhí)行釋放鎖的lua腳本并且發(fā)布鎖釋放消息
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        //等待lua腳本執(zhí)行完并且執(zhí)行下面的代碼
        future.onComplete((opStatus, e) -> {
            
            //取消看門狗定時任務
            cancelExpirationRenewal(threadId);

            //出現(xiàn)異常拋出異常
            if (e != null) {
                result.tryFailure(e);
                return;
            }

            //lua腳本執(zhí)行返回的結(jié)果opStatus如果是null釋放鎖失敗
            //拋出業(yè)務異常
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }
            
            //鎖釋放成功
            result.trySuccess(null);
        });

        return result;
    }
    
    void cancelExpirationRenewal(Long threadId) {
        //從EXPIRATION_RENEWAL_MAP中取出看門狗定時任務判斷定時任務是否為空
        //如果定時任務為空直接停止
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        
        //刪除EXPIRATION_RENEWAL_MAP中的定時任務
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
        
        //取消定時任務
        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }
    

獲取鎖和釋放鎖流程

如下圖左側(cè)的流程圖是獲取redisson鎖的過程右側(cè)的流程圖是釋放redisson鎖的流程圖

源碼流程-1.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末黎烈,一起剝皮案震驚了整個濱河市必怜,隨后出現(xiàn)的幾起案子膏执,更是在濱河造成了極大的恐慌征峦,老刑警劉巖蛉加,帶你破解...
    沈念sama閱讀 211,884評論 6 492
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件贺辰,死亡現(xiàn)場離奇詭異,居然都是意外死亡,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,347評論 3 385
  • 文/潘曉璐 我一進店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人,你說我怎么就攤上這事。” “怎么了拧粪?”我有些...
    開封第一講書人閱讀 157,435評論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長绢记。 經(jīng)常有香客問我签孔,道長罐盔,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 56,509評論 1 284
  • 正文 為了忘掉前任,我火速辦了婚禮铐达,結(jié)果婚禮上选脊,老公的妹妹穿的比我還像新娘钝的。我一直安慰自己,他們只是感情好望薄,可當我...
    茶點故事閱讀 65,611評論 6 386
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著椭员,像睡著了一般。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上浩峡,一...
    開封第一講書人閱讀 49,837評論 1 290
  • 那天咽块,我揣著相機與錄音,去河邊找鬼应役。 笑死袍祖,一個胖子當著我的面吹牛键耕,可吹牛的內(nèi)容都是我干的村视。 我是一名探鬼主播杠氢,決...
    沈念sama閱讀 38,987評論 3 408
  • 文/蒼蘭香墨 我猛地睜開眼勺爱,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了砾隅?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 37,730評論 0 267
  • 序言:老撾萬榮一對情侶失蹤债蜜,失蹤者是張志新(化名)和其女友劉穎晴埂,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體寻定,經(jīng)...
    沈念sama閱讀 44,194評論 1 303
  • 正文 獨居荒郊野嶺守林人離奇死亡儒洛,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 36,525評論 2 327
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了狼速。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片琅锻。...
    茶點故事閱讀 38,664評論 1 340
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖,靈堂內(nèi)的尸體忽然破棺而出恼蓬,到底是詐尸還是另有隱情惊完,我是刑警寧澤,帶...
    沈念sama閱讀 34,334評論 4 330
  • 正文 年R本政府宣布处硬,位于F島的核電站小槐,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏郁油。R本人自食惡果不足惜本股,卻給世界環(huán)境...
    茶點故事閱讀 39,944評論 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望桐腌。 院中可真熱鬧,春花似錦苟径、人聲如沸案站。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,764評論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽蟆盐。三九已至,卻和暖如春遭殉,著一層夾襖步出監(jiān)牢的瞬間石挂,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 31,997評論 1 266
  • 我被黑心中介騙來泰國打工险污, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留痹愚,地道東北人。 一個月前我還...
    沈念sama閱讀 46,389評論 2 360
  • 正文 我出身青樓蛔糯,卻偏偏與公主長得像拯腮,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子蚁飒,可洞房花燭夜當晚...
    茶點故事閱讀 43,554評論 2 349

推薦閱讀更多精彩內(nèi)容