聊聊artemis的handleConnectionFailure

  • 2020 年 2 月 24 日
  • 筆記

本文主要研究一下artemis的handleConnectionFailure

handleConnectionFailure

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

/**
 * Excerpt of ClientSessionFactoryImpl (Artemis 2.11.0) showing the connection-failure path.
 * Parts of the class that are not relevant here are elided and marked with "//......".
 */
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   /**
    * Reacts to a failed connection by delegating to {@code failoverOrReconnect}.
    *
    * <p>An {@code ActiveMQInterruptedException} is only logged at debug level. Any other
    * {@code Throwable} is logged, the factory is closed (so blocked clients are released) and
    * the {@code Throwable} is rethrown to the caller.
    *
    * @param connectionID          ID of the connection that failed
    * @param me                    the exception describing the failure
    * @param scaleDownTargetNodeID passed through to the session failure listeners
    */
   private void handleConnectionFailure(final Object connectionID,
                                        final ActiveMQException me,
                                        String scaleDownTargetNodeID) {
      try {
         failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
      } catch (ActiveMQInterruptedException e1) {
         // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
         logger.debug(e1.getMessage(), e1);
      } catch (Throwable t) {
         ActiveMQClientLogger.LOGGER.unableToHandleConnectionFailure(t);
         // for anything else just close so clients are unblocked
         close();
         throw t;
      }
   }

   /**
    * Attempts to fail over / reconnect after the connection identified by {@code connectionID} failed.
    *
    * <p>Outline of what the code below does:
    * <ol>
    *   <li>If any session's context is a killed {@code ActiveMQSessionContext}, reconnection is
    *       disabled via {@code setReconnectAttempts(0)}.</li>
    *   <li>Returns immediately if the protocol manager is not alive.</li>
    *   <li>Under the failover lock, returns if the failed connection is no longer the current one
    *       (already handled); otherwise notifies FAILURE_DETECTED and the session failure listeners.</li>
    *   <li>If {@code reconnectAttempts != 0} and {@code cleanupBeforeFailover(me)} returns true, closes the
    *       connector, cancels scheduled tasks and calls {@code reconnectSessions}, then destroys the old
    *       connection. If {@code cleanupBeforeFailover} returns false the current connection is left as is.
    *       If {@code reconnectAttempts == 0} the current connection is destroyed and cleared.</li>
    *   <li>If no connection remains, notifies FAILOVER_FAILED and snapshots the sessions to close.</li>
    *   <li>After releasing the lock, either notifies the listeners of success or calls
    *       {@code cleanUp(true)} on every snapshotted session.</li>
    * </ol>
    *
    * @param connectionID          ID of the connection that failed
    * @param me                    the exception describing the failure
    * @param scaleDownTargetNodeID passed through to the session failure listeners
    */
   private void failoverOrReconnect(final Object connectionID,
                                    final ActiveMQException me,
                                    String scaleDownTargetNodeID) {
      ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);

      // A killed session disables reconnection: reconnectAttempts becomes 0, so the else-branch
      // below destroys the connection instead of reconnecting.
      // NOTE(review): sessions is iterated here without synchronized (sessions), unlike the copies
      // taken further down - presumably it is a concurrent set; confirm against the field declaration.
      for (ClientSessionInternal session : sessions) {
         SessionContext context = session.getSessionContext();
         if (context instanceof ActiveMQSessionContext) {
            ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
            if (sessionContext.isKilled()) {
               setReconnectAttempts(0);
            }
         }
      }

      // Set only when failover/reconnection failed; these sessions are cleaned up after the lock is released.
      Set<ClientSessionInternal> sessionsToClose = null;
      if (!clientProtocolManager.isAlive())
         return;
      Lock localFailoverLock = lockFailover();
      try {
         if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
            // We already failed over/reconnected - probably the first failure came in, all the connections were failed
            // over then an async connection exception or disconnect
            // came in for one of the already exitLoop connections, so we just return - we don't want to call the
            // listeners again

            return;
         }

         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
            logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
         }

         callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
         // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
         callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);

         // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
         // There are either no threads executing in createSession, or one is blocking on a createSession
         // result.

         // Then interrupt the channel 1 that is blocking (could just interrupt them all)

         // Then release all channel 1 locks - this allows the createSession to exit the monitor

         // Then get all channel 1 locks again - this ensures that any createSession thread has executed the section and
         // returned all its connections to the connection manager (the code to return connections to connection manager
         // must be inside the lock

         // Then perform failover

         // Then release failoverLock

         // The other side of the bargain - during createSession:
         // The calling thread must get the failoverLock and get its connections when this is
         // locked.
         // While this is still locked it must then get the channel1 lock
         // It can then release the failoverLock
         // It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
         // It should then return its connections, with channel 1 lock still held
         // It can then release the channel 1 lock, and retry (which will cause locking on
         // failoverLock
         // until failover is complete

         if (reconnectAttempts != 0) {

            if (clientProtocolManager.cleanupBeforeFailover(me)) {

               // Now we absolutely know that no threads are executing in or blocked in
               // createSession,
               // and no
               // more will execute it until failover is complete

               // So.. do failover / reconnection

               RemotingConnection oldConnection = connection;

               // Cleared before reconnecting; reconnectSessions is expected to set it again on success.
               connection = null;

               Connector localConnector = connector;
               if (localConnector != null) {
                  try {
                     localConnector.close();
                  } catch (Exception ignore) {
                     // no-op: best-effort close of the old connector
                  }
               }

               cancelScheduledTasks();

               connector = null;

               reconnectSessions(oldConnection, reconnectAttempts, me);

               if (oldConnection != null) {
                  oldConnection.destroy();
               }

               // A non-null connection here means reconnectSessions succeeded.
               if (connection != null) {
                  callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
               }
            }
         } else {
            // Reconnection disabled: destroy the current connection and give up.
            RemotingConnection connectionToDestory = connection;
            if (connectionToDestory != null) {
               connectionToDestory.destroy();
            }
            connection = null;
         }

         if (connection == null) {
            synchronized (sessions) {
               sessionsToClose = new HashSet<>(sessions);
            }
            callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
            callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
         }
      } finally {
         localFailoverLock.unlock();
      }

      // This needs to be outside the failover lock to prevent deadlock
      if (connection != null) {
         callSessionFailureListeners(me, true, true);
      }
      if (sessionsToClose != null) {
         // If connection is null it means we didn't succeed in failing over or reconnecting
         // so we close all the sessions, so they will throw exceptions when attempted to be used

         for (ClientSessionInternal session : sessionsToClose) {
            try {
               session.cleanUp(true);
            } catch (Exception cause) {
               ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
            }
         }
      }
   }

   //......

}
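  • handleConnectionFailure方法会调用failoverOrReconnect方法(若抛出ActiveMQInterruptedException以外的异常,则会先close()再重新抛出)。failoverOrReconnect会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover(),在reconnectAttempts不为0且clientProtocolManager.cleanupBeforeFailover(me)返回true时执行reconnectSessions;reconnectAttempts为0时则直接destroy当前connection并置为null;若最终connection为null,则回调FAILOVER_FAILED并记录需要关闭的sessions;最后在finally中执行localFailoverLock.unlock(),并在锁外对这些sessions执行cleanUp(true)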

reconnectSessions

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {       //......       private void reconnectSessions(final RemotingConnection oldConnection,                                    final int reconnectAttempts,                                    final ActiveMQException cause) {        HashSet<ClientSessionInternal> sessionsToFailover;        synchronized (sessions) {           sessionsToFailover = new HashSet<>(sessions);        }          for (ClientSessionInternal session : sessionsToFailover) {           session.preHandleFailover(connection);        }          getConnectionWithRetry(reconnectAttempts, oldConnection);          if (connection == null) {           if (!clientProtocolManager.isAlive())              ActiveMQClientLogger.LOGGER.failedToConnectToServer();             return;        }          List<FailureListener> oldListeners = oldConnection.getFailureListeners();          List<FailureListener> newListeners = new ArrayList<>(connection.getFailureListeners());          for (FailureListener listener : oldListeners) {           // Add all apart from the old DelegatingFailureListener           if (listener instanceof DelegatingFailureListener == false) {              newListeners.add(listener);           }        }          connection.setFailureListeners(newListeners);          // This used to be done inside failover        // it needs to be done on the protocol        ((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence());          for (ClientSessionInternal session : sessionsToFailover) {           if (!session.handleFailover(connection, cause)) {              connection.destroy();              this.connection = null;              return;           }        }     }       private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {        if (!clientProtocolManager.isAlive())         
  return;        if (logger.isTraceEnabled()) {           logger.trace("getConnectionWithRetry::" + reconnectAttempts +                           " with retryInterval = " +                           retryInterval +                           " multiplier = " +                           retryIntervalMultiplier, new Exception("trace"));        }          long interval = retryInterval;          int count = 0;          while (clientProtocolManager.isAlive()) {           if (logger.isDebugEnabled()) {              logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);           }             if (getConnection() != null) {              if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {                 // transferring old connection version into the new connection                 ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());              }              if (logger.isDebugEnabled()) {                 logger.debug("Reconnection successful");              }              return;           } else {              // Failed to get connection                if (reconnectAttempts != 0) {                 count++;                   if (reconnectAttempts != -1 && count == reconnectAttempts) {                    if (reconnectAttempts != 1) {                       ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);                    }                      return;                 }                   if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {                    ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. 
RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);                 }                   if (waitForRetry(interval))                    return;                   // Exponential back-off                 long newInterval = (long) (interval * retryIntervalMultiplier);                   if (newInterval > maxRetryInterval) {                    newInterval = maxRetryInterval;                 }                   interval = newInterval;              } else {                 logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");                 return;              }           }        }     }       public RemotingConnection getConnection() {        if (closed)           throw new IllegalStateException("ClientSessionFactory is closed!");        if (!clientProtocolManager.isAlive())           return null;        synchronized (connectionLock) {           if (connection != null) {              // a connection already exists, so returning the same one              return connection;           } else {              RemotingConnection connection = establishNewConnection();                this.connection = connection;                //we check if we can actually connect.              
// we do it here as to receive the reply connection has to be not null              //make sure to reset this.connection == null              if (connection != null && liveNodeID != null) {                 try {                    if (!clientProtocolManager.checkForFailover(liveNodeID)) {                       connection.destroy();                       this.connection = null;                       return null;                    }                 } catch (ActiveMQException e) {                    connection.destroy();                    this.connection = null;                    return null;                 }              }                if (connection != null && serverLocator.getAfterConnectInternalListener() != null) {                 serverLocator.getAfterConnectInternalListener().onConnection(this);              }                if (serverLocator.getTopology() != null) {                 if (connection != null) {                    if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {                       logger.trace(this + "::Subscribing Topology");                    }                    clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());                 }              } else {                 logger.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology"));              }                return connection;           }        }     }       //......  }
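  • reconnectSessions方法首先对每个session执行preHandleFailover,然后执行getConnectionWithRetry,接着将oldConnection上除DelegatingFailureListener之外的FailureListener添加到新的connection中,最后遍历sessionsToFailover执行session.handleFailover(connection, cause),对于返回false的执行connection.destroy()并将connection置为null然后return;getConnectionWithRetry方法通过getConnection()获取新连接并赋值给connection,如果获取失败则进行重试,直到重试次数count达到reconnectAttempts(reconnectAttempts为-1时无限重试,为0时不重试)、waitForRetry返回true或clientProtocolManager不再alive为止;重试时通过waitForRetry(interval)来控制重试的间隔,interval从retryInterval开始按retryIntervalMultiplier指数增长,最大不超过maxRetryInterval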

小结

ClientSessionFactoryImpl的handleConnectionFailure方法会调用failoverOrReconnect方法,该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover(),然后reconnectAttempts不为0的执行reconnectSessions,最后执行localFailoverLock.unlock()

doc

  • ClientSessionFactoryImpl