聊聊rocketmq的ConsumeFromWhere

  • 2019 年 11 月 28 日
  • 笔记

本文主要研究一下rocketmq的ConsumeFromWhere

ConsumeFromWhere

rocketmq-common-4.5.2-sources.jar!/org/apache/rocketmq/common/consumer/ConsumeFromWhere.java

public enum ConsumeFromWhere {      CONSUME_FROM_LAST_OFFSET,        @Deprecated      CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,      @Deprecated      CONSUME_FROM_MIN_OFFSET,      @Deprecated      CONSUME_FROM_MAX_OFFSET,      CONSUME_FROM_FIRST_OFFSET,      CONSUME_FROM_TIMESTAMP,  }
  • ConsumeFromWhere定义了CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_TIMESTAMP枚举值

computePullFromWhere

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java

public class RebalancePushImpl extends RebalanceImpl {    //......        @Override      public long computePullFromWhere(MessageQueue mq) {          long result = -1;          final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();          final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();          switch (consumeFromWhere) {              case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:              case CONSUME_FROM_MIN_OFFSET:              case CONSUME_FROM_MAX_OFFSET:              case CONSUME_FROM_LAST_OFFSET: {                  long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);                  if (lastOffset >= 0) {                      result = lastOffset;                  }                  // First start,no offset                  else if (-1 == lastOffset) {                      if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                          result = 0L;                      } else {                          try {                              result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);                          } catch (MQClientException e) {                              result = -1;                          }                      }                  } else {                      result = -1;                  }                  break;              }              case CONSUME_FROM_FIRST_OFFSET: {                  long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);                  if (lastOffset >= 0) {                      result = lastOffset;                  } else if (-1 == lastOffset) {                      result = 0L;                  } else {                      result = -1;                  }                  break;              }              case CONSUME_FROM_TIMESTAMP: {                  long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);                  if (lastOffset >= 0) {                      result = lastOffset;                  } else if (-1 == lastOffset) {                      if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                          try {                              result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);                          } catch (MQClientException e) {                              result = -1;                          }                      } else {                          try {                              long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),                                  UtilAll.YYYYMMDDHHMMSS).getTime();                              result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);                          } catch (MQClientException e) {                              result = -1;                          }                      }                  } else {                      result = -1;                  }                  break;              }                default:                  break;          }            return result;      }        //......  }
  • RebalancePushImpl的computePullFromWhere会判断defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
  • 对于CONSUME_FROM_LAST_OFFSET,若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值,若lastOffset为-1,则在mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)时更新result为0,否则更新result为mQClientFactory.getMQAdminImpl().maxOffset(mq)
  • 对于CONSUME_FROM_FIRST_OFFSET,若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值,若lastOffset为-1,则更新result为0;对于CONSUME_FROM_TIMESTAMP,若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值;若lastOffset为-1,则对于mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)取mQClientFactory.getMQAdminImpl().maxOffset(mq),否则取defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp()去搜索QClientFactory.getMQAdminImpl().searchOffset,将返回值更新到result

小结

  • ConsumeFromWhere定义了CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、CONSUME_FROM_TIMESTAMP枚举值;RebalancePushImpl的computePullFromWhere会判断defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere()
  • 若offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)大于等于0,则更新result为该值;对于lastOffset为-1且mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX),CONSUME_FROM_LAST_OFFSET取0,CONSUME_FROM_TIMESTAMP取mQClientFactory.getMQAdminImpl().maxOffset(mq)
  • 对于lastOffset为-1但是非q.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)的情况,CONSUME_FROM_LAST_OFFSET取mQClientFactory.getMQAdminImpl().maxOffset(mq),CONSUME_FROM_TIMESTAMP取mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp)

doc

  • ConsumeFromWhere