Source Code Analysis of Message Sending in the RocketMQ Producer

  • October 3, 2019
  • Notes

The previous post covered how the Producer starts up, so the related details are not repeated here. See "Source Code Analysis of Producer Startup in RocketMQ".

 

To send a message, the Producer first needs a Message instance:

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public Message() {}

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    void putProperty(final String name, final String value) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        this.properties.put(name, value);
    }

    public void putUserProperty(final String name, final String value) {
        if (MessageConst.STRING_HASH_SET.contains(name)) {
            throw new RuntimeException(String.format(
                "The Property<%s> is used by system, input another please", name));
        }

        if (value == null || value.trim().isEmpty()
            || name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name or value of property can not be null or blank string!"
            );
        }

        this.putProperty(name, value);
    }

}

The properties map stores the message's configurable attributes; its keys are defined by MessageConst:

public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
    public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
    public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
    public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
}
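To make the link between the constructor arguments and the properties map concrete, here is a minimal sketch. SketchMessage is a simplified stand-in written for this illustration, not the real Message class; only the three key strings are taken from the MessageConst listing.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for RocketMQ's Message; a hypothetical class for illustration only.
class SketchMessage {
    // Key strings copied from the MessageConst listing.
    static final String PROPERTY_KEYS = "KEYS";
    static final String PROPERTY_TAGS = "TAGS";
    static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";

    final String topic;
    final byte[] body;
    final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, String tags, String keys, byte[] body) {
        this.topic = topic;
        this.body = body;
        // Empty tags or keys are skipped, mirroring the length checks in the constructor.
        if (tags != null && tags.length() > 0) {
            properties.put(PROPERTY_TAGS, tags);
        }
        if (keys != null && keys.length() > 0) {
            properties.put(PROPERTY_KEYS, keys);
        }
        // The shorter constructors always pass waitStoreMsgOK = true.
        properties.put(PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(true));
    }

    public static void main(String[] args) {
        SketchMessage msg = new SketchMessage("TopicTest", "TagA", "", "hello".getBytes());
        // Contains TAGS and WAIT, but no KEYS entry because keys was empty.
        System.out.println(msg.properties);
    }
}
```

Tags and keys are therefore not separate fields; they are ordinary entries in the same map as the other system properties.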

Once the Message has been created, it is sent through one of DefaultMQProducer's send methods.

The Producer supports three sending modes, defined by the CommunicationMode enum:

public enum CommunicationMode {
    SYNC,
    ASYNC,
    ONEWAY,
}

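These stand for synchronous, asynchronous, and one-way sending.
Synchronous and asynchronous sending are distinguished by which overload of send is called.

Any send overload that takes a SendCallback parameter is asynchronous; all others are synchronous. SendCallback provides the callbacks that are invoked when an asynchronous send completes: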

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}

One-way sending uses the sendOneway method instead.
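The difference between the three modes can be sketched without any RocketMQ dependency. In the sketch below, Callback, transport, and send are hypothetical names invented for illustration; only the three mode names come from the CommunicationMode listing. It shows one entry point that returns a result only in SYNC mode, reports through a callback in ASYNC mode, and ignores the outcome in ONEWAY mode.

```java
import java.util.concurrent.CompletableFuture;

class SendModeSketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // Hypothetical callback, shaped like SendCallback but carrying a String result.
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Hypothetical transport: pretends the broker acknowledged the payload.
    static String transport(String payload) {
        return "SEND_OK:" + payload;
    }

    // One entry point for all three modes; returns null unless the mode is SYNC.
    static String send(String payload, CommunicationMode mode, Callback callback) {
        switch (mode) {
            case SYNC:
                // The caller blocks until the result is available.
                return transport(payload);
            case ASYNC:
                // The result is delivered later through the callback.
                CompletableFuture.supplyAsync(() -> transport(payload))
                    .whenComplete((result, error) -> {
                        if (error != null) {
                            callback.onException(error);
                        } else {
                            callback.onSuccess(result);
                        }
                    });
                return null;
            case ONEWAY:
                // Fire and forget: nobody looks at the outcome.
                CompletableFuture.runAsync(() -> transport(payload));
                return null;
            default:
                throw new IllegalArgumentException("unknown mode " + mode);
        }
    }
}
```

The same shape shows up in the real code that follows: a single implementation method takes the mode and an optional callback, and only the synchronous path hands a result back to the caller.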

 

Whichever mode is used, the call ends up in the sendDefaultImpl method of the defaultMQProducerImpl that DefaultMQProducer wraps.

DefaultMQProducerImpl's sendDefaultImpl method:

private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }

                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());

                    log.warn("sendKernelImpl exception", e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }

        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times,
            System.currentTimeMillis() - beginTimestampFirst,
            msg.getTopic(),
            Arrays.toString(brokersSent));

        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

        MQClientException mqClientException = new MQClientException(info, exception);
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }

        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }

        throw mqClientException;
    }

    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }

    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

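The CommunicationMode argument is set according to which API was called, as described above.
The SendCallback argument is non-null only when an asynchronous send API is used.

sendDefaultImpl first calls makeSureStateOK to check that the Producer has been started: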

private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}

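serviceState was introduced in the previous post.

After checking the Producer's state, Validators.checkMessage verifies that the Message is valid. Part of that check is the topic validation in checkTopic: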

public static void checkTopic(String topic) throws MQClientException {
    if (UtilAll.isBlank(topic)) {
        throw new MQClientException("The specified topic is blank", null);
    }

    if (!regularExpressionMatcher(topic, PATTERN)) {
        throw new MQClientException(String.format(
            "The specified topic[%s] contains illegal characters, allowing only %s", topic,
            VALID_PATTERN_STR), null);
    }

    if (topic.length() > CHARACTER_MAX_LENGTH) {
        throw new MQClientException("The specified topic is longer than topic max length 255.", null);
    }

    //whether the same with system reserved keyword
    if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
        throw new MQClientException(
            String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
    }
}
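The four checks in checkTopic can be tried out in isolation with the sketch below. The regular expression and the reserved topic name are assumptions, since the listing only refers to them as VALID_PATTERN_STR and AUTO_CREATE_TOPIC_KEY_TOPIC; the 255 limit is taken from the error message. Instead of throwing, this hypothetical check method returns a short reason string.

```java
import java.util.regex.Pattern;

class TopicCheckSketch {
    // Assumed pattern; the listing only references it as VALID_PATTERN_STR.
    static final Pattern PATTERN = Pattern.compile("^[%|a-zA-Z0-9_-]+$");
    // Taken from the "max length 255" error message in the listing.
    static final int CHARACTER_MAX_LENGTH = 255;
    // Assumed value of MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC.
    static final String RESERVED_TOPIC = "TBW102";

    // Returns null when the topic passes every check, otherwise a short reason.
    static String check(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            return "blank";
        }
        if (!PATTERN.matcher(topic).matches()) {
            return "illegal characters";
        }
        if (topic.length() > CHARACTER_MAX_LENGTH) {
            return "too long";
        }
        if (topic.equals(RESERVED_TOPIC)) {
            return "reserved";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(check("TopicTest"));  // prints null
        System.out.println(check("bad topic"));  // prints illegal characters
        System.out.println(check(""));           // prints blank
    }
}
```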

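Once validation passes, the start timestamp is recorded, which marks the real beginning of the send.

Next, tryToFindTopicPublishInfo is called to obtain the routing information for the Topic.
The tryToFindTopicPublishInfo method: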

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

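topicPublishInfoTable was introduced in the post on Producer startup: it is a map that records routing information per Topic. The method first checks whether a TopicPublishInfo already exists for the topic.
If none exists, or its message queues are unusable (ok() returns false):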

public boolean ok() {
    return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}

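ok checks whether this route has any usable message queues.

In that case a new TopicPublishInfo is put into the map and updateTopicRouteInfoFromNameServer is called to refresh the routing information. As mentioned in the previous post, updateTopicRouteInfoFromNameServer also runs in a scheduled task; it is called here so that the route is refreshed immediately.

If the entry has routing information or usable message queues, topicPublishInfo is returned directly.
Otherwise, updateTopicRouteInfoFromNameServer is called once more, this time through its three-argument overload, before the entry is returned.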
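The lookup-then-refresh pattern can be sketched as follows. RouteCacheSketch, PublishInfo, and the nameServer function are hypothetical stand-ins for illustration; the second fallback through the three-argument overload is left out to keep the sketch short.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class RouteCacheSketch {
    // Minimal stand-in for TopicPublishInfo: only the queue list matters here.
    static final class PublishInfo {
        final List<String> queues;
        PublishInfo(List<String> queues) { this.queues = queues; }
        boolean ok() { return queues != null && !queues.isEmpty(); }
    }

    private final ConcurrentMap<String, PublishInfo> table = new ConcurrentHashMap<>();
    // Hypothetical route source standing in for the name server.
    private final Function<String, List<String>> nameServer;
    // Counts calls to the route source, for illustration only.
    int lookups = 0;

    RouteCacheSketch(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    PublishInfo find(String topic) {
        PublishInfo info = table.get(topic);
        if (info == null || !info.ok()) {
            // Cache miss or unusable entry: install a placeholder, then refresh.
            table.putIfAbsent(topic, new PublishInfo(Collections.<String>emptyList()));
            refresh(topic);
            info = table.get(topic);
        }
        return info;
    }

    private void refresh(String topic) {
        lookups++;
        List<String> queues = nameServer.apply(topic);
        if (queues != null && !queues.isEmpty()) {
            table.put(topic, new PublishInfo(queues));
        }
    }
}
```

A second call to find for the same topic is served from the map and does not touch the route source, which is the point of keeping the table on the client.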

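Back in sendDefaultImpl: once the routing information has been obtained, callTimeout is initialized to false; it is later used to signal that the send timed out.
Next, the maximum number of attempts is computed from the CommunicationMode. Retries apply only to synchronous sending, which by default allows 1 + 2 = 3 attempts; the other two modes allow a single attempt.

An array sized to the number of attempts is created to record the BrokerName used on each attempt, and a for loop then runs up to that many times.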
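The attempt budget and the loop around it reduce to a few lines. In this sketch the timesTotal expression follows the listing, while sendWithRetry and its attempt predicate are hypothetical names that stand in for the real send call.

```java
import java.util.function.IntPredicate;

class RetryLoopSketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // Mirrors the timesTotal expression: only SYNC gets extra attempts.
    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Runs attempts until one succeeds; returns how many were made, or -1 if all failed.
    // 'attempt' is a hypothetical send operation that receives the attempt index.
    static int sendWithRetry(CommunicationMode mode, int retries, IntPredicate attempt) {
        int total = timesTotal(mode, retries);
        for (int times = 0; times < total; times++) {
            if (attempt.test(times)) {
                return times + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(timesTotal(CommunicationMode.SYNC, 2));   // prints 3
        System.out.println(timesTotal(CommunicationMode.ASYNC, 2));  // prints 1
    }
}
```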

Each iteration first calls selectOneMessageQueue with topicPublishInfo and lastBrokerName to pick a message queue. The selection is implemented by TopicPublishInfo's selectOneMessageQueue methods:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

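When lastBrokerName is null, the no-argument selectOneMessageQueue is used. sendWhichQueue was introduced in the previous post; the index that each thread obtains from getAndIncrement is effectively random.
The index is taken modulo the size of messageQueueList to get a position in the list, and the MessageQueue at that position is selected.
Because different MessageQueues can belong to different brokers, this works as load balancing: successive sends are spread across the queues, and therefore across brokers.

When lastBrokerName is not null, the procedure is similar, except that each candidate MessageQueue is compared with lastBrokerName and returned only if its broker differs. On a retry this avoids sending to the same broker again, which also helps spread the load.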
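The selection rule can be sketched as below. Queue and QueueSelectSketch are hypothetical stand-ins, the counter is a plain AtomicInteger starting at 0 rather than the real sendWhichQueue, and Math.floorMod replaces the Math.abs plus pos < 0 guard from the listing.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class QueueSelectSketch {
    // Stand-in for MessageQueue: a broker name plus a queue id.
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
    }

    private final List<Queue> queues;
    // A plain counter starting at 0; the real sendWhichQueue is a different type.
    private final AtomicInteger counter = new AtomicInteger();

    QueueSelectSketch(List<Queue> queues) { this.queues = queues; }

    Queue select(String lastBrokerName) {
        int index = counter.getAndIncrement();
        if (lastBrokerName != null) {
            // Walk the ring once, skipping queues on the broker used last time.
            for (int i = 0; i < queues.size(); i++) {
                Queue q = queues.get(Math.floorMod(index + i, queues.size()));
                if (!q.broker.equals(lastBrokerName)) {
                    return q;
                }
            }
        }
        // First attempt, or every queue lives on lastBrokerName: plain round robin.
        return queues.get(Math.floorMod(index, queues.size()));
    }
}
```

With queues on broker-a, broker-a, and broker-b, a retry after a failure on broker-a skips the second broker-a queue and lands on broker-b. If all queues sit on the same broker, the fallback still returns one of them, which matches the behavior of the listing.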

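Back in sendDefaultImpl: after a MessageQueue has been selected, its BrokerName is recorded and the elapsed time is checked against the timeout. If the timeout has not been reached, sendKernelImpl is called to perform the actual send.
The sendKernelImpl method: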

private SendResult sendKernelImpl(final Message msg,
                                  final MessageQueue mq,
                                  final CommunicationMode communicationMode,
                                  final SendCallback sendCallback,
                                  final TopicPublishInfo topicPublishInfo,
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        msg.setBody(prevBody);
                    }
                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

It first records the start time, beginStartTime, in preparation for timeout checks.
It then looks up the broker address for the BrokerName.
The findBrokerAddressInPublish method:

public String findBrokerAddressInPublish(final String brokerName) {
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

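The lookup is done in brokerAddrTable by brokerName, and the address stored under MixAll.MASTER_ID is returned.

If no address is found, tryToFindTopicPublishInfo is called to refresh the route, and findBrokerAddressInPublish is tried again.

After that, if the VIP channel (a high-priority channel) is enabled, the VIP channel address is derived from brokerAddr.
MixAll's brokerVIPChannel method: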

public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
    if (isChange) {
        String[] ipAndPort = brokerAddr.split(":");
        String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
        return brokerAddrNew;
    } else {
        return brokerAddr;
    }
}

The VIP channel address is simple to compute: the port number is reduced by 2.
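As a quick worked example of that arithmetic, the sketch below repeats the same computation on a sample address. The address and port are sample values chosen for illustration, and VipChannelSketch is a hypothetical class.

```java
class VipChannelSketch {
    // Same arithmetic as brokerVIPChannel: keep the host, subtract 2 from the port.
    static String vipAddress(String brokerAddr) {
        String[] ipAndPort = brokerAddr.split(":");
        return ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
    }

    public static void main(String[] args) {
        // 10911 is used here only as a sample port.
        System.out.println(vipAddress("192.168.0.1:10911")); // prints 192.168.0.1:10909
    }
}
```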

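With the address settled, a long run of configuration follows.

An integer sysFlag is defined to describe the message type. It is built from the following values: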

public class MessageSysFlag {
    public final static int COMPRESSED_FLAG = 0x1;
    public final static int MULTI_TAGS_FLAG = 0x1 << 1;
    public final static int TRANSACTION_NOT_TYPE = 0;
    public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
}
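These constants are bit flags that are combined with bitwise OR, as sendKernelImpl does with `sysFlag |= ...`. The sketch below copies the values from the listing and adds two hypothetical helper methods, isCompressed and transactionType, to show how such a combined value can be read back.

```java
class SysFlagSketch {
    // Values copied from the MessageSysFlag listing.
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;

    // Hypothetical helper: tests bit 0.
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) == COMPRESSED_FLAG;
    }

    // Hypothetical helper: the transaction type occupies bits 2 and 3,
    // so the ROLLBACK value (binary 1100) doubles as the mask.
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    public static void main(String[] args) {
        int sysFlag = 0;
        sysFlag |= COMPRESSED_FLAG;
        sysFlag |= TRANSACTION_PREPARED_TYPE;
        System.out.println(sysFlag);                  // prints 5
        System.out.println(isCompressed(sysFlag));    // prints true
        System.out.println(transactionType(sysFlag)); // prints 4
    }
}
```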

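A msgBodyCompressed flag is also defined to record whether the message body was compressed. tryToCompressMessage decides whether to compress and performs the compression.
The tryToCompressMessage method: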

private boolean tryToCompressMessage(final Message msg) {
    if (msg instanceof MessageBatch) {
        //batch dose not support compressing right now
        return false;
    }
    byte[] body = msg.getBody();
    if (body != null) {
        if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
            try {
                byte[] data = UtilAll.compress(body, zipCompressLevel);
                if (data != null) {
                    msg.setBody(data);
                    return true;
                }
            } catch (IOException e) {
                log.error("tryToCompressMessage exception", e);
                log.warn(msg.toString());
            }
        }
    }

    return false;
}

When the body size is at least compressMsgBodyOverHowmuch (4 KB by default), the body is compressed with UtilAll's compress method:

public static byte[] compress(final byte[] src, final int level) throws IOException {
    byte[] result = src;
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
    java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
    DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
    try {
        deflaterOutputStream.write(src);
        deflaterOutputStream.finish();
        deflaterOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        defeater.end();
        throw e;
    } finally {
        try {
            byteArrayOutputStream.close();
        } catch (IOException ignored) {
        }

        defeater.end();
    }

    return result;
}

The body is compressed with the Deflater from java.util.zip.
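To see the compression step in action, the sketch below pairs a compress method in the same style with an uncompress counterpart and checks that a round trip restores the original bytes. CompressSketch, uncompress, and roundTrip are hypothetical names written for this illustration, and the level 5 passed to compress is just a sample value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class CompressSketch {
    static byte[] compress(byte[] src, int level) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        Deflater deflater = new Deflater(level);
        // Closing the stream finishes the deflate output before the finally block runs.
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } finally {
            deflater.end(); // release the native resources held by the deflater
        }
        return out.toByteArray();
    }

    static byte[] uncompress(byte[] src) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buffer = new byte[1024];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return out.toByteArray();
    }

    // Convenience wrapper that compresses and restores in one call.
    static byte[] roundTrip(byte[] src) {
        try {
            return uncompress(compress(src, 5)); // 5 is a sample compression level
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the body is replaced by the compressed bytes before sending, the receiving side needs the COMPRESSED_FLAG bit in sysFlag to know that it must inflate the body again.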

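Next, sysFlag is updated depending on whether the message is a transactional message; transactional messages will be covered in a later post.

Then the method checks whether any CheckForbiddenHook is registered. If so, every CheckForbiddenHook is iterated and its checkForbidden method is executed, which gives the hook a chance to forbid the send.

Likewise, it checks whether any SendMessageHook is registered and, if so, runs the sendMessageBefore method of each one. After the message has been sent, their sendMessageAfter methods are run.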
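The before/after hook pattern can be sketched as follows. Hook, register, and send are hypothetical names for illustration and do not reproduce the real SendMessageHook interface; the doSend function stands in for the network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class SendHookSketch {
    // Hypothetical hook shape, modeled on a before/after pair.
    interface Hook {
        void before(String msg);
        void after(String msg, String result, Exception error);
    }

    private final List<Hook> hooks = new ArrayList<>();

    void register(Hook hook) {
        hooks.add(hook);
    }

    // 'doSend' stands in for the actual network call.
    String send(String msg, Function<String, String> doSend) {
        for (Hook h : hooks) {
            h.before(msg);
        }
        try {
            String result = doSend.apply(msg);
            for (Hook h : hooks) {
                h.after(msg, result, null);
            }
            return result;
        } catch (RuntimeException e) {
            // After-hooks still run on failure, with the exception attached.
            for (Hook h : hooks) {
                h.after(msg, null, e);
            }
            throw e;
        }
    }
}
```

This mirrors the structure of sendKernelImpl, where executeSendMessageHookAfter is called both on the normal path and inside each catch block.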

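The request header, requestHeader, is then populated with a long list of settings. After that, the switch block performs the checks appropriate to each sending mode.
In the end, every mode calls MQClientAPIImpl's sendMessage method: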

public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    RemotingCommand request = null;
    if (sendSmartMsg || msg instanceof MessageBatch) {
        SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
    } else {
        request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }

    request.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            final AtomicInteger times = new AtomicInteger();
            long costTimeAsync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeAsync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                retryTimesWhenSendFailed, times, context, producer);
            return null;
        case SYNC:
            long costTimeSync = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTimeSync) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
        default:
            assert false;
            break;
    }

    return null;
}

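First, a RemotingCommand request is created whose request code depends on the message type: SEND_BATCH_MESSAGE for a MessageBatch, SEND_MESSAGE_V2 when sendSmartMsg is enabled, and SEND_MESSAGE otherwise.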
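The branching that picks the request code can be isolated into a small function. The following is only a sketch: SketchRequestCode and RequestCodeChooser are hypothetical stand-ins invented for illustration, not RocketMQ classes, and only the decision order is taken from the sendMessage listing.

```java
// Hypothetical stand-in for the three request codes named in sendMessage.
enum SketchRequestCode { SEND_MESSAGE, SEND_MESSAGE_V2, SEND_BATCH_MESSAGE }

final class RequestCodeChooser {
    private RequestCodeChooser() {}

    /**
     * Mirrors the if/else in sendMessage: a batch always maps to
     * SEND_BATCH_MESSAGE, a non-batch message in smart mode maps to
     * SEND_MESSAGE_V2, and everything else falls back to SEND_MESSAGE.
     */
    static SketchRequestCode choose(boolean sendSmartMsg, boolean isBatch) {
        if (sendSmartMsg || isBatch) {
            return isBatch ? SketchRequestCode.SEND_BATCH_MESSAGE
                           : SketchRequestCode.SEND_MESSAGE_V2;
        }
        return SketchRequestCode.SEND_MESSAGE;
    }
}
```

Note that a batch takes the V2 header path even when sendSmartMsg is off, which is why the batch check appears twice in the original condition.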

Once the request has been built, it is dispatched according to the send mode.
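Before dispatching, the ASYNC and SYNC branches of sendMessage subtract the time already spent from timeoutMillis and fail fast if the budget is exhausted. A minimal sketch of that bookkeeping follows; TimeoutBudget is a hypothetical helper, and IllegalStateException stands in for the RemotingTooMuchRequestException thrown by the real code.

```java
final class TimeoutBudget {
    private TimeoutBudget() {}

    /**
     * Returns the time left for the next stage of the call.
     * Throws when more time has elapsed than the caller allowed,
     * matching the "timeoutMillis < cost" check in sendMessage.
     */
    static long remaining(long timeoutMillis, long beginMillis, long nowMillis) {
        long cost = nowMillis - beginMillis;
        if (timeoutMillis < cost) {
            throw new IllegalStateException(
                "call timeout: spent " + cost + "ms of " + timeoutMillis + "ms");
        }
        return timeoutMillis - cost;
    }
}
```

Passing the clock readings in as parameters, rather than calling System.currentTimeMillis() inside, is a choice made here only to keep the sketch deterministic.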

ONEWAY mode:
This directly calls the invokeOneway method of remotingClient, i.e. the Netty client:

public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

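First, getAndCreateChannel obtains a Channel for the broker address from channelTables (as described in the previous post, the Netty client caches its established Channels in a map called channelTables). If the Channel is missing or no longer active, it is closed and a RemotingConnectException is thrown.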

Then, as before, the doBeforeRequest method of every configured RPCHook is executed.
After that, invokeOnewayImpl is called:

public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

First, the one-way bit is set in the request's flag field:

public void markOnewayRPC() {
    int bits = 1 << RPC_ONEWAY;
    this.flag |= bits;
}
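To make the bit manipulation concrete, here is a small self-contained sketch. FlagBits is a hypothetical class, and the value RPC_ONEWAY = 1 is an assumption made for this example; the listing above does not show the constant's actual value.

```java
final class FlagBits {
    // Assumed bit position for illustration only; the real constant
    // is defined in RemotingCommand and is not shown in this post.
    static final int RPC_ONEWAY = 1;

    private int flag = 0;

    /** Sets the one-way bit; calling it again leaves the flag unchanged. */
    void markOnewayRPC() {
        int bits = 1 << RPC_ONEWAY;
        this.flag |= bits;
    }

    /** Tests the same bit that markOnewayRPC sets. */
    boolean isOnewayRPC() {
        int bits = 1 << RPC_ONEWAY;
        return (this.flag & bits) == bits;
    }

    int getFlag() {
        return flag;
    }
}
```

Because the bit is set with OR rather than assignment, any other bits already stored in flag are preserved.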

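Next, a permit is acquired from the semaphoreOneway semaphore and wrapped in a SemaphoreReleaseOnlyOnce, which guarantees that the permit is released at most once.
Finally, Netty's writeAndFlush sends the request. The ChannelFutureListener registered on it releases the permit when the write completes, whether it succeeded or failed.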
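The release-once guarantee matters because both the listener and the catch block may try to give the permit back. A minimal sketch of such a wrapper, assuming it is built on an AtomicBoolean guard (ReleaseOnce is a hypothetical name, not the RocketMQ class itself):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

final class ReleaseOnce {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    /**
     * Returns the permit to the semaphore on the first call only.
     * compareAndSet makes the check-and-flip atomic, so concurrent
     * callers cannot both release.
     */
    void release() {
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```

Without the guard, a double release would add an extra permit and silently raise the concurrency limit the semaphore is meant to enforce.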

Because this is a one-way send, nothing further is done once the request has been written.

 

ASYNC mode:
This calls the sendMessageAsync method:

private void sendMessageAsync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final AtomicInteger times,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}

An InvokeCallback is registered here to handle the outcome once the send has completed.
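The callback's decision tree is easier to follow when reduced to its inputs. The sketch below is a simplification under stated assumptions: AsyncOutcome and its Kind enum are hypothetical, and the three booleans stand for "response != null", responseFuture.isSendRequestOK() and responseFuture.isTimeout() from the listing above.

```java
final class AsyncOutcome {
    /** The four paths operationComplete can take, in the order they are checked. */
    enum Kind { PROCESS_RESPONSE, SEND_REQUEST_FAILED, WAIT_RESPONSE_TIMEOUT, UNKNOWN_REASON }

    private AsyncOutcome() {}

    static Kind classify(boolean hasResponse, boolean sendRequestOK, boolean timedOut) {
        if (hasResponse) {
            // A response arrived: it is parsed and onSuccess may be called.
            return Kind.PROCESS_RESPONSE;
        }
        if (!sendRequestOK) {
            // The write to the channel itself failed.
            return Kind.SEND_REQUEST_FAILED;
        }
        if (timedOut) {
            // The request went out but no response came back in time.
            return Kind.WAIT_RESPONSE_TIMEOUT;
        }
        return Kind.UNKNOWN_REASON;
    }
}
```

In the original code every path except the first ends in onExceptionImpl, and the broker is marked as faulty through updateFaultItem.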

Let us look at the invokeAsync method first:

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTooMuchRequestException("invokeAsync call timeout");
            }
            this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

This mirrors the ONEWAY path; the actual work is done by invokeAsyncImpl:

public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}

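Here the request's getOpaque method returns its opaque value. This value is assigned when the request is created, taken from an auto-incrementing AtomicInteger, so it serves as the unique ID of each request.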

A ResponseFuture wrapping the invokeCallback and the channel is then created and put into responseTable.
responseTable is a map:

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
    new ConcurrentHashMap<Integer, ResponseFuture>(256);

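It maps each request ID to its ResponseFuture, so that the response to an asynchronous send can be handled when it arrives.
In other words, once the request has been sent and a response comes back, the matching ResponseFuture is looked up by request ID and the InvokeCallback registered earlier is executed. Inside that callback, processSendResponse handles the response returned by the Broker, and depending on the outcome the user-supplied SendCallback's onSuccess or onException method is finally invoked. This completes the asynchronous send.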
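The correlation between an outgoing request and its later response can be sketched in a few lines. This is an illustration only: PendingRequests is a hypothetical class, a plain Consumer of String stands in for ResponseFuture and InvokeCallback, and timeouts and semaphore handling are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class PendingRequests {
    // Auto-incrementing source of request IDs, playing the role of opaque.
    private final AtomicInteger nextId = new AtomicInteger(0);
    // Request ID -> callback, playing the role of responseTable.
    private final ConcurrentMap<Integer, Consumer<String>> table = new ConcurrentHashMap<>(256);

    /** Registers a callback before sending and returns the ID to put on the wire. */
    int register(Consumer<String> callback) {
        int opaque = nextId.getAndIncrement();
        table.put(opaque, callback);
        return opaque;
    }

    /**
     * Called by the I/O thread when a response carrying this ID arrives.
     * Returns false if nothing is waiting (already completed or never registered).
     */
    boolean complete(int opaque, String response) {
        Consumer<String> callback = table.remove(opaque);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        return true;
    }

    int pending() {
        return table.size();
    }
}
```

Removing the entry before invoking the callback ensures that a duplicate or late response for the same ID cannot trigger the callback twice.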

The remaining steps are the same as for ONEWAY: Netty's writeAndFlush performs the actual send.

 

SYNC mode:
This calls the sendMessageSync method:

private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

It first calls the Netty client's invokeSync method:

public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);
    if (channel != null && channel.isActive()) {
        try {
            doBeforeRpcHooks(addr, request);
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeoutMillis < costTime) {
                throw new RemotingTimeoutException("invokeSync call timeout");
            }
            RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
            return response;
        } catch (RemotingSendRequestException e) {
            log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
            this.closeChannel(addr, channel);
            throw e;
        } catch (RemotingTimeoutException e) {
            if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
                this.closeChannel(addr, channel);
                log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
            }
            log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
            throw e;
        }
    } else {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }
}

The steps are again similar to the previous two modes.

Let us go straight to the invokeSyncImpl method:

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}

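This is largely the same as ASYNC, except that after writeAndFlush the calling thread blocks in responseFuture.waitResponse, waiting up to the timeout for the response to come back. A null result means either that the write failed (RemotingSendRequestException) or that no response arrived in time (RemotingTimeoutException).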
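The blocking wait can be pictured as a one-shot latch. The following sketch assumes a CountDownLatch-based design; SimpleResponseFuture is a hypothetical class with a String payload, and only the method names waitResponse and putResponse are taken from the listing above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SimpleResponseFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    /**
     * Called by the I/O thread when the outcome is known.
     * Passing null wakes the waiter early, as invokeSyncImpl does on a failed write.
     */
    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }

    /**
     * Blocks the caller for at most timeoutMillis.
     * Returns the response, or null if none arrived in time.
     */
    String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }
}
```

The caller cannot tell a timeout from a failed write by the null alone, which is why invokeSyncImpl additionally checks isSendRequestOK before choosing which exception to throw.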
If the send fails, the for loop in DefaultMQProducerImpl's sendDefaultImpl moves on to the next attempt, until the send succeeds or the retry count is used up.
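The shape of such a bounded retry loop is sketched below. This is a simplification under assumptions: RetryLoop is a hypothetical helper, the total of 1 + retryTimes attempts is assumed from the retryTimesWhenSendFailed parameter name, and the per-attempt queue selection and timeout checks of the real loop are omitted.

```java
import java.util.function.IntPredicate;

final class RetryLoop {
    private RetryLoop() {}

    /**
     * Runs the attempt up to 1 + retryTimes times.
     * The predicate receives the zero-based attempt index and returns true on success.
     * Returns the one-based number of the attempt that succeeded, or -1 if all failed.
     */
    static int sendWithRetry(int retryTimes, IntPredicate attempt) {
        int timesTotal = 1 + retryTimes;
        for (int times = 0; times < timesTotal; times++) {
            if (attempt.test(times)) {
                return times + 1;
            }
            // On failure the loop simply continues with the next attempt.
        }
        return -1;
    }
}
```

For example, with retryTimes set to 2 an attempt that only succeeds on its third try still goes through, while a fourth try is never made.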

If the Broker's response arrives within the timeout, invokeSync runs the doAfterResponse method of every configured RPCHook, and sendMessageSync then passes the response to processSendResponse.

 

This concludes the Producer's message sending.