A Look at Artemis's DiscoveryGroup

  • February 24, 2020
  • Notes

This article takes a look at Artemis's DiscoveryGroup.

DiscoveryGroup

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java

public final class DiscoveryGroup implements ActiveMQComponent {

   private static final Logger logger = Logger.getLogger(DiscoveryGroup.class);

   private final List<DiscoveryListener> listeners = new ArrayList<>();

   private final String name;

   private Thread thread;

   private boolean received;

   private final Object waitLock = new Object();

   private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<>();

   private final long timeout;

   private volatile boolean started;

   private final String nodeID;

   private final Map<String, String> uniqueIDMap = new HashMap<>();

   private final BroadcastEndpoint endpoint;

   private final NotificationService notificationService;

   /**
    * This is the main constructor, intended to be used
    *
    * @param nodeID
    * @param name
    * @param timeout
    * @param endpointFactory
    * @param service
    * @throws Exception
    */
   public DiscoveryGroup(final String nodeID,
                         final String name,
                         final long timeout,
                         BroadcastEndpointFactory endpointFactory,
                         NotificationService service) throws Exception {
      this.nodeID = nodeID;
      this.name = name;
      this.timeout = timeout;
      this.endpoint = endpointFactory.createBroadcastEndpoint();
      this.notificationService = service;
   }

   @Override
   public synchronized void start() throws Exception {
      if (started) {
         return;
      }

      endpoint.openClient();

      started = true;

      thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name);

      thread.setDaemon(true);

      thread.start();

      if (notificationService != null) {
         TypedProperties props = new TypedProperties();

         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));

         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, props);

         notificationService.sendNotification(notification);
      }
   }

   /**
    * This will start the DiscoveryRunnable and run it directly.
    * This is useful for a test process where we need this execution blocking a thread.
    */
   public void internalRunning() throws Exception {
      endpoint.openClient();
      started = true;
      DiscoveryRunnable runnable = new DiscoveryRunnable();
      runnable.run();
   }

   @Override
   public void stop() {
      synchronized (this) {
         if (!started) {
            return;
         }

         started = false;
      }

      synchronized (waitLock) {
         waitLock.notifyAll();
      }

      try {
         endpoint.close(false);
      } catch (Exception e1) {
         ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);
      }

      try {
         if (thread != null) {
            thread.interrupt();
            thread.join(10000);
            if (thread.isAlive()) {
               ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();
            }
         }
      } catch (InterruptedException e) {
         throw new ActiveMQInterruptedException(e);
      }

      thread = null;

      if (notificationService != null) {
         TypedProperties props = new TypedProperties();
         props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));
         Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props);
         try {
            notificationService.sendNotification(notification);
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e);
         }
      }
   }

   @Override
   public boolean isStarted() {
      return started;
   }

   public String getName() {
      return name;
   }

   public synchronized List<DiscoveryEntry> getDiscoveryEntries() {
      return new ArrayList<>(connectors.values());
   }

   //......
}
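  • The DiscoveryGroup constructor creates its endpoint via endpointFactory.createBroadcastEndpoint(). The start method calls endpoint.openClient(), then creates a DiscoveryRunnable and runs it on a daemon thread; if a notificationService is present, it also sends a DISCOVERY_GROUP_STARTED notification.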
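
The start/stop pattern described above (an idempotent start guarded by a flag, a daemon receiver thread, and a stop that interrupts and joins with a 10 second limit) can be sketched with the JDK alone. This is only an illustration under assumptions: ReceiverLifecycle and its methods are hypothetical names, and a BlockingQueue stands in for the BroadcastEndpoint; it is not Artemis code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the DiscoveryGroup lifecycle; not part of Artemis.
class ReceiverLifecycle {
    // Plays the role of the endpoint that delivers broadcast packets.
    private final BlockingQueue<byte[]> source = new LinkedBlockingQueue<>();
    private final AtomicInteger received = new AtomicInteger();
    private volatile boolean started;
    private Thread thread;

    synchronized void start() {
        if (started) {
            return; // calling start() twice is a no-op
        }
        started = true;
        thread = new Thread(this::loop, "receiver-lifecycle-thread");
        thread.setDaemon(true); // the receiver must not keep the JVM alive
        thread.start();
    }

    void stop() {
        Thread t;
        synchronized (this) {
            if (!started) {
                return;
            }
            started = false;
            t = thread;
            thread = null;
        }
        t.interrupt(); // wakes the thread if it is blocked in take()
        try {
            t.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Test helper: hands a packet to the receiver thread.
    void deliver(byte[] packet) {
        source.add(packet);
    }

    private void loop() {
        while (started) {
            try {
                source.take();
                received.incrementAndGet();
            } catch (InterruptedException e) {
                return; // interrupted by stop()
            }
        }
    }

    boolean isStarted() {
        return started;
    }

    int receivedCount() {
        return received.get();
    }
}
```

Clearing the flag before interrupting matters: the loop re-checks the flag after every packet, so the thread exits whether it was blocked or busy when stop() was called.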

DiscoveryRunnable

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java

   class DiscoveryRunnable implements Runnable {

      @Override
      public void run() {
         byte[] data = null;

         while (started) {
            try {
               try {

                  data = endpoint.receiveBroadcast();
                  if (data == null) {
                     if (started) {
                        ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived();
                     }
                     break;
                  }
               } catch (Exception e) {
                  if (!started) {
                     return;
                  } else {
                     ActiveMQClientLogger.LOGGER.errorReceivingPacketInDiscovery(e);
                  }
               }

               ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(data);

               String originatingNodeID = buffer.readString();

               String uniqueID = buffer.readString();

               checkUniqueID(originatingNodeID, uniqueID);

               if (nodeID.equals(originatingNodeID)) {
                  if (checkExpiration()) {
                     callListeners();
                  }
                  // Ignore traffic from own node
                  continue;
               }

               int size = buffer.readInt();

               boolean changed = false;

               DiscoveryEntry[] entriesRead = new DiscoveryEntry[size];
               // Will first decode all the elements outside of any lock
               for (int i = 0; i < size; i++) {
                  TransportConfiguration connector = new TransportConfiguration();

                  connector.decode(buffer);

                  entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());
               }

               synchronized (DiscoveryGroup.this) {
                  for (DiscoveryEntry entry : entriesRead) {
                     if (connectors.put(originatingNodeID, entry) == null) {
                        changed = true;
                     }
                  }

                  changed = changed || checkExpiration();
               }
               //only call the listeners if we have changed
               //also make sure that we aren't stopping to avoid deadlock
               if (changed && started) {
                  if (logger.isTraceEnabled()) {
                     logger.trace("Connectors changed on Discovery:");
                     for (DiscoveryEntry connector : connectors.values()) {
                        logger.trace(connector);
                     }
                  }
                  callListeners();
               }

               synchronized (waitLock) {
                  received = true;

                  waitLock.notifyAll();
               }
            } catch (Throwable e) {
               ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);
            }
         }
      }

   }
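  • DiscoveryRunnable implements the Runnable interface. Its run method receives data via endpoint.receiveBroadcast(), skips packets that originate from its own nodeID, parses the rest into DiscoveryEntry objects, and stores them in connectors keyed by originatingNodeID. When changed is true (a previously unknown node appeared or an entry expired), it calls callListeners, which invokes the DiscoveryListener.connectorsChanged callback.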
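
The change detection in the loop above hinges on two things: connectors.put(...) returning null for a node that was not known before, and checkExpiration() reporting whether anything was dropped. A minimal sketch of that bookkeeping follows. ConnectorTable and Entry are hypothetical names, and since checkExpiration is not shown in the excerpt, the rule used here (drop an entry once lastUpdate + timeout <= now) is an assumption.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "changed" bookkeeping; not an Artemis class.
class ConnectorTable {

    // Simplified stand-in for DiscoveryEntry: a connector string plus a timestamp.
    static final class Entry {
        final String connector;
        final long lastUpdate;

        Entry(String connector, long lastUpdate) {
            this.connector = connector;
            this.lastUpdate = lastUpdate;
        }
    }

    private final Map<String, Entry> connectors = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    ConnectorTable(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Returns true only when nodeId was not in the table before,
    // mirroring the "connectors.put(...) == null" check in the excerpt.
    synchronized boolean update(String nodeId, String connector, long now) {
        return connectors.put(nodeId, new Entry(connector, now)) == null;
    }

    // Assumed expiration rule: remove entries not refreshed within the timeout.
    // Returns true if at least one entry was removed.
    synchronized boolean expire(long now) {
        boolean changed = false;
        Iterator<Entry> it = connectors.values().iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdate + timeoutMillis <= now) {
                it.remove();
                changed = true;
            }
        }
        return changed;
    }

    synchronized int size() {
        return connectors.size();
    }
}
```

Note that a refresh from an already known node replaces its entry but does not count as a change, so listeners would only fire when a node appears or expires.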

JGroupsBroadcastEndpoint

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java

public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {

   private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);

   private final String channelName;

   private boolean clientOpened;

   private boolean broadcastOpened;

   private JChannelWrapper channel;

   private JGroupsReceiver receiver;

   private JChannelManager manager;

   public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) {
      this.manager = manager;
      this.channelName = channelName;
   }

   @Override
   public void broadcast(final byte[] data) throws Exception {
      if (logger.isTraceEnabled())
         logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
      if (broadcastOpened) {
         org.jgroups.Message msg = new org.jgroups.Message();

         msg.setBuffer(data);

         channel.send(msg);
      }
   }

   @Override
   public byte[] receiveBroadcast() throws Exception {
      if (logger.isTraceEnabled())
         logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
      if (clientOpened) {
         return receiver.receiveBroadcast();
      } else {
         return null;
      }
   }

   @Override
   public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
      if (logger.isTraceEnabled())
         logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
      if (clientOpened) {
         return receiver.receiveBroadcast(time, unit);
      } else {
         return null;
      }
   }

   @Override
   public synchronized void openClient() throws Exception {
      if (clientOpened) {
         return;
      }
      internalOpen();
      receiver = new JGroupsReceiver();
      channel.addReceiver(receiver);
      clientOpened = true;
   }

   @Override
   public synchronized void openBroadcaster() throws Exception {
      if (broadcastOpened)
         return;
      internalOpen();
      broadcastOpened = true;
   }

   public abstract JChannel createChannel() throws Exception;

   public JGroupsBroadcastEndpoint initChannel() throws Exception {
      this.channel = manager.getJChannel(channelName, this);
      return this;
   }

   protected void internalOpen() throws Exception {
      channel.connect();
   }

   @Override
   public synchronized void close(boolean isBroadcast) throws Exception {
      if (isBroadcast) {
         broadcastOpened = false;
      } else {
         channel.removeReceiver(receiver);
         clientOpened = false;
      }
      internalCloseChannel(channel);
   }

   /**
    * Closes the channel used in this JGroups Broadcast.
    * Can be overridden by implementations that use an externally managed channel.
    *
    * @param channel
    */
   protected synchronized void internalCloseChannel(JChannelWrapper channel) {
      channel.close(true);
   }

}
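  • JGroupsBroadcastEndpoint is an abstract class that implements the BroadcastEndpoint interface. Its broadcast method creates an org.jgroups.Message and sends it through the JChannelWrapper, but only if openBroadcaster has been called. Its receiveBroadcast methods delegate to JGroupsReceiver and return null when the client side is not open. openClient connects the channel, creates the JGroupsReceiver, and registers it on the channel; internalOpen calls channel.connect().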
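
The two open flags are the part of this class that is easiest to overlook: broadcast silently does nothing until openBroadcaster has been called, and receiveBroadcast returns null until openClient has been called. The sketch below reproduces only that gating with an in-memory queue. InMemoryEndpoint is a hypothetical class that does not implement the real BroadcastEndpoint interface and involves no JGroups channel; it also marks the flags volatile, which the original does not.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory endpoint showing the open-flag gating; not Artemis code.
class InMemoryEndpoint {
    // Stands in for the JGroups channel; one instance acts as sender and receiver.
    private final BlockingQueue<byte[]> channel = new LinkedBlockingQueue<>();
    private volatile boolean clientOpened;
    private volatile boolean broadcastOpened;

    synchronized void openClient() {
        clientOpened = true;
    }

    synchronized void openBroadcaster() {
        broadcastOpened = true;
    }

    // Drops the data unless the broadcaster side has been opened.
    void broadcast(byte[] data) {
        if (broadcastOpened) {
            channel.offer(data.clone());
        }
    }

    // Returns null if the client side is closed or nothing arrives in time.
    byte[] receiveBroadcast(long time, TimeUnit unit) {
        if (!clientOpened) {
            return null;
        }
        try {
            return channel.poll(time, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    synchronized void close(boolean isBroadcast) {
        if (isBroadcast) {
            broadcastOpened = false;
        } else {
            clientOpened = false;
        }
    }
}
```

A null from receiveBroadcast is therefore ambiguous from the caller's side, which is consistent with DiscoveryRunnable treating null as a reason to leave its loop.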

BroadcastGroupImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java

public class BroadcastGroupImpl implements BroadcastGroup, Runnable {

   //......

   public synchronized void broadcastConnectors() throws Exception {
      ActiveMQBuffer buff = ActiveMQBuffers.dynamicBuffer(4096);

      buff.writeString(nodeManager.getNodeId().toString());

      buff.writeString(uniqueID);

      buff.writeInt(connectors.size());

      for (TransportConfiguration tcConfig : connectors) {
         tcConfig.encode(buff);
      }

      // Only send as many bytes as we need.
      byte[] data = new byte[buff.readableBytes()];
      buff.getBytes(buff.readerIndex(), data);

      endpoint.broadcast(data);
   }

   public void run() {
      if (!started) {
         return;
      }

      try {
         broadcastConnectors();
         loggedBroadcastException = false;
      } catch (Exception e) {
         // only log the exception at ERROR level once, even if it fails multiple times in a row - HORNETQ-919
         if (!loggedBroadcastException) {
            ActiveMQServerLogger.LOGGER.errorBroadcastingConnectorConfigs(e);
            loggedBroadcastException = true;
         } else {
            logger.debug("Failed to broadcast connector configs...again", e);
         }
      }
   }

   //......
}
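  • BroadcastGroupImpl implements the BroadcastGroup and Runnable interfaces. Its run method calls broadcastConnectors, which writes the node ID, the uniqueID, and the connector count into buff, then iterates over connectors to encode each TransportConfiguration, and finally sends the bytes via endpoint.broadcast(data). This is the sending side of the packet that DiscoveryRunnable decodes.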
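
The packet layout written by broadcastConnectors and read back by DiscoveryRunnable is: node ID, unique ID, connector count, then each connector in order. A round-trip sketch of that layout is shown below. It is a simplification under stated assumptions: DiscoveryPacket is a hypothetical class, connectors are plain strings rather than TransportConfiguration objects, and DataOutputStream.writeUTF is used in place of ActiveMQBuffer.writeString, so the bytes are not compatible with real Artemis packets.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical packet with the same field order as the excerpt; not wire-compatible with Artemis.
class DiscoveryPacket {
    final String nodeId;
    final String uniqueId;
    final List<String> connectors;

    DiscoveryPacket(String nodeId, String uniqueId, List<String> connectors) {
        this.nodeId = nodeId;
        this.uniqueId = uniqueId;
        this.connectors = new ArrayList<>(connectors);
    }

    // Sender side: node ID, unique ID, count, then each connector.
    byte[] encode() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(nodeId);
            out.writeUTF(uniqueId);
            out.writeInt(connectors.size());
            for (String connector : connectors) {
                out.writeUTF(connector);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Receiver side: reads the fields back in exactly the order they were written.
    static DiscoveryPacket decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String nodeId = in.readUTF();
            String uniqueId = in.readUTF();
            int size = in.readInt();
            List<String> connectors = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                connectors.add(in.readUTF());
            }
            return new DiscoveryPacket(nodeId, uniqueId, connectors);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Putting the node ID first is what lets the receiver discard its own broadcasts before decoding any connector.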

Summary

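The DiscoveryGroup constructor creates its endpoint via endpointFactory.createBroadcastEndpoint(), and its start method calls endpoint.openClient() before creating and running a DiscoveryRunnable. DiscoveryRunnable implements Runnable: its run method receives data via endpoint.receiveBroadcast(), parses it into DiscoveryEntry objects, and updates connectors; when changed is true it calls callListeners, which invokes the DiscoveryListener.connectorsChanged callback. JGroupsBroadcastEndpoint is an abstract class that implements the BroadcastEndpoint interface: broadcast creates an org.jgroups.Message and sends it through the JChannelWrapper, receiveBroadcast delegates to JGroupsReceiver, openClient creates the JGroupsReceiver, and internalOpen calls channel.connect(). On the sending side, BroadcastGroupImpl encodes its connectors and broadcasts them via endpoint.broadcast(data).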

doc

  • DiscoveryGroup