A look at rocketmq's ListenerContainerConfiguration

  • October 28, 2019
  • Notes

This article takes a look at rocketmq's ListenerContainerConfiguration.

ListenerContainerConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }

        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?

        return container;
    }

    private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
            annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}

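  • ListenerContainerConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces
  • Its setApplicationContext method stores the applicationContext; its afterSingletonsInstantiated method fetches every bean annotated with RocketMQMessageListener and calls registerContainer on each one
  • registerContainer first checks that the bean implements RocketMQListener and throws IllegalStateException if it does not; it then reads the RocketMQMessageListener annotation and validates it (ConsumeMode.ORDERLY combined with MessageModel.BROADCASTING is rejected with a BeanDefinitionValidationException); after that it creates a DefaultRocketMQListenerContainer via createRocketMQListenerContainer, registers it in the applicationContext, and calls start on the container if it is not already running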
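
The registration steps above can be sketched in plain Java without Spring or RocketMQ on the classpath. This is only an illustration under stated assumptions: RegistrationSketch, ListenerSettings and Container are hypothetical stand-ins for the configuration class, the annotation and DefaultRocketMQListenerContainer, and IllegalStateException stands in for BeanDefinitionValidationException.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in types; none of them come from rocketmq-spring.
class RegistrationSketch {

    enum ConsumeMode { CONCURRENTLY, ORDERLY }

    enum MessageModel { CLUSTERING, BROADCASTING }

    /** Stand-in for the values carried by @RocketMQMessageListener. */
    static final class ListenerSettings {
        final String topic;
        final ConsumeMode consumeMode;
        final MessageModel messageModel;

        ListenerSettings(String topic, ConsumeMode consumeMode, MessageModel messageModel) {
            this.topic = topic;
            this.consumeMode = consumeMode;
            this.messageModel = messageModel;
        }
    }

    /** Stand-in for DefaultRocketMQListenerContainer. */
    static final class Container {
        final String name;
        final ListenerSettings settings;
        boolean running;

        Container(String name, ListenerSettings settings) {
            this.name = name;
            this.settings = settings;
        }

        void start() {
            running = true;
        }
    }

    // Same idea as the AtomicLong counter in the original class:
    // each container gets a unique, numbered bean name.
    private final AtomicLong counter = new AtomicLong(0);

    // Stand-in for the application context's bean registry.
    final Map<String, Container> registry = new LinkedHashMap<>();

    /** Mirrors the validate rule: ORDERLY cannot be combined with BROADCASTING. */
    static void validate(ListenerSettings settings) {
        if (settings.consumeMode == ConsumeMode.ORDERLY
                && settings.messageModel == MessageModel.BROADCASTING) {
            throw new IllegalStateException("BROADCASTING does not support ORDERLY");
        }
    }

    /** Validate, build a uniquely named container, register it, start it if needed. */
    Container register(String listenerBeanName, ListenerSettings settings) {
        validate(settings);
        String containerName = String.format("%s_%s",
                Container.class.getName(), counter.incrementAndGet());
        Container container = new Container(containerName, settings);
        registry.put(containerName, container);
        if (!container.running) {
            container.start();
        }
        return container;
    }

    public static void main(String[] args) {
        RegistrationSketch sketch = new RegistrationSketch();
        Container ok = sketch.register("orderListener",
                new ListenerSettings("orders", ConsumeMode.ORDERLY, MessageModel.CLUSTERING));
        System.out.println(ok.name + " running=" + ok.running);
        try {
            sketch.register("badListener",
                    new ListenerSettings("orders", ConsumeMode.ORDERLY, MessageModel.BROADCASTING));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because validation runs before the container is created, a listener with an invalid combination never reaches the registry, which matches the order of calls in registerContainer.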

RocketMQMessageListener

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
     */
    String consumerGroup();

    /**
     * Topic name.
     */
    String topic();

    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Max consumer thread number.
     */
    int consumeThreadMax() default 64;

    /**
     * Max consumer timeout, default 30s.
     */
    long consumeTimeout() default 30000L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}

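  • The RocketMQMessageListener annotation defines the attributes consumerGroup, topic, selectorType, selectorExpression, consumeMode, messageModel, consumeThreadMax, consumeTimeout, accessKey, secretKey, enableMsgTrace, customizedTraceTopic, nameServer and accessChannel; only consumerGroup and topic have no default value and must be set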
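
Several of these attributes default to placeholders such as "${rocketmq.name-server:}", which createRocketMQListenerContainer resolves against the environment before falling back to rocketMQProperties.getNameServer() when the result is empty. A minimal sketch of that resolve-then-fall-back logic follows. It is an assumption-laden illustration: NameServerFallbackSketch, resolve and pickNameServer are hypothetical names, and the tiny regex resolver is a simplified stand-in for Spring's resolvePlaceholders, not its actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; a simplified stand-in for Spring's placeholder resolution.
class NameServerFallbackSketch {

    // Matches ${key} and ${key:default}; the default may be empty.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    /** Replaces each placeholder with its property value, else with its default. */
    static String resolve(String text, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.get(m.group(1));
            if (value == null) {
                // No property: use the default if one was given,
                // otherwise leave the placeholder text untouched.
                value = m.group(2) != null ? m.group(2) : m.group(0);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    /** Annotation value wins when it resolves to something non-empty. */
    static String pickNameServer(String annotationValue, String globalNameServer,
                                 Map<String, String> props) {
        String resolved = resolve(annotationValue, props);
        return resolved.isEmpty() ? globalNameServer : resolved;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Property not set: the placeholder resolves to "" and the global value is used.
        System.out.println(pickNameServer("${rocketmq.name-server:}", "global-host:9876", props));
        // An explicit value on the annotation is used as-is.
        System.out.println(pickNameServer("10.0.0.1:9876", "global-host:9876", props));
    }
}
```

The host names and port in main are made-up sample values. The point of the sketch is the order of precedence: a non-empty annotation value (literal or resolved) is preferred, and the globally configured name server is used only when the annotation resolves to an empty string.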

Summary

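  • ListenerContainerConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces
  • Its setApplicationContext method stores the applicationContext; its afterSingletonsInstantiated method fetches every bean annotated with RocketMQMessageListener and calls registerContainer on each one
  • registerContainer first checks that the bean implements RocketMQListener and throws IllegalStateException if it does not; it then reads the RocketMQMessageListener annotation and validates it (ConsumeMode.ORDERLY combined with MessageModel.BROADCASTING is rejected); after that it creates a DefaultRocketMQListenerContainer via createRocketMQListenerContainer, registers it in the applicationContext, and calls start on the container if it is not already running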

doc

  • ListenerContainerConfiguration