
Source Code Analysis of NameServer Startup in RocketMQ

  • October 3, 2019
  • Notes

In RocketMQ, NamesrvStartup serves as the startup class of the NameServer.

The main function is its startup entry point:

```java
public static void main(String[] args) {
    main0(args);
}
```

The main0 method:

```java
public static NamesrvController main0(String[] args) {
    try {
        NamesrvController controller = createNamesrvController(args);
        start(controller);
        String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
```

First, the createNamesrvController method creates the NameServer's controller, NamesrvController.

The createNamesrvController method:

```java
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();

    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }

    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    nettyServerConfig.setListenPort(9876);
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);

            namesrvConfig.setConfigStorePath(file);

            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }

    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }

    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }

    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);

    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);

    return controller;
}
```

Two config entity classes are created here: NamesrvConfig and NettyServerConfig.
They correspond to the entries of the NameServer's configuration file; properties are copied onto them by MixAll.properties2Object, whose mechanism is sketched below.
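MixAll.properties2Object binds properties to these beans by matching property keys against setter names via reflection. Below is a minimal, self-contained sketch of that idea; it is my own simplification under that assumption, not RocketMQ's actual code, which handles more types and edge cases:

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Simplified illustration of reflection-based property binding, in the spirit of
// MixAll.properties2Object; the real implementation covers more cases.
public class PropertiesBinder {
    public static void bind(Properties props, Object target) {
        for (Method m : target.getClass().getMethods()) {
            // Match setters such as setListenPort to the property key "listenPort"
            if (!m.getName().startsWith("set") || m.getName().length() <= 3
                || m.getParameterTypes().length != 1) {
                continue;
            }
            String key = Character.toLowerCase(m.getName().charAt(3)) + m.getName().substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue;
            }
            try {
                Class<?> type = m.getParameterTypes()[0];
                if (type == int.class) {
                    m.invoke(target, Integer.parseInt(value));
                } else if (type == long.class) {
                    m.invoke(target, Long.parseLong(value));
                } else if (type == boolean.class) {
                    m.invoke(target, Boolean.parseBoolean(value));
                } else if (type == String.class) {
                    m.invoke(target, value);
                }
            } catch (Exception ignored) {
                // Unmatched or unconvertible properties are skipped
            }
        }
    }
}
```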

NamesrvConfig:

```java
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
private String productEnvName = "center";
private boolean clusterTest = false;
private boolean orderMessageEnable = false;
```

NettyServerConfig:

```java
private int listenPort = 8888;
private int serverWorkerThreads = 8;
private int serverCallbackExecutorThreads = 0;
private int serverSelectorThreads = 3;
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
private int serverChannelMaxIdleTimeSeconds = 120;

private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize; // 65535
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize; // 65535
private boolean serverPooledByteBufAllocatorEnable = true;
```

These correspond to the following configuration file:

```properties
##
# Name:    NamesrvConfig.rocketmqHome <String>
# Default: (read from the ROCKETMQ_HOME environment variable set by sh mqnamesrv)
# Meaning: RocketMQ home directory
# Advice:  do not set manually
##
rocketmqHome = /usr/rocketmq

##
# Name:    NamesrvConfig.kvConfigPath <String>
# Default: $user.home/namesrv/kvConfig.json (derived from the user's home directory)
# Meaning: path of the KV config file, which holds the configuration of ordered-message topics
# Advice:  set when ordered messages are enabled
##
kvConfigPath = /root/namesrv/kvConfig.json

##
# Name:    NamesrvConfig.configStorePath <String>
# Default: $user.home/namesrv/namesrv.properties (derived from the user's home directory)
# Meaning: path of the NameServer config file
# Advice:  specify with -c at startup
##
configStorePath = /root/namesrv/namesrv.properties

##
# Name:    NamesrvConfig.clusterTest <boolean>
# Default: false (field initializer)
# Meaning: whether cluster testing is enabled
# Advice:  do not set manually
##
clusterTest = false

##
# Name:    NamesrvConfig.orderMessageEnable <boolean>
# Default: false (field initializer)
# Meaning: whether ordered messages are supported
# Advice:  set when ordered messages are enabled
##
orderMessageEnable = false

##
# Name:    NettyServerConfig.listenPort <int>
# Default: 9876 (set explicitly after construction)
# Meaning: server listen port
# Advice:  do not set manually
##
listenPort = 9876

##
# Name:    NettyServerConfig.serverWorkerThreads <int>
# Default: 8 (field initializer)
# Meaning: number of threads in the Netty business thread pool
# Advice:  do not set manually
##
serverWorkerThreads = 8

##
# Name:    NettyServerConfig.serverCallbackExecutorThreads <int>
# Default: 0 (field initializer)
# Meaning: number of threads in the Netty "public" thread pool. The remoting layer creates
#          separate pools per business type (sending, consuming, heartbeat, ...); requests
#          whose RequestCode has no registered pool are executed by the public pool
# Advice:
##
serverCallbackExecutorThreads = 0

##
# Name:    NettyServerConfig.serverSelectorThreads <int>
# Default: 3 (field initializer)
# Meaning: number of IO threads. On the NameServer/Broker side these threads parse request
#          packets, hand them to the business pools, and write the results back to the caller
# Advice:  do not set manually
##
serverSelectorThreads = 3

##
# Name:    NettyServerConfig.serverOnewaySemaphoreValue <int>
# Default: 256 (field initializer)
# Meaning: max concurrency of oneway send requests
# Advice:  do not set manually
##
serverOnewaySemaphoreValue = 256

##
# Name:    NettyServerConfig.serverAsyncSemaphoreValue <int>
# Default: 64 (field initializer)
# Meaning: max concurrency of asynchronous sends
# Advice:  do not set manually
##
serverAsyncSemaphoreValue = 64

##
# Name:    NettyServerConfig.serverChannelMaxIdleTimeSeconds <int>
# Default: 120 (field initializer)
# Meaning: max idle time of a network connection in seconds; connections idle
#          longer than this are closed
# Advice:  do not set manually
##
serverChannelMaxIdleTimeSeconds = 120

##
# Name:    NettyServerConfig.serverSocketSndBufSize <int>
# Default: 65535 (field initializer)
# Meaning: socket send buffer size in bytes (64 KB by default)
# Advice:  do not set manually
##
serverSocketSndBufSize = 65535

##
# Name:    NettyServerConfig.serverSocketRcvBufSize <int>
# Default: 65535 (field initializer)
# Meaning: socket receive buffer size in bytes (64 KB by default)
# Advice:  do not set manually
##
serverSocketRcvBufSize = 65535

##
# Name:    NettyServerConfig.serverPooledByteBufAllocatorEnable <boolean>
# Default: true (field initializer)
# Meaning: whether pooled ByteBuf allocation is enabled; recommended on
# Advice:  do not set manually
##
serverPooledByteBufAllocatorEnable = true

##
# Name:    NettyServerConfig.useEpollNativeSelector <boolean>
# Default: false (field initializer)
# Meaning: whether to use the epoll IO model
# Advice:  enable on Linux
##
useEpollNativeSelector = true
```

Next comes loading the configuration file specified by the '-c' option, and printing the namesrvConfig and nettyServerConfig properties when the '-p' option is given (the declaration of these options is sketched below).
This is followed by a series of logging (logback) configuration steps.
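For reference, the '-c' and '-p' options are declared with Apache Commons CLI; a sketch in the shape of NamesrvStartup.buildCommandlineOptions (option descriptions paraphrased):

```java
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

// Sketch of how the -c/-p options are declared with Commons CLI,
// in the spirit of NamesrvStartup.buildCommandlineOptions.
public static Options buildCommandlineOptions(final Options options) {
    Option opt = new Option("c", "configFile", true, "Name server config properties file");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "printConfigItem", false, "Print all config items");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}
```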

Once all this is done, a NamesrvController instance is created from namesrvConfig and nettyServerConfig.

NamesrvController:

```java
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.kvConfigManager = new KVConfigManager(this);
    this.routeInfoManager = new RouteInfoManager();
    this.brokerHousekeepingService = new BrokerHousekeepingService(this);
    this.configuration = new Configuration(
        log,
        this.namesrvConfig, this.nettyServerConfig
    );
    this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
```

As shown, a KVConfigManager and a RouteInfoManager are created here.

KVConfigManager:

```java
public class KVConfigManager {
    private final NamesrvController namesrvController;
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable =
        new HashMap<String, HashMap<String, String>>();

    public KVConfigManager(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }
    // ...
}
```

KVConfigManager manages key-value pairs through its configTable, a two-level map from namespace to key to value.

RouteInfoManager:

```java
public class RouteInfoManager {
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }
    // ...
}
```

RouteInfoManager records the routing information in these tables; BROKER_CHANNEL_EXPIRED_TIME (two minutes) is how long an inactive Broker is allowed to survive. The value types stored in the tables are sketched below.
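For orientation, the value types held by these tables look roughly like this (fields abridged from the RocketMQ source; getters, setters, and logic omitted, so treat the details as indicative):

```java
import java.util.HashMap;

// Abridged view of the route-table value types; accessors omitted.
class QueueData {               // per-broker queue layout of a topic
    private String brokerName;
    private int readQueueNums;  // number of readable queues
    private int writeQueueNums; // number of writable queues
    private int perm;           // read/write permission flags
    private int topicSynFlag;
}

class BrokerData {              // addresses of one logical broker
    private String cluster;
    private String brokerName;
    // brokerId -> address; brokerId 0 is the master, larger ids are slaves
    private HashMap<Long, String> brokerAddrs;
}
```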

NamesrvController also creates a BrokerHousekeepingService:

```java
public class BrokerHousekeepingService implements ChannelEventListener {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}
```

As shown, this is a ChannelEventListener, used to handle Netty channel events asynchronously; the sketch below shows how events reach it.
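For context, the remoting layer does not invoke this listener on the IO threads directly: channel events are queued as NettyEvents and drained by a background thread. A condensed sketch of that dispatch loop, modeled on NettyRemotingAbstract.NettyEventExecutor (here eventQueue, listener, and stopped stand in for the enclosing service's fields; error handling omitted):

```java
// Condensed dispatch loop modeled on NettyRemotingAbstract.NettyEventExecutor;
// eventQueue, listener, and stopped represent the enclosing service's fields.
while (!stopped) {
    NettyEvent event = eventQueue.poll(3000, TimeUnit.MILLISECONDS);
    if (event == null || listener == null) {
        continue;
    }
    switch (event.getType()) {
        case IDLE:
            listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
            break;
        case CLOSE:
            listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
            break;
        case CONNECT:
            listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
            break;
        case EXCEPTION:
            listener.onChannelException(event.getRemoteAddr(), event.getChannel());
            break;
        default:
            break;
    }
}
```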

After the NamesrvController is created, control returns to main0, where the start method is called to actually launch the NameServer service.

The start method:

```java
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }

    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }

    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));

    controller.start();

    return controller;
}
```

It first calls NamesrvController's initialize method:

```java
public boolean initialize() {
    this.kvConfigManager.load();

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

    this.registerProcessor();

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);

    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }

    return true;
}
```

First, kvConfigManager's load method loads the key-value pairs from the previously configured KV file path into KVConfigManager's map.

```java
public void load() {
    String content = null;
    try {
        content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
    } catch (IOException e) {
        log.warn("Load KV config table exception", e);
    }
    if (content != null) {
        KVConfigSerializeWrapper kvConfigSerializeWrapper =
            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
        if (null != kvConfigSerializeWrapper) {
            this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
            log.info("load KV config table OK");
        }
    }
}
```

The method is straightforward: the JSON KV file is deserialized into a KVConfigSerializeWrapper, and getConfigTable yields the map that is merged into configTable, as the example below illustrates.
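For illustration, a kvConfig.json compatible with this structure could look like the snippet below. The topic and broker values are made-up examples; ORDER_TOPIC_CONFIG is the namespace RocketMQ uses for ordered-topic configuration:

```json
{
    "configTable": {
        "ORDER_TOPIC_CONFIG": {
            "exampleOrderTopic": "broker-a:8"
        }
    }
}
```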

 

After the KV load completes, a NettyRemotingServer, i.e. the Netty server, is created:

```java
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}
```

A ServerBootstrap is created here;
the channelEventListener is the BrokerHousekeepingService created earlier.

Then, depending on whether epoll is used, two appropriate EventLoopGroups are created (the decision logic is sketched below).
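The check itself is short; in NettyRemotingServer it amounts roughly to the following (epoll is used only on Linux, when useEpollNativeSelector is enabled and the native transport is actually available):

```java
// Roughly how NettyRemotingServer decides between epoll and NIO event loops
private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform()
        && nettyServerConfig.isUseEpollNativeSelector()
        && Epoll.isAvailable();
}
```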

Once created, loadSslContext completes the SSL/TLS setup.

Back in the initialize method: after the Netty server is created, the registerProcessor method is called:

```java
private void registerProcessor() {
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
            this.remotingExecutor);
    } else {
        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
    }
}
```

Which processor is registered depends on whether clusterTest (cluster testing) is enabled; it is off by default.

By default, a DefaultRequestProcessor is created. This class is important and will be covered in detail later. It is then registered with the Netty server via remotingServer's registerDefaultProcessor method:

```java
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
```

After all this is done, two scheduled tasks are submitted.
① Periodically remove inactive Brokers
RouteInfoManager's scanNotActiveBroker method:

```java
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
```

This is fairly simple: it iterates over all BrokerLiveInfo entries in the brokerLiveTable created earlier in RouteInfoManager, finds those whose last update is older than BROKER_CHANNEL_EXPIRED_TIME, removes them, and closes their Channel.
The onChannelDestroy method then performs the related removals in the other tables; the code is highly repetitive, so it is not covered in detail here.

BrokerLiveInfo records a Broker's liveness information:

```java
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
```

lastUpdateTimestamp records the timestamp of the last update and is the key to a Broker's liveness; it is refreshed whenever the Broker re-registers (its heartbeat), roughly as sketched below.
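A sketch of that refresh, in the spirit of RouteInfoManager's update logic (simplified; the real registration path updates more state):

```java
// Sketch of how a broker heartbeat refreshes liveness, in the spirit of
// RouteInfoManager.updateBrokerInfoUpdateTimestamp.
public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
    BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
    if (prev != null) {
        prev.setLastUpdateTimestamp(System.currentTimeMillis());
    }
}
```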

 

② Periodically log the contents of configTable
KVConfigManager's printAllPeriodically method:

```java
public void printAllPeriodically() {
    try {
        this.lock.readLock().lockInterruptibly();
        try {
            log.info("--------------------------------------------------------");

            {
                log.info("configTable SIZE: {}", this.configTable.size());
                Iterator<Entry<String, HashMap<String, String>>> it =
                    this.configTable.entrySet().iterator();
                while (it.hasNext()) {
                    Entry<String, HashMap<String, String>> next = it.next();
                    Iterator<Entry<String, String>> itSub = next.getValue().entrySet().iterator();
                    while (itSub.hasNext()) {
                        Entry<String, String> nextSub = itSub.next();
                        log.info("configTable NS: {} Key: {} Value: {}", next.getKey(), nextSub.getKey(),
                            nextSub.getValue());
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("printAllPeriodically InterruptedException", e);
    }
}
```

Very simple: it writes the key-value pairs held in configTable to the log.

After these two scheduled tasks are created, a file-watch listener is registered so that the SslContext can be reloaded when the TLS certificate files change.

With that, initialize finishes; back in start, the shutdown handling follows: a JVM shutdown hook is registered that calls controller.shutdown.

Finally, NamesrvController's start is called; only at this point does the service actually start.
NamesrvController's start method:

```java
public void start() throws Exception {
    this.remotingServer.start();

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }
}
```

This essentially just starts the Netty server.

NettyRemotingServer's start method:

```java
public void start() {
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    ServerBootstrap childHandler =
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        .addLast(defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyServerHandler()
                        );
                }
            });

    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
```

As shown, this is the standard Netty server bootstrap procedure.

The key point lies in the childHandler setup: a NettyServerHandler is added to the pipeline:

```java
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
```

So once a client establishes a connection with the NameServer, the messages exchanged between them are handled by the processMessageReceived method.

The processMessageReceived method:

```java
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
```

Different handling is applied depending on the message type (request or response).

The processRequestCommand method:

```java
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
```

Here a Runnable is created and submitted to a thread pool; the core of that Runnable is

```java
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
```

which is effectively a call to the processRequest method of the DefaultRequestProcessor mentioned earlier:

```java
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {

    if (ctx != null) {
        log.debug("receive request, {} {} {}",
            request.getCode(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}
```

This method is very direct: it dispatches each RequestCode to a different handler method, among them the familiar
REGISTER_BROKER, which registers a Broker, and
GET_ROUTEINTO_BY_TOPIC, which fetches the route information of a topic.
The handlers do their work by querying or modifying the tables created earlier,
and the resulting data is finally wrapped into a response and sent back within the Runnable via Netty's writeAndFlush; the sketch below illustrates this for the topic-route query.
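As a concrete example, the GET_ROUTEINTO_BY_TOPIC handler looks roughly like this (simplified from DefaultRequestProcessor.getRouteInfoByTopic; the ordered-message branch and some error handling are omitted):

```java
// Simplified from DefaultRequestProcessor.getRouteInfoByTopic;
// the ordered-message branch and some error handling are abridged.
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // Look the topic up in the route tables maintained by RouteInfoManager
    TopicRouteData topicRouteData =
        this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // Serialize the route data into the response body and report success
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic());
    return response;
}
```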

With that, the startup of the NameServer is complete.