聊聊rocketmq的sendOrderly
- 2019 年 12 月 12 日
- 筆記
序
本文主要研究一下rocketmq的sendOrderly

sendOrderly
rocketmq-spring-boot/2.0.4/rocketmq-spring-boot-2.0.4-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean { //...... public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("syncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { long now = System.currentTimeMillis(); org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout); long costTime = System.currentTimeMillis() - now; if (log.isDebugEnabled()) { log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId()); } return sendResult; } catch (Exception e) { log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback, long timeout) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("asyncSendOrderly failed. destination:{}, message is null ", destination); throw new IllegalArgumentException("`message` and `message.payload` cannot be null"); } try { org.apache.rocketmq.common.message.Message rocketMsg = this.createRocketMqMessage(destination, message); producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout); } catch (Exception e) { log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message); throw new MessagingException(e.getMessage(), e); } } //...... }
- syncSendOrderly方法最后调用的是producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);asyncSendOrderly方法最后调用的是producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout),比syncSendOrderly多了一个sendCallback
DefaultMQProducer
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer { //...... public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); return this.defaultMQProducerImpl.send(msg, selector, arg, timeout); } public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout); } //...... }
- DefaultMQProducer的send方法最后调用的是defaultMQProducerImpl.send(msg, selector, arg, timeout)或者defaultMQProducerImpl.send(msg, selector, arg, sendCallback, timeout)方法
DefaultMQProducerImpl
rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner { //...... public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout); } private SendResult sendSelectImpl( Message msg, MessageQueueSelector selector, Object arg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue throwed exception.", e); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeout < costTime) { throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); } if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime); } else { throw new MQClientException("select message queue return null.", null); } } validateNameServerSetting(); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); } //...... }
- DefaultMQProducerImpl的send方法调用的是sendSelectImpl方法,该方法在Validators.checkMessage之后会通过tryToFindTopicPublishInfo(msg.getTopic())查找topicPublishInfo,找不到则抛出MQClientException
- 找到topicPublishInfo的话则通过mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList())获取messageQueueList
- 之后通过mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg))方法获取mq,最后在mq不为null的情况下通过sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout – costTime)发送
小结
- DefaultMQProducerImpl的send方法调用的是sendSelectImpl方法,该方法在Validators.checkMessage之后会通过tryToFindTopicPublishInfo(msg.getTopic())查找topicPublishInfo,找不到则抛出MQClientException
- 找到topicPublishInfo的话则通过mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList())获取messageQueueList
- 之后通过mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg))方法获取mq,最后在mq不为null的情况下通过sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout – costTime)发送
这里没有使用sendDefaultImpl方法,该方法会通过selectOneMessageQueue(topicPublishInfo, lastBrokerName)来选择MessageQueue来发送;而sendOrderly方法是通过MessageQueueSelector选择MessageQueue来发送