Reading the sofa-bolt Source (1): Server Startup

The core class of a Bolt server is RpcServer. Starting it calls the startup method of its parent class, AbstractRemotingServer.

com.alipay.remoting.AbstractRemotingServer#startup

    @Override
    public void startup() throws LifeCycleException {
        super.startup();

        try {
            doInit();

            logger.warn("Prepare to start server on port {} ", port);
            if (doStart()) {
                logger.warn("Server started on port {}", port);
            } else {
                logger.warn("Failed starting server on port {}", port);
                throw new LifeCycleException("Failed starting server on port: " + port);
            }
        } catch (Throwable t) {
            this.shutdown();// do stop to ensure close resources created during doInit()
            throw new IllegalStateException("ERROR: Failed to start the Server!", t);
        }
    }
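
The shape of startup() is a template method: the base class fixes the order of the steps and guarantees cleanup on failure, while subclasses supply doInit() and doStart(). The following is a minimal, library-free sketch of that shape only. The class names SketchServer, OkServer and FailingServer are hypothetical stand-ins, not Bolt classes, and the super.startup() state check is left out here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for AbstractRemotingServer; not the Bolt class. */
abstract class SketchServer {
    final List<String> trace = new ArrayList<>();

    /** Fixed skeleton: init, then start, and clean up if either step fails. */
    public final void startup() {
        try {
            doInit();
            trace.add("init");
            if (!doStart()) {
                throw new IllegalStateException("Failed starting server");
            }
            trace.add("started");
        } catch (Throwable t) {
            shutdown(); // release whatever doInit() created
            throw new IllegalStateException("ERROR: Failed to start the Server!", t);
        }
    }

    public void shutdown() {
        trace.add("shutdown");
    }

    protected abstract void doInit();

    protected abstract boolean doStart() throws Exception;
}

class OkServer extends SketchServer {
    protected void doInit() { }
    protected boolean doStart() { return true; }
}

class FailingServer extends SketchServer {
    protected void doInit() { }
    protected boolean doStart() { return false; } // simulates a failed bind
}

class StartupDemo {
    public static void main(String[] args) {
        SketchServer ok = new OkServer();
        ok.startup();
        System.out.println(ok.trace); // prints [init, started]

        SketchServer bad = new FailingServer();
        try {
            bad.startup();
        } catch (IllegalStateException e) {
            System.out.println(bad.trace); // prints [init, shutdown]
        }
    }
}
```

The point of catching Throwable in the skeleton is that a half-initialized server never leaks resources: any failure in either hook ends in shutdown() before the exception reaches the caller.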

This method does three main things:

  1. Call the parent class's startup() method to mark the component as started.

     com.alipay.remoting.AbstractLifeCycle#startup

         @Override
         public void startup() throws LifeCycleException {
             if (isStarted.compareAndSet(false, true)) {
                 return;
             }
             throw new LifeCycleException("this component has started");
         }
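
The compareAndSet guard above means that only the first caller can move the component from "not started" to "started", and every later call is rejected. A small sketch of that behavior using only the JDK follows. SketchLifeCycle is a hypothetical class, it throws an unchecked exception for brevity, and the mirrored shutdown() is an assumption of this sketch rather than something shown in the article.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for AbstractLifeCycle; not the Bolt class. */
class SketchLifeCycle {
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    public void startup() {
        // Only the caller that flips false -> true may proceed.
        if (isStarted.compareAndSet(false, true)) {
            return;
        }
        throw new IllegalStateException("this component has started");
    }

    /** Assumed mirror image of startup(): flips true -> false exactly once. */
    public void shutdown() {
        if (isStarted.compareAndSet(true, false)) {
            return;
        }
        throw new IllegalStateException("this component has closed");
    }

    public boolean isStarted() {
        return isStarted.get();
    }
}

class LifeCycleDemo {
    public static void main(String[] args) {
        SketchLifeCycle lc = new SketchLifeCycle();
        lc.startup();
        try {
            lc.startup(); // second call loses the compare-and-set
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check and the state change happen in one atomic operation, two threads racing on startup() cannot both succeed.
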
  2. Call the subclass's doInit() to perform the actual initialization.

     com.alipay.remoting.rpc.RpcServer#doInit

         @Override
         protected void doInit() {
             if (this.addressParser == null) {
                 this.addressParser = new RpcAddressParser();
             }
             if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                 // in server side, do not care the connection service state, so use null instead of global switch
                 ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null);
                 this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy);
                 this.connectionManager.startup();

                 this.connectionEventHandler = new RpcConnectionEventHandler(switches());
                 this.connectionEventHandler.setConnectionManager(this.connectionManager);
                 this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
             } else {
                 this.connectionEventHandler = new ConnectionEventHandler(switches());
                 this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
             }
             initRpcRemoting();
             this.bootstrap = new ServerBootstrap();
             this.bootstrap.group(bossGroup, workerGroup)
                 .channel(NettyEventLoopUtil.getServerSocketChannelClass())
                 .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())
                 .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
                 .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
                 .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());

             // set write buffer water mark
             initWriteBufferWaterMark();

             // init byte buf allocator
             if (ConfigManager.netty_buffer_pooled()) {
                 this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                     .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
             } else {
                 this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
                     .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
             }

             // enable trigger mode for epoll if need
             NettyEventLoopUtil.enableTriggeredMode(bootstrap);

             final boolean idleSwitch = ConfigManager.tcp_idle_switch();
             final int idleTime = ConfigManager.tcp_server_idle();
             final ChannelHandler serverIdleHandler = new ServerIdleHandler();
             final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
             this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                 @Override
                 protected void initChannel(SocketChannel channel) {
                     ChannelPipeline pipeline = channel.pipeline();
                     pipeline.addLast("decoder", codec.newDecoder());
                     pipeline.addLast("encoder", codec.newEncoder());
                     if (idleSwitch) {
                         pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                             TimeUnit.MILLISECONDS));
                         pipeline.addLast("serverIdleHandler", serverIdleHandler);
                     }
                     pipeline.addLast("connectionEventHandler", connectionEventHandler);
                     pipeline.addLast("handler", rpcHandler);
                     createConnection(channel);
                 }

                 /**
                  * create connection operation<br>
                  * <ul>
                  * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li>
                  * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li>
                  * </ul>
                  */
                 private void createConnection(SocketChannel channel) {
                     Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                     if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                         connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                     } else {
                         new Connection(channel, url);
                     }
                     channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
                 }
             });
         }

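    This code looks complicated, but it mostly just configures the Netty server. The configurable Netty options are read from ConfigManager, and the business handling lives in the childHandler: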

        protected void initChannel(SocketChannel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("decoder", codec.newDecoder());
            pipeline.addLast("encoder", codec.newEncoder());
            if (idleSwitch) {
                pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                    TimeUnit.MILLISECONDS));
                pipeline.addLast("serverIdleHandler", serverIdleHandler);
            }
            pipeline.addLast("connectionEventHandler", connectionEventHandler);
            pipeline.addLast("handler", rpcHandler);
            createConnection(channel);
        }

    The channel initializer registers six ChannelHandlers (the two idle-related ones only when idleSwitch is on):

    1. decoder: the protocol decoder

    2. encoder: the protocol encoder

    3. idleStateHandler

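      Netty's built-in idle handler, which fires IdleStateEvent. Only the all-idle time is configured here, as idleTime (default 90000 ms): if the channel sees neither a read nor a write within idleTime, an IdleStateEvent is fired.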

    4. serverIdleHandler

      Works together with idleStateHandler to handle the IdleStateEvent. When the event fires, it closes the client connection:

          @Override
          public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
              if (evt instanceof IdleStateEvent) {
                  try {
                      logger.warn("Connection idle, close it from server side: {}",
                          RemotingUtil.parseRemoteAddress(ctx.channel()));
                      ctx.close();
                  } catch (Exception e) {
                      logger.warn("Exception caught when closing connection in ServerIdleHandler.", e);
                  }
              } else {
                  super.userEventTriggered(ctx, evt);
              }
          }
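
The interplay of idleStateHandler and serverIdleHandler can be modeled without Netty: remember when the channel last saw I/O, and close it once the gap reaches idleTime. The sketch below is a deliberately simplified model, not how IdleStateHandler is implemented (Netty schedules its checks on the event loop). IdleWatch and its methods are hypothetical names, and time is passed in explicitly so the behavior is deterministic.

```java
/** Hypothetical, Netty-free model of an all-idle check; not IdleStateHandler itself. */
class IdleWatch {
    private final long idleTimeMillis;
    private long lastIoMillis;
    private boolean closed;

    IdleWatch(long idleTimeMillis, long nowMillis) {
        this.idleTimeMillis = idleTimeMillis;
        this.lastIoMillis = nowMillis;
    }

    /** Any read or write resets the idle clock. */
    void onReadOrWrite(long nowMillis) {
        lastIoMillis = nowMillis;
    }

    /** Periodic check: stands in for the IdleStateEvent plus the close done by the idle handler. */
    void check(long nowMillis) {
        if (!closed && nowMillis - lastIoMillis >= idleTimeMillis) {
            closed = true; // the real handler calls ctx.close() at this point
        }
    }

    boolean isClosed() {
        return closed;
    }
}

class IdleDemo {
    public static void main(String[] args) {
        IdleWatch watch = new IdleWatch(90_000, 0);
        watch.onReadOrWrite(60_000);
        watch.check(100_000); // only 40 s since the last I/O
        System.out.println(watch.isClosed()); // prints false
        watch.check(150_000); // 90 s since the last I/O
        System.out.println(watch.isClosed()); // prints true
    }
}
```
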
    5. connectionEventHandler

      The connection event handler. The event types are defined by the enum ConnectionEventType:

          public enum ConnectionEventType {
              CONNECT, CLOSE, EXCEPTION;
          }
      • CONNECT

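        The CONNECT event is fired once, in the createConnection() method set up by RpcServer's doInit(), through fireUserEventTriggered(ConnectionEventType.CONNECT).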

      • CLOSE

        The CLOSE event is fired when the connection is closed:

        com.alipay.remoting.ConnectionEventHandler#channelInactive

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
                infoLog("Connection channel inactive: {}", remoteAddress);
                super.channelInactive(ctx);
                Attribute attr = ctx.channel().attr(Connection.CONNECTION);
                if (null != attr) {
                    // add reconnect task
                    if (this.globalSwitch != null
                        && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
                        Connection conn = (Connection) attr.get();
                        if (reconnectManager != null) {
                            reconnectManager.reconnect(conn.getUrl());
                        }
                    }
                    // trigger close connection event
                    onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
                }
            }
      • EXCEPTION

        The EXCEPTION event is not fired anywhere in the source code; it appears to be reserved for future use.

      Whatever the event, the handler ends up calling onEvent, which forwards the event to the ConnectionEventListener. The user-defined ConnectionEventProcessor then handles the actual logic.

      com.alipay.remoting.ConnectionEventHandler#onEvent

          private void onEvent(final Connection conn, final String remoteAddress,
                               final ConnectionEventType type) {
              if (this.eventListener != null) {
                  this.eventExecutor.onEvent(new Runnable() {
                      @Override
                      public void run() {
                          ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);
                      }
                  });
              }
          }

      com.alipay.remoting.ConnectionEventListener#onEvent

          public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) {
              List<ConnectionEventProcessor> processorList = this.processors.get(type);
              if (processorList != null) {
                  for (ConnectionEventProcessor processor : processorList) {
                      processor.onEvent(remoteAddress, connection);
                  }
              }
          }
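
The listener shown above is a registry keyed by event type: each type maps to a list of processors, and an event with no registered processor is silently dropped. A JDK-only sketch of that dispatch follows. EventType, EventProcessor and EventListenerSketch are hypothetical simplifications (the Connection argument and the asynchronous eventExecutor hop are left out), not the Bolt types.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

/** Hypothetical copy of the three event types named in the article. */
enum EventType { CONNECT, CLOSE, EXCEPTION }

/** Hypothetical simplification of ConnectionEventProcessor (no Connection argument). */
interface EventProcessor {
    void onEvent(String remoteAddress);
}

/** Hypothetical simplification of ConnectionEventListener. */
class EventListenerSketch {
    private final Map<EventType, List<EventProcessor>> processors =
        new EnumMap<>(EventType.class);

    void addProcessor(EventType type, EventProcessor processor) {
        processors.computeIfAbsent(type, k -> new ArrayList<>()).add(processor);
    }

    /** Dispatch to every processor registered for this type, in registration order. */
    void onEvent(EventType type, String remoteAddress) {
        List<EventProcessor> list = processors.get(type);
        if (list != null) {
            for (EventProcessor processor : list) {
                processor.onEvent(remoteAddress);
            }
        }
    }
}

class EventDemo {
    public static void main(String[] args) {
        EventListenerSketch listener = new EventListenerSketch();
        List<String> log = new ArrayList<>();
        listener.addProcessor(EventType.CONNECT, addr -> log.add("connect " + addr));
        listener.addProcessor(EventType.CLOSE, addr -> log.add("close " + addr));

        listener.onEvent(EventType.CONNECT, "127.0.0.1:12200");
        listener.onEvent(EventType.EXCEPTION, "127.0.0.1:12200"); // no processor: ignored
        listener.onEvent(EventType.CLOSE, "127.0.0.1:12200");
        System.out.println(log); // prints [connect 127.0.0.1:12200, close 127.0.0.1:12200]
    }
}
```

In Bolt itself, processors are, as far as I know, registered per event type on the server through RpcServer's addConnectionEventProcessor; check that against the version you are reading.
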
    6. handler

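      The actual business handler; the registered class is RpcHandler. Its call flow is shown in the diagram below.

      [Figure: RpcHandler call flow]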

      Client requests are handed to a UserProcessor. Call registerUserProcessor on RpcServer to register custom business logic.

  3. Call doStart() to start the server.

    com.alipay.remoting.rpc.RpcServer#doStart

        @Override
        protected boolean doStart() throws InterruptedException {
            this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();
            return this.channelFuture.isSuccess();
        }
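
doStart() boils down to "bind the listening socket, wait for the result, report success". The sketch below shows the same idea with plain java.nio instead of Netty's ServerBootstrap, so it swaps in a different technique and is only an illustration. BindSketch and tryBind are hypothetical names. It also shows why a bind can fail: the port may already be taken.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

/** Hypothetical helper; uses plain NIO rather than Netty's ServerBootstrap. */
class BindSketch {
    /** Returns the bound channel on success, or null if the bind fails. */
    static ServerSocketChannel tryBind(int port) {
        ServerSocketChannel channel = null;
        try {
            channel = ServerSocketChannel.open();
            channel.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
            return channel;
        } catch (IOException e) {
            if (channel != null) {
                try {
                    channel.close(); // do not leak the unbound channel
                } catch (IOException ignored) {
                    // nothing more to clean up
                }
            }
            return null;
        }
    }

    static int localPort(ServerSocketChannel channel) {
        return channel.socket().getLocalPort();
    }
}

class BindDemo {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel first = BindSketch.tryBind(0); // 0 lets the OS pick a free port
        if (first == null) {
            System.out.println("could not bind at all");
            return;
        }
        int port = BindSketch.localPort(first);
        ServerSocketChannel second = BindSketch.tryBind(port); // same port, expected to fail
        System.out.println("second bind succeeded: " + (second != null));
        first.close();
    }
}
```

A failed bind is exactly the case where startup() logs the failure, calls shutdown(), and rethrows.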