- 2020 年 2 月 24 日
- 笔记
public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener { private TransportConfiguration liveTransportConfiguration; public enum BACKUP_ACTIVATION { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP; } private QuorumManager quorumManager; private String targetServerID = ""; private final NodeManager nodeManager; private final StorageManager storageManager; private final ScheduledExecutorService scheduledPool; private final int quorumSize; private final int voteRetries; private final long voteRetryWait; private final Object voteGuard = new Object(); private CountDownLatch latch; private ClientSessionFactoryInternal sessionFactory; private CoreRemotingConnection connection; private final NetworkHealthCheck networkHealthCheck; private volatile boolean stopped = false; private final int quorumVoteWait; //...... @Override public void nodeDown(Topology topology, long eventUID, String nodeID) { if (targetServerID.equals(nodeID)) { decideOnAction(topology); } } @Override public void nodeUp(Topology topology) { //noop } /** * if the connection to our replicated live goes down then decide on an action */ @Override public void connectionFailed(ActiveMQException exception, boolean failedOver) { decideOnAction(sessionFactory.getServerLocator().getTopology()); } @Override public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) { connectionFailed(me, failedOver); } @Override public void beforeReconnect(ActiveMQException exception) { //noop } public void decideOnAction(Topology topology) { //we may get called via multiple paths so need to guard synchronized (decisionGuard) { if (signal == BACKUP_ACTIVATION.FAIL_OVER) { if (networkHealthCheck != null && !networkHealthCheck.check()) { signal = BACKUP_ACTIVATION.FAILURE_REPLICATING; } return; } if (!isLiveDown()) { //lost connection but don't know if live is down so restart as backup as we can't replicate any more ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults(); signal = BACKUP_ACTIVATION.FAILURE_REPLICATING; } else { // live is assumed to be down, backup fails-over ActiveMQServerLogger.LOGGER.failingOverBasedOnQuorumVoteResults(); signal = BACKUP_ACTIVATION.FAIL_OVER; } /* use NetworkHealthCheck to determine if node is isolated * if there are no addresses/urls configured then ignore and rely on quorum vote only */ if (networkHealthCheck != null && !networkHealthCheck.isEmpty()) { if (networkHealthCheck.check()) { // live is assumed to be down, backup fails-over signal = BACKUP_ACTIVATION.FAIL_OVER; } else { ActiveMQServerLogger.LOGGER.serverIsolatedOnNetwork(); signal = BACKUP_ACTIVATION.FAILURE_REPLICATING; } } } latch.countDown(); } private boolean isLiveDown() { //lets assume live is not down Boolean decision = false; int voteAttempts = 0; int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; synchronized (voteGuard) { while (!stopped && voteAttempts++ < voteRetries) { //the live is dead so lets vote for quorum QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); quorumManager.vote(quorumVote); try { quorumVote.await(quorumVoteWait, TimeUnit.SECONDS); } catch (InterruptedException interruption) { // No-op. The best the quorum can do now is to return the latest number it has ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted(); } quorumManager.voteComplete(quorumVote); decision = quorumVote.getDecision(); if (decision) { return decision; } try { voteGuard.wait(voteRetryWait); } catch (InterruptedException e) { //nothing to do here } } } return decision; } public synchronized void reset() { latch = new CountDownLatch(1); } //...... }
- SharedNothingBackupQuorum的nodeDown及connectionFailed方法都会执行decideOnAction;该方法对于signal为BACKUP_ACTIVATION.FAIL_OVER的在networkHealthCheck不为null时执行networkHealthCheck.check(),若为false则更新signal为BACKUP_ACTIVATION.FAILURE_REPLICATING然后返回
- 对于signal为其他值的执行isLiveDown方法,若为false则更新signal为BACKUP_ACTIVATION.FAILURE_REPLICATING,否则更新signal为BACKUP_ACTIVATION.FAIL_OVER;最后在networkHealthCheck不为null不为空时会执行networkHealthCheck.check(),返回true则更新signal为BACKUP_ACTIVATION.FAIL_OVER,否则更新signal为BACKUP_ACTIVATION.FAILURE_REPLICATING
- isLiveDown方法创建QuorumVoteServerConnect,然后执行quorumManager.vote(quorumVote)之后进行quorumVote.await(quorumVoteWait, TimeUnit.SECONDS),最后执行quorumManager.voteComplete(quorumVote),然后取quorumVote.getDecision()值,若为true则立刻返回,否则执行voteGuard.wait(voteRetryWait),进行重试,重试voteRetries次
public final class SharedNothingBackupActivation extends Activation { //...... public void run() { try { logger.trace("SharedNothingBackupActivation..start"); synchronized (activeMQServer) { activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED); } //...... synchronized (this) { logger.trace("Entered a synchronized"); if (closed) return; backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait()); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer)); } //...... SharedNothingBackupQuorum.BACKUP_ACTIVATION signal; do { if (closed) { if (logger.isTraceEnabled()) { logger.trace("Activation is closed, so giving up"); } return; } if (logger.isTraceEnabled()) { logger.trace("looking up the node through nodeLocator.locateNode()"); } //locate the first live server to try to replicate nodeLocator.locateNode(); Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration(); nodeID = nodeLocator.getNodeID(); if (logger.isTraceEnabled()) { logger.trace("nodeID = " + nodeID); } //in a normal (non failback) scenario if we couldn't find our live server we should fail if (!attemptFailBack) { if (logger.isTraceEnabled()) { logger.trace("attemptFailback=false, nodeID=" + nodeID); } //this shouldn't happen if (nodeID == null) { logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false"); throw new RuntimeException("Could not establish the connection"); } activeMQServer.getNodeManager().setNodeID(nodeID); } if (possibleLive != null) { clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getA()); if (clusterControl == null) { clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getB()); } } else { clusterControl = null; } if (clusterControl == null) { if (logger.isTraceEnabled()) { logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry"); } //its ok to retry here since we haven't started replication yet //it may just be the server has gone since discovery Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster()); signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING; continue; } activeMQServer.getThreadPool().execute(endpointConnector); /** * Wait for a signal from the the quorum manager, at this point if replication has been successful we can * fail over or if there is an error trying to replicate (such as already replicating) we try the * process again on the next live server. All the action happens inside {@link BackupQuorum} */ signal = backupQuorum.waitForStatusChange(); if (logger.isTraceEnabled()) { logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()"); } /** * replicationEndpoint will be holding lots of open files. Make sure they get * closed/sync'ed. */ ActiveMQServerImpl.stopComponent(replicationEndpoint); // time to give up if (!activeMQServer.isStarted() || signal == STOP) { if (logger.isTraceEnabled()) { logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal); } return; } else if (signal == FAIL_OVER) { // time to fail over if (logger.isTraceEnabled()) { logger.trace("signal == FAIL_OVER, breaking the loop"); } break; } else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) { // something has gone badly run restart from scratch if (logger.isTraceEnabled()) { logger.trace("Starting a new thread to stop the server!"); } Thread startThread = new Thread(new Runnable() { @Override public void run() { try { if (logger.isTraceEnabled()) { logger.trace("Calling activeMQServer.stop() and start() to restart the server"); } activeMQServer.stop(); activeMQServer.start(); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer); } } }); startThread.start(); return; } //ok, this live is no good, let's reset and try again //close this session factory, we're done with it clusterControl.close(); backupQuorum.reset(); if (replicationEndpoint.getChannel() != null) { replicationEndpoint.getChannel().close(); replicationEndpoint.setChannel(null); } } while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING); //...... } catch (Exception e) { if (logger.isTraceEnabled()) { logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e); } if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted()) // do not log these errors if the server is being stopped. return; ActiveMQServerLogger.LOGGER.initializationError(e); } } //...... }
- SharedNothingBackupActivation的run方法会创建SharedNothingBackupQuorum,然后while循环执行backupQuorum.waitForStatusChange()更新signal,直到signal不为SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;在signal为SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING时执行activeMQServer.stop()及activeMQServer.start();最后执行backupQuorum.reset()
