延時隊列我在項目里是怎麼實現的?
- 2022 年 6 月 6 日
- 筆記
我是3y,一年CRUD
經驗用十年的markdown
程式設計師👨🏻💻常年被譽為職業八股文選手
前陣子,有個小夥伴找到問我,如果要實現延時發送,那是基於什麼來做的。
我看到這個問題之後,稍微思考了下,覺得確實也是austin
平台所需要實現的功能。對於前端而言,只要讓業務方在創建模板的時候填選屏蔽類型,後端根據這個欄位增添一點點細節,這個需求就做完了,簡單!
延遲消息如何實現?
延遲消息就是字面上的意思:當接收到消息之後,我需要隔一段時間進行處理(相對於立馬處理,它隔了一段時間,所以他叫延遲消息)。
在原生的Java有DelayQueue
供我們去使用,在使用的時候,我們add
進去的隊列的元素需要實現Delayed
介面(同時該介面繼承了Comparable
介面,所以我們DelayQueue
是有序的)
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
從poll
的源碼上可以清晰地發現本質上就是在取數的時候判斷了下時間
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
有的人就反駁到:這不是廢話嗎?肯定要判斷時間啊,不判斷時間怎麼知道我要延遲的消息什麼時候執行。
明白了這點之後,我們再來別的方案。因為在生產環境中是不太可能使用JDK原生延遲隊列的,它是沒有持久化的,重啟就會導致數據丟失。
當austin
項目使用記憶體隊列去解耦處理數據已經有人提出伺服器重啟的時候該怎麼辦,我的解決思路就是通過優雅關閉伺服器這種手段去盡量避免數據丟失,而延遲隊列這種就不能這麼幹了,我們等不了這麼久的。
稍微想想還有什麼存儲適合當隊列且有持久化機制的呢?
答案顯而易見:Redis
和消息隊列(Kafka
/RocketMQ
/RabbmitMQ
等)
我們先來看Redis
里提供了一種數據結構叫做zset
,它是可排序的集合併且Redis原生就支援持久化。有贊的延遲隊列就是基於通過zset
進行設計和存儲的。整體架構如下圖:
簡單理解這張圖就是:將需要延遲的消息放置Redis,通過Timer
輪詢得到可執行的消息,將可執行的消息放置不同的Topic
供業務方自行消費。
更多的設計思路可以參考有贊的技術原文,這裡我不再贅述://tech.youzan.com/queuing_delay/
通過timer
去輪詢zset
查看是否有可執行的消息是一種思路,也有人通過Redis的過期回調的姿勢也能達到延遲消息的效果(把消息執行的時間定義為key
過期的時間,當key
觸發了過期回調,那說明該消息可執行了)。
說完Redis
,我們再來看看消息隊列。在austin
項目上使用消息隊列是Kafka
,而Kafka
在官方是沒有提供延遲隊列這種機制的。不過RabbmitMQ
和RocketMQ
都有對應的機制,我們可以簡單看看窺探下它們的實現思路。
RabbmitMQ
它的延遲隊列機制本質上也是通過TTL
(Time To Live 消息存活的時間)所實現的,當隊列里的元素觸發了過期時,會被送往到Dead Letter Exchanges
(死信隊列中)。我們可以將死信隊列的元素再次轉發,對其進行消費,從而達到延遲隊列的效果。
畢竟RabbmitMQ
是專門做消息隊列的,所以它對消息的可靠性會比Redis
更加高(消息投遞的可靠性、至少處理一次的消費語義)
RocketMQ
支援在我們投遞消息的時候設置延遲等級
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
默認支援18個延遲等級,分別是:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
當我們設置了延遲等級的消息之後,RocketMQ
不會把消息直接投遞到對應的topic,而是轉發到對應延遲等級的隊列中。在Broker內部會為每個延遲隊列起TimerTask
來進行判斷是否有消息到達了時間。
ScheduleMessageService#start
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
如果到期了,則將消息重新存儲到CommitLog,轉發到真正目標的topic
對RocketMQ
延遲隊列比較感興趣的,推薦看這篇文章://cloud.tencent.com/developer/article/1581368
實現需求
在前面提到我們可以利用JDK原生的延時隊列,又或是Redis的zset
數據結構或者其過期時間機制、又或是RabbitMQ
使用TTL
+死信隊列機制、又或是RocketMQ
的延時等級隊列機制來實現我們的需求(延時隊列)
針對此次需求,上面所講的延時隊列,我都沒用到…
austin
項目引入的是Kafka
,不太可能去為了延時隊列去引入第二種消息隊列(RabbitMQ
在互聯網應該用得相對較少,RocketMQ
需要改動配置文件的延遲等級才能支援更豐富的延時需求)。
如果基於Kafka或者Redis去二次開發延時隊列,開發成本還是有不少的,在GitHub也還沒撈到我想要的輪子。
於是,我換了一種方案:萬物皆掃表
針對這次需求(晚上發的消息,次日早上發送),就不需要上延時隊列,因為austin
已經接入了分散式定時任務框架了(對應的實現是xxl-job
)
只要把晚上的接收到的消息扔進Redis list
,然後啟個定時任務(每天早上9點)輪詢該list
是否有數據,如果有再重新做處理就完事了。
總結
這篇文章主要講述了如果我們要使用延時隊列,我們可以有什麼方案,他們的設計是怎麼樣的。在需求側上看,這個需求就是「延時隊列」的場景,但基於現狀的系統架構和開發成本考慮,我們是可以用另類(分散式定時任務框架)的方式去把需求給實現了。
很多時候,我們看到的系統很爛,技術棧很爛,發現好多場景都沒有用到最佳實踐而感到懊惱,在年輕的時候都想有重構的心。但實際上每引入一個中間件都是需要付出成本的,粗糙也有粗糙的好處。
只要業務能完美支援,那就是好的方案。想要搞自己想搞的技術,那就做開源,如果有一天我覺得分散式定時任務來實現此次需求不順眼了,我再花時間來重構才幹掉,現在就這麼實現吧( // TODO)。
如果你實在是覺得看著糟心,歡迎提個pull request
,這樣我就不得不把這種實現給幹掉了(我對提過來的pull request
都會謹慎且用心處理)
都看到這裡了,點個贊一點都不過分吧?我是3y,下期見。
關注我的微信公眾號【Java3y】除了技術我還會聊點日常,有些話只能悄悄說~ 【對線面試官+從零編寫Java項目】 持續高強度更新中!求star!!原創不易!!求三連!!
austin項目源碼Gitee鏈接:gitee.com/austin
austin項目源碼GitHub鏈接:github.com/austin