項目需要使用限流措施,查閱后主要使用令牌桶算法實現(xiàn)呈昔,為了更靈活的實現(xiàn)限流挥等,就自己實現(xiàn)了一個簡單的基于令牌桶算法的限流實現(xiàn)。
令牌桶算法描述
令牌桶這種控制機制基于令牌桶中是否存在令牌來指示什么時候可以發(fā)送流量堤尾。令牌桶中的每一個令牌都代表一個字節(jié)触菜。如果令牌桶中存在令牌,則允許發(fā)送流量哀峻;而如果令牌桶中不存在令牌涡相,則不允許發(fā)送流量。因此剩蟀,如果突發(fā)門限被合理地配置并且令牌桶中有足夠的令牌催蝗,那么流量就可以以峰值速率發(fā)送。
簡單的說就是育特,一邊請求時會消耗桶內(nèi)的令牌丙号,另一邊會以固定速率往桶內(nèi)放令牌。當消耗的請求大于放入的速率時缰冤,進行相應的措施犬缨,比如等待,或者拒絕等棉浸。
Java的簡單實現(xiàn)
為了更靈活的定制限流措施怀薛,自己實現(xiàn)了限流的部分代碼,如下:
/**
* @author xiezhengchao
* @since 18/1/3 上午9:45.
* 限流器
*/
public class RateLimiter{
private volatile int token;
private final int originToken;
private static Unsafe unsafe = null;
private static final long valueOffset;
private final Object lock = new Object();
static {
try {
// 應用開發(fā)中使用unsafe對象必須通過反射獲取
Class<?> clazz = Unsafe.class;
Field f = clazz.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(clazz);
valueOffset = unsafe.objectFieldOffset(RateLimiter.class.getDeclaredField("token"));
} catch (Exception ex) {throw new Error(ex);}
}
public RateLimiter(int token){
this.originToken = token;
this.token = token;
}
/**
* 獲取一個令牌
*/
public boolean acquire(){
int current = token;
if(current<=0){
// 保證在token已經(jīng)用光的情況下依然有獲取競爭的能力
current = originToken;
}
long expect = 1000;// max wait 1s
long future = System.currentTimeMillis()+expect;
while(current>0){
if(compareAndSet(current, current-1)){
return true;
}
current = token;
if(current<=0 && expect>0){
// 在有效期內(nèi)等待通知
synchronized (lock){
try {
lock.wait(expect);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
current = token;
if(current<=0){
current = originToken;
}
expect = future - System.currentTimeMillis();
}
}
return false;
}
private boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 補充令牌
*/
public void supplement(final ExecutorService executorService){
this.token = originToken;
executorService.execute(() -> {
synchronized (lock){
lock.notifyAll();
}
});
}
}
核心代碼在acquire方法部分主要思路就是迷郑,優(yōu)先采用CAS進行自旋操作獲取令牌枝恋,一直嘗試令牌消耗光创倔,那么會進入等待,在定時線程調(diào)用supplement方法時焚碌,會喚醒所有等待線程畦攘,此時進入CAS進行嘗試消耗令牌,以此循環(huán)一直到設置的最大等待時間(代碼中的expect)消耗光十电,如果還沒獲得令牌知押,那么會返回false
這段代碼如果自己起一個線程進行限流,然后自己開個定時線程進行補充也可以鹃骂,但是實際運用中往往需要一個限流管理器來分配限流器朗徊,然后通過限流管理器統(tǒng)一的進行定時觸發(fā),這樣可以不用開很多的定時線程偎漫,同時通過線程池也避免了在定時線程競爭鎖時引發(fā)的過長等待造成定時線程不準的情況爷恳。
下面貼出限流管理器部分代碼
/**
* @author xiezhengchao
* @since 18/1/3 上午9:43.
* 限流管理器
*/
@Component
public class ConfineManager{
// 定時線程
private final ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(2);
// 執(zhí)行補充線程池
private final ExecutorService executorService = new ThreadPoolExecutor(5, 200,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new NamedThreadFactory("supplement",true,false));
// 限流器容器
private Map<String,RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
@PostConstruct
public void init(){
scheduledCheck.scheduleAtFixedRate(new SupplementRateLimiter(), 1, 1, TimeUnit.SECONDS);
}
@PreDestroy
public void destroy(){
scheduledCheck.shutdown();
}
/**
* 通過key獲取相應的限流器
*/
public void acquire(String key,int tokenCount){
RateLimiter rateLimiter = rateLimiterMap.get(key);
// 雙檢鎖確保安全創(chuàng)建
if(rateLimiter==null){
synchronized (this){
// init RateLimiter
rateLimiter = rateLimiterMap.computeIfAbsent(key, k -> new RateLimiter(tokenCount));
}
}
// 嘗試獲取令牌
if(!rateLimiter.acquire()){
// 獲取失敗,根據(jù)實際情況進行處理象踊,這里直接拋異常了
Assert.throwBizException(ErrorCode.API_CONFINE_RATE_LIMITER);
}
}
/**
* 補充相應的令牌數(shù)
*/
private class SupplementRateLimiter implements Runnable{
@Override
public void run(){
rateLimiterMap.values().forEach(rateLimiter -> rateLimiter.supplement(executorService));
}
}
}
代碼中主要是創(chuàng)建了定時線程温亲,補充令牌線程池。
部分代碼不是開源的杯矩,需要調(diào)整下栈虚,不影響主流程,代碼使用了一些spring的注解史隆。
其中的不足
在集群的環(huán)境下魂务,沒有考慮分布式的情況,也就是如果一個應用部署的限流是1s產(chǎn)生10個令牌泌射,假設部署了5個應用粘姜,那么實際1s可以產(chǎn)生50個令牌。如果需要考慮這部分熔酷,那么在CAS操作可以替換為通過redis的setnx來進行獲取鎖操作然后更新redis存儲對應的令牌孤紧,補充則直接設置更新redis對應的令牌數(shù)即可,這樣效率肯定比現(xiàn)在基于CAS操作低拒秘。
總結(jié)
實際上實現(xiàn)這個限流器更多的考慮是可以自行定義等待的最大時間号显,超時措施,定時補充令牌時間間隔等躺酒,同時也溫習了一下之前的并發(fā)知識押蚤。