Nacos源碼系列—服務端那些事兒

點贊再看,養成習慣,微信搜索【牧小農】關注我獲取更多資訊,風裡雨里,小農等你,很高興能夠成為你的朋友。
項目源碼地址:公眾號回復 nacos,即可免費獲取源碼

前言

在上節課中,我們講解了客戶端註冊服務的大體流程,客戶端在註冊服務的時候調用的是 NamingService.registerInstance 來完成實例的註冊,在最後呢我們知道服務註冊是通過 nacos/v1/ns/instance 介面來完成註冊的,我們今天來講解服務端的註冊,首先就從這個介面地址開始,來看具體服務端都做了哪些事情

服務註冊

上面是我們從官網中找到的Nacos架構圖,從這個圖中我們大體可以得出我們要找的介面應該是在NamingService這個服務中,同時我們在項目結構中也可以看到naming這個模組,naming就是實現服務註冊的,我們都知道請求路徑都是通過controller來進行處理的,而在其中我們可以看到一個InstanceController的這麼一個類,那麼註冊實例肯定會和它有關。可以看到InstanceController類的請求路由即是我們POST請求的路由的部分,如下:

所以,我們就從開始研究接收請求處理服務註冊的源碼,我們找到通過RestFul API介面,請求類型為Post,的方法,符合條件的只有InstanceController.register方法,這個方法用來接收用戶的請求,並且把收到的資訊進行解析,裝換成實例資訊,然後通過getInstanceOperator().registerInstance進行調用,這個方法也是服務註冊的核心

    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //從 request資訊中獲取namespaceId,如果沒有默認為public
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //獲取服務名稱 格式:「group@@serviceName」
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        //將request參數還原成instance實例
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        //【核心】註冊服務實例
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

我們先來看一下下面這個核心方法

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

getInstanceOperator() 這個判斷是否走Grpc協議,默認走Grpc,所以我們使用的是instanceServiceV2這個實例對象

  private InstanceOperator getInstanceOperator() {
        return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
    }

instanceServiceV2就是InstanceOperatorClientImpl,方法所以我們需要進入的是下面這個實例的處理類

具體方法如下所示:

    @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) {
        //判斷是否為臨時客戶端
        boolean ephemeral = instance.isEphemeral();
        //獲取客戶端ID
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        //通過客戶端ID創建客戶端連接
        createIpPortClientIfAbsent(clientId);
        //獲取服務資訊
        Service service = getService(namespaceId, serviceName, ephemeral);
        //註冊服務
        clientOperationService.registerInstance(service, instance, clientId);
    }

從Nacos2.0以後,新增了Client模型,管理與該客戶機有關的數據內容,如果一個客戶機發布了一個服務,那麼這個客戶機發布的所有服務和訂閱者資訊都會被更新到一個Client對象中,這個Client對象對應於這個客戶機的鏈接,然後通過事件機制觸發索引資訊的更新。Client負責管理一個客戶機的服務實例註冊Publish和服務訂閱Subscribe,可以方便地對需要推送的服務範圍進行快速聚合,同時一個客戶端gRPC長連接對應一個Client,每個Client有自己唯一的 clientId

package com.alibaba.nacos.naming.core.v2.client;
public interface Client {

    // 客戶端id
    String getClientId();
    // 是否臨時客戶端
    boolean isEphemeral();
    //設置客戶端更新時間
    void setLastUpdatedTime();
    //獲取客戶端更新時間
    long getLastUpdatedTime();
    // 服務實例註冊
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    //服務實例移除
    InstancePublishInfo removeServiceInstance(Service service);
    //服務實例查詢
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();
    // 服務訂閱
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    ///取消訂閱
    boolean removeServiceSubscriber(Service service);
    //查詢訂閱
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // 生成同步給其他節點的client數據
    ClientSyncData generateSyncData();
    // 是否過期
    boolean isExpire(long currentTime);
    // 釋放資源
    void release();

}

知道了Client模型後,我們來接著從clientOperationService.registerInstance(service, instance, clientId);找到對應的具體實現

EphemeralClientOperationServiceImpl.registerInstance()

下面這個方法是具體來負責處理服務註冊,我們來詳細了解一下:

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //確保Service單例存在,注意Service的equals和hasCode方法
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        //如果不是臨時客戶端
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        //根據客戶端ID找到客戶端資訊,這個關係在連接建立的時候存儲
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //將客戶端實例模型,裝換成服務端實例模型
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //將實例存儲到client中
        client.addServiceInstance(singleton, instanceInfo);
        //設置最後更新時間
        client.setLastUpdatedTime();
        //建立服務和客戶端的關聯關係
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
  1. ServiceManager.getInstance().getSingleton() 當調用getSingleton的時候會負責管理service的單例,在這裡service會重寫equlas和hasCode方法作為key
public class ServiceManager {

    //單例service 看service中equals和hasCode方法
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    //namespace下所有的service
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
    //通過Map儲存單例的Service
    public Service getSingleton(Service service) {
        singletonRepository.putIfAbsent(service, service);
        Service result = singletonRepository.get(service);
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
}
  1. service中 equal和hasCode方法,namespace+group+name在服務端是一個單例Service
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Service)) {
            return false;
        }
        Service service = (Service) o;
        return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
    }
    
    @Override
    public int hashCode() {
        return Objects.hash(namespace, group, name);
    }
  1. clientManager.getClient() 這裡對應的實現類為ConnectionBasedClientManager這個實現類負責管理長連接clientId和client模型的映射關係
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    //通過map存儲ID和client之間的關聯關係
    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();

    //根據clientId查詢client
    @Override
    public Client getClient(String clientId) {
        return clients.get(clientId);
    }
}
  1. client.addServiceInstance(); 抽象類為AbstractClient:負責存儲當前客戶端服務註冊表,也就是 service和instance的關係。
  protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);、    

  //將service和實例進行關聯
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            //監控指標自增實例數
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
  1. ClientOperationEvent.ClientRegisterServiceEvent() :這裡目的是為了過濾目標服務得到最終instance列表建立service和client的關係,能夠方便我們快速查詢,同時會觸發ClientServiceIndexesManager的監聽事件。


  //服務與發布client的關係
   private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    //服務與訂閱clientId的關係
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            removeSubscriberIndexes(service, clientId);
        }
    }
    
        
        private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    
       private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    
        private void addSubscriberIndexes(Service service, String clientId) {
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        // Fix #5404, Only first time add need notify event.
        if (subscriberIndexes.get(service).add(clientId)) {
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }
    
    private void removeSubscriberIndexes(Service service, String clientId) {
        if (!subscriberIndexes.containsKey(service)) {
            return;
        }
        subscriberIndexes.get(service).remove(clientId);
        if (subscriberIndexes.get(service).isEmpty()) {
            subscriberIndexes.remove(service);
        }
    }

請求流程圖:

服務端監控檢查

Nacos作為註冊中心不止提供了服務註冊和服務發現的功能,還提供了服務可用性檢測的功能,在1.0的版本中,臨時實例走的是distro協議,客戶端向註冊中心發送心跳來維持自身的健康(healthy)狀態,持久實例則走的是Raft協議存儲。

  1. 兩種檢測機制
  • 客戶端主動上報機制
  • 伺服器端主動下探機制

客戶端主動上報機制:你主動找上級,說你沒有打卡(不健康狀態)

伺服器端主動下探機制:上級檢測到你有不打卡的記錄,主動來找你

對於Nacos健康檢測機制,我們不能主動去設置,但是健康檢查機制是和Nacos的服務實例類型強相關,主要是有兩種服務實例:

  • 臨時實例:客戶端主動上報
  • 持久實例:服務端主動下探

客戶端主動上報

臨時實例每隔5秒會主動上報自己的健康狀態,發送心跳,如果發送心跳的間隔時間超過15秒,Nacos伺服器端會將服務標記為亞健康狀態,如果超過30S沒有發送心跳,那麼服務實例會被從服務列表中剔除

在2.0版本以後,持久實例不變,臨時實例而是通過長連接來判斷實例是否健康。

  1. 長連接: 一個連接上可以連續發送多數據包,在連接保持期間,如果沒有數據包發送,需要雙方發鏈路檢測包,在Nacos2.0之後,使用Grpc協議代替了http協議。長連接會保持客戶端和服務端發送的狀態,在源碼中ConnectionManager 管理所有客戶端的長連接

ConnectionManager: 每3秒檢測所有超過20S內沒有發生過通訊的客戶端,向客戶端發起ClientDetectionRequest探測請求,如果客戶端在1s內成功響應,則檢測通過,否則執行unregister方法移除Connection

如果客戶端持續和服務端進行通訊,服務端是不需要主動下探的,只有當客戶端沒有一直和服務端通訊的時候,服務端才會主動下探操作

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

   //只要spring容器啟動,會觸發這個方法
    @PostConstruct
    public void start() {
    // 啟動不健康連接排除功能.
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        // 1. 統計過時(20s)連接
         Set<Map.Entry<String, Connection>> entries = connections.entrySet();
        //2.獲得需要剔除的IP和埠
        //3.根據限制獲取剔除的IP和埠
        //4.如果還是有需要剔除的客戶端,則繼續執行
        //5.沒有活動的客戶端執行探測            
        //6.如果沒有馬上響應,則馬上剔除
        //7.剔除後發布ClientDisconnectEvent事件
      }
    });

    }
}

//註銷(移出)連接方法
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }

當服務端操作移除事件以後,會操作notifyClientDisConnected()方法,主要調用的是 clientConnectionEventListener.clientDisConnected(connection)方法,將連接資訊傳入進去

    public void notifyClientDisConnected(final Connection connection) {
        
        for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
            try {
                clientConnectionEventListener.clientDisConnected(connection);
            } catch (Throwable throwable) {
                Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                        clientConnectionEventListener.getName(), throwable);
            }
        }
        

clientConnectionEventListenerd的實現類是ConnectionBasedClientManager,在這裡面會出發清除索引快取等操作

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        //同步移除client數據
        ConnectionBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        client.release();
        //服務訂閱,將變更通知到客戶端
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
        return true;
    }
}

總結

到這裡Nacos服務端的基礎的源碼就講完了,有些地方我們沒有展開來講,在後續的源碼講解中,會給大家詳細的進行講解,今天主要講解了,服務端註冊以及監控檢查的基礎程式碼,後面會有最新的內容呈現給大家,如果覺得文中對您有幫助的,記得點贊關注~

今天是母親節,在這裡祝媽媽們,節日快樂!

我是牧小農,怕什麼真理無窮,進一步有進一步的歡喜,大家加油