【一起学源码-微服务】Ribbon 源码三:Ribbon与Eureka整合原理分析

  • 2020 年 2 月 11 日
  • 筆記

前言

前情回顾

上一篇讲了Ribbon的初始化过程,从LoadBalancerAutoConfigurationRibbonAutoConfiguration 再到RibbonClientConfiguration,我们找到了ILoadBalancer默认初始化的对象等。

本讲目录

这一讲我们会进一步往下探究Ribbon和Eureka是如何结合的。

通过上一讲ILoadBalancer 我们已经可以拿到一个服务所有的服务节点信息了,这里面是怎么把服务的名称转化为对应的具体host请求信息的呢?

通过这一讲 我们来一探究竟

目录如下:

  1. EurekaClientAutoConfiguration.getLoadBalancer()回顾
  2. 再次梳理Ribbon初始化过程
  3. ServerList实现类初始化过程
  4. getUpdatedListOfServers()获取注册表列表分析
  5. ribbon如何更新自己保存的注册表信息?

说明

原创不易,如若转载 请标明来源!

博客地址:一枝花算不算浪漫 微信公众号:壹枝花算不算浪漫

源码阅读

EurekaClientAutoConfiguration.getLoadBalancer()回顾

上一讲我们已经深入的讲解过getLoadBalancer() 方法的实现,每个serviceName都对应一个自己的SpringContext上下文信息,然后通过ILoadBalancer.class从上下文信息中获取默认的LoadBalancer:ZoneAwareLoadBalancer, 我们看下这个类的构造函数:

public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,                               IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,                               ServerListUpdater serverListUpdater) {      super(clientConfig, rule, ping, serverList, filter, serverListUpdater);  }

继续跟父类DynamicServerListLoadBalancer的初始化方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {      volatile ServerList<T> serverListImpl;        volatile ServerListFilter<T> filter;        public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,                                           ServerList<T> serverList, ServerListFilter<T> filter,                                           ServerListUpdater serverListUpdater) {          super(clientConfig, rule, ping);          this.serverListImpl = serverList;          this.filter = filter;          this.serverListUpdater = serverListUpdater;          if (filter instanceof AbstractServerListFilter) {              ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());          }          restOfInit(clientConfig);      }        void restOfInit(IClientConfig clientConfig) {          boolean primeConnection = this.isEnablePrimingConnections();          // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()          this.setEnablePrimingConnections(false);          enableAndInitLearnNewServersFeature();            updateListOfServers();          if (primeConnection && this.getPrimeConnections() != null) {              this.getPrimeConnections()                      .primeConnections(getReachableServers());          }          this.setEnablePrimingConnections(primeConnection);          LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());      }        @VisibleForTesting      public void updateListOfServers() {          List<T> servers = new ArrayList<T>();          if (serverListImpl != null) {              servers = serverListImpl.getUpdatedListOfServers();              LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",                      getIdentifier(), servers);                if (filter != null) {                  servers = filter.getFilteredListOfServers(servers);                  LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",                          getIdentifier(), servers);              }          }          updateAllServerList(servers);      }  }

构造方法中有个restOfInit()方法,进去后又会有updateListOfServers() 方法,看方法名就知道这个肯定是和server注册表相关的,继续往后看,servers = serverListImpl.getUpdatedListOfServers();,这里直接调用getUpdatedListOfServers()就获取到了所有的注册表信息。

可以看到ServerList有四个实现类,这个到底是该调用哪个实现类的getUpdatedListOfServers()方法呢?接着往下看。

再次梳理Ribbon初始化过程

第二讲我们已经见过Ribbon的初始化过程,并画了图整理,这里针对于之前的图再更新一下:

这里主要是增加了RibbonEurekaAutoConfigurationEurekaRibbonClientConfiguration两个配置类的初始化。

ServerList实现类初始化过程

上面已经梳理过 Ribbon初始化的过程,其中在EurekaRibbonClientConfiguration 会初始化RibbonServerList,代码如下:

@Configuration      public class EurekaRibbonClientConfiguration {      @Bean      @ConditionalOnMissingBean      public ServerList<?> ribbonServerList(IClientConfig config, Provider<EurekaClient> eurekaClientProvider) {          if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {              return this.propertiesFactory.get(ServerList.class, config, serviceId);          }          DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(                  config, eurekaClientProvider);          DomainExtractingServerList serverList = new DomainExtractingServerList(                  discoveryServerList, config, this.approximateZoneFromHostname);          return serverList;      }  }

这里实际的ServerList实际就是DiscoveryEnabledNIWSServerList,我们看下这个类:

public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{    }    public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {    }

所以可以看出来ServerList 实际就是在这里进行初始化的,上面那个serverListImpl.getUpdatedListOfServers();即为调用DiscoveryEnabledNIWSServerList.getUpdatedListOfServers() 方法了,继续往下看。

getUpdatedListOfServers()获取注册表分析

直接看DiscoveryEnabledNIWSServerList.getUpdatedListOfServers()源代码:

@Override  public List<DiscoveryEnabledServer> getUpdatedListOfServers(){      return obtainServersViaDiscovery();  }    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {      List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {          logger.warn("EurekaClient has not been initialized yet, returning an empty list");          return new ArrayList<DiscoveryEnabledServer>();      }        EurekaClient eurekaClient = eurekaClientProvider.get();      if (vipAddresses!=null){          for (String vipAddress : vipAddresses.split(",")) {              // if targetRegion is null, it will be interpreted as the same region of client              List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);              for (InstanceInfo ii : listOfInstanceInfo) {                  if (ii.getStatus().equals(InstanceStatus.UP)) {                        if(shouldUseOverridePort){                          if(logger.isDebugEnabled()){                              logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);                          }                            // copy is necessary since the InstanceInfo builder just uses the original reference,                          // and we don't want to corrupt the global eureka copy of the object which may be                          // used by other clients in our system                          InstanceInfo copy = new InstanceInfo(ii);                            if(isSecure){                              ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();                          }else{                              ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();                          }                      }                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);                      des.setZone(DiscoveryClient.getZone(ii));                      serverList.add(des);                  }              }              if (serverList.size()>0 && prioritizeVipAddressBasedServers){                  break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers              }          }      }      return serverList;  }

看到这里代码就已经很明显了,我们来解读下这段代码:

  1. 通过eurekaClientProvider获取对应EurekaClient
  2. 通过vipAdress(实际就是serviceName)获取对应注册表集合信息
  3. 将注册信息组装成DiscoveryEnabledServer列表

再回到DynamicServerListLoadBalancer.updateListOfServers() 中,这里获取到对应的DiscoveryEnabledServer list后调用updateAllServerList()方法,一路跟踪这里最终会调用BaseLoadBalancer.setServersList()

public class BaseLoadBalancer extends AbstractLoadBalancer implements          PrimeConnections.PrimeConnectionListener, IClientConfigAware {        @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)      protected volatile List<Server> allServerList = Collections              .synchronizedList(new ArrayList<Server>());        public void setServersList(List lsrv) {          Lock writeLock = allServerLock.writeLock();          logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name);            ArrayList<Server> newServers = new ArrayList<Server>();          writeLock.lock();          try {              ArrayList<Server> allServers = new ArrayList<Server>();              for (Object server : lsrv) {                  if (server == null) {                      continue;                  }                    if (server instanceof String) {                      server = new Server((String) server);                  }                    if (server instanceof Server) {                      logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());                      allServers.add((Server) server);                  } else {                      throw new IllegalArgumentException(                              "Type String or Server expected, instead found:"                                      + server.getClass());                  }                }              boolean listChanged = false;              if (!allServerList.equals(allServers)) {                  listChanged = true;                  if (changeListeners != null && changeListeners.size() > 0) {                     List<Server> oldList = ImmutableList.copyOf(allServerList);                     List<Server> newList = ImmutableList.copyOf(allServers);                     for (ServerListChangeListener l: changeListeners) {                         try {                             l.serverListChanged(oldList, newList);                         } catch (Exception e) {                             logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e);                         }                     }                  }              }              if (isEnablePrimingConnections()) {                  for (Server server : allServers) {                      if (!allServerList.contains(server)) {                          server.setReadyToServe(false);                          newServers.add((Server) server);                      }                  }                  if (primeConnections != null) {                      primeConnections.primeConnectionsAsync(newServers, this);                  }              }              // This will reset readyToServe flag to true on all servers              // regardless whether              // previous priming connections are success or not              allServerList = allServers;              if (canSkipPing()) {                  for (Server s : allServerList) {                      s.setAlive(true);                  }                  upServerList = allServerList;              } else if (listChanged) {                  forceQuickPing();              }          } finally {              writeLock.unlock();          }      }  }

这个过程最后用一张图总结为:

ribbon如何更新自己保存的注册表信息?

上面已经讲了 Ribbon是如何通过serviceName拉取到注册表的,我们知道EurekaClient默认是30s拉取一次注册表信息的,因为Ribbon要关联注册表信息,那么Ribbon该如何更新自己存储的注册表信息呢?

继续回到DynamicSeverListLoadBalancer.restOfInit()方法中:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {        protected volatile ServerListUpdater serverListUpdater;        void restOfInit(IClientConfig clientConfig) {          boolean primeConnection = this.isEnablePrimingConnections();          // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()          this.setEnablePrimingConnections(false);          enableAndInitLearnNewServersFeature();            updateListOfServers();          if (primeConnection && this.getPrimeConnections() != null) {              this.getPrimeConnections()                      .primeConnections(getReachableServers());          }          this.setEnablePrimingConnections(primeConnection);          LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());      }        public void enableAndInitLearnNewServersFeature() {          LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());          serverListUpdater.start(updateAction);      }  }

重点查看enableAndInitLearnNewServersFeature()方法,从名字我们就可以看出来这意思为激活和初始化学习新服务的功能,这里实际上就启动serverListUpdater中的一个线程。

在最上面Ribbon初始化的过程中我们知道,在RibbonClientConfiguration中默认初始化的ServerListUpdaterPollingServreListUpdater,我们继续跟这个类的start方法:

@Override  public synchronized void start(final UpdateAction updateAction) {      if (isActive.compareAndSet(false, true)) {          final Runnable wrapperRunnable = new Runnable() {              @Override              public void run() {                  if (!isActive.get()) {                      if (scheduledFuture != null) {                          scheduledFuture.cancel(true);                      }                      return;                  }                  try {                      updateAction.doUpdate();                      lastUpdated = System.currentTimeMillis();                  } catch (Exception e) {                      logger.warn("Failed one update cycle", e);                  }              }          };            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(                  wrapperRunnable,                  initialDelayMs,                  refreshIntervalMs,                  TimeUnit.MILLISECONDS          );      } else {          logger.info("Already active, no-op");      }  }

这里只要是执行updateAction.doUpdate();,然后后面启动了一个调度任务,默认30s执行一次。

继续往后跟doUpdate()方法:

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {      protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {          @Override          public void doUpdate() {              updateListOfServers();          }      };  }

这里又调用了之前通过serviceName获取对应注册服务列表的方法了。

总结到一张图如下:

总结

本文主要是重新梳理了Ribbon的初始化过程,主要是几个Configure初始化的过程,然后是Ribbon与Eureka的整合,这里也涉及到了注册表的更新逻辑。

看到这里真是被Spring的各种AutoConfigure绕晕了,哈哈,但是最后分析完 还是觉得挺清晰的,对于复杂的业务画张流程图还挺容易理解的。

申明

本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!