Spring Cloud微服務技術棧(四):服務治理Spring Cloud Eureka部分源碼分析
- 2020 年 4 月 3 日
- 筆記
上一篇文章《Spring Cloud微服務技術棧(三):服務治理Spring Cloud Eureka核心元素分析》主要對
Spring Cloud Eureka
的三個核心元素(服務註冊中心、服務提供者、服務消費者)進行了分析,熟悉了三者之間的通信關係,本篇文章將主要分析Spring Cloud Eureka
的部分源碼。
當我們搭建好Eureka Server
服務註冊中心並啟動後,就可以繼續啟動服務提供者和服務消費者了。大家都知道,當服務提供者成功啟動後,就會向服務註冊中心註冊自己的服務,服務消費者成功啟動後,就會向服務註冊中心獲取服務實例列表,根據實例列表來調用具體服務。那麼,這整個過程是如何運轉的呢?我們一起來根據源碼的思路來探索。
Eureka Server服務註冊中心源碼分析
回憶之前我們一起搭建的服務註冊中心的項目,我們在服務註冊中心的項目中的application.properties
文件中配置好服務註冊中心需要的相關配置,然後在Spring Boot
的啟動類中加了一個註解@EnableEurekaServer
,然後啟動項目就成功啟動了服務註冊中心,那麼到底是如何啟動的呢? 在配置文件中(單節點),我們是如下配置的:
# 配置端口 server.port=1111 # 配置服務註冊中心地址 eureka.instance.hostname=localhost # 作為服務註冊中心,禁止本應用向自己註冊服務 eureka.client.register-with-eureka=false # 作為服務註冊中心,禁止本應用向自己檢索服務 eureka.client.fetch-registry=false # 設置服務註冊中心服務註冊地址 eureka.client.service-url.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/ # 關閉自我保護機制,及時剔除無效服務 eureka.server.enable-self-preservation=false
這個配置在工程啟動的時候,會被Spring
容器讀取,配置到EurekaClientConfigBean
中,而這個配置類會被註冊成Spring
的Bean
以供其他的Bean
來使用。 我們再進入註解@EnableEurekaServer
一探究竟,@EnableEurekaServer
的源碼如下:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer { }
從上述註解可以看出,該註解導入了配置類EurekaServerMarkerConfiguration
,我們在進一步進入到EurekaServerMarkerConfiguration
中,代碼如下所示:
@Configuration public class EurekaServerMarkerConfiguration { @Bean public Marker eurekaServerMarkerBean() { return new Marker(); } class Marker { } }
從這個配置類中暫時無法看到什麼具體的內容,我們可以進一步查看類Marker
在哪些地方被使用了,通過搜索Marker
,可以發現在類EurekaServerAutoConfiguration
上的註解中被引用了,具體代碼如下所示:
@Configuration @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { /** * List of packages containing Jersey resources required by the Eureka server */ private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery", "com.netflix.eureka" }; @Autowired private ApplicationInfoManager applicationInfoManager; @Autowired private EurekaServerConfig eurekaServerConfig; @Autowired private EurekaClientConfig eurekaClientConfig; @Autowired private EurekaClient eurekaClient; @Autowired private InstanceRegistryProperties instanceRegistryProperties; public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson(); @Bean public HasFeatures eurekaServerFeature() { return HasFeatures.namedFeature("Eureka Server", EurekaServerAutoConfiguration.class); } @Configuration protected static class EurekaServerConfigBeanConfiguration { @Bean @ConditionalOnMissingBean public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) { EurekaServerConfigBean server = new EurekaServerConfigBean(); if (clientConfig.shouldRegisterWithEureka()) { // Set a sensible default if we are supposed to replicate server.setRegistrySyncRetries(5); } return server; } } @Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); } static { CodecWrappers.registerWrapper(JACKSON_JSON); EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec()); } @Bean public ServerCodecs serverCodecs() { return new CloudServerCodecs(this.eurekaServerConfig); } private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) { CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName()); return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec; } private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) { CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName()); return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class) : codec; } class CloudServerCodecs extends DefaultServerCodecs { public CloudServerCodecs(EurekaServerConfig serverConfig) { super(getFullJson(serverConfig), CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class), getFullXml(serverConfig), CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class)); } } @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry( ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); } @Bean @ConditionalOnMissingBean public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs) { return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager); } /** * {@link PeerEurekaNodes} which updates peers when /refresh is invoked. * Peers are updated only if * <code>eureka.client.use-dns-for-fetching-service-urls</code> is * <code>false</code> and one of following properties have changed. * </p> * <ul> * <li><code>eureka.client.availability-zones</code></li> * <li><code>eureka.client.region</code></li> * <li><code>eureka.client.service-url.<zone></code></li> * </ul> */ static class RefreshablePeerEurekaNodes extends PeerEurekaNodes implements ApplicationListener<EnvironmentChangeEvent> { public RefreshablePeerEurekaNodes( final PeerAwareInstanceRegistry registry, final EurekaServerConfig serverConfig, final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs, final ApplicationInfoManager applicationInfoManager) { super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager); } @Override public void onApplicationEvent(final EnvironmentChangeEvent event) { if (shouldUpdate(event.getKeys())) { updatePeerEurekaNodes(resolvePeerUrls()); } } /* * Check whether specific properties have changed. */ protected boolean shouldUpdate(final Set<String> changedKeys) { assert changedKeys != null; // if eureka.client.use-dns-for-fetching-service-urls is true, then // service-url will not be fetched from environment. if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { return false; } if (changedKeys.contains("eureka.client.region")) { return true; } for (final String key : changedKeys) { // property keys are not expected to be null. if (key.startsWith("eureka.client.service-url.") || key.startsWith("eureka.client.availability-zones.")) { return true; } } return false; } } @Bean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); } @Bean public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) { return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext); } /** * Register the Jersey filter */ @Bean public FilterRegistrationBean jerseyFilterRegistration( javax.ws.rs.core.Application eurekaJerseyApp) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(new ServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); return bean; } /** * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources * required by the Eureka server. */ @Bean public javax.ws.rs.core.Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( false, environment); // Filter to include only classes that have a particular annotation. // provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class)); // Find classes in Eureka packages (or subpackages) // Set<Class<?>> classes = new HashSet<>(); for (String basePackage : EUREKA_PACKAGES) { Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); for (BeanDefinition bd : beans) { Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader()); classes.add(cls); } } // Construct the Jersey ResourceConfig // Map<String, Object> propsAndFeatures = new HashMap<>(); propsAndFeatures.put( // Skip static content used by the webapp ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*"); DefaultResourceConfig rc = new DefaultResourceConfig(classes); rc.setPropertiesAndFeatures(propsAndFeatures); return rc; } @Bean public FilterRegistrationBean traceFilterRegistration( @Qualifier("httpTraceFilter") Filter filter) { FilterRegistrationBean bean = new FilterRegistrationBean(); bean.setFilter(filter); bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10); return bean; } }
在這個配置類上面,加入了@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
,也就是說,類EurekaServerAutoConfiguration
被註冊為Spring Bean
的前提是在Spring
容器中存在EurekaServerMarkerConfiguration.Marker.class
的對象,而這個對象存在的前提是我們在Spring Boot
啟動類中加入了@EnableEurekaServer
註解。小總結一下就是,在Spring Boot
啟動類上加入了@EnableEurekaServer
註解以後,就會觸發EurekaServerMarkerConfiguration.Marker.class
被Spring
實例化為Spring Bean
,有了這個Bean
以後,Spring
就會再實例化EurekaServerAutoConfiguration
類,而這個類就是配置了Eureka Server
的相關內容,列舉如下:
注入EurekaServerConfig—->用於註冊中心相關配置信息 注入EurekaController—->提供註冊中心上相關服務信息的展示支持 注入PeerAwareInstanceRegistry—->提供實例註冊支持,例如實例獲取、狀態更新等相關支持 注入PeerEurekaNodes—->提供註冊中心對等服務間通信支持 注入EurekaServerContext—->提供初始化註冊init服務、初始化PeerEurekaNode節點信息 注入EurekaServerBootstrap—->用於初始化initEurekaEnvironment/initEurekaServerContext
而且,在類EurekaServerAutoConfiguration
上,我們看見@Import(EurekaServerInitializerConfiguration.class)
,說明實例化類EurekaServerAutoConfiguration
之前,已經實例化了EurekaServerInitializerConfiguration
類,從類名可以看出,該類是Eureka Server
的初始化配置類,我們進入到EurekaServerInitializerConfiguration
類中一探究竟,發現該類實現了Spring
的生命周期接口SmartLifecycle
,也就是說類EurekaServerInitializerConfiguration
在被Spring
實例化過程中的時候會執行一些生命周期方法,比如Lifecycle
的start
方法,那麼看看EurekaServerInitializerConfiguration
是如何重寫start
方法的:
@Configuration public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered { // 此處省略部分代碼 @Override public void start() { new Thread(new Runnable() { @Override public void run() { try { //TODO: is this class even needed now? eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); } // 此處省略部分代碼 }
這個start
方法中開啟了一個新的線程,然後進行一些Eureka Server
的初始化工作,比如調用eurekaServerBootstrap的contextInitialized
方法,進入該方法看看:
public class EurekaServerBootstrap { public void contextInitialized(ServletContext context) { try { // 初始化Eureka Server環境變量 initEurekaEnvironment(); // 初始化Eureka Server上下文 initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } } }
這個方法中主要進行了Eureka
的環境初始化和服務初始化,我們進入到initEurekaServerContext
方法中來看服務初始化是如何實現的:
public class EurekaServerBootstrap { protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } // 初始化Eureka Server上下文環境 EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = this.registry.syncUp(); // 期望每30秒接收到一次心跳,1分鐘就是2次 // 修改Instance Status狀態為up // 同時,這裏面會開啟一個定時任務,用於清理 60秒沒有心跳的客戶端。自動下線 this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); } }
在初始化Eureka Server
上下文環境後,就會繼續執行openForTraffic
方法,這個方法主要是設置了期望每分鐘接收到的心跳次數,並將服務實例的狀態設置為UP
,最後又通過方法postInit
來開啟一個定時任務,用於每隔一段時間(默認60
秒)將沒有續約的服務實例(默認90
秒沒有續約)清理掉。openForTraffic
的方法代碼如下:
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. // 計算每分鐘最大續約數 this.expectedNumberOfRenewsPerMin = count * 2; // 計算每分鐘最小續約數 this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); // 修改服務實例的狀態為UP applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 開啟定時任務,每隔一段時間(默認60秒)將沒有續約的服務實例(默認90秒沒有續約)清理掉 super.postInit(); }
postInit
方法開啟了一個新的定時任務,代碼如下:
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
這裡的時間間隔都來自於EurekaServerConfigBean
類,可以在配置文件中以eureka.server
開頭的配置來進行設置。 當然,服務註冊中心啟動的源碼不僅僅只有這麼多,其還有向其他集群中的服務註冊中心複製服務實例列表的相關源碼沒有在這裡進行分析,感興趣的朋友可以自行斷點分析。
Eureka Client服務註冊行為分析
我們啟動一個本地的服務註冊中心,然後再啟動一個單節點的服務提供者,我們都知道,在服務註冊中心已經啟動情況下,然後再啟動服務提供者,服務提供者會將服務註冊到服務註冊中心,那麼這個註冊行為是如何運作的呢?我們都知道,服務註冊行為是在服務提供者啟動過程中完成的,那麼我們完全可以從啟動日誌中揣摩出註冊行為,請看下面服務提供者的啟動日誌:
2018-12-01 15:37:17.832 INFO 31948 --- [ restartedMain] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING 2018-12-01 15:37:17.868 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1 2018-12-01 15:37:18.031 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson 2018-12-01 15:37:18.031 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson 2018-12-01 15:37:18.168 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml 2018-12-01 15:37:18.168 INFO 31948 --- [ restartedMain] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml 2018-12-01 15:37:18.370 INFO 31948 --- [ restartedMain] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Disable delta property : false 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Application is null : false 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Application version is -1: true 2018-12-01 15:37:18.387 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server 2018-12-01 15:37:18.539 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : The response status is 200 2018-12-01 15:37:18.541 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Starting heartbeat executor: renew interval is: 30 2018-12-01 15:37:18.543 INFO 31948 --- [ restartedMain] c.n.discovery.InstanceInfoReplicator : InstanceInfoReplicator onDemand update allowed rate per min is 4 2018-12-01 15:37:18.548 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1543649838546 with initial instances count: 1 2018-12-01 15:37:18.554 INFO 31948 --- [ restartedMain] o.s.c.n.e.s.EurekaServiceRegistry : Registering application PRODUCER-SERVICE with eureka with status UP 2018-12-01 15:37:18.556 INFO 31948 --- [ restartedMain] com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1543649838556, current=UP, previous=STARTING] 2018-12-01 15:37:18.557 INFO 31948 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227: registering service... 2018-12-01 15:37:18.627 INFO 31948 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227 - registration status: 204 2018-12-01 15:37:18.645 INFO 31948 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 13226 (http) with context path '' 2018-12-01 15:37:18.647 INFO 31948 --- [ restartedMain] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 13226 2018-12-01 15:37:18.654 INFO 31948 --- [ restartedMain] ringCloudEurekaProducerClientApplication : Started SpringCloudEurekaProducerClientApplication in 5.537 seconds (JVM running for 7.0) 2018-12-01 15:42:18.402 INFO 31948 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
從日誌中我們可以讀取到許多信息:
- 第一行日誌告訴我們,服務提供者實例的狀態被標註為「正在啟動」。
- 第二行日誌告訴我們,在默認的名為
「us-east-1」
的Region
中初始化Eureka
客戶端,Region
的名稱是可以配置的,可以通過eureka.client.region
來配置,如果沒有配置它,那麼默認的Region
就是us-east-1
。這裡順便多說一句,一個微服務應用只可以註冊到一個Region
中,也就是說一個微服務應用對應一個Region
,一個Region
對應多個Zone
,是否還記得,我們在配置集群的Eureka Server
服務註冊中心的時候,都設置了eureka.client.service-url.defaultZone
這個值,就是為了告訴服務提供者者或者集群內的其他Eureka Server
,可以向這個Zone
註冊,並且defaultZone
的值是使用逗號隔開的,也就是說我們的服務可以同時向多個Zone
註冊。由此可見,一個服務可以同時註冊到一個Region
中的多個Zone
的。如果需要自己指定Zone
,可以通過eureka.client.availability-zones
來指定。關於Region
和Zone
請看下面的源碼:
public static String getRegion(EurekaClientConfig clientConfig) { String region = clientConfig.getRegion(); if (region == null) { region = DEFAULT_REGION; } region = region.trim().toLowerCase(); return region; }
public String[] getAvailabilityZones(String region) { String value = this.availabilityZones.get(region); if (value == null) { value = DEFAULT_ZONE; } return value.split(","); }
- 日誌中
getting all instance registry info from the eureka server
表示服務在註冊的過程中也會向服務註冊中心獲取其他服務實例的信息列表。 - 日誌中
Starting heartbeat executor: renew interval is: 30
表示以默認的30
秒作為間隔向服務註冊中心發起心跳請求,告訴服務註冊中心「我還活着
」。 - 日誌中
Discovery Client initialized at timestamp 1543649838546 with initial instances count: 1
表示在時間戳1543649838546
的時候,服務成功初始化完成。 - 日誌中
DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227: registering service...
表示開始將服務註冊到服務註冊中心。 - 日誌中
DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227 - registration status: 204
表示服務註冊完成,完成的狀態標誌為204
。
接下來,我們進入到源碼中,藉助源代碼來分析一下服務註冊到服務註冊中心的流程。在分析之前,我們有必要搞清楚Spring Cloud
是如何集成Eureka
的,我們都知道,在Eureka
客戶端,無論是服務提供者還是服務消費者,都需要加上@EnableDiscoveryClient
註解,用來開啟DiscoveryClient
實例,我們通過搜索DiscoveryClient
,可以發現,搜索的結果有一個接口還有一個類,接口在包org.springframework.cloud.client.discovery
下,類在com.netflix.discovery
包下,接口DiscoveryClient
是Spring Cloud
的接口,它定義了用來發現服務的常用方法,通過該接口可以有效地屏蔽服務治理中的實現細節,這就方便切換不同的服務服務治理框架,而無需改動從Spring Cloud
層面調用的代碼,該接口有一個實現類EurekaDiscoveryClient
,從命名可以可以看出他是對Eureka
服務發現的封裝,進入到EurekaDiscoveryClient
可以看到,它有一個成員變量為EurekaClient
,這是包com.netflix.discovery
下的一個接口,該接口繼承了LookupService
接口,且有一個實現類DiscoveryClient
,接口EurekaClient
和LookupService
都在com.netflix.discovery
包下,他們都定義了針對Eureka
的服務發現的抽象方法,而EurekaClient
的實現類DiscoveryClient
則實現了這些抽象方法,所以說,類DiscoveryClient
是真正實現發現服務的類。結合以上的文字,下面展示接口與類的關係圖如下所示:
我們進入到DiscoveryClient類中查看源碼,首先看到的是它的類注釋如下所示:
/** * The class that is instrumental for interactions with <tt>Eureka Server</tt>. * * <p> * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from * <tt>Eureka Server</tt> during shutdown * <p> * d) <em>Querying</em> the list of services/instances registered with * <tt>Eureka Server</tt> * <p> * * <p> * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt> * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips * which do not change. All of the functions defined above fail-over to other * {@link java.net.URL}s specified in the list in the case of failure. * </p> * * @author Karthik Ranganathan, Greg Kim * @author Spencer Gibb * */
大致翻譯如下:
這個類用於幫助與Eureka Server相互協作。 Eureka Client負責下面的任務: - 向Eureka Server註冊服務實例 - 向Eureka Server服務續約 - 當服務關閉期間,向Eureka Server取消租約 - 查詢Eureka Server中的服務實例列表 Eureka Client還需要配置一個Eureka Server的URL列表
在分析類DiscoveryClient
完成的具體任務之前,我們首先來回憶一下,我們在配置服務提供者的時候,在配置文件中都配置了eureka.client.service-url.defaultZone
屬性,而這個屬性的值就是告訴服務提供者,該向哪裡註冊服務,也就是服務註冊的地址,該地址比如是http://peer1:1111/eureka/,http://peer2:1112/eureka/,http://peer3:1113/eureka/
,各個地址之間使用逗號隔開,我們在類EndpointUtils
中可以找到一個方法名為getServiceUrlsMapFromConfig
的方法,代碼如下:
public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { Map<String, List<String>> orderedUrls = new LinkedHashMap<>(); String region = getRegion(clientConfig); String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); if (availZones == null || availZones.length == 0) { availZones = new String[1]; availZones[0] = DEFAULT_ZONE; } logger.debug("The availability zone for the given region {} are {}", region, availZones); int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); String zone = availZones[myZoneOffset]; List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); if (serviceUrls != null) { orderedUrls.put(zone, serviceUrls); } int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); while (currentOffset != myZoneOffset) { zone = availZones[currentOffset]; serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); if (serviceUrls != null) { orderedUrls.put(zone, serviceUrls); } if (currentOffset == (availZones.length - 1)) { currentOffset = 0; } else { currentOffset++; } } if (orderedUrls.size() < 1) { throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); } return orderedUrls; }
該方法就是從我們配置的Zone
中讀取註冊地址,並組成一個List
,最後將這個List
存儲到Map
集合中,在讀取過程中,它首先加載的是getRegion
方法,這個方法讀取了一個Region
返回,進入到getRegion
方法中:
/** * Get the region that this particular instance is in. * * @return - The region in which the particular instance belongs to. */ public static String getRegion(EurekaClientConfig clientConfig) { String region = clientConfig.getRegion(); if (region == null) { region = DEFAULT_REGION; } region = region.trim().toLowerCase(); return region; }
從方法的注釋上可以知道,一個微服務應用只可以屬於一個Region
,方法體中的第一行代碼就是從EurekaClientConfigBean
類中來讀取Region
,而EurekaClientConfigBean
中getRegion
方法返回的值就是需要我們配置的,在配置文件中,對應的屬性是eureka.client.region
,如果我們每月配置,那麼將使用默認的Region
,默認DEFAULT_REGION
為default
。在方法getServiceUrlsMapFromConfig
中,還加載了getAvailabilityZones
方法,方法代碼如下所示:
public String[] getAvailabilityZones(String region) { String value = this.availabilityZones.get(region); if (value == null) { value = DEFAULT_ZONE; } return value.split(","); }
上述方法中第一行代碼是從Region
中獲取Zone
,availabilityZones
是EurekaClientConfigBean
的一個Map
成員變量,如果我們沒有為Region
特別配置eureka.client.availablity-zones
屬性,那麼zone
將採用默認值,默認值是defaultZone
,這就是我們一開始配置eureka.client.service-url.defaultZone
的由來,由此可見,一個Region
對應多個Zone
,也就是說一個微服務應用可以向多個服務註冊地址註冊。在獲取了Region
和Zone
的信息之後,才開始真正地加載Eureka Server
的具體地址,它根據傳入的參數按照一定的算法確定加載位於哪一個Zone
配置的serviceUrls
,代碼如下:
int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); String zone = availZones[myZoneOffset]; List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
當我們在微服務應用中使用Ribbon
來實現服務調用時,對於Zone
的設置可以在負載均衡時實現區域親和特性:Ribbon
的默認策略會優先訪問同客戶端處於一個Zone
中的服務實例,只有當同一個Zone
中沒有可用的服務實例的時候才會去訪問其他Zone
中的實例。利用親和性這一特性,我們就可以有效地設計出針對區域性故障的容錯集群。 從本小節一開始,我們就分析了Spring Cloud Eureka
是對Netflix Eureka
的封裝,com.netflix.discovery
包下的DiscoveryClient
才是真正實現服務的註冊與發現。我們一起來看看它的構造方法:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch // 這個方法里實現了服務向服務註冊中心註冊的行為 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
整個構造方法里,一開始進行了各種參數的設置,而真正地註冊行為是在initScheduledTasks
方法里實現的,我們一起來看看註冊行為是如何實現的:
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
這段代碼中有兩個主要的if
代碼塊,第一個if
代碼塊是決定是否從Eureka Server
來獲取註冊信息,判斷條件clientConfig.shouldFetchRegistry()
是需要我們自己的在配置文件中通過屬性eureka.client.fetch-registry=true
進行配置的,默認為true
,也就是說服務會從Eureka Server
拉取註冊信息,且默認間隔為30
秒,每30
秒執行一次定時任務,用於刷新所獲取的註冊信息。 第二個if
代碼塊是決定是否將服務註冊到服務註冊中心的,也是我們本次要探討的主要內容。判斷條件clientConfig.shouldRegisterWithEureka()
表示是否向Eureka Server
註冊自己,你是否還記得,我們在搭建單節點服務註冊中心的時候,我們搭建的那個Eureka Server
設置了屬性eureka.client.register-with-eureka=false
,意思就是說禁止Eureka Server
把自己當做一個普通服務註冊到自身,而這個屬性默認值也是為true
,也就是說我們在註冊的服務的時候,無需配置這個屬性,就可以將服務註冊到服務註冊中心。分析第二個if
代碼塊,代碼塊中一開始就設置了一個定時任務,這個定時任務就是按照指定的時間間隔向Eureka Server
發送心跳,告訴服務註冊中心「我還活着
」,對於發送心跳的時間間隔,我們一開始就討論過,默認是30
秒,這也就是為什麼按照默認來說,一分鐘理應發送兩次心跳了,這個心跳間隔我們可以在配置文件中進行配置,配置屬性為eureka.instance.lease-renewal-interval-in-seconds=30
,對於默認90
秒內沒有發送心跳的服務,將會被服務在服務註冊中心剔除,剔除時間間隔可以通過屬性eureka.instance.lease-expiration-duration-in-seconds=90
來進行配置。而整個服務的續約邏輯也很簡單,在定時任務中有一個代碼片段new HeartbeatThread()
,然後開啟了一個新的線程實現續約服務,就是通過發送REST
請求來實現的,具體代碼如下:
/** * Renew with the eureka service by making the appropriate REST call */ boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
服務提供者向服務註冊中心發送心跳,並檢查返回碼,如果是404
,那麼服務將重新調用register
方法,實現將服務註冊到服務註冊中心,否則直接檢測返回碼是否是200
,返回布爾類型來告訴定時器是否續約成功。 續約的操作完成之後,就開始了服務實例的複製工作,緊接着通過服務實例管理器ApplicationInfoManager
來創建一個服務實例狀態監聽器,用於監聽服務實例的狀態,並進入到onDemandUpdate
中進行註冊,方法onDemandUpdate
的代碼如下:
public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { if (!scheduler.isShutdown()) { scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to stopped scheduler"); return false; } } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } } public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
服務實例的註冊行為是在方法register
中執行的,進入到register
方法中,代碼如下:
/** * Register with the eureka service by making the appropriate REST call. */ boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
註冊行為其實就是將服務實例的信息通過HTTP
請求傳遞給Eureka Server
服務註冊中心,當註冊中心接收到註冊信息之後將返回204
狀態碼給客戶端,表示註冊成功。這就是客戶端向服務註冊中心註冊的行為源碼分析,那麼服務註冊中心是如何接收這些實例的註冊信息,且如何保存的呢?請接着往下看。
Eureka Server服務註冊中心接收註冊行為分析
客戶端向服務端發起註冊請求之後,服務端是如何處理的呢?通過源碼的分析,可以發現,客戶端和服務端的交互都是通過REST
請求發起的,而在服務端,包com.netflix.eureka.resources
下定義了許多處理REST
請求的類,對於接收客戶端的註冊信息,可以發現在類ApplicationResource
下有一個addInstance
方法,專門用來處理註冊請求的,我們一起來分析這個方法:
/** * Registers information about a particular instance for an * {@link com.netflix.discovery.shared.Application}. * * @param info * {@link InstanceInfo} information of the instance. * @param isReplication * a header parameter containing information whether this is * replicated from other nodes. */ @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
在接收服務實例註冊的時候,首先要經過一系列的數據校驗,通過校驗以後調用PeerAwareInstanceRegistry
的實現類對象的register
方法對服務進行註冊,進入到register
方法繼續分析:
@Override public void register(final InstanceInfo info, final boolean isReplication) { handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); super.register(info, isReplication); }
方法體中第一行代碼中調用了publishEvent方法,將註冊事件傳播出去,然後繼續調用com.netflix.eureka.registry包下的AbstractInstanceRegistry抽象類的register方法進行註冊:
/** * Registers a new instance with a given duration. * * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean) */ public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold // (1 // for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); }
在代碼中我們看到,實例信息InstanceInfo
對象被存儲在以instanceId
為key
的ConcurrentHashMap
中,然後又將這個ConcurrentHashMap
對象存儲到了以服務名為key
的Map
中,這就形成了雙層Map
結構,這也就對應了一開始我們所說的服務的元信息存儲在一個雙層Map
結構中。
小結
這就基本完成了對Spring Cloud Eureka
的簡單源碼分析,這裡僅僅是對Eureka Server
初始化的源碼、服務註冊、服務端接收註冊的源碼進行了簡單分析,感興趣的朋友可以通過DEBUG
方式深入了解源碼的運行機制。