[Abp vNext 源碼分析] – 12. 後台作業與後台工作者
- 2019 年 10 月 25 日
- 筆記
一、簡要說明
文章資訊:
基於的 ABP vNext 版本:1.0.0
創作日期:2019 年 10 月 24 日晚
更新日期:暫無
ABP vNext 提供了後台工作者和後台作業的支援,基本實現與原來的 ABP 框架類似,並且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的後台作業集成。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他複雜的配置。
後台作業在系統開發的過程當中,是比較常用的功能。因為總是有一些長耗時的任務,而這些任務我們不是立即響應的,例如 Excel 文檔導入、批量發送簡訊通知等。
後台工作者 的話,ABP vNext 的實現就是在 CLR 的 Timer
之上封裝了一層,周期性地執行用戶邏輯。ABP vNext 默認提供的 後台任務管理器,就是在後台工作者基礎之上進行的封裝。
涉及到後台任務、後台工作者的模組一共有 6 個,它們分別是:
- Volo.Abp.Threading :提供了一些常用的執行緒組件,其中
AbpTimer
就是在裡面實現的。 -
Volo.Abp.BackgroundWorkers :後台工作者的定義和實現。
- Volo.Abp.BackgroundJobs.Abstractions :後台任務的一些共有定義。
- Volo.Abp.BackgroundJobs :默認的後台任務管理器實現。
- Volo.Abp.BackgroundJobs.HangFire :基於 Hangfire 庫實現的後台任務管理器。
-
Volo.Abp.BackgroundJobs.RabbitMQ : 基於 RabbitMQ 實現的後台任務管理器。
二、源碼分析
執行緒組件
健壯的計時器
CLR 為我們提供了多種計時器,我們一般使用的是 System.Threading.Timer
,它是基於 CLR 執行緒池的一個周期計時器,會根據我們配置的 Period
(周期) 定時執行。在 CLR 執行緒池中,所有的 Timer
只有 1 個執行緒為其服務。這個執行緒直到下一個計時器的觸發時間,當下一個 Timer
對象到期時,這個執行緒就會將 Timer
的回調方法通過 ThreadPool.QueueUserWorkItem()
扔到執行緒池去執行。
不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的周期,那麼就會造成上一個任務還沒執行完成又開始執行新的任務。
解決這個方法其實很簡單,即啟動之後,將周期設置為 Timeout.Infinite
,這樣只會執行一次。當回調方法執行完成之後,就設置 dueTime
參數說明下次執行要等待多久,並且周期還是 Timeout.Infinite
。
ABP vNext 已經為我們提供了健壯的計時器,該類型的定義是 AbpTimer
,在內部用到了 volatile
關鍵字和 Monitor
實現 條件變數模式 解決多執行緒環境下的問題。
public class AbpTimer : ITransientDependency { // 回調事件。 public event EventHandler Elapsed; // 執行周期。 public int Period { get; set; } // 定時器啟動之後就開始運行,默認為 Fasle。 public bool RunOnStart { get; set; } // 日誌記錄器。 public ILogger<AbpTimer> Logger { get; set; } private readonly Timer _taskTimer; // 定時器是否在執行任務,默認為 false。 private volatile bool _performingTasks; // 定時器的運行狀態,默認為 false。 private volatile bool _isRunning; public AbpTimer() { Logger = NullLogger<AbpTimer>.Instance; // 回調函數是 TimerCallBack,執行周期為永不執行。 _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite); } public void Start(CancellationToken cancellationToken = default) { // 如果傳遞的周期小於等於 0 ,則拋出異常。 if (Period <= 0) { throw new AbpException("Period should be set before starting the timer!"); } // 使用互斥鎖,保證執行緒安全。 lock (_taskTimer) { // 如果啟動之後就需要馬上執行,則設置為 0,馬上執行任務,否則會等待 Period 毫秒之後再執行(1 個周期)。 _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); // 定時器成功運行了。 _isRunning = true; } // 釋放 _taskTimer 的互斥鎖。 } public void Stop(CancellationToken cancellationToken = default) { // 使用互斥鎖。 lock (_taskTimer) { // 將內部定時器設置為永不執行的狀態。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 檢測當前是否還有正在執行的任務,如果有則等待任務執行完成。 while (_performingTasks) { // 臨時釋放鎖,阻塞當前執行緒。但是其他執行緒可以獲取 _timer 的互斥鎖。 Monitor.Wait(_taskTimer); } // 需要表示停止狀態,所以標記狀態為 false。 _isRunning = false; } } private void TimerCallBack(object state) { lock (_taskTimer) { // 如果有任務正在運行,或者內部定時器已經停止了,則不做任何事情。 if (!_isRunning || _performingTasks) { return; } // 臨時停止內部定時器。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 表明馬上需要執行任務了。 _performingTasks = true; } try { // 調用綁定的事件。 Elapsed.InvokeSafely(this, new EventArgs()); } catch { // 注意,這裡將會吞噬異常。 } finally { lock (_taskTimer) { // 任務執行完成,更改狀態。 _performingTasks = false; // 如果定時器還在運行,沒有被停止,則啟動下一個 Period 周期。 if (_isRunning) { _taskTimer.Change(Period, Timeout.Infinite); } // 解除因為釋放鎖而阻塞的執行緒。 // 如果已經調用了 Stop,則會喚醒那個因為 Wait 阻塞的執行緒,就會使 _isRunning 置為 false。 Monitor.Pulse(_taskTimer); } } } }
這裡對 _performingTasks
和 _isRunning
欄位設置為 volatile
防止指令重排和暫存器快取。這是因為在 Stop
方法內部使用到的 _performingTasks
可能會被優化,所以將該欄位設置為了易失的。
IRunnable
介面
ABP vNext 為任務的啟動和停止,抽象了一個 IRunnable
介面。雖然描述說的是對執行緒的行為進行抽象,但千萬千萬不要手動調用 Thread.Abort()
。關於 Thread.Abort()
的壞處,這裡不再多加贅述,可以參考 這篇文章 的描述,或者搜索其他的相關文章。
public interface IRunnable { // 啟動這個服務。 Task StartAsync(CancellationToken cancellationToken = default); /// <summary> /// 停止這個服務。 /// </summary> Task StopAsync(CancellationToken cancellationToken = default); }
後台工作者
模組的構造
後台工作者的模組行為比較簡單,它定義了在應用程式初始化和銷毀時的行為。在初始化時,後台工作者管理器 獲得所有 後台工作者,並開始啟動它們。在銷毀時,後台工作者管理器獲得所有後台工作者,並開始停止他們,這樣才能夠做到優雅退出。
[DependsOn( typeof(AbpThreadingModule) )] public class AbpBackgroundWorkersModule : AbpModule { public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 如果啟用了後台工作者,那麼獲得後台工作者管理器的實例,並調用 StartAsync 啟動所有後台工作者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StartAsync() ); } } public override void OnApplicationShutdown(ApplicationShutdownContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 如果啟用了後台工作者,那麼獲得後台工作者管理器的實例,並調用 StopAsync 停止所有後台工作者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StopAsync() ); } } }
後台工作者的定義
首先看看 IBackgroundWorker
介面的定義,是空的。不過繼承了 ISingletonDependency
介面,說明我們的每個後台工作者都是 單例 的。
/// <summary> /// 在後台運行,執行某些任務的工作程式(執行緒)的介面定義。 /// </summary> public interface IBackgroundWorker : IRunnable, ISingletonDependency { }
ABP vNext 為我們定義了一個抽象的後台工作者類型 BackgroundWorkerBase
,這個基類的設計目的是提供一些常用組件(和 ApplicationService
一樣)。
public abstract class BackgroundWorkerBase : IBackgroundWorker { //TODO: Add UOW, Localization and other useful properties..? //TODO: 是否應該提供工作單元、本地化以及其他常用的屬性? public ILogger<BackgroundWorkerBase> Logger { protected get; set; } protected BackgroundWorkerBase() { Logger = NullLogger<BackgroundWorkerBase>.Instance; } public virtual Task StartAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Started background worker: " + ToString()); return Task.CompletedTask; } public virtual Task StopAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Stopped background worker: " + ToString()); return Task.CompletedTask; } public override string ToString() { return GetType().FullName; } }
ABP vNext 內部只有一個默認的後台工作者實現 PeriodicBackgroundWorkerBase
。從名字上來看,意思是就是周期執行的後台工作者,內部就是用的 AbpTimer
來實現,ABP vNext 將其包裝起來是為了實現統一的模式(後台工作者)。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase { protected readonly AbpTimer Timer; // 也就意味著子類必須在其構造函數,指定 timer 的執行周期。 protected PeriodicBackgroundWorkerBase(AbpTimer timer) { Timer = timer; Timer.Elapsed += Timer_Elapsed; } // 啟動後台工作者。 public override async Task StartAsync(CancellationToken cancellationToken = default) { await base.StartAsync(cancellationToken); Timer.Start(cancellationToken); } // 停止後台工作者。 public override async Task StopAsync(CancellationToken cancellationToken = default) { Timer.Stop(cancellationToken); await base.StopAsync(cancellationToken); } // Timer 關聯的周期事件,之所以不直接掛載 DoWork,是為了捕獲異常。 private void Timer_Elapsed(object sender, System.EventArgs e) { try { DoWork(); } catch (Exception ex) { Logger.LogException(ex); } } // 你要周期執行的任務。 protected abstract void DoWork(); }
我們如果要實現自己的後台工作者,只需要繼承該類,實現 DoWork()
方法即可。
public class TestWorker : PeriodicBackgroundWorkerBase { public TestWorker(AbpTimer timer) : base(timer) { // 每五分鐘執行一次。 timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds; } protected override void DoWork() { Console.WriteLine("後台工作者被執行了。"); } }
然後在我們自己模組的 OnPreApplicationInitialization()
方法內解析出後台作業管理器(IBackgroundWorkerManager
),調用它的 Add()
方法,將我們定義的 TestWorker
添加到管理器當中即可。
後台工作者管理器
所有的後台工作者都是通過 IBackgroundWorkerManager
進行管理的,它提供了 StartAsync()
、StopAsync()
、Add()
方法。前面兩個方法就是 IRunnable
介面定義的,後台工作者管理器直接集成了該介面,後面的 Add()
方法就是用來動態添加我們的後台工作者。
後台工作者管理器的默認實現是 BackgroundWorkerManager
類型,它內部做的事情很簡單,就是維護一個後台工作者集合。每當調用 StartAsync()
或 StopAsync()
方法的時候,都從這個集合遍歷後台工作者,執行他們的啟動和停止方法。
這裡值得注意的一點是,當我們調用 Add()
方法添加了一個後台工作者之後,後台工作者管理器就會啟動這個後台工作者。
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable { protected bool IsRunning { get; private set; } private bool _isDisposed; private readonly List<IBackgroundWorker> _backgroundWorkers; public BackgroundWorkerManager() { _backgroundWorkers = new List<IBackgroundWorker>(); } public virtual void Add(IBackgroundWorker worker) { _backgroundWorkers.Add(worker); // 如果當前後台工作者管理器還處於運行狀態,則調用工作者的 StartAsync() 方法啟動。 if (IsRunning) { AsyncHelper.RunSync( () => worker.StartAsync() ); } } public virtual void Dispose() { if (_isDisposed) { return; } _isDisposed = true; //TODO: ??? } // 啟動,則遍歷集合啟動。 public virtual async Task StartAsync(CancellationToken cancellationToken = default) { IsRunning = true; foreach (var worker in _backgroundWorkers) { await worker.StartAsync(cancellationToken); } } // 停止, 則遍歷集合停止。 public virtual async Task StopAsync(CancellationToken cancellationToken = default) { IsRunning = false; foreach (var worker in _backgroundWorkers) { await worker.StopAsync(cancellationToken); } } }
上述程式碼其實存在一個問題,即後台工作者被釋放以後,是否還能執行 Add()
操作。參考我 之前的文章 ,其實當對象被釋放之後,就應該拋出 ObjectDisposedException
異常。
後台作業
比起後台工作者,我們執行一次性任務的時候,一般會使用後台作業進行處理。比起只能設置固定周期的 PeriodicBackgroundWorkerBase
,集成了 Hangfire 的後台作業管理器,能夠讓我們使用 Cron 表達式,更加靈活地設置任務的執行周期。
模組的構造
關於後台作業的模組,我們需要說道兩處。第一處是位於 Volo.Abp.BackgroundJobs.Abstractions 項目的 AbpBackgroundJobsAbstractionsModule
,第二出則是位於 Volo.Abp.BackgroundJobs 項目的 AbpBackgroundJobsModule
。
AbpBackgroundJobsAbstractionsModule
的主要行為是將符合條件的後台作業,添加到 AbpBackgroundJobOptions
配置當中,以便後續進行使用。
public override void PreConfigureServices(ServiceConfigurationContext context) { RegisterJobs(context.Services); } private static void RegisterJobs(IServiceCollection services) { var jobTypes = new List<Type>(); // 如果註冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。 services.OnRegistred(context => { if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>))) { jobTypes.Add(context.ImplementationType); } }); services.Configure<AbpBackgroundJobOptions>(options => { // 將數據賦值給配置類。 foreach (var jobType in jobTypes) { options.AddJob(jobType); } }); }
Volo.Abp.BackgroundJobs 內部是 ABP vNext 為我們提供的 默認後台作業管理器,這個後台作業管理器 本質上是一個後台工作者。
這個後台工作者會周期性(取決於 AbpBackgroundJobWorkerOptions.JobPollPeriod
值,默認為 5 秒種)地從 IBackgroundJobStore
撈出一堆後台任務,並且在後台執行。至於每次執行多少個後台任務,這也取決於 AbpBackgroundJobWorkerOptions.MaxJobFetchCount
的值,默認值是 1000 個。
注意:
這裡的 Options 類是
AbpBackgroundJobWorkerOptions
,別和AbpBackgroundWorkerOptions
混淆了。
所以在 AbpBackgroundJobsModule
模組裡面,只做了一件事情,就是將負責後台作業的後台工作者,添加到後台工作者管理器種,並開始周期性地執行。
public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value; if (options.IsJobExecutionEnabled) { // 獲得後台工作者管理器,並將負責後台作業的工作者添加進去。 context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>() ); } }
後台作業的定義
在上一節裡面看到,只要是實現 IBackgroundJob<TArgs>
類型的都視為一個後台作業。這個後台作業介面,只定義了一個行為,那就是執行(Execute(TArgs)
)。這裡的 TArgs
泛型作為執行後台作業時,需要傳遞的參數類型。
// 因為是傳入的參數,所以泛型參數是逆變的。 public interface IBackgroundJob<in TArgs> { void Execute(TArgs args); }
檢查源碼,發現 ABP vNext 的郵箱模組定義了一個郵件發送任務 BackgroundEmailSendingJob
,它的實現大概如下。
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency { // ... public override void Execute(BackgroundEmailSendingJobArgs args) { AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml)); } }
後台作業管理器
後台作業都是通過一個後台作業管理器(IBackgroundJobManager
)進行管理的,這個介面定義了一個入隊方法(EnqueueAsync()
),注意,我們的後台作業在入隊後,不是馬上執行的。
說一下這個入隊處理邏輯:
- 首先我們會通過參數的類型,獲取到任務的名稱。(假設任務上面沒有標註
BackgroundJobNameAttribute
特性,那麼任務的名稱就是參數類型的FullName
。) - 構造一個
BackgroundJobInfo
對象。 - 通過
IBackgroundJobStore
持久化任務資訊。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 獲取任務名稱。 var jobName = BackgroundJobNameAttribute.GetName<TArgs>(); var jobId = await EnqueueAsync(jobName, args, priority, delay); return jobId.ToString(); } protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { var jobInfo = new BackgroundJobInfo { Id = GuidGenerator.Create(), JobName = jobName, // 通過序列化器,序列化參數值,方便存儲。這裡內部其實使用的是 JSON.NET 進行序列化。 JobArgs = Serializer.Serialize(args), Priority = priority, CreationTime = Clock.Now, NextTryTime = Clock.Now }; // 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲。 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 持久化任務資訊,方便後面執行後台作業的工作者能夠取到。 await Store.InsertAsync(jobInfo); return jobInfo.Id; }
BackgroundJobNameAttribute
相關的方法:
public static string GetName<TJobArgs>() { return GetName(typeof(TJobArgs)); } public static string GetName([NotNull] Type jobArgsType) { Check.NotNull(jobArgsType, nameof(jobArgsType)); // 判斷參數類型上面是否標註了特性,並且特性實現了 IBackgroundJobNameProvider 介面。 return jobArgsType .GetCustomAttributes(true) .OfType<IBackgroundJobNameProvider>() .FirstOrDefault() ?.Name // 拿不到名字,則使用類型的 FullName。 ?? jobArgsType.FullName; }
後台作業的存儲
後台作業的存儲默認是放在記憶體的,這點可以從 InMemoryBackgroundJobStore
類型實現看出來。在它的內部使用了一個並行字典,通過作業的 Guid 與作業進行關聯綁定。
除了記憶體實現,在 Volo.Abp.BackgroundJobs.Domain 模組還有一個 BackgroundJobStore
實現,基本套路與 SettingStore
一樣,都是存儲到資料庫裡面。
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency { protected IBackgroundJobRepository BackgroundJobRepository { get; } // ... public BackgroundJobInfo Find(Guid jobId) { return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>( BackgroundJobRepository.Find(jobId) ); } // ... public void Insert(BackgroundJobInfo jobInfo) { BackgroundJobRepository.Insert( ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo) ); } // ... }
後台作業的執行
默認的後台作業管理器是通過一個後台工作者來執行後台任務的,這個實現叫做 BackgroundJobWorker
,這個後台工作者的生命周期也是單例的。後台作業的具體執行邏輯裡面,涉及到了以下幾個類型的交互。
類型 | 作用 |
---|---|
AbpBackgroundJobOptions |
提供每個後台任務的配置資訊,包括任務的類型、參數類型、任務名稱數據。 |
AbpBackgroundJobWorkerOptions |
提供後台作業工作者的配置資訊,例如每個周期 最大執行的作業數量、後台 工作者的 執行周期、作業執行 超時時間 等。 |
BackgroundJobConfiguration |
後台任務的配置資訊,作用是將持久化存儲的作業資訊與運行時類型進行綁定 和實例化,以便 ABP vNext 來執行具體的任務。 |
IBackgroundJobExecuter |
後台作業的執行器,當我們從持久化存儲獲取到後台作業資訊時,將會通過 這個執行器來執行具體的後台作業。 |
IBackgroundJobSerializer |
後台作業序列化器,用於後台作業持久化時進行序列化的工具,默認採用的 是 JSON.NET 進行實現。 |
JobExecutionContext |
執行器在執行後台作業時,是通過這個上下文參數進行執行的,在這個上下 文內部,包含了後台作業的具體類型、後台作業的參數值。 |
IBackgroundJobStore |
前面已經講過了,這個是用於後台作業的持久化存儲,默認實現是存儲在記憶體。 |
BackgroundJobPriority |
後台作業的執行優先順序定義,ABP vNext 在執行後台任務時,會根據任務的優 先級進行排序,以便在後面執行的時候優先順序高的任務先執行。 |
我們來按照邏輯順序走一遍它的實現,首先後台作業的執行工作者會從持久化存儲內,獲取 MaxJobFetchCount
個任務用於執行。從持久化存儲獲取後台作業資訊(BackgroundJobInfo
),是由 IBackgroundJobStore
提供的。
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>(); var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount); // 不存在任何後台作業,則直接結束本次調用。 if (!waitingJobs.Any()) { return; }
InMemoryBackgroundJobStore
的相關實現:
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount) { return _jobs.Values .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now) .OrderByDescending(t => t.Priority) .ThenBy(t => t.TryCount) .ThenBy(t => t.NextTryTime) .Take(maxResultCount) .ToList(); }
上面的程式碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然後根據任務的優先順序從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些數據中,篩選出 maxResultCount
結果並返回。
說到這裡,我們來看一下這個 NextTryTime
是如何被計算出來的?回想起最開始的後台作業管理器,我們在添加一個後台任務的時候,就會設置這個後台任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值一般是 Clock.Now
,也就是它被添加到隊列的時間。
不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先順序再執行,就在每次任務執行失敗之後,會將 NextTryTime
的值指數級進行增加。這塊程式碼可以在 CalculateNextTryTime
裡面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock) { // 一般來說,這個 DefaultWaitFactor 因子的值是 2.0 。 var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤。 var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ?? clock.Now.AddSeconds(nextWaitDuration); if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout) { return null; } return nextTryDate; }
當預期的執行時間都超過 DefaultTimeout
的超時時間時(默認為 2 天),說明這個任務確實沒救了,就不要再執行了。
我們之前說到,從 IBackgroundJobStore
拿到了需要執行的後台任務資訊集合,接下來我們就要開始執行後台任務了。
foreach (var jobInfo in waitingJobs) { jobInfo.TryCount++; jobInfo.LastTryTime = clock.Now; try { // 根據任務名稱獲取任務的配置參數。 var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); // 根據配置裡面存儲的任務類型,將參數值進行反序列化。 var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); // 構造一個新的執行上下文,讓執行器執行任務。 var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs); try { jobExecuter.Execute(context); // 如果任務執行成功則刪除該任務。 store.Delete(jobInfo.Id); } catch (BackgroundJobExecutionException) { // 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。 var nextTryTime = CalculateNextTryTime(jobInfo, clock); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 超過超時時間的時候,公式計算函數返回 null,該任務置為廢棄任務。 jobInfo.IsAbandoned = true; } TryUpdate(store, jobInfo); } } catch (Exception ex) { // 執行過程中,產生了未知異常,設置為廢棄任務,並列印日誌。 Logger.LogException(ex); jobInfo.IsAbandoned = true; TryUpdate(store, jobInfo); } }
執行後台任務的時候基本分為 5 步,它們分別是:
- 獲得任務關聯的配置參數,默認不用提供,因為在之前模組初始化的時候就已經配置了(你也可以顯式指定)。
- 通過之前存儲的配置參數,將參數值反序列化出來,構造具體實例。
- 構造一個執行上下文。
- 後台任務執行器執行具體的後台任務。
- 成功則刪除任務,失敗則更新任務下次的執行狀態。
至於執行器裡面的真正執行操作,你都拿到了參數值和任務類型了。就可以通過類型用 IoC 獲取後台任務對象的實例,然後通過反射匹配方法簽名,在實例上調用這個方法傳入參數即可。
public virtual void Execute(JobExecutionContext context) { // 構造具體的後台作業實例對象。 var job = context.ServiceProvider.GetService(context.JobType); if (job == null) { throw new AbpException("The job type is not registered to DI: " + context.JobType); } // 獲得需要執行的方法簽名。 var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)); if (jobExecuteMethod == null) { throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType); } try { // 直接通過 MethodInfo 的 Invoke 方法調用,傳入具體的實例對象和參數值即可。 jobExecuteMethod.Invoke(job, new[] { context.JobArgs }); } catch (Exception ex) { Logger.LogException(ex); // 如果是執行方法內的異常,則包裝進行處理,然後拋出。 throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex) { JobType = context.JobType.AssemblyQualifiedName, JobArgs = context.JobArgs }; } }
集成 Hangfire
ABP vNext 對於 Hangfire 的集成程式碼分布在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模組內部,前者是在模組配置裡面,調用 Hangfire 庫的相關方法,注入組件到 IoC 容器當中。後者則是對後台作業進行了適配處理,替換了默認的 IBackgroundJobManager
實現。
在 AbpHangfireModule
模組內部,通過工廠創建出來一個 BackgroudJobServer
實例,並將它的生命周期與應用程式的生命周期進行綁定,以便進行銷毀處理。
public class AbpHangfireModule : AbpModule { private BackgroundJobServer _backgroundJobServer; public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddHangfire(configuration => { context.Services.ExecutePreConfiguredActions(configuration); }); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider); } public override void OnApplicationShutdown(ApplicationShutdownContext context) { //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown _backgroundJobServer.SendStop(); _backgroundJobServer.Dispose(); } }
我們直奔主題,看一下基於 Hangfire 的後台作業管理器是怎麼實現的。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency { public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 如果沒有延遲參數,則直接通過 Enqueue() 方法扔進執行對了。 if (!delay.HasValue) { return Task.FromResult( BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args) ) ); } else { return Task.FromResult( BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args), delay.Value ) ); } }
上述程式碼中使用 HangfireJobExecutionAdapter
進行了一個適配操作,因為 Hangfire 要將一個後台任務扔進隊列執行,不是用 TArgs
就能解決的。
轉到這個適配器定義,提供了一個 Execute(TArgs)
方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用適配器的 Excetue(TArgs)
方法,然後內部還是使用的 IBackgroundJobExecuter
來執行具體定義的任務。
public class HangfireJobExecutionAdapter<TArgs> { protected AbpBackgroundJobOptions Options { get; } protected IServiceScopeFactory ServiceScopeFactory { get; } protected IBackgroundJobExecuter JobExecuter { get; } public HangfireJobExecutionAdapter( IOptions<AbpBackgroundJobOptions> options, IBackgroundJobExecuter jobExecuter, IServiceScopeFactory serviceScopeFactory) { JobExecuter = jobExecuter; ServiceScopeFactory = serviceScopeFactory; Options = options.Value; } public void Execute(TArgs args) { using (var scope = ServiceScopeFactory.CreateScope()) { var jobType = Options.GetJob(typeof(TArgs)).JobType; var context = new JobExecutionContext(scope.ServiceProvider, jobType, args); JobExecuter.Execute(context); } } }
集成 RabbitMQ
基於 RabbitMQ 的後台作業實現,我想放在分散式事件匯流排裡面,對其一起進行講解。
三、總結
ABP vNext 為我們提供了多種後台作業管理器的實現,你可以根據自己的需求選用不同的後台作業管理器,又或者是自己動手造輪子。
需要看其他的 ABP vNext 相關文章?點擊我 即可跳轉到總目錄。