A look at RocketMQ's SERVICE_NOT_AVAILABLE

  • December 19, 2019
  • Notes

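This article takes a look at SERVICE_NOT_AVAILABLE in RocketMQ: where it is defined, when the store returns it, and how the broker passes it back to the client.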

SERVICE_NOT_AVAILABLE

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java

public class ResponseCode extends RemotingSysResponseCode {

    public static final int FLUSH_DISK_TIMEOUT = 10;

    public static final int SLAVE_NOT_AVAILABLE = 11;

    public static final int FLUSH_SLAVE_TIMEOUT = 12;

    public static final int MESSAGE_ILLEGAL = 13;

    public static final int SERVICE_NOT_AVAILABLE = 14;

    //......

}
  • ResponseCode defines SERVICE_NOT_AVAILABLE with the value 14

PutMessageStatus

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java

public enum PutMessageStatus {
    PUT_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
    SERVICE_NOT_AVAILABLE,
    CREATE_MAPEDFILE_FAILED,
    MESSAGE_ILLEGAL,
    PROPERTIES_SIZE_EXCEEDED,
    OS_PAGECACHE_BUSY,
    UNKNOWN_ERROR,
}
  • The PutMessageStatus enum also has a SERVICE_NOT_AVAILABLE member

DefaultMessageStore

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

public class DefaultMessageStore implements MessageStore {

    //......

    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }

        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }

            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }

        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }

        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }

        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }

        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);

        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }

        return result;
    }

    //......
}
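  • putMessage returns a PutMessageResult carrying PutMessageStatus.SERVICE_NOT_AVAILABLE in three cases: shutdown is true, the broker's role is SLAVE, or runningFlags.isWriteable() returns false. In the last two cases the warning is logged only once every 50000 rejected calls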
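
The three early-return checks can be condensed into a small standalone sketch. This is not RocketMQ code: the class PutMessageGuardSketch, its nested Status and Role enums, and the precheck and shouldLog helpers are hypothetical names introduced here for illustration, and only the order of the checks and the "every 50000 calls" log throttling are taken from the listing above.

```java
import java.util.concurrent.atomic.AtomicLong;

class PutMessageGuardSketch {
    // Hypothetical stand-ins for PutMessageStatus and BrokerRole (illustration only).
    enum Status { PUT_OK, SERVICE_NOT_AVAILABLE }
    enum Role { ASYNC_MASTER, SYNC_MASTER, SLAVE }

    // Same order as the checks at the top of putMessage: shutdown, slave role, not writeable.
    static Status precheck(boolean shutdown, Role role, boolean writeable) {
        if (shutdown) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (role == Role.SLAVE) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (!writeable) {
            return Status.SERVICE_NOT_AVAILABLE;
        }
        // The real method continues with further validation and the commit log write.
        return Status.PUT_OK;
    }

    // Log throttling as in the listing: warn on call 0, 50000, 100000, ...
    static boolean shouldLog(AtomicLong printTimes) {
        long value = printTimes.getAndIncrement();
        return (value % 50000) == 0;
    }

    public static void main(String[] args) {
        System.out.println(precheck(false, Role.SYNC_MASTER, true));  // healthy master
        System.out.println(precheck(true, Role.SYNC_MASTER, true));   // store shut down
        System.out.println(precheck(false, Role.SLAVE, true));        // slave rejects writes
        System.out.println(precheck(false, Role.ASYNC_MASTER, false)); // flags block writes
    }
}
```

Because all three branches return the same status, a client cannot tell from the code alone which condition fired; the broker log (or the flag bits printed in the "not writeable" warning) is needed for that.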

RunningFlags

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java

public class RunningFlags {

    private static final int NOT_READABLE_BIT = 1;

    private static final int NOT_WRITEABLE_BIT = 1 << 1;

    private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;

    private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;

    private static final int DISK_FULL_BIT = 1 << 4;

    private volatile int flagBits = 0;

    public RunningFlags() {
    }

    //......

    public boolean isWriteable() {
        if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
            return true;
        }

        return false;
    }

    public boolean getAndMakeReadable() {
        boolean result = this.isReadable();
        if (!result) {
            this.flagBits &= ~NOT_READABLE_BIT;
        }
        return result;
    }

    public boolean getAndMakeNotReadable() {
        boolean result = this.isReadable();
        if (result) {
            this.flagBits |= NOT_READABLE_BIT;
        }
        return result;
    }

    public boolean getAndMakeWriteable() {
        boolean result = this.isWriteable();
        if (!result) {
            this.flagBits &= ~NOT_WRITEABLE_BIT;
        }
        return result;
    }

    public boolean getAndMakeNotWriteable() {
        boolean result = this.isWriteable();
        if (result) {
            this.flagBits |= NOT_WRITEABLE_BIT;
        }
        return result;
    }

    public void makeLogicsQueueError() {
        this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
    }

    public void makeIndexFileError() {
        this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;
    }

    public boolean getAndMakeDiskFull() {
        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
        this.flagBits |= DISK_FULL_BIT;
        return result;
    }

    public boolean getAndMakeDiskOK() {
        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
        this.flagBits &= ~DISK_FULL_BIT;
        return result;
    }

    //......
}
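  • RunningFlags provides isWriteable, which returns false when any of NOT_WRITEABLE_BIT, WRITE_LOGICS_QUEUE_ERROR_BIT, DISK_FULL_BIT, or WRITE_INDEX_FILE_ERROR_BIT is set. getAndMakeReadable, getAndMakeNotReadable, getAndMakeWriteable, getAndMakeNotWriteable, makeLogicsQueueError, makeIndexFileError, getAndMakeDiskFull, and getAndMakeDiskOK can all change flagBits; of these, getAndMakeReadable and getAndMakeNotReadable only touch NOT_READABLE_BIT, which isWriteable does not check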
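
To make the bit arithmetic concrete, here is a minimal sketch that reuses the bit values from the listing but works on a plain int. The class name RunningFlagsSketch, the WRITE_BLOCKING_MASK constant, and the static isWriteable(int) helper are assumptions made for this example and do not exist in RocketMQ.

```java
class RunningFlagsSketch {
    // Bit values copied from the RunningFlags listing.
    static final int NOT_READABLE_BIT = 1;
    static final int NOT_WRITEABLE_BIT = 1 << 1;
    static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
    static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
    static final int DISK_FULL_BIT = 1 << 4;

    // Hypothetical name for the mask that isWriteable tests against.
    static final int WRITE_BLOCKING_MASK =
        NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT;

    // Writeable only when none of the four write-blocking bits is set.
    static boolean isWriteable(int flagBits) {
        return (flagBits & WRITE_BLOCKING_MASK) == 0;
    }

    public static void main(String[] args) {
        int flags = 0;
        System.out.println(isWriteable(flags)); // true: no bits set

        flags |= NOT_READABLE_BIT;
        System.out.println(isWriteable(flags)); // true: the readable bit is not in the mask

        flags |= DISK_FULL_BIT;
        System.out.println(isWriteable(flags)); // false: disk full blocks writes

        flags &= ~NOT_WRITEABLE_BIT;            // the only bit getAndMakeWriteable clears
        System.out.println(isWriteable(flags)); // false: DISK_FULL_BIT is still set

        flags &= ~DISK_FULL_BIT;                // what getAndMakeDiskOK clears
        System.out.println(isWriteable(flags)); // true again
    }
}
```

The fourth step shows a consequence of the listing that is easy to miss: getAndMakeWriteable clears only NOT_WRITEABLE_BIT, so a store flagged as disk-full or with a queue or index error stays unwriteable until that specific bit is cleared.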

SendMessageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    //......

    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                                   RemotingCommand request, MessageExt msg,
                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                                   int queueIdInt) {
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }
        boolean sendOK = false;

        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
                sendOK = true;
                response.setCode(ResponseCode.SUCCESS);
                break;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                sendOK = true;
                break;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                sendOK = true;
                break;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                sendOK = true;
                break;

            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create mapped file failed, server is busy or broken.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }

        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
        if (sendOK) {

            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                putMessageResult.getAppendMessageResult().getWroteBytes());
            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

            response.setRemark(null);

            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            responseHeader.setQueueId(queueIdInt);
            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

            doResponse(ctx, request, response);

            if (hasSendMessageHook()) {
                sendMessageContext.setMsgId(responseHeader.getMsgId());
                sendMessageContext.setQueueId(responseHeader.getQueueId());
                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;

                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
            return null;
        } else {
            if (hasSendMessageHook()) {
                int wroteSize = request.getBody().length;
                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);

                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
        }
        return response;
    }

    //......
}
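  • handlePutMessageResult maps PutMessageStatus.SERVICE_NOT_AVAILABLE to ResponseCode.SERVICE_NOT_AVAILABLE, treats it as a failed send (sendOK stays false), and sets a remark suggesting the disk may be full or the broker machine may be short on memory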
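
The status-to-code mapping in the switch can be summarized in a standalone sketch. The class ResponseCodeMappingSketch and its toResponseCode and isSendOk helpers are hypothetical; the values 10 to 14 come from the ResponseCode listing earlier, while SUCCESS = 0 and SYSTEM_ERROR = 1 are assumed here to match RemotingSysResponseCode, which this article does not show.

```java
class ResponseCodeMappingSketch {
    // Assumed values of the inherited RemotingSysResponseCode constants (not shown in the article).
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;
    // Values taken from the ResponseCode listing.
    static final int FLUSH_DISK_TIMEOUT = 10;
    static final int SLAVE_NOT_AVAILABLE = 11;
    static final int FLUSH_SLAVE_TIMEOUT = 12;
    static final int MESSAGE_ILLEGAL = 13;
    static final int SERVICE_NOT_AVAILABLE = 14;

    // Local copy of the enum so the sketch compiles without RocketMQ on the classpath.
    enum PutMessageStatus {
        PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,
        SERVICE_NOT_AVAILABLE, CREATE_MAPEDFILE_FAILED, MESSAGE_ILLEGAL,
        PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY, UNKNOWN_ERROR
    }

    // Mirrors the response codes set in the switch of handlePutMessageResult.
    static int toResponseCode(PutMessageStatus status) {
        switch (status) {
            case PUT_OK: return SUCCESS;
            case FLUSH_DISK_TIMEOUT: return FLUSH_DISK_TIMEOUT;
            case FLUSH_SLAVE_TIMEOUT: return FLUSH_SLAVE_TIMEOUT;
            case SLAVE_NOT_AVAILABLE: return SLAVE_NOT_AVAILABLE;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED: return MESSAGE_ILLEGAL;
            case SERVICE_NOT_AVAILABLE: return SERVICE_NOT_AVAILABLE;
            default: return SYSTEM_ERROR; // mapped file failure, page cache busy, unknown
        }
    }

    // The four statuses for which the listing sets sendOK = true.
    static boolean isSendOk(PutMessageStatus status) {
        switch (status) {
            case PUT_OK:
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE: return true;
            default: return false;
        }
    }

    public static void main(String[] args) {
        for (PutMessageStatus s : PutMessageStatus.values()) {
            System.out.println(s + " -> code " + toResponseCode(s) + ", sendOK=" + isSendOk(s));
        }
    }
}
```

Note the contrast between two similar-looking names: SLAVE_NOT_AVAILABLE still counts as a successful send on the master, whereas SERVICE_NOT_AVAILABLE means the message was not stored at all.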

Summary

  • ResponseCode defines SERVICE_NOT_AVAILABLE (14), and the PutMessageStatus enum has a member of the same name; handlePutMessageResult maps PutMessageStatus.SERVICE_NOT_AVAILABLE to ResponseCode.SERVICE_NOT_AVAILABLE
  • DefaultMessageStore's putMessage returns a PutMessageResult carrying PutMessageStatus.SERVICE_NOT_AVAILABLE when shutdown is true, when the broker's role is SLAVE, or when runningFlags.isWriteable() returns false
  • RunningFlags provides isWriteable, which returns false when any of NOT_WRITEABLE_BIT, WRITE_LOGICS_QUEUE_ERROR_BIT, DISK_FULL_BIT, or WRITE_INDEX_FILE_ERROR_BIT is set; getAndMakeReadable, getAndMakeNotReadable, getAndMakeWriteable, getAndMakeNotWriteable, makeLogicsQueueError, makeIndexFileError, getAndMakeDiskFull, and getAndMakeDiskOK can all change flagBits
