源碼分析–dubbo服務端暴露

  • 2019 年 10 月 3 日
  • 筆記

服務暴露的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一個事件響應方法,該方法會在收到 Spring 上下文刷新事件後執行服務導出操作。方法程式碼如下:

ServiceBean#onApplicationEvent

public void onApplicationEvent(ContextRefreshedEvent event) {      //是不是已經暴露或 是不是被取消      if (!isExported() && !isUnexported()) {          if (logger.isInfoEnabled()) {              logger.info("The service ready on spring started. service: " + getInterface());          }          export();      }  }

下面我們看看export方法:

public synchronized void export() {      //一開始進來這個是空的      if (provider != null) {          if (export == null) {              export = provider.getExport();          }          if (delay == null) {              delay = provider.getDelay();          }      }      //表示是否服務已經暴露      if (export != null && !export) {          return;      }      //是否設置了延遲暴露      if (delay != null && delay > 0) {          delayExportExecutor.schedule(new Runnable() {              @Override              public void run() {                  doExport();              }          }, delay, TimeUnit.MILLISECONDS);      } else {          doExport();      }  }

這個方法主要是進行校驗服務有沒有暴露過,有沒有設置延遲。

然後進入到doExport方法中:

ServiceConfig#doExport

protected synchronized void doExport() {      //如果已經取消暴露,則直接拋出異常      if (unexported) {          throw new IllegalStateException("Already unexported!");      }      //如果已經暴露過,則返回      if (exported) {          return;      }      exported = true;      if (interfaceName == null || interfaceName.length() == 0) {          throw new IllegalStateException("<dubbo:service interface="" /> interface not allow null!");      }      //如果ProviderConfig為空,則給它進行賦值      checkDefault();      // 下面幾個 if 語句用於檢測 provider、application 等核心配置類對象是否為空,      // 若為空,則嘗試從其他配置類對象中獲取相應的實例。      if (provider != null) {          if (application == null) {              application = provider.getApplication();          }          if (module == null) {              module = provider.getModule();          }          if (registries == null) {              registries = provider.getRegistries();          }          if (monitor == null) {              monitor = provider.getMonitor();          }          if (protocols == null) {              protocols = provider.getProtocols();          }      }      if (module != null) {          if (registries == null) {              registries = module.getRegistries();          }          if (monitor == null) {              monitor = module.getMonitor();          }      }      if (application != null) {          if (registries == null) {              registries = application.getRegistries();          }          if (monitor == null) {              monitor = application.getMonitor();          }      }      //泛型化      if (ref instanceof GenericService) {          interfaceClass = GenericService.class;          if (StringUtils.isEmpty(generic)) {              generic = Boolean.TRUE.toString();          }      } else {          try {              interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()                      .getContextClassLoader());          } catch (ClassNotFoundException e) {              throw new IllegalStateException(e.getMessage(), e);          }          //校驗介面裡面是否有設置的方法          checkInterfaceAndMethods(interfaceClass, methods);          //校驗介面的實現類不能為空          checkRef();          generic = Boolean.FALSE.toString();      }      if (local != null) {          if ("true".equals(local)) {              local = interfaceName + "Local";          }          Class<?> localClass;          try {              localClass = ClassHelper.forNameWithThreadContextClassLoader(local);          } catch (ClassNotFoundException e) {              throw new IllegalStateException(e.getMessage(), e);          }          if (!interfaceClass.isAssignableFrom(localClass)) {              throw new IllegalStateException("The local implementation class " + localClass.getName() + " not " +                      "implement interface " + interfaceName);          }      }      //本地存根      if (stub != null) {          if ("true".equals(stub)) {              stub = interfaceName + "Stub";          }          Class<?> stubClass;          try {              stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);          } catch (ClassNotFoundException e) {              throw new IllegalStateException(e.getMessage(), e);          }          if (!interfaceClass.isAssignableFrom(stubClass)) {              throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not " +                      "implement interface " + interfaceName);          }      }      //校驗application,如果為空則從系統變數賦值      checkApplication();      //校驗RegistryConfig,如果為空則從系統變數賦值      checkRegistry();      //校驗protocols,如果為空則從系統變數賦值      checkProtocol();      //從系統變數為serviceconfig賦值      appendProperties(this);      //校驗本地存根和本地偽裝      checkStub(interfaceClass);      checkMock(interfaceClass);      if (path == null || path.length() == 0) {          path = interfaceName;      }      //服務暴露      doExportUrls();      // ProviderModel 表示服務提供者模型,此對象中存儲了與服務提供者相關的資訊。      // 比如服務的配置資訊,服務實例等。每個被導出的服務對應一個 ProviderModel。      ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass);      ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);  }

這個方法里主要做了這麼幾件事:

  1. 校驗防止重複服務暴露
  2. 創建ProviderConfig,並賦值
  3. 泛型化校驗
  4. 校驗要暴露的介面
  5. 本地存根配置
  6. 校驗ApplicationConfig、RegistryConfig、ProtocolConfig
  7. 服務暴露
  8. 暴露完後將暴露的服務存入providedServices中
    所以,總體來說就是做校驗和config的初始化工作,然後調用doExportUrls進行服務暴露。

接下來我們看看doExportUrls方法:

private void doExportUrls() {      //將Registries封裝成URL      List<URL> registryURLs = loadRegistries(true);      //遍歷protocolConfig,並註冊      for (ProtocolConfig protocolConfig : protocols) {          doExportUrlsFor1Protocol(protocolConfig, registryURLs);      }  }

講RegistryConfig轉換為URL對象,然後調用doExportUrlsFor1Protocol方法。

private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {      ....忽略不相關程式碼      String scope = url.getParameter(Constants.SCOPE_KEY);      // don't export when none is configured      if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {      //本地暴露      // export to local if the config is not remote (export to remote only when config is remote)      if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {          exportLocal(url);      }      //遠程暴露      // export to remote if the config is not local (export to local only when config is local)      if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {          if (logger.isInfoEnabled()) {              logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);          }          if (registryURLs != null && !registryURLs.isEmpty()) {              for (URL registryURL : registryURLs) {                  //有時候希望人工管理服務提供者的上線和下線,此時需將註冊中心標識為非動態管理模式。                  //dynamic="false" 為人工管理服務模式                  url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY,                          registryURL.getParameter(Constants.DYNAMIC_KEY));                  //監控中心                  URL monitorUrl = loadMonitor(registryURL);                  if (monitorUrl != null) {                      url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());                  }                  if (logger.isInfoEnabled()) {                      logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);                  }                  //配置是JdkProxyFactory還是JavassistProxyFactory                  // For providers, this is used to enable custom proxy to generate invoker                  String proxy = url.getParameter(Constants.PROXY_KEY);                  if (StringUtils.isNotEmpty(proxy)) {                      registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);                  }                  //ProxyFactory$Adaptive                  Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));                  DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);                  //Protocol$Adaptive                  Exporter<?> exporter = protocol.export(wrapperInvoker);                  exporters.add(exporter);              }          } else {              Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);              DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);                Exporter<?> exporter = protocol.export(wrapperInvoker);              exporters.add(exporter);          }      }  }  }

doExportUrlsFor1Protocol程式碼裡面有很多配置和設值的程式碼,我這裡就不貼出來了,有興趣的可以自己去看看是怎麼設值的。

這段程式碼我們只需要看這個if判斷就可以了。在這個if判斷裡面,如果scope配置的不是none,並且不是remote,那麼就會進行本地暴露和遠程暴露。

下面我只對遠程暴露進行講解。

首先會對url進行dynamic和monitorUrl校驗和配置,然後會調用proxyFactory#getInvoker生成一個invoker對象。proxyFactory是由dubbo的spi生成的代理對象ProxyFactory$Adaptive,我們來看看具體的程式碼:

public class ProxyFactory$Adaptive implements ProxyFactory {      private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);      private AtomicInteger count = new AtomicInteger(0);        public Object getProxy(Invoker var1) throws RpcException {          if (var1 == null) {              throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");          } else if (var1.getUrl() == null) {              throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");          } else {              URL var2 = var1.getUrl();              String var3 = var2.getParameter("proxy", "javassist");              if (var3 == null) {                  throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])");              } else {                  ProxyFactory var4 = null;                    try {                      var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3);                  } catch (Exception var6) {                      if (this.count.incrementAndGet() == 1) {                          logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var6);                      }                        var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");                  }                    return var4.getProxy(var1);              }          }      }        public Object getProxy(Invoker var1, boolean var2) throws RpcException {          if (var1 == null) {              throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");          } else if (var1.getUrl() == null) {              throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");          } else {              URL var3 = var1.getUrl();              String var4 = var3.getParameter("proxy", "javassist");              if (var4 == null) {                  throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");              } else {                  ProxyFactory var5 = null;                    try {                      var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4);                  } catch (Exception var7) {                      if (this.count.incrementAndGet() == 1) {                          logger.warn("Failed to find extension named " + var4 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var7);                      }                        var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");                  }                    return var5.getProxy(var1, var2);              }          }      }      //var1 = ref      //var2 = (Class) interfaceClass      //var3 = url 對象      public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException {          if (var3 == null) {              throw new IllegalArgumentException("url == null");          } else {              //獲取代理方式,默認是javassist              String var5 = var3.getParameter("proxy", "javassist");              if (var5 == null) {                  throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");              } else {                  ProxyFactory var6 = null;                    try {                      //通過spi獲取ProxyFactory實例JavassistProxyFactory                      var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5);                  } catch (Exception var8) {                      if (this.count.incrementAndGet() == 1) {                          logger.warn("Failed to find extension named " + var5 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var8);                      }                        var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");                  }                  //調用JavassistProxyFactory實例的invoker方法                  return var6.getInvoker(var1, var2, var3);              }          }      }        public ProxyFactory$Adaptive() {      }  }

所以proxyFactory#getInvoker最終會通過 ProxyFactory$Adaptive生成一個invoker對象。

然後封裝成wrapperInvoker實例傳入到protocol#export中。protocol是生成的代理類Protocol$Adaptive。

我們看一下Protocol$Adaptive生成的程式碼是怎麼樣的:

public class Protocol$Adaptive implements Protocol {      private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);      private AtomicInteger count = new AtomicInteger(0);        public void destroy() {          throw new UnsupportedOperationException("method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");      }        public int getDefaultPort() {          throw new UnsupportedOperationException("method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");      }        public Exporter export(Invoker var1) throws RpcException {          if (var1 == null) {              throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");          } else if (var1.getUrl() == null) {              throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");          } else {              URL var2 = var1.getUrl();              String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();              if (var3 == null) {                  throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");              } else {                  Protocol var4 = null;                    try {                      var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);                  } catch (Exception var6) {                      if (this.count.incrementAndGet() == 1) {                          logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", var6);                      }                        var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");                  }                    return var4.export(var1);              }          }      }        public Invoker refer(Class var1, URL var2) throws RpcException {          if (var2 == null) {              throw new IllegalArgumentException("url == null");          } else {              String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol();              if (var4 == null) {                  throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])");              } else {                  Protocol var5 = null;                    try {                      var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4);                  } catch (Exception var7) {                      if (this.count.incrementAndGet() == 1) {                          logger.warn("Failed to find extension named " + var4 + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", var7);                      }                        var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");                  }                    return var5.refer(var1, var2);              }          }      }        public Protocol$Adaptive() {      }  }

最後會通過下面程式碼,獲得ProtocolListenerWrapper實例,並調用其export方法進行暴露。

var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);

然後調用鏈:ProtocolListenerWrapper->ProtocolFilterWrapper->RegistryProtocol

最後調用RegistryProtocol#export方法。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {      //服務端開啟服務      //export invoker      final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);        URL registryUrl = getRegistryUrl(originInvoker);        //獲取註冊中心實例      //registry provider      final Registry registry = getRegistry(originInvoker);      final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);        //to judge to delay publish whether or not      boolean register = registeredProviderUrl.getParameter("register", true);        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);        if (register) {          //註冊          register(registryUrl, registeredProviderUrl);          ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);      }        // Subscribe the override data      // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call      //  the same service. Because the subscribed is cached key with the name of the service, it causes the      //  subscription information to cover.      final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);      final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);      overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);      //訂閱監聽      registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);      //Ensure that a new exporter instance is returned every time export      return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);  }

我們先看doLocalExport這個方法,這個方法裡面實現了服務端的服務暴露。

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {      String key = getCacheKey(originInvoker);      ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);      if (exporter == null) {          synchronized (bounds) {              exporter = (ExporterChangeableWrapper<T>) bounds.get(key);              if (exporter == null) {                  final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker,                          getProviderUrl(originInvoker));                  //通過spi調用dubboProtocol進行服務暴露                  exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete),                          originInvoker);                  //快取暴露過的服務                  bounds.put(key, exporter);              }          }      }      return exporter;  }

在doLocalExport方法裡面,會通過spi調用
ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol

DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {      URL url = invoker.getUrl();        // export service.      String key = serviceKey(url);      DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);      exporterMap.put(key, exporter);        //export an stub service for dispatching event      Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);      Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);      if (isStubSupportEvent && !isCallbackservice) {          String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);          if (stubServiceMethods == null || stubServiceMethods.length() == 0) {              if (logger.isWarnEnabled()) {                  logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +                          "], has set stubproxy support event ,but no stub methods founded."));              }          } else {              stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);          }      }      //實現服務的暴露      openServer(url);      optimizeSerialization(url);      return exporter;  }      private void openServer(URL url) {      ...忽略程式碼      if (server == null) {          //創建服務          serverMap.put(key, createServer(url));      }  }    private ExchangeServer createServer(URL url) {      ...忽略程式碼      //啟動服務      server = Exchangers.bind(url, requestHandler);      ...忽略程式碼      return server;  }

最終會通過Exchangers#bind來啟動服務

Exchangers#bind->HeaderExchanger#bind->Transporters#bind->NettyTransporter#bind->NettyServer#doOpen

最後會在NettyServer調用doOpen方法啟動服務。

我們再回到RegistryProtocol#export往下走
register(registryUrl, registeredProviderUrl),這句程式碼就是用來註冊到註冊中心的,如果用的是zk,那麼就是註冊到Zookeeper的。

我們進到這個方法里瞧瞧。

public void register(URL registryUrl, URL registedProviderUrl) {      //ZookeeperRegistry      Registry registry = registryFactory.getRegistry(registryUrl);      registry.register(registedProviderUrl);  }

registryFactory是spi實現的代理類:

public class RegistryFactory$Adaptive implements RegistryFactory {      private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);      private AtomicInteger count = new AtomicInteger(0);        public Registry getRegistry(URL var1) {          if (var1 == null) {              throw new IllegalArgumentException("url == null");          } else {              String var3 = var1.getProtocol() == null ? "dubbo" : var1.getProtocol();              if (var3 == null) {                  throw new IllegalStateException("Fail to get extension(org.apache.dubbo.registry.RegistryFactory) name from url(" + var1.toString() + ") use keys([protocol])");              } else {                  RegistryFactory var4 = null;                    try {                      var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(var3);                  } catch (Exception var6) {                      if (this.count.incrementAndGet() == 1) {                          logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.registry.RegistryFactory, will use default extension dubbo instead.", var6);                      }                        var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("dubbo");                  }                    return var4.getRegistry(var1);              }          }      }        public RegistryFactory$Adaptive() {      }  }

然後通過調用RegistryFactory$Adaptive的getRegistry方法獲取Registry的實現類ZookeeperRegistry實例。

ZookeeperRegistry繼承關係圖如下所示:

所以調用ZookeeperRegistry#register會先調用到父類的register方法。

FailbackRegistry#register

public void register(URL url) {      super.register(url);      //從失敗集合中移除      failedRegistered.remove(url);      //從失敗取消註冊的集合中移除      failedUnregistered.remove(url);      try {          // Sending a registration request to the server side          //調用子類的方法進行註冊          doRegister(url);      } catch (Exception e) {          Throwable t = e;            // If the startup detection is opened, the Exception is thrown directly.          boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)                  && url.getParameter(Constants.CHECK_KEY, true)                  && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());          boolean skipFailback = t instanceof SkipFailbackWrapperException;          if (check || skipFailback) {              if (skipFailback) {                  t = t.getCause();              }              throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);          } else {              logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);          }            // Record a failed registration request to a failed list, retry regularly          failedRegistered.add(url);      }  }

通過調用doRegister回到ZookeeperRegistry的doRegister方法中,然後調用zk的spi進行註冊。

到這裡,服務端的暴露也就講完了。