sofa-bolt源碼閱讀(1)-服務端的啟動
- 2020 年 3 月 2 日
- 筆記
Bolt服務器的核心類是RpcServer,啟動的時候調用父類AbstractRemotingServer的startup方法。
com.alipay.remoting.AbstractRemotingServer#startup @Override public void startup() throws LifeCycleException { super.startup(); try { doInit(); logger.warn("Prepare to start server on port {} ", port); if (doStart()) { logger.warn("Server started on port {}", port); } else { logger.warn("Failed starting server on port {}", port); throw new LifeCycleException("Failed starting server on port: " + port); } } catch (Throwable t) { this.shutdown();// do stop to ensure close resources created during doInit() throw new IllegalStateException("ERROR: Failed to start the Server!", t); } }
這裡主要做了三件事
-
調用父類的startup()方法設置狀態為啟動
com.alipay.remoting.AbstractLifeCycle#startup @Override public void startup() throws LifeCycleException { if (isStarted.compareAndSet(false, true)) { return; } throw new LifeCycleException("this component has started"); }
-
調用實現類的doInit()進行實際的初始化工作
com.alipay.remoting.rpc.RpcServer#doInit @Override protected void doInit() { if (this.addressParser == null) { this.addressParser = new RpcAddressParser(); } if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { // in server side, do not care the connection service state, so use null instead of global switch ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null); this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy); this.connectionManager.startup(); this.connectionEventHandler = new RpcConnectionEventHandler(switches()); this.connectionEventHandler.setConnectionManager(this.connectionManager); this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener); } else { this.connectionEventHandler = new ConnectionEventHandler(switches()); this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener); } initRpcRemoting(); this.bootstrap = new ServerBootstrap(); this.bootstrap.group(bossGroup, workerGroup) .channel(NettyEventLoopUtil.getServerSocketChannelClass()) .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog()) .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr()) .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay()) .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive()); // set write buffer water mark initWriteBufferWaterMark(); // init byte buf allocator if (ConfigManager.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } else { this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); } // enable trigger mode for epoll if need NettyEventLoopUtil.enableTriggeredMode(bootstrap); final boolean idleSwitch = ConfigManager.tcp_idle_switch(); final int idleTime = ConfigManager.tcp_server_idle(); final ChannelHandler serverIdleHandler = new ServerIdleHandler(); final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors); this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder()); pipeline.addLast("encoder", codec.newEncoder()); if (idleSwitch) { pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS)); pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); } /** * create connection operation<br> * <ul> * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li> * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li> * </ul> */ private void createConnection(SocketChannel channel) { Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel)); if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { connectionManager.add(new Connection(channel, url), url.getUniqueKey()); } else { new Connection(channel, url); } channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } }); }
這裡的代碼看似很複雜,其實主要是配置Netty服務器。Netty服務器的可配置選項通過ConfigManager來獲取,Netty的業務處理在childHandler裏面。
protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder()); pipeline.addLast("encoder", codec.newEncoder()); if (idleSwitch) { pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS)); pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); }
初始化channel的方法裏面有6個channelHandler
-
decoder 解碼器
-
encoder 編碼器
-
idleStateHandler
Netty自帶的空閑處理器,用於觸發IdleStateEvent。在這裡配置了總空閑時間idleTime(默認值是90000),即idleTime時間內如果通道沒有發生讀寫操作,將出發一個IdleStateEvent事件。
-
serverIdleHandler
配合idleStateHandler使用,用來處理IdleStateEvent。當觸發該時間後,關閉客戶端連接
@Override public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try { logger.warn("Connection idle, close it from server side: {}", RemotingUtil.parseRemoteAddress(ctx.channel())); ctx.close(); } catch (Exception e) { logger.warn("Exception caught when closing connection in ServerIdleHandler.", e); } } else { super.userEventTriggered(ctx, evt); } }
-
connectionEventHandler
connect事件處理器,類型由枚舉類ConnectionEventType定義
public enum ConnectionEventType { CONNECT, CLOSE, EXCEPTION; }
-
CONNECT
connect事件在RpcServer.createConnection()方法里觸發了一次
-
CLOSE
close事件在連接斷開時會觸發
com.alipay.remoting.ConnectionEventHandler @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel()); infoLog("Connection channel inactive: {}", remoteAddress); super.channelInactive(ctx); Attribute attr = ctx.channel().attr(Connection.CONNECTION); if (null != attr) { // add reconnect task if (this.globalSwitch != null && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) { Connection conn = (Connection) attr.get(); if (reconnectManager != null) { reconnectManager.reconnect(conn.getUrl()); } } // trigger close connection event onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE); } }
-
EXCEPTION
exception異常事件在源代碼裏面沒有觸發的地方,應該是預留的。
無論是什麼事件,最終都調用onEvent方法,轉發給ConnectionEventListener,最終由用戶自定義的ConnectionEventProcessor來處理具體邏輯
com.alipay.remoting.ConnectionEventHandler#onEvent private void onEvent(final Connection conn, final String remoteAddress, final ConnectionEventType type) { if (this.eventListener != null) { this.eventExecutor.onEvent(new Runnable() { @Override public void run() { ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn); } }); } } com.alipay.remoting.ConnectionEventListener#onEvent public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) { List<ConnectionEventProcessor> processorList = this.processors.get(type); if (processorList != null) { for (ConnectionEventProcessor processor : processorList) { processor.onEvent(remoteAddress, connection); } } }
-
-
handler
具體業務處理器,註冊類為RpcHandler,調用流程圖如下
對於客戶端請求的處理交給UserProcessor, 可以調用RpcServer類的registerUserProcessor註冊自定義的業務。
-
-
調用dostart啟動服務器
com.alipay.remoting.rpc.RpcServer#doStart @Override protected boolean doStart() throws InterruptedException { this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync(); return this.channelFuture.isSuccess(); }