RocketMQ-2.RocketMQ的负载均衡

  • 2020 年 3 月 15 日
  • 笔记

RocketMQ的负载均衡

producer对MessageQueue的负载均衡


通过调试代码可以知道,所谓的MessageQueue就是broker上的队列信息,每个topic在创建的时候可以指定相应的queue的数量。也就是说,一个topic的消息存储在多个主broker中

producer负载均衡

producer端的负载均衡主要是在选择对应的broker。在producer发送消息的时候会对消息进行路由,看到底是路由到哪个broker。下面主要说下以下两种发送消息的方法:系统计算路由MessageQueue自定义路由MessageQueue

系统计算路由MessageQueue

 SendResult send = producer.send(message, 60 * 1000);

系统计算路由MessageQueue的其他路由算法

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {          if (this.sendLatencyFaultEnable) {              try {                  int index = tpInfo.getSendWhichQueue().getAndIncrement();                  for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                      int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                      if (pos < 0)                          pos = 0;                      MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                      if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                          if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                              return mq;                      }                  }                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                  int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                  if (writeQueueNums > 0) {                      final MessageQueue mq = tpInfo.selectOneMessageQueue();                      if (notBestBroker != null) {                          mq.setBrokerName(notBestBroker);                          mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                      }                      return mq;                  } else {                      latencyFaultTolerance.remove(notBestBroker);                  }              } catch (Exception e) {                  log.error("Error occurred when selecting message queue", e);              }                return tpInfo.selectOneMessageQueue();          }            // 默认策略(路由到当前的broker主节点列表取模后的broker中)          return tpInfo.selectOneMessageQueue(lastBrokerName);      }  

自定义路由MessageQueue

   SendResult send = producer.send(message, new MessageQueueSelector() {                  /**                   *                   * @param mqs 通过name server返回的broker主节点列表                   * @param msg 当前消息                   * @param arg                   * @return                   */                  @Override                  public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {                      int size = mqs.size();                      long timeMillis = System.currentTimeMillis();                        return mqs.get((int)timeMillis % size);                  }              }, 60 * 1000);

Consumer的负载均衡

消费端设置负责均衡策略

consumer.statrt()中,consumer会对所订阅的topic上的messagequeue做负载均衡DefaultConsumerPushImpl.start()下的this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());, 默认返回的是AllocateMessageQueueAveragely

负责均衡策略

  1. AllocateMessageQueueAveragely

负载均衡的时机

 // RebalanceService   @Override      public void run() {          log.info(this.getServiceName() + " service started");            while (!this.isStopped()) {              this.waitForRunning(waitInterval);              // 开始进行分配              this.mqClientFactory.doRebalance();          }            log.info(this.getServiceName() + " service end");      }

具体实现

/**  consumerGroup : 消费组名称  currentCID:当前消费者实例Id(随机数)  mqAll: 该topic对应的queue的信息列表  cidAll: 消费组中所有的消费者列表    */  @Override  public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,      List<String> cidAll) {      if (currentCID == null || currentCID.length() < 1) {          throw new IllegalArgumentException("currentCID is empty");      }      if (mqAll == null || mqAll.isEmpty()) {          throw new IllegalArgumentException("mqAll is null or mqAll empty");      }      if (cidAll == null || cidAll.isEmpty()) {          throw new IllegalArgumentException("cidAll is null or cidAll empty");      }        List<MessageQueue> result = new ArrayList<MessageQueue>();      if (!cidAll.contains(currentCID)) {          log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",              consumerGroup,              currentCID,              cidAll);          return result;      }        int index = cidAll.indexOf(currentCID);      int mod = mqAll.size() % cidAll.size();      int averageSize =          mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()              + 1 : mqAll.size() / cidAll.size());      int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;      int range = Math.min(averageSize, mqAll.size() - startIndex);      for (int i = 0; i < range; i++) {          result.add(mqAll.get((startIndex + i) % mqAll.size()));      }      return result;  }