sofa-bolt源码阅读(1)-服务端的启动
- 2020 年 3 月 2 日
- 筆記
Bolt服务器的核心类是RpcServer,启动的时候调用父类AbstractRemotingServer的startup方法。
com.alipay.remoting.AbstractRemotingServer#startup @Override public void startup() throws LifeCycleException { super.startup(); try { doInit(); logger.warn("Prepare to start server on port {} ", port); if (doStart()) { logger.warn("Server started on port {}", port); } else { logger.warn("Failed starting server on port {}", port); throw new LifeCycleException("Failed starting server on port: " + port); } } catch (Throwable t) { this.shutdown();// do stop to ensure close resources created during doInit() throw new IllegalStateException("ERROR: Failed to start the Server!", t); } }
这里主要做了三件事
-
调用父类的startup()方法设置状态为启动
com.alipay.remoting.AbstractLifeCycle#startup @Override public void startup() throws LifeCycleException { if (isStarted.compareAndSet(false, true)) { return; } throw new LifeCycleException("this component has started"); }
-
调用实现类的doInit()进行实际的初始化工作
com.alipay.remoting.rpc.RpcServer#doInit @Override protected void doInit() { if (this.addressParser == null) { this.addressParser = new RpcAddressParser(); } if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { // in server side, do not care the connection service state, so use null instead of global switch ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null); this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy); this.connectionManager.startup(); this.connectionEventHandler = new RpcConnectionEventHandler(switches()); this.connectionEventHandler.setConnectionManager(this.connectionManager); this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener); } else { this.connectionEventHandler = new ConnectionEventHandler(switches()); this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener); } initRpcRemoting(); this.bootstrap = new ServerBootstrap(); this.bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopUtil.getServerSocketChannelClass()) .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog()) .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr()) .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay()) .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive()); // set write buffer water mark initWriteBufferWaterMark(); // init byte buf allocator if (ConfigManager.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } else { this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); } // enable trigger mode for epoll if need NettyEventLoopUtil.enableTriggeredMode(bootstrap); final boolean idleSwitch = ConfigManager.tcp_idle_switch(); final int idleTime = ConfigManager.tcp_server_idle(); final ChannelHandler serverIdleHandler = new ServerIdleHandler(); final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors); this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder()); pipeline.addLast("encoder", codec.newEncoder()); if (idleSwitch) { pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS)); pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); } /** * create connection operation<br> * <ul> * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li> * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li> * </ul> */ private void createConnection(SocketChannel channel) { Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel)); if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { connectionManager.add(new Connection(channel, url), url.getUniqueKey()); } else { new Connection(channel, url); } channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } }); }
这里的代码看似很复杂,其实主要是配置Netty服务器。Netty服务器的可配置选项通过ConfigManager来获取,Netty的业务处理在childHandler里面。
protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder()); pipeline.addLast("encoder", codec.newEncoder()); if (idleSwitch) { pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS)); pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); }
初始化channel的方法里面有6个channelHandler
-
decoder 解码器
-
encoder 编码器
-
idleStateHandler
Netty自带的空闲处理器,用于触发IdleStateEvent。在这里配置了总空闲时间idleTime(默认值是90000),即idleTime时间内如果通道没有发生读写操作,将出发一个IdleStateEvent事件。
-
serverIdleHandler
配合idleStateHandler使用,用来处理IdleStateEvent。当触发该时间后,关闭客户端连接
@Override public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try { logger.warn("Connection idle, close it from server side: {}", RemotingUtil.parseRemoteAddress(ctx.channel())); ctx.close(); } catch (Exception e) { logger.warn("Exception caught when closing connection in ServerIdleHandler.", e); } } else { super.userEventTriggered(ctx, evt); } }
-
connectionEventHandler
connect事件处理器,类型由枚举类ConnectionEventType定义
public enum ConnectionEventType { CONNECT, CLOSE, EXCEPTION; }
-
CONNECT
connect事件在RpcServer.createConnection()方法里触发了一次
-
CLOSE
close事件在连接断开时会触发
com.alipay.remoting.ConnectionEventHandler @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel()); infoLog("Connection channel inactive: {}", remoteAddress); super.channelInactive(ctx); Attribute attr = ctx.channel().attr(Connection.CONNECTION); if (null != attr) { // add reconnect task if (this.globalSwitch != null && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) { Connection conn = (Connection) attr.get(); if (reconnectManager != null) { reconnectManager.reconnect(conn.getUrl()); } } // trigger close connection event onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE); } }
-
EXCEPTION
exception异常事件在源代码里面没有触发的地方,应该是预留的。
无论是什么事件,最终都调用onEvent方法,转发给ConnectionEventListener,最终由用户自定义的ConnectionEventProcessor来处理具体逻辑
com.alipay.remoting.ConnectionEventHandler#onEvent private void onEvent(final Connection conn, final String remoteAddress, final ConnectionEventType type) { if (this.eventListener != null) { this.eventExecutor.onEvent(new Runnable() { @Override public void run() { ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn); } }); } } com.alipay.remoting.ConnectionEventListener#onEvent public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) { List<ConnectionEventProcessor> processorList = this.processors.get(type); if (processorList != null) { for (ConnectionEventProcessor processor : processorList) { processor.onEvent(remoteAddress, connection); } } }
-
-
handler
具体业务处理器,注册类为RpcHandler,调用流程图如下
对于客户端请求的处理交给UserProcessor, 可以调用RpcServer类的registerUserProcessor注册自定义的业务。
-
-
调用dostart启动服务器
com.alipay.remoting.rpc.RpcServer#doStart @Override protected boolean doStart() throws InterruptedException { this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync(); return this.channelFuture.isSuccess(); }