A Look at RocketMQ's ClientManageProcessor

  • December 30, 2019
  • Notes

This post takes a look at RocketMQ's ClientManageProcessor.

NettyRequestProcessor

rocketmq-all-4.6.0-source-release/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java

public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    boolean rejectRequest();
}

  • The NettyRequestProcessor interface defines two methods: processRequest and rejectRequest
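To make the role of the two methods concrete, here is a minimal sketch of how a caller might consult rejectRequest before invoking processRequest. The names SimpleProcessor, RejectAwareDispatcher, echoProcessor, and the string results are hypothetical stand-ins invented for this illustration; they are not RocketMQ classes, and the real remoting layer works with RemotingCommand and Netty channels.

```java
// Hypothetical stand-in for NettyRequestProcessor, reduced to plain strings.
interface SimpleProcessor {
    String processRequest(String request) throws Exception;

    boolean rejectRequest();
}

class RejectAwareDispatcher {
    // Consult rejectRequest first; call processRequest only when it returns false.
    static String dispatch(SimpleProcessor processor, String request) {
        if (processor.rejectRequest()) {
            return "REJECTED";
        }
        try {
            return processor.processRequest(request);
        } catch (Exception e) {
            return "ERROR: " + e.getMessage();
        }
    }

    // Builds a processor that echoes its input and reports the given reject flag.
    static SimpleProcessor echoProcessor(final boolean reject) {
        return new SimpleProcessor() {
            @Override
            public String processRequest(String request) {
                return "echo:" + request;
            }

            @Override
            public boolean rejectRequest() {
                return reject;
            }
        };
    }
}
```

A processor whose rejectRequest always returns false, like ClientManageProcessor, would never take the "REJECTED" branch in this sketch.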

ClientManageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;

    public ClientManageProcessor(final BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.CHECK_CLIENT_CONFIG:
                return this.checkClientConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

    @Override
    public boolean rejectRequest() {
        return false;
    }

    //......
}

  • ClientManageProcessor implements the NettyRequestProcessor interface. Its processRequest method handles only requests whose code is RequestCode.HEART_BEAT, RequestCode.UNREGISTER_CLIENT, or RequestCode.CHECK_CLIENT_CONFIG, dispatching them to heartBeat, unregisterClient, and checkClientConfig respectively; any other code falls through the switch and returns null. Its rejectRequest method always returns false
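The dispatch logic can be tried in isolation with a small sketch. The Code enum and the DispatchSketch class are hypothetical simplifications made up for this example (the real RequestCode class uses int constants), and SEND_MESSAGE appears here only as an example of a code this processor does not handle.

```java
// Hypothetical, simplified request codes; the real RequestCode class uses int constants.
enum Code { HEART_BEAT, UNREGISTER_CLIENT, CHECK_CLIENT_CONFIG, SEND_MESSAGE }

class DispatchSketch {
    // Mirrors the shape of the switch in processRequest: three codes are handled,
    // anything else falls through and yields null.
    static String process(Code code) {
        switch (code) {
            case HEART_BEAT:
                return "heartBeat";
            case UNREGISTER_CLIENT:
                return "unregisterClient";
            case CHECK_CLIENT_CONFIG:
                return "checkClientConfig";
            default:
                break;
        }
        return null;
    }
}
```

Calling DispatchSketch.process(Code.SEND_MESSAGE) returns null, which is the same outcome the real method produces for an unhandled code.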

heartBeat

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {

    //......

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            heartbeatData.getClientID(),
            request.getLanguage(),
            request.getVersion()
        );

        for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                    data.getGroupName());
            boolean isNotifyConsumerIdsChangedEnable = true;
            if (null != subscriptionGroupConfig) {
                isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                int topicSysFlag = 0;
                if (data.isUnitMode()) {
                    topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                }
                String newTopic = MixAll.getRetryTopic(data.getGroupName());
                this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                    newTopic,
                    subscriptionGroupConfig.getRetryQueueNums(),
                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            }

            boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                data.getGroupName(),
                clientChannelInfo,
                data.getConsumeType(),
                data.getMessageModel(),
                data.getConsumeFromWhere(),
                data.getSubscriptionDataSet(),
                isNotifyConsumerIdsChangedEnable
            );

            if (changed) {
                log.info("registerConsumer info changed {} {}",
                    data.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                );
            }
        }

        for (ProducerData data : heartbeatData.getProducerDataSet()) {
            this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                clientChannelInfo);
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}

  • The heartBeat method iterates over heartbeatData.getConsumerDataSet() and calls brokerController.getConsumerManager().registerConsumer for each entry; when a SubscriptionGroupConfig exists for the group, it first creates the group's retry topic via createTopicInSendMessageBackMethod. It then iterates over heartbeatData.getProducerDataSet() and calls brokerController.getProducerManager().registerProducer for each entry
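The two-pass registration can be modelled with in-memory maps. Everything in the sketch below is a hypothetical simplification: HeartbeatSketch and its methods are invented names, client channels are reduced to client id strings, "changed" is assumed to mean only that a previously unseen client joined the group, and the "%RETRY%" prefix is assumed to be what MixAll.getRetryTopic prepends to the group name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class HeartbeatSketch {
    // group name -> client ids (hypothetical stand-ins for the manager tables)
    private final Map<String, Set<String>> consumers = new HashMap<>();
    private final Map<String, Set<String>> producers = new HashMap<>();
    private final Set<String> retryTopics = new HashSet<>();
    // groups for which a subscription group config is assumed to exist
    private final Set<String> subscriptionGroups;

    HeartbeatSketch(Set<String> subscriptionGroups) {
        this.subscriptionGroups = subscriptionGroups;
    }

    // Returns true when the client was not yet registered for this group.
    boolean registerConsumer(String group, String clientId) {
        return consumers.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    void registerProducer(String group, String clientId) {
        producers.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    // Consumers first, then producers; returns how many consumer registrations changed.
    int heartBeat(String clientId, List<String> consumerGroups, List<String> producerGroups) {
        int changed = 0;
        for (String group : consumerGroups) {
            if (subscriptionGroups.contains(group)) {
                // Assumed retry topic naming: prefix plus group name.
                retryTopics.add("%RETRY%" + group);
            }
            if (registerConsumer(group, clientId)) {
                changed++;
            }
        }
        for (String group : producerGroups) {
            registerProducer(group, clientId);
        }
        return changed;
    }

    Set<String> retryTopics() {
        return retryTopics;
    }

    boolean hasProducer(String group, String clientId) {
        Set<String> clients = producers.get(group);
        return clients != null && clients.contains(clientId);
    }
}
```

Sending the same heartbeat twice changes state only the first time in this model, which is why the real method logs "registerConsumer info changed" only when registerConsumer reports a change.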

unregisterClient

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {

    //......

    public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
        final UnregisterClientRequestHeader requestHeader =
            (UnregisterClientRequestHeader) request
                .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);

        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
            ctx.channel(),
            requestHeader.getClientID(),
            request.getLanguage(),
            request.getVersion());
        {
            final String group = requestHeader.getProducerGroup();
            if (group != null) {
                this.brokerController.getProducerManager().unregisterProducer(group, clientChannelInfo);
            }
        }

        {
            final String group = requestHeader.getConsumerGroup();
            if (group != null) {
                SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
                boolean isNotifyConsumerIdsChangedEnable = true;
                if (null != subscriptionGroupConfig) {
                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                }
                this.brokerController.getConsumerManager().unregisterConsumer(group, clientChannelInfo, isNotifyConsumerIdsChangedEnable);
            }
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}

  • The unregisterClient method calls brokerController.getProducerManager().unregisterProducer when the request header carries a producerGroup, and brokerController.getConsumerManager().unregisterConsumer when it carries a consumerGroup; isNotifyConsumerIdsChangedEnable defaults to true when no SubscriptionGroupConfig is found for the consumer group
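The null checks and the default for the notify flag are easy to miss in the listing, so here is a small in-memory sketch of them. UnregisterSketch and all of its members are hypothetical names; what the real managers do with the flag is outside the source quoted above, so the sketch simply assumes that a notification is recorded for the group whenever the flag is true.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class UnregisterSketch {
    final Map<String, Set<String>> producers = new HashMap<>();
    final Map<String, Set<String>> consumers = new HashMap<>();
    // group -> notify flag; a missing entry models "no SubscriptionGroupConfig found"
    final Map<String, Boolean> notifyFlags = new HashMap<>();
    // groups for which a change notification would have been sent (assumption)
    final List<String> notifiedGroups = new ArrayList<>();

    void registerProducer(String group, String clientId) {
        producers.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    void registerConsumer(String group, String clientId) {
        consumers.computeIfAbsent(group, g -> new HashSet<>()).add(clientId);
    }

    // Either group may be null, in which case that side is skipped.
    void unregisterClient(String clientId, String producerGroup, String consumerGroup) {
        if (producerGroup != null) {
            remove(producers, producerGroup, clientId);
        }
        if (consumerGroup != null) {
            // Defaults to true when the group has no configured flag.
            boolean notify = notifyFlags.getOrDefault(consumerGroup, true);
            remove(consumers, consumerGroup, clientId);
            if (notify) {
                notifiedGroups.add(consumerGroup);
            }
        }
    }

    // Removes the client and drops the group entry once it is empty.
    private static void remove(Map<String, Set<String>> table, String group, String clientId) {
        Set<String> clients = table.get(group);
        if (clients != null) {
            clients.remove(clientId);
            if (clients.isEmpty()) {
                table.remove(group);
            }
        }
    }
}
```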

checkClientConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java

public class ClientManageProcessor implements NettyRequestProcessor {

    //......

    public RemotingCommand checkClientConfig(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);

        CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
            CheckClientRequestBody.class);

        if (requestBody != null && requestBody.getSubscriptionData() != null) {
            SubscriptionData subscriptionData = requestBody.getSubscriptionData();

            if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            }

            if (!this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());
                return response;
            }

            try {
                FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
            } catch (Exception e) {
                log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
                    requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());
                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                response.setRemark(e.getMessage());
                return response;
            }
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    //......
}

  • The checkClientConfig method validates the subscription expression. A tag-type expression returns SUCCESS immediately; for any other type, the broker returns SYSTEM_ERROR if isEnablePropertyFilter is false, and otherwise compiles the expression with FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString()), returning SUBSCRIPTION_PARSE_FAILED if compilation throws
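The order of the three checks determines which response code a client sees, and a toy model makes that order easy to test. In the sketch below, CheckClientConfigSketch, its Result enum, and the compile method are hypothetical: the toy compile only checks for a non-empty expression with balanced parentheses and is in no way the real filter compiler, and isTagType is assumed to treat a null, empty, or "TAG" type as a tag expression.

```java
class CheckClientConfigSketch {
    enum Result { SUCCESS, SYSTEM_ERROR, SUBSCRIPTION_PARSE_FAILED }

    // Assumed behaviour: a missing, empty, or "TAG" type counts as a tag expression.
    static boolean isTagType(String type) {
        return type == null || type.isEmpty() || "TAG".equals(type);
    }

    // Toy stand-in for a filter compiler: rejects empty input and unbalanced parentheses.
    static void compile(String expression) {
        if (expression == null || expression.trim().isEmpty()) {
            throw new IllegalArgumentException("empty expression");
        }
        int depth = 0;
        for (char c : expression.toCharArray()) {
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
                if (depth < 0) {
                    throw new IllegalArgumentException("unbalanced parentheses");
                }
            }
        }
        if (depth != 0) {
            throw new IllegalArgumentException("unbalanced parentheses");
        }
    }

    // Same order as checkClientConfig: tag shortcut, feature switch, then compile.
    static Result check(String expressionType, String expression, boolean propertyFilterEnabled) {
        if (isTagType(expressionType)) {
            return Result.SUCCESS;
        }
        if (!propertyFilterEnabled) {
            return Result.SYSTEM_ERROR;
        }
        try {
            compile(expression);
        } catch (IllegalArgumentException e) {
            return Result.SUBSCRIPTION_PARSE_FAILED;
        }
        return Result.SUCCESS;
    }
}
```

Note that with the property filter switched off, a non-tag expression is rejected with SYSTEM_ERROR before it is ever compiled, so a malformed expression is reported as a parse failure only when the feature is enabled.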

Summary

ClientManageProcessor implements the NettyRequestProcessor interface. Its processRequest method handles only requests whose code is RequestCode.HEART_BEAT, RequestCode.UNREGISTER_CLIENT, or RequestCode.CHECK_CLIENT_CONFIG, dispatching them to heartBeat, unregisterClient, and checkClientConfig respectively. Its rejectRequest method always returns false.
