5.源码分析—SOFARPC调用服务

  • 2019 年 10 月 3 日
  • 筆記

我们这一次来接着上一篇文章《4. 源码分析—SOFARPC服务端暴露》讲一下服务暴露之后被客户端调用之后服务端是怎么返回数据的。

示例我们还是和上篇文章一样使用一样的bolt协议来讲:

    public static void main(String[] args) {          ServerConfig serverConfig = new ServerConfig()                  .setProtocol("bolt") // 设置一个协议,默认bolt                  .setPort(12200) // 设置一个端口,默认12200                  .setDaemon(false); // 非守护线程            ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()              .setInterfaceId(HelloService.class.getName()) // 指定接口              .setRef(new HelloServiceImpl()) // 指定实现              .setServer(serverConfig); // 指定服务端            providerConfig.export(); // 发布服务      }

在Bolt协议下面,当服务端被调用的时候一个服务的流程如下所示:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker

BoltServerProcessor#handleRequest

@Override  public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) {      // RPC内置上下文      RpcInternalContext context = RpcInternalContext.getContext();      context.setProviderSide(true);        String appName = request.getTargetAppName();      if (appName == null) {          // 默认全局appName          appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME);      }        // 是否链路异步化中      boolean isAsyncChain = false;      try { // 这个 try-finally 为了保证Context一定被清理          processingCount.incrementAndGet(); // 统计值加1            context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址          context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道            if (RpcInternalContext.isAttachmentEnable()) {              InvokeContext boltInvokeCtx = bizCtx.getInvokeContext();              if (boltInvokeCtx != null) {                  putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME,                      context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc线程池等待时间 Long              }          }          if (EventBus.isEnable(ServerReceiveEvent.class)) {              EventBus.post(new ServerReceiveEvent(request));          }            // 开始处理          SofaResponse response = null; // 响应,用于返回          Throwable throwable = null; // 异常,用于记录          ProviderConfig providerConfig = null;          String serviceName = request.getTargetServiceUniqueName();            try { // 这个try-catch 保证一定有Response              invoke:              {                  if (!boltServer.isStarted()) { // 服务端已关闭                      throwable = new SofaRpcException(RpcErrorType.SERVER_CLOSED, LogCodes.getLog(                          LogCodes.WARN_PROVIDER_STOPPED, SystemInfo.getLocalHost() + ":" +                              boltServer.serverConfig.getPort()));                      response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());                      break invoke;                  }                  if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的请求的逻辑                      throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress());                      break invoke;                  }                  // 查找服务                  //在server.registerProcessor方法中设置 ProviderProxyInvoker                  Invoker invoker = boltServer.findInvoker(serviceName);                  if (invoker == null) {                      throwable = cannotFoundService(appName, serviceName);                      response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());                      break invoke;                  }                  if (invoker instanceof ProviderProxyInvoker) {                      providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();                      // 找到服务后,打印服务的appName                      appName = providerConfig != null ? providerConfig.getAppName() : null;                  }                  // 查找方法                  String methodName = request.getMethodName();                  //在server.registerProcessor方法中设置                  Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName,                      request.getMethodArgSigs());                  if (serviceMethod == null) {                      throwable = cannotFoundServiceMethod(appName, methodName, serviceName);                      response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());                      break invoke;                  } else {                      request.setMethod(serviceMethod);                  }                    // 真正调用                  response = doInvoke(serviceName, invoker, request);                    if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的响应的逻辑                      throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress());                      break invoke;                  }              }          } catch (Exception e) {              // 服务端异常,不管是啥异常              LOGGER.errorWithApp(appName, "Server Processor Error!", e);              throwable = e;              response = MessageBuilder.buildSofaErrorResponse(e.getMessage());          }            // Response不为空,代表需要返回给客户端          if (response != null) {              RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();              isAsyncChain = CommonUtils.isTrue(invokeContext != null ?                  (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);              // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的              if (!isAsyncChain) {                  // 其它正常请求                  try { // 这个try-catch 保证一定要记录tracer                      asyncCtx.sendResponse(response);                  } finally {                      if (EventBus.isEnable(ServerSendEvent.class)) {                          EventBus.post(new ServerSendEvent(request, response, throwable));                      }                  }              }          }      } catch (Throwable e) {          // 可能有返回时的异常          if (LOGGER.isErrorEnabled(appName)) {              LOGGER.errorWithApp(appName, e.getMessage(), e);          }      } finally {          processingCount.decrementAndGet();          if (!isAsyncChain) {              if (EventBus.isEnable(ServerEndHandleEvent.class)) {                  EventBus.post(new ServerEndHandleEvent());              }          }          RpcInvokeContext.removeContext();          RpcInternalContext.removeAllContext();      }  }

这个方法主要做了如下几件事:

  1. 设置上下文参数
  2. 从缓存中得到服务暴露时设置的invoker
  3. 为request设置method参数
  4. 调用doInvoke返回response
  5. 将response返回给客户端

BoltServerProcessor#doInvoke

我们直接进入到doInvoke方法中,看是如何生成response对象的。

private SofaResponse doInvoke(String serviceName, Invoker invoker, SofaRequest request) throws SofaRpcException {      // 开始调用,先记下当前的ClassLoader      ClassLoader rpcCl = Thread.currentThread().getContextClassLoader();      try {          // 切换线程的ClassLoader到 服务 自己的ClassLoader          ClassLoader serviceCl = ReflectCache.getServiceClassLoader(serviceName);          Thread.currentThread().setContextClassLoader(serviceCl);          return invoker.invoke(request);      } finally {          Thread.currentThread().setContextClassLoader(rpcCl);      }  }

这里主要是为了获取缓存里面加载被暴露服务的类加载器,这样可以防止不同的类加载器之间一个类被加载多次。

然后调用过滤器链,最后进入到ProviderInvoker中

ProviderInvoker#invoke

@Override  public SofaResponse invoke(SofaRequest request) throws SofaRpcException {      SofaResponse sofaResponse = new SofaResponse();      long startTime = RpcRuntimeContext.now();      try {          // 反射 真正调用业务代码          Method method = request.getMethod();          if (method == null) {              throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");          }          Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());            sofaResponse.setAppResponse(result);      } catch (IllegalArgumentException e) { // 非法参数,可能是实现类和接口类不对应)          sofaResponse.setErrorMsg(e.getMessage());      } catch (IllegalAccessException e) { // 如果此 Method 对象强制执行 Java 语言访问控制,并且底层方法是不可访问的          sofaResponse.setErrorMsg(e.getMessage());      } catch (InvocationTargetException e) { // 业务代码抛出异常          cutCause(e.getCause());          sofaResponse.setAppResponse(e.getCause());      } finally {          if (RpcInternalContext.isAttachmentEnable()) {              long endTime = RpcRuntimeContext.now();              RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE,                  endTime - startTime);          }      }        return sofaResponse;  }

到最后我们发现,服务端会通过反射调用被暴露服务的方法,封装成Response类返回。

我们再次回到BoltServerProcessor#handleRequest方法中

....//忽略其他内容  // Response不为空,代表需要返回给客户端  if (response != null) {      RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();      isAsyncChain = CommonUtils.isTrue(invokeContext != null ?          (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);      // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的      if (!isAsyncChain) {          // 其它正常请求          try { // 这个try-catch 保证一定要记录tracer              asyncCtx.sendResponse(response);          } finally {              if (EventBus.isEnable(ServerSendEvent.class)) {                  EventBus.post(new ServerSendEvent(request, response, throwable));              }          }      }  }  ....//忽略其他内容

最后我们的response实例会使用netty传给客户端。