聊聊artemis的callFailoverTimeout
- 2020 年 2 月 24 日
- 筆記
序
本文主要研究一下artemis的callFailoverTimeout
establishNewConnection
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

    //......

    /**
     * Creates a transport connection and wraps it in a protocol-level RemotingConnection.
     *
     * <p>This is where the factory's {@code callTimeout} and {@code callFailoverTimeout} are
     * handed to {@code clientProtocolManager.connect(...)}, i.e. where the fail-over timeout
     * enters the remoting layer.
     *
     * @return the new RemotingConnection, or {@code null} when
     *         {@code createTransportConnection()} returned {@code null}
     */
    protected RemotingConnection establishNewConnection() {
        Connection transportConnection = createTransportConnection();
        if (transportConnection == null) {
            if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                logger.trace("Neither backup or live were active, will just give up now");
            }
            return null;
        }
        // callTimeout / callFailoverTimeout are passed through to the protocol manager unchanged.
        RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, new SessionFactoryTopologyHandler());
        // Register a failure listener keyed by the new connection's ID.
        newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID()));
        schedulePing();
        if (logger.isTraceEnabled()) {
            logger.trace("returning " + newConnection);
        }
        return newConnection;
    }

    //......
}
- ClientSessionFactoryImpl的establishNewConnection通過clientProtocolManager.connect創建RemotingConnection，並將callTimeout與callFailoverTimeout一併傳入
ActiveMQClientProtocolManager
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
public class ActiveMQClientProtocolManager implements ClientProtocolManager {

    //......

    /**
     * Builds a RemotingConnectionImpl on top of {@code transportConnection}, installs the
     * channel-0 handler and sends the handshake.
     *
     * @param transportConnection    the already-created transport connection
     * @param callTimeout            forwarded to RemotingConnectionImpl as its blocking-call timeout
     * @param callFailoverTimeout    forwarded to RemotingConnectionImpl as its
     *                               {@code blockingCallFailoverTimeout}
     * @param incomingInterceptors   interceptors forwarded to the new connection
     * @param outgoingInterceptors   interceptors forwarded to the new connection
     * @param topologyResponseHandler stored in this manager's {@code topologyResponseHandler} field
     * @return the newly created connection (also stored in {@code this.connection})
     */
    public RemotingConnection connect(Connection transportConnection, long callTimeout, long callFailoverTimeout, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors, TopologyResponseHandler topologyResponseHandler) {
        this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, executor);
        this.topologyResponseHandler = topologyResponseHandler;
        getChannel0().setHandler(new Channel0Handler(connection));
        sendHandshake(transportConnection);
        return connection;
    }

    //......
}
- ActiveMQClientProtocolManager的connect方法創建了RemotingConnectionImpl，callFailoverTimeout即作為其構造參數blockingCallFailoverTimeout傳入
RemotingConnectionImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {

    //......

    // Timeout for blocking calls; fixed at construction.
    private final long blockingCallTimeout;

    // Fail-over wait timeout; fixed at construction. On the client path it comes from the
    // session factory's callFailoverTimeout (via ActiveMQClientProtocolManager.connect).
    private final long blockingCallFailoverTimeout;

    //......

    /**
     * Public constructor; delegates to the private one with {@code client = true} and
     * {@code nodeID = null}.
     */
    public RemotingConnectionImpl(final PacketDecoder packetDecoder, final Connection transportConnection, final long blockingCallTimeout, final long blockingCallFailoverTimeout, final List<Interceptor> incomingInterceptors, final List<Interceptor> outgoingInterceptors, final Executor connectionExecutor) {
        this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, connectionExecutor);
    }

    /**
     * Stores all arguments in fields and registers this object as the transport connection's
     * protocol connection.
     */
    private RemotingConnectionImpl(final PacketDecoder packetDecoder, final Connection transportConnection, final long blockingCallTimeout, final long blockingCallFailoverTimeout, final List<Interceptor> incomingInterceptors, final List<Interceptor> outgoingInterceptors, final boolean client, final SimpleString nodeID, final Executor connectionExecutor) {
        super(transportConnection, connectionExecutor);
        this.packetDecoder = packetDecoder;
        this.blockingCallTimeout = blockingCallTimeout;
        this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;
        this.incomingInterceptors = incomingInterceptors;
        this.outgoingInterceptors = outgoingInterceptors;
        this.client = client;
        this.nodeID = nodeID;
        // Back-link so the transport connection knows which protocol connection owns it.
        transportConnection.setProtocolConnection(this);
        if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionImpl created: " + this);
        }
    }

    /** Returns the blocking-call timeout supplied at construction. */
    @Override
    public long getBlockingCallTimeout() {
        return blockingCallTimeout;
    }

    /**
     * Returns the fail-over timeout supplied at construction. ChannelImpl.waitForFailOver reads
     * this: a negative value means "wait while failing over with no bound".
     */
    @Override
    public long getBlockingCallFailoverTimeout() {
        return blockingCallFailoverTimeout;
    }

    //......
}
- RemotingConnectionImpl定義了blockingCallFailoverTimeout屬性，並通過getBlockingCallFailoverTimeout()方法對外提供
waitForFailOver
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
public final class ChannelImpl implements Channel {

    //......

    // Guards failingOver / transferring and the resend-cache insertion in send().
    private final Lock lock = new ReentrantLock();

    private final Condition sendCondition = lock.newCondition();

    // Signalled by unlock() when fail-over ends; awaited by waitForFailOver().
    private final Condition failoverCondition = lock.newCondition();

    // Set by lock(), cleared by unlock(); read under `lock`.
    private boolean failingOver;

    //......

    /**
     * Waits for fail-over to end. Must be called while holding {@code lock} (send() acquires it
     * first), since {@link Condition#await()} requires owning the associated lock.
     *
     * <p>If the connection's blocking-call fail-over timeout is negative, this loops on
     * {@code failoverCondition.await()} for as long as {@code failingOver} is true. Otherwise it
     * makes a single {@code ConcurrentUtil.await(failoverCondition, timeout)} call and, if that
     * returns false, only logs {@code timeoutMsg} at debug level. No exception is thrown, so
     * the caller continues.
     *
     * <p>NOTE(review): the bounded branch does not re-check {@code failingOver} after
     * ConcurrentUtil.await returns. Whether ConcurrentUtil.await loops internally, and which
     * time unit it expects (presumably milliseconds), is not visible here, so confirm both.
     *
     * @param timeoutMsg message logged at debug level when the bounded wait reports a timeout
     * @throws ActiveMQInterruptedException wrapping an InterruptedException from the wait
     */
    private void waitForFailOver(String timeoutMsg) {
        try {
            if (connection.getBlockingCallFailoverTimeout() < 0) {
                // Unbounded: re-check the flag after every wake-up.
                while (failingOver) {
                    failoverCondition.await();
                }
            } else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())) {
                logger.debug(timeoutMsg);
            }
        } catch (InterruptedException e) {
            throw new ActiveMQInterruptedException(e);
        }
    }

    /**
     * Sends a packet on this channel without waiting for a response.
     *
     * <p>If {@code failingOver} is set, this first waits via waitForFailOver() (bounded by the
     * connection's fail-over timeout when it is non-negative). After that wait it throws if the
     * channel is {@code transferring}, and otherwise the packet is still written even if the
     * wait timed out.
     *
     * @return {@code false} if an interceptor returned non-null (the packet is not sent),
     *         {@code true} once the buffer has been written
     */
    private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
        if (invokeInterceptors(packet, interceptors, connection) != null) {
            return false;
        }

        synchronized (sendLock) {
            packet.setChannelID(id);

            if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
                packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
            }

            if (logger.isTraceEnabled()) {
                logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
            }

            // Encoding happens before taking `lock`.
            ActiveMQBuffer buffer = packet.encode(connection);

            lock.lock();

            try {
                if (failingOver) {
                    // This is where callFailoverTimeout takes effect for non-blocking sends.
                    waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
                }

                // Sanity check
                if (transferring) {
                    throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
                }

                if (resendCache != null && packet.isRequiresConfirmations()) {
                    addResendPacket(packet);
                }

            } finally {
                lock.unlock();
            }

            if (logger.isTraceEnabled()) {
                logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
            }

            checkReconnectID(reconnectID);

            // We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
            // as the send could block if the response cache cannot add, preventing responses to be handled.
            if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
                // Spin (sleeping 1 ms per attempt) until the cache accepts the packet.
                while (!responseAsyncCache.add(packet)) {
                    try {
                        Thread.sleep(1);
                    } catch (Exception e) {
                        // Ignore
                    }
                }
            }

            // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
            // buffer is full, preventing any incoming buffers being handled and blocking failover
            try {
                connection.getTransportConnection().write(buffer, flush, batch);
            } catch (Throwable t) {
                // If the write throws, we must remove the packet from the cache to avoid filling it up.
                // The client still learns about the failure because the exception is rethrown up the call stack.
                if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
                    responseAsyncCache.remove(packet.getCorrelationID());
                }
                throw t;
            }
            return true;
        }
    }

    /**
     * Enters the fail-over window: under {@code lock}, increments {@code reconnectID} and sets
     * {@code failingOver = true}. The lock is released before returning, so later sends block in
     * waitForFailOver() rather than on the lock itself.
     */
    @Override
    public void lock() {
        if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " lock channel " + this);
        }
        lock.lock();
        reconnectID.incrementAndGet();
        failingOver = true;
        lock.unlock();
    }

    /**
     * Leaves the fail-over window: under {@code lock}, clears {@code failingOver} and wakes every
     * thread waiting on {@code failoverCondition}.
     */
    @Override
    public void unlock() {
        if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " unlock channel " + this);
        }
        lock.lock();
        failingOver = false;
        failoverCondition.signalAll();
        lock.unlock();
    }

    //......
}
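- ChannelImpl的waitForFailOver在connection.getBlockingCallFailoverTimeout()小於0時，只要failingOver為true就一直執行failoverCondition.await()（即無限期等待）；大於等於0時則執行ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())，超時後只打印一條debug日誌，send會繼續往下執行。send方法在failingOver為true時會執行waitForFailOver方法；ChannelImpl的lock方法會設置failingOver為true，unlock方法會設置failingOver為false，並通過failoverCondition.signalAll()喚醒所有等待中的線程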
ActiveMQSessionContext
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
public class ActiveMQSessionContext extends SessionContext {

    //......

    // The channel this session communicates on; the target of lock()/unlock() below.
    private final Channel sessionChannel;

    //......

    /**
     * Starts the fail-over window for this session by calling {@code sessionChannel.lock()}.
     * Presumably the channel is a ChannelImpl, in which case this sets its failingOver flag.
     * Confirm for other Channel implementations.
     */
    @Override
    public void lockCommunications() {
        sessionChannel.lock();
    }

    /**
     * Ends the fail-over window. It first clears the channel's transferring flag and then calls
     * {@code sessionChannel.unlock()}.
     */
    @Override
    public void releaseCommunications() {
        sessionChannel.setTransferring(false);
        sessionChannel.unlock();
    }

    //......
}
- ActiveMQSessionContext的lockCommunications方法會執行sessionChannel.lock()；而releaseCommunications會先執行sessionChannel.setTransferring(false)，再執行sessionChannel.unlock()
ClientSessionImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

    //......

    /**
     * Called before the connection fails over. It locks the session's communications so the
     * session can block for fail-over.
     *
     * @param connection the connection about to fail over (not used in this method body)
     */
    public void preHandleFailover(RemotingConnection connection) {
        // We lock the channel to prevent any packets to be added to the re-send
        // cache during the failover process.
        // We also do this before the connection fails over to give the session a chance to block for failover.
        sessionContext.lockCommunications();
    }

    /**
     * Moves this session onto {@code backupConnection} after a fail-over.
     *
     * <p>Under {@code synchronized (this)}, the method returns {@code true} immediately if the
     * session is already closed. Otherwise it tries to reattach. If that fails, it does the
     * following:
     * <ul>
     *   <li>renames the session;</li>
     *   <li>clears all consumers;</li>
     *   <li>unless the session is closing or {@code mayAttemptToFailover} is false, recreates the
     *       session and its still-open consumers on the server.</li>
     * </ul>
     * The {@code finally} clause of that attempt calls {@code sessionContext.releaseCommunications()},
     * which pairs with the {@code lockCommunications()} call in preHandleFailover. Session metadata
     * is re-sent afterwards, outside the {@code synchronized (this)} block.
     *
     * <p>NOTE(review): the early {@code return true} for a closed session happens before the try
     * block, so releaseCommunications() is not called on that path. Confirm this is intended.
     *
     * @param backupConnection the connection to reattach to, or recreate the session on
     * @param cause            passed to {@code sessionContext.returnBlocking} when reattach fails
     * @return {@code false} if a Throwable was caught while handling fail-over, otherwise {@code true}
     */
    public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {
        boolean suc = true;

        synchronized (this) {
            if (closed) {
                return true;
            }

            // Set only when the session was recreated; triggers the producer credit reset below.
            boolean resetCreditManager = false;

            try {

                // TODO remove this and encapsulate it

                boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);

                if (!reattached) {

                    // We change the name of the Session, otherwise the server could close it while we are still
                    // sending the recreate in certain failure scenarios.
                    // For instance, the fact that we didn't change the name of the session after failover or
                    // reconnect was the reason allowing multiple Sessions to be closed simultaneously, breaking
                    // concurrency.
                    this.name = UUIDGenerator.getInstance().generateStringUUID();

                    sessionContext.resetName(name);

                    Map<ConsumerContext, ClientConsumerInternal> clonedConsumerEntries = cloneConsumerEntries();

                    for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                        consumer.clearAtFailover();
                    }

                    // The session wasn't found on the server - probably we're failing over onto a backup server
                    // where the session won't exist or the target server has been restarted - in this case the
                    // session will need to be recreated, and we'll need to recreate any consumers.

                    // It could also be that the server hasn't been restarted, but the session is currently
                    // executing close, and that has already been executed on the server, that's why we can't
                    // find the session - in this case we *don't* want to recreate the session, we just want to
                    // unblock the blocking call.
                    if (!inClose && mayAttemptToFailover) {
                        sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);

                        for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : clonedConsumerEntries.entrySet()) {

                            ClientConsumerInternal consumerInternal = entryx.getValue();
                            synchronized (consumerInternal) {
                                // Only consumers that are still open are recreated on the server.
                                if (!consumerInternal.isClosed()) {
                                    sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
                                }
                            }
                        }

                        if ((!autoCommitAcks || !autoCommitSends) && workDone) {
                            // this is protected by a lock, so we can guarantee nothing will sneak here
                            // while we do our work here
                            rollbackOnly = true;
                        }
                        if (currentXID != null) {
                            sessionContext.xaFailed(currentXID);
                            rollbackOnly = true;
                        }

                        // Now start the session if it was already started
                        if (started) {
                            for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                                consumer.clearAtFailover();
                                consumer.start();
                            }

                            sessionContext.restartSession();
                        }

                        resetCreditManager = true;
                    }

                    sessionContext.returnBlocking(cause);
                }
            } catch (Throwable t) {
                ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
                suc = false;
            } finally {
                // Runs on both success and failure of the reattach/recreate attempt.
                sessionContext.releaseCommunications();
            }

            if (resetCreditManager) {
                synchronized (producerCreditManager) {
                    producerCreditManager.reset();
                }

                // Also need to send more credits for consumers, otherwise the system could hang with the server
                // not having any credits to send
            }
        }

        // Snapshot the metadata under its own monitor, then re-send it.
        HashMap<String, String> metaDataToSend;

        synchronized (metadata) {
            metaDataToSend = new HashMap<>(metadata);
        }

        sessionContext.resetMetadata(metaDataToSend);

        return suc;

    }

    //......
}
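- ClientSessionImpl的preHandleFailover方法會執行sessionContext.lockCommunications()；handleFailover方法則在重新attach或重建session/consumer（recreateSession、recreateConsumerOnServer）的try塊的finally中執行sessionContext.releaseCommunications()。因此只要進入了該try塊（session未closed），無論重連成功與否，都會解除channel的failingOver狀態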
小結
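ClientSessionFactoryImpl的establishNewConnection把callFailoverTimeout經由ActiveMQClientProtocolManager.connect傳給RemotingConnectionImpl，作為其blockingCallFailoverTimeout屬性。ChannelImpl的waitForFailOver在connection.getBlockingCallFailoverTimeout()小於0時會一直等待，直至failingOver變為false；大於等於0時則執行ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())，超時後僅打印debug日誌，send繼續執行。send方法在failingOver為true時會執行waitForFailOver方法。failover期間，ClientSessionImpl的preHandleFailover通過lockCommunications調用channel的lock方法，將failingOver設為true；handleFailover在finally中通過releaseCommunications調用unlock方法，將failingOver設為false並喚醒等待中的send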
doc
- RemotingConnectionImpl