聊聊rocketmq的AllocateMessageQueueAveragely

  • 2019 年 11 月 30 日
  • 筆記

本文主要研究一下rocketmq的AllocateMessageQueueAveragely

AllocateMessageQueueStrategy

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java

public interface AllocateMessageQueueStrategy {        /**       * Allocating by consumer id       *       * @param consumerGroup current consumer group       * @param currentCID current consumer id       * @param mqAll message queue set in current topic       * @param cidAll consumer set in current consumer group       * @return The allocate result of given strategy       */      List<MessageQueue> allocate(          final String consumerGroup,          final String currentCID,          final List<MessageQueue> mqAll,          final List<String> cidAll      );        /**       * Algorithm name       *       * @return The strategy name       */      String getName();  }
  • AllocateMessageQueueStrategy定义了allocate、getName方法

AllocateMessageQueueAveragely

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {      private final InternalLogger log = ClientLogger.getLog();        @Override      public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,          List<String> cidAll) {          if (currentCID == null || currentCID.length() < 1) {              throw new IllegalArgumentException("currentCID is empty");          }          if (mqAll == null || mqAll.isEmpty()) {              throw new IllegalArgumentException("mqAll is null or mqAll empty");          }          if (cidAll == null || cidAll.isEmpty()) {              throw new IllegalArgumentException("cidAll is null or cidAll empty");          }            List<MessageQueue> result = new ArrayList<MessageQueue>();          if (!cidAll.contains(currentCID)) {              log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",                  consumerGroup,                  currentCID,                  cidAll);              return result;          }            int index = cidAll.indexOf(currentCID);          int mod = mqAll.size() % cidAll.size();          int averageSize =              mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()                  + 1 : mqAll.size() / cidAll.size());          int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;          int range = Math.min(averageSize, mqAll.size() - startIndex);          for (int i = 0; i < range; i++) {              result.add(mqAll.get((startIndex + i) % mqAll.size()));          }          return result;      }        @Override      public String getName() {          return "AVG";      }  }
  • AllocateMessageQueueAveragely实现了AllocateMessageQueueStrategy接口,其getName返还AVG,其allocate方法首先计算index(cidAll.indexOf(currentCID)),然后计算mod(mqAll.size() % cidAll.size())
  • 之后计算averageSize,若mqAll.size()小于等于cidAll.size()则取1,否则取mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size()
  • 最后计算startIndex((mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod)及range(Math.min(averageSize, mqAll.size() - startIndex));之后按range进行循环,将下标为(startIndex + i) % mqAll.size()的消息添加到result

小结

AllocateMessageQueueAveragely实现了AllocateMessageQueueStrategy接口,其getName返还AVG,其allocate方法按range进行循环,将下标为(startIndex + i) % mqAll.size()的消息添加到result