Nacos源碼系列—服務端那些事兒
點贊再看,養成習慣,微信搜索【牧小農】關注我獲取更多資訊,風裡雨里,小農等你,很高興能夠成為你的朋友。
項目源碼地址:公眾號回復 nacos,即可免費獲取源碼
前言
在上節課中,我們講解了客戶端註冊服務的大體流程,客戶端在註冊服務的時候調用的是 NamingService.registerInstance
來完成實例的註冊,在最後呢我們知道服務註冊是通過 nacos/v1/ns/instance
介面來完成註冊的,我們今天來講解服務端的註冊,首先就從這個介面地址開始,來看具體服務端都做了哪些事情
服務註冊
上面是我們從官網中找到的Nacos架構圖,從這個圖中我們大體可以得出我們要找的介面應該是在NamingService
這個服務中,同時我們在項目結構中也可以看到naming這個模組,naming就是實現服務註冊的,我們都知道請求路徑都是通過controller來進行處理的,而在其中我們可以看到一個InstanceController
的這麼一個類,那麼註冊實例肯定會和它有關。可以看到InstanceController
類的請求路由即是我們POST請求的路由的部分,如下:
所以,我們就從開始研究接收請求處理服務註冊的源碼,我們找到通過RestFul API介面,請求類型為Post,的方法,符合條件的只有InstanceController.register
方法,這個方法用來接收用戶的請求,並且把收到的資訊進行解析,裝換成實例資訊,然後通過getInstanceOperator().registerInstance
進行調用,這個方法也是服務註冊的核心
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//從 request資訊中獲取namespaceId,如果沒有默認為public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//獲取服務名稱 格式:「group@@serviceName」
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//將request參數還原成instance實例
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
//【核心】註冊服務實例
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
我們先來看一下下面這個核心方法
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
getInstanceOperator()
這個判斷是否走Grpc協議,默認走Grpc,所以我們使用的是instanceServiceV2
這個實例對象
private InstanceOperator getInstanceOperator() {
return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
}
instanceServiceV2
就是InstanceOperatorClientImpl
,方法所以我們需要進入的是下面這個實例的處理類
具體方法如下所示:
@Override
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
//判斷是否為臨時客戶端
boolean ephemeral = instance.isEphemeral();
//獲取客戶端ID
String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
//通過客戶端ID創建客戶端連接
createIpPortClientIfAbsent(clientId);
//獲取服務資訊
Service service = getService(namespaceId, serviceName, ephemeral);
//註冊服務
clientOperationService.registerInstance(service, instance, clientId);
}
從Nacos2.0以後,新增了Client模型,管理與該客戶機有關的數據內容,如果一個客戶機發布了一個服務,那麼這個客戶機發布的所有服務和訂閱者資訊都會被更新到一個Client對象中,這個Client對象對應於這個客戶機的鏈接,然後通過事件機制觸發索引資訊的更新。Client負責管理一個客戶機的服務實例註冊Publish和服務訂閱Subscribe,可以方便地對需要推送的服務範圍進行快速聚合,同時一個客戶端gRPC長連接對應一個Client,每個Client有自己唯一的 clientId
package com.alibaba.nacos.naming.core.v2.client;
public interface Client {
// 客戶端id
String getClientId();
// 是否臨時客戶端
boolean isEphemeral();
//設置客戶端更新時間
void setLastUpdatedTime();
//獲取客戶端更新時間
long getLastUpdatedTime();
// 服務實例註冊
boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
//服務實例移除
InstancePublishInfo removeServiceInstance(Service service);
//服務實例查詢
InstancePublishInfo getInstancePublishInfo(Service service);
Collection<Service> getAllPublishedService();
// 服務訂閱
boolean addServiceSubscriber(Service service, Subscriber subscriber);
///取消訂閱
boolean removeServiceSubscriber(Service service);
//查詢訂閱
Subscriber getSubscriber(Service service);
Collection<Service> getAllSubscribeService();
// 生成同步給其他節點的client數據
ClientSyncData generateSyncData();
// 是否過期
boolean isExpire(long currentTime);
// 釋放資源
void release();
}
知道了Client模型後,我們來接著從clientOperationService.registerInstance(service, instance, clientId);
找到對應的具體實現
EphemeralClientOperationServiceImpl.registerInstance()
下面這個方法是具體來負責處理服務註冊,我們來詳細了解一下:
@Override
public void registerInstance(Service service, Instance instance, String clientId) {
//確保Service單例存在,注意Service的equals和hasCode方法
Service singleton = ServiceManager.getInstance().getSingleton(service);
//如果不是臨時客戶端
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
//根據客戶端ID找到客戶端資訊,這個關係在連接建立的時候存儲
Client client = clientManager.getClient(clientId);
if (!clientIsLegal(client, clientId)) {
return;
}
//將客戶端實例模型,裝換成服務端實例模型
InstancePublishInfo instanceInfo = getPublishInfo(instance);
//將實例存儲到client中
client.addServiceInstance(singleton, instanceInfo);
//設置最後更新時間
client.setLastUpdatedTime();
//建立服務和客戶端的關聯關係
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
- ServiceManager.getInstance().getSingleton() 當調用
getSingleton
的時候會負責管理service的單例,在這裡service會重寫equlas和hasCode方法作為key
public class ServiceManager {
//單例service 看service中equals和hasCode方法
private final ConcurrentHashMap<Service, Service> singletonRepository;
//namespace下所有的service
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
//通過Map儲存單例的Service
public Service getSingleton(Service service) {
singletonRepository.putIfAbsent(service, service);
Service result = singletonRepository.get(service);
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
}
- service中 equal和hasCode方法,namespace+group+name在服務端是一個單例Service
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Service)) {
return false;
}
Service service = (Service) o;
return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
}
@Override
public int hashCode() {
return Objects.hash(namespace, group, name);
}
- clientManager.getClient() 這裡對應的實現類為
ConnectionBasedClientManager
這個實現類負責管理長連接clientId和client模型的映射關係
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
//通過map存儲ID和client之間的關聯關係
private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();
//根據clientId查詢client
@Override
public Client getClient(String clientId) {
return clients.get(clientId);
}
}
- client.addServiceInstance(); 抽象類為
AbstractClient
:負責存儲當前客戶端服務註冊表,也就是 service和instance的關係。
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);、
//將service和實例進行關聯
@Override
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
if (null == publishers.put(service, instancePublishInfo)) {
//監控指標自增實例數
MetricsMonitor.incrementInstanceCount();
}
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
- ClientOperationEvent.ClientRegisterServiceEvent() :這裡目的是為了過濾目標服務得到最終instance列表建立service和client的關係,能夠方便我們快速查詢,同時會觸發
ClientServiceIndexesManager
的監聽事件。
//服務與發布client的關係
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
//服務與訂閱clientId的關係
private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
Service service = event.getService();
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
removeSubscriberIndexes(service, clientId);
}
}
private void addPublisherIndexes(Service service, String clientId) {
publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
private void removePublisherIndexes(Service service, String clientId) {
if (!publisherIndexes.containsKey(service)) {
return;
}
publisherIndexes.get(service).remove(clientId);
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
private void addSubscriberIndexes(Service service, String clientId) {
subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
// Fix #5404, Only first time add need notify event.
if (subscriberIndexes.get(service).add(clientId)) {
NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
}
}
private void removeSubscriberIndexes(Service service, String clientId) {
if (!subscriberIndexes.containsKey(service)) {
return;
}
subscriberIndexes.get(service).remove(clientId);
if (subscriberIndexes.get(service).isEmpty()) {
subscriberIndexes.remove(service);
}
}
請求流程圖:
服務端監控檢查
Nacos作為註冊中心不止提供了服務註冊和服務發現的功能,還提供了服務可用性檢測的功能,在1.0的版本中,臨時實例走的是distro協議,客戶端向註冊中心發送心跳來維持自身的健康(healthy)狀態,持久實例則走的是Raft協議存儲。
- 兩種檢測機制:
- 客戶端主動上報機制
- 伺服器端主動下探機制
客戶端主動上報機制:你主動找上級,說你沒有打卡(不健康狀態)
伺服器端主動下探機制:上級檢測到你有不打卡的記錄,主動來找你
對於Nacos健康檢測機制,我們不能主動去設置,但是健康檢查機制是和Nacos的服務實例類型強相關,主要是有兩種服務實例:
- 臨時實例:客戶端主動上報
- 持久實例:服務端主動下探
客戶端主動上報
臨時實例每隔5秒會主動上報自己的健康狀態,發送心跳,如果發送心跳的間隔時間超過15秒,Nacos伺服器端會將服務標記為亞健康狀態,如果超過30S沒有發送心跳,那麼服務實例會被從服務列表中剔除
在2.0版本以後,持久實例不變,臨時實例而是通過長連接來判斷實例是否健康。
- 長連接: 一個連接上可以連續發送多數據包,在連接保持期間,如果沒有數據包發送,需要雙方發鏈路檢測包,在Nacos2.0之後,使用Grpc協議代替了http協議。長連接會保持客戶端和服務端發送的狀態,在源碼中
ConnectionManager
管理所有客戶端的長連接
ConnectionManager: 每3秒檢測所有超過20S內沒有發生過通訊的客戶端,向客戶端發起ClientDetectionRequest探測請求,如果客戶端在1s內成功響應,則檢測通過,否則執行unregister方法移除Connection
如果客戶端持續和服務端進行通訊,服務端是不需要主動下探的,只有當客戶端沒有一直和服務端通訊的時候,服務端才會主動下探操作
@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
//只要spring容器啟動,會觸發這個方法
@PostConstruct
public void start() {
// 啟動不健康連接排除功能.
RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
// 1. 統計過時(20s)連接
Set<Map.Entry<String, Connection>> entries = connections.entrySet();
//2.獲得需要剔除的IP和埠
//3.根據限制獲取剔除的IP和埠
//4.如果還是有需要剔除的客戶端,則繼續執行
//5.沒有活動的客戶端執行探測
//6.如果沒有馬上響應,則馬上剔除
//7.剔除後發布ClientDisconnectEvent事件
}
});
}
}
//註銷(移出)連接方法
public synchronized void unregister(String connectionId) {
Connection remove = this.connections.remove(connectionId);
if (remove != null) {
String clientIp = remove.getMetaInfo().clientIp;
AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
if (atomicInteger != null) {
int count = atomicInteger.decrementAndGet();
if (count <= 0) {
connectionForClientIp.remove(clientIp);
}
}
remove.close();
Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
}
當服務端操作移除事件以後,會操作notifyClientDisConnected()
方法,主要調用的是 clientConnectionEventListener.clientDisConnected(connection)
方法,將連接資訊傳入進去
public void notifyClientDisConnected(final Connection connection) {
for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
try {
clientConnectionEventListener.clientDisConnected(connection);
} catch (Throwable throwable) {
Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
clientConnectionEventListener.getName(), throwable);
}
}
clientConnectionEventListenerd
的實現類是ConnectionBasedClientManager
,在這裡面會出發清除索引快取等操作
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
@Override
public boolean clientDisconnected(String clientId) {
Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
//同步移除client數據
ConnectionBasedClient client = clients.remove(clientId);
if (null == client) {
return true;
}
client.release();
//服務訂閱,將變更通知到客戶端
NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
return true;
}
}
總結
到這裡Nacos服務端的基礎的源碼就講完了,有些地方我們沒有展開來講,在後續的源碼講解中,會給大家詳細的進行講解,今天主要講解了,服務端註冊以及監控檢查的基礎程式碼,後面會有最新的內容呈現給大家,如果覺得文中對您有幫助的,記得點贊關注~
今天是母親節,在這裡祝媽媽們,節日快樂!
我是牧小農,怕什麼真理無窮,進一步有進一步的歡喜,大家加油