Source Code Analysis of Producer Message Sending in RocketMQ

  • October 3, 2019
  • Notes

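The previous post covered how the Producer starts up, so the related details are not repeated here: [Source Code Analysis of Producer Startup in RocketMQ]

To send a message, the Producer first needs a Message instance: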

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public Message() {}

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    // Package-private: used for system properties such as TAGS and KEYS.
    void putProperty(final String name, final String value) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        this.properties.put(name, value);
    }

    // Public entry point for user properties; rejects names reserved by the system.
    public void putUserProperty(final String name, final String value) {
        if (MessageConst.STRING_HASH_SET.contains(name)) {
            throw new RuntimeException(String.format(
                "The Property<%s> is used by system, input another please", name));
        }

        if (value == null || value.trim().isEmpty()
            || name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name or value of property can not be null or blank string!"
            );
        }

        this.putProperty(name, value);
    }

}

The properties map holds the message's configurable attributes; MessageConst defines its keys:

public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
    public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
    public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
    public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
}

Once the Message has been created, it is sent through the send method of DefaultMQProducer.

The Producer supports three sending modes, defined by the CommunicationMode enum:

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

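These stand for synchronous, asynchronous, and one-way sending.
Whether a send is synchronous or asynchronous is decided by which overload of send is called.

Any send overload that takes a SendCallback parameter is asynchronous; every other send overload is synchronous. SendCallback provides the callbacks through which an asynchronous send reports its outcome: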

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}
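To make the overload rule concrete, here is a minimal stand-alone sketch. It does not use RocketMQ: DemoCallback, SendModeSketch, and deliver are hypothetical stand-ins, and the asynchronous path simply runs on the common pool through CompletableFuture, which is an assumption made for illustration only.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for SendCallback; not a RocketMQ type.
interface DemoCallback {
    void onSuccess(String result);
    void onException(Throwable e);
}

class SendModeSketch {
    // Pretend transport: rejects an empty body, otherwise reports success.
    static String deliver(String body) {
        if (body.isEmpty()) {
            throw new IllegalArgumentException("empty body");
        }
        return "SEND_OK:" + body;
    }

    // No callback parameter: synchronous, the result goes back to the caller.
    static String send(String body) {
        return deliver(body);
    }

    // Callback parameter present: asynchronous, the outcome is reported via the callback.
    static CompletableFuture<Void> send(String body, DemoCallback callback) {
        return CompletableFuture.runAsync(() -> {
            try {
                callback.onSuccess(deliver(body));
            } catch (Throwable e) {
                callback.onException(e);
            }
        });
    }

    // One-way: fire and forget, neither a result nor an error is reported.
    static void sendOneway(String body) {
        try {
            deliver(body);
        } catch (RuntimeException ignored) {
            // intentionally dropped
        }
    }
}
```

The returned CompletableFuture exists only so a caller of the sketch can wait for the callback to fire; the real asynchronous send returns nothing.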

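One-way sending, by contrast, goes through the sendOneway method.

Whichever mode is used, the call ends up in the sendDefaultImpl method of the defaultMQProducerImpl that DefaultMQProducer wraps.

The sendDefaultImpl method of DefaultMQProducerImpl: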

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Only SYNC sends are retried in this loop.
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    // No usable route: distinguish a missing name server from a missing topic route.
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

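The CommunicationMode argument is set according to which API was called, as described above.
The SendCallback argument is non-null only when one of the asynchronous APIs is used.

The method first calls makeSureStateOK to check that the Producer has been started: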

private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

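serviceState was covered in the previous post.

After the Producer state check, Validators.checkMessage verifies that the Message is valid. Part of that verification is the topic check done by the checkTopic method of Validators: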

public static void checkTopic(String topic) throws MQClientException {
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("The specified topic is blank", null);
    }

    if (!regularExpressionMatcher(topic, PATTERN)) {
        throw new MQClientException(String.format(
            "The specified topic[%s] contains illegal characters, allowing only %s", topic,
            VALID_PATTERN_STR), null);
    }

    if (topic.length() > CHARACTER_MAX_LENGTH) {
        throw new MQClientException("The specified topic is longer than topic max length 255.", null);
    }

    //whether the same with system reserved keyword
    if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
        throw new MQClientException(
            String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
    }
}
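The four checks can be tried out in isolation with a stand-alone sketch. The regular expression and the reserved name "TBW102" below are assumptions about the values of VALID_PATTERN_STR and MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, so verify them against the RocketMQ version in use. TopicCheckSketch and its check method are hypothetical names; check returns a reason string instead of throwing.

```java
import java.util.regex.Pattern;

class TopicCheckSketch {
    // Assumed to mirror VALID_PATTERN_STR; verify against your RocketMQ version.
    static final Pattern VALID = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    // 255 is the limit quoted in the error message of checkTopic.
    static final int MAX_LENGTH = 255;
    // Assumed value of MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.
    static final String RESERVED = "TBW102";

    /** Returns null when the topic passes, otherwise a short reason. */
    static String check(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            return "blank";
        }
        if (!VALID.matcher(topic).matches()) {
            return "illegal characters";
        }
        if (topic.length() > MAX_LENGTH) {
            return "too long";
        }
        if (topic.equals(RESERVED)) {
            return "reserved";
        }
        return null;
    }

    public static void main(String[] args) {
        for (String t : new String[] {"TopicTest", "bad topic!", "TBW102", ""}) {
            System.out.println("[" + t + "] -> " + check(t));
        }
    }
}
```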

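Once validation passes, the starting timestamps are recorded, which marks the real beginning of the send.

Next, tryToFindTopicPublishInfo is called to obtain the routing information for the topic.
The tryToFindTopicPublishInfo method: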

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

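As described in the post on Producer startup, topicPublishInfoTable is a map holding the routing information for each topic. The method first checks whether a TopicPublishInfo already exists for the topic.
If none exists, or its message queues are unusable (ok returns false):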

public boolean ok() {
    return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}

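ok checks whether the route has any usable message queues.

In that case a new TopicPublishInfo is put into the map, and updateTopicRouteInfoFromNameServer is called to refresh the routing information. As mentioned in the previous post, updateTopicRouteInfoFromNameServer is also run by a scheduled task; it is called here so that the route is refreshed right away.

If the entry now has routing information or usable message queues, topicPublishInfo is returned directly.
Otherwise updateTopicRouteInfoFromNameServer is called once more, this time through the three-argument overload (topic, true, this.defaultMQProducer), and whatever is in the map afterwards is returned.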
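The lookup order (cache first, then a refresh for the topic, then a fallback) can be sketched without RocketMQ as follows. Everything here is hypothetical: RouteCacheSketch, the nameServer function, and the fixed defaultRoute list stand in for the real classes, and the fallback is simplified to a constant instead of a second name server query.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class RouteCacheSketch {
    // topic -> queue names; plays the role of topicPublishInfoTable.
    final ConcurrentMap<String, List<String>> table = new ConcurrentHashMap<>();
    // Hypothetical name server lookup; returns an empty list for unknown topics.
    final Function<String, List<String>> nameServer;
    // Simplified stand-in for the route obtained through the default topic.
    final List<String> defaultRoute;
    // Counts name server round trips, to show that the cache is used.
    int lookups = 0;

    RouteCacheSketch(Function<String, List<String>> nameServer, List<String> defaultRoute) {
        this.nameServer = nameServer;
        this.defaultRoute = defaultRoute;
    }

    List<String> find(String topic) {
        List<String> queues = table.get(topic);
        if (queues == null || queues.isEmpty()) {
            // Cache miss or unusable entry: refresh from the name server.
            lookups++;
            List<String> fresh = nameServer.apply(topic);
            if (fresh != null && !fresh.isEmpty()) {
                table.put(topic, fresh);
            }
            queues = table.get(topic);
        }
        if (queues != null && !queues.isEmpty()) {
            return queues;
        }
        // Still nothing usable: fall back to the default route.
        table.put(topic, defaultRoute);
        return defaultRoute;
    }
}
```

A second call for the same topic is served from the map and does not touch the name server function again.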

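Back in sendDefaultImpl: once the routing information has been obtained, the callTimeout flag is first set to false; it is used later to report a send timeout.
Next, the number of attempts allowed is computed from the CommunicationMode. Retries in this loop apply only to synchronous sending, which by default gets 1 + 2 = 3 attempts in total; the other two modes get a single attempt.

An array sized to the number of attempts is created to record the BrokerName used on each attempt, and a for loop then runs once per attempt.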
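The attempt budget and the shrinking timeout can be sketched as a small stand-alone loop. RetryBudgetSketch and its Attempt interface are hypothetical; the sketch keeps only the counting and timeout arithmetic and drops queue selection, fault tracking, and the per-exception handling of the real method.

```java
class RetryBudgetSketch {
    /** One send attempt; returns true on success. Hypothetical interface. */
    interface Attempt {
        boolean run(int attemptIndex, long remainingMillis) throws Exception;
    }

    // Same rule as sendDefaultImpl: only synchronous sends are retried here.
    static int timesTotal(boolean sync, int retryTimesWhenSendFailed) {
        return sync ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Returns how many attempts were needed, or -1 if all attempts failed or timed out. */
    static int send(boolean sync, int retries, long timeoutMillis, Attempt attempt) {
        long begin = System.currentTimeMillis();
        int total = timesTotal(sync, retries);
        for (int times = 0; times < total; times++) {
            long cost = System.currentTimeMillis() - begin;
            if (timeoutMillis < cost) {
                break; // the overall budget is used up
            }
            try {
                // Each attempt only gets what is left of the overall timeout.
                if (attempt.run(times, timeoutMillis - cost)) {
                    return times + 1;
                }
            } catch (Exception e) {
                // Treated as retryable in this sketch; the real code inspects the exception type.
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Succeeds on the third attempt, which only a synchronous send reaches.
        System.out.println(send(true, 2, 3000, (i, remaining) -> i == 2));
        System.out.println(send(false, 2, 3000, (i, remaining) -> i == 2));
    }
}
```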

Each iteration of the loop first calls selectOneMessageQueue with topicPublishInfo and lastBrokerName to pick a message queue. The selection is implemented by the selectOneMessageQueue method of TopicPublishInfo:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

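When lastBrokerName is null, the no-argument selectOneMessageQueue is used. sendWhichQueue was introduced in the previous post: the index each thread gets from getAndIncrement starts from a random value and then advances on every call.
The index is taken modulo the size of messageQueueList to get a position in the list, and the MessageQueue at that position is returned.
Since different MessageQueues can belong to different brokers, this round-robin selection is really a form of load balancing: consecutive sends are spread across the queues and therefore across the brokers.

When lastBrokerName is not null, the logic is similar, except that a selected MessageQueue is returned only if its broker name differs from lastBrokerName. This keeps a retry away from the broker that was just tried. If every queue belongs to that broker, the method falls back to the no-argument selection.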
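The selection logic can be exercised without a broker using the sketch below. DemoQueue and QueueSelectSketch are hypothetical types, and a plain AtomicInteger with a caller-supplied starting value replaces sendWhichQueue so that results are reproducible. The sketch also shows why the `pos < 0` guard exists: Math.abs(Integer.MIN_VALUE) is still negative in Java, so the remainder can be negative.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageQueue.
final class DemoQueue {
    final String broker;
    final int id;

    DemoQueue(String broker, int id) {
        this.broker = broker;
        this.id = id;
    }
}

class QueueSelectSketch {
    final List<DemoQueue> queues;
    final AtomicInteger counter; // stands in for sendWhichQueue

    QueueSelectSketch(List<DemoQueue> queues, int start) {
        this.queues = queues;
        this.counter = new AtomicInteger(start);
    }

    // Plain round-robin over all queues.
    DemoQueue selectOne() {
        int index = counter.getAndIncrement();
        int pos = Math.abs(index) % queues.size();
        if (pos < 0) {
            pos = 0; // reached when index is Integer.MIN_VALUE
        }
        return queues.get(pos);
    }

    // Round-robin that skips queues on the broker that was just tried.
    DemoQueue selectOne(String lastBroker) {
        if (lastBroker == null) {
            return selectOne();
        }
        int index = counter.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) {
                pos = 0;
            }
            DemoQueue q = queues.get(pos);
            if (!q.broker.equals(lastBroker)) {
                return q;
            }
        }
        // Every queue is on lastBroker: fall back to plain round-robin.
        return selectOne();
    }
}
```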

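Back in sendDefaultImpl: after a MessageQueue has been selected, its BrokerName is recorded and the elapsed time is checked against the timeout. If the timeout has not been exceeded, sendKernelImpl is called to perform the actual send.
The sendKernelImpl method: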

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        msg.setBody(prevBody);
                    }
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            // Always restore the original (uncompressed) body on the caller's message.
            msg.setBody(prevBody);
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

It first records the start time beginStartTime, which is used later for the timeout checks.
It then looks up the broker address for the BrokerName.
The findBrokerAddressInPublish method:

public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

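The lookup is done in brokerAddrTable by brokerName and returns the address registered under MixAll.MASTER_ID, that is, the master's address.

If no address is found, tryToFindTopicPublishInfo is called to refresh the route, and findBrokerAddressInPublish is tried again.

After that, if the VIP (high-priority) channel is enabled, the VIP channel address is derived from brokerAddr.
The brokerVIPChannel method of MixAll: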

public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
    if (isChange) {
        String[] ipAndPort = brokerAddr.split(":");
        String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
        return brokerAddrNew;
    } else {
        return brokerAddr;
    }
}

The VIP channel address is simple to compute: it is the same host with the port number reduced by 2.
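As a quick worked example, the same arithmetic can be run stand-alone. VipChannelSketch is a hypothetical class that copies the logic of brokerVIPChannel, and the address 192.168.0.1:10911 is only sample input.

```java
class VipChannelSketch {
    // Same arithmetic as MixAll.brokerVIPChannel: keep the host, subtract 2 from the port.
    static String vipAddress(boolean enabled, String brokerAddr) {
        if (!enabled) {
            return brokerAddr;
        }
        String[] ipAndPort = brokerAddr.split(":");
        return ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
    }

    public static void main(String[] args) {
        System.out.println(vipAddress(true, "192.168.0.1:10911"));  // prints 192.168.0.1:10909
        System.out.println(vipAddress(false, "192.168.0.1:10911")); // prints 192.168.0.1:10911
    }
}
```

Because the address is split on ":", this approach assumes a host:port string with exactly one colon before the port, which is worth keeping in mind for IPv6 literals.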

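With the address settled, a long stretch of setup follows.

An integer sysFlag is defined here. It is a set of bit flags describing the message (whether it is compressed, its transaction state), built from the following values: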

public class MessageSysFlag {
    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
}
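The constants occupy separate bit positions: bit 0 for compression, bit 1 for multi-tags, and bits 2 and 3 for the transaction state. A small sketch shows how sendKernelImpl's `|=` operations combine them and how they can be read back. SysFlagSketch and its helper methods are hypothetical; only the constant values are copied from the listing above.

```java
class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;                // binary 0001
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // binary 0100
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // binary 1000
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // binary 1100

    // Builds the flag the way sendKernelImpl does, one OR per property.
    static int build(boolean compressed, boolean prepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (prepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) == COMPRESSED_FLAG;
    }

    // Masks out bits 2 and 3, which together encode the transaction state.
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    public static void main(String[] args) {
        int flag = build(true, true);
        System.out.println(Integer.toBinaryString(flag)); // prints 101
        System.out.println(isCompressed(flag));           // prints true
        System.out.println(transactionType(flag) == TRANSACTION_PREPARED_TYPE); // prints true
    }
}
```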

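A msgBodyCompressed flag is also defined to record whether the message body was compressed. tryToCompressMessage decides whether compression applies and, if so, compresses the body.
The tryToCompressMessage method: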

private boolean tryToCompressMessage(final Message msg) {
    if (msg instanceof MessageBatch) {
        //batch dose not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}

When the body size is greater than or equal to compressMsgBodyOverHowmuch (4 KB by default, i.e. 1024 * 4 bytes), the compress method of UtilAll compresses it:

public static byte[] compress(final byte[] src, final int level) throws IOException {
    byte[] result = src;
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
    java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
    DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
    try {
        deflaterOutputStream.write(src);
        deflaterOutputStream.finish();
        deflaterOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        defeater.end();
        throw e;
    } finally {
        try {
            byteArrayOutputStream.close();
        } catch (IOException ignored) {
        }

        defeater.end();
    }

    return result;
}

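Compression is done with the java.util.zip classes: a Deflater at the given level, wrapped in a DeflaterOutputStream.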
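A round trip makes the behavior easier to see. The sketch below uses only the JDK: CompressSketch is a hypothetical class, its compress method follows the same Deflater approach, and uncompress uses InflaterInputStream to reverse it. Level 5 and the 5000-byte body in main are arbitrary sample values, and checked IOExceptions are wrapped in UncheckedIOException to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class CompressSketch {
    static byte[] compress(byte[] src, int level) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(src.length);
        Deflater deflater = new Deflater(level);
        // Closing the stream finishes the deflate output before the bytes are read.
        try (DeflaterOutputStream out = new DeflaterOutputStream(bos, deflater)) {
            out.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // release the native resources held by the Deflater
        }
        return bos.toByteArray();
    }

    static byte[] uncompress(byte[] src) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(src.length);
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = new byte[5000]; // larger than the 4 KB threshold
        Arrays.fill(body, (byte) 'a');
        byte[] packed = compress(body, 5);
        System.out.println(body.length + " -> " + packed.length + " bytes");
        System.out.println(Arrays.equals(body, uncompress(packed))); // prints true
    }
}
```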

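Next, sysFlag is updated depending on whether the message is a transactional message; transactional messages will be covered in a later post.

Then the method checks whether any CheckForbiddenHook is registered. If so, it iterates over all of them and calls checkForbidden on each, which is how sending can be forbidden.

In the same way, it checks whether any SendMessageHook is registered and calls sendMessageBefore on each one; once the message has been sent, sendMessageAfter is called on each.

The request header requestHeader is then populated with a long list of settings. After that comes the switch block, which performs the checks appropriate to each sending mode.
In the end, every sending mode calls the sendMessage method of MQClientAPIImpl: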

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }

    request.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

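First, a RemotingCommand request of the appropriate type is built according to the message: SEND_BATCH_MESSAGE for a MessageBatch, SEND_MESSAGE_V2 when sendSmartMsg is enabled, and SEND_MESSAGE otherwise.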
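The branch that picks the request code can be isolated into a small sketch. The enum and the `choose` helper below are hypothetical stand-ins written for illustration; only the three constant names come from the listing above.

```java
// Hypothetical stand-in for the request-code selection in sendMessage.
// Only the constant names mirror the RocketMQ source; everything else is illustrative.
class RequestCodeSketch {
    enum Code { SEND_MESSAGE, SEND_MESSAGE_V2, SEND_BATCH_MESSAGE }

    // Mirrors: if (sendSmartMsg || msg instanceof MessageBatch) { ... } else { ... }
    static Code choose(boolean sendSmartMsg, boolean isBatch) {
        if (sendSmartMsg || isBatch) {
            // A batch always wins over the plain V2 header format.
            return isBatch ? Code.SEND_BATCH_MESSAGE : Code.SEND_MESSAGE_V2;
        }
        return Code.SEND_MESSAGE;
    }

    public static void main(String[] args) {
        System.out.println(choose(false, false));
        System.out.println(choose(true, false));
        System.out.println(choose(false, true));
    }
}
```

Note that a batch message gets SEND_BATCH_MESSAGE even when sendSmartMsg is off, because the outer condition is an OR.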

Once the request has been assembled, it is dispatched according to the send mode.

ONEWAY mode:
This directly calls invokeOneway on remotingClient, i.e. the Netty client:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

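First, a Channel for the broker address is obtained from channelTables (as covered in the previous post, the Netty client caches its established connections in a map called channelTables).

Then, as before, the doBeforeRequest method of every configured RPCHook is executed.
After that, invokeOnewayImpl is called: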

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

First, the one-way bit is set in the request's flag:

public void markOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    this.flag |= bits;
}
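To make the bit manipulation concrete, here is a minimal sketch of a flag field with two independent bits. The bit positions (0 for request/response type, 1 for one-way) are an assumption about RemotingCommand, and the class itself is a hypothetical stand-in rather than the real one.

```java
// Hypothetical stand-in for the flag handling in RemotingCommand.
// Bit positions are assumed: bit 0 = response type, bit 1 = one-way RPC.
class OnewayFlagSketch {
    private static final int RPC_TYPE = 0;
    private static final int RPC_ONEWAY = 1;

    private int flag = 0;

    void markOnewayRPC() {
        flag |= 1 << RPC_ONEWAY;   // set bit 1, leave other bits untouched
    }

    void markResponseType() {
        flag |= 1 << RPC_TYPE;     // set bit 0, leave other bits untouched
    }

    boolean isOnewayRPC() {
        int bits = 1 << RPC_ONEWAY;
        return (flag & bits) == bits;
    }

    boolean isResponseType() {
        int bits = 1 << RPC_TYPE;
        return (flag & bits) == bits;
    }

    int getFlag() {
        return flag;
    }

    public static void main(String[] args) {
        OnewayFlagSketch cmd = new OnewayFlagSketch();
        cmd.markOnewayRPC();
        System.out.println(cmd.getFlag() + " oneway=" + cmd.isOnewayRPC());
    }
}
```

Because each property occupies its own bit, marking a command as one-way does not disturb whether it is a request or a response.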

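Next, a permit is acquired from the semaphoreOneway semaphore, which is then wrapped in a SemaphoreReleaseOnlyOnce so that the permit is released at most once.
Finally, Netty's writeAndFlush sends the request, with a listener attached that releases the permit once the write completes, whether or not it succeeded.

Because this is a one-way send, nothing more is done after the write completes, and no response is awaited.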
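The release-at-most-once guarantee can be sketched with an AtomicBoolean guarding the semaphore. This is a simplified illustration of the idea; the class name ReleaseOnceSketch is hypothetical and the code is not copied from SemaphoreReleaseOnlyOnce.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a "release only once" wrapper around a semaphore.
class ReleaseOnceSketch {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnceSketch(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        // Only the first caller flips false -> true and returns the permit.
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }

    // Acquires one permit, releases it twice, and reports the permits left.
    static int demo() {
        Semaphore permits = new Semaphore(2);
        if (!permits.tryAcquire()) {
            throw new IllegalStateException("no permit available");
        }
        ReleaseOnceSketch once = new ReleaseOnceSketch(permits);
        once.release();
        once.release(); // second call is a no-op
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the guard, the listener path and the catch path could both return a permit for the same request, and the semaphore would slowly allow more concurrent requests than intended.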

 

ASYNC mode:
The sendMessageAsync method is called:

private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}

An InvokeCallback is registered here to handle the callback once the send has completed.

First, look at the invokeAsync method:

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsync call timeout");
            }
            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

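This is similar to ONEWAY, except that the time already spent is deducted from the timeout before delegating; the actual work is done by invokeAsyncImpl.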
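The listings in this section repeatedly subtract the elapsed time from timeoutMillis before calling the next layer. A minimal sketch of that bookkeeping follows; the helper name `remaining` and the use of IllegalStateException are illustrative choices, not part of RocketMQ.

```java
// Hypothetical helper showing the "remaining timeout budget" pattern.
class TimeoutBudgetSketch {
    // Returns how much of the timeout is left, or fails if it is already spent.
    static long remaining(long timeoutMillis, long beginMillis, long nowMillis) {
        long costTime = nowMillis - beginMillis;
        if (timeoutMillis < costTime) {
            // The real code throws RemotingTooMuchRequestException or
            // RemotingTimeoutException at this point, depending on the layer.
            throw new IllegalStateException("call timeout");
        }
        return timeoutMillis - costTime;
    }

    public static void main(String[] args) {
        long begin = System.currentTimeMillis();
        // ... work such as running hooks or acquiring a permit would happen here ...
        System.out.println(remaining(3000, begin, System.currentTimeMillis()));
    }
}
```

Passing the reduced value down means the timeout the caller asked for bounds the whole call chain rather than each layer separately.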

The invokeAsyncImpl method:

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

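Here request.getOpaque() returns the opaque value. It is assigned when the request is created, from an auto-incrementing AtomicInteger, and serves as the unique ID of each request.

A ResponseFuture is then created to wrap the invokeCallback and the channel, and is put into responseTable.
responseTable is a map: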

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
    new ConcurrentHashMap<Integer, ResponseFuture>(256);

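It maps each request ID to its ResponseFuture, and is used to handle the response event asynchronously after an asynchronous send.
In other words, once the request has been sent and a response arrives, the matching ResponseFuture is looked up by request ID and the InvokeCallback registered earlier is executed. Inside the InvokeCallback, processSendResponse handles the response returned by the Broker, and finally the user-supplied SendCallback's onSuccess or onException method is invoked as appropriate, which completes the asynchronous send.

The remaining steps are the same as for ONEWAY: Netty's writeAndFlush performs the send.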
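The request/response correlation described above can be sketched without Netty. In the sketch below the ResponseFuture is reduced to a plain callback, responses are plain strings, and the class and method names are hypothetical; it only illustrates how an opaque ID ties a later response back to the callback registered at send time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of opaque-based correlation; not the RocketMQ classes.
class ResponseTableSketch {
    // Shared counter: every request gets a distinct opaque value.
    private static final AtomicInteger REQUEST_ID = new AtomicInteger(0);

    private final ConcurrentMap<Integer, Consumer<String>> responseTable =
        new ConcurrentHashMap<>(256);

    // Registers the callback before "sending" and returns the request's opaque ID.
    int send(Consumer<String> callback) {
        int opaque = REQUEST_ID.getAndIncrement();
        responseTable.put(opaque, callback);
        // A real client would now write the request to the channel.
        return opaque;
    }

    // Called when a response carrying the given opaque ID arrives.
    boolean onResponse(int opaque, String body) {
        Consumer<String> callback = responseTable.remove(opaque);
        if (callback == null) {
            return false; // unknown or already handled request
        }
        callback.accept(body);
        return true;
    }

    int pending() {
        return responseTable.size();
    }

    public static void main(String[] args) {
        ResponseTableSketch client = new ResponseTableSketch();
        int opaque = client.send(body -> System.out.println("got " + body));
        client.onResponse(opaque, "SEND_OK");
    }
}
```

Removing the entry when the response is handled keeps the table from growing and makes a duplicate response for the same ID harmless.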

 

SYNC mode:
The sendMessageSync method is called:

private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

It first calls the Netty client's invokeSync method:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

The steps are again similar to the earlier ones.

Go straight to the invokeSyncImpl method:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

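This is essentially the same as ASYNC, except that after writeAndFlush the calling thread blocks in responseFuture.waitResponse, waiting up to the timeout for the response to come back.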
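A blocking wait of this kind is commonly built on a CountDownLatch, and the sketch below assumes that approach; it is a hypothetical simplification, not the ResponseFuture source. The response is a plain string, and the interrupt is handled inside waitResponse to keep the example short, whereas the method in the listing above propagates InterruptedException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a future that a caller can block on with a timeout.
class WaitResponseSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    // Blocks until putResponse is called or the timeout elapses.
    // Returns null when no response arrived in time.
    String waitResponse(long timeoutMillis) {
        try {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
        return response;
    }

    // Called from the I/O thread when the response (or a failure) is known.
    void putResponse(String value) {
        this.response = value;
        latch.countDown(); // wakes up the waiting caller
    }

    public static void main(String[] args) {
        WaitResponseSketch future = new WaitResponseSketch();
        new Thread(() -> future.putResponse("SEND_OK")).start();
        System.out.println(future.waitResponse(1000));
    }
}
```

A null result alone does not say why the wait ended, which is why invokeSyncImpl then checks isSendRequestOK to distinguish a timeout from a failed write.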
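If the send fails, the for loop in sendDefaultImpl of DefaultMQProducerImpl continues, until the send succeeds or the retry count is used up.

If the Broker's response is received within the timeout, invokeSync executes the doAfterResponse method of every configured RPCHook, and then sendMessageSync hands the response to processSendResponse.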

 

That completes the Producer's message sending.