Nacos源碼分析-事件發布機制
溫馨提示:
本文內容基於個人學習Nacos 2.0.1版本程式碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請註明出處。
Nacos的服務註冊、服務變更等功能都是通過事件發布來通知的,搞清楚事件發布訂閱的機制,有利於理解業務的流程走向。本文將淺顯的分析Nacos中的事件發布訂閱實現。
事件(Event)
常規事件(Event)
package com.alibaba.nacos.common.notify;
public abstract class Event implements Serializable {
private static final AtomicLong SEQUENCE = new AtomicLong(0);
private final long sequence = SEQUENCE.getAndIncrement();
/**
* Event sequence number, which can be used to handle the sequence of events.
*
* @return sequence num, It's best to make sure it's monotone.
*/
public long sequence() {
return sequence;
}
}
在事件抽象類中定義了一個事件的序列號,它是自增的。用於區分事件執行的前後順序。它是由DefaultPublisher來處理。
慢事件(SlowEvent)
之所以稱之為慢事件,可能因為所有的事件都共享同一個隊列吧。
package com.alibaba.nacos.common.notify;
/**
* This event share one event-queue.
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SlowEvent extends Event {
@Override
public long sequence() {
return 0;
}
}
提示:
SlowEvent可以共享一個事件隊列,也就是一個發布者可以同時管理多個事件的發布(區別於DefaultPublisher只能管理一個事件)。
訂閱者(Subscriber)
單事件訂閱者
這裡的單事件訂閱者指的是當前的訂閱者只能訂閱一種類型的事件。
package com.alibaba.nacos.common.notify.listener;
/**
* An abstract subscriber class for subscriber interface.
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Subscriber<T extends Event> {
/**
* Event callback.
* 事件處理入口,由對應的事件發布器調用
* @param event {@link Event}
*/
public abstract void onEvent(T event);
/**
* Type of this subscriber's subscription.
* 訂閱的事件類型
* @return Class which extends {@link Event}
*/
public abstract Class<? extends Event> subscribeType();
/**
* It is up to the listener to determine whether the callback is asynchronous or synchronous.
* 執行緒執行器,由具體的實現類來決定是非同步還是同步調用
* @return {@link Executor}
*/
public Executor executor() {
return null;
}
/**
* Whether to ignore expired events.
* 是否忽略過期事件
* @return default value is {@link Boolean#FALSE}
*/
public boolean ignoreExpireEvent() {
return false;
}
}
這是默認的訂閱者對象,默認情況下一個訂閱者只能訂閱一個類型的事件。
多事件訂閱者
package com.alibaba.nacos.common.notify.listener;
/**
* Subscribers to multiple events can be listened to.
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SmartSubscriber extends Subscriber {
/**
* Returns which event type are smartsubscriber interested in.
* 區別於父類,這裡支援多個事件類型
* @return The interestd event types.
*/
public abstract List<Class<? extends Event>> subscribeTypes();
@Override
public final Class<? extends Event> subscribeType() {
// 採用final修飾,禁止使用單一事件屬性
return null;
}
@Override
public final boolean ignoreExpireEvent() {
return false;
}
}
提示
SmartSubscriber和Subscriber的區別是一個可以訂閱多個事件,一個只能訂閱一個事件,處理它們的發布者也不同。
發布者(Publisher)
發布者指的是Nacos中的事件發布者,頂級介面為EventPublisher。
package com.alibaba.nacos.common.notify;
/**
* Event publisher.
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
public interface EventPublisher extends Closeable {
/**
* Initializes the event publisher.
* 初始化事件發布者
* @param type {@link Event >}
* @param bufferSize Message staging queue size
*/
void init(Class<? extends Event> type, int bufferSize);
/**
* The number of currently staged events.
* 當前暫存的事件數量
* @return event size
*/
long currentEventSize();
/**
* Add listener.
* 添加訂閱者
* @param subscriber {@link Subscriber}
*/
void addSubscriber(Subscriber subscriber);
/**
* Remove listener.
* 移除訂閱者
* @param subscriber {@link Subscriber}
*/
void removeSubscriber(Subscriber subscriber);
/**
* publish event.
* 發布事件
* @param event {@link Event}
* @return publish event is success
*/
boolean publish(Event event);
/**
* Notify listener.
* 通知訂閱者
* @param subscriber {@link Subscriber}
* @param event {@link Event}
*/
void notifySubscriber(Subscriber subscriber, Event event);
}
發布者的主要功能就是新增訂閱者、通知訂閱者,目前有兩種類型的發布者分別是DefaultPublisher和DefaultSharePublisher。
單事件發布者(DefaultPublisher)
一個發布者實例只能處理一種類型的事件。
public class DefaultPublisher extends Thread implements EventPublisher {
// 發布者是否初始化完畢
private volatile boolean initialized = false;
// 是否關閉了發布者
private volatile boolean shutdown = false;
// 事件的類型
private Class<? extends Event> eventType;
// 訂閱者列表
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();
// 隊列最大容量
private int queueMaxSize = -1;
// 隊列類型
private BlockingQueue<Event> queue;
// 最後一個事件的序列號
protected volatile Long lastEventSequence = -1L;
// 事件序列號更新對象,用於更新原子屬性lastEventSequence
private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");
}
發布者的初始化:
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<Event>(bufferSize);
start();
}
在初始化方法中,將其設置為了守護執行緒,意味著它將持續運行(它需要持續監控內部的事件隊列),傳入的type屬性為當前發布者需要處理的事件類型,設置當前執行緒的名稱以事件類型為區分,它將會以多個執行緒的形式存在,每個執行緒代表一種事件類型的發布者,後面初始化了隊列的長度。最後調用啟動方法完成當前執行緒的啟動。
發布者執行緒啟動
public synchronized void start() {
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}
直接調用了Thread的start方法開啟守護執行緒,並設置初始化狀態為true。根據java執行緒的啟動方式,調用start方法之後start方法是會調用run方法的。
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
for (; ; ) {
// 執行緒終止條件判斷
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
// 執行緒休眠1秒
ThreadUtils.sleep(1000L);
// 等待次數減1
waitTimes--;
}
for (; ; ) {
// 執行緒終止條件判斷
if (shutdown) {
break;
}
// 從隊列取出事件
final Event event = queue.take();
// 接收事件
receiveEvent(event);
// 更新事件序列號
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : {}", ex);
}
}
在run方法中調用了openEventHandler()方法。那發布者的實際工作原理就存在於這個方法內部。在首次啟動的時候會等待1分鐘,然後再進行消息消費。
接收並發布事件
這裡的接收事件指的是接收通知中心發過來的事件,發布給訂閱者。
void receiveEvent(Event event) {
// 獲取當前事件的序列號,它是自增的
final long currentEventSequence = event.sequence();
// 通知所有訂閱了該事件的訂閱者
// Notification single event listener
for (Subscriber subscriber : subscribers) {
// 判斷訂閱者是否忽略事件過期,判斷當前事件是否被處理過(lastEventSequence初始化的值為-1,而Event的sequence初始化的值為0)
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
// 為每個訂閱者創建一個Runnable對象
final Runnable job = () -> subscriber.onEvent(event);
// 使用訂閱者的執行緒執行器
final Executor executor = subscriber.executor();
// 若訂閱者沒有自己的執行器,則直接執行run方法啟動訂閱者消費執行緒
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
外部調用發布事件
前面的發布事件是指從隊列內部獲取事件並通知訂閱者,這裡的發布事件區別在於它是開放給外部調用者,接收統一通知中心的事件並放入隊列中的。
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}
在放入隊列成功的時候直接返回,若放入隊列失敗,則是直接同步發送事件給訂閱者,不經過隊列。這裡的同步我認為的是從調用者到發布者調用訂閱者之間是同步的,若隊列可用,則是調用者到入隊列就完成了本次調用,不需要等待循環通知訂閱者。使用隊列解耦無疑會提升通知中心的工作效率。
總體來說就是一個發布者內部維護一個BlockingQueue,在實現上使用了ArrayBlockingQueue,它是一個有界阻塞隊列,元素先進先出。並且使用非公平模式提升性能,意味著等待消費的訂閱者執行順序將得不到保障(業務需求沒有這種順序性要求)。同時也維護了一個訂閱者集合(他們都訂閱了同一個事件類型),在死循環中不斷從ArrayBlockingQueue中獲取數據來循環通知每一個訂閱者,也就是調用訂閱者的onEvent()方法。
多事件發布者(DefaultSharePublisher)
用於發布SlowEvent事件並通知所有訂閱了該事件的訂閱者。
public class DefaultSharePublisher extends DefaultPublisher {
// 用於保存事件類型為SlowEvent的訂閱者,一個事件類型對應多個訂閱者
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<Class<? extends SlowEvent>, Set<Subscriber>>();
// 可重入鎖
private final Lock lock = new ReentrantLock();
}
它繼承了DefaultPublisher,意味著它將擁有其所有的特性。從subMappings屬性來看,這個發布器是支援多個SlowEvent事件的。DefaultSharePublisher重載了DefaultPublisher的addSubscriber()和removeSubscriber()方法,用於處理多事件類型的情形。
添加訂閱者:
public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// 將事件類型轉換為當前發布者支援的類型
// Actually, do a classification based on the slowEvent type.
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// 添加到父類的訂閱者列表中,為何要添加呢?因為它需要使用父類的隊列消費邏輯
// For adding to parent class attributes synchronization.
subscribers.add(subscriber);
// 為多個操作加鎖
lock.lock();
try {
// 首先從事件訂閱列表裡面獲取當前事件對應的訂閱者集合
Set<Subscriber> sets = subMappings.get(subSlowEventType);
// 若沒有訂閱者,則新增當前訂閱者
if (sets == null) {
Set<Subscriber> newSet = new ConcurrentHashSet<Subscriber>();
newSet.add(subscriber);
subMappings.put(subSlowEventType, newSet);
return;
}
// 若當前事件訂閱者列表不為空,則插入,因為使用的是Set集合因此可以避免重複數據
sets.add(subscriber);
} finally {
// 別忘了解鎖
lock.unlock();
}
}
提示:
SetnewSet = new ConcurrentHashSet (); 它這裡實際上使用的是自己實現的ConcurrentHashSet,它內部使用了ConcurrentHashMap來實現存儲。
在ConcurrentHashSet.add()方法的實現上,它以當前插入的Subscriber對象為key,以一個Boolean值佔位:map.putIfAbsent(o, Boolean.TRUE)。
事件類型和訂閱者的存儲狀態為:
EventType1 -> {Subscriber1, Subscriber2, Subscriber3…}
EventType2 -> {Subscriber1, Subscriber2, Subscriber3…}
EventType3 -> {Subscriber1, Subscriber2, Subscriber3…}
感興趣的可以自己查閱一下源碼。
移除訂閱者:
public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// 轉換類型
// Actually, do a classification based on the slowEvent type.
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// 先移除父類中的訂閱者
// For removing to parent class attributes synchronization.
subscribers.remove(subscriber);
// 加鎖
lock.lock();
try {
// 移除指定事件的指定訂閱者
Set<Subscriber> sets = subMappings.get(subSlowEventType);
if (sets != null) {
sets.remove(subscriber);
}
} finally {
// 解鎖
lock.unlock();
}
}
接收事件:
@Override
public void receiveEvent(Event event) {
// 獲取當前事件的序列號
final long currentEventSequence = event.sequence();
// 獲取事件的類型,轉換為當前發布器支援的事件
// get subscriber set based on the slow EventType.
final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();
// 獲取當前事件的訂閱者列表
// Get for Map, the algorithm is O(1).
Set<Subscriber> subscribers = subMappings.get(slowEventType);
if (null == subscribers) {
LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", slowEventType.getName());
return;
}
// 循環通知所有訂閱者
// Notification single event subscriber
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());
continue;
}
// 通知邏輯和父類是共用的
// Notify single subscriber for slow event.
notifySubscriber(subscriber, event);
}
}
提示:
DefaultPublisher是一個發布器只負責發布一個事件,並通知訂閱了這個事件的所有訂閱者;DefaultSharePublisher則是一個發布器可以發布多個事件,並通知訂閱了這個事件的所有訂閱者。
通知中心(NotifyCenter)
NotifyCenter 在Nacos中主要用於註冊發布者、調用發布者發布事件、為發布者註冊訂閱者、為指定的事件增加指定的訂閱者等操作。可以說它完全接管了訂閱者、發布者和事件他們的組合過程。直接調用通知中心的相關方法即可實現事件發布訂閱者註冊等功能。
初始化資訊
package com.alibaba.nacos.common.notify;
public class NotifyCenter {
/**
* 單事件發布者內部的事件隊列初始容量
*/
public static int ringBufferSize = 16384;
/**
* 多事件發布者內部的事件隊列初始容量
*/
public static int shareBufferSize = 1024;
/**
* 發布者的狀態
*/
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/**
* 構造發布者的工廠
*/
private static BiFunction<Class<? extends Event>, Integer, EventPublisher> publisherFactory = null;
/**
* 通知中心的實例
*/
private static final NotifyCenter INSTANCE = new NotifyCenter();
/**
* 默認的多事件發布者
*/
private DefaultSharePublisher sharePublisher;
/**
* 默認的單事件發布者類型
* 此處並未直接指定單事件發布者是誰,只是限定了它的類別
* 因為單事件發布者一個發布者只負責一個事件,因此會存在
* 多個發布者實例,後面按需創建,並快取在publisherMap
*/
private static Class<? extends EventPublisher> clazz = null;
/**
* Publisher management container.
* 單事件發布者存儲容器
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<String, EventPublisher>(16);
// 省略部分程式碼
}
可以看到它初始化了一個通知中心的實例,這裡是單例模式。定義了發布者。訂閱者是保存在發布者的內部,而發布者又保存在通知者的內部。這樣就組成了一套完整的事件發布機制。
靜態程式碼塊:
static {
// 初始化DefaultPublisher的queue容量值
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
// 初始化DefaultSharePublisher的queue容量值
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
// 使用Nacos SPI機制獲取事件發布者
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
// 獲取迭代器
Iterator<EventPublisher> iterator = publishers.iterator();
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
// 若為空,則使用默認的發布器(單事件發布者)
clazz = DefaultPublisher.class;
}
// 聲明發布者工廠為一個函數,用於創建發布者實例
publisherFactory = new BiFunction<Class<? extends Event>, Integer, EventPublisher>() {
/**
* 為指定類型的事件創建一個單事件發布者對象
* @param cls 事件類型
* @param buffer 發布者內部隊列初始容量
* @return
*/
@Override
public EventPublisher apply(Class<? extends Event> cls, Integer buffer) {
try {
// 實例化發布者
EventPublisher publisher = clazz.newInstance();
// 初始化
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : {}", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
}
};
try {
// 初始化多事件發布者
// Create and init DefaultSharePublisher instance.
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : {}", ex);
}
// 增加關閉鉤子,用於關閉Publisher
ThreadUtils.addShutdownHook(new Runnable() {
@Override
public void run() {
shutdown();
}
});
}
在靜態程式碼塊中主要就做了兩件事:
初始化單事件發布者:可以由用戶擴展指定(通過Nacos SPI機制),也可以是Nacos默認的(DefaultPublisher)。
初始化多事件發布者:DefaultSharePublisher。
註冊訂閱者
註冊訂閱者實際上就是將Subscriber添加到Publisher中。因為事件的發布是靠發布者來通知它內部的所有訂閱者。
/**
* Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will
* preempt a placeholder Publisher first.
*
* @param consumer subscriber
* @param <T> event type
*/
public static <T> void registerSubscriber(final Subscriber consumer) {
// 若想監聽多個事件,實現SmartSubscriber.subscribeTypes()方法,在裡面返回多個事件的列表即可
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher.
// 多事件訂閱者註冊
if (consumer instanceof SmartSubscriber) {
// 獲取事件列表
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// 判斷它的事件類型來決定採用哪種Publisher,多事件訂閱者由多事件發布者調度
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
//註冊到多事件發布者中
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// 註冊到單事件發布者中
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType);
}
}
return;
}
// 單事件的訂閱者註冊
final Class<? extends Event> subscribeType = consumer.subscribeType();
// 防止誤使用,萬一有人在使用單事件訂閱者Subscriber的時候傳入了SlowEvent則可以在此避免
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
// 添加完畢返回
return;
}
// 註冊到單事件發布者中
addSubscriber(consumer, subscribeType);
}
/**
* 單事件發布者添加訂閱者
* Add a subscriber to publisher.
* @param consumer subscriber instance.
* @param subscribeType subscribeType.
*/
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
// 獲取類的規範名稱,實際上就是包名加類名,作為topic
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
/**
* 生成指定類型的發布者,並將其放入publisherMap中
* 使用topic為key從publisherMap獲取數據,若為空則使用publisherFactory函數並傳遞subscribeType和ringBufferSize來實例
* 化一個clazz類型的發布者對象,使用topic為key放入publisherMap中,實際上就是為每一個類型的事件創建一個發布者。具體
* 可查看publisherFactory的邏輯。
*/
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, subscribeType, ringBufferSize);
}
// 獲取生成的發布者對象,將訂閱者添加進去
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
publisher.addSubscriber(consumer);
}
提示:
單事件發布者容器內的存儲狀態為: 事件類型的完整限定名 -> DefaultPublisher.
例如:
com.alibaba.nacos.core.cluster.MembersChangeEvent -> {DefaultPublisher@6839} “Thread[nacos.publisher-com.alibaba.nacos.core.cluster.MembersChangeEvent,5,main]”
註冊發布者
實際上並沒有直接的註冊發布者這個概念,通過前面的章節你肯定知道發布者就兩種類型:單事件發布者、多事件發布者。單事件發布者直接就一個實例,多事件發布者會根據事件類型創建不同的實例,存儲於publisherMap
中。它已經在通知中心了,因此並不需要有刻意的註冊
動作。需要使用的時候
直接取即可。
註冊事件
註冊事件實際上就是將具體的事件和具體的發布者進行關聯,發布者有2種類型,那麼事件也一定是兩種類型了(事件的類型這裡說的是分類,服務於單事件發布者的事件和服務於多事件發布者的事件)。
/**
* Register publisher.
*
* @param eventType class Instances type of the event type.
* @param queueMaxSize the publisher's queue max size.
*/
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
// 慢事件由多事件發布者處理
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}
// 若不是慢事件,因為它可以存在多個不同的類型,因此需要判斷對應的發布者是否存在
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// 當前傳入的事件類型對應的發布者,有則忽略無則新建
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, publisherFactory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}
這裡並未有註冊動作,若是SlowEvent則直接返回了,為何呢?這裡再理一下關係,事件的實際用途是由訂閱者來決定的,由訂閱者來執行對應事件觸發後的操作,事件和發布者並沒有直接關係。而多事件發布者呢,它是一個發布者來處理所有的事件和訂閱者(事件:訂閱者,一對多的關係),這個事件都沒人訂閱何談發布呢?因此單純的註冊事件並沒有實際意義。反觀一次只能處理一個事件的單事件處理器(DefaultPublisher)則需要一個事件對應一個發布者,即便這個事件沒有人訂閱,也可以快取起來。
註銷訂閱者
註銷的操作基本上就是註冊的反向操作。
public static <T> void deregisterSubscriber(final Subscriber consumer) {
// 若是多事件訂閱者
if (consumer instanceof SmartSubscriber) {
// 獲取事件列表
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// 若是慢事件
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
// 從多事件發布者中移除
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
} else {
// 從單事件發布者中移除
removeSubscriber(consumer, subscribeType);
}
}
return;
}
// 若是單事件訂閱者
final Class<? extends Event> subscribeType = consumer.subscribeType();
// 判斷是否是慢事件
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
return;
}
// 調用移除方法
if (removeSubscriber(consumer, subscribeType)) {
return;
}
throw new NoSuchElementException("The subscriber has no event publisher");
}
private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {
// 獲取topic
final String topic = ClassUtils.getCanonicalName(subscribeType);
// 根據topic獲取對應的發布者
EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);
if (eventPublisher != null) {
// 從發布者中移除訂閱者
eventPublisher.removeSubscriber(consumer);
return true;
}
return false;
}
註銷發布者
註銷發布者主要針對於單事件發布者來說的,因為多事件發布者只有一個實例,它需要處理多個事件類型,因此發布者不能移除。而單事件發布者一個發布者對應一個事件類型,因此某個類型的事件不需要處理的時候則需要將對應的發布者移除。
public static void deregisterPublisher(final Class<? extends Event> eventType) {
// 獲取topic
final String topic = ClassUtils.getCanonicalName(eventType);
// 根據topic移除對應的發布者
EventPublisher publisher = INSTANCE.publisherMap.remove(topic);
try {
// 調用關閉方法
publisher.shutdown();
} catch (Throwable ex) {
LOGGER.error("There was an exception when publisher shutdown : {}", ex);
}
}
public void shutdown() {
// 標記關閉
this.shutdown = true;
// 清空快取
this.queue.clear();
}
發布事件
發布事件的本質就是不同類型的發布者來調用內部維護的訂閱者的onEvent()
方法。
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
// 慢事件處理
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
// 常規事件處理
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
總結
在Nacos中的事件發布分為兩條線:單一事件處理、多事件處理。圍繞這兩條線又有負責單一類型事件的訂閱者、發布者,也有負責多事件的訂閱者、發布者。區分開來兩種類型便很容易理解。
上圖展示了在通知中心中不同類型的事件、訂閱者、發布者的存儲狀態。
多事件發布者:
- 發布者和事件的關係是一對多
- 事件和訂閱者的關係是一對多
- 發布者和訂閱者的關係是一對多
- 事件類型為SlowEvent, 訂閱者類型是SmartSubscriber
單事件發布者
- 發布者和事件的關係是一對一
- 事件和訂閱者的關係是一對多
- 發布者和訂閱者的關係是一對多
- 事件類型為Event,訂閱者類型是Subscriber