聊聊rocketmq的pullInterval

  • 2019 年 11 月 27 日
  • 笔记

本文主要研究一下rocketmq的pullInterval

pullInterval

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {        private final InternalLogger log = ClientLogger.getLog();        //......        /**       * Message pull Interval       */      private long pullInterval = 0;        public long getPullInterval() {          return pullInterval;      }        public void setPullInterval(long pullInterval) {          this.pullInterval = pullInterval;      }        //......  }
  • DefaultMQPushConsumer定义了pullInterval属性,默认值为0

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {    //......        private void checkConfig() throws MQClientException {          Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());            //......            // pullInterval          if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {              throw new MQClientException(                  "pullInterval Out of range [0, 65535]"                      + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),                  null);          }            //......      }        //......  }
  • checkConfig方法会校验defaultMQPushConsumer.getPullInterval()必须大于等于0且小于等于65535

pullCallback

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {    //......            PullCallback pullCallback = new PullCallback() {              @Override              public void onSuccess(PullResult pullResult) {                  if (pullResult != null) {                      pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,                          subscriptionData);                        switch (pullResult.getPullStatus()) {                          case FOUND:                              long prevRequestOffset = pullRequest.getNextOffset();                              pullRequest.setNextOffset(pullResult.getNextBeginOffset());                              long pullRT = System.currentTimeMillis() - beginTimestamp;                              DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),                                  pullRequest.getMessageQueue().getTopic(), pullRT);                                long firstMsgOffset = Long.MAX_VALUE;                              if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {                                  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                              } else {                                  firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();                                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),                                      pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());                                    boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());                                  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(                                      pullResult.getMsgFoundList(), 
                                     processQueue,                                      pullRequest.getMessageQueue(),                                      dispatchToConsume);                                    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {                                      DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,                                          DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());                                  } else {                                      DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                                  }                              }                                if (pullResult.getNextBeginOffset() < prevRequestOffset                                  || firstMsgOffset < prevRequestOffset) {                                  log.warn(                                      "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",                                      pullResult.getNextBeginOffset(),                                      firstMsgOffset,                                      prevRequestOffset);                              }                                break;                          case NO_NEW_MSG:                              pullRequest.setNextOffset(pullResult.getNextBeginOffset());                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                              break;                          case NO_MATCHED_MSG:                              pullRequest.setNextOffset(pullResult.getNextBeginOffset());                                DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);                                
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);                              break;                          case OFFSET_ILLEGAL:                              log.warn("the pull request offset illegal, {} {}",                                  pullRequest.toString(), pullResult.toString());                              pullRequest.setNextOffset(pullResult.getNextBeginOffset());                                pullRequest.getProcessQueue().setDropped(true);                              DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {                                    @Override                                  public void run() {                                      try {                                          DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),                                              pullRequest.getNextOffset(), false);                                            DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());                                            DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());                                            log.warn("fix the pull request offset, {}", pullRequest);                                      } catch (Throwable e) {                                          log.error("executeTaskLater Exception", e);                                      }                                  }                              }, 10000);                              break;                          default:                              break;                      }                  }              }                @Override              public void onException(Throwable e) {                  if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                      log.warn("execute the pull request exception", e);                  }   
                 DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);              }          };        //......        private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {          this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);      }        //......  }    
  • pullCallback的onSuccess方法首先通过pullAPIWrapper.processPullResult处理pullResult,之后对于pullResult.getPullStatus()为FOUND类型的会判断pullResult.getMsgFoundList()是否为空,不为空则会执行consumeMessageService.submitConsumeRequest,然后再判断defaultMQPushConsumer.getPullInterval()是否大于0,大于0的话则执行executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()),否则执行executePullRequestImmediately(pullRequest)

小结

DefaultMQPushConsumer定义了pullInterval属性,默认值为0;checkConfig方法会校验defaultMQPushConsumer.getPullInterval()必须大于等于0且小于等于65535;pullCallback的onSuccess方法判断defaultMQPushConsumer.getPullInterval()是否大于0,大于0的话则执行executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()),否则执行executePullRequestImmediately(pullRequest)

doc

  • DefaultMQPushConsumer