Source Code Analysis: Dubbo Server-Side Service Export

  • October 3, 2019
  • Notes

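The entry point for service export is ServiceBean's onApplicationEvent. It is an event handler: when the Spring context-refreshed event (ContextRefreshedEvent) arrives, it exports the service. The code is as follows: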

ServiceBean#onApplicationEvent

public void onApplicationEvent(ContextRefreshedEvent event) {
    // export only if the service has been neither exported nor unexported
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        export();
    }
}

Next, let's look at the export method:

public synchronized void export() {
    // provider is null when we first get here
    if (provider != null) {
        if (export == null) {
            export = provider.getExport();
        }
        if (delay == null) {
            delay = provider.getDelay();
        }
    }
    // export == false means the service is configured not to be exported
    if (export != null && !export) {
        return;
    }
    // delayed export, if a delay is configured
    if (delay != null && delay > 0) {
        delayExportExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                doExport();
            }
        }, delay, TimeUnit.MILLISECONDS);
    } else {
        doExport();
    }
}

This method mainly checks whether export has been disabled (export = false) and whether a delay has been configured.
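The decision made in export() can be illustrated with a small standalone sketch. It is not Dubbo code: DelayedExportSketch, its fields and the returned labels are hypothetical names, and doExport() is only a counter standing in for the real export work.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ServiceConfig; only the export/delay decision is modeled.
class DelayedExportSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger exportCount = new AtomicInteger();
    private final Boolean export;   // null means "not configured"
    private final Integer delay;    // milliseconds; null means "not configured"

    DelayedExportSketch(Boolean export, Integer delay) {
        this.export = export;
        this.delay = delay;
    }

    // Returns a label for the branch taken so the behavior is observable.
    synchronized String export() {
        if (export != null && !export) {
            return "skipped";                 // export=false: never export
        }
        if (delay != null && delay > 0) {
            scheduler.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
            return "scheduled";               // export later on the scheduler thread
        }
        doExport();
        return "immediate";
    }

    private void doExport() {
        exportCount.incrementAndGet();        // placeholder for the real export work
    }

    // Stops the scheduler and waits for an already scheduled export to finish.
    void awaitAndShutdown() {
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        DelayedExportSketch service = new DelayedExportSketch(true, 50);
        System.out.println(service.export());
        service.awaitAndShutdown();
        System.out.println(service.exportCount.get());
    }
}
```

The sketch uses one scheduler per instance for simplicity; sharing a single executor across services, as the delayExportExecutor field name suggests, would be the more economical choice.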

Then we enter the doExport method:

ServiceConfig#doExport

protected synchronized void doExport() {
    // if the service has already been unexported, throw
    if (unexported) {
        throw new IllegalStateException("Already unexported!");
    }
    // if the service has already been exported, return
    if (exported) {
        return;
    }
    exported = true;
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
    }
    // if the ProviderConfig is null, create one and populate it
    checkDefault();
    // The following if statements check whether core config objects such as provider
    // and application are null; if so, they try to obtain the instance from the
    // other config objects.
    if (provider != null) {
        if (application == null) {
            application = provider.getApplication();
        }
        if (module == null) {
            module = provider.getModule();
        }
        if (registries == null) {
            registries = provider.getRegistries();
        }
        if (monitor == null) {
            monitor = provider.getMonitor();
        }
        if (protocols == null) {
            protocols = provider.getProtocols();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    // generic service
    if (ref instanceof GenericService) {
        interfaceClass = GenericService.class;
        if (StringUtils.isEmpty(generic)) {
            generic = Boolean.TRUE.toString();
        }
    } else {
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                    .getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        // check that the configured methods exist in the interface
        checkInterfaceAndMethods(interfaceClass, methods);
        // check that the implementation (ref) is not null
        checkRef();
        generic = Boolean.FALSE.toString();
    }
    if (local != null) {
        if ("true".equals(local)) {
            local = interfaceName + "Local";
        }
        Class<?> localClass;
        try {
            localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(localClass)) {
            throw new IllegalStateException("The local implementation class " + localClass.getName() + " not " +
                    "implement interface " + interfaceName);
        }
    }
    // local stub
    if (stub != null) {
        if ("true".equals(stub)) {
            stub = interfaceName + "Stub";
        }
        Class<?> stubClass;
        try {
            stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        if (!interfaceClass.isAssignableFrom(stubClass)) {
            throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not " +
                    "implement interface " + interfaceName);
        }
    }
    // check application; if null, populate it from system properties
    checkApplication();
    // check RegistryConfig; if null, populate it from system properties
    checkRegistry();
    // check protocols; if null, populate them from system properties
    checkProtocol();
    // populate this ServiceConfig from system properties
    appendProperties(this);
    // check the local stub and the local mock
    checkStub(interfaceClass);
    checkMock(interfaceClass);
    if (path == null || path.length() == 0) {
        path = interfaceName;
    }
    // export the service
    doExportUrls();
    // ProviderModel is the service provider model. It stores provider-related
    // information such as the service configuration and the service instance.
    // Each exported service has one ProviderModel.
    ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);
    ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}

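This method mainly does the following:

  1. Guards against exporting the same service twice
  2. Creates the ProviderConfig if it is missing and populates it
  3. Handles generic services
  4. Validates the interface to be exported
  5. Configures the local stub
  6. Validates ApplicationConfig, RegistryConfig and ProtocolConfig
  7. Exports the service
  8. After exporting, stores the exported service in providedServices

So overall it performs validation and config initialization, and then calls doExportUrls to export the service.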
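The chain of null checks in doExport amounts to a "first non-null wins" lookup across config levels. A minimal sketch of that idea, assuming plain string lists in place of Dubbo's RegistryConfig objects (ConfigFallbackSketch and its method names are hypothetical):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the service -> provider -> module -> application fallback.
class ConfigFallbackSketch {

    // Returns the first non-null candidate, or null if every level left the value unset.
    @SafeVarargs
    static <T> T firstNonNull(T... candidates) {
        for (T candidate : candidates) {
            if (candidate != null) {
                return candidate;
            }
        }
        return null;
    }

    // Resolves registries in the same order the if statements in doExport use.
    static List<String> resolveRegistries(List<String> service, List<String> provider,
                                          List<String> module, List<String> application) {
        return firstNonNull(service, provider, module, application);
    }

    public static void main(String[] args) {
        // service and provider leave registries unset, so the module-level value is used
        System.out.println(resolveRegistries(null, null,
                Arrays.asList("registry-a"), Arrays.asList("registry-b")));
    }
}
```

A value set directly on the service always wins, and the application-level value is only a last resort.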

Next, let's look at the doExportUrls method:

private void doExportUrls() {
    // wrap the registries into URLs
    List<URL> registryURLs = loadRegistries(true);
    // iterate over the protocol configs and export the service under each one
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

It converts each RegistryConfig into a URL object and then calls doExportUrlsFor1Protocol.

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // .... unrelated code omitted
    String scope = url.getParameter(Constants.SCOPE_KEY);
    // don't export when none is configured
    if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
        // local export
        // export to local if the config is not remote (export to remote only when config is remote)
        if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // remote export
        // export to remote if the config is not local (export to local only when config is local)
        if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    // Sometimes providers should be brought online and offline manually;
                    // in that case the registry must be marked as non-dynamic.
                    // dynamic="false" means the service is managed manually
                    url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY,
                            registryURL.getParameter(Constants.DYNAMIC_KEY));
                    // monitor center
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    }
                    // selects JdkProxyFactory or JavassistProxyFactory
                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(Constants.PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
                    }
                    // ProxyFactory$Adaptive
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // Protocol$Adaptive
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
        }
    }
}

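doExportUrlsFor1Protocol contains a lot of configuration and parameter-setting code that I have not pasted here. If you are interested, you can read for yourself how the values are set.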

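In this code we only need to look at the scope checks. If scope is none, nothing is exported. Otherwise the service is exported locally unless scope is remote, and exported remotely unless scope is local. With no scope configured, both exports happen.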
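The scope rules can be condensed into a small pure function. This is only a sketch: ScopeSketch and exportTargets are hypothetical names, and the literals "none", "remote" and "local" are assumed to be the values behind Constants.SCOPE_NONE, Constants.SCOPE_REMOTE and Constants.SCOPE_LOCAL.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the nested scope checks in doExportUrlsFor1Protocol.
class ScopeSketch {

    // Returns which exports would be performed for the given scope value (may be null).
    static List<String> exportTargets(String scope) {
        List<String> targets = new ArrayList<>();
        if ("none".equalsIgnoreCase(scope)) {
            return targets;                       // scope=none: export nothing
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            targets.add("local");                 // everything except remote exports locally
        }
        if (!"local".equalsIgnoreCase(scope)) {
            targets.add("remote");                // everything except local exports remotely
        }
        return targets;
    }

    public static void main(String[] args) {
        for (String scope : new String[] {null, "local", "remote", "none"}) {
            System.out.println(scope + " -> " + exportTargets(scope));
        }
    }
}
```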

Below I only cover remote export.

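First the dynamic and monitor parameters are set on the url, and then proxyFactory#getInvoker is called to create an invoker object. proxyFactory is the adaptive proxy object ProxyFactory$Adaptive generated by Dubbo's SPI. Let's look at the code: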

public class ProxyFactory$Adaptive implements ProxyFactory {
    private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
    private AtomicInteger count = new AtomicInteger(0);

    public Object getProxy(Invoker var1) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var2 = var1.getUrl();
            String var3 = var2.getParameter("proxy", "javassist");
            if (var3 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var4 = null;

                try {
                    var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3);
                } catch (Exception var6) {
                    if (this.count.incrementAndGet() == 1) {
                        logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var6);
                    }

                    var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");
                }

                return var4.getProxy(var1);
            }
        }
    }

    public Object getProxy(Invoker var1, boolean var2) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var3 = var1.getUrl();
            String var4 = var3.getParameter("proxy", "javassist");
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var5 = null;

                try {
                    var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4);
                } catch (Exception var7) {
                    if (this.count.incrementAndGet() == 1) {
                        logger.warn("Failed to find extension named " + var4 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var7);
                    }

                    var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");
                }

                return var5.getProxy(var1, var2);
            }
        }
    }

    // var1 = ref
    // var2 = (Class) interfaceClass
    // var3 = the url object
    public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException {
        if (var3 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            // get the proxy type; the default is javassist
            String var5 = var3.getParameter("proxy", "javassist");
            if (var5 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var6 = null;

                try {
                    // obtain the ProxyFactory instance (JavassistProxyFactory) through SPI
                    var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5);
                } catch (Exception var8) {
                    if (this.count.incrementAndGet() == 1) {
                        logger.warn("Failed to find extension named " + var5 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var8);
                    }

                    var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");
                }
                // call getInvoker on the JavassistProxyFactory instance
                return var6.getInvoker(var1, var2, var3);
            }
        }
    }

    public ProxyFactory$Adaptive() {
    }
}

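So proxyFactory#getInvoker goes through ProxyFactory$Adaptive, which delegates to the selected ProxyFactory (JavassistProxyFactory by default) to create the invoker object.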
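Stripped of the generated boilerplate, the adaptive class does one thing: read an extension name from the URL, look up the implementation with that name, and fall back to the default if the lookup fails. A minimal sketch of that dispatch, assuming a plain map in place of ExtensionLoader and strings in place of invokers (all names here are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of adaptive dispatch; the map stands in for ExtensionLoader.
class AdaptiveDispatchSketch {
    private final Map<String, Function<String, String>> extensions = new HashMap<>();
    private final String defaultName;

    AdaptiveDispatchSketch(String defaultName) {
        this.defaultName = defaultName;
    }

    void addExtension(String name, Function<String, String> impl) {
        extensions.put(name, impl);
    }

    // Picks the implementation named by the "proxy" parameter, else the default one.
    String getInvoker(Map<String, String> urlParams, String ref) {
        String name = urlParams.getOrDefault("proxy", defaultName);
        Function<String, String> impl = extensions.get(name);
        if (impl == null) {
            impl = extensions.get(defaultName);   // unknown name: fall back to the default
        }
        return impl.apply(ref);
    }

    // Builds an instance with two sample extensions, "javassist" being the default.
    static AdaptiveDispatchSketch demo() {
        AdaptiveDispatchSketch adaptive = new AdaptiveDispatchSketch("javassist");
        adaptive.addExtension("javassist", ref -> "javassist-invoker(" + ref + ")");
        adaptive.addExtension("jdk", ref -> "jdk-invoker(" + ref + ")");
        return adaptive;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("proxy", "jdk");
        System.out.println(demo().getInvoker(params, "demoService"));
    }
}
```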

The invoker is then wrapped in a wrapperInvoker instance and passed to protocol#export. protocol is the generated adaptive class Protocol$Adaptive.

Let's see what the generated Protocol$Adaptive code looks like:

public class Protocol$Adaptive implements Protocol {
    private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
    private AtomicInteger count = new AtomicInteger(0);

    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public Exporter export(Invoker var1) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var2 = var1.getUrl();
            String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
            if (var3 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
            } else {
                Protocol var4 = null;

                try {
                    var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);
                } catch (Exception var6) {
                    if (this.count.incrementAndGet() == 1) {
                        logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", var6);
                    }

                    var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
                }

                return var4.export(var1);
            }
        }
    }

    public Invoker refer(Class var1, URL var2) throws RpcException {
        if (var2 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");
            } else {
                Protocol var5 = null;

                try {
                    var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4);
                } catch (Exception var7) {
                    if (this.count.incrementAndGet() == 1) {
                        logger.warn("Failed to find extension named " + var4 + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", var7);
                    }

                    var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
                }

                return var5.refer(var1, var2);
            }
        }
    }

    public Protocol$Adaptive() {
    }
}

Finally, the line below obtains the extension, which here is a ProtocolListenerWrapper instance, and its export method is called to perform the export.

var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);

The call chain is then: ProtocolListenerWrapper -> ProtocolFilterWrapper -> RegistryProtocol
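The chain works because each wrapper implements the same interface as the protocol it wraps and forwards the call inward. A minimal decorator sketch, assuming a simplified Protocol interface with a trace list so the call order can be observed (none of these types are Dubbo's):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical decorator chain; only the wrapper names are borrowed from Dubbo.
class WrapperChainSketch {

    interface Protocol {
        void export(String service, List<String> trace);
    }

    // The innermost protocol, where the real work would happen.
    static class CoreProtocol implements Protocol {
        public void export(String service, List<String> trace) {
            trace.add("RegistryProtocol");
        }
    }

    // A wrapper records its own name, then delegates to the protocol it wraps.
    static class NamedWrapper implements Protocol {
        private final String name;
        private final Protocol delegate;

        NamedWrapper(String name, Protocol delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        public void export(String service, List<String> trace) {
            trace.add(name);
            delegate.export(service, trace);
        }
    }

    // Builds the chain from the inside out and returns the order of the calls.
    static List<String> run() {
        Protocol chain = new NamedWrapper("ProtocolListenerWrapper",
                new NamedWrapper("ProtocolFilterWrapper", new CoreProtocol()));
        List<String> trace = new ArrayList<>();
        chain.export("demoService", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run()));
    }
}
```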

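Finally, RegistryProtocol#export is called. The invoker was created from the registry URL, so the protocol resolved by the adaptive class is the registry one.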

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // start the server on the provider side
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

    URL registryUrl = getRegistryUrl(originInvoker);

    // get the registry instance
    //registry provider
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);

    //to judge to delay publish whether or not
    boolean register = registeredProviderUrl.getParameter("register", true);

    ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);

    if (register) {
        // register with the registry
        register(registryUrl, registeredProviderUrl);
        ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
    }

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    // subscribe the override listener
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}

Let's first look at doLocalExport, which performs the actual provider-side export.

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
    String key = getCacheKey(originInvoker);
    ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        synchronized (bounds) {
            exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
            if (exporter == null) {
                final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker,
                        getProviderUrl(originInvoker));
                // export through DubboProtocol, resolved via SPI
                exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete),
                        originInvoker);
                // cache the exported service
                bounds.put(key, exporter);
            }
        }
    }
    return exporter;
}
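The shape of doLocalExport is a double-checked cache: look up without locking, and only on a miss take the lock, check again, and create the exporter. A minimal sketch of that pattern, assuming a ConcurrentHashMap for the cache and a Supplier in place of the real export call (ExportCacheSketch and its members are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical cache that creates at most one exporter per key.
class ExportCacheSketch {
    private final Map<String, Object> bounds = new ConcurrentHashMap<>();
    int created = 0;   // number of exporters actually created; only written under the lock

    Object exportOnce(String key, Supplier<Object> exporterFactory) {
        Object exporter = bounds.get(key);          // first check, no lock
        if (exporter == null) {
            synchronized (bounds) {
                exporter = bounds.get(key);         // second check, under the lock
                if (exporter == null) {
                    exporter = exporterFactory.get();
                    created++;
                    bounds.put(key, exporter);      // cache so later calls reuse it
                }
            }
        }
        return exporter;
    }

    public static void main(String[] args) {
        ExportCacheSketch cache = new ExportCacheSketch();
        Object first = cache.exportOnce("demoService", Object::new);
        Object second = cache.exportOnce("demoService", Object::new);
        System.out.println(first == second);
    }
}
```

The second check matters when two threads miss at the same time: whichever gets the lock second finds the entry already present and reuses it.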

Inside doLocalExport, the SPI call goes through
ProtocolListenerWrapper -> ProtocolFilterWrapper -> DubboProtocol

DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // this is where the service is actually exposed
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}

private void openServer(URL url) {
    // ... code omitted
    if (server == null) {
        // create the server
        serverMap.put(key, createServer(url));
    }
}

private ExchangeServer createServer(URL url) {
    // ... code omitted
    // start the server
    server = Exchangers.bind(url, requestHandler);
    // ... code omitted
    return server;
}

Ultimately the server is started through Exchangers#bind:

Exchangers#bind->HeaderExchanger#bind->Transporters#bind->NettyTransporter#bind->NettyServer#doOpen

At the end of this chain, NettyServer's doOpen method starts the server.

Back in RegistryProtocol#export, we continue with
register(registryUrl, registeredProviderUrl). This call registers the provider with the registry. If zk is used, that means registering with ZooKeeper.

Let's step into this method.

public void register(URL registryUrl, URL registedProviderUrl) {
    // ZookeeperRegistry
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registedProviderUrl);
}

registryFactory is an adaptive proxy class generated by SPI:

public class RegistryFactory$Adaptive implements RegistryFactory {
    private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
    private AtomicInteger count = new AtomicInteger(0);

    public Registry getRegistry(URL var1) {
        if (var1 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var3 = var1.getProtocol() == null ? "dubbo" : var1.getProtocol();
            if (var3 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.registry.RegistryFactory) name from url(" + var1.toString() + ") use keys([protocol])");
            } else {
                RegistryFactory var4 = null;

                try {
                    var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(var3);
                } catch (Exception var6) {
                    if (this.count.incrementAndGet() == 1) {
                        logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.registry.RegistryFactory, will use default extension dubbo instead.", var6);
                    }

                    var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("dubbo");
                }

                return var4.getRegistry(var1);
            }
        }
    }

    public RegistryFactory$Adaptive() {
    }
}

Calling getRegistry on RegistryFactory$Adaptive then returns the Registry implementation, in this case a ZookeeperRegistry instance.

The inheritance hierarchy of ZookeeperRegistry is: ZookeeperRegistry -> FailbackRegistry -> AbstractRegistry -> Registry.

So calling ZookeeperRegistry#register first invokes the parent class's register method.

FailbackRegistry#register

public void register(URL url) {
    super.register(url);
    // remove the url from the set of failed registrations
    failedRegistered.remove(url);
    // remove the url from the set of failed unregistrations
    failedUnregistered.remove(url);
    try {
        // Sending a registration request to the server side
        // call the subclass's method to do the registration
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        failedRegistered.add(url);
    }
}
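The failure handling in FailbackRegistry#register follows a simple pattern: try to register, and on failure either fail fast (when check is on) or remember the URL for a later retry. A minimal sketch of that pattern, assuming string URLs, a Consumer standing in for the subclass's doRegister, and a retry() method that is called by hand rather than by a timer (all names are hypothetical):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical failback registry; doRegister stands in for the subclass implementation.
class FailbackSketch {
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister;
    private final boolean check;

    FailbackSketch(Consumer<String> doRegister, boolean check) {
        this.doRegister = doRegister;
        this.check = check;
    }

    void register(String url) {
        failedRegistered.remove(url);             // a fresh attempt clears the old failure
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // startup check enabled: surface the failure immediately
                throw new IllegalStateException("Failed to register " + url, e);
            }
            failedRegistered.add(url);            // otherwise remember it for retry
        }
    }

    // Re-attempts every recorded failure; entries that still fail stay in the set.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // keep the url for the next retry round
            }
        }
    }

    public static void main(String[] args) {
        boolean[] registryUp = {false};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (!registryUp[0]) {
                throw new RuntimeException("registry unavailable");
            }
        }, false);
        registry.register("dubbo://host/DemoService");
        System.out.println(registry.failedRegistered);
        registryUp[0] = true;
        registry.retry();
        System.out.println(registry.failedRegistered);
    }
}
```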

The doRegister call dispatches back to ZookeeperRegistry's doRegister, which then performs the registration through the zk client obtained via SPI.

That concludes the walkthrough of server-side service export.