聊聊rocketmq的compressMsgBodyOverHowmuch
- 2019 年 11 月 9 日
- 筆記
- javascript, sync
序
本文主要研究一下rocketmq的compressMsgBodyOverHowmuch
compressMsgBodyOverHowmuch
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java
public class DefaultMQProducer extends ClientConfig implements MQProducer { //...... /** * Compress message body threshold, namely, message body larger than 4k will be compressed on default. */ private int compressMsgBodyOverHowmuch = 1024 * 4; public int getCompressMsgBodyOverHowmuch() { return compressMsgBodyOverHowmuch; } public void setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch) { this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch; } //...... }
- DefaultMQProducer定义了compressMsgBodyOverHowmuch属性,默认值为4k
DefaultMQProducerImpl
rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); private final Random random = new Random(); private final DefaultMQProducer defaultMQProducer; private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>(); private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final RPCHook rpcHook; protected BlockingQueue<Runnable> checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; private MQClientInstance mQClientFactory; private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>(); private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5")); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; private ExecutorService asyncSenderExecutor; //...... private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } private boolean tryToCompressMessage(final Message msg) { if (msg instanceof MessageBatch) { //batch dose not support compressing right now return false; } byte[] body = msg.getBody(); if (body != null) { if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) { try { byte[] data = UtilAll.compress(body, zipCompressLevel); if (data != null) { msg.setBody(data); return true; } } catch (IOException e) { log.error("tryToCompressMessage exception", e); log.warn(msg.toString()); } } } return false; } //...... }
- DefaultMQProducerImpl的sendKernelImpl方法会通过tryToCompressMessage(msg)方法来决定是否压缩msgBody,返回true的话,会设置sysFlag,然后通过requestHeader传递给broker
MessageDecoder
rocketmq/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
public class MessageDecoder { public final static int MSG_ID_LENGTH = 8 + 8; public final static Charset CHARSET_UTF8 = Charset.forName("UTF-8"); public final static int MESSAGE_MAGIC_CODE_POSTION = 4; public final static int MESSAGE_FLAG_POSTION = 16; public final static int MESSAGE_PHYSIC_OFFSET_POSTION = 28; public final static int MESSAGE_STORE_TIMESTAMP_POSTION = 56; public final static int MESSAGE_MAGIC_CODE = -626843481; public static final char NAME_VALUE_SEPARATOR = 1; public static final char PROPERTY_SEPARATOR = 2; public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8; public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE + 4 // 2 MAGICCODE + 4 // 3 BODYCRC + 4 // 4 QUEUEID + 4 // 5 FLAG + 8 // 6 QUEUEOFFSET + 8 // 7 PHYSICALOFFSET + 4 // 8 SYSFLAG + 8 // 9 BORNTIMESTAMP + 8 // 10 BORNHOST + 8 // 11 STORETIMESTAMP + 8 // 12 STOREHOSTADDRESS + 4 // 13 RECONSUMETIMES + 8; // 14 Prepared Transaction Offset //...... public static byte[] encode(MessageExt messageExt, boolean needCompress) throws Exception { byte[] body = messageExt.getBody(); byte[] topics = messageExt.getTopic().getBytes(CHARSET_UTF8); byte topicLen = (byte) topics.length; String properties = messageProperties2String(messageExt.getProperties()); byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8); short propertiesLength = (short) propertiesBytes.length; int sysFlag = messageExt.getSysFlag(); byte[] newBody = messageExt.getBody(); if (needCompress && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { newBody = UtilAll.compress(body, 5); } int bodyLength = newBody.length; int storeSize = messageExt.getStoreSize(); ByteBuffer byteBuffer; if (storeSize > 0) { byteBuffer = ByteBuffer.allocate(storeSize); } else { storeSize = 4 // 1 TOTALSIZE + 4 // 2 MAGICCODE + 4 // 3 BODYCRC + 4 // 4 QUEUEID + 4 // 5 FLAG + 8 // 6 QUEUEOFFSET + 8 // 7 PHYSICALOFFSET + 4 // 8 SYSFLAG + 8 // 9 BORNTIMESTAMP + 8 // 10 BORNHOST + 8 // 11 STORETIMESTAMP + 8 // 12 STOREHOSTADDRESS + 4 // 13 RECONSUMETIMES + 8 // 14 Prepared Transaction Offset + 4 + bodyLength // 14 BODY + 1 + topicLen // 15 TOPIC + 2 + propertiesLength // 16 propertiesLength + 0; byteBuffer = ByteBuffer.allocate(storeSize); } // 1 TOTALSIZE byteBuffer.putInt(storeSize); // 2 MAGICCODE byteBuffer.putInt(MESSAGE_MAGIC_CODE); // 3 BODYCRC int bodyCRC = messageExt.getBodyCRC(); byteBuffer.putInt(bodyCRC); // 4 QUEUEID int queueId = messageExt.getQueueId(); byteBuffer.putInt(queueId); // 5 FLAG int flag = messageExt.getFlag(); byteBuffer.putInt(flag); // 6 QUEUEOFFSET long queueOffset = messageExt.getQueueOffset(); byteBuffer.putLong(queueOffset); // 7 PHYSICALOFFSET long physicOffset = messageExt.getCommitLogOffset(); byteBuffer.putLong(physicOffset); // 8 SYSFLAG byteBuffer.putInt(sysFlag); // 9 BORNTIMESTAMP long bornTimeStamp = messageExt.getBornTimestamp(); byteBuffer.putLong(bornTimeStamp); // 10 BORNHOST InetSocketAddress bornHost = (InetSocketAddress) messageExt.getBornHost(); byteBuffer.put(bornHost.getAddress().getAddress()); byteBuffer.putInt(bornHost.getPort()); // 11 STORETIMESTAMP long storeTimestamp = messageExt.getStoreTimestamp(); byteBuffer.putLong(storeTimestamp); // 12 STOREHOST InetSocketAddress serverHost = (InetSocketAddress) messageExt.getStoreHost(); byteBuffer.put(serverHost.getAddress().getAddress()); byteBuffer.putInt(serverHost.getPort()); // 13 RECONSUMETIMES int reconsumeTimes = messageExt.getReconsumeTimes(); byteBuffer.putInt(reconsumeTimes); // 14 Prepared Transaction Offset long preparedTransactionOffset = messageExt.getPreparedTransactionOffset(); byteBuffer.putLong(preparedTransactionOffset); // 15 BODY byteBuffer.putInt(bodyLength); byteBuffer.put(newBody); // 16 TOPIC byteBuffer.put(topicLen); byteBuffer.put(topics); // 17 properties byteBuffer.putShort(propertiesLength); byteBuffer.put(propertiesBytes); return byteBuffer.array(); } public static MessageExt decode( java.nio.ByteBuffer byteBuffer, final boolean readBody, final boolean deCompressBody, final boolean isClient) { try { MessageExt msgExt; if (isClient) { msgExt = new MessageClientExt(); } else { msgExt = new MessageExt(); } // 1 TOTALSIZE int storeSize = byteBuffer.getInt(); msgExt.setStoreSize(storeSize); // 2 MAGICCODE byteBuffer.getInt(); // 3 BODYCRC int bodyCRC = byteBuffer.getInt(); msgExt.setBodyCRC(bodyCRC); // 4 QUEUEID int queueId = byteBuffer.getInt(); msgExt.setQueueId(queueId); // 5 FLAG int flag = byteBuffer.getInt(); msgExt.setFlag(flag); // 6 QUEUEOFFSET long queueOffset = byteBuffer.getLong(); msgExt.setQueueOffset(queueOffset); // 7 PHYSICALOFFSET long physicOffset = byteBuffer.getLong(); msgExt.setCommitLogOffset(physicOffset); // 8 SYSFLAG int sysFlag = byteBuffer.getInt(); msgExt.setSysFlag(sysFlag); // 9 BORNTIMESTAMP long bornTimeStamp = byteBuffer.getLong(); msgExt.setBornTimestamp(bornTimeStamp); // 10 BORNHOST byte[] bornHost = new byte[4]; byteBuffer.get(bornHost, 0, 4); int port = byteBuffer.getInt(); msgExt.setBornHost(new InetSocketAddress(InetAddress.getByAddress(bornHost), port)); // 11 STORETIMESTAMP long storeTimestamp = byteBuffer.getLong(); msgExt.setStoreTimestamp(storeTimestamp); // 12 STOREHOST byte[] storeHost = new byte[4]; byteBuffer.get(storeHost, 0, 4); port = byteBuffer.getInt(); msgExt.setStoreHost(new InetSocketAddress(InetAddress.getByAddress(storeHost), port)); // 13 RECONSUMETIMES int reconsumeTimes = byteBuffer.getInt(); msgExt.setReconsumeTimes(reconsumeTimes); // 14 Prepared Transaction Offset long preparedTransactionOffset = byteBuffer.getLong(); msgExt.setPreparedTransactionOffset(preparedTransactionOffset); // 15 BODY int bodyLen = byteBuffer.getInt(); if (bodyLen > 0) { if (readBody) { byte[] body = new byte[bodyLen]; byteBuffer.get(body); // uncompress body if (deCompressBody && (sysFlag & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) { body = UtilAll.uncompress(body); } msgExt.setBody(body); } else { byteBuffer.position(byteBuffer.position() + bodyLen); } } // 16 TOPIC byte topicLen = byteBuffer.get(); byte[] topic = new byte[(int) topicLen]; byteBuffer.get(topic); msgExt.setTopic(new String(topic, CHARSET_UTF8)); // 17 properties short propertiesLength = byteBuffer.getShort(); if (propertiesLength > 0) { byte[] properties = new byte[propertiesLength]; byteBuffer.get(properties); String propertiesString = new String(properties, CHARSET_UTF8); Map<String, String> map = string2messageProperties(propertiesString); msgExt.setProperties(map); } ByteBuffer byteBufferMsgId = ByteBuffer.allocate(MSG_ID_LENGTH); String msgId = createMessageId(byteBufferMsgId, msgExt.getStoreHostBytes(), msgExt.getCommitLogOffset()); msgExt.setMsgId(msgId); if (isClient) { ((MessageClientExt) msgExt).setOffsetMsgId(msgId); } return msgExt; } catch (Exception e) { byteBuffer.position(byteBuffer.limit()); } return null; } //...... }
- MessageDecoder的encode会根据sysFlag判断是否需要压缩,是的话执行UtilAll.compress(body, 5);decode方法会根据根据sysFlag判断是否需要解压缩,是的话执行UtilAll.uncompress(body)
小结
DefaultMQProducerImpl的sendKernelImpl方法会通过tryToCompressMessage(msg)方法来决定是否压缩msgBody,返回true的话,会设置sysFlag,然后通过requestHeader传递给broker;MessageDecoder的encode会根据sysFlag判断是否需要压缩,是的话执行UtilAll.compress(body, 5);decode方法会根据根据sysFlag判断是否需要解压缩,是的话执行UtilAll.uncompress(body)
doc
- DefaultMQProducer