使用Redisson實現(xiàn)分布式鎖草描,Spring AOP簡化之

本文的部分代碼存在問題互拾,可參考優(yōu)化版:分布式鎖可以這么簡單抵碟?氛驮。比如:支持 SpEL 表達式刹淌;修復了鎖超時后祟偷,數(shù)據未回滾的問題彩郊;修復了在某些情況下分布式鎖不生效的問題等前弯。

源碼

Redisson概述

Redisson是一個在Redis的基礎上實現(xiàn)的Java駐內存數(shù)據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象秫逝,還提供了許多分布式服務恕出。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern)违帆,從而讓使用者能夠將精力更集中地放在處理業(yè)務邏輯上浙巫。

Redisson底層采用的是Netty 框架。支持Redis 2.8以上版本刷后,支持Java1.6+以上版本的畴。

關于Redisson更多詳細介紹,可參考Redssion概述

Redisson提供的分布式鎖

可重入鎖

Redisson的分布式可重入鎖RLock Java對象實現(xiàn)了java.util.concurrent.locks.Lock接口尝胆,同時還支持自動過期解鎖丧裁。下面是RLock的基本使用方法:

RLock lock = redisson.getLock("anyLock");
// 最常見的使用方法
lock.lock();

// 支持過期解鎖功能
// 10秒鐘以后自動解鎖
// 無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);

// 嘗試加鎖,最多等待100秒班巩,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

Redisson同時還為分布式鎖提供了異步執(zhí)行的相關方法:

RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

公平鎖

Redisson分布式可重入公平鎖也是實現(xiàn)了java.util.concurrent.locks.Lock接口的一種RLock對象渣慕。在提供了自動過期解鎖功能的同時嘶炭,保證了當多個Redisson客戶端線程同時請求加鎖時,優(yōu)先分配給先發(fā)出請求的線程逊桦。

RLock fairLock = redisson.getFairLock("anyLock");
// 最常見的使用方法
fairLock.lock();

// 支持過期解鎖功能
// 10秒鐘以后自動解鎖
// 無需調用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);

// 嘗試加鎖眨猎,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();

其他鎖

Redisson還提供了其他機制的鎖强经,如聯(lián)鎖(MultiLock)睡陪、紅鎖(RedLock)等。詳細可參考:分布式鎖和同步器

使用Redisson實現(xiàn)分布式鎖

  1. 定義回調接口
/**
 * 分布式鎖回調接口
 */
public interface DistributedLockCallback<T> {
    /**
     * 調用者必須在此方法中實現(xiàn)需要加分布式鎖的業(yè)務邏輯
     *
     * @return
     */
    public T process();

    /**
     * 得到分布式鎖名稱
     *
     * @return
     */
    public String getLockName();
}
  1. 定義分布式鎖模板
/**
 * 分布式鎖操作模板
 */
public interface DistributedLockTemplate {

    long DEFAULT_WAIT_TIME = 30;
    long DEFAULT_TIMEOUT   = 5;
    TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

    /**
     * 使用分布式鎖匿情,使用鎖默認超時時間兰迫。
     * @param callback
     * @param fairLock 是否使用公平鎖
     * @return
     */
    <T> T lock(DistributedLockCallback<T> callback, boolean fairLock);

    /**
     * 使用分布式鎖。自定義鎖的超時時間
     *
     * @param callback
     * @param leaseTime 鎖超時時間炬称。超時后自動釋放鎖汁果。
     * @param timeUnit
     * @param fairLock 是否使用公平鎖
     * @param <T>
     * @return
     */
    <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock);

    /**
     * 嘗試分布式鎖,使用鎖默認等待時間玲躯、超時時間据德。
     * @param callback
     * @param fairLock 是否使用公平鎖
     * @param <T>
     * @return
     */
    <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock);

    /**
     * 嘗試分布式鎖,自定義等待時間跷车、超時時間棘利。
     * @param callback
     * @param waitTime 獲取鎖最長等待時間
     * @param leaseTime 鎖超時時間。超時后自動釋放鎖朽缴。
     * @param timeUnit
     * @param fairLock 是否使用公平鎖
     * @param <T>
     * @return
     */
    <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock);
}
  1. 實現(xiàn)分布式鎖模板
public class SingleDistributedLockTemplate implements DistributedLockTemplate {
    private RedissonClient redisson;

    public SingleDistributedLockTemplate() {
    }

    public SingleDistributedLockTemplate(RedissonClient redisson) {
        this.redisson = redisson;
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) {
        return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
        RLock lock = getLock(callback.getLockName(), fairLock);
        try {
            lock.lock(leaseTime, timeUnit);
            return callback.process();
        } finally {
            if (lock != null && lock.isLocked()) {
                lock.unlock();
            }
        }
    }

    @Override
    public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) {
        return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
    }

    @Override
    public <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
        RLock lock = getLock(callback.getLockName(), fairLock);
        try {
            if (lock.tryLock(waitTime, leaseTime, timeUnit)) {
                return callback.process();
            }
        } catch (InterruptedException e) {

        } finally {
            if (lock != null && lock.isLocked()) {
                lock.unlock();
            }
        }
        return null;
    }

    private RLock getLock(String lockName, boolean fairLock) {
        RLock lock;
        if (fairLock) {
            lock = redisson.getFairLock(lockName);
        } else {
            lock = redisson.getLock(lockName);
        }
        return lock;
    }

    public void setRedisson(RedissonClient redisson) {
        this.redisson = redisson;
    }
}

注:
加鎖后的業(yè)務邏輯處理完畢后善玫,會進行“釋放鎖”操作,在釋放的時候密强,有可能因為業(yè)務邏輯的處理時間較長茅郎,超過預設的leaseTime,默認5s誓斥,鎖會被自動釋放只洒,這時若執(zhí)行釋放鎖的邏輯,會拋一個類似“xx鎖不被該線程所持有”的異常劳坑。所以上面代碼毕谴,在執(zhí)行釋放鎖之前的判斷語句,應該換成if ( lock != null && lock.isHeldByCurrentThread() )距芬。因為isLocked()方法是判斷該鎖是否被某一個線程所持有涝开,而不能知道是哪一個線程持有。而isHeldByCurrentThread()的作用是檢測該鎖是否被當前線程所持有框仔,當返回true的時候舀武,就可以放心的去釋放鎖了(一般不會出問題了,除非在釋放鎖的前一刻离斩,剛好時間到了)银舱。不過瘪匿,一般用到鎖的邏輯,要盡可能的少寻馏,而不能把一大段代碼塞到需要同步執(zhí)行的邏輯中棋弥。
修改完的代碼如下:

 public class SingleDistributedLockTemplate implements DistributedLockTemplate {
    private RedissonClient redisson;

    public SingleDistributedLockTemplate() {
    }

    public SingleDistributedLockTemplate(RedissonClient redisson) {
        this.redisson = redisson;
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) {
        return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
    }

    @Override
    public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
        RLock lock = getLock(redisson, callback.getLockName(), fairLock);
        try {
            lock.lock(leaseTime, timeUnit);
            return callback.process();
        } finally {
            if (lock != null && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    @Override
    public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) {
        return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
    }

    @Override
    public <T> T tryLock(DistributedLockCallback<T> callback,
                         long waitTime,
                         long leaseTime,
                         TimeUnit timeUnit,
                         boolean fairLock) {
        RLock lock = getLock(redisson, callback.getLockName(), fairLock);
        try {
            if (lock.tryLock(waitTime, leaseTime, timeUnit)) {
                return callback.process();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if (lock != null && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        return null;
    }

    public static RLock getLock(RedissonClient redisson, String lockName, boolean fairLock) {
        RLock lock;
        if (fairLock) {
            lock = redisson.getFairLock(lockName);
        } else {
            lock = redisson.getLock(lockName);
        }
        return lock;
    }

}
  1. 使用SingleDistributedLockTemplate
DistributedLockTemplate lockTemplate = ...;
final String lockName = ...; 
lockTemplate.lock(new DistributedLockCallback<Object>() {
    @Override
    public Object process() {
        //do some business
        return null;
    }

    @Override
    public String getLockName() {
        return lockName;
    }
}, false);

但是每次使用分布式鎖都要寫類似上面的重復代碼,有沒有什么方法可以只關注核心業(yè)務邏輯代碼的編寫诚欠,即上面的"do some business"顽染。下面介紹如何使用Spring AOP來實現(xiàn)這一目標。

使用Spring AOP簡化分布式鎖

  1. 定義注解@DistributedLock
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
    /**
     * 鎖的名稱轰绵。
     * 如果lockName可以確定粉寞,直接設置該屬性。
     */
    String lockName() default "";

    /**
     * lockName后綴
     */
    String lockNamePre() default "";
    /**
     * lockName后綴
     */
    String lockNamePost() default "lock";

    /**
     * 獲得鎖名時拼接前后綴用到的分隔符
     * @return
     */
    String separator() default ".";
    /**
     * <pre>
     *     獲取注解的方法參數(shù)列表的某個參數(shù)對象的某個屬性值來作為lockName左腔。因為有時候lockName是不固定的唧垦。
     *     當param不為空時,可以通過argNum參數(shù)來設置具體是參數(shù)列表的第幾個參數(shù)液样,不設置則默認取第一個业崖。
     * </pre>
     */
    String param() default "";
    /**
     * 將方法第argNum個參數(shù)作為鎖
     */
    int argNum() default 0;
    /**
     * 是否使用公平鎖。
     * 公平鎖即先來先得蓄愁。
     */
    boolean fairLock() default false;
    /**
     * 是否使用嘗試鎖。
     */
    boolean tryLock() default false;
    /**
     * 最長等待時間狞悲。
     * 該字段只有當tryLock()返回true才有效撮抓。
     */
    long waitTime() default 30L;
    /**
     * 鎖超時時間。
     * 超時時間過后摇锋,鎖自動釋放丹拯。
     * 建議:
     *   盡量縮簡需要加鎖的邏輯。
     */
    long leaseTime() default 5L;
    /**
     * 時間單位荸恕。默認為秒乖酬。
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

}
  1. 定義切面織入的代碼
@Aspect
@Component
public class DistributedLockAspect {

    @Autowired
    private DistributedLockTemplate lockTemplate;

    @Pointcut("@annotation(cn.sprinkle.study.distributedlock.common.annotation.DistributedLock)")
    public void DistributedLockAspect() {}

    @Around(value = "DistributedLockAspect()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {

        //切點所在的類
        Class targetClass = pjp.getTarget().getClass();
        //使用了注解的方法
        String methodName = pjp.getSignature().getName();

        Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes();

        Method method = targetClass.getMethod(methodName, parameterTypes);

        Object[] arguments = pjp.getArgs();

        final String lockName = getLockName(method, arguments);

        return lock(pjp, method, lockName);
    }

    @AfterThrowing(value = "DistributedLockAspect()", throwing="ex")
    public void afterThrowing(Throwable ex) {
        throw new RuntimeException(ex);
    }

    public String getLockName(Method method, Object[] args) {
        Objects.requireNonNull(method);
        DistributedLock annotation = method.getAnnotation(DistributedLock.class);

        String lockName = annotation.lockName(),
                param = annotation.param();

        if (isEmpty(lockName)) {
            if (args.length > 0) {
                if (isNotEmpty(param)) {
                    Object arg;
                    if (annotation.argNum() > 0) {
                        arg = args[annotation.argNum() - 1];
                    } else {
                        arg = args[0];
                    }
                    lockName = String.valueOf(getParam(arg, param));
                } else if (annotation.argNum() > 0) {
                    lockName = args[annotation.argNum() - 1].toString();
                }
            }
        }

        if (isNotEmpty(lockName)) {
            String preLockName = annotation.lockNamePre(),
                    postLockName = annotation.lockNamePost(),
                    separator = annotation.separator();

            StringBuilder lName = new StringBuilder();
            if (isNotEmpty(preLockName)) {
                lName.append(preLockName).append(separator);
            }
            lName.append(lockName);
            if (isNotEmpty(postLockName)) {
                lName.append(separator).append(postLockName);
            }

            lockName = lName.toString();

            return lockName;
        }

        throw new IllegalArgumentException("Can't get or generate lockName accurately!");
    }

    /**
     * 從方法參數(shù)獲取數(shù)據
     *
     * @param param
     * @param arg 方法的參數(shù)數(shù)組
     * @return
     */
    public Object getParam(Object arg, String param) {
        if (isNotEmpty(param) && arg != null) {
            try {
                Object result = PropertyUtils.getProperty(arg, param);
                return result;
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(arg + "沒有屬性" + param + "或未實現(xiàn)get方法。", e);
            } catch (Exception e) {
                throw new RuntimeException("", e);
            }
        }
        return null;
    }

    public Object lock(ProceedingJoinPoint pjp, Method method, final String lockName) {

        DistributedLock annotation = method.getAnnotation(DistributedLock.class);

        boolean fairLock = annotation.fairLock();

        boolean tryLock = annotation.tryLock();

        if (tryLock) {
            return tryLock(pjp, annotation, lockName, fairLock);
        } else {
            return lock(pjp,lockName, fairLock);
        }
    }

    public Object lock(ProceedingJoinPoint pjp, final String lockName, boolean fairLock) {
        return lockTemplate.lock(new DistributedLockCallback<Object>() {
            @Override
            public Object process() {
                return proceed(pjp);
            }

            @Override
            public String getLockName() {
                return lockName;
            }
        }, fairLock);
    }

    public Object tryLock(ProceedingJoinPoint pjp, DistributedLock annotation, final String lockName, boolean fairLock) {

        long waitTime = annotation.waitTime(),
                leaseTime = annotation.leaseTime();
        TimeUnit timeUnit = annotation.timeUnit();

        return lockTemplate.tryLock(new DistributedLockCallback<Object>() {
            @Override
            public Object process() {
                return proceed(pjp);
            }

            @Override
            public String getLockName() {
                return lockName;
            }
        }, waitTime, leaseTime, timeUnit, fairLock);
    }

    public Object proceed(ProceedingJoinPoint pjp) {
        try {
            return pjp.proceed();
        } catch (Throwable throwable) {
            throw new RuntimeException(throwable);
        }
    }

    private boolean isEmpty(Object str) {
        return str == null || "".equals(str);
    }

    private boolean isNotEmpty(Object str) {
        return !isEmpty(str);
    }
}
  1. 使用注解@DistributedLock實現(xiàn)分布式鎖

有了上面兩段代碼融求,以后需要用到分布式鎖咬像,只需在核心業(yè)務邏輯方法添加注解@DistributedLock,并設置LockName生宛、fairLock等即可县昂。下面的DistributionService演示了多種使用情景。

@Service
public class DistributionService {

    @Autowired
    private RedissonClient redissonClient;

    @DistributedLock(param = "id", lockNamePost = ".lock")
    public Integer aspect(Person person) {
        RMap<String, Integer> map = redissonClient.getMap("distributionTest");

        Integer count = map.get("count");

        if (count > 0) {
            count = count - 1;
            map.put("count", count);
        }

        return count;
    }

    @DistributedLock(argNum = 1, lockNamePost = ".lock")
    public Integer aspect(String i) {
        RMap<String, Integer> map = redissonClient.getMap("distributionTest");

        Integer count = map.get("count");

        if (count > 0) {
            count = count - 1;
            map.put("count", count);
        }

        return count;
    }

    @DistributedLock(lockName = "lock", lockNamePost = ".lock")
    public int aspect(Action<Integer> action) {
        return action.action();
    }
}
  1. 測試

定義一個Worker類:

public class Worker implements Runnable {

    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final DistributionService service;
    private RedissonClient redissonClient;

    public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, DistributionService service, RedissonClient redissonClient) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
        this.service = service;
        this.redissonClient = redissonClient;
    }

    @Override
    public void run() {
        try {
            startSignal.await();

            System.out.println(Thread.currentThread().getName() + " start");

//            Integer count = service.aspect(new Person(1, "張三"));
//            Integer count = service.aspect("1");

            Integer count = service.aspect(() -> {
                RMap<String, Integer> map = redissonClient.getMap("distributionTest");

                Integer count1 = map.get("count");
                if (count1 > 0) {
                    count1 = count1 - 1;
                    map.put("count", count1);
                }
                return count1;
            });

            System.out.println(Thread.currentThread().getName() + ": count = " + count);

            doneSignal.countDown();

        } catch (InterruptedException ex) {
            System.out.println(ex);
        }
    }
}

定義Controller類:

@RestController
@RequestMapping("/distributedLockTest")
public class DistributedLockTestController {

    private int count = 10;

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private DistributionService service;

    @RequestMapping(method = RequestMethod.GET)
    public String distributedLockTest() throws Exception {

        RMap<String, Integer> map = redissonClient.getMap("distributionTest");
        map.put("count", 8);

        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(count);

        for (int i = 0; i < count; ++i) { // create and start threads
            new Thread(new Worker(startSignal, doneSignal, service)).start();
        }

        startSignal.countDown(); // let all threads proceed
        doneSignal.await();
        System.out.println("All processors done. Shutdown connection");

        return "finish";
    }

}

Redisson基本配置:

singleServerConfig:
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  password:
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://127.0.0.1:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 10
  connectionPoolSize: 64
  database: 0
  dnsMonitoring: false
  dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
useLinuxNativeEpoll: false

工程中需要注入的對象:

@Value("classpath:/redisson-conf.yml")
Resource configFile;

@Bean(destroyMethod = "shutdown")
RedissonClient redisson()
        throws IOException {
    Config config = Config.fromYAML(configFile.getInputStream());
    return Redisson.create(config);
}

@Bean
DistributedLockTemplate distributedLockTemplate(RedissonClient redissonClient) {
    return new SingleDistributedLockTemplate(redissonClient);
}

需要引入的依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.5.3</version>
</dependency>
<dependency>
    <groupId>commons-beanutils</groupId>
    <artifactId>commons-beanutils</artifactId>
    <version>1.8.3</version>
</dependency>

最后啟動工程陷舅,然后訪問localhost:8080/distributedLockTest倒彰,可以看到如下結果:

分布式鎖測試結果

觀察結果,可以看出莱睁,10個線程中只有8個線程能執(zhí)行count減1操作待讳,而且多個線程是依次執(zhí)行的芒澜。也就是說分布式鎖起作用了。

使用lambda

該注解還可以配合lambda使用创淡。在介紹之前痴晦,先科普一下使用spring注解時需要注意的地方,有兩點辩昆。

第一阅酪,在使用spring提供的方法注解時,比較常用的是@Transactional注解汁针。若是Service層不帶注解的方法A調用同一個Service類帶@Transactional注解的方法B术辐,那么方法B的事務注解將不起作用。比如:

...
    public void methodA() {
        methodB();
    }

    @Transactional
    public void methodB() {
        // 操作表A
        // 操作表B
    }
...

上面的代碼中施无,假設有一次調用方法A辉词,方法A又調用方法B,但是此次調用在操作表B時出錯了猾骡。我們的意愿是這樣的:之前對表A的操作回滾瑞躺。但實際上卻不會回滾,因為此時的@Transactional注解并不會生效兴想。原因是調用方法B是同一個Service的方法A幢哨,而若是在其他類中調用方法B注解才生效。這也就不難解釋為什么注解加在private方法上是不起作用的了嫂便。因為private方法只能在同一個方法中調用捞镰。

上面所說的調用同一個類的帶注解的方法,該注解將不生效毙替,感興趣的可以自己找找原因岸售,這里就不細說了。

第二厂画,注解(包括spring提供的凸丸、自定義的)加在普通類的方法上,spring是掃描不到的袱院。普通類指類簽名上沒有諸如@Service等Spring提供的注解(因為此分布式鎖集成使用的是spring aop屎慢,所以介紹的都是與spring相關的)。比如忽洛,如果把上面貼出的DistributionService中的各個方法放到Worker中抛人,那么這些注解將不起作用,因為Worker類簽名并沒有加任何注解脐瑰,所以spring在掃描的時候直接跳過該類妖枚,因此定義在Worker中的帶@DistributedLock注解的方法(如果有的話)也就無法被掃描到。

在上面貼出的代碼中苍在,Worker中需要使用分布式鎖的業(yè)務邏輯比較簡單绝页,所以都寫到DistributionService中荠商,但在實際開發(fā)中,我們通常有把業(yè)務邏輯直接寫在Worker中的需求续誉,畢竟是與Worker相關的莱没,放到哪一個Service都感覺很別扭。所以酷鸦,我們可以定義一個分布式鎖管理器饰躲,如DistributedLockManager,然后在初始化Worker時引入即可臼隔。接下來改造Worker和定義DistributedLockManager

Worker1:

public class Worker1 implements Runnable {

    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    private final DistributedLockManager distributedLockManager;
    private RedissonClient redissonClient;

    public Worker1(CountDownLatch startSignal, CountDownLatch doneSignal, DistributedLockManager distributedLockManager, RedissonClient redissonClient) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
        this.distributedLockManager = distributedLockManager;
        this.redissonClient = redissonClient;
    }

    @Override
    public void run() {

        try {
            System.out.println(Thread.currentThread().getName() + " start");

            startSignal.await();

            Integer count = aspect("lock");

            System.out.println(Thread.currentThread().getName() + ": count = " + count);

            doneSignal.countDown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int aspect(String lockName) {
        return distributedLockManager.aspect(lockName, this);
    }

    public int aspectBusiness(String lockName) {
        RMap<String, Integer> map = redissonClient.getMap("distributionTest");

        Integer count = map.get("count");

        if (count > 0) {
            count = count - 1;
            map.put("count", count);
        }

        return count;
    }

}

DistributedLockManager:

@Component
public class DistributedLockManager {

    @DistributedLock(argNum = 1, lockNamePost = ".lock")
    public Integer aspect(String lockName, Worker1 worker1) {
        return worker1.aspectBusiness(lockName);
    }

}

這樣做嘹裂,雖然可以將業(yè)務從Service層抽離出來,放到分布式鎖管理器DistributedLockManager統(tǒng)一管理摔握,但每次都需要將Worker一起傳過去寄狼,同樣感覺很別扭。那么有沒有更好的辦法呢氨淌?有泊愧,使用lambda。(上面鋪墊了那么多盛正,終于進入正題了删咱!o(╥﹏╥)o)

lambda是java 8的新特性之一,若未了解過的建議先去惡補一番豪筝。因為java 8支持lambda腋腮,所以也新加了很多函數(shù)式接口,這里簡單列幾個:

函數(shù)式接口 參數(shù)類型 返回類型 描述
Supplier<T> T 提供一個T類型的值
Consumer<T> T void 處理一個T類型的值
BiConsumer<T, U> T, U void 處理T類型和U類型的值
Predicate<T> T boolean 一個 計算Boolean值的函數(shù)
Function<T, R> T R 一個參數(shù)類型為T的函數(shù)
ToIntFunction<T>
ToLongFunction<T>
ToDoubleFunction<T>
T int
long
double
分別計算int壤蚜、long、double值的函數(shù)
IntFunction<R>
LongFunction<R>
DoubleFunction<R>
int
long
double
R 參數(shù)分別為int徊哑、long袜刷、double類型的函數(shù)
BiFunction<T, U, R> T, U R 一個參數(shù)類型為T和U的函數(shù)
UnaryOperator<T> T T 對類型T進行的一元操作
BinaryOperator<T> T, T T 對類型T進行的二元操作

觀察Worker1中方法aspect(Person)的邏輯,最后需要返回一個int值莺丑,所以我們可以使用Supplier<T>來作為參數(shù)的類型著蟹,在分布式鎖管理器中添加一個方法,如下:

@DistributedLock(lockName = "lock", lockNamePost = ".lock")
public int aspect(Supplier<Integer> supplier) {
    return supplier.get();
}

然后梢莽,在Worker1中也定義一個方法:

private int aspect() {
    RMap<String, Integer> map = redissonClient.getMap("distributionTest");

    Integer count1 = map.get("count");
    if (count1 > 0) {
        count1 = count1 - 1;
        map.put("count", count1);
    }
    return count1;
}

最后在Worker1的run方法中使用萧豆,把Integer count = aspect("lock");替換成如下:

Integer count = distributedLockManager.aspect(() -> {
    return aspect();
});

其實也可以簡寫:

Integer count = distributedLockManager.aspect(() -> aspect());

通過這樣改造,是不是發(fā)現(xiàn)優(yōu)雅多了昏名。

測試

DistributedLockTestController中涮雷,幫下面的代碼替換成另一段代碼:

for (int i = 0; i < count; ++i) { // create and start threads
    new Thread(new Worker(startSignal, doneSignal, service, redissonClient)).start();
}

替換成:

for (int i = 0; i < count; ++i) { // create and start threads
    new Thread(new Worker1(startSignal, doneSignal, distributedLockManager, redissonClient)).start();
}

最后啟動工程,訪問http://localhost:8080/distributedLockTest轻局,可以看到類似如下的結果:

分布式鎖測試結果

另外洪鸭,因為暫時沒有找到合適的參數(shù)類型為“無”样刷、返回類型也為“無”的函數(shù)式接口(找到一個——Runnable览爵,但如果用了置鼻,怕產生歧義,所以就算了)蜓竹,既然如此箕母,我們不妨自己定義一個,如下:

@FunctionalInterface
public interface Action {
    void action();
}

使用也很簡單俱济,在DistributedLockManager定義類似如下的方法:

@DistributedLock(lockName = "lock", lockNamePost = ".lock")
public void doSomething(Action action) {
    action.action();
}

然后嘶是,在需要的地方這樣用:

distributedLockManager.doSomething(() -> {
    // do something
});

至此,使用Redisson實現(xiàn)分布式鎖姨蝴,然后使用Spring AOP簡化分布式鎖介紹完畢俊啼。

若有什么地方有錯誤的或需要改進的,歡迎留言一起討論交流左医。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
  • 序言:七十年代末授帕,一起剝皮案震驚了整個濱河市,隨后出現(xiàn)的幾起案子浮梢,更是在濱河造成了極大的恐慌跛十,老刑警劉巖,帶你破解...
    沈念sama閱讀 217,185評論 6 503
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件秕硝,死亡現(xiàn)場離奇詭異芥映,居然都是意外死亡,警方通過查閱死者的電腦和手機远豺,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 92,652評論 3 393
  • 文/潘曉璐 我一進店門奈偏,熙熙樓的掌柜王于貴愁眉苦臉地迎上來,“玉大人躯护,你說我怎么就攤上這事惊来。” “怎么了棺滞?”我有些...
    開封第一講書人閱讀 163,524評論 0 353
  • 文/不壞的土叔 我叫張陵裁蚁,是天一觀的道長。 經常有香客問我继准,道長枉证,這世上最難降的妖魔是什么? 我笑而不...
    開封第一講書人閱讀 58,339評論 1 293
  • 正文 為了忘掉前任移必,我火速辦了婚禮室谚,結果婚禮上,老公的妹妹穿的比我還像新娘。我一直安慰自己舞萄,他們只是感情好眨补,可當我...
    茶點故事閱讀 67,387評論 6 391
  • 文/花漫 我一把揭開白布。 她就那樣靜靜地躺著倒脓,像睡著了一般撑螺。 火紅的嫁衣襯著肌膚如雪。 梳的紋絲不亂的頭發(fā)上崎弃,一...
    開封第一講書人閱讀 51,287評論 1 301
  • 那天甘晤,我揣著相機與錄音,去河邊找鬼饲做。 笑死线婚,一個胖子當著我的面吹牛,可吹牛的內容都是我干的盆均。 我是一名探鬼主播塞弊,決...
    沈念sama閱讀 40,130評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼,長吁一口氣:“原來是場噩夢啊……” “哼泪姨!你這毒婦竟也來了游沿?” 一聲冷哼從身側響起,我...
    開封第一講書人閱讀 38,985評論 0 275
  • 序言:老撾萬榮一對情侶失蹤肮砾,失蹤者是張志新(化名)和其女友劉穎诀黍,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體仗处,經...
    沈念sama閱讀 45,420評論 1 313
  • 正文 獨居荒郊野嶺守林人離奇死亡眯勾,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內容為張勛視角 年9月15日...
    茶點故事閱讀 37,617評論 3 334
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了婆誓。 大學時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片存炮。...
    茶點故事閱讀 39,779評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡著榴,死狀恐怖柬姚,靈堂內的尸體忽然破棺而出严里,到底是詐尸還是另有隱情,我是刑警寧澤鞋屈,帶...
    沈念sama閱讀 35,477評論 5 345
  • 正文 年R本政府宣布,位于F島的核電站故觅,受9級特大地震影響厂庇,放射性物質發(fā)生泄漏。R本人自食惡果不足惜输吏,卻給世界環(huán)境...
    茶點故事閱讀 41,088評論 3 328
  • 文/蒙蒙 一权旷、第九天 我趴在偏房一處隱蔽的房頂上張望。 院中可真熱鬧,春花似錦拄氯、人聲如沸躲查。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,716評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽镣煮。三九已至,卻和暖如春鄙麦,著一層夾襖步出監(jiān)牢的瞬間典唇,已是汗流浹背。 一陣腳步聲響...
    開封第一講書人閱讀 32,857評論 1 269
  • 我被黑心中介騙來泰國打工胯府, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留介衔,地道東北人。 一個月前我還...
    沈念sama閱讀 47,876評論 2 370
  • 正文 我出身青樓骂因,卻偏偏與公主長得像炎咖,于是被迫代替她去往敵國和親。 傳聞我的和親對象是個殘疾皇子寒波,可洞房花燭夜當晚...
    茶點故事閱讀 44,700評論 2 354

推薦閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理乘盼,服務發(fā)現(xiàn),斷路器影所,智...
    卡卡羅2017閱讀 134,654評論 18 139
  • Spring Boot 參考指南 介紹 轉載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 46,808評論 6 342
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 172,099評論 25 707
  • 今天戴老師讓我八點半到學校蹦肴,她要看著我寫語文練習冊,我還發(fā)現(xiàn)她穿的衣服和我是一樣的猴娩。后來過了一會兒戴老師才發(fā)現(xiàn)我和...
    小狐貍的麻麻閱讀 249評論 0 0
  • 今天是瑪雅歷日“藍色太陽的猴” “猴”代表小孩阴幌,“陽光”就是真實、燦爛卷中。那么具有“藍猴”圖騰的人就是像...