dubbo源码解析——服务调用过程

  • 2020 年 1 月 16 日
  • 筆記

本文中,将进入消费端源码解析(具体逻辑会放到代码的注释中)。本文先是对消费过程的总体代码逻辑理一遍,个别需要细讲的点,后面会专门的文章进行解析。

首先,把完整的流程给出来:

服务消费方发送请求

  • 应用启动的时候,消费者会订阅服务,并且拉取所有订阅的提供者节点信息到Directory中
  • 正式调用开始,在Directory中找出本次集群中的全部invokers
  • 在Router中,将上一步的全部invokers进行筛选,得到满足条件的invokers
  • 利用cluster集群容错来调用invoker
  • 在LoadBalance中,根据负载均衡策略,挑选出需要执行的invoker
  • 执行消费者filter链
  • 通过Exchange封装请求-响应对象
  • 对请求对象进行编码(序列化),通过网络框架(如Netty)发送数据

服务提供方接收请求

  • 对请求进行解码(反序列化)
  • 将这个请求对象封装成runable对象,由派发器决定这个请求由IO线程 或 业务线程池执行
    • 请求解码可在 IO 线程上执行,也可在线程池中执行,这个取决于运行时配置。
  • 获取invoker实例,执行invoke方法
  • 执行提供者filter链
  • invoker反射调用真正的实现类方法
  • 将结果进行编码
  • 服务提供方返回对应的结果

服务消费方接收调用结果

  • 对响应数据进行解码
  • 向用户线程传递调用结果
    • 响应数据解码完成后,Dubbo 会将响应对象派发到线程池上(默认)
    • 然后将响应对象从线程池线程传递到用户线程上。
    • 当响应对象到来后,用户线程会被唤醒,并通过调用编号获取属于自己的响应对象。

源码解析

开头进入InvokerInvocationHandler

通过实现InvocationHandler,我们知道dubbo生成代理使用的是JDK动态代理。这个类中主要是对特殊方法进行处理。由于在生成代理实例的时候,在构造函数中赋值了invoker,因此可以只用该invoker进行invoke方法的调用。

/**   * dubbo使用JDK动态代理,对接口对象进行注入   * InvokerHandler   *   * 程序启动的过程中,在构造函数中,赋值下一个需要调用的invoker,从而形成执行链   */  public class InvokerInvocationHandler implements InvocationHandler {        private final Invoker<?> invoker;        public InvokerInvocationHandler(Invoker<?> handler) {          this.invoker = handler;      }        @Override      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {          // 获取方法名称          String methodName = method.getName();          // 获取参数类型          Class<?>[] parameterTypes = method.getParameterTypes();          // 方法所处的类 是 Object类,则直接调用          if (method.getDeclaringClass() == Object.class) {              return method.invoke(invoker, args);          }            /*           * toString、hashCode、equals方法比较特殊,如果interface里面定义了这几个方法,并且进行实现,           * 通过dubbo远程调用是不会执行这些代码实现的。           */            /*           * 方法调用是toString,依次执行MockClusterInvoker、AbstractClusterInvoker的toString方法           */          if ("toString".equals(methodName) && parameterTypes.length == 0) {              return invoker.toString();          }            /*           * interface中含有hashCode方法,直接调用invoker的hashCode           */          if ("hashCode".equals(methodName) && parameterTypes.length == 0) {              return invoker.hashCode();          }            /*           * interface中含有equals方法,直接调用invoker的equals           */          if ("equals".equals(methodName) && parameterTypes.length == 1) {              return invoker.equals(args[0]);          }            /*           * invocationv包含了远程调用的参数、方法信息           */          RpcInvocation invocation;            /*           * todo这段代码在最新的dubbo版本中没有           */          if (RpcUtils.hasGeneratedFuture(method)) {              Class<?> clazz = method.getDeclaringClass();              String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length());              Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes());              invocation = new RpcInvocation(syncMethod, args);              invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true");              invocation.setAttachment(Constants.ASYNC_KEY, "true");          } else {              invocation = new RpcInvocation(method, args);              if (RpcUtils.hasFutureReturnType(method)) {                  invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");                  invocation.setAttachment(Constants.ASYNC_KEY, "true");              }          }            // 继续invoker链式调用          return invoker.invoke(invocation).recreate();      }      }

进入MockClusterInvoker

这段代码主要是判断是否需要进行mock调用

@Override      public Result invoke(Invocation invocation) throws RpcException {          Result result = null;            // 获取mock参数,从而判断是否需要mock          String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();            if (value.length() == 0 || value.equalsIgnoreCase("false")) {              // 不需要mock,继续往下调用              result = this.invoker.invoke(invocation);          } else if (value.startsWith("force")) {              if (logger.isWarnEnabled()) {                  logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());              }              // 选择mock的invoker              result = doMockInvoke(invocation, null);          } else {              // 正常调用失败,则调用mock              try {                  result = this.invoker.invoke(invocation);              } catch (RpcException e) {                  if (e.isBiz()) {                      throw e;                  } else {                      if (logger.isWarnEnabled()) {                          logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);                      }                      result = doMockInvoke(invocation, e);                  }              }          }          return result;      }

进入AbstractClusterInvoker

进入这段代码,表明开始进入到集群

@Override      public Result invoke(final Invocation invocation) throws RpcException {          // 检查消费端invoker是否销毁了          checkWhetherDestroyed();            // 将参数绑定到invocation          Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();          if (contextAttachments != null && contextAttachments.size() != 0) {              ((RpcInvocation) invocation).addAttachments(contextAttachments);          }            // 获取满足条件的invoker(从Directory获取,并且经过router过滤)          List<Invoker<T>> invokers = list(invocation);            // 初始化loadBalance          LoadBalance loadbalance = initLoadBalance(invokers, invocation);            // invocation ID将被添加在异步操作          RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);            return doInvoke(invocation, invokers, loadbalance);      }

进入AbstractDirectory

@Override      public List<Invoker<T>> list(Invocation invocation) throws RpcException {          // 判断Directory是否销毁          if (destroyed) {              throw new RpcException("Directory already destroyed .url: " + getUrl());          }          // 从methodInvokerMap中取出满足条件的invoker          List<Invoker<T>> invokers = doList(invocation);            // 根据路由列表,筛选出满足条件的invoker          List<Router> localRouters = this.routers; // local reference          if (localRouters != null && !localRouters.isEmpty()) {              for (Router router : localRouters) {                  try {                      if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {                          invokers = router.route(invokers, getConsumerUrl(), invocation);                      }                  } catch (Throwable t) {                      logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);                  }              }          }          return invokers;      }

这里主要是从Directory中获取invoker,并且经过router路由的筛选,获得满足条件的invoker。 在AbstractDirectory中,有一个关键的方法com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#doList,这是一个抽象方法,子类RegistryDirectory的有具体实现,并且调用RegistryDirectory的doList方法。(这里应该是用到了模板方法模式)。后面的文字中会详细讲下doList方法中做了啥。

进入FailoverClusterInvoker

经过从Directory中获取invoker,然后router筛选出满足条件的invoker之后,进入到FailoverClusterInvoker。为什么会到这里呢?

根据官网的描述: 在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。 所以这个时候是到了FailoverClusterInvoker类,但是如果你配置的是Failfast Cluster(快速失败),Failsafe Cluster(失败安全),Failback Cluster(失败自动恢复),Forking Cluster(并行调用多个服务器,只要一个成功即返回),Broadcast Cluster(广播调用所有提供者,逐个调用,任意一台报错则报错),他也会到达相应的类。

@Override      @SuppressWarnings({"unchecked", "rawtypes"})      public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {          // 局部引用          List<Invoker<T>> copyinvokers = invokers;            // 参数校验(这种封装方法我在工作中借鉴,个人感觉比较好)          checkInvokers(copyinvokers, invocation);            // 获取方法名称          String methodName = RpcUtils.getMethodName(invocation);            // 获取重试次数          int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;          if (len <= 0) {              // 最少要调用1次              len = 1;          }            // 局部引用          RpcException le = null;          List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.          Set<String> providers = new HashSet<String>(len);            // i < len 作为循环条件,说明len是多少就循环多少次          for (int i = 0; i < len; i++) {              // 在重试之前,需要重新选择,以避免候选invoker的改变              if (i > 0) {                  // 检查invoker是否被销毁                  checkWhetherDestroyed();                  // 重新选择invoker                  copyinvokers = list(invocation);                  // 参数检查                  checkInvokers(copyinvokers, invocation);              }                /*               * 这一步就是进入loadBalance负载均衡               * 因为上述步骤可能筛选出invoker数量大于1,所以再次经过loadBalance的筛选               */              Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);              invoked.add(invoker);                RpcContext.getContext().setInvokers((List) invoked);              try {                  // 远程方法调用                  Result result = invoker.invoke(invocation);                  if (le != null && logger.isWarnEnabled()) {                      logger.warn("Although retry the method " + methodName                              + " in the service " + getInterface().getName()                              + " was successful by the provider " + invoker.getUrl().getAddress()                              + ", but there have been failed providers " + providers                              + " (" + providers.size() + "/" + copyinvokers.size()                              + ") from the registry " + directory.getUrl().getAddress()                              + " on the consumer " + NetUtils.getLocalHost()                              + " using the dubbo version " + Version.getVersion() + ". Last error is: "                              + le.getMessage(), le);                  }                  return result;              } catch (RpcException e) {                  if (e.isBiz()) { // biz exception.                      throw e;                  }                  le = e;              } catch (Throwable e) {                  le = new RpcException(e.getMessage(), e);              } finally {                  providers.add(invoker.getUrl().getAddress());              }          }          throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "                  + methodName + " in the service " + getInterface().getName()                  + ". Tried " + len + " times of the providers " + providers                  + " (" + providers.size() + "/" + copyinvokers.size()                  + ") from the registry " + directory.getUrl().getAddress()                  + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "                  + Version.getVersion() + ". Last error is: "                  + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);      }

到达终点站.我们回忆总结一下,文初提到的三个关键词,在这个集群容错的整体架构过程中,dubbo究竟做了什么.其实也就是三件事

(1)在Directory中找出本次集群中的全部invokers (2)在Router中,将上一步的全部invokers挑选出能正常执行的invokers (3)在LoadBalance中,将上一步的能正常的执行invokers中,根据配置的负载均衡策略,挑选出需要执行的invoker

然后就是编码、网络传输、提供方处理数据的过程。后面会补充。