A Look at Artemis's FederatedQueue
- February 24, 2020
- Notes
Preface
This article takes a look at FederatedQueue in ActiveMQ Artemis 2.11.0.
FederatedQueue
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java
public class FederatedQueue extends FederatedAbstract implements ActiveMQServerConsumerPlugin, Serializable {

    private static final Logger logger = Logger.getLogger(FederatedQueue.class);

    private final Set<Matcher> includes;
    private final Set<Matcher> excludes;
    private final Filter metaDataFilter;
    private final int priorityAdjustment;

    private final FederationQueuePolicyConfiguration config;

    public FederatedQueue(Federation federation, FederationQueuePolicyConfiguration config, ActiveMQServer server, FederationUpstream federationUpstream) throws ActiveMQException {
        super(federation, server, federationUpstream);
        Objects.requireNonNull(config.getName());
        this.config = config;
        this.priorityAdjustment = federationUpstream.getPriorityAdjustment() + (config.getPriorityAdjustment() == null ? -1 : config.getPriorityAdjustment());
        String metaDataFilterString = config.isIncludeFederated() ? null : "hyphenated_props:" + FederatedQueueConsumer.FEDERATION_NAME + " IS NOT NULL";
        metaDataFilter = FilterImpl.createFilter(metaDataFilterString);
        if (config.getIncludes().isEmpty()) {
            includes = Collections.emptySet();
        } else {
            includes = new HashSet<>(config.getIncludes().size());
            for (FederationQueuePolicyConfiguration.Matcher include : config.getIncludes()) {
                includes.add(new Matcher(include, wildcardConfiguration));
            }
        }

        if (config.getExcludes().isEmpty()) {
            excludes = Collections.emptySet();
        } else {
            excludes = new HashSet<>(config.getExcludes().size());
            for (FederationQueuePolicyConfiguration.Matcher exclude : config.getExcludes()) {
                excludes.add(new Matcher(exclude, wildcardConfiguration));
            }
        }
    }

    @Override
    public void start() {
        super.start();
        server.getPostOffice()
            .getAllBindings()
            .values()
            .stream()
            .filter(b -> b instanceof QueueBinding)
            .map(b -> (QueueBinding) b)
            .forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));
    }

    /**
     * After a consumer has been created
     *
     * @param consumer the created consumer
     */
    @Override
    public synchronized void afterCreateConsumer(ServerConsumer consumer) {
        conditionalCreateRemoteConsumer(consumer);
    }

    public FederationQueuePolicyConfiguration getConfig() {
        return config;
    }

    private void conditionalCreateRemoteConsumer(ServerConsumer consumer) {
        if (server.hasBrokerFederationPlugins()) {
            final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
            try {
                server.callBrokerFederationPlugins(plugin -> {
                    conditionalCreate.set(conditionalCreate.get() && plugin.federatedQueueConditionalCreateConsumer(consumer));
                });
            } catch (ActiveMQException t) {
                ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedQueueConditionalCreateConsumer");
                throw new IllegalStateException(t.getMessage(), t.getCause());
            }
            if (!conditionalCreate.get()) {
                return;
            }
        }
        createRemoteConsumer(consumer);
    }

    private void conditionalCreateRemoteConsumer(Queue queue) {
        queue.getConsumers()
            .stream()
            .filter(consumer -> consumer instanceof ServerConsumer)
            .map(c -> (ServerConsumer) c).forEach(this::conditionalCreateRemoteConsumer);
    }

    private void createRemoteConsumer(ServerConsumer consumer) {

        //We check the session meta data to see if its a federation session, if so by default we ignore these.
        //To not ignore these, set include-federated to true, which will mean no meta data filter.
        ServerSession serverSession = server.getSessionByID(consumer.getSessionID());
        if (metaDataFilter != null && serverSession != null && metaDataFilter.match(serverSession.getMetaData())) {
            return;
        }
        if (match(consumer)) {
            FederatedConsumerKey key = getKey(consumer);
            Transformer transformer = getTransformer(config.getTransformerRef());
            Transformer fqqnTransformer = message -> message == null ? null : message.setAddress(key.getFqqn());
            createRemoteConsumer(key, mergeTransformers(fqqnTransformer, transformer), null);
        }
    }

    //......
}
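- FederatedQueue extends FederatedAbstract. Its start method walks every QueueBinding and calls conditionalCreateRemoteConsumer for each queue, which in turn visits each ServerConsumer on that queue; afterCreateConsumer makes the same call for consumers created later. conditionalCreateRemoteConsumer first lets the federation plugins veto the creation, then skips consumers whose session is itself a federation session (unless include-federated is set) and consumers that do not match the policy, and finally creates the remoteQueueConsumer through the parent class's createRemoteConsumer.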
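The plugin veto and the priority calculation described above can be sketched in isolation. This is a minimal illustration, not Artemis code: `PluginVetoSketch`, `allPluginsApprove` and `effectivePriorityAdjustment` are hypothetical names, and plain `Predicate<String>` objects stand in for the federation plugins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical stand-in for the decision logic in FederatedQueue; not Artemis code.
class PluginVetoSketch {

    // Returns true only if no plugin vetoes; an empty plugin list approves by default.
    static boolean allPluginsApprove(List<Predicate<String>> plugins, String consumer) {
        AtomicBoolean conditionalCreate = new AtomicBoolean(true);
        // Once one plugin vetoes, && short-circuits and later plugins are not consulted.
        plugins.forEach(plugin ->
            conditionalCreate.set(conditionalCreate.get() && plugin.test(consumer)));
        return conditionalCreate.get();
    }

    // Mirrors the constructor: a missing policy adjustment falls back to -1.
    static int effectivePriorityAdjustment(int upstreamAdjustment, Integer policyAdjustment) {
        return upstreamAdjustment + (policyAdjustment == null ? -1 : policyAdjustment);
    }

    public static void main(String[] args) {
        List<Predicate<String>> plugins = new ArrayList<>();
        plugins.add(consumer -> true);                      // always approves
        plugins.add(consumer -> consumer.startsWith("app")); // vetoes other consumers
        System.out.println(allPluginsApprove(plugins, "app-consumer"));
        System.out.println(allPluginsApprove(plugins, "other-consumer"));
        System.out.println(effectivePriorityAdjustment(0, null));
    }
}
```

With the default of -1, the federated consumer ends up at a slightly lower priority than the local consumer it mirrors, assuming the adjustment is added to that consumer's priority when the key is built (the article's excerpt does not show getKey).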
FederatedAbstract
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java
public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {

    //......

    public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transformer transformer, ClientSessionCallback callback) {
        if (started) {
            FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key);
            if (remoteQueueConsumer == null) {
                if (server.hasBrokerFederationPlugins()) {
                    try {
                        server.callBrokerFederationPlugins(plugin -> plugin.beforeCreateFederatedQueueConsumer(key));
                    } catch (ActiveMQException t) {
                        ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCreateFederatedQueueConsumer");
                        throw new IllegalStateException(t.getMessage(), t.getCause());
                    }
                }
                remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback);
                remoteQueueConsumer.start();
                remoteQueueConsumers.put(key, remoteQueueConsumer);

                if (server.hasBrokerFederationPlugins()) {
                    try {
                        final FederatedQueueConsumer finalConsumer = remoteQueueConsumer;
                        server.callBrokerFederationPlugins(plugin -> plugin.afterCreateFederatedQueueConsumer(finalConsumer));
                    } catch (ActiveMQException t) {
                        ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCreateFederatedQueueConsumer");
                        throw new IllegalStateException(t.getMessage(), t.getCause());
                    }
                }
            }
            remoteQueueConsumer.incrementCount();
        }
    }

    //......
}
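- FederatedAbstract's createRemoteConsumer only acts once the federation has been started. If remoteQueueConsumers holds no entry for the key, it creates a FederatedQueueConsumerImpl, calls its start method and registers it, invoking the before/after plugin callbacks around the creation; in every case it then calls incrementCount, so several local consumers with the same key share one remote consumer.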
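The create-once, count-every-call behaviour can be sketched with a plain map. This is a simplified illustration under stated assumptions: `RefCountedRegistrySketch` and `Entry` are hypothetical names, a `String` stands in for FederatedConsumerKey, the method returns the entry only so the example can inspect it (the real method returns void), and the release path is left out because the article's excerpt does not show it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a registry that shares one remote consumer per key.
class RefCountedRegistrySketch {

    // Stand-in for FederatedQueueConsumerImpl: it only tracks start state and a usage count.
    static final class Entry {
        final String key;
        final AtomicInteger count = new AtomicInteger();
        boolean started;

        Entry(String key) {
            this.key = key;
        }

        void start() {
            started = true;
        }
    }

    private final Map<String, Entry> remoteQueueConsumers = new HashMap<>();
    private int created;

    // Creates and starts the entry on first use; every call bumps the usage count.
    synchronized Entry createRemoteConsumer(String key) {
        Entry entry = remoteQueueConsumers.get(key);
        if (entry == null) {
            entry = new Entry(key);
            entry.start();
            remoteQueueConsumers.put(key, entry);
            created++;
        }
        entry.count.incrementAndGet();
        return entry;
    }

    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        RefCountedRegistrySketch registry = new RefCountedRegistrySketch();
        registry.createRemoteConsumer("orders");
        Entry shared = registry.createRemoteConsumer("orders");
        System.out.println("entries created: " + registry.createdCount());
        System.out.println("usage count: " + shared.count.get());
    }
}
```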
FederatedQueueConsumerImpl
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java
public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {

    private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);
    private final ActiveMQServer server;
    private final Federation federation;
    private final FederatedConsumerKey key;
    private final Transformer transformer;
    private final FederationUpstream upstream;
    private final AtomicInteger count = new AtomicInteger();
    private final ScheduledExecutorService scheduledExecutorService;
    private final int intialConnectDelayMultiplier = 2;
    private final int intialConnectDelayMax = 30;
    private final ClientSessionCallback clientSessionCallback;

    private ClientSessionFactoryInternal clientSessionFactory;
    private ClientSession clientSession;
    private ClientConsumer clientConsumer;

    public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {
        this.federation = federation;
        this.server = server;
        this.key = key;
        this.transformer = transformer;
        this.upstream = upstream;
        this.scheduledExecutorService = server.getScheduledPool();
        this.clientSessionCallback = clientSessionCallback;
    }

    @Override
    public FederationUpstream getFederationUpstream() {
        return upstream;
    }

    @Override
    public Federation getFederation() {
        return federation;
    }

    @Override
    public FederatedConsumerKey getKey() {
        return key;
    }

    @Override
    public ClientSession getClientSession() {
        return clientSession;
    }

    @Override
    public int incrementCount() {
        return count.incrementAndGet();
    }

    @Override
    public int decrementCount() {
        return count.decrementAndGet();
    }

    @Override
    public void start() {
        scheduleConnect(0);
    }

    private void scheduleConnect(int delay) {
        scheduledExecutorService.schedule(() -> {
            try {
                connect();
            } catch (Exception e) {
                scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));
            }
        }, delay, TimeUnit.SECONDS);
    }

    private void connect() throws Exception {
        try {
            if (clientConsumer == null) {
                synchronized (this) {
                    this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();
                    this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());
                    this.clientSession.addFailureListener(this);
                    this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());
                    this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());
                    this.clientSession.start();
                    if (clientSessionCallback != null) {
                        clientSessionCallback.callback(clientSession);
                    }
                    if (clientSession.queueQuery(key.getQueueName()).isExists()) {
                        this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);
                        this.clientConsumer.setMessageHandler(this);
                    } else {
                        throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");
                    }
                }
            }
        } catch (Exception e) {
            try {
                if (clientSessionFactory != null) {
                    clientSessionFactory.cleanup();
                }
                disconnect();
            } catch (ActiveMQException ignored) {
            }
            throw e;
        }
    }

    @Override
    public void close() {
        scheduleDisconnect(0);
    }

    private void scheduleDisconnect(int delay) {
        scheduledExecutorService.schedule(() -> {
            try {
                disconnect();
            } catch (Exception ignored) {
            }
        }, delay, TimeUnit.SECONDS);
    }

    private void disconnect() throws ActiveMQException {
        if (clientConsumer != null) {
            clientConsumer.close();
        }
        if (clientSession != null) {
            clientSession.close();
        }
        clientConsumer = null;
        clientSession = null;

        if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() || clientSessionFactory.numSessions() == 0)) {
            clientSessionFactory.close();
            clientSessionFactory = null;
        }
    }

    @Override
    public void onMessage(ClientMessage clientMessage) {
        try {
            if (server.hasBrokerFederationPlugins()) {
                try {
                    server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));
                } catch (ActiveMQException t) {
                    ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled");
                    throw new IllegalStateException(t.getMessage(), t.getCause());
                }
            }

            Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);
            if (message != null) {
                server.getPostOffice().route(message, true);
            }
            clientMessage.acknowledge();

            if (server.hasBrokerFederationPlugins()) {
                try {
                    server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));
                } catch (ActiveMQException t) {
                    ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled");
                    throw new IllegalStateException(t.getMessage(), t.getCause());
                }
            }
        } catch (Exception e) {
            try {
                clientSession.rollback();
            } catch (ActiveMQException e1) {
            }
        }
    }

    @Override
    public void connectionFailed(ActiveMQException exception, boolean failedOver) {
        connectionFailed(exception, failedOver, null);
    }

    @Override
    public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
        try {
            clientSessionFactory.cleanup();
            clientSessionFactory.close();
            clientConsumer = null;
            clientSession = null;
            clientSessionFactory = null;
        } catch (Throwable dontCare) {
        }
        start();
    }

    @Override
    public void beforeReconnect(ActiveMQException exception) {
    }

    public interface ClientSessionCallback {
        void callback(ClientSession clientSession) throws ActiveMQException;
    }
}
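- FederatedQueueConsumerImpl's start method calls scheduleConnect with a delay of 0. scheduleConnect uses scheduledExecutorService to run connect; if connect throws, it computes the next delay and calls scheduleConnect again. The close method calls scheduleDisconnect with a delay of 0, which schedules disconnect. connect creates a clientSession from the upstream's clientSessionFactory, tags it with the federation metadata and starts it, then, if the queue exists on the remote broker, creates a clientConsumer and registers itself as the messageHandler. disconnect closes the clientConsumer and the clientSession. onMessage applies the transformer, routes the message through the local PostOffice and acknowledges it, rolling back the session on any exception; connectionFailed cleans up and calls start again.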
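The retry schedule can be sketched without a real executor. The article does not show FederatedQueueConsumer.getNextDelay, so `nextDelay` below is an assumption: it starts at 1 second, multiplies by the multiplier and caps at the maximum. `ReconnectBackoffSketch`, `Connector` and `delaysUntilConnected` are hypothetical names, and the loop records the delays that scheduleConnect would have used instead of actually waiting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous model of the scheduleConnect retry loop.
class ReconnectBackoffSketch {

    static final int MULTIPLIER = 2; // same value as intialConnectDelayMultiplier
    static final int MAX_DELAY = 30; // same value as intialConnectDelayMax

    // Assumed backoff rule; the real getNextDelay is not shown in the article.
    static int nextDelay(int delay, int multiplier, int max) {
        if (delay == 0) {
            return 1;
        }
        return Math.min(delay * multiplier, max);
    }

    interface Connector {
        void connect() throws Exception;
    }

    // Returns the delay (in seconds) used before each attempt, up to the first success.
    static List<Integer> delaysUntilConnected(Connector connector, int maxAttempts) {
        List<Integer> delays = new ArrayList<>();
        int delay = 0; // start() schedules the first attempt immediately
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            delays.add(delay);
            try {
                connector.connect();
                return delays;
            } catch (Exception e) {
                delay = nextDelay(delay, MULTIPLIER, MAX_DELAY);
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The simulated upstream is unreachable for the first three attempts.
        List<Integer> delays = delaysUntilConnected(() -> {
            if (calls[0]++ < 3) {
                throw new Exception("upstream not reachable");
            }
        }, 10);
        System.out.println(delays);
    }
}
```

Under this assumed rule the delay never exceeds 30 seconds, so a consumer whose upstream stays down keeps retrying at a steady interval rather than backing off indefinitely.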
ClientSessionImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

    //......

    public ClientSessionImpl start() throws ActiveMQException {
        checkClosed();

        if (!started) {
            for (ClientConsumerInternal clientConsumerInternal : cloneConsumers()) {
                clientConsumerInternal.start();
            }

            sessionContext.sessionStart();

            started = true;
        }

        return this;
    }

    //......
}
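- ClientSessionImpl's start method does nothing if the session is already started; otherwise it calls clientConsumerInternal.start() on each of its consumers and then sessionContext.sessionStart()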
ClientConsumerImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
public final class ClientConsumerImpl implements ClientConsumerInternal {

    //......

    public synchronized void start() {
        stopped = false;

        requeueExecutors();
    }

    private void requeueExecutors() {
        for (int i = 0; i < buffer.size(); i++) {
            queueExecutor();
        }
    }

    private void queueExecutor() {
        if (logger.isTraceEnabled()) {
            logger.trace(this + "::Adding Runner on Executor for delivery");
        }

        sessionExecutor.execute(runner);
    }

    private class Runner implements Runnable {

        @Override
        public void run() {
            try {
                callOnMessage();
            } catch (Exception e) {
                ActiveMQClientLogger.LOGGER.onMessageError(e);

                lastException = e;
            }
        }
    }

    private void callOnMessage() throws Exception {
        if (closing || stopped) {
            return;
        }

        session.workDone();

        // We pull the message from the buffer from inside the Runnable so we can ensure priority
        // ordering. If we just added a Runnable with the message to the executor immediately as we get it
        // we could not do that

        ClientMessageInternal message;

        // Must store handler in local variable since might get set to null
        // otherwise while this is executing and give NPE when calling onMessage
        MessageHandler theHandler = handler;

        if (theHandler != null) {
            if (rateLimiter != null) {
                rateLimiter.limit();
            }

            failedOver = false;

            synchronized (this) {
                message = buffer.poll();
            }

            if (message != null) {
                if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
                    //Ignore, this could be a relic from a previous receiveImmediate();
                    return;
                }

                boolean expired = message.isExpired();

                flowControlBeforeConsumption(message);

                if (!expired) {
                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Calling handler.onMessage");
                    }
                    final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                        @Override
                        public ClassLoader run() {
                            ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

                            Thread.currentThread().setContextClassLoader(contextClassLoader);

                            return originalLoader;
                        }
                    });

                    onMessageThread = Thread.currentThread();
                    try {
                        theHandler.onMessage(message);
                    } finally {
                        try {
                            AccessController.doPrivileged(new PrivilegedAction<Object>() {
                                @Override
                                public Object run() {
                                    Thread.currentThread().setContextClassLoader(originalLoader);
                                    return null;
                                }
                            });
                        } catch (Exception e) {
                            ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                        }

                        onMessageThread = null;
                    }

                    if (logger.isTraceEnabled()) {
                        logger.trace(this + "::Handler.onMessage done");
                    }

                    if (message.isLargeMessage()) {
                        message.discardBody();
                    }
                } else {
                    session.expire(this, message);
                }

                // If slow consumer, we need to send 1 credit to make sure we get another message
                if (clientWindowSize == 0) {
                    startSlowConsumer();
                }
            }
        }
    }

    //......
}
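- ClientConsumerImpl's start method clears the stopped flag and submits the Runner to sessionExecutor once for every message currently in the buffer. Runner's run method calls callOnMessage, which pulls one message with buffer.poll() and, unless the message has expired, invokes the theHandler.onMessage(message) callback; expired messages are passed to session.expire instead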
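The "one runner per buffered message, poll inside the runner" pattern can be reduced to a few lines. This is a simplified sketch, not the Artemis client: `BufferedDeliverySketch` is a hypothetical class, a FIFO `ArrayDeque` stands in for the real buffer (which, per the source comment, preserves priority ordering), a plain task list stands in for sessionExecutor, and rate limiting, expiry and flow control are omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical model of ClientConsumerImpl's buffered delivery.
class BufferedDeliverySketch {

    private final Queue<String> buffer = new ArrayDeque<>();
    private final List<Runnable> pendingTasks = new ArrayList<>(); // stands in for sessionExecutor
    private volatile Consumer<String> handler;
    private volatile boolean stopped = true;

    synchronized void enqueue(String message) {
        buffer.add(message);
    }

    void setHandler(Consumer<String> handler) {
        this.handler = handler;
    }

    // Like start(): clear the stopped flag and queue one runner per buffered message.
    synchronized void start() {
        stopped = false;
        for (int i = 0; i < buffer.size(); i++) {
            pendingTasks.add(this::callOnMessage);
        }
    }

    // The runner does not carry a message; it polls the buffer when it actually runs.
    private void callOnMessage() {
        if (stopped) {
            return;
        }
        // Keep a local copy so a concurrent setHandler(null) cannot cause an NPE.
        Consumer<String> theHandler = handler;
        if (theHandler == null) {
            return;
        }
        String message;
        synchronized (this) {
            message = buffer.poll();
        }
        if (message != null) {
            theHandler.accept(message);
        }
    }

    // Runs the queued tasks on the calling thread, in submission order.
    void drain() {
        List<Runnable> tasks;
        synchronized (this) {
            tasks = new ArrayList<>(pendingTasks);
            pendingTasks.clear();
        }
        tasks.forEach(Runnable::run);
    }

    public static void main(String[] args) {
        BufferedDeliverySketch consumer = new BufferedDeliverySketch();
        consumer.enqueue("m1");
        consumer.enqueue("m2");
        consumer.setHandler(message -> System.out.println("handled " + message));
        consumer.start();
        consumer.drain();
    }
}
```

Polling inside the runner, rather than binding each message to its own task, is what lets the buffer decide the delivery order at the moment of delivery.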
ActiveMQSessionContext
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
public class ActiveMQSessionContext extends SessionContext {

    //......

    public void sessionStart() throws ActiveMQException {
        sessionChannel.send(new PacketImpl(PacketImpl.SESS_START));
    }

    //......
}
- ActiveMQSessionContext's sessionStart method sends a PacketImpl.SESS_START packet over sessionChannel
Summary
FederatedQueueConsumerImpl's start method calls scheduleConnect with a delay of 0; scheduleConnect runs connect on scheduledExecutorService and, when an exception is caught, computes a new delay and schedules itself again. Its close method calls scheduleDisconnect with a delay of 0, which schedules disconnect. connect creates a clientSession from the upstream's clientSessionFactory and starts it, then creates a clientConsumer and sets its messageHandler; disconnect closes the clientConsumer and the clientSession.
doc
- FederatedQueue