A Look at Artemis's SharedNothingBackupQuorum

  • February 24, 2020
  • Notes

This post takes a look at Artemis's SharedNothingBackupQuorum.

SharedNothingBackupQuorum

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java

public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {

   private TransportConfiguration liveTransportConfiguration;

   public enum BACKUP_ACTIVATION {
      FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
   }

   private QuorumManager quorumManager;

   private String targetServerID = "";

   private final NodeManager nodeManager;

   private final StorageManager storageManager;
   private final ScheduledExecutorService scheduledPool;
   private final int quorumSize;

   private final int voteRetries;

   private final long voteRetryWait;

   private final Object voteGuard = new Object();

   private CountDownLatch latch;

   private ClientSessionFactoryInternal sessionFactory;

   private CoreRemotingConnection connection;

   private final NetworkHealthCheck networkHealthCheck;

   private volatile boolean stopped = false;

   private final int quorumVoteWait;

   //......

   @Override
   public void nodeDown(Topology topology, long eventUID, String nodeID) {
      if (targetServerID.equals(nodeID)) {
         decideOnAction(topology);
      }
   }

   @Override
   public void nodeUp(Topology topology) {
      //noop
   }

   /**
    * if the connection to our replicated live goes down then decide on an action
    */
   @Override
   public void connectionFailed(ActiveMQException exception, boolean failedOver) {
      decideOnAction(sessionFactory.getServerLocator().getTopology());
   }

   @Override
   public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
      connectionFailed(me, failedOver);
   }

   @Override
   public void beforeReconnect(ActiveMQException exception) {
      //noop
   }

   public void decideOnAction(Topology topology) {
      //we may get called via multiple paths so need to guard
      synchronized (decisionGuard) {
         if (signal == BACKUP_ACTIVATION.FAIL_OVER) {
            if (networkHealthCheck != null && !networkHealthCheck.check()) {
               signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
            }
            return;
         }
         if (!isLiveDown()) {
            //lost connection but don't know if live is down so restart as backup as we can't replicate any more
            ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
            signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
         } else {
            // live is assumed to be down, backup fails-over
            ActiveMQServerLogger.LOGGER.failingOverBasedOnQuorumVoteResults();
            signal = BACKUP_ACTIVATION.FAIL_OVER;
         }

         /* use NetworkHealthCheck to determine if node is isolated
          * if there are no addresses/urls configured then ignore and rely on quorum vote only
          */
         if (networkHealthCheck != null && !networkHealthCheck.isEmpty()) {
            if (networkHealthCheck.check()) {
               // live is assumed to be down, backup fails-over
               signal = BACKUP_ACTIVATION.FAIL_OVER;
            } else {
               ActiveMQServerLogger.LOGGER.serverIsolatedOnNetwork();
               signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
            }
         }
      }
      latch.countDown();
   }

   private boolean isLiveDown() {
      //lets assume live is not down
      Boolean decision = false;
      int voteAttempts = 0;
      int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize;

      synchronized (voteGuard) {
         while (!stopped && voteAttempts++ < voteRetries) {
            //the live is dead so lets vote for quorum
            QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID);

            quorumManager.vote(quorumVote);

            try {
               quorumVote.await(quorumVoteWait, TimeUnit.SECONDS);
            } catch (InterruptedException interruption) {
               // No-op. The best the quorum can do now is to return the latest number it has
               ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted();
            }

            quorumManager.voteComplete(quorumVote);

            decision = quorumVote.getDecision();

            if (decision) {
               return decision;
            }
            try {
               voteGuard.wait(voteRetryWait);
            } catch (InterruptedException e) {
               //nothing to do here
            }
         }
      }

      return decision;
   }

   public synchronized void reset() {
      latch = new CountDownLatch(1);
   }

   //......
}
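  • Both nodeDown and connectionFailed in SharedNothingBackupQuorum call decideOnAction (nodeDown only when the failed nodeID equals targetServerID). If signal is already BACKUP_ACTIVATION.FAIL_OVER and networkHealthCheck is not null, the method runs networkHealthCheck.check() and, if that returns false, sets signal to BACKUP_ACTIVATION.FAILURE_REPLICATING. In either case it then returns without calling isLiveDown.
  • For any other signal value it calls isLiveDown. A false result sets signal to BACKUP_ACTIVATION.FAILURE_REPLICATING (the connection is lost but the live server may still be up, so the node restarts as a backup); a true result sets it to BACKUP_ACTIVATION.FAIL_OVER. Finally, if networkHealthCheck is neither null nor empty, networkHealthCheck.check() runs and its result overrides the vote: true sets signal to BACKUP_ACTIVATION.FAIL_OVER, false sets it to BACKUP_ACTIVATION.FAILURE_REPLICATING. The method ends with latch.countDown().
  • isLiveDown creates a QuorumVoteServerConnect, calls quorumManager.vote(quorumVote), waits with quorumVote.await(quorumVoteWait, TimeUnit.SECONDS), calls quorumManager.voteComplete(quorumVote), and then reads quorumVote.getDecision(). If the decision is true it returns immediately; otherwise it calls voteGuard.wait(voteRetryWait) and votes again, for at most voteRetries attempts in total or until stopped is set.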
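The bounded retry loop in isLiveDown can be sketched in isolation. The following is a minimal sketch, not the Artemis implementation: the class VoteRetrySketch, the voteRound supplier (standing in for one vote/await/voteComplete/getDecision round) and the stop() method are all hypothetical names introduced for illustration. Unlike the excerpt, which ignores an InterruptedException during the wait, this sketch restores the interrupt flag and leaves the loop.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch; none of these names are part of Artemis.
final class VoteRetrySketch {

    private final Object voteGuard = new Object();
    private volatile boolean stopped = false;

    /**
     * Runs at most voteRetries voting rounds and returns true as soon as one
     * round decides that the live server is down.
     */
    boolean isLiveDown(BooleanSupplier voteRound, int voteRetries, long voteRetryWaitMillis) {
        boolean decision = false; // assume the live server is not down
        int voteAttempts = 0;
        synchronized (voteGuard) {
            while (!stopped && voteAttempts++ < voteRetries) {
                decision = voteRound.getAsBoolean();
                if (decision) {
                    return true;
                }
                try {
                    // Pause before the next round; wait() releases the monitor.
                    // Note that Object.wait(0) would wait without a timeout.
                    voteGuard.wait(voteRetryWaitMillis);
                } catch (InterruptedException e) {
                    // Assumption of this sketch: stop retrying when interrupted.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return decision;
    }

    /** Hypothetical stop hook: ends the loop and wakes a waiting voter. */
    void stop() {
        stopped = true;
        synchronized (voteGuard) {
            voteGuard.notifyAll();
        }
    }

    public static void main(String[] args) {
        VoteRetrySketch sketch = new VoteRetrySketch();
        int[] rounds = {0};
        // The simulated vote only succeeds on the third round.
        boolean down = sketch.isLiveDown(() -> ++rounds[0] == 3, 5, 10L);
        // prints: liveDown=true after 3 rounds
        System.out.println("liveDown=" + down + " after " + rounds[0] + " rounds");
    }
}
```

Waiting on a dedicated guard object rather than sleeping lets another thread cut the pause short with a notify, which is presumably why the excerpt uses voteGuard.wait(voteRetryWait).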

SharedNothingBackupActivation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java

public final class SharedNothingBackupActivation extends Activation {

   //......

   public void run() {
      try {

         logger.trace("SharedNothingBackupActivation..start");
         synchronized (activeMQServer) {
            activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
         }

         //......

         synchronized (this) {
            logger.trace("Entered a synchronized");
            if (closed)
               return;
            backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());
            activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
            activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));
         }

         //......

         SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;
         do {

            if (closed) {
               if (logger.isTraceEnabled()) {
                  logger.trace("Activation is closed, so giving up");
               }
               return;
            }

            if (logger.isTraceEnabled()) {
               logger.trace("looking up the node through nodeLocator.locateNode()");
            }
            //locate the first live server to try to replicate
            nodeLocator.locateNode();
            Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();
            nodeID = nodeLocator.getNodeID();

            if (logger.isTraceEnabled()) {
               logger.trace("nodeID = " + nodeID);
            }
            //in a normal (non failback) scenario if we couldn't find our live server we should fail
            if (!attemptFailBack) {
               if (logger.isTraceEnabled()) {
                  logger.trace("attemptFailback=false, nodeID=" + nodeID);
               }

               //this shouldn't happen
               if (nodeID == null) {
                  logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");
                  throw new RuntimeException("Could not establish the connection");
               }
               activeMQServer.getNodeManager().setNodeID(nodeID);
            }

            if (possibleLive != null) {
               clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getA());
               if (clusterControl == null) {
                  clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getB());
               }
            } else {
               clusterControl = null;
            }
            if (clusterControl == null) {

               if (logger.isTraceEnabled()) {
                  logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry");
               }
               //its ok to retry here since we haven't started replication yet
               //it may just be the server has gone since discovery
               Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
               signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;
               continue;
            }

            activeMQServer.getThreadPool().execute(endpointConnector);
            /**
             * Wait for a signal from the the quorum manager, at this point if replication has been successful we can
             * fail over or if there is an error trying to replicate (such as already replicating) we try the
             * process again on the next live server.  All the action happens inside {@link BackupQuorum}
             */
            signal = backupQuorum.waitForStatusChange();

            if (logger.isTraceEnabled()) {
               logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()");
            }

            /**
             * replicationEndpoint will be holding lots of open files. Make sure they get
             * closed/sync'ed.
             */
            ActiveMQServerImpl.stopComponent(replicationEndpoint);
            // time to give up
            if (!activeMQServer.isStarted() || signal == STOP) {
               if (logger.isTraceEnabled()) {
                  logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal);
               }
               return;
            } else if (signal == FAIL_OVER) {
               // time to fail over
               if (logger.isTraceEnabled()) {
                  logger.trace("signal == FAIL_OVER, breaking the loop");
               }
               break;
            } else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) {
               // something has gone badly run restart from scratch
               if (logger.isTraceEnabled()) {
                  logger.trace("Starting a new thread to stop the server!");
               }

               Thread startThread = new Thread(new Runnable() {
                  @Override
                  public void run() {
                     try {
                        if (logger.isTraceEnabled()) {
                           logger.trace("Calling activeMQServer.stop() and start() to restart the server");
                        }
                        activeMQServer.stop();
                        activeMQServer.start();
                     } catch (Exception e) {
                        ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);
                     }
                  }
               });
               startThread.start();
               return;
            }
            //ok, this live is no good, let's reset and try again
            //close this session factory, we're done with it
            clusterControl.close();
            backupQuorum.reset();
            if (replicationEndpoint.getChannel() != null) {
               replicationEndpoint.getChannel().close();
               replicationEndpoint.setChannel(null);
            }
         }
         while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);

         //......

      } catch (Exception e) {
         if (logger.isTraceEnabled()) {
            logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e);
         }
         if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted())
            // do not log these errors if the server is being stopped.
            return;
         ActiveMQServerLogger.LOGGER.initializationError(e);
      }
   }

   //......
}
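  • The run method of SharedNothingBackupActivation creates a SharedNothingBackupQuorum and registers it with the QuorumManager. It then enters a do-while loop that locates a live node, connects to it, and calls backupQuorum.waitForStatusChange() to obtain signal; the loop repeats only while signal is SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING. On STOP (or when the server is no longer started) run returns; on FAIL_OVER it breaks out of the loop; on FAILURE_REPLICATING it starts a new thread that calls activeMQServer.stop() and activeMQServer.start(), then returns. In the remaining case it closes clusterControl and calls backupQuorum.reset() before the next iteration.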
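The hand-off between decideOnAction (which ends with latch.countDown()), waitForStatusChange() and reset() (which installs a new CountDownLatch(1)) looks like a one-shot latch that is re-armed on every loop iteration. The excerpt does not show the body of waitForStatusChange(), so the sketch below assumes it blocks on the latch and then returns the current signal, and that an interrupt maps to STOP. SignalLatchSketch and publish are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch; assumes waitForStatusChange() blocks on the latch.
final class SignalLatchSketch {

    enum Signal { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP }

    private volatile Signal signal;
    private volatile CountDownLatch latch = new CountDownLatch(1);

    /** Producer side: record the decision, then release the waiting thread. */
    void publish(Signal decided) {
        signal = decided;
        latch.countDown();
    }

    /** Consumer side: block until a decision has been published. */
    Signal waitForStatusChange() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Assumption of this sketch: treat an interrupt as a request to stop.
            Thread.currentThread().interrupt();
            return Signal.STOP;
        }
        return signal;
    }

    /** Re-arm the latch so the next waitForStatusChange() blocks again. */
    synchronized void reset() {
        latch = new CountDownLatch(1);
    }

    public static void main(String[] args) throws InterruptedException {
        SignalLatchSketch quorum = new SignalLatchSketch();

        // First round: another thread reports that the live is already replicating.
        Thread listener = new Thread(() -> quorum.publish(Signal.ALREADY_REPLICATING));
        listener.start();
        System.out.println(quorum.waitForStatusChange()); // prints: ALREADY_REPLICATING
        listener.join();

        // The activation loop would reset and retry; this time the vote says fail over.
        quorum.reset();
        quorum.publish(Signal.FAIL_OVER);
        System.out.println(quorum.waitForStatusChange()); // prints: FAIL_OVER
    }
}
```

A CountDownLatch cannot be counted back up, which is why the re-arm step replaces the latch object instead of resetting it.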

Summary

Both nodeDown and connectionFailed in SharedNothingBackupQuorum call decideOnAction. If signal is already BACKUP_ACTIVATION.FAIL_OVER and networkHealthCheck is not null, the method runs networkHealthCheck.check(), sets signal to BACKUP_ACTIVATION.FAILURE_REPLICATING if the check returns false, and returns. For any other signal value it calls isLiveDown: false sets signal to BACKUP_ACTIVATION.FAILURE_REPLICATING, true sets it to BACKUP_ACTIVATION.FAIL_OVER. Finally, if networkHealthCheck is neither null nor empty, networkHealthCheck.check() runs and overrides that result: true sets signal to BACKUP_ACTIVATION.FAIL_OVER, false sets it to BACKUP_ACTIVATION.FAILURE_REPLICATING.
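The decision flow of decideOnAction can be restated as a pure function, which makes the precedence between the quorum vote and the network health check easier to see. This is a minimal sketch: DecideOnActionSketch, the HealthCheck interface and the fixed helper are hypothetical stand-ins (HealthCheck only mirrors the two NetworkHealthCheck calls used in the excerpt), and locking, logging and the latch are left out.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the branching in decideOnAction; not Artemis code.
final class DecideOnActionSketch {

    enum Signal { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP }

    /** Stand-in for the two NetworkHealthCheck methods used by the excerpt. */
    interface HealthCheck {
        boolean isEmpty();
        boolean check();
    }

    static Signal decide(Signal current, BooleanSupplier liveDownVote, HealthCheck health) {
        if (current == Signal.FAIL_OVER) {
            // Already failing over: only the health check can change the signal,
            // and no new vote is started.
            if (health != null && !health.check()) {
                return Signal.FAILURE_REPLICATING;
            }
            return current;
        }
        // Quorum vote: true means the live server is assumed to be down.
        Signal next = liveDownVote.getAsBoolean() ? Signal.FAIL_OVER : Signal.FAILURE_REPLICATING;
        // A configured health check has the final word, whatever the vote said.
        if (health != null && !health.isEmpty()) {
            next = health.check() ? Signal.FAIL_OVER : Signal.FAILURE_REPLICATING;
        }
        return next;
    }

    /** Helper for the demo: a health check with fixed answers. */
    static HealthCheck fixed(boolean empty, boolean result) {
        return new HealthCheck() {
            @Override
            public boolean isEmpty() {
                return empty;
            }

            @Override
            public boolean check() {
                return result;
            }
        };
    }

    public static void main(String[] args) {
        // Vote says live is down, no health check configured.
        System.out.println(decide(null, () -> true, null));                // prints: FAIL_OVER
        // Vote says live is down, but this node is isolated on the network.
        System.out.println(decide(null, () -> true, fixed(false, false))); // prints: FAILURE_REPLICATING
        // Vote says live is up, but the health check passes and overrides it.
        System.out.println(decide(null, () -> false, fixed(false, true))); // prints: FAIL_OVER
    }
}
```

The third case is the one worth noticing: with a non-empty health check configured, a passing check yields FAIL_OVER even when the vote did not conclude that the live server is down.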

doc

  • SharedNothingBackupQuorum