RocketMQ中Broker的啟動源碼分析(一)

  • 2019 年 10 月 3 日
  • 筆記

在RocketMQ中,使用BrokerStartup作為啟動類,相較於NameServer的啟動,Broker作為RocketMQ的核心可複雜得多

【RocketMQ中NameServer的啟動源碼分析】

主函數作為其啟動的入口:

1 public static void main(String[] args) {  2     start(createBrokerController(args));  3 }

首先通過createBrokerController方法生成Broker的控制器BrokerController

createBrokerController方法:

  1 public static BrokerController createBrokerController(String[] args) {    2     System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));    3    4     if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {    5         NettySystemConfig.socketSndbufSize = 131072;    6     }    7    8     if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {    9         NettySystemConfig.socketRcvbufSize = 131072;   10     }   11   12     try {   13         //PackageConflictDetect.detectFastjson();   14         Options options = ServerUtil.buildCommandlineOptions(new Options());   15         commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),   16             new PosixParser());   17         if (null == commandLine) {   18             System.exit(-1);   19         }   20   21         final BrokerConfig brokerConfig = new BrokerConfig();   22         final NettyServerConfig nettyServerConfig = new NettyServerConfig();   23         final NettyClientConfig nettyClientConfig = new NettyClientConfig();   24   25         nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,   26             String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));   27         nettyServerConfig.setListenPort(10911);   28         final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();   29   30         if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {   31             int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;   32             messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);   33         }   34   35         if (commandLine.hasOption('c')) {   36             String file = commandLine.getOptionValue('c');   37             if (file != null) {   38                 configFile = file;   39                 InputStream in = new BufferedInputStream(new FileInputStream(file));   40                 properties = new Properties();   41                 properties.load(in);   42   43                 properties2SystemEnv(properties);   44                 MixAll.properties2Object(properties, brokerConfig);   45                 MixAll.properties2Object(properties, nettyServerConfig);   46                 MixAll.properties2Object(properties, nettyClientConfig);   47                 MixAll.properties2Object(properties, messageStoreConfig);   48   49                 BrokerPathConfigHelper.setBrokerConfigPath(file);   50                 in.close();   51             }   52         }   53   54         MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);   55   56         if (null == brokerConfig.getRocketmqHome()) {   57             System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);   58             System.exit(-2);   59         }   60   61         String namesrvAddr = brokerConfig.getNamesrvAddr();   62         if (null != namesrvAddr) {   63             try {   64                 String[] addrArray = namesrvAddr.split(";");   65                 for (String addr : addrArray) {   66                     RemotingUtil.string2SocketAddress(addr);   67                 }   68             } catch (Exception e) {   69                 System.out.printf(   70                     "The Name Server Address[%s] illegal, please set it as follows, "127.0.0.1:9876;192.168.0.1:9876"%n",   71                     namesrvAddr);   72                 System.exit(-3);   73             }   74         }   75   76         switch (messageStoreConfig.getBrokerRole()) {   77             case ASYNC_MASTER:   78             case SYNC_MASTER:   79                 brokerConfig.setBrokerId(MixAll.MASTER_ID);   80                 break;   81             case SLAVE:   82                 if (brokerConfig.getBrokerId() <= 0) {   83                     System.out.printf("Slave's brokerId must be > 0");   84                     System.exit(-3);   85                 }   86   87                 break;   88             default:   89                 break;   90         }   91   92         messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);   93         LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();   94         JoranConfigurator configurator = new JoranConfigurator();   95         configurator.setContext(lc);   96         lc.reset();   97         configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");   98   99         if (commandLine.hasOption('p')) {  100             InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);  101             MixAll.printObjectProperties(console, brokerConfig);  102             MixAll.printObjectProperties(console, nettyServerConfig);  103             MixAll.printObjectProperties(console, nettyClientConfig);  104             MixAll.printObjectProperties(console, messageStoreConfig);  105             System.exit(0);  106         } else if (commandLine.hasOption('m')) {  107             InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);  108             MixAll.printObjectProperties(console, brokerConfig, true);  109             MixAll.printObjectProperties(console, nettyServerConfig, true);  110             MixAll.printObjectProperties(console, nettyClientConfig, true);  111             MixAll.printObjectProperties(console, messageStoreConfig, true);  112             System.exit(0);  113         }  114  115         log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);  116         MixAll.printObjectProperties(log, brokerConfig);  117         MixAll.printObjectProperties(log, nettyServerConfig);  118         MixAll.printObjectProperties(log, nettyClientConfig);  119         MixAll.printObjectProperties(log, messageStoreConfig);  120  121         final BrokerController controller = new BrokerController(  122             brokerConfig,  123             nettyServerConfig,  124             nettyClientConfig,  125             messageStoreConfig);  126         // remember all configs to prevent discard  127         controller.getConfiguration().registerConfig(properties);  128  129         boolean initResult = controller.initialize();  130         if (!initResult) {  131             controller.shutdown();  132             System.exit(-3);  133         }  134  135         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {  136             private volatile boolean hasShutdown = false;  137             private AtomicInteger shutdownTimes = new AtomicInteger(0);  138  139             @Override  140             public void run() {  141                 synchronized (this) {  142                     log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());  143                     if (!this.hasShutdown) {  144                         this.hasShutdown = true;  145                         long beginTime = System.currentTimeMillis();  146                         controller.shutdown();  147                         long consumingTimeTotal = System.currentTimeMillis() - beginTime;  148                         log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);  149                     }  150                 }  151             }  152         }, "ShutdownHook"));  153  154         return controller;  155     } catch (Throwable e) {  156         e.printStackTrace();  157         System.exit(-1);  158     }  159  160     return null;  161 }

這裡和NameServer中的createNamesrvController方法作用類似,對Broker所需做了一系列的配置

先設置了Netty通訊時的緩衝區大小,這裡默認是128K

接著會創建了幾個實體類
BrokerConfig,用來封裝其絕大多數基本配置資訊
NettyServerConfig,封裝了其作為對外暴露的消息隊列伺服器的資訊
NettyClientConfig,則封裝了其作為NameServer客戶端的資訊
這裡面封裝的資訊和NameServer一個道理,都是映射了配置文件相應的配置

然後對NettyClientConfig的TLS進行設置
讓NettyServerConfig默認監聽10911埠

緊接著創建了一個MessageStoreConfig,這個就是用來封裝Store的資訊,

MessageStoreConfig會默認配置BrokerRole為ASYNC_MASTER
Broker有三種身份,用BrokerRole枚舉來表示:

1 public enum BrokerRole {  2     ASYNC_MASTER,  3     SYNC_MASTER,  4     SLAVE;  5 }

也就是非同步Master,同步Master,以及Slave

這裡會對其身份進行檢查,若是Slave,則需要調整其允許的消息最大記憶體佔比,默認值是40,也就是說Master允許消息最大記憶體佔用40%,而Slave則只允許30%

接著會對”-c“指令進行相應配置的載入

往後看到對namesrvAddr進行了檢查,只是簡單地檢查NameServer集群地址資訊是否合法

往下看到有個switch塊,其根據Broker的身份,進行設置
只要是Master,將其BrokerId設為0,而Slave的BrokerId需要大於0
(一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關係通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,大於0表示Slave,Master也可以部署多個)

繼續往下,這裡會對Store設置HA的監聽埠,是NettyServer偵聽埠加1

往下是對“-p”,”-m“指令進行相應配置的載入,以及日誌的相關配置

之後就會創建了一個BrokerController:

 1 public BrokerController(   2         final BrokerConfig brokerConfig,   3         final NettyServerConfig nettyServerConfig,   4         final NettyClientConfig nettyClientConfig,   5         final MessageStoreConfig messageStoreConfig   6     ) {   7     this.brokerConfig = brokerConfig;   8     this.nettyServerConfig = nettyServerConfig;   9     this.nettyClientConfig = nettyClientConfig;  10     this.messageStoreConfig = messageStoreConfig;  11     this.consumerOffsetManager = new ConsumerOffsetManager(this);  12     this.topicConfigManager = new TopicConfigManager(this);  13     this.pullMessageProcessor = new PullMessageProcessor(this);  14     this.pullRequestHoldService = new PullRequestHoldService(this);  15     this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);  16     this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);  17     this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);  18     this.consumerFilterManager = new ConsumerFilterManager(this);  19     this.producerManager = new ProducerManager();  20     this.clientHousekeepingService = new ClientHousekeepingService(this);  21     this.broker2Client = new Broker2Client(this);  22     this.subscriptionGroupManager = new SubscriptionGroupManager(this);  23     this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);  24     this.filterServerManager = new FilterServerManager(this);  25  26     this.slaveSynchronize = new SlaveSynchronize(this);  27  28     this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());  29     this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());  30     this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());  31     this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());  32     this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());  33     this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());  34     this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());  35  36     this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());  37     this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));  38  39     this.brokerFastFailure = new BrokerFastFailure(this);  40     this.configuration = new Configuration(  41         log,  42         BrokerPathConfigHelper.getBrokerConfigPath(),  43         this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig  44     );  45 }

可以看到,這裡實例化了許多成員,我就不一一分析,挑幾個重要的介紹

 

ConsumerOffsetManager:用來管理消費者的消費消息的進度,主要通過一張map來快取

1 private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =  2         new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

由topic@group的形式構成鍵,而值中的map的Integer代表具體的哪條消息隊列,Long表示該消息隊列的偏移量offset

 

TopicConfigManager:管理Topic和消息隊列的資訊,主要通過一張map來快取

1 private final ConcurrentMap<String, TopicConfig> topicConfigTable =  2         new ConcurrentHashMap<String, TopicConfig>(1024);  3     private final DataVersion dataVersion = new DataVersion();

鍵就是Topic,值TopicConfig用來記錄對應的消息隊列的個數

PullMessageProcessor、PullRequestHoldService、NotifyMessageArrivingListener這三個來管理Pull消息請求,關於Pull消息在後續部落格再細說

ConsumerManager:管理Consumer,主要通過一張map來快取

1 private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =  2         new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

鍵值就是Consumer的GroupName,
而ConsumerGroupInfo由如下構成:

1 private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =  2         new ConcurrentHashMap<String, SubscriptionData>();  3 private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =  4 new ConcurrentHashMap<Channel, ClientChannelInfo>(16);  5 private volatile ConsumeType consumeType;  6 private volatile MessageModel messageModel;        

可以看到封裝了一個subscriptionTable ,這個map記錄Topic和訂閱內容
以及一個channelInfoTable,記錄Consumer的物理連接
ConsumeType是一個枚舉,表明兩種消費方式:

1 public enum ConsumeType {  2     CONSUME_ACTIVELY("PULL"),  3     CONSUME_PASSIVELY("PUSH");  4 }

MessageModel 也是一個枚舉,表明兩種消費模式:

 1 public enum MessageModel {   2     /**   3      * broadcast   4      */   5     BROADCASTING("BROADCASTING"),   6     /**   7      * clustering   8      */   9     CLUSTERING("CLUSTERING");  10 }

Broadcasting:同一個ConsumerGroup里的每個Consumer都能消費到所訂閱Topic的全部消息,也就是一個消息會被多次分發,被多個Consumer消費
Clustering:同一個ConsumerGroup里的每個Consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消費的內容合起來才是所訂閱Topic內容的整體,從而達到負載均衡的目的

結合著來看,也就是說使用相同GroupName的一組Consumer,其ConsumeType和MessageModel必定相同,其訂閱的Topic會根據ConsumeType和MessageModel來完成相應的方式的消息處理

回到BrokerController的構造

ProducerManager:管理Producer,主要通過一張map來快取

1 private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =  2         new HashMap<String, HashMap<Channel, ClientChannelInfo>>();

相比ConsumerManager,對Producer的管理簡單的多,只需要記錄group name 和物理連接的映射

 

再回到createBrokerController方法,在完成BrokerController的創建後,會調用BrokerController的initialize方法:

BrokerController的initialize方法:

  1 public boolean initialize() throws CloneNotSupportedException {    2     boolean result = this.topicConfigManager.load();    3    4     result = result && this.consumerOffsetManager.load();    5     result = result && this.subscriptionGroupManager.load();    6     result = result && this.consumerFilterManager.load();    7    8     if (result) {    9         try {   10             this.messageStore =   11                 new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,   12                     this.brokerConfig);   13             if (messageStoreConfig.isEnableDLegerCommitLog()) {   14                 DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);   15                 ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);   16             }   17             this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);   18             //load plugin   19             MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);   20             this.messageStore = MessageStoreFactory.build(context, this.messageStore);   21             this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));   22         } catch (IOException e) {   23             result = false;   24             log.error("Failed to initialize", e);   25         }   26     }   27   28     result = result && this.messageStore.load();   29   30     if (result) {   31         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);   32         NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();   33         fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);   34         this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);   35         this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(   36             this.brokerConfig.getSendMessageThreadPoolNums(),   37             this.brokerConfig.getSendMessageThreadPoolNums(),   38             1000 * 60,   39             TimeUnit.MILLISECONDS,   40             this.sendThreadPoolQueue,   41             new ThreadFactoryImpl("SendMessageThread_"));   42   43         this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(   44             this.brokerConfig.getPullMessageThreadPoolNums(),   45             this.brokerConfig.getPullMessageThreadPoolNums(),   46             1000 * 60,   47             TimeUnit.MILLISECONDS,   48             this.pullThreadPoolQueue,   49             new ThreadFactoryImpl("PullMessageThread_"));   50   51         this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(   52             this.brokerConfig.getQueryMessageThreadPoolNums(),   53             this.brokerConfig.getQueryMessageThreadPoolNums(),   54             1000 * 60,   55             TimeUnit.MILLISECONDS,   56             this.queryThreadPoolQueue,   57             new ThreadFactoryImpl("QueryMessageThread_"));   58   59         this.adminBrokerExecutor =   60             Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(   61                 "AdminBrokerThread_"));   62   63         this.clientManageExecutor = new ThreadPoolExecutor(   64             this.brokerConfig.getClientManageThreadPoolNums(),   65             this.brokerConfig.getClientManageThreadPoolNums(),   66             1000 * 60,   67             TimeUnit.MILLISECONDS,   68             this.clientManagerThreadPoolQueue,   69             new ThreadFactoryImpl("ClientManageThread_"));   70   71         this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(   72             this.brokerConfig.getHeartbeatThreadPoolNums(),   73             this.brokerConfig.getHeartbeatThreadPoolNums(),   74             1000 * 60,   75             TimeUnit.MILLISECONDS,   76             this.heartbeatThreadPoolQueue,   77             new ThreadFactoryImpl("HeartbeatThread_", true));   78   79         this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(   80             this.brokerConfig.getEndTransactionThreadPoolNums(),   81             this.brokerConfig.getEndTransactionThreadPoolNums(),   82             1000 * 60,   83             TimeUnit.MILLISECONDS,   84             this.endTransactionThreadPoolQueue,   85             new ThreadFactoryImpl("EndTransactionThread_"));   86   87         this.consumerManageExecutor =   88             Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(   89                 "ConsumerManageThread_"));   90   91         this.registerProcessor();   92   93         final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();   94         final long period = 1000 * 60 * 60 * 24;   95         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {   96             @Override   97             public void run() {   98                 try {   99                     BrokerController.this.getBrokerStats().record();  100                 } catch (Throwable e) {  101                     log.error("schedule record error.", e);  102                 }  103             }  104         }, initialDelay, period, TimeUnit.MILLISECONDS);  105  106         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  107             @Override  108             public void run() {  109                 try {  110                     BrokerController.this.consumerOffsetManager.persist();  111                 } catch (Throwable e) {  112                     log.error("schedule persist consumerOffset error.", e);  113                 }  114             }  115         }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);  116  117         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  118             @Override  119             public void run() {  120                 try {  121                     BrokerController.this.consumerFilterManager.persist();  122                 } catch (Throwable e) {  123                     log.error("schedule persist consumer filter error.", e);  124                 }  125             }  126         }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);  127  128         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  129             @Override  130             public void run() {  131                 try {  132                     BrokerController.this.protectBroker();  133                 } catch (Throwable e) {  134                     log.error("protectBroker error.", e);  135                 }  136             }  137         }, 3, 3, TimeUnit.MINUTES);  138  139         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  140             @Override  141             public void run() {  142                 try {  143                     BrokerController.this.printWaterMark();  144                 } catch (Throwable e) {  145                     log.error("printWaterMark error.", e);  146                 }  147             }  148         }, 10, 1, TimeUnit.SECONDS);  149  150         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  151  152             @Override  153             public void run() {  154                 try {  155                     log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());  156                 } catch (Throwable e) {  157                     log.error("schedule dispatchBehindBytes error.", e);  158                 }  159             }  160         }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);  161  162         if (this.brokerConfig.getNamesrvAddr() != null) {  163             this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());  164             log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());  165         } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {  166             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  167  168                 @Override  169                 public void run() {  170                     try {  171                         BrokerController.this.brokerOuterAPI.fetchNameServerAddr();  172                     } catch (Throwable e) {  173                         log.error("ScheduledTask fetchNameServerAddr exception", e);  174                     }  175                 }  176             }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);  177         }  178  179         if (!messageStoreConfig.isEnableDLegerCommitLog()) {  180             if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {  181                 if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {  182                     this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());  183                     this.updateMasterHAServerAddrPeriodically = false;  184                 } else {  185                     this.updateMasterHAServerAddrPeriodically = true;  186                 }  187             } else {  188                 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  189                     @Override  190                     public void run() {  191                         try {  192                             BrokerController.this.printMasterAndSlaveDiff();  193                         } catch (Throwable e) {  194                             log.error("schedule printMasterAndSlaveDiff error.", e);  195                         }  196                     }  197                 }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);  198             }  199         }  200  201         if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {  202             // Register a listener to reload SslContext  203             try {  204                 fileWatchService = new FileWatchService(  205                     new String[] {  206                         TlsSystemConfig.tlsServerCertPath,  207                         TlsSystemConfig.tlsServerKeyPath,  208                         TlsSystemConfig.tlsServerTrustCertPath  209                     },  210                     new FileWatchService.Listener() {  211                         boolean certChanged, keyChanged = false;  212  213                         @Override  214                         public void onChanged(String path) {  215                             if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {  216                                 log.info("The trust certificate changed, reload the ssl context");  217                                 reloadServerSslContext();  218                             }  219                             if (path.equals(TlsSystemConfig.tlsServerCertPath)) {  220                                 certChanged = true;  221                             }  222                             if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {  223                                 keyChanged = true;  224                             }  225                             if (certChanged && keyChanged) {  226                                 log.info("The certificate and private key changed, reload the ssl context");  227                                 certChanged = keyChanged = false;  228                                 reloadServerSslContext();  229                             }  230                         }  231  232                         private void reloadServerSslContext() {  233                             ((NettyRemotingServer) remotingServer).loadSslContext();  234                             ((NettyRemotingServer) fastRemotingServer).loadSslContext();  235                         }  236                     });  237             } catch (Exception e) {  238                 log.warn("FileWatchService created error, can't load the certificate dynamically");  239             }  240         }  241         initialTransaction();  242         initialAcl();  243         initialRpcHooks();  244     }  245     return result;  246 }

首先完成對…/store/config/topics.json、…/store/config/consumerOffset.json、
…/store/config/subscriptionGroup.json、…/store/config/consumerFilter.json這幾個文件的載入

接著創建一個DefaultMessageStore,這是Broker的核心存儲

DefaultMessageStore的構造:

 1 private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;   2 ......   3 public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,   4         final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {   5     this.messageArrivingListener = messageArrivingListener;   6     this.brokerConfig = brokerConfig;   7     this.messageStoreConfig = messageStoreConfig;   8     this.brokerStatsManager = brokerStatsManager;   9     // 請求定位服務  10     this.allocateMappedFileService = new AllocateMappedFileService(this);  11     // 存儲服務  12     if (messageStoreConfig.isEnableDLegerCommitLog()) {  13         this.commitLog = new DLedgerCommitLog(this);  14     } else {  15         this.commitLog = new CommitLog(this);  16     }  17     // 消費隊列資訊  18     this.consumeQueueTable = new ConcurrentHashMap<>(32);  19     // 刷新隊列服務  20     this.flushConsumeQueueService = new FlushConsumeQueueService();  21     // 清除CommitLog數據服務  22     this.cleanCommitLogService = new CleanCommitLogService();  23     // 清除消費隊列服務  24     this.cleanConsumeQueueService = new CleanConsumeQueueService();  25     this.storeStatsService = new StoreStatsService();  26     // 索引服務  27     this.indexService = new IndexService(this);  28  29     // HA服務,主從複製  30     if (!messageStoreConfig.isEnableDLegerCommitLog()) {  31         this.haService = new HAService(this);  32     } else {  33         this.haService = null;  34     }  35     this.reputMessageService = new ReputMessageService();  36     this.scheduleMessageService = new ScheduleMessageService(this);  37  38     this.transientStorePool = new TransientStorePool(messageStoreConfig);  39  40     if (messageStoreConfig.isTransientStorePoolEnable()) {  41         this.transientStorePool.init();  42     }  43  44     this.allocateMappedFileService.start();  45  46     this.indexService.start();  47  48     this.dispatcherList = new LinkedList<>();  49     this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());  50     this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());  51  52     File file = new File(StorePathConfigHelper.getLockFile(messageStoreConfig.getStorePathRootDir()));  53     MappedFile.ensureDirOK(file.getParent());  54     lockFile = new RandomAccessFile(file, "rw");  55 }

可以看到DefaultMessageStore的構造會創建很多服務,來管理store的存儲

其中isEnableDLegerCommitLog用來判斷是否使用DLeger,默認false是關閉的
所以在默認情況下使用CommitLog + HAService

 

關於DLeger可參考這篇部落格 【Dledger-RocketMQ 基於Raft協議的commitlog存儲庫】

後續的主從複製還是以CommitLog + HAService為主

回到initialize方法,接著會調用DefaultMessageStore的load方法:

 1 public boolean load() {   2     boolean result = true;   3   4     try {   5         boolean lastExitOK = !this.isTempFileExist();   6         log.info("last shutdown {}", lastExitOK ? "normally" : "abnormally");   7   8         if (null != scheduleMessageService) {   9             result = result && this.scheduleMessageService.load();  10         }  11  12         // load Commit Log  13         result = result && this.commitLog.load();  14  15         // load Consume Queue  16         result = result && this.loadConsumeQueue();  17  18         if (result) {  19             this.storeCheckpoint =  20                 new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));  21  22             this.indexService.load(lastExitOK);  23  24             this.recover(lastExitOK);  25  26             log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());  27         }  28     } catch (Exception e) {  29         log.error("load exception", e);  30         result = false;  31     }  32  33     if (!result) {  34         this.allocateMappedFileService.shutdown();  35     }  36  37     return result;  38 }

這裡會載入CommitLog和ConsumeQueue對應的文件

接著創建熟悉的NettyRemotingServer,在前面部落格中介紹過了,就不再展開
這裡會根據nettyServerConfig克隆一份服務端配置,以此創建fastRemotingServer服務端,只不過這個服務端偵聽的是上面服務端埠減2的埠號

看過我前面的部落格就會發現這個fastRemotingServer的埠號其實就是之前提到過的VIP通道
詳見:

【RocketMQ中Producer的啟動源碼分析】

【RocketMQ中Producer消息的發送源碼分析】

接著會根據不同的需求創建很多不同的執行緒池

然後調用registerProcessor方法:

registerProcessor方法:

 1 public void registerProcessor() {   2    /**   3      * SendMessageProcessor   4      */   5     SendMessageProcessor sendProcessor = new SendMessageProcessor(this);   6     sendProcessor.registerSendMessageHook(sendMessageHookList);   7     sendProcessor.registerConsumeMessageHook(consumeMessageHookList);   8   9     this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);  10     this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);  11     this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);  12     this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);  13     this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);  14     this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);  15     this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);  16     this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);  17     /**  18      * PullMessageProcessor  19      */  20     this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);  21     this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);  22  23     /**  24      * QueryMessageProcessor  25      */  26     NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);  27     this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);  28     this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);  29  30     this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);  31     this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);  32  33     /**  34      * ClientManageProcessor  35      */  36     ClientManageProcessor clientProcessor = new ClientManageProcessor(this);  37     this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);  38     this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);  39     this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);  40  41     this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);  42     this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);  43     this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);  44  45     /**  46      * ConsumerManageProcessor  47      */  48     ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);  49     this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);  50     this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);  51     this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);  52  53     this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);  54     this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);  55     this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);  56  57     /**  58      * EndTransactionProcessor  59      */  60     this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);  61     this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);  62  63     /**  64      * Default  65      */  66     AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);  67     this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);  68     this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);  69 }

這裡會創建好幾種Processor,通過registerProcessor方法同時註冊到remotingServer和fastRemotingServer中

registerProcessor方法:

1 public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {  2         ExecutorService executorThis = executor;  3     if (null == executor) {  4         executorThis = this.publicExecutor;  5     }  6  7     Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);  8     this.processorTable.put(requestCode, pair);  9 }

這裡實際上就是向processorTable進行了記錄的添加,為的是後續收到請求能做出對應的處理

processorTable:

1 protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =  2         new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

 

這裡的SendMessageProcessor很重要,後續會詳細介紹

 

在完成registerProcessor後,會創建8個定時任務

1 BrokerController.this.getBrokerStats().record();

定時列印Broker的狀態

1 BrokerController.this.consumerOffsetManager.persist();

定時向consumerOffset.json文件中寫入消費者偏移量

1 BrokerController.this.consumerFilterManager.persist();

定時向consumerFilter.json文件寫入消費者過濾器資訊

1 BrokerController.this.protectBroker();

定時禁用消費慢的consumer,保護Broker,需要設置disableConsumeIfConsumerReadSlowly屬性,默認false

1 BrokerController.this.printWaterMark();

定時列印Send、Pull、Query、Transaction隊列資訊

1 public void run() {  2     try {  3         log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());  4     } catch (Throwable e) {  5         log.error("schedule dispatchBehindBytes error.", e);  6     }  7 }

定時列印已存儲在提交日誌中但尚未調度到消費隊列的位元組數

 1 if (this.brokerConfig.getNamesrvAddr() != null) {   2     this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());   3     log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());   4 } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {   5     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {   6   7         @Override   8         public void run() {   9             try {  10                 BrokerController.this.brokerOuterAPI.fetchNameServerAddr();  11             } catch (Throwable e) {  12                 log.error("ScheduledTask fetchNameServerAddr exception", e);  13             }  14         }  15     }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);  16 }

若是設置了NamesrvAddr,需要通過updateNameServerAddressList完成一次NameServer地址的跟新(updateNameServerAddressList在前面部落格介紹過了)
若是設置了NamesrvAddr,並且設置了fetchNamesrvAddrByAddressServer屬性(默認關閉),需要定時獲取更新NameServer地址(fetchNameServerAddr方法在之前部落格也介紹過)

 1 if (!messageStoreConfig.isEnableDLegerCommitLog()) {   2     if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {   3         if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {   4             this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());   5             this.updateMasterHAServerAddrPeriodically = false;   6         } else {   7             this.updateMasterHAServerAddrPeriodically = true;   8         }   9     } else {  10         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {  11             @Override  12             public void run() {  13                 try {  14                     BrokerController.this.printMasterAndSlaveDiff();  15                 } catch (Throwable e) {  16                     log.error("schedule printMasterAndSlaveDiff error.", e);  17                 }  18             }  19         }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);  20     }  21 }

在非DLeger模式下
若是SLAVE,則需要檢查是否設置了HA的Master地址
若設置了Master地址要通過updateHaMasterAddress方法向更新Master地址
updateHaMasterAddress方法實則是在HAClient中實現的:

1 public void updateMasterAddress(final String newAddr) {  2     String currentAddr = this.masterAddress.get();  3     if (currentAddr == null || !currentAddr.equals(newAddr)) {  4         this.masterAddress.set(newAddr);  5         log.info("update master address, OLD: " + currentAddr + " NEW: " + newAddr);  6     }  7 }

非常簡單,只是一個比較替換的操作

若沒有設置需要更改updateMasterHAServerAddrPeriodically為true,在後面會有用

若是MASTER,則需要定時列印slave落後的位元組數

 

設置完定時任務後,和NameServer中一樣設置了對SslContext的監聽

接著通過initialTransaction方法,載入事務需要的實例
initialTransaction方法:

 1 private void initialTransaction() {   2     this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);   3     if (null == this.transactionalMessageService) {   4         this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));   5         log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());   6     }   7     this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);   8     if (null == this.transactionalMessageCheckListener) {   9         this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();  10         log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());  11     }  12     this.transactionalMessageCheckListener.setBrokerController(this);  13     this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);  14 }

這裡動態載入了TransactionalMessageService和AbstractTransactionalMessageCheckListener的實現類,位於如下
“META-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService”
“META-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener”
還創建了TransactionalMessageCheckService

initialAcl會創建ACL許可權檢查:

 1 private void initialAcl() {   2    if (!this.brokerConfig.isAclEnable()) {   3         log.info("The broker dose not enable acl");   4         return;   5     }   6   7     List<AccessValidator> accessValidators = ServiceProvider.load(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);   8     if (accessValidators == null || accessValidators.isEmpty()) {   9         log.info("The broker dose not load the AccessValidator");  10         return;  11     }  12  13     for (AccessValidator accessValidator: accessValidators) {  14         final AccessValidator validator = accessValidator;  15         this.registerServerRPCHook(new RPCHook() {  16  17             @Override  18             public void doBeforeRequest(String remoteAddr, RemotingCommand request) {  19                 //Do not catch the exception  20                 validator.validate(validator.parse(request, remoteAddr));  21             }  22  23             @Override  24             public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {  25             }  26         });  27     }  28 }

需要設置aclEnable屬性,默認關閉
若是設置了,同樣會載入”META-INF/service/org.apache.rocketmq.acl.AccessValidator”配置的AccessValidator實體類
然後將其包裝成RPC鉤子,註冊到remotingServer和fastRemotingServer中,用於請求的調用validate方法進行ACL許可權檢查

initialRpcHooks方法則會註冊配置了的RPC鉤子:

1 private void initialRpcHooks() {  2     List<RPCHook> rpcHooks = ServiceProvider.load(ServiceProvider.RPC_HOOK_ID, RPCHook.class);  3     if (rpcHooks == null || rpcHooks.isEmpty()) {  4         return;  5     }  6     for (RPCHook rpcHook: rpcHooks) {  7         this.registerServerRPCHook(rpcHook);  8     }  9 }

載入”META-INF/service/org.apache.rocketmq.remoting.RPCHook”下的配置的實體類

 

到此Broker啟動前的準備工作已經完成,後面start方法會進行真正的啟動,在下一篇部落格繼續分析