SpringCloud升級之路2020.0.x版-32. 改進負載均衡演算法
- 2021 年 11 月 11 日
- 筆記
- Spring Cloud, Spring Cloud 升級之路
在前面一節,我們梳理了實現 Feign 斷路器以及執行緒隔離的思路,這一節,我們先不看如何源碼實現(因為源碼中會包含負載均衡演算法的改進部分),先來討論下如何優化目前的負載均衡演算法。
之前的負載均衡演算法
- 獲取服務實例列表,將實例列表按照 ip 埠排序,如果不排序即使 position 是下一個可能也代表的是之前已經調用過的實例
- 根據請求中的 traceId,從本地快取中以 traceId 為 key 獲取一個初始值為隨機數的原子變數 position,這樣防止所有請求都從第一個實例開始調用,之後第二個、第三個這樣。
- position 原子加一,之後對實例個數取余,返回對應下標的實例進行調用
其中請求包含 traceId 是來自於我們使用了 spring-cloud-sleuth 鏈路追蹤,基於這種機制我們能保證請求不會重試到之前已經調用過的實例。源碼是:
//一定必須是實現ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因為註冊的時候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final ServiceInstanceListSupplier serviceInstanceListSupplier;
//每次請求算上重試不會超過1分鐘
//對於超過1分鐘的,這種請求肯定比較重,不應該重試
private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
//隨機初始值,防止每次都是從第一個開始調用
.build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
private final String serviceId;
private final Tracer tracer;
public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
this.serviceInstanceListSupplier = serviceInstanceListSupplier;
this.serviceId = serviceId;
this.tracer = tracer;
}
//每次重試,其實都會調用這個 choose 方法重新獲取一個實例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
return getInstanceResponseByRoundRobin(serviceInstances);
}
private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
//為了解決原始演算法不同調用並發可能導致一個請求重試相同的實例
//從 sleuth 的 Tracer 中獲取當前請求的上下文
Span currentSpan = tracer.currentSpan();
//如果上下文不存在,則可能不是前端用戶請求,而是其他某些機制觸發,我們就創建一個新的上下文
if (currentSpan == null) {
currentSpan = tracer.newTrace();
}
//從請求上下文中獲取請求的 traceId,用來唯一標識一個請求
long l = currentSpan.context().traceId();
AtomicInteger seed = positionCache.get(l);
int s = seed.getAndIncrement();
int pos = s % serviceInstances.size();
log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
return new DefaultResponse(serviceInstances.stream()
//實例返回列表順序可能不同,為了保持一致,先排序再取
.sorted(Comparator.comparing(ServiceInstance::getInstanceId))
.collect(Collectors.toList()).get(pos));
}
}
但是在這次請求突增很多的時候,這種負載均衡演算法還是給我們帶來了問題。
首先,本次突增,我們並沒有採取擴容,導致本次的性能壓力對於壓力的均衡分布非常敏感。舉個例子是,假設微服務 A 有 9 個實例,在業務高峰點來的時候,最理想的情況是保證無論何時這 9 個負載壓力都完全均衡,但是由於我們使用了初始值為隨機數的原子變數 position,雖然從一天的總量上來看,負責均衡壓力肯定是均衡,但是在某一小段時間內,很可能壓力全都跑到了某幾個實例上,導致這幾個實例被壓垮,熔斷,然後又都跑到了另外的幾個實例上,又被壓垮,熔斷,如此惡性循環。
然後,我們部署採用的是 k8s 部署,同一個虛擬機上面可能會跑很多微服務的 pod。在某些情況下,同一個微服務的多個 pod 可能會跑到同一個虛擬機 Node 上,這個可以從pod 的 ip 網段上看出來:例如某個微服務有如下 7 個實例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那麼 10.238.13.12:8181 與 10.238.13.24:8181 很可能在同一個 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一個 Node 上。我們重試,需要優先重試與之前重試過的實例盡量不在同一個 Node 上的實例,因為同一個 Node 上的實例只要有一個有問題或者壓力過大,其他的基本上也有問題或者壓力過大。
最後,如果調用某個實例一直失敗,那麼這個實例的調用優先順序需要排在其他正常的實例後面。這個對於減少快速刷新發布(一下子啟動很多實例之後停掉多個老實例,實例個數大於重試次數配置)對於用戶的影響,以及某個可用區突然發生異常導致多個實例下線對用戶的影響,以及業務壓力已經過去,壓力變小後,需要關掉不再需要的實例,導致大量實例發生遷移的時候對用戶的影響,有很大的作用。
針對以上問題的優化方案
我們針對上面三個問題,提出了一種優化後的解決方案:
- 針對每次請求,記錄:
- 本次請求已經調用過哪些實例 -> 請求調用過的實例快取
- 調用的實例,當前有多少請求在處理中 -> 實例運行請求數
- 調用的實例,最近請求錯誤率 -> 實例請求錯誤率
- 隨機將實例列表打亂,防止在以上三個指標都相同時,總是將請求發給同一個實例。
- 按照 當前請求沒有調用過靠前 -> 錯誤率越小越靠前 的順序排序 -> 實例運行請求數越小越靠前
- 取排好序之後的列表第一個實例作為本次負載均衡的實例
具體實現是:以下的程式碼來自於://github.com/JoJoTec/spring-cloud-parent
我們使用了依賴:
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
記錄實例數據的快取類:
@Log4j2
public class ServiceInstanceMetrics {
private static final String CALLING = "-Calling";
private static final String FAILED = "-Failed";
private MetricRegistry metricRegistry;
ServiceInstanceMetrics() {
}
public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
this.metricRegistry = metricRegistry;
}
/**
* 記錄調用實例
* @param serviceInstance
*/
public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).inc();
}
/**
* 記錄調用實例結束
* @param serviceInstance
* @param isSuccess 是否成功
*/
public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
metricRegistry.counter(key + CALLING).dec();
if (!isSuccess) {
//不成功則記錄失敗
metricRegistry.meter(key + FAILED).mark();
}
}
/**
* 獲取正在運行的調用次數
* @param serviceInstance
* @return
*/
public long getCalling(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
long count = metricRegistry.counter(key + CALLING).getCount();
log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
return count;
}
/**
* 獲取最近一分鐘調用失敗次數分鐘速率,其實是滑動平均數
* @param serviceInstance
* @return
*/
public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
return rate;
}
}
負載均衡核心程式碼:
private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
.expireAfterAccess(3, TimeUnit.MINUTES)
.build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;
//每次重試,其實都會調用這個 choose 方法重新獲取一個實例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
Span span = tracer.currentSpan();
return serviceInstanceListSupplier.get().next()
.map(serviceInstances -> {
//保持 span 和調用 choose 的 span 一樣
try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
return getInstanceResponse(serviceInstances);
}
});
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
//讀取 spring-cloud-sleuth 的對於當前請求的鏈路追蹤上下文,獲取對應的 traceId
Span currentSpan = tracer.currentSpan();
if (currentSpan == null) {
currentSpan = tracer.newTrace();
}
long l = currentSpan.context().traceId();
return getInstanceResponseByRoundRobin(l, serviceInstances);
}
@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
//首先隨機打亂列表中實例的順序
Collections.shuffle(serviceInstances);
//需要先將所有參數快取起來,否則 comparator 會調用多次,並且可能在排序過程中參數發生改變(針對實例的請求統計數據一直在並發改變)
Map<ServiceInstance, Integer> used = Maps.newHashMap();
Map<ServiceInstance, Long> callings = Maps.newHashMap();
Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
serviceInstances = serviceInstances.stream().sorted(
Comparator
//之前已經調用過的網段,這裡排後面
.<ServiceInstance>comparingInt(serviceInstance -> {
return used.computeIfAbsent(serviceInstance, k -> {
return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
return serviceInstance.getHost().contains(prefix);
}) ? 1 : 0;
});
})
//當前錯誤率最少的
.thenComparingDouble(serviceInstance -> {
return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
//由於使用的是移動平均值(EMA),需要忽略過小的差異(保留兩位小數,不是四捨五入,而是直接捨棄)
return ((int) (value * 100)) / 100.0;
});
})
//當前負載請求最少的
.thenComparingLong(serviceInstance -> {
return callings.computeIfAbsent(serviceInstance, k ->
serviceInstanceMetrics.getCalling(serviceInstance)
);
})
).collect(Collectors.toList());
if (serviceInstances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);
return new EmptyResponse();
}
ServiceInstance serviceInstance = serviceInstances.get(0);
//記錄本次返回的網段
calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
//目前記錄這個只為了兼容之前的單元測試(調用次數測試)
positionCache.get(traceId).getAndIncrement();
return new DefaultResponse(serviceInstance);
}
對於記錄實例數據的快取何時更新,是在 FeignClient 粘合重試,斷路以及執行緒隔離的程式碼中的,這個我們下一節就會看到。
一些組內關於方案設計的取捨 Q&A
1. 為何沒有使用所有微服務共享的快取來保存調用數據,來讓這些數據更加準確?
共享快取的可選方案包括將這些數據記錄放入 Redis,或者是 Apache Ignite 這樣的記憶體網格中。但是有兩個問題:
- 如果數據記錄放入 Redis 這樣的額外存儲,如果 Redis 不可用會導致所有的負載均衡都無法執行。如果放入 Apache Ignite,如果對應的節點下線,那麼對應的負載均衡也無法執行。這些都是不能接受的。
- 假設微服務 A 需要調用微服務 B,可能 A 的某個實例調用 B 的某個實例有問題,但是 A 的其他實例調用 B 的這個實例卻沒有問題,例如當某個可用區與另一個可用區網路擁塞的時候。如果用同一個快取 Key 記錄 A 所有的實例調用 B 這個實例的數據,顯然是不準確的。
每個微服務使用本地快取,記錄自己調用其他實例的數據,在我們這裡看來,不僅是更容易實現,也是更準確的做法。
2. 採用 EMA 的方式而不是請求窗口的方式統計最近錯誤率
採用請求窗口的方式統計,肯定是最準確的,例如我們統計最近一分鐘的錯誤率,就將最近一分鐘的請求快取起來,讀取的時候,將快取起來的請求數據加在一起取平均數即可。但是這種方式在請求突增的時候,可能會佔用很多很多記憶體來快取這些請求。同時計算錯誤率的時候,隨著快取請求數的增多也會消耗更大量的 CPU 進行計算。這樣做很不值得。
EMA 這種滑動平均值的計算方式,常見於各種性能監控統計場景,例如 JVM 中 TLAB 大小的動態計算,G1 GC Region 大小的伸縮以及其他很多 JVM 需要動態得出合適值的地方,都用這種計算方式。他不用將請求快取起來,而是直接用最新值乘以一個比例之後加上老值乘以 (1 – 這個比例),這個比例一般高於 0.5,表示 EMA 和當前最新值更加相關。
但是 EMA 也帶來另一個問題,我們會發現隨著程式運行小數點位數會非常多,會看到類似於如下的值:0.00000000123, 0.120000001, 0.120000003, 為了忽略過於細緻差異的影響(其實這些影響也來自於很久之前的錯誤請求),我們只保留兩位小數進行排序。
微信搜索「我的編程喵」關注公眾號,每日一刷,輕鬆提升技術,斬獲各種offer: