­

[SuperSocket2.0]SuperSocket 2.0從入門到懵逼

SuperSocket 2.0從入門到懵逼

附帶實現, 講解, 與部分源程式碼解讀

1 使用SuperSocket 2.0在AspNetCore項目中搭建一個Socket伺服器

1.1 引入SuperSocket 2.0

SuperSocket 2.0目前仍然處於Beta階段, 但是功能基本可靠, 可以用於生產環境.

可以從Github源地址自行fork程式碼進行其它修改, 如需直接引用, 在默認的Nuget伺服器是沒有的, 需要添加作者自己的Nuget伺服器//www.myget.org/F/supersocket/api/v3/index.json, 獲取預覽版本.

1.2 在AspNetCore中搭建一個Socket伺服器

使用SuperSocket 2.0快速搭建一個Socket伺服器非常簡單, 在框架中, 作者實現了一個SuperSocketHostBuilder進行構建, 我們要做的只是簡單的配置一些參數和業務邏輯.

此外, SuperSocketHostBuilder是可以直接嵌入到AspNetCore項目的CreateHostBuilder上的, 但是並不推薦這麼做…

  1. 配置數據包和過濾器/選擇器
    • 數據包, 即我們進行Socket通訊時的數據包, 可以是包對象, 也可以是byte[], 如果是包對象, 則需要在過濾器中配置解碼器.
    • 過濾器, 作用是對數據包進行篩選然後截取一包數據, 可以將解碼器掛載進來, 直接將二進位數據映射為對象.
    • 選擇器, 我們可以使用選擇器, 來實現一個埠服務於多種協議的情況, 此時, 一個選擇器則會搭配多個過濾器進行使用.
  2. 配置IP和埠
    • 配置IP和埠可以選擇兩種方式, 一種方式是從程式寫入, 另一種是從配置文件中寫入.
    • 在本系列的介紹中, 我們大多數採用程式寫入的方式, 從配置文件寫入的方式, 後續會採用其它方式實現, 來擴展業務.
  3. 配置Session
    • 在程式中作者內置了IAppSessionAppSession供我們使用, 如果我們需要自定義Session, 則需要繼承AppSession並且在程式中進行引用, 即可切換為我們定義的Session.

    • 自定義Session時, 由於在程式中大多數提供的參數都為IAppSession, 所以, 需要實現SuperSocket的更多其它介面進行重寫, 來維持程式運轉.

    • 自定義Session大多數時候是為了添加更多自定義屬性, 作者在設計中提供了另外的方式提供我們選擇:

      // AppSession源碼
      public class AppSession : IAppSession, ILogger, ILoggerAccessor
      {
          //......
      
          private Dictionary<object, object> _items;
      
          public object this[object name]
          {
              get
              {
                  var items = _items;
      
                  if (items == null)
                      return null;
      
                  object value;
                  
                  if (items.TryGetValue(name, out value))
                      return value;
      
                  return null;
              }
      
              set
              {
                  lock (this)
                  {
                      var items = _items;
      
                      if (items == null)
                          items = _items = new Dictionary<object, object>();
      
                      items[name] = value;
                  }
              }
          }
      
          //......
      }
      

      可以看到, 其中內置了一個字典, 我們可以將屬性的Key-Value值直接存在字典中, 即session["prop"] = value;的形式.

  4. 配置SessionHandler
    • SessionHandler會在建立和斷開Socket連接時觸發, 用於處理連接和斷開時的業務, 可以在SuperSocketHostBuilder中直接配置, 也可以通過重寫相應的介面實現.
    • 連接時, 會將創建好的Session傳入該方法.
    • 斷開時, 會將斷開的Session以及觸發斷開的事件傳入該方法.
  5. 配置PackageHandler
    • PackageHandler會在接收到數據包時觸發.
    • 該方法自動觸發時我們將獲取兩個參數, 一個是Session, 一個是Package. 但是要注意的是, 這裡的Session是IAppSession類型, 並不是我們自定義的Session.
      • Session即為當前Socket連接, 裡面附帶了各種連接資訊以及狀態等.
      • Package是通過過濾器後得到的數據包, 不過要注意的是, 例如:如果數據包為頭尾標識符的數據包, 如果採用的是byte[]的形式, 得到的可能不是原包, 而是去除了包頭尾標識後的數據體. 即:7E 01 02 03 7E, 去掉頭尾的7E標識得到的是01 02 03.
  6. Build & Run
    • 構建這裡同時提供了幾種方式, 推薦採用BuildAsServer(), 然後通過StartAsync()進行啟用.

此時, 一個Socket伺服器就搭建完成了. 具體實現:

// SampleSession
public class SampleSession: AppSession
{
    
}
// Socket Server程式碼
var tcpHost = SuperSocketHostBuilder.Create<byte[], PipelineFilter>()
    .ConfigureSuperSocket(options =>
    {
        options.AddListener(new ListenOptions
        {
            Ip = "Any",
            Port = 4040
        })
        .AddListener(new ListenOptions()
        {
            Ip = "Any",
            Port = 8888
        });
    })
    .UseSession<JT808TcpSession>()
    .UseClearIdleSession()
    .UseSessionHandler(s =>
    {
    })
    .UsePackageHandler(async (s, p) =>
    {
        //解包/應答/轉發
    })
    .ConfigureErrorHandler((s, v) =>
    {
    })
    .UseMiddleware<InProcSessionContainerMiddleware>()
    .UseInProcSessionContainer()
    .BuildAsServer();
    
await tcpHost.RunAsync();

.UseClearIdleSession()請務必調用, 在使用各類Socket框架時, 不可避免的我們的應用程式都會維持大量的殭屍連接, SuperSocket中提供了UseClearIdleSession()來自動復用已經閑置或者失去連接的資源.

2 基本的協議概念

2.1 基本協議種類

2.1.1 固定頭格式協議

顧名思義, 這類協議的Header是固定的, 並且一般Header的長度是固定的, 但是也有例外情況, 此外, Header中也會包含數據體的長度等資訊. 然後可以根據長度來截取一包數據.

2.1.2 固定頭尾標識協議

這類協議的數據包, 前幾位元組和後幾位元組是固定的, 這樣就可以通過頭尾的標識來截取一包數據.

通常, 這類協議的數據包, 為了避免數據內容和協議頭、尾的標識衝突, 通常會設置轉義, 即將數據包中出現頭尾標識的地方, 轉義為其它數據, 避免識別時出現錯誤.

2.1.3 固定包大小協議

這類協議每一包數據的大小都是固定的, 所以可以直接根據長度進行讀取.

2.1.4 命令行協議

這類協議通常以\r\n結尾, 採用字元串轉為二進位流進行傳輸.

2.1.5 一些其它協議

PS: 關於協議的一些硬體廠商的私有協議比較奇葩, 他們的協議五花八門的…不過我們這裡不做闡述, 有時間我會再講

3 SuperSocket中的幾個基本概念

3.1 Package Type

Package Type即包類型, 這裡描述的是數據包的結構, 例如SuperSocket中就提供了一些基礎的包類型TextPackageInfo等.

public class TextPackageInfo
{
    public string Text{get; set;}
}

這裡的TextPackageInfo標識了這類類型的數據包中, 僅包含了一個字元串, 當然, 我們通常會有更複雜的網路數據包結構.例如, 我將在下列展示一個包含首尾標識的通訊包, 它包含了首尾標識, 消息號, 終端Id, 以及消息體:

public class SamplePackage
{
    public byte Begin{get; set;}

    public MessageId MessageId{get; set;}

    public string TerminalId{get; set;}

    public SampleBody Body{get; set;}

    public byte End{get; set;}
}

當然, 在SuperSocket中也提供了一些介面供我們實現一些類似格式的包, 不過個人不太喜歡這種方式, 官方文檔也舉了一些例子, 例如,有的包會有一個特殊的欄位來代表此包內容的類型. 我們將此欄位命名為 “Key”. 此欄位也告訴我們用何種邏輯處理此類型的包. 這是在網路應用程式中非常常見的一種設計. 例如,你的 Key 欄位是整數類型,你的包類型需要實現介面IKeyedPackageInfo

public class MyPackage : IKeyedPackageInfo<int>
{
    public int Key { get; set; }

    public short Sequence { get; set; }

    public string Body { get; set; }
}

3.2 PipelineFilter Type

這種類型在網路協議解析中作用重要. 它定義了如何將 IO 數據流解碼成可以被應用程式理解的數據包. 換句話說, 就是把你的二進位流數據, 能夠一包一包的識別出來, 同時可以解析成你構建的Package對象. 當然, 你也可以選擇不構建, 然後將源數據直接返回.

這些是 PipelineFilter 的基本介面. 你的系統中至少需要一個實現這個介面的 PipelineFilter 類型.

public interface IPipelineFilter
{
    void Reset();

    object Context { get; set; }        
}

public interface IPipelineFilter<TPackageInfo> : IPipelineFilter
    where TPackageInfo : class
{

    IPackageDecoder<TPackageInfo> Decoder { get; set; }

    TPackageInfo Filter(ref SequenceReader<byte> reader);

    IPipelineFilter<TPackageInfo> NextFilter { get; }

}

事實上,由於 SuperSocket 已經提供了一些內置的 PipelineFilter 模版,這些幾乎可以覆蓋 90% 的場景的模版極大的簡化了你的開發工作. 所以你不需要完全從頭開始實現 PipelineFilter. 即使這些內置的模版無法滿足你的需求,完全自己實現PipelineFilter也不是難事.

3.3 使用PackageType和PipelineFilter Type創建SuperSocket

你定義好 Package 類型和 PipelineFilter 類型之後,你就可以使用 SuperSocketHostBuilder 創建 SuperSocket 宿主了。

var host = SuperSocketHostBuilder.Create<StringPackageInfo, CommandLinePipelineFilter>();

在某些情況下,你可能需要實現介面 IPipelineFilterFactory 來完全控制 PipelineFilter 的創建。

public class MyFilterFactory : PipelineFilterFactoryBase<TextPackageInfo>
{
    protected override IPipelineFilter<TPackageInfo> CreateCore(object client)
    {
        return new FixedSizePipelineFilter(10);
    }
}

然後在 SuperSocket 宿主被創建出來之後啟用這個 PipelineFilterFactory:

var host = SuperSocketHostBuilder.Create<StringPackageInfo>();
host.UsePipelineFilterFactory<MyFilterFactory>();

4 SuperSocket中的PipelineFilter, 實現自己的PipelineFilter

4.1 內置的PipelineFilter模板

SuperSocket中內置了一些PipelineFilter模板, 這些模板幾乎可以覆蓋到90%的應用場景, 極大簡化了開發工作, 所以不需要完全從頭開始實現PipelineFilter. 即使這些內置的模板無法滿足你的需求, 完全自己實現PipelineFilter.

SuperSocket提供了這些PipelineFilter模板:

  • TerminatorPipelineFilter (SuperSocket.ProtoBase.TerminatorPipelineFilter, SuperSocket.ProtoBase)
  • TerminatorTextPipelineFilter (SuperSocket.ProtoBase.TerminatorTextPipelineFilter, SuperSocket.ProtoBase)
  • LinePipelineFilter (SuperSocket.ProtoBase.LinePipelineFilter, SuperSocket.ProtoBase)
  • CommandLinePipelineFilter (SuperSocket.ProtoBase.CommandLinePipelineFilter, SuperSocket.ProtoBase)
  • BeginEndMarkPipelineFilter (SuperSocket.ProtoBase.BeginEndMarkPipelineFilter, SuperSocket.ProtoBase)
  • FixedSizePipelineFilter (SuperSocket.ProtoBase.FixedSizePipelineFilter, SuperSocket.ProtoBase)
  • FixedHeaderPipelineFilter (SuperSocket.ProtoBase.FixedHeaderPipelineFilter, SuperSocket.ProtoBase)

4.2 基於內置模板實現PipelineFilter

4.2.1 FixedHeaderPipelineFilter-頭部格式固定並且包含內容長度的協議

這種協議講請求定義為兩大部分, 第一部分定義了包含第二部分長度等等基礎資訊, 我們通常稱第一部分為頭部.

例如, 我們有一個這樣的協議: 頭部包含 3 個位元組, 第 1 個位元組用於存儲請求的類型, 後兩個位元組用於代表請求體的長度:

/// +-------+---+-------------------------------+
/// |request| l |                               |
/// | type  | e |    request body               |
/// |  (1)  | n |                               |
/// |       |(2)|                               |
/// +-------+---+-------------------------------+

根據此協議的規範, 我們可以使用如下程式碼定義包的類型:

public class MyPackage
{
    public byte Key { get; set; }

    public string Body { get; set; }
}

下一個是設計PipelineFilter:

public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
    public MyPipelineFilter()
        : base(3) // 包頭的大小是3位元組,所以將3傳如基類的構造方法中去
    {

    }

    // 從數據包的頭部返回包體的大小
    protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
    {
        var reader = new SequenceReader<byte>(buffer);
        reader.Advance(1); // skip the first byte
        reader.TryReadBigEndian(out short len);
        return len;
    }

    // 將數據包解析成 MyPackage 的實例
    protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
    {
        var package = new MyPackage();

        var reader = new SequenceReader<byte>(buffer);

        reader.TryRead(out byte packageKey);
        package.Key = packageKey;            
        reader.Advance(2); // skip the length             
        package.Body = reader.ReadString();

        return package;
    }
}

最後,你可通過數據包的類型和 PipelineFilter 的類型來創建宿主:

var host = SuperSocketHostBuilder.Create<MyPackage, MyPipelineFilter>()
    .UsePackageHandler(async (s, p) =>
    {
        // handle your package over here
    }).Build();

你也可以通過將解析包的程式碼從 PipelineFilter 移到 你的包解碼器中來獲得更大的靈活性:

public class MyPackageDecoder : IPackageDecoder<MyPackage>
{
    public MyPackage Decode(ref ReadOnlySequence<byte> buffer, object context)
    {
        var package = new MyPackage();

        var reader = new SequenceReader<byte>(buffer);

        reader.TryRead(out byte packageKey);
        package.Key = packageKey;            
        reader.Advance(2); // skip the length             
        package.Body = reader.ReadString();

        return package;
    }
}

通過 host builder 的 UsePackageDecoder 方法來在SuperSocket中啟用它:

builder.UsePackageDecoder<MyPackageDecoder>();

4.2.3 另一種掛載解析器的方式

在Asp.Net Core Application我們可以new(), 直接注入或者採用工廠模式等方式, 向Host中注入協議解析器, 然後在過濾波器中進行使用.

public class MyPipelineFilter : FixedHeaderPipelineFilter<MyPackage>
{
    public readonly PacketConvert _packageConvert;
    public MyPipelineFilter()
        : base(3) // 包頭的大小是3位元組,所以將3傳如基類的構造方法中去
    {
        _packageConvert = new PackageConvert();
    }

    // 從數據包的頭部返回包體的大小
    protected override int GetBodyLengthFromHeader(ref ReadOnlySequence<byte> buffer)
    {
        var reader = new SequenceReader<byte>(buffer);
        reader.Advance(1); // skip the first byte
        reader.TryReadBigEndian(out short len);
        return len;
    }

    // 將數據包解析成 MyPackage 的實例
    protected override MyPackage DecodePackage(ref ReadOnlySequence<byte> buffer)
    {
        var package = _packageConvert.Deserialize<Package>(buffer);

        return package;
    }
}

PS: 過濾器中DecodePackage返回的buffer可能不是完整的包, 例如固定頭尾結構的包中, 返回的buffer可能是去掉頭尾的格式

例如固定頭尾的包0x7E 0x7E xxxxxxx 0x7E 0x7E, 返回的buffer中頭尾的0x7E 0x7E會被去除, 只留下中間xxxxxxx的部分,所以在實現解碼器部分的時候需要注意.

7 擴展AppSession和SuperSocketService

7.1 擴展AppSession

在SuperSocket關於Socket的管理提供了SessionContainer供大家獲取程式中的Session實例, 只需在構建中調用.UseMiddleware<InProcSessionContainerMiddleware>()UseInProcSessionContainer()即可通過AppSession.Server.SessionContainer()獲取.

但是為了方便管理, 個人角色還是實現一個另外的SessionManager比較好, 這樣可以更方便的集成到我們的Asp.Net Core Application中. 使用ConcurrentDictionary原子字典來存儲, 可以避免一些讀寫上的死鎖問題.

public class SessionManager<TSession> where TSession : IAppSession
{
    /// <summary>
    /// 存儲的Session
    /// </summary>
    public ConcurrentDictionary<string, TSession> Sessions { get; private set; } = new();

    /// <summary>
    /// Session的數量
    /// </summary>
    public int Count => Sessions.Count;

    /// <summary>
    /// </summary>
    public SessionManager()
    {
    }

    public ConcurrentDictionary<string, TSession> GetAllSessions()
    {
        return Sessions;
    }

    /// <summary>
    /// 獲取一個Session
    /// </summary>
    /// <param name="key"> </param>
    /// <returns> </returns>
    public virtual async Task<TSession> TryGet(string key)
    {
        return await Task.Run(() =>
        {
            Sessions.TryGetValue(key, out var session);
            return session;
        });
    }

    /// <summary>
    /// 添加或者更新一個Session
    /// </summary>
    /// <param name="key">     </param>
    /// <param name="session"> </param>
    /// <returns> </returns>
    public virtual async Task TryAddOrUpdate(string key, TSession session)
    {
        await Task.Run(() =>
        {
            if (Sessions.TryGetValue(key, out var oldSession))
            {
                Sessions.TryUpdate(key, session, oldSession);
            }
            else
            {
                Sessions.TryAdd(key, session);
            }
        });
    }

    /// <summary>
    /// 移除一個Session
    /// </summary>
    /// <param name="key"> </param>
    /// <returns> </returns>
    public virtual async Task TryRemove(string key)
    {
        await Task.Run(() =>
        {
            if (Sessions.TryRemove(key, out var session))
            {
            }
            else
            {
            }
        });
    }

    /// <summary>
    /// 通過Session移除Session
    /// </summary>
    /// <param name="sessionId"> </param>
    /// <returns> </returns>
    public virtual async Task TryRemoveBySessionId(string sessionId)
    {
        await Task.Run(() =>
        {
            foreach (var session in Sessions)
            {
                if (session.Value.SessionID == sessionId)
                {
                    Sessions.TryRemove(session);
                    return;
                }
            }
        });
    }

    /// <summary>
    /// 刪除殭屍鏈接
    /// </summary>
    /// <returns> </returns>
    [Obsolete("該方法丟棄", true)]
    public virtual async Task TryRemoveZombieSessions()
    {
        await Task.Run(() =>
        {
        });
    }

    /// <summary>
    /// 移除所有Session
    /// </summary>
    /// <returns> </returns>
    public virtual async Task TryRemoveAll()
    {
        await Task.Run(() =>
        {
            Sessions.Clear();
        });
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <param name="buffer">  </param>
    /// <returns> </returns>
    public virtual async Task SendAsync(TSession session, ReadOnlyMemory<byte> buffer)
    {
        if (session == null)
        {
            throw new ArgumentNullException(nameof(session));
        }
        await session.SendAsync(buffer);
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <param name="message"> </param>
    /// <returns> </returns>
    public virtual async Task SendAsync(ClientSession session, string message)
    {
        if (session == null)
        {
            throw new ArgumentNullException(nameof(session));
        }
        // ReSharper disable once PossibleNullReferenceException
        await session.SendAsync(message);
    }

    /// <summary>
    /// </summary>
    /// <param name="session"> </param>
    /// <returns> </returns>
    public virtual async Task<Guid> FindIdBySession(TSession session)
    {
        return await Task.Run(() =>
        {
            return Guid.Parse(Sessions.First(x => x.Value.SessionID.Equals(session.SessionID)).Key);
        });
    }
}

7.2 如何自己實現SuperSocketService?

我們在使用SuperSocket時需要在Program.cs中來構建, 這樣會導致一個問題, 這樣我們的SuperSocket服務就會變得難以控制, 那麼有沒有一種寫法來將這部分程式碼抽離出來呢?

答案是有的, 我們可以採用.Net Core中的BackgroundService或者IHostedService來實現後台服務, 甚至將這些服務管理起來, 根據需要隨時創建, 隨時啟動, 隨時停止. 這樣做的好處還有, 我們可以隨時獲取依賴注入的服務來做一些更多的操作, 例如讀取配置, 管理Session, 配置編解碼器, 日誌, 應答器, MQ等等.

public class TcpSocketServerHostedService : IHostedService
{
    private readonly IOptions<ServerOption> _serverOptions;
    private readonly IOptions<KafkaOption> _kafkaOptions;
    private readonly ClientSessionManagers _clientSessionManager;
    private readonly TerminalSessionManager _gpsTrackerSessionManager;
    private readonly ILogger<TcpSocketServerHostedService> _logger;
    private readonly IGeneralRepository _generalRepository;
    private readonly NbazhGpsSerializer _nbazhGpsSerializer = new NbazhGpsSerializer();

    private static EV26MsgIdProducer _provider = null;

    /// <summary>
    /// Tcp Server服務
    /// </summary>
    /// <param name="serverOptions">            </param>
    /// <param name="kafkaOptions">             </param>
    /// <param name="clientSessionManager">     </param>
    /// <param name="gpsTrackerSessionManager"> </param>
    /// <param name="logger">                   </param>
    /// <param name="factory">                  </param>
    public TcpSocketServerHostedService(
        IOptions<ServerOption> serverOptions,
        IOptions<KafkaOption> kafkaOptions,
        ClientSessionManagers clientSessionManager,
        TerminalSessionManager gpsTrackerSessionManager,
        ILogger<TcpSocketServerHostedService> logger,
        IServiceScopeFactory factory)
    {
        _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(serverOptions));
        _kafkaOptions = kafkaOptions;
        _clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
        _gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var host = SuperSocketHostBuilder.Create<NbazhGpsPackage, ProtocolPipelineSwitcher>()
            .ConfigureSuperSocket(opts =>
            {
                foreach (var listener in _serverOptions.Value.TcpListeners)
                {
                    opts.AddListener(new ListenOptions()
                    {
                        Ip = listener.Ip,
                        Port = listener.Port
                    });
                }
            })
            .UseSession<GpsTrackerSession>()
            .UseClearIdleSession()
            .UseSessionHandler(onClosed: async (s, v) =>
                {
                    try
                    {
                        // Session管理
                        await _gpsTrackerSessionManager.TryRemoveBySessionId(s.SessionID);
                    }
                    catch
                    {
                        // ignored
                    }
                })
            .UsePackageHandler(async (s, packet) =>
            {
                // 處理包
            })
            .UseInProcSessionContainer()
            .BuildAsServer();

        await host.StartAsync();

        await Task.CompletedTask;
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        try
        {
            await _gpsTrackerSessionManager.TryRemoveAll();
        }
        catch
        {
            // ignored
        }

        await Task.CompletedTask;
    }
}

實現服務後, 我們可以寫擴展方法將服務注入.

/// <summary>
/// 伺服器擴展
/// </summary>
public static class ServerBuilderExtensions
{
    /// <summary>
    /// 添加Tcp伺服器
    /// </summary>
    /// <param name="services"> </param>
    /// <returns> </returns>
    public static IServiceCollection AddTcpServer(
        this IServiceCollection services)
    {
        services.AddSingleton<TerminalSessionManager>();
        services.AddHostedService<TcpSocketServerHostedService>();
        return services;
    }

    /// <summary>
    /// 添加Ws伺服器
    /// </summary>
    /// <param name="services"> </param>
    /// <returns> </returns>
    public static IServiceCollection AddWsServer(
        this IServiceCollection services)
    {
        services.AddSingleton<ClientSessionManagers>();
        services.AddHostedService<WebSocketServerHostedService>();
        return services;
    }
}

8 擴展SuperSocket的功能

8.1 多協議切換

我們有時候會面對一種需求, 就是同一個介面, 需要接收不同的終端的協議包, 這樣我們通常會根據協議的不同點來區分協議. PS: 協議最好是同類型協議, 並且有明顯不同的特徵!

具體的實現方式就是實現一個特殊的PipelineFilter, 在下列程式碼中, 我們將讀取該包數據的第一個位元組來分辨該協議為0x78 0x78類型開頭的協議還是0x79 0x79開頭的協議, 然後將標記移回改包開頭, 然後將這一包數據交給對應的過濾器來進行解析:

// NbazhGpsPackage: 包編解碼器
public class ProtocolPipelineSwitcher : PipelineFilterBase<NbazhGpsPackage>
{
    private IPipelineFilter<NbazhGpsPackage> _filter7878;
    private byte _beginMarkA = 0x78;

    private IPipelineFilter<NbazhGpsPackage> _filter7979;
    private byte _beginMarkB = 0x79;

    public ProtocolPipelineSwitcher()
    {
        _filter7878 = new EV26PipelineFilter7878(this);
        _filter7979 = new EV26PipelineFilter7979(this);
    }

    public override NbazhGpsPackage Filter(ref SequenceReader<byte> reader)
    {
        if (!reader.TryRead(out byte flag))
        {
            throw new ProtocolException(@"flag byte cannot be read");
        }

        if (flag == _beginMarkA)
        {
            NextFilter = _filter7878;
        }
        else if (flag == _beginMarkB)
        {
            NextFilter = _filter7979;
        }
        else
        {
            return null;
            //throw new ProtocolException($"首位元組未知 {flag}");
        }

        // 將標記移回開頭
        reader.Rewind(1);
        return null;
    }
}

9 搭建WebSocket伺服器

WebSocket Server的實現方式與之前的Socket Server實現方式大致相同, 其中不同的地方主要為: WebSocket Server不需要配置編解碼器, 採用String作為消息格式等.

/// <summary>
/// </summary>
public class WebSocketServerHostedService : IHostedService
{
    private readonly IOptions<ServerOption> _serverOptions;
    private readonly ClientSessionManagers _clientSessionManager;
    private readonly TerminalSessionManager _gpsTrackerSessionManager;
    private readonly IGeneralRepository _generalRepository;

    /// <summary>
    /// </summary>
    /// <param name="serverOptions">            </param>
    /// <param name="clientSessionManager">     </param>
    /// <param name="gpsTrackerSessionManager"> </param>
    /// <param name="factory">                  </param>
    public WebSocketServerHostedService(
        IOptions<ServerOption> serverOptions,
        ClientSessionManagers clientSessionManager,
        TerminalSessionManager gpsTrackerSessionManager,
        IServiceScopeFactory factory)
    {
        _serverOptions = serverOptions ?? throw new ArgumentNullException(nameof(_serverOptions));
        _clientSessionManager = clientSessionManager ?? throw new ArgumentNullException(nameof(clientSessionManager));
        _gpsTrackerSessionManager = gpsTrackerSessionManager ?? throw new ArgumentNullException(nameof(gpsTrackerSessionManager));
        _generalRepository = factory.CreateScope().ServiceProvider.GetRequiredService<IGeneralRepository>();
    }

    /// <summary>
    /// WebSocketServer
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var host = WebSocketHostBuilder.Create()
            .ConfigureSuperSocket(opts =>
            {
                foreach (var listener in _serverOptions.Value.WsListeners)
                {
                    opts.AddListener(new ListenOptions()
                    {
                        Ip = listener.Ip,
                        Port = listener.Port
                    });
                }
            })
            .UseSession<ClientSession>()
            .UseClearIdleSession()
            .UseSessionHandler(onClosed: async (s, v) =>
            {
                await _clientSessionManager.TryRemoveBySessionId(s.SessionID);
            })
            .UseWebSocketMessageHandler(async (s, p) =>
            {
                var package = p.Message.ToObject<ClientPackage>();

                if (package.PackageType == PackageType.Heart)
                {
                    
                    return;
                }

                if (package.PackageType == PackageType.Login)
                {
                    var client = _generalRepository.FindAsync<User>(x => x.Id.Equals(Guid.Parse(package.ClientId)));

                    if (client is null)
                    {
                        await s.CloseAsync(CloseReason.ProtocolError, "ClientId不存在");
                    }

                    var verifyCode = Guid.NewGuid().ToString();
                    var loginPacket = new ClientPackage()
                    {
                        PackageType = PackageType.Login,
                        ClientId = package.ClientId,
                        VerifyCode = verifyCode,
                    };
                    s["VerifyCode"] = verifyCode;

                    var msg = loginPacket.ToJson();
                    await s.SendAsync(msg);
                }

                // 追蹤
                if (package.PackageType == PackageType.Trace)
                {
                    return;
                }
            })
            .UseInProcSessionContainer()
            .BuildAsServer();

        await host.StartAsync();

        await Task.CompletedTask;
    }

    /// <summary>
    /// </summary>
    /// <param name="cancellationToken"> </param>
    /// <returns> </returns>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await Task.CompletedTask;
    }
}

9.1 番外: WebSocket傳參

WebSocket的第一次請求是基於Http進行建立鏈接的, 所以WebSocket是可以在url中或者請求體中攜帶token等參數的. 當然後端並不像寫Api時那麼簡單的就可以獲取, 需要截取當前請求的Url或者攜帶的資訊, 然後進行讀取, 進而進行驗證等操作. 這部分的程式碼之後再補充…

//.Net Core從Url中讀取參數

10 多伺服器以及不同服務間的協同

得益於我們實現的SessionManager, 我們將不用ServerSessionManger注入DI後, 我們可以在任意ServerService中做到跨Service進行消息傳遞, 驗證等等操作.

More 1 協議的編解碼器開發預覽

協議編解碼器樣例:

EV26 Gps通訊協議(使用方法在xUnit測試中):

  1. Github
  2. Gitee

簡單樣例:

public class Nbazh0X01Test
{
    private readonly ITestOutputHelper _testOutputHelper;
    private NbazhGpsSerializer NbazhGpsSerializer = new NbazhGpsSerializer();

    public Nbazh0X01Test(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public void Test1()
    {
        //78 78 11 01 07 52 53 36 78 90 02 42 70 00 32 01 00 05 12 79 0D 0A

        var hex = "7878 11 01 07 52 53 36 78 90 02 42 7000 3201 0005 1279 0D0A".ToHexBytes();

        // ----協議解析部分----//
        var packet = NbazhGpsSerializer.Deserialize(hex);
        Nbazh0X01 body = (Nbazh0X01)packet.Bodies;
        // ----協議解析部分----//

        Assert.Equal(0x11, packet.Header.Length);
        Assert.Equal(0x01, packet.Header.MsgId);

        Assert.Equal("7 52 53 36 78 90 02 42".Replace(" ", ""), body.TerminalId);
        Assert.Equal(0x7000, body.TerminalType);
        //Assert.Equal(0x3201, body.TimeZoneLanguage.Serialize());

        Assert.Equal(0x0005, packet.Header.MsgNum);
        Assert.Equal(0x1279, packet.Header.Crc);

        // 時區 0011 001000000001
    }
}

More 2 DotNetty

以後我們會探究DotNetty與SuperSocket的異同.