聊聊artemis的FederationManager

  • 2020 年 2 月 24 日
  • 笔记

本文主要研究一下artemis的FederationManager

FederationManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederationManager.java

public class FederationManager implements ActiveMQComponent {       private final ActiveMQServer server;       private Map<String, Federation> federations = new HashMap<>();     private State state;       enum State {        STOPPED,        STOPPING,        /**         * Deployed means {@link FederationManager#deploy()} was called but         * {@link FederationManager#start()} was not called.         * <p>         * We need the distinction if {@link FederationManager#stop()} is called before 'start'. As         * otherwise we would leak locators.         */        DEPLOYED, STARTED,     }         public FederationManager(final ActiveMQServer server) {        this.server = server;     }       @Override     public synchronized void start() throws ActiveMQException {        if (state == State.STARTED) return;        deploy();        for (Federation federation : federations.values()) {           federation.start();        }        state = State.STARTED;     }       @Override     public synchronized void stop() {        if (state == State.STOPPED) return;        state = State.STOPPING;            for (Federation federation : federations.values()) {           federation.stop();        }        federations.clear();        state = State.STOPPED;     }       @Override     public boolean isStarted() {        return state == State.STARTED;     }       public synchronized void deploy() throws ActiveMQException {        for (FederationConfiguration federationConfiguration : server.getConfiguration().getFederationConfigurations()) {           deploy(federationConfiguration);        }        if (state != State.STARTED) {           state = State.DEPLOYED;        }     }       public synchronized boolean undeploy(String name) {        Federation federation = federations.remove(name);        if (federation != null) {           federation.stop();        }        return true;     }           public synchronized boolean deploy(FederationConfiguration federationConfiguration) throws 
ActiveMQException {        Federation federation = federations.get(federationConfiguration.getName());        if (federation == null) {           federation = newFederation(federationConfiguration);        } else if (!Objects.equals(federation.getConfig().getCredentials(), federationConfiguration.getCredentials())) {           undeploy(federationConfiguration.getName());           federation = newFederation(federationConfiguration);        }        federation.deploy();        return true;     }       private synchronized Federation newFederation(FederationConfiguration federationConfiguration) throws ActiveMQException {        Federation federation = new Federation(server, federationConfiguration);        federations.put(federationConfiguration.getName(), federation);        if (state == State.STARTED) {           federation.start();        }        return federation;     }       public Federation get(String name) {        return federations.get(name);     }       public void register(FederatedAbstract federatedAbstract) {        server.registerBrokerPlugin(federatedAbstract);     }       public void unregister(FederatedAbstract federatedAbstract) {        server.unRegisterBrokerPlugin(federatedAbstract);     }    }
  • FederationManager实现了ActiveMQComponent接口,它提供了start()、stop()、deploy()、undeploy()等方法;其中start方法会先执行deploy方法,然后遍历federations.values()执行federation.start();stop方法则是遍历federations.values()执行federation.stop(),然后清空federations;deploy(FederationConfiguration)方法在federation为null时执行newFederation,若不为null且credentials与配置不一致则先执行undeploy再重新newFederation,最后统一执行federation.deploy();undeploy方法则是将federation从federations中移除,若其不为null则执行federation.stop()

Federation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/Federation.java

public class Federation {         private final ActiveMQServer server;     private final SimpleString name;       private final Map<String, FederationUpstream> upstreams = new HashMap<>();     private final Map<String, FederationDownstream> downstreams = new HashMap<>();     private final FederationConfiguration config;     private FederationManager.State state;       //......       public synchronized void deploy() throws ActiveMQException {        for (FederationUpstreamConfiguration upstreamConfiguration : config.getUpstreamConfigurations()) {           deploy(upstreamConfiguration, config.getFederationPolicyMap());        }        for (FederationDownstreamConfiguration downstreamConfiguration : config.getDownstreamConfigurations()) {           deploy(downstreamConfiguration, config.getFederationPolicyMap());        }        if (state != FederationManager.State.STARTED) {           state = FederationManager.State.DEPLOYED;        }     }       public synchronized boolean deploy(FederationUpstreamConfiguration upstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {        String name = upstreamConfiguration.getName();        FederationUpstream upstream = upstreams.get(name);          //If connection has changed we will need to do a full undeploy and redeploy.        
if (upstream == null) {           undeploy(name);           upstream = deploy(name, upstreamConfiguration);        } else if (!upstream.getConnection().getConfig().equals(upstreamConfiguration.getConnectionConfiguration())) {           undeploy(name);           upstream = deploy(name, upstreamConfiguration);        }          upstream.deploy(upstreamConfiguration.getPolicyRefs(), federationPolicyMap);        return true;     }       public synchronized boolean deploy(FederationDownstreamConfiguration downstreamConfiguration, Map<String, FederationPolicy> federationPolicyMap) throws ActiveMQException {        String name = downstreamConfiguration.getName();        FederationDownstream downstream = downstreams.get(name);          //If connection has changed we will need to do a full undeploy and redeploy.        if (downstream == null) {           undeploy(name);           downstream = deploy(name, downstreamConfiguration);        } else if (!downstream.getConnection().getConfig().equals(downstreamConfiguration.getConnectionConfiguration())) {           undeploy(name);           downstream = deploy(name, downstreamConfiguration);        }          downstream.deploy(config);        return true;     }       private synchronized FederationUpstream deploy(String name, FederationUpstreamConfiguration upstreamConfiguration) {        FederationUpstream upstream = new FederationUpstream(server, this, name, upstreamConfiguration);        upstreams.put(name, upstream);        if (state == FederationManager.State.STARTED) {           upstream.start();        }        return upstream;     }       private synchronized FederationDownstream deploy(String name, FederationDownstreamConfiguration downstreamConfiguration) {        //If we have a matching upstream connection already configured then use it for the initiating downstream connection        FederationConnection connection = null;        if (downstreamConfiguration.getConnectionConfiguration().isShareConnection()) {           
for (FederationUpstream upstream : upstreams.values()) {              if (upstream.getConfig().getConnectionConfiguration()                  .equals(downstreamConfiguration.getConnectionConfiguration())) {                 connection = upstream.getConnection();                 connection.setSharedConnection(true);                 break;              }           }        }          FederationDownstream downstream = new FederationDownstream(server, this, name, downstreamConfiguration, connection);        downstreams.put(name, downstream);        if (state == FederationManager.State.STARTED) {           downstream.start();        }        return downstream;     }       //......    }  
  • Federation的deploy方法先遍历config.getUpstreamConfigurations(),进行upstream的deploy,再遍历config.getDownstreamConfigurations(),进行downstream的deploy

小结

FederationManager实现了ActiveMQComponent接口,它提供了start()、stop()、deploy()、undeploy()等方法;其中start方法会先执行deploy方法,然后遍历federations.values()执行federation.start();stop方法则是遍历federations.values()执行federation.stop(),然后清空federations;deploy(FederationConfiguration)方法在federation为null时执行newFederation,若不为null且credentials与配置不一致则先执行undeploy再重新newFederation,最后统一执行federation.deploy();undeploy方法则是将federation从federations中移除,若其不为null则执行federation.stop()

doc

  • FederationManager