Hangfire只允許同時運行同一個任務

  • 2020 年 11 月 10 日
  • 筆記

Hangfire有個機制可以確保所有任務都會被執行,如果當伺服器停機了一段時間重新啟動時,在此期間的周期任務會幾乎同時執行。而大部分時候,我們希望同個周期任務每段時間只運行一個就行了。

或者是如果周期任務設置得過於頻繁,當之前的任務還沒執行完,我們也不希望繼續添加周期任務進隊列去排隊執行。

Hangfire有提供一個擴展//docs.hangfire.io/en/latest/background-processing/throttling.html 

同個DisableConcurrentExecution我們可以限制同一個任務每次只會執行一個,但是如果有任務正在執行,這時候又有新任務過來,新任務並不會被刪除而是處於排隊狀態,等待前面的任務執行完。

 

 

而且,如果我們的任務用了同一個方法作為入口時(或者說我們需要根據方法的參數來確定是否為同一個任務),此時這個控制就不適用了。

參考了//gist.github.com/sbosell/3831f5bb893b20e82c72467baf8aefea,我們可以用過濾器來實現,將運行期間進來的任務給取消掉。

程式碼的具體實現為:

 1     /// <summary>
 2     /// 禁用多個排隊項目
 3     /// <remarks>同個任務取消並行執行,期間進來的任務不會等待,會被取消</remarks>
 4     /// </summary>
 5     public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
 6     {
 7         private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
 8         private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任務執行超時時間
 9 
10         public void OnCreating(CreatingContext filterContext)
11         {
12             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
13             if (!string.IsNullOrEmpty(recurringJobId)&&!AddFingerprintIfNotExists(filterContext.Connection, recurringJobId))
14             {
15                 filterContext.Canceled = true;
16             }
17         }
18 
19         public void OnPerformed(PerformedContext filterContext)
20         {
21             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
22             if (!string.IsNullOrEmpty(recurringJobId))
23             {
24                 RemoveFingerprint(filterContext.Connection, recurringJobId);
25             }
26         }
27 
28         private static bool AddFingerprintIfNotExists(IStorageConnection connection, string recurringJobId)
29         {
30             using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout))
31             {
32                 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(recurringJobId));
33 
34                 if (fingerprint != null &&
35                     fingerprint.ContainsKey("Timestamp") &&
36                     DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) &&
37                     DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
38                 {
39                     // 有任務還未執行完,並且沒有超時
40                     return false;
41                 }
42 
43                 // 沒有任務執行,或者該任務已超時
44                 connection.SetRangeInHash(GetFingerprintKey(recurringJobId), new Dictionary<string, string>
45             {
46                 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
47             });
48 
49                 return true;
50             }
51         }

View Code

 

在OnCreating方法中,我們讀取RecurringJobId的值,獲取周期任務的id(同樣的id代表同一個周期任務),然後以這個id為key去設置一個超時。如果在此期間,如果拿到了key的值,以及設置的時間還未超時的話,我們通過設置filterContext.Canceled = true取消掉此任務。

 

 使用connection.AcquireDistributedLock在設置鍵值時添加分散式鎖,確保不會同時設置了多個相同的任務。使用connection.SetRangeInHash鍵RecurringJobId作為key,當前時間作為值保存。以此來確保在FingerprintTimeout的超時時間內,同個RecurringJobId的任務只能創建一個。或者等任務執行完後在OnPerformed方法中釋放掉這個鍵值。

在OnPerformed方法中,將我們在創建方法中設置的RecurringJobId key和對應的時間給刪除,這樣OnCreating可以繼續創建同一個RecurringJobId 的任務。

 

 

 

或者是普通觸發的任務,這時候沒有RecurringJobId 我們希望可以同個參數來控制,同樣的參數不能同時執行。我們可以通過這個方法來生成相應的key

 1         private static string GetFingerprint(Job job)
 2         {
 3             var parameters = string.Empty;
 4             if (job?.Arguments != null)
 5             {
 6                 parameters = string.Join(".", job.Arguments);
 7             }
 8             if (job?.Type == null || job.Method == null)
 9             {
10                 return string.Empty;
11             }
12             var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
13             var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
14             var fingerprint = Convert.ToBase64String(hash);
15             return fingerprint;
16         }    

View Code

這樣我們就能確保我們希望的同一個任務不會同時在執行,而且周期任務也不會繼續在隊列中排隊