A look at RocketMQ's maxReconsumeTimes

  • November 23, 2019
  • Notes

This post examines RocketMQ's maxReconsumeTimes setting.

maxReconsumeTimes

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

    public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

        private final InternalLogger log = ClientLogger.getLog();

        //......

        /**
         * Max re-consume times. -1 means 16 times.
         * </p>
         *
         * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
         * queue waiting.
         */
        private int maxReconsumeTimes = -1;

        //......

        public int getMaxReconsumeTimes() {
            return maxReconsumeTimes;
        }

        public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
            this.maxReconsumeTimes = maxReconsumeTimes;
        }

        //......
    }
  • DefaultMQPushConsumer defines the maxReconsumeTimes property. It defaults to -1, which according to the Javadoc means 16 times.
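To make the -1 sentinel concrete, here is a minimal standalone sketch of how it resolves to 16. The class `MaxReconsumeTimesDefault` and its `resolve` method are hypothetical names for illustration and are not part of the RocketMQ client.

```java
// Simplified model of the -1 sentinel; this is NOT a RocketMQ class.
final class MaxReconsumeTimesDefault {

    // Value used when the property is left at -1, per the Javadoc quoted above.
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    // Returns the effective retry limit for a configured value.
    static int resolve(int configured) {
        return configured == -1 ? DEFAULT_MAX_RECONSUME_TIMES : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1)); // prints 16
        System.out.println(resolve(3));  // prints 3
    }
}
```

In an application, the second case corresponds to calling setMaxReconsumeTimes(3) on the DefaultMQPushConsumer before starting it.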

sendMessageBack

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {

        //......

        public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            try {
                String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
            } catch (Exception e) {
                log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

                Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

                String originMsgId = MessageAccessor.getOriginMessageId(msg);
                MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

                newMsg.setFlag(msg.getFlag());
                MessageAccessor.setProperties(newMsg, msg.getProperties());
                MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
                MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
                MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
                newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

                this.mQClientFactory.getDefaultMQProducer().send(newMsg);
            } finally {
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
            }
        }

        private int getMaxReconsumeTimes() {
            // default reconsume times: 16
            if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
                return 16;
            } else {
                return this.defaultMQPushConsumer.getMaxReconsumeTimes();
            }
        }

        //......
    }
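  • DefaultMQPushConsumerImpl.sendMessageBack first calls mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack, passing getMaxReconsumeTimes() (which returns 16 when the configured value is -1). If that call throws, the catch block falls back to building a new message on the consumer group's retry topic: it uses MessageAccessor.setReconsumeTime to set newMsg's reconsumeTime to msg.getReconsumeTimes() + 1, sets newMsg's maxReconsumeTimes from getMaxReconsumeTimes(), sets the delay level to 3 + msg.getReconsumeTimes(), and finally sends it with mQClientFactory.getDefaultMQProducer().send(newMsg).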
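The arithmetic in the fallback branch can be sketched in isolation. The following is a simplified model, not the client's code: `FallbackRetrySketch`, `buildFallback`, and the map keys are hypothetical names chosen for illustration, and the retry topic is assumed to be the %RETRY% prefix followed by the consumer group name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of the fields set in the catch block; NOT the RocketMQ Message class.
final class FallbackRetrySketch {

    // Assumed to match MixAll.RETRY_GROUP_TOPIC_PREFIX.
    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";

    // Computes the values the fallback message would carry. Map keys are illustrative only.
    static Map<String, String> buildFallback(String consumerGroup, String originalTopic,
                                             int reconsumeTimes, int configuredMax) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("topic", RETRY_GROUP_TOPIC_PREFIX + consumerGroup);
        fields.put("retryTopic", originalTopic);                      // original topic is remembered
        fields.put("reconsumeTime", String.valueOf(reconsumeTimes + 1));
        fields.put("maxReconsumeTimes", String.valueOf(configuredMax == -1 ? 16 : configuredMax));
        fields.put("delayTimeLevel", String.valueOf(3 + reconsumeTimes));
        return fields;
    }

    public static void main(String[] args) {
        // A message that has already been re-consumed twice, with the default limit.
        Map<String, String> fields = buildFallback("demoGroup", "demoTopic", 2, -1);
        System.out.println(fields.get("topic"));          // prints %RETRY%demoGroup
        System.out.println(fields.get("reconsumeTime"));  // prints 3
        System.out.println(fields.get("delayTimeLevel")); // prints 5
    }
}
```

Because the delay level grows with reconsumeTimes, each further retry through this path is scheduled at a higher delay level than the previous one.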

handleRetryAndDLQ

rocketmq/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

        //......

        private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
                                          RemotingCommand request,
                                          MessageExt msg, TopicConfig topicConfig) {
            String newTopic = requestHeader.getTopic();
            if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
                SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
                if (null == subscriptionGroupConfig) {
                    response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                    response.setRemark(
                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                    return false;
                }

                int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
                if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
                }
                int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
                if (reconsumeTimes >= maxReconsumeTimes) {
                    newTopic = MixAll.getDLQTopic(groupName);
                    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                        DLQ_NUMS_PER_GROUP,
                        PermName.PERM_WRITE, 0
                    );
                    msg.setTopic(newTopic);
                    msg.setQueueId(queueIdInt);
                    if (null == topicConfig) {
                        response.setCode(ResponseCode.SYSTEM_ERROR);
                        response.setRemark("topic[" + newTopic + "] not exist");
                        return false;
                    }
                }
            }
            int sysFlag = requestHeader.getSysFlag();
            if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
                sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
            }
            msg.setSysFlag(sysFlag);
            return true;
        }

        //......
    }
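  • SendMessageProcessor.handleRetryAndDLQ only acts on topics that start with RETRY_GROUP_TOPIC_PREFIX (%RETRY%). For those, it first takes maxReconsumeTimes from subscriptionGroupConfig.getRetryMaxTimes(); if the request's version is greater than or equal to MQVersion.Version.V3_4_9.ordinal(), it reads maxReconsumeTimes from the request header instead. It then reads reconsumeTimes from the request header (treating null as 0). If that value is greater than or equal to maxReconsumeTimes, it changes newTopic to MixAll.getDLQTopic(groupName) and assigns the message to a randomly chosen queue of that DLQ topic.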
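The broker-side decision reduces to a prefix check and one comparison. Below is a minimal sketch of that rule only; `RetryOrDlqSketch` and `route` are hypothetical names, and the %DLQ% prefix is an assumption about what MixAll.getDLQTopic produces, since the excerpt above does not show it. Topic creation, queue selection, and the subscription group lookup are left out.

```java
// Simplified model of the retry-vs-DLQ decision; NOT the broker's SendMessageProcessor.
final class RetryOrDlqSketch {

    static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
    // Assumption: mirrors the prefix used by MixAll.getDLQTopic.
    static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";

    // Returns the topic the message would end up on.
    static String route(String topic, Integer reconsumeTimes, int maxReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_GROUP_TOPIC_PREFIX)) {
            return topic; // not a retry message, nothing to decide
        }
        String groupName = topic.substring(RETRY_GROUP_TOPIC_PREFIX.length());
        int times = (reconsumeTimes == null) ? 0 : reconsumeTimes; // null header counts as 0
        return times >= maxReconsumeTimes ? DLQ_GROUP_TOPIC_PREFIX + groupName : topic;
    }

    public static void main(String[] args) {
        System.out.println(route("%RETRY%demoGroup", 15, 16)); // prints %RETRY%demoGroup
        System.out.println(route("%RETRY%demoGroup", 16, 16)); // prints %DLQ%demoGroup
        System.out.println(route("demoTopic", 99, 16));        // prints demoTopic
    }
}
```

Note the comparison is greater-than-or-equal, so with a limit of 16 a message whose reconsumeTimes has reached 16 is redirected rather than retried again.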

Summary

DefaultMQPushConsumer defines the maxReconsumeTimes property, which defaults to -1 (meaning 16). DefaultMQPushConsumerImpl.sendMessageBack catches exceptions thrown by mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack; when one occurs, it uses MessageAccessor.setReconsumeTime to update newMsg's reconsumeTime, calls getMaxReconsumeTimes to set newMsg's maxReconsumeTimes, and finally sends the message with mQClientFactory.getDefaultMQProducer().send(newMsg). On the broker side, if reconsumeTimes is greater than or equal to maxReconsumeTimes, the message's topic is changed to MixAll.getDLQTopic(groupName).