A Look at Artemis's MessageLoadBalancingType
- February 24, 2020
- Notes
Preface
This article takes a look at Artemis's MessageLoadBalancingType.
MessageLoadBalancingType
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java
    public enum MessageLoadBalancingType {
       OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");

       static {
          // for URI support on ClusterConnection
          BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
       }

       static class MessageLoadBalancingTypeConverter implements Converter {

          @Override
          public <T> T convert(Class<T> type, Object value) {
             return type.cast(MessageLoadBalancingType.getType(value.toString()));
          }
       }

       private String type;

       MessageLoadBalancingType(final String type) {
          this.type = type;
       }

       public String getType() {
          return type;
       }

       public static MessageLoadBalancingType getType(String string) {
          if (string.equals(OFF.getType())) {
             return MessageLoadBalancingType.OFF;
          } else if (string.equals(STRICT.getType())) {
             return MessageLoadBalancingType.STRICT;
          } else if (string.equals(ON_DEMAND.getType())) {
             return MessageLoadBalancingType.ON_DEMAND;
          } else {
             return null;
          }
       }
    }
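- The MessageLoadBalancingType enum defines three values: OFF, STRICT, and ON_DEMAND. Its static getType(String) method matches the string exactly against those three names and returns null for anything else.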
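Because getType(String) returns null for an unrecognized string, callers have to handle that case themselves. The following is a minimal sketch of one way to do so. LoadBalancingMode, fromType, and parse are hypothetical names made up for this illustration; they mirror the lookup shown above but are not part of Artemis.

```java
import java.util.Optional;

// Hypothetical stand-in that mirrors the three values of MessageLoadBalancingType.
// It is NOT the Artemis class, only an illustration of the lookup contract.
enum LoadBalancingMode {
    OFF, STRICT, ON_DEMAND;

    // Exact, case-sensitive match; returns null when nothing matches,
    // like the getType(String) shown above. Unlike the original, a null
    // argument also yields null here instead of a NullPointerException.
    static LoadBalancingMode fromType(String type) {
        for (LoadBalancingMode mode : values()) {
            if (mode.name().equals(type)) {
                return mode;
            }
        }
        return null;
    }

    // Assumed convenience wrapper: makes the "unknown value" case explicit.
    static Optional<LoadBalancingMode> parse(String type) {
        return Optional.ofNullable(type).map(LoadBalancingMode::fromType);
    }
}

final class LoadBalancingModeDemo {
    public static void main(String[] args) {
        System.out.println(LoadBalancingMode.parse("ON_DEMAND")); // Optional[ON_DEMAND]
        System.out.println(LoadBalancingMode.parse("on_demand")); // Optional.empty
    }
}
```

Wrapping the result in Optional is a design choice for this sketch only; it forces the caller to decide what an unknown value should mean rather than passing a null along.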
PostOfficeImpl
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
    public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

       //......

       public Pair<RoutingContext, Message> redistribute(final Message message,
                                                         final Queue originatingQueue,
                                                         final Transaction tx) throws Exception {
          Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress());

          if (bindings != null && bindings.allowRedistribute()) {
             // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message
             // arrived the target node
             // as described on https://issues.jboss.org/browse/JBPAPP-6130
             Message copyRedistribute = message.copy(storageManager.generateID());
             copyRedistribute.setAddress(originatingQueue.getAddress());

             if (tx != null) {
                tx.addOperation(new TransactionOperationAbstract() {
                   @Override
                   public void afterRollback(Transaction tx) {
                      try {
                         //this will cause large message file to be
                         //cleaned up
                         copyRedistribute.decrementRefCount();
                      } catch (Exception e) {
                         logger.warn("Failed to clean up message: " + copyRedistribute);
                      }
                   }
                });
             }

             RoutingContext context = new RoutingContextImpl(tx);

             boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context);

             if (routed) {
                return new Pair<>(context, copyRedistribute);
             }
          }

          return null;
       }

       //......
    }
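- PostOfficeImpl's redistribute method uses bindings.allowRedistribute() to decide whether to redistribute. If it returns true, the method copies the message, registers an afterRollback cleanup on the transaction when one is present, and calls bindings.redistribute; it returns the routing context and the copy only if the copy was routed, and null otherwise.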
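The control flow of this method (guard, copy, register a rollback hook, then route or return null) can be sketched without any Artemis types. Everything in the block below, including Tx, Router, and the string "messages", is a hypothetical simplification made up for illustration; it is not how Artemis models messages or transactions.

```java
import java.util.ArrayList;
import java.util.List;

final class RedistributeSketch {

    // Hypothetical minimal transaction: collects callbacks to run on rollback.
    static final class Tx {
        private final List<Runnable> afterRollback = new ArrayList<>();

        void addAfterRollback(Runnable hook) {
            afterRollback.add(hook);
        }

        void rollback() {
            afterRollback.forEach(Runnable::run);
        }
    }

    // Hypothetical router: returns true when some target accepted the copy.
    interface Router {
        boolean route(String messageCopy);
    }

    // Records which copies were cleaned up, standing in for decrementRefCount().
    final List<String> cleanedUp = new ArrayList<>();

    // Returns the routed copy, or null when redistribution is not allowed
    // or no target accepted the copy.
    String redistribute(String message, boolean allowRedistribute, Router router, Tx tx) {
        if (!allowRedistribute) {
            return null;
        }
        // Route a copy, never the original message.
        String copy = "copy-of-" + message;
        if (tx != null) {
            // If the transaction rolls back, the copy must be cleaned up.
            tx.addAfterRollback(() -> cleanedUp.add(copy));
        }
        return router.route(copy) ? copy : null;
    }

    public static void main(String[] args) {
        RedistributeSketch sketch = new RedistributeSketch();
        Tx tx = new Tx();
        System.out.println(sketch.redistribute("m1", true, c -> true, tx));
        tx.rollback();
        System.out.println(sketch.cleanedUp);
    }
}
```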
BindingsImpl
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
    public final class BindingsImpl implements Bindings {

       //......

       @Override
       public boolean allowRedistribute() {
          return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND);
       }

       @Override
       public boolean redistribute(final Message message,
                                   final Queue originatingQueue,
                                   final RoutingContext context) throws Exception {
          if (messageLoadBalancingType.equals(MessageLoadBalancingType.STRICT) || messageLoadBalancingType.equals(MessageLoadBalancingType.OFF)) {
             return false;
          }

          if (logger.isTraceEnabled()) {
             logger.trace("Redistributing message " + message);
          }

          SimpleString routingName = originatingQueue.getName();

          List<Binding> bindings = routingNameBindingMap.get(routingName);

          if (bindings == null) {
             // The value can become null if it's concurrently removed while we're iterating - this is expected
             // ConcurrentHashMap behaviour!
             return false;
          }

          Integer ipos = routingNamePositions.get(routingName);

          int pos = ipos != null ? ipos.intValue() : 0;

          int length = bindings.size();

          int startPos = pos;

          Binding theBinding = null;

          // TODO - combine this with similar logic in route()
          while (true) {
             Binding binding;
             try {
                binding = bindings.get(pos);
             } catch (IndexOutOfBoundsException e) {
                // This can occur if binding is removed while in route
                if (!bindings.isEmpty()) {
                   pos = 0;
                   startPos = 0;
                   length = bindings.size();

                   continue;
                } else {
                   break;
                }
             }

             pos = incrementPos(pos, length);

             Filter filter = binding.getFilter();

             boolean highPrior = binding.isHighAcceptPriority(message);

             if (highPrior && binding.getBindable() != originatingQueue && (filter == null || filter.match(message))) {
                theBinding = binding;

                break;
             }

             if (pos == startPos) {
                break;
             }
          }

          routingNamePositions.put(routingName, pos);

          if (theBinding != null) {
             theBinding.route(message, context);

             return true;
          } else {
             return false;
          }
       }

       //......
    }
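- BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND. Its redistribute method checks messageLoadBalancingType first and returns false immediately for STRICT or OFF. Otherwise it walks the bindings for the routing name in round-robin order, starting from the last saved position, and picks the first binding that has high accept priority, is not the originating queue, and whose filter (if any) matches the message; it then calls binding.route on that binding. If a full lap finds no such binding, it returns false.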
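The round-robin selection can be sketched in isolation as follows. This is a simplified illustration, not the Artemis implementation: RoundRobinSketch and pick are hypothetical names, a single Predicate stands in for the priority, originating-queue, and filter checks, and the concurrent-removal handling from the original loop is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class RoundRobinSketch {

    // Position remembered between calls, playing the role of the entry
    // in routingNamePositions for one routing name.
    private int pos = 0;

    // Returns the first eligible candidate at or after the remembered
    // position, or null once a full lap has found nothing eligible.
    <T> T pick(List<T> candidates, Predicate<T> eligible) {
        int length = candidates.size();
        if (length == 0) {
            return null;
        }
        if (pos >= length) {
            pos = 0; // the list shrank since the last call
        }
        int startPos = pos;
        T chosen = null;
        do {
            T candidate = candidates.get(pos);
            pos = (pos + 1) % length; // advance with wrap-around
            if (eligible.test(candidate)) {
                chosen = candidate;
                break;
            }
        } while (pos != startPos);
        return chosen;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        List<String> queues = Arrays.asList("origin", "remoteA", "remoteB");
        // Never hand the message back to the queue it came from.
        Predicate<String> notOrigin = q -> !q.equals("origin");
        System.out.println(rr.pick(queues, notOrigin)); // remoteA
        System.out.println(rr.pick(queues, notOrigin)); // remoteB
        System.out.println(rr.pick(queues, notOrigin)); // remoteA
    }
}
```

Saving the position after every call is what spreads successive messages across the eligible targets instead of always sending them to the first one.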
Summary
The MessageLoadBalancingType enum defines three values: OFF, STRICT, and ON_DEMAND. PostOfficeImpl's redistribute method uses bindings.allowRedistribute() to decide whether to redistribute and, if so, calls bindings.redistribute. BindingsImpl's allowRedistribute method returns true only when messageLoadBalancingType is ON_DEMAND. Its redistribute method checks messageLoadBalancingType and returns false immediately for STRICT or OFF; otherwise it selects a binding in round-robin fashion and calls binding.route on it.
doc
- MessageLoadBalancingType