Orleans 知多少 | 3. Hello Orleans

  • 2019 年 10 月 28 日
  • 筆記

Orleans 3.0 Released

1. 引言

是的,Orleans v3.0.0 已經發佈了,並已經完全支持 .NET Core 3.0
所以,Orleans 系列是時候繼續了,抱歉,讓大家久等了。
萬丈高樓平地起,這一節我們就先來了解下Orleans的基本使用。

2. 模板項目講解

Orleans 核心概念

在上一篇文章中,我們了解到Orleans 作為.NET 分佈式框架,其主要包括三個部分:Client、Grains、Silo Host(Server)。因此,為了方便講解,創建如下的項目結構進行演示:
Hello.Orleans 項目結構

這裡有幾點需要說明:

  1. Orleans.Grains: 類庫項目,用於定義Grain的接口以及實現,需要引用Microsoft.Orleans.CodeGenerator.MSBuildMicrosoft.Orleans.Core.Abstractions NuGet包。
  2. Orleans.Server:控制台項目,為 Silo 宿主提供宿主環境,需要引用Microsoft.Orleans.ServerMicrosoft.Extensions.Hosting NuGet包,以及Orleans.Grains 項目。
  3. Orleans.Client:控制台項目,用於演示如何藉助Orleans Client建立與Orleans Server的連接,需要引用Microsoft.Orleans.ClientMicrosoft.Extensions.Hosting NuGet包,同時添加Orleans.Grains項目引用。

3. 第一個Grain

Grain作為Orleans的第一公民,以及Virtual Actor的實際代言人,想吃透Orleans,那Grain就是第一道坎。
先看一個簡單的Demo,我們來模擬統計網站的實時在線用戶。
Orlean s.Grains添加ISessionControl接口,主要用戶登錄狀態的管理。

public interface ISessionControlGrain : IGrainWithStringKey  {      Task Login(string userId);      Task Logout(string userId);      Task<int> GetActiveUserCount();  }

可以看見Grain的定義很簡單,只需要指定繼承自IGrain的接口就好。這裏面繼承自IGrainWithStringKey,說明該Grain 的Identity Key(身份標識)為string類型。同時需要注意的是
Grain 的方法申明,返回值必須是: Task、Task、ValueTask
緊接着定義SessionControlGrain來實現ISessionControlGrain接口。

public class SessionControlGrain : Grain, ISessionControlGrain  {      private List<string> LoginUsers { get; set; } = new List<string>();        public Task Login(string userId)      {          //獲取當前Grain的身份標識(因為ISessionControlGrain身份標識為string類型,GetPrimaryKeyString());          var appName = this.GetPrimaryKeyString();            LoginUsers.Add(userId);            Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");          return Task.CompletedTask;      }        public Task Logout(string userId)      {          //獲取當前Grain的身份標識          var appName = this.GetPrimaryKey();          LoginUsers.Remove(userId);            Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");          return Task.CompletedTask;      }        public Task<int> GetActiveUserCount()      {          return Task.FromResult(LoginUsers.Count);      }  }

實現也很簡單,Grain的實現要繼承自Grain基類。代碼中我們定義了一個List<string>集合用於保存登錄用戶。

4. 第一個Silo Host(Server)

定義一個Silo用於暴露Grain提供的服務,在Orleans.Server.Program中添加以下代碼用於啟動Silo Host。

static Task Main(string[] args)  {      Console.Title = typeof(Program).Namespace;        // define the cluster configuration      return Host.CreateDefaultBuilder()          .UseOrleans((builder) =>              {                  builder.UseLocalhostClustering()                      .AddMemoryGrainStorageAsDefault()                      .Configure<ClusterOptions>(options =>                      {                          options.ClusterId = "Hello.Orleans";                          options.ServiceId = "Hello.Orleans";                      })                      .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)                      .ConfigureApplicationParts(parts =>                          parts.AddApplicationPart(typeof(ISessionControlGrain).Assembly).WithReferences());              }          )          .ConfigureServices(services =>          {              services.Configure<ConsoleLifetimeOptions>(options =>              {                  options.SuppressStatusMessages = true;              });          })          .ConfigureLogging(builder => { builder.AddConsole(); })          .RunConsoleAsync();  }
  1. Host.CreateDefaultBuilder():創建泛型主機提供宿主環境。
  2. UseOrleans:用來配置Oleans。
  3. UseLocalhostClustering() :用於在開發環境下指定連接到本地集群。
  4. Configure<ClusterOptions>:用於指定連接到那個集群。
  5. Configure<EndpointOptions>:用於配置silo與silo、silo與client之間的通信端點。開發環境下可僅指定迴環地址作為集群間通信的IP地址。
  6. ConfigureApplicationParts():用於指定暴露哪些Grain服務。

以上就是開發環境下,Orleans Server的基本配置。對於詳細的配置也可以先參考Orleans Server Configuration。後續也會有專門的一篇文章來詳解。

5. 第一個Client

客戶端的定義也很簡單,主要是創建IClusterClient對象建立於Orleans Server的連接。因為IClusterClient最好能在程序啟動之時就建立連接,所以可以通過繼承IHostedService來實現。
Orleans.Client中定義ClusterClientHostedService繼承自IHostedService

public class ClusterClientHostedService : IHostedService  {      public IClusterClient Client { get; }        private readonly ILogger<ClusterClientHostedService> _logger;        public ClusterClientHostedService(ILogger<ClusterClientHostedService> logger, ILoggerProvider loggerProvider)      {          _logger = logger;          Client = new ClientBuilder()              .UseLocalhostClustering()              .Configure<ClusterOptions>(options =>              {                  options.ClusterId = "Hello.Orleans";                  options.ServiceId = "Hello.Orleans";              })              .ConfigureLogging(builder => builder.AddProvider(loggerProvider))              .Build();      }        public Task StartAsync(CancellationToken cancellationToken)      {          var attempt = 0;          var maxAttempts = 100;          var delay = TimeSpan.FromSeconds(1);          return Client.Connect(async error =>          {              if (cancellationToken.IsCancellationRequested)              {                  return false;              }                if (++attempt < maxAttempts)              {                  _logger.LogWarning(error,                      "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",                      attempt, maxAttempts);                    try                  {                      await Task.Delay(delay, cancellationToken);                  }                  catch (OperationCanceledException)                  {                      return false;                  }                    return true;              }              else              {                  _logger.LogError(error,                      "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",                      attempt, maxAttempts);                    return false;              }          });      }        public async Task StopAsync(CancellationToken cancellationToken)      {          try          {              await Client.Close();          }          catch (OrleansException error)          {              _logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown.");          }      }  }

代碼講解:

  1. 構造函數中通過藉助ClientBuilder() 來初始化IClusterClient。其中UseLocalhostClustering()用於連接到開發環境中的localhost 集群。並通過Configure<ClusterOptions>指定連接到哪個集群。(需要注意的是,這裡的ClusterId必須與Orleans.Server中配置的保持一致。
Client = new ClientBuilder()      .UseLocalhostClustering()      .Configure<ClusterOptions>(options =>      {          options.ClusterId = "Hello.Orleans";          options.ServiceId = "Hello.Orleans";      })      .ConfigureLogging(builder => builder.AddProvider(loggerProvider))      .Build();
  1. StartAsync方法中通過調用Client.Connect建立與Orleans Server的連接。同時定義了一個重試機制。

緊接着我們需要將ClusterClientHostedService添加到Ioc容器,添加以下代碼到Orleans.Client.Program中:

static Task Main(string[] args)  {      Console.Title = typeof(Program).Namespace;        return Host.CreateDefaultBuilder()          .ConfigureServices(services =>          {              services.AddSingleton<ClusterClientHostedService>();              services.AddSingleton<IHostedService>(_ => _.GetService<ClusterClientHostedService>());              services.AddSingleton(_ => _.GetService<ClusterClientHostedService>().Client);                services.AddHostedService<HelloOrleansClientHostedService>();              services.Configure<ConsoleLifetimeOptions>(options =>              {                  options.SuppressStatusMessages = true;              });          })          .ConfigureLogging(builder =>          {              builder.AddConsole();          })          .RunConsoleAsync();  }

對於ClusterClientHostedService,並沒有選擇直接通過services.AddHostedService<T>的方式注入,是因為我們需要注入該服務中提供的IClusterClient(單例),以供其他類去消費。

緊接着,定義一個HelloOrleansClientHostedService用來消費定義的ISessionControlGrain

public class HelloOrleansClientHostedService : IHostedService  {      private readonly IClusterClient _client;      private readonly ILogger<HelloOrleansClientHostedService> _logger;        public HelloOrleansClientHostedService(IClusterClient client, ILogger<HelloOrleansClientHostedService> logger)      {          _client = client;          _logger = logger;      }      public async Task StartAsync(CancellationToken cancellationToken)      {          // 模擬控制台終端用戶登錄         await MockLogin("Hello.Orleans.Console");         // 模擬網頁終端用戶登錄         await MockLogin("Hello.Orleans.Web");      }        /// <summary>      /// 模擬指定應用的登錄      /// </summary>      /// <param name="appName"></param>      /// <returns></returns>      public async Task MockLogin(string appName)      {          //假設我們需要支持不同端登錄用戶,則只需要將項目名稱作為身份標識。          //即可獲取一個代表用來維護當前項目登錄狀態的的單例Grain。          var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);          ParallelLoopResult result = Parallel.For(0, 10000, (index) =>          {              var userId = $"User-{index}";              sessionControl.Login(userId);          });            if (result.IsCompleted)          {              //ParallelLoopResult.IsCompleted 只是返回所有循環創建完畢,並不保證循環的內部任務創建並執行完畢              //所以,此處手動延遲5秒後再去讀取活動用戶數。              await Task.Delay(TimeSpan.FromSeconds(5));              var activeUserCount = await sessionControl.GetActiveUserCount();                _logger.LogInformation($"The Active Users Count of {appName} is {activeUserCount}");          }      }        public Task StopAsync(CancellationToken cancellationToken)      {          _logger.LogInformation("Closed!");            return Task.CompletedTask; ;      }  }

代碼講解:
這裡定義了一個MockLogin用於模擬不同終端10000個用戶的並發登錄。

  1. 通過構造函數注入需要的IClusterClient
  2. 通過指定Grain接口以及身份標識,就可以通過Client 獲取對應的Grain,進而消費Grain中暴露的方法。var sessionControl = _client.GetGrain<ISessionControlGrain>(appName); 這裡需要注意的是,指定的身份標識為終端應用的名稱,那麼在整個應用生命周期內,將有且僅有一個代表這個終端應用的Grain。
  3. 使用Parallel.For 模擬並發
  4. ParallelLoopResult.IsCompleted 只是返回所有循環任務創建完畢,並不代表循環的內部任務執行完畢。

6. 啟動第一個 Orleans 應用

先啟動Orleans.Server
Orleans Server Stared
再啟動Orleans.Client
Orleans Client

Orleans Server log

從上面的運行結果來看,模擬兩個終端10000個用戶的並發登錄,最終輸出的活動用戶數量均為10000個。
回顧整個實現,並沒有用到諸如鎖、並發集合等避免並發導致的線程安全問題,但卻輸出正確的期望結果,這就正好說明了Orleans強大的並發控制特性。

public class SessionControlGrain : Grain, ISessionControlGrain  {      // 未使用並發集合      private List<string> LoginUsers { get; set; } = new List<string>();        public Task Login(string userId)      {          //獲取當前Grain的身份標識(因為ISessionControlGrain身份標識為string類型,GetPrimaryKeyString());          var appName = this.GetPrimaryKeyString();            LoginUsers.Add(userId);//未加鎖            Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");          return Task.CompletedTask;      }      ....  }

7. 小結

通過簡單的演示,想必你對Orleans的編程實現有了基本的認知,並體會到其並發控制的強大之處。
這只是簡單的入門演練,Orleans很多強大的特性,後續再結合具體場景進行詳細闡述。
源碼已上傳至GitHub:Hello.Orleans