聊聊rocketmq的DefaultRocketMQListenerContainer

  • 2019 年 11 月 1 日
  • 笔记

本文主要研究一下rocketmq的DefaultRocketMQListenerContainer

DefaultRocketMQListenerContainer

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

public class DefaultRocketMQListenerContainer implements InitializingBean,      RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {      private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);        private ApplicationContext applicationContext;        /**       * The name of the DefaultRocketMQListenerContainer instance       */      private String name;        private long suspendCurrentQueueTimeMillis = 1000;        /**       * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>       * >0,client control retry frequency.       */      private int delayLevelWhenNextConsume = 0;        private String nameServer;        private AccessChannel accessChannel = AccessChannel.LOCAL;        private String consumerGroup;        private String topic;        private int consumeThreadMax = 64;        private String charset = "UTF-8";        private ObjectMapper objectMapper;        private RocketMQListener rocketMQListener;        private RocketMQMessageListener rocketMQMessageListener;        private DefaultMQPushConsumer consumer;        private Class messageType;        private boolean running;        // The following properties came from @RocketMQMessageListener.      private ConsumeMode consumeMode;      private SelectorType selectorType;      private String selectorExpression;      private MessageModel messageModel;      private long consumeTimeout;        //......        public void setRocketMQMessageListener(RocketMQMessageListener anno) {          this.rocketMQMessageListener = anno;            this.consumeMode = anno.consumeMode();          this.consumeThreadMax = anno.consumeThreadMax();          this.messageModel = anno.messageModel();          this.selectorExpression = anno.selectorExpression();          this.selectorType = anno.selectorType();          this.consumeTimeout = anno.consumeTimeout();      }        @Override      public void setupMessageListener(RocketMQListener rocketMQListener) {          this.rocketMQListener = rocketMQListener;      }        @Override      public void destroy() {          this.setRunning(false);          if (Objects.nonNull(consumer)) {              consumer.shutdown();          }          log.info("container destroyed, {}", this.toString());      }        @Override      public boolean isAutoStartup() {          return true;      }        @Override      public void stop(Runnable callback) {          stop();          callback.run();      }        @Override      public void start() {          if (this.isRunning()) {              throw new IllegalStateException("container already running. " + this.toString());          }            try {              consumer.start();          } catch (MQClientException e) {              throw new IllegalStateException("Failed to start RocketMQ push consumer", e);          }          this.setRunning(true);            log.info("running container: {}", this.toString());      }        @Override      public void stop() {          if (this.isRunning()) {              if (Objects.nonNull(consumer)) {                  consumer.shutdown();              }              setRunning(false);          }      }        @Override      public boolean isRunning() {          return running;      }        private void setRunning(boolean running) {          this.running = running;      }        @Override      public int getPhase() {          // Returning Integer.MAX_VALUE only suggests that          // we will be the first bean to shutdown and last bean to start          return Integer.MAX_VALUE;      }          @Override      public void afterPropertiesSet() throws Exception {          initRocketMQPushConsumer();            this.messageType = getMessageType();          log.debug("RocketMQ messageType: {}", messageType.getName());      }        @Override      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {          this.applicationContext = applicationContext;      }        @Override      public String toString() {          return "DefaultRocketMQListenerContainer{" +              "consumerGroup='" + consumerGroup + ''' +              ", nameServer='" + nameServer + ''' +              ", topic='" + topic + ''' +              ", consumeMode=" + consumeMode +              ", selectorType=" + selectorType +              ", selectorExpression='" + selectorExpression + ''' +              ", messageModel=" + messageModel +              '}';      }        private void initRocketMQPushConsumer() throws MQClientException {          Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required");          Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");          Assert.notNull(nameServer, "Property 'nameServer' is required");          Assert.notNull(topic, "Property 'topic' is required");            RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),              this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());          boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();          if (Objects.nonNull(rpcHook)) {              consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),                  enableMsgTrace, this.applicationContext.getEnvironment().                  resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));              consumer.setVipChannelEnabled(false);              consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup));          } else {              log.debug("Access-key or secret-key not configure in " + this + ".");              consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,                      this.applicationContext.getEnvironment().                      resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));          }            String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());          if (customizedNameServer != null) {              consumer.setNamesrvAddr(customizedNameServer);          } else {              consumer.setNamesrvAddr(nameServer);          }          if (accessChannel != null) {              consumer.setAccessChannel(accessChannel);          }          consumer.setConsumeThreadMax(consumeThreadMax);          if (consumeThreadMax < consumer.getConsumeThreadMin()) {              consumer.setConsumeThreadMin(consumeThreadMax);          }          consumer.setConsumeTimeout(consumeTimeout);          consumer.setInstanceName(this.name);            switch (messageModel) {              case BROADCASTING:                  consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);                  break;              case CLUSTERING:                  consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);                  break;              default:                  throw new IllegalArgumentException("Property 'messageModel' was wrong.");          }            switch (selectorType) {              case TAG:                  consumer.subscribe(topic, selectorExpression);                  break;              case SQL92:                  consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));                  break;              default:                  throw new IllegalArgumentException("Property 'selectorType' was wrong.");          }            switch (consumeMode) {              case ORDERLY:                  consumer.setMessageListener(new DefaultMessageListenerOrderly());                  break;              case CONCURRENTLY:                  consumer.setMessageListener(new DefaultMessageListenerConcurrently());                  break;              default:                  throw new IllegalArgumentException("Property 'consumeMode' was wrong.");          }            if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {              ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);          }        }        private Class getMessageType() {          Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);          Type[] interfaces = targetClass.getGenericInterfaces();          Class<?> superclass = targetClass.getSuperclass();          while ((Objects.isNull(interfaces) || 0 == interfaces.length) && Objects.nonNull(superclass)) {              interfaces = superclass.getGenericInterfaces();              superclass = targetClass.getSuperclass();          }          if (Objects.nonNull(interfaces)) {              for (Type type : interfaces) {                  if (type instanceof ParameterizedType) {                      ParameterizedType parameterizedType = (ParameterizedType) type;                      if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) {                          Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();                          if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {                              return (Class) actualTypeArguments[0];                          } else {                              return Object.class;                          }                      }                  }              }                return Object.class;          } else {              return Object.class;          }      }        //......  }
  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法
  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

DefaultMessageListenerConcurrently

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {            @SuppressWarnings("unchecked")          @Override          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {              for (MessageExt messageExt : msgs) {                  log.debug("received msg: {}", messageExt);                  try {                      long now = System.currentTimeMillis();                      rocketMQListener.onMessage(doConvertMessage(messageExt));                      long costTime = System.currentTimeMillis() - now;                      log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                  } catch (Exception e) {                      log.warn("consume message failed. messageExt:{}", messageExt, e);                      context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);                      return ConsumeConcurrentlyStatus.RECONSUME_LATER;                  }              }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;          }      }
  • DefaultMessageListenerConcurrently方法实现了MessageListenerConcurrently接口;它的consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦异常则返回ConsumeConcurrentlyStatus.RECONSUME_LATER

DefaultMessageListenerOrderly

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {            @SuppressWarnings("unchecked")          @Override          public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {              for (MessageExt messageExt : msgs) {                  log.debug("received msg: {}", messageExt);                  try {                      long now = System.currentTimeMillis();                      rocketMQListener.onMessage(doConvertMessage(messageExt));                      long costTime = System.currentTimeMillis() - now;                      log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);                  } catch (Exception e) {                      log.warn("consume message failed. messageExt:{}", messageExt, e);                      context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);                      return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;                  }              }                return ConsumeOrderlyStatus.SUCCESS;          }      }
  • DefaultMessageListenerOrderly实现了MessageListenerOrderly接口,其consumeMessage方法使用for循环try catch执行rocketMQListener.onMessage(doConvertMessage(messageExt))回调,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦异常则返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

小结

  • DefaultRocketMQListenerContainer实现了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法会根据RocketMQMessageListener注解的信息来设置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
  • afterPropertiesSet方法执行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法会根据rpcHook是否为null来创建不同的DefaultMQPushConsumer,之后根据messageModel、selectorType、consumeMode等来配置consumer;如果rocketMQListener类型是RocketMQPushConsumerLifecycleListener的,则执行RocketMQPushConsumerLifecycleListener的prepareStart方法
  • setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是执行consumer.start()方法;stop及destroy方法主要是执行consumer.shutdown()

doc

  • DefaultRocketMQListenerContainer