【RocketMQ源碼學習】- 3. Client 發送同步消息

  • 2019 年 11 月 12 日
  • 筆記

本文較長,程式碼後面給了方法簡圖,希望給你幫助

發送的方式

  • 同步發送

  • 非同步發送

消息的類型

  • 普通消息

  • 順序消息

  • 事務消息

發送同步消息的時序圖

為了防止讀者朋友嫌煩,可以看下時序圖,後面我也會給出方法的簡圖

 

 

源碼示例【發送同步消息】

調用DefaultMQProducer.send()發送同步消息

同時需要設置發送的nameSrvAddrproducerGroupName

可以設置發送的超時時間,(默認3s), msgQueueNum(默認4個), 生產端發送非同步消息失敗重試次數(默認2次),同步消息無重試次數

public class Producer {      public static void main(String[] args) throws MQClientException, InterruptedException {            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");          producer.setNamesrvAddr("127.0.0.1:9876");          producer.setInstanceName("producer1");            producer.start();            for (int i = 0; i < 10; i++)              try {                  {                      Message msg = new Message("TopicTest",                          "TagA",                          "OrderID188",                              ("Hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));                      // send方法                      SendResult sendResult = producer.send(msg);                      System.out.printf("%s%n", sendResult);                  }                } catch (Exception e) {                  e.printStackTrace();              }            producer.shutdown();      }  }

3. send方法內部調用sendDefaultImpl()

private SendResult sendDefaultImpl(      Message msg,      final CommunicationMode communicationMode,      final SendCallback sendCallback,      final long timeout      // + 用戶來處理異常  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {      // + 確保服務狀態是RUNNING      this.makeSureStateOK();      // + 傳參判空      Validators.checkMessage(msg, this.defaultMQProducer);        final long invokeID = random.nextLong();      long beginTimestampFirst = System.currentTimeMillis();      long beginTimestampPrev = beginTimestampFirst;      long endTimestamp = beginTimestampFirst;      /**       * 根據topic的name,從本地獲取tocip資訊,如果本地沒有就從nameserver中取,同時快取到本地       * 包括MessageQueueList, brokeName, topic_name       */      TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());      if (topicPublishInfo != null && topicPublishInfo.ok()) {          boolean callTimeout = false;          MessageQueue mq = null;          Exception exception = null;          SendResult sendResult = null;          /**           * 生產端的重試:非同步方式最大執行次數總共3次,同步1次,           * 重試針對的是brokeExceptionMQClientExceptionRemotingException返回值失敗           */          int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;          int times = 0;          String[] brokersSent = new String[timesTotal];          for (; times < timesTotal; times++) {              String lastBrokerName = null == mq ? null : mq.getBrokerName();              /**               * 根據topic和broke選擇1個隊列               * 選擇策略,產生一個隨機數,hash % broke中隊列數,然後hash+1               * 這個隨機數:是執行緒私有的               */              MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);              if (mqSelected != null) {                  mq = mqSelected;                  // 在重試數組中放入broke_name                  brokersSent[times] = mq.getBrokerName();                  try {                      beginTimestampPrev = System.currentTimeMillis();                      if (times > 0) {                          // Reset topic with namespace during resend.                          // 重置topic                          msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                      }                      // 超時就break,拋出call timeout異常,這時還沒有,通過socket重試                      long costTime = beginTimestampPrev - beginTimestampFirst;                      if (timeout < costTime) {                          callTimeout = true;                          break;                      }                        // 【核心,如下】調用sendKernelImpl方法,想選中的messageQueu中投遞消息                      sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                      // 獲取當前時間,控制所有步驟的時間,不超過用戶設置的超時時間,或默認超時時間                      endTimestamp = System.currentTimeMillis();                        // 把這個操作時間記錄到map中,key=brokerName, value=對象[包括:brokeName, currentLatency當前操作花費時間,startTimestamp開始的時間]                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                      switch (communicationMode) {                          case ASYNC:                              return null;                          case ONEWAY:                              return null;                          case SYNC:                              // 當開啟了重試另外一個broke時,才會失敗重試                              if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                  if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                      continue;                                  }                              }                                return sendResult;                          default:                              break;                      }                  } catch (RemotingException e) {                      // 遠程調用時異常,會重試                      endTimestamp = System.currentTimeMillis();                      // 記錄操作時間                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                      log.warn(msg.toString());                      exception = e;                      continue;                  } catch (MQClientException e) {                      endTimestamp = System.currentTimeMillis();                      // 記錄操作時間                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                      log.warn(msg.toString());                      exception = e;                      continue;                  } catch (MQBrokerException e) {                      endTimestamp = System.currentTimeMillis();                      // 記錄操作時間                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                      log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                      log.warn(msg.toString());                      exception = e;                      switch (e.getResponseCode()) {                          case ResponseCode.TOPIC_NOT_EXIST:          // topic不存在                          case ResponseCode.SERVICE_NOT_AVAILABLE:    // 服務不可用                          case ResponseCode.SYSTEM_ERROR:             // 系統錯誤                          case ResponseCode.NO_PERMISSION:            // 無許可權                          case ResponseCode.NO_BUYER_ID:              //                          case ResponseCode.NOT_IN_CURRENT_UNIT:      // 不在集群中                              continue;                          default:                              if (sendResult != null) {                                  return sendResult;                              }                                throw e;                      }                  } catch (InterruptedException e) {                      endTimestamp = System.currentTimeMillis();                      // 記錄操作時間                      this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                      log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                      log.warn(msg.toString());                        log.warn("sendKernelImpl exception", e);                      log.warn(msg.toString());                      throw e;                  }              } else {                  break;              }          }            if (sendResult != null) {              return sendResult;          }            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",              times,              System.currentTimeMillis() - beginTimestampFirst,              msg.getTopic(),              Arrays.toString(brokersSent));            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);            MQClientException mqClientException = new MQClientException(info, exception);          if (callTimeout) {              throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");          }            if (exception instanceof MQBrokerException) {              mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());          } else if (exception instanceof RemotingConnectException) {              mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);          } else if (exception instanceof RemotingTimeoutException) {              mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);          } else if (exception instanceof MQClientException) {              mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);          }            throw mqClientException;      }        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();      if (null == nsList || nsList.isEmpty()) {          throw new MQClientException(              "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);      }        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),          null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);  }

sendDefaultImpl方法簡圖

  

4. 構建發送參數,使用netty發送消息[sendKernelImpl]

private SendResult sendKernelImpl(final Message msg,                                      final MessageQueue mq,                                      final CommunicationMode communicationMode,                                      final SendCallback sendCallback,                                      final TopicPublishInfo topicPublishInfo,                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {      long beginStartTime = System.currentTimeMillis();      // 獲取broker的IP地址,獲取到的是主broker      String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());      if (null == brokerAddr) {          // 使用topic的name去獲取topic,如果本地沒有,則在從nameserver中獲取,同時也更新broker的資訊          tryToFindTopicPublishInfo(mq.getTopic());          brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());      }        SendMessageContext context = null;      if (brokerAddr != null) {          // 若開啟了vipchannel,broke的埠減2          brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);            byte[] prevBody = msg.getBody();          try {              //for MessageBatch,ID has been set in the generating process              if (!(msg instanceof MessageBatch)) {                  // 設置UNIQ_KEY                  MessageClientIDSetter.setUniqID(msg);              }                boolean topicWithNamespace = false;              if (null != this.mQClientFactory.getClientConfig().getNamespace()) {                  msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());                  topicWithNamespace = true;              }                // 嘗試壓縮消息,有一定的條件,不是MessageBatch,消息超過4k              int sysFlag = 0;              boolean msgBodyCompressed = false;              if (this.tryToCompressMessage(msg)) {                  sysFlag |= MessageSysFlag.COMPRESSED_FLAG;                  msgBodyCompressed = true;              }                // 判斷是否是事務消息              final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);              if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {                  sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;              }                // 禁用鉤子 todo 疑問待解=>              if (hasCheckForbiddenHook()) {                  CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();                  checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());                  checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());                  checkForbiddenContext.setCommunicationMode(communicationMode);                  checkForbiddenContext.setBrokerAddr(brokerAddr);                  checkForbiddenContext.setMessage(msg);                  checkForbiddenContext.setMq(mq);                  checkForbiddenContext.setUnitMode(this.isUnitMode());                  this.executeCheckForbiddenHook(checkForbiddenContext);              }                // 發送消息鉤子              if (this.hasSendMessageHook()) {                  context = new SendMessageContext();                  context.setProducer(this);                  context.setProducerGroup(this.defaultMQProducer.getProducerGroup());                  context.setCommunicationMode(communicationMode);                  context.setBornHost(this.defaultMQProducer.getClientIP());                  context.setBrokerAddr(brokerAddr);                  context.setMessage(msg);                  context.setMq(mq);                  context.setNamespace(this.defaultMQProducer.getNamespace());                  // 事務消息消息類型是半消息                  String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                  if (isTrans != null && isTrans.equals("true")) {                      context.setMsgType(MessageType.Trans_Msg_Half);                  }                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {                      context.setMsgType(MessageType.Delay_Msg);                  }                     // 記錄消息軌跡                  this.executeSendMessageHookBefore(context);              }                // 構建請求頭              SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();              requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());              requestHeader.setTopic(msg.getTopic());              requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());              requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());              requestHeader.setQueueId(mq.getQueueId());              requestHeader.setSysFlag(sysFlag);              requestHeader.setBornTimestamp(System.currentTimeMillis());              requestHeader.setFlag(msg.getFlag());              requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));              requestHeader.setReconsumeTimes(0);              requestHeader.setUnitMode(this.isUnitMode());              requestHeader.setBatch(msg instanceof MessageBatch);              // 更新 %RETRY%重試topic里消息的消費時間和 最大消費次數              if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                  String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                  if (reconsumeTimes != null) {                      requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                      MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);                  }                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                  if (maxReconsumeTimes != null) {                      requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));                      MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);                  }              }                SendResult sendResult = null;              switch (communicationMode) {                  // 非同步                  case ASYNC:                      Message tmpMessage = msg;                      boolean messageCloned = false;                      if (msgBodyCompressed) {                          //If msg body was compressed, msgbody should be reset using prevBody. 如果消息體被壓縮了,應該用prevBody重置msgBody                          //Clone new message using commpressed message body and recover origin massage.                          //Fix bug:https://github.com/apache/rocketmq-externals/issues/66                          tmpMessage = MessageAccessor.cloneMessage(msg);                          messageCloned = true;                          msg.setBody(prevBody);                      }                        if (topicWithNamespace) {                          if (!messageCloned) {                              tmpMessage = MessageAccessor.cloneMessage(msg);                              messageCloned = true;                          }                          msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));                      }                        // 防止發送之前的處理超時                      long costTimeAsync = System.currentTimeMillis() - beginStartTime;                      if (timeout < costTimeAsync) {                          throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                      }                        // 下面仔細說明這個方法                       sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                          brokerAddr,                          mq.getBrokerName(),                          tmpMessage,                          requestHeader,                          timeout - costTimeAsync,                          communicationMode,                          sendCallback,                          topicPublishInfo,                          this.mQClientFactory,                          this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                          context,                          this);                      break;                  // 直接發送,不關心發送結果/同步消息                  case ONEWAY:                  case SYNC:                      // 防止發送之前的處理超時                      long costTimeSync = System.currentTimeMillis() - beginStartTime;                      if (timeout < costTimeSync) {                          throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                      }                      // 下面仔細說明這個方法                      sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                          brokerAddr,                          mq.getBrokerName(),                          msg,                          requestHeader,                          timeout - costTimeSync,                          communicationMode,                          context,                          this);                      break;                  default:                      assert false;                      break;              }                if (this.hasSendMessageHook()) {                  context.setSendResult(sendResult);                  this.executeSendMessageHookAfter(context);              }                return sendResult;          } catch (RemotingException e) {              if (this.hasSendMessageHook()) {                  context.setException(e);                  this.executeSendMessageHookAfter(context);              }              throw e;          } catch (MQBrokerException e) {              if (this.hasSendMessageHook()) {                  context.setException(e);                  this.executeSendMessageHookAfter(context);              }              throw e;          } catch (InterruptedException e) {              if (this.hasSendMessageHook()) {                  context.setException(e);                  this.executeSendMessageHookAfter(context);              }              throw e;          } finally {              msg.setBody(prevBody);              msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));          }      }        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);  }

方法簡圖

7. 使用netty發送同步消息[invokeSync]

this.mQClientFactory.getMQClientAPIImpl().sendMessage()

內部調用

  非同步消息調用的是 NettyRemotingClient.invokeAsync方法

  同步消息調用的是 NettyRemotingClient.invokeSync方法

下面跟著程式碼查看invokeSync方法

@Override  public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)      throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {      long beginStartTime = System.currentTimeMillis();      // 創建channal,如果channel為空,使用 this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));來創建channel      final Channel channel = this.getAndCreateChannel(addr);      // 活躍的channel才能發消息      if (channel != null && channel.isActive()) {          try {              // rpc鑒權              doBeforeRpcHooks(addr, request);              long costTime = System.currentTimeMillis() - beginStartTime;              if (timeoutMillis < costTime) {                  throw new RemotingTimeoutException("invokeSync call timeout");              }              //              RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);              //              doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);              return response;          } catch (RemotingSendRequestException e) {              log.warn("invokeSync: send request exception, so close the channel[{}]", addr);              this.closeChannel(addr, channel);              throw e;          } catch (RemotingTimeoutException e) {              if (nettyClientConfig.isClientCloseSocketIfTimeout()) {                  this.closeChannel(addr, channel);                  log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);              }              log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);              throw e;          }      } else {          this.closeChannel(addr, channel);          throw new RemotingConnectException(addr);      }  }

 

 

8. 使用netty發送同步消息[invokeSyncImpl]

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,      final long timeoutMillis)      throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {      final int opaque = request.getOpaque();        try {          final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);          this.responseTable.put(opaque, responseFuture);          final SocketAddress addr = channel.remoteAddress();          // 使用writeAndFlush發請求,建立返回值的監聽,這是netty-client發送消息,還有netty-server收消息          channel.writeAndFlush(request).addListener(new ChannelFutureListener() {              @Override              public void operationComplete(ChannelFuture f) throws Exception {                  if (f.isSuccess()) {                      responseFuture.setSendRequestOK(true);                      // 執行成功後return, 跳出這個監聽                      return;                  } else {                      responseFuture.setSendRequestOK(false);                  }                    responseTable.remove(opaque);                  responseFuture.setCause(f.cause());                  responseFuture.putResponse(null);                  log.warn("send a request command to channel <" + addr + "> failed.");              }          });            // 使用countDownLatch掛起執行緒,等待收到netty的返回值          RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);          // 在clinet的channelRead0將responseCommand賦值,若為空,說明超過一定時間還未獲取返回值,這時拋出異常,交由用戶處理          // 可能netty-server收到消息,也有可能沒有收到消息,不確定          if (null == responseCommand) {              if (responseFuture.isSendRequestOK()) {                  throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,                      responseFuture.getCause());              } else {                  throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());              }          }            return responseCommand;      } finally {          this.responseTable.remove(opaque);      }  }

 

 整體重要步驟

0. 用send方法發送同步消息

1. 判斷發送service是否處於running狀態

2. 檢驗發送的參數 如topic是否為空

3. 根據topicName獲取topic詳情,裡面有這個topic的隊列

4. 選擇其中一個隊列,選擇的策略是:產生一個隨機數,hash%brokeSize 然後hash+1, 同時把隨機數記錄下來,下次還是使用這個隨機數

5. 獲取broke的IP地址,如果本地沒有,則從nameserver中獲取

6. 如果開啟了vipchannel,則埠缺口

7. 嘗試壓縮消息,消息數不大於4K

8. 構建消息頭

9. 根據IP創建channel

10. rpc鑒權

11. 創建responseFuture, 並把他放到reponseTable中

12. 使用channel.writeAndFlush發起netty請求,並建立監聽

13. 使用countDownLatch掛起執行緒,等待收到netty的返回值

14. 返回結果

 結語

剛開始開發送的程式碼,覺得太長了,不想看了,現在想想他的邏輯挺清晰的,能夠幫助你看清別人的開源框架整個構建構成

======  【多學一點,for Better】======