上篇文章(限流算法與Guava RateLimiter解析)對常用的限流算法及Google Guava基于令牌桶算法的實(shí)現(xiàn)RateLimiter進(jìn)行了介紹蛉顽。RateLimiter通過線程鎖控制同步,只適用于單機(jī)應(yīng)用幅虑,在分布式環(huán)境下文兑,雖然有像阿里Sentinel的限流開源框架盒刚,但對于一些小型應(yīng)用來說未免過重,但限流的需求在小型項(xiàng)目中也是存在的绿贞,比如獲取手機(jī)驗(yàn)證碼的控制因块,對資源消耗較大操作的訪問頻率控制等。本文介紹最近寫的一個基于RateLimiter籍铁,適用于分布式環(huán)境下的限流實(shí)現(xiàn)涡上,并使用spring-boot-starter的形式發(fā)布,比較輕量級且“開箱即用”拒名。
本文限流實(shí)現(xiàn)包括兩種形式:
- 基于RateLimiter令牌桶算法的限速控制(嚴(yán)格限制訪問速度)
- 基于Lua腳本的限量控制(限制一個時間窗口內(nèi)的訪問量吩愧,對訪問速度沒有嚴(yán)格限制)
限速控制
1. 令牌桶模型
首先定義令牌桶模型,與RateLimiter中類似增显,包括幾個關(guān)鍵屬性與關(guān)鍵方法雁佳。其中關(guān)鍵屬性定義如下,
@Data
public class RedisPermits {
/**
* 最大存儲令牌數(shù)
*/
private double maxPermits;
/**
* 當(dāng)前存儲令牌數(shù)
*/
private double storedPermits;
/**
* 添加令牌的時間間隔/毫秒
*/
private double intervalMillis;
/**
* 下次請求可以獲取令牌的時間同云,可以是過去(令牌積累)也可以是將來的時間(令牌預(yù)消費(fèi))
*/
private long nextFreeTicketMillis;
//...
關(guān)鍵方法定義與RateLimiter也大同小異糖权,方法注釋基本已描述各方法用途,不再贅述炸站。
/**
* 構(gòu)建Redis令牌數(shù)據(jù)模型
*
* @param permitsPerSecond 每秒放入的令牌數(shù)
* @param maxBurstSeconds maxPermits由此字段計(jì)算星澳,最大存儲maxBurstSeconds秒生成的令牌
* @param nextFreeTicketMillis 下次請求可以獲取令牌的起始時間,默認(rèn)當(dāng)前系統(tǒng)時間
*/
public RedisPermits(double permitsPerSecond, double maxBurstSeconds, Long nextFreeTicketMillis) {
this.maxPermits = permitsPerSecond * maxBurstSeconds;
this.storedPermits = maxPermits;
this.intervalMillis = TimeUnit.SECONDS.toMillis(1) / permitsPerSecond;
this.nextFreeTicketMillis = nextFreeTicketMillis;
}
/**
* 基于當(dāng)前時間旱易,若當(dāng)前時間晚于nextFreeTicketMicros禁偎,則計(jì)算該段時間內(nèi)可以生成多少令牌,將生成的令牌加入令牌桶中并更新數(shù)據(jù)
*/
public void resync(long nowMillis) {
if (nowMillis > nextFreeTicketMillis) {
double newPermits = (nowMillis - nextFreeTicketMillis) / intervalMillis;
storedPermits = Math.min(maxPermits, storedPermits + newPermits);
nextFreeTicketMillis = nowMillis;
}
}
/**
* 保留指定數(shù)量令牌咒唆,并返回需要等待的時間
*/
public long reserveAndGetWaitLength(long nowMillis, int permits) {
resync(nowMillis);
double storedPermitsToSpend = Math.min(permits, storedPermits); // 可以消耗的令牌數(shù)
double freshPermits = permits - storedPermitsToSpend; // 需要等待的令牌數(shù)
long waitMillis = (long) (freshPermits * intervalMillis); // 需要等待的時間
nextFreeTicketMillis = LongMath.saturatedAdd(nextFreeTicketMillis, waitMillis);
storedPermits -= storedPermitsToSpend;
return waitMillis;
}
/**
* 在超時時間內(nèi)届垫,是否有指定數(shù)量的令牌可用
*/
public boolean canAcquire(long nowMillis, int permits, long timeoutMillis) {
return queryEarliestAvailable(nowMillis, permits) <= timeoutMillis;
}
/**
* 指定數(shù)量令牌數(shù)可用需等待的時間
*
* @param permits 需保留的令牌數(shù)
* @return 指定數(shù)量令牌可用的等待時間,如果為0或負(fù)數(shù)全释,表示當(dāng)前可用
*/
private long queryEarliestAvailable(long nowMillis, int permits) {
resync(nowMillis);
double storedPermitsToSpend = Math.min(permits, storedPermits); // 可以消耗的令牌數(shù)
double freshPermits = permits - storedPermitsToSpend; // 需要等待的令牌數(shù)
long waitMillis = (long) (freshPermits * intervalMillis); // 需要等待的時間
return LongMath.saturatedAdd(nextFreeTicketMillis - nowMillis, waitMillis);
}
2. 令牌桶控制類
Guava RateLimiter中的控制都在RateLimiter及其子類中(如SmoothBursty)装处,本處涉及到分布式環(huán)境下的同步,因此將其解耦,令牌桶模型存儲于Redis中妄迁,對其同步操作的控制放置在如下控制類寝蹈,其中同步控制使用到了前面介紹的分布式鎖(參考基于Redis分布式鎖的正確打開方式)
@Slf4j
public class RedisRateLimiter {
/**
* 獲取一個令牌,阻塞一直到獲取令牌登淘,返回阻塞等待時間
*
* @return time 阻塞等待時間/毫秒
*/
public long acquire(String key) throws IllegalArgumentException {
return acquire(key, 1);
}
/**
* 獲取指定數(shù)量的令牌箫老,如果令牌數(shù)不夠,則一直阻塞黔州,返回阻塞等待的時間
*
* @param permits 需要獲取的令牌數(shù)
* @return time 等待的時間/毫秒
* @throws IllegalArgumentException tokens值不能為負(fù)數(shù)或零
*/
public long acquire(String key, int permits) throws IllegalArgumentException {
long millisToWait = reserve(key, permits);
log.info("acquire {} permits for key[{}], waiting for {}ms", permits, key, millisToWait);
try {
Thread.sleep(millisToWait);
} catch (InterruptedException e) {
log.error("Interrupted when trying to acquire {} permits for key[{}]", permits, key, e);
}
return millisToWait;
}
/**
* 在指定時間內(nèi)獲取一個令牌耍鬓,如果獲取不到則一直阻塞,直到超時
*
* @param timeout 最大等待時間(超時時間)流妻,為0則不等待立即返回
* @param unit 時間單元
* @return 獲取到令牌則true牲蜀,否則false
* @throws IllegalArgumentException
*/
public boolean tryAcquire(String key, long timeout, TimeUnit unit) throws IllegalArgumentException {
return tryAcquire(key, 1, timeout, unit);
}
/**
* 在指定時間內(nèi)獲取指定數(shù)量的令牌,如果在指定時間內(nèi)獲取不到指定數(shù)量的令牌绅这,則直接返回false涣达,
* 否則阻塞直到能獲取到指定數(shù)量的令牌
*
* @param permits 需要獲取的令牌數(shù)
* @param timeout 最大等待時間(超時時間)
* @param unit 時間單元
* @return 如果在指定時間內(nèi)能獲取到指定令牌數(shù),則true,否則false
* @throws IllegalArgumentException tokens為負(fù)數(shù)或零证薇,拋出異常
*/
public boolean tryAcquire(String key, int permits, long timeout, TimeUnit unit) throws IllegalArgumentException {
long timeoutMillis = Math.max(unit.toMillis(timeout), 0);
checkPermits(permits);
long millisToWait;
boolean locked = false;
try {
locked = lock.lock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId(), 60, 2, TimeUnit.SECONDS);
if (locked) {
long nowMillis = getNowMillis();
RedisPermits permit = getPermits(key, nowMillis);
if (!permit.canAcquire(nowMillis, permits, timeoutMillis)) {
return false;
} else {
millisToWait = permit.reserveAndGetWaitLength(nowMillis, permits);
permitsRedisTemplate.opsForValue().set(key, permit, expire, TimeUnit.SECONDS);
}
} else {
return false; //超時獲取不到鎖度苔,也返回false
}
} finally {
if (locked) {
lock.unLock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId());
}
}
if (millisToWait > 0) {
try {
Thread.sleep(millisToWait);
} catch (InterruptedException e) {
}
}
return true;
}
/**
* 保留指定的令牌數(shù)待用
*
* @param permits 需保留的令牌數(shù)
* @return time 令牌可用的等待時間
* @throws IllegalArgumentException tokens不能為負(fù)數(shù)或零
*/
private long reserve(String key, int permits) throws IllegalArgumentException {
checkPermits(permits);
try {
lock.lock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId(), 60, 2, TimeUnit.SECONDS);
long nowMillis = getNowMillis();
RedisPermits permit = getPermits(key, nowMillis);
long waitMillis = permit.reserveAndGetWaitLength(nowMillis, permits);
permitsRedisTemplate.opsForValue().set(key, permit, expire, TimeUnit.SECONDS);
return waitMillis;
} finally {
lock.unLock(key + LOCK_KEY_SUFFIX, WebUtil.getRequestId());
}
}
/**
* 獲取令牌桶
*
* @return
*/
private RedisPermits getPermits(String key, long nowMillis) {
RedisPermits permit = permitsRedisTemplate.opsForValue().get(key);
if (permit == null) {
permit = new RedisPermits(permitsPerSecond, maxBurstSeconds, nowMillis);
}
return permit;
}
/**
* 獲取redis服務(wù)器時間
*/
private long getNowMillis() {
String luaScript = "return redis.call('time')";
DefaultRedisScript<List> redisScript = new DefaultRedisScript<>(luaScript, List.class);
List<String> now = (List<String>)stringRedisTemplate.execute(redisScript, null);
return now == null ? System.currentTimeMillis() : Long.valueOf(now.get(0))*1000+Long.valueOf(now.get(1))/1000;
}
//...
}
其中:
- acquire 是阻塞方法,如果沒有可用的令牌浑度,則一直阻塞直到獲取到令牌寇窑。
- tryAcquire 則是非阻塞方法,如果在指定超時時間內(nèi)獲取不到指定數(shù)量的令牌俺泣,則直接返回false疗认,不阻塞等待完残。
- getNowMillis 獲取Redis服務(wù)器時間伏钠,避免業(yè)務(wù)服務(wù)器時間不一致導(dǎo)致的問題,如果業(yè)務(wù)服務(wù)器能保障時間同步谨设,則可從本地獲取提高效率熟掂。
3. 令牌桶控制工廠類
工廠類負(fù)責(zé)管理令牌桶控制類,將其緩存在本地扎拣,這里使用了Guava中的Cache赴肚,一方面避免每次都新建控制類提高效率,另一方面通過控制緩存的最大容量來避免像用戶粒度的限流占用過多的內(nèi)存二蓝。
public class RedisRateLimiterFactory {
private PermitsRedisTemplate permitsRedisTemplate;
private StringRedisTemplate stringRedisTemplate;
private DistributedLock distributedLock;
private Cache<String, RedisRateLimiter> cache = CacheBuilder.newBuilder()
.initialCapacity(100) //初始大小
.maximumSize(10000) // 緩存的最大容量
.expireAfterAccess(5, TimeUnit.MINUTES) // 緩存在最后一次訪問多久之后失效
.concurrencyLevel(Runtime.getRuntime().availableProcessors()) // 設(shè)置并發(fā)級別
.build();
public RedisRateLimiterFactory(PermitsRedisTemplate permitsRedisTemplate, StringRedisTemplate stringRedisTemplate, DistributedLock distributedLock) {
this.permitsRedisTemplate = permitsRedisTemplate;
this.stringRedisTemplate = stringRedisTemplate;
this.distributedLock = distributedLock;
}
/**
* 創(chuàng)建RateLimiter
*
* @param key RedisRateLimiter本地緩存key
* @param permitsPerSecond 每秒放入的令牌數(shù)
* @param maxBurstSeconds 最大存儲maxBurstSeconds秒生成的令牌
* @param expire 該令牌桶的redis tty/秒
* @return RateLimiter
*/
public RedisRateLimiter build(String key, double permitsPerSecond, double maxBurstSeconds, int expire) {
if (cache.getIfPresent(key) == null) {
synchronized (this) {
if (cache.getIfPresent(key) == null) {
cache.put(key, new RedisRateLimiter(permitsRedisTemplate, stringRedisTemplate, distributedLock, permitsPerSecond,
maxBurstSeconds, expire));
}
}
}
return cache.getIfPresent(key);
}
}
4. 注解支持
定義注解 @RateLimit 如下誉券,表示以每秒rate的速率放置令牌,最多保留burst秒的令牌刊愚,取令牌的超時時間為timeout踊跟,limitType用于控制key類型,目前支持:
- IP, 根據(jù)客戶端IP限流
- USER, 根據(jù)用戶限流鸥诽,對于Spring Security可從SecurityContextHolder中獲取當(dāng)前用戶信息商玫,如userId
- METHOD, 根據(jù)方法名全局限流箕憾,className.methodName,注意避免同時對同一個類中的同名方法做限流控制拳昌,否則需要修改獲取key的邏輯
- CUSTOM袭异,自定義,支持表達(dá)式解析炬藤,如#{id}, #{user.id}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RateLimit {
String key() default "";
String prefix() default "rateLimit:"; //key前綴
int expire() default 60; // 表示令牌桶模型RedisPermits redis key的過期時間/秒
double rate() default 1.0; // permitsPerSecond值
double burst() default 1.0; // maxBurstSeconds值
int timeout() default 0; // 超時時間/秒
LimitType limitType() default LimitType.METHOD;
}
通過切面的前置增強(qiáng)來為添加了 @RateLimit 注解的方法提供限流控制御铃,如下
@Aspect
@Slf4j
public class RedisLimitAspect {
//...
@Before(value = "@annotation(rateLimit)")
public void rateLimit(JoinPoint point, RateLimit rateLimit) throws Throwable {
String key = getKey(point, rateLimit.limitType(), rateLimit.key(), rateLimit.prefix());
RedisRateLimiter redisRateLimiter = redisRateLimiterFactory.build(key, rateLimit.rate(), rateLimit.burst(), rateLimit.expire());
if(!redisRateLimiter.tryAcquire(key, rateLimit.timeout(), TimeUnit.SECONDS)){
ExceptionUtil.rethrowClientSideException(LIMIT_MESSAGE);
}
}
//...
限量控制
1. 限量控制類
限制一個時間窗口內(nèi)的訪問量,可使用計(jì)數(shù)器算法沈矿,借助Lua腳本執(zhí)行的原子性來實(shí)現(xiàn)畅买。
Lua腳本邏輯:
- 以需要控制的對象為key(如方法,用戶ID细睡,或IP等)谷羞,當(dāng)前訪問次數(shù)為Value,時間窗口值為緩存的過期時間
- 如果key存在則將其增1溜徙,判斷當(dāng)前值是否大于訪問量限制值湃缎,如果大于則返回0,表示該時間窗口內(nèi)已達(dá)訪問量上限蠢壹,如果小于則返回1表示允許訪問
- 如果key不存在嗓违,則將其初始化為1,并設(shè)置過期時間图贸,返回1表示允許訪問
public class RedisCountLimiter {
private StringRedisTemplate stringRedisTemplate;
private static final String LUA_SCRIPT = "local c \nc = redis.call('get',KEYS[1]) \nif c and redis.call('incr',KEYS[1]) > tonumber(ARGV[1]) then return 0 end"
+ " \nif c then return 1 else \nredis.call('set', KEYS[1], 1) \nredis.call('expire', KEYS[1], tonumber(ARGV[2])) \nreturn 1 end";
private static final int SUCCESS_RESULT = 1;
private static final int FAIL_RESULT = 0;
public RedisCountLimiter(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* 是否允許訪問
*
* @param key redis key
* @param limit 限制次數(shù)
* @param expire 時間段/秒
* @return 獲取成功true蹂季,否則false
* @throws IllegalArgumentException
*/
public boolean tryAcquire(String key, int limit, int expire) throws IllegalArgumentException {
RedisScript<Number> redisScript = new DefaultRedisScript<>(LUA_SCRIPT, Number.class);
Number result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(limit), String.valueOf(expire));
if(result != null && result.intValue() == SUCCESS_RESULT) {
return true;
}
return false;
}
}
2. 注解支持
定義注解 @CountLimit 如下,表示在period時間窗口內(nèi)疏日,最多允許訪問limit次偿洁,limitType用于控制key類型,取值與 @RateLimit 同沟优。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CountLimit {
String key() default "";
String prefix() default "countLimit:"; //key前綴
int limit() default 1; // expire時間段內(nèi)限制訪問次數(shù)
int period() default 1; // 表示時間段/秒
LimitType limitType() default LimitType.METHOD;
}
同樣采用前值增強(qiáng)來為添加了 @CountLimit 注解的方法提供限流控制涕滋,如下
@Before(value = "@annotation(countLimit)")
public void countLimit(JoinPoint point, CountLimit countLimit) throws Throwable {
String key = getKey(point, countLimit.limitType(), countLimit.key(), countLimit.prefix());
if (!redisCountLimiter.tryAcquire(key, countLimit.limit(), countLimit.period())) {
ExceptionUtil.rethrowClientSideException(LIMIT_MESSAGE);
}
}
使用示例
1.添加依賴
<dependencies>
<dependency>
<groupId>cn.jboost.springboot</groupId>
<artifactId>limiter-spring-boot-starter</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
2.配置redis相關(guān)參數(shù)
spring:
application:
name: limiter-demo
redis:
#數(shù)據(jù)庫索引
database: 0
host: 192.168.40.92
port: 6379
password: password
#連接超時時間
timeout: 2000
3.測試類
@RestController
@RequestMapping("limiter")
public class LimiterController {
/**
* 注解形式
* @param key
* @return
*/
@GetMapping("/count")
@CountLimit(key = "#{key}", limit = 2, period = 10, limitType = LimitType.CUSTOM)
public String testCountLimit(@RequestParam("key") String key){
return "test count limiter...";
}
/**
* 注解形式
* @param key
* @return
*/
@GetMapping("/rate")
@RateLimit(rate = 1.0/5, burst = 5.0, expire = 120, timeout = 0)
public String testRateLimit(@RequestParam("key") String key){
return "test rate limiter...";
}
@Autowired
private RedisRateLimiterFactory redisRateLimiterFactory;
/**
* 代碼段形式
* @param
* @return
*/
@GetMapping("/rate2")
public String testRateLimit(){
RedisRateLimiter limiter = redisRateLimiterFactory.build("LimiterController.testRateLimit", 1.0/30, 30, 120);
if(!limiter.tryAcquire("app.limiter", 0, TimeUnit.SECONDS)) {
System.out.println(LocalDateTime.now());
ExceptionUtil.rethrowClientSideException("您的訪問過于頻繁,請稍后重試");
}
return "test rate limiter 2...";
}
}
4.驗(yàn)證
啟動測試項(xiàng)目挠阁,瀏覽器中訪問 http://localhost:8080/limiter/rate?key=test 宾肺,第一次訪問成功,如圖
持續(xù)刷新侵俗,將返回如下錯誤锨用,直到5s之后再返回成功,限制5秒1次的訪問速度
注解的使用
- 限流類型LimitType支持IP(客戶端IP)隘谣、用戶(userId)增拥、方法(className.methodName)、自定義(CUSTOM)幾種形式,默認(rèn)為METHOD
- LimitType為CUSTOM時跪者,需要手動指定key(其它key自動為ip棵帽,userid,或methodname)渣玲,key支持表達(dá)式形式逗概,如#{id}, #{user.id}
- 針對某個時間窗口內(nèi)限制訪問一次的場景,既可以使用 @CountLimit忘衍, 也可以使用 @RateLimit逾苫,比如驗(yàn)證碼一分鐘內(nèi)只允許獲取一次,以下兩種形式都能達(dá)到目的
//同一個手機(jī)號碼60s內(nèi)最多訪問一次
@CountLimit(key = "#{params.phone}", limit = 1, period = 60, limitType = LimitType.CUSTOM)
//以1/60的速度放置令牌枚钓,最多保存60s的令牌(也就是最多保存一個)铅搓,控制訪問速度為1/60個每秒(1個每分鐘)
@RateLimit(key = "#{params.phone}", rate = 1.0/60, burst = 60, expire = 120, limitType = LimitType.CUSTOM)
總結(jié)
本文介紹了適用于分布式環(huán)境的基于RateLimiter令牌桶算法的限速控制與基于計(jì)數(shù)器算法的限量控制,可應(yīng)用于中小型項(xiàng)目中有相關(guān)需求的場景(注:本實(shí)現(xiàn)未做壓力測試搀捷,如果用戶并發(fā)量較大需驗(yàn)證效果)星掰。
- 本文完整代碼見:https://github.com/ronwxy/base-spring-boot ,目錄 spring-boot-autoconfigure/src/main/java/cn/jboost/springboot/autoconfig/limiter
- 示例項(xiàng)目代碼地址:https://github.com/ronwxy/springboot-demos/tree/master/springboot-limiter
如果覺得有幫助嫩舟,別忘了給個star _氢烘。作者公眾號:半路雨歌,歡迎關(guān)注查看更多干貨文章家厌。
[轉(zhuǎn)載請注明出處]
作者:雨歌
歡迎關(guān)注作者公眾號:半路雨歌播玖,查看更多技術(shù)干貨文章