A Look at Artemis's DelayedAddRedistributor

  • February 24, 2020
  • Notes

This article examines the DelayedAddRedistributor in Artemis.

addRedistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

    public class QueueImpl extends CriticalComponentImpl implements Queue {

       //......

       public synchronized void addRedistributor(final long delay) {
          clearRedistributorFuture();

          if (redistributor != null) {
             // Just prompt delivery
             deliverAsync();
          }

          if (delay > 0) {
             if (consumers.isEmpty()) {
                DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);

                redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
             }
          } else {
             internalAddRedistributor(executor);
          }
       }

       //......
    }

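  • QueueImpl's addRedistributor first cancels any pending redistributorFuture and, if a redistributor already exists, prompts delivery. When delay is greater than 0 and the queue has no consumers, it creates a DelayedAddRedistributor and schedules it; when delay is 0 or less, it calls internalAddRedistributor directly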
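
The branching in addRedistributor can be sketched with a plain ScheduledExecutorService. This is a minimal illustration, not the Artemis implementation: the class AddRedistributorSketch and its fields (redistributorInstalled, consumerCount) are hypothetical stand-ins for QueueImpl's state.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for QueueImpl; only the scheduling decision is modelled.
class AddRedistributorSketch {
    private final ScheduledExecutorService scheduler;
    private ScheduledFuture<?> redistributorFuture; // pending delayed add, if any
    private boolean redistributorInstalled;          // stands in for "redistributor != null"
    private int consumerCount;                       // stands in for consumers.size()

    AddRedistributorSketch(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    synchronized void addRedistributor(long delayMillis) {
        // Any earlier pending request is cancelled first.
        clearRedistributorFuture();
        if (delayMillis > 0) {
            // Delayed path: only schedule when there are no local consumers.
            if (consumerCount == 0) {
                Runnable delayedAdd = () -> {
                    synchronized (AddRedistributorSketch.this) {
                        install();
                        clearRedistributorFuture();
                    }
                };
                redistributorFuture = scheduler.schedule(delayedAdd, delayMillis, TimeUnit.MILLISECONDS);
            }
        } else {
            // Immediate path.
            install();
        }
    }

    private synchronized void install() {
        if (consumerCount == 0 && !redistributorInstalled) {
            redistributorInstalled = true;
        }
    }

    private synchronized void clearRedistributorFuture() {
        ScheduledFuture<?> future = redistributorFuture;
        redistributorFuture = null;
        if (future != null) {
            future.cancel(false); // do not interrupt a task that is already running
        }
    }

    synchronized void addConsumer() {
        consumerCount++;
    }

    synchronized boolean isInstalled() {
        return redistributorInstalled;
    }

    synchronized boolean hasPendingFuture() {
        return redistributorFuture != null && !redistributorFuture.isDone();
    }
}
```

With a long delay the sketch only holds a pending future; a later call with delay 0 cancels that future and installs the redistributor at once.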

DelayedAddRedistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

    public class QueueImpl extends CriticalComponentImpl implements Queue {

       //......

       private class DelayedAddRedistributor implements Runnable {

          private final ArtemisExecutor executor1;

          DelayedAddRedistributor(final ArtemisExecutor executor) {
             this.executor1 = executor;
          }

          @Override
          public void run() {
             synchronized (QueueImpl.this) {
                internalAddRedistributor(executor1);

                clearRedistributorFuture();
             }
          }
       }

       private void internalAddRedistributor(final ArtemisExecutor executor) {
          // create the redistributor only once if there are no local consumers
          if (consumers.isEmpty() && redistributor == null) {
             if (logger.isTraceEnabled()) {
                logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
             }

             redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));

             redistributor.consumer.start();

             deliverAsync();
          }
       }

       private void clearRedistributorFuture() {
          ScheduledFuture<?> future = redistributorFuture;
          redistributorFuture = null;
          if (future != null) {
             future.cancel(false);
          }
       }

       public void deliverAsync() {
          deliverAsync(false);
       }

       private void deliverAsync(boolean noWait) {
          if (scheduledRunners.get() < MAX_SCHEDULED_RUNNERS) {
             scheduledRunners.incrementAndGet();
             checkDepage(noWait);
             try {
                getExecutor().execute(deliverRunner);
             } catch (RejectedExecutionException ignored) {
                // no-op
                scheduledRunners.decrementAndGet();
             }
          }
       }

       //......
    }

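  • DelayedAddRedistributor implements the Runnable interface. Its run method, synchronized on the enclosing QueueImpl, first calls internalAddRedistributor and then clearRedistributorFuture. internalAddRedistributor acts only when the queue has no consumers and no redistributor yet: it creates a Redistributor wrapped in a ConsumerHolder, calls redistributor.consumer.start(), and finally calls deliverAsync, which submits the DeliverRunner to the executor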
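
The "create only once" guard is what makes it safe for the delayed task to fire more than once or to fire after a consumer has arrived. The following is a minimal sketch of that guard under simplifying assumptions: OnceOnlyRedistributorSketch, its counters, and the String consumer names are hypothetical, and a counter stands in for the call to deliverAsync.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the once-only guard in internalAddRedistributor.
class OnceOnlyRedistributorSketch {
    private final List<String> consumers = new ArrayList<>();
    private Object redistributor;    // stands in for the ConsumerHolder
    private int created;             // how many times a redistributor was built
    private int deliveriesPrompted;  // stands in for calls to deliverAsync()

    // Mirrors the guard: act only with no local consumers and no redistributor.
    synchronized void internalAddRedistributor() {
        if (consumers.isEmpty() && redistributor == null) {
            redistributor = new Object();
            created++;
            deliveriesPrompted++;
        }
    }

    // The delayed task takes the queue's monitor before touching its state.
    Runnable delayedAdd() {
        return () -> {
            synchronized (OnceOnlyRedistributorSketch.this) {
                internalAddRedistributor();
            }
        };
    }

    synchronized void addConsumer(String name) {
        consumers.add(name);
    }

    synchronized int created() {
        return created;
    }

    synchronized int deliveriesPrompted() {
        return deliveriesPrompted;
    }
}
```

Running the task twice on an empty queue builds one redistributor; running it on a queue that already has a consumer builds none.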

ConsumerHolder

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

    protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {

       ConsumerHolder(final T consumer) {
          this.consumer = consumer;
       }

       final T consumer;

       LinkedListIterator<MessageReference> iter;

       private void resetIterator() {
          if (iter != null) {
             iter.close();
          }
          iter = null;
       }

       private Consumer consumer() {
          return consumer;
       }

       @Override
       public boolean equals(Object o) {
          if (this == o) return true;
          if (o == null || getClass() != o.getClass()) return false;
          ConsumerHolder<?> that = (ConsumerHolder<?>) o;
          return Objects.equals(consumer, that.consumer);
       }

       @Override
       public int hashCode() {
          return Objects.hash(consumer);
       }

       @Override
       public int getPriority() {
          return consumer.getPriority();
       }
    }

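  • ConsumerHolder implements the PriorityAware interface; its getPriority method returns consumer.getPriority(), and its equals and hashCode are likewise based on the wrapped consumer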

Redistributor

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java

    public class Redistributor implements Consumer {

       private boolean active;

       private final StorageManager storageManager;

       private final PostOffice postOffice;

       private final Executor executor;

       private final int batchSize;

       private final Queue queue;

       private int count;

       private final long sequentialID;

       // a Flush executor here is happening inside another executor.
       // what may cause issues under load. Say you are running out of executors for cases where you don't need to wait at all.
       // So, instead of using a future we will use a plain ReusableLatch here
       private ReusableLatch pendingRuns = new ReusableLatch();

       public Redistributor(final Queue queue,
                            final StorageManager storageManager,
                            final PostOffice postOffice,
                            final Executor executor,
                            final int batchSize) {
          this.queue = queue;

          this.sequentialID = storageManager.generateID();

          this.storageManager = storageManager;

          this.postOffice = postOffice;

          this.executor = executor;

          this.batchSize = batchSize;
       }

       @Override
       public long sequentialID() {
          return sequentialID;
       }

       @Override
       public Filter getFilter() {
          return null;
       }

       @Override
       public String debug() {
          return toString();
       }

       @Override
       public String toManagementString() {
          return "Redistributor[" + queue.getName() + "/" + queue.getID() + "]";
       }

       @Override
       public void disconnect() {
          //noop
       }

       public synchronized void start() {
          active = true;
       }

       @Override
       public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
          if (!active) {
             return HandleStatus.BUSY;
          } else if (reference.getMessage().getGroupID() != null) {
             //we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered
             return HandleStatus.NO_MATCH;
          }

          final Transaction tx = new TransactionImpl(storageManager);

          final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx);

          if (routingInfo == null) {
             tx.rollback();
             return HandleStatus.BUSY;
          }

          if (!reference.getMessage().isLargeMessage()) {

             postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

             ackRedistribution(reference, tx);
          } else {
             active = false;
             executor.execute(new Runnable() {
                @Override
                public void run() {
                   try {

                      postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false);

                      ackRedistribution(reference, tx);

                      synchronized (Redistributor.this) {
                         active = true;

                         count++;

                         queue.deliverAsync();
                      }
                   } catch (Exception e) {
                      try {
                         tx.rollback();
                      } catch (Exception e2) {
                         // Nothing much we can do now
                         ActiveMQServerLogger.LOGGER.failedToRollback(e2);
                      }
                   }
                }
             });
          }

          return HandleStatus.HANDLED;
       }

       //......
    }

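  • Redistributor implements the Consumer interface, and its start method sets active to true. Its handle method returns HandleStatus.BUSY when active is false and HandleStatus.NO_MATCH when the message has a group ID. Otherwise it calls postOffice.redistribute(reference.getMessage(), queue, tx) to obtain routingInfo; if that is null, it rolls back the transaction and returns HandleStatus.BUSY. For a non-large message it calls postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false) and ackRedistribution(reference, tx) directly; for a large message it sets active to false and runs the same two calls on the executor, then sets active back to true and calls queue.deliverAsync(). In both cases handle returns HandleStatus.HANDLED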
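
The order of the checks in handle determines which status the queue sees, so it can help to view it as a pure function. The sketch below is a simplified model, not the Artemis code: RedistributorHandleSketch, its local HandleStatus enum, and the boolean routeFound (standing in for "postOffice.redistribute returned a non-null routingInfo") are assumptions made for illustration.

```java
// Hypothetical, simplified model of the decision order in Redistributor.handle.
class RedistributorHandleSketch {

    // Local copy of the three statuses used in the listing above.
    enum HandleStatus { HANDLED, NO_MATCH, BUSY }

    static HandleStatus decide(boolean active, String groupId, boolean routeFound) {
        if (!active) {
            // Not started yet, or a large message is still being processed.
            return HandleStatus.BUSY;
        }
        if (groupId != null) {
            // Grouped messages are not redistributed; let other messages through.
            return HandleStatus.NO_MATCH;
        }
        if (!routeFound) {
            // The original rolls back the transaction before returning here.
            return HandleStatus.BUSY;
        }
        // Both the normal and the large-message paths report HANDLED.
        return HandleStatus.HANDLED;
    }
}
```

For example, decide(true, "g1", true) yields NO_MATCH because the group check comes before routing is attempted.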

DeliverRunner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

    public class QueueImpl extends CriticalComponentImpl implements Queue {

       //......

       private final class DeliverRunner implements Runnable {

          @Override
          public void run() {
             try {
                // during the transition between paging and nonpaging, we could have this using a different executor
                // and at this short period we could have more than one delivery thread running in async mode
                // this will avoid that possibility
                // We will be using the deliverRunner instance as the guard object to avoid multiple threads executing
                // an asynchronous delivery
                enterCritical(CRITICAL_DELIVER);
                boolean needCheckDepage = false;
                try {
                   deliverLock.lock();
                   try {
                      needCheckDepage = deliver();
                   } finally {
                      deliverLock.unlock();
                   }
                } finally {
                   leaveCritical(CRITICAL_DELIVER);
                }

                if (needCheckDepage) {
                   enterCritical(CRITICAL_CHECK_DEPAGE);
                   try {
                      checkDepage(true);
                   } finally {
                      leaveCritical(CRITICAL_CHECK_DEPAGE);
                   }
                }

             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorDelivering(e);
             }
          }
       }

       private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
          HandleStatus status;
          try {
             status = consumer.handle(reference);
          } catch (Throwable t) {
             ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);

             // If the consumer throws an exception we remove the consumer
             try {
                removeConsumer(consumer);
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorRemovingConsumer(e);
             }
             return HandleStatus.BUSY;
          }

          if (status == null) {
             throw new IllegalStateException("ClientConsumer.handle() should never return null");
          }

          return status;
       }

       //......
    }

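  • DeliverRunner implements the Runnable interface. Its run method calls deliver while holding deliverLock; deliver calls handle, which in turn calls consumer.handle(reference). If the consumer throws, handle removes that consumer and returns HandleStatus.BUSY. When redistributor is not null, the consumer in question is redistributor.consumer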
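
The deliver method itself is not shown above, so the loop below is an assumption about its general shape rather than a copy of it. It is a minimal sketch of a lock-guarded delivery pass that treats a throwing consumer as BUSY, as the handle wrapper does. DeliverSketch, its String messages, and the Function-based consumer are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical model of one delivery pass guarded by a lock.
class DeliverSketch {

    enum HandleStatus { HANDLED, NO_MATCH, BUSY }

    private final ReentrantLock deliverLock = new ReentrantLock();
    private final Deque<String> messages = new ArrayDeque<>();

    void add(String message) {
        deliverLock.lock();
        try {
            messages.add(message);
        } finally {
            deliverLock.unlock();
        }
    }

    int size() {
        deliverLock.lock();
        try {
            return messages.size();
        } finally {
            deliverLock.unlock();
        }
    }

    // Mirrors the wrapper: a throwing consumer counts as BUSY, null is illegal.
    static HandleStatus safeHandle(Function<String, HandleStatus> consumer, String message) {
        HandleStatus status;
        try {
            status = consumer.apply(message);
        } catch (Throwable t) {
            return HandleStatus.BUSY; // the original also removes the consumer here
        }
        if (status == null) {
            throw new IllegalStateException("handle() should never return null");
        }
        return status;
    }

    // Assumed loop shape: remove HANDLED, skip NO_MATCH, stop on BUSY.
    int deliver(Function<String, HandleStatus> consumer) {
        deliverLock.lock();
        try {
            int handled = 0;
            Iterator<String> it = messages.iterator();
            while (it.hasNext()) {
                HandleStatus status = safeHandle(consumer, it.next());
                if (status == HandleStatus.BUSY) {
                    break;
                }
                if (status == HandleStatus.HANDLED) {
                    it.remove();
                    handled++;
                }
            }
            return handled;
        } finally {
            deliverLock.unlock();
        }
    }
}
```

In this model a consumer that returns NO_MATCH for one message leaves that message in the queue while the messages after it are still offered, which is the effect the comment in Redistributor.handle describes for grouped messages.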

Summary

When delay is greater than 0 and the queue has no consumers, QueueImpl's addRedistributor creates and schedules a DelayedAddRedistributor. DelayedAddRedistributor implements the Runnable interface; its run method first calls internalAddRedistributor and then clearRedistributorFuture. internalAddRedistributor creates a Redistributor wrapped in a ConsumerHolder, calls redistributor.consumer.start(), and finally calls deliverAsync to schedule the DeliverRunner. DeliverRunner also implements Runnable; its run method calls deliver, which calls handle, which calls consumer.handle(reference), and when redistributor is not null that consumer is redistributor.consumer. For a non-large message, redistributor.consumer's handle method calls postOffice.processRoute(routingInfo.getB(), routingInfo.getA(), false) and ackRedistribution(reference, tx), then returns HandleStatus.HANDLED.

doc

  • QueueImpl