[Abp vNext 源码分析] – 12. 后台作业与后台工作者
- 2019 年 10 月 25 日
- 笔记
一、简要说明
文章信息:
基于的 ABP vNext 版本:1.0.0
创作日期:2019 年 10 月 24 日晚
更新日期:暂无
ABP vNext 提供了后台工作者和后台作业的支持,基本实现与原来的 ABP 框架类似,并且 ABP vNext 还提供了对 HangFire 和 RabbitMQ 的后台作业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不需要做其他复杂的配置。
后台作业在系统开发的过程当中,是比较常用的功能。因为总是有一些长耗时的任务,而这些任务我们不是立即响应的,例如 Excel 文档导入、批量发送短信通知等。
后台工作者 的话,ABP vNext 的实现就是在 CLR 的 Timer
之上封装了一层,周期性地执行用户逻辑。ABP vNext 默认提供的 后台任务管理器,就是在后台工作者基础之上进行的封装。
涉及到后台任务、后台工作者的模块一共有 6 个,它们分别是:
- Volo.Abp.Threading :提供了一些常用的线程组件,其中
AbpTimer
就是在里面实现的。 -
Volo.Abp.BackgroundWorkers :后台工作者的定义和实现。
- Volo.Abp.BackgroundJobs.Abstractions :后台任务的一些共有定义。
- Volo.Abp.BackgroundJobs :默认的后台任务管理器实现。
- Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 库实现的后台任务管理器。
-
Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 实现的后台任务管理器。
二、源码分析
线程组件
健壮的计时器
CLR 为我们提供了多种计时器,我们一般使用的是 System.Threading.Timer
,它是基于 CLR 线程池的一个周期计时器,会根据我们配置的 Period
(周期) 定时执行。在 CLR 线程池中,所有的 Timer
只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 Timer
对象到期时,这个线程就会将 Timer
的回调方法通过 ThreadPool.QueueUserWorkItem()
扔到线程池去执行。
不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会造成上一个任务还没执行完成又开始执行新的任务。
解决这个方法其实很简单,即启动之后,将周期设置为 Timeout.Infinite
,这样只会执行一次。当回调方法执行完成之后,就设置 dueTime
参数说明下次执行要等待多久,并且周期还是 Timeout.Infinite
。
ABP vNext 已经为我们提供了健壮的计时器,该类型的定义是 AbpTimer
,在内部用到了 volatile
关键字和 Monitor
实现 条件变量模式 解决多线程环境下的问题。
public class AbpTimer : ITransientDependency { // 回调事件。 public event EventHandler Elapsed; // 执行周期。 public int Period { get; set; } // 定时器启动之后就开始运行,默认为 Fasle。 public bool RunOnStart { get; set; } // 日志记录器。 public ILogger<AbpTimer> Logger { get; set; } private readonly Timer _taskTimer; // 定时器是否在执行任务,默认为 false。 private volatile bool _performingTasks; // 定时器的运行状态,默认为 false。 private volatile bool _isRunning; public AbpTimer() { Logger = NullLogger<AbpTimer>.Instance; // 回调函数是 TimerCallBack,执行周期为永不执行。 _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite); } public void Start(CancellationToken cancellationToken = default) { // 如果传递的周期小于等于 0 ,则抛出异常。 if (Period <= 0) { throw new AbpException("Period should be set before starting the timer!"); } // 使用互斥锁,保证线程安全。 lock (_taskTimer) { // 如果启动之后就需要马上执行,则设置为 0,马上执行任务,否则会等待 Period 毫秒之后再执行(1 个周期)。 _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); // 定时器成功运行了。 _isRunning = true; } // 释放 _taskTimer 的互斥锁。 } public void Stop(CancellationToken cancellationToken = default) { // 使用互斥锁。 lock (_taskTimer) { // 将内部定时器设置为永不执行的状态。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 检测当前是否还有正在执行的任务,如果有则等待任务执行完成。 while (_performingTasks) { // 临时释放锁,阻塞当前线程。但是其他线程可以获取 _timer 的互斥锁。 Monitor.Wait(_taskTimer); } // 需要表示停止状态,所以标记状态为 false。 _isRunning = false; } } private void TimerCallBack(object state) { lock (_taskTimer) { // 如果有任务正在运行,或者内部定时器已经停止了,则不做任何事情。 if (!_isRunning || _performingTasks) { return; } // 临时停止内部定时器。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 表明马上需要执行任务了。 _performingTasks = true; } try { // 调用绑定的事件。 Elapsed.InvokeSafely(this, new EventArgs()); } catch { // 注意,这里将会吞噬异常。 } finally { lock (_taskTimer) { // 任务执行完成,更改状态。 _performingTasks = false; // 如果定时器还在运行,没有被停止,则启动下一个 Period 周期。 if (_isRunning) { _taskTimer.Change(Period, Timeout.Infinite); } // 解除因为释放锁而阻塞的线程。 // 如果已经调用了 Stop,则会唤醒那个因为 Wait 阻塞的线程,就会使 _isRunning 置为 false。 Monitor.Pulse(_taskTimer); } } } }
这里对 _performingTasks
和 _isRunning
字段设置为 volatile
防止指令重排和寄存器缓存。这是因为在 Stop
方法内部使用到的 _performingTasks
可能会被优化,所以将该字段设置为了易失的。
IRunnable
接口
ABP vNext 为任务的启动和停止,抽象了一个 IRunnable
接口。虽然描述说的是对线程的行为进行抽象,但千万千万不要手动调用 Thread.Abort()
。关于 Thread.Abort()
的坏处,这里不再多加赘述,可以参考 这篇文章 的描述,或者搜索其他的相关文章。
public interface IRunnable { // 启动这个服务。 Task StartAsync(CancellationToken cancellationToken = default); /// <summary> /// 停止这个服务。 /// </summary> Task StopAsync(CancellationToken cancellationToken = default); }
后台工作者
模块的构造
后台工作者的模块行为比较简单,它定义了在应用程序初始化和销毁时的行为。在初始化时,后台工作者管理器 获得所有 后台工作者,并开始启动它们。在销毁时,后台工作者管理器获得所有后台工作者,并开始停止他们,这样才能够做到优雅退出。
[DependsOn( typeof(AbpThreadingModule) )] public class AbpBackgroundWorkersModule : AbpModule { public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 StartAsync 启动所有后台工作者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StartAsync() ); } } public override void OnApplicationShutdown(ApplicationShutdownContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 StopAsync 停止所有后台工作者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StopAsync() ); } } }
后台工作者的定义
首先看看 IBackgroundWorker
接口的定义,是空的。不过继承了 ISingletonDependency
接口,说明我们的每个后台工作者都是 单例 的。
/// <summary> /// 在后台运行,执行某些任务的工作程序(线程)的接口定义。 /// </summary> public interface IBackgroundWorker : IRunnable, ISingletonDependency { }
ABP vNext 为我们定义了一个抽象的后台工作者类型 BackgroundWorkerBase
,这个基类的设计目的是提供一些常用组件(和 ApplicationService
一样)。
public abstract class BackgroundWorkerBase : IBackgroundWorker { //TODO: Add UOW, Localization and other useful properties..? //TODO: 是否应该提供工作单元、本地化以及其他常用的属性? public ILogger<BackgroundWorkerBase> Logger { protected get; set; } protected BackgroundWorkerBase() { Logger = NullLogger<BackgroundWorkerBase>.Instance; } public virtual Task StartAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Started background worker: " + ToString()); return Task.CompletedTask; } public virtual Task StopAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Stopped background worker: " + ToString()); return Task.CompletedTask; } public override string ToString() { return GetType().FullName; } }
ABP vNext 内部只有一个默认的后台工作者实现 PeriodicBackgroundWorkerBase
。从名字上来看,意思是就是周期执行的后台工作者,内部就是用的 AbpTimer
来实现,ABP vNext 将其包装起来是为了实现统一的模式(后台工作者)。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase { protected readonly AbpTimer Timer; // 也就意味着子类必须在其构造函数,指定 timer 的执行周期。 protected PeriodicBackgroundWorkerBase(AbpTimer timer) { Timer = timer; Timer.Elapsed += Timer_Elapsed; } // 启动后台工作者。 public override async Task StartAsync(CancellationToken cancellationToken = default) { await base.StartAsync(cancellationToken); Timer.Start(cancellationToken); } // 停止后台工作者。 public override async Task StopAsync(CancellationToken cancellationToken = default) { Timer.Stop(cancellationToken); await base.StopAsync(cancellationToken); } // Timer 关联的周期事件,之所以不直接挂载 DoWork,是为了捕获异常。 private void Timer_Elapsed(object sender, System.EventArgs e) { try { DoWork(); } catch (Exception ex) { Logger.LogException(ex); } } // 你要周期执行的任务。 protected abstract void DoWork(); }
我们如果要实现自己的后台工作者,只需要继承该类,实现 DoWork()
方法即可。
public class TestWorker : PeriodicBackgroundWorkerBase { public TestWorker(AbpTimer timer) : base(timer) { // 每五分钟执行一次。 timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds; } protected override void DoWork() { Console.WriteLine("后台工作者被执行了。"); } }
然后在我们自己模块的 OnPreApplicationInitialization()
方法内解析出后台作业管理器(IBackgroundWorkerManager
),调用它的 Add()
方法,将我们定义的 TestWorker
添加到管理器当中即可。
后台工作者管理器
所有的后台工作者都是通过 IBackgroundWorkerManager
进行管理的,它提供了 StartAsync()
、StopAsync()
、Add()
方法。前面两个方法就是 IRunnable
接口定义的,后台工作者管理器直接集成了该接口,后面的 Add()
方法就是用来动态添加我们的后台工作者。
后台工作者管理器的默认实现是 BackgroundWorkerManager
类型,它内部做的事情很简单,就是维护一个后台工作者集合。每当调用 StartAsync()
或 StopAsync()
方法的时候,都从这个集合遍历后台工作者,执行他们的启动和停止方法。
这里值得注意的一点是,当我们调用 Add()
方法添加了一个后台工作者之后,后台工作者管理器就会启动这个后台工作者。
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable { protected bool IsRunning { get; private set; } private bool _isDisposed; private readonly List<IBackgroundWorker> _backgroundWorkers; public BackgroundWorkerManager() { _backgroundWorkers = new List<IBackgroundWorker>(); } public virtual void Add(IBackgroundWorker worker) { _backgroundWorkers.Add(worker); // 如果当前后台工作者管理器还处于运行状态,则调用工作者的 StartAsync() 方法启动。 if (IsRunning) { AsyncHelper.RunSync( () => worker.StartAsync() ); } } public virtual void Dispose() { if (_isDisposed) { return; } _isDisposed = true; //TODO: ??? } // 启动,则遍历集合启动。 public virtual async Task StartAsync(CancellationToken cancellationToken = default) { IsRunning = true; foreach (var worker in _backgroundWorkers) { await worker.StartAsync(cancellationToken); } } // 停止, 则遍历集合停止。 public virtual async Task StopAsync(CancellationToken cancellationToken = default) { IsRunning = false; foreach (var worker in _backgroundWorkers) { await worker.StopAsync(cancellationToken); } } }
上述代码其实存在一个问题,即后台工作者被释放以后,是否还能执行 Add()
操作。参考我 之前的文章 ,其实当对象被释放之后,就应该抛出 ObjectDisposedException
异常。
后台作业
比起后台工作者,我们执行一次性任务的时候,一般会使用后台作业进行处理。比起只能设置固定周期的 PeriodicBackgroundWorkerBase
,集成了 Hangfire 的后台作业管理器,能够让我们使用 Cron 表达式,更加灵活地设置任务的执行周期。
模块的构造
关于后台作业的模块,我们需要说道两处。第一处是位于 Volo.Abp.BackgroundJobs.Abstractions 项目的 AbpBackgroundJobsAbstractionsModule
,第二出则是位于 Volo.Abp.BackgroundJobs 项目的 AbpBackgroundJobsModule
。
AbpBackgroundJobsAbstractionsModule
的主要行为是将符合条件的后台作业,添加到 AbpBackgroundJobOptions
配置当中,以便后续进行使用。
public override void PreConfigureServices(ServiceConfigurationContext context) { RegisterJobs(context.Services); } private static void RegisterJobs(IServiceCollection services) { var jobTypes = new List<Type>(); // 如果注册的类型符合 IBackgroundJob<> 泛型,则添加到集合当中。 services.OnRegistred(context => { if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>))) { jobTypes.Add(context.ImplementationType); } }); services.Configure<AbpBackgroundJobOptions>(options => { // 将数据赋值给配置类。 foreach (var jobType in jobTypes) { options.AddJob(jobType); } }); }
Volo.Abp.BackgroundJobs 内部是 ABP vNext 为我们提供的 默认后台作业管理器,这个后台作业管理器 本质上是一个后台工作者。
这个后台工作者会周期性(取决于 AbpBackgroundJobWorkerOptions.JobPollPeriod
值,默认为 5 秒种)地从 IBackgroundJobStore
捞出一堆后台任务,并且在后台执行。至于每次执行多少个后台任务,这也取决于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount
的值,默认值是 1000 个。
注意:
这里的 Options 类是
AbpBackgroundJobWorkerOptions
,别和AbpBackgroundWorkerOptions
混淆了。
所以在 AbpBackgroundJobsModule
模块里面,只做了一件事情,就是将负责后台作业的后台工作者,添加到后台工作者管理器种,并开始周期性地执行。
public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value; if (options.IsJobExecutionEnabled) { // 获得后台工作者管理器,并将负责后台作业的工作者添加进去。 context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>() ); } }
后台作业的定义
在上一节里面看到,只要是实现 IBackgroundJob<TArgs>
类型的都视为一个后台作业。这个后台作业接口,只定义了一个行为,那就是执行(Execute(TArgs)
)。这里的 TArgs
泛型作为执行后台作业时,需要传递的参数类型。
// 因为是传入的参数,所以泛型参数是逆变的。 public interface IBackgroundJob<in TArgs> { void Execute(TArgs args); }
检查源码,发现 ABP vNext 的邮箱模块定义了一个邮件发送任务 BackgroundEmailSendingJob
,它的实现大概如下。
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency { // ... public override void Execute(BackgroundEmailSendingJobArgs args) { AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml)); } }
后台作业管理器
后台作业都是通过一个后台作业管理器(IBackgroundJobManager
)进行管理的,这个接口定义了一个入队方法(EnqueueAsync()
),注意,我们的后台作业在入队后,不是马上执行的。
说一下这个入队处理逻辑:
- 首先我们会通过参数的类型,获取到任务的名称。(假设任务上面没有标注
BackgroundJobNameAttribute
特性,那么任务的名称就是参数类型的FullName
。) - 构造一个
BackgroundJobInfo
对象。 - 通过
IBackgroundJobStore
持久化任务信息。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 获取任务名称。 var jobName = BackgroundJobNameAttribute.GetName<TArgs>(); var jobId = await EnqueueAsync(jobName, args, priority, delay); return jobId.ToString(); } protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { var jobInfo = new BackgroundJobInfo { Id = GuidGenerator.Create(), JobName = jobName, // 通过序列化器,序列化参数值,方便存储。这里内部其实使用的是 JSON.NET 进行序列化。 JobArgs = Serializer.Serialize(args), Priority = priority, CreationTime = Clock.Now, NextTryTime = Clock.Now }; // 如果任务有执行延迟,则任务的初始执行时间要加上这个延迟。 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 持久化任务信息,方便后面执行后台作业的工作者能够取到。 await Store.InsertAsync(jobInfo); return jobInfo.Id; }
BackgroundJobNameAttribute
相关的方法:
public static string GetName<TJobArgs>() { return GetName(typeof(TJobArgs)); } public static string GetName([NotNull] Type jobArgsType) { Check.NotNull(jobArgsType, nameof(jobArgsType)); // 判断参数类型上面是否标注了特性,并且特性实现了 IBackgroundJobNameProvider 接口。 return jobArgsType .GetCustomAttributes(true) .OfType<IBackgroundJobNameProvider>() .FirstOrDefault() ?.Name // 拿不到名字,则使用类型的 FullName。 ?? jobArgsType.FullName; }
后台作业的存储
后台作业的存储默认是放在内存的,这点可以从 InMemoryBackgroundJobStore
类型实现看出来。在它的内部使用了一个并行字典,通过作业的 Guid 与作业进行关联绑定。
除了内存实现,在 Volo.Abp.BackgroundJobs.Domain 模块还有一个 BackgroundJobStore
实现,基本套路与 SettingStore
一样,都是存储到数据库里面。
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency { protected IBackgroundJobRepository BackgroundJobRepository { get; } // ... public BackgroundJobInfo Find(Guid jobId) { return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>( BackgroundJobRepository.Find(jobId) ); } // ... public void Insert(BackgroundJobInfo jobInfo) { BackgroundJobRepository.Insert( ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo) ); } // ... }
后台作业的执行
默认的后台作业管理器是通过一个后台工作者来执行后台任务的,这个实现叫做 BackgroundJobWorker
,这个后台工作者的生命周期也是单例的。后台作业的具体执行逻辑里面,涉及到了以下几个类型的交互。
类型 | 作用 |
---|---|
AbpBackgroundJobOptions |
提供每个后台任务的配置信息,包括任务的类型、参数类型、任务名称数据。 |
AbpBackgroundJobWorkerOptions |
提供后台作业工作者的配置信息,例如每个周期 最大执行的作业数量、后台 工作者的 执行周期、作业执行 超时时间 等。 |
BackgroundJobConfiguration |
后台任务的配置信息,作用是将持久化存储的作业信息与运行时类型进行绑定 和实例化,以便 ABP vNext 来执行具体的任务。 |
IBackgroundJobExecuter |
后台作业的执行器,当我们从持久化存储获取到后台作业信息时,将会通过 这个执行器来执行具体的后台作业。 |
IBackgroundJobSerializer |
后台作业序列化器,用于后台作业持久化时进行序列化的工具,默认采用的 是 JSON.NET 进行实现。 |
JobExecutionContext |
执行器在执行后台作业时,是通过这个上下文参数进行执行的,在这个上下 文内部,包含了后台作业的具体类型、后台作业的参数值。 |
IBackgroundJobStore |
前面已经讲过了,这个是用于后台作业的持久化存储,默认实现是存储在内存。 |
BackgroundJobPriority |
后台作业的执行优先级定义,ABP vNext 在执行后台任务时,会根据任务的优 先级进行排序,以便在后面执行的时候优先级高的任务先执行。 |
我们来按照逻辑顺序走一遍它的实现,首先后台作业的执行工作者会从持久化存储内,获取 MaxJobFetchCount
个任务用于执行。从持久化存储获取后台作业信息(BackgroundJobInfo
),是由 IBackgroundJobStore
提供的。
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>(); var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount); // 不存在任何后台作业,则直接结束本次调用。 if (!waitingJobs.Any()) { return; }
InMemoryBackgroundJobStore
的相关实现:
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount) { return _jobs.Values .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now) .OrderByDescending(t => t.Priority) .ThenBy(t => t.TryCount) .ThenBy(t => t.NextTryTime) .Take(maxResultCount) .ToList(); }
上面的代码可以看出来,首先排除 被放弃的任务 ,包含达到执行时间的任务,然后根据任务的优先级从高到低进行排序。重试次数少的优先执行,预计执行时间越早的越先执行。最后从这些数据中,筛选出 maxResultCount
结果并返回。
说到这里,我们来看一下这个 NextTryTime
是如何被计算出来的?回想起最开始的后台作业管理器,我们在添加一个后台任务的时候,就会设置这个后台任务的 预计执行时间。第一个任务被添加到执行队列中时,它的值一般是 Clock.Now
,也就是它被添加到队列的时间。
不过 ABP vNext 为了让那些经常执行失败的任务,有比较低的优先级再执行,就在每次任务执行失败之后,会将 NextTryTime
的值指数级进行增加。这块代码可以在 CalculateNextTryTime
里面看到,也就是说某个任务的执行 失败次数越高,那么它下一次的预期执行时间就会越远。
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock) { // 一般来说,这个 DefaultWaitFactor 因子的值是 2.0 。 var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同执行失败的次数进行挂钩。 var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ?? clock.Now.AddSeconds(nextWaitDuration); if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout) { return null; } return nextTryDate; }
当预期的执行时间都超过 DefaultTimeout
的超时时间时(默认为 2 天),说明这个任务确实没救了,就不要再执行了。
我们之前说到,从 IBackgroundJobStore
拿到了需要执行的后台任务信息集合,接下来我们就要开始执行后台任务了。
foreach (var jobInfo in waitingJobs) { jobInfo.TryCount++; jobInfo.LastTryTime = clock.Now; try { // 根据任务名称获取任务的配置参数。 var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); // 根据配置里面存储的任务类型,将参数值进行反序列化。 var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); // 构造一个新的执行上下文,让执行器执行任务。 var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs); try { jobExecuter.Execute(context); // 如果任务执行成功则删除该任务。 store.Delete(jobInfo.Id); } catch (BackgroundJobExecutionException) { // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。 var nextTryTime = CalculateNextTryTime(jobInfo, clock); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。 jobInfo.IsAbandoned = true; } TryUpdate(store, jobInfo); } } catch (Exception ex) { // 执行过程中,产生了未知异常,设置为废弃任务,并打印日志。 Logger.LogException(ex); jobInfo.IsAbandoned = true; TryUpdate(store, jobInfo); } }
执行后台任务的时候基本分为 5 步,它们分别是:
- 获得任务关联的配置参数,默认不用提供,因为在之前模块初始化的时候就已经配置了(你也可以显式指定)。
- 通过之前存储的配置参数,将参数值反序列化出来,构造具体实例。
- 构造一个执行上下文。
- 后台任务执行器执行具体的后台任务。
- 成功则删除任务,失败则更新任务下次的执行状态。
至于执行器里面的真正执行操作,你都拿到了参数值和任务类型了。就可以通过类型用 IoC 获取后台任务对象的实例,然后通过反射匹配方法签名,在实例上调用这个方法传入参数即可。
public virtual void Execute(JobExecutionContext context) { // 构造具体的后台作业实例对象。 var job = context.ServiceProvider.GetService(context.JobType); if (job == null) { throw new AbpException("The job type is not registered to DI: " + context.JobType); } // 获得需要执行的方法签名。 var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)); if (jobExecuteMethod == null) { throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType); } try { // 直接通过 MethodInfo 的 Invoke 方法调用,传入具体的实例对象和参数值即可。 jobExecuteMethod.Invoke(job, new[] { context.JobArgs }); } catch (Exception ex) { Logger.LogException(ex); // 如果是执行方法内的异常,则包装进行处理,然后抛出。 throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex) { JobType = context.JobType.AssemblyQualifiedName, JobArgs = context.JobArgs }; } }
集成 Hangfire
ABP vNext 对于 Hangfire 的集成代码分布在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模块内部,前者是在模块配置里面,调用 Hangfire 库的相关方法,注入组件到 IoC 容器当中。后者则是对后台作业进行了适配处理,替换了默认的 IBackgroundJobManager
实现。
在 AbpHangfireModule
模块内部,通过工厂创建出来一个 BackgroudJobServer
实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。
public class AbpHangfireModule : AbpModule { private BackgroundJobServer _backgroundJobServer; public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddHangfire(configuration => { context.Services.ExecutePreConfiguredActions(configuration); }); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider); } public override void OnApplicationShutdown(ApplicationShutdownContext context) { //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown _backgroundJobServer.SendStop(); _backgroundJobServer.Dispose(); } }
我们直奔主题,看一下基于 Hangfire 的后台作业管理器是怎么实现的。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency { public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 如果没有延迟参数,则直接通过 Enqueue() 方法扔进执行对了。 if (!delay.HasValue) { return Task.FromResult( BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args) ) ); } else { return Task.FromResult( BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args), delay.Value ) ); } }
上述代码中使用 HangfireJobExecutionAdapter
进行了一个适配操作,因为 Hangfire 要将一个后台任务扔进队列执行,不是用 TArgs
就能解决的。
转到这个适配器定义,提供了一个 Execute(TArgs)
方法,当被添加到 Hangfire 队列执行的时候。实际 Hangfire 会调用适配器的 Excetue(TArgs)
方法,然后内部还是使用的 IBackgroundJobExecuter
来执行具体定义的任务。
public class HangfireJobExecutionAdapter<TArgs> { protected AbpBackgroundJobOptions Options { get; } protected IServiceScopeFactory ServiceScopeFactory { get; } protected IBackgroundJobExecuter JobExecuter { get; } public HangfireJobExecutionAdapter( IOptions<AbpBackgroundJobOptions> options, IBackgroundJobExecuter jobExecuter, IServiceScopeFactory serviceScopeFactory) { JobExecuter = jobExecuter; ServiceScopeFactory = serviceScopeFactory; Options = options.Value; } public void Execute(TArgs args) { using (var scope = ServiceScopeFactory.CreateScope()) { var jobType = Options.GetJob(typeof(TArgs)).JobType; var context = new JobExecutionContext(scope.ServiceProvider, jobType, args); JobExecuter.Execute(context); } } }
集成 RabbitMQ
基于 RabbitMQ 的后台作业实现,我想放在分布式事件总线里面,对其一起进行讲解。
三、总结
ABP vNext 为我们提供了多种后台作业管理器的实现,你可以根据自己的需求选用不同的后台作业管理器,又或者是自己动手造轮子。
需要看其他的 ABP vNext 相关文章?点击我 即可跳转到总目录。