.Net Core with 微服務 – Polly 服務降級熔斷

在我們實施微服務之後,服務間的調用變的異常頻繁。多個服務之間可能是互相依賴的關係。某個服務出現故障或者是服務間的網絡出現故障都會造成服務調用的失敗,進而影響到某個業務服務處理失敗。某一個服務調用失敗輕則造成當前相關業務無法處理;重則可能耗盡資源而拉垮整個應用。為了儘可能的保證我們生產環境的可用性,至少是部分可用性我們就需要一些策略來保護我們的服務。

服務降級

比如我們的訂單詳情服務裏面會調用會員信息服務接口。如果會員信息服務接口故障會造成訂單詳情服務也同樣故障。這時候我們可以對會員信息服務接口進行降級,在發生故障的時候直接返回固定的信息從而保證訂單詳情主服務是可用的。
另外一種情況是服務器的資源總是有限的,在面對突發的高並發,高流量情況下我們也可以對部分服務進行降級處理,從而釋放更多的資源給核心服務,從而保證核心業務正常工作。

熔斷

我們的服務很可能是一個鏈式的調用的過程。期間如果某個服務出現故障,特別是出現超時故障的時候很有可能耗盡服務器的資源從而影響整個服務。比如訂單詳情服務依賴會員信息服務,如果會員信息服務因為某些原因出現處理過慢、異常等情況,會阻塞整個訂單詳情服務的鏈路。而可能其它服務同樣依賴訂單詳情服務,這樣其它服務同樣也會被阻塞。資源被越來越多的消耗而不釋放,造成所有服務處理越來越慢,積壓的請求越來越多, 猶如死循環一般,直到所有資源都被耗盡,整個生成環境奔潰。
所以面對這種情況當我們某個服務持續出現故障的時候我們可以直接斷開對它的調用依賴,從而保證不會因為請求積壓造成資源耗盡的情況發生。

Polly

Polly 是一個開源的彈性跟瞬態故障處理類庫。它可以在你的程序出現故障,超時,或者返回值達成某種條件的時候進行多種策略處理,比如重試、降級、熔斷等等。它是 .NET Foundation 的成員項目。

Policy.Handle< T >

Policy.Handle< T > 用來定義異常的類型,表示當執行的方法發生某種異常的時候定義為故障。
當故障發生的時候 Polly 會為我們自動執行某種恢復策略,比如重試。
我們演示項目中,訂單接口需要獲取會員的詳細信息。
http 有一定幾率失敗,下面我們演示下如果使用 Polly 在出現當請求網絡失敗的時候進行3次重試。

var memberJson = await Policy.Handle<HttpRequestException>().RetryAsync(3).ExecuteAsync(async () =>
                {
                    using (var httpClient = new HttpClient())
                    {
                        httpClient.BaseAddress = new Uri($"//{memberServiceAddress.Address}:{memberServiceAddress.Port}");
                        var memberResult = await httpClient.GetAsync("/member/" + order.MemberId);
                        memberResult.EnsureSuccessStatusCode();
                        var json = await memberResult.Content.ReadAsStringAsync();
                        return json;
                    }
                });

使用 Policy.Handle< HttpRequestException > 來捕獲網絡異常。當發生 HttpRequestException 的時候觸發 RetryAsync 重試,並且最多重試3次。
以下我們接着演示下當 http 的返回值是500的時候進行3次重試:

Policy.HandleResult< T>

Policy.HandleResult< T > 用來定義返回值的類型,表示當執行的方法返回值達成某種條件的時候定義為故障。
當故障發生的時候 Polly 會為我們自動執行某種恢復策略,比如重試。
下面我們演示下如何使用 Polly 在出現當請求結果為 http status_code 500 的時候進行3次重試。

  var memberResult = await Policy.HandleResult<HttpResponseMessage>(x => (int)x.StatusCode == 500).RetryAsync(3).ExecuteAsync(async () =>
                  {
                      using (var httpClient = new HttpClient())
                      {
                          httpClient.BaseAddress =
                              new Uri($"//{memberServiceAddress.Address}:{memberServiceAddress.Port}");
                          var result = await httpClient.GetAsync("/member/" + order.MemberId);
                          return result;
                      }
                  });

Policy.TimeoutAsync

Policy.TimeoutAsync 表示當一個操作超過設定時間時會引發一個 TimeoutRejectedException 。
這也是一個很常用的故障處理策略。

var memberJson = await Policy.TimeoutAsync(10).ExecuteAsync(async () =>
                {
                    using (var httpClient = new HttpClient())
                    {
                        httpClient.BaseAddress =
                            new Uri($"//{memberServiceAddress.Address}:{memberServiceAddress.Port}");
                        var memberResult = await httpClient.GetAsync("/member/" + order.MemberId);
                        memberResult.EnsureSuccessStatusCode();
                        var json = await memberResult.Content.ReadAsStringAsync();
                        return json;
                    }
                });
            

以上代碼表示當獲取會員詳情的接口超過10秒還未返回結果的時候直接拋出一個 TimeoutRejectedException 異常終止執行。

服務降級

以上我們演示了出現故障的時候如何進行重試,但是所有重試都失敗我們的程序還是會故障。
因為期間某個服務持續的故障導致更多的服務出現故障,一系列連鎖反應後很可能導致整個應用癱瘓。
面對這種情況我們可以把相關服務進行降級。
當相關服務調用失敗的時候我們可以給出一個統一標準的失敗返回值,而不是直接拋出異常。讓我們的程序依然能夠繼續執行下去。
下面我們演示下如何使用 Polly 進行服務調用的降級處理。

 var fallback = Policy<string>.Handle<HttpRequestException>().FallbackAsync("FALLBACK")
                    .WrapAsync(Policy.Handle<HttpRequestException>().RetryAsync(3));
                var memberJson = await fallback.ExecuteAsync(async () =>
                {
                    using (var httpClient = new HttpClient())
                    {
                        httpClient.BaseAddress =
                            new Uri($"//{memberServiceAddress.Address}:{memberServiceAddress.Port}");
                        var result = await httpClient.GetAsync("/member/" + order.MemberId);
                        result.EnsureSuccessStatusCode();
                        var json = await result.Content.ReadAsStringAsync();
                        return json;
                    }

                });
                if (memberJson != "FALLBACK")
                {
                    var member = JsonConvert.DeserializeObject<MemberVM>(memberJson);
                    vm.Member = member;
                }

首先我們使用 Policy 的 FallbackAsync(“FALLBACK”) 方法設置降級的返回值。當我們服務需要降級的時候會返回 “FALLBACK” 的固定值。
同時使用 WrapAsync 方法把重試策略包裹起來。這樣我們就可以達到當服務調用失敗的時候重試3次,如果重試依然失敗那麼返回值降級為固定的 “FALLBACK” 值。

熔斷

通過以上演示,我們的服務當發生故障的時候可以自動重試,自動降級了。雖然現在看起來挺健壯,但是還是會有不小的問題。
當我們引入重試策略後,如果服務調用一直失敗,每次調用都會反覆進行重試,雖然最後會進行降級處理,但是這勢必會影響服務的處理速度。
當流量很大的時候,某個接口服務調用很慢有可能會阻塞整個服務,請求不斷積壓,資源不斷耗盡,速度越來越慢,這是一種惡性循環。最終同樣可能導致整個應用全部癱瘓的嚴重後果。
面對這種情況我們就需要引入熔斷機制。當一個服務的調用頻繁出現故障的時候我們可以認為它當前是不穩定的,在一段時間內我們不應該再去調用這個服務。

        static AsyncCircuitBreakerPolicy circuitBreaker =  Policy.Handle<HttpRequestException>().CircuitBreakerAsync(
            exceptionsAllowedBeforeBreaking: 10,
            durationOfBreak: TimeSpan.FromSeconds(30),
        onBreak: (ex, ts) =>
        {
            Console.WriteLine("circuitBreaker onBreak .");
        },
        onReset: () =>
        {
            Console.WriteLine("circuitBreaker onReset ");
        },
        onHalfOpen: () =>
        {
            Console.WriteLine("circuitBreaker onHalfOpen");
        }
        );

         var retry = Policy.Handle<HttpRequestException>().RetryAsync(3);
                var fallback = Policy<string>.Handle<HttpRequestException>().Or<BrokenCircuitException>().FallbackAsync("FALLBACK")
                    .WrapAsync(circuitBreaker.WrapAsync(retry));
                var memberJson = await fallback.ExecuteAsync(async () =>
                {
                    using (var httpClient = new HttpClient())
                    {
                        httpClient.BaseAddress =
                            new Uri($"//{memberServiceAddress.Address}:{memberServiceAddress.Port}");
                        var result = await httpClient.GetAsync("/member/" + order.MemberId);
                        result.EnsureSuccessStatusCode();
                        var json = await result.Content.ReadAsStringAsync();
                        return json;
                    }
                });
                if (memberJson != "FALLBACK")
                {
                    var member = JsonConvert.DeserializeObject<MemberVM>(memberJson);
                    vm.Member = member;
                }

首先定義 circuitBreaker 熔斷器策略。這個策略注意最好定義成靜態變量。這樣能夠以整個完整服務的錯誤為基礎來判斷是否開啟斷路器。
然後在業務代碼內定義重試策略,降級策略。我們使這些策略一一嵌套。fallback => circuitBreaker => retry ,表示當發生異常的時候首先開始重試,
重試失敗後嘗試熔斷,如果達到熔斷的條件就拋出 BrokenCircuitException 異常,降級策略捕獲到 HttpRequestException 或者 BrokenCircuitException 進行降級操作。
Polly 還有很多用法比如「緩存」、「隔離」 等策略,這裡不在一一演示了。更多請查看文檔://github.com/App-vNext/Polly/wiki

使用AOP思想改進體驗

通過以上對於 Polly 的演示,雖然我們完成了簡單的重試、服務降級、熔斷等功能。但是顯然對於每個方法都去使用 Polly 編寫一堆策略的話實在是太麻煩了。那麼有什麼辦法能改進一下 Polly 的使用體驗嗎?答案是使用 AOP 的思想,通過在執行的方法上打上 Attribute 的方式來指定 Polly 的策略。
下面我們使用 lemon 大佬的 AspectCore AOP 組件結合 Polly 來演示下如何通過 AOP 的思想來處理重試、降級、熔斷等策略。

Install-Package AspectCore.Core

通過 nuget 安裝 AspectCore 核心類庫。

 public class PollyHandleAttribute : AbstractInterceptorAttribute
    {
        /// <summary>
        /// 重試次數
        /// </summary>
        public int RetryTimes { get; set; } 

        /// <summary>
        /// 是否熔斷
        /// </summary>
        public bool IsCircuitBreaker { get; set; }

        /// <summary>
        /// 熔斷前的異常次數
        /// </summary>
        public int ExceptionsAllowedBeforeBreaking { get; set; }

        /// <summary>
        /// 熔斷時間
        /// </summary>
        public int SecondsOfBreak { get; set; }

        /// <summary>
        /// 降級方法
        /// </summary>
        public string FallbackMethod { get; set; }

        /// <summary>
        /// 一些方法級別統一計數的策略,比如熔斷
        /// </summary>
        static ConcurrentDictionary<string, AsyncCircuitBreakerPolicy> policyCaches = new ConcurrentDictionary<string, AsyncCircuitBreakerPolicy>();

        public PollyHandleAttribute()
        {

        }

        public override async Task Invoke(AspectContext context, AspectDelegate next)
        {
            Context pollyCtx = new Context();
            pollyCtx["aspectContext"] = context;

            Polly.Wrap.AsyncPolicyWrap policyWarp = null;

            var retry = Policy.Handle<HttpRequestException>().RetryAsync(RetryTimes);
            var fallback = Policy.Handle<Exception>().FallbackAsync(async (fallbackContent, token) =>
            {
                AspectContext aspectContext = (AspectContext)fallbackContent["aspectContext"];
                var fallBackMethod = context.ServiceMethod.DeclaringType.GetMethod(this.FallbackMethod);
                var fallBackResult = fallBackMethod.Invoke(context.Implementation, context.Parameters);
                aspectContext.ReturnValue = fallBackResult;
            }, async (ex, t) => { });
            AsyncCircuitBreakerPolicy circuitBreaker = null;
            if (IsCircuitBreaker)
            {
                var cacheKey = $"{context.ServiceMethod.DeclaringType.ToString()}_{context.ServiceMethod.Name}";
                if (policyCaches.TryGetValue(cacheKey, out circuitBreaker))
                {
                    //從緩存內獲取該方法的全局熔斷策略
                }
                else
                {
                    circuitBreaker = Policy.Handle<Exception>().CircuitBreakerAsync(
                      exceptionsAllowedBeforeBreaking: this.ExceptionsAllowedBeforeBreaking,
                      durationOfBreak: TimeSpan.FromSeconds(this.SecondsOfBreak));

                    policyCaches.TryAdd(cacheKey, circuitBreaker);
                }
            }

            if (circuitBreaker == null)
            {
                policyWarp = fallback.WrapAsync(retry);
            }
            else
            {
                policyWarp = fallback.WrapAsync(circuitBreaker.WrapAsync(retry));
            }


            await policyWarp.ExecuteAsync(ctx => next(context), pollyCtx);
        }
    }

定義一個 PollyHandleAttribute 類,它繼承自 AbstractInterceptorAttribute 類,然後實現 Invoke 方法。我們需要在 Invoke 方法內動態構造出 Polly 的相關策略,然後通過 Polly 去執行真正的方法。這裡主要需要注意的是熔斷策略不能每次新建,因為對於熔斷來說是需要全局統計該方法的異常數量來判斷是否熔斷的,所以需要把熔斷策略緩存起來。
這個類參考了 Edison Zhou 大佬的部分代碼,原文:Polly+AspectCore實現熔斷與降級機制

   public interface IMemberService
    {
        Task<MemberVM> GetMemberInfo(string id);
        MemberVM GetMemberInfoFallback(string id);
    }

public class MemberService : IMemberService
    {
        private IConsulService _consulservice;

        public MemberService(IConsulService consulService)
        {
            _consulservice = consulService;
        }

        [PollyHandle(IsCircuitBreaker = true, FallbackMethod = "GetMemberInfoFallback", ExceptionsAllowedBeforeBreaking = 5, SecondsOfBreak = 30, RetryTimes = 3)]
        public async Task<MemberVM> GetMemberInfo(string id)
        {
            var memberServiceAddresses = await _consulservice.GetServicesAsync("member_center");
            var memberServiceAddress = memberServiceAddresses.FirstOrDefault();

            using (var httpClient = new HttpClient())
            {
                httpClient.BaseAddress =
                    new Uri($"//{memberServiceAddress.Address}:{memberServiceAddress.Port}");
                var result = await httpClient.GetAsync("/member/" + id);
                result.EnsureSuccessStatusCode();
                var json = await result.Content.ReadAsStringAsync();

                if (string.IsNullOrEmpty(json))
                {
                    return JsonConvert.DeserializeObject<MemberVM>(json);
                }
            }

            return null;
        }

        public MemberVM GetMemberInfoFallback(string id)
        {
            return null;
        }
    }

因為我們需要在方法上標記 PollyHandleAttribute ,所以把獲取會員相關的邏輯封住進 MemberService 的 GetMemberInfo 方法內。並且在方法上打上Attribute :
[PollyHandle(IsCircuitBreaker = true, FallbackMethod = “GetMemberInfoFallback”, ExceptionsAllowedBeforeBreaking = 5, SecondsOfBreak = 30, RetryTimes = 3)] 直接通過 AOP 的方式來配置 Polly 的策略,這樣就方便了很多。
上面這些配置好之後,下面開始就是如何使 aspectcore 接管 asp.net core 的依賴注入了。根據文檔也很簡單:

Install-Package AspectCore.Extensions.DependencyInjection

通過 nuget 安裝 AspectCore.Extensions.DependencyInjection 包。

  public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.ConfigureKestrel(options =>
                    {
                        options.ListenAnyIP(6001);
                    });
                    webBuilder.UseStartup<Startup>();
                })
                .UseServiceProviderFactory(new DynamicProxyServiceProviderFactory());

在 CreateHostBuilder 內使用 UseServiceProviderFactory 替換 ServiceProviderFactory 為 aspectcore 的實現。

     public void ConfigureServices(IServiceCollection services)
        {
            services.AddSingleton<IMemberService, MemberService>();

            ...

            services.ConfigureDynamicProxy();
        }

在 ConfigureServices 方法內配置 IMemberService 的依賴關係以及配置 aspectcore 的動態代理。

總結

通過以上文字我們大致了解了什麼是服務降級、什麼是熔斷。並且通過 Polly 演示了如何處理這些情況。最後使用 lemon 大佬的 AspectCore 封裝成一個 Attribute 來演示如何通過 AOP 的思想來簡化 Polly 的使用。

謝謝閱讀。

演示項目地址

//github.com/kklldog/myhotel_microservice

相關文章

NET Core with 微服務 – 什麼是微服務
.Net Core with 微服務 – 架構圖
.Net Core with 微服務 – Ocelot 網關
.Net Core with 微服務 – Consul 註冊中心
.Net Core with 微服務 – Seq 日誌聚合
.Net Core with 微服務 – Elastic APM
.Net Core with 微服務 – Consul 配置中心

關注我的公眾號一起玩轉技術