Nacos源碼分析-事件發布機制

溫馨提示:
本文內容基於個人學習Nacos 2.0.1版本程式碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請註明出處。

Nacos的服務註冊、服務變更等功能都是通過事件發布來通知的,搞清楚事件發布訂閱的機制,有利於理解業務的流程走向。本文將淺顯的分析Nacos中的事件發布訂閱實現。

事件(Event)

常規事件(Event)

package com.alibaba.nacos.common.notify;

public abstract class Event implements Serializable {
    
    private static final AtomicLong SEQUENCE = new AtomicLong(0);
    
    private final long sequence = SEQUENCE.getAndIncrement();
    
    /**
     * Event sequence number, which can be used to handle the sequence of events.
     *
     * @return sequence num, It's best to make sure it's monotone.
     */
    public long sequence() {
        return sequence;
    }
}

在事件抽象類中定義了一個事件的序列號,它是自增的。用於區分事件執行的前後順序。它是由DefaultPublisher來處理。

慢事件(SlowEvent)

之所以稱之為慢事件,可能因為所有的事件都共享同一個隊列吧。

package com.alibaba.nacos.common.notify;

/**
 * This event share one event-queue.
 * @author <a href="mailto:[email protected]">liaochuntao</a>
 * @author zongtanghu
 */
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SlowEvent extends Event {
    
    @Override
    public long sequence() {
        return 0;
    }
}

提示:
SlowEvent可以共享一個事件隊列,也就是一個發布者可以同時管理多個事件的發布(區別於DefaultPublisher只能管理一個事件)。

訂閱者(Subscriber)

單事件訂閱者

這裡的單事件訂閱者指的是當前的訂閱者只能訂閱一種類型的事件。

package com.alibaba.nacos.common.notify.listener;

/**
 * An abstract subscriber class for subscriber interface.
 * @author <a href="mailto:[email protected]">liaochuntao</a>
 * @author zongtanghu
 */
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Subscriber<T extends Event> {
    
    /**
     * Event callback.
     * 事件處理入口,由對應的事件發布器調用
     * @param event {@link Event}
     */
    public abstract void onEvent(T event);
    
    /**
     * Type of this subscriber's subscription.
     * 訂閱的事件類型
     * @return Class which extends {@link Event}
     */
    public abstract Class<? extends Event> subscribeType();
    
    /**
     * It is up to the listener to determine whether the callback is asynchronous or synchronous.
     * 執行緒執行器,由具體的實現類來決定是非同步還是同步調用
     * @return {@link Executor}
     */
    public Executor executor() {
        return null;
    }
    
    /**
     * Whether to ignore expired events.
     * 是否忽略過期事件
     * @return default value is {@link Boolean#FALSE}
     */
    public boolean ignoreExpireEvent() {
        return false;
    }
}

這是默認的訂閱者對象,默認情況下一個訂閱者只能訂閱一個類型的事件。

多事件訂閱者

package com.alibaba.nacos.common.notify.listener;


/**
 * Subscribers to multiple events can be listened to.
 *
 * @author <a href="mailto:[email protected]">liaochuntao</a>
 * @author zongtanghu
 */
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SmartSubscriber extends Subscriber {
    
    /**
     * Returns which event type are smartsubscriber interested in.
     * 區別於父類,這裡支援多個事件類型
     * @return The interestd event types.
     */
    public abstract List<Class<? extends Event>> subscribeTypes();
    
    @Override
    public final Class<? extends Event> subscribeType() {
		// 採用final修飾,禁止使用單一事件屬性
        return null;
    }
    
    @Override
    public final boolean ignoreExpireEvent() {
        return false;
    }
}

提示
SmartSubscriber和Subscriber的區別是一個可以訂閱多個事件,一個只能訂閱一個事件,處理它們的發布者也不同。

發布者(Publisher)

發布者指的是Nacos中的事件發布者,頂級介面為EventPublisher。

package com.alibaba.nacos.common.notify;

/**
 * Event publisher.
 *
 * @author <a href="mailto:[email protected]">liaochuntao</a>
 * @author zongtanghu
 */
public interface EventPublisher extends Closeable {
    
    /**
     * Initializes the event publisher.
     * 初始化事件發布者
     * @param type       {@link Event >}
     * @param bufferSize Message staging queue size
     */
    void init(Class<? extends Event> type, int bufferSize);
    
    /**
     * The number of currently staged events.
     * 當前暫存的事件數量
     * @return event size
     */
    long currentEventSize();
    
    /**
     * Add listener.
     * 添加訂閱者
     * @param subscriber {@link Subscriber}
     */
    void addSubscriber(Subscriber subscriber);
    
    /**
     * Remove listener.
     * 移除訂閱者
     * @param subscriber {@link Subscriber}
     */
    void removeSubscriber(Subscriber subscriber);
    
    /**
     * publish event.
     * 發布事件
     * @param event {@link Event}
     * @return publish event is success
     */
    boolean publish(Event event);
    
    /**
     * Notify listener.
     * 通知訂閱者
     * @param subscriber {@link Subscriber}
     * @param event      {@link Event}
     */
    void notifySubscriber(Subscriber subscriber, Event event);
    
}

發布者的主要功能就是新增訂閱者、通知訂閱者,目前有兩種類型的發布者分別是DefaultPublisher和DefaultSharePublisher。

單事件發布者(DefaultPublisher)

一個發布者實例只能處理一種類型的事件。

public class DefaultPublisher extends Thread implements EventPublisher {
	
	// 發布者是否初始化完畢
	private volatile boolean initialized = false;
	// 是否關閉了發布者
	private volatile boolean shutdown = false;
	// 事件的類型
	private Class<? extends Event> eventType;
	// 訂閱者列表
	protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();
	// 隊列最大容量
	private int queueMaxSize = -1;
	// 隊列類型
	private BlockingQueue<Event> queue;
	// 最後一個事件的序列號
	protected volatile Long lastEventSequence = -1L;
	// 事件序列號更新對象,用於更新原子屬性lastEventSequence
	private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
}

發布者的初始化

public void init(Class<? extends Event> type, int bufferSize) {
	setDaemon(true);
	setName("nacos.publisher-" + type.getName());
	this.eventType = type;
	this.queueMaxSize = bufferSize;
	this.queue = new ArrayBlockingQueue<Event>(bufferSize);
	start();
}

在初始化方法中,將其設置為了守護執行緒,意味著它將持續運行(它需要持續監控內部的事件隊列),傳入的type屬性為當前發布者需要處理的事件類型,設置當前執行緒的名稱以事件類型為區分,它將會以多個執行緒的形式存在,每個執行緒代表一種事件類型的發布者,後面初始化了隊列的長度。最後調用啟動方法完成當前執行緒的啟動。

發布者執行緒啟動

public synchronized void start() {
	if (!initialized) {
		// start just called once
		super.start();
		if (queueMaxSize == -1) {
			queueMaxSize = ringBufferSize;
		}
		initialized = true;
	}
}

直接調用了Thread的start方法開啟守護執行緒,並設置初始化狀態為true。根據java執行緒的啟動方式,調用start方法之後start方法是會調用run方法的。

public void run() {
	openEventHandler();
}

void openEventHandler() {
	try {
		
		// This variable is defined to resolve the problem which message overstock in the queue.
		int waitTimes = 60;
		// To ensure that messages are not lost, enable EventHandler when
		// waiting for the first Subscriber to register
		for (; ; ) {
			// 執行緒終止條件判斷
			if (shutdown || hasSubscriber() || waitTimes <= 0) {
				break;
			}
			// 執行緒休眠1秒
			ThreadUtils.sleep(1000L);
			// 等待次數減1
			waitTimes--;
		}
		
		for (; ; ) {
			// 執行緒終止條件判斷
			if (shutdown) {
				break;
			}
			// 從隊列取出事件
			final Event event = queue.take();
			// 接收事件
			receiveEvent(event);
			// 更新事件序列號
			UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
		}
	} catch (Throwable ex) {
		LOGGER.error("Event listener exception : {}", ex);
	}
}

在run方法中調用了openEventHandler()方法。那發布者的實際工作原理就存在於這個方法內部。在首次啟動的時候會等待1分鐘,然後再進行消息消費。

接收並發布事件

這裡的接收事件指的是接收通知中心發過來的事件,發布給訂閱者。

void receiveEvent(Event event) {
	// 獲取當前事件的序列號,它是自增的
	final long currentEventSequence = event.sequence();
	
	// 通知所有訂閱了該事件的訂閱者
	// Notification single event listener
	for (Subscriber subscriber : subscribers) {
		// 判斷訂閱者是否忽略事件過期,判斷當前事件是否被處理過(lastEventSequence初始化的值為-1,而Event的sequence初始化的值為0)
		// Whether to ignore expiration events
		if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
			LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
			continue;
		}
		
		// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
		// Remove original judge part of codes.
		notifySubscriber(subscriber, event);
	}
}

public void notifySubscriber(final Subscriber subscriber, final Event event) {
	
	LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
	
	// 為每個訂閱者創建一個Runnable對象
	final Runnable job = () -> subscriber.onEvent(event);
	// 使用訂閱者的執行緒執行器
	final Executor executor = subscriber.executor();
	// 若訂閱者沒有自己的執行器,則直接執行run方法啟動訂閱者消費執行緒
	if (executor != null) {
		executor.execute(job);
	} else {
		try {
			job.run();
		} catch (Throwable e) {
			LOGGER.error("Event callback exception: ", e);
		}
	}
}

外部調用發布事件

前面的發布事件是指從隊列內部獲取事件並通知訂閱者,這裡的發布事件區別在於它是開放給外部調用者,接收統一通知中心的事件並放入隊列中的。

public boolean publish(Event event) {
	checkIsStart();
	boolean success = this.queue.offer(event);
	if (!success) {
		LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
		receiveEvent(event);
		return true;
	}
	return true;
}

在放入隊列成功的時候直接返回,若放入隊列失敗,則是直接同步發送事件給訂閱者,不經過隊列。這裡的同步我認為的是從調用者到發布者調用訂閱者之間是同步的,若隊列可用,則是調用者到入隊列就完成了本次調用,不需要等待循環通知訂閱者。使用隊列解耦無疑會提升通知中心的工作效率。

總體來說就是一個發布者內部維護一個BlockingQueue,在實現上使用了ArrayBlockingQueue,它是一個有界阻塞隊列,元素先進先出。並且使用非公平模式提升性能,意味著等待消費的訂閱者執行順序將得不到保障(業務需求沒有這種順序性要求)。同時也維護了一個訂閱者集合(他們都訂閱了同一個事件類型),在死循環中不斷從ArrayBlockingQueue中獲取數據來循環通知每一個訂閱者,也就是調用訂閱者的onEvent()方法。

多事件發布者(DefaultSharePublisher)

用於發布SlowEvent事件並通知所有訂閱了該事件的訂閱者。

public class DefaultSharePublisher extends DefaultPublisher {
	// 用於保存事件類型為SlowEvent的訂閱者,一個事件類型對應多個訂閱者
	private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<Class<? extends SlowEvent>, Set<Subscriber>>();
    // 可重入鎖
    private final Lock lock = new ReentrantLock();
}

它繼承了DefaultPublisher,意味著它將擁有其所有的特性。從subMappings屬性來看,這個發布器是支援多個SlowEvent事件的。DefaultSharePublisher重載了DefaultPublisher的addSubscriber()和removeSubscriber()方法,用於處理多事件類型的情形。

添加訂閱者:

public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
	
	// 將事件類型轉換為當前發布者支援的類型
	// Actually, do a classification based on the slowEvent type.
	Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
	// 添加到父類的訂閱者列表中,為何要添加呢?因為它需要使用父類的隊列消費邏輯
	// For adding to parent class attributes synchronization.
	subscribers.add(subscriber);
	// 為多個操作加鎖
	lock.lock();
	try {
		// 首先從事件訂閱列表裡面獲取當前事件對應的訂閱者集合
		Set<Subscriber> sets = subMappings.get(subSlowEventType);
		// 若沒有訂閱者,則新增當前訂閱者
		if (sets == null) {
			Set<Subscriber> newSet = new ConcurrentHashSet<Subscriber>();
			newSet.add(subscriber);
			subMappings.put(subSlowEventType, newSet);
			return;
		}
		// 若當前事件訂閱者列表不為空,則插入,因為使用的是Set集合因此可以避免重複數據
		sets.add(subscriber);
	} finally {
		// 別忘了解鎖
		lock.unlock();
	}
}

提示:
Set newSet = new ConcurrentHashSet(); 它這裡實際上使用的是自己實現的ConcurrentHashSet,它內部使用了ConcurrentHashMap來實現存儲。
在ConcurrentHashSet.add()方法的實現上,它以當前插入的Subscriber對象為key,以一個Boolean值佔位:map.putIfAbsent(o, Boolean.TRUE)。

事件類型和訂閱者的存儲狀態為:
EventType1 -> {Subscriber1, Subscriber2, Subscriber3…}
EventType2 -> {Subscriber1, Subscriber2, Subscriber3…}
EventType3 -> {Subscriber1, Subscriber2, Subscriber3…}
感興趣的可以自己查閱一下源碼。

移除訂閱者

public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
	// 轉換類型
	// Actually, do a classification based on the slowEvent type.
	Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
	// 先移除父類中的訂閱者
	// For removing to parent class attributes synchronization.
	subscribers.remove(subscriber);
	// 加鎖
	lock.lock();
	try {
		// 移除指定事件的指定訂閱者
		Set<Subscriber> sets = subMappings.get(subSlowEventType);
		
		if (sets != null) {
			sets.remove(subscriber);
		}
	} finally {
		// 解鎖
		lock.unlock();
	}
}

接收事件

@Override
public void receiveEvent(Event event) {
	// 獲取當前事件的序列號
	final long currentEventSequence = event.sequence();
	// 獲取事件的類型,轉換為當前發布器支援的事件
	// get subscriber set based on the slow EventType.
	final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();
	
	// 獲取當前事件的訂閱者列表
	// Get for Map, the algorithm is O(1).
	Set<Subscriber> subscribers = subMappings.get(slowEventType);
	if (null == subscribers) {
		LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", slowEventType.getName());
		return;
	}
	
	// 循環通知所有訂閱者
	// Notification single event subscriber
	for (Subscriber subscriber : subscribers) {
		// Whether to ignore expiration events
		if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
			LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
			continue;
		}
		// 通知邏輯和父類是共用的
		// Notify single subscriber for slow event.
		notifySubscriber(subscriber, event);
	}
}

提示:
DefaultPublisher是一個發布器只負責發布一個事件,並通知訂閱了這個事件的所有訂閱者;DefaultSharePublisher則是一個發布器可以發布多個事件,並通知訂閱了這個事件的所有訂閱者。

通知中心(NotifyCenter)

NotifyCenter 在Nacos中主要用於註冊發布者、調用發布者發布事件、為發布者註冊訂閱者、為指定的事件增加指定的訂閱者等操作。可以說它完全接管了訂閱者、發布者和事件他們的組合過程。直接調用通知中心的相關方法即可實現事件發布訂閱者註冊等功能。

初始化資訊

package com.alibaba.nacos.common.notify;

public class NotifyCenter {
	
    /**
     * 單事件發布者內部的事件隊列初始容量
     */
    public static int ringBufferSize = 16384;

    /**
     * 多事件發布者內部的事件隊列初始容量
     */
    public static int shareBufferSize = 1024;

    /**
     * 發布者的狀態
     */
    private static final AtomicBoolean CLOSED = new AtomicBoolean(false);

    /**
     * 構造發布者的工廠
     */
    private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null;

    /**
     * 通知中心的實例
     */
    private static final NotifyCenter INSTANCE = new NotifyCenter();

    /**
     * 默認的多事件發布者
     */
    private DefaultSharePublisher sharePublisher;

    /**
     * 默認的單事件發布者類型
     * 此處並未直接指定單事件發布者是誰,只是限定了它的類別
     * 因為單事件發布者一個發布者只負責一個事件,因此會存在
     * 多個發布者實例,後面按需創建,並快取在publisherMap
     */
    private static Class<? extends EventPublisher> clazz = null;

    /**
     * Publisher management container.
     * 單事件發布者存儲容器
     */
    private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<String, EventPublisher>(16);
	
	// 省略部分程式碼
}

可以看到它初始化了一個通知中心的實例,這裡是單例模式。定義了發布者。訂閱者是保存在發布者的內部,而發布者又保存在通知者的內部。這樣就組成了一套完整的事件發布機制。

靜態程式碼塊

static {

	// 初始化DefaultPublisher的queue容量值
	// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
	// this value needs to be increased appropriately. default value is 16384
	String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
	ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
	
	// 初始化DefaultSharePublisher的queue容量值
	// The size of the public publisher's message staging queue buffer
	String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
	shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
	
	// 使用Nacos SPI機制獲取事件發布者
	final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
	
	// 獲取迭代器
	Iterator<EventPublisher> iterator = publishers.iterator();
	
	if (iterator.hasNext()) {
		clazz = iterator.next().getClass();
	} else {
		// 若為空,則使用默認的發布器(單事件發布者)
		clazz = DefaultPublisher.class;
	}
	
	// 聲明發布者工廠為一個函數,用於創建發布者實例
	publisherFactory = new BiFunction<Class<? extends Event>, Integer, EventPublisher>() {
		
		/**
		 * 為指定類型的事件創建一個單事件發布者對象
		 * @param cls       事件類型
		 * @param buffer    發布者內部隊列初始容量
		 * @return
		 */
		@Override
		public EventPublisher apply(Class<? extends Event> cls, Integer buffer) {
			try {
				// 實例化發布者
				EventPublisher publisher = clazz.newInstance();
				// 初始化
				publisher.init(cls, buffer);
				return publisher;
			} catch (Throwable ex) {
				LOGGER.error("Service class newInstance has error : {}", ex);
				throw new NacosRuntimeException(SERVER_ERROR, ex);
			}
		}
	};
	
	try {
		// 初始化多事件發布者
		// Create and init DefaultSharePublisher instance.
		INSTANCE.sharePublisher = new DefaultSharePublisher();
		INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
		
	} catch (Throwable ex) {
		LOGGER.error("Service class newInstance has error : {}", ex);
	}

	// 增加關閉鉤子,用於關閉Publisher
	ThreadUtils.addShutdownHook(new Runnable() {
		@Override
		public void run() {
			shutdown();
		}
	});

    }

在靜態程式碼塊中主要就做了兩件事:

初始化單事件發布者:可以由用戶擴展指定(通過Nacos SPI機制),也可以是Nacos默認的(DefaultPublisher)。

初始化多事件發布者:DefaultSharePublisher。

註冊訂閱者

註冊訂閱者實際上就是將Subscriber添加到Publisher中。因為事件的發布是靠發布者來通知它內部的所有訂閱者。

/**
 * Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will
 * preempt a placeholder Publisher first.
 *
 * @param consumer subscriber
 * @param <T>      event type
 */
public static <T> void registerSubscriber(final Subscriber consumer) {
	
	// 若想監聽多個事件,實現SmartSubscriber.subscribeTypes()方法,在裡面返回多個事件的列表即可
	// If you want to listen to multiple events, you do it separately,
	// based on subclass's subscribeTypes method return list, it can register to publisher.
	
	// 多事件訂閱者註冊
	if (consumer instanceof SmartSubscriber) {
		// 獲取事件列表
		for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
			// 判斷它的事件類型來決定採用哪種Publisher,多事件訂閱者由多事件發布者調度
			// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
			if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
				//註冊到多事件發布者中
				INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
			} else {
				// 註冊到單事件發布者中
				// For case, producer: defaultPublisher -> consumer: subscriber.
				addSubscriber(consumer, subscribeType);
			}
		}
		return;
	}
	
	// 單事件的訂閱者註冊
	final Class<? extends Event> subscribeType = consumer.subscribeType();
	// 防止誤使用,萬一有人在使用單事件訂閱者Subscriber的時候傳入了SlowEvent則可以在此避免
	if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
		INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
		// 添加完畢返回
		return;
	}

	// 註冊到單事件發布者中
	addSubscriber(consumer, subscribeType);
}

/**
 * 單事件發布者添加訂閱者
 * Add a subscriber to publisher.
 * @param consumer      subscriber instance.
 * @param subscribeType subscribeType.
 */
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
	// 獲取類的規範名稱,實際上就是包名加類名,作為topic
	final String topic = ClassUtils.getCanonicalName(subscribeType);
	synchronized (NotifyCenter.class) {
		// MapUtils.computeIfAbsent is a unsafe method.
		
		/**
		 * 生成指定類型的發布者,並將其放入publisherMap中
		 * 使用topic為key從publisherMap獲取數據,若為空則使用publisherFactory函數並傳遞subscribeType和ringBufferSize來實例
		 * 化一個clazz類型的發布者對象,使用topic為key放入publisherMap中,實際上就是為每一個類型的事件創建一個發布者。具體
		 * 可查看publisherFactory的邏輯。
		 */
		MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
	}
	// 獲取生成的發布者對象,將訂閱者添加進去
	EventPublisher publisher = INSTANCE.publisherMap.get(topic);
	publisher.addSubscriber(consumer);
}

提示:
單事件發布者容器內的存儲狀態為: 事件類型的完整限定名 -> DefaultPublisher.
例如:
com.alibaba.nacos.core.cluster.MembersChangeEvent -> {DefaultPublisher@6839} “Thread[nacos.publisher-com.alibaba.nacos.core.cluster.MembersChangeEvent,5,main]”

註冊發布者

實際上並沒有直接的註冊發布者這個概念,通過前面的章節你肯定知道發布者就兩種類型:單事件發布者、多事件發布者。單事件發布者直接就一個實例,多事件發布者會根據事件類型創建不同的實例,存儲於publisherMap中。它已經在通知中心了,因此並不需要有刻意的註冊動作。需要使用的時候
直接取即可。

註冊事件

註冊事件實際上就是將具體的事件和具體的發布者進行關聯,發布者有2種類型,那麼事件也一定是兩種類型了(事件的類型這裡說的是分類,服務於單事件發布者的事件和服務於多事件發布者的事件)。

/**
 * Register publisher.
 *
 * @param eventType    class Instances type of the event type.
 * @param queueMaxSize the publisher's queue max size.
 */
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {

	// 慢事件由多事件發布者處理
	if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
		return INSTANCE.sharePublisher;
	}
	// 若不是慢事件,因為它可以存在多個不同的類型,因此需要判斷對應的發布者是否存在
	final String topic = ClassUtils.getCanonicalName(eventType);
	synchronized (NotifyCenter.class) {
		// 當前傳入的事件類型對應的發布者,有則忽略無則新建
		MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
	}
	return INSTANCE.publisherMap.get(topic);
}

這裡並未有註冊動作,若是SlowEvent則直接返回了,為何呢?這裡再理一下關係,事件的實際用途是由訂閱者來決定的,由訂閱者來執行對應事件觸發後的操作,事件和發布者並沒有直接關係。而多事件發布者呢,它是一個發布者來處理所有的事件和訂閱者(事件:訂閱者,一對多的關係),這個事件都沒人訂閱何談發布呢?因此單純的註冊事件並沒有實際意義。反觀一次只能處理一個事件的單事件處理器(DefaultPublisher)則需要一個事件對應一個發布者,即便這個事件沒有人訂閱,也可以快取起來。

註銷訂閱者

註銷的操作基本上就是註冊的反向操作。

public static <T> void deregisterSubscriber(final Subscriber consumer) {
	// 若是多事件訂閱者
	if (consumer instanceof SmartSubscriber) {
		// 獲取事件列表
		for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
			// 若是慢事件
			if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
				// 從多事件發布者中移除
				INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
			} else {
				// 從單事件發布者中移除
				removeSubscriber(consumer, subscribeType);
			}
		}
		return;
	}
	
	// 若是單事件訂閱者
	final Class<? extends Event> subscribeType = consumer.subscribeType();
	// 判斷是否是慢事件
	if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
		INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
		return;
	}
	
	// 調用移除方法
	if (removeSubscriber(consumer, subscribeType)) {
		return;
	}
	throw new NoSuchElementException("The subscriber has no event publisher");
}

private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
	// 獲取topic
	final String topic = ClassUtils.getCanonicalName(subscribeType);
	// 根據topic獲取對應的發布者
	EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);
	if (eventPublisher != null) {
		// 從發布者中移除訂閱者
		eventPublisher.removeSubscriber(consumer);
		return true;
	}
	return false;
}

註銷發布者

註銷發布者主要針對於單事件發布者來說的,因為多事件發布者只有一個實例,它需要處理多個事件類型,因此發布者不能移除。而單事件發布者一個發布者對應一個事件類型,因此某個類型的事件不需要處理的時候則需要將對應的發布者移除。

public static void deregisterPublisher(final Class<? extends Event> eventType) {
	// 獲取topic
	final String topic = ClassUtils.getCanonicalName(eventType);
	// 根據topic移除對應的發布者
	EventPublisher publisher = INSTANCE.publisherMap.remove(topic);
	try {
		// 調用關閉方法
		publisher.shutdown();
	} catch (Throwable ex) {
		LOGGER.error("There was an exception when publisher shutdown : {}", ex);
	}
}

public void shutdown() {
	// 標記關閉
	this.shutdown = true;
	// 清空快取
	this.queue.clear();
}

發布事件

發布事件的本質就是不同類型的發布者來調用內部維護的訂閱者的onEvent()方法。

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
	
	// 慢事件處理
	if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
		return INSTANCE.sharePublisher.publish(event);
	}
	
	// 常規事件處理
	final String topic = ClassUtils.getCanonicalName(eventType);
	
	EventPublisher publisher = INSTANCE.publisherMap.get(topic);
	if (publisher != null) {
		return publisher.publish(event);
	}
	LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
	return false;
}

總結

在Nacos中的事件發布分為兩條線:單一事件處理、多事件處理。圍繞這兩條線又有負責單一類型事件的訂閱者、發布者,也有負責多事件的訂閱者、發布者。區分開來兩種類型便很容易理解。

上圖展示了在通知中心中不同類型的事件、訂閱者、發布者的存儲狀態。

多事件發布者:

  • 發布者和事件的關係是一對多
  • 事件和訂閱者的關係是一對多
  • 發布者和訂閱者的關係是一對多
  • 事件類型為SlowEvent, 訂閱者類型是SmartSubscriber

單事件發布者

  • 發布者和事件的關係是一對一
  • 事件和訂閱者的關係是一對多
  • 發布者和訂閱者的關係是一對多
  • 事件類型為Event,訂閱者類型是Subscriber