Nacos源碼系列—訂閱機制的前因後果(下)
點贊再看,養成習慣,微信搜索【牧小農】關注我獲取更多資訊,風裡雨里,小農等你,很高興能夠成為你的朋友。
項目源碼地址:公眾號回復 nacos,即可免費獲取源碼
事件發布
在上一節中我們講解了在NotifyCenter
中維護了事件名稱和事件發布者的關係,而默認的事件發布者為DefaultPublisher
,今天我們就來講一下DefaultPublisher的事件發布的具體邏輯
首先我們來看一下DefaultPublisher
的源碼:
public class DefaultPublisher extends Thread implements EventPublisher {
@Override
public void init(Class<? extends Event> type, int bufferSize) {
//守護執行緒
setDaemon(true);
//設置執行緒名
setName("nacos.publisher-" + type.getName());
this.eventType = type;
this.queueMaxSize = bufferSize;
//阻塞隊列初始化
this.queue = new ArrayBlockingQueue<>(bufferSize);
//啟動執行緒
start();
}
@Override
public synchronized void start() {
if (!initialized) {
// start just called once
//啟動run方法
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
initialized = true;
}
}
}
我們可以看到這個類繼承自Thread
,說明他是一個執行緒類,同時實現了EventPublisher
說明他也是一個發布者,在init()中,是以守護執行緒的方式運作的,同時初始化了一個阻塞隊列,最後調用start()啟動執行緒。
在start()裡面,其實就是啟動run():
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
//死循環遍歷,執行緒啟動設置最大延遲60秒,用來解決消息積壓問題
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}
//死循環從隊列中取出event對象,同時通知訂閱者(subscriber)執行event對象
for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
在上述程式碼中我們可以看到for (; ; )
這個循環出現了兩次,這個就是循環遍歷(死循環),第一個死循環我們可以理解成延時效果,裡面最大延時60秒,每隔一秒運行一次,判斷(當前執行緒是否關閉、是否有訂閱者、是否超過60秒)只要滿足其中任意一個條件,跳出循環
第二個死循環,是我們業務邏輯處理,用來消費,從隊列中取出event
事件,然後通過receiveEvent()
執行。
那麼我們可以從隊列中取出事件,那麼這個事件又在哪一步注入進去的呢,我們還是在當前類裡面,找到一個叫publish()
的方法
@Override
public boolean publish(Event event) {
checkIsStart();
//向隊列中插入元素
boolean success = this.queue.offer(event);
//判斷是否插入成功
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
//失敗直接執行
receiveEvent(event);
return true;
}
return true;
}
這個方法其實就是發布事件調用了publish
往阻塞隊列中存入事件,如果失敗那麼立即執行receiveEvent()
,不在繼續走隊列方法
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
//循環遍歷subscribers對象
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
//通知訂閱者執行event
notifySubscriber(subscriber, event);
}
}
而在receiveEvent()
方法中,這裡其實就是遍歷的subscribers
集合(訂閱者),然後通過notifySubscriber()
通知訂閱者方法,而這個subscribers
集合就是在我們之前講到的NacosNamingService.init()
方法中設置的。
public class NacosNamingService implements NamingService {
private void init(Properties properties) throws NacosException {
//將Subscribe註冊到Publisher
NotifyCenter.registerSubscriber(changeNotifier);
}
}
而 NotifyCenter.registerSubscriber(changeNotifier);
會調用NotifyCenter.addSubscriber()
方法,進行最終的操作。
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
//獲取對應的publisher
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
//添加到subscribers集合
publisher.addSubscriber(consumer);
}
}
addSubscriber()
方法的邏輯就是講訂閱事件、發布中、訂閱者三個關係進行綁定,而發布者和事件通過Map進行維護,發布者與訂閱者通過關聯關係進行維護。
我們回到剛剛DefaulePublisher.notifySubscriber()
方法,這裡是最後執行訂閱者事件的方法
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
//執行訂閱者事件
final Runnable job = () -> subscriber.onEvent(event);
//執行者
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
到這裡,訂閱機制就講完了,可能會有點繞,最好是我們能夠去跟著程式碼走一遍,這樣會比較理解和記憶,在這裡我們重點需要理解NotifyCenter
對事件發布者、訂閱者以及之間關係的維護,關係維護的入口就在NacosNamingService.init()
中,我們來看一下他的核心邏輯
首先ServiceInfoHolder
中通過NotifyCenter
發布InstancesChangeEvent
事件.
NotifyCenter
獲取對應的CanonicalName
,並將這個參數作為key,從NotifyCenter.publisherMap
中獲取對應的事件發布者,然後將InstancesChangeEvent
事件進行發布.
InstancesChangeEvent
事件發布主要是通過EventPublisher
的實現類,DefaultPublisher
進行InstancesChangeEvent
事件發布,而DefaultPublisher
本身作為守護執行緒的方式進行運作,在執行業務邏輯時判斷是否執行緒啟動,如果啟動,將事件添加到隊列中,如果成功,則發布過程完成,如果添加失敗,立即執行DefaultPublisher.receiveEvent
,接收事件通知訂閱者,創建一個Runnable
對象,執行訂閱者的Event事件。
在添加到隊列成功的時候,DefaultPublisher
會創建一個阻塞隊列(BlockingQueue),標記執行緒啟動,當他執行 super.start()
,會調用它的run方法,在這個run方法裡面核心的業務邏輯就是openEventHandler()
,裡面會有兩個死循環,第一個是在執行緒啟動的60秒內執行條件,第二個是從阻塞隊列中獲取Event事件,調用DefaultPublisher.receiveEvent()
通知訂閱者,流程結束
本地快取
我們在之前的系列中,客戶端會快取一些資訊在本地中,來獲取ServiceInfo
的資訊,但是在執行本地快取的時候,難免會有一些故障,有故障就需要進行處理,在這裡主要涉及到兩個類ServiceInfoHolder
和FailoverReactor
Nacos快取主要是分為兩個方面,一個從註冊中心獲取實例資訊快取到記憶體中,通過ConcurrentMap
進行存儲,一個是通過磁碟文件的形式定時快取。
同時故障處理也分為兩個部分,一個是故障處理的開關通過文件進行標記,一個是當起來故障處理後,可以從故障備份的文件中獲取服務實例資訊。
介紹完上面幾點,我們先來詳細講解第一個核心類ServiceInfoHolder
ServiceInfoHolder
ServiceInfoHolder
類,主要是用來處理服務資訊的,每次客戶端從服務端拉取服務資訊時,都用經過這個類,而processServiceInfo
用來處理本地資訊(快取、發布、更新、本地目錄初始化)等
ServiceInfo: 註冊服務的資訊,主要包含(服務名、分組名、集群資訊、實例列表、最後一次更新時間),客戶端獲取的資訊,都是通過ServiceInfo
作為承載體,ServiceInfoHolder.ServiceInfo
,通過ConcurrentMap
進行存儲,如下所示:
public class ServiceInfoHolder implements Closeable {
private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
public ServiceInfoHolder(String namespace, Properties properties) {
initCacheDir(namespace, properties);
//啟動是判斷是否從快取資訊中獲取,默認為false
if (isLoadCacheAtStart(properties)) {
//從快取目錄中讀取資訊
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
//創建空集合對象
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
}
public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
//判斷服務key是否為空
String serviceKey = serviceInfo.getKey();
if (serviceKey == null) {
return null;
}
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
if (isEmptyOrErrorPush(serviceInfo)) {
//empty or error push, just ignore
return oldService;
}
//將快取資訊放置到map中
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
//判斷實例資訊是否發生改變
boolean changed = isChangedServiceInfo(oldService, serviceInfo);
if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
}
//監控服務快取map的大小
MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
JacksonUtils.toJson(serviceInfo.getHosts()));
//添加實例變更事件,被訂閱者執行
NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
serviceInfo.getClusters(), serviceInfo.getHosts()));
//寫入本地文件
DiskCache.write(serviceInfo, cacheDir);
}
return serviceInfo;
}
}
這裡就是Nacos獲取註冊資訊的快取,之前我們有講過,當服務資訊變更的時候會第一時間更新ServiceInfoMap
中的資訊,通過isChangedServiceInfo
進行判斷,當發生變動時,serviceInfoMap.put
最新數據,當我們需要使用的時候,通過key進行get操作,ServiceInfoMap
默認創建空的對象,但如果配置啟動從快取文件中獲取,則會從快取中獲取資訊。而且當我們服務實例發生變更的時候,會通過DiskCache.write()
向對應的目錄文件中寫入ServiceInfo
資訊
本地快取地址
本地快取的地址通過cacheDir
進行執行本地快取和故障處理的根目錄,在ServiceInfoHolder
構造方法中,會默認生成快取目錄,默認路徑為${user}/nacos/naming/public
,我們也可以需要通過System.setProperty("JM.SNAPSHOT.PATH")
指定。
public class ServiceInfoHolder implements Closeable {
private String cacheDir;
public ServiceInfoHolder(String namespace, Properties properties) {
//初始化生成快取目錄
initCacheDir(namespace, properties);
......
}
private void initCacheDir(String namespace, Properties properties) {
String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);
String namingCacheRegistryDir = "";
if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
}
if (!StringUtils.isBlank(jmSnapshotPath)) {
cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
} else {
cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
+ File.separator + FILE_PATH_NAMING + File.separator + namespace;
}
}
}
故障處理
在ServiceInfoHolder
構造方法中,還會初始化一個FailoverReactor
的類,這個類主要是用來故障處理。
public class ServiceInfoHolder implements Closeable {
private final FailoverReactor failoverReactor;
public ServiceInfoHolder(String namespace, Properties properties) {
....
//為兩者相互持有對方的引用
this.failoverReactor = new FailoverReactor(this, cacheDir);
.....
}
public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
//獲取serviceInfoHolder引用對象
this.serviceInfoHolder = serviceInfoHolder;
//故障目錄${user}/nacos/naming/public/failover
this.failoverDir = cacheDir + FAILOVER_DIR;
//初始化executorService
this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
//開啟守護執行緒
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.failover");
return thread;
}
});
//其他資訊初始化
this.init();
}
public void init() {
//執行初始化操作,間隔5秒,執行SwitchRefresher()任務
executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
//初始化操作,延遲30分鐘執行,間隔24小時,執行DiskFileWriter()任務
executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
//初始化操作,間隔10秒,核心方法為DiskFileWriter
executorService.schedule(new Runnable() {
@Override
public void run() {
try {
File cacheDir = new File(failoverDir);
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
File[] files = cacheDir.listFiles();
//如果故障目錄為空,啟動立即執行,備份文件
if (files == null || files.length <= 0) {
new DiskFileWriter().run();
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
}
}
}, 10000L, TimeUnit.MILLISECONDS);
}
}
在init()
程式碼中,開啟了三個定時任務,三個任務都是FailoverReactor
內部類,
- 執行初始化操作,間隔5秒,執行SwitchRefresher()任務
- 初始化操作,延遲30分鐘執行,間隔24小時,執行DiskFileWriter()任務
- 初始化操作,間隔10秒,核心方法為DiskFileWriter
我們這裡先來看一下核心方法DiskFileWriter
,這裡主要是獲取服務資訊,判斷是否能夠寫入磁碟,條件滿足,寫入拼接的故障目錄中,因為第一個和第二個初始化操作,都會用到DiskFileWriter
,當我們第三個定時判斷如果文件不存在,則會將文件寫入本地磁碟中
class DiskFileWriter extends TimerTask {
@Override
public void run() {
Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
ServiceInfo serviceInfo = entry.getValue();
//主要是判斷服務資訊是否完整
if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
.equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
continue;
}
//將文件寫入磁碟中
DiskCache.write(serviceInfo, failoverDir);
}
}
}
接下來我們再看一下,第一個定時任務SwitchRefresher
的業務邏輯,
class SwitchRefresher implements Runnable {
long lastModifiedMillis = 0L;
@Override
public void run() {
try {
File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
//如果文件不存在返回
if (!switchFile.exists()) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
return;
}
long modified = switchFile.lastModified();
//判斷文件修改時間
if (lastModifiedMillis < modified) {
lastModifiedMillis = modified;
//獲取故障處理文件內容
String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
Charset.defaultCharset().toString());
if (!StringUtils.isEmpty(failover)) {
String[] lines = failover.split(DiskCache.getLineSeparator());
for (String line : lines) {
String line1 = line.trim();
//"1" 開啟故障處理
if (IS_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
NAMING_LOGGER.info("failover-mode is on");
new FailoverFileReader().run();
//"0" 關閉故障處理
} else if (NO_FAILOVER_MODE.equals(line1)) {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
NAMING_LOGGER.info("failover-mode is off");
}
}
} else {
switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
}
}
} catch (Throwable e) {
NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
}
}
}
這裡面主要是判斷故障處理文件是否存在,不存在直接返回,再去比較文件的修改時間,如果已經修改,則獲取文件中的內容,繼續進行判斷,當我們開啟故障處理時,執行執行緒FailoverFileReader().run()
class FailoverFileReader implements Runnable {
@Override
public void run() {
Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
BufferedReader reader = null;
try {
//讀取failoverDir目錄下的文件
File cacheDir = new File(failoverDir);
//不存在返回錯誤
if (!cacheDir.exists() && !cacheDir.mkdirs()) {
throw new IllegalStateException("failed to create cache dir: " + failoverDir);
}
//獲取文件
File[] files = cacheDir.listFiles();
//文件不存在返回
if (files == null) {
return;
}
//遍歷處理
for (File file : files) {
//文件不存在跳過
if (!file.isFile()) {
continue;
}
//如果是故障處理標誌文件,跳過這一步
if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
continue;
}
ServiceInfo dom = new ServiceInfo(file.getName());
//讀取備份中的內容,轉換為ServiceInfo對象
try {
String dataString = ConcurrentDiskUtil
.getFileContent(file, Charset.defaultCharset().toString());
reader = new BufferedReader(new StringReader(dataString));
String json;
if ((json = reader.readLine()) != null) {
try {
dom = JacksonUtils.toObj(json, ServiceInfo.class);
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (Exception e) {
//ignore
}
}
if (!CollectionUtils.isEmpty(dom.getHosts())) {
//將ServiceInfo對象放入domMap中
domMap.put(dom.getKey(), dom);
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] failed to read cache file", e);
}
//如果不為空,賦值serviceMap
if (domMap.size() > 0) {
serviceMap = domMap;
}
}
}
FailoverFileReader
主要是操作讀取failover
目錄存儲的備份服務資訊文件內容,然後裝換成ServiceInfo
資訊,並將所有的ServiceInfo
儲存在FailoverReactor
的ServiceMap
屬性中。
總結
到這裡我們Nacos訂閱機制核心流程就講完了,整體訂閱機制的流程還是比較複雜的,因為還涉及到之前將的邏輯,會有點繞,並且用到了保證執行緒Map、守護執行緒、阻塞隊列、執行緒的使用等等,我們需要重點掌握的主要是事件發布者、訂閱者之間的關係,這裡還是推薦大家有機會的話可以自己跟著源碼走一遍,會有更深的體驗。
如果覺得文中有幫助的,記得點贊支援,你的支援是我創作的最大動力!
我是牧小農,怕什麼真理無窮,進一步有進一步的歡喜,大家加油!