Asp-Net-Core開發筆記:集成Hangfire實現異步任務隊列和定時任務

前言

最近把Python寫的數據採集平台往.Net Core上遷移,原本的採集任務使用多進程+線程池的方式來加快採集速度,使用Celery作為異步任務隊列兼具定時任務功能,這套東西用着還行,但反正就折騰嘛,直接上C#~

本文記錄 Hangfire 在實際應用里的用法,我發現網絡上找到的大部分文章都是用 Hangfire 的異步任務輸出個 Hello World,然後就沒了。我實在不知道這樣的文章寫了有什麼意義??除了浪費看的人的時間之外,還浪費自己寫文章的時間……

.NetCore作為一個高性能的平台,自然不可能輸給Python,不過我不想造輪子了(菜),找個現成的方案來用,免得踩坑~

先是調研了一下.NetCore目前的生態,發現有幾個選擇:

  • FreeScheduler
  • Quartz.net
  • Hangfire

第一個 FreeScheduler 和 FreeSQL 項目出自同一個作者之手,剛好我的項目也是用的 FreeSQL 作為 ORM,不過看了一下Github上stars比較少,而且文檔暫時還不完善,我最關心的依賴注入功能暫時還不好搞,於是只好作罷。

然後在 Quartz.net 和 Hangfire 兩者中,我選擇了後者,原因是我之前做 CrawlCenterNet 項目的時候用過 Hangfire,還挺好用的,且帶有一個簡單的 dashboard,比較直觀~

那麼就開始吧~

關於後端的選擇

這裡的後端指的是任務隊列的存儲後端,也就是 Hangfire 文檔中寫的 Storage。

看了一下官網,大部分關係型非關係型數據庫都是可以的,那我就放心了。由於目前生產環境在使用 Oracle 數據庫,所以一開始我選擇了 Oracle 作為 Storage,但是在同時開到2000多個任務的時候報錯了,看了下原因是表空間不足,是 Oracle 的問題…

所以為了性能和穩定性,我棄坑了,接着嘗試了 SQLite (僅作為本地測試),結果發現配置裏面定義了 Connection String 但它不理我,直接把這個 Connection String 作為數據庫的名稱了,無語… 這樣就沒辦法把 SQLite 數據庫設置為異步模式,那速度就直接烏龜爬了…

再次棄坑… 這次直接上 Redis 了,為了提高性能,捨棄持久化能力~ Redis也沒讓我失望,幾千個任務壓根不帶眨眼的,nice~

數據採集代碼

Hangfire 組件不是一開始就引入的,這裡先上最基本的數據採集代碼,後面的介紹才能更清楚~

關鍵的代碼在 Services/CrawlService.cs 文件中

public class CrawlService {
  // 依賴注入一些服務
  private readonly IBaseRepository<Proc> _repo;
  
  public async Task CrawlAllProc() {
    for(var i=1; i<2000; i++) {
      await CrawlProcList(i);
    }
  }
  
  public async Task CrawlProcList(int page) {
    // 具體代碼省略了
    var procList = ; //...
    foreach (var proc in procList) {
      await CrawlProc(proc);
    }
  }
  
  public async Task CrawlProc(Proc proc) { }
}

然後,當啟動採集任務的時候,直接去調用 CrawlAllProc 方法,這樣就開始一頁一頁採集,每頁又有很多的 Proc 數據,全都採集下來。

上面的代碼用的是 await ,會等待異步方法完成,速度很慢,去掉 await ,在新線程中執行任務,不等待其結束,不過問題也很明顯,如果出錯了很難調試,這樣就不好保證系統的穩定性。

接下來我們用 Hangfire 來改造。

安裝 Hangfire

本項目用到了以下依賴:

  • Hangfire.Core
  • Hangfire.AspNetCore
  • Hangfire.Redis.StackExchange

直接 nuget 一把梭完事

註冊服務

為了跟後面的內容區分,這裡先來官方的例子

註冊服務:

services.AddHangfire(
  configuration => configuration
  .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
  .UseSimpleAssemblyNameTypeSerializer()
  .UseRecommendedSerializerSettings()
  // 根據實際使用的 Storage 來註冊
  .UseRedisStorage();
services.AddHangfireServer();

添加中間件:

app.UseHangfireDashboard();

簡單使用

Hangfire 註冊的時候默認是單例模式,所以在任意代碼中使用其靜態方法就能添加異步任務或者定時任務。

異步任務:

BackgroundJobs.Enqueue(() => Console.WriteLine("Hello world!"));

定時任務:

Hangfire的定時任務叫做 recurrent tasks,我之前一般習慣叫 scheduled task,一開始差點找不到文檔~

以下代碼添加一個每天執行一次的任務,如果需要其他時間,可以自定義後面的 Cron 參數,具體自行研究 Cron 語法~

RecurringJob.AddOrUpdate("easyjob", () => Console.Write("Easy!"), Cron.Daily);

正經使用

OK,終於進到正文

正如我一開始說的,前面介紹的用法是遠遠不夠的,如果只是介紹個 Hello World,那也沒必要專門寫篇文章了…

現在開始介紹如何將 Hangfire 結合我的 CrawlService 使用

因為 CrawlService 需要操作數據庫,所以是用到了依賴注入的,所以我們需要讓 Hangfire 也支持依賴注入。

官方文檔有一小節是關於 IOC 容器的(//docs.hangfire.io/en/latest/background-methods/using-ioc-containers.html),不過並沒有介紹 AspNetCore 的容器,直接自己動手 豐衣足食咯~

添加 AspNetCore 的依賴注入容器

一開始搜了好久沒找到,最終是在Github上找到一個例子代碼,裏面的 AspNetCore 版本好老,居然是1.1版本,我都沒用過… 不過並不影響我節儉他的寫法~

這一步需要 JobActivator 的子類

來寫一個,我把它放在 Infrastructure 目錄下

using Hangfire;

public class HangfireActivator : JobActivator {
    private readonly IServiceProvider _serviceProvider;

    public HangfireActivator(IServiceProvider serviceProvider) {
        _serviceProvider = serviceProvider;
    }

    public override object? ActivateJob(Type jobType) {
        return _serviceProvider.GetService(jobType);
    }
}

這裡是在 HangfireActivator 的構造函數中把 AspNetCore 的 IOC 容器對象傳入,並且重寫 ActivateJob 方法,讓 Hangfire 才激活任務的時候從 IOC 容器中獲取對象,比較好理解。

修改服務註冊代碼

其實服務註冊部分是一樣的,無須修改

不過按照習慣,為了使 Program.cs 或者 Startup.cs 代碼比較簡潔,我還是寫了擴展方法來實現這部分。

Extensions 目錄中添加 ConfigureHangfire.cs

public static class ConfigureHangfire {
    public static void AddHangfirePkg(this IServiceCollection services, IConfiguration configuration) {
        services.AddHangfire(conf => conf
            .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
            .UseSimpleAssemblyNameTypeSerializer()
            .UseRecommendedSerializerSettings()
            .UseRedisStorage()
        );

        services.AddHangfireServer();
    }

    public static void UseHangfire(this WebApplication app) {
        GlobalConfiguration.Configuration.UseActivator(new HangfireActivator(app.Services));
        app.UseHangfireDashboard();
    }
}

可以看到有修改的地方就是在添加中間件之前,配置了 Activator 這行代碼:

GlobalConfiguration.Configuration.UseActivator(new HangfireActivator(app.Services));

直接把 IOC 容器傳入

搞定~

接着在 Program.cs (我用的 .Net6)中使用這個擴展方法就完事了~

builder.Services.AddHangfirePkg(builder.Configuration);
// 中間件
app.UseHangfire();

創建任務

有了依賴注入之後,創建異步任務是這樣的。也就是多了個泛型參數。

BackgroundJob.Enqueue<CrawlService>(a => a.CrawlAllProc());

定時任務,每小時執行一次

RecurringJob.AddOrUpdate<CrawlService>(a => a.CrawlAllProc(), Cron.Hourly);

改造一下數據採集代碼

OK,最後回到一開始的數據採集代碼,做如下修改:

public class CrawlService {
  // 依賴注入一些服務
  private readonly IBaseRepository<Proc> _repo;
  
  public async Task CrawlAllProc() {
    for(var i=1; i<2000; i++) {
      // await CrawlProcList(i);
      BackgroundJob.Enqueue(() => CrawlProcList(i, 100));
    }
  }
  
  public async Task CrawlProcList(int page, int pageSize = 100) {
    // 具體代碼省略了
    var procList = ; //...
    foreach (var proc in procList) {
      // await CrawlProc(proc);
      BackgroundJob.Enqueue(() => CrawlProc(proc));
    }
  }
  
  public async Task CrawlProc(Proc proc) { }
}

把原來 await 的地方注釋掉,換成用 Hangfire 創建異步任務,運行起來,打開dashboard,可以看到任務噌的一下就上到幾千,速度極快~

需要注意的就是 CrawlProcList 方法的第二個參數 pageSize 我們給了默認值100,在正常使用是沒問題的,可以不傳入這個參數,默認就是100。

BackgroundJob.Enqueue 方法里不能省略這個參數,不然會報錯說編譯器無法解析啥的,這個應該是C#的語言限制,具體我暫時還沒去深入研究。

小結

OK,這樣就初步搞定了數據採集 & 定時採集的功能,這部分剛好是我國慶第一天加班完成的,後續的就交給時間吧~ 國慶剩下幾天的假期讓它跑個夠,等假期結束再回去看看效果如何,到時有新的進展我也會及時更新博客。

對了,我還打算封裝個異步任務和定時任務的接口(似乎 AspNetCore 沒有這部分功能?),因為我不想代碼和 Hangfire 有太高的耦合,封裝成抽象的接口,以後如果換別的組件也沒有壓力。

就把這件事先加入 todo list 吧~

參考資料