­

聊聊rocketmq的sendOrderly

  • 2019 年 12 月 12 日
  • 筆記

本文主要研究一下rocketmq的sendOrderly

sendOrderly

rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java

public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {  ​      //......  ​      public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {          if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {              log.error("syncSendOrderly failed. destination:{}, message is null ", destination);              throw new IllegalArgumentException("`message` and `message.payload` cannot be null");          }          try {              long now = System.currentTimeMillis();              org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);              SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);              long costTime = System.currentTimeMillis() - now;              if (log.isDebugEnabled()) {                  log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());              }              return sendResult;          } catch (Exception e) {              log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);              throw new MessagingException(e.getMessage(), e);          }      }  ​      public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,          long timeout) {          if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {              log.error("asyncSendOrderly failed. 
destination:{}, message is null ", destination);              throw new IllegalArgumentException("`message` and `message.payload` cannot be null");          }          try {              org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message);              producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);          } catch (Exception e) {              log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);              throw new MessagingException(e.getMessage(), e);          }      }  ​      //......  } 
  • syncSendOrderly方法最后调用的是producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);asyncSendOrderly方法最后调用的是producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout),比syncSendOrderly多了一个sendCallback

DefaultMQProducer

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {  ​      //......  ​      public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)          throws MQClientException, RemotingException, MQBrokerException, InterruptedException {          msg.setTopic(withNamespace(msg.getTopic()));          return this.defaultMQProducerImpl.send(msg, selector, arg, timeout);      }  ​      public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)          throws MQClientException, RemotingException, InterruptedException {          msg.setTopic(withNamespace(msg.getTopic()));          this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout);      }  ​      //......  } 
  • DefaultMQProducer的send方法最后调用的是defaultMQProducerImpl.send(msg, selector, arg, timeout)或者defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout)方法

DefaultMQProducerImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {  ​      //......  ​      public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)          throws MQClientException, RemotingException, MQBrokerException, InterruptedException {          return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);      }  ​      private SendResult sendSelectImpl(          Message msg,          MessageQueueSelector selector,          Object arg,          final CommunicationMode communicationMode,          final SendCallback sendCallback, final long timeout      ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {          long beginStartTime = System.currentTimeMillis();          this.makeSureStateOK();          Validators.checkMessage(msg, this.defaultMQProducer);  ​          TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());          if (topicPublishInfo != null && topicPublishInfo.ok()) {              MessageQueue mq = null;              try {                  List<MessageQueue> messageQueueList =                      mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());                  Message userMessage = MessageAccessor.cloneMessage(msg);                  String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());                  userMessage.setTopic(userTopic);  ​                  mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));              } catch (Throwable e) {                  throw new MQClientException("select message queue throwed exception.", e);              }  ​              long costTime = System.currentTimeMillis() - beginStartTime;              if (timeout < costTime) {                  throw new RemotingTooMuchRequestException("sendSelectImpl 
call timeout");              }              if (mq != null) {                  return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);              } else {                  throw new MQClientException("select message queue return null.", null);              }          }  ​          validateNameServerSetting();          throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);      }  ​      //......  }
  • DefaultMQProducerImpl的send方法调用的是sendSelectImpl方法,该方法在Validators.checkMessage之后会通过tryToFindTopicPublishInfo(msg.getTopic())查找topicPublishInfo,找不到则抛出MQClientException
  • 找到topicPublishInfo的话则通过mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList())获取messageQueueList
  • 之后通过mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg))方法获取mq,最后在mq不为null的情况下通过sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime)发送

小结

  • DefaultMQProducerImpl的send方法调用的是sendSelectImpl方法,该方法在Validators.checkMessage之后会通过tryToFindTopicPublishInfo(msg.getTopic())查找topicPublishInfo,找不到则抛出MQClientException
  • 找到topicPublishInfo的话则通过mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList())获取messageQueueList
  • 之后通过mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg))方法获取mq,最后在mq不为null的情况下通过sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime)发送

这里没有使用sendDefaultImpl方法:sendDefaultImpl是通过selectOneMessageQueue(topicPublishInfo, lastBrokerName)来选择MessageQueue进行发送的;而sendOrderly方法则是通过MessageQueueSelector来选择MessageQueue进行发送

doc