gRPC服务端启动流程走查
- 2019 年 11 月 28 日
- 筆記
1、服务端启动示例
server = ServerBuilder.forPort(port) // @1 .addService(new GreeterImpl()) // @2 .build() // @3 .start(); // @4
小结: @1 构建监听地址SocketAddress @2 将service注册到缓存 @3 Server构建 @4 服务端启动
小结:服务端启动只有一行代码,设计简洁。
2、构建监听地址SocketAddress
2.1 SPI加载NettyServerProvider
代码坐标:io.grpc.ServerProvider
ServerBuilder.forPort(port) public static ServerBuilder<?> forPort(int port) { return ServerProvider.provider().builderForPort(port); } private static final ServerProvider provider = ServiceProviders.load( ServerProvider.class, Collections.<Class<?>>emptyList(), ServerProvider.class.getClassLoader(), new PriorityAccessor<ServerProvider>() { @Override public boolean isAvailable(ServerProvider provider) { return provider.isAvailable(); } @Override public int getPriority(ServerProvider provider) { return provider.priority(); } }); // @1
@1 通过SPI对实例化NettyServerProvider String PREFIX = "META-INF/services/"; String fullName = PREFIX + service.getName(); c = Class.forName(cn, false, loader); 具体目录为:grpc-netty工程 META-INF/services/io.grpc.ServerProvider配置文件提供的配置io.grpc.netty.NettyServerProvider
2.2 根据指定端口创建监听地址
代码坐标1:io.grpc.netty.NettyServerProvider 代码坐标2:io.grpc.netty.NettyServerBuilder
protected NettyServerBuilder builderForPort(int port) { return NettyServerBuilder.forPort(port); } private final List<SocketAddress> listenAddresses = new ArrayList<>(); private NettyServerBuilder(int port) { this.listenAddresses.add(new InetSocketAddress(port)); // @1 }
@1 添加指定端口的SocketAddress监听地址并存入List容器缓存
小结:在服务端启动时指定监听端口,SPI加载NettyServerProvider,构建SocketAddress监听地址并存入List容器缓存。
3、将service注册到缓存
代码坐标:io.grpc.internal.AbstractServerImplBuilder
.addService(new GreeterImpl()) // @1 public final T addService(BindableService bindableService) { ... return addService(checkNotNull(bindableService, "bindableService").bindService()); // @2 } Builder addService(ServerServiceDefinition service) { services.put(service.getServiceDescriptor().getName(), service); // @3 return this; }
@1 GreeterImpl自定义提供的服务实现类需继承插件生产的抽象类GreeterGrpc.GreeterImplBase同时实现了BindableService接口 @2 bindService()不需要自己实现,插件自动生成代码时会自动生成其实现类 @3 将服务注册到缓存services中(LinkedHashMap)
小结:将自定义的服务提供实现类注册到缓存中。
4、Server构建
.build() public final Server build() { ServerImpl server = new ServerImpl( this, buildTransportServers(getTracerFactories()), // @1 Context.ROOT); for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) { notifyTarget.notifyOnBuild(server); } return server; } protected List<NettyServer> buildTransportServers(){ ... for (SocketAddress listenAddress : listenAddresses) { NettyServer transportServer = new NettyServer(...) // @2 } }
@1 创建NettyServer @2 为注册的每个SocketAddress创建NettyServer
4.1 NettyServer构造函数参数
NettyServer( SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory, // @1 Map<ChannelOption<?>, ?> channelOptions, // @2 ObjectPool<? extends EventLoopGroup/**一组EventLoop**/> bossGroupPool, // @3 ObjectPool<? extends EventLoopGroup> workerGroupPool, // @4 ProtocolNegotiator protocolNegotiator, // @5 List<? extends ServerStreamTracer.Factory> streamTracerFactories, // @6 TransportTracer.Factory transportTracerFactory, // @7 int maxStreamsPerConnection, // @8 int flowControlWindow, // @9 int maxMessageSize, // @10 int maxHeaderListSize, // @11 long keepAliveTimeInNanos, // @12 long keepAliveTimeoutInNanos, // @13 long maxConnectionIdleInNanos, // @14 long maxConnectionAgeInNanos, // @15 long maxConnectionAgeGraceInNanos,// @16 boolean permitKeepAliveWithoutCalls, // @17 long permitKeepAliveTimeInNanos, // @18 InternalChannelz channelz )
@1 Channel工厂类创建新的通道 @2 ChannelOption设置项 @3 用于accept客户端链接的线程池转发给workerGroupPool @4 初始化客户端连接的线程池 @5 遵循HTTP/2规范的通信协商 @6 用于创建ServerStreamTracer @7 创建TransportTracer工厂类用于统计通信流量 @8 每个连接允许的最大Streams @9 HTTP/2流控窗口大小 @10 服务端允许的最大消息体大小该属性已废弃使用maxInboundMessageSize代替 @11 接受最大Header大小已废弃使用maxInboundMetadataSize代替 @12 存活时间单位纳秒在存活时间内发送下一次keepAlive ping @13 keepalive ping requests的超时时间 @14 连接最大空闲时间超过后将优雅关闭 @15 连接最大存活时间超过后将优雅关闭 @16 当达到最大连接时RPCs有优雅关闭时间,在优雅时间内RPC请求未完成将被取消 @17 是否允许在没有RPC调用的情况下客户端发送keep-alive HTTP/2 PINGs 默认false @18 允许客户端保持连接的最大时间 默认5分钟
5、服务端启动
代码坐标:ServerImpl.start
.start() public ServerImpl start() throws IOException { synchronized (lock) { checkState(!started, "Already started");// @1 checkState(!shutdown, "Shutting down");// @2 ServerListenerImpl listener = new ServerListenerImpl(); // @3 for (InternalServer ts : transportServers) { ts.start(listener); // @4 activeTransportServers++; // @5 } executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); // @6 started = true; // @7 return this; }
@1 检查服务是否启动 @2 检查服务是否正在关闭 @3 transport创建监听器 @4 server启动 @5 活动server统计 @6 从守护线程池中获取一个线程 @7 服务启动标记
代码坐标:NettyServer.start
public void start(ServerListener serverListener) { ServerBootstrap b = new ServerBootstrap(); ... b.childHandler(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) { NettyServerTransport transport = new NettyServerTransport(...); // @1 } transport.start(transportListener); ... }); }
代码坐标:NettyServerTransport.start
... grpcHandler = createHandler(listener, channelUnused); // @2 ...
代码坐标:NettyServerHandler.newHandler
static NettyServerHandler newHandler(...){ // @3 final Http2Connection connection = new DefaultHttp2Connection(true); WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection); dist.allocationQuantum(16 * 1024); // Make benchmarks fast again. DefaultHttp2RemoteFlowController controller = new DefaultHttp2RemoteFlowController(connection, dist); connection.remote().flowController(controller); final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer( permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS); // Create the local flow controller configured to auto-refill the connection window. connection.local().flowController( new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true)); frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer); Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter); encoder = new Http2ControlFrameLimitEncoder(encoder, 10000); Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder, frameReader); Http2Settings settings = new Http2Settings(); settings.initialWindowSize(flowControlWindow); settings.maxConcurrentStreams(maxStreams); settings.maxHeaderListSize(maxHeaderListSize); return new NettyServerHandler(...) }
@1 创建NettyServerTransport @2 基于Netty HTTP/2构建gRPC服务端 @3 具体的Netty HTTP/2实现,具体在分析HTTP/2时再回头分析
6、小结
从一行代码启动gRPC服务端开始,从注册地址、注册服务、Server构建、Server启动流程走查。gRPC基于Netty HTTP/2协议栈封装底层通信。