聊聊artemis的DiscoveryGroup
- 2020 年 2 月 24 日
- 筆記
序
本文主要研究一下artemis的DiscoveryGroup
DiscoveryGroup
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
public final class DiscoveryGroup implements ActiveMQComponent { private static final Logger logger = Logger.getLogger(DiscoveryGroup.class); private final List<DiscoveryListener> listeners = new ArrayList<>(); private final String name; private Thread thread; private boolean received; private final Object waitLock = new Object(); private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<>(); private final long timeout; private volatile boolean started; private final String nodeID; private final Map<String, String> uniqueIDMap = new HashMap<>(); private final BroadcastEndpoint endpoint; private final NotificationService notificationService; /** * This is the main constructor, intended to be used * * @param nodeID * @param name * @param timeout * @param endpointFactory * @param service * @throws Exception */ public DiscoveryGroup(final String nodeID, final String name, final long timeout, BroadcastEndpointFactory endpointFactory, NotificationService service) throws Exception { this.nodeID = nodeID; this.name = name; this.timeout = timeout; this.endpoint = endpointFactory.createBroadcastEndpoint(); this.notificationService = service; } @Override public synchronized void start() throws Exception { if (started) { return; } endpoint.openClient(); started = true; thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name); thread.setDaemon(true); thread.start(); if (notificationService != null) { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, props); notificationService.sendNotification(notification); } } /** * This will start the DiscoveryRunnable and run it directly. * This is useful for a test process where we need this execution blocking a thread. */ public void internalRunning() throws Exception { endpoint.openClient(); started = true; DiscoveryRunnable runnable = new DiscoveryRunnable(); runnable.run(); } @Override public void stop() { synchronized (this) { if (!started) { return; } started = false; } synchronized (waitLock) { waitLock.notifyAll(); } try { endpoint.close(false); } catch (Exception e1) { ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1); } try { if (thread != null) { thread.interrupt(); thread.join(10000); if (thread.isAlive()) { ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery(); } } } catch (InterruptedException e) { throw new ActiveMQInterruptedException(e); } thread = null; if (notificationService != null) { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props); try { notificationService.sendNotification(notification); } catch (Exception e) { ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e); } } } @Override public boolean isStarted() { return started; } public String getName() { return name; } public synchronized List<DiscoveryEntry> getDiscoveryEntries() { return new ArrayList<>(connectors.values()); } //...... }
- DiscoveryGroup的构造器会使用endpointFactory.createBroadcastEndpoint()创建endpoint;start方法会执行endpoint.openClient(),创建并执行DiscoveryRunnable
DiscoveryRunnable
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
class DiscoveryRunnable implements Runnable { @Override public void run() { byte[] data = null; while (started) { try { try { data = endpoint.receiveBroadcast(); if (data == null) { if (started) { ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived(); } break; } } catch (Exception e) { if (!started) { return; } else { ActiveMQClientLogger.LOGGER.errorReceivingPacketInDiscovery(e); } } ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(data); String originatingNodeID = buffer.readString(); String uniqueID = buffer.readString(); checkUniqueID(originatingNodeID, uniqueID); if (nodeID.equals(originatingNodeID)) { if (checkExpiration()) { callListeners(); } // Ignore traffic from own node continue; } int size = buffer.readInt(); boolean changed = false; DiscoveryEntry[] entriesRead = new DiscoveryEntry[size]; // Will first decode all the elements outside of any lock for (int i = 0; i < size; i++) { TransportConfiguration connector = new TransportConfiguration(); connector.decode(buffer); entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis()); } synchronized (DiscoveryGroup.this) { for (DiscoveryEntry entry : entriesRead) { if (connectors.put(originatingNodeID, entry) == null) { changed = true; } } changed = changed || checkExpiration(); } //only call the listeners if we have changed //also make sure that we aren't stopping to avoid deadlock if (changed && started) { if (logger.isTraceEnabled()) { logger.trace("Connectors changed on Discovery:"); for (DiscoveryEntry connector : connectors.values()) { logger.trace(connector); } } callListeners(); } synchronized (waitLock) { received = true; waitLock.notifyAll(); } } catch (Throwable e) { ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e); } } } }
- DiscoveryRunnable实现了Runnable接口,其run方法通过endpoint.receiveBroadcast()接收数据,之后解析为DiscoveryEntry更新到connectors中;在changed为true时会执行callListeners,执行DiscoveryListener.connectorsChanged回调
JGroupsBroadcastEndpoint
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class); private final String channelName; private boolean clientOpened; private boolean broadcastOpened; private JChannelWrapper channel; private JGroupsReceiver receiver; private JChannelManager manager; public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) { this.manager = manager; this.channelName = channelName; } @Override public void broadcast(final byte[] data) throws Exception { if (logger.isTraceEnabled()) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (broadcastOpened) { org.jgroups.Message msg = new org.jgroups.Message(); msg.setBuffer(data); channel.send(msg); } } @Override public byte[] receiveBroadcast() throws Exception { if (logger.isTraceEnabled()) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (clientOpened) { return receiver.receiveBroadcast(); } else { return null; } } @Override public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception { if (logger.isTraceEnabled()) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen()); if (clientOpened) { return receiver.receiveBroadcast(time, unit); } else { return null; } } @Override public synchronized void openClient() throws Exception { if (clientOpened) { return; } internalOpen(); receiver = new JGroupsReceiver(); channel.addReceiver(receiver); clientOpened = true; } @Override public synchronized void openBroadcaster() throws Exception { if (broadcastOpened) return; internalOpen(); broadcastOpened = true; } public abstract JChannel createChannel() throws Exception; public JGroupsBroadcastEndpoint initChannel() throws Exception { this.channel = manager.getJChannel(channelName, this); return this; } protected void internalOpen() throws Exception { channel.connect(); } @Override public synchronized void close(boolean isBroadcast) throws Exception { if (isBroadcast) { broadcastOpened = false; } else { channel.removeReceiver(receiver); clientOpened = false; } internalCloseChannel(channel); } /** * Closes the channel used in this JGroups Broadcast. * Can be overridden by implementations that use an externally managed channel. * * @param channel */ protected synchronized void internalCloseChannel(JChannelWrapper channel) { channel.close(true); } }
- JGroupsBroadcastEndpoint是个抽象类,它声明实现了BroadcastEndpoint接口;其broadcast方法创建org.jgroups.Message然后使用JChannelWrapper发送消息;其receiveBroadcast方法使用JGroupsReceiver来receiveBroadcast;其openClient则创建JGroupsReceiver;internalOpen方法则是执行channel.connect()
BroadcastGroupImpl
activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java
public class BroadcastGroupImpl implements BroadcastGroup, Runnable { //...... public synchronized void broadcastConnectors() throws Exception { ActiveMQBuffer buff = ActiveMQBuffers.dynamicBuffer(4096); buff.writeString(nodeManager.getNodeId().toString()); buff.writeString(uniqueID); buff.writeInt(connectors.size()); for (TransportConfiguration tcConfig : connectors) { tcConfig.encode(buff); } // Only send as many bytes as we need. byte[] data = new byte[buff.readableBytes()]; buff.getBytes(buff.readerIndex(), data); endpoint.broadcast(data); } public void run() { if (!started) { return; } try { broadcastConnectors(); loggedBroadcastException = false; } catch (Exception e) { // only log the exception at ERROR level once, even if it fails multiple times in a row - HORNETQ-919 if (!loggedBroadcastException) { ActiveMQServerLogger.LOGGER.errorBroadcastingConnectorConfigs(e); loggedBroadcastException = true; } else { logger.debug("Failed to broadcast connector configs...again", e); } } } //...... }
- BroadcastGroupImpl实现了BroadcastGroup及Runnable方法,其run方法执行broadcastConnectors;broadcastConnectors方法则遍历connectors将TransportConfiguration写入到buff中之后通过endpoint.broadcast(data)广播出去
小结
DiscoveryGroup的构造器会使用endpointFactory.createBroadcastEndpoint()创建endpoint;start方法会执行endpoint.openClient(),创建并执行DiscoveryRunnable;DiscoveryRunnable实现了Runnable接口,其run方法通过endpoint.receiveBroadcast()接收数据,之后解析为DiscoveryEntry更新到connectors中;在changed为true时会执行callListeners,执行DiscoveryListener.connectorsChanged回调;JGroupsBroadcastEndpoint是个抽象类,它声明实现了BroadcastEndpoint接口;其broadcast方法创建org.jgroups.Message然后使用JChannelWrapper发送消息;其receiveBroadcast方法使用JGroupsReceiver来receiveBroadcast;其openClient则创建JGroupsReceiver;internalOpen方法则是执行channel.connect()
doc
- DiscoveryGroup