SpringCloud 源碼系列(3)—— 註冊中心 Eureka(下)

SpringCloud 源碼系列(1)—— 註冊中心 Eureka(上)

SpringCloud 源碼系列(2)—— 註冊中心 Eureka(中)

SpringCloud 源碼系列(3)—— 註冊中心 Eureka(下)

十一、Eureka Server 集群

在實際的生產環境中,可能有幾十個或者幾百個的微服務實例,Eureka Server 承擔了非常高的負載,而且為了保證註冊中心高可用,一般都要部署成集群的,下面就來看看 eureka server 的集群。

1、搭建 Eureka Server 集群

首先來搭建一個三個節點的 eureka-server 集群,看看效果。

① 集群配置

首先在本地 hosts 文件中配置如下映射:

1 127.0.0.1 peer1
2 127.0.0.1 peer2
3 127.0.0.1 peer3

更改註冊中心的 application.yml 配置文件,增加三個 profile,分別對應三個 eureka-server 的客戶端配置。

eureka-server 在集群中作為客戶端就需要抓取註冊表,並配置 eureka-server 的地址。

 1 spring:
 2   application:
 3     name: sunny-register
 4 
 5 ---
 6 spring:
 7   profiles: peer1
 8 server:
 9   port: 8001
10 
11 eureka:
12   instance:
13     hostname: peer1
14   client:
15     # 是否向註冊中心註冊自己
16     register-with-eureka: false
17     # 是否抓取註冊表
18     fetch-registry: true
19     service-url:
20       defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka
21 
22 
23 ---
24 spring:
25   profiles: peer2
26 server:
27   port: 8002
28 
29 eureka:
30   instance:
31     hostname: peer2
32   client:
33     # 是否向註冊中心註冊自己
34     register-with-eureka: false
35     # 是否抓取註冊表
36     fetch-registry: true
37     service-url:
38       defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka
39 
40 ---
41 spring:
42   profiles: peer3
43 server:
44   port: 8003
45 
46 eureka:
47   instance:
48     hostname: peer3
49   client:
50     # 是否向註冊中心註冊自己
51     register-with-eureka: false
52     # 是否抓取註冊表
53     fetch-registry: true
54     service-url:
55       defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka

② 啟動集群

分別啟動三個註冊中心,環境變數 spring.profiles.active 激活對應的集群配置。

啟動之後訪問 //peer1:8001/ 進入 peer1 這個註冊中心,就可以看到另外兩個分片 peer2、peer3,說明集群中有3個節點了。

③ 啟動客戶端

首先客戶端配置增加集群地址:

1 eureka:
2   client:
3     serviceUrl:
4       defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka

啟動幾個客戶端實例,過一會 之後,會發現三個 eureka-server 上都註冊上去了:

到此 eureka-server 集群就搭建起來了,可以看到註冊中心的實例會互相同步,每隔註冊註冊都可以接收註冊、續約、下線等請求,它們是對等的。

2、Eureka Server 集群架構

一般來說,分散式系統的數據在多個副本之間的複製方式,可分為主從複製和對等複製。

① 主從複製

主從複製就是 Master-Slave 模式,即一個主副本,其它副本都為從副本。所有對數據的寫操作都提交到主副本,然後再由主副本同步到從副本。

對於主從複製模式來說,寫操作的壓力都在主副本上,它是整個系統的瓶頸,而從副本則可以幫助主副本分擔讀請求。

② 對等複製

對等複製就是 Peer to Peer 的模式,副本之間不分主從,任何副本都可以接收寫操作,每個副本之間相互進行數據更新同步。

Peer to Peer 模式每個副本之間都可以接收寫請求,不存在寫操作壓力瓶頸。但是由於每個副本都可以進行寫操作,各個副本之間的數據同步及衝突處理是一個棘手的問題。

③ Eureka Server 集群架構

Eureka Server 採用的就是 Peer to Peer 的複製模式,比如一個客戶端實例隨機向其中一個server註冊,然後它就會同步到其它節點中。

3、Eureka Server 啟動時抓取註冊表

前面已經分析過了,在 eureka server 啟動初始化的時候,即 EurekaBootStrap 初始化類,先初始化了 DiscoveryClient,DiscoveryClient 會向註冊中心全量抓取註冊表到本地。

初始化的最後調用了 registry.syncUp() 來同步註冊表,就是將 DiscoveryClient 快取的實例註冊到 eureka-server 的註冊表裡去。

需要注意的是 eureka 配置的註冊表同步重試次數默認為5,springcloud 中默認為 0,因此需要添加如下配置來開啟註冊表同步。

1 eureka:
2   server:
3     registry-sync-retries: 5

將 DiscoveryClient 本地的實例註冊到註冊表中:

4、集群節點同步

① 註冊、續約、下線

前面也分析過了,在客戶端註冊、續約、下線的時候,都會同步到集群其它節點。可以看到都調用了 replicateToPeers 方法來複制到其它集群。

 1 /////////////////////// 註冊 ///////////////////////
 2 public void register(final InstanceInfo info, final boolean isReplication) {
 3     int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
 4     // 如果實例中沒有周期的配置,就設置為默認的 90 秒
 5     if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
 6         leaseDuration = info.getLeaseInfo().getDurationInSecs();
 7     }
 8     // 註冊實例
 9     super.register(info, leaseDuration, isReplication);
10     // 複製到集群其它 server 節點
11     replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
12 }
13 
14 
15 /////////////////////// 下線 ///////////////////////
16 public boolean cancel(final String appName, final String id,
17                       final boolean isReplication) {
18     if (super.cancel(appName, id, isReplication)) {
19         replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
20 
21         return true;
22     }
23     return false;
24 }
25 
26 
27 /////////////////////// 續約 ///////////////////////
28 public boolean renew(final String appName, final String id, final boolean isReplication) {
29     // 調用父類(AbstractInstanceRegistry)的 renew 續約
30     if (super.renew(appName, id, isReplication)) {
31         // 續約完成後同步到集群其它節點
32         replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
33         return true;
34     }
35     return false;
36 }

② 同步到其它節點

來看看 replicateToPeers 方法:

  • 首先判斷 isReplication 參數,如果是集群複製操作,最近一分鐘複製次數 numberOfReplicationsLastMin + 1。isReplication 是在請求頭中指定的,請求頭為 PeerEurekaNode.HEADER_REPLICATION(x-netflix-discovery-replication)。
  • 接著遍歷集群列表,複製實例操作到集群節點中。前面也分析過了,PeerEurekaNode 就代表了一個 eureka-server,PeerEurekaNodes 就代表了 eureka-server 集群。
  • 複製實例操作到集群的方法 replicateInstanceActionsToPeers 就是根據不同的操作類型調用集群 PeerEurekaNode 對應的方法完成操作複製。
 1 private void replicateToPeers(Action action, String appName, String id,
 2                               InstanceInfo info /* optional */,
 3                               InstanceStatus newStatus /* optional */, boolean isReplication) {
 4     Stopwatch tracer = action.getTimer().start();
 5     try {
 6         if (isReplication) {
 7             // 如果是來自其它server節點的註冊請求,則最近一分鐘集群同步次數+1
 8             numberOfReplicationsLastMin.increment();
 9         }
10         // If it is a replication already, do not replicate again as this will create a poison replication
11         if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
12             return;
13         }
14 
15         // 如果是來自客戶端的註冊請求,就同步到集群中其它server節點
16         for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
17             // If the url represents this host, do not replicate to yourself.
18             if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
19                 continue;
20             }
21 
22             replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
23         }
24     } finally {
25         tracer.stop();
26     }
27 }
 1 private void replicateInstanceActionsToPeers(Action action, String appName,
 2                                              String id, InstanceInfo info, InstanceStatus newStatus,
 3                                              PeerEurekaNode node) {
 4     try {
 5         InstanceInfo infoFromRegistry;
 6         CurrentRequestVersion.set(Version.V2);
 7         switch (action) {
 8             case Cancel:
 9                 // 下線
10                 node.cancel(appName, id);
11                 break;
12             case Heartbeat:
13                 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
14                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
15                 // 續約
16                 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
17                 break;
18             case Register:
19                 // 註冊
20                 node.register(info);
21                 break;
22             case StatusUpdate:
23                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
24                 node.statusUpdate(appName, id, newStatus, infoFromRegistry);
25                 break;
26             case DeleteStatusOverride:
27                 infoFromRegistry = getInstanceByAppAndId(appName, id, false);
28                 node.deleteStatusOverride(appName, id, infoFromRegistry);
29                 break;
30         }
31     } catch (Throwable t) {
32         logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
33     } finally {
34         CurrentRequestVersion.remove();
35     }
36 }

③ isReplication

PeerEurekaNode 與 eureka-server 通訊的組件是 JerseyReplicationClient,這個類重寫了 addExtraHeaders 方法,並添加了請求頭 PeerEurekaNode.HEADER_REPLICATION,設置為 true。

這樣其它 eureka-server 收到這個複製操作後,就知道是來自集群節點的同步操作,就不會再同步給其它節點了,從而避免死循環。

1 @Override
2 protected void addExtraHeaders(Builder webResource) {
3     webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
4 }

十二、集群同步機制

Eureka Server 集群間同步機制還是比較複雜的,試想如果每次客戶端的請求一過來,比如註冊、心跳,然後 eureka-server 就立馬同步給集群中其它 server 節點,那 eureka-server 這種 Peer to Peer 的模式實際上就無法分擔客戶端的寫操作壓力,相當於每個 eureka-server 接收到的請求量都是一樣的。那 eureka server 為了避免這種情況,底層採用了三層隊列,加批量任務的方式來進行集群間的同步。簡單來說就是先將客戶端操作放入隊列中,然後從隊列中取出一批操作,然後將這一批操作發送給其它 Server 節點,Server節點接收到之後再將這批操作解析到本地。下面就來詳細看看是如何實現的。

1、集群節點 PeerEurekaNode

之前分析 eureka-server 啟動初始化的時候,EurekaBootStrap 初始化了代表集群的 PeerEurekaNodes,它裡面又根據配置的註冊中心地址構造了 PeerEurekaNode,集群間同步核心的組件就是這個 PeerEurekaNode 了。下面以客戶端註冊為例來看下是如何同步的。

① 註冊同步

replicateInstanceActionsToPeers 中調用了 PeerEurekaNode 的 register 方法來同步註冊操作到集群。

node.register 方法:

  • 可以看到先計算了過期時間,為當前時間 + 租約間隔時間(默認90秒)
  • 然後調用了 batchingDispatcher 批量任務分發器來處理任務,提交了一個 InstanceReplicationTask 的實例,其 execute 方法中調用了 replicationClient 來向這個 server 註冊同步。
 1 public void register(final InstanceInfo info) throws Exception {
 2     // 過期時間:當前時間 + 租約時間(默認90秒)
 3     long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
 4     batchingDispatcher.process(
 5             taskId("register", info),
 6             new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
 7                 public EurekaHttpResponse<Void> execute() {
 8                     return replicationClient.register(info);
 9                 }
10             },
11             expiryTime
12     );
13 }

再看下 getLeaseRenewalOf 這個方法,這裡應該是有bug的,這個方法返回的是毫秒數,可以看到它的衛語句的else部分是乘以 1000 了的,而 if 部分則沒有,返回的是 90,不過這裡 info.getLeaseInfo() 應該都不會為 null。

1 private static int getLeaseRenewalOf(InstanceInfo info) {
2     // bug : Lease.DEFAULT_DURATION_IN_SECS * 1000
3     return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000;
4 }

② PeerEurekaNode 的構造

batchingDispatcher 是在 PeerEurekaNode 的構造方法中初始化的,來看下它的構造方法:

  • registry:本地註冊表
  • targetHost:eureka-server host
  • replicationClient:基於 jersey 的集群複製客戶端通訊組件,它在請求頭中設置了 PeerEurekaNode.HEADER_REPLICATION 為 true
  • serviceUrl:eureka-server 地址
  • maxProcessingDelayMs:最大處理延遲毫秒數,默認為30000毫秒,即30秒,在下線的時候有用到
  • batcherName:批處理器名稱
  • taskProcessor:複製任務處理器,它封裝了 targetHost 和 replicationClient,主要就是 ReplicationTaskProcessor 在處理批量任務的提交
  • batchingDispatcher:批量任務分發器,它會將任務打成一個批次提交到 eureka-server,避免多次請求eureka-server,註冊時就是先用這個分發器提交的任務
  • nonBatchingDispatcher:非批量任務分發器,就是一個任務一個任務的提交
 1 public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {
 2     this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);
 3 }
 4 
 5 PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
 6                                  HttpReplicationClient replicationClient, EurekaServerConfig config,
 7                                  int batchSize, long maxBatchingDelayMs,
 8                                  long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
 9     this.registry = registry;
10     // 集群節點 host
11     this.targetHost = targetHost;
12     this.replicationClient = replicationClient;
13 
14     // 集群節點地址
15     this.serviceUrl = serviceUrl;
16     this.config = config;
17     // 最大延遲時間 默認30秒
18     this.maxProcessingDelayMs = config.getMaxTimeForReplication();
19 
20     // 批處理器名稱
21     String batcherName = getBatcherName();
22 
23     // 複製任務處理器
24     ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
25     // 批量任務分發器
26     this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
27             batcherName,
28             // 複製池裡最大容量,默認 10000
29             config.getMaxElementsInPeerReplicationPool(),
30             batchSize, // 250
31             // 同步使用的最大執行緒數 默認 20
32             config.getMaxThreadsForPeerReplication(),
33             maxBatchingDelayMs, // 500
34             serverUnavailableSleepTimeMs, // 1000
35             retrySleepTimeMs, // 100
36             taskProcessor
37     );
38     // 單個任務分發器
39     this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
40             targetHost,
41             config.getMaxElementsInStatusReplicationPool(),
42             config.getMaxThreadsForStatusReplication(),
43             maxBatchingDelayMs,
44             serverUnavailableSleepTimeMs,
45             retrySleepTimeMs,
46             taskProcessor
47     );
48 }

2、批量分發器 TaskDispatcher

創建 batchingDispatcher 時調用了 TaskDispatchers.createBatchingTaskDispatcher 方法創建了 batchingDispatcher。

首先看下 createBatchingTaskDispatcher 的參數及默認值,後面分析程式碼的時候會用到這些參數:

  • id:批量分發器的名稱
  • maxBufferSize:快取池最大數量,默認 10000
  • workloadSize:工作負載數量,即一個批次最多多少任務,默認 250
  • workerCount:工作者數量,這個是執行緒池執行緒工作執行緒的數量,默認20
  • maxBatchingDelay:批量任務最大延遲毫秒數,默認為 500 毫秒
  • congestionRetryDelayMs:阻塞重試延遲毫秒數,默認為 1000 毫秒
  • networkFailureRetryMs:網路失敗重試延遲毫秒數,默認為 100 毫秒
  • taskProcessor:任務處理器,即 ReplicationTaskProcessor

再看下這個方法:

  • 首先創建了一個接收者執行器 AcceptorExecutor,主要的參數是快取、時間相關的
  • 再創建了一個任務處理器 TaskExecutors,主要的參數是工作執行緒數、任務處理器以及接收者執行器,可以猜測這應該就是最終執行批量任務提交的執行器
  • 最後創建了任務分發器 TaskDispatcher,從它的 process 方法可以看出,分發器提交的任務實際上又提交給了 AcceptorExecutor

從這裡可以知道,前面註冊時 batchingDispatcher.process() 提交的任務其實就是分發到 acceptorExecutor 這個接收者執行器了。創建的這個分發器 TaskDispatcher 主要有接收者執行器 AcceptorExecutor 和 任務處理器 TaskExecutors 這兩個組件,核心的分發功能就在這兩個組件中。

 1 public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize,
 2                                                                          int workerCount, long maxBatchingDelay, long congestionRetryDelayMs,
 3                                                                          long networkFailureRetryMs, TaskProcessor<T> taskProcessor) {
 4     // 接收者執行器 AcceptorExecutor
 5     final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
 6             id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
 7     );
 8 
 9     // 任務處理器 TaskExecutors, workerCount = 20
10     final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
11 
12     return new TaskDispatcher<ID, T>() {
13         @Override
14         public void process(ID id, T task, long expiryTime) {
15             // 任務由 acceptorExecutor 處理
16             acceptorExecutor.process(id, task, expiryTime);
17         }
18 
19         @Override
20         public void shutdown() {
21             acceptorExecutor.shutdown();
22             taskExecutor.shutdown();
23         }
24     };
25 }

3、接收者執行器 AcceptorExecutor

先看下創建 AcceptorExecutor 的構造方法:

  • 根據 congestionRetryDelayMs、networkFailureRetryMs 創建了一個時間調整器 TrafficShaper,應該主要就是用來調整補償時間的
  • 然後創建了一個後台執行緒 acceptorThread,它運行的任務是 AcceptorRunner,主要就是將任務轉成批量任務的
  • 最後就是註冊了一些監控統計之類的
 1 AcceptorExecutor(String id,
 2                  int maxBufferSize,
 3                  int maxBatchingSize,
 4                  long maxBatchingDelay,
 5                  long congestionRetryDelayMs,
 6                  long networkFailureRetryMs) {
 7     // 批處理器名稱
 8     this.id = id;
 9     // 最大緩衝數:10000
10     this.maxBufferSize = maxBufferSize;
11     // 每批最大數量:250
12     this.maxBatchingSize = maxBatchingSize;
13     // 最大延遲時間:500 ms
14     this.maxBatchingDelay = maxBatchingDelay;
15     // 時間調整器
16     // congestionRetryDelayMs 阻塞重試延遲時間,1000ms
17     // networkFailureRetryMs 網路異常重試時間,100ms
18     this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
19 
20     // 接收者後台處理執行緒
21     ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
22     this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
23     this.acceptorThread.setDaemon(true);
24     this.acceptorThread.start();
25 
26     // 監控統計相關
27     final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
28     final StatsConfig statsConfig = new StatsConfig.Builder()
29             .withSampleSize(1000)
30             .withPercentiles(percentiles)
31             .withPublishStdDev(true)
32             .build();
33     final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
34     this.batchSizeMetric = new StatsTimer(config, statsConfig);
35     try {
36         Monitors.registerObject(id, this);
37     } catch (Throwable e) {
38         logger.warn("Cannot register servo monitor for this object", e);
39     }
40 }

然後看看 AcceptorExecutor 的屬性,它定義了幾個隊列以及容器來處理批量任務,我們先知道有這些東西,後面再來看看都怎麼使用的。

然後可以看到 AcceptorExecutor 大量使用了並發包下的一些類,以及隊列的特性,這裡我們需要了解下這些類的特性:

  • LinkedBlockingQueue:基於鏈表的單端阻塞隊列,就是隊尾入隊,隊首出隊
  • Deque:雙端隊列,就是隊首、隊尾都可以入隊、出隊
  • Semaphore:訊號量,需要通過 acquire(或 tryAcquire) 獲取到許可證之後才可以進入臨界區,通過 release 釋放許可證。只要能拿到許可證,Semaphore 是可以允許多個執行緒進入臨界區的。另外注意它這裡設置的許可證數量是0,說明要先調用了 release 放入一個許可證,才有可能調用 acquire 獲取到許可證。
 1 // 接收任務的隊列
 2 private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
 3 // 重試任務的隊列
 4 private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
 5 // 後台接收者執行緒
 6 private final Thread acceptorThread;
 7 // 待處理任務容器
 8 private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
 9 // 處理中的隊列
10 private final Deque<ID> processingOrder = new LinkedList<>();
11 
12 // 單項隊列請求的訊號量
13 private final Semaphore singleItemWorkRequests = new Semaphore(0);
14 // 單項任務隊列
15 private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
16 
17 // 批量隊列請求的訊號量
18 private final Semaphore batchWorkRequests = new Semaphore(0);
19 // 批量任務隊列
20 private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
21 // 時間調整器
22 private final TrafficShaper trafficShaper;

TaskDispatcher 調用 acceptorExecutor.process 將任務轉給 AcceptorExecutor,可以看到就是將任務添加到接收者隊列 acceptorQueue 的隊尾了。

1 void process(ID id, T task, long expiryTime) {
2     acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
3     acceptedTasks++;
4 }

4、接收者任務 AcceptorRunner

任務添加到 acceptorQueue 了,那任務在哪處理的呢?這就是在 AcceptorRunner 這個任務里去處理的了,這個任務比較複雜,我先把整個程式碼放出來,再來分析。

  1 class AcceptorRunner implements Runnable {
  2     @Override
  3     public void run() {
  4         long scheduleTime = 0;
  5         while (!isShutdown.get()) {
  6             try {
  7                 // 排出輸入隊列的任務:將 reprocessQueue、acceptorQueue 隊列的任務轉移到 pendingTasks
  8                 drainInputQueues();
  9 
 10                 // 待處理的數量
 11                 int totalItems = processingOrder.size();
 12 
 13                 long now = System.currentTimeMillis();
 14                 if (scheduleTime < now) {
 15                     // 時間補償,正常情況下 transmissionDelay() 返回 0
 16                     scheduleTime = now + trafficShaper.transmissionDelay();
 17                 }
 18                 if (scheduleTime <= now) {
 19                     // 分配批量工作任務:將 pendingTasks 的任務分一批到(最多250個) batchWorkQueue 隊列中
 20                     assignBatchWork();
 21                     // 分配單項工作任務:pendingTasks 如果還有剩餘任務,將沒有過期的轉移到 singleItemWorkQueue 隊列中
 22                     assignSingleItemWork();
 23                 }
 24 
 25                 // If no worker is requesting data or there is a delay injected by the traffic shaper,
 26                 // sleep for some time to avoid tight loop.
 27                 if (totalItems == processingOrder.size()) {
 28                     Thread.sleep(10);
 29                 }
 30             } catch (InterruptedException ex) {
 31                 // Ignore
 32             } catch (Throwable e) {
 33                 // Safe-guard, so we never exit this loop in an uncontrolled way.
 34                 logger.warn("Discovery AcceptorThread error", e);
 35             }
 36         }
 37     }
 38 
 39     private boolean isFull() {
 40         // 待處理的任務 >= 10000,也就是說 pendingTasks 最多放 10000 個任務
 41         return pendingTasks.size() >= maxBufferSize;
 42     }
 43 
 44     private void drainInputQueues() throws InterruptedException {
 45         do {
 46             // 排出 reprocessQueue,將 reprocessQueue 隊列的任務轉移到 pendingTasks
 47             drainReprocessQueue();
 48             // 排出 acceptorQueue,將 acceptorQueue 隊列的任務轉移到 pendingTasks
 49             drainAcceptorQueue();
 50 
 51             if (isShutdown.get()) {
 52                 break;
 53             }
 54             // If all queues are empty, block for a while on the acceptor queue
 55             if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
 56                 // 等待任務放入 acceptorQueue,等待 10 毫秒
 57                 TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
 58                 if (taskHolder != null) {
 59                     // 放入之後 acceptorQueue、pendingTasks 就不為空了
 60                     appendTaskHolder(taskHolder);
 61                 }
 62             }
 63             // pendingTasks 為空、acceptorQueue 不為空、reprocessQueue不為空時,就會一直循環
 64             // 如果所有任務都處理完了,reprocessQueue、acceptorQueue、pendingTasks 都是空的,
 65             // 這時就會循環等待任務進入 acceptorQueue,每次等待 10 毫秒
 66         } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
 67     }
 68 
 69     private void drainAcceptorQueue() {
 70         while (!acceptorQueue.isEmpty()) {
 71             // 將 acceptorQueue 的任務轉移到 pendingTasks
 72             appendTaskHolder(acceptorQueue.poll());
 73         }
 74     }
 75 
 76     private void drainReprocessQueue() {
 77         long now = System.currentTimeMillis();
 78         while (!reprocessQueue.isEmpty() && !isFull()) {
 79             // 從 reprocessQueue 隊尾取出任務
 80             TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
 81             ID id = taskHolder.getId();
 82             if (taskHolder.getExpiryTime() <= now) {
 83                 // 任務過期
 84                 expiredTasks++;
 85             } else if (pendingTasks.containsKey(id)) {
 86                 // pendingTasks 已存在
 87                 overriddenTasks++;
 88             } else {
 89                 // 將 reprocessQueue 隊列的任務放到 pendingTasks
 90                 pendingTasks.put(id, taskHolder);
 91                 // 添加到 processingOrder 隊列的頭部,reprocessQueue 是失敗重試的隊列,所以優先順序高一些
 92                 processingOrder.addFirst(id);
 93             }
 94         }
 95         if (isFull()) {
 96             queueOverflows += reprocessQueue.size();
 97             // pendingTasks 滿了,就清空 reprocessQueue
 98             reprocessQueue.clear();
 99         }
100     }
101 
102     private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
103         if (isFull()) {
104             // pendingTasks 滿了就移除一個元素
105             pendingTasks.remove(processingOrder.poll());
106             queueOverflows++;
107         }
108         // 將 acceptorQueue 里的任務放到 pendingTasks
109         TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
110         if (previousTask == null) {
111             // 原本不存在,將任務ID添加到 processingOrder 隊列的最後
112             processingOrder.add(taskHolder.getId());
113         } else {
114             // 已經存在了,就是覆蓋
115             overriddenTasks++;
116         }
117     }
118 
119     void assignSingleItemWork() {
120         if (!processingOrder.isEmpty()) {
121             if (singleItemWorkRequests.tryAcquire(1)) {
122                 long now = System.currentTimeMillis();
123                 while (!processingOrder.isEmpty()) {
124                     ID id = processingOrder.poll();
125                     TaskHolder<ID, T> holder = pendingTasks.remove(id);
126                     if (holder.getExpiryTime() > now) {
127                         // 將 pendingTasks 的任務移到 singleItemWorkQueue
128                         singleItemWorkQueue.add(holder);
129                         return;
130                     }
131                     expiredTasks++;
132                 }
133                 singleItemWorkRequests.release();
134             }
135         }
136     }
137 
138     void assignBatchWork() {
139         // 有足夠的任務做一個批處理
140         if (hasEnoughTasksForNextBatch()) {
141             if (batchWorkRequests.tryAcquire(1)) {
142                 long now = System.currentTimeMillis();
143                 // 一批任務最多 250 個
144                 int len = Math.min(maxBatchingSize, processingOrder.size());
145                 List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
146                 // 將 pendingTasks 中的任務移動一批到 holders 中
147                 // 也就是說,如果隊列中有500個任務,這一批任務最多也是250個
148                 while (holders.size() < len && !processingOrder.isEmpty()) {
149                     ID id = processingOrder.poll();
150                     TaskHolder<ID, T> holder = pendingTasks.remove(id);
151                     if (holder.getExpiryTime() > now) {
152                         holders.add(holder);
153                     } else {
154                         expiredTasks++;
155                     }
156                 }
157                 if (holders.isEmpty()) {
158                     batchWorkRequests.release();
159                 } else {
160                     batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
161                     // 添加到批量隊列中
162                     batchWorkQueue.add(holders);
163                 }
164             }
165         }
166     }
167 
168     // 是否有足夠的任務做一個批處理
169     private boolean hasEnoughTasksForNextBatch() {
170         if (processingOrder.isEmpty()) {
171             return false;
172         }
173         if (pendingTasks.size() >= maxBufferSize) {
174             return true;
175         }
176 
177         // 從 processingOrder 隊首取一個任務ID,然後從 pendingTasks 讀取這個任務。注意 peek() 只是取出元素,並不會移除隊首的元素
178         TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
179         // 判斷任務提交到現在的時間差是否超過最大批任務延遲時間(500毫秒)
180         long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
181         return delay >= maxBatchingDelay;
182     }
183 }

View Code

先看它的 run 方法:

① 隊列中的任務轉移到待處理容器中

drainInputQueues 將輸入隊列(reprocessQueue、acceptorQueue)的任務轉移到 pendingTasks 這個待處理容器中。

先是 drainReprocessQueue 將重處理隊列 reprocessQueue 中的任務轉移到 pendingTasks:

  • 如果 pendingTasks 已滿(超過10000),就直接清空了 reprocessQueue。任務丟棄會不會有影響呢?
  • 否則,如果 reprocessQueue 非空,就從 reprocessQueue 隊尾一個個取出來:
    • 如果過期了就丟掉這個任務,說明已經超過續約周期了(90秒)。比如實例註冊,如果多次同步失敗後,然後就直接丟棄,那不是其它 server 永遠無法知道註冊的這個實例?後面再分析這個問題。
    • 如果 pendingTasks 已經存在了,也丟棄這個重試任務
    • 否則就添加到 pendingTasks 中,並且往 processingOrder 的頭部添加了任務ID
    • 注意它這裡是從 reprocessQueue 隊尾一個個取出,放入 processingOrder 頭部,最終任務在 processingOrder 中的順序跟 reprocessQueue 是一樣的

然後是 drainAcceptorQueue 將接收者隊列 acceptorQueue 中的任務轉移到 pendingTasks:

  • 只要 acceptorQueue 非空,就從隊首取出任務
  • 如果 pendingTasks 已滿,則從 processingOrder 隊首取出第一個任務的ID,並從 pendingTasks 中移除這個任務
  • 否則就將任務添加到 pendingTasks,如果之前不存在相同ID的任務,就將任務ID添加到 processingOrder 隊尾
  • 注意它這裡是從 acceptorQueue 隊首取出任務,放到 processingOrder 隊尾,最終任務在 processingOrder 中的順序跟 acceptorQueue 是一樣的

從這段任務轉移以及後面的使用來看,processingOrder 將決定任務的處理順序,最前面的將最先處理,也說明了 reprocessQueue 的優先順序比 acceptorQueue 更高。而 pendingTasks 是一個 key-value 的隊列,便於快速通過ID讀取任務。

 1 private void drainAcceptorQueue() {
 2     while (!acceptorQueue.isEmpty()) {
 3         // 將 acceptorQueue 的任務轉移到 pendingTasks
 4         appendTaskHolder(acceptorQueue.poll());
 5     }
 6 }
 7 
 8 private void drainReprocessQueue() {
 9     long now = System.currentTimeMillis();
10     while (!reprocessQueue.isEmpty() && !isFull()) {
11         // 從 reprocessQueue 隊尾取出任務
12         TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast();
13         ID id = taskHolder.getId();
14         if (taskHolder.getExpiryTime() <= now) {
15             // 任務過期
16             expiredTasks++;
17         } else if (pendingTasks.containsKey(id)) {
18             // pendingTasks 已存在
19             overriddenTasks++;
20         } else {
21             // 將 reprocessQueue 隊列的任務放到 pendingTasks
22             pendingTasks.put(id, taskHolder);
23             // 添加到 processingOrder 隊列的頭部,reprocessQueue 是失敗重試的隊列,所以優先順序高一些
24             processingOrder.addFirst(id);
25         }
26     }
27     if (isFull()) {
28         queueOverflows += reprocessQueue.size();
29         // pendingTasks 滿了,就清空 reprocessQueue
30         reprocessQueue.clear();
31     }
32 }
33 
34 private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
35     if (isFull()) {
36         // pendingTasks 滿了就移除一個元素
37         pendingTasks.remove(processingOrder.poll());
38         queueOverflows++;
39     }
40     // 將 acceptorQueue 里的任務放到 pendingTasks
41     TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
42     if (previousTask == null) {
43         // 原本不存在,將任務ID添加到 processingOrder 隊列的最後
44         processingOrder.add(taskHolder.getId());
45     } else {
46         // 已經存在了,就是覆蓋
47         overriddenTasks++;
48     }
49 }

② 接下來通過 trafficShaper 獲取了一個補償時間,它主要是在發生阻塞或網路異常導致任務提交失敗後,在任務調度周期內做一個時間補償,這塊等分析到提交任務失敗的時候再回來看看。

1 long now = System.currentTimeMillis();
2 if (scheduleTime < now) {
3     // 時間補償,正常情況下 transmissionDelay() 返回 0
4     scheduleTime = now + trafficShaper.transmissionDelay();
5 }

③ 任務打包

接著看 assignBatchWork ,它就是將任務打包成一個批次:

  • 首先調用 hasEnoughTasksForNextBatch 判斷是否有足夠的任務來打成一個批次,注意它判斷了最新提交的任務的時間是否超過了延遲時間 maxBatchingDelay(500ms),也就是說批次任務每隔500毫秒運行一次。
  • 能夠打包後,要獲取 batchWorkRequests 訊號量的一個許可證,因為許可證默認數量是 0,那一定是先有地方調用了 batchWorkRequests.release() 放入許可證,否則這裡就不會打包了。
  • 然後可以看出,一個批次的任務數量最多是250個
  • 它從 processingOrder 的隊首取出這個批次的任務ID,並從 pendingTasks 中取出任務,如果是過期的任務就直接丟棄了。
  • 然後如果這個批次並沒有任務,他才調用 batchWorkRequests.release() 釋放了許可證,否則就把這個批次任務添加到批量工作隊列 batchWorkQueue 中,注意並沒有釋放許可證。
 1 void assignBatchWork() {
 2     // 有足夠的任務做一個批處理
 3     if (hasEnoughTasksForNextBatch()) {
 4         // 獲取許可證
 5         if (batchWorkRequests.tryAcquire(1)) {
 6             long now = System.currentTimeMillis();
 7             // 一批任務最多 250 個
 8             int len = Math.min(maxBatchingSize, processingOrder.size());
 9             List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
10             // 將 pendingTasks 中的任務移動一批到 holders 中
11             // 也就是說,如果隊列中有500個任務,這一批任務最多也是250個
12             while (holders.size() < len && !processingOrder.isEmpty()) {
13                 ID id = processingOrder.poll();
14                 TaskHolder<ID, T> holder = pendingTasks.remove(id);
15                 if (holder.getExpiryTime() > now) {
16                     holders.add(holder);
17                 } else {
18                     expiredTasks++;
19                 }
20             }
21             if (holders.isEmpty()) {
22                 batchWorkRequests.release();
23             } else {
24                 batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
25                 // 添加到批量隊列中
26                 batchWorkQueue.add(holders);
27             }
28         }
29     }
30 }
31 
32 // 是否有足夠的任務做一個批處理
33 private boolean hasEnoughTasksForNextBatch() {
34     if (processingOrder.isEmpty()) {
35         return false;
36     }
37     if (pendingTasks.size() >= maxBufferSize) {
38         return true;
39     }
40 
41     // 從 processingOrder 隊首取一個任務ID,然後從 pendingTasks 讀取這個任務。注意 peek() 只是取出元素,並不會移除隊首的元素
42     TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
43     // 判斷任務提交到現在的時間差是否超過最大批任務延遲時間(500毫秒)
44     long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
45     return delay >= maxBatchingDelay;
46 }

接著看分配單項任務的方法 assignSingleItemWork:

  • 如果 processingOrder 非空且獲取到了 singleItemWorkRequests 訊號量的許可證,就將 processingOrder 隊列剩餘的任務都取出來,放入單項工作隊列 singleItemWorkQueue 中
  • 也就是前面已經打了一批任務(250個)之後,processingOrder 中還有任務,就全部取出來放到 singleItemWorkQueue 隊列中
 1 void assignSingleItemWork() {
 2     if (!processingOrder.isEmpty()) {
 3         if (singleItemWorkRequests.tryAcquire(1)) {
 4             long now = System.currentTimeMillis();
 5             while (!processingOrder.isEmpty()) {
 6                 ID id = processingOrder.poll();
 7                 TaskHolder<ID, T> holder = pendingTasks.remove(id);
 8                 if (holder.getExpiryTime() > now) {
 9                     // 將 pendingTasks 的任務移到 singleItemWorkQueue
10                     singleItemWorkQueue.add(holder);
11                     return;
12                 }
13                 expiredTasks++;
14             }
15             singleItemWorkRequests.release();
16         }
17     }
18 }

5、任務處理器 TaskExecutors

batchWorkQueue 中的批量任務以及 singleItemWorkQueue 中的單項任務都已經準備好了,那是在哪裡發送到集群節點的呢,那就是任務執行器 TaskExecutors 了。

① 創建 TaskExecutors

從創建 TaskExecutors 的方法中可以看出:

  • 批量處理任務的類是 BatchWorkerRunnable,它主要就是處理批量任務隊列 batchWorkQueue 中的任務
  • 處理單項任務的類是 SingleTaskWorkerRunnable,它主要就是處理單項任務隊列 singleItemWorkQueue 中的任務
  • TaskExecutors 創建了一個執行緒池,batchExecutors 默認有20個工作執行緒(不太理解他為什麼不用JDK現成的執行緒池。。),singleItemExecutors 默認只有一個工作執行緒。
 1 static <ID, T> TaskExecutors<ID, T> singleItemExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) {
 2     final AtomicBoolean isShutdown = new AtomicBoolean();
 3     final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
 4     registeredMonitors.put(name, metrics);
 5     // workerCount = 1
 6     return new TaskExecutors<>(idx -> new SingleTaskWorkerRunnable<>("TaskNonBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
 7 }
 8 
 9 ////////////////////////////////////////////////
10 
11 static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) {
12     final AtomicBoolean isShutdown = new AtomicBoolean();
13     final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name);
14     registeredMonitors.put(name, metrics);
15     // BatchWorkerRunnable 批量任務處理
16     return new TaskExecutors<>(idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown);
17 }
18 
19 ////////////////////////////////////////////////
20 
21 private final List<Thread> workerThreads;
22 
23 TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
24     this.isShutdown = isShutdown;
25     // 工作執行緒集合
26     this.workerThreads = new ArrayList<>();
27 
28     // 創建20個執行緒,相當於是搞了一個執行緒池
29     ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
30     for (int i = 0; i < workerCount; i++) {
31         WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
32         Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
33         workerThreads.add(workerThread);
34         workerThread.setDaemon(true);
35         workerThread.start();
36     }
37 }

② BatchWorkerRunnable

看批量處理的任務:

  • 首先 getWork 獲取批量任務,它調用 taskDispatcher.requestWorkItems(),實際就是返回了 taskDispatcher 的 batchWorkQueue,並且調用 batchWorkRequests.release() 往訊號量放入一個許可證,這樣前面 AcceptorRunner 就可以得到許可證然後去打包批量任務了
  • 如果 batchWorkQueue 中沒有批量任務,可以看到是一直在 while 循環等待的,直到拿到一個批量任務。它這個 BatchWorkerRunnable 任務和前面的 AcceptorRunner 任務,感覺通過訊號量的方式就形成了一個等待通知的機制,BatchWorkerRunnable 放入一個許可證,讓 AcceptorRunner 拿到這個許可證去打個批次的任務過來。
  • 拿到這個批次任務後,就調用 processor(ReplicationTaskProcessor)來處理任務。
  • 如果任務處理結果是 Congestion(阻塞)、TransientError(傳輸失敗)就要重處理,調用了 taskDispatcher.reprocess 將這個批次的任務提交到重處理隊列 reprocessQueue 中。
 1 static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
 2 
 3     BatchWorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) {
 4         super(workerName, isShutdown, metrics, processor, acceptorExecutor);
 5     }
 6 
 7     @Override
 8     public void run() {
 9         try {
10             while (!isShutdown.get()) {
11                 // 獲取一個批量任務
12                 List<TaskHolder<ID, T>> holders = getWork();
13                 metrics.registerExpiryTimes(holders);
14                 // TaskHolder 提取 ReplicationTask
15                 List<T> tasks = getTasksOf(holders);
16                 // processor => 任務複製處理器 ReplicationTaskProcessor
17                 ProcessingResult result = processor.process(tasks);
18                 switch (result) {
19                     case Success:
20                         break;
21                     case Congestion:
22                     case TransientError:
23                         // 阻塞或網路失敗就重新處理這批任務
24                         taskDispatcher.reprocess(holders, result);
25                         break;
26                     case PermanentError:
27                         logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
28                 }
29                 metrics.registerTaskResult(result, tasks.size());
30             }
31         } catch (InterruptedException e) {
32             // Ignore
33         } catch (Throwable e) {
34             // Safe-guard, so we never exit this loop in an uncontrolled way.
35             logger.warn("Discovery WorkerThread error", e);
36         }
37     }
38 
39     private List<TaskHolder<ID, T>> getWork() throws InterruptedException {
40         // 獲取批量隊列 batchWorkQueue
41         BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems();
42         List<TaskHolder<ID, T>> result;
43         do {
44             result = workQueue.poll(1, TimeUnit.SECONDS);
45             // 循環等待,直到取到一個批量任務
46         } while (!isShutdown.get() && result == null);
47         return (result == null) ? new ArrayList<>() : result;
48     }
49 
50     private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) {
51         List<T> tasks = new ArrayList<>(holders.size());
52         for (TaskHolder<ID, T> holder : holders) {
53             tasks.add(holder.getTask());
54         }
55         return tasks;
56     }
57 }
1 BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
2     singleItemWorkRequests.release();
3     return singleItemWorkQueue;
4 }
5 
6 BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
7     batchWorkRequests.release();
8     return batchWorkQueue;
9 }

③ 任務重處理

可以看到處理失敗後,就是將這批任務添加到重處理隊列 reprocessQueue 中去,然後向時間調整期註冊失敗,這就和前面 AcceptorRunner 處理 reprocessQueue 對應起來了。

1 void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
2     // 添加到重處理隊列 reprocessQueue
3     reprocessQueue.addAll(holders);
4     replayedTasks += holders.size();
5     // 時間調整器註冊失敗
6     trafficShaper.registerFailure(processingResult);
7 }

④ TrafficShaper 

還記得前面 AcceptorRunner 中又這樣一段程式碼,可以看到是通過 trafficShaper 計算了一個延遲時間,這裡就來看看是如何計算的。

 1 long now = System.currentTimeMillis();
 2 if (scheduleTime < now) {
 3     // 時間補償,正常情況下 transmissionDelay() 返回 0
 4     scheduleTime = now + trafficShaper.transmissionDelay();
 5 }
 6 if (scheduleTime <= now) {
 7     // 分配批量工作任務:將 pendingTasks 的任務分一批到(最多250個) batchWorkQueue 隊列中
 8     assignBatchWork();
 9     // 分配單項工作任務:pendingTasks 如果還有剩餘任務,將沒有過期的轉移到 singleItemWorkQueue 隊列中
10     assignSingleItemWork();
11 }

時間調整器 TrafficShaper:

  • registerFailure 就是設置了失敗的最後時間
  • 然後看 transmissionDelay,以阻塞為例,如果上一次阻塞失敗到現在 500 毫秒,那麼 transmissionDelay 返回 500,那麼 transmissionDelay 就大於 now 了,就不會打包任務了。
  • 總結下來就是如果上一次阻塞導致批量任務提交失敗,就延遲1000毫秒後執行。如果上一次網路導致批量任務提交失敗,就延遲100毫秒執行。
 1 TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) {
 2     // 1000
 3     this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs);
 4     // 100
 5     this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs);
 6 }
 7 
 8 void registerFailure(ProcessingResult processingResult) {
 9     if (processingResult == ProcessingResult.Congestion) {
10         // 最後一次阻塞導致提交批處理失敗的時間
11         lastCongestionError = System.currentTimeMillis();
12     } else if (processingResult == ProcessingResult.TransientError) {
13         // 最後一次網路原因導致提交批處理失敗的時間
14         lastNetworkFailure = System.currentTimeMillis();
15     }
16 }
17 
18 // 計算傳輸延遲的時間
19 long transmissionDelay() {
20     if (lastCongestionError == -1 && lastNetworkFailure == -1) {
21         return 0;
22     }
23 
24     long now = System.currentTimeMillis();
25     if (lastCongestionError != -1) {
26         // 阻塞延遲時間
27         long congestionDelay = now - lastCongestionError;
28         if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) {
29             return congestionRetryDelayMs - congestionDelay;
30         }
31         lastCongestionError = -1;
32     }
33 
34     if (lastNetworkFailure != -1) {
35         // 網路延遲時間
36         long failureDelay = now - lastNetworkFailure;
37         if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) {
38             return networkFailureRetryMs - failureDelay;
39         }
40         lastNetworkFailure = -1;
41     }
42     return 0;
43 }

⑤ SingleTaskWorkerRunnable

單項任務處理跟批量任務處理的流程是類似的,只不過是一個個的發送同步操作,處理失敗同樣也會放入重處理隊列中。

一個批量任務250個對於大部分場景來說其實不會觸發單項任務的處理,如果微服務集群中有很多的實例,eureka 通過不斷的輪詢也能盡量使用批量處理,我覺得單項任務處理更像是對批量任務處理的一種補充。

6、複製任務處理器 ReplicationTaskProcessor

批量任務最終是提交到 ReplicationTaskProcessor 去處理的,可以看到,就是調用了 replicationClient 提交了批量任務,提交的介面是 POST peerreplication/batch,那我們就可以從這個入口去看 eureka-server 如何接收批量任務的。

 1 public ProcessingResult process(List<ReplicationTask> tasks) {
 2     // 任務封裝到 ReplicationList
 3     ReplicationList list = createReplicationListOf(tasks);
 4     try {
 5         // 提交批量任務:POST peerreplication/batch/
 6         EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
 7         int statusCode = response.getStatusCode();
 8         if (!isSuccess(statusCode)) {
 9             if (statusCode == 503) {
10                 logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
11                 return ProcessingResult.Congestion;
12             } else {
13                 // Unexpected error returned from the server. This should ideally never happen.
14                 logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
15                 return ProcessingResult.PermanentError;
16             }
17         } else {
18             // 處理批量任務結果
19             handleBatchResponse(tasks, response.getEntity().getResponseList());
20         }
21     } catch (Throwable e) {
22         if (maybeReadTimeOut(e)) {
23             //read timeout exception is more Congestion then TransientError, return Congestion for longer delay
24             return ProcessingResult.Congestion;
25         } else if (isNetworkConnectException(e)) {
26             logNetworkErrorSample(null, e);
27             return ProcessingResult.TransientError;
28         } else {
29             logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
30             return ProcessingResult.PermanentError;
31         }
32     }
33     return ProcessingResult.Success;
34 }

7、接收複製同步請求

很容易找到批量任務提交的介面在 PeerReplicationResource 的 batchReplication 方法中。

可以看到,其實遍歷批量任務,然後根據不同的操作類型,調用 XxxResource 介面進行對應的操作。比如註冊,就是調用 applicationResource.addInstance 完成實例的註冊。

  1 @Path("/{version}/peerreplication")
  2 @Produces({"application/xml", "application/json"})
  3 public class PeerReplicationResource {
  4 
  5     private static final Logger logger = LoggerFactory.getLogger(PeerReplicationResource.class);
  6 
  7     private static final String REPLICATION = "true";
  8 
  9     private final EurekaServerConfig serverConfig;
 10     private final PeerAwareInstanceRegistry registry;
 11 
 12     @Inject
 13     PeerReplicationResource(EurekaServerContext server) {
 14         this.serverConfig = server.getServerConfig();
 15         this.registry = server.getRegistry();
 16     }
 17 
 18     public PeerReplicationResource() {
 19         this(EurekaServerContextHolder.getInstance().getServerContext());
 20     }
 21 
 22     @Path("batch")
 23     @POST
 24     public Response batchReplication(ReplicationList replicationList) {
 25         try {
 26             ReplicationListResponse batchResponse = new ReplicationListResponse();
 27             for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
 28                 try {
 29                     // dispatch 分發任務
 30                     batchResponse.addResponse(dispatch(instanceInfo));
 31                 } catch (Exception e) {
 32                     batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
 33                     logger.error("{} request processing failed for batch item {}/{}",
 34                             instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
 35                 }
 36             }
 37             return Response.ok(batchResponse).build();
 38         } catch (Throwable e) {
 39             logger.error("Cannot execute batch Request", e);
 40             return Response.status(Status.INTERNAL_SERVER_ERROR).build();
 41         }
 42     }
 43 
 44     private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
 45         ApplicationResource applicationResource = createApplicationResource(instanceInfo);
 46         InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
 47 
 48         String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
 49         String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
 50         String instanceStatus = toString(instanceInfo.getStatus());
 51 
 52         Builder singleResponseBuilder = new Builder();
 53         // 根據不同的類型分別處理
 54         switch (instanceInfo.getAction()) {
 55             case Register:
 56                 singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
 57                 break;
 58             case Heartbeat:
 59                 singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
 60                 break;
 61             case Cancel:
 62                 singleResponseBuilder = handleCancel(resource);
 63                 break;
 64             case StatusUpdate:
 65                 singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
 66                 break;
 67             case DeleteStatusOverride:
 68                 singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
 69                 break;
 70         }
 71         return singleResponseBuilder.build();
 72     }
 73 
 74     /* Visible for testing */ ApplicationResource createApplicationResource(ReplicationInstance instanceInfo) {
 75         return new ApplicationResource(instanceInfo.getAppName(), serverConfig, registry);
 76     }
 77 
 78     /* Visible for testing */ InstanceResource createInstanceResource(ReplicationInstance instanceInfo,
 79                                                                       ApplicationResource applicationResource) {
 80         return new InstanceResource(applicationResource, instanceInfo.getId(), serverConfig, registry);
 81     }
 82 
 83     private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
 84         // addInstance
 85         applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
 86         return new Builder().setStatusCode(Status.OK.getStatusCode());
 87     }
 88 
 89     private static Builder handleCancel(InstanceResource resource) {
 90         // cancelLease
 91         Response response = resource.cancelLease(REPLICATION);
 92         return new Builder().setStatusCode(response.getStatus());
 93     }
 94 
 95     private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
 96         Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
 97         int responseStatus = response.getStatus();
 98         Builder responseBuilder = new Builder().setStatusCode(responseStatus);
 99 
100         if ("false".equals(config.getExperimental("bugfix.934"))) {
101             if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
102                 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
103             }
104         } else {
105             if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
106                     && response.getEntity() != null) {
107                 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
108             }
109         }
110         return responseBuilder;
111     }
112 
113     private static Builder handleStatusUpdate(ReplicationInstance instanceInfo, InstanceResource resource) {
114         Response response = resource.statusUpdate(instanceInfo.getStatus(), REPLICATION, toString(instanceInfo.getLastDirtyTimestamp()));
115         return new Builder().setStatusCode(response.getStatus());
116     }
117 
118     private static Builder handleDeleteStatusOverride(ReplicationInstance instanceInfo, InstanceResource resource) {
119         Response response = resource.deleteStatusUpdate(REPLICATION, instanceInfo.getStatus(),
120                 instanceInfo.getLastDirtyTimestamp().toString());
121         return new Builder().setStatusCode(response.getStatus());
122     }
123 
124     private static <T> String toString(T value) {
125         if (value == null) {
126             return null;
127         }
128         return value.toString();
129     }
130 }

View Code

8、集群數據同步衝突問題

Peer to Peer 模式重點要解決的一個問題是數據複製衝突的問題,因為 peer 節點間的相互複製並不能保證所有操作都成功。eureka 主要通過 lastDirtyTimestamp 標識和心跳來進行數據的最終修復,下面就來看下 eureka 如何處理數據衝突問題的。

① 先看續約的這個方法

  • 在續約 renewLease 里,如果 lastDirtyTimestamp 不為空且允許時間戳不一致時進行同步(默認開啟),就調用了 validateDirtyTimestamp 方法校驗 lastDirtyTimestamp。
  • 接著看 validateDirtyTimestamp,如果 lastDirtyTimestamp 與本地實例的 lastDirtyTimestamp 一致,說明數據是一致的,就續約成功,返回 OK(200)。
  • 如果 lastDirtyTimestamp 大於 本地實例的 lastDirtyTimestamp,說明複製的實例最新更新的,出現數據衝突,返回 NOT_FOUND(404)。
  • 如果 lastDirtyTimestamp 小於 本地實例的 lastDirtyTimestamp ,說明複製的實例是舊的,出現數據衝突,返回 CONFLICT(409),並且返回了本地的實例。
 1 @PUT
 2 public Response renewLease(
 3         @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
 4         @QueryParam("overriddenstatus") String overriddenStatus,
 5         @QueryParam("status") String status,
 6         @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
 7     boolean isFromReplicaNode = "true".equals(isReplication);
 8     // 調用註冊表的 renew 進行服務續約
 9     boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
10 
11     // Not found in the registry, immediately ask for a register
12     if (!isSuccess) {
13         logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
14         return Response.status(Status.NOT_FOUND).build();
15     }
16     // Check if we need to sync based on dirty time stamp, the client
17     // instance might have changed some value
18     Response response;
19     // 如果是複製操作,就校驗 lastDirtyTimestamp
20     if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
21         response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
22         // Store the overridden status since the validation found out the node that replicates wins
23         if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
24                 && (overriddenStatus != null)
25                 && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
26                 && isFromReplicaNode) {
27             registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
28         }
29     } else {
30         response = Response.ok().build();
31     }
32     logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
33     return response;
34 }
35 
36 ///////////////////////////////////////////
37 
38 private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) {
39     InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
40     if (appInfo != null) {
41         // 如果複製傳過來的實例中 lastDirtyTimestamp 不等於本地實例的 lastDirtyTimestamp
42         if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
43             Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};
44 
45             if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
46                 logger.debug(
47                         "Time to sync, since the last dirty timestamp differs -"
48                                 + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
49                         args);
50                 // 如果複製實例的 lastDirtyTimestamp > 本地實例的 lastDirtyTimestamp,表示數據出現衝突,返回 404,要求應用實例重新進行 register 操作
51                 return Response.status(Status.NOT_FOUND).build();
52             } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
53                 // In the case of replication, send the current instance info in the registry for the
54                 // replicating node to sync itself with this one.
55                 if (isReplication) {
56                     logger.debug(
57                             "Time to sync, since the last dirty timestamp differs -"
58                                     + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
59                             args);
60                     // 如果本地實例的 lastDirtyTimestamp > 複製實例的 lastDirtyTimestamp,就返回 CONFLICT(409),說明數據衝突,要求其同步自己最新的數據
61                     // 注意這裡將本地實例 appInfo 放入 Response entity 中了
62                     return Response.status(Status.CONFLICT).entity(appInfo).build();
63                 } else {
64                     return Response.ok().build();
65                 }
66             }
67         }
68 
69     }
70     return Response.ok().build();
71 }

② 接著看 PeerReplicationResource 處理心跳的方法

  • 首先就是調用了續約的方法 renewLease 進行續約
  • 如果返回的狀態是 OK 或者 CONFLICT,就在 resposeEntity 中返回本地實例
 1 private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) {
 2     // 調用 renewLease 續約
 3     Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp);
 4     int responseStatus = response.getStatus();
 5     Builder responseBuilder = new Builder().setStatusCode(responseStatus);
 6 
 7     if ("false".equals(config.getExperimental("bugfix.934"))) {
 8         if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) {
 9             responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
10         }
11     } else {
12         if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode())
13                 && response.getEntity() != null) {
14             // 續約成功或 CONFLICT 衝突時,將本地實例 appInfo 返回到客戶端
15             responseBuilder.setResponseEntity((InstanceInfo) response.getEntity());
16         }
17     }
18     return responseBuilder;
19 }

③ PeerEurekaNode 發送心跳

ReplicationTaskProcessor 收到批量任務返回結果後,會處理響應結果,對於心跳任務,可以找到,失敗後就會回調 handleFailure 方法。

  • 如果返回狀態是 404(NOT_FOUND),就會重新註冊,也是提交到隊列中。通過重新註冊來實現數據同步。
  • 如果是其它狀態(409 CONFLICT)並且開啟了時間戳不一致就同步的配置,就將服務端返回的實例註冊到本地,實現數據的同步。
 1 public void heartbeat(final String appName, final String id,
 2                       final InstanceInfo info, final InstanceStatus overriddenStatus,
 3                       boolean primeConnection) throws Throwable {
 4     if (primeConnection) {
 5         // We do not care about the result for priming request.
 6         replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
 7         return;
 8     }
 9     ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
10         @Override
11         public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
12             return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
13         }
14 
15         @Override
16         public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
17             super.handleFailure(statusCode, responseEntity);
18             if (statusCode == 404) {
19                 logger.warn("{}: missing entry.", getTaskName());
20                 if (info != null) {
21                     logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
22                             getTaskName(), info.getId(), info.getStatus());
23                     // 複製返回 404 時,重新註冊
24                     register(info);
25                 }
26             } else if (config.shouldSyncWhenTimestampDiffers()) {
27                 // 409(CONFLICT)
28                 InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
29                 if (peerInstanceInfo != null) {
30                     syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
31                 }
32             }
33         }
34     };
35     long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
36     batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
37 }
38 
39 //////////////////////////////////////////
40 
41 private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
42     try {
43         if (infoFromPeer != null) {
44             if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
45                 logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus());
46                 registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
47             }
48             // 將服務端的實例註冊到本地,實現數據同步
49             registry.register(infoFromPeer, true);
50         }
51     } catch (Throwable e) {
52         logger.warn("Exception when trying to set information from peer :", e);
53     }
54 }

至此,我們就可以總結出,eureka server 通過對比 lastDirtyTimestamp 和心跳操作來實現集群數據的複製和最終同步。

前面提到的實例過期就丟棄任務這樣看來就沒問題,它也不保證peer節點間相互複製的所有操作都成功,eureka 採用的是最終一致性,它是通過心跳的方式實現集群數據的最終修復和同步,只是集群間可能會同步延遲。

9、集群同步總結

下面總結下 eureka-server 集群節點間的同步:

  • 首先 eureka-server 集群採用的是 Peer To Peer 的模式,即對等複製,各個 server 不分主從,每個 server 都可以接收寫請求,然後互相之間進行數據更新同步。
  • 數據同步採用了多層任務隊列+批量處理的機制:
    • eureka-server 接收到客戶端請求(註冊、下線、續約)後都會調用集群 PeerEurekaNode 進行操作的同步
    • PeerEurekaNode  將操作封裝成 InstanceReplicationTask 實例複製任務,並用批量分發器 batchingDispatcher(TaskDispatcher)來分發處理
    • batchingDispatcher 內部則將任務交給接收者執行器 AcceptorExecutor 處理,任務首先進入到 AcceptorExecutor 內的接收者隊列 acceptorQueue 中
    • AcceptorExecutor 有個後台工作執行緒(AcceptorRunner)不斷輪詢,將接收者隊列 acceptorQueue 和 重處理隊列 reprocessQueue 中的任務轉移到處理中隊列中(processingOrder + pendingTasks)
    • 接著將處理中隊列中的任務打包,一次最多 250 個任務,然後放到批量工作隊列 batchWorkQueue。如果處理中隊列中還有任務,就將任務放到單項任務隊列 singleItemWorkQueue
    • 任務都打包好了,任務執行器 TaskExecutors 內分別有批量任務處理器(BatchWorkerRunnable)和單項任務處理器(SingleTaskWorkerRunnable)來處理 batchWorkQueue 和 singleItemWorkQueue 中的任務
    • 處理器會利用任務複製處理器(ReplicationTaskProcessor)來提交任務,批量任務會提交給 server 節點的批量介面(peerreplication/batch/),單項任務則提交到對應的操作介面
    • 任務提交如果阻塞或者網路失敗就會被放入重處理隊列 reprocessQueue,然後再次被 AcceptorRunner 輪詢處理,不過過期(超過90秒)的任務會被丟棄掉
  • 其它 eureka-server 同步:
    • 其它 eureka-server 接收到批量複製請求後,會輪詢批量任務列表,根據不同的操作類型(Register、Heartbeat、Cancel 等)分別調用 Resource 的介面進行處理
    • 如果是續約操作,會判斷複製實例的 lastDirtyTimestamp 與本地實例的 lastDirtyTimestamp,如果是一致的,就任務數據一致
    • 如果複製實例的 lastDirtyTimestamp > 本地實例的 lastDirtyTimestamp,則複製實例的數據是最新的,返回 404(NOT_FOUND) 要求客戶端重新發送一個註冊操作過來
    • 如果複製實例的 lastDirtyTimestamp < 本地實例的 lastDirtyTimestamp,則本地實例的數據是最新的,返回 409(CONFLICT)和本地實例,客戶端用返回來的實例覆蓋本地的實例

下面再用一張圖總結集群同步:

十二、SpringCloud Eureka

到這裡,對 Eureka 核心源碼的研究就差不多了,這節來看下 Spring cloud eureka。Spring cloud eureka 提供了服務端的依賴 spring-cloud-starter-netflix-eureka-server 和客戶端的依賴 spring-cloud-starter-netflix-eureka-client,這兩個依賴包本身是比較簡單的,只是對 netflix 的 eureka-server 和 eureka-client 的封裝,它通過一些註解和配置類將 eureka 整合到 springboot 技術棧中,便於使用。

1、spring-cloud-starter-netflix-eureka-server

看 spring-cloud-starter-netflix-eureka-server 我們從 @EnableEurekaServer 這個註解來看,因為我們的註冊中心是基於 springboot 的,在啟動類上加上了 @EnableEurekaServer 註解就啟用了 eureka-server 註冊中心。

① Eureka Server 自動化配置

看這個註解的定義,從注釋中可以了解到,這個註解會激活 EurekaServerAutoConfiguration 的自動化配置類。

 1 /**
 2  * Annotation to activate Eureka Server related configuration.
 3  * {@link EurekaServerAutoConfiguration}
 4  *
 5  * @author Dave Syer
 6  * @author Biju Kunjummen
 7  */
 8 @Target(ElementType.TYPE)
 9 @Retention(RetentionPolicy.RUNTIME)
10 @Documented
11 @Import(EurekaServerMarkerConfiguration.class)
12 public @interface EnableEurekaServer {
13 
14 }

看 EurekaServerAutoConfiguration 這個類,可以發現 springcloud 幾乎是將 com.netflix.eureka.EurekaBootStrap 中初始化組件的程式碼拷貝到了 EurekaServerAutoConfiguration,然後以 springboot 創建 bean 的方式來創建相關的組件。

  1 @Configuration(proxyBeanMethods = false)
  2 @Import(EurekaServerInitializerConfiguration.class)
  3 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
  4 @EnableConfigurationProperties({ EurekaDashboardProperties.class,
  5         InstanceRegistryProperties.class })
  6 @PropertySource("classpath:/eureka/server.properties")
  7 public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
  8 
  9     /**
 10      * List of packages containing Jersey resources required by the Eureka server.
 11      */
 12     private static final String[] EUREKA_PACKAGES = new String[] {
 13             "com.netflix.discovery", "com.netflix.eureka" };
 14 
 15     @Autowired
 16     private ApplicationInfoManager applicationInfoManager;
 17 
 18     @Autowired
 19     private EurekaServerConfig eurekaServerConfig;
 20 
 21     @Autowired
 22     private EurekaClientConfig eurekaClientConfig;
 23 
 24     @Autowired
 25     private EurekaClient eurekaClient;
 26 
 27     @Autowired
 28     private InstanceRegistryProperties instanceRegistryProperties;
 29 
 30     /**
 31      * A {@link CloudJacksonJson} instance.
 32      */
 33     public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson();
 34 
 35     @Bean
 36     public HasFeatures eurekaServerFeature() {
 37         return HasFeatures.namedFeature("Eureka Server",
 38                 EurekaServerAutoConfiguration.class);
 39     }
 40 
 41     @Bean
 42     @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
 43             matchIfMissing = true)
 44     public EurekaController eurekaController() {
 45         return new EurekaController(this.applicationInfoManager);
 46     }
 47 
 48     static {
 49         CodecWrappers.registerWrapper(JACKSON_JSON);
 50         EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec());
 51     }
 52 
 53     @Bean
 54     public ServerCodecs serverCodecs() {
 55         return new CloudServerCodecs(this.eurekaServerConfig);
 56     }
 57 
 58     private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) {
 59         CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName());
 60         return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec;
 61     }
 62 
 63     private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) {
 64         CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName());
 65         return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class)
 66                 : codec;
 67     }
 68 
 69     @Bean
 70     @ConditionalOnMissingBean
 71     public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() {
 72         return new ReplicationClientAdditionalFilters(Collections.emptySet());
 73     }
 74 
 75     @Bean
 76     public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
 77             ServerCodecs serverCodecs) {
 78         this.eurekaClient.getApplications(); // force initialization
 79         return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
 80                 serverCodecs, this.eurekaClient,
 81                 this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
 82                 this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
 83     }
 84 
 85     @Bean
 86     @ConditionalOnMissingBean
 87     public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
 88             ServerCodecs serverCodecs,
 89             ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
 90         return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
 91                 this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
 92                 replicationClientAdditionalFilters);
 93     }
 94 
 95     @Bean
 96     @ConditionalOnMissingBean
 97     public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
 98             PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
 99         return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
100                 registry, peerEurekaNodes, this.applicationInfoManager);
101     }
102 
103     @Bean
104     public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
105             EurekaServerContext serverContext) {
106         return new EurekaServerBootstrap(this.applicationInfoManager,
107                 this.eurekaClientConfig, this.eurekaServerConfig, registry,
108                 serverContext);
109     }
110 
111     /**
112      * Register the Jersey filter.
113      * @param eurekaJerseyApp an {@link Application} for the filter to be registered
114      * @return a jersey {@link FilterRegistrationBean}
115      */
116     @Bean
117     public FilterRegistrationBean<?> jerseyFilterRegistration(
118             javax.ws.rs.core.Application eurekaJerseyApp) {
119         FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
120         bean.setFilter(new ServletContainer(eurekaJerseyApp));
121         bean.setOrder(Ordered.LOWEST_PRECEDENCE);
122         bean.setUrlPatterns(
123                 Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
124 
125         return bean;
126     }
127 
128     /**
129      * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
130      * required by the Eureka server.
131      * @param environment an {@link Environment} instance to retrieve classpath resources
132      * @param resourceLoader a {@link ResourceLoader} instance to get classloader from
133      * @return created {@link Application} object
134      */
135     @Bean
136     public javax.ws.rs.core.Application jerseyApplication(Environment environment,
137             ResourceLoader resourceLoader) {
138 
139         ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
140                 false, environment);
141 
142         // Filter to include only classes that have a particular annotation.
143         //
144         provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
145         provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));
146 
147         // Find classes in Eureka packages (or subpackages)
148         //
149         Set<Class<?>> classes = new HashSet<>();
150         for (String basePackage : EUREKA_PACKAGES) {
151             Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
152             for (BeanDefinition bd : beans) {
153                 Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),
154                         resourceLoader.getClassLoader());
155                 classes.add(cls);
156             }
157         }
158 
159         // Construct the Jersey ResourceConfig
160         Map<String, Object> propsAndFeatures = new HashMap<>();
161         propsAndFeatures.put(
162                 // Skip static content used by the webapp
163                 ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,
164                 EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");
165 
166         DefaultResourceConfig rc = new DefaultResourceConfig(classes);
167         rc.setPropertiesAndFeatures(propsAndFeatures);
168 
169         return rc;
170     }
171 
172     @Bean
173     @ConditionalOnBean(name = "httpTraceFilter")
174     public FilterRegistrationBean<?> traceFilterRegistration(
175             @Qualifier("httpTraceFilter") Filter filter) {
176         FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
177         bean.setFilter(filter);
178         bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10);
179         return bean;
180     }
181 
182     @Configuration(proxyBeanMethods = false)
183     protected static class EurekaServerConfigBeanConfiguration {
184 
185         @Bean
186         @ConditionalOnMissingBean
187         public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
188             EurekaServerConfigBean server = new EurekaServerConfigBean();
189             if (clientConfig.shouldRegisterWithEureka()) {
190                 // Set a sensible default if we are supposed to replicate
191                 server.setRegistrySyncRetries(5);
192             }
193             return server;
194         }
195 
196     }
197 
198     /**
199      * {@link PeerEurekaNodes} which updates peers when /refresh is invoked. Peers are
200      * updated only if <code>eureka.client.use-dns-for-fetching-service-urls</code> is
201      * <code>false</code> and one of following properties have changed.
202      * <p>
203      * </p>
204      * <ul>
205      * <li><code>eureka.client.availability-zones</code></li>
206      * <li><code>eureka.client.region</code></li>
207      * <li><code>eureka.client.service-url.&lt;zone&gt;</code></li>
208      * </ul>
209      */
210     static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
211             implements ApplicationListener<EnvironmentChangeEvent> {
212 
213         private ReplicationClientAdditionalFilters replicationClientAdditionalFilters;
214 
215         RefreshablePeerEurekaNodes(final PeerAwareInstanceRegistry registry,
216                 final EurekaServerConfig serverConfig,
217                 final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs,
218                 final ApplicationInfoManager applicationInfoManager,
219                 final ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
220             super(registry, serverConfig, clientConfig, serverCodecs,
221                     applicationInfoManager);
222             this.replicationClientAdditionalFilters = replicationClientAdditionalFilters;
223         }
224 
225         @Override
226         protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
227             JerseyReplicationClient replicationClient = JerseyReplicationClient
228                     .createReplicationClient(serverConfig, serverCodecs,
229                             peerEurekaNodeUrl);
230 
231             this.replicationClientAdditionalFilters.getFilters()
232                     .forEach(replicationClient::addReplicationClientFilter);
233 
234             String targetHost = hostFromUrl(peerEurekaNodeUrl);
235             if (targetHost == null) {
236                 targetHost = "host";
237             }
238             return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl,
239                     replicationClient, serverConfig);
240         }
241 
242         @Override
243         public void onApplicationEvent(final EnvironmentChangeEvent event) {
244             if (shouldUpdate(event.getKeys())) {
245                 updatePeerEurekaNodes(resolvePeerUrls());
246             }
247         }
248 
249         /*
250          * Check whether specific properties have changed.
251          */
252         protected boolean shouldUpdate(final Set<String> changedKeys) {
253             assert changedKeys != null;
254 
255             // if eureka.client.use-dns-for-fetching-service-urls is true, then
256             // service-url will not be fetched from environment.
257             if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
258                 return false;
259             }
260 
261             if (changedKeys.contains("eureka.client.region")) {
262                 return true;
263             }
264 
265             for (final String key : changedKeys) {
266                 // property keys are not expected to be null.
267                 if (key.startsWith("eureka.client.service-url.")
268                         || key.startsWith("eureka.client.availability-zones.")) {
269                     return true;
270                 }
271             }
272             return false;
273         }
274 
275     }
276 
277     class CloudServerCodecs extends DefaultServerCodecs {
278 
279         CloudServerCodecs(EurekaServerConfig serverConfig) {
280             super(getFullJson(serverConfig),
281                     CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class),
282                     getFullXml(serverConfig),
283                     CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class));
284         }
285 
286     }
287 
288 }

View Code

但是要注意,springcloud 中,感知集群的註冊表組件是 InstanceRegistry,集群 PeerEurekaNodes 組件是 RefreshablePeerEurekaNodes。

② Springboot 方式的配置

EurekaServerAutoConfiguration 中注入了 EurekaServerConfig、EurekaClientConfig,在 Netflix 中,EurekaServerConfig 默認讀取的是 eureka-server.properties 配置文件,EurekaClientConfig 默認讀取的是 eureka-client.properties 配置文件。而在 springcloud 中,它們的實現類為 EurekaServerConfigBean、EurekaClientConfigBean,可以看到,就是基於 springboot 的配置方式來的了,讀取的是 application.yml 配置文件中 eureka 的配置了,並且每個配置也提供了默認值。

例如 EurekaServerConfigBean:

   1 @ConfigurationProperties(EurekaServerConfigBean.PREFIX)
   2 public class EurekaServerConfigBean implements EurekaServerConfig {
   3 
   4     /**
   5      * Eureka server configuration properties prefix.
   6      */
   7     public static final String PREFIX = "eureka.server";
   8 
   9     private static final int MINUTES = 60 * 1000;
  10 
  11     @Autowired(required = false)
  12     PropertyResolver propertyResolver;
  13 
  14     private String aWSAccessId;
  15 
  16     private String aWSSecretKey;
  17 
  18     private int eIPBindRebindRetries = 3;
  19 
  20     private int eIPBindingRetryIntervalMs = 5 * MINUTES;
  21 
  22     private int eIPBindingRetryIntervalMsWhenUnbound = 1 * MINUTES;
  23 
  24     private boolean enableSelfPreservation = true;
  25 
  26     private double renewalPercentThreshold = 0.85;
  27 
  28     private int renewalThresholdUpdateIntervalMs = 15 * MINUTES;
  29 
  30     private int peerEurekaNodesUpdateIntervalMs = 10 * MINUTES;
  31 
  32     private int numberOfReplicationRetries = 5;
  33 
  34     private int peerEurekaStatusRefreshTimeIntervalMs = 30 * 1000;
  35 
  36     private int waitTimeInMsWhenSyncEmpty = 5 * MINUTES;
  37 
  38     private int peerNodeConnectTimeoutMs = 200;
  39 
  40     private int peerNodeReadTimeoutMs = 200;
  41 
  42     private int peerNodeTotalConnections = 1000;
  43 
  44     private int peerNodeTotalConnectionsPerHost = 500;
  45 
  46     private int peerNodeConnectionIdleTimeoutSeconds = 30;
  47 
  48     private long retentionTimeInMSInDeltaQueue = 3 * MINUTES;
  49 
  50     private long deltaRetentionTimerIntervalInMs = 30 * 1000;
  51 
  52     private long evictionIntervalTimerInMs = 60 * 1000;
  53 
  54     private int aSGQueryTimeoutMs = 300;
  55 
  56     private long aSGUpdateIntervalMs = 5 * MINUTES;
  57 
  58     private long aSGCacheExpiryTimeoutMs = 10 * MINUTES; // defaults to longer than the
  59 
  60     // asg update interval
  61 
  62     private long responseCacheAutoExpirationInSeconds = 180;
  63 
  64     private long responseCacheUpdateIntervalMs = 30 * 1000;
  65 
  66     private boolean useReadOnlyResponseCache = true;
  67 
  68     private boolean disableDelta;
  69 
  70     private long maxIdleThreadInMinutesAgeForStatusReplication = 10;
  71 
  72     private int minThreadsForStatusReplication = 1;
  73 
  74     private int maxThreadsForStatusReplication = 1;
  75 
  76     private int maxElementsInStatusReplicationPool = 10000;
  77 
  78     private boolean syncWhenTimestampDiffers = true;
  79 
  80     private int registrySyncRetries = 0;
  81 
  82     private long registrySyncRetryWaitMs = 30 * 1000;
  83 
  84     private int maxElementsInPeerReplicationPool = 10000;
  85 
  86     private long maxIdleThreadAgeInMinutesForPeerReplication = 15;
  87 
  88     private int minThreadsForPeerReplication = 5;
  89 
  90     private int maxThreadsForPeerReplication = 20;
  91 
  92     private int maxTimeForReplication = 30000;
  93 
  94     private boolean primeAwsReplicaConnections = true;
  95 
  96     private boolean disableDeltaForRemoteRegions;
  97 
  98     private int remoteRegionConnectTimeoutMs = 1000;
  99 
 100     private int remoteRegionReadTimeoutMs = 1000;
 101 
 102     private int remoteRegionTotalConnections = 1000;
 103 
 104     private int remoteRegionTotalConnectionsPerHost = 500;
 105 
 106     private int remoteRegionConnectionIdleTimeoutSeconds = 30;
 107 
 108     private boolean gZipContentFromRemoteRegion = true;
 109 
 110     private Map<String, String> remoteRegionUrlsWithName = new HashMap<>();
 111 
 112     private String[] remoteRegionUrls;
 113 
 114     private Map<String, Set<String>> remoteRegionAppWhitelist = new HashMap<>();
 115 
 116     private int remoteRegionRegistryFetchInterval = 30;
 117 
 118     private int remoteRegionFetchThreadPoolSize = 20;
 119 
 120     private String remoteRegionTrustStore = "";
 121 
 122     private String remoteRegionTrustStorePassword = "changeit";
 123 
 124     private boolean disableTransparentFallbackToOtherRegion;
 125 
 126     private boolean batchReplication;
 127 
 128     private boolean rateLimiterEnabled = false;
 129 
 130     private boolean rateLimiterThrottleStandardClients = false;
 131 
 132     private Set<String> rateLimiterPrivilegedClients = Collections.emptySet();
 133 
 134     private int rateLimiterBurstSize = 10;
 135 
 136     private int rateLimiterRegistryFetchAverageRate = 500;
 137 
 138     private int rateLimiterFullFetchAverageRate = 100;
 139 
 140     private boolean logIdentityHeaders = true;
 141 
 142     private String listAutoScalingGroupsRoleName = "ListAutoScalingGroups";
 143 
 144     private boolean enableReplicatedRequestCompression = false;
 145 
 146     private String jsonCodecName;
 147 
 148     private String xmlCodecName;
 149 
 150     private int route53BindRebindRetries = 3;
 151 
 152     private int route53BindingRetryIntervalMs = 5 * MINUTES;
 153 
 154     private long route53DomainTTL = 30;
 155 
 156     private AwsBindingStrategy bindingStrategy = AwsBindingStrategy.EIP;
 157 
 158     private int minAvailableInstancesForPeerReplication = -1;
 159 
 160     private int initialCapacityOfResponseCache = 1000;
 161 
 162     private int expectedClientRenewalIntervalSeconds = 30;
 163 
 164     private boolean useAwsAsgApi = true;
 165 
 166     private String myUrl;
 167 
 168     @Override
 169     public boolean shouldEnableSelfPreservation() {
 170         return this.enableSelfPreservation;
 171     }
 172 
 173     @Override
 174     public boolean shouldDisableDelta() {
 175         return this.disableDelta;
 176     }
 177 
 178     @Override
 179     public boolean shouldSyncWhenTimestampDiffers() {
 180         return this.syncWhenTimestampDiffers;
 181     }
 182 
 183     @Override
 184     public boolean shouldPrimeAwsReplicaConnections() {
 185         return this.primeAwsReplicaConnections;
 186     }
 187 
 188     @Override
 189     public boolean shouldDisableDeltaForRemoteRegions() {
 190         return this.disableDeltaForRemoteRegions;
 191     }
 192 
 193     @Override
 194     public boolean shouldGZipContentFromRemoteRegion() {
 195         return this.gZipContentFromRemoteRegion;
 196     }
 197 
 198     @Override
 199     public Set<String> getRemoteRegionAppWhitelist(String regionName) {
 200         return this.remoteRegionAppWhitelist
 201                 .get(regionName == null ? "global" : regionName.trim().toLowerCase());
 202     }
 203 
 204     @Override
 205     public boolean disableTransparentFallbackToOtherRegion() {
 206         return this.disableTransparentFallbackToOtherRegion;
 207     }
 208 
 209     @Override
 210     public boolean shouldBatchReplication() {
 211         return this.batchReplication;
 212     }
 213 
 214     @Override
 215     public String getMyUrl() {
 216         return this.myUrl;
 217     }
 218 
 219     public void setMyUrl(String myUrl) {
 220         this.myUrl = myUrl;
 221     }
 222 
 223     @Override
 224     public boolean shouldLogIdentityHeaders() {
 225         return this.logIdentityHeaders;
 226     }
 227 
 228     @Override
 229     public String getJsonCodecName() {
 230         return this.jsonCodecName;
 231     }
 232 
 233     @Override
 234     public String getXmlCodecName() {
 235         return this.xmlCodecName;
 236     }
 237 
 238     @Override
 239     public boolean shouldUseReadOnlyResponseCache() {
 240         return this.useReadOnlyResponseCache;
 241     }
 242 
 243     @Override
 244     public boolean shouldEnableReplicatedRequestCompression() {
 245         return this.enableReplicatedRequestCompression;
 246     }
 247 
 248     @Override
 249     public String getExperimental(String name) {
 250         if (this.propertyResolver != null) {
 251             return this.propertyResolver.getProperty(PREFIX + ".experimental." + name,
 252                     String.class, null);
 253         }
 254         return null;
 255     }
 256 
 257     @Override
 258     public int getInitialCapacityOfResponseCache() {
 259         return this.initialCapacityOfResponseCache;
 260     }
 261 
 262     public void setInitialCapacityOfResponseCache(int initialCapacityOfResponseCache) {
 263         this.initialCapacityOfResponseCache = initialCapacityOfResponseCache;
 264     }
 265 
 266     @Override
 267     public int getHealthStatusMinNumberOfAvailablePeers() {
 268         return this.minAvailableInstancesForPeerReplication;
 269     }
 270 
 271     public PropertyResolver getPropertyResolver() {
 272         return propertyResolver;
 273     }
 274 
 275     public void setPropertyResolver(PropertyResolver propertyResolver) {
 276         this.propertyResolver = propertyResolver;
 277     }
 278 
 279     public String getAWSAccessId() {
 280         return aWSAccessId;
 281     }
 282 
 283     public void setAWSAccessId(String aWSAccessId) {
 284         this.aWSAccessId = aWSAccessId;
 285     }
 286 
 287     public String getAWSSecretKey() {
 288         return aWSSecretKey;
 289     }
 290 
 291     public void setAWSSecretKey(String aWSSecretKey) {
 292         this.aWSSecretKey = aWSSecretKey;
 293     }
 294 
 295     public int getEIPBindRebindRetries() {
 296         return eIPBindRebindRetries;
 297     }
 298 
 299     public void setEIPBindRebindRetries(int eIPBindRebindRetries) {
 300         this.eIPBindRebindRetries = eIPBindRebindRetries;
 301     }
 302 
 303     public int getEIPBindingRetryIntervalMs() {
 304         return eIPBindingRetryIntervalMs;
 305     }
 306 
 307     public void setEIPBindingRetryIntervalMs(int eIPBindingRetryIntervalMs) {
 308         this.eIPBindingRetryIntervalMs = eIPBindingRetryIntervalMs;
 309     }
 310 
 311     public int getEIPBindingRetryIntervalMsWhenUnbound() {
 312         return eIPBindingRetryIntervalMsWhenUnbound;
 313     }
 314 
 315     public void setEIPBindingRetryIntervalMsWhenUnbound(
 316             int eIPBindingRetryIntervalMsWhenUnbound) {
 317         this.eIPBindingRetryIntervalMsWhenUnbound = eIPBindingRetryIntervalMsWhenUnbound;
 318     }
 319 
 320     public boolean isEnableSelfPreservation() {
 321         return enableSelfPreservation;
 322     }
 323 
 324     public void setEnableSelfPreservation(boolean enableSelfPreservation) {
 325         this.enableSelfPreservation = enableSelfPreservation;
 326     }
 327 
 328     @Override
 329     public double getRenewalPercentThreshold() {
 330         return renewalPercentThreshold;
 331     }
 332 
 333     public void setRenewalPercentThreshold(double renewalPercentThreshold) {
 334         this.renewalPercentThreshold = renewalPercentThreshold;
 335     }
 336 
 337     @Override
 338     public int getRenewalThresholdUpdateIntervalMs() {
 339         return renewalThresholdUpdateIntervalMs;
 340     }
 341 
 342     @Override
 343     public int getExpectedClientRenewalIntervalSeconds() {
 344         return this.expectedClientRenewalIntervalSeconds;
 345     }
 346 
 347     public void setExpectedClientRenewalIntervalSeconds(
 348             int expectedClientRenewalIntervalSeconds) {
 349         this.expectedClientRenewalIntervalSeconds = expectedClientRenewalIntervalSeconds;
 350     }
 351 
 352     public void setRenewalThresholdUpdateIntervalMs(
 353             int renewalThresholdUpdateIntervalMs) {
 354         this.renewalThresholdUpdateIntervalMs = renewalThresholdUpdateIntervalMs;
 355     }
 356 
 357     @Override
 358     public int getPeerEurekaNodesUpdateIntervalMs() {
 359         return peerEurekaNodesUpdateIntervalMs;
 360     }
 361 
 362     public void setPeerEurekaNodesUpdateIntervalMs(int peerEurekaNodesUpdateIntervalMs) {
 363         this.peerEurekaNodesUpdateIntervalMs = peerEurekaNodesUpdateIntervalMs;
 364     }
 365 
 366     @Override
 367     public int getNumberOfReplicationRetries() {
 368         return numberOfReplicationRetries;
 369     }
 370 
 371     public void setNumberOfReplicationRetries(int numberOfReplicationRetries) {
 372         this.numberOfReplicationRetries = numberOfReplicationRetries;
 373     }
 374 
 375     @Override
 376     public int getPeerEurekaStatusRefreshTimeIntervalMs() {
 377         return peerEurekaStatusRefreshTimeIntervalMs;
 378     }
 379 
 380     public void setPeerEurekaStatusRefreshTimeIntervalMs(
 381             int peerEurekaStatusRefreshTimeIntervalMs) {
 382         this.peerEurekaStatusRefreshTimeIntervalMs = peerEurekaStatusRefreshTimeIntervalMs;
 383     }
 384 
 385     @Override
 386     public int getWaitTimeInMsWhenSyncEmpty() {
 387         return waitTimeInMsWhenSyncEmpty;
 388     }
 389 
 390     public void setWaitTimeInMsWhenSyncEmpty(int waitTimeInMsWhenSyncEmpty) {
 391         this.waitTimeInMsWhenSyncEmpty = waitTimeInMsWhenSyncEmpty;
 392     }
 393 
 394     @Override
 395     public int getPeerNodeConnectTimeoutMs() {
 396         return peerNodeConnectTimeoutMs;
 397     }
 398 
 399     public void setPeerNodeConnectTimeoutMs(int peerNodeConnectTimeoutMs) {
 400         this.peerNodeConnectTimeoutMs = peerNodeConnectTimeoutMs;
 401     }
 402 
 403     @Override
 404     public int getPeerNodeReadTimeoutMs() {
 405         return peerNodeReadTimeoutMs;
 406     }
 407 
 408     public void setPeerNodeReadTimeoutMs(int peerNodeReadTimeoutMs) {
 409         this.peerNodeReadTimeoutMs = peerNodeReadTimeoutMs;
 410     }
 411 
 412     @Override
 413     public int getPeerNodeTotalConnections() {
 414         return peerNodeTotalConnections;
 415     }
 416 
 417     public void setPeerNodeTotalConnections(int peerNodeTotalConnections) {
 418         this.peerNodeTotalConnections = peerNodeTotalConnections;
 419     }
 420 
 421     @Override
 422     public int getPeerNodeTotalConnectionsPerHost() {
 423         return peerNodeTotalConnectionsPerHost;
 424     }
 425 
 426     public void setPeerNodeTotalConnectionsPerHost(int peerNodeTotalConnectionsPerHost) {
 427         this.peerNodeTotalConnectionsPerHost = peerNodeTotalConnectionsPerHost;
 428     }
 429 
 430     @Override
 431     public int getPeerNodeConnectionIdleTimeoutSeconds() {
 432         return peerNodeConnectionIdleTimeoutSeconds;
 433     }
 434 
 435     public void setPeerNodeConnectionIdleTimeoutSeconds(
 436             int peerNodeConnectionIdleTimeoutSeconds) {
 437         this.peerNodeConnectionIdleTimeoutSeconds = peerNodeConnectionIdleTimeoutSeconds;
 438     }
 439 
 440     @Override
 441     public long getRetentionTimeInMSInDeltaQueue() {
 442         return retentionTimeInMSInDeltaQueue;
 443     }
 444 
 445     public void setRetentionTimeInMSInDeltaQueue(long retentionTimeInMSInDeltaQueue) {
 446         this.retentionTimeInMSInDeltaQueue = retentionTimeInMSInDeltaQueue;
 447     }
 448 
 449     @Override
 450     public long getDeltaRetentionTimerIntervalInMs() {
 451         return deltaRetentionTimerIntervalInMs;
 452     }
 453 
 454     public void setDeltaRetentionTimerIntervalInMs(long deltaRetentionTimerIntervalInMs) {
 455         this.deltaRetentionTimerIntervalInMs = deltaRetentionTimerIntervalInMs;
 456     }
 457 
 458     @Override
 459     public long getEvictionIntervalTimerInMs() {
 460         return evictionIntervalTimerInMs;
 461     }
 462 
 463     @Override
 464     public boolean shouldUseAwsAsgApi() {
 465         return this.useAwsAsgApi;
 466     }
 467 
 468     public void setUseAwsAsgApi(boolean useAwsAsgApi) {
 469         this.useAwsAsgApi = useAwsAsgApi;
 470     }
 471 
 472     public void setEvictionIntervalTimerInMs(long evictionIntervalTimerInMs) {
 473         this.evictionIntervalTimerInMs = evictionIntervalTimerInMs;
 474     }
 475 
 476     public int getASGQueryTimeoutMs() {
 477         return aSGQueryTimeoutMs;
 478     }
 479 
 480     public void setASGQueryTimeoutMs(int aSGQueryTimeoutMs) {
 481         this.aSGQueryTimeoutMs = aSGQueryTimeoutMs;
 482     }
 483 
 484     public long getASGUpdateIntervalMs() {
 485         return aSGUpdateIntervalMs;
 486     }
 487 
 488     public void setASGUpdateIntervalMs(long aSGUpdateIntervalMs) {
 489         this.aSGUpdateIntervalMs = aSGUpdateIntervalMs;
 490     }
 491 
 492     public long getASGCacheExpiryTimeoutMs() {
 493         return aSGCacheExpiryTimeoutMs;
 494     }
 495 
 496     public void setASGCacheExpiryTimeoutMs(long aSGCacheExpiryTimeoutMs) {
 497         this.aSGCacheExpiryTimeoutMs = aSGCacheExpiryTimeoutMs;
 498     }
 499 
 500     @Override
 501     public long getResponseCacheAutoExpirationInSeconds() {
 502         return responseCacheAutoExpirationInSeconds;
 503     }
 504 
 505     public void setResponseCacheAutoExpirationInSeconds(
 506             long responseCacheAutoExpirationInSeconds) {
 507         this.responseCacheAutoExpirationInSeconds = responseCacheAutoExpirationInSeconds;
 508     }
 509 
 510     @Override
 511     public long getResponseCacheUpdateIntervalMs() {
 512         return responseCacheUpdateIntervalMs;
 513     }
 514 
 515     public void setResponseCacheUpdateIntervalMs(long responseCacheUpdateIntervalMs) {
 516         this.responseCacheUpdateIntervalMs = responseCacheUpdateIntervalMs;
 517     }
 518 
 519     public boolean isUseReadOnlyResponseCache() {
 520         return useReadOnlyResponseCache;
 521     }
 522 
 523     public void setUseReadOnlyResponseCache(boolean useReadOnlyResponseCache) {
 524         this.useReadOnlyResponseCache = useReadOnlyResponseCache;
 525     }
 526 
 527     public boolean isDisableDelta() {
 528         return disableDelta;
 529     }
 530 
 531     public void setDisableDelta(boolean disableDelta) {
 532         this.disableDelta = disableDelta;
 533     }
 534 
 535     @Override
 536     public long getMaxIdleThreadInMinutesAgeForStatusReplication() {
 537         return maxIdleThreadInMinutesAgeForStatusReplication;
 538     }
 539 
 540     public void setMaxIdleThreadInMinutesAgeForStatusReplication(
 541             long maxIdleThreadInMinutesAgeForStatusReplication) {
 542         this.maxIdleThreadInMinutesAgeForStatusReplication = maxIdleThreadInMinutesAgeForStatusReplication;
 543     }
 544 
 545     @Override
 546     public int getMinThreadsForStatusReplication() {
 547         return minThreadsForStatusReplication;
 548     }
 549 
 550     public void setMinThreadsForStatusReplication(int minThreadsForStatusReplication) {
 551         this.minThreadsForStatusReplication = minThreadsForStatusReplication;
 552     }
 553 
 554     @Override
 555     public int getMaxThreadsForStatusReplication() {
 556         return maxThreadsForStatusReplication;
 557     }
 558 
 559     public void setMaxThreadsForStatusReplication(int maxThreadsForStatusReplication) {
 560         this.maxThreadsForStatusReplication = maxThreadsForStatusReplication;
 561     }
 562 
 563     @Override
 564     public int getMaxElementsInStatusReplicationPool() {
 565         return maxElementsInStatusReplicationPool;
 566     }
 567 
 568     public void setMaxElementsInStatusReplicationPool(
 569             int maxElementsInStatusReplicationPool) {
 570         this.maxElementsInStatusReplicationPool = maxElementsInStatusReplicationPool;
 571     }
 572 
 573     public boolean isSyncWhenTimestampDiffers() {
 574         return syncWhenTimestampDiffers;
 575     }
 576 
 577     public void setSyncWhenTimestampDiffers(boolean syncWhenTimestampDiffers) {
 578         this.syncWhenTimestampDiffers = syncWhenTimestampDiffers;
 579     }
 580 
 581     @Override
 582     public int getRegistrySyncRetries() {
 583         return registrySyncRetries;
 584     }
 585 
 586     public void setRegistrySyncRetries(int registrySyncRetries) {
 587         this.registrySyncRetries = registrySyncRetries;
 588     }
 589 
 590     @Override
 591     public long getRegistrySyncRetryWaitMs() {
 592         return registrySyncRetryWaitMs;
 593     }
 594 
 595     public void setRegistrySyncRetryWaitMs(long registrySyncRetryWaitMs) {
 596         this.registrySyncRetryWaitMs = registrySyncRetryWaitMs;
 597     }
 598 
 599     @Override
 600     public int getMaxElementsInPeerReplicationPool() {
 601         return maxElementsInPeerReplicationPool;
 602     }
 603 
 604     public void setMaxElementsInPeerReplicationPool(
 605             int maxElementsInPeerReplicationPool) {
 606         this.maxElementsInPeerReplicationPool = maxElementsInPeerReplicationPool;
 607     }
 608 
 609     @Override
 610     public long getMaxIdleThreadAgeInMinutesForPeerReplication() {
 611         return maxIdleThreadAgeInMinutesForPeerReplication;
 612     }
 613 
 614     public void setMaxIdleThreadAgeInMinutesForPeerReplication(
 615             long maxIdleThreadAgeInMinutesForPeerReplication) {
 616         this.maxIdleThreadAgeInMinutesForPeerReplication = maxIdleThreadAgeInMinutesForPeerReplication;
 617     }
 618 
 619     @Override
 620     public int getMinThreadsForPeerReplication() {
 621         return minThreadsForPeerReplication;
 622     }
 623 
 624     public void setMinThreadsForPeerReplication(int minThreadsForPeerReplication) {
 625         this.minThreadsForPeerReplication = minThreadsForPeerReplication;
 626     }
 627 
 628     @Override
 629     public int getMaxThreadsForPeerReplication() {
 630         return maxThreadsForPeerReplication;
 631     }
 632 
 633     public void setMaxThreadsForPeerReplication(int maxThreadsForPeerReplication) {
 634         this.maxThreadsForPeerReplication = maxThreadsForPeerReplication;
 635     }
 636 
 637     @Override
 638     public int getMaxTimeForReplication() {
 639         return maxTimeForReplication;
 640     }
 641 
 642     public void setMaxTimeForReplication(int maxTimeForReplication) {
 643         this.maxTimeForReplication = maxTimeForReplication;
 644     }
 645 
 646     public boolean isPrimeAwsReplicaConnections() {
 647         return primeAwsReplicaConnections;
 648     }
 649 
 650     public void setPrimeAwsReplicaConnections(boolean primeAwsReplicaConnections) {
 651         this.primeAwsReplicaConnections = primeAwsReplicaConnections;
 652     }
 653 
 654     public boolean isDisableDeltaForRemoteRegions() {
 655         return disableDeltaForRemoteRegions;
 656     }
 657 
 658     public void setDisableDeltaForRemoteRegions(boolean disableDeltaForRemoteRegions) {
 659         this.disableDeltaForRemoteRegions = disableDeltaForRemoteRegions;
 660     }
 661 
 662     @Override
 663     public int getRemoteRegionConnectTimeoutMs() {
 664         return remoteRegionConnectTimeoutMs;
 665     }
 666 
 667     public void setRemoteRegionConnectTimeoutMs(int remoteRegionConnectTimeoutMs) {
 668         this.remoteRegionConnectTimeoutMs = remoteRegionConnectTimeoutMs;
 669     }
 670 
 671     @Override
 672     public int getRemoteRegionReadTimeoutMs() {
 673         return remoteRegionReadTimeoutMs;
 674     }
 675 
 676     public void setRemoteRegionReadTimeoutMs(int remoteRegionReadTimeoutMs) {
 677         this.remoteRegionReadTimeoutMs = remoteRegionReadTimeoutMs;
 678     }
 679 
 680     @Override
 681     public int getRemoteRegionTotalConnections() {
 682         return remoteRegionTotalConnections;
 683     }
 684 
 685     public void setRemoteRegionTotalConnections(int remoteRegionTotalConnections) {
 686         this.remoteRegionTotalConnections = remoteRegionTotalConnections;
 687     }
 688 
 689     @Override
 690     public int getRemoteRegionTotalConnectionsPerHost() {
 691         return remoteRegionTotalConnectionsPerHost;
 692     }
 693 
 694     public void setRemoteRegionTotalConnectionsPerHost(
 695             int remoteRegionTotalConnectionsPerHost) {
 696         this.remoteRegionTotalConnectionsPerHost = remoteRegionTotalConnectionsPerHost;
 697     }
 698 
 699     @Override
 700     public int getRemoteRegionConnectionIdleTimeoutSeconds() {
 701         return remoteRegionConnectionIdleTimeoutSeconds;
 702     }
 703 
 704     public void setRemoteRegionConnectionIdleTimeoutSeconds(
 705             int remoteRegionConnectionIdleTimeoutSeconds) {
 706         this.remoteRegionConnectionIdleTimeoutSeconds = remoteRegionConnectionIdleTimeoutSeconds;
 707     }
 708 
 709     public boolean isgZipContentFromRemoteRegion() {
 710         return gZipContentFromRemoteRegion;
 711     }
 712 
 713     public void setgZipContentFromRemoteRegion(boolean gZipContentFromRemoteRegion) {
 714         this.gZipContentFromRemoteRegion = gZipContentFromRemoteRegion;
 715     }
 716 
 717     @Override
 718     public Map<String, String> getRemoteRegionUrlsWithName() {
 719         return remoteRegionUrlsWithName;
 720     }
 721 
 722     public void setRemoteRegionUrlsWithName(
 723             Map<String, String> remoteRegionUrlsWithName) {
 724         this.remoteRegionUrlsWithName = remoteRegionUrlsWithName;
 725     }
 726 
 727     @Override
 728     public String[] getRemoteRegionUrls() {
 729         return remoteRegionUrls;
 730     }
 731 
 732     public void setRemoteRegionUrls(String[] remoteRegionUrls) {
 733         this.remoteRegionUrls = remoteRegionUrls;
 734     }
 735 
 736     public Map<String, Set<String>> getRemoteRegionAppWhitelist() {
 737         return remoteRegionAppWhitelist;
 738     }
 739 
 740     public void setRemoteRegionAppWhitelist(
 741             Map<String, Set<String>> remoteRegionAppWhitelist) {
 742         this.remoteRegionAppWhitelist = remoteRegionAppWhitelist;
 743     }
 744 
 745     @Override
 746     public int getRemoteRegionRegistryFetchInterval() {
 747         return remoteRegionRegistryFetchInterval;
 748     }
 749 
 750     public void setRemoteRegionRegistryFetchInterval(
 751             int remoteRegionRegistryFetchInterval) {
 752         this.remoteRegionRegistryFetchInterval = remoteRegionRegistryFetchInterval;
 753     }
 754 
 755     @Override
 756     public int getRemoteRegionFetchThreadPoolSize() {
 757         return remoteRegionFetchThreadPoolSize;
 758     }
 759 
 760     public void setRemoteRegionFetchThreadPoolSize(int remoteRegionFetchThreadPoolSize) {
 761         this.remoteRegionFetchThreadPoolSize = remoteRegionFetchThreadPoolSize;
 762     }
 763 
 764     @Override
 765     public String getRemoteRegionTrustStore() {
 766         return remoteRegionTrustStore;
 767     }
 768 
 769     public void setRemoteRegionTrustStore(String remoteRegionTrustStore) {
 770         this.remoteRegionTrustStore = remoteRegionTrustStore;
 771     }
 772 
 773     @Override
 774     public String getRemoteRegionTrustStorePassword() {
 775         return remoteRegionTrustStorePassword;
 776     }
 777 
 778     public void setRemoteRegionTrustStorePassword(String remoteRegionTrustStorePassword) {
 779         this.remoteRegionTrustStorePassword = remoteRegionTrustStorePassword;
 780     }
 781 
 782     public boolean isDisableTransparentFallbackToOtherRegion() {
 783         return disableTransparentFallbackToOtherRegion;
 784     }
 785 
 786     public void setDisableTransparentFallbackToOtherRegion(
 787             boolean disableTransparentFallbackToOtherRegion) {
 788         this.disableTransparentFallbackToOtherRegion = disableTransparentFallbackToOtherRegion;
 789     }
 790 
 791     public boolean isBatchReplication() {
 792         return batchReplication;
 793     }
 794 
 795     public void setBatchReplication(boolean batchReplication) {
 796         this.batchReplication = batchReplication;
 797     }
 798 
 799     @Override
 800     public boolean isRateLimiterEnabled() {
 801         return rateLimiterEnabled;
 802     }
 803 
 804     public void setRateLimiterEnabled(boolean rateLimiterEnabled) {
 805         this.rateLimiterEnabled = rateLimiterEnabled;
 806     }
 807 
 808     @Override
 809     public boolean isRateLimiterThrottleStandardClients() {
 810         return rateLimiterThrottleStandardClients;
 811     }
 812 
 813     public void setRateLimiterThrottleStandardClients(
 814             boolean rateLimiterThrottleStandardClients) {
 815         this.rateLimiterThrottleStandardClients = rateLimiterThrottleStandardClients;
 816     }
 817 
 818     @Override
 819     public Set<String> getRateLimiterPrivilegedClients() {
 820         return rateLimiterPrivilegedClients;
 821     }
 822 
 823     public void setRateLimiterPrivilegedClients(
 824             Set<String> rateLimiterPrivilegedClients) {
 825         this.rateLimiterPrivilegedClients = rateLimiterPrivilegedClients;
 826     }
 827 
 828     @Override
 829     public int getRateLimiterBurstSize() {
 830         return rateLimiterBurstSize;
 831     }
 832 
 833     public void setRateLimiterBurstSize(int rateLimiterBurstSize) {
 834         this.rateLimiterBurstSize = rateLimiterBurstSize;
 835     }
 836 
 837     @Override
 838     public int getRateLimiterRegistryFetchAverageRate() {
 839         return rateLimiterRegistryFetchAverageRate;
 840     }
 841 
 842     public void setRateLimiterRegistryFetchAverageRate(
 843             int rateLimiterRegistryFetchAverageRate) {
 844         this.rateLimiterRegistryFetchAverageRate = rateLimiterRegistryFetchAverageRate;
 845     }
 846 
 847     @Override
 848     public int getRateLimiterFullFetchAverageRate() {
 849         return rateLimiterFullFetchAverageRate;
 850     }
 851 
 852     public void setRateLimiterFullFetchAverageRate(int rateLimiterFullFetchAverageRate) {
 853         this.rateLimiterFullFetchAverageRate = rateLimiterFullFetchAverageRate;
 854     }
 855 
 856     public boolean isLogIdentityHeaders() {
 857         return logIdentityHeaders;
 858     }
 859 
 860     public void setLogIdentityHeaders(boolean logIdentityHeaders) {
 861         this.logIdentityHeaders = logIdentityHeaders;
 862     }
 863 
 864     @Override
 865     public String getListAutoScalingGroupsRoleName() {
 866         return listAutoScalingGroupsRoleName;
 867     }
 868 
 869     public void setListAutoScalingGroupsRoleName(String listAutoScalingGroupsRoleName) {
 870         this.listAutoScalingGroupsRoleName = listAutoScalingGroupsRoleName;
 871     }
 872 
 873     public boolean isEnableReplicatedRequestCompression() {
 874         return enableReplicatedRequestCompression;
 875     }
 876 
 877     public void setEnableReplicatedRequestCompression(
 878             boolean enableReplicatedRequestCompression) {
 879         this.enableReplicatedRequestCompression = enableReplicatedRequestCompression;
 880     }
 881 
 882     public void setJsonCodecName(String jsonCodecName) {
 883         this.jsonCodecName = jsonCodecName;
 884     }
 885 
 886     public void setXmlCodecName(String xmlCodecName) {
 887         this.xmlCodecName = xmlCodecName;
 888     }
 889 
 890     @Override
 891     public int getRoute53BindRebindRetries() {
 892         return route53BindRebindRetries;
 893     }
 894 
 895     public void setRoute53BindRebindRetries(int route53BindRebindRetries) {
 896         this.route53BindRebindRetries = route53BindRebindRetries;
 897     }
 898 
 899     @Override
 900     public int getRoute53BindingRetryIntervalMs() {
 901         return route53BindingRetryIntervalMs;
 902     }
 903 
 904     public void setRoute53BindingRetryIntervalMs(int route53BindingRetryIntervalMs) {
 905         this.route53BindingRetryIntervalMs = route53BindingRetryIntervalMs;
 906     }
 907 
 908     @Override
 909     public long getRoute53DomainTTL() {
 910         return route53DomainTTL;
 911     }
 912 
 913     public void setRoute53DomainTTL(long route53DomainTTL) {
 914         this.route53DomainTTL = route53DomainTTL;
 915     }
 916 
 917     @Override
 918     public AwsBindingStrategy getBindingStrategy() {
 919         return bindingStrategy;
 920     }
 921 
 922     public void setBindingStrategy(AwsBindingStrategy bindingStrategy) {
 923         this.bindingStrategy = bindingStrategy;
 924     }
 925 
 926     public int getMinAvailableInstancesForPeerReplication() {
 927         return minAvailableInstancesForPeerReplication;
 928     }
 929 
 930     public void setMinAvailableInstancesForPeerReplication(
 931             int minAvailableInstancesForPeerReplication) {
 932         this.minAvailableInstancesForPeerReplication = minAvailableInstancesForPeerReplication;
 933     }
 934 
 935     @Override
 936     public boolean equals(Object o) {
 937         if (this == o) {
 938             return true;
 939         }
 940         if (o == null || getClass() != o.getClass()) {
 941             return false;
 942         }
 943         EurekaServerConfigBean that = (EurekaServerConfigBean) o;
 944         return aSGCacheExpiryTimeoutMs == that.aSGCacheExpiryTimeoutMs
 945                 && aSGQueryTimeoutMs == that.aSGQueryTimeoutMs
 946                 && aSGUpdateIntervalMs == that.aSGUpdateIntervalMs
 947                 && Objects.equals(aWSAccessId, that.aWSAccessId)
 948                 && Objects.equals(aWSSecretKey, that.aWSSecretKey)
 949                 && batchReplication == that.batchReplication
 950                 && bindingStrategy == that.bindingStrategy
 951                 && deltaRetentionTimerIntervalInMs == that.deltaRetentionTimerIntervalInMs
 952                 && disableDelta == that.disableDelta
 953                 && disableDeltaForRemoteRegions == that.disableDeltaForRemoteRegions
 954                 && disableTransparentFallbackToOtherRegion == that.disableTransparentFallbackToOtherRegion
 955                 && eIPBindingRetryIntervalMs == that.eIPBindingRetryIntervalMs
 956                 && eIPBindingRetryIntervalMsWhenUnbound == that.eIPBindingRetryIntervalMsWhenUnbound
 957                 && eIPBindRebindRetries == that.eIPBindRebindRetries
 958                 && enableReplicatedRequestCompression == that.enableReplicatedRequestCompression
 959                 && enableSelfPreservation == that.enableSelfPreservation
 960                 && evictionIntervalTimerInMs == that.evictionIntervalTimerInMs
 961                 && gZipContentFromRemoteRegion == that.gZipContentFromRemoteRegion
 962                 && Objects.equals(jsonCodecName, that.jsonCodecName)
 963                 && Objects.equals(listAutoScalingGroupsRoleName,
 964                         that.listAutoScalingGroupsRoleName)
 965                 && logIdentityHeaders == that.logIdentityHeaders
 966                 && maxElementsInPeerReplicationPool == that.maxElementsInPeerReplicationPool
 967                 && maxElementsInStatusReplicationPool == that.maxElementsInStatusReplicationPool
 968                 && maxIdleThreadAgeInMinutesForPeerReplication == that.maxIdleThreadAgeInMinutesForPeerReplication
 969                 && maxIdleThreadInMinutesAgeForStatusReplication == that.maxIdleThreadInMinutesAgeForStatusReplication
 970                 && maxThreadsForPeerReplication == that.maxThreadsForPeerReplication
 971                 && maxThreadsForStatusReplication == that.maxThreadsForStatusReplication
 972                 && maxTimeForReplication == that.maxTimeForReplication
 973                 && minAvailableInstancesForPeerReplication == that.minAvailableInstancesForPeerReplication
 974                 && minThreadsForPeerReplication == that.minThreadsForPeerReplication
 975                 && minThreadsForStatusReplication == that.minThreadsForStatusReplication
 976                 && numberOfReplicationRetries == that.numberOfReplicationRetries
 977                 && peerEurekaNodesUpdateIntervalMs == that.peerEurekaNodesUpdateIntervalMs
 978                 && peerEurekaStatusRefreshTimeIntervalMs == that.peerEurekaStatusRefreshTimeIntervalMs
 979                 && peerNodeConnectionIdleTimeoutSeconds == that.peerNodeConnectionIdleTimeoutSeconds
 980                 && peerNodeConnectTimeoutMs == that.peerNodeConnectTimeoutMs
 981                 && peerNodeReadTimeoutMs == that.peerNodeReadTimeoutMs
 982                 && peerNodeTotalConnections == that.peerNodeTotalConnections
 983                 && peerNodeTotalConnectionsPerHost == that.peerNodeTotalConnectionsPerHost
 984                 && primeAwsReplicaConnections == that.primeAwsReplicaConnections
 985                 && Objects.equals(propertyResolver, that.propertyResolver)
 986                 && rateLimiterBurstSize == that.rateLimiterBurstSize
 987                 && rateLimiterEnabled == that.rateLimiterEnabled
 988                 && rateLimiterFullFetchAverageRate == that.rateLimiterFullFetchAverageRate
 989                 && Objects.equals(rateLimiterPrivilegedClients,
 990                         that.rateLimiterPrivilegedClients)
 991                 && rateLimiterRegistryFetchAverageRate == that.rateLimiterRegistryFetchAverageRate
 992                 && rateLimiterThrottleStandardClients == that.rateLimiterThrottleStandardClients
 993                 && registrySyncRetries == that.registrySyncRetries
 994                 && registrySyncRetryWaitMs == that.registrySyncRetryWaitMs
 995                 && Objects.equals(remoteRegionAppWhitelist, that.remoteRegionAppWhitelist)
 996                 && remoteRegionConnectionIdleTimeoutSeconds == that.remoteRegionConnectionIdleTimeoutSeconds
 997                 && remoteRegionConnectTimeoutMs == that.remoteRegionConnectTimeoutMs
 998                 && remoteRegionFetchThreadPoolSize == that.remoteRegionFetchThreadPoolSize
 999                 && remoteRegionReadTimeoutMs == that.remoteRegionReadTimeoutMs
1000                 && remoteRegionRegistryFetchInterval == that.remoteRegionRegistryFetchInterval
1001                 && remoteRegionTotalConnections == that.remoteRegionTotalConnections
1002                 && remoteRegionTotalConnectionsPerHost == that.remoteRegionTotalConnectionsPerHost
1003                 && Objects.equals(remoteRegionTrustStore, that.remoteRegionTrustStore)
1004                 && Objects.equals(remoteRegionTrustStorePassword,
1005                         that.remoteRegionTrustStorePassword)
1006                 && Arrays.equals(remoteRegionUrls, that.remoteRegionUrls)
1007                 && Objects.equals(remoteRegionUrlsWithName, that.remoteRegionUrlsWithName)
1008                 && Double.compare(that.renewalPercentThreshold,
1009                         renewalPercentThreshold) == 0
1010                 && renewalThresholdUpdateIntervalMs == that.renewalThresholdUpdateIntervalMs
1011                 && responseCacheAutoExpirationInSeconds == that.responseCacheAutoExpirationInSeconds
1012                 && responseCacheUpdateIntervalMs == that.responseCacheUpdateIntervalMs
1013                 && retentionTimeInMSInDeltaQueue == that.retentionTimeInMSInDeltaQueue
1014                 && route53BindingRetryIntervalMs == that.route53BindingRetryIntervalMs
1015                 && route53BindRebindRetries == that.route53BindRebindRetries
1016                 && route53DomainTTL == that.route53DomainTTL
1017                 && syncWhenTimestampDiffers == that.syncWhenTimestampDiffers
1018                 && useReadOnlyResponseCache == that.useReadOnlyResponseCache
1019                 && waitTimeInMsWhenSyncEmpty == that.waitTimeInMsWhenSyncEmpty
1020                 && Objects.equals(xmlCodecName, that.xmlCodecName)
1021                 && initialCapacityOfResponseCache == that.initialCapacityOfResponseCache
1022                 && expectedClientRenewalIntervalSeconds == that.expectedClientRenewalIntervalSeconds
1023                 && useAwsAsgApi == that.useAwsAsgApi && Objects.equals(myUrl, that.myUrl);
1024     }
1025 
1026     @Override
1027     public int hashCode() {
1028         return Objects.hash(aSGCacheExpiryTimeoutMs, aSGQueryTimeoutMs,
1029                 aSGUpdateIntervalMs, aWSAccessId, aWSSecretKey, batchReplication,
1030                 bindingStrategy, deltaRetentionTimerIntervalInMs, disableDelta,
1031                 disableDeltaForRemoteRegions, disableTransparentFallbackToOtherRegion,
1032                 eIPBindRebindRetries, eIPBindingRetryIntervalMs,
1033                 eIPBindingRetryIntervalMsWhenUnbound, enableReplicatedRequestCompression,
1034                 enableSelfPreservation, evictionIntervalTimerInMs,
1035                 gZipContentFromRemoteRegion, jsonCodecName, listAutoScalingGroupsRoleName,
1036                 logIdentityHeaders, maxElementsInPeerReplicationPool,
1037                 maxElementsInStatusReplicationPool,
1038                 maxIdleThreadAgeInMinutesForPeerReplication,
1039                 maxIdleThreadInMinutesAgeForStatusReplication,
1040                 maxThreadsForPeerReplication, maxThreadsForStatusReplication,
1041                 maxTimeForReplication, minAvailableInstancesForPeerReplication,
1042                 minThreadsForPeerReplication, minThreadsForStatusReplication,
1043                 numberOfReplicationRetries, peerEurekaNodesUpdateIntervalMs,
1044                 peerEurekaStatusRefreshTimeIntervalMs, peerNodeConnectTimeoutMs,
1045                 peerNodeConnectionIdleTimeoutSeconds, peerNodeReadTimeoutMs,
1046                 peerNodeTotalConnections, peerNodeTotalConnectionsPerHost,
1047                 primeAwsReplicaConnections, propertyResolver, rateLimiterBurstSize,
1048                 rateLimiterEnabled, rateLimiterFullFetchAverageRate,
1049                 rateLimiterPrivilegedClients, rateLimiterRegistryFetchAverageRate,
1050                 rateLimiterThrottleStandardClients, registrySyncRetries,
1051                 registrySyncRetryWaitMs, remoteRegionAppWhitelist,
1052                 remoteRegionConnectTimeoutMs, remoteRegionConnectionIdleTimeoutSeconds,
1053                 remoteRegionFetchThreadPoolSize, remoteRegionReadTimeoutMs,
1054                 remoteRegionRegistryFetchInterval, remoteRegionTotalConnections,
1055                 remoteRegionTotalConnectionsPerHost, remoteRegionTrustStore,
1056                 remoteRegionTrustStorePassword, remoteRegionUrls,
1057                 remoteRegionUrlsWithName, renewalPercentThreshold,
1058                 renewalThresholdUpdateIntervalMs, responseCacheAutoExpirationInSeconds,
1059                 responseCacheUpdateIntervalMs, retentionTimeInMSInDeltaQueue,
1060                 route53BindRebindRetries, route53BindingRetryIntervalMs, route53DomainTTL,
1061                 syncWhenTimestampDiffers, useReadOnlyResponseCache,
1062                 waitTimeInMsWhenSyncEmpty, xmlCodecName, initialCapacityOfResponseCache,
1063                 expectedClientRenewalIntervalSeconds, useAwsAsgApi, myUrl);
1064     }
1065 
1066     @Override
1067     public String toString() {
1068         return new ToStringCreator(this)
1069                 .append("aSGCacheExpiryTimeoutMs", this.aSGCacheExpiryTimeoutMs)
1070                 .append("aSGQueryTimeoutMs", this.aSGQueryTimeoutMs)
1071                 .append("aSGUpdateIntervalMs", this.aSGUpdateIntervalMs)
1072                 .append("aWSAccessId", this.aWSAccessId)
1073                 .append("aWSSecretKey", this.aWSSecretKey)
1074                 .append("batchReplication", this.batchReplication)
1075                 .append("bindingStrategy", this.bindingStrategy)
1076                 .append("deltaRetentionTimerIntervalInMs",
1077                         this.deltaRetentionTimerIntervalInMs)
1078                 .append("disableDelta", this.disableDelta)
1079                 .append("disableDeltaForRemoteRegions", this.disableDeltaForRemoteRegions)
1080                 .append("disableTransparentFallbackToOtherRegion",
1081                         this.disableTransparentFallbackToOtherRegion)
1082                 .append("eIPBindRebindRetries", this.eIPBindRebindRetries)
1083                 .append("eIPBindingRetryIntervalMs", this.eIPBindingRetryIntervalMs)
1084                 .append("eIPBindingRetryIntervalMsWhenUnbound",
1085                         this.eIPBindingRetryIntervalMsWhenUnbound)
1086                 .append("enableReplicatedRequestCompression",
1087                         this.enableReplicatedRequestCompression)
1088                 .append("enableSelfPreservation", this.enableSelfPreservation)
1089                 .append("evictionIntervalTimerInMs", this.evictionIntervalTimerInMs)
1090                 .append("gZipContentFromRemoteRegion", this.gZipContentFromRemoteRegion)
1091                 .append("jsonCodecName", this.jsonCodecName)
1092                 .append("listAutoScalingGroupsRoleName",
1093                         this.listAutoScalingGroupsRoleName)
1094                 .append("logIdentityHeaders", this.logIdentityHeaders)
1095                 .append("maxElementsInPeerReplicationPool",
1096                         this.maxElementsInPeerReplicationPool)
1097                 .append("maxElementsInStatusReplicationPool",
1098                         this.maxElementsInStatusReplicationPool)
1099                 .append("maxIdleThreadAgeInMinutesForPeerReplication",
1100                         this.maxIdleThreadAgeInMinutesForPeerReplication)
1101                 .append("maxIdleThreadInMinutesAgeForStatusReplication",
1102                         this.maxIdleThreadInMinutesAgeForStatusReplication)
1103                 .append("maxThreadsForPeerReplication", this.maxThreadsForPeerReplication)
1104                 .append("maxThreadsForStatusReplication",
1105                         this.maxThreadsForStatusReplication)
1106                 .append("maxTimeForReplication", this.maxTimeForReplication)
1107                 .append("minAvailableInstancesForPeerReplication",
1108                         this.minAvailableInstancesForPeerReplication)
1109                 .append("minThreadsForPeerReplication", this.minThreadsForPeerReplication)
1110                 .append("minThreadsForStatusReplication",
1111                         this.minThreadsForStatusReplication)
1112                 .append("numberOfReplicationRetries", this.numberOfReplicationRetries)
1113                 .append("peerEurekaNodesUpdateIntervalMs",
1114                         this.peerEurekaNodesUpdateIntervalMs)
1115                 .append("peerEurekaStatusRefreshTimeIntervalMs",
1116                         this.peerEurekaStatusRefreshTimeIntervalMs)
1117                 .append("peerNodeConnectTimeoutMs", this.peerNodeConnectTimeoutMs)
1118                 .append("peerNodeConnectionIdleTimeoutSeconds",
1119                         this.peerNodeConnectionIdleTimeoutSeconds)
1120                 .append("peerNodeReadTimeoutMs", this.peerNodeReadTimeoutMs)
1121                 .append("peerNodeTotalConnections", this.peerNodeTotalConnections)
1122                 .append("peerNodeTotalConnectionsPerHost",
1123                         this.peerNodeTotalConnectionsPerHost)
1124                 .append("primeAwsReplicaConnections", this.primeAwsReplicaConnections)
1125                 .append("propertyResolver", this.propertyResolver)
1126                 .append("rateLimiterBurstSize", this.rateLimiterBurstSize)
1127                 .append("rateLimiterEnabled", this.rateLimiterEnabled)
1128                 .append("rateLimiterFullFetchAverageRate",
1129                         this.rateLimiterFullFetchAverageRate)
1130                 .append("rateLimiterPrivilegedClients", this.rateLimiterPrivilegedClients)
1131                 .append("rateLimiterRegistryFetchAverageRate",
1132                         this.rateLimiterRegistryFetchAverageRate)
1133                 .append("rateLimiterThrottleStandardClients",
1134                         this.rateLimiterThrottleStandardClients)
1135                 .append("registrySyncRetries", this.registrySyncRetries)
1136                 .append("registrySyncRetryWaitMs", this.registrySyncRetryWaitMs)
1137                 .append("remoteRegionAppWhitelist", this.remoteRegionAppWhitelist)
1138                 .append("remoteRegionConnectTimeoutMs", this.remoteRegionConnectTimeoutMs)
1139                 .append("remoteRegionConnectionIdleTimeoutSeconds",
1140                         this.remoteRegionConnectionIdleTimeoutSeconds)
1141                 .append("remoteRegionFetchThreadPoolSize",
1142                         this.remoteRegionFetchThreadPoolSize)
1143                 .append("remoteRegionReadTimeoutMs", this.remoteRegionReadTimeoutMs)
1144                 .append("remoteRegionRegistryFetchInterval",
1145                         this.remoteRegionRegistryFetchInterval)
1146                 .append("remoteRegionTotalConnections", this.remoteRegionTotalConnections)
1147                 .append("remoteRegionTotalConnectionsPerHost",
1148                         this.remoteRegionTotalConnectionsPerHost)
1149                 .append("remoteRegionTrustStore", this.remoteRegionTrustStore)
1150                 .append("remoteRegionTrustStorePassword",
1151                         this.remoteRegionTrustStorePassword)
1152                 .append("remoteRegionUrls", this.remoteRegionUrls)
1153                 .append("remoteRegionUrlsWithName", this.remoteRegionUrlsWithName)
1154                 .append("renewalPercentThreshold", this.renewalPercentThreshold)
1155                 .append("renewalThresholdUpdateIntervalMs",
1156                         this.renewalThresholdUpdateIntervalMs)
1157                 .append("responseCacheAutoExpirationInSeconds",
1158                         this.responseCacheAutoExpirationInSeconds)
1159                 .append("responseCacheUpdateIntervalMs",
1160                         this.responseCacheUpdateIntervalMs)
1161                 .append("retentionTimeInMSInDeltaQueue",
1162                         this.retentionTimeInMSInDeltaQueue)
1163                 .append("route53BindRebindRetries", this.route53BindRebindRetries)
1164                 .append("route53BindingRetryIntervalMs",
1165                         this.route53BindingRetryIntervalMs)
1166                 .append("route53DomainTTL", this.route53DomainTTL)
1167                 .append("syncWhenTimestampDiffers", this.syncWhenTimestampDiffers)
1168                 .append("useReadOnlyResponseCache", this.useReadOnlyResponseCache)
1169                 .append("waitTimeInMsWhenSyncEmpty", this.waitTimeInMsWhenSyncEmpty)
1170                 .append("xmlCodecName", this.xmlCodecName)
1171                 .append("initialCapacityOfResponseCache",
1172                         this.initialCapacityOfResponseCache)
1173                 .append("expectedClientRenewalIntervalSeconds",
1174                         this.expectedClientRenewalIntervalSeconds)
1175                 .append("useAwsAsgApi", this.useAwsAsgApi).append("myUrl", this.myUrl)
1176                 .toString();
1177     }
1178 
1179 }

View Code

③ Eureka Server 初始化

還可以看到 EurekaServerAutoConfiguration 導入了 EurekaServerInitializerConfiguration 的初始化配置類,它啟動了一個後台執行緒來初始化 eurekaServerBootstrap,進入可以看到跟 EurekaBootStrap 的初始化是類似的,只不過是簡化了些,就不在展示了。

 1 public void start() {
 2     new Thread(() -> {
 3         try {
 4             // TODO: is this class even needed now?
 5             eurekaServerBootstrap.contextInitialized(
 6                     EurekaServerInitializerConfiguration.this.servletContext);
 7             log.info("Started Eureka Server");
 8 
 9             publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
10             EurekaServerInitializerConfiguration.this.running = true;
11             publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
12         }
13         catch (Exception ex) {
14             // Help!
15             log.error("Could not initialize Eureka servlet context", ex);
16         }
17     }).start();
18 }

View Code

2、spring-cloud-starter-netflix-eureka-client

① Eureka Client 自動化配置

Eureka Client 自動化配置類是 EurekaClientAutoConfiguration(@EnableEurekaClient 註解感覺沒啥用),這裡初始化類里主要初始化了 ApplicationInfoManager、EurekaClientConfigBean、EurekaInstanceConfigBean、EurekaClient 等。

需要注意,在 springcloud 中,EurekaClient 組件默認是 CloudEurekaClient;

  1 @Configuration(proxyBeanMethods = false)
  2 @EnableConfigurationProperties
  3 @ConditionalOnClass(EurekaClientConfig.class)
  4 @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
  5 @ConditionalOnDiscoveryEnabled
  6 @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
  7         CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
  8 @AutoConfigureAfter(name = {
  9         "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
 10         "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
 11         "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
 12         "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
 13 public class EurekaClientAutoConfiguration {
 14 
 15     private ConfigurableEnvironment env;
 16 
 17     public EurekaClientAutoConfiguration(ConfigurableEnvironment env) {
 18         this.env = env;
 19     }
 20 
 21     @Bean
 22     public HasFeatures eurekaFeature() {
 23         return HasFeatures.namedFeature("Eureka Client", EurekaClient.class);
 24     }
 25 
 26     @Bean
 27     @ConditionalOnMissingBean(value = EurekaClientConfig.class,
 28             search = SearchStrategy.CURRENT)
 29     public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
 30         return new EurekaClientConfigBean();
 31     }
 32 
 33     @Bean
 34     @ConditionalOnMissingBean
 35     public ManagementMetadataProvider serviceManagementMetadataProvider() {
 36         return new DefaultManagementMetadataProvider();
 37     }
 38 
 39     private String getProperty(String property) {
 40         return this.env.containsProperty(property) ? this.env.getProperty(property) : "";
 41     }
 42 
 43     @Bean
 44     @ConditionalOnMissingBean(value = EurekaInstanceConfig.class,
 45             search = SearchStrategy.CURRENT)
 46     public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
 47             ManagementMetadataProvider managementMetadataProvider) {
 48         String hostname = getProperty("eureka.instance.hostname");
 49         boolean preferIpAddress = Boolean
 50                 .parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
 51         String ipAddress = getProperty("eureka.instance.ip-address");
 52         boolean isSecurePortEnabled = Boolean
 53                 .parseBoolean(getProperty("eureka.instance.secure-port-enabled"));
 54 
 55         String serverContextPath = env.getProperty("server.servlet.context-path", "/");
 56         int serverPort = Integer.parseInt(
 57                 env.getProperty("server.port", env.getProperty("port", "8080")));
 58 
 59         Integer managementPort = env.getProperty("management.server.port", Integer.class);
 60         String managementContextPath = env
 61                 .getProperty("management.server.servlet.context-path");
 62         Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port",
 63                 Integer.class);
 64         EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
 65 
 66         instance.setNonSecurePort(serverPort);
 67         instance.setInstanceId(getDefaultInstanceId(env));
 68         instance.setPreferIpAddress(preferIpAddress);
 69         instance.setSecurePortEnabled(isSecurePortEnabled);
 70         if (StringUtils.hasText(ipAddress)) {
 71             instance.setIpAddress(ipAddress);
 72         }
 73 
 74         if (isSecurePortEnabled) {
 75             instance.setSecurePort(serverPort);
 76         }
 77 
 78         if (StringUtils.hasText(hostname)) {
 79             instance.setHostname(hostname);
 80         }
 81         String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
 82         String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");
 83 
 84         if (StringUtils.hasText(statusPageUrlPath)) {
 85             instance.setStatusPageUrlPath(statusPageUrlPath);
 86         }
 87         if (StringUtils.hasText(healthCheckUrlPath)) {
 88             instance.setHealthCheckUrlPath(healthCheckUrlPath);
 89         }
 90 
 91         ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort,
 92                 serverContextPath, managementContextPath, managementPort);
 93 
 94         if (metadata != null) {
 95             instance.setStatusPageUrl(metadata.getStatusPageUrl());
 96             instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
 97             if (instance.isSecurePortEnabled()) {
 98                 instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
 99             }
100             Map<String, String> metadataMap = instance.getMetadataMap();
101             metadataMap.computeIfAbsent("management.port",
102                     k -> String.valueOf(metadata.getManagementPort()));
103         }
104         else {
105             // without the metadata the status and health check URLs will not be set
106             // and the status page and health check url paths will not include the
107             // context path so set them here
108             if (StringUtils.hasText(managementContextPath)) {
109                 instance.setHealthCheckUrlPath(
110                         managementContextPath + instance.getHealthCheckUrlPath());
111                 instance.setStatusPageUrlPath(
112                         managementContextPath + instance.getStatusPageUrlPath());
113             }
114         }
115 
116         setupJmxPort(instance, jmxPort);
117         return instance;
118     }
119 
120     private void setupJmxPort(EurekaInstanceConfigBean instance, Integer jmxPort) {
121         Map<String, String> metadataMap = instance.getMetadataMap();
122         if (metadataMap.get("jmx.port") == null && jmxPort != null) {
123             metadataMap.put("jmx.port", String.valueOf(jmxPort));
124         }
125     }
126 
127     @Bean
128     public EurekaServiceRegistry eurekaServiceRegistry() {
129         return new EurekaServiceRegistry();
130     }
131 
132     // @Bean
133     // @ConditionalOnBean(AutoServiceRegistrationProperties.class)
134     // @ConditionalOnProperty(value =
135     // "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
136     // public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
137     // CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager
138     // applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) {
139     // return EurekaRegistration.builder(instanceConfig)
140     // .with(applicationInfoManager)
141     // .with(eurekaClient)
142     // .with(healthCheckHandler)
143     // .build();
144     // }
145 
146     @Bean
147     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
148     @ConditionalOnProperty(
149             value = "spring.cloud.service-registry.auto-registration.enabled",
150             matchIfMissing = true)
151     public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
152             ApplicationContext context, EurekaServiceRegistry registry,
153             EurekaRegistration registration) {
154         return new EurekaAutoServiceRegistration(context, registry, registration);
155     }
156 
157     @Configuration(proxyBeanMethods = false)
158     @ConditionalOnMissingRefreshScope
159     protected static class EurekaClientConfiguration {
160 
161         @Autowired
162         private ApplicationContext context;
163 
164         @Autowired
165         private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
166 
167         @Bean(destroyMethod = "shutdown")
168         @ConditionalOnMissingBean(value = EurekaClient.class,
169                 search = SearchStrategy.CURRENT)
170         public EurekaClient eurekaClient(ApplicationInfoManager manager,
171                 EurekaClientConfig config) {
172             return new CloudEurekaClient(manager, config, this.optionalArgs,
173                     this.context);
174         }
175 
176         @Bean
177         @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
178                 search = SearchStrategy.CURRENT)
179         public ApplicationInfoManager eurekaApplicationInfoManager(
180                 EurekaInstanceConfig config) {
181             InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
182             return new ApplicationInfoManager(config, instanceInfo);
183         }
184 
185         @Bean
186         @ConditionalOnBean(AutoServiceRegistrationProperties.class)
187         @ConditionalOnProperty(
188                 value = "spring.cloud.service-registry.auto-registration.enabled",
189                 matchIfMissing = true)
190         public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
191                 CloudEurekaInstanceConfig instanceConfig,
192                 ApplicationInfoManager applicationInfoManager, @Autowired(
193                         required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
194             return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
195                     .with(eurekaClient).with(healthCheckHandler).build();
196         }
197 
198     }
199 
200     @Configuration(proxyBeanMethods = false)
201     @ConditionalOnRefreshScope
202     protected static class RefreshableEurekaClientConfiguration {
203 
204         @Autowired
205         private ApplicationContext context;
206 
207         @Autowired
208         private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;
209 
210         @Bean(destroyMethod = "shutdown")
211         @ConditionalOnMissingBean(value = EurekaClient.class,
212                 search = SearchStrategy.CURRENT)
213         @org.springframework.cloud.context.config.annotation.RefreshScope
214         @Lazy
215         public EurekaClient eurekaClient(ApplicationInfoManager manager,
216                 EurekaClientConfig config, EurekaInstanceConfig instance,
217                 @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
218             // If we use the proxy of the ApplicationInfoManager we could run into a
219             // problem
220             // when shutdown is called on the CloudEurekaClient where the
221             // ApplicationInfoManager bean is
222             // requested but wont be allowed because we are shutting down. To avoid this
223             // we use the
224             // object directly.
225             ApplicationInfoManager appManager;
226             if (AopUtils.isAopProxy(manager)) {
227                 appManager = ProxyUtils.getTargetObject(manager);
228             }
229             else {
230                 appManager = manager;
231             }
232             CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
233                     config, this.optionalArgs, this.context);
234             cloudEurekaClient.registerHealthCheck(healthCheckHandler);
235             return cloudEurekaClient;
236         }
237 
238         @Bean
239         @ConditionalOnMissingBean(value = ApplicationInfoManager.class,
240                 search = SearchStrategy.CURRENT)
241         @org.springframework.cloud.context.config.annotation.RefreshScope
242         @Lazy
243         public ApplicationInfoManager eurekaApplicationInfoManager(
244                 EurekaInstanceConfig config) {
245             InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
246             return new ApplicationInfoManager(config, instanceInfo);
247         }
248 
249         @Bean
250         @org.springframework.cloud.context.config.annotation.RefreshScope
251         @ConditionalOnBean(AutoServiceRegistrationProperties.class)
252         @ConditionalOnProperty(
253                 value = "spring.cloud.service-registry.auto-registration.enabled",
254                 matchIfMissing = true)
255         public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
256                 CloudEurekaInstanceConfig instanceConfig,
257                 ApplicationInfoManager applicationInfoManager, @Autowired(
258                         required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
259             return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
260                     .with(eurekaClient).with(healthCheckHandler).build();
261         }
262 
263     }
264 
265     @Target({ ElementType.TYPE, ElementType.METHOD })
266     @Retention(RetentionPolicy.RUNTIME)
267     @Documented
268     @Conditional(OnMissingRefreshScopeCondition.class)
269     @interface ConditionalOnMissingRefreshScope {
270 
271     }
272 
273     @Target({ ElementType.TYPE, ElementType.METHOD })
274     @Retention(RetentionPolicy.RUNTIME)
275     @Documented
276     @ConditionalOnClass(RefreshScope.class)
277     @ConditionalOnBean(RefreshAutoConfiguration.class)
278     @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true",
279             matchIfMissing = true)
280     @interface ConditionalOnRefreshScope {
281 
282     }
283 
284     private static class OnMissingRefreshScopeCondition extends AnyNestedCondition {
285 
286         OnMissingRefreshScopeCondition() {
287             super(ConfigurationPhase.REGISTER_BEAN);
288         }
289 
290         @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope")
291         static class MissingClass {
292 
293         }
294 
295         @ConditionalOnMissingBean(RefreshAutoConfiguration.class)
296         static class MissingScope {
297 
298         }
299 
300         @ConditionalOnProperty(value = "eureka.client.refresh.enable",
301                 havingValue = "false")
302         static class OnPropertyDisabled {
303 
304         }
305 
306     }
307 
308     @Configuration(proxyBeanMethods = false)
309     @ConditionalOnClass(Health.class)
310     protected static class EurekaHealthIndicatorConfiguration {
311 
312         @Bean
313         @ConditionalOnMissingBean
314         @ConditionalOnEnabledHealthIndicator("eureka")
315         public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient,
316                 EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) {
317             return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig);
318         }
319 
320     }
321 
322 }

View Code

② Eureka Client 註冊

Netflix 中服務註冊的邏輯是在 InstanceInfoReplicator,springcloud 則封裝到了 EurekaAutoServiceRegistration,InstanceInfoReplicator 啟動之後要延遲40秒才會註冊到註冊中心,而這裡的自動化配置在服務啟動時就會註冊到註冊中心。

它這裡調用了 serviceRegistry 來註冊,進去可以發現它就是調用了 ApplicationInfoManager 的 setInstanceStatus 方法,進而觸發了那個狀態變更器 StatusChangeListener,然後向註冊中心註冊。

 1 public void start() {
 2     // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
 3     if (this.port.get() != 0) {
 4         if (this.registration.getNonSecurePort() == 0) {
 5             this.registration.setNonSecurePort(this.port.get());
 6         }
 7 
 8         if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
 9             this.registration.setSecurePort(this.port.get());
10         }
11     }
12 
13     // only initialize if nonSecurePort is greater than 0 and it isn't already running
14     // because of containerPortInitializer below
15     if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
16 
17         this.serviceRegistry.register(this.registration);
18 
19         this.context.publishEvent(new InstanceRegisteredEvent<>(this,
20                 this.registration.getInstanceConfig()));
21         this.running.set(true);
22     }
23 }

View Code

十三、Eureka 總結

這一節來對 eureka 的學習做個總結,注意下面的一些截圖是來自《重新定義Spring Cloud實戰 》,具體可以參考原著。

1、Eureka Server 提供的 API

大部分的 API 在前面的源碼分析中都已經接觸過了,這裡看下 eureka 提供的 API列表。注意 eureka 整合到 springcloud 之後,api 前綴固定為 /eureka,在 netflix 中是 /{version} 的形式。

2、Eureka Client 核心參數

Eureka Client 的參數可以分為基本參數、定時任務參數、http參數三大類。

① 基本參數

② 定時任務參數

③ http 參數

3、Eureka Server 核心參數

Eureka Server 的參數可以分為 基本參數、多級快取參數、集群相關參數、http參數 四大類。

① 基本參數

② 多級快取參數

③ 集群參數

④ http 參數

4、Eureka 核心功能

① 服務註冊和發現:eureka 分客戶端(Eureka Client)和服務端(Eureka Server),服務端即為註冊中心,提供服務註冊和發現的功能。所有客戶端將自己註冊到註冊中心上,服務端使用 Map 結構基於記憶體保存所有客戶端資訊(IP、埠、續約等資訊)。客戶端定時從註冊中心拉取註冊表到本地,就可以通過負載均衡的方式進行服務間的調用。

② 服務註冊(Register):Eureka Client 啟動時向 Eureka Server 註冊,並提供自身的元數據、IP地址、埠、狀態等資訊。

③ 服務續約(Renew):Eureka Client 默認每隔30秒向 Eureka Server 發送一次心跳進行服務續約,通過續約告知 Eureka Server 自己是正常的。如果 Eureka Server 180 秒沒有收到客戶端的續約,就會認為客戶端故障,並將其剔除。

④ 抓取註冊表(Fetch Registry):Eureka Client 啟動時會向 Eureka Server 全量抓取一次註冊表到本地,之後會每隔30秒增量抓取註冊表合併到本地註冊表。如果合併後的本地註冊表與 Eureka Server 端的註冊表不一致(hash 比對),就全量抓取註冊表覆蓋本地的註冊表。

⑤ 服務下線(Cancel):Eureka Client 程式正常關閉時,會向 Eureka Server 發送下線請求,之後 Eureka Server 將這個實例從註冊表中剔除。

⑥ 故障剔除(Eviction):默認情況下,Eureka Client 連續180秒沒有向 Eureka Server 發送續約請求,就會被認為實例故障,然後從註冊表剔除。

⑦ Eureka Server 集群:Eureka Server 採用對等複製模式(Peer to Peer)來進行副本之間的數據同步,集群中每個 Server 節點都可以接收寫操作和讀操作。Server 節點接收到寫操作後(註冊、續約、下線、狀態更新)會通過後台任務打包成批量任務發送到集群其它 Server 節點進行數據同步。Eureka Server 集群副本之間的數據會有短暫的不一致性,它是滿足 CAP 中的 AP,即 高可用性和分區容錯性。

5、Eureka 整體架構

最後用一張圖來總結下 Eureka 的整體架構、運行流程以及核心機制。

 

//www.cnblogs.com/chiangchou/p/eureka-1.html