Asp.Net Core 7 preview 4 重磅新特性–限流中間件
- 2022 年 5 月 12 日
- 筆記
- .NET Core, aspnetcore, RateLimit, 限流
前言
限流是應對流量暴增或某些用戶惡意攻擊等場景的重要手段之一,然而微軟官方從未支援這一重要特性,AspNetCoreRateLimit
這一第三方庫限流庫一般作為首選使用,然而其配置參數過於繁多,對使用者造成較大的學習成本。令人高興的是,在剛剛發布的.NET 7 Preview 4
中開始支援限流
中間件。
UseRateLimiter嘗鮮
- 安裝
.NET 7.0 SDK(v7.0.100-preview.4)
- 通過nuget包安裝
Microsoft.AspNetCore.RateLimiting
- 創建.Net7網站應用,註冊中間件
全局限流並發1個
app.UseRateLimiter(new RateLimiterOptions
{
Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
{
return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
})
});
根據不同資源不同限制並發數,/api
前綴的資源租約數2,等待隊列長度為2,其他默認租約數1,隊列長度1。
app.UseRateLimiter(new RateLimiterOptions()
{
// 觸發限流的響應碼
DefaultRejectionStatusCode = 500,
OnRejected = async (ctx, rateLimitLease) =>
{
// 觸發限流回調處理
},
Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
{
if (resource.Request.Path.StartsWithSegments("/api"))
{
return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",
_ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
}
else
{
return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
}
})
});
本地測試
- 新建一個webapi項目,並註冊限流中間件如下
using Microsoft.AspNetCore.RateLimiting;
using System.Threading.RateLimiting;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at //aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseRateLimiter(new RateLimiterOptions
{
DefaultRejectionStatusCode = 500,
OnRejected = async (ctx, lease) =>
{
await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter"));
},
Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
{
return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
_ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
})
});
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
- 啟動項目,使用jmeter測試100並發,請求介面
/WeatherForecast
所有請求處理成功,失敗0!
這個結果是不是有點失望,其實RateLimitPartition.CreateConcurrencyLimiter
創建的限流器是
ConcurrencyLimiter
,後續可以實現個各種策略的限流器進行替換之。
看了ConcurrencyLimiter
的實現,其實就是令牌桶
的限流思想,上面配置的new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1))
,第一個1代表令牌的個數,第二個1代表可以當桶里的令牌為空時,進入等待隊列,而不是直接失敗,當前面的請求結束後,會歸還令牌,此時等待的請求就可以拿到令牌了,QueueProcessingOrder.NewestFirst
代表最新的請求優先獲取令牌,也就是獲取令牌時非公平的,還有另一個枚舉值QueueProcessingOrder.OldestFirst
老的優先,獲取令牌是公平的。只要我們獲取到令牌的人幹活速度快,雖然我們令牌只有1,並發就很高。
3. 測試觸發失敗場景
只需要讓我們拿到令牌的人持有時間長點,就能輕易的觸發。
調整jmater並發數為10
相應內容也是我們設置的內容。
ConcurrencyLimiter源碼
令牌桶限流思想
獲取令牌
protected override RateLimitLease AcquireCore(int permitCount)
{
// These amounts of resources can never be acquired
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
}
ThrowIfDisposed();
// Return SuccessfulLease or FailedLease to indicate limiter state
if (permitCount == 0)
{
return _permitCount > 0 ? SuccessfulLease : FailedLease;
}
// Perf: Check SemaphoreSlim implementation instead of locking
if (_permitCount >= permitCount)
{
lock (Lock)
{
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
return lease;
}
}
}
return FailedLease;
}
嘗試獲取令牌核心邏輯
private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
{
ThrowIfDisposed();
// if permitCount is 0 we want to queue it if there are no available permits
if (_permitCount >= permitCount && _permitCount != 0)
{
if (permitCount == 0)
{
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
lease = SuccessfulLease;
return true;
}
// a. if there are no items queued we can lease
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
_idleSince = null;
_permitCount -= permitCount;
Debug.Assert(_permitCount >= 0);
lease = new ConcurrencyLease(true, this, permitCount);
return true;
}
}
lease = null;
return false;
}
令牌獲取失敗後進入等待隊列
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
// These amounts of resources can never be acquired
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
}
// Return SuccessfulLease if requestedCount is 0 and resources are available
if (permitCount == 0 && _permitCount > 0 && !_disposed)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
// Perf: Check SemaphoreSlim implementation instead of locking
lock (Lock)
{
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
return new ValueTask<RateLimitLease>(lease);
}
// Avoid integer overflow by using subtraction instead of addition
Debug.Assert(_options.QueueLimit >= _queueCount);
if (_options.QueueLimit - _queueCount < permitCount)
{
if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
{
// remove oldest items from queue until there is space for the newest request
do
{
RequestRegistration oldestRequest = _queue.DequeueHead();
_queueCount -= oldestRequest.Count;
Debug.Assert(_queueCount >= 0);
if (!oldestRequest.Tcs.TrySetResult(FailedLease))
{
// Updating queue count is handled by the cancellation code
_queueCount += oldestRequest.Count;
}
}
while (_options.QueueLimit - _queueCount < permitCount);
}
else
{
// Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
return new ValueTask<RateLimitLease>(QueueLimitLease);
}
}
CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
CancellationTokenRegistration ctr = default;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(static obj =>
{
((CancelQueueState)obj!).TrySetCanceled();
}, tcs);
}
RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
_queue.EnqueueTail(request);
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit);
return new ValueTask<RateLimitLease>(request.Tcs.Task);
}
}
歸還令牌
private void Release(int releaseCount)
{
lock (Lock)
{
if (_disposed)
{
return;
}
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);
while (_queue.Count > 0)
{
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();
if (_permitCount >= nextPendingRequest.Count)
{
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_permitCount >= 0);
ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
// Check if request was canceled
if (!nextPendingRequest.Tcs.TrySetResult(lease))
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
// Updating queue count is handled by the cancellation code
_queueCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
Debug.Assert(_queueCount >= 0);
}
else
{
break;
}
}
if (_permitCount == _options.PermitLimit)
{
Debug.Assert(_idleSince is null);
Debug.Assert(_queueCount == 0);
_idleSince = Stopwatch.GetTimestamp();
}
}
}
總結
雖然這次官方對限流進行了支援,但貌似還不能支援對ip或client級別的限制支援,對於更高級的限流策略仍需要藉助第三方庫或自己實現,期待後續越來越完善。