你知道Redis可以實現延遲隊列嗎?

最近,又重新學習了下Redis,深深被Redis的魅力所折服,我才知道Redis不僅能快還能慢(我想也這麼優秀o(╥﹏╥)o),簡直是個利器呀。

 咳咳咳,大家不要誤會,本文很正經的啦!

好了,接下來回到我們的話題,我們都知道Redis是一種基於記憶體的單進程單執行緒資料庫(Redis6.0開始之後支援多執行緒啦!),處理速度都非常快。那麼為何Redis又能慢呢?原來,這裡說的慢是指Redis可以設置一些參數達到慢處理的結果。(這就是為什麼Redis既能快又能慢啦!)

那接下來開始講講我們的楷模Redis在隊列中如何實現延時的情況:

在我們日常生活中,我們可以發現,

  • 在淘寶、京東等購物平台上下單,超過一定時間未付款,訂單會自動取消。
  • 打車的時候,在規定時間沒有車主接單,平台會取消你的單並提醒你暫時沒有車主接單。
  • 點外賣的時候,如果商家在10分鐘還沒接單,就會自動取消訂單。
  • 收快遞的時候,如果我們沒有點確認收貨,在一段時間後程式會自動完成訂單。
  • 在平台完成訂單後,如果我們沒有在規定時間評論商品,會自動默認買家不評論。
  • …….還有很多這樣的場景。

這時,我們可以想想為什麼要這樣做?

因為這樣可以保證商品的庫存可以釋放給其他人購買,你可以不用一直等待打車卻得不到回復,你可以及時換一家店點到外賣。

那麼這些情況都是如何實現的呢?

這時我們可以看看這個圖,來看看消息延遲是如何處理的:

 當用戶發送一個消息請求給伺服器後台的時候,伺服器會檢測這條消息是否需要進行延時處理,如果需要就放入到延時隊列中,由延時任務檢測器進行檢測和處理,對於不需要進行延時處理的任務,伺服器會立馬對消息進行處理,並把處理後的結果相應返會給用戶。

 對於在延時任務檢測器內部的話,有查詢延遲任務和執行延時任務兩個職能,任務檢測器會先去延時任務隊列進行隊列中資訊讀取,判斷當前隊列中哪些任務已經時間到期並將已經到期的任務輸出執行(具有實時性,會存在一定的時間誤差,因為這個是定時任務)。

這時,我們可以想一想在Redis的數據結構中有哪些能進行時間設置標誌的命令?

是不是想到的 zset 這個命令,具有去重有序(分數排序)的功能。沒錯,你想對了呀!

我們可以使用 zset(sortedset)這個命令,用設置好的時間戳作為score進行排序,使用 zadd score1 value1 ….命令就可以一直往記憶體中生產消息。再利用 zrangebysocre 查詢符合條件的所有待處理的任務,通過循環執行隊列任務即可。也可以通過 zrangebyscore key min max withscores limit 0 1 查詢最早的一條任務,來進行消費。

總的來說,你可以通過以下兩種方式來實現((*^▽^*)如果你想到其他方法,也可以告訴我下呀~):

(1)使用zrangebyscore來查詢當前延時隊列中所有任務,找出所有需要進行處理的延時任務,在依次進行操作。

(2)查找當前最早的一條任務,通過score值來判斷任務執行的時候是否大於了當前系統的時候,比如說:最早的任務執行時間在3點,系統時間在2點58分),表示這個應該需要立馬被執行啦,時間快到了(沖沖沖,他來了他來了,他帶著死神的步伐來了)。

我們可以想一想Redis來實現延時隊列有何優勢呢?

其實,Redis用來進行實現延時隊列是具有這些優勢的:

(1)Redis zset支援高性能的 score 排序。

(2)Redis是在記憶體上進行操作的,速度非常快。

(3)Redis可以搭建集群,當消息很多時候,我們可以用集群來提高消息處理的速度,提高可用性。

(4)Redis具有持久化機制,當出現故障的時候,可以通過AOF和RDB方式來對數據進行恢復,保證了數據的可靠性

這時候,會有小夥伴問了還有沒有其他實現延時隊列的方式呀!emmm….當然有的,只有想不到的沒有做不到(O(∩_∩)O哈哈~,開玩笑)

一、用消息中間件實現延時隊列

(1)通過 RabbitMQ 來實現延時隊列,也就是用消息中間件來實現延時隊列啦~

方法一:在MQ中我們可以對Queue設置 x-expires 過期時間或者對 Message設置超時時間x-message-ttl。

(這裡要注意下:延時相同的消息我們要扔到同一個隊列中,對於每一個延時要建立一個與之對應的隊列—這是由於MQ的過期檢測是惰性檢測的。)

方法二:我們可以用RabbitMQ的插件rabbitmq-delayed-message-exchange插件來實現延時隊列。達到可投遞時間時並將其通過x-delayed-type類型標記的交換機類型投遞至目標隊列。

 

(2)RocketMQ實現延時隊列

rocketmq在發送延時消息時,是先把消息按照延遲時間段發送到指定的隊列中(把延時時間段相同的消息放到同一個隊列中,保證了消息處理的順序性,可以讓同一個隊列中消息延時時間是相同的,整個RocketMQ中延時消息時按照遞增順序排序,保證資訊處理的先後順序性。)。之後,通過一個定時器來輪詢處理這些隊列里的資訊,判斷是否到期。對於到期的消息會發送到相應的處理隊列中,進行處理。

哈哈,目前RocketMQ只支援特定的延時時間段,1s,5s,10s,…2h,不能支援任意時間段的延時設置。有興趣的小夥伴可以去了解下它是相關知識呀~

 

二、Kafka實現延時隊

Kafka基於時間輪自定義了一個用於實現延遲功能的定時器(SystemTimer),Kafka中的時間輪(TimingWheel)是一個存儲定時任務的環形隊列,可以進行相關的延時隊列設置。

 

 

三、Netty實現延時隊列

Netty也有基於時間輪演算法來實現延時隊列。Netty在構建延時隊列主要用HashedWheelTimer,HashedWheelTimer底層數據結構是使用DelayedQueue,採用時間輪的演算法來實現。

 

四、DelayQueue來實現延時隊列

Java中有自帶的DelayQueue數據類型,我們可以用這個來實現延時隊列。DelayQueue是封裝了一個PriorityQueue(優先隊列),在向DelayQueue隊列中添加元素時,會給元素一個Delay(延遲時間)作為排序條件,隊列中最小的元素會優先放在隊首,對於隊列中的元素只有到了Delay時間才允許從隊列中取出。這種實現方式是數據保存在記憶體中,可能面臨數據丟失的情況,同時它是無法支援分散式系統的。

 

 

 

 相關程式碼我之後會上傳到我的GitHub上呀~(沖沖沖,小夥伴們!)又是美好的一天。

Tags: