聊聊artemis的ConnectionLoadBalancingPolicy

  • 2020 年 2 月 24 日
  • 筆記

本文主要研究一下artemis的ConnectionLoadBalancingPolicy

ServerLocatorImpl.selectConnector

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {       //......       private TransportConfiguration selectConnector() {        Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;          flushTopology();          synchronized (topologyArrayGuard) {           usedTopology = topologyArray;        }          synchronized (this) {           if (usedTopology != null && useTopologyForLoadBalancing) {              if (logger.isTraceEnabled()) {                 logger.trace("Selecting connector from topology.");              }              int pos = loadBalancingPolicy.select(usedTopology.length);              Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];                return pair.getA();           } else {              if (logger.isTraceEnabled()) {                 logger.trace("Selecting connector from initial connectors.");              }                int pos = loadBalancingPolicy.select(initialConnectors.length);                return initialConnectors[pos];           }        }     }       //......  }
  • ServerLocatorImpl的selectConnector方法會對於useTopologyForLoadBalancing的會通過loadBalancingPolicy.select(usedTopology.length)來獲取pos,之後返回usedTopology[pos].getA();否則通過loadBalancingPolicy.select(initialConnectors.length)獲取pos,之後返回initialConnectors[pos]

ConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/ConnectionLoadBalancingPolicy.java

public interface ConnectionLoadBalancingPolicy {       /**      * Returns the selected index according to the policy implementation.      *      * @param max maximum position index that can be selected      */     int select(int max);  }
  • ConnectionLoadBalancingPolicy定義了select介面,返回選中的index;它有四個實現類分別是FirstElementConnectionLoadBalancingPolicy、RandomConnectionLoadBalancingPolicy、RoundRobinConnectionLoadBalancingPolicy、RandomStickyConnectionLoadBalancingPolicy

FirstElementConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/FirstElementConnectionLoadBalancingPolicy.java

public final class FirstElementConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {       /**      * @param max param is ignored      * @return 0      */     @Override     public int select(final int max) {        return 0;     }  }
  • FirstElementConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy介面,其select方法始終返回0

RandomConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomConnectionLoadBalancingPolicy.java

public final class RandomConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {       /**      * Returns a pseudo random number between {@code 0} (inclusive) and {@code max} exclusive.      *      * @param max the upper limit of the random number selection      * @see java.util.Random#nextInt(int)      */     @Override     public int select(final int max) {        return RandomUtil.randomInterval(0, max);     }  }
  • RandomConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy介面,其select方法使用RandomUtil.randomInterval(0, max)隨機返回一個index

RoundRobinConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RoundRobinConnectionLoadBalancingPolicy.java

public final class RoundRobinConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy, Serializable {       private static final long serialVersionUID = 7511196010141439559L;       private boolean first = true;       private int pos;       @Override     public int select(final int max) {        if (first) {           // We start on a random one           pos = RandomUtil.randomInterval(0, max);             first = false;        } else {           pos++;             if (pos >= max) {              pos = 0;           }        }          return pos;     }  }
  • RoundRobinConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy介面,其select在第一次執行的時候隨機選擇一個pos,之後對pos遞增,對於遞增之後大於等於max的重置pos為0

RandomStickyConnectionLoadBalancingPolicy

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/loadbalance/RandomStickyConnectionLoadBalancingPolicy.java

public final class RandomStickyConnectionLoadBalancingPolicy implements ConnectionLoadBalancingPolicy {       private int pos = -1;       /**      * @see java.util.Random#nextInt(int)      */     @Override     public int select(final int max) {        if (pos == -1) {           pos = RandomUtil.randomInterval(0, max);        }          return pos;     }  }
  • RandomStickyConnectionLoadBalancingPolicy實現了ConnectionLoadBalancingPolicy介面,其select方法第一次隨機選擇一個pos,之後都返回該pos

小結

ConnectionLoadBalancingPolicy定義了select介面,返回選中的index;它有四個實現類分別是FirstElementConnectionLoadBalancingPolicy、RandomConnectionLoadBalancingPolicy、RoundRobinConnectionLoadBalancingPolicy、RandomStickyConnectionLoadBalancingPolicy

doc

  • ConnectionLoadBalancingPolicy