聊聊rocketmq的compressMsgBodyOverHowmuch

本文主要研究一下rocketmq的compressMsgBodyOverHowmuch

compressMsgBodyOverHowmuch

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {        //......        /**       * Compress message body threshold, namely, message body larger than 4k will be compressed on default.       */      private int compressMsgBodyOverHowmuch = 1024 * 4;        public int getCompressMsgBodyOverHowmuch() {          return compressMsgBodyOverHowmuch;      }        public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) {          this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;      }        //......  }
  • DefaultMQProducer定义了compressMsgBodyOverHowmuch属性,默认值为4k

DefaultMQProducerImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

/**
 * Excerpt of the producer implementation showing how a message body is
 * compressed on the client before it is sent to the broker. Unrelated members
 * are elided.
 */
public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    // Compression level handed to UtilAll.compress; read once from the system
    // property named by MixAll.MESSAGE_COMPRESS_LEVEL, falling back to "5".
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private ExecutorService asyncSenderExecutor;

    //......

    /**
     * Sends one message to the broker that owns {@code mq}.
     * <p>
     * The method resolves the broker address, optionally compresses the body,
     * accumulates the system flags, runs the registered hooks, builds the
     * request header and finally delegates to {@code MQClientAPIImpl.sendMessage}.
     * Whatever happens, the caller's {@code msg} gets its original body back and
     * its topic passed through {@code NamespaceUtil.withoutNamespace} in the
     * {@code finally} block.
     * <p>
     * A {@code RemotingTooMuchRequestException} is thrown when the time already
     * spent in this method exceeds {@code timeout} before the request is issued.
     *
     * @param msg               the message to send; its body is temporarily replaced when compressed
     * @param mq                the target queue; supplies broker name, topic and queue id
     * @param communicationMode ASYNC, ONEWAY or SYNC; selects which sendMessage overload is used
     * @param sendCallback      passed to the asynchronous send only
     * @param topicPublishInfo  passed to the asynchronous send only
     * @param timeout           time budget in milliseconds, measured from entry into this method
     * @return the result returned by {@code MQClientAPIImpl.sendMessage}
     * @throws MQClientException    if no broker address can be found for {@code mq.getBrokerName()}
     * @throws RemotingException    rethrown after the send-message "after" hook has been run
     * @throws MQBrokerException    rethrown after the send-message "after" hook has been run
     * @throws InterruptedException rethrown after the send-message "after" hook has been run
     */
    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        if (null == brokerAddr) {
            // Address unknown: call tryToFindTopicPublishInfo for the topic, then look it up once more.
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            // Keep the uncompressed body: tryToCompressMessage overwrites msg's body
            // in place, and it must be restored before returning to the caller.
            byte[] prevBody = msg.getBody();
            try {
                // For MessageBatch, the ID has already been set while the batch was generated.
                if (!(msg instanceof MessageBatch)) {
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }

                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // When the body was compressed, tell the broker through COMPRESSED_FLAG in sysFlag.
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    // Checked second, so a delay property overrides the Trans_Msg_Half type set above.
                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                // sysFlag carries COMPRESSED_FLAG / TRANSACTION_PREPARED_TYPE to the broker.
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    // For retry-group topics, move the reconsume counters from the
                    // message properties into the request header.
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            // If the msg body was compressed, it must be reset using prevBody.
                            // Clone a new message that keeps the compressed body, then restore the original message.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            // Same idea for the topic: send the clone, and strip the namespace from the caller's message.
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        // Fail fast if the time budget is already used up; otherwise pass on what remains.
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    // ONEWAY deliberately falls through and shares the SYNC path.
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                // Each catch below records the failure on the hook context, runs the
                // "after" hook, and rethrows the exception unchanged.
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                // Always hand the caller's message back with its original body and
                // with the topic passed through NamespaceUtil.withoutNamespace.
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        // Reached only when the broker address is still null after the second lookup.
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    /**
     * Compresses the body of {@code msg} in place when it is long enough.
     * <p>
     * The body is replaced by the output of {@code UtilAll.compress} only if the
     * message is not a {@code MessageBatch}, the body is non-null, and its length
     * is greater than or equal to
     * {@code defaultMQProducer.getCompressMsgBodyOverHowmuch()}.
     *
     * @param msg the message whose body may be replaced by its compressed form
     * @return {@code true} if the body was replaced with compressed data;
     *         {@code false} if nothing was changed, including when compression
     *         returned {@code null} or threw an {@code IOException} (which is logged)
     */
    private boolean tryToCompressMessage(final Message msg) {
        if (msg instanceof MessageBatch) {
            // Batches do not support compression right now.
            return false;
        }
        byte[] body = msg.getBody();
        if (body != null) {
            if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
                try {
                    byte[] data = UtilAll.compress(body, zipCompressLevel);
                    if (data != null) {
                        msg.setBody(data);
                        return true;
                    }
                } catch (IOException e) {
                    // Best effort: log the failure and fall through to send the body uncompressed.
                    log.error("tryToCompressMessage exception", e);
                    log.warn(msg.toString());
                }
            }
        }

        return false;
    }

    //......
}
  • DefaultMQProducerImpl的sendKernelImpl方法会通过tryToCompressMessage(msg)方法来决定是否压缩msgBody(非MessageBatch且body长度大于等于compressMsgBodyOverHowmuch时才会压缩),返回true的话,会在sysFlag中设置MessageSysFlag.COMPRESSED_FLAG标志位,然后通过requestHeader传递给broker

MessageDecoder

rocketmq/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java

/**
 * Excerpt of the message codec: serializes a {@code MessageExt} into a byte
 * array and parses one back from a {@code ByteBuffer}, compressing or
 * decompressing the body when the COMPRESSED_FLAG bit of sysFlag is set.
 * Unrelated members are elided.
 */
public class MessageDecoder {
    public final static int MSG_ID_LENGTH = 8 + 8;

    public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8");
    public final static int MESSAGE_MAGIC_CODE_POSTION = 4;
    public final static int MESSAGE_FLAG_POSTION = 16;
    public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28;
    public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56;
    public final static int MESSAGE_MAGIC_CODE = -626843481;
    public static final char NAME_VALUE_SEPARATOR = 1;
    public static final char PROPERTY_SEPARATOR = 2;
    // Sums to 28, the same value as MESSAGE_PHYSIC_OFFSET_POSTION; presumably the
    // sizes of the six fields that precede PHYSICALOFFSET in encode() — TODO confirm.
    public static final int PHY_POS_POSITION =  4 + 4 + 4 + 4 + 4 + 8;
    // Total size of the 14 fixed-length fields that encode() writes before the
    // 4-byte body length.
    public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCODE
        + 4 // 3 BODYCRC
        + 4 // 4 QUEUEID
        + 4 // 5 FLAG
        + 8 // 6 QUEUEOFFSET
        + 8 // 7 PHYSICALOFFSET
        + 4 // 8 SYSFLAG
        + 8 // 9 BORNTIMESTAMP
        + 8 // 10 BORNHOST
        + 8 // 11 STORETIMESTAMP
        + 8 // 12 STOREHOSTADDRESS
        + 4 // 13 RECONSUMETIMES
        + 8; // 14 Prepared Transaction Offset

    //......

    /**
     * Serializes {@code messageExt} into a newly allocated byte array using the
     * field order numbered 1-17 in the comments below.
     * <p>
     * The body is compressed with {@code UtilAll.compress(body, 5)} only when
     * {@code needCompress} is {@code true} AND the COMPRESSED_FLAG bit is set in
     * the message's sysFlag; otherwise the body is written as is.
     *
     * @param messageExt   the message to serialize
     * @param needCompress whether a message flagged as compressed should have its body compressed here
     * @return the backing array of the buffer the fields were written into
     * @throws Exception declared by the signature; the compression call is one visible source
     */
    public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception {
        byte[] body = messageExt.getBody();
        byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8);
        // NOTE(review): the casts below narrow to byte/short, so a topic longer
        // than 127 bytes or properties longer than 32767 bytes would wrap — confirm
        // callers guarantee the limits.
        byte topicLen = (byte) topics.length;
        String properties = messageProperties2String(messageExt.getProperties());
        byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
        short propertiesLength = (short) propertiesBytes.length;
        int sysFlag = messageExt.getSysFlag();
        byte[] newBody = messageExt.getBody();
        if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
            newBody = UtilAll.compress(body, 5);
        }
        int bodyLength = newBody.length;
        int storeSize = messageExt.getStoreSize();
        ByteBuffer byteBuffer;
        if (storeSize > 0) {
            // NOTE(review): a positive storeSize is reused without being recomputed
            // from bodyLength — confirm it always matches the encoded length.
            byteBuffer = ByteBuffer.allocate(storeSize);
        } else {
            storeSize = 4 // 1 TOTALSIZE
                + 4 // 2 MAGICCODE
                + 4 // 3 BODYCRC
                + 4 // 4 QUEUEID
                + 4 // 5 FLAG
                + 8 // 6 QUEUEOFFSET
                + 8 // 7 PHYSICALOFFSET
                + 4 // 8 SYSFLAG
                + 8 // 9 BORNTIMESTAMP
                + 8 // 10 BORNHOST
                + 8 // 11 STORETIMESTAMP
                + 8 // 12 STOREHOSTADDRESS
                + 4 // 13 RECONSUMETIMES
                + 8 // 14 Prepared Transaction Offset
                + 4 + bodyLength // 15 BODY
                + 1 + topicLen // 16 TOPIC
                + 2 + propertiesLength // 17 propertiesLength
                + 0;
            byteBuffer = ByteBuffer.allocate(storeSize);
        }
        // 1 TOTALSIZE
        byteBuffer.putInt(storeSize);

        // 2 MAGICCODE
        byteBuffer.putInt(MESSAGE_MAGIC_CODE);

        // 3 BODYCRC
        int bodyCRC = messageExt.getBodyCRC();
        byteBuffer.putInt(bodyCRC);

        // 4 QUEUEID
        int queueId = messageExt.getQueueId();
        byteBuffer.putInt(queueId);

        // 5 FLAG
        int flag = messageExt.getFlag();
        byteBuffer.putInt(flag);

        // 6 QUEUEOFFSET
        long queueOffset = messageExt.getQueueOffset();
        byteBuffer.putLong(queueOffset);

        // 7 PHYSICALOFFSET
        long physicOffset = messageExt.getCommitLogOffset();
        byteBuffer.putLong(physicOffset);

        // 8 SYSFLAG
        byteBuffer.putInt(sysFlag);

        // 9 BORNTIMESTAMP
        long bornTimeStamp = messageExt.getBornTimestamp();
        byteBuffer.putLong(bornTimeStamp);

        // 10 BORNHOST (raw address bytes followed by a 4-byte port)
        InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost();
        byteBuffer.put(bornHost.getAddress().getAddress());
        byteBuffer.putInt(bornHost.getPort());

        // 11 STORETIMESTAMP
        long storeTimestamp = messageExt.getStoreTimestamp();
        byteBuffer.putLong(storeTimestamp);

        // 12 STOREHOST (raw address bytes followed by a 4-byte port)
        InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost();
        byteBuffer.put(serverHost.getAddress().getAddress());
        byteBuffer.putInt(serverHost.getPort());

        // 13 RECONSUMETIMES
        int reconsumeTimes = messageExt.getReconsumeTimes();
        byteBuffer.putInt(reconsumeTimes);

        // 14 Prepared Transaction Offset
        long preparedTransactionOffset = messageExt.getPreparedTransactionOffset();
        byteBuffer.putLong(preparedTransactionOffset);

        // 15 BODY (4-byte length, then the possibly compressed bytes)
        byteBuffer.putInt(bodyLength);
        byteBuffer.put(newBody);

        // 16 TOPIC (1-byte length, then UTF-8 bytes)
        byteBuffer.put(topicLen);
        byteBuffer.put(topics);

        // 17 properties (2-byte length, then UTF-8 bytes)
        byteBuffer.putShort(propertiesLength);
        byteBuffer.put(propertiesBytes);

        return byteBuffer.array();
    }

    /**
     * Parses one message from {@code byteBuffer}, reading the fields in the
     * order numbered 1-17 in the comments below.
     * <p>
     * The body is decompressed with {@code UtilAll.uncompress} only when
     * {@code deCompressBody} is {@code true} AND the COMPRESSED_FLAG bit is set in
     * the decoded sysFlag. The message id is built from the store host bytes and
     * the commit-log offset.
     *
     * @param byteBuffer     the buffer positioned at the start of a message
     * @param readBody       when {@code false}, the body bytes are skipped instead of read
     * @param deCompressBody whether a body flagged as compressed should be decompressed
     * @param isClient       when {@code true}, a {@code MessageClientExt} is created and its
     *                       offsetMsgId is set to the generated message id
     * @return the decoded message, or {@code null} if any exception occurred while
     *         decoding (the buffer position is then moved to its limit)
     */
    public static MessageExt decode(
        java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) {
        try {

            MessageExt msgExt;
            if (isClient) {
                msgExt = new MessageClientExt();
            } else {
                msgExt = new MessageExt();
            }

            // 1 TOTALSIZE
            int storeSize = byteBuffer.getInt();
            msgExt.setStoreSize(storeSize);

            // 2 MAGICCODE (read and discarded)
            byteBuffer.getInt();

            // 3 BODYCRC
            int bodyCRC = byteBuffer.getInt();
            msgExt.setBodyCRC(bodyCRC);

            // 4 QUEUEID
            int queueId = byteBuffer.getInt();
            msgExt.setQueueId(queueId);

            // 5 FLAG
            int flag = byteBuffer.getInt();
            msgExt.setFlag(flag);

            // 6 QUEUEOFFSET
            long queueOffset = byteBuffer.getLong();
            msgExt.setQueueOffset(queueOffset);

            // 7 PHYSICALOFFSET
            long physicOffset = byteBuffer.getLong();
            msgExt.setCommitLogOffset(physicOffset);

            // 8 SYSFLAG
            int sysFlag = byteBuffer.getInt();
            msgExt.setSysFlag(sysFlag);

            // 9 BORNTIMESTAMP
            long bornTimeStamp = byteBuffer.getLong();
            msgExt.setBornTimestamp(bornTimeStamp);

            // 10 BORNHOST (4 address bytes followed by a 4-byte port)
            byte[] bornHost = new byte[4];
            byteBuffer.get(bornHost, 0, 4);
            int port = byteBuffer.getInt();
            msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port));

            // 11 STORETIMESTAMP
            long storeTimestamp = byteBuffer.getLong();
            msgExt.setStoreTimestamp(storeTimestamp);

            // 12 STOREHOST (4 address bytes followed by a 4-byte port)
            byte[] storeHost = new byte[4];
            byteBuffer.get(storeHost, 0, 4);
            port = byteBuffer.getInt();
            msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port));

            // 13 RECONSUMETIMES
            int reconsumeTimes = byteBuffer.getInt();
            msgExt.setReconsumeTimes(reconsumeTimes);

            // 14 Prepared Transaction Offset
            long preparedTransactionOffset = byteBuffer.getLong();
            msgExt.setPreparedTransactionOffset(preparedTransactionOffset);

            // 15 BODY
            int bodyLen = byteBuffer.getInt();
            if (bodyLen > 0) {
                if (readBody) {
                    byte[] body = new byte[bodyLen];
                    byteBuffer.get(body);

                    // uncompress body
                    if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
                        body = UtilAll.uncompress(body);
                    }

                    msgExt.setBody(body);
                } else {
                    // Body not wanted: skip over it so the following fields can still be read.
                    byteBuffer.position(byteBuffer.position() + bodyLen);
                }
            }

            // 16 TOPIC
            byte topicLen = byteBuffer.get();
            byte[] topic = new byte[(int) topicLen];
            byteBuffer.get(topic);
            msgExt.setTopic(new String(topic, CHARSET_UTF8));

            // 17 properties
            short propertiesLength = byteBuffer.getShort();
            if (propertiesLength > 0) {
                byte[] properties = new byte[propertiesLength];
                byteBuffer.get(properties);
                String propertiesString = new String(properties, CHARSET_UTF8);
                Map<String, String> map = string2messageProperties(propertiesString);
                msgExt.setProperties(map);
            }

            ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH);
            String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset());
            msgExt.setMsgId(msgId);

            if (isClient) {
                ((MessageClientExt) msgExt).setOffsetMsgId(msgId);
            }

            return msgExt;
        } catch (Exception e) {
            // Any failure is swallowed: the buffer is moved to its limit and null is returned below.
            byteBuffer.position(byteBuffer.limit());
        }

        return null;
    }

    //......
}
  • MessageDecoder的encode方法在needCompress为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.compress(body, 5)压缩body;decode方法在deCompressBody为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.uncompress(body)解压body

小结

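DefaultMQProducerImpl的sendKernelImpl方法会通过tryToCompressMessage(msg)方法来决定是否压缩msgBody(非MessageBatch且body长度大于等于compressMsgBodyOverHowmuch时才会压缩),返回true的话,会在sysFlag中设置MessageSysFlag.COMPRESSED_FLAG标志位,然后通过requestHeader传递给broker;MessageDecoder的encode方法在needCompress为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.compress(body, 5)压缩body;decode方法在deCompressBody为true且sysFlag带有COMPRESSED_FLAG标志时,会执行UtilAll.uncompress(body)解压body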

doc

  • DefaultMQProducer