9.源碼分析—SOFARPC是如何實現故障剔除的?
- 2019 年 10 月 3 日
- 筆記
SOFARPC源碼解析系列:
6.源碼分析—和dubbo相比SOFARPC是如何實現負載均衡的?
8.源碼分析—從設計模式中看SOFARPC中的EventBus?
在第七講裡面7.源碼分析—SOFARPC是如何實現連接管理與心跳?,我講了客戶端是怎麼維護服務端的長連接的。但是有一種情況是Consumer 和 Provider的長連接還在,註冊中心未下發摘除,但伺服器端由於某些原因,例如長時間的 Full GC, 硬體故障(後文中為避免重複,統一描述為機器假死)等場景,處於假死狀態。
這個時候 Consumer 應該不調用或少調用該 Provider,可以通過權重的方式來進行控制。目前 SOFARPC 5.3.0 以上的版本支援 RPC 單機故障剔除能力。SOFARPC 通過服務權重控制方式來減少異常服務的調用,將更多流量打到正常服務機器上,提高服務可用性。
接下來我們來講講具體的服務權重降級是怎麼實現的。在看這篇文章之前我希望讀者能讀完如下幾篇文章:
- 8.源碼分析—從設計模式中看SOFARPC中的EventBus?,因為SOFARPC的服務權重降級是通過EventBus來調用的。
- 3. 源碼分析—SOFARPC客戶端服務調用,這篇文章裡面寫了是如何調用服務端的,客戶端會在調用服務端的時候觸發匯流排,給訂閱者發送一個消息。
- 6.源碼分析—和dubbo相比SOFARPC是如何實現負載均衡的?,這篇文章裡面寫的是SOFARPC的負載均衡是怎麼實現的,以及如何通過權重控制並發量。
如果你了解了上面的知識,那麼可以開始接下來的內容了。
實例
我們首先給出一個服務端和客戶端的實例,方便大家去調試。
官方的文檔在這裡:自動故障剔除
service
public static void main(String[] args) { ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") // 設置一個協議,默認bolt .setPort(12200) // 設置一個埠,默認12200 .setDaemon(false); // 非守護執行緒 ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定介面 .setRef(new HelloServiceImpl()) // 指定實現 .setServer(serverConfig); // 指定服務端 providerConfig.export(); // 發布服務 }
client
public static void main(String[] args) { FaultToleranceConfig faultToleranceConfig = new FaultToleranceConfig(); faultToleranceConfig.setRegulationEffective(true); faultToleranceConfig.setDegradeEffective(true); faultToleranceConfig.setTimeWindow(10); faultToleranceConfig.setWeightDegradeRate(0.5); FaultToleranceConfigManager.putAppConfig("appName", faultToleranceConfig); ApplicationConfig applicationConfig = new ApplicationConfig(); applicationConfig.setAppName("appName"); ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定介面 .setProtocol("bolt") // 指定協議 .setDirectUrl("bolt://127.0.0.1:12200") // 指定直連地址 .setConnectTimeout(2000 * 1000) .setApplication(applicationConfig); HelloService helloService = consumerConfig.refer(); while (true) { try { LOGGER.info(helloService.sayHello("world")); } catch (Exception e) { e.printStackTrace(); } try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
自動故障剔除模組的註冊
我們在在客戶端的例子裡面通過FaultToleranceConfigManager註冊了FaultToleranceConfig配置。
FaultToleranceConfig faultToleranceConfig = new FaultToleranceConfig(); faultToleranceConfig.setRegulationEffective(true); faultToleranceConfig.setDegradeEffective(true); faultToleranceConfig.setTimeWindow(10); faultToleranceConfig.setWeightDegradeRate(0.5); FaultToleranceConfigManager.putAppConfig("appName", faultToleranceConfig);
我們先進入到FaultToleranceConfigManager裡面看看putAppConfig做了什麼。
FaultToleranceConfigManager#putAppConfig
/** * All fault-tolerance config of apps */ private static final ConcurrentMap<String, FaultToleranceConfig> APP_CONFIGS = new ConcurrentHashMap<String, FaultToleranceConfig>(); public static void putAppConfig(String appName, FaultToleranceConfig value) { if (appName == null) { if (LOGGER.isWarnEnabled()) { LOGGER.warn("App name is null when put fault-tolerance config"); } return; } if (value != null) { APP_CONFIGS.put(appName, value); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Get a new resource, value[" + value + "]"); } } else { APP_CONFIGS.remove(appName); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Remove a resource, key[" + appName + "]"); } } calcEnable(); } static void calcEnable() { for (FaultToleranceConfig config : APP_CONFIGS.values()) { if (config.isRegulationEffective()) { aftEnable = true; return; } } aftEnable = false; }
上面的方法寫的非常的清楚:
- 校驗appName,為空的話直接返回
- 然後把我們定義的config放到APP_CONFIGS這個變數裡面
- 調用calcEnable,根據我們配置的config,將aftEnable變數設置為true
到這裡就完成了故障剔除的配置設置。
註冊故障剔除模組
我們在8.源碼分析—從設計模式中看SOFARPC中的EventBus?裡面講了,初始化ConsumerConfig的時候會初始化父類的靜態程式碼塊,然後會初始化RpcRuntimeContext的靜態程式碼塊。
RpcRuntimeContext
static { if (LOGGER.isInfoEnabled()) { LOGGER.info("Welcome! Loading SOFA RPC Framework : {}, PID is:{}", Version.BUILD_VERSION, PID); } put(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION); // 初始化一些上下文 initContext(); // 初始化其它模組 ModuleFactory.installModules(); // 增加jvm關閉事件 if (RpcConfigs.getOrDefaultValue(RpcOptions.JVM_SHUTDOWN_HOOK, true)) { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { if (LOGGER.isWarnEnabled()) { LOGGER.warn("SOFA RPC Framework catch JVM shutdown event, Run shutdown hook now."); } destroy(false); } }, "SOFA-RPC-ShutdownHook")); } }
在這個程式碼塊裡面會調用ModuleFactory初始化其他模組
ModuleFactory#installModules
public static void installModules() { ExtensionLoader<Module> loader = ExtensionLoaderFactory.getExtensionLoader(Module.class); //moduleLoadList 默認是 * String moduleLoadList = RpcConfigs.getStringValue(RpcOptions.MODULE_LOAD_LIST); for (Map.Entry<String, ExtensionClass<Module>> o : loader.getAllExtensions().entrySet()) { String moduleName = o.getKey(); Module module = o.getValue().getExtInstance(); // judge need load from rpc option if (needLoad(moduleLoadList, moduleName)) { // judge need load from implement if (module.needLoad()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Install Module: {}", moduleName); } //安裝模板 module.install(); INSTALLED_MODULES.put(moduleName, module); } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("The module " + moduleName + " does not need to be loaded."); } } } else { if (LOGGER.isInfoEnabled()) { LOGGER.info("The module " + moduleName + " is not in the module load list."); } } } }
這裡會根據SPI初始化四個模組,分別是:
fault-tolerance
sofaTracer-resteasy
lookout
sofaTracer
我們這裡只講解fault-tolerance模組。
然後我們進入到FaultToleranceModule#install方法中
private Regulator regulator = new TimeWindowRegulator(); public void install() { subscriber = new FaultToleranceSubscriber(); //註冊ClientSyncReceiveEvent和ClientAsyncReceiveEvent到匯流排中 EventBus.register(ClientSyncReceiveEvent.class, subscriber); EventBus.register(ClientAsyncReceiveEvent.class, subscriber); String regulatorAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_REGULATOR, "timeWindow"); regulator = ExtensionLoaderFactory.getExtensionLoader(Regulator.class).getExtension(regulatorAlias); //調用TimeWindowRegulator的init方法 regulator.init(); }
這裡我們的訂閱者是FaultToleranceSubscriber實例,訂閱了兩個ClientSyncReceiveEvent和ClientAsyncReceiveEvent事件。
然後會調用regulator的實現類TimeWindowRegulator的初始化方法
TimeWindowRegulator#init
/** * 度量策略(創建計算模型, 對計算模型里的數據進行度量,選出正常和異常節點) */ private MeasureStrategy measureStrategy; /** * 計算策略(根據度量結果,判斷是否需要執行降級或者恢復) */ private RegulationStrategy regulationStrategy; /** * 降級策略: 例如調整權重 */ private DegradeStrategy degradeStrategy; /** * 恢復策略:例如調整權重 */ private RecoverStrategy recoverStrategy; /** * Listener for invocation stat change. */ private final InvocationStatListener listener = new TimeWindowRegulatorListener(); public void init() { String measureStrategyAlias = RpcConfigs .getOrDefaultValue(RpcOptions.AFT_MEASURE_STRATEGY, "serviceHorizontal"); String regulationStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_REGULATION_STRATEGY, "serviceHorizontal"); String degradeStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_DEGRADE_STRATEGY, "weight"); String recoverStrategyAlias = RpcConfigs.getOrDefaultValue(RpcOptions.AFT_RECOVER_STRATEGY, "weight"); //ServiceHorizontalMeasureStrategy measureStrategy = ExtensionLoaderFactory.getExtensionLoader(MeasureStrategy.class).getExtension( measureStrategyAlias); //ServiceHorizontalRegulationStrategy regulationStrategy = ExtensionLoaderFactory.getExtensionLoader(RegulationStrategy.class).getExtension( regulationStrategyAlias); //WeightDegradeStrategy degradeStrategy = ExtensionLoaderFactory.getExtensionLoader(DegradeStrategy.class).getExtension( degradeStrategyAlias); //WeightRecoverStrategy recoverStrategy = ExtensionLoaderFactory.getExtensionLoader(RecoverStrategy.class).getExtension( recoverStrategyAlias); //TimeWindowRegulatorListener InvocationStatFactory.addListener(listener); }
這裡面主要是根據SPI初始化了度量策略,計算策略,降級策略,恢復策略,這些東西有什麼用,我們下面講。
觸發權重降級
我們在3. 源碼分析—SOFARPC客戶端服務調用裡面講到了,客戶端在調用的時候最後會調用AbstractCluster#doSendMsg方法,然後根據不同的策略,同步、非同步、單向等調用然後返回response實例。
protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport, SofaRequest request) throws SofaRpcException { .... // 同步調用 if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) { long start = RpcRuntimeContext.now(); try { //BoltClientTransport#syncSend response = transport.syncSend(request, timeout); } finally { if (RpcInternalContext.isAttachmentEnable()) { long elapsed = RpcRuntimeContext.now() - start; context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed); } } } .... }
因為在故障模組註冊的時候訂閱了兩個ClientSyncReceiveEvent和ClientAsyncReceiveEvent事件。即一個同步事件和一個非同步事件,我們這裡挑同步調用進行講解。
在上面的程式碼片段中,我們看到了會調用到BoltClientTransport#syncSend。
BoltClientTransport#syncSend
public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcException { //檢查連接 checkConnection(); RpcInternalContext context = RpcInternalContext.getContext(); InvokeContext boltInvokeContext = createInvokeContext(request); SofaResponse response = null; SofaRpcException throwable = null; try { //向匯流排發出ClientBeforeSendEvent事件 beforeSend(context, request); response = doInvokeSync(request, boltInvokeContext, timeout); return response; } catch (Exception e) { // 其它異常 throwable = convertToRpcException(e); throw throwable; } finally { //向匯流排發出ClientAfterSendEvent事件 afterSend(context, boltInvokeContext, request); //向匯流排發出ClientSyncReceiveEvent事件 if (EventBus.isEnable(ClientSyncReceiveEvent.class)) { //把當前被調用的provider和ConsumerConfig發送到匯流排中去 EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(), transportConfig.getProviderInfo(), request, response, throwable)); } } }
其實上面這麼一大段程式碼和我們這篇文章有關係的也就只要最後向匯流排發送ClientSyncReceiveEvent事件而已。
匯流排發送的時候會觸發訂閱者FaultToleranceSubscriber的onEvent方法。
我們進入到FaultToleranceSubscriber#onEvent中
public void onEvent(Event originEvent) { Class eventClass = originEvent.getClass(); if (eventClass == ClientSyncReceiveEvent.class) { //這裡會調用aftEnable if (!FaultToleranceConfigManager.isEnable()) { return; } // 同步結果 ClientSyncReceiveEvent event = (ClientSyncReceiveEvent) originEvent; ConsumerConfig consumerConfig = event.getConsumerConfig(); ProviderInfo providerInfo = event.getProviderInfo(); InvocationStat result = InvocationStatFactory.getInvocationStat(consumerConfig, providerInfo); if (result != null) { //記錄調用次數 result.invoke(); Throwable t = event.getThrowable(); if (t != null) { //記錄異常次數 result.catchException(t); } } } ... }
這裡我們忽略其他的事件,只留下ClientSyncReceiveEvent事件的處理流程。
在這裡我們又看到了InvocationStatFactory這個工廠類,在上面TimeWindowRegulator#init也用到了這個類。
在返回result之後會調用invoke方法,記錄一下客戶端調用服務端的次數,如果有異常,也會調用一下catchException方法,記錄一下異常的次數。這兩個參數會在做服務剔除的時候非同步做統計使用。
InvocationStatFactory#getInvocationStat
public static InvocationStat getInvocationStat(ConsumerConfig consumerConfig, ProviderInfo providerInfo) { String appName = consumerConfig.getAppName(); if (appName == null) { return null; } // 應用開啟單機故障摘除功能 if (FaultToleranceConfigManager.isRegulationEffective(appName)) { return getInvocationStat(new InvocationStatDimension(providerInfo, consumerConfig)); } return null; } public static InvocationStat getInvocationStat(InvocationStatDimension statDimension) { //第一次的時候為空 InvocationStat invocationStat = ALL_STATS.get(statDimension); if (invocationStat == null) { //直接new一個實例放入到ALL_STATS變數中 invocationStat = new ServiceExceptionInvocationStat(statDimension); InvocationStat old = ALL_STATS.putIfAbsent(statDimension, invocationStat); if (old != null) { invocationStat = old; } //LISTENERS在調用TimeWindowRegulator#init的時候add進來的,只有一個TimeWindowRegulatorListener for (InvocationStatListener listener : LISTENERS) { listener.onAddInvocationStat(invocationStat); } } return invocationStat; }
如果是第一次來到這個方法的話,那麼會實例化一個ServiceExceptionInvocationStat放入到ALL_STATS變數中,然後遍歷InvocationStatFactory的遍歷LISTENERS,調用監聽器的onAddInvocationStat方法。
LISTENERS裡面的實例是我們在TimeWindowRegulator#init方法裡面add進去的TimeWindowRegulatorListener。
注意,這裡用了兩個封裝類,都是接下來要用到的。分別是InvocationStatDimension和ServiceExceptionInvocationStat。
InvocationStatDimension
public class InvocationStatDimension { /** * One provider of service reference */ private final ProviderInfo providerInfo; /** * Config of service reference */ private final ConsumerConfig consumerConfig; /** * cache value: dimensionKey */ private transient String dimensionKey; /** * cache value : originWeight */ private transient Integer originWeight; }
ServiceExceptionInvocationStat的結構圖:
ServiceExceptionInvocationStat
public class ServiceExceptionInvocationStat extends AbstractInvocationStat { /** * Instantiates a new Service exception invocation stat. * * @param invocation the invocation */ public ServiceExceptionInvocationStat(InvocationStatDimension invocation) { super(invocation); } @Override public long catchException(Throwable t) { //統計異常次數 if (t instanceof SofaRpcException) { SofaRpcException exception = (SofaRpcException) t; if (exception.getErrorType() == RpcErrorType.CLIENT_TIMEOUT || exception.getErrorType() == RpcErrorType.SERVER_BUSY) { return exceptionCount.incrementAndGet(); } } return exceptionCount.get(); } }
然後直接看它父類的具體參數就好了
AbstractInvocationStat
public abstract class AbstractInvocationStat implements InvocationStat { /** * 統計維度 */ protected final InvocationStatDimension dimension; /** * 調用次數 */ protected final AtomicLong invokeCount = new AtomicLong(0L); /** * 異常次數 */ protected final AtomicLong exceptionCount = new AtomicLong(0L); /** * when useless in one window, this value increment 1. <br /> * If this value is greater than threshold, this stat will be deleted. */ private final transient AtomicInteger uselessCycle = new AtomicInteger(0); }
上面的這些參數,我們接下來還會用到。
權重降級具體實現
TimeWindowRegulatorListener是TimeWIndowRegulator的內部類。
class TimeWindowRegulatorListener implements InvocationStatListener { @Override public void onAddInvocationStat(InvocationStat invocationStat) { //度量策略不為空 if (measureStrategy != null) { //ServiceHorizontalMeasureStrategy MeasureModel measureModel = measureStrategy.buildMeasureModel(invocationStat); if (measureModel != null) { measureModels.add(measureModel); startRegulate(); } } } @Override public void onRemoveInvocationStat(InvocationStat invocationStat) { if (measureStrategy != null) { measureStrategy.removeMeasureModel(invocationStat); } } }
這個監聽器裡面就是調用ServiceHorizontalMeasureStrategy#buildMeasureModel,返回調控模型。
我們先看一下MeasureModel裡面封裝了什麼:
MeasureModel
public class MeasureModel { /** * App name of measure model * 服務名 */ private final String appName; /** * service name of measure model * 被調用的服務 */ private final String service; /** * all dimension statics stats of measure model * InvokeStat集合 */ private final ConcurrentHashSet<InvocationStat> stats = new ConcurrentHashSet<InvocationStat>(); .... }
所以根據這幾個全局變數,我們可以推測,MeasureModel應該是根據appName+service為維度,裡面有很多的InvocationStat。
我們再回到ServiceHorizontalMeasureStrategy#buildMeasureModel
public MeasureModel buildMeasureModel(InvocationStat invocationStat) { InvocationStatDimension statDimension = invocationStat.getDimension(); //AppName + ":" + Service String key = statDimension.getDimensionKey(); MeasureModel measureModel = appServiceMeasureModels.get(key); if (measureModel == null) { measureModel = new MeasureModel(statDimension.getAppName(), statDimension.getService()); MeasureModel oldMeasureModel = appServiceMeasureModels.putIfAbsent(key, measureModel); if (oldMeasureModel == null) { measureModel.addInvocationStat(invocationStat); return measureModel; } else { oldMeasureModel.addInvocationStat(invocationStat); return null; } } else { measureModel.addInvocationStat(invocationStat); return null; } }
buildMeasureModel方法裡面的做法也和我上面說的一樣。根據appName+service為維度封裝不同的invocationStat在MeasureModel裡面。
接著,回到TimeWindowRegulatorListener#onAddInvocationStat中,會往下調用startRegulate方法。
/** * 度量執行緒池 */ private final ScheduledService measureScheduler = new ScheduledService("AFT-MEASURE", ScheduledService.MODE_FIXEDRATE, new MeasureRunnable(), 1, 1, TimeUnit.SECONDS); public void startRegulate() { if (measureStarted.compareAndSet(false, true)) { measureScheduler.start(); } }
ScheduledService是一個執行緒池,measureScheduler變數實例化了一個固定頻率執行延遲執行緒池,會每1秒鐘固定調用MeasureRunnable的run方法。
MeasureRunnable是TimeWindowRegulator的內部類:
private class MeasureRunnable implements Runnable { @Override public void run() { measureCounter.incrementAndGet(); //遍歷TimeWindowRegulatorListener加入的MeasureModel實例 for (MeasureModel measureModel : measureModels) { try { //時間窗口是10,也就是說默認每過10秒才能進入下面的方法。 if (isArriveTimeWindow(measureModel)) { //ServiceHorizontalMeasureStrategy MeasureResult measureResult = measureStrategy.measure(measureModel); regulationExecutor.submit(new RegulationRunnable(measureResult)); } } catch (Exception e) { LOGGER.errorWithApp(measureModel.getAppName(), "Error when doMeasure: " + e.getMessage(), e); } } } private boolean isArriveTimeWindow(MeasureModel measureModel) { //timeWindow默認是10 long timeWindow = FaultToleranceConfigManager.getTimeWindow(measureModel.getAppName()); return measureCounter.get() % timeWindow == 0; } }
我們先來到ServiceHorizontalMeasureStrategy#measure來看看是怎麼判斷為異常或正常
如何判斷一個節點是異常還是正常
我們首先不看程式碼的實現,先白話的說明一下是如何實現的。
- 首先在FaultToleranceSubscriber#onEvent中收到同步或非同步結果事件後,就會從工廠中獲取這次調用的 InvokeStat(如果 InvokeStat 已經存在則直接返回,如果沒有則創建新的並保持到快取中)。通過調用 InvokeStat 的 invoke 和 catchException 方法統計調用次數和異常次數。
- 然後在MeasureRunnable方法中根據設置的窗口期,在到達窗口期的時候會從 MeasueModel 的各個 InvokeStat 創建一份鏡像數據,表示當前串口內的調用情況。
- 對所有的節點進行度量,計算出所有節點的平均異常率,如果某個節點的異常率大於平均異常率到一定比例,則判定為異常。
我這裡選用官方的例子來進行說明:
假如有三個節點,提供同一服務,調用次數和異常數如表格所示:
invokeCount | expCount | |
---|---|---|
invokeStat 1 | 5 | 4 |
invokeStat 2 | 10 | 1 |
invokeStat 3 | 10 | 0 |
結合上述例子,度量策略的大致邏輯如下:
- 首先統計該服務下所有 ip 的平均異常率,並用 averageExceptionRate 表示。平均異常率比較好理解,即異常總數 / 總調用次數,上例中 averageExceptionRate =(1 + 4) / (5 + 10 + 10) = 0.2.
- 當某個ip的窗口調用次數小於該服務的最小窗口調用次數( leastWindCount )則忽略並將狀態設置為 IGNOGRE。否則進行降級和恢復度量。 如 invokeStat 1 的 invokeCount 為5,如果 leastWindCount 設置為6 則 invokeStat 1 會被忽略。
- 當某個ip的 時間窗口內的異常率和服務平均異常比例 windowExceptionRate 大於 配置的 leastWindowExceptionRateMultiplte (最小時間窗口內異常率和服務平均異常率的降級比值),那麼將該IP設置為 ABNORMAL, 否則設置為 HEALTH.
windowExceptionRate 是異常率和服務平均異常比例,invokeStat 1 的異常率為 4/5 = 0.8, 則其對應的 windowExceptionRate = 0.8 / 0.2 = 4. 假設 leastWindowExceptionRateMultiplte =4, 那麼 invokeStat 1 是一次服務,則需要進行降級操作。
接下來我們來看具體的源碼實現:
ServiceHorizontalMeasureStrategy#measure
public MeasureResult measure(MeasureModel measureModel) { MeasureResult measureResult = new MeasureResult(); measureResult.setMeasureModel(measureModel); String appName = measureModel.getAppName(); List<InvocationStat> stats = measureModel.getInvocationStats(); if (!CommonUtils.isNotEmpty(stats)) { return measureResult; } //1 //這個方法主要是複製出一個當前時間點的調用情況,只統計被複制的InvocationStat //如果有被新剔除的InvocationStat,則不會存在於該次獲取結果中。 List<InvocationStat> invocationStats = getInvocationStatSnapshots(stats); //FaultToleranceConfig的timeWindow所設置的,時間窗口,默認是10 long timeWindow = FaultToleranceConfigManager.getTimeWindow(appName); /* leastWindowCount在同一次度量中保持不變*/ //默認InvocationStat如果要參與統計的窗口內最低調用次數,時間窗口內,至少調用的次數.在時間窗口內總共都不足10,認為不需要調控. long leastWindowCount = FaultToleranceConfigManager.getLeastWindowCount(appName); //最小是1,也就是時間窗口內,只要調用了就進行統計 leastWindowCount = leastWindowCount < LEGAL_LEAST_WINDOW_COUNT ? LEGAL_LEAST_WINDOW_COUNT : leastWindowCount; //2. /* 計算平均異常率和度量單個ip的時候都需要使用到appWeight*/ double averageExceptionRate = calculateAverageExceptionRate(invocationStats, leastWindowCount); //表示當前機器是平均異常率的多少倍才降級,默認是6 double leastWindowExceptionRateMultiple = FaultToleranceConfigManager .getLeastWindowExceptionRateMultiple(appName); for (InvocationStat invocationStat : invocationStats) { MeasureResultDetail measureResultDetail = null; InvocationStatDimension statDimension = invocationStat.getDimension(); long windowCount = invocationStat.getInvokeCount(); //3 //這裡主要是根據Invocation的實際權重計算該Invocation的實際最小窗口調用次數 long invocationLeastWindowCount = getInvocationLeastWindowCount(invocationStat, ProviderInfoWeightManager.getWeight(statDimension.getProviderInfo()), leastWindowCount); //4 //當總調用的次數為0的時候,averageExceptionRate =-1,這個時候可以設置為忽略 if (averageExceptionRate == -1) { measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE); } else { if (invocationLeastWindowCount != -1 && windowCount >= invocationLeastWindowCount) { //獲取異常率 double windowExceptionRate = invocationStat.getExceptionRate(); //沒有異常的情況,設置狀態為健康 if (averageExceptionRate == 0) { measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.HEALTH); } else { //5 //這裡主要是看這次被遍歷到invocationStat的異常率和平均異常率之比 double windowExceptionRateMultiple = CalculateUtils.divide( windowExceptionRate, averageExceptionRate); //如果當前的invocationStat的異常是平均異常的6倍,那麼就設置狀態為異常 measureResultDetail = windowExceptionRateMultiple >= leastWindowExceptionRateMultiple ? new MeasureResultDetail(statDimension, MeasureState.ABNORMAL) : new MeasureResultDetail(statDimension, MeasureState.HEALTH); } measureResultDetail.setAbnormalRate(windowExceptionRate); measureResultDetail.setAverageAbnormalRate(averageExceptionRate); measureResultDetail.setLeastAbnormalRateMultiple(leastWindowExceptionRateMultiple); } else { measureResultDetail = new MeasureResultDetail(statDimension, MeasureState.IGNORE); } } measureResultDetail.setWindowCount(windowCount); measureResultDetail.setTimeWindow(timeWindow); measureResultDetail.setLeastWindowCount(invocationLeastWindowCount); measureResult.addMeasureDetail(measureResultDetail); } //打日誌 logMeasureResult(measureResult, timeWindow, leastWindowCount, averageExceptionRate, leastWindowExceptionRateMultiple); InvocationStatFactory.updateInvocationStats(invocationStats); return measureResult; }
上面這個方法有點長,我給這個方法標註了數字,跟著數字標記去看。
- getInvocationStatSnapshots
public static List<InvocationStat> getInvocationStatSnapshots(List<InvocationStat> stats) { List<InvocationStat> snapshots = new ArrayList<InvocationStat>(stats.size()); for (InvocationStat stat : stats) { //賦值一個InvocationStat出來 InvocationStat snapshot = stat.snapshot(); //如果被調用的次數小於0 if (snapshot.getInvokeCount() <= 0) { if (stat.getUselessCycle().incrementAndGet() > 6) { // 6 個時間窗口無調用,刪除統計 InvocationStatFactory.removeInvocationStat(stat); InvocationStatDimension dimension = stat.getDimension(); String appName = dimension.getAppName(); if (LOGGER.isDebugEnabled(appName)) { LOGGER.debugWithApp(appName, "Remove invocation stat : {}, {} because of useless cycle > 6", dimension.getDimensionKey(), dimension.getProviderInfo()); } } } else { //如果被調用了,那麼就從新計數 stat.getUselessCycle().set(0); snapshots.add(snapshot); } } return snapshots; } //ServiceExceptionInvocationStat#snapshot public InvocationStat snapshot() { ServiceExceptionInvocationStat invocationStat = new ServiceExceptionInvocationStat(dimension); invocationStat.setInvokeCount(getInvokeCount()); invocationStat.setExceptionCount(getExceptionCount()); return invocationStat; }
首先 這個方法裡面首先是遍歷所有的InvocationStat,然後調用snapshot創建一個新的InvocationStat實例。
其次 校驗新的InvocationStat實例調用次數是不是小於等於0,如果是,說明沒有在時間窗口內沒有被調用過一次,那麼就再看是不是在6 個時間窗口無調用,如果是,那麼就刪除統計數據
然後返回新的InvocationStat集合
- calculateAverageExceptionRate
private double calculateAverageExceptionRate(List<InvocationStat> invocationStats, long leastWindowCount) { long sumException = 0; long sumCall = 0; for (InvocationStat invocationStat : invocationStats) { long invocationLeastWindowCount = getInvocationLeastWindowCount(invocationStat, ProviderInfoWeightManager.getWeight(invocationStat.getDimension().getProviderInfo()), leastWindowCount); //統計所有的invocationStat被調用的次數,和異常次數 if (invocationLeastWindowCount != -1 && invocationStat.getInvokeCount() >= invocationLeastWindowCount) { sumException += invocationStat.getExceptionCount(); sumCall += invocationStat.getInvokeCount(); } } if (sumCall == 0) { return -1; } //計算異常比率 return CalculateUtils.divide(sumException, sumCall); } private long getInvocationLeastWindowCount(InvocationStat invocationStat, Integer weight, long leastWindowCount) { //目標地址原始權重 InvocationStatDimension statDimension = invocationStat.getDimension(); Integer originWeight = statDimension.getOriginWeight(); if (originWeight == 0) { LOGGER.errorWithApp(statDimension.getAppName(), "originWeight is 0,but is invoked. service[" + statDimension.getService() + "];ip[" + statDimension.getIp() + "]."); return -1; } else if (weight == null) { //如果地址還未被調控過或者已經恢復。 return leastWindowCount; } else if (weight == -1) { //如果地址被剔除 return -1; } //這裡主要是根據Invocation的實際權重計算該Invocation的實際最小窗口調用次數 double rate = CalculateUtils.divide(weight, originWeight); long invocationLeastWindowCount = CalculateUtils.multiply(leastWindowCount, rate); return invocationLeastWindowCount < LEGAL_LEAST_WINDOW_COUNT ? LEGAL_LEAST_WINDOW_COUNT : invocationLeastWindowCount; }
這個方法總的來說就是遍歷所有的InvocationStat,然後求和說有的調用次數和異常次數,然後用(異常次數/調用次數)計算平均異常比率。
getInvocationLeastWindowCount方法主要是用來做校驗,如果原始的權重為0,或者為-1,那麼就返回-1。
因為當前的InvocationStat的權重可能被降權過,所以我們不能按原來的最小窗口調用次數來算,所以這裡需要乘以一個比率,然後看是不是小於LEGAL_LEAST_WINDOW_COUNT,返回際權重計算該Invocation的實際最小窗口調用次數。
- if判斷
我們在分析calculateAverageExceptionRate方法的時候看了,如果總的調用次數為0,那麼averageExceptionRate會為-1。代表所有的InvocationStat沒有被調用,我們設置忽略。
那麼接著往下走,會發現有一個averageExceptionRate是否為0的判斷,由於averageExceptionRate =(異常次數/調用次數),所以如果沒有異常的時候設置狀態為健康。
- windowExceptionRateMultipe
windowExceptionRateMultipe這個變數主要是用來看這次被遍歷到invocationStat的異常率和平均異常率之比。如果當前的(異常率/平均異常率)>=leastWindowExceptionRateMultiple,默認是6倍,那麼就設置當前的invocationStat為異常。
根據MeasureResult進行降權或恢復
調用完ServiceHorizontalMeasureStrategy#measure方法後會返回一個MeasureResult,然會新建一個RegulationRunnable實例,丟到regulationExecutor執行緒池中執行。
RegulationRunnable是TimeWeindowRegulator的內部類。
RegulationRunnable#run
RegulationRunnable(MeasureResult measureResult) { this.measureResult = measureResult; } public void run() { List<MeasureResultDetail> measureResultDetails = measureResult.getAllMeasureResultDetails(); for (MeasureResultDetail measureResultDetail : measureResultDetails) { try { doRegulate(measureResultDetail); } catch (Exception e) { LOGGER.errorWithApp(measureResult.getMeasureModel().getAppName(), "Error when doRegulate: " + e.getMessage(), e); } } }
RegulationRunnable會在run方法裡面遍歷所有的measureResult,然後調用doRegulate方法進行降權或恢復的處理
void doRegulate(MeasureResultDetail measureResultDetail) { MeasureState measureState = measureResultDetail.getMeasureState(); InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension(); //默認是否進行降級 ,默認為否 ServiceHorizontalRegulationStrategy boolean isDegradeEffective = regulationStrategy.isDegradeEffective(measureResultDetail); if (isDegradeEffective) { measureResultDetail.setLogOnly(false); if (measureState.equals(MeasureState.ABNORMAL)) { //這裡是為了以防對太多節點做了降權,所以默認限制只能最多給兩個節點降權 boolean isReachMaxDegradeIpCount = regulationStrategy.isReachMaxDegradeIpCount(measureResultDetail); if (!isReachMaxDegradeIpCount) { //降權 WeightDegradeStrategy degradeStrategy.degrade(measureResultDetail); } else { String appName = measureResult.getMeasureModel().getAppName(); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGULATION_ABNORMAL_NOT_DEGRADE, "Reach degrade number limit.", statDimension.getService(), statDimension.getIp(), statDimension.getAppName())); } } } else if (measureState.equals(MeasureState.HEALTH)) { boolean isExistDegradeList = regulationStrategy.isExistInTheDegradeList(measureResultDetail); if (isExistDegradeList) { //恢復 recoverStrategy.recover(measureResultDetail); regulationStrategy.removeFromDegradeList(measureResultDetail); } //沒有被降級過,因此不需要被恢復。 } } else { measureResultDetail.setLogOnly(true); if (measureState.equals(MeasureState.ABNORMAL)) { //這個時候調用degrade,主要是列印日誌用的 degradeStrategy.degrade(measureResultDetail); String appName = measureResult.getMeasureModel().getAppName(); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGULATION_ABNORMAL_NOT_DEGRADE, "Degrade switch is off", statDimension.getService(), statDimension.getIp(), statDimension.getAppName())); } } } } }
我們分兩種情況進行分析。
- 如果該節點是異常節點
首先會調用ServiceHorizontalRegulationStrategy#isReachMaxDegradeIpCount方法。
ServiceHorizontalRegulationStrategy#isReachMaxDegradeIpCount
public boolean isReachMaxDegradeIpCount(MeasureResultDetail measureResultDetail) { InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension(); ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey()); String ip = statDimension.getIp(); if (ips.contains(ip)) { return false; } else { //默認一個服務能夠調控的最大ip數 int degradeMaxIpCount = FaultToleranceConfigManager.getDegradeMaxIpCount(statDimension.getAppName()); ipsLock.lock(); try { if (ips.size() < degradeMaxIpCount) { ips.add(ip); return false; } else { return true; } } finally { ipsLock.unlock(); } } }
這個方法是為了能夠控制最多一個服務下面能調控多少個節點。比如一個服務下面只有3個節點,其中2個節點出了問題,通過調控解決了,那麼不可能將第三個節點也進行調控了吧,必須要進行人工干預了,為啥會出現這樣的問題。
然後會調用WeightDegradeStrategy#degrade對節點進行降權
WeightDegradeStrategy#degrade
public void degrade(MeasureResultDetail measureResultDetail) { //調用LogPrintDegradeStrategy方法,列印日誌用 super.degrade(measureResultDetail); if (measureResultDetail.isLogOnly()) { return; } InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension(); String appName = statDimension.getAppName(); ProviderInfo providerInfo = statDimension.getProviderInfo(); // if provider is removed or provider is warming up //如果為空,或是在預熱中,則直接返回 if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) { return; } //目前provider權重 int currentWeight = ProviderInfoWeightManager.getWeight(providerInfo); //降權比重 double weightDegradeRate = FaultToleranceConfigManager.getWeightDegradeRate(appName); //最少權重,默認為1 int degradeLeastWeight = FaultToleranceConfigManager.getDegradeLeastWeight(appName); //權重比率 * 目前權重 int degradeWeight = CalculateUtils.multiply(currentWeight, weightDegradeRate); //不能小於最小值 degradeWeight = degradeWeight < degradeLeastWeight ? degradeLeastWeight : degradeWeight; // degrade weight of this provider info boolean success = ProviderInfoWeightManager.degradeWeight(providerInfo, degradeWeight); if (success && LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "the weight was degraded. serviceUniqueName:[" + statDimension.getService() + "],ip:[" + statDimension.getIp() + "],origin weight:[" + currentWeight + "],degraded weight:[" + degradeWeight + "]."); } } //ProviderInfoWeightManager public static boolean degradeWeight(ProviderInfo providerInfo, int weight) { providerInfo.setStatus(ProviderStatus.DEGRADED); providerInfo.setWeight(weight); return true; }
這個方法實際上就是權重拿出來,然後根據比率進行設值並且不能小於最小的比重。
最後調用ProviderInfoWeightManager把當前的節點設值為DEGRADED,並設值新的權重。
- 如果是健康節點
調用ServiceHorizontalRegulationStrategy#isExistInTheDegradeList判斷一下當前節點有沒有被降級
ServiceHorizontalRegulationStrategy#isExistInTheDegradeList
public boolean isExistInTheDegradeList(MeasureResultDetail measureResultDetail) { InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension(); ConcurrentHashSet<String> ips = getDegradeProviders(statDimension.getDimensionKey()); return ips != null && ips.contains(statDimension.getIp()); }
在調用isReachMaxDegradeIpCount方法的時候會把被降級的ip放入到ips集合中,所以這裡只要獲取就可以了。
如果該節點已被降級那麼調用WeightRecoverStrategy#recover進行恢復
WeightRecoverStrategy#recover
public void recover(MeasureResultDetail measureResultDetail) { InvocationStatDimension statDimension = measureResultDetail.getInvocationStatDimension(); ProviderInfo providerInfo = statDimension.getProviderInfo(); // if provider is removed or provider is warming up if (providerInfo == null || providerInfo.getStatus() == ProviderStatus.WARMING_UP) { return; } Integer currentWeight = ProviderInfoWeightManager.getWeight(providerInfo); if (currentWeight == -1) { return; } String appName = statDimension.getAppName(); //默認2 double weightRecoverRate = FaultToleranceConfigManager.getWeightRecoverRate(appName); //也就是說一次只能恢復到2倍,不會一次性就恢復到originWeight int recoverWeight = CalculateUtils.multiply(currentWeight, weightRecoverRate); int originWeight = statDimension.getOriginWeight(); // recover weight of this provider info if (recoverWeight >= originWeight) { measureResultDetail.setRecoveredOriginWeight(true); //將provider狀態設置為AVAILABLE,並且設置Weight ProviderInfoWeightManager.recoverOriginWeight(providerInfo, originWeight); if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "the weight was recovered to origin value. serviceUniqueName:[" + statDimension.getService() + "],ip:[" + statDimension.getIp() + "],origin weight:[" + currentWeight + "],recover weight:[" + originWeight + "]."); } } else { measureResultDetail.setRecoveredOriginWeight(false); boolean success = ProviderInfoWeightManager.recoverWeight(providerInfo, recoverWeight); if (success && LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "the weight was recovered. serviceUniqueName:[" + statDimension.getService() + "],ip:[" + statDimension.getIp() + "],origin weight:[" + currentWeight + "],recover weight:[" + recoverWeight + "]."); } } }
這個方法很簡單,各位可以看看我上面的注釋。
總結
總的來說FaultToleranceModule分為兩部分:
- FaultToleranceSubscriber訂閱事件,負責訂閱同步和非同步結果事件
- 根據調用事件進行統計,以及內置的一些策略完成服務的降級和恢復操作。