算法詳解
計(jì)數(shù)算法
說明
技術(shù)算法,為最簡(jiǎn)單的限流算法焊唬。
核心思想是恋昼,每隔一段時(shí)間,赶促,為計(jì)數(shù)器設(shè)定最大值液肌,請(qǐng)求一次,計(jì)數(shù)器數(shù)量減一鸥滨,如果計(jì)數(shù)器為0嗦哆,則拒絕請(qǐng)求。
圖示流程
適用場(chǎng)景
雖然此算法是大多數(shù)人第一個(gè)想到婿滓,可以限流的算法老速,但是不推薦使用此算法。
因?yàn)榭栈茫怂惴ㄓ袀€(gè)致命性的問題烁峭,如果1秒允許的訪問次數(shù)為100,前0.99秒內(nèi)沒有任何請(qǐng)求秕铛,在最后0.01秒內(nèi)约郁,出現(xiàn)了200個(gè)請(qǐng)求,則這200個(gè)請(qǐng)求但两,都會(huì)獲取調(diào)用許可鬓梅,給程序帶來一次請(qǐng)求的高峰。
如下圖所示:
/**
* 計(jì)數(shù)法
*/
public class CountLimiter {
/**
* 執(zhí)行區(qū)間(毫秒)
*/
private int secondMill;
/**
* 區(qū)間內(nèi)計(jì)數(shù)多少次
*/
private int maxCount;
/**
* 當(dāng)前計(jì)數(shù)
*/
private int currentCount;
/**
* 上次更新時(shí)間(毫秒)
*/
private long lastUpdateTime;
public CountLimiter(int second, int count) {
if (second <= 0 || count <= 0) {
throw new IllegalArgumentException("second and time must by positive");
}
this.secondMill = second * 1000;
this.maxCount = count;
this.currentCount = this.maxCount;
this.lastUpdateTime = System.currentTimeMillis();
}
/**
* 刷新計(jì)數(shù)器
*/
private void refreshCount() {
long now = System.currentTimeMillis();
if ((now - this.lastUpdateTime) >= secondMill) {
this.currentCount = maxCount;
this.lastUpdateTime = now;
}
}
/**
* 獲取授權(quán)
* @return
*/
public synchronized boolean tryAcquire() {
// 刷新計(jì)數(shù)器
this.refreshCount();
if ((this.currentCount - 1) >= 0) {
this.currentCount--;
return true;
} else {
return false;
}
}
public static void main(String[] args) throws InterruptedException {
CountLimiter countLimiter = new CountLimiter(1,2);
for (int i = 0; i < 10; i++) {
System.out.println(LocalDateTime.now() + " " + countLimiter.tryAcquire());
TimeUnit.MILLISECONDS.sleep(200);
}
}
}
漏桶算法
說明
漏桶算法稱為leaky bucket谨湘,可限制指定時(shí)間內(nèi)的最大流量绽快,如限制60秒內(nèi),最多允許100個(gè)請(qǐng)求紧阔。
其中接受請(qǐng)求的速度是不恒定的(水滴入桶)坊罢,處理請(qǐng)求的速度是恒定的(水滴出桶)。
算法總體描述如下:
1擅耽、有個(gè)固定容量的桶B(指定時(shí)間區(qū)間X活孩,允許的的最大流量B),如60秒內(nèi)最多允許100個(gè)請(qǐng)求乖仇,則B為100憾儒,X為60询兴。
2、有水滴流進(jìn)來(有請(qǐng)求進(jìn)來)起趾,桶里的水+1诗舰。
3、有水滴流出去(執(zhí)行請(qǐng)求對(duì)應(yīng)的業(yè)務(wù))训裆,桶里的水-1(業(yè)務(wù)方法眶根,真正開始執(zhí)行=>這是保證漏桶勻速處理業(yè)務(wù)的根本)水滴流出去的速度是勻速的,流速為B/X,1毫秒100/60次缭保,約1毫秒0.00167次汛闸,精度可根據(jù)實(shí)際情況自己控制)。
4艺骂、水桶滿了后(60秒內(nèi)請(qǐng)求達(dá)到了100次)诸老,水滴無法進(jìn)入水桶,請(qǐng)求被拒絕钳恕。
圖示
實(shí)際開發(fā)中别伏,漏桶的使用方式可參考下圖:
需注意,水滴滴落的時(shí)候忧额,才開始執(zhí)行業(yè)務(wù)代碼厘肮,而不是水滴進(jìn)桶的時(shí)候,去執(zhí)行業(yè)務(wù)代碼睦番。
業(yè)務(wù)代碼的執(zhí)行方式类茂,個(gè)人認(rèn)為有如下兩種:
同步執(zhí)行
1、調(diào)用方請(qǐng)求時(shí)托嚣,如水滴可以放入桶中巩检,調(diào)用方所在的線程“阻塞”
2、水滴漏出時(shí)示启,喚醒調(diào)用方線程兢哭,調(diào)用方線程,執(zhí)行具體業(yè)務(wù)
異步執(zhí)行
1夫嗓、調(diào)用方請(qǐng)求時(shí)迟螺,如水滴可以放入桶中,調(diào)用方所在的線程收到響應(yīng)舍咖,方法將異步執(zhí)行
2矩父、水滴漏出時(shí),水桶代理執(zhí)行具體業(yè)務(wù)
網(wǎng)上很多滴桶的實(shí)現(xiàn)代碼排霉,在水滴進(jìn)桶的時(shí)候窍株,這樣會(huì)導(dǎo)致業(yè)務(wù)代碼,無法勻速地執(zhí)行,仍然對(duì)被調(diào)用的接口有一瞬間流量的沖擊(和令牌桶算法的最終實(shí)現(xiàn)效果一樣)夹姥。
適用場(chǎng)景
水桶的進(jìn)水速度是不可控的,有可能一瞬間有大量的請(qǐng)求進(jìn)入水桶辙诞。處理請(qǐng)求的速度是恒定的(滴水的時(shí)候處理請(qǐng)求)辙售。
此算法,主要應(yīng)用于自己的服務(wù)飞涂,調(diào)用外部接口旦部。以均勻的速度調(diào)用外部接口,防止對(duì)外部接口的壓力過大较店,而影響外部系統(tǒng)的穩(wěn)定性士八。如果影響了別人的系統(tǒng),力過大梁呈,而影響外部系統(tǒng)的穩(wěn)定性婚度。如果影響了別人的系統(tǒng),官卡、
代碼
本實(shí)例代碼的實(shí)現(xiàn)蝗茁,在水滴滴下,執(zhí)行具體業(yè)務(wù)代碼時(shí),采用同步執(zhí)行的方式寻咒。即喚醒調(diào)用方的線程哮翘,讓"調(diào)用者"所屬的線程去執(zhí)行具體業(yè)務(wù)代碼,去調(diào)用接口毛秘。
/**
* 漏桶算法
*/
public class LeakyBucketLimiter {
/**
* 漏桶流出速率(多少納秒執(zhí)行一次)
*/
private long outflowRateNanos;
/**
* 漏桶容器
*/
private volatile BlockingQueue<Drip> queue;
/**
* 滴水線程
*/
private Thread outflowThread;
/**
* 水滴
*/
private static class Drip {
/**
* 業(yè)務(wù)主鍵
*/
private String busId;
/**
* 水滴對(duì)應(yīng)的調(diào)用者線程
*/
private Thread thread;
public Drip(String busId, Thread thread) {
this.thread = thread;
}
public String getBusId() {
return this.busId;
}
public Thread getThread() {
return this.thread;
}
}
/**
* @param second 秒
* @param time 調(diào)用次數(shù)
*/
public LeakyBucketLimiter(int second, int time) {
if (second <= 0 || time <= 0) {
throw new IllegalArgumentException("second and time must by positive");
}
//多少納秒執(zhí)行一次
outflowRateNanos = TimeUnit.SECONDS.toNanos(second) / time;
queue = new LinkedBlockingQueue<>(time);
outflowThread = new Thread(() -> {
while (true) {
Drip drip = null;
try {
// 阻塞饭寺,直到從桶里拿到水滴
drip = queue.take();
} catch (Exception e) {
e.printStackTrace();
}
if (drip != null && drip.getThread() != null) {
// 喚醒阻塞的水滴里面的線程
LockSupport.unpark(drip.getThread());
}
// 休息一段時(shí)間,開始下一次滴水
LockSupport.parkNanos(this, outflowRateNanos);
}
}, "漏水線程");
outflowThread.start();
}
/**
* 業(yè)務(wù)請(qǐng)求
*
* @return
*/
public boolean acquire(String busId) {
Thread thread = Thread.currentThread();
Drip drip = new Drip(busId, thread);
//添加失敗返回false叫挟。
if (this.queue.offer(drip)) {
LockSupport.park();
return true;
} else {
return false;
}
}
public static void main(String[] args) throws InterruptedException {
//1秒限制執(zhí)行1次
LeakyBucketLimiter leakyBucketLimiter = new LeakyBucketLimiter(5, 2);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
String busId = "[業(yè)務(wù)ID:" + LocalDateTime.now().toString() + "]";
if (leakyBucketLimiter.acquire(busId)) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + ":調(diào)用外部接口...成功:" + busId);
} else {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + ":調(diào)用外部接口...失敿璩住:" + busId);
}
}
}, "測(cè)試線程-" + i).start();
TimeUnit.MILLISECONDS.sleep(500);
}
}
}
令牌桶算法
說明
令牌桶算法,主要是勻速地增加可用令牌霞揉,令牌數(shù)因?yàn)橥暗南拗朴袛?shù)量上限旬薯。
請(qǐng)求拿到令牌,相當(dāng)于拿到授權(quán)适秩,绊序,即可進(jìn)行相應(yīng)的業(yè)務(wù)操作。
圖示
適用場(chǎng)景
和漏桶算法比秽荞,有可能導(dǎo)致短時(shí)間內(nèi)的請(qǐng)求數(shù)上升(因?yàn)槟玫搅钆坪笾韫涂梢栽L問接口,有可能一瞬間將所有令牌拿走)扬跋,但是不會(huì)有計(jì)數(shù)算法那樣高的峰值(因?yàn)榱钆茢?shù)量是勻速增加的)阶捆。
一般自己調(diào)用自己的接口,接口會(huì)有一定的伸縮性,令牌桶算法洒试,主要用來保護(hù)自己的服務(wù)器接口倍奢。
/**
* 令牌桶限流算法
*/
public class TokenBucketLimiter {
/**
* 桶的大小
*/
private double bucketSize;
/**
* 桶里的令牌數(shù)
*/
private double tokenCount;
/**
* 令牌增加速度(每毫秒)
*/
private double tokenAddRateMillSecond;
/**
* 上次更新時(shí)間(毫秒)
*/
private long lastUpdateTime;
/**
* @param second 秒
* @param time 調(diào)用次數(shù)
*/
public TokenBucketLimiter(double second, double time) {
if (second <= 0 || time <= 0) {
throw new IllegalArgumentException("second and time must by positive");
}
// 桶的大小
this.bucketSize = time;
// 桶里的令牌數(shù)
this.tokenCount = this.bucketSize;
// 令牌增加速度(每毫秒)
this.tokenAddRateMillSecond = time / second / 1000;
// 上次更新時(shí)間(毫秒)
this.lastUpdateTime = System.currentTimeMillis();
}
/**
* 刷新桶內(nèi)令牌數(shù)(令牌數(shù)不得超過桶的大小)
* 計(jì)算“上次刷新時(shí)間”到“當(dāng)前刷新時(shí)間”中間,增加的令牌數(shù)
*/
private void refreshTokenCount() {
long now = System.currentTimeMillis();
this.tokenCount = Math.min(this.bucketSize, this.tokenCount + ((now - this.lastUpdateTime) * this.tokenAddRateMillSecond));
this.lastUpdateTime = now;
}
/**
* 嘗試拿到權(quán)限
*
* @return
*/
public synchronized boolean tryAcquire() {
// 刷新桶內(nèi)令牌數(shù)
this.refreshTokenCount();
if ((this.tokenCount - 1) >= 0) {
// 如果桶中有令牌垒棋,令牌數(shù)-1
this.tokenCount--;
return true;
} else {
// 桶中已無令牌
return false;
}
}
public static void main(String[] args) throws InterruptedException {
TokenBucketLimiter leakyBucketLimiter = new TokenBucketLimiter(2, 1);
for (int i = 0; i <= 10; i++) {
System.out.println(LocalDateTime.now() + " " + leakyBucketLimiter.tryAcquire());
TimeUnit.SECONDS.sleep(1);
}
}
}