聊聊Guava的RateLimiter

本文主要研究一下Guava的RateLimiter

RateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/RateLimiter.java

@Beta
@GwtIncompatible
public abstract class RateLimiter {

    //......
 /**
   * Acquires the given number of permits from this {@code RateLimiter}, blocking until the request
   * can be granted. Tells the amount of time slept, if any.
   *
   * @param permits the number of permits to acquire
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  @CanIgnoreReturnValue
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

  /**
   * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
   * the number of microseconds until the reservation can be consumed.
   *
   * @return time in microseconds to wait until the resource can be acquired, never negative
   */
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  private static void checkPermits(int permits) {
    checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
      long nowMicros = stopwatch.readMicros();
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

  private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
  }

  /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

  /**
   * Returns the earliest time that permits are available (with one caveat).
   *
   * @return the time that permits are available, or, if permits are available immediately, an
   *     arbitrary past or present time
   */
  abstract long queryEarliestAvailable(long nowMicros);

  /**
   * Reserves the requested number of permits and returns the time that those permits can be used
   * (with one caveat).
   *
   * @return the time that the permits may be used, or, if the permits may be used immediately, an
   *     arbitrary past or present time
   */
  abstract long reserveEarliestAvailable(int permits, long nowMicros);

  //......
}
  • 這里主要看acquire以及tryAcquire方法
  • acquire主要依賴reserve方法畦粮,先調(diào)用reserveAndGetWaitLength,最后是調(diào)用reserveEarliestAvailable方法
  • tryAcquire也會調(diào)用reserveAndGetWaitLength乖阵,最后也是調(diào)用reserveEarliestAvailable方法
  • reserveEarliestAvailable是抽象方法锈玉,由子類去實現(xiàn)

SmoothRateLimiter

guava-26.0-jre-sources.jar!/com/google/common/util/concurrent/SmoothRateLimiter.java

@GwtIncompatible
abstract class SmoothRateLimiter extends RateLimiter {
  //......
  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

  /**
   * Translates a specified portion of our currently stored permits which we want to spend/acquire,
   * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
   * use, for the range of [(storedPermits - permitsToTake), storedPermits].
   *
   * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
   */
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

  /**
   * Returns the number of microseconds during cool down that we have to wait to get a new permit.
   */
  abstract double coolDownIntervalMicros();

  //......
}
  • SmoothRateLimiter是RateLimiter的抽象子類,是平滑限流實現(xiàn)類的抽象父類
  • 這里首先調(diào)用resync方法(用于處理根據(jù)速率添加token的邏輯)义起,然后再去計算permits扣減以及等待時間的計算
  • 這里調(diào)用了兩個抽象方法拉背,分別是coolDownIntervalMicros以及storedPermitsToWaitTime

SmoothRateLimiter的兩個子類

SmoothRateLimiter有兩個內(nèi)部靜態(tài)子類,分別是SmoothBursty以及SmoothWarmingUp

SmoothBursty

  /**
   * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
   * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
   * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
   * seconds, we can save up to 2 * 10 = 20 permits.
   */
  static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
  }
  • SmoothBursty是一個zero throttling的"bursty" RateLimiter
  • coolDownIntervalMicros返回的是stableIntervalMicros默终,而storedPermitsToWaitTime返回的為0

SmoothWarmingUp

  static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;

    private double thresholdPermits;
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }

    @Override
    double coolDownIntervalMicros() {
      return warmupPeriodMicros / maxPermits;
    }
  }
  • coolDownIntervalMicros返回的是warmupPeriodMicros / maxPermits椅棺,而storedPermitsToWaitTime的計算相對復(fù)雜一些
  • SmoothBursty是基于token bucket算法,允許一定量的bursty流量齐蔽,但是有些場景需要bursty流量更平滑些两疚,這就需要使用SmoothWarmingUp
  • SmoothWarmingUp有一個warmup period,為thresholdPermits到maxPermits的這段范圍
   * <pre>
   *          ^ throttling
   *          |
   *    cold  +                  /
   * interval |                 /.
   *          |                / .
   *          |               /  .   ← "warmup period" is the area of the trapezoid between
   *          |              /   .     thresholdPermits and maxPermits
   *          |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM .
   * interval |          .   UP  .
   *          |          . PERIOD.
   *          |          .       .
   *        0 +----------+-------+--------------→ storedPermits
   *          0 thresholdPermits maxPermits
   * </pre>

主要涉及如下幾個公式

coldInterval = coldFactor * stableInterval.
thresholdPermits = 0.5 * warmupPeriod / stableInterval
maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
  • coldFactor默認是3
  • stableInterval代碼以毫秒計算含滴,即stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond

小結(jié)

  • Guava的RateLimiter(SmoothRateLimiter)基于token bucket算法實現(xiàn)诱渤,具體有兩個實現(xiàn)類,分別是SmoothBursty以及SmoothWarmingUp
  • SmoothBursty初始化的storedPermits為0谈况,可以支持burst到maxPermits
  • SmoothWarmingUp初始化的storedPermits為maxPermits(thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros))勺美,也支持burst,但是總體相對平滑

doc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
  • 序言:七十年代末碑韵,一起剝皮案震驚了整個濱河市赡茸,隨后出現(xiàn)的幾起案子,更是在濱河造成了極大的恐慌祝闻,老刑警劉巖占卧,帶你破解...
    沈念sama閱讀 218,546評論 6 507
  • 序言:濱河連續(xù)發(fā)生了三起死亡事件,死亡現(xiàn)場離奇詭異联喘,居然都是意外死亡华蜒,警方通過查閱死者的電腦和手機,發(fā)現(xiàn)死者居然都...
    沈念sama閱讀 93,224評論 3 395
  • 文/潘曉璐 我一進店門豁遭,熙熙樓的掌柜王于貴愁眉苦臉地迎上來叭喜,“玉大人,你說我怎么就攤上這事堤框∮蚶模” “怎么了纵柿?”我有些...
    開封第一講書人閱讀 164,911評論 0 354
  • 文/不壞的土叔 我叫張陵,是天一觀的道長启绰。 經(jīng)常有香客問我昂儒,道長,這世上最難降的妖魔是什么委可? 我笑而不...
    開封第一講書人閱讀 58,737評論 1 294
  • 正文 為了忘掉前任渊跋,我火速辦了婚禮,結(jié)果婚禮上着倾,老公的妹妹穿的比我還像新娘拾酝。我一直安慰自己,他們只是感情好卡者,可當我...
    茶點故事閱讀 67,753評論 6 392
  • 文/花漫 我一把揭開白布蒿囤。 她就那樣靜靜地躺著,像睡著了一般崇决。 火紅的嫁衣襯著肌膚如雪材诽。 梳的紋絲不亂的頭發(fā)上,一...
    開封第一講書人閱讀 51,598評論 1 305
  • 那天恒傻,我揣著相機與錄音脸侥,去河邊找鬼。 笑死盈厘,一個胖子當著我的面吹牛睁枕,可吹牛的內(nèi)容都是我干的。 我是一名探鬼主播沸手,決...
    沈念sama閱讀 40,338評論 3 418
  • 文/蒼蘭香墨 我猛地睜開眼外遇,長吁一口氣:“原來是場噩夢啊……” “哼!你這毒婦竟也來了罐氨?” 一聲冷哼從身側(cè)響起臀规,我...
    開封第一講書人閱讀 39,249評論 0 276
  • 序言:老撾萬榮一對情侶失蹤,失蹤者是張志新(化名)和其女友劉穎栅隐,沒想到半個月后,有當?shù)厝嗽跇淞掷锇l(fā)現(xiàn)了一具尸體玩徊,經(jīng)...
    沈念sama閱讀 45,696評論 1 314
  • 正文 獨居荒郊野嶺守林人離奇死亡租悄,尸身上長有42處帶血的膿包…… 初始之章·張勛 以下內(nèi)容為張勛視角 年9月15日...
    茶點故事閱讀 37,888評論 3 336
  • 正文 我和宋清朗相戀三年,在試婚紗的時候發(fā)現(xiàn)自己被綠了恩袱。 大學(xué)時的朋友給我發(fā)了我未婚夫和他白月光在一起吃飯的照片泣棋。...
    茶點故事閱讀 40,013評論 1 348
  • 序言:一個原本活蹦亂跳的男人離奇死亡,死狀恐怖畔塔,靈堂內(nèi)的尸體忽然破棺而出潭辈,到底是詐尸還是另有隱情鸯屿,我是刑警寧澤,帶...
    沈念sama閱讀 35,731評論 5 346
  • 正文 年R本政府宣布把敢,位于F島的核電站寄摆,受9級特大地震影響,放射性物質(zhì)發(fā)生泄漏修赞。R本人自食惡果不足惜婶恼,卻給世界環(huán)境...
    茶點故事閱讀 41,348評論 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一處隱蔽的房頂上張望柏副。 院中可真熱鬧勾邦,春花似錦、人聲如沸割择。這莊子的主人今日做“春日...
    開封第一講書人閱讀 31,929評論 0 22
  • 文/蒼蘭香墨 我抬頭看了看天上的太陽荔泳。三九已至铅歼,卻和暖如春,著一層夾襖步出監(jiān)牢的瞬間换可,已是汗流浹背椎椰。 一陣腳步聲響...
    開封第一講書人閱讀 33,048評論 1 270
  • 我被黑心中介騙來泰國打工, 沒想到剛下飛機就差點兒被人妖公主榨干…… 1. 我叫王不留沾鳄,地道東北人慨飘。 一個月前我還...
    沈念sama閱讀 48,203評論 3 370
  • 正文 我出身青樓,卻偏偏與公主長得像译荞,于是被迫代替她去往敵國和親瓤的。 傳聞我的和親對象是個殘疾皇子,可洞房花燭夜當晚...
    茶點故事閱讀 44,960評論 2 355

推薦閱讀更多精彩內(nèi)容