A look at Artemis's ConnectionLoadBalancingPolicy

  • February 24, 2020
  • Notes

This post takes a look at Artemis's ConnectionLoadBalancingPolicy.

ServerLocatorImpl.selectConnector

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

   //......

   private TransportConfiguration selectConnector() {
      Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;

      flushTopology();

      synchronized (topologyArrayGuard) {
         usedTopology = topologyArray;
      }

      synchronized (this) {
         if (usedTopology != null && useTopologyForLoadBalancing) {
            if (logger.isTraceEnabled()) {
               logger.trace("Selecting connector from topology.");
            }
            int pos = loadBalancingPolicy.select(usedTopology.length);
            Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];

            return pair.getA();
         } else {
            if (logger.isTraceEnabled()) {
               logger.trace("Selecting connector from initial connectors.");
            }

            int pos = loadBalancingPolicy.select(initialConnectors.length);

            return initialConnectors[pos];
         }
      }
   }

   //......
}
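  • ServerLocatorImpl's selectConnector method: when usedTopology is not null and useTopologyForLoadBalancing is true, it calls loadBalancingPolicy.select(usedTopology.length) to get pos and returns usedTopology[pos].getA(); otherwise it calls loadBalancingPolicy.select(initialConnectors.length) to get pos and returns initialConnectors[pos]. Both branches run inside synchronized (this).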
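
To make the two branches easier to see, here is a minimal sketch of the same decision. It is not the Artemis code: SelectConnectorSketch and its select method are hypothetical names, connectors are plain strings instead of TransportConfiguration objects, and the policy is modelled as an IntUnaryOperator that maps max to an index.

```java
import java.util.function.IntUnaryOperator;

// Simplified, hypothetical stand-in for ServerLocatorImpl.selectConnector();
// connectors are plain strings here, not TransportConfiguration objects.
final class SelectConnectorSketch {

    // Picks from the topology when it is available and enabled,
    // otherwise falls back to the initial connectors.
    static String select(String[] topology, String[] initialConnectors,
                         boolean useTopologyForLoadBalancing, IntUnaryOperator policy) {
        if (topology != null && useTopologyForLoadBalancing) {
            return topology[policy.applyAsInt(topology.length)];
        }
        return initialConnectors[policy.applyAsInt(initialConnectors.length)];
    }

    public static void main(String[] args) {
        String[] topology = {"live-1", "live-2", "live-3"};
        String[] initial = {"seed-1"};
        // Behaves like a policy that always returns index 0.
        IntUnaryOperator firstElement = max -> 0;

        System.out.println(select(topology, initial, true, firstElement));  // prints live-1
        System.out.println(select(null, initial, true, firstElement));      // prints seed-1
        System.out.println(select(topology, initial, false, firstElement)); // prints seed-1
    }
}
```

The policy only ever sees a length; which array that length belongs to is decided entirely by the caller.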

ConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/ConnectionLoadBalancingPolicy.java

public interface ConnectionLoadBalancingPolicy {

   /**
    * Returns the selected index according to the policy implementation.
    *
    * @param max maximum position index that can be selected
    */
   int select(int max);
}
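  • ConnectionLoadBalancingPolicy defines a single select method that returns the chosen index. Because selectConnector passes an array length as max and uses the result as an array index, the returned value has to fall in [0, max). It has four implementations: FirstElementConnectionLoadBalancingPolicy, RandomConnectionLoadBalancingPolicy, RoundRobinConnectionLoadBalancingPolicy, and RandomStickyConnectionLoadBalancingPolicy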
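
Since the interface has only one method, writing another policy is straightforward. The sketch below is an illustration only: the interface is copied locally so the example compiles without Artemis on the classpath, and LastElementPolicy is a hypothetical policy that does not exist in Artemis.

```java
final class CustomPolicySketch {

    // Local copy of the interface shape shown above, so the example compiles on its own.
    interface ConnectionLoadBalancingPolicy {
        int select(int max);
    }

    // Hypothetical policy (not part of Artemis): always picks the last connector.
    static final class LastElementPolicy implements ConnectionLoadBalancingPolicy {
        @Override
        public int select(final int max) {
            // max is the array length, so the last valid index is max - 1.
            return max - 1;
        }
    }

    public static void main(String[] args) {
        ConnectionLoadBalancingPolicy policy = new LastElementPolicy();
        String[] connectors = {"node-a", "node-b", "node-c"};
        System.out.println(connectors[policy.select(connectors.length)]); // prints node-c
    }
}
```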

FirstElementConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/FirstElementConnectionLoadBalancingPolicy.java

public final class FirstElementConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   /**
    * @param max param is ignored
    * @return 0
    */
   @Override
   public int select(final int max) {
      return 0;
   }
}
  • FirstElementConnectionLoadBalancingPolicy implements the ConnectionLoadBalancingPolicy interface; its select method ignores max and always returns 0

RandomConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomConnectionLoadBalancingPolicy.java

public final class RandomConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   /**
    * Returns a pseudo random number between {@code 0} (inclusive) and {@code max} exclusive.
    *
    * @param max the upper limit of the random number selection
    * @see java.util.Random#nextInt(int)
    */
   @Override
   public int select(final int max) {
      return RandomUtil.randomInterval(0, max);
   }
}
  • RandomConnectionLoadBalancingPolicy implements the ConnectionLoadBalancingPolicy interface; its select method uses RandomUtil.randomInterval(0, max) to return a fresh random index on every call

RoundRobinConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RoundRobinConnectionLoadBalancingPolicy.java

public final class RoundRobinConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy, Serializable {

   private static final long serialVersionUID = 7511196010141439559L;

   private boolean first = true;

   private int pos;

   @Override
   public int select(final int max) {
      if (first) {
         // We start on a random one
         pos = RandomUtil.randomInterval(0, max);

         first = false;
      } else {
         pos++;

         if (pos >= max) {
            pos = 0;
         }
      }

      return pos;
   }
}
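  • RoundRobinConnectionLoadBalancingPolicy implements the ConnectionLoadBalancingPolicy interface; the first call to select picks a random pos, and every later call increments pos and resets it to 0 once it is greater than or equal to max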
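
The wrap-around is easier to follow with concrete numbers. The sketch below repeats the stepping logic with one deliberate change: the starting index is passed in rather than drawn from RandomUtil, purely so the output is reproducible. RoundRobinWalkthrough, RoundRobin, and picks are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinWalkthrough {

    // Same stepping logic as the select method above, except that the starting
    // index is passed in (an assumption for reproducibility) rather than random.
    static final class RoundRobin {
        private boolean first = true;
        private int pos;
        private final int start;

        RoundRobin(int start) {
            this.start = start;
        }

        int select(final int max) {
            if (first) {
                pos = start;
                first = false;
            } else {
                pos++;
                if (pos >= max) {
                    pos = 0;
                }
            }
            return pos;
        }
    }

    // Collects the indexes returned by `count` consecutive calls with a fixed max.
    static List<Integer> picks(int start, int max, int count) {
        RoundRobin policy = new RoundRobin(start);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(policy.select(max));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(picks(1, 3, 6)); // prints [1, 2, 0, 1, 2, 0]
    }
}
```

Because max is supplied on every call, a smaller max on a later call (for example after the topology shrinks) also sends pos back to 0 through the same pos >= max check.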

RandomStickyConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java

public final class RandomStickyConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {

   private int pos = -1;

   /**
    * @see java.util.Random#nextInt(int)
    */
   @Override
   public int select(final int max) {
      if (pos == -1) {
         pos = RandomUtil.randomInterval(0, max);
      }

      return pos;
   }
}
  • RandomStickyConnectionLoadBalancingPolicy implements the ConnectionLoadBalancingPolicy interface; the first call to select picks a random pos, and every later call returns that same pos
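
The "sticky" behaviour can be checked with a small stand-alone sketch. It mirrors the logic above but uses java.util.Random in place of Artemis's RandomUtil, which is an assumption made so the example has no dependencies; StickyDemo and RandomSticky are hypothetical names.

```java
import java.util.Random;

final class StickyDemo {

    // Mirrors the sticky logic above; java.util.Random stands in for RandomUtil.
    static final class RandomSticky {
        private final Random random;
        private int pos = -1;

        RandomSticky(Random random) {
            this.random = random;
        }

        int select(final int max) {
            if (pos == -1) {
                // Only the very first call draws a random index in [0, max).
                pos = random.nextInt(max);
            }
            return pos;
        }
    }

    public static void main(String[] args) {
        RandomSticky policy = new RandomSticky(new Random());
        int first = policy.select(3);
        for (int i = 0; i < 5; i++) {
            System.out.println(policy.select(3) == first); // prints true each time
        }
    }
}
```

One thing worth noting from the code as shown: the remembered pos is never compared against later values of max, so the index is only guaranteed to be in range for the max seen on the first call.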

Summary

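ConnectionLoadBalancingPolicy defines a single select method that returns the chosen index. It has four implementations: FirstElementConnectionLoadBalancingPolicy (always 0), RandomConnectionLoadBalancingPolicy (a fresh random index on every call), RoundRobinConnectionLoadBalancingPolicy (random start, then sequential with wrap-around to 0), and RandomStickyConnectionLoadBalancingPolicy (one random index, reused on every later call)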

doc

  • ConnectionLoadBalancingPolicy