[轉(zhuǎn)]分布式鎖-RedisLockRegistry源碼分析

前言

官網(wǎng)的英文介紹大概如下:

Starting with version 4.0, the RedisLockRegistry is available. Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread is manipulating a group at a time. The DefaultLockRegistry performs this function within a single component; you can now configure an external lock registry on these components. When used with a shared MessageGroupStore, the RedisLockRegistry can be use to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
To avoid "hung" locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but this can be configured on the registry. Locks are normally held for a much smaller time.

上述大概意思是RedisLockRegistry可以確保在分布式環(huán)境中,只有一個(gè)thread在執(zhí)行萧吠,也就是實(shí)現(xiàn)了分布式鎖左冬,當(dāng)一個(gè)本地線程釋放了鎖,其他本地現(xiàn)場(chǎng)會(huì)立即去搶占鎖纸型,如果鎖被占用了拇砰,那么會(huì)進(jìn)行重試機(jī)制,100毫秒進(jìn)行重試一次狰腌。同時(shí)也避免了"hung" locks 當(dāng)服務(wù)器fails的時(shí)候除破。同時(shí)也給鎖設(shè)置了默認(rèn)60秒的過期時(shí)間

如何獲取鎖

鎖的獲取過程

詳細(xì)流程如上圖所示,這里主要核心業(yè)務(wù)是這樣琼腔,首先Lock是java.util.concurrent.locks中的鎖瑰枫,也就是本地鎖。然后自己用RedisLock實(shí)現(xiàn)了Lock接口而已,但是實(shí)際上RedisLock也使用了本地鎖光坝。主要是通過redis鎖+本地鎖雙重鎖的方式實(shí)現(xiàn)的一個(gè)比較好的鎖尸诽。針對(duì)redis鎖來說只要能獲取到鎖,那么就算是成功的盯另。如果獲取不到鎖就等待100毫秒繼續(xù)重試性含,如果獲取到鎖那么就采用本地鎖鎖住本地的線程。通過兩種方式很好的去實(shí)現(xiàn)了一個(gè)完善的分布式鎖機(jī)制鸳惯。
下面代碼主要是獲取鎖的一個(gè)流程商蕴,先從本地鎖里面獲取,如果獲取到了那么和redis里面存放的RedisLock鎖做對(duì)比芝发,判斷是否是同一個(gè)對(duì)象究恤,如果不是那么就刪除本地鎖然后重新創(chuàng)建一個(gè)鎖返回

@Override
public Lock obtain(Object lockKey) {
    Assert.isInstanceOf(String.class, lockKey);

    //try to find the lock within hard references
    //從本地強(qiáng)引用里面獲取鎖,
    RedisLock lock = findLock(this.hardThreadLocks.get(), lockKey);

    /*
     * If the lock is locked, check that it matches what's in the store.
     * If it doesn't, the lock must have expired.
     */
    //這里主要判斷了這個(gè)鎖是否是鎖住的,如果不是的那么該鎖已經(jīng)過期了
    //如果強(qiáng)引用里面有這個(gè)鎖,并且lock.thread!=null,說明這個(gè)鎖沒有被占用
    if (lock != null && lock.thread != null) {
        //從redis獲取鎖,若如果redis鎖為空或者跟當(dāng)前強(qiáng)引用的鎖不一致,可以確定兩個(gè)問題
        //1.redis里面的鎖和本地的鎖不是一個(gè)了
        //2.redis里面沒有鎖
        RedisLock lockInStore = this.redisTemplate.boundValueOps(this.registryKey + ":" + lockKey).get();
        if (lockInStore == null || !lock.equals(lockInStore)) {
            //刪除強(qiáng)引用里面鎖
            getHardThreadLocks().remove(lock);
            lock = null;
        }
    }
    //如果鎖==null
    if (lock == null) {
        //try to find the lock within weak references
        //嘗試線從弱引用里面去找鎖
        lock = findLock(this.weakThreadLocks.get(), lockKey);
        //如果弱引用鎖==null 那么新建一個(gè)鎖
        if (lock == null) {
            lock = new RedisLock((String) lockKey);
            //判斷是否用弱引用,如果用那么就加入到弱引用里面
            if (this.useWeakReferences) {
                getWeakThreadLocks().add(lock);
            }
        }
    }

    return lock;
}

上面獲取到的是RedisLock后德,RedisLock是實(shí)現(xiàn)java原生Lock接口部宿,并重寫了lock()方法。首先從localRegistry中獲取到鎖瓢湃,這里的鎖是java開發(fā)包里面的ReentrantLock理张。首先把本地先鎖住,然后再去遠(yuǎn)程obtainLock绵患。每次sleep() 100毫秒直到獲取到遠(yuǎn)程鎖為止雾叭,代碼如下所示:

@Override
public void lock() {
    //這里采用java開發(fā)包里面的ReentrantLock 進(jìn)行多線程的加鎖,單機(jī)多線程的情況下解決并發(fā)的問題
    Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
    localLock.lock();
    while (true) {
        try {
            while (!this.obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
                /*
                 * This method must be uninterruptible so catch and ignore
                 * interrupts and only break out of the while loop when
                 * we get the lock.
                 */
        }
        catch (Exception e) {
            localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}

核心遠(yuǎn)程鎖還是在RedisLock中,這里采用了redis事務(wù)+watch的方式落蝙,watch和事務(wù)都是redis里面自帶的织狐。使用watch時(shí)候如果key的值發(fā)生了任何變化。那么exec()將不會(huì)執(zhí)行筏勒,那么如下代碼返回的success就是false移迫。從而來實(shí)現(xiàn)redis鎖的功能

private boolean obtainLock() {
    //判斷創(chuàng)建這個(gè)類的線程和當(dāng)前是否是一個(gè),如果是就直接獲取鎖
    Thread currentThread = Thread.currentThread();
    if (currentThread.equals(this.thread)) {
        this.reLock++;
        return true;
    }
    //把當(dāng)前鎖存到集合種
    toHardThreadStorage(this);

    /*
     * Set these now so they will be persisted if successful.
     */
    this.lockedAt = System.currentTimeMillis();
    this.threadName = currentThread.getName();

    Boolean success = false;
    try {
        success = RedisLockRegistry.this.redisTemplate.execute(new SessionCallback<Boolean>() {

            @SuppressWarnings({"unchecked", "rawtypes"})
            @Override
            public Boolean execute(RedisOperations ops) throws DataAccessException {
                String key = constructLockKey();
                //監(jiān)控key如果該key被改變了 那么該事務(wù)是不能被實(shí)現(xiàn)的會(huì)進(jìn)行回滾
                ops.watch(key); //monitor key
                //如果key存在了就停止監(jiān)控,如果key已經(jīng)存在了 那么肯定是被別人占用了
                if (ops.opsForValue().get(key) != null) {
                    ops.unwatch(); //key already exists, stop monitoring
                    return false;
                }

                ops.multi(); //transaction start
                //設(shè)置一個(gè)值并加上過期時(shí)間 m默認(rèn)是一分鐘左右的時(shí)間
                //set the value and expire
                //把鎖放入到redis中
                ops.opsForValue()
                        .set(key, RedisLock.this, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);

                //exec will contain all operations result or null - if execution has been aborted due to 'watch'
                return ops.exec() != null;
            }

        });

    }
    finally {
      //如果不成功那么把當(dāng)前過期時(shí)間和鎖的名字設(shè)置成null
        if (!success) {
            this.lockedAt = 0;
            this.threadName = null;
            toWeakThreadStorage(this);
        }
        else {
        //如果成功把當(dāng)前鎖的thread名稱設(shè)置成currentThread
            this.thread = currentThread;
            if (logger.isDebugEnabled()) {
                logger.debug("New lock; " + this.toString());
            }
        }

    }

    return success;
}

上面是整個(gè)加鎖的流程,基本流程比較簡(jiǎn)單管行,看完加鎖應(yīng)該自己都能解鎖厨埋,無非就是去除redis鎖和本地的鎖而已。

@Override
public void unlock() {
    //判斷當(dāng)前運(yùn)行的線程和鎖的線程做對(duì)比捐顷,如果兩個(gè)線程不一樣那么拋出異常
    if (!Thread.currentThread().equals(this.thread)) {
        if (this.thread == null) {
            throw new IllegalStateException("Lock is not locked; " + this.toString());
        }
        throw new IllegalStateException("Lock is owned by " + this.thread.getName() + "; " + this.toString());
    }

    try {
       //如果reLock--小于=0的話就刪除redis里面的鎖
        if (this.reLock-- <= 0) {
            try {
                this.assertLockInRedisIsUnchanged();
                RedisLockRegistry.this.redisTemplate.delete(constructLockKey());
                if (logger.isDebugEnabled()) {
                    logger.debug("Released lock; " + this.toString());
                }
            }
            finally {
                this.thread = null;
                this.reLock = 0;
                toWeakThreadStorage(this);
            }
        }
    }
    finally {
    //拿到本地鎖荡陷,進(jìn)行解鎖
        Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
        localLock.unlock();
    }
}

tryLock在原有的加鎖上面增加了一個(gè)超時(shí)機(jī)制,主要是先通過本地的超時(shí)機(jī)制

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    //拿到本地鎖
    Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
    //先本地鎖進(jìn)行tryLock
    if (!localLock.tryLock(time, unit)) {
        return false;
    }
    try {
        long expire = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(time, unit);
        boolean acquired;
        //這里添加了超時(shí)機(jī)制迅涮,跟之前的無限等待做了一個(gè)區(qū)分
        while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
            Thread.sleep(100); //NOSONAR
        }
         //超時(shí)后沒有獲取到鎖废赞,那么就把本地鎖進(jìn)行解鎖
        if (!acquired) {
            localLock.unlock();
        }
        return acquired;
    }
    catch (Exception e) {
        localLock.unlock();
        rethrowAsLockException(e);
    }
    return false;
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
  • 序言:七十年代末,一起剝皮案震驚了整個(gè)濱河市叮姑,隨后出現(xiàn)的幾起案子唉地,更是在濱河造成了極大的恐慌,老刑警劉巖,帶你破解...
    沈念sama閱讀 212,454評(píng)論 6 493
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件渣蜗,死亡現(xiàn)場(chǎng)離奇詭異屠尊,居然都是意外死亡旷祸,警方通過查閱死者的電腦和手機(jī)耕拷,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 90,553評(píng)論 3 385
  • 文/潘曉璐 我一進(jìn)店門,熙熙樓的掌柜王于貴愁眉苦臉地迎上來托享,“玉大人骚烧,你說我怎么就攤上這事∪蛭В” “怎么了赃绊?”我有些...
    開封第一講書人閱讀 157,921評(píng)論 0 348
  • 文/不壞的土叔 我叫張陵,是天一觀的道長羡榴。 經(jīng)常有香客問我碧查,道長,這世上最難降的妖魔是什么校仑? 我笑而不...
    開封第一講書人閱讀 56,648評(píng)論 1 284
  • 正文 為了忘掉前任忠售,我火速辦了婚禮,結(jié)果婚禮上迄沫,老公的妹妹穿的比我還像新娘稻扬。我一直安慰自己,他們只是感情好羊瘩,可當(dāng)我...
    茶點(diǎn)故事閱讀 65,770評(píng)論 6 386
  • 文/花漫 我一把揭開白布泰佳。 她就那樣靜靜地躺著,像睡著了一般尘吗。 火紅的嫁衣襯著肌膚如雪逝她。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 49,950評(píng)論 1 291
  • 那天睬捶,我揣著相機(jī)與錄音汽绢,去河邊找鬼。 笑死侧戴,一個(gè)胖子當(dāng)著我的面吹牛宁昭,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播酗宋,決...
    沈念sama閱讀 39,090評(píng)論 3 410
  • 文/蒼蘭香墨 我猛地睜開眼积仗,長吁一口氣:“原來是場(chǎng)噩夢(mèng)啊……” “哼!你這毒婦竟也來了蜕猫?” 一聲冷哼從身側(cè)響起寂曹,我...
    開封第一講書人閱讀 37,817評(píng)論 0 268
  • 序言:老撾萬榮一對(duì)情侶失蹤,失蹤者是張志新(化名)和其女友劉穎,沒想到半個(gè)月后隆圆,有當(dāng)?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體漱挚,經(jīng)...
    沈念sama閱讀 44,275評(píng)論 1 303
  • 正文 獨(dú)居荒郊野嶺守林人離奇死亡,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點(diǎn)故事閱讀 36,592評(píng)論 2 327
  • 正文 我和宋清朗相戀三年渺氧,在試婚紗的時(shí)候發(fā)現(xiàn)自己被綠了旨涝。 大學(xué)時(shí)的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片。...
    茶點(diǎn)故事閱讀 38,724評(píng)論 1 341
  • 序言:一個(gè)原本活蹦亂跳的男人離奇死亡侣背,死狀恐怖白华,靈堂內(nèi)的尸體忽然破棺而出,到底是詐尸還是另有隱情贩耐,我是刑警寧澤弧腥,帶...
    沈念sama閱讀 34,409評(píng)論 4 333
  • 正文 年R本政府宣布,位于F島的核電站潮太,受9級(jí)特大地震影響管搪,放射性物質(zhì)發(fā)生泄漏。R本人自食惡果不足惜铡买,卻給世界環(huán)境...
    茶點(diǎn)故事閱讀 40,052評(píng)論 3 316
  • 文/蒙蒙 一更鲁、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧寻狂,春花似錦岁经、人聲如沸。這莊子的主人今日做“春日...
    開封第一講書人閱讀 30,815評(píng)論 0 21
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽。三九已至纠亚,卻和暖如春塘慕,著一層夾襖步出監(jiān)牢的瞬間,已是汗流浹背蒂胞。 一陣腳步聲響...
    開封第一講書人閱讀 32,043評(píng)論 1 266
  • 我被黑心中介騙來泰國打工图呢, 沒想到剛下飛機(jī)就差點(diǎn)兒被人妖公主榨干…… 1. 我叫王不留,地道東北人骗随。 一個(gè)月前我還...
    沈念sama閱讀 46,503評(píng)論 2 361
  • 正文 我出身青樓蛤织,卻偏偏與公主長得像,于是被迫代替她去往敵國和親鸿染。 傳聞我的和親對(duì)象是個(gè)殘疾皇子指蚜,可洞房花燭夜當(dāng)晚...
    茶點(diǎn)故事閱讀 43,627評(píng)論 2 350

推薦閱讀更多精彩內(nèi)容