前言
在開發(fā)高并發(fā)系統(tǒng)時有三把利器用來保護系統(tǒng):緩存、降級和限流
-
緩存
緩存的目的是提升系統(tǒng)訪問速度和增大系統(tǒng)處理容量 -
降級
降級是當(dāng)服務(wù)出現(xiàn)問題或者影響到核心流程時衬衬,需要暫時屏蔽掉邑滨,待高峰或者問題解決后再打開 -
限流
限流的目的是通過對并發(fā)訪問/請求進行限速疾忍,或者對一個時間窗口內(nèi)的請求進行限速來保護系統(tǒng)贸典,一旦達到限制速率則可以拒絕服務(wù)配乓、排隊或等待底循、降級等處理
常用的限流算法
漏桶算法
漏桶算法思路很簡單巢株,水(請求)先進入到漏桶里,漏桶以一定的速度出水熙涤,當(dāng)水流入速度過大會直接溢出阁苞,可以看出漏桶算法能強行限制數(shù)據(jù)的傳輸速率。
令牌桶算法
對于很多應(yīng)用場景來說祠挫,除了要求能夠限制數(shù)據(jù)的平均傳輸速率外猬错,還要求允許某種程度的突發(fā)傳輸。這時候漏桶算法可能就不合適了茸歧,令牌桶算法更為適合倦炒。如圖所示,令牌桶算法的原理是系統(tǒng)會以一個恒定的速度往桶里放入令牌软瞎,而如果請求需要被處理逢唤,則需要先從桶里獲取一個令牌拉讯,當(dāng)桶里沒有令牌可取時,則拒絕服務(wù)鳖藕。
RateLimiter使用以及源碼解析
Google開源工具包Guava提供了限流工具類RateLimiter魔慷,該類基于令牌桶算法實現(xiàn)流量限制,使用十分方便著恩,而且十分高效院尔。
RateLimiter使用
首先簡單介紹下RateLimiter的使用,
public void testAcquire() {
RateLimiter limiter = RateLimiter.create(1);
for(int i = 1; i < 10; i = i + 2 ) {
double waitTime = limiter.acquire(i);
System.out.println("cutTime=" + System.currentTimeMillis() + " acq:" + i + " waitTime:" + waitTime);
}
}
輸出結(jié)果:
cutTime=1535439657427 acq:1 waitTime:0.0
cutTime=1535439658431 acq:3 waitTime:0.997045
cutTime=1535439661429 acq:5 waitTime:2.993028
cutTime=1535439666426 acq:7 waitTime:4.995625
cutTime=1535439673426 acq:9 waitTime:6.999223
首先通過RateLimiter.create(1);
創(chuàng)建一個限流器喉誊,參數(shù)代表每秒生成的令牌數(shù)邀摆,通過limiter.acquire(i);
來以阻塞的方式獲取令牌,當(dāng)然也可以通過tryAcquire(int permits, long timeout, TimeUnit unit)
來設(shè)置等待超時時間的方式獲取令牌伍茄,如果超timeout為0栋盹,則代表非阻塞,獲取不到立即返回敷矫。
從輸出來看例获,RateLimiter支持預(yù)消費,比如在acquire(5)時曹仗,等待時間是3秒榨汤,是上一個獲取令牌時預(yù)消費了3個兩排,固需要等待3*1秒怎茫,然后又預(yù)消費了5個令牌收壕,以此類推
RateLimiter通過限制后面請求的等待時間,來支持一定程度的突發(fā)請求(預(yù)消費)遭居,在使用過程中需要注意這一點啼器,具體實現(xiàn)原理后面再分析。
RateLimiter實現(xiàn)原理
Guava有兩種限流模式俱萍,一種為穩(wěn)定模式(SmoothBursty:令牌生成速度恒定)端壳,一種為漸進模式(SmoothWarmingUp:令牌生成速度緩慢提升直到維持在一個穩(wěn)定值) 兩種模式實現(xiàn)思路類似,主要區(qū)別在等待時間的計算上枪蘑,本篇重點介紹SmoothBursty
RateLimiter的創(chuàng)建
通過調(diào)用RateLimiter的create
接口來創(chuàng)建實例损谦,實際是調(diào)用的SmoothBuisty
穩(wěn)定模式創(chuàng)建的實例。
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty
中的兩個構(gòu)造參數(shù)含義:
- SleepingStopwatch:guava中的一個時鐘類實例岳颇,會通過這個來計算時間及令牌
- maxBurstSeconds:官方解釋照捡,在ReteLimiter未使用時,最多保存幾秒的令牌话侧,默認是1
在解析SmoothBursty原理前栗精,重點解釋下SmoothBursty中幾個屬性的含義
/**
* The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
* 在RateLimiter未使用時,最多存儲幾秒的令牌
* */
final double maxBurstSeconds;
/**
* The currently stored permits.
* 當(dāng)前存儲令牌數(shù)
*/
double storedPermits;
/**
* The maximum number of stored permits.
* 最大存儲令牌數(shù) = maxBurstSeconds * stableIntervalMicros(見下文)
*/
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* 添加令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數(shù))
*/
double stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
* 下一次請求可以獲取令牌的起始時間
* 由于RateLimiter允許預(yù)消費悲立,上次請求預(yù)消費令牌后
* 下次請求需要等待相應(yīng)的時間到nextFreeTicketMicros時刻才可以獲取令牌
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
接下來介紹幾個關(guān)鍵函數(shù)
- setRate
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
通過這個接口設(shè)置令牌通每秒生成令牌的數(shù)量鹿寨,內(nèi)部時間通過調(diào)用SmoothRateLimiter
的doSetRate
來實現(xiàn)
- doSetRate
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
這里先通過調(diào)用resync
生成令牌以及更新下一期令牌生成時間,然后更新stableIntervalMicros薪夕,最后又調(diào)用了SmoothBursty
的doSetRate
- resync
/**
* Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
* 基于當(dāng)前時間脚草,更新下一次請求令牌的時間,以及當(dāng)前存儲的令牌(可以理解為生成令牌)
*/
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
根據(jù)令牌桶算法原献,桶中的令牌是持續(xù)生成存放的馏慨,有請求時需要先從桶中拿到令牌才能開始執(zhí)行,誰來持續(xù)生成令牌存放呢姑隅?
一種解法是写隶,開啟一個定時任務(wù),由定時任務(wù)持續(xù)生成令牌粤策。這樣的問題在于會極大的消耗系統(tǒng)資源樟澜,如误窖,某接口需要分別對每個用戶做訪問頻率限制叮盘,假設(shè)系統(tǒng)中存在6W用戶,則至多需要開啟6W個定時任務(wù)來維持每個桶中的令牌數(shù)霹俺,這樣的開銷是巨大的柔吼。
另一種解法則是延遲計算,如上resync
函數(shù)丙唧。該函數(shù)會在每次獲取令牌之前調(diào)用愈魏,其實現(xiàn)思路為,若當(dāng)前時間晚于nextFreeTicketMicros想际,則計算該段時間內(nèi)可以生成多少令牌培漏,將生成的令牌加入令牌桶中并更新數(shù)據(jù)。這樣一來胡本,只需要在獲取令牌時計算一次即可牌柄。
- SmoothBursty的doSetRate
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
// Double.POSITIVE_INFINITY 代表無窮啊
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
桶中可存放的最大令牌數(shù)由maxBurstSeconds計算而來,其含義為最大存儲maxBurstSeconds秒生成的令牌侧甫。
該參數(shù)的作用在于珊佣,可以更為靈活地控制流量。如披粟,某些接口限制為300次/20秒咒锻,某些接口限制為50次/45秒等。也就是流量不局限于qps
RateLimiter幾個常用接口分析
在了解以上概念后守屉,就非常容易理解RateLimiter暴露出來的接口
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
/**
* 獲取令牌惑艇,返回阻塞的時間
**/
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
acquire
函數(shù)主要用于獲取permits個令牌,并計算需要等待多長時間拇泛,進而掛起等待滨巴,并將該值返回须板,主要通過reserve
返回需要等待的時間,reserve
中通過調(diào)用reserveAndGetWaitLength
獲取等待時間
/**
* Reserves next ticket and returns the wait time that the caller must wait for.
*
* @return the required wait time, never negative
*/
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
最后調(diào)用了reserveEarliestAvailable
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
首先通過resync生成令牌以及同步nextFreeTicketMicros時間戳兢卵,freshPermits從令牌桶中獲取令牌后還需要的令牌數(shù)量习瑰,通過storedPermitsToWaitTime計算出獲取freshPermits還需要等待的時間,在穩(wěn)定模式中秽荤,這里就是(long) (freshPermits * stableIntervalMicros) 甜奄,然后更新nextFreeTicketMicros以及storedPermits,這次獲取令牌需要的等待到的時間點窃款, reserveAndGetWaitLength返回需要等待的時間間隔课兄。
從`reserveEarliestAvailable`可以看出RateLimiter的預(yù)消費原理,以及獲取令牌的等待時間時間原理(可以解釋示例結(jié)果)晨继,再獲取令牌不足時烟阐,并沒有等待到令牌全部生成,而是更新了下次獲取令牌時的nextFreeTicketMicros紊扬,從而影響的是下次獲取令牌的等待時間蜒茄。
`reserve`這里返回等待時間后,`acquire`通過調(diào)用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`進行sleep操作餐屎,這里不同于Thread.sleep(), 這個函數(shù)的sleep是uninterruptibly的檀葛,內(nèi)部實現(xiàn):
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
//sleep 阻塞線程 內(nèi)部通過Thread.sleep()
boolean interrupted = false;
try {
long remainingNanos = unit.toNanos(sleepFor);
long end = System.nanoTime() + remainingNanos;
while (true) {
try {
// TimeUnit.sleep() treats negative timeouts just like zero.
NANOSECONDS.sleep(remainingNanos);
return;
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = end - System.nanoTime();
//如果被interrupt可以繼續(xù),更新sleep時間腹缩,循環(huán)繼續(xù)sleep
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
//如果被打斷過屿聋,sleep過后再真正中斷線程
}
}
}
sleep之后,`acquire`返回sleep的時間藏鹊,阻塞結(jié)束润讥,獲取到令牌。
public boolean tryAcquire(int permits) {
return tryAcquire(permits, 0, MICROSECONDS);
}
public boolean tryAcquire() {
return tryAcquire(1, 0, MICROSECONDS);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
tryAcquire
函數(shù)可以嘗試在timeout時間內(nèi)獲取令牌盘寡,如果可以則掛起等待相應(yīng)時間并返回true楚殿,否則立即返回false
canAcquire
用于判斷timeout時間內(nèi)是否可以獲取令牌,通過判斷當(dāng)前時間+超時時間是否大于nextFreeTicketMicros 來決定是否能夠拿到足夠的令牌數(shù)宴抚,如果可以獲取到勒魔,則過程同acquire,線程sleep等待菇曲,如果通過canAcquire
在此超時時間內(nèi)不能回去到令牌冠绢,則可以快速返回,不需要等待timeout后才知道能否獲取到令牌常潮。
到此弟胀,Guava RateLimiter穩(wěn)定模式的實現(xiàn)原理基本已經(jīng)清楚,如發(fā)現(xiàn)文中錯誤的地方,勞煩指正孵户!
上述分析主要參考了:https://segmentfault.com/a/1190000012875897萧朝,再此基礎(chǔ)上做了些筆記補充