使用ABP SignalR重構消息服務(二)

使用ABP SignalR重構消息服務(二)

上篇使用ABP SignalR重構消息服務(一)主要講的是SignalR的基礎知識和前端如何使用SignalR,這段時間也是落實方案設計。這篇我主要講解SignalR源碼(最近我手頭工作比較忙@蟹老闆)。

SignalR源碼分析(原地址,原地址已經停止維護了合併到了AspNetCore

使用SignalR我們主要是添加services.AddSignalR();,添加ChatHub類繼承我們的Hub ,然後管道注入endpoints.MapHub<ChatHub>("/ChatHub");
通過services.AddSignalR()可以看到使用的類是SignalRDependencyInjectionExtensions
通過Hub類可以看到程式集是Microsoft.AspNetCore.SignalR.Core
通過MapHub<ChatHub>可以看到使用的類是HubEndpointRouteBuilderExtensions

SignalR服務註冊

我們先分析services.AddSignalR()注入做了什麼準備

這裡我們要講一個東西Microsoft.AspNetCore.SignalR.Core類庫有一個SignalRDependencyInjectionExtensions
Microsoft.AspNetCore.SignalR類庫也存在一個SignalRDependencyInjectionExtensions

Microsoft.AspNetCore.SignalR類庫中的SignalRDependencyInjectionExtensions解讀

public static class SignalRDependencyInjectionExtensions
{
    // 單獨注入SignalR配置
    public static ISignalRServerBuilder AddHubOptions<THub>(this ISignalRServerBuilder signalrBuilder, Action<HubOptions<THub>> configure) where THub : Hub
    {
        if (signalrBuilder == null)
        {
            throw new ArgumentNullException(nameof(signalrBuilder));
        }

        signalrBuilder.Services.AddSingleton<IConfigureOptions<HubOptions<THub>>, HubOptionsSetup<THub>>();
        signalrBuilder.Services.Configure(configure);
        return signalrBuilder;
    }

    //  添加SignalR服務
    public static ISignalRServerBuilder AddSignalR(this IServiceCollection services)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }
        // ConnectionsDependencyInjectionExtensions拓展類 添加請求路由、添加身份驗證、添加Http連接調度程式、添加Http連接管理器
        services.AddConnections();
        // 禁用WebSocket保持活動,因為SignalR有它自己的
        services.Configure<WebSocketOptions>(o => o.KeepAliveInterval = TimeSpan.Zero);
        services.TryAddSingleton<SignalRMarkerService>();
        services.TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<HubOptions>, HubOptionsSetup>());
        //調用 Microsoft.AspNetCore.SignalR.Core 類庫中的 SignalRDependencyInjectionExtensions
        return services.AddSignalRCore();
    }

    // 添加SignalR服務。注入SignalR配置資訊
    public static ISignalRServerBuilder AddSignalR(this IServiceCollection services, Action<HubOptions> configure)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }

        var signalrBuilder = services.AddSignalR();
        services.Configure(configure);
        return signalrBuilder;
    }
}

Microsoft.AspNetCore.SignalR.Core類庫中的SignalRDependencyInjectionExtensions解讀
這裡面注入了SignalR中核心類,所以下面的程式碼我們一定要仔細研讀了。

public static class SignalRDependencyInjectionExtensions
{    
    // 將最小的基本SignalR服務添加IServiceCollection 中
    public static ISignalRServerBuilder AddSignalRCore(this IServiceCollection services)
    {
        // 用於標記SignalR是否注入
        services.TryAddSingleton<SignalRCoreMarkerService>();
        // 注入默認集線器生命周期管理器
        services.TryAddSingleton(typeof(HubLifetimeManager<>), typeof(DefaultHubLifetimeManager<>));
        // 注入默認集線器協議解析器
        services.TryAddSingleton(typeof(IHubProtocolResolver), typeof(DefaultHubProtocolResolver));
        // 注入集線器上下文
        services.TryAddSingleton(typeof(IHubContext<>), typeof(HubContext<>));
        services.TryAddSingleton(typeof(IHubContext<,>), typeof(HubContext<,>));
        // 注入集線器中心連接處理程式
        services.TryAddSingleton(typeof(HubConnectionHandler<>), typeof(HubConnectionHandler<>));
        // 注入獲取用戶唯一標識方法
        services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));
        // 注入默認中心調度員
        services.TryAddSingleton(typeof(HubDispatcher<>), typeof(DefaultHubDispatcher<>));
        // 注入默認激活中心
        services.TryAddScoped(typeof(IHubActivator<>), typeof(DefaultHubActivator<>));
        // 添加授權
        services.AddAuthorization();

        var builder = new SignalRServerBuilder(services);
        // 添加Protocol轉json
        builder.AddJsonProtocol();
        return builder;
    }
}

SignalR集線器設計

通過Hub類可以看到程式集是Microsoft.AspNetCore.SignalR.Core

// Hub 是一個抽象類
public abstract class Hub : IDisposable
{
    private bool _disposed;
    // 客戶端鏈接
    private IHubCallerClients _clients = default!;
    // 集線器呼叫中心上下文
    private HubCallerContext _context = default!;
    // 集線器組管理
    private IGroupManager _groups = default!;
    // 客戶端鏈接(管理所有用戶鏈接)
    public IHubCallerClients Clients
    {
        get
        {
            CheckDisposed();
            return _clients;
        }
        set
        {
            CheckDisposed();
            _clients = value;
        }
    }
    // 集線器上下文(保存當前用戶鏈接資訊)
    public HubCallerContext Context
    {
        get
        {
            CheckDisposed();
            return _context;
        }
        set
        {
            CheckDisposed();
            _context = value;
        }
    }
    // 組管理(對於組進行添加或者刪除)
    public IGroupManager Groups
    {
        get
        {
            CheckDisposed();
            return _groups;
        }
        set
        {
            CheckDisposed();
            _groups = value;
        }
    }

    // 連接方法(用於兼容用戶連接操作)
    public virtual Task OnConnectedAsync()
    {
        return Task.CompletedTask;
    }

    // 鏈接釋放方法(用於監控用戶下線操作)
    public virtual Task OnDisconnectedAsync(Exception? exception)
    {
        return Task.CompletedTask;
    }

    protected virtual void Dispose(bool disposing)
    {
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        Dispose(true);

        _disposed = true;
    }

    private void CheckDisposed()
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }
}

SignalR中間件

通過MapHub可以看到使用的類是HubEndpointRouteBuilderExtensions

app.UseEndpoints(endpoints =>
{
  endpoints.MapHub<ChatHub>("/ChatHub");
});

HubEndpointRouteBuilderExtensions源程式碼

public static class HubEndpointRouteBuilderExtensions
{
    ................................

    // 註冊集線器
    public static HubEndpointConventionBuilder MapHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IEndpointRouteBuilder endpoints, string pattern, Action<HttpConnectionDispatcherOptions>? configureOptions) where THub : Hub
    {
        // 這個就是我們上面註冊SignalR保留來判斷是否注入
        var marker = endpoints.ServiceProvider.GetService<SignalRMarkerService>();

        if (marker == null)
        {
            throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " +
                                                "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
        }
        // SignalR配置資訊
        var options = new HttpConnectionDispatcherOptions();
        configureOptions?.Invoke(options);

        // endpoints.MapConnections用來接收第一次連接請求,然後開啟對於協議連接
        var conventionBuilder = endpoints.MapConnections(pattern, options, b =>
        {
            // SignalRConnectionBuilderExtensions拓展類(這裡是一個重點,將我們的泛型集線器連接進行注入,就可以開始它的工作了)
            b.UseHub<THub>();
        });

        ....................................
        return new HubEndpointConventionBuilder(conventionBuilder);
    }
}

SignalRConnectionBuilderExtensions源程式碼

public static class SignalRConnectionBuilderExtensions
{
    public static IConnectionBuilder UseHub<[DynamicallyAccessedMembers(HubAccessibility)] THub>(this IConnectionBuilder connectionBuilder) where THub : Hub
    {
        var marker = connectionBuilder.ApplicationServices.GetService(typeof(SignalRCoreMarkerService));
        if (marker == null)
        {
            throw new InvalidOperationException("Unable to find the required services. Please add all the required services by calling " +
                "'IServiceCollection.AddSignalR' inside the call to 'ConfigureServices(...)' in the application startup code.");
        }
        // 1.connectionBuilder.UseConnectionHandler拓展方法在 ConnectionBuilderExtensions中
        // 2.HubConnectionHandler這個不就是我們注入服務的集線器中心連接處理程式嗎?
        return connectionBuilder.UseConnectionHandler<HubConnectionHandler<THub>>();
    }
}

ConnectionBuilderExtensions源程式碼

public static class ConnectionBuilderExtensions
{
    // 執行集線器的連接方法,到了這裡就代表本次連接成功了
    public static IConnectionBuilder UseConnectionHandler<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicConstructors)] TConnectionHandler>(this IConnectionBuilder connectionBuilder) where TConnectionHandler : ConnectionHandler
    {
        var handler = ActivatorUtilities.GetServiceOrCreateInstance<TConnectionHandler>(connectionBuilder.ApplicationServices);

        // 這是一個終端中間件,所以沒有必要使用'next'參數 
        return connectionBuilder.Run(connection => handler.OnConnectedAsync(connection));
    }
}

小結

通過services.AddSignalR()進行SignalR基礎服務進行註冊。
通過Hub抽象工程,由不同的集線器繼承,定義同一的連接、斷開方法、客戶端連接管理、群組管理、當前上下文資訊。
通過MapHub<ChatHub>通過中間件路由規則進行流量劃分。
當我們看完上面調用鏈路,腦中是不是已經有了一個清晰的方向了,它是怎麼與前端進行連接的,並且對於注入的服務有一定的了解。

HubConnectionHandler連接處理

我們已經知道進入中間件之後就會進入HubConnectionHandler.OnConnectedAsync()方法

    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        // 我們檢查是否設置了HubOptions<THub>,因為它們優先於全局hub選項。  
        // 然後將keepAlive和handshakeTimeout值設置為HubOptionsSetup中的默認值,當它們顯式地設置為null時。  

        var supportedProtocols = _hubOptions.SupportedProtocols ?? _globalHubOptions.SupportedProtocols;
        if (supportedProtocols == null || supportedProtocols.Count == 0)
        {
            throw new InvalidOperationException("There are no supported protocols");
        }
        // 默認握手超時15分鐘
        var handshakeTimeout = _hubOptions.HandshakeTimeout ?? _globalHubOptions.HandshakeTimeout ?? HubOptionsSetup.DefaultHandshakeTimeout;
        // 集線器連接配置
        var contextOptions = new HubConnectionContextOptions()
        {
            KeepAliveInterval = _hubOptions.KeepAliveInterval ?? _globalHubOptions.KeepAliveInterval ?? HubOptionsSetup.DefaultKeepAliveInterval,
            ClientTimeoutInterval = _hubOptions.ClientTimeoutInterval ?? _globalHubOptions.ClientTimeoutInterval ?? HubOptionsSetup.DefaultClientTimeoutInterval,
            StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity,
            MaximumReceiveMessageSize = _maximumMessageSize,
            SystemClock = SystemClock,
            MaximumParallelInvocations = _maxParallelInvokes,
        };

        Log.ConnectedStarting(_logger);
        // 創建連接上下文,將用戶資訊添加到上下文中
        var connectionContext = new HubConnectionContext(connection, contextOptions, _loggerFactory);

        var resolvedSupportedProtocols = (supportedProtocols as IReadOnlyList<string>) ?? supportedProtocols.ToList();
        if (!await connectionContext.HandshakeAsync(handshakeTimeout, resolvedSupportedProtocols, _protocolResolver, _userIdProvider, _enableDetailedErrors))
        {
            return;
        }

        // 已建立connectionContext

        try
        {
            // 默認集線器生命周期管理器(DefaultHubLifetimeManager)將當前用戶添加到連接池中
            await _lifetimeManager.OnConnectedAsync(connectionContext);
            // 獲取我們對應的集線器,執行OnConnectedAsync()方法,這個時候就真正的開始執行我們寫的程式碼了。
            // 裡面有一個消息分配方法DispatchMessagesAsync(),獲取我們交互的資訊進行處理
            await RunHubAsync(connectionContext);
        }
        finally
        {
            connectionContext.Cleanup();

            Log.ConnectedEnding(_logger);
            // 當處理消息方法跳出,之後代表當前用戶已經斷開連接了,所以我們需要觸發斷線方法
            await _lifetimeManager.OnDisconnectedAsync(connectionContext);
        }
    }

SignalR非同步分派消息

//  非同步分派消息
private async Task DispatchMessagesAsync(HubConnectionContext connection)
    {
        var input = connection.Input;
        var protocol = connection.Protocol;
        connection.BeginClientTimeout();

        var binder = new HubConnectionBinder<THub>(_dispatcher, connection);

        while (true)
        {
            var result = await input.ReadAsync();
            var buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    break;
                }
                // 存在消息
                if (!buffer.IsEmpty)
                {
                    bool messageReceived = false;
                    // 沒有消息限制,只是解析和分派
                    if (_maximumMessageSize == null)
                    {
                        while (protocol.TryParseMessage(ref buffer, binder, out var message))
                        {
                            connection.StopClientTimeout();
                            // 我們接收到了消息,停止超時檢查
                            messageReceived = true;
                            // 將接收的消息,根據不同的類型進行分發處理
                            await _dispatcher.DispatchMessageAsync(connection, message);
                        }

                        if (messageReceived)
                        {
                            // 處理完接收消息之後,開啟超時檢查
                            connection.BeginClientTimeout();
                        }
                    }
                    else
                    {
                        // 我們給解析器一個默認消息大小的滑動窗口  
                        var maxMessageSize = _maximumMessageSize.Value;

                        while (!buffer.IsEmpty)
                        {
                            var segment = buffer;
                            var overLength = false;
                            // 切分消息,慢慢進行處理
                            if (segment.Length > maxMessageSize)
                            {
                                segment = segment.Slice(segment.Start, maxMessageSize);
                                overLength = true;
                            }

                            if (protocol.TryParseMessage(ref segment, binder, out var message))
                            {
                                connection.StopClientTimeout();
                                // 我們接收到了消息,停止超時檢查
                                messageReceived = true;
                                // 將接收的消息,根據不同的類型進行分發處理
                                await _dispatcher.DispatchMessageAsync(connection, message);
                            }
                            else if (overLength)
                            {
                                throw new InvalidDataException($"The maximum message size of {maxMessageSize}B was exceeded. The message size can be configured in AddHubOptions.");
                            }
                            else
                            {
                                // No need to update the buffer since we didn't parse anything
                                break;
                            }

                            // Update the buffer to the remaining segment
                            buffer = buffer.Slice(segment.Start);
                        }

                        if (messageReceived)
                        {
                            connection.BeginClientTimeout();
                        }
                    }
                }

                if (result.IsCompleted)
                {
                    if (!buffer.IsEmpty)
                    {
                        throw new InvalidDataException("Connection terminated while reading a message.");
                    }
                    break;
                }
            }
            finally
            {
                // 緩衝區被分割到它被消耗的地方,所以我們可以直接開始。  我們把檢查標記為緩衝。 結束,如果我們沒有收到完整的幀,我們將等待更多的數據  再讀一遍之前。
                input.AdvanceTo(buffer.Start, buffer.End);
            }
        }

SignalR針對用戶發送消息

針對於群發消息,我們知道有一個組的容器,我們只要將大家添加到一個組中就可以了,那麼我們想根據用戶發送消息1:1的模式,SignalR源碼中是怎麼處理的呢?

在註冊SignalR服務中我們可以看到這個services.TryAddSingleton(typeof(IUserIdProvider), typeof(DefaultUserIdProvider));

public class DefaultUserIdProvider : IUserIdProvider
{
    // 獲取當前用戶標識
    public virtual string? GetUserId(HubConnectionContext connection)
    {
        // 這個也就是為什麼我們在不做任何處理之下想使用SignalR用戶模式,需要在Jwt中添加一個ClaimTypes.NameIdentifier了
        return connection.User.FindFirst(ClaimTypes.NameIdentifier)?.Value;
    }
}

我們只需要自己定義一個實現類,將模式實現替換掉就可以了。

    // 用戶模式發送源碼
    public override Task SendUserAsync(string userId, string methodName, object?[] args, CancellationToken cancellationToken = default)
    {
        //  connection.UserIdentifier就是執行了GetUserId()方法獲取的用戶標識
        return SendToAllConnections(methodName, args, (connection, state) => string.Equals(connection.UserIdentifier, (string)state!, StringComparison.Ordinal), userId, cancellationToken);
    }

SignalR項目使用設計

上面我們已經講完,SignalR從連接==>處理消息以及用戶模式的源碼設計,相信小夥伴已經腦海中已經漿糊了,那麼就開始項目中實踐方式

我主要負責提供基礎SignalR庫,給到不同的部門進行使用,所以我首先需要考慮到一個高內聚,低耦合的設計,這裡我首先不能摻雜業務邏輯,但是又需要所有業務聚合到我這邊,然後通過不同的業務進行不同的處理。
設計思路:

  • 定義兩個介面IReceiveMessageISendMessage,介面中分別有MessageType屬性,HandlerAsync(input)方法
  • 定義一個公用的集線器注入IEnumerable<IReceiveMessage>IEnumerable<ISendMessage>添加Receive(input)Send(input)方法通過不同的入參中的MessageType屬性,從注入集合中獲取對應的消息實現進行處理

集線器偽程式碼

    public class SignalRHub : Hub
    {
        private readonly IEnumerable<IReceiveMessage> _receiveMessages;
        private readonly IEnumerable<ISendMessage> _sendMessages;

        public SignalRHub(IEnumerable<IReceiveMessage> receiveMessages,
            IEnumerable<ISendMessage> sendMessages)
        {
            _receiveMessages = receiveMessages;
            _sendMessages = sendMessages;
        }

        public async Task Receive(SignalRReceiveMessage input)
        {
            await _receiveMessages.FirstOrDefault(x => string.Compare(x.MessageType, input.MessageType, true) == 0).HandlerAsync(input);
        }

        public async Task Send(SignalRSendMessage outInput) 
        {
            await _sendMessages.FirstOrDefault(x => string.Compare(x.MessageType, outInput.MessageType, true) == 0).HandlerAsync(outInput);
        }
    }

業務實現示例

    public class NotificationSendMessage : ISendMessage, ISingletonDependency
    {
        public string MessageType
        {
            get => SignalRSendMessageEnum.Notification.ToString();
        }

        public Task HandlerAsync(SignalRSendMessage message)
        {
            //.......業務邏輯......
        }
    }

這樣我就只需要接收消息,進行轉發給對應實現就可以了,我給同事提供了SignalR服務,又不干涉他們的業務。