統一流控服務開源:基於.Net Core的流控服務
- 2019 年 10 月 3 日
- 筆記
先前有一篇博文,梳理了流控服務的場景、業界做法和常用演算法
最近完成了流控服務的開發,並在生產系統進行了大半年的驗證,穩定可靠。今天整理一下核心設計和實現思路,開源到Github上,分享給大家
https://github.com/zhouguoqing/FlowControl
一、令牌桶演算法實現
先回顧一下令牌桶演算法示意圖
隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms) 往桶里加入Token(想像和漏洞漏水相反,有個水龍頭在不斷的加水),
如果桶已經滿了就不再加了. 新請求來臨時, 會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.
令牌添加速度支援動態變化,實時控制處理的速率.
令牌桶有兩個關鍵的屬性:令牌桶容量(大小)和時間間隔,
有兩個關鍵操作,從令牌桶中取Token;令牌桶定時的Reset重置。
我們看TokenBucket類:
using System; namespace CZ.FlowControl.Service { using CZ.FlowControl.Spi; /// <summary> /// 令牌桶 /// </summary> public abstract class TokenBucket : IThrottleStrategy { protected long bucketTokenCapacity; private static readonly object syncRoot = new object(); protected readonly long ticksRefillInterval; protected long nextRefillTime; //number of tokens in the bucket protected long tokens; protected TokenBucket(long bucketTokenCapacity, long refillInterval, long refillIntervalInMilliSeconds) { if (bucketTokenCapacity <= 0) throw new ArgumentOutOfRangeException("bucketTokenCapacity", "bucket token capacity can not be negative"); if (refillInterval < 0) throw new ArgumentOutOfRangeException("refillInterval", "Refill interval cannot be negative"); if (refillIntervalInMilliSeconds <= 0) throw new ArgumentOutOfRangeException("refillIntervalInMilliSeconds", "Refill interval in milliseconds cannot be negative"); this.bucketTokenCapacity = bucketTokenCapacity; ticksRefillInterval = TimeSpan.FromMilliseconds(refillInterval * refillIntervalInMilliSeconds).Ticks; } /// <summary> /// 是否流控 /// </summary> /// <param name="n"></param> /// <returns></returns> public bool ShouldThrottle(long n = 1) { TimeSpan waitTime; return ShouldThrottle(n, out waitTime); } public bool ShouldThrottle(long n, out TimeSpan waitTime) { if (n <= 0) throw new ArgumentOutOfRangeException("n", "Should be positive integer"); lock (syncRoot) { UpdateTokens(); if (tokens < n) { var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks; if (timeToIntervalEnd < 0) return ShouldThrottle(n, out waitTime); waitTime = TimeSpan.FromTicks(timeToIntervalEnd); return true; } tokens -= n; waitTime = TimeSpan.Zero; return false; } } /// <summary> /// 更新令牌 /// </summary> protected abstract void UpdateTokens(); public bool ShouldThrottle(out TimeSpan waitTime) { return ShouldThrottle(1, out waitTime); } public long CurrentTokenCount { get { lock (syncRoot) { UpdateTokens(); return tokens; } } } } }
這個抽象類中,將UpdateToken作為抽象方法暴露出來,給實現類更多的靈活去控制令牌桶重置操作。基於此實現了“固定令牌桶”FixedTokenBucket
/// <summary> /// 固定令牌桶 /// </summary> class FixedTokenBucket : TokenBucket { public FixedTokenBucket(long maxTokens, long refillInterval, long refillIntervalInMilliSeconds) : base(maxTokens, refillInterval, refillIntervalInMilliSeconds) { } protected override void UpdateTokens() { var currentTime = SystemTime.UtcNow.Ticks; if (currentTime < nextRefillTime) return; tokens = bucketTokenCapacity; nextRefillTime = currentTime + ticksRefillInterval; } }
固定令牌桶在每次取Token時,都要執行方法ShouldThrottle。這個方法中:
並發取Token是執行緒安全的,這個地方用了Lock控制,損失了一部分性能。同時每次獲取可用Token的時候,都會實時Check一下是否需要到達Reset令牌桶的時間。
獲取到可用令牌後,令牌桶中令牌的數量-1。如果沒有足夠的可用令牌,則返回等待到下次Reset令牌桶的時間。如下程式碼:
public bool ShouldThrottle(long n, out TimeSpan waitTime) { if (n <= 0) throw new ArgumentOutOfRangeException("n", "Should be positive integer"); lock (syncRoot) { UpdateTokens(); if (tokens < n) { var timeToIntervalEnd = nextRefillTime - SystemTime.UtcNow.Ticks; if (timeToIntervalEnd < 0) return ShouldThrottle(n, out waitTime); waitTime = TimeSpan.FromTicks(timeToIntervalEnd); return true; } tokens -= n; waitTime = TimeSpan.Zero; return false; } }
以上就是令牌桶演算法的實現。我們繼續看漏桶演算法。
二、漏桶演算法實現
首先回顧一下漏桶演算法的原理:
‘
水(請求)先進入到漏桶里,漏桶以一定的速度出水(介面有響應速率),
當水流入速度過大會直接溢出(訪問頻率超過介面響應速率), 然後就拒絕請求,
可以看出漏桶演算法能強行限制數據的傳輸速率.
有兩個變數:
- 一個是桶的大小,支援流量突發增多時可以存多少的水(burst),
- 另一個是水桶漏洞的大小(rate)。
漏桶抽象類:LeakTokenBucket,繼承與令牌桶抽象父類 TokenBucket,說明了獲取令牌(漏出令牌)在底層的方式是一致的,不一樣的是重置令牌的方式(務必理解這一點)
using System; namespace CZ.FlowControl.Service { /// <summary> /// 漏桶 /// </summary> abstract class LeakyTokenBucket : TokenBucket { protected readonly long stepTokens; protected long ticksStepInterval; protected LeakyTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds) : base(maxTokens, refillInterval, refillIntervalInMilliSeconds) { this.stepTokens = stepTokens; if (stepInterval < 0) throw new ArgumentOutOfRangeException("stepInterval", "Step interval cannot be negative"); if (stepTokens < 0) throw new ArgumentOutOfRangeException("stepTokens", "Step tokens cannot be negative"); if (stepIntervalInMilliseconds <= 0) throw new ArgumentOutOfRangeException("stepIntervalInMilliseconds", "Step interval in milliseconds cannot be negative"); ticksStepInterval = TimeSpan.FromMilliseconds(stepInterval * stepIntervalInMilliseconds).Ticks; } } }
可以看出,漏桶是在令牌桶的基礎上增加了二個重要的屬性:這兩個屬性決定了重置令牌桶的方式
stepTokens:每間隔時間內漏的數量
ticksStepInterval:漏的間隔時間
舉個例子:TPS 100,即每秒漏出100個Token,stepTokens =100, ticksStepInterval=1000ms
漏桶的具體實現有兩種:空桶和滿桶
StepDownTokenBucket 滿桶:即一把將令牌桶填充滿

using System; namespace CZ.FlowControl.Service { /// <summary> /// 漏桶(滿桶) /// </summary> /// <remarks> /// StepDownLeakyTokenBucketStrategy resembles a bucket which has been filled with tokens at the beginning but subsequently leaks tokens at a fixed interval /// </remarks> class StepDownTokenBucket : LeakyTokenBucket { public StepDownTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds) : base(maxTokens, refillInterval, refillIntervalInMilliSeconds, stepTokens, stepInterval, stepIntervalInMilliseconds) { } protected override void UpdateTokens() { var currentTime = SystemTime.UtcNow.Ticks; if (currentTime >= nextRefillTime) { //set tokens to max tokens = bucketTokenCapacity; //compute next refill time nextRefillTime = currentTime + ticksRefillInterval; return; } //calculate max tokens possible till the end var timeToNextRefill = nextRefillTime - currentTime; var stepsToNextRefill = timeToNextRefill/ticksStepInterval; var maxPossibleTokens = stepsToNextRefill*stepTokens; if ((timeToNextRefill%ticksStepInterval) > 0) maxPossibleTokens += stepTokens; if (maxPossibleTokens < tokens) tokens = maxPossibleTokens; } } }
View Code
StepUpLeakyTokenBucket 空桶:即每次只將stepTokens個數的令牌放到桶中

1 using System; 2 3 namespace CZ.FlowControl.Service 4 { 5 /// <summary> 6 /// 漏桶(空桶) 7 /// </summary> 8 /// <remarks> 9 /// StepUpLeakyTokenBucketStrategy resemembles an empty bucket at the beginning but get filled will tokens over a fixed interval. 10 /// </remarks> 11 class StepUpLeakyTokenBucket : LeakyTokenBucket 12 { 13 private long lastActivityTime; 14 15 public StepUpLeakyTokenBucket(long maxTokens, long refillInterval, int refillIntervalInMilliSeconds, long stepTokens, long stepInterval, int stepIntervalInMilliseconds) 16 : base(maxTokens, refillInterval, refillIntervalInMilliSeconds, stepTokens, stepInterval, stepIntervalInMilliseconds) 17 { 18 } 19 20 protected override void UpdateTokens() 21 { 22 var currentTime = SystemTime.UtcNow.Ticks; 23 24 if (currentTime >= nextRefillTime) 25 { 26 tokens = stepTokens; 27 28 lastActivityTime = currentTime; 29 nextRefillTime = currentTime + ticksRefillInterval; 30 31 return; 32 } 33 34 //calculate tokens at current step 35 36 long elapsedTimeSinceLastActivity = currentTime - lastActivityTime; 37 long elapsedStepsSinceLastActivity = elapsedTimeSinceLastActivity / ticksStepInterval; 38 39 tokens += (elapsedStepsSinceLastActivity*stepTokens); 40 41 if (tokens > bucketTokenCapacity) tokens = bucketTokenCapacity; 42 lastActivityTime = currentTime; 43 } 44 } 45 }
View Code
三、流控服務封裝
第二章節,詳細介紹了令牌桶和漏桶的具體實現。基於以上,要重點介紹介面:IThrottleStrategy:流控的具體方式
using System; namespace CZ.FlowControl.Spi { /// <summary> /// 流量控制演算法策略 /// </summary> public interface IThrottleStrategy { /// <summary> /// 是否流控 /// </summary> /// <param name="n"></param> /// <returns></returns> bool ShouldThrottle(long n = 1); /// <summary> /// 是否流控 /// </summary> /// <param name="n"></param> /// <param name="waitTime"></param> /// <returns></returns> bool ShouldThrottle(long n, out TimeSpan waitTime); /// <summary> /// 是否流控 /// </summary> /// <param name="waitTime"></param> /// <returns></returns> bool ShouldThrottle(out TimeSpan waitTime); /// <summary> /// 當前令牌個數 /// </summary> long CurrentTokenCount { get; } } }
有了這個流控方式介面後,我們還需要一個流控策略定義類:FlowControlStrategy
即定義具體的流控策略:以下是這個類的詳細屬性和成員: 不僅定義了流控策略類型,還定義了流控的維度資訊和流控閾值,這樣流控就做成依賴注入的方式了!
using System; using System.Collections.Generic; using System.Text; namespace CZ.FlowControl.Spi { /// <summary> /// 流控策略 /// </summary> public class FlowControlStrategy { /// <summary> /// 標識 /// </summary> public string ID { get; set; } /// <summary> /// 名稱 /// </summary> public string Name { get; set; } /// <summary> /// 流控策略類型 /// </summary> public FlowControlStrategyType StrategyType { get; set; } /// <summary> /// 流控閾值-Int /// </summary> public long IntThreshold { get; set; } /// <summary> /// 流控閾值-Double /// </summary> public decimal DoubleThreshold { get; set; } /// <summary> /// 時間區間跨度 /// </summary> public FlowControlTimespan TimeSpan { get; set; } private Dictionary<string, string> flowControlConfigs; /// <summary> /// 流控維度資訊 /// </summary> public Dictionary<string, string> FlowControlConfigs { get { if (flowControlConfigs == null) flowControlConfigs = new Dictionary<string, string>(); return flowControlConfigs; } set { flowControlConfigs = value; } } /// <summary> /// 描述 /// </summary> public string Descriptions { get; set; } /// <summary> /// 觸發流控後是否直接拒絕請求 /// </summary> public bool IsRefusedRequest { get; set; } /// <summary> /// 創建時間 /// </summary> public DateTime CreateTime { get; set; } /// <summary> /// 創建人 /// </summary> public string Creator { get; set; } /// <summary> /// 最後修改時間 /// </summary> public DateTime LastModifyTime { get; set; } /// <summary> /// 最後修改人 /// </summary> public string LastModifier { get; set; } } }
同時,流控策略類型,我們抽象了一個枚舉:FlowControlStrategyType
支援3種流控策略:TPS、Sum(指定時間段內請求的次數),Delay延遲
using System; using System.Collections.Generic; using System.Text; namespace CZ.FlowControl.Spi { /// <summary> /// 流控策略類型枚舉 /// </summary> public enum FlowControlStrategyType { /// <summary> /// TPS控制策略 /// </summary> TPS, /// <summary> /// 總數控制策略 /// </summary> Sum, /// <summary> /// 延遲控制策略 /// </summary> Delay } }
面向每種流控策略類型,提供了一個對應的流控器,比如說TPS的流控器
TPSFlowController,內部使用了固定令牌桶演算法
using System; namespace CZ.FlowControl.Service { using CZ.FlowControl.Spi; /// <summary> /// TPS流量控制器 /// </summary> class TPSFlowController : IFlowController { public IThrottleStrategy InnerThrottleStrategy { get; private set; } public FlowControlStrategy FlowControlStrategy { get; private set; } public bool ShouldThrottle(long n, out TimeSpan waitTime) { return InnerThrottleStrategy.ShouldThrottle(n, out waitTime); } public TPSFlowController(FlowControlStrategy strategy) { FlowControlStrategy = strategy; InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, 1, 1000); } } }
Sum(指定時間段內請求的次數)流控器:
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; namespace CZ.FlowControl.Service { using CZ.FlowControl.Spi; /// <summary> /// 一段時間內合計值流量控制器 /// </summary> class SumFlowController : IFlowController { public IThrottleStrategy InnerThrottleStrategy { get; private set; } public FlowControlStrategy FlowControlStrategy { get; private set; } public bool ShouldThrottle(long n, out TimeSpan waitTime) { return InnerThrottleStrategy.ShouldThrottle(n, out waitTime); } public SumFlowController(FlowControlStrategy strategy) { FlowControlStrategy = strategy; var refillInterval = GetTokenBucketRefillInterval(strategy); InnerThrottleStrategy = new FixedTokenBucket(strategy.IntThreshold, refillInterval, 1000); } private long GetTokenBucketRefillInterval(FlowControlStrategy strategy) { long refillInterval = 0; switch (strategy.TimeSpan) { case FlowControlTimespan.Second: refillInterval = 1; break; case FlowControlTimespan.Minute: refillInterval = 60; break; case FlowControlTimespan.Hour: refillInterval = 60 * 60; break; case FlowControlTimespan.Day: refillInterval = 24 * 60 * 60; break; } return refillInterval; } } }
同時,通過一個創建者工廠,根據不同的流控策略,創建對應的流控器(做了一層快取,性能更好):
using System; using System.Collections.Generic; using System.Text; namespace CZ.FlowControl.Service { using CZ.FlowControl.Spi; /// <summary> /// 流控策略工廠 /// </summary> class FlowControllerFactory { private static Dictionary<string, IFlowController> fcControllers; private static object syncObj = new object(); private static FlowControllerFactory instance; private FlowControllerFactory() { fcControllers = new Dictionary<string, IFlowController>(); } public static FlowControllerFactory GetInstance() { if (instance == null) { lock (syncObj) { if (instance == null) { instance = new FlowControllerFactory(); } } } return instance; } public IFlowController GetOrCreateFlowController(FlowControlStrategy strategy) { if (strategy == null) throw new ArgumentNullException("FlowControllerFactory.GetOrCreateFlowController.strategy"); if (!fcControllers.ContainsKey(strategy.ID)) { lock (syncObj) { if (!fcControllers.ContainsKey(strategy.ID)) { var fcController = CreateFlowController(strategy); if (fcController != null) fcControllers.Add(strategy.ID, fcController); } } } if (fcControllers.ContainsKey(strategy.ID)) { var controller = fcControllers[strategy.ID]; return controller; } return null; } private IFlowController CreateFlowController(FlowControlStrategy strategy) { if (strategy == null) throw new ArgumentNullException("FlowControllerFactory.CreateFlowController.strategy"); IFlowController controller = null; switch (strategy.StrategyType) { case FlowControlStrategyType.TPS: controller = new TPSFlowController(strategy); break; case FlowControlStrategyType.Delay: controller = new DelayFlowController(strategy); break; case FlowControlStrategyType.Sum: controller = new SumFlowController(strategy); break; default: break; } return controller; } } }
有了流控策略定義、我們更進一步,繼續封裝了流控Facade服務,這樣把流控的變化封裝到內部。對外只提供流控服務介面,流控時動態傳入流控策略和流控個數:FlowControlService
using System; using System.Collections.Generic; using System.Text; namespace CZ.FlowControl.Service { using CZ.FlowControl.Spi; using System.Threading; /// <summary> /// 統一流控服務 /// </summary> public class FlowControlService { /// <summary> /// 流控 /// </summary> /// <param name="strategy">流控策略</param> /// <param name="count">請求次數</param> public static void FlowControl(FlowControlStrategy strategy, int count = 1) { var controller = FlowControllerFactory.GetInstance().GetOrCreateFlowController(strategy); TimeSpan waitTimespan = TimeSpan.Zero; var result = controller.ShouldThrottle(count, out waitTimespan); if (result) { if (strategy.IsRefusedRequest == false && waitTimespan != TimeSpan.Zero) { WaitForAvailable(strategy, controller, waitTimespan, count); } else if (strategy.IsRefusedRequest) { throw new Exception("觸發流控!"); } } } /// <summary> /// 等待可用 /// </summary> /// <param name="strategy">流控策略</param> /// <param name="controller">流控器</param> /// <param name="waitTimespan">等待時間</param> /// <param name="count">請求次數</param> private static void WaitForAvailable(FlowControlStrategy strategy, IFlowController controller, TimeSpan waitTimespan, int count) { var timespan = waitTimespan; if (strategy.StrategyType == FlowControlStrategyType.Delay) { Thread.Sleep(timespan); return; } while (controller.ShouldThrottle(count, out timespan)) { Thread.Sleep(timespan); } } } }
以上,統一流控服務完成了第一個版本的封裝。接下來我們看示例程式碼
四、示例程式碼
先安裝Nuget:
Install-Package CZ.FlowControl.Service -Version 1.0.0
是不是很簡單。
大家如果希望了解詳細的程式碼,請參考這個項目的GitHub地址:
https://github.com/zhouguoqing/FlowControl
同時也歡迎大家一起改進完善。
周國慶
2019/8/9