聊聊artemis的reconnectAttempts

  • 2020 年 2 月 24 日
  • 筆記

本文主要研究一下artemis的reconnectAttempts

reconnectAttempts

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {       //......       public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,                                                      int reconnectAttempts) throws Exception {        assertOpen();          initialize();          ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);          addToConnecting(factory);        try {           try {              factory.connect(reconnectAttempts);           } catch (ActiveMQException e1) {              //we need to make sure is closed just for garbage collection              factory.close();              throw e1;           }           addFactory(factory);           return factory;        } finally {           removeFromConnecting(factory);        }     }       //......  }
  • ServerLocatorImpl的createSessionFactory方法创建ClientSessionFactoryImpl,然后执行factory.connect(reconnectAttempts)

ClientSessionFactoryImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {       //......       public void connect(final int initialConnectAttempts) throws ActiveMQException {        // Get the connection        getConnectionWithRetry(initialConnectAttempts, null);          if (connection == null) {           StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);           if (backupConfig != null) {              msg.append(" and backup configuration ").append(backupConfig);           }           throw new ActiveMQNotConnectedException(msg.toString());        }       }       private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {        if (!clientProtocolManager.isAlive())           return;        if (logger.isTraceEnabled()) {           logger.trace("getConnectionWithRetry::" + reconnectAttempts +                           " with retryInterval = " +                           retryInterval +                           " multiplier = " +                           retryIntervalMultiplier, new Exception("trace"));        }          long interval = retryInterval;          int count = 0;          while (clientProtocolManager.isAlive()) {           if (logger.isDebugEnabled()) {              logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);           }             if (getConnection() != null) {              if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {                 // transferring old connection version into the new connection                 ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());              }              if (logger.isDebugEnabled()) {                 logger.debug("Reconnection successful");              }              return;           } else {              // Failed to get connection                if (reconnectAttempts != 0) {                 count++;                   if (reconnectAttempts != -1 && count == reconnectAttempts) {                    if (reconnectAttempts != 1) {                       ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);                    }                      return;                 }                   if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {                    ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);                 }                   if (waitForRetry(interval))                    return;                   // Exponential back-off                 long newInterval = (long) (interval * retryIntervalMultiplier);                   if (newInterval > maxRetryInterval) {                    newInterval = maxRetryInterval;                 }                   interval = newInterval;              } else {                 logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");                 return;              }           }        }     }       public boolean waitForRetry(long interval) {        try {           if (clientProtocolManager.waitOnLatch(interval)) {              return true;           }        } catch (InterruptedException ignore) {           throw new ActiveMQInterruptedException(createTrace);        }        return false;     }       //......  }  
  • ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {       //......       private final CountDownLatch waitLatch = new CountDownLatch(1);       //......       public boolean waitOnLatch(long milliseconds) throws InterruptedException {        return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);     }       public void stop() {        alive = false;          synchronized (inCreateSessionGuard) {           if (inCreateSessionLatch != null)              inCreateSessionLatch.countDown();        }          Channel channel1 = getChannel1();        if (channel1 != null) {           channel1.returnBlocking();        }          waitLatch.countDown();       }       //......  }
  • ActiveMQClientProtocolManager有个名为waitLatch的CountDownLatch,waitOnLatch方法通过waitLatch.await(milliseconds, TimeUnit.MILLISECONDS)进行等待,而stop方法则执行waitLatch.countDown()

小结

ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待

doc

  • ClientSessionFactoryImpl