
Who would have thought a single xxl-job source code analysis could cause so much trouble! (Part 1)

  • March 27, 2020
  • Notes

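Preface

I actually finished reading the XxlJob 2.0.1 source code around this time last year and annotated it in detail. I was too lazy to turn it into a blog post, though. As the saying goes, the palest ink beats the best memory, so I have picked out a few points in the implementation that I think are well done, combined them with my own views, and am sharing them here.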


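Thoughts on the source code

  • 1. The executor side starts a Jetty server configured with a JettyServerHandler. The handler passes each request to xxlRpcProviderFactory.invokeService(xxlRpcRequest) and gets back an XxlRpcResponse.

For details, see the handle method in JettyServerHandler.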

    @Override
    public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {

        if ("/services".equals(target)) {   // services mapping

            StringBuffer stringBuffer = new StringBuffer("<ui>");
            for (String serviceKey: xxlRpcProviderFactory.getServiceData().keySet()) {
                stringBuffer.append("<li>").append(serviceKey).append(": ").append(xxlRpcProviderFactory.getServiceData().get(serviceKey)).append("</li>");
            }
            stringBuffer.append("</ui>");

            writeResponse(baseRequest, response, stringBuffer.toString().getBytes());
            return;
        } else {    // default remoting mapping

            // request parse
            XxlRpcRequest xxlRpcRequest = null;
            try {
                xxlRpcRequest = parseRequest(request);
            } catch (Exception e) {
                writeResponse(baseRequest, response, ThrowableUtil.toString(e).getBytes());
                return;
            }

            // invoke
            XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

            // response-serialize + response-write
            byte[] responseBytes = xxlRpcProviderFactory.getSerializer().serialize(xxlRpcResponse);
            writeResponse(baseRequest, response, responseBytes);
        }

    }

The executor runs the locally exposed service method through the xxlRpcProviderFactory.invokeService(xxlRpcRequest) call shown above.

    /**
     * invoke service
     *
     * @param xxlRpcRequest
     * @return
     */
    public XxlRpcResponse invokeService(XxlRpcRequest xxlRpcRequest) {

        //  make response
        XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
        xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());

        // match service bean
        String serviceKey = makeServiceKey(xxlRpcRequest.getClassName(), xxlRpcRequest.getVersion());
        Object serviceBean = serviceData.get(serviceKey);

        // valid
        if (serviceBean == null) {
            xxlRpcResponse.setErrorMsg("The serviceKey["+ serviceKey +"] not found.");
            return xxlRpcResponse;
        }

        if (System.currentTimeMillis() - xxlRpcRequest.getCreateMillisTime() > 3*60*1000) {
            xxlRpcResponse.setErrorMsg("The timestamp difference between admin and executor exceeds the limit.");
            return xxlRpcResponse;
        }
        if (accessToken!=null && accessToken.trim().length()>0 && !accessToken.trim().equals(xxlRpcRequest.getAccessToken())) {
            xxlRpcResponse.setErrorMsg("The access token[" + xxlRpcRequest.getAccessToken() + "] is wrong.");
            return xxlRpcResponse;
        }

        // invoke
        try {
            Class<?> serviceClass = serviceBean.getClass();
            String methodName = xxlRpcRequest.getMethodName();
            Class<?>[] parameterTypes = xxlRpcRequest.getParameterTypes();
            Object[] parameters = xxlRpcRequest.getParameters();

            Method method = serviceClass.getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            Object result = method.invoke(serviceBean, parameters);

            /*FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            Object result = serviceFastMethod.invoke(serviceBean, parameters);*/

            xxlRpcResponse.setResult(result);
        } catch (Throwable t) {
            logger.error("xxl-rpc provider invokeService error.", t);
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(t));
        }

        return xxlRpcResponse;
    }

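On the executor side, service providers are kept in serviceData, which is a map. The key is the provider's class name joined with the version number, and the value is the provider instance itself.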

    // ---------------------- server invoke ----------------------

    /**
     * init local rpc service map
     */
    private Map<String, Object> serviceData = new HashMap<String, Object>();
    public Map<String, Object> getServiceData() {
        return serviceData;
    }
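To make the lookup-then-invoke flow concrete, here is a minimal sketch of such a registry. MiniProvider, Greeter and GreeterImpl are hypothetical names, and the '#' separator in the key is an assumption about how class name and version are joined. This is an illustration, not the xxl-rpc class itself.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of a provider-side registry (not the xxl-rpc class).
class MiniProvider {
    // key: interface name plus optional version, value: the service instance
    private final Map<String, Object> serviceData = new ConcurrentHashMap<>();

    // Assumption: '#' joins interface name and version when a version is given.
    static String makeServiceKey(String iface, String version) {
        return (version == null || version.trim().isEmpty()) ? iface : iface + "#" + version;
    }

    void addService(String iface, String version, Object bean) {
        serviceData.put(makeServiceKey(iface, version), bean);
    }

    // Look up the bean by key, then call the requested method reflectively.
    Object invoke(String iface, String version, String methodName,
                  Class<?>[] parameterTypes, Object[] parameters) {
        String key = makeServiceKey(iface, version);
        Object bean = serviceData.get(key);
        if (bean == null) {
            throw new IllegalStateException("The serviceKey[" + key + "] not found.");
        }
        try {
            Method method = bean.getClass().getMethod(methodName, parameterTypes);
            method.setAccessible(true);
            return method.invoke(bean, parameters);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("invoke failed for " + key, e);
        }
    }
}

// Hypothetical service used only for the demonstration.
interface Greeter {
    String greet(String name);
}

class GreeterImpl implements Greeter {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}
```

Registering a GreeterImpl under Greeter's name with version "1.0" and then calling invoke with the same name, version, "greet" and a String argument goes through the same two steps as invokeService: map lookup, then Method.invoke.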

The admin side does not actually start a Jetty server. XxlJobDynamicScheduler.initRpcProvider() shows what happens instead.

    // ---------------------- init + destroy ----------------------
    public void start() throws Exception {
        // valid
        Assert.notNull(scheduler, "quartz scheduler is null");

        // init i18n
        initI18n();

        // admin registry monitor run
        /*
         Start the auto-registration thread: fetch the executors whose type is
         auto-registration and complete automatic machine registration and discovery
         */
        JobRegistryMonitorHelper.getInstance().start();

        // admin monitor run
        /**
         * Start the failed-log monitor thread
         */
        JobFailMonitorHelper.getInstance().start();

        // admin-server
        /**
         * Expose the AdminBiz service and set up the jettyServerHandler
         */
        initRpcProvider();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

    // ---------------------- admin rpc provider (no server version) ----------------------
    private static JettyServerHandler jettyServerHandler;
    private void initRpcProvider(){
        // init
        XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
        xxlRpcProviderFactory.initConfig(NetEnum.JETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), null, 0, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null);

        // add services
        xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());

        // jetty handler
        jettyServerHandler = new JettyServerHandler(xxlRpcProviderFactory);
    }
    private void stopRpcProvider() throws Exception {
        new XxlRpcInvokerFactory().stop();
    }
    public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
        jettyServerHandler.handle(null, new Request(null, null), request, response);
    }

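The admin side exposes its services through an HTTP API, for example executor registration and the job-result callback. All of them go through JobApiController, which achieves much the same effect as on the executor side: the request is handed to JettyServerHandler, which then calls the local method through xxlRpcProviderFactory.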

    @Controller
    public class JobApiController implements InitializingBean {

        @Override
        public void afterPropertiesSet() throws Exception {

        }

        @RequestMapping(AdminBiz.MAPPING)
        @PermessionLimit(limit=false)
        public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
            XxlJobDynamicScheduler.invokeAdminService(request, response);
        }
    }

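The getObject method of the XxlRpcReferenceBean factory bean creates the proxy object. The proxy logic filters out non-business methods, that is, the methods declared on Object. For everything else it wraps the target's method name, parameters, class name and so on into an XxlRpcRequest and sends it to the scheduling center (the admin side) through JettyClient. The scheduling center's endpoint is "<admin address>/api". When that API endpoint receives the request, it uses the class name, method, parameters and version in the payload to locate the service instance and invokes the method reflectively. This is the JobApiController mentioned above.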
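The filtering and packaging steps can be sketched with a plain JDK dynamic proxy. CapturedCall, RecordingProxyFactory and EchoService are hypothetical names. The sketch records the call instead of sending it over the network, so it shows the shape of the proxy logic only.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical request holder; a real RPC request carries more fields.
class CapturedCall {
    final String className;
    final String methodName;
    final Object[] args;

    CapturedCall(String className, String methodName, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.args = args;
    }
}

class RecordingProxyFactory {
    // Stands in for the network client: calls are recorded instead of sent.
    static final List<CapturedCall> SENT = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            String className = method.getDeclaringClass().getName();
            // Reject methods declared on java.lang.Object, such as toString()
            if (Object.class.getName().equals(className)) {
                throw new UnsupportedOperationException(
                        "proxy class-method not supported: " + method.getName());
            }
            // Package the invocation metadata, as an RPC request would
            SENT.add(new CapturedCall(className, method.getName(), args));
            return null; // a real client would return the remote result here
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }
}

// Hypothetical service interface used only for the demonstration.
interface EchoService {
    String echo(String msg);
}
```

Calling echo on the proxy adds one CapturedCall carrying the interface name and method name, while calling toString() on it throws, which mirrors the Object-method filter described above.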

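Every XxlRpcReferenceBean initializes its own JettyClient. This looks wasteful and worth optimizing: creating a JettyClient is not cheap, and careless use could even lead to an OOM. A friend of mine once created a new HttpClient for every request. Each HttpClient instance was built with evictExpiredConnections, so every request started another eviction timer thread, and the system eventually ran out of memory. My suggestion is to make the client a singleton or to cache JettyClient instances.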

    // ---------------------- initClient ----------------------

    Client client = null;

    private void initClient() {
        try {
            client = netType.clientClass.newInstance();
            client.init(this);
        } catch (InstantiationException | IllegalAccessException e) {
            throw new XxlRpcException(e);
        }
    }
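One way the caching suggestion could look is a map that creates each client at most once. This is only a sketch: ExpensiveClient and ClientCache are hypothetical names, and keying the cache by address is an assumption, since a real cache might key by net type or by reference configuration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client that is expensive to create.
class ExpensiveClient {
    static final AtomicInteger CREATED = new AtomicInteger();
    final String address;

    ExpensiveClient(String address) {
        this.address = address;
        CREATED.incrementAndGet(); // count constructions for the demonstration
    }
}

class ClientCache {
    private static final Map<String, ExpensiveClient> CLIENTS = new ConcurrentHashMap<>();

    // computeIfAbsent creates at most one client per key, even under concurrent access.
    static ExpensiveClient get(String address) {
        return CLIENTS.computeIfAbsent(address, ExpensiveClient::new);
    }
}
```

With this in place, repeated lookups for the same address return the same instance, so the number of clients is bounded by the number of distinct addresses rather than the number of reference beans or requests.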

Creating the proxy object that issues the RPC request

    public Object getObject() {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String className = method.getDeclaringClass().getName();

                        // filter method like "Object.toString()"
                        if (Object.class.getName().equals(className)) {
                            logger.info(">>>>>>>>>>> xxl-rpc proxy class-method not support [{}.{}]", className, method.getName());
                            throw new XxlRpcException("xxl-rpc proxy class-method not support");
                        }

                        // address
                        String address = routeAddress();
                        if (address==null || address.trim().length()==0) {
                            throw new XxlRpcException("xxl-rpc reference bean["+ className +"] address empty");
                        }

                        // request
                        XxlRpcRequest xxlRpcRequest = new XxlRpcRequest();
                        xxlRpcRequest.setRequestId(UUID.randomUUID().toString());
                        xxlRpcRequest.setCreateMillisTime(System.currentTimeMillis());
                        xxlRpcRequest.setAccessToken(accessToken);
                        xxlRpcRequest.setClassName(className);
                        xxlRpcRequest.setMethodName(method.getName());
                        xxlRpcRequest.setParameterTypes(method.getParameterTypes());
                        xxlRpcRequest.setParameters(args);

                        // send
                        if (CallType.SYNC == callType) {
                            try {
                                // future set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null);

                                // do invoke
                                client.asyncSend(address, xxlRpcRequest);

                                // future get
                                XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
                                if (xxlRpcResponse.getErrorMsg() != null) {
                                    throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
                                }
                                return xxlRpcResponse.getResult();
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            } finally{
                                // remove-InvokerFuture
                                XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());
                            }
                        } else if (CallType.FUTURE == callType) {

                            // thread future set
                            XxlRpcInvokeFuture invokeFuture = null;
                            try {
                                // future set
                                invokeFuture = new XxlRpcInvokeFuture(new XxlRpcFutureResponse(xxlRpcRequest, null));
                                XxlRpcInvokeFuture.setFuture(invokeFuture);

                                // do invoke
                                client.asyncSend(address, xxlRpcRequest);

                                return null;
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                // remove-InvokerFuture
                                invokeFuture.stop();

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                        } else if (CallType.CALLBACK == callType) {

                            // get callback
                            XxlRpcInvokeCallback finalInvokeCallback = invokeCallback;
                            XxlRpcInvokeCallback threadInvokeCallback = XxlRpcInvokeCallback.getCallback();
                            if (threadInvokeCallback != null) {
                                finalInvokeCallback = threadInvokeCallback;
                            }
                            if (finalInvokeCallback == null) {
                                throw new XxlRpcException("xxl-rpc XxlRpcInvokeCallback(CallType="+ CallType.CALLBACK.name() +") cannot be null.");
                            }

                            try {
                                // future set
                                XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, finalInvokeCallback);

                                client.asyncSend(address, xxlRpcRequest);
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job, invoke error, address:{}, XxlRpcRequest{}", address, xxlRpcRequest);

                                // future remove
                                XxlRpcFutureResponseFactory.removeInvokerFuture(xxlRpcRequest.getRequestId());

                                throw (e instanceof XxlRpcException)?e:new XxlRpcException(e);
                            }

                            return null;
                        } else if (CallType.ONEWAY == callType) {
                            client.asyncSend(address, xxlRpcRequest);
                            return null;
                        } else {
                            throw new XxlRpcException("xxl-rpc callType["+ callType +"] invalid");
                        }

                    }
                });
    }

Only the SYNC mode is covered here. XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(xxlRpcRequest, null); creates an XxlRpcFutureResponse (the result holder of the future task). In this mode the invokeCallback is null.

    public XxlRpcFutureResponse(XxlRpcRequest request, XxlRpcInvokeCallback invokeCallback) {
        this.request = request;
        this.invokeCallback = invokeCallback;

        // set-InvokerFuture
        XxlRpcFutureResponseFactory.setInvokerFuture(request.getRequestId(), this);
    }

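When we fetch the future's result, the calling thread blocks as long as the done flag in XxlRpcFutureResponse is false (subject to a timeout). It stays blocked until setResponse(XxlRpcResponse response) is called, which sets done to true, acquires the lock and calls lock.notifyAll() to wake the waiting thread, which then returns the result.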
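The done flag, lock and notifyAll() handshake described above can be sketched as follows. SimpleFutureResponse is a hypothetical class written for illustration. It follows the same wait/notify pattern but is not the xxl-rpc implementation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical minimal future built on wait/notifyAll.
class SimpleFutureResponse<V> {
    private final Object lock = new Object();
    private boolean done = false;
    private V response;

    // Called by the I/O side when the reply arrives: set the flag and wake waiters.
    void setResponse(V response) {
        synchronized (lock) {
            this.response = response;
            this.done = true;
            lock.notifyAll();
        }
    }

    // Blocks until setResponse is called or the timeout elapses.
    V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            // Loop guards against spurious wakeups
            while (!done) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("no response within timeout");
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
            }
            return response;
        }
    }
}
```

The while loop around the wait matters: a waiting thread may wake without setResponse having been called, so the flag is rechecked and the remaining time recomputed on every wakeup.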

JettyClient sends the request by calling asyncSend.

    @Override
    public void asyncSend(String address, XxlRpcRequest xxlRpcRequest) throws Exception {
        // do invoke
        postRequestAsync(address, xxlRpcRequest);
    }

Once postRequestAsync has the RPC result, it has to notify the future and wake the waiting thread.

    // deserialize response
    XxlRpcResponse xxlRpcResponse = (XxlRpcResponse) xxlRpcReferenceBean.getSerializer().deserialize(responseBytes, XxlRpcResponse.class);
    // notify response
    XxlRpcFutureResponseFactory.notifyInvokerFuture(xxlRpcResponse.getRequestId(), xxlRpcResponse);

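When an XxlRpcFutureResponse is constructed, it calls setInvokerFuture, which stores the mapping from the request ID to that XxlRpcFutureResponse.

When JettyClient finishes the request and has a result, it calls notifyInvokerFuture, which sets the xxlRpcResponse field of the XxlRpcFutureResponse, that is, the real execution result.

So although JettyClient sends the request asynchronously, in SYNC mode the caller still blocks synchronously to get the result.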

    public class XxlRpcFutureResponseFactory {

        private static ConcurrentMap<String, XxlRpcFutureResponse> futureResponsePool = new ConcurrentHashMap<String, XxlRpcFutureResponse>();

        public static void setInvokerFuture(String requestId, XxlRpcFutureResponse futureResponse){
            // TODO,running future method-isolation and limit
            futureResponsePool.put(requestId, futureResponse);
        }

        public static void removeInvokerFuture(String requestId){
            futureResponsePool.remove(requestId);
        }

        public static void notifyInvokerFuture(String requestId, XxlRpcResponse xxlRpcResponse){
            XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
            if (futureResponse != null) {
                futureResponse.setResponse(xxlRpcResponse);
                futureResponsePool.remove(requestId);
            }
        }

    }
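The same "register by request ID, send asynchronously, block for the result" idea can be sketched with the JDK's CompletableFuture. Note that this swaps in CompletableFuture for the hand-written future. PendingCalls is a hypothetical name, and the "remote side" is simulated by a task on another thread.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical request-id correlation pool built on CompletableFuture.
class PendingCalls {
    private static final ConcurrentMap<String, CompletableFuture<String>> POOL =
            new ConcurrentHashMap<>();

    // Register a future before sending, keyed by request id.
    static CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        POOL.put(requestId, future);
        return future;
    }

    // Called by the I/O thread when a response carrying this id arrives.
    static void complete(String requestId, String result) {
        CompletableFuture<String> future = POOL.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    static int pending() {
        return POOL.size();
    }

    // Asynchronous send, synchronous wait: the caller still blocks for the result.
    static String callSync(String payload, long timeoutMillis) {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> future = register(requestId);
        try {
            // Simulated remote side: the reply is delivered from another thread.
            CompletableFuture.runAsync(() -> complete(requestId, "echo:" + payload));
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("invoke error, requestId=" + requestId, e);
        } finally {
            POOL.remove(requestId); // always clean up, as the SYNC branch does
        }
    }
}
```

The finally block plays the role of removeInvokerFuture: whether the call succeeds, fails or times out, the entry is removed so the pool does not grow without bound.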
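  • 2. XxlRpc is at the core of XxlJob, and its basic flow was outlined above. If we wanted to implement an RPC framework ourselves, what would we need? Serialization, compression, protocol, dynamic proxies, service registration, encryption, network encoding, connection management, health checks, load balancing, graceful startup and shutdown, retry on failure, business grouping, circuit breaking and rate limiting, and so on. I previously wrote a simple Netty-based RPC framework, so comparing it with XxlRpc is a good way to find what is missing. (Netty itself is not covered here, only a rough look at the RPC proxy.)

On the netty-rpc-client side, we scan for the interfaces that need a service proxy and change how their BeanDefinition is instantiated. The Spring container can instantiate an object in several ways: a Supplier, a FactoryBean, a specified FactoryMethodName and FactoryBeanName, a constructor, and so on.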

    @Slf4j
    public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner {

        private RpcFactoryBean<?> rpcFactoryBean = new RpcFactoryBean<Object>();

        private Class<? extends Annotation> annotationClass;

        public void setAnnotationClass(Class<? extends Annotation> annotationClass) {
            this.annotationClass = annotationClass;
        }

        public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
            super(registry);
        }

        @Override
        protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
            Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
            if (CollectionUtils.isEmpty(beanDefinitions)) {
                logger.warn("No RPC mapper was found in '"
                    + Arrays.toString(basePackages)
                    + "' package. Please check your configuration");
            } else {
                processBeanDefinitions(beanDefinitions);
            }
            return beanDefinitions;
        }

        public void registerFilter() {
            boolean acceptAllInterfaces = true;
            if (this.annotationClass != null) {
                addIncludeFilter(new AnnotationTypeFilter(this.annotationClass));
                acceptAllInterfaces = false;
            }

            if (acceptAllInterfaces) {
                addIncludeFilter(new TypeFilter() {
                    @Override
                    public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                        return true;
                    }
                });
            }

            // exclude package-info.java
            addExcludeFilter(new TypeFilter() {
                @Override
                public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
                    String className = metadataReader.getClassMetadata().getClassName();
                    return className.endsWith("package-info");
                }
            });
        }

        private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitionHolders) {
            GenericBeanDefinition genericBeanDefinition = null;

            for (BeanDefinitionHolder holder : beanDefinitionHolders) {
                genericBeanDefinition = (GenericBeanDefinition) holder.getBeanDefinition();
                // genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
                // genericBeanDefinition.setBeanClass(this.rpcFactoryBean.getClass());

                genericBeanDefinition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);

                /**
                 * A bean can also be initialized through Supplier.get(); it reaches the
                 * same result as setting factoryBean.getObject, just by a different route
                 */
                // try {
                //     genericBeanDefinition.setInstanceSupplier(new RpcSupplier<>(Class.forName(genericBeanDefinition.getBeanClassName())));
                // } catch (Exception ex) {
                //     throw new RuntimeException(ex);
                // }

                /**
                 * Specify factoryMethodName and FactoryBeanName.
                 * If a no-arg method can be found it is tried first, and the method with
                 * arguments last. The method arguments come from ConstructorArgumentValue.
                 *
                 * Only a single one is set here. When FactoryMethodName is set, the type to
                 * inject must match the method's return type, otherwise an error is raised.
                 * This customFactoryBean must not be made generic.
                 */

                /**
                 * DefaultListableBeanFactory:findAutowireCandidates
                 * DefaultListableBeanFactory:doGetBeanNamesForType
                 * AbstractAutowiredCapableBeanFactory:determineTargetType
                 * AbstractAutowiredCapableBeanFactory:getTypeForFactoryMethod
                 */
                genericBeanDefinition.getConstructorArgumentValues().addGenericArgumentValue(genericBeanDefinition.getBeanClassName());
                genericBeanDefinition.setFactoryMethodName("getObject");
                genericBeanDefinition.setFactoryBeanName("customFactoryBean");

                // genericBeanDefinition.setFactoryBeanName();
                // genericBeanDefinition.setFactoryMethodName();
                log.info("ClassPathRpcScanner set GenericBeanDefinition: {}", genericBeanDefinition);
                log.info("ClassPathRpcScanner set BeanDefinitionHolder: {}", holder);
            }
        }

        @Override
        protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
            return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
        }
    }

Creating the proxy object by specifying FactoryBeanName and FactoryMethodName

    public class CustomFactoryBean<T> {

        @Autowired
        private RpcFactory<T> rpcFactory;

        public <T> T getObject(Class<T> rpcInterface) {
            return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {rpcInterface}, this.rpcFactory);
        }

        public static void main(String[] args) {
            Class factoryClass = ClassUtils.getUserClass(CustomFactoryBean.class);
            Method[] candidates = ReflectionUtils.getUniqueDeclaredMethods(factoryClass);
            Method getObject = null;
            for (Method method : candidates) {
                if ("getObject".equalsIgnoreCase(method.getName())) {
                    getObject = method;
                }
            }
            System.out.println(getObject);
            /**
             * Test the generics
             */
            System.out.println(getObject.getTypeParameters().length);
            System.out.println(getObject.getTypeParameters());
            System.out.println(Arrays.asList(getObject.getParameterTypes()));
        }

    }

Creating the proxy object with a FactoryBean

    public class RpcFactoryBean<T> implements FactoryBean<T> {

        private Class<T> rpcInterface;

        @Autowired
        private RpcFactory<T> rpcFactory;

        public RpcFactoryBean() {

        }

        public RpcFactoryBean(Class<T> rpcInterface) {
            this.rpcInterface = rpcInterface;
        }

        @Nullable
        @Override
        public T getObject() throws Exception {
            return getRpc();
        }

        @Nullable
        @Override
        public Class<?> getObjectType() {
            return this.rpcInterface;
        }

        @Override
        public boolean isSingleton() {
            return (3&1) == 1;
        }

        public <T> T getRpc() {
            /**
             * Do not write rpcInterface.getInterfaces() here
             */
            return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
        }

        public static void main(String[] args) {
            System.out.println(InfoUserService.class.getInterfaces());
        }
    }

Creating the proxy object with a Supplier

    public class RpcSupplier<T> implements Supplier<T> {

        private Class<T> rpcInterface;

        public RpcSupplier(Class<T> rpcInterface) {
            this.rpcInterface = rpcInterface;
        }

        /**
         * Injection is not used here because RpcSupplier is not managed by the Spring container
         */
        private RpcFactory<T> rpcFactory = null;

        @Override
        public T get() {
            ApplicationContext context = SpringApplicationContextUtil.getApplicationContext();

            if (context != null) {
                rpcFactory = context.getBean(RpcFactory.class);
            }

            return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] {this.rpcInterface}, this.rpcFactory);
        }

    }

InvocationHandler

    @Component
    @Slf4j
    public class RpcFactory<T> implements InvocationHandler {

        @Autowired(required = false)
        private NettyClient nettyClient = new NettyClient();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Request request = new Request();
            request.setClassName(method.getDeclaringClass().getName());
            request.setMethodName(method.getName());
            request.setParameters(args);
            request.setParameterType(method.getParameterTypes());
            request.setId(IdUtil.getId());

            Object result = nettyClient.send(request);
            Class<?> returnType = method.getReturnType();

            Response response = JSON.parseObject(result.toString(), Response.class);

            if (response.getCode() == 1) {
                throw new RuntimeException(response.getErrorMsg());
            }

            if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)) {
                return response.getData();
            } else if (Collection.class.isAssignableFrom(returnType)) {
                return JSONArray.parseArray(response.getData().toString(), Object.class);
            } else if (Map.class.isAssignableFrom(returnType)) {
                return JSON.parseObject(response.getData().toString(), Map.class);
            } else {
                Object data = response.getData();
                if (data != null) {
                    return JSON.parseObject(data.toString(), returnType);
                } else {
                    return null;
                }
            }

        }
    }

RPC scanner configuration: it specifies the package path and the service interfaces to scan.

    @Component
    public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {

        private static final String BASE_PACKAGE = "com.cmazxiaoma.springcloud.netty.service";

        @Override
        public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
            ClassPathRpcScanner classPathRpcScanner = new ClassPathRpcScanner(registry);
            classPathRpcScanner.setAnnotationClass(RpcService.class);
            classPathRpcScanner.registerFilter();
            classPathRpcScanner.scan(StringUtils.tokenizeToStringArray(BASE_PACKAGE, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
        }

        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        }

        public static void main(String[] args) {
            System.out.println(Arrays.asList(StringUtils.tokenizeToStringArray("com.cmazxiaoma.springcloud.netty.service", ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS)));
        }
    }

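In our netty-rpc-client we scan every interface annotated with @RpcService, create a BeanDefinition for each one, set its BeanClass to our factory class, and record the interface type.

In that factory class we build a dynamic proxy for the service interface. The invoke method of its InvocationHandler is what issues the RPC call.

So how do we reference a service provider's interface?

Simply inject the provider interface with @Autowired in a Controller class.

Now look at how XxlJob does it

In a Controller, for example, we annotate the service interface field with @XxlRpcReference, where timeout, version, serialization algorithm and other attributes can be set.

Then, when the Spring container instantiates the bean that depends on this service interface (postProcessAfterInstantiation), it creates an XxlRpcReferenceBean for every field annotated with @XxlRpcReference and assigns the resulting proxy reference to that field.

XxlRpcReferenceBean is also a factory class and uses a dynamic proxy internally.

I think these XxlRpcReferenceBean objects should be cached. Otherwise, referencing a service interface 10 times creates 10 objects, each of which also creates a JettyClient.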

    public class XxlRpcSpringInvokerFactory extends InstantiationAwareBeanPostProcessorAdapter implements InitializingBean,DisposableBean, BeanFactoryAware {
        private Logger logger = LoggerFactory.getLogger(XxlRpcSpringInvokerFactory.class);

        // ---------------------- config ----------------------

        private Class<? extends ServiceRegistry> serviceRegistryClass;          // class.forname
        private Map<String, String> serviceRegistryParam;

        public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
            this.serviceRegistryClass = serviceRegistryClass;
        }

        public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
            this.serviceRegistryParam = serviceRegistryParam;
        }

        // ---------------------- util ----------------------

        private XxlRpcInvokerFactory xxlRpcInvokerFactory;

        @Override
        public void afterPropertiesSet() throws Exception {
            // start invoker factory
            xxlRpcInvokerFactory = new XxlRpcInvokerFactory(serviceRegistryClass, serviceRegistryParam);
            xxlRpcInvokerFactory.start();
        }

        @Override
        public boolean postProcessAfterInstantiation(final Object bean, final String beanName) throws BeansException {

            ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
                @Override
                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
                    if (field.isAnnotationPresent(XxlRpcReference.class)) {
                        // valid
                        Class iface = field.getType();
                        if (!iface.isInterface()) {
                            throw new XxlRpcException("xxl-rpc, reference(XxlRpcReference) must be interface.");
                        }

                        XxlRpcReference rpcReference = field.getAnnotation(XxlRpcReference.class);

                        // init reference bean
                        XxlRpcReferenceBean referenceBean = new XxlRpcReferenceBean(
                                rpcReference.netType(),
                                rpcReference.serializer().getSerializer(),
                                rpcReference.callType(),
                                iface,
                                rpcReference.version(),
                                rpcReference.timeout(),
                                rpcReference.address(),
                                rpcReference.accessToken(),
                                null
                        );

                        Object serviceProxy = referenceBean.getObject();

                        // set bean
                        field.setAccessible(true);
                        field.set(bean, serviceProxy);

                        logger.info(">>>>>>>>>>> xxl-rpc, invoker factory init reference bean success. serviceKey = {}, bean.field = {}.{}",
                                XxlRpcProviderFactory.makeServiceKey(iface.getName(), rpcReference.version()), beanName, field.getName());
                    }
                }
            });

            return super.postProcessAfterInstantiation(bean, beanName);
        }

        @Override
        public void destroy() throws Exception {

            // stop invoker factory
            xxlRpcInvokerFactory.stop();
        }

        private BeanFactory beanFactory;

        @Override
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = beanFactory;
        }
    }

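On our netty-rpc-server side, we scan for @RpcService and keep the results in a map whose key is the interface type and whose value is the provider implementation. When a consumer issues an RPC call and a method on the provider implementation has to be invoked, the instance is already there, which saves the cost of locating or creating the provider implementation reflectively. Could we go further and cache every Method of the implementation class as well? I think there are pros and cons, and the decision can be weighed along three dimensions: CPU, memory, and the cost of maintaining the cache. On balance, not caching them is the better choice. On JVMs of that era, once a method has been invoked reflectively more than 15 times (the default inflation threshold), the call is handed to a generated MethodAccessorImpl: the corresponding bytecode is generated in memory, ClassDefiner.defineClass creates the class object, and performance improves somewhat.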
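For comparison, this is roughly what caching the Method objects would involve. It is a sketch of the option being weighed above, not a recommendation. MethodCache is a hypothetical name, and keying by class name ignores class loaders, which a real implementation would have to consider.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical cache of reflective Method lookups.
class MethodCache {
    private static final ConcurrentMap<String, Method> CACHE = new ConcurrentHashMap<>();

    // Cache key: class name, method name and parameter types.
    private static String key(Class<?> type, String name, Class<?>[] parameterTypes) {
        return type.getName() + "#" + name + Arrays.toString(parameterTypes);
    }

    // Resolve the public method once per key and reuse the Method afterwards.
    static Method lookup(Class<?> type, String name, Class<?>... parameterTypes) {
        return CACHE.computeIfAbsent(key(type, name, parameterTypes), k -> {
            try {
                return type.getMethod(name, parameterTypes);
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(e);
            }
        });
    }
}
```

The cost side of the trade-off is visible even in this small version: a key has to be built on every call, and the map holds strong references to Method objects (and so to their classes) for as long as the cache lives.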

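Why Java reflection is slow:

  • Method#invoke boxes and unboxes arguments. The parameters of invoke are of type Object[], so primitive arguments must be converted to objects first. A long, for example, is converted with Long.valueOf() at javac compile time, which produces a large number of Long objects, and because the arguments are passed as an Object[], the array itself has to be allocated as well. In MethodAccessorGenerator#emitInvoke, the generated bytecode takes the argument array apart again, restoring each argument to what it was before being wrapped in Object[], and also validates the arguments, which involves unboxing. Reflective calls therefore waste memory on boxing and unboxing, and once the call count is high enough this also triggers GC.
  • Method visibility must be checked. Every reflective call has to check the method's visibility (in Method.invoke).
  • Arguments must be validated. Every reflective call also has to check that each actual argument matches the type of the formal parameter (in NativeMethodAccessorImpl.invoke0 or in the generated Java version of MethodAccessor.invoke).
  • Reflective calls are hard to inline. Method.invoke is like a single-plank bridge that every reflective call has to squeeze across. The type information collected at the call site becomes very noisy, which disturbs the inlining heuristics and makes Method.invoke() itself hard to inline into the caller. (Method inlining means that when the JIT compiler meets a method call, it compiles the body of the target method directly in place of the call, so the stack push, stack pop and argument passing of a real call are no longer needed.)
  • The JIT cannot fully optimize it. Because reflection involves types that are resolved dynamically, certain JVM optimizations cannot be applied.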
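The boxing point in the list above is easy to see in a few lines. BoxingDemo is a hypothetical class. The comments describe what javac emits for the varargs call and the cast, based on the language rules for boxing conversions.

```java
import java.lang.reflect.Method;

// Hypothetical demonstration of boxing around Method.invoke.
class BoxingDemo {
    public static long twice(long value) {
        return value * 2;
    }

    // Direct call: the long stays primitive, nothing is allocated for the argument.
    static long direct(long value) {
        return twice(value);
    }

    // Reflective call: the argument is boxed and wrapped in an Object[].
    static long reflective(long value) {
        try {
            Method m = BoxingDemo.class.getMethod("twice", long.class);
            // Varargs call: compiled as m.invoke(null, new Object[]{Long.valueOf(value)})
            Object boxed = m.invoke(null, value);
            // The long result comes back boxed and is unboxed by the cast
            return (Long) boxed;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Both methods return the same value. The difference is that the reflective path allocates an Object[] and goes through Long boxing in both directions on every call.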

Back on track: our implementation and Xxl-Rpc's are much the same.

public class XxlRpcSpringProviderFactory extends XxlRpcProviderFactory implements ApplicationContextAware, InitializingBean,DisposableBean {

    // ---------------------- config ----------------------

    private String netType = NetEnum.JETTY.name();
    private String serialize = Serializer.SerializeEnum.HESSIAN.name();

    private String ip = IpUtil.getIp();             // for registry
    private int port = 7080;                        // default port
    private String accessToken;

    private Class<? extends ServiceRegistry> serviceRegistryClass;                          // class.forname
    private Map<String, String> serviceRegistryParam;


    // set
    public void setNetType(String netType) {
        this.netType = netType;
    }

    public void setSerialize(String serialize) {
        this.serialize = serialize;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public void setServiceRegistryClass(Class<? extends ServiceRegistry> serviceRegistryClass) {
        this.serviceRegistryClass = serviceRegistryClass;
    }

    public void setServiceRegistryParam(Map<String, String> serviceRegistryParam) {
        this.serviceRegistryParam = serviceRegistryParam;
    }


    // util
    private void prepareConfig(){

        // prepare config
        NetEnum netTypeEnum = NetEnum.autoMatch(netType, null);
        Serializer.SerializeEnum serializeEnum = Serializer.SerializeEnum.match(serialize, null);
        Serializer serializer = serializeEnum!=null?serializeEnum.getSerializer():null;

        if (port <= 0) {
            throw new XxlRpcException("xxl-rpc provider port["+ port +"] is unvalid.");
        }
        if (NetUtil.isPortUsed(port)) {
            throw new XxlRpcException("xxl-rpc provider port["+ port +"] is used.");
        }

        // init config
        super.initConfig(netTypeEnum, serializer, ip, port, accessToken, serviceRegistryClass, serviceRegistryParam);
    }


    // ---------------------- util ----------------------

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(XxlRpcService.class);
        if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
            for (Object serviceBean : serviceBeanMap.values()) {
                // valid
                if (serviceBean.getClass().getInterfaces().length ==0) {
                    throw new XxlRpcException("xxl-rpc, service(XxlRpcService) must inherit interface.");
                }
                // add service
                XxlRpcService xxlRpcService = serviceBean.getClass().getAnnotation(XxlRpcService.class);

                String iface = serviceBean.getClass().getInterfaces()[0].getName();
                String version = xxlRpcService.version();

                super.addService(iface, version, serviceBean);
            }
        }

        // TODO,addServices by api + prop

    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.prepareConfig();
        super.start();
    }

    @Override
    public void destroy() throws Exception {
        super.stop();
    }

}
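  • 3.XxlJob supports Groovy, and the Groovy job handlers can have Spring beans injected into them. (Aside: when Zuul uses Groovy to define dynamic filters, deleting the Groovy file does not remove the filter from the running API gateway; the only option is to make shouldFilter return false. At the moment those filters also cannot have Spring beans injected.)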
public class GlueFactory {


    private static GlueFactory glueFactory = new GlueFactory();
    public static GlueFactory getInstance(){
        return glueFactory;
    }
    public static void refreshInstance(int type){
        if (type == 0) {
            glueFactory = new GlueFactory();
        } else if (type == 1) {
            glueFactory = new SpringGlueFactory();
        }
    }


    /**
     * groovy class loader
     */
    private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();


    /**
     * load new instance, prototype
     *
     * @param codeSource
     * @return
     * @throws Exception
     */
    public IJobHandler loadNewInstance(String codeSource) throws Exception{
        if (codeSource!=null && codeSource.trim().length()>0) {
            Class<?> clazz = groovyClassLoader.parseClass(codeSource);
            if (clazz != null) {
                Object instance = clazz.newInstance();
                if (instance!=null) {
                    if (instance instanceof IJobHandler) {
                        this.injectService(instance);
                        return (IJobHandler) instance;
                    } else {
                        throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
                                + "cannot convert from instance["+ instance.getClass() +"] to IJobHandler");
                    }
                }
            }
        }
        throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
    }

    /**
     * inject service of bean field
     *
     * @param instance
     */
    public void injectService(Object instance) {
        // do something
    }

}

public class SpringGlueFactory extends GlueFactory {
    private static Logger logger = LoggerFactory.getLogger(SpringGlueFactory.class);


    /**
     * inject action of spring
     * @param instance
     */
    @Override
    public void injectService(Object instance){
        if (instance==null) {
            return;
        }

        if (XxlJobSpringExecutor.getApplicationContext() == null) {
            return;
        }

        Field[] fields = instance.getClass().getDeclaredFields();
        for (Field field : fields) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue;
            }

            Object fieldBean = null;
            // with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired

            if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
                try {
                    Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
                    if (resource.name()!=null && resource.name().length()>0){
                        fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(resource.name());
                    } else {
                        fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getName());
                    }
                } catch (Exception e) {
                }
                if (fieldBean==null ) {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
                }
            } else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
                Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
                if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(qualifier.value());
                } else {
                    fieldBean = XxlJobSpringExecutor.getApplicationContext().getBean(field.getType());
                }
            }

            if (fieldBean!=null) {
                field.setAccessible(true);
                try {
                    field.set(instance, fieldBean);
                } catch (IllegalArgumentException e) {
                    logger.error(e.getMessage(), e);
                } catch (IllegalAccessException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

}
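The injection step in SpringGlueFactory boils down to "walk the declared fields, look the bean up by name, fall back to a lookup by type, then set the field". Here is a rough sketch of that idea with no Spring or Groovy on the classpath. InjectBean stands in for @Resource/@Autowired and BeanContainerSketch stands in for the ApplicationContext; both are hypothetical names invented for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for @Resource / @Autowired.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface InjectBean {
    String name() default "";
}

// Hypothetical stand-in for the Spring ApplicationContext: a plain name -> bean map.
class BeanContainerSketch {
    private final Map<String, Object> beans = new HashMap<>();

    void register(String name, Object bean) {
        beans.put(name, bean);
    }

    Object getBean(String name) {
        return beans.get(name);
    }

    Object getBean(Class<?> type) {
        for (Object bean : beans.values()) {
            if (type.isInstance(bean)) {
                return bean;
            }
        }
        return null;
    }

    // Same shape as injectService: skip static fields, try by name, then by type.
    void injectFields(Object instance) {
        for (Field field : instance.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            InjectBean inject = field.getAnnotation(InjectBean.class);
            if (inject == null) {
                continue;
            }
            String name = inject.name().isEmpty() ? field.getName() : inject.name();
            Object bean = getBean(name);
            if (bean == null || !field.getType().isInstance(bean)) {
                bean = getBean(field.getType());
            }
            if (bean != null) {
                field.setAccessible(true);
                try {
                    field.set(instance, bean);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }
}

// Hypothetical dependency and job handler used to exercise the container.
class OrderDao {
    String find() {
        return "order-1";
    }
}

class DemoJobHandler {
    @InjectBean
    private OrderDao orderDao;

    String execute() {
        return orderDao.find();
    }
}
```

In xxl-job the instance being injected comes from GroovyClassLoader.parseClass, but the injection itself does not care where the object came from, which is why the same trick would also work for any other dynamically loaded handler.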
  • 4.Shiro takes control of the bean lifecycle by implementing DestructionAwareBeanPostProcessor; ApplicationListenerDetector and others achieve the same thing in much the same way.

I think xxl-job could use this more elegant approach to manage the lifecycle of its beans.

public class LifecycleBeanPostProcessor implements DestructionAwareBeanPostProcessor, PriorityOrdered {

    /**
     * Private internal class log instance.
     */
    private static final Logger log = LoggerFactory.getLogger(LifecycleBeanPostProcessor.class);

    /**
     * Order value of this BeanPostProcessor.
     */
    private int order;

    /**
     * Default Constructor.
     */
    public LifecycleBeanPostProcessor() {
        this(LOWEST_PRECEDENCE);
    }

    /**
     * Constructor with definable {@link #getOrder() order value}.
     *
     * @param order order value of this BeanPostProcessor.
     */
    public LifecycleBeanPostProcessor(int order) {
        this.order = order;
    }

    /**
     * Calls the <tt>init()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Initializable}
     *
     * @param object the object being initialized.
     * @param name   the name of the bean being initialized.
     * @return the initialized bean.
     * @throws BeansException if any exception is thrown during initialization.
     */
    public Object postProcessBeforeInitialization(Object object, String name) throws BeansException {
        if (object instanceof Initializable) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Initializing bean [" + name + "]...");
                }

                ((Initializable) object).init();
            } catch (Exception e) {
                throw new FatalBeanException("Error initializing bean [" + name + "]", e);
            }
        }
        return object;
    }


    /**
     * Does nothing - merely returns the object argument immediately.
     */
    public Object postProcessAfterInitialization(Object object, String name) throws BeansException {
        // Does nothing after initialization
        return object;
    }


    /**
     * Calls the <tt>destroy()</tt> methods on the bean if it implements {@link org.apache.shiro.util.Destroyable}
     *
     * @param object the object being initialized.
     * @param name   the name of the bean being initialized.
     * @throws BeansException if any exception is thrown during initialization.
     */
    public void postProcessBeforeDestruction(Object object, String name) throws BeansException {
        if (object instanceof Destroyable) {
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Destroying bean [" + name + "]...");
                }

                ((Destroyable) object).destroy();
            } catch (Exception e) {
                throw new FatalBeanException("Error destroying bean [" + name + "]", e);
            }
        }
    }

    /**
     * Order value of this BeanPostProcessor.
     *
     * @return order value.
     */
    public int getOrder() {
        // LifecycleBeanPostProcessor needs Order. See https://issues.apache.org/jira/browse/SHIRO-222
        return order;
    }

    /**
     * Return true only if <code>bean</code> implements Destroyable.
     * @param bean bean to check if requires destruction.
     * @return true only if <code>bean</code> implements Destroyable.
     * @since 1.4
     */
    @SuppressWarnings("unused")
    public boolean requiresDestruction(Object bean) {
        return (bean instanceof Destroyable);
    }
}
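Stripped of Spring, the pattern is just "the container checks for a marker interface and calls the hook at the right moment". A minimal sketch of that idea follows. InitializableSketch, DestroyableSketch, LifecycleContainerSketch and ExecutorSketch are all hypothetical names, and destroying in reverse registration order is a choice made for this sketch rather than something taken from Shiro or xxl-job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical marker interfaces, mirroring Shiro's Initializable / Destroyable.
interface InitializableSketch {
    void init();
}

interface DestroyableSketch {
    void destroy();
}

// Hypothetical mini container that plays the role of the bean post-processor.
class LifecycleContainerSketch {
    private final List<Object> beans = new ArrayList<>();

    // Equivalent of postProcessBeforeInitialization: call init() if the bean opts in.
    <T> T register(T bean) {
        if (bean instanceof InitializableSketch) {
            ((InitializableSketch) bean).init();
        }
        beans.add(bean);
        return bean;
    }

    // Equivalent of postProcessBeforeDestruction, applied in reverse registration order.
    void close() {
        for (int i = beans.size() - 1; i >= 0; i--) {
            Object bean = beans.get(i);
            if (bean instanceof DestroyableSketch) {
                ((DestroyableSketch) bean).destroy();
            }
        }
        beans.clear();
    }
}

// Hypothetical executor that starts and stops its resources through the hooks.
class ExecutorSketch implements InitializableSketch, DestroyableSketch {
    final List<String> events = new ArrayList<>();

    @Override
    public void init() {
        events.add("started");
    }

    @Override
    public void destroy() {
        events.add("stopped");
    }
}
```

With this shape the executor only declares what it needs (start on init, stop on destroy) and no longer has to implement the container's own callback interfaces itself.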
  • 5.In Dubbo, Invoker is a key component. It is responsible for service invocation on both the provider side and the consumer side, a bit like XxlRpcProviderFactory in xxl-job.

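In Dubbo's ProxyFactory, the getInvoker method is used on the provider side to turn the concrete service implementation into an Invoker, and the getProxy method is used on the consumer side to turn an Invoker into the interface the client needs. In both the provider's ServiceConfig and the consumer's ReferenceConfig, proxyFactory is obtained through the ExtensionLoader extension mechanism, which generates the adaptive class ProxyFactory$Adaptive. This adaptive class picks the implementation to use based on the ProxyFactory parameter in the URL (the key Constants.PROXY_KEY in the code below).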

/**
 * ProxyFactory. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("javassist")
public interface ProxyFactory {

    /**
     * create proxy.
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * create invoker.
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

Every call on the proxy object created by ReferenceBean goes through InvokerInvocationHandler. It first passes through public Result invoke(Invocation inv) throws RpcException in AbstractInvoker, which then delegates to doInvoke(invocation) in the subclass.

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

Take a look at the doInvoke implementation in DubboInvoker, which is where the remote call is actually sent.

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
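One small detail in doInvoke is the round-robin pick clients[index.getAndIncrement() % clients.length]. The excerpt does not show how index is declared, so the sketch below is only my own take on the same idea, assuming a plain AtomicInteger: such a counter eventually wraps around to negative values, where % would produce a negative index, so the sketch uses Math.floorMod instead. RoundRobinSketch is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector over a fixed set of clients.
class RoundRobinSketch<T> {
    private final T[] clients;
    // Assumption: a plain AtomicInteger; the counter type used by Dubbo is not shown in the excerpt.
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(T[] clients) {
        if (clients == null || clients.length == 0) {
            throw new IllegalArgumentException("at least one client is required");
        }
        this.clients = clients.clone();
    }

    T next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod keeps the result in [0, length) even after the counter overflows.
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }
}
```

With three clients a, b and c, successive calls to next() cycle through a, b, c and then start over at a.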

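When the service provider receives a request, the reply method below is called back. Through AbstractProxyInvoker, the call finally reaches the Invoker object generated by the ProxyFactory mentioned above.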

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    @Override
    public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            Invocation inv = (Invocation) message;
            Invoker<?> invoker = getInvoker(channel, inv);
            // need to consider backward-compatibility if it's a callback
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                String methodsStr = invoker.getUrl().getParameters().get("methods");
                boolean hasMethod = false;
                if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                    hasMethod = inv.getMethodName().equals(methodsStr);
                } else {
                    String[] methods = methodsStr.split(",");
                    for (String method : methods) {
                        if (inv.getMethodName().equals(method)) {
                            hasMethod = true;
                            break;
                        }
                    }
                }
                if (!hasMethod) {
                    logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                            + " not found in callback service interface ,invoke will be ignored."
                            + " please update the api interface. url is:"
                            + invoker.getUrl()) + " ,invocation is :" + inv);
                    return null;
                }
            }
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            return invoker.invoke(inv);
        }
        throw new RemotingException(channel, "Unsupported request: "
                + (message == null ? null : (message.getClass().getName() + ": " + message))
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
    }

    // ... (the rest of the handler is omitted in this excerpt)
};

AbstractProxyInvoker is what finally calls the method on the service provider's class.

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
        } catch (InvocationTargetException e) {
            return new RpcResult(e.getTargetException());
        } catch (Throwable e) {
            throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}
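To tie the two halves together, here is a heavily simplified sketch of the same shape built on JDK dynamic proxies: getInvoker wraps the implementation on the provider side, and getProxy turns an invoker back into the service interface on the consumer side. InvokerSketch, ProxyFactorySketch and EchoService are hypothetical names; the real Dubbo version goes through Javassist-generated Wrapper classes, URLs and a network hop, none of which is modelled here.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical, heavily simplified counterpart of Dubbo's Invoker.
interface InvokerSketch {
    Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Exception;
}

class ProxyFactorySketch {
    // Provider side: wrap the service implementation in an invoker (plain reflection here).
    static <T> InvokerSketch getInvoker(T impl, Class<T> type) {
        return (methodName, parameterTypes, arguments) -> {
            Method method = type.getMethod(methodName, parameterTypes);
            return method.invoke(impl, arguments);
        };
    }

    // Consumer side: expose the invoker as the service interface via a JDK dynamic proxy.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(InvokerSketch invoker, Class<T> type) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Methods declared on Object (toString, hashCode, equals) are answered locally.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            return invoker.invoke(method.getName(), method.getParameterTypes(), args);
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }
}

// Hypothetical service used to exercise the sketch.
interface EchoService {
    String echo(String message);
}

class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String message) {
        return "echo: " + message;
    }
}
```

In a real RPC framework the invoker handed to getProxy would serialize the call and send it over the wire, and the invoker returned by getInvoker would be looked up on the provider when the request arrives; here the two are simply connected in-process.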

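My analysis here is fairly rough; after all, this article should not let Dubbo steal the show. Xxl-Rpc scans service interfaces on the consumer side to build dynamic proxies, and scans and maintains service instances on the provider side, so it is worth looking at how Dubbo handles the same job. (In Dubbo, service instances are maintained mainly through Exporter. It is the data structure for service exposure, and it specifies that when a service is exposed over a given protocol, the exporter can return the service's executable Invoker and can also unexport it. Converting an Invoker into an Exporter is the key step of service exposure: once the Exporter for that protocol has been stored in exporterMap, the exposure is complete, and each protocol usually produces its own kind of Exporter. Exporter is not covered in this article; if you are interested, have a look at DubboProtocol and RegistryProtocol.)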

To find out what happens next, stay tuned for the next installment.


Closing words

Who would have thought that a single topic could lead to so much carnage! I'm out of here, time to go do some push-ups.