1 場景
程序中經(jīng)常需要對接口進行限流,防止訪問量太大耻台,導(dǎo)致程序崩潰。
常用的算法有:計數(shù)算法空另、漏桶算法
盆耽、令牌桶算法
,最常用的算法是后面兩種痹换。
2 算法詳解
2.1 計數(shù)算法
2.1.1 說明
技術(shù)算法征字,為最簡單的限流算法。
核心思想是娇豫,每隔一段時間
匙姜,為計數(shù)器設(shè)定最大值
,請求一次冯痢,計數(shù)器數(shù)量減一
氮昧,如果計數(shù)器為0框杜,則拒絕請求
。
2.1.2 圖示
2.1.3 適用場景
雖然此算法是大多數(shù)人第一個想到可以限流的算法袖肥,但是不推薦使用此算法
咪辱。
因為,此算法有個致命性的問題椎组,如果1秒允許的訪問次數(shù)為100油狂,前0.99秒內(nèi)沒有任何請求,在最后0.01秒內(nèi)寸癌,出現(xiàn)了200個請求
专筷,則這200個請求,都會獲取調(diào)用許可蒸苇,給程序帶來一次請求的高峰磷蛹。
如下圖所示:
2.1.4 代碼
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* 計數(shù)器限流器
*/
public class CountLimiter {
/**
* 執(zhí)行區(qū)間(毫秒)
*/
private int secondMill;
/**
* 區(qū)間內(nèi)計數(shù)多少次
*/
private int maxCount;
/**
* 當(dāng)前計數(shù)
*/
private int currentCount;
/**
* 上次更新時間(毫秒)
*/
private long lastUpdateTime;
public CountLimiter(int second, int count) {
if (second <= 0 || count <= 0) {
throw new IllegalArgumentException("second and time must by positive");
}
this.secondMill = second * 1000;
this.maxCount = count;
this.currentCount = this.maxCount;
this.lastUpdateTime = System.currentTimeMillis();
}
/**
* 刷新計數(shù)器
*/
private void refreshCount() {
long now = System.currentTimeMillis();
if ((now - this.lastUpdateTime) >= secondMill) {
this.currentCount = maxCount;
this.lastUpdateTime = now;
}
}
/**
* 獲取授權(quán)
* @return
*/
public synchronized boolean tryAcquire() {
// 刷新計數(shù)器
this.refreshCount();
if ((this.currentCount - 1) >= 0) {
this.currentCount--;
return true;
} else {
return false;
}
}
}
測試方法:
public static void main(String[] args) throws Exception {
// 1秒限制執(zhí)行2次
CountLimiter countLimiter = new CountLimiter(1, 2);
for (int i = 0; i < 10; i++) {
System.out.println(LocalDateTime.now() + " " + countLimiter.tryAcquire());
TimeUnit.MILLISECONDS.sleep(200);
}
}
執(zhí)行結(jié)果:
2021-05-31T22:01:08.660 true
2021-05-31T22:01:08.868 true
2021-05-31T22:01:09.074 false
2021-05-31T22:01:09.275 false
2021-05-31T22:01:09.485 false
2021-05-31T22:01:09.698 true
2021-05-31T22:01:09.901 true
2021-05-31T22:01:10.104 false
2021-05-31T22:01:10.316 false
2021-05-31T22:01:10.520 false
2.2 漏桶算法
2.2.1 說明
漏桶算法稱為leaky bucket
,可限制指定時間內(nèi)的最大流量溪烤,如限制60秒內(nèi)味咳,最多允許100個請求。
其中接受請求的速度是不恒定的
(水滴入桶)檬嘀,處理請求的速度是恒定的
(水滴出桶)槽驶。
算法總體描述如下:
有個固定容量的桶
B
(指定時間區(qū)間X
,允許的的最大流量B
)枪眉,如60秒內(nèi)最多允許100個請求捺檬,則B
為100,X
為60贸铜。有水滴流進來(有請求進來)堡纬,桶里的水+1。
有水滴流出去(執(zhí)行請求對應(yīng)的業(yè)務(wù))蒿秦,桶里的水-1(
業(yè)務(wù)方法烤镐,真正開始執(zhí)行
=>這是保證漏桶勻速處理業(yè)務(wù)的根本
),水滴流出去的速度是勻速的棍鳖,流速為B/X
(1毫秒100/60次炮叶,約1毫秒0.00167次,精度可根據(jù)實際情況自己控制)水桶滿了后(60秒內(nèi)請求達到了100次)渡处,水滴無法進入水桶镜悉,請求被拒絕
2.2.2 圖示
實際開發(fā)中,漏桶的使用方式可參考下圖:
需注意医瘫,水滴滴落
的時候侣肄,才開始執(zhí)行業(yè)務(wù)代碼
,而不是水滴進桶的時候
醇份,去執(zhí)行業(yè)務(wù)代碼稼锅。
業(yè)務(wù)代碼的執(zhí)行方式吼具,個人認為有如下兩種:
-
同步執(zhí)行
1、調(diào)用方請求時矩距,如水滴可以放入桶中拗盒,調(diào)用方所在的線程“阻塞”
2、水滴漏出時锥债,喚醒調(diào)用方線程陡蝇,調(diào)用方線程,執(zhí)行具體業(yè)務(wù) -
異步執(zhí)行
1哮肚、調(diào)用方請求時毅整,如水滴可以放入桶中,調(diào)用方所在的線程收到響應(yīng)绽左,方法將異步執(zhí)行
2、水滴漏出時艇潭,水桶代理執(zhí)行具體業(yè)務(wù)
網(wǎng)上很多滴桶的實現(xiàn)代碼拼窥,在水滴進桶的時候,就去執(zhí)行業(yè)務(wù)代碼了蹋凝。這樣會導(dǎo)致業(yè)務(wù)代碼鲁纠,
無法勻速地執(zhí)行
,仍然對被調(diào)用的接口有一瞬間流量的沖擊
(和令牌桶算法
的最終實現(xiàn)效果一樣)鳍寂。
2.2.3 適用場景
水桶的進水速度是不可控的
改含,有可能一瞬間有大量的請求
進入水桶。處理請求的速度是恒定的
(滴水的時候處理請求)迄汛。
此算法捍壤,主要應(yīng)用于自己的服務(wù),調(diào)用外部接口
鞍爱。以均勻
的速度調(diào)用外部接口鹃觉,防止對外部接口的壓力過大
,而影響外部系統(tǒng)的穩(wěn)定性睹逃。如果影響了別人的系統(tǒng)盗扇,接口所在公司會來找你喝茶。
漏桶算法沉填,主要用來保護別人的接口疗隶。
2.2.4 代碼
本實例代碼的實現(xiàn),在水滴滴下翼闹,執(zhí)行具體業(yè)務(wù)代碼時斑鼻,采用同步執(zhí)行
的方式。即喚醒調(diào)用方的線程橄碾,讓"調(diào)用者"所屬的線程
去執(zhí)行具體業(yè)務(wù)代碼卵沉,去調(diào)用接口
颠锉。
import java.net.SocketTimeoutException;
import java.time.LocalDateTime;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* 漏桶算法
*/
public class LeakyBucketLimiterUtil {
/**
* 漏桶流出速率(多少納秒執(zhí)行一次)
*/
private long outflowRateNanos;
/**
* 漏桶容器
*/
private volatile BlockingQueue<Drip> queue;
/**
* 滴水線程
*/
private Thread outflowThread;
/**
* 水滴
*/
private static class Drip {
/**
* 業(yè)務(wù)主鍵
*/
private String busId;
/**
* 水滴對應(yīng)的調(diào)用者線程
*/
private Thread thread;
public Drip(String busId, Thread thread) {
this.thread = thread;
}
public String getBusId() {
return this.busId;
}
public Thread getThread() {
return this.thread;
}
}
/**
* @param second 秒
* @param time 調(diào)用次數(shù)
*/
public LeakyBucketLimiterUtil(int second, int time) {
if (second <= 0 || time <= 0) {
throw new IllegalArgumentException("second and time must by positive");
}
outflowRateNanos = TimeUnit.SECONDS.toNanos(second) / time;
queue = new LinkedBlockingQueue<>(time);
outflowThread = new Thread(() -> {
while (true) {
Drip drip = null;
try {
// 阻塞,直到從桶里拿到水滴
drip = queue.take();
} catch (Exception e) {
e.printStackTrace();
}
if (drip != null && drip.getThread() != null) {
// 喚醒阻塞的水滴里面的線程
LockSupport.unpark(drip.getThread());
}
// 休息一段時間史汗,開始下一次滴水
LockSupport.parkNanos(this, outflowRateNanos);
}
}, "漏水線程");
outflowThread.start();
}
/**
* 業(yè)務(wù)請求
*
* @return
*/
public boolean acquire(String busId) {
Thread thread = Thread.currentThread();
Drip drip = new Drip(busId, thread);
if (this.queue.offer(drip)) {
LockSupport.park();
return true;
} else {
return false;
}
}
}
測試代碼如下:
public static void main(String[] args) throws Exception {
// 1秒限制執(zhí)行1次
LeakyBucketLimiterUtil leakyBucketLimiter = new LeakyBucketLimiterUtil(5, 2);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
String busId = "[業(yè)務(wù)ID:" + LocalDateTime.now().toString() + "]";
if (leakyBucketLimiter.acquire(busId)) {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + ":調(diào)用外部接口...成功:" + busId);
} else {
System.out.println(LocalDateTime.now() + " " + Thread.currentThread().getName() + ":調(diào)用外部接口...失斍砺印:" + busId);
}
}
}, "測試線程-" + i).start();
TimeUnit.MILLISECONDS.sleep(500);
}
}
執(zhí)行結(jié)果如下:
2021-05-31T20:52:52.297 測試線程-0:調(diào)用外部接口...成功:[業(yè)務(wù)ID:2021-05-31T20:52:52.295]
2021-05-31T20:52:53.782 測試線程-3:調(diào)用外部接口...失敗:[業(yè)務(wù)ID:2021-05-31T20:52:53.782]
2021-05-31T20:52:54.286 測試線程-4:調(diào)用外部接口...失斖W病:[業(yè)務(wù)ID:2021-05-31T20:52:54.286]
2021-05-31T20:52:54.799 測試線程-1:調(diào)用外部接口...成功:[業(yè)務(wù)ID:2021-05-31T20:52:52.761]
2021-05-31T20:52:55.300 測試線程-6:調(diào)用外部接口...失敶赏堋:[業(yè)務(wù)ID:2021-05-31T20:52:55.300]
2021-05-31T20:52:55.806 測試線程-7:調(diào)用外部接口...失敗:[業(yè)務(wù)ID:2021-05-31T20:52:55.806]
2021-05-31T20:52:56.307 測試線程-8:調(diào)用外部接口...失敻甓尽:[業(yè)務(wù)ID:2021-05-31T20:52:56.307]
2021-05-31T20:52:56.822 測試線程-9:調(diào)用外部接口...失敿桠:[業(yè)務(wù)ID:2021-05-31T20:52:56.822]
2021-05-31T20:52:57.304 測試線程-2:調(diào)用外部接口...成功:[業(yè)務(wù)ID:2021-05-31T20:52:53.271]
2021-05-31T20:52:59.817 測試線程-5:調(diào)用外部接口...成功:[業(yè)務(wù)ID:2021-05-31T20:52:54.799]
2.3 令牌桶算法
2.3.1 說明
令牌桶算法,主要是勻速地增加可用令牌埋市,令牌數(shù)因為桶的限制有數(shù)量上限冠桃。
請求拿到令牌,相當(dāng)于拿到授權(quán)道宅,即可進行相應(yīng)的業(yè)務(wù)操作食听。
2.3.2 圖示
2.3.3 適用場景
和漏桶算法比,有可能導(dǎo)致短時間內(nèi)的請求數(shù)上升(因為拿到令牌后污茵,就可以訪問接口樱报,有可能一瞬間將所有令牌拿走),但是不會有計數(shù)算法那樣高的峰值(因為令牌數(shù)量是勻速增加的)泞当。
一般自己調(diào)用自己的接口迹蛤,接口會有一定的伸縮性,令牌桶算法襟士,主要用來保護自己的服務(wù)器接口盗飒。
2.3.4 代碼
簡易代碼實現(xiàn)如下:
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* 令牌桶限流算法
*/
public class TokenBucketLimiter {
/**
* 桶的大小
*/
private double bucketSize;
/**
* 桶里的令牌數(shù)
*/
private double tokenCount;
/**
* 令牌增加速度(每毫秒)
*/
private double tokenAddRateMillSecond;
/**
* 上次更新時間(毫秒)
*/
private long lastUpdateTime;
/**
* @param second 秒
* @param time 調(diào)用次數(shù)
*/
public TokenBucketLimiter(double second, double time) {
if (second <= 0 || time <= 0) {
throw new IllegalArgumentException("second and time must by positive");
}
// 桶的大小
this.bucketSize = time;
// 桶里的令牌數(shù)
this.tokenCount = this.bucketSize;
// 令牌增加速度(每毫秒)
this.tokenAddRateMillSecond = time / second / 1000;
// 上次更新時間(毫秒)
this.lastUpdateTime = System.currentTimeMillis();
}
/**
* 刷新桶內(nèi)令牌數(shù)(令牌數(shù)不得超過桶的大小)
* 計算“上次刷新時間”到“當(dāng)前刷新時間”中間,增加的令牌數(shù)
*/
private void refreshTokenCount() {
long now = System.currentTimeMillis();
this.tokenCount = Math.min(this.bucketSize, this.tokenCount + ((now - this.lastUpdateTime) * this.tokenAddRateMillSecond));
this.lastUpdateTime = now;
}
/**
* 嘗試拿到權(quán)限
*
* @return
*/
public synchronized boolean tryAcquire() {
// 刷新桶內(nèi)令牌數(shù)
this.refreshTokenCount();
if ((this.tokenCount - 1) >= 0) {
// 如果桶中有令牌陋桂,令牌數(shù)-1
this.tokenCount--;
return true;
} else {
// 桶中已無令牌
return false;
}
}
}
測試代碼:
public static void main(String[] args) throws Exception{
// 2秒執(zhí)行1次
TokenBucketLimiter leakyBucketLimiter = new TokenBucketLimiter(2, 1);
for (int i = 0; i < 10; i++) {
System.out.println(LocalDateTime.now() + " " + leakyBucketLimiter.tryAcquire());
TimeUnit.SECONDS.sleep(1);
}
}
執(zhí)行結(jié)果如下:
2021-05-31T21:38:34.560 true
2021-05-31T21:38:35.582 false
2021-05-31T21:38:36.588 true
2021-05-31T21:38:37.596 false
2021-05-31T21:38:38.608 true
2021-05-31T21:38:39.610 false
2021-05-31T21:38:40.615 true
2021-05-31T21:38:41.627 false
2021-05-31T21:38:42.641 true
2021-05-31T21:38:43.649 false
2.3.5 第三方工具類
可以使用Guava
中的RateLimiter
來實現(xiàn)令牌桶的限流功能箩兽。
maven依賴如下:
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
直接獲取令牌(true為獲取到令牌,false為獲取失斦潞怼):
RateLimiter rateLimiter = RateLimiter.create(2);
boolean acquireResule = rateLimiter.tryAcquire();
if (acquireResule) {
System.out.println("獲取令牌:成功");
} else {
System.out.println("獲取令牌:失敗");
}
等待嘗試獲取令牌(阻塞當(dāng)前線程汗贫,直到獲取到令牌):
RateLimiter rateLimiter = RateLimiter.create(2);
// 阻塞獲取令牌
double waitCount = rateLimiter.acquire();
System.out.println("阻塞等待時間:" + waitCount);