源碼分析–dubbo服務端暴露
- 2019 年 10 月 3 日
- 筆記
服務暴露的入口方法是 ServiceBean 的 onApplicationEvent。onApplicationEvent 是一個事件響應方法,該方法會在收到 Spring 上下文刷新事件後執行服務導出操作。方法程式碼如下:
ServiceBean#onApplicationEvent
public void onApplicationEvent(ContextRefreshedEvent event) { //是不是已經暴露或 是不是被取消 if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } }
下面我們看看export方法:
public synchronized void export() { //一開始進來這個是空的 if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } //表示是否服務已經暴露 if (export != null && !export) { return; } //是否設置了延遲暴露 if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
這個方法主要是進行校驗服務有沒有暴露過,有沒有設置延遲。
然後進入到doExport方法中:
ServiceConfig#doExport
protected synchronized void doExport() { //如果已經取消暴露,則直接拋出異常 if (unexported) { throw new IllegalStateException("Already unexported!"); } //如果已經暴露過,則返回 if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:service interface="" /> interface not allow null!"); } //如果ProviderConfig為空,則給它進行賦值 checkDefault(); // 下面幾個 if 語句用於檢測 provider、application 等核心配置類對象是否為空, // 若為空,則嘗試從其他配置類對象中獲取相應的實例。 if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } //泛型化 if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } //校驗介面裡面是否有設置的方法 checkInterfaceAndMethods(interfaceClass, methods); //校驗介面的實現類不能為空 checkRef(); generic = Boolean.FALSE.toString(); } if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not " + "implement interface " + interfaceName); } } //本地存根 if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not " + "implement interface " + interfaceName); } } //校驗application,如果為空則從系統變數賦值 checkApplication(); //校驗RegistryConfig,如果為空則從系統變數賦值 checkRegistry(); //校驗protocols,如果為空則從系統變數賦值 checkProtocol(); //從系統變數為serviceconfig賦值 appendProperties(this); //校驗本地存根和本地偽裝 checkStub(interfaceClass); checkMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } //服務暴露 doExportUrls(); // ProviderModel 表示服務提供者模型,此對象中存儲了與服務提供者相關的資訊。 // 比如服務的配置資訊,服務實例等。每個被導出的服務對應一個 ProviderModel。 ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), ref, interfaceClass); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
這個方法里主要做了這麼幾件事:
- 校驗防止重複服務暴露
- 創建ProviderConfig,並賦值
- 泛型化校驗
- 校驗要暴露的介面
- 本地存根配置
- 校驗ApplicationConfig、RegistryConfig、ProtocolConfig
- 服務暴露
- 暴露完後將暴露的服務存入providedServices中
所以,總體來說就是做校驗和config的初始化工作,然後調用doExportUrls進行服務暴露。
接下來我們看看doExportUrls方法:
private void doExportUrls() { //將Registries封裝成URL List<URL> registryURLs = loadRegistries(true); //遍歷protocolConfig,並註冊 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
講RegistryConfig轉換為URL對象,然後調用doExportUrlsFor1Protocol方法。
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ....忽略不相關程式碼 String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { //本地暴露 // export to local if the config is not remote (export to remote only when config is remote) if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } //遠程暴露 // export to remote if the config is not local (export to local only when config is local) if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { //有時候希望人工管理服務提供者的上線和下線,此時需將註冊中心標識為非動態管理模式。 //dynamic="false" 為人工管理服務模式 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); //監控中心 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //配置是JdkProxyFactory還是JavassistProxyFactory // For providers, this is used to enable custom proxy to generate invoker String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } //ProxyFactory$Adaptive Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //Protocol$Adaptive Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } } }
doExportUrlsFor1Protocol程式碼裡面有很多配置和設值的程式碼,我這裡就不貼出來了,有興趣的可以自己去看看是怎麼設值的。
這段程式碼我們只需要看這個if判斷就可以了。在這個if判斷裡面,如果scope配置的不是none,並且不是remote,那麼就會進行本地暴露和遠程暴露。
下面我只對遠程暴露進行講解。
首先會對url進行dynamic和monitorUrl校驗和配置,然後會調用proxyFactory#getInvoker生成一個invoker對象。proxyFactory是由dubbo的spi生成的代理對象ProxyFactory$Adaptive,我們來看看具體的程式碼:
public class ProxyFactory$Adaptive implements ProxyFactory { private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class); private AtomicInteger count = new AtomicInteger(0); public Object getProxy(Invoker var1) throws RpcException { if (var1 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } else if (var1.getUrl() == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } else { URL var2 = var1.getUrl(); String var3 = var2.getParameter("proxy", "javassist"); if (var3 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])"); } else { ProxyFactory var4 = null; try { var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3); } catch (Exception var6) { if (this.count.incrementAndGet() == 1) { logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var6); } var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist"); } return var4.getProxy(var1); } } } public Object getProxy(Invoker var1, boolean var2) throws RpcException { if (var1 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } else if (var1.getUrl() == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } else { URL var3 = var1.getUrl(); String var4 = var3.getParameter("proxy", "javassist"); if (var4 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])"); } else { ProxyFactory var5 = null; try { var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4); } catch (Exception var7) { if (this.count.incrementAndGet() == 1) { logger.warn("Failed to find extension named " + var4 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var7); } var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist"); } return var5.getProxy(var1, var2); } } } //var1 = ref //var2 = (Class) interfaceClass //var3 = url 對象 public Invoker getInvoker(Object var1, Class var2, URL var3) throws RpcException { if (var3 == null) { throw new IllegalArgumentException("url == null"); } else { //獲取代理方式,默認是javassist String var5 = var3.getParameter("proxy", "javassist"); if (var5 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])"); } else { ProxyFactory var6 = null; try { //通過spi獲取ProxyFactory實例JavassistProxyFactory var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5); } catch (Exception var8) { if (this.count.incrementAndGet() == 1) { logger.warn("Failed to find extension named " + var5 + " for type org.apache.dubbo.rpc.ProxyFactory, will use default extension javassist instead.", var8); } var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist"); } //調用JavassistProxyFactory實例的invoker方法 return var6.getInvoker(var1, var2, var3); } } } public ProxyFactory$Adaptive() { } }
所以proxyFactory#getInvoker最終會通過 ProxyFactory$Adaptive生成一個invoker對象。
然後封裝成wrapperInvoker實例傳入到protocol#export中。protocol是生成的代理類Protocol$Adaptive。
我們看一下Protocol$Adaptive生成的程式碼是怎麼樣的:
public class Protocol$Adaptive implements Protocol { private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class); private AtomicInteger count = new AtomicInteger(0); public void destroy() { throw new UnsupportedOperationException("method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!"); } public Exporter export(Invoker var1) throws RpcException { if (var1 == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null"); } else if (var1.getUrl() == null) { throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null"); } else { URL var2 = var1.getUrl(); String var3 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol(); if (var3 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])"); } else { Protocol var4 = null; try { var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3); } catch (Exception var6) { if (this.count.incrementAndGet() == 1) { logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", var6); } var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo"); } return var4.export(var1); } } } public Invoker refer(Class var1, URL var2) throws RpcException { if (var2 == null) { throw new IllegalArgumentException("url == null"); } else { String var4 = var2.getProtocol() == null ? "dubbo" : var2.getProtocol(); if (var4 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + var2.toString() + ") use keys([protocol])"); } else { Protocol var5 = null; try { var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var4); } catch (Exception var7) { if (this.count.incrementAndGet() == 1) { logger.warn("Failed to find extension named " + var4 + " for type org.apache.dubbo.rpc.Protocol, will use default extension dubbo instead.", var7); } var5 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo"); } return var5.refer(var1, var2); } } } public Protocol$Adaptive() { } }
最後會通過下面程式碼,獲得ProtocolListenerWrapper實例,並調用其export方法進行暴露。
var4 = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(var3);
然後調用鏈:ProtocolListenerWrapper->ProtocolFilterWrapper->RegistryProtocol
最後調用RegistryProtocol#export方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //服務端開啟服務 //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //獲取註冊中心實例 //registry provider final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); if (register) { //註冊 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call // the same service. Because the subscribed is cached key with the name of the service, it causes the // subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); //訂閱監聽 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
我們先看doLocalExport這個方法,這個方法裡面實現了服務端的服務暴露。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //通過spi調用dubboProtocol進行服務暴露 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); //快取暴露過的服務 bounds.put(key, exporter); } } } return exporter; }
在doLocalExport方法裡面,會通過spi調用
ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol
DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //實現服務的暴露 openServer(url); optimizeSerialization(url); return exporter; } private void openServer(URL url) { ...忽略程式碼 if (server == null) { //創建服務 serverMap.put(key, createServer(url)); } } private ExchangeServer createServer(URL url) { ...忽略程式碼 //啟動服務 server = Exchangers.bind(url, requestHandler); ...忽略程式碼 return server; }
最終會通過Exchangers#bind來啟動服務
Exchangers#bind->HeaderExchanger#bind->Transporters#bind->NettyTransporter#bind->NettyServer#doOpen
最後會在NettyServer調用doOpen方法啟動服務。
我們再回到RegistryProtocol#export往下走
register(registryUrl, registeredProviderUrl)
,這句程式碼就是用來註冊到註冊中心的,如果用的是zk,那麼就是註冊到Zookeeper的。
我們進到這個方法里瞧瞧。
public void register(URL registryUrl, URL registedProviderUrl) { //ZookeeperRegistry Registry registry = registryFactory.getRegistry(registryUrl); registry.register(registedProviderUrl); }
registryFactory是spi實現的代理類:
public class RegistryFactory$Adaptive implements RegistryFactory { private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class); private AtomicInteger count = new AtomicInteger(0); public Registry getRegistry(URL var1) { if (var1 == null) { throw new IllegalArgumentException("url == null"); } else { String var3 = var1.getProtocol() == null ? "dubbo" : var1.getProtocol(); if (var3 == null) { throw new IllegalStateException("Fail to get extension(org.apache.dubbo.registry.RegistryFactory) name from url(" + var1.toString() + ") use keys([protocol])"); } else { RegistryFactory var4 = null; try { var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension(var3); } catch (Exception var6) { if (this.count.incrementAndGet() == 1) { logger.warn("Failed to find extension named " + var3 + " for type org.apache.dubbo.registry.RegistryFactory, will use default extension dubbo instead.", var6); } var4 = (RegistryFactory)ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("dubbo"); } return var4.getRegistry(var1); } } } public RegistryFactory$Adaptive() { } }
然後通過調用RegistryFactory$Adaptive的getRegistry方法獲取Registry的實現類ZookeeperRegistry實例。
ZookeeperRegistry繼承關係圖如下所示:
所以調用ZookeeperRegistry#register會先調用到父類的register方法。
FailbackRegistry#register
public void register(URL url) { super.register(url); //從失敗集合中移除 failedRegistered.remove(url); //從失敗取消註冊的集合中移除 failedUnregistered.remove(url); try { // Sending a registration request to the server side //調用子類的方法進行註冊 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // Record a failed registration request to a failed list, retry regularly failedRegistered.add(url); } }
通過調用doRegister回到ZookeeperRegistry的doRegister方法中,然後調用zk的spi進行註冊。
到這裡,服務端的暴露也就講完了。