藉助Redis完成延時任務
- 2020 年 3 月 15 日
- 筆記
背景
相信我們或多或少的會遇到類似下面這樣的需求:
第三方給了一批數據給我們處理,我們處理好之後就通知他們處理結果。
大概就是下面這個圖說的。
本來在處理完數據之後,我們就會馬上把處理結果返回給對方,但是對方要求我們處理速度不能過快,要有一種人為處理的效果。
換句話就是說,就算是處理好了,也要晚一點再執行通知操作。
這就是一個典型的延時任務。
延時,那還不簡單,執行完之後,讓它Sleep
一下就好了,這樣就達到目標了。
Sleep
一下確定是最容易實現的一種方案,但是試想一下,數據的數量不斷的增加,這樣Sleep
真的好嗎?答案是否定的。
延時隊列,是處理這個場景最為妥當的方案。
RabbitMQ,RocketMQ,Cmq等都可以直接或間接的達到相應的效果。
如果不具備隊列條件,又要怎麼處理呢?還可以藉助Redis來完成這項工作。
MQ不一定每個公司都會用,但Redis應該80%以上的都會用吧。
處理方案
Redis這邊,可用的方案有兩種,下面分別來介紹一下。
#1 鍵的過期時間
在設置快取的時候,我們比較多情況下都會設置一個快取的過期時間,這個時間過期後,會重新去數據源拿數據回來。
可以基於這個過期時間結合Redis的keyspace notifications共同完成。
keyspace notifications裡面包含了非常多的事件,這裡只關注EXPIRE
,這個是和過期有關的。
只要訂閱了__keyevent@0__:expired
這個主題,當有key過期的時候,就會收到對應的資訊。
註:主題@後面的0,指的是db 0.
要想使用這個特性,必不可少的一步是修改Redis默認的配置,把notify-keyspace-events
設置成Ex
。
############################# Event notification ############################## # Redis can notify Pub/Sub clients about events happening in the key space. # This feature is documented at http://redis.io/topics/notifications # # ......... # # By default all notifications are disabled because most users don't need # this feature and the feature has some overhead. Note that if you don't # specify at least one of K or E, no events will be delivered. notify-keyspace-events "Ex"
其中 E 指的是鍵事件通知,x 指的是過期事件。
根據這個特性,重新調整一下流程圖:
應該也比較好懂,下面通過簡單的程式碼來實現一下這種方案。
首先是處理完數據及往Redis寫數據。
public async Task DoTaskAsync() { // 數據處理 // ... // 後續操作要延時,把Id記錄下來 var taskId = new Random().Next(1, 10000); // 要延遲的時間 int sec = new Random().Next(1, 5); // 可以加個重試機制,預防單次執行失敗。 await RedisHelper.SetAsync($"task:{taskId}", "1", sec); }
還需要回傳結果的後台任務,這個任務就是去訂閱上面說的鍵過期事件,然後回傳結果。
這裡可以藉助BackgroundService
來訂閱處理。
public class SubscribeTaskBgTask : BackgroundService { protected override Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var keyPrefix = "task:"; RedisHelper.Subscribe( ("__keyevent@0__:expired", arg => { var msg = arg.Body; Console.WriteLine($"recive {msg}"); if (msg.StartsWith(keyPrefix)) { // 取到任務Id var val = msg.Substring(keyPrefix.Length); Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}"); // 回傳處理結果給第三方,這裡可以考慮這個並發鎖,避免多實例都處理了這個任務。 // .... } } )); return Task.CompletedTask; } }
這裡有一個要注意的地方,要在key裡面包含任務的Id,因為訂閱處理的時候,只能拿到一個key,後續能做的操作也只是基於這個key。
上面的例子,是用了task:任務Id
的形式,所以在訂閱處理的時候,只處理以task:
開頭的那些key。
效果如下:
這種方案,直觀上是非常簡單的,不過這種方案會遇到一個小問題。
當一個key過期後,並不一定會馬上收到通知,這個也是會有一定的延時的,取決於Redis的內部機制。
Redis Keyspace Notifications文檔的最後一段也提到了這個問題。
所以用這種方案的時候,要考慮一下,你的延時是不是要及時~~
#2 有序集合
有序集合是Redis中一種十分有用的數據結構,它的本質其實就是集合加了一個排序的功能,每個集合裡面的元素還會有一個分值的屬性。
它提供了一個可以獲取指定分值範圍內的元素,這個也就是我們的出發點。
在這個場景下,什麼東西可能作為這個分值呢?現在只有一個處理任務的Id還有一個延遲的時間,Id肯定不行,那麼也只能是延遲時間來作這個分值了。
延遲1秒,5秒,1分鐘,這個都是比較大粒度的時間,這裡要轉化一下,用時間戳來代替這些延遲的時間。
假設現在的時間戳是 1584171520
, 要延遲5秒執行,那麼執行任務的時間就是 1584171525
,在當前時間戳的基礎上加個5秒,就是最終要執行的了。
到時有序集合中存的元素就會是這樣的
任務Id-1 1584171525 任務Id-2 1584171528 任務Id-3 1584171530
接下來就是要怎麼取出這些任務的問題了!
把當前時間戳當成是取數的最大分值,0作為最小分值,這個時候取出的元素就是應該要執行回傳的任務了。
根據這個方案,重新調整一下流程圖:
交代清楚了思路,再來點程式碼,加深一下理解。
首先還是處理完數據後往Redis寫數據。
public async Task DoTaskAsync() { // 數據處理 // ... // 後續操作要延時,把Id記錄下來 var taskId = new Random().Next(1, 10000); var cacheKey = "task:delay"; int sec = new Random().Next(1, 5); // 要執行這個任務的時間戳 var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds(); await RedisHelper.ZAddAsync(cacheKey, (time, taskId)); Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}"); }
後面就是輪訓有序集合裡面的元素了,這裡同樣是藉助BackgroundService
來處理。
public class SubscribeTaskBgTask : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var cacheKey = "task:delay"; while (true) { // 先取,後刪,不具備原子性,可考慮用lua腳本來保證原子性。 var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0); if (vals != null && vals.Length > 0) { var val = vals[0]; var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals); if (rmCount > 0) { // 要把這個元素先刪除成功了,再執行任務,不然會重複 Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}"); // 回傳處理結果給第三方,這裡可以考慮這個並發鎖,避免多實例都處理了這個任務。 // .... } } else { // 沒有數據,休眠500ms,避免CPU空轉 await Task.Delay(500); } } } }
效果如下:
參考文章
https://redis.io/topics/notifications
https://zhuanlan.zhihu.com/p/87113913