A look at RocketMQ's MQFaultStrategy

  • December 12, 2019
  • Notes

This post takes a look at RocketMQ's MQFaultStrategy.

MQFaultStrategy

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/MQFaultStrategy.java

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}
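  • MQFaultStrategy defines the fields latencyFaultTolerance, sendLatencyFaultEnable, latencyMax, and notAvailableDuration. When sendLatencyFaultEnable is false, selectOneMessageQueue simply calls tpInfo.selectOneMessageQueue(lastBrokerName). When it is true, the method first obtains index from tpInfo.getSendWhichQueue().getAndIncrement(), then iterates over tpInfo.getMessageQueueList(), computing pos as Math.abs(index++) % tpInfo.getMessageQueueList().size() and resetting it to 0 if it is negative. It then checks the queue's broker with latencyFaultTolerance.isAvailable; if the broker is available and either null == lastBrokerName or mq.getBrokerName().equals(lastBrokerName) holds, that MessageQueue is returned.
  • If sendLatencyFaultEnable is true and the loop over tpInfo.getMessageQueueList() finds no queue that passes both checks, selectOneMessageQueue picks notBestBroker via latencyFaultTolerance.pickOneAtLeast() and reads writeQueueNums from tpInfo.getQueueIdByBroker(notBestBroker). If writeQueueNums is greater than 0, it selects a MessageQueue with tpInfo.selectOneMessageQueue() and, when notBestBroker is not null, sets its brokerName to notBestBroker and its queueId to tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums. If writeQueueNums is less than or equal to 0, it calls latencyFaultTolerance.remove(notBestBroker). If nothing has been returned by this point (including when an exception was caught and logged), it falls back to tpInfo.selectOneMessageQueue().
  • updateFaultItem does nothing unless sendLatencyFaultEnable is true. In that case it computes duration with computeNotAvailableDuration, passing 30000 instead of currentLatency when isolation is true, and then calls latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration). computeNotAvailableDuration walks latencyMax from the last element to the first and returns notAvailableDuration[i] for the first i where currentLatency >= latencyMax[i]; if no threshold matches it returns 0.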
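To make the threshold lookup concrete, here is a minimal standalone sketch of computeNotAvailableDuration using the default latencyMax and notAvailableDuration arrays from the listing above. The class name NotAvailableDurationSketch and the helper durationFor are hypothetical names introduced only for this illustration; the real method is private to MQFaultStrategy.

```java
// Standalone sketch; the class and helper names are illustrative, not RocketMQ API.
class NotAvailableDurationSketch {
    // Default tables copied from the MQFaultStrategy listing (milliseconds).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Same backward scan as computeNotAvailableDuration: the highest threshold
    // that the latency reaches decides the duration.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // Mirrors updateFaultItem: with isolation the measured latency is replaced by 30000.
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000L : currentLatency);
    }

    public static void main(String[] args) {
        long[] samples = {10L, 80L, 600L, 1500L, 20000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + durationFor(latency, false) + " ms");
        }
        System.out.println("isolation -> " + durationFor(5L, true) + " ms");
    }
}
```

With the default tables, latencies below 550 ms map to a duration of 0, a 600 ms send maps to 30000 ms, and isolation always maps to 600000 ms because 30000 reaches the last threshold of 15000.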

DefaultMQProducerImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

public class DefaultMQProducerImpl implements MQProducerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private final Timer timer = new Timer("RequestHouseKeepingService", true);
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private ExecutorService asyncSenderExecutor;

    //......

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

    //......
}
  • DefaultMQProducerImpl's selectOneMessageQueue and updateFaultItem methods both delegate to mqFaultStrategy.
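The excerpt does not show where these two methods are called, so the following is only a simplified sketch of how selection and fault reporting could interact. It assumes the sender reports each send's latency right after the send. FaultAwareSendSketch, selectBroker, and the unavailableUntil map are hypothetical stand-ins that work on plain broker names, not on TopicPublishInfo or LatencyFaultToleranceImpl, and the timestamps are passed in explicitly to keep the run deterministic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration; not RocketMQ code.
class FaultAwareSendSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Broker name -> timestamp (ms) before which the broker is skipped (assumed bookkeeping).
    private final Map<String, Long> unavailableUntil = new HashMap<>();
    private int sendWhichQueue = 0;

    boolean isAvailable(String broker, long now) {
        Long until = unavailableUntil.get(broker);
        return until == null || now >= until;
    }

    // Round-robin over the brokers, skipping those currently marked unavailable.
    String selectBroker(List<String> brokers, long now) {
        int index = sendWhichQueue++;
        for (int i = 0; i < brokers.size(); i++) {
            String broker = brokers.get(Math.floorMod(index++, brokers.size()));
            if (isAvailable(broker, now)) {
                return broker;
            }
        }
        // Every broker is marked unavailable: fall back to plain round-robin.
        return brokers.get(Math.floorMod(index, brokers.size()));
    }

    // Maps the reported latency to an avoidance window, as updateFaultItem does.
    void updateFaultItem(String broker, long currentLatency, boolean isolation, long now) {
        long effective = isolation ? 30000L : currentLatency;
        long duration = 0L;
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (effective >= LATENCY_MAX[i]) {
                duration = NOT_AVAILABLE_DURATION[i];
                break;
            }
        }
        unavailableUntil.put(broker, now + duration);
    }

    public static void main(String[] args) {
        FaultAwareSendSketch sketch = new FaultAwareSendSketch();
        List<String> brokers = Arrays.asList("broker-a", "broker-b");

        String first = sketch.selectBroker(brokers, 0L);
        sketch.updateFaultItem(first, 600L, false, 0L); // report a slow 600 ms send

        System.out.println(first);                                  // broker-a
        System.out.println(sketch.selectBroker(brokers, 1_000L));   // broker-b
        System.out.println(sketch.selectBroker(brokers, 2_000L));   // broker-b, broker-a is skipped
        System.out.println(sketch.selectBroker(brokers, 40_000L));  // broker-b
        System.out.println(sketch.selectBroker(brokers, 40_000L));  // broker-a, its window has passed
    }
}
```

The 600 ms send puts broker-a on hold for 30000 ms, so the third selection skips it even though round-robin would have landed there; once the window has elapsed, broker-a is picked again.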

Summary

  • MQFaultStrategy holds latencyFaultTolerance, sendLatencyFaultEnable, latencyMax, and notAvailableDuration. With sendLatencyFaultEnable set to false (the default), selectOneMessageQueue delegates to tpInfo.selectOneMessageQueue(lastBrokerName). With it set to true, the method round-robins over tpInfo.getMessageQueueList() and returns the first queue whose broker passes latencyFaultTolerance.isAvailable and the lastBrokerName check.
  • If that loop returns nothing, the method tries the broker chosen by latencyFaultTolerance.pickOneAtLeast(): when that broker has write queues, it returns a queue on it; otherwise it removes the broker from latencyFaultTolerance. The last resort is tpInfo.selectOneMessageQueue().
  • updateFaultItem only acts when sendLatencyFaultEnable is true. It converts the latency (or 30000 under isolation) into a duration via computeNotAvailableDuration, which scans latencyMax from the end, and passes the result to latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration).
