Nacos源碼系列—訂閱機制的前因後果(下)

點贊再看,養成習慣,微信搜索【牧小農】關注我獲取更多資訊,風裡雨里,小農等你,很高興能夠成為你的朋友。
項目源碼地址:公眾號回復 nacos,即可免費獲取源碼

事件發布

在上一節中我們講解了在NotifyCenter中維護了事件名稱和事件發布者的關係,而默認的事件發布者為DefaultPublisher,今天我們就來講一下DefaultPublisher的事件發布的具體邏輯

首先我們來看一下DefaultPublisher的源碼:

public class DefaultPublisher extends Thread implements EventPublisher {
    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
        //守護執行緒
        setDaemon(true);
        //設置執行緒名
        setName("nacos.publisher-" + type.getName());
        this.eventType = type;
        this.queueMaxSize = bufferSize;
        //阻塞隊列初始化
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        //啟動執行緒
        start();
    }
    
    @Override
    public synchronized void start() {
        if (!initialized) {
            // start just called once
            //啟動run方法
            super.start();
            if (queueMaxSize == -1) {
                queueMaxSize = ringBufferSize;
            }
            initialized = true;
        }
    }
}

我們可以看到這個類繼承自Thread,說明他是一個執行緒類,同時實現了EventPublisher說明他也是一個發布者,在init()中,是以守護執行緒的方式運作的,同時初始化了一個阻塞隊列,最後調用start()啟動執行緒。

在start()裡面,其實就是啟動run():

@Override
public void run() {
    openEventHandler();
}

   void openEventHandler() {
    try {

        // This variable is defined to resolve the problem which message overstock in the queue.
        int waitTimes = 60;
        // To ensure that messages are not lost, enable EventHandler when
        // waiting for the first Subscriber to register
        //死循環遍歷,執行緒啟動設置最大延遲60秒,用來解決消息積壓問題
        for (; ; ) {
            if (shutdown || hasSubscriber() || waitTimes <= 0) {
                break;
            }
            ThreadUtils.sleep(1000L);
            waitTimes--;
        }
        //死循環從隊列中取出event對象,同時通知訂閱者(subscriber)執行event對象
        for (; ; ) {
            if (shutdown) {
                break;
            }
            final Event event = queue.take();
            receiveEvent(event);
            UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
        }
    } catch (Throwable ex) {
        LOGGER.error("Event listener exception : ", ex);
    }
}

在上述程式碼中我們可以看到for (; ; )這個循環出現了兩次,這個就是循環遍歷(死循環),第一個死循環我們可以理解成延時效果,裡面最大延時60秒,每隔一秒運行一次,判斷(當前執行緒是否關閉、是否有訂閱者、是否超過60秒)只要滿足其中任意一個條件,跳出循環
第二個死循環,是我們業務邏輯處理,用來消費,從隊列中取出event事件,然後通過receiveEvent()執行。

那麼我們可以從隊列中取出事件,那麼這個事件又在哪一步注入進去的呢,我們還是在當前類裡面,找到一個叫publish()的方法

@Override
public boolean publish(Event event) {
    checkIsStart();
    //向隊列中插入元素
    boolean success = this.queue.offer(event);
    //判斷是否插入成功
    if (!success) {
        LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
        //失敗直接執行
        receiveEvent(event);
        return true;
    }
    return true;
}

這個方法其實就是發布事件調用了publish往阻塞隊列中存入事件,如果失敗那麼立即執行receiveEvent(),不在繼續走隊列方法

void receiveEvent(Event event) {
    final long currentEventSequence = event.sequence();

    if (!hasSubscriber()) {
        LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
        return;
    }

    // Notification single event listener
    //循環遍歷subscribers對象
    for (Subscriber subscriber : subscribers) {
        // Whether to ignore expiration events
        if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
            LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                    event.getClass());
            continue;
        }

        // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
        // Remove original judge part of codes.
        //通知訂閱者執行event
        notifySubscriber(subscriber, event);
    }
}

而在receiveEvent()方法中,這裡其實就是遍歷的subscribers集合(訂閱者),然後通過notifySubscriber() 通知訂閱者方法,而這個subscribers集合就是在我們之前講到的NacosNamingService.init()方法中設置的。

public class NacosNamingService implements NamingService {
 private void init(Properties properties) throws NacosException {
   //將Subscribe註冊到Publisher
   NotifyCenter.registerSubscriber(changeNotifier);
 }
}

NotifyCenter.registerSubscriber(changeNotifier);會調用NotifyCenter.addSubscriber()方法,進行最終的操作。

private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
      EventPublisherFactory factory) {

  final String topic = ClassUtils.getCanonicalName(subscribeType);
  synchronized (NotifyCenter.class) {
      // MapUtils.computeIfAbsent is a unsafe method.
      MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
  }
  //獲取對應的publisher
  EventPublisher publisher = INSTANCE.publisherMap.get(topic);
  if (publisher instanceof ShardedEventPublisher) {
      ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
  } else {
      //添加到subscribers集合
      publisher.addSubscriber(consumer);
  }
}

addSubscriber()方法的邏輯就是講訂閱事件、發布中、訂閱者三個關係進行綁定,而發布者和事件通過Map進行維護,發布者與訂閱者通過關聯關係進行維護。

我們回到剛剛DefaulePublisher.notifySubscriber()方法,這裡是最後執行訂閱者事件的方法

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

  LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
  //執行訂閱者事件
  final Runnable job = () -> subscriber.onEvent(event);
  //執行者
  final Executor executor = subscriber.executor();

  if (executor != null) {
      executor.execute(job);
  } else {
      try {
          job.run();
      } catch (Throwable e) {
          LOGGER.error("Event callback exception: ", e);
      }
  }
}

到這裡,訂閱機制就講完了,可能會有點繞,最好是我們能夠去跟著程式碼走一遍,這樣會比較理解和記憶,在這裡我們重點需要理解NotifyCenter對事件發布者、訂閱者以及之間關係的維護,關係維護的入口就在NacosNamingService.init()中,我們來看一下他的核心邏輯

首先ServiceInfoHolder中通過NotifyCenter發布InstancesChangeEvent事件.

NotifyCenter獲取對應的CanonicalName,並將這個參數作為key,從NotifyCenter.publisherMap中獲取對應的事件發布者,然後將InstancesChangeEvent事件進行發布.

InstancesChangeEvent事件發布主要是通過EventPublisher的實現類,DefaultPublisher進行InstancesChangeEvent事件發布,而DefaultPublisher本身作為守護執行緒的方式進行運作,在執行業務邏輯時判斷是否執行緒啟動,如果啟動,將事件添加到隊列中,如果成功,則發布過程完成,如果添加失敗,立即執行DefaultPublisher.receiveEvent,接收事件通知訂閱者,創建一個Runnable對象,執行訂閱者的Event事件。

在添加到隊列成功的時候,DefaultPublisher會創建一個阻塞隊列(BlockingQueue),標記執行緒啟動,當他執行 super.start(),會調用它的run方法,在這個run方法裡面核心的業務邏輯就是openEventHandler(),裡面會有兩個死循環,第一個是在執行緒啟動的60秒內執行條件,第二個是從阻塞隊列中獲取Event事件,調用DefaultPublisher.receiveEvent()通知訂閱者,流程結束

本地快取

我們在之前的系列中,客戶端會快取一些資訊在本地中,來獲取ServiceInfo的資訊,但是在執行本地快取的時候,難免會有一些故障,有故障就需要進行處理,在這裡主要涉及到兩個類ServiceInfoHolderFailoverReactor

Nacos快取主要是分為兩個方面,一個從註冊中心獲取實例資訊快取到記憶體中,通過ConcurrentMap進行存儲,一個是通過磁碟文件的形式定時快取。

同時故障處理也分為兩個部分,一個是故障處理的開關通過文件進行標記,一個是當起來故障處理後,可以從故障備份的文件中獲取服務實例資訊。

介紹完上面幾點,我們先來詳細講解第一個核心類ServiceInfoHolder

ServiceInfoHolder

ServiceInfoHolder類,主要是用來處理服務資訊的,每次客戶端從服務端拉取服務資訊時,都用經過這個類,而processServiceInfo用來處理本地資訊(快取、發布、更新、本地目錄初始化)等

ServiceInfo: 註冊服務的資訊,主要包含(服務名、分組名、集群資訊、實例列表、最後一次更新時間),客戶端獲取的資訊,都是通過ServiceInfo作為承載體,ServiceInfoHolder.ServiceInfo,通過ConcurrentMap進行存儲,如下所示:

public class ServiceInfoHolder implements Closeable {

  private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
  
   public ServiceInfoHolder(String namespace, Properties properties) {
    initCacheDir(namespace, properties);
    //啟動是判斷是否從快取資訊中獲取,默認為false
    if (isLoadCacheAtStart(properties)) {
        //從快取目錄中讀取資訊
        this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
    } else {
        //創建空集合對象
        this.serviceInfoMap = new ConcurrentHashMap<>(16);
    }
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushEmptyProtection = isPushEmptyProtect(properties);
}
  
  public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
        //判斷服務key是否為空
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (isEmptyOrErrorPush(serviceInfo)) {
            //empty or error push, just ignore
            return oldService;
        }
        //將快取資訊放置到map中
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        //判斷實例資訊是否發生改變
        boolean changed = isChangedServiceInfo(oldService, serviceInfo);
        if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
            serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
        }
        //監控服務快取map的大小
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        if (changed) {
            NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(),
                    JacksonUtils.toJson(serviceInfo.getHosts()));
            //添加實例變更事件,被訂閱者執行
            NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
            //寫入本地文件
            DiskCache.write(serviceInfo, cacheDir);
        }
        return serviceInfo;
    }
}

這裡就是Nacos獲取註冊資訊的快取,之前我們有講過,當服務資訊變更的時候會第一時間更新ServiceInfoMap中的資訊,通過isChangedServiceInfo進行判斷,當發生變動時,serviceInfoMap.put最新數據,當我們需要使用的時候,通過key進行get操作,ServiceInfoMap默認創建空的對象,但如果配置啟動從快取文件中獲取,則會從快取中獲取資訊。而且當我們服務實例發生變更的時候,會通過DiskCache.write()向對應的目錄文件中寫入ServiceInfo資訊

本地快取地址

本地快取的地址通過cacheDir進行執行本地快取和故障處理的根目錄,在ServiceInfoHolder構造方法中,會默認生成快取目錄,默認路徑為${user}/nacos/naming/public,我們也可以需要通過System.setProperty("JM.SNAPSHOT.PATH")指定。

public class ServiceInfoHolder implements Closeable {
    private String cacheDir;
    
    public ServiceInfoHolder(String namespace, Properties properties) {
    //初始化生成快取目錄
    initCacheDir(namespace, properties);
    ......
    }
    
    private void initCacheDir(String namespace, Properties properties) {
        String jmSnapshotPath = System.getProperty(JM_SNAPSHOT_PATH_PROPERTY);

        String namingCacheRegistryDir = "";
        if (properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR) != null) {
            namingCacheRegistryDir = File.separator + properties.getProperty(PropertyKeyConst.NAMING_CACHE_REGISTRY_DIR);
        }

        if (!StringUtils.isBlank(jmSnapshotPath)) {
            cacheDir = jmSnapshotPath + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + File.separator + FILE_PATH_NAMING + File.separator + namespace;
        } else {
            cacheDir = System.getProperty(USER_HOME_PROPERTY) + File.separator + FILE_PATH_NACOS + namingCacheRegistryDir
                    + File.separator + FILE_PATH_NAMING + File.separator + namespace;
        }
    }
    
}

故障處理

ServiceInfoHolder構造方法中,還會初始化一個FailoverReactor的類,這個類主要是用來故障處理。

public class ServiceInfoHolder implements Closeable {
  private final FailoverReactor failoverReactor;
  
  public ServiceInfoHolder(String namespace, Properties properties) {
    ....
    //為兩者相互持有對方的引用
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    .....
  }
  
   public FailoverReactor(ServiceInfoHolder serviceInfoHolder, String cacheDir) {
        //獲取serviceInfoHolder引用對象
        this.serviceInfoHolder = serviceInfoHolder;
        //故障目錄${user}/nacos/naming/public/failover
        this.failoverDir = cacheDir + FAILOVER_DIR;
        //初始化executorService
        this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                //開啟守護執行緒
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.failover");
                return thread;
            }
        });
        //其他資訊初始化
        this.init();
    }
    
     public void init() {
        //執行初始化操作,間隔5秒,執行SwitchRefresher()任務
        executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);

        //初始化操作,延遲30分鐘執行,間隔24小時,執行DiskFileWriter()任務
        executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
        

        //初始化操作,間隔10秒,核心方法為DiskFileWriter
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    File cacheDir = new File(failoverDir);
                    
                    if (!cacheDir.exists() && !cacheDir.mkdirs()) {
                        throw new IllegalStateException("failed to create cache dir: " + failoverDir);
                    }
                    
                    File[] files = cacheDir.listFiles();
                    //如果故障目錄為空,啟動立即執行,備份文件
                    if (files == null || files.length <= 0) {
                        new DiskFileWriter().run();
                    }
                } catch (Throwable e) {
                    NAMING_LOGGER.error("[NA] failed to backup file on startup.", e);
                }
                
            }
        }, 10000L, TimeUnit.MILLISECONDS);
    }
}

init()程式碼中,開啟了三個定時任務,三個任務都是FailoverReactor內部類,

  • 執行初始化操作,間隔5秒,執行SwitchRefresher()任務
  • 初始化操作,延遲30分鐘執行,間隔24小時,執行DiskFileWriter()任務
  • 初始化操作,間隔10秒,核心方法為DiskFileWriter

我們這裡先來看一下核心方法DiskFileWriter,這裡主要是獲取服務資訊,判斷是否能夠寫入磁碟,條件滿足,寫入拼接的故障目錄中,因為第一個和第二個初始化操作,都會用到DiskFileWriter,當我們第三個定時判斷如果文件不存在,則會將文件寫入本地磁碟中

class DiskFileWriter extends TimerTask {

  @Override
  public void run() {
      Map<String, ServiceInfo> map = serviceInfoHolder.getServiceInfoMap();
      for (Map.Entry<String, ServiceInfo> entry : map.entrySet()) {
          ServiceInfo serviceInfo = entry.getValue();
          //主要是判斷服務資訊是否完整
          if (StringUtils.equals(serviceInfo.getKey(), UtilAndComs.ALL_IPS) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ENV_LIST_KEY) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ENV_CONFIGS) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.VIP_CLIENT_FILE) || StringUtils
                  .equals(serviceInfo.getName(), UtilAndComs.ALL_HOSTS)) {
              continue;
          }
          //將文件寫入磁碟中
          DiskCache.write(serviceInfo, failoverDir);
      }
  }
}

接下來我們再看一下,第一個定時任務SwitchRefresher的業務邏輯,

class SwitchRefresher implements Runnable {

long lastModifiedMillis = 0L;

@Override
public void run() {
  try {
      File switchFile = new File(failoverDir + UtilAndComs.FAILOVER_SWITCH);
      //如果文件不存在返回
      if (!switchFile.exists()) {
          switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
          NAMING_LOGGER.debug("failover switch is not found, {}", switchFile.getName());
          return;
      }

      long modified = switchFile.lastModified();
      //判斷文件修改時間
      if (lastModifiedMillis < modified) {
          lastModifiedMillis = modified;
          //獲取故障處理文件內容
          String failover = ConcurrentDiskUtil.getFileContent(failoverDir + UtilAndComs.FAILOVER_SWITCH,
                  Charset.defaultCharset().toString());
          if (!StringUtils.isEmpty(failover)) {
              String[] lines = failover.split(DiskCache.getLineSeparator());

              for (String line : lines) {
                  String line1 = line.trim();
                  //"1" 開啟故障處理
                  if (IS_FAILOVER_MODE.equals(line1)) {
                      switchParams.put(FAILOVER_MODE_PARAM, Boolean.TRUE.toString());
                      NAMING_LOGGER.info("failover-mode is on");
                      new FailoverFileReader().run();
                      //"0" 關閉故障處理
                  } else if (NO_FAILOVER_MODE.equals(line1)) {
                      switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
                      NAMING_LOGGER.info("failover-mode is off");
                  }
              }
          } else {
              switchParams.put(FAILOVER_MODE_PARAM, Boolean.FALSE.toString());
          }
      }

  } catch (Throwable e) {
      NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
  }
}
}

這裡面主要是判斷故障處理文件是否存在,不存在直接返回,再去比較文件的修改時間,如果已經修改,則獲取文件中的內容,繼續進行判斷,當我們開啟故障處理時,執行執行緒FailoverFileReader().run()

class FailoverFileReader implements Runnable {

@Override
public void run() {
  Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

  BufferedReader reader = null;
  try {
      //讀取failoverDir目錄下的文件
      File cacheDir = new File(failoverDir);
      //不存在返回錯誤
      if (!cacheDir.exists() && !cacheDir.mkdirs()) {
          throw new IllegalStateException("failed to create cache dir: " + failoverDir);
      }
      //獲取文件
      File[] files = cacheDir.listFiles();
      //文件不存在返回
      if (files == null) {
          return;
      }
      //遍歷處理
      for (File file : files) {
          //文件不存在跳過
          if (!file.isFile()) {
              continue;
          }
          //如果是故障處理標誌文件,跳過這一步
          if (file.getName().equals(UtilAndComs.FAILOVER_SWITCH)) {
              continue;
          }

          ServiceInfo dom = new ServiceInfo(file.getName());

          //讀取備份中的內容,轉換為ServiceInfo對象
          try {
              String dataString = ConcurrentDiskUtil
                      .getFileContent(file, Charset.defaultCharset().toString());
              reader = new BufferedReader(new StringReader(dataString));

              String json;
              if ((json = reader.readLine()) != null) {
                  try {
                      dom = JacksonUtils.toObj(json, ServiceInfo.class);
                  } catch (Exception e) {
                      NAMING_LOGGER.error("[NA] error while parsing cached dom : {}", json, e);
                  }
              }

          } catch (Exception e) {
              NAMING_LOGGER.error("[NA] failed to read cache for dom: {}", file.getName(), e);
          } finally {
              try {
                  if (reader != null) {
                      reader.close();
                  }
              } catch (Exception e) {
                  //ignore
              }
          }
          if (!CollectionUtils.isEmpty(dom.getHosts())) {
              //將ServiceInfo對象放入domMap中
              domMap.put(dom.getKey(), dom);
          }
      }
  } catch (Exception e) {
      NAMING_LOGGER.error("[NA] failed to read cache file", e);
  }
  //如果不為空,賦值serviceMap
  if (domMap.size() > 0) {
      serviceMap = domMap;
  }
}
}

FailoverFileReader主要是操作讀取failover目錄存儲的備份服務資訊文件內容,然後裝換成ServiceInfo資訊,並將所有的ServiceInfo儲存在FailoverReactorServiceMap屬性中。

總結

到這裡我們Nacos訂閱機制核心流程就講完了,整體訂閱機制的流程還是比較複雜的,因為還涉及到之前將的邏輯,會有點繞,並且用到了保證執行緒Map、守護執行緒、阻塞隊列、執行緒的使用等等,我們需要重點掌握的主要是事件發布者、訂閱者之間的關係,這裡還是推薦大家有機會的話可以自己跟著源碼走一遍,會有更深的體驗。

如果覺得文中有幫助的,記得點贊支援,你的支援是我創作的最大動力!

我是牧小農,怕什麼真理無窮,進一步有進一步的歡喜,大家加油!

Tags: