(八)分布式通信—-主机Host

  • 2019 年 10 月 3 日
  • 筆記

 ==>>点击查看本系列文章目录

 

上节中有谈到的是通信主机(TransportHost),本节中主机(ServiceHost)负责管理服务的生命周期。

项目中将两个主机拆分开,实现不同的功能:

通信主机:用于启动通信监听端口;

生命周期管理的主机:负责模块功能的依赖注入,管理生命周期。

 

先看一下启动服务端主机和客户端主机后完成通信的效果图:

 

 文件结构如下:

 

ServiceHost 主机由ServiceHostBuilder来构建。

过程如下:

先看调用图: 

1.Program中Main() 调用 ServiceHostBuilder 的方法:MapServices、RegisterServices、ConfigureServices、Configure

  分别将委托填充到 List<Action<IContainer>>、List<Action<ContainerBuilder>>、List<Action<IServiceCollection>>、List<Action<IConfigurationBuilder>> 类型的容器中。

  其中 IContainer、ContainerBuilder 是 Autofac中的容器,IServiceCollection、IConfigurationBuilder 是 Microsoft中的容器。

2. Program中Main() 调用 ServiceHostBuilder 的方法 UseStartup<Startup>()  ,Startup 必须实现 IStartup,完成Startup 的单例注入(微软中的 Startup 可以不实现 IStartup ,但是必须使用方法ConfigureServices、Configure)

3. Program中Main() 调用 ServiceHostBuilder 的方法 Build()

  (1)回调容器 List<Action<IContainer>>、List<Action<ContainerBuilder>>、List<Action<IServiceCollection>>、List<Action<IConfigurationBuilder>> 中的委托。

      容器生成过程: ConfigureServices   —》  List<Action<IServiceCollection>>      —》  IServiceCollection

            Configure      —》  List<Action<IConfigurationBuilder>>      —》  IConfigurationBuilder

            IServiceCollection + IConfigurationBuilder  —》  IServiceCollection   —》  ServiceProvider

            RegisterServices     —》  List<Action<ContainerBuilder>>       —》  ContainerBuilder

            ContainerBuilder + IServiceCollection         —》  ContainerBuilder 

            MapServices      —》  List<Action<IContainer>>

  (2)将上面红色字体的对象通过构造函数传给new 的 ServiceHost 对象。

  (3)调用ServiceHost .Initialize(),  该方法中执行如下过程

    a. 用ServiceProvider 解析出 Startup 对象

    b. 回调Startup的 IContainer ConfigureServices(ContainerBuilder builder) , 返回构建好的容器 IContainer

    c. 回调Startup的 void Configure(IContainer app) , IContainer中注入其它功能

  (4)将包含IContainer容器的ServiceHost 对象返回

4. ServiceHost.Run(), 回调主机中一直未执行的容器委托 List<Action<IContainer>> 

总结一下,整个过程就是将原来的四个委托的容器最后合并成一个 IContainer 容器。

解析容器中的服务,可以用 :

IContainer _container;  IDemoService service = _container.Resolve<IDemoService>(); 

 

服务端和客户端启动:

 

 

代码:

我们先看客户端和服务端代码:

服务端:

namespace Leo.ServiceLaunch.Server  {      class Program      {          static void Main(string[] args)          {              Console.WriteLine("Server, Hello World!");                var host = new ServiceHostBuilder()                  .RegisterServices(builder =>                  {                      builder.RegisterType<MessagePackTransportMessageCodecFactory>().As<ITransportMessageCodecFactory>().SingleInstance();                      builder.RegisterType(typeof(HttpServiceExecutor)).As(typeof(IServiceExecutor)).Named<IServiceExecutor>("tcp").SingleInstance();                      builder.Register(provider =>                      {                          return new DotNettyServerMessageListener(provider.Resolve<ILogger<DotNettyServerMessageListener>>(),                                provider.Resolve<ITransportMessageCodecFactory>());                      }).SingleInstance();                      builder.Register(provider =>                      {                          var serviceExecutor = provider.ResolveKeyed<IServiceExecutor>("tcp");                          var messageListener = provider.Resolve<DotNettyServerMessageListener>();                          return new DotNettyTransportHost(async endPoint =>                          {                              await messageListener.StartAsync(endPoint);                              return messageListener;                          }, serviceExecutor);                      }).As<ITransportHost>();                  })                  .UseServer()  // 指定监听的端口                  .UseStartup<Startup>()                  .Build();                using (host.Run())              {                  Console.WriteLine($"服务端启动成功,{DateTime.Now}。");              }                Console.ReadLine();          }        }  }

namespace Leo.ServiceLaunch.Server  {      class Startup : IStartup      {          public IContainer ConfigureServices(ContainerBuilder builder)          {              return builder.Build();          }            public void Configure(IContainer app)          {            }      }  }

客户端:

namespace Leo.ServiceLaunch.Client  {      class Program      {          static void Main(string[] args)          {              Console.WriteLine("Client, Hello World!");                var host = new ServiceHostBuilder()                  .RegisterServices(builder =>                  {                      builder.RegisterType<MessagePackTransportMessageCodecFactory>().As<ITransportMessageCodecFactory>().SingleInstance();                      builder.Register(provider =>                      {                          IServiceExecutor serviceExecutor = null;                          if (provider.IsRegistered(typeof(IServiceExecutor)))  // 没有注册客户端接收消息执行器,因此一直为空                              serviceExecutor = provider.Resolve<IServiceExecutor>();                          return new DotNettyTransportClientFactory(provider.Resolve<ITransportMessageCodecFactory>(),                              provider.Resolve<ILogger<DotNettyTransportClientFactory>>(),                              serviceExecutor);                      }).As(typeof(ITransportClientFactory)).SingleInstance();                  })                  .UseStartup<Startup>()                  .Build();                using (host.Run())              {                  Startup.Test();              }              Console.ReadLine();          }      }  }

namespace Leo.ServiceLaunch.Client  {      class Startup : IStartup      {          private static IContainer _container;          public void Configure(IContainer app)          {          }            public IContainer ConfigureServices(ContainerBuilder builder)          {              _container = builder.Build();              return _container;          }            internal static void Test()          {              Task.Run(async () =>              {                  do                  {                      Console.WriteLine("正在循环 1万次发送消息.....");                        //1w次调用                      var watch = Stopwatch.StartNew();                      for (var i = 1; i < 10000; i++)                      {                          var invokeMessage = new TransportMessage                          {                              Id = i.ToString(),                              ContentType = "string",                              Content = "你好啊,这是客户端发给服务端的消息"                          };                          try                          {                              var endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 981);                              ITransportClientFactory transportClientFactory = _container.Resolve<ITransportClientFactory>();                              var client = await transportClientFactory.CreateClientAsync(endPoint);                              await client.SendAsync(invokeMessage);                          }                          catch (Exception exception)                          {                              Console.WriteLine(exception.ToString(), $"发起请求中发生了错误,服务Id:{invokeMessage.Id}。");                              throw;                          }                      }                      watch.Stop();                      Console.WriteLine($"1万次发送结束,执行时间:{watch.ElapsedMilliseconds}ms");                      Console.WriteLine("Press any key to continue, q to exit the loop...");                      var key = Console.ReadLine();                      if (key.ToLower() == "q")                          break;                  } while (true);              }).Wait();          }      }  }

主机:

IServiceHost:

    public interface IServiceHost : IDisposable      {          IDisposable Run();            IContainer Initialize();      }

IServiceHostBuilder:

    public interface IServiceHostBuilder      {          IServiceHost Build();            IServiceHostBuilder RegisterServices(Action<ContainerBuilder> builder);            IServiceHostBuilder ConfigureServices(Action<IServiceCollection> configureServices);            IServiceHostBuilder Configure(Action<IConfigurationBuilder> builder);            IServiceHostBuilder MapServices(Action<IContainer> mapper);      }

ServiceHost:

public class ServiceHost : IServiceHost      {          private readonly ContainerBuilder _builder;          private IStartup _startup;          private IContainer _applicationServices;          private readonly IServiceProvider _hostingServiceProvider;          private readonly List<Action<IContainer>> _mapServicesDelegates;            public ServiceHost(ContainerBuilder builder,              IServiceProvider hostingServiceProvider,               List<Action<IContainer>> mapServicesDelegate)          {              _builder = builder;              _hostingServiceProvider = hostingServiceProvider;              _mapServicesDelegates = mapServicesDelegate;          }            public IContainer Initialize()          {              if (_applicationServices == null)              {                  try                  {                      if (_applicationServices == null)                      {                          if (_startup == null)                          {                              // 解析出 Startup                               _startup = _hostingServiceProvider.GetRequiredService<IStartup>();                          }                          //回调Startup中的 ConfigureServices,                          _applicationServices = _startup.ConfigureServices(_builder);                      }                      if (_applicationServices == null)                          _applicationServices = _builder.Build();                      Action<IContainer> configure = _startup.Configure;                      configure(_applicationServices);                  }                  catch (Exception ex)                  {                      Console.Out.WriteLine("应用程序启动异常: " + ex.ToString());                      throw;                  }              }              return _applicationServices;          }            public IDisposable Run()          {              RunAsync().GetAwaiter().GetResult();              return this;          }            public async Task RunAsync()          {              if (_applicationServices != null)                  MapperServices(_applicationServices);          }            private void MapperServices(IContainer mapper)          {              foreach (var mapServices in _mapServicesDelegates)              {                  mapServices(mapper);              }          }            public void Dispose()          {              (_hostingServiceProvider as IDisposable)?.Dispose();          }      }

ServiceHostBuilder:

public class ServiceHostBuilder : IServiceHostBuilder      {          private readonly List<Action<IServiceCollection>> _configureServicesDelegates;          private readonly List<Action<ContainerBuilder>> _registerServicesDelegates;          private readonly List<Action<IConfigurationBuilder>> _configureDelegates;          private readonly List<Action<IContainer>> _mapServicesDelegates;            public ServiceHostBuilder()          {              _configureServicesDelegates = new List<Action<IServiceCollection>>();              _registerServicesDelegates = new List<Action<ContainerBuilder>>();              _configureDelegates = new List<Action<IConfigurationBuilder>>();              _mapServicesDelegates = new List<Action<IContainer>>();            }            public IServiceHost Build()          {              #region Microsoft原生的容器              //执行 IServiceCollection 类型的委托              var services = BuildCommonServices();              //执行 IConfigurationBuilder 类型的委托              var config = Configure();              //日志注入到 IServiceCollection              services.AddLogging();              //IConfigurationBuilder 注入到 IServiceCollection              services.AddSingleton(typeof(IConfigurationBuilder), config);              //用 IServiceCollection 生成 ServiceProvider 服务提供器              var hostingServiceProvider = services.BuildServiceProvider();              #endregion                #region Autofac的容器              //执行 ContainerBuilder 类型的委托              var hostingServices = RegisterServices();              #endregion                //将 IServiceCollection 填充到 Autofac 的 ContainerBuilder 构建器中              hostingServices.Populate(services);                  //把Autofac的ContainerBuild的容器构建器、Microsoft的ServiceProvider服务提供器、已有的IContainer容器的委托 都放入主机中              var host = new ServiceHost(hostingServices, hostingServiceProvider, _mapServicesDelegates);              //主机初始化以后返回的是IContainer容器              var container = host.Initialize();              return host;          }            public IServiceHostBuilder MapServices(Action<IContainer> mapper)          {              if (mapper == null)              {                  throw new ArgumentNullException(nameof(mapper));              }              _mapServicesDelegates.Add(mapper);              return this;          }            public IServiceHostBuilder RegisterServices(Action<ContainerBuilder> builder)          {              if (builder == null)              {                  throw new ArgumentNullException(nameof(builder));              }              _registerServicesDelegates.Add(builder);              return this;          }            public IServiceHostBuilder ConfigureServices(Action<IServiceCollection> configureServices)          {              if (configureServices == null)              {                  throw new ArgumentNullException(nameof(configureServices));              }              _configureServicesDelegates.Add(configureServices);              return this;          }            public IServiceHostBuilder Configure(Action<IConfigurationBuilder> builder)          {              if (builder == null)              {                  throw new ArgumentNullException(nameof(builder));              }              _configureDelegates.Add(builder);              return this;          }            private IServiceCollection BuildCommonServices()          {              var services = new ServiceCollection();              foreach (var configureServices in _configureServicesDelegates)              {                  configureServices(services);              }              return services;          }            private IConfigurationBuilder Configure()          {              //var config = new ConfigurationBuilder().SetBasePath(AppContext.BaseDirectory);              var config = new ConfigurationBuilder();              foreach (var configure in _configureDelegates)              {                  configure(config);              }              return config;          }            private ContainerBuilder RegisterServices()          {              var hostingServices = new ContainerBuilder();              foreach (var registerServices in _registerServicesDelegates)              {                  registerServices(hostingServices);              }              return hostingServices;          }      }

IStartup:

    public interface IStartup      {          IContainer ConfigureServices(ContainerBuilder builder);            void Configure(IContainer app);      }

ServerExtensions:

    public static class ServerExtensions      {          public static IServiceHostBuilder UseServer(this IServiceHostBuilder hostBuilder)          {              return hostBuilder.MapServices(async mapper =>              {                  int _port = 981;                  string _ip = "127.0.0.1";                    Console.WriteLine($"准备启动服务主机,监听地址:{_ip}:{_port}。");                  var transportHosts = mapper.Resolve<IList<ITransportHost>>();                  Task.Factory.StartNew(async () =>                  {                      foreach (var transportHost in transportHosts)                          await transportHost.StartAsync(_ip, _port);                  }).Wait();              });          }            public static IServiceHostBuilder UseStartup<TStartup>(this IServiceHostBuilder hostBuilder) where TStartup : IStartup          {              return hostBuilder                  .ConfigureServices(services =>                  {                      services.AddSingleton(typeof(IStartup), typeof(TStartup));                  });          }      }