Dubbo源碼(五) – 服務目錄
前言
本文基於Dubbo2.6.x版本,中文注釋版源碼已上傳github:xiaoguyu/dubbo
今天,來聊聊Dubbo的服務目錄(Directory)。下面是官方文檔對服務目錄的定義:
服務目錄中存儲了一些和服務提供者有關的資訊,通過服務目錄,服務消費者可獲取到服務提供者的資訊,比如 ip、埠、服務協議等。
服務目錄持有Invoker
對象集合,Dubbo的服務調用均由Invoker
發起。
當服務提供者資訊發生變化時(比如某一個服務掛了),服務目錄也需要動態調整。
繼承體系
服務目錄目前內置的實現有兩個,分別為 StaticDirectory 和 RegistryDirectory。它們均繼承自AbstractDirectory,而 AbstractDirectory 實現了 Directory 介面。Directory 介面提供了list(Invocation invocation) 方法,這個方法就是用來獲取 invoker 集合的。
再看 RegistryDirectory 實現了 NotifyListener 介面,這個介面中只有一個方法,notify(List
源碼分析
上面我們講了,服務調用需求用到 invoker,而服務目錄持有 invoker 集合,並通過 list 方法提供 invoker。下面放上服務消費者Demo中DemoService#sayHello 方法的調用路徑
AbstractDirectory 實現了 Directory 介面的 list 方法
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// 調用 doList 方法列舉 Invoker,doList 是模板方法,由子類實現
List<Invoker<T>> invokers = doList(invocation);
// 獲取路由 Router 列表
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
// 獲取 runtime 參數,並根據參數決定是否進行路由
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
// 進行服務路由
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
此方法就兩段邏輯:
- 通過 doList 獲取 invoker 集合
- 通過路由選擇合適的 invoker
路由非本文重點,略過。
doList 是模板方法,由子類實現。
StaticDirectory
StaticDirectory 是一個靜態服務目錄,其 invokers 集合通過構造方法注入,不應被改變。
// StaticDirectory的doList啥都沒做,直接返回持有的invokers
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
// 列舉 Inovker,也就是直接返回 invokers 成員變數
return invokers;
}
StaticDirectory 的其它方法就不分析了,同樣很簡單。
RegistryDirectory
RegistryDirectory 是動態調整的服務目錄,其持有的 invokers 有內部方法生成。
訂閱節點
在上篇博文《Dubbo源碼(四) – 服務引用(消費者)》中,我留了一個坑,也就是服務引用過程中,創建了註冊中心之後,如何訂閱節點數據。在RegistryProtocol#doRefer
方法中。
其中調用了RegistryDirectory#subscribe(URL url)
方法
public void subscribe(URL url) {
setConsumerUrl(url);
registry.subscribe(url, this);
}
我們用的註冊中心是 zookeeper,所以 registry 是 ZookeeperRegistry
,而 subscribe 方法的實現在其父類FailbackRegistry
中
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
......
// 訂閱失敗處理
addFailedSubscribed(url, listener);
}
}
模板方法,調用子類的 doSubscribe 方法
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
...
} else {
List<URL> urls = new ArrayList<URL>();
// 切割路徑(providers、configurators、routers等)
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
// 快取操作,獲取節點監聽器
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
// 這裡和方法末尾的 notify(url, listener, urls); 是調用的同一個方法
// 節點變更時觸發變更操作
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
// 註冊節點監聽器
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 觸發節點變更操作
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
訂閱方法做了3個操作:
- 切割url,拆分訂閱路徑
- 創建節點監聽器
- 觸發節點變更操作
這裡注意下,訂閱時節點數據並沒有發生變更,所以需要手動觸發 notify 方法。
下面繼續看節點變更操作做了什麼,調用路徑有點深,就不一步一步調試了,直接把路徑寫在注釋上。
// FailbackRegistry#notify(URL url, NotifyListener listener, List<URL> urls) ->
// FailbackRegistry#doNotify(URL url, NotifyListener listener, List<URL> urls) ->
// AbstractRegistry#notify(URL url, NotifyListener listener, List<URL> urls)
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
......
Map<String, List<URL>> result = new HashMap<String, List<URL>>();
// 將urls按分類分組轉成map
......
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
saveProperties(url);
listener.notify(categoryList);
}
}
此處的listener
變數,就是本節的主角RegistryDirectory
,下面來分析 listener.notify(categoryList)
public synchronized void notify(List<URL> urls) {
// 定義三個集合,分別用於存放服務提供者 url,路由 url,配置器 url
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
// 根據 category 參數分別對3種url進行處理
......
// 刷新 Invoker 列表
refreshInvoker(invokerUrls);
}
此方法分別對服務提供者 url,路由 url,配置器 url各自進行了處理,這裡我省略了對路由 url 和配置器 url 的處理,感興趣的自行去看源碼。咱們聚焦在 Invoker 的處理中
private void refreshInvoker(List<URL> invokerUrls) {
// invokerUrls 僅有一個元素,且 url 協議頭為 empty,此時表示禁用所有服務
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// 設置 forbidden 為 true
this.forbidden = true; // Forbid to access
this.methodInvokerMap = null; // Set the method invoker map to null
// 銷毀所有 Invoker
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
// 添加快取 url 到 invokerUrls 中
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
// 快取 invokerUrls
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
// 將 url 轉成 Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
// 將 newUrlInvokerMap 轉成方法名到 Invoker 列表的映射
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
// state change
// If the calculation is wrong, it is not processed.
// 轉換出錯,直接列印異常,並返回
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
// 合併多個組的 Invoker
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
// 銷毀無用 Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
此方法中的邏輯有點多,
- 判斷是否要銷毀所有 invoker
- 創建 invoker
- 處理映射
- 銷毀無用 invoker
我們關注下 invoker 的創建,toInvokers(invokerUrls)
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
// 獲取服務消費端配置的協議
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
......
// 將本地 Invoker 快取賦值給 localUrlInvokerMap
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
// 獲取 disable 配置,取反,然後賦值給 enable 變數
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
// 獲取 enable 配置,並賦值給 enable 變數
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// 調用 refer 獲取 Invoker
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
// 快取 Invoker 實例
newUrlInvokerMap.put(key, invoker);
}
// 快取命中
} else {
// 將 invoker 存儲到 newUrlInvokerMap 中
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
這裡的判斷有點複雜,會對協議各種判斷(是否支援、是否為empty)等,然後如果快取未命中,則需要創建invoker,也就是protocol.refer(serviceType, url)
這一段程式碼。
此時,我們上一篇文章留下的另一個坑也填上了,也就是DubboProtocol#refer
的調用時機。
獲取invoker集合
public List<Invoker<T>> doList(Invocation invocation) {
......
List<Invoker<T>> invokers = null;
// 獲取 Invoker 本地快取
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
// 獲取方法名和參數列表
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
// 檢測參數列表的第一個參數是否為 String 或 enum 類型
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
// 通過 方法名 + 第一個參數名稱 查詢 Invoker 列表,具體的使用場景暫時沒想到
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
}
if (invokers == null) {
// 通過方法名獲取 Invoker 列表
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
// 通過星號 * 獲取 Invoker 列表
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
// 冗餘邏輯,pull request #2861 移除了下面的 if 分支程式碼
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
// 返回 Invoker 列表
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
這裡的邏輯也很簡單,就是從類變數 methodInvokerMap 中獲取invoker,所有我們需要去看看 methodInvokerMap 的賦值。
我們在上一小節的 refreshInvoker 方法中,講了 invoker 的生成。refreshInvoker 方法中還有對methodInvokerMap 的處理。也就是 toMethodInvokers(newUrlInvokerMap)
方法
這裡面會將 url-invoker 的映射轉成 方法名-invoker 的映射。
總結
Dubbo的服務調用,需要通過服務目錄拿到 invoker 才能發起。當註冊中心發生變化時,服務目錄同樣需要動態調整,並刷新持有的 invoker 集合。服務目錄是 Dubbo 集群容錯的一部分,也是比較基礎的部分。
PS:以上講的不包含本地服務調用,別杠
參考資料