A Look at Artemis's FederatedQueue

  • February 24, 2020
  • Notes

This article takes a look at FederatedQueue in Artemis.

FederatedQueue

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java

public class FederatedQueue extends FederatedAbstract implements ActiveMQServerConsumerPlugin, Serializable {

   private static final Logger logger = Logger.getLogger(FederatedQueue.class);

   private final Set<Matcher> includes;
   private final Set<Matcher> excludes;
   private final Filter metaDataFilter;
   private final int priorityAdjustment;

   private final FederationQueuePolicyConfiguration config;

   public FederatedQueue(Federation federation, FederationQueuePolicyConfiguration config, ActiveMQServer server, FederationUpstream federationUpstream) throws ActiveMQException {
      super(federation, server, federationUpstream);
      Objects.requireNonNull(config.getName());
      this.config = config;
      this.priorityAdjustment = federationUpstream.getPriorityAdjustment() + (config.getPriorityAdjustment() == null ? -1 : config.getPriorityAdjustment());
      String metaDataFilterString = config.isIncludeFederated() ? null : "hyphenated_props:" + FederatedQueueConsumer.FEDERATION_NAME +  " IS NOT NULL";
      metaDataFilter = FilterImpl.createFilter(metaDataFilterString);
      if (config.getIncludes().isEmpty()) {
         includes = Collections.emptySet();
      } else {
         includes = new HashSet<>(config.getIncludes().size());
         for (FederationQueuePolicyConfiguration.Matcher include : config.getIncludes()) {
            includes.add(new Matcher(include, wildcardConfiguration));
         }
      }

      if (config.getExcludes().isEmpty()) {
         excludes = Collections.emptySet();
      } else {
         excludes = new HashSet<>(config.getExcludes().size());
         for (FederationQueuePolicyConfiguration.Matcher exclude : config.getExcludes()) {
            excludes.add(new Matcher(exclude, wildcardConfiguration));
         }
      }
   }

   @Override
   public void start() {
      super.start();
      server.getPostOffice()
            .getAllBindings()
            .values()
            .stream()
            .filter(b -> b instanceof QueueBinding)
            .map(b -> (QueueBinding) b)
            .forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));
   }

   /**
    * After a consumer has been created
    *
    * @param consumer the created consumer
    */
   @Override
   public synchronized void afterCreateConsumer(ServerConsumer consumer) {
      conditionalCreateRemoteConsumer(consumer);
   }

   public FederationQueuePolicyConfiguration getConfig() {
      return config;
   }

   private void conditionalCreateRemoteConsumer(ServerConsumer consumer) {
      if (server.hasBrokerFederationPlugins()) {
         final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
         try {
            server.callBrokerFederationPlugins(plugin -> {
               conditionalCreate.set(conditionalCreate.get() && plugin.federatedQueueConditionalCreateConsumer(consumer));
            });
         } catch (ActiveMQException t) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedQueueConditionalCreateConsumer");
            throw new IllegalStateException(t.getMessage(), t.getCause());
         }
         if (!conditionalCreate.get()) {
            return;
         }
      }
      createRemoteConsumer(consumer);
   }

   private void conditionalCreateRemoteConsumer(Queue queue) {
      queue.getConsumers()
            .stream()
            .filter(consumer -> consumer instanceof ServerConsumer)
            .map(c -> (ServerConsumer) c).forEach(this::conditionalCreateRemoteConsumer);
   }

   private void createRemoteConsumer(ServerConsumer consumer) {

      //We check the session meta data to see if its a federation session, if so by default we ignore these.
      //To not ignore these, set include-federated to true, which will mean no meta data filter.
      ServerSession serverSession = server.getSessionByID(consumer.getSessionID());
      if (metaDataFilter != null && serverSession != null && metaDataFilter.match(serverSession.getMetaData())) {
         return;
      }
      if (match(consumer)) {
         FederatedConsumerKey key = getKey(consumer);
         Transformer transformer = getTransformer(config.getTransformerRef());
         Transformer fqqnTransformer = message -> message == null ? null : message.setAddress(key.getFqqn());
         createRemoteConsumer(key, mergeTransformers(fqqnTransformer, transformer), null);
      }
   }

   //......
}
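  • FederatedQueue extends FederatedAbstract. Its start method iterates over every QueueBinding and calls conditionalCreateRemoteConsumer for each queue's consumers; afterCreateConsumer does the same for each newly created consumer. conditionalCreateRemoteConsumer first lets any federation plugins veto the creation, then skips consumers belonging to federation sessions (unless include-federated is set) and consumers that do not match the includes/excludes, and finally calls the parent class's createRemoteConsumer to create the remoteQueueConsumer.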
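
The plugin veto in conditionalCreateRemoteConsumer can be illustrated in isolation. The following is only a minimal sketch, not Artemis code: the class ConditionalCreateSketch, the method allPluginsAllow, and the use of Predicate as a stand-in for the plugin callback are all hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

public class ConditionalCreateSketch {

    // Hypothetical stand-in for asking every federation plugin whether a remote
    // consumer may be created. Returns true only if all plugins agree.
    static <T> boolean allPluginsAllow(List<Predicate<T>> plugins, T consumer) {
        AtomicBoolean allowed = new AtomicBoolean(true);
        for (Predicate<T> plugin : plugins) {
            // Same accumulation as in the excerpt: once the flag is false it stays
            // false, and && short-circuits so later plugins are not consulted.
            allowed.set(allowed.get() && plugin.test(consumer));
        }
        return allowed.get();
    }

    public static void main(String[] args) {
        // Two made-up plugins: one rejects "internal." queues, one rejects long names.
        List<Predicate<String>> plugins = Arrays.asList(
            name -> !name.startsWith("internal."),
            name -> name.length() < 64);

        System.out.println(allPluginsAllow(plugins, "orders"));         // prints true
        System.out.println(allPluginsAllow(plugins, "internal.audit")); // prints false
    }
}
```

With no plugins registered the flag stays true, which corresponds to the excerpt skipping the check entirely when hasBrokerFederationPlugins() is false.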

FederatedAbstract

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java

public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {

   //......

   public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transformer transformer, ClientSessionCallback callback) {
      if (started) {
         FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key);
         if (remoteQueueConsumer == null) {
            if (server.hasBrokerFederationPlugins()) {
               try {
                  server.callBrokerFederationPlugins(plugin -> plugin.beforeCreateFederatedQueueConsumer(key));
               } catch (ActiveMQException t) {
                  ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCreateFederatedQueueConsumer");
                  throw new IllegalStateException(t.getMessage(), t.getCause());
               }
            }
            remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback);
            remoteQueueConsumer.start();
            remoteQueueConsumers.put(key, remoteQueueConsumer);

            if (server.hasBrokerFederationPlugins()) {
               try {
                  final FederatedQueueConsumer finalConsumer = remoteQueueConsumer;
                  server.callBrokerFederationPlugins(plugin -> plugin.afterCreateFederatedQueueConsumer(finalConsumer));
               } catch (ActiveMQException t) {
                  ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCreateFederatedQueueConsumer");
                  throw new IllegalStateException(t.getMessage(), t.getCause());
               }
            }
         }
         remoteQueueConsumer.incrementCount();
      }
   }

   //......
}
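  • FederatedAbstract's createRemoteConsumer does nothing unless the component has been started. If remoteQueueConsumers holds no entry for the key, it creates a FederatedQueueConsumerImpl, calls its start method, and stores it in the map (invoking the before/after plugin callbacks around the creation); in either case it then calls incrementCount on the consumer.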
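
The create-once-then-count shape of createRemoteConsumer can be sketched on its own. This is an illustration under assumptions, not the Artemis implementation: RefCountedRegistrySketch, RemoteConsumer, acquire and release are hypothetical names, and the release side in particular is assumed, since the removal path is not part of the excerpt above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountedRegistrySketch {

    // Hypothetical stand-in for FederatedQueueConsumer: it only tracks how many
    // local consumers currently share this remote consumer.
    static final class RemoteConsumer {
        final AtomicInteger count = new AtomicInteger();
    }

    private final Map<String, RemoteConsumer> remoteConsumers = new HashMap<>();
    private int created; // number of remote consumers actually created

    // Same shape as createRemoteConsumer: create on first use, always bump the count.
    public synchronized int acquire(String key) {
        RemoteConsumer rc = remoteConsumers.get(key);
        if (rc == null) {
            rc = new RemoteConsumer();
            remoteConsumers.put(key, rc);
            created++;
        }
        return rc.count.incrementAndGet();
    }

    // Assumed counterpart (not shown in the excerpt): drop the entry when the
    // last local consumer goes away.
    public synchronized int release(String key) {
        RemoteConsumer rc = remoteConsumers.get(key);
        if (rc == null) {
            return 0;
        }
        int remaining = rc.count.decrementAndGet();
        if (remaining == 0) {
            remoteConsumers.remove(key);
        }
        return remaining;
    }

    public synchronized int createdCount() {
        return created;
    }

    public synchronized int size() {
        return remoteConsumers.size();
    }

    public static void main(String[] args) {
        RefCountedRegistrySketch registry = new RefCountedRegistrySketch();
        System.out.println(registry.acquire("q1"));     // prints 1
        System.out.println(registry.acquire("q1"));     // prints 2
        System.out.println(registry.createdCount());    // prints 1
    }
}
```

The point of the count is that several local consumers with the same key share a single upstream connection and consumer.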

FederatedQueueConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java

public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {

   private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
   private final ActiveMQServer server;
   private final Federation federation;
   private final FederatedConsumerKey key;
   private final Transformer transformer;
   private final FederationUpstream upstream;
   private final AtomicInteger count = new AtomicInteger();
   private final ScheduledExecutorService scheduledExecutorService;
   private final int intialConnectDelayMultiplier = 2;
   private final int intialConnectDelayMax = 30;
   private final ClientSessionCallback clientSessionCallback;

   private ClientSessionFactoryInternal clientSessionFactory;
   private ClientSession clientSession;
   private ClientConsumer clientConsumer;

   public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
      this.federation = federation;
      this.server = server;
      this.key = key;
      this.transformer = transformer;
      this.upstream = upstream;
      this.scheduledExecutorService = server.getScheduledPool();
      this.clientSessionCallback = clientSessionCallback;
   }

   @Override
   public FederationUpstream getFederationUpstream() {
      return upstream;
   }

   @Override
   public Federation getFederation() {
      return federation;
   }

   @Override
   public FederatedConsumerKey getKey() {
      return key;
   }

   @Override
   public ClientSession getClientSession() {
      return clientSession;
   }

   @Override
   public int incrementCount() {
      return count.incrementAndGet();
   }

   @Override
   public int decrementCount() {
      return count.decrementAndGet();
   }

   @Override
   public void start() {
      scheduleConnect(0);
   }

   private void scheduleConnect(int delay) {
      scheduledExecutorService.schedule(() -> {
         try {
            connect();
         } catch (Exception e) {
            scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
         }
      }, delay, TimeUnit.SECONDS);
   }

   private void connect() throws Exception {
      try {
         if (clientConsumer == null) {
            synchronized (this) {
               this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
               this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
               this.clientSession.addFailureListener(this);
               this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
               this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
               this.clientSession.start();
               if (clientSessionCallback != null) {
                  clientSessionCallback.callback(clientSession);
               }
               if (clientSession.queueQuery(key.getQueueName()).isExists()) {
                  this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
                  this.clientConsumer.setMessageHandler(this);
               } else {
                  throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
               }
            }
         }
      } catch (Exception e) {
         try {
            if (clientSessionFactory != null) {
               clientSessionFactory.cleanup();
            }
            disconnect();
         } catch (ActiveMQException ignored) {
         }
         throw e;
      }
   }

   @Override
   public void close() {
      scheduleDisconnect(0);
   }

   private void scheduleDisconnect(int delay) {
      scheduledExecutorService.schedule(() -> {
         try {
            disconnect();
         } catch (Exception ignored) {
         }
      }, delay, TimeUnit.SECONDS);
   }

   private void disconnect() throws ActiveMQException {
      if (clientConsumer != null) {
         clientConsumer.close();
      }
      if (clientSession != null) {
         clientSession.close();
      }
      clientConsumer = null;
      clientSession = null;

      if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||
          clientSessionFactory.numSessions() == 0)) {
         clientSessionFactory.close();
         clientSessionFactory = null;
      }
   }

   @Override
   public void onMessage(ClientMessage clientMessage) {
      try {
         if (server.hasBrokerFederationPlugins()) {
            try {
               server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
            } catch (ActiveMQException t) {
               ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled");
               throw new IllegalStateException(t.getMessage(), t.getCause());
            }
         }

         Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);
         if (message != null) {
            server.getPostOffice().route(message, true);
         }
         clientMessage.acknowledge();

         if (server.hasBrokerFederationPlugins()) {
            try {
               server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));
            } catch (ActiveMQException t) {
               ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled");
               throw new IllegalStateException(t.getMessage(), t.getCause());
            }
         }
      } catch (Exception e) {
         try {
            clientSession.rollback();
         } catch (ActiveMQException e1) {
         }
      }
   }

   @Override
   public void connectionFailed(ActiveMQException exception, boolean failedOver) {
      connectionFailed(exception, failedOver, null);
   }

   @Override
   public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
      try {
         clientSessionFactory.cleanup();
         clientSessionFactory.close();
         clientConsumer = null;
         clientSession = null;
         clientSessionFactory = null;
      } catch (Throwable dontCare) {
      }
      start();
   }

   @Override
   public void beforeReconnect(ActiveMQException exception) {
   }

   public interface ClientSessionCallback {
      void callback(ClientSession clientSession) throws ActiveMQException;
   }
}
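  • FederatedQueueConsumerImpl's start method calls scheduleConnect with a delay of 0. scheduleConnect uses scheduledExecutorService to schedule connect and, if connect throws, computes a new delay and calls scheduleConnect again. Its close method calls scheduleDisconnect with a delay of 0, which schedules disconnect. connect creates a clientSession from the upstream's clientSessionFactory, tags it with the federation and upstream names as session metadata, and starts it; if the queue exists on the remote side it then creates a clientConsumer and registers itself as the messageHandler, otherwise it throws ActiveMQNonExistentQueueException. disconnect closes the clientConsumer and the clientSession. onMessage routes each (optionally transformed) message through the local post office and then acknowledges it, rolling back the session on failure; connectionFailed cleans up and calls start again to reconnect.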
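
To make the retry schedule concrete, here is a small sketch of the delay sequence that scheduleConnect would go through on repeated failures. The body of FederatedQueueConsumer.getNextDelay is not shown in the excerpt, so the nextDelay function below is an assumption about its behaviour (first retry after 1 second, then multiply, capped at the maximum); ReconnectBackoffSketch and delays are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class ReconnectBackoffSketch {

    // Assumed behaviour of getNextDelay (not part of the excerpt): a delay of 0
    // becomes 1, otherwise the delay is multiplied and capped at max.
    static int nextDelay(int delay, int multiplier, int max) {
        if (delay == 0) {
            return 1;
        }
        return Math.min(delay * multiplier, max);
    }

    // Delays (in seconds) used for the first `attempts` connection attempts,
    // assuming every attempt fails.
    static List<Integer> delays(int attempts, int multiplier, int max) {
        List<Integer> result = new ArrayList<>();
        int delay = 0; // start() schedules the first attempt immediately
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            delay = nextDelay(delay, multiplier, max);
        }
        return result;
    }

    public static void main(String[] args) {
        // Multiplier 2 and max 30 are the field values from the excerpt.
        System.out.println(delays(8, 2, 30)); // prints [0, 1, 2, 4, 8, 16, 30, 30]
    }
}
```

Under this assumption the consumer keeps retrying every 30 seconds once the cap is reached; a successful connect simply stops the chain because no further scheduleConnect call is made.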

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

   //......

   public ClientSessionImpl start() throws ActiveMQException {
      checkClosed();

      if (!started) {
         for (ClientConsumerInternal clientConsumerInternal : cloneConsumers()) {
            clientConsumerInternal.start();
         }

         sessionContext.sessionStart();

         started = true;
      }

      return this;
   }

   //......
}
  • ClientSessionImpl's start method, if the session is not already started, calls clientConsumerInternal.start() on each of its consumers and then sessionContext.sessionStart()

ClientConsumerImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   public synchronized void start() {
      stopped = false;

      requeueExecutors();
   }

   private void requeueExecutors() {
      for (int i = 0; i < buffer.size(); i++) {
         queueExecutor();
      }
   }

   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }

      sessionExecutor.execute(runner);
   }

   private class Runner implements Runnable {

      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);

            lastException = e;
         }
      }
   }

   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }

      session.workDone();

      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that

      ClientMessageInternal message;

      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;

      if (theHandler != null) {
         if (rateLimiter != null) {
            rateLimiter.limit();
         }

         failedOver = false;

         synchronized (this) {
            message = buffer.poll();
         }

         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }

            boolean expired = message.isExpired();

            flowControlBeforeConsumption(message);

            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

                     Thread.currentThread().setContextClassLoader(contextClassLoader);

                     return originalLoader;
                  }
               });

               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }

                  onMessageThread = null;
               }

               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }

               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               session.expire(this, message);
            }

            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }

   //......
}
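  • ClientConsumerImpl's start method clears the stopped flag and submits the Runner to the session executor once per buffered message. Runner's run method calls callOnMessage, which pulls a message with buffer.poll() and, unless the message has expired, invokes the theHandler.onMessage(message) callback; expired messages are passed to session.expire instead.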
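
The comment in callOnMessage about pulling the message from inside the Runnable can be shown with a much smaller model. The sketch below is a simplification under stated assumptions, not the client's real buffer: BufferedDeliverySketch is a hypothetical class, messages are plain integers, and a java.util.PriorityQueue (lowest value first) stands in for the client's priority buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class BufferedDeliverySketch {

    // Simplified buffer: integers ordered lowest first (an assumption for the demo).
    private final PriorityQueue<Integer> buffer = new PriorityQueue<>();
    private final Executor executor;
    private volatile Consumer<Integer> handler;
    private volatile boolean stopped = true;

    public BufferedDeliverySketch(Executor executor) {
        this.executor = executor;
    }

    public void setHandler(Consumer<Integer> handler) {
        this.handler = handler;
    }

    // A message arrives: buffer it and, if running, queue one delivery task.
    public synchronized void offer(int message) {
        buffer.add(message);
        if (!stopped) {
            executor.execute(this::deliverOne);
        }
    }

    // Like start(): clear the flag and queue one delivery task per buffered message.
    public synchronized void start() {
        stopped = false;
        int pending = buffer.size();
        for (int i = 0; i < pending; i++) {
            executor.execute(this::deliverOne);
        }
    }

    // The task does not carry a message; it polls the buffer when it runs, so the
    // buffer's ordering decides which message is delivered next.
    private void deliverOne() {
        if (stopped) {
            return;
        }
        Consumer<Integer> theHandler = handler; // local copy, handler may be cleared concurrently
        if (theHandler == null) {
            return;
        }
        Integer message;
        synchronized (this) {
            message = buffer.poll();
        }
        if (message != null) {
            theHandler.accept(message);
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        // Runnable::run executes each task synchronously, which keeps the demo deterministic.
        BufferedDeliverySketch consumer = new BufferedDeliverySketch(Runnable::run);
        consumer.setHandler(seen::add);
        consumer.offer(3);
        consumer.offer(1);
        consumer.offer(2);
        consumer.start();
        System.out.println(seen); // prints [1, 2, 3]
    }
}
```

Messages offered before start are held back, and once delivery begins they come out in buffer order rather than arrival order, which is the property the original comment describes.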

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   public void sessionStart() throws ActiveMQException {
      sessionChannel.send(new PacketImpl(PacketImpl.SESS_START));
   }

   //......
}
  • ActiveMQSessionContext's sessionStart method sends a PacketImpl.SESS_START packet over the sessionChannel

Summary

FederatedQueueConsumerImpl's start method calls scheduleConnect with a delay of 0. scheduleConnect uses scheduledExecutorService to schedule connect and, when an exception is caught, computes a new delay and calls scheduleConnect again. Its close method calls scheduleDisconnect with a delay of 0, which in turn schedules disconnect. connect creates a clientSession from the upstream's clientSessionFactory and starts it, then creates a clientConsumer and sets its messageHandler; disconnect closes the clientConsumer and the clientSession.

doc

  • FederatedQueue