11.源码分析—SOFARPC数据透传是实现的?
- 2019 年 10 月 3 日
- 筆記
SOFARPC源码解析系列:
6.源码分析—和dubbo相比SOFARPC是如何实现负载均衡的?
8.源码分析—从设计模式中看SOFARPC中的EventBus?
10.源码分析—SOFARPC内置链路追踪SOFATRACER是怎么做的?
先把栗子放上,让大家方便测试用:
Service端
public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 设置一个协议,默认bolt .setPort(12200) // 设置一个端口,默认12200 .setDaemon(false); // 非守护线程 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setRef(new HelloServiceImpl()) // 指定实现 .setServer(serverConfig); // 指定服务端 providerConfig.export(); // 发布服务 } public class HelloServiceImpl implements HelloService { private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class); @Override public String sayHello(String string) { LOGGER.info("Server receive: " + string); // 获取请求透传数据并打印 System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag")); // 设置响应透传数据到当前线程的上下文中 RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c"); return "hello " + string + " !"; } }
client端
public static void main(String[] args) { ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setProtocol("bolt") // 指定协议 .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址 .setConnectTimeout(10 * 1000); RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb"); HelloService helloService = consumerConfig.refer(); while (true) { System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag")); try { LOGGER.info(helloService.sayHello("world")); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
通过上面的栗子我们可以看出整个流程应该是:
- 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端
- 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中
- 客户端收到透传数据
所以下面我们从客户端开始源码讲解。
客户端数据透传给服务端
首先客户端在引用之前要设置putRequestBaggage
,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。
如下:
ClientProxyInvoker#invoke
public SofaResponse invoke(SofaRequest request) throws SofaRpcException { .... // 包装请求 decorateRequest(request); .... }
通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。
DefaultClientProxyInvoker#decorateRequest
protected void decorateRequest(SofaRequest request) { .... RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext(); RpcInternalContext internalContext = RpcInternalContext.getContext(); if (invokeCtx != null) { .... // 如果用户指定了透传数据 if (RpcInvokeContext.isBaggageEnable()) { // 需要透传 BaggageResolver.carryWithRequest(invokeCtx, request); internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx); } } .... }
在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面
BaggageResolver#carryWithRequest
public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) { if (context != null) { //获取所有的透传数据 Map<String, String> requestBaggage = context.getAllRequestBaggage(); if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传 request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage); } } }
这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。
服务端接受透传数据
服务端的调用流程如下:
BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker
所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情:
ProviderBaggageFilter#invoke
public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException { SofaResponse response = null; try { //从request中获取透传数据存入到requestBaggage中 BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true); response = invoker.invoke(request); } finally { if (response != null) { BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response); } } return response; }
ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest
从request中获取数据
BaggageResolver#pickupFromRequest
public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) { if (context == null && !init) { return; } // 解析请求 Map<String, String> requestBaggage = (Map<String, String>) request .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE); if (CommonUtils.isNotEmpty(requestBaggage)) { if (context == null) { context = RpcInvokeContext.getContext(); } context.putAllRequestBaggage(requestBaggage); } }
最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse
把响应透传数据回写到response里面。
public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) { if (context != null) { Map<String, String> responseBaggage = context.getAllResponseBaggage(); if (CommonUtils.isNotEmpty(responseBaggage)) { String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + "."; for (Map.Entry<String, String> entry : responseBaggage.entrySet()) { response.addResponseProp(prefix + entry.getKey(), entry.getValue()); } } } }
客户端收到响应透传数据
最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。
public SofaResponse invoke(SofaRequest request) throws SofaRpcException { .... // 包装响应 decorateResponse(response); .... }
decorateResponse是在子类DefaultClientProxyInvoker实现的:
DefaultClientProxyInvoker#decorateResponse
protected void decorateResponse(SofaResponse response) { .... //如果开启了透传 if (RpcInvokeContext.isBaggageEnable()) { BaggageResolver.pickupFromResponse(invokeCtx, response, true); } .... }
这个方法里面会调用BaggageResolver#pickupFromResponse
public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) { if (context == null && !init) { return; } Map<String, String> responseBaggage = response.getResponseProps(); if (CommonUtils.isNotEmpty(responseBaggage)) { String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + "."; for (Map.Entry<String, String> entry : responseBaggage.entrySet()) { if (entry.getKey().startsWith(prefix)) { if (context == null) { context = RpcInvokeContext.getContext(); } //因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉 context.putResponseBaggage(entry.getKey().substring(prefix.length()), entry.getValue()); } } } }
这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。
到这里SOFARPC数据透传就分析完毕了