EurekaServer源碼分析

Eureka Server功能

  • 接受服務註冊
  • 接受服務心跳
  • 服務剔除
  • 服務下線
  • 集群同步
  • 獲取註冊表中服務實例信息

需要注意的是,Eureka Server同時也是一個Eureka Client,在不禁止Eureka Server的客戶端行為時,它會向它配置文件中的其他Eureka Server進行拉取註冊表、服務註冊和發送心跳等操作

啟動server註冊相關bean

註冊外部的配置類 spring-cloud-netflix-eureka-server-2.1.2.REALEASE.jar 中 META-INF/spring.factories中

  • org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  • org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

啟動時會自動加載:EurekaServerAutoConfiguration

功能:向spring的bean工廠添加eureka-server相關功能的bean

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.converters.EurekaJacksonCodec;
import com.netflix.discovery.converters.wrappers.CodecWrapper;
import com.netflix.discovery.converters.wrappers.CodecWrappers;
import com.netflix.discovery.converters.wrappers.CodecWrappers.JacksonJsonMini;
import com.netflix.discovery.converters.wrappers.CodecWrappers.JacksonXmlMini;
import com.netflix.discovery.converters.wrappers.CodecWrappers.XStreamXml;
import com.netflix.eureka.DefaultEurekaServerContext;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.EurekaServerContext;
import com.netflix.eureka.cluster.PeerEurekaNode;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.resources.DefaultServerCodecs;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.transport.JerseyReplicationClient;
import com.sun.jersey.api.core.DefaultResourceConfig;
import com.sun.jersey.spi.container.servlet.ServletContainer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.servlet.Filter;
import javax.ws.rs.Path;
import javax.ws.rs.core.Application;
import javax.ws.rs.ext.Provider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.cloud.netflix.eureka.server.EurekaServerMarkerConfiguration.Marker;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.ClassUtils;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration(
    proxyBeanMethods = false
)
@Import({EurekaServerInitializerConfiguration.class})
@ConditionalOnBean({Marker.class})
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    private static final String[] EUREKA_PACKAGES = new String[]{"com.netflix.discovery", "com.netflix.eureka"};
    @Autowired
    private ApplicationInfoManager applicationInfoManager;
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    @Autowired
    private EurekaClientConfig eurekaClientConfig;
    @Autowired
    private EurekaClient eurekaClient;
    @Autowired
    private InstanceRegistryProperties instanceRegistryProperties;
    public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();

    public EurekaServerAutoConfiguration() {
    }

    @Bean
    public HasFeatures eurekaServerFeature() {
        return HasFeatures.namedFeature("Eureka Server", EurekaServerAutoConfiguration.class);
    }

    @Bean
    @ConditionalOnProperty(
        prefix = "eureka.dashboard",
        name = {"enabled"},
        matchIfMissing = true
    )
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }

    @Bean
    public ServerCodecs serverCodecs() {
        return new EurekaServerAutoConfiguration.CloudServerCodecs(this.eurekaServerConfig);
    }

    private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
        CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
        return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
    }

    private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
        CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
        return codec == null ? CodecWrappers.getCodec(XStreamXml.class) : codec;
    }

    @Bean
    @ConditionalOnMissingBean
    public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() {
        return new ReplicationClientAdditionalFilters(Collections.emptySet());
    }

    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications();
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }

    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new EurekaServerAutoConfiguration.RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, replicationClientAdditionalFilters);
    }

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
    }

    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
    }

    @Bean
    public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(2147483647);
        bean.setUrlPatterns(Collections.singletonList("/eureka/*"));
        return bean;
    }

    @Bean
    public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
        provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
        provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
        Set<Class<?>> classes = new HashSet();
        String[] var5 = EUREKA_PACKAGES;
        int var6 = var5.length;

        for(int var7 = 0; var7 < var6; ++var7) {
            String basePackage = var5[var7];
            Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
            Iterator var10 = beans.iterator();

            while(var10.hasNext()) {
                BeanDefinition bd = (BeanDefinition)var10.next();
                Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
                classes.add(cls);
            }
        }

        Map<String, Object> propsAndFeatures = new HashMap();
        propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
        DefaultResourceConfig rc = new DefaultResourceConfig(classes);
        rc.setPropertiesAndFeatures(propsAndFeatures);
        return rc;
    }

    @Bean
    @ConditionalOnBean(
        name = {"httpTraceFilter"}
    )
    public FilterRegistrationBean<?> traceFilterRegistration(@Qualifier("httpTraceFilter") Filter filter) {
        FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();
        bean.setFilter(filter);
        bean.setOrder(2147483637);
        return bean;
    }

    static {
        CodecWrappers.registerWrapper(JACKSON_JSON);
        EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
    }

    class CloudServerCodecs extends DefaultServerCodecs {
        CloudServerCodecs(EurekaServerConfig serverConfig) {
            super(EurekaServerAutoConfiguration.getFullJson(serverConfig), CodecWrappers.getCodec(JacksonJsonMini.class), EurekaServerAutoConfiguration.getFullXml(serverConfig), CodecWrappers.getCodec(JacksonXmlMini.class));
        }
    }

    static class RefreshablePeerEurekaNodes extends PeerEurekaNodes implements ApplicationListener<EnvironmentChangeEvent> {
        private ReplicationClientAdditionalFilters replicationClientAdditionalFilters;

        RefreshablePeerEurekaNodes(final PeerAwareInstanceRegistry registry, final EurekaServerConfig serverConfig, final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs, final ApplicationInfoManager applicationInfoManager, final ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
            super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);
            this.replicationClientAdditionalFilters = replicationClientAdditionalFilters;
        }

        protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
            JerseyReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(this.serverConfig, this.serverCodecs, peerEurekaNodeUrl);
            this.replicationClientAdditionalFilters.getFilters().forEach(replicationClient::addReplicationClientFilter);
            String targetHost = hostFromUrl(peerEurekaNodeUrl);
            if (targetHost == null) {
                targetHost = "host";
            }

            return new PeerEurekaNode(this.registry, targetHost, peerEurekaNodeUrl, replicationClient, this.serverConfig);
        }

        public void onApplicationEvent(final EnvironmentChangeEvent event) {
            if (this.shouldUpdate(event.getKeys())) {
                this.updatePeerEurekaNodes(this.resolvePeerUrls());
            }

        }

        protected boolean shouldUpdate(final Set<String> changedKeys) {
            assert changedKeys != null;

            if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                return false;
            } else if (changedKeys.contains("eureka.client.region")) {
                return true;
            } else {
                Iterator var2 = changedKeys.iterator();

                String key;
                do {
                    if (!var2.hasNext()) {
                        return false;
                    }

                    key = (String)var2.next();
                } while(!key.startsWith("eureka.client.service-url.") && !key.startsWith("eureka.client.availability-zones."));

                return true;
            }
        }
    }

    @Configuration(
        proxyBeanMethods = false
    )
    protected static class EurekaServerConfigBeanConfiguration {
        protected EurekaServerConfigBeanConfiguration() {
        }

        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                server.setRegistrySyncRetries(5);
            }

            return server;
        }
    }
}

但是EurekaServerAutoConfiguration的生效時有條件的

  • EurekaServerAutoConfiguration上有一個註解:@ConditionalOnBean(Marker)
  • 這個Marker是org.springframework.cloud.netflix.eureka.server.EurekaServerMarkerConfiguration.Marker

只有在Spring容器里有Marker這個類的實例時,才會加載EurekaServerAutoConfiguration,這個就是控制是否開啟Eureka Server的關鍵

開啟eureka server

開關

  • @EnableEurekaServer中,@Import(EurekaServerMarkerConfiguration.class)
  • 動態注入此bean到spring 容器,引入了EurekaServerMarkerConfiguration.class
  • 所以開啟了Server服務。所以註冊了前面說的:EurekaServerAutoConfiguration

查看@EnableEurekaServer註解

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EurekaServerMarkerConfiguration.class})
public @interface EnableEurekaServer {
}

開啟註冊

  • 在EurekaServerMarkerConfiguration上有@Import(EurekaServerInitializerConfiguration.class),導入了EurekaServerInitializerConfiguration,
  • EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle
  • SmartLifecycle的作用是:初始化完之後,執行public void start()方法

查看EurekaServerInitializerConfiguration.class

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.eureka.EurekaServerConfig;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.netflix.eureka.server.event.EurekaRegistryAvailableEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.web.context.ServletContextAware;

@Configuration(
    proxyBeanMethods = false
)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
    private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    private ServletContext servletContext;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;
    private boolean running;
    private int order = 1;

    public EurekaServerInitializerConfiguration() {
    }

    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    public void start() {
        (new Thread(() -> {
            try {
                this.eurekaServerBootstrap.contextInitialized(this.servletContext);
                log.info("Started Eureka Server");
                this.publish(new EurekaRegistryAvailableEvent(this.getEurekaServerConfig()));
                this.running = true;
                this.publish(new EurekaServerStartedEvent(this.getEurekaServerConfig()));
            } catch (Exception var2) {
                log.error("Could not initialize Eureka servlet context", var2);
            }

        })).start();
    }

    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }

    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }

    public void stop() {
        this.running = false;
        this.eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        callback.run();
    }

    public int getOrder() {
        return this.order;
    }
}
  • 在public void start()中,啟動一個線程,看注釋:log.info(“Started Eureka Server”);
  • 發佈事件:publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()))
  • 告訴client,可以來註冊了

上面提到的 log.info(“Started Eureka Server”) 的上面一行

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);

點contextInitialized進去,查看EurekaServerBootstrap

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.DataCenterInfo.Name;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.converters.JsonXStream;
import com.netflix.discovery.converters.XmlXStream;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.EurekaServerContext;
import com.netflix.eureka.EurekaServerContextHolder;
import com.netflix.eureka.V1AwareInstanceInfoConverter;
import com.netflix.eureka.aws.AwsBinder;
import com.netflix.eureka.aws.AwsBinderDelegate;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.util.EurekaMonitors;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class EurekaServerBootstrap {
    private static final Log log = LogFactory.getLog(EurekaServerBootstrap.class);
    private static final String TEST = "test";
    private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";
    private static final String EUREKA_ENVIRONMENT = "eureka.environment";
    private static final String DEFAULT = "default";
    private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
    private static final String EUREKA_DATACENTER = "eureka.datacenter";
    protected EurekaServerConfig eurekaServerConfig;
    protected ApplicationInfoManager applicationInfoManager;
    protected EurekaClientConfig eurekaClientConfig;
    protected PeerAwareInstanceRegistry registry;
    protected volatile EurekaServerContext serverContext;
    protected volatile AwsBinder awsBinder;

    public EurekaServerBootstrap(ApplicationInfoManager applicationInfoManager, EurekaClientConfig eurekaClientConfig, EurekaServerConfig eurekaServerConfig, PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        this.applicationInfoManager = applicationInfoManager;
        this.eurekaClientConfig = eurekaClientConfig;
        this.eurekaServerConfig = eurekaServerConfig;
        this.registry = registry;
        this.serverContext = serverContext;
    }

    public void contextInitialized(ServletContext context) {
        try {
            this.initEurekaEnvironment();
            this.initEurekaServerContext();
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        } catch (Throwable var3) {
            log.error("Cannot bootstrap eureka server :", var3);
            throw new RuntimeException("Cannot bootstrap eureka server :", var3);
        }
    }

    public void contextDestroyed(ServletContext context) {
        try {
            log.info("Shutting down Eureka Server..");
            context.removeAttribute(EurekaServerContext.class.getName());
            this.destroyEurekaServerContext();
            this.destroyEurekaEnvironment();
        } catch (Throwable var3) {
            log.error("Error shutting down eureka", var3);
        }

        log.info("Eureka Service is now shutdown...");
    }

    protected void initEurekaEnvironment() throws Exception {
        log.info("Setting the eureka configuration..");
        String dataCenter = ConfigurationManager.getConfigInstance().getString("eureka.datacenter");
        if (dataCenter == null) {
            log.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", "default");
        } else {
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.datacenter", dataCenter);
        }

        String environment = ConfigurationManager.getConfigInstance().getString("eureka.environment");
        if (environment == null) {
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", "test");
            log.info("Eureka environment value eureka.environment is not set, defaulting to test");
        } else {
            ConfigurationManager.getConfigInstance().setProperty("archaius.deployment.environment", environment);
        }

    }

    protected void initEurekaServerContext() throws Exception {
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        if (this.isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");
        int registryCount = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
        EurekaMonitors.registerAllStats();
    }

    protected void destroyEurekaServerContext() throws Exception {
        EurekaMonitors.shutdown();
        if (this.awsBinder != null) {
            this.awsBinder.shutdown();
        }

        if (this.serverContext != null) {
            this.serverContext.shutdown();
        }

    }

    protected void destroyEurekaEnvironment() throws Exception {
    }

    protected boolean isAws(InstanceInfo selfInstanceInfo) {
        boolean result = Name.Amazon == selfInstanceInfo.getDataCenterInfo().getName();
        log.info("isAws returned " + result);
        return result;
    }
}

看到initEurekaServerContext方法,初始化eureka 上下文

點initEurekaServerContext查看該方法

protected void initEurekaServerContext() throws Exception {
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    if (this.isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    EurekaMonitors.registerAllStats();
}

看到 int registryCount = this.registry.syncUp(); // 從相鄰的eureka 節點複製註冊表
 下一行,查看 openForTraffic(主要是和client 交換信息,traffic)的實現類

查看 PeerAwareInstanceRegistryImpl

 

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.eureka.registry;

import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.LeaseInfo;
import com.netflix.appinfo.AmazonInfo.MetaDataKey;
import com.netflix.appinfo.DataCenterInfo.Name;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaClientConfig;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.Version;
import com.netflix.eureka.cluster.PeerEurekaNode;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.rule.DownOrStartingRule;
import com.netflix.eureka.registry.rule.FirstMatchWinsCompositeRule;
import com.netflix.eureka.registry.rule.InstanceStatusOverrideRule;
import com.netflix.eureka.registry.rule.LeaseExistsRule;
import com.netflix.eureka.registry.rule.OverrideExistsRule;
import com.netflix.eureka.resources.CurrentRequestVersion;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.resources.ASGResource.ASGStatus;
import com.netflix.eureka.util.MeasuredRate;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.netflix.servo.monitor.Monitors;
import com.netflix.servo.monitor.Stopwatch;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class);
    private static final String US_EAST_1 = "us-east-1";
    private static final int PRIME_PEER_NODES_RETRY_MS = 30000;
    private long startupTime = 0L;
    private boolean peerInstancesTransferEmptyOnStartup = true;
    private static final Comparator<Application> APP_COMPARATOR = new Comparator<Application>() {
        public int compare(Application l, Application r) {
            return l.getName().compareTo(r.getName());
        }
    };
    private final MeasuredRate numberOfReplicationsLastMin;
    protected final EurekaClient eurekaClient;
    protected volatile PeerEurekaNodes peerEurekaNodes;
    private final InstanceStatusOverrideRule instanceStatusOverrideRule;
    private Timer timer = new Timer("ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);

    @Inject
    public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs, EurekaClient eurekaClient) {
        super(serverConfig, clientConfig, serverCodecs);
        this.eurekaClient = eurekaClient;
        this.numberOfReplicationsLastMin = new MeasuredRate(60000L);
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new InstanceStatusOverrideRule[]{new DownOrStartingRule(), new OverrideExistsRule(this.overriddenInstanceStatusMap), new LeaseExistsRule()});
    }

    protected InstanceStatusOverrideRule getInstanceInfoOverrideRule() {
        return this.instanceStatusOverrideRule;
    }

    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        this.initializedResponseCache();
        this.scheduleRenewalThresholdUpdateTask();
        this.initRemoteRegionRegistry();

        try {
            Monitors.registerObject(this);
        } catch (Throwable var3) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", var3);
        }

    }

    public void shutdown() {
        try {
            DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this));
        } catch (Throwable var3) {
            logger.error("Cannot shutdown monitor registry", var3);
        }

        try {
            this.peerEurekaNodes.shutdown();
        } catch (Throwable var2) {
            logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", var2);
        }

        this.numberOfReplicationsLastMin.stop();
        this.timer.cancel();
        super.shutdown();
    }

    private void scheduleRenewalThresholdUpdateTask() {
        this.timer.schedule(new TimerTask() {
            public void run() {
                PeerAwareInstanceRegistryImpl.this.updateRenewalThreshold();
            }
        }, (long)this.serverConfig.getRenewalThresholdUpdateIntervalMs(), (long)this.serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

    public int syncUp() {
        int count = 0;

        for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) {
            if (i > 0) {
                try {
                    Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException var10) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }

            Applications apps = this.eurekaClient.getApplications();
            Iterator var4 = apps.getRegisteredApplications().iterator();

            while(var4.hasNext()) {
                Application app = (Application)var4.next();
                Iterator var6 = app.getInstances().iterator();

                while(var6.hasNext()) {
                    InstanceInfo instance = (InstanceInfo)var6.next();

                    try {
                        if (this.isRegisterable(instance)) {
                            this.register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            ++count;
                        }
                    } catch (Throwable var9) {
                        logger.error("During DS init copy", var9);
                    }
                }
            }
        }

        return count;
    }

    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        this.expectedNumberOfClientsSendingRenews = count;
        this.updateRenewsPerMinThreshold();
        logger.info("Got {} instances from neighboring DS node", count);
        logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold);
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }

        Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) {
            logger.info("Priming AWS connections for all replicas..");
            this.primeAwsReplicas(applicationInfoManager);
        }

        logger.info("Changing status to UP");
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
    }

    private void primeAwsReplicas(ApplicationInfoManager applicationInfoManager) {
        boolean areAllPeerNodesPrimed = false;

        while(!areAllPeerNodesPrimed) {
            String peerHostName = null;

            try {
                Application eurekaApps = this.getApplication(applicationInfoManager.getInfo().getAppName(), false);
                if (eurekaApps == null) {
                    areAllPeerNodesPrimed = true;
                    logger.info("No peers needed to prime.");
                    return;
                }

                Iterator var5 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

                while(var5.hasNext()) {
                    PeerEurekaNode node = (PeerEurekaNode)var5.next();
                    Iterator var7 = eurekaApps.getInstances().iterator();

                    while(var7.hasNext()) {
                        InstanceInfo peerInstanceInfo = (InstanceInfo)var7.next();
                        LeaseInfo leaseInfo = peerInstanceInfo.getLeaseInfo();
                        if (System.currentTimeMillis() <= leaseInfo.getRenewalTimestamp() + (long)(leaseInfo.getDurationInSecs() * 1000) + 120000L) {
                            peerHostName = peerInstanceInfo.getHostName();
                            logger.info("Trying to send heartbeat for the eureka server at {} to make sure the network channels are open", peerHostName);
                            if (peerHostName.equalsIgnoreCase((new URI(node.getServiceUrl())).getHost())) {
                                node.heartbeat(peerInstanceInfo.getAppName(), peerInstanceInfo.getId(), peerInstanceInfo, (InstanceStatus)null, true);
                            }
                        }
                    }
                }

                areAllPeerNodesPrimed = true;
            } catch (Throwable var11) {
                logger.error("Could not contact {}", peerHostName, var11);

                try {
                    Thread.sleep(30000L);
                } catch (InterruptedException var10) {
                    logger.warn("Interrupted while priming : ", var10);
                    areAllPeerNodesPrimed = true;
                }
            }
        }

    }

    public boolean shouldAllowAccess(boolean remoteRegionRequired) {
        if (this.peerInstancesTransferEmptyOnStartup && System.currentTimeMillis() <= this.startupTime + (long)this.serverConfig.getWaitTimeInMsWhenSyncEmpty()) {
            return false;
        } else {
            if (remoteRegionRequired) {
                Iterator var2 = this.regionNameVSRemoteRegistry.values().iterator();

                while(var2.hasNext()) {
                    RemoteRegionRegistry remoteRegionRegistry = (RemoteRegionRegistry)var2.next();
                    if (!remoteRegionRegistry.isReadyForServingData()) {
                        return false;
                    }
                }
            }

            return true;
        }
    }

    public boolean shouldAllowAccess() {
        return this.shouldAllowAccess(true);
    }

    /** @deprecated */
    @Deprecated
    public List<PeerEurekaNode> getReplicaNodes() {
        return Collections.unmodifiableList(this.peerEurekaNodes.getPeerEurekaNodes());
    }

    public boolean cancel(String appName, String id, boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
            return true;
        } else {
            return false;
        }
    }

    public void register(InstanceInfo info, boolean isReplication) {
        int leaseDuration = 90;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }

        super.register(info, leaseDuration, isReplication);
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
    }

    public boolean renew(String appName, String id, boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Heartbeat, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
            return true;
        } else {
            return false;
        }
    }

    public boolean statusUpdate(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) {
        if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.StatusUpdate, appName, id, (InstanceInfo)null, newStatus, isReplication);
            return true;
        } else {
            return false;
        }
    }

    public boolean deleteStatusOverride(String appName, String id, InstanceStatus newStatus, String lastDirtyTimestamp, boolean isReplication) {
        if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
            this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.DeleteStatusOverride, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
            return true;
        } else {
            return false;
        }
    }

    public void statusUpdate(String asgName, ASGStatus newStatus, boolean isReplication) {
        if (!isReplication) {
            Iterator var4 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

            while(var4.hasNext()) {
                PeerEurekaNode node = (PeerEurekaNode)var4.next();
                this.replicateASGInfoToReplicaNodes(asgName, newStatus, node);
            }

        }
    }

    public boolean isLeaseExpirationEnabled() {
        if (!this.isSelfPreservationModeEnabled()) {
            return true;
        } else {
            return this.numberOfRenewsPerMinThreshold > 0 && this.getNumOfRenewsInLastMin() > (long)this.numberOfRenewsPerMinThreshold;
        }
    }

    public boolean isSelfPreservationModeEnabled() {
        return this.serverConfig.shouldEnableSelfPreservation();
    }

    public InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure) {
        return null;
    }

    private void updateRenewalThreshold() {
        try {
            Applications apps = this.eurekaClient.getApplications();
            int count = 0;
            Iterator var3 = apps.getRegisteredApplications().iterator();

            while(var3.hasNext()) {
                Application app = (Application)var3.next();
                Iterator var5 = app.getInstances().iterator();

                while(var5.hasNext()) {
                    InstanceInfo instance = (InstanceInfo)var5.next();
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }

            synchronized(this.lock) {
                if ((double)count > this.serverConfig.getRenewalPercentThreshold() * (double)this.expectedNumberOfClientsSendingRenews || !this.isSelfPreservationModeEnabled()) {
                    this.expectedNumberOfClientsSendingRenews = count;
                    this.updateRenewsPerMinThreshold();
                }
            }

            logger.info("Current renewal threshold is : {}", this.numberOfRenewsPerMinThreshold);
        } catch (Throwable var9) {
            logger.error("Cannot update renewal threshold", var9);
        }

    }

    public List<Application> getSortedApplications() {
        List<Application> apps = new ArrayList(this.getApplications().getRegisteredApplications());
        Collections.sort(apps, APP_COMPARATOR);
        return apps;
    }

    @Monitor(
        name = "numOfReplicationsInLastMin",
        description = "Number of total replications received in the last minute",
        type = DataSourceType.GAUGE
    )
    public long getNumOfReplicationsInLastMin() {
        return this.numberOfReplicationsLastMin.getCount();
    }

    @Monitor(
        name = "isBelowRenewThreshold",
        description = "0 = false, 1 = true",
        type = DataSourceType.GAUGE
    )
    public int isBelowRenewThresold() {
        return this.getNumOfRenewsInLastMin() <= (long)this.numberOfRenewsPerMinThreshold && this.startupTime > 0L && System.currentTimeMillis() > this.startupTime + (long)this.serverConfig.getWaitTimeInMsWhenSyncEmpty() ? 1 : 0;
    }

    public boolean isRegisterable(InstanceInfo instanceInfo) {
        DataCenterInfo datacenterInfo = instanceInfo.getDataCenterInfo();
        String serverRegion = this.clientConfig.getRegion();
        if (AmazonInfo.class.isInstance(datacenterInfo)) {
            AmazonInfo info = (AmazonInfo)AmazonInfo.class.cast(instanceInfo.getDataCenterInfo());
            String availabilityZone = info.get(MetaDataKey.availabilityZone);
            if (availabilityZone == null && "us-east-1".equalsIgnoreCase(serverRegion)) {
                return true;
            }

            if (availabilityZone != null && availabilityZone.contains(serverRegion)) {
                return true;
            }
        }

        return true;
    }

    private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();

        try {
            if (isReplication) {
                this.numberOfReplicationsLastMin.increment();
            }

            if (this.peerEurekaNodes != Collections.EMPTY_LIST && !isReplication) {
                Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

                while(var8.hasNext()) {
                    PeerEurekaNode node = (PeerEurekaNode)var8.next();
                    if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                    }
                }

                return;
            }
        } finally {
            tracer.stop();
        }

    }

    private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
        try {
            CurrentRequestVersion.set(Version.V2);
            InstanceInfo infoFromRegistry;
            switch(action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
                infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
            }
        } catch (Throwable var12) {
            logger.error("Cannot replicate information to {} for action {}", new Object[]{node.getServiceUrl(), action.name(), var12});
        } finally {
            CurrentRequestVersion.remove();
        }

    }

    private void replicateASGInfoToReplicaNodes(String asgName, ASGStatus newStatus, PeerEurekaNode node) {
        CurrentRequestVersion.set(Version.V2);

        try {
            node.statusUpdate(asgName, newStatus);
        } catch (Throwable var8) {
            logger.error("Cannot replicate ASG status information to {}", node.getServiceUrl(), var8);
        } finally {
            CurrentRequestVersion.remove();
        }

    }

    @Monitor(
        name = "localRegistrySize",
        description = "Current registry size",
        type = DataSourceType.GAUGE
    )
    public long getLocalRegistrySize() {
        return super.getLocalRegistrySize();
    }

    public static enum Action {
        Heartbeat,
        Register,
        Cancel,
        StatusUpdate,
        DeleteStatusOverride;

        private com.netflix.servo.monitor.Timer timer = Monitors.newTimer(this.name());

        private Action() {
        }

        public com.netflix.servo.monitor.Timer getTimer() {
            return this.timer;
        }
    }
}

openForTraffic開啟任務postInit,進去之後發現剔除功能(剔除 沒有續約的服務)

postInit,點進去,AbstractInstanceRegistry.class

  protected void postInit() {
        this.renewsLastMin.start();
        if (this.evictionTaskRef.get() != null) {
            ((AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get()).cancel();
        }

        this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask());
        this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(), this.serverConfig.getEvictionIntervalTimerInMs(), this.serverConfig.getEvictionIntervalTimerInMs());
    }

發現new EvictionTask()點進去

class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    EvictionTask() {
    }

    public void run() {
        try {
            long compensationTimeMs = this.getCompensationTimeMs();
            AbstractInstanceRegistry.logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            AbstractInstanceRegistry.this.evict(compensationTimeMs);
        } catch (Throwable var3) {
            AbstractInstanceRegistry.logger.error("Could not run the evict task", var3);
        }

    }

    long getCompensationTimeMs() {
        long currNanos = this.getCurrentTimeNano();
        long lastNanos = this.lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        } else {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - AbstractInstanceRegistry.this.serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0L ? 0L : compensationTime;
        }
    }

    long getCurrentTimeNano() {
        return System.nanoTime();
    }
}

看到run方法中,evict(compensationTimeMs),點進去就到了,具體剔除邏輯

public void evict() {
    this.evict(0L);
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    if (!this.isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
    } else {
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList();
        Iterator var4 = this.registry.entrySet().iterator();

        while(true) {
            Map leaseMap;
            do {
                if (!var4.hasNext()) {
                    int registrySize = (int)this.getLocalRegistrySize();
                    int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
                    int evictionLimit = registrySize - registrySizeThreshold;
                    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
                    if (toEvict > 0) {
                        logger.info("Evicting {} items (expired={}, evictionLimit={})", new Object[]{toEvict, expiredLeases.size(), evictionLimit});
                        Random random = new Random(System.currentTimeMillis());

                        for(int i = 0; i < toEvict; ++i) {
                            int next = i + random.nextInt(expiredLeases.size() - i);
                            Collections.swap(expiredLeases, i, next);
                            Lease<InstanceInfo> lease = (Lease)expiredLeases.get(i);
                            String appName = ((InstanceInfo)lease.getHolder()).getAppName();
                            String id = ((InstanceInfo)lease.getHolder()).getId();
                            EurekaMonitors.EXPIRED.increment();
                            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                            this.internalCancel(appName, id, false);
                        }
                    }

                    return;
                }

                Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry = (Entry)var4.next();
                leaseMap = (Map)groupEntry.getValue();
            } while(leaseMap == null);

            Iterator var7 = leaseMap.entrySet().iterator();

            while(var7.hasNext()) {
                Entry<String, Lease<InstanceInfo>> leaseEntry = (Entry)var7.next();
                Lease<InstanceInfo> lease = (Lease)leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
}

PeerAwareInstanceRegistry接口

在EurekaServerAutoConfiguration中 有 public EurekaServerContext eurekaServerContext

@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
}

查看DefaultEurekaServerContext

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.eureka;

import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.eureka.cluster.PeerEurekaNodes;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.resources.ServerCodecs;
import com.netflix.eureka.util.EurekaMonitors;
import com.netflix.eureka.util.ServoControl;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;

    @Inject
    public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
    }

    @PostConstruct
    public void initialize() {
        logger.info("Initializing ...");
        this.peerEurekaNodes.start();

        try {
            this.registry.init(this.peerEurekaNodes);
        } catch (Exception var2) {
            throw new RuntimeException(var2);
        }

        logger.info("Initialized");
    }

    @PreDestroy
    public void shutdown() {
        logger.info("Shutting down ...");
        this.registry.shutdown();
        this.peerEurekaNodes.shutdown();
        ServoControl.shutdown();
        EurekaMonitors.shutdown();
        logger.info("Shut down");
    }

    public EurekaServerConfig getServerConfig() {
        return this.serverConfig;
    }

    public PeerEurekaNodes getPeerEurekaNodes() {
        return this.peerEurekaNodes;
    }

    public ServerCodecs getServerCodecs() {
        return this.serverCodecs;
    }

    public PeerAwareInstanceRegistry getRegistry() {
        return this.registry;
    }

    public ApplicationInfoManager getApplicationInfoManager() {
        return this.applicationInfoManager;
    }
}

其中peerEurekaNodes.start();

啟動一個只擁有一個線程的線程池,第一次進去會更新一下集群其他節點信息

registry.init(peerEurekaNodes);

鼠標放在registry上,發現是PeerAwareInstanceRegistryImpl , 的 註冊信息管理類裏面的init方法

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    this.initializedResponseCache();
    this.scheduleRenewalThresholdUpdateTask();
    this.initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable var3) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", var3);
    }

}

PeerAwareInstanceRegistry是個接口,實現類是:PeerAwareInstanceRegistryImpl

PeerAwareInstanceRegistry接口,實現了com.netflix.eureka.registry.InstanceRegistry。  

服務實例註冊表 

Server是圍繞註冊表管理的

有兩個InstanceRegistry

  • com.netflix.eureka.registry.InstanceRegistry是euraka server中註冊表管理的核心接口,職責是在內存中管理註冊到Eureka Server中的服務實例信息
  • 實現類有PeerAwareInstanceRegistryImpl

org.springframework.cloud.netflix.eureka.server.InstanceRegistry對PeerAwareInstanceRegistryImpl進行了繼承和擴展

使其適配Spring cloud的使用環境,主要的實現由PeerAwareInstanceRegistryImpl提供

com.netflix.eureka.registry.InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String>

  • LeaseManager<InstanceInfo> 是對註冊到server中的服務實例租約進行管理
  • LookupService<String>           是提供服務實例的檢索查詢功能
  • LeaseManager<InstanceInfo> 接口的作用是對註冊到Eureka Server中的服務實例租約進行管理

方法有

  • 服務註冊
  • 下線
  • 續約
  • 剔除

此接口管理的類目前是InstanceInfo,InstanceInfo代表服務實例信息

PeerAwareInstanceRegistryImpl 增加了對peer節點的同步複製操作

使得eureka server集群中註冊表信息保持一致

接受服務註冊

我們學過Eureka Client在發起服務註冊時會將自身的服務實例元數據封裝在InstanceInfo中,然後將InstanceInfo發送到Eureka Server

Eureka Server在接收到Eureka Client發送的InstanceInfo後將會嘗試將其放到本地註冊表中以供其他Eureka Client進行服務發現

通過 eureka/apps/{服務名}註冊 

  • 在EurekaServerAutoConfiguration中定義了 public FilterRegistrationBean jerseyFilterRegistration ,表明eureka-server使用了Jersey實現 對外的restful接口
  • 註冊一個 Jersey 的 filter ,配置好相應的Filter 和 url映射
 @Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {
    ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);
    provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
    provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
    Set<Class<?>> classes = new HashSet();
    String[] var5 = EUREKA_PACKAGES;
    int var6 = var5.length;

    for(int var7 = 0; var7 < var6; ++var7) {
        String basePackage = var5[var7];
        Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
        Iterator var10 = beans.iterator();

        while(var10.hasNext()) {
            BeanDefinition bd = (BeanDefinition)var10.next();
            Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());
            classes.add(cls);
        }
    }

    Map<String, Object> propsAndFeatures = new HashMap();
    propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");
    DefaultResourceConfig rc = new DefaultResourceConfig(classes);
    rc.setPropertiesAndFeatures(propsAndFeatures);
    return rc;
}

添加一些過濾器,類似於過濾請求地址,Path類似於@RequestMapping,Provider類似於@Controller   

在com.netflix.eureka.resources包下,是Eureka Server對於Eureka client的REST請求的定義

看ApplicationResource類(這是一類請求,應用類的請求),類似於應用@Controller註解:@Produces({“application/xml”, “application/json”}),接受xml和json

見名識意 public Response addInstance,添加實例instanceinfo

方法中,有一句:

registry.register(info, “true”.equals(isReplication))

鼠標放在registry上PeerAwareInstanceRegistry接口,點擊void register方法

發現 是PeerAwareInstanceRegistryImpl 的方法:public void register(final InstanceInfo info, final boolean isReplication) ,中有一句:super.register(info, leaseDuration, isReplication);

com.netflix.eureka.registry.AbstractInstanceRegistry

register方法

在register中,服務實例的InstanceInfo保存在Lease中,Lease在AbstractInstanceRegistry中統一通過ConcurrentHashMap保存在內存中

  • 在服務註冊過程中,會先獲取一個讀鎖,防止其他線程對registry註冊表進行數據操作,避免數據的不一致
  • 然後從resgitry查詢對應的InstanceInfo租約是否已經存在註冊表中,根據appName劃分服務集群,使用InstanceId唯一標記服務實例
  • 如果租約存在,比較兩個租約中的InstanceInfo的最後更新時間lastDirtyTimestamp,保留時間戳大的服務實例信息InstanceInfo
  • 如果租約不存在,意味這是一次全新的服務註冊,將會進行自我保護的統計,創建新的租約保存InstanceInfo
  • 接着將租約放到resgitry註冊表中
  • 之後將進行一系列緩存操作並根據覆蓋狀態規則設置服務實例的狀態
  • 緩存操作包括將InstanceInfo加入用於統計Eureka Client增量式獲取註冊表信息的recentlyChangedQueue和失效responseCache中對應的緩存
  • 最後設置服務實例租約的上線時間用於計算租約的有效時間,釋放讀鎖並完成服務註冊

接受心跳 續租,renew

在Eureka Client完成服務註冊之後,它需要定時向Eureka Server發送心跳請求(默認30秒一次),維持自己在Eureka Server中租約的有效性

看另一類請求com.netflix.eureka.resources.InstanceResource

public Response renewLease()方法

@PUT
public Response renewLease(@HeaderParam("x-netflix-discovery-replication") String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    boolean isSuccess = this.registry.renew(this.app.getName(), this.id, isFromReplicaNode);
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", this.app.getName(), this.id);
        return Response.status(Status.NOT_FOUND).build();
    } else {
        Response response;
        if (lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Status.NOT_FOUND.getStatusCode() && overriddenStatus != null && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus) && isFromReplicaNode) {
                this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }

        logger.debug("Found (Renew): {} - {}; reply status={}", new Object[]{this.app.getName(), this.id, response.getStatus()});
        return response;
    }
}

看到一行boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

點擊renew的實現

AbstractInstanceRegistry#renew方法

public boolean renew(String appName, String id, boolean isReplication) {
    EurekaMonitors.RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = (Lease)gMap.get(id);
    }

    if (leaseToRenew == null) {
        EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = (InstanceInfo)leaseToRenew.getHolder();
        if (instanceInfo != null) {
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}; re-register required", instanceInfo.getId());
                EurekaMonitors.RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }

            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info("The instance status {} is different from overridden instance status {} for instance {}. Hence setting the status to overridden status", new Object[]{instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()});
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }

        this.renewsLastMin.increment();
        leaseToRenew.renew();
        return true;
    }
}

Eureka Server處理心跳請求的核心邏輯位於AbstractInstanceRegistry#renew方法中

renew方法是對Eureka Client位於註冊表中的租約的續租操作,不像register方法需要服務實例信息,僅根據服務實例的服務名和服務實例id即可更新對應租約的有效時間

//根據appName獲取服務集群的租約集合
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
//查看服務實例狀態
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
    //統計每分鐘續租次數
    renewsLastMin.increment();
    //更新租約
    leaseToRenew.renew();
}

此方法中不關注InstanceInfo,僅關注於租約本身以及租約的服務實例狀態

  • 如果根據服務實例的appName和instanceInfoId查詢出服務實例的租約
  • 並且根據#getOverriddenInstanceStatus方法得到的instanceStatus不為InstanceStatus.UNKNOWN
  • 那麼更新租約中的有效時間,即更新租約Lease中的lastUpdateTimestamp
  • 達到續約的目的
  • 如果租約不存在,那麼返回續租失敗的結果

服務剔除

  • 如果Eureka Client在註冊後,既沒有續約,也沒有下線(服務崩潰或者網絡異常等原因)
  • 那麼服務的狀態就處於不可知的狀態,不能保證能夠從該服務實例中獲取到回饋
  • 所以需要服務剔除此方法定時清理這些不穩定的服務,該方法會批量將註冊表中所有過期租約剔除

剔除是定時任務,默認60秒執行一次,延時60秒,間隔60秒

protected void postInit() {
    this.renewsLastMin.start();
    if (this.evictionTaskRef.get() != null) {
        ((AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get()).cancel();
    }

    this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask());
    this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(), this.serverConfig.getEvictionIntervalTimerInMs(), this.serverConfig.getEvictionIntervalTimerInMs());
}

從上面eureka server啟動來看,剔除的任務,是線程啟動的,執行的是下面的方法。
com.netflix.eureka.registry.AbstractInstanceRegistry#evict

public void evict() {
    this.evict(0L);
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
//   判斷是否開啟自我保護
if (!this.isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); } else { List<Lease<InstanceInfo>> expiredLeases = new ArrayList(); Iterator var4 = this.registry.entrySet().iterator(); while(true) { Map leaseMap; do { if (!var4.hasNext()) { int registrySize = (int)this.getLocalRegistrySize(); int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", new Object[]{toEvict, expiredLeases.size(), evictionLimit}); Random random = new Random(System.currentTimeMillis()); for(int i = 0; i < toEvict; ++i) { int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = (Lease)expiredLeases.get(i); String appName = ((InstanceInfo)lease.getHolder()).getAppName(); String id = ((InstanceInfo)lease.getHolder()).getId(); EurekaMonitors.EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); this.internalCancel(appName, id, false); } } return; } Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry = (Entry)var4.next(); leaseMap = (Map)groupEntry.getValue(); } while(leaseMap == null); Iterator var7 = leaseMap.entrySet().iterator(); while(var7.hasNext()) { Entry<String, Lease<InstanceInfo>> leaseEntry = (Entry)var7.next(); Lease<InstanceInfo> lease = (Lease)leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } }

如果開啟自我保護,不剔除

點進去isLeaseExpirationEnabled,查看實現類

public boolean isLeaseExpirationEnabled() {
    if (!this.isSelfPreservationModeEnabled()) {
        return true;
    } else {
        return this.numberOfRenewsPerMinThreshold > 0 && this.getNumOfRenewsInLastMin() > (long)this.numberOfRenewsPerMinThreshold;
    }
}

有一個isSelfPreservationModeEnabled,點進去 

public boolean isSelfPreservationModeEnabled() {
    return this.serverConfig.shouldEnableSelfPreservation();
}

發現EurekaServerConfig,的方法shouldEnableSelfPreservation,看其實現中有EurekaServerConfigBean

public boolean shouldEnableSelfPreservation() {
    return this.enableSelfPreservation;
}

發現屬性:enableSelfPreservation

回到com.netflix.eureka.registry.AbstractInstanceRegistry#evict方法

  • 緊接着一個大的for循環,便利註冊表register
  • 依次判斷租約是否過期
  • 一次性獲取所有的過期租約
while(true) {
    Map leaseMap;
    do {
        if (!var4.hasNext()) {
            // 獲取註冊表租約總數    
            int registrySize = (int)this.getLocalRegistrySize();
            // 計算註冊表租約的閾值總數 * 續租百分比,得出要續租的數量
            int registrySizeThreshold = (int)((double)registrySize * this.serverConfig.getRenewalPercentThreshold());
            // 總數減去要續租的數量,就是理論要剔除的數量     
            int evictionLimit = registrySize - registrySizeThreshold;
            // 求上面理論剔除數量,和過期租約總數的最小值, 就是最終要提出的數量
            int toEvict = Math.min(expiredLeases.size(), evictionLimit);
            if (toEvict > 0) {
                logger.info("Evicting {} items (expired={}, evictionLimit={})", new Object[]{toEvict, expiredLeases.size(), evictionLimit});
                Random random = new Random(System.currentTimeMillis());

                for(int i = 0; i < toEvict; ++i) {
                    int next = i + random.nextInt(expiredLeases.size() - i);
                    Collections.swap(expiredLeases, i, next);
                    Lease<InstanceInfo> lease = (Lease)expiredLeases.get(i);
                    String appName = ((InstanceInfo)lease.getHolder()).getAppName();
                    String id = ((InstanceInfo)lease.getHolder()).getId();
                    EurekaMonitors.EXPIRED.increment();
                    logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                    // 執行服務下線將服務從註冊表清除掉
                    this.internalCancel(appName, id, false);
                }
            }

            return;
        }

        Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry = (Entry)var4.next();
        leaseMap = (Map)groupEntry.getValue();
    } while(leaseMap == null);

    Iterator var7 = leaseMap.entrySet().iterator();

    while(var7.hasNext()) {
        Entry<String, Lease<InstanceInfo>> leaseEntry = (Entry)var7.next();
        Lease<InstanceInfo> lease = (Lease)leaseEntry.getValue();
        if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
            expiredLeases.add(lease);
        }
    }
}    

剔除的限制  

  • 自我保護期間不清除
  • 分批次清除

服務

  • 逐個隨機剔除
  • 剔除均勻分佈在所有應用中
  • 防止在同一時間內同一服務集群中的服務全部過期被剔除
  • 造成在大量剔除服務時,並在進行自我保護時,促使程序崩潰

剔除服務是個定時任務

查看EurekaServerInitializerConfiguration

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.springframework.cloud.netflix.eureka.server;

import com.netflix.eureka.EurekaServerConfig;
import javax.servlet.ServletContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.netflix.eureka.server.event.EurekaRegistryAvailableEvent;
import org.springframework.cloud.netflix.eureka.server.event.EurekaServerStartedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.web.context.ServletContextAware;

@Configuration(
    proxyBeanMethods = false
)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {
    private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
    @Autowired
    private EurekaServerConfig eurekaServerConfig;
    private ServletContext servletContext;
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;
    private boolean running;
    private int order = 1;

    public EurekaServerInitializerConfiguration() {
    }

    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    public void start() {
        (new Thread(() -> {
            try {
                this.eurekaServerBootstrap.contextInitialized(this.servletContext);
                log.info("Started Eureka Server");
                this.publish(new EurekaRegistryAvailableEvent(this.getEurekaServerConfig()));
                this.running = true;
                this.publish(new EurekaServerStartedEvent(this.getEurekaServerConfig()));
            } catch (Exception var2) {
                log.error("Could not initialize Eureka servlet context", var2);
            }

        })).start();
    }

    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }

    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }

    public void stop() {
        this.running = false;
        this.eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }

    public boolean isRunning() {
        return this.running;
    }

    public int getPhase() {
        return 0;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        callback.run();
    }

    public int getOrder() {
        return this.order;
    }
}

eurekaServerBootstrap.contextInitialized(方法,中initEurekaServerContext();點進去this.registry.openForTraffic(this.applicationInfoManager, registryCount);點進去,super.postInit();點進去

protected void postInit() {
    this.renewsLastMin.start();
    if (this.evictionTaskRef.get() != null) {
        ((AbstractInstanceRegistry.EvictionTask)this.evictionTaskRef.get()).cancel();
    }

    this.evictionTaskRef.set(new AbstractInstanceRegistry.EvictionTask());
    this.evictionTimer.schedule((TimerTask)this.evictionTaskRef.get(), this.serverConfig.getEvictionIntervalTimerInMs(), this.serverConfig.getEvictionIntervalTimerInMs());
}

EvictionTask是定時任務

class EvictionTask extends TimerTask {
    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0L);

    EvictionTask() {
    }

    public void run() {
        try {
            long compensationTimeMs = this.getCompensationTimeMs();
            AbstractInstanceRegistry.logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            AbstractInstanceRegistry.this.evict(compensationTimeMs);
        } catch (Throwable var3) {
            AbstractInstanceRegistry.logger.error("Could not run the evict task", var3);
        }

    }

    long getCompensationTimeMs() {
        long currNanos = this.getCurrentTimeNano();
        long lastNanos = this.lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0L) {
            return 0L;
        } else {
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
            long compensationTime = elapsedMs - AbstractInstanceRegistry.this.serverConfig.getEvictionIntervalTimerInMs();
            return compensationTime <= 0L ? 0L : compensationTime;
        }
    }

    long getCurrentTimeNano() {
        return System.nanoTime();
    }
}

剔除服務是個定時任務,用EvictionTask執行,默認60秒執行一次,延時60秒執行

定時剔除過期服務

  • 服務剔除將會遍歷registry註冊表,找出其中所有的過期租約
  • 然後根據配置文件中續租百分比閥值和當前註冊表的租約總數量計算出最大允許的剔除租約的數量
  • (當前註冊表中租約總數量減去當前註冊表租約閥值)
  • 分批次剔除過期的服務實例租約
  • 對過期的服務實例租約調用AbstractInstanceRegistry#internalCancel服務下線的方法將其從註冊表中清除掉​     

自我保護機制主要在Eureka Client和Eureka Server之間存在網絡分區的情況下發揮保護作用

在服務器端和客戶端都有對應實現

  • 假設在某種特定的情況下(如網絡故障),Eureka Client和Eureka Server無法進行通信
  • 此時Eureka Client無法向Eureka Server發起註冊和續約請求,Eureka Server中就可能因註冊表中的服務實例租約出現大量過期而面臨被剔除的危險
  • 然而此時的Eureka Client可能是處於健康狀態的(可接受服務訪問),如果直接將註冊表中大量過期的服務實例租約剔除顯然是不合理的

針對這種情況,Eureka設計了「自我保護機制」

  • 在Eureka Server處,如果出現大量的服務實例過期被剔除的現象
  • 那麼該Server節點將進入自我保護模式,保護註冊表中的信息不再被剔除,在通信穩定後再退出該模式
  • 在Eureka Client處,如果向Eureka Server註冊失敗,將快速超時並嘗試與其他的Eureka Server進行通信
  • 「自我保護機制」的設計大大提高了Eureka的可用性

服務下線

Eureka Client在應用銷毀時

會向Eureka Server發送服務下線請求,清除註冊表中關於本應用的租約,避免無效的服務調用

在服務剔除的過程中,也是通過服務下線的邏輯完成對單個服務實例過期租約的清除工作

在InstanceResource中

@DELETE
public Response cancelLease(@HeaderParam("x-netflix-discovery-replication") String isReplication) {
    try {
        boolean isSuccess = this.registry.cancel(this.app.getName(), this.id, "true".equals(isReplication));
        if (isSuccess) {
            logger.debug("Found (Cancel): {} - {}", this.app.getName(), this.id);
            return Response.ok().build();
        } else {
            logger.info("Not Found (Cancel): {} - {}", this.app.getName(), this.id);
            return Response.status(Status.NOT_FOUND).build();
        }
    } catch (Throwable var3) {
        logger.error("Error (cancel): {} - {}", new Object[]{this.app.getName(), this.id, var3});
        return Response.serverError().build();
    }
}

查看registry.cancel,實現該方法的類是PeerAwareInstanceRegistryImpl.class

public boolean cancel(String appName, String id, boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, (InstanceInfo)null, (InstanceStatus)null, isReplication);
        return true;
    } else {
        return false;
    }
}

點super.cancel進去發現internCancel

public boolean cancel(String appName, String id, boolean isReplication) {
    return this.internalCancel(appName, id, isReplication);
}

查看internCancel實現      

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // 先獲取讀鎖, 防止被其他線程修改
        this.read.lock();
        EurekaMonitors.CANCEL.increment(isReplication);
        //  根據appName獲取服務實力集群
        Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        // 在內存中取消實例 id的服務
        if (gMap != null) {
            leaseToCancel = (Lease)gMap.remove(id);
        }
        // 添加到最近下線服務的統計隊列
        this.recentCanceledQueue.add(new Pair(System.currentTimeMillis(), appName + "(" + id + ")"));
        InstanceStatus instanceStatus = (InstanceStatus)this.overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        // 往下判斷leaseToCancel是否為空, 租約不存在返回false
         // 如果存在, 設置租約下線時間
        if (leaseToCancel == null) {
            EurekaMonitors.CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            boolean var16 = false;
            return var16;
        }

        leaseToCancel.cancel();
        InstanceInfo instanceInfo = (InstanceInfo)leaseToCancel.getHolder();
        String vip = null;
        String svip = null;
        if (instanceInfo != null) {
            // 獲取持有租約的服務信息, 標記服務實例
            instanceInfo.setActionType(ActionType.DELETED);
            //   添加到租約變更記錄隊列 用於eureka client的增量拉取註冊表信息。
            this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
            instanceInfo.setLastUpdatedTimestamp();
            vip = instanceInfo.getVIPAddress();
            svip = instanceInfo.getSecureVipAddress();
        }

        this.invalidateCache(appName, vip, svip);
        logger.info("Cancelled instance {}/{} (replication={})", new Object[]{appName, id, isReplication});
    } finally {
        // 釋放鎖
        this.read.unlock();
    }

    synchronized(this.lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            --this.expectedNumberOfClientsSendingRenews;
            this.updateRenewsPerMinThreshold();
        }

        return true;
    }
}
  • 首先通過registry根據服務名和服務實例id查詢關於服務實例的租約Lease是否存在
  • 統計最近請求下線的服務實例用於Eureka Server主頁展示
  • 如果租約不存在,返回下線失敗
  • 如果租約存在,從registry註冊表中移除,設置租約的下線時間
  • 同時在最近租約變更記錄隊列中添加新的下線記錄,以用於Eureka Client的增量式獲取註冊表信息

集群同步

如果Eureka Server是通過集群的方式進行部署,那麼為了維護整個集群中Eureka Server註冊表數據的一致性,勢必需要一個機制同步Eureka Server集群中的註冊表數據

Eureka Server集群同步包含兩個部分

  • 一部分是Eureka Server在啟動過程中從它的peer節點中拉取註冊表信息,並將這些服務實例的信息註冊到本地註冊表中
  • 另一部分是Eureka Server每次對本地註冊表進行操作時,同時會將操作同步到它的peer節點中,達到集群註冊表數據統一的目的

啟動拉取別的peer
  

在Eureka Server啟動類中

EurekaServerInitializerConfiguration位於EurekaServerAutoConfiguration 的import註解中。

eurekaServerBootstrap.contextInitialized

點進去:initEurekaServerContext()

 protected void initEurekaServerContext() throws Exception {
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
    if (this.isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }

    EurekaServerContextHolder.initialize(this.serverContext);
    log.info("Initialized server context");
    int registryCount = this.registry.syncUp();
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    EurekaMonitors.registerAllStats();
}

看注釋:拉取註冊表從鄰近節點。點擊syncUp()的實現方法進去

public int syncUp() {
    int count = 0;

    for(int i = 0; i < this.serverConfig.getRegistrySyncRetries() && count == 0; ++i) {
        if (i > 0) {
            try {
                Thread.sleep(this.serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException var10) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }

        Applications apps = this.eurekaClient.getApplications();
        Iterator var4 = apps.getRegisteredApplications().iterator();

        while(var4.hasNext()) {
            Application app = (Application)var4.next();
            Iterator var6 = app.getInstances().iterator();

            while(var6.hasNext()) {
                InstanceInfo instance = (InstanceInfo)var6.next();

                try {
                    if (this.isRegisterable(instance)) {
                        this.register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        ++count;
                    }
                } catch (Throwable var9) {
                    logger.error("During DS init copy", var9);
                }
            }
        }
    }

    return count;
}
  • 看循環:意思是,如果是i第一次進來,為0,不夠等待的代碼,直接執行下面的拉取服務實例
  • 將自己作為一個eureka client,拉取註冊表
  • 並通過register(instance, instance.getLeaseInfo().getDurationInSecs(), true)註冊到自身的註冊表中

Eureka Server也是一個Eureka Client

  • 啟動的時候也會進行DiscoveryClient的初始化,會從其對應的Eureka Server中拉取全量的註冊表信息
  • 在Eureka Server集群部署的情況下,Eureka Server從它的peer節點中拉取到註冊表信息後
  • 將遍歷這個Applications,將所有的服務實例通過AbstractRegistry#register方法註冊到自身註冊表中

回到initEurekaServerContext

int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);        

當執行完上面的syncUp邏輯後,在下面的openForTraffic,開啟此server接受別的client註冊,拉取註冊表等操作

而在它首次拉取其他peer節點時,是不允許client的通信請求的

openForTraffic

public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // 初始化期望client發送過來的服務數量, 即上面獲取到的服務數量
    this.expectedNumberOfClientsSendingRenews = count;
    this.updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", this.numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }

    Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && this.serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        this.primeAwsReplicas(applicationInfoManager);
    }

    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

updateRenewsPerMinThreshold點進去,是計算自我保護的統計參數

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfClientsSendingRenews * (60.0D / (double)this.serverConfig.getExpectedClientRenewalIntervalSeconds()) * this.serverConfig.getRenewalPercentThreshold());
}

服務數*(每個服務每分鐘續約次數)* 閾值

回到openForTraffic

if (count > 0) {
    this.peerInstancesTransferEmptyOnStartup = false;
}
  • 如果count=0,沒有拉取到註冊表信息,將此值設為true,表示其他peer來取空的實例信息
  • 意味着,將不允許client從此server獲取註冊表信息
  • 如果count>0,將此值設置為false,允許client來獲取註冊表

後面將服務置為上線,並開啟剔除的定時任務

  • 當Server的狀態不為UP時,將拒絕所有的請求
  • 在Client請求獲取註冊表信息時,Server會判斷此時是否允許獲取註冊表中的信息
  • 上述做法是為了避免Eureka Server在#syncUp方法中沒有獲取到任何服務實例信息時(Eureka Server集群部署的情況下)
  • Eureka Server註冊表中的信息影響到Eureka Client緩存的註冊表中的信息
  • 因為是全量同步,如果server什麼也沒同步過來,會導致client清空註冊表,服務調用出問題

Server之間註冊表信息的同步複製

為了保證Eureka Server集群運行時註冊表信息的一致性,每個Eureka Server在對本地註冊表進行管理操作時,會將相應的操作同步到所有peer節點中

在外部調用server的restful方法時,在com.netflix.eureka.resources包下的ApplicationResource資源中,查看每個服務的操作

在PeerAwareInstanceRegistryImpl類中,看其他操作,cancel,renew等中都有replicateToPeers,

此方法中有個peerEurekaNodes,代表一個可同步數據的eureka Server的集合,如果註冊表有變化,向此中的peer節點同步

replicateToPeers方法

private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();

    try {
        if (isReplication) {
            this.numberOfReplicationsLastMin.increment();
        }

        if (this.peerEurekaNodes != Collections.EMPTY_LIST && !isReplication) {
            Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

            while(var8.hasNext()) {
                PeerEurekaNode node = (PeerEurekaNode)var8.next();
                if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            }

            return;
        }
    } finally {
        tracer.stop();
    }

}

它將遍歷Eureka Server中peer節點,向每個peer節點發送同步請求

此replicateInstanceActionsToPeers方法

private void replicateInstanceActionsToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        InstanceInfo infoFromRegistry;
        switch(action) {
        case Cancel:
            node.cancel(appName, id);
            break;
        case Heartbeat:
            InstanceStatus overriddenStatus = (InstanceStatus)this.overriddenInstanceStatusMap.get(id);
            infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
            node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
            break;
        case Register:
            node.register(info);
            break;
        case StatusUpdate:
            infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
            node.statusUpdate(appName, id, newStatus, infoFromRegistry);
            break;
        case DeleteStatusOverride:
            infoFromRegistry = this.getInstanceByAppAndId(appName, id, false);
            node.deleteStatusOverride(appName, id, infoFromRegistry);
        }
    } catch (Throwable var12) {
        logger.error("Cannot replicate information to {} for action {}", new Object[]{node.getServiceUrl(), action.name(), var12});
    } finally {
        CurrentRequestVersion.remove();
    }

}

類PeerEurekaNode的實例node的各種方法,cancel,register

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package com.netflix.eureka.cluster;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.discovery.shared.transport.EurekaHttpResponse;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.registry.PeerAwareInstanceRegistry;
import com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.Action;
import com.netflix.eureka.resources.ASGResource.ASGStatus;
import com.netflix.eureka.util.batcher.TaskDispatcher;
import com.netflix.eureka.util.batcher.TaskDispatchers;
import java.net.MalformedURLException;
import java.net.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerEurekaNode {
    private static final long RETRY_SLEEP_TIME_MS = 100L;
    private static final long SERVER_UNAVAILABLE_SLEEP_TIME_MS = 1000L;
    private static final long MAX_BATCHING_DELAY_MS = 500L;
    private static final int BATCH_SIZE = 250;
    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNode.class);
    public static final String BATCH_URL_PATH = "peerreplication/batch/";
    public static final String HEADER_REPLICATION = "x-netflix-discovery-replication";
    private final String serviceUrl;
    private final EurekaServerConfig config;
    private final long maxProcessingDelayMs;
    private final PeerAwareInstanceRegistry registry;
    private final String targetHost;
    private final HttpReplicationClient replicationClient;
    private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
    private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;

    public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
        this(registry, targetHost, serviceUrl, replicationClient, config, 250, 500L, 100L, 1000L);
    }

    PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config, int batchSize, long maxBatchingDelayMs, long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
        this.registry = registry;
        this.targetHost = targetHost;
        this.replicationClient = replicationClient;
        this.serviceUrl = serviceUrl;
        this.config = config;
        this.maxProcessingDelayMs = (long)config.getMaxTimeForReplication();
        String batcherName = this.getBatcherName();
        ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
        this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(batcherName, config.getMaxElementsInPeerReplicationPool(), batchSize, config.getMaxThreadsForPeerReplication(), maxBatchingDelayMs, serverUnavailableSleepTimeMs, retrySleepTimeMs, taskProcessor);
        this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(targetHost, config.getMaxElementsInStatusReplicationPool(), config.getMaxThreadsForStatusReplication(), maxBatchingDelayMs, serverUnavailableSleepTimeMs, retrySleepTimeMs, taskProcessor);
    }

    public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + (long)getLeaseRenewalOf(info);
        this.batchingDispatcher.process(taskId("register", info), new InstanceReplicationTask(this.targetHost, Action.Register, info, (InstanceStatus)null, true) {
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.register(info);
            }
        }, expiryTime);
    }

    public void cancel(final String appName, final String id) throws Exception {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.batchingDispatcher.process(taskId("cancel", appName, id), new InstanceReplicationTask(this.targetHost, Action.Cancel, appName, id) {
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.cancel(appName, id);
            }

            public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                super.handleFailure(statusCode, responseEntity);
                if (statusCode == 404) {
                    PeerEurekaNode.logger.warn("{}: missing entry.", this.getTaskName());
                }

            }
        }, expiryTime);
    }

    public void heartbeat(final String appName, final String id, final InstanceInfo info, final InstanceStatus overriddenStatus, boolean primeConnection) throws Throwable {
        if (primeConnection) {
            this.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
        } else {
            ReplicationTask replicationTask = new InstanceReplicationTask(this.targetHost, Action.Heartbeat, info, overriddenStatus, false) {
                public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
                    return PeerEurekaNode.this.replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
                }

                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        PeerEurekaNode.logger.warn("{}: missing entry.", this.getTaskName());
                        if (info != null) {
                            PeerEurekaNode.logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}", new Object[]{this.getTaskName(), info.getId(), info.getStatus()});
                            PeerEurekaNode.this.register(info);
                        }
                    } else if (PeerEurekaNode.this.config.shouldSyncWhenTimestampDiffers()) {
                        InstanceInfo peerInstanceInfo = (InstanceInfo)responseEntity;
                        if (peerInstanceInfo != null) {
                            PeerEurekaNode.this.syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
                        }
                    }

                }
            };
            long expiryTime = System.currentTimeMillis() + (long)getLeaseRenewalOf(info);
            this.batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
        }
    }

    public void statusUpdate(final String asgName, final ASGStatus newStatus) {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.nonBatchingDispatcher.process(asgName, new AsgReplicationTask(this.targetHost, Action.StatusUpdate, asgName, newStatus) {
            public EurekaHttpResponse<?> execute() {
                return PeerEurekaNode.this.replicationClient.statusUpdate(asgName, newStatus);
            }
        }, expiryTime);
    }

    public void statusUpdate(final String appName, final String id, final InstanceStatus newStatus, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.batchingDispatcher.process(taskId("statusUpdate", appName, id), new InstanceReplicationTask(this.targetHost, Action.StatusUpdate, info, (InstanceStatus)null, false) {
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.statusUpdate(appName, id, newStatus, info);
            }
        }, expiryTime);
    }

    public void deleteStatusOverride(final String appName, final String id, final InstanceInfo info) {
        long expiryTime = System.currentTimeMillis() + this.maxProcessingDelayMs;
        this.batchingDispatcher.process(taskId("deleteStatusOverride", appName, id), new InstanceReplicationTask(this.targetHost, Action.DeleteStatusOverride, info, (InstanceStatus)null, false) {
            public EurekaHttpResponse<Void> execute() {
                return PeerEurekaNode.this.replicationClient.deleteStatusOverride(appName, id, info);
            }
        }, expiryTime);
    }

    public String getServiceUrl() {
        return this.serviceUrl;
    }

    public int hashCode() {
        int prime = true;
        int result = 1;
        int result = 31 * result + (this.serviceUrl == null ? 0 : this.serviceUrl.hashCode());
        return result;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        } else if (obj == null) {
            return false;
        } else if (this.getClass() != obj.getClass()) {
            return false;
        } else {
            PeerEurekaNode other = (PeerEurekaNode)obj;
            if (this.serviceUrl == null) {
                if (other.serviceUrl != null) {
                    return false;
                }
            } else if (!this.serviceUrl.equals(other.serviceUrl)) {
                return false;
            }

            return true;
        }
    }

    public void shutDown() {
        this.batchingDispatcher.shutdown();
        this.nonBatchingDispatcher.shutdown();
        this.replicationClient.shutdown();
    }

    private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
        try {
            if (infoFromPeer != null) {
                logger.warn("Peer wants us to take the instance information from it, since the timestamp differs,Id : {} My Timestamp : {}, Peer's timestamp: {}", new Object[]{id, info.getLastDirtyTimestamp(), infoFromPeer.getLastDirtyTimestamp()});
                if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
                    logger.warn("Overridden Status info -id {}, mine {}, peer's {}", new Object[]{id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus()});
                    this.registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
                }

                this.registry.register(infoFromPeer, true);
            }
        } catch (Throwable var6) {
            logger.warn("Exception when trying to set information from peer :", var6);
        }

    }

    public String getBatcherName() {
        String batcherName;
        try {
            batcherName = (new URL(this.serviceUrl)).getHost();
        } catch (MalformedURLException var3) {
            batcherName = this.serviceUrl;
        }

        return "target_" + batcherName;
    }

    private static String taskId(String requestType, String appName, String id) {
        return requestType + '#' + appName + '/' + id;
    }

    private static String taskId(String requestType, InstanceInfo info) {
        return taskId(requestType, info.getAppName(), info.getId());
    }

    private static int getLeaseRenewalOf(InstanceInfo info) {
        return (info.getLeaseInfo() == null ? 90 : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
    }
}
  • 用了batchingDispatcher.process
  • 作用是將同一時間段內,相同服務實例的相同操作將使用相同的任務編號
  • 在進行同步複製的時候,將根據任務編號合併操作,減少同步操作的數量和網絡消耗
  • 但是同時也造成了同步複製的延時性,不滿足CAP中的C(強一致性)
  • 所以Eureka,只滿足AP

通過Eureka Server在啟動過程中初始化本地註冊表信息和Eureka Server集群間的同步複製操作,最終達到了集群中Eureka Server註冊表信息一致的目的

獲取註冊表中服務實例信息

Eureka Server中獲取註冊表的服務實例信息主要通過兩個方法實現

  • AbstractInstanceRegistry#getApplicationsFromMultipleRegions從多地區獲取全量註冊表數據
  • AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions從多地區獲取增量式註冊表數據。

全量

上面講到從節點複製註冊信息的時候,用方法public int syncUp()

Applications apps = eurekaClient.getApplications(); 實現類

public Applications getApplications() {
    boolean disableTransparentFallback = this.serverConfig.disableTransparentFallbackToOtherRegion();
    return disableTransparentFallback ? this.getApplicationsFromLocalRegionOnly() : this.getApplicationsFromAllRemoteRegions();
}

有一行getApplicationsFromAllRemoteRegions()

getApplicationsFromMultipleRegions

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
    logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}", includeRemoteRegion, remoteRegions);
    if (includeRemoteRegion) {
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
    } else {
        EurekaMonitors.GET_ALL_CACHE_MISS.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(1L);
    Iterator var4 = this.registry.entrySet().iterator();

    while(var4.hasNext()) {
        Entry<String, Map<String, Lease<InstanceInfo>>> entry = (Entry)var4.next();
        Application app = null;
        Lease lease;
        if (entry.getValue() != null) {
            for(Iterator var7 = ((Map)entry.getValue()).entrySet().iterator(); var7.hasNext(); app.addInstance(this.decorateInstanceInfo(lease))) {
                Entry<String, Lease<InstanceInfo>> stringLeaseEntry = (Entry)var7.next();
                lease = (Lease)stringLeaseEntry.getValue();
                if (app == null) {
                    app = new Application(((InstanceInfo)lease.getHolder()).getAppName());
                }
            }
        }

        if (app != null) {
            apps.addApplication(app);
        }
    }

    if (includeRemoteRegion) {
        String[] var15 = remoteRegions;
        int var16 = remoteRegions.length;

        label69:
        for(int var17 = 0; var17 < var16; ++var17) {
            String remoteRegion = var15[var17];
            RemoteRegionRegistry remoteRegistry = (RemoteRegionRegistry)this.regionNameVSRemoteRegistry.get(remoteRegion);
            if (null == remoteRegistry) {
                logger.warn("No remote registry available for the remote region {}", remoteRegion);
            } else {
                Applications remoteApps = remoteRegistry.getApplications();
                Iterator var10 = remoteApps.getRegisteredApplications().iterator();

                while(true) {
                    while(true) {
                        if (!var10.hasNext()) {
                            continue label69;
                        }

                        Application application = (Application)var10.next();
                        if (this.shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {}  fetched from the remote region {}", application.getName(), remoteRegion);
                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }

                            Iterator var13 = application.getInstances().iterator();

                            while(var13.hasNext()) {
                                InstanceInfo instanceInfo = (InstanceInfo)var13.next();
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a whitelist and this app is not in the whitelist.", application.getName(), remoteRegion);
                        }
                    }
                }
            }
        }
    }

    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}
  • 作用從多個地區中獲取全量註冊表信息,並封裝成Applications返回
  • 它首先會將本地註冊表registry中的所有服務實例信息提取出來封裝到Applications中
  • 再根據是否需要拉取Region的註冊信息,將遠程拉取過來的Application放到上面的Applications中
  • 最後得到一個全量的Applications

增量

在前面提到接受服務註冊,接受心跳等方法中,都有recentlyChangedQueue.add(new RecentlyChangedItem(lease));

作用是將新變動的服務放到最近變化的服務實例信息隊列中,用於記錄增量是註冊表信息

getApplicationDeltasFromMultipleRegions,實現了從遠處eureka server中獲取增量式註冊表信息的能力

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = this.allKnownRemoteRegions;
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;
    if (includeRemoteRegion) {
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        EurekaMonitors.GET_ALL_CACHE_MISS_DELTA.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(this.responseCache.getVersionDeltaWithRegions().get());
    HashMap applicationInstancesMap = new HashMap();

    Applications var23;
    try {
        this.write.lock();
        Iterator<AbstractInstanceRegistry.RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());

        Lease lease;
        Application app;
        for(; iter.hasNext(); app.addInstance(new InstanceInfo(this.decorateInstanceInfo(lease)))) {
            lease = ((AbstractInstanceRegistry.RecentlyChangedItem)iter.next()).getLeaseInfo();
            InstanceInfo instanceInfo = (InstanceInfo)lease.getHolder();
            logger.debug("The instance id {} is found with status {} and actiontype {}", new Object[]{instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name()});
            app = (Application)applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
        }

        if (includeRemoteRegion) {
            String[] var20 = remoteRegions;
            int var22 = remoteRegions.length;

            label155:
            for(int var24 = 0; var24 < var22; ++var24) {
                String remoteRegion = var20[var24];
                RemoteRegionRegistry remoteRegistry = (RemoteRegionRegistry)this.regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                    if (null != remoteAppsDelta) {
                        Iterator var12 = remoteAppsDelta.getRegisteredApplications().iterator();

                        while(true) {
                            Application application;
                            do {
                                if (!var12.hasNext()) {
                                    continue label155;
                                }

                                application = (Application)var12.next();
                            } while(!this.shouldFetchFromRemoteRegistry(application.getName(), remoteRegion));

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }

                            Iterator var15 = application.getInstances().iterator();

                            while(var15.hasNext()) {
                                InstanceInfo instanceInfo = (InstanceInfo)var15.next();
                                appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
                            }
                        }
                    }
                }
            }
        }

        Applications allApps = this.getApplicationsFromMultipleRegions(remoteRegions);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        var23 = apps;
    } finally {
        this.write.unlock();
    }

    return var23;
}

在EurekaServer對外restful中,在com.netflix.eureka.resources下

@Path("{appId}")
public ApplicationResource getApplicationResource(@PathParam("version") String version, @PathParam("appId") String appId) {
    CurrentRequestVersion.set(Version.toEnum(version));

    ApplicationResource var3;
    try {
        var3 = new ApplicationResource(appId, this.serverConfig, this.registry);
    } finally {
        CurrentRequestVersion.remove();
    }

    return var3;
}
@GET
public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("X-Eureka-Accept") String eurekaAccept) {
    if (!this.registry.shouldAllowAccess(false)) {
        return Response.status(Status.FORBIDDEN).build();
    } else {
        EurekaMonitors.GET_APPLICATION.increment();
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = KeyType.XML;
        }

        Key cacheKey = new Key(EntityType.Application, this.appName, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept));
        String payLoad = this.responseCache.get(cacheKey);
        CurrentRequestVersion.remove();
        if (payLoad != null) {
            logger.debug("Found: {}", this.appName);
            return Response.ok(payLoad).build();
        } else {
            logger.debug("Not Found: {}", this.appName);
            return Response.status(Status.NOT_FOUND).build();
        }
    }
}

其中有一句:

String payLoad = responseCache.get(cacheKey);

查看responseCache其實現類的構造函數

點進去有一句:registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));從遠程獲取delta增量註冊信息

private ResponseCacheImpl.Value generatePayload(Key key) {
    Stopwatch tracer = null;

    ResponseCacheImpl.Value var8;
    try {
        String payload;
        switch(key.getEntityType()) {
        case Application:
            boolean isRemoteRegionRequested = key.hasRegions();
            if ("ALL_APPS".equals(key.getName())) {
                if (isRemoteRegionRequested) {
                    tracer = this.serializeAllAppsWithRemoteRegionTimer.start();
                    payload = this.getPayLoad(key, this.registry.getApplicationsFromMultipleRegions(key.getRegions()));
                } else {
                    tracer = this.serializeAllAppsTimer.start();
                    payload = this.getPayLoad(key, this.registry.getApplications());
                }
            } else if ("ALL_APPS_DELTA".equals(key.getName())) {
                if (isRemoteRegionRequested) {
                    tracer = this.serializeDeltaAppsWithRemoteRegionTimer.start();
                    this.versionDeltaWithRegions.incrementAndGet();
                    versionDeltaWithRegionsLegacy.incrementAndGet();
                    payload = this.getPayLoad(key, this.registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                } else {
                    tracer = this.serializeDeltaAppsTimer.start();
                    this.versionDelta.incrementAndGet();
                    versionDeltaLegacy.incrementAndGet();
                    payload = this.getPayLoad(key, this.registry.getApplicationDeltas());
                }
            } else {
                tracer = this.serializeOneApptimer.start();
                payload = this.getPayLoad(key, this.registry.getApplication(key.getName()));
            }
            break;
        case VIP:
        case SVIP:
            tracer = this.serializeViptimer.start();
            payload = this.getPayLoad(key, getApplicationsForVip(key, this.registry));
            break;
        default:
            logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
            payload = "";
        }

        var8 = new ResponseCacheImpl.Value(payload);
    } finally {
        if (tracer != null) {
            tracer.stop();
        }

    }

    return var8;
}

但是這個只是向client提供,不向server提供,因為server可以通過每次變更自動同步到peer

獲取增量式註冊表信息將會從recentlyChangedQueue中獲取最近變化的服務實例信息

recentlyChangedQueue中統計了近3分鐘內進行註冊、修改和剔除的服務實例信息

在服務註冊AbstractInstanceRegistry#registry

接受心跳請求AbstractInstanceRegistry#renew

服務下線AbstractInstanceRegistry#internalCancel等方法中均可見到recentlyChangedQueue對這些服務實例進行登記,用於記錄增量式註冊表信息

#getApplicationsFromMultipleRegions方法同樣提供了從遠程Region的Eureka Server獲取增量式註冊表信息的能力