ASP.NET Core 3.x 並發限制

  • 2019 年 11 月 12 日
  • 筆記

前言

Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用於傳入的請求進行排隊處理,避免線程池的不足.
我們日常開發中可能常做的給某web服務器配置連接數以及,請求隊列大小,那麼今天我們看看如何在通過中間件形式實現一個並發量以及隊列長度限制.

Queue策略

添加Nuget

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

        public void ConfigureServices(IServiceCollection services)          {              services.AddQueuePolicy(options =>              {                  //最大並發請求數                  options.MaxConcurrentRequests = 2;                  //請求隊列長度限制                  options.RequestQueueLimit = 1;              });              services.AddControllers();          }          public void Configure(IApplicationBuilder app, IWebHostEnvironment env)          {              //添加並發限制中間件              app.UseConcurrencyLimiter();              app.Run(async context =>              {                  Task.Delay(100).Wait(); // 100ms sync-over-async                    await context.Response.WriteAsync("Hello World!");              });              if (env.IsDevelopment())              {                  app.UseDeveloperExceptionPage();              }                app.UseHttpsRedirection();                app.UseRouting();                app.UseAuthorization();                app.UseEndpoints(endpoints =>              {                  endpoints.MapControllers();              });          }     

通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做並發量限制,以及隊列的長度;那麼問題來了,他是怎麼實現的呢?

 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)  {          services.Configure(configure);          services.AddSingleton<IQueuePolicy, QueuePolicy>();          return services;  }

QueuePolicy採用的是SemaphoreSlim信號量設計,SemaphoreSlim、Semaphore(信號量)支持並發多線程進入被保護代碼,對象在初始化時會指定 最大任務數量,當線程請求訪問資源,信號量遞減,而當他們釋放時,信號量計數又遞增。

      /// <summary>          ///     構造方法(初始化Queue策略)          /// </summary>          /// <param name="options"></param>          public QueuePolicy(IOptions<QueuePolicyOptions> options)          {              _maxConcurrentRequests = options.Value.MaxConcurrentRequests;              if (_maxConcurrentRequests <= 0)              {                  throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");              }                _requestQueueLimit = options.Value.RequestQueueLimit;              if (_requestQueueLimit < 0)              {                  throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");              }              //使用SemaphoreSlim來限制任務最大個數              _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);          }  

ConcurrencyLimiterMiddleware中間件

        /// <summary>          /// Invokes the logic of the middleware.          /// </summary>          /// <param name="context">The <see cref="HttpContext"/>.</param>          /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>          public async Task Invoke(HttpContext context)          {              var waitInQueueTask = _queuePolicy.TryEnterAsync();                // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.              bool result;                if (waitInQueueTask.IsCompleted)              {                  ConcurrencyLimiterEventSource.Log.QueueSkipped();                  result = waitInQueueTask.Result;              }              else              {                  using (ConcurrencyLimiterEventSource.Log.QueueTimer())                  {                      result = await waitInQueueTask;                  }              }                if (result)              {                  try                  {                      await _next(context);                  }                  finally                  {                      _queuePolicy.OnExit();                  }              }              else              {                  ConcurrencyLimiterEventSource.Log.RequestRejected();                  ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);                  context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;                  await _onRejected(context);              }          }

每次當我們請求的時候首先會調用_queuePolicy.TryEnterAsync(),進入該方法後先開啟一個私有lock鎖,再接着判斷總請求量是否≥(請求隊列限制的大小+最大並發請求數),如果當前數量超出了,那麼我直接拋出,送你個503狀態;

  if (result)    {           try           {               await _next(context);           }           finally          {              _queuePolicy.OnExit();          }          }          else          {              ConcurrencyLimiterEventSource.Log.RequestRejected();              ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);              context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;              await _onRejected(context);          }  

問題來了,我這邊如果說還沒到你設置的大小呢,我這個請求沒有給你服務器造不成壓力,那麼你給我處理一下吧.
await _serverSemaphore.WaitAsync();異步等待進入信號量,如果沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止

 lock (_totalRequestsLock)      {          if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)          {               return false;          }              TotalRequests++;          }          //異步等待進入信號量,如果沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止          await _serverSemaphore.WaitAsync();          return true;      }

返回成功後那麼中間件這邊再進行處理,_queuePolicy.OnExit();通過該調用進行調用_serverSemaphore.Release();釋放信號燈,再對總請求數遞減

Stack策略

再來看看另一種方法,棧策略,他是怎麼做的呢?一起來看看.再附加上如何使用的代碼.

     public void ConfigureServices(IServiceCollection services)          {              services.AddStackPolicy(options =>              {                  //最大並發請求數                  options.MaxConcurrentRequests = 2;                  //請求隊列長度限制                  options.RequestQueueLimit = 1;              });              services.AddControllers();          }

通過上面的配置,我們便可以對我們的應用程序執行出相應的策略.下面再來看看他是怎麼實現的呢

  public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)          {              services.Configure(configure);              services.AddSingleton<IQueuePolicy, StackPolicy>();              return services;          }  

可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法

        /// <summary>          ///     構造方法(初始化參數)          /// </summary>          /// <param name="options"></param>          public StackPolicy(IOptions<QueuePolicyOptions> options)          {              //棧分配              _buffer = new List<ResettableBooleanCompletionSource>();              //隊列大小              _maxQueueCapacity = options.Value.RequestQueueLimit;              //最大並發請求數              _maxConcurrentRequests = options.Value.MaxConcurrentRequests;              //剩餘可用空間              _freeServerSpots = options.Value.MaxConcurrentRequests;          }

當我們通過中間件請求調用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數,如果_freeServerSpots>0,那麼則直接給我們返回true,讓中間件直接去執行下一步,如果當前隊列=我們設置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留後面的請求;

    public ValueTask<bool> TryEnterAsync()          {              lock (_bufferLock)              {                  if (_freeServerSpots > 0)                  {                      _freeServerSpots--;                      return _trueTask;                  }                  // 如果隊列滿了,取消先前的請求                  if (_queueLength == _maxQueueCapacity)                  {                      _hasReachedCapacity = true;                      _buffer[_head].Complete(false);                      _queueLength--;                  }                  var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);                  _cachedResettableTCS = null;                  if (_hasReachedCapacity || _queueLength < _buffer.Count)                  {                      _buffer[_head] = tcs;                  }                  else                  {                      _buffer.Add(tcs);                  }                  _queueLength++;                  // increment _head for next time                  _head++;                  if (_head == _maxQueueCapacity)                  {                      _head = 0;                  }                  return tcs.GetValueTask();              }          }

當我們請求後調用_queuePolicy.OnExit();出棧,再將請求長度遞減

    public void OnExit()          {              lock (_bufferLock)              {                  if (_queueLength == 0)                  {                      _freeServerSpots++;                        if (_freeServerSpots > _maxConcurrentRequests)                      {                          _freeServerSpots--;                          throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");                      }                        return;                  }                    // step backwards and launch a new task                  if (_head == 0)                  {                      _head = _maxQueueCapacity - 1;                  }                  else                  {                      _head--;                  }                  //退出,設置成已完成                  _buffer[_head].Complete(true);                  _queueLength--;              }          }  

總結

基於棧結構的特點,在實際應用中,通常只會對棧執行以下兩種操作:

  • 向棧中添加元素,此過程被稱為"進棧"(入棧或壓棧);
  • 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);

隊列存儲結構的實現有以下兩種方式:

  • 順序隊列:在順序表的基礎上實現的隊列結構;
  • 鏈隊列:在鏈表的基礎上實現的隊列結構;