RateLimiter源碼解析

  • 2021 年 3 月 14 日
  • 筆記

RateLimiterGuava包提供的限流器,採用了令牌桶算法,特定是均勻地向桶中添加令牌,每次消費時也必須持有令牌,否則就需要等待。應用場景之一是限制消息消費的速度,避免消息消費過快而對下游的數據庫造成較大的壓力。
本文主要介紹RateLimiter的源碼,包括他的基本限流器SmoothBursty,以及帶預熱效果的SmoothWarmingUp

RateLimiter作為限流器的頂層類,只有兩個屬性:

  private final SleepingStopwatch stopwatch;
  private volatile Object mutexDoNotUseDirectly;

stopwatch用來計算時間間隔,以及實現了當拿不到令牌時將線程阻塞的功能;mutexDoNotUseDirectly主要用來進行線程同步。
RateLimiter作為一個抽象類,本身不能直接實例化,可以使用靜態工廠方法來創建:

 public static RateLimiter create(double permitsPerSecond);  //①
 public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod);  //②
 public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)  //③

RateLimiter對外提供了3個構造器,分成兩類,構造器①是第一類,底層會創建基本限流器SmoothBursty;構造器②和③是第二類,底層會創建帶warm up效果的SmoothWarmingUp。參數permitsPerSecond表示每秒產生多少個令牌,參數warmupPeriod是限流器warm up階段的時間,即限流器產生令牌從最慢到最快所需要的時間,參數unitwarm up的時間單位。
SmoothRateLimiter新增了4個屬性:

  //桶中存儲的令牌數
  double storedPermits;
  //桶中允許的最大令牌數
  double maxPermits;
  //穩定狀態下產生令牌是速度,其值為1/permitsPerSecond
  double stableIntervalMicros;
  //下一次請求需要等待的時間
  private long nextFreeTicketMicros = 0L; // could be either in the past or future

這其中比較有意思的是nextFreeTicketMicros字段,它表示下一次獲取令牌的請求到來時需要等待的時間,該字段可以實現上一次獲取令牌的請求預支的等待時間由下一次請求來兌現。
接下來先介紹下SmoothBursty的構造過程:

public static RateLimiter create(double permitsPerSecond) {
	return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
	RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
	rateLimiter.setRate(permitsPerSecond);
	return rateLimiter;
}

構造SmoothBursty時出傳入了兩個參數,stopwatch好理解,第二個參數意思是當限流器長時間沒用時,令牌桶內最多存儲多少秒的令牌,這裡限定了最多只存儲1秒鐘的令牌,也就是permitsPerSecond個。
我們繼續分析setRate方法的實現:

  public final void setRate(double permitsPerSecond) {
    checkArgument(
        permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
    synchronized (mutex()) {
      doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
  }

setRate方法先校驗permitsPerSecond必須為整數,然後在同步塊中執行doSetRate方法。mutex方法通過雙重檢測的方式實例化mutexDoNotUseDirectly字段,詳細代碼略去,doSetRate是抽象方法,其具體的實現在抽象子類SmoothRateLimiter中:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

doSetRate方法主要是設置了stableIntervalMicros字段,調用的兩個方法resync和重載方法doSetRate我們接着分析。resync方法主要用來設置storedPermitsnextFreeTicketMicros這倆字段,代碼如下:

  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      //計算超過的這些時間裏產生了多少新的令牌
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      //重新計算當前令牌桶內持有的令牌數
      storedPermits = min(maxPermits, storedPermits + newPermits);
      //更新下次准許獲取令牌的時間為當前時間
      nextFreeTicketMicros = nowMicros;
    }
  }

此方法會根據當前的時間決定是否進行字段賦值,如果當前時間已經超過了nextFreeTicketMicros的值,那麼就重新計算storedPermitsnextFreeTicketMicros字段,其中計算storedPermits的代碼雖然容易理解,但是思路挺巧妙。一般來說,令牌桶算法的令牌需要以固定的速率進行添加,那麼很自然想到可以起一個任務,按照一定的速度產生令牌,但是起一個新任務會佔用一定的資源,從而加重系統的負擔,此處的實現是根據利用時間差來計算這段時間產生的令牌數,以簡單的計算完成了新任務需要做的事情,開銷大大減少了。coolDownIntervalMicros方法是抽象方法,在SmoothBurstySmoothWarmingUp有不同的實現,在SmoothBursty的實現是直接返回stableIntervalMicros字段,這個字段目前還沒設置過值,取默認值0.0,這裡double的除零操作並不會拋異常,而是會返回無窮大。
我們接着看一下doSetRate方法,這也是個抽象方法,在SmoothBursty的實現如下:

    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

maxPermits在此之前並沒有設置過值,因此默認是0.0,這裡只是將storedPermits初始化成了0。不過這裡的代碼也說明,在執行期間maxPermits是可以在其他地方被修改的,如果出現了更改,就會等比例修改storedPermits的值。
到這裡SmoothBursty的初始化過程就結束了,大體上是將內部的字段賦予了初始值。我們接下來看看SmoothBursty的使用:

  public double acquire() {
    return acquire(1);
  }

  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

acquire方法用於從令牌桶中獲取令牌,參數permits表示需要獲取的令牌數量,如果當前沒辦法拿到需要的令牌,線程會阻塞一段時間,該方法返回等待的時間,reserve的實現如下:

  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    //返回等待時間,如果不需要等待,返回0
    return max(momentAvailable - nowMicros, 0);
  }

  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    //取可用的令牌與需要的令牌兩者的最小值
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    //計算該次請求超出的令牌數
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    //扣減令牌桶庫存
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

reserve的核心邏輯在reserveEarliestAvailable方法中,該方法的主要思想是檢查當前令牌桶內令牌數是否滿足需求,如果滿足則不需要額外的等待時間,否則需要將額外等待時間追加到nextFreeTicketMicros,需要注意的是方法返回的不是更新過後的nextFreeTicketMicros,而是上一次請求更新的時間,這個時間就是當前線程需要阻塞的時間,也就是說,當前請求所需要等待的時間是由下次請求完成的,下次請求需要的等待時間由下下次請求完成,以此類推。當前請求的令牌數超過令牌桶中的令牌數越多,下次請求需要等待的時間就越長。並且這裡並沒有對requiredPermits的上限做檢查,這就允許預支令牌,即假設桶的上限是100個令牌,一次請求可以允許超過100個令牌,只是生成多餘令牌的時間需要算到下一個請求上。同時這裡的邏輯也說明,獲取令牌是直接成功的,只是獲取完令牌後才需要一小段等待時間。
到這裡SmoothBursty的初始化以及獲取令牌的所有邏輯就介紹完了,接下來看看另一個類SmoothWarmingUp的源碼。

  static RateLimiter create(
      double permitsPerSecond,
      long warmupPeriod,
      TimeUnit unit,
      double coldFactor,
      SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

我們之前介紹的另外領個構造器的底層調用的是這個包級的create方法,該方法的5個參數中,只有coldFactor是新出現的,字面意思是冷啟動因子,源碼寫死了是3.0,該值表示指在warm up階段開始時,以多大的速率產生令牌,速率是穩定速率的三分之一,冷啟動階段結束後恢復到正常速率。
setRate方法底層會調用如下的doSetRate方法:

  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

   void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
	  double oldMaxPermits = maxPermits;
	  //設置冷啟動生成令牌的間隔是正常值的3倍(codeFactor固定為3)
	  double coldIntervalMicros = stableIntervalMicros * coldFactor;
	  thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
	  maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
	  //slope是梯形部分斜線的斜率
	  slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
	  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
	    // if we don't special-case this, we would get storedPermits == NaN, below
	    storedPermits = 0.0;
	  } else {
	    storedPermits =
	        (oldMaxPermits == 0.0)
	            ? maxPermits // initial state is cold
	            : storedPermits * maxPermits / oldMaxPermits;
	  }
	}


doSetRate的代碼不容易理解,源碼中利用圖示介紹了幾個變量之間的關係(但是本人仍然不是很理解,因此只能將結論放在這裡,無法進行更多解釋),如圖所示,源碼注釋中說明了如下的兩個等式:

  • 梯形的面積等於預熱時間warmupPeriodMicros
warmupPeriodMicros = 0.5 * (coldIntervalMicros + stableIntervalMicros) * (maxPermits - thresholdPermits)

由此可以得到maxPermits的值:

maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
  • 左邊矩形的面積是梯形面積的一半,由此可知:
warmupPeriodMicros * 0.5 = thresholdPermits * stableIntervalMicros

計算出thresholdPermits的值為:

thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros

SmoothWarmingUp的初始化邏輯到這裡就結束了,接下來介紹下它獲取令牌的流程,acquire方法的其他部分上文已經結束過,此處重點介紹storedPermitsToWaitTime方法:

    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      //存儲的令牌數量超出thresholdPermits的部分,這部分反應在梯形區域
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
      	//permitsAboveThresholdToTake表示梯形區域的高
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        //length計算的是梯形的上底+下底
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        //梯形區域的面積,即生產梯形區域的令牌數所需要的時間
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        //扣除掉需要消耗的梯形區域的令牌數,表示還需要從左側矩形區域取得的令牌數量
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // measuring the integral on the left part of the function (the horizontal line)
      //等待時間=梯形區域的時間+矩形區域的時間
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    //由前文可知,slope = =y/x = 產生令牌間隔/令牌數,permits * slope表示產生令牌間隔的增量,加上stableIntervalMicros表示梯形的底
    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }


此處的storedPermitsToWaitTimeSmoothBursty中的實現大不相同,SmoothBursty由於不需要預熱,可以直接獲取桶中的令牌,因此直接返回了0,而此處存在預熱階段,不能直接獲取到令牌,因此計算邏輯稍微複雜些,總體來說,就是求圖中陰影部分的面積。