SpringCloud 源碼系列(4)—— 負載均衡 Ribbon
- 2020 年 12 月 17 日
- 筆記
- RestTemplate, ribbon, SpringCloud, 負載均衡
一、負載均衡
1、RestTemplate
在研究 eureka 源碼上篇中,我們在 demo-consumer 消費者服務中定義了用 @LoadBalanced 標記的 RestTemplate,然後使用 RestTemplate 通過服務名的形式來調用遠程服務 demo-producer,然後請求會輪詢到兩個 demo-producer 實例上。
RestTemplate 是 Spring Resources 中一個訪問第三方 RESTful API 接口的網絡請求框架。RestTemplate 是用來消費 REST 服務的,所以 RestTemplate 的主要方法都與 REST 的 Http協議的一些方法緊密相連,例如 HEAD、GET、POST、PUT、DELETE 和 OPTIONS 等方法,這些方法在 RestTemplate 類對應的方法為 headForHeaders()、getForObject()、postForObject()、put() 和 delete() 等。
RestTemplate 本身是不具備負載均衡的能力的,如果 RestTemplate 未使用 @LoadBalanced 標記,就通過服務名的形式來調用,必然會報錯。用 @LoadBalanced 標記後,調用 RestTemplate 的 REST 方法就會通過負載均衡的方式通過一定的策略路由到某個服務實例上,底層負責負載均衡的組件就是 Ribbon。後面我們再來看 @LoadBalanced 是如何讓 RestTemplate 具備負載均衡的能力的。
1 @SpringBootApplication 2 public class ConsumerApplication { 3 4 @Bean 5 @LoadBalanced 6 public RestTemplate restTemplate() { 7 return new RestTemplate(); 8 } 9 10 public static void main(String[] args) { 11 SpringApplication.run(ConsumerApplication.class, args); 12 } 13 } 14 15 @RestController 16 public class DemoController { 17 private final Logger logger = LoggerFactory.getLogger(getClass()); 18 19 @Autowired 20 private RestTemplate restTemplate; 21 22 @GetMapping("/v1/id") 23 public ResponseEntity<String> getId() { 24 ResponseEntity<String> result = restTemplate.getForEntity("//demo-producer/v1/uuid", String.class); 25 String uuid = result.getBody(); 26 logger.info("request id: {}", uuid); 27 return ResponseEntity.ok(uuid); 28 } 29 }
2、Ribbon 與負載均衡
① 負載均衡
負載均衡是指將負載分攤到多個執行單元上,負載均衡主要可以分為集中式負載均衡與進程內負載均衡:
- 集中式負載均衡指位於互聯網與執行單元之間,並負責把網絡請求轉發到各個執行單元上,比如 Nginx、F5。集中式負載均衡也可以稱為服務端負載均衡。
- 進程內負載均衡是將負載均衡邏輯集成到客戶端上,客戶端維護了一份服務提供者的實例列表,實例列表一般會從註冊中心比如 Eureka 中獲取。有了實例列表,就可以通過負載均衡策略將請求分攤給多個服務提供者,從而達到負載均衡的目的。進程內負載均衡一般也稱為客戶端負載均衡。
Ribbon 是一個客戶端負載均衡器,可以很好地控制 HTTP 和 TCP 客戶端的負載均衡行為。Ribbon 是 Netflix 公司開源的一個負載均衡組件,已經整合到 SpringCloud 生態中,它在 Spring Cloud 生態內是一個不可缺少的組件,少了它,服務便不能橫向擴展。
② Ribbon 模塊
Ribbon 有很多子模塊,官方文檔中說明,目前 Netflix 公司主要用於生產環境的 Ribbon 子模塊如下:
- ribbon-loadbalancer:可以獨立使用或與其他模塊一起使用的負載均衡器 API。
- ribbon-eureka:Ribbon 結合 Eureka 客戶端的 API,為負載均衡器提供動態服務註冊列表信息。
- ribbon-core:Ribbon 的核心API。
③ springcloud 與 ribbon 整合
與 eureka 整合到 springcloud 類似,springcloud 提供了對應的 spring-cloud-starter-netflix-eureka-client(server) 依賴包,ribbon 則整合到了 spring-cloud-starter-netflix-ribbon 中。一般也不需要單獨引入 ribbon 的依賴包,spring-cloud-starter-netflix-eureka-client 中已經依賴了 spring-cloud-starter-netflix-ribbon。因此我們引入了 spring-cloud-starter-netflix-eureka-client 就可以使用 Ribbon 的功能了。
④ Ribbon 與 RestTemplate 整合使用
在 Spring Cloud 構建的微服務系統中,Ribbon 作為服務消費者的負載均衡器,有兩種使用方式,一種是和 RestTemplate 相結合,另一種是和 Feign 相結合。前面已經演示了帶有負載均衡的 RestTemplate 的使用,下面用一張圖來看看 RestTemplate 基於 Ribbon 的遠程調用。
二、RestTemplate 負載均衡
1、@LoadBalanced 註解
以 RestTemplate 為切入點,來看 Ribbon 的負載均衡核心原理。那麼首先就要先看看 @LoadBalanced 註解如何讓 RestTemplate 具備負載均衡的能力了。
首先看 @LoadBalanced 這個註解的定義,可以得到如下信息:
- 這個註解使用 @Qualifier 標記,其它地方就可以注入 LoadBalanced 註解的 bean 對象。
- 從注釋中可以了解到,@LoadBalanced 標記的 RestTemplate 或 WebClient 將使用 LoadBalancerClient 來配置 bean 對象。
1 /** 2 * Annotation to mark a RestTemplate or WebClient bean to be configured to use a LoadBalancerClient. 3 * @author Spencer Gibb 4 */ 5 @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD }) 6 @Retention(RetentionPolicy.RUNTIME) 7 @Documented 8 @Inherited 9 @Qualifier 10 public @interface LoadBalanced { 11 12 }
注意 @LoadBalanced 是 spring-cloud-commons 模塊下 loadbalancer 包下的。
2、RestTemplate 負載均衡自動化配置
在 @LoadBalanced 同包下,有一個 LoadBalancerAutoConfiguration 自動化配置類,從注釋也可以看出,這是客戶端負載均衡 Ribbon 的自動化配置類。
從這個自動化配置類可以得到如下信息:
- 首先要有 RestTemplate 的依賴和定義了 LoadBalancerClient 對象的前提下才會觸發這個自動化配置類,這也對應了前面,RestTemplate 要用 LoadBalancerClient 來配置。
- 接着可以看到這個類注入了帶有 @LoadBalanced 標識的 RestTemplate 對象,就是要對這部分對象增加負載均衡的能力。
- 從 SmartInitializingSingleton 的構造中可以看到,就是在 bean 初始化完成後,用 RestTemplateCustomizer 定製化 RestTemplate。
- 再往下可以看到,RestTemplateCustomizer 其實就是向 RestTemplate 中添加了 LoadBalancerInterceptor 這個攔截器。
- 而 LoadBalancerInterceptor 的構建又需要 LoadBalancerClient 和 LoadBalancerRequestFactory,LoadBalancerRequestFactory 則通過 LoadBalancerClient 和 LoadBalancerRequestTransformer 構造完成。
1 /** 2 * Auto-configuration for Ribbon (client-side load balancing). 3 */ 4 @Configuration(proxyBeanMethods = false) 5 @ConditionalOnClass(RestTemplate.class) // 有 RestTemplate 的依賴 6 @ConditionalOnBean(LoadBalancerClient.class) // 定義了 LoadBalancerClient 的 bean 對象 7 @EnableConfigurationProperties(LoadBalancerRetryProperties.class) 8 public class LoadBalancerAutoConfiguration { 9 10 // 注入 @LoadBalanced 標記的 RestTemplate 對象 11 @LoadBalanced 12 @Autowired(required = false) 13 private List<RestTemplate> restTemplates = Collections.emptyList(); 14 15 @Autowired(required = false) 16 private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList(); 17 18 @Bean 19 public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated( 20 final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) { 21 return () -> restTemplateCustomizers.ifAvailable(customizers -> { 22 for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { 23 for (RestTemplateCustomizer customizer : customizers) { 24 // 利用 RestTemplateCustomizer 定製化 restTemplate 25 customizer.customize(restTemplate); 26 } 27 } 28 }); 29 } 30 31 @Bean 32 @ConditionalOnMissingBean 33 public LoadBalancerRequestFactory loadBalancerRequestFactory( 34 LoadBalancerClient loadBalancerClient) { 35 return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers); 36 } 37 38 @Configuration(proxyBeanMethods = false) 39 @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate") 40 static class LoadBalancerInterceptorConfig { 41 42 // 創建 LoadBalancerInterceptor 需要 LoadBalancerClient 和 LoadBalancerRequestFactory 43 @Bean 44 public LoadBalancerInterceptor ribbonInterceptor( 45 LoadBalancerClient loadBalancerClient, 46 LoadBalancerRequestFactory requestFactory) { 47 return new LoadBalancerInterceptor(loadBalancerClient, requestFactory); 48 } 49 50 @Bean 51 @ConditionalOnMissingBean 52 public RestTemplateCustomizer restTemplateCustomizer( 53 final LoadBalancerInterceptor loadBalancerInterceptor) { 54 return restTemplate -> { 55 List<ClientHttpRequestInterceptor> list = new ArrayList<>( 56 restTemplate.getInterceptors()); 57 // 向 restTemplate 添加 LoadBalancerInterceptor 攔截器 58 list.add(loadBalancerInterceptor); 59 restTemplate.setInterceptors(list); 60 }; 61 } 62 63 } 64 }
3、RestTemplate 攔截器 LoadBalancerInterceptor
LoadBalancerAutoConfiguration 自動化配置主要就是給 RestTemplate 添加了一個負載均衡攔截器 LoadBalancerInterceptor。從 setInterceptors 的參數可以看出,攔截器的類型是 ClientHttpRequestInterceptor,如果我們想定製化 RestTemplate,就可以實現這個接口來定製化,然後還可以用 @Order 標記攔截器的先後順序。
1 public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) { 2 if (this.interceptors != interceptors) { 3 this.interceptors.clear(); 4 this.interceptors.addAll(interceptors); 5 // 根據 @Order 註解的順序排序 6 AnnotationAwareOrderComparator.sort(this.interceptors); 7 } 8 }
interceptors 攔截器是在 RestTemplate 的父類 InterceptingHttpAccessor 中的, RestTemplate 的類結構如下圖所示。
從 restTemplate.getForEntity(“//demo-producer/v1/uuid”, String.class) 這個GET請求進去看看,是如何使用 LoadBalancerInterceptor 的。一步步進去,可以看到最終是進入到 doExecute 這個方法了。
在 doExecute 方法中,首先根據 url、method 創建一個 ClientHttpRequest,然後利用 ClientHttpRequest 來發起請求。
1 protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, 2 @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException { 3 ClientHttpResponse response = null; 4 try { 5 // 創建一個 ClientHttpRequest 6 ClientHttpRequest request = createRequest(url, method); 7 if (requestCallback != null) { 8 requestCallback.doWithRequest(request); 9 } 10 // 調用 ClientHttpRequest 的 execute() 方法 11 response = request.execute(); 12 // 處理返回結果 13 handleResponse(url, method, response); 14 return (responseExtractor != null ? responseExtractor.extractData(response) : null); 15 } 16 catch (IOException ex) { 17 // ... 18 } 19 finally { 20 if (response != null) { 21 response.close(); 22 } 23 } 24 } 25 26 ////////////////////////////////////// 27 28 protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException { 29 ClientHttpRequest request = getRequestFactory().createRequest(url, method); 30 initialize(request); 31 if (logger.isDebugEnabled()) { 32 logger.debug("HTTP " + method.name() + " " + url); 33 } 34 return request; 35 }
InterceptingHttpAccessor 中重寫了父類 HttpAccessor 的 getRequestFactory 方法,父類默認的 requestFactory 是 SimpleClientHttpRequestFactory。
重寫後的 getRequestFactory 方法中,如果攔截器不為空,則基於父類默認的 SimpleClientHttpRequestFactory 和攔截器創建了 InterceptingClientHttpRequestFactory。
1 public ClientHttpRequestFactory getRequestFactory() { 2 List<ClientHttpRequestInterceptor> interceptors = getInterceptors(); 3 if (!CollectionUtils.isEmpty(interceptors)) { 4 ClientHttpRequestFactory factory = this.interceptingRequestFactory; 5 if (factory == null) { 6 // 傳入 SimpleClientHttpRequestFactory 和 ClientHttpRequestInterceptor 攔截器 7 factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors); 8 this.interceptingRequestFactory = factory; 9 } 10 return factory; 11 } 12 else { 13 return super.getRequestFactory(); 14 } 15 }
也就是說調用了 InterceptingClientHttpRequestFactory 的 createRequest 方法來創建 ClientHttpRequest。進去可以看到,ClientHttpRequest 的實際類型就是 InterceptingClientHttpRequest。
1 protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) { 2 return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod); 3 }
InterceptingClientHttpRequest 的類結構如下:
RestTemplate 的 doExecute 中調用 request.execute() 其實是調用了 InterceptingClientHttpRequest 父類 AbstractClientHttpRequest 中的 execute 方法。一步步進去可以發現最終其實是調用了 InterceptingClientHttpRequest 的 executeInternal 方法。
在 InterceptingClientHttpRequest 的 executeInternal 方法中,創建了 InterceptingRequestExecution 來執行請求。在 InterceptingRequestExecution 的 execute 方法中,會先遍歷執行所有攔截器,然後通過 ClientHttpRequest 發起真正的 http 請求。
1 protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException { 2 // 創建 InterceptingRequestExecution 3 InterceptingRequestExecution requestExecution = new InterceptingRequestExecution(); 4 // 請求調用 5 return requestExecution.execute(this, bufferedOutput); 6 } 7 8 private class InterceptingRequestExecution implements ClientHttpRequestExecution { 9 10 private final Iterator<ClientHttpRequestInterceptor> iterator; 11 12 public InterceptingRequestExecution() { 13 // 攔截器迭代器 14 this.iterator = interceptors.iterator(); 15 } 16 17 @Override 18 public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { 19 if (this.iterator.hasNext()) { 20 ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); 21 // 利用攔截器攔截處理,並傳入 InterceptingRequestExecution 22 return nextInterceptor.intercept(request, body, this); 23 } 24 else { 25 // 攔截器遍歷完後開始發起真正的 http 請求 26 HttpMethod method = request.getMethod(); 27 Assert.state(method != null, "No standard HTTP method"); 28 ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); 29 request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value)); 30 if (body.length > 0) { 31 if (delegate instanceof StreamingHttpOutputMessage) { 32 StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate; 33 streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream)); 34 } 35 else { 36 StreamUtils.copy(body, delegate.getBody()); 37 } 38 } 39 return delegate.execute(); 40 } 41 } 42 }
進入到 LoadBalancerInterceptor 的 intercept 攔截方法內,可以看到從請求的原始地址中獲取了服務名稱,然後調用了 loadBalancer 的 execute 方法,也就是 LoadBalancerClient。
到這裡,其實已經可以想像,loadBalancer.execute 這行代碼就是根據服務名稱去獲取一個具體的實例,然後將原始地址替換為實例的IP地址。那這個 loadBalancer 又是什麼呢?
1 public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { 2 // 原始地址://demo-producer/v1/uuid 3 final URI originalUri = request.getURI(); 4 // host 就是服務名:demo-producer 5 String serviceName = originalUri.getHost(); 6 Assert.state(serviceName != null, 7 "Request URI does not contain a valid hostname: " + originalUri); 8 return this.loadBalancer.execute(serviceName, 9 this.requestFactory.createRequest(request, body, execution)); 10 }
4、負載均衡客戶端 LoadBalancerClient
在配置 LoadBalancerInterceptor 時,需要兩個參數,LoadBalancerClient 和 LoadBalancerRequestFactory,LoadBalancerRequestFactory前面已經知道是如何創建的了。LoadBalancerClient 又是在哪創建的呢?通過 IDEA 搜索,可以發現是在 spring-cloud-netflix-ribbon 模塊下的 RibbonAutoConfiguration 中配置的,可以看到 LoadBalancerClient 的實際類型是 RibbonLoadBalancerClient。
配置類的順序是 EurekaClientAutoConfiguration、RibbonAutoConfiguration、LoadBalancerAutoConfiguration,因為使 RestTemplate 具備負載均衡的能力需要 LoadBalancerInterceptor 攔截器,創建 LoadBalancerInterceptor 又需要 LoadBalancerClient,而 LoadBalancerClient 底層要根據服務名獲取某個實例,肯定又需要一個實例庫,比如從配置文件、註冊中心獲取。從這裡就可以看出來,RibbonLoadBalancerClient 默認會從 Eureka 註冊中心獲取實例。
1 @Configuration 2 @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class) 3 @RibbonClients 4 // 後於 EurekaClientAutoConfiguration 配置 5 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") 6 // 先於 LoadBalancerAutoConfiguration 配置 7 @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class }) 8 @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class }) 9 public class RibbonAutoConfiguration { 10 11 @Autowired(required = false) 12 private List<RibbonClientSpecification> configurations = new ArrayList<>(); 13 14 @Bean 15 @ConditionalOnMissingBean 16 public SpringClientFactory springClientFactory() { 17 SpringClientFactory factory = new SpringClientFactory(); 18 factory.setConfigurations(this.configurations); 19 return factory; 20 } 21 22 @Bean 23 @ConditionalOnMissingBean(LoadBalancerClient.class) 24 public LoadBalancerClient loadBalancerClient() { 25 return new RibbonLoadBalancerClient(springClientFactory()); 26 } 27 }
LoadBalancerClient 主要提供了三個接口:
1 public interface LoadBalancerClient extends ServiceInstanceChooser { 2 3 // 從 LoadBalancer 找一個 Server 來發送請求 4 <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; 5 6 // 從傳入的 ServiceInstance 取 Server 來發送請求 7 <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; 8 9 // 對原始 URI 重構 10 URI reconstructURI(ServiceInstance instance, URI original); 11 }
進入到 RibbonLoadBalancerClient 的 execute 方法中可以看到:
- 首先根據服務名獲取服務對應的負載均衡器 ILoadBalancer。
- 然後從 ILoadBalancer 中根據一定策略選出一個實例 Server。
- 然後將 server、serviceId 等信息封裝到 RibbonServer 中,也就是一個服務實例 ServiceInstance。
- 最後調用了 LoadBalancerRequest 的 apply,並傳入 ServiceInstance,將地址中的服務名替換為真實的IP地址。
1 public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { 2 return execute(serviceId, request, null); 3 } 4 5 public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) 6 throws IOException { 7 // 根據服務名獲取一個負載均衡器 ILoadBalancer 8 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); 9 // 利用負載均衡器獲取實例 Server 10 Server server = getServer(loadBalancer, hint); 11 if (server == null) { 12 throw new IllegalStateException("No instances available for " + serviceId); 13 } 14 // 封裝實例信息:RibbonServer 的父類是 ServiceInstance 15 RibbonServer ribbonServer = new RibbonServer(serviceId, server, 16 isSecure(server, serviceId), 17 serverIntrospector(serviceId).getMetadata(server)); 18 return execute(serviceId, ribbonServer, request); 19 } 20 21 @Override 22 public <T> T execute(String serviceId, ServiceInstance serviceInstance, 23 LoadBalancerRequest<T> request) throws IOException { 24 Server server = null; 25 if (serviceInstance instanceof RibbonServer) { 26 server = ((RibbonServer) serviceInstance).getServer(); 27 } 28 if (server == null) { 29 throw new IllegalStateException("No instances available for " + serviceId); 30 } 31 32 try { 33 // 處理地址,將服務名替換為真實的IP地址 34 T returnVal = request.apply(serviceInstance); 35 return returnVal; 36 } catch (Exception ex) { 37 // ... 38 } 39 return null; 40 }
這個 LoadBalancerRequest 其實就是 LoadBalancerInterceptor 的 intercept 中創建的一個匿名類,在它的函數式接口內,主要是用裝飾器 ServiceRequestWrapper 將 request 包了一層。
1 public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) { 2 return instance -> { 3 // 封裝 HttpRequest,ServiceRequestWrapper 重載了 getURI 方法。 4 HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer); 5 if (this.transformers != null) { 6 for (LoadBalancerRequestTransformer transformer : this.transformers) { 7 serviceRequest = transformer.transformRequest(serviceRequest, instance); 8 } 9 } 10 // 繼續執行攔截器 11 return execution.execute(serviceRequest, body); 12 }; 13 }
ServiceRequestWrapper 主要就是重寫了 getURI 方法,在重寫的 getURI 方法內,它用 loadBalancer 對 URI 進行了重構,進去可以發現,就是將原始地址中的服務名替換為 Server 的真實IP、端口地址。
1 @Override 2 public URI getURI() { 3 // 重構 URI 4 URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI()); 5 return uri; 6 }
1 public URI reconstructURI(ServiceInstance instance, URI original) { 2 Assert.notNull(instance, "instance can not be null"); 3 // 服務名 4 String serviceId = instance.getServiceId(); 5 RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId); 6 7 URI uri; 8 Server server; 9 if (instance instanceof RibbonServer) { 10 RibbonServer ribbonServer = (RibbonServer) instance; 11 server = ribbonServer.getServer(); 12 uri = updateToSecureConnectionIfNeeded(original, ribbonServer); 13 } 14 else { 15 server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); 16 IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); 17 ServerIntrospector serverIntrospector = serverIntrospector(serviceId); 18 uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); 19 } 20 // 重構地址 21 return context.reconstructURIWithServer(server, uri); 22 }
reconstructURIWithServer:


1 public URI reconstructURIWithServer(Server server, URI original) { 2 String host = server.getHost(); 3 int port = server.getPort(); 4 String scheme = server.getScheme(); 5 6 if (host.equals(original.getHost()) 7 && port == original.getPort() 8 && scheme == original.getScheme()) { 9 return original; 10 } 11 if (scheme == null) { 12 scheme = original.getScheme(); 13 } 14 if (scheme == null) { 15 scheme = deriveSchemeAndPortFromPartialUri(original).first(); 16 } 17 18 try { 19 StringBuilder sb = new StringBuilder(); 20 sb.append(scheme).append("://"); 21 if (!Strings.isNullOrEmpty(original.getRawUserInfo())) { 22 sb.append(original.getRawUserInfo()).append("@"); 23 } 24 sb.append(host); 25 if (port >= 0) { 26 sb.append(":").append(port); 27 } 28 sb.append(original.getRawPath()); 29 if (!Strings.isNullOrEmpty(original.getRawQuery())) { 30 sb.append("?").append(original.getRawQuery()); 31 } 32 if (!Strings.isNullOrEmpty(original.getRawFragment())) { 33 sb.append("#").append(original.getRawFragment()); 34 } 35 URI newURI = new URI(sb.toString()); 36 return newURI; 37 } catch (URISyntaxException e) { 38 throw new RuntimeException(e); 39 } 40 }
View Code
5、RestTemplate 負載均衡總結
到這裡,我們基本就弄清楚了一個簡單的 @LoadBalanced 註解如何讓 RestTemplate 具備了負載均衡的能力了,這一節來做個小結。
① RestTemplate 如何獲得負載均衡的能力
- 1)首先 RestTemplate 是 spring-web 模塊下一個訪問第三方 RESTful API 接口的網絡請求框架
- 2)在 spring cloud 微服務架構中,用 @LoadBalanced 對 RestTemplate 做個標記,就可以使 RestTemplate 具備負載均衡的能力
- 3)使 RestTemplate 具備負載均衡的核心組件就是 LoadBalancerAutoConfiguration 配置類中向其添加的 LoadBalancerInterceptor 負載均衡攔截器
- 4)RestTemplate 在發起 http 調用前,會遍歷所有攔截器來對 RestTemplate 定製化,LoadBalancerInterceptor 就是在這時將URI中的服務名替換為實例的真實IP地址。定製完成後,就會發起真正的 http 請求。
- 5)LoadBalancerInterceptor 又主要是使用負載均衡客戶端 LoadBalancerClient 來完成URI的重構的,LoadBalancerClient 就可以根據服務名查找一個可用的實例,然後重構URI。
② 核心組件
這裡會涉及多個模塊,下面是核心組件的所屬模塊:
spring-web:
- RestTemplate
- InterceptingClientHttpRequest:執行攔截器,並發起最終http調用
spring-cloud-commons:
- @LoadBalanced
- LoadBalancerAutoConfiguration
- LoadBalancerRequestFactory:創建裝飾類 ServiceRequestWrapper 替換原來的 HttpRequest,重載 getURI 方法。
- LoadBalancerInterceptor:負載均衡攔截器
- LoadBalancerClient:負載均衡客戶端接口
spring-cloud-netflix-ribbon:
- RibbonLoadBalancerClient:LoadBalancerClient 的實現類,Ribbon 的負載均衡客戶端
- RibbonAutoConfiguration
ribbon-loadbalancer:
- ILoadBalancer:負載均衡器
- Server:實例
③ 最後再用一張圖把 RestTemplate 這塊的關係捋一下
三、ILoadBalancer 獲取 Server
從前面 RestTemplate 那張圖可以看出,使 RestTemplate 具備負載均衡的能力,最重要的一個組件之一就是 ILoadBalancer,因為要用它來獲取能調用的 Server,有了 Server 才能對原始帶有服務名的 URI 進行重構。這節就來看下 Ribbon 的負載均衡器 ILoadBalancer 是如何創建的以及如何通過它獲取 Server。
1、創建負載均衡器 ILoadBalancer
① SpringClientFactory與上下文
ILoadBalancer 是用 SpringClientFactory 的 getLoadBalancer 方法根據服務名獲取的,從 getInstance 一步步進去可以發現,每個服務都會創建一個 AnnotationConfigApplicationContext,也就是一個應用上下文 ApplicationContext。相當於就是一個服務綁定一個 ILoadBalancer。
1 public <C> C getInstance(String name, Class<C> type) { 2 C instance = super.getInstance(name, type); 3 if (instance != null) { 4 return instance; 5 } 6 IClientConfig config = getInstance(name, IClientConfig.class); 7 return instantiateWithConfig(getContext(name), type, config); 8 }
1 public <T> T getInstance(String name, Class<T> type) { 2 // 根據名稱獲取 3 AnnotationConfigApplicationContext context = getContext(name); 4 if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0) { 5 return context.getBean(type); 6 } 7 return null; 8 }
1 protected AnnotationConfigApplicationContext getContext(String name) { 2 // contexts => Map<String, AnnotationConfigApplicationContext> 3 if (!this.contexts.containsKey(name)) { 4 synchronized (this.contexts) { 5 if (!this.contexts.containsKey(name)) { 6 this.contexts.put(name, createContext(name)); 7 } 8 } 9 } 10 return this.contexts.get(name); 11 }
調試看下 AnnotationConfigApplicationContext 上下文,可以看到放入了與這個服務綁定的 ILoadBalancer、IClientConfig、RibbonLoadBalancerContext 等。
它這裡為什麼要每個服務都綁定一個 ApplicationContext 呢?我猜想應該是因為服務實例列表可以有多個來源,比如可以從 eureka 註冊中心獲取、可以通過代碼配置、可以通過配置文件配置,另外每個服務還可以有很多個性化的配置,有默認的配置、定製的全局配置、個別服務的特定配置等,它這樣做就便於用戶定製每個服務的負載均衡策略。
② Ribbon的飢餓加載
而且這個Ribbon客戶端的應用上下文默認是懶加載的,並不是在啟動的時候就加載上下文,而是在第一次調用的時候才會去初始化。
如果想服務啟動時就初始化,可以指定Ribbon客戶端的具體名稱,在啟動的時候就加載配置項的上下文:
1 ribbon: 2 eager-load: 3 enabled: true 4 clients: demo-producer,demo-xxx
③ RibbonClientConfiguration
ILoadBalancer 的創建在哪呢?看 RibbonClientConfiguration,這個配置類提供了 ILoadBalancer 的默認創建方法,ILoadBalancer 的默認實現類為 ZoneAwareLoadBalancer。
1 public class RibbonClientConfiguration { 2 3 public static final int DEFAULT_CONNECT_TIMEOUT = 1000; 4 5 public static final int DEFAULT_READ_TIMEOUT = 1000; 6 7 public static final boolean DEFAULT_GZIP_PAYLOAD = true; 8 9 @RibbonClientName 10 private String name = "client"; 11 12 @Autowired 13 private PropertiesFactory propertiesFactory; 14 15 @Bean 16 @ConditionalOnMissingBean 17 public IClientConfig ribbonClientConfig() { 18 DefaultClientConfigImpl config = new DefaultClientConfigImpl(); 19 config.loadProperties(this.name); 20 config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT); 21 config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT); 22 config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD); 23 return config; 24 } 25 26 @Bean 27 @ConditionalOnMissingBean 28 public IRule ribbonRule(IClientConfig config) { 29 if (this.propertiesFactory.isSet(IRule.class, name)) { 30 return this.propertiesFactory.get(IRule.class, config, name); 31 } 32 ZoneAvoidanceRule rule = new ZoneAvoidanceRule(); 33 rule.initWithNiwsConfig(config); 34 return rule; 35 } 36 37 @Bean 38 @ConditionalOnMissingBean 39 public IPing ribbonPing(IClientConfig config) { 40 if (this.propertiesFactory.isSet(IPing.class, name)) { 41 return this.propertiesFactory.get(IPing.class, config, name); 42 } 43 return new DummyPing(); 44 } 45 46 @Bean 47 @ConditionalOnMissingBean 48 @SuppressWarnings("unchecked") 49 public ServerList<Server> ribbonServerList(IClientConfig config) { 50 if (this.propertiesFactory.isSet(ServerList.class, name)) { 51 return this.propertiesFactory.get(ServerList.class, config, name); 52 } 53 ConfigurationBasedServerList serverList = new ConfigurationBasedServerList(); 54 serverList.initWithNiwsConfig(config); 55 return serverList; 56 } 57 58 @Bean 59 @ConditionalOnMissingBean 60 public ServerListUpdater ribbonServerListUpdater(IClientConfig config) { 61 return new PollingServerListUpdater(config); 62 } 63 64 @Bean 65 @ConditionalOnMissingBean 66 @SuppressWarnings("unchecked") 67 public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) { 68 if (this.propertiesFactory.isSet(ServerListFilter.class, name)) { 69 return this.propertiesFactory.get(ServerListFilter.class, config, name); 70 } 71 ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter(); 72 filter.initWithNiwsConfig(config); 73 return filter; 74 } 75 76 @Bean 77 @ConditionalOnMissingBean 78 public ILoadBalancer ribbonLoadBalancer(IClientConfig config, 79 ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, 80 IRule rule, IPing ping, ServerListUpdater serverListUpdater) { 81 // 先判斷配置文件中是否配置了負載均衡器 82 if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { 83 // 通過反射創建 84 return this.propertiesFactory.get(ILoadBalancer.class, config, name); 85 } 86 return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, 87 serverListFilter, serverListUpdater); 88 } 89 }
可以看到創建 ILoadBalancer 需要 IClientConfig、ServerList<Server>、ServerListFilter<Server>、IRule、IPing、ServerListUpdater,其實這6個接口加上 ILoadBalancer 就是 Ribbon 的核心接口,它們共同定義了 Ribbon 的行為特性。
這7個核心接口和默認實現類如下:
2、客戶端 Ribbon 定製
可以看到在 RibbonClientConfiguration 中創建 IRule、IPing、ServerList<Server>、ServerListFilter<Server>、ILoadBalancer 時,都先通過 propertiesFactory.isSet 判斷是否已配置了對應類型的實現類,沒有才使用默認的實現類。
也就是說針對特定的服務,這幾個類可以自行定製化,也可以通過配置指定其它的實現類。
① 全局策略配置
如果想要全局更改配置,需要加一個配置類,比如像下面這樣:
1 @Configuration 2 public class GlobalRibbonConfiguration { 3 4 @Bean 5 public IRule ribbonRule() { 6 return new RandomRule(); 7 } 8 9 @Bean 10 public IPing ribbonPing() { 11 return new NoOpPing(); 12 } 13 }
② 基於註解的配置
如果想針對某一個服務定製配置,可以通過 @RibbonClients 來配置特定服務的配置類。
需要先定義一個服務配置類:
1 @Configuration 2 public class ProducerRibbonConfiguration { 3 4 @Bean 5 public IRule ribbonRule() { 6 return new RandomRule(); 7 } 8 9 @Bean 10 public IPing ribbonPing() { 11 return new NoOpPing(); 12 } 13 }
用 @RibbonClients 註解為服務指定特定的配置類,並排除掉,不讓 Spring 掃描,否則就變成了全局配置了。
1 @RibbonClients({ 2 @RibbonClient(name = "demo-producer", configuration = ProducerRibbonConfiguration.class) 3 }) 4 @ComponentScan(excludeFilters = { 5 @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ProducerRibbonConfiguration.class) 6 })
③ 配置文件配置
通過配置文件的方式來配置,配置的格式就是 <服務名稱>.ribbon.<屬性>:
1 demo-producer: 2 ribbon: 3 # ILoadBalancer 4 NFLoadBalancerClassName: com.netflix.loadbalancer.NoOpLoadBalancer 5 # IRule 6 NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule 7 # IPing 8 NFLoadBalancerPingClassName: 9 # ServerList<Server> 10 NIWSServerListClassName: 11 # ServerListFilter<Server> 12 NIWSServerListFilterClassName:
④ 優先級順序
這幾種配置方式的優先級順序是 配置文件配置 > @RibbonClients 配置 > 全局配置 > 默認配置。
3、ZoneAwareLoadBalancer 選擇 Server
獲取到 ILoadBalancer 後,就要去獲取 Server 了,可以看到,就是用 ILoadBalancer 來獲取 Server。
1 protected Server getServer(ILoadBalancer loadBalancer, Object hint) { 2 if (loadBalancer == null) { 3 return null; 4 } 5 // Use 'default' on a null hint, or just pass it on? 6 return loadBalancer.chooseServer(hint != null ? hint : "default"); 7 }
ILoadBalancer 的默認實現類是 ZoneAwareLoadBalancer,進入它的 chooseServer 方法內,如果只配置了一個 zone,就走父類的 chooseServer,否則從多個 zone 中去選擇實例。
1 public Server chooseServer(Object key) { 2 // ENABLED => ZoneAwareNIWSDiscoveryLoadBalancer.enabled 默認 true 3 // AvailableZones 配置的只有一個 defaultZone 4 if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { 5 logger.debug("Zone aware logic disabled or there is only one zone"); 6 // 走父類獲取 Server 的邏輯 7 return super.chooseServer(key); 8 } 9 10 // 多 zone 邏輯.... 11 }
先看下 ZoneAwareLoadBalancer 的類繼承結構,ZoneAwareLoadBalancer 的直接父類是 DynamicServerListLoadBalancer,DynamicServerListLoadBalancer 的父類又是 BaseLoadBalancer。
ZoneAwareLoadBalancer 調用父類的 chooseServer 方法是在 BaseLoadBalancer 中的,進去可以看到,它主要是用 IRule 來選擇實例,最終選擇實例的策略就交給了 IRule 接口。
1 public Server chooseServer(Object key) { 2 if (counter == null) { 3 counter = createCounter(); 4 } 5 counter.increment(); 6 if (rule == null) { 7 return null; 8 } else { 9 try { 10 // IRule 11 return rule.choose(key); 12 } catch (Exception e) { 13 logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); 14 return null; 15 } 16 } 17 }
4、ZoneAvoidanceRule 斷言篩選、輪詢選擇 Server
IRule 的默認實現類是 ZoneAvoidanceRule,先看下 ZoneAvoidanceRule 的繼承結構,ZoneAvoidanceRule 的直接父類是 PredicateBasedRule。
rule.choose 的邏輯在 PredicateBasedRule 中,getPredicate() 返回的是 ZoneAvoidanceRule 創建的一個組合斷言 CompositePredicate,就是用這個斷言來過濾出可用的 Server,並通過輪詢的策略返回一個 Server。
1 public Server choose(Object key) { 2 ILoadBalancer lb = getLoadBalancer(); 3 // getPredicate() Server斷言 => CompositePredicate 4 // RoundRobin 輪詢方式獲取實例 5 Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); 6 if (server.isPresent()) { 7 return server.get(); 8 } else { 9 return null; 10 } 11 }
在初始化 ZoneAvoidanceRule 配置時,創建了 CompositePredicate,可以看到這個組合斷言主要有兩個斷言,一個是斷言 Server 的 zone 是否可用,一個斷言 Server 本身是否可用,例如 Server 無法 ping 通。
1 public void initWithNiwsConfig(IClientConfig clientConfig) { 2 // 斷言 Server 的 zone 是否可用,只有一個 defaultZone 的情況下都是可用的 3 ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig); 4 // 斷言 Server 是否可用 5 AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig); 6 // 封裝組合斷言 7 compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); 8 } 9 10 private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) { 11 // 建造者模式創建斷言 12 return CompositePredicate.withPredicates(p1, p2) 13 .addFallbackPredicate(p2) 14 .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) 15 .build(); 16 17 }
接着看選擇Server的 chooseRoundRobinAfterFiltering,參數 servers 是通過 ILoadBalancer 獲取的所有實例,可以看到它其實就是返回了 ILoadBalancer 在內存中緩存的服務所有 Server。這個 Server 從哪來的我們後面再來看。
1 public List<Server> getAllServers() { 2 // allServerList => List<Server> 3 return Collections.unmodifiableList(allServerList); 4 }
先對所有實例通過斷言過濾掉不可用的 Server,然後是通過輪詢的方式獲取一個 Server 返回。這就是默認配置下 ILoadBalancer(ZoneAwareLoadBalancer) 通過 IRule(ZoneAvoidanceRule) 選擇 Server 的流程了。
1 public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { 2 // 斷言獲取可用的 Server 3 List<Server> eligible = getEligibleServers(servers, loadBalancerKey); 4 if (eligible.size() == 0) { 5 return Optional.absent(); 6 } 7 // 通過取模的方式輪詢 Server 8 return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); 9 } 10 11 public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { 12 if (loadBalancerKey == null) { 13 return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); 14 } else { 15 List<Server> results = Lists.newArrayList(); 16 // 對每個 Server 斷言 17 for (Server server: servers) { 18 if (this.apply(new PredicateKey(loadBalancerKey, server))) { 19 results.add(server); 20 } 21 } 22 return results; 23 } 24 } 25 26 private int incrementAndGetModulo(int modulo) { 27 for (;;) { 28 int current = nextIndex.get(); 29 // 模運算取餘數 30 int next = (current + 1) % modulo; 31 // CAS 更新 nextIndex 32 if (nextIndex.compareAndSet(current, next) && current < modulo) 33 return current; 34 } 35 }
四、Ribbon 整合 Eureka Client 拉取Server列表
前面在通過 IRule 選擇 Server 的時候,首先通過 lb.getAllServers() 獲取了所有的 Server,那這些 Server 從哪裡來的呢,這節就來看下。
1、ILoadBalancer 初始化
ILoadBalancer 的默認實現類是 ZoneAwareLoadBalancer,先從 ZoneAwareLoadBalancer 的構造方法進去看看都做了些什麼事情。
1 @Bean 2 @ConditionalOnMissingBean 3 public ILoadBalancer ribbonLoadBalancer(IClientConfig config, 4 ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, 5 IRule rule, IPing ping, ServerListUpdater serverListUpdater) { 6 if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) { 7 return this.propertiesFactory.get(ILoadBalancer.class, config, name); 8 } 9 return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList, 10 serverListFilter, serverListUpdater); 11 }
可以看到,ZoneAwareLoadBalancer 直接調用了父類 DynamicServerListLoadBalancer 的構造方法,DynamicServerListLoadBalancer 先調用父類 BaseLoadBalancer 初始化,然後又做了一些剩餘的初始化工作。
1 public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule, 2 IPing ping, ServerList<T> serverList, ServerListFilter<T> filter, 3 ServerListUpdater serverListUpdater) { 4 // DynamicServerListLoadBalancer 5 super(clientConfig, rule, ping, serverList, filter, serverListUpdater); 6 } 7 8 public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, 9 ServerList<T> serverList, ServerListFilter<T> filter, 10 ServerListUpdater serverListUpdater) { 11 // BaseLoadBalancer 12 super(clientConfig, rule, ping); 13 this.serverListImpl = serverList; 14 this.filter = filter; 15 this.serverListUpdater = serverListUpdater; 16 if (filter instanceof AbstractServerListFilter) { 17 ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats()); 18 } 19 // 剩餘的一些初始化 20 restOfInit(clientConfig); 21 } 22 23 public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) { 24 // createLoadBalancerStatsFromConfig => LoadBalancerStats 統計 25 initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config)); 26
看 BaseLoadBalancer 的 initWithConfig,主要做了如下的初始化:
- 設置 IPing 和 IRule,ping 的間隔時間是 30 秒,setPing 會啟動一個後台定時任務,然後每隔30秒運行一次 PingTask 任務。
- 設置了 ILoadBalancer 的 統計器 LoadBalancerStats,對 ILoadBalancer 的 Server 狀態進行統計,比如連接失敗、成功、熔斷等信息。
- 在啟用 PrimeConnections 請求預熱的情況下,創建 PrimeConnections 來預熱客戶端 與 Server 的鏈接。默認是關閉的。
- 最後是註冊了一些監控、開啟請求預熱。
1 void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) { 2 this.config = clientConfig; 3 String clientName = clientConfig.getClientName(); 4 this.name = clientName; 5 // ping 間隔時間,默認30秒 6 int pingIntervalTime = Integer.parseInt("" 7 + clientConfig.getProperty( 8 CommonClientConfigKey.NFLoadBalancerPingInterval, 9 Integer.parseInt("30"))); 10 // 沒看到用的地方 11 int maxTotalPingTime = Integer.parseInt("" 12 + clientConfig.getProperty( 13 CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime, 14 Integer.parseInt("2"))); 15 // 設置 ping 間隔時間,並重新設置了 ping 任務 16 setPingInterval(pingIntervalTime); 17 setMaxTotalPingTime(maxTotalPingTime); 18 19 // 設置 IRule、IPing 20 setRule(rule); 21 setPing(ping); 22 23 setLoadBalancerStats(stats); 24 rule.setLoadBalancer(this); 25 if (ping instanceof AbstractLoadBalancerPing) { 26 ((AbstractLoadBalancerPing) ping).setLoadBalancer(this); 27 } 28 logger.info("Client: {} instantiated a LoadBalancer: {}", name, this); 29 30 // PrimeConnections,請求預熱,默認關閉 31 // 作用主要用於解決那些部署環境(如讀EC2)在實際使用實時請求之前,從防火牆連接/路徑進行預熱(比如先加白名單、初始化等等動作比較耗時,可以用它先去打通)。 32 boolean enablePrimeConnections = clientConfig.get( 33 CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS); 34 if (enablePrimeConnections) { 35 this.setEnablePrimingConnections(true); 36 PrimeConnections primeConnections = new PrimeConnections( 37 this.getName(), clientConfig); 38 this.setPrimeConnections(primeConnections); 39 } 40 // 註冊一些監控 41 init(); 42 } 43 44 protected void init() { 45 Monitors.registerObject("LoadBalancer_" + name, this); 46 // register the rule as it contains metric for available servers count 47 Monitors.registerObject("Rule_" + name, this.getRule()); 48 // 默認關閉 49 if (enablePrimingConnections && primeConnections != null) { 50 primeConnections.primeConnections(getReachableServers()); 51 } 52 }
再看下 DynamicServerListLoadBalancer 的初始化,核心的初始化邏輯在 restOfInit 中,主要就是做了兩件事情:
- 開啟動態更新 Server 的特性,比如實例上線、下線、故障等,要能夠更新 ILoadBalancer 的 Server 列表。
- 然後就全量更新一次本地的 Server 列表。
1 void restOfInit(IClientConfig clientConfig) { 2 boolean primeConnection = this.isEnablePrimingConnections(); 3 // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() 4 this.setEnablePrimingConnections(false); 5 6 // 開啟動態更新 Server 的特性 7 enableAndInitLearnNewServersFeature(); 8 9 // 更新 Server 列表 10 updateListOfServers(); 11 12 // 開啟請求預熱的情況下,對可用的 Server 進行預熱 13 if (primeConnection && this.getPrimeConnections() != null) { 14 this.getPrimeConnections() 15 .primeConnections(getReachableServers()); 16 } 17 this.setEnablePrimingConnections(primeConnection); 18 LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); 19 }
2、全量更新Server列表
先看下 updateListOfServers() 是如何更新 Server 列表的,進而看下 ILoadBalancer 是如何存儲 Server 的。
- 首先使用 ServerList 獲取所有的 Server 列表,在 RibbonClientConfiguration 中配置的是 ConfigurationBasedServerList,但和 eureka 集合和,就不是 ConfigurationBasedServerList 了,這塊下一節再來看。
- 然後使用 ServerListFilter 對 Server 列表過濾,其默認實現類是 ZonePreferenceServerListFilter,它主要是過濾出當前 Zone(defaultZone)下的 Server。
- 最後就是更新所有 Server 列表,先是設置 Server alive,然後調用父類(BaseLoadBalancer)的 setServersList 來更新Server列表,這說明 Server 是存儲在 BaseLoadBalancer 里的。
1 public void updateListOfServers() { 2 List<T> servers = new ArrayList<T>(); 3 if (serverListImpl != null) { 4 // 從 ServerList 獲取所有 Server 列表 5 servers = serverListImpl.getUpdatedListOfServers(); 6 LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); 7 8 if (filter != null) { 9 // 用 ServerListFilter 過濾 Server 10 servers = filter.getFilteredListOfServers(servers); 11 LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); 12 } 13 } 14 // 更新所有 Server 到本地緩存 15 updateAllServerList(servers); 16 } 17 18 protected void updateAllServerList(List<T> ls) { 19 if (serverListUpdateInProgress.compareAndSet(false, true)) { 20 try { 21 for (T s : ls) { 22 s.setAlive(true); // 設置 Server alive 23 } 24 setServersList(ls); 25 // 強制初始化 Ping 26 super.forceQuickPing(); 27 } finally { 28 serverListUpdateInProgress.set(false); 29 } 30 } 31 } 32 33 public void setServersList(List lsrv) { 34 // BaseLoadBalancer 35 super.setServersList(lsrv); 36 37 // 將 Server 更新到 LoadBalancerStats 統計中 .... 38 }
接着看父類的 setServersList,可以看出,存儲所有 Server 的數據結構 allServerList 是一個加了 synchronized 的線程安全的容器,setServersList 就是直接將得到的 Server 列表替換 allServerList。
1 public void setServersList(List lsrv) { 2 Lock writeLock = allServerLock.writeLock(); 3 ArrayList<Server> newServers = new ArrayList<Server>(); 4 // 加寫鎖 5 writeLock.lock(); 6 try { 7 // for 循環將 lsrv 中的 Server 轉移到 allServers 8 ArrayList<Server> allServers = new ArrayList<Server>(); 9 for (Object server : lsrv) { 10 if (server == null) { 11 continue; 12 } 13 if (server instanceof String) { 14 server = new Server((String) server); 15 } 16 if (server instanceof Server) { 17 logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId()); 18 allServers.add((Server) server); 19 } else { 20 throw new IllegalArgumentException("Type String or Server expected, instead found:" + server.getClass()); 21 } 22 } 23 24 boolean listChanged = false; 25 // allServerList => volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>()) 26 if (!allServerList.equals(allServers)) { 27 listChanged = true; 28 // 服務列表變更監聽器 ServerListChangeListener, 發出服務變更通知... 29 } 30 31 // 啟用了服務預熱,開始 Server 預熱... 32 33 // 直接替換 34 allServerList = allServers; 35 if (canSkipPing()) { 36 for (Server s : allServerList) { 37 s.setAlive(true); 38 } 39 upServerList = allServerList; 40 } else if (listChanged) { 41 forceQuickPing(); 42 } 43 } finally { 44 // 釋放寫鎖 45 writeLock.unlock(); 46 } 47 }
前面 chooseRoundRobinAfterFiltering 獲取所有 Server 時就是返回的這個 allServerList列表。
1 public List<Server> getAllServers() { 2 return Collections.unmodifiableList(allServerList); 3 }
3、Eureka Ribbon 客戶端配置
獲取 Server 的組件是 ServerList,RibbonClientConfiguration 中配置的默認實現類是 ConfigurationBasedServerList。ConfigurationBasedServerList 默認是從配置文件中獲取,可以像下面這樣配置服務實例地址,多個 Server 地址用逗號隔開。
1 demo-producer: 2 ribbon: 3 listOfServers: http://10.215.0.92:8010,//10.215.0.92:8011
但是和 eureka-client 結合後,也就是引入 spring-cloud-starter-netflix-eureka-client 的客戶端依賴,它會幫我們引入 spring-cloud-netflix-eureka-client 依賴,這個包中有一個 RibbonEurekaAutoConfiguration 自動化配置類,它通過 @RibbonClients 註解定義了全局的 Ribbon 客戶端配置類 為 EurekaRibbonClientConfiguration
1 @Configuration(proxyBeanMethods = false) 2 @EnableConfigurationProperties 3 @ConditionalOnRibbonAndEurekaEnabled 4 @AutoConfigureAfter(RibbonAutoConfiguration.class) 5 @RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class) 6 public class RibbonEurekaAutoConfiguration { 7 8 }
進入 EurekaRibbonClientConfiguration 可以看到:
- IPing 的默認實現類為 NIWSDiscoveryPing。
- ServerList 的默認實現類為 DomainExtractingServerList,但是 DomainExtractingServerList 在構造時又傳入了一個類型為 DiscoveryEnabledNIWSServerList 的 ServerList。看名字大概也可以看出,DiscoveryEnabledNIWSServerList 就是從 EurekaClient 獲取 Server 的組件。
1 @Configuration(proxyBeanMethods = false) 2 public class EurekaRibbonClientConfiguration { 3 @Value("${ribbon.eureka.approximateZoneFromHostname:false}") 4 private boolean approximateZoneFromHostname = false; 5 6 @RibbonClientName 7 private String serviceId = "client"; 8 @Autowired 9 private PropertiesFactory propertiesFactory; 10 11 @Bean 12 @ConditionalOnMissingBean 13 public IPing ribbonPing(IClientConfig config) { 14 if (this.propertiesFactory.isSet(IPing.class, serviceId)) { 15 return this.propertiesFactory.get(IPing.class, config, serviceId); 16 } 17 NIWSDiscoveryPing ping = new NIWSDiscoveryPing(); 18 ping.initWithNiwsConfig(config); 19 return ping; 20 } 21 22 @Bean 23 @ConditionalOnMissingBean 24 public ServerList<?> ribbonServerList(IClientConfig config, 25 Provider<EurekaClient> eurekaClientProvider) { 26 if (this.propertiesFactory.isSet(ServerList.class, serviceId)) { 27 return this.propertiesFactory.get(ServerList.class, config, serviceId); 28 } 29 DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider); 30 DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname); 31 return serverList; 32 } 33 }
4、從 DiscoveryClient 獲取Server列表
DynamicServerListLoadBalancer 中通過 ServerList 的 getUpdatedListOfServers 方法全量獲取服務列表,在 eureka-client 環境下,ServerList 默認實現類為 DomainExtractingServerList,那就先看下它的 getUpdatedListOfServers 方法。
可以看出,DomainExtractingServerList 先用 DomainExtractingServerList 獲取服務列表,然後根據 Ribbon 客戶端配置重新構造 Server 對象返回。獲取服務列表的核心在 DiscoveryEnabledNIWSServerList 中。
1 @Override 2 public List<DiscoveryEnabledServer> getUpdatedListOfServers() { 3 // list => DiscoveryEnabledNIWSServerList 4 List<DiscoveryEnabledServer> servers = setZones(this.list.getUpdatedListOfServers()); 5 return servers; 6 } 7 8 private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) { 9 List<DiscoveryEnabledServer> result = new ArrayList<>(); 10 boolean isSecure = this.ribbon.isSecure(true); 11 boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer(); 12 // 根據客戶端配置重新構造 DomainExtractingServer 返回 13 for (DiscoveryEnabledServer server : servers) { 14 result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname)); 15 } 16 return result; 17 }
先看下 DiscoveryEnabledNIWSServerList 的構造初始化:
- 主要是傳入了 Provider<EurekaClient> 用來獲取 EurekaClient
- 另外還設置了客戶端名稱 clientName ,以及 vipAddresses 也是客戶端名稱,這個後面會用得上。
1 public DiscoveryEnabledNIWSServerList(IClientConfig clientConfig, Provider<EurekaClient> eurekaClientProvider) { 2 this.eurekaClientProvider = eurekaClientProvider; 3 initWithNiwsConfig(clientConfig); 4 } 5 6 @Override 7 public void initWithNiwsConfig(IClientConfig clientConfig) { 8 // 客戶端名稱,就是服務名稱 9 clientName = clientConfig.getClientName(); 10 // vipAddresses 得到的也是客戶端名稱 11 vipAddresses = clientConfig.resolveDeploymentContextbasedVipAddresses(); 12 13 // 其它的一些配置.... 14 }
接着看獲取實例的 getUpdatedListOfServers,可以看到它的核心邏輯就是根據服務名從 EurekaClient 獲取 InstanceInfo 實例列表,然後封裝 Server 信息返回。
1 public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ 2 return obtainServersViaDiscovery(); 3 } 4 5 private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { 6 List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); 7 if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { 8 return new ArrayList<DiscoveryEnabledServer>(); 9 } 10 // 得到 EurekaClient,實際類型是 CloudEurekaClient,其父類是 DiscoveryClient 11 EurekaClient eurekaClient = eurekaClientProvider.get(); 12 if (vipAddresses!=null){ 13 // 分割 vipAddresses,默認就是服務名稱 14 for (String vipAddress : vipAddresses.split(",")) { 15 // 根據服務名稱從 EurekaClient 獲取實例信息 16 List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); 17 for (InstanceInfo ii : listOfInstanceInfo) { 18 if (ii.getStatus().equals(InstanceStatus.UP)) { 19 // ... 20 // 根據實例信息 InstanceInfo 創建 Server 21 DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr); 22 serverList.add(des); 23 } 24 } 25 if (serverList.size()>0 && prioritizeVipAddressBasedServers){ 26 break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers 27 } 28 } 29 } 30 return serverList; 31 }
注意這裡的 vipAddress 其實就是服務名:
最後看 EurekaClient 的 getInstancesByVipAddress,到這裡就很清楚了,其實就是從 DiscoveryClient 的本地應用 Applications 中根據服務名取出所有的實例列表。
這裡就和 Eureka 源碼那塊銜接上了,eureka-client 全量抓取註冊表以及每隔30秒增量抓取註冊表,都是合併到本地的 Applications 中。Ribbon 與 Eureka 結合後,Ribbon 獲取 Server 就從 DiscoveryClient 的 Applications 中獲取 Server 列表了。
1 public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, String region) { 2 // ... 3 Applications applications; 4 if (instanceRegionChecker.isLocalRegion(region)) { 5 // 取本地應用 Applications 6 applications = this.localRegionApps.get(); 7 } else { 8 applications = remoteRegionVsApps.get(region); 9 if (null == applications) { 10 return Collections.emptyList(); 11 } 12 } 13 14 if (!secure) { 15 // 返回服務名對應的實例 16 return applications.getInstancesByVirtualHostName(vipAddress); 17 } else { 18 return applications.getInstancesBySecureVirtualHostName(vipAddress); 19 } 20 }
5、定時更新Server列表
DynamicServerListLoadBalancer 初始化時,有個方法還沒說,就是 enableAndInitLearnNewServersFeature()。這個方法只是調用 ServerListUpdater 啟動了一個 UpdateAction,這個 UpdateAction 又只是調用了一下 updateListOfServers 方法,就是前面講解過的全量更新 Server 的邏輯。
1 public void enableAndInitLearnNewServersFeature() { 2 serverListUpdater.start(updateAction); 3 } 4 5 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { 6 @Override 7 public void doUpdate() { 8 // 調用 updateListOfServers 9 updateListOfServers(); 10 } 11 };
ServerListUpdater 的默認實現類是 PollingServerListUpdater,看下它的 start 方法:
其實就是以固定的頻率,每隔30秒調用一下 updateListOfServers 方法,將 DiscoveryClient 中 Applications 中緩存的實例同步到 ILoadBalancer 中的 allServerList 列表中。
1 public synchronized void start(final UpdateAction updateAction) { 2 if (isActive.compareAndSet(false, true)) { 3 final Runnable wrapperRunnable = new Runnable() { 4 @Override 5 public void run() { 6 // ... 7 try { 8 // 執行一次 updateListOfServers 9 updateAction.doUpdate(); 10 // 設置最後更新時間 11 lastUpdated = System.currentTimeMillis(); 12 } catch (Exception e) { 13 logger.warn("Failed one update cycle", e); 14 } 15 } 16 }; 17 18 // 固定頻率調度 19 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( 20 wrapperRunnable, 21 initialDelayMs, // 默認 1000 22 refreshIntervalMs, // 默認 30 * 1000 23 TimeUnit.MILLISECONDS 24 ); 25 } else { 26 logger.info("Already active, no-op"); 27 } 28 }
6、判斷Server是否存活
在創建 ILoadBalancer 時,IPing 還沒有看過是如何工作的。在初始化的時候,可以看到,主要就是設置了當前的 ping,然後重新設置了一個調度任務,默認每隔30秒調度一次 PingTask 任務。
1 public void setPing(IPing ping) { 2 if (ping != null) { 3 if (!ping.equals(this.ping)) { 4 this.ping = ping; 5 // 設置 Ping 任務 6 setupPingTask(); 7 } 8 } else { 9 this.ping = null; 10 // cancel the timer task 11 lbTimer.cancel(); 12 } 13 } 14 15 void setupPingTask() { 16 // ... 17 // 創建一個定時調度器 18 lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); 19 // pingIntervalTime 默認為 30 秒,每隔30秒調度一次 PingTask 20 lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); 21 // 立即發起以 Ping 22 forceQuickPing(); 23 }
ShutdownEnabledTimer 可以簡單了解下,它是繼承自 Timer 的,它在創建的時候向 Runtime 註冊了一個回調,在 jvm 關閉的時候來取消 Timer 的執行,進而釋放資源。


1 public class ShutdownEnabledTimer extends Timer { 2 private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownEnabledTimer.class); 3 4 private Thread cancelThread; 5 private String name; 6 7 public ShutdownEnabledTimer(String name, boolean daemon) { 8 super(name, daemon); 9 this.name = name; 10 // 取消定時器的線程 11 this.cancelThread = new Thread(new Runnable() { 12 public void run() { 13 ShutdownEnabledTimer.super.cancel(); 14 } 15 }); 16 17 LOGGER.info("Shutdown hook installed for: {}", this.name); 18 // 向 Runtime 註冊一個鉤子,在 jvm 關閉時,調用 cancelThread 取消定時任務 19 Runtime.getRuntime().addShutdownHook(this.cancelThread); 20 } 21 22 @Override 23 public void cancel() { 24 super.cancel(); 25 LOGGER.info("Shutdown hook removed for: {}", this.name); 26 try { 27 Runtime.getRuntime().removeShutdownHook(this.cancelThread); 28 } catch (IllegalStateException ise) { 29 LOGGER.info("Exception caught (might be ok if at shutdown)", ise); 30 } 31 32 } 33 }
View Code
再來看下 PingTask,PingTask 核心邏輯就是遍歷 allServers 列表,使用 IPingStrategy 和 IPing 來判斷 Server 是否存活,並更新 Server 的狀態,以及將所有存活的 Server 更新到 upServerList 中,upServerList 緩存了所有存活的 Server。


1 class PingTask extends TimerTask { 2 public void run() { 3 try { 4 // pingStrategy => SerialPingStrategy 5 new Pinger(pingStrategy).runPinger(); 6 } catch (Exception e) { 7 logger.error("LoadBalancer [{}]: Error pinging", name, e); 8 } 9 } 10 } 11 12 class Pinger { 13 private final IPingStrategy pingerStrategy; 14 15 public Pinger(IPingStrategy pingerStrategy) { 16 this.pingerStrategy = pingerStrategy; 17 } 18 19 public void runPinger() throws Exception { 20 if (!pingInProgress.compareAndSet(false, true)) { 21 return; // Ping in progress - nothing to do 22 } 23 24 Server[] allServers = null; 25 boolean[] results = null; 26 27 Lock allLock = null; 28 Lock upLock = null; 29 30 try { 31 allLock = allServerLock.readLock(); 32 allLock.lock(); 33 // 加讀鎖,取出 allServerList 中的 Server 34 allServers = allServerList.toArray(new Server[allServerList.size()]); 35 allLock.unlock(); 36 37 int numCandidates = allServers.length; 38 // 使用 IPingStrategy 和 IPing 對所有 Server 發起 ping 請求 39 results = pingerStrategy.pingServers(ping, allServers); 40 41 final List<Server> newUpList = new ArrayList<Server>(); 42 final List<Server> changedServers = new ArrayList<Server>(); 43 44 for (int i = 0; i < numCandidates; i++) { 45 boolean isAlive = results[i]; 46 Server svr = allServers[i]; 47 boolean oldIsAlive = svr.isAlive(); 48 // 設置 alive 是否存活 49 svr.setAlive(isAlive); 50 51 // 實例變更 52 if (oldIsAlive != isAlive) { 53 changedServers.add(svr); 54 logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); 55 } 56 57 // 添加存活的 Server 58 if (isAlive) { 59 newUpList.add(svr); 60 } 61 } 62 upLock = upServerLock.writeLock(); 63 upLock.lock(); 64 // 更新 upServerList,upServerList 只保存了存活的 Server 65 upServerList = newUpList; 66 upLock.unlock(); 67 // 通知變更 68 notifyServerStatusChangeListener(changedServers); 69 } finally { 70 pingInProgress.set(false); 71 } 72 } 73 }
View Code
IPingStrategy 的默認實現類是 SerialPingStrategy,進入可以發現它只是遍歷所有 Server,然後用 IPing 判斷 Server 是否存活。
1 private static class SerialPingStrategy implements IPingStrategy { 2 @Override 3 public boolean[] pingServers(IPing ping, Server[] servers) { 4 int numCandidates = servers.length; 5 boolean[] results = new boolean[numCandidates]; 6 7 for (int i = 0; i < numCandidates; i++) { 8 results[i] = false; 9 try { 10 if (ping != null) { 11 // 使用 IPing 判斷 Server 是否存活 12 results[i] = ping.isAlive(servers[i]); 13 } 14 } catch (Exception e) { 15 logger.error("Exception while pinging Server: '{}'", servers[i], e); 16 } 17 } 18 return results; 19 } 20 }
在集成 eureka-client 後,IPing默認實現類是 NIWSDiscoveryPing,看它的 isAlive 方法,其實就是判斷對應 Server 的實例 InstanceInfo 的狀態是否是 UP 狀態,UP狀態就表示 Server 存活。
1 public boolean isAlive(Server server) { 2 boolean isAlive = true; 3 if (server!=null && server instanceof DiscoveryEnabledServer){ 4 DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server; 5 InstanceInfo instanceInfo = dServer.getInstanceInfo(); 6 if (instanceInfo!=null){ 7 InstanceStatus status = instanceInfo.getStatus(); 8 if (status!=null){ 9 // 判斷Server對應的實例狀態是否是 UP 10 isAlive = status.equals(InstanceStatus.UP); 11 } 12 } 13 } 14 return isAlive; 15 }
7、一張圖總結 Ribbon 核心原理
① Ribbon 核心工作原理總結
- 首先,Ribbon 的7個核心接口共同定義了 Ribbon 的行為特性,它們就是 Ribbon 的核心骨架。
- 使用 Ribbon 來對客戶端做負載均衡,基本的用法就是用 @LoadBalanced 註解標註一個 RestTemplate 的 bean 對象,之後在 LoadBalancerAutoConfiguration 配置類中會對帶有 @LoadBalanced 註解的 RestTemplate 添加 LoadBalancerInterceptor 攔截器。
- LoadBalancerInterceptor 會攔截 RestTemplate 的 HTTP 請求,將請求綁定進 Ribbon 負載均衡的生命周期,然後使用 LoadBalancerClient 的 execute 方法來處理請求。
- LoadBalancerClient 首先會得到一個 ILoadBalancer,再使用它去得到一個 Server,這個 Server 就是具體某一個實例的信息封裝。得到 Server 之後,就用 Server 的 IP 和端口重構原始 URI。
- ILoadBalancer 最終在選擇實例的時候,會通過 IRule 均衡策略來選擇一個 Server。
- ILoadBalancer 的父類 BaseLoadBalancer 中有一個 allServerList 列表緩存了所有 Server,Ribbon 中 Server 的來源就是 allServerList。
- 在加載Ribbon客戶端上下文時,ILoadBalancer 會用 ServerList 從 DiscoveryClient 的 Applications 中獲取客戶端對應的實例列表,然後使用 ServerListFilter 過濾,最後更新到 allServerList 中。
- ILoadBalancer 還會開啟一個後台任務 ServerListUpdater ,每隔30秒運行一次,用 ServerList 將 DiscoveryClient 的 Applications 中的實例列表同步到 allServerList 中。
- ILoadBalancer 還會開啟一個後台任務 PingTask,每隔30秒運行一次,用 IPing 判斷 Server 的存活狀態,EurekaClient 環境下,就是判斷 InstanceInfo 的狀態是否為 UP。
② 下面用一張圖來總結下 Ribbon 這塊獲取Server的核心流程以及對應的核心接口間的關係。
五、Ribbon 核心接口
前面已經了解到 Ribbon 核心接口以及默認實現如何協作來查找要調用的一個實例,這節再來看下各個核心接口的一些特性及其它實現類。
1、客戶端配置 — IClientConfig
IClientConfig 就是管理客戶端配置的核心接口,它的默認實現類是 DefaultClientConfigImpl。可以看到在創建 IClientConfig 時,設置了 Ribbon 客戶端默認的連接和讀取超時時間為 1 秒,例如讀取如果超過1秒,就會返回超時,這兩個一般需要根據實際情況來調整。
1 @Bean 2 @ConditionalOnMissingBean 3 public IClientConfig ribbonClientConfig() { 4 DefaultClientConfigImpl config = new DefaultClientConfigImpl(); 5 // 加載配置 6 config.loadProperties(this.name); 7 // 連接超時默認 1 秒 8 config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT); 9 // 讀取超時默認 1 秒 10 config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT); 11 config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD); 12 return config; 13 }
CommonClientConfigKey 這個類定義了 Ribbon 客戶端相關的所有配置的鍵常量,可以通過這個類來看有哪些配置。


1 public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> { 2 3 public static final IClientConfigKey<String> AppName = new CommonClientConfigKey<String>("AppName"){}; 4 5 public static final IClientConfigKey<String> Version = new CommonClientConfigKey<String>("Version"){}; 6 7 public static final IClientConfigKey<Integer> Port = new CommonClientConfigKey<Integer>("Port"){}; 8 9 public static final IClientConfigKey<Integer> SecurePort = new CommonClientConfigKey<Integer>("SecurePort"){}; 10 11 public static final IClientConfigKey<String> VipAddress = new CommonClientConfigKey<String>("VipAddress"){}; 12 13 public static final IClientConfigKey<Boolean> ForceClientPortConfiguration = new CommonClientConfigKey<Boolean>("ForceClientPortConfiguration"){}; // use client defined port regardless of server advert 14 15 public static final IClientConfigKey<String> DeploymentContextBasedVipAddresses = new CommonClientConfigKey<String>("DeploymentContextBasedVipAddresses"){}; 16 17 public static final IClientConfigKey<Integer> MaxAutoRetries = new CommonClientConfigKey<Integer>("MaxAutoRetries"){}; 18 19 public static final IClientConfigKey<Integer> MaxAutoRetriesNextServer = new CommonClientConfigKey<Integer>("MaxAutoRetriesNextServer"){}; 20 21 public static final IClientConfigKey<Boolean> OkToRetryOnAllOperations = new CommonClientConfigKey<Boolean>("OkToRetryOnAllOperations"){}; 22 23 public static final IClientConfigKey<Boolean> RequestSpecificRetryOn = new CommonClientConfigKey<Boolean>("RequestSpecificRetryOn"){}; 24 25 public static final IClientConfigKey<Integer> ReceiveBufferSize = new CommonClientConfigKey<Integer>("ReceiveBufferSize"){}; 26 27 public static final IClientConfigKey<Boolean> EnablePrimeConnections = new CommonClientConfigKey<Boolean>("EnablePrimeConnections"){}; 28 29 public static final IClientConfigKey<String> PrimeConnectionsClassName = new CommonClientConfigKey<String>("PrimeConnectionsClassName"){}; 30 31 public static final IClientConfigKey<Integer> MaxRetriesPerServerPrimeConnection = new CommonClientConfigKey<Integer>("MaxRetriesPerServerPrimeConnection"){}; 32 33 public static final IClientConfigKey<Integer> MaxTotalTimeToPrimeConnections = new CommonClientConfigKey<Integer>("MaxTotalTimeToPrimeConnections"){}; 34 35 public static final IClientConfigKey<Float> MinPrimeConnectionsRatio = new CommonClientConfigKey<Float>("MinPrimeConnectionsRatio"){}; 36 37 public static final IClientConfigKey<String> PrimeConnectionsURI = new CommonClientConfigKey<String>("PrimeConnectionsURI"){}; 38 39 public static final IClientConfigKey<Integer> PoolMaxThreads = new CommonClientConfigKey<Integer>("PoolMaxThreads"){}; 40 41 public static final IClientConfigKey<Integer> PoolMinThreads = new CommonClientConfigKey<Integer>("PoolMinThreads"){}; 42 43 public static final IClientConfigKey<Integer> PoolKeepAliveTime = new CommonClientConfigKey<Integer>("PoolKeepAliveTime"){}; 44 45 public static final IClientConfigKey<String> PoolKeepAliveTimeUnits = new CommonClientConfigKey<String>("PoolKeepAliveTimeUnits"){}; 46 47 public static final IClientConfigKey<Boolean> EnableConnectionPool = new CommonClientConfigKey<Boolean>("EnableConnectionPool") {}; 48 49 /** 50 * Use {@link #MaxConnectionsPerHost} 51 */ 52 @Deprecated 53 public static final IClientConfigKey<Integer> MaxHttpConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxHttpConnectionsPerHost"){}; 54 55 /** 56 * Use {@link #MaxTotalConnections} 57 */ 58 @Deprecated 59 public static final IClientConfigKey<Integer> MaxTotalHttpConnections = new CommonClientConfigKey<Integer>("MaxTotalHttpConnections"){}; 60 61 public static final IClientConfigKey<Integer> MaxConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxConnectionsPerHost"){}; 62 63 public static final IClientConfigKey<Integer> MaxTotalConnections = new CommonClientConfigKey<Integer>("MaxTotalConnections"){}; 64 65 public static final IClientConfigKey<Boolean> IsSecure = new CommonClientConfigKey<Boolean>("IsSecure"){}; 66 67 public static final IClientConfigKey<Boolean> GZipPayload = new CommonClientConfigKey<Boolean>("GZipPayload"){}; 68 69 public static final IClientConfigKey<Integer> ConnectTimeout = new CommonClientConfigKey<Integer>("ConnectTimeout"){}; 70 71 public static final IClientConfigKey<Integer> BackoffInterval = new CommonClientConfigKey<Integer>("BackoffTimeout"){}; 72 73 public static final IClientConfigKey<Integer> ReadTimeout = new CommonClientConfigKey<Integer>("ReadTimeout"){}; 74 75 public static final IClientConfigKey<Integer> SendBufferSize = new CommonClientConfigKey<Integer>("SendBufferSize"){}; 76 77 public static final IClientConfigKey<Boolean> StaleCheckingEnabled = new CommonClientConfigKey<Boolean>("StaleCheckingEnabled"){}; 78 79 public static final IClientConfigKey<Integer> Linger = new CommonClientConfigKey<Integer>("Linger"){}; 80 81 public static final IClientConfigKey<Integer> ConnectionManagerTimeout = new CommonClientConfigKey<Integer>("ConnectionManagerTimeout"){}; 82 83 public static final IClientConfigKey<Boolean> FollowRedirects = new CommonClientConfigKey<Boolean>("FollowRedirects"){}; 84 85 public static final IClientConfigKey<Boolean> ConnectionPoolCleanerTaskEnabled = new CommonClientConfigKey<Boolean>("ConnectionPoolCleanerTaskEnabled"){}; 86 87 public static final IClientConfigKey<Integer> ConnIdleEvictTimeMilliSeconds = new CommonClientConfigKey<Integer>("ConnIdleEvictTimeMilliSeconds"){}; 88 89 public static final IClientConfigKey<Integer> ConnectionCleanerRepeatInterval = new CommonClientConfigKey<Integer>("ConnectionCleanerRepeatInterval"){}; 90 91 public static final IClientConfigKey<Boolean> EnableGZIPContentEncodingFilter = new CommonClientConfigKey<Boolean>("EnableGZIPContentEncodingFilter"){}; 92 93 public static final IClientConfigKey<String> ProxyHost = new CommonClientConfigKey<String>("ProxyHost"){}; 94 95 public static final IClientConfigKey<Integer> ProxyPort = new CommonClientConfigKey<Integer>("ProxyPort"){}; 96 97 public static final IClientConfigKey<String> KeyStore = new CommonClientConfigKey<String>("KeyStore"){}; 98 99 public static final IClientConfigKey<String> KeyStorePassword = new CommonClientConfigKey<String>("KeyStorePassword"){}; 100 101 public static final IClientConfigKey<String> TrustStore = new CommonClientConfigKey<String>("TrustStore"){}; 102 103 public static final IClientConfigKey<String> TrustStorePassword = new CommonClientConfigKey<String>("TrustStorePassword"){}; 104 105 // if this is a secure rest client, must we use client auth too? 106 public static final IClientConfigKey<Boolean> IsClientAuthRequired = new CommonClientConfigKey<Boolean>("IsClientAuthRequired"){}; 107 108 public static final IClientConfigKey<String> CustomSSLSocketFactoryClassName = new CommonClientConfigKey<String>("CustomSSLSocketFactoryClassName"){}; 109 // must host name match name in certificate? 110 public static final IClientConfigKey<Boolean> IsHostnameValidationRequired = new CommonClientConfigKey<Boolean>("IsHostnameValidationRequired"){}; 111 112 // see also //hc.apache.org/httpcomponents-client-ga/tutorial/html/advanced.html 113 public static final IClientConfigKey<Boolean> IgnoreUserTokenInConnectionPoolForSecureClient = new CommonClientConfigKey<Boolean>("IgnoreUserTokenInConnectionPoolForSecureClient"){}; 114 115 // Client implementation 116 public static final IClientConfigKey<String> ClientClassName = new CommonClientConfigKey<String>("ClientClassName"){}; 117 118 //LoadBalancer Related 119 public static final IClientConfigKey<Boolean> InitializeNFLoadBalancer = new CommonClientConfigKey<Boolean>("InitializeNFLoadBalancer"){}; 120 121 public static final IClientConfigKey<String> NFLoadBalancerClassName = new CommonClientConfigKey<String>("NFLoadBalancerClassName"){}; 122 123 public static final IClientConfigKey<String> NFLoadBalancerRuleClassName = new CommonClientConfigKey<String>("NFLoadBalancerRuleClassName"){}; 124 125 public static final IClientConfigKey<String> NFLoadBalancerPingClassName = new CommonClientConfigKey<String>("NFLoadBalancerPingClassName"){}; 126 127 public static final IClientConfigKey<Integer> NFLoadBalancerPingInterval = new CommonClientConfigKey<Integer>("NFLoadBalancerPingInterval"){}; 128 129 public static final IClientConfigKey<Integer> NFLoadBalancerMaxTotalPingTime = new CommonClientConfigKey<Integer>("NFLoadBalancerMaxTotalPingTime"){}; 130 131 public static final IClientConfigKey<String> NFLoadBalancerStatsClassName = new CommonClientConfigKey<String>("NFLoadBalancerStatsClassName"){}; 132 133 public static final IClientConfigKey<String> NIWSServerListClassName = new CommonClientConfigKey<String>("NIWSServerListClassName"){}; 134 135 public static final IClientConfigKey<String> ServerListUpdaterClassName = new CommonClientConfigKey<String>("ServerListUpdaterClassName"){}; 136 137 public static final IClientConfigKey<String> NIWSServerListFilterClassName = new CommonClientConfigKey<String>("NIWSServerListFilterClassName"){}; 138 139 public static final IClientConfigKey<Integer> ServerListRefreshInterval = new CommonClientConfigKey<Integer>("ServerListRefreshInterval"){}; 140 141 public static final IClientConfigKey<Boolean> EnableMarkingServerDownOnReachingFailureLimit = new CommonClientConfigKey<Boolean>("EnableMarkingServerDownOnReachingFailureLimit"){}; 142 143 public static final IClientConfigKey<Integer> ServerDownFailureLimit = new CommonClientConfigKey<Integer>("ServerDownFailureLimit"){}; 144 145 public static final IClientConfigKey<Integer> ServerDownStatWindowInMillis = new CommonClientConfigKey<Integer>("ServerDownStatWindowInMillis"){}; 146 147 public static final IClientConfigKey<Boolean> EnableZoneAffinity = new CommonClientConfigKey<Boolean>("EnableZoneAffinity"){}; 148 149 public static final IClientConfigKey<Boolean> EnableZoneExclusivity = new CommonClientConfigKey<Boolean>("EnableZoneExclusivity"){}; 150 151 public static final IClientConfigKey<Boolean> PrioritizeVipAddressBasedServers = new CommonClientConfigKey<Boolean>("PrioritizeVipAddressBasedServers"){}; 152 153 public static final IClientConfigKey<String> VipAddressResolverClassName = new CommonClientConfigKey<String>("VipAddressResolverClassName"){}; 154 155 public static final IClientConfigKey<String> TargetRegion = new CommonClientConfigKey<String>("TargetRegion"){}; 156 157 public static final IClientConfigKey<String> RulePredicateClasses = new CommonClientConfigKey<String>("RulePredicateClasses"){}; 158 159 public static final IClientConfigKey<String> RequestIdHeaderName = new CommonClientConfigKey<String>("RequestIdHeaderName") {}; 160 161 public static final IClientConfigKey<Boolean> UseIPAddrForServer = new CommonClientConfigKey<Boolean>("UseIPAddrForServer") {}; 162 163 public static final IClientConfigKey<String> ListOfServers = new CommonClientConfigKey<String>("listOfServers") {}; 164 165 private static final Set<IClientConfigKey> keys = new HashSet<IClientConfigKey>(); 166 167 // ... 168 }
View Code
進入到 DefaultClientConfigImpl,可以看到 CommonClientConfigKey 中的每個配置都對應了一個默認值。在加載配置的時候,如果用戶沒有定製配置,就會使用默認的配置。


1 public class DefaultClientConfigImpl implements IClientConfig { 2 3 public static final Boolean DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS = Boolean.TRUE; 4 5 public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName(); 6 7 public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule"; 8 9 public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer"; 10 11 public static final boolean DEFAULT_USEIPADDRESS_FOR_SERVER = Boolean.FALSE; 12 13 public static final String DEFAULT_CLIENT_CLASSNAME = "com.netflix.niws.client.http.RestClient"; 14 15 public static final String DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME = "com.netflix.client.SimpleVipAddressResolver"; 16 17 public static final String DEFAULT_PRIME_CONNECTIONS_URI = "/"; 18 19 public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000; 20 21 public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9; 22 23 public static final Boolean DEFAULT_ENABLE_PRIME_CONNECTIONS = Boolean.FALSE; 24 25 public static final int DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW = Integer.MAX_VALUE; 26 27 public static final int DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS = 60000; 28 29 public static final Boolean DEFAULT_ENABLE_REQUEST_THROTTLING = Boolean.FALSE; 30 31 public static final Boolean DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER = Boolean.FALSE; 32 33 public static final Boolean DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED = Boolean.TRUE; 34 35 public static final Boolean DEFAULT_FOLLOW_REDIRECTS = Boolean.FALSE; 36 37 public static final float DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED = 0.0f; 38 39 public static final int DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER = 1; 40 41 public static final int DEFAULT_MAX_AUTO_RETRIES = 0; 42 43 public static final int DEFAULT_BACKOFF_INTERVAL = 0; 44 45 public static final int DEFAULT_READ_TIMEOUT = 5000; 46 47 public static final int DEFAULT_CONNECTION_MANAGER_TIMEOUT = 2000; 48 49 public static final int DEFAULT_CONNECT_TIMEOUT = 2000; 50 51 public static final Boolean DEFAULT_ENABLE_CONNECTION_POOL = Boolean.TRUE; 52 53 @Deprecated 54 public static final int DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST = 50; 55 56 @Deprecated 57 public static final int DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS = 200; 58 59 public static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 50; 60 61 public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200; 62 63 public static final float DEFAULT_MIN_PRIME_CONNECTIONS_RATIO = 1.0f; 64 65 public static final String DEFAULT_PRIME_CONNECTIONS_CLASS = "com.netflix.niws.client.http.HttpPrimeConnection"; 66 67 public static final String DEFAULT_SEVER_LIST_CLASS = "com.netflix.loadbalancer.ConfigurationBasedServerList"; 68 69 public static final String DEFAULT_SERVER_LIST_UPDATER_CLASS = "com.netflix.loadbalancer.PollingServerListUpdater"; 70 71 public static final int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30000; // every half minute (30 secs) 72 73 public static final int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30000; // all connections idle for 30 secs 74 75 protected volatile Map<String, Object> properties = new ConcurrentHashMap<String, Object>(); 76 77 protected Map<IClientConfigKey<?>, Object> typedProperties = new ConcurrentHashMap<IClientConfigKey<?>, Object>(); 78 79 private static final Logger LOG = LoggerFactory.getLogger(DefaultClientConfigImpl.class); 80 81 private String clientName = null; 82 83 private VipAddressResolver resolver = null; 84 85 private boolean enableDynamicProperties = true; 86 /** 87 * Defaults for the parameters for the thread pool used by batchParallel 88 * calls 89 */ 90 public static final int DEFAULT_POOL_MAX_THREADS = DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS; 91 public static final int DEFAULT_POOL_MIN_THREADS = 1; 92 public static final long DEFAULT_POOL_KEEP_ALIVE_TIME = 15 * 60L; 93 public static final TimeUnit DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS = TimeUnit.SECONDS; 94 public static final Boolean DEFAULT_ENABLE_ZONE_AFFINITY = Boolean.FALSE; 95 public static final Boolean DEFAULT_ENABLE_ZONE_EXCLUSIVITY = Boolean.FALSE; 96 public static final int DEFAULT_PORT = 7001; 97 public static final Boolean DEFAULT_ENABLE_LOADBALANCER = Boolean.TRUE; 98 99 public static final String DEFAULT_PROPERTY_NAME_SPACE = "ribbon"; 100 101 private String propertyNameSpace = DEFAULT_PROPERTY_NAME_SPACE; 102 103 public static final Boolean DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS = Boolean.FALSE; 104 105 public static final Boolean DEFAULT_ENABLE_NIWS_EVENT_LOGGING = Boolean.TRUE; 106 107 public static final Boolean DEFAULT_IS_CLIENT_AUTH_REQUIRED = Boolean.FALSE; 108 109 private final Map<String, DynamicStringProperty> dynamicProperties = new ConcurrentHashMap<String, DynamicStringProperty>(); 110 111 public Boolean getDefaultPrioritizeVipAddressBasedServers() { 112 return DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS; 113 } 114 115 public String getDefaultNfloadbalancerPingClassname() { 116 return DEFAULT_NFLOADBALANCER_PING_CLASSNAME; 117 } 118 119 public String getDefaultNfloadbalancerRuleClassname() { 120 return DEFAULT_NFLOADBALANCER_RULE_CLASSNAME; 121 } 122 123 public String getDefaultNfloadbalancerClassname() { 124 return DEFAULT_NFLOADBALANCER_CLASSNAME; 125 } 126 127 public boolean getDefaultUseIpAddressForServer() { 128 return DEFAULT_USEIPADDRESS_FOR_SERVER; 129 } 130 131 public String getDefaultClientClassname() { 132 return DEFAULT_CLIENT_CLASSNAME; 133 } 134 135 public String getDefaultVipaddressResolverClassname() { 136 return DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME; 137 } 138 139 public String getDefaultPrimeConnectionsUri() { 140 return DEFAULT_PRIME_CONNECTIONS_URI; 141 } 142 143 public int getDefaultMaxTotalTimeToPrimeConnections() { 144 return DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS; 145 } 146 147 public int getDefaultMaxRetriesPerServerPrimeConnection() { 148 return DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION; 149 } 150 151 public Boolean getDefaultEnablePrimeConnections() { 152 return DEFAULT_ENABLE_PRIME_CONNECTIONS; 153 } 154 155 public int getDefaultMaxRequestsAllowedPerWindow() { 156 return DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW; 157 } 158 159 public int getDefaultRequestThrottlingWindowInMillis() { 160 return DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS; 161 } 162 163 public Boolean getDefaultEnableRequestThrottling() { 164 return DEFAULT_ENABLE_REQUEST_THROTTLING; 165 } 166 167 public Boolean getDefaultEnableGzipContentEncodingFilter() { 168 return DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER; 169 } 170 171 public Boolean getDefaultConnectionPoolCleanerTaskEnabled() { 172 return DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED; 173 } 174 175 public Boolean getDefaultFollowRedirects() { 176 return DEFAULT_FOLLOW_REDIRECTS; 177 } 178 179 public float getDefaultPercentageNiwsEventLogged() { 180 return DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED; 181 } 182 183 public int getDefaultMaxAutoRetriesNextServer() { 184 return DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER; 185 } 186 187 public int getDefaultMaxAutoRetries() { 188 return DEFAULT_MAX_AUTO_RETRIES; 189 } 190 191 public int getDefaultReadTimeout() { 192 return DEFAULT_READ_TIMEOUT; 193 } 194 195 public int getDefaultConnectionManagerTimeout() { 196 return DEFAULT_CONNECTION_MANAGER_TIMEOUT; 197 } 198 199 public int getDefaultConnectTimeout() { 200 return DEFAULT_CONNECT_TIMEOUT; 201 } 202 203 @Deprecated 204 public int getDefaultMaxHttpConnectionsPerHost() { 205 return DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST; 206 } 207 208 @Deprecated 209 public int getDefaultMaxTotalHttpConnections() { 210 return DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS; 211 } 212 213 public int getDefaultMaxConnectionsPerHost() { 214 return DEFAULT_MAX_CONNECTIONS_PER_HOST; 215 } 216 217 public int getDefaultMaxTotalConnections() { 218 return DEFAULT_MAX_TOTAL_CONNECTIONS; 219 } 220 221 public float getDefaultMinPrimeConnectionsRatio() { 222 return DEFAULT_MIN_PRIME_CONNECTIONS_RATIO; 223 } 224 225 public String getDefaultPrimeConnectionsClass() { 226 return DEFAULT_PRIME_CONNECTIONS_CLASS; 227 } 228 229 public String getDefaultSeverListClass() { 230 return DEFAULT_SEVER_LIST_CLASS; 231 } 232 233 public int getDefaultConnectionIdleTimertaskRepeatInMsecs() { 234 return DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS; 235 } 236 237 public int getDefaultConnectionidleTimeInMsecs() { 238 return DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS; 239 } 240 241 public VipAddressResolver getResolver() { 242 return resolver; 243 } 244 245 public boolean isEnableDynamicProperties() { 246 return enableDynamicProperties; 247 } 248 249 public int getDefaultPoolMaxThreads() { 250 return DEFAULT_POOL_MAX_THREADS; 251 } 252 253 public int getDefaultPoolMinThreads() { 254 return DEFAULT_POOL_MIN_THREADS; 255 } 256 257 public long getDefaultPoolKeepAliveTime() { 258 return DEFAULT_POOL_KEEP_ALIVE_TIME; 259 } 260 261 public TimeUnit getDefaultPoolKeepAliveTimeUnits() { 262 return DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS; 263 } 264 265 public Boolean getDefaultEnableZoneAffinity() { 266 return DEFAULT_ENABLE_ZONE_AFFINITY; 267 } 268 269 public Boolean getDefaultEnableZoneExclusivity() { 270 return DEFAULT_ENABLE_ZONE_EXCLUSIVITY; 271 } 272 273 public int getDefaultPort() { 274 return DEFAULT_PORT; 275 } 276 277 public Boolean getDefaultEnableLoadbalancer() { 278 return DEFAULT_ENABLE_LOADBALANCER; 279 } 280 281 282 public Boolean getDefaultOkToRetryOnAllOperations() { 283 return DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS; 284 } 285 286 public Boolean getDefaultIsClientAuthRequired(){ 287 return DEFAULT_IS_CLIENT_AUTH_REQUIRED; 288 } 289 290 291 /** 292 * Create instance with no properties in default name space {@link #DEFAULT_PROPERTY_NAME_SPACE} 293 */ 294 public DefaultClientConfigImpl() { 295 this.dynamicProperties.clear(); 296 this.enableDynamicProperties = false; 297 } 298 299 /** 300 * Create instance with no properties in the specified name space 301 */ 302 public DefaultClientConfigImpl(String nameSpace) { 303 this(); 304 this.propertyNameSpace = nameSpace; 305 } 306 307 public void loadDefaultValues() { 308 putDefaultIntegerProperty(CommonClientConfigKey.MaxHttpConnectionsPerHost, getDefaultMaxHttpConnectionsPerHost()); 309 putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalHttpConnections, getDefaultMaxTotalHttpConnections()); 310 putDefaultBooleanProperty(CommonClientConfigKey.EnableConnectionPool, getDefaultEnableConnectionPool()); 311 putDefaultIntegerProperty(CommonClientConfigKey.MaxConnectionsPerHost, getDefaultMaxConnectionsPerHost()); 312 putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalConnections, getDefaultMaxTotalConnections()); 313 putDefaultIntegerProperty(CommonClientConfigKey.ConnectTimeout, getDefaultConnectTimeout()); 314 putDefaultIntegerProperty(CommonClientConfigKey.ConnectionManagerTimeout, getDefaultConnectionManagerTimeout()); 315 putDefaultIntegerProperty(CommonClientConfigKey.ReadTimeout, getDefaultReadTimeout()); 316 putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetries, getDefaultMaxAutoRetries()); 317 putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, getDefaultMaxAutoRetriesNextServer()); 318 putDefaultBooleanProperty(CommonClientConfigKey.OkToRetryOnAllOperations, getDefaultOkToRetryOnAllOperations()); 319 putDefaultBooleanProperty(CommonClientConfigKey.FollowRedirects, getDefaultFollowRedirects()); 320 putDefaultBooleanProperty(CommonClientConfigKey.ConnectionPoolCleanerTaskEnabled, getDefaultConnectionPoolCleanerTaskEnabled()); 321 putDefaultIntegerProperty(CommonClientConfigKey.ConnIdleEvictTimeMilliSeconds, getDefaultConnectionidleTimeInMsecs()); 322 putDefaultIntegerProperty(CommonClientConfigKey.ConnectionCleanerRepeatInterval, getDefaultConnectionIdleTimertaskRepeatInMsecs()); 323 putDefaultBooleanProperty(CommonClientConfigKey.EnableGZIPContentEncodingFilter, getDefaultEnableGzipContentEncodingFilter()); 324 String proxyHost = ConfigurationManager.getConfigInstance().getString(getDefaultPropName(CommonClientConfigKey.ProxyHost.key())); 325 if (proxyHost != null && proxyHost.length() > 0) { 326 setProperty(CommonClientConfigKey.ProxyHost, proxyHost); 327 } 328 Integer proxyPort = ConfigurationManager 329 .getConfigInstance() 330 .getInteger( 331 getDefaultPropName(CommonClientConfigKey.ProxyPort), 332 (Integer.MIN_VALUE + 1)); // + 1 just to avoid potential clash with user setting 333 if (proxyPort != (Integer.MIN_VALUE + 1)) { 334 setProperty(CommonClientConfigKey.ProxyPort, proxyPort); 335 } 336 putDefaultIntegerProperty(CommonClientConfigKey.Port, getDefaultPort()); 337 putDefaultBooleanProperty(CommonClientConfigKey.EnablePrimeConnections, getDefaultEnablePrimeConnections()); 338 putDefaultIntegerProperty(CommonClientConfigKey.MaxRetriesPerServerPrimeConnection, getDefaultMaxRetriesPerServerPrimeConnection()); 339 putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalTimeToPrimeConnections, getDefaultMaxTotalTimeToPrimeConnections()); 340 putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsURI, getDefaultPrimeConnectionsUri()); 341 putDefaultIntegerProperty(CommonClientConfigKey.PoolMinThreads, getDefaultPoolMinThreads()); 342 putDefaultIntegerProperty(CommonClientConfigKey.PoolMaxThreads, getDefaultPoolMaxThreads()); 343 putDefaultLongProperty(CommonClientConfigKey.PoolKeepAliveTime, getDefaultPoolKeepAliveTime()); 344 putDefaultTimeUnitProperty(CommonClientConfigKey.PoolKeepAliveTimeUnits, getDefaultPoolKeepAliveTimeUnits()); 345 putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneAffinity, getDefaultEnableZoneAffinity()); 346 putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneExclusivity, getDefaultEnableZoneExclusivity()); 347 putDefaultStringProperty(CommonClientConfigKey.ClientClassName, getDefaultClientClassname()); 348 putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerClassName, getDefaultNfloadbalancerClassname()); 349 putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName, getDefaultNfloadbalancerRuleClassname()); 350 putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerPingClassName, getDefaultNfloadbalancerPingClassname()); 351 putDefaultBooleanProperty(CommonClientConfigKey.PrioritizeVipAddressBasedServers, getDefaultPrioritizeVipAddressBasedServers()); 352 putDefaultFloatProperty(CommonClientConfigKey.MinPrimeConnectionsRatio, getDefaultMinPrimeConnectionsRatio()); 353 putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsClassName, getDefaultPrimeConnectionsClass()); 354 putDefaultStringProperty(CommonClientConfigKey.NIWSServerListClassName, getDefaultSeverListClass()); 355 putDefaultStringProperty(CommonClientConfigKey.VipAddressResolverClassName, getDefaultVipaddressResolverClassname()); 356 putDefaultBooleanProperty(CommonClientConfigKey.IsClientAuthRequired, getDefaultIsClientAuthRequired()); 357 // putDefaultStringProperty(CommonClientConfigKey.RequestIdHeaderName, getDefaultRequestIdHeaderName()); 358 putDefaultBooleanProperty(CommonClientConfigKey.UseIPAddrForServer, getDefaultUseIpAddressForServer()); 359 putDefaultStringProperty(CommonClientConfigKey.ListOfServers, ""); 360 } 361 }
View Code
也可以在配置文件中定製配置,例如配置超時和重試:
1 # 全局配置 2 ribbon: 3 # 客戶端讀取超時時間 4 ReadTimeout: 3000 5 # 客戶端連接超時時間 6 ConnectTimeout: 3000 7 # 默認只重試 GET,設置為 true 時將重試所有類型,如 POST、PUT、DELETE 8 OkToRetryOnAllOperations: false 9 # 重試次數 10 MaxAutoRetries: 1 11 # 最多重試幾個實例 12 MaxAutoRetriesNextServer: 1 13 14 # 只針對 demo-producer 客戶端 15 demo-producer: 16 ribbon: 17 # 客戶端讀取超時時間 18 ReadTimeout: 5000 19 # 客戶端連接超時時間 20 ConnectTimeout: 3000
2、均衡策略 — IRule
IRule 是最終選擇 Server 的策略規則類,核心的接口就是 choose。
1 public interface IRule{ 2 3 // 選擇 Server 4 public Server choose(Object key); 5 6 // 設置 ILoadBalancer 7 public void setLoadBalancer(ILoadBalancer lb); 8 9 // 獲取 ILoadBalancer 10 public ILoadBalancer getLoadBalancer(); 11 }
Ribbon 提供了豐富的負載均衡策略,我們也可以通過配置指定使用某個均衡策略。下面是整個Ribbon提供的 IRule 均衡策略。
3、服務檢查 — IPing
IPing 是用於定期檢查 Server 的可用性的,它只提供了一個接口,用來判斷 Server 是否存活:
1 public interface IPing { 2 3 public boolean isAlive(Server server); 4 }
IPing 也提供了多種策略可選,下面是整個 IPing 體系結構:
4、獲取服務列表 — ServerList
ServerList 提供了兩個接口,一個是第一次獲取 Server 列表,一個是更新 Server 列表,其中 getUpdatedListOfServers 會每被 Loadbalancer 隔 30 秒調一次來更新 allServerList。
1 public interface ServerList<T extends Server> { 2 3 public List<T> getInitialListOfServers(); 4 5 /** 6 * Return updated list of servers. This is called say every 30 secs 7 * (configurable) by the Loadbalancer's Ping cycle 8 */ 9 public List<T> getUpdatedListOfServers(); 10 }
ServerList 也提供了多種實現,ServerList 體系結構如下:
5、過濾服務 — ServerListFilter
ServerListFilter 提供了一個接口用來過濾出可用的 Server。
1 public interface ServerListFilter<T extends Server> { 2 3 public List<T> getFilteredListOfServers(List<T> servers); 4 }
ServerListFilter 體系結構如下:
6、服務列表更新 — ServerListUpdater
ServerListUpdater 有多個接口,最核心的就是 start 開啟定時任務調用 updateAction 來更新 allServerList。
1 public interface ServerListUpdater { 2 3 /** 4 * an interface for the updateAction that actually executes a server list update 5 */ 6 public interface UpdateAction { 7 void doUpdate(); 8 } 9 10 /** 11 * start the serverList updater with the given update action 12 * This call should be idempotent. 13 */ 14 void start(UpdateAction updateAction); 15 }
默認有兩個實現類:
7、負載均衡器 — ILoadBalancer
ILoadBalancer 是負載均衡選擇服務的核心接口,主要提供了如下的獲取Server列表和根據客戶端名稱選擇Server的接口。
1 public interface ILoadBalancer { 2 3 // 添加Server 4 public void addServers(List<Server> newServers); 5 6 // 根據key選擇一個Server 7 public Server chooseServer(Object key); 8 9 // 獲取存活的Server列表,返回 upServerList 10 public List<Server> getReachableServers(); 11 12 // 獲取所有Server列表,返回 allServerList 13 public List<Server> getAllServers(); 14 }
ILoadBalancer 的體系結構如下: