Dubbo源碼(九) – 服務調用過程

1. 前言

本文基於Dubbo2.6.x版本,中文注釋版源碼已上傳github:xiaoguyu/dubbo

源碼分析均基於官方Demo,路徑:dubbo/dubbo-demo

如果沒有看過之前Dubbo系列的文章,建議先去看看。因為服務調用過程涉及範圍較廣,需要那些前置知識。

Dubbo 服務調用過程比較複雜,包含眾多步驟,比如發送請求、編解碼、服務降級、過濾器鏈處理、序列化、執行緒派發以及響應請求等步驟。限於篇幅原因,本篇文章無法對所有的步驟一一進行分析。後續挖坑再說吧。本篇文章將會重點分析請求的發送與接收、執行緒派發以及響應的發送與接收等過程。

2. 源碼分析

先了解下 Dubbo 服務調用過程(圖片來自官方文檔)

Untitled

首先服務消費者通過代理對象 Proxy 發起遠程調用,接著通過網路客戶端 Client 將編碼後的請求發送給服務提供方的網路層上,也就是 Server。Server 在收到請求後,首先要做的事情是對數據包進行解碼。然後將解碼後的請求發送至分發器 Dispatcher,再由分發器將請求派發到指定的執行緒池上,最後由執行緒池調用具體的服務。這就是一個遠程調用請求的發送與接收過程。至於響應的發送與接收過程,這張圖中沒有表現出來。

2.1 服務調用方式

Dubbo 支援同步和非同步兩種調用方式,其中非同步調用還可細分為「有返回值」的非同步調用和「無返回值」的非同步調用。所謂「無返回值」非同步調用是指服務消費方只管調用,但不關心調用結果,此時 Dubbo 會直接返回一個空的 RpcResult。Dubbo 默認使用同步調用方式。

2.1.1 非同步調用案例

當有返回值非同步和無返回值非同步同時存在,無返回值非同步優先:

  • 有返回值非同步調用

    修改配置,將參數async設置為 true

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" async="true" />
    </dubbo:reference>
    

    程式碼使用如下

    String hello = demoService.sayHello("world");// 返回值為null,要注意
    Future<String> future = RpcContext.getContext().getFuture();
    ... // 業務執行緒可以開始做其他事情
    result = future.get();
    
  • 無返回值非同步調用

    修改配置,將參數return設置為 false

    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
        <dubbo:method name="sayHello" return="false" />
    </dubbo:reference>
    

    程式碼使用

    String hello = demoService.sayHello("world");// 返回值為null,要注意
    Future<String> future = RpcContext.getContext().getFuture();// future 為 null
    

下面,我們開始進入源碼分析。

2.1.2 InvokerInvocationHandler

當我們通過Spring注入服務介面時,實際上注入的是服務介面的實現類,這個實現類由Dubbo框架生成。請看 服務引用#創建代理對象

package org.apache.dubbo.common.bytecode;

public class proxy0 implements org.apache.dubbo.demo.DemoService {

    public static java.lang.reflect.Method[] methods;

    private java.lang.reflect.InvocationHandler handler;

    public proxy0() {
    }

    public proxy0(java.lang.reflect.InvocationHandler arg0) {
        handler = 1;
    }

    public java.lang.String sayHello(java.lang.String arg0) {
        Object[] args = new Object[1];
        args[0] = (w) $1;
        Object ret = handler.invoke(this, methods[0], args);
        return (java.lang.String) ret;
    }
}

也就是調用 demoService.sayHello 時,實際上是調用 handler.invoke ,而這個 handler 就是InvokerInvocationHandler

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();

        // 攔截定義在 Object 類中的方法(未被子類重寫),比如 wait/notify
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // 如果 toString、hashCode 和 equals 等方法被子類重寫了,這裡也直接調用
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 將 method 和 args 封裝到 RpcInvocation 中,並執行後續的調用
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

invoke 方法判斷如果是 java 內置的一下方法,則直接調用,不走 dubbo 的邏輯。所以我們關注的是 invoker.invoke() 。類變數 invoker 實際上是 FailoverClusterInvoker, 但是又被 MockClusterInvoker包裝了一層。這個 FailoverClusterInvoker 是由FailoverCluster生成的,請看 服務引用#遠程引用 。而 MockClusterInvoker 是由MockClusterWrapper生成,其基於Dubbo的SPI機制,將 FailoverCluster 又包裝了一遍。MockClusterInvoker內部封裝了服務降級邏輯。以後再開坑聊。

我們在 Dubbo集群 文章中講過FailoverClusterInvoker,所以直接快進到DubboInvoker#doInvoke()方法。此時是不是一臉懵逼,為啥從 FailoverClusterInvoker 一下子就到了 DubboInvoker ,我們先來看看調用棧

Untitled

我們把視角拉回FailoverClusterInvoker#doInvoke,看看通過負載均衡選出的 invoker

Untitled

從圖片可以看到,最外層的invoker是一個內部類,是 服務目錄通過訂閱註冊中心 生成的

invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);

而 protocol 實際是DubboProtocol,所以 protocol.refer(serviceType, url) 生成的是DubboInvoker,至於為啥調用鏈這麼長,是因為ProtocolFilterWrapper,這個類增加了對Dubbo過濾器的支援。這是一個 protocol 的包裝類,它包裝了DubboProtocol#refer() ,我們取看看 ProtocolFilterWrapper的源碼

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // 創建invoker鏈條
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 獲取過濾器
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            // 對invoker進行封裝,責任鏈模式
            last = new Invoker<T>() {
                ......

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

            };
        }
    }
    return last;
}

buildInvokerChain 方法將 invoker 轉換成責任鏈的形式,獲取的 filters 為 {ConsumerContextFilter,FutureFilter,MonitorFilter},和圖片中的調用棧就對應上了。

那麼還剩下ListenerInvokerWrapper,這是一個 Invoker 包裝類,由 ProtocolListenerWrapper 生成。

public class ProtocolListenerWrapper implements Protocol {

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        // 封裝了Invoker監聽器
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }

}

public class ListenerInvokerWrapper<T> implements Invoker<T> {

    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException("invoker == null");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            for (InvokerListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.referred(invoker);
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                    }
                }
            }
        }
    }

    @Override
    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                for (InvokerListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.destroyed(invoker);
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                        }
                    }
                }
            }
        }
    }

}

總結一下:

ProtocolFilterWrapper是 Invoker 過濾器的支援,dubbo的過濾器用的也是責任鏈模式ListenerInvokerWrapper是 Invoker 監聽器的支援

2.1.3 DubboInvoker

上面啰嗦了很多,終於回到主線 DubboInvoker 。它繼承自 AbstractInvoker ,invoke 方法在抽象父類中

public abstract class AbstractInvoker<T> implements Invoker<T> {
    @Override
    public Result invoke(Invocation inv) throws RpcException {
        ...

        RpcInvocation invocation = (RpcInvocation) inv;
        // 設置 Invoker
        invocation.setInvoker(this);
        if (attachment != null && attachment.size() > 0) {
            // 設置 attachment
            invocation.addAttachmentsIfAbsent(attachment);
        }
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            // 添加 contextAttachments 到 RpcInvocation#attachment 變數中
            invocation.addAttachments(contextAttachments);
        }
        if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
            // 設置非同步資訊到 RpcInvocation#attachment 中
            invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
        }
        // 添加調用id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        Byte serializationId = CodecSupport.getIDByName(getUrl().getParameter(SERIALIZATION_KEY, DEFAULT_REMOTING_SERIALIZATION));
        if (serializationId != null) {
            invocation.put(SERIALIZATION_ID_KEY, serializationId);
        }

        try {
            // 抽象方法,由子類實現
            return doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            ...
        } catch (RpcException e) {
            ...
        } catch (Throwable e) {
            return new RpcResult(e);
        }
    }
}

invoke 方法中,主要用於添加資訊到 RpcInvocation#attachment 中,給後續的邏輯使用。重點是 doInvoke 方法,這是一個抽象方法,由子類 DubboInvoker 實現。

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // 設置 path 和 version 到 attachment 中
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        // 從 clients 數組中獲取 ExchangeClient
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 獲取非同步配置
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // isOneway 為 true,表示「單向」通訊
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 非同步無返回值
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            // 發送請求
            currentClient.send(inv, isSent);
            // 設置上下文中的 future 欄位為 null
            RpcContext.getContext().setFuture(null);
            // 返回一個空的 RpcResult
            return new RpcResult();
        // 非同步有返回值
        } else if (isAsync) {
            // 發送請求,並得到一個 ResponseFuture 實例
            ResponseFuture future = currentClient.request(inv, timeout);
            // 設置 future 到上下文中
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            // 暫時返回一個空結果
            return new RpcResult();
        // 同步調用
        } else {
            RpcContext.getContext().setFuture(null);
            // 發送請求,得到一個 ResponseFuture 實例,並調用該實例的 get 方法進行等待
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

doInvoke 方法主要是對同步和非同步調用的邏輯處理。可以看到,在有返回值的情況下,同步和非同步都是通過 currentClient.request 來發送請求。區別在於,同步調用會使用 ResponseFuture#get 方法阻塞,知道請求完成,得到返回值。而非同步是將 ResponseFuture 放到上下文對象中,返回空結果。

FutureAdapter 是一個適配器,它實現了 jdk 內置的 Future 介面,將 ResponseFuture 轉換成 Future 的用法,更貼合用戶習慣。這裡我們的重點是ResponseFuture是如何支援非同步調用的,這個介面的默認實現是DefaultFuture

public class DefaultFuture implements ResponseFuture {
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();    

    // 構造方法
    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        // 獲取請求 id,這個 id 很重要,後面還會見到
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // 存儲 <requestId, DefaultFuture> 映射關係到 FUTURES 中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    // 阻塞等待並獲取請求結果
    @Override
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        // 檢測服務提供方是否成功返回了調用結果
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                // 循環檢測服務提供方是否成功返回了調用結果
                while (!isDone()) {
                    // 如果調用結果尚未返回,這裡等待一段時間,默認1000毫秒
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果調用結果成功返回,或等待超時,此時跳出 while 循環,執行後續的邏輯
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果調用結果仍未返回,則拋出超時異常
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    @Override
    public boolean isDone() {
        // 通過檢測 response 欄位為空與否,判斷是否收到了調用結果
        return response != null;
    }

    // 當請求有響應時,調用此方法
    public static void received(Channel channel, Response response) {
        try {
            // 根據調用編號從 FUTURES 集合中查找指定的 DefaultFuture 對象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // 這是請求超時,但是結果返回了的警告
                logger.warn("...");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 保存響應對象
            response = res;
            if (done != null) {
                // 喚醒用戶執行緒
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

}

上面對DefaultFuture做了部分程式碼精簡。get 方法阻塞等待返回值。而 received 方法則是在請求有相應時,保存響應對象並喚醒 get 方法中的循環。這裡是很典型的 future 結構的寫法,有疑問的同學可以去了解下 Java 的並發知識。

2.2 服務消費方發送請求

上節講了 Dubbo 的同步、非同步調用方式。本節來講講有返回值的情況下,Dubbo 消費方是如何發送請求的。

我們把實現拉回 DubboInvoker#doInvoke 方法中,其有返回值的請求方法為 currentClient.request(inv, timeout),currentClient 為ReferenceCountExchangeClient,我們看下面這張調用棧圖

Untitled

  • ReferenceCountExchangeClient:為 ExchangeClient 添加引用計數功能
  • HeaderExchangeClient:內部持有 client ,並封裝了心跳的功能

從 DubboInvoker 到 HeaderExchangeChannel,在 服務引用 文章就講過了,這裡不再贅述。下面直接看HeaderExchangeChannel 中的 request 方法

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    // 創建 Request 對象
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    // 設置雙向通訊標誌為 true
    req.setTwoWay(true);
    // 這裡的 request 變數類型為 RpcInvocation
    req.setData(request);

    // 創建 DefaultFuture 對象
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // 調用 NettyClient 的 send 方法發送請求(在父類AbstractPeer中)
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    // 返回 DefaultFuture 對象
    return future;
}

從上面的方法可以看到,將請求數據封裝成 Request 對象,傳遞給 DefaultFuture,再發送出去。Request 在構造方法中會創建請求id,用於在接收到響應時,確定是哪個請求的響應。繼續看請求的發送方法 channel.send(req),channel 是 NettyClient,結合類圖看調用路徑

Untitled

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    @Override
    public void send(Object message) throws RemotingException {
        send(message, url.getParameter(Constants.SENT_KEY, false));
    }
}

public abstract class AbstractClient extends AbstractEndpoint implements Client {
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        // 獲取 Channel,getChannel 是一個抽象方法,具體由子類實現
        Channel channel = getChannel();

        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        // 繼續向下調用
        channel.send(message, sent);
    }
}

這裡就兩個重點,獲取 channel 和 使用 channel 繼續往下調用。先看看如何獲取 channel

public class NettyClient extends AbstractClient {
    @Override
    protected com.alibaba.dubbo.remoting.Channel getChannel() {
        Channel c = channel;
        if (c == null || !c.isActive()) {
            return null;
        }
        // 獲取一個 NettyChannel 類型對象
        return NettyChannel.getOrAddChannel(c, getUrl(), this);
    }
}

final class NettyChannel extends AbstractChannel {
    // 私有構造方法
    private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
        super(url, handler);
        if (channel == null) {
            throw new IllegalArgumentException("netty channel == null;");
        }
        this.channel = channel;
    }

    static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
        if (ch == null) {
            return null;
        }

        // 嘗試從集合中獲取 NettyChannel 實例
        NettyChannel ret = channelMap.get(ch);
        if (ret == null) {
            // 如果 ret = null,則創建一個新的 NettyChannel 實例
            NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
            if (ch.isActive()) {
                ret = channelMap.putIfAbsent(ch, nettyChannel);
            }
            if (ret == null) {
                ret = nettyChannel;
            }
        }
        return ret;
    }
}

獲取 channel 的邏輯很簡單,從快取獲取 NettyChannel,沒有則創建。下面繼續看 channel.send(message, sent)

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // 發送消息(包含請求和響應消息)
        ChannelFuture future = channel.writeAndFlush(message);
        // sent 的值源於 <dubbo:method sent="true/false" /> 中 sent 的配置值,有兩種配置值:
        //   1. true: 等待消息發出,消息發送失敗將拋出異常
        //   2. false: 不等待消息發出,將消息放入 IO 隊列,即刻返回
        // 默認情況下 sent = false;
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 等待消息發出,若在規定時間沒能發出,success 會被置為 false
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    // 若 success 為 false,這裡拋出異常
    if (!success) {
        throw new RemotingException(this, "...");
    }
}

至此,請求數據的發送過程就結束了。涉及 Netty 的發送編解碼處理過程,感興趣的可以從 NettyClient#doOpen方法入手,這裡鑒於篇幅,就不寫了。

2.2.1 調用路徑

下面我們來總結一下消費端調用發送請求過程的調用棧(以 DemoService 為例)

proxy0#sayHello(String)
  —> InvokerInvocationHandler#invoke(Object, Method, Object[])
    —> MockClusterInvoker#invoke(Invocation)
      —> AbstractClusterInvoker#invoke(Invocation)
        —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance)
          —> Filter#invoke(Invoker, Invocation)  // 包含多個 Filter 調用
            —> ListenerInvokerWrapper#invoke(Invocation) 
              —> AbstractInvoker#invoke(Invocation) 
                —> DubboInvoker#doInvoke(Invocation)
                  —> ReferenceCountExchangeClient#request(Object, int)
                    —> HeaderExchangeClient#request(Object, int)
                      —> HeaderExchangeChannel#request(Object, int)
                        —> AbstractPeer#send(Object)
                          —> AbstractClient#send(Object, boolean)
                            —> NettyChannel#send(Object, boolean)
                              —> NioClientSocketChannel#write(Object)

2.3 服務提供方接收請求

默認情況下 Dubbo 使用 Netty 作為底層的通訊框架,從 NettyServer#doOpen 方法知道,接收請求的入口在 NettyServerHandler#channelRead,這裡已經是解碼之後得到的數據。然後數據依次經過 MultiMessageHandler、HeartbeatHandler 以及 AllChannelHandler 。至於為什麼是這幾個類以及順序,可以去看 NettyServer 的構造方法。下面我們首先看調用棧

Untitled

public class NettyServer extends AbstractServer implements Server {
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

public class ChannelHandlers {
    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }
}

public class ChannelHandlers {
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

MultiMessageHandler、HeartbeatHandler 直接通過構造方法創建,而 AllChannelHandler 則由 Dispatcher 的默認自適應拓展類 AllDispatcher 創建。

2.3.1 執行緒派發模型

剛才講到了 Dispatcher,這是一個執行緒派發器。讓我們回顧一下 Dubbo 服務調用過程圖(圖片來自官方文檔)

Untitled

Dubbo 將底層通訊框架中接收請求的執行緒稱為 IO 執行緒。如果一些事件處理邏輯可以很快執行完,此時直接在 IO 執行緒上執行該段邏輯即可。但如果事件的處理邏輯比較耗時,比如該段邏輯會發起資料庫查詢或者 HTTP 請求。此時我們就不應該讓事件處理邏輯在 IO 執行緒上執行,而是應該派發到執行緒池中去執行。原因也很簡單,IO 執行緒主要用於接收請求,如果 IO 執行緒被佔滿,將導致它不能接收新的請求。PS:像不像Netty的主從模型,萬物殊途同歸啊。

Dispatcher 真實的職責創建具有執行緒派發能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身並不具備執行緒派發能力。Dubbo 支援 5 種不同的執行緒派發策略

策略 用途
all 所有消息都派發到執行緒池,包括請求,響應,連接事件,斷開事件等
direct 所有消息都不派發到執行緒池,全部在 IO 執行緒上直接執行
message 只有請求和響應消息派發到執行緒池,其它消息均在 IO 執行緒上執行
execution 只有請求消息派發到執行緒池,不含響應。其它消息均在 IO 執行緒上執行
connection 在 IO 執行緒上,將連接斷開事件放入隊列,有序逐個執行,其它消息派發到執行緒池

下面我們看看默認的 AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {
    /** 處理請求和響應消息,這裡的 message 變數類型可能是 Request,也可能是 Response */
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // 獲取執行緒池,由自適應拓展生成,默認由 FixedThreadPool 生成
        ExecutorService cexecutor = getExecutorService();
        try {
            // 將請求和響應消息派發到執行緒池中處理
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
            // 如果通訊方式為雙向通訊,此時將 Server side ... threadpool is exhausted
            // 錯誤資訊封裝到 Response 中,並返回給服務消費方。
        		if(request.isTwoWay()){
        			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
        			Response response = new Response(request.getId(), request.getVersion());
        			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
        			response.setErrorMessage(msg);
              // 返回包含錯誤資訊的 Response 對象
        			channel.send(response);
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

請求對象會被封裝 ChannelEventRunnable 中,也就是 ChannelEventRunnable#run 方法才是實際處理請求的地方。

2.3.2 調用服務

public class ChannelEventRunnable implements Runnable {
    @Override
    public void run() {
        // 檢測通道狀態,對於請求或響應消息,此時 state = RECEIVED
        if (state == ChannelState.RECEIVED) {
            try {
                // 將 channel 和 message 傳給 ChannelHandler 對象,進行後續的調用
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("...", e);
            }
        // 其他消息類型通過 switch 進行處理
        } else {
            switch (state) {
            case CONNECTED:
                ...
            case DISCONNECTED:
                ...
            case SENT:
                ...
            case CAUGHT:
                ...
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }
    }
}

ChannelEventRunnable 依然不進行調用邏輯,只是根據通道的狀態將請求轉發。可以注意一下,這裡特意對 RECEIVED 狀態用了 if 判斷,然後其它狀態使用 switch 來判斷,是因為絕大部分的請求都是 RECEIVED 類型。

這裡的 handler 是 DecodeHandler,這是一個解碼處理器。也許你會以為,這個是不是和 InternalDecoder衝突了?既然解碼操作已經在 IO 執行緒(也就是 Netty 的 WorkerGroup)中處理了,為什麼到 Dubbo 執行緒池中,還要再處理一次?這取決於 decode.in.io 參數,允許將部分解碼工作交由 Dubbo 執行緒池中完成。下面我們略過 DecodeHandler,快進到 HeaderExchangeHandler 中

public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求對象
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    // 處理事件
                    handlerEvent(channel, request);
                // 處理普通的請求
                } else {
                    // 雙向通訊
                    if (request.isTwoWay()) {
                        // 向後調用服務,並得到調用結果
                        Response response = handleRequest(exchangeChannel, request);
                        // 將調用結果返回給服務消費端
                        channel.send(response);
                    } else {
                        // 如果是單向通訊,僅向後調用指定服務即可,無需返回調用結果
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            // 處理響應對象,服務消費方會執行此處邏輯
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                // telnet 相關
                ...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    // 處理請求
    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        // 檢測請求是否合法,不合法則返回狀態碼為 BAD_REQUEST 的響應
        if (req.isBroken()) {
            ...
            return res;
        }
        // 獲取 data 欄位值,也就是 RpcInvocation 對象
        Object msg = req.getData();
        try {
            // handle data.
            // 繼續向下調用
            Object result = handler.reply(channel, msg);
            // 設置 OK 狀態碼
            res.setStatus(Response.OK);
            // 設置調用結果
            res.setResult(result);
        } catch (Throwable e) {
            // 若調用過程出現異常,則設置 SERVICE_ERROR,表示服務端異常
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
}

處理過程注釋中已經寫了。通過 handleRequest 方法處理請求得到返回值,並通過 channel.send 將結果返回給消費者。(碎碎念:這個 channel 和 Netty 的是真的像)

handleRequest 方法中主要是對 Response 對象的處理,我們繼續跟進調用過程 handler.reply(channel, msg),這個 handler 是 DubboProtocol的類變數requestHandler

public class DubboProtocol extends AbstractProtocol {

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                // 獲取 Invoker 實例
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                // 回調相關
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    ...
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 通過 Invoker 調用具體的服務
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "...");
        }

        ...
    };

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        // 計算 service key,格式為 groupName/serviceName:serviceVersion:port。比如:
        //   dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

        // 從 exporterMap 查找與 serviceKey 相對應的 DubboExporter 對象,
        // 服務導出過程中會將 <serviceKey, DubboExporter> 映射關係存儲到 exporterMap 集合中
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);

        // 獲取 Invoker 對象,並返回
        return exporter.getInvoker();
    }
}

reply 方法先是獲取 Invoker 實例,然後通過 Invoker 調用具體的服務。想了解 Invoker 的創建以及如何放入到 exporterMap 中的,可以看以前寫過的 服務導出 文章。下面這段在 服務導出 文章中均有提過,不想看的可以直接跳到本節末尾看調用路徑。

invoke 方法定義在 AbstractProxyInvoker 中

public abstract class AbstractProxyInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            // 調用 doInvoke 執行後續的調用,並將調用結果封裝到 RpcResult 中,並
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method ...");
        }
    }
    
    protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable;
}

doInvoke 是一個抽象方法,這個需要由具體的 Invoker 實例實現。Invoker 實例是在運行時通過 JavassistProxyFactory 創建的

public class JavassistProxyFactory extends AbstractProxyFactory {
    
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        // 創建匿名類對象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 調用 invokeMethod 方法進行後續的調用
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Wrapper 是一個抽象類,其中 invokeMethod 是一個抽象方法。Dubbo 會在運行時通過 Javassist 框架為 Wrapper 生成實現類,並實現 invokeMethod 方法,該方法最終會根據調用資訊調用具體的服務。以 DemoServiceImpl 為例,Javassist 為其生成的代理類如下。

public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
    // 省略其他方法

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            // 類型轉換
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            // 根據方法名調用指定的方法
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
    }
}

至此,服務端調用服務的過程就講完了。

2.3.3 調用路徑

下面我們來總結一下服務端調用服務過程的調用棧(以 DemoService 為例)

// 這是IO執行緒的調用過程
NettyServerHandler#channelRead(ChannelHandlerContext, Object)
  —> AbstractPeer#received(Channel, Object)
    —> MultiMessageHandler#received(Channel, Object)
      —> HeartbeatHandler#received(Channel, Object)
        —> AllChannelHandler#received(Channel, Object)
// 這是轉發到執行緒池之後的調用過程
ChannelEventRunnable#run()
  —> DecodeHandler#received(Channel, Object)
    —> HeaderExchangeHandler#received(Channel, Object)
      —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
        —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
          —> Filter#invoke(Invoker, Invocation)
            —> AbstractProxyInvoker#invoke(Invocation)
              —> Wrapper0#invokeMethod(Object, String, Class[], Object[])
                —> DemoServiceImpl#sayHello(String)

2.4 服務提供方返回調用結果

在 2.3.2 節中講了,調用結果會封裝在 Response 對象中,並由NettyChannel 的 send 方法將 Response 對象返回。詳情請看 HeaderExchangeHandler。至於返回 Response 過程中的編碼過程,我們省略。

2.5 服務消費方接收調用結果

消費者接收響應數據的處理過程中,從 NettyHandler (消費者是 NettyClientHandler,生產者是 NettyServerHandler,不過他們的 channelRead 方法一模一樣) 到 AllChannelHandler 的處理過程與服務提供方接收請求(2.3節)的處理過程一致,就不重複分析了。所以本節重點在 Dubbo如何將調用結果傳遞給用戶執行緒。

2.5.1 向用戶執行緒傳遞調用結果

我們直接快進到 HeaderExchangeHandler 的 received 方法中(調用路徑請看 2.3.2 節末尾)

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            // 處理請求對象
            if (message instanceof Request) {
                ...
            // 處理響應對象,服務消費方會執行此處邏輯
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                ...
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
}

可以看到,是調用 DefaultFuture#receive 方法處理的,DefaultFuture 對象我們在 2.1.3 節有講到,繼續追蹤程式碼

public class DefaultFuture implements ResponseFuture {

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    public static void received(Channel channel, Response response) {
        try {
            // 根據調用編號從 FUTURES 集合中查找指定的 DefaultFuture 對象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                // 這是請求超時,但是結果返回了的警告
                logger.warn("...");
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    private void doReceived(Response res) {
        lock.lock();
        try {
            // 保存響應對象
            response = res;
            if (done != null) {
                // 喚醒用戶執行緒
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
}

在一次調用過程中,請求和相應的編號是一致的,所以可以根據調用編號從 FUTURES 中得到發起請求時創建的 DefaultFuture 。DefaultFuture.get 方法阻塞等待響應結果,而 DefaultFuture#received 是得到響應結果之後喚醒用戶執行緒(也就是 get 方法中的循環)。這兩個方法結合起來看就明白了。

3. 總結

沒啥好總結的,Dubbo 系列就寫完了。閱讀優秀框架的源碼從大的方面可以學習其思想以及架構,小的方面就是一個個小功能的寫法,比如負載均衡演算法、DefaultFuture、SPI 等等。

PS:總感覺 Dubbo 和 Netty 的執行緒模型殊途同歸


參考資料

Dubbo開發指南