­

聊聊artemis的MessageLoadBalancingType

  • 2020 年 2 月 24 日
  • 筆記

本文主要研究一下artemis的MessageLoadBalancingType

MessageLoadBalancingType

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java

public enum MessageLoadBalancingType {     OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");       static {        // for URI support on ClusterConnection        BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);     }       static class MessageLoadBalancingTypeConverter implements Converter {          @Override        public <T> T convert(Class<T> type, Object value) {           return type.cast(MessageLoadBalancingType.getType(value.toString()));        }     }       private String type;       MessageLoadBalancingType(final String type) {        this.type = type;     }       public String getType() {        return type;     }       public static MessageLoadBalancingType getType(String string) {        if (string.equals(OFF.getType())) {           return MessageLoadBalancingType.OFF;        } else if (string.equals(STRICT.getType())) {           return MessageLoadBalancingType.STRICT;        } else if (string.equals(ON_DEMAND.getType())) {           return MessageLoadBalancingType.ON_DEMAND;        } else {           return null;        }     }  }
  • MessageLoadBalancingType枚举定义了OFF、STRICT、ON_DEMAND三个枚举值

PostOfficeImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {       //......       public Pair<RoutingContext, Message> redistribute(final Message message,                                                       final Queue originatingQueue,                                                       final Transaction tx) throws Exception {        Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());          if (bindings != null && bindings.allowRedistribute()) {           // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message           // arrived the target node           // as described on https://issues.jboss.org/browse/JBPAPP-6130           Message copyRedistribute = message.copy(storageManager.generateID());           copyRedistribute.setAddress(originatingQueue.getAddress());             if (tx != null) {              tx.addOperation(new TransactionOperationAbstract() {                 @Override                 public void afterRollback(Transaction tx) {                    try {                       //this will cause large message file to be                       //cleaned up                       copyRedistribute.decrementRefCount();                    } catch (Exception e) {                       logger.warn("Failed to clean up message: " + copyRedistribute);                    }                 }              });           }             RoutingContext context = new RoutingContextImpl(tx);             boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);             if (routed) {              return new Pair<>(context, copyRedistribute);           }        }          return null;     }       //......  }
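  • PostOfficeImpl的redistribute方法先根据originatingQueue的address获取bindings;当bindings不为null且bindings.allowRedistribute()返回true时,复制一份消息(使用新生成的ID)并执行bindings.redistribute;若路由成功则返回包含RoutingContext与消息副本的Pair,否则返回null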

BindingsImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java

public final class BindingsImpl implements Bindings {       //......       @Override     public boolean allowRedistribute() {        return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);     }       @Override     public boolean redistribute(final Message message,                                 final Queue originatingQueue,                                 final RoutingContext context) throws Exception {        if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {           return false;        }          if (logger.isTraceEnabled()) {           logger.trace("Redistributing message " + message);        }          SimpleString routingName = originatingQueue.getName();          List<Binding> bindings = routingNameBindingMap.get(routingName);          if (bindings == null) {           // The value can become null if it's concurrently removed while we're iterating - this is expected           // ConcurrentHashMap behaviour!           return false;        }          Integer ipos = routingNamePositions.get(routingName);          int pos = ipos != null ? 
ipos.intValue() : 0;          int length = bindings.size();          int startPos = pos;          Binding theBinding = null;          // TODO - combine this with similar logic in route()        while (true) {           Binding binding;           try {              binding = bindings.get(pos);           } catch (IndexOutOfBoundsException e) {              // This can occur if binding is removed while in route              if (!bindings.isEmpty()) {                 pos = 0;                 startPos = 0;                 length = bindings.size();                   continue;              } else {                 break;              }           }             pos = incrementPos(pos, length);             Filter filter = binding.getFilter();             boolean highPrior = binding.isHighAcceptPriority(message);             if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {              theBinding = binding;                break;           }             if (pos == startPos) {              break;           }        }          routingNamePositions.put(routingName, pos);          if (theBinding != null) {           theBinding.route(message, context);             return true;        } else {           return false;        }     }       //......  }
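  • BindingsImpl的allowRedistribute方法在messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND)时返回true;其redistribute方法会先校验messageLoadBalancingType,若为STRICT或OFF则直接返回false;之后以round robin的方式,从routingNameBindingMap中originatingQueue名称对应的bindings里选取第一个isHighAcceptPriority为true、不是originatingQueue本身且filter为null或匹配该消息的binding,然后执行binding.route方法;若没有找到符合条件的binding则返回false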

小结

MessageLoadBalancingType枚举定义了OFF、STRICT、ON_DEMAND三个枚举值;PostOfficeImpl的redistribute方法通过bindings.allowRedistribute()判断是否redistribute,是的话则执行bindings.redistribute;BindingsImpl的allowRedistribute方法在messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND)时返回true;其redistribute方法会校验messageLoadBalancingType,若为STRICT或OFF时直接返回false;之后通过round robin的方式来获取binding,然后执行binding.route方法

doc

  • MessageLoadBalancingType