微服務架構 | 5.4 Sentinel 流控、統計和熔斷的源碼分析
- 2022 年 1 月 31 日
- 筆記
- Sentinel, Spring Cloud, Spring Cloud Alibaba, Spring 微服務實踐學習筆記, 分散式, 學習筆記, 微服務架構, 服務容災
前言
參考資料:
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服務原理與實戰》
《B站 尚矽谷 SpringCloud 框架開發教程 周陽》
《Sentinel GitHub 官網》
《Sentinel 官網》
調用鏈路是 Sentinel 的工作主流程,由各個 Slot 槽組成,將不同的 Slot 槽按照順序串在一起,從而將不同的功能(限流、降級、系統保護)組合在一起;
本篇《2. 獲取 ProcessorSlot 鏈》將從源碼級講解如何獲取調用鏈路,接著會以遍歷鏈表的方式處理每一個 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分別對應本篇《3. 流控槽實施流控邏輯》、《4. 統計槽實施指標數據統計》和《5. 熔斷槽實施服務熔斷》;
1. Sentinel 的自動裝配
1.2 依賴引入
- 我們引入 Sentinel 的 starter 依賴文件,不需要太多額外操作,即可使用 Sentinel 默認自帶的限流功能,原因是這些配置和功能都給我們自動裝配了;
- 在 Spring-Cloud-Alibaba-Sentinel 包下的 META-INF/spring.factories 文件里定義了會自動裝配哪些類;
- SentinelWebAutoConfiguration:對 Web Servlet 環境的支援;
- SentinelWebFluxAutoConfiguration:對 Spring WebFlux 的支援;
- SentinelEndpointAutoConfiguration:暴露 Endpoint 資訊;
- SentinelFeignAutoConfiguration:用於適應 Feign 組件;
- SentinelAutoConfiguration:支援對 RestTemplate 的服務調用使用 Sentinel 進行保護;
1.3 SentinelWebAutoConfiguration 配置類
- 在 SentinelWebAutoConfiguration 配置類中自動裝配了一個 FilterRegistrationBean,其主要作用是註冊一個 CommonFilter,並且默認情況下通過
/*
規則攔截所有的請求;
@Configuration
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration {
//省略其他程式碼
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true)
public FilterRegistrationBean sentinelFilter() {
FilterRegistrationBean<Filter> registration = new FilterRegistrationBean<>();
SentinelProperties.Filter filterConfig = properties.getFilter();
if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) {
List<String> defaultPatterns = new ArrayList<>();
//默認情況下通過 /* 規則攔截所有的請求
defaultPatterns.add("/*");
filterConfig.setUrlPatterns(defaultPatterns);
}
registration.addUrlPatterns(filterConfig.getUrlPatterns().toArray(new String[0]));
//【點進去】註冊 CommonFilter
Filter filter = new CommonFilter();
registration.setFilter(filter);
registration.setOrder(filterConfig.getOrder());
registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(properties.getHttpMethodSpecify()));
log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: {}.", filterConfig.getUrlPatterns());
return registration;
}
}
1.4 CommonFilter 過濾器
- CommonFilter 過濾器的作用與源碼如下:
- 從請求中獲取目標 URL;
- 獲取 Urlcleaner;
- 對當前 URL 添加限流埋點;
public class CommonFilter implements Filter {
//省略部分程式碼
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
HttpServletRequest sRequest = (HttpServletRequest)request;
Entry urlEntry = null;
try {
//解析請求 URL
String target = FilterUtil.filterTarget(sRequest);
//URL 清洗
UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
if (urlCleaner != null) {
//如果存在,則說明配置過 URL 清洗策略,替換配置的 targer
target = urlCleaner.clean(target);
}
if (!StringUtil.isEmpty(target)) {
String origin = this.parseOrigin(sRequest);
ContextUtil.enter("sentinel_web_servlet_context", origin);
if (this.httpMethodSpecify) {
String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + ":" + target;
//使用 SphU.entry() 方法對 URL 添加限流埋點
urlEntry = SphU.entry(pathWithHttpMethod, 1, EntryType.IN);
} else {
urlEntry = SphU.entry(target, 1, EntryType.IN);
}
}
//執行過濾
chain.doFilter(request, response);
} catch (BlockException var14) {
HttpServletResponse sResponse = (HttpServletResponse)response;
WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, var14);
} catch (ServletException | RuntimeException | IOException var15) {
Tracer.traceEntry(var15, urlEntry);
throw var15;
} finally {
if (urlEntry != null) {
urlEntry.exit();
}
ContextUtil.exit();
}
}
}
1.5 小結
- 對於 Web Servlet 環境,只是通過 Filter 的方式將所有請求自動設置為 Sentinel 的資源,從而達到限流的目的;
2. 獲取 ProcessorSlot 鏈
- Sentinel 的工作原理主要依靠 ProcessorSlot 鏈,遍歷鏈中的每一個 Slot 槽,執行相應邏輯;
2.1 Sentinel 源碼包結構
- 在 DeBug 之前,我們需要對 Sentinel 的源碼包結構做個分析,以找到方法的入口;
模組名 | 說明 |
---|---|
sentinel-adapter | 負責針對主流開源框架進行限流適配,如:Dubbo、gRPC、Zuul 等; |
sentinel-core | Sentinel 核心庫,提供限流、熔斷等實現; |
sentinel-dashboard | 控制台模組,提供可視化監控和管理; |
sentinel-demo | 官方案例; |
sentinel-extension | 實現不同組件的數據源擴展,如:Nacos、ZooKeeper、Apollo 等; |
sentinel-transport | 通訊協議處理模組; |
- Slot 槽是 Sentinel 的核心,因此方法的入口在 sentinel-core 核心庫,裡面有好多個
SphU.entry()
方法,我們給方法打上斷點,DeBug 進入,然後登錄 Sentinel 控制台;
2.2 獲取 ProcessorSlot 鏈與操作 Slot 槽的入口 CtSph.entryWithPriority()
- 一直進入最終方法的實現在
CtSph.entryWithPriority()
方法里,其主要邏輯與源碼如下:- 校驗全局上下文 context;
- 構造 ProcessorSlot 鏈;
- 遍歷 ProcessorSlot 鏈操作 Slot 槽(遍歷鏈表);
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
//上下文量已經超過閾值 -> 只初始化條目,不進行規則檢查
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
//沒有指定上下文 -> 使用默認上下文 context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) {
//全局開關關閉 -> 沒有規則檢查
return new CtEntry(resourceWrapper, null, context);
}
//【斷點步入 2.2.1】通過 lookProcessChain 方法獲取 ProcessorSlot 鏈
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
//表示資源量超過 Constants.MAX_SLOT_CHAIN_SIZE 常量 -> 不會進行規則檢查
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
//【斷點步入 3./4./5.】執行 ProcessorSlot 對 ProcessorSlot 鏈中的 Slot 槽遍歷操作(遍歷鏈表的方式)
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
//這種情況不應該發生,除非 Sentinel 內部存在錯誤
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
2.2.1 構造 ProcessorSlot 鏈 CtSph.lookProcessChain()
- 進入
CtSph.lookProcessChain()
方法;
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
//從快取中獲取 slot 調用鏈
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
//【斷點步入】構造 Slot 鏈(責任鏈模式)
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
- 最終調用
DefaultSlotChainBuilder.build()
方法構造 DefaultProcessorSlotChain;
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
- 可以看到最後 ProcessorSlotChain 鏈中有 10 個 Slot 插槽:
- 在本篇筆記中我們關注 3 個槽:
- FlowSlot:進行流控規則校驗,對應本篇《3. 流控槽實施流控邏輯》;
- StatisticSlot:實現指標數據的統計,對應本篇《4. 統計槽實施指標數據統計》;
- DegradeSlot:服務熔斷,對應本篇《5. 熔斷槽實施服務熔斷》
2.2.2 操作 Slot 槽的入口
- 操作 Slot 槽的入口方法是:
ProcessorSlot.entry()
; - 接著會以遍歷鏈表的方式操作每個 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分別對應下面的《3. 流控槽實施流控邏輯》、《4. 統計槽實施指標數據統計》和《5. 熔斷槽實施服務熔斷》;
3. 流控槽實施流控邏輯 FlowSlot.entry()
- 進入
ProcessorSlot.entry()
方法,它會遍歷每個 Slot 插槽,並對其進行操作,其中會經過FlowSlot.entry()
方法(需要提前給該方法打上斷點),方法的邏輯跟源碼如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//【斷點步入】檢查流量規則
checkFlow(resourceWrapper, context, node, count, prioritized);
//調用下一個 Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- 進入
FlowSlot.checkFlow()
方法,最終調用FlowRuleChecker.checkFlow()
方法,方法的邏輯和源碼如下:- 遍歷所有流控規則 FlowRule;
- 針對每個規則調用 canPassCheck 進行校驗;
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//【斷點步入 3.1】獲取流控規則
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
//遍歷所有流控規則 FlowRule
for (FlowRule rule : rules) {
//【點進去 3.2】校驗每條規則
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
3.1 獲取流控規則 FlowSlot.ruleProvider.apply()
- 進入
FlowSlot.ruleProvider.apply()
方法,獲取到 Sentinel 控制台上的流控規則;
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
@Override
public Collection<FlowRule> apply(String resource) {
// Flow rule map should not be null.
Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
3.2 校驗每條規則 FlowRuleChecker.canPassCheck()
- 進入
FlowRuleChecker.canPassCheck()
方法,分集群和單機模式校驗每條規則;
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//集群模式
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//【點進去】單機模式
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
- 由於我們是單機模式,進入
FlowRuleChecker.passLocalCheck()
方法,其主要邏輯和源碼如下:- 根據來源和策略獲取 Node,從而拿到統計的 runtime 資訊;
- 使用流量控制器檢查是否讓流量通過;
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//【點進去 3.2.1】獲取 Node
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//【點進去 3.2.2】獲取流控的處理策略
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
3.2.1 獲取 Node FlowRuleChecker.selectNodeByRequesterAndStrategy()
- 進入
FlowRuleChecker.selectNodeByRequesterAndStrategy()
方法,其根據 FlowRule 中配置的 Strategy 和 limitApp 屬性,返回不同處理策略的 Node;
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
//limitApp 不能為空
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
//場景1:限流規則設置了具體應用,如果當前流量就是通過該應用的,則命中場景1
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
//場景2:限流規則未指定任何具體應,默認為default,則當前流量直接命中場景2
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
//場景3:限流規則設置的是other,當前流量未命中前兩種場景
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
- 假設我們對介面 UserService 配置限流 1000 QPS,這 3 種場景分別如下:
- 場景 1:目的是優先保障重要來源的流量。我們需要區分調用來源,將限流規則細化。對A應用配置500QPS,對B應用配置200QPS,此時會產生兩條規則:A應用請求的流量限制在500,B應用請求的流量限制在200;
- 場景 2:沒有特別重要來源的流量。我們不想區分調用來源,所有入口調用 UserService 共享一個規則,所有 client 加起來總流量只能通過 1000 QPS;
- 場景 3:配合第1種場景使用,在長尾應用多的情況下不想對每個應用進行設置,沒有具體設置的應用都將命中;
3.2.2 獲取流控的處理策略 `FlowRule.getRater().canPass()
- 進入
FlowRule.getRater().canPass()
方法,首先通過FlowRule.getRater()
獲得流控行為 TrafficShapingController,這是一個介面,有四種實現類,如下圖所示:
- 有以下四種處理策略:
- DefaultController:直接拒絕;
- RateLimiterController:勻速排隊;
- WarmUpController:冷啟動(預熱);
- WarmUpRateLimiterController:勻速+冷啟動。
- 最終調用
TrafficShapingController.canPass()
方法,執行流控行為;
4. 統計槽實施指標數據統計 StatisticSlot.entry()
- 限流的核心是限流演算法的實現,Sentinel 默認採用滑動窗口演算法來實現限流,具體的指標數據統計由 StatisticSlot 實現;
- 我們給
StatisticSlot.entry()
方法里的語句打上斷點,運行到游標處; StatisticSlot.entry()
方法的核心是使用 Node 統計「增加執行緒數」和「請求通過數」;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
try {
//先執行後續 Slot 檢查,再統計數據(即先調用後續所有 Slot)
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//【斷點步入】使用 Node 統計「增加執行緒數」和「請求通過數」
node.increaseThreadNum();
node.addPassRequest(count);
//如果存在來源節點,則對來源節點增加執行緒數和請求通過數
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//如果是入口流量,則對全局節點增加執行緒數和請求通過數
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
//執行事件通知和回調函數
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
//處理優先順序等待異常
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
//如果有來源節點,則對來源節點增加執行緒數
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
//如果是入口流量,對全局節點增加執行緒數
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
//執行事件通知和回調函數
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
//處理限流、熔斷等異常
} catch (BlockException e) {
//省略
throw e;
//處理業務異常
} catch (Throwable e) {
context.getCurEntry().setError(e);
throw e;
}
}
4.1 統計「增加執行緒數」和「請求通過數」
- 這兩個方法都是調用同一個類的,筆者以第一個為例,進入
DefaultNode.increaseThreadNum()
方法,最終調用的是StatisticNode.increaseThreadNum()
,而統計也是依靠 StatisticNode 維護的,這裡放上 StatisticNode 的統計核心與源碼:- StatisticNode 持有兩個計數器 Metric 對象,統計行為是通過 Metric 完成的;
public class StatisticNode implements Node {
//省略其他程式碼
//【斷點步入】最近 1s 滑動窗口計數器(默認 1s)
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
//最近 1min 滑動窗口計數器(默認 1min)
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
//增加 「請求通過數」
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
//增加 RT 和成功數
@Override
public void addRtAndSuccess(long rt, int successCount) {
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
}
//增加「執行緒數」
@Override
public void increaseThreadNum() {
curThreadNum.increment();
}
}
- 這裡還有減少請求通過數(執行緒數)、統計最大值等方法,由於篇幅有限,這裡不放出,感興趣的讀者可以自己 DeBug 進入看看;
4.2 數據統計的數據結構
4.2.1 ArrayMetric 指標數組
- ArrayMetric 的構造方法需要先給方法打上斷點,重新 DeBug,在初始化時注入構造;
public class ArrayMetric implements Metric {
//省略其他程式碼
//【點進去 4.2.2】數據存儲
private final LeapArray<MetricBucket> data;
//最近 1s 滑動計數器用的是 OccupiableBucketLeapArray
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
//最近 1min 滑動計數器用的是 BucketLeapArray
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
//增加成功數
@Override
public void addSuccess(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addSuccess(count);
}
//增加通過數
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
//增加 RT
@Override
public void addRT(long rt) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addRT(rt);
}
}
4.2.2 LeapArray 環形數組
- LeapArray 是處理數據的核心數據結構,採用滑動窗口演算法;
- ArrayMetric 中持有 LeapArray 對象,所有方法都是對 LeapArray 進行操作;
- LeapArray 是環形的數據結構,為了節約記憶體,它存儲固定個數的窗口對象 WindowWrap,只保存最近一段時間的數據,新增的時間窗口會覆蓋最早的時間窗口;
public abstract class LeapArray<T> {
//省略其他程式碼
//單個窗口的長度(1個窗口多長時間)
protected int windowLengthInMs;
//取樣窗口個數
protected int sampleCount;
//全部窗口的長度(全部窗口多長時間)
protected int intervalInMs;
private double intervalInSecond;
//窗口數組:存儲所有窗口(支援原子讀取和寫入)
protected final AtomicReferenceArray<WindowWrap<T>> array;
//更新窗口數據時用的鎖
private final ReentrantLock updateLock = new ReentrantLock();
public LeapArray(int sampleCount, int intervalInMs) {
//計算單個窗口的長度
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
//【點進去 4.2.3】獲取當前窗口
public WindowWrap<T> currentWindow() {
//這裡參數是當前時間
return currentWindow(TimeUtil.currentTimeMillis());
}
//獲取指定時間的窗口
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 計算數組下標
int idx = calculateTimeIdx(timeMillis);
//計算當前請求對應的窗口開始時間
long windowStart = calculateWindowStart(timeMillis);
/*
* 從 array 中獲取窗口。有 3 種情況:
* (1) array 中窗口不在,創建一個 CAS 並寫入 array;
* (2) array 中窗口開始時間 = 當前窗口開始時間,直接返回;
* (3) array 中窗口開始時間 < 當前窗口開始時間,表示 o1d 窗口已過期,重置窗口數據並返回;
*/
while (true) {
// 取窗口
WindowWrap<T> old = array.get(idx);
//(1)窗口不在
if (old == null) {
//創建一個窗口
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
//CAS將窗口寫進 array 中並返回(CAS 操作確保只初始化一次)
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
//並發寫失敗,釋放 CPU 資源,避免有執行緒長時間佔用 CPU,一般下次來的時候 array 中有數據了會命中第2種情況;
Thread.yield();
}
//(2)array 中窗口開始時間 = 當前窗口開始時間
} else if (windowStart == old.windowStart()) {
//直接返回
return old;
//(3)array 中窗口開始時間 < 當前窗口開始時間
} else if (windowStart > old.windowStart()) {
//嘗試獲取更新鎖
if (updateLock.tryLock()) {
try {
//拿到鎖的執行緒才重置窗口
return resetWindowTo(old, windowStart);
} finally {
//釋放鎖
updateLock.unlock();
}
} else {
//並發加鎖失敗,釋放 CPU 資源,避免有執行緒長時間佔用 CPU,一般下次來的時候因為 old 對象時間更新了會命中第 2 種情況;
Thread.yield();
}
//理論上不會出現
} else if (windowStart < old.windowStart()) {
// 正常情況不會進入該分支(機器時鐘回撥等異常情況)
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
//計算索引
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
//timeId 降低時間精度
long timeId = timeMillis / windowLengthInMs;
//計算當前索引,這樣我們就可以將時間戳映射到 leap 數組
return (int)(timeId % array.length());
}
//計算窗口開始時間
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
}
4.2.3 WindowWrap 窗口包裝類
- WindowWrap 是一個窗口對象,它是一個包裝類,包裝的對象是 MetricBucket;
public class WindowWrap<T> {
//窗口長度,與 LeapArray 的 windowLengthInMs 一致
private final long windowLengthInMs;
//窗口開始時間,其值是 windowLengthInMs 的整數倍
private long windowStart;
//窗口的數據,支援 MetricBucket 類型,存儲統計數據
private T value;
//省略其他程式碼
}
4.2.4 MetricBucket 指標桶
- MetricBucket 類的定義如下,可以發現指標數據存在 LongAdder[] counters中;
- LongAdder 是 JDK1.8 中新增的類,用於在高並發場景下代替AtomicLong,以用空間換時間的方式降低了 CAS 失敗的概率,從而提高性能;
public class MetricBucket {
/**
* 存儲指標的計數器;
* LongAdder 是執行緒安全的計數器
* counters[0] PASS 通過數;
* counters[1] BLOCK 拒絕數;
* counters[2] EXCEPTION 異常數;
* counters[3] SUCCESS 成功數;
* counters[4] RT 響應時長;
* counters[5] OCCUPIED_PASS 預分配通過數;
**/
private final LongAdder[] counters;
//最小 RT,默認值是 5000ms
private volatile long minRt;
//構造中初始化
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
//覆蓋指標
public MetricBucket reset(MetricBucket bucket) {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
}
initMinRt();
return this;
}
private void initMinRt() {
this.minRt = SentinelConfig.statisticMaxRt();
}
//重置指標為0
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
//獲取指標,從 counters 中返回
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
//添加指標
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
public long pass() {
return get(MetricEvent.PASS);
}
public long block() {
return get(MetricEvent.BLOCK);
}
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}
//省略其他程式碼
}
4.2.5 各數據結構的依賴關係
4.2.6 LeapArray 統計數據的大致思路
- 創建一個長度為 n 的數組,數組元素就是窗口,窗口包裝了 1 個指標桶,桶中存放了該窗口時間範圍中對應的請求統計數據;
- 可以想像成一個環形數組在時間軸上向右滾動,請求到達時,會命中數組中的一個窗口,那麼該請求的數據就會存到命中的這個窗口包含的指標桶中;
- 當數組轉滿一圈時,會回到數組的開頭,而此時下標為 0 的元素需要重複使用,它裡面的窗口數據過期了,需要重置,然後再使用。具體過程如下圖:
5. 熔斷槽實施服務熔斷 DegradeSlot.entry()
- 服務熔斷是通過 DegradeSlot 來實現的,它會根據用戶配置的熔斷規則和系統運行時各個 Node 中的統計數據進行熔斷判斷;
- 注意:熔斷功能在 Sentinel-1.8.0 版本前後有較大變化;
- 我們給
DegradeSlot.entry()
方法里的語句打上斷點,運行到游標處;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//【斷點步入】熔斷檢查
performChecking(context, resourceWrapper);
//調用下一個 Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
- 進入
DegradeSlot.performChecking()
方法,其邏輯與源碼如下:- 根據資源名稱獲取斷路器;
- 循環判斷每個斷路器;
void performChecking(Context context, ResourceWrapper r) throws BlockException {
//根據 resourceName 獲取斷路器
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
//循環判斷每個斷路器
for (CircuitBreaker cb : circuitBreakers) {
//【點進去】嘗試通過斷路器
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
5.1 繼續或取消熔斷功能
- 進入
AbstractCircuitBreaker.tryPass()
方法,當請求超時並且處於探測恢復(半開狀態,HALF-OPEN 狀態)失敗時繼續斷路功能;
@Override
public boolean tryPass(Context context) {
//當前斷路器狀態為關閉
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
//【點進去】對於半開狀態,我們嘗試通過
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
- 進入
AbstractCircuitBreaker.fromOpenToHalfOpen()
方法,實現狀態的變更;
protected boolean fromOpenToHalfOpen(Context context) {
//嘗試將狀態從 OPEN 設置為 HALF_OPEN
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
//狀態變化通知
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
//在 entry 添加一個 exitHandler entry.exit() 時會調用
entry.whenTerminate(new BiConsumer<Context, Entry>() {
@Override
public void accept(Context context, Entry entry) {
//如果有發生異常,重新將狀態設置為OPEN 請求不同通過
if (entry.getBlockError() != null) {
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
}
}
});
//此時狀態已設置為HALF_OPEN正常通行
return true;
}
//熔斷
return false;
}
- 上述講解了:狀態從 OPEN 變為 HALF_OPEN,HALF_OPEN 變為 OPEN;
- 但狀態從 HALF_OPEN 變為 CLOSE 需要在正常執行完請求後,由 entry.exit() 調用
DegradeSlot.exit()
方法來改變狀態;
5.2 請求失敗,啟動熔斷
- 狀態從 HALF_OPEN 變為 CLOSE 的實現方法在
DegradeSlot.exit()
;
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
//無阻塞異常
if (curEntry.getBlockError() != null) {
fireExit(context, r, count, args);
return;
}
//通過資源名獲取斷路器
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
//沒有配置斷路器,則直接放行
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
if (curEntry.getBlockError() == null) {
for (CircuitBreaker circuitBreaker : circuitBreakers) {
//【點進去】在請求完成時
circuitBreaker.onRequestComplete(context);
}
}
fireExit(context, r, count, args);
}
- 進入
ExceptionCircuitBreaker.onRequestComplete()
方法,其主要邏輯與源碼如下:- 請求失敗比例與總請求比例加 1,用於判斷後續是否超過閾值;
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
Throwable error = entry.getError();
//簡單錯誤計數器
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
//異常請求數加 1
counter.getErrorCount().add(1);
}
//總請求數加 1
counter.getTotalCount().add(1);
//【點進去】超過閾值時變更狀態
handleStateChangeWhenThresholdExceeded(error);
}
- 進入
ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded()
方法,變更狀態;
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
//全開則直接放行
if (currentState.get() == State.OPEN) {
return;
}
//半開狀態
if (currentState.get() == State.HALF_OPEN) {
//檢查請求
if (error == null) {
//發生異常,將狀態從半開 HALF_OPEN 轉為關閉 CLOSE
fromHalfOpenToClose();
} else {
//無異常,解開半開狀態
fromHalfOpenToOpen(1.0d);
}
return;
}
//計算是否超過閾值
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
//熔斷策略為:異常比例
curCount = errCount * 1.0d / totalCount;
}
if (curCount > threshold) {
transformToOpen(curCount);
}
}
6. Sentinel 源碼結構圖小結
- SphU.entry():核心邏輯的入口函數;
- CtSph.entryWithPriority():獲取 Slot 鏈,操作 Slot 槽;
- CtSph.lookProcessChain():獲取 ProcessorSlot 鏈;
- DefaultSlotChainBuilder.build():構造 DefaultProcessorSlotChain 鏈(裡面有 10 個 Slot 插槽);
- ProcessorSlot.entry():遍歷 ProcessorSlot 鏈;
-
FlowSlot.entry():遍歷到 FlowSlot 槽,限流規則;
- FlowSlot.checkFlow():檢查流量規則;
- FlowRuleChecker.checkFlow():使用檢查器檢查流量規則;
- FlowSlot.ruleProvider.apply():獲取流控規則;
- FlowRuleChecker.canPassCheck():校驗每條規則;
- FlowRuleChecker.passClusterCheck():集群模式;
- FlowRuleChecker.passLocalCheck():單機模式;
- FlowRuleChecker.selectNodeByRequesterAndStrategy():獲取 Node;
- FlowRule.getRater():獲得流控行為 TrafficShapingController;
- TrafficShapingController.canPass():執行流控行為;
- FlowRuleChecker.checkFlow():使用檢查器檢查流量規則;
- FlowSlot.checkFlow():檢查流量規則;
-
StatisticSlot.entry:遍歷到 StatisticSlot 槽,統計數據;
- DefaultNode.increaseThreadNum():統計「增加執行緒數」;
- StatisticNode.increaseThreadNum():統計「請求通過數」;
- ArrayMetric.ArrayMetric():初始化指標數組;
- LeapArray:環形數組;
- WindowWrap:窗口包裝類;
- MetricBucket:指標桶;
- LeapArray:環形數組;
- ArrayMetric.ArrayMetric():初始化指標數組;
- StatisticNode.increaseThreadNum():統計「請求通過數」;
- DefaultNode.addPassRequest():統計「增加執行緒數」;
- StatisticNode.addPassRequest():同上;
- DefaultNode.increaseThreadNum():統計「增加執行緒數」;
-
DegradeSlot.entry():遍歷到 DegradeSlot 槽,服務熔斷;
- DegradeSlot.performChecking():執行檢查;
- DegradeRuleManager.getCircuitBreakers():根據 resourceName 獲取斷路器;
- AbstractCircuitBreaker.tryPass():繼續或取消熔斷功能;
- AbstractCircuitBreaker.fromOpenToHalfOpen():嘗試通過半開狀態;
- DegradeSlot.performChecking():執行檢查;
-
DegradeSlot.exit():請求失敗(超時),啟動熔斷;
- ExceptionCircuitBreaker.onRequestComplete():在請求完成時操作;
- ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded():變更狀態;
- ExceptionCircuitBreaker.onRequestComplete():在請求完成時操作;
-
- CtSph.lookProcessChain():獲取 ProcessorSlot 鏈;
- CtSph.entryWithPriority():獲取 Slot 鏈,操作 Slot 槽;
最後
