A look at RocketMQ's consumeConcurrentlyMaxSpan

  • November 23, 2019
  • Notes

This post takes a look at RocketMQ's consumeConcurrentlyMaxSpan setting.

consumeConcurrentlyMaxSpan

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    //......

    /**
     * Concurrently max span offset.it has no effect on sequential consumption
     */
    private int consumeConcurrentlyMaxSpan = 2000;

    public int getConsumeConcurrentlyMaxSpan() {
        return consumeConcurrentlyMaxSpan;
    }

    public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
        this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
    }

    //......
}

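  • DefaultMQPushConsumer defines the consumeConcurrentlyMaxSpan property, with a default value of 2000; according to its Javadoc comment, it has no effect on sequential (orderly) consumption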

checkConfig

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    //......

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());

        //......

        // consumeConcurrentlyMaxSpan
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
            || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException(
                "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                null);
        }

        //......
    }

    //......
}

  • checkConfig requires defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() to be at least 1 and at most 65535; otherwise it throws an MQClientException
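
The range check above can be tried out in isolation. The following is a minimal stand-alone sketch, not RocketMQ code: MaxSpanValidator and isValid are hypothetical names introduced for illustration, and only the bounds 1 and 65535 come from the snippet.

```java
// Hypothetical stand-alone sketch; MaxSpanValidator is not a RocketMQ class.
final class MaxSpanValidator {

    // Bounds copied from the checkConfig excerpt.
    static final int MIN_SPAN = 1;
    static final int MAX_SPAN = 65535;

    /** Returns true when the value would pass the range check shown in checkConfig. */
    static boolean isValid(int consumeConcurrentlyMaxSpan) {
        return consumeConcurrentlyMaxSpan >= MIN_SPAN
            && consumeConcurrentlyMaxSpan <= MAX_SPAN;
    }

    public static void main(String[] args) {
        int[] candidates = {0, 1, 2000, 65535, 65536};
        for (int candidate : candidates) {
            String verdict = isValid(candidate) ? "accepted" : "rejected";
            System.out.println(candidate + " -> " + verdict);
        }
    }
}
```

Both bounds are inclusive, so 1 and 65535 are accepted while 0 and 65536 are rejected.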

pullMessage

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    /**
     * Delay some time when exception occur
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;

    //......

    public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        //......

        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        //......

    }

    private void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, timeDelay);
    }

    //......
}

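  • When consumption is not orderly (consumeOrderly is false), pullMessage checks whether processQueue.getMaxSpan() is greater than this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(); if it is, it applies flow control by calling executePullRequestLater with a delay of PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL (50 milliseconds) and returns without pulling. A warning is logged on the first such occurrence and then on every 1000th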
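
The non-orderly branch can be modelled on its own. The following is a minimal stand-alone sketch, not RocketMQ code: SpanFlowControl and its methods are hypothetical names, and only the strict greater-than comparison, the 50 ms delay, and the modulo-1000 warning counter are taken from the snippet.

```java
// Hypothetical stand-alone sketch; SpanFlowControl is not a RocketMQ class.
final class SpanFlowControl {

    // Mirrors PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL from the excerpt.
    static final long FLOW_CONTROL_DELAY_MILLIS = 50;

    private final int maxSpan;
    private long flowControlTimes = 0;
    private long warningsLogged = 0;

    SpanFlowControl(int maxSpan) {
        this.maxSpan = maxSpan;
    }

    /**
     * Returns how long to wait before the next pull, in milliseconds.
     * 0 means the span is within the limit and the pull can proceed now.
     */
    long delayBeforePull(long currentSpan) {
        if (currentSpan > maxSpan) {
            // Same throttling as the excerpt: warn on the 1st, 1001st, 2001st... hit.
            if ((flowControlTimes++ % 1000) == 0) {
                warningsLogged++;
            }
            return FLOW_CONTROL_DELAY_MILLIS;
        }
        return 0;
    }

    long getFlowControlTimes() {
        return flowControlTimes;
    }

    long getWarningsLogged() {
        return warningsLogged;
    }
}
```

Because the comparison is strict, a span exactly equal to the configured limit still lets the pull proceed; only a span above the limit delays the request.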

ProcessQueue

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {
    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    private final InternalLogger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

    //......

    public long getMaxSpan() {
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getMaxSpan exception", e);
        }

        return 0;
    }

    //......
}

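  • ProcessQueue's getMaxSpan returns this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey(), that is, the distance between the largest and smallest offsets currently held in msgTreeMap; it returns 0 when the map is empty or the lock wait is interrupted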
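
One consequence of this definition is that the span measures offset distance, not the number of cached messages. The following is a minimal stand-alone sketch using a plain TreeMap; SpanDemo is a hypothetical name, the String values stand in for MessageExt, and the scenario assumes that consumed messages are removed from the map while an unconsumed one at the lowest offset stays in it.

```java
import java.util.TreeMap;

// Hypothetical stand-alone sketch; SpanDemo is not part of RocketMQ.
final class SpanDemo {

    /** Same arithmetic as getMaxSpan in the excerpt, without the read lock. */
    static long maxSpan(TreeMap<Long, String> msgTreeMap) {
        if (msgTreeMap.isEmpty()) {
            return 0;
        }
        return msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> msgTreeMap = new TreeMap<>();
        // Pretend offsets 100..2200 were pulled into the local cache.
        for (long offset = 100; offset <= 2200; offset++) {
            msgTreeMap.put(offset, "msg-" + offset);
        }
        // Assumption for illustration: everything except the message at offset 100
        // and the newest message has been consumed and removed.
        for (long offset = 101; offset < 2200; offset++) {
            msgTreeMap.remove(offset);
        }
        System.out.println("entries=" + msgTreeMap.size() + ", span=" + maxSpan(msgTreeMap));
    }
}
```

In this scenario only two entries remain, yet the span is 2100, which is above the default limit of 2000, so a single slow message at the head of the map would be enough to trigger the flow control.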

PullMessageService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/PullMessageService.java

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
    private final MQClientInstance mQClientFactory;
    private final ScheduledExecutorService scheduledExecutorService = Executors
        .newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "PullMessageServiceScheduledThread");
            }
        });

    public PullMessageService(MQClientInstance mQClientFactory) {
        this.mQClientFactory = mQClientFactory;
    }

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

    private void pullMessage(final PullRequest pullRequest) {
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }

    //......
}

  • executePullRequestLater schedules a delayed task on scheduledExecutorService; when the delay elapses, the task calls executePullRequestImmediately(pullRequest), which puts the pullRequest into pullRequestQueue. The run method takes pull requests from pullRequestQueue and passes each one to pullMessage, which looks up the consumer through mQClientFactory.selectConsumer and then calls that consumer's pullMessage method
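
The delayed re-enqueue pattern described here can be reproduced with the JDK alone. The following is a minimal stand-alone sketch, not RocketMQ code: DelayedRequeue, submitLater, submitNow, and takeWithin are hypothetical names, a String stands in for PullRequest, and the daemon thread is a choice made for the demo rather than something taken from the excerpt.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; DelayedRequeue is not a RocketMQ class.
final class DelayedRequeue {

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Single scheduler thread, as in the excerpt; daemon so the demo never blocks JVM exit.
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "requeue-scheduler");
            t.setDaemon(true);
            return t;
        });

    /** Counterpart of executePullRequestLater: enqueue after the given delay. */
    void submitLater(String request, long delayMillis) {
        scheduler.schedule(() -> submitNow(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Counterpart of executePullRequestImmediately: enqueue right away. */
    void submitNow(String request) {
        try {
            queue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Waits up to timeoutMillis for the next request; returns null on timeout or interrupt. */
    String takeWithin(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A caller would invoke submitLater("req-1", 50) and then takeWithin(...) from the consuming loop; the request only becomes visible in the queue once the 50 ms delay has elapsed, which is how a flow-controlled pull request finds its way back to the pulling thread.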

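Summary

DefaultMQPushConsumer defines the consumeConcurrentlyMaxSpan property, with a default value of 2000. When consumption is not orderly, DefaultMQPushConsumerImpl's pullMessage method checks whether processQueue.getMaxSpan() is greater than this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan(); if it is, it applies flow control by calling executePullRequestLater, which re-submits the pull request after PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL (50 milliseconds).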

doc

  • DefaultMQPushConsumer