統一流控服務開源:基於.Net Core的流控服務

  • 2019 年 10 月 3 日
  • 筆記

先前有一篇博文,梳理了流控服務的場景、業界做法和常用演算法

統一流控服務開源-1:場景&業界做法&演算法篇

最近完成了流控服務的開發,並在生產系統進行了大半年的驗證,穩定可靠。今天整理一下核心設計和實現思路,開源到Github上,分享給大家

     https://github.com/zhouguoqing/FlowControl

 一、令牌桶演算法實現

  先回顧一下令牌桶演算法示意圖

  

  

  隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms) 往桶里加入Token(想像和漏洞漏水相反,有個水龍頭在不斷的加水),

 

  如果桶已經滿了就不再加了. 新請求來臨時, 會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.

 

  令牌添加速度支援動態變化,實時控制處理的速率.

  令牌桶有兩個關鍵的屬性:令牌桶容量(大小)和時間間隔,

  有兩個關鍵操作,從令牌桶中取Token;令牌桶定時的Reset重置。

  我們看TokenBucket類:

using System;    namespace CZ.FlowControl.Service  {      using CZ.FlowControl.Spi;      /// <summary>      /// 令牌桶      /// </summary>      public abstract class TokenBucket : IThrottleStrategy      {          protected long bucketTokenCapacity;          private static readonly object syncRoot = new object();          protected readonly long ticksRefillInterval;          protected long nextRefillTime;            //number of tokens in the bucket          protected long tokens;            protected TokenBucket(long bucketTokenCapacity, long refillInterval, long refillIntervalInMilliSeconds)          {              if (bucketTokenCapacity <= 0)                  throw new ArgumentOutOfRangeException("bucketTokenCapacity", "bucket token capacity can not be negative");              if (refillInterval < 0)                  throw new ArgumentOutOfRangeException("refillInterval", "Refill interval cannot be negative");              if (refillIntervalInMilliSeconds <= 0)                  throw new ArgumentOutOfRangeException("refillIntervalInMilliSeconds", "Refill interval in milliseconds cannot be negative");                this.bucketTokenCapacity = bucketTokenCapacity;              ticksRefillInterval = TimeSpan.FromMilliseconds(refillInterval * refillIntervalInMilliSeconds).Ticks;          }            /// <summary>          /// 是否流控          /// </summary>          /// <param name="n"></param>          /// <returns></returns>          public bool ShouldThrottle(long n = 1)          {              TimeSpan waitTime;              return ShouldThrottle(n, out waitTime);          }          public bool ShouldThrottle(long n, out TimeSpan waitTime)          {              if (n <= 0) throw new ArgumentOutOfRangeException("n", "Should be positive integer");                lock (syncRoot)              {                  UpdateTokens();                  if (tokens < n)                  {                      var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks;                      if (timeToIntervalEnd < 0) return ShouldThrottle(n, out waitTime);                        waitTime = TimeSpan.FromTicks(timeToIntervalEnd);                      return true;                  }                  tokens -= n;                    waitTime = TimeSpan.Zero;                  return false;              }          }            /// <summary>          /// 更新令牌          /// </summary>          protected abstract void UpdateTokens();            public bool ShouldThrottle(out TimeSpan waitTime)          {              return ShouldThrottle(1, out waitTime);          }            public long CurrentTokenCount          {              get              {                  lock (syncRoot)                  {                      UpdateTokens();                      return tokens;                  }              }          }      }  }

 這個抽象類中,將UpdateToken作為抽象方法暴露出來,給實現類更多的靈活去控制令牌桶重置操作。基於此實現了“固定令牌桶”FixedTokenBucket

    /// <summary>      /// 固定令牌桶      /// </summary>      class FixedTokenBucket : TokenBucket      {          public FixedTokenBucket(long maxTokens, long refillInterval, long refillIntervalInMilliSeconds)              : base(maxTokens, refillInterval, refillIntervalInMilliSeconds)          {          }            protected override void UpdateTokens()          {              var currentTime = SystemTime.UtcNow.Ticks;                if (currentTime < nextRefillTime)                  return;                tokens = bucketTokenCapacity;              nextRefillTime = currentTime + ticksRefillInterval;          }      }

   固定令牌桶在每次取Token時,都要執行方法ShouldThrottle。這個方法中:

   並發取Token是執行緒安全的,這個地方用了Lock控制,損失了一部分性能。同時每次獲取可用Token的時候,都會實時Check一下是否需要到達Reset令牌桶的時間。

   獲取到可用令牌後,令牌桶中令牌的數量-1。如果沒有足夠的可用令牌,則返回等待到下次Reset令牌桶的時間。如下程式碼:

        public bool ShouldThrottle(long n, out TimeSpan waitTime)          {              if (n <= 0) throw new ArgumentOutOfRangeException("n", "Should be positive integer");                lock (syncRoot)              {                  UpdateTokens();                  if (tokens < n)                  {                      var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks;                      if (timeToIntervalEnd < 0) return ShouldThrottle(n, out waitTime);                        waitTime = TimeSpan.FromTicks(timeToIntervalEnd);                      return true;                  }                  tokens -= n;                    waitTime = TimeSpan.Zero;                  return false;              }          }

   以上就是令牌桶演算法的實現。我們繼續看漏桶演算法。

 二、漏桶演算法實現

  首先回顧一下漏桶演算法的原理:

  

  

  水(請求)先進入到漏桶里,漏桶以一定的速度出水(介面有響應速率),

 

  當水流入速度過大會直接溢出(訪問頻率超過介面響應速率), 然後就拒絕請求,

 

  可以看出漏桶演算法能強行限制數據的傳輸速率.

 

  有兩個變數:

 

  • 一個是桶的大小,支援流量突發增多時可以存多少的水(burst),
  • 另一個是水桶漏洞的大小(rate)。

 

   漏桶抽象類:LeakTokenBucket,繼承與令牌桶抽象父類 TokenBucket,說明了獲取令牌(漏出令牌)在底層的方式是一致的,不一樣的是重置令牌的方式(務必理解這一點)

using System;    namespace CZ.FlowControl.Service  {      /// <summary>      /// 漏桶      /// </summary>      abstract class LeakyTokenBucket : TokenBucket      {          protected readonly long stepTokens;          protected long ticksStepInterval;            protected LeakyTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds,              long stepTokens, long stepInterval, int stepIntervalInMilliseconds)              : base(maxTokens, refillInterval, refillIntervalInMilliSeconds)          {              this.stepTokens = stepTokens;              if (stepInterval < 0) throw new ArgumentOutOfRangeException("stepInterval", "Step interval cannot be negative");              if (stepTokens < 0) throw new ArgumentOutOfRangeException("stepTokens", "Step tokens cannot be negative");              if (stepIntervalInMilliseconds <= 0) throw new ArgumentOutOfRangeException("stepIntervalInMilliseconds", "Step interval in milliseconds cannot be negative");                ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalInMilliseconds).Ticks;          }      }  }

    可以看出,漏桶是在令牌桶的基礎上增加了二個重要的屬性:這兩個屬性決定了重置令牌桶的方式

    stepTokens:每間隔時間內漏的數量

    ticksStepInterval:漏的間隔時間

    舉個例子:TPS 100,即每秒漏出100個Token,stepTokens =100, ticksStepInterval=1000ms

    漏桶的具體實現有兩種:空桶和滿桶

    StepDownTokenBucket 滿桶:即一把將令牌桶填充滿

using System;    namespace CZ.FlowControl.Service  {      /// <summary>      /// 漏桶(滿桶)      /// </summary>      /// <remarks>      /// StepDownLeakyTokenBucketStrategy resembles a bucket which has been filled with tokens at the beginning but subsequently leaks tokens at a fixed interval      /// </remarks>      class StepDownTokenBucket : LeakyTokenBucket      {          public StepDownTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds) : base(maxTokens, refillInterval, refillIntervalInMilliSeconds, stepTokens, stepInterval, stepIntervalInMilliseconds)          {          }            protected override void UpdateTokens()          {              var currentTime = SystemTime.UtcNow.Ticks;                if (currentTime >= nextRefillTime)              {                  //set tokens to max                  tokens = bucketTokenCapacity;                    //compute next refill time                  nextRefillTime = currentTime + ticksRefillInterval;                  return;              }                //calculate max tokens possible till the end              var timeToNextRefill = nextRefillTime - currentTime;              var stepsToNextRefill = timeToNextRefill/ticksStepInterval;                var maxPossibleTokens = stepsToNextRefill*stepTokens;                if ((timeToNextRefill%ticksStepInterval) > 0) maxPossibleTokens += stepTokens;              if (maxPossibleTokens < tokens) tokens = maxPossibleTokens;          }      }  }

View Code

   StepUpLeakyTokenBucket 空桶:即每次只將stepTokens個數的令牌放到桶中   

 1 using System;   2   3 namespace CZ.FlowControl.Service   4 {   5     /// <summary>   6     /// 漏桶(空桶)   7     /// </summary>   8     /// <remarks>   9     ///  StepUpLeakyTokenBucketStrategy resemembles an empty bucket at the beginning but get filled will tokens over a fixed interval.  10     /// </remarks>  11     class StepUpLeakyTokenBucket : LeakyTokenBucket  12     {  13         private long lastActivityTime;  14  15         public StepUpLeakyTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds)  16             : base(maxTokens, refillInterval, refillIntervalInMilliSeconds, stepTokens, stepInterval, stepIntervalInMilliseconds)  17         {  18         }  19  20         protected override void UpdateTokens()  21         {  22             var currentTime = SystemTime.UtcNow.Ticks;  23  24             if (currentTime >= nextRefillTime)  25             {  26                 tokens = stepTokens;  27  28                 lastActivityTime = currentTime;  29                 nextRefillTime = currentTime + ticksRefillInterval;  30  31                 return;  32             }  33  34             //calculate tokens at current step  35  36             long elapsedTimeSinceLastActivity = currentTime - lastActivityTime;  37             long elapsedStepsSinceLastActivity = elapsedTimeSinceLastActivity / ticksStepInterval;  38  39             tokens += (elapsedStepsSinceLastActivity*stepTokens);  40  41             if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity;  42             lastActivityTime = currentTime;  43         }  44     }  45 }

View Code

 三、流控服務封裝

  第二章節,詳細介紹了令牌桶和漏桶的具體實現。基於以上,要重點介紹介面:IThrottleStrategy:流控的具體方式

using System;    namespace CZ.FlowControl.Spi  {      /// <summary>      /// 流量控制演算法策略      /// </summary>      public interface IThrottleStrategy      {          /// <summary>          /// 是否流控          /// </summary>          /// <param name="n"></param>          /// <returns></returns>          bool ShouldThrottle(long n = 1);            /// <summary>          /// 是否流控          /// </summary>          /// <param name="n"></param>          /// <param name="waitTime"></param>          /// <returns></returns>          bool ShouldThrottle(long n, out TimeSpan waitTime);            /// <summary>          /// 是否流控          /// </summary>          /// <param name="waitTime"></param>          /// <returns></returns>          bool ShouldThrottle(out TimeSpan waitTime);            /// <summary>          /// 當前令牌個數          /// </summary>          long CurrentTokenCount { get; }      }  }

    有了這個流控方式介面後,我們還需要一個流控策略定義類:FlowControlStrategy

    即定義具體的流控策略:以下是這個類的詳細屬性和成員:  不僅定義了流控策略類型,還定義了流控的維度資訊和流控閾值,這樣流控就做成依賴注入的方式了! 

using System;  using System.Collections.Generic;  using System.Text;    namespace CZ.FlowControl.Spi  {      /// <summary>      /// 流控策略      /// </summary>      public class FlowControlStrategy      {          /// <summary>          /// 標識          /// </summary>          public string ID { get; set; }            /// <summary>          /// 名稱          /// </summary>          public string Name { get; set; }            /// <summary>          /// 流控策略類型          /// </summary>          public FlowControlStrategyType StrategyType { get; set; }            /// <summary>          /// 流控閾值-Int          /// </summary>          public long IntThreshold { get; set; }            /// <summary>          /// 流控閾值-Double          /// </summary>          public decimal DoubleThreshold { get; set; }            /// <summary>          /// 時間區間跨度          /// </summary>          public FlowControlTimespan TimeSpan { get; set; }            private Dictionary<string, string> flowControlConfigs;            /// <summary>          /// 流控維度資訊          /// </summary>          public Dictionary<string, string> FlowControlConfigs          {              get              {                  if (flowControlConfigs == null)                      flowControlConfigs = new Dictionary<string, string>();                    return flowControlConfigs;              }              set              {                  flowControlConfigs = value;              }          }            /// <summary>          /// 描述          /// </summary>          public string Descriptions { get; set; }            /// <summary>          /// 觸發流控後是否直接拒絕請求          /// </summary>          public bool IsRefusedRequest { get; set; }            /// <summary>          /// 創建時間          /// </summary>          public DateTime CreateTime { get; set; }            /// <summary>          /// 創建人          /// </summary>          public string Creator { get; set; }            /// <summary>          /// 最後修改時間          /// </summary>          public DateTime LastModifyTime { get; set; }            /// <summary>          /// 最後修改人          /// </summary>          public string LastModifier { get; set; }      }  }

   同時,流控策略類型,我們抽象了一個枚舉:FlowControlStrategyType

   支援3種流控策略:TPS、Sum(指定時間段內請求的次數),Delay延遲

using System;  using System.Collections.Generic;  using System.Text;    namespace CZ.FlowControl.Spi  {      /// <summary>      /// 流控策略類型枚舉      /// </summary>      public enum FlowControlStrategyType      {          /// <summary>          /// TPS控制策略          /// </summary>          TPS,       /// <summary>          /// 總數控制策略          /// </summary>          Sum,            /// <summary>          /// 延遲控制策略          /// </summary>          Delay      }  }

  面向每種流控策略類型,提供了一個對應的流控器,比如說TPS的流控器

TPSFlowController,內部使用了固定令牌桶演算法
using System;    namespace CZ.FlowControl.Service  {      using CZ.FlowControl.Spi;        /// <summary>      /// TPS流量控制器      /// </summary>      class TPSFlowController : IFlowController      {          public IThrottleStrategy InnerThrottleStrategy          {              get; private set;          }            public FlowControlStrategy FlowControlStrategy { get; private set; }            public bool ShouldThrottle(long n, out TimeSpan waitTime)          {              return InnerThrottleStrategy.ShouldThrottle(n, out waitTime);          }            public TPSFlowController(FlowControlStrategy strategy)          {              FlowControlStrategy = strategy;                InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, 1, 1000);          }      }  }

  Sum(指定時間段內請求的次數)流控器:

  

using System;  using System.Collections.Generic;  using System.IO;  using System.Linq;  using System.Text;    namespace CZ.FlowControl.Service  {      using CZ.FlowControl.Spi;        /// <summary>      /// 一段時間內合計值流量控制器      /// </summary>      class SumFlowController : IFlowController      {          public IThrottleStrategy InnerThrottleStrategy          {              get; private set;          }            public FlowControlStrategy FlowControlStrategy { get; private set; }            public bool ShouldThrottle(long n, out TimeSpan waitTime)          {              return InnerThrottleStrategy.ShouldThrottle(n, out waitTime);          }            public SumFlowController(FlowControlStrategy strategy)          {              FlowControlStrategy = strategy;                var refillInterval = GetTokenBucketRefillInterval(strategy);                InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, refillInterval, 1000);          }            private long GetTokenBucketRefillInterval(FlowControlStrategy strategy)          {              long refillInterval = 0;                switch (strategy.TimeSpan)              {                  case FlowControlTimespan.Second:                      refillInterval = 1;                      break;                  case FlowControlTimespan.Minute:                      refillInterval = 60;                      break;                  case FlowControlTimespan.Hour:                      refillInterval = 60 * 60;                      break;                  case FlowControlTimespan.Day:                      refillInterval = 24 * 60 * 60;                      break;              }                return refillInterval;          }      }  }

  同時,通過一個創建者工廠,根據不同的流控策略,創建對應的流控器(做了一層快取,性能更好):

using System;  using System.Collections.Generic;  using System.Text;    namespace CZ.FlowControl.Service  {      using CZ.FlowControl.Spi;        /// <summary>      /// 流控策略工廠      /// </summary>      class FlowControllerFactory      {          private static Dictionary<string, IFlowController> fcControllers;          private static object syncObj = new object();            private static FlowControllerFactory instance;            private FlowControllerFactory()          {              fcControllers = new Dictionary<string, IFlowController>();          }            public static FlowControllerFactory GetInstance()          {              if (instance == null)              {                  lock (syncObj)                  {                      if (instance == null)                      {                          instance = new FlowControllerFactory();                      }                  }              }                return instance;          }            public IFlowController GetOrCreateFlowController(FlowControlStrategy strategy)          {              if (strategy == null)                  throw new ArgumentNullException("FlowControllerFactory.GetOrCreateFlowController.strategy");                if (!fcControllers.ContainsKey(strategy.ID))              {                  lock (syncObj)                  {                      if (!fcControllers.ContainsKey(strategy.ID))                      {                          var fcController = CreateFlowController(strategy);                          if (fcController != null)                              fcControllers.Add(strategy.ID, fcController);                      }                  }              }                if (fcControllers.ContainsKey(strategy.ID))              {                  var controller = fcControllers[strategy.ID];                  return controller;              }                return null;          }            private IFlowController CreateFlowController(FlowControlStrategy strategy)          {              if (strategy == null)                  throw new ArgumentNullException("FlowControllerFactory.CreateFlowController.strategy");                IFlowController controller = null;                switch (strategy.StrategyType)              {                  case FlowControlStrategyType.TPS:                      controller = new TPSFlowController(strategy);                      break;                  case FlowControlStrategyType.Delay:                      controller = new DelayFlowController(strategy);                      break;                  case FlowControlStrategyType.Sum:                      controller = new SumFlowController(strategy);                      break;                  default:                      break;              }                return controller;          }      }  }

 

   有了流控策略定義、我們更進一步,繼續封裝了流控Facade服務,這樣把流控的變化封裝到內部。對外只提供流控服務介面,流控時動態傳入流控策略和流控個數:FlowControlService

   

using System;  using System.Collections.Generic;  using System.Text;    namespace CZ.FlowControl.Service  {      using CZ.FlowControl.Spi;      using System.Threading;        /// <summary>      /// 統一流控服務      /// </summary>      public class FlowControlService      {          /// <summary>          /// 流控          /// </summary>          /// <param name="strategy">流控策略</param>          /// <param name="count">請求次數</param>          public static void FlowControl(FlowControlStrategy strategy, int count = 1)          {              var controller = FlowControllerFactory.GetInstance().GetOrCreateFlowController(strategy);                TimeSpan waitTimespan = TimeSpan.Zero;                var result = controller.ShouldThrottle(count, out waitTimespan);              if (result)              {                  if (strategy.IsRefusedRequest == false && waitTimespan != TimeSpan.Zero)                  {                      WaitForAvailable(strategy, controller, waitTimespan, count);                  }                  else if (strategy.IsRefusedRequest)                  {                      throw new Exception("觸發流控!");                  }              }          }            /// <summary>          /// 等待可用          /// </summary>          /// <param name="strategy">流控策略</param>          /// <param name="controller">流控器</param>          /// <param name="waitTimespan">等待時間</param>          /// <param name="count">請求次數</param>          private static void WaitForAvailable(FlowControlStrategy strategy, IFlowController controller, TimeSpan waitTimespan, int count)          {              var timespan = waitTimespan;              if (strategy.StrategyType == FlowControlStrategyType.Delay)              {                  Thread.Sleep(timespan);                  return;              }                while (controller.ShouldThrottle(count, out timespan))              {                  Thread.Sleep(timespan);              }          }      }  }

  以上,統一流控服務完成了第一個版本的封裝。接下來我們看示例程式碼

 四、示例程式碼

    先安裝Nuget:


Install-Package CZ.FlowControl.Service -Version 1.0.0


 

    

   

    是不是很簡單。

    大家如果希望了解詳細的程式碼,請參考這個項目的GitHub地址:

    https://github.com/zhouguoqing/FlowControl

    同時也歡迎大家一起改進完善。

    

 

周國慶

2019/8/9