分布式鎖的三種實現(xiàn)方案

帶著問題去思考

分布式鎖有哪些解決方案掉伏?方案的利弊各自體現(xiàn)在哪里?
基于redis來實現(xiàn)分布式鎖實現(xiàn)原理炕泳,以及需要主要那些問題昭殉?
基于ZooKeeper 的分布式鎖實現(xiàn)原理

背景概要

互聯(lián)網(wǎng)從開始的單體應用隨之發(fā)展成目前的分布式應用棵癣,例如市場上流行的分布式框架Dubbo辕翰、SpringCloud等等
單體應用的優(yōu)勢:維護、集成狈谊、部署簡單喜命,適合小團隊獨立維護,劣勢隨之產(chǎn)生的是可擴展性太差河劝,代碼腐化維護成本的增加渊抄、應用復雜度越高功能越多風險性就越大。
所以為了解決以上問題丧裁,引入了分布式應用护桦,市場上很多大型網(wǎng)站以及應用都是分布式部署的。

分布式系統(tǒng)主要從三方面獲得提升:

  • 擴展性:集群擴展性煎娇、地理擴展性二庵、管理擴展性
  • 性能:短RT、低延遲缓呛,高吞吐和較低的計算資源占用率催享。
  • 可用性:可用性=可用時間/(可用時間+不可用時間),可用性百分比越高哟绊,難度越高 因妙。

分布式家族中包含分布式服務、分布式消息、分布式緩存攀涵、分布式調(diào)度铣耘、分布式數(shù)據(jù)庫、分布式搜索以故、分布式鎖蜗细、分布式事務、分布式計算等等


分布式家族

在分布式場景中數(shù)據(jù)一致性向來是很重要的話題怒详,CAP理論中“任何一個分布式系統(tǒng)都無法同時滿足一致性(Consistency)炉媒、可用性(Availability)和分區(qū)容錯性(Partition tolerance),最多只能同時滿足兩項昆烁。要么CP,要么AP吊骤。
https://www.yuque.com/docs/share/6e6afffe-c348-461e-90bc-711231fbd209?#
在很多場景下,我們是需要保證數(shù)據(jù)一致性静尼,為了滿足一致性問題水援,我們需要很多技術(shù)方案支持,比如分布式鎖分布式事務等等茅郎。本次分享主要針對分布式鎖來進行延伸探討。

分布式鎖的介紹

分布式鎖

講到分布式鎖或渤,首先要提到與之對應的線程鎖

  • 線程鎖:當某個方法或代碼使用鎖系冗,在同一時刻僅有一個線程執(zhí)行該方法或該代碼段。以保證共享資源安全性薪鹦,線程鎖只在同一個進程【同一個JVM】中才有效掌敬。
  • 分布式鎖:當多個進程不在同一個系統(tǒng)中,用分布式鎖控制多個進程對資源的訪問池磁。

到底什么時候需要分布式鎖呢奔害?

總結(jié)來說,當有多個客戶端需要訪問并防止并操作同一個資源地熄,并且還需要保持這個資源的一致性的時候华临,就需要分布式鎖,實現(xiàn)讓多個客戶端互斥的對共享資源進行訪問端考。
eg: 集群部署下秒殺場景雅潭、集群部署下批處理業(yè)務的執(zhí)行等等

考慮因素

  • 排他性:分布式部署的應用集群中,同一個方法在同一時間只能被一臺機器上的一個線程執(zhí)行
  • 可重入性:同一線程多次獲取鎖避免死鎖
  • 阻塞鎖考慮:考慮業(yè)務是否需要
  • 高可用:獲取釋放鎖性能佳
  • 原子性:加鎖解鎖原子性操作却特,避免多次請求獲得鎖扶供。

常見的分布式鎖實現(xiàn)方案

想要實現(xiàn)分布式鎖,我們就需要借助外部系統(tǒng)實現(xiàn)互斥的功能裂明。常見的有以下幾種方式:

  • 基于數(shù)據(jù)庫椿浓;
  • 基于redis;
  • 基于Zookeeper;

基于數(shù)據(jù)庫

  • Mysql:通過唯一索引、通過樂觀鎖version版本扳碍、通過悲觀鎖行鎖 for update實現(xiàn)提岔;
  • MongoDB:findAndModify原子性命令,相較Mysql性能要好很多

容易理解左腔,但解決問題的方案相對越來越復雜唧垦,并且數(shù)據(jù)庫需要一定的開銷,性能值得考慮

本次分享不涵蓋基于數(shù)據(jù)庫分布式鎖實現(xiàn)液样,有興趣可以下去自行通過上述方向去擴展振亮。

基于Redis

在實現(xiàn)redis分布式鎖之前,我們先mock一個場景來看看鞭莽,當分布式應用場景下坊秸,我們不用鎖或者用java中線程鎖會出現(xiàn)什么問題呢?

@GetMapping("/kill")
public String kill() {
  // 定義商品key值
  String key = "goods";
  // 獲取商品數(shù)量
  Object obj = redisTemplate.opsForValue().get(key);
  Integer mount = Integer.valueOf(obj.toString());
  // 如果商品被搶完澎怒,直接返回
  if (mount < 0 || mount == 0) {
  System.out.println("很遺憾褒搔,商品已被搶完");
  return "很遺憾,商品已被搶完";
  }
  // 線程睡眠喷面,目的在于放大錯誤
  try {
  Thread.sleep(2000);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  // 搶到商品后星瘾,將redis的商品數(shù)量減一
  mount = --mount;
  redisTemplate.opsForValue().set(key, mount.toString());
  // 打印,以便觀察
  System.out.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":搶到第" + (mount + 1) + "件商品【kill】");
  return "恭喜惧辈,商品搶購成功";
}

原理

最核心的三個命令:setNx琳状、Px(setNx、expire)盒齿、delete
setNx當返回1時代表獲取鎖成功念逞,0則搶鎖失敗

redis原生實現(xiàn)

具體實現(xiàn)

接下來我們所有的實現(xiàn)redis分布式鎖都是基于Spring切面定義來完成。

    // 準備工作
    @Bean(name="redisTemplate")
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(redisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(redisSerializer);
        //key haspmap序列化
        template.setHashKeySerializer(redisSerializer);
        return template;
    }
@Aspect
@Component
public class RedisLockProvider {

    @Around("@annotation(lock)")
    public Object execute(ProceedingJoinPoint point, RedisLock lock) throws IllegalAccessException, InstantiationException {
        Object result = null;
        String lockKey = lock.lockKey();
        Class<? extends RedisLockStrategy> strategy = lock.strategy();
        RedisLockStrategy redisLockStrategy = strategy.newInstance();
        try {
            int expireTime = lock.expireTime();
            if(redisLockStrategy.lock(lockKey, expireTime)){
                result = point.proceed();
            }
        } catch (Throwable throwable) {
            redisLockStrategy.unLock(lockKey);
            throwable.printStackTrace();
        } finally {
            redisLockStrategy.unLock(lockKey);
        }
        return result;
    }

}

我們看了不加鎖或者加java鎖的情況边翁,確實會出現(xiàn)數(shù)據(jù)異常的情況翎承,那現(xiàn)在我們通過redis本身的一些特性來實現(xiàn)redis分布式鎖。

public class RedisLockUtil0 implements RedisLockStrategy {

    private String TEMP_VALUE = "OK";

    private RedisTemplate<String, String> redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    public Boolean lock(String key, int expireTime) {
          return redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS);
    }

    public void unLock(String key){
        redisTemplate.delete(key);
    }
}

但上述場景中會有一個問題符匾,就是當我獲取鎖的時候叨咖,如果沒搶鎖成功會立刻返回到客戶端通知結(jié)果,也許下一時間正好就能搶到鎖啊胶,所以我們做了自旋的操作芒澜,并設置默認超時時間,這里也可以不設置超時時間:那就是阻塞鎖了创淡,看業(yè)務場景而制定

public class RedisLockUtil1 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 2000; // 默認超時時間(毫秒) 默認2秒

    private String TEMP_VALUE = "OK";

    private RedisTemplate<String, String> redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("申請鎖(" + key + ")成功");
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加鎖失敗, 鎖鍵值:" + key, e);
        }
        return Boolean.FALSE;
    }

    public void unLock(String key){
        log.info("釋放鎖痴晦,key:"+key);
        redisTemplate.delete(key);
    }
}

上述實現(xiàn)方式,看似邏輯ok琳彩,有同學能看出有什么漏洞嗎誊酌?部凑??我們來測試看下碧浊。
讓我們業(yè)務時間執(zhí)行較短時測試涂邀,發(fā)現(xiàn)業(yè)務邏輯沒太大問題;
讓我們業(yè)務時間稍微變大箱锐,結(jié)果發(fā)現(xiàn)了什么比勉?

當業(yè)務執(zhí)行時間大于默認的自旋超時時間時,會觸發(fā)刪除鎖操作驹止,會出現(xiàn)誤刪的情況浩聋,A業(yè)務的鎖還未執(zhí)行完成,B鎖獲取異畴担或獲取失敗或者自旋時間已經(jīng)超時衣洁,導致誤刪了A業(yè)務的鎖,也就會導致分布式鎖的定義沒有任何意義了抖仅。所以我們在設置鎖的時候坊夫,設置一個屬于該鎖的唯一ID,在刪除鎖的時候要判斷是否屬于自己的鎖。以避免誤刪場景撤卢。

public class RedisLockUtil2 implements RedisLockStrategy {

    /**
     * 默認超時時間(毫秒) 默認2秒
     */
    private static final long DEFAULT_TIME_OUT = 2000;

    /**
     * 避免誤刪key环凿,在value中賦值UUID
     */
    private String TEMP_VALUE = UUID.randomUUID().toString();

    private RedisTemplate redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("加鎖成功...");
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加鎖失敗, 鎖鍵值:" + key, e);
        }
        return Boolean.FALSE;
    }

    public void unLock(String key){
        if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
            log.info("釋放鎖,key:"+key);
            redisTemplate.delete(key);
        }
    }
}

那現(xiàn)在我們模擬一個場景放吩,假設我們業(yè)務的執(zhí)行時間過長的情況下智听,但我們設置的redis過期時間很短,那會出現(xiàn)什么問題呢屎慢??忽洛?我們來測試看下腻惠。

是不是導致A業(yè)務的還未執(zhí)行完成,B業(yè)務卻拿到了本應該屬于A的鎖欲虚,分布式鎖的意思有蕩然無存了集灌,所以我們借鑒了redisson中WatchDog【看門狗】的機制來完善業(yè)務時間大于過期時間的問題。

redisson是用java來實現(xiàn)的分布式框架复哆,稍后我們會介紹redisson是如何基于redis來實現(xiàn)分布式鎖并解決相關問題的欣喧。

public class RedisLockUtil3 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 2000; // 默認超時時間(毫秒) 默認2秒

    private String TEMP_VALUE = UUID.randomUUID().toString();

    private RedisTemplate redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    /**
     * 初始化任務線程池
     */
    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    /**
     * 延長過期次數(shù)閾值
     */
    private Integer maxRenewTimes = 10;
    /**
     * 延長過期時間次數(shù)
     */
    private AtomicInteger renewTimes = new AtomicInteger(0);

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("申請鎖(" + key + ")成功");
                    this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加鎖失敗, 鎖鍵值:" + key, e);
        }
        return Boolean.FALSE;
    }

    /**
     * expireTime/3 頻率去重置過期時間
     * @param key
     * @param value
     * @param expireTime
     */
    private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
        scheduledExecutor.schedule(()->{
            // 延長過期時間失敗直接返回
            if(!this.renewExpiration(key, value, expireTime)){
                return;
            }
            // 超過延長次數(shù)閾值直接返回
            if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
                return;
            }
            this.scheduleExpirationRenewal(key,value,expireTime);
        }, expireTime / 3, TimeUnit.SECONDS);
    }

    /**
     * 重置過期時間
     * @param lockKey
     * @param lockValue
     * @param lockWatchdogTimeout
     * @return
     */
    private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
        String value = (String) redisTemplate.opsForValue().get(lockKey);
        if (Objects.isNull(value) || !value.equals(lockValue)) {
            return false;
        }
        log.info("延長過期時間,key:"+lockKey);
        return redisTemplate.expire(lockKey, lockWatchdogTimeout, TimeUnit.SECONDS);
    }

    public void unLock(String key){
        if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
            log.info("釋放鎖梯找,key:"+key);
            redisTemplate.delete(key);
        }
    }
}

Lua腳本語言的引入

到此為止唆阿,我們自己實現(xiàn)的分布式方案看似已經(jīng)ok,但其實還是有很大的問題锈锤,譬如在刪除鎖驯鳖、給鎖加長超時時間等操作闲询,我們是先獲取鎖在刪除或者延長超時時間,兩者操作并不是原子性操作浅辙,如果在獲取鎖成功之后扭弧,redis宕機,那么也會出現(xiàn)業(yè)務紊亂记舆,所以我們在redis操作要盡量保證院子性操作鸽捻。
那么我們可以引入Lua腳本語言來支持,Lua腳本的優(yōu)勢泽腮,興趣的同學可以下去學習下lua語言御蒲,大部分游戲開發(fā)都用的lua來實現(xiàn);語法鏈接:https://www.runoob.com/lua/lua-tutorial.html
并且我們國人章亦春 OpenResty 也是基于Lua和Nginx來實現(xiàn)高性能服務端的盛正。

Lua優(yōu)勢

Redis 腳本使用 Lua 解釋器來執(zhí)行腳本删咱。 Redis 2.6 版本通過內(nèi)嵌支持 Lua 環(huán)境。執(zhí)行腳本的常用命令為 EVAL豪筝。腳本命令:https://www.runoob.com/redis/redis-scripting.html

腳本入?yún)ⅲ簁ey個數(shù)痰滋、KEYS[1]、ARGV[1]续崖、ARGV[2]
eg:EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second

eval "return redis.call('setNx', KEYS[1], ARGV[1])" 1 wuding WD
eval "return redis.call('get',KEYS[1])" 1 CCC;

接下來我們先測試下redis內(nèi)嵌lua腳本敲街。執(zhí)行成功返回1,否則返回0

@GetMapping("/luaTest")
    public String luaTest() {
        String script_init = "if redis.call('setNx',KEYS[1],ARGV[1]) == 1  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";

        Object initResult1 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
        System.out.println(initResult1);

        Object initResult2 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
        System.out.println(initResult2);


        String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1]  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";


        Object expireResult = redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList("AAA"),"aaa", "5000");
        System.out.println(expireResult);
        
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) " +
                "else " +
                "return 0 " +
                "end";
        Object aaa = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("AAA"), "aaa");
        System.out.println(aaa);
        return aaa.toString();
    }

完善嵌入lua腳本的redis分布式鎖實現(xiàn)

public class RedisLockUtil4 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 20000; // 默認超時時間(毫秒) 默認20秒
    private String TEMP_VALUE = UUID.randomUUID().toString();
    private RedisTemplate redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");
    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    private Integer maxRenewTimes = 10;
    private AtomicInteger renewTimes = new AtomicInteger(0);

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("redis申請鎖(" + key + ")成功");
                    this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加鎖失敗, 鎖鍵值:" + key, e);
        }
        return Boolean.FALSE;
    }

    private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
        scheduledExecutor.schedule(()->{
            if(!this.renewExpiration(key, value, expireTime)){
                return;
            }
            if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
                return;
            }
            this.scheduleExpirationRenewal(key,value,expireTime);
        }, expireTime / 3, TimeUnit.SECONDS);
    }

    private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
        String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1]  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";

        // 入?yún)⑶杏浂际亲址贤蝗痪蜁愋娃D(zhuǎn)換失敗多艇,scheduledExecutor把異常捕獲掉了看不到錯誤信息
        Long expireResult = null;
        try {
            expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
        } catch (Exception e) {
            log.error("執(zhí)行l(wèi)ua腳本出錯!e:"+ e.getCause().getMessage());
            return Boolean.FALSE;
        }
        if(expireResult == 1L){
            log.info("延長過期時間像吻,key:"+lockKey);
        }
        return expireResult == 1L;
    }

    public void unLock(String key){
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) " +
                "else " +
                "return 0 " +
                "end";
        Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
        if(result == 1L){
            log.info("釋放鎖峻黍,key:"+key);
        }
    }
}

接下來我們再設想一個問題:業(yè)務A調(diào)用業(yè)務B,AB兩者業(yè)務都調(diào)用了分布式鎖,或者A業(yè)務來做個遞歸操作立磁,那么大家猜想下會出現(xiàn)什么問題菠齿??骨饿?我們來測試看下。

把鎖的超時時間設大來進行測試台腥,如果是阻塞鎖會一直阻塞下去宏赘,非阻塞鎖的話,超時時間獲取子方法也沒有執(zhí)行黎侈,業(yè)務邏輯也就會有問題察署。這就是我們可重入性的問題。
可重入鎖指的是可重復可遞歸調(diào)用的鎖峻汉,在外層使用鎖之后箕母,在內(nèi)層仍然可以使用储藐,如果沒有可重入鎖的支持,在第二次嘗試獲得鎖時將會進入死鎖狀態(tài)嘶是。
通俗理解就是:排隊打水钙勃,一個人只能用一個桶來接水,如果你還有一個桶聂喇,只能再去排隊辖源,這就是非重入性,反之希太,只要到排到你克饶,不管你拿幾個桶你都可以來接滿水。

public class RedisLockUtil5 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 20000; // 默認超時時間(毫秒) 默認2秒

    private String TEMP_VALUE = UUID.randomUUID().toString();

    private RedisTemplate<String, String> redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    private Integer maxRenewTimes = 10;

    private AtomicInteger renewTimes = new AtomicInteger(0);

    public Boolean lock(String key){
        return lock(key, 10);
    }

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                LockInfo lockInfo = ThreadLocalUtil.get();
                if(Objects.nonNull(lockInfo)
                        && key.equals(lockInfo.getKey())){
                    lockInfo.getCount().incrementAndGet();
                    ThreadLocalUtil.put(lockInfo);
                    // 將threadLocal中的value賦值給當前TEMP_VALUE誊辉,保證可重入性矾湃,保證刪除邏輯正常
                    TEMP_VALUE = lockInfo.getValue();
                    // TODO 這里應該重置redis過期時間
                    log.info("可重入加鎖成功...");
                    return Boolean.TRUE;
                }
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("redis申請鎖(" + key + ")成功");
                    ThreadLocalUtil.put(new LockInfo(key, TEMP_VALUE, new AtomicInteger(1)));
                    this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加鎖失敗, 鎖鍵值:" + key, e);
        }
        return Boolean.FALSE;
    }

    private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
        scheduledExecutor.schedule(()->{
            if(!this.renewExpiration(key, value, expireTime)){
                return;
            }
            if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
                ThreadLocalUtil.clear();
                return;
            }
            this.scheduleExpirationRenewal(key,value,expireTime);
        }, expireTime / 3, TimeUnit.SECONDS);
    }

    private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
        String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1]  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";

        // 入?yún)⑶杏浂际亲址蝗痪蜁愋娃D(zhuǎn)換失敗堕澄,scheduledExecutor把異常捕獲掉了看不到錯誤信息
        Long expireResult = null;
        try {
            expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
        } catch (Exception e) {
            log.error("執(zhí)行l(wèi)ua腳本出錯邀跃!e:"+ e.getCause().getMessage());
            return Boolean.FALSE;
        }
        if(expireResult == 1L){
            log.info("延長過期時間,key:"+lockKey);
        }
        return expireResult == 1L;
    }

    public void unLock(String key){
        LockInfo lockInfo = ThreadLocalUtil.get();
        if(Objects.nonNull(lockInfo)
                && key.equals(lockInfo.getKey())
                && TEMP_VALUE.equals(lockInfo.getValue())){
            lockInfo.getCount().decrementAndGet();
            ThreadLocalUtil.put(lockInfo);
            log.info("釋放threadLocal鎖蛙紫,key:"+key);
        }

        if(Objects.nonNull(lockInfo) && lockInfo.getCount().get() <= 0){
            try {
                String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                        "return redis.call('del',KEYS[1]) " +
                        "else " +
                        "return 0 " +
                        "end";
                Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
                if(result == 1L){
                    log.info("釋放redis鎖拍屑,key:"+key);
                }
            } finally {
                ThreadLocalUtil.clear();
            }
        }
    }


    /**
     * 也可以將線程id存入redis中去做比較,有興趣可以自行實現(xiàn)
     */
    static class ThreadLocalUtil{
        private static final ThreadLocal<LockInfo> THREAD_LOCAL = new ThreadLocal<>();

        public static LockInfo get() {
            return THREAD_LOCAL.get();
        }

        public static void put(LockInfo lockInfo) {
            THREAD_LOCAL.set(lockInfo);
        }

        public static void clear() {
            THREAD_LOCAL.remove();
        }

    }

    @AllArgsConstructor
    @Data
    static class LockInfo{

       private String key;

       private String value;

       private AtomicInteger count;
    }
}

也可以將線程id存入redis中去做比較坑傅,有興趣可以自行實現(xiàn)
并且避免線程id可能重復僵驰,可以把每個進程標識為唯一id作為前綴,有興趣可以自行實現(xiàn)

最終形態(tài)的流程圖

流程圖

解決了哪些問題

到此為止我們redis分布式鎖就最終實現(xiàn)完成了唁毒。我們回顧下我們解決了那些問題蒜茴?

  • 阻塞鎖-回旋操作
  • 誤刪鎖的問題
  • 業(yè)務執(zhí)行時間大于分布式鎖的過期時間如何處理
  • redis命令非原子性問題
  • 可重入鎖的問題

Redisson實現(xiàn)

在介紹看門狗機制的時候我們有提到redisson框架,那么接下來我們看下redisson分布式鎖框架具體是如何實現(xiàn)的浆西。首先我們先針對上述mock業(yè)務通過redisson分布式鎖來探究是否會出現(xiàn)上述問題

具體實現(xiàn)

配置文件

redisson.yml配置:具體配置映射對象在org.redisson.config;

# 單節(jié)點配置
singleServerConfig:
  # 連接空閑超時粉私,單位:毫秒
  idleConnectionTimeout: 10000
  # 連接超時,單位:毫秒
  connectTimeout: 10000
  # 命令等待超時室谚,單位:毫秒
  timeout: 3000
  # 命令失敗重試次數(shù),如果嘗試達到 retryAttempts(命令失敗重試次數(shù)) 仍然不能將命令發(fā)送至某個指定的節(jié)點時毡鉴,將拋出錯誤崔泵。
  # 如果嘗試在此限制之內(nèi)發(fā)送成功秒赤,則開始啟用 timeout(命令等待超時) 計時。
  retryAttempts: 3
  # 命令重試發(fā)送時間間隔憎瘸,單位:毫秒
  retryInterval: 1500
  #  # 重新連接時間間隔入篮,單位:毫秒
  #  reconnectionTimeout: 3000
  #  # 執(zhí)行失敗最大次數(shù)
  #  failedAttempts: 3
  # 密碼
  password:
  # 單個連接最大訂閱數(shù)量
  subscriptionsPerConnection: 5
  # 客戶端名稱
  clientName: myRedis
  #  # 節(jié)點地址
  address: redis://127.0.0.1:6379
  # 發(fā)布和訂閱連接的最小空閑連接數(shù)
  subscriptionConnectionMinimumIdleSize: 1
  # 發(fā)布和訂閱連接池大小
  subscriptionConnectionPoolSize: 50
  # 最小空閑連接數(shù)
  connectionMinimumIdleSize: 32
  # 連接池大小
  connectionPoolSize: 64
  # 數(shù)據(jù)庫編號
  database: 0
  # DNS監(jiān)測時間間隔,單位:毫秒
  dnsMonitoringInterval: 5000
# 線程池數(shù)量,默認值: 當前處理核數(shù)量 * 2
threads: 0
# Netty線程池數(shù)量,默認值: 當前處理核數(shù)量 * 2
nettyThreads: 0
# 編碼
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 傳輸模式
transportMode : "NIO"
// 準備工作
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
    RedissonClient redisson = Redisson.create(
        Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()));
    return redisson;
}
@Aspect
@Component
public class RedissonLockProvider {

    @Autowired
    private RedissonClient redissonClient;

    @Around("@annotation(lock)")
    public Object execute(ProceedingJoinPoint point, RedissonLock lock){
        Object result = null;
        String lockKey = lock.lockKey();
        RLock rLock = redissonClient.getLock(lockKey);
        rLock.lock();
        try {
            result = point.proceed();
        } catch (Throwable throwable) {
            rLock.unlock();
            throwable.printStackTrace();
        } finally {
            rLock.unlock();
        }
        return result;
    }
}

Redisson鎖分類

  • 可重入鎖(Reentrant Lock)
  • 公平鎖(Fair Lock):也是繼承了可重入鎖的
  • 聯(lián)鎖(MultiLock):將多個RLock對象關聯(lián)為一個聯(lián)鎖幌甘,每個實例可以來自于不同Redisson實例潮售。
  • 紅鎖(RedLock):繼承聯(lián)鎖痊项。n個master節(jié)點完全獨立,并且沒有主從同步酥诽,此時如果有n / 2 + 1個節(jié)點成功拿到鎖并且大多數(shù)節(jié)點加鎖的總耗時鞍泉,要小于鎖設置的過期時間。so加鎖成功肮帐。
  • 讀寫鎖(ReadWriteLock)咖驮、信號量(Semaphore)、可過期性信號量(PermitExpirableSemaphore)训枢、閉鎖(CountDownLatch)

節(jié)點掛掉的時候托修,存在丟失鎖的風險的問題。而現(xiàn)實情況是有一些場景無法容忍的恒界,所以 Redisson 提供了實現(xiàn)了redlock算法睦刃,如果業(yè)務場景可以容忍這種小概率的錯誤,則推薦使用 RedissonLock十酣, 如果無法容忍涩拙,則推薦使用 RedissonRedLock。

源碼探究

大家如果有興趣了解上述鎖的具體實現(xiàn)原理可自行研究婆誓,本次分享主要針對默認redisson實現(xiàn)鎖方案可重入鎖RLock來講解吃环,

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        // 異步的Executor執(zhí)行器
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        // 默認超時時間為30S
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        // redis的訂閱發(fā)布模式
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

// 獲取鎖成功返回null,獲取鎖失敗 && 等待時間還早就頻繁獲取鎖并監(jiān)聽鎖是否被釋放掉
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 嘗試獲取鎖,沒有獲取到鎖洋幻,返回剩余ttl過期時間
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        // waitTime超時后郁轻,返回false獲取鎖失敗
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        // 訂閱分布式鎖,解決通知文留,發(fā)布訂閱模式
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 阻塞等待鎖釋放好唯,等待超時釋放訂閱信息
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            // 循環(huán)去調(diào)用獲取鎖方法tryAcquire
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                //1、latch其實是個信號量Semaphore燥翅,調(diào)用其tryAcquire方法會讓當前線程阻塞一段時間骑篙,避免了在while循環(huán)中頻繁請求獲取鎖;
                //2森书、該Semaphore的release方法靶端,會在訂閱解鎖消息的監(jiān)聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調(diào)用;當其他線程釋放了占用的鎖凛膏,會廣播解鎖消息杨名,監(jiān)聽器接收解鎖消息,并釋放信號量猖毫,最終會喚醒阻塞在這里的線程台谍。
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 取消解鎖消息的訂閱
            unsubscribe(subscribeFuture, threadId);
        }
    }

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
    
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
        // tryLock()方法最終執(zhí)行邏輯
        if (leaseTime != -1) {
            // 如果有設置鎖的過期時間,則直接調(diào)用lua吁断,不走看門狗邏輯
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        // lock()方法最終執(zhí)行邏輯
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining) {
                // 執(zhí)行看門狗邏輯
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

    // 將線程id存入hash中的Field趁蕊,用于解決可重入問題
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +  // key是否不存在
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // 不存在的話把賦值Key,Filed為線程id,value為1存儲到hash數(shù)據(jù)結(jié)構(gòu)中
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 并且設置過期時間
                      "return nil; " +  // 返回null
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果可以存在坞生,并且field等于線程id
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  // 則吧value++1
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 重置過期時間
                      "return nil; " +  // 返回null
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",  // 返回過期時間
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
// watchDog看門狗邏輯
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 定時任務過期時間 internalLockLeaseTime/3 頻率來延長過期時間為internalLockLeaseTime
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
}
// 解鎖邏輯
// key+field不存在,直接返回null
// 存在的話掷伙,value--1,判斷value是否大于0是己,大于0重置過期時間,否則刪除key并且發(fā)布刪除訂閱事件
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

基于Zookeeper

zookeeper分布式鎖原理

  • 保持獨占:多個客戶端同時過來創(chuàng)建/lock/zk-001節(jié)點任柜,那么有且僅有一個客戶端能創(chuàng)建成功赃泡。換句話說,倘若把Zookeeper上的一個節(jié)點看做是一把鎖乘盼,那么成功創(chuàng)建的客戶端則能保持獨占升熊;
  • 控制時序:有一種臨時有序節(jié)點,每個來嘗試獲取鎖的客戶端绸栅,都會在Zookeeper的根目錄下創(chuàng)建一個臨時有序節(jié)點级野,Zookeeper的/lock節(jié)點維護一個序列,序號最小的節(jié)點獲取鎖成功粹胯。
  • 監(jiān)聽機制:Watcher機制能原子性的監(jiān)聽Zookeeper上節(jié)點的增刪改操作

基礎理論

zk-znode
zk操作和維護的為一個個數(shù)據(jù)節(jié)點蓖柔,稱為 znode,采用類似文件系統(tǒng)的層級樹狀結(jié)構(gòu)進行管理风纠,如果 znode 節(jié)點包含數(shù)據(jù)則存儲為字節(jié)數(shù)組(byte array)况鸣。同一個節(jié)點多個客戶同時創(chuàng)建,只有一個客戶端會成功竹观,其它客戶端創(chuàng)建時將失敗镐捧。

zk的四種節(jié)點
持久性節(jié)點:節(jié)點創(chuàng)建后將會一直存在
臨時節(jié)點:臨時節(jié)點的生命周期和當前會話綁定,一旦當前會話斷開臨時節(jié)點也會刪除臭增,當然可以主動刪除懂酱。
持久有序節(jié)點:節(jié)點創(chuàng)建一直存在,并且zk會自動為節(jié)點加上一個自增的后綴作為新的節(jié)點名稱誊抛。
臨時有序節(jié)點:保留臨時節(jié)點的特性列牺,并且zk會自動為節(jié)點加上一個自增的后綴作為新的節(jié)點名稱。

zk-watcher
事件監(jiān)聽器是zookeeper中的一個很重要的特性拗窃。

None(-1), 客戶端與服務端成功建立連接
NodeCreated(1),Watcher監(jiān)聽的對應數(shù)據(jù)節(jié)點被創(chuàng)建
NodeDeleted(2),Watcher監(jiān)聽的對應數(shù)據(jù)節(jié)點被刪除
NodeDataChanged(3),Watcher監(jiān)聽的對應數(shù)據(jù)節(jié)點的數(shù)據(jù)內(nèi)容發(fā)生變更
NodeChildrenChanged(4),Wather監(jiān)聽的對應數(shù)據(jù)節(jié)點的子節(jié)點列表發(fā)生變更
DataWatchRemoved(5),
ChildWatchRemoved(6),
_PersistentWatchRemoved _(7);

實現(xiàn)思路

首先方案:同一個節(jié)點只能創(chuàng)建一次瞎领,加鎖時檢查節(jié)點是否exist,不存在則創(chuàng)建節(jié)點,否則監(jiān)聽該節(jié)點的刪除事件随夸,當釋放鎖的時候再次競爭去創(chuàng)建節(jié)點九默。如此帶來的就是當并發(fā)量很高的時候,釋放鎖會喚醒許多客戶端都去競爭逃魄,競爭失敗的客戶端再去休眠荤西,如此反復對系統(tǒng)資源造成了極大的浪費澜搅。


方案一

為了規(guī)避以上問題伍俘,我們可以使用有序子節(jié)點的形式來實現(xiàn)分布式鎖邪锌,而且為了規(guī)避客戶端獲取鎖后突然斷線的風險,我們有必要使用臨時有序節(jié)點癌瘾。

多個客戶端競爭鎖資源觅丰,創(chuàng)建多個臨時有序節(jié)點,檢查所屬節(jié)點是否是最小節(jié)點妨退,如若是妇萄,則獲取鎖成功,如若不是咬荷,那就監(jiān)聽自己節(jié)點-1的刪除事件冠句,等待被喚醒。
這種方案在每次釋放鎖時只喚醒一個客戶端幸乒,減少了線程喚醒的代價懦底,提高了效率。


方案二

zk原生API實現(xiàn)

接下來我們通過zk原生實現(xiàn)API來實現(xiàn)分布式鎖

public class ZkLockUtil implements Watcher {

    public static final String NODE_PATH = "/lock-space-watcher";

    private ZooKeeper zk = null;

    public ZkLockUtil() throws IOException, KeeperException, InterruptedException {
        zk = new ZooKeeper("127.0.0.1:2181", 300000, this);
    }

    protected  CountDownLatch countDownLatch=new CountDownLatch(1);

    private String lockPath;

    public String createNode(String key){
        try {
            String node = NODE_PATH +"/"+ key;
            //檢測節(jié)點是否存在
            Stat stat = zk.exists(node, false);
            //父節(jié)點不存在罕扎,則創(chuàng)建父節(jié)點
            if(Objects.isNull(stat)){
                synchronized (NODE_PATH) {
                    //父節(jié)點是持久節(jié)點 一層層創(chuàng)建否則會報錯
                    zk.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }
            lockPath = zk.create(node + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("節(jié)點創(chuàng)建成功聚唐,返回值【"+lockPath+"】");
            return lockPath;
        } catch (KeeperException e1) {
            e1.printStackTrace();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return null;
    }

    //校驗當前節(jié)點是否為序號最小的節(jié)點
    public boolean checkLockPath(String key, String lockPath){
        String nodePath = NODE_PATH + "/" + key;
        try {
            //注冊父節(jié)點監(jiān)聽事件,當父節(jié)點下面的子節(jié)點有變化,就會觸發(fā)Watcher事件
            List<String> nodeList = zk.getChildren(nodePath, false);
            Collections.sort(nodeList);
            int index = nodeList.indexOf( lockPath.substring(nodePath.length()+1));
            switch (index){
                case -1:{
                    System.out.println("本節(jié)點已不在了"+lockPath);
                    return false;
                }
                case 0:{
                    System.out.println("獲取鎖成功腔召,子節(jié)點序號【"+lockPath+"】");
                    return true;
                }
                default:{

                    String waitPath = nodeList.get(index - 1);
                    zk.exists(nodePath+"/"+waitPath, this);
                    System.out.println(waitPath+"在"+nodeList.get(index)+"點前面,需要等待【"+nodeList.get(index)+"】");
                    return false;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean lock(String key, Integer waitTime){
        //創(chuàng)建獲取鎖的節(jié)點(順序臨時節(jié)點)
        String childPath = createNode(key);
        boolean flag = true;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        if(null != childPath){
            lockPath = childPath;
            try {
                //輪詢等待zk獲取鎖的通知
                while(flag){
                    if(checkLockPath(key, childPath)){
                        //獲取鎖成功
                        return true;
                    }
                    if(null != waitTime && atomicInteger.get() > 0){
                        // 刪除當前等待節(jié)點
                        return false;
                    }
                    //節(jié)點創(chuàng)建成功杆查, 則等待zk通知
                    if(null != waitTime){
                        countDownLatch.await(waitTime, TimeUnit.SECONDS);
                        atomicInteger.incrementAndGet();
                        System.out.println("await等待被喚醒~"+waitTime);
                    }else{
                        countDownLatch.await();
                        System.out.println("await等待被喚醒~");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            System.out.println("節(jié)點沒有創(chuàng)建成功,獲取鎖失敗");
        }
        return false;
    }

    @Override
    public void process(WatchedEvent event) {
        //成功連接zk臀蛛,狀態(tài)判斷
        if(event.getState() == Event.KeeperState.SyncConnected){
            //子節(jié)點有變化
            if(event.getType() == Event.EventType.NodeDeleted){
                System.out.println("臨時節(jié)點自動刪除");
                countDownLatch.countDown();
            }
        }
    }

    public void unlock(){
        try {
            zk.delete(getLockPath(), -1);
            if(Objects.nonNull(zk)){
                zk.close();
            }
        } catch (Exception e) {
        }
    }

    public String getLockPath() {
        return lockPath;
    }

}

上述實現(xiàn)方式雖能更好的理解zk來實現(xiàn)分布式鎖的邏輯亲桦,但本身zk原生實現(xiàn)編碼實現(xiàn)較多,并且很難保證是否有有問題浊仆,不太建議自己編碼來實現(xiàn)zk原生的分布式鎖烙肺,如果有興趣的同學可自行實現(xiàn)可重入鎖的邏輯,上述已經(jīng)分析了很多實現(xiàn)方案氧卧,這兒不在對此深入桃笙。

客戶端Curator實現(xiàn)

接下來我們通過zk的客戶端Curator來實現(xiàn)zk的分布式鎖。
Apache 開源框架Curator是一個比較完善的ZooKeeper客戶端框架沙绝,通過封裝的一套高級API 簡化了ZooKeeper的操作搏明。

Curator鎖分類

可重入互斥鎖 InterProcessMutex
不可重入互斥鎖 InterProcessSemaphoreMutex
讀寫鎖 InterProcessReadWriteLock
集合鎖 InterProcessMultiLock

具體實現(xiàn)

接下來我們通過zk客戶端Curator來實現(xiàn)分布式鎖

    InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));
//    InterProcessMutex mutex = new InterProcessMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));

public String kill() throws Exception {
        mutex.acquire();
//        mutex.acquire(15, TimeUnit.SECONDS);
        try{
            // 定義商品key值
            String key = "goods";
            // 獲取商品數(shù)量
            Object obj = redisTemplate.opsForValue().get(key);
            Integer mount = Integer.valueOf(obj.toString());
            // 如果商品被搶完,直接返回
            if (mount < 0 || mount == 0) {
                System.out.println("很遺憾闪檬,商品已被搶完【kill】");
                return "很遺憾星著,商品已被搶完";
            }
            // 線程睡眠,目的在于放大錯誤
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 搶到商品后粗悯,將redis的商品數(shù)量減一
            mount = --mount;
            redisTemplate.opsForValue().set(key, mount.toString());
            // 打印虚循,以便觀察
            System.err.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":搶到第" + (mount + 1) + "件商品【kill】");

        } catch (Exception e) {
            mutex.release();
            e.printStackTrace();
        } finally {
            mutex.release();
        }
         return "恭喜,商品搶購成功";
    }

Curator-InterProcessMutex可重入鎖源碼探究

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}


public boolean acquire(long time, TimeUnit unit) throws Exception
{
    return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();
        // LockData存儲當前持有鎖的線程:為了實現(xiàn)可重入
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // 當前鎖 重入++1
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {   
            // 獲取鎖成功放到緩存中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
}

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        ...省略部分代碼
        // 創(chuàng)建臨時有序節(jié)點
        ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
        // 執(zhí)行獲取鎖邏輯
        hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        ...省略部分代碼
        return ourPath;
    }
    
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                // 對所有節(jié)點進行排序:從小到大
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                // 返回當前節(jié)點或者等待節(jié)點的上一個節(jié)點
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    // 上一個節(jié)點路徑信息
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            //設置監(jiān)聽器,getData會判讀前一個節(jié)點是否存在横缔,不存在就會拋出異常從而不會設置監(jiān)聽器
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
                                // 等待一段時間被喚醒
                                wait(millisToWait);
                            }
                            else
                            {
                                // 一直等待被喚醒
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
public void release() throws Exception
    {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        // LockData當前線程可重入value--1
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {   
            // value == 0 的時候才真正刪除臨時節(jié)點
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

    final void releaseLock(String lockPath) throws Exception
    {
        // 一處所有監(jiān)聽者
        client.removeWatchers();
        revocable.set(null);
        // 刪除臨時節(jié)點
        deleteOurPath(lockPath);
    }

 private void deleteOurPath(String ourPath) throws Exception
    {
        try
        {
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

分布式鎖各自有何優(yōu)勢

基于Redis的分布式鎖铺遂,適用于并發(fā)量很大、性能要求很高的茎刚、而可靠性問題可以通過其他方案去彌補的場景襟锐。
基于zk的分布式鎖,適用于高可靠(高可用)而并發(fā)量不是太大的場景膛锭;因為每次在創(chuàng)建鎖和釋放鎖的過程中粮坞,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能初狰。大家知道莫杈,ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務器來執(zhí)行,然后Leader服務器還需要將數(shù)據(jù)同不到所有的Follower機器上奢入,這樣頻繁的網(wǎng)絡通信姓迅,性能的短板是非常突出的。
而數(shù)據(jù)庫來實現(xiàn)的分布式鎖俊马,受制于連接池資源丁存、無鎖失效機制、單點等因素柴我,在并發(fā)量較低可靠性不那么強的時候也可以用解寝。

分布式鎖沒有絕對的可靠性,只能通過人為補償機制竟可能的提升鎖可靠性艘儒。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
禁止轉(zhuǎn)載聋伦,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。
  • 序言:七十年代末界睁,一起剝皮案震驚了整個濱河市觉增,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌翻斟,老刑警劉巖逾礁,帶你破解...
    沈念sama閱讀 222,183評論 6 516
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異访惜,居然都是意外死亡嘹履,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 94,850評論 3 399
  • 文/潘曉璐 我一進店門债热,熙熙樓的掌柜王于貴愁眉苦臉地迎上來砾嫉,“玉大人,你說我怎么就攤上這事窒篱』拦危” “怎么了舶沿?”我有些...
    開封第一講書人閱讀 168,766評論 0 361
  • 文/不壞的土叔 我叫張陵,是天一觀的道長配并。 經(jīng)常有香客問我括荡,道長,這世上最難降的妖魔是什么荐绝? 我笑而不...
    開封第一講書人閱讀 59,854評論 1 299
  • 正文 為了忘掉前任,我火速辦了婚禮避消,結(jié)果婚禮上低滩,老公的妹妹穿的比我還像新娘。我一直安慰自己岩喷,他們只是感情好恕沫,可當我...
    茶點故事閱讀 68,871評論 6 398
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著纱意,像睡著了一般婶溯。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上偷霉,一...
    開封第一講書人閱讀 52,457評論 1 311
  • 那天迄委,我揣著相機與錄音,去河邊找鬼类少。 笑死叙身,一個胖子當著我的面吹牛,可吹牛的內(nèi)容都是我干的硫狞。 我是一名探鬼主播信轿,決...
    沈念sama閱讀 40,999評論 3 422
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼残吩!你這毒婦竟也來了财忽?” 一聲冷哼從身側(cè)響起,我...
    開封第一講書人閱讀 39,914評論 0 277
  • 序言:老撾萬榮一對情侶失蹤泣侮,失蹤者是張志新(化名)和其女友劉穎即彪,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體活尊,經(jīng)...
    沈念sama閱讀 46,465評論 1 319
  • 正文 獨居荒郊野嶺守林人離奇死亡祖凫,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 38,543評論 3 342
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了酬凳。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片惠况。...
    茶點故事閱讀 40,675評論 1 353
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖宁仔,靈堂內(nèi)的尸體忽然破棺而出稠屠,到底是詐尸還是另有隱情,我是刑警寧澤,帶...
    沈念sama閱讀 36,354評論 5 351
  • 正文 年R本政府宣布权埠,位于F島的核電站榨了,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏攘蔽。R本人自食惡果不足惜龙屉,卻給世界環(huán)境...
    茶點故事閱讀 42,029評論 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望满俗。 院中可真熱鬧转捕,春花似錦、人聲如沸唆垃。這莊子的主人今日做“春日...
    開封第一講書人閱讀 32,514評論 0 25
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽辕万。三九已至枢步,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間渐尿,已是汗流浹背醉途。 一陣腳步聲響...
    開封第一講書人閱讀 33,616評論 1 274
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留砖茸,地道東北人结蟋。 一個月前我還...
    沈念sama閱讀 49,091評論 3 378
  • 正文 我出身青樓,卻偏偏與公主長得像渔彰,于是被迫代替她去往敵國和親嵌屎。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 45,685評論 2 360

推薦閱讀更多精彩內(nèi)容