Eureka Cache Structure and Service Awareness Optimization

  • October 3, 2019
  • Notes

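As the saying goes, the palest ink beats the best memory: however simple something is, you will forget it if you don't write it down!

This article first analyzes the cache architecture in Eureka, then uses that analysis to shorten the time it takes services to become aware of changes in one another.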

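How Eureka-Client Fetches Registry Information

Eureka-Client fetches registry information in two ways: a full fetch and a delta (incremental) fetch.

When Eureka-Client starts, it first performs one full fetch to populate its local registry cache. The code is as follows: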

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ... (excerpt: the rest of the constructor is omitted)
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
}

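With the following set in the project configuration,

eureka.client.fetch-registry=true

the client calls the fetchRegistry method and performs a full fetch of the registry information from eureka-server.

At startup, Eureka-Client also initializes a scheduled task that refreshes this cache: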

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
}

The CacheRefreshThread task runs every registryFetchIntervalSeconds seconds (default 30). CacheRefreshThread ultimately calls the same fetchRegistry method.

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    // (excerpt: the declaration of the tracer stopwatch is omitted)
    try {
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

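fetchRegistry first decides between a full fetch and a delta fetch, then requests the registry information from the server and, on success, updates the local copy. Finally, it fires the CacheRefreshed event.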
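The full-versus-delta decision can be isolated as a small pure function. The following is only an illustrative sketch: the class FetchDecision and its parameters are hypothetical stand-ins for the clientConfig and Applications values that the real method reads, not part of the Eureka API.

```java
// Hypothetical helper mirroring the condition in fetchRegistry; not Eureka code.
final class FetchDecision {

    /**
     * Returns true when a full fetch is needed, false when a delta fetch suffices.
     *
     * @param deltaDisabled    stands in for clientConfig.shouldDisableDelta()
     * @param singleVipAddress stands in for getRegistryRefreshSingleVipAddress()
     * @param forceFull        the forceFullRegistryFetch argument
     * @param cachedAppCount   number of locally cached applications, null if there is no cache yet
     * @param cachedVersion    version of the locally cached Applications object
     */
    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFull,
                                  Integer cachedAppCount,
                                  long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
    }

    public static void main(String[] args) {
        // First fetch at startup: nothing cached yet, so a full fetch is required.
        System.out.println(needsFullFetch(false, null, false, null, 0)); // true
        // Periodic refresh with a populated cache: a delta fetch is enough.
        System.out.println(needsFullFetch(false, null, false, 3, 5));    // false
    }
}
```

Any single condition is enough to force a full fetch, which is why the first fetch at startup is always a full one: the local cache is still empty at that point.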

How Eureka-Server Manages Registry Information

When a client request reaches the server, the registry information is returned through ResponseCache:

@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {

    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    }

    // Check whether access is allowed
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }
    CurrentRequestVersion.set(Version.toEnum(version));
    // Response data format
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
    // Response cache key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        // Return the registry information for cacheKey
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    return response;
}

The key part is the get method of responseCache:

String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}

private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;

this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(1000)
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    @Override
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        Key removedKey = notification.getKey();
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        Value value = generatePayload(key);
                        return value;
                    }
                });

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            // Look up the registry information in the read-only cache first
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                // Not in the read-only cache: fall back to the read-write cache
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key :" + key, t);
    }
    return payload;
}

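This is a two-tier cache: data is read from readOnlyCacheMap first, and only if it is missing there is it read from readWriteCacheMap. readOnlyCacheMap is a plain ConcurrentMap, while readWriteCacheMap is a Guava cache with an initial capacity of 1000 (not a maximum size) whose entries expire 180s after being written by default.

How do the two maps exchange data? A scheduled task compares the two caches every 30 seconds, and wherever they differ it overwrites the value in readOnlyCacheMap with the value from readWriteCacheMap: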

if (shouldUseReadOnlyResponseCache) {
    timer.schedule(getCacheUpdateTask(),
            new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                    + responseCacheUpdateIntervalMs),
            responseCacheUpdateIntervalMs);
}

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // Compare the values held by the two caches (by reference)
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}
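To make the staleness window of this two-tier design concrete, here is a minimal, self-contained sketch using only the JDK. TwoTierCache, expire and sync are hypothetical names chosen for illustration: expire stands in for an entry leaving the Guava cache (for example through expireAfterWrite), and sync plays the role of the timer task above. The loader is assumed never to return null.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Simplified model of readOnlyCacheMap + readWriteCacheMap; not Eureka code.
final class TwoTierCache<K, V> {
    private final Map<K, V> readOnly = new ConcurrentHashMap<>();
    private final Map<K, V> readWrite = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // plays the role of CacheLoader.load

    TwoTierCache(Function<K, V> loader) {
        this.loader = loader;
    }

    /** Reads the read-only tier first and falls back to the read-write tier. */
    V get(K key) {
        V value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, loader); // lazy load on a miss
            readOnly.put(key, value);
        }
        return value;
    }

    /** Simulates the entry being dropped from the read-write tier. */
    void expire(K key) {
        readWrite.remove(key);
    }

    /** Periodic task: copy changed values from the read-write tier to the read-only tier. */
    void sync() {
        for (K key : readOnly.keySet()) {
            V fresh = readWrite.computeIfAbsent(key, loader);
            if (fresh != readOnly.get(key)) { // reference comparison, as in the original
                readOnly.put(key, fresh);
            }
        }
    }

    public static void main(String[] args) {
        AtomicReference<String> registry = new AtomicReference<>("A,B");
        TwoTierCache<String, String> cache = new TwoTierCache<>(key -> registry.get());

        System.out.println(cache.get("ALL_APPS")); // A,B
        registry.set("A");                         // instance B disappears from the registry
        cache.expire("ALL_APPS");
        System.out.println(cache.get("ALL_APPS")); // A,B (read-only tier is still stale)
        cache.sync();
        System.out.println(cache.get("ALL_APPS")); // A
    }
}
```

Readers never see a change until the next sync runs, which is the price paid for keeping the hot read path free of loader calls.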

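We now know that the data in readOnlyCacheMap comes from readWriteCacheMap and is synchronized every 30s. That leaves one more question: where does the data in readWriteCacheMap come from?

Running Find Usages on the readWriteCacheMap variable gives no clear answer, so I set a breakpoint inside the load method passed to build: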

this.readWriteCacheMap =
        CacheBuilder.newBuilder().initialCapacity(1000)
                .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Key, Value>() {
                    @Override
                    public void onRemoval(RemovalNotification<Key, Value> notification) {
                        Key removedKey = notification.getKey();
                        if (removedKey.hasRegions()) {
                            Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                            regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        }
                    }
                })
                .build(new CacheLoader<Key, Value>() {
                    @Override
                    public Value load(Key key) throws Exception {
                        if (key.hasRegions()) {
                            Key cloneWithNoRegions = key.cloneWithoutRegions();
                            regionSpecificKeys.put(cloneWithNoRegions, key);
                        }
                        // Breakpoint set here
                        Value value = generatePayload(key);
                        return value;
                    }
                });

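It turns out that the values in readWriteCacheMap are loaded lazily from within the synchronization task: calling readWriteCacheMap.get(key) invokes the CacheLoader's load method whenever the key is absent or has expired. (The readWriteCacheMap.get(key) calls in getValue, shown earlier, go through the same loader.)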

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    // This get triggers the load method, which loads the Value
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // Compare the values held by the two caches (by reference)
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}

Good, now we know when loading is triggered. Next, let's look at how the data is produced. Broadly, the values in readWriteCacheMap are built from the registry variable in AbstractInstanceRegistry:

private final AbstractInstanceRegistry registry;

private Value generatePayload(Key key) {
    Stopwatch tracer = null;
    try {
        String payload;
        switch (key.getEntityType()) {
            case Application:
                boolean isRemoteRegionRequested = key.hasRegions();

                if (ALL_APPS.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeAllAppsWithRemoteRegionTimer.start();
                        payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeAllAppsTimer.start();
                        payload = getPayLoad(key, registry.getApplications());
                    }
                } else if (ALL_APPS_DELTA.equals(key.getName())) {
                    if (isRemoteRegionRequested) {
                        tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                        versionDeltaWithRegions.incrementAndGet();
                        versionDeltaWithRegionsLegacy.incrementAndGet();
                        payload = getPayLoad(key,
                                registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                    } else {
                        tracer = serializeDeltaAppsTimer.start();
                        versionDelta.incrementAndGet();
                        versionDeltaLegacy.incrementAndGet();
                        payload = getPayLoad(key, registry.getApplicationDeltas());
                    }
                } else {
                    tracer = serializeOneApptimer.start();
                    payload = getPayLoad(key, registry.getApplication(key.getName()));
                }
                break;
            case VIP:
            case SVIP:
                tracer = serializeViptimer.start();
                payload = getPayLoad(key, getApplicationsForVip(key, registry));
                break;
            default:
                logger.error("Unidentified entity type: " + key.getEntityType() + " found in the cache key.");
                payload = "";
                break;
        }
        return new Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
}

The registry variable in AbstractInstanceRegistry is a nested map. All data from client registration, lease renewal, and cancellation is stored in this registry:

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

A scheduled task on the registry evicts expired entries every 60s:

evictionTimer.schedule(evictionTaskRef.get(),
        // 60 * 1000 ms
        serverConfig.getEvictionIntervalTimerInMs(),
        serverConfig.getEvictionIntervalTimerInMs());

@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}
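The interplay between the nested registry map and the eviction task can be sketched as follows. This is a deliberately simplified model, not the Eureka implementation: LeaseRegistry and its methods are hypothetical, a lease is reduced to its last renewal timestamp, and details of the real evict method such as the compensation time are left out.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the registry map and its eviction task; not Eureka code.
final class LeaseRegistry {
    /** appName -> (instanceId -> timestamp of the last renewal, in ms). */
    private final Map<String, Map<String, Long>> registry = new ConcurrentHashMap<>();
    private final long leaseDurationMs;

    LeaseRegistry(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    /** Registration and heartbeat both record the time of the latest contact. */
    void renew(String app, String instanceId, long nowMs) {
        registry.computeIfAbsent(app, a -> new ConcurrentHashMap<>()).put(instanceId, nowMs);
    }

    /** Graceful shutdown: the instance removes itself explicitly. */
    void cancel(String app, String instanceId) {
        Map<String, Long> leases = registry.get(app);
        if (leases != null) {
            leases.remove(instanceId);
        }
    }

    /** Removes every lease not renewed within leaseDurationMs; returns the number removed. */
    int evict(long nowMs) {
        int evicted = 0;
        for (Map<String, Long> leases : registry.values()) {
            Iterator<Map.Entry<String, Long>> it = leases.entrySet().iterator();
            while (it.hasNext()) {
                if (nowMs - it.next().getValue() > leaseDurationMs) {
                    it.remove();
                    evicted++;
                }
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        LeaseRegistry registry = new LeaseRegistry(90_000); // 90s lease
        registry.renew("ORDER-SERVICE", "host-1", 0);       // last heartbeat at t = 0

        System.out.println(registry.evict(29_000));  // 0
        System.out.println(registry.evict(89_000));  // 0
        System.out.println(registry.evict(149_000)); // 1
    }
}
```

With a 90s lease and checks 60s apart, an instance that stops sending heartbeats can linger for almost 150s before the server removes it.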

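Summary

Registration, lease renewal, and cancellation requests from Eureka clients all go to the server, which stores the data in registry, a two-level map. Every 60s a scheduled task checks whether the leases stored in registry have expired (a lease is valid for 90s). Separately, every 30s a scheduled task refreshes the values in readWriteCacheMap and synchronizes them from readWriteCacheMap into readOnlyCacheMap.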

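Service Awareness Optimization

Given the flow above, imagine a service that goes down abnormally, so the server never receives a cancellation request. In the worst case, events unfold as follows: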

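  • 0s: the service goes down without notifying the Eureka Server;
  • 29s: first eviction check; fewer than 90s have passed since the last renewal, so nothing is evicted;
  • 89s: second eviction check; still under 90s, so nothing is evicted;
  • 149s: third eviction check; more than 90s have passed without a renewal, so the instance is removed from registry;
  • 179s: the scheduled task refreshes readWriteCacheMap and copies it into readOnlyCacheMap;
  • 209s: the Eureka Client refreshes from the Eureka Server's readOnlyCacheMap;
  • 239s: Ribbon refreshes from the Eureka Client.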

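(Ribbon also has its own cache refresh policy, 30s by default.)

So in the extreme case, the time it takes a service consumer to notice the failure approaches 240s.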
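The 240s bound is simply the sum of the delays at each stage of the timeline. The sketch below reproduces that arithmetic under the article's model; AwarenessDelay is a hypothetical helper, and the second call uses the tuned values from the configuration at the end of this article.

```java
// Hypothetical helper that adds up the stage delays from the timeline above.
final class AwarenessDelay {

    /**
     * Upper bound, in seconds, on how long a consumer may keep routing to a dead instance:
     * lease expiration + eviction interval (detection on the server)
     * + response cache sync + client registry fetch + Ribbon server list refresh.
     */
    static int worstCaseSeconds(int leaseExpiration,
                                int evictionInterval,
                                int responseCacheUpdate,
                                int registryFetch,
                                int ribbonRefresh) {
        return leaseExpiration + evictionInterval + responseCacheUpdate + registryFetch + ribbonRefresh;
    }

    public static void main(String[] args) {
        // Defaults: 90s lease, 60s eviction, 30s cache sync, 30s fetch, 30s Ribbon refresh.
        System.out.println(worstCaseSeconds(90, 60, 30, 30, 30)); // 240
        // Tuned: 15s lease, 3s eviction, 3s cache sync, 5s fetch, 5s Ribbon refresh.
        System.out.println(worstCaseSeconds(15, 3, 3, 5, 5));     // 31
    }
}
```

Every term in the sum maps to one configuration property, so each can be reduced independently.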

How can we optimize this?

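Server side:

  • Shorten the interval of the registry eviction task
  • Shorten the interval of the task that synchronizes the two caches
  • Small systems can disable readOnlyCacheMap altogether

Service provider side:

  • Shorten the heartbeat interval
  • Shorten the lease expiration time

Service consumer side:

  • Shorten the Ribbon refresh interval
  • Shorten the fetchRegistry interval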

On the Eureka Server, change the following settings:

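# Interval at which the eureka server refreshes readOnlyCacheMap. Note that clients read from readOnlyCacheMap, so this interval determines how long it takes for readWriteCacheMap content to reach readOnlyCacheMap
# Default: 30s
eureka.server.responseCacheUpdateIntervalMs=3000
# Expiration time of entries in the eureka server's readWriteCacheMap. An entry only expires once this time has passed and is not reloaded before then; after expiry the service information is read again from registry, which is a ConcurrentHashMap.
# Since eviction is enabled, there is little need to change this setting
# Default: 180s
eureka.server.responseCacheAutoExpirationInSeconds=180

# Enable active eviction, with a check every 3s
# The Eureka Server checks periodically (the interval is eureka.server.eviction-interval-timer-in-ms, default 60000 ms).
# If it finds an instance that has sent no heartbeat within a given time (set on the client by
# eureka.instance.lease-expiration-duration-in-seconds, default 90s), it deregisters that instance.
eureka.server.eviction-interval-timer-in-ms=3000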

On the Eureka service provider, change the following settings:

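# Lease expiration time: if EurekaServer receives no heartbeat within this time, it evicts the instance
# Note: EurekaServer must also lower eureka.server.eviction-interval-timer-in-ms, otherwise this setting has little effect because expired leases are only detected when the eviction task runs. This value is usually three times the renewal interval
# Default: 90s
eureka.instance.lease-expiration-duration-in-seconds=15
# Lease renewal interval: the instance sends a heartbeat this often
# Default: 30s
eureka.instance.lease-renewal-interval-in-seconds=5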

On the Eureka service consumer, change the following settings:

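# Interval at which the eureka client refreshes its local cache
# Default: 30s
eureka.client.registryFetchIntervalSeconds=5
# Interval at which Ribbon on the eureka client refreshes its server list
# Default: 30s
ribbon.ServerListRefreshInterval=5000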