聊聊rocketmq的ConsumeMode.ORDERLY

  • 2019 年 12 月 25 日
  • 筆記

本文主要研究一下rocketmq的ConsumeMode.ORDERLY

ConsumeMode.ORDERLY

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/annotation/ConsumeMode.java

public enum ConsumeMode {      /**       * Receive asynchronously delivered messages concurrently       */      CONCURRENTLY,        /**       * Receive asynchronously delivered messages orderly. one queue, one thread       */      ORDERLY  }
  • ConsumeMode定义了CONCURRENTLY、ORDERLY两个枚举值

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,      RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {        //......        private void initRocketMQPushConsumer() throws MQClientException {          Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");          Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");          Assert.notNull(nameServer, "Property 'nameServer' is required");          Assert.notNull(topic, "Property 'topic' is required");            RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),              this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());          boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();          if (Objects.nonNull(rpcHook)) {              consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),                  enableMsgTrace, this.applicationContext.getEnvironment().                  resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));              consumer.setVipChannelEnabled(false);              consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));          } else {              log.debug("Access-key or secret-key not configure in " + this + ".");              consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,                      this.applicationContext.getEnvironment().                      resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));          }            String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());          if (customizedNameServer != null) {              consumer.setNamesrvAddr(customizedNameServer);          } else {              consumer.setNamesrvAddr(nameServer);          }          if (accessChannel != null) {              consumer.setAccessChannel(accessChannel);          }          consumer.setConsumeThreadMax(consumeThreadMax);          if (consumeThreadMax < consumer.getConsumeThreadMin()) {              consumer.setConsumeThreadMin(consumeThreadMax);          }          consumer.setConsumeTimeout(consumeTimeout);            switch (messageModel) {              case BROADCASTING:                  consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);                  break;              case CLUSTERING:                  consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);                  break;              default:                  throw new IllegalArgumentException("Property 'messageModel' was wrong.");          }            switch (selectorType) {              case TAG:                  consumer.subscribe(topic, selectorExpression);                  break;              case SQL92:                  consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));                  break;              default:                  throw new IllegalArgumentException("Property 'selectorType' was wrong.");          }            switch (consumeMode) {              case ORDERLY:                  consumer.setMessageListener(new DefaultMessageListenerOrderly());                  break;              case CONCURRENTLY:                  consumer.setMessageListener(new DefaultMessageListenerConcurrently());                  break;              default:                  throw new IllegalArgumentException("Property 'consumeMode' was wrong.");          }            if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {              ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);          }        }    }
  • initRocketMQPushConsumer方法对于consumeMode为ORDERLY的设置的messageListener为DefaultMessageListenerOrderly

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {            @SuppressWarnings("unchecked")          @Override          public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {              for (MessageExt messageExt : msgs) {                  log.debug("received msg: {}", messageExt);                  try {                      long now = System.currentTimeMillis();                      rocketMQListener.onMessage(doConvertMessage(messageExt));                      long costTime = System.currentTimeMillis() - now;                      log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                  } catch (Exception e) {                      log.warn("consume message failed. messageExt:{}", messageExt, e);                      context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);                      return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                  }              }                return ConsumeOrderlyStatus.SUCCESS;          }      }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

ConsumeMessageOrderlyService

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java

    class ConsumeRequest implements Runnable {          private final ProcessQueue processQueue;          private final MessageQueue messageQueue;            public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {              this.processQueue = processQueue;              this.messageQueue = messageQueue;          }            public ProcessQueue getProcessQueue() {              return processQueue;          }            public MessageQueue getMessageQueue() {              return messageQueue;          }            @Override          public void run() {              if (this.processQueue.isDropped()) {                  log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                  return;              }                final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);              synchronized (objLock) {                  if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                      || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {                      final long beginTime = System.currentTimeMillis();                      for (boolean continueConsume = true; continueConsume; ) {                          if (this.processQueue.isDropped()) {                              log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                              break;                          }                            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                              && !this.processQueue.isLocked()) {                              log.warn("the message queue not locked, so consume later, {}", this.messageQueue);                              ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);                              break;                          }                            if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                              && this.processQueue.isLockExpired()) {                              log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);                              ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);                              break;                          }                            long interval = System.currentTimeMillis() - beginTime;                          if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {                              ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);                              break;                          }                            final int consumeBatchSize =                              ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();                            List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);                          defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());                          if (!msgs.isEmpty()) {                              final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);                                ConsumeOrderlyStatus status = null;                                ConsumeMessageContext consumeMessageContext = null;                              if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                  consumeMessageContext = new ConsumeMessageContext();                                  consumeMessageContext                                      .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());                                  consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());                                  consumeMessageContext.setMq(messageQueue);                                  consumeMessageContext.setMsgList(msgs);                                  consumeMessageContext.setSuccess(false);                                  // init the consume context type                                  consumeMessageContext.setProps(new HashMap<String, String>());                                  ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);                              }                                long beginTimestamp = System.currentTimeMillis();                              ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;                              boolean hasException = false;                              try {                                  this.processQueue.getLockConsume().lock();                                  if (this.processQueue.isDropped()) {                                      log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",                                          this.messageQueue);                                      break;                                  }                                    status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);                              } catch (Throwable e) {                                  log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",                                      RemotingHelper.exceptionSimpleDesc(e),                                      ConsumeMessageOrderlyService.this.consumerGroup,                                      msgs,                                      messageQueue);                                  hasException = true;                              } finally {                                  this.processQueue.getLockConsume().unlock();                              }                                if (null == status                                  || ConsumeOrderlyStatus.ROLLBACK == status                                  || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {                                  log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",                                      ConsumeMessageOrderlyService.this.consumerGroup,                                      msgs,                                      messageQueue);                              }                                long consumeRT = System.currentTimeMillis() - beginTimestamp;                              if (null == status) {                                  if (hasException) {                                      returnType = ConsumeReturnType.EXCEPTION;                                  } else {                                      returnType = ConsumeReturnType.RETURNNULL;                                  }                              } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {                                  returnType = ConsumeReturnType.TIME_OUT;                              } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {                                  returnType = ConsumeReturnType.FAILED;                              } else if (ConsumeOrderlyStatus.SUCCESS == status) {                                  returnType = ConsumeReturnType.SUCCESS;                              }                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                  consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());                              }                                if (null == status) {                                  status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                              }                                if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {                                  consumeMessageContext.setStatus(status.toString());                                  consumeMessageContext                                      .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);                                  ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);                              }                                ConsumeMessageOrderlyService.this.getConsumerStatsManager()                                  .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);                                continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);                          } else {                              continueConsume = false;                          }                      }                  } else {                      if (this.processQueue.isDropped()) {                          log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                          return;                      }                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);                  }              }          }        }
  • ConsumeRequest实现了Runnable接口,它的构造器要求传入processQueue、messageQueue参数;其run方法首先通过messageQueueLock.fetchLockObject(this.messageQueue)获取objLock,之后synchronized该objLock进行后续操作
  • 对于messageModel非MessageModel.BROADCASTING的且(this.processQueue.isLocked() && !this.processQueue.isLockExpired())不成立的则执行tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100)
  • 之后通过processQueue.takeMessags(consumeBatchSize),然后执行processQueue.getLockConsume().lock(),再执行messageListener.consumeMessage(Collections.unmodifiableList(msgs), context),最后在finally执行processQueue.getLockConsume().unlock(),之后通过ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this)处理ConsumeOrderlyStatus

ProcessQueue

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java

public class ProcessQueue {        //......        public List<MessageExt> takeMessags(final int batchSize) {          List<MessageExt> result = new ArrayList<MessageExt>(batchSize);          final long now = System.currentTimeMillis();          try {              this.lockTreeMap.writeLock().lockInterruptibly();              this.lastConsumeTimestamp = now;              try {                  if (!this.msgTreeMap.isEmpty()) {                      for (int i = 0; i < batchSize; i++) {                          Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();                          if (entry != null) {                              result.add(entry.getValue());                              consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());                          } else {                              break;                          }                      }                  }                    if (result.isEmpty()) {                      consuming = false;                  }              } finally {                  this.lockTreeMap.writeLock().unlock();              }          } catch (InterruptedException e) {              log.error("take Messages exception", e);          }            return result;      }        //......  }
  • takeMessags方法先执行lockTreeMap.writeLock().lockInterruptibly(),然后执行msgTreeMap.pollFirstEntry()及consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());最后finally执行lockTreeMap.writeLock().unlock()

小结

DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法遍历msgs,然后挨个调用rocketMQListener.onMessage(doConvertMessage(messageExt)),若循环中出现异常被捕获住了会执行context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

doc

  • ConsumeMessageOrderlyService
Exit mobile version