聊聊artemis的QuorumVote

  • 2020 年 2 月 24 日
  • 筆記

本文主要研究一下artemis的QuorumVote

QuorumVote

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVote.java

public abstract class QuorumVote<V extends Vote, T> {       private SimpleString name;       public QuorumVote(SimpleString name) {        this.name = name;     }       /**      * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when one of the nodes in the quorum is      * successfully connected to. The QuorumVote can then decide whether or not a decision can be made with just that information.      *      * @return the vote to use      */     public abstract Vote connected();       /**      * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} fails to connect to a node in the quorum.      * The QuorumVote can then decide whether or not a decision can be made with just that information however the node      * cannot cannot be asked.      *      * @return the vote to use      */     public abstract Vote notConnected();       /**      * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when a vote can be made, either from the      * cluster or decided by itself.      *      * @param vote the vote to make.      */     public abstract void vote(V vote);       /**      * get the decion of the vote      *      * @return the voting decision      */     public abstract T getDecision();       /**      * called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when all the votes have been cast and received.      *      * @param voteTopology the topology of where the votes were sent.      */     public abstract void allVotesCast(Topology voteTopology);       /**      * the name of this quorum vote, used for identifying the correct {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler}      *      * @return the name of the wuorum vote      */     public SimpleString getName() {        return name;     }  }
  • QuorumVote是个抽象类,定义了connected、notConnected、vote、getDecision、allVotesCast抽象方法

QuorumVoteServerConnect

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java

public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {       public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");     private final CountDownLatch latch;     private final String targetNodeId;     private final String liveConnector;       private int votesNeeded;       private int total = 0;       private boolean decision = false;       // Is this the live requesting to stay live, or a backup requesting to become live.     private boolean requestToStayLive = false;       /**      * live nodes | remaining nodes |  majority   | votes needed      * 1      |       0         |     0       |      0      * 2      |       1         |     1       |      1      * n      |    r = n-1      |   n/2 + 1   |   n/2 + 1 rounded      * 3      |       2         |     2.5     |      2      * 4      |       3         |      3      |      3      * 5      |       4         |     3.5     |      3      * 6      |       5         |      4      |      4      */     public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive, String liveConnector) {        super(LIVE_FAILOVER_VOTE);        this.targetNodeId = targetNodeId;        this.liveConnector = liveConnector;        double majority;        if (size <= 2) {           majority = ((double) size) / 2;        } else {           //even           majority = ((double) size) / 2 + 1;        }        //votes needed could be say 2.5 so we add 1 in this case        votesNeeded = (int) majority;        latch = new CountDownLatch(votesNeeded);        if (votesNeeded == 0) {           decision = true;        }        this.requestToStayLive = requestToStayLive;     }       public QuorumVoteServerConnect(int size, String targetNodeId) {        this(size, targetNodeId, false, null);     }     /**      * if we can connect to a node      *      * @return      */     @Override     public Vote connected() {        return new ServerConnectVote(targetNodeId, requestToStayLive, null);     }     /**      * if we cant connect to the node      *      * @return      */     @Override     public Vote notConnected() {        return new BooleanVote(false);     }       /**      * live nodes | remaining nodes |  majority   | votes needed      * 1      |       0         |     0       |      0      * 2      |       1         |     1       |      1      * n      |    r = n-1      |   n/2 + 1   |   n/2 + 1 rounded      * 3      |       2         |     2.5     |      2      * 4      |       3         |      3      |      3      * 5      |       4         |     3.5     |      3      * 6      |       5         |      4      |      4      *      * @param vote the vote to make.      */     @Override     public synchronized void vote(ServerConnectVote vote) {        if (decision)           return;        if (!requestToStayLive && vote.getVote()) {           total++;           latch.countDown();           if (total >= votesNeeded) {              decision = true;           }//do the opposite, if it says there is a node connected it means the backup has come live        } else if (requestToStayLive && vote.getVote()) {           total++;           latch.countDown();           if (liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) {              ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);              return;           }           if (total >= votesNeeded) {              decision = true;           }        }     }       @Override     public void allVotesCast(Topology voteTopology) {        while (latch.getCount() > 0) {           latch.countDown();        }     }       @Override     public Boolean getDecision() {        return decision;     }       public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {        ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase());        if (latch.await(latchTimeout, unit))           ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();        else           ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();     }       public boolean isRequestToStayLive() {        return requestToStayLive;     }  }
  • QuorumVoteServerConnect继承了QuorumVote,其构造器根据size初始化votesNeeded及decision;其connected方法返回ServerConnectVote;其notConnected方法返回BooleanVote(false);其vote方法对于ServerConnectVote的vote为true的递增total,同时latch.countDown(),对于total大于等于votesNeeded的更新decision为true;其allVotesCast方法则循环latch.countDown()

小结

QuorumVote是个抽象类,定义了connected、notConnected、vote、getDecision、allVotesCast抽象方法;QuorumVoteServerConnect继承了QuorumVote,其构造器根据size初始化votesNeeded及decision;其connected方法返回ServerConnectVote;其notConnected方法返回BooleanVote(false);其vote方法对于ServerConnectVote的vote为true的递增total,同时latch.countDown(),对于total大于等于votesNeeded的更新decision为true;其allVotesCast方法则循环latch.countDown()

doc

  • QuorumVoteServerConnect