­

sofa-bolt源碼閱讀(1)-服務端的啟動

Bolt服務器的核心類是RpcServer,啟動的時候調用父類AbstractRemotingServer的startup方法。

com.alipay.remoting.AbstractRemotingServer#startup      @Override      public void startup() throws LifeCycleException {          super.startup();            try {              doInit();                logger.warn("Prepare to start server on port {} ", port);              if (doStart()) {                  logger.warn("Server started on port {}", port);              } else {                  logger.warn("Failed starting server on port {}", port);                  throw new LifeCycleException("Failed starting server on port: " + port);              }          } catch (Throwable t) {              this.shutdown();// do stop to ensure close resources created during doInit()              throw new IllegalStateException("ERROR: Failed to start the Server!", t);          }      }

這裡主要做了三件事

  1. 調用父類的startup()方法設置狀態為啟動

     com.alipay.remoting.AbstractLifeCycle#startup   @Override      public void startup() throws LifeCycleException {          if (isStarted.compareAndSet(false, true)) {              return;          }          throw new LifeCycleException("this component has started");      }
  2. 調用實現類的doInit()進行實際的初始化工作

     com.alipay.remoting.rpc.RpcServer#doInit     @Override      protected void doInit() {          if (this.addressParser == null) {              this.addressParser = new RpcAddressParser();          }          if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {              // in server side, do not care the connection service state, so use null instead of global switch              ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null);              this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy);              this.connectionManager.startup();                this.connectionEventHandler = new RpcConnectionEventHandler(switches());              this.connectionEventHandler.setConnectionManager(this.connectionManager);              this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);          } else {              this.connectionEventHandler = new ConnectionEventHandler(switches());              this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);          }          initRpcRemoting();          this.bootstrap = new ServerBootstrap();          this.bootstrap.group(bossGroup, workerGroup)              .channel(NettyEventLoopUtil.getServerSocketChannelClass())              .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())              .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())              .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())              .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());            // set write buffer water mark          initWriteBufferWaterMark();            // init byte buf allocator          if (ConfigManager.netty_buffer_pooled()) {              this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)                  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);          } else {              this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)                  .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);          }            // enable trigger mode for epoll if need          NettyEventLoopUtil.enableTriggeredMode(bootstrap);            final boolean idleSwitch = ConfigManager.tcp_idle_switch();          final int idleTime = ConfigManager.tcp_server_idle();          final ChannelHandler serverIdleHandler = new ServerIdleHandler();          final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);          this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {                @Override              protected void initChannel(SocketChannel channel) {                  ChannelPipeline pipeline = channel.pipeline();                  pipeline.addLast("decoder", codec.newDecoder());                  pipeline.addLast("encoder", codec.newEncoder());                  if (idleSwitch) {                      pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,                          TimeUnit.MILLISECONDS));                      pipeline.addLast("serverIdleHandler", serverIdleHandler);                  }                  pipeline.addLast("connectionEventHandler", connectionEventHandler);                  pipeline.addLast("handler", rpcHandler);                  createConnection(channel);              }                /**               * create connection operation<br>               * <ul>               * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li>               * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li>               * </ul>               */              private void createConnection(SocketChannel channel) {                  Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));                  if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {                      connectionManager.add(new Connection(channel, url), url.getUniqueKey());                  } else {                      new Connection(channel, url);                  }                  channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);              }          });      }

    這裡的代碼看似很複雜,其實主要是配置Netty服務器。Netty服務器的可配置選項通過ConfigManager來獲取,Netty的業務處理在childHandler裏面。

    protected void initChannel(SocketChannel channel) {                  ChannelPipeline pipeline = channel.pipeline();                  pipeline.addLast("decoder", codec.newDecoder());                  pipeline.addLast("encoder", codec.newEncoder());                  if (idleSwitch) {                      pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,                          TimeUnit.MILLISECONDS));                      pipeline.addLast("serverIdleHandler", serverIdleHandler);                  }                  pipeline.addLast("connectionEventHandler", connectionEventHandler);                  pipeline.addLast("handler", rpcHandler);                  createConnection(channel);              }

    初始化channel的方法裏面有6個channelHandler

    1. decoder 解碼器

    2. encoder 編碼器

    3. idleStateHandler

      Netty自帶的空閑處理器,用於觸發IdleStateEvent。在這裡配置了總空閑時間idleTime(默認值是90000),即idleTime時間內如果通道沒有發生讀寫操作,將出發一個IdleStateEvent事件。

    4. serverIdleHandler

      配合idleStateHandler使用,用來處理IdleStateEvent。當觸發該時間後,關閉客戶端連接

          @Override      public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {          if (evt instanceof IdleStateEvent) {              try {                  logger.warn("Connection idle, close it from server side: {}",                      RemotingUtil.parseRemoteAddress(ctx.channel()));                  ctx.close();              } catch (Exception e) {                  logger.warn("Exception caught when closing connection in ServerIdleHandler.", e);              }          } else {              super.userEventTriggered(ctx, evt);          }      }
    5. connectionEventHandler

      connect事件處理器,類型由枚舉類ConnectionEventType定義

      public enum ConnectionEventType {      CONNECT, CLOSE, EXCEPTION;  }
      • CONNECT

        connect事件在RpcServer.createConnection()方法里觸發了一次

      • CLOSE

        close事件在連接斷開時會觸發

        com.alipay.remoting.ConnectionEventHandler    @Override  public void channelInactive(ChannelHandlerContext ctx) throws Exception {      String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());      infoLog("Connection channel inactive: {}", remoteAddress);      super.channelInactive(ctx);      Attribute attr = ctx.channel().attr(Connection.CONNECTION);      if (null != attr) {          // add reconnect task          if (this.globalSwitch != null              && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {              Connection conn = (Connection) attr.get();              if (reconnectManager != null) {                  reconnectManager.reconnect(conn.getUrl());              }          }          // trigger close connection event          onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);      }  }
      • EXCEPTION

        exception異常事件在源代碼裏面沒有觸發的地方,應該是預留的。

      無論是什麼事件,最終都調用onEvent方法,轉發給ConnectionEventListener,最終由用戶自定義的ConnectionEventProcessor來處理具體邏輯

      com.alipay.remoting.ConnectionEventHandler#onEvent    private void onEvent(final Connection conn, final String remoteAddress,                       final ConnectionEventType type) {      if (this.eventListener != null) {          this.eventExecutor.onEvent(new Runnable() {              @Override              public void run() {                  ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);              }          });      }  }      com.alipay.remoting.ConnectionEventListener#onEvent  public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) {          List<ConnectionEventProcessor> processorList = this.processors.get(type);          if (processorList != null) {              for (ConnectionEventProcessor processor : processorList) {                  processor.onEvent(remoteAddress, connection);              }          }      }
    6. handler

      具體業務處理器,註冊類為RpcHandler,調用流程圖如下

      對於客戶端請求的處理交給UserProcessor, 可以調用RpcServer類的registerUserProcessor註冊自定義的業務。

  3. 調用dostart啟動服務器

    com.alipay.remoting.rpc.RpcServer#doStart    @Override  protected boolean doStart() throws InterruptedException {      this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();      return this.channelFuture.isSuccess();  }