序
雙向隊(duì)列(Deque),是Queue的一個(gè)子接口惜犀,雙向隊(duì)列是指該隊(duì)列兩端的元素既能入隊(duì)(offer)也能出隊(duì)(poll)革屠。使用場(chǎng)景比如工作竊取剑逃,比如限流沛膳。
限流實(shí)例
使用deque來(lái)限流扁远,其中timeIntervalInMs為事件窗口俊鱼,maxLimit為該事件窗口的最大值刻像。
public class MyRateLimiter {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoRateLimiter.class);
private final Deque<Long> queue;
private long timeIntervalInMs;
public MyRateLimiter(long timeIntervalInMs, int maxLimit) {
this.timeIntervalInMs = timeIntervalInMs;
this.queue = new LinkedBlockingDeque<Long>(maxLimit);
}
public boolean incrAndReachLimit(){
long currentTimeMillis = System.currentTimeMillis();
boolean success = queue.offerFirst(currentTimeMillis);
if(success){
//沒(méi)有超過(guò)maxLimit
return false;
}
synchronized (this){
//queue is full
long last = queue.getLast();
//還在時(shí)間窗口內(nèi),超過(guò)maxLimit
if (currentTimeMillis - last < timeIntervalInMs) {
return true;
}
LOGGER.info("time window expired,current:{},last:{}",currentTimeMillis,last);
//超過(guò)時(shí)間窗口了,超過(guò)maxLimit的情況下,重置時(shí)間窗口
queue.removeLast();
queue.addFirst(currentTimeMillis);
return false;
}
}
}
測(cè)試
@Test
public void testDeque() throws InterruptedException {
DemoRateLimiter limiter = new DemoRateLimiter(5*1000,3);
Callable<Void> test = new Callable<Void>(){
@Override
public Void call() throws Exception {
for(int i=0;i<1000;i++){
LOGGER.info("result:{}",limiter.incrAndReachLimit());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return null;
}
};
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.invokeAll(Arrays.asList(test,test,test,test,test));
Thread.sleep(100000);
}
小結(jié)
這里使用了Deque的容量來(lái)作為時(shí)間窗口的限流大小,利用兩端來(lái)判斷時(shí)間窗口并闲,相對(duì)來(lái)講有點(diǎn)巧妙细睡。