【一起學源碼-微服務】Ribbon 源碼三:Ribbon與Eureka整合原理分析

  • 2020 年 2 月 11 日
  • 筆記

前言

前情回顧

上一篇講了Ribbon的初始化過程,從LoadBalancerAutoConfigurationRibbonAutoConfiguration 再到RibbonClientConfiguration,我們找到了ILoadBalancer默認初始化的對象等。

本講目錄

這一講我們會進一步往下探究Ribbon和Eureka是如何結合的。

通過上一講ILoadBalancer 我們已經可以拿到一個服務所有的服務節點資訊了,這裡面是怎麼把服務的名稱轉化為對應的具體host請求資訊的呢?

通過這一講 我們來一探究竟

目錄如下:

  1. EurekaClientAutoConfiguration.getLoadBalancer()回顧
  2. 再次梳理Ribbon初始化過程
  3. ServerList實現類初始化過程
  4. getUpdatedListOfServers()獲取註冊表列表分析
  5. ribbon如何更新自己保存的註冊表資訊?

說明

原創不易,如若轉載 請標明來源!

部落格地址:一枝花算不算浪漫 微信公眾號:壹枝花算不算浪漫

源碼閱讀

EurekaClientAutoConfiguration.getLoadBalancer()回顧

上一講我們已經深入的講解過getLoadBalancer() 方法的實現,每個serviceName都對應一個自己的SpringContext上下文資訊,然後通過ILoadBalancer.class從上下文資訊中獲取默認的LoadBalancer:ZoneAwareLoadBalancer, 我們看下這個類的構造函數:

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,                               IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,                               ServerListUpdater serverListUpdater) {      super(clientConfig, rule, ping, serverList, filter, serverListUpdater);  }

繼續跟父類DynamicServerListLoadBalancer的初始化方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {      volatile ServerList<T> serverListImpl;        volatile ServerListFilter<T> filter;        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,                                           ServerList<T> serverList, ServerListFilter<T> filter,                                           ServerListUpdater serverListUpdater) {          super(clientConfig, rule, ping);          this.serverListImpl = serverList;          this.filter = filter;          this.serverListUpdater = serverListUpdater;          if (filter instanceof AbstractServerListFilter) {              ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());          }          restOfInit(clientConfig);      }        void restOfInit(IClientConfig clientConfig) {          boolean primeConnection = this.isEnablePrimingConnections();          // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()          this.setEnablePrimingConnections(false);          enableAndInitLearnNewServersFeature();            updateListOfServers();          if (primeConnection && this.getPrimeConnections() != null) {              this.getPrimeConnections()                      .primeConnections(getReachableServers());          }          this.setEnablePrimingConnections(primeConnection);          LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());      }        @VisibleForTesting      public void updateListOfServers() {          List<T> servers = new ArrayList<T>();          if (serverListImpl != null) {              servers = serverListImpl.getUpdatedListOfServers();              LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",                      getIdentifier(), servers);                if (filter != null) {                  servers = filter.getFilteredListOfServers(servers);                  LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",                          getIdentifier(), servers);              }          }          updateAllServerList(servers);      }  }

構造方法中有個restOfInit()方法,進去後又會有updateListOfServers() 方法,看方法名就知道這個肯定是和server註冊表相關的,繼續往後看,servers = serverListImpl.getUpdatedListOfServers();,這裡直接調用getUpdatedListOfServers()就獲取到了所有的註冊表資訊。

可以看到ServerList有四個實現類,這個到底是該調用哪個實現類的getUpdatedListOfServers()方法呢?接著往下看。

再次梳理Ribbon初始化過程

第二講我們已經見過Ribbon的初始化過程,並畫了圖整理,這裡針對於之前的圖再更新一下:

這裡主要是增加了RibbonEurekaAutoConfigurationEurekaRibbonClientConfiguration兩個配置類的初始化。

ServerList實現類初始化過程

上面已經梳理過 Ribbon初始化的過程,其中在EurekaRibbonClientConfiguration 會初始化RibbonServerList,程式碼如下:

@Configuration      public class EurekaRibbonClientConfiguration {      @Bean      @ConditionalOnMissingBean      public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {          if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {              return this.propertiesFactory.get(ServerList.class, config, serviceId);          }          DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(                  config, eurekaClientProvider);          DomainExtractingServerList serverList = new DomainExtractingServerList(                  discoveryServerList, config, this.approximateZoneFromHostname);          return serverList;      }  }

這裡實際的ServerList實際就是DiscoveryEnabledNIWSServerList,我們看下這個類:

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{    }    public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {    }

所以可以看出來ServerList 實際就是在這裡進行初始化的,上面那個serverListImpl.getUpdatedListOfServers();即為調用DiscoveryEnabledNIWSServerList.getUpdatedListOfServers() 方法了,繼續往下看。

getUpdatedListOfServers()獲取註冊表分析

直接看DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()源程式碼:

@Override  public List<DiscoveryEnabledServer> getUpdatedListOfServers(){      return obtainServersViaDiscovery();  }    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {      List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {          logger.warn("EurekaClient has not been initialized yet, returning an empty list");          return new ArrayList<DiscoveryEnabledServer>();      }        EurekaClient eurekaClient = eurekaClientProvider.get();      if (vipAddresses!=null){          for (String vipAddress : vipAddresses.split(",")) {              // if targetRegion is null, it will be interpreted as the same region of client              List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);              for (InstanceInfo ii : listOfInstanceInfo) {                  if (ii.getStatus().equals(InstanceStatus.UP)) {                        if(shouldUseOverridePort){                          if(logger.isDebugEnabled()){                              logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);                          }                            // copy is necessary since the InstanceInfo builder just uses the original reference,                          // and we don't want to corrupt the global eureka copy of the object which may be                          // used by other clients in our system                          InstanceInfo copy = new InstanceInfo(ii);                            if(isSecure){                              ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();                          }else{                              ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();                          }                      }                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);                      des.setZone(DiscoveryClient.getZone(ii));                      serverList.add(des);                  }              }              if (serverList.size()>0 && prioritizeVipAddressBasedServers){                  break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers              }          }      }      return serverList;  }

看到這裡程式碼就已經很明顯了,我們來解讀下這段程式碼:

  1. 通過eurekaClientProvider獲取對應EurekaClient
  2. 通過vipAdress(實際就是serviceName)獲取對應註冊表集合資訊
  3. 將註冊資訊組裝成DiscoveryEnabledServer列表

再回到DynamicServerListLoadBalancer.updateListOfServers() 中,這裡獲取到對應的DiscoveryEnabledServer list後調用updateAllServerList()方法,一路跟蹤這裡最終會調用BaseLoadBalancer.setServersList()

public class BaseLoadBalancer extends AbstractLoadBalancer implements          PrimeConnections.PrimeConnectionListener, IClientConfigAware {        @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)      protected volatile List<Server> allServerList = Collections              .synchronizedList(new ArrayList<Server>());        public void setServersList(List lsrv) {          Lock writeLock = allServerLock.writeLock();          logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);            ArrayList<Server> newServers = new ArrayList<Server>();          writeLock.lock();          try {              ArrayList<Server> allServers = new ArrayList<Server>();              for (Object server : lsrv) {                  if (server == null) {                      continue;                  }                    if (server instanceof String) {                      server = new Server((String) server);                  }                    if (server instanceof Server) {                      logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());                      allServers.add((Server) server);                  } else {                      throw new IllegalArgumentException(                              "Type String or Server expected, instead found:"                                      + server.getClass());                  }                }              boolean listChanged = false;              if (!allServerList.equals(allServers)) {                  listChanged = true;                  if (changeListeners != null && changeListeners.size() > 0) {                     List<Server> oldList = ImmutableList.copyOf(allServerList);                     List<Server> newList = ImmutableList.copyOf(allServers);                     for (ServerListChangeListener l: changeListeners) {                         try {                             l.serverListChanged(oldList, newList);                         } catch (Exception e) {                             logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);                         }                     }                  }              }              if (isEnablePrimingConnections()) {                  for (Server server : allServers) {                      if (!allServerList.contains(server)) {                          server.setReadyToServe(false);                          newServers.add((Server) server);                      }                  }                  if (primeConnections != null) {                      primeConnections.primeConnectionsAsync(newServers, this);                  }              }              // This will reset readyToServe flag to true on all servers              // regardless whether              // previous priming connections are success or not              allServerList = allServers;              if (canSkipPing()) {                  for (Server s : allServerList) {                      s.setAlive(true);                  }                  upServerList = allServerList;              } else if (listChanged) {                  forceQuickPing();              }          } finally {              writeLock.unlock();          }      }  }

這個過程最後用一張圖總結為:

ribbon如何更新自己保存的註冊表資訊?

上面已經講了 Ribbon是如何通過serviceName拉取到註冊表的,我們知道EurekaClient默認是30s拉取一次註冊表資訊的,因為Ribbon要關聯註冊表資訊,那麼Ribbon該如何更新自己存儲的註冊表資訊呢?

繼續回到DynamicSeverListLoadBalancer.restOfInit()方法中:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {        protected volatile ServerListUpdater serverListUpdater;        void restOfInit(IClientConfig clientConfig) {          boolean primeConnection = this.isEnablePrimingConnections();          // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()          this.setEnablePrimingConnections(false);          enableAndInitLearnNewServersFeature();            updateListOfServers();          if (primeConnection && this.getPrimeConnections() != null) {              this.getPrimeConnections()                      .primeConnections(getReachableServers());          }          this.setEnablePrimingConnections(primeConnection);          LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());      }        public void enableAndInitLearnNewServersFeature() {          LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());          serverListUpdater.start(updateAction);      }  }

重點查看enableAndInitLearnNewServersFeature()方法,從名字我們就可以看出來這意思為激活和初始化學習新服務的功能,這裡實際上就啟動serverListUpdater中的一個執行緒。

在最上面Ribbon初始化的過程中我們知道,在RibbonClientConfiguration中默認初始化的ServerListUpdaterPollingServreListUpdater,我們繼續跟這個類的start方法:

@Override  public synchronized void start(final UpdateAction updateAction) {      if (isActive.compareAndSet(false, true)) {          final Runnable wrapperRunnable = new Runnable() {              @Override              public void run() {                  if (!isActive.get()) {                      if (scheduledFuture != null) {                          scheduledFuture.cancel(true);                      }                      return;                  }                  try {                      updateAction.doUpdate();                      lastUpdated = System.currentTimeMillis();                  } catch (Exception e) {                      logger.warn("Failed one update cycle", e);                  }              }          };            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(                  wrapperRunnable,                  initialDelayMs,                  refreshIntervalMs,                  TimeUnit.MILLISECONDS          );      } else {          logger.info("Already active, no-op");      }  }

這裡只要是執行updateAction.doUpdate();,然後後面啟動了一個調度任務,默認30s執行一次。

繼續往後跟doUpdate()方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {      protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {          @Override          public void doUpdate() {              updateListOfServers();          }      };  }

這裡又調用了之前通過serviceName獲取對應註冊服務列表的方法了。

總結到一張圖如下:

總結

本文主要是重新梳理了Ribbon的初始化過程,主要是幾個Configure初始化的過程,然後是Ribbon與Eureka的整合,這裡也涉及到了註冊表的更新邏輯。

看到這裡真是被Spring的各種AutoConfigure繞暈了,哈哈,但是最後分析完 還是覺得挺清晰的,對於複雜的業務畫張流程圖還挺容易理解的。

申明

本文章首發自本人部落格:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!