聊聊rocketmq的consumeThread

  • 2019 年 11 月 25 日
  • 笔记

本文主要研究一下rocketmq的consumeThread

DefaultMQPushConsumer

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {        private final InternalLogger log = ClientLogger.getLog();        //......        /**       * Minimum consumer thread number       */      private int consumeThreadMin = 20;        /**       * Max consumer thread number       */      private int consumeThreadMax = 20;        public int getConsumeThreadMax() {          return consumeThreadMax;      }        public int getConsumeThreadMin() {          return consumeThreadMin;      }        //......  }
  • DefaultMQPushConsumer定义了consumeThreadMin、consumeThreadMax属性,默认均为20

DefaultMQPushConsumerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java

public class DefaultMQPushConsumerImpl implements MQConsumerInner {  //......        private void checkConfig() throws MQClientException {          Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());            //......            // consumeThreadMin          if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1              || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {              throw new MQClientException(                  "consumeThreadMin Out of range [1, 1000]"                      + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),                  null);          }            // consumeThreadMax          if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {              throw new MQClientException(                  "consumeThreadMax Out of range [1, 1000]"                      + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),                  null);          }            // consumeThreadMin can't be larger than consumeThreadMax          if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {              throw new MQClientException(                  "consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "                      + "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",                  null);          }            //......      }    //......  }
  • checkConfig方法会对consumeThreadMin、consumeThreadMax参数进行校验,其中他们的范围必须大于1小于1000,且consumeThreadMin不能大于consumeThreadMax

ConsumeMessageConcurrentlyService

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {      private static final InternalLogger log = ClientLogger.getLog();      private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;      private final DefaultMQPushConsumer defaultMQPushConsumer;      private final MessageListenerConcurrently messageListener;      private final BlockingQueue<Runnable> consumeRequestQueue;      private final ThreadPoolExecutor consumeExecutor;      private final String consumerGroup;        private final ScheduledExecutorService scheduledExecutorService;      private final ScheduledExecutorService cleanExpireMsgExecutors;        public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,          MessageListenerConcurrently messageListener) {          this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;          this.messageListener = messageListener;            this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();          this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();          this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();            this.consumeExecutor = new ThreadPoolExecutor(              this.defaultMQPushConsumer.getConsumeThreadMin(),              this.defaultMQPushConsumer.getConsumeThreadMax(),              1000 * 60,              TimeUnit.MILLISECONDS,              this.consumeRequestQueue,              new ThreadFactoryImpl("ConsumeMessageThread_"));            this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));          this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));      }        //......        public void submitConsumeRequest(          final List<MessageExt> msgs,          final ProcessQueue processQueue,          final MessageQueue messageQueue,          final boolean dispatchToConsume) {          final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();          if (msgs.size() <= consumeBatchSize) {              ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);              try {                  this.consumeExecutor.submit(consumeRequest);              } catch (RejectedExecutionException e) {                  this.submitConsumeRequestLater(consumeRequest);              }          } else {              for (int total = 0; total < msgs.size(); ) {                  List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);                  for (int i = 0; i < consumeBatchSize; i++, total++) {                      if (total < msgs.size()) {                          msgThis.add(msgs.get(total));                      } else {                          break;                      }                  }                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);                  try {                      this.consumeExecutor.submit(consumeRequest);                  } catch (RejectedExecutionException e) {                      for (; total < msgs.size(); total++) {                          msgThis.add(msgs.get(total));                      }                        this.submitConsumeRequestLater(consumeRequest);                  }              }          }      }        private void submitConsumeRequestLater(final ConsumeRequest consumeRequest      ) {            this.scheduledExecutorService.schedule(new Runnable() {                @Override              public void run() {                  ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);              }          }, 5000, TimeUnit.MILLISECONDS);      }        //......  }
  • ConsumeMessageConcurrentlyService的构造器创建了consumeExecutor,其corePoolSize为defaultMQPushConsumer.getConsumeThreadMin(),其maximumPoolSize为defaultMQPushConsumer.getConsumeThreadMax(),其workQueue为LinkedBlockingQueue;其submitConsumeRequest及submitConsumeRequestLater方法均会往consumeExecutor提交consumeRequest

ConsumeRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java

    class ConsumeRequest implements Runnable {          private final List<MessageExt> msgs;          private final ProcessQueue processQueue;          private final MessageQueue messageQueue;            public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {              this.msgs = msgs;              this.processQueue = processQueue;              this.messageQueue = messageQueue;          }            public List<MessageExt> getMsgs() {              return msgs;          }            public ProcessQueue getProcessQueue() {              return processQueue;          }            @Override          public void run() {              if (this.processQueue.isDropped()) {                  log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);                  return;              }                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;              ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);              ConsumeConcurrentlyStatus status = null;              defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());                ConsumeMessageContext consumeMessageContext = null;              if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {                  consumeMessageContext = new ConsumeMessageContext();                  consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());                  consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());                  consumeMessageContext.setProps(new HashMap<String, String>());                  consumeMessageContext.setMq(messageQueue);                  consumeMessageContext.setMsgList(msgs);                  consumeMessageContext.setSuccess(false);                  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);              }                long beginTimestamp = System.currentTimeMillis();              boolean hasException = false;              ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;              try {                  if (msgs != null && !msgs.isEmpty()) {                      for (MessageExt msg : msgs) {                          MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));                      }                  }                  status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);              } catch (Throwable e) {                  log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",                      RemotingHelper.exceptionSimpleDesc(e),                      ConsumeMessageConcurrentlyService.this.consumerGroup,                      msgs,                      messageQueue);                  hasException = true;              }              long consumeRT = System.currentTimeMillis() - beginTimestamp;              if (null == status) {                  if (hasException) {                      returnType = ConsumeReturnType.EXCEPTION;                  } else {                      returnType = ConsumeReturnType.RETURNNULL;                  }              } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {                  returnType = ConsumeReturnType.TIME_OUT;              } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {                  returnType = ConsumeReturnType.FAILED;              } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {                  returnType = ConsumeReturnType.SUCCESS;              }                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {                  consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());              }                if (null == status) {                  log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",                      ConsumeMessageConcurrentlyService.this.consumerGroup,                      msgs,                      messageQueue);                  status = ConsumeConcurrentlyStatus.RECONSUME_LATER;              }                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {                  consumeMessageContext.setStatus(status.toString());                  consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);                  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);              }                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()                  .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);                if (!processQueue.isDropped()) {                  ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);              } else {                  log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);              }          }            public MessageQueue getMessageQueue() {              return messageQueue;          }        }
  • ConsumeRequest实现了Runnable方法,其run方法先执行defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext),然后执行listener.consumeMessage(Collections.unmodifiableList(msgs), context),最后执行processConsumeResult(status, context, this)

小结

ConsumeMessageConcurrentlyService的构造器创建了consumeExecutor,其corePoolSize为defaultMQPushConsumer.getConsumeThreadMin(),其maximumPoolSize为defaultMQPushConsumer.getConsumeThreadMax(),其workQueue为LinkedBlockingQueue;其submitConsumeRequest及submitConsumeRequestLater方法均会往consumeExecutor提交consumeRequest

doc

  • DefaultMQPushConsumer