[Abp vNext 源碼分析] – 12. 後台作業與後台工作者

  • 2019 年 10 月 25 日
  • 筆記

一、簡要說明

文章信息:

基於的 ABP vNext 版本:1.0.0

創作日期:2019 年 10 月 24 日晚

更新日期:暫無

ABP vNext 提供了後台工作者和後台作業的支持,基本實現與原來的 ABP 框架類似,並且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的後台作業集成。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他複雜的配置。

後台作業在系統開發的過程當中,是比較常用的功能。因為總是有一些長耗時的任務,而這些任務我們不是立即響應的,例如 Excel 文檔導入、批量發送短訊通知等。

後台工作者 的話,ABP vNext 的實現就是在 CLR 的 Timer 之上封裝了一層,周期性地執行用戶邏輯。ABP vNext 默認提供的 後台任務管理器,就是在後台工作者基礎之上進行的封裝。

涉及到後台任務、後台工作者的模塊一共有 6 個,它們分別是:

  • Volo.Abp.Threading :提供了一些常用的線程組件,其中 AbpTimer 就是在裏面實現的。
  • Volo.Abp.BackgroundWorkers :後台工作者的定義和實現。

  • Volo.Abp.BackgroundJobs.Abstractions :後台任務的一些共有定義。
  • Volo.Abp.BackgroundJobs :默認的後台任務管理器實現。
  • Volo.Abp.BackgroundJobs.HangFire :基於 Hangfire 庫實現的後台任務管理器。
  • Volo.Abp.BackgroundJobs.RabbitMQ : 基於 RabbitMQ 實現的後台任務管理器。

二、源碼分析

線程組件

健壯的計時器

CLR 為我們提供了多種計時器,我們一般使用的是 System.Threading.Timer ,它是基於 CLR 線程池的一個周期計時器,會根據我們配置的 Period (周期) 定時執行。在 CLR 線程池中,所有的 Timer 只有 1 個線程為其服務。這個線程直到下一個計時器的觸發時間,當下一個 Timer 對象到期時,這個線程就會將 Timer 的回調方法通過 ThreadPool.QueueUserWorkItem() 扔到線程池去執行。

不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的周期,那麼就會造成上一個任務還沒執行完成又開始執行新的任務。

解決這個方法其實很簡單,即啟動之後,將周期設置為 Timeout.Infinite ,這樣只會執行一次。當回調方法執行完成之後,就設置 dueTime 參數說明下次執行要等待多久,並且周期還是 Timeout.Infinite

ABP vNext 已經為我們提供了健壯的計時器,該類型的定義是 AbpTimer ,在內部用到了 volatile 關鍵字和 Monitor 實現 條件變量模式 解決多線程環境下的問題。

public class AbpTimer : ITransientDependency  {      // 回調事件。      public event EventHandler Elapsed;        // 執行周期。      public int Period { get; set; }        // 定時器啟動之後就開始運行,默認為 Fasle。      public bool RunOnStart { get; set; }        // 日誌記錄器。      public ILogger<AbpTimer> Logger { get; set; }        private readonly Timer _taskTimer;      // 定時器是否在執行任務,默認為 false。      private volatile bool _performingTasks;      // 定時器的運行狀態,默認為 false。      private volatile bool _isRunning;        public AbpTimer()      {          Logger = NullLogger<AbpTimer>.Instance;            // 回調函數是 TimerCallBack,執行周期為永不執行。          _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);      }        public void Start(CancellationToken cancellationToken = default)      {          // 如果傳遞的周期小於等於 0 ,則拋出異常。          if (Period <= 0)          {              throw new AbpException("Period should be set before starting the timer!");          }            // 使用互斥鎖,保證線程安全。          lock (_taskTimer)          {              // 如果啟動之後就需要馬上執行,則設置為 0,馬上執行任務,否則會等待 Period 毫秒之後再執行(1 個周期)。              _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);              // 定時器成功運行了。              _isRunning = true;          }          // 釋放 _taskTimer 的互斥鎖。      }        public void Stop(CancellationToken cancellationToken = default)      {          // 使用互斥鎖。          lock (_taskTimer)          {              // 將內部定時器設置為永不執行的狀態。              _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);                // 檢測當前是否還有正在執行的任務,如果有則等待任務執行完成。              while (_performingTasks)              {                  // 臨時釋放鎖,阻塞當前線程。但是其他線程可以獲取 _timer 的互斥鎖。                  Monitor.Wait(_taskTimer);              }                // 需要表示停止狀態,所以標記狀態為 false。              _isRunning = false;          }      }        private void TimerCallBack(object state)      {          lock (_taskTimer)          {              // 如果有任務正在運行,或者內部定時器已經停止了,則不做任何事情。              if (!_isRunning || _performingTasks)              {                  return;              }                // 臨時停止內部定時器。              _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);              // 表明馬上需要執行任務了。              _performingTasks = true;          }            try          {              // 調用綁定的事件。              Elapsed.InvokeSafely(this, new EventArgs());          }          catch          {              // 注意,這裡將會吞噬異常。          }          finally          {              lock (_taskTimer)              {                  // 任務執行完成,更改狀態。                  _performingTasks = false;                    // 如果定時器還在運行,沒有被停止,則啟動下一個 Period 周期。                  if (_isRunning)                  {                      _taskTimer.Change(Period, Timeout.Infinite);                  }                    // 解除因為釋放鎖而阻塞的線程。                  // 如果已經調用了 Stop,則會喚醒那個因為 Wait 阻塞的線程,就會使 _isRunning 置為 false。                  Monitor.Pulse(_taskTimer);              }          }      }  }

這裡對 _performingTasks_isRunning 字段設置為 volatile 防止指令重排和寄存器緩存。這是因為在 Stop 方法內部使用到的 _performingTasks 可能會被優化,所以將該字段設置為了易失的。

IRunnable 接口

ABP vNext 為任務的啟動和停止,抽象了一個 IRunnable 接口。雖然描述說的是對線程的行為進行抽象,但千萬千萬不要手動調用 Thread.Abort() 。關於 Thread.Abort() 的壞處,這裡不再多加贅述,可以參考 這篇文章 的描述,或者搜索其他的相關文章。

public interface IRunnable  {      // 啟動這個服務。      Task StartAsync(CancellationToken cancellationToken = default);        /// <summary>      /// 停止這個服務。      /// </summary>      Task StopAsync(CancellationToken cancellationToken = default);  }

後台工作者

模塊的構造

後台工作者的模塊行為比較簡單,它定義了在應用程序初始化和銷毀時的行為。在初始化時,後台工作者管理器 獲得所有 後台工作者,並開始啟動它們。在銷毀時,後台工作者管理器獲得所有後台工作者,並開始停止他們,這樣才能夠做到優雅退出。

[DependsOn(      typeof(AbpThreadingModule)      )]  public class AbpBackgroundWorkersModule : AbpModule  {      public override void OnApplicationInitialization(ApplicationInitializationContext context)      {          var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;          // 如果啟用了後台工作者,那麼獲得後台工作者管理器的實例,並調用 StartAsync 啟動所有後台工作者。          if (options.IsEnabled)          {              AsyncHelper.RunSync(                  () => context.ServiceProvider                      .GetRequiredService<IBackgroundWorkerManager>()                      .StartAsync()              );          }      }        public override void OnApplicationShutdown(ApplicationShutdownContext context)      {          var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;          // 如果啟用了後台工作者,那麼獲得後台工作者管理器的實例,並調用 StopAsync 停止所有後台工作者。          if (options.IsEnabled)          {              AsyncHelper.RunSync(                  () => context.ServiceProvider                      .GetRequiredService<IBackgroundWorkerManager>()                      .StopAsync()              );          }      }  }

後台工作者的定義

首先看看 IBackgroundWorker 接口的定義,是空的。不過繼承了 ISingletonDependency 接口,說明我們的每個後台工作者都是 單例 的。

/// <summary>  /// 在後台運行,執行某些任務的工作程序(線程)的接口定義。  /// </summary>  public interface IBackgroundWorker : IRunnable, ISingletonDependency  {    }

ABP vNext 為我們定義了一個抽象的後台工作者類型 BackgroundWorkerBase,這個基類的設計目的是提供一些常用組件(和 ApplicationService 一樣)。

public abstract class BackgroundWorkerBase : IBackgroundWorker  {      //TODO: Add UOW, Localization and other useful properties..?      //TODO: 是否應該提供工作單元、本地化以及其他常用的屬性?        public ILogger<BackgroundWorkerBase> Logger { protected get; set; }        protected BackgroundWorkerBase()      {          Logger = NullLogger<BackgroundWorkerBase>.Instance;      }        public virtual Task StartAsync(CancellationToken cancellationToken = default)      {          Logger.LogDebug("Started background worker: " + ToString());          return Task.CompletedTask;      }        public virtual Task StopAsync(CancellationToken cancellationToken = default)      {          Logger.LogDebug("Stopped background worker: " + ToString());          return Task.CompletedTask;      }        public override string ToString()      {          return GetType().FullName;      }  }

ABP vNext 內部只有一個默認的後台工作者實現 PeriodicBackgroundWorkerBase。從名字上來看,意思是就是周期執行的後台工作者,內部就是用的 AbpTimer 來實現,ABP vNext 將其包裝起來是為了實現統一的模式(後台工作者)。

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase  {      protected readonly AbpTimer Timer;        // 也就意味着子類必須在其構造函數,指定 timer 的執行周期。      protected PeriodicBackgroundWorkerBase(AbpTimer timer)      {          Timer = timer;          Timer.Elapsed += Timer_Elapsed;      }        // 啟動後台工作者。      public override async Task StartAsync(CancellationToken cancellationToken = default)      {          await base.StartAsync(cancellationToken);          Timer.Start(cancellationToken);      }        // 停止後台工作者。      public override async Task StopAsync(CancellationToken cancellationToken = default)      {          Timer.Stop(cancellationToken);          await base.StopAsync(cancellationToken);      }        // Timer 關聯的周期事件,之所以不直接掛載 DoWork,是為了捕獲異常。      private void Timer_Elapsed(object sender, System.EventArgs e)      {          try          {              DoWork();          }          catch (Exception ex)          {              Logger.LogException(ex);          }      }        // 你要周期執行的任務。      protected abstract void DoWork();  }

我們如果要實現自己的後台工作者,只需要繼承該類,實現 DoWork() 方法即可。

public class TestWorker : PeriodicBackgroundWorkerBase  {      public TestWorker(AbpTimer timer) : base(timer)      {          // 每五分鐘執行一次。          timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;      }        protected override void DoWork()      {          Console.WriteLine("後台工作者被執行了。");      }  }

然後在我們自己模塊的 OnPreApplicationInitialization() 方法內解析出後台作業管理器(IBackgroundWorkerManager),調用它的 Add() 方法,將我們定義的 TestWorker 添加到管理器當中即可。

後台工作者管理器

所有的後台工作者都是通過 IBackgroundWorkerManager 進行管理的,它提供了 StartAsync()StopAsync()Add() 方法。前面兩個方法就是 IRunnable 接口定義的,後台工作者管理器直接集成了該接口,後面的 Add() 方法就是用來動態添加我們的後台工作者。

後台工作者管理器的默認實現是 BackgroundWorkerManager 類型,它內部做的事情很簡單,就是維護一個後台工作者集合。每當調用 StartAsync()StopAsync() 方法的時候,都從這個集合遍歷後台工作者,執行他們的啟動和停止方法。

這裡值得注意的一點是,當我們調用 Add() 方法添加了一個後台工作者之後,後台工作者管理器就會啟動這個後台工作者。

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable  {      protected bool IsRunning { get; private set; }        private bool _isDisposed;        private readonly List<IBackgroundWorker> _backgroundWorkers;        public BackgroundWorkerManager()      {          _backgroundWorkers = new List<IBackgroundWorker>();      }        public virtual void Add(IBackgroundWorker worker)      {          _backgroundWorkers.Add(worker);            // 如果當前後台工作者管理器還處於運行狀態,則調用工作者的 StartAsync() 方法啟動。          if (IsRunning)          {              AsyncHelper.RunSync(                  () => worker.StartAsync()              );          }      }        public virtual void Dispose()      {          if (_isDisposed)          {              return;          }            _isDisposed = true;            //TODO: ???      }        // 啟動,則遍歷集合啟動。      public virtual async Task StartAsync(CancellationToken cancellationToken = default)      {          IsRunning = true;            foreach (var worker in _backgroundWorkers)          {              await worker.StartAsync(cancellationToken);          }      }        // 停止, 則遍歷集合停止。      public virtual async Task StopAsync(CancellationToken cancellationToken = default)      {          IsRunning = false;            foreach (var worker in _backgroundWorkers)          {              await worker.StopAsync(cancellationToken);          }      }  }

上述代碼其實存在一個問題,即後台工作者被釋放以後,是否還能執行 Add() 操作。參考我 之前的文章 ,其實當對象被釋放之後,就應該拋出 ObjectDisposedException 異常。

後台作業

比起後台工作者,我們執行一次性任務的時候,一般會使用後台作業進行處理。比起只能設置固定周期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的後台作業管理器,能夠讓我們使用 Cron 表達式,更加靈活地設置任務的執行周期。

模塊的構造

關於後台作業的模塊,我們需要說道兩處。第一處是位於 Volo.Abp.BackgroundJobs.Abstractions 項目的 AbpBackgroundJobsAbstractionsModule ,第二出則是位於 Volo.Abp.BackgroundJobs 項目的 AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule 的主要行為是將符合條件的後台作業,添加到 AbpBackgroundJobOptions 配置當中,以便後續進行使用。

public override void PreConfigureServices(ServiceConfigurationContext context)  {      RegisterJobs(context.Services);  }    private static void RegisterJobs(IServiceCollection services)  {      var jobTypes = new List<Type>();        // 如果註冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。      services.OnRegistred(context =>      {          if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))          {              jobTypes.Add(context.ImplementationType);          }      });        services.Configure<AbpBackgroundJobOptions>(options =>      {          // 將數據賦值給配置類。          foreach (var jobType in jobTypes)          {              options.AddJob(jobType);          }      });  }

Volo.Abp.BackgroundJobs 內部是 ABP vNext 為我們提供的 默認後台作業管理器,這個後台作業管理器 本質上是一個後台工作者

這個後台工作者會周期性(取決於 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默認為 5 秒種)地從 IBackgroundJobStore 撈出一堆後台任務,並且在後台執行。至於每次執行多少個後台任務,這也取決於 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默認值是 1000 個。

注意:

這裡的 Options 類是 AbpBackgroundJobWorkerOptions,別和 AbpBackgroundWorkerOptions 混淆了。

所以在 AbpBackgroundJobsModule 模塊裏面,只做了一件事情,就是將負責後台作業的後台工作者,添加到後台工作者管理器種,並開始周期性地執行。

public override void OnApplicationInitialization(ApplicationInitializationContext context)  {      var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;      if (options.IsJobExecutionEnabled)      {          // 獲得後台工作者管理器,並將負責後台作業的工作者添加進去。          context.ServiceProvider              .GetRequiredService<IBackgroundWorkerManager>()              .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()              );      }  }

後台作業的定義

在上一節裏面看到,只要是實現 IBackgroundJob<TArgs> 類型的都視為一個後台作業。這個後台作業接口,只定義了一個行為,那就是執行(Execute(TArgs))。這裡的 TArgs 泛型作為執行後台作業時,需要傳遞的參數類型。

// 因為是傳入的參數,所以泛型參數是逆變的。  public interface IBackgroundJob<in TArgs>  {      void Execute(TArgs args);  }

檢查源碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob,它的實現大概如下。

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency  {      // ...        public override void Execute(BackgroundEmailSendingJobArgs args)      {          AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));      }  }

後台作業管理器

後台作業都是通過一個後台作業管理器(IBackgroundJobManager)進行管理的,這個接口定義了一個入隊方法(EnqueueAsync()),注意,我們的後台作業在入隊後,不是馬上執行的。

說一下這個入隊處理邏輯:

  1. 首先我們會通過參數的類型,獲取到任務的名稱。(假設任務上面沒有標註 BackgroundJobNameAttribute 特性,那麼任務的名稱就是參數類型的 FullName 。)
  2. 構造一個 BackgroundJobInfo 對象。
  3. 通過 IBackgroundJobStore 持久化任務信息。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)  {      // 獲取任務名稱。      var jobName = BackgroundJobNameAttribute.GetName<TArgs>();      var jobId = await EnqueueAsync(jobName, args, priority, delay);      return jobId.ToString();  }    protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)  {      var jobInfo = new BackgroundJobInfo      {          Id = GuidGenerator.Create(),          JobName = jobName,          // 通過序列化器,序列化參數值,方便存儲。這裡內部其實使用的是 JSON.NET 進行序列化。          JobArgs = Serializer.Serialize(args),          Priority = priority,          CreationTime = Clock.Now,          NextTryTime = Clock.Now      };        // 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲。      if (delay.HasValue)      {          jobInfo.NextTryTime = Clock.Now.Add(delay.Value);      }        // 持久化任務信息,方便後面執行後台作業的工作者能夠取到。      await Store.InsertAsync(jobInfo);        return jobInfo.Id;  }

BackgroundJobNameAttribute 相關的方法:

public static string GetName<TJobArgs>()  {      return GetName(typeof(TJobArgs));  }    public static string GetName([NotNull] Type jobArgsType)  {      Check.NotNull(jobArgsType, nameof(jobArgsType));        // 判斷參數類型上面是否標註了特性,並且特性實現了 IBackgroundJobNameProvider 接口。      return jobArgsType                  .GetCustomAttributes(true)                  .OfType<IBackgroundJobNameProvider>()                  .FirstOrDefault()                  ?.Name          // 拿不到名字,則使用類型的 FullName。              ?? jobArgsType.FullName;  }

後台作業的存儲

後台作業的存儲默認是放在內存的,這點可以從 InMemoryBackgroundJobStore 類型實現看出來。在它的內部使用了一個並行字典,通過作業的 Guid 與作業進行關聯綁定。

除了內存實現,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore 實現,基本套路與 SettingStore 一樣,都是存儲到數據庫裏面。

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency  {      protected IBackgroundJobRepository BackgroundJobRepository { get; }        // ...        public BackgroundJobInfo Find(Guid jobId)      {          return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(              BackgroundJobRepository.Find(jobId)          );      }        // ...        public void Insert(BackgroundJobInfo jobInfo)      {          BackgroundJobRepository.Insert(              ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)          );      }        // ...  }

後台作業的執行

默認的後台作業管理器是通過一個後台工作者來執行後台任務的,這個實現叫做 BackgroundJobWorker,這個後台工作者的生命周期也是單例的。後台作業的具體執行邏輯裏面,涉及到了以下幾個類型的交互。

類型 作用
AbpBackgroundJobOptions 提供每個後台任務的配置信息,包括任務的類型、參數類型、任務名稱數據。
AbpBackgroundJobWorkerOptions 提供後台作業工作者的配置信息,例如每個周期 最大執行的作業數量、後台
工作者的 執行周期、作業執行 超時時間 等。
BackgroundJobConfiguration 後台任務的配置信息,作用是將持久化存儲的作業信息與運行時類型進行綁定
和實例化,以便 ABP vNext 來執行具體的任務。
IBackgroundJobExecuter 後台作業的執行器,當我們從持久化存儲獲取到後台作業信息時,將會通過
這個執行器來執行具體的後台作業。
IBackgroundJobSerializer 後台作業序列化器,用於後台作業持久化時進行序列化的工具,默認採用的
是 JSON.NET 進行實現。
JobExecutionContext 執行器在執行後台作業時,是通過這個上下文參數進行執行的,在這個上下
文內部,包含了後台作業的具體類型、後台作業的參數值。
IBackgroundJobStore 前面已經講過了,這個是用於後台作業的持久化存儲,默認實現是存儲在內存。
BackgroundJobPriority 後台作業的執行優先級定義,ABP vNext 在執行後台任務時,會根據任務的優
先級進行排序,以便在後面執行的時候優先級高的任務先執行。

我們來按照邏輯順序走一遍它的實現,首先後台作業的執行工作者會從持久化存儲內,獲取 MaxJobFetchCount 個任務用於執行。從持久化存儲獲取後台作業信息(BackgroundJobInfo),是由 IBackgroundJobStore 提供的。

var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();    var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);    // 不存在任何後台作業,則直接結束本次調用。  if (!waitingJobs.Any())  {      return;  }

InMemoryBackgroundJobStore 的相關實現:

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)  {      return _jobs.Values          .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)          .OrderByDescending(t => t.Priority)          .ThenBy(t => t.TryCount)          .ThenBy(t => t.NextTryTime)          .Take(maxResultCount)          .ToList();  }  

上面的代碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然後根據任務的優先級從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些數據中,篩選出 maxResultCount 結果並返回。

說到這裡,我們來看一下這個 NextTryTime 是如何被計算出來的?回想起最開始的後台作業管理器,我們在添加一個後台任務的時候,就會設置這個後台任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值一般是 Clock.Now ,也就是它被添加到隊列的時間。

不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先級再執行,就在每次任務執行失敗之後,會將 NextTryTime 的值指數級進行增加。這塊代碼可以在 CalculateNextTryTime 裏面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)  {      // 一般來說,這個 DefaultWaitFactor 因子的值是 2.0 。      var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鈎。      var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??                          clock.Now.AddSeconds(nextWaitDuration);        if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)      {          return null;      }        return nextTryDate;  }  

當預期的執行時間都超過 DefaultTimeout 的超時時間時(默認為 2 天),說明這個任務確實沒救了,就不要再執行了。

我們之前說到,從 IBackgroundJobStore 拿到了需要執行的後台任務信息集合,接下來我們就要開始執行後台任務了。

foreach (var jobInfo in waitingJobs)  {      jobInfo.TryCount++;      jobInfo.LastTryTime = clock.Now;        try      {          // 根據任務名稱獲取任務的配置參數。          var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);          // 根據配置裏面存儲的任務類型,將參數值進行反序列化。          var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);          // 構造一個新的執行上下文,讓執行器執行任務。          var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);            try          {              jobExecuter.Execute(context);                // 如果任務執行成功則刪除該任務。              store.Delete(jobInfo.Id);          }          catch (BackgroundJobExecutionException)          {              // 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。              var nextTryTime = CalculateNextTryTime(jobInfo, clock);                if (nextTryTime.HasValue)              {                  jobInfo.NextTryTime = nextTryTime.Value;              }              else              {                  // 超過超時時間的時候,公式計算函數返回 null,該任務置為廢棄任務。                  jobInfo.IsAbandoned = true;              }                TryUpdate(store, jobInfo);          }      }      catch (Exception ex)      {          // 執行過程中,產生了未知異常,設置為廢棄任務,並打印日誌。          Logger.LogException(ex);          jobInfo.IsAbandoned = true;          TryUpdate(store, jobInfo);      }  }  

執行後台任務的時候基本分為 5 步,它們分別是:

  1. 獲得任務關聯的配置參數,默認不用提供,因為在之前模塊初始化的時候就已經配置了(你也可以顯式指定)。
  2. 通過之前存儲的配置參數,將參數值反序列化出來,構造具體實例。
  3. 構造一個執行上下文。
  4. 後台任務執行器執行具體的後台任務。
  5. 成功則刪除任務,失敗則更新任務下次的執行狀態。

至於執行器裏面的真正執行操作,你都拿到了參數值和任務類型了。就可以通過類型用 IoC 獲取後台任務對象的實例,然後通過反射匹配方法簽名,在實例上調用這個方法傳入參數即可。

public virtual void Execute(JobExecutionContext context)  {      // 構造具體的後台作業實例對象。      var job = context.ServiceProvider.GetService(context.JobType);      if (job == null)      {          throw new AbpException("The job type is not registered to DI: " + context.JobType);      }        // 獲得需要執行的方法簽名。      var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));      if (jobExecuteMethod == null)      {          throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);      }        try      {          // 直接通過 MethodInfo 的 Invoke 方法調用,傳入具體的實例對象和參數值即可。          jobExecuteMethod.Invoke(job, new[] { context.JobArgs });      }      catch (Exception ex)      {          Logger.LogException(ex);            // 如果是執行方法內的異常,則包裝進行處理,然後拋出。          throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)          {              JobType = context.JobType.AssemblyQualifiedName,              JobArgs = context.JobArgs          };      }  }  

集成 Hangfire

ABP vNext 對於 Hangfire 的集成代碼分佈在 Volo.Abp.HangFireVolo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置裏面,調用 Hangfire 庫的相關方法,注入組件到 IoC 容器當中。後者則是對後台作業進行了適配處理,替換了默認的 IBackgroundJobManager 實現。

AbpHangfireModule 模塊內部,通過工廠創建出來一個 BackgroudJobServer 實例,並將它的生命周期與應用程序的生命周期進行綁定,以便進行銷毀處理。

public class AbpHangfireModule : AbpModule  {      private BackgroundJobServer _backgroundJobServer;        public override void ConfigureServices(ServiceConfigurationContext context)      {          context.Services.AddHangfire(configuration =>          {              context.Services.ExecutePreConfiguredActions(configuration);          });      }        public override void OnApplicationInitialization(ApplicationInitializationContext context)      {          var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;          _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);      }        public override void OnApplicationShutdown(ApplicationShutdownContext context)      {          //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown          _backgroundJobServer.SendStop();          _backgroundJobServer.Dispose();      }  }  

我們直奔主題,看一下基於 Hangfire 的後台作業管理器是怎麼實現的。

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency  {      public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,          TimeSpan? delay = null)      {          // 如果沒有延遲參數,則直接通過 Enqueue() 方法扔進執行對了。          if (!delay.HasValue)          {              return Task.FromResult(                  BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(                      adapter => adapter.Execute(args)                  )              );          }          else          {              return Task.FromResult(                  BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(                      adapter => adapter.Execute(args),                      delay.Value                  )              );          }      }  

上述代碼中使用 HangfireJobExecutionAdapter 進行了一個適配操作,因為 Hangfire 要將一個後台任務扔進隊列執行,不是用 TArgs 就能解決的。

轉到這個適配器定義,提供了一個 Execute(TArgs) 方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用適配器的 Excetue(TArgs) 方法,然後內部還是使用的 IBackgroundJobExecuter 來執行具體定義的任務。

public class HangfireJobExecutionAdapter<TArgs>  {      protected AbpBackgroundJobOptions Options { get; }      protected IServiceScopeFactory ServiceScopeFactory { get; }      protected IBackgroundJobExecuter JobExecuter { get; }        public HangfireJobExecutionAdapter(          IOptions<AbpBackgroundJobOptions> options,          IBackgroundJobExecuter jobExecuter,          IServiceScopeFactory serviceScopeFactory)      {          JobExecuter = jobExecuter;          ServiceScopeFactory = serviceScopeFactory;          Options = options.Value;      }        public void Execute(TArgs args)      {          using (var scope = ServiceScopeFactory.CreateScope())          {              var jobType = Options.GetJob(typeof(TArgs)).JobType;              var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);              JobExecuter.Execute(context);          }      }  }  

集成 RabbitMQ

基於 RabbitMQ 的後台作業實現,我想放在分佈式事件總線裏面,對其一起進行講解。

三、總結

ABP vNext 為我們提供了多種後台作業管理器的實現,你可以根據自己的需求選用不同的後台作業管理器,又或者是自己動手造輪子。

需要看其他的 ABP vNext 相關文章?點擊我 即可跳轉到總目錄。