ASP.NET Core 中間件(Middleware)(一)
本文主要目標:記錄Middleware的運行原理流程,並繪製流程圖。
目錄結構:
1、運行環境
2、Demo實踐
3、源碼追蹤
4、AspnetCore內置middleware
一、運行環境
Visual Studio Community 2019 版本 16.8.5
.Net Sdk Version: 5.0.103
二、Demo實踐
講解或學習一個東西的時候,最方便的方式是先寫一個Demo。基於此,我寫以一個中間件的記錄請求輸出的實踐Demo來理解Middleware。
實體:
public class RequestResponseLog
{
public string Id { get; set; }
public DateTime CreateTime { get; set; }
public string RequestJson { get; set; }
public string ResponseJson { get; set; }
}
public class Student
{
public string Id { get; set; }
public string Name { get; set; }
/// <summary>
/// 學校
/// </summary>
public string School { get; set; }
/// <summary>
/// 班級
/// </summary>
public string Class { get; set; }
/// <summary>
/// 年級
/// </summary>
public string Grade { get; set; }
}
Controller:用於接收請求
[Route("api/[controller]")]
[ApiController]
public class StudentController : Controller
{
[HttpGet("GetStudent")]
public IActionResult GetStudent()
{
var student = new Student()
{
Id = Guid.NewGuid().ToString(),
Class = "321",
Grade = "23",
Name = "Name001",
School = "School002"
};
return Ok(student);
}
}
Middleware 中間件(記錄Request和Response):
public class RequestResponseLoggingMiddleware
{
private RequestDelegate _next;
public RequestResponseLoggingMiddleware(RequestDelegate next)
{
this._next = next;
}
/// <summary>
///
/// </summary>
/// <param name="httpContext"></param>
/// <returns></returns>
public async Task Invoke(HttpContext context)
{
//First, get the incoming request
var request = await FormatRequest(context.Request);
var body = context.Response.Body;
//Copy a pointer to the original response body stream
var originalBodyStream = context.Response.Body;
//Create a new memory stream...
using (var responseBody = new MemoryStream())
{
//...and use that for the temporary response body
context.Response.Body = responseBody;
//Continue down the Middleware pipeline, eventually returning to this class
await _next(context);
//Format the response from the server
var response = await FormatResponse(context.Response);
//TODO: Save log to chosen datastore,臨時使用
DemoQueueBlock<RequestResponseLog>.Add(new RequestResponseLog()
{
Id=Guid.NewGuid().ToString(),
CreateTime = DateTime.Now,
ResponseJson = response,
RequestJson = request
});
//Copy the contents of the new memory stream (which contains the response) to the original stream, which is then returned to the client.
await responseBody.CopyToAsync(originalBodyStream);
}
}
為了防止實時存儲資料庫壓力過大,倉儲部分用了BlockingCollection實現的簡易隊列。
blockingcollection-1.getconsumingenumerable
public static void Consume(Action<T> func)
{
Task.Factory.StartNew(() =>
{
foreach (var item in Colls.GetConsumingEnumerable())
{
func(item);
Console.WriteLine(string.Format("---------------: {0}", item));
}
});
}
消費隊列時入庫:
public class DemoConsume
{
private readonly MysqlDbContext _dbContext;
public DemoConsume(MysqlDbContext dbContext)
{
_dbContext = dbContext;
}
public bool Consume()
{
DemoQueueBlock<RequestResponseLog>.Consume(async (log)=> {
await _dbContext.AddAsync(log);
await _dbContext.SaveChangesAsync();
});
return true;
}
}
StartUp文件AddConsume和
app.UseMiddleware
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
var connection = Configuration.GetConnectionString("MysqlConnection");
services.AddDbContext<MysqlDbContext>(options => options.UseMySQL(connection),ServiceLifetime.Scoped);
services.AddConsume();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
app.UseRouting();
app.UseAuthorization();
app.UseMiddleware<RequestResponseLoggingMiddleware>();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
Sql語句:
CREATE TABLE `request_response_log` (
`id` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
`request_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL,
`response_json` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
運行程式效果:
可以看到該Demo提供了一個記錄Http請求和輸出日誌的功能。
這裡面和Middleware有關的功能為:
1、定義了RequestResponseLoggingMiddleware類
RequestDelegate向下轉發請求,
Invoke方法
2、StartUp的app.UseMiddleware
這些方法具體怎麼流轉運行的呢?我們來搜一下源碼可以確認下。
三、源碼跟蹤
所以我們可以看下UseMiddlewareExtensions
public static class UseMiddlewareExtensions
{
internal const string InvokeMethodName = "Invoke";
internal const string InvokeAsyncMethodName = "InvokeAsync";
/// <summary>
/// Adds a middleware type to the application's request pipeline.
/// </summary>
/// <param name="app">The <see cref="IApplicationBuilder"/> instance.</param>
/// <param name="middleware">The middleware type.</param>
/// <param name="args">The arguments to pass to the middleware type instance's constructor.</param>
/// <returns>The <see cref="IApplicationBuilder"/> instance.</returns>
public static IApplicationBuilder UseMiddleware(this IApplicationBuilder app, [DynamicallyAccessedMembers(MiddlewareAccessibility)] Type middleware, params object?[] args)
{
if (typeof(IMiddleware).IsAssignableFrom(middleware))
{
// IMiddleware doesn't support passing args directly since it's
// activated from the container
if (args.Length > 0)
{
throw new NotSupportedException(Resources.FormatException_UseMiddlewareExplicitArgumentsNotSupported(typeof(IMiddleware)));
}
return UseMiddlewareInterface(app, middleware);
}
var applicationServices = app.ApplicationServices;
return app.Use(next =>
{
var methods = middleware.GetMethods(BindingFlags.Instance | BindingFlags.Public);
var invokeMethods = methods.Where(m =>
string.Equals(m.Name, InvokeMethodName, StringComparison.Ordinal)
|| string.Equals(m.Name, InvokeAsyncMethodName, StringComparison.Ordinal)
).ToArray();
if (invokeMethods.Length > 1)
{
throw new InvalidOperationException(Resources.FormatException_UseMiddleMutlipleInvokes(InvokeMethodName, InvokeAsyncMethodName));
}
if (invokeMethods.Length == 0)
{
throw new InvalidOperationException(Resources.FormatException_UseMiddlewareNoInvokeMethod(InvokeMethodName, InvokeAsyncMethodName, middleware));
}
var methodInfo = invokeMethods[0];
if (!typeof(Task).IsAssignableFrom(methodInfo.ReturnType))
{
throw new InvalidOperationException(Resources.FormatException_UseMiddlewareNonTaskReturnType(InvokeMethodName, InvokeAsyncMethodName, nameof(Task)));
}
var parameters = methodInfo.GetParameters();
if (parameters.Length == 0 || parameters[0].ParameterType != typeof(HttpContext))
{
throw new InvalidOperationException(Resources.FormatException_UseMiddlewareNoParameters(InvokeMethodName, InvokeAsyncMethodName, nameof(HttpContext)));
}
var ctorArgs = new object[args.Length + 1];
ctorArgs[0] = next;
Array.Copy(args, 0, ctorArgs, 1, args.Length);
var instance = ActivatorUtilities.CreateInstance(app.ApplicationServices, middleware, ctorArgs);
if (parameters.Length == 1)
{
return (RequestDelegate)methodInfo.CreateDelegate(typeof(RequestDelegate), instance);
}
var factory = Compile<object>(methodInfo, parameters);
return context =>
{
var serviceProvider = context.RequestServices ?? applicationServices;
if (serviceProvider == null)
{
throw new InvalidOperationException(Resources.FormatException_UseMiddlewareIServiceProviderNotAvailable(nameof(IServiceProvider)));
}
return factory(instance, context, serviceProvider);
};
});
}
這裡面用了
UseMiddleware
UseMiddleware(type TMiddleware)
進行了如下判斷:
1、如果TMiddleware是繼承了IMiddleware,則執行UseMiddlewareInterface方法。利用IMiddlewareFactory提供中間件的工廠創建方式,Microsoft.AspNetCore.Http提供了IMiddlewareFactory的默認實現MiddlewareFactory。
return app.Use(next =>
{
return async context =>
{
var middlewareFactory = (IMiddlewareFactory?)context.RequestServices.GetService(typeof(IMiddlewareFactory));
if (middlewareFactory == null)
{
// No middleware factory
throw new InvalidOperationException(Resources.FormatException_UseMiddlewareNoMiddlewareFactory(typeof(IMiddlewareFactory)));
}
var middleware = middlewareFactory.Create(middlewareType);
if (middleware == null)
{
// The factory returned null, it's a broken implementation
throw new InvalidOperationException(Resources.FormatException_UseMiddlewareUnableToCreateMiddleware(middlewareFactory.GetType(), middlewareType));
}
try
{
await middleware.InvokeAsync(context, next);
}
finally
{
middlewareFactory.Release(middleware);
}
};
});
2、如果沒有繼承Middleware,則執行以下操作:
1、根據Invoke或InvokeAsync查找方法
2、驗證只存在一個方法
3、驗證返回類型為Task
4、驗證第一個參數必須是HttpContext
5、ActivatorUtilities.CreateInstance 創建實例
6、如果只有一個參數,返回一個RequestDelegate類型的方法委託?
7、多個參數繼續執行如下操作。Compile方法和參數。
var factory = Compile
return factory(instance, context, serviceProvider);
};
8、Compile演示了Lamuda表達式的編譯方式,以後可作參考
private static Func<T, HttpContext, IServiceProvider, Task> Compile<T>(MethodInfo methodInfo, ParameterInfo[] parameters)
{
// If we call something like
//
// public class Middleware
// {
// public Task Invoke(HttpContext context, ILoggerFactory loggerFactory)
// {
//
// }
// }
//
// We'll end up with something like this:
// Generic version:
//
// Task Invoke(Middleware instance, HttpContext httpContext, IServiceProvider provider)
// {
// return instance.Invoke(httpContext, (ILoggerFactory)UseMiddlewareExtensions.GetService(provider, typeof(ILoggerFactory));
// }
// Non generic version:
//
// Task Invoke(object instance, HttpContext httpContext, IServiceProvider provider)
// {
// return ((Middleware)instance).Invoke(httpContext, (ILoggerFactory)UseMiddlewareExtensions.GetService(provider, typeof(ILoggerFactory));
// }
var middleware = typeof(T);
var httpContextArg = Expression.Parameter(typeof(HttpContext), "httpContext");
var providerArg = Expression.Parameter(typeof(IServiceProvider), "serviceProvider");
var instanceArg = Expression.Parameter(middleware, "middleware");
var methodArguments = new Expression[parameters.Length];
methodArguments[0] = httpContextArg;
for (int i = 1; i < parameters.Length; i++)
{
var parameterType = parameters[i].ParameterType;
if (parameterType.IsByRef)
{
throw new NotSupportedException(Resources.FormatException_InvokeDoesNotSupportRefOrOutParams(InvokeMethodName));
}
var parameterTypeExpression = new Expression[]
{
providerArg,
Expression.Constant(parameterType, typeof(Type)),
Expression.Constant(methodInfo.DeclaringType, typeof(Type))
};
var getServiceCall = Expression.Call(GetServiceInfo, parameterTypeExpression);
methodArguments[i] = Expression.Convert(getServiceCall, parameterType);
}
Expression middlewareInstanceArg = instanceArg;
if (methodInfo.DeclaringType != null && methodInfo.DeclaringType != typeof(T))
{
middlewareInstanceArg = Expression.Convert(middlewareInstanceArg, methodInfo.DeclaringType);
}
var body = Expression.Call(middlewareInstanceArg, methodInfo, methodArguments);
var lambda = Expression.Lambda<Func<T, HttpContext, IServiceProvider, Task>>(body, instanceArg, httpContextArg, providerArg);
return lambda.Compile();
}
從上面我們可以看到這個擴展方法主要做了兩件事:
判斷是IMiddleware,然後採用不同的處理方式。
文章剛開始我們已經實踐了非繼承的模式,下面我們來實踐下繼承IMiddleware的模式。
public class TestMiddleware : IMiddleware
{
public async Task InvokeAsync(HttpContext context, RequestDelegate next)
{
Console.WriteLine("TestMiddleware");
await next(context);
// throw new NotImplementedException();
}
}
StartUp
(由於
MiddlewareFactory通過_serviceProvider.GetRequiredService(middlewareType) as IMiddleware獲取中間件,所以需要在ConfigureServices裡面注入TestMiddleware,不然會報錯):
public void ConfigureServices(IServiceCollection services)
{
services.AddSingleton<TestMiddleware>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseMiddleware<TestMiddleware>();
}
效果如下:
以上搜查暫時告一段落。
但裡面還有個IApplicationBuilder的use方式尚沒有看到使用方式,還需要繼續探查。
IApplicationBuilder介面:
定義一個用於配置應用程式請求管道的類
public interface IApplicationBuilder
{
/// <summary>
/// Gets or sets the <see cref="IServiceProvider"/> that provides access to the application's service container.
/// </summary>
IServiceProvider ApplicationServices { get; set; }
/// <summary>
/// Gets the set of HTTP features the application's server provides.
/// </summary>
IFeatureCollection ServerFeatures { get; }
/// <summary>
/// Gets a key/value collection that can be used to share data between middleware.
/// </summary>
IDictionary<string, object?> Properties { get; }
/// <summary>
/// Adds a middleware delegate to the application's request pipeline.
/// </summary>
/// <param name="middleware">The middleware delegate.</param>
/// <returns>The <see cref="IApplicationBuilder"/>.</returns>
IApplicationBuilder Use(Func<RequestDelegate, RequestDelegate> middleware);
/// <summary>
/// Creates a new <see cref="IApplicationBuilder"/> that shares the <see cref="Properties"/> of this
/// <see cref="IApplicationBuilder"/>.
/// </summary>
/// <returns>The new <see cref="IApplicationBuilder"/>.</returns>
IApplicationBuilder New();
/// <summary>
/// Builds the delegate used by this application to process HTTP requests.
/// </summary>
/// <returns>The request handling delegate.</returns>
RequestDelegate Build();
}
通過查看引用,我們可以看到提供了以下擴展:AspNetCore.Http.Abstractions\Extension
圖片
通過翻看源碼,可以看出這些擴展都是調用的IApplicationBuilder的use,我們只需要繼續關注這個Use就行了。通過繼續追溯源碼,可以搜到IApplicationBuilderFactory的默認實現ApplicationBuilderFactory,它是一個創建ApplicationBuilder的工廠類。
public class ApplicationBuilderFactory : IApplicationBuilderFactory
{
private readonly IServiceProvider _serviceProvider;
/// <summary>
/// Initialize a new factory instance with an <see cref="IServiceProvider" />.
/// </summary>
/// <param name="serviceProvider">The <see cref="IServiceProvider"/> used to resolve dependencies and initialize components.</param>
public ApplicationBuilderFactory(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
/// <summary>
/// Create an <see cref="IApplicationBuilder" /> builder given a <paramref name="serverFeatures" />.
/// </summary>
/// <param name="serverFeatures">An <see cref="IFeatureCollection"/> of HTTP features.</param>
/// <returns>An <see cref="IApplicationBuilder"/> configured with <paramref name="serverFeatures"/>.</returns>
public IApplicationBuilder CreateBuilder(IFeatureCollection serverFeatures)
{
return new ApplicationBuilder(_serviceProvider, serverFeatures);
}
}
關注一下 ApplicationBuilder的重點部分:
public class ApplicationBuilder : IApplicationBuilder
{
private const string ServerFeaturesKey = "server.Features";
private const string ApplicationServicesKey = "application.Services";
private readonly List<Func<RequestDelegate, RequestDelegate>> _components = new();
/// <summary>
/// Initializes a new instance of <see cref="ApplicationBuilder"/>.
/// </summary>
/// <param name="serviceProvider">The <see cref="IServiceProvider"/> for application services.</param>
public ApplicationBuilder(IServiceProvider serviceProvider)
{
Properties = new Dictionary<string, object?>(StringComparer.Ordinal);
ApplicationServices = serviceProvider;
}
/// <summary>
/// Initializes a new instance of <see cref="ApplicationBuilder"/>.
/// </summary>
/// <param name="serviceProvider">The <see cref="IServiceProvider"/> for application services.</param>
/// <param name="server">The server instance that hosts the application.</param>
public ApplicationBuilder(IServiceProvider serviceProvider, object server)
: this(serviceProvider)
{
SetProperty(ServerFeaturesKey, server);
}
private ApplicationBuilder(ApplicationBuilder builder)
{
Properties = new CopyOnWriteDictionary<string, object?>(builder.Properties, StringComparer.Ordinal);
}
/// <summary>
/// Gets the <see cref="IServiceProvider"/> for application services.
/// </summary>
public IServiceProvider ApplicationServices
{
get
{
return GetProperty<IServiceProvider>(ApplicationServicesKey)!;
}
set
{
SetProperty<IServiceProvider>(ApplicationServicesKey, value);
}
}
/// <summary>
/// Gets the <see cref="IFeatureCollection"/> for server features.
/// </summary>
public IFeatureCollection ServerFeatures
{
get
{
return GetProperty<IFeatureCollection>(ServerFeaturesKey)!;
}
}
/// <summary>
/// Gets a set of properties for <see cref="ApplicationBuilder"/>.
/// </summary>
public IDictionary<string, object?> Properties { get; }
private T? GetProperty<T>(string key)
{
return Properties.TryGetValue(key, out var value) ? (T?)value : default(T);
}
private void SetProperty<T>(string key, T value)
{
Properties[key] = value;
}
/// <summary>
/// Adds the middleware to the application request pipeline.
/// </summary>
/// <param name="middleware">The middleware.</param>
/// <returns>An instance of <see cref="IApplicationBuilder"/> after the operation has completed.</returns>
public IApplicationBuilder Use(Func<RequestDelegate, RequestDelegate> middleware)
{
_components.Add(middleware);
return this;
}
/// <summary>
/// Produces a <see cref="RequestDelegate"/> that executes added middlewares.
/// </summary>
/// <returns>The <see cref="RequestDelegate"/>.</returns>
public RequestDelegate Build()
{
RequestDelegate app = context =>
{
// If we reach the end of the pipeline, but we have an endpoint, then something unexpected has happened.
// This could happen if user code sets an endpoint, but they forgot to add the UseEndpoint middleware.
var endpoint = context.GetEndpoint();
var endpointRequestDelegate = endpoint?.RequestDelegate;
if (endpointRequestDelegate != null)
{
var message =
$"The request reached the end of the pipeline without executing the endpoint: '{endpoint!.DisplayName}'. " +
$"Please register the EndpointMiddleware using '{nameof(IApplicationBuilder)}.UseEndpoints(...)' if using " +
$"routing.";
throw new InvalidOperationException(message);
}
context.Response.StatusCode = StatusCodes.Status404NotFound;
return Task.CompletedTask;
};
for (var c = _components.Count - 1; c >= 0; c--)
{
app = _components[c](app);
}
return app;
}
}
從上面源碼的實現來看Use的作用僅僅是將一個中間件添加到List<Func<RequestDelegate, RequestDelegate>> _components裡面,換句話來講就是將一個RequestDelegate的委託放到一個list裡面。
流程圖如下:
圖片
四、Asp.netCore內置Middleware舉例:
以ConcurrencyLimiterMiddleware為例,傳入的請求進行排隊處理,避免執行緒池的不足.
public class ConcurrencyLimiterMiddleware
{
private readonly IQueuePolicy _queuePolicy;
private readonly RequestDelegate _next;
private readonly RequestDelegate _onRejected;
private readonly ILogger _logger;
/// <summary>
/// Creates a new <see cref="ConcurrencyLimiterMiddleware"/>.
/// </summary>
/// <param name="next">The <see cref="RequestDelegate"/> representing the next middleware in the pipeline.</param>
/// <param name="loggerFactory">The <see cref="ILoggerFactory"/> used for logging.</param>
/// <param name="queue">The queueing strategy to use for the server.</param>
/// <param name="options">The options for the middleware, currently containing the 'OnRejected' callback.</param>
public ConcurrencyLimiterMiddleware(RequestDelegate next, ILoggerFactory loggerFactory, IQueuePolicy queue, IOptions<ConcurrencyLimiterOptions> options)
{
if (options.Value.OnRejected == null)
{
throw new ArgumentException("The value of 'options.OnRejected' must not be null.", nameof(options));
}
_next = next;
_logger = loggerFactory.CreateLogger<ConcurrencyLimiterMiddleware>();
_onRejected = options.Value.OnRejected;
_queuePolicy = queue;
}
/// <summary>
/// Invokes the logic of the middleware.
/// </summary>
/// <param name="context">The <see cref="HttpContext"/>.</param>
/// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
public async Task Invoke(HttpContext context)
{
var waitInQueueTask = _queuePolicy.TryEnterAsync();
// Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
bool result;
if (waitInQueueTask.IsCompleted)
{
ConcurrencyLimiterEventSource.Log.QueueSkipped();
result = waitInQueueTask.Result;
}
else
{
using (ConcurrencyLimiterEventSource.Log.QueueTimer())
{
result = await waitInQueueTask;
}
}
if (result)
{
try
{
await _next(context);
}
finally
{
_queuePolicy.OnExit();
}
}
else
{
ConcurrencyLimiterEventSource.Log.RequestRejected();
ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
await _onRejected(context);
}
}
需要注意的是有兩個:
1、IQueuePolicy,asp.netCore內置了兩種實現QueuePolicy和StackPolicy,這裡就不貼程式碼了,主要是關於堆和棧的實現邏輯。
2、ConcurrencyLimiterOptions
QueuePolicyServiceCollectionExtensions
public static class QueuePolicyServiceCollectionExtensions
{
/// <summary>
/// Tells <see cref="ConcurrencyLimiterMiddleware"/> to use a FIFO queue as its queueing strategy.
/// </summary>
/// <param name="services">The <see cref="IServiceCollection"/> to add services to.</param>
/// <param name="configure">Set the options used by the queue.
/// Mandatory, since <see cref="QueuePolicyOptions.MaxConcurrentRequests"></see> must be provided.</param>
/// <returns></returns>
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
services.Configure(configure);
services.AddSingleton<IQueuePolicy, QueuePolicy>();
return services;
}
/// <summary>
/// Tells <see cref="ConcurrencyLimiterMiddleware"/> to use a LIFO stack as its queueing strategy.
/// </summary>
/// <param name="services">The <see cref="IServiceCollection"/> to add services to.</param>
/// <param name="configure">Set the options used by the queue.
/// Mandatory, since <see cref="QueuePolicyOptions.MaxConcurrentRequests"></see> must be provided.</param>
/// <returns></returns>
public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
services.Configure(configure);
services.AddSingleton<IQueuePolicy, StackPolicy>();
return services;
}
}
public class QueuePolicyOptions
{
/// <summary>
/// Maximum number of concurrent requests. Any extras will be queued on the server.
/// This option is highly application dependant, and must be configured by the application.
/// </summary>
public int MaxConcurrentRequests { get; set; }
/// <summary>
/// Maximum number of queued requests before the server starts rejecting connections with '503 Service Unavailable'.
/// This option is highly application dependant, and must be configured by the application.
/// </summary>
public int RequestQueueLimit { get; set; }
}
通過源碼可以大概看出使用方式了吧,這裡就不做實踐了。
今天的分享到此結束,謝謝觀看。
由於排版問題,原文請參考://mp.weixin.qq.com/s/nm8Pa-q3oOInX0LIw9swNA