聊聊artemis ClientConsumer的handleRegularMessage

  • 2020 年 2 月 14 日
  • 筆記

本文主要研究一下artemis ClientConsumer的handleRegularMessage

handleRegularMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {       //......       private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);       private final Runner runner = new Runner();       private volatile MessageHandler handler;       //......       private void handleRegularMessage(ClientMessageInternal message) {        if (message.getAddress() == null) {           message.setAddress(queueInfo.getAddress());        }          message.onReceipt(this);          if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {           // We have messages of different priorities so we need to ack them individually since the order           // of them in the ServerConsumerImpl delivery list might not be the same as the order they are           // consumed in, which means that acking all up to won't work           ackIndividually = true;        }          // Add it to the buffer        buffer.addTail(message, message.getPriority());          if (handler != null) {           // Execute using executor           if (!stopped) {              queueExecutor();           }        } else {           notify();        }     }       private void queueExecutor() {        if (logger.isTraceEnabled()) {           logger.trace(this + "::Adding Runner on Executor for delivery");        }          sessionExecutor.execute(runner);     }       //......  }
  • ClientConsumerImpl的handleRegularMessage方法先執行buffer.addTail(message, message.getPriority()),之後對於handler不為null的會執行queueExecutor(),否則執行notify();queueExecutor方法是通過sessionExecutor執行runner

Runner

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {       //......       private class Runner implements Runnable {          @Override        public void run() {           try {              callOnMessage();           } catch (Exception e) {              ActiveMQClientLogger.LOGGER.onMessageError(e);                lastException = e;           }        }     }       private void callOnMessage() throws Exception {        if (closing || stopped) {           return;        }          session.workDone();          // We pull the message from the buffer from inside the Runnable so we can ensure priority        // ordering. If we just added a Runnable with the message to the executor immediately as we get it        // we could not do that          ClientMessageInternal message;          // Must store handler in local variable since might get set to null        // otherwise while this is executing and give NPE when calling onMessage        MessageHandler theHandler = handler;          if (theHandler != null) {           if (rateLimiter != null) {              rateLimiter.limit();           }             failedOver = false;             synchronized (this) {              message = buffer.poll();           }             if (message != null) {              if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {                 //Ignore, this could be a relic from a previous receiveImmediate();                 return;              }                boolean expired = message.isExpired();                flowControlBeforeConsumption(message);                if (!expired) {                 if (logger.isTraceEnabled()) {                    logger.trace(this + "::Calling handler.onMessage");                 }                 final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {                    @Override                    public ClassLoader run() {                       ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();                         Thread.currentThread().setContextClassLoader(contextClassLoader);                         return originalLoader;                    }                 });                   onMessageThread = Thread.currentThread();                 try {                    theHandler.onMessage(message);                 } finally {                    try {                       AccessController.doPrivileged(new PrivilegedAction<Object>() {                          @Override                          public Object run() {                             Thread.currentThread().setContextClassLoader(originalLoader);                             return null;                          }                       });                    } catch (Exception e) {                       ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);                    }                      onMessageThread = null;                 }                   if (logger.isTraceEnabled()) {                    logger.trace(this + "::Handler.onMessage done");                 }                   if (message.isLargeMessage()) {                    message.discardBody();                 }              } else {                 session.expire(this, message);              }                // If slow consumer, we need to send 1 credit to make sure we get another message              if (clientWindowSize == 0) {                 startSlowConsumer();              }           }        }     }       private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {        // Chunk messages will execute the flow control while receiving the chunks        if (message.getFlowControlSize() != 0) {           // on large messages we should discount 1 on the first packets as we need continuity until the last packet           flowControl(message.getFlowControlSize(), !message.isLargeMessage());        }     }       public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {        if (clientWindowSize >= 0) {           creditsToSend += messageBytes;             if (creditsToSend >= clientWindowSize) {              if (clientWindowSize == 0 && discountSlowConsumer) {                 if (logger.isTraceEnabled()) {                    logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");                 }                   // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be                 // always buffering one after received the first message                 final int credits = creditsToSend - 1;                   creditsToSend = 0;                   if (credits > 0) {                    sendCredits(credits);                 }              } else {                 if (logger.isDebugEnabled()) {                    logger.debug("Sending " + messageBytes + " from flow-control");                 }                   final int credits = creditsToSend;                   creditsToSend = 0;                   if (credits > 0) {                    sendCredits(credits);                 }              }           }        }     }         //......  }  
  • Runner實現了Runnable介面,其run方法執行callOnMessage();該方法對於rateLimiter不為null會執行rateLimiter.limit();之後執行buffer.poll()獲取ClientMessageInternal,若不為null,則執行flowControlBeforeConsumption(message),對於非expired的會執行theHandler.onMessage(message)方法;對於clientWindowSize為0的則執行startSlowConsumer();flowControlBeforeConsumption方法會執行flowControl方法,該方法會計算credits,然後執行sendCredits(credits)

小結

ClientConsumerImpl的handleRegularMessage方法先執行buffer.addTail(message, message.getPriority()),之後對於handler不為null的會執行queueExecutor(),否則執行notify();queueExecutor方法是通過sessionExecutor執行runner;Runner實現了Runnable介面,其run方法執行callOnMessage();該方法對於rateLimiter不為null會執行rateLimiter.limit();之後執行buffer.poll()獲取ClientMessageInternal進行處理

doc

  • ClientConsumerImpl