來(lái)源:https://www.cnblogs.com/huangqingshi/p/10290615.html
隨著現(xiàn)在分布式越來(lái)越普遍荠诬,分布式鎖也十分常用,上篇文章解釋了使用zookeeper實(shí)現(xiàn)分布式鎖苏遥,本次咱們說(shuō)一下如何用Redis實(shí)現(xiàn)分布式鎖和分布限流厕怜。
https://www.cnblogs.com/huangqingshi/p/9650837.html
Redis有個(gè)事務(wù)鎖侮东,就是如下的命令墩瞳,這個(gè)命令的含義是將一個(gè)value設(shè)置到一個(gè)key中,如果不存在將會(huì)賦值并且設(shè)置超時(shí)時(shí)間為30秒收苏,如何這個(gè)key已經(jīng)存在了亿卤,則不進(jìn)行設(shè)置。
SET key value NX PX 30000
這個(gè)事務(wù)鎖很好的解決了兩個(gè)單獨(dú)的命令鹿霸,一個(gè)設(shè)置set key value nx排吴,即該key不存在的話將對(duì)其進(jìn)行設(shè)置,另一個(gè)是expire key seconds懦鼠,設(shè)置該key的超時(shí)時(shí)間钻哩。我們可以想一下屹堰,如果這兩個(gè)命令用程序單獨(dú)使用會(huì)存在什么問(wèn)題:
如果一個(gè)set key的命令設(shè)置了key,然后程序異常了街氢,expire時(shí)間沒(méi)有設(shè)置扯键,那么這個(gè)key會(huì)一直鎖住。
如果一個(gè)set key時(shí)出現(xiàn)了異常珊肃,但是直接執(zhí)行了expire荣刑,過(guò)了一會(huì)兒之后另一個(gè)進(jìn)行set key,還沒(méi)怎么執(zhí)行代碼近范,結(jié)果key過(guò)期了,別的線程也進(jìn)入了鎖延蟹。
還有很多出問(wèn)題的可能點(diǎn)评矩,這里我們就不討論了,下面咱們來(lái)看看如何實(shí)現(xiàn)吧阱飘。
本文使用的Spring Boot 2.x + Spring data redis + Swagger +lombok + AOP + lua腳本斥杜。在實(shí)現(xiàn)的過(guò)程中遇到了很多問(wèn)題,都一一解決實(shí)現(xiàn)了沥匈。
依賴的POM文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? ? ? ? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
? ?<modelVersion>4.0.0</modelVersion>
? ?<parent>
? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ?<artifactId>spring-boot-starter-parent</artifactId>
? ? ? ?<version>2.1.2.RELEASE</version>
? ? ? ?<relativePath/> <!-- lookup parent from repository -->
? ?</parent>
? ?<groupId>com.hqs</groupId>
? ?<artifactId>distributedlock</artifactId>
? ?<version>0.0.1-SNAPSHOT</version>
? ?<name>distributedlock</name>
? ?<description>Demo project for Spring Boot</description>
? ?<properties>
? ? ? ?<java.version>1.8</java.version>
? ?</properties>
? ?<dependencies>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ? ? ?<artifactId>spring-boot-starter-aop</artifactId>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ? ? ?<artifactId>spring-boot-starter-web</artifactId>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ? ? ?<artifactId>spring-boot-starter-data-redis</artifactId>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ? ? ?<artifactId>spring-boot-devtools</artifactId>
? ? ? ? ? ?<scope>runtime</scope>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.projectlombok</groupId>
? ? ? ? ? ?<artifactId>lombok</artifactId>
? ? ? ? ? ?<optional>true</optional>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ? ? ?<artifactId>spring-boot-starter-test</artifactId>
? ? ? ? ? ?<scope>test</scope>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>io.springfox</groupId>
? ? ? ? ? ?<artifactId>springfox-swagger-ui</artifactId>
? ? ? ? ? ?<version>2.9.2</version>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>io.springfox</groupId>
? ? ? ? ? ?<artifactId>springfox-swagger2</artifactId>
? ? ? ? ? ?<version>2.9.2</version>
? ? ? ? ? ?<scope>compile</scope>
? ? ? ?</dependency>
? ? ? ?<dependency>
? ? ? ? ? ?<groupId>redis.clients</groupId>
? ? ? ? ? ?<artifactId>jedis</artifactId>
? ? ? ? ? ?<version>2.9.0</version>
? ? ? ?</dependency>
? ?</dependencies>
? ?<build>
? ? ? ?<plugins>
? ? ? ? ? ?<plugin>
? ? ? ? ? ? ? ?<groupId>org.springframework.boot</groupId>
? ? ? ? ? ? ? ?<artifactId>spring-boot-maven-plugin</artifactId>
? ? ? ? ? ?</plugin>
? ? ? ?</plugins>
? ?</build>
</project>
使用了兩個(gè)lua腳本蔗喂,一個(gè)用于執(zhí)行l(wèi)ock,另一個(gè)執(zhí)行unlock高帖。
咱們簡(jiǎn)單看一下缰儿,lock腳本就是采用Redis事務(wù)執(zhí)行的set nx px命令,其實(shí)還有set nx ex命令散址,這個(gè)ex命令是采用秒的方式進(jìn)行設(shè)置過(guò)期時(shí)間乖阵,這個(gè)px是采用毫秒的方式設(shè)置過(guò)期時(shí)間。
value需要使用一個(gè)唯一的值预麸,這個(gè)值在解鎖的時(shí)候需要判斷是否一致瞪浸,如果一致的話就進(jìn)行解鎖。這個(gè)也是官方推薦的方法吏祸。另外在lock的地方我設(shè)置了一個(gè)result对蒲,用于輸出測(cè)試時(shí)的結(jié)果,這樣就可以結(jié)合程序去進(jìn)行debug了贡翘。
local expire = tonumber(ARGV[2])
local ret = redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', expire)
local strret = tostring(ret)
//用于查看結(jié)果蹈矮,我本機(jī)獲取鎖成功后程序返回隨機(jī)結(jié)果"table: 0x7fb4b3700fe0",否則返回"false"
redis.call('set', 'result', strret)
if strret == 'false' then
? ?return false
else
? ?return true
end
redis.call('del', 'result')
if redis.call('get', KEYS[1]) == ARGV[1] then
? ?return redis.call('del', KEYS[1])
else
? ?return 0
end
來(lái)看下代碼鸣驱,主要寫了兩個(gè)方法含滴,一個(gè)是用與鎖另外一個(gè)是用于結(jié)解鎖。這塊需要注意的是使用RedisTemplate丐巫,這塊意味著key和value一定都是String的谈况,我在使用的過(guò)程中就出現(xiàn)了一些錯(cuò)誤勺美。首先初始化兩個(gè)腳本到程序中,然后調(diào)用執(zhí)行腳本碑韵。
package com.hqs.distributedlock.lock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
@Slf4j
@Component
public class DistributedLock {
? ?//注意RedisTemplate用的String,String赡茸,后續(xù)所有用到的key和value都是String的
? ?@Autowired
? ?private RedisTemplate<String, String> redisTemplate;
? ?@Autowired
? ?RedisScript<Boolean> lockScript;
? ?@Autowired
? ?RedisScript<Long> unlockScript;
? ?public Boolean distributedLock(String key, String uuid, String secondsToLock) {
? ? ? ?Boolean locked = false;
? ? ? ?try {
? ? ? ? ? ?String millSeconds = String.valueOf(Integer.parseInt(secondsToLock) * 1000);
? ? ? ? ? ?locked =redisTemplate.execute(lockScript, Collections.singletonList(key), uuid, millSeconds);
? ? ? ? ? ?log.info("distributedLock.key{}: - uuid:{}: - timeToLock:{} - locked:{} - millSeconds:{}",
? ? ? ? ? ? ? ? ? ?key, uuid, secondsToLock, locked, millSeconds);
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?log.error("error", e);
? ? ? ?}
? ? ? ?return locked;
? ?}
? ?public void distributedUnlock(String key, String uuid) {
? ? ? ?Long unlocked = redisTemplate.execute(unlockScript, Collections.singletonList(key),
? ? ? ? ? ? ? ?uuid);
? ? ? ?log.info("distributedLock.key{}: - uuid:{}: - unlocked:{}", key, uuid, unlocked);
? ?}
}
還有一個(gè)就是腳本定義的地方需要注意,返回的結(jié)果集一定是Long, Boolean祝闻,List, 一個(gè)反序列化的值占卧。這塊要注意。
package com.hqs.distributedlock.config;
import com.sun.org.apache.xpath.internal.operations.Bool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.scripting.ScriptSource;
import org.springframework.scripting.support.ResourceScriptSource;
@Configuration
@Slf4j
public class BeanConfiguration {
? ?/**
? ? * The script resultType should be one of
? ? * Long, Boolean, List, or a deserialized value type. It can also be null if the script returns
? ? * a throw-away status (specifically, OK).
? ? * @return
? ? */
? ?@Bean
? ?public RedisScript<Long> limitScript() {
? ? ? ?RedisScript redisScript = null;
? ? ? ?try {
? ? ? ? ? ?ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua"));
// ? ? ? ? ? ?log.info("script:{}", scriptSource.getScriptAsString());
? ? ? ? ? ?redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?log.error("error", e);
? ? ? ?}
? ? ? ?return redisScript;
? ?}
? ?@Bean
? ?public RedisScript<Boolean> lockScript() {
? ? ? ?RedisScript<Boolean> redisScript = null;
? ? ? ?try {
? ? ? ? ? ?ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/lock.lua"));
? ? ? ? ? ?redisScript = RedisScript.of(scriptSource.getScriptAsString(), Boolean.class);
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?log.error("error" , e);
? ? ? ?}
? ? ? ?return redisScript;
? ?}
? ?@Bean
? ?public RedisScript<Long> unlockScript() {
? ? ? ?RedisScript<Long> redisScript = null;
? ? ? ?try {
? ? ? ? ? ?ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/unlock.lua"));
? ? ? ? ? ?redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?log.error("error" , e);
? ? ? ?}
? ? ? ?return redisScript;
? ?}
? ?@Bean
? ?public RedisScript<Long> limitAnother() {
? ? ? ?DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
? ? ? ?redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua")));
? ? ? ?redisScript.setResultType(Long.class);
? ? ? ?return redisScript;
? ?}
}
好了联喘,這塊就寫好了华蜒,然后寫好controller類準(zhǔn)備測(cè)試。
@PostMapping("/distributedLock")
? ?@ResponseBody
? ?public String distributedLock(String key, String uuid, String secondsToLock, String userId) throws Exception{
// ? ? ? ?String uuid = UUID.randomUUID().toString();
? ? ? ?Boolean locked = false;
? ? ? ?try {
? ? ? ? ? ?locked = lock.distributedLock(key, uuid, secondsToLock);
? ? ? ? ? ?if(locked) {
? ? ? ? ? ? ? ?log.info("userId:{} is locked - uuid:{}", userId, uuid);
? ? ? ? ? ? ? ?log.info("do business logic");
? ? ? ? ? ? ? ?TimeUnit.MICROSECONDS.sleep(3000);
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ?log.info("userId:{} is not locked - uuid:{}", userId, uuid);
? ? ? ? ? ?}
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?log.error("error", e);
? ? ? ?} finally {
? ? ? ? ? ?if(locked) {
? ? ? ? ? ? ? ?lock.distributedUnlock(key, uuid);
? ? ? ? ? ?}
? ? ? ?}
? ? ? ?return "ok";
? ?}
我也寫了一個(gè)測(cè)試類豁遭,用于測(cè)試和輸出結(jié)果, 使用100個(gè)線程叭喜,然后鎖的時(shí)間設(shè)置10秒,controller里邊需要休眠3秒模擬業(yè)務(wù)執(zhí)行蓖谢。
@Test
? ?public void distrubtedLock() {
? ? ? ?String url = "http://localhost:8080/distributedLock";
? ? ? ?String uuid = "abcdefg";
// ? ? ? ?log.info("uuid:{}", uuid);
? ? ? ?String key = "redisLock";
? ? ? ?String secondsToLive = "10";
? ? ? ?for(int i = 0; i < 100; i++) {
? ? ? ? ? ?final int userId = i;
? ? ? ? ? ?new Thread(() -> {
? ? ? ? ? ? ? ?MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
? ? ? ? ? ? ? ?params.add("uuid", uuid);
? ? ? ? ? ? ? ?params.add("key", key);
? ? ? ? ? ? ? ?params.add("secondsToLock", secondsToLive);
? ? ? ? ? ? ? ?params.add("userId", String.valueOf(userId));
? ? ? ? ? ? ? ?String result = testRestTemplate.postForObject(url, params, String.class);
? ? ? ? ? ? ? ?System.out.println("-------------" + result);
? ? ? ? ? ?}
? ? ? ? ? ?).start();
? ? ? ?}
? ?}
獲取鎖的地方就會(huì)執(zhí)行do business logic, 然后會(huì)有部分線程獲取到鎖并執(zhí)行業(yè)務(wù)捂蕴,執(zhí)行完業(yè)務(wù)的就會(huì)釋放鎖。
分布式鎖就實(shí)現(xiàn)好了闪幽,接下來(lái)實(shí)現(xiàn)分布式限流啥辨。先看一下limit的lua腳本,需要給腳本傳兩個(gè)值盯腌,一個(gè)值是限流的key,一個(gè)值是限流的數(shù)量溉知。
獲取當(dāng)前key,然后判斷其值是否為nil腕够,如果為nil的話需要賦值為0着倾,然后進(jìn)行加1并且和limit進(jìn)行比對(duì),如果大于limt即返回0燕少,說(shuō)明限流了卡者,如果小于limit則需要使用Redis的INCRBY key 1,就是將key進(jìn)行加1命令。并且設(shè)置超時(shí)時(shí)間客们,超時(shí)時(shí)間是秒崇决,并且如果有需要的話這個(gè)秒也是可以用參數(shù)進(jìn)行設(shè)置。
//lua 下標(biāo)從 1 開始
// 限流 key
local key = KEYS[1]
//限流大小
local limit = tonumber(ARGV[1])
// 獲取當(dāng)前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
if curentLimit + 1 > limit then
? ?// 達(dá)到限流大小 返回
? ?return 0;
else
? ?// 沒(méi)有達(dá)到閾值 value + 1
? ?redis.call("INCRBY", key, 1)
? ?// EXPIRE后邊的單位是秒
? ?redis.call("EXPIRE", key, 10)
? ?return curentLimit + 1
end
執(zhí)行l(wèi)imit的腳本和執(zhí)行l(wèi)ock的腳本類似底挫。
package com.hqs.distributedlock.limit;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Collections;
/**
* @author huangqingshi
* @Date 2019-01-17
*/
@Slf4j
@Component
public class DistributedLimit {
? ?//注意RedisTemplate用的String,String恒傻,后續(xù)所有用到的key和value都是String的
? ?@Autowired
? ?private RedisTemplate<String, String> redisTemplate;
? ?@Autowired
? ?RedisScript<Long> limitScript;
? ?public Boolean distributedLimit(String key, String limit) {
? ? ? ?Long id = 0L;
? ? ? ?try {
? ? ? ? ? ?id = redisTemplate.execute(limitScript, Collections.singletonList(key),
? ? ? ? ? ? ? ? ? ?limit);
? ? ? ? ? ?log.info("id:{}", id);
? ? ? ?} catch (Exception e) {
? ? ? ? ? ?log.error("error", e);
? ? ? ?}
? ? ? ?if(id == 0L) {
? ? ? ? ? ?return false;
? ? ? ?} else {
? ? ? ? ? ?return true;
? ? ? ?}
? ?}
}
接下來(lái)咱們寫一個(gè)限流注解,并且設(shè)置注解的key和限流的大薪ǖ恕:
package com.hqs.distributedlock.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 自定義limit注解
* @author huangqingshi
* @Date 2019-01-17
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistriLimitAnno {
? ?public String limitKey() default "limit";
? ?public int limit() default 1;
}
然后對(duì)注解進(jìn)行切面盈厘,在切面中判斷是否超過(guò)limit,如果超過(guò)limit的時(shí)候就需要拋出異常exceeded limit官边,否則正常執(zhí)行沸手。
package com.hqs.distributedlock.aspect;
import com.hqs.distributedlock.annotation.DistriLimitAnno;
import com.hqs.distributedlock.limit.DistributedLimit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* @author huangqingshi
* @Date 2019-01-17
*/
@Slf4j
@Aspect
@Component
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class LimitAspect {
? ?@Autowired
? ?DistributedLimit distributedLimit;
? ?@Pointcut("@annotation(com.hqs.distributedlock.annotation.DistriLimitAnno)")
? ?public void limit() {};
? ?@Before("limit()")
? ?public void beforeLimit(JoinPoint joinPoint) throws Exception {
? ? ? ?MethodSignature signature = (MethodSignature) joinPoint.getSignature();
? ? ? ?Method method = signature.getMethod();
? ? ? ?DistriLimitAnno distriLimitAnno = method.getAnnotation(DistriLimitAnno.class);
? ? ? ?String key = distriLimitAnno.limitKey();
? ? ? ?int limit = distriLimitAnno.limit();
? ? ? ?Boolean exceededLimit = distributedLimit.distributedLimit(key, String.valueOf(limit));
? ? ? ?if(!exceededLimit) {
? ? ? ? ? ?throw new RuntimeException("exceeded limit");
? ? ? ?}
? ?}
}
因?yàn)橛袙伋霎惓M庥觯@里我弄了一個(gè)統(tǒng)一的controller錯(cuò)誤處理,如果controller出現(xiàn)Exception的時(shí)候都需要走這塊異常契吉。如果是正常的RunTimeException的時(shí)候獲取一下跳仿,否則將異常獲取一下并且輸出。
package com.hqs.distributedlock.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.NativeWebRequest;
import javax.servlet.http.HttpServletRequest;
import java.util.HashMap;
import java.util.Map;
/**
* @author huangqingshi
* @Date 2019-01-17
* 統(tǒng)一的controller錯(cuò)誤處理
*/
@Slf4j
@ControllerAdvice
public class UnifiedErrorHandler {
? ?private static Map<String, String> res = new HashMap<>(2);
? ?@ExceptionHandler(value = Exception.class)
? ?@ResponseStatus(HttpStatus.OK)
? ?@ResponseBody
? ?public Object processException(HttpServletRequest req, Exception e) {
? ? ? ?res.put("url", req.getRequestURL().toString());
? ? ? ?if(e instanceof RuntimeException) {
? ? ? ? ? ?res.put("mess", e.getMessage());
? ? ? ?} else {
? ? ? ? ? ?res.put("mess", "sorry error happens");
? ? ? ?}
? ? ? ?return res;
? ?}
}
好了捐晶,接下來(lái)將注解寫到自定義的controller上菲语,limit的大小為10,也就是10秒鐘內(nèi)限制10次訪問(wèn)惑灵。
@PostMapping("/distributedLimit")
? ?@ResponseBody
? ?@DistriLimitAnno(limitKey="limit", limit = 10)
? ?public String distributedLimit(String userId) {
? ? ? ?log.info(userId);
? ? ? ?return "ok";
? ?}
也是來(lái)一段Test方法來(lái)跑山上,老方式100個(gè)線程開始跑,只有10次英支,其他的都是limit佩憾。沒(méi)有問(wèn)題。
總結(jié)一下潭辈,這次實(shí)現(xiàn)采用了使用lua腳本和Redis實(shí)現(xiàn)了鎖和限流鸯屿,但是真實(shí)使用的時(shí)候還需要多測(cè)試澈吨,另外如果此次Redis也是采用的單機(jī)實(shí)現(xiàn)方法把敢,使用集群的時(shí)候可能需要改造一下。
關(guān)于鎖這塊其實(shí)Reids自己也實(shí)現(xiàn)了RedLock, java實(shí)現(xiàn)的版本Redission谅辣。也有很多公司使用了修赞,功能非常強(qiáng)大。各種場(chǎng)景下都用到了桑阶。
擴(kuò)展閱讀