SpringCloud 源碼系列(3)—— 註冊中心 Eureka(下)
- 2020 年 12 月 6 日
- 筆記
- Eureka, eureka server 集群, SpringCloud
SpringCloud 源碼系列(1)—— 註冊中心 Eureka(上)
SpringCloud 源碼系列(2)—— 註冊中心 Eureka(中)
SpringCloud 源碼系列(3)—— 註冊中心 Eureka(下)
十一、Eureka Server 集群
在實際的生產環境中,可能有幾十個或者幾百個的微服務實例,Eureka Server 承擔了非常高的負載,而且為了保證註冊中心高可用,一般都要部署成集群的,下面就來看看 eureka server 的集群。
1、搭建 Eureka Server 集群
首先來搭建一個三個節點的 eureka-server 集群,看看效果。
① 集群配置
首先在本地 hosts 文件中配置如下映射:
1 127.0.0.1 peer1 2 127.0.0.1 peer2 3 127.0.0.1 peer3
更改註冊中心的 application.yml 配置文件,增加三個 profile,分別對應三個 eureka-server 的客戶端配置。
eureka-server 在集群中作為客戶端就需要抓取註冊表,並配置 eureka-server 的地址。
1 spring: 2 application: 3 name: sunny-register 4 5 --- 6 spring: 7 profiles: peer1 8 server: 9 port: 8001 10 11 eureka: 12 instance: 13 hostname: peer1 14 client: 15 # 是否向註冊中心註冊自己 16 register-with-eureka: false 17 # 是否抓取註冊表 18 fetch-registry: true 19 service-url: 20 defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka 21 22 23 --- 24 spring: 25 profiles: peer2 26 server: 27 port: 8002 28 29 eureka: 30 instance: 31 hostname: peer2 32 client: 33 # 是否向註冊中心註冊自己 34 register-with-eureka: false 35 # 是否抓取註冊表 36 fetch-registry: true 37 service-url: 38 defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka 39 40 --- 41 spring: 42 profiles: peer3 43 server: 44 port: 8003 45 46 eureka: 47 instance: 48 hostname: peer3 49 client: 50 # 是否向註冊中心註冊自己 51 register-with-eureka: false 52 # 是否抓取註冊表 53 fetch-registry: true 54 service-url: 55 defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka
② 啟動集群
分別啟動三個註冊中心,環境變數 spring.profiles.active 激活對應的集群配置。
啟動之後訪問 //peer1:8001/ 進入 peer1 這個註冊中心,就可以看到另外兩個分片 peer2、peer3,說明集群中有3個節點了。
③ 啟動客戶端
首先客戶端配置增加集群地址:
1 eureka: 2 client: 3 serviceUrl: 4 defaultZone: http://peer1:8001/eureka,//peer2:8002/eureka,//peer3:8003/eureka
啟動幾個客戶端實例,過一會 之後,會發現三個 eureka-server 上都註冊上去了:
到此 eureka-server 集群就搭建起來了,可以看到註冊中心的實例會互相同步,每隔註冊註冊都可以接收註冊、續約、下線等請求,它們是對等的。
2、Eureka Server 集群架構
一般來說,分散式系統的數據在多個副本之間的複製方式,可分為主從複製和對等複製。
① 主從複製
主從複製就是 Master-Slave 模式,即一個主副本,其它副本都為從副本。所有對數據的寫操作都提交到主副本,然後再由主副本同步到從副本。
對於主從複製模式來說,寫操作的壓力都在主副本上,它是整個系統的瓶頸,而從副本則可以幫助主副本分擔讀請求。
② 對等複製
對等複製就是 Peer to Peer 的模式,副本之間不分主從,任何副本都可以接收寫操作,每個副本之間相互進行數據更新同步。
Peer to Peer 模式每個副本之間都可以接收寫請求,不存在寫操作壓力瓶頸。但是由於每個副本都可以進行寫操作,各個副本之間的數據同步及衝突處理是一個棘手的問題。
③ Eureka Server 集群架構
Eureka Server 採用的就是 Peer to Peer 的複製模式,比如一個客戶端實例隨機向其中一個server註冊,然後它就會同步到其它節點中。
3、Eureka Server 啟動時抓取註冊表
前面已經分析過了,在 eureka server 啟動初始化的時候,即 EurekaBootStrap 初始化類,先初始化了 DiscoveryClient,DiscoveryClient 會向註冊中心全量抓取註冊表到本地。
初始化的最後調用了 registry.syncUp() 來同步註冊表,就是將 DiscoveryClient 快取的實例註冊到 eureka-server 的註冊表裡去。
需要注意的是 eureka 配置的註冊表同步重試次數默認為5,springcloud 中默認為 0,因此需要添加如下配置來開啟註冊表同步。
1 eureka: 2 server: 3 registry-sync-retries: 5
將 DiscoveryClient 本地的實例註冊到註冊表中:
4、集群節點同步
① 註冊、續約、下線
前面也分析過了,在客戶端註冊、續約、下線的時候,都會同步到集群其它節點。可以看到都調用了 replicateToPeers 方法來複制到其它集群。
1 /////////////////////// 註冊 /////////////////////// 2 public void register(final InstanceInfo info, final boolean isReplication) { 3 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; 4 // 如果實例中沒有周期的配置,就設置為默認的 90 秒 5 if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { 6 leaseDuration = info.getLeaseInfo().getDurationInSecs(); 7 } 8 // 註冊實例 9 super.register(info, leaseDuration, isReplication); 10 // 複製到集群其它 server 節點 11 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); 12 } 13 14 15 /////////////////////// 下線 /////////////////////// 16 public boolean cancel(final String appName, final String id, 17 final boolean isReplication) { 18 if (super.cancel(appName, id, isReplication)) { 19 replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); 20 21 return true; 22 } 23 return false; 24 } 25 26 27 /////////////////////// 續約 /////////////////////// 28 public boolean renew(final String appName, final String id, final boolean isReplication) { 29 // 調用父類(AbstractInstanceRegistry)的 renew 續約 30 if (super.renew(appName, id, isReplication)) { 31 // 續約完成後同步到集群其它節點 32 replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication); 33 return true; 34 } 35 return false; 36 }
② 同步到其它節點
來看看 replicateToPeers 方法:
- 首先判斷 isReplication 參數,如果是集群複製操作,最近一分鐘複製次數 numberOfReplicationsLastMin + 1。isReplication 是在請求頭中指定的,請求頭為 PeerEurekaNode.HEADER_REPLICATION(x-netflix-discovery-replication)。
- 接著遍歷集群列表,複製實例操作到集群節點中。前面也分析過了,PeerEurekaNode 就代表了一個 eureka-server,PeerEurekaNodes 就代表了 eureka-server 集群。
- 複製實例操作到集群的方法 replicateInstanceActionsToPeers 就是根據不同的操作類型調用集群 PeerEurekaNode 對應的方法完成操作複製。
1 private void replicateToPeers(Action action, String appName, String id, 2 InstanceInfo info /* optional */, 3 InstanceStatus newStatus /* optional */, boolean isReplication) { 4 Stopwatch tracer = action.getTimer().start(); 5 try { 6 if (isReplication) { 7 // 如果是來自其它server節點的註冊請求,則最近一分鐘集群同步次數+1 8 numberOfReplicationsLastMin.increment(); 9 } 10 // If it is a replication already, do not replicate again as this will create a poison replication 11 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { 12 return; 13 } 14 15 // 如果是來自客戶端的註冊請求,就同步到集群中其它server節點 16 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { 17 // If the url represents this host, do not replicate to yourself. 18 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { 19 continue; 20 } 21 22 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); 23 } 24 } finally { 25 tracer.stop(); 26 } 27 }
1 private void replicateInstanceActionsToPeers(Action action, String appName, 2 String id, InstanceInfo info, InstanceStatus newStatus, 3 PeerEurekaNode node) { 4 try { 5 InstanceInfo infoFromRegistry; 6 CurrentRequestVersion.set(Version.V2); 7 switch (action) { 8 case Cancel: 9 // 下線 10 node.cancel(appName, id); 11 break; 12 case Heartbeat: 13 InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); 14 infoFromRegistry = getInstanceByAppAndId(appName, id, false); 15 // 續約 16 node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); 17 break; 18 case Register: 19 // 註冊 20 node.register(info); 21 break; 22 case StatusUpdate: 23 infoFromRegistry = getInstanceByAppAndId(appName, id, false); 24 node.statusUpdate(appName, id, newStatus, infoFromRegistry); 25 break; 26 case DeleteStatusOverride: 27 infoFromRegistry = getInstanceByAppAndId(appName, id, false); 28 node.deleteStatusOverride(appName, id, infoFromRegistry); 29 break; 30 } 31 } catch (Throwable t) { 32 logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); 33 } finally { 34 CurrentRequestVersion.remove(); 35 } 36 }
③ isReplication
PeerEurekaNode 與 eureka-server 通訊的組件是 JerseyReplicationClient,這個類重寫了 addExtraHeaders 方法,並添加了請求頭 PeerEurekaNode.HEADER_REPLICATION,設置為 true。
這樣其它 eureka-server 收到這個複製操作後,就知道是來自集群節點的同步操作,就不會再同步給其它節點了,從而避免死循環。
1 @Override 2 protected void addExtraHeaders(Builder webResource) { 3 webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true"); 4 }
十二、集群同步機制
Eureka Server 集群間同步機制還是比較複雜的,試想如果每次客戶端的請求一過來,比如註冊、心跳,然後 eureka-server 就立馬同步給集群中其它 server 節點,那 eureka-server 這種 Peer to Peer 的模式實際上就無法分擔客戶端的寫操作壓力,相當於每個 eureka-server 接收到的請求量都是一樣的。那 eureka server 為了避免這種情況,底層採用了三層隊列,加批量任務的方式來進行集群間的同步。簡單來說就是先將客戶端操作放入隊列中,然後從隊列中取出一批操作,然後將這一批操作發送給其它 Server 節點,Server節點接收到之後再將這批操作解析到本地。下面就來詳細看看是如何實現的。
1、集群節點 PeerEurekaNode
之前分析 eureka-server 啟動初始化的時候,EurekaBootStrap 初始化了代表集群的 PeerEurekaNodes,它裡面又根據配置的註冊中心地址構造了 PeerEurekaNode,集群間同步核心的組件就是這個 PeerEurekaNode 了。下面以客戶端註冊為例來看下是如何同步的。
① 註冊同步
replicateInstanceActionsToPeers 中調用了 PeerEurekaNode 的 register 方法來同步註冊操作到集群。
node.register 方法:
- 可以看到先計算了過期時間,為當前時間 + 租約間隔時間(默認90秒)
- 然後調用了 batchingDispatcher 批量任務分發器來處理任務,提交了一個 InstanceReplicationTask 的實例,其 execute 方法中調用了 replicationClient 來向這個 server 註冊同步。
1 public void register(final InstanceInfo info) throws Exception { 2 // 過期時間:當前時間 + 租約時間(默認90秒) 3 long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); 4 batchingDispatcher.process( 5 taskId("register", info), 6 new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { 7 public EurekaHttpResponse<Void> execute() { 8 return replicationClient.register(info); 9 } 10 }, 11 expiryTime 12 ); 13 }
再看下 getLeaseRenewalOf 這個方法,這裡應該是有bug的,這個方法返回的是毫秒數,可以看到它的衛語句的else部分是乘以 1000 了的,而 if 部分則沒有,返回的是 90,不過這裡 info.getLeaseInfo() 應該都不會為 null。
1 private static int getLeaseRenewalOf(InstanceInfo info) { 2 // bug : Lease.DEFAULT_DURATION_IN_SECS * 1000 3 return (info.getLeaseInfo() == null ? Lease.DEFAULT_DURATION_IN_SECS : info.getLeaseInfo().getRenewalIntervalInSecs()) * 1000; 4 }
② PeerEurekaNode 的構造
batchingDispatcher 是在 PeerEurekaNode 的構造方法中初始化的,來看下它的構造方法:
- registry:本地註冊表
- targetHost:eureka-server host
- replicationClient:基於 jersey 的集群複製客戶端通訊組件,它在請求頭中設置了 PeerEurekaNode.HEADER_REPLICATION 為 true
- serviceUrl:eureka-server 地址
- maxProcessingDelayMs:最大處理延遲毫秒數,默認為30000毫秒,即30秒,在下線的時候有用到
- batcherName:批處理器名稱
- taskProcessor:複製任務處理器,它封裝了 targetHost 和 replicationClient,主要就是 ReplicationTaskProcessor 在處理批量任務的提交
- batchingDispatcher:批量任務分發器,它會將任務打成一個批次提交到 eureka-server,避免多次請求eureka-server,註冊時就是先用這個分發器提交的任務
- nonBatchingDispatcher:非批量任務分發器,就是一個任務一個任務的提交
1 public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) { 2 this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS); 3 } 4 5 PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, 6 HttpReplicationClient replicationClient, EurekaServerConfig config, 7 int batchSize, long maxBatchingDelayMs, 8 long retrySleepTimeMs, long serverUnavailableSleepTimeMs) { 9 this.registry = registry; 10 // 集群節點 host 11 this.targetHost = targetHost; 12 this.replicationClient = replicationClient; 13 14 // 集群節點地址 15 this.serviceUrl = serviceUrl; 16 this.config = config; 17 // 最大延遲時間 默認30秒 18 this.maxProcessingDelayMs = config.getMaxTimeForReplication(); 19 20 // 批處理器名稱 21 String batcherName = getBatcherName(); 22 23 // 複製任務處理器 24 ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient); 25 // 批量任務分發器 26 this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher( 27 batcherName, 28 // 複製池裡最大容量,默認 10000 29 config.getMaxElementsInPeerReplicationPool(), 30 batchSize, // 250 31 // 同步使用的最大執行緒數 默認 20 32 config.getMaxThreadsForPeerReplication(), 33 maxBatchingDelayMs, // 500 34 serverUnavailableSleepTimeMs, // 1000 35 retrySleepTimeMs, // 100 36 taskProcessor 37 ); 38 // 單個任務分發器 39 this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher( 40 targetHost, 41 config.getMaxElementsInStatusReplicationPool(), 42 config.getMaxThreadsForStatusReplication(), 43 maxBatchingDelayMs, 44 serverUnavailableSleepTimeMs, 45 retrySleepTimeMs, 46 taskProcessor 47 ); 48 }
2、批量分發器 TaskDispatcher
創建 batchingDispatcher 時調用了 TaskDispatchers.createBatchingTaskDispatcher 方法創建了 batchingDispatcher。
首先看下 createBatchingTaskDispatcher 的參數及默認值,後面分析程式碼的時候會用到這些參數:
- id:批量分發器的名稱
- maxBufferSize:快取池最大數量,默認 10000
- workloadSize:工作負載數量,即一個批次最多多少任務,默認 250
- workerCount:工作者數量,這個是執行緒池執行緒工作執行緒的數量,默認20
- maxBatchingDelay:批量任務最大延遲毫秒數,默認為 500 毫秒
- congestionRetryDelayMs:阻塞重試延遲毫秒數,默認為 1000 毫秒
- networkFailureRetryMs:網路失敗重試延遲毫秒數,默認為 100 毫秒
- taskProcessor:任務處理器,即 ReplicationTaskProcessor
再看下這個方法:
- 首先創建了一個接收者執行器 AcceptorExecutor,主要的參數是快取、時間相關的
- 再創建了一個任務處理器 TaskExecutors,主要的參數是工作執行緒數、任務處理器以及接收者執行器,可以猜測這應該就是最終執行批量任務提交的執行器
- 最後創建了任務分發器 TaskDispatcher,從它的 process 方法可以看出,分發器提交的任務實際上又提交給了 AcceptorExecutor
從這裡可以知道,前面註冊時 batchingDispatcher.process() 提交的任務其實就是分發到 acceptorExecutor 這個接收者執行器了。創建的這個分發器 TaskDispatcher 主要有接收者執行器 AcceptorExecutor 和 任務處理器 TaskExecutors 這兩個組件,核心的分發功能就在這兩個組件中。
1 public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id, int maxBufferSize, int workloadSize, 2 int workerCount, long maxBatchingDelay, long congestionRetryDelayMs, 3 long networkFailureRetryMs, TaskProcessor<T> taskProcessor) { 4 // 接收者執行器 AcceptorExecutor 5 final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>( 6 id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs 7 ); 8 9 // 任務處理器 TaskExecutors, workerCount = 20 10 final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor); 11 12 return new TaskDispatcher<ID, T>() { 13 @Override 14 public void process(ID id, T task, long expiryTime) { 15 // 任務由 acceptorExecutor 處理 16 acceptorExecutor.process(id, task, expiryTime); 17 } 18 19 @Override 20 public void shutdown() { 21 acceptorExecutor.shutdown(); 22 taskExecutor.shutdown(); 23 } 24 }; 25 }
3、接收者執行器 AcceptorExecutor
先看下創建 AcceptorExecutor 的構造方法:
- 根據 congestionRetryDelayMs、networkFailureRetryMs 創建了一個時間調整器 TrafficShaper,應該主要就是用來調整補償時間的
- 然後創建了一個後台執行緒 acceptorThread,它運行的任務是 AcceptorRunner,主要就是將任務轉成批量任務的
- 最後就是註冊了一些監控統計之類的
1 AcceptorExecutor(String id, 2 int maxBufferSize, 3 int maxBatchingSize, 4 long maxBatchingDelay, 5 long congestionRetryDelayMs, 6 long networkFailureRetryMs) { 7 // 批處理器名稱 8 this.id = id; 9 // 最大緩衝數:10000 10 this.maxBufferSize = maxBufferSize; 11 // 每批最大數量:250 12 this.maxBatchingSize = maxBatchingSize; 13 // 最大延遲時間:500 ms 14 this.maxBatchingDelay = maxBatchingDelay; 15 // 時間調整器 16 // congestionRetryDelayMs 阻塞重試延遲時間,1000ms 17 // networkFailureRetryMs 網路異常重試時間,100ms 18 this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs); 19 20 // 接收者後台處理執行緒 21 ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors"); 22 this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id); 23 this.acceptorThread.setDaemon(true); 24 this.acceptorThread.start(); 25 26 // 監控統計相關 27 final double[] percentiles = {50.0, 95.0, 99.0, 99.5}; 28 final StatsConfig statsConfig = new StatsConfig.Builder() 29 .withSampleSize(1000) 30 .withPercentiles(percentiles) 31 .withPublishStdDev(true) 32 .build(); 33 final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build(); 34 this.batchSizeMetric = new StatsTimer(config, statsConfig); 35 try { 36 Monitors.registerObject(id, this); 37 } catch (Throwable e) { 38 logger.warn("Cannot register servo monitor for this object", e); 39 } 40 }
然後看看 AcceptorExecutor 的屬性,它定義了幾個隊列以及容器來處理批量任務,我們先知道有這些東西,後面再來看看都怎麼使用的。
然後可以看到 AcceptorExecutor 大量使用了並發包下的一些類,以及隊列的特性,這裡我們需要了解下這些類的特性:
- LinkedBlockingQueue:基於鏈表的單端阻塞隊列,就是隊尾入隊,隊首出隊
- Deque:雙端隊列,就是隊首、隊尾都可以入隊、出隊
- Semaphore:訊號量,需要通過 acquire(或 tryAcquire) 獲取到許可證之後才可以進入臨界區,通過 release 釋放許可證。只要能拿到許可證,Semaphore 是可以允許多個執行緒進入臨界區的。另外注意它這裡設置的許可證數量是0,說明要先調用了 release 放入一個許可證,才有可能調用 acquire 獲取到許可證。
1 // 接收任務的隊列 2 private final BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>(); 3 // 重試任務的隊列 4 private final BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>(); 5 // 後台接收者執行緒 6 private final Thread acceptorThread; 7 // 待處理任務容器 8 private final Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>(); 9 // 處理中的隊列 10 private final Deque<ID> processingOrder = new LinkedList<>(); 11 12 // 單項隊列請求的訊號量 13 private final Semaphore singleItemWorkRequests = new Semaphore(0); 14 // 單項任務隊列 15 private final BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>(); 16 17 // 批量隊列請求的訊號量 18 private final Semaphore batchWorkRequests = new Semaphore(0); 19 // 批量任務隊列 20 private final BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>(); 21 // 時間調整器 22 private final TrafficShaper trafficShaper;
TaskDispatcher 調用 acceptorExecutor.process 將任務轉給 AcceptorExecutor,可以看到就是將任務添加到接收者隊列 acceptorQueue 的隊尾了。
1 void process(ID id, T task, long expiryTime) { 2 acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime)); 3 acceptedTasks++; 4 }
4、接收者任務 AcceptorRunner
任務添加到 acceptorQueue 了,那任務在哪處理的呢?這就是在 AcceptorRunner 這個任務里去處理的了,這個任務比較複雜,我先把整個程式碼放出來,再來分析。


1 class AcceptorRunner implements Runnable { 2 @Override 3 public void run() { 4 long scheduleTime = 0; 5 while (!isShutdown.get()) { 6 try { 7 // 排出輸入隊列的任務:將 reprocessQueue、acceptorQueue 隊列的任務轉移到 pendingTasks 8 drainInputQueues(); 9 10 // 待處理的數量 11 int totalItems = processingOrder.size(); 12 13 long now = System.currentTimeMillis(); 14 if (scheduleTime < now) { 15 // 時間補償,正常情況下 transmissionDelay() 返回 0 16 scheduleTime = now + trafficShaper.transmissionDelay(); 17 } 18 if (scheduleTime <= now) { 19 // 分配批量工作任務:將 pendingTasks 的任務分一批到(最多250個) batchWorkQueue 隊列中 20 assignBatchWork(); 21 // 分配單項工作任務:pendingTasks 如果還有剩餘任務,將沒有過期的轉移到 singleItemWorkQueue 隊列中 22 assignSingleItemWork(); 23 } 24 25 // If no worker is requesting data or there is a delay injected by the traffic shaper, 26 // sleep for some time to avoid tight loop. 27 if (totalItems == processingOrder.size()) { 28 Thread.sleep(10); 29 } 30 } catch (InterruptedException ex) { 31 // Ignore 32 } catch (Throwable e) { 33 // Safe-guard, so we never exit this loop in an uncontrolled way. 34 logger.warn("Discovery AcceptorThread error", e); 35 } 36 } 37 } 38 39 private boolean isFull() { 40 // 待處理的任務 >= 10000,也就是說 pendingTasks 最多放 10000 個任務 41 return pendingTasks.size() >= maxBufferSize; 42 } 43 44 private void drainInputQueues() throws InterruptedException { 45 do { 46 // 排出 reprocessQueue,將 reprocessQueue 隊列的任務轉移到 pendingTasks 47 drainReprocessQueue(); 48 // 排出 acceptorQueue,將 acceptorQueue 隊列的任務轉移到 pendingTasks 49 drainAcceptorQueue(); 50 51 if (isShutdown.get()) { 52 break; 53 } 54 // If all queues are empty, block for a while on the acceptor queue 55 if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) { 56 // 等待任務放入 acceptorQueue,等待 10 毫秒 57 TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS); 58 if (taskHolder != null) { 59 // 放入之後 acceptorQueue、pendingTasks 就不為空了 60 appendTaskHolder(taskHolder); 61 } 62 } 63 // pendingTasks 為空、acceptorQueue 不為空、reprocessQueue不為空時,就會一直循環 64 // 如果所有任務都處理完了,reprocessQueue、acceptorQueue、pendingTasks 都是空的, 65 // 這時就會循環等待任務進入 acceptorQueue,每次等待 10 毫秒 66 } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()); 67 } 68 69 private void drainAcceptorQueue() { 70 while (!acceptorQueue.isEmpty()) { 71 // 將 acceptorQueue 的任務轉移到 pendingTasks 72 appendTaskHolder(acceptorQueue.poll()); 73 } 74 } 75 76 private void drainReprocessQueue() { 77 long now = System.currentTimeMillis(); 78 while (!reprocessQueue.isEmpty() && !isFull()) { 79 // 從 reprocessQueue 隊尾取出任務 80 TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast(); 81 ID id = taskHolder.getId(); 82 if (taskHolder.getExpiryTime() <= now) { 83 // 任務過期 84 expiredTasks++; 85 } else if (pendingTasks.containsKey(id)) { 86 // pendingTasks 已存在 87 overriddenTasks++; 88 } else { 89 // 將 reprocessQueue 隊列的任務放到 pendingTasks 90 pendingTasks.put(id, taskHolder); 91 // 添加到 processingOrder 隊列的頭部,reprocessQueue 是失敗重試的隊列,所以優先順序高一些 92 processingOrder.addFirst(id); 93 } 94 } 95 if (isFull()) { 96 queueOverflows += reprocessQueue.size(); 97 // pendingTasks 滿了,就清空 reprocessQueue 98 reprocessQueue.clear(); 99 } 100 } 101 102 private void appendTaskHolder(TaskHolder<ID, T> taskHolder) { 103 if (isFull()) { 104 // pendingTasks 滿了就移除一個元素 105 pendingTasks.remove(processingOrder.poll()); 106 queueOverflows++; 107 } 108 // 將 acceptorQueue 里的任務放到 pendingTasks 109 TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder); 110 if (previousTask == null) { 111 // 原本不存在,將任務ID添加到 processingOrder 隊列的最後 112 processingOrder.add(taskHolder.getId()); 113 } else { 114 // 已經存在了,就是覆蓋 115 overriddenTasks++; 116 } 117 } 118 119 void assignSingleItemWork() { 120 if (!processingOrder.isEmpty()) { 121 if (singleItemWorkRequests.tryAcquire(1)) { 122 long now = System.currentTimeMillis(); 123 while (!processingOrder.isEmpty()) { 124 ID id = processingOrder.poll(); 125 TaskHolder<ID, T> holder = pendingTasks.remove(id); 126 if (holder.getExpiryTime() > now) { 127 // 將 pendingTasks 的任務移到 singleItemWorkQueue 128 singleItemWorkQueue.add(holder); 129 return; 130 } 131 expiredTasks++; 132 } 133 singleItemWorkRequests.release(); 134 } 135 } 136 } 137 138 void assignBatchWork() { 139 // 有足夠的任務做一個批處理 140 if (hasEnoughTasksForNextBatch()) { 141 if (batchWorkRequests.tryAcquire(1)) { 142 long now = System.currentTimeMillis(); 143 // 一批任務最多 250 個 144 int len = Math.min(maxBatchingSize, processingOrder.size()); 145 List<TaskHolder<ID, T>> holders = new ArrayList<>(len); 146 // 將 pendingTasks 中的任務移動一批到 holders 中 147 // 也就是說,如果隊列中有500個任務,這一批任務最多也是250個 148 while (holders.size() < len && !processingOrder.isEmpty()) { 149 ID id = processingOrder.poll(); 150 TaskHolder<ID, T> holder = pendingTasks.remove(id); 151 if (holder.getExpiryTime() > now) { 152 holders.add(holder); 153 } else { 154 expiredTasks++; 155 } 156 } 157 if (holders.isEmpty()) { 158 batchWorkRequests.release(); 159 } else { 160 batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS); 161 // 添加到批量隊列中 162 batchWorkQueue.add(holders); 163 } 164 } 165 } 166 } 167 168 // 是否有足夠的任務做一個批處理 169 private boolean hasEnoughTasksForNextBatch() { 170 if (processingOrder.isEmpty()) { 171 return false; 172 } 173 if (pendingTasks.size() >= maxBufferSize) { 174 return true; 175 } 176 177 // 從 processingOrder 隊首取一個任務ID,然後從 pendingTasks 讀取這個任務。注意 peek() 只是取出元素,並不會移除隊首的元素 178 TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek()); 179 // 判斷任務提交到現在的時間差是否超過最大批任務延遲時間(500毫秒) 180 long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp(); 181 return delay >= maxBatchingDelay; 182 } 183 }
View Code
先看它的 run 方法:
① 隊列中的任務轉移到待處理容器中
drainInputQueues 將輸入隊列(reprocessQueue、acceptorQueue)的任務轉移到 pendingTasks 這個待處理容器中。
先是 drainReprocessQueue 將重處理隊列 reprocessQueue 中的任務轉移到 pendingTasks:
- 如果 pendingTasks 已滿(超過10000),就直接清空了 reprocessQueue。任務丟棄會不會有影響呢?
- 否則,如果 reprocessQueue 非空,就從 reprocessQueue 隊尾一個個取出來:
- 如果過期了就丟掉這個任務,說明已經超過續約周期了(90秒)。比如實例註冊,如果多次同步失敗後,然後就直接丟棄,那不是其它 server 永遠無法知道註冊的這個實例?後面再分析這個問題。
- 如果 pendingTasks 已經存在了,也丟棄這個重試任務
- 否則就添加到 pendingTasks 中,並且往 processingOrder 的頭部添加了任務ID
- 注意它這裡是從 reprocessQueue 隊尾一個個取出,放入 processingOrder 頭部,最終任務在 processingOrder 中的順序跟 reprocessQueue 是一樣的
然後是 drainAcceptorQueue 將接收者隊列 acceptorQueue 中的任務轉移到 pendingTasks:
- 只要 acceptorQueue 非空,就從隊首取出任務
- 如果 pendingTasks 已滿,則從 processingOrder 隊首取出第一個任務的ID,並從 pendingTasks 中移除這個任務
- 否則就將任務添加到 pendingTasks,如果之前不存在相同ID的任務,就將任務ID添加到 processingOrder 隊尾
- 注意它這裡是從 acceptorQueue 隊首取出任務,放到 processingOrder 隊尾,最終任務在 processingOrder 中的順序跟 acceptorQueue 是一樣的
從這段任務轉移以及後面的使用來看,processingOrder 將決定任務的處理順序,最前面的將最先處理,也說明了 reprocessQueue 的優先順序比 acceptorQueue 更高。而 pendingTasks 是一個 key-value 的隊列,便於快速通過ID讀取任務。
1 private void drainAcceptorQueue() { 2 while (!acceptorQueue.isEmpty()) { 3 // 將 acceptorQueue 的任務轉移到 pendingTasks 4 appendTaskHolder(acceptorQueue.poll()); 5 } 6 } 7 8 private void drainReprocessQueue() { 9 long now = System.currentTimeMillis(); 10 while (!reprocessQueue.isEmpty() && !isFull()) { 11 // 從 reprocessQueue 隊尾取出任務 12 TaskHolder<ID, T> taskHolder = reprocessQueue.pollLast(); 13 ID id = taskHolder.getId(); 14 if (taskHolder.getExpiryTime() <= now) { 15 // 任務過期 16 expiredTasks++; 17 } else if (pendingTasks.containsKey(id)) { 18 // pendingTasks 已存在 19 overriddenTasks++; 20 } else { 21 // 將 reprocessQueue 隊列的任務放到 pendingTasks 22 pendingTasks.put(id, taskHolder); 23 // 添加到 processingOrder 隊列的頭部,reprocessQueue 是失敗重試的隊列,所以優先順序高一些 24 processingOrder.addFirst(id); 25 } 26 } 27 if (isFull()) { 28 queueOverflows += reprocessQueue.size(); 29 // pendingTasks 滿了,就清空 reprocessQueue 30 reprocessQueue.clear(); 31 } 32 } 33 34 private void appendTaskHolder(TaskHolder<ID, T> taskHolder) { 35 if (isFull()) { 36 // pendingTasks 滿了就移除一個元素 37 pendingTasks.remove(processingOrder.poll()); 38 queueOverflows++; 39 } 40 // 將 acceptorQueue 里的任務放到 pendingTasks 41 TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder); 42 if (previousTask == null) { 43 // 原本不存在,將任務ID添加到 processingOrder 隊列的最後 44 processingOrder.add(taskHolder.getId()); 45 } else { 46 // 已經存在了,就是覆蓋 47 overriddenTasks++; 48 } 49 }
② 接下來通過 trafficShaper 獲取了一個補償時間,它主要是在發生阻塞或網路異常導致任務提交失敗後,在任務調度周期內做一個時間補償,這塊等分析到提交任務失敗的時候再回來看看。
1 long now = System.currentTimeMillis(); 2 if (scheduleTime < now) { 3 // 時間補償,正常情況下 transmissionDelay() 返回 0 4 scheduleTime = now + trafficShaper.transmissionDelay(); 5 }
③ 任務打包
接著看 assignBatchWork ,它就是將任務打包成一個批次:
- 首先調用 hasEnoughTasksForNextBatch 判斷是否有足夠的任務來打成一個批次,注意它判斷了最新提交的任務的時間是否超過了延遲時間 maxBatchingDelay(500ms),也就是說批次任務每隔500毫秒運行一次。
- 能夠打包後,要獲取 batchWorkRequests 訊號量的一個許可證,因為許可證默認數量是 0,那一定是先有地方調用了 batchWorkRequests.release() 放入許可證,否則這裡就不會打包了。
- 然後可以看出,一個批次的任務數量最多是250個
- 它從 processingOrder 的隊首取出這個批次的任務ID,並從 pendingTasks 中取出任務,如果是過期的任務就直接丟棄了。
- 然後如果這個批次並沒有任務,他才調用 batchWorkRequests.release() 釋放了許可證,否則就把這個批次任務添加到批量工作隊列 batchWorkQueue 中,注意並沒有釋放許可證。
1 void assignBatchWork() { 2 // 有足夠的任務做一個批處理 3 if (hasEnoughTasksForNextBatch()) { 4 // 獲取許可證 5 if (batchWorkRequests.tryAcquire(1)) { 6 long now = System.currentTimeMillis(); 7 // 一批任務最多 250 個 8 int len = Math.min(maxBatchingSize, processingOrder.size()); 9 List<TaskHolder<ID, T>> holders = new ArrayList<>(len); 10 // 將 pendingTasks 中的任務移動一批到 holders 中 11 // 也就是說,如果隊列中有500個任務,這一批任務最多也是250個 12 while (holders.size() < len && !processingOrder.isEmpty()) { 13 ID id = processingOrder.poll(); 14 TaskHolder<ID, T> holder = pendingTasks.remove(id); 15 if (holder.getExpiryTime() > now) { 16 holders.add(holder); 17 } else { 18 expiredTasks++; 19 } 20 } 21 if (holders.isEmpty()) { 22 batchWorkRequests.release(); 23 } else { 24 batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS); 25 // 添加到批量隊列中 26 batchWorkQueue.add(holders); 27 } 28 } 29 } 30 } 31 32 // 是否有足夠的任務做一個批處理 33 private boolean hasEnoughTasksForNextBatch() { 34 if (processingOrder.isEmpty()) { 35 return false; 36 } 37 if (pendingTasks.size() >= maxBufferSize) { 38 return true; 39 } 40 41 // 從 processingOrder 隊首取一個任務ID,然後從 pendingTasks 讀取這個任務。注意 peek() 只是取出元素,並不會移除隊首的元素 42 TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek()); 43 // 判斷任務提交到現在的時間差是否超過最大批任務延遲時間(500毫秒) 44 long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp(); 45 return delay >= maxBatchingDelay; 46 }
接著看分配單項任務的方法 assignSingleItemWork:
- 如果 processingOrder 非空且獲取到了 singleItemWorkRequests 訊號量的許可證,就將 processingOrder 隊列剩餘的任務都取出來,放入單項工作隊列 singleItemWorkQueue 中
- 也就是前面已經打了一批任務(250個)之後,processingOrder 中還有任務,就全部取出來放到 singleItemWorkQueue 隊列中
1 void assignSingleItemWork() { 2 if (!processingOrder.isEmpty()) { 3 if (singleItemWorkRequests.tryAcquire(1)) { 4 long now = System.currentTimeMillis(); 5 while (!processingOrder.isEmpty()) { 6 ID id = processingOrder.poll(); 7 TaskHolder<ID, T> holder = pendingTasks.remove(id); 8 if (holder.getExpiryTime() > now) { 9 // 將 pendingTasks 的任務移到 singleItemWorkQueue 10 singleItemWorkQueue.add(holder); 11 return; 12 } 13 expiredTasks++; 14 } 15 singleItemWorkRequests.release(); 16 } 17 } 18 }
5、任務處理器 TaskExecutors
batchWorkQueue 中的批量任務以及 singleItemWorkQueue 中的單項任務都已經準備好了,那是在哪裡發送到集群節點的呢,那就是任務執行器 TaskExecutors 了。
① 創建 TaskExecutors
從創建 TaskExecutors 的方法中可以看出:
- 批量處理任務的類是 BatchWorkerRunnable,它主要就是處理批量任務隊列 batchWorkQueue 中的任務
- 處理單項任務的類是 SingleTaskWorkerRunnable,它主要就是處理單項任務隊列 singleItemWorkQueue 中的任務
- TaskExecutors 創建了一個執行緒池,batchExecutors 默認有20個工作執行緒(不太理解他為什麼不用JDK現成的執行緒池。。),singleItemExecutors 默認只有一個工作執行緒。
1 static <ID, T> TaskExecutors<ID, T> singleItemExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) { 2 final AtomicBoolean isShutdown = new AtomicBoolean(); 3 final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name); 4 registeredMonitors.put(name, metrics); 5 // workerCount = 1 6 return new TaskExecutors<>(idx -> new SingleTaskWorkerRunnable<>("TaskNonBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown); 7 } 8 9 //////////////////////////////////////////////// 10 11 static <ID, T> TaskExecutors<ID, T> batchExecutors(final String name, int workerCount, final TaskProcessor<T> processor, final AcceptorExecutor<ID, T> acceptorExecutor) { 12 final AtomicBoolean isShutdown = new AtomicBoolean(); 13 final TaskExecutorMetrics metrics = new TaskExecutorMetrics(name); 14 registeredMonitors.put(name, metrics); 15 // BatchWorkerRunnable 批量任務處理 16 return new TaskExecutors<>(idx -> new BatchWorkerRunnable<>("TaskBatchingWorker-" + name + '-' + idx, isShutdown, metrics, processor, acceptorExecutor), workerCount, isShutdown); 17 } 18 19 //////////////////////////////////////////////// 20 21 private final List<Thread> workerThreads; 22 23 TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) { 24 this.isShutdown = isShutdown; 25 // 工作執行緒集合 26 this.workerThreads = new ArrayList<>(); 27 28 // 創建20個執行緒,相當於是搞了一個執行緒池 29 ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors"); 30 for (int i = 0; i < workerCount; i++) { 31 WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i); 32 Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName()); 33 workerThreads.add(workerThread); 34 workerThread.setDaemon(true); 35 workerThread.start(); 36 } 37 }
② BatchWorkerRunnable
看批量處理的任務:
- 首先 getWork 獲取批量任務,它調用 taskDispatcher.requestWorkItems(),實際就是返回了 taskDispatcher 的 batchWorkQueue,並且調用 batchWorkRequests.release() 往訊號量放入一個許可證,這樣前面 AcceptorRunner 就可以得到許可證然後去打包批量任務了
- 如果 batchWorkQueue 中沒有批量任務,可以看到是一直在 while 循環等待的,直到拿到一個批量任務。它這個 BatchWorkerRunnable 任務和前面的 AcceptorRunner 任務,感覺通過訊號量的方式就形成了一個等待通知的機制,BatchWorkerRunnable 放入一個許可證,讓 AcceptorRunner 拿到這個許可證去打個批次的任務過來。
- 拿到這個批次任務後,就調用 processor(ReplicationTaskProcessor)來處理任務。
- 如果任務處理結果是 Congestion(阻塞)、TransientError(傳輸失敗)就要重處理,調用了 taskDispatcher.reprocess 將這個批次的任務提交到重處理隊列 reprocessQueue 中。
1 static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> { 2 3 BatchWorkerRunnable(String workerName, AtomicBoolean isShutdown, TaskExecutorMetrics metrics, TaskProcessor<T> processor, AcceptorExecutor<ID, T> acceptorExecutor) { 4 super(workerName, isShutdown, metrics, processor, acceptorExecutor); 5 } 6 7 @Override 8 public void run() { 9 try { 10 while (!isShutdown.get()) { 11 // 獲取一個批量任務 12 List<TaskHolder<ID, T>> holders = getWork(); 13 metrics.registerExpiryTimes(holders); 14 // TaskHolder 提取 ReplicationTask 15 List<T> tasks = getTasksOf(holders); 16 // processor => 任務複製處理器 ReplicationTaskProcessor 17 ProcessingResult result = processor.process(tasks); 18 switch (result) { 19 case Success: 20 break; 21 case Congestion: 22 case TransientError: 23 // 阻塞或網路失敗就重新處理這批任務 24 taskDispatcher.reprocess(holders, result); 25 break; 26 case PermanentError: 27 logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName); 28 } 29 metrics.registerTaskResult(result, tasks.size()); 30 } 31 } catch (InterruptedException e) { 32 // Ignore 33 } catch (Throwable e) { 34 // Safe-guard, so we never exit this loop in an uncontrolled way. 35 logger.warn("Discovery WorkerThread error", e); 36 } 37 } 38 39 private List<TaskHolder<ID, T>> getWork() throws InterruptedException { 40 // 獲取批量隊列 batchWorkQueue 41 BlockingQueue<List<TaskHolder<ID, T>>> workQueue = taskDispatcher.requestWorkItems(); 42 List<TaskHolder<ID, T>> result; 43 do { 44 result = workQueue.poll(1, TimeUnit.SECONDS); 45 // 循環等待,直到取到一個批量任務 46 } while (!isShutdown.get() && result == null); 47 return (result == null) ? new ArrayList<>() : result; 48 } 49 50 private List<T> getTasksOf(List<TaskHolder<ID, T>> holders) { 51 List<T> tasks = new ArrayList<>(holders.size()); 52 for (TaskHolder<ID, T> holder : holders) { 53 tasks.add(holder.getTask()); 54 } 55 return tasks; 56 } 57 }
1 BlockingQueue<TaskHolder<ID, T>> requestWorkItem() { 2 singleItemWorkRequests.release(); 3 return singleItemWorkQueue; 4 } 5 6 BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() { 7 batchWorkRequests.release(); 8 return batchWorkQueue; 9 }
③ 任務重處理
可以看到處理失敗後,就是將這批任務添加到重處理隊列 reprocessQueue 中去,然後向時間調整期註冊失敗,這就和前面 AcceptorRunner 處理 reprocessQueue 對應起來了。
1 void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) { 2 // 添加到重處理隊列 reprocessQueue 3 reprocessQueue.addAll(holders); 4 replayedTasks += holders.size(); 5 // 時間調整器註冊失敗 6 trafficShaper.registerFailure(processingResult); 7 }
④ TrafficShaper
還記得前面 AcceptorRunner 中又這樣一段程式碼,可以看到是通過 trafficShaper 計算了一個延遲時間,這裡就來看看是如何計算的。
1 long now = System.currentTimeMillis(); 2 if (scheduleTime < now) { 3 // 時間補償,正常情況下 transmissionDelay() 返回 0 4 scheduleTime = now + trafficShaper.transmissionDelay(); 5 } 6 if (scheduleTime <= now) { 7 // 分配批量工作任務:將 pendingTasks 的任務分一批到(最多250個) batchWorkQueue 隊列中 8 assignBatchWork(); 9 // 分配單項工作任務:pendingTasks 如果還有剩餘任務,將沒有過期的轉移到 singleItemWorkQueue 隊列中 10 assignSingleItemWork(); 11 }
時間調整器 TrafficShaper:
- registerFailure 就是設置了失敗的最後時間
- 然後看 transmissionDelay,以阻塞為例,如果上一次阻塞失敗到現在 500 毫秒,那麼 transmissionDelay 返回 500,那麼 transmissionDelay 就大於 now 了,就不會打包任務了。
- 總結下來就是如果上一次阻塞導致批量任務提交失敗,就延遲1000毫秒後執行。如果上一次網路導致批量任務提交失敗,就延遲100毫秒執行。
1 TrafficShaper(long congestionRetryDelayMs, long networkFailureRetryMs) { 2 // 1000 3 this.congestionRetryDelayMs = Math.min(MAX_DELAY, congestionRetryDelayMs); 4 // 100 5 this.networkFailureRetryMs = Math.min(MAX_DELAY, networkFailureRetryMs); 6 } 7 8 void registerFailure(ProcessingResult processingResult) { 9 if (processingResult == ProcessingResult.Congestion) { 10 // 最後一次阻塞導致提交批處理失敗的時間 11 lastCongestionError = System.currentTimeMillis(); 12 } else if (processingResult == ProcessingResult.TransientError) { 13 // 最後一次網路原因導致提交批處理失敗的時間 14 lastNetworkFailure = System.currentTimeMillis(); 15 } 16 } 17 18 // 計算傳輸延遲的時間 19 long transmissionDelay() { 20 if (lastCongestionError == -1 && lastNetworkFailure == -1) { 21 return 0; 22 } 23 24 long now = System.currentTimeMillis(); 25 if (lastCongestionError != -1) { 26 // 阻塞延遲時間 27 long congestionDelay = now - lastCongestionError; 28 if (congestionDelay >= 0 && congestionDelay < congestionRetryDelayMs) { 29 return congestionRetryDelayMs - congestionDelay; 30 } 31 lastCongestionError = -1; 32 } 33 34 if (lastNetworkFailure != -1) { 35 // 網路延遲時間 36 long failureDelay = now - lastNetworkFailure; 37 if (failureDelay >= 0 && failureDelay < networkFailureRetryMs) { 38 return networkFailureRetryMs - failureDelay; 39 } 40 lastNetworkFailure = -1; 41 } 42 return 0; 43 }
⑤ SingleTaskWorkerRunnable
單項任務處理跟批量任務處理的流程是類似的,只不過是一個個的發送同步操作,處理失敗同樣也會放入重處理隊列中。
一個批量任務250個對於大部分場景來說其實不會觸發單項任務的處理,如果微服務集群中有很多的實例,eureka 通過不斷的輪詢也能盡量使用批量處理,我覺得單項任務處理更像是對批量任務處理的一種補充。
6、複製任務處理器 ReplicationTaskProcessor
批量任務最終是提交到 ReplicationTaskProcessor 去處理的,可以看到,就是調用了 replicationClient 提交了批量任務,提交的介面是 POST peerreplication/batch,那我們就可以從這個入口去看 eureka-server 如何接收批量任務的。
1 public ProcessingResult process(List<ReplicationTask> tasks) { 2 // 任務封裝到 ReplicationList 3 ReplicationList list = createReplicationListOf(tasks); 4 try { 5 // 提交批量任務:POST peerreplication/batch/ 6 EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list); 7 int statusCode = response.getStatusCode(); 8 if (!isSuccess(statusCode)) { 9 if (statusCode == 503) { 10 logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId); 11 return ProcessingResult.Congestion; 12 } else { 13 // Unexpected error returned from the server. This should ideally never happen. 14 logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size()); 15 return ProcessingResult.PermanentError; 16 } 17 } else { 18 // 處理批量任務結果 19 handleBatchResponse(tasks, response.getEntity().getResponseList()); 20 } 21 } catch (Throwable e) { 22 if (maybeReadTimeOut(e)) { 23 //read timeout exception is more Congestion then TransientError, return Congestion for longer delay 24 return ProcessingResult.Congestion; 25 } else if (isNetworkConnectException(e)) { 26 logNetworkErrorSample(null, e); 27 return ProcessingResult.TransientError; 28 } else { 29 logger.error("Not re-trying this exception because it does not seem to be a network exception", e); 30 return ProcessingResult.PermanentError; 31 } 32 } 33 return ProcessingResult.Success; 34 }
7、接收複製同步請求
很容易找到批量任務提交的介面在 PeerReplicationResource 的 batchReplication 方法中。
可以看到,其實遍歷批量任務,然後根據不同的操作類型,調用 XxxResource 介面進行對應的操作。比如註冊,就是調用 applicationResource.addInstance 完成實例的註冊。


1 @Path("/{version}/peerreplication") 2 @Produces({"application/xml", "application/json"}) 3 public class PeerReplicationResource { 4 5 private static final Logger logger = LoggerFactory.getLogger(PeerReplicationResource.class); 6 7 private static final String REPLICATION = "true"; 8 9 private final EurekaServerConfig serverConfig; 10 private final PeerAwareInstanceRegistry registry; 11 12 @Inject 13 PeerReplicationResource(EurekaServerContext server) { 14 this.serverConfig = server.getServerConfig(); 15 this.registry = server.getRegistry(); 16 } 17 18 public PeerReplicationResource() { 19 this(EurekaServerContextHolder.getInstance().getServerContext()); 20 } 21 22 @Path("batch") 23 @POST 24 public Response batchReplication(ReplicationList replicationList) { 25 try { 26 ReplicationListResponse batchResponse = new ReplicationListResponse(); 27 for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) { 28 try { 29 // dispatch 分發任務 30 batchResponse.addResponse(dispatch(instanceInfo)); 31 } catch (Exception e) { 32 batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null)); 33 logger.error("{} request processing failed for batch item {}/{}", 34 instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e); 35 } 36 } 37 return Response.ok(batchResponse).build(); 38 } catch (Throwable e) { 39 logger.error("Cannot execute batch Request", e); 40 return Response.status(Status.INTERNAL_SERVER_ERROR).build(); 41 } 42 } 43 44 private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) { 45 ApplicationResource applicationResource = createApplicationResource(instanceInfo); 46 InstanceResource resource = createInstanceResource(instanceInfo, applicationResource); 47 48 String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp()); 49 String overriddenStatus = toString(instanceInfo.getOverriddenStatus()); 50 String instanceStatus = toString(instanceInfo.getStatus()); 51 52 Builder singleResponseBuilder = new Builder(); 53 // 根據不同的類型分別處理 54 switch (instanceInfo.getAction()) { 55 case Register: 56 singleResponseBuilder = handleRegister(instanceInfo, applicationResource); 57 break; 58 case Heartbeat: 59 singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus); 60 break; 61 case Cancel: 62 singleResponseBuilder = handleCancel(resource); 63 break; 64 case StatusUpdate: 65 singleResponseBuilder = handleStatusUpdate(instanceInfo, resource); 66 break; 67 case DeleteStatusOverride: 68 singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource); 69 break; 70 } 71 return singleResponseBuilder.build(); 72 } 73 74 /* Visible for testing */ ApplicationResource createApplicationResource(ReplicationInstance instanceInfo) { 75 return new ApplicationResource(instanceInfo.getAppName(), serverConfig, registry); 76 } 77 78 /* Visible for testing */ InstanceResource createInstanceResource(ReplicationInstance instanceInfo, 79 ApplicationResource applicationResource) { 80 return new InstanceResource(applicationResource, instanceInfo.getId(), serverConfig, registry); 81 } 82 83 private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) { 84 // addInstance 85 applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION); 86 return new Builder().setStatusCode(Status.OK.getStatusCode()); 87 } 88 89 private static Builder handleCancel(InstanceResource resource) { 90 // cancelLease 91 Response response = resource.cancelLease(REPLICATION); 92 return new Builder().setStatusCode(response.getStatus()); 93 } 94 95 private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) { 96 Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp); 97 int responseStatus = response.getStatus(); 98 Builder responseBuilder = new Builder().setStatusCode(responseStatus); 99 100 if ("false".equals(config.getExperimental("bugfix.934"))) { 101 if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) { 102 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity()); 103 } 104 } else { 105 if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode()) 106 && response.getEntity() != null) { 107 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity()); 108 } 109 } 110 return responseBuilder; 111 } 112 113 private static Builder handleStatusUpdate(ReplicationInstance instanceInfo, InstanceResource resource) { 114 Response response = resource.statusUpdate(instanceInfo.getStatus(), REPLICATION, toString(instanceInfo.getLastDirtyTimestamp())); 115 return new Builder().setStatusCode(response.getStatus()); 116 } 117 118 private static Builder handleDeleteStatusOverride(ReplicationInstance instanceInfo, InstanceResource resource) { 119 Response response = resource.deleteStatusUpdate(REPLICATION, instanceInfo.getStatus(), 120 instanceInfo.getLastDirtyTimestamp().toString()); 121 return new Builder().setStatusCode(response.getStatus()); 122 } 123 124 private static <T> String toString(T value) { 125 if (value == null) { 126 return null; 127 } 128 return value.toString(); 129 } 130 }
View Code
8、集群數據同步衝突問題
Peer to Peer 模式重點要解決的一個問題是數據複製衝突的問題,因為 peer 節點間的相互複製並不能保證所有操作都成功。eureka 主要通過 lastDirtyTimestamp 標識和心跳來進行數據的最終修復,下面就來看下 eureka 如何處理數據衝突問題的。
① 先看續約的這個方法
- 在續約 renewLease 里,如果 lastDirtyTimestamp 不為空且允許時間戳不一致時進行同步(默認開啟),就調用了 validateDirtyTimestamp 方法校驗 lastDirtyTimestamp。
- 接著看 validateDirtyTimestamp,如果 lastDirtyTimestamp 與本地實例的 lastDirtyTimestamp 一致,說明數據是一致的,就續約成功,返回 OK(200)。
- 如果 lastDirtyTimestamp 大於 本地實例的 lastDirtyTimestamp,說明複製的實例最新更新的,出現數據衝突,返回 NOT_FOUND(404)。
- 如果 lastDirtyTimestamp 小於 本地實例的 lastDirtyTimestamp ,說明複製的實例是舊的,出現數據衝突,返回 CONFLICT(409),並且返回了本地的實例。
1 @PUT 2 public Response renewLease( 3 @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, 4 @QueryParam("overriddenstatus") String overriddenStatus, 5 @QueryParam("status") String status, 6 @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { 7 boolean isFromReplicaNode = "true".equals(isReplication); 8 // 調用註冊表的 renew 進行服務續約 9 boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); 10 11 // Not found in the registry, immediately ask for a register 12 if (!isSuccess) { 13 logger.warn("Not Found (Renew): {} - {}", app.getName(), id); 14 return Response.status(Status.NOT_FOUND).build(); 15 } 16 // Check if we need to sync based on dirty time stamp, the client 17 // instance might have changed some value 18 Response response; 19 // 如果是複製操作,就校驗 lastDirtyTimestamp 20 if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { 21 response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); 22 // Store the overridden status since the validation found out the node that replicates wins 23 if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() 24 && (overriddenStatus != null) 25 && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) 26 && isFromReplicaNode) { 27 registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); 28 } 29 } else { 30 response = Response.ok().build(); 31 } 32 logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); 33 return response; 34 } 35 36 /////////////////////////////////////////// 37 38 private Response validateDirtyTimestamp(Long lastDirtyTimestamp, boolean isReplication) { 39 InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false); 40 if (appInfo != null) { 41 // 如果複製傳過來的實例中 lastDirtyTimestamp 不等於本地實例的 lastDirtyTimestamp 42 if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) { 43 Object[] args = {id, appInfo.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication}; 44 45 if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) { 46 logger.debug( 47 "Time to sync, since the last dirty timestamp differs -" 48 + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", 49 args); 50 // 如果複製實例的 lastDirtyTimestamp > 本地實例的 lastDirtyTimestamp,表示數據出現衝突,返回 404,要求應用實例重新進行 register 操作 51 return Response.status(Status.NOT_FOUND).build(); 52 } else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) { 53 // In the case of replication, send the current instance info in the registry for the 54 // replicating node to sync itself with this one. 55 if (isReplication) { 56 logger.debug( 57 "Time to sync, since the last dirty timestamp differs -" 58 + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}", 59 args); 60 // 如果本地實例的 lastDirtyTimestamp > 複製實例的 lastDirtyTimestamp,就返回 CONFLICT(409),說明數據衝突,要求其同步自己最新的數據 61 // 注意這裡將本地實例 appInfo 放入 Response entity 中了 62 return Response.status(Status.CONFLICT).entity(appInfo).build(); 63 } else { 64 return Response.ok().build(); 65 } 66 } 67 } 68 69 } 70 return Response.ok().build(); 71 }
② 接著看 PeerReplicationResource 處理心跳的方法
- 首先就是調用了續約的方法 renewLease 進行續約
- 如果返回的狀態是 OK 或者 CONFLICT,就在 resposeEntity 中返回本地實例
1 private static Builder handleHeartbeat(EurekaServerConfig config, InstanceResource resource, String lastDirtyTimestamp, String overriddenStatus, String instanceStatus) { 2 // 調用 renewLease 續約 3 Response response = resource.renewLease(REPLICATION, overriddenStatus, instanceStatus, lastDirtyTimestamp); 4 int responseStatus = response.getStatus(); 5 Builder responseBuilder = new Builder().setStatusCode(responseStatus); 6 7 if ("false".equals(config.getExperimental("bugfix.934"))) { 8 if (responseStatus == Status.OK.getStatusCode() && response.getEntity() != null) { 9 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity()); 10 } 11 } else { 12 if ((responseStatus == Status.OK.getStatusCode() || responseStatus == Status.CONFLICT.getStatusCode()) 13 && response.getEntity() != null) { 14 // 續約成功或 CONFLICT 衝突時,將本地實例 appInfo 返回到客戶端 15 responseBuilder.setResponseEntity((InstanceInfo) response.getEntity()); 16 } 17 } 18 return responseBuilder; 19 }
③ PeerEurekaNode 發送心跳
ReplicationTaskProcessor 收到批量任務返回結果後,會處理響應結果,對於心跳任務,可以找到,失敗後就會回調 handleFailure 方法。
- 如果返回狀態是 404(NOT_FOUND),就會重新註冊,也是提交到隊列中。通過重新註冊來實現數據同步。
- 如果是其它狀態(409 CONFLICT)並且開啟了時間戳不一致就同步的配置,就將服務端返回的實例註冊到本地,實現數據的同步。
1 public void heartbeat(final String appName, final String id, 2 final InstanceInfo info, final InstanceStatus overriddenStatus, 3 boolean primeConnection) throws Throwable { 4 if (primeConnection) { 5 // We do not care about the result for priming request. 6 replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); 7 return; 8 } 9 ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) { 10 @Override 11 public EurekaHttpResponse<InstanceInfo> execute() throws Throwable { 12 return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus); 13 } 14 15 @Override 16 public void handleFailure(int statusCode, Object responseEntity) throws Throwable { 17 super.handleFailure(statusCode, responseEntity); 18 if (statusCode == 404) { 19 logger.warn("{}: missing entry.", getTaskName()); 20 if (info != null) { 21 logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}", 22 getTaskName(), info.getId(), info.getStatus()); 23 // 複製返回 404 時,重新註冊 24 register(info); 25 } 26 } else if (config.shouldSyncWhenTimestampDiffers()) { 27 // 409(CONFLICT) 28 InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity; 29 if (peerInstanceInfo != null) { 30 syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo); 31 } 32 } 33 } 34 }; 35 long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); 36 batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime); 37 } 38 39 ////////////////////////////////////////// 40 41 private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) { 42 try { 43 if (infoFromPeer != null) { 44 if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) { 45 logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus()); 46 registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus()); 47 } 48 // 將服務端的實例註冊到本地,實現數據同步 49 registry.register(infoFromPeer, true); 50 } 51 } catch (Throwable e) { 52 logger.warn("Exception when trying to set information from peer :", e); 53 } 54 }
至此,我們就可以總結出,eureka server 通過對比 lastDirtyTimestamp 和心跳操作來實現集群數據的複製和最終同步。
前面提到的實例過期就丟棄任務這樣看來就沒問題,它也不保證peer節點間相互複製的所有操作都成功,eureka 採用的是最終一致性,它是通過心跳的方式實現集群數據的最終修復和同步,只是集群間可能會同步延遲。
9、集群同步總結
下面總結下 eureka-server 集群節點間的同步:
- 首先 eureka-server 集群採用的是 Peer To Peer 的模式,即對等複製,各個 server 不分主從,每個 server 都可以接收寫請求,然後互相之間進行數據更新同步。
- 數據同步採用了多層任務隊列+批量處理的機制:
- eureka-server 接收到客戶端請求(註冊、下線、續約)後都會調用集群 PeerEurekaNode 進行操作的同步
- PeerEurekaNode 將操作封裝成 InstanceReplicationTask 實例複製任務,並用批量分發器 batchingDispatcher(TaskDispatcher)來分發處理
- batchingDispatcher 內部則將任務交給接收者執行器 AcceptorExecutor 處理,任務首先進入到 AcceptorExecutor 內的接收者隊列 acceptorQueue 中
- AcceptorExecutor 有個後台工作執行緒(AcceptorRunner)不斷輪詢,將接收者隊列 acceptorQueue 和 重處理隊列 reprocessQueue 中的任務轉移到處理中隊列中(processingOrder + pendingTasks)
- 接著將處理中隊列中的任務打包,一次最多 250 個任務,然後放到批量工作隊列 batchWorkQueue。如果處理中隊列中還有任務,就將任務放到單項任務隊列 singleItemWorkQueue
- 任務都打包好了,任務執行器 TaskExecutors 內分別有批量任務處理器(BatchWorkerRunnable)和單項任務處理器(SingleTaskWorkerRunnable)來處理 batchWorkQueue 和 singleItemWorkQueue 中的任務
- 處理器會利用任務複製處理器(ReplicationTaskProcessor)來提交任務,批量任務會提交給 server 節點的批量介面(peerreplication/batch/),單項任務則提交到對應的操作介面
- 任務提交如果阻塞或者網路失敗就會被放入重處理隊列 reprocessQueue,然後再次被 AcceptorRunner 輪詢處理,不過過期(超過90秒)的任務會被丟棄掉
- 其它 eureka-server 同步:
- 其它 eureka-server 接收到批量複製請求後,會輪詢批量任務列表,根據不同的操作類型(Register、Heartbeat、Cancel 等)分別調用 Resource 的介面進行處理
- 如果是續約操作,會判斷複製實例的 lastDirtyTimestamp 與本地實例的 lastDirtyTimestamp,如果是一致的,就任務數據一致
- 如果複製實例的 lastDirtyTimestamp > 本地實例的 lastDirtyTimestamp,則複製實例的數據是最新的,返回 404(NOT_FOUND) 要求客戶端重新發送一個註冊操作過來
- 如果複製實例的 lastDirtyTimestamp < 本地實例的 lastDirtyTimestamp,則本地實例的數據是最新的,返回 409(CONFLICT)和本地實例,客戶端用返回來的實例覆蓋本地的實例
下面再用一張圖總結集群同步:
十二、SpringCloud Eureka
到這裡,對 Eureka 核心源碼的研究就差不多了,這節來看下 Spring cloud eureka。Spring cloud eureka 提供了服務端的依賴 spring-cloud-starter-netflix-eureka-server 和客戶端的依賴 spring-cloud-starter-netflix-eureka-client,這兩個依賴包本身是比較簡單的,只是對 netflix 的 eureka-server 和 eureka-client 的封裝,它通過一些註解和配置類將 eureka 整合到 springboot 技術棧中,便於使用。
1、spring-cloud-starter-netflix-eureka-server
看 spring-cloud-starter-netflix-eureka-server 我們從 @EnableEurekaServer 這個註解來看,因為我們的註冊中心是基於 springboot 的,在啟動類上加上了 @EnableEurekaServer 註解就啟用了 eureka-server 註冊中心。
① Eureka Server 自動化配置
看這個註解的定義,從注釋中可以了解到,這個註解會激活 EurekaServerAutoConfiguration 的自動化配置類。
1 /** 2 * Annotation to activate Eureka Server related configuration. 3 * {@link EurekaServerAutoConfiguration} 4 * 5 * @author Dave Syer 6 * @author Biju Kunjummen 7 */ 8 @Target(ElementType.TYPE) 9 @Retention(RetentionPolicy.RUNTIME) 10 @Documented 11 @Import(EurekaServerMarkerConfiguration.class) 12 public @interface EnableEurekaServer { 13 14 }
看 EurekaServerAutoConfiguration 這個類,可以發現 springcloud 幾乎是將 com.netflix.eureka.EurekaBootStrap 中初始化組件的程式碼拷貝到了 EurekaServerAutoConfiguration,然後以 springboot 創建 bean 的方式來創建相關的組件。


1 @Configuration(proxyBeanMethods = false) 2 @Import(EurekaServerInitializerConfiguration.class) 3 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) 4 @EnableConfigurationProperties({ EurekaDashboardProperties.class, 5 InstanceRegistryProperties.class }) 6 @PropertySource("classpath:/eureka/server.properties") 7 public class EurekaServerAutoConfiguration implements WebMvcConfigurer { 8 9 /** 10 * List of packages containing Jersey resources required by the Eureka server. 11 */ 12 private static final String[] EUREKA_PACKAGES = new String[] { 13 "com.netflix.discovery", "com.netflix.eureka" }; 14 15 @Autowired 16 private ApplicationInfoManager applicationInfoManager; 17 18 @Autowired 19 private EurekaServerConfig eurekaServerConfig; 20 21 @Autowired 22 private EurekaClientConfig eurekaClientConfig; 23 24 @Autowired 25 private EurekaClient eurekaClient; 26 27 @Autowired 28 private InstanceRegistryProperties instanceRegistryProperties; 29 30 /** 31 * A {@link CloudJacksonJson} instance. 32 */ 33 public static final CloudJacksonJson JACKSON_JSON = new CloudJacksonJson(); 34 35 @Bean 36 public HasFeatures eurekaServerFeature() { 37 return HasFeatures.namedFeature("Eureka Server", 38 EurekaServerAutoConfiguration.class); 39 } 40 41 @Bean 42 @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", 43 matchIfMissing = true) 44 public EurekaController eurekaController() { 45 return new EurekaController(this.applicationInfoManager); 46 } 47 48 static { 49 CodecWrappers.registerWrapper(JACKSON_JSON); 50 EurekaJacksonCodec.setInstance(JACKSON_JSON.getCodec()); 51 } 52 53 @Bean 54 public ServerCodecs serverCodecs() { 55 return new CloudServerCodecs(this.eurekaServerConfig); 56 } 57 58 private static CodecWrapper getFullJson(EurekaServerConfig serverConfig) { 59 CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getJsonCodecName()); 60 return codec == null ? CodecWrappers.getCodec(JACKSON_JSON.codecName()) : codec; 61 } 62 63 private static CodecWrapper getFullXml(EurekaServerConfig serverConfig) { 64 CodecWrapper codec = CodecWrappers.getCodec(serverConfig.getXmlCodecName()); 65 return codec == null ? CodecWrappers.getCodec(CodecWrappers.XStreamXml.class) 66 : codec; 67 } 68 69 @Bean 70 @ConditionalOnMissingBean 71 public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() { 72 return new ReplicationClientAdditionalFilters(Collections.emptySet()); 73 } 74 75 @Bean 76 public PeerAwareInstanceRegistry peerAwareInstanceRegistry( 77 ServerCodecs serverCodecs) { 78 this.eurekaClient.getApplications(); // force initialization 79 return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, 80 serverCodecs, this.eurekaClient, 81 this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), 82 this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); 83 } 84 85 @Bean 86 @ConditionalOnMissingBean 87 public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, 88 ServerCodecs serverCodecs, 89 ReplicationClientAdditionalFilters replicationClientAdditionalFilters) { 90 return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, 91 this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, 92 replicationClientAdditionalFilters); 93 } 94 95 @Bean 96 @ConditionalOnMissingBean 97 public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, 98 PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { 99 return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, 100 registry, peerEurekaNodes, this.applicationInfoManager); 101 } 102 103 @Bean 104 public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, 105 EurekaServerContext serverContext) { 106 return new EurekaServerBootstrap(this.applicationInfoManager, 107 this.eurekaClientConfig, this.eurekaServerConfig, registry, 108 serverContext); 109 } 110 111 /** 112 * Register the Jersey filter. 113 * @param eurekaJerseyApp an {@link Application} for the filter to be registered 114 * @return a jersey {@link FilterRegistrationBean} 115 */ 116 @Bean 117 public FilterRegistrationBean<?> jerseyFilterRegistration( 118 javax.ws.rs.core.Application eurekaJerseyApp) { 119 FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>(); 120 bean.setFilter(new ServletContainer(eurekaJerseyApp)); 121 bean.setOrder(Ordered.LOWEST_PRECEDENCE); 122 bean.setUrlPatterns( 123 Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); 124 125 return bean; 126 } 127 128 /** 129 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources 130 * required by the Eureka server. 131 * @param environment an {@link Environment} instance to retrieve classpath resources 132 * @param resourceLoader a {@link ResourceLoader} instance to get classloader from 133 * @return created {@link Application} object 134 */ 135 @Bean 136 public javax.ws.rs.core.Application jerseyApplication(Environment environment, 137 ResourceLoader resourceLoader) { 138 139 ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider( 140 false, environment); 141 142 // Filter to include only classes that have a particular annotation. 143 // 144 provider.addIncludeFilter(new AnnotationTypeFilter(Path.class)); 145 provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class)); 146 147 // Find classes in Eureka packages (or subpackages) 148 // 149 Set<Class<?>> classes = new HashSet<>(); 150 for (String basePackage : EUREKA_PACKAGES) { 151 Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage); 152 for (BeanDefinition bd : beans) { 153 Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), 154 resourceLoader.getClassLoader()); 155 classes.add(cls); 156 } 157 } 158 159 // Construct the Jersey ResourceConfig 160 Map<String, Object> propsAndFeatures = new HashMap<>(); 161 propsAndFeatures.put( 162 // Skip static content used by the webapp 163 ServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX, 164 EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*"); 165 166 DefaultResourceConfig rc = new DefaultResourceConfig(classes); 167 rc.setPropertiesAndFeatures(propsAndFeatures); 168 169 return rc; 170 } 171 172 @Bean 173 @ConditionalOnBean(name = "httpTraceFilter") 174 public FilterRegistrationBean<?> traceFilterRegistration( 175 @Qualifier("httpTraceFilter") Filter filter) { 176 FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>(); 177 bean.setFilter(filter); 178 bean.setOrder(Ordered.LOWEST_PRECEDENCE - 10); 179 return bean; 180 } 181 182 @Configuration(proxyBeanMethods = false) 183 protected static class EurekaServerConfigBeanConfiguration { 184 185 @Bean 186 @ConditionalOnMissingBean 187 public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) { 188 EurekaServerConfigBean server = new EurekaServerConfigBean(); 189 if (clientConfig.shouldRegisterWithEureka()) { 190 // Set a sensible default if we are supposed to replicate 191 server.setRegistrySyncRetries(5); 192 } 193 return server; 194 } 195 196 } 197 198 /** 199 * {@link PeerEurekaNodes} which updates peers when /refresh is invoked. Peers are 200 * updated only if <code>eureka.client.use-dns-for-fetching-service-urls</code> is 201 * <code>false</code> and one of following properties have changed. 202 * <p> 203 * </p> 204 * <ul> 205 * <li><code>eureka.client.availability-zones</code></li> 206 * <li><code>eureka.client.region</code></li> 207 * <li><code>eureka.client.service-url.<zone></code></li> 208 * </ul> 209 */ 210 static class RefreshablePeerEurekaNodes extends PeerEurekaNodes 211 implements ApplicationListener<EnvironmentChangeEvent> { 212 213 private ReplicationClientAdditionalFilters replicationClientAdditionalFilters; 214 215 RefreshablePeerEurekaNodes(final PeerAwareInstanceRegistry registry, 216 final EurekaServerConfig serverConfig, 217 final EurekaClientConfig clientConfig, final ServerCodecs serverCodecs, 218 final ApplicationInfoManager applicationInfoManager, 219 final ReplicationClientAdditionalFilters replicationClientAdditionalFilters) { 220 super(registry, serverConfig, clientConfig, serverCodecs, 221 applicationInfoManager); 222 this.replicationClientAdditionalFilters = replicationClientAdditionalFilters; 223 } 224 225 @Override 226 protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) { 227 JerseyReplicationClient replicationClient = JerseyReplicationClient 228 .createReplicationClient(serverConfig, serverCodecs, 229 peerEurekaNodeUrl); 230 231 this.replicationClientAdditionalFilters.getFilters() 232 .forEach(replicationClient::addReplicationClientFilter); 233 234 String targetHost = hostFromUrl(peerEurekaNodeUrl); 235 if (targetHost == null) { 236 targetHost = "host"; 237 } 238 return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, 239 replicationClient, serverConfig); 240 } 241 242 @Override 243 public void onApplicationEvent(final EnvironmentChangeEvent event) { 244 if (shouldUpdate(event.getKeys())) { 245 updatePeerEurekaNodes(resolvePeerUrls()); 246 } 247 } 248 249 /* 250 * Check whether specific properties have changed. 251 */ 252 protected boolean shouldUpdate(final Set<String> changedKeys) { 253 assert changedKeys != null; 254 255 // if eureka.client.use-dns-for-fetching-service-urls is true, then 256 // service-url will not be fetched from environment. 257 if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) { 258 return false; 259 } 260 261 if (changedKeys.contains("eureka.client.region")) { 262 return true; 263 } 264 265 for (final String key : changedKeys) { 266 // property keys are not expected to be null. 267 if (key.startsWith("eureka.client.service-url.") 268 || key.startsWith("eureka.client.availability-zones.")) { 269 return true; 270 } 271 } 272 return false; 273 } 274 275 } 276 277 class CloudServerCodecs extends DefaultServerCodecs { 278 279 CloudServerCodecs(EurekaServerConfig serverConfig) { 280 super(getFullJson(serverConfig), 281 CodecWrappers.getCodec(CodecWrappers.JacksonJsonMini.class), 282 getFullXml(serverConfig), 283 CodecWrappers.getCodec(CodecWrappers.JacksonXmlMini.class)); 284 } 285 286 } 287 288 }
View Code
但是要注意,springcloud 中,感知集群的註冊表組件是 InstanceRegistry,集群 PeerEurekaNodes 組件是 RefreshablePeerEurekaNodes。
② Springboot 方式的配置
EurekaServerAutoConfiguration 中注入了 EurekaServerConfig、EurekaClientConfig,在 Netflix 中,EurekaServerConfig 默認讀取的是 eureka-server.properties 配置文件,EurekaClientConfig 默認讀取的是 eureka-client.properties 配置文件。而在 springcloud 中,它們的實現類為 EurekaServerConfigBean、EurekaClientConfigBean,可以看到,就是基於 springboot 的配置方式來的了,讀取的是 application.yml 配置文件中 eureka 的配置了,並且每個配置也提供了默認值。
例如 EurekaServerConfigBean:


1 @ConfigurationProperties(EurekaServerConfigBean.PREFIX) 2 public class EurekaServerConfigBean implements EurekaServerConfig { 3 4 /** 5 * Eureka server configuration properties prefix. 6 */ 7 public static final String PREFIX = "eureka.server"; 8 9 private static final int MINUTES = 60 * 1000; 10 11 @Autowired(required = false) 12 PropertyResolver propertyResolver; 13 14 private String aWSAccessId; 15 16 private String aWSSecretKey; 17 18 private int eIPBindRebindRetries = 3; 19 20 private int eIPBindingRetryIntervalMs = 5 * MINUTES; 21 22 private int eIPBindingRetryIntervalMsWhenUnbound = 1 * MINUTES; 23 24 private boolean enableSelfPreservation = true; 25 26 private double renewalPercentThreshold = 0.85; 27 28 private int renewalThresholdUpdateIntervalMs = 15 * MINUTES; 29 30 private int peerEurekaNodesUpdateIntervalMs = 10 * MINUTES; 31 32 private int numberOfReplicationRetries = 5; 33 34 private int peerEurekaStatusRefreshTimeIntervalMs = 30 * 1000; 35 36 private int waitTimeInMsWhenSyncEmpty = 5 * MINUTES; 37 38 private int peerNodeConnectTimeoutMs = 200; 39 40 private int peerNodeReadTimeoutMs = 200; 41 42 private int peerNodeTotalConnections = 1000; 43 44 private int peerNodeTotalConnectionsPerHost = 500; 45 46 private int peerNodeConnectionIdleTimeoutSeconds = 30; 47 48 private long retentionTimeInMSInDeltaQueue = 3 * MINUTES; 49 50 private long deltaRetentionTimerIntervalInMs = 30 * 1000; 51 52 private long evictionIntervalTimerInMs = 60 * 1000; 53 54 private int aSGQueryTimeoutMs = 300; 55 56 private long aSGUpdateIntervalMs = 5 * MINUTES; 57 58 private long aSGCacheExpiryTimeoutMs = 10 * MINUTES; // defaults to longer than the 59 60 // asg update interval 61 62 private long responseCacheAutoExpirationInSeconds = 180; 63 64 private long responseCacheUpdateIntervalMs = 30 * 1000; 65 66 private boolean useReadOnlyResponseCache = true; 67 68 private boolean disableDelta; 69 70 private long maxIdleThreadInMinutesAgeForStatusReplication = 10; 71 72 private int minThreadsForStatusReplication = 1; 73 74 private int maxThreadsForStatusReplication = 1; 75 76 private int maxElementsInStatusReplicationPool = 10000; 77 78 private boolean syncWhenTimestampDiffers = true; 79 80 private int registrySyncRetries = 0; 81 82 private long registrySyncRetryWaitMs = 30 * 1000; 83 84 private int maxElementsInPeerReplicationPool = 10000; 85 86 private long maxIdleThreadAgeInMinutesForPeerReplication = 15; 87 88 private int minThreadsForPeerReplication = 5; 89 90 private int maxThreadsForPeerReplication = 20; 91 92 private int maxTimeForReplication = 30000; 93 94 private boolean primeAwsReplicaConnections = true; 95 96 private boolean disableDeltaForRemoteRegions; 97 98 private int remoteRegionConnectTimeoutMs = 1000; 99 100 private int remoteRegionReadTimeoutMs = 1000; 101 102 private int remoteRegionTotalConnections = 1000; 103 104 private int remoteRegionTotalConnectionsPerHost = 500; 105 106 private int remoteRegionConnectionIdleTimeoutSeconds = 30; 107 108 private boolean gZipContentFromRemoteRegion = true; 109 110 private Map<String, String> remoteRegionUrlsWithName = new HashMap<>(); 111 112 private String[] remoteRegionUrls; 113 114 private Map<String, Set<String>> remoteRegionAppWhitelist = new HashMap<>(); 115 116 private int remoteRegionRegistryFetchInterval = 30; 117 118 private int remoteRegionFetchThreadPoolSize = 20; 119 120 private String remoteRegionTrustStore = ""; 121 122 private String remoteRegionTrustStorePassword = "changeit"; 123 124 private boolean disableTransparentFallbackToOtherRegion; 125 126 private boolean batchReplication; 127 128 private boolean rateLimiterEnabled = false; 129 130 private boolean rateLimiterThrottleStandardClients = false; 131 132 private Set<String> rateLimiterPrivilegedClients = Collections.emptySet(); 133 134 private int rateLimiterBurstSize = 10; 135 136 private int rateLimiterRegistryFetchAverageRate = 500; 137 138 private int rateLimiterFullFetchAverageRate = 100; 139 140 private boolean logIdentityHeaders = true; 141 142 private String listAutoScalingGroupsRoleName = "ListAutoScalingGroups"; 143 144 private boolean enableReplicatedRequestCompression = false; 145 146 private String jsonCodecName; 147 148 private String xmlCodecName; 149 150 private int route53BindRebindRetries = 3; 151 152 private int route53BindingRetryIntervalMs = 5 * MINUTES; 153 154 private long route53DomainTTL = 30; 155 156 private AwsBindingStrategy bindingStrategy = AwsBindingStrategy.EIP; 157 158 private int minAvailableInstancesForPeerReplication = -1; 159 160 private int initialCapacityOfResponseCache = 1000; 161 162 private int expectedClientRenewalIntervalSeconds = 30; 163 164 private boolean useAwsAsgApi = true; 165 166 private String myUrl; 167 168 @Override 169 public boolean shouldEnableSelfPreservation() { 170 return this.enableSelfPreservation; 171 } 172 173 @Override 174 public boolean shouldDisableDelta() { 175 return this.disableDelta; 176 } 177 178 @Override 179 public boolean shouldSyncWhenTimestampDiffers() { 180 return this.syncWhenTimestampDiffers; 181 } 182 183 @Override 184 public boolean shouldPrimeAwsReplicaConnections() { 185 return this.primeAwsReplicaConnections; 186 } 187 188 @Override 189 public boolean shouldDisableDeltaForRemoteRegions() { 190 return this.disableDeltaForRemoteRegions; 191 } 192 193 @Override 194 public boolean shouldGZipContentFromRemoteRegion() { 195 return this.gZipContentFromRemoteRegion; 196 } 197 198 @Override 199 public Set<String> getRemoteRegionAppWhitelist(String regionName) { 200 return this.remoteRegionAppWhitelist 201 .get(regionName == null ? "global" : regionName.trim().toLowerCase()); 202 } 203 204 @Override 205 public boolean disableTransparentFallbackToOtherRegion() { 206 return this.disableTransparentFallbackToOtherRegion; 207 } 208 209 @Override 210 public boolean shouldBatchReplication() { 211 return this.batchReplication; 212 } 213 214 @Override 215 public String getMyUrl() { 216 return this.myUrl; 217 } 218 219 public void setMyUrl(String myUrl) { 220 this.myUrl = myUrl; 221 } 222 223 @Override 224 public boolean shouldLogIdentityHeaders() { 225 return this.logIdentityHeaders; 226 } 227 228 @Override 229 public String getJsonCodecName() { 230 return this.jsonCodecName; 231 } 232 233 @Override 234 public String getXmlCodecName() { 235 return this.xmlCodecName; 236 } 237 238 @Override 239 public boolean shouldUseReadOnlyResponseCache() { 240 return this.useReadOnlyResponseCache; 241 } 242 243 @Override 244 public boolean shouldEnableReplicatedRequestCompression() { 245 return this.enableReplicatedRequestCompression; 246 } 247 248 @Override 249 public String getExperimental(String name) { 250 if (this.propertyResolver != null) { 251 return this.propertyResolver.getProperty(PREFIX + ".experimental." + name, 252 String.class, null); 253 } 254 return null; 255 } 256 257 @Override 258 public int getInitialCapacityOfResponseCache() { 259 return this.initialCapacityOfResponseCache; 260 } 261 262 public void setInitialCapacityOfResponseCache(int initialCapacityOfResponseCache) { 263 this.initialCapacityOfResponseCache = initialCapacityOfResponseCache; 264 } 265 266 @Override 267 public int getHealthStatusMinNumberOfAvailablePeers() { 268 return this.minAvailableInstancesForPeerReplication; 269 } 270 271 public PropertyResolver getPropertyResolver() { 272 return propertyResolver; 273 } 274 275 public void setPropertyResolver(PropertyResolver propertyResolver) { 276 this.propertyResolver = propertyResolver; 277 } 278 279 public String getAWSAccessId() { 280 return aWSAccessId; 281 } 282 283 public void setAWSAccessId(String aWSAccessId) { 284 this.aWSAccessId = aWSAccessId; 285 } 286 287 public String getAWSSecretKey() { 288 return aWSSecretKey; 289 } 290 291 public void setAWSSecretKey(String aWSSecretKey) { 292 this.aWSSecretKey = aWSSecretKey; 293 } 294 295 public int getEIPBindRebindRetries() { 296 return eIPBindRebindRetries; 297 } 298 299 public void setEIPBindRebindRetries(int eIPBindRebindRetries) { 300 this.eIPBindRebindRetries = eIPBindRebindRetries; 301 } 302 303 public int getEIPBindingRetryIntervalMs() { 304 return eIPBindingRetryIntervalMs; 305 } 306 307 public void setEIPBindingRetryIntervalMs(int eIPBindingRetryIntervalMs) { 308 this.eIPBindingRetryIntervalMs = eIPBindingRetryIntervalMs; 309 } 310 311 public int getEIPBindingRetryIntervalMsWhenUnbound() { 312 return eIPBindingRetryIntervalMsWhenUnbound; 313 } 314 315 public void setEIPBindingRetryIntervalMsWhenUnbound( 316 int eIPBindingRetryIntervalMsWhenUnbound) { 317 this.eIPBindingRetryIntervalMsWhenUnbound = eIPBindingRetryIntervalMsWhenUnbound; 318 } 319 320 public boolean isEnableSelfPreservation() { 321 return enableSelfPreservation; 322 } 323 324 public void setEnableSelfPreservation(boolean enableSelfPreservation) { 325 this.enableSelfPreservation = enableSelfPreservation; 326 } 327 328 @Override 329 public double getRenewalPercentThreshold() { 330 return renewalPercentThreshold; 331 } 332 333 public void setRenewalPercentThreshold(double renewalPercentThreshold) { 334 this.renewalPercentThreshold = renewalPercentThreshold; 335 } 336 337 @Override 338 public int getRenewalThresholdUpdateIntervalMs() { 339 return renewalThresholdUpdateIntervalMs; 340 } 341 342 @Override 343 public int getExpectedClientRenewalIntervalSeconds() { 344 return this.expectedClientRenewalIntervalSeconds; 345 } 346 347 public void setExpectedClientRenewalIntervalSeconds( 348 int expectedClientRenewalIntervalSeconds) { 349 this.expectedClientRenewalIntervalSeconds = expectedClientRenewalIntervalSeconds; 350 } 351 352 public void setRenewalThresholdUpdateIntervalMs( 353 int renewalThresholdUpdateIntervalMs) { 354 this.renewalThresholdUpdateIntervalMs = renewalThresholdUpdateIntervalMs; 355 } 356 357 @Override 358 public int getPeerEurekaNodesUpdateIntervalMs() { 359 return peerEurekaNodesUpdateIntervalMs; 360 } 361 362 public void setPeerEurekaNodesUpdateIntervalMs(int peerEurekaNodesUpdateIntervalMs) { 363 this.peerEurekaNodesUpdateIntervalMs = peerEurekaNodesUpdateIntervalMs; 364 } 365 366 @Override 367 public int getNumberOfReplicationRetries() { 368 return numberOfReplicationRetries; 369 } 370 371 public void setNumberOfReplicationRetries(int numberOfReplicationRetries) { 372 this.numberOfReplicationRetries = numberOfReplicationRetries; 373 } 374 375 @Override 376 public int getPeerEurekaStatusRefreshTimeIntervalMs() { 377 return peerEurekaStatusRefreshTimeIntervalMs; 378 } 379 380 public void setPeerEurekaStatusRefreshTimeIntervalMs( 381 int peerEurekaStatusRefreshTimeIntervalMs) { 382 this.peerEurekaStatusRefreshTimeIntervalMs = peerEurekaStatusRefreshTimeIntervalMs; 383 } 384 385 @Override 386 public int getWaitTimeInMsWhenSyncEmpty() { 387 return waitTimeInMsWhenSyncEmpty; 388 } 389 390 public void setWaitTimeInMsWhenSyncEmpty(int waitTimeInMsWhenSyncEmpty) { 391 this.waitTimeInMsWhenSyncEmpty = waitTimeInMsWhenSyncEmpty; 392 } 393 394 @Override 395 public int getPeerNodeConnectTimeoutMs() { 396 return peerNodeConnectTimeoutMs; 397 } 398 399 public void setPeerNodeConnectTimeoutMs(int peerNodeConnectTimeoutMs) { 400 this.peerNodeConnectTimeoutMs = peerNodeConnectTimeoutMs; 401 } 402 403 @Override 404 public int getPeerNodeReadTimeoutMs() { 405 return peerNodeReadTimeoutMs; 406 } 407 408 public void setPeerNodeReadTimeoutMs(int peerNodeReadTimeoutMs) { 409 this.peerNodeReadTimeoutMs = peerNodeReadTimeoutMs; 410 } 411 412 @Override 413 public int getPeerNodeTotalConnections() { 414 return peerNodeTotalConnections; 415 } 416 417 public void setPeerNodeTotalConnections(int peerNodeTotalConnections) { 418 this.peerNodeTotalConnections = peerNodeTotalConnections; 419 } 420 421 @Override 422 public int getPeerNodeTotalConnectionsPerHost() { 423 return peerNodeTotalConnectionsPerHost; 424 } 425 426 public void setPeerNodeTotalConnectionsPerHost(int peerNodeTotalConnectionsPerHost) { 427 this.peerNodeTotalConnectionsPerHost = peerNodeTotalConnectionsPerHost; 428 } 429 430 @Override 431 public int getPeerNodeConnectionIdleTimeoutSeconds() { 432 return peerNodeConnectionIdleTimeoutSeconds; 433 } 434 435 public void setPeerNodeConnectionIdleTimeoutSeconds( 436 int peerNodeConnectionIdleTimeoutSeconds) { 437 this.peerNodeConnectionIdleTimeoutSeconds = peerNodeConnectionIdleTimeoutSeconds; 438 } 439 440 @Override 441 public long getRetentionTimeInMSInDeltaQueue() { 442 return retentionTimeInMSInDeltaQueue; 443 } 444 445 public void setRetentionTimeInMSInDeltaQueue(long retentionTimeInMSInDeltaQueue) { 446 this.retentionTimeInMSInDeltaQueue = retentionTimeInMSInDeltaQueue; 447 } 448 449 @Override 450 public long getDeltaRetentionTimerIntervalInMs() { 451 return deltaRetentionTimerIntervalInMs; 452 } 453 454 public void setDeltaRetentionTimerIntervalInMs(long deltaRetentionTimerIntervalInMs) { 455 this.deltaRetentionTimerIntervalInMs = deltaRetentionTimerIntervalInMs; 456 } 457 458 @Override 459 public long getEvictionIntervalTimerInMs() { 460 return evictionIntervalTimerInMs; 461 } 462 463 @Override 464 public boolean shouldUseAwsAsgApi() { 465 return this.useAwsAsgApi; 466 } 467 468 public void setUseAwsAsgApi(boolean useAwsAsgApi) { 469 this.useAwsAsgApi = useAwsAsgApi; 470 } 471 472 public void setEvictionIntervalTimerInMs(long evictionIntervalTimerInMs) { 473 this.evictionIntervalTimerInMs = evictionIntervalTimerInMs; 474 } 475 476 public int getASGQueryTimeoutMs() { 477 return aSGQueryTimeoutMs; 478 } 479 480 public void setASGQueryTimeoutMs(int aSGQueryTimeoutMs) { 481 this.aSGQueryTimeoutMs = aSGQueryTimeoutMs; 482 } 483 484 public long getASGUpdateIntervalMs() { 485 return aSGUpdateIntervalMs; 486 } 487 488 public void setASGUpdateIntervalMs(long aSGUpdateIntervalMs) { 489 this.aSGUpdateIntervalMs = aSGUpdateIntervalMs; 490 } 491 492 public long getASGCacheExpiryTimeoutMs() { 493 return aSGCacheExpiryTimeoutMs; 494 } 495 496 public void setASGCacheExpiryTimeoutMs(long aSGCacheExpiryTimeoutMs) { 497 this.aSGCacheExpiryTimeoutMs = aSGCacheExpiryTimeoutMs; 498 } 499 500 @Override 501 public long getResponseCacheAutoExpirationInSeconds() { 502 return responseCacheAutoExpirationInSeconds; 503 } 504 505 public void setResponseCacheAutoExpirationInSeconds( 506 long responseCacheAutoExpirationInSeconds) { 507 this.responseCacheAutoExpirationInSeconds = responseCacheAutoExpirationInSeconds; 508 } 509 510 @Override 511 public long getResponseCacheUpdateIntervalMs() { 512 return responseCacheUpdateIntervalMs; 513 } 514 515 public void setResponseCacheUpdateIntervalMs(long responseCacheUpdateIntervalMs) { 516 this.responseCacheUpdateIntervalMs = responseCacheUpdateIntervalMs; 517 } 518 519 public boolean isUseReadOnlyResponseCache() { 520 return useReadOnlyResponseCache; 521 } 522 523 public void setUseReadOnlyResponseCache(boolean useReadOnlyResponseCache) { 524 this.useReadOnlyResponseCache = useReadOnlyResponseCache; 525 } 526 527 public boolean isDisableDelta() { 528 return disableDelta; 529 } 530 531 public void setDisableDelta(boolean disableDelta) { 532 this.disableDelta = disableDelta; 533 } 534 535 @Override 536 public long getMaxIdleThreadInMinutesAgeForStatusReplication() { 537 return maxIdleThreadInMinutesAgeForStatusReplication; 538 } 539 540 public void setMaxIdleThreadInMinutesAgeForStatusReplication( 541 long maxIdleThreadInMinutesAgeForStatusReplication) { 542 this.maxIdleThreadInMinutesAgeForStatusReplication = maxIdleThreadInMinutesAgeForStatusReplication; 543 } 544 545 @Override 546 public int getMinThreadsForStatusReplication() { 547 return minThreadsForStatusReplication; 548 } 549 550 public void setMinThreadsForStatusReplication(int minThreadsForStatusReplication) { 551 this.minThreadsForStatusReplication = minThreadsForStatusReplication; 552 } 553 554 @Override 555 public int getMaxThreadsForStatusReplication() { 556 return maxThreadsForStatusReplication; 557 } 558 559 public void setMaxThreadsForStatusReplication(int maxThreadsForStatusReplication) { 560 this.maxThreadsForStatusReplication = maxThreadsForStatusReplication; 561 } 562 563 @Override 564 public int getMaxElementsInStatusReplicationPool() { 565 return maxElementsInStatusReplicationPool; 566 } 567 568 public void setMaxElementsInStatusReplicationPool( 569 int maxElementsInStatusReplicationPool) { 570 this.maxElementsInStatusReplicationPool = maxElementsInStatusReplicationPool; 571 } 572 573 public boolean isSyncWhenTimestampDiffers() { 574 return syncWhenTimestampDiffers; 575 } 576 577 public void setSyncWhenTimestampDiffers(boolean syncWhenTimestampDiffers) { 578 this.syncWhenTimestampDiffers = syncWhenTimestampDiffers; 579 } 580 581 @Override 582 public int getRegistrySyncRetries() { 583 return registrySyncRetries; 584 } 585 586 public void setRegistrySyncRetries(int registrySyncRetries) { 587 this.registrySyncRetries = registrySyncRetries; 588 } 589 590 @Override 591 public long getRegistrySyncRetryWaitMs() { 592 return registrySyncRetryWaitMs; 593 } 594 595 public void setRegistrySyncRetryWaitMs(long registrySyncRetryWaitMs) { 596 this.registrySyncRetryWaitMs = registrySyncRetryWaitMs; 597 } 598 599 @Override 600 public int getMaxElementsInPeerReplicationPool() { 601 return maxElementsInPeerReplicationPool; 602 } 603 604 public void setMaxElementsInPeerReplicationPool( 605 int maxElementsInPeerReplicationPool) { 606 this.maxElementsInPeerReplicationPool = maxElementsInPeerReplicationPool; 607 } 608 609 @Override 610 public long getMaxIdleThreadAgeInMinutesForPeerReplication() { 611 return maxIdleThreadAgeInMinutesForPeerReplication; 612 } 613 614 public void setMaxIdleThreadAgeInMinutesForPeerReplication( 615 long maxIdleThreadAgeInMinutesForPeerReplication) { 616 this.maxIdleThreadAgeInMinutesForPeerReplication = maxIdleThreadAgeInMinutesForPeerReplication; 617 } 618 619 @Override 620 public int getMinThreadsForPeerReplication() { 621 return minThreadsForPeerReplication; 622 } 623 624 public void setMinThreadsForPeerReplication(int minThreadsForPeerReplication) { 625 this.minThreadsForPeerReplication = minThreadsForPeerReplication; 626 } 627 628 @Override 629 public int getMaxThreadsForPeerReplication() { 630 return maxThreadsForPeerReplication; 631 } 632 633 public void setMaxThreadsForPeerReplication(int maxThreadsForPeerReplication) { 634 this.maxThreadsForPeerReplication = maxThreadsForPeerReplication; 635 } 636 637 @Override 638 public int getMaxTimeForReplication() { 639 return maxTimeForReplication; 640 } 641 642 public void setMaxTimeForReplication(int maxTimeForReplication) { 643 this.maxTimeForReplication = maxTimeForReplication; 644 } 645 646 public boolean isPrimeAwsReplicaConnections() { 647 return primeAwsReplicaConnections; 648 } 649 650 public void setPrimeAwsReplicaConnections(boolean primeAwsReplicaConnections) { 651 this.primeAwsReplicaConnections = primeAwsReplicaConnections; 652 } 653 654 public boolean isDisableDeltaForRemoteRegions() { 655 return disableDeltaForRemoteRegions; 656 } 657 658 public void setDisableDeltaForRemoteRegions(boolean disableDeltaForRemoteRegions) { 659 this.disableDeltaForRemoteRegions = disableDeltaForRemoteRegions; 660 } 661 662 @Override 663 public int getRemoteRegionConnectTimeoutMs() { 664 return remoteRegionConnectTimeoutMs; 665 } 666 667 public void setRemoteRegionConnectTimeoutMs(int remoteRegionConnectTimeoutMs) { 668 this.remoteRegionConnectTimeoutMs = remoteRegionConnectTimeoutMs; 669 } 670 671 @Override 672 public int getRemoteRegionReadTimeoutMs() { 673 return remoteRegionReadTimeoutMs; 674 } 675 676 public void setRemoteRegionReadTimeoutMs(int remoteRegionReadTimeoutMs) { 677 this.remoteRegionReadTimeoutMs = remoteRegionReadTimeoutMs; 678 } 679 680 @Override 681 public int getRemoteRegionTotalConnections() { 682 return remoteRegionTotalConnections; 683 } 684 685 public void setRemoteRegionTotalConnections(int remoteRegionTotalConnections) { 686 this.remoteRegionTotalConnections = remoteRegionTotalConnections; 687 } 688 689 @Override 690 public int getRemoteRegionTotalConnectionsPerHost() { 691 return remoteRegionTotalConnectionsPerHost; 692 } 693 694 public void setRemoteRegionTotalConnectionsPerHost( 695 int remoteRegionTotalConnectionsPerHost) { 696 this.remoteRegionTotalConnectionsPerHost = remoteRegionTotalConnectionsPerHost; 697 } 698 699 @Override 700 public int getRemoteRegionConnectionIdleTimeoutSeconds() { 701 return remoteRegionConnectionIdleTimeoutSeconds; 702 } 703 704 public void setRemoteRegionConnectionIdleTimeoutSeconds( 705 int remoteRegionConnectionIdleTimeoutSeconds) { 706 this.remoteRegionConnectionIdleTimeoutSeconds = remoteRegionConnectionIdleTimeoutSeconds; 707 } 708 709 public boolean isgZipContentFromRemoteRegion() { 710 return gZipContentFromRemoteRegion; 711 } 712 713 public void setgZipContentFromRemoteRegion(boolean gZipContentFromRemoteRegion) { 714 this.gZipContentFromRemoteRegion = gZipContentFromRemoteRegion; 715 } 716 717 @Override 718 public Map<String, String> getRemoteRegionUrlsWithName() { 719 return remoteRegionUrlsWithName; 720 } 721 722 public void setRemoteRegionUrlsWithName( 723 Map<String, String> remoteRegionUrlsWithName) { 724 this.remoteRegionUrlsWithName = remoteRegionUrlsWithName; 725 } 726 727 @Override 728 public String[] getRemoteRegionUrls() { 729 return remoteRegionUrls; 730 } 731 732 public void setRemoteRegionUrls(String[] remoteRegionUrls) { 733 this.remoteRegionUrls = remoteRegionUrls; 734 } 735 736 public Map<String, Set<String>> getRemoteRegionAppWhitelist() { 737 return remoteRegionAppWhitelist; 738 } 739 740 public void setRemoteRegionAppWhitelist( 741 Map<String, Set<String>> remoteRegionAppWhitelist) { 742 this.remoteRegionAppWhitelist = remoteRegionAppWhitelist; 743 } 744 745 @Override 746 public int getRemoteRegionRegistryFetchInterval() { 747 return remoteRegionRegistryFetchInterval; 748 } 749 750 public void setRemoteRegionRegistryFetchInterval( 751 int remoteRegionRegistryFetchInterval) { 752 this.remoteRegionRegistryFetchInterval = remoteRegionRegistryFetchInterval; 753 } 754 755 @Override 756 public int getRemoteRegionFetchThreadPoolSize() { 757 return remoteRegionFetchThreadPoolSize; 758 } 759 760 public void setRemoteRegionFetchThreadPoolSize(int remoteRegionFetchThreadPoolSize) { 761 this.remoteRegionFetchThreadPoolSize = remoteRegionFetchThreadPoolSize; 762 } 763 764 @Override 765 public String getRemoteRegionTrustStore() { 766 return remoteRegionTrustStore; 767 } 768 769 public void setRemoteRegionTrustStore(String remoteRegionTrustStore) { 770 this.remoteRegionTrustStore = remoteRegionTrustStore; 771 } 772 773 @Override 774 public String getRemoteRegionTrustStorePassword() { 775 return remoteRegionTrustStorePassword; 776 } 777 778 public void setRemoteRegionTrustStorePassword(String remoteRegionTrustStorePassword) { 779 this.remoteRegionTrustStorePassword = remoteRegionTrustStorePassword; 780 } 781 782 public boolean isDisableTransparentFallbackToOtherRegion() { 783 return disableTransparentFallbackToOtherRegion; 784 } 785 786 public void setDisableTransparentFallbackToOtherRegion( 787 boolean disableTransparentFallbackToOtherRegion) { 788 this.disableTransparentFallbackToOtherRegion = disableTransparentFallbackToOtherRegion; 789 } 790 791 public boolean isBatchReplication() { 792 return batchReplication; 793 } 794 795 public void setBatchReplication(boolean batchReplication) { 796 this.batchReplication = batchReplication; 797 } 798 799 @Override 800 public boolean isRateLimiterEnabled() { 801 return rateLimiterEnabled; 802 } 803 804 public void setRateLimiterEnabled(boolean rateLimiterEnabled) { 805 this.rateLimiterEnabled = rateLimiterEnabled; 806 } 807 808 @Override 809 public boolean isRateLimiterThrottleStandardClients() { 810 return rateLimiterThrottleStandardClients; 811 } 812 813 public void setRateLimiterThrottleStandardClients( 814 boolean rateLimiterThrottleStandardClients) { 815 this.rateLimiterThrottleStandardClients = rateLimiterThrottleStandardClients; 816 } 817 818 @Override 819 public Set<String> getRateLimiterPrivilegedClients() { 820 return rateLimiterPrivilegedClients; 821 } 822 823 public void setRateLimiterPrivilegedClients( 824 Set<String> rateLimiterPrivilegedClients) { 825 this.rateLimiterPrivilegedClients = rateLimiterPrivilegedClients; 826 } 827 828 @Override 829 public int getRateLimiterBurstSize() { 830 return rateLimiterBurstSize; 831 } 832 833 public void setRateLimiterBurstSize(int rateLimiterBurstSize) { 834 this.rateLimiterBurstSize = rateLimiterBurstSize; 835 } 836 837 @Override 838 public int getRateLimiterRegistryFetchAverageRate() { 839 return rateLimiterRegistryFetchAverageRate; 840 } 841 842 public void setRateLimiterRegistryFetchAverageRate( 843 int rateLimiterRegistryFetchAverageRate) { 844 this.rateLimiterRegistryFetchAverageRate = rateLimiterRegistryFetchAverageRate; 845 } 846 847 @Override 848 public int getRateLimiterFullFetchAverageRate() { 849 return rateLimiterFullFetchAverageRate; 850 } 851 852 public void setRateLimiterFullFetchAverageRate(int rateLimiterFullFetchAverageRate) { 853 this.rateLimiterFullFetchAverageRate = rateLimiterFullFetchAverageRate; 854 } 855 856 public boolean isLogIdentityHeaders() { 857 return logIdentityHeaders; 858 } 859 860 public void setLogIdentityHeaders(boolean logIdentityHeaders) { 861 this.logIdentityHeaders = logIdentityHeaders; 862 } 863 864 @Override 865 public String getListAutoScalingGroupsRoleName() { 866 return listAutoScalingGroupsRoleName; 867 } 868 869 public void setListAutoScalingGroupsRoleName(String listAutoScalingGroupsRoleName) { 870 this.listAutoScalingGroupsRoleName = listAutoScalingGroupsRoleName; 871 } 872 873 public boolean isEnableReplicatedRequestCompression() { 874 return enableReplicatedRequestCompression; 875 } 876 877 public void setEnableReplicatedRequestCompression( 878 boolean enableReplicatedRequestCompression) { 879 this.enableReplicatedRequestCompression = enableReplicatedRequestCompression; 880 } 881 882 public void setJsonCodecName(String jsonCodecName) { 883 this.jsonCodecName = jsonCodecName; 884 } 885 886 public void setXmlCodecName(String xmlCodecName) { 887 this.xmlCodecName = xmlCodecName; 888 } 889 890 @Override 891 public int getRoute53BindRebindRetries() { 892 return route53BindRebindRetries; 893 } 894 895 public void setRoute53BindRebindRetries(int route53BindRebindRetries) { 896 this.route53BindRebindRetries = route53BindRebindRetries; 897 } 898 899 @Override 900 public int getRoute53BindingRetryIntervalMs() { 901 return route53BindingRetryIntervalMs; 902 } 903 904 public void setRoute53BindingRetryIntervalMs(int route53BindingRetryIntervalMs) { 905 this.route53BindingRetryIntervalMs = route53BindingRetryIntervalMs; 906 } 907 908 @Override 909 public long getRoute53DomainTTL() { 910 return route53DomainTTL; 911 } 912 913 public void setRoute53DomainTTL(long route53DomainTTL) { 914 this.route53DomainTTL = route53DomainTTL; 915 } 916 917 @Override 918 public AwsBindingStrategy getBindingStrategy() { 919 return bindingStrategy; 920 } 921 922 public void setBindingStrategy(AwsBindingStrategy bindingStrategy) { 923 this.bindingStrategy = bindingStrategy; 924 } 925 926 public int getMinAvailableInstancesForPeerReplication() { 927 return minAvailableInstancesForPeerReplication; 928 } 929 930 public void setMinAvailableInstancesForPeerReplication( 931 int minAvailableInstancesForPeerReplication) { 932 this.minAvailableInstancesForPeerReplication = minAvailableInstancesForPeerReplication; 933 } 934 935 @Override 936 public boolean equals(Object o) { 937 if (this == o) { 938 return true; 939 } 940 if (o == null || getClass() != o.getClass()) { 941 return false; 942 } 943 EurekaServerConfigBean that = (EurekaServerConfigBean) o; 944 return aSGCacheExpiryTimeoutMs == that.aSGCacheExpiryTimeoutMs 945 && aSGQueryTimeoutMs == that.aSGQueryTimeoutMs 946 && aSGUpdateIntervalMs == that.aSGUpdateIntervalMs 947 && Objects.equals(aWSAccessId, that.aWSAccessId) 948 && Objects.equals(aWSSecretKey, that.aWSSecretKey) 949 && batchReplication == that.batchReplication 950 && bindingStrategy == that.bindingStrategy 951 && deltaRetentionTimerIntervalInMs == that.deltaRetentionTimerIntervalInMs 952 && disableDelta == that.disableDelta 953 && disableDeltaForRemoteRegions == that.disableDeltaForRemoteRegions 954 && disableTransparentFallbackToOtherRegion == that.disableTransparentFallbackToOtherRegion 955 && eIPBindingRetryIntervalMs == that.eIPBindingRetryIntervalMs 956 && eIPBindingRetryIntervalMsWhenUnbound == that.eIPBindingRetryIntervalMsWhenUnbound 957 && eIPBindRebindRetries == that.eIPBindRebindRetries 958 && enableReplicatedRequestCompression == that.enableReplicatedRequestCompression 959 && enableSelfPreservation == that.enableSelfPreservation 960 && evictionIntervalTimerInMs == that.evictionIntervalTimerInMs 961 && gZipContentFromRemoteRegion == that.gZipContentFromRemoteRegion 962 && Objects.equals(jsonCodecName, that.jsonCodecName) 963 && Objects.equals(listAutoScalingGroupsRoleName, 964 that.listAutoScalingGroupsRoleName) 965 && logIdentityHeaders == that.logIdentityHeaders 966 && maxElementsInPeerReplicationPool == that.maxElementsInPeerReplicationPool 967 && maxElementsInStatusReplicationPool == that.maxElementsInStatusReplicationPool 968 && maxIdleThreadAgeInMinutesForPeerReplication == that.maxIdleThreadAgeInMinutesForPeerReplication 969 && maxIdleThreadInMinutesAgeForStatusReplication == that.maxIdleThreadInMinutesAgeForStatusReplication 970 && maxThreadsForPeerReplication == that.maxThreadsForPeerReplication 971 && maxThreadsForStatusReplication == that.maxThreadsForStatusReplication 972 && maxTimeForReplication == that.maxTimeForReplication 973 && minAvailableInstancesForPeerReplication == that.minAvailableInstancesForPeerReplication 974 && minThreadsForPeerReplication == that.minThreadsForPeerReplication 975 && minThreadsForStatusReplication == that.minThreadsForStatusReplication 976 && numberOfReplicationRetries == that.numberOfReplicationRetries 977 && peerEurekaNodesUpdateIntervalMs == that.peerEurekaNodesUpdateIntervalMs 978 && peerEurekaStatusRefreshTimeIntervalMs == that.peerEurekaStatusRefreshTimeIntervalMs 979 && peerNodeConnectionIdleTimeoutSeconds == that.peerNodeConnectionIdleTimeoutSeconds 980 && peerNodeConnectTimeoutMs == that.peerNodeConnectTimeoutMs 981 && peerNodeReadTimeoutMs == that.peerNodeReadTimeoutMs 982 && peerNodeTotalConnections == that.peerNodeTotalConnections 983 && peerNodeTotalConnectionsPerHost == that.peerNodeTotalConnectionsPerHost 984 && primeAwsReplicaConnections == that.primeAwsReplicaConnections 985 && Objects.equals(propertyResolver, that.propertyResolver) 986 && rateLimiterBurstSize == that.rateLimiterBurstSize 987 && rateLimiterEnabled == that.rateLimiterEnabled 988 && rateLimiterFullFetchAverageRate == that.rateLimiterFullFetchAverageRate 989 && Objects.equals(rateLimiterPrivilegedClients, 990 that.rateLimiterPrivilegedClients) 991 && rateLimiterRegistryFetchAverageRate == that.rateLimiterRegistryFetchAverageRate 992 && rateLimiterThrottleStandardClients == that.rateLimiterThrottleStandardClients 993 && registrySyncRetries == that.registrySyncRetries 994 && registrySyncRetryWaitMs == that.registrySyncRetryWaitMs 995 && Objects.equals(remoteRegionAppWhitelist, that.remoteRegionAppWhitelist) 996 && remoteRegionConnectionIdleTimeoutSeconds == that.remoteRegionConnectionIdleTimeoutSeconds 997 && remoteRegionConnectTimeoutMs == that.remoteRegionConnectTimeoutMs 998 && remoteRegionFetchThreadPoolSize == that.remoteRegionFetchThreadPoolSize 999 && remoteRegionReadTimeoutMs == that.remoteRegionReadTimeoutMs 1000 && remoteRegionRegistryFetchInterval == that.remoteRegionRegistryFetchInterval 1001 && remoteRegionTotalConnections == that.remoteRegionTotalConnections 1002 && remoteRegionTotalConnectionsPerHost == that.remoteRegionTotalConnectionsPerHost 1003 && Objects.equals(remoteRegionTrustStore, that.remoteRegionTrustStore) 1004 && Objects.equals(remoteRegionTrustStorePassword, 1005 that.remoteRegionTrustStorePassword) 1006 && Arrays.equals(remoteRegionUrls, that.remoteRegionUrls) 1007 && Objects.equals(remoteRegionUrlsWithName, that.remoteRegionUrlsWithName) 1008 && Double.compare(that.renewalPercentThreshold, 1009 renewalPercentThreshold) == 0 1010 && renewalThresholdUpdateIntervalMs == that.renewalThresholdUpdateIntervalMs 1011 && responseCacheAutoExpirationInSeconds == that.responseCacheAutoExpirationInSeconds 1012 && responseCacheUpdateIntervalMs == that.responseCacheUpdateIntervalMs 1013 && retentionTimeInMSInDeltaQueue == that.retentionTimeInMSInDeltaQueue 1014 && route53BindingRetryIntervalMs == that.route53BindingRetryIntervalMs 1015 && route53BindRebindRetries == that.route53BindRebindRetries 1016 && route53DomainTTL == that.route53DomainTTL 1017 && syncWhenTimestampDiffers == that.syncWhenTimestampDiffers 1018 && useReadOnlyResponseCache == that.useReadOnlyResponseCache 1019 && waitTimeInMsWhenSyncEmpty == that.waitTimeInMsWhenSyncEmpty 1020 && Objects.equals(xmlCodecName, that.xmlCodecName) 1021 && initialCapacityOfResponseCache == that.initialCapacityOfResponseCache 1022 && expectedClientRenewalIntervalSeconds == that.expectedClientRenewalIntervalSeconds 1023 && useAwsAsgApi == that.useAwsAsgApi && Objects.equals(myUrl, that.myUrl); 1024 } 1025 1026 @Override 1027 public int hashCode() { 1028 return Objects.hash(aSGCacheExpiryTimeoutMs, aSGQueryTimeoutMs, 1029 aSGUpdateIntervalMs, aWSAccessId, aWSSecretKey, batchReplication, 1030 bindingStrategy, deltaRetentionTimerIntervalInMs, disableDelta, 1031 disableDeltaForRemoteRegions, disableTransparentFallbackToOtherRegion, 1032 eIPBindRebindRetries, eIPBindingRetryIntervalMs, 1033 eIPBindingRetryIntervalMsWhenUnbound, enableReplicatedRequestCompression, 1034 enableSelfPreservation, evictionIntervalTimerInMs, 1035 gZipContentFromRemoteRegion, jsonCodecName, listAutoScalingGroupsRoleName, 1036 logIdentityHeaders, maxElementsInPeerReplicationPool, 1037 maxElementsInStatusReplicationPool, 1038 maxIdleThreadAgeInMinutesForPeerReplication, 1039 maxIdleThreadInMinutesAgeForStatusReplication, 1040 maxThreadsForPeerReplication, maxThreadsForStatusReplication, 1041 maxTimeForReplication, minAvailableInstancesForPeerReplication, 1042 minThreadsForPeerReplication, minThreadsForStatusReplication, 1043 numberOfReplicationRetries, peerEurekaNodesUpdateIntervalMs, 1044 peerEurekaStatusRefreshTimeIntervalMs, peerNodeConnectTimeoutMs, 1045 peerNodeConnectionIdleTimeoutSeconds, peerNodeReadTimeoutMs, 1046 peerNodeTotalConnections, peerNodeTotalConnectionsPerHost, 1047 primeAwsReplicaConnections, propertyResolver, rateLimiterBurstSize, 1048 rateLimiterEnabled, rateLimiterFullFetchAverageRate, 1049 rateLimiterPrivilegedClients, rateLimiterRegistryFetchAverageRate, 1050 rateLimiterThrottleStandardClients, registrySyncRetries, 1051 registrySyncRetryWaitMs, remoteRegionAppWhitelist, 1052 remoteRegionConnectTimeoutMs, remoteRegionConnectionIdleTimeoutSeconds, 1053 remoteRegionFetchThreadPoolSize, remoteRegionReadTimeoutMs, 1054 remoteRegionRegistryFetchInterval, remoteRegionTotalConnections, 1055 remoteRegionTotalConnectionsPerHost, remoteRegionTrustStore, 1056 remoteRegionTrustStorePassword, remoteRegionUrls, 1057 remoteRegionUrlsWithName, renewalPercentThreshold, 1058 renewalThresholdUpdateIntervalMs, responseCacheAutoExpirationInSeconds, 1059 responseCacheUpdateIntervalMs, retentionTimeInMSInDeltaQueue, 1060 route53BindRebindRetries, route53BindingRetryIntervalMs, route53DomainTTL, 1061 syncWhenTimestampDiffers, useReadOnlyResponseCache, 1062 waitTimeInMsWhenSyncEmpty, xmlCodecName, initialCapacityOfResponseCache, 1063 expectedClientRenewalIntervalSeconds, useAwsAsgApi, myUrl); 1064 } 1065 1066 @Override 1067 public String toString() { 1068 return new ToStringCreator(this) 1069 .append("aSGCacheExpiryTimeoutMs", this.aSGCacheExpiryTimeoutMs) 1070 .append("aSGQueryTimeoutMs", this.aSGQueryTimeoutMs) 1071 .append("aSGUpdateIntervalMs", this.aSGUpdateIntervalMs) 1072 .append("aWSAccessId", this.aWSAccessId) 1073 .append("aWSSecretKey", this.aWSSecretKey) 1074 .append("batchReplication", this.batchReplication) 1075 .append("bindingStrategy", this.bindingStrategy) 1076 .append("deltaRetentionTimerIntervalInMs", 1077 this.deltaRetentionTimerIntervalInMs) 1078 .append("disableDelta", this.disableDelta) 1079 .append("disableDeltaForRemoteRegions", this.disableDeltaForRemoteRegions) 1080 .append("disableTransparentFallbackToOtherRegion", 1081 this.disableTransparentFallbackToOtherRegion) 1082 .append("eIPBindRebindRetries", this.eIPBindRebindRetries) 1083 .append("eIPBindingRetryIntervalMs", this.eIPBindingRetryIntervalMs) 1084 .append("eIPBindingRetryIntervalMsWhenUnbound", 1085 this.eIPBindingRetryIntervalMsWhenUnbound) 1086 .append("enableReplicatedRequestCompression", 1087 this.enableReplicatedRequestCompression) 1088 .append("enableSelfPreservation", this.enableSelfPreservation) 1089 .append("evictionIntervalTimerInMs", this.evictionIntervalTimerInMs) 1090 .append("gZipContentFromRemoteRegion", this.gZipContentFromRemoteRegion) 1091 .append("jsonCodecName", this.jsonCodecName) 1092 .append("listAutoScalingGroupsRoleName", 1093 this.listAutoScalingGroupsRoleName) 1094 .append("logIdentityHeaders", this.logIdentityHeaders) 1095 .append("maxElementsInPeerReplicationPool", 1096 this.maxElementsInPeerReplicationPool) 1097 .append("maxElementsInStatusReplicationPool", 1098 this.maxElementsInStatusReplicationPool) 1099 .append("maxIdleThreadAgeInMinutesForPeerReplication", 1100 this.maxIdleThreadAgeInMinutesForPeerReplication) 1101 .append("maxIdleThreadInMinutesAgeForStatusReplication", 1102 this.maxIdleThreadInMinutesAgeForStatusReplication) 1103 .append("maxThreadsForPeerReplication", this.maxThreadsForPeerReplication) 1104 .append("maxThreadsForStatusReplication", 1105 this.maxThreadsForStatusReplication) 1106 .append("maxTimeForReplication", this.maxTimeForReplication) 1107 .append("minAvailableInstancesForPeerReplication", 1108 this.minAvailableInstancesForPeerReplication) 1109 .append("minThreadsForPeerReplication", this.minThreadsForPeerReplication) 1110 .append("minThreadsForStatusReplication", 1111 this.minThreadsForStatusReplication) 1112 .append("numberOfReplicationRetries", this.numberOfReplicationRetries) 1113 .append("peerEurekaNodesUpdateIntervalMs", 1114 this.peerEurekaNodesUpdateIntervalMs) 1115 .append("peerEurekaStatusRefreshTimeIntervalMs", 1116 this.peerEurekaStatusRefreshTimeIntervalMs) 1117 .append("peerNodeConnectTimeoutMs", this.peerNodeConnectTimeoutMs) 1118 .append("peerNodeConnectionIdleTimeoutSeconds", 1119 this.peerNodeConnectionIdleTimeoutSeconds) 1120 .append("peerNodeReadTimeoutMs", this.peerNodeReadTimeoutMs) 1121 .append("peerNodeTotalConnections", this.peerNodeTotalConnections) 1122 .append("peerNodeTotalConnectionsPerHost", 1123 this.peerNodeTotalConnectionsPerHost) 1124 .append("primeAwsReplicaConnections", this.primeAwsReplicaConnections) 1125 .append("propertyResolver", this.propertyResolver) 1126 .append("rateLimiterBurstSize", this.rateLimiterBurstSize) 1127 .append("rateLimiterEnabled", this.rateLimiterEnabled) 1128 .append("rateLimiterFullFetchAverageRate", 1129 this.rateLimiterFullFetchAverageRate) 1130 .append("rateLimiterPrivilegedClients", this.rateLimiterPrivilegedClients) 1131 .append("rateLimiterRegistryFetchAverageRate", 1132 this.rateLimiterRegistryFetchAverageRate) 1133 .append("rateLimiterThrottleStandardClients", 1134 this.rateLimiterThrottleStandardClients) 1135 .append("registrySyncRetries", this.registrySyncRetries) 1136 .append("registrySyncRetryWaitMs", this.registrySyncRetryWaitMs) 1137 .append("remoteRegionAppWhitelist", this.remoteRegionAppWhitelist) 1138 .append("remoteRegionConnectTimeoutMs", this.remoteRegionConnectTimeoutMs) 1139 .append("remoteRegionConnectionIdleTimeoutSeconds", 1140 this.remoteRegionConnectionIdleTimeoutSeconds) 1141 .append("remoteRegionFetchThreadPoolSize", 1142 this.remoteRegionFetchThreadPoolSize) 1143 .append("remoteRegionReadTimeoutMs", this.remoteRegionReadTimeoutMs) 1144 .append("remoteRegionRegistryFetchInterval", 1145 this.remoteRegionRegistryFetchInterval) 1146 .append("remoteRegionTotalConnections", this.remoteRegionTotalConnections) 1147 .append("remoteRegionTotalConnectionsPerHost", 1148 this.remoteRegionTotalConnectionsPerHost) 1149 .append("remoteRegionTrustStore", this.remoteRegionTrustStore) 1150 .append("remoteRegionTrustStorePassword", 1151 this.remoteRegionTrustStorePassword) 1152 .append("remoteRegionUrls", this.remoteRegionUrls) 1153 .append("remoteRegionUrlsWithName", this.remoteRegionUrlsWithName) 1154 .append("renewalPercentThreshold", this.renewalPercentThreshold) 1155 .append("renewalThresholdUpdateIntervalMs", 1156 this.renewalThresholdUpdateIntervalMs) 1157 .append("responseCacheAutoExpirationInSeconds", 1158 this.responseCacheAutoExpirationInSeconds) 1159 .append("responseCacheUpdateIntervalMs", 1160 this.responseCacheUpdateIntervalMs) 1161 .append("retentionTimeInMSInDeltaQueue", 1162 this.retentionTimeInMSInDeltaQueue) 1163 .append("route53BindRebindRetries", this.route53BindRebindRetries) 1164 .append("route53BindingRetryIntervalMs", 1165 this.route53BindingRetryIntervalMs) 1166 .append("route53DomainTTL", this.route53DomainTTL) 1167 .append("syncWhenTimestampDiffers", this.syncWhenTimestampDiffers) 1168 .append("useReadOnlyResponseCache", this.useReadOnlyResponseCache) 1169 .append("waitTimeInMsWhenSyncEmpty", this.waitTimeInMsWhenSyncEmpty) 1170 .append("xmlCodecName", this.xmlCodecName) 1171 .append("initialCapacityOfResponseCache", 1172 this.initialCapacityOfResponseCache) 1173 .append("expectedClientRenewalIntervalSeconds", 1174 this.expectedClientRenewalIntervalSeconds) 1175 .append("useAwsAsgApi", this.useAwsAsgApi).append("myUrl", this.myUrl) 1176 .toString(); 1177 } 1178 1179 }
View Code
③ Eureka Server 初始化
還可以看到 EurekaServerAutoConfiguration 導入了 EurekaServerInitializerConfiguration 的初始化配置類,它啟動了一個後台執行緒來初始化 eurekaServerBootstrap,進入可以看到跟 EurekaBootStrap 的初始化是類似的,只不過是簡化了些,就不在展示了。


1 public void start() { 2 new Thread(() -> { 3 try { 4 // TODO: is this class even needed now? 5 eurekaServerBootstrap.contextInitialized( 6 EurekaServerInitializerConfiguration.this.servletContext); 7 log.info("Started Eureka Server"); 8 9 publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); 10 EurekaServerInitializerConfiguration.this.running = true; 11 publish(new EurekaServerStartedEvent(getEurekaServerConfig())); 12 } 13 catch (Exception ex) { 14 // Help! 15 log.error("Could not initialize Eureka servlet context", ex); 16 } 17 }).start(); 18 }
View Code
2、spring-cloud-starter-netflix-eureka-client
① Eureka Client 自動化配置
Eureka Client 自動化配置類是 EurekaClientAutoConfiguration(@EnableEurekaClient 註解感覺沒啥用),這裡初始化類里主要初始化了 ApplicationInfoManager、EurekaClientConfigBean、EurekaInstanceConfigBean、EurekaClient 等。
需要注意,在 springcloud 中,EurekaClient 組件默認是 CloudEurekaClient;


1 @Configuration(proxyBeanMethods = false) 2 @EnableConfigurationProperties 3 @ConditionalOnClass(EurekaClientConfig.class) 4 @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) 5 @ConditionalOnDiscoveryEnabled 6 @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, 7 CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) 8 @AutoConfigureAfter(name = { 9 "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration", 10 "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", 11 "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", 12 "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) 13 public class EurekaClientAutoConfiguration { 14 15 private ConfigurableEnvironment env; 16 17 public EurekaClientAutoConfiguration(ConfigurableEnvironment env) { 18 this.env = env; 19 } 20 21 @Bean 22 public HasFeatures eurekaFeature() { 23 return HasFeatures.namedFeature("Eureka Client", EurekaClient.class); 24 } 25 26 @Bean 27 @ConditionalOnMissingBean(value = EurekaClientConfig.class, 28 search = SearchStrategy.CURRENT) 29 public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { 30 return new EurekaClientConfigBean(); 31 } 32 33 @Bean 34 @ConditionalOnMissingBean 35 public ManagementMetadataProvider serviceManagementMetadataProvider() { 36 return new DefaultManagementMetadataProvider(); 37 } 38 39 private String getProperty(String property) { 40 return this.env.containsProperty(property) ? this.env.getProperty(property) : ""; 41 } 42 43 @Bean 44 @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, 45 search = SearchStrategy.CURRENT) 46 public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils, 47 ManagementMetadataProvider managementMetadataProvider) { 48 String hostname = getProperty("eureka.instance.hostname"); 49 boolean preferIpAddress = Boolean 50 .parseBoolean(getProperty("eureka.instance.prefer-ip-address")); 51 String ipAddress = getProperty("eureka.instance.ip-address"); 52 boolean isSecurePortEnabled = Boolean 53 .parseBoolean(getProperty("eureka.instance.secure-port-enabled")); 54 55 String serverContextPath = env.getProperty("server.servlet.context-path", "/"); 56 int serverPort = Integer.parseInt( 57 env.getProperty("server.port", env.getProperty("port", "8080"))); 58 59 Integer managementPort = env.getProperty("management.server.port", Integer.class); 60 String managementContextPath = env 61 .getProperty("management.server.servlet.context-path"); 62 Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", 63 Integer.class); 64 EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils); 65 66 instance.setNonSecurePort(serverPort); 67 instance.setInstanceId(getDefaultInstanceId(env)); 68 instance.setPreferIpAddress(preferIpAddress); 69 instance.setSecurePortEnabled(isSecurePortEnabled); 70 if (StringUtils.hasText(ipAddress)) { 71 instance.setIpAddress(ipAddress); 72 } 73 74 if (isSecurePortEnabled) { 75 instance.setSecurePort(serverPort); 76 } 77 78 if (StringUtils.hasText(hostname)) { 79 instance.setHostname(hostname); 80 } 81 String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path"); 82 String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path"); 83 84 if (StringUtils.hasText(statusPageUrlPath)) { 85 instance.setStatusPageUrlPath(statusPageUrlPath); 86 } 87 if (StringUtils.hasText(healthCheckUrlPath)) { 88 instance.setHealthCheckUrlPath(healthCheckUrlPath); 89 } 90 91 ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, 92 serverContextPath, managementContextPath, managementPort); 93 94 if (metadata != null) { 95 instance.setStatusPageUrl(metadata.getStatusPageUrl()); 96 instance.setHealthCheckUrl(metadata.getHealthCheckUrl()); 97 if (instance.isSecurePortEnabled()) { 98 instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl()); 99 } 100 Map<String, String> metadataMap = instance.getMetadataMap(); 101 metadataMap.computeIfAbsent("management.port", 102 k -> String.valueOf(metadata.getManagementPort())); 103 } 104 else { 105 // without the metadata the status and health check URLs will not be set 106 // and the status page and health check url paths will not include the 107 // context path so set them here 108 if (StringUtils.hasText(managementContextPath)) { 109 instance.setHealthCheckUrlPath( 110 managementContextPath + instance.getHealthCheckUrlPath()); 111 instance.setStatusPageUrlPath( 112 managementContextPath + instance.getStatusPageUrlPath()); 113 } 114 } 115 116 setupJmxPort(instance, jmxPort); 117 return instance; 118 } 119 120 private void setupJmxPort(EurekaInstanceConfigBean instance, Integer jmxPort) { 121 Map<String, String> metadataMap = instance.getMetadataMap(); 122 if (metadataMap.get("jmx.port") == null && jmxPort != null) { 123 metadataMap.put("jmx.port", String.valueOf(jmxPort)); 124 } 125 } 126 127 @Bean 128 public EurekaServiceRegistry eurekaServiceRegistry() { 129 return new EurekaServiceRegistry(); 130 } 131 132 // @Bean 133 // @ConditionalOnBean(AutoServiceRegistrationProperties.class) 134 // @ConditionalOnProperty(value = 135 // "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) 136 // public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, 137 // CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager 138 // applicationInfoManager, ObjectProvider<HealthCheckHandler> healthCheckHandler) { 139 // return EurekaRegistration.builder(instanceConfig) 140 // .with(applicationInfoManager) 141 // .with(eurekaClient) 142 // .with(healthCheckHandler) 143 // .build(); 144 // } 145 146 @Bean 147 @ConditionalOnBean(AutoServiceRegistrationProperties.class) 148 @ConditionalOnProperty( 149 value = "spring.cloud.service-registry.auto-registration.enabled", 150 matchIfMissing = true) 151 public EurekaAutoServiceRegistration eurekaAutoServiceRegistration( 152 ApplicationContext context, EurekaServiceRegistry registry, 153 EurekaRegistration registration) { 154 return new EurekaAutoServiceRegistration(context, registry, registration); 155 } 156 157 @Configuration(proxyBeanMethods = false) 158 @ConditionalOnMissingRefreshScope 159 protected static class EurekaClientConfiguration { 160 161 @Autowired 162 private ApplicationContext context; 163 164 @Autowired 165 private AbstractDiscoveryClientOptionalArgs<?> optionalArgs; 166 167 @Bean(destroyMethod = "shutdown") 168 @ConditionalOnMissingBean(value = EurekaClient.class, 169 search = SearchStrategy.CURRENT) 170 public EurekaClient eurekaClient(ApplicationInfoManager manager, 171 EurekaClientConfig config) { 172 return new CloudEurekaClient(manager, config, this.optionalArgs, 173 this.context); 174 } 175 176 @Bean 177 @ConditionalOnMissingBean(value = ApplicationInfoManager.class, 178 search = SearchStrategy.CURRENT) 179 public ApplicationInfoManager eurekaApplicationInfoManager( 180 EurekaInstanceConfig config) { 181 InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); 182 return new ApplicationInfoManager(config, instanceInfo); 183 } 184 185 @Bean 186 @ConditionalOnBean(AutoServiceRegistrationProperties.class) 187 @ConditionalOnProperty( 188 value = "spring.cloud.service-registry.auto-registration.enabled", 189 matchIfMissing = true) 190 public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, 191 CloudEurekaInstanceConfig instanceConfig, 192 ApplicationInfoManager applicationInfoManager, @Autowired( 193 required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { 194 return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) 195 .with(eurekaClient).with(healthCheckHandler).build(); 196 } 197 198 } 199 200 @Configuration(proxyBeanMethods = false) 201 @ConditionalOnRefreshScope 202 protected static class RefreshableEurekaClientConfiguration { 203 204 @Autowired 205 private ApplicationContext context; 206 207 @Autowired 208 private AbstractDiscoveryClientOptionalArgs<?> optionalArgs; 209 210 @Bean(destroyMethod = "shutdown") 211 @ConditionalOnMissingBean(value = EurekaClient.class, 212 search = SearchStrategy.CURRENT) 213 @org.springframework.cloud.context.config.annotation.RefreshScope 214 @Lazy 215 public EurekaClient eurekaClient(ApplicationInfoManager manager, 216 EurekaClientConfig config, EurekaInstanceConfig instance, 217 @Autowired(required = false) HealthCheckHandler healthCheckHandler) { 218 // If we use the proxy of the ApplicationInfoManager we could run into a 219 // problem 220 // when shutdown is called on the CloudEurekaClient where the 221 // ApplicationInfoManager bean is 222 // requested but wont be allowed because we are shutting down. To avoid this 223 // we use the 224 // object directly. 225 ApplicationInfoManager appManager; 226 if (AopUtils.isAopProxy(manager)) { 227 appManager = ProxyUtils.getTargetObject(manager); 228 } 229 else { 230 appManager = manager; 231 } 232 CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, 233 config, this.optionalArgs, this.context); 234 cloudEurekaClient.registerHealthCheck(healthCheckHandler); 235 return cloudEurekaClient; 236 } 237 238 @Bean 239 @ConditionalOnMissingBean(value = ApplicationInfoManager.class, 240 search = SearchStrategy.CURRENT) 241 @org.springframework.cloud.context.config.annotation.RefreshScope 242 @Lazy 243 public ApplicationInfoManager eurekaApplicationInfoManager( 244 EurekaInstanceConfig config) { 245 InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); 246 return new ApplicationInfoManager(config, instanceInfo); 247 } 248 249 @Bean 250 @org.springframework.cloud.context.config.annotation.RefreshScope 251 @ConditionalOnBean(AutoServiceRegistrationProperties.class) 252 @ConditionalOnProperty( 253 value = "spring.cloud.service-registry.auto-registration.enabled", 254 matchIfMissing = true) 255 public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, 256 CloudEurekaInstanceConfig instanceConfig, 257 ApplicationInfoManager applicationInfoManager, @Autowired( 258 required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { 259 return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) 260 .with(eurekaClient).with(healthCheckHandler).build(); 261 } 262 263 } 264 265 @Target({ ElementType.TYPE, ElementType.METHOD }) 266 @Retention(RetentionPolicy.RUNTIME) 267 @Documented 268 @Conditional(OnMissingRefreshScopeCondition.class) 269 @interface ConditionalOnMissingRefreshScope { 270 271 } 272 273 @Target({ ElementType.TYPE, ElementType.METHOD }) 274 @Retention(RetentionPolicy.RUNTIME) 275 @Documented 276 @ConditionalOnClass(RefreshScope.class) 277 @ConditionalOnBean(RefreshAutoConfiguration.class) 278 @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true", 279 matchIfMissing = true) 280 @interface ConditionalOnRefreshScope { 281 282 } 283 284 private static class OnMissingRefreshScopeCondition extends AnyNestedCondition { 285 286 OnMissingRefreshScopeCondition() { 287 super(ConfigurationPhase.REGISTER_BEAN); 288 } 289 290 @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope") 291 static class MissingClass { 292 293 } 294 295 @ConditionalOnMissingBean(RefreshAutoConfiguration.class) 296 static class MissingScope { 297 298 } 299 300 @ConditionalOnProperty(value = "eureka.client.refresh.enable", 301 havingValue = "false") 302 static class OnPropertyDisabled { 303 304 } 305 306 } 307 308 @Configuration(proxyBeanMethods = false) 309 @ConditionalOnClass(Health.class) 310 protected static class EurekaHealthIndicatorConfiguration { 311 312 @Bean 313 @ConditionalOnMissingBean 314 @ConditionalOnEnabledHealthIndicator("eureka") 315 public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient, 316 EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) { 317 return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig); 318 } 319 320 } 321 322 }
View Code
② Eureka Client 註冊
Netflix 中服務註冊的邏輯是在 InstanceInfoReplicator,springcloud 則封裝到了 EurekaAutoServiceRegistration,InstanceInfoReplicator 啟動之後要延遲40秒才會註冊到註冊中心,而這裡的自動化配置在服務啟動時就會註冊到註冊中心。
它這裡調用了 serviceRegistry 來註冊,進去可以發現它就是調用了 ApplicationInfoManager 的 setInstanceStatus 方法,進而觸發了那個狀態變更器 StatusChangeListener,然後向註冊中心註冊。


1 public void start() { 2 // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 3 if (this.port.get() != 0) { 4 if (this.registration.getNonSecurePort() == 0) { 5 this.registration.setNonSecurePort(this.port.get()); 6 } 7 8 if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { 9 this.registration.setSecurePort(this.port.get()); 10 } 11 } 12 13 // only initialize if nonSecurePort is greater than 0 and it isn't already running 14 // because of containerPortInitializer below 15 if (!this.running.get() && this.registration.getNonSecurePort() > 0) { 16 17 this.serviceRegistry.register(this.registration); 18 19 this.context.publishEvent(new InstanceRegisteredEvent<>(this, 20 this.registration.getInstanceConfig())); 21 this.running.set(true); 22 } 23 }
View Code
十三、Eureka 總結
這一節來對 eureka 的學習做個總結,注意下面的一些截圖是來自《重新定義Spring Cloud實戰 》,具體可以參考原著。
1、Eureka Server 提供的 API
大部分的 API 在前面的源碼分析中都已經接觸過了,這裡看下 eureka 提供的 API列表。注意 eureka 整合到 springcloud 之後,api 前綴固定為 /eureka,在 netflix 中是 /{version} 的形式。
2、Eureka Client 核心參數
Eureka Client 的參數可以分為基本參數、定時任務參數、http參數三大類。
① 基本參數
② 定時任務參數
③ http 參數
3、Eureka Server 核心參數
Eureka Server 的參數可以分為 基本參數、多級快取參數、集群相關參數、http參數 四大類。
① 基本參數
② 多級快取參數
③ 集群參數
④ http 參數
4、Eureka 核心功能
① 服務註冊和發現:eureka 分客戶端(Eureka Client)和服務端(Eureka Server),服務端即為註冊中心,提供服務註冊和發現的功能。所有客戶端將自己註冊到註冊中心上,服務端使用 Map 結構基於記憶體保存所有客戶端資訊(IP、埠、續約等資訊)。客戶端定時從註冊中心拉取註冊表到本地,就可以通過負載均衡的方式進行服務間的調用。
② 服務註冊(Register):Eureka Client 啟動時向 Eureka Server 註冊,並提供自身的元數據、IP地址、埠、狀態等資訊。
③ 服務續約(Renew):Eureka Client 默認每隔30秒向 Eureka Server 發送一次心跳進行服務續約,通過續約告知 Eureka Server 自己是正常的。如果 Eureka Server 180 秒沒有收到客戶端的續約,就會認為客戶端故障,並將其剔除。
④ 抓取註冊表(Fetch Registry):Eureka Client 啟動時會向 Eureka Server 全量抓取一次註冊表到本地,之後會每隔30秒增量抓取註冊表合併到本地註冊表。如果合併後的本地註冊表與 Eureka Server 端的註冊表不一致(hash 比對),就全量抓取註冊表覆蓋本地的註冊表。
⑤ 服務下線(Cancel):Eureka Client 程式正常關閉時,會向 Eureka Server 發送下線請求,之後 Eureka Server 將這個實例從註冊表中剔除。
⑥ 故障剔除(Eviction):默認情況下,Eureka Client 連續180秒沒有向 Eureka Server 發送續約請求,就會被認為實例故障,然後從註冊表剔除。
⑦ Eureka Server 集群:Eureka Server 採用對等複製模式(Peer to Peer)來進行副本之間的數據同步,集群中每個 Server 節點都可以接收寫操作和讀操作。Server 節點接收到寫操作後(註冊、續約、下線、狀態更新)會通過後台任務打包成批量任務發送到集群其它 Server 節點進行數據同步。Eureka Server 集群副本之間的數據會有短暫的不一致性,它是滿足 CAP 中的 AP,即 高可用性和分區容錯性。
5、Eureka 整體架構
最後用一張圖來總結下 Eureka 的整體架構、運行流程以及核心機制。
//www.cnblogs.com/chiangchou/p/eureka-1.html