藉助Redis完成延時任務

  • 2020 年 3 月 15 日
  • 筆記

背景

相信我們或多或少的會遇到類似下面這樣的需求:

第三方給了一批數據給我們處理,我們處理好之後就通知他們處理結果。

大概就是下面這個圖說的。

本來在處理完數據之後,我們就會馬上把處理結果返回給對方,但是對方要求我們處理速度不能過快,要有一種人為處理的效果。

換句話就是說,就算是處理好了,也要晚一點再執行通知操作。

這就是一個典型的延時任務。

延時,那還不簡單,執行完之後,讓它Sleep一下就好了,這樣就達到目標了。

Sleep一下確定是最容易實現的一種方案,但是試想一下,數據的數量不斷的增加,這樣Sleep真的好嗎?答案是否定的。

延時隊列,是處理這個場景最為妥當的方案。

RabbitMQ,RocketMQ,Cmq等都可以直接或間接的達到相應的效果。

如果不具備隊列條件,又要怎麼處理呢?還可以藉助Redis來完成這項工作。

MQ不一定每個公司都會用,但Redis應該80%以上的都會用吧。

處理方案

Redis這邊,可用的方案有兩種,下面分別來介紹一下。

#1 鍵的過期時間

在設置快取的時候,我們比較多情況下都會設置一個快取的過期時間,這個時間過期後,會重新去數據源拿數據回來。

可以基於這個過期時間結合Redis的keyspace notifications共同完成。

keyspace notifications裡面包含了非常多的事件,這裡只關注EXPIRE,這個是和過期有關的。

只要訂閱了__keyevent@0__:expired這個主題,當有key過期的時候,就會收到對應的資訊。

註:主題@後面的0,指的是db 0.

要想使用這個特性,必不可少的一步是修改Redis默認的配置,把notify-keyspace-events設置成Ex

############################# Event notification ##############################    # Redis can notify Pub/Sub clients about events happening in the key space.  # This feature is documented at http://redis.io/topics/notifications  #  # .........  #  #  By default all notifications are disabled because most users don't need  #  this feature and the feature has some overhead. Note that if you don't  #  specify at least one of K or E, no events will be delivered.  notify-keyspace-events "Ex"  

其中 E 指的是鍵事件通知,x 指的是過期事件。

根據這個特性,重新調整一下流程圖:

應該也比較好懂,下面通過簡單的程式碼來實現一下這種方案。

首先是處理完數據及往Redis寫數據。

public async Task DoTaskAsync()  {      // 數據處理      // ...        // 後續操作要延時,把Id記錄下來      var taskId = new Random().Next(1, 10000);      // 要延遲的時間      int sec = new Random().Next(1, 5);        // 可以加個重試機制,預防單次執行失敗。      await RedisHelper.SetAsync($"task:{taskId}", "1", sec);  }

還需要回傳結果的後台任務,這個任務就是去訂閱上面說的鍵過期事件,然後回傳結果。

這裡可以藉助BackgroundService來訂閱處理。

public class SubscribeTaskBgTask : BackgroundService  {      protected override Task ExecuteAsync(CancellationToken stoppingToken)      {          stoppingToken.ThrowIfCancellationRequested();          var keyPrefix = "task:";          RedisHelper.Subscribe(              ("__keyevent@0__:expired", arg =>                  {                      var msg = arg.Body;                      Console.WriteLine($"recive {msg}");                      if (msg.StartsWith(keyPrefix))                      {                          // 取到任務Id                          var val = msg.Substring(keyPrefix.Length);                          Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");                            // 回傳處理結果給第三方,這裡可以考慮這個並發鎖,避免多實例都處理了這個任務。                          // ....                      }                  }              ));            return Task.CompletedTask;      }  }

這裡有一個要注意的地方,要在key裡面包含任務的Id,因為訂閱處理的時候,只能拿到一個key,後續能做的操作也只是基於這個key。

上面的例子,是用了task:任務Id的形式,所以在訂閱處理的時候,只處理以task:開頭的那些key。

效果如下:

這種方案,直觀上是非常簡單的,不過這種方案會遇到一個小問題。

當一個key過期後,並不一定會馬上收到通知,這個也是會有一定的延時的,取決於Redis的內部機制。

Redis Keyspace Notifications文檔的最後一段也提到了這個問題。

所以用這種方案的時候,要考慮一下,你的延時是不是要及時~~

#2 有序集合

有序集合是Redis中一種十分有用的數據結構,它的本質其實就是集合加了一個排序的功能,每個集合裡面的元素還會有一個分值的屬性。

它提供了一個可以獲取指定分值範圍內的元素,這個也就是我們的出發點。

在這個場景下,什麼東西可能作為這個分值呢?現在只有一個處理任務的Id還有一個延遲的時間,Id肯定不行,那麼也只能是延遲時間來作這個分值了。

延遲1秒,5秒,1分鐘,這個都是比較大粒度的時間,這裡要轉化一下,用時間戳來代替這些延遲的時間。

假設現在的時間戳是 1584171520, 要延遲5秒執行,那麼執行任務的時間就是 1584171525,在當前時間戳的基礎上加個5秒,就是最終要執行的了。

到時有序集合中存的元素就會是這樣的

任務Id-1 1584171525  任務Id-2 1584171528  任務Id-3 1584171530

接下來就是要怎麼取出這些任務的問題了!

把當前時間戳當成是取數的最大分值,0作為最小分值,這個時候取出的元素就是應該要執行回傳的任務了。

根據這個方案,重新調整一下流程圖:

交代清楚了思路,再來點程式碼,加深一下理解。

首先還是處理完數據後往Redis寫數據。

public async Task DoTaskAsync()  {      // 數據處理      // ...        // 後續操作要延時,把Id記錄下來      var taskId = new Random().Next(1, 10000);        var cacheKey = "task:delay";      int sec = new Random().Next(1, 5);        // 要執行這個任務的時間戳      var time = DateTimeOffset.Now.AddSeconds(sec).ToUnixTimeSeconds();        await RedisHelper.ZAddAsync(cacheKey, (time, taskId));      Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} done {taskId} here - {sec}");  }

後面就是輪訓有序集合裡面的元素了,這裡同樣是藉助BackgroundService來處理。

public class SubscribeTaskBgTask : BackgroundService  {      protected override async Task ExecuteAsync(CancellationToken stoppingToken)      {          stoppingToken.ThrowIfCancellationRequested();          var cacheKey = "task:delay";          while (true)          {              // 先取,後刪,不具備原子性,可考慮用lua腳本來保證原子性。              var vals = await RedisHelper.ZRangeByScoreAsync(cacheKey, -1, DateTimeOffset.Now.ToUnixTimeSeconds(), 1, 0);                if (vals != null && vals.Length > 0)              {                  var val = vals[0];                    var rmCount = await RedisHelper.ZRemAsync(cacheKey, vals);                    if (rmCount > 0)                  {                      // 要把這個元素先刪除成功了,再執行任務,不然會重複                      Console.WriteLine($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} begin to do task {val}");                        // 回傳處理結果給第三方,這裡可以考慮這個並發鎖,避免多實例都處理了這個任務。                      // ....                  }              }              else              {                  // 沒有數據,休眠500ms,避免CPU空轉                  await Task.Delay(500);              }          }      }  }

效果如下:

參考文章

https://redis.io/topics/notifications

https://zhuanlan.zhihu.com/p/87113913