聊聊rocketmq的SlaveSynchronize

  • 2019 年 12 月 31 日
  • 筆記

本文主要研究一下rocketmq的SlaveSynchronize

BrokerController

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {  ​      //......  ​      private void handleSlaveSynchronize(BrokerRole role) {          if (role == BrokerRole.SLAVE) {              if (null != slaveSyncFuture) {                  slaveSyncFuture.cancel(false);              }              this.slaveSynchronize.setMasterAddr(null);              slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {                  @Override                  public void run() {                      try {                          BrokerController.this.slaveSynchronize.syncAll();                      }                      catch (Throwable e) {                          log.error("ScheduledTask SlaveSynchronize syncAll error.", e);                      }                  }              }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);          } else {              //handle the slave synchronise              if (null != slaveSyncFuture) {                  slaveSyncFuture.cancel(false);              }              this.slaveSynchronize.setMasterAddr(null);          }      }  ​      //......  }
  • BrokerController有個handleSlaveSynchronize方法,在role為BrokerRole.SLAVE的時候,會註冊一個定時任務,每隔10秒鐘執行一次BrokerController.this.slaveSynchronize.syncAll()

SlaveSynchronize

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);      private final BrokerController brokerController;      private volatile String masterAddr = null;  ​      public SlaveSynchronize(BrokerController brokerController) {          this.brokerController = brokerController;      }  ​      public String getMasterAddr() {          return masterAddr;      }  ​      public void setMasterAddr(String masterAddr) {          this.masterAddr = masterAddr;      }  ​      public void syncAll() {          this.syncTopicConfig();          this.syncConsumerOffset();          this.syncDelayOffset();          this.syncSubscriptionGroupConfig();      }  ​      //......  }
  • SlaveSynchronize的syncAll方法分別調用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法

syncTopicConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {        //......  ​      private void syncTopicConfig() {          String masterAddrBak = this.masterAddr;          if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {              try {                  TopicConfigSerializeWrapper topicWrapper =                      this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);                  if (!this.brokerController.getTopicConfigManager().getDataVersion()                      .equals(topicWrapper.getDataVersion())) {  ​                      this.brokerController.getTopicConfigManager().getDataVersion()                          .assignNewOne(topicWrapper.getDataVersion());                      this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();                      this.brokerController.getTopicConfigManager().getTopicConfigTable()                          .putAll(topicWrapper.getTopicConfigTable());                      this.brokerController.getTopicConfigManager().persist();  ​                      log.info("Update slave topic config from master, {}", masterAddrBak);                  }              } catch (Exception e) {                  log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);              }          }      }  ​      //......  }
  • syncTopicConfig方法從this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak)方法獲取TopicConfigSerializeWrapper,之後判斷其dataVersion是否與this.brokerController.getTopicConfigManager().getDataVersion()相同,不同的話則使用wrapper的數據更新brokerController.getTopicConfigManager(),然後持久化

syncConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {        //......  ​      private void syncConsumerOffset() {          String masterAddrBak = this.masterAddr;          if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {              try {                  ConsumerOffsetSerializeWrapper offsetWrapper =                      this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);                  this.brokerController.getConsumerOffsetManager().getOffsetTable()                      .putAll(offsetWrapper.getOffsetTable());                  this.brokerController.getConsumerOffsetManager().persist();                  log.info("Update slave consumer offset from master, {}", masterAddrBak);              } catch (Exception e) {                  log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);              }          }      }  ​      //......  }
  • syncConsumerOffset方法從this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak)獲取ConsumerOffsetSerializeWrapper,之後用其數據更新brokerController.getConsumerOffsetManager().getOffsetTable()並持久化

syncDelayOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {        //......  ​      private void syncDelayOffset() {          String masterAddrBak = this.masterAddr;          if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {              try {                  String delayOffset =                      this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);                  if (delayOffset != null) {  ​                      String fileName =                          StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController                              .getMessageStoreConfig().getStorePathRootDir());                      try {                          MixAll.string2File(delayOffset, fileName);                      } catch (IOException e) {                          log.error("Persist file Exception, {}", fileName, e);                      }                  }                  log.info("Update slave delay offset from master, {}", masterAddrBak);              } catch (Exception e) {                  log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);              }          }      }  ​      //......  }
  • syncDelayOffset方法從this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak)獲取delayOffset,然後使用MixAll.string2File(delayOffset, fileName)持久化到文件

syncSubscriptionGroupConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {        //......  ​      private void syncSubscriptionGroupConfig() {          String masterAddrBak = this.masterAddr;          if (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {              try {                  SubscriptionGroupWrapper subscriptionWrapper =                      this.brokerController.getBrokerOuterAPI()                          .getAllSubscriptionGroupConfig(masterAddrBak);  ​                  if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()                      .equals(subscriptionWrapper.getDataVersion())) {                      SubscriptionGroupManager subscriptionGroupManager =                          this.brokerController.getSubscriptionGroupManager();                      subscriptionGroupManager.getDataVersion().assignNewOne(                          subscriptionWrapper.getDataVersion());                      subscriptionGroupManager.getSubscriptionGroupTable().clear();                      subscriptionGroupManager.getSubscriptionGroupTable().putAll(                          subscriptionWrapper.getSubscriptionGroupTable());                      subscriptionGroupManager.persist();                      log.info("Update slave Subscription Group from master, {}", masterAddrBak);                  }              } catch (Exception e) {                  log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);              }          }      }  ​      //......  }
  • syncSubscriptionGroupConfig方法從this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak)獲取SubscriptionGroupWrapper,之後判斷其dataVersion是否與this.brokerController.getSubscriptionGroupManager().getDataVersion()相同,不同的話則使用wrapper的數據更新subscriptionGroupManager.getSubscriptionGroupTable(),然後持久化

小結

BrokerController有個handleSlaveSynchronize方法,在role為BrokerRole.SLAVE的時候,會註冊一個定時任務,每隔10秒鐘執行一次BrokerController.this.slaveSynchronize.syncAll();SlaveSynchronize的syncAll方法分別調用了syncTopicConfig、syncConsumerOffset、syncDelayOffset、syncSubscriptionGroupConfig方法

doc