A Look at RocketMQ's ScheduleMessageService

  • December 24, 2019
  • Notes

This article takes a look at RocketMQ's ScheduleMessageService.

ScheduleMessageService

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java

public class ScheduleMessageService extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    private static final long FIRST_DELAY_TIME = 1000L;
    private static final long DELAY_FOR_A_WHILE = 100L;
    private static final long DELAY_FOR_A_PERIOD = 10000L;

    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);
    private final DefaultMessageStore defaultMessageStore;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Timer timer;
    private MessageStore writeMessageStore;
    private int maxDelayLevel;

    public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
        this.defaultMessageStore = defaultMessageStore;
        this.writeMessageStore = defaultMessageStore;
    }

    public static int queueId2DelayLevel(final int queueId) {
        return queueId + 1;
    }

    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }

    //......

    private void updateOffset(int delayLevel, long offset) {
        this.offsetTable.put(delayLevel, offset);
    }

    public long computeDeliverTimestamp(final int delayLevel, final long storeTimestamp) {
        Long time = this.delayLevelTable.get(delayLevel);
        if (time != null) {
            return time + storeTimestamp;
        }

        return storeTimestamp + 1000;
    }

    public void start() {
        if (started.compareAndSet(false, true)) {
            this.timer = new Timer("ScheduleMessageTimerThread", true);
            for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level);
                if (null == offset) {
                    offset = 0L;
                }

                if (timeDelay != null) {
                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            }

            this.timer.scheduleAtFixedRate(new TimerTask() {

                @Override
                public void run() {
                    try {
                        if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    }

    public void shutdown() {
        if (this.started.compareAndSet(true, false)) {
            if (null != this.timer)
                this.timer.cancel();
        }

    }

    //......

    public boolean load() {
        boolean result = super.load();
        result = result && this.parseDelayLevel();
        return result;
    }

    @Override
    public String configFilePath() {
        return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
            .getStorePathRootDir());
    }

    @Override
    public void decode(String jsonString) {
        if (jsonString != null) {
            DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
                DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
            if (delayOffsetSerializeWrapper != null) {
                this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
            }
        }
    }

    public String encode(final boolean prettyFormat) {
        DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();
        delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);
        return delayOffsetSerializeWrapper.toJson(prettyFormat);
    }

    public boolean parseDelayLevel() {
        HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
        timeUnitTable.put("s", 1000L);
        timeUnitTable.put("m", 1000L * 60);
        timeUnitTable.put("h", 1000L * 60 * 60);
        timeUnitTable.put("d", 1000L * 60 * 60 * 24);

        String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
        try {
            String[] levelArray = levelString.split(" ");
            for (int i = 0; i < levelArray.length; i++) {
                String value = levelArray[i];
                String ch = value.substring(value.length() - 1);
                Long tu = timeUnitTable.get(ch);

                int level = i + 1;
                if (level > this.maxDelayLevel) {
                    this.maxDelayLevel = level;
                }
                long num = Long.parseLong(value.substring(0, value.length() - 1));
                long delayTimeMillis = tu * num;
                this.delayLevelTable.put(level, delayTimeMillis);
            }
        } catch (Exception e) {
            log.error("parseDelayLevel exception", e);
            log.info("levelString String = {}", levelString);
            return false;
        }

        return true;
    }

    //......
}
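  • ScheduleMessageService extends ConfigManager. It defines delayLevelTable, whose key is the delay level and whose value is the delay in milliseconds, and offsetTable, which records the consume queue offset reached for each level. parseDelayLevel fills delayLevelTable from the messageDelayLevel setting, a space-separated list of values with an s, m, h, or d suffix, and numbers the levels from 1. The start method schedules one DeliverDelayedMessageTimerTask per delay level after FIRST_DELAY_TIME (1000 ms), then registers a second, fixed-rate task that, after an initial delay of 10000 ms, calls persist (ConfigManager.persist) every flushDelayOffsetInterval.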
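The level parsing and delivery-time arithmetic described above can be tried out in isolation. The following is a minimal sketch, not RocketMQ code: the class name DelayLevelSketch and its static helpers are hypothetical, and unlike parseDelayLevel, which logs and returns false on any parse failure, it throws on an unknown unit.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; mirrors the logic shown above but is not part of RocketMQ.
class DelayLevelSketch {

    // Parses a spec such as "1s 5s 10s" into level -> delay in milliseconds (levels start at 1).
    static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 1000L * 60);
        unitMillis.put("h", 1000L * 60 * 60);
        unitMillis.put("d", 1000L * 60 * 60 * 24);

        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] levels = levelString.trim().split(" ");
        for (int i = 0; i < levels.length; i++) {
            String value = levels[i];
            String unit = value.substring(value.length() - 1);
            Long factor = unitMillis.get(unit);
            if (factor == null) {
                // Assumption of this sketch: fail loudly instead of returning false.
                throw new IllegalArgumentException("unknown time unit in: " + value);
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            table.put(i + 1, factor * num);
        }
        return table;
    }

    // Store time plus the level's delay; falls back to +1000 ms for an unknown level.
    static long computeDeliverTimestamp(Map<Integer, Long> table, int delayLevel, long storeTimestamp) {
        Long delay = table.get(delayLevel);
        return delay != null ? storeTimestamp + delay : storeTimestamp + 1000;
    }

    // Each delay level maps to one queue of the schedule topic: queueId = level - 1.
    static int delayLevel2QueueId(int delayLevel) {
        return delayLevel - 1;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = parse("1s 5s 10s 30s 1m");
        System.out.println("delay table: " + table);
        System.out.println("level 3 deliver time: " + computeDeliverTimestamp(table, 3, 1_000_000L));
        System.out.println("level 3 queue id: " + delayLevel2QueueId(3));
    }
}
```

Because a level is just a position in the configured list, changing the messageDelayLevel string changes what every existing level number means, which is worth keeping in mind when editing that setting.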

DeliverDelayedMessageTimerTask

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java

    class DeliverDelayedMessageTimerTask extends TimerTask {
        private final int delayLevel;
        private final long offset;

        public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
            this.delayLevel = delayLevel;
            this.offset = offset;
        }

        @Override
        public void run() {
            try {
                if (isStarted()) {
                    this.executeOnTimeup();
                }
            } catch (Exception e) {
                // XXX: warn and notify me
                log.error("ScheduleMessageService, executeOnTimeup exception", e);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
            }
        }

        /**
         * @return
         */
        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

            long result = deliverTimestamp;

            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
            if (deliverTimestamp > maxTimestamp) {
                result = now;
            }

            return result;
        }

        public void executeOnTimeup() {
            ConsumeQueue cq =
                ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                    delayLevel2QueueId(delayLevel));

            long failScheduleOffset = offset;

            if (cq != null) {
                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                if (bufferCQ != null) {
                    try {
                        long nextOffset = offset;
                        int i = 0;
                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            long offsetPy = bufferCQ.getByteBuffer().getLong();
                            int sizePy = bufferCQ.getByteBuffer().getInt();
                            long tagsCode = bufferCQ.getByteBuffer().getLong();

                            if (cq.isExtAddr(tagsCode)) {
                                if (cq.getExt(tagsCode, cqExtUnit)) {
                                    tagsCode = cqExtUnit.getTagsCode();
                                } else {
                                    //can't find ext content.So re compute tags code.
                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                        tagsCode, offsetPy, sizePy);
                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                }
                            }

                            long now = System.currentTimeMillis();
                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                            long countdown = deliverTimestamp - now;

                            if (countdown <= 0) {
                                MessageExt msgExt =
                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                        offsetPy, sizePy);

                                if (msgExt != null) {
                                    try {
                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                        PutMessageResult putMessageResult =
                                            ScheduleMessageService.this.writeMessageStore
                                                .putMessage(msgInner);

                                        if (putMessageResult != null
                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                            continue;
                                        } else {
                                            // XXX: warn and notify me
                                            log.error(
                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                                msgExt.getTopic(), msgExt.getMsgId());
                                            ScheduleMessageService.this.timer.schedule(
                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                    nextOffset), DELAY_FOR_A_PERIOD);
                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                nextOffset);
                                            return;
                                        }
                                    } catch (Exception e) {
                                        /*
                                         * XXX: warn and notify me
                                         */
                                        log.error(
                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                + offsetPy + ",sizePy=" + sizePy, e);
                                    }
                                }
                            } else {
                                ScheduleMessageService.this.timer.schedule(
                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                    countdown);
                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                return;
                            }
                        } // end of for

                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    } finally {

                        bufferCQ.release();
                    }
                } // end of if (bufferCQ != null)
                else {

                    long cqMinOffset = cq.getMinOffsetInQueue();
                    if (offset < cqMinOffset) {
                        failScheduleOffset = cqMinOffset;
                        log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                            + cqMinOffset + ", queueId=" + cq.getQueueId());
                    }
                }
            } // end of if (cq != null)

            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                failScheduleOffset), DELAY_FOR_A_WHILE);
        }

        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());

            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
            long tagsCodeValue =
                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
            msgInner.setTagsCode(tagsCodeValue);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());

            msgInner.setWaitStoreMsgOK(false);
            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);

            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));

            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
            int queueId = Integer.parseInt(queueIdStr);
            msgInner.setQueueId(queueId);

            return msgInner;
        }
    }
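  • DeliverDelayedMessageTimerTask extends TimerTask. Its run method calls executeOnTimeup when the service is started; if that throws, it logs the error and reschedules itself with timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD), that is, after 10 seconds.
  • executeOnTimeup first looks up the ConsumeQueue via defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)), then obtains a SelectMappedBufferResult starting at the task's offset and iterates over its entries. For each entry it reads tagsCode, which for this topic carries the scheduled delivery time, and passes it through correctDeliverTimestamp to get deliverTimestamp; a value later than now plus the level's delay is replaced by now.
  • If deliverTimestamp is less than or equal to the current time, it builds a MessageExtBrokerInner via messageTimeup, which restores the real topic and queue id from the message properties, and calls writeMessageStore.putMessage(msgInner). If the put does not succeed, it reschedules a DeliverDelayedMessageTimerTask at nextOffset after DELAY_FOR_A_PERIOD and returns. If deliverTimestamp is later than the current time, it reschedules the task after the remaining countdown and returns. Once every entry in the buffer has been handled, it reschedules the task from the following offset after DELAY_FOR_A_WHILE (100 ms).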
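The decision made for each consume queue entry can be modelled without a broker. The sketch below is a simplified illustration under stated assumptions: DeliverScanSketch, Result, and scan are hypothetical names, an array of timestamps stands in for the consume queue buffer, adding to a list stands in for putMessage, and the put-failure branch is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified model of one executeOnTimeup pass; not RocketMQ code.
class DeliverScanSketch {

    static final long DELAY_FOR_A_WHILE = 100L;

    // Outcome of one pass: offsets delivered, where the next task starts, and how long it waits.
    static final class Result {
        final long nextOffset;
        final long delayMillis;
        final List<Long> delivered;

        Result(long nextOffset, long delayMillis, List<Long> delivered) {
            this.nextOffset = nextOffset;
            this.delayMillis = delayMillis;
            this.delivered = delivered;
        }
    }

    // A deliver time further away than one full level delay is treated as due now.
    static long correctDeliverTimestamp(long now, long deliverTimestamp, long levelDelayMillis) {
        long maxTimestamp = now + levelDelayMillis;
        return deliverTimestamp > maxTimestamp ? now : deliverTimestamp;
    }

    // deliverTimestamps[i] plays the role of the tagsCode stored in entry (offset + i).
    static Result scan(long offset, long[] deliverTimestamps, long now, long levelDelayMillis) {
        List<Long> delivered = new ArrayList<>();
        for (int i = 0; i < deliverTimestamps.length; i++) {
            long deliverAt = correctDeliverTimestamp(now, deliverTimestamps[i], levelDelayMillis);
            long countdown = deliverAt - now;
            if (countdown > 0) {
                // Not due yet: stop here and come back at this entry once the countdown has elapsed.
                return new Result(offset + i, countdown, delivered);
            }
            // Due: stand-in for writing the message back to its real topic.
            delivered.add(offset + i);
        }
        // Buffer exhausted: poll again shortly from the entry after the last one.
        return new Result(offset + deliverTimestamps.length, DELAY_FOR_A_WHILE, delivered);
    }

    public static void main(String[] args) {
        long now = 10_000L;
        Result r = scan(100L, new long[] {9_000L, 10_000L, 12_000L, 13_000L}, now, 5_000L);
        System.out.println("delivered=" + r.delivered
            + " nextOffset=" + r.nextOffset + " delayMillis=" + r.delayMillis);
    }
}
```

Stopping at the first entry that is not yet due relies on entries within one delay level being appended in store order with the same delay, so their delivery times are expected to be non-decreasing.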

Summary

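ScheduleMessageService extends ConfigManager. It defines delayLevelTable, whose key is the delay level and whose value is the delay in milliseconds. Its start method first schedules a DeliverDelayedMessageTimerTask for each level after FIRST_DELAY_TIME, then registers another timer task that calls persist (ConfigManager.persist) every flushDelayOffsetInterval.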
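The scheduling skeleton summarized here, one self-rescheduling one-shot task per level plus a fixed-rate flush task on the same java.util.Timer, can be sketched with the JDK alone. Everything in the sketch is hypothetical (TimerSkeletonSketch, LevelTask, the counter standing in for persist, the fixed 10 ms re-arm delay); it only illustrates the pattern, not the broker's behaviour.

```java
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the timer pattern; not RocketMQ code.
class TimerSkeletonSketch {
    private final Map<Integer, Long> offsetTable = new ConcurrentHashMap<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger persistCount = new AtomicInteger();
    private final CountDownLatch done;
    private final long targetOffset;
    private Timer timer;

    TimerSkeletonSketch(int levels, long targetOffset) {
        this.done = new CountDownLatch(levels);
        this.targetOffset = targetOffset;
        for (int level = 1; level <= levels; level++) {
            offsetTable.put(level, 0L);
        }
    }

    void start(long firstDelayMillis, long persistIntervalMillis) {
        if (!started.compareAndSet(false, true)) {
            return; // already started
        }
        timer = new Timer("SketchTimerThread", true);
        // One one-shot task per level, resuming from the recorded offset.
        for (Map.Entry<Integer, Long> e : offsetTable.entrySet()) {
            timer.schedule(new LevelTask(e.getKey(), e.getValue()), firstDelayMillis);
        }
        // Fixed-rate task standing in for the periodic persist() call.
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (started.get()) {
                    persistCount.incrementAndGet();
                }
            }
        }, persistIntervalMillis, persistIntervalMillis);
    }

    void shutdown() {
        if (started.compareAndSet(true, false) && timer != null) {
            timer.cancel();
        }
    }

    // Waits until every level has reached targetOffset; returns false on timeout or interrupt.
    boolean awaitDone(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    long offsetOf(int level) {
        return offsetTable.get(level);
    }

    int persistCount() {
        return persistCount.get();
    }

    private final class LevelTask extends TimerTask {
        private final int level;
        private final long offset;

        LevelTask(int level, long offset) {
            this.level = level;
            this.offset = offset;
        }

        @Override
        public void run() {
            if (!started.get()) {
                return;
            }
            long next = offset + 1; // pretend exactly one entry was processed
            offsetTable.put(level, next);
            if (next < targetOffset) {
                // A TimerTask instance cannot be scheduled twice, so a fresh one carries the new offset.
                timer.schedule(new LevelTask(level, next), 10L);
            } else {
                done.countDown();
            }
        }
    }

    public static void main(String[] args) {
        TimerSkeletonSketch sketch = new TimerSkeletonSketch(2, 3L);
        sketch.start(10L, 5L);
        System.out.println("finished: " + sketch.awaitDone(5_000L));
        sketch.shutdown();
        System.out.println("offset of level 1: " + sketch.offsetOf(1));
    }
}
```

A single Timer runs all of its tasks on one thread, so in this pattern a slow task for one level delays both the other levels and the flush task.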

doc

  • ScheduleMessageService