聊聊artemis的handleConnectionFailure
- 2020 年 2 月 24 日
- 筆記
序
本文主要研究一下artemis的handleConnectionFailure
handleConnectionFailure
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   /**
    * Entry point for reacting to a failed connection: delegates to
    * {@link #failoverOrReconnect(Object, ActiveMQException, String)}.
    * <p>
    * An {@code ActiveMQInterruptedException} is logged at debug level and swallowed; any other
    * {@code Throwable} is logged, the factory is closed via {@code close()} and the throwable is rethrown.
    *
    * @param connectionID          ID of the connection the failure was reported for
    * @param me                    the exception describing the failure
    * @param scaleDownTargetNodeID passed through unchanged to the session failure listeners
    */
   private void handleConnectionFailure(final Object connectionID, final ActiveMQException me, String scaleDownTargetNodeID) {
      try {
         failoverOrReconnect(connectionID, me, scaleDownTargetNodeID);
      } catch (ActiveMQInterruptedException e1) {
         // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
         logger.debug(e1.getMessage(), e1);
      } catch (Throwable t) {
         ActiveMQClientLogger.LOGGER.unableToHandleConnectionFailure(t);
         //for anything else just close so clients are un blocked
         close();
         throw t;
      }
   }

   /**
    * Attempts to fail over / reconnect after the connection identified by {@code connectionID} failed.
    * <p>
    * Steps, in the order they appear below:
    * <ol>
    * <li>If any session has an {@code ActiveMQSessionContext} reporting {@code isKilled()}, reconnect
    * attempts are set to 0.</li>
    * <li>Returns immediately if the protocol manager is no longer alive.</li>
    * <li>Under the failover lock: returns if the current connection is null, has a different ID, or the
    * protocol manager is not alive; otherwise notifies listeners of {@code FAILURE_DETECTED}, then
    * either reconnects (when {@code reconnectAttempts != 0} and {@code cleanupBeforeFailover} returns
    * true) or destroys the current connection (when {@code reconnectAttempts == 0}).</li>
    * <li>If no connection exists afterwards, a snapshot of the sessions is taken and listeners are
    * notified of {@code FAILOVER_FAILED}.</li>
    * <li>After the lock is released: session failure listeners are called once more when a connection
    * exists, and every session in the snapshot (if one was taken) is cleaned up.</li>
    * </ol>
    *
    * @param connectionID          ID of the connection the failure was reported for
    * @param me                    the exception describing the failure
    * @param scaleDownTargetNodeID passed through unchanged to the session failure listeners
    */
   private void failoverOrReconnect(final Object connectionID, final ActiveMQException me, String scaleDownTargetNodeID) {
      ActiveMQClientLogger.LOGGER.failoverOrReconnect(connectionID, me);

      // A killed session context disables reconnection for this factory.
      // NOTE(review): this loop iterates 'sessions' without the synchronized (sessions) guard that is
      // used further down when the set is copied - confirm whether that is intentional.
      for (ClientSessionInternal session : sessions) {
         SessionContext context = session.getSessionContext();
         if (context instanceof ActiveMQSessionContext) {
            ActiveMQSessionContext sessionContext = (ActiveMQSessionContext) context;
            if (sessionContext.isKilled()) {
               setReconnectAttempts(0);
            }
         }
      }

      // Stays null unless failover/reconnect leaves us without a connection.
      Set<ClientSessionInternal> sessionsToClose = null;
      if (!clientProtocolManager.isAlive())
         return;
      Lock localFailoverLock = lockFailover();
      try {
         if (connection == null || !connection.getID().equals(connectionID) || !clientProtocolManager.isAlive()) {
            // We already failed over/reconnected - probably the first failure came in, all the connections were failed
            // over then an async connection exception or disconnect
            // came in for one of the already exitLoop connections, so we return true - we don't want to call the
            // listeners again
            return;
         }

         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
            logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
         }

         callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
         // We call before reconnection occurs to give the user a chance to do cleanup, like cancel messages
         callSessionFailureListeners(me, false, false, scaleDownTargetNodeID);

         // Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure
         // There are either no threads executing in createSession, or one is blocking on a createSession
         // result.

         // Then interrupt the channel 1 that is blocking (could just interrupt them all)

         // Then release all channel 1 locks - this allows the createSession to exit the monitor

         // Then get all channel 1 locks again - this ensures the any createSession thread has executed the section and
         // returned all its connections to the connection manager (the code to return connections to connection manager
         // must be inside the lock

         // Then perform failover

         // Then release failoverLock

         // The other side of the bargain - during createSession:
         // The calling thread must get the failoverLock and get its' connections when this is
         // locked.
         // While this is still locked it must then get the channel1 lock
         // It can then release the failoverLock
         // It should catch ActiveMQException.INTERRUPTED in the call to channel.sendBlocking
         // It should then return its connections, with channel 1 lock still held
         // It can then release the channel 1 lock, and retry (which will cause locking on
         // failoverLock
         // until failover is complete

         if (reconnectAttempts != 0) {
            if (clientProtocolManager.cleanupBeforeFailover(me)) {
               // Now we absolutely know that no threads are executing in or blocked in
               // createSession,
               // and no
               // more will execute it until failover is complete

               // So.. do failover / reconnection

               // Detach the failed connection from the factory before trying to establish a new one.
               RemotingConnection oldConnection = connection;
               connection = null;
               Connector localConnector = connector;
               if (localConnector != null) {
                  try {
                     localConnector.close();
                  } catch (Exception ignore) {
                     // no-op
                  }
               }
               cancelScheduledTasks();
               connector = null;
               // May leave 'connection' null when no new connection could be obtained.
               reconnectSessions(oldConnection, reconnectAttempts, me);
               if (oldConnection != null) {
                  oldConnection.destroy();
               }
               if (connection != null) {
                  callFailoverListeners(FailoverEventType.FAILOVER_COMPLETED);
               }
            }
         } else {
            // Reconnection is disabled: just drop the current connection.
            RemotingConnection connectionToDestory = connection;
            if (connectionToDestory != null) {
               connectionToDestory.destroy();
            }
            connection = null;
         }

         if (connection == null) {
            // Snapshot the sessions under their monitor; they are cleaned up after the lock is released.
            synchronized (sessions) {
               sessionsToClose = new HashSet<>(sessions);
            }
            callFailoverListeners(FailoverEventType.FAILOVER_FAILED);
            callSessionFailureListeners(me, true, false, scaleDownTargetNodeID);
         }
      } finally {
         localFailoverLock.unlock();
      }

      // This needs to be outside the failover lock to prevent deadlock
      if (connection != null) {
         callSessionFailureListeners(me, true, true);
      }
      if (sessionsToClose != null) {
         // If connection is null it means we didn't succeed in failing over or reconnecting
         // so we close all the sessions, so they will throw exceptions when attempted to be used
         for (ClientSessionInternal session : sessionsToClose) {
            try {
               session.cleanUp(true);
            } catch (Exception cause) {
               ActiveMQClientLogger.LOGGER.failedToCleanupSession(cause);
            }
         }
      }
   }

   //......
}
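- handleConnectionFailure方法会调用failoverOrReconnect方法;该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover()获取锁,若reconnectAttempts不为0且clientProtocolManager.cleanupBeforeFailover(me)返回true则执行reconnectSessions,若reconnectAttempts为0则直接destroy当前connection;如果最终connection为null,则回调FAILOVER_FAILED,并在释放锁之后对各session执行cleanUp(true);无论结果如何,都会在finally中执行localFailoverLock.unlock()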
reconnectSessions
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener { //...... private void reconnectSessions(final RemotingConnection oldConnection, final int reconnectAttempts, final ActiveMQException cause) { HashSet<ClientSessionInternal> sessionsToFailover; synchronized (sessions) { sessionsToFailover = new HashSet<>(sessions); } for (ClientSessionInternal session : sessionsToFailover) { session.preHandleFailover(connection); } getConnectionWithRetry(reconnectAttempts, oldConnection); if (connection == null) { if (!clientProtocolManager.isAlive()) ActiveMQClientLogger.LOGGER.failedToConnectToServer(); return; } List<FailureListener> oldListeners = oldConnection.getFailureListeners(); List<FailureListener> newListeners = new ArrayList<>(connection.getFailureListeners()); for (FailureListener listener : oldListeners) { // Add all apart from the old DelegatingFailureListener if (listener instanceof DelegatingFailureListener == false) { newListeners.add(listener); } } connection.setFailureListeners(newListeners); // This used to be done inside failover // it needs to be done on the protocol ((CoreRemotingConnection) connection).syncIDGeneratorSequence(((CoreRemotingConnection) oldConnection).getIDGeneratorSequence()); for (ClientSessionInternal session : sessionsToFailover) { if (!session.handleFailover(connection, cause)) { connection.destroy(); this.connection = null; return; } } } private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) { if (!clientProtocolManager.isAlive()) return; if (logger.isTraceEnabled()) { logger.trace("getConnectionWithRetry::" + reconnectAttempts + " with retryInterval = " + retryInterval + " multiplier = " + retryIntervalMultiplier, new Exception("trace")); } long interval = retryInterval; int count = 0; while (clientProtocolManager.isAlive()) { if (logger.isDebugEnabled()) { logger.debug("Trying reconnection attempt " + count + "/" + 
reconnectAttempts); } if (getConnection() != null) { if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) { // transferring old connection version into the new connection ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion()); } if (logger.isDebugEnabled()) { logger.debug("Reconnection successful"); } return; } else { // Failed to get connection if (reconnectAttempts != 0) { count++; if (reconnectAttempts != -1 && count == reconnectAttempts) { if (reconnectAttempts != 1) { ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts); } return; } if (ClientSessionFactoryImpl.logger.isTraceEnabled()) { ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier); } if (waitForRetry(interval)) return; // Exponential back-off long newInterval = (long) (interval * retryIntervalMultiplier); if (newInterval > maxRetryInterval) { newInterval = maxRetryInterval; } interval = newInterval; } else { logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); return; } } } } public RemotingConnection getConnection() { if (closed) throw new IllegalStateException("ClientSessionFactory is closed!"); if (!clientProtocolManager.isAlive()) return null; synchronized (connectionLock) { if (connection != null) { // a connection already exists, so returning the same one return connection; } else { RemotingConnection connection = establishNewConnection(); this.connection = connection; //we check if we can actually connect. 
// we do it here as to receive the reply connection has to be not null //make sure to reset this.connection == null if (connection != null && liveNodeID != null) { try { if (!clientProtocolManager.checkForFailover(liveNodeID)) { connection.destroy(); this.connection = null; return null; } } catch (ActiveMQException e) { connection.destroy(); this.connection = null; return null; } } if (connection != null && serverLocator.getAfterConnectInternalListener() != null) { serverLocator.getAfterConnectInternalListener().onConnection(this); } if (serverLocator.getTopology() != null) { if (connection != null) { if (ClientSessionFactoryImpl.logger.isTraceEnabled()) { logger.trace(this + "::Subscribing Topology"); } clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection()); } } else { logger.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology")); } return connection; } } } //...... }
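- reconnectSessions方法首先对每个session执行session.preHandleFailover(connection),然后执行getConnectionWithRetry,若成功获取连接则挨个将oldListeners(DelegatingFailureListener除外)添加到新的connection中,最后遍历sessionsToFailover执行session.handleFailover(connection, cause),对于返回false的执行connection.destroy()并将connection置为null然后return;getConnectionWithRetry方法通过getConnection()获取新连接(getConnection内部会赋值给connection),如果获取失败则进行重试:reconnectAttempts为0时不重试,为-1时不限次数,否则在失败次数count达到reconnectAttempts时放弃;重试时通过waitForRetry(interval)来控制重试的间隔,interval每次乘以retryIntervalMultiplier且不超过maxRetryInterval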
小结
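ClientSessionFactoryImpl的handleConnectionFailure方法会调用failoverOrReconnect方法;该方法会先遍历sessions,对于sessionContext.isKilled()为true的执行setReconnectAttempts(0);之后执行lockFailover()获取锁,若reconnectAttempts不为0且cleanupBeforeFailover返回true则执行reconnectSessions,若reconnectAttempts为0则直接destroy当前connection;若最终connection为null则回调FAILOVER_FAILED并在释放锁后对各session执行cleanUp(true);最后在finally中执行localFailoverLock.unlock()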
doc
- ClientSessionFactoryImpl