A look at Artemis's reconnectAttempts
- February 24, 2020
- Notes
Preface
This post takes a look at how reconnectAttempts works in Artemis.
reconnectAttempts
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {

    //......

    public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,
                                                     int reconnectAttempts) throws Exception {
        assertOpen();

        initialize();

        ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);

        addToConnecting(factory);
        try {
            try {
                factory.connect(reconnectAttempts);
            } catch (ActiveMQException e1) {
                //we need to make sure is closed just for garbage collection
                factory.close();
                throw e1;
            }
            addFactory(factory);
            return factory;
        } finally {
            removeFromConnecting(factory);
        }
    }

    //......
}
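- ServerLocatorImpl's createSessionFactory method creates a ClientSessionFactoryImpl and then calls factory.connect(reconnectAttempts); if connect throws an ActiveMQException, the factory is closed and the exception is rethrown, and in either case the factory is removed from the connecting set in the finally block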
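The bookkeeping around factory.connect can be seen in isolation with a small sketch. Everything here (LocatorSketch, Factory, the reachable flag, the two sets) is a hypothetical stand-in made up for illustration, not Artemis code; it only mirrors the nested try/catch/finally shape of createSessionFactory.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the locator; not part of Artemis.
final class LocatorSketch {

    // Hypothetical stand-in for a session factory.
    static final class Factory {
        final boolean reachable;
        boolean closed;

        Factory(boolean reachable) {
            this.reachable = reachable;
        }

        void connect(int reconnectAttempts) {
            // Assumption: an unreachable server is modelled as an unchecked exception.
            if (!reachable) {
                throw new IllegalStateException("Unable to connect to server");
            }
        }

        void close() {
            closed = true;
        }
    }

    final Set<Factory> connecting = new HashSet<>();
    final Set<Factory> factories = new HashSet<>();

    Factory create(boolean reachable, int reconnectAttempts) {
        Factory factory = new Factory(reachable);
        connecting.add(factory);
        try {
            try {
                factory.connect(reconnectAttempts);
            } catch (RuntimeException e) {
                // Close the half-built factory before propagating the failure.
                factory.close();
                throw e;
            }
            factories.add(factory);
            return factory;
        } finally {
            // Runs on success and on failure alike.
            connecting.remove(factory);
        }
    }

    public static void main(String[] args) {
        LocatorSketch locator = new LocatorSketch();
        locator.create(true, 3);
        System.out.println("registered factories: " + locator.factories.size());
    }
}
```

The point of the shape is that a factory is only registered once connect has succeeded, while the connecting set is cleaned up on every path.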
ClientSessionFactoryImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

    //......

    public void connect(final int initialConnectAttempts) throws ActiveMQException {
        // Get the connection
        getConnectionWithRetry(initialConnectAttempts, null);

        if (connection == null) {
            StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);
            if (backupConfig != null) {
                msg.append(" and backup configuration ").append(backupConfig);
            }
            throw new ActiveMQNotConnectedException(msg.toString());
        }
    }

    private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {
        if (!clientProtocolManager.isAlive())
            return;
        if (logger.isTraceEnabled()) {
            logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                            " with retryInterval = " +
                            retryInterval +
                            " multiplier = " +
                            retryIntervalMultiplier, new Exception("trace"));
        }

        long interval = retryInterval;

        int count = 0;

        while (clientProtocolManager.isAlive()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
            }

            if (getConnection() != null) {
                if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {
                    // transferring old connection version into the new connection
                    ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Reconnection successful");
                }
                return;
            } else {
                // Failed to get connection

                if (reconnectAttempts != 0) {
                    count++;

                    if (reconnectAttempts != -1 && count == reconnectAttempts) {
                        if (reconnectAttempts != 1) {
                            ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                        }

                        return;
                    }

                    if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                        ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
                    }

                    if (waitForRetry(interval))
                        return;

                    // Exponential back-off
                    long newInterval = (long) (interval * retryIntervalMultiplier);

                    if (newInterval > maxRetryInterval) {
                        newInterval = maxRetryInterval;
                    }

                    interval = newInterval;
                } else {
                    logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
                    return;
                }
            }
        }
    }

    public boolean waitForRetry(long interval) {
        try {
            if (clientProtocolManager.waitOnLatch(interval)) {
                return true;
            }
        } catch (InterruptedException ignore) {
            throw new ActiveMQInterruptedException(createTrace);
        }
        return false;
    }

    //......
}
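- ClientSessionFactoryImpl's connect method mainly delegates to getConnectionWithRetry and throws ActiveMQNotConnectedException if connection is still null afterwards. getConnectionWithRetry loops while clientProtocolManager.isAlive() and calls getConnection() on each pass; a non-null result ends the loop. On a null result, reconnectAttempts of 0 means no retry and the method returns immediately; otherwise count is incremented, and the method returns once reconnectAttempts is not -1 and count equals reconnectAttempts (so -1 retries indefinitely). Between attempts it waits via waitForRetry(interval): if that returns true it returns early, otherwise interval is multiplied by retryIntervalMultiplier, capped at maxRetryInterval, and the next pass begins. waitForRetry itself waits through clientProtocolManager.waitOnLatch(interval)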
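To make the counting and back-off rules concrete, here is a minimal sketch that replays only the loop's bookkeeping under the assumption that every connection attempt fails. RetrySchedule, waitsWhenAllAttemptsFail and the maxSimulated bound are hypothetical names introduced for this illustration; no real connection or sleeping is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Artemis: replays the retry bookkeeping without any I/O.
final class RetrySchedule {

    /**
     * Returns the waits (in milliseconds) that would happen between attempts
     * if every attempt failed. maxSimulated bounds the simulation so that
     * reconnectAttempts == -1 (retry forever) still terminates here.
     */
    static List<Long> waitsWhenAllAttemptsFail(int reconnectAttempts,
                                               long retryInterval,
                                               double retryIntervalMultiplier,
                                               long maxRetryInterval,
                                               int maxSimulated) {
        List<Long> waits = new ArrayList<>();
        long interval = retryInterval;
        int count = 0;

        while (waits.size() < maxSimulated) {
            // Assumption: the attempt made at this point has failed.
            if (reconnectAttempts == 0) {
                return waits; // reconnection not configured: one attempt, no waiting
            }
            count++;
            if (reconnectAttempts != -1 && count == reconnectAttempts) {
                return waits; // attempts exhausted
            }
            waits.add(interval);

            // Exponential back-off, capped at maxRetryInterval
            long newInterval = (long) (interval * retryIntervalMultiplier);
            if (newInterval > maxRetryInterval) {
                newInterval = maxRetryInterval;
            }
            interval = newInterval;
        }
        return waits;
    }

    public static void main(String[] args) {
        // 5 attempts, 1000 ms base interval, multiplier 2.0, cap 5000 ms
        System.out.println(waitsWhenAllAttemptsFail(5, 1000, 2.0, 5000, 100));
        // prints [1000, 2000, 4000, 5000]
    }
}
```

With reconnectAttempts set to 5 there are five attempts and therefore four waits; the last wait is clamped to the 5000 ms cap instead of growing to 8000 ms. A value of 1 yields a single attempt with no wait, the same as 0 apart from the log message.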
ActiveMQClientProtocolManager
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
public class ActiveMQClientProtocolManager implements ClientProtocolManager {

    //......

    private final CountDownLatch waitLatch = new CountDownLatch(1);

    //......

    public boolean waitOnLatch(long milliseconds) throws InterruptedException {
        return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        alive = false;

        synchronized (inCreateSessionGuard) {
            if (inCreateSessionLatch != null)
                inCreateSessionLatch.countDown();
        }

        Channel channel1 = getChannel1();
        if (channel1 != null) {
            channel1.returnBlocking();
        }

        waitLatch.countDown();
    }

    //......
}
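- ActiveMQClientProtocolManager holds a CountDownLatch named waitLatch. waitOnLatch waits via waitLatch.await(milliseconds, TimeUnit.MILLISECONDS), which returns false when the timeout elapses and true once the latch has been released; stop sets alive to false and calls waitLatch.countDown(), so a retry wait in progress ends early when the manager is stopped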
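The latch behaviour that waitForRetry relies on can be checked with the JDK alone. The sketch below uses a hypothetical LatchWaitDemo class that copies only the waitLatch, waitOnLatch and stop pieces; the run helper and the 50 ms timeout are assumptions chosen for the demo.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; mirrors only the latch-related parts of the protocol manager.
final class LatchWaitDemo {

    private final CountDownLatch waitLatch = new CountDownLatch(1);

    boolean waitOnLatch(long milliseconds) throws InterruptedException {
        // false: timeout elapsed; true: the latch was counted down
        return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);
    }

    void stop() {
        waitLatch.countDown();
    }

    // Returns { result before stop, result after stop }.
    static boolean[] run() {
        LatchWaitDemo demo = new LatchWaitDemo();
        try {
            boolean beforeStop = demo.waitOnLatch(50); // times out, so the caller would retry
            demo.stop();
            boolean afterStop = demo.waitOnLatch(50);  // returns at once, so the caller would give up
            return new boolean[] {beforeStop, afterStop};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = run();
        System.out.println(results[0]); // false
        System.out.println(results[1]); // true
    }
}
```

Waiting on a latch with a timeout, rather than calling Thread.sleep, is what lets stop() cut a long back-off interval short.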
Summary
ClientSessionFactoryImpl's connect method mainly delegates to getConnectionWithRetry. That method loops while clientProtocolManager.isAlive() and calls getConnection(); when the result is null and reconnectAttempts is not 0 it retries, incrementing count and returning once reconnectAttempts is not -1 and count equals reconnectAttempts. Between attempts it waits via waitForRetry(interval), returning early if that yields true and otherwise updating interval for the next pass; waitForRetry in turn waits through clientProtocolManager.waitOnLatch(interval).
doc
- ClientSessionFactoryImpl