Dubbo源碼(五) – 服務目錄

前言

本文基於Dubbo2.6.x版本,中文注釋版源碼已上傳github:xiaoguyu/dubbo

今天,來聊聊Dubbo的服務目錄(Directory)。下面是官方文檔對服務目錄的定義:

服務目錄中存儲了一些和服務提供者有關的資訊,通過服務目錄,服務消費者可獲取到服務提供者的資訊,比如 ip、埠、服務協議等。

服務目錄持有Invoker對象集合,Dubbo的服務調用均由Invoker發起。

當服務提供者資訊發生變化時(比如某一個服務掛了),服務目錄也需要動態調整。

繼承體系

Untitled

服務目錄目前內置的實現有兩個,分別為 StaticDirectory 和 RegistryDirectory。它們均繼承自AbstractDirectory,而 AbstractDirectory 實現了 Directory 介面。Directory 介面提供了list(Invocation invocation) 方法,這個方法就是用來獲取 invoker 集合的。

再看 RegistryDirectory 實現了 NotifyListener 介面,這個介面中只有一個方法,notify(List urls),當註冊中心節點資訊發生變化後,觸發此方法調整服務目錄中的配置資訊以及 invoker 集合。

源碼分析

上面我們講了,服務調用需求用到 invoker,而服務目錄持有 invoker 集合,並通過 list 方法提供 invoker。下面放上服務消費者Demo中DemoService#sayHello 方法的調用路徑

Untitled

AbstractDirectory 實現了 Directory 介面的 list 方法

public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 調用 doList 方法列舉 Invoker,doList 是模板方法,由子類實現
    List<Invoker<T>> invokers = doList(invocation);
    // 獲取路由 Router 列表
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                // 獲取 runtime 參數,並根據參數決定是否進行路由
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    // 進行服務路由
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

此方法就兩段邏輯:

  1. 通過 doList 獲取 invoker 集合
  2. 通過路由選擇合適的 invoker

路由非本文重點,略過。

doList 是模板方法,由子類實現。

StaticDirectory

StaticDirectory 是一個靜態服務目錄,其 invokers 集合通過構造方法注入,不應被改變。

// StaticDirectory的doList啥都沒做,直接返回持有的invokers
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    // 列舉 Inovker,也就是直接返回 invokers 成員變數
    return invokers;
}

StaticDirectory 的其它方法就不分析了,同樣很簡單。

RegistryDirectory

RegistryDirectory 是動態調整的服務目錄,其持有的 invokers 有內部方法生成。

訂閱節點

在上篇博文《Dubbo源碼(四) – 服務引用(消費者)》中,我留了一個坑,也就是服務引用過程中,創建了註冊中心之後,如何訂閱節點數據。在RegistryProtocol#doRefer方法中。

其中調用了RegistryDirectory#subscribe(URL url)方法

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

我們用的註冊中心是 zookeeper,所以 registry 是 ZookeeperRegistry,而 subscribe 方法的實現在其父類FailbackRegistry

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        ......

        // 訂閱失敗處理
        addFailedSubscribed(url, listener);
    }
}

模板方法,調用子類的 doSubscribe 方法

protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            ...
        } else {
            List<URL> urls = new ArrayList<URL>();
            // 切割路徑(providers、configurators、routers等)
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                // 快取操作,獲取節點監聽器
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // 這裡和方法末尾的 notify(url, listener, urls); 是調用的同一個方法
                            // 節點變更時觸發變更操作
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                // 註冊節點監聽器
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 觸發節點變更操作
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

訂閱方法做了3個操作:

  1. 切割url,拆分訂閱路徑
  2. 創建節點監聽器
  3. 觸發節點變更操作

這裡注意下,訂閱時節點數據並沒有發生變更,所以需要手動觸發 notify 方法。

下面繼續看節點變更操作做了什麼,調用路徑有點深,就不一步一步調試了,直接把路徑寫在注釋上。

// FailbackRegistry#notify(URL url, NotifyListener listener, List<URL> urls) ->
// FailbackRegistry#doNotify(URL url, NotifyListener listener, List<URL> urls) ->
// AbstractRegistry#notify(URL url, NotifyListener listener, List<URL> urls)
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    ......
    Map<String, List<URL>> result = new HashMap<String, List<URL>>();
    // 將urls按分類分組轉成map
    ......
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}

此處的listener變數,就是本節的主角RegistryDirectory,下面來分析 listener.notify(categoryList)

public synchronized void notify(List<URL> urls) {
    // 定義三個集合,分別用於存放服務提供者 url,路由 url,配置器 url
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    // 根據 category 參數分別對3種url進行處理
    ......
    // 刷新 Invoker 列表
    refreshInvoker(invokerUrls);
}

此方法分別對服務提供者 url,路由 url,配置器 url各自進行了處理,這裡我省略了對路由 url 和配置器 url 的處理,感興趣的自行去看源碼。咱們聚焦在 Invoker 的處理中

private void refreshInvoker(List<URL> invokerUrls) {
    // invokerUrls 僅有一個元素,且 url 協議頭為 empty,此時表示禁用所有服務
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // 設置 forbidden 為 true
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        // 銷毀所有 Invoker
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            // 添加快取 url 到 invokerUrls 中
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            // 快取 invokerUrls
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 將 url 轉成 Invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // 將 newUrlInvokerMap 轉成方法名到 Invoker 列表的映射
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed.
        // 轉換出錯,直接列印異常,並返回
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        // 合併多個組的 Invoker
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            // 銷毀無用 Invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

此方法中的邏輯有點多,

  1. 判斷是否要銷毀所有 invoker
  2. 創建 invoker
  3. 處理映射
  4. 銷毀無用 invoker

我們關注下 invoker 的創建,toInvokers(invokerUrls)

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    ......
    // 獲取服務消費端配置的協議
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        ......
        // 將本地 Invoker 快取賦值給 localUrlInvokerMap
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    // 獲取 disable 配置,取反,然後賦值給 enable 變數
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    // 獲取 enable 配置,並賦值給 enable 變數
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    // 調用 refer 獲取 Invoker
                    invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                // 快取 Invoker 實例
                newUrlInvokerMap.put(key, invoker);
            }
            // 快取命中
        } else {
            // 將 invoker 存儲到 newUrlInvokerMap 中
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

這裡的判斷有點複雜,會對協議各種判斷(是否支援、是否為empty)等,然後如果快取未命中,則需要創建invoker,也就是protocol.refer(serviceType, url)這一段程式碼。

此時,我們上一篇文章留下的另一個坑也填上了,也就是DubboProtocol#refer的調用時機。

獲取invoker集合

public List<Invoker<T>> doList(Invocation invocation) {
    ......
    List<Invoker<T>> invokers = null;
    // 獲取 Invoker 本地快取
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        // 獲取方法名和參數列表
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        // 檢測參數列表的第一個參數是否為 String 或 enum 類型
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            // 通過 方法名 + 第一個參數名稱 查詢 Invoker 列表,具體的使用場景暫時沒想到
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
        }
        if (invokers == null) {
            // 通過方法名獲取 Invoker 列表
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            // 通過星號 * 獲取 Invoker 列表
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }

        // 冗餘邏輯,pull request #2861 移除了下面的 if 分支程式碼
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    // 返回 Invoker 列表
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

這裡的邏輯也很簡單,就是從類變數 methodInvokerMap 中獲取invoker,所有我們需要去看看 methodInvokerMap 的賦值。

我們在上一小節的 refreshInvoker 方法中,講了 invoker 的生成。refreshInvoker 方法中還有對methodInvokerMap 的處理。也就是 toMethodInvokers(newUrlInvokerMap) 方法

這裡面會將 url-invoker 的映射轉成 方法名-invoker 的映射。

總結

Dubbo的服務調用,需要通過服務目錄拿到 invoker 才能發起。當註冊中心發生變化時,服務目錄同樣需要動態調整,並刷新持有的 invoker 集合。服務目錄是 Dubbo 集群容錯的一部分,也是比較基礎的部分。

PS:以上講的不包含本地服務調用,別杠


參考資料

Dubbo開發指南

Tags: