A look at Artemis's callFailoverTimeout

  • February 24, 2020
  • Notes

This post traces how Artemis's callFailoverTimeout is passed through the core client and where it takes effect.

establishNewConnection

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

   //......

   protected RemotingConnection establishNewConnection() {
      Connection transportConnection = createTransportConnection();

      if (transportConnection == null) {
         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
            logger.trace("Neither backup or live were active, will just give up now");
         }
         return null;
      }

      RemotingConnection newConnection = clientProtocolManager.connect(transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, new SessionFactoryTopologyHandler());

      newConnection.addFailureListener(new DelegatingFailureListener(newConnection.getID()));

      schedulePing();

      if (logger.isTraceEnabled()) {
         logger.trace("returning " + newConnection);
      }

      return newConnection;
   }

   //......
}
  • ClientSessionFactoryImpl's establishNewConnection creates the RemotingConnection through clientProtocolManager.connect, passing callTimeout and callFailoverTimeout along with the interceptors

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {

   //......

   public RemotingConnection connect(Connection transportConnection,
                                     long callTimeout,
                                     long callFailoverTimeout,
                                     List<Interceptor> incomingInterceptors,
                                     List<Interceptor> outgoingInterceptors,
                                     TopologyResponseHandler topologyResponseHandler) {
      this.connection = new RemotingConnectionImpl(createPacketDecoder(), transportConnection, callTimeout, callFailoverTimeout, incomingInterceptors, outgoingInterceptors, executor);

      this.topologyResponseHandler = topologyResponseHandler;

      getChannel0().setHandler(new Channel0Handler(connection));

      sendHandshake(transportConnection);

      return connection;
   }

   //......
}
  • ActiveMQClientProtocolManager's connect method creates a RemotingConnectionImpl and hands callFailoverTimeout to its constructor

RemotingConnectionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java

public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {

   //......

   private final long blockingCallTimeout;

   private final long blockingCallFailoverTimeout;

   //......

   public RemotingConnectionImpl(final PacketDecoder packetDecoder,
                                 final Connection transportConnection,
                                 final long blockingCallTimeout,
                                 final long blockingCallFailoverTimeout,
                                 final List<Interceptor> incomingInterceptors,
                                 final List<Interceptor> outgoingInterceptors,
                                 final Executor connectionExecutor) {
      this(packetDecoder, transportConnection, blockingCallTimeout, blockingCallFailoverTimeout, incomingInterceptors, outgoingInterceptors, true, null, connectionExecutor);
   }

   private RemotingConnectionImpl(final PacketDecoder packetDecoder,
                                  final Connection transportConnection,
                                  final long blockingCallTimeout,
                                  final long blockingCallFailoverTimeout,
                                  final List<Interceptor> incomingInterceptors,
                                  final List<Interceptor> outgoingInterceptors,
                                  final boolean client,
                                  final SimpleString nodeID,
                                  final Executor connectionExecutor) {
      super(transportConnection, connectionExecutor);

      this.packetDecoder = packetDecoder;

      this.blockingCallTimeout = blockingCallTimeout;

      this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;

      this.incomingInterceptors = incomingInterceptors;

      this.outgoingInterceptors = outgoingInterceptors;

      this.client = client;

      this.nodeID = nodeID;

      transportConnection.setProtocolConnection(this);

      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionImpl created: " + this);
      }
   }

   @Override
   public long getBlockingCallTimeout() {
      return blockingCallTimeout;
   }

   @Override
   public long getBlockingCallFailoverTimeout() {
      return blockingCallFailoverTimeout;
   }

   //......
}
  • RemotingConnectionImpl stores the value in its blockingCallFailoverTimeout field and exposes it through getBlockingCallFailoverTimeout()

waitForFailOver

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {

   //......

   private final Lock lock = new ReentrantLock();

   private final Condition sendCondition = lock.newCondition();

   private final Condition failoverCondition = lock.newCondition();

   private boolean failingOver;

   //......

   private void waitForFailOver(String timeoutMsg) {
      try {
         if (connection.getBlockingCallFailoverTimeout() < 0) {
            while (failingOver) {
               failoverCondition.await();
            }
         } else if (!ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout())) {
            logger.debug(timeoutMsg);
         }
      } catch (InterruptedException e) {
         throw new ActiveMQInterruptedException(e);
      }
   }

   private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {
      if (invokeInterceptors(packet, interceptors, connection) != null) {
         return false;
      }

      synchronized (sendLock) {
         packet.setChannelID(id);

         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
            packet.setCorrelationID(responseAsyncCache.nextCorrelationID());
         }

         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);
         }

         ActiveMQBuffer buffer = packet.encode(connection);

         lock.lock();

         try {
            if (failingOver) {
               waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");
            }

            // Sanity check
            if (transferring) {
               throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();
            }

            if (resendCache != null && packet.isRequiresConfirmations()) {
               addResendPacket(packet);
            }

         } finally {
            lock.unlock();
         }

         if (logger.isTraceEnabled()) {
            logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);
         }

         checkReconnectID(reconnectID);

         //We do this outside the lock as ResponseCache is threadsafe and allows responses to come in,
         //As the send could block if the response cache cannot add, preventing responses to be handled.
         if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
            while (!responseAsyncCache.add(packet)) {
               try {
                  Thread.sleep(1);
               } catch (Exception e) {
                  // Ignore
               }
            }
         }

         // The actual send must be outside the lock, or with OIO transport, the write can block if the tcp
         // buffer is full, preventing any incoming buffers being handled and blocking failover
         try {
            connection.getTransportConnection().write(buffer, flush, batch);
         } catch (Throwable t) {
            //If runtime exception, we must remove from the cache to avoid filling up the cache causing it to be full.
            //The client would get still know about this as the exception bubbles up the call stack instead.
            if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {
               responseAsyncCache.remove(packet.getCorrelationID());
            }
            throw t;
         }
         return true;
      }
   }

   @Override
   public void lock() {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " lock channel " + this);
      }
      lock.lock();

      reconnectID.incrementAndGet();

      failingOver = true;

      lock.unlock();
   }

   @Override
   public void unlock() {
      if (logger.isTraceEnabled()) {
         logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " unlock channel " + this);
      }
      lock.lock();

      failingOver = false;

      failoverCondition.signalAll();

      lock.unlock();
   }

   //......
}
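  • ChannelImpl's waitForFailOver waits indefinitely (looping on failoverCondition.await() while failingOver is true) when connection.getBlockingCallFailoverTimeout() is negative; when it is zero or positive it calls ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout()) and, if that wait times out, only logs timeoutMsg at debug level before returning. The send method calls waitForFailOver when failingOver is true; lock sets failingOver to true, and unlock sets it back to false and signals failoverCondition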
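The gate that lock, unlock and waitForFailOver form can be reproduced with the JDK alone. The following is a minimal sketch, not Artemis code: the class FailoverGate and its methods close, open and awaitOpen are hypothetical names invented here. It also assumes one deliberate difference from the listing above: the timed branch rechecks the flag in a loop with Condition.awaitNanos (the idiom from the Condition javadoc), whereas ChannelImpl delegates the timed wait to ConcurrentUtil.await.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the failover gate in ChannelImpl; not part of Artemis.
final class FailoverGate {
    private final Lock lock = new ReentrantLock();
    private final Condition failoverCondition = lock.newCondition();
    private final long timeoutMillis; // plays the role of callFailoverTimeout
    private boolean failingOver;      // guarded by lock

    FailoverGate(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Counterpart of Channel.lock(): senders start waiting from now on.
    void close() {
        lock.lock();
        try {
            failingOver = true;
        } finally {
            lock.unlock();
        }
    }

    // Counterpart of Channel.unlock(): clear the flag and wake all waiting senders.
    void open() {
        lock.lock();
        try {
            failingOver = false;
            failoverCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the gate is open or opens in time, false if the wait timed out.
    boolean awaitOpen() {
        lock.lock();
        try {
            if (timeoutMillis < 0) {
                // Negative timeout: wait until failover finishes, however long it takes.
                while (failingOver) {
                    failoverCondition.await();
                }
                return true;
            }
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (failingOver) {
                if (nanos <= 0L) {
                    return false; // timed out while failover was still running
                }
                nanos = failoverCondition.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for failover", e);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        FailoverGate gate = new FailoverGate(100);
        gate.close(); // nothing ever reopens the gate in this demo
        System.out.println("gate opened in time: " + gate.awaitOpen());
    }
}
```

Running main prints false for "gate opened in time", because no thread opens the gate within 100 ms. Constructing the gate with -1 instead would block that call until another thread invokes open().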

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   private final Channel sessionChannel;

   //......

   @Override
   public void lockCommunications() {
      sessionChannel.lock();
   }

   @Override
   public void releaseCommunications() {
      sessionChannel.setTransferring(false);
      sessionChannel.unlock();
   }

   //......
}
  • ActiveMQSessionContext's lockCommunications method calls sessionChannel.lock(), while releaseCommunications calls sessionChannel.setTransferring(false) followed by sessionChannel.unlock()

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

   //......

   public void preHandleFailover(RemotingConnection connection) {
      // We lock the channel to prevent any packets to be added to the re-send
      // cache during the failover process
      //we also do this before the connection fails over to give the session a chance to block for failover
      sessionContext.lockCommunications();
   }

   public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {
      boolean suc = true;

      synchronized (this) {
         if (closed) {
            return true;
         }

         boolean resetCreditManager = false;

         try {

            // TODO remove this and encapsulate it

            boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);

            if (!reattached) {

               // We change the name of the Session, otherwise the server could close it while we are still sending the recreate
               // in certain failure scenarios
               // For instance the fact we didn't change the name of the session after failover or reconnect
               // was the reason allowing multiple Sessions to be closed simultaneously breaking concurrency
               this.name = UUIDGenerator.getInstance().generateStringUUID();

               sessionContext.resetName(name);

               Map<ConsumerContext, ClientConsumerInternal> clonedConsumerEntries = cloneConsumerEntries();

               for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                  consumer.clearAtFailover();
               }

               // The session wasn't found on the server - probably we're failing over onto a backup server where the
               // session won't exist or the target server has been restarted - in this case the session will need to be
               // recreated,
               // and we'll need to recreate any consumers

               // It could also be that the server hasn't been restarted, but the session is currently executing close,
               // and
               // that
               // has already been executed on the server, that's why we can't find the session- in this case we *don't*
               // want
               // to recreate the session, we just want to unblock the blocking call
               if (!inClose && mayAttemptToFailover) {
                  sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);

                  for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : clonedConsumerEntries.entrySet()) {

                     ClientConsumerInternal consumerInternal = entryx.getValue();
                     synchronized (consumerInternal) {
                        if (!consumerInternal.isClosed()) {
                           sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);
                        }
                     }
                  }

                  if ((!autoCommitAcks || !autoCommitSends) && workDone) {
                     // this is protected by a lock, so we can guarantee nothing will sneak here
                     // while we do our work here
                     rollbackOnly = true;
                  }
                  if (currentXID != null) {
                     sessionContext.xaFailed(currentXID);
                     rollbackOnly = true;
                  }

                  // Now start the session if it was already started
                  if (started) {
                     for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {
                        consumer.clearAtFailover();
                        consumer.start();
                     }

                     sessionContext.restartSession();
                  }

                  resetCreditManager = true;
               }

               sessionContext.returnBlocking(cause);
            }
         } catch (Throwable t) {
            ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
            suc = false;
         } finally {
            sessionContext.releaseCommunications();
         }

         if (resetCreditManager) {
            synchronized (producerCreditManager) {
               producerCreditManager.reset();
            }

            // Also need to send more credits for consumers, otherwise the system could hand with the server
            // not having any credits to send
         }
      }

      HashMap<String, String> metaDataToSend;

      synchronized (metadata) {
         metaDataToSend = new HashMap<>(metadata);
      }

      sessionContext.resetMetadata(metaDataToSend);

      return suc;

   }

   //......
}
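  • ClientSessionImpl's preHandleFailover method calls sessionContext.lockCommunications(). handleFailover calls sessionContext.releaseCommunications() in the finally block of the try that reattaches or recreates the session and its consumers (including recreateConsumerOnServer), so the channel is unlocked even when that recovery throws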
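The lock-then-release-in-finally shape of this sequence can be shown in isolation. This is a minimal sketch under stated assumptions, not the Artemis implementation: FailoverSequence, the Communications interface and runFailover are hypothetical names, and the sketch folds preHandleFailover and handleFailover into one method even though the quoted code exposes them as two separate methods.

```java
// Hypothetical illustration of the preHandleFailover/handleFailover pairing; not Artemis code.
final class FailoverSequence {

    // Stand-in for the two SessionContext calls quoted above:
    // lockCommunications() and releaseCommunications().
    interface Communications {
        void lock();
        void release();
    }

    // Locks communications, runs the recovery work, and always releases.
    // Returns false if recovery threw, mirroring the 'suc' flag in handleFailover.
    static boolean runFailover(Communications comms, Runnable recovery) {
        comms.lock(); // corresponds to preHandleFailover
        boolean succeeded = true;
        try {
            recovery.run(); // reattach, or recreate the session and its consumers
        } catch (Throwable t) {
            succeeded = false; // the real code logs the failure here
        } finally {
            comms.release(); // waiting senders are unblocked on every path
        }
        return succeeded;
    }

    public static void main(String[] args) {
        int[] lockDepth = {0};
        Communications comms = new Communications() {
            public void lock() {
                lockDepth[0]++;
            }

            public void release() {
                lockDepth[0]--;
            }
        };
        boolean ok = runFailover(comms, () -> {
            throw new IllegalStateException("backup unreachable");
        });
        System.out.println("succeeded=" + ok + ", still locked=" + (lockDepth[0] != 0));
    }
}
```

Releasing in finally matters because senders blocked in waitForFailOver with a negative timeout have no other way out: if recovery threw and the release were skipped, they would wait forever.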

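Summary

RemotingConnectionImpl holds the blockingCallFailoverTimeout field, which receives the callFailoverTimeout passed down from ClientSessionFactoryImpl through ActiveMQClientProtocolManager.connect. ChannelImpl's waitForFailOver waits without a limit when connection.getBlockingCallFailoverTimeout() is negative, and otherwise calls ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout()), logging a debug message if the wait times out. The send method calls waitForFailOver when failingOver is true; lock sets failingOver to true, and unlock sets it to false and signals the waiting senders.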

doc

  • RemotingConnectionImpl