11.源码分析—SOFARPC数据透传是实现的?

  • 2019 年 10 月 3 日
  • 筆記

SOFARPC源码解析系列:

1. 源码分析—SOFARPC可扩展的机制SPI

2. 源码分析—SOFARPC客户端服务引用

3. 源码分析—SOFARPC客户端服务调用

4. 源码分析—SOFARPC服务端暴露

5.源码分析—SOFARPC调用服务

6.源码分析—和dubbo相比SOFARPC是如何实现负载均衡的?

7.源码分析—SOFARPC是如何实现连接管理与心跳?

8.源码分析—从设计模式中看SOFARPC中的EventBus?

9.源码分析—SOFARPC是如何实现故障剔除的?

10.源码分析—SOFARPC内置链路追踪SOFATRACER是怎么做的?


先把栗子放上,让大家方便测试用:
Service端

public static void main(String[] args) {      ServerConfig serverConfig = new ServerConfig()          .setProtocol("bolt") // 设置一个协议,默认bolt          .setPort(12200) // 设置一个端口,默认12200          .setDaemon(false); // 非守护线程        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()          .setInterfaceId(HelloService.class.getName()) // 指定接口          .setRef(new HelloServiceImpl()) // 指定实现          .setServer(serverConfig); // 指定服务端        providerConfig.export(); // 发布服务  }    public class HelloServiceImpl implements HelloService {        private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class);        @Override      public String sayHello(String string) {          LOGGER.info("Server receive: " + string);            // 获取请求透传数据并打印          System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag"));          // 设置响应透传数据到当前线程的上下文中          RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c");            return "hello " + string + " !";      }  }  

client端

public static void main(String[] args) {      ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()          .setInterfaceId(HelloService.class.getName()) // 指定接口          .setProtocol("bolt") // 指定协议          .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址          .setConnectTimeout(10 * 1000);        RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb");        HelloService helloService = consumerConfig.refer();        while (true) {          System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag"));          try {              LOGGER.info(helloService.sayHello("world"));          } catch (Exception e) {              e.printStackTrace();          }            try {              Thread.sleep(2000);          } catch (InterruptedException e) {              e.printStackTrace();          }        }  }  

通过上面的栗子我们可以看出整个流程应该是:

  1. 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端
  2. 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中
  3. 客户端收到透传数据

所以下面我们从客户端开始源码讲解。

客户端数据透传给服务端

首先客户端在引用之前要设置putRequestBaggage,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。

如下:
ClientProxyInvoker#invoke

public SofaResponse invoke(SofaRequest request) throws SofaRpcException {      ....          // 包装请求          decorateRequest(request);         ....  }  

通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。

DefaultClientProxyInvoker#decorateRequest

protected void decorateRequest(SofaRequest request) {      ....      RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();      RpcInternalContext internalContext = RpcInternalContext.getContext();      if (invokeCtx != null) {         ....          // 如果用户指定了透传数据          if (RpcInvokeContext.isBaggageEnable()) {              // 需要透传              BaggageResolver.carryWithRequest(invokeCtx, request);              internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);          }      }      ....  } 

在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面

BaggageResolver#carryWithRequest

public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {      if (context != null) {            //获取所有的透传数据          Map<String, String> requestBaggage = context.getAllRequestBaggage();          if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传              request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);          }      }  }  

这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。

服务端接受透传数据

服务端的调用流程如下:

BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker 

所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情:

ProviderBaggageFilter#invoke

public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {      SofaResponse response = null;      try {          //从request中获取透传数据存入到requestBaggage中          BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);          response = invoker.invoke(request);      } finally {          if (response != null) {              BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);          }      }      return response;  }  

ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest从request中获取数据

BaggageResolver#pickupFromRequest

public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {      if (context == null && !init) {          return;      }      // 解析请求      Map<String, String> requestBaggage = (Map<String, String>) request          .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);      if (CommonUtils.isNotEmpty(requestBaggage)) {          if (context == null) {              context = RpcInvokeContext.getContext();          }          context.putAllRequestBaggage(requestBaggage);      }  }

最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse把响应透传数据回写到response里面。

public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {      if (context != null) {          Map<String, String> responseBaggage = context.getAllResponseBaggage();          if (CommonUtils.isNotEmpty(responseBaggage)) {              String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";              for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {                  response.addResponseProp(prefix + entry.getKey(), entry.getValue());              }          }      }  }

客户端收到响应透传数据

最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。

public SofaResponse invoke(SofaRequest request) throws SofaRpcException {          ....       // 包装响应       decorateResponse(response);          ....  }  

decorateResponse是在子类DefaultClientProxyInvoker实现的:

DefaultClientProxyInvoker#decorateResponse

protected void decorateResponse(SofaResponse response) {     ....      //如果开启了透传      if (RpcInvokeContext.isBaggageEnable()) {          BaggageResolver.pickupFromResponse(invokeCtx, response, true);      }     ....  }  

这个方法里面会调用BaggageResolver#pickupFromResponse

public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {      if (context == null && !init) {          return;      }      Map<String, String> responseBaggage = response.getResponseProps();      if (CommonUtils.isNotEmpty(responseBaggage)) {          String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";          for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {              if (entry.getKey().startsWith(prefix)) {                  if (context == null) {                      context = RpcInvokeContext.getContext();                  }                  //因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉                  context.putResponseBaggage(entry.getKey().substring(prefix.length()),                      entry.getValue());              }          }      }  }  

这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。

到这里SOFARPC数据透传就分析完毕了