【一起學源碼-微服務】Ribbon 源碼三:Ribbon與Eureka整合原理分析
- 2020 年 2 月 11 日
- 筆記
前言
前情回顧
上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfiguration
到RibbonAutoConfiguration
再到RibbonClientConfiguration
,我們找到了ILoadBalancer
默認初始化的對象等。
本講目錄
這一講我們會進一步往下探究Ribbon和Eureka是如何結合的。
通過上一講ILoadBalancer
我們已經可以拿到一個服務所有的服務節點資訊了,這裡面是怎麼把服務的名稱轉化為對應的具體host請求資訊的呢?
通過這一講 我們來一探究竟
目錄如下:
- EurekaClientAutoConfiguration.getLoadBalancer()回顧
- 再次梳理Ribbon初始化過程
- ServerList實現類初始化過程
- getUpdatedListOfServers()獲取註冊表列表分析
- ribbon如何更新自己保存的註冊表資訊?
說明
原創不易,如若轉載 請標明來源!
部落格地址:一枝花算不算浪漫 微信公眾號:壹枝花算不算浪漫
源碼閱讀
EurekaClientAutoConfiguration.getLoadBalancer()回顧
上一講我們已經深入的講解過getLoadBalancer()
方法的實現,每個serviceName都對應一個自己的SpringContext上下文資訊,然後通過ILoadBalancer.class
從上下文資訊中獲取默認的LoadBalancer:ZoneAwareLoadBalancer
, 我們看下這個類的構造函數:
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping, serverList, filter, serverListUpdater); }
繼續跟父類DynamicServerListLoadBalancer
的初始化方法:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { volatile ServerList<T> serverListImpl; volatile ServerListFilter<T> filter; public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, ServerListUpdater serverListUpdater) { super(clientConfig, rule, ping); this.serverListImpl = serverList; this.filter = filter; this.serverListUpdater = serverListUpdater; if (filter instanceof AbstractServerListFilter) { ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); } restOfInit(clientConfig); } void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } @VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); } }
構造方法中有個restOfInit()
方法,進去後又會有updateListOfServers()
方法,看方法名就知道這個肯定是和server註冊表相關的,繼續往後看,servers = serverListImpl.getUpdatedListOfServers();
,這裡直接調用getUpdatedListOfServers()
就獲取到了所有的註冊表資訊。

可以看到ServerList
有四個實現類,這個到底是該調用哪個實現類的getUpdatedListOfServers()
方法呢?接著往下看。
再次梳理Ribbon初始化過程
第二講我們已經見過Ribbon的初始化過程,並畫了圖整理,這裡針對於之前的圖再更新一下:

這裡主要是增加了RibbonEurekaAutoConfiguration
和EurekaRibbonClientConfiguration
兩個配置類的初始化。
ServerList實現類初始化過程
上面已經梳理過 Ribbon
初始化的過程,其中在EurekaRibbonClientConfiguration
會初始化RibbonServerList
,程式碼如下:
@Configuration public class EurekaRibbonClientConfiguration { @Bean @ConditionalOnMissingBean public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) { if (this.propertiesFactory.isSet(ServerList.class, serviceId)) { return this.propertiesFactory.get(ServerList.class, config, serviceId); } DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList( config, eurekaClientProvider); DomainExtractingServerList serverList = new DomainExtractingServerList( discoveryServerList, config, this.approximateZoneFromHostname); return serverList; } }
這裡實際的ServerList
實際就是DiscoveryEnabledNIWSServerList
,我們看下這個類:
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{ } public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware { }
所以可以看出來ServerList
實際就是在這裡進行初始化的,上面那個serverListImpl.getUpdatedListOfServers();
即為調用DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()
方法了,繼續往下看。
getUpdatedListOfServers()獲取註冊表分析
直接看DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()
源程式碼:
@Override public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }
看到這裡程式碼就已經很明顯了,我們來解讀下這段程式碼:
- 通過eurekaClientProvider獲取對應EurekaClient
- 通過vipAdress(實際就是serviceName)獲取對應註冊表集合資訊
- 將註冊資訊組裝成
DiscoveryEnabledServer
列表
再回到DynamicServerListLoadBalancer.updateListOfServers()
中,這裡獲取到對應的DiscoveryEnabledServer list後調用updateAllServerList()
方法,一路跟蹤這裡最終會調用BaseLoadBalancer.setServersList()
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware { @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList = Collections .synchronizedList(new ArrayList<Server>()); public void setServersList(List lsrv) { Lock writeLock = allServerLock.writeLock(); logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name); ArrayList<Server> newServers = new ArrayList<Server>(); writeLock.lock(); try { ArrayList<Server> allServers = new ArrayList<Server>(); for (Object server : lsrv) { if (server == null) { continue; } if (server instanceof String) { server = new Server((String) server); } if (server instanceof Server) { logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId()); allServers.add((Server) server); } else { throw new IllegalArgumentException( "Type String or Server expected, instead found:" + server.getClass()); } } boolean listChanged = false; if (!allServerList.equals(allServers)) { listChanged = true; if (changeListeners != null && changeListeners.size() > 0) { List<Server> oldList = ImmutableList.copyOf(allServerList); List<Server> newList = ImmutableList.copyOf(allServers); for (ServerListChangeListener l: changeListeners) { try { l.serverListChanged(oldList, newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e); } } } } if (isEnablePrimingConnections()) { for (Server server : allServers) { if (!allServerList.contains(server)) { server.setReadyToServe(false); newServers.add((Server) server); } } if (primeConnections != null) { primeConnections.primeConnectionsAsync(newServers, this); } } // This will reset readyToServe flag to true on all servers // regardless whether // previous priming connections are success or not allServerList = allServers; if (canSkipPing()) { for (Server s : allServerList) { s.setAlive(true); } upServerList = allServerList; } else if (listChanged) { forceQuickPing(); } } finally { writeLock.unlock(); } } }
這個過程最後用一張圖總結為:

ribbon如何更新自己保存的註冊表資訊?
上面已經講了 Ribbon是如何通過serviceName拉取到註冊表的,我們知道EurekaClient默認是30s拉取一次註冊表資訊的,因為Ribbon要關聯註冊表資訊,那麼Ribbon該如何更新自己存儲的註冊表資訊呢?
繼續回到DynamicSeverListLoadBalancer.restOfInit()
方法中:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { protected volatile ServerListUpdater serverListUpdater; void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); enableAndInitLearnNewServersFeature(); updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); } public void enableAndInitLearnNewServersFeature() { LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName()); serverListUpdater.start(updateAction); } }
重點查看enableAndInitLearnNewServersFeature()
方法,從名字我們就可以看出來這意思為激活和初始化學習新服務的功能,這裡實際上就啟動serverListUpdater
中的一個執行緒。
在最上面Ribbon初始化的過程中我們知道,在RibbonClientConfiguration
中默認初始化的ServerListUpdater
為 PollingServreListUpdater
,我們繼續跟這個類的start方法:
@Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }
這裡只要是執行updateAction.doUpdate();
,然後後面啟動了一個調度任務,默認30s執行一次。
繼續往後跟doUpdate()
方法:
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } }; }
這裡又調用了之前通過serviceName獲取對應註冊服務列表的方法了。
總結到一張圖如下:

總結
本文主要是重新梳理了Ribbon的初始化過程,主要是幾個Configure初始化的過程,然後是Ribbon與Eureka的整合,這裡也涉及到了註冊表的更新邏輯。
看到這裡真是被Spring的各種AutoConfigure繞暈了,哈哈,但是最後分析完 還是覺得挺清晰的,對於複雜的業務畫張流程圖還挺容易理解的。
申明
本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!