使用ABP SignalR重構消息服務(二)
使用ABP SignalR重構消息服務(二)
上篇使用ABP SignalR重構消息服務(一)主要講的是SignalR的基礎知識和前端如何使用SignalR,這段時間也是落實方案設計。這篇我主要講解SignalR源碼(最近我手頭工作比較忙@蟹老闆)。
SignalR源碼分析(原地址,原地址已經停止維護了合併到了AspNetCore)
使用SignalR我們主要是添加
services.AddSignalR();
,添加ChatHub
類繼承我們的Hub ,然後管道注入endpoints.MapHub<ChatHub>("/ChatHub");
通過services.AddSignalR()
可以看到使用的類是SignalRDependencyInjectionExtensions
通過Hub
類可以看到程式集是Microsoft.AspNetCore.SignalR.Core
通過MapHub<ChatHub>
可以看到使用的類是HubEndpointRouteBuilderExtensions
SignalR服務註冊
我們先分析services.AddSignalR()
注入做了什麼準備
這裡我們要講一個東西
Microsoft.AspNetCore.SignalR.Core
類庫有一個SignalRDependencyInjectionExtensions
Microsoft.AspNetCore.SignalR
類庫也存在一個SignalRDependencyInjectionExtensions
Microsoft.AspNetCore.SignalR
類庫中的SignalRDependencyInjectionExtensions
解讀
public static class SignalRDependencyInjectionExtensions
{
// 單獨注入SignalR配置
public static ISignalRServerBuilder AddHubOptions<THub>(this ISignalRServerBuilder signalrBuilder, Action<HubOptions<THub>> configure) where THub : Hub
{
if (signalrBuilder == null)
{
throw new ArgumentNullException(nameof(signalrBuilder));
}
signalrBuilder.Services.AddSingleton<IConfigureOptions<HubOptions<THub>>, HubOptionsSetup<THub>>();
signalrBuilder.Services.Configure(configure);
return signalrBuilder;
}
// 添加SignalR服務
public static ISignalRServerBuilder AddSignalR(this IServiceCollection services)
{
if (services == null)
{
throw new ArgumentNullException(nameof(services));
}
// ConnectionsDependencyInjectionExtensions拓展類 添加請求路由、添加身份驗證、添加Http連接調度程式、添加Http連接管理器
services.AddConnections();
// 禁用WebSocket保持活動,因為SignalR有它自己的
services.Configure<WebSocketOptions>(o => o.KeepAliveInterval = TimeSpan.Zero);
services.TryAddSingleton<SignalRMarkerService>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<HubOptions>, HubOptionsSetup>());
//調用 Microsoft.AspNetCore.SignalR.Core 類庫中的 SignalRDependencyInjectionExtensions
return services.AddSignalRCore();
}
// 添加SignalR服務。注入SignalR配置資訊
public static ISignalRServerBuilder AddSignalR(this IServiceCollection services, Action<HubOptions> configure)
{
if (services == null)
{
throw new ArgumentNullException(nameof(services));
}
var signalrBuilder = services.AddSignalR();
services.Configure(configure);
return signalrBuilder;
}
}
Microsoft.AspNetCore.SignalR.Core
類庫中的SignalRDependencyInjectionExtensions
解讀
這裡面注入了SignalR中核心類,所以下面的程式碼我們一定要仔細研讀了。
public static class SignalRDependencyInjectionExtensions
{
// 將最小的基本SignalR服務添加IServiceCollection 中
public static ISignalRServerBuilder AddSignalRCore(this IServiceCollection services)
{
// 用於標記SignalR是否注入
services.TryAddSingleton<SignalRCoreMarkerService>();
// 注入默認集線器生命周期管理器
services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
// 注入默認集線器協議解析器
services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver));
// 注入集線器上下文
services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
services.TryAddSingleton(typeof(IHubContext<,>), typeof(HubContext<,>));
// 注入集線器中心連接處理程式
services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>));
// 注入獲取用戶唯一標識方法
services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));
// 注入默認中心調度員
services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>));
// 注入默認激活中心
services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>));
// 添加授權
services.AddAuthorization();
var builder = new SignalRServerBuilder(services);
// 添加Protocol轉json
builder.AddJsonProtocol();
return builder;
}
}
SignalR集線器設計
通過Hub
類可以看到程式集是Microsoft.AspNetCore.SignalR.Core
// Hub 是一個抽象類
public abstract class Hub : IDisposable
{
private bool _disposed;
// 客戶端鏈接
private IHubCallerClients _clients = default!;
// 集線器呼叫中心上下文
private HubCallerContext _context = default!;
// 集線器組管理
private IGroupManager _groups = default!;
// 客戶端鏈接(管理所有用戶鏈接)
public IHubCallerClients Clients
{
get
{
CheckDisposed();
return _clients;
}
set
{
CheckDisposed();
_clients = value;
}
}
// 集線器上下文(保存當前用戶鏈接資訊)
public HubCallerContext Context
{
get
{
CheckDisposed();
return _context;
}
set
{
CheckDisposed();
_context = value;
}
}
// 組管理(對於組進行添加或者刪除)
public IGroupManager Groups
{
get
{
CheckDisposed();
return _groups;
}
set
{
CheckDisposed();
_groups = value;
}
}
// 連接方法(用於兼容用戶連接操作)
public virtual Task OnConnectedAsync()
{
return Task.CompletedTask;
}
// 鏈接釋放方法(用於監控用戶下線操作)
public virtual Task OnDisconnectedAsync(Exception? exception)
{
return Task.CompletedTask;
}
protected virtual void Dispose(bool disposing)
{
}
public void Dispose()
{
if (_disposed)
{
return;
}
Dispose(true);
_disposed = true;
}
private void CheckDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().Name);
}
}
}
SignalR中間件
通過MapHubHubEndpointRouteBuilderExtensions
app.UseEndpoints(endpoints =>
{
endpoints.MapHub<ChatHub>("/ChatHub");
});
HubEndpointRouteBuilderExtensions
源程式碼
public static class HubEndpointRouteBuilderExtensions
{
................................
// 註冊集線器
public static HubEndpointConventionBuilder MapHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IEndpointRouteBuilder endpoints, string pattern, Action<HttpConnectionDispatcherOptions>? configureOptions) where THub : Hub
{
// 這個就是我們上面註冊SignalR保留來判斷是否注入
var marker = endpoints.ServiceProvider.GetService<SignalRMarkerService>();
if (marker == null)
{
throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " +
"'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
}
// SignalR配置資訊
var options = new HttpConnectionDispatcherOptions();
configureOptions?.Invoke(options);
// endpoints.MapConnections用來接收第一次連接請求,然後開啟對於協議連接
var conventionBuilder = endpoints.MapConnections(pattern, options, b =>
{
// SignalRConnectionBuilderExtensions拓展類(這裡是一個重點,將我們的泛型集線器連接進行注入,就可以開始它的工作了)
b.UseHub<THub>();
});
....................................
return new HubEndpointConventionBuilder(conventionBuilder);
}
}
SignalRConnectionBuilderExtensions
源程式碼
public static class SignalRConnectionBuilderExtensions
{
public static IConnectionBuilder UseHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IConnectionBuilder connectionBuilder) where THub : Hub
{
var marker = connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService));
if (marker == null)
{
throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " +
"'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
}
// 1.connectionBuilder.UseConnectionHandler拓展方法在 ConnectionBuilderExtensions中
// 2.HubConnectionHandler這個不就是我們注入服務的集線器中心連接處理程式嗎?
return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>();
}
}
ConnectionBuilderExtensions
源程式碼
public static class ConnectionBuilderExtensions
{
// 執行集線器的連接方法,到了這裡就代表本次連接成功了
public static IConnectionBuilder UseConnectionHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler
{
var handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices);
// 這是一個終端中間件,所以沒有必要使用'next'參數
return connectionBuilder.Run(connection => handler.OnConnectedAsync(connection));
}
}
小結
通過services.AddSignalR()
進行SignalR基礎服務進行註冊。
通過Hub
抽象工程,由不同的集線器繼承,定義同一的連接、斷開方法、客戶端連接管理、群組管理、當前上下文資訊。
通過MapHub<ChatHub>
通過中間件路由規則進行流量劃分。
當我們看完上面調用鏈路,腦中是不是已經有了一個清晰的方向了,它是怎麼與前端進行連接的,並且對於注入的服務有一定的了解。
HubConnectionHandler
連接處理
我們已經知道進入中間件之後就會進入HubConnectionHandler.OnConnectedAsync()
方法
public override async Task OnConnectedAsync(ConnectionContext connection)
{
// 我們檢查是否設置了HubOptions<THub>,因為它們優先於全局hub選項。
// 然後將keepAlive和handshakeTimeout值設置為HubOptionsSetup中的默認值,當它們顯式地設置為null時。
var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols;
if (supportedProtocols == null || supportedProtocols.Count == 0)
{
throw new InvalidOperationException("There are no supported protocols");
}
// 默認握手超時15分鐘
var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout;
// 集線器連接配置
var contextOptions = new HubConnectionContextOptions()
{
KeepAliveInterval = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval,
ClientTimeoutInterval = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval,
StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity,
MaximumReceiveMessageSize = _maximumMessageSize,
SystemClock = SystemClock,
MaximumParallelInvocations = _maxParallelInvokes,
};
Log.ConnectedStarting(_logger);
// 創建連接上下文,將用戶資訊添加到上下文中
var connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory);
var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList<string>) ?? supportedProtocols.ToList();
if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors))
{
return;
}
// 已建立connectionContext
try
{
// 默認集線器生命周期管理器(DefaultHubLifetimeManager)將當前用戶添加到連接池中
await _lifetimeManager.OnConnectedAsync(connectionContext);
// 獲取我們對應的集線器,執行OnConnectedAsync()方法,這個時候就真正的開始執行我們寫的程式碼了。
// 裡面有一個消息分配方法DispatchMessagesAsync(),獲取我們交互的資訊進行處理
await RunHubAsync(connectionContext);
}
finally
{
connectionContext.Cleanup();
Log.ConnectedEnding(_logger);
// 當處理消息方法跳出,之後代表當前用戶已經斷開連接了,所以我們需要觸發斷線方法
await _lifetimeManager.OnDisconnectedAsync(connectionContext);
}
}
SignalR非同步分派消息
// 非同步分派消息
private async Task DispatchMessagesAsync(HubConnectionContext connection)
{
var input = connection.Input;
var protocol = connection.Protocol;
connection.BeginClientTimeout();
var binder = new HubConnectionBinder<THub>(_dispatcher, connection);
while (true)
{
var result = await input.ReadAsync();
var buffer = result.Buffer;
try
{
if (result.IsCanceled)
{
break;
}
// 存在消息
if (!buffer.IsEmpty)
{
bool messageReceived = false;
// 沒有消息限制,只是解析和分派
if (_maximumMessageSize == null)
{
while (protocol.TryParseMessage(ref buffer, binder, out var message))
{
connection.StopClientTimeout();
// 我們接收到了消息,停止超時檢查
messageReceived = true;
// 將接收的消息,根據不同的類型進行分發處理
await _dispatcher.DispatchMessageAsync(connection, message);
}
if (messageReceived)
{
// 處理完接收消息之後,開啟超時檢查
connection.BeginClientTimeout();
}
}
else
{
// 我們給解析器一個默認消息大小的滑動窗口
var maxMessageSize = _maximumMessageSize.Value;
while (!buffer.IsEmpty)
{
var segment = buffer;
var overLength = false;
// 切分消息,慢慢進行處理
if (segment.Length > maxMessageSize)
{
segment = segment.Slice(segment.Start, maxMessageSize);
overLength = true;
}
if (protocol.TryParseMessage(ref segment, binder, out var message))
{
connection.StopClientTimeout();
// 我們接收到了消息,停止超時檢查
messageReceived = true;
// 將接收的消息,根據不同的類型進行分發處理
await _dispatcher.DispatchMessageAsync(connection, message);
}
else if (overLength)
{
throw new InvalidDataException($"The maximum message size of {maxMessageSize}B was exceeded. The message size can be configured in AddHubOptions.");
}
else
{
// No need to update the buffer since we didn't parse anything
break;
}
// Update the buffer to the remaining segment
buffer = buffer.Slice(segment.Start);
}
if (messageReceived)
{
connection.BeginClientTimeout();
}
}
}
if (result.IsCompleted)
{
if (!buffer.IsEmpty)
{
throw new InvalidDataException("Connection terminated while reading a message.");
}
break;
}
}
finally
{
// 緩衝區被分割到它被消耗的地方,所以我們可以直接開始。 我們把檢查標記為緩衝。 結束,如果我們沒有收到完整的幀,我們將等待更多的數據 再讀一遍之前。
input.AdvanceTo(buffer.Start, buffer.End);
}
}
SignalR針對用戶發送消息
針對於群發消息,我們知道有一個組的容器,我們只要將大家添加到一個組中就可以了,那麼我們想根據用戶發送消息1:1的模式,SignalR源碼中是怎麼處理的呢?
在註冊SignalR服務中我們可以看到這個services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));
public class DefaultUserIdProvider : IUserIdProvider
{
// 獲取當前用戶標識
public virtual string? GetUserId(HubConnectionContext connection)
{
// 這個也就是為什麼我們在不做任何處理之下想使用SignalR用戶模式,需要在Jwt中添加一個ClaimTypes.NameIdentifier了
return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value;
}
}
我們只需要自己定義一個實現類,將模式實現替換掉就可以了。
// 用戶模式發送源碼
public override Task SendUserAsync(string userId, string methodName, object?[] args, CancellationToken cancellationToken = default)
{
// connection.UserIdentifier就是執行了GetUserId()方法獲取的用戶標識
return SendToAllConnections(methodName, args, (connection, state) => string.Equals(connection.UserIdentifier, (string)state!, StringComparison.Ordinal), userId, cancellationToken);
}
SignalR項目使用設計
上面我們已經講完,SignalR從連接==>處理消息以及用戶模式的源碼設計,相信小夥伴已經腦海中已經漿糊了,那麼就開始項目中實踐方式
我主要負責提供基礎SignalR庫,給到不同的部門進行使用,所以我首先需要考慮到一個高內聚,低耦合的設計,這裡我首先不能摻雜業務邏輯,但是又需要所有業務聚合到我這邊,然後通過不同的業務進行不同的處理。
設計思路:
- 定義兩個介面
IReceiveMessage
和ISendMessage
,介面中分別有MessageType
屬性,HandlerAsync(input)
方法 - 定義一個公用的集線器注入
IEnumerable<IReceiveMessage>
和IEnumerable<ISendMessage>
添加Receive(input)
和Send(input)
方法通過不同的入參中的MessageType
屬性,從注入集合中獲取對應的消息實現進行處理
集線器偽程式碼
public class SignalRHub : Hub
{
private readonly IEnumerable<IReceiveMessage> _receiveMessages;
private readonly IEnumerable<ISendMessage> _sendMessages;
public SignalRHub(IEnumerable<IReceiveMessage> receiveMessages,
IEnumerable<ISendMessage> sendMessages)
{
_receiveMessages = receiveMessages;
_sendMessages = sendMessages;
}
public async Task Receive(SignalRReceiveMessage input)
{
await _receiveMessages.FirstOrDefault(x => string.Compare(x.MessageType, input.MessageType, true) == 0).HandlerAsync(input);
}
public async Task Send(SignalRSendMessage outInput)
{
await _sendMessages.FirstOrDefault(x => string.Compare(x.MessageType, outInput.MessageType, true) == 0).HandlerAsync(outInput);
}
}
業務實現示例
public class NotificationSendMessage : ISendMessage, ISingletonDependency
{
public string MessageType
{
get => SignalRSendMessageEnum.Notification.ToString();
}
public Task HandlerAsync(SignalRSendMessage message)
{
//.......業務邏輯......
}
}
這樣我就只需要接收消息,進行轉發給對應實現就可以了,我給同事提供了SignalR服務,又不干涉他們的業務。