gRPC服務端啟動流程走查

  • 2019 年 11 月 28 日
  • 筆記

1、服務端啟動示例

server = ServerBuilder.forPort(port) // @1  .addService(new GreeterImpl()) // @2  .build() // @3  .start(); // @4

小結: @1 構建監聽地址SocketAddress @2 將service註冊到快取 @3 Server構建 @4 服務端啟動

小結:服務端啟動只有一行程式碼,設計簡潔。

2、構建監聽地址SocketAddress
2.1 SPI載入NettyServerProvider

程式碼坐標:io.grpc.ServerProvider

ServerBuilder.forPort(port)    public static ServerBuilder<?> forPort(int port) {      return ServerProvider.provider().builderForPort(port);  }    private static final ServerProvider provider = ServiceProviders.load(    ServerProvider.class,    Collections.<Class<?>>emptyList(),    ServerProvider.class.getClassLoader(),    new PriorityAccessor<ServerProvider>() {      @Override      public boolean isAvailable(ServerProvider provider) {        return provider.isAvailable();      }        @Override      public int getPriority(ServerProvider provider) {        return provider.priority();      }    }); // @1

@1 通過SPI對實例化NettyServerProvider String PREFIX = "META-INF/services/"; String fullName = PREFIX + service.getName(); c = Class.forName(cn, false, loader); 具體目錄為:grpc-netty工程 META-INF/services/io.grpc.ServerProvider配置文件提供的配置io.grpc.netty.NettyServerProvider

2.2 根據指定埠創建監聽地址

程式碼坐標1:io.grpc.netty.NettyServerProvider 程式碼坐標2:io.grpc.netty.NettyServerBuilder

protected NettyServerBuilder builderForPort(int port) {     return NettyServerBuilder.forPort(port);  }    private final List<SocketAddress> listenAddresses = new ArrayList<>();    private NettyServerBuilder(int port) {      this.listenAddresses.add(new InetSocketAddress(port)); // @1  }

@1 添加指定埠的SocketAddress監聽地址並存入List容器快取

小結:在服務端啟動時指定監聽埠,SPI載入NettyServerProvider,構建SocketAddress監聽地址並存入List容器快取。

3、將service註冊到快取

程式碼坐標:io.grpc.internal.AbstractServerImplBuilder

.addService(new GreeterImpl()) // @1    public final T addService(BindableService bindableService) {      ...      return addService(checkNotNull(bindableService, "bindableService").bindService()); // @2    }     Builder addService(ServerServiceDefinition service) {    services.put(service.getServiceDescriptor().getName(), service); // @3    return this;   }

@1 GreeterImpl自定義提供的服務實現類需繼承插件生產的抽象類GreeterGrpc.GreeterImplBase同時實現了BindableService介面 @2 bindService()不需要自己實現,插件自動生成程式碼時會自動生成其實現類 @3 將服務註冊到快取services中(LinkedHashMap)

小結:將自定義的服務提供實現類註冊到快取中。

4、Server構建
.build()  public final Server build() {  ServerImpl server = new ServerImpl(      this,      buildTransportServers(getTracerFactories()), // @1      Context.ROOT);  for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {    notifyTarget.notifyOnBuild(server);  }  return server;  }    protected List<NettyServer> buildTransportServers(){      ...     for (SocketAddress listenAddress : listenAddresses) {       NettyServer transportServer = new NettyServer(...) // @2     }  }

@1 創建NettyServer @2 為註冊的每個SocketAddress創建NettyServer

4.1 NettyServer構造函數參數

NettyServer(    SocketAddress address, ChannelFactory<? extends ServerChannel> channelFactory, // @1    Map<ChannelOption<?>, ?> channelOptions, // @2    ObjectPool<? extends EventLoopGroup/**一組EventLoop**/> bossGroupPool, // @3    ObjectPool<? extends EventLoopGroup> workerGroupPool, // @4    ProtocolNegotiator protocolNegotiator, // @5    List<? extends ServerStreamTracer.Factory> streamTracerFactories, // @6    TransportTracer.Factory transportTracerFactory, // @7    int maxStreamsPerConnection, // @8    int flowControlWindow, // @9    int maxMessageSize, // @10    int maxHeaderListSize, // @11    long keepAliveTimeInNanos, // @12    long keepAliveTimeoutInNanos, // @13    long maxConnectionIdleInNanos, // @14    long maxConnectionAgeInNanos, // @15    long maxConnectionAgeGraceInNanos,// @16    boolean permitKeepAliveWithoutCalls, // @17    long permitKeepAliveTimeInNanos, // @18    InternalChannelz channelz  )

@1 Channel工廠類創建新的通道 @2 ChannelOption設置項 @3 用於accept客戶端鏈接的執行緒池轉發給workerGroupPool @4 初始化客戶端連接的執行緒池 @5 遵循HTTP/2規範的通訊協商 @6 用於創建ServerStreamTracer @7 創建TransportTracer工廠類用於統計通訊流量 @8 每個連接允許的最大Streams @9 HTTP/2流控窗口大小 @10 服務端允許的最大消息體大小該屬性已廢棄使用maxInboundMessageSize代替 @11 接受最大Header大小已廢棄使用maxInboundMetadataSize代替 @12 存活時間單位納秒在存活時間內發送下一次keepAlive ping @13 keepalive ping requests的超時時間 @14 連接最大空閑時間超過後將優雅關閉 @15 連接最大存活時間超過後將優雅關閉 @16 當達到最大連接時RPCs有優雅關閉時間,在優雅時間內RPC請求未完成將被取消 @17 是否允許在沒有RPC調用的情況下客戶端發送keep-alive HTTP/2 PINGs 默認false @18 允許客戶端保持連接的最大時間 默認5分鐘

5、服務端啟動

程式碼坐標:ServerImpl.start

.start()    public ServerImpl start() throws IOException {  synchronized (lock) {    checkState(!started, "Already started");// @1    checkState(!shutdown, "Shutting down");// @2    ServerListenerImpl listener = new ServerListenerImpl(); // @3    for (InternalServer ts : transportServers) {      ts.start(listener); // @4      activeTransportServers++; // @5    }    executor = Preconditions.checkNotNull(executorPool.getObject(), "executor"); // @6    started = true; // @7    return this;  }

@1 檢查服務是否啟動 @2 檢查服務是否正在關閉 @3 transport創建監聽器 @4 server啟動 @5 活動server統計 @6 從守護執行緒池中獲取一個執行緒 @7 服務啟動標記

程式碼坐標:NettyServer.start

public void start(ServerListener serverListener) {      ServerBootstrap b = new ServerBootstrap();      ...       b.childHandler(new ChannelInitializer<Channel>() {        @Override        public void initChannel(Channel ch) {          NettyServerTransport transport =              new NettyServerTransport(...); // @1        }        transport.start(transportListener);        ...      });  }

程式碼坐標:NettyServerTransport.start

...  grpcHandler = createHandler(listener, channelUnused); // @2  ...

程式碼坐標:NettyServerHandler.newHandler

 static NettyServerHandler newHandler(...){ // @3      final Http2Connection connection = new DefaultHttp2Connection(true);      WeightedFairQueueByteDistributor dist = new WeightedFairQueueByteDistributor(connection);      dist.allocationQuantum(16 * 1024); // Make benchmarks fast again.      DefaultHttp2RemoteFlowController controller =          new DefaultHttp2RemoteFlowController(connection, dist);      connection.remote().flowController(controller);      final KeepAliveEnforcer keepAliveEnforcer = new KeepAliveEnforcer(          permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);        // Create the local flow controller configured to auto-refill the connection window.      connection.local().flowController(          new DefaultHttp2LocalFlowController(connection, DEFAULT_WINDOW_UPDATE_RATIO, true));      frameWriter = new WriteMonitoringFrameWriter(frameWriter, keepAliveEnforcer);      Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, frameWriter);      encoder = new Http2ControlFrameLimitEncoder(encoder, 10000);      Http2ConnectionDecoder decoder = new DefaultHttp2ConnectionDecoder(connection, encoder,          frameReader);        Http2Settings settings = new Http2Settings();      settings.initialWindowSize(flowControlWindow);      settings.maxConcurrentStreams(maxStreams);      settings.maxHeaderListSize(maxHeaderListSize);        return new NettyServerHandler(...)   }

@1 創建NettyServerTransport @2 基於Netty HTTP/2構建gRPC服務端 @3 具體的Netty HTTP/2實現,具體在分析HTTP/2時再回頭分析

6、小結

從一行程式碼啟動gRPC服務端開始,從註冊地址、註冊服務、Server構建、Server啟動流程走查。gRPC基於Netty HTTP/2協議棧封裝底層通訊。