A look at Artemis's HAManager

  • February 24, 2020
  • Notes

This article takes a look at Artemis's HAManager.

HAManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAManager.java

    public interface HAManager extends ActiveMQComponent {

       /**
        * return the current backup servers
        *
        * @return the backups
        */
       Map<String, ActiveMQServer> getBackupServers();
    }

  • HAManager extends the ActiveMQComponent interface and declares a single method, getBackupServers.

StandaloneHAManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/StandaloneHAManager.java

    public class StandaloneHAManager implements HAManager {

       Map<String, ActiveMQServer> servers = new HashMap<>();

       boolean isStarted = false;

       @Override
       public Map<String, ActiveMQServer> getBackupServers() {
          return servers;
       }

       @Override
       public void start() throws Exception {
          if (isStarted)
             return;
          isStarted = true;
       }

       @Override
       public void stop() throws Exception {
          if (!isStarted)
             return;
          isStarted = false;
       }

       @Override
       public boolean isStarted() {
          return isStarted;
       }
    }

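  • StandaloneHAManager implements HAManager. Its getBackupServers method returns the servers map, which this class never populates, so the result is an empty map; start and stop only flip the isStarted flag.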
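
The lifecycle behaviour described above can be tried without the Artemis jars. The following is a minimal sketch, not the library's code: Component, BackupRegistry and StandaloneRegistry are hypothetical stand-ins for ActiveMQComponent, HAManager and StandaloneHAManager, and Object replaces ActiveMQServer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ActiveMQComponent, reduced to the three lifecycle methods.
interface Component {
   void start() throws Exception;
   void stop() throws Exception;
   boolean isStarted();
}

// Hypothetical stand-in for HAManager; Object replaces ActiveMQServer
// so that the sketch has no external dependencies.
interface BackupRegistry extends Component {
   Map<String, Object> getBackupServers();
}

// Mirrors the shape of StandaloneHAManager: nothing ever writes to the map.
class StandaloneRegistry implements BackupRegistry {
   private final Map<String, Object> servers = new HashMap<>();
   private boolean started;

   @Override public Map<String, Object> getBackupServers() { return servers; }
   @Override public void start() { started = true; }   // calling twice is harmless
   @Override public void stop() { started = false; }
   @Override public boolean isStarted() { return started; }
}

class StandaloneRegistryDemo {
   public static void main(String[] args) throws Exception {
      BackupRegistry ha = new StandaloneRegistry();
      ha.start();
      ha.start(); // second call changes nothing
      System.out.println(ha.isStarted() + " " + ha.getBackupServers().size()); // true 0
      ha.stop();
      System.out.println(ha.isStarted()); // false
   }
}
```

Because the map is never written to, a caller iterating over getBackupServers() on a standalone broker simply does nothing.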

ColocatedHAManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedHAManager.java

    public class ColocatedHAManager implements HAManager {

       private final ColocatedPolicy haPolicy;

       private final ActiveMQServer server;

       private final Map<String, ActiveMQServer> backupServers = new HashMap<>();

       private boolean started;

       public ColocatedHAManager(ColocatedPolicy haPolicy, ActiveMQServer activeMQServer) {
          this.haPolicy = haPolicy;
          server = activeMQServer;
       }

       /**
        * starts the HA manager.
        */
       @Override
       public void start() {
          if (started)
             return;

          server.getActivation().haStarted();

          started = true;
       }

       /**
        * stop any backups
        */
       @Override
       public void stop() {
          for (ActiveMQServer activeMQServer : backupServers.values()) {
             try {
                activeMQServer.stop();
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingServer(e);
             }
          }
          backupServers.clear();
          started = false;
       }

       @Override
       public boolean isStarted() {
          return started;
       }

       public synchronized boolean activateBackup(int backupSize,
                                                  String journalDirectory,
                                                  String bindingsDirectory,
                                                  String largeMessagesDirectory,
                                                  String pagingDirectory,
                                                  SimpleString nodeID) throws Exception {
          if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) {
             return false;
          }
          if (haPolicy.getBackupPolicy().isSharedStore()) {
             return activateSharedStoreBackup(journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory);
          } else {
             return activateReplicatedBackup(nodeID);
          }
       }

       /**
        * return the current backup servers
        *
        * @return the backups
        */
       @Override
       public Map<String, ActiveMQServer> getBackupServers() {
          return backupServers;
       }

       /**
        * send a request to a live server to start a backup for us
        *
        * @param connectorPair the connector for the node to request a backup from
        * @param backupSize    the current size of the requested nodes backups
        * @param replicated
        * @return true if the request wa successful.
        * @throws Exception
        */
       public boolean requestBackup(Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                                    int backupSize,
                                    boolean replicated) throws Exception {
          ClusterController clusterController = server.getClusterManager().getClusterController();
          try (ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA())) {
             clusterControl.authorize();
             if (replicated) {
                return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID());
             } else {
                return clusterControl.requestSharedStoreBackup(backupSize,
                                                               server.getConfiguration().getJournalLocation().getAbsolutePath(),
                                                               server.getConfiguration().getBindingsLocation().getAbsolutePath(),
                                                               server.getConfiguration().getLargeMessagesLocation().getAbsolutePath(),
                                                               server.getConfiguration().getPagingLocation().getAbsolutePath());
             }
          }
       }

       private synchronized boolean activateSharedStoreBackup(String journalDirectory,
                                                              String bindingsDirectory,
                                                              String largeMessagesDirectory,
                                                              String pagingDirectory) throws Exception {
          Configuration configuration = server.getConfiguration().copy();
          ActiveMQServer backup = server.createBackupServer(configuration);
          try {
             int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
             String name = "colocated_backup_" + backupServers.size() + 1;
             //make sure we don't restart as we are colocated
             haPolicy.getBackupPolicy().setRestartBackup(false);
             //set the backup policy
             backup.setHAPolicy(haPolicy.getBackupPolicy());
             updateSharedStoreConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory, haPolicy.getBackupPolicy().getScaleDownPolicy() == null);

             backupServers.put(configuration.getName(), backup);
             backup.start();
          } catch (Exception e) {
             backup.stop();
             ActiveMQServerLogger.LOGGER.activateSharedStoreSlaveFailed(e);
             return false;
          }
          ActiveMQServerLogger.LOGGER.activatingSharedStoreSlave();
          return true;
       }

       /**
        * activate a backup server replicating from a specified node.
        *
        * decline and the requesting server can cast a re vote
        *
        * @param nodeID the id of the node to replicate from
        * @return true if the server was created and started
        * @throws Exception
        */
       private synchronized boolean activateReplicatedBackup(SimpleString nodeID) throws Exception {
          final TopologyMember member;
          try {
             member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString());
             if (!Objects.equals(member.getBackupGroupName(), haPolicy.getBackupPolicy().getBackupGroupName())) {
                return false;
             }
          } catch (Exception e) {
             ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
             return false;
          }
          Configuration configuration = server.getConfiguration().copy();
          ActiveMQServer backup = server.createBackupServer(configuration);
          try {
             int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
             String name = "colocated_backup_" + backupServers.size() + 1;
             //make sure we don't restart as we are colocated
             haPolicy.getBackupPolicy().setRestartBackup(false);
             //set the backup policy
             backup.setHAPolicy(haPolicy.getBackupPolicy());
             updateReplicatedConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
             backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member);
             backupServers.put(configuration.getName(), backup);
             backup.start();
          } catch (Exception e) {
             backup.stop();
             ActiveMQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
             return false;
          }
          ActiveMQServerLogger.LOGGER.activatingReplica(nodeID);
          return true;
       }

       //......
    }

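  • ColocatedHAManager implements HAManager; its getBackupServers method returns backupServers. Both activateSharedStoreBackup and activateReplicatedBackup create a backup through server.createBackupServer(configuration) and add it to backupServers. activateBackup returns false if the manager already holds haPolicy.getMaxBackups() backups or if the requester's backupSize differs from backupServers.size(); otherwise it runs activateSharedStoreBackup when haPolicy.getBackupPolicy().isSharedStore() is true and activateReplicatedBackup when it is false.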
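
The bookkeeping inside activateBackup (the size guard, the port offset and the backup name) is plain arithmetic and can be modelled in isolation. The sketch below is a simplified, dependency-free model rather than Artemis code: ColocatedBookkeeping and its methods are hypothetical names, and the values 1 and 100 passed to the constructor are only illustrative. It also shows a detail of the quoted naming expression: since string concatenation is left-associative, "colocated_backup_" + backupServers.size() + 1 appends the character 1 instead of adding one to the size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free model of the bookkeeping in ColocatedHAManager.activateBackup.
class ColocatedBookkeeping {
   private final int maxBackups;
   private final int backupPortOffset;
   private final List<String> backups = new ArrayList<>();

   ColocatedBookkeeping(int maxBackups, int backupPortOffset) {
      this.maxBackups = maxBackups;
      this.backupPortOffset = backupPortOffset;
   }

   // Same guard as the quoted code, inverted: accept only when there is room and the
   // requester's view of the backup count (backupSize) matches the local count.
   synchronized boolean accepts(int backupSize) {
      return backups.size() < maxBackups && backupSize == backups.size();
   }

   // The port offset grows with each hosted backup: offset * (current count + 1).
   synchronized int nextPortOffset() {
      return backupPortOffset * (backups.size() + 1);
   }

   // Mirrors the quoted expression; the trailing "+ 1" is string concatenation.
   synchronized String nextName() {
      return "colocated_backup_" + backups.size() + 1;
   }

   // Registers a new backup name if the guard passes; the real code starts a server here.
   synchronized boolean activate(int backupSize) {
      if (!accepts(backupSize)) {
         return false;
      }
      backups.add(nextName());
      return true;
   }
}

class ColocatedBookkeepingDemo {
   public static void main(String[] args) {
      ColocatedBookkeeping hm = new ColocatedBookkeeping(1, 100);
      System.out.println(hm.nextPortOffset()); // 100
      System.out.println(hm.nextName());       // colocated_backup_01
      System.out.println(hm.activate(0));      // true
      System.out.println(hm.activate(0));      // false: already at maxBackups
   }
}
```

The backupSize check acts as a cheap consistency test: a requester whose view of the backup count is stale is turned away and can ask again.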

ColocatedPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java

    public class ColocatedPolicy implements HAPolicy<LiveActivation> {

       /*live stuff*/
       private boolean requestBackup = ActiveMQDefaultConfiguration.isDefaultHapolicyRequestBackup();

       private int backupRequestRetries = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries();

       private long backupRequestRetryInterval = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval();

       private int maxBackups = ActiveMQDefaultConfiguration.getDefaultHapolicyMaxBackups();

       private int backupPortOffset = ActiveMQDefaultConfiguration.getDefaultHapolicyBackupPortOffset();

       /*backup stuff*/
       private List<String> excludedConnectors = new ArrayList<>();

       private BackupPolicy backupPolicy;

       private HAPolicy<LiveActivation> livePolicy;

       //......
    }

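  • ColocatedPolicy implements the HAPolicy interface; it holds a backupPolicy field next to the livePolicy field, along with the maxBackups and backupPortOffset settings that ColocatedHAManager reads.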

BackupPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/BackupPolicy.java

    public abstract class BackupPolicy implements HAPolicy<Activation> {

       protected ScaleDownPolicy scaleDownPolicy;

       protected boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();

       public ScaleDownPolicy getScaleDownPolicy() {
          return scaleDownPolicy;
       }

       public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy) {
          this.scaleDownPolicy = scaleDownPolicy;
       }

       @Override
       public boolean isBackup() {
          return true;
       }

       @Override
       public String getScaleDownClustername() {
          return null;
       }

       @Override
       public String getScaleDownGroupName() {
          return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null;
       }

       public boolean isRestartBackup() {
          return restartBackup;
       }

       public void setRestartBackup(boolean restartBackup) {
          this.restartBackup = restartBackup;
       }
    }

  • BackupPolicy is an abstract class that implements the HAPolicy interface; its isBackup method returns true. It has two subclasses, SharedStoreSlavePolicy and ReplicaPolicy.

SharedStoreSlavePolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java

    public class SharedStoreSlavePolicy extends BackupPolicy {

       private boolean failoverOnServerShutdown = ActiveMQDefaultConfiguration.isDefaultFailoverOnServerShutdown();

       private boolean allowAutoFailBack = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();

       private boolean isWaitForActivation = ActiveMQDefaultConfiguration.isDefaultWaitForActivation();

       //this is how we act once we have failed over
       private SharedStoreMasterPolicy sharedStoreMasterPolicy;

       public SharedStoreSlavePolicy() {
       }

       public SharedStoreSlavePolicy(boolean failoverOnServerShutdown,
                                     boolean restartBackup,
                                     boolean allowAutoFailBack,
                                     ScaleDownPolicy scaleDownPolicy) {
          this.failoverOnServerShutdown = failoverOnServerShutdown;
          this.restartBackup = restartBackup;
          this.allowAutoFailBack = allowAutoFailBack;
          this.scaleDownPolicy = scaleDownPolicy;
       }

       @Deprecated
       public long getFailbackDelay() {
          return -1;
       }

       @Deprecated
       public void setFailbackDelay(long failbackDelay) {
       }

       public boolean isFailoverOnServerShutdown() {
          return failoverOnServerShutdown;
       }

       public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) {
          this.failoverOnServerShutdown = failoverOnServerShutdown;
       }

       public SharedStoreMasterPolicy getSharedStoreMasterPolicy() {
          if (sharedStoreMasterPolicy == null) {
             sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failoverOnServerShutdown, isWaitForActivation);
          }
          return sharedStoreMasterPolicy;
       }

       public void setSharedStoreMasterPolicy(SharedStoreMasterPolicy sharedStoreMasterPolicy) {
          this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
       }

       @Override
       public boolean isSharedStore() {
          return true;
       }

       @Override
       public boolean canScaleDown() {
          return scaleDownPolicy != null;
       }

       public boolean isAllowAutoFailBack() {
          return allowAutoFailBack;
       }

       public void setAllowAutoFailBack(boolean allowAutoFailBack) {
          this.allowAutoFailBack = allowAutoFailBack;
       }

       public void setIsWaitForActivation(boolean isWaitForActivation) {
          this.isWaitForActivation = isWaitForActivation;
       }

       @Override
       public Activation createActivation(ActiveMQServerImpl server,
                                          boolean wasLive,
                                          Map<String, Object> activationParams,
                                          ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
          return new SharedStoreBackupActivation(server, this);
       }

       @Override
       public String getBackupGroupName() {
          return null;
       }
    }

  • SharedStoreSlavePolicy extends BackupPolicy; its isSharedStore method returns true.

ReplicaPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java

    public class ReplicaPolicy extends BackupPolicy {

       private String clusterName;

       private int maxSavedReplicatedJournalsSize = ActiveMQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize();

       private String groupName = null;

       private boolean restartBackup = ActiveMQDefaultConfiguration.isDefaultRestartBackup();

       //used if we create a replicated policy for when we become live.
       private boolean allowFailback = ActiveMQDefaultConfiguration.isDefaultAllowAutoFailback();

       private long initialReplicationSyncTimeout = ActiveMQDefaultConfiguration.getDefaultInitialReplicationSyncTimeout();

       /*
       * what quorum size to use for voting
       * */
       private int quorumSize;

       /*
       * whether or not this live broker should vote to remain live
       * */
       private boolean voteOnReplicationFailure;

       private ReplicatedPolicy replicatedPolicy;

       private final NetworkHealthCheck networkHealthCheck;

       private int voteRetries;

       private long voteRetryWait;

       private final int quorumVoteWait;

       private long retryReplicationWait;

       //......

       @Override
       public boolean isRestartBackup() {
          return restartBackup;
       }

       @Override
       public void setRestartBackup(boolean restartBackup) {
          this.restartBackup = restartBackup;
       }

       @Override
       public boolean isSharedStore() {
          return false;
       }

       //......
    }

  • ReplicaPolicy extends BackupPolicy; its isSharedStore method returns false.
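
How the two subclasses steer ColocatedHAManager.activateBackup can be shown with a small stand-alone sketch. The types below (BackupKind, SharedStoreKind, ReplicaKind, BackupDispatch) are hypothetical stand-ins for BackupPolicy, SharedStoreSlavePolicy and ReplicaPolicy, and the returned strings only describe which branch was taken. In Artemis itself isSharedStore is implemented by the concrete policy classes, not by BackupPolicy.

```java
// Hypothetical stand-in for BackupPolicy: every backup policy reports isBackup() == true.
abstract class BackupKind {
   abstract boolean isSharedStore();

   boolean isBackup() {
      return true;
   }
}

// Stand-in for SharedStoreSlavePolicy.
class SharedStoreKind extends BackupKind {
   @Override
   boolean isSharedStore() {
      return true;
   }
}

// Stand-in for ReplicaPolicy.
class ReplicaKind extends BackupKind {
   @Override
   boolean isSharedStore() {
      return false;
   }
}

class BackupDispatch {
   // Same branch as activateBackup: a shared-store backup needs the store directories,
   // a replicated backup needs the ID of the node to replicate from.
   static String activate(BackupKind policy, String journalDirectory, String nodeID) {
      if (policy.isSharedStore()) {
         return "shared-store backup using " + journalDirectory;
      } else {
         return "replicated backup of node " + nodeID;
      }
   }

   public static void main(String[] args) {
      System.out.println(activate(new SharedStoreKind(), "/data/journal", "node-1"));
      System.out.println(activate(new ReplicaKind(), "/data/journal", "node-1"));
   }
}
```

Keeping the decision in a single boolean on the policy means the manager never has to inspect the concrete policy class.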

Summary

HAManager extends the ActiveMQComponent interface and declares the getBackupServers method. StandaloneHAManager implements HAManager, and its getBackupServers method returns an empty map. ColocatedHAManager implements HAManager, and its getBackupServers method returns backupServers; both activateSharedStoreBackup and activateReplicatedBackup create a backup through server.createBackupServer(configuration) and add it to backupServers, while activateBackup picks one of the two depending on whether haPolicy.getBackupPolicy() is a shared-store policy.
