限流的目的是通過對并發(fā)訪問/請求進(jìn)行限速或者一個(gè)時(shí)間窗口內(nèi)的的請求進(jìn)行限速來保護(hù)系統(tǒng)兢交,一旦達(dá)到限制速率則可以拒絕服務(wù)。
前幾天在DD的公眾號(hào),看了一篇關(guān)于使用 瓜娃 實(shí)現(xiàn)單應(yīng)用限流的方案,參考《redis in action》 實(shí)現(xiàn)了一個(gè)jedis版本的,都屬于業(yè)務(wù)層次限制以政。 實(shí)際場景中常用的限流策略:
Nginx接入層限流按照一定的規(guī)則如帳號(hào)、IP伴找、系統(tǒng)調(diào)用邏輯等在Nginx層面做限流
業(yè)務(wù)應(yīng)用系統(tǒng)限流通過業(yè)務(wù)代碼控制流量這個(gè)流量可以被稱為信號(hào)量盈蛮,可以理解成是一種鎖,它可以限制一項(xiàng)資源最多能同時(shí)被多少進(jìn)程訪問技矮。
代碼實(shí)現(xiàn)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.ZParams;
import java.util.List;
import java.util.UUID;
/**
@email wangiegie@gmail.com
-
@data 2017-08
*/
public class RedisRateLimiter {
private static final String BUCKET = "BUCKET";
private static final String BUCKET_COUNT = "BUCKET_COUNT";
private static final String BUCKET_MONITOR = "BUCKET_MONITOR";static String acquireTokenFromBucket(
Jedis jedis, int limit, long timeout) {
String identifier = UUID.randomUUID().toString();
long now = System.currentTimeMillis();
Transaction transaction = jedis.multi();//刪除信號(hào)量 transaction.zremrangeByScore(BUCKET_MONITOR.getBytes(), "-inf".getBytes(), String.valueOf(now - timeout).getBytes()); ZParams params = new ZParams(); params.weightsByDouble(1.0,0.0); transaction.zinterstore(BUCKET, params, BUCKET, BUCKET_MONITOR); //計(jì)數(shù)器自增 transaction.incr(BUCKET_COUNT); List<Object> results = transaction.exec(); long counter = (Long) results.get(results.size() - 1); transaction = jedis.multi(); transaction.zadd(BUCKET_MONITOR, now, identifier); transaction.zadd(BUCKET, counter, identifier); transaction.zrank(BUCKET, identifier); results = transaction.exec(); //獲取排名抖誉,判斷請求是否取得了信號(hào)量 long rank = (Long) results.get(results.size() - 1); if (rank < limit) { return identifier; } else {//沒有獲取到信號(hào)量,清理之前放入redis 中垃圾數(shù)據(jù) transaction = jedis.multi(); transaction.zrem(BUCKET_MONITOR, identifier); transaction.zrem(BUCKET, identifier); transaction.exec(); } return null;
}
}
調(diào)用
測試接口調(diào)用
@GetMapping("/")
public void index(HttpServletResponse response) throws IOException {
Jedis jedis = jedisPool.getResource();
String token = RedisRateLimiter.acquireTokenFromBucket(jedis, LIMIT,TIMEOUT);
if (token == null) {
response.sendError(500);
}else{
//TODO 你的業(yè)務(wù)邏輯
}
jedisPool.returnResource(jedis);
}
優(yōu)化
使用攔截器 + 注解優(yōu)化代碼
攔截器
@Configuration
static class WebMvcConfigurer extends WebMvcConfigurerAdapter {
private Logger logger =LoggerFactory.getLogger(WebMvcConfigurer.class);
@Autowired
private JedisPool jedisPool;
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HandlerInterceptorAdapter() {
public boolean preHandle(HttpServletRequest request,HttpServletResponse response,
Object handler) throws Exception {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
RateLimiter rateLimiter =method.getAnnotation(RateLimiter.class);
if (rateLimiter != null){
int limit = rateLimiter.limit();
int timeout = rateLimiter.timeout();
Jedis jedis = jedisPool.getResource();
String token =RedisRateLimiter.acquireTokenFromBucket(jedis, limit, timeout);
if (token == null) {
response.sendError(500);
return false;
}
logger.debug("token -> {}",token);
jedis.close();
}
return true;
}
}).addPathPatterns("/*");
}
}
定義注解
/**
- @email wangiegie@gmail.com
- @data 2017-08
- 限流注解
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RateLimiter {
int limit() default 5;
int timeout() default 1000;
}
使用
@RateLimiter(limit = 2, timeout = 5000)
@GetMapping("/test")
public void test() {
}
并發(fā)測試
工具:apache-jmeter-3.2
說明: 沒有獲取到信號(hào)量的接口返回500衰倦,status是紅色袒炉,獲取到信號(hào)量的接口返回200,status是綠色樊零。
當(dāng)限制請求信號(hào)量為2我磁,并發(fā)5個(gè)線程:
當(dāng)限制請求信號(hào)量為5孽文,并發(fā)10個(gè)線程:
總結(jié)
對于信號(hào)量的操作,使用事務(wù)操作夺艰。
不要使用時(shí)間戳作為信號(hào)量的排序分?jǐn)?shù)芋哭,因?yàn)樵诜植际江h(huán)境中,各個(gè)節(jié)點(diǎn)的時(shí)間差的原因郁副,會(huì)出現(xiàn)不公平信號(hào)量的現(xiàn)象减牺。
可以使用把這塊代碼抽成@rateLimiter注解,然后再方法上使用就會(huì)很方便啦
不同接口的流控霞势,可以參考源碼的里面RedisRateLimiterPlus,無非是每個(gè)接口生成一個(gè)監(jiān)控參數(shù)