接上篇。Guava的令牌桶的實(shí)現(xiàn)中掉弛,包括一條設(shè)計(jì)哲學(xué)搅幅,需要大家注意:它允許瞬間的流量波峰超過(guò)QPS,但瞬間過(guò)后的請(qǐng)求將會(huì)等待較長(zhǎng)的時(shí)間來(lái)緩解上次的波峰堪置,以使得平均的QPS等于預(yù)定值躬存。
RateLimiter類(lèi)提供了令牌桶的接口,它是一個(gè)抽象類(lèi)舀锨,其子類(lèi)有SmoothRateLimiter(也是個(gè)抽象類(lèi))以及孫子類(lèi)SmoothBursty岭洲,SmoothWarmingUp。SmoothRateLimiter類(lèi)實(shí)現(xiàn)了算法的核心部分坎匿,因次我們暫且只討論SmoothRateLimiter和其實(shí)現(xiàn)類(lèi)SmoothBursty盾剩。RateLimiter都是通過(guò)靜態(tài)的create函數(shù)實(shí)例化雷激。以create(double permitsPerSecond)為例。參數(shù)permitsPerSecond為配置的QPS告私。該方法簡(jiǎn)潔明了侥锦,屏蔽了很多用戶無(wú)需關(guān)心的細(xì)節(jié)。
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
接著該方法調(diào)用了create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer())方法(該方法由于是包的訪問(wèn)權(quán)限德挣,在實(shí)際的項(xiàng)目中恭垦,基本不會(huì)直接調(diào)用),同時(shí)創(chuàng)建了一個(gè)StopWatch格嗅,自動(dòng)啟動(dòng)番挺。
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
該方法創(chuàng)建了SmoothBursty實(shí)例,up-casting為RateLimiter屯掖。maxBurstSeconds固定為1玄柏,說(shuō)明令牌桶中所能存儲(chǔ)的的最大令牌數(shù)是1*QPS。接著調(diào)用setRate方法贴铜,該方法初始化一些重要的參數(shù):
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
主要實(shí)現(xiàn)在SmoothRateLimiter中:
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
其中resync方法是一個(gè)關(guān)鍵的方法粪摘,在請(qǐng)求令牌時(shí)也會(huì)用到,后面還會(huì)說(shuō)明:
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
從中可以看出绍坝,如果nowMicros大于nextFreeTicketMicros徘意,會(huì)重新計(jì)算nextFreeTicketMicros和storedPermit的值。設(shè)置stableIntervalMicros轩褐,該字段表示1/QPS椎咧,即生產(chǎn)令牌的速率。
接著調(diào)用doSetRate方法把介,該方法在SmoothBursty類(lèi)中勤讽。
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
初始化maxPermits和storePermits,后者永遠(yuǎn)不會(huì)大于前者拗踢。
到此脚牍,rateLimiter初始化完成。除了resync方法巢墅,在不重新設(shè)置rate的情況诸狭,其他方法不在處理請(qǐng)求時(shí)用到,暫時(shí)忽略砂缩。
下面看關(guān)鍵的令牌申請(qǐng)的過(guò)程作谚。
首先調(diào)用acquire()方法,申請(qǐng)令牌庵芭,無(wú)參數(shù)表示申請(qǐng)一個(gè)妹懒。
public double acquire() {
return acquire(1);
}
接著調(diào)用acquire(int permits)方法:
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
reserve方法返回獲取令牌所需要等待的時(shí)間,stopwatch阻塞當(dāng)前線程双吆,最后返回線程休眠的秒數(shù)眨唬。如果microsToWait為0会前,表示立即返回。
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
reserve需要獲取鎖才可以操作匾竿,這也是令牌桶線程安全的原因瓦宜,以下操作都在同步代碼塊中。
繼續(xù)reserveAndGetWaitLength方法岭妖。
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
首先調(diào)用reserveEarliestAvailable临庇,方法名說(shuō)明了返回值的意義:即返回滿足當(dāng)前請(qǐng)求的最早的時(shí)鐘,該值大于等于nowMicros昵慌。如何保證這一點(diǎn)的呢假夺?我們看該方法:
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
這十多行代碼是整個(gè)算法實(shí)現(xiàn)的核心,重點(diǎn)說(shuō)明:
- 首先調(diào)用resync(nowMicros)斋攀,重置nextFreeTicketMicros已卷。如果nowMicros在nextFreeTicketMicros之后,nextFreeTicketMicros=nowMicros淳蔼,并往storedPermits中增加這段時(shí)間能產(chǎn)生的令牌侧蘸。
返回值設(shè)置為當(dāng)前的nextFreeTicketMicros。為什么要這樣設(shè)置呢鹉梨?因?yàn)槿绻鹡owMicros大于nextFreeTicketMicros讳癌,說(shuō)明令牌桶肯定能滿足需求(無(wú)論請(qǐng)求的令牌數(shù)目是多少,參見(jiàn)最上面的設(shè)計(jì)哲學(xué))俯画,而resync方法已經(jīng)修改了nextFreeTicketMicros值為nowMicros值析桥,逐層返回給調(diào)用者時(shí),等待時(shí)間為0艰垂,線程無(wú)需等待;反之埋虹,如果nowMicros小于等于nextFreeTicketMicros猜憎,說(shuō)明請(qǐng)求過(guò)快,線程需要等待搔课,等待的時(shí)間就是nextFreeTicketMicros-nowMicros胰柑。 - 接下來(lái),storedPermitsToSpend代表令牌桶中已有的令牌數(shù)爬泥,可以用于當(dāng)前的請(qǐng)求。但未必滿足需求袍啡。
- 其次,freshPermits代表需要新生成的令牌數(shù)蔗牡。如果storedPermits已經(jīng)滿足需求颖系,則freshPermits為0。
- 再次辩越,計(jì)算新生成令牌需要花費(fèi)的時(shí)間,這些需要后來(lái)者償還黔攒。
- 然后修改nextFreeTicketMicros的值。
- 最后修改storedPermits值督惰。
至此整個(gè)處理過(guò)程結(jié)束莲绰。
經(jīng)過(guò)上面的代碼梳理,詳細(xì)大家對(duì)RateLimiter的代碼有個(gè)比較清晰的認(rèn)識(shí)姑丑,但要加深理解蛤签,還需要多做debug和test震肮。
Guava包里面包括了很多test case留拾。我們可以把test類(lèi)單拿出來(lái),根據(jù)自己的情況添加相應(yīng)的case即可痴柔。該類(lèi)是com.google.common.util.concurrent. RateLimiterTest。由于很多類(lèi)都使用了默認(rèn)訪問(wèn)權(quán)限咳蔚,我們需要定義一個(gè) com.google.common.util.concurrent包谈火,導(dǎo)入RateLimiterTest類(lèi)。該類(lèi)中糯耍,guava提供了一個(gè)FakeStopwatch的nested class。它能夠讓時(shí)鐘按照我們的要求暫停革为,休眠隨意的時(shí)長(zhǎng)舵鳞,并記錄休眠和請(qǐng)求對(duì)應(yīng)的事件,并已特定的格式輸出恳蹲。例如:R1.00代表請(qǐng)求給定的令牌延遲了1秒;U1.05表示stopwatch休眠1.05秒嘉蕾,即模擬時(shí)鐘過(guò)了1.05秒。例如一個(gè)測(cè)試通過(guò)的case:
public void testSimple() {
RateLimiter limiter = RateLimiter.create(5.0, stopwatch);
limiter.acquire(); // R0.00
limiter.acquire(); // R0.20
limiter.acquire(); // R0.20
stopwatch.sleepMillis(1000); // U1.00
assertEvents("R0.00", "R0.20", "R0.20", "U1.00");
}
下面提供一個(gè)case儡率,驗(yàn)證下大家的理解以清。
public void testOneSecondBurst3() {
RateLimiter limiter = RateLimiter.create(1.0, stopwatch);
limiter.acquire(1); // R值?
stopwatch.sleepMillis(1050);//U值眉孩?
limiter.acquire(1); // R值勒葱? nowMicros? nextFree凛虽?
stopwatch.sleepMillis(950);
limiter.acquire(1); // R值凯旋? nowMicros? nextFree至非?
stopwatch.sleepMillis(1000);
limiter.acquire(1); // R值? nowMicros踏幻? nextFree戳杀?
}
關(guān)注公眾號(hào)“碼農(nóng)走向藝術(shù)”夭苗,回復(fù)消息可以獲取答案幺:)