Spring Cloud微服務技術棧(四):服務治理Spring Cloud Eureka部分源碼分析

上一篇文章《Spring Cloud微服務技術棧(三):服務治理Spring Cloud Eureka核心元素分析》主要對Spring Cloud Eureka的三個核心元素(服務註冊中心、服務提供者、服務消費者)進行了分析,熟悉了三者之間的通訊關係,本篇文章將主要分析Spring Cloud Eureka的部分源碼。

當我們搭建好Eureka Server服務註冊中心並啟動後,就可以繼續啟動服務提供者和服務消費者了。大家都知道,當服務提供者成功啟動後,就會向服務註冊中心註冊自己的服務,服務消費者成功啟動後,就會向服務註冊中心獲取服務實例列表,根據實例列表來調用具體服務。那麼,這整個過程是如何運轉的呢?我們一起來根據源碼的思路來探索。

Eureka Server服務註冊中心源碼分析

回憶之前我們一起搭建的服務註冊中心的項目,我們在服務註冊中心的項目中的application.properties文件中配置好服務註冊中心需要的相關配置,然後在Spring Boot的啟動類中加了一個註解@EnableEurekaServer,然後啟動項目就成功啟動了服務註冊中心,那麼到底是如何啟動的呢? 在配置文件中(單節點),我們是如下配置的:

# 配置埠  server.port=1111  # 配置服務註冊中心地址  eureka.instance.hostname=localhost  # 作為服務註冊中心,禁止本應用向自己註冊服務  eureka.client.register-with-eureka=false  # 作為服務註冊中心,禁止本應用向自己檢索服務  eureka.client.fetch-registry=false  # 設置服務註冊中心服務註冊地址  eureka.client.service-url.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/  # 關閉自我保護機制,及時剔除無效服務  eureka.server.enable-self-preservation=false

這個配置在工程啟動的時候,會被Spring容器讀取,配置到EurekaClientConfigBean中,而這個配置類會被註冊成SpringBean以供其他的Bean來使用。 我們再進入註解@EnableEurekaServer一探究竟,@EnableEurekaServer的源碼如下:

@Target(ElementType.TYPE)  @Retention(RetentionPolicy.RUNTIME)  @Documented  @Import(EurekaServerMarkerConfiguration.class)  public @interface EnableEurekaServer {    }

從上述註解可以看出,該註解導入了配置類EurekaServerMarkerConfiguration,我們在進一步進入到EurekaServerMarkerConfiguration中,程式碼如下所示:

@Configuration  public class EurekaServerMarkerConfiguration {  	@Bean  	public Marker eurekaServerMarkerBean() {  		return new Marker();  	}  	class Marker {  	}  }

從這個配置類中暫時無法看到什麼具體的內容,我們可以進一步查看類Marker在哪些地方被使用了,通過搜索Marker,可以發現在類EurekaServerAutoConfiguration上的註解中被引用了,具體程式碼如下所示:

@Configuration  @Import(EurekaServerInitializerConfiguration.class)  @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)  @EnableConfigurationProperties({ EurekaDashboardProperties.class,  		InstanceRegistryProperties.class })  @PropertySource("classpath:/eureka/server.properties")  public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {  	/**  	 * List of packages containing Jersey resources required by the Eureka server  	 */  	private static final String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery",  			"com.netflix.eureka" };    	@Autowired  	private ApplicationInfoManager applicationInfoManager;    	@Autowired  	private EurekaServerConfig eurekaServerConfig;    	@Autowired  	private EurekaClientConfig eurekaClientConfig;    	@Autowired  	private EurekaClient eurekaClient;    	@Autowired  	private InstanceRegistryProperties instanceRegistryProperties;    	public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();    	@Bean  	public HasFeatures eurekaServerFeature() {  		return HasFeatures.namedFeature("Eureka Server",  				EurekaServerAutoConfiguration.class);  	}    	@Configuration  	protected static class EurekaServerConfigBeanConfiguration {  		@Bean  		@ConditionalOnMissingBean  		public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {  			EurekaServerConfigBean server = new EurekaServerConfigBean();  			if (clientConfig.shouldRegisterWithEureka()) {  				// Set a sensible default if we are supposed to replicate  				server.setRegistrySyncRetries(5);  			}  			return server;  		}  	}    	@Bean  	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)  	public EurekaController eurekaController() {  		return new EurekaController(this.applicationInfoManager);  	}    	static {  		CodecWrappers.registerWrapper(JACKSON_JSON);  		EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());  	}    	@Bean  	public ServerCodecs serverCodecs() {  		return new CloudServerCodecs(this.eurekaServerConfig);  	}    	private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {  		CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());  		return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;  	}    	private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {  		CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());  		return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class)  				: codec;  	}    	class CloudServerCodecs extends DefaultServerCodecs {    		public CloudServerCodecs(EurekaServerConfig serverConfig) {  			super(getFullJson(serverConfig),  					CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class),  					getFullXml(serverConfig),  					CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class));  		}  	}    	@Bean  	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(  			ServerCodecs serverCodecs) {  		this.eurekaClient.getApplications(); // force initialization  		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,  				serverCodecs, this.eurekaClient,  				this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),  				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());  	}    	@Bean  	@ConditionalOnMissingBean  	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,  			ServerCodecs serverCodecs) {  		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,  				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);  	}    	/**  	 * {@link PeerEurekaNodes} which updates peers when /refresh is invoked.  	 * Peers are updated only if  	 * <code>eureka.client.use-dns-for-fetching-service-urls</code> is  	 * <code>false</code> and one of following properties have changed.  	 * </p>  	 * <ul>  	 * <li><code>eureka.client.availability-zones</code></li>  	 * <li><code>eureka.client.region</code></li>  	 * <li><code>eureka.client.service-url.&lt;zone&gt;</code></li>  	 * </ul>  	 */  	static class RefreshablePeerEurekaNodes extends PeerEurekaNodes  			implements ApplicationListener<EnvironmentChangeEvent> {    		public RefreshablePeerEurekaNodes(  				final PeerAwareInstanceRegistry registry,  				final EurekaServerConfig serverConfig,  				final EurekaClientConfig clientConfig,  				final ServerCodecs serverCodecs,  				final ApplicationInfoManager applicationInfoManager) {  			super(registry, serverConfig, clientConfig, serverCodecs, applicationInfoManager);  		}    		@Override  		public void onApplicationEvent(final EnvironmentChangeEvent event) {  			if (shouldUpdate(event.getKeys())) {  				updatePeerEurekaNodes(resolvePeerUrls());  			}  		}    		/*  		 * Check whether specific properties have changed.  		 */  		protected boolean shouldUpdate(final Set<String> changedKeys) {  			assert changedKeys != null;    			// if eureka.client.use-dns-for-fetching-service-urls is true, then  			// service-url will not be fetched from environment.  			if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {  				return false;  			}    			if (changedKeys.contains("eureka.client.region")) {  				return true;  			}    			for (final String key : changedKeys) {  				// property keys are not expected to be null.  				if (key.startsWith("eureka.client.service-url.") ||  					key.startsWith("eureka.client.availability-zones.")) {  					return true;  				}  			}    			return false;  		}  	}    	@Bean  	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,  			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {  		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,  				registry, peerEurekaNodes, this.applicationInfoManager);  	}    	@Bean  	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,  			EurekaServerContext serverContext) {  		return new EurekaServerBootstrap(this.applicationInfoManager,  				this.eurekaClientConfig, this.eurekaServerConfig, registry,  				serverContext);  	}    	/**  	 * Register the Jersey filter  	 */  	@Bean  	public FilterRegistrationBean jerseyFilterRegistration(  			javax.ws.rs.core.Application eurekaJerseyApp) {  		FilterRegistrationBean bean = new FilterRegistrationBean();  		bean.setFilter(new ServletContainer(eurekaJerseyApp));  		bean.setOrder(Ordered.LOWEST_PRECEDENCE);  		bean.setUrlPatterns(  				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));    		return bean;  	}    	/**  	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources  	 * required by the Eureka server.  	 */  	@Bean  	public javax.ws.rs.core.Application jerseyApplication(Environment environment,  			ResourceLoader resourceLoader) {    		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(  				false, environment);    		// Filter to include only classes that have a particular annotation.  		//  		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));  		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));    		// Find classes in Eureka packages (or subpackages)  		//  		Set<Class<?>> classes = new HashSet<>();  		for (String basePackage : EUREKA_PACKAGES) {  			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);  			for (BeanDefinition bd : beans) {  				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),  						resourceLoader.getClassLoader());  				classes.add(cls);  			}  		}    		// Construct the Jersey ResourceConfig  		//  		Map<String, Object> propsAndFeatures = new HashMap<>();  		propsAndFeatures.put(  				// Skip static content used by the webapp  				ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,  				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");    		DefaultResourceConfig rc = new DefaultResourceConfig(classes);  		rc.setPropertiesAndFeatures(propsAndFeatures);    		return rc;  	}    	@Bean  	public FilterRegistrationBean traceFilterRegistration(  			@Qualifier("httpTraceFilter") Filter filter) {  		FilterRegistrationBean bean = new FilterRegistrationBean();  		bean.setFilter(filter);  		bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);  		return bean;  	}  }

在這個配置類上面,加入了@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class),也就是說,類EurekaServerAutoConfiguration被註冊為Spring Bean的前提是在Spring容器中存在EurekaServerMarkerConfiguration.Marker.class的對象,而這個對象存在的前提是我們在Spring Boot啟動類中加入了@EnableEurekaServer註解。小總結一下就是,在Spring Boot啟動類上加入了@EnableEurekaServer註解以後,就會觸發EurekaServerMarkerConfiguration.Marker.classSpring實例化為Spring Bean,有了這個Bean以後,Spring就會再實例化EurekaServerAutoConfiguration類,而這個類就是配置了Eureka Server的相關內容,列舉如下:

注入EurekaServerConfig—->用於註冊中心相關配置資訊  注入EurekaController—->提供註冊中心上相關服務資訊的展示支援  注入PeerAwareInstanceRegistry—->提供實例註冊支援,例如實例獲取、狀態更新等相關支援  注入PeerEurekaNodes—->提供註冊中心對等服務間通訊支援  注入EurekaServerContext—->提供初始化註冊init服務、初始化PeerEurekaNode節點資訊  注入EurekaServerBootstrap—->用於初始化initEurekaEnvironment/initEurekaServerContext

而且,在類EurekaServerAutoConfiguration上,我們看見@Import(EurekaServerInitializerConfiguration.class),說明實例化類EurekaServerAutoConfiguration之前,已經實例化了EurekaServerInitializerConfiguration類,從類名可以看出,該類是Eureka Server的初始化配置類,我們進入到EurekaServerInitializerConfiguration類中一探究竟,發現該類實現了Spring的生命周期介面SmartLifecycle,也就是說類EurekaServerInitializerConfiguration在被Spring實例化過程中的時候會執行一些生命周期方法,比如Lifecyclestart方法,那麼看看EurekaServerInitializerConfiguration是如何重寫start方法的:

@Configuration  public class EurekaServerInitializerConfiguration  		implements ServletContextAware, SmartLifecycle, Ordered {    	// 此處省略部分程式碼    	@Override  	public void start() {  		new Thread(new Runnable() {  			@Override  			public void run() {  				try {  					//TODO: is this class even needed now?  					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);  					log.info("Started Eureka Server");    					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));  					EurekaServerInitializerConfiguration.this.running = true;  					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));  				}  				catch (Exception ex) {  					// Help!  					log.error("Could not initialize Eureka servlet context", ex);  				}  			}  		}).start();  	}    	// 此處省略部分程式碼  }

這個start方法中開啟了一個新的執行緒,然後進行一些Eureka Server的初始化工作,比如調用eurekaServerBootstrap的contextInitialized方法,進入該方法看看:

public class EurekaServerBootstrap {  	public void contextInitialized(ServletContext context) {  		try {  			// 初始化Eureka Server環境變數  			initEurekaEnvironment();  			// 初始化Eureka Server上下文  			initEurekaServerContext();    			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);  		}  		catch (Throwable e) {  			log.error("Cannot bootstrap eureka server :", e);  			throw new RuntimeException("Cannot bootstrap eureka server :", e);  		}  	}  }

這個方法中主要進行了Eureka的環境初始化和服務初始化,我們進入到initEurekaServerContext方法中來看服務初始化是如何實現的:

public class EurekaServerBootstrap {  	protected void initEurekaServerContext() throws Exception {  		// For backward compatibility  		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),  				XStream.PRIORITY_VERY_HIGH);  		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),  				XStream.PRIORITY_VERY_HIGH);    		if (isAws(this.applicationInfoManager.getInfo())) {  			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,  					this.eurekaClientConfig, this.registry, this.applicationInfoManager);  			this.awsBinder.start();  		}  		// 初始化Eureka Server上下文環境  		EurekaServerContextHolder.initialize(this.serverContext);    		log.info("Initialized server context");    		// Copy registry from neighboring eureka node  		int registryCount = this.registry.syncUp();  		// 期望每30秒接收到一次心跳,1分鐘就是2次      	// 修改Instance Status狀態為up      	// 同時,這裡面會開啟一個定時任務,用於清理 60秒沒有心跳的客戶端。自動下線  		this.registry.openForTraffic(this.applicationInfoManager, registryCount);    		// Register all monitoring statistics.  		EurekaMonitors.registerAllStats();  	}  }

在初始化Eureka Server上下文環境後,就會繼續執行openForTraffic方法,這個方法主要是設置了期望每分鐘接收到的心跳次數,並將服務實例的狀態設置為UP,最後又通過方法postInit來開啟一個定時任務,用於每隔一段時間(默認60秒)將沒有續約的服務實例(默認90秒沒有續約)清理掉。openForTraffic的方法程式碼如下:

@Override  public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {      // Renewals happen every 30 seconds and for a minute it should be a factor of 2.      // 計算每分鐘最大續約數      this.expectedNumberOfRenewsPerMin = count * 2;      // 計算每分鐘最小續約數      this.numberOfRenewsPerMinThreshold =              (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());      logger.info("Got {} instances from neighboring DS node", count);      logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);      this.startupTime = System.currentTimeMillis();      if (count > 0) {          this.peerInstancesTransferEmptyOnStartup = false;      }      DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();      boolean isAws = Name.Amazon == selfName;      if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {          logger.info("Priming AWS connections for all replicas..");          primeAwsReplicas(applicationInfoManager);      }      logger.info("Changing status to UP");      // 修改服務實例的狀態為UP      applicationInfoManager.setInstanceStatus(InstanceStatus.UP);      // 開啟定時任務,每隔一段時間(默認60秒)將沒有續約的服務實例(默認90秒沒有續約)清理掉      super.postInit();  }

postInit方法開啟了一個新的定時任務,程式碼如下:

protected void postInit() {      renewsLastMin.start();      if (evictionTaskRef.get() != null) {          evictionTaskRef.get().cancel();      }      evictionTaskRef.set(new EvictionTask());      evictionTimer.schedule(evictionTaskRef.get(),              serverConfig.getEvictionIntervalTimerInMs(),              serverConfig.getEvictionIntervalTimerInMs());  }

這裡的時間間隔都來自於EurekaServerConfigBean類,可以在配置文件中以eureka.server開頭的配置來進行設置。 當然,服務註冊中心啟動的源碼不僅僅只有這麼多,其還有向其他集群中的服務註冊中心複製服務實例列表的相關源碼沒有在這裡進行分析,感興趣的朋友可以自行斷點分析。

Eureka Client服務註冊行為分析

我們啟動一個本地的服務註冊中心,然後再啟動一個單節點的服務提供者,我們都知道,在服務註冊中心已經啟動情況下,然後再啟動服務提供者,服務提供者會將服務註冊到服務註冊中心,那麼這個註冊行為是如何運作的呢?我們都知道,服務註冊行為是在服務提供者啟動過程中完成的,那麼我們完全可以從啟動日誌中揣摩出註冊行為,請看下面服務提供者的啟動日誌:

2018-12-01 15:37:17.832  INFO 31948 --- [  restartedMain] o.s.c.n.eureka.InstanceInfoFactory       : Setting initial instance status as: STARTING  2018-12-01 15:37:17.868  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Initializing Eureka in region us-east-1  2018-12-01 15:37:18.031  INFO 31948 --- [  restartedMain] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON encoding codec LegacyJacksonJson  2018-12-01 15:37:18.031  INFO 31948 --- [  restartedMain] c.n.d.provider.DiscoveryJerseyProvider   : Using JSON decoding codec LegacyJacksonJson  2018-12-01 15:37:18.168  INFO 31948 --- [  restartedMain] c.n.d.provider.DiscoveryJerseyProvider   : Using XML encoding codec XStreamXml  2018-12-01 15:37:18.168  INFO 31948 --- [  restartedMain] c.n.d.provider.DiscoveryJerseyProvider   : Using XML decoding codec XStreamXml  2018-12-01 15:37:18.370  INFO 31948 --- [  restartedMain] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Disable delta property : false  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Single vip registry refresh property : null  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Force full registry fetch : false  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Application is null : false  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Registered Applications size is zero : true  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Application version is -1: true  2018-12-01 15:37:18.387  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Getting all instance registry info from the eureka server  2018-12-01 15:37:18.539  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : The response status is 200  2018-12-01 15:37:18.541  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Starting heartbeat executor: renew interval is: 30  2018-12-01 15:37:18.543  INFO 31948 --- [  restartedMain] c.n.discovery.InstanceInfoReplicator     : InstanceInfoReplicator onDemand update allowed rate per min is 4  2018-12-01 15:37:18.548  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Discovery Client initialized at timestamp 1543649838546 with initial instances count: 1  2018-12-01 15:37:18.554  INFO 31948 --- [  restartedMain] o.s.c.n.e.s.EurekaServiceRegistry        : Registering application PRODUCER-SERVICE with eureka with status UP  2018-12-01 15:37:18.556  INFO 31948 --- [  restartedMain] com.netflix.discovery.DiscoveryClient    : Saw local status change event StatusChangeEvent [timestamp=1543649838556, current=UP, previous=STARTING]  2018-12-01 15:37:18.557  INFO 31948 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227: registering service...  2018-12-01 15:37:18.627  INFO 31948 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227 - registration status: 204  2018-12-01 15:37:18.645  INFO 31948 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 13226 (http) with context path ''  2018-12-01 15:37:18.647  INFO 31948 --- [  restartedMain] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 13226  2018-12-01 15:37:18.654  INFO 31948 --- [  restartedMain] ringCloudEurekaProducerClientApplication : Started SpringCloudEurekaProducerClientApplication in 5.537 seconds (JVM running for 7.0)  2018-12-01 15:42:18.402  INFO 31948 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration

從日誌中我們可以讀取到許多資訊:

  • 第一行日誌告訴我們,服務提供者實例的狀態被標註為「正在啟動」。
  • 第二行日誌告訴我們,在默認的名為「us-east-1」Region中初始化Eureka客戶端,Region的名稱是可以配置的,可以通過eureka.client.region來配置,如果沒有配置它,那麼默認的Region就是us-east-1。這裡順便多說一句,一個微服務應用只可以註冊到一個Region中,也就是說一個微服務應用對應一個Region,一個Region對應多個Zone,是否還記得,我們在配置集群的Eureka Server服務註冊中心的時候,都設置了eureka.client.service-url.defaultZone這個值,就是為了告訴服務提供者者或者集群內的其他Eureka Server,可以向這個Zone註冊,並且defaultZone的值是使用逗號隔開的,也就是說我們的服務可以同時向多個Zone註冊。由此可見,一個服務可以同時註冊到一個Region中的多個Zone的。如果需要自己指定Zone,可以通過eureka.client.availability-zones來指定。關於RegionZone請看下面的源碼:
public static String getRegion(EurekaClientConfig clientConfig) {      String region = clientConfig.getRegion();      if (region == null) {          region = DEFAULT_REGION;      }      region = region.trim().toLowerCase();      return region;  }
public String[] getAvailabilityZones(String region) {  	String value = this.availabilityZones.get(region);  	if (value == null) {  		value = DEFAULT_ZONE;  	}  	return value.split(",");  }
  • 日誌中getting all instance registry info from the eureka server表示服務在註冊的過程中也會向服務註冊中心獲取其他服務實例的資訊列表。
  • 日誌中Starting heartbeat executor: renew interval is: 30表示以默認的30秒作為間隔向服務註冊中心發起心跳請求,告訴服務註冊中心「我還活著」。
  • 日誌中Discovery Client initialized at timestamp 1543649838546 with initial instances count: 1表示在時間戳1543649838546的時候,服務成功初始化完成。
  • 日誌中DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227: registering service...表示開始將服務註冊到服務註冊中心。
  • 日誌中DiscoveryClient_PRODUCER-SERVICE/producer-service:-1612568227 - registration status: 204表示服務註冊完成,完成的狀態標誌為204

接下來,我們進入到源碼中,藉助源程式碼來分析一下服務註冊到服務註冊中心的流程。在分析之前,我們有必要搞清楚Spring Cloud是如何集成Eureka的,我們都知道,在Eureka客戶端,無論是服務提供者還是服務消費者,都需要加上@EnableDiscoveryClient註解,用來開啟DiscoveryClient實例,我們通過搜索DiscoveryClient,可以發現,搜索的結果有一個介面還有一個類,介面在包org.springframework.cloud.client.discovery下,類在com.netflix.discovery包下,介面DiscoveryClientSpring Cloud的介面,它定義了用來發現服務的常用方法,通過該介面可以有效地屏蔽服務治理中的實現細節,這就方便切換不同的服務服務治理框架,而無需改動從Spring Cloud層面調用的程式碼,該介面有一個實現類EurekaDiscoveryClient,從命名可以可以看出他是對Eureka服務發現的封裝,進入到EurekaDiscoveryClient可以看到,它有一個成員變數為EurekaClient,這是包com.netflix.discovery下的一個介面,該介面繼承了LookupService介面,且有一個實現類DiscoveryClient,介面EurekaClientLookupService都在com.netflix.discovery包下,他們都定義了針對Eureka的服務發現的抽象方法,而EurekaClient的實現類DiscoveryClient則實現了這些抽象方法,所以說,類DiscoveryClient是真正實現發現服務的類。結合以上的文字,下面展示介面與類的關係圖如下所示:

我們進入到DiscoveryClient類中查看源碼,首先看到的是它的類注釋如下所示:

/**   * The class that is instrumental for interactions with <tt>Eureka Server</tt>.   *   * <p>   * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the   * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with   * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from   * <tt>Eureka Server</tt> during shutdown   * <p>   * d) <em>Querying</em> the list of services/instances registered with   * <tt>Eureka Server</tt>   * <p>   *   * <p>   * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>   * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips   * which do not change. All of the functions defined above fail-over to other   * {@link java.net.URL}s specified in the list in the case of failure.   * </p>   *   * @author Karthik Ranganathan, Greg Kim   * @author Spencer Gibb   *   */

大致翻譯如下:

這個類用於幫助與Eureka Server相互協作。    Eureka Client負責下面的任務:  - 向Eureka Server註冊服務實例  - 向Eureka Server服務續約  - 當服務關閉期間,向Eureka Server取消租約  - 查詢Eureka Server中的服務實例列表    Eureka Client還需要配置一個Eureka Server的URL列表

在分析類DiscoveryClient完成的具體任務之前,我們首先來回憶一下,我們在配置服務提供者的時候,在配置文件中都配置了eureka.client.service-url.defaultZone屬性,而這個屬性的值就是告訴服務提供者,該向哪裡註冊服務,也就是服務註冊的地址,該地址比如是http://peer1:1111/eureka/,http://peer2:1112/eureka/,http://peer3:1113/eureka/,各個地址之間使用逗號隔開,我們在類EndpointUtils中可以找到一個方法名為getServiceUrlsMapFromConfig的方法,程式碼如下:

public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {      Map<String, List<String>> orderedUrls = new LinkedHashMap<>();      String region = getRegion(clientConfig);      String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());      if (availZones == null || availZones.length == 0) {          availZones = new String[1];          availZones[0] = DEFAULT_ZONE;      }      logger.debug("The availability zone for the given region {} are {}", region, availZones);      int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);        String zone = availZones[myZoneOffset];      List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);      if (serviceUrls != null) {          orderedUrls.put(zone, serviceUrls);      }      int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);      while (currentOffset != myZoneOffset) {          zone = availZones[currentOffset];          serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);          if (serviceUrls != null) {              orderedUrls.put(zone, serviceUrls);          }          if (currentOffset == (availZones.length - 1)) {              currentOffset = 0;          } else {              currentOffset++;          }      }        if (orderedUrls.size() < 1) {          throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");      }      return orderedUrls;  }

該方法就是從我們配置的Zone中讀取註冊地址,並組成一個List,最後將這個List存儲到Map集合中,在讀取過程中,它首先載入的是getRegion方法,這個方法讀取了一個Region返回,進入到getRegion方法中:

/**   * Get the region that this particular instance is in.   *   * @return - The region in which the particular instance belongs to.   */  public static String getRegion(EurekaClientConfig clientConfig) {          String region = clientConfig.getRegion();          if (region == null) {              region = DEFAULT_REGION;          }          region = region.trim().toLowerCase();          return region;      }

從方法的注釋上可以知道,一個微服務應用只可以屬於一個Region,方法體中的第一行程式碼就是從EurekaClientConfigBean類中來讀取Region,而EurekaClientConfigBeangetRegion方法返回的值就是需要我們配置的,在配置文件中,對應的屬性是eureka.client.region,如果我們每月配置,那麼將使用默認的Region,默認DEFAULT_REGIONdefault。在方法getServiceUrlsMapFromConfig中,還載入了getAvailabilityZones方法,方法程式碼如下所示:

public String[] getAvailabilityZones(String region) {  	String value = this.availabilityZones.get(region);  	if (value == null) {  		value = DEFAULT_ZONE;  	}  	return value.split(",");  }

上述方法中第一行程式碼是從Region中獲取ZoneavailabilityZonesEurekaClientConfigBean的一個Map成員變數,如果我們沒有為Region特別配置eureka.client.availablity-zones屬性,那麼zone將採用默認值,默認值是defaultZone,這就是我們一開始配置eureka.client.service-url.defaultZone的由來,由此可見,一個Region對應多個Zone,也就是說一個微服務應用可以向多個服務註冊地址註冊。在獲取了RegionZone的資訊之後,才開始真正地載入Eureka Server的具體地址,它根據傳入的參數按照一定的演算法確定載入位於哪一個Zone配置的serviceUrls,程式碼如下:

int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);  String zone = availZones[myZoneOffset];  List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);

當我們在微服務應用中使用Ribbon來實現服務調用時,對於Zone的設置可以在負載均衡時實現區域親和特性:Ribbon的默認策略會優先訪問同客戶端處於一個Zone中的服務實例,只有當同一個Zone中沒有可用的服務實例的時候才會去訪問其他Zone中的實例。利用親和性這一特性,我們就可以有效地設計出針對區域性故障的容錯集群。 從本小節一開始,我們就分析了Spring Cloud Eureka是對Netflix Eureka的封裝,com.netflix.discovery包下的DiscoveryClient才是真正實現服務的註冊與發現。我們一起來看看它的構造方法:

@Inject  DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,                  Provider<BackupRegistry> backupRegistryProvider) {      if (args != null) {          this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;          this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;          this.eventListeners.addAll(args.getEventListeners());          this.preRegistrationHandler = args.preRegistrationHandler;      } else {          this.healthCheckCallbackProvider = null;          this.healthCheckHandlerProvider = null;          this.preRegistrationHandler = null;      }        this.applicationInfoManager = applicationInfoManager;      InstanceInfo myInfo = applicationInfoManager.getInfo();        clientConfig = config;      staticClientConfig = clientConfig;      transportConfig = config.getTransportConfig();      instanceInfo = myInfo;      if (myInfo != null) {          appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();      } else {          logger.warn("Setting instanceInfo to a passed in null value");      }        this.backupRegistryProvider = backupRegistryProvider;        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);      localRegionApps.set(new Applications());        fetchRegistryGeneration = new AtomicLong(0);        remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());      remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));        if (config.shouldFetchRegistry()) {          this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});      } else {          this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;      }        if (config.shouldRegisterWithEureka()) {          this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});      } else {          this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;      }        logger.info("Initializing Eureka in region {}", clientConfig.getRegion());        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {          logger.info("Client configured to neither register nor query for data.");          scheduler = null;          heartbeatExecutor = null;          cacheRefreshExecutor = null;          eurekaTransport = null;          instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()          // to work with DI'd DiscoveryClient          DiscoveryManager.getInstance().setDiscoveryClient(this);          DiscoveryManager.getInstance().setEurekaClientConfig(config);            initTimestampMs = System.currentTimeMillis();          logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",                  initTimestampMs, this.getApplications().size());            return;  // no need to setup up an network tasks and we are done      }        try {          // default size of 2 - 1 each for heartbeat and cacheRefresh          scheduler = Executors.newScheduledThreadPool(2,                  new ThreadFactoryBuilder()                          .setNameFormat("DiscoveryClient-%d")                          .setDaemon(true)                          .build());            heartbeatExecutor = new ThreadPoolExecutor(                  1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,                  new SynchronousQueue<Runnable>(),                  new ThreadFactoryBuilder()                          .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")                          .setDaemon(true)                          .build()          );  // use direct handoff            cacheRefreshExecutor = new ThreadPoolExecutor(                  1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,                  new SynchronousQueue<Runnable>(),                  new ThreadFactoryBuilder()                          .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")                          .setDaemon(true)                          .build()          );  // use direct handoff            eurekaTransport = new EurekaTransport();          scheduleServerEndpointTask(eurekaTransport, args);            AzToRegionMapper azToRegionMapper;          if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {              azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);          } else {              azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);          }          if (null != remoteRegionsToFetch.get()) {              azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));          }          instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());      } catch (Throwable e) {          throw new RuntimeException("Failed to initialize DiscoveryClient!", e);      }        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {          fetchRegistryFromBackup();      }        // call and execute the pre registration handler before all background tasks (inc registration) is started      if (this.preRegistrationHandler != null) {          this.preRegistrationHandler.beforeRegistration();      }        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {          try {              if (!register() ) {                  throw new IllegalStateException("Registration error at startup. Invalid server response.");              }          } catch (Throwable th) {              logger.error("Registration error at startup: {}", th.getMessage());              throw new IllegalStateException(th);          }      }        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch      // 這個方法里實現了服務向服務註冊中心註冊的行為      initScheduledTasks();        try {          Monitors.registerObject(this);      } catch (Throwable e) {          logger.warn("Cannot register timers", e);      }        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()      // to work with DI'd DiscoveryClient      DiscoveryManager.getInstance().setDiscoveryClient(this);      DiscoveryManager.getInstance().setEurekaClientConfig(config);        initTimestampMs = System.currentTimeMillis();      logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",              initTimestampMs, this.getApplications().size());  }

整個構造方法里,一開始進行了各種參數的設置,而真正地註冊行為是在initScheduledTasks方法里實現的,我們一起來看看註冊行為是如何實現的:

private void initScheduledTasks() {      if (clientConfig.shouldFetchRegistry()) {          // registry cache refresh timer          int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();          int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();          scheduler.schedule(                  new TimedSupervisorTask(                          "cacheRefresh",                          scheduler,                          cacheRefreshExecutor,                          registryFetchIntervalSeconds,                          TimeUnit.SECONDS,                          expBackOffBound,                          new CacheRefreshThread()                  ),                  registryFetchIntervalSeconds, TimeUnit.SECONDS);      }        if (clientConfig.shouldRegisterWithEureka()) {          int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();          int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();          logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);            // Heartbeat timer          scheduler.schedule(                  new TimedSupervisorTask(                          "heartbeat",                          scheduler,                          heartbeatExecutor,                          renewalIntervalInSecs,                          TimeUnit.SECONDS,                          expBackOffBound,                          new HeartbeatThread()                  ),                  renewalIntervalInSecs, TimeUnit.SECONDS);            // InstanceInfo replicator          instanceInfoReplicator = new InstanceInfoReplicator(                  this,                  instanceInfo,                  clientConfig.getInstanceInfoReplicationIntervalSeconds(),                  2); // burstSize            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {              @Override              public String getId() {                  return "statusChangeListener";              }                @Override              public void notify(StatusChangeEvent statusChangeEvent) {                  if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||                          InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {                      // log at warn level if DOWN was involved                      logger.warn("Saw local status change event {}", statusChangeEvent);                  } else {                      logger.info("Saw local status change event {}", statusChangeEvent);                  }                  instanceInfoReplicator.onDemandUpdate();              }          };            if (clientConfig.shouldOnDemandUpdateStatusChange()) {              applicationInfoManager.registerStatusChangeListener(statusChangeListener);          }            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());      } else {          logger.info("Not registering with Eureka server per configuration");      }  }

這段程式碼中有兩個主要的if程式碼塊,第一個if程式碼塊是決定是否從Eureka Server來獲取註冊資訊,判斷條件clientConfig.shouldFetchRegistry()是需要我們自己的在配置文件中通過屬性eureka.client.fetch-registry=true進行配置的,默認為true,也就是說服務會從Eureka Server拉取註冊資訊,且默認間隔為30秒,每30秒執行一次定時任務,用於刷新所獲取的註冊資訊。 第二個if程式碼塊是決定是否將服務註冊到服務註冊中心的,也是我們本次要探討的主要內容。判斷條件clientConfig.shouldRegisterWithEureka()表示是否向Eureka Server註冊自己,你是否還記得,我們在搭建單節點服務註冊中心的時候,我們搭建的那個Eureka Server設置了屬性eureka.client.register-with-eureka=false,意思就是說禁止Eureka Server把自己當做一個普通服務註冊到自身,而這個屬性默認值也是為true,也就是說我們在註冊的服務的時候,無需配置這個屬性,就可以將服務註冊到服務註冊中心。分析第二個if程式碼塊,程式碼塊中一開始就設置了一個定時任務,這個定時任務就是按照指定的時間間隔向Eureka Server發送心跳,告訴服務註冊中心「我還活著」,對於發送心跳的時間間隔,我們一開始就討論過,默認是30秒,這也就是為什麼按照默認來說,一分鐘理應發送兩次心跳了,這個心跳間隔我們可以在配置文件中進行配置,配置屬性為eureka.instance.lease-renewal-interval-in-seconds=30,對於默認90秒內沒有發送心跳的服務,將會被服務在服務註冊中心剔除,剔除時間間隔可以通過屬性eureka.instance.lease-expiration-duration-in-seconds=90來進行配置。而整個服務的續約邏輯也很簡單,在定時任務中有一個程式碼片段new HeartbeatThread(),然後開啟了一個新的執行緒實現續約服務,就是通過發送REST請求來實現的,具體程式碼如下:

/**   * Renew with the eureka service by making the appropriate REST call   */  boolean renew() {      EurekaHttpResponse<InstanceInfo> httpResponse;      try {          httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);          logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());          if (httpResponse.getStatusCode() == 404) {              REREGISTER_COUNTER.increment();              logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());              long timestamp = instanceInfo.setIsDirtyWithTime();              boolean success = register();              if (success) {                  instanceInfo.unsetIsDirty(timestamp);              }              return success;          }          return httpResponse.getStatusCode() == 200;      } catch (Throwable e) {          logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);          return false;      }  }

服務提供者向服務註冊中心發送心跳,並檢查返回碼,如果是404,那麼服務將重新調用register方法,實現將服務註冊到服務註冊中心,否則直接檢測返回碼是否是200,返回布爾類型來告訴定時器是否續約成功。 續約的操作完成之後,就開始了服務實例的複製工作,緊接著通過服務實例管理器ApplicationInfoManager來創建一個服務實例狀態監聽器,用於監聽服務實例的狀態,並進入到onDemandUpdate中進行註冊,方法onDemandUpdate的程式碼如下:

public boolean onDemandUpdate() {      if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {          if (!scheduler.isShutdown()) {              scheduler.submit(new Runnable() {                  @Override                  public void run() {                      logger.debug("Executing on-demand update of local InstanceInfo");                        Future latestPeriodic = scheduledPeriodicRef.get();                      if (latestPeriodic != null && !latestPeriodic.isDone()) {                          logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");                          latestPeriodic.cancel(false);                      }                        InstanceInfoReplicator.this.run();                  }              });              return true;          } else {              logger.warn("Ignoring onDemand update due to stopped scheduler");              return false;          }      } else {          logger.warn("Ignoring onDemand update due to rate limiter");          return false;      }  }    public void run() {      try {          discoveryClient.refreshInstanceInfo();            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();          if (dirtyTimestamp != null) {              discoveryClient.register();              instanceInfo.unsetIsDirty(dirtyTimestamp);          }      } catch (Throwable t) {          logger.warn("There was a problem with the instance info replicator", t);      } finally {          Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);          scheduledPeriodicRef.set(next);      }  }

服務實例的註冊行為是在方法register中執行的,進入到register方法中,程式碼如下:

/**   * Register with the eureka service by making the appropriate REST call.   */  boolean register() throws Throwable {      logger.info(PREFIX + "{}: registering service...", appPathIdentifier);      EurekaHttpResponse<Void> httpResponse;      try {          httpResponse = eurekaTransport.registrationClient.register(instanceInfo);      } catch (Exception e) {          logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);          throw e;      }      if (logger.isInfoEnabled()) {          logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());      }      return httpResponse.getStatusCode() == 204;  }

註冊行為其實就是將服務實例的資訊通過HTTP請求傳遞給Eureka Server服務註冊中心,當註冊中心接收到註冊資訊之後將返回204狀態碼給客戶端,表示註冊成功。這就是客戶端向服務註冊中心註冊的行為源碼分析,那麼服務註冊中心是如何接收這些實例的註冊資訊,且如何保存的呢?請接著往下看。

Eureka Server服務註冊中心接收註冊行為分析

客戶端向服務端發起註冊請求之後,服務端是如何處理的呢?通過源碼的分析,可以發現,客戶端和服務端的交互都是通過REST請求發起的,而在服務端,包com.netflix.eureka.resources下定義了許多處理REST請求的類,對於接收客戶端的註冊資訊,可以發現在類ApplicationResource下有一個addInstance方法,專門用來處理註冊請求的,我們一起來分析這個方法:

/**   * Registers information about a particular instance for an   * {@link com.netflix.discovery.shared.Application}.   *   * @param info   *            {@link InstanceInfo} information of the instance.   * @param isReplication   *            a header parameter containing information whether this is   *            replicated from other nodes.   */  @POST  @Consumes({"application/json", "application/xml"})  public Response addInstance(InstanceInfo info,                              @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {      logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);      // validate that the instanceinfo contains all the necessary required fields      if (isBlank(info.getId())) {          return Response.status(400).entity("Missing instanceId").build();      } else if (isBlank(info.getHostName())) {          return Response.status(400).entity("Missing hostname").build();      } else if (isBlank(info.getIPAddr())) {          return Response.status(400).entity("Missing ip address").build();      } else if (isBlank(info.getAppName())) {          return Response.status(400).entity("Missing appName").build();      } else if (!appName.equals(info.getAppName())) {          return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();      } else if (info.getDataCenterInfo() == null) {          return Response.status(400).entity("Missing dataCenterInfo").build();      } else if (info.getDataCenterInfo().getName() == null) {          return Response.status(400).entity("Missing dataCenterInfo Name").build();      }        // handle cases where clients may be registering with bad DataCenterInfo with missing data      DataCenterInfo dataCenterInfo = info.getDataCenterInfo();      if (dataCenterInfo instanceof UniqueIdentifier) {          String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();          if (isBlank(dataCenterInfoId)) {              boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));              if (experimental) {                  String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";                  return Response.status(400).entity(entity).build();              } else if (dataCenterInfo instanceof AmazonInfo) {                  AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;                  String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);                  if (effectiveId == null) {                      amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());                  }              } else {                  logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());              }          }      }        registry.register(info, "true".equals(isReplication));      return Response.status(204).build();  // 204 to be backwards compatible  }

在接收服務實例註冊的時候,首先要經過一系列的數據校驗,通過校驗以後調用PeerAwareInstanceRegistry的實現類對象的register方法對服務進行註冊,進入到register方法繼續分析:

@Override  public void register(final InstanceInfo info, final boolean isReplication) {  	handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);  	super.register(info, isReplication);  }

方法體中第一行程式碼中調用了publishEvent方法,將註冊事件傳播出去,然後繼續調用com.netflix.eureka.registry包下的AbstractInstanceRegistry抽象類的register方法進行註冊:

/**   * Registers a new instance with a given duration.   *   * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)   */  public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {      try {          read.lock();          Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());          REGISTER.increment(isReplication);          if (gMap == null) {              final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();              gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);              if (gMap == null) {                  gMap = gNewMap;              }          }          Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());          // Retain the last dirty timestamp without overwriting it, if there is already a lease          if (existingLease != null && (existingLease.getHolder() != null)) {              Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();              Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();              logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted              // InstanceInfo instead of the server local copy.              if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {                  logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +                          " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);                  logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");                  registrant = existingLease.getHolder();              }          } else {              // The lease does not exist and hence it is a new registration              synchronized (lock) {                  if (this.expectedNumberOfRenewsPerMin > 0) {                      // Since the client wants to cancel it, reduce the threshold                      // (1                      // for 30 seconds, 2 for a minute)                      this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;                      this.numberOfRenewsPerMinThreshold =                              (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());                  }              }              logger.debug("No previous lease information found; it is new registration");          }          Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);          if (existingLease != null) {              lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());          }          gMap.put(registrant.getId(), lease);          synchronized (recentRegisteredQueue) {              recentRegisteredQueue.add(new Pair<Long, String>(                      System.currentTimeMillis(),                      registrant.getAppName() + "(" + registrant.getId() + ")"));          }          // This is where the initial state transfer of overridden status happens          if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {              logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "                              + "overrides", registrant.getOverriddenStatus(), registrant.getId());              if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {                  logger.info("Not found overridden id {} and hence adding it", registrant.getId());                  overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());              }          }          InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());          if (overriddenStatusFromMap != null) {              logger.info("Storing overridden status {} from map", overriddenStatusFromMap);              registrant.setOverriddenStatus(overriddenStatusFromMap);          }            // Set the status based on the overridden status rules          InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);          registrant.setStatusWithoutDirty(overriddenInstanceStatus);            // If the lease is registered with UP status, set lease service up timestamp          if (InstanceStatus.UP.equals(registrant.getStatus())) {              lease.serviceUp();          }          registrant.setActionType(ActionType.ADDED);          recentlyChangedQueue.add(new RecentlyChangedItem(lease));          registrant.setLastUpdatedTimestamp();          invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());          logger.info("Registered instance {}/{} with status {} (replication={})",                  registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);      } finally {          read.unlock();      }

在程式碼中我們看到,實例資訊InstanceInfo對象被存儲在以instanceIdkeyConcurrentHashMap中,然後又將這個ConcurrentHashMap對象存儲到了以服務名為keyMap中,這就形成了雙層Map結構,這也就對應了一開始我們所說的服務的元資訊存儲在一個雙層Map結構中。

小結

這就基本完成了對Spring Cloud Eureka的簡單源碼分析,這裡僅僅是對Eureka Server初始化的源碼、服務註冊、服務端接收註冊的源碼進行了簡單分析,感興趣的朋友可以通過DEBUG方式深入了解源碼的運行機制。