A Look at RocketMQ's LatencyFaultTolerance

  • December 10, 2019
  • Notes

This article walks through RocketMQ's LatencyFaultTolerance.

LatencyFaultTolerance

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java

public interface LatencyFaultTolerance<T> {
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    boolean isAvailable(final T name);

    void remove(final T name);

    T pickOneAtLeast();
}

  • The LatencyFaultTolerance interface defines four methods: updateFaultItem, isAvailable, remove, and pickOneAtLeast
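
To make the contract concrete, here is a minimal sketch of how a caller might drive the four methods. SimpleFaultTolerance, InterfaceUsageDemo, and the broker names are hypothetical and exist only for illustration; the implementation is deliberately simpler than RocketMQ's own and only records when each name becomes available again.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Copy of the interface shown above so that the sketch compiles on its own.
interface LatencyFaultTolerance<T> {
    void updateFaultItem(T name, long currentLatency, long notAvailableDuration);

    boolean isAvailable(T name);

    void remove(T name);

    T pickOneAtLeast();
}

// Hypothetical, simplified implementation (not RocketMQ's): it only remembers
// the timestamp at which each name becomes available again.
class SimpleFaultTolerance implements LatencyFaultTolerance<String> {
    private final Map<String, Long> availableAt = new ConcurrentHashMap<>();

    @Override
    public void updateFaultItem(String name, long currentLatency, long notAvailableDuration) {
        availableAt.put(name, System.currentTimeMillis() + notAvailableDuration);
    }

    @Override
    public boolean isAvailable(String name) {
        Long ts = availableAt.get(name);
        // Names that were never reported are treated as available.
        return ts == null || System.currentTimeMillis() >= ts;
    }

    @Override
    public void remove(String name) {
        availableAt.remove(name);
    }

    @Override
    public String pickOneAtLeast() {
        // Return the name whose isolation ends soonest, or null if nothing is recorded.
        String best = null;
        long bestTs = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : availableAt.entrySet()) {
            if (e.getValue() < bestTs) {
                bestTs = e.getValue();
                best = e.getKey();
            }
        }
        return best;
    }
}

class InterfaceUsageDemo {
    public static void main(String[] args) {
        LatencyFaultTolerance<String> ft = new SimpleFaultTolerance();
        // A slow send: isolate the (hypothetical) broker name for 60 seconds.
        ft.updateFaultItem("broker-a", 3000L, 60_000L);
        // A fast send: no isolation period.
        ft.updateFaultItem("broker-b", 10L, 0L);

        System.out.println("broker-a available: " + ft.isAvailable("broker-a"));
        System.out.println("broker-b available: " + ft.isAvailable("broker-b"));
        System.out.println("fallback pick: " + ft.pickOneAtLeast());
    }
}
```

The point of the sketch is the division of labour: the caller reports a latency and an isolation duration after each send, asks isAvailable before choosing a target, and falls back to pickOneAtLeast when nothing is available.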

LatencyFaultToleranceImpl

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }

    @Override
    public void remove(final String name) {
        this.faultItemTable.remove(name);
    }

    @Override
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);

            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

    @Override
    public String toString() {
        return "LatencyFaultToleranceImpl{" +
            "faultItemTable=" + faultItemTable +
            ", whichItemWorst=" + whichItemWorst +
            '}';
    }

    //......
}

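  • LatencyFaultToleranceImpl implements the LatencyFaultTolerance interface. It maintains faultItemTable, a map keyed by name whose values are FaultItem instances. updateFaultItem writes currentLatency to the FaultItem for the given name and sets its startTimestamp to System.currentTimeMillis() + notAvailableDuration, creating the item if none exists; the putIfAbsent call covers the case where another thread inserts the same name concurrently
  • isAvailable looks up the faultItem in faultItemTable; if it is not null it returns faultItem.isAvailable(), otherwise it returns true. remove simply calls faultItemTable.remove(name)
  • pickOneAtLeast first copies the FaultItem values of faultItemTable into tmpList and returns null if that list is empty. Otherwise it shuffles and then sorts tmpList and computes half = tmpList.size() / 2. If half is less than or equal to 0 it returns tmpList.get(0).getName(); otherwise it returns tmpList.get(i).getName(), where i = whichItemWorst.getAndIncrement() % half. Because the sort places available, low-latency items first, the result always comes from the better half of the list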
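
The selection steps of pickOneAtLeast can be sketched in isolation. In this rough illustration, Item is a hypothetical stand-in for FaultItem that is ordered by latency only, and the counter is passed in as a plain non-negative int instead of coming from ThreadLocalIndex; both are assumptions made to keep the example small and deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class PickOneAtLeastDemo {
    // Hypothetical stand-in for FaultItem: ordered by latency only.
    static final class Item implements Comparable<Item> {
        final String name;
        final long latency;

        Item(String name, long latency) {
            this.name = name;
            this.latency = latency;
        }

        @Override
        public int compareTo(Item other) {
            return Long.compare(latency, other.latency);
        }
    }

    // Same steps as pickOneAtLeast: copy, shuffle, sort, then index into the first half.
    // The counter is assumed to be non-negative.
    static String pick(List<Item> items, int counter) {
        if (items.isEmpty()) {
            return null;
        }
        List<Item> tmpList = new ArrayList<>(items);
        Collections.shuffle(tmpList); // only affects the relative order of equal items
        Collections.sort(tmpList);    // best (lowest latency) first

        int half = tmpList.size() / 2;
        if (half <= 0) {
            return tmpList.get(0).name;
        }
        return tmpList.get(counter % half).name;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
            new Item("a", 10L), new Item("b", 20L),
            new Item("c", 300L), new Item("d", 4000L));
        // With four items, half is 2, so only the two best names are ever returned.
        for (int counter = 0; counter < 4; counter++) {
            System.out.println(counter + " -> " + pick(items, counter));
        }
    }
}
```

With distinct latencies the shuffle has no visible effect; it only matters when several items compare as equal, where it keeps the same one from always landing at the front.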

FaultItem

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java

    class FaultItem implements Comparable<FaultItem> {
        private final String name;
        private volatile long currentLatency;
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }

        @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }

        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }

        @Override
        public int hashCode() {
            int result = getName() != null ? getName().hashCode() : 0;
            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
            return result;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o)
                return true;
            if (!(o instanceof FaultItem))
                return false;

            final FaultItem faultItem = (FaultItem) o;

            if (getCurrentLatency() != faultItem.getCurrentLatency())
                return false;
            if (getStartTimestamp() != faultItem.getStartTimestamp())
                return false;
            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;

        }

        @Override
        public String toString() {
            return "FaultItem{" +
                "name='" + name + '\'' +
                ", currentLatency=" + currentLatency +
                ", startTimestamp=" + startTimestamp +
                '}';
        }

        public String getName() {
            return name;
        }

        public long getCurrentLatency() {
            return currentLatency;
        }

        public void setCurrentLatency(final long currentLatency) {
            this.currentLatency = currentLatency;
        }

        public long getStartTimestamp() {
            return startTimestamp;
        }

        public void setStartTimestamp(final long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }

    }

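  • FaultItem implements the Comparable<FaultItem> interface and defines the name, currentLatency, and startTimestamp fields. Its isAvailable method evaluates (System.currentTimeMillis() - startTimestamp) >= 0, so an item becomes available again once the current time reaches startTimestamp. Its compareTo method orders items by isAvailable() (available first), then by currentLatency (lower first), then by startTimestamp (earlier first)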
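
The three-level ordering is easier to see with concrete values. The following is a small sketch, not the RocketMQ class itself: Item is a hypothetical immutable stand-in for FaultItem, and the current time is passed in as a parameter (an assumption made so that the result does not depend on the wall clock).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class FaultItemOrderDemo {
    // Hypothetical immutable stand-in for FaultItem.
    static final class Item {
        final String name;
        final long currentLatency;
        final long startTimestamp;

        Item(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        // Same formula as FaultItem.isAvailable, with "now" supplied by the caller.
        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    // Availability first, then lower latency, then earlier startTimestamp.
    static Comparator<Item> order(long now) {
        return Comparator
            .comparing((Item i) -> !i.isAvailable(now)) // false (available) sorts before true
            .thenComparingLong(i -> i.currentLatency)
            .thenComparingLong(i -> i.startTimestamp);
    }

    static List<String> sortedNames(List<Item> items, long now) {
        List<Item> copy = new ArrayList<>(items);
        copy.sort(order(now));
        List<String> names = new ArrayList<>();
        for (Item item : copy) {
            names.add(item.name);
        }
        return names;
    }

    public static void main(String[] args) {
        long now = 1000L;
        List<Item> items = Arrays.asList(
            new Item("c", 3000L, 61000L), // isolated, high latency
            new Item("a", 50L, 1000L),    // available (now == startTimestamp)
            new Item("b", 20L, 5000L),    // isolated, despite the lowest latency
            new Item("d", 50L, 900L));    // available, same latency as "a", earlier timestamp
        System.out.println(sortedNames(items, now)); // [d, a, b, c]
    }
}
```

Note that "b" has the lowest latency but still sorts after both available items, because availability is compared before latency.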

ThreadLocalIndex

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/common/ThreadLocalIndex.java

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}

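  • ThreadLocalIndex defines the threadLocalIndex and random fields. Its getAndIncrement method reads index from threadLocalIndex.get(); if it is null, it initializes index with Math.abs(random.nextInt()) and resets it to 0 if the result is negative. It then computes Math.abs(index + 1), again resets a negative result to 0, stores the value with threadLocalIndex.set(index), and returns it. The negative checks are needed because Math.abs(Integer.MIN_VALUE) is still negative. Despite its name, the method returns the value after the increment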
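
The reason for the two "index < 0" checks can be shown with a few lines. This is a minimal sketch of the increment step only; IndexWrapDemo and next are hypothetical names, and the thread-local storage is left out.

```java
class IndexWrapDemo {
    // Mirrors the increment step of getAndIncrement for a given previous index.
    static int next(int index) {
        int incremented = Math.abs(index + 1);
        // Math.abs(Integer.MIN_VALUE) overflows and stays negative, hence the reset.
        return incremented < 0 ? 0 : incremented;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE) < 0); // true
        System.out.println(next(41));                        // 42
        // Integer.MAX_VALUE + 1 overflows to Integer.MIN_VALUE, so the index wraps to 0.
        System.out.println(next(Integer.MAX_VALUE));         // 0
    }
}
```

Keeping the index non-negative matters for callers such as pickOneAtLeast, because a negative left operand of % would produce a negative list index.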

Summary

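  • The LatencyFaultTolerance interface defines the updateFaultItem, isAvailable, remove, and pickOneAtLeast methods. LatencyFaultToleranceImpl implements it and maintains faultItemTable, a map keyed by name whose values are FaultItem instances. updateFaultItem writes currentLatency and a startTimestamp derived from notAvailableDuration to the FaultItem for the given name, creating the item if none exists
  • isAvailable looks up the faultItem in faultItemTable; if it is not null it returns faultItem.isAvailable(), otherwise it returns true. remove calls faultItemTable.remove(name)
  • pickOneAtLeast copies the FaultItem values of faultItemTable into a list and returns null if that list is empty. Otherwise it shuffles and sorts tmpList and computes half = tmpList.size() / 2. If half is less than or equal to 0 it returns tmpList.get(0).getName(); otherwise it returns tmpList.get(i).getName(), where i = whichItemWorst.getAndIncrement() % half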
