將WCF遷移到gRPC

使用protobuf-net.Grpc將WCF服務遷移到gRPC非常簡單。在這篇博文中,我們將看看它到底有多簡單。微軟關於將WCF服務遷移到gRPC的官方指南只提到了Gooogle.Protobuf方式,如果你有很多數據契約需要遷移到.proto格式,這可能會很耗時。然而,通過使用protobuf-net.Grpc我們能夠重用舊的WCF數據契約和服務契約,而只需要做最小的代碼更改。

遷移數據契約和服務契約

 在本節中,我們將使用一個簡單的請求響應的組合服務,它可以讓你下載給定交易者的單個投資組合或所有投資組合。服務和數據契約的定義如下:

[ServiceContract]
public interface IPortfolioService
{
    [OperationContract]
    Task<Portfolio> Get(Guid traderId, int portfolioId);

    [OperationContract]
    Task<List<Portfolio>> GetAll(Guid traderId);
}
[DataContract]
public class Portfolio
{
    [DataMember]
    public int Id { get; set; }

    [DataMember]
    public Guid TraderId { get; set; }

    [DataMember]
    public List<PortfolioItem> Items { get; set; }
}

[DataContract]
public class PortfolioItem
{
    [DataMember]
    public int Id { get; set; }

    [DataMember]
    public int ShareId { get; set; }

    [DataMember]
    public int Holding { get; set; }

    [DataMember]
    public decimal Cost { get; set; }
}

在將數據契約和服務契約遷移到gRPC之前,我建議為契約創建一個新的類庫。這些契約可以通過項目引用或包引用在服務器和客戶端之間很容易地共享,這取決於你的WCF解決方案的結構。一旦我們創建了類庫,我們將源文件進行複製並開始遷移到gRPC。

不像使用Google.Protobuf遷移到gRPC那樣,數據契約只需要最小的更改。我們需要做的唯一一件事是在DataMember屬性中定義Order屬性。這相當於在創建.proto格式的消息時定義字段號。這些字段號用於標識消息二進制格式中的字段,並且在使用消息後不應更改。

[DataContract]
public class Portfolio
{
    [DataMember(Order = 1)]
    public int Id { get; set; }

    [DataMember(Order = 2)]
    public Guid TraderId { get; set; }

    [DataMember(Order = 3)]
    public List<PortfolioItem> Items { get; set; }
}

[DataContract]
public class PortfolioItem
{
    [DataMember(Order = 1)]
    public int Id { get; set; }

    [DataMember(Order = 2)]
    public int ShareId { get; set; }

    [DataMember(Order = 3)]
    public int Holding { get; set; }

    [DataMember(Order = 4)]
    public decimal Cost { get; set; }
}

由於gRPC和WCF之間的差異,服務契約將需要更多的修改。gRPC服務中的RPC方法必須只定義一種消息類型作為請求參數,並且只返回一條消息。我們不能接受標量類型(即基本類型)作為請求參數,也不能返回標量類型。我們需要將所有原始參數合併到一個消息中(即DataContract)。這也解釋了Guid參數類型,因為它可能被序列化為字符串,這取決於你如何配置protobuf-net。我們也不能接受消息列表(或標量)或返回消息列表(或標量)。記住這些規則後,我們需要修改我們的服務契約,使其看起來像下面這樣:

[ServiceContract]
public interface IPortfolioService
{
    [OperationContract]
    Task<Portfolio> Get(GetPortfolioRequest request);

    [OperationContract]
    Task<PortfolioCollection> GetAll(GetAllPortfoliosRequest request);
}

服務契約中的上述更改迫使我們創建一些額外的數據契約。因此,我們創建如下:

[DataContract]
public class GetPortfolioRequest
{
    [DataMember(Order = 1)]
    public Guid TraderId { get; set; }

    [DataMember(Order = 2)]
    public int PortfolioId { get; set; }
}

[DataContract]
public class GetAllPortfoliosRequest
{
    [DataMember(Order = 1)]
    public Guid TraderId { get; set; }
}

[DataContract]
public class PortfolioCollection
{
    [DataMember(Order = 1)]
    public List<Portfolio> Items { get; set; }
}

基本上是這樣。現在我們已經將我們的WCF服務契約和數據契約遷移到gRPC。下一步是將數據層遷移到.net Core。

將PortfolioData庫遷移到.net Core

接下來,我們將把PortfolioData庫遷移到.net Core,就像微軟指南中描述的那樣。但是,我們不需要複製模型(Portfolio.cs和PortfolioItem.cs),因為它們已經在我們在上一節中創建的類庫中定義了。相反,我們將向該共享庫添加一個項目引用。下一步是將WCF服務遷移到ASP.Net Core應用程序。

將WCF服務遷移到ASP.Net Core應用程序

我們需要做的第一件事是創建一個ASP.Net Core應用程序。因此,要麼啟動你最喜歡的IDE,創建一個基本的ASP.NET Core application或從命令行運行dotnet new web。接下來,我們需要添加一個對protobuf-net.Grpc的包。使用你最喜歡的包管理器安裝它,或者簡單地運行dotnet add package protobuf-net.Grpc.AspNetCore。我們還需要向上一節中創建的PortfolioData庫添加一個項目引用。

現在我們已經準備好了項目,並且添加了所有的依賴項,我們可以繼續並創建portfolio服務。創建一個具有以下內容的新類。

public class PortfolioService : IPortfolioService
{
    private readonly IPortfolioRepository _repository;

    public PortfolioService(IPortfolioRepository repository)
    {
        _repository = repository;
    }

    public async Task<Portfolio> Get(GetPortfolioRequest request)
    {
        var portfolio = await _repository.GetAsync(request.TraderId, request.PortfolioId);

        return portfolio;
    }

    public async Task<PortfolioCollection> GetAll(GetAllPortfoliosRequest request)
    {
        var portfolios = await _repository.GetAllAsync(request.TraderId);

        var response = new PortfolioCollection
        {
            Items = portfolios
        };

        return response;
    }
}

上面的服務看起來與WCF服務實現非常相似,除了輸入參數類型和返回參數類型之外。

最後但並非最不重要的,我們需要將protobuf-net.Grpc接入ASP.Net Core管道,並在DI容器中註冊。在啟Startup.cs,我們將做以下補充:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddScoped<IPortfolioRepository, PortfolioRepository>();
        services.AddCodeFirstGrpc();
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapGrpcService<PortfolioService>();
        });
    }
}

現在我們已經有了gRPC服務。我們列表上的下一件事是創建客戶端應用程序。

創建gRPC客戶端應用程序

對於我們的客戶端應用程序,我們將繼續創建一個控制台應用程序。要麼使用你最喜歡的IDE創建一個控制台,要麼直接從命令行運行dotnet new console。接下來,我們需要添加對protobuf-net.Grpc和Grpc.Net.Client的NuGet包。使用你最喜歡的包管理器安裝它們,或者簡單地運行dotnet add package protobuf-net.Grpc和dotnet add package Grpc.Net.Client。我們還需要向我們在第一節中創建的共享庫添加一個項目引用。

在我們的Program.cs中,我們將添加以下代碼來創建gRPC客戶端並與gRPC服務通信。

class Program
{
    private const string ServerAddress = "//localhost:5001";

    static async Task Main()
    {
        var channel = GrpcChannel.ForAddress(ServerAddress);
        var portfolios = channel.CreateGrpcService<IPortfolioService>();

        try
        {
            var request = new GetPortfolioRequest
            {
                TraderId = Guid.Parse("68CB16F7-42BD-4330-A191-FA5904D2E5A0"),
                PortfolioId = 42
            };
            var response = await portfolios.Get(request);

            Console.WriteLine($"Portfolio contains {response.Items.Count} items.");
        }
        catch (RpcException e)
        {
            Console.WriteLine(e.ToString());
        }
    }
}

現在我們可以測試我們的實現,首先啟動ASP.NET Core應用程序,然後啟動控制台應用程序。

將WCF雙工服務遷移到gRPC

現在我們已經介紹了使用protobuf-net.Grpc 將WCF服務遷移到gRPC的基本知識,我們可以看看一些更複雜的例子。

在本節中,我們將查看SimpleStockPriceTicker,這是一個雙工服務,客戶端啟動連接,服務器使用回調接口在更新可用時發送更新。WCF服務有一個沒有返回類型的方法,因為它使用回調接口ISimpleStockTickerCallback實時向客戶端發送數據。

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(ISimpleStockTickerCallback))]
public interface ISimpleStockTickerService
{
    [OperationContract(IsOneWay = true)]
    void Subscribe(string[] symbols);
}

[ServiceContract]
public interface ISimpleStockTickerCallback
{
    [OperationContract(IsOneWay = true)]
    void Update(string symbol, decimal price);
}

當將這個服務遷移到gRPC時,我們可以使用gRPC流。gRPC服務器流的工作方式與上面的WCF服務類似。例如,客戶端發送一個請求,而服務器以一個消息流響應。在protobuf-net.Grpc中實現服務器流的慣用方法是從RPC方法返回IAsyncEnumerable<T>。通過這種方式,我們可以在客戶端和服務器端為服務契約使用相同的接口。請注意protobuf-net.Grpc也支持Google.Protobuf模式(在服務器端使用IServerStreamWriter<T>,在客戶端使用AsyncServerStreamingCall<T>),需要我們為客戶端和服務端使用單獨的接口方法。使用IAsyncEnumerable<T>作為流媒體將使我們的服務契約看起來像下面的代碼。

[ServiceContract]
public interface IStockTickerService
{
    [OperationContract]
    IAsyncEnumerable<StockTickerUpdate> Subscribe(SubscribeRequest request, CallContext context = default);
}

請注意CallContext參數,它是客戶端和服務器端的gRPC調用上下文。這允許我們在客戶端和服務器端訪問調用上下文,而不需要單獨的接口。Gogogle.Protobuf生成的代碼將在客戶端使用調用,而在服務器端使用ServerCallContext。

因為WCF服務只使用基本類型作為參數,所以我們需要創建一組可以用作參數的數據契約。上面的服務附帶的數據契約看起來像這樣。注意,我們已經向響應消息添加了一個時間戳字段,這個字段在原始WCF服務中不存在。

[DataContract]
public class SubscribeRequest
{
    [DataMember(Order = 1)]
    public List<string> Symbols { get; set; } = new List<string>();
}

[DataContract]
public class StockTickerUpdate
{
    [DataMember(Order = 1)]
    public string Symbol { get; set; }

    [DataMember(Order = 2)]
    public decimal Price { get; set; }

    [DataMember(Order = 3)]
    public DateTime Time { get; set; }
}

通過重用微軟遷移指南中的IStockPriceSubscriberFactory,我們可以實現下面的服務。通過使用System.Threading.Channels,可以很容易地將事件流到一個異步可枚舉對象。

public class StockTickerService : IStockTickerService, IDisposable
{
    private readonly IStockPriceSubscriberFactory _subscriberFactory;
    private readonly ILogger<StockTickerService> _logger;
    private IStockPriceSubscriber _subscriber;

    public StockTickerService(IStockPriceSubscriberFactory subscriberFactory, ILogger<StockTickerService> logger)
    {
        _subscriberFactory = subscriberFactory;
        _logger = logger;
    }

    public IAsyncEnumerable<StockTickerUpdate> Subscribe(SubscribeRequest request, CallContext context = default)
    {
        var buffer = Channel.CreateUnbounded<StockTickerUpdate>();

        _subscriber = _subscriberFactory.GetSubscriber(request.Symbols.ToArray());
        _subscriber.Update += async (sender, args) =>
        {
            try
            {
                await buffer.Writer.WriteAsync(new StockTickerUpdate
                {
                    Symbol = args.Symbol,
                    Price = args.Price,
                    Time = DateTime.UtcNow
                });
            }
            catch (Exception e)
            {
                _logger.LogError($"Failed to write message: {e.Message}");
            }
        };

        return buffer.AsAsyncEnumerable(context.CancellationToken);
    }

    public void Dispose()
    {
        _subscriber?.Dispose();
    }
}

WCF全雙工服務允許雙向異步、實時消息傳遞。在前面的示例中,客戶機啟動了一個請求並接收到一個更新流。在這個版本中,客戶端流化請求消息,以便對訂閱列表添加和刪除,而不必創建新的訂閱。WCF服務契約的定義如下。客戶端使用Subscribe方法啟動訂閱,並使用AddSymbol和RemoveSymbol方法添加或刪除。更新通過回調接口接收,這與前面的服務器流示例相同。

[ServiceContract(SessionMode = SessionMode.Required, CallbackContract = typeof(IFullStockTickerCallback))]
public interface IFullStockTickerService
{
    [OperationContract(IsOneWay = true)]
    void Subscribe();

    [OperationContract(IsOneWay = true)]
    void AddSymbol(string symbol);

    [OperationContract(IsOneWay = true)]
    void RemoveSymbol(string symbol);
}

[ServiceContract]
public interface IFullStockTickerCallback
{
    [OperationContract(IsOneWay = true)]
    void Update(string symbol, decimal price);
}

使用protobuf-net.Gprc實現的等價服務契約的情況如下。該服務接受請求消息流並返迴響應消息流。

[ServiceContract]
public interface IFullStockTicker
{
    [OperationContract]
    IAsyncEnumerable<StockTickerUpdate> Subscribe(IAsyncEnumerable<SymbolRequest> request, CallContext context = default);
}

下面定義了附帶的數據契約。該請求包括一個action屬性,該屬性指定該符號是應該從訂閱中添加還是刪除。響應消息與前面的示例相同。

public enum SymbolRequestAction
{
    Add = 0,
    Remove = 1
}

[DataContract]
public class SymbolRequest
{
    [DataMember(Order = 1)]
    public SymbolRequestAction Action { get; set; }

    [DataMember(Order = 2)]
    public string Symbol { get; set; }
}

[DataContract]
public class StockTickerUpdate
{
    [DataMember(Order = 1)]
    public string Symbol { get; set; }

    [DataMember(Order = 2)]
    public decimal Price { get; set; }

    [DataMember(Order = 3)]
    public DateTime Time { get; set; }
}

服務的實現如下所示。我們使用與前面示例相同的技術,通過IAsyncEnumerable<T>來流動事件,另外創建一個後台任務,它枚舉請求流,並對單個請求進行響應。

public class FullStockTickerService : IFullStockTicker, IDisposable
{
    private readonly IFullStockPriceSubscriberFactory _subscriberFactory;
    private readonly ILogger<FullStockTickerService> _logger;
    private IFullStockPriceSubscriber _subscriber;
    private Task _processRequestTask;
    private CancellationTokenSource _cts;

    public FullStockTickerService(IFullStockPriceSubscriberFactory subscriberFactory, ILogger<FullStockTickerService> logger)
    {
        _subscriberFactory = subscriberFactory;
        _logger = logger;
        _cts = new CancellationTokenSource();
    }

    public IAsyncEnumerable<StockTickerUpdate> Subscribe(IAsyncEnumerable<SymbolRequest> request, CallContext context)
    {
        var cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token, context.CancellationToken).Token;
        var buffer = Channel.CreateUnbounded<StockTickerUpdate>();

        _subscriber = _subscriberFactory.GetSubscriber();
        _subscriber.Update += async (sender, args) =>
        {
            try
            {
                await buffer.Writer.WriteAsync(new StockTickerUpdate
                {
                    Symbol = args.Symbol,
                    Price = args.Price,
                    Time = DateTime.UtcNow
                });
            }
            catch (Exception e)
            {
                _logger.LogError($"Failed to write message: {e.Message}");
            }
        };

        _processRequestTask = ProcessRequests(request, buffer.Writer, cancellationToken);

        return buffer.AsAsyncEnumerable(cancellationToken);
    }

    private async Task ProcessRequests(IAsyncEnumerable<SymbolRequest> requests, ChannelWriter<StockTickerUpdate> writer, CancellationToken cancellationToken)
    {
        await foreach (var request in requests.WithCancellation(cancellationToken))
        {
            switch (request.Action)
            {
                case SymbolRequestAction.Add:
                    _subscriber.Add(request.Symbol);
                    break;
                case SymbolRequestAction.Remove:
                    _subscriber.Remove(request.Symbol);
                    break;
                default:
                    _logger.LogWarning($"Unknown Action '{request.Action}'.");
                    break;
            }
        }

        writer.Complete();
    }

    public void Dispose()
    {
        _cts.Cancel();
        _subscriber?.Dispose();
    }
}

總結

恭喜你!你已經走到這一步了。現在你知道了將WCF服務遷移到gRPC的另一種方法。希望這種技術比用.proto格式重寫現有的數據契約要快得多。

 歡迎關注我的公眾號,如果你有喜歡的外文技術文章,可以通過公眾號留言推薦給我。

 

原文鏈接://martinbjorkstrom.com/posts/2020-09-09-migrating-wcf-to-grpc